Apache Kafka – szybki start do strumieniowego przetwarzania danych

Apache Kafka – szybki start do strumieniowego przetwarzania danych

Apache Kafka to narzędzie do propagacji zdarzeń (z ang. events) w systemach rozproszonych. Projekt powstał w firmie LinkedIn, upubliczniono go w roku 2011 (dla porównania RabbitMQ dostępny jest od roku 2007) dziś zarządzany jest przez fundację Apache. Kafka świetnie sprawdzi się do obsługi ogromnych ilości komunikatów i jednoczesnych połączeń klienckich.

Podstawowa różnica między Apache Kafka i RabbitMQ:

W przypadku rozwiązania Apache Kafka kolejka Kafki jest trwała. Oznacza to, że dane wysyłane do Kafki są przechowywane do momentu upłynięcia określonego czasu lub osiągnięcia określonego limitu. Dopóki nie wydarzy się jedna z tych dwóch rzeczy, wiadomość pozostaje w kolejce nawet po jej zużyciu. Wiadomości mogą być odtwarzane lub konsumowane wiele razy. RabbitMQ przechowuje z kolei wiadomości dopóki aplikacja odbierająca nie połączy się z kolejką i nie odbierze jej. Klient może potwierdzić komunikat w tym momencie lub po zakończeniu jego przetwarzania. W obu tych przypadkach, gdy wiadomość zostanie potwierdzona przez konsumenta, zniknie z kolejki. RabbitMQ sprawdzi się doskonale kiedy odbiorca jest precyzyjnie zdefiniowany i wysyłamy tzw. komendy. Kafka natomiast sprawdzi się kiedy obsługujemy zdarzenia (z ang. events) gdzie odbiorca nie jest precyzyjnie zdefiniowany. Przejdźmy do praktyki!

Tworzymy nowy projekt Spring Boot 2.2.5 – plik pom.xml – zależność do Apache Kafka:

<dependency>
	<groupId>org.springframework.kafka</groupId>
	<artifactId>spring-kafka</artifactId>
	<version>2.4.4.RELEASE</version>
</dependency>

Konfiguracja Apache Kafka i Kafka Managera:

Pobieramy projekt Apache Kafka:

https://www.apache.org/dyn/closer.cgi?path=/kafka/2.4.1/kafka_2.12-2.4.1.tgz

modyfikujemy plik:

config\server.properties

dodajemy linię która wskazuje na lokalnego hosta:

#listeners=PLAINTEXT://:9092
listeners=PLAINTEXT://127.0.0.1:9092

wykonujemy komendę (system windows):

kafka\bin\windows>zookeeper-server-start.bat ..\..\config\zookeeper.properties

Zookeper to usługa, która służy do koordynacji rozproszonych aplikacji. Aby uruchomić Kafkę, konieczna jest działająca instancja Zookepera.

wykonujemy teraz komendę (system windows):

kafka\bin\windows>kafka-server-start.bat ..\..\config\server.properties

Domyślnie Apache Kafka startuje na porcie 9092:

http://localhost:9092

Kafka nie posiada domyślnie graficznej konsoli tak jak RabbitMQ, instalujemy zatem Kafka-Manager:

1. Pobieramy i instalujemy ze strony https://www.scala-lang.org/download/ plik scala-2.13.1.msi:

weryfikujemy wersję:

scala -version

wynik:

Scala code runner version 2.13.1 -- Copyright 2002-2019, LAMP/EPFL and Lightbend, Inc.
[info] 1.2.

2. Pobieramy Kafka Managera ze strony https://github.com/yahoo/CMAK

Sprawdzamy wersję sbt w pliku :

\CMAK-3.0.0.4\CMAK-3.0.0.4\project\build.properties

zawartość pliku build.properties:

sbt.version=1.3.8

3. Pobieramy i instalujemy ze strony https://www.scala-sbt.org/download.html plik sbt-1.3.8 (wersja adekwatna do tej podanej w pliku build.properties):

W terminalu wydajemy polecenie:

sbt

następnie wpisujemy:

