La programación reactiva es un paradigma enfocado en el trabajo con flujos de datos finitos o infinitos de manera asíncrona. Su concepción y evolución ha ido ligada a la publicación del Reactive Manifesto, que establecía las bases de los sistemas reactivos, los cuales deben ser:
- Responsivos: aseguran la calidad del servicio cumpliendo unos tiempos de respuesta establecidos.
- Resilientes: se mantienen responsivos incluso cuando se enfrentan a situaciones de error.
- Elásticos: se mantienen responsivos incluso ante aumentos en la carga de trabajo.
- Orientados a mensajes: minimizan el acoplamiento entre componentes al establecer interacciones basadas en el intercambio de mensajes de manera asíncrona.
La motivación detrás de este nuevo paradigma procede de la necesidad de responder a las limitaciones de escalado presentes en los modelos de desarrollo actuales, que se caracterizan por su desaprovechamiento del uso de la CPU debido al I/O, el sobreuso de memoria (enormes thread pools) y la ineficiencia de las interacciones bloqueantes.
El diagrama siguiente muestra una interacción típica entre microservicios: Service1 tiene que invocar dos servicios Service2 y Service3 independientes. La programación actual no permite la paralelización de las peticiones ni la composición de los resultados de una manera sencilla y eficiente.
El modelo de programación reactiva ha evolucionado de manera significativa desde su concepción en 2010. Las librerías que lo implementan se clasifican en generaciones de acuerdo a su grado de madurez:
- Generación 0:
java.util.Observable/Observer
proporcionaban la base de uso del patrón Observer del Gang of Four. Tienen los inconvenientes de su simplicidad y falta de opciones de composición. - 1ª Generación: en 2010 Microsoft publica RX.NET, que en 2013 sería portado a entorno Java a través de la librería RxJava. Evolucionó el concepto de Observable/Observer incorporando la composición de operaciones, pero presentaba limitaciones arquitectónicas.
- 2ª Generación: se solucionan los problemas de backpressure y se introducen dos nuevas interfaces:
Subscriber
yProducer
. - 3ª y 4ª Generación: se caracterizan principalmente por haber eliminado la incompatibilidad entre las múltiples librerías del ecosistema reactive a través de la adopción de la especificación Reactive Streams, que fija dos clases base
Publisher
ySubscriber
. Entran dentro de esta generación proyectos como RxJava 2.x, Project Reactor y Akka Streams.
El uso de Reactive Streams es similar al del patrón Iterator (incluyendo Java 8 Streams) con una clara diferencia, el primero es push-based
mientras que el segundo es pull-based
:
Evento | Iterable (push) | Reactive (pull) |
Obtener dato | next() | onNext(Object data) |
Error | throws Exception | onError(Exception) |
Fin | !hasNext() | onComplete() |
Iterable
delega en el desarrollador las llamadas para obtener los siguientes elementos. Por contra, los Publisher
de Reactive Streams son los encargados de notificar al Subscriber
la llegada de nuevos elementos de la secuencia.
Adicionalmente las librerías reactivas se han ocupado de mejorar los siguientes aspectos:
- Composición y legibilidad: hasta el momento las única manera de trabajar con operaciones asíncronas en entorno Java consistía en el uso de
Future
, callbacks, o, desde Java 8,CompletableFuture
. Todas ellas presentan el gran inconveniente de dificultar la comprensión del código y la composición de operaciones, pudiendo fácilmente degenerar hacia un callback hell. - Operadores: permiten aplicar transformaciones sobre los flujos de datos. Si bien no forman parte de la especificación Reactive Streams, todas las librerías que la implementan los soportan de manera completamente compatible.
- Backpressure: debido al flujo push-based, se pueden dar situaciones donde un
Publisher
genere más elementos de los que unSubscriber
puede consumir. Para evitarlo se han establecido los siguientes mecanismos:- Los
Subscriber
pueden indicar el número de datos que quieren o pueden procesar mediante la operaciónsubscriber.request(n)
, de manera que elPublisher
nunca les enviará más den
elementos. - Los
Publisher
pueden aplicar diferentes operaciones para evitar saturar a los subscriptores lentos (buffers, descarte de datos, etc.).
- Los
Qué vas as ver en esta entrada
APIs Java de programación reactiva
RxJava 2
Esta librería, y su versión 1.x fueron las pioneras en el desarrollo reactivo Java. Se encuentran completamente integradas en frameworks como Spring MVC, Spring Cloud y Netflix OSS.
Project Reactor
Fue concebida con la implicación del equipo responsable de RxJava 2 por lo que comparten gran parte de la base arquitectónica. Su principal ventaja es que al ser parte de Pivotal ha sido la elegida como fundación del futuro Spring 5 WebFlux Framework.
Este API introduce los tipos Flux
y Mono
como implementaciones de Publisher
, los cuales generan series de 0…N y 0…1 elementos respectivamente.
Flux:
Mono:
El siguiente ejemplo muestra la creación y suscripción a una secuencia de números aleatorios generados cada segundo. Como se puede ver se trata de un API sencilla e intuitiva que le resultará familiar a cualquiera que haya trabajado con Java 8 Streams:
Flux<Double> randomGenerator = Flux.range(1, 4) .delayMillis(1000) .map(i -> Math.random()) .log(); randomGenerator.subscribe(number -> logger.info("Got random number {}", number);
Output:
[main] INFO reactor.Flux.Peek.1 - onSubscribe(FluxPeek.PeekSubscriber) [main] INFO reactor.Flux.Peek.1 - request(unbounded) [timer-1] INFO reactor.Flux.Peek.1 - onNext(0.05361127029313428) [timer-1] INFO es.profile.spring5.playground.reactive.service.RandomNumbersServiceImpl - Got random number 0.05361127029313428 [timer-1] INFO reactor.Flux.Peek.1 - onNext(0.711925912668467) [timer-1] INFO es.profile.spring5.playground.reactive.service.RandomNumbersServiceImpl - Got random number 0.711925912668467 [timer-1] INFO reactor.Flux.Peek.1 - onNext(0.8631082308572313) [timer-1] INFO es.profile.spring5.playground.reactive.service.RandomNumbersServiceImpl - Got random number 0.8631082308572313 [timer-1] INFO reactor.Flux.Peek.1 - onNext(0.2797390339259114) [timer-1] INFO es.profile.spring5.playground.reactive.service.RandomNumbersServiceImpl - Got random number 0.2797390339259114 [timer-1] INFO reactor.Flux.Peek.1 - onComplete()
Los logs de ejecución muestran los diferentes eventos:
- Suscripción.
- Solicitud de elementos sin límite.
- Generación de elementos en un hilo
timer-1
. - Entrega de los elementos al suscriptor en el hilo
main
. - Fin de la secuencia señalizado por el evento
onComplete()
.
El futuro del desarrollo web
Compañías como Netflix llevan años aplicando la programación reactiva para mejorar el rendimiento de sus aplicaciones, superando las limitaciones nativas del lenguaje Java. RxJava les proporcionó la pieza que necesitaban para realizar la orquestación de microservicios, la implementación del patrón circuit breaker y demás mecanismos que permiten a sus desarrollos adoptar los principios descritos por el manifiesto reactivo.
La siguiente iteración de Spring Framework, en su versión 5.0, introduce como principal novedad este modelo de desarrollo en su core web. Este stack ejecuta sobre una capa construida directamente sobre HTTP y, aunque es compatible con contenedores Servlet 3.1+, está diseñado para ejecutarse sobre plataformas reactivas como RxNetty o Reactor Netty. Se ofrecen dos variantes:
- Modelo basado en anotaciones: reutiliza las archiconocidas anotaciones de Spring MVC:
@RestController public class PersonController { @Autowired private final PersonRepository repository; @PostMapping("/person") Mono<Void> create(@RequestBody Publisher<Person> personStream){ return this.repository.save(personStream).then(); } }
- Modelo de programación funcional: se sirve de un estilo de programación basado en lambdas:
RouterFunctions .route(GET("/person/{id}").and(accept(APPLICATION_JSON)), request -> { int personId = Integer.valueOf(request.pathVariable("id")); Mono<ServerResponse> notFound = ServerResponse.notFound().build(); return repository.findOne(personId) .then(person -> ServerResponse.ok().body(Mono.just(person), Person.class)) .otherwiseIfEmpty(notFound); });
El futuro próximo del desarrollo web pasará sin duda por la adopción de estas técnicas, conllevando necesariamente la modificación de las actuales arquitecturas de microservicios bloqueantes hacia arquitecturas completamente reactivas (Reactive all the Way Down), desde la capa de persistencia hasta la presentación. A pesar de sus múltiples ventajas debemos tener presentes los retos que este enfoque plantea, como la modificación del estilo de programación, los riesgos derivados de la falta de experiencia o la dificultad añadida en el diagnóstico de errores.