1 Star 0 Fork 0

LY/File_Monitor

Create your Gitee Account
Explore and code with more than 13.5 million developers,Free private repositories !:)
Sign up
文件
This repository doesn't specify license. Please pay attention to the specific project description and its upstream code dependency when using it.
Clone or Download
paho_mqtt.cpp 6.07 KB
Copy Edit Raw Blame History
LY authored 10 days ago . 抽离配置文件
//
// Created by JIN on 25-4-17.
//
#include "paho_mqtt.h"
#include <algorithm>
#include <fstream>
#include <iostream>
#include <utility>
#include <unistd.h>
PAHO_MQTT::PAHO_MQTT(std::string address, std::string client_id, std::string topic)
{
// MQTTAsync: 本身为 void* 类型,new 失败
client = nullptr;
this->address = std::move(address);
this->client_id = std::move(client_id);
this->topic = std::move(topic);
}
PAHO_MQTT::~PAHO_MQTT()
{
if (client != nullptr)
{
MQTTAsync_destroy(&client); // 使用 MQTTAsync_destroy 销毁 client
}
}
bool PAHO_MQTT::init()
{
int rc;
// 1. 创建客户端
if ((rc = MQTTAsync_create(&client, address.c_str(), client_id.c_str(),
MQTTCLIENT_PERSISTENCE_NONE, nullptr)) != MQTTASYNC_SUCCESS)
{
printf("Failed to create &client object, return code %d\n", rc);
return false;
}
// 2. 设置回调函数
if ((rc = MQTTAsync_setCallbacks(client, this, onConnectLost, onMessageArrived, nullptr)) != MQTTASYNC_SUCCESS)
{
printf("Failed to set callback, return code %d\n", rc);
return false;
}
return true;
}
bool PAHO_MQTT::connect()
{
int rc;
// 创建连接任务
printf("Start Connect MQTT Server...\n");
MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer;
conn_opts.keepAliveInterval = 20;
conn_opts.cleansession = 1;
conn_opts.onSuccess = onConnectSuccess;
conn_opts.onFailure = onConnectFailure;
conn_opts.context = this;
if ((rc = MQTTAsync_connect(client, &conn_opts)) != MQTTASYNC_SUCCESS)
{
printf("Failed to start connect, return code %d\n", rc);
return false;
}
// 等待连接任务完成
while (connect_task_flag == false)
{
sleep(1);
}
// 判断连接状态
if (connect_state_flag == true)
{
printf("Connect MQTT Server Successful\n");
return true;
}
else
{
printf("Connect MQTT Server Failed\n");
return true;
}
}
// TODO: 优化此函数
bool PAHO_MQTT::send_img(const std::string& img_path)
{
// 1. 判断连接状态
if (MQTTAsync_isConnected(client) != true)
{
printf("Img Send Failed, Case MQTT Connect Lost\n");
return false;
}
// 2. 读取图片数据
std::optional<std::vector<uint8_t>> payloadOpt = readImg(img_path);
if (!payloadOpt)
{
std::cerr << "Img Read Failed" << std::endl;
return false;
}
const auto& payload = *payloadOpt;
// 3. MQTT发送
int rc;
MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
MQTTAsync_message message = MQTTAsync_message_initializer;
opts.onSuccess = onSendSuccess;
opts.onFailure = onSendFailure;
opts.context = this;
message.payload = const_cast<void*>(reinterpret_cast<const void*>(payload.data()));
message.payloadlen = static_cast<int>(payload.size());
message.qos = QOS;
message.retained = 0;
if ((rc = MQTTAsync_sendMessage(client, topic.c_str(), &message, &opts)) != MQTTASYNC_SUCCESS)
{
printf("Failed to start sendMessage, return code %d\n", rc);
exit(EXIT_FAILURE);
}
// 等待发送任务完成
while (send_task_flag == false)
{
sleep(1);
}
// 判断发送状态
if (send_state_flag == true)
{
printf("Send Img %s Successful\n", img_path.substr(img_path.find_last_of("/\\")).c_str());
return true;
}
else
{
printf("Send Img %s Failed\n", img_path.substr(img_path.find_last_of("/\\")).c_str());
return true;
}
}
void PAHO_MQTT::onConnectLost(void *context, char *cause)
{
printf("MQTT Connect Lost\n");
auto* self = static_cast<PAHO_MQTT*>(context);
self->connect();
}
int PAHO_MQTT::onMessageArrived(void *context, char *topicName, int topicLen, MQTTAsync_message *m)
{
printf("MQTT Connect messageArrived\n");
return 1;
}
void PAHO_MQTT::onConnectSuccess(void *context, MQTTAsync_successData *response)
{
auto* self = static_cast<PAHO_MQTT*>(context);
self->connect_task_flag = true;
self->connect_state_flag = true;
}
void PAHO_MQTT::onConnectFailure(void *context, MQTTAsync_failureData *response)
{
auto* self = static_cast<PAHO_MQTT*>(context);
self->connect_task_flag = true;
self->connect_state_flag = false;
}
void PAHO_MQTT::onSendSuccess(void *context, MQTTAsync_successData *response)
{
auto* self = static_cast<PAHO_MQTT*>(context);
self->send_task_flag = true;
self->send_state_flag = true;
}
void PAHO_MQTT::onSendFailure(void *context, MQTTAsync_failureData *response)
{
auto* self = static_cast<PAHO_MQTT*>(context);
self->send_task_flag = true;
self->send_state_flag = false;
}
std::optional<std::vector<uint8_t>> PAHO_MQTT::readImg(const std::string& img_path)
{
std::ifstream file(img_path, std::ios::binary | std::ios::ate);
if (!file.is_open()) {
std::cerr << "Failed to open image file: " << img_path << std::endl;
return std::nullopt;
}
std::streamsize imgSize = file.tellg();
file.seekg(0, std::ios::beg);
std::vector<uint8_t> imageData(static_cast<size_t>(imgSize));
if (!file.read(reinterpret_cast<char*>(imageData.data()), imgSize)) {
std::cerr << "Failed to read image file: " << img_path << std::endl;
return std::nullopt;
}
// 提取文件名
std::string fileName = img_path.substr(img_path.find_last_of("/\\") + 1);
// 替换文件名中的冒号为下划线,避免 Windows 不允许的字符
std::replace(fileName.begin(), fileName.end(), ':', '_');
uint16_t nameLen = static_cast<uint16_t>(fileName.size());
// 构建 payload: 2字节长度 + 文件名 + 图片数据
std::vector<uint8_t> payload;
payload.reserve(2 + nameLen + imageData.size());
payload.push_back(static_cast<uint8_t>(nameLen & 0xFF));
payload.push_back(static_cast<uint8_t>((nameLen >> 8) & 0xFF));
payload.insert(payload.end(), fileName.begin(), fileName.end());
payload.insert(payload.end(), imageData.begin(), imageData.end());
return payload;
}
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
1
https://gitee.com/lystudio_top/file_monitor.git
git@gitee.com:lystudio_top/file_monitor.git
lystudio_top
file_monitor
File_Monitor
master

Search