# go-zero-IM **Repository Path**: openour/go-zero-im ## Basic Information - **Project Name**: go-zero-IM - **Description**: No description available - **Primary Language**: Unknown - **License**: Not specified - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 0 - **Forks**: 0 - **Created**: 2025-10-13 - **Last Updated**: 2025-10-31 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README # golang websocket IM 基于go-zero实现的websocket IM服务 ## 基础安装 * 安装go环境 ```shell go mod init ``` * 安装go-zero ```shell # 1. 删除旧的 goctl(可能是 x86_64 架构) rm -f $(which goctl) # 2. 克隆源码 git clone https://github.com/zeromicro/go-zero.git cd go-zero/tools/goctl # 3. 确保使用的是 arm64 架构的 go 工具链 go env GOARCH # 如果不是 arm64,运行下面命令切换回正确架构: # arch -arm64 zsh # 4. 编译 goctl go build -o ~/go/bin/goctl # 5. 加入环境变量(如果没加过) export PATH=$HOME/go/bin:$PATH # 6. 测试安装情况 goctl -h ``` ```shell # 进入项目目录下 go install github.com/zeromicro/go-zero/tools/goctl@latest go get -u github.com/zeromicro/go-zero@latest ``` * 部署开发过程中需要的组件 参考`docker-compose.yaml`,项目的启动依赖于`etcd` ``` docker-compose up -d ``` ## 常用命令 ```shell # 构建rpc服务 cd apps/user/rpc goctl rpc protoc user.proto --go_out=. --go-grpc_out=. --zrpc_out=. # 构建api服务 cd apps/user/api goctl api go -api user.api -dir . -style gozero ``` ## mysql数据管理 ```sql CREATE TABLE `users` ( `id` varchar(24) COLLATE utf8mb4_unicode_ci NOT NULL, `avatar` varchar(191) COLLATE utf8mb4_unicode_ci NOT NULL DEFAULT '', `name` varchar(24) COLLATE utf8mb4_unicode_ci NOT NULL, `phone` varchar(20) COLLATE utf8mb4_unicode_ci NOT NULL, `password` varchar(191) COLLATE utf8mb4_unicode_ci DEFAULT NULL, `status` int(10) COLLATE utf8mb4_unicode_ci DEFAULT NULL, `created_at` timestamp NULL DEFAULT NULL, `updated_at` timestamp NULL DEFAULT NULL, PRIMARY KEY (`id`) ) ENGINE = InnoDB DEFAULT CHARSET = utf8mb4 COLLATE = utf8mb4_unicode_ci; ``` ```shell goctl model mysql ddl --src user.sql --dir "./models/" -c ``` ## mongodb模型构建 ```shell go get go.mongodb.org/mongo-driver/v2@latest goctl model mongo --type chatLog --dir ./apps/im/immodels # 然后修改模型即可 ``` ## 私聊消息模版 ```json { "method": "conversation.chat", "fromId": "94673760430326036", "data": { "message": { "messageType": 0, "content": "发送一段测试信息内容" }, "sendID": "xxxx", "receiverID": "xxxx", "chatType": 1 } } ``` ## kafka ```sell go get github.com/zeromicro/go-queue/kq ``` ## 三种心跳检测定时器 1. 检测超过最大空闲时间 2. 检测超过最大连接时间 3. 指定时间段内没有有给数据交互 以上三种任意三种满足都应该断开连接 | 类型 | 检测粒度 | 用途 | 建议默认值 | |-------------|-------|------------|------------| | 最大空闲时间 | 分钟级 | 回收长时间无操作连接 | 1-5 分钟 | | 最大连接时间 | 小时-天级 | 负载均衡、强制重连 | 12-48 小时 | | 心跳超时(短期无交互) | 秒级 | 快速检测异常 | 心跳间隔 + 5 秒 | ## 消息投递到kafka流转说明 客户端发送消息,IM服务 ,IM服务将消息投递到kafka,kafka消费侧进行消费,消费消息存储消息到数据库中, 消费的同时通过连接到IM的socket连接将消息push给在线的用户连接,对不在线的用户则走移动端的消息提醒 ```Mermaid sequenceDiagram autonumber participant 客户端A as 客户端A(发送方) participant IM服务 as IM服务 participant Kafka as Kafka消息队列 participant 消费者 as 消费者(Message Consumer) participant 推送服务 as 推送服务(APNs / FCM) participant 客户端B as 客户端B(接收方) 客户端A->>IM服务: ① 发送消息(WebSocket/TCP) IM服务->>IM服务: 校验消息、生成 message_id IM服务->>Kafka: ② 投递消息到 Kafka Topic(im_message_topic) Kafka-->>消费者: ③ 消费者订阅并消费消息 消费者->>数据库: ④ 存储消息记录(MySQL/MongoDB) 消费者->>Redis: ⑤ 查询接收方是否在线 alt 接收方在线 消费者->>IM服务: ⑥ 通知IM服务推送消息 IM服务->>客户端B: ⑦ 通过Socket连接推送消息 else 接收方离线 消费者->>推送服务: ⑥ 调用移动端推送(APNs / FCM) 推送服务->>客户端B: ⑦ 系统级通知提醒 end 客户端B-->>IM服务: ⑧ 回执ACK(已接收/已读) ``` 以上时序图为用redis维护用户状态,本项目为单点系统,没有实现这个过程 ## ack的三种模式 ### 1️⃣ **不采用 ACK 机制** ```Mermaid sequenceDiagram participant Client participant Server Client->>Server: Send Message Note right of Server: Server receives message\nbut does not send any ACK ``` **说明**: - 客户端发送消息后不等待 ACK,消息可能丢失。 - 简单但不可靠。 ------ ### 2️⃣ **一次应答处理(单次 ACK)** ```Mermaid sequenceDiagram participant Client participant Server Client->>Server: Send Message Server-->>Client: ACK ``` **说明**: - 客户端发送消息后等待一次 ACK。 - 如果 ACK 收到,认为消息成功送达。 - 如果 ACK 超时,可选择重试一次。 ------ ### 3️⃣ **三次通信保证可靠性(类似 TCP 三次握手 + 消息确认)** ```Mermaid sequenceDiagram participant Client participant Server Client->>Server: Send Message (1st) Server-->>Client: ACK1 Client->>Server: ACK2 Server->>Client: ACK3 Note right of Server: 经过三次确认\n保证消息可靠到达 ``` ### 实现的 appendMsgMq ```Mermaid sequenceDiagram participant Client participant AckManager %% 客户端发送新消息 Client->>AckManager: Send Message(msg) %% 加锁处理消息队列 Note right of AckManager: messageMu.Lock() %% 判断消息是否已存在 AckManager->>AckManager: check readMessageSeq[msg.ID] alt 消息已存在 Note right of AckManager: 消息正在 ACK 流程中 alt 队列为空 Note right of AckManager: readMessage 为空,直接返回 else AckSeq >= 旧消息 AckSeq Note right of AckManager: 重复消息,直接返回 else 更新消息记录 Note right of AckManager: 更新 readMessageSeq[msg.ID] = msg end else 消息不存在 alt 是 ACK 帧 Note right of AckManager: 没有对应消息,忽略 ACK else 普通新消息 AckManager->>AckManager: append to readMessage AckManager->>AckManager: add to readMessageSeq[msg.ID] = msg end end %% 解锁 Note right of AckManager: messageMu.Unlock() ``` ### OnceAck 模式 ```Mermaid sequenceDiagram autonumber participant Client participant Server participant conn as HeartbeatConnection participant readAck as readAck 循环 participant handlerWrite as handlerWrite 循环 Note over Client,Server: OnceAck 模式:一次性确认后立即进入业务处理 %% Step 1: 客户端发送消息 Client->>Server: FrameData {Id, Method, Data...} Server->>conn: 保存到 readMessageSeq 和 readMessage %% Step 2: 服务端 readAck 检测 loop 每100ms扫描 readMessage readAck->>conn: 读取 readMessage[0] readAck->>Client: 发送 FrameAck(AckSeq+1) readAck->>conn: 移除 readMessage[0] readAck->>conn.message: 投递消息给 handlerWrite end %% Step 3: handlerWrite 处理业务消息 handlerWrite->>Server: 查找并执行 handler(按 message.Method) alt handler 存在 handlerWrite->>Client: 返回业务执行结果 else handler 不存在 handlerWrite->>Client: 返回错误 "不存在的执行方法" end %% Step 4: 清理确认状态 handlerWrite->>conn: 删除 readMessageSeq[message.Id] Note right of handlerWrite: 消息处理完成并清理 ``` ### RigorAck 模式 ```Mermaid sequenceDiagram autonumber participant Client participant Server participant conn as HeartbeatConnection participant readAck as readAck 循环 participant handlerWrite as handlerWrite 循环 Note over Client,Server: RigorAck 模式:双阶段确认,可靠交互 %% Step 1: 客户端发送消息 Client->>Server: FrameData {Id, Method, Data...} Server->>conn: 保存到 readMessageSeq 和 readMessage %% Step 2: readAck 进入确认流程 loop 每100ms扫描 readMessage readAck->>conn: 读取 readMessage[0] alt 首次收到 (AckSeq == 0) readAck->>conn: 更新 AckSeq++, AckTime readAck->>Client: 发送 FrameAck(AckSeq) Note right of readAck: 第一次确认(告知客户端已收到) else 已有确认记录 Client->>Server: 返回 FrameAck(AckSeq++) readAck->>conn: msgseq := readMessageSeq[message.Id] alt msgseq.AckSeq > message.AckSeq readAck->>conn: 移除 readMessage[0] readAck->>conn.message: 投递消息给 handlerWrite Note right of readAck: 客户端已确认,进入业务处理 else 检查是否超时 alt 超时 (ackTimeout 到期) readAck->>conn: 删除 readMessageSeq[message.Id] readAck->>conn: 移除消息并放弃确认 else 未超时 readAck->>Client: 重新发送 FrameAck 重试 end end end end %% Step 3: handlerWrite 执行业务逻辑 handlerWrite->>Server: 查找并执行 handler(按 message.Method) alt handler 存在 handlerWrite->>Client: 返回业务执行结果 else handler 不存在 handlerWrite->>Client: 返回错误 "不存在的执行方法" end %% Step 4: 清理 handlerWrite->>conn: 删除 readMessageSeq[message.Id] Note right of handlerWrite: 消息处理完成,可靠确认闭环 ``` ## 镜像拉取失败 ```shell docker pull bitnami/kafka --disable-content-trust=false ``` ## 更新进度 * v1.0.0 完成go-zero环境的搭建✅ * v2.0.0 完成用户rpc基本服务✅ * v2.0.1 user-rpc服务能启动,并访问✅ * v2.0.2 api服务实现并能启动访问✅ * v2.0.3 api服务对rpc服务的调用✅ * v2.0.4 mysql数据读写操作✅ * v2.0.5 api中间件的使用✅ * v3.0.0 实现user rpc/web服务✅ * v3.0.1 增加rpc用户手机号、密码加密、雪花id、jwt、注册登录的业务✅ * v3.0.2 优化响应输出,api错误码的统一处理✅ * v4.0.0 完成im基本服务框架结构搭建及消息的路由分发✅ * v5.0.0 存储连接对象及设计鉴权✅ * v6.0.0 消息的发送✅ * v7.0.0 使用options代码风格的优化、连接的鉴权、记录连接通道✅ * v8.0.0 接入消息发送,路由加载✅ * v9.0.0 实现im服务用户登入连接,鉴权✅ * v10.0.0 实现im心跳检测✅ * v10.0.1 替换使用带心跳检测的连接✅ * v10.0.2 区分心跳消息和普通消息,心跳消息更新心跳时间,普通消息更新最后活跃时间✅ * v11.0.0 好友私聊,私聊数据存储、请求信息、实现私聊【这种直接写入数据库模式是不合理的】✅ * v12.0.0 使用kafka构建异步消费服务✅ * v13.0.0 基于kafka异步数据存储落地及消息通信, 构建好websocket客户端✅ * v13.0.1 超级token验证,mq中的服务业务,push消息到客户端,websocket将接收消息写入消息队列✅ * v14.0.0 实现消息的ack机制,基础结构,options配置与消息属性✅ * v14.0.1 实现ack机制与使用✅ * v15.0.0 用户拉取离线消息 * v15.0.1 完成im-rpc服务的功能开发 * 切换为pgsql * 使用延迟队列 * 链路追踪/日志收集统计 * 参考https://github.com/go-redis/cache,优化下redis的使用