Ai
1 Star 0 Fork 2

何吕/volantmq

forked from JUMEI_ARCH/volantmq 
Create your Gitee Account
Explore and code with more than 13.5 million developers,Free private repositories !:)
Sign up
文件
Clone or Download
session.go 9.16 KB
Copy Edit Raw Blame History
hawklin authored 2018-05-10 14:30 +08:00 . 共享订阅:
package clients
import (
"sync"
"time"
"strconv"
"encoding/binary"
"unsafe"
"github.com/VolantMQ/persistence"
"github.com/VolantMQ/volantmq/connection"
"github.com/VolantMQ/volantmq/packet"
"github.com/VolantMQ/volantmq/subscriber"
"github.com/VolantMQ/volantmq/types"
)
type exitReason int
const (
exitReasonClean exitReason = iota
exitReasonShutdown
exitReasonExpired
)
type switchStatus int
const (
swStatusSwitched switchStatus = iota
swStatusIsOnline
swStatusFinalized
)
type onSessionClose func(string, exitReason)
type onDisconnect func(string, packet.ReasonCode, bool)
type onSubscriberShutdown func(subscriber.ConnectionProvider)
type sessionEvents struct {
signalClose onSessionClose
signalDisconnected onDisconnect
shutdownSubscriber onSubscriberShutdown
}
type sessionPreConfig struct {
sessionEvents
id string
createdAt time.Time
messenger types.TopicMessenger
}
type sessionReConfig struct {
subscriber subscriber.ConnectionProvider
will *packet.Publish
expireIn *uint32
willDelay uint32
killOnDisconnect bool
}
type session struct {
sessionEvents
*sessionReConfig
id string
messenger types.TopicMessenger
createdAt time.Time
expiringSince time.Time
lock sync.Mutex
connStop *types.Once
disconnectOnce *types.OnceWait
wgDisconnected sync.WaitGroup
conn *connection.Type
timer *time.Timer
timerLock sync.Mutex
finalized bool
isOnline chan struct{}
}
type sessionWrap struct {
s *session
lock *sync.Mutex
}
func (s *sessionWrap) acquire() {
s.lock.Lock()
}
func (s *sessionWrap) release() {
s.lock.Unlock()
}
func (s *sessionWrap) swap(w *sessionWrap) *session {
s.s = w.s
return s.s
}
func newSession(c *sessionPreConfig) *session {
s := &session{
sessionEvents: c.sessionEvents,
id: c.id,
createdAt: c.createdAt,
messenger: c.messenger,
isOnline: make(chan struct{}),
}
s.timer = time.AfterFunc(10*time.Second, s.timerCallback)
s.timer.Stop()
return s
}
func (s *session) reconfigure(c *sessionReConfig, runExpiry bool) {
s.sessionReConfig = c
s.finalized = false
if runExpiry {
s.runExpiry(true)
}
}
func (s *session) allocConnection(c *connection.PreConfig) error {
cfg := &connection.Config{
PreConfig: c,
ID: s.id,
OnDisconnect: s.onDisconnect,
Subscriber: s.subscriber,
Messenger: s.messenger,
KillOnDisconnect: s.killOnDisconnect,
ExpireIn: s.expireIn,
}
s.disconnectOnce = &types.OnceWait{}
s.connStop = &types.Once{}
var err error
s.wgDisconnected.Add(1)
s.conn, err = connection.New(cfg)
return err
}
func (s *session) start() {
s.isOnline = make(chan struct{})
s.conn.Start()
}
func (s *session) stop(reason packet.ReasonCode) *persistence.SessionState {
s.connStop.Do(func() {
if s.conn != nil {
s.conn.Stop(reason)
}
})
s.wgDisconnected.Wait()
if !s.timer.Stop() {
s.timerLock.Lock()
s.timerLock.Unlock() // nolint: megacheck
}
if !s.finalized {
s.signalClose(s.id, exitReasonShutdown)
s.finalized = true
}
return s.getRuntimeState()
}
func (s *session) getRuntimeState() *persistence.SessionState {
state := &persistence.SessionState{
Timestamp: s.createdAt.Format(time.RFC3339),
}
if s.expireIn != nil || (s.willDelay > 0 && s.will != nil) {
state.Expire = &persistence.SessionDelays{
Since: s.expiringSince.Format(time.RFC3339),
}
elapsed := uint32(time.Since(s.expiringSince) / time.Second)
if (s.willDelay > 0 && s.will != nil) && (s.willDelay-elapsed) > 0 {
s.willDelay = s.willDelay - elapsed
s.will.SetPacketID(0)
if buf, err := packet.Encode(s.will); err != nil {
} else {
state.Expire.WillIn = strconv.Itoa(int(s.willDelay))
state.Expire.WillData = buf
}
}
if s.expireIn != nil && *s.expireIn > 0 && (*s.expireIn-elapsed) > 0 {
*s.expireIn = *s.expireIn - elapsed
}
}
if s.subscriber != nil {
state.Version = byte(s.subscriber.Version())
state.Subscriptions = s.GenPersistSubscriptions()
}
return state
}
func (s *session) GenPersistSubscriptions() []byte {
if s.subscriber == nil || !s.subscriber.HasSubscriptions() {
return nil
}
topics := s.subscriber.Subscriptions()
if len(topics) < 1 {
return nil
}
// calculate size of the encoded entry
// consist of:
// _ _ _ _ _ _ _ _ _ _ _
// |_|_|_|_|_|...|_|_|_|_|_|_|
// ___ _ _________ _ _______
// | | | | |
// | | | | 4 bytes - subscription id
// | | | | 1 byte - topic options
// | | | n bytes - topic
// | | 1 bytes - protocol version
// | 2 bytes - length prefix
size := 0
for topic, param := range topics {
size += 2 + len(topic) + 2 + len(param.Group) + 1 + int(unsafe.Sizeof(uint32(0)))
}
buf := make([]byte, size+1)
offset := 0
buf[offset] = byte(s.subscriber.Version())
offset++
for s, params := range topics {
total, _ := packet.WriteLPBytes(buf[offset:], []byte(s))
offset += total
total, _ = packet.WriteLPBytes(buf[offset:], []byte(params.Group))
offset += total
buf[offset] = byte(params.Ops)
offset++
binary.BigEndian.PutUint32(buf[offset:], params.ID)
offset += 4
}
return buf
}
// setOnline try switch session state from offline to online. This is necessary when
// when previous network connection has set session expiry or will delay or both
// if switch is successful then swStatusSwitched returned.
// if session has active network connection then returned value is swStatusIsOnline
// if connection has been closed and must not be used anymore then it returns swStatusFinalized
func (s *session) setOnline() switchStatus {
isOnline := false
// check session online status
s.lock.Lock()
select {
case <-s.isOnline:
default:
isOnline = true
}
s.lock.Unlock()
status := swStatusSwitched
if !isOnline {
// session is offline. before making any further step wait disconnect procedure is done
s.wgDisconnected.Wait()
// if stop returns false timer has been fired and there is goroutine might be running
if !s.timer.Stop() {
s.timerLock.Lock()
s.timerLock.Unlock() // nolint: megacheck
}
if s.finalized {
status = swStatusFinalized
}
} else {
status = swStatusIsOnline
}
return status
}
func (s *session) runExpiry(will bool) {
var timerPeriod uint32
// if meet will requirements point that
if will && s.will != nil && s.willDelay > 0 {
timerPeriod = s.willDelay
} else {
s.will = nil
}
if s.expireIn != nil {
// if will delay is set before and value less than expiration
// then timer should fire 2 times
if (timerPeriod > 0) && (timerPeriod < *s.expireIn) {
*s.expireIn = *s.expireIn - timerPeriod
} else {
timerPeriod = *s.expireIn
*s.expireIn = 0
}
}
s.expiringSince = time.Now()
s.timer.Reset(time.Duration(timerPeriod) * time.Second)
}
func (s *session) onDisconnect(p *connection.DisconnectParams) {
s.disconnectOnce.Do(func() {
defer s.wgDisconnected.Done()
s.lock.Lock()
close(s.isOnline)
s.lock.Unlock()
finalize := func(err exitReason) {
s.signalClose(s.id, err)
s.finalized = true
}
if p.ExpireAt != nil {
s.expireIn = p.ExpireAt
}
// If session expiry is set to 0, the Session ends when the Network Connection is closed
if s.expireIn != nil && *s.expireIn == 0 {
s.killOnDisconnect = true
}
// valid willMsg pointer tells we have will message
// if session is clean send will regardless to will delay
if p.Will && s.will != nil && (s.killOnDisconnect || s.willDelay == 0) {
s.messenger.Publish(s.will) // nolint: errcheck
s.will = nil
}
s.signalDisconnected(s.id, p.Reason, !s.killOnDisconnect)
if s.killOnDisconnect || !s.subscriber.HasSubscriptions() {
s.shutdownSubscriber(s.subscriber)
s.subscriber = nil
}
if s.killOnDisconnect {
defer finalize(exitReasonClean)
} else {
// check if remaining subscriptions exists, expiry is presented and will delay not set to 0
if s.expireIn == nil && s.willDelay == 0 {
// signal to shutdown session
defer finalize(exitReasonShutdown)
} else if (s.expireIn != nil && *s.expireIn > 0) || s.willDelay > 0 {
// new expiry value might be received upon disconnect message from the client
if p.ExpireAt != nil {
s.expireIn = p.ExpireAt
}
s.runExpiry(p.Will)
}
}
s.conn = nil
})
}
func (s *session) timerCallback() {
defer s.timerLock.Unlock()
s.timerLock.Lock()
finalize := func(reason exitReason) {
s.signalClose(s.id, reason)
s.finalized = true
}
// 1. check for will message available
if s.will != nil {
// publish if exists and wipe state
s.messenger.Publish(s.will) // nolint: errcheck
s.will = nil
s.willDelay = 0
}
if s.expireIn == nil {
// 2.a session has processed delayed will and there is nothing to do
// completely shutdown the session
defer finalize(exitReasonShutdown)
} else if *s.expireIn == 0 {
// session has expired. WIPE IT
if s.subscriber != nil {
s.shutdownSubscriber(s.subscriber)
}
defer finalize(exitReasonExpired)
} else {
// restart timer and wait again
val := *s.expireIn
// clear value pointed by expireIn so when next fire comes we signal session is expired
*s.expireIn = 0
s.timer.Reset(time.Duration(val) * time.Second)
}
}
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
Go
1
https://gitee.com/kaifazhe/volantmq.git
git@gitee.com:kaifazhe/volantmq.git
kaifazhe
volantmq
volantmq
v0.0.4

Search