代码拉取完成,页面将自动刷新
package im
import (
"context"
"errors"
"fmt"
"net"
"net/http"
"strings"
"sync"
"sync/atomic"
"time"
"gitee.com/h79/goim/session"
"gitee.com/h79/goutils/common/algorithm"
"gitee.com/h79/goutils/common/logger"
"gitee.com/h79/goutils/common/random"
"gitee.com/h79/goutils/common/result"
"gitee.com/h79/goutils/common/system"
"gitee.com/h79/goutils/common/timer"
"github.com/gorilla/websocket"
)
var (
writeWait = 10 * time.Second
pongWait = 60 * time.Second
pingPeriod = (pongWait * 9) / 10
)
const (
readCountMax = 500
highCountMax = 500
writeCountMax = 2500
CmdCreate = 1
CmdDestroy = 2
CmdClose = 3
CmdNotSession = 4
)
type ConnectCmd struct {
Cmd int
Reason string
TraceId string
Conn *Connect
}
type Handler interface {
// HeartbeatSynDuration 心跳同步时长(单位秒)
HeartbeatSynDuration() int64
Source() string
Process() Process
// OnReady 链接准备好了
OnReady(ctx context.Context, conn *Connect)
// OnClosing close之前
OnClosing(ctx context.Context, conn *Connect)
OnClosed(ctx context.Context, conn *Connect)
OnDestroy(ctx context.Context, conn *Connect)
// OnHeartbeat websocket发生心跳的回调
OnHeartbeat(ctx context.Context, conn *Connect, timeAt int64)
// OnLogin 用户
OnLogin(ctx context.Context, user *session.Session, conn *Connect, timeAt int64)
OnLogout(ctx context.Context, user *session.Session, conn *Connect, timeAt int64)
}
type Client interface {
GetSource() string
GetIndex() int64
GetHeartTime() int64
GetSyncTime() int64
GetConnectId() uint64
GetAddr() string
GetRemoteIP() string
GetConnectTime() int64
IsClosed() bool
Close(ctx context.Context, reason string) error
SendTo(msg []byte) error
SendToQueue(ctx context.Context, msg []byte) error
}
type Query interface {
Search(client *Connect) int
}
type QueryFunc func(client *Connect) int
func (f QueryFunc) Search(client *Connect) int {
return f(client)
}
var _ Client = (*Connect)(nil)
type Connect struct {
handler Handler
ws *websocket.Conn // 用户连接
addr string // 客户端地址
remoteIP string
version string
firstTimeString string // 主要为日志输出(不要每次都转化)
firstTime int64 // 首次连接事件 //秒
heartbeatTime int64 // 用户上次心跳时间
heartbeatDuration int64
heartbeatSyncTime int64
index int64
connectId uint64
connClosed int32 // 连路关闭
rwClosed bool // 消息通道关闭
users map[string]*session.Session // 支持在一个链接,可以有多种登录的用户, eg: UserId@AppId/Plat , Account@AppId/Plat
userLock sync.RWMutex // 读写锁
rw sync.RWMutex
writeMu sync.Mutex // 写保护
normalReadCh chan *EventReq
highReadCh chan *EventReq
writeCh chan msgPack
}
// BuildConn for ws connect
func BuildConn(url string, head http.Header, resp func(resp *http.Response), opts ...OptionDial) (*websocket.Conn, error) {
dia := &websocket.Dialer{
Proxy: http.ProxyFromEnvironment,
HandshakeTimeout: time.Minute * 2,
}
for _, opt := range opts {
opt(dia)
}
conn, res, err := dia.Dial(url, head)
if err != nil {
logger.E("IM", "dial, url=%s, err= %v", url, err)
return nil, err
}
if resp != nil {
resp(res)
}
return conn, nil
}
func NewConnect(handler Handler, conn *websocket.Conn, version, remoteIp, addr string, index, firstTime int64, opts ...OptionConnect) *Connect {
if conn == nil {
return nil
}
if handler == nil {
panic("handler is nil")
}
cli := &Connect{
handler: handler,
version: version,
remoteIP: remoteIp,
ws: conn,
connectId: algorithm.FnvSum64(addr),
index: index,
addr: addr,
firstTime: firstTime,
heartbeatSyncTime: firstTime,
firstTimeString: timer.UnixToFormat(firstTime),
heartbeatTime: firstTime,
connClosed: 0,
rwClosed: false,
highReadCh: make(chan *EventReq, highCountMax),
normalReadCh: make(chan *EventReq, readCountMax),
writeCh: make(chan msgPack, writeCountMax),
users: make(map[string]*session.Session),
}
for _, opt := range opts {
opt(cli)
}
return cli
}
func (c *Connect) read() {
defer func() {
// close highReadCh,引起 write 函数,读取 SendChan出错,从而 unRegister
c.close("read defer")
}()
var isIgnoreError = func(err error) bool {
if err == nil {
return true
}
if errors.Is(err, websocket.ErrCloseSent) {
return true
}
var e net.Error
if errors.As(err, &e) && e.Timeout() {
return true
}
return false
}
//连接通过调用使用 SetPongHandler 方法设置的处理程序函数来处理接收到的 pong 消息。
//默认的 pong 处理程序不执行任何操作。如果应用程序发送 ping 消息,则应用程序应设置 pong 处理程序来接收相应的 pong。
c.ws.SetPongHandler(func(pong string) error {
c.onHeartbeat(context.Background(), time.Now().Unix())
return nil
})
//连接通过调用使用 SetPingHandler 方法设置的处理程序函数来处理接收到的 ping 消息。
// 默认 ping 处理程序向对等方发送 pong 消息。
c.ws.SetPingHandler(func(message string) error {
err := c.ws.WriteControl(websocket.PongMessage, []byte(message), time.Now().Add(writeWait))
if isIgnoreError(err) {
c.onHeartbeat(context.Background(), time.Now().Unix())
return nil
}
logger.E("IM", "PingMessage data= %s, err= %s", message, err)
return err
})
for {
msgType, msg, err := c.ws.ReadMessage()
if err != nil {
logger.E("IM", "read message failure, client=> %v, err= %v", c, err)
break
}
if msgType > 0 && len(msg) > 0 {
ev, err := c.unmarshalRead(msg)
if err != nil {
continue
}
if c.highEvent(ev.EventType) {
c.highReadCh <- ev
} else {
c.normalReadCh <- ev
}
}
if system.IsQuit() {
break
}
}
}
func (c *Connect) highEvent(ev int32) bool {
return ev == ETMessage.Int() ||
ev == ETClientLogin.Int() ||
ev == ETClientLogout.Int()
}
func (c *Connect) write() {
ticker := time.NewTicker(pingPeriod)
defer func() {
ticker.Stop()
ctx := NewWithTraceId("writeDefer:" + random.GenerateNumString(16))
_ = c.Close(ctx, "write defer")
c.handler.OnDestroy(ctx, c)
}()
ctx := NewWithTraceId("connectReady:" + random.GenerateNumString(16))
c.handler.OnReady(ctx, c)
var err error
for {
select {
case msg, ok := <-c.writeCh:
if !ok {
logger.E("IM", "write chan is closed, client=> %v", c)
return
}
err = c.SendTo(msg.Body)
if err != nil {
logger.E("IM", "write message failure, client=> %v, traceId= %s, err= %v", c, msg.TraceId, err)
return
}
case msg, ok := <-c.highReadCh:
if !ok {
logger.E("IM", "read(high) chan is closed, client=> %v", c)
return
}
c.processEvent(msg)
case msg, ok := <-c.normalReadCh:
if !ok {
logger.E("IM", "read chan is closed, client=> %v", c)
return
}
c.processEvent(msg)
case _ = <-ticker.C:
//心跳包 下面ping出错就会报错退出;断开这个连接
err = c.ws.WriteControl(websocket.PingMessage, []byte("ping"), time.Now().Add(writeWait))
if err != nil {
logger.E("IM", "ping message failure, client=> %v, err= %v", c, err)
}
if c.autoClose() {
logger.E("IM", "not session to auto close, client=> %v", c)
return
}
case _, ok := <-system.Closed():
if !ok {
return
}
}
}
}
func (c *Connect) close(reason string) {
c.rw.Lock()
if c.rwClosed {
c.rw.Unlock()
return
}
c.rwClosed = true
c.rw.Unlock()
// only close once, maybe message lost
close(c.normalReadCh)
close(c.highReadCh)
logger.W("IM", "chan is closed, client=> %v, reason= '%v'", c, reason)
}
func (c *Connect) isRWClosed() bool {
c.rw.RLock()
defer c.rw.RUnlock()
return c.rwClosed
}
func (c *Connect) isHeartbeatTimeout(currentTime int64) bool {
//2分钟
return c.heartbeatTime+c.heartbeatDuration <= currentTime
}
func (c *Connect) onHeartbeat(ctx context.Context, currentTime int64) {
c.heartbeatTime = currentTime
if currentTime-c.heartbeatSyncTime <= c.getHeartbeatSyncDuration() {
//1分半钟同步一次
return
}
c.heartbeatSyncTime = currentTime
c.handler.OnHeartbeat(ctx, c, currentTime)
}
func (c *Connect) getHeartbeatSyncDuration() int64 {
return c.handler.HeartbeatSynDuration()
}
func (c *Connect) Run() {
go func() {
defer system.Recover()
c.write()
}()
go func() {
defer system.Recover()
c.read()
}()
}
func (c *Connect) Close(ctx context.Context, reason string) error {
if atomic.AddInt32(&c.connClosed, 1) != 1 {
return errConnClose
}
c.handler.OnClosing(ctx, c)
var err = c.ws.Close()
c.handler.OnClosed(ctx, c)
logger.W("IM", "the connect is closed, reason= '%v', client=> %v, err= '%v', traceId= %s", reason, c, err, logger.TraceId(ctx))
return err
}
func (c *Connect) SendTo(msg []byte) error {
if c == nil || len(msg) <= 0 {
return nil
}
if c.IsClosed() {
return errConnClose
}
if c.handler.Process().HasMessageLog(SMsgLog) {
logger.N("IM", "send msg= '%s'", string(msg))
}
// https://pkg.go.dev/github.com/gorilla/websocket?utm_source=godoc#hdr-Concurrency
// Connections support one concurrent reader and one concurrent writer.
c.writeMu.Lock()
defer c.writeMu.Unlock()
return c.ws.WriteMessage(websocket.TextMessage, msg)
}
func (c *Connect) SendToQueue(ctx context.Context, msg []byte) error {
if c == nil || len(msg) <= 0 {
return result.RErrParam
}
// 在一定的范围内,保持write channel协程安全
// 大多协程下,也会引起 write channel panic, because it is closed
// 如果使用 mutex,感觉代价有点高,容易阻塞
if c.IsClosed() {
return errConnectClosed
}
c.writeCh <- msgPack{TraceId: logger.TraceId(ctx), Body: msg}
return nil
}
// Subscribe send subscribe pack
func (c *Connect) Subscribe(ctx context.Context, session *session.Session, topic ...string) error {
top := strings.Join(topic, ",")
if top == "" {
panic("topic is empty")
}
pack := CreateObjEvent(logger.TraceId(ctx), ETMq, MessageMQ{Cmd: SubscribeMQ, Topic: top})
return c.SendTo(RequestEvent(c.GetSource(), pack, session).ToBytes())
}
// UnSubscribe send unsubscribe pack
func (c *Connect) UnSubscribe(ctx context.Context, session *session.Session, topic ...string) error {
top := strings.Join(topic, ",")
if top == "" {
panic("topic is empty")
}
pack := CreateObjEvent(logger.TraceId(ctx), ETMq, MessageMQ{Cmd: UnSubscribeMQ, Topic: top})
return c.SendTo(RequestEvent(c.GetSource(), pack, session).ToBytes())
}
// SendMQ send mq pack
func (c *Connect) SendMQ(ctx context.Context, session *session.Session, topic string, content string) error {
if topic == "" {
panic("topic is empty")
}
pack := CreateObjEvent(logger.TraceId(ctx), ETMq, MessageMQ{Cmd: SendMQ, Topic: topic, Content: content})
return c.SendTo(RequestEvent(c.GetSource(), pack, session).ToBytes())
}
func (c *Connect) Enter(ctx context.Context, user *session.Session, loginTime int64) {
logger.D("IM", "client enter in conn=> %v, traceId= %s", c, logger.TraceId(ctx))
c.heartbeatTime = loginTime
c.handler.OnLogin(ctx, user, c, loginTime)
}
func (c *Connect) Leave(ctx context.Context, user *session.Session, logoutTime int64) {
logger.D("IM", "client leave conn=> %v, traceId= %s", c, logger.TraceId(ctx))
c.heartbeatTime = logoutTime
c.handler.OnLogout(ctx, user, c, logoutTime)
}
func (c *Connect) GetSource() string {
return c.handler.Source()
}
func (c *Connect) AddSession(key string, se *session.Session) {
if c == nil {
return
}
if key == "" {
panic("key is empty")
}
c.userLock.Lock()
c.users[key] = se
c.userLock.Unlock()
}
func (c *Connect) RemoveSession(key string) int {
if c == nil {
return 0
}
c.userLock.Lock()
defer c.userLock.Unlock()
if key != "" {
delete(c.users, key)
}
return len(c.users)
}
func (c *Connect) HasSession() int {
c.userLock.RLock()
defer c.userLock.RUnlock()
return len(c.users)
}
func (c *Connect) autoClose() bool {
c.userLock.RLock()
defer c.userLock.RUnlock()
return len(c.users) == 0
}
func (c *Connect) GetSessionKey(fn func(s *session.Session) string) []string {
if c == nil {
return nil
}
c.userLock.RLock()
defer c.userLock.RUnlock()
ret := make([]string, 0, len(c.users))
for _, s := range c.users {
ret = append(ret, fn(s))
}
return ret
}
func (c *Connect) GetIndex() int64 { // 客户端地址
return c.index
}
func (c *Connect) GetHeartTime() int64 { // 客户端地址
return c.heartbeatTime
}
func (c *Connect) GetSyncTime() int64 { // 客户端地址
return c.heartbeatSyncTime
}
func (c *Connect) GetConnectId() uint64 { // 客户端地址
return c.connectId
}
func (c *Connect) GetAddr() string { // 客户端地址
return c.addr
}
func (c *Connect) GetRemoteIP() string {
return c.remoteIP
}
func (c *Connect) GetConnectTime() int64 {
return c.firstTime
}
func (c *Connect) IsClosed() bool {
return c == nil || c.isRWClosed() || atomic.LoadInt32(&c.connClosed) >= 1
}
func (c *Connect) String() string {
return fmt.Sprintf("( index= '%d', remoteIP= '%s',addr= '%s', rwClosed= '%v' createTime= '%v')",
c.index, c.remoteIP, c.addr, c.rwClosed, c.firstTimeString)
}
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。