1 Star 0 Fork 2

何吕 / volantmq

forked from JUMEI_ARCH / volantmq 
加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
receiver.go 4.36 KB
一键复制 编辑 原始数据 按行查看 历史
hawklin 提交于 2018-06-20 20:14 . bugfix
package connection
import (
"bufio"
"encoding/binary"
"sync/atomic"
"errors"
"github.com/VolantMQ/volantmq/packet"
"github.com/VolantMQ/volantmq/trace"
"github.com/troian/easygo/netpoll"
"go.uber.org/zap"
)
func (s *Type) keepAliveExpired() {
s.log.Warn("while keepalive timeout, close conn", zap.String("clientID", s.ID), zap.String("remote", s.remoteAddr), zap.Duration("time", s.keepAlive))
s.onConnectionClose(true, packet.CodeKeepAliveTimeout)
}
func (s *Type) rxRun(event netpoll.Event) {
defer func() {
criticalErr := recover()
if criticalErr != nil {
s.log.Error("Receiver encountered a critical error",
zap.String("clientID", s.ID),
zap.Error(criticalErr.(error)),
zap.Any("event", event),
zap.Any("conn", s.Conn),
)
}
}()
select {
case <-s.quit:
return
default:
}
if atomic.CompareAndSwapUint32(&s.rxRunning, 0, 1) {
s.rxWg.Wait()
s.rxWg.Add(1)
exit := false
if event&(netpoll.EventReadHup|netpoll.EventWriteHup|netpoll.EventHup|netpoll.EventErr) != 0 {
if event&netpoll.EventRead == 0 {
exit = true
} else {
s.log.Debug("data need to finish reading", zap.String("client_id", s.ID), zap.String("client_addr", s.remoteAddr), zap.Any("event", event.String()))
}
}
if exit {
s.log.Debug("client exit.", zap.Any("error_reason", event.String()),
zap.String("client_id", s.ID), zap.String("client_addr", s.remoteAddr))
s.rxRoutine(exit)
} else {
s.log.Debug("starting receiver",
zap.String("client_id", s.ID), zap.String("client_addr", s.remoteAddr))
go s.rxRoutine(exit)
}
}
}
func (s *Type) rxRoutine(exit bool) {
var err error
defer func() {
s.rxWg.Done()
if err != nil {
if _, ok := err.(packet.ReasonCode); !ok {
err = nil
}
s.onConnectionClose(s.will, err)
}
}()
if exit {
err = errors.New("disconnect")
return
}
buf := bufio.NewReader(s.Conn)
for atomic.LoadUint32(&s.rxRunning) == 1 {
if s.keepAlive > 0 {
s.keepAliveTimer.Reset(s.keepAlive)
}
var pkt packet.Provider
if pkt, err = s.readPacket(buf); err == nil {
err = s.processIncoming(pkt)
}
if err != nil {
if err.Error() != "disconnect" {
s.log.Error("read packet fail", zap.String("client_id", s.ID), zap.String("remote", s.Conn.RemoteAddr().String()), zap.Error(err))
}
atomic.StoreUint32(&s.rxRunning, 0)
}
}
if _, ok := err.(packet.ReasonCode); !ok {
s.EventPoll.Resume(s.Desc)
}
}
func (s *Type) readPacket(buf *bufio.Reader) (packet.Provider, error) {
var err error
if len(s.rxRecv) == 0 {
var header []byte
peekCount := 2
// Let's read enough bytes to get the fixed header/fh (msg type/flags, remaining length)
for {
// max length of fh is 5 bytes
// if we have read 5 bytes and still not done report protocol error and exit
if peekCount > 5 {
return nil, packet.CodeProtocolError
}
if header, err = buf.Peek(peekCount); err != nil {
return nil, err
}
// If not enough bytes are returned, then continue until there's enough.
if len(header) < peekCount {
continue
}
// If we got enough bytes, then check the last byte to see if the continuation
// bit is set. If so, increment cnt and continue peeking
if header[peekCount-1] >= 0x80 {
peekCount++
} else {
break
}
}
// Get the remaining length of the message
remLen, m := binary.Uvarint(header[1:])
// Total message length is remlen + 1 (msg type) + m (remlen bytes)
s.rxRemaining = int(remLen) + 1 + m
s.rxRecv = make([]byte, s.rxRemaining)
}
if s.rxRemaining > int(s.MaxRxPacketSize) {
return nil, packet.CodePacketTooLarge
}
offset := len(s.rxRecv) - s.rxRemaining
for offset != s.rxRemaining {
var n int
if n, err = buf.Read(s.rxRecv[offset:]); err != nil {
return nil, err
}
offset += n
}
var pkt packet.Provider
pkt, _, err = packet.Decode(s.Version, s.rxRecv)
if err != nil {
return nil, err
}
topic := ""
if _p, ok := pkt.(*packet.Publish); ok {
topic = _p.Topic()
s.Metric.Messages().Received(_p.QoS())
}
if trace.GetInstance().Status(s.ID, topic) {
s.log.Info("receive packet", zap.String("type", pkt.Type().Desc()), zap.String("topic", topic), zap.String("clientID", s.ID))
} else {
s.log.Debug("receive packet", zap.String("type", pkt.Type().Desc()), zap.String("topic", topic), zap.String("clientID", s.ID))
}
s.Metric.Packets().Received(pkt.Type())
s.rxRecv = []byte{}
s.rxRemaining = 0
return pkt, nil
}
Go
1
https://gitee.com/kaifazhe/volantmq.git
git@gitee.com:kaifazhe/volantmq.git
kaifazhe
volantmq
volantmq
v0.0.4

搜索帮助