Kafka Connector to MySQL Source
In this Kafka tutorial, we shall learn to set up a connector that imports data from a MySQL database and listens for newly added rows.
To set up a Kafka Connector with a MySQL database as the source, follow this step-by-step guide.
1. Install Confluent Open Source Platform
Refer to the Install Confluent Open Source Platform tutorial.
2. Download MySQL connector for Java
The MySQL connector for Java (Connector/J) is required by the Kafka connector to connect to the MySQL database. Download the jar, mysql-connector-java-5.1.42-bin.jar, from https://dev.mysql.com/downloads/connector/j/5.1.html.
3. Copy MySQL Connector Jar
Add the jar to the existing Kafka Connect JDBC jars. [Location on Ubuntu: /usr/share/java/kafka-connect-jdbc].
4. Configure Data Source Properties
Create a file, /etc/kafka-connect-jdbc/source-quickstart-mysql.properties, with the following content.
name=test-source-mysql-jdbc-autoincrement
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=1
connection.url=jdbc:mysql://127.0.0.1:3306/studentsDB?user=arjun&password=password
mode=incrementing
incrementing.column.name=rollno
topic.prefix=test-mysql-jdbc-
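Before starting the connector, it can help to confirm that the file parses the way you expect. The following is a minimal sketch, not part of Kafka Connect: the helper names parse_properties and missing_keys are hypothetical, and the list of expected keys is simply the set of keys this tutorial's file uses. Note that each line is split on the first = only, because the JDBC URL itself contains = characters.

```python
# Sketch: parse the connector properties shown above and run basic checks.
# parse_properties and missing_keys are hypothetical helpers for illustration.

PROPERTIES_TEXT = """\
name=test-source-mysql-jdbc-autoincrement
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=1
connection.url=jdbc:mysql://127.0.0.1:3306/studentsDB?user=arjun&password=password
mode=incrementing
incrementing.column.name=rollno
topic.prefix=test-mysql-jdbc-
"""

# Keys this tutorial's file sets; treated as expected for the sketch (assumption).
EXPECTED_KEYS = ("name", "connector.class", "connection.url", "mode", "topic.prefix")


def parse_properties(text):
    """Parse simple key=value lines, skipping blank lines and # comments."""
    props = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        # Split on the first '=' only: the JDBC URL contains '=' as well.
        key, _, value = line.partition("=")
        props[key.strip()] = value.strip()
    return props


def missing_keys(props):
    """Return the expected keys that are absent or empty."""
    return [key for key in EXPECTED_KEYS if not props.get(key)]


props = parse_properties(PROPERTIES_TEXT)
print("missing:", missing_keys(props))
print("url:", props["connection.url"])
```

To check your own file, read its content with open(path).read() and pass it to parse_properties instead of PROPERTIES_TEXT.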
The following configuration values may need to be adjusted for your MySQL database.
- connection.url
connection.url=jdbc:mysql://127.0.0.1:3306/<DatabaseName>?user=<username>&password=<password>
where <username> and <password> are the credentials with which you log in to the MySQL database.
- incrementing.column.name
The name of the strictly incrementing column in the tables of your database to use to detect new rows. An empty value indicates that the column should be autodetected by looking for an auto-incrementing column. This column must not be nullable. If you do not have a column with these properties, you may update one of the columns with the following SQL commands. The key is added first because MySQL requires an auto-increment column to be defined as a key.
ALTER TABLE <table_name> ADD PRIMARY KEY (<column_name>);
ALTER TABLE <table_name> MODIFY COLUMN <column_name> INT auto_increment;
- topic.prefix
Prefix to prepend to table names to generate the name of the Kafka topic to publish data to, or in the case of a custom query, the full name of the topic to publish to.
Example: If your topic.prefix is test-mysql-jdbc- and you have a table named students in your database, the connector publishes its messages to the topic test-mysql-jdbc-students.
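The idea behind incrementing mode and the topic naming can be sketched in a few lines. This is only a conceptual illustration, not the connector's actual code: an in-memory SQLite table stands in for the MySQL students table (an assumption, so that the sketch runs without a MySQL server), and the functions topic_for and poll_new_rows are hypothetical. Each poll asks only for rows whose incrementing column is greater than the largest value seen so far.

```python
import sqlite3

TOPIC_PREFIX = "test-mysql-jdbc-"  # same value as topic.prefix above


def topic_for(table, prefix=TOPIC_PREFIX):
    """The topic name is the prefix followed by the table name."""
    return prefix + table


def poll_new_rows(conn, table, column, last_seen):
    """Return rows whose incrementing column exceeds last_seen, in order.

    Table and column names are interpolated into the SQL text, so pass
    only trusted identifiers.
    """
    query = f"SELECT * FROM {table} WHERE {column} > ? ORDER BY {column} ASC"
    return conn.execute(query, (last_seen,)).fetchall()


# In-memory SQLite stands in for the MySQL students table (assumption).
conn = sqlite3.connect(":memory:")
conn.row_factory = sqlite3.Row  # allows row["rollno"] style access
conn.execute(
    "CREATE TABLE students ("
    "rollno INTEGER PRIMARY KEY AUTOINCREMENT, name TEXT, marks INTEGER)"
)
conn.executemany(
    "INSERT INTO students (name, marks) VALUES (?, ?)",
    [("John", 84), ("Arjun", 84)],
)

# First poll: nothing seen yet, so every existing row is returned.
last_seen = 0
first_poll = poll_new_rows(conn, "students", "rollno", last_seen)
last_seen = first_poll[-1]["rollno"]  # remember the highest rollno so far

# A row inserted later is the only one returned by the next poll.
conn.execute("INSERT INTO students (name, marks) VALUES (?, ?)", ("Sastri", 88))
second_poll = poll_new_rows(conn, "students", "rollno", last_seen)

print(topic_for("students"), [dict(row) for row in first_poll])
print(topic_for("students"), [dict(row) for row in second_poll])
```

Because only the highest value seen is remembered, this approach picks up inserted rows but not updates or deletes of existing rows, which is why the column has to be strictly incrementing.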
5. Start Zookeeper, Kafka and Schema Registry
To start Zookeeper, Kafka and Schema Registry, run the following confluent command.
$ confluent start schema-registry
Starting zookeeper
zookeeper is [UP]
Starting kafka
kafka is [UP]
Starting schema-registry
schema-registry is [UP]
6. Start standalone connector
Run the following command to start the standalone connector.
$ /usr/bin/connect-standalone /etc/schema-registry/connect-avro-standalone.properties /etc/kafka-connect-jdbc/source-quickstart-mysql.properties
The Connector should start successfully.
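One way to confirm that the connector is running is to query the Kafka Connect REST API for its status. The sketch below assumes that the standalone worker exposes the REST interface on the default port 8083 on localhost; adjust CONNECT_URL if your worker properties say otherwise. The connector name is the name= value from the properties file. The sample payload is illustrative only, written in the shape the status endpoint returns, and is not captured output.

```python
import json
from urllib.request import urlopen

CONNECT_URL = "http://localhost:8083"  # assumption: default Connect REST port
CONNECTOR_NAME = "test-source-mysql-jdbc-autoincrement"  # name= in the properties file


def status_url(base_url, connector_name):
    """Build the URL of the connector status endpoint."""
    return f"{base_url}/connectors/{connector_name}/status"


def is_running(status):
    """True when the connector and all of its tasks report RUNNING."""
    states = [status["connector"]["state"]]
    states += [task["state"] for task in status.get("tasks", [])]
    return all(state == "RUNNING" for state in states)


def fetch_status(base_url=CONNECT_URL, connector_name=CONNECTOR_NAME):
    """Call the REST API; this needs the standalone worker to be up."""
    with urlopen(status_url(base_url, connector_name), timeout=5) as response:
        return json.load(response)


# Illustrative payload (not captured output) used to exercise is_running.
sample = {
    "name": CONNECTOR_NAME,
    "connector": {"state": "RUNNING", "worker_id": "127.0.0.1:8083"},
    "tasks": [{"id": 0, "state": "RUNNING", "worker_id": "127.0.0.1:8083"}],
}
print(is_running(sample))
```

With the worker running, is_running(fetch_status()) performs the real check. A task in the FAILED state usually points to a problem such as a missing JDBC driver jar or wrong database credentials.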
7. Start a Console Consumer
To verify the messages posted to the topic, start a consumer that subscribes to the topic named test-mysql-jdbc-students. [students is the table name and test-mysql-jdbc- is the topic.prefix].
Run the following command to start a consumer.
/usr/bin/kafka-avro-console-consumer --topic test-mysql-jdbc-students --zookeeper localhost:2181 --from-beginning
You may replace test-mysql-jdbc-students with the name that your configuration and tables in the MySQL Database generate.
root@tutorialkart:~# /usr/bin/kafka-avro-console-consumer --topic test-mysql-jdbc-students --zookeeper localhost:2181 --from-beginning
Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper].
{"name":{"string":"John"},"rollno":1,"marks":{"int":84}}
{"name":{"string":"Arjun"},"rollno":2,"marks":{"int":84}}
{"name":{"string":"Prasanth"},"rollno":3,"marks":{"int":77}}
{"name":{"string":"Adarsh"},"rollno":4,"marks":{"int":78}}
{"name":{"string":"Raja"},"rollno":5,"marks":{"int":94}}
{"name":{"string":"Sai"},"rollno":6,"marks":{"int":84}}
{"name":{"string":"Ross"},"rollno":7,"marks":{"int":54}}
{"name":{"string":"Monica Gellar"},"rollno":8,"marks":{"int":86}}
{"name":{"string":"Lee"},"rollno":9,"marks":{"int":98}}
{"name":{"string":"Bruce Wane"},"rollno":10,"marks":{"int":92}}
{"name":{"string":"Jack"},"rollno":11,"marks":{"int":82}}
{"name":{"string":"Priya"},"rollno":12,"marks":{"int":88}}
{"name":{"string":"Amy"},"rollno":13,"marks":{"int":84}}
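In the output above, name and marks appear wrapped as {"string": ...} and {"int": ...}, while rollno is a plain number. This is how Avro's JSON encoding represents non-null values of union (nullable) fields; rollno is not nullable, so it is not wrapped. If you want plain records for further processing, the wrapper can be stripped. The following is a minimal sketch with hypothetical helper names (unwrap, flatten_record), covering only the primitive type tags.

```python
import json

# Primitive Avro type names that can appear as union tags in JSON encoding.
AVRO_UNION_TAGS = {"string", "int", "long", "float", "double", "boolean", "bytes"}


def unwrap(value):
    """Strip the {"type": value} wrapper used for non-null union values."""
    if isinstance(value, dict) and len(value) == 1:
        (tag, inner), = value.items()
        if tag in AVRO_UNION_TAGS:
            return inner
    return value  # plain values and nulls pass through unchanged


def flatten_record(line):
    """Parse one consumer output line into a plain dictionary."""
    record = json.loads(line)
    return {field: unwrap(value) for field, value in record.items()}


line = '{"name":{"string":"John"},"rollno":1,"marks":{"int":84}}'
print(flatten_record(line))  # {'name': 'John', 'rollno': 1, 'marks': 84}
```

A null column arrives as a plain JSON null and is returned as None.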
8. Add a row to the MySQL Table
Let us add a row to the MySQL table students and check whether the console consumer receives the message.
mysql> use studentsDB;
Reading table information for completion of table and column names
You can turn off this feature to get a quicker startup with -A
Database changed
mysql> INSERT INTO students (name, marks) VALUES ('Sastri',88);
Query OK, 1 row affected (0.06 sec)
The consumer receives the new row as the last message.
root@tutorialkart:~# /usr/bin/kafka-avro-console-consumer --topic test-mysql-jdbc-students --zookeeper localhost:2181 --from-beginning
Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper].
{"name":{"string":"John"},"rollno":1,"marks":{"int":84}}
{"name":{"string":"Arjun"},"rollno":2,"marks":{"int":84}}
{"name":{"string":"Prasanth"},"rollno":3,"marks":{"int":77}}
{"name":{"string":"Adarsh"},"rollno":4,"marks":{"int":78}}
{"name":{"string":"Raja"},"rollno":5,"marks":{"int":94}}
{"name":{"string":"Sai"},"rollno":6,"marks":{"int":84}}
{"name":{"string":"Ross"},"rollno":7,"marks":{"int":54}}
{"name":{"string":"Monica Gellar"},"rollno":8,"marks":{"int":86}}
{"name":{"string":"Lee"},"rollno":9,"marks":{"int":98}}
{"name":{"string":"Bruce Wane"},"rollno":10,"marks":{"int":92}}
{"name":{"string":"Jack"},"rollno":11,"marks":{"int":82}}
{"name":{"string":"Priya"},"rollno":12,"marks":{"int":88}}
{"name":{"string":"Amy"},"rollno":13,"marks":{"int":84}}
{"name":{"string":"Sastri"},"rollno":14,"marks":{"int":88}}
Conclusion
In this Apache Kafka tutorial, Kafka Connector to MySQL Source, we have learned to set up a connector that imports data into Kafka from a MySQL database source using the Confluent JDBC connector and the MySQL Connector/J driver.