¿Qué es la Programación Reactiva? Una introducción

La programación reactiva es un paradigma enfocado en el trabajo con flujos de datos finitos o infinitos de manera asíncrona. Su concepción y evolución ha ido ligada a la publicación del Reactive Manifesto, que establecía las bases de los sistemas reactivos, los cuales deben ser:

  • Responsivos: aseguran la calidad del servicio cumpliendo unos tiempos de respuesta establecidos.
  • Resilientes: se mantienen responsivos incluso cuando se enfrentan a situaciones de error.
  • Elásticos: se mantienen responsivos incluso ante aumentos en la carga de trabajo.
  • Orientados a mensajes: minimizan el acoplamiento entre componentes al establecer interacciones basadas en el intercambio de mensajes de manera asíncrona.

La motivación detrás de este nuevo paradigma procede de la necesidad de responder a las limitaciones de escalado presentes en los modelos de desarrollo actuales, que se caracterizan por su desaprovechamiento del uso de la CPU debido al I/O, el sobreuso de memoria (enormes thread pools) y la ineficiencia de las interacciones bloqueantes.

El diagrama siguiente muestra una interacción típica entre microservicios: Service1 tiene que invocar dos servicios Service2 y Service3 independientes. La programación actual no permite la paralelización de las peticiones ni la composición de los resultados de una manera sencilla y eficiente.

Diagrama de interacción entre microservicios

El modelo de programación reactiva ha evolucionado de manera significativa desde su concepción en 2010. Las librerías que lo implementan se clasifican en generaciones de acuerdo a su grado de madurez:

  • Generación 0: java.util.Observable/Observer proporcionaban la base de uso del patrón Observer del Gang of Four. Tienen los inconvenientes de su simplicidad y falta de opciones de composición.
  • 1ª Generación: en 2010 Microsoft publica RX.NET, que en 2013 sería portado a entorno Java a través de la librería RxJava. Evolucionó el concepto de Observable/Observer incorporando la composición de operaciones, pero presentaba limitaciones arquitectónicas.
  • 2ª Generación: se solucionan los problemas de backpressure y se introducen dos nuevas interfaces: Subscriber y Producer.
  • 3ª y 4ª Generación: se caracterizan principalmente por haber eliminado la incompatibilidad entre las múltiples librerías del ecosistema reactive a través de la adopción de la especificación Reactive Streams, que fija dos clases base Publisher y Subscriber. Entran dentro de esta generación proyectos como RxJava 2.x, Project Reactor y Akka Streams.

El uso de Reactive Streams es similar al del patrón Iterator (incluyendo Java 8 Streams) con una clara diferencia, el primero es push-based mientras que el segundo es pull-based:

EventoIterable (push)Reactive (pull)
Obtener datonext()onNext(Object data)
Errorthrows ExceptiononError(Exception)
Fin!hasNext()onComplete()

Iterable delega en el desarrollador las llamadas para obtener los siguientes elementos. Por contra, los Publisher de Reactive Streams son los encargados de notificar al Subscriber la llegada de nuevos elementos de la secuencia.

Adicionalmente las librerías reactivas se han ocupado de mejorar los siguientes aspectos:

  • Composición y legibilidad: hasta el momento las única manera de trabajar con operaciones asíncronas en entorno Java consistía en el uso de Future, callbacks, o, desde Java 8, CompletableFuture. Todas ellas presentan el gran inconveniente de dificultar la comprensión del código y la composición de operaciones, pudiendo fácilmente degenerar hacia un callback hell.
  • Operadores: permiten aplicar transformaciones sobre los flujos de datos. Si bien no forman parte de la especificación Reactive Streams, todas las librerías que la implementan los soportan de manera completamente compatible.
  • Backpressure: debido al flujo push-based, se pueden dar situaciones donde un Publisher genere más elementos de los que un Subscriber puede consumir. Para evitarlo se han establecido los siguientes mecanismos:
    • Los Subscriber pueden indicar el número de datos que quieren o pueden procesar mediante la operación subscriber.request(n), de manera que el Publisher nunca les enviará más de n elementos.
    • Los Publisher pueden aplicar diferentes operaciones para evitar saturar a los subscriptores lentos (buffers, descarte de datos, etc.).

APIs Java de programación reactiva

RxJava 2

Esta librería, y su versión 1.x fueron las pioneras en el desarrollo reactivo Java. Se encuentran completamente integradas en frameworks como Spring MVC, Spring Cloud y Netflix OSS.

Project Reactor

Fue concebida con la implicación del equipo responsable de RxJava 2 por lo que comparten gran parte de la base arquitectónica. Su principal ventaja es que al ser parte de Pivotal ha sido la elegida como fundación del futuro Spring 5 WebFlux Framework.

Este API introduce los tipos Flux y Mono como implementaciones de Publisher, los cuales generan series de 0…N y 0…1 elementos respectivamente.

