本文将使用RT-Thread配合ART-Pi搭建MQTT客户端,快速接入EMQ X Cloud 。
EMQ X Cloud 是由 EMQ 公司推出的可连接海量物联网设备,集成各类数据库及业务系统的全托管云原生 MQTT 服务。作为全球首个全托管的 MQTT 5.0 公有云服务,EMQ X Cloud 提供了一站式运维代管、独有隔离环境的 MQTT 消息服务。
在万物互联的时代,EMQ X Cloud 可以帮助用户快速构建面向物联网领域的行业应用,轻松实现物联网数据的采集、传输、计算和持久化。
借助云服务商提供的基础计算设施,EMQ X Cloud 面向全球数十个国家与地区提供服务,为 5G 与万物互联应用提供低成本、安全可靠的云服务。
通过快速入门创建部署EMQX Cloud
以下是创建完成的实例
本文使用RT-Thread官方IDE RT-Thread-Studio来创建工程;
本次Demo使用的是RT-Thread官方的开发板 ART-Pi,通过板载Wifi模块进行联网,可以直接创建一个art_pi_wifi样例工程进行MQTT客户端的开发;
进入配置页面
选择“More”
启用RTC驱动
引入MQTT依赖包
启动TLS需设置mqtt线程栈大小 ≥ 6144
配置mbedtls
若选择用户CA证书(单/双向认证)
若选择无证书SSL连接(单向认证)
保存当前配置,IDE会将配置更新到工程中
修改宏 MEMP_NUM_NETDB 为 2
位于项目路径"rt-thread\components\net\lwip-2.0.2\src\include\lwip\opt.h:488"
/**
* MEMP_NUM_NETDB: the number of concurrently running lwip_addrinfo() calls
* (before freeing the corresponding memory using lwip_freeaddrinfo()).
*/
#if !defined MEMP_NUM_NETDB || defined __DOXYGEN__
#define MEMP_NUM_NETDB 2
#endif
打开applications/main.c,可见RT-Thread-Studio已经帮我们生成好了连接WiFi和LED操作的代码
#include <rtthread.h>
#include <rtdevice.h>
#include "drv_common.h"
#define LED_PIN GET_PIN(I, 8)
extern void wlan_autoconnect_init(void);
int main(void)
{
rt_uint32_t count = 1;
rt_pin_mode(LED_PIN, PIN_MODE_OUTPUT);
/* init Wi-Fi auto connect feature */
wlan_autoconnect_init();
/* enable auto reconnect on WLAN device */
rt_wlan_config_autoreconnect(RT_TRUE);
while(count++)
{
rt_thread_mdelay(500);
rt_pin_write(LED_PIN, PIN_HIGH);
rt_thread_mdelay(500);
rt_pin_write(LED_PIN, PIN_LOW);
}
return RT_EOK;
}
#include "stm32h7xx.h"
static int vtor_config(void)
{
/* Vector Table Relocation in Internal QSPI_FLASH */
SCB->VTOR = QSPI_BASE;
return 0;
}
INIT_BOARD_EXPORT(vtor_config);
为了实现第一次启动也能自动连接WiFi,我们可以在main()加入连接函数;
rt_wlan_connect(WIFI_SSID, WIFI_PASSWORD);
分别新建mqtt-client.c和mqtt-client.h;
创建mqtt client
static void mqtt_create(void)
{
/* init condata param by using MQTTPacket_connectData_initializer */
MQTTPacket_connectData condata = MQTTPacket_connectData_initializer;
static char client_id[50] = { 0 };
rt_memset(&client, 0, sizeof(MQTTClient));
/* config MQTT context param */
{
client.uri = MQTT_BROKER_URI;
/* config connect param */
memcpy(&client.condata, &condata, sizeof(condata));
rt_snprintf(client_id, sizeof(client_id), "%s%d",MQTT_CLIENTID, rt_tick_get());
client.condata.clientID.cstring = client_id;
client.condata.keepAliveInterval = 60;
client.condata.cleansession = 1;
client.condata.username.cstring = MQTT_USERNAME;
client.condata.password.cstring = MQTT_PASSWORD;
/* config MQTT will param. */
client.condata.willFlag = 1;
client.condata.will.qos = MQTT_QOS;
client.condata.will.retained = 0;
client.condata.will.topicName.cstring = MQTT_PUBTOPIC;
client.condata.will.message.cstring = MQTT_WILLMSG;
/* malloc buffer. */
client.buf_size = client.readbuf_size = MQTT_PUB_SUB_BUF_SIZE;
client.buf = rt_malloc(client.buf_size);
client.readbuf = rt_malloc(client.readbuf_size);
if (!(client.buf && client.readbuf))
{
rt_kprintf("no memory for MQTT client buffer!\n");
goto _exit;
}
/* set event callback function */
client.connect_callback = mqtt_connect_callback;
client.online_callback = mqtt_online_callback;
client.offline_callback = mqtt_offline_callback;
/* set subscribe table and event callback */
client.messageHandlers[0].topicFilter = rt_strdup(MQTT_SUBTOPIC);
client.messageHandlers[0].callback = mqtt_sub_callback;
client.messageHandlers[0].qos = MQTT_QOS;
/* set default subscribe event callback */
client.defaultMessageHandler = mqtt_sub_callback;
}
/* run mqtt client */
paho_mqtt_start(&client);
return;
_exit:
if (client.buf)
{
rt_free(client.buf);
client.buf = RT_NULL;
}
if (client.readbuf)
{
rt_free(client.readbuf);
client.readbuf = RT_NULL;
}
return;
}
对应 连接/订阅/上线/离线 回调函数
static void mqtt_connect_callback(MQTTClient *c)
{
LOG_D("mqtt_connect_callback!");
}
static void mqtt_sub_callback(MQTTClient *c, MessageData *msg_data)
{
sub_count++;
*((char *)msg_data->message->payload + msg_data->message->payloadlen) = '\0';
rt_kprintf("mqtt sub callback[%u]: \n topic: %.*s \n message %.*s",
sub_count,
msg_data->topicName->lenstring.len,
msg_data->topicName->lenstring.data,
msg_data->message->payloadlen,
(char *)msg_data->message->payload);
}
static void mqtt_online_callback(MQTTClient *c)
{
LOG_D("mqtt_online_callback!");
}
static void mqtt_offline_callback(MQTTClient *c)
{
LOG_D("mqtt_offline_callback!");
}
创建pub线程回调函数和mqtt客户端启动函数
static void thread_pub(void *parameter)
{
pub_data = rt_malloc(TEST_DATA_SIZE * sizeof(char));
if (!pub_data)
{
rt_kprintf("no memory for pub_data\n");
return;
}
start_tm = time((time_t *) RT_NULL);
rt_kprintf("test start at '%d'\r\n", start_tm);
while (1)
{
rt_snprintf(pub_data, TEST_DATA_SIZE, "Pub EMQX message-%d", pub_count);
if (!paho_mqtt_publish(&client, QOS1, MQTT_PUBTOPIC, pub_data))
{
++pub_count;
}
rt_thread_delay(PUB_CYCLE_TM);
}
}
void mqtt_client_start(void)
{
if (is_started)
{
return;
}
mqtt_create();
while (!client.isconnected)
{
rt_kprintf("Waiting for mqtt connection...\n");
rt_thread_delay(1000);
}
pub_thread_tid = rt_thread_create("pub_thread", thread_pub, RT_NULL, 1024, 8, 100);
if (pub_thread_tid != RT_NULL)
{
rt_thread_startup(pub_thread_tid);
}
is_started = 1;
return;
}
设置mqtt连接参数和用户名
#define EMQX_Cloud_Professional_Version 1
#define EMQX_Cloud_TLS_SSL 1
#if !EMQX_Cloud_Professional_Version
#if EMQX_Cloud_TLS_SSL
#define MQTT_BROKER_URI "ssl://ge06f1e1.cn-shenzhen.emqx.cloud:15455"
#else
#define MQTT_BROKER_URI "tcp://ge06f1e1.cn-shenzhen.emqx.cloud:15915"
#endif
#else
#if EMQX_Cloud_TLS_SSL
#define MQTT_BROKER_URI "ssl://oba9d641.emqx.cloud:8883"
#else
#define MQTT_BROKER_URI "tcp://oba9d641.emqx.cloud:1883"
#endif
#endif
#define MQTT_CLIENTID_PREFIX "rtthread-mqtt"
#define MQTT_USERNAME "EMQX_RTT"
#define MQTT_PASSWORD "emqx_rtt_0813"
#define MQTT_SUBTOPIC "/emqx/mqtt/sub"
#define MQTT_PUBTOPIC "/emqx/mqtt/pub"
#define MQTT_WILLMSG "Goodbye!"
#define MQTT_QOS 1
main()函数启动mqtt client
mqtt_client_start()
也可在终端手动启动/停止
mqtt_ctrl start
mqtt_ctrl stop
若选择用户CA证书验证,则将CA证书(双向认证还需client.crt和client.key)放置到packages/mbedtls-latest/certs文件夹中
重新更新工程,会自动将证书内容复制到源文件中
构建项目并下载到目标板上
打开终端Terminal可以看到运行日志
客户端连接配置
单向无证书认证
单向自签名证书认证
双向认证
订阅和发布
查看RT-Thread-Stdio终端
数据收发正常!
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。