A connector plugin for Kafka's Connect API. It can be configured to map any topic on an MQTT broker to any topic on a Kafka broker, and it can be configured to use SSL when communicating with the MQTT broker.
To be able to test the connector, we first need to set up the Kafka infrastructure. For simplicity, we start by configuring single nodes (one Zookeeper and one Kafka).
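Check your Java version by typing
java -version
in your terminal. We use openjdk version "11.0.6" 2020-01-14.
Check your Maven version by typing
mvn -v
in your terminal. We use Maven 3.6.0.
Start your MQTT broker and check that it is running. With EMQX:
"path-to-emqx"/emqx/bin/emqx start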
"path-to-emqx"/emqx/bin/emqx_ctl status
Download a binary Kafka release from https://kafka.apache.org/downloads. We work with the compressed download kafka_2.13-2.4.1.tgz.
Extract the download to your desired destination, here termed "path-to-kafka".
About Zookeeper:
"Zookeeper is a top-level software developed by Apache that acts as a centralized service and is used to maintain naming and configuration data and to provide flexible and robust synchronization within distributed systems. Zookeeper keeps track of status of the Kafka cluster nodes and it also keeps track of Kafka topics, partitions etc. Zookeeper itself is allowing multiple clients to perform simultaneous reads and writes and acts as a shared configuration service within the system. The Zookeeper atomic broadcast (ZAB) protocol is the brains of the whole system, making it possible for Zookeeper to act as an atomic broadcast system and issue orderly updates." (Cloudkarafka)
Start Zookeeper
"path-to-kafka"/kafka_2.13-2.4.1/bin/zookeeper-server-start.sh "path-to-kafka"/kafka_2.13-2.4.1/config/zookeeper.properties
P.S. The default settings in zookeeper.properties work well for this tutorial's purpose. They start Zookeeper on the default port 2181.
As mentioned, we will only start a single Kafka broker instance. The Kafka broker uses "path-to-kafka"/kafka_2.13-2.4.1/config/server.properties, and it is worth checking that this file contains
zookeeper.connect=localhost:2181
or a value that matches your custom configuration in zookeeper.properties.
Start Kafka Broker
"path-to-kafka"/kafka_2.13-2.4.1/bin/kafka-server-start.sh "path-to-kafka"/kafka_2.13-2.4.1/config/server.properties
Create Kafka Topic
"path-to-kafka"/kafka_2.13-2.4.1/bin/kafka-topics.sh --create --bootstrap-server 127.0.0.1:9092 --replication-factor 1 --partitions 1 --topic test
The Kafka Connect API is what we utilise as a framework around our connectors, to handle scaling, polling from Kafka, work distribution etc. Kafka Connect can run as connect-standalone or as connect-distributed. Standalone mode is intended for demo and test purposes: it runs as a single process and provides no failover, so it is not suited to a production environment.
Start Kafka Connect
Follow the respective steps below to start Kafka Connect in your preferred mode.
Connect in general
Build this Java Maven project by navigating to the project root, kafka-mqtt-source-connector, in a terminal and typing:
mvn install
Copy the kafka-mqtt-source-connector-"version".jar from your Maven target directory to the directory /usr/share/java/kafka:
sudo mkdir -p /usr/share/java/kafka
sudo cp ./target/*with-dependencies.jar /usr/share/java/kafka/.
Insecure - using TCP
Connect Standalone
Uncomment plugin.path in "path-to-kafka"/kafka_2.13-2.4.1/config/connect-standalone.properties, so that it is set to
plugin.path=/usr/share/java,/usr/local/share/kafka/plugins,/usr/local/share/java/
Copy the connector properties file source-connect-mqtt.properties to "path-to-kafka"/kafka_2.13-2.4.1/config/ (or create a new properties file with the same name in the given directory).
Set the following properties in source-connect-mqtt.properties:
name=mqtt-source-connector
tasks.max=1
connector.class=com.sintef.asam.MqttSourceConnector
mqtt.connector.broker.uri=tcp://0.0.0.0:1883
mqtt.connector.broker.topic=test/#
mqtt.connector.kafka.topic=test
Here mqtt.connector.broker.topic sets the topic one wants to subscribe to on the MQTT broker, while mqtt.connector.kafka.topic sets the topic for publishing to the Kafka broker. The mqtt.connector.broker.uri needs to be set according to your own MQTT broker, but the value shown above is the default for mosquitto and EMQX.
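A misspelled key in source-connect-mqtt.properties can be hard to spot by eye. The sketch below checks that each key from the example above is present before the worker is started. The helper name check_connector_props is hypothetical, and treating all six keys as required is an assumption, since the connector may define defaults for some of them.

```shell
#!/bin/sh
# check_connector_props is a hypothetical helper, not part of the connector.
# It reports every expected key that has no "key=value" line in the file
# and returns a non-zero status if at least one key is missing.
check_connector_props() {
  props_file=$1
  status=0
  for key in name tasks.max connector.class \
             mqtt.connector.broker.uri \
             mqtt.connector.broker.topic \
             mqtt.connector.kafka.topic; do
    # Compare the text before the first "=" exactly, so that a misspelled
    # key does not count as a match.
    if ! awk -F= -v k="$key" '$1 == k { found = 1 } END { exit !found }' "$props_file"; then
      echo "missing key: $key" >&2
      status=1
    fi
  done
  return "$status"
}
```

It could be called as check_connector_props "path-to-kafka"/kafka_2.13-2.4.1/config/source-connect-mqtt.properties. Missing keys are listed on standard error.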
"path-to-kafka"/kafka_2.13-2.4.1/bin/connect-standalone.sh "path-to-kafka"/kafka_2.13-2.4.1/config/connect-standalone.properties "path-to-kafka"/kafka_2.13-2.4.1/config/source-connect-mqtt.properties
Connect Distributed
Kafka Connect Distributed does not need properties files to configure connectors. Connectors are instead created and managed through the Kafka Connect REST interface.
5. Uncomment plugin.path in "path-to-kafka"/kafka_2.13-2.4.1/config/connect-distributed.properties, so that it is set to
plugin.path=/usr/share/java,/usr/local/share/kafka/plugins,/usr/local/share/java/
and set rest.port so that it reads
rest.port=19005
which helps to avoid some "bind" exceptions. This will be the port for the Connect REST interface.
6. Start Connect Distributed by typing (this may take a minute or two):
"path-to-kafka"/kafka_2.13-2.4.1/bin/connect-distributed.sh "path-to-kafka"/kafka_2.13-2.4.1/config/connect-distributed.properties
Create a connector through the Connect REST interface:
curl -s -X POST -H 'Content-Type: application/json' http://127.0.0.1:19005/connectors -d '{"name":"mqtt-source-connector","config":{"connector.class":"com.sintef.asam.MqttSourceConnector","tasks.max":"1","mqtt.connector.broker.uri":"tcp://localhost:1883", "mqtt.connector.broker.topic":"test/#","mqtt.connector.kafka.topic":"test"}}'
Check running connectors by name using:
curl -H 'Content-Type: application/json' http://127.0.0.1:19005/connectors
where the response is an array with connectors by name.
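The list call only shows that a connector is registered. Kafka Connect also exposes GET /connectors/{name}/status, which reports the state of the connector and its tasks. The sketch below pulls the connector state out of that response. It assumes the response arrives as a single line of JSON with "state" as the first field of the "connector" object, and connector_state is a hypothetical helper name. A real JSON parser such as jq would be more robust where it is available.

```shell
#!/bin/sh
# connector_state is a hypothetical helper: it reads a Kafka Connect status
# document on stdin and prints the state of the connector itself
# (not the state of its tasks).
# Assumption: "state" is the first field inside the "connector" object.
connector_state() {
  sed -n 's/.*"connector"[[:space:]]*:[[:space:]]*{[[:space:]]*"state"[[:space:]]*:[[:space:]]*"\([A-Z_]*\)".*/\1/p'
}

# Usage against the REST port configured above (requires a running worker):
#   curl -s http://127.0.0.1:19005/connectors/mqtt-source-connector/status | connector_state
```

A healthy connector should report RUNNING. Any other value is a reason to look at the Connect worker log.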
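Start a Kafka console consumer subscribing to the topic test:
"path-to-kafka"/kafka_2.13-2.4.1/bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic test
Publish a message to the MQTT topic test, for example with mosquitto_pub:
mosquitto_pub -h 127.0.0.1 -p 1883 -t test -m "Hello, world!"
and see the message appear, base64 encoded, in your Kafka consumer.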
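To confirm that the base64 text shown by the consumer matches what was published, it can be decoded on the command line. This is a minimal sketch that assumes the value copied from the consumer is the bare base64 string, with no surrounding JSON or quotes. decode_payload is a hypothetical helper name.

```shell
#!/bin/sh
# decode_payload is a hypothetical helper: it decodes one base64 string
# copied from the consumer output and writes the result to stdout.
decode_payload() {
  printf '%s' "$1" | base64 --decode
}

# "SGVsbG8sIHdvcmxkIQ==" is the base64 encoding of the test message
# "Hello, world!" published above.
decode_payload 'SGVsbG8sIHdvcmxkIQ=='
echo
```

If the consumer output is wrapped in additional structure by the configured converter, extract the base64 field before passing it to the helper.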
Secure - using SSL
Setting up your own certificate authority (for test purposes) and configuring the MQTT broker to use SSL
We first need certificates and keys to encrypt our secure socket layer (SSL) communication to and from the broker. For making your own certificate authority, and for creating a client certificate and a client key, this guide is thorough and instructive: https://deliciousbrains.com/ssl-certificate-authority-for-local-https-development/
Assuming that you have a /home/ca.crt, a /home/client.crt and a /home/client.key, configure the EMQX broker by finding the configuration file "path-to-emqx"/etc/emqx.conf and setting/uncommenting the following properties:
listener.ssl.external = 8883
listener.ssl.external.access.1 = allow all
listener.ssl.external.keyfile = /home/client.key
listener.ssl.external.certfile = /home/client.crt
listener.ssl.external.cacertfile = /home/ca.crt
Then restart your MQTT broker.
Connect Distributed
12. Delete the previously made connector using TCP, if one is running, using the following call to the Connect REST interface:
curl -X DELETE -H 'Content-Type: application/json' http://127.0.0.1:19005/connectors/mqtt-source-connector
if your connector was named mqtt-source-connector. Check running connectors by name using:
curl -H 'Content-Type: application/json' http://127.0.0.1:19005/connectors
Create a new connector that uses SSL, with the certificate authority, client certificate and client key in /home/ca.crt, /home/client.crt and /home/client.key:
curl -s -X POST -H 'Content-Type: application/json' http://127.0.0.1:19005/connectors -d '{"name":"mqtt-source-connector","config":{"connector.class":"com.sintef.asam.MqttSourceConnector","tasks.max":"1","mqtt.connector.broker.uri":"ssl://localhost:8883", "mqtt.connector.broker.topic":"test/#", "mqtt.connector.kafka.topic":"test","mqtt.connector.ssl":true, "mqtt.connector.ssl.ca":"/home/ca.crt","mqtt.connector.ssl.crt":"/home/client.crt","mqtt.connector.ssl.key":"/home/client.key"}}'
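The one-line JSON body in the call above is easy to mistype, especially around the certificate paths. As a sketch, the body can be generated by a small function and piped to curl. ssl_connector_payload is a hypothetical helper name. The connector name and topics are fixed to the values used in this tutorial, and the arguments are inserted verbatim without JSON escaping.

```shell
#!/bin/sh
# ssl_connector_payload is a hypothetical helper that prints the JSON body
# for the POST call. Arguments, in order:
#   broker URI, CA certificate, client certificate, client key.
# Values are inserted as-is, so they must not contain quotes or backslashes.
ssl_connector_payload() {
  printf '{"name":"mqtt-source-connector","config":{'
  printf '"connector.class":"com.sintef.asam.MqttSourceConnector",'
  printf '"tasks.max":"1",'
  printf '"mqtt.connector.broker.uri":"%s",' "$1"
  printf '"mqtt.connector.broker.topic":"test/#",'
  printf '"mqtt.connector.kafka.topic":"test",'
  printf '"mqtt.connector.ssl":true,'
  printf '"mqtt.connector.ssl.ca":"%s",' "$2"
  printf '"mqtt.connector.ssl.crt":"%s",' "$3"
  printf '"mqtt.connector.ssl.key":"%s"}}\n' "$4"
}

# Usage (requires a running Connect worker on port 19005):
#   ssl_connector_payload ssl://localhost:8883 /home/ca.crt /home/client.crt /home/client.key |
#     curl -s -X POST -H 'Content-Type: application/json' -d @- http://127.0.0.1:19005/connectors
```

Passing -d @- makes curl read the request body from standard input, so the JSON never has to be quoted on the command line.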
Start a Kafka console consumer subscribing to the topic test:
"path-to-kafka"/kafka_2.13-2.4.1/bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic test
Publish a message to the MQTT broker over SSL, for example with mosquitto_pub:
mosquitto_pub --url mqtts://127.0.0.1:8883/test --cafile /home/ca.crt --cert /home/client.crt --key /home/client.key --insecure --tls-version tlsv1.2 -m "Hello, world!"
and see the message appear, base64 encoded, in your Kafka consumer.