代码拉取完成,页面将自动刷新
同步操作将从 JUMEI_ARCH/volantmq 强制同步,此操作会覆盖自 Fork 仓库以来所做的任何修改,且无法恢复!!!
确定后同步将在后台操作,完成时将刷新页面,请耐心等待。
// Copyright (c) 2014 The VolantMQ Authors. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package mem
import (
"math/rand"
"strings"
"sync"
"time"
"github.com/VolantMQ/persistence"
"github.com/VolantMQ/volantmq/configuration"
"github.com/VolantMQ/volantmq/packet"
"github.com/VolantMQ/volantmq/subscriber"
"github.com/VolantMQ/volantmq/systree"
"github.com/VolantMQ/volantmq/topics/types"
"github.com/VolantMQ/volantmq/trace"
"github.com/VolantMQ/volantmq/types"
"go.uber.org/zap"
)
// provider is the in-memory topics provider: it keeps subscriptions and
// retained messages in a node tree and fans published messages out to the
// matching subscribers from background goroutines.
type provider struct {
	// smu is taken by every method in this file that touches the node tree,
	// the retained messages or publishTopic.
	smu sync.RWMutex
	// root is the top node created by newNode; it is set to nil by Close.
	root *node
	// allowOverlapping is copied from the configuration and handed to newNode.
	allowOverlapping bool
	// publishTopic records the non-$SYS topics that have already been
	// published to, so that topicsStat.Added fires once per topic.
	publishTopic map[string]bool

	// Statistics sinks supplied by the configuration.
	topicsStat        systree.TopicsStat
	subscriptionsStat systree.SubscriptionsStat

	// persist is the optional retained-message store; nil disables loading
	// on start-up and saving on Close.
	persist persistence.Retained
	log     *zap.Logger
	// onCleanUnsubscribe is copied from the configuration.
	// NOTE(review): not invoked anywhere in this part of the file.
	onCleanUnsubscribe func([]string)

	// inbound and inRetained are the work queues consumed by the publisher
	// and retainer goroutines respectively.
	inbound    chan *packet.Publish
	inRetained chan types.RetainObject
	// wgPublisher tracks the publisher and retainer goroutines; Close waits
	// on it.
	wgPublisher sync.WaitGroup
	// wgPublisherStarted lets NewMemProvider block until a freshly spawned
	// goroutine has actually started running.
	wgPublisherStarted sync.WaitGroup
}

