1 Star 0 Fork 0

h79 / goim

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
serve.go 5.40 KB
一键复制 编辑 原始数据 按行查看 历史
huqiuyun 提交于 2024-01-04 12:12 . alarm
package im
import (
"context"
"fmt"
"gitee.com/h79/goim/chat"
commonconfig "gitee.com/h79/goutils/common/config"
"gitee.com/h79/goutils/common/logger"
"gitee.com/h79/goutils/common/system"
"github.com/gorilla/websocket"
"net/http"
"os"
"strings"
"sync"
"time"
)
var (
// @see https://github.com/gorilla/websocket/issues/523
_Upgrader = websocket.Upgrader{
HandshakeTimeout: time.Second * 10,
ReadBufferSize: 4096 * 2,
WriteBufferSize: 4096 * 2,
CheckOrigin: func(r *http.Request) bool {
return true
},
}
imInst *Hub
imOnce sync.Once
)
func Inst(opts ...Option) *Hub {
imOnce.Do(func() {
imInst = NewHub(20, opts...)
})
return imInst
}
func parseSource(conf Config) string {
host := strings.Split(conf.ServerAddr.Host, ".")
return fmt.Sprintf("%s:%v:%s:%s", commonconfig.Source, os.Getpid(), host[2], host[3])
}
func (hub *Hub) Init(cfg Config) {
Register(ETClientHeartBeat.Int(), heartBeat)
Register(ETClientLogin.Int(), login)
Register(ETClientLogout.Int(), logout)
hub.config = cfg
hub.config.host = cfg.ServerAddr.To()
hub.source = parseSource(hub.config)
if hub.config.TokenCheckFunc == nil {
hub.config.TokenCheckFunc = func(ctx context.Context, client *Client, token string) error {
return nil
}
}
hub.start()
}
func (hub *Hub) UnInit() {
hub.stop = true
}
func (hub *Hub) Handler(ctx context.Context, info ConnectInfo, w http.ResponseWriter, r *http.Request) error {
c, err := _Upgrader.Upgrade(w, r, nil)
if err != nil {
return err
}
hub.process(ctx, info, c)
return nil
}
func (hub *Hub) process(ctx context.Context, info ConnectInfo, conn *websocket.Conn) {
addr := conn.RemoteAddr().String()
currentTime := time.Now().Unix()
traceId := logger.TraceId(ctx)
cli := NewClient(hub, conn, info, addr, hub.inrClientIndex(), currentTime)
logger.I("IM", "new Client client=> %v,traceId= %s", cli, traceId)
if hub.config.RegisterClient != 0 {
hub.DoClientCmd(ctx, &ClientCmd{TraceId: traceId, Cmd: CmdRegister, Client: cli, Reason: info.Reason})
} else {
hub.DoClientCmdSync(ctx, &ClientCmd{TraceId: traceId, Cmd: CmdRegister, Client: cli, Reason: info.Reason})
}
cli.Run()
logger.I("IM", "new Client start go running, client=> %v, traceId= %s", cli, traceId)
}
func (hub *Hub) GetServerHost() string {
return hub.config.ServerAddr.Host
}
func (hub *Hub) GetServerPort() int {
return hub.config.ServerAddr.Port
}
const localHost = "127.0.0.1"
const localHostString = "localhost"
func (hub *Hub) IsLocalAddress(server string) bool {
return strings.EqualFold(localHostString, server) ||
strings.EqualFold(localHost, server) ||
strings.EqualFold(hub.config.host, server)
}
func (hub *Hub) Alarm(ctx context.Context, code int32, level int32, title, detail string, err error) {
if system.IsNormalQuit() {
return
}
logger.W("IM", "alarm detail: %s, err= %v", detail, err)
if hub.config.AlarmFunc != nil {
hub.config.AlarmFunc(ctx, code, level, title, detail, err)
}
}
func (hub *Hub) Receive(ctx context.Context, from *Session, to *Session, ev *Event) (err error) {
if IsMQEvent(ev.EventType) {
if hub.mq != nil {
hub.mq.Receive(ctx, from, ev)
}
} else if IsClientEvent(ev.EventType) {
err, _ = hub.SendToLocalUser(ctx, from, to, ev)
} else if IsServerEvent(ev.EventType) {
ev.EventType -= ETServerBegin.Int()
hub.receiveToLocal(ctx, from, to, ev)
} else if IsExServerEvent(ev.EventType) {
ev.EventType -= ETExServerBegin.Int()
hub.receiveFromExServer(ctx, from, to, ev, func() {
err = hub.SendToUser(ctx, from, to, ev)
})
}
// 用户状态需要更新
if IsUserStatusChanged(ev.EventType) {
hub.receiveToLocal(ctx, from, to, ev)
}
return
}
func (hub *Hub) ReceiveAll(ctx context.Context, from *Session, to *Session, ev *Event) (err error) {
if IsMQEvent(ev.EventType) {
if hub.mq != nil {
hub.mq.Receive(ctx, from, ev)
}
} else if IsClientEvent(ev.EventType) {
hub.SendToLocalAppId(ctx, from, to, ev, nil)
} else if IsServerEvent(ev.EventType) {
ev.EventType -= ETServerBegin.Int()
hub.receiveToLocal(ctx, from, to, ev)
} else if IsExServerEvent(ev.EventType) {
ev.EventType -= ETExServerBegin.Int()
hub.receiveFromExServer(ctx, from, to, ev, func() {
hub.SendToAllUser(ctx, from, to, ev, nil)
})
}
// 用户状态需要更新
if IsUserStatusChanged(ev.EventType) {
hub.receiveToLocal(ctx, from, to, ev)
}
return
}
// receiveToLocal 收到服务器之间的消息
func (hub *Hub) receiveToLocal(ctx context.Context, from *Session, to *Session, ev *Event) {
switch EventType(ev.EventType) {
case ETClientUserStatusChanged:
hub.receiveUserStatusChanged(ctx, from, ev)
case ETChat:
if err := hub.SendChat(ctx, from, ev, chat.Option{SaveEnabled: false}, func() {
//not handler
}); err != nil {
logger.E("IM", "SendChat failure, ev= %#v, err= %v, traceId= %s", ev, err, logger.TraceId(ctx))
}
}
}
// receiveFromExServer 收到外服务器的消息
func (hub *Hub) receiveFromExServer(ctx context.Context, from *Session, to *Session, ev *Event, defHandler func()) {
switch EventType(ev.EventType) {
case ETClientUserStatusChanged:
hub.receiveUserStatusChanged(ctx, from, ev)
case ETChat:
if err := hub.SendChat(ctx, from, ev, chat.Option{SyncData: true, SyncVersion: true, SaveEnabled: false}, func() {
// 同步到其它服务器
hub.SyncToRemoteOnly(ctx, from, to, ev)
}); err != nil {
logger.E("IM", "SendChat failure, ev= %#v, err= %v,traceId= %s", ev, err, logger.TraceId(ctx))
}
default:
defHandler()
}
}
Go
1
https://gitee.com/h79/goim.git
git@gitee.com:h79/goim.git
h79
goim
goim
v0.3.7

搜索帮助