Apache Kafka Connect to Azure Event Hubs

Recently I was working on an integration with Azure Event Hubs. A colleague struggled to export the messages from an existing Kafka topic and import them into Event Hubs, so I have documented the steps below; you may find them helpful.

Step 1: Download and extract Kafka

Apache Kafka is an open-source distributed event streaming platform. It helps build distributed systems and ensures high throughput. Kafka can be downloaded from this address: https://www.apache.org/dyn/closer.cgi?path=/kafka/3.1.0/kafka_2.13-3.1.0.tgz

$ tar -xzf kafka_2.13-3.1.0.tgz
$ cd kafka_2.13-3.1.0
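
The FileStream connectors used later in this walkthrough ship inside the distribution's libs directory (the same directory plugin.path will point to in step 3). You can confirm the jar is there:

$ ls libs | grep connect-file

which should print connect-file-3.1.0.jar (the exact name may vary between releases).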

Step 2: Start the Kafka environment

If your local environment already has Java 8+ installed, run the commands below to start all services. (If not, download and install Java first: https://www.oracle.com/java/technologies/downloads/#jdk18-mac)
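
A quick check that Java is available on your PATH and recent enough:

$ java -version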

Start the ZooKeeper service:

$ bin/zookeeper-server-start.sh config/zookeeper.properties

Start the Kafka broker:

$ bin/kafka-server-start.sh config/server.properties
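
With both services running, you can optionally confirm the local broker is reachable by listing its topics (the list will be empty on a fresh install):

$ bin/kafka-topics.sh --bootstrap-server localhost:9092 --list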

Step 3: Create and set up the config files

Create a new file connector.properties with the values below.

bootstrap.servers={NAMESPACE.NAME}.servicebus.windows.net:9093
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://{NAMESPACE.NAME}.servicebus.windows.net/;SharedAccessKeyName={XXXXXX};SharedAccessKey={XXXXXX}";

Then create a new file connect-distributed.properties:

bootstrap.servers={NAMESPACE.NAME}.servicebus.windows.net:9093
group.id=connect-cluster-group

# Connect internal topic names; created automatically by Kafka Connect via the AdminClient API if they do not exist

config.storage.topic=connect-cluster-configs
offset.storage.topic=connect-cluster-offsets
status.storage.topic=connect-cluster-status

# internal topic replication factors - auto 3x replication in Azure Storage

config.storage.replication.factor=1
offset.storage.replication.factor=1
status.storage.replication.factor=1

rest.advertised.host.name=connect
offset.flush.interval.ms=10000
connections.max.idle.ms=180000
metadata.max.age.ms=180000

key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter

internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false

# required EH Kafka security settings

security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://{NAMESPACE.NAME}.servicebus.windows.net/;SharedAccessKeyName={XXXXXX};SharedAccessKey={XXXXXX}";

producer.security.protocol=SASL_SSL
producer.sasl.mechanism=PLAIN
producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://{NAMESPACE.NAME}.servicebus.windows.net/;SharedAccessKeyName={XXXXXX};SharedAccessKey={XXXXXX}";

consumer.security.protocol=SASL_SSL
consumer.sasl.mechanism=PLAIN
consumer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://{NAMESPACE.NAME}.servicebus.windows.net/;SharedAccessKeyName={XXXXXX};SharedAccessKey={XXXXXX}";

# path to the libs directory within the Kafka release
plugin.path={KAFKA.DIRECTORY}/libs

Replace the placeholder values with your Azure endpoint. If you haven't already, create a new namespace and deploy the Event Hubs resources in the Azure portal. Note that you might need to select the Standard pricing tier or higher to create Kafka topics successfully in the next step.

The password above can be found in the Event Hubs namespace settings under Shared access policies, using the SAS policy RootManageSharedAccessKey. Copy the Connection string–primary key value and use it to replace the placeholders in the config files above.
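
With the placeholders filled in, you can optionally confirm the endpoint and credentials work by listing the topics (event hubs) in the namespace using the same client config. The list may be empty if no event hubs exist yet, but the command should complete without authentication errors:

$ bin/kafka-topics.sh --bootstrap-server {NAMESPACE.NAME}.servicebus.windows.net:9093 --command-config path/to/connector.properties --list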

Step 4: Create three Kafka topics

We will use the kafka-topics.sh command to create the internal topics ourselves. The topic names must match the config.storage.topic, offset.storage.topic and status.storage.topic values in connect-distributed.properties, so replace the placeholder names below accordingly.

Create configs topic:

$ bin/kafka-topics.sh --bootstrap-server {NAMESPACE.NAME}.servicebus.windows.net:9093 --command-config path/to/connector.properties --create --topic CONFIGS-TOPIC-NAME --config cleanup.policy=compact --partitions 1 --replication-factor 1

If successful, you will see the response Created topic CONFIGS-TOPIC-NAME.

Create offsets topic:

$ bin/kafka-topics.sh --bootstrap-server {NAMESPACE.NAME}.servicebus.windows.net:9093 --command-config path/to/connector.properties --create --topic OFFSETS-TOPIC-NAME --config cleanup.policy=compact --partitions 25 --replication-factor 1

If successful, you will see the response Created topic OFFSETS-TOPIC-NAME.

Create status topic:

$ bin/kafka-topics.sh --bootstrap-server {NAMESPACE.NAME}.servicebus.windows.net:9093 --command-config path/to/connector.properties --create --topic STATUS-TOPIC-NAME --config cleanup.policy=compact --partitions 5 --replication-factor 1

If successful, you will see the response Created topic STATUS-TOPIC-NAME.
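
You can double-check any of the newly created topics with --describe, for example:

$ bin/kafka-topics.sh --bootstrap-server {NAMESPACE.NAME}.servicebus.windows.net:9093 --command-config path/to/connector.properties --describe --topic OFFSETS-TOPIC-NAME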

Step 5: Run Kafka Connect

Kafka Connect is a tool for scalably and reliably streaming data between Apache Kafka and other systems; here it talks to the Kafka endpoint of Azure Event Hubs. It allows you to continuously ingest data into Event Hubs and export it back out. To continuously import/export your data, start a worker locally in distributed mode:

$ bin/connect-distributed.sh path/to/connect-distributed.properties
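
Once the worker has started, its REST API listens on port 8083 by default. You can confirm it is up and that the FileStream connector plugins are available:

curl -s http://localhost:8083/connector-plugins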

With everything above up and running, you can test out the import and export in the next step.

Step 6: Create input and output files

Create a directory and then create two files: one file with seed data from which the FileStreamSource connector will read and another to which our FileStreamSink connector will write.

$ mkdir ~/connect-demo
$ seq 1000 > ~/connect-demo/input.txt
$ touch ~/connect-demo/output.txt
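
A quick sanity check that the seed file contains the expected 1000 lines:

$ wc -l ~/connect-demo/input.txt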

Step 7: Create the FileStreamSource connector

Next, I will walk you through spinning up the FileStreamSource connector. POST its configuration to the worker's REST API (replace {YOUR/HOME/PATH} with your home directory path):

curl -s -X POST -H "Content-Type: application/json" --data '{"name": "file-source","config": {"connector.class":"org.apache.kafka.connect.file.FileStreamSourceConnector","tasks.max":"1","topic":"connect-demo","file": "{YOUR/HOME/PATH}/connect-demo/input.txt"}}' http://localhost:8083/connectors

And check the status:

curl -s http://localhost:8083/connectors/file-source/status

If it's successful, it will respond with:

{"name":"file-source","connector":{"state":"RUNNING","worker_id":"connect:8083"},"tasks":[{"id":0,"state":"RUNNING","worker_id":"connect:8083"}],"type":"source"}

Step 8: Create the FileStreamSink connector

Similar to the above, spin up the FileStreamSink connector:

curl -X POST -H "Content-Type: application/json" --data '{"name": "file-sink", "config": {"connector.class":"org.apache.kafka.connect.file.FileStreamSinkConnector", "tasks.max":"1", "topics":"connect-demo", "file": "{YOUR/HOME/PATH}/connect-demo/output.txt"}}' http://localhost:8083/connectors

And check the status:

curl -s http://localhost:8083/connectors/file-sink/status
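
You should see a RUNNING state just like the source connector's status. To list both connectors registered on the worker:

curl -s http://localhost:8083/connectors

which should return something like ["file-source","file-sink"].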

Finally, verify that data has been replicated between the two files and that their contents are identical.

Read the output file:

cat ~/connect-demo/output.txt

You should see that output.txt contains the numbers 1 to 1000, just like input.txt. That's it; if you change input.txt, output.txt will be updated to match.
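
To confirm the two files are identical, diff prints nothing when they match (the sink may take a few seconds to catch up):

$ diff ~/connect-demo/input.txt ~/connect-demo/output.txt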

Finally, please note that Event Hubs support for the Kafka Connect API is in public preview. The FileStreamSource and FileStreamSink connectors deployed here are not meant for production use; they are used only for demonstration purposes.