代码拉取完成,页面将自动刷新
package main
import (
"context"
"encoding/json"
"fmt"
"strconv"
"time"
"github.com/dop251/goja"
amqp "github.com/rabbitmq/amqp091-go"
"go.mongodb.org/mongo-driver/bson"
"go.uber.org/zap"
)
func HandlerWaringDelay(messages <-chan amqp.Delivery) {
go func() {
for d := range messages {
HandlerWaringDelayStr(d)
err := d.Ack(false)
if err != nil {
zap.S().Errorf("消息确认异常:%+v", err)
}
}
}()
zap.S().Infof(" [*] Waiting for messages. To exit press CTRL+C")
}
func HandlerWaringDelayStr(d amqp.Delivery) bool {
var data []DataRowList
err := json.Unmarshal(d.Body, &data)
if err != nil {
zap.S().Infof("Failed to unmarshal message: %s body = %s", err, string(d.Body))
return false
}
for i := 0; i < len(data); i++ {
// 解引用指针并访问切片中的元素
row := (data)[i]
handlerWaringDelayOnce(row)
}
return true
}
// handlerWaringDelayOnce 函数用于处理handlerWaringDelayOnce数据
//
// 参数:
// - msg DataRowList: 包含反序列化后的消息的数据行列表
//
// 返回值:
//
// 无
func handlerWaringDelayOnce(msg DataRowList) {
zap.S().Infof("处理 handlerWaringDelayOnce 数据: %+v", msg)
uid := msg.DeviceUid
mapping := getDelayParam(uid, msg.IdentificationCode, msg.DataRows)
zap.S().Infof("getDelayParam 数据: %+v", mapping)
background := context.Background()
var scriptParam = make(map[string][]Tv)
for _, param := range mapping {
key := "signal_delay_warning:" + strconv.Itoa(param.DeviceUid) + ":" + param.IdentificationCode + ":" + strconv.Itoa(param.SignalId)
zap.S().Infof("key = %s", key)
members, _ := globalRedisClient.ZRevRangeWithScores(background, key, 0, -1).Result()
var vs []Tv
for _, member := range members {
float, _ := strconv.ParseFloat(member.Member.(string), 64)
vs = append(vs, Tv{Time: int64(member.Score), Value: float})
}
scriptParam[param.Name] = vs
}
script := getDelayScript(mapping)
zap.S().Infof("getDelayScript结果: %+v", script)
zap.S().Infof("脚本报警参数 = %+v", scriptParam)
db := GMongoClient.Database(globalConfig.MongoConfig.Db)
for _, waring := range script {
zap.S().Infof("key = %+v", waring)
delayScript := runWaringDelayScript(waring.Script, scriptParam)
zap.S().Infof("runWaringDelayScript 执行后数据: %+v", delayScript)
v := bson.M{
"device_uid": uid,
"param": scriptParam,
"script": waring.Script,
"value": delayScript,
"rule_id": waring.ID,
"insert_time": time.Now().Unix(),
"up_time": msg.Time,
}
name := CalcCollectionName(globalConfig.MongoConfig.ScriptWaringCollection, uint(waring.ID))
CheckCollectionAndCreate(globalConfig.MongoConfig.ScriptWaringCollection, name)
collection := db.Collection(name)
one, err := collection.InsertOne(context.Background(), v)
if err != nil {
zap.S().Errorf("插入数据异常: %+v", err)
} else {
zap.S().Infof("插入数据成功: %+v", one)
}
}
}
// runWaringDelayScript 函数执行传入的JavaScript脚本,并将传入的参数map[string][]Tv传递给该脚本执行
//
// 参数:
//
// - script string 要执行的JavaScript脚本
// - param map[string][]Tv 传递给JavaScript脚本的参数
//
// 返回值:
//
// - bool JavaScript脚本执行后返回的结果
func runWaringDelayScript(script string, param map[string][]Tv) bool {
vm := goja.New()
// 执行 JavaScript 脚本
_, err := vm.RunString(script)
if err != nil {
fmt.Println("JS代码有问题!")
return false // 直接返回 false 表示执行失败
}
// 将 JavaScript 中的 main 函数映射到 Go 的 fn 函数
var fn func(map[string][]Tv) bool
err = vm.ExportTo(vm.Get("main"), &fn)
if err != nil {
fmt.Println("Js函数映射到 Go 函数失败!")
return false // 直接返回 false 表示映射失败
}
// 使用 defer 和 recover 来捕获 fn 函数中的 panic
defer func() {
if r := recover(); r != nil {
fmt.Println("在执行 JavaScript 函数时发生 panic:", r)
// 这里可以根据需要进行错误处理,例如记录日志等
}
}()
// 调用映射的函数
a := fn(param)
return a
}
// getDelayScript 从Redis中获取SignalDelayWaring信息列表
//
// 参数:
//
// - mapping []SignalDelayWaringParam SignalDelayWaringParam类型的切片,用于从Redis中查询SignalDelayWaring信息
//
// 返回值:
//
// - []SignalDelayWaring SignalDelayWaring类型的切片,包含了从Redis中查询到的SignalDelayWaring信息
func getDelayScript(mapping []SignalDelayWaringParam) []SignalDelayWaring {
var res []SignalDelayWaring
for _, param := range mapping {
id := param.SignalDelayWaringId
val := globalRedisClient.HGet(context.Background(), "signal_delay_config", strconv.Itoa(id)).Val()
var singw SignalDelayWaring
err := json.Unmarshal([]byte(val), &singw)
if err != nil {
zap.S().Errorf("解析 json 异常 %+v", err)
}
res = append(res, singw)
}
// 使用map来存储已经出现过的ID
idMap := make(map[int]bool)
var uniqueRes []SignalDelayWaring
for _, item := range res {
if _, exists := idMap[item.ID]; !exists {
// 如果ID在map中不存在,则添加到结果数组中
uniqueRes = append(uniqueRes, item)
// 将ID添加到map中,标记为已存在
idMap[item.ID] = true
}
}
return uniqueRes
}
// getDelayParam 函数根据用户UID和DataRow切片从Redis中获取延迟报警参数
//
// 参数:
//
// - uid string 用户UID
// - rows []DataRow DataRow切片
//
// 返回值:
//
// - []SignalDelayWaringParam SignalDelayWaringParam切片,包含符合要求的延迟报警参数
func getDelayParam(uid string, code string, rows []DataRow) []SignalDelayWaringParam {
val := globalRedisClient.LRange(context.Background(), "delay_param", 0, -1).Val()
var mapping []SignalDelayWaringParam
for _, s := range val {
var param SignalDelayWaringParam
err := json.Unmarshal([]byte(s), ¶m)
if err != nil {
continue // 如果反序列化失败,跳过当前信号
}
if strconv.Itoa(param.DeviceUid) == uid && code == param.IdentificationCode && nameInDataRow(param.SignalName, rows) {
mapping = append(mapping, param)
}
}
return mapping
}
// nameInDataRow 函数用于判断给定的名称是否存在于DataRow切片中
//
// 参数:
//
// - name string 需要查找的名称
// - rows []DataRow DataRow切片
//
// 返回值:
//
// - bool 如果找到名称,则返回true;否则返回false
func nameInDataRow(name string, rows []DataRow) bool {
for _, row := range rows {
if row.Name == name {
return true
}
}
return false
}
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。