2 Star 1 Fork 0

法马智慧/fmgo

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
emqttx.go 3.48 KB
一键复制 编辑 原始数据 按行查看 历史
零海 提交于 2022-11-10 15:33 . 原mqtt->emqx ,新增旧版mqttx
package mqttx
// 与后端mqtt服务交互
import (
"errors"
"fmt"
"gitee.com/fmpt/fmgo/logx"
"gitee.com/fmpt/fmgo/stringx"
mqtt "github.com/eclipse/paho.mqtt.golang"
"strconv"
"strings"
"time"
)
type EClient struct {
client mqtt.Client
addr string
id string
topics map[string]HandFunc
}
func NewE(addr, id, username, password string) *EClient {
cl := &EClient{
addr: addr,
id: id,
}
opts := mqtt.NewClientOptions().AddBroker("tcp://" + addr).SetClientID(id)
opts.SetAutoReconnect(true)
if username != "" {
opts.SetUsername(username)
}
if password != "" {
opts.SetPassword(password)
}
opts.SetKeepAlive(60 * time.Second)
// Message callback handler,在没有任何订阅时,发布端调用此函数
//opts.SetDefaultPublishHandler(messagePubHandler)
opts.SetPingTimeout(1 * time.Second)
opts.OnConnect = cl.connectHandler
opts.OnConnectionLost = cl.connectLostHandler
cl.client = mqtt.NewClient(opts)
return cl
}
// 连接的回掉函数
func (c *EClient) connectHandler(client mqtt.Client) {
logx.Info("Connected to server")
if len(c.topics) == 0 {
return
}
for topic := range c.topics {
top := topic
if token := client.Subscribe(top+"/#", 1, func(client mqtt.Client, msg mqtt.Message) {
fmt.Printf("Receive Subscribe Message :")
fmt.Printf("Sub Client Topic : %s, Data size is %d \n", msg.Topic(), len(msg.Payload()))
if len(msg.Payload()) > 0 {
topics := strings.Split(msg.Topic(), "/")
if len(topics) >= 2 {
c.topics[top](topics[0], strings.Join(topics[1:], "/"), msg.Payload())
return
}
c.topics[top](msg.Topic(), "", msg.Payload())
}
}); token.Wait() && token.Error() != nil {
logx.Error("Subscribe : ", top, "error ==> ", token.Error())
}
}
}
// 丢失连接的回掉函数
func (c *EClient) connectLostHandler(client mqtt.Client, err error) {
logx.Error("mqtt Connect loss: ", err)
}
func (c *EClient) errorHandler(err error) {
if err.Error() == "EOF" {
_ = c.Connect()
}
}
func (c *EClient) AddTopic(topic string, hFunc HandFunc) {
if c.topics == nil {
c.topics = make(map[string]HandFunc)
}
c.topics[topic] = hFunc
}
func (c *EClient) AddTopics(topics map[string]HandFunc) {
c.topics = topics
}
func (c *EClient) Connect() error {
if token := c.client.Connect(); token.Wait() && token.Error() != nil {
time.AfterFunc(1*time.Minute, func() {
err := c.Connect()
if err != nil {
logx.Error(err.Error())
}
})
return token.Error()
}
return nil
}
func (c *EClient) Publish(topic string, data []byte) error {
if token := c.client.Publish(topic, 1, true, data); token != nil {
return token.Error()
}
return nil
}
func (c *EClient) Request(topic string, data []byte) ([]byte, error) {
respTopic := c.randTopic("Request")
mc := make(chan []byte)
var msg []byte
if token := c.client.Subscribe(respTopic, 1, func(client mqtt.Client, msg mqtt.Message) {
mc <- msg.Payload()
}); token.Wait() && token.Error() != nil {
logx.Error("Subscribe : ", respTopic, "error ==> ", token.Error())
return nil, token.Error()
}
defer func() {
_ = c.client.Unsubscribe(respTopic)
}()
if err := c.Publish(fmt.Sprintf("%s/%s", topic, respTopic), data); err != nil {
return nil, err
}
select {
case msg = <-mc:
return msg, nil
case <-time.After(time.Second * 5):
return nil, errors.New("timeout")
}
}
func (c *EClient) randTopic(customStr string) string {
randString := stringx.RandString(32)
return strings.Join([]string{strconv.FormatInt(time.Now().UnixNano(), 10), randString, customStr}, "/")
}
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
Go
1
https://gitee.com/fmpt/fmgo.git
git@gitee.com:fmpt/fmgo.git
fmpt
fmgo
fmgo
v1.2.38

搜索帮助