1 Star 0 Fork 0

jerryduren / architecture

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
broker.go 2.08 KB
一键复制 编辑 原始数据 按行查看 历史
jerryduren 提交于 2021-08-26 22:31 . m
package domains
import (
"errors"
"fmt"
"strings"
)
type Broker struct {
topics map[string]*topic // key is name of topic
lenOfMessageQueue int
// todo
}
func NewBroker(lenOfMessageQueue int, defaultTopic string) *Broker {
topics := new(Broker)
topics.lenOfMessageQueue = lenOfMessageQueue
topics.topics = make(map[string]*topic)
if len(defaultTopic) == 0 {
_ = topics.CreateTopic("test")
} else {
_ = topics.CreateTopic(defaultTopic)
}
return topics
}
func (b *Broker) CreateTopic(topic string) error {
if _, ok := b.topics[topic]; ok {
return errors.New("topic["+topic+"] has existed")
}
b.topics[topic] = newTopic(topic, b.lenOfMessageQueue)
return nil
}
func (b *Broker) DeleteTopic(topicName string) {
if topic, ok := b.topics[topicName]; ok{
topic.destroyTopic()
delete(b.topics,topicName)
}
}
func (b *Broker) Subscribe(url string, topic string) error{
// check whether url is valid
if !strings.HasPrefix(strings.ToLower(url), "http://"){
return errors.New("url["+url+"] is invalid")
}
if topic, ok := b.topics[topic]; ok{
return topic.subscriber(url)
}
return errors.New("topic["+topic+"] has not existed")
}
func (b *Broker) Unsubscribe(url string, topic string) {
if topic, ok := b.topics[topic]; ok{
topic.unsubscribe(url)
}
}
func (b *Broker) Producer (topicName string, message []byte) error{
if message == nil{
return errors.New("message is empty")
}
if topic, ok := b.topics[topicName]; ok{
return topic.produceMessage(message)
}else {
return errors.New("topic["+topicName+"] not exist")
}
}
func (b *Broker) PerformanceStat () string {
result := fmt.Sprintf("\n\n%-25s|%-25s|%-25s|%-25s|%-25s|%-25s|%-25s\n", "Topic", "RX Packetes", "dropped with queue full", "TX Packets", "dropped with TX failure", "Depth of Queue", "Subscribers")
for _, topic := range b.topics{
result = result + fmt.Sprintf("%-25s|%-25d|%-25d|%-25d|%-25d|%-25d|%s\n",topic.name,topic.receivePackets,topic.dropPacketsWithQueueFull,topic.sendPacketsSucceed,topic.dropPacketsWithSendFailure,topic.queueDepth,topic.getSubscriberList())
}
return result
}
Go
1
https://gitee.com/jerryduren/architecture.git
git@gitee.com:jerryduren/architecture.git
jerryduren
architecture
architecture
36a2cef21a1f

搜索帮助