Kafka Connector to MySQL Source

In this Kafka tutorial, we shall learn to set up a connector that imports data from a MySQL database into Kafka and keeps listening for new rows.

To set up a Kafka Connector to a MySQL database source, follow this step-by-step guide.

1. Install Confluent Open Source Platform

Refer to the Install Confluent Open Source Platform tutorial.

2. Download MySQL connector for Java

MySQL Connector for Java is the JDBC driver that the Kafka connector needs in order to connect to the MySQL database. Download the driver, mysql-connector-java-5.1.42-bin.jar, from https://dev.mysql.com/downloads/connector/j/5.1.html.

3. Copy MySQL Connector Jar

Add the jar to the existing Kafka Connect JDBC jars. On Ubuntu, these are located in /usr/share/java/kafka-connect-jdbc.
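If you prefer to script this step, the copy could be sketched in Python as below. The function name install_driver is hypothetical, and the default directory is simply the Ubuntu location mentioned above; adjust both to your installation.

```python
import shutil
from pathlib import Path


def install_driver(jar_path, plugin_dir="/usr/share/java/kafka-connect-jdbc"):
    """Copy the MySQL driver jar next to the Kafka Connect JDBC jars.

    install_driver is a hypothetical helper for this tutorial, not part of
    Confluent or MySQL tooling.
    """
    jar = Path(jar_path)
    target_dir = Path(plugin_dir)
    if not jar.is_file():
        raise FileNotFoundError(f"driver jar not found: {jar}")
    if not target_dir.is_dir():
        raise NotADirectoryError(f"connector directory not found: {target_dir}")
    # shutil.copy returns the path of the newly created file.
    return Path(shutil.copy(jar, target_dir))
```

Writing to /usr/share/java usually requires root privileges, so a script like this would typically be run with sudo.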

4. Configure Data Source Properties

Create a file, /etc/kafka-connect-jdbc/source-quickstart-mysql.properties, with the following content.

name=test-source-mysql-jdbc-autoincrement
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=1
connection.url=jdbc:mysql://127.0.0.1:3306/studentsDB?user=arjun&password=password
mode=incrementing
incrementing.column.name=rollno
topic.prefix=test-mysql-jdbc-
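
A typo in this file tends to surface only when the connector starts, so a quick sanity check can help. The following is a minimal sketch, assuming a simplified properties syntax (one key=value per line, no line continuations). The names parse_properties, missing_keys and REQUIRED_KEYS are hypothetical, and the key list only covers the settings this tutorial relies on, not the connector's full requirements.

```python
# Keys this tutorial relies on (an assumption, not an official list).
REQUIRED_KEYS = ("name", "connector.class", "connection.url", "mode", "topic.prefix")


def parse_properties(text):
    """Parse simple key=value lines, skipping blanks and comments."""
    props = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith(("#", "!")):
            continue
        # Split on the first '=' only: connection.url itself contains '='.
        key, sep, value = line.partition("=")
        if sep:
            props[key.strip()] = value.strip()
    return props


def missing_keys(props, required=REQUIRED_KEYS):
    """Return the required keys that are absent or empty."""
    return [key for key in required if not props.get(key)]
```

For example, missing_keys(parse_properties(open(path).read())) returns an empty list when all of the keys above are present.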

Following are the configuration values that you might need to adjust for your MySQL database:

  • connection.url
    connection.url=jdbc:mysql://127.0.0.1:3306/<DatabaseName>?user=<username>&password=<password>, where <username> and <password> are the credentials with which you log in to the MySQL database.
  • incrementing.column.name
    The name of the strictly incrementing column in the tables of your database, used to detect new rows. An empty value indicates that the column should be autodetected by looking for an auto-incrementing column. This column may not be nullable. If you don't have a column with these properties, you may update one of the columns with the following SQL commands.
-- MySQL requires an AUTO_INCREMENT column to be a key, so add the primary key first.
ALTER TABLE <table_name> ADD PRIMARY KEY (<column_name>);
ALTER TABLE <table_name> MODIFY COLUMN <column_name> INT AUTO_INCREMENT;
  • topic.prefix
    The prefix prepended to table names to generate the name of the Kafka topic to publish data to or, in the case of a custom query, the full name of the topic to publish to.
    Example: If topic.prefix=test-mysql-jdbc- and your database has a table named students, the connector publishes the messages to the topic test-mysql-jdbc-students.
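
The two derived values above, the connection URL and the topic name, can be illustrated with a short Python sketch. The helper names connection_url and topic_name are hypothetical. Percent-encoding the credentials is an assumption that the JDBC driver decodes them again; verify this against your driver version if your password contains characters such as & or =.

```python
from urllib.parse import quote


def connection_url(database, user, password, host="127.0.0.1", port=3306):
    """Build a connection.url value in the format used in this tutorial."""
    # Percent-encode credentials so that '&' or '=' inside them cannot be
    # mistaken for query-string separators (assumes the driver decodes them).
    return (
        f"jdbc:mysql://{host}:{port}/{database}"
        f"?user={quote(user, safe='')}&password={quote(password, safe='')}"
    )


def topic_name(topic_prefix, table):
    """The topic is the configured prefix followed by the table name."""
    return f"{topic_prefix}{table}"


print(connection_url("studentsDB", "arjun", "password"))
print(topic_name("test-mysql-jdbc-", "students"))
```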

5. Start Zookeeper, Kafka and Schema Registry

To start Zookeeper, Kafka and Schema Registry, run the following confluent command:

$ confluent start schema-registry
Starting zookeeper
zookeeper is [UP]
Starting kafka
kafka is [UP]
Starting schema-registry
schema-registry is [UP]

6. Start standalone connector

Run the following command to start the connector in standalone mode.

$ /usr/bin/connect-standalone /etc/schema-registry/connect-avro-standalone.properties /etc/kafka-connect-jdbc/source-quickstart-mysql.properties

The Connector should start successfully.
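
Besides reading the worker log, one way to confirm that the connector started is to query the Kafka Connect REST interface. The sketch below assumes the worker listens on http://localhost:8083 (the usual default, but check your worker properties). The function names are hypothetical helpers for this tutorial.

```python
import json
from urllib.request import urlopen


def status_url(connector, base="http://localhost:8083"):
    """URL of the Connect REST status endpoint for one connector."""
    return f"{base}/connectors/{connector}/status"


def is_running(status):
    """True if the connector and every listed task report state RUNNING."""
    states = [status.get("connector", {}).get("state")]
    states += [task.get("state") for task in status.get("tasks", [])]
    return all(state == "RUNNING" for state in states)


def fetch_status(connector, base="http://localhost:8083"):
    """Fetch and decode the status document (needs a running worker)."""
    with urlopen(status_url(connector, base), timeout=5) as response:
        return json.load(response)
```

With the worker running, is_running(fetch_status("test-source-mysql-jdbc-autoincrement")) should return True once the connector and its task are up.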

7. Start a Console Consumer

To verify the messages posted to the topic, start a consumer that subscribes to the topic named test-mysql-jdbc-students. Here, students is the table name and test-mysql-jdbc- is the topic.prefix.

Run the following command to start a consumer:

/usr/bin/kafka-avro-console-consumer --topic test-mysql-jdbc-students --zookeeper localhost:2181 --from-beginning

You may replace test-mysql-jdbc-students with the topic name that results from your own topic.prefix and table name.

root@tutorialkart:~# /usr/bin/kafka-avro-console-consumer --topic test-mysql-jdbc-students --zookeeper localhost:2181 --from-beginning
Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper].
{"name":{"string":"John"},"rollno":1,"marks":{"int":84}}
{"name":{"string":"Arjun"},"rollno":2,"marks":{"int":84}}
{"name":{"string":"Prasanth"},"rollno":3,"marks":{"int":77}}
{"name":{"string":"Adarsh"},"rollno":4,"marks":{"int":78}}
{"name":{"string":"Raja"},"rollno":5,"marks":{"int":94}}
{"name":{"string":"Sai"},"rollno":6,"marks":{"int":84}}
{"name":{"string":"Ross"},"rollno":7,"marks":{"int":54}}
{"name":{"string":"Monica Gellar"},"rollno":8,"marks":{"int":86}}
{"name":{"string":"Lee"},"rollno":9,"marks":{"int":98}}
{"name":{"string":"Bruce Wane"},"rollno":10,"marks":{"int":92}}
{"name":{"string":"Jack"},"rollno":11,"marks":{"int":82}}
{"name":{"string":"Priya"},"rollno":12,"marks":{"int":88}}
{"name":{"string":"Amy"},"rollno":13,"marks":{"int":84}}
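
In the output above, nullable columns are wrapped in a one-key object such as {"string":"John"}, which is how the Avro JSON encoding represents union types. If you want plain values for further processing, the lines could be flattened with a sketch like the following. The function name flatten_record is hypothetical, and the code assumes flat rows like those in this tutorial, with no nested records or maps.

```python
import json


def flatten_record(line):
    """Turn one console-consumer line into a plain dict of column values."""
    record = json.loads(line)
    flat = {}
    for field, value in record.items():
        # A non-null union value appears as {"<type>": value}; unwrap it.
        # Assumes no column legitimately holds a one-key object.
        if isinstance(value, dict) and len(value) == 1:
            (value,) = value.values()
        flat[field] = value
    return flat


print(flatten_record('{"name":{"string":"John"},"rollno":1,"marks":{"int":84}}'))
```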

8. Add a row to the MySQL Table

Let us add a row to the MySQL table students and check whether the console consumer receives the message.

mysql> use studentsDB;
Reading table information for completion of table and column names
You can turn off this feature to get a quicker startup with -A
 
Database changed
mysql> INSERT INTO students (name, marks) VALUES ('Sastri',88);
Query OK, 1 row affected (0.06 sec)

The consumer receives the message for the new row:

root@tutorialkart:~# /usr/bin/kafka-avro-console-consumer --topic test-mysql-jdbc-students --zookeeper localhost:2181 --from-beginning
Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper].
{"name":{"string":"John"},"rollno":1,"marks":{"int":84}}
{"name":{"string":"Arjun"},"rollno":2,"marks":{"int":84}}
{"name":{"string":"Prasanth"},"rollno":3,"marks":{"int":77}}
{"name":{"string":"Adarsh"},"rollno":4,"marks":{"int":78}}
{"name":{"string":"Raja"},"rollno":5,"marks":{"int":94}}
{"name":{"string":"Sai"},"rollno":6,"marks":{"int":84}}
{"name":{"string":"Ross"},"rollno":7,"marks":{"int":54}}
{"name":{"string":"Monica Gellar"},"rollno":8,"marks":{"int":86}}
{"name":{"string":"Lee"},"rollno":9,"marks":{"int":98}}
{"name":{"string":"Bruce Wane"},"rollno":10,"marks":{"int":92}}
{"name":{"string":"Jack"},"rollno":11,"marks":{"int":82}}
{"name":{"string":"Priya"},"rollno":12,"marks":{"int":88}}
{"name":{"string":"Amy"},"rollno":13,"marks":{"int":84}}
{"name":{"string":"Sastri"},"rollno":14,"marks":{"int":88}}
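
To picture how incrementing mode can notice the new row, consider the simplified simulation below. It is not the connector's actual code: it uses Python's built-in sqlite3 module as a stand-in for MySQL, and poll_new_rows is a hypothetical function. The idea it illustrates is that the poller remembers the largest rollno it has published and asks only for rows beyond it.

```python
import sqlite3


def poll_new_rows(conn, last_seen):
    """Return rows whose rollno is greater than the last one published."""
    cursor = conn.execute(
        "SELECT rollno, name, marks FROM students "
        "WHERE rollno > ? ORDER BY rollno",
        (last_seen,),
    )
    return cursor.fetchall()


# In-memory stand-in for the students table (SQLite, not MySQL).
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE students ("
    "rollno INTEGER PRIMARY KEY AUTOINCREMENT, name TEXT, marks INTEGER)"
)
conn.executemany(
    "INSERT INTO students (name, marks) VALUES (?, ?)",
    [("John", 84), ("Arjun", 84)],
)

# The first poll starts from zero and sees every existing row.
first_batch = poll_new_rows(conn, last_seen=0)
last_seen = first_batch[-1][0]

# After a new insert, the next poll returns only the row added since then.
conn.execute("INSERT INTO students (name, marks) VALUES (?, ?)", ("Sastri", 88))
second_batch = poll_new_rows(conn, last_seen)
```

This also suggests why the column must be strictly incrementing and not nullable: a row whose value is null or not greater than the stored offset would never match the comparison.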

Conclusion

In this Apache Kafka tutorial, Kafka Connector to MySQL Source, we have learnt to set up a connector that imports data into Kafka from a MySQL database source, using the Confluent JDBC Connector and the MySQL Connector for Java driver.