Apache Kafka Connect to Azure Event Hubs
Recently I was working on an integration with Azure Event Hubs. A colleague struggled to export the messages from an existing Kafka topic and import them into Event Hubs, so I have documented the steps below; you may find them helpful.
Step 1: Download and extract Kafka:
Apache Kafka is an open-source distributed event streaming platform that helps build distributed systems with high throughput. Kafka can be downloaded from this address: https://www.apache.org/dyn/closer.cgi?path=/kafka/3.1.0/kafka_2.13-3.1.0.tgz
$ tar -xzf kafka_2.13-3.1.0.tgz
$ cd kafka_2.13-3.1.0
Step 2: Start the Kafka environment
If your local environment already has Java 8+ installed, run the commands below to start all services. (If not, download and install Java first: https://www.oracle.com/java/technologies/downloads/#jdk18-mac)
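If you are not sure which Java version is on your path, you can check it first:
$ java -version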
Run the ZooKeeper service:
$ bin/zookeeper-server-start.sh config/zookeeper.properties
Start the Kafka broker:
$ bin/kafka-server-start.sh config/server.properties
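Once both services are running, you can confirm the local broker is reachable from a new terminal in the same Kafka directory (this assumes the default localhost:9092 listener in config/server.properties):
$ bin/kafka-topics.sh --bootstrap-server localhost:9092 --list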
Step 3: Create and set up the config files
Create a new file connector.properties with the values below.
bootstrap.servers={NAMESPACE.NAME}.servicebus.windows.net:9093
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://{NAMESPACE.NAME}.servicebus.windows.net/;SharedAccessKeyName={XXXXXX};SharedAccessKey={XXXXXX}";
And create a new file connect-distributed.properties with the values below.
bootstrap.servers={NAMESPACE.NAME}.servicebus.windows.net:9093
group.id=connect-cluster-group
# connect internal topic names, automatically created by Kafka Connect with AdminClient API if not exists
config.storage.topic=connect-cluster-configs
offset.storage.topic=connect-cluster-offsets
status.storage.topic=connect-cluster-status
# internal topic replication factors - auto 3x replication in Azure Storage
config.storage.replication.factor=1
offset.storage.replication.factor=1
status.storage.replication.factor=1
rest.advertised.host.name=connect
offset.flush.interval.ms=10000
connections.max.idle.ms=180000
metadata.max.age.ms=180000
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
# required EH Kafka security settings
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://{NAMESPACE.NAME}.servicebus.windows.net/;SharedAccessKeyName={XXXXXX};SharedAccessKey={XXXXXX}";
producer.security.protocol=SASL_SSL
producer.sasl.mechanism=PLAIN
producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://{NAMESPACE.NAME}.servicebus.windows.net/;SharedAccessKeyName={XXXXXX};SharedAccessKey={XXXXXX}";
consumer.security.protocol=SASL_SSL
consumer.sasl.mechanism=PLAIN
consumer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://{NAMESPACE.NAME}.servicebus.windows.net/;SharedAccessKeyName={XXXXXX};SharedAccessKey={XXXXXX}";
# path to the libs directory within the Kafka release
plugin.path={KAFKA.DIRECTORY}/libs
Replace the placeholder values with your Azure endpoint. If you haven't already, create a new namespace and deploy the Event Hubs resources in the Azure portal. Note that you might need to select the Standard pricing tier or higher to create the Kafka topics successfully in the next step.
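If you prefer the command line, the namespace can also be created with the Azure CLI; here {RESOURCE.GROUP} and {LOCATION} are placeholders for your own resource group and region:
$ az eventhubs namespace create --resource-group {RESOURCE.GROUP} --name {NAMESPACE.NAME} --location {LOCATION} --sku Standard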
The password above can be found in the Event Hubs namespace settings under Shared access policies, using the SAS policy RootManageSharedAccessKey. Copy the Connection string–primary key value and substitute it into the config files above.
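If you have the Azure CLI installed, the same connection string can also be fetched from the terminal instead of the portal (again, {RESOURCE.GROUP} is a placeholder for your resource group):
$ az eventhubs namespace authorization-rule keys list --resource-group {RESOURCE.GROUP} --namespace-name {NAMESPACE.NAME} --name RootManageSharedAccessKey --query primaryConnectionString --output tsv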
Step 4: Create three Kafka topics:
We will use the kafka-topics.sh command to create the topics ourselves. The topic names must match config.storage.topic, offset.storage.topic, and status.storage.topic in connect-distributed.properties, so replace CONFIGS-TOPIC-NAME, OFFSETS-TOPIC-NAME, and STATUS-TOPIC-NAME below with connect-cluster-configs, connect-cluster-offsets, and connect-cluster-status (or update the properties file to match).
Create configs topic:
$ bin/kafka-topics.sh --bootstrap-server {NAMESPACE.NAME}.servicebus.windows.net:9093 --command-config path/to/connector.properties --create --topic CONFIGS-TOPIC-NAME --config cleanup.policy=compact --partitions 1 --replication-factor 1
If successful, you should see the response Created topic CONFIGS-TOPIC-NAME.
Create offsets topic:
$ bin/kafka-topics.sh --bootstrap-server {NAMESPACE.NAME}.servicebus.windows.net:9093 --command-config path/to/connector.properties --create --topic OFFSETS-TOPIC-NAME --config cleanup.policy=compact --partitions 25 --replication-factor 1
If successful, you should see the response Created topic OFFSETS-TOPIC-NAME.
Create status topic:
$ bin/kafka-topics.sh --bootstrap-server {NAMESPACE.NAME}.servicebus.windows.net:9093 --command-config path/to/connector.properties --create --topic STATUS-TOPIC-NAME --config cleanup.policy=compact --partitions 5 --replication-factor 1
If successful, you should see the response Created topic STATUS-TOPIC-NAME.
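If you want to confirm that all three topics now exist in the namespace, you can list them using the same admin config:
$ bin/kafka-topics.sh --bootstrap-server {NAMESPACE.NAME}.servicebus.windows.net:9093 --command-config path/to/connector.properties --list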
Step 5: Running Kafka Connect
Kafka Connect is a tool for scalably and reliably streaming data between Apache Kafka and other systems. Because Event Hubs exposes a Kafka-compatible endpoint, the same worker can continuously import and export data into and out of Event Hubs. To do so, start a worker locally in distributed mode:
$ bin/connect-distributed.sh path/to/connect-distributed.properties
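Before creating any connectors, you can optionally confirm that the worker's REST API is reachable and that the file connector plugins shipped with Kafka are available:
curl -s http://localhost:8083/
curl -s http://localhost:8083/connector-plugins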
With everything above up and running, you can test out the import and export in the next step.
Step 6: Create input and output files:
Create a directory and then create two files: one file with seed data from which the FileStreamSource connector will read and another to which our FileStreamSink connector will write.
$ mkdir ~/connect-demo
$ seq 1000 > ~/connect-demo/input.txt
$ touch ~/connect-demo/output.txt
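As a quick sanity check, input.txt should now contain 1000 lines and output.txt should be empty:
$ wc -l ~/connect-demo/input.txt ~/connect-demo/output.txt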
Step 7: Create the FileStreamSource connector:
Next, I will walk you through spinning up the FileStreamSource connector via the Kafka Connect REST API:
curl -s -X POST -H "Content-Type: application/json" --data '{"name": "file-source","config": {"connector.class":"org.apache.kafka.connect.file.FileStreamSourceConnector","tasks.max":"1","topic":"connect-demo","file": "{YOUR/HOME/PATH}/connect-demo/input.txt"}}' http://localhost:8083/connectors
And check the status:
curl -s http://localhost:8083/connectors/file-source/status
If it succeeded, it should respond with:
{"name":"file-source","connector":{"state":"RUNNING","worker_id":"connect:8083"},"tasks":[{"id":0,"state":"RUNNING","worker_id":"connect:8083"}],"type":"source"}
Step 8: Create FileStreamSink Connector
Similar to above, spin up the FileStreamSink connector:
curl -X POST -H "Content-Type: application/json" --data '{"name": "file-sink", "config": {"connector.class":"org.apache.kafka.connect.file.FileStreamSinkConnector", "tasks.max":"1", "topics":"connect-demo", "file": "{YOUR/HOME/PATH}/connect-demo/output.txt"}}' http://localhost:8083/connectors
And check the status:
curl -s http://localhost:8083/connectors/file-sink/status
Finally, verify that data has been replicated between files and that the data is identical across both files.
Read the output file:
cat ~/connect-demo/output.txt
You should see that output.txt contains 1 to 1000, just like the input.txt file. That's it; if you change input.txt, the output will be synced and updated accordingly.
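To confirm the two files really are identical, a quick diff also works; once the sink has flushed everything, it should print nothing:
$ diff ~/connect-demo/input.txt ~/connect-demo/output.txt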
Finally, please note that Event Hubs support for the Kafka Connect API is in public preview. The deployed FileStreamSource and FileStreamSink connectors are not meant for production use; they are used here only for demonstration purposes.