1 Star 0 Fork 2

何吕 / volantmq

forked from JUMEI_ARCH / volantmq 
加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
sessions.go 2.10 KB
一键复制 编辑 原始数据 按行查看 历史
Artur Troian 提交于 2017-07-19 15:50 . Complete refactoring
package mem
import (
"sync"
"github.com/troian/surgemq/persistence/types"
)
// sessions keeps per-session persisted state in a map held in memory.
type sessions struct {
// status carries the done channel that the methods in this file poll
// before touching the map.
status *dbStatus
// lock serialises access to the sessions map.
lock sync.Mutex
// sessions maps a session id (the id bytes converted to a string) to the
// state stored for that session.
sessions map[string]*persistenceTypes.SessionState
}
// PutOutMessage appends data to the outgoing messages of the session
// identified by id, creating an empty session state first when none exists.
//
// It returns persistenceTypes.ErrNotOpen when the status done channel is
// already readable, and nil otherwise.
//
// NOTE(review): data is stored as passed, without copying — confirm callers
// do not reuse the buffer afterwards.
func (s *sessions) PutOutMessage(id []byte, data []byte) error {
	select {
	case <-s.status.done:
		return persistenceTypes.ErrNotOpen
	default:
	}

	s.lock.Lock()
	defer s.lock.Unlock()

	key := string(id)
	state, found := s.sessions[key]
	if !found {
		state = &persistenceTypes.SessionState{}
		s.sessions[key] = state
	}
	state.OutMessages = append(state.OutMessages, data)

	return nil
}
// Get looks up the state stored for the session identified by id.
//
// It returns persistenceTypes.ErrNotOpen when the status done channel is
// already readable and persistenceTypes.ErrNotFound when no entry exists for
// id. The returned pointer is the stored value itself, not a copy.
func (s *sessions) Get(id []byte) (*persistenceTypes.SessionState, error) {
	select {
	case <-s.status.done:
		return nil, persistenceTypes.ErrNotOpen
	default:
	}

	s.lock.Lock()
	defer s.lock.Unlock()

	if state, found := s.sessions[string(id)]; found {
		return state, nil
	}

	return nil, persistenceTypes.ErrNotFound
}
// Load invokes load once for every stored session, passing the session id as
// bytes together with its state. Iteration follows map order, which is not
// deterministic.
//
// The callback runs while the store's mutex is held, so it must not call back
// into methods of this store that take the same lock.
//
// It returns persistenceTypes.ErrNotOpen when the status done channel is
// already readable, and nil otherwise.
func (s *sessions) Load(load func([]byte, *persistenceTypes.SessionState)) error {
	select {
	case <-s.status.done:
		return persistenceTypes.ErrNotOpen
	default:
	}

	s.lock.Lock()
	defer s.lock.Unlock()

	for key, entry := range s.sessions {
		load([]byte(key), entry)
	}

	return nil
}
// Store saves state for the session identified by id.
//
// When no entry exists for id, the given state pointer is stored as-is.
// When an entry already exists, the OutMessages and UnAckMessages of state
// are appended to the existing entry and the entry itself is kept.
//
// It returns persistenceTypes.ErrNotOpen when the status done channel is
// already readable, and nil otherwise.
func (s *sessions) Store(id []byte, state *persistenceTypes.SessionState) error {
	select {
	case <-s.status.done:
		return persistenceTypes.ErrNotOpen
	default:
	}

	s.lock.Lock()
	defer s.lock.Unlock()

	key := string(id)
	existing, found := s.sessions[key]
	if !found {
		// First time this id is seen: keep the caller's state object.
		s.sessions[key] = state
		return nil
	}

	// Merge into the entry that is already stored.
	existing.OutMessages = append(existing.OutMessages, state.OutMessages...)
	existing.UnAckMessages = append(existing.UnAckMessages, state.UnAckMessages...)

	return nil
}
// Delete removes the entry stored for the session identified by id. Deleting
// an id that has no entry is a no-op.
//
// It returns persistenceTypes.ErrNotOpen when the status done channel is
// already readable, and nil otherwise.
func (s *sessions) Delete(id []byte) error {
	select {
	case <-s.status.done:
		return persistenceTypes.ErrNotOpen
	default:
	}

	s.lock.Lock()
	defer s.lock.Unlock()

	key := string(id)
	delete(s.sessions, key)

	return nil
}
// Wipe discards every stored session by replacing the map with a fresh,
// empty one.
//
// It returns persistenceTypes.ErrNotOpen when the status done channel is
// already readable, and nil otherwise.
func (s *sessions) Wipe() error {
	select {
	case <-s.status.done:
		return persistenceTypes.ErrNotOpen
	default:
	}

	s.lock.Lock()
	defer s.lock.Unlock()

	s.sessions = map[string]*persistenceTypes.SessionState{}

	return nil
}
Go
1
https://gitee.com/kaifazhe/volantmq.git
git@gitee.com:kaifazhe/volantmq.git
kaifazhe
volantmq
volantmq
v0.0.1-beta

搜索帮助