# qq-robot **Repository Path**: hjhcos/qq-robot ## Basic Information - **Project Name**: qq-robot - **Description**: QQ机器人 - **Primary Language**: Python - **License**: MIT - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 0 - **Forks**: 0 - **Created**: 2025-02-22 - **Last Updated**: 2025-04-16 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README # 大规模数据处理线程池系统设计 ## 系统架构 系统分为五个主要区块:预处理区、缓冲区、任务区、业务区和响应区。每个区块都有其特定的职责和处理逻辑。 ### 架构图 ```mermaid graph LR A[数据输入] --> B[预处理区] B --> C[SQLite存储] C --> D[缓冲区] D --> E[任务区] E --> F[业务区] F --> G[响应区] G --> H[结果输出] subgraph 持久化存储 C I[Redis缓存] end D <--> I ``` ### 区块职责 1. **预处理区** - 接收原始数据输入 - 数据格式验证和清洗 - 任务类型判断 - 将处理后的数据存入SQLite 2. **缓冲区** - 从SQLite读取待处理数据 - 控制内存使用量 - 数据分批处理 - 与Redis配合实现数据持久化 3. **任务区** - 任务优先级管理 - 任务分发控制 - 锁机制管理 - 任务状态追踪 4. **业务区** - 具体业务逻辑处理 - 可扩展的处理器注册机制 - 超时控制 - 错误重试 5. **响应区** - 处理结果分类 - 响应策略管理 - 结果分发 - 异常处理 ### 类图 ```mermaid classDiagram class PreprocessPool { +max_workers: int +db_path: str +workers: List +task_queue: Queue +running: bool +start() +process_data() } class BufferManager { +max_buffer_size: int +current_size: int +buffer_data: Dict +fetch_from_db() +persist_buffer() } class TaskManager { +max_locks: int +locks: Dict +priority_queues: Dict +acquire_lock() +release_lock() } class BusinessProcessor { +timeout: int +process() +process_with_timeout() } class ResponseHandler { +handlers: Dict +response_queue: Queue +handle_response() } PreprocessPool --> BufferManager BufferManager --> TaskManager TaskManager --> BusinessProcessor BusinessProcessor --> ResponseHandler ``` ### 数据流程图 ```mermaid sequenceDiagram participant P as 预处理区 participant B as 缓冲区 participant T as 任务区 participant BU as 业务区 participant R as 响应区 P->>P: 数据验证 P->>SQLite: 存储原始数据 B->>SQLite: 获取待处理数据 B->>Redis: 缓存数据 B->>T: 请求任务锁 T->>BU: 分发任务 BU->>R: 处理结果 R->>Client: 响应结果 ``` ### 流量控制机制 ```mermaid graph TD A[输入流量] --> B{流量检测} B -->|正常| C[正常处理] B -->|超限| D[限流处理] D --> E[请求排队] D --> F[请求丢弃] D --> G[降级处理] ``` 1. **限流策略** - 令牌桶算法 - 滑动窗口 - 计数器限流 - 自适应限流 2. **降级机制** - 优先级降级 - 功能降级 - 超时降级 - 熔断降级 ## 数据库设计 ### SQLite表设计 | 表名 | 字段 | 类型 | 说明 | |------|------|------|------| | raw_data | id | TEXT | 主键 | | | content | TEXT | 数据内容 | | | status | TEXT | 处理状态 | | | task_type | TEXT | 任务类型 | | | created_at | REAL | 创建时间 | | | updated_at | REAL | 更新时间 | ### Redis数据结构 | 键模式 | 类型 | 用途 | |--------|------|------| | buffer:{task_id} | HASH | 缓冲区数据 | | task:{task_id} | HASH | 任务状态 | | lock:{task_id} | STRING | 任务锁 | ## 关键特性 1. **容灾机制** - 定期检查点 - 数据持久化 - 状态恢复 - 异常处理 2. **性能优化** - 内存控制 - 任务优先级 - 批量处理 - 流量控制 3. **可扩展性** - 插件式业务处理器 - 动态响应处理 - 配置化管理 4. **监控告警** - 系统指标收集 - 健康检查 - 告警通知 - 性能分析 ## 使用示例 ```python # 配置系统 config = { "preprocess_workers": 5, "max_buffer_size": 10000, "max_task_locks": 100, "sqlite_path": "data.db", "redis_config": { "host": "localhost", "port": 6379, "db": 0 } } # 初始化组件 preprocess_pool = PreprocessPool( max_workers=config["preprocess_workers"], db_path=config["sqlite_path"] ) buffer_manager = BufferManager( db_path=config["sqlite_path"], redis_config=config["redis_config"], max_buffer_size=config["max_buffer_size"] ) task_manager = TaskManager(max_locks=config["max_task_locks"]) business_manager = BusinessManager() # 注册业务处理器 business_manager.register_processor("image", ImageProcessor()) business_manager.register_processor("text", TextProcessor()) business_manager.register_processor("voice", VoiceProcessor()) # 启动系统 preprocess_pool.start() ``` ## 待优化项 1. 添加更详细的监控指标 2. 实现更灵活的调度策略 3. 优化数据库操作性能 4. 增强异常处理机制 5. 添加更多业务处理器 ## 注意事项 1. 合理配置各区块参数 2. 注意内存使用控制 3. 做好异常处理 4. 定期维护数据库 5. 监控系统状态 ## 系统监控与控制机制 ### 内存使用控制 1. **内存限制策略** ```mermaid graph TD A[内存使用监控] --> B{是否超过阈值} B -->|是| C[触发清理机制] B -->|否| D[继续处理] C --> E[暂停数据输入] C --> F[强制GC] C --> G[持久化缓存] G --> H[恢复处理] ``` 2. **缓冲区控制** - 动态调整缓冲区大小 - 批量处理阈值设置 - 内存水位预警机制 - 自动清理策略 3. **内存泄漏防护** - 对象引用追踪 - 周期性内存分析 - 资源自动释放 - 内存使用报告 ### 全局异常处理 1. **异常分级** | 级别 | 类型 | 处理策略 | 恢复机制 | |------|------|----------|----------| | 严重 | 系统崩溃 | 自动重启 | 状态恢复 | | 错误 | 业务异常 | 重试机制 | 任务重放 | | 警告 | 性能问题 | 降级处理 | 资源释放 | | 提示 | 运行告警 | 日志记录 | 自动修正 | 2. **异常处理流程** ```mermaid sequenceDiagram participant S as 系统 participant E as 异常处理器 participant L as 日志系统 participant M as 监控系统 participant R as 恢复机制 S->>E: 捕获异常 E->>L: 记录异常信息 E->>M: 发送告警 E->>E: 分析异常级别 E->>R: 触发恢复流程 R->>S: 恢复正常运行 ``` ### 系统状态监控 1. **核心指标监控** - CPU使用率 - 内存占用 - 磁盘IO - 网络流量 - 线程池状态 - 队列深度 - 响应时间 - 错误率 2. **监控面板设计** ```mermaid graph LR A[数据采集] --> B[指标聚合] B --> C[状态分析] C --> D[告警触发] C --> E[性能优化] C --> F[资源调度] ``` 3. **预警机制** | 指标 | 警告阈值 | 严重阈值 | 处理策略 | |------|----------|----------|----------| | CPU使用率 | 70% | 90% | 任务限流 | | 内存使用率 | 75% | 85% | 清理缓存 | | 队列积压 | 1000 | 5000 | 扩容处理 | | 响应延迟 | 1s | 5s | 任务降级 | ### 性能优化策略 1. **动态调优** - 自适应线程池 - 智能批处理 - 资源弹性伸缩 - 任务动态调度 2. **资源调度** ```mermaid graph TD A[资源监控] --> B{负载均衡} B -->|过载| C[扩容] B -->|空闲| D[缩容] C --> E[任务重分配] D --> E ``` ### 系统配置建议 1. **内存配置** ```python MEMORY_CONFIG = { "max_buffer_size": "30%", # 系统内存的30% "warning_threshold": "75%", # 警告阈值 "critical_threshold": "85%", # 严重阈值 "cleanup_interval": 300, # 清理间隔(秒) } ``` 2. **监控配置** ```python MONITOR_CONFIG = { "metrics_interval": 60, # 指标采集间隔(秒) "health_check_interval": 30, # 健康检查间隔(秒) "alert_channels": ["email", "slack", "sms"], "log_level": "INFO", } ``` 3. **异常处理配置** ```python ERROR_HANDLING_CONFIG = { "max_retries": 3, # 最大重试次数 "retry_interval": 5, # 重试间隔(秒) "error_threshold": 100, # 错误阈值 "circuit_breaker_timeout": 60, # 熔断超时(秒) } ``` ## 系统维护建议 1. **定期维护** - 日志分析和清理 - 数据库优化 - 性能测试 - 容量规划 2. **应急预案** - 系统降级方案 - 数据备份策略 - 故障恢复流程 - 应急响应机制 3. **监控报告** - 每日性能报告 - 异常统计分析 - 资源使用趋势 - 系统健康评分 ## 最佳实践 1. **内存管理** - 使用内存池 - 实现LRU缓存 - 周期性垃圾回收 - 大对象及时释放 2. **异常处理** - 统一异常封装 - 分级处理策略 - 完整错误链路 - 自动恢复机制 3. **监控告警** - 多维度监控 - 智能告警策略 - 故障自动诊断 - 预测性分析 ## 日志管理系统 ### 日志架构设计 ```mermaid graph TD A[日志生成] --> B[日志过滤] B --> C[日志分发] C --> D[控制台输出] C --> E[文件存储] E --> F[日志轮转] B --> G[日志级别] G --> H[DEBUG] G --> I[INFO] G --> J[WARNING] G --> K[ERROR] ``` ### 日志配置 ```python LOG_CONFIG = { "LOG_DIR": "log", # 日志存储目录 "LOG_LEVEL": "INFO", # 日志级别 "LOG_FORMAT": "%(asctime)s - %(levelname)s - %(message)s", # 日志格式 "DATE_FORMAT": "%Y-%m-%d %H:%M:%S", # 时间格式 "FILTER_STRING": "Release B{是否达到轮转条件} B -->|是| C[创建新日志文件] B -->|否| D[继续写入] C --> E[压缩历史日志] C --> F[清理过期日志] ``` ### 日志使用示例 `logger.py` ```python import os import logging from logging.handlers import TimedRotatingFileHandler # Configuration LOG_DIR = "log" LOG_LEVEL = logging.INFO # 可以通过配置文件或环境变量设置 LOG_FORMAT = '%(asctime)s - %(levelname)s - %(message)s' DATE_FORMAT = '%Y-%m-%d %H:%M:%S' FILTER_STRING = "Release B[日志收集器] C[系统日志] --> B D[错误日志] --> B B --> E[日志分析平台] E --> F[监控告警] E --> G[统计报表] E --> H[故障分析] ``` 2. **告警联动** - 错误日志触发告警 - 警告日志统计分析 - 异常模式识别 - 自动报告生成 3. **性能分析** - 响应时间记录 - 资源使用统计 - 并发情况分析 - 系统瓶颈识别 4. **故障诊断** - 错误日志关联分析 - 问题根因追踪 - 影响范围评估 - 解决方案建议 ## 区块间通信机制 ### 缓冲区与任务区交互 ```mermaid sequenceDiagram participant B as 缓冲区 participant T as 任务区 B->>T: 1. 检查任务区锁状态 T-->>B: 2. 返回可用锁数量 alt 有可用锁 B->>T: 3a. 申请锁并放入数据 T-->>B: 4a. 确认数据接收 else 无可用锁 T->>B: 3b. 订阅锁释放通知 Note over B,T: 数据保持在缓冲区 T->>B: 4b. 锁释放通知 B->>T: 5b. 重新尝试放入数据 end ``` ### 任务区与业务区交互 ```mermaid sequenceDiagram participant T as 任务区 participant B as 业务区 participant R as 响应区 T->>B: 1. 根据数据标识选择业务处理器 B-->>T: 2. 返回处理结果 T->>T: 3. 分析处理结果 T->>R: 4. 选择对应响应处理器 ``` ### 数据流转状态图 ```mermaid stateDiagram-v2 [*] --> 预处理 预处理 --> 待处理: 存入SQLite 待处理 --> 缓冲中: 读取数据 缓冲中 --> 处理中: 获得任务锁 处理中 --> 已完成: 业务处理成功 处理中 --> 失败: 业务处理异常 失败 --> 待处理: 重试策略 已完成 --> [*] ``` ## 关键机制实现 ### 任务锁管理 ```python class TaskLockManager: def __init__(self, max_locks: int): self.max_locks = max_locks self.active_locks = {} self.subscribers = [] def subscribe_lock_release(self, callback): """订阅锁释放事件""" self.subscribers.append(callback) def notify_lock_release(self, lock_id: str): """通知所有订阅者锁已释放""" for subscriber in self.subscribers: subscriber(lock_id) ``` ### 业务处理器注册 ```python class BusinessHandlerRegistry: def __init__(self): self.handlers = { 'image': ImageProcessor(), 'text': TextProcessor(), 'voice': VoiceProcessor(), 'friend': FriendProcessor() } def register_handler(self, type_id: str, handler: BusinessProcessor): """动态注册新的业务处理器""" self.handlers[type_id] = handler ``` ### 响应处理器配置 ```python RESPONSE_HANDLERS = { 'image': { 'generate': ImageGenerationHandler(), 'describe': ImageDescriptionHandler() }, 'text': { 'translate': TextTranslationHandler(), 'summarize': TextSummaryHandler() }, 'voice': { 'synthesize': VoiceSynthesisHandler(), 'recognize': VoiceRecognitionHandler() } } ``` ## 容灾机制 ### 数据恢复流程 ```mermaid graph TD A[系统启动] --> B{检查恢复点} B -->|存在| C[加载上次状态] B -->|不存在| D[初始化系统] C --> E[恢复处理队列] C --> F[重建任务锁] C --> G[重连数据库] E --> H[继续处理] F --> H G --> H ``` ### 状态持久化 | 状态类型 | 存储位置 | 恢复策略 | |----------|----------|----------| | 任务状态 | SQLite | 重新加载未完成任务 | | 锁状态 | Redis | 重建任务锁分配 | | 缓冲数据 | Redis | 重新放入处理队列 | | 处理结果 | SQLite | 继续未完成响应 | ## 数据库维护策略 ### SQLite优化 ```python SQLITE_MAINTENANCE = { "vacuum_interval": 86400, # 每日执行VACUUM "analyze_interval": 3600, # 每小时更新统计信息 "index_rebuild_threshold": 1000000, # 索引重建阈值 "max_page_count": 1000000, # 最大页面数 } ``` ### 数据库性能监控 | 指标 | 描述 | 警告阈值 | 处理策略 | |------|------|----------|----------| | 查询响应时间 | 单次查询平均耗时 | >100ms | 优化索引 | | 写入延迟 | 写入操作耗时 | >200ms | 批量写入 | | 连接数 | 当前活动连接数 | >80% | 释放空闲连接 | | 缓存命中率 | 查询缓存效率 | <60% | 调整缓存策略 | ### 定期维护任务 ```python DB_MAINTENANCE_TASKS = { 'daily': [ 'VACUUM', 'ANALYZE', '清理过期数据' ], 'weekly': [ '重建索引', '更新统计信息', '备份数据库' ], 'monthly': [ '归档历史数据', '优化表结构', '检查数据完整性' ] } ``` ### Redis维护策略 ```python REDIS_MAINTENANCE = { "max_memory": "2gb", "max_memory_policy": "allkeys-lru", "cleanup_interval": 3600, "backup_interval": 86400, "key_expire_notify": True } ``` ## 系统扩展性设计 ### 插件化架构 ```python PLUGIN_CONFIG = { "processor_plugins": { "enabled": True, "auto_discovery": True, "plugin_dir": "plugins/processors" }, "monitor_plugins": { "enabled": True, "auto_discovery": True, "plugin_dir": "plugins/monitors" } } ``` ### 动态加载机制 ```mermaid graph LR A[插件目录] --> B[插件扫描] B --> C[验证接口] C --> D[注册插件] D --> E[启用插件] ``` ## 性能优化指南 ### 数据库优化 1. **SQLite优化** - 使用WAL模式 - 适当的缓存大小 - 定期VACUUM - 索引优化 2. **Redis优化** - 合理的内存策略 - pipeline批量操作 - 避免大key - 定期持久化 ### 代码层优化 1. **内存管理** - 对象池复用 - 及时释放资源 - 避免内存泄漏 - 控制对象大小 2. **并发处理** - 异步IO - 协程使用 - 线程池优化 - 锁粒度控制 ## Python 使用 ollama 框架 API ```python from ollama import chat from ollama import generate from ollama import ChatResponse from ollama import list from ollama import ListResponse from ollama import ps from ollama import ProcessResponse # 1. chat 方法 # 与模型进行对话生成,发送用户消息并获取模型响应: # user 用户角色 assistant 助手角色 response: ChatResponse = chat( model='llava:7b', messages=[{ 'role': 'user', 'content': 'Why is the sky blue?' }] ) print(response.message.content) # 2. generate 方法 # 用于文本生成任务。与 chat 方法类似,但是它只需要一个 prompt 参数: response: ChatResponse = generate( model='llava:7b', prompt='Why is the sky blue?' ) print(response.response) # 3. list 方法 # 列出所有可用的模型: response: ListResponse = list() models = response.models print(len(models), models[0].model, models[0].size) # 4. ps 方法 # 查看正在运行的模型列表: response: ProcessResponse = ps() models = response.models print(len(models), models[0].model, models[0].size) ``` ## Python SILK 转 wav ```python import pilk import struct print("语音时间为:", duration) def add_wav_header(silk_path, wav_path, sample_rate=24000): # SILK 转 PCM duration = pilk.decode(silk_path, silk_path.replace('.silk', '.pcm')) with open(pcm_path, "rb") as f_pcm: pcm_data = f_pcm.read() # 构造WAV头(44字节) header = struct.pack( '<4sI4s4sIHHIIHH4sI', b'RIFF', len(pcm_data) + 36, b'WAVE', b'fmt ', 16, 1, 1, sample_rate, sample_rate * 2, 2, 16, b'data', len(pcm_data) ) with open(wav_path, "wb") as f_wav: f_wav.write(header) f_wav.write(pcm_data) # 使用示例 add_wav_header("xifu.silk", "xifu.wav") ``` ## Python 使用 SenseVoice 识别语音 ```python # /model_inference from gradio_client import Client, handle_file client = Client("http://127.0.0.1:7860/") result = client.predict( # The input value that is provided in the "Upload audio or use the microphone" Audio component. input_wav=handle_file('D:\\topmes\\weChat\\audio\\xifu.wav'), # The input value that is provided in the "Language" Dropdown component. 'auto', 'zh', 'en', 'yue', 'ja', 'ko', 'nospeech' Default: "auto" language="auto", api_name="/model_inference" ) # The output value that appears in the "Results" Textbox component. print(result) ``` ## Python 使用 CosyVoice2 生成声音 ```python # /generate_audio from gradio_client import Client, file tts_text = "寻寻觅觅,冷冷清清,凄凄惨惨戚戚。乍暖还寒时候,最难将息。三杯两盏淡酒,怎敌他、晚来风急?雁过也,正伤心,却是旧时相识。满地黄花堆积。憔悴损,如今有谁堪摘?守著窗儿,独自怎生得黑?梧桐更兼细雨,到黄昏、点点滴滴。这次第,怎一个愁字了得!" client = Client("http://127.0.0.1:50004/") result = client.predict( # The input value that is provided in the "输入合成文本" Textbox component. tts_text=tts_text, # The input value that is provided in the "选择推理模式" Radio component. # Literal['3s极速复刻', '跨语种复刻', '自然语言控制'] Default: "3s极速复刻" mode_checkbox_group="3s极速复刻", # The input value that is provided in the "输入prompt文本" Textbox component. prompt_text="我是通义实验室语音团队全新推出的生成式语音大模型,提供舒适自然的语音合成能力。", # The input value that is provided in the "选择prompt音频文件,注意采样率不低于16khz" Audio component. prompt_wav_upload=file('d:\\topmes\\weChat\\audio\\xifu.wav'), # The input value that is provided in the "录制prompt音频文件" Audio component. prompt_wav_record=file('d:\\topmes\\weChat\\audio\\xifu.wav'), # The input value that is provided in the "输入instruct文本" Textbox component. instruct_text="", # The input value that is provided in the "随机推理种子" Number component. seed=0, # The input value that is provided in the "是否流式推理" Radio component. stream="true", # The input value that is provided in the "速度调节(仅支持非流式推理)" Number component. speed=1, api_name="/generate_audio" ) # The output value that appears in the "合成音频" Audio component. result # filepath str ``` ## Python 发送 QQ 信息 ```python import requests import json url = "http://127.0.0.1:3000" # 处理好友请求 set_friend_add_request = '/set_friend_add_request' payload = json.dumps({ # 请求ID "flag": "textValue", # 是否同意 "approve": False, # 好友备注 "remark": "textValue" }) headers = { 'Content-Type': 'application/json' } response = requests.request("POST", url + set_friend_add_request, headers=headers, data=payload) response.json() # 响应结果 { # 请求状态 ok failed 'status': 'failed', # 响应码 'retcode': 200, # 数据 'data': None, # 提示信息 'message': 'No such request', # 提示信息(人性化) 'wording': 'No such request', # 回显 'echo': None } ``` ```python import requests import json url = "http://127.0.0.1:3000" # 发送私聊文本 send_private_msg = '/send_private_msg' payload = json.dumps({ # 对方QQ号 "user_id": '3140546263', # 消息内容 "message": [ { # 文本信息 "type": "text", # 文本内容 "data": { "text": "napcat" } } ] }) headers = { 'Content-Type': 'application/json' } response = requests.request("POST", url + send_private_msg, headers=headers, data=payload) response.json() # 响应结果 { # 请求状态 ok failed 'status': 'ok', # 响应码 'retcode': 200, # 数据 'data': {'message_id': 953059377}, # 提示信息 'message': 'No such request', # 提示信息(人性化) 'wording': 'No such request', # 回显 'echo': None } ``` ```python import requests import json url = "http://127.0.0.1:3000" # 发送私聊图片 send_private_msg = '/send_private_msg' payload = json.dumps({ # 对方QQ号 "user_id": '3140546263', # 消息内容 "message": [ { # 图像 "type": "image", # 图像文件 "data": { "file": r"file://C:\Users\hjhco\Pictures\46a3e34f3e4ceec0b388099f4658d88106973bae9d6aebffe643eb5c941e74fd.png" } } ] }) headers = { 'Content-Type': 'application/json' } response = requests.request("POST", url + send_private_msg, headers=headers, data=payload) response.json() # 响应结果 { # 请求状态 ok failed 'status': 'ok', # 响应码 'retcode': 200, # 数据 'data': { # 消息ID 'message_id': 953059377 }, # 提示信息 'message': 'No such request', # 提示信息(人性化) 'wording': 'No such request', # 回显 'echo': None } ``` ```python import requests import json url = "http://127.0.0.1:3000" # 发送私聊语音 get_friends_with_category = '/send_private_msg' payload = json.dumps({ # 对方QQ号 "user_id": '3140546263', # 消息内容 "message": [ { # 文件 "type": "record", # 文件 "data": { "file": r"file://D:\topmes\weChat\output_manual.wav" } } ] }) headers = { 'Content-Type': 'application/json' } response = requests.request("POST", url + get_friends_with_category, headers=headers, data=payload) response.json() # 响应结果 { # 请求状态 ok failed 'status': 'ok', # 响应码 'retcode': 200, # 数据 'data': { # 消息ID 'message_id': 953059377 }, # 提示信息 'message': 'No such request', # 提示信息(人性化) 'wording': 'No such request', # 回显 'echo': None } ``` ```python import requests import json url = "http://127.0.0.1:3000" # 获取语音 get_record = '/get_record' payload = json.dumps({ # 文件ID "file_id": "9a501ca30d6fb902a33b9aa1f2b8bee3.amr", # 输出格式:mp3 amr wma m4a spx ogg wav flac 默认 mp3 "out_format": "mp3" }) headers = { 'Content-Type': 'application/json' } response = requests.request("POST", url + get_friends_with_category, headers=headers, data=payload) response.json() # 响应结果 { # 请求状态 ok failed 'status': 'ok', # 响应码 'retcode': 0, # 数据 'data': { # 路径或链接 'file': 'F:\\Tencent\\QQNT\\file\\Tencent Files\\3821984330\\nt_qq\\nt_data\\Ptt\\2025-02\\Ori\\9a501ca30d6fb902a33b9aa1f2b8bee3.amr.mp3', # 路径或链接 'url': 'F:\\Tencent\\QQNT\\file\\Tencent Files\\3821984330\\nt_qq\\nt_data\\Ptt\\2025-02\\Ori\\9a501ca30d6fb902a33b9aa1f2b8bee3.amr.mp3', # 文件大小 'file_size': '1648', # 文件名 'file_name': '9a501ca30d6fb902a33b9aa1f2b8bee3.amr', # 文件base64编码 "base64": "SUQzBAAAAAAAI1RTU0UAAAAPAAADTGF2ZjU4LjQ1LjEwMAAAAAAAAAAAAAAA//OEwAAAAAAAAAAAAEluZm8AAAAPAAAAJQAADqAAExMaGhogICAnJy0tLTQ0NDs7QUFBSEhITk5OVVVbW1tiYmJpaW9vb3Z2dnx8g4ODiYmJkJCQlpadnZ2kpKSqqrGxsbe3t76+xMTEy8vL0tLS2Njf39/l5eXs7PLy8vn5+f//AAAAAExhdmM1OC45MQAAAAAAAAAAAAAAACQCQAAAAAAAAA6ge4AM2AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA//NExAAQ6NIsAGGGjH8WEMXhIamcZXJ6ETGTMeCIgA0nbEMvbsxVsXDQq8Kn+hBzDHQhYEAcc5aPJnzkMVhYIvPnJRH8oUG+UJ+NPk5coJCaBZmtuN/AmJwEJRjkjL1V//NExA8S8T5pnEpGDJJNA5tcY6Zq+jihkd82uhHeGke1r6cIu/okXPOLXwgAPsEFCShxpAWFdgx0xIVpuWtTXqa05PlF4JvY1qaOlIlOC7JyA+SVsxiT6q8y/7jrctec//NExBYWmrJoXHmEvYR+/5FJZNz06hpxn3GLbHLdm2tf2kYEmGGbFo0pOmkuP4m9KG5utn2V0W7u20jLYujOn3u3//TSRlpWqsjOIUCecurfVQhYbEC2QsoXIFRIZOF9//NExA4TAJpk7MMGDI5XtLntCXMhMpAqV9iogUEHkAGA4wwFYrWpiRdoaNjp1jHyLlPaZEjDVDcSlCqKTyitP+TMK4WQGA81Vx8WCtuvWI9hMEQL9iruLCn1PLB34mBw//NExBUW0UZ9nHhwkIhAn40Ad4+0m7csmIcutOLuLlMJzSlXyry1CrWXv8sR64lFJqihu1DmPd58/X3csK9YcAxAHwSGlC/6PLkPy6v295ygvcaVBstuskslwoGVgL5P//NExAwU2RK6XmDw6rNHoVv1b9ykHU75l5TXICLh880fLiGFDJK49gtM0FdCmgSJ9iuUamaPOg3uwJi7QGIRyBaI9lAokiVVjUCdhLzx5QH//0BJii3dbY05ZAIBNU7A//NExAsVEPquXtMGjgwn0xvI4SY3EQyCEoXGzgmNOHGSAkqNEwwIE5soETKJDNUTEq8/aijxzNGKmleEinSljrGUzxzIZHD7zwbW5G3ora7////+RKLqAUjcgTttoAH5//NExAkUgZbKXsCSnkBpRSlup9m9sTB8G/gpCjHMPOM60gZLHndf/5JCNTyMSEK0iogwoSEEUhQKCSJwcMEZsgcgUQIG7RyZJ0mFGrD4jT00cEgGQByhngbPJv02DdJs//NExAoUqX6UANFSuMs2cdBq9trQhnkrk/0IbEw2o6gNOmOzIqVJjyiYOKjRapVU7U3anw2pMsiAUkgUkgRmFyxKqUiw3jb2WtArXf1eH6/mSA7+jP0hzzSWSOW3AYUJ//NExAoUcW7eXmvEMgoqhF1NkxMDSFsuDZd9dvqvXcG07wudpsHrns1hrFom2v/ykMDEOc7CbAJEEqYJOGklldv1oGiNFqt5JAnGxohHeRYCqFNcNQA3I7E4nZKB6EZK//NExAsU8TrKXnpEqvyFBNfFRWRT+MTTYg9okcws/qz6sIFD/ceuCem2ZooRMt2j6Lev74ooBwKDwnBu5ZZ6lnN6VP31G2C7VKnVMI57WyIAK48EjCEBmviga5AMJwdX//NExAoU6V6hnssETCtuaILQdNHZeBF8wCWNpVOp/x6cFimQCZEe9jZqFsohSiKvL202+x2KrBSmDDuCO978zclbNrY/OqNhWljBQWBZjxQbRvLCg5tC7wg7wqUwWRmK//NExAkTUrKUVNDEjSkfESE7BBJCC09nIwcqn9UISBFlsLSgNtrmdk+layWfy/Znvb7meR3BOCIZl/0///r5n6tupb7d6WalUbI6dv3FK5h26kAnI5W43IIBhyOkD6+g//NExA4SQVrCXnsGTplDw6QESMmK2rJPUULMHnaBDLwkNDZD/K16GzEzyN3+sfvS/OeZdJ9FFvmbr//zgJqAYWCDH//3w2uZAAtk2aUYuwH23SAulIQbIUEs9nRr3CM4//NExBgSUMqyXsMGbA6LHYcvzp5TWfRhf9rGiT/VCgGBmwQePScUcD58IwukxSTfu//9UJjuPppb9F4ncmSqqT+u9cknAwDXGVhMzUhyGxv3JQIQQ1EEvf32+m76DPlt//NExCESWOruXniNBl+fM2NfiNTvBgdyOQ3nIYkGwCbNLCSfUCBoX+yYHJn2s////jwOIF3VAJXQX4OoSo+RUm6VYJ7NdIRFx/l7Ze52Nu/KN+1DfmtH+cKwB25rsVRC//NExCoSOPqxjDYSEAEDakQqfhEQzqQqZmWSurrg2cDv///Lf/oVQoQmniwJicnImIxycmUxiFyDLAlAEir9ZscIywFVP0T/kpePv2nBYqBkVPus9lmrg1WwN4UomidH//NExDQReV6oKspEtGtb6SqlXGDZD/////7xY+QYLQQCWaAElREYNhyb2LScfQmCVQtDW3bTCmX0kDHmgb6JLNSCIeDI2ieQYFUd7qHCzGpQUp7NU08MgQmCCj5MU/+X//NExEERwKKo9E4SBFDqjEmI7wDZSQrXZYkBXQEzd1CkcXD19gxIpmgWRqA+LMzAW3QBhX/P/VMYgEMHnxkOO5oU/Puhbxq1M9OJSgTMSPVEOPTemCCSnB+Z31JyyklF//NExE0TIirGXkhGvAVG0jLZU1g15DNJ4lnNXi9RkorSggbBliiwVqBWpK6jXOmqt+fnWuWS57CYqDRAFWSVB2POJQDrEf9pdJk6vIiqzuhKENesqMDyAFlTIMutdjYD//NExFMSaRa+XFmGLNnBCrpGkLomnsrrMXxAS06pipTxIxxr0JHqKj1kmrUVWxqDDHMiYeoNB4PCxYEpEotAtvZYwuVJaKzvXU3EZlCyVQHWAC1bkKHTqM2DURZ6RAB2//NExFwSmI6+/jJGLN6kdzDsHuoS6COGK5NwfhExiYl/JurJDTR9NwKyi2tOFCZCHxiOkYqf9A+jRL16X+Rd+z7/mE5tAUKAv66UBVsUIV6HFqVQwUCTSlIItPWkPArU//NExGQSMJKqNMPGcOcoCaxGIQAOqfgmzqZoStrkH2rFgY45kQnRME0rNONTKKHp0EzwLBRvGc4ihRNf//0vAYEJlakEC9nPCmR4DNfPjcA5N7DNSNi8ltZpH3X3CZcl//NExG4UcJquNsPMjOJBUzIFeMmOkYSpdGZVXcTSYRx9rDASUMcR0KSABZJl9W4W91vv///9LXHtaoBAH7uOASYMkBK1sSBa3Y7a6J+GTFkN2NIK4Xp2iybPsFAzybfd//NExG8SGLas9E4SGKMIFG7rWUt/v/SpmbSDMxLqeqC1EzYQuZp/0M/r/8eadFBSgADttm+4EnwIG48txArbhl8PL7HSnU+czMoyNE2cifnQhisXLF8zOUYWQBOQGhA2//NExHkSMUbBlnlG4i3ULJJXptXPqDw1zp3VJvk/3f/xoDNJ3ktNATlklsoFweID45h7NZuZqTg1e8qHtyYWmUE9pVhx7e1YR53JYRphYGguUAL1gOERdTiaFJQ48AVv//NExIMSaH65HmYeAADFlkxzb25Wr//5FsOhpx0XTQBlutuGwFQfDAoD06JN7+jZidQprNNs5mLMs5w8azRodp5qqxG6X8iQ4pXf7nP6jE4UHBYdeiSdEwZCJ4kV//9V//NExIwSgMrMflMGHvmxjj4kQVWqIAdusl1wH4SeyJ6fK7nLF5BqycGGz/8zN7yzteSup1u1upbZV9n6Qfw/yZOmYOf7unV/7LRtZC6q5hDkBGZA/nV8514DaZrqAkks//NExJUSSWbdvipGOnJIB0A8sVFJA/dTFJKkKiyKSSJEiQVqgcKJowiEoTh0GTINERyL3AIRAdBUm3YjDvRxr/+vd3ERg8yx9abDc28FSSyRuAQMHIYlmghbWmkTLeQl//NExJ4R0q7VHjBHdvkTnQDwDkVi4qSPBhRU4cUVFUguBHPSoUQSFmAUQioiyLVgJyHik0z/bvzRHMMspnVA0dOlahCXr//9uAmI5MCW11EndTInm7hcXjWN1tS1mOZc//NExKkQwIq4fkhGwkgUy9QJoOvOhGiGKXCQNMgckONHxVrklRfERtZ1r9lbfQzqeSTvdKMBqBgkigEqBJrtt/qBpRCsIvsdsb0Dy2NygJdVkAy7ndXuPkJWppx6VPBc//NExLkSEGKsfmHMIrjGjjALttBc1BwcfFyCuepAHFY1iGKl6jLnt09WeMcmxa11aSosIYKXpvdBRXWaOCINCC8+SDkwssXPB2EuxKEMLzdNbdjnlh77IWfdfdwfRqOP//NExMMSYJ6ZHmGENJUvYXW16mBbIxVHvRJ9f1I+eUkyM10RUXXbdTAwnGsRUhUH4ZIJuaRnJisLCgfQg6y6R0wyEAXKAAsoPuIoUaMl1OMEAO5TFkxGDVC0wyu5u5gt//NExMwSGJqWXmDMhNceoUcLJ8db0IenHRy0KjeIUpyCnCPoTJFhqT88EonGQ5ASCo+5MShKVVszEyST3F0ICQEjcDCjjeWzZRegInUmgYCnRLiVYKxEDUGXW/ksqt2H//NExNYSIOpkAMMQTAs/4KqHq1A08GpaIqxn1upMQU1FMy4xMDCqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqq//NExOASCG5wNGMMKKqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqq//NExOoU4QZAAHsGVKqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqq" }, # 提示信息 'message': '', # 提示信息(人性化) 'wording': '', # 回显 'echo': None } ``` ## Python 监听 QQ 信息 `NapCatServer.py` ```python import asyncio import websockets import signal import json from logger import log_message # 导入 log_message 函数 from NapCatMessageHandler import MessageHandler handler = MessageHandler() async def echo(websocket, path): async for message in websocket: # await websocket.send(f"Echo: {message}") # log_message('INFO', f"Sent echo message: {message}") log_message('INFO', f"{message}") data = json.loads(message) log_message('INFO', f"{data['post_type']}") start_server = websockets.serve(echo, "localhost", 8765) loop = asyncio.get_event_loop() # Function to handle shutdown def shutdown(): log_message('INFO', "Shutting down server...") start_server.ws_server.close() loop.stop() # Register signal handlers signal.signal(signal.SIGINT, lambda s, f: shutdown()) signal.signal(signal.SIGTERM, lambda s, f: shutdown()) log_message('INFO', "Starting server on ws://localhost:8765") loop.run_until_complete(start_server) loop.run_forever() ``` `NapCatMessageHandler.py` ```python import json from logger import log_message # 导入 log_message 函数 class MessageHandler: def process_log(self, log_entry): # 解析 JSON 日志 try: log_data = json.loads(log_entry) except json.JSONDecodeError: log_message('ERROR', "Invalid JSON format") return # 检查 target_id 和 self_id if log_data.get("target_id") == log_data.get("self_id"): log_message('WARNING', "Ignoring message sent to self") return # 根据 post_type 处理不同类型的消息 post_type = log_data.get("post_type") if post_type == "meta_event": self.handle_event(log_data) elif post_type == "message_sent": self.handle_message(log_data) elif post_type == "message": self.handle_message(log_data) elif post_type == "notice": self.handle_notice(log_data) else: # log_message('ERROR', f"Unhandled post_type: {post_type}") pass def handle_event(self, event_data): meta_event_type = event_data.get("meta_event_type") if meta_event_type == "lifecycle": # 生命周期 self.handle_lifecycle(event_data) elif meta_event_type == "heartbeat": # 心跳 self.handle_heartbeat(event_data) else: log_message('ERROR', f"Unhandled meta_event_type: {meta_event_type}") def handle_lifecycle(self, event_data): sub_type = event_data.get("sub_type") # WebSocket 连接成功 if sub_type == 'connect': pass def handle_heartbeat(self, event_data): # "status":{"online":true,"good":true} status = json.loads(event_data.get("status")) if status == '': pass def handle_message(self, message_data): message_type = message_data.get("message_type") if message_type == "private": self.handle_private_message(message_data) else: log_message('ERROR', f"Unhandled message_type: {message_type}") def handle_private_message(self, message_data): # 根据消息内容类型调用不同的处理方法 message_content = message_data.get("message", []) for content in message_content: content_type = content.get("type") if content_type == "text": self.handle_text_message(content) elif content_type == "face": self.handle_face_message(content) elif content_type == "image": self.handle_image_message(content) elif content_type == "record": self.handle_record_message(content) else: log_message('ERROR', f"Unhandled content_type: {content_type}") def handle_text_message(self, content): text = content.get("data", {}).get("text", "") log_message('INFO', f"Handling text message: {text}") def handle_face_message(self, content): face_id = content.get("data", {}).get("id", "") log_message('INFO', f"Handling face message with id: {face_id}") def handle_image_message(self, content): image_url = content.get("data", {}).get("url", "") log_message('INFO', f"Handling image message with url: {image_url}") def handle_record_message(self, content): record_file = content.get("data", {}).get("file", "") log_message('INFO', f"Handling record message with file: {record_file}") def handle_notice(self, notice_data): notice_type = notice_data.get("notice_type") if notice_type == "notify": self.handle_notify(notice_data) else: log_message('ERROR', f"Unhandled notice_type: {notice_type}") def handle_notify(self, notify_data): sub_type = notify_data.get("sub_type") # 输入状态更新 if sub_type == "input_status": self.handle_input_status(notify_data) else: log_message('ERROR', f"Unhandled sub_type: {sub_type}") def handle_input_status(self, notify_data): pass ``` `data = json.loads(message)`:QQ 消息数据格式 ### meta_event 事件
事件名说明可用备注
meta_event.lifecycle生命周期
meta_event.lifecycle.enable生命周期 - OneBot 启用
meta_event.lifecycle.disable生命周期 - OneBot 停用
meta_event.lifecycle.connect生命周期 - WebSocket 连接成功
meta_event.heartbeat心跳
### message 事件
事件名说明可用备注
message.private私聊消息
message.private.friend私聊消息 - 好友
message.private.group私聊消息 - 群临时
message.private.group_self私聊消息 - 群中自身发送
message.private.other私聊消息 - 其他
message.group群聊消息
message.group.normal群聊消息 - 普通
message.group.anonymous群聊消息 - 匿名消息
message.group.notice群聊消息 - 系统提示
### message_sent 事件
事件名说明可用备注
message_sent.private私聊消息
message_sent.private.friend私聊消息 - 好友
message_sent.private.group私聊消息 - 群临时
message_sent.private.group_self私聊消息 - 群中自身发送
message_sent.private.other私聊消息 - 其他
message_sent.group群聊消息
message_sent.group.normal群聊消息 - 普通
message_sent.group.anonymous群聊消息 - 匿名消息
message_sent.group.notice群聊消息 - 系统提示
### request 事件
事件名说明可用备注
request.friend加好友请求
request.group.add加群请求需要管理员权限
request.group.invite邀请登录号入群
### notice 事件
事件名说明可用备注
notice.friend_add好友添加
notice.friend_recall私聊消息撤回
notice.offline_file接收到离线文件
notice.client_status其他客户端在线状态变更
notice.group_admin群聊管理员变动
notice.group_admin.set群聊管理员变动 - 增加
notice.group_admin.unset群聊管理员变动 - 减少
notice.group_ban群聊禁言
notice.group_ban.ban群聊禁言 - 禁言
notice.group_ban.lift_ban群聊禁言 - 取消禁言
notice.group_card群成员名片更新
notice.group_decrease群聊成员减少
notice.group_decrease.leave群聊成员减少 - 主动退群
notice.group_decrease.kick群聊成员减少 - 成员被踢
notice.group_decrease.kick_me群聊成员减少 - 登录号被踢
notice.group_increase群聊成员增加
notice.group_increase.approve群聊成员增加 - 管理员已同意入群
notice.group_increase.invite群聊成员增加 - 管理员邀请入群
notice.group_recall群聊消息撤回
notice.group_upload群聊文件上传
notice.group_msg_emoji_like群聊表情回应仅收自己的 其余扩展接口拉取
notice.essence群聊设精
notice.essence.add群聊设精 - 增加
notice.essence.delete群聊设精 - 取消
notice.notify.poke戳一戳
notice.notify.lucky_king群红包运气王
notice.notify.honor群成员荣誉变更
notice.notify.honor.talkative群成员荣誉变更 - 龙王
notice.notify.honor.performer群成员荣誉变更 - 群聊之火
notice.notify.honor.emotion群成员荣誉变更 - 快乐源泉
notice.notify.input_status输入状态更新
notice.notify.title群成员头衔变更
notice.notify.profile_like点赞