Komponent integracyjny Kafka Connect
Komponent integracyjny Kafka Connect
Kafka Connect jest częścią platformy Apache Kafka i służy do integracji brokera z zewnętrznymi serwisami np. z bazami danych. Przykładem takiej integracji może być propagowanie zdarzenia systemowego do brokera Kafki kiedy nowy wpis trafia do bazy danych. Zdarzenie to następnie zapisywane jest w Elastic Search celem dalszego przetwarzania.
W tym wpisie pokażę Ci w jaki sposób zintegrować brokera Kafki z bazą danych MySQL. Pierwszą rzeczą jaką należy zrobić to instalacja Confluenta czyli narzędzia będącego częścią projektu Apache Kafka które posłuży nam do integracji. Przykład zostanie zaprezentowany z użyciem Ubuntu 18.0.4:
Instalacja Confluent:
wget -qO – http://packages.confluent.io/deb/3.3/archive.key | sudo apt-key add –
sudo add-apt-repository "deb [arch=amd64] http://packages.confluent.io/deb/3.3 stable main"
sudo apt-get install confluent-platform-oss-2.11
Instalujemy bazę danych MySQL i tworzymy przykładową tabelę:
sudo apt install mysql-server
Warto zmienić hasło:
ALTER USER 'root'@'localhost' IDENTIFIED WITH mysql_native_password BY 'password';
Tworzymy tabelę:
CREATE TABLE Persons( id INT(6) UNSIGNED AUTO_INCREMENT PRIMARY KEY, firstname VARCHAR(30) NOT NULL );
Pobieramy connector JDBC (adekwatny do zainstalowanej wersji bazy danych):
mysql-connector-java-5.1.42-bin i kopiujemy go do folderu:
/usr/share/java/kafka-connect-jdbc
Tworzymy plik odpowiadający za konfigurację dostępu do bazy danych:
/etc/kafka-connect-jdbc/source-quickstart-mysql.properties<!--?prettify linenums=true?-->
name=test-source-mysql-jdbc-autoincrement connector.class=io.confluent.connect.jdbc.JdbcSourceConnector tasks.max=1 connection.url=jdbc:mysql://127.0.0.1:3306/person?user=username&password=password&useSSL=false mode=incrementing incrementing.column.name=id topic.prefix=persons-jdbc-
Uruchamiamy Zookeeper:
[uwaga] zanim zostanie Zookeeper uruchomiony należy zainstalować Jave:
sudo apt install default-jdk
Na dzień pisania tego wpisu rekomendowaną wersją jest Java 8:
confluent start schema-registry
wynik:
Starting zookeeper zookeeper is [UP] Starting kafka kafka is [UP] Starting schema-registry schema-registry is [UP]
Uruchamiamy Kafka Connect:
connect-standalone /etc/schema-registry/connect-avro-standalone.properties /usr/share/java/kafka-connect-jdbc/source-quickstart-mysql.properties
w przypadku błędu należy ustawić dostęp do bazy z „zewnątrz”:
/etc/mysql/mysql.conf.d/mysqld.cnf
gdzie:
#bind-address = 127.0.0.1
oraz weryfikować czy link JDBC posiada:
&useSSL=false
Uruchamiamy Consumera Apache Kafka:
kafka-avro-console-consumer –topic persons-jdbc-Persons -zookeeper localhost:2181 -from-beginning --whitelist persons-jdbc-Persons
Obserwacja publikowanych zdarzeń:
Po dodaniu nowego wpisu do bazy danych:
insert into Persons value(1,'test');
otrzymamy na konsoli wynik publikowanego zdarzenia:
Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper]. {"id":1,"firstname":"test"}
Leave a comment