1 Star 0 Fork 2

何吕/volantmq

forked from JUMEI_ARCH/volantmq 
Create your Gitee Account
Explore and code with more than 12 million developers,Free private repositories !:)
Sign up
Clone or Download
sessions.go 30.59 KB
Copy Edit Raw Blame History
hawklin authored 2018-06-20 10:58 . bugfix
12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044
package clients
import (
"crypto/rand"
"encoding/base64"
"encoding/binary"
"errors"
"fmt"
"io"
"net"
"strconv"
"sync"
"sync/atomic"
"time"
"container/list"
"github.com/VolantMQ/persistence"
"github.com/VolantMQ/volantmq/auth"
"github.com/VolantMQ/volantmq/configuration"
"github.com/VolantMQ/volantmq/connection"
"github.com/VolantMQ/volantmq/packet"
"github.com/VolantMQ/volantmq/routines"
"github.com/VolantMQ/volantmq/subscriber"
"github.com/VolantMQ/volantmq/systree"
"github.com/VolantMQ/volantmq/topics/types"
"github.com/VolantMQ/volantmq/trace"
"github.com/VolantMQ/volantmq/types"
"github.com/troian/easygo/netpoll"
"go.uber.org/zap"
)
// subscriberConfig carries the subscriptions decoded from a persisted session
// state (see loadSubscriber) until loadSessions turns them into a live
// subscriber.
type subscriberConfig struct {
	// version is the protocol version read from the first byte of the
	// persisted subscriptions blob.
	version packet.ProtocolVersion
	// topics maps topic filter to its subscription parameters.
	topics subscriber.Subscriptions
}
// Config is the clients manager configuration. It is copied by value into
// Manager by NewManager.
type Config struct {
	// TopicsMgr is handed to sessions (as messenger) and subscribers (as
	// Topics) and is used to publish delayed wills on startup.
	TopicsMgr topicsTypes.Provider
	// Persist is the persistence provider; NewManager obtains the sessions
	// storage from it.
	Persist persistence.Provider
	// Systree receives client connect/disconnect notifications and packet metrics.
	Systree systree.Provider
	// OnReplaceAttempt is called with the client id and AllowReplace whenever
	// a connecting client id already has an online session. It is invoked
	// without a nil check in loadSession.
	OnReplaceAttempt func(string, bool)
	// NodeName is not referenced by the code in this file.
	NodeName string
	// ConnectTimeout is not referenced by the code in this file.
	ConnectTimeout int
	// KeepAlive is sent to MQTT v5 clients as the ServerKeepAlive property.
	KeepAlive int
	// MaxPacketSize is used as the connection's MaxRxPacketSize and is
	// advertised to v5 clients when below types.DefaultMaxPacketSize.
	MaxPacketSize uint32
	// ReceiveMax is advertised to v5 clients when below types.DefaultReceiveMax.
	ReceiveMax uint16
	// TopicAliasMaximum is used as the connection's MaxRxTopicAlias and is
	// advertised to v5 clients when greater than zero.
	TopicAliasMaximum uint16
	// MaximumQoS is advertised to v5 clients when below packet.QoS2.
	MaximumQoS packet.QosType
	// AvailableRetain is advertised to v5 clients and forwarded to the
	// connection as RetainAvailable.
	AvailableRetain bool
	// AvailableWildcardSubscription is advertised to v5 clients.
	AvailableWildcardSubscription bool
	// AvailableSubscriptionID is advertised to v5 clients.
	AvailableSubscriptionID bool
	// AvailableSharedSubscription is advertised to v5 clients.
	AvailableSharedSubscription bool
	// OfflineQoS0 is forwarded to subscriber and connection configuration.
	OfflineQoS0 bool
	// AllowReplace selects whether a new connection may take over a client
	// id whose session is still online.
	AllowReplace bool
	// ForceKeepAlive is not referenced by the code in this file.
	ForceKeepAlive bool
	// PersistUseBuffer makes onOfflinePublish buffer packets in memory for
	// the periodic sync routine instead of storing each one directly.
	PersistUseBuffer bool
	// PersistBufferSyncInterval is the sync routine period, in milliseconds.
	PersistBufferSyncInterval uint64
	// MaxSessionCount is the session limit: loadSession refuses a new
	// session with CodeQuotaExceeded once the current count reaches it.
	MaxSessionCount int64
}
// offlineMessageBufferManager buffers offline packets per session id in
// memory and periodically writes them to persistent storage.
type offlineMessageBufferManager struct {
	// sm is the owning manager (set in NewManager).
	sm *Manager
	// syncBuf is the scratch batch handed to PacketsStore by sync.
	syncBuf []persistence.PersistedPacket
	// Two buffers are used so that storing new messages and syncing to
	// storage each work on a different one, avoiding data conflicts.
	buf0 *map[string]*types.ListWithLock
	buf1 *map[string]*types.ListWithLock
	// mLock guards lookups/inserts into the working buffer map in save.
	mLock sync.RWMutex
	// bLock guards workingBuf itself: save holds it for reading, sync takes
	// it for writing while swapping the buffers.
	bLock sync.RWMutex
	// workingBuf is the buffer currently receiving new messages; the other
	// buffer is the one being synchronized to solid storage.
	workingBuf *map[string]*types.ListWithLock
}
// Manager is the clients manager: it owns client sessions and subscribers
// and persists their state.
type Manager struct {
	Config
	// persistence is the sessions storage obtained from Config.Persist.
	persistence persistence.Sessions
	log         *zap.Logger
	// quit is closed by Shutdown; other methods poll it to refuse work.
	quit chan struct{}
	// sessionsCount is incremented when a session is created or loaded and
	// decremented in onSessionClose; Shutdown waits on it.
	sessionsCount sync.WaitGroup
	// ombmRtWg tracks the offline message buffer sync routine.
	ombmRtWg sync.WaitGroup
	// sessions maps client id to *sessionWrap.
	sessions sync.Map
	// subscribers maps client id to subscriber.ConnectionProvider.
	subscribers sync.Map
	// curClientCount is the current session count, accessed atomically and
	// compared against MaxSessionCount.
	curClientCount int64
	// poll is the network event poller handed to every connection.
	poll netpoll.EventPoll
	// ombm buffers offline messages when PersistUseBuffer is enabled.
	ombm *offlineMessageBufferManager
	// boltOps tracks background persistence goroutines; Shutdown waits on it.
	boltOps sync.WaitGroup
}
// StartConfig is used to reconfigure a session after its network connection
// has been created. It is the input of Manager.NewSession.
type StartConfig struct {
	// Req is the CONNECT packet received from the client.
	Req *packet.Connect
	// Resp is the CONNACK packet that NewSession fills in and writes to Conn.
	Resp *packet.ConnAck
	// Conn is the established network connection.
	Conn net.Conn
	// Auth holds the session permissions forwarded to the connection.
	Auth auth.SessionPermissions
}
// NewManager creates a new clients manager from c.
//
// It creates the network event poller, opens the sessions storage, loads the
// persisted sessions and starts the offline message sync routine. An error is
// returned if any of the first three steps fails; no goroutine is started in
// that case.
func NewManager(c *Config) (*Manager, error) {
	m := &Manager{
		Config: *c,
		quit:   make(chan struct{}),
		log:    configuration.GetLogger().Named("sessions"),
		ombm: &offlineMessageBufferManager{
			buf0: &map[string]*types.ListWithLock{},
			buf1: &map[string]*types.ListWithLock{},
		},
	}

	m.ombm.sm = m
	m.ombm.workingBuf = m.ombm.buf0

	var err error

	// Fail early: a nil poller or storage would only surface later as a
	// nil dereference on the first connection or persistence call.
	if m.poll, err = netpoll.New(nil); err != nil {
		return nil, err
	}

	if m.persistence, err = c.Persist.Sessions(); err != nil {
		return nil, err
	}

	// load sessions to fill systree
	// sessions having either a will delay or an expiry are created and
	// their timers started
	if err = m.loadSessions(); err != nil {
		return nil, err
	}

	m.log.Info("finish loading sessions")

	// registered before the goroutine starts so Shutdown's Wait sees it
	m.ombmRtWg.Add(1)
	go m.ombm.syncRoutine()

	//m.persistence.StatesWipe() // nolint: errcheck
	//m.persistence.SubscriptionsWipe() // nolint: errcheck

	return m, nil
}
// Shutdown stops the clients manager gracefully: every session is stopped
// with CodeServerShuttingDown and, if it has an expiry or subscriptions, its
// state is persisted. It then waits for all sessions, the offline message
// sync routine and background persistence operations to finish.
//
// It returns an error if the manager has already been stopped.
func (m *Manager) Shutdown() error {
	select {
	case <-m.quit:
		return errors.New("already stopped")
	default:
		close(m.quit)
	}

	m.sessions.Range(func(k, v interface{}) bool {
		sw := v.(*sessionWrap)
		st := sw.s.stop(packet.CodeServerShuttingDown)
		clientID := k.(string)

		m.log.Debug("persisting session state",
			zap.String("clientID", clientID),
			zap.Any("state", st),
		)

		// nothing worth keeping for a session without expiry and subscriptions
		if st.Expire == nil && len(st.Subscriptions) == 0 {
			return true
		}

		m.persistence.StateStore([]byte(clientID), st) // nolint: errcheck
		return true
	})

	m.sessionsCount.Wait()
	m.ombmRtWg.Wait()
	m.boltOps.Wait()

	return nil
}
// ListOneClient looks up the session registered under id and passes its
// connection details (client id, remote address, kill-on-disconnect flag,
// username, RFC3339 creation time and subscriptions) to handle.
//
// It returns an error if the manager is stopped, the client is unknown or the
// stored entry is not a session.
func (m *Manager) ListOneClient(id string, handle func(id string, remote string, clean bool, user string, create string, subTopics map[string]byte)) error {
	select {
	case <-m.quit:
		return errors.New("already stopped")
	default:
	}

	entry, found := m.sessions.Load(id)
	if !found {
		return fmt.Errorf("client(%s) does not exist", id)
	}

	sw, isWrap := entry.(*sessionWrap)
	if !isWrap {
		return fmt.Errorf("invalid data")
	}

	ses := sw.s
	c := ses.conn
	topics := c.Subscriber.ListSubscriptions()

	handle(
		c.ID,
		c.Conn.RemoteAddr().String(),
		c.KillOnDisconnect,
		c.Username,
		ses.createdAt.Format(time.RFC3339),
		topics,
	)

	return nil
}
// ListAllClients calls handle once for every registered session with its
// connection details (client id, remote address, kill-on-disconnect flag,
// username, RFC3339 creation time and subscriptions). Entries that are not
// sessions are skipped.
//
// It returns an error only if the manager is stopped.
func (m *Manager) ListAllClients(handle func(id string, remote string, clean bool, user string, create string, topics map[string]byte)) error {
	select {
	case <-m.quit:
		return errors.New("already stopped")
	default:
	}

	visit := func(_, v interface{}) bool {
		sw, isWrap := v.(*sessionWrap)
		if isWrap {
			ses := sw.s
			c := ses.conn
			topics := c.Subscriber.ListSubscriptions()
			handle(
				c.ID,
				c.Conn.RemoteAddr().String(),
				c.KillOnDisconnect,
				c.Username,
				ses.createdAt.Format(time.RFC3339),
				topics,
			)
		}
		// always keep iterating
		return true
	}
	m.sessions.Range(visit)

	return nil
}
// KickOff stops the session registered under clientID with
// CodeAdministrativeAction.
//
// It returns an error if the manager is stopped, the client is unknown or the
// stored entry is not a session.
func (m *Manager) KickOff(clientID string) error {
	select {
	case <-m.quit:
		return errors.New("already stopped")
	default:
	}

	entry, found := m.sessions.Load(clientID)
	if !found {
		return fmt.Errorf("client(%s) does not exist", clientID)
	}

	switch sw := entry.(type) {
	case *sessionWrap:
		// the returned state is intentionally discarded
		sw.s.stop(packet.CodeAdministrativeAction)
	default:
		return fmt.Errorf("invalid data")
	}

	m.log.Info("kickOff session", zap.String("clientID", clientID))

	return nil
}
// NewSession creates (or reuses) a session for the provided established
// connection and answers the client with CONNACK.
//
// The CONNACK is always written from the deferred function, whatever path the
// body takes: on error the return code is derived from err, otherwise the
// code already set on config.Resp is sent. The returned error is whatever
// loadSession or configureSession reported.
//
// This is a god function; it might be worth splitting.
func (m *Manager) NewSession(config *StartConfig) (err error) {
	var id string
	var ses *sessionWrap
	idGenerated := false

	// client might come with empty client id
	if id = string(config.Req.ClientID()); len(id) == 0 {
		id = m.genClientID()
		idGenerated = true
	}

	defer func() {
		if err != nil {
			var reason packet.ReasonCode
			var hasErrReason bool
			// use err itself as the reason when it is a ReasonCode,
			// otherwise fall back to a generic per-protocol code
			if reason, hasErrReason = err.(packet.ReasonCode); !hasErrReason {
				switch config.Req.Version() {
				case packet.ProtocolV50:
					reason = packet.CodeUnspecifiedError
				default:
					reason = packet.CodeRefusedServerUnavailable
				}
			}

			config.Resp.SetReturnCode(reason) // nolint: errcheck
			m.log.Error("Session create error.", zap.Error(err), zap.String("clientID", id), zap.String("remote", config.Conn.RemoteAddr().String()))
		}

		username, _ := config.Req.Credentials()
		systreeConnStatus := &systree.ClientConnectStatus{
			Username:     string(username),
			Timestamp:    time.Now().Unix(),
			Address:      config.Conn.RemoteAddr().String(),
			Protocol:     config.Req.Version(),
			ConnAckCode:  config.Resp.ReturnCode(),
			CleanSession: config.Req.IsClean(),
		}

		// the connect attempt is reported to systree before the CONNACK is
		// written, including attempts that are about to be rejected
		m.Systree.Clients().Connected(id, systreeConnStatus)

		if rErr := routines.WriteMessage(config.Conn, config.Resp); rErr != nil {
			m.Systree.Clients().Disconnected(id, packet.CodeRefusedServerUnavailable, true)
			m.log.Error("Couldn't write CONNACK", zap.String("ClientID", id), zap.Error(rErr), zap.String("remote", config.Conn.RemoteAddr().String()))
		} else {
			// NOTE(review): ses is also non-nil when loadSession succeeded
			// but configureSession returned an error, so start() runs in
			// that case too — confirm this is intended.
			if ses != nil {
				m.log.Debug("starting new session", zap.String("ClientID", id), zap.String("remote", config.Conn.RemoteAddr().String()))
				ses.s.start()
			} else {
				m.Systree.Clients().Disconnected(id, config.Resp.ReturnCode(), true)
				m.log.Error("client login fail", zap.String("clientID", id), zap.String("reason", config.Resp.ReturnCode().Error()), zap.String("remote", config.Conn.RemoteAddr().String()))
			}
		}

		m.Systree.Metric().Packets().Sent(config.Resp.Type())

		// traced clients get the response logged at info level, others at debug
		if trace.GetInstance().Status(id, "") {
			m.log.Info("client connack response", zap.String("clientID", id), zap.String("reason", config.Resp.ReturnCode().Error()), zap.String("remote", config.Conn.RemoteAddr().String()))
		} else {
			m.log.Debug("client connack response", zap.String("clientID", id), zap.String("reason", config.Resp.ReturnCode().Error()), zap.String("remote", config.Conn.RemoteAddr().String()))
		}

		// release the session lock held since loadSession
		if ses != nil {
			ses.release()
		}
	}()

	m.checkServerStatus(config.Req.Version(), config.Resp)

	// if response has return code differs from CodeSuccess return from this point
	// and send connack in deferred statement
	if config.Resp.ReturnCode() != packet.CodeSuccess {
		return
	}

	if ses, _, err = m.loadSession(id, config.Req.Version(), config.Resp); err == nil {
		if err = m.configureSession(config, ses.s, id, idGenerated); err != nil {
			m.log.Debug("Failed to configure session, closing", zap.String("ClientID", id))
		}
	}

	return
}
// loadSession returns the session wrapper to use for client id, creating a
// new one or taking over an existing one.
//
// presents reports whether an entry for id already existed in m.sessions.
// On success the returned wrapper is held locked (acquired) and must be
// released by the caller. err is CodeQuotaExceeded when the session limit is
// reached, or an identifier-rejected code when the id is online and replacing
// is not allowed; ses is nil in both cases.
//
// The resp parameter is not used by this function.
func (m *Manager) loadSession(id string, v packet.ProtocolVersion, resp *packet.ConnAck) (ses *sessionWrap, presents bool, err error) {
	curCnt := atomic.LoadInt64(&m.curClientCount)
	if curCnt >= m.MaxSessionCount {
		err = packet.CodeQuotaExceeded
		m.log.Error("online sessions overflow", zap.Int64("cur", curCnt), zap.Int64("max", m.MaxSessionCount), zap.String("clientID", id))
		return
	}

	// allocSession returns the wrapper already acquired
	wrap := m.allocSession(id, time.Now())
	var ss interface{}
	if ss, presents = m.sessions.LoadOrStore(id, wrap); presents {
		// release lock of newly allocated session as lock from old one will be used
		wrap.release()

		// there is some old session exists, check if it has active network connection then stop if so
		// if it is offline (either waiting for expiration to fire or will event or) switch back to online
		// and use this session henceforth
		oldWrap := ss.(*sessionWrap)

		// lock id to prevent other upcoming session make any changes until we done
		oldWrap.acquire()

		switch oldWrap.s.setOnline() {
		case swStatusIsOnline:
			m.log.Warn("old session still online",
				zap.String("ClientID", id),
			)
			// existing session has active network connection
			// exempt it if allowed
			m.OnReplaceAttempt(id, m.AllowReplace)
			if !m.AllowReplace {
				// we do not make any changes to current network connection
				// response to new one with error and release both new & old sessions
				err = packet.CodeRefusedIdentifierRejected
				if v >= packet.ProtocolV50 {
					err = packet.CodeInvalidClientID
				}
				oldWrap.release()
			} else {
				// [MQTT-5]
				// session will be replaced with new connection
				// stop current active connection
				oldWrap.s.stop(packet.CodeSessionTakenOver)
				// the old wrapper (and its lock) is kept; it takes over
				// the freshly allocated session via swap
				oldWrap.swap(wrap)
				m.sessions.Store(id, oldWrap)
				m.sessionsCount.Add(1)
				atomic.AddInt64(&m.curClientCount, 1)
				m.log.Debug("replace old session", zap.String("ClientID", id))
				ses = oldWrap
			}
		case swStatusSwitched:
			// session has been turned online successfully
			// (counters are left untouched on this path)
			ses = oldWrap
			m.log.Debug("reuse old session", zap.String("ClientID", id))
		default:
			// any other status: swap in the freshly allocated session and
			// count it as a new one
			oldWrap.swap(wrap)
			ses = oldWrap
			m.sessions.Store(id, oldWrap)
			m.sessionsCount.Add(1)
			atomic.AddInt64(&m.curClientCount, 1)
			m.log.Debug("reuse old session", zap.String("ClientID", id))
		}
	} else {
		// no previous entry: the new wrapper was stored by LoadOrStore
		ses = wrap
		m.sessionsCount.Add(1)
		atomic.AddInt64(&m.curClientCount, 1)
		m.log.Debug("create new session", zap.String("ClientID", id))
	}

	return
}
// checkServerStatus sets a rejecting return code on resp when the manager is
// shutting down (quit channel closed) and leaves resp untouched otherwise.
// MQTT v5 clients get CodeServerShuttingDown, all others
// CodeRefusedServerUnavailable.
func (m *Manager) checkServerStatus(v packet.ProtocolVersion, resp *packet.ConnAck) {
	select {
	case <-m.quit:
		// server is about to shut down: fall through and reject
	default:
		return
	}

	reason := packet.CodeRefusedServerUnavailable
	if v == packet.ProtocolV50 {
		// TODO: if cluster route client to another node
		reason = packet.CodeServerShuttingDown
	}

	resp.SetReturnCode(reason) // nolint: errcheck
}
// allocSession builds a new session for id wired to the manager's event
// handlers and returns its wrapper already acquired (locked); the caller is
// responsible for releasing it.
func (m *Manager) allocSession(id string, createdAt time.Time) *sessionWrap {
	events := sessionEvents{
		signalClose:        m.onSessionClose,
		signalDisconnected: m.onDisconnect,
		shutdownSubscriber: m.onSubscriberShutdown,
	}

	pre := &sessionPreConfig{
		id:            id,
		createdAt:     createdAt,
		messenger:     m.TopicsMgr,
		sessionEvents: events,
	}

	sw := &sessionWrap{
		lock: &sync.Mutex{},
		s:    newSession(pre),
	}
	sw.acquire()

	return sw
}
// getWill builds the will PUBLISH packet described by the CONNECT packet.
// It returns nil when the client did not provide a will.
func (m *Manager) getWill(pkt *packet.Connect) *packet.Publish {
	topic, payload, qos, retain, hasWill := pkt.Will()
	if !hasWill {
		return nil
	}

	raw, _ := packet.New(pkt.Version(), packet.PUBLISH)
	will := raw.(*packet.Publish)
	will.Set(topic, payload, qos, retain, false) // nolint: errcheck

	return will
}
// newConnectionPreConfig assembles the connection pre-configuration for a
// connecting client from its CONNECT packet, the network connection and the
// manager settings. Transmit limits start at the package defaults.
func (m *Manager) newConnectionPreConfig(config *StartConfig) *connection.PreConfig {
	user, _ := config.Req.Credentials()

	pc := new(connection.PreConfig)

	// taken from the client request / network connection
	pc.Username = string(user)
	pc.Auth = config.Auth
	pc.Conn = config.Conn
	pc.KeepAlive = config.Req.KeepAlive()
	pc.Version = config.Req.Version()
	pc.Desc = netpoll.Must(netpoll.HandleReadOnce(config.Conn))

	// transmit side defaults
	pc.MaxTxPacketSize = types.DefaultMaxPacketSize
	pc.SendQuota = types.DefaultReceiveMax
	pc.MaxTxTopicAlias = 0

	// taken from the manager
	pc.PersistedSession = m.persistence
	pc.EventPoll = m.poll
	pc.Metric = m.Systree.Metric()
	pc.RetainAvailable = m.AvailableRetain
	pc.OfflineQoS0 = m.OfflineQoS0
	pc.MaxRxPacketSize = m.MaxPacketSize
	pc.MaxRxTopicAlias = m.TopicAliasMaximum

	return pc
}
// configureSession applies the CONNECT request to ses: it attaches the
// subscriber and will, reads and writes the MQTT v5 properties, allocates the
// network connection and records the session-present flag on the CONNACK.
//
// id is the client id in use and idGenerated tells whether the server
// generated it (in which case it is reported back to v5 clients). For a clean
// request the persisted state is wiped in the background once the connection
// has been allocated.
//
// It returns the error of property handling or connection allocation.
func (m *Manager) configureSession(config *StartConfig, ses *session, id string, idGenerated bool) error {
	sub, sessionPresent := m.getSubscriber(id, config.Req.IsClean(), config.Req.Version())

	sConfig := &sessionReConfig{
		subscriber:       sub,
		will:             m.getWill(config.Req),
		killOnDisconnect: false,
	}

	cConfig := m.newConnectionPreConfig(config)

	if config.Req.Version() >= packet.ProtocolV50 {
		if err := readSessionProperties(config.Req, sConfig, cConfig); err != nil {
			return err
		}

		// the assigned client id is only reported when the server generated it
		ids := ""
		if idGenerated {
			ids = id
		}

		m.writeSessionProperties(config.Resp, ids)
		if err := config.Resp.PropertySet(packet.PropertyServerKeepAlive, m.KeepAlive); err != nil {
			return err
		}
	}

	// MQTT v5 has a different meaning of clean compared to MQTT v3
	//  - v3: a clean session lasts only until the network connection is closed
	//  - v5: clean means clean start: the server must wipe any previously
	//        created session with the same id but keeps this one when the
	//        network connection is closed
	if (config.Req.Version() <= packet.ProtocolV311 && config.Req.IsClean()) ||
		(sConfig.expireIn != nil && *sConfig.expireIn == 0) {
		sConfig.killOnDisconnect = true
	}

	ses.reconfigure(sConfig, false)

	if err := ses.allocConnection(cConfig); err != nil {
		return err
	}

	if config.Req.IsClean() {
		// Add must happen before the goroutine is started, otherwise
		// Shutdown's boltOps.Wait may return before the work is registered.
		m.boltOps.Add(1)
		go func() {
			defer m.boltOps.Done()

			start := time.Now()
			if err := m.persistence.Delete([]byte(id)); err != nil {
				m.log.Error("Couldn't wipe session", zap.String("ClientID", id), zap.Error(err))
			}

			if cost := time.Since(start); cost >= 500*time.Millisecond {
				m.log.Warn("delete state too slowly", zap.Duration("cost", cost), zap.String("ClientId", id))
			}
		}()
	}

	config.Resp.SetSessionPresent(sessionPresent)

	return nil
}
// boolToByte converts a boolean to its wire form: 1 for true, 0 for false.
func boolToByte(v bool) byte {
	var b byte
	if v {
		b = 1
	}
	return b
}
// readSessionProperties copies the MQTT v5 CONNECT properties of req into the
// session (sc) and connection (cc) configuration. A property that is absent
// or fails to convert leaves the corresponding field untouched.
//
// It returns CodeProtocolError when authentication data is supplied without
// an authentication method.
func readSessionProperties(req *packet.Connect, sc *sessionReConfig, cc *connection.PreConfig) (err error) {
	// [MQTT-3.1.2.11.2] session expiry interval
	if p := req.PropertyGet(packet.PropertySessionExpiryInterval); p != nil {
		v, convErr := p.AsInt()
		if convErr == nil {
			sc.expireIn = &v
		}
	}

	// [MQTT-3.1.2.11.3] will delay interval
	if p := req.PropertyGet(packet.PropertyWillDelayInterval); p != nil {
		v, convErr := p.AsInt()
		if convErr == nil {
			sc.willDelay = v
		}
	}

	// [MQTT-3.1.2.11.4] receive maximum
	if p := req.PropertyGet(packet.PropertyReceiveMaximum); p != nil {
		v, convErr := p.AsShort()
		if convErr == nil {
			cc.SendQuota = int32(v)
		}
	}

	// [MQTT-3.1.2.11.5] maximum packet size
	if p := req.PropertyGet(packet.PropertyMaximumPacketSize); p != nil {
		v, convErr := p.AsInt()
		if convErr == nil {
			cc.MaxTxPacketSize = v
		}
	}

	// [MQTT-3.1.2.11.6] topic alias maximum
	if p := req.PropertyGet(packet.PropertyTopicAliasMaximum); p != nil {
		v, convErr := p.AsShort()
		if convErr == nil {
			cc.MaxTxTopicAlias = v
		}
	}

	// [MQTT-3.1.2.11.10] authentication method
	if p := req.PropertyGet(packet.PropertyAuthMethod); p != nil {
		v, convErr := p.AsString()
		if convErr == nil {
			cc.AuthMethod = v
		}
	}

	// [MQTT-3.1.2.11.11] authentication data requires a method
	if p := req.PropertyGet(packet.PropertyAuthData); p != nil {
		if len(cc.AuthMethod) == 0 {
			return packet.CodeProtocolError
		}

		v, convErr := p.AsBinary()
		if convErr == nil {
			cc.AuthData = v
		}
	}

	return nil
}
// writeSessionProperties fills the MQTT v5 CONNACK properties of resp from
// the manager configuration. Limits are only written when they are stricter
// than the protocol defaults; id, when non-empty, is reported as the assigned
// client identifier. Errors from PropertySet are deliberately ignored.
func (m *Manager) writeSessionProperties(resp *packet.ConnAck, id string) {
	retainFlag := boolToByte(m.AvailableRetain)
	wildcardFlag := boolToByte(m.AvailableWildcardSubscription)
	subscriptionIDFlag := boolToByte(m.AvailableSubscriptionID)
	sharedFlag := boolToByte(m.AvailableSharedSubscription)

	// [MQTT-3.2.2.3.2] announce receive maximum only when below the default
	if m.ReceiveMax < types.DefaultReceiveMax {
		resp.PropertySet(packet.PropertyReceiveMaximum, m.ReceiveMax) // nolint: errcheck
	}

	// [MQTT-3.2.2.3.3] announce maximum QoS only when below QoS2
	if m.MaximumQoS < packet.QoS2 {
		resp.PropertySet(packet.PropertyMaximumQoS, byte(m.MaximumQoS)) // nolint: errcheck
	}

	// [MQTT-3.2.2.3.4] retained messages support
	resp.PropertySet(packet.PropertyRetainAvailable, retainFlag) // nolint: errcheck

	// [MQTT-3.2.2.3.5] announce maximum packet size only when below the default
	if m.MaxPacketSize < types.DefaultMaxPacketSize {
		resp.PropertySet(packet.PropertyMaximumPacketSize, m.MaxPacketSize) // nolint: errcheck
	}

	// [MQTT-3.2.2.3.6] assigned client identifier
	if len(id) > 0 {
		resp.PropertySet(packet.PropertyAssignedClientIdentifier, id) // nolint: errcheck
	}

	// [MQTT-3.2.2.3.7] topic alias maximum
	if m.TopicAliasMaximum > 0 {
		resp.PropertySet(packet.PropertyTopicAliasMaximum, m.TopicAliasMaximum) // nolint: errcheck
	}

	// [MQTT-3.2.2.3.10] wildcard subscriptions support
	resp.PropertySet(packet.PropertyWildcardSubscriptionAvailable, wildcardFlag) // nolint: errcheck

	// [MQTT-3.2.2.3.11] subscription identifiers support
	resp.PropertySet(packet.PropertySubscriptionIdentifierAvailable, subscriptionIDFlag) // nolint: errcheck

	// [MQTT-3.2.2.3.12] shared subscriptions support
	resp.PropertySet(packet.PropertySharedSubscriptionAvailable, sharedFlag) // nolint: errcheck
}
// getSubscriber returns the subscriber for client id and whether a previous
// session is present.
//
// When clean is set, any registered subscriber for id is taken offline and
// dropped, and the persisted state is wiped in the background. If no
// subscriber is registered afterwards a new one is created and stored, and
// the presence flag is whatever the persistence layer reports for id;
// otherwise the registered subscriber is returned with presence set to true.
func (m *Manager) getSubscriber(id string, clean bool, v packet.ProtocolVersion) (subscriber.ConnectionProvider, bool) {
	var sub subscriber.ConnectionProvider
	var present bool

	if clean {
		if sb, ok := m.subscribers.Load(id); ok {
			sub = sb.(subscriber.ConnectionProvider)
			sub.Offline(true)
			m.subscribers.Delete(id)
		}

		// Add must happen before the goroutine is started, otherwise
		// Shutdown's boltOps.Wait may return before the work is registered.
		m.boltOps.Add(1)
		go func() {
			defer m.boltOps.Done()

			start := time.Now()
			if err := m.persistence.Delete([]byte(id)); err != nil {
				m.log.Error("Couldn't wipe session", zap.String("ClientID", id), zap.Error(err))
			}

			if cost := time.Since(start); cost >= 500*time.Millisecond {
				m.log.Warn("delete state too slowly", zap.Duration("cost", cost), zap.String("ClientId", id))
			}
		}()
	}

	if sb, ok := m.subscribers.Load(id); !ok {
		sub = subscriber.New(&subscriber.Config{
			ID:               id,
			Topics:           m.TopicsMgr,
			OnOfflinePublish: m.onOfflinePublish,
			OfflineQoS0:      m.OfflineQoS0,
			Version:          v,
		})

		// NOTE(review): for a clean request this check runs concurrently
		// with the background wipe started above — confirm the resulting
		// flag is acceptable.
		present = m.persistence.Exists([]byte(id))
		m.subscribers.Store(id, sub)
		m.log.Debug("subscriber created", zap.String("ClientID", id))
	} else {
		m.log.Debug("subscriber found", zap.String("ClientID", id))
		sub = sb.(subscriber.ConnectionProvider)
		present = true
	}

	return sub, present
}
// genClientID generates a random client id: 15 bytes from the crypto random
// source, URL-safe base64 encoded. It returns an empty string if the random
// source cannot be read.
func (m *Manager) genClientID() string {
	var raw [15]byte

	if _, err := io.ReadFull(rand.Reader, raw[:]); err != nil {
		return ""
	}

	return base64.URLEncoding.EncodeToString(raw[:])
}
// onDisconnect is the session's disconnect callback: it logs the logout and
// forwards the event to systree.
func (m *Manager) onDisconnect(id string, reason packet.ReasonCode, retain bool) {
	m.log.Debug(
		"client logout",
		zap.String("clientID", id),
		zap.String("reason", reason.Error()),
	)

	clients := m.Systree.Clients()
	clients.Disconnected(id, reason, retain)
}
// onSubscriberShutdown is the session's subscriber-shutdown callback: it
// takes the subscriber offline and removes it from the registry.
func (m *Manager) onSubscriberShutdown(sub subscriber.ConnectionProvider) {
	idField := zap.String("ClientID", sub.ID())
	m.log.Debug("shutdown subscriber", idField)

	sub.Offline(true)

	key := sub.ID()
	m.subscribers.Delete(key)
}
// onSessionClose is the session's close callback.
//
// Depending on reason and on the session's subscriptions it either wipes the
// persisted state (clean/expired sessions and sessions without
// subscriptions) or stores the runtime state, always in the background.
// It then unregisters the session and decrements the session counters.
func (m *Manager) onSessionClose(id string, reason exitReason) {
	if reason == exitReasonClean || reason == exitReasonExpired {
		m.log.Debug("clean session state", zap.String("ClientId", id))

		// Add must happen before the goroutine is started, otherwise
		// Shutdown's boltOps.Wait may return before the work is registered.
		m.boltOps.Add(1)
		go func() {
			defer m.boltOps.Done()

			start := time.Now()
			if err := m.persistence.Delete([]byte(id)); err != nil {
				m.log.Error("Couldn't wipe session", zap.String("ClientID", id), zap.Error(err))
			}

			if cost := time.Since(start); cost >= 500*time.Millisecond {
				m.log.Warn("delete state too slowly", zap.Duration("cost", cost), zap.String("ClientId", id))
			}
		}()
	} else if s, ok := m.sessions.Load(id); ok {
		ses := s.(*sessionWrap).s
		sub := ses.subscriber

		if sub != nil && sub.HasSubscriptions() {
			// the state is captured here, before the session is removed
			// from the registry; only the store itself runs in background
			st := ses.getRuntimeState()
			m.log.Debug("persisting session state",
				zap.String("ClientId", id), zap.Any("Ver", st.Version),
				zap.Any("Errors", st.Errors), zap.Any("Expire", st.Expire),
				zap.String("Timestamp", st.Timestamp),
				zap.Any("Subscriptions", sub.Subscriptions()),
			)

			m.boltOps.Add(1)
			go func() {
				defer m.boltOps.Done()

				start := time.Now()
				if err := m.persistence.StateStore([]byte(id), st); err != nil {
					m.log.Warn("persist session state fail", zap.String("ClientId", id), zap.Error(err))
				}

				if cost := time.Since(start); cost >= 500*time.Millisecond {
					m.log.Warn("store state too slowly", zap.Duration("cost", cost), zap.String("ClientId", id))
				}
			}()
		} else {
			// if the client has no subscriptions, maybe no persistence are needed.
			m.log.Debug("client has no subscriptions", zap.String("ClientId", id))

			m.boltOps.Add(1)
			go func() {
				defer m.boltOps.Done()

				start := time.Now()
				if err := m.persistence.Delete([]byte(id)); err != nil {
					m.log.Warn("failed to delete persisted session ", zap.String("ClientId", id), zap.Error(err))
				}

				if cost := time.Since(start); cost >= 500*time.Millisecond {
					m.log.Warn("delete state too slowly", zap.Duration("cost", cost), zap.String("ClientId", id))
				}
			}()
		}
	}

	m.sessions.Delete(id)
	m.sessionsCount.Done()
	atomic.AddInt64(&m.curClientCount, -1)

	m.log.Debug("shutdown session", zap.Any("reason", reason), zap.String("clientID", id))
}
// loadSessions restores persisted sessions at startup.
//
// For every persisted state it:
//   - schedules broken or already expired states for deletion;
//   - recreates an offline session (with timers) when a delayed will or an
//     expiry is still pending;
//   - collects wills whose delay has already elapsed, to be published;
//   - decodes the persisted subscriptions and recreates the subscriber.
//
// Failures to subscribe or to publish a delayed will are logged and do not
// abort startup. The returned error is the one reported by the persistence
// iteration itself.
func (m *Manager) loadSessions() error {
	m.log.Info("loading persisted sessions")

	subs := map[string]*subscriberConfig{}
	var delayedWills []*packet.Publish
	var toBeCleaned [][]byte

	loadErr := m.persistence.LoadForEach(func(id []byte, state *persistence.SessionState) error {
		sID := string(id)
		// Private copy of the id for deferred deletion.
		// NOTE(review): taken in case the storage reuses the id slice once
		// the callback returns — confirm against the persistence backend.
		cleanID := []byte(sID)

		if len(state.Errors) != 0 {
			m.log.Warn("load session failed", zap.String("ClientID", sID), zap.Errors("errors", state.Errors))
			toBeCleaned = append(toBeCleaned, cleanID)
			return nil
		}

		if state.Expire != nil {
			since, err := time.Parse(time.RFC3339, state.Expire.Since)
			if err != nil {
				m.log.Warn("parse expiration fail", zap.String("ClientID", sID), zap.Error(err))
				toBeCleaned = append(toBeCleaned, cleanID)
				return nil
			}

			var will *packet.Publish
			var willIn uint32
			var expireIn uint32

			// if persisted state has delayed will lets check if it has not elapsed its time
			if len(state.Expire.WillIn) > 0 && len(state.Expire.WillData) > 0 {
				pkt, _, _ := packet.Decode(packet.ProtocolVersion(state.Version), state.Expire.WillData)

				var isPublish bool
				if will, isPublish = pkt.(*packet.Publish); !isPublish {
					// leave will nil so a broken packet is never published
					m.log.Warn("decode will packet fail", zap.String("ClientID", sID))
				}

				if val, err := strconv.Atoi(state.Expire.WillIn); err == nil {
					willIn = uint32(val)
					willAt := since.Add(time.Duration(willIn) * time.Second)

					if time.Now().After(willAt) {
						// will delay elapsed. notify that
						if will != nil {
							delayedWills = append(delayedWills, will)
						}
						will = nil
					}
				} else {
					m.log.Error("decode will fail", zap.String("ClientID", sID), zap.Error(err))
				}
			}

			if len(state.Expire.ExpireIn) > 0 {
				if val, err := strconv.Atoi(state.Expire.ExpireIn); err == nil {
					expireIn = uint32(val)
					expireAt := since.Add(time.Duration(expireIn) * time.Second)

					if time.Now().After(expireAt) {
						m.log.Debug("persisted session expired", zap.String("ClientID", sID))
						toBeCleaned = append(toBeCleaned, cleanID)
						return nil
					}
				} else {
					m.log.Error("decode expire fail", zap.String("ClientID", sID), zap.Error(err))
				}
			}

			// persisted session has either delayed will or expiry
			// create it and run timer
			if will != nil || expireIn > 0 {
				// a timestamp that fails to parse yields the zero time
				createdAt, _ := time.Parse(time.RFC3339, state.Timestamp)
				ses := m.allocSession(sID, createdAt)

				var exp *uint32
				if expireIn > 0 {
					exp = &expireIn
				}

				setup := &sessionReConfig{
					subscriber:       nil,
					expireIn:         exp,
					will:             will,
					willDelay:        willIn,
					killOnDisconnect: false,
				}

				ses.s.reconfigure(setup, true)

				// The registry is keyed by the string client id everywhere
				// else; a []byte key is unhashable and would panic.
				m.sessions.Store(sID, ses)
				m.sessionsCount.Add(1)
				atomic.AddInt64(&m.curClientCount, 1)
				ses.release()
			}
		}

		if len(state.Subscriptions) > 0 {
			if sCfg, err := m.loadSubscriber(state.Subscriptions); err == nil {
				subs[sID] = sCfg
				m.log.Info("persisted subscriptions loaded", zap.String("ClientID", sID), zap.Any("ver", sCfg.version), zap.Any("sub", sCfg.topics))
			} else {
				m.log.Error("decode subscriptions fail", zap.String("ClientID", sID), zap.Error(err))
			}
		}

		return nil
	})

	for _, cid := range toBeCleaned {
		m.log.Debug("remove session", zap.ByteString("ClientId", cid))
		if err := m.persistence.Delete(cid); err != nil {
			m.log.Debug("wipe out session error", zap.Error(err), zap.String("ClientId", string(cid)))
		}
	}

	for id, t := range subs {
		sub := subscriber.New(
			&subscriber.Config{
				ID:               id,
				Topics:           m.TopicsMgr,
				OnOfflinePublish: m.onOfflinePublish,
				OfflineQoS0:      m.OfflineQoS0,
				Version:          t.version,
			})

		for topic, ops := range t.topics {
			if _, _, err := sub.Subscribe(topic, ops); err != nil {
				m.log.Error("Couldn't subscribe", zap.Error(err))
			}
		}

		m.subscribers.Store(id, sub)
	}

	// publish delayed wills if any
	for _, will := range delayedWills {
		if err := m.TopicsMgr.Publish(will); err != nil {
			m.log.Error("Publish delayed will", zap.Error(err))
		}
	}

	// subscribe/publish failures above are logged only and must not mask
	// (or replace) the result of the persistence iteration
	return loadErr
}
// loadSubscriber decodes a persisted subscriptions blob.
//
// Layout as read by this function: one protocol version byte followed by
// records of length-prefixed topic, length-prefixed group, one subscription
// options byte and a 4-byte big-endian subscription id.
//
// It returns an error instead of panicking when the blob is empty or a
// record is truncated.
func (m *Manager) loadSubscriber(from []byte) (*subscriberConfig, error) {
	if len(from) == 0 {
		return nil, errors.New("empty subscriptions data")
	}

	subscriptions := subscriber.Subscriptions{}

	version := packet.ProtocolVersion(from[0])
	offset := 1

	// loop bound kept as in the persisted format's original reader
	remaining := len(from) - 1
	for offset < remaining {
		t, total, e := packet.ReadLPBytes(from[offset:])
		if e != nil {
			return nil, e
		}
		offset += total
		if offset > len(from) {
			return nil, errors.New("truncated subscription record")
		}

		group, total, e := packet.ReadLPBytes(from[offset:])
		if e != nil {
			return nil, e
		}
		offset += total

		// one options byte plus a 4-byte subscription id must follow
		if offset > len(from) || len(from)-offset < 5 {
			return nil, errors.New("truncated subscription record")
		}

		params := &topicsTypes.SubscriptionParams{}
		params.Group = string(group)
		params.Ops = packet.SubscriptionOptions(from[offset])
		offset++

		params.ID = binary.BigEndian.Uint32(from[offset:])
		offset += 4

		subscriptions[string(t)] = params
	}

	return &subscriberConfig{
		version: version,
		topics:  subscriptions,
	}, nil
}
// save appends packet p to the in-memory queue of sessionID in the current
// working buffer, creating the queue on first use.
func (b *offlineMessageBufferManager) save(sessionID string, p *persistence.PersistedPacket) {
	// hold the buffer lock for reading so sync cannot swap the working
	// buffer while it is in use here
	b.bLock.RLock()
	defer b.bLock.RUnlock()

	b.mLock.Lock()
	queue, exists := (*b.workingBuf)[sessionID]
	if !exists {
		queue = types.NewListWithLock()
		(*b.workingBuf)[sessionID] = queue
	}
	b.mLock.Unlock()

	queue.PushBack(p)
}
// syncRoutine periodically writes buffered offline messages to persistent
// storage, every PersistBufferSyncInterval milliseconds.
//
// When the manager's quit channel is closed it waits for all sessions to
// finish, flushes both buffers and returns. The manager's ombmRtWg is
// released on exit.
func (b *offlineMessageBufferManager) syncRoutine() {
	defer b.sm.ombmRtWg.Done()

	tkd := time.Duration(b.sm.PersistBufferSyncInterval) * time.Millisecond
	tk := time.NewTimer(tkd)
	defer tk.Stop()

	for {
		// wait on quit as well as on the timer so that shutdown does not
		// have to sit out the remainder of a sync interval
		select {
		case <-b.sm.quit:
			// wait for all sessions to be closed to ensure no new offline messages will be pushed in anymore.
			b.sm.sessionsCount.Wait()
			b.flush()
			b.sm.log.Info("flushed offline messages")
			return
		case <-tk.C:
			b.sync()
			// the timer has fired and its channel is drained, so Reset is safe
			tk.Reset(tkd)
		}
	}
}
// sync swaps the working buffer and writes the content of the buffer that
// was working until now to persistent storage, in batches of at most
// maxSyncBuf packets per session. Packets are removed from their list only
// after the batch containing them has been stored successfully; on a store
// error the rest of that session's list is left in place.
func (b *offlineMessageBufferManager) sync() {
	var buf *map[string]*types.ListWithLock

	// swap buffers under the write lock: new messages go to the other
	// buffer while buf is being persisted
	b.bLock.Lock()
	if b.workingBuf == b.buf0 {
		b.workingBuf = b.buf1
		buf = b.buf0
	} else {
		buf = b.buf1
		b.workingBuf = b.buf0
	}
	b.bLock.Unlock()

	var p *persistence.PersistedPacket
	var maxSyncBuf = 10000
	for id, l := range *buf {
		if l.Len() < 1 {
			continue
		}

		b.sm.log.Debug("buffered message remains", zap.Int("count", l.Len()))
		n := 0
		removedFromList := []*list.Element{}

		// The label sits in front of the for statement, so "goto TRAVERSE"
		// restarts the walk from l.Front() after a batch has been stored
		// and its elements removed.
	TRAVERSE:
		for m := l.Front(); ; m = m.Next() {
			if m != nil && n < maxSyncBuf { // push into sync buffer
				p = m.Value.(*persistence.PersistedPacket)
				b.sm.log.Debug("persisting packet", zap.String("ClientID", id), zap.Int64("MsgCreatedAt", p.CreatedAt))
				b.syncBuf = append(b.syncBuf, *p)
				removedFromList = append(removedFromList, m)
				n++
				continue
			} else if len(removedFromList) > 0 { // sync buffer
				// reached either the end of the list or the batch limit
				n = 0 // reset counter
				b.sm.log.Debug("persisting packets", zap.String("ClientID", id), zap.Int("Count", len(b.syncBuf)))
				if err := b.sm.persistence.PacketsStore([]byte(id), b.syncBuf); err != nil {
					b.sm.log.Error("Couldn't persisting message", zap.String("ClientID", id), zap.Error(err))
					b.syncBuf = nil
					// give up on this session; its elements stay in the list
					break
				} else {
					b.syncBuf = nil
					for _, elm := range removedFromList {
						l.Remove(elm)
					}
					removedFromList = nil
				}
			}

			if m != nil { // more to be handled, traverse the rest of the list.
				n = 0
				goto TRAVERSE
			} else { // all popped
				b.sm.log.Debug("all packets persisted", zap.String("ClientID", id))
				break
			}
		}
	}
}
// flush persists everything that is buffered. Each sync call swaps the
// working buffer and stores the other one, so two passes cover both buffers.
func (b *offlineMessageBufferManager) flush() {
	const passes = 2 // one per buffer

	for i := 0; i < passes; i++ {
		b.sync()
	}
}
// onOfflinePublish is the subscriber callback for a message published to an
// offline client: the packet is encoded (with its packet id reset to 0) and
// either queued in the offline buffer or stored directly, depending on
// PersistUseBuffer. Already expired packets are dropped.
func (m *Manager) onOfflinePublish(id string, p *packet.Publish) {
	if p.Expired(false) {
		return
	}

	pkt := persistence.PersistedPacket{UnAck: false}

	if expiry := p.GetExpiry(); !expiry.IsZero() {
		pkt.ExpireAt = expiry.Format(time.RFC3339)
	}

	pkt.CreatedAt = p.GetCreateTimestamp()
	p.SetPacketID(0)

	encoded, err := packet.Encode(p)
	if err != nil {
		m.log.Error("Couldn't encode packet", zap.String("clientID", id), zap.Error(err))
		return
	}
	pkt.Data = encoded

	if !m.PersistUseBuffer {
		m.log.Debug("persisting packet directly", zap.String("clientID", id), zap.Int64("create", p.GetCreateTimestamp()))
		if err = m.persistence.PacketStore([]byte(id), pkt); err != nil {
			m.log.Error("Couldn't persist message", zap.String("ClientID", id), zap.Error(err))
		}
		return
	}

	m.log.Debug("use sync buffer", zap.String("clientID", id), zap.Int64("create", p.GetCreateTimestamp()))
	m.ombm.save(id, &pkt)
}
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
Go
1
https://gitee.com/kaifazhe/volantmq.git
git@gitee.com:kaifazhe/volantmq.git
kaifazhe
volantmq
volantmq
v0.0.4

Search

D67c1975 1850385 1daf7b77 1850385