1 Star 0 Fork 1

谷动谷力/MQTT-Client-Examples

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
克隆/下载
贡献代码
同步代码
取消
提示: 由于 Git 不支持空文件夾,创建文件夹后会生成空的 .keep 文件
Loading...
README

NanoSDK

English

安装NanoSDK库

# Fetch NanoSDK, configure an out-of-tree Ninja build, then compile and install it.
git clone https://github.com/emqx/NanoSDK
cd NanoSDK
mkdir build
cd build
cmake -G Ninja ..
# To build with TLS enabled, configure with:
# cmake -G Ninja -DNNG_ENABLE_TLS=ON ..
# To build with SQLite enabled, configure with:
# cmake -G Ninja -DNNG_ENABLE_SQLITE=ON ..
ninja
ninja install

创建nng_socket

// Socket handle that the open call fills in.
nng_socket sock;
// NOTE(review): both calls below write to the same `sock`. Presumably only
// one of them is meant to be used, selected by the MQTT protocol version
// (the v5 variant for MQTT 5) -- confirm against the full example.
nng_mqtt_client_open(&sock);
nng_mqttv5_client_open(&sock);

创建work

work定义

/*
 * One unit of asynchronous work. The state-machine callback receives a
 * pointer to this structure and advances `state` on every invocation.
 */
struct work {
    /* Step of the state machine the callback will execute next. */
    enum {
        INIT,
        RECV,
        WAIT,
        SEND
    } state;
    /* Asynchronous I/O handle used for the receive, sleep and send calls. */
    nng_aio *aio;
    /* Message kept between the RECV and WAIT steps. */
    nng_msg *msg;
    /* Context on which the receive and send operations are issued. */
    nng_ctx ctx;
};

为socket创建n个work

// Number of work items to create for the socket.
static size_t nwork = 32;

// One pointer per work item.
// NOTE(review): `nwork` is not a constant expression, so this is a
// variable-length array -- confirm it is declared at function scope.
struct work *works[nwork];

// Allocate every work item against the same socket.
// NOTE(review): `i`, `sock` and alloc_work() are declared outside this
// snippet; the result of alloc_work() is stored without a check here.
for (i = 0; i < nwork; i++) {
  works[i] = alloc_work(sock);
}

创建并设置mqtt connect消息体

nng_msg *msg;
nng_mqtt_msg_alloc(&msg, 0);
nng_mqtt_msg_set_packet_type(msg, NNG_MQTT_CONNECT);
nng_mqtt_msg_set_connect_keep_alive(msg, 60);
nng_mqtt_msg_set_connect_clean_session(msg, true);
nng_mqtt_msg_set_connect_proto_version(msg, opts->version);
 nng_mqtt_msg_set_connect_user_name(msg, "admin");
 nng_mqtt_msg_set_connect_password(msg, "public");

创建并设置nng_dialer


// Register the callbacks for connect and disconnect events on the socket.
nng_mqtt_set_connect_cb(sock, connect_cb, &sock);
nng_mqtt_set_disconnect_cb(sock, disconnect_cb, NULL);

// Create a dialer for the broker URL taken from the options.
if ((rv = nng_dialer_create(&dialer, sock, opts->url)) != 0) {
    fatal("nng_dialer_create: %s", nng_strerror(rv));
}

// Configure TLS on the dialer only when it was requested.
if (opts->enable_ssl) {
    if ((rv = init_dialer_tls(dialer, opts->cacert, opts->cert,
                opts->key, opts->keypass)) != 0) {
        fatal("init_dialer_tls: %s", nng_strerror(rv));
    }
}

// Attach the CONNECT message to the dialer. A failure here is reported
// instead of being silently ignored.
if ((rv = nng_dialer_set_ptr(dialer, NNG_OPT_MQTT_CONNMSG, msg)) != 0) {
    fatal("nng_dialer_set_ptr: %s", nng_strerror(rv));
}

// Start dialing without blocking; report errors returned by the start call.
if ((rv = nng_dialer_start(dialer, NNG_FLAG_NONBLOCK)) != 0) {
    fatal("nng_dialer_start: %s", nng_strerror(rv));
}

状态机回调函数

本示例演示状态机的流转:从初始的INIT状态进入RECV状态,开始等待接收数据

RECV状态中获取收到的PUBLISH数据

WAIT状态中先取出收到的payload和topic,然后清空nng_msg并重用它组装一个新的PUBLISH消息,最后切换到SEND状态并发起异步发送

