# pub_sub_mq **Repository Path**: Ape-LHR/rabbit-mq ## Basic Information - **Project Name**: pub_sub_mq - **Description**: 基于C++开发的高性能消息队列系统,实现了类似RabbitMQ的核心功能。系统采用多线程架构,支持多种交换机类型、消息持久化、消息确认机制等企业级特性。 - **Primary Language**: C++ - **License**: Not specified - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 0 - **Forks**: 0 - **Created**: 2025-11-23 - **Last Updated**: 2025-12-13 ## Categories & Tags **Categories**: Uncategorized **Tags**: mq, Sqlite ## README # PubSubMQ - 高性能消息队列系统 ## 项目概述 PubSubMQ是一个基于C++开发的高性能消息队列系统,实现了类似RabbitMQ的核心功能。系统采用多线程架构,支持多种交换机类型、消息持久化、消息确认机制等企业级特性。 ## 系统架构 ### 架构图 #### 整体架构 ![输入图片说明](demo/image/system_architecture.png) #### 消息流 ![输入图片说明](demo/image/msg_flow.png) #### 组件依赖 ![输入图片说明](demo/image/component_dependency.png) #### 类层次 ![输入图片说明](demo/image/class_diagram.png) #### 数据流 ![输入图片说明](demo/image/data_flow.png) #### 网络通信 ![输入图片说明](demo/image/net_communication.png) ### 核心组件层次 1. **网络层**: 基于muduo网络库实现高性能网络通信 2. **协议层**: 使用Protobuf定义消息格式和RPC协议 3. **业务层**: 虚拟主机、交换机、队列、绑定等核心业务逻辑 4. **存储层**: SQLite元数据存储 + 文件系统消息持久化 ## 核心模块详解 ### 1. 协议定义 (common/) #### 消息格式 (message.proto) ```protobuf message Message { message Payload { Attribute attribute = 1; // 消息属性 string body = 2; // 消息内容 string valid = 3; // 有效性标志 }; Payload payload = 1; // 有效载荷 uint32 offset = 2; // 文件偏移量 uint32 length = 3; // 消息长度 } ``` #### 交换机类型 - `DIRECT`: 直接交换机,精确匹配路由键 - `FANOUT`: 广播交换机,忽略路由键 - `TOPIC`: 主题交换机,支持通配符匹配 #### 投递模式 - `UNDURABLE`: 非持久化消息 - `DURABLE`: 持久化消息 ### 2. 服务器端 (server/) #### BrokerServer (broker_server.hpp) **核心功能**: 消息队列服务器主入口,负责网络连接管理和请求分发 **主要特性**: - 基于muduo的异步事件驱动架构 - Protobuf协议编解码 - 请求分发器模式 - 线程池支持 **关键方法**: - `start()`: 启动服务器 - `OnPublishMsg()`: 处理消息发布 - `OnSubscribe()`: 处理队列订阅 #### VirtualHost (virtual_host.hpp) **核心功能**: 虚拟主机,管理交换机、队列、绑定和消息 **组件管理**: - `ExchangeManager`: 交换机管理器 - `MsgQueueManager`: 队列管理器 - `BindingManager`: 绑定管理器 - `MessageManager`: 消息管理器 **关键特性**: - 统一的资源管理接口 - 数据持久化支持 - 线程安全操作 #### ExchangeManager (exchange.hpp) **核心功能**: 交换机生命周期管理 **存储结构**: ```sql CREATE TABLE exchange_table( name VARCHAR(32) PRIMARY KEY, type INT, durable INT, auto_delete INT, args VARCHAR(128) ); ``` **支持操作**: - 创建/删除交换机 - 交换机信息持久化 - 交换机类型管理 #### MsgQueueManager (queue.hpp) **核心功能**: 消息队列管理 **队列属性**: - `durable`: 持久化标志 - `exclusive`: 排他性队列 - `auto_delete`: 自动删除 **存储结构**: ```sql CREATE TABLE queue_table( name VARCHAR(32) PRIMARY KEY, durable INT, exclusive INT, auto_delete INT, args VARCHAR(128) ); ``` #### MessageManager (message.hpp) **核心功能**: 消息存储和生命周期管理 **消息存储策略**: - 二进制文件存储消息内容 - 文本文件存储可读格式 - 支持消息垃圾回收 **文件结构**: - `queue_name.mqd`: 二进制消息数据文件 - `queue_name.txt`: 可读消息文本文件 - `queue_name.mqd.tmp`: 临时文件 **垃圾回收机制**: - 当持久化消息总量 > 2000 且有效比例 < 50%时触发 - 重新整理消息文件,删除无效消息 #### Router (router.hpp) **核心功能**: 消息路由匹配算法 **路由规则**: - `DIRECT`: 精确匹配 `routing_key == binding_key` - `FANOUT`: 无条件匹配,返回true - `TOPIC`: 通配符匹配算法 **通配符语义**: - `*`: 匹配单个单词 - `#`: 匹配0个或多个单词 **匹配算法**: 使用动态规划实现高效的模式匹配 #### BindingManager (binding.hpp) **核心功能**: 交换机和队列的绑定关系管理 **存储结构**: ```sql CREATE TABLE binding_table( exchange_name VARCHAR(32), msgqueue_name VARCHAR(32), binding_key VARCHAR(128) ); ``` ### 3. 客户端 (client/) #### Connection (connection.hpp) **核心功能**: 客户端连接管理 **主要特性**: - TCP连接管理 - 信道创建和销毁 - 异步消息处理 #### Channel (channel.hpp) **核心功能**: 信道操作封装 **支持操作**: - 交换机声明和管理 - 队列声明和管理 - 绑定关系管理 - 消息发布和消费 #### 示例客户端 **发布者客户端 (publish_client.cc)**: ```cpp // 创建连接和信道 auto conn = std::make_shared("127.0.0.1", 8085, async_worker); auto channel = conn->OpenChannel(); // 声明交换机和队列 channel->BuildExchange("exchange1", PubSubMQ::ExchangeType::TOPIC, true, false, args_map); channel->BuildQueue("queue1", true, false, false, args_map); // 发布消息 channel->PublishMsg("exchange1", &attribute, "Hello World"); ``` **消费者客户端 (consumer_client.cc)**: ```cpp // 消息回调函数 void OnMessageCallBack(PubSubMQ::Channel::ptr& channel, const std::string consumer_tag, const PubSubMQ::Attribute* attribute, const std::string& body) { std::cout << consumer_tag << " received: " << body << std::endl; channel->HandleAckMsg(attribute->msg_id()); // 消息确认 } // 订阅队列 channel->Subscribe("consumer1", "queue1", false, OnMessage); ``` ### 4. 公共组件 (common/) #### ThreadPool (threadpool.hpp) **核心功能**: 线程池实现,支持异步任务执行 #### Logger (logger.hpp) **核心功能**: 日志系统,支持不同级别的日志输出 #### Helper (helper.hpp) **工具函数**: - `UUIDHelper`: UUID生成 - `FileHelper`: 文件操作 - `StrHelper`: 字符串处理 - `SqliteHelper`: SQLite数据库操作 ## 消息流处理 ### 消息发布流程 1. 客户端通过信道发布消息到交换机 2. 交换机根据类型和绑定关系路由消息到队列 3. 消息管理器将消息持久化到文件系统 4. 返回发布确认 ### 消息消费流程 1. 消费者订阅队列并设置回调函数 2. 服务器推送队列中的消息到消费者 3. 消费者处理消息后发送确认 4. 服务器删除已确认的消息 ### 消息确认机制 - 支持自动确认和手动确认模式 - 确保消息至少被消费一次 - 防止消息丢失 ## 部署和运行 ### 环境要求 - C++17 编译器 - Protobuf 3.x - SQLite3 - muduo网络库 ### 编译步骤 ```bash cd build make ``` ### 启动服务器 ```bash cd build ./server ``` ### 运行示例 ```bash # 启动发布者 cd build ./publish_client # 启动消费者(指定队列名) ./consumer_client queue1 ``` ## 配置说明 ### 服务器配置 - 默认端口: 8085 - 数据目录: ./data/ - 元数据库: ./data/meta.db ### 客户端配置 - 服务器地址: 127.0.0.1 - 服务器端口: 8085 - 异步工作线程数: 可配置 ## 性能特性 ### 高可用性 - 消息持久化到磁盘 - 支持消息确认机制 - 自动故障恢复 ### 高性能 - 多线程架构 - 异步I/O操作 - 高效的内存管理 ### 可扩展性 - 模块化设计 - 支持多种交换机类型 - 灵活的绑定规则 ## 测试用例 项目包含完整的测试套件,覆盖以下功能: - 交换机管理测试 - 队列管理测试 - 绑定关系测试 - 消息路由测试 - 持久化测试 运行测试: ```bash cd test make ``` ## 开发指南 ### 添加新的交换机类型 1. 在 `message.proto` 中定义新的枚举值 2. 在 `Router::Route()` 方法中添加路由逻辑 3. 在客户端和服务器端添加相应的处理逻辑 ### 自定义消息属性 修改 `Attribute` 消息结构,添加所需的字段。 ### 扩展存储后端 实现新的 `MessageStore` 类,支持不同的存储引擎。 ## 故障排除 ### 常见问题 1. **端口占用**: 检查8085端口是否被占用 2. **权限问题**: 确保对数据目录有读写权限 3. **依赖缺失**: 确认所有第三方库已正确安装 ### 日志查看 系统日志输出到标准输出,包含详细的调试信息。