// Compile-time check that *provider satisfies topicsTypes.Provider.
var _ topicsTypes.Provider = (*provider)(nil)
// NewMemProvider returns a new instance of the in-memory topics provider, which
// implements the topicsTypes.Provider interface. Subscriptions and retained
// messages are kept in memory; when config.Persist is set, previously stored
// retained messages are loaded here and written back by Close.
//
// Before returning, the publisher and retainer goroutines are started and
// confirmed running. An error is returned only when loading persisted retained
// messages fails with something other than persistence.ErrNotFound; entries
// that cannot be decoded, or that are not PUBLISH packets, are logged and
// skipped.
func NewMemProvider(config *topicsTypes.MemConfig) (topicsTypes.Provider, error) {
	p := &provider{
		topicsStat:         config.TopicsStat,
		subscriptionsStat:  config.SubscriptionsStat,
		persist:            config.Persist,
		onCleanUnsubscribe: config.OnCleanUnsubscribe,
		inbound:            make(chan *packet.Publish, 1024*512),
		inRetained:         make(chan types.RetainObject, 1024*512),
		allowOverlapping:   config.AllowOverlappingSubscriptions,
		publishTopic:       make(map[string]bool),
	}

	p.root = newNode(p.allowOverlapping, nil)
	p.log = configuration.GetLogger().Named("topics").Named(config.Name)

	if p.persist != nil {
		entries, err := p.persist.Load()
		if err != nil && err != persistence.ErrNotFound {
			return nil, err
		}

		for _, d := range entries {
			var ver packet.ProtocolVersion
			if d.Version == 0 {
				ver = packet.ProtocolV311 // TODO: may not be compatible.
			} else {
				ver = packet.ProtocolVersion(d.Version)
			}

			pkt, _, err := packet.Decode(ver, d.Data)
			if err != nil {
				p.log.Error("Couldn't decode retained message", zap.Error(err))
				continue
			}

			m, ok := pkt.(*packet.Publish)
			if !ok {
				// m is a nil *packet.Publish here, so the type name must be
				// taken from the decoded packet itself.
				p.log.Warn("invalid retained message type", zap.String("type", pkt.Type().Name()))
				continue
			}

			m.SetCreateTimestamp(d.CreatedAt)
			if len(d.ExpireAt) > 0 {
				if tm, err := time.Parse(time.RFC3339, d.ExpireAt); err == nil {
					m.SetExpiry(tm)
				} else {
					p.log.Error("Decode publish expire at", zap.Error(err))
				}
			}

			p.log.Debug("load retained message", zap.String("topic", m.Topic()),
				zap.Any("version", m.Version()),
				zap.String("payload", string(m.Payload())),
				zap.Int64("create", m.GetCreateTimestamp()),
			)

			// Only queues the message; it is applied once the retainer
			// goroutine below starts draining inRetained.
			p.Retain(m) // nolint: errcheck
		}
	}

	// Start each worker and wait until it signals that it is running.
	p.wgPublisher.Add(1)
	p.wgPublisherStarted.Add(1)
	go p.publisher()
	p.wgPublisherStarted.Wait()

	p.wgPublisher.Add(1)
	p.wgPublisherStarted.Add(1)
	go p.retainer()
	p.wgPublisherStarted.Wait()

	return p, nil
}
// Subscribe registers s for filter and reports the granted QoS, which is taken
// from p.Ops and also stored in p.Granted.
//
// The subscription counter is bumped only for new, non-$SYS subscriptions.
// Group subscriptions (p.Group != "") never receive retained messages. For all
// others the retained messages matching filter are returned according to the
// retain-handling option [MQTT-3.3.1-5]: always for RetainHandlingRetain, and
// only when the subscription did not exist before for
// RetainHandlingIfNotExists. The returned error is always nil.
func (mT *provider) Subscribe(filter string, s topicsTypes.Subscriber, p *topicsTypes.SubscriptionParams) (packet.QosType, []*packet.Publish, error) {
	mT.smu.Lock()
	defer mT.smu.Unlock()

	p.Granted = p.Ops.QoS()

	alreadySubscribed := mT.subscriptionInsert(filter, s, p)
	if !alreadySubscribed && !strings.HasPrefix(filter, "$SYS") {
		mT.subscriptionsStat.Subscribed()
	}

	if p.Group != "" {
		return p.Granted, nil, nil
	}

	var retained []*packet.Publish

	switch p.Ops.RetainHandling() {
	case packet.RetainHandlingRetain:
		mT.retainSearch(filter, &retained)
	case packet.RetainHandlingIfNotExists:
		if !alreadySubscribed {
			mT.retainSearch(filter, &retained)
		}
	}

	return p.Granted, retained, nil
}
// UnSubscribe removes sub's subscription to topic. On success the subscription
// counter is decremented unless topic is a $SYS topic; on failure the error
// from the removal is returned unchanged.
func (mT *provider) UnSubscribe(topic string, sub topicsTypes.Subscriber) error {
	mT.smu.Lock()
	defer mT.smu.Unlock()

	if err := mT.subscriptionRemove(topic, sub); err != nil {
		return err
	}

	if !strings.HasPrefix(topic, "$SYS") {
		mT.subscriptionsStat.UnSubscribed()
	}

	return nil
}
// Publish queues m for delivery by the publisher goroutine. m must be a
// *packet.Publish, otherwise topicsTypes.ErrUnexpectedObjectType is returned.
// The call blocks while the inbound queue is full.
func (mT *provider) Publish(m interface{}) error {
	if msg, isPublish := m.(*packet.Publish); isPublish {
		mT.inbound <- msg
		return nil
	}

	return topicsTypes.ErrUnexpectedObjectType
}
// Retain queues obj on the inRetained channel; the retainer goroutine later
// applies it to the retained-message store. The call blocks while the queue is
// full and always returns nil.
func (mT *provider) Retain(obj types.RetainObject) error {
	mT.inRetained <- obj
	return nil
}
// Retained collects the retained messages matching filter [MQTT-3.3.1-5].
// The lookup runs under the provider lock; the returned error is always nil.
func (mT *provider) Retained(filter string) ([]*packet.Publish, error) {
	mT.smu.Lock()
	defer mT.smu.Unlock()

	var matched []*packet.Publish
	mT.retainSearch(filter, &matched)

	return matched, nil
}
// Close shuts the provider down: it closes both work queues, waits for the
// publisher and retainer goroutines to drain them and exit, and then, when a
// persistence backend is configured, stores the retained messages that are
// neither QoS0 nor expired (or wipes the store when none remain). Finally the
// node tree is released. Persistence failures are logged, not returned; the
// result is always nil.
//
// Close must be called at most once, and not concurrently with Publish or
// Retain, because those send on the channels closed here.
func (mT *provider) Close() error {
	// The workers take mT.smu for every queued item, so they must be drained
	// BEFORE this method acquires the lock. Waiting for them while holding the
	// lock deadlocks as soon as a single message is still queued.
	close(mT.inbound)
	close(mT.inRetained)
	mT.wgPublisher.Wait()

	mT.smu.Lock()
	defer mT.smu.Unlock()

	if mT.persist != nil {
		var res []*packet.Publish
		// [MQTT-3.3.1-5]
		mT.retainSearch("#", &res)
		mT.retainSearch("/#", &res)
		mT.retainSearch("$share/#", &res)

		var encoded []persistence.PersistedPacket
		for _, pkt := range res {
			// Discard retained QoS0 messages and those already expired.
			if pkt.QoS() == packet.QoS0 || pkt.Expired(false) {
				continue
			}

			buf, err := packet.Encode(pkt)
			if err != nil {
				mT.log.Error("Couldn't encode retained message", zap.Error(err))
				continue
			}

			entry := persistence.PersistedPacket{
				Data:      buf,
				Version:   persistence.ProtocolVersion(pkt.Version()),
				CreatedAt: pkt.GetCreateTimestamp(),
			}
			// ExpireAt stays empty for messages without an expiry; the loader
			// in NewMemProvider only parses non-empty values.
			if tm := pkt.GetExpiry(); !tm.IsZero() {
				entry.ExpireAt = tm.Format(time.RFC3339)
			}

			encoded = append(encoded, entry)
		}

		if len(encoded) > 0 {
			mT.log.Debug("store retained messages", zap.Int("amount", len(encoded)))
			if err := mT.persist.Store(encoded); err != nil {
				mT.log.Error("Couldn't persist retained messages", zap.Error(err))
			}
		} else {
			// Nothing left to retain: clear whatever an earlier run stored.
			if err := mT.persist.Wipe(); err != nil {
				mT.log.Error("Couldn't clean retained messages", zap.Error(err))
			}
		}
	}

	mT.root = nil
	return nil
}
// retain applies obj to the retained-message store under the provider lock.
//
// For PUBLISH packets [MQTT-3.3.1-10] [MQTT-3.3.1-7]: an empty payload or QoS0
// first removes whatever is retained for the topic, and an empty payload then
// stops there without storing anything. Every other object is inserted under
// its topic.
func (mT *provider) retain(obj types.RetainObject) {
	mT.smu.Lock()
	defer mT.smu.Unlock()

	if pub, isPublish := obj.(*packet.Publish); isPublish {
		emptyPayload := len(pub.Payload()) == 0

		if emptyPayload || pub.QoS() == packet.QoS0 {
			mT.retainRemove(obj.Topic()) // nolint: errcheck
		}

		if emptyPayload {
			return
		}
	}

	mT.retainInsert(obj.Topic(), obj)
}
// retainer is the goroutine body that consumes inRetained. It signals
// wgPublisherStarted once running, applies each queued object via retain, and
// exits (marking wgPublisher done) when the channel is closed and drained.
func (mT *provider) retainer() {
	defer mT.wgPublisher.Done()

	mT.wgPublisherStarted.Done()

	for {
		obj, open := <-mT.inRetained
		if !open {
			return
		}

		mT.retain(obj)
	}
}
// seedBalanceRand makes sure the global math/rand source is seeded exactly
// once for subscriberBalance instead of on every published message.
var seedBalanceRand sync.Once

