35 Star 367 Fork 113

联犀/物联网模块

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
克隆/下载
onlineCheck.go 3.18 KB
一键复制 编辑 原始数据 按行查看 历史
杨磊 提交于 2024-10-10 22:13 . feat: 更新mod
package onlineCheck
import (
"context"
"encoding/json"
"gitee.com/unitedrhino/share/caches"
"gitee.com/unitedrhino/share/clients"
"gitee.com/unitedrhino/share/def"
"gitee.com/unitedrhino/share/devices"
"gitee.com/unitedrhino/share/utils"
"gitee.com/unitedrhino/things/sdk/service/protocol"
"gitee.com/unitedrhino/things/service/dgsvr/internal/svc"
"gitee.com/unitedrhino/things/service/dmsvr/pb/dm"
"github.com/zeromicro/go-zero/core/logx"
"go.uber.org/atomic"
)
// CheckEvent carries the dependencies needed to run one online-state
// reconciliation pass (see Check).
type CheckEvent struct {
// svcCtx is the service context; Check uses its MqttClient, DeviceCache
// and DeviceM members.
svcCtx *svc.ServiceContext
// Logger is embedded so logging methods such as Infof can be called
// directly on a CheckEvent.
logx.Logger
// ctx is the context passed to every downstream call and used for logging.
ctx context.Context
}
// NewOnlineCheckEvent builds a CheckEvent bound to the given service context
// and request context. The embedded logger is derived from ctx.
func NewOnlineCheckEvent(svcCtx *svc.ServiceContext, ctx context.Context) *CheckEvent {
	ev := new(CheckEvent)
	ev.svcCtx = svcCtx
	ev.Logger = logx.WithContext(ctx)
	ev.ctx = ctx
	return ev
}
// isRun is a package-level flag that Check flips to true while it is running
// and resets to false when it returns, so that overlapping calls to Check in
// the same process return early instead of running concurrently.
var isRun atomic.Bool
// Check reconciles the stored online state of devices with the set of clients
// the MQTT broker currently reports as connected.
//
// It works in three steps:
//  1. Load the set of devices recorded as active (protocol.GetActivityDevices).
//  2. Page through MqttClient.GetOnlineClients; every connected client that
//     maps to a known device is removed from the active set, and queued to be
//     marked online if the device cache does not already say so.
//  3. Whatever remains in the active set is not connected on the broker; those
//     devices are queued to be marked offline and their activity record is
//     deleted.
//
// The queued fixes are then submitted in a single DeviceOnlineMultiFix call.
//
// Only one Check runs at a time per process: if another run is in progress
// this call logs and returns nil immediately.
//
// It returns an error if listing online clients fails or if submitting the
// fixes fails. A failure to load the active-device set is logged and treated
// as "no active devices" (step 3 becomes a no-op); it is not returned.
func (o *CheckEvent) Check() error {
	logger := logx.WithContext(o.ctx)
	logger.Infof("online_sync")
	if !isRun.CompareAndSwap(false, true) {
		logger.Infof("online_sync other run")
		return nil
	}
	defer isRun.Store(false)

	var (
		// total starts as a sentinel large enough to enter the loop once; it
		// is replaced by the broker-reported total after the first page.
		total int64 = 10000
		limit int64 = 500
		page  int64
	)

	devs, err := protocol.GetActivityDevices(o.ctx)
	if err != nil {
		// Best effort: continue with an empty set so the "mark online" half
		// of the reconciliation still runs.
		logger.Error(err)
		devs = map[devices.Core]struct{}{}
	}

	var needOnlineDevices []*dm.DeviceOnlineMultiFix
	for page*limit < total {
		page++
		infos, to, err := o.svcCtx.MqttClient.GetOnlineClients(o.ctx, clients.GetOnlineClientsFilter{}, &clients.PageInfo{
			Page: page,
			Size: limit,
		})
		if err != nil {
			logger.Error(err)
			return err
		}
		// Update total before logging so the log shows the value reported
		// for this page rather than the previous one (or the sentinel).
		total = to
		o.Infof("GetOnlineClients page:%v total:%v", page, total)

		for _, info := range infos {
			// Resolve the MQTT client ID to the device connection record.
			// Clients with no (or an undecodable) record are skipped.
			devStr, err := caches.GetStore().HgetCtx(o.ctx, protocol.DeviceMqttClientID, info.ClientID)
			if err != nil {
				continue
			}
			var dev devices.DevConn
			if err = json.Unmarshal([]byte(devStr), &dev); err != nil {
				continue
			}
			c := devices.Core{
				ProductID:  dev.ProductID,
				DeviceName: dev.DeviceName,
			}
			// The broker reports this device as connected, so it must never
			// be considered for the offline fix below — even if the device
			// cache lookup that follows fails transiently.
			delete(devs, c)

			di, err := o.svcCtx.DeviceCache.GetData(o.ctx, c)
			if err != nil {
				continue
			}
			if di.IsOnline != def.True {
				needOnlineDevices = append(needOnlineDevices, &dm.DeviceOnlineMultiFix{
					Device: &dm.DeviceCore{
						ProductID:  di.ProductID,
						DeviceName: di.DeviceName,
					},
					IsOnline:  def.True,
					ConnectAt: info.Timestamp,
				})
			}
		}
	}

	// Anything still left in devs was recorded as active but is not connected
	// on EMQ, so the device needs to be taken offline.
	if len(devs) > 0 {
		logger.Infof("fixOffLine %v", utils.Fmt(devs))
		for dev := range devs {
			di, err := o.svcCtx.DeviceCache.GetData(o.ctx, dev)
			// Devices of type DeviceTypeSubset are skipped here.
			// NOTE(review): presumably because sub-devices do not hold their
			// own broker connection — confirm.
			if err != nil || di.DeviceType == def.DeviceTypeSubset {
				continue
			}
			if di.IsOnline == def.True {
				needOnlineDevices = append(needOnlineDevices, &dm.DeviceOnlineMultiFix{
					Device: &dm.DeviceCore{
						ProductID:  di.ProductID,
						DeviceName: di.DeviceName,
					},
					IsOnline:  def.False,
					ConnectAt: 0,
				})
			}
			protocol.DeleteDeviceActivity(o.ctx, dev)
		}
	}

	logger.Infof("fixOnline %v", utils.Fmt(needOnlineDevices))
	if len(needOnlineDevices) == 0 {
		// Nothing to fix. Return nil explicitly so the already-handled
		// GetActivityDevices error does not leak out as the result.
		return nil
	}
	_, err = o.svcCtx.DeviceM.DeviceOnlineMultiFix(o.ctx, &dm.DeviceOnlineMultiFixReq{Devices: needOnlineDevices})
	return err
}
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
Go
1
https://gitee.com/unitedrhino/things.git
git@gitee.com:unitedrhino/things.git
unitedrhino
things
物联网模块
v1.1.0

搜索帮助

0d507c66 1850385 C8b1a773 1850385