代码拉取完成,页面将自动刷新
基于 Go 语言开发的一套支持私聊与群聊的即时通讯系统,采用 RabbitMQ 进行消息异步处理,使用 WebSocket 实现消息实时推送,具备用户登录、好友系统、群组管理、消息存储与离线推送等功能,支持高并发和水平扩展。
主要功能:
用户注册 / 登录、鉴权与会话管理(基于JWT) 私聊与群聊功能,支持文本、表情、图片等类型的消息 WebSocket 实时通信,心跳检测与连接断线重连机制 消息持久化与离线消息存储,历史消息查询接口 群组创建、成员管理、群公告、群消息同步等功能 使用 RabbitMQ + Topic Exchange 进行消息路由与分发,解耦发送与处理逻辑 Redis 实现用户在线状态记录与消息缓存 接入 Prometheus + Grafana 实现服务监控,支持多实例部署与负载均衡
创建一个用户单独的队列,每个请求都发送到这个队列
使用广播交换机Fanout Exchange / Direct/Topic Exchange
json复制编辑{
"message": "消息内容",
"to": "接收者ID或群ID",
"type": "消息类型,0-私聊,1-群聊",
"from": "发送者ID",
"group_id": 1,
"createTime": "2025-06-18T13:24:00Z"
}
type=0
表示私聊,消息通过私聊 routing key 路由(格式如 private.id.{userId}
)type=1
表示群聊,消息通过群聊 routing key 路由(格式如 group.id.{groupId}
)message.topic
durable=true
),确保服务器重启消息不过期queue.user.{userId}
)用途 | 格式 | 说明 |
---|---|---|
私聊 | private.id.{userId} |
只发给指定用户 |
群聊 | group.id.{groupId} |
群内所有成员对应队列绑定此key |
private.id.100
group.id.10
go复制编辑// 绑定 routing key(加入群聊)
func BindRoutingKey(ch *amqp.Channel, queueName, exchangeName, routingKey string) error {
return ch.QueueBind(queueName, routingKey, exchangeName, false, nil)
}
// 解绑 routing key(退出群聊)
func UnbindRoutingKey(ch *amqp.Channel, queueName, exchangeName, routingKey string) error {
return ch.QueueUnbind(queueName, routingKey, exchangeName, nil)
}
go复制编辑r.GET("/ws", func(c *gin.Context) {
// WebSocket 升级
upgrader := websocket.Upgrader{CheckOrigin: func(r *http.Request) bool { return true }}
conn, err := upgrader.Upgrade(c.Writer, c.Request, nil)
if err != nil {
panic(err)
}
defer conn.Close()
// 假设从请求参数或认证中获取用户ID
userId := c.Query("user_id")
queueName := "queue.user." + userId
exchangeName := "message.topic"
ch, _ := rabbitMQConn.Channel()
defer ch.Close()
// 订阅用户专属队列,消费消息推送到 WebSocket
go func() {
msgs, _ := ch.Consume(queueName, "", true, false, false, false, nil)
for d := range msgs {
err := conn.WriteMessage(websocket.TextMessage, d.Body)
if err != nil {
break
}
}
}()
// 读取客户端消息并发布到交换机
for {
_, msg, err := conn.ReadMessage()
if err != nil {
break
}
// 解析消息,确定 routing key
var payload MessagePayload
json.Unmarshal(msg, &payload)
var routingKey string
if payload.Type == "0" { // 私聊
routingKey = "private.id." + payload.To
} else if payload.Type == "1" { // 群聊
routingKey = "group.id." + strconv.Itoa(payload.GroupId)
}
ch.Publish(exchangeName, routingKey, false, false, amqp.Publishing{
ContentType: "application/json",
Body: msg,
})
}
})
功能 | 说明 |
---|---|
私聊 | 交换机通过 routing key private.id.{userId} 路由私聊消息给指定用户队列 |
群聊 | 用户队列动态绑定对应群聊 routing key,交换机广播群消息到所有成员队列 |
动态绑定解绑 | 用户加入或退出群聊时调用 RabbitMQ API 动态修改 routing key 绑定关系 |
WebSocket推送 | 消费者监听用户队列,实时推送消息到客户端 WebSocket |
消息发送 | 客户端发消息到后端,由后端发布到交换机指定 routing key |
如果需要,我还可以帮你写一份更详细的接口设计说明或配套代码示例。你看怎么样?
工具
ChatGPT 也可能会犯错。请核查重要信息。
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。