SEND状态获取发送结果,并返回到RECV状态中

void
client_cb(void *arg)
{
	struct work *work = arg;
	nng_msg *    msg;
	int          rv;

	switch (work->state) {

	case INIT:
		work->state = RECV;
		nng_ctx_recv(work->ctx, work->aio);
		break;

	case RECV:
		if ((rv = nng_aio_result(work->aio)) != 0) {
			fatal("nng_recv_aio", rv);
			work->state = RECV;
			nng_ctx_recv(work->ctx, work->aio);
			break;
		}

		work->msg   = nng_aio_get_msg(work->aio);
		work->state = WAIT;
		nng_sleep_aio(0, work->aio);
		break;

	case WAIT:
		msg = work->msg;

		// Get PUBLISH payload and topic from msg;
		uint32_t payload_len;
		uint8_t *payload =
		    nng_mqtt_msg_get_publish_payload(msg, &payload_len);
		uint32_t    topic_len;
		const char *recv_topic =
		    nng_mqtt_msg_get_publish_topic(msg, &topic_len);

		printf("RECV: '%.*s' FROM: '%.*s'\n", payload_len,
		    (char *) payload, topic_len, recv_topic);

		uint8_t *send_data = nng_alloc(payload_len);
		memcpy(send_data, payload, payload_len);

		nng_msg_header_clear(work->msg);
		nng_msg_clear(work->msg);

		// Send payload to topic "/nanomq/msg/transfer"
		char *topic = "/nanomq/msg/transfer";
		nng_mqtt_msg_set_packet_type(work->msg, NNG_MQTT_PUBLISH);
		nng_mqtt_msg_set_publish_topic(work->msg, topic);
		nng_mqtt_msg_set_publish_payload(
		    work->msg, send_data, payload_len);

		printf("SEND: '%.*s' TO:   '%s'\n", payload_len,
		    (char *) send_data, topic);

		nng_free(send_data, payload_len);
		nng_aio_set_msg(work->aio, work->msg);
		work->msg   = NULL;
		work->state = SEND;
		nng_ctx_send(work->ctx, work->aio);
		break;

	case SEND:
		if ((rv = nng_aio_result(work->aio)) != 0) {
			nng_msg_free(work->msg);
			fatal("nng_send_aio", rv);
		}
		work->state = RECV;
		nng_ctx_recv(work->ctx, work->aio);
		break;

	default:
		fatal("bad state!", NNG_ESTATE);
		break;
	}
}

启动回调函数

for (i = 0; i < nwork; i++) {
    client_cb(works[i]);
}

for (;;) {
    nng_msleep(3600000); // neither pause() nor sleep() portable
}

工程编译及运行

# Configure an out-of-tree Ninja build of the example project and compile it.
mkdir build
cd build
cmake -G Ninja ..
# To build with TLS enabled, configure with:
# cmake -G Ninja -DNNG_ENABLE_TLS=ON ..
# To build with SQLite enabled, configure with:
# cmake -G Ninja -DNNG_ENABLE_SQLITE=ON ..
ninja
mqtt_async: 
    -h, --help    
    --url           <url>    The url for mqtt broker 
                    ('mqtt-tcp://host:port' or 
                    'tls+mqtt-tcp://host:port')
                    [default: mqtt-tcp://127.0.0.1:1883]
    -n, --parallel  <number of works> (default: 32)
    -v, --version   <mqtt version> (default: 4)
    -u, --username  <username>
    -P, --password  <password>
    --sqlite        enable sqlite cache (default: false)
    -s, --secure    enable ssl/tls mode (default: disable)
    --cacert        <cafile path>
    -E, --cert      <cert file path>
    --key           <key file path>
    --pey_pass      <key password>
# Connect to the broker over plain TCP
./mqtt_async --url "mqtt-tcp://127.0.0.1:1883"
# Connect to the broker over TLS (-s enables ssl/tls mode)
./mqtt_async --url "tls+mqtt-tcp://127.0.0.1:8883" -s --cacert ca.crt --cert server.crt --key server.key 

NanoSDK实现案例

nanomq_cli

完整代码工程

mqtt-client-nng

NanoSDK源码

NanoSDK

马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
1
https://gitee.com/lojam/MQTT-Client-Examples.git
git@gitee.com:lojam/MQTT-Client-Examples.git
lojam
MQTT-Client-Examples
MQTT-Client-Examples
master

搜索帮助