continue

weryfikujemy wersje:

sbt:system-tools> sbtVersion

wynik:

[info] sbt server started at local:sbt-server-379338fe84024462dacd
sbt:mwarycha> sbtVersion
[info] 1.3.8
sbt:mwarycha>

4. Budowanie paczki narzędziem sbt:

CMAK-3.0.0.4\CMAK-3.0.0.4>sbt clean dist

w katalogu:

CMAK-3.0.0.4\CMAK-3.0.0.4\target\universal

zbudowana została paczka:

cmak-3.0.0.4.rar

5. Rozpakowujemy plik cmak-3.0.0.4.rar:

[uwaga] ze względu na błąd „The input line is too long” zawartość pliku powinna być rozpakowana bezpośrednio na danej partycji.

W pliku:

./conf/application.conf

modyfikujemy:

kafka-manager.zkhosts="127.0.0.1:2181"

oraz:

cmak.zkhosts="127.0.0.1:2181"

zmiany te dotyczą portu i hosta, na którym działa zookeeper.

6. Start Kafka Managera:

\bin\cmak.bat

wynik – udało się!

Aplikacja Spring Boot

Klasa konfiguracyjna – dodajemy Topic (parametr numPartitions – ilość partycji przydzielonych do danego tematu, parametr replicationFactor odpowiada ile razy Topic ma zostać zreplikowany) :

@Configuration
public class Config {
    @Bean
    public NewTopic adviceTopic() {
        short replicationFactor = 1;
        int numPartitions       = 3;
        return new NewTopic("users", numPartitions, replicationFactor);
    }
}

Dodajemy RestControler:

@RestController
public class KafkaController {
 
    private final Producer producer;
 
    public KafkaController(Producer producer) {
        this.producer = producer;
    }
 
    @PostMapping(value = "/publish")
    public void sendMessageToKafkaTopic(@RequestParam("message") String message){
        this.producer.sendMessage(message);
    }
}

Klasa producenta:

@Service
public class Producer {
 
    private static final Logger logger = LoggerFactory.getLogger(Producer.class);
    private static final String TOPIC = "users";
 
    @Autowired
    private KafkaTemplate<String,String> kafkaTemplate;
 
    public void sendMessage(String message){
        logger.info(String.format("$$ -> Producing message --> %s",message));
        this.kafkaTemplate.send(TOPIC,message);
    }
}

Klasa konsumenta:

@Service
public class Consumer {
 
    private final Logger logger = LoggerFactory.getLogger(Consumer.class);
 
    @KafkaListener(topics = "users", groupId = "34")
    public void consume(String message){
        logger.info(String.format("$$ -> Consumed Message -> %s",message));
    }
}

Plik application.yml:

server:
  port: 8080
 
spring:
  kafka:
    consumer:
      bootstrap-servers: localhost:9092
      group-id:34
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
 
    producer:
      bootstrap-servers: localhost:9092
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

wysyłamy wiadomość – request metodą POST:

http://localhost:8080/publish?message=JavaLeader.pl

wynik na konsoli:

INFO p.j.kafkaquickstart.service.Producer     : $$ -> Producing message --> JavaLeader.pl
INFO p.j.kafkaquickstart.service.Consumer     : $$ -> Consumed Message -> JavaLeader.pl

Troubleshooting

Kolejność uruchomienia:

1. Usunięcie logów katalog -> C:\tmp (kafka-logs oraz zookeeper)

2. zookeeper-server-start.bat (Kafka)

3. kafka-server-start.bat (Zookeeper)

4. cmak.bat (Kafka Manager)

Zobacz kod na GitHubie i zapisz się na bezpłatny newsletter!

.

4 Comments

  1. Tutaj wkradł się chyba błąd. Czy nie powinien być tu serializer?
    key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

  2. marcin warycha - javaleader.pl 19 lipca 2020 at 10:42

    Dzięki Krystian!

  3. CMAK nie trzeba kompilowac. Po prostu pobrac i sa tam juz *.bat i *.sh

Leave a comment

Your email address will not be published.


*