# 超级美眉-消息队列 **Repository Path**: qiuwenwu91/mm_queue ## Basic Information - **Project Name**: 超级美眉-消息队列 - **Description**: 用于服务端开发,方便实现消息队列,实现按顺序处理数据,避免数据冲突。 - **Primary Language**: NodeJS - **License**: MIT - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 0 - **Forks**: 0 - **Created**: 2025-12-07 - **Last Updated**: 2026-03-31 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README # mm_queue [![npm version](https://img.shields.io/npm/v/mm_queue.svg)](https://www.npmjs.com/package/mm_queue) [![License](https://img.shields.io/npm/l/mm_queue.svg)](https://gitee.com/qiuwenwu91/mm_queue/blob/main/LICENSE) [![Node.js Version](https://img.shields.io/node/v/mm_queue.svg)](https://nodejs.org) [![GitHub stars](https://img.shields.io/github/stars/qiuwenwu91/mm_queue.svg?style=social&label=Star)](https://gitee.com/qiuwenwu91/mm_queue) [English](./README_EN.md) | 中文 ## 消息队列抽象层 一个支持多种消息队列实现的抽象层,提供统一的API接口,支持内存、Redis、RabbitMQ等多种队列后端。 ### 特性 #### 核心特性 - ✅ **统一抽象接口** - 提供一致的API,支持多种队列实现 - ✅ **插件化架构** - 易于扩展新的队列实现 - ✅ **类型安全** - 完整的TypeScript类型定义 - ✅ **异步操作** - 所有方法返回Promise,支持async/await #### 支持的队列类型 - ✅ **内存队列** - 高性能内存实现,适合单机应用 - ✅ **Redis队列** - 基于Redis的分布式队列,支持持久化 - ✅ **RabbitMQ队列** - 企业级消息队列,支持复杂路由 #### 功能特性 - ✅ **消息存储和顺序处理** - 保证FIFO(先进先出)顺序 - ✅ **异步消费模式** - 支持多个消费者并发处理 - ✅ **回调通知机制** - 消息处理完成后通知入队者 - ✅ **请求-响应模式** - 支持类似RPC的请求响应场景 - ✅ **队列管理功能** - 长度限制、状态监控、清空操作 - ✅ **错误处理** - 消费者错误不影响队列运行 - ✅ **消息ID跟踪** - 每个消息有唯一标识,支持结果查询 ## 安装 ```bash npm install mm_queue ``` ## 快速开始 ### 安装依赖 ```bash npm install mm_queue # 如果需要Redis或RabbitMQ支持,还需要安装相应依赖 npm install redis amqplib ``` ### 基本使用 ```javascript const { create, TYPE } = require('mm_queue'); // 创建内存队列实例 const queue = create(TYPE.MEMORY, { max_size: 100, name: 'my_queue' }); // 注册消费者 await queue.consume(async (data) => { console.log('处理消息:', data); return `处理结果: ${data}`; }); // 入队消息 const msgId = await queue.enqueue('测试消息', (result) => { console.log('回调收到结果:', result.results[0].result); }); console.log('消息ID:', msgId); ``` ### 多队列类型示例 ```javascript // 内存队列(无需外部依赖) const memory_queue = create(TYPE.MEMORY, { max_size: 1000 }); // Redis队列(需要Redis服务) const redis_queue = create(TYPE.REDIS, { host: 'localhost', port: 6379, name: 'redis_queue' }); // RabbitMQ队列(需要RabbitMQ服务) const rabbitmq_queue = create(TYPE.RABBITMQ, { host: 'localhost', port: 5672, name: 'rabbitmq_queue' }); // 所有队列使用相同的API await memory_queue.enqueue('内存消息'); await redis_queue.enqueue('Redis消息'); await rabbitmq_queue.enqueue('RabbitMQ消息'); ``` ### 请求-响应模式 ```javascript // 注册请求处理器 await queue.consume(async (request) => { if (request.type === 'query') { return { status: 'success', data: `响应数据: ${request.payload}` }; } }); // 发送请求 await queue.enqueue({ type: 'query', payload: '用户数据' }, (response) => { console.log('收到响应:', response); }); ``` ## API 文档 ### 核心接口 #### QueueInterface 抽象类 所有队列实现都必须遵循此接口: ```javascript /** * 消息队列接口 * @interface */ class QueueInterface { /** * 入队消息 * @param {any} data - 消息数据 * @param {Function} [callback] - 处理完成回调 * @returns {Promise} 消息ID */ async enqueue(data, callback) {} /** * 出队消息 * @returns {Promise} 消息对象 */ async dequeue() {} /** * 注册消费者 * @param {Function} handler - 消息处理函数 * @returns {Promise} */ async consume(handler) {} /** * 获取队列长度 * @returns {Promise} */ async size() {} /** * 检查队列是否为空 * @returns {Promise} */ async isEmpty() {} /** * 清空队列 * @returns {Promise} */ async clear() {} /** * 获取队列状态 * @returns {Promise} */ async status() {} /** * 获取消息处理结果 * @param {string} message_id - 消息ID * @returns {Promise} */ async get(message_id) {} /** * 删除消息 * @param {string} message_id - 消息ID * @returns {Promise} */ async del(message_id) {} } ``` ### 工厂方法 #### create(type, config) 创建队列实例 参数: - `type` (TYPE) - 队列类型 - `config` (Object) - 队列配置 返回值:`Queue` - 队列实例 #### getTypes() 获取支持的队列类型 返回值:`Array` - 支持的队列类型列表 #### isSupported(type) 检查是否支持指定队列类型 参数: - `type` (string) - 队列类型 返回值:`boolean` - 是否支持 ### 队列类型枚举 ```javascript /** * 队列类型枚举 * @enum {string} */ const TYPE = { MEMORY: 'memory', // 内存队列 REDIS: 'redis', // Redis队列 RABBITMQ: 'rabbitmq' // RabbitMQ队列 }; ``` ### 配置参数 #### 通用配置 - `max_size` (Number) - 队列最大长度,默认无限制 - `name` (String) - 队列名称,用于标识 #### Redis配置 - `host` (String) - Redis主机地址,默认'localhost' - `port` (Number) - Redis端口,默认6379 - `password` (String) - Redis密码,可选 - `db` (Number) - Redis数据库,默认0 #### RabbitMQ配置 - `host` (String) - RabbitMQ主机地址,默认'localhost' - `port` (Number) - RabbitMQ端口,默认5672 - `username` (String) - 用户名,默认'guest' - `password` (String) - 密码,默认'guest' - `vhost` (String) - 虚拟主机,默认'/' ## 示例 ### 基本示例 ```javascript const { create, Type } = require('./index'); async function main() { // 创建内存队列 const queue = create(Type.MEMORY, { max_size: 100 }); // 注册消费者 await queue.consume(async (data) => { console.log('处理消息:', data); return `处理完成: ${data}`; }); // 入队消息 await queue.enqueue('消息1'); await queue.enqueue('消息2'); // 获取状态 const status = await queue.status(); console.log('队列状态:', status); } main().catch(console.error); ``` ### 回调通知示例 ```javascript const { create, Type } = require('./index'); async function main() { const queue = create(Type.MEMORY); // 注册消费者 await queue.consume(async (data) => { console.log('处理消息:', data); return { result: 'success', data: `已处理: ${data}` }; }); // 入队带回调的消息 await queue.enqueue('重要任务', (result) => { console.log('回调通知:'); console.log(' 消息ID:', result.message_id); console.log(' 处理结果:', result.results[0].result); }); } main().catch(console.error); ``` ### 请求-响应模式 ```javascript const { create, Type } = require('./index'); async function main() { const queue = create(Type.MEMORY); // 注册请求处理器 await queue.consume(async (request) => { if (request.type === 'get_user') { return { user: { id: 1, name: '张三' } }; } return { error: '未知请求类型' }; }); // 发送请求 await queue.enqueue({ type: 'get_user', user_id: 1 }, (response) => { console.log('收到响应:', response.results[0].result); }); } main().catch(console.error); ``` ### 多队列类型切换示例 ```javascript const { create_queue, QueueType } = require('./index'); async function test_queue(type, config) { console.log(`\n测试 ${type} 队列:`); try { const queue = await create_queue(type, config); // 注册消费者 await queue.consume(async (data) => { console.log(`[${type}] 处理消息:`, data); return `处理完成: ${data}`; }); // 入队消息 const msgId = await queue.enqueue(`测试消息 - ${type}`); console.log(`[${type}] 消息ID:`, msgId); // 检查状态 const status = await queue.status(); console.log(`[${type}] 队列状态:`, status); // 关闭连接 await queue.close(); } catch (err) { console.error(`[${type}] 错误:`, err.message); } } async function main() { // 测试内存队列 await test_queue(QueueType.MEMORY, { max_size: 10 }); // 测试Redis队列(需要Redis服务) await test_queue(QueueType.REDIS, { host: 'localhost', port: 6379 }); // 测试RabbitMQ队列(需要RabbitMQ服务) await test_queue(QueueType.RABBITMQ, { host: 'localhost', port: 5672 }); } main().catch(console.error); ``` ## 高级用法 ### 批量处理 ```javascript const { create_queue, QueueType } = require('./index'); async function batch_processing() { const queue = await create_queue(QueueType.MEMORY); // 注册批量处理器 await queue.consume(async (data) => { console.log('批量处理消息:', data); return { processed: true, timestamp: Date.now() }; }); // 批量入队消息 const messages = ['消息1', '消息2', '消息3']; const promises = messages.map(msg => queue.enqueue(msg)); // 等待所有消息入队 await Promise.all(promises); console.log('批量处理完成'); } batch_processing().catch(console.error); ``` ### 自定义消息处理 ```javascript const { create_queue, QueueType } = require('./index'); async function custom_processing() { const queue = await create_queue(QueueType.MEMORY); // 复杂消息处理器 await queue.consume(async (message) => { switch (message.type) { case 'email': // 处理邮件 return await send_email(message); case 'notification': // 处理通知 return await send_notification(message); case 'report': // 生成报告 return await generate_report(message); default: return { error: '未知消息类型' }; } }); // 发送不同类型的消息 await queue.enqueue({ type: 'email', to: 'user@example.com', subject: '测试邮件' }); await queue.enqueue({ type: 'notification', user_id: 1, message: '系统通知' }); } custom_processing().catch(console.error); ``` ### 队列监控 ```javascript const { create_queue, QueueType } = require('./index'); async function monitor_queue() { const queue = await create_queue(QueueType.MEMORY); // 定期监控队列状态 setInterval(async () => { try { const status = await queue.status(); console.log('队列监控:'); console.log(' 长度:', status.size); console.log(' 消费者:', status.consumers); console.log(' 处理中:', status.processing); console.log(' 待处理回调:', status.pending_callbacks); } catch (err) { console.error('监控错误:', err.message); } }, 5000); // 模拟消息处理 await queue.consume(async (data) => { await new Promise(resolve => setTimeout(resolve, 1000)); return `处理完成: ${data}`; }); // 持续入队消息 for (let i = 0; i < 10; i++) { await queue.enqueue(`监控消息${i}`); await new Promise(resolve => setTimeout(resolve, 2000)); } } monitor_queue().catch(console.error); ``` ## 架构说明 ### 抽象层设计 新的消息队列抽象层采用插件化架构,包含以下核心组件: ``` mm_queue/ ├── index.js # 主入口,提供工厂方法和统一接口 ├── interface.js # QueueInterface抽象类定义 ├── factory.js # QueueFactory工厂类 └── impl/ # 具体实现 ├── memory.js # 内存队列实现 ├── redis.js # Redis队列实现 └── rabbitmq.js # RabbitMQ队列实现 ``` ### 设计模式 - **抽象工厂模式**:通过`QueueFactory`统一创建不同队列实例 - **策略模式**:不同的队列实现可以动态切换 - **适配器模式**:将第三方队列系统适配到统一接口 ### 扩展性 要添加新的队列实现,只需: 1. 在`impl`目录下创建新的实现文件 2. 实现`QueueInterface`接口的所有方法 3. 在`factory.js`中注册新的队列类型 4. 更新`QueueType`枚举 ### 性能考虑 - **内存队列**:适合单机应用,性能最高 - **Redis队列**:适合分布式应用,支持持久化 - **RabbitMQ队列**:适合企业级应用,支持复杂路由 ## 最佳实践 ### 1. 选择合适的队列类型 - **单机应用**:使用内存队列,性能最优 - **分布式应用**:使用Redis队列,支持多实例 - **企业级应用**:使用RabbitMQ队列,功能最全 ### 2. 错误处理策略 ```javascript try { await queue.enqueue(data); } catch (err) { console.error('入队失败:', err.message); // 重试逻辑或降级处理 } ``` ### 3. 资源管理 ```javascript // 使用完毕后关闭连接 await queue.close(); ``` ### 4. 消费者设计 ```javascript // 消费者应该处理所有可能的错误 await queue.consume(async (data) => { try { return await process_data(data); } catch (err) { console.error('处理失败:', err.message); return { error: err.message }; } }); ``` ## 命名规范 本项目严格遵循以下命名规范: - **单字优先**: 所有命名首先尝试使用一个单词 - **禁用废话**: 避免使用Manager、Processor、Handler等宽泛词汇 - **使用缩写**: 使用行业通用缩写保持简洁 ## 许可证 MIT