Spring Integration & Kafka & Kafka Rest
Spring Integration extends Spring with support for Enterprise Integration Patterns (EIP), a set of good practices for designing systems built from distributed components. In this article you will learn:
- how to run Kafka in a Docker container using a docker-compose.yml file,
- how to use Spring Integration to communicate with Kafka,
- how to send messages to Kafka with Kafka Rest and browse them in Kafdrop.
Let's start with the docker-compose.yml file:
version: '3'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    networks:
      - javaleader-network
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
  kafka:
    image: confluentinc/cp-kafka:latest
    networks:
      - javaleader-network
    depends_on:
      - zookeeper
    ports:
      - 9092:9092
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,EXTERNAL://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,EXTERNAL:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
  kafdrop:
    image: obsidiandynamics/kafdrop:latest
    networks:
      - javaleader-network
    depends_on:
      - kafka
    ports:
      - 19000:9000
    environment:
      KAFKA_BROKERCONNECT: kafka:29092
  schema-registry:
    networks:
      - javaleader-network
    image: confluentinc/cp-schema-registry:4.1.1
    hostname: schema-registry
    ports:
      - "38081:38081"
    depends_on:
      - kafka
    environment:
      SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: zookeeper:2181
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_LISTENERS: http://schema-registry:38081
      SCHEMA_REGISTRY_DEBUG: "true"
  kafka-rest:
    image: confluentinc/cp-kafka-rest:4.1.1
    hostname: kafka-rest
    networks:
      - javaleader-network
    ports:
      - "8082:8082"
    depends_on:
      - schema-registry
    environment:
      KAFKA_REST_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_REST_SCHEMA_REGISTRY_URL: schema-registry:38081
      KAFKA_REST_HOST_NAME: kafka-rest
      KAFKA_REST_LISTENERS: http://kafka-rest:8082
networks:
  javaleader-network:
    driver: bridge
    external: true
    name: global
All services join the same virtual network, which lets the created containers communicate with each other.
Start the services defined in docker-compose.yml:
docker-compose up -d
Output:
Creating desktop_zookeeper_1 ... done
Creating desktop_kafka_1 ... done
Creating desktop_schema-registry_1 ... done
Creating desktop_kafdrop_1 ... done
Creating desktop_kafka-rest_1 ... done
Tool addresses:
- Kafdrop: http://localhost:19000/
The Spring Boot application:
- Let's start with the pom.xml file and the required dependencies:
<dependencies>
    <dependency>
        <groupId>com.fasterxml.jackson.dataformat</groupId>
        <artifactId>jackson-dataformat-xml</artifactId>
        <version>2.8.5</version>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>1.0.0</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-integration</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.integration</groupId>
        <artifactId>spring-integration-kafka</artifactId>
        <version>2.3.0.RELEASE</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
        <version>1.3.2.RELEASE</version>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <optional>true</optional>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>
- Configuration class ProducerChannelConfig:
@Configuration
public class ProducerChannelConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Value("${spring.kafka.topic}")
    private String springIntegrationKafkaTopic;

    @Bean
    public DirectChannel producerChannel() {
        return new DirectChannel();
    }

    @Bean
    @ServiceActivator(inputChannel = "producerChannel")
    public MessageHandler kafkaMessageHandler() {
        KafkaProducerMessageHandler handler = new KafkaProducerMessageHandler(kafkaTemplate());
        handler.setMessageKeyExpression(new LiteralExpression("kafka-integration"));
        return handler;
    }

    @Bean
    public KafkaTemplate kafkaTemplate() {
        return new KafkaTemplate(producerFactory());
    }

    @Bean
    public ProducerFactory producerFactory() {
        return new DefaultKafkaProducerFactory(producerConfigs());
    }

    @Bean
    public Map producerConfigs() {
        Map properties = new HashMap();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        properties.put(ProducerConfig.LINGER_MS_CONFIG, 1);
        return properties;
    }
}
Because:
com.fasterxml.jackson.databind.JsonSerializer
does not implement the interface:
org.apache.kafka.common.serialization.Serializer
you need to use:
import org.springframework.kafka.support.serializer.JsonSerializer;
otherwise you will get the error "JsonSerializer null InstantiationException".
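One plausible reading of that error: Jackson's JsonSerializer is an abstract class, and a class configured by name has to be instantiated reflectively. The sketch below reproduces that situation in plain Java. It is a simplified model, not Kafka's actual code; the nested Serializer interface and both serializer classes are hypothetical stand-ins.

```java
import java.nio.charset.StandardCharsets;

// Simplified, hypothetical model of creating a serializer from a configured class.
class SerializerInstantiationSketch {

    /** Stand-in for org.apache.kafka.common.serialization.Serializer. */
    interface Serializer<T> {
        byte[] serialize(String topic, T data);
    }

    /** Stand-in for Jackson's JsonSerializer: abstract and unrelated to Serializer. */
    abstract static class AbstractJsonSerializer {
    }

    /** Stand-in for Spring Kafka's JsonSerializer: concrete and implements Serializer. */
    static class KafkaJsonSerializer implements Serializer<Object> {
        public KafkaJsonSerializer() {
        }

        @Override
        public byte[] serialize(String topic, Object data) {
            // Not real JSON; just enough to make the class usable.
            return String.valueOf(data).getBytes(StandardCharsets.UTF_8);
        }
    }

    /** Instantiates the configured class reflectively and reports what happened. */
    static String tryInstantiate(Class<?> configuredClass) {
        try {
            Object instance = configuredClass.getDeclaredConstructor().newInstance();
            return instance instanceof Serializer ? "OK" : "not a Serializer";
        } catch (InstantiationException e) {
            // Thrown when the class is abstract and therefore cannot be created.
            return "InstantiationException";
        } catch (ReflectiveOperationException e) {
            return e.getClass().getSimpleName();
        }
    }

    public static void main(String[] args) {
        System.out.println(tryInstantiate(AbstractJsonSerializer.class));
        System.out.println(tryInstantiate(KafkaJsonSerializer.class));
    }
}
```

In this model the abstract stand-in fails with InstantiationException, while the concrete class that implements the interface is created without problems.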
- Configuration class ConsumerChannelConfig:
@Configuration
public class ConsumerChannelConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Value("${spring.kafka.topic}")
    private String springIntegrationKafkaTopic;

    @Bean
    public PollableChannel consumerChannel() {
        return new QueueChannel();
    }

    @Bean
    public ConsumerFactory consumerFactory() {
        return new DefaultKafkaConsumerFactory(consumerConfigs());
    }

    @Bean
    public Map consumerConfigs() {
        Map properties = new HashMap();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return properties;
    }
}
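Both configuration classes read spring.kafka.bootstrap-servers and spring.kafka.topic through @Value, so these properties have to be defined somewhere, for example in application.properties. The article does not show that file; the values below are assumptions derived from the rest of the setup (the EXTERNAL listener advertised as localhost:9092 and the "items" topic used by the service):

```properties
# Assumed values, not taken from the original project
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.topic=items
```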
- Sample transport object Item:
@Getter
@Setter
@NoArgsConstructor
@ToString
public class Item {

    private String name;

    public Item(String name) {
        this.name = name;
    }
}
- Service class KafkaService:
@Service
public class KafkaService {

    @Autowired
    ConfigurableApplicationContext context;

    @Autowired
    private IntegrationFlowContext flowContext;

    @Autowired
    private KafkaProperties kafkaProperties;

    @Autowired
    PollableChannel consumerChannel;

    public void createOnKafkaSampleItems(int itemNumber) {
        System.out.println("[START] create on kafka sample message");
        MessageChannel producerChannel = context.getBean("producerChannel", MessageChannel.class);
        List<Item> items = new ArrayList();
        items.add(new Item("item" + itemNumber));
        items.forEach(item -> {
            producerChannel.send(new GenericMessage(item, Collections.singletonMap(KafkaHeaders.TOPIC, "items")));
        });
        System.out.println("[STOP] create on kafka sample message");
    }

    public void addAnotherListenerForTopics(String... topics) {
        Map consumerProperties = kafkaProperties.buildConsumerProperties();
        consumerProperties.put("group.id", "dummy");
        IntegrationFlowBuilder flow = IntegrationFlows
                .from(Kafka.messageDrivenChannelAdapter(new DefaultKafkaConsumerFactory(consumerProperties), topics))
                .publishSubscribeChannel(subscription -> subscription
                        .subscribe(subflow -> subflow
                                .transform(fromJson(Item.class, new Jackson2JsonObjectMapper()))
                                .transform(x -> Item.class.cast(x).getName() + "[flow1]")
                                .channel("consumerChannel"))
                        .subscribe(subflow -> subflow
                                .transform(fromJson(Item.class, new Jackson2JsonObjectMapper()))
                                .transform(x -> Item.class.cast(x).getName() + "[flow2]")
                                .channel("consumerChannel"))
                        .subscribe(subflow -> subflow
                                .transform(fromJson(Item.class, new Jackson2JsonObjectMapper()))
                                .transform(x -> Item.class.cast(x).getName() + "[flow3]")
                                .channel("consumerChannel")));
        this.flowContext.registration(flow.get()).register();
    }

    public void consumerMessage() {
        addAnotherListenerForTopics("items");
        PollableChannel consumerChannel = context.getBean("consumerChannel", PollableChannel.class);
        Message<?> received = consumerChannel.receive();
        while (received != null) {
            System.out.println("Received " + received.getPayload());
            received = consumerChannel.receive();
        }
    }
}
- Application entry point SpringIntegrationKafkaIntroApplication:
@SpringBootApplication
public class SpringIntegrationKafkaIntroApplication implements CommandLineRunner {

    @Autowired
    KafkaService kafkaService;

    public static void main(String[] args) {
        SpringApplication.run(SpringIntegrationKafkaIntroApplication.class, args);
    }

    @Override
    public void run(String... strings) throws Exception {
        Runnable createOnKafkaSampleItemsTask = () -> {
            try {
                sleep(10000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            for (int i = 0; i < 5; i++) {
                kafkaService.createOnKafkaSampleItems(i);
            }
        };
        new Thread(createOnKafkaSampleItemsTask).start();
        kafkaService.consumerMessage();
    }
}
When the application starts, it launches a thread that publishes new messages to the topic after 10 seconds:
Offset: 0 Key: kafka-integration Timestamp: 2022-02-23 18:19:30.526 Headers: empty
{"name":"item0"}
Offset: 1 Key: kafka-integration Timestamp: 2022-02-23 18:19:30.553 Headers: empty
{"name":"item1"}
Offset: 2 Key: kafka-integration Timestamp: 2022-02-23 18:19:30.554 Headers: empty
{"name":"item2"}
Offset: 3 Key: kafka-integration Timestamp: 2022-02-23 18:19:30.555 Headers: empty
{"name":"item3"}
Offset: 4 Key: kafka-integration Timestamp: 2022-02-23 18:19:30.559 Headers: empty
{"name":"item4"}
Before these messages are published, the message consumer is started and listens for new events. The construct:
.publishSubscribeChannel(subscription -> subscription
        .subscribe(subflow -> subflow
                .transform(fromJson(Item.class, new Jackson2JsonObjectMapper()))
                .transform(x -> Item.class.cast(x).getName() + "[flow1]")
                .channel("consumerChannel"))
        .subscribe(subflow -> subflow
                .transform(fromJson(Item.class, new Jackson2JsonObjectMapper()))
                .transform(x -> Item.class.cast(x).getName() + "[flow2]")
                .channel("consumerChannel"))
        .subscribe(subflow -> subflow
                .transform(fromJson(Item.class, new Jackson2JsonObjectMapper()))
                .transform(x -> Item.class.cast(x).getName() + "[flow3]")
                .channel("consumerChannel")));
lets every message from the Kafka broker be processed by three independent flows:
Received item0[flow3]
Received item0[flow2]
Received item0[flow1]
Received item1[flow3]
Received item1[flow2]
Received item1[flow1]
Received item2[flow3]
Received item2[flow2]
Received item2[flow1]
Received item3[flow3]
Received item3[flow2]
Received item3[flow1]
Received item4[flow3]
Received item4[flow2]
Received item4[flow1]
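The fan-out idea behind this construct can be sketched without Spring: one incoming item is handed to three subscribers, each transforms it and writes the result to a shared queue that plays the role of consumerChannel. This is only a plain-Java model; the class and method names are hypothetical, and the subscribers here run one after another in a fixed order, which need not match the order seen in the log above. Note also that the service above calls receive() with no arguments, which blocks until a message arrives, so its loop never ends on its own; the sketch uses a timeout instead.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

// Plain-Java model of the publish-subscribe fan-out; it does not use Spring Integration.
class PublishSubscribeSketch {

    // Plays the role of the "consumerChannel" queue.
    private final BlockingQueue<String> consumerChannel = new LinkedBlockingQueue<>();

    // One transformer per subscriber, mirroring the [flow1], [flow2], [flow3] suffixes.
    private final List<Function<String, String>> subscribers = List.of(
            name -> name + "[flow1]",
            name -> name + "[flow2]",
            name -> name + "[flow3]");

    /** Hands the same item name to every subscriber, one after another. */
    void publish(String itemName) {
        for (Function<String, String> subscriber : subscribers) {
            consumerChannel.add(subscriber.apply(itemName));
        }
    }

    /** Reads messages until none arrives within the timeout, then returns them. */
    List<String> drain(long timeoutMillis) {
        List<String> received = new ArrayList<>();
        try {
            String message;
            while ((message = consumerChannel.poll(timeoutMillis, TimeUnit.MILLISECONDS)) != null) {
                received.add(message);
            }
        } catch (InterruptedException e) {
            // Restore the interrupt flag and return what was read so far.
            Thread.currentThread().interrupt();
        }
        return received;
    }

    public static void main(String[] args) {
        PublishSubscribeSketch sketch = new PublishSubscribeSketch();
        for (int i = 0; i < 2; i++) {
            sketch.publish("item" + i);
        }
        for (String message : sketch.drain(100)) {
            System.out.println("Received " + message);
        }
    }
}
```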
Now let's send a message using Kafka Rest:
curl -X POST -H "Content-Type: application/vnd.kafka.json.v2+json" --data '{"records":[{"value":{"name": "item5"}}]}' "http://localhost:8082/topics/items"
The message is correctly consumed by each of the independent flows:
Received item5[flow3]
Received item5[flow2]
Received item5[flow1]
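The same Kafka Rest request can also be sent from Java. The sketch below builds an equivalent POST with the JDK's java.net.http client (Java 11 or newer). The class and method names are hypothetical, the item name is not JSON-escaped, and the request is only sent when the program is started with the --send argument, so that it can be inspected without a running Kafka Rest instance.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

// Hypothetical helper that mirrors the curl call shown earlier.
class KafkaRestProducerSketch {

    static final String CONTENT_TYPE = "application/vnd.kafka.json.v2+json";

    /** Builds the JSON body with a single record; itemName is assumed to need no escaping. */
    static String recordsBody(String itemName) {
        return "{\"records\":[{\"value\":{\"name\":\"" + itemName + "\"}}]}";
    }

    /** Builds a POST to {baseUrl}/topics/{topic} with the Kafka Rest JSON content type. */
    static HttpRequest buildRequest(String baseUrl, String topic, String itemName) {
        return HttpRequest.newBuilder(URI.create(baseUrl + "/topics/" + topic))
                .header("Content-Type", CONTENT_TYPE)
                .POST(HttpRequest.BodyPublishers.ofString(recordsBody(itemName)))
                .build();
    }

    public static void main(String[] args) throws Exception {
        HttpRequest request = buildRequest("http://localhost:8082", "items", "item5");
        System.out.println(request.method() + " " + request.uri());
        System.out.println(recordsBody("item5"));

        // Only contact Kafka Rest when explicitly asked to.
        if (args.length > 0 && args[0].equals("--send")) {
            HttpResponse<String> response = HttpClient.newHttpClient()
                    .send(request, HttpResponse.BodyHandlers.ofString());
            System.out.println(response.statusCode() + " " + response.body());
        }
    }
}
```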