Apache Kafka – szybki start do strumieniowego przetwarzania danych
Apache Kafka – szybki start do strumieniowego przetwarzania danych
Apache Kafka to narzędzie do propagacji zdarzeń (z ang. events) w systemach rozproszonych. Projekt powstał w firmie LinkedIn, upubliczniono go w roku 2011 (dla porównania RabbitMQ dostępny jest od roku 2007) dziś zarządzany jest przez fundację Apache. Kafka świetnie sprawdzi się do obsługi ogromnych ilości komunikatów i jednoczesnych połączeń klienckich.
Podstawowa różnica między Apache Kafka i RabbitMQ:
W przypadku rozwiązania Apache Kafka kolejka Kafki jest trwała. Oznacza to, że dane wysyłane do Kafki są przechowywane do momentu upłynięcia określonego czasu lub osiągnięcia określonego limitu. Dopóki nie wydarzy się jedna z tych dwóch rzeczy, wiadomość pozostaje w kolejce nawet po jej zużyciu. Wiadomości mogą być odtwarzane lub konsumowane wiele razy. RabbitMQ przechowuje z kolei wiadomości dopóki aplikacja odbierająca nie połączy się z kolejką i nie odbierze jej. Klient może potwierdzić komunikat w tym momencie lub po zakończeniu jego przetwarzania. W obu tych przypadkach, gdy wiadomość zostanie potwierdzona przez konsumenta, zniknie z kolejki. RabbitMQ sprawdzi się doskonale kiedy odbiorca jest precyzyjnie zdefiniowany i wysyłamy tzw. komendy. Kafka natomiast sprawdzi się kiedy obsługujemy zdarzenia (z ang. events) gdzie odbiorca nie jest precyzyjnie zdefiniowany. Przejdźmy do praktyki!
Tworzymy nowy projekt Spring Boot 2.2.5 – plik pom.xml – zależność do Apache Kafka:
<dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> <version>2.4.4.RELEASE</version> </dependency>
Konfiguracja Apache Kafka i Kafka Managera:
Pobieramy projekt Apache Kafka:
https://www.apache.org/dyn/closer.cgi?path=/kafka/2.4.1/kafka_2.12-2.4.1.tgz
modyfikujemy plik:
config\server.properties
dodajemy linię która wskazuje na lokalnego hosta:
#listeners=PLAINTEXT://:9092 listeners=PLAINTEXT://127.0.0.1:9092
wykonujemy komendę (system windows):
kafka\bin\windows>zookeeper-server-start.bat ..\..\config\zookeeper.properties
Zookeper to usługa, która służy do koordynacji rozproszonych aplikacji. Aby uruchomić Kafkę, konieczna jest działająca instancja Zookepera.
wykonujemy teraz komendę (system windows):
kafka\bin\windows>kafka-server-start.bat ..\..\config\server.properties
Domyślnie Apache Kafka startuje na porcie 9092:
http://localhost:9092
Kafka nie posiada domyślnie graficznej konsoli tak jak RabbitMQ, instalujemy zatem Kafka-Manager:
1. Pobieramy i instalujemy ze strony https://www.scala-lang.org/download/ plik scala-2.13.1.msi:
weryfikujemy wersję:
scala -version
wynik:
Scala code runner version 2.13.1 -- Copyright 2002-2019, LAMP/EPFL and Lightbend, Inc. [info] 1.2.
2. Pobieramy Kafka Managera ze strony https://github.com/yahoo/CMAK
Sprawdzamy wersję sbt w pliku :
\CMAK-3.0.0.4\CMAK-3.0.0.4\project\build.properties
zawartość pliku build.properties:
sbt.version=1.3.8
3. Pobieramy i instalujemy ze strony https://www.scala-sbt.org/download.html plik sbt-1.3.8 (wersja adekwatna do tej podanej w pliku build.properties):
W terminalu wydajemy polecenie:
sbt
następnie wpisujemy:
continue
weryfikujemy wersje:
sbt:system-tools> sbtVersion
wynik:
[info] sbt server started at local:sbt-server-379338fe84024462dacd sbt:mwarycha> sbtVersion [info] 1.3.8 sbt:mwarycha>
4. Budowanie paczki narzędziem sbt:
CMAK-3.0.0.4\CMAK-3.0.0.4>sbt clean dist
w katalogu:
CMAK-3.0.0.4\CMAK-3.0.0.4\target\universal
zbudowana została paczka:
cmak-3.0.0.4.rar
5. Rozpakowujemy plik cmak-3.0.0.4.rar:
[uwaga] ze względu na błąd „The input line is too long” zawartość pliku powinna być rozpakowana bezpośrednio na danej partycji.
W pliku:
./conf/application.conf
modyfikujemy:
kafka-manager.zkhosts="127.0.0.1:2181"
oraz:
cmak.zkhosts="127.0.0.1:2181"
zmiany te dotyczą portu i hosta, na którym działa zookeeper.
6. Start Kafka Managera:
\bin\cmak.bat
wynik – udało się!
Aplikacja Spring Boot
Klasa konfiguracyjna – dodajemy Topic (parametr numPartitions – ilość partycji przydzielonych do danego tematu, parametr replicationFactor odpowiada ile razy Topic ma zostać zreplikowany) :
@Configuration public class Config { @Bean public NewTopic adviceTopic() { short replicationFactor = 1; int numPartitions = 3; return new NewTopic("users", numPartitions, replicationFactor); } }
Dodajemy RestControler:
@RestController public class KafkaController { private final Producer producer; public KafkaController(Producer producer) { this.producer = producer; } @PostMapping(value = "/publish") public void sendMessageToKafkaTopic(@RequestParam("message") String message){ this.producer.sendMessage(message); } }
Klasa producenta:
@Service public class Producer { private static final Logger logger = LoggerFactory.getLogger(Producer.class); private static final String TOPIC = "users"; @Autowired private KafkaTemplate<String,String> kafkaTemplate; public void sendMessage(String message){ logger.info(String.format("$$ -> Producing message --> %s",message)); this.kafkaTemplate.send(TOPIC,message); } }
Klasa konsumenta:
@Service public class Consumer { private final Logger logger = LoggerFactory.getLogger(Consumer.class); @KafkaListener(topics = "users", groupId = "34") public void consume(String message){ logger.info(String.format("$$ -> Consumed Message -> %s",message)); } }
Plik application.yml:
server: port: 8080 spring: kafka: consumer: bootstrap-servers: localhost:9092 group-id:34 auto-offset-reset: earliest key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: org.apache.kafka.common.serialization.StringDeserializer producer: bootstrap-servers: localhost:9092 key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
wysyłamy wiadomość – request metodą POST:
http://localhost:8080/publish?message=JavaLeader.pl
wynik na konsoli:
INFO p.j.kafkaquickstart.service.Producer : $$ -> Producing message --> JavaLeader.pl INFO p.j.kafkaquickstart.service.Consumer : $$ -> Consumed Message -> JavaLeader.pl
Troubleshooting
Kolejność uruchomienia:
1. Usunięcie logów katalog -> C:\tmp (kafka-logs oraz zookeeper)
2. zookeeper-server-start.bat (Kafka)
3. kafka-server-start.bat (Zookeeper)
4. cmak.bat (Kafka Manager)
Tutaj wkradł się chyba błąd. Czy nie powinien być tu serializer?
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
Dzięki Krystian!
CMAK nie trzeba kompilowac. Po prostu pobrac i sa tam juz *.bat i *.sh
Dzięki!