Fetch the repository succeeded.
#pragma once
#include <ev.h>
#include <amqpcpp.h>
#include <amqpcpp/libev.h>
#include <openssl/ssl.h>
#include <openssl/opensslv.h>
#include <iostream>
#include <functional>
#include "log.hpp"
/// 默认初始化对应的异步事件监控 连接信道等;对外提供声明对应交换机 队列 绑定信息的函数,以及发布+订阅。
//注意对应异步线程去跑底层的IO监控,然后勿忘异步事件通知来break:
class MQclient
{
public:
using ptr = std::shared_ptr<MQclient>;
// 完成初始化网络底层IO以及与amqp框架,连接信道等--->一个程序中定义多个MQclient对象完成对应的不同的TcpConnection对象但是底层的连接是同一个只不过上层不是而已
MQclient(const std::string &user,
const std::string passwd,
const std::string host)
{
// 初始化网络底层IO以及与amqp框架联系起来:
_loop = EV_DEFAULT;
_handler = std::make_unique<AMQP::LibEvHandler>(_loop);
// amqp://root:123456@127.0.0.1:5672/
std::string url = "amqp://" + user + ":" + passwd + "@" + host + "/";
AMQP::Address address(url);
_connection = std::make_unique<AMQP::TcpConnection>(_handler.get(), address);
_channel = std::make_unique<AMQP::TcpChannel>(_connection.get());
// 这里因为默认主线程调用ev_run就会阻塞住,因此搞一个异步线程进行底层网络IO:
_loop_thread = std::thread([this]()
{ ev_run(_loop, 0); });
}
// 完成交换机 队列 绑定信息:
void DeclareComponents(const std::string &exchange,
const std::string &queue,
const std::string &binding_mg = "default_route",
AMQP::ExchangeType echange_type = AMQP::ExchangeType::direct)
{
_channel->declareExchange(exchange, echange_type)
.onError([](const char *message)
{
LOG_ERROR("声明交换机失败:{}", message);
exit(0); })
.onSuccess([exchange]()
{ LOG_ERROR("{} 交换机创建成功!", exchange); });
_channel->declareQueue(queue)
.onError([](const char *message)
{
LOG_ERROR("声明队列失败:{}", message);
exit(0); })
.onSuccess([queue]()
{ LOG_ERROR("{} 队列创建成功!", queue); });
_channel->bindQueue(exchange, queue, binding_mg)
.onError([exchange, queue](const char *message)
{
LOG_ERROR("{} - {} 绑定失败:", exchange, queue);
exit(0); })
.onSuccess([exchange, queue, binding_mg]()
{ LOG_ERROR("{} - {} - {} 绑定成功!", exchange, queue, binding_mg); });
}
// 消息发布(发布一次):
bool Publish(const std::string &exchange,
const std::string &msg,
const std::string &routing_key = "default_route")
{
LOG_DEBUG("向交换机 {}-{} 发布消息!", exchange, routing_key);
bool ret = _channel->publish(exchange, routing_key, msg);
if (ret == false)
{
LOG_ERROR("{} 发布消息失败:", exchange);
return false;
}
return true;
}
// 消息订阅:(只要订阅了,有消息就推送,除非取消订阅!)
using MessageCallback = std::function<void(const char *, size_t)>;
void Consume(const std::string &queue, const MessageCallback &cb)
{
LOG_DEBUG("开始订阅 {} 队列消息!", queue);
_channel->consume(queue, "consume-tag") // 返回值 DeferredConsumer
.onReceived([this, cb](const AMQP::Message &message,
uint64_t deliveryTag,
bool redelivered)
{
cb(message.body(), message.bodySize());
_channel->ack(deliveryTag); })
.onError([queue](const char *message)
{
LOG_ERROR("订阅 {} 队列消息失败: {}", queue, message);
exit(0); });
}
//析构的时候必须回收之前的那个loop的异步线程,那个线程在一直loop循环监听,因此需要异步让那个线程自动调用结束函数来释放对应的loop的资源
//以及退出对应的loop_run,结束线程跑的那个函数等待回收,
~MQclient()
{
ev_async_init(&_async_watcher, watcher_callback);//给这个异步事件注册对应回调函数
ev_async_start(_loop, &_async_watcher);//把_async_watcher这个异步事件添加到这个事件循环loop里
ev_async_send(_loop, &_async_watcher);//手动告诉这个loop里的线程去调用_async_watcher中的回调
_loop_thread.join();//完成回收
_loop = nullptr;
}
private:
// 注意this指针
static void watcher_callback(struct ev_loop *loop, ev_async *watcher, int32_t revents)
{
//对应线程默认传递对应参数给它
ev_break(loop, EVBREAK_ALL);//结束循环监测及异步线程的那个函数
}
private:
std::thread _loop_thread;
std::unique_ptr<AMQP::LibEvHandler> _handler;
std::unique_ptr<AMQP::TcpConnection> _connection;
std::unique_ptr<AMQP::TcpChannel> _channel;
struct ev_loop *_loop;
struct ev_async _async_watcher;
};
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。