Ai
1 Star 9 Fork 5

张慧君/Chat

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
该仓库未声明开源许可证文件(LICENSE),使用请关注具体项目描述及其代码上游依赖。
克隆/下载
chat.go 13.80 KB
一键复制 编辑 原始数据 按行查看 历史
zhj 提交于 2019-11-18 11:55 +08:00 . 代码调整
package ctrl
import (
"net/http"
"strconv"
"github.com/gorilla/websocket"
"log"
"gopkg.in/fatih/set.v0"
"sync"
"fmt"
"encoding/json"
"net"
"github.com/streadway/amqp"
"../common"
)
//核心在于形成userid和Node的映射关系
type Node struct {
Conn *websocket.Conn //保存websocket连接,conn是io型的资源
DataQueue chan []byte //并行的数据转成串行的数据
GroupSets set.Interface //组,线程安全
}
//映射关系表(map的键是userid,值是Node,全局的map,所有协程共享)
var clientMap map[int64]*Node = make(map[int64]*Node)
//读写锁
var rwlocker sync.RWMutex
//接收到通过websocket传过来的消息结构体
type Message struct {
Id int64 `json:"id,omitempty" form:"id"` //消息ID
//谁发的
Userid interface{} `json:"userid,omitempty" form:"userid"` //谁发的
//什么业务
Cmd int `json:"cmd,omitempty" form:"cmd"` //群聊还是私聊
//发给谁
Dstid int64 `json:"dstid,omitempty" form:"dstid"` //对端用户ID/群ID
//怎么展示
Media int `json:"media,omitempty" form:"media"` //消息按照什么样式展示
//内容是什么
Content string `json:"content,omitempty" form:"content"` //消息的内容
//图片是什么
Pic string `json:"pic,omitempty" form:"pic"` //预览图片
//连接是什么
Url string `json:"url,omitempty" form:"url"` //服务的URL
//简单描述
Memo string `json:"memo,omitempty" form:"memo"` //简单描述
//其他的附加数据,语音长度/红包金额
Amount int `json:"amount,omitempty" form:"amount"` //其他和数字相关的
}
//定义常量,消息类型是单聊,还是群聊,还是心跳
const (
//点对点单聊,dstid是用户ID
CMD_SINGLE_MSG = 10
//群聊消息,dstid是群id
CMD_ROOM_MSG = 11
//心跳消息,不处理
CMD_HEART = 0
)
//这个channel专门来存储udp广播的数据
var udpsendchan chan []byte = make(chan []byte, 2014)
//这个channel专门来存储rabbitmq广播的数据
var mqsendchan chan []byte = make(chan []byte, 2014)
//ws://127.0.0.1/chat?id=1&token=xxx
func Chat(writer http.ResponseWriter, request *http.Request) {
//todo检验token是否合法
query := request.URL.Query() //获取get后面携带的参数,得到的都是字符串
id := query.Get("id")
token := query.Get("token")
//将id转换为int型
userId, _ := strconv.ParseInt(id, 10, 64)
//返回验证结果(验证参数正确性,websocket自带验证,所以我们就用websocket自带的)
isvalida := checkToken(userId, token)
//将当前http连接升级为websocket连接,之后每个协程都有自己的结构体,
//结构体中保存当前协程的websocket连接,管道,群集合等,并且将自己的协程对应的
//结构体bind到全局共享的map中,userid=>node,key=>value的形式,
//并且每个协程都有自己两个子协程,(1)发送子协程:不断的读取当前这个协程对应的结构体
//的管道,是否有数据通过管道传过来,如果有,则通过当前的node.Conn发送出去(管道:保证发送消息的顺序性)
//(2)接收子协程:不断的从node.Conn中读取数据
//
conn, err := (&websocket.Upgrader{
CheckOrigin: func(r *http.Request) bool {
return isvalida
},
}).Upgrade(writer, request, nil)
if (err != nil) {
log.Println(err.Error()) //打印日志
return //结束
}
//todo 获得conn
node := &Node{
Conn: conn,
DataQueue: make(chan []byte, 50), //并行的数据转成串行的数据,有缓存
GroupSets: set.New(set.ThreadSafe), //初始化set,线程安全
}
//获取用户关注的全部群id,之后放入这个用户的node.Groupset中,
//发送群消息的时候,根据群id,遍历所有用户,看看每一个用户是否
//包含这个群id,有则发送
comIds := contactService.SearchComunityIds(userId)
for _, v := range comIds {
node.GroupSets.Add(v)
}
//todo userid和node形成绑定关系,
//由于这个map操作频率非常大,我们要用锁,保证数据安全
rwlocker.Lock()
clientMap[userId] = node
rwlocker.Unlock()
log.Printf("%d客户端连接啦", userId)
//完成发送逻辑
go sendproc(node)
//完成接收逻辑
go recvproc(node, userId)
//往发送协程的channel(有缓存)写数据
sendMsg(userId, []byte("hello world"))
}
//检测是否有效
func checkToken(userid int64, token string) bool {
//从数据库里面查询并比对,todo
token_resp := userService.Find(userid).Token
if (token == token_resp) {
return true
} else {
return false
}
}
//服务端向客户端发送协程
func sendproc(node *Node) {
for {
select {
//一直监听当前这个连接,是否有数据过来,将数据写到node.DataQueue,
//之后从node.DataQueue管道中读取数据,通过node.Conn发送到客户端
case data := <-node.DataQueue:
err := node.Conn.WriteMessage(websocket.TextMessage, data)
if err != nil {
//发送出错
log.Println(err.Error())
return
}
}
}
}
//接收协程
func recvproc(node *Node, userId int64) {
for {
//不断的从websocket连接中读取数据,
_, data, err := node.Conn.ReadMessage()
//websocket关闭时,连接回收
defer node.Conn.Close()
if err != nil {
log.Println(err.Error())
log.Printf("%d客户端websocket连接关闭啦", userId)
//前端断线清理clientMap中的信息
rwlocker.Lock()
delete(clientMap, userId)
rwlocker.Unlock()
return
}
//对通过websocket穿过的data(json格式的[]byte)做进一步处理
switch common.G_config.DistributedType {
case "1":
//单机推送模式
dispatch(data)
case "2":
//udp推送模式
//把消息广播到局域网
broadMsg(data)
case "3":
//rabbitmq推送模式
pushMsg(data)
}
fmt.Printf("recv<=%s\n", data)
}
}
//rabbitmq推送消息,把websocket发过来的消息都通过mq广播出去
func pushMsg(data []byte) {
mqsendchan <- data
}
//todo 添加新的群ID到用户的groupset中
func AddGroupId(userId, gid int64) {
//取得node
rwlocker.Lock()
node, ok := clientMap[userId]
if ok {
node.GroupSets.Add(gid)
}
//clientMap[userId] = node
rwlocker.Unlock()
//添加gid到set
}
//发送消息
func sendMsg(userId int64, msg []byte) {
//对于map,需要判断这个值是不是存在的,对于map操作我们要保证并发线程安全
rwlocker.RLock()
node, ok := clientMap[userId]
rwlocker.RUnlock()
if ok {
//真,这个值存在
node.DataQueue <- msg
}
}
//后端调度逻辑处理(单体架构)
func dispatch(data []byte) {
//解析data为message
msg := Message{}
//将传过来的[]byte数据赋值到结构体中
err := json.Unmarshal(data, &msg)
if (err != nil) {
log.Println(err.Error())
return
}
//根据cmd对逻辑进行处理
switch msg.Cmd {
case CMD_SINGLE_MSG:
//单聊
sendMsg(msg.Dstid, data)
case CMD_ROOM_MSG:
//群聊,遍历所有的clientmap
for _, v := range clientMap {
if v.GroupSets.Has(msg.Dstid) {
v.DataQueue <- data
}
}
case CMD_HEART:
//todo心跳,保证websocket长连接一直存在,一般啥都不做
}
}
//分布式:把消息广播到局域网:流程为服务端接收协程收到消息后,
//把消息放到udp的channel中,之后每一台机器启动的时候都会建立一个udp的发送协程,接收协程
//之后这台机器的udp发送协程从udp的channel中读取数据,将数据写入udp,广播出去,别的所有机器的udp接收协程会收到消息,
//之后将这条消息发到指定用户,有的机器有这个用户,有的机器没有这个用户,没有就不发,有就发,因为用户userid是唯一的
//一个用户只会连接到一台服务器上,可以换成rabbitmq则更好一些,确保消息准确到达
func broadMsg(data []byte) {
udpsendchan <- data
}
//读取配置文件,选择是单机推送,还是分布式udp推送模式,还是分布式rabbitmq推送模式
func InitPush() {
switch common.G_config.DistributedType {
case "2":
//udp广播
go udpsendproc()
go udprecvproc()
case "3":
//(1)服务启动时候,创建交换机
rabbitmq_create_exchange()
//(2)rabbitmq发送协程,一直读取mqsendchan,有消息之后就投递到mq
go rabbitmq_send()
//(3)rabbitmq接收协程
go rabbitmq_recv()
}
}
//rabbitmq创建交换机
func rabbitmq_create_exchange() {
rabbitmq_user := common.G_config.Distributed_rabbitmq_user
rabbitmq_password := common.G_config.Distributed_rabbitmq_password
rabbitmq_ip := common.G_config.Distributed_rabbitmq_ip
rabbitmq_port := common.G_config.Distributed_rabbitmq_port
conn, err := amqp.Dial("amqp://" + rabbitmq_user + ":" + rabbitmq_password + "@" + rabbitmq_ip + ":" + rabbitmq_port + "/")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()
ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()
err = ch.ExchangeDeclare(
"zhj-exchange", // name
"fanout", // type
true, // durable
false, // auto-deleted
false, // internal
false, // no-wait
nil, // arguments
)
failOnError(err, "Failed to declare an exchange")
}
//rabbitmq接收协程
func rabbitmq_recv() {
rabbitmq_user := common.G_config.Distributed_rabbitmq_user
rabbitmq_password := common.G_config.Distributed_rabbitmq_password
rabbitmq_ip := common.G_config.Distributed_rabbitmq_ip
rabbitmq_port := common.G_config.Distributed_rabbitmq_port
conn, err := amqp.Dial("amqp://" + rabbitmq_user + ":" + rabbitmq_password + "@" + rabbitmq_ip + ":" + rabbitmq_port + "/")
defer conn.Close()
failOnError(err, "连接mq失败")
log.Println("rabbitmq连接成功!")
ch, err := conn.Channel()
failOnError(err, "创建mq-channel失败")
log.Println("创建mq-channel成功")
defer ch.Close()
err = ch.ExchangeDeclare(
"zhj-exchange", // name
"fanout", // type
true, // durable
false, // auto-deleted
false, // internal
false, // no-wait
nil, // arguments
)
failOnError(err, "声明交换机失败")
log.Println("声明交换机成功")
q, err := ch.QueueDeclare(
"zhj-queue-2", // name
false, // durable
false, // delete when unused
true, // exclusive
false, // no-wait
nil, // arguments
)
failOnError(err, "声明队列失败")
log.Println("声明队列成功")
err = ch.QueueBind(
q.Name, // queue name
"", // routing key
"zhj-exchange", // exchange
false,
nil)
failOnError(err, "bind到交换机失败")
log.Println("bind到交换机成功")
msgs, err := ch.Consume(
q.Name, // queue
"", // consumer
true, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
failOnError(err, "消费mq消息失败")
log.Println("消费mq消息成功")
forever := make(chan bool)
go func() {
for d := range msgs {
//接收到别的机器发送来的消息
log.Printf(" [x] %s", d.Body)
dispatch(d.Body)
}
}()
log.Printf(" [*] Waiting for logs. To exit press CTRL+C")
<-forever
}
//rabbitmq发送协程
func rabbitmq_send() {
rabbitmq_user := common.G_config.Distributed_rabbitmq_user
rabbitmq_password := common.G_config.Distributed_rabbitmq_password
rabbitmq_ip := common.G_config.Distributed_rabbitmq_ip
rabbitmq_port := common.G_config.Distributed_rabbitmq_port
conn, err := amqp.Dial("amqp://" + rabbitmq_user + ":" + rabbitmq_password + "@" + rabbitmq_ip + ":" + rabbitmq_port + "/")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()
ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()
err = ch.ExchangeDeclare(
"zhj-exchange", // name
"fanout", // type
true, // durable
false, // auto-deleted
false, // internal
false, // no-wait
nil, // arguments
)
failOnError(err, "Failed to declare an exchange")
//rabbitmq发送协程,一直读取mqsendchan,有消息之后就投递到mq
for {
select {
case body := <-mqsendchan:
//有消息被投递,则将此消息发送至交换机
err = ch.Publish(
"zhj-exchange", // exchange
"", // routing key
false, // mandatory
false, // immediate
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(body),
})
failOnError(err, "Failed to publish a message")
log.Printf(" [x] Sent %s", body)
}
}
}
//完成udp数据的发送协程
func udpsendproc() {
log.Println("start udpsendproc success")
//使用udp协议拨号(第一个参数:协议,第二个: udp地址(nil本机地址),第三个:能接收到udp消息的地址)//10.33.162.14
net_IPv4_a, _ := strconv.Atoi(common.G_config.Distributed_net_IPv4_a)
net_IPv4_b, _ := strconv.Atoi(common.G_config.Distributed_net_IPv4_b)
net_IPv4_c, _ := strconv.Atoi(common.G_config.Distributed_net_IPv4_c)
netmask, _ := strconv.Atoi(common.G_config.Distributed_netmask)
port, _ := strconv.Atoi(common.G_config.Distributed_port)
// 数据类型需要转换一下
conn, err := net.DialUDP("udp", nil, &net.UDPAddr{IP: net.IPv4(uint8(net_IPv4_a), uint8(net_IPv4_b), uint8(net_IPv4_c), uint8(netmask)), Port: port,})
if err != nil {
log.Println(err.Error())
return
}
defer conn.Close()
//通过conn发送消息,广播出去
for {
select {
case data := <-udpsendchan:
_, err := conn.Write(data)
if err != nil {
log.Println(err.Error())
return
}
}
}
}
//完成udp接收协程
func udprecvproc() {
port, err := strconv.Atoi(common.G_config.Distributed_port)
log.Println("start udprecvproc success")
//监听udp广播端口
conn, err := net.ListenUDP("udp", &net.UDPAddr{
IP: net.IPv4zero,
Port: port,
})
defer conn.Close()
if err != nil {
log.Println(err.Error())
return
}
//处理端口发送过来的消息
for {
var buf [512]byte
n, err := conn.Read(buf[0:])
if err != nil {
log.Println(err.Error())
return
}
//直接通过udp传送过来的数据
dispatch(buf[0:n])
}
}
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
}
}
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
Go
1
https://gitee.com/jshzhj/Chat.git
git@gitee.com:jshzhj/Chat.git
jshzhj
Chat
Chat
master

搜索帮助