Fetch the repository succeeded.
//
// Created by JIN on 25-4-17.
//
#include "paho_mqtt.h"
#include <algorithm>
#include <fstream>
#include <iostream>
#include <utility>
#include <unistd.h>
// Builds a client description from the broker address, the client id and the
// topic. No Paho handle is created here; that happens in init().
PAHO_MQTT::PAHO_MQTT(std::string address, std::string client_id, std::string topic)
    // MQTTAsync is itself a void* handle type and cannot be created with
    // `new`, so the handle simply starts out null.
    : client(nullptr),
      address(std::move(address)),
      client_id(std::move(client_id)),
      topic(std::move(topic))
{
}
// Destroys the Paho client handle if one was ever created.
PAHO_MQTT::~PAHO_MQTT()
{
    if (client == nullptr)
    {
        return; // nothing to release
    }
    // MQTTAsync_destroy takes the handle by address.
    MQTTAsync_destroy(&client);
}
// Creates the Paho async client for the stored address / client id and
// registers the connection-lost and message-arrived callbacks.
//
// Returns true when both steps succeed; otherwise logs the Paho return code
// and returns false.
bool PAHO_MQTT::init()
{
    // Step 1: create the client handle.
    const int create_rc = MQTTAsync_create(&client, address.c_str(), client_id.c_str(),
                                           MQTTCLIENT_PERSISTENCE_NONE, nullptr);
    if (create_rc != MQTTASYNC_SUCCESS)
    {
        printf("Failed to create &client object, return code %d\n", create_rc);
        return false;
    }

    // Step 2: register callbacks; `this` is handed back to them as context.
    const int callback_rc =
        MQTTAsync_setCallbacks(client, this, onConnectLost, onMessageArrived, nullptr);
    if (callback_rc != MQTTASYNC_SUCCESS)
    {
        printf("Failed to set callback, return code %d\n", callback_rc);
        return false;
    }

    return true;
}
// Starts an asynchronous connect to the broker and blocks (polling once per
// second) until onConnectSuccess / onConnectFailure reports completion.
//
// Returns true only when the connection was established. Returns false when
// the connect request could not be started or when the broker connection
// failed (the original returned true in the latter case).
//
// NOTE(review): connect_task_flag is not cleared before the request is
// issued, so a second call returns based on the flags left by the previous
// attempt. Clearing it here would make this function block until a callback
// fires, which must never happen on a Paho callback thread - confirm that no
// callback invokes connect() before adding a reset.
bool PAHO_MQTT::connect()
{
    printf("Start Connect MQTT Server...\n");

    // Describe the connect request; `this` is handed back to the callbacks.
    MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer;
    conn_opts.keepAliveInterval = 20;
    conn_opts.cleansession = 1;
    conn_opts.onSuccess = onConnectSuccess;
    conn_opts.onFailure = onConnectFailure;
    conn_opts.context = this;

    const int rc = MQTTAsync_connect(client, &conn_opts);
    if (rc != MQTTASYNC_SUCCESS)
    {
        printf("Failed to start connect, return code %d\n", rc);
        return false;
    }

    // Wait for one of the connect callbacks to mark the task as finished.
    while (!connect_task_flag)
    {
        sleep(1);
    }

    // Report the outcome recorded by the callback.
    if (!connect_state_flag)
    {
        printf("Connect MQTT Server Failed\n");
        return false;
    }

    printf("Connect MQTT Server Successful\n");
    return true;
}
// TODO: optimize this function
//
// Publishes one image file to the configured topic and blocks (polling once
// per second) until onSendSuccess / onSendFailure reports completion.
//
// img_path: path of the image; readImg() turns it into the wire payload.
//
// Returns true only when the message was delivered. Returns false when the
// client is not connected, the file cannot be read, the publish request
// cannot be started, or the delivery fails.
bool PAHO_MQTT::send_img(const std::string& img_path)
{
    // 1. Refuse to send while disconnected.
    if (!MQTTAsync_isConnected(client))
    {
        printf("Img Send Failed, Case MQTT Connect Lost\n");
        return false;
    }

    // 2. Load the image and build the payload.
    const std::optional<std::vector<uint8_t>> payload_opt = readImg(img_path);
    if (!payload_opt)
    {
        std::cerr << "Img Read Failed" << std::endl;
        return false;
    }
    const std::vector<uint8_t>& payload = *payload_opt;

    // File name used in the log lines. find_last_of() yields npos when the
    // path has no separator; npos + 1 wraps to 0, so the whole path is used
    // instead of substr() throwing std::out_of_range.
    const std::string file_name = img_path.substr(img_path.find_last_of("/\\") + 1);

    // 3. Publish over MQTT.
    MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
    opts.onSuccess = onSendSuccess;
    opts.onFailure = onSendFailure;
    opts.context = this;

    MQTTAsync_message message = MQTTAsync_message_initializer;
    // The Paho struct takes a non-const void*, hence the const_cast.
    message.payload = const_cast<void*>(static_cast<const void*>(payload.data()));
    message.payloadlen = static_cast<int>(payload.size());
    message.qos = QOS;
    message.retained = 0;

    // Clear the completion flag BEFORE issuing the request so that a result
    // left over from an earlier send is not mistaken for this one.
    send_task_flag = false;

    const int rc = MQTTAsync_sendMessage(client, topic.c_str(), &message, &opts);
    if (rc != MQTTASYNC_SUCCESS)
    {
        // Report the failure to the caller instead of terminating the process.
        printf("Failed to start sendMessage, return code %d\n", rc);
        return false;
    }

    // Wait for one of the send callbacks to mark the task as finished.
    while (!send_task_flag)
    {
        sleep(1);
    }

    // Report the outcome recorded by the callback.
    if (!send_state_flag)
    {
        printf("Send Img %s Failed\n", file_name.c_str());
        return false;
    }

    printf("Send Img %s Successful\n", file_name.c_str());
    return true;
}
void PAHO_MQTT::onConnectLost(void *context, char *cause)
{
printf("MQTT Connect Lost\n");
auto* self = static_cast<PAHO_MQTT*>(context);
self->connect();
}
int PAHO_MQTT::onMessageArrived(void *context, char *topicName, int topicLen, MQTTAsync_message *m)
{
printf("MQTT Connect messageArrived\n");
return 1;
}
// Paho connect-success callback: records a successful connection and marks
// the connect task as finished.
//
// context:  the PAHO_MQTT instance passed in the connect options.
// response: unused.
void PAHO_MQTT::onConnectSuccess(void *context, MQTTAsync_successData *response)
{
    (void)response;
    auto* self = static_cast<PAHO_MQTT*>(context);
    // Store the result before signalling completion, so a thread polling
    // connect_task_flag does not read a stale connect_state_flag.
    // NOTE(review): this ordering is only guaranteed across threads if the
    // flags are std::atomic<bool> - confirm in the class declaration.
    self->connect_state_flag = true;
    self->connect_task_flag = true;
}
// Paho connect-failure callback: records a failed connection and marks the
// connect task as finished.
//
// context:  the PAHO_MQTT instance passed in the connect options.
// response: unused.
void PAHO_MQTT::onConnectFailure(void *context, MQTTAsync_failureData *response)
{
    (void)response;
    auto* self = static_cast<PAHO_MQTT*>(context);
    // Store the result before signalling completion, so a thread polling
    // connect_task_flag does not read a stale connect_state_flag.
    // NOTE(review): this ordering is only guaranteed across threads if the
    // flags are std::atomic<bool> - confirm in the class declaration.
    self->connect_state_flag = false;
    self->connect_task_flag = true;
}
// Paho send-success callback: records a successful delivery and marks the
// send task as finished.
//
// context:  the PAHO_MQTT instance passed in the response options.
// response: unused.
void PAHO_MQTT::onSendSuccess(void *context, MQTTAsync_successData *response)
{
    (void)response;
    auto* self = static_cast<PAHO_MQTT*>(context);
    // Store the result before signalling completion, so a thread polling
    // send_task_flag does not read a stale send_state_flag.
    // NOTE(review): this ordering is only guaranteed across threads if the
    // flags are std::atomic<bool> - confirm in the class declaration.
    self->send_state_flag = true;
    self->send_task_flag = true;
}
// Paho send-failure callback: records a failed delivery and marks the send
// task as finished.
//
// context:  the PAHO_MQTT instance passed in the response options.
// response: unused.
void PAHO_MQTT::onSendFailure(void *context, MQTTAsync_failureData *response)
{
    (void)response;
    auto* self = static_cast<PAHO_MQTT*>(context);
    // Store the result before signalling completion, so a thread polling
    // send_task_flag does not read a stale send_state_flag.
    // NOTE(review): this ordering is only guaranteed across threads if the
    // flags are std::atomic<bool> - confirm in the class declaration.
    self->send_state_flag = false;
    self->send_task_flag = true;
}
// Reads an image file and wraps it into the wire payload:
//
//   [2 bytes: file-name length, low byte first][file name][image bytes]
//
// img_path: path of the image file. The file name is the part after the last
//           '/' or '\\' (the whole path when there is no separator), with
//           every ':' replaced by '_'.
//
// Returns the payload, or std::nullopt (after logging to std::cerr) when the
// file cannot be opened, its size cannot be determined, its name does not
// fit the 2-byte length field, or reading fails.
std::optional<std::vector<uint8_t>> PAHO_MQTT::readImg(const std::string& img_path)
{
    // Open positioned at the end so tellg() yields the file size.
    std::ifstream file(img_path, std::ios::binary | std::ios::ate);
    if (!file.is_open()) {
        std::cerr << "Failed to open image file: " << img_path << std::endl;
        return std::nullopt;
    }

    // tellg() reports -1 on failure; converting that to size_t would request
    // an enormous allocation, so reject it explicitly.
    const std::streamsize img_size = file.tellg();
    if (img_size < 0) {
        std::cerr << "Failed to read image file: " << img_path << std::endl;
        return std::nullopt;
    }
    file.seekg(0, std::ios::beg);

    // Extract the file name (npos + 1 wraps to 0 when there is no separator).
    std::string file_name = img_path.substr(img_path.find_last_of("/\\") + 1);
    // Replace colons with underscores; Windows does not allow ':' in names.
    std::replace(file_name.begin(), file_name.end(), ':', '_');

    // The length prefix is 16 bits wide. A longer name would be truncated in
    // the prefix while still being written in full, corrupting the frame.
    const std::size_t name_len = file_name.size();
    if (name_len > 0xFFFFu) {
        std::cerr << "Image file name too long: " << img_path << std::endl;
        return std::nullopt;
    }

    // Allocate the whole payload once and read the image straight into its
    // tail, avoiding an intermediate buffer and a second copy.
    const std::size_t header_len = 2;
    const std::size_t body_offset = header_len + name_len;
    std::vector<uint8_t> payload(body_offset + static_cast<std::size_t>(img_size));

    payload[0] = static_cast<uint8_t>(name_len & 0xFF);
    payload[1] = static_cast<uint8_t>((name_len >> 8) & 0xFF);
    std::copy(file_name.begin(), file_name.end(), payload.begin() + header_len);

    if (!file.read(reinterpret_cast<char*>(payload.data() + body_offset), img_size)) {
        std::cerr << "Failed to read image file: " << img_path << std::endl;
        return std::nullopt;
    }

    return payload;
}
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。