1 Star 0 Fork 0

h79/goim

加入 Gitee
与超过 1400万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
克隆/下载
serve.go 6.56 KB
一键复制 编辑 原始数据 按行查看 历史
huqiuyun 提交于 2026-02-03 12:16 +08:00 . 0.5.16 & MQ消息指定点发送
package im
import (
"context"
"fmt"
"net/http"
"os"
"strings"
"time"
"gitee.com/h79/goim/chat"
"gitee.com/h79/goim/internal"
"gitee.com/h79/goim/session"
commonconfig "gitee.com/h79/goutils/common/config"
"gitee.com/h79/goutils/common/logger"
"gitee.com/h79/goutils/common/queue"
"gitee.com/h79/goutils/common/server"
"gitee.com/h79/goutils/common/system"
"gitee.com/h79/goutils/discovery/resolver/builder"
"github.com/gorilla/websocket"
)
// inst is the package-level Hub; it is assigned by NewInstance and handed
// out by Inst. It stays nil until NewInstance has been called.
var inst *Hub
// parseSource builds the identifier this process uses as its message source.
//
// For a dotted address with at least four segments (e.g. an IPv4 address
// "a.b.c.d") the result is "<commonconfig.Source>:<pid>:<c>:<d>", i.e. the
// third and fourth segments are appended after the process id.
//
// Addresses with fewer than four dot-separated segments (an empty string, a
// bare host name such as "localhost", an IPv6 literal, ...) previously caused
// an index-out-of-range panic. They now fall back to
// "<commonconfig.Source>:<pid>:<addr>", using the address as given.
func parseSource(addr string) string {
	segments := strings.Split(addr, ".")
	if len(segments) < 4 {
		// Not a dotted quad: keep the identifier usable instead of panicking.
		return fmt.Sprintf("%s:%d:%s", commonconfig.Source, os.Getpid(), addr)
	}
	return fmt.Sprintf("%s:%d:%s:%s", commonconfig.Source, os.Getpid(), segments[2], segments[3])
}
// Inst returns the package-level Hub stored by NewInstance.
// The result is nil if NewInstance has not been called yet.
func Inst() *Hub {
	return inst
}
// NewInstance creates a Hub through NewHub, stores it as the package-level
// instance (replacing any previous one) and returns it.
func NewInstance(cfg Config, workMod int, opts ...Option) *Hub {
	created := NewHub(cfg, workMod, opts...)
	inst = created
	return created
}
// NewHub builds a Hub from cfg, applies opts, starts the workers and
// registers the built-in client event handlers.
//
// A zero cfg.HeartbeatDuration is replaced by 120 and a zero
// cfg.HeartbeatInterval by 90 (units are not visible here; presumably
// seconds, confirm against the heartbeat code). Options run before the
// workers are created, so an option may still change hub.workMod.
// The package-level instance is not touched; use NewInstance for that.
func NewHub(cfg Config, workMod int, opts ...Option) *Hub {
	if cfg.HeartbeatDuration == 0 {
		cfg.HeartbeatDuration = 120
	}
	if cfg.HeartbeatInterval == 0 {
		cfg.HeartbeatInterval = 90
	}

	// See https://github.com/gorilla/websocket/issues/523
	upgrader := &websocket.Upgrader{
		HandshakeTimeout: 10 * time.Second,
		ReadBufferSize:   2 * 4096,
		WriteBufferSize:  2 * 4096,
		// Every origin is accepted.
		CheckOrigin: func(*http.Request) bool { return true },
	}

	// Fields not listed here (chat, user, rpcClient, groupMgr, mq, host, ...)
	// start at their zero value.
	hub := &Hub{
		workMod:         workMod,
		appMap:          make(map[string]struct{}),
		connects:        make(map[*Connect]struct{}),
		userClient:      make(map[string]*UserClient),
		broadcastStatus: internal.NewObject[*Status](queue.NewCircular(broadcastStatusMax, nil)),
		broadcast:       internal.NewObject[*localData](queue.NewCircular(broadcastMax, nil)),
		connChan:        make(chan *ConnectCmd, clientCountMax),
		config:          cfg,
		target:          builder.Target{Scheme: "dim", Authority: "server"},
		process:         NewProcess(),
		upgrader:        upgrader,
	}

	for _, apply := range opts {
		apply(hub)
	}

	// One worker per group; the original note says the default is 20 groups
	// and each client is assigned by taking its index modulo the group count.
	for idx := 0; idx < hub.workMod; idx++ {
		worker := newWork(idx, hub)
		hub.works = append(hub.works, worker)
		worker.Start()
	}

	hub.process.RegisterDispose(ETClientHeartBeat.Int(), heartBeat)
	hub.process.RegisterDispose(ETClientLogin.Int(), login)
	hub.process.RegisterDispose(ETClientLogout.Int(), logout)

	return hub
}
// Start records the listening address on the hub and then calls hub.start().
// hub.host is set to addr.To() and hub.source is derived from addr.Host via
// parseSource. Calling Start on a nil hub does nothing.
func (hub *Hub) Start(addr server.Address) {
	if hub != nil {
		hub.host = addr.To()
		hub.source = parseSource(addr.Host)
		hub.start()
	}
}
// Stop raises the hub's stop flag. Calling Stop on a nil hub does nothing.
//
// NOTE(review): this is a plain write to hub.stop; whatever reads the flag is
// outside this view. If it is read from another goroutine, confirm the access
// is synchronized.
func (hub *Hub) Stop() {
	if hub != nil {
		hub.stop = true
	}
}
// Handler upgrades an incoming HTTP request to a WebSocket connection and
// turns it into a Connect owned by this hub.
//
// After a successful upgrade it creates the Connect (tagged with version,
// remoteIP, the socket's remote address, the next client index and the
// current Unix time), submits a CmdCreate command for it through
// doConnectCmdSync, and calls conn.Run(). If the upgrade fails, the upgrade
// error is returned together with a nil Connect.
func (hub *Hub) Handler(ctx context.Context, version, remoteIP string, w http.ResponseWriter, r *http.Request) (*Connect, error) {
	socket, upgradeErr := hub.upgrader.Upgrade(w, r, nil)
	if upgradeErr != nil {
		return nil, upgradeErr
	}

	peerAddr := socket.RemoteAddr().String()
	nowUnix := time.Now().Unix()
	trace := logger.TraceId(ctx)

	heartbeat := WithConnHeartbeatDuration(hub.config.HeartbeatDuration)
	client := NewConnect(hub, socket, version, remoteIP, peerAddr, hub.inrClientIndex(), nowUnix, heartbeat)
	logger.I("IM", "new Connector conn=> %v,traceId= %s", client, trace)

	create := &ConnectCmd{
		TraceId: trace,
		Cmd:     CmdCreate,
		Conn:    client,
		Reason:  "",
	}
	hub.doConnectCmdSync(ctx, create)

	client.Run()
	logger.I("IM", "new Connector start go running, client=> %v, traceId= %s", client, trace)

	return client, nil
}
// GetServerHost returns the Host field of hub.serverAddr.
//
// NOTE(review): serverAddr is not assigned anywhere in the visible part of
// this file (Start only sets host and source); confirm where it is set.
func (hub *Hub) GetServerHost() string {
	return hub.serverAddr.Host
}
// GetServerPort returns the Port field of hub.serverAddr.
//
// NOTE(review): serverAddr is not assigned anywhere in the visible part of
// this file (Start only sets host and source); confirm where it is set.
func (hub *Hub) GetServerPort() int {
	return hub.serverAddr.Port
}
// Loopback spellings that IsLocalAddress always treats as local.
const (
	// localHost is the IPv4 loopback address.
	localHost = "127.0.0.1"
	// localHostString is the conventional loopback host name.
	localHostString = "localhost"
)
// IsLocalAddress reports whether server names this node: it matches,
// case-insensitively, "localhost", "127.0.0.1" or the hub's own host string
// (hub.host, as recorded by Start).
func (hub *Hub) IsLocalAddress(server string) bool {
	switch {
	case strings.EqualFold(localHostString, server),
		strings.EqualFold(localHost, server):
		return true
	default:
		return strings.EqualFold(hub.host, server)
	}
}
// Alarm logs a warning containing detail and err and, when an alarm sink is
// configured on the hub, forwards all arguments to it.
// Nothing is logged or forwarded when system.IsNormalQuit() reports true.
func (hub *Hub) Alarm(ctx context.Context, code int32, level int32, title, detail string, err error) {
	if system.IsNormalQuit() {
		return
	}

	logger.W("IM", "alarm detail: %s, err= %v", detail, err)

	if hub.alarm == nil {
		return
	}
	hub.alarm.Alarm(ctx, code, level, title, detail, err)
}
// Receive routes a single event addressed from one session to another,
// choosing the path by the event type's category:
//
//   - MQ events are handed to hub.mq when one is configured.
//   - Client events are delivered with SendToLocalUser; its error is returned.
//   - Server events have ETServerBegin subtracted from their type and go to
//     receiveToLocal.
//   - Ex-server events have ETExServerBegin subtracted from their type and go
//     to receiveFromExServer, with SendToUser as the default handler (its
//     error is returned).
//
// Afterwards, if the (possibly rebased) event type is a user-status change,
// receiveToLocal is invoked so local user status is updated.
// A nil hub is a no-op that returns nil. ev.EventType is modified in place
// for server and ex-server events.
//
// NOTE(review): for server/ex-server events the trailing status check sees the
// rebased type, so a status-change event may be handled twice; confirm this is
// intended.
func (hub *Hub) Receive(ctx context.Context, from *session.Session, to *session.Session, ev *Event) (err error) {
	if hub == nil {
		return nil
	}

	switch {
	case IsMQEvent(ev.EventType):
		if hub.mq != nil {
			hub.mq.Receive(ctx, from, to, ev)
		}
	case IsClientEvent(ev.EventType):
		err, _ = hub.SendToLocalUser(ctx, from, to, ev)
	case IsServerEvent(ev.EventType):
		ev.EventType -= ETServerBegin.Int()
		hub.receiveToLocal(ctx, from, to, ev)
	case IsExServerEvent(ev.EventType):
		ev.EventType -= ETExServerBegin.Int()
		hub.receiveFromExServer(ctx, from, to, ev, func() {
			err = hub.SendToUser(ctx, from, to, ev)
		})
	}

	// User status needs to be refreshed as well.
	if IsUserStatusChanged(ev.EventType) {
		hub.receiveToLocal(ctx, from, to, ev)
	}

	return err
}
// ReceiveAll is the fan-out counterpart of Receive and routes an event by
// the category of its type:
//
//   - MQ events are handed to hub.mq when one is configured.
//   - Client events are delivered with SendToLocalAppId (with emptyPayload).
//   - Server events have ETServerBegin subtracted from their type and go to
//     receiveToLocal.
//   - Ex-server events have ETExServerBegin subtracted from their type and go
//     to receiveFromExServer, with SendToAllUser as the default handler.
//
// Afterwards, if the (possibly rebased) event type is a user-status change,
// receiveToLocal is invoked so local user status is updated.
// A nil hub is a no-op. ev.EventType is modified in place for server and
// ex-server events.
//
// NOTE(review): err is never assigned in this method, so the result is always
// nil; confirm whether results of the send calls should be propagated.
func (hub *Hub) ReceiveAll(ctx context.Context, from *session.Session, to *session.Session, ev *Event) (err error) {
	if hub == nil {
		return nil
	}

	switch {
	case IsMQEvent(ev.EventType):
		if hub.mq != nil {
			hub.mq.Receive(ctx, from, to, ev)
		}
	case IsClientEvent(ev.EventType):
		hub.SendToLocalAppId(ctx, from, to, ev, emptyPayload)
	case IsServerEvent(ev.EventType):
		ev.EventType -= ETServerBegin.Int()
		hub.receiveToLocal(ctx, from, to, ev)
	case IsExServerEvent(ev.EventType):
		ev.EventType -= ETExServerBegin.Int()
		hub.receiveFromExServer(ctx, from, to, ev, func() {
			hub.SendToAllUser(ctx, from, to, ev)
		})
	}

	// User status needs to be refreshed as well.
	if IsUserStatusChanged(ev.EventType) {
		hub.receiveToLocal(ctx, from, to, ev)
	}

	return err
}
// receiveToLocal handles a message received from another server of the
// cluster. User-status changes go to receiveUserStatusChanged; chat events
// are passed to SendChat with saving disabled and a no-op callback, and a
// SendChat failure is only logged. Any other event type is ignored.
// The to parameter is not used.
func (hub *Hub) receiveToLocal(ctx context.Context, from *session.Session, to *session.Session, ev *Event) {
	kind := EventType(ev.EventType)

	if kind == ETClientUserStatusChanged {
		hub.receiveUserStatusChanged(ctx, from, ev)
		return
	}
	if kind != ETChat {
		return
	}

	option := chat.Option{SaveEnabled: false}
	err := hub.SendChat(ctx, from, ev, option, func() {
		// Nothing to do after sending.
	})
	if err != nil {
		logger.E("IM", "SendChat failure, ev= %#v, err= %v, traceId= %s", ev, err, logger.TraceId(ctx))
	}
}
// receiveFromExServer handles a message received from an external server.
// User-status changes go to receiveUserStatusChanged; chat events are passed
// to SendChat (SyncData and SyncVersion on, saving disabled) with a callback
// that forwards the event to the other servers via SyncToRemoteOnly, and a
// SendChat failure is only logged. Every other event type is delegated to
// defHandler, which must therefore be non-nil.
func (hub *Hub) receiveFromExServer(ctx context.Context, from *session.Session, to *session.Session, ev *Event, defHandler func()) {
	switch EventType(ev.EventType) {
	case ETClientUserStatusChanged:
		hub.receiveUserStatusChanged(ctx, from, ev)

	case ETChat:
		option := chat.Option{SyncData: true, SyncVersion: true, SaveEnabled: false}
		// Propagate the event to the other servers.
		syncRemote := func() {
			hub.SyncToRemoteOnly(ctx, from, to, ev)
		}
		err := hub.SendChat(ctx, from, ev, option, syncRemote)
		if err != nil {
			logger.E("IM", "SendChat failure, ev= %#v, err= %v,traceId= %s", ev, err, logger.TraceId(ctx))
		}

	default:
		defHandler()
	}
}
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
Go
1
https://gitee.com/h79/goim.git
git@gitee.com:h79/goim.git
h79
goim
goim
v0.5.16

搜索帮助