Komponent integracyjny Kafka Connect

Komponent integracyjny Kafka Connect

Kafka Connect jest częścią platformy Apache Kafka i służy do integracji brokera z zewnętrznymi serwisami np. z bazami danych. Przykładem takiej integracji może być propagowanie zdarzenia systemowego do brokera Kafki kiedy nowy wpis trafia do bazy danych. Zdarzenie to następnie zapisywane jest w Elastic Search celem dalszego przetwarzania.

W tym wpisie pokażę Ci w jaki sposób zintegrować brokera Kafki z bazą danych MySQL. Pierwszą rzeczą jaką należy zrobić to instalacja Confluenta czyli narzędzia będącego częścią projektu Apache Kafka które posłuży nam do integracji. Przykład zostanie zaprezentowany z użyciem Ubuntu 18.0.4:

Instalacja Confluent:

wget -qO – http://packages.confluent.io/deb/3.3/archive.key | sudo apt-key add –
sudo add-apt-repository "deb [arch=amd64] http://packages.confluent.io/deb/3.3 stable main"
sudo apt-get install confluent-platform-oss-2.11

Instalujemy bazę danych MySQL i tworzymy przykładową tabelę:

sudo apt install mysql-server

Warto zmienić hasło:

ALTER USER 'root'@'localhost' IDENTIFIED WITH mysql_native_password BY 'password';

Tworzymy tabelę:

CREATE TABLE Persons(
    id INT(6) UNSIGNED AUTO_INCREMENT PRIMARY KEY,
    firstname VARCHAR(30) NOT NULL
);

Pobieramy connector JDBC (adekwatny do zainstalowanej wersji bazy danych):

mysql-connector-java-5.1.42-bin i kopiujemy go do folderu:

/usr/share/java/kafka-connect-jdbc

Tworzymy plik odpowiadający za konfigurację dostępu do bazy danych:

/etc/kafka-connect-jdbc/source-quickstart-mysql.properties<!--?prettify linenums=true?-->
name=test-source-mysql-jdbc-autoincrement
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=1
connection.url=jdbc:mysql://127.0.0.1:3306/person?user=username&password=password&useSSL=false
mode=incrementing
incrementing.column.name=id
topic.prefix=persons-jdbc-

Uruchamiamy Zookeeper:

[uwaga] zanim zostanie Zookeeper uruchomiony należy zainstalować Jave:

sudo apt install default-jdk

Na dzień pisania tego wpisu rekomendowaną wersją jest Java 8:

confluent start schema-registry

wynik:

Starting zookeeper
zookeeper is [UP]
Starting kafka
kafka is [UP]
Starting schema-registry
schema-registry is [UP]

Uruchamiamy Kafka Connect:

connect-standalone /etc/schema-registry/connect-avro-standalone.properties /usr/share/java/kafka-connect-jdbc/source-quickstart-mysql.properties

w przypadku błędu należy ustawić dostęp do bazy z „zewnątrz”:

/etc/mysql/mysql.conf.d/mysqld.cnf

gdzie:

#bind-address = 127.0.0.1

oraz weryfikować czy link JDBC posiada:

&useSSL=false

Uruchamiamy Consumera Apache Kafka:

kafka-avro-console-consumer –topic persons-jdbc-Persons -zookeeper localhost:2181 -from-beginning --whitelist persons-jdbc-Persons

Obserwacja publikowanych zdarzeń:

Po dodaniu nowego wpisu do bazy danych:

insert into Persons value(1,'test');

otrzymamy na konsoli wynik publikowanego zdarzenia:

Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper].
{"id":1,"firstname":"test"}

Leave a comment

Your email address will not be published.


*