Flux:

Flux - API Spring 5 WebFlux Framework

Mono:

Mono - API Spring 5 WebFlux Framework

El siguiente ejemplo muestra la creación y suscripción a una secuencia de números aleatorios generados cada segundo. Como se puede ver se trata de un API sencilla e intuitiva que le resultará familiar a cualquiera que haya trabajado con Java 8 Streams:

Flux<Double> randomGenerator = Flux.range(1, 4)
.delayMillis(1000)
.map(i -> Math.random())
.log();
randomGenerator.subscribe(number -> logger.info("Got random number {}", number);

Output:

1: [main] INFO reactor.Flux.Peek.1 - onSubscribe(FluxPeek.PeekSubscriber)
2: [main] INFO reactor.Flux.Peek.1 - request(unbounded)
3: [timer-1] INFO reactor.Flux.Peek.1 - onNext(0.05361127029313428)
4: [timer-1] INFO es.profile.spring5.playground.reactive.service.RandomNumbersServiceImpl - Got random number 0.05361127029313428
3: [timer-1] INFO reactor.Flux.Peek.1 - onNext(0.711925912668467)
4: [timer-1] INFO es.profile.spring5.playground.reactive.service.RandomNumbersServiceImpl - Got random number 0.711925912668467
3: [timer-1] INFO reactor.Flux.Peek.1 - onNext(0.8631082308572313)
4: [timer-1] INFO es.profile.spring5.playground.reactive.service.RandomNumbersServiceImpl - Got random number 0.8631082308572313
3: [timer-1] INFO reactor.Flux.Peek.1 - onNext(0.2797390339259114)
4: [timer-1] INFO es.profile.spring5.playground.reactive.service.RandomNumbersServiceImpl - Got random number 0.2797390339259114
5: [timer-1] INFO reactor.Flux.Peek.1 - onComplete()

Los logs de ejecución muestran los diferentes eventos:

  1. Suscripción.
  2. Solicitud de elementos sin límite.
  3. Generación de elementos en un hilo timer-1.
  4. Entrega de los elementos al suscriptor en el hilo main.
  5. Fin de la secuencia señalizado por el evento onComplete().

El futuro del desarrollo web

Compañías como Netflix llevan años aplicando la programación reactiva para mejorar el rendimiento de sus aplicaciones, superando las limitaciones nativas del lenguaje Java. RxJava les proporcionó la pieza que necesitaban para realizar la orquestación de microservicios, la implementación del patrón circuit breaker y demás mecanismos que permiten a sus desarrollos adoptar los principios descritos por el manifiesto reactivo.

La siguiente iteración de Spring Framework, en su versión 5.0, introduce como principal novedad este modelo de desarrollo en su core web. Este stack ejecuta sobre una capa construida directamente sobre HTTP y, aunque es compatible con contenedores Servlet 3.1+, está diseñado para ejecutarse sobre plataformas reactivas como RxNetty o Reactor Netty. Se ofrecen dos variantes:

  1. Modelo basado en anotaciones: reutiliza las archiconocidas anotaciones de Spring MVC:
    @RestController
    public class PersonController {
        @Autowired
        private final PersonRepository repository;
        @PostMapping("/person")
        Mono<Void> create(@RequestBody Publisher<Person> personStream){
             return this.repository.save(personStream).then();
        }
    }

  2. Modelo de programación funcional: se sirve de un estilo de programación basado en lambdas:

    RouterFunctions
        .route(GET("/person/{id}").and(accept(APPLICATION_JSON)), request -> {
             int personId = Integer.valueOf(request.pathVariable("id"));
             Mono<ServerResponse> notFound = ServerResponse.notFound().build();
             return repository.findOne(personId)
             .then(person -> ServerResponse.ok().body(Mono.just(person), Person.class))
             .otherwiseIfEmpty(notFound);
        });

El futuro próximo del desarrollo web pasará sin duda por la adopción de estas técnicas, conllevando necesariamente la modificación de las actuales arquitecturas de microservicios bloqueantes hacia arquitecturas completamente reactivas (Reactive all the Way Down), desde la capa de persistencia hasta la presentación. A pesar de sus múltiples ventajas debemos tener presentes los retos que este enfoque plantea, como la modificación del estilo de programación, los riesgos derivados de la falta de experiencia o la dificultad añadida en el diagnóstico de errores.

¿Te pareció interesante? Compártelo:
Share on Facebook
Facebook
Share on Google+
Google+
Tweet about this on Twitter
Twitter
Share on LinkedIn
Linkedin
Print this page
Print
Email this to someone
email

Jose Antonio Íñigo

Arquitecto Software en Profile. Soy un apasionado de la tecnología, con especial interés en el diseño e implantación de arquitecturas distribuidas en sistemas cloud.