Icono del sitio Profile Software Services

Qué es Apache Kafka y cómo dar los primeros pasos

Apache Kafka

En este artículo vamos a ver qué es Apache Kafka y cuáles son los primeros pasos para empezar a utilizar esta tecnología clave para realizar procesamiento de datos en tiempo real.

¿Qué es Apache Kafka?

Apache Kafka es un sistema de mensajería distribuido y escalable que permite comunicar aplicaciones tomando como referencia el patrón de diseño Observer (publicador-subscriptor). Se basa en streams de eventos que facilita el procesamiento de gran volumen de datos en tiempo real y permite comunicar microservicios de forma asíncrona, fiable, escalable y desacoplada. Es un sistema adecuado tanto para entornos Big Data como para entornos más tradicionales donde la información va creciendo de forma exponencial.

Hay varias opciones disponibles para instalar y configurar el entorno. Se pueden instalar los componentes necesarios en la máquina host de forma tradicional, pero en este ejemplo vamos a optar por la opción de utilizar contenedores Docker. Esta instalación tiene la ventaja de ser independiente del sistema y fácil de configurar. Una vez instalado el entorno desarrollamos dos microservicios en Spring-boot para ilustrar cómo comunicar un productor y consumidor de la forma más básica.

Instalación y ejecución de Apache kafka

Se necesita tener instalada la herramienta Docker-compose. Iremos a la página oficial y seguiremos las instrucciones en función del sistema operativo. 

Los componentes imprescindibles para operar con Apache Kafka son:

Se pueden encontrar distintas imágenes para estos componentes en DockerHub

Sería posible trabajar con comandos docker para cada una de las imágenes, pero resultaría bastante tedioso. En casos donde tenemos varios componentes que dependen entre sí la forma más directa es utilizar docker-compose. Necesitamos un fichero docker-compose.yml. 

Analicemos los elementos principales:

services:
zookeeper:
  image: zookeeper:3.8.0
  container_name: zookeeper
  restart: always
  networks:
    - kafka-net
  ports:
    - "2181:2181"
services:
kafka:
  image: wurstmeister/kafka:2.13-2.8.1
  container_name: kafka
  restart: always
  networks:
    - kafka-net
  ports:
    - "9092:9092"
  environment:
    KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: DOCKER_INTERNAL:PLAINTEXT,DOCKER_EXTERNAL:PLAINTEXT
    KAFKA_LISTENERS: DOCKER_INTERNAL://:29092,DOCKER_EXTERNAL://:9092
    KAFKA_ADVERTISED_LISTENERS: DOCKER_INTERNAL://kafka:29092,DOCKER_EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9092
    KAFKA_INTER_BROKER_LISTENER_NAME: DOCKER_INTERNAL
    KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
    KAFKA_BROKER_ID: 1
    KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
  depends_on:
    - zookeeper
networks:
  kafka-net:
    name: kafka-net
    driver: bridge

Una vez listo el fichero de configuración el siguiente paso es utilizar la herramienta docker-compose para ejecutar ambos contenedores. Nos situamos en el mismo directorio y ejecutamos el comando:

C:\post-kafka\Docker>docker-compose up --d
Creating network "kafka-net" with driver "bridge"
Creating zookeeper ... done
Creating kafka     ... done

Podemos ver los procesos en ejecución, puertos y comandos de arranque, con lo que tenemos lista la instalación básica de Apache Kafka.

C:\post-kafka\Docker>docker-compose up --d
Creating network "kafka-net" with driver "bridge"
Creating zookeeper ... done
Creating kafka     ... done

En caso que sea necesario se pueden consultar los logs de cada uno de los contenedores. En el ejemplo consultamos los relativos al broker kafka:

C:\post-kafka\Docker>docker compose logs kafka -f
kafka  | [Configuring] 'advertised.listeners' in '/opt/kafka/config/server.properties'
kafka  | [Configuring] 'port' in '/opt/kafka/config/server.properties'
kafka  | [Configuring] 'inter.broker.listener.name' in '/opt/kafka/config/server.properties'
kafka  | Excluding KAFKA_HOME from broker config
kafka  | [Configuring] 'log.dirs' in '/opt/kafka/config/server.properties'

Comandos básicos de Apache Kafka

El primer paso es ejecutar el comando docker que nos permite ejecutar bash dentro del contenedor Kafka. 

