¡Compártelo!

Aprende a integrar Spring y RabittMQ usando Spring Cloud Stream

Spring Boot es uno de los frameworks más populares en la actualidad y, dada esta popularidad, existen muchas implementaciones, como Redis, MongoDB, etc. Esto nos permite hacer aplicaciones rápidas y funcionales en unos pocos pasos. Este tutorial tiene como fin mostrar un ejemplo básico de cómo publicar y consumir mensajes usando Spring Cloud Stream y RabbitMQ.

Herramientas

Para este ejemplo, usare Spring Boot en su versión 2.2.6.RELEASE, Maven 3.6.3 y RabbitMQ Manager en su versión 3.8.5. Para este último, usaré una imagen de Docker para facilitar el ejemplo. Pero si quieres, puedes instalar esta herramienta, no hay problema. 

Prerrequisitos

Configuración RabbitMQ 

Como mencioné anteriormente, para este ejemplo usaré una imagen de Docker para poder tener un servidor de RabbitMQ disponible en mi máquina local. Pero si tú lo instalaste, puedes omitir este paso.

Para crear un contenedor con RabbitMQ, necesitas ejecutar el siguiente comando en tu terminal:

docker run -d --name rabbit -p 5672:5672 -p 5673:5673 -p 15672:15672 -e
RABBITMQ_DEFAULT_USER=someuser -e
RABBITMQ_DEFAULT_PASS=somepass rabbitmq:3.8.5-management-alpine

Servicio publicador

Este será el servicio encargado de enviar todos los mensajes que queramos en RabbitMQ mediante colas, las cuales más adelante explicaremos. 

Para crear nuestro proyecto de Spring Boot tenemos dos caminos: uno es el asistente de nuestro IDE y el otro es mediante la página proporcionada por Spring, la cual usaré.

Integración de RabbitMQ con Spring

Una vez seleccionamos la configuración anterior, descargamos el proyecto generado por Spring.

Nota: En RabbitMQ el usuario y contraseña por defecto es “guest”, pero para este ejemplo usaremos como usuario “someuser” y como contraseña “somepass”, para modificar un poco los valores por defecto.

Inicialmente crearemos una clase, la cual llamaremos Person. Esta clase será el objeto que enviaremos a RabbitMQ para que sea consumida por algún subscriptor.

@Getter
@ToString
@AllArgsConstructor
public final class Person {
    private final String id;
    private final String name;
    private final int age;
}

Nota: Para que fuera más sencillo, he usado Lombok para generar los getters y un constructor por defecto con todos los parámetros. Conoce un poco más cómo funciona esta librería

En el archivo de configuración application.yml es necesario configurar algunas propiedades para que Spring Cloud Stream pueda comunicarse con RabbitMQ:

spring:
  application:
    name: producer-service
  cloud:
    stream:
      bindings:
        exampleChannel:
          destination: example.topic
      default:
        contentType: application/json
  rabbitmq:
    broker-url: tcp://127.0.0.1:5672
    username: someuser
    password: somepassword
    host: localhost

En este paso necesitas configurar los bindings, los cuales serán los canales con los que se podrá conectar Spring con RabbitMQ. Si estos canales no existen, Spring los creará de manera automática.

public interface ExampleOutputChannel {
    String OUTPUT = "exampleChannel";
    @Output(ExampleOutputChannel.OUTPUT)
    MessageChannel output();
}

La anotación @Output se usa para identificar los canales de salida (Mensajes que serán publicados en RabbitMQ).

@Slf4j
@Component
@EnableBinding(ExampleOutputChannel.class)
@RequiredArgsConstructor(onConstructor = @__(@Autowired))
public class ExamplePublisherChannelImpl implements IExamplePublisherChannel {
    private final ExampleOutputChannel source;
    @Override
    public void sendMessage(Person person) {
        log.info("Sending person: {}", person);
        source.output().send(MessageBuilder.withPayload(person).build());
        log.info("Message sent successfully");
    }
}

Para enviar los datos a los canales configurados, es necesario inyectar la clase con la anotación @EnableBinding, la cual se configura con el canal creado en el paso anterior.

Si en este punto iniciamos nuestra aplicación, podremos ver en la consola de RabbitMQ el tópico configurado para nuestro canal de salida: http://localhost:15672/#/exchanges

Integración de RabbitMQ con Spring

Probando nuestro servicio publicador

Para probar nuestro servicio, crearemos una nueva clase, la cual inyectamos con la anotación @RestController y así poder validar nuestra funcionalidad mediante un cliente REST:

@RestController
@RestController
@RequestMapping(
        value = "/producer",
        consumes = MediaType.APPLICATION_JSON_VALUE,
        produces = MediaType.APPLICATION_JSON_VALUE
)
@AllArgsConstructor
public class ProducerController {
    private final IExamplePublisherChannel publisher;
@PostMapping
    public ResponseEntity getAllUserById(@RequestBody Person person) {
        publisher.sendMessage(person);
        return ResponseEntity.accepted().build();
    }
}

Para probar este método, yo usare una petición curl, pero tú puedes usar un cliente REST como Postman: 

curl --header "Content-Type: application/json" \
--request POST \
--data '{
   "id": "123456",
   "name":"David",
   "age":"29"
}' \
http://localhost:8080/producer

Integración de RabbitMQ con Spring

Si miramos en la pestaña “Exchanges” de RabbitMQ, podemos ver que el mensaje se publicó exitosamente.

Integración de RabbitMQ con Spring

Servicio consumidor

Para consumir el mensaje publicado, necesitas crear un servicio que pueda consumir estos mensajes. Para esto, crearemos un nuevo proyecto en https://start.spring.io/ con la misma configuración que el servicio publicador. En el application.yml dejaremos la misma configuración, con un único cambio: agregaremos la propiedad  “group”, que indica el nombre de nuestra cola. En este ejemplo “myqueue”. 

server:
  port: 8090
spring:
  application:
    name: consumer-service
  cloud:
    stream:
      bindings:
        exampleChannel:
          destination: example.topic
          group: myqueue
  rabbitmq:
    broker-url: tcp://127.0.0.1:5672
    username: someuser
    password: somepassword
    host: localhost

En este paso, necesitas configurar los bindings de entrada que tendrás en RabbitMQ. Spring Cloud Stream creará estos en el caso de no existir:

public interface ExampleInputChannel {
    String INPUT = "exampleChannel";
    @Input(ExampleInputChannel.INPUT)
    MessageChannel input();
}

La anotación @Input es usada para identificar los canales de entrada (los mensajes que son publicados por en RabbitMQ).

Para consumir nuestro mensaje, necesitamos inyectar un bean con la anotación @EnableBinding, para que Spring pueda crear un listener y escuchar todos los mensajes que lleguen a la cola. Con la anotación @StreamListener se consumirán todos los mensajes publicados.

@Slf4j
@EnableBinding(ExampleInputChannel.class)
public class ExampleConsumerChannel {
    @StreamListener(target = ExampleInputChannel.INPUT)
    public void consumer(Person person) {
        log.info("Person receive: {}", person);
    }
}

Después de finalizar estos pasos, podremos iniciar nuestro servicio consumidor y ver la cola creada en RabbitMQ, en donde se consumirán todos los mensajes que lleguen a ella:

Integración de RabbitMQ con Spring

Si publicamos un nuevo mensaje y no existe ningún consumidor disponible para procesar este mensaje, éste quedará en cola, esperando a ser procesado por algún consumidor.

Integración de RabbitMQ con Spring

Cuando este mensaje es procesado por nuestro servicio consumidor, dejará de estar en cola y pasará a estado entregado.

Integración de RabbitMQ con Spring

Integración de RabbitMQ con Spring

¿Tienes alguna duda o comentario sobre cómo integrar Spring y RabittMQ usando Spring Cloud Stream? ¡Compártelos en las redes sociales!

Si quieres aprender más formas para exprimir Spring Boot, no te pierdas esta guía sobre cómo ejecutar aplicaciones Spring Boot como imágenes nativas de GraalVM. También puede interesarte este tutorial para ejecutar una aplicación Spring Boot como un servicio Linux. ¡No te los pierdas!

Artículos relacionados

Object Pooling

Patrones de diseño en los videojuegos: Object Pooling

El uso de patrones de diseño, como el Object Pooling, es una práctica muy recomendable cuando se quieren realizar desarrollos de software escalables, robustos y que tengan un buen rendimiento. Además, nos puede ayudar a mantener una estructuración de todo el código fuente para ayudar

jdk 21

Jdk 21: mejoras en la última versión LTS de Java

Cada 6 meses Java lanza una nueva versión de nuestro lenguaje favorito. Da igual si la estábamos esperando con ganas o si nos pilla por sorpresa, es algo que celebrar dentro de la comunidad. Esta vez la versión 21 incluye diferentes características estables, otras en

openAPI

Explorando OpenApi: estructura, rutas y seguridad

En este artículo, nos adentraremos en la utilización de OpenApi para crear los diferentes endpoints de nuestra aplicación, con sus diferentes objetos de request y response que necesitemos. ¡Vamos allá! ¿Qué es una API? Las API (Interfaz de Programación de Aplicaciones) son piezas de código