// subscriberBalance flattens entries into the list of subscribers that should
// receive a message. Entries without a group are all kept; for every group
// exactly one member is picked at random, so a message is delivered to a single
// subscriber per group. The order of the group picks in the result is not
// deterministic because it follows map iteration order.
func (mT *provider) subscriberBalance(entries publishEntries) []*publishEntry {
	// Seeding re-initialises the whole generator state under the global rand
	// lock; doing that per message is wasted work on the publish path.
	seedBalanceRand.Do(func() {
		rand.Seed(time.Now().UnixNano())
	})

	subs := make([]*publishEntry, 0)
	balance := make(map[string][]*publishEntry)

	for _, pub := range entries {
		for _, e := range pub {
			mT.log.Debug("subscribe list", zap.String("clientID", e.s.ID()), zap.String("group", e.group))

			if e.group == "" {
				subs = append(subs, e)
				continue
			}

			// append on the nil slice of a missing key allocates as needed.
			balance[e.group] = append(balance[e.group], e)
		}
	}

	for group, members := range balance {
		chosen := members[0]
		if len(members) > 1 {
			chosen = members[rand.Intn(len(members))]
		}

		subs = append(subs, chosen)
		mT.log.Debug("choosed subscriber", zap.String("clientID", chosen.s.ID()), zap.String("group", group))
	}

	return subs
}
// publisher is the goroutine body that consumes the inbound queue. It signals
// wgPublisherStarted once running and delivers each queued message via
// deliverInbound. It exits when the channel is closed and drained, or when
// delivering a message panics; the panic is logged and the goroutine stops.
// wgPublisher is marked done in either case.
func (mT *provider) publisher() {
	// Registered first so that it runs last, after the panic has been logged.
	defer mT.wgPublisher.Done()

	defer func() {
		ef := recover()
		if ef == nil {
			return
		}

		// A panic value is not necessarily an error; asserting it blindly
		// would panic again inside this handler.
		if err, ok := ef.(error); ok {
			mT.log.Error(err.Error())
			return
		}

		mT.log.Error("publisher panic", zap.Any("panic", ef))
	}()

	mT.wgPublisherStarted.Done()

	for msg := range mT.inbound {
		mT.deliverInbound(msg)
	}
}

