代码拉取完成,页面将自动刷新
package domains
import (
	"errors"
	"fmt"
	"sort"
	"strings"
)
// Broker keeps a registry of topics, looked up by topic name, together with
// the message-queue length that is passed to newTopic when a topic is created.
//
// NOTE(review): no locking around the topics map is visible in this file —
// confirm that callers serialize access if a Broker is shared between goroutines.
type Broker struct {
topics map[string]*topic // key is name of topic
// lenOfMessageQueue is forwarded to newTopic for every topic this broker creates.
lenOfMessageQueue int
// todo
}
// NewBroker builds a Broker that creates its topics with the given queue
// length and registers one initial topic on it.
//
// The initial topic is named defaultTopic, or "test" when defaultTopic is the
// empty string.
func NewBroker(lenOfMessageQueue int, defaultTopic string) *Broker {
	broker := &Broker{
		lenOfMessageQueue: lenOfMessageQueue,
		topics:            make(map[string]*topic),
	}

	initial := defaultTopic
	if initial == "" {
		initial = "test"
	}

	// CreateTopic only reports an error for a name that is already
	// registered, which cannot happen on the empty map built above, so the
	// result is deliberately discarded.
	_ = broker.CreateTopic(initial)

	return broker
}
// CreateTopic registers a new topic under the given name, built by newTopic
// with the broker's lenOfMessageQueue.
//
// It returns an error, leaving the existing entry untouched, when a topic
// with that name is already registered; otherwise it returns nil.
func (b *Broker) CreateTopic(topic string) error {
	_, exists := b.topics[topic]
	if exists {
		return errors.New("topic[" + topic + "] has existed")
	}

	created := newTopic(topic, b.lenOfMessageQueue)
	b.topics[topic] = created
	return nil
}
// DeleteTopic calls destroyTopic on the topic registered under topicName and
// then removes it from the broker. Names that are not registered are ignored.
func (b *Broker) DeleteTopic(topicName string) {
	doomed, found := b.topics[topicName]
	if !found {
		return
	}

	doomed.destroyTopic()
	delete(b.topics, topicName)
}
// Subscribe passes url to the subscriber method of the named topic.
//
// The url must begin with "http://" (compared case-insensitively); this is
// checked before the topic is looked up. An error is returned when the url
// fails that check or when no topic with that name is registered. Otherwise
// the result of the topic's subscriber method is returned.
func (b *Broker) Subscribe(url string, topic string) error {
	// Only the scheme prefix is validated here.
	hasHTTPScheme := strings.HasPrefix(strings.ToLower(url), "http://")
	if !hasHTTPScheme {
		return errors.New("url[" + url + "] is invalid")
	}

	target, found := b.topics[topic]
	if !found {
		return errors.New("topic[" + topic + "] has not existed")
	}
	return target.subscriber(url)
}
// Unsubscribe passes url to the unsubscribe method of the named topic.
// Nothing happens when no topic with that name is registered.
func (b *Broker) Unsubscribe(url string, topic string) {
	target, found := b.topics[topic]
	if !found {
		return
	}
	target.unsubscribe(url)
}
// Producer hands message to the produceMessage method of the topic named
// topicName.
//
// It returns an error when message is nil or has zero length, or when no
// topic named topicName is registered. Otherwise it returns the result of
// produceMessage.
func (b *Broker) Producer(topicName string, message []byte) error {
	// len is 0 for both a nil slice and a non-nil slice holding no bytes, so
	// this rejects every message that is actually empty, not just nil ones.
	if len(message) == 0 {
		return errors.New("message is empty")
	}

	target, found := b.topics[topicName]
	if !found {
		return errors.New("topic[" + topicName + "] not exist")
	}
	return target.produceMessage(message)
}
// PerformanceStat renders a plain-text table with one row per registered
// topic: its name, packet counters, queue depth and subscriber list.
//
// The result starts with two newlines and a header line; every column except
// the last is left-aligned and padded to 25 characters. Rows are emitted in
// ascending topic-name order so that repeated calls produce a stable layout
// instead of following Go's randomized map iteration order.
func (b *Broker) PerformanceStat() string {
	// Collect and sort the keys first; ranging over the map directly would
	// shuffle the rows on every call.
	names := make([]string, 0, len(b.topics))
	for name := range b.topics {
		names = append(names, name)
	}
	sort.Strings(names)

	// A Builder avoids re-copying the whole report for every appended row.
	var report strings.Builder
	// Column titles are kept byte-for-byte in case the output is parsed.
	fmt.Fprintf(&report, "\n\n%-25s|%-25s|%-25s|%-25s|%-25s|%-25s|%-25s\n", "Topic", "RX Packetes", "dropped with queue full", "TX Packets", "dropped with TX failure", "Depth of Queue", "Subscribers")
	for _, name := range names {
		t := b.topics[name]
		fmt.Fprintf(&report, "%-25s|%-25d|%-25d|%-25d|%-25d|%-25d|%s\n", t.name, t.receivePackets, t.dropPacketsWithQueueFull, t.sendPacketsSucceed, t.dropPacketsWithSendFailure, t.queueDepth, t.getSubscriberList())
	}
	return report.String()
}
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。