Spring Integration & Kafka & Kafka Rest

Spring Integration is a solution that extends Spring with support for EIP (Enterprise Integration Patterns), i.e. mechanisms that describe good practices for designing systems composed of distributed components. In this article you will learn:

– how to run a Kafka broker in a Docker container using a docker-compose.yml file?

– how to use Spring Integration to communicate with Kafka?

– how to send messages to Kafka via Kafka REST and browse them in Kafdrop?

Let's start with the docker-compose.yml file:

version: '3.5'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    networks: 
      - javaleader-network
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
  kafka:
    image: confluentinc/cp-kafka:latest
    networks: 
      - javaleader-network
    depends_on:
      - zookeeper
    ports:
      - 9092:9092
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,EXTERNAL://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,EXTERNAL:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
  kafdrop:
    image: obsidiandynamics/kafdrop:latest
    networks: 
      - javaleader-network
    depends_on:
      - kafka
    ports:
      - 19000:9000
    environment:
      KAFKA_BROKERCONNECT: kafka:29092
  schema-registry:
    networks: 
      - javaleader-network
    image: confluentinc/cp-schema-registry:4.1.1
    hostname: schema-registry
    ports:
      - "38081:38081"
    depends_on:
      - kafka
    environment:
      SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: zookeeper:2181
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_LISTENERS: http://schema-registry:38081
      SCHEMA_REGISTRY_DEBUG: "true"
  kafka-rest:
    image: confluentinc/cp-kafka-rest:4.1.1
    hostname: kafka-rest
    networks: 
      - javaleader-network
    ports:
      - "8082:8082"
    depends_on:
      - schema-registry
    environment:
      KAFKA_REST_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_REST_SCHEMA_REGISTRY_URL: schema-registry:38081
      KAFKA_REST_HOST_NAME: kafka-rest
      KAFKA_REST_LISTENERS: http://kafka-rest:8082
networks:
  javaleader-network:
    external: true
    name: global

All services join the same virtual network, which lets the created containers talk to each other. Note that the network is declared as external, so it must exist before the stack is started (the driver setting is ignored for external networks, and the top-level name key requires compose file format 3.5, hence the version above).
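You can create the network once, up front, with a single Docker command (the name global matches the compose file):

docker network create global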

Start the stack defined in docker-compose.yml:

docker-compose up -d

output:

Creating desktop_zookeeper_1 ... done
Creating desktop_kafka_1     ... done
Creating desktop_schema-registry_1 ... done
Creating desktop_kafdrop_1         ... done
Creating desktop_kafka-rest_1      ... done

tool addresses (based on the port mappings above):

  • Kafdrop: http://localhost:19000/

  • Kafka REST: http://localhost:8082/

  • Schema Registry: http://localhost:38081/

The Spring Boot application:

  • Let's start with the pom.xml file – the required dependencies:
<dependencies>
	<dependency>
		<groupId>com.fasterxml.jackson.dataformat</groupId>
		<artifactId>jackson-dataformat-xml</artifactId>
		<version>2.8.5</version>
	</dependency>
	<dependency>
		<groupId>org.apache.kafka</groupId>
		<artifactId>kafka-clients</artifactId>
		<version>1.0.0</version>
	</dependency>
	<dependency>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-integration</artifactId>
	</dependency>
	<dependency>
		<groupId>org.springframework.integration</groupId>
		<artifactId>spring-integration-kafka</artifactId>
		<version>2.3.0.RELEASE</version>
	</dependency>
	<dependency>
		<groupId>org.springframework.kafka</groupId>
		<artifactId>spring-kafka</artifactId>
		<version>1.3.2.RELEASE</version>
	</dependency>
	<dependency>
		<groupId>org.projectlombok</groupId>
		<artifactId>lombok</artifactId>
		<optional>true</optional>
	</dependency>
	<dependency>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-test</artifactId>
		<scope>test</scope>
	</dependency>
</dependencies>
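The configuration classes below read spring.kafka.bootstrap-servers and spring.kafka.topic from the application properties. The original listing does not show this file, so treat the following as an assumed minimal application.properties, with values taken from the compose file and the topic name used later in KafkaService:

# assumed values: the broker is published on localhost:9092, the topic is "items"
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.topic=items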
  • Configuration class – ProducerChannelConfig:
@Configuration
public class ProducerChannelConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Value("${spring.kafka.topic}")
    private String springIntegrationKafkaTopic;

    // messages sent to this channel are picked up by the service activator below
    @Bean
    public DirectChannel producerChannel() {
        return new DirectChannel();
    }

    // forwards every message from producerChannel to Kafka; the target topic
    // is taken from the KafkaHeaders.TOPIC header set by the sender
    @Bean
    @ServiceActivator(inputChannel = "producerChannel")
    public MessageHandler kafkaMessageHandler() {
        KafkaProducerMessageHandler<String, Object> handler = new KafkaProducerMessageHandler<>(kafkaTemplate());
        handler.setMessageKeyExpression(new LiteralExpression("kafka-integration"));
        return handler;
    }

    @Bean
    public KafkaTemplate<String, Object> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    @Bean
    public ProducerFactory<String, Object> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> properties = new HashMap<>();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        // Spring Kafka's JsonSerializer, not Jackson's – see the note below
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        properties.put(ProducerConfig.LINGER_MS_CONFIG, 1);
        return properties;
    }

}

Note that:

com.fasterxml.jackson.databind.JsonSerializer

does not implement the Kafka interface:

org.apache.kafka.common.serialization.Serializer

so the import you need here is:

import org.springframework.kafka.support.serializer.JsonSerializer;

otherwise you will get the error "JsonSerializer null InstantiationException".
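A quick standalone check (a sketch, not part of the application) shows that this serializer produces exactly the JSON we will later see in Kafdrop:

JsonSerializer<Item> serializer = new JsonSerializer<>();
byte[] bytes = serializer.serialize("items", new Item("item0"));
System.out.println(new String(bytes)); // prints {"name":"item0"}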

  • Configuration class – ConsumerChannelConfig:
@Configuration
public class ConsumerChannelConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Value("${spring.kafka.topic}")
    private String springIntegrationKafkaTopic;

    // a buffering channel – received messages wait here until receive() is called
    @Bean
    public PollableChannel consumerChannel() {
        return new QueueChannel();
    }

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> properties = new HashMap<>();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return properties;
    }
}
  • A sample transport object – Item:
@Getter
@Setter
@NoArgsConstructor // Jackson needs the no-args constructor for the fromJson(...) transformer on the consumer side
@ToString
public class Item {
    private String name;
    public Item(String name) {
        this.name = name;
    }
}
  • The service class – KafkaService:
@Service
public class KafkaService {

    @Autowired
    ConfigurableApplicationContext context;

    @Autowired
    private IntegrationFlowContext flowContext;

    @Autowired
    private KafkaProperties kafkaProperties;

    @Autowired
    PollableChannel consumerChannel;

    public void createOnKafkaSampleItems(int itemNumber) {

        System.out.println("[START] create on kafka sample message");

        MessageChannel producerChannel = context.getBean("producerChannel", MessageChannel.class);

        List<Item> items = new ArrayList<>();

        items.add(new Item("item" + itemNumber));

        // the KafkaHeaders.TOPIC header tells KafkaProducerMessageHandler where to publish
        items.forEach(item -> {
            producerChannel.send(new GenericMessage<>(item, Collections.singletonMap(KafkaHeaders.TOPIC, "items")));
        });

        System.out.println("[STOP] create on kafka sample message");
    }

    public void addAnotherListenerForTopics(String... topics) {
        Map<String, Object> consumerProperties = kafkaProperties.buildConsumerProperties();
        consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, "dummy");

        // every message read from the topic is fanned out to three subflows;
        // each one deserializes the JSON payload and posts the result to the
        // pollable consumerChannel
        IntegrationFlowBuilder flow = IntegrationFlows
                .from(Kafka.messageDrivenChannelAdapter(new DefaultKafkaConsumerFactory<>(consumerProperties), topics))

                .publishSubscribeChannel(subscription ->
                        subscription

                                .subscribe(subflow -> subflow
                                        .transform(fromJson(Item.class, new Jackson2JsonObjectMapper()))
                                        .transform(x -> Item.class.cast(x).getName() + "[flow1]")
                                        .channel("consumerChannel"))

                                .subscribe(subflow -> subflow
                                        .transform(fromJson(Item.class, new Jackson2JsonObjectMapper()))
                                        .transform(x -> Item.class.cast(x).getName() + "[flow2]")
                                        .channel("consumerChannel"))

                                .subscribe(subflow -> subflow
                                        .transform(fromJson(Item.class, new Jackson2JsonObjectMapper()))
                                        .transform(x -> Item.class.cast(x).getName() + "[flow3]")
                                        .channel("consumerChannel")));

        this.flowContext.registration(flow.get()).register();
    }

    public void consumerMessage() {

        addAnotherListenerForTopics("items");

        PollableChannel consumerChannel = context.getBean("consumerChannel", PollableChannel.class);

        // receive() without a timeout blocks until the next message arrives,
        // so this loop keeps printing for as long as the application runs
        Message<?> received = consumerChannel.receive();

        while (received != null) {
            System.out.println("Received " + received.getPayload());
            received = consumerChannel.receive();
        }
    }
}
  • The main class – SpringIntegrationKafkaIntroApplication:
@SpringBootApplication
public class SpringIntegrationKafkaIntroApplication implements CommandLineRunner {

	@Autowired
	KafkaService kafkaService;

	public static void main(String[] args) {
		SpringApplication.run(SpringIntegrationKafkaIntroApplication.class, args);
	}

	@Override
	public void run(String... strings) throws Exception {

		// give the consumer flow a head start, then publish five sample items
		Runnable createOnKafkaSampleItemsTask = () -> {
			try {
				Thread.sleep(10000);
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
			for (int i = 0; i < 5; i++) {
				kafkaService.createOnKafkaSampleItems(i);
			}
		};

		new Thread(createOnKafkaSampleItemsTask).start();

		// blocks and prints everything that lands on consumerChannel
		kafkaService.consumerMessage();
	}
}

When the application starts, it spawns a thread which, after 10 seconds, publishes new messages to the items topic:

Offset: 0   Key: kafka-integration   Timestamp: 2022-02-23 18:19:30.526 Headers: empty
{"name":"item0"}
Offset: 1   Key: kafka-integration   Timestamp: 2022-02-23 18:19:30.553 Headers: empty
{"name":"item1"}
Offset: 2   Key: kafka-integration   Timestamp: 2022-02-23 18:19:30.554 Headers: empty
{"name":"item2"}
Offset: 3   Key: kafka-integration   Timestamp: 2022-02-23 18:19:30.555 Headers: empty
{"name":"item3"}
Offset: 4   Key: kafka-integration   Timestamp: 2022-02-23 18:19:30.559 Headers: empty
{"name":"item4"}

Before these messages are published, the message consumer is started and listens for new events. The construct:

.publishSubscribeChannel(subscription ->
	subscription
		.subscribe(subflow -> subflow
				.transform(fromJson(Item.class, new Jackson2JsonObjectMapper()))
				.transform(x -> Item.class.cast(x).getName() + "[flow1]")
				.channel("consumerChannel"))
 
		.subscribe(subflow -> subflow
				.transform(fromJson(Item.class, new Jackson2JsonObjectMapper()))
				.transform(x -> Item.class.cast(x).getName() + "[flow2]")
				.channel("consumerChannel"))
 
		.subscribe(subflow -> subflow
				.transform(fromJson(Item.class, new Jackson2JsonObjectMapper()))
				.transform(x -> Item.class.cast(x).getName() + "[flow3]")
				.channel("consumerChannel")));

fans every message from the Kafka broker out to three independent flows:

Received item0[flow3]
Received item0[flow2]
Received item0[flow1]
Received item1[flow3]
Received item1[flow2]
Received item1[flow1]
Received item2[flow3]
Received item2[flow2]
Received item2[flow1]
Received item3[flow3]
Received item3[flow2]
Received item3[flow1]
Received item4[flow3]
Received item4[flow2]
Received item4[flow1]
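Note that without an Executor a publish-subscribe channel delivers to its subscribers one after another on the consumer's thread, which is why each item shows up as three consecutive lines. If true parallelism is needed, the Java DSL also accepts an Executor; a minimal sketch, assuming the publishSubscribeChannel(Executor, ...) overload:

.publishSubscribeChannel(Executors.newCachedThreadPool(), subscription ->
	subscription
		.subscribe(subflow -> subflow
				.transform(fromJson(Item.class, new Jackson2JsonObjectMapper()))
				.transform(x -> Item.class.cast(x).getName() + "[flow1]")
				.channel("consumerChannel"))
		// ... [flow2] and [flow3] registered the same way
)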

Now let's send a message via Kafka REST:

curl -X POST -H "Content-Type: application/vnd.kafka.json.v2+json" --data '{"records":[{"value":{"name": "item5"}}]}' "http://localhost:8082/topics/items"

and you can see the message correctly consumed by each independent flow:

Received item5[flow3]
Received item5[flow2]
Received item5[flow1]
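Kafka REST can also read messages back. A sketch using the v2 consumer API (the group and instance names rest-group and rest-consumer are arbitrary examples):

# create a consumer instance that reads JSON from the beginning of the topic
curl -X POST -H "Content-Type: application/vnd.kafka.v2+json" \
  --data '{"name": "rest-consumer", "format": "json", "auto.offset.reset": "earliest"}' \
  "http://localhost:8082/consumers/rest-group"

# subscribe it to the items topic
curl -X POST -H "Content-Type: application/vnd.kafka.v2+json" \
  --data '{"topics": ["items"]}' \
  "http://localhost:8082/consumers/rest-group/instances/rest-consumer/subscription"

# fetch the records
curl -X GET -H "Accept: application/vnd.kafka.json.v2+json" \
  "http://localhost:8082/consumers/rest-group/instances/rest-consumer/records"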

See the code on GitHub and sign up for the free newsletter!
