Apache Kafka Connector

Apache Kafka Connector – Connectors are the components of Kafka Connect that can be set up to move data between Kafka and other systems. A source connector, for example, can listen for changes to a data source such as a file or a database and pull those changes into Kafka automatically.

Apache Kafka Connector Example – Import Data into Kafka

In this Kafka Connector example, we deal with a simple use case: we set up a standalone connector that listens on a text file and imports its data into Kafka. Once the connector is running, the existing lines of the text file are imported into a Kafka topic as messages. After that, every line appended to the file is picked up by the connector and written to the topic as a new message.
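To make the idea of "importing appended lines" concrete, here is a minimal Python sketch, assuming a simplified model in which the connector remembers a byte offset into the file and, on each poll, returns only the complete lines written since the previous poll. The class name FileTailSource is hypothetical and this is not Kafka's code: the real FileStreamSourceConnector is written in Java and also saves its position through Kafka Connect's offset storage so that it can resume after a restart.

```python
import os
import tempfile


class FileTailSource:
    """Hypothetical, simplified model of a file source connector.

    Tracks a byte offset into the file; each poll returns the complete
    lines appended since the previous poll.
    """

    def __init__(self, path: str) -> None:
        self.path = path
        self.offset = 0  # bytes already turned into messages

    def poll(self) -> list[str]:
        with open(self.path, "rb") as f:
            f.seek(self.offset)
            data = f.read()
        # Emit only complete lines; a trailing partial line waits for a later poll.
        last_newline = data.rfind(b"\n")
        if last_newline == -1:
            return []
        chunk = data[: last_newline + 1]
        self.offset += len(chunk)
        return [line.decode("utf-8").rstrip("\r") for line in chunk.split(b"\n")[:-1]]


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        path = os.path.join(tmp, "test.txt")
        with open(path, "w", encoding="utf-8") as f:
            f.write("Hello!\nWelcome to TutorialKart\nLearn Apache Kafka\n")

        source = FileTailSource(path)
        topic = source.poll()  # stands in for the topic: initial import of existing lines

        with open(path, "a", encoding="utf-8") as f:
            f.write("Learn Connector\n")
        topic += source.poll()  # only the appended line is new

        print(topic)
        # ['Hello!', 'Welcome to TutorialKart', 'Learn Apache Kafka', 'Learn Connector']
```

In this sketch a trailing line without a newline is held back until it is complete, so a half-written line is not published as two separate messages.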

Apache Kafka Connector - Data Source Example

For this example, we use the default configuration files to keep things simple. Following is a step-by-step guide:

1. Create a Text File

Create a text file named test.txt in the Kafka root directory, next to the bin folder.

arjun@tutorialkart:~/kafka_2.12-1.0.0$ ls
bin  config  data  libs  LICENSE  logs  NOTICE  site-docs  test.txt
arjun@tutorialkart:~/kafka_2.12-1.0.0$ cat test.txt
Hello!
Welcome to TutorialKart
Learn Apache Kafka

2. Start ZooKeeper and Kafka Cluster

Navigate to the root of the Kafka directory and run each of the following commands in a separate terminal to start ZooKeeper and the Kafka cluster. Start ZooKeeper first, since the Kafka broker connects to it on startup.

$ bin/zookeeper-server-start.sh config/zookeeper.properties
$ bin/kafka-server-start.sh config/server.properties

3. Start Kafka Standalone Connector

To start Kafka Connect in standalone mode with the connectors used in this example, we need the following three configuration files:

  • connect-standalone.properties
  • connect-file-source.properties
  • connect-file-sink.properties

Kafka provides these configuration files by default in the config folder, and we use them as is. If you look through them, you will find that connect-file-source.properties reads from test.txt, the file we created in the first step, and publishes to the connect-test topic, while connect-file-sink.properties reads from connect-test and writes to test.sink.txt.
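If you want to check these settings without opening each file, the following Python sketch prints the file and topic keys from the connector configs. read_properties is a hypothetical helper that handles only plain key=value lines and # or ! comments; it does not implement the full Java .properties syntax (line continuations, escapes, ':' separators), and the script name show_props.py is also just an assumption.

```python
import sys


def read_properties(text: str) -> dict[str, str]:
    """Parse plain key=value lines, skipping blank lines and # or ! comments.

    Simplified: no line continuations, escapes, or ':' separators.
    """
    props = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith(("#", "!")):
            continue
        key, sep, value = line.partition("=")
        if sep:  # ignore lines without '='
            props[key.strip()] = value.strip()
    return props


if __name__ == "__main__":
    # Hypothetical usage, from the Kafka root directory:
    #   python show_props.py config/connect-file-source.properties config/connect-file-sink.properties
    for path in sys.argv[1:]:
        with open(path, encoding="utf-8") as f:
            props = read_properties(f.read())
        # Source connectors name one output topic ("topic");
        # sink connectors list the topics they read ("topics").
        wanted = {k: props[k] for k in ("file", "topic", "topics") if k in props}
        print(path, wanted)
```

Against the stock files, this should report test.txt and connect-test for the source, and test.sink.txt and connect-test for the sink.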

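Run the following command from the Kafka root directory to start Kafka Connect in standalone mode with the file source and file sink connectors. The file paths in the connector configs are relative, so they resolve against this directory: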

bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties

You should see lines like the following printed to the console:

arjun@tutorialkart:~/kafka/kafka_2.11-0.11.0.0$ bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties
[2017-11-02 10:44:28,136] INFO Registered loader: sun.misc.Launcher$AppClassLoader@764c12b6 (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:199)
[2017-11-02 10:44:28,139] INFO Added plugin 'org.apache.kafka.connect.tools.MockSourceConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:132)
[2017-11-02 10:44:28,139] INFO Added plugin 'org.apache.kafka.connect.tools.MockConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:132)
[2017-11-02 10:44:28,140] INFO Added plugin 'org.apache.kafka.connect.file.FileStreamSourceConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:132)
[2017-11-02 10:44:28,140] INFO Added plugin 'org.apache.kafka.connect.tools.MockSinkConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:132)
[2017-11-02 10:44:28,141] INFO Added plugin 'org.apache.kafka.connect.tools.VerifiableSinkConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:132)
[2017-11-02 10:44:28,141] INFO Added plugin 'org.apache.kafka.connect.file.FileStreamSinkConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:132)
[2017-11-02 10:44:28,141] INFO Added plugin 'org.apache.kafka.connect.tools.VerifiableSourceConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:132)

4. Observe test.sink.txt created next to test.txt

arjun@tutorialkart:~/kafka_2.12-1.0.0$ ls
bin  config  libs  LICENSE  logs  NOTICE  site-docs  test.sink.txt  test.txt

Once the connector is started, the source connector reads the existing lines of test.txt and publishes each one as a message to the Kafka topic named connect-test. The sink connector consumes that topic and writes the messages to test.sink.txt. From then on, any line appended to test.txt is published to connect-test and, through the topic, synced to test.sink.txt.
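The sink half of the pipeline can be pictured in a similar way. In this hypothetical Python model, a plain list stands in for the connect-test topic, and committed plays the role of the sink's consumer offset: each call to drain appends only the records it has not yet written, one line per record. It illustrates the data flow only and is not the FileStreamSinkConnector implementation.

```python
import os
import tempfile


class FileSinkModel:
    """Hypothetical model of a file sink fed from a topic.

    A plain list stands in for the connect-test topic; `committed` plays
    the role of the sink's consumer offset (index of the next record).
    """

    def __init__(self, path: str) -> None:
        self.path = path
        self.committed = 0

    def drain(self, topic: list[str]) -> int:
        """Append every not-yet-written record as one line; return how many."""
        new_records = topic[self.committed:]
        with open(self.path, "a", encoding="utf-8") as f:
            for value in new_records:
                f.write(value + "\n")
        self.committed += len(new_records)
        return len(new_records)


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        sink = FileSinkModel(os.path.join(tmp, "test.sink.txt"))
        topic = ["Hello!", "Welcome to TutorialKart", "Learn Apache Kafka"]
        sink.drain(topic)                # initial sync
        topic.append("Learn Connector")  # the source publishes a new line
        sink.drain(topic)                # only the new record is appended
        with open(sink.path, encoding="utf-8") as f:
            print(f.read(), end="")
```

Because the sink tracks how far it has read in the topic rather than re-reading test.txt, the two files stay in step only through the topic.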

Add a new line, "Learn Connector", to test.txt, then check test.sink.txt:

arjun@tutorialkart:~/kafka_2.12-1.0.0$ echo "Learn Connector" >> test.txt
arjun@tutorialkart:~/kafka_2.12-1.0.0$ cat test.sink.txt
Hello!
Welcome to TutorialKart
Learn Apache Kafka
Learn Connector

5. Consume the messages posted to the connect-test topic with a consumer

We now start a consumer to read the messages: the original lines of test.txt and any lines appended to it later.

Here we use the Kafka console consumer. You may use any Kafka consumer client that suits your application.

arjun@tutorialkart:~/kafka_2.12-1.0.0$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic connect-test --from-beginning
{"schema":{"type":"string","optional":false},"payload":"Hello!"}
{"schema":{"type":"string","optional":false},"payload":"Welcome to TutorialKart"}
{"schema":{"type":"string","optional":false},"payload":"Learn Apache Kafka"}
{"schema":{"type":"string","optional":false},"payload":"Learn Connector"}

Each line appended to the text file is written to the topic as a message by the Kafka connector, so every consumer subscribed to the topic receives it.
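The messages arrive wrapped in a JSON envelope because the default connect-standalone.properties uses org.apache.kafka.connect.json.JsonConverter with schemas enabled (value.converter.schemas.enable=true), so each line is stored together with a schema and a payload. The Python sketch below reproduces the envelope format seen in the consumer output above and shows how a consumer application could pull the original line back out. wrap and unwrap are hypothetical helper names, not part of any Kafka client.

```python
import json

# Schema attached to each plain string line, as seen in the consumer output.
STRING_SCHEMA = {"type": "string", "optional": False}


def wrap(value: str) -> str:
    """Reproduce the schema+payload envelope for a single line."""
    return json.dumps({"schema": STRING_SCHEMA, "payload": value}, separators=(",", ":"))


def unwrap(message: str) -> str:
    """Return the original line carried in an enveloped message."""
    return json.loads(message)["payload"]


if __name__ == "__main__":
    received = '{"schema":{"type":"string","optional":false},"payload":"Hello!"}'
    print(unwrap(received))  # Hello!
    print(wrap("Learn Connector"))
    # {"schema":{"type":"string","optional":false},"payload":"Learn Connector"}
```

With value.converter.schemas.enable=false, the converter should write only the payload, so each message would be a plain JSON string such as "Hello!".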

Conclusion

In this Kafka tutorial, we learned how to set up a standalone Kafka connector that imports data from a text file into a Kafka topic.