# php_redis_mq **Repository Path**: lzw7758/php_redis_mq ## Basic Information - **Project Name**: php_redis_mq - **Description**: No description available - **Primary Language**: Unknown - **License**: MIT - **Default Branch**: main - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 0 - **Forks**: 0 - **Created**: 2025-08-07 - **Last Updated**: 2025-08-07 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README # PHP Redis 消息队列 一个功能完整的PHP Redis消息队列实现,支持可靠队列、重试机制、死信队列、多进程消费等功能。 ## 特性 - ✅ **可靠队列**: 使用BRPOPLPUSH确保消息不丢失 - ✅ **重试机制**: 指数退避重试策略 - ✅ **死信队列**: 处理无法处理的消息 - ✅ **进程管理**: 心跳检测和优雅关闭 - ✅ **监控告警**: 完整的队列状态监控 - ✅ **多进程支持**: 支持多进程并发消费 - ✅ **连接管理**: 自动重连和连接池 - ✅ **错误处理**: 完善的异常处理机制 - ✅ **优先级队列**: 支持高、中、低优先级 - ✅ **延迟队列**: 支持延迟消息处理 - ✅ **批量操作**: 支持批量推送消息 - ✅ **Web管理界面**: 提供可视化的队列管理界面 ## 安装 ```bash composer require Lzw7758/php_redis_mq ``` ## 快速开始 ### 基本使用 ```php '127.0.0.1', 'port' => 6379, 'password' => null, 'database' => 0, 'max_retry_count' => 3, ]); // 推送消息 $messageId = $queueManager->push('email_queue', [ 'to' => 'user@example.com', 'subject' => '测试邮件', 'body' => '这是一封测试邮件' ]); // 消费消息 $queueManager->consume('email_queue', function($message, $messageData) { echo "处理消息: " . $messageData['id'] . "\n"; // 处理消息逻辑 $result = sendEmail($message['to'], $message['subject'], $message['body']); return $result; // 返回true表示处理成功,false表示处理失败 }); ``` ### 高级功能 #### 优先级队列 ```php // 高优先级消息 $queueManager->push('email_queue', $message, 'high'); // 低优先级消息 $queueManager->push('email_queue', $message, 'low'); ``` #### 延迟队列 ```php // 延迟60秒处理 $queueManager->pushDelayed('email_queue', $message, 60); ``` #### 批量推送 ```php $messages = [ ['to' => 'user1@example.com', 'subject' => '邮件1'], ['to' => 'user2@example.com', 'subject' => '邮件2'], ['to' => 'user3@example.com', 'subject' => '邮件3'], ]; $messageIds = $queueManager->pushBatch('email_queue', $messages); ``` #### 多进程消费 ```php // 启动4个进程并发消费 $queueManager->consumeMulti('email_queue', function($message, $messageData) { // 消息处理逻辑 return true; }, [], 4); ``` ## 常量定义 项目提供了完整的常量定义,用于统一配置管理和错误处理: ### 使用常量 ```php use Lzw7758\PhpRedisMq\Constants; // 使用预定义的优先级 $messageId = $queueManager->push('email_queue', $message, Constants::PRIORITY_HIGH); // 使用预定义的消息状态 $messageData[Constants::MESSAGE_FIELD_STATUS] = Constants::MESSAGE_STATUS_PENDING; // 使用预定义的错误码 if ($error) { throw new Exception(Constants::getErrorMessage(Constants::ERROR_CODE_MESSAGE_PUSH_FAILED)); } // 获取默认配置 $config = Constants::getDefaultConfig(); $consumerOptions = Constants::getDefaultConsumerOptions(); ``` ### 主要常量分类 - **队列类型常量**: 定义不同类型的队列前缀 - **优先级常量**: 定义消息优先级级别 - **消息状态常量**: 定义消息生命周期状态 - **工作者状态常量**: 定义工作者运行状态 - **重试策略常量**: 定义消息重试策略 - **错误码常量**: 定义系统错误码 - **配置常量**: 定义默认配置选项 详细使用说明请参考 [常量使用文档](docs/constants_usage.md)。 ## 消息处理器系统 项目提供了完整的消息处理器系统,支持灵活、可扩展的消息处理机制: ### 使用消息处理器 ```php use Lzw7758\PhpRedisMq\HandlerManager; use Lzw7758\PhpRedisMq\Handlers\EmailMessageHandler; use Lzw7758\PhpRedisMq\Constants; // 创建处理器管理器 $handlerManager = new HandlerManager(); // 创建并注册邮件处理器 $emailHandler = new EmailMessageHandler([ 'email' => [ 'default_from' => 'noreply@example.com', 'smtp_host' => 'localhost', ] ]); $handlerManager->registerHandler($emailHandler); // 处理消息 $message = [ Constants::MESSAGE_FIELD_TYPE => 'email', 'to' => 'user@example.com', 'subject' => '测试邮件', 'body' => '邮件内容' ]; $results = $handlerManager->handleMessage($message, $messageData); ``` ### 主要特性 - **灵活的消息处理**: 支持多种消息类型和处理器 - **完整的生命周期管理**: 初始化、处理、清理 - **强大的错误处理**: 重试策略和异常处理 - **丰富的监控功能**: 统计信息和健康检查 - **可扩展的架构**: 易于创建自定义处理器 详细使用说明请参考 [消息处理器文档](docs/message_handlers.md)。 ## Web管理界面 项目提供了完整的Web管理界面,支持可视化的队列监控和管理: ### 快速集成 1. **安装包** ```bash composer require Lzw7758/php_redis_mq ``` 2. **配置路由** ```php // 在 config/route.php 中添加 Route::any('queue-manage/:action?', function ($action = 'index') { $handler = new \Lzw7758\PhpRedisMq\Web\ThinkPhpWebHandler(); $_GET['action'] = $action; return $handler->handle(); }); ``` 3. **访问管理界面** ``` http://your-domain.com/queue-manage/ ``` ### 主要特性 - ✅ **零配置**: 无需编写控制器代码 - ✅ **一行路由**: 一行配置搞定所有功能 - ✅ **完整功能**: 支持所有队列管理操作 - ✅ **可扩展**: 支持自定义配置和中间件 - ✅ **ThinkPHP原生**: 完全兼容ThinkPHP框架 详细使用说明请参考 [ThinkPHP集成指南](docs/thinkphp_integration_guide.md)。 ## 配置选项 ```php $config = [ // Redis连接配置 'host' => '127.0.0.1', 'port' => 6379, 'password' => null, 'database' => 0, // 连接超时配置 'connection_timeout' => 5, 'read_timeout' => 5, 'write_timeout' => 5, // 重试配置 'retry_attempts' => 3, 'retry_delay' => 1000000, // 1秒 'max_retry_count' => 3, // 处理配置 'max_processing_time' => 300, 'heartbeat_interval' => 30, ]; ``` ## 监控和管理 ### 获取队列统计信息 ```php $stats = $queueManager->getQueueStats('email_queue'); echo "主队列长度: " . $stats['main_queue_length'] . "\n"; echo "处理中队列长度: " . $stats['processing_length'] . "\n"; echo "死信队列长度: " . $stats['dead_queue_length'] . "\n"; ``` ### 获取性能指标 ```php $metrics = $queueManager->getPerformanceMetrics('email_queue'); echo "成功率: " . $metrics['success_rate'] . "%\n"; echo "失败率: " . $metrics['failure_rate'] . "%\n"; echo "处理速率: " . $metrics['processing_rate_per_minute'] . " 消息/分钟\n"; ``` ### 清理过期数据 ```php // 清理24小时前的数据 $result = $queueManager->cleanupExpiredData('email_queue', 86400); echo "清理了 " . $result['dead_messages'] . " 条死信消息\n"; ``` ### 处理重试队列 ```php $result = $queueManager->processRetryQueues('email_queue'); echo "处理了 " . $result['processed'] . " 条重试消息\n"; ``` ## 完整示例 ### 生产者示例 ```php '127.0.0.1', 'port' => 6379, 'database' => 0, ]); // 发送普通消息 $messageId = $queueManager->push('email_queue', [ 'to' => 'user@example.com', 'subject' => '欢迎邮件', 'body' => '欢迎使用我们的服务!' ]); // 发送高优先级消息 $messageId = $queueManager->push('email_queue', [ 'to' => 'admin@example.com', 'subject' => '紧急通知', 'body' => '系统出现异常,请立即处理!' ], 'high'); // 发送延迟消息 $messageId = $queueManager->pushDelayed('email_queue', [ 'to' => 'user@example.com', 'subject' => '提醒邮件', 'body' => '您的订单即将到期,请及时处理。' ], 3600); // 1小时后发送 echo "消息发送成功,ID: {$messageId}\n"; ``` ### 消费者示例 ```php '127.0.0.1', 'port' => 6379, 'database' => 0, ]); // 定义消息处理函数 $messageHandler = function($message, $messageData) { echo "处理消息: " . $messageData['id'] . "\n"; echo "优先级: " . $messageData['priority'] . "\n"; echo "重试次数: " . $messageData['attempts'] . "\n"; try { // 模拟邮件发送 $result = sendEmail($message['to'], $message['subject'], $message['body']); if ($result) { echo "邮件发送成功: {$message['to']}\n"; return true; // 处理成功 } else { echo "邮件发送失败: {$message['to']}\n"; return false; // 处理失败,将进入重试队列 } } catch (Exception $e) { echo "处理异常: " . $e->getMessage() . "\n"; return false; // 处理失败 } }; // 启动消费者 $queueManager->consume('email_queue', $messageHandler, [ 'timeout' => 5, 'max_memory' => 128 * 1024 * 1024, // 128MB 'max_execution_time' => 3600, ]); ``` ### 多进程消费者示例 ```php '127.0.0.1', 'port' => 6379, 'database' => 0, ]); // 启动4个进程并发消费 $queueManager->consumeMulti('email_queue', function($message, $messageData) { echo "进程 " . getmypid() . " 处理消息: " . $messageData['id'] . "\n"; // 处理消息逻辑 $result = sendEmail($message['to'], $message['subject'], $message['body']); return $result; }, [ 'timeout' => 5, 'max_memory' => 128 * 1024 * 1024, 'max_execution_time' => 3600, ], 4); ``` ### 监控脚本示例 ```php '127.0.0.1', 'port' => 6379, 'database' => 0, ]); // 获取队列统计信息 $stats = $queueManager->getQueueStats('email_queue'); echo "=== 队列统计信息 ===\n"; echo "主队列长度: " . $stats['main_queue_length'] . "\n"; echo "高优先级队列长度: " . $stats['high_queue_length'] . "\n"; echo "低优先级队列长度: " . $stats['low_queue_length'] . "\n"; echo "处理中队列长度: " . $stats['processing_length'] . "\n"; echo "延迟队列长度: " . $stats['delayed_queue_length'] . "\n"; echo "死信队列长度: " . $stats['dead_queue_length'] . "\n"; echo "总长度: " . $stats['total_length'] . "\n"; // 获取性能指标 $metrics = $queueManager->getPerformanceMetrics('email_queue'); echo "\n=== 性能指标 ===\n"; echo "总推送数: " . $metrics['total_pushed'] . "\n"; echo "总处理数: " . $metrics['total_processed'] . "\n"; echo "总失败数: " . $metrics['total_failed'] . "\n"; echo "成功率: " . $metrics['success_rate'] . "%\n"; echo "失败率: " . $metrics['failure_rate'] . "%\n"; echo "处理速率: " . $metrics['processing_rate_per_minute'] . " 消息/分钟\n"; // 获取活跃工作者 $workers = $queueManager->getActiveWorkers('email_queue'); echo "\n=== 活跃工作者 ===\n"; foreach ($workers as $worker) { echo "工作者ID: " . $worker['worker_id'] . "\n"; echo "最后心跳: " . date('Y-m-d H:i:s', $worker['last_heartbeat']) . "\n"; echo "是否活跃: " . ($worker['is_active'] ? '是' : '否') . "\n"; echo "---\n"; } ``` ## 部署建议 ### 系统配置 ```bash # 增加文件描述符限制 ulimit -n 65536 # 调整内核参数 echo 'net.core.somaxconn = 65535' >> /etc/sysctl.conf sysctl -p ``` ### Redis配置优化 ```conf # redis.conf maxmemory 2gb maxmemory-policy allkeys-lru timeout 300 tcp-keepalive 60 ``` ### 进程管理脚本 ```bash #!/bin/bash # start_consumers.sh QUEUE_NAME="email_queue" WORKER_COUNT=4 SCRIPT_PATH="/path/to/consumer.php" for i in $(seq 1 $WORKER_COUNT); do php $SCRIPT_PATH --queue=$QUEUE_NAME --worker-id=worker_$i & done wait ``` ## 注意事项 1. **Redis扩展**: 确保PHP安装了Redis扩展 2. **PCNTL扩展**: 多进程功能需要PCNTL扩展支持 3. **内存限制**: 根据实际情况调整内存限制 4. **连接池**: 建议在生产环境中使用连接池 5. **监控告警**: 建议设置队列积压告警 6. **数据备份**: 定期备份Redis数据 ## 许可证 MIT License ## 贡献 欢迎提交Issue和Pull Request! ## 更新日志 ### v1.0.0 - 初始版本发布 - 支持基本的消息队列功能 - 支持重试机制和死信队列 - 支持多进程消费 - 支持优先级队列和延迟队列 - 提供完整的监控和管理功能