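// NOTE: hosting-page banner captured together with this file ("Code pull
// complete; the page will refresh automatically"). It is not part of the Go source.
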
package mqtt
import (
"bufio"
"context"
"fmt"
"os"
"runtime/debug"
"strings"
"time"
"gitee.com/huobowen/vulcans"
managev1 "gitee.com/huobowen/vulcans/api/devmanage/v1"
"gitee.com/huobowen/vulcans/connection"
"gitee.com/huobowen/vulcans/event"
"gitee.com/huobowen/vulcans/transport"
"gitee.com/huobowen/vulcans/utils"
"gitee.com/huobowen/vulcans/utils/crc"
"gitee.com/huobowen/vulcans/utils/date"
mqtt "github.com/eclipse/paho.mqtt.golang"
"github.com/go-kratos/kratos/v2/log"
"github.com/go-resty/resty/v2"
"github.com/panjf2000/ants/v2"
"github.com/tidwall/gjson"
)
// ServerOption is a functional option that mutates a Server while it is being
// built by NewServer.
type ServerOption func(*Server)
// Broker returns an option that stores the broker address which NewServer
// later hands to the MQTT client options.
func Broker(broker string) ServerOption {
	return func(srv *Server) { srv.broker = broker }
}
// Username returns an option that stores the user name used to authenticate
// against the broker.
func Username(username string) ServerOption {
	return func(srv *Server) { srv.username = username }
}
// Password returns an option that stores the password used to authenticate
// against the broker.
func Password(password string) ServerOption {
	return func(srv *Server) { srv.password = password }
}
// ClientId returns an option that stores the MQTT client identifier the server
// presents to the broker.
func ClientId(clientId string) ServerOption {
	return func(srv *Server) { srv.clientId = clientId }
}
// SubTopics returns an option that stores the topic filters the server
// subscribes to once connected. The slice is kept as passed (not copied), and
// NewServer pairs its entries by index with the codecs and handlers it receives.
func SubTopics(subTopics []string) ServerOption {
	return func(srv *Server) { srv.subTopics = subTopics }
}
// PubTopic returns an option that stores the publish topic prefix. NewServer
// copies it into the package-level pubTopic, which ReplyMqttDevice prepends to
// the device code to build the reply topic.
func PubTopic(pubTopic string) ServerOption {
	return func(srv *Server) { srv.pubTopic = pubTopic }
}
// Server is an MQTT consumer wrapper. It owns the paho client and the
// connection settings collected from the ServerOption values.
type Server struct {
	// Client is the underlying paho MQTT client created by NewServer.
	Client mqtt.Client
	// broker is the broker address passed to AddBroker.
	broker string
	// username and password are the broker credentials.
	username string
	password string
	// clientId is the MQTT client identifier.
	clientId string
	// subTopics are the topic filters subscribed to on connect.
	subTopics []string
	// pubTopic is the prefix of the topic used when replying to a device.
	pubTopic string
}
// Package-level state shared by every Server in the process.
var (
	// pubTopic is the reply-topic prefix; NewServer overwrites it with the
	// value of the most recently built Server.
	pubTopic string
	// p is the worker pool that runs taskFunc for each incoming message. It is
	// created in Start and released in Stop.
	p *ants.PoolWithFunc
)
// l is the connection-lost callback registered on the MQTT client; it only
// logs the error that caused the disconnect.
var l mqtt.ConnectionLostHandler = func(_ mqtt.Client, err error) {
	log.Errorf("mqtt connection lost error: %s", err.Error())
}
// determineHandleProcess returns the codec and handler registered for the
// first subscription filter in subTopics that matches topic.
//
// subTopics, codecs and handlers are parallel slices: entry i of codecs and
// handlers belongs to filter i. Filters are matched level by level following
// the MQTT wildcard rules ("+" matches exactly one level, a trailing "#"
// matches the parent level and everything below it). When no filter matches,
// or the matching filter has no codec/handler at its index, both results are
// nil.
func determineHandleProcess(topic string, subTopics []string, codecs []vulcanus.MqttCodec, handlers []vulcanus.DataHandler) (c vulcanus.MqttCodec, h vulcanus.DataHandler) {
	for i, filter := range subTopics {
		if !matchTopicFilter(filter, topic) {
			continue
		}
		// Guard against mismatched slice lengths instead of panicking on an
		// out-of-range index.
		if i >= len(codecs) || i >= len(handlers) {
			return nil, nil
		}
		return codecs[i], handlers[i]
	}
	return nil, nil
}

// matchTopicFilter reports whether topic is matched by the MQTT subscription
// filter, honouring the "+" (single level) and "#" (multi level) wildcards.
func matchTopicFilter(filter, topic string) bool {
	filterLevels := strings.Split(filter, "/")
	topicLevels := strings.Split(topic, "/")
	for i, level := range filterLevels {
		if level == "#" {
			// "#" is only valid as the last level; there it also matches the
			// parent level itself ("a/#" matches "a").
			return i == len(filterLevels)-1
		}
		if i >= len(topicLevels) {
			return false
		}
		if level != "+" && level != topicLevels[i] {
			return false
		}
	}
	return len(filterLevels) == len(topicLevels)
}
// taskArgs bundles everything taskFunc needs to process one MQTT message; it
// is the argument handed to the worker pool by messageHandler.
type taskArgs struct {
	// ctx is the transport context shared by all messages.
	ctx *transport.Context
	// client is the MQTT client the message arrived on.
	client mqtt.Client
	// msg is the received MQTT message.
	msg mqtt.Message
	// subTopic, codecs and handlers are parallel slices used to pick the
	// codec and handler for msg's topic.
	subTopic []string
	codecs   []vulcanus.MqttCodec
	handlers []vulcanus.DataHandler
}
// messageHandler builds the paho message callback. The callback does no
// processing itself: it wraps the message and its routing tables in a
// taskArgs value and submits it to the package-level worker pool, where
// taskFunc handles it.
//
// A message that cannot be submitted (pool not created yet, pool released, or
// pool rejecting work) is logged and dropped instead of being lost silently.
func messageHandler(ctx *transport.Context, subTopic []string, codecs []vulcanus.MqttCodec, handlers []vulcanus.DataHandler) mqtt.MessageHandler {
	return func(client mqtt.Client, msg mqtt.Message) {
		if p == nil {
			log.Errorf("[%s] message dropped: worker pool is not initialized", msg.Topic())
			return
		}
		args := &taskArgs{
			ctx:      ctx,
			client:   client,
			msg:      msg,
			subTopic: subTopic,
			codecs:   codecs,
			handlers: handlers,
		}
		if err := p.Invoke(args); err != nil {
			log.Errorf("[%s] submit message to worker pool err: [%s]", msg.Topic(), err.Error())
		}
	}
}
// NewServer builds an MQTT consumer from the given options.
//
// cs and hs are the codecs and handlers for the subscribed topics, paired by
// index with the filters supplied through SubTopics. The returned Server holds
// a client that is configured but not yet connected; call Start to connect.
// As a side effect the package-level pubTopic is set from the options.
func NewServer(ctx *transport.Context, cs []vulcanus.MqttCodec, hs []vulcanus.DataHandler, opts ...ServerOption) *Server {
	srv := new(Server)
	for _, apply := range opts {
		apply(srv)
	}

	// Subscriptions are issued from the on-connect callback. A subscription
	// failure terminates the process.
	onConnect := func(client mqtt.Client) {
		log.Infof("[MQTT] Connect [%s] success.", srv.broker)
		for _, topic := range srv.subTopics {
			// Subscribe with a nil callback so that messages are delivered
			// to the default publish handler.
			token := client.Subscribe(topic, 1, nil)
			if token.Wait() && token.Error() != nil {
				fmt.Println(token.Error())
				os.Exit(1)
			}
			log.Infof("[MQTT] subscribe [%s] topic success", topic)
		}
	}

	options := mqtt.NewClientOptions()
	options.AddBroker(srv.broker)
	options.SetClientID(srv.clientId)
	options.SetUsername(srv.username)
	options.SetPassword(srv.password)
	// Every incoming message goes through the shared message handler.
	options.SetDefaultPublishHandler(messageHandler(ctx, srv.subTopics, cs, hs))
	options.SetConnectionLostHandler(l)
	options.SetAutoReconnect(true)
	options.SetCleanSession(false)
	options.SetOnConnectHandler(onConnect)

	srv.Client = mqtt.NewClient(options)
	pubTopic = srv.pubTopic
	return srv
}
// taskFunc is the worker-pool entry point that processes a single MQTT
// message. a must be a *taskArgs; any other value is logged and ignored.
//
// Processing steps, in order:
//  1. pick the codec and handler registered for the message topic;
//  2. decode the payload into a device message;
//  3. fetch the device configuration (messages from devices without a
//     configuration are dropped);
//  4. record the device session and forward the raw data downstream;
//  5. run the handler.
//
// Panics raised while processing are recovered and logged so that one bad
// message cannot take the worker down.
func taskFunc(a interface{}) {
	// Checked assertion: a wrong argument type must not panic before the
	// recover below is in place.
	args, ok := a.(*taskArgs)
	if !ok || args == nil {
		log.Errorf("unexpected worker task argument: [%T]", a)
		return
	}
	msg := args.msg
	client := args.client
	ctx := args.ctx

	defer func() {
		if err := recover(); err != nil {
			log.Errorf("[%s] message handle err: [%v].", msg.Topic(), err)
			debug.PrintStack()
		}
	}()

	topic := msg.Topic()
	payload := msg.Payload()

	codec, handler := determineHandleProcess(topic, args.subTopic, args.codecs, args.handlers)
	if codec == nil || handler == nil {
		log.Errorf("not found codec and handler of the topic, topic: [%s], msg: [%s]", topic, payload)
		return
	}
	if ctx.C.Device.TraceMsg {
		log.Infof("topic: [%s], msg: [%s]", topic, payload)
	}

	deviceMsg, err := codec.Decode(topic, payload)
	if err != nil {
		log.Errorf("decode err: [%s]", err.Error())
		return
	}
	deviceMsg.DeviceType = ctx.C.Device.DeviceType
	deviceMsg.DeviceChannel = &connection.DeviceChannel{DeviceCode: deviceMsg.DeviceCode, Channel: client, LastCommTime: time.Now()}
	deviceCode := deviceMsg.DeviceCode

	// Fetch the device configuration.
	request := &managev1.GetDeviceConfigRequest{DeviceCode: deviceCode}
	reply, err := ctx.D.M1.GetDeviceConfig(context.Background(), request)
	if err != nil {
		log.Errorf("[%s] get device conf err: %s", deviceCode, err.Error())
		return
	}
	if reply.Code != 200 {
		log.Warnf("[%s] not found device conf", deviceCode)
		return
	}

	// Save the device session only once the device is known to be configured.
	ctx.Sm.Add(deviceMsg.DeviceChannel)
	deviceMsg.DeviceConfig = reply.GetData()

	// Forward the raw device data downstream.
	metadata := &vulcanus.Metadata{
		DeviceCode: deviceMsg.DeviceCode,
		Timestamp:  date.NowMs(),
		Type:       "设备数据",
		Data:       deviceMsg.Metadata,
	}
	ctx.D.Kp.SendMetadata(utils.ToJsonString(metadata))

	// Handle the device command.
	if err = handler.Handle(ctx, deviceMsg); err != nil {
		log.Errorf("[%s] handle error: [%s]", deviceCode, err.Error())
	}
}
// Start creates the worker pool and connects the MQTT client to the broker.
//
// The pool (100 workers running taskFunc) is created before connecting so it
// exists by the time the first message is delivered. Topic subscriptions are
// issued by the on-connect handler installed in NewServer, not here.
//
// It returns an error if the pool cannot be created or the connection fails.
func (s *Server) Start(ctx context.Context) error {
	pool, err := ants.NewPoolWithFunc(100, taskFunc)
	if err != nil {
		return fmt.Errorf("create mqtt worker pool: %w", err)
	}
	p = pool

	if token := s.Client.Connect(); token.Wait() && token.Error() != nil {
		return token.Error()
	}
	return nil
}
// Stop disconnects the MQTT client and releases the worker pool. It is safe
// to call even if Start never created the pool. It always returns nil.
func (s *Server) Stop(ctx context.Context) error {
	log.Info("[MQTT] server stopping")
	// NOTE(review): the quiesce argument is in milliseconds per the paho
	// documentation, so this waits only 5 ms for in-flight work. Confirm
	// that this is intended.
	s.Client.Disconnect(5)
	if p != nil {
		p.Release()
	}
	return nil
}
// MqttReplyContent is the JSON envelope of every reply sent to a device: the
// actual reply body is nested under the "data" key.
type MqttReplyContent struct {
	Data interface{} `json:"data"`
}
// ReplyContent holds the common fields of a reply sent to a device.
type ReplyContent struct {
	Id      string `mapstructure:"id" json:"id"`           // device code
	Time    string `mapstructure:"time" json:"time"`       // reply time
	Type    int    `mapstructure:"type" json:"type"`       // message type
	SynInf  int64  `mapstructure:"synInf" json:"synInf"`   // synchronization code
	Err     int    `mapstructure:"err" json:"err"`         // 0: no error, 1: data error
	Connect int    `mapstructure:"connect" json:"connect"` // whether to keep communicating: 0 = end, 1 = continue
}
// NewReplyContent builds a ReplyContent for device id with every field given
// explicitly; Time is stamped with date.Now() at the moment of the call.
func NewReplyContent(id string, _type int, synInf int64, err int, connect int) *ReplyContent {
	return &ReplyContent{
		Id:      id,
		Time:    date.Now(),
		Type:    _type,
		SynInf:  synInf,
		Err:     err,
		Connect: connect,
	}
}
// NewReplyContentSimple builds a ReplyContent carrying only the device id, the
// current time and the message type. SynInf, Err and Connect are left at
// their zero value (0).
func NewReplyContentSimple(id string, _type int) *ReplyContent {
	return &ReplyContent{
		Id:   id,
		Time: date.Now(),
		Type: _type,
	}
}
// UpgradeReplyContent is the reply that announces an upgrade package to a
// device. It extends ReplyContent with the package description.
type UpgradeReplyContent struct {
	ReplyContent
	Update int    `json:"update"` // upgrade flag: 1 = upgrade task pending, 2 = upgrading, 3 = upgrade finished
	Path   string `json:"path"`   // location the package is downloaded from
	Pklen  int    `json:"pklen"`  // package length in bytes
	Crc    uint16 `json:"crc"`    // checksum of the package body
	Uid    uint64 `json:"uid"`    // id of the upgrade-device record
}
// SettingReply is the record sent downstream when a device answers a setting
// command: Id carries the device's synInf value and Reply the raw message.
type SettingReply struct {
	Id    int64  `json:"id"`
	Reply string `json:"reply"`
}
// CheckIsUpgradePatch asks the device-management service whether an upgrade
// task exists for deviceCode. It returns the upgrade patch when the service
// answers with code 200, and nil when the call fails (the error is logged) or
// any other code is returned.
func CheckIsUpgradePatch(ctx *transport.Context, deviceCode string) *managev1.UpgradePatch {
	request := &managev1.GetUpgradeDeviceRequest{DeviceCode: deviceCode}
	res, err := ctx.D.M1.GetDeviceUpgradeInfo(context.Background(), request)
	if err != nil {
		log.Errorf("[%s] get upgrade patch err: [%s]", deviceCode, err.Error())
		return nil
	}
	if res.Code != 200 {
		return nil
	}
	return res.Data
}
// CheckIsSetting asks the device-management service whether a configuration is
// waiting to be pushed to deviceCode. It returns the configuration decoded
// from JSON into a map when the service answers with code 200, and nil when
// the call fails (the error is logged) or any other code is returned.
func CheckIsSetting(ctx *transport.Context, deviceCode string) map[string]interface{} {
	request := &managev1.GetDeviceSettingRequest{DeviceCode: deviceCode}
	resp, err := ctx.D.M1.DeviceSetting(context.Background(), request)
	if err != nil {
		log.Errorf("[%s] get setting err: [%s]", deviceCode, err.Error())
		return nil
	}
	if resp.Code != 200 {
		return nil
	}
	return utils.JsonToMap(resp.Data)
}
// SendUpgradePatch announces an upgrade package to a device.
//
// The package is downloaded from upgradePatch.Path to compute its length and
// checksum, which are sent to the device together with the path and the
// upgrade-device id. Nothing is sent when the download fails or the server
// answers with a non-2xx status: otherwise the length and CRC of an error page
// would be advertised as firmware. Both cases are logged.
func SendUpgradePatch(ctx *transport.Context, msg *vulcanus.DeviceMsg, upgradePatch *managev1.UpgradePatch) {
	replyContent := NewReplyContent(upgradePatch.DeviceCode, 301, msg.SynInf, 0, 1)

	resp, err := resty.New().R().Get(upgradePatch.Path)
	if err != nil {
		log.Errorf("[%s] download upgrade patch [%s] err: [%s]", upgradePatch.DeviceCode, upgradePatch.Path, err.Error())
		return
	}
	if !resp.IsSuccess() {
		log.Errorf("[%s] download upgrade patch [%s] unexpected status: [%s]", upgradePatch.DeviceCode, upgradePatch.Path, resp.Status())
		return
	}

	body := resp.Body()
	content := &UpgradeReplyContent{
		ReplyContent: *replyContent,
		Update:       1,
		Path:         upgradePatch.Path,
		Pklen:        len(body),
		Crc:          crc.CheckSum(body),
		Uid:          upgradePatch.UpgradeDeviceId,
	}
	// The upgrade state is not updated here; ResolveUpgradeReply updates it
	// when the device reports back.
	ReplyMqttDevice(ctx, msg.DeviceConfig, true, content)
}
// SendSettingInfo sends a configuration packet to a device.
//
// settingInfo is expected to already contain the "type" and "synInf" fields.
// This function adds "id" (device code), "time" (current time) and "connect"
// (0) to the map in place, then publishes it as a reply that expects an answer.
func SendSettingInfo(ctx *transport.Context, deviceConfig *managev1.DeviceConfig, settingInfo map[string]interface{}) {
	header := NewReplyContent(deviceConfig.DeviceCode, 2, 0, 0, 0)
	settingInfo["id"], settingInfo["time"], settingInfo["connect"] = header.Id, header.Time, header.Connect
	ReplyMqttDevice(ctx, deviceConfig, true, settingInfo)
}
// ReplyMqttDevice publishes a reply event for a device connected over MQTT.
//
// c is wrapped in MqttReplyContent and serialized twice: as plain JSON and as
// GB2312-encoded JSON. The reply topic is the package-level pubTopic followed
// by the device code. Nothing is published when the session manager has no
// channel for the device.
func ReplyMqttDevice(ctx *transport.Context, deviceConfig *managev1.DeviceConfig, needReply bool, c interface{}) {
	deviceChannel := ctx.Sm.Get(deviceConfig.DeviceCode)
	if deviceChannel == nil {
		return
	}

	envelope := MqttReplyContent{Data: c}
	plain := utils.ToJsonString(envelope)
	encoded := utils.ToJsonStringGB2312(envelope)

	event.PublishMqttReplyEvent(vulcanus.MqttReplyEvent{
		Topic:          pubTopic + deviceConfig.DeviceCode,
		Content:        plain,
		EncodedContent: encoded,
		DeviceConfig:   deviceConfig,
		DeviceChannel:  deviceChannel,
		NeedReply:      needReply,
	})
}
// ResolveSetting handles a device's answer to a setting command: it pairs the
// "synInf" value from the device data with the raw message and sends the
// result downstream as a SettingReply.
func ResolveSetting(ctx *transport.Context, msg *vulcanus.DeviceMsg, deviceData *gjson.Result) {
	reply := &SettingReply{
		Id:    deviceData.Get("synInf").Int(),
		Reply: msg.Metadata,
	}
	ctx.D.Kp.SendDeviceSetting(utils.ToJsonString(reply))
}
// ResolveUpgradeReply handles a device's upgrade progress report and stores
// the new state of the upgrade-device record identified by "uid".
//
// The device's "update" value is mapped to the stored state as follows; any
// other value is ignored:
//
//	3 (upgrade finished) -> State "1"
//	1 (upgrading)        -> State "2"
//
// NOTE(review): the meaning given to update == 1 here ("upgrading") differs
// from the UpgradeReplyContent field comment (1 = task pending). Confirm.
func ResolveUpgradeReply(ctx *transport.Context, msg *vulcanus.DeviceMsg, deviceData *gjson.Result) {
	var state string
	switch deviceData.Get("update").Int() {
	case 3:
		state = "1"
	case 1:
		state = "2"
	default:
		return
	}

	uid := deviceData.Get("uid").Uint()
	request := &managev1.PutUpgradeDeviceRequest{Id: uid, State: state}
	if _, err := ctx.D.M1.PutDeviceUpgradeInfo(context.Background(), request); err != nil {
		log.Errorf("[%s] update upgrade device state err: [%s]", msg.DeviceCode, err.Error())
	}
}
// ResolveState extracts the device's state values and publishes them as a
// StateDataEvent.
//
// One value is produced per entry of deviceConfig.StateConfigList, in list
// order: the field named by FieldName is read from deviceData, and fields whose
// Dtype is "array" are flattened into a comma-separated string. Nothing is
// published when the list is empty.
//
// The original author left a note that this logic has a known problem and
// needs to be revised.
func ResolveState(deviceConfig *managev1.DeviceConfig, deviceData *gjson.Result, time string) {
	configs := deviceConfig.StateConfigList
	if len(configs) == 0 {
		return
	}

	stateData := make([]string, 0, len(configs))
	for _, cfg := range configs {
		field := deviceData.Get(cfg.FieldName)
		if cfg.Dtype != "array" {
			stateData = append(stateData, field.String())
			continue
		}
		items := field.Array()
		parts := make([]string, len(items))
		for i, item := range items {
			parts[i] = item.String()
		}
		stateData = append(stateData, strings.Join(parts, ","))
	}

	event.PublishStateDataEvent(vulcanus.StateDataEvent{
		Time:         time,
		Data:         stateData,
		DeviceConfig: deviceConfig,
	})
}
// ResolvePicData stores one chunk of a picture sent by a device and, once the
// last chunk has arrived, forwards the complete picture downstream.
//
// deviceData describes the chunk: "actim" identifies the picture, "curnum" is
// the chunk number, "pknum" the total number of chunks, "crc" the chunk
// checksum and "allcrc" the checksum of the whole picture. Chunks are appended
// to images/<deviceCode>-<actim>.jpg in arrival order, and received chunk
// numbers are tracked in a Redis hash that expires after four hours.
//
// It returns true when the chunk was accepted (or had already been received)
// and false when a checksum does not match or the chunk could not be stored.
func ResolvePicData(ctx *transport.Context, msg *vulcanus.DeviceMsg, deviceData *gjson.Result, picData []byte) bool {
	deviceCode := msg.DeviceCode
	actime := deviceData.Get("actim").String()
	chunkField := deviceData.Get("curnum").String()
	key := "DEVICE:IMAGE:RECEIVED:" + deviceCode + ":" + actime

	// A chunk that was already stored is acknowledged without being appended again.
	if ctx.D.Rdb.HExists(context.Background(), key, chunkField).Val() {
		return true
	}

	if crc.CheckSum(picData) != uint16(deviceData.Get("crc").Int()) {
		return false
	}

	// Both values come from the device and end up in a file name; refuse
	// anything that could escape the images directory.
	if strings.ContainsAny(deviceCode+actime, `/\`) {
		log.Errorf("[%s] invalid picture identifier: [%s]", deviceCode, actime)
		return false
	}
	filePath := "images/" + deviceCode + "-" + actime + ".jpg"
	if err := appendPicChunk(filePath, picData); err != nil {
		log.Errorf("[%s] [%s] append chunk err: [%s]", deviceCode, filePath, err.Error())
		return false
	}

	// Current chunk number and total chunk count.
	curnum := deviceData.Get("curnum").Int()
	pknum := deviceData.Get("pknum").Int()
	ctx.D.Rdb.HSetNX(context.Background(), key, chunkField, curnum)
	ctx.D.Rdb.Expire(context.Background(), key, 4*time.Hour)
	if curnum != pknum {
		return true
	}

	// Last chunk: the picture has been fully transferred.
	ctx.D.Rdb.Del(context.Background(), key)
	data, err := os.ReadFile(filePath)
	if err != nil {
		log.Errorf("[%s] read file data err: [%s]", deviceCode, err.Error())
		return false
	}

	complete := crc.CheckSum(data) == uint16(deviceData.Get("allcrc").Int())
	if complete {
		pictureData := &vulcanus.PictureData{
			DeviceCode: deviceCode,
			DeviceType: msg.DeviceType,
			Time:       actime,
			Data:       data,
		}
		ctx.D.Kp.SendPictureData(utils.ToJsonString(pictureData))
	}
	// The temporary file is removed whether or not the picture was valid.
	if err := os.Remove(filePath); err != nil {
		log.Errorf("[%s] [%s] remove err: [%s]", deviceCode, filePath, err.Error())
	}
	return complete
}

// appendPicChunk appends picData to the file at filePath, creating the file if
// needed, and reports any open, write, flush or close error. The file is
// closed before returning.
func appendPicChunk(filePath string, picData []byte) (err error) {
	file, err := os.OpenFile(filePath, os.O_CREATE|os.O_APPEND|os.O_RDWR, 0660)
	if err != nil {
		return err
	}
	defer func() {
		if cerr := file.Close(); cerr != nil && err == nil {
			err = cerr
		}
	}()

	w := bufio.NewWriter(file)
	if _, err = w.Write(picData); err != nil {
		return err
	}
	return w.Flush()
}
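// NOTE: content-moderation notice from the hosting page, captured together with
// this file; it is not part of the Go source. Translation: "This section may
// contain content unsuitable for display and is therefore not shown. You can
// review and edit it yourself. If you confirm the content involves no improper
// language, pure advertising, violence, vulgarity, infringement, piracy, false
// or worthless content, or anything that violates national laws and
// regulations, you may submit an appeal and it will be handled promptly."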