// deliverInbound routes a single message to its subscribers while holding the
// provider lock. The lock is released by defer, so it is also released when a
// subscriber panics.
func (mT *provider) deliverInbound(msg *packet.Publish) {
	pubEntries := publishEntries{}

	mT.smu.Lock()
	defer mT.smu.Unlock()

	start := time.Now()
	msg.SetFindSubscriber(start.UnixNano())

	topic := msg.Topic()
	isSys := strings.HasPrefix(topic, "$SYS")

	// Count every non-$SYS topic once, the first time it is published to.
	if !isSys {
		if _, ok := mT.publishTopic[topic]; !ok {
			mT.publishTopic[topic] = true
			mT.topicsStat.Added()
			mT.log.Debug("publish new topic", zap.String("topic", topic))
		}
	}

	mT.subscriptionSearch(topic, msg.PublishID(), &pubEntries)

	if len(pubEntries) == 0 && !isSys {
		mT.log.Warn("drop message because no subscribers",
			zap.String("topic", topic),
			zap.Int64("create", msg.GetCreateTimestamp()),
		)
	}

	subs := mT.subscriberBalance(pubEntries)
	for _, sub := range subs {
		if err := sub.s.Publish(msg, sub.qos, sub.ops, sub.ids); err != nil {
			mT.log.Error("publish error", zap.Error(err))
		} else {
			// NOTE(review): this assertion panics (and thereby stops the
			// publisher) if a subscriber is ever not a *subscriber.Type.
			s := sub.s.(*subscriber.Type)

			fields := []zap.Field{
				zap.String("subsriber", s.ID()),
				zap.String("topic", topic),
				zap.String("group", sub.group),
				zap.Int64("create", msg.GetCreateTimestamp()),
			}

			// Traced client/topic pairs are logged at Info, the rest at Debug.
			if trace.GetInstance().Status(s.ID(), topic) {
				mT.log.Info("publish message", fields...)
			} else {
				mT.log.Debug("publish message", fields...)
			}
		}

		sub.s.Release()
	}

	if !isSys {
		mT.log.Debug("publish stat",
			zap.String("topic", topic),
			zap.Int64("create", msg.GetCreateTimestamp()),
			zap.Int("subs", len(subs)),
			zap.Duration("cost", time.Since(start)),
		)
	}
}
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。