1 Star 0 Fork 0

羑悻./An instant messaging system based on microservices

Create your Gitee Account
Explore and code with more than 14 million developers,Free private repositories !:)
Sign up
文件
This repository doesn't specify license. Please pay attention to the specific project description and its upstream code dependency when using it.
Clone or Download
rabbitmq.hpp 5.48 KB
Copy Edit Raw Blame History
羑悻. authored 2025-11-15 06:55 +08:00 . RabbitMQ模块封装更新
#pragma once
#include <ev.h>
#include <amqpcpp.h>
#include <amqpcpp/libev.h>
#include <openssl/ssl.h>
#include <openssl/opensslv.h>
#include <iostream>
#include <functional>
#include "log.hpp"
/// 默认初始化对应的异步事件监控 连接信道等;对外提供声明对应交换机 队列 绑定信息的函数,以及发布+订阅。
//注意对应异步线程去跑底层的IO监控,然后勿忘异步事件通知来break:
class MQclient
{
public:
using ptr = std::shared_ptr<MQclient>;
// 完成初始化网络底层IO以及与amqp框架,连接信道等--->一个程序中定义多个MQclient对象完成对应的不同的TcpConnection对象但是底层的连接是同一个只不过上层不是而已
MQclient(const std::string &user,
const std::string passwd,
const std::string host)
{
// 初始化网络底层IO以及与amqp框架联系起来:
_loop = EV_DEFAULT;
_handler = std::make_unique<AMQP::LibEvHandler>(_loop);
// amqp://root:123456@127.0.0.1:5672/
std::string url = "amqp://" + user + ":" + passwd + "@" + host + "/";
AMQP::Address address(url);
_connection = std::make_unique<AMQP::TcpConnection>(_handler.get(), address);
_channel = std::make_unique<AMQP::TcpChannel>(_connection.get());
// 这里因为默认主线程调用ev_run就会阻塞住,因此搞一个异步线程进行底层网络IO:
_loop_thread = std::thread([this]()
{ ev_run(_loop, 0); });
}
// 完成交换机 队列 绑定信息:
void DeclareComponents(const std::string &exchange,
const std::string &queue,
const std::string &binding_mg = "default_route",
AMQP::ExchangeType echange_type = AMQP::ExchangeType::direct)
{
_channel->declareExchange(exchange, echange_type)
.onError([](const char *message)
{
LOG_ERROR("声明交换机失败:{}", message);
exit(0); })
.onSuccess([exchange]()
{ LOG_ERROR("{} 交换机创建成功!", exchange); });
_channel->declareQueue(queue)
.onError([](const char *message)
{
LOG_ERROR("声明队列失败:{}", message);
exit(0); })
.onSuccess([queue]()
{ LOG_ERROR("{} 队列创建成功!", queue); });
_channel->bindQueue(exchange, queue, binding_mg)
.onError([exchange, queue](const char *message)
{
LOG_ERROR("{} - {} 绑定失败:", exchange, queue);
exit(0); })
.onSuccess([exchange, queue, binding_mg]()
{ LOG_ERROR("{} - {} - {} 绑定成功!", exchange, queue, binding_mg); });
}
// 消息发布(发布一次):
bool Publish(const std::string &exchange,
const std::string &msg,
const std::string &routing_key = "default_route")
{
LOG_DEBUG("向交换机 {}-{} 发布消息!", exchange, routing_key);
bool ret = _channel->publish(exchange, routing_key, msg);
if (ret == false)
{
LOG_ERROR("{} 发布消息失败:", exchange);
return false;
}
return true;
}
// 消息订阅:(只要订阅了,有消息就推送,除非取消订阅!)
using MessageCallback = std::function<void(const char *, size_t)>;
void Consume(const std::string &queue, const MessageCallback &cb)
{
LOG_DEBUG("开始订阅 {} 队列消息!", queue);
_channel->consume(queue, "consume-tag") // 返回值 DeferredConsumer
.onReceived([this, cb](const AMQP::Message &message,
uint64_t deliveryTag,
bool redelivered)
{
cb(message.body(), message.bodySize());
_channel->ack(deliveryTag); })
.onError([queue](const char *message)
{
LOG_ERROR("订阅 {} 队列消息失败: {}", queue, message);
exit(0); });
}
//析构的时候必须回收之前的那个loop的异步线程,那个线程在一直loop循环监听,因此需要异步让那个线程自动调用结束函数来释放对应的loop的资源
//以及退出对应的loop_run,结束线程跑的那个函数等待回收,
~MQclient()
{
ev_async_init(&_async_watcher, watcher_callback);//给这个异步事件注册对应回调函数
ev_async_start(_loop, &_async_watcher);//把_async_watcher这个异步事件添加到这个事件循环loop里
ev_async_send(_loop, &_async_watcher);//手动告诉这个loop里的线程去调用_async_watcher中的回调
_loop_thread.join();//完成回收
_loop = nullptr;
}
private:
// 注意this指针
static void watcher_callback(struct ev_loop *loop, ev_async *watcher, int32_t revents)
{
//对应线程默认传递对应参数给它
ev_break(loop, EVBREAK_ALL);//结束循环监测及异步线程的那个函数
}
private:
std::thread _loop_thread;
std::unique_ptr<AMQP::LibEvHandler> _handler;
std::unique_ptr<AMQP::TcpConnection> _connection;
std::unique_ptr<AMQP::TcpChannel> _channel;
struct ev_loop *_loop;
struct ev_async _async_watcher;
};
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
1
https://gitee.com/wang-yimingq/an-instant-messaging-system-based-on-microservices.git
git@gitee.com:wang-yimingq/an-instant-messaging-system-based-on-microservices.git
wang-yimingq
an-instant-messaging-system-based-on-microservices
An instant messaging system based on microservices
master

Search