C:\post-kafka\Docker>docker container ls
CONTAINER ID   IMAGE                           COMMAND                  CREATED       STATUS          PORTS                                                  NAMES
d4b7296b1fc4   wurstmeister/kafka:2.13-2.8.1   "start-kafka.sh"         5 hours ago   Up 12 minutes   0.0.0.0:9092->9092/tcp                                 kafka
cd48bdfc2114   zookeeper:3.8.0                 "/docker-entrypoint...."   5 hours ago   Up 12 minutes   2888/tcp, 3888/tcp, 0.0.0.0:2181->2181/tcp, 8080/tcp   zookeeper

C:\post-kafka\Docker>docker exec -it kafka bash
root@d4b7296b1fc4:/#

Creamos el topic my_fist_topic, indicando que queremos una réplica y partición. Las réplicas aportan tolerancia a fallos y en entornos reales es típico configurar valores superiores a uno, para que si un broker se cae otro pueda servir los datos. Las particiones permiten añadir más consumidores a leer del topic, añadiendo mayor capacidad de procesado concurrente. Cada mensaje creado por un productor se asigna automáticamente a una partición. Es inmutable y puede ser consumido por uno o varios procesos en dicha partición.

root@d4b7296b1fc4:/# kafka-topics.sh --bootstrap-server localhost:9092 --create --topic my_first_topic --partitions 1 --replication-factor 1
Created topic my_first_topic.

Para obtener la lista de topic creados utilizamos el comando list. Con describe podemos ver las características detalladas de un topic a partir de su nombre.

root@d4b7296b1fc4:/# kafka-topics.sh --bootstrap-server localhost:9092 --list
my_first_topic
root@d4b7296b1fc4:/# kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic my_first_topic
Topic: my_first_topic   TopicId: IWQ5TsElTaarFgUTcm0PWQ PartitionCount: 1       ReplicationFactor: 1    Configs: segment.bytes=1073741824
        Topic: my_first_topic   Partition: 0    Leader: 1       Replicas: 1     Isr: 1

Una vez creado el topic, podemos crear y consumir eventos sobre él. La instalación de kafka nos proporciona un productor y consumidor que podemos invocar por línea de comandos.

root@0aa7930e1daa:/# kafka-console-producer.sh --topic my_first_topic --bootstrap-server localhost:9092
>Mi primer evento
>Mi segundo evento
>
root@0aa7930e1daa:/# kafka-console-producer.sh --topic my_first_topic --bootstrap-server localhost:9092
>Mi primer evento
>Mi segundo evento
>

Los comandos para ver la lista actual de consumidores y el detalle de consumidores nos permite ver cómo hay un consumidor cuyo offset es 2 sobre el topic my_first_topic:

kafka-consumer-groups.sh  --list --bootstrap-server localhost:9092
console-consumer-70207
root@0aa7930e1daa:/# kafka-consumer-groups.sh --describe --group console-consumer-70207 --bootstrap-server localhost:9092

GROUP                  TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                                            HOST            CLIENT-ID
console-consumer-70207 my_first_topic  0          -               2               -               consumer-console-consumer-70207-1-2909e37f /127.0.0.1      consumer-console-consumer-70207-1

Con el comando alter topic podemos modificar un topic, por ejemplo aumentando el número de particiones. 

Mediante delete topic podemos eliminar un topic a partir del nombre.

root@d4b7296b1fc4:/# kafka-topics.sh --bootstrap-server localhost:9092 --alter --topic my_first_topic --partitions 4

Productor y consumidor con Spring Boot

Para ilustrar el funcionamiento de Apache Kafka vamos a desarrollar una aplicación para monitorizar datos de participantes en una carrera popular. Tendremos un microservicio productor que cada cierto tiempo creará un mensaje indicando el nombre del participante y su localización actual. Esta información sería generada en un entorno real por un sensor GPS. Escribirá el mensaje en un topic. Un segundo microservicio consumidor leerá los mensajes del topic.

Utilizamos spring initializr para crear el proyecto de microservicios productor y el consumidor. En ambos casos seleccionamos la dependencia Spring for Apache Kafka.  

Productor:

La auto-configuración que proporciona Spring Boot permite crear un productor sin realizar ninguna configuración adicional si kafka se ejecuta en localhost:8082. En otro caso, estableceremos en propiedad spring.kafka.bootstrap-server la url del cluster kafka al que nos queramos conectar.

Para el ejemplo utilizamos el topic t_location. La opción recomendada en entornos de producción es que el administrador del cluster sea el encargado de crear manualmente los topics, para tener mayor control. Spring Boot proporciona un mecanismo para crearlos si no están creados en el arranque de la aplicación. Para ello, hay que definir un bean de la clase NewTopic en la configuración del proyecto, en nuestro caso con una sola partición y réplica.

@Configuration
public class KafkaConfig {

  @Bean
  public NewTopic topicLocation(){
      return new NewTopic("t_location", 1, (short)1);
  }
}

Para escribir mensajes Spring – Kafka nos proporciona el bean KafkaTemplate. Este bean viene configurado por defecto con lo que podemos inyectar en cualquier clase de spring y utilizarlo. En el ejemplo utilizamos el método send para escribir un mensaje en el topic t_location. Como key del mensaje utilizamos el nombre del corredor y valor del mensaje el Json que representa la localización actual.

@Service
public class RunnerLocationProducerImpl implements RunnerLocationProducer {

  private static final Logger LOG = LoggerFactory.getLogger(RunnerLocationProducerImpl.class);
  @Autowired
  private KafkaTemplate<String, String> kafkaTemplate;
  private ObjectMapper objectMapper = new ObjectMapper();

  @Override
  public void send(RunnerLocation runnerLocation) {
      try {
          String json = objectMapper.writeValueAsString(runnerLocation);
          LOG.info("Sending message {}", json);
          kafkaTemplate.send("t_location", runnerLocation.getName(), json);
      } catch (JsonProcessingException e) {
          LOG.error("Error send", e);
      }
  }
}
@Service
public class RunnerLocationProducerImpl implements RunnerLocationProducer {

  private static final Logger LOG = LoggerFactory.getLogger(RunnerLocationProducerImpl.class);
  @Autowired
  private KafkaTemplate<String, String> kafkaTemplate;
  private ObjectMapper objectMapper = new ObjectMapper();

  @Override
  public void send(RunnerLocation runnerLocation) {
      try {
          String json = objectMapper.writeValueAsString(runnerLocation);
          LOG.info("Sending message {}", json);
          kafkaTemplate.send("t_location", runnerLocation.getName(), json);
      } catch (JsonProcessingException e) {
          LOG.error("Error send", e);
      }
  }
}

Para completar el productor añadimos un Scheduler que cada tres segundos inserte datos aleatorios de posición de cada corredor. En una aplicación más realista podríamos trabajar con datos reales generados por el sensor de GPS de cada corredor.

@Service
public class RunnerLocationProducerImpl implements RunnerLocationProducer {

  private static final Logger LOG = LoggerFactory.getLogger(RunnerLocationProducerImpl.class);
  @Autowired
  private KafkaTemplate<String, String> kafkaTemplate;
  private ObjectMapper objectMapper = new ObjectMapper();

  @Override
  public void send(RunnerLocation runnerLocation) {
      try {
          String json = objectMapper.writeValueAsString(runnerLocation);
          LOG.info("Sending message {}", json);
          kafkaTemplate.send("t_location", runnerLocation.getName(), json);
      } catch (JsonProcessingException e) {
          LOG.error("Error send", e);
      }
  }
}

Consumidor:

Spring-Kafka también nos proporciona por defecto la autoconfiguración para trabajar con el servidor de kafka en localhost:8082. Configuramos el consumer group a utilizar. También, indicamos que el consumidor leerá mensajes aunque sean previos a que empiece su ejecución el valor earliest en la propiedad auto-offset-reset. Con esto el application.yml del consumidor:

server:
port: 8081
spring:
kafka:
  consumer:
    group-id: default-spring-consumer
    auto-offset-reset: earliest

Para consumir los mensajes solo necesitamos tener un bean con un método anotado con @KafkaListener indicando el topic del que queremos leer. Cada vez que llegue un nuevo mensaje al topic este consumidor se ejecuta, procesa el mensaje e incrementa el offset del consumidor para procesar el siguiente mensaje. En el ejemplo obtenemos el mensaje en formato json y se serializa a la clase java adecuada:

@Service
public class RunnerLocationConsumerImpl implements RunnerLocationConsumer{

  private static final Logger log = LoggerFactory.getLogger(RunnerLocationConsumerImpl.class);
  private ObjectMapper objectMapper = new ObjectMapper();


  @Override
  @KafkaListener(topics = "t_location")
  public void listen(String message) throws JsonProcessingException {
      RunnerLocation runnerLocation = objectMapper.readValue(message, RunnerLocation.class);
      log.info("listen() : {}", runnerLocation);
  }
}

Ejecutando productor y consumidor podemos observar como los mensajes generados periódicamente por el productor son consumidos por el consumidor:

2022-07-26 12:04:29.797  INFO 15244 --- [   scheduling-1] c.e.d.p.RunnerLocationProducerImpl       : Sending message {"name":"Jose","latitude":16.95073816851057,"longitude":66.16093340714409}
2022-07-26 12:04:29.798  INFO 15244 --- [   scheduling-1] c.e.d.p.RunnerLocationProducerImpl       : Sending message {"name":"Noelia","latitude":5.642663439276047,"longitude":71.53689320133863}
2022-07-26 12:04:29.799  INFO 15244 --- [   scheduling-1] c.e.d.p.RunnerLocationProducerImpl       : Sending message {"name":"Sergio","latitude":38.262168550307685,"longitude":17.74001041993965}
2022-07-26 12:04:32.692  INFO 15244 --- [   scheduling-1] c.e.d.p.RunnerLocationProducerImpl       : Sending message {"name":"Natalia","latitude":53.41710664046941,"longitude":58.59751921176856}
2022-07-26 12:06:57.316  INFO 5924 --- [ntainer#0-0-C-1] c.e.d.c.RunnerLocationConsumerImpl       : listen() : RunnerLocation{name='Natalia', latitude=50.133489026738495, longitude=28.416117830920662}
2022-07-26 12:06:57.321  INFO 5924 --- [ntainer#0-0-C-1] c.e.d.c.RunnerLocationConsumerImpl       : listen() : RunnerLocation{name='Jose', latitude=32.2442143148276, longitude=80.65603976351473}
2022-07-26 12:06:57.321  INFO 5924 --- [ntainer#0-0-C-1] c.e.d.c.RunnerLocationConsumerImpl       : listen() : RunnerLocation{name='Noelia', latitude=47.32174414979816, longitude=2.435827209957644}
2022-07-26 12:06:57.321  INFO 5924 --- [ntainer#0-0-C-1] c.e.d.c.RunnerLocationConsumerImpl       : listen() : RunnerLocation{name='Sergio', latitude=40.9252234555011, longitude=100.75676541105635}

Vamos a suponer que aparece un nuevo requisito de conocer la velocidad media actual de cada participante y que procesar este cálculo tiene bastante coste computacional. En lugar de situar en el listener que hemos definido el código para calcular dicho dato y empeorar el rendimiento de la aplicación podemos definir otro listener sobre el mismo topic.  Para ello creamos un nuevo método anotado con @KafkaListener y con el grupo de consumidores spring-consumer-speed, de forma que cada mensaje será procesado por cada uno de los consumidores de forma independiente.

@Service
public class RunnerLocationConsumerImpl implements RunnerLocationConsumer{

  private static final Logger log = LoggerFactory.getLogger(RunnerLocationConsumerImpl.class);
  private ObjectMapper objectMapper = new ObjectMapper();


  @Override
  @KafkaListener(topics = "t_location")
  public void listen(String message) throws JsonProcessingException {
      RunnerLocation runnerLocation = objectMapper.readValue(message, RunnerLocation.class);
      log.info("listen() : {}", runnerLocation);
  }

  @Override
  @KafkaListener(topics = "t_location", groupId = "spring-consumer-speed")
  public void listenSpeed(String message) throws JsonProcessingException {
      RunnerLocation runnerLocation = objectMapper.readValue(message, RunnerLocation.class);
      log.info("listenSpeed() : {}", runnerLocation);
  }
}

Podemos ver cómo se comportan ambos consumidores trabajan de forma independiente con el topic t_location:

root@0aa7930e1daa:/# kafka-consumer-groups.sh --describe --group default-spring-consumer --bootstrap-server localhost:9092

GROUP                   TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                                             HOST            CLIENT-ID
default-spring-consumer t_location      0          60              60              0               consumer-default-spring-consumer-1-b6ce931b /172.18.0.1     consumer-default-spring-consumer-1
root@0aa7930e1daa:/# kafka-consumer-groups.sh --describe --group spring-consumer-speed --bootstrap-server localhost:9092

GROUP                 TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                                           HOST            CLIENT-ID
spring-consumer-speed t_location      0          92              92              0               consumer-spring-consumer-speed-2-4265f788/172.18.0.1     consumer-spring-consumer-speed-2

Esta es una de las características de Kafka más interesantes y que lo diferencia de los sistemas de colas tradicionales. En un sistema tradicional típicamente cada mensaje sólo puede ser procesado por un consumidor y se elimina de la cola una vez procesado. En Apache Kafka los mensajes no desaparecen al ser consumidos sino que permanecen en el topic (por defecto una semana) y pueden ser procesados por distintos grupos de consumidores.  

Pruebas con jUnit5

Para hacer pruebas unitarias con JUnit5 + Mockito de las clases productoras y consumidoras en el ejemplo se aplicarían las mismas técnicas que en cualquier otra clase.

Además se pueden hacer pruebas de integración con JUnit5 + EmbebedKafka. Es una implementación en memoria de Kafka Brokers que nos proporciona Spring y se puede ejecutar sin necesidad de una instalación kafka real.

La configuración básica de un test sería:

@SpringBootTest
@DirtiesContext
@EmbeddedKafka(partitions = 1, topics = {"${kafka-producer.topics.location}"},
      brokerProperties = { "listeners=PLAINTEXT://localhost:9092", "port=9092" })
public class RunnerLocationProducerIT {

  @Autowired
  private EmbeddedKafkaBroker embeddedKafka;
}

Donde tenemos las anotaciones:

Para probar el productor de mensajes desarrollados generamos un mensaje con el objeto productor RunnerLocationProducer y configuramos un consumidor genérico apoyándonos en las utilidades de EmbebedKafkaBroker. Estará vinculado al topic que estemos probando y esperando mensajes. Usamos KafkaTestUtils.getSingleRecord(consumer, topic) de Spring para recuperar el mensaje generado y comprobamos que tiene los valores que esperamos.

@BeforeEach
void setUp() {
  ConsumerFactory<String, String> cf = new DefaultKafkaConsumerFactory<>(
          KafkaTestUtils.consumerProps("my-test-groupId", "true", embeddedKafka),
  StringDeserializer::new, StringDeserializer::new);
  consumer = cf.createConsumer();
  embeddedKafka.consumeFromEmbeddedTopics(consumer, topic);
}
@Test
void when_publishRunnerLocation_expectConsumedRunnerLocation() throws JsonProcessingException {
  RunnerLocation runnerLocation = new RunnerLocation(NAME, LATITUDE, LONGITUDE);

  runnerLocationProducer.send(runnerLocation);

  ConsumerRecord<String, String> record = KafkaTestUtils.getSingleRecord(consumer, topic);
  RunnerLocation consumed = objectMapper.readValue(record.value(), RunnerLocation.class);
  Assertions.assertEquals(NAME, consumed.getName());
  Assertions.assertEquals(LONGITUDE, consumed.getLongitude());
  Assertions.assertEquals(LATITUDE, consumed.getLatitude());

}

Para probar los consumidores utilizamos un productor genérico con el que generar un mensaje. Esperamos que el mensaje generado llegue a cada uno de los consumidores que hemos desarrollado (RunnerLocationConsumer.listen y RunnerLocationConsumer.listenSpeed). 

@SpringBootTest
@DirtiesContext
@EmbeddedKafka(partitions = 1, topics = {"t_location"},
       brokerProperties = { "listeners=PLAINTEXT://localhost:9092", "port=9092" })
public class RunnerLocationConsumerIT {
   @Autowired
   private EmbeddedKafkaBroker embeddedKafka;

   @Autowired
   private KafkaTemplate<String, String> kafkaTemplate;
   private ObjectMapper objectMapper = new ObjectMapper();

   @SpyBean
   private RunnerLocationConsumer runnerLocationConsumer;

   @Captor
   ArgumentCaptor<String> listenArgumentCaptor;

   @Captor
   ArgumentCaptor<String> listenSpeedArgumentCaptor;

   @Test
   void when_receiveMessage_expectBothListenerListen() throws JsonProcessingException, InterruptedException {
       String valueString = objectMapper.writeValueAsString(new RunnerLocation("Jose", 8L, 2l));
       Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
       KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(senderProps);
       kafkaProducer.send(new ProducerRecord<>("t_location", valueString));

       waitAtMost(5, TimeUnit.SECONDS).untilAsserted(() -> {
           verify(runnerLocationConsumer).listen(listenSpeedArgumentCaptor.capture());
           verify(runnerLocationConsumer).listenSpeed(listenSpeedArgumentCaptor.capture());
       });
   }

}

@SpyBean: Sobre el consumer que queremos probar.

@Captor: Objetos que mockito utilizará para capturar los argumentos de los métodos de cada listener

Hacer la prueba sobre el consumidor es complejo. Hay que tener en cuenta que el envío de mensaje es asíncrono, así que si escribimos el test sin tener en cuenta esto se ejecutará antes de que el mensaje llegue al consumidor. Por ello utilizamos waitAtMost, de la librería Awaitility que facilita el testing de aplicaciones asíncronas. Esta función espera hasta que se cumpla que se haya llamado cada uno de los listeners definidos. Establecemos un timeout de espera de 5 segundos para que en caso de fallo el test termine.

Conclusiones

Hemos visto una forma estandarizada para empezar a trabajar con Apache Kafka. Utilizar contenedores Docker hace muy sencilla y portable la instalación y configuración inicial. Además, a la hora de desarrollar Spring Boot facilita el trabajo gracias a la autoconfiguración que proporciona para Kafka. 

Salir de la versión móvil