1 Star 0 Fork 0

yangtxiang/mg-fw

加入 Gitee
与超过 1400万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
克隆/下载
ws_log_writer.go 4.67 KB
一键复制 编辑 原始数据 按行查看 历史
yangtxiang 提交于 2022-08-21 16:42 +08:00 . 增加日志库
package glogger
import (
"context"
"fmt"
"github.com/gorilla/websocket"
"github.com/spf13/cast"
"gopkg.in/mgo.v2/bson"
"log"
"net"
"strings"
"sync/atomic"
"time"
)
const pongTimeOutSec = 90
const writeTimeoutSec = 30
const allowLossPongCount = 3 // 允许丢失的心跳包数
type GWSLogWriter interface {
ID() string
GLogger
}
type wsLogWriter struct {
ctx context.Context
id string
conn *websocket.Conn
closeState int32
pingCount int32
buffer chan *wsLogEventData
}
func newLogWriter(ctx context.Context, conn *websocket.Conn) *wsLogWriter {
result := &wsLogWriter{
ctx: ctx,
id: bson.NewObjectId().Hex(),
conn: conn,
buffer: make(chan *wsLogEventData, 126),
}
atomic.StoreInt32(&result.closeState, 0)
return result
}
func (lw *wsLogWriter) close() error {
if n := atomic.LoadInt32(&lw.closeState); n > 0 {
return nil
}
defer atomic.StoreInt32(&lw.closeState, 1)
return lw.conn.Close()
}
func (lw *wsLogWriter) readLoop() {
defer func() {
// 关闭
if err := lw.close(); err != nil {
fmt.Printf("close error:%s\n", err)
}
}()
lw.conn.SetReadLimit(-1) // 不限制大小
lw.conn.SetPongHandler(func(appData string) error {
lw.pongHandler([]byte(appData))
return nil
})
go func() {
select {
case <-lw.ctx.Done():
if err := lw.close(); err != nil {
log.Printf("close websocket log_writer error:%s\n", err)
}
}
}()
for {
if n := atomic.LoadInt32(&lw.closeState); n > 0 {
return
}
if err := lw.conn.SetReadDeadline(time.Now().Add(pongTimeOutSec * time.Second)); err != nil {
fmt.Printf("setReadDeadline error:%s\n", err)
}
t, data, err := lw.conn.ReadMessage()
if err != nil {
if netError, ok := err.(net.Error); ok {
if netError.Timeout() {
// 超时处理
if n := atomic.LoadInt32(&lw.pingCount); n > allowLossPongCount {
// 丢失3次心跳,断开连接
return
}
break
}
} else {
log.Printf("websocket client error: %v", err)
if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {
break
}
// 未知错误
return
}
}
switch t {
case websocket.PingMessage:
// ping -> pong
lw.pingHandler(data)
break
case websocket.PongMessage:
// pong
lw.pongHandler(data)
break
case websocket.TextMessage:
s := string(data)
if idx := strings.Index(s, "ping:"); idx != -1 {
pingData := s[idx:]
lw.pingHandler([]byte(pingData))
} else if idx := strings.Index(s, "pong:"); idx != -1 {
pongData := s[idx:]
lw.pongHandler([]byte(pongData))
}
break
}
}
}
func (lw *wsLogWriter) pingHandler(data []byte) {
atomic.StoreInt32(&lw.pingCount, 0)
if err := lw.conn.SetReadDeadline(time.Now().Add(pongTimeOutSec * time.Second)); err != nil {
fmt.Printf("setReadDeadline error:%s\n", err)
}
if err := lw.conn.SetWriteDeadline(time.Now().Add(writeTimeoutSec * time.Second)); err != nil {
fmt.Printf("setWriteDeadline error:%s\n", err)
}
// ping -> pong 返回ping的数据
if err := lw.conn.WriteMessage(websocket.PongMessage, data); err != nil {
fmt.Printf("write pong message error:%s\n", err)
return
}
}
func (lw *wsLogWriter) pongHandler(data []byte) {
// pong
atomic.StoreInt32(&lw.pingCount, 0)
if err := lw.conn.SetReadDeadline(time.Now().Add(pongTimeOutSec * time.Second)); err != nil {
log.Printf("setReadDeadline error:%s\n", err)
}
}
func (lw *wsLogWriter) writeLoop() {
ticker := time.NewTicker(30 * time.Second)
defer func() {
if err := lw.close(); err != nil {
log.Printf("close websocket client error:%s\n", err)
}
}()
for {
if n := atomic.LoadInt32(&lw.closeState); n > 0 {
return
}
select {
case item, ok := <-lw.buffer:
if !ok {
return
}
wsLogEmitEvent(lw.conn, wsLogEventLog, item)
break
case <-ticker.C:
t := time.Now().Unix()
if err := lw.conn.SetWriteDeadline(time.Now().Add(writeTimeoutSec * time.Second)); err != nil {
fmt.Printf("setWriteDeadline error:%s\n", err)
}
if err := lw.conn.WriteMessage(websocket.PingMessage, []byte("ping:"+cast.ToString(t))); err != nil {
return
}
atomic.AddInt32(&lw.pingCount, 1)
}
}
}
func (lw *wsLogWriter) ID() string {
return lw.id
}
func (lw *wsLogWriter) Log(level LogLevel, tag string, data any) {
lw.buffer <- &wsLogEventData{
Level: level,
Tag: tag,
Log: data,
}
}
func (lw *wsLogWriter) Info(tag string, data any) {
lw.Log(LogLevelInfo, tag, data)
}
func (lw *wsLogWriter) Warn(tag string, data any) {
lw.Log(LogLevelWarn, tag, data)
}
func (lw *wsLogWriter) Debug(tag string, data any) {
lw.Log(LogLevelDebug, tag, data)
}
func (lw *wsLogWriter) Error(tag string, data any) {
lw.Log(LogLevelError, tag, data)
}
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
Go
1
https://gitee.com/maglsoft/mg-fw.git
git@gitee.com:maglsoft/mg-fw.git
maglsoft
mg-fw
mg-fw
v0.0.5

搜索帮助