代码拉取完成,页面将自动刷新
package kafka
import (
"context"
"encoding/binary"
"encoding/json"
"errors"
"fmt"
"gitee.com/info-superbahn-ict/superbahn/internal/supbsponge/models"
"gitee.com/info-superbahn-ict/superbahn/internal/supbsponge/service/cache_service"
"gitee.com/info-superbahn-ict/superbahn/internal/supbsponge/service/resource_service"
"gitee.com/info-superbahn-ict/superbahn/pkg/supbnervous"
kafkaNervous "gitee.com/info-superbahn-ict/superbahn/pkg/supbnervous/kafka-nervous"
"gitee.com/info-superbahn-ict/superbahn/pkg/supbsponge/gredis"
"gitee.com/info-superbahn-ict/superbahn/pkg/supbsponge/logging"
"gitee.com/info-superbahn-ict/superbahn/pkg/supbsponge/util"
"gitee.com/info-superbahn-ict/superbahn/sync/define"
"gitee.com/info-superbahn-ict/superbahn/sync/spongeregister"
"math/rand"
"reflect"
"strings"
"time"
)
// Controller names and RPC retry settings local to this package.
const (
// INIT_SPONGE_NERVOUS is the name SyncAgent passes to kafkaNervous.NewNervous
// when it creates agentNervous.
INIT_SPONGE_NERVOUS = "init_sponge_nervous"
// INIT_SPONGE_OCTOPUS is not referenced by the code visible here (ListenOctopus
// uses define.INIT_SPONGE_OCTOPUS instead).
// NOTE(review): confirm whether this constant is still needed.
INIT_SPONGE_OCTOPUS = "sponge"
// tryTime and tryInterval are forwarded to RPCCallCustom when a resource sync
// message is pushed.
// NOTE(review): presumably a retry count and a retry interval; confirm the
// meaning and unit against RPCCallCustom.
tryTime = 10
tryInterval = 500
)
// Package-level controllers and flags shared by the RPC handlers in this file.
var (
// clientBus is created by ListenOctopus and carries the resource lifecycle handlers.
clientBus supbnervous.Controller
// octopusBus is created by ListenOctopus and carries the query/control handlers.
octopusBus supbnervous.Controller
// agentNervous is created by SyncAgent and used for outgoing RPCCallCustom calls.
agentNervous supbnervous.Controller
// syncPushFlag is written by syncPush and read by updateStatus to decide
// whether status changes are pushed over RPC. It starts switched off.
syncPushFlag = define.SPONGE_SYNC_PUSH_OFF
)
// err is a package-level error variable assigned by ListenOctopus, Close,
// SyncAgent and listTagGroupInfos.
var err error
func ListenOctopus() {
fmt.Println("rpc start")
octopusBus, err = kafkaNervous.NewNervous(context.Background(), "../../config/nervous_config.json", define.INIT_SPONGE_OCTOPUS)
if err != nil {
fmt.Printf("spongeregister new error: %v\n", err)
logging.Fatal("kafkaNervous new err: %v", err)
}
octopusBus.RPCRegister(define.LIST_INFOS, getDeviceInfos)
octopusBus.RPCRegister(define.CHECK_PROPERTY, checkProperty)
octopusBus.RPCRegister(define.SPONGE_SYNC_PUSH, syncPush)
octopusBus.RPCRegister(define.LIST_ALIVE_AGENTS, listTagGroupInfos)
clientBus, err = kafkaNervous.NewNervous(context.Background(), "../../config/nervous_config.json", define.INIT_SPONGE_NERVOUS)
if err != nil {
fmt.Printf("spongeregister new error: %v\n", err)
logging.Fatal("kafkaNervous new err: %v", err)
}
clientBus.RPCRegister(define.REGISTER_DEVICE, registerDevice)
clientBus.RPCRegister(define.DELETE_RESOURCE, deleteResource)
clientBus.RPCRegister(define.DELETE_RESOURCES, deleteResources)
clientBus.RPCRegister(define.RECONNECT, reconnect)
clientBus.RPCRegister(define.RENEWAL_DEVICE, renewalDevice)
clientBus.RPCRegister(define.UPDATE_STATUS, updateStatus)
//clientBus.RPCRegister(define.LIST_ALIVE_AGENTS, listTagGroupInfos)
clientBus.RPCRegister(define.UPDATE_TAG, updateTag)
}
func Close() error {
err = clientBus.Close()
if err != nil {
return err
}
err = octopusBus.Close()
if err != nil {
return err
}
err = agentNervous.Close()
if err != nil {
return err
}
return nil
}
// SyncAgent creates agentNervous, the controller used for outgoing RPC calls.
// A creation failure is reported through logging.Fatal.
func SyncAgent() {
	const nervousConfig = "../../config/nervous_config.json"

	agentNervous, err = kafkaNervous.NewNervous(context.Background(), nervousConfig, INIT_SPONGE_NERVOUS)
	if err == nil {
		return
	}
	logging.Fatal("kafkaNervous new err: %v", err)
}
// syncPush is the RPC handler that switches pushing of status changes on or
// off.
//
// args[0] must be a string equal to define.SPONGE_SYNC_PUSH_ON or
// define.SPONGE_SYNC_PUSH_OFF; the value is stored in syncPushFlag. Any other
// value, including a non-string argument, yields an error instead of a panic.
// The returned value is always the empty string.
func syncPush(args ...interface{}) (interface{}, error) {
	if len(args) < 1 {
		return "", fmt.Errorf("args is nil")
	}
	fmt.Printf("syncPush Start\n")

	// Checked assertion: a malformed request must not crash the handler.
	flag, ok := args[0].(string)
	if !ok {
		return "", fmt.Errorf("the param is error: %v", args[0])
	}
	fmt.Println(flag)
	fmt.Printf("the flag is: %s\n", flag)

	switch flag {
	case define.SPONGE_SYNC_PUSH_ON, define.SPONGE_SYNC_PUSH_OFF:
		syncPushFlag = flag
		return "", nil
	default:
		return "", fmt.Errorf("the param is error: %v", flag)
	}
}
// checkProperty is the RPC handler that looks up the resources for a list of
// GUIDs and returns those whose IsMonitor field equals 1.
//
// The original asserted args[0] as both string and []string, so one of the two
// assertions always panicked. The argument is now accepted either as a
// []string or as a string holding a JSON array of GUIDs; anything else yields
// an error.
//
// Returns a []*models.Resource (nil when nothing matched). A failed lookup
// yields the error "ERROR_EXIST_RESOURCE_FAIL".
func checkProperty(args ...interface{}) (interface{}, error) {
	if len(args) < 1 {
		return "", fmt.Errorf("args is nil")
	}
	fmt.Printf("checkProperty Start\n")
	fmt.Println(args[0])

	var guIds []string
	switch v := args[0].(type) {
	case []string:
		guIds = v
	case string:
		if err := json.Unmarshal([]byte(v), &guIds); err != nil {
			return nil, fmt.Errorf("checkProperty: decoding GUID list: %w", err)
		}
	default:
		return nil, fmt.Errorf("checkProperty: unsupported argument type %T", args[0])
	}

	var resources []*models.Resource
	for _, guId := range guIds {
		res := &resource_service.Resource{
			GuId: guId,
		}
		resource, err := res.CheckProperty()
		if err != nil {
			return nil, errors.New("ERROR_EXIST_RESOURCE_FAIL")
		}
		// A nil result is skipped rather than dereferenced.
		if resource != nil && resource.IsMonitor == 1 {
			resources = append(resources, resource)
		}
	}
	return resources, nil
}
// getDeviceInfos is the RPC handler that lists device resources.
//
// It queries with Status, OType, IsControl and IsMonitor all set to -1 and
// returns the result as a JSON-encoded spongeregister.ListResources keyed by
// GUID. args[0] is only logged; it is printed without a type assertion so a
// non-string argument no longer panics.
func getDeviceInfos(args ...interface{}) (interface{}, error) {
	if len(args) < 1 {
		return "", fmt.Errorf("args is nil")
	}
	resourceService := &resource_service.Resource{
		Status:    -1,
		OType:     -1,
		IsControl: -1,
		IsMonitor: -1,
	}
	fmt.Printf("getDeviceInfos Start\n")
	fmt.Println(args[0])

	infos, err := resourceService.GetDeviceInfos()
	if err != nil {
		return nil, err
	}

	listInfos := spongeregister.ListResources{
		Resources: make(map[string]spongeregister.Resource, len(infos)),
	}
	for _, resource := range infos {
		listInfos.Resources[resource.GuId] = spongeregister.Resource{
			GuId:        resource.GuId,
			Description: resource.Description,
			Status:      resource.Status,
			OType:       resource.OType,
			IsControl:   resource.IsControl,
			IsMonitor:   resource.IsMonitor,
			// AutoRemove is intentionally not copied.
			Area:    resource.Area,
			PreNode: resource.PreNode,
			SubNode: resource.SubNode,
		}
	}

	bts, err := json.Marshal(listInfos)
	if err != nil {
		return nil, err
	}
	return string(bts), nil
}
// registerDevice is the RPC handler for resource registration.
//
// args[0] must be a JSON string describing the resource. The GUID is generated
// from the first six characters of the MD5 of the description. If a resource
// with that GUID already exists, its GUID is returned unchanged. Otherwise the
// resource is added and, when it names a parent (PreNode) that exists, the new
// GUID is appended to the parent's "|"-separated SubNode list.
//
// Returns the GUID on success. Failures are reported as
// "ERROR_EXIST_RESOURCE_FAIL" (existence check) or "ERROR_ADD_RESOURCE_FAIL"
// (insert or parent update); a non-string argument yields an error instead of
// a panic.
func registerDevice(args ...interface{}) (interface{}, error) {
	if len(args) < 1 {
		return "", fmt.Errorf("args is nil")
	}
	fmt.Printf("start spongeregister device\n")
	payload, ok := args[0].(string)
	if !ok {
		return "", fmt.Errorf("registerDevice: expected a JSON string argument, got %T", args[0])
	}
	fmt.Println(payload)
	resourceService := convertToResource(payload)

	md5Str := (util.EncodeMD5(resourceService.Description))[0:6]
	// One timestamp for both fields so they can never differ.
	now := time.Now().Format("2006-01-02 15:04:05")

	// TODO: GUID generation was meant to be limited to OType == 1; it currently
	// applies to every resource.
	guId := util.GenerateDeviceInfo(util.DeviceInfo{
		ManufacturerId: "01",
		ProductId:      md5Str,
		ResourceType:   0,
		ResourceId:     0,
	})

	resourceService.GuId = guId
	resourceService.CreateTime = now
	resourceService.UpdateTime = now
	// NOTE(review): utilization is a random value in [0, 100); presumably a placeholder.
	resourceService.Utilization = rand.Intn(100)

	exists, err := resourceService.ExistByGuId()
	if err != nil {
		fmt.Printf("error: %v\n", err)
		return nil, errors.New("ERROR_EXIST_RESOURCE_FAIL")
	}
	if exists {
		fmt.Printf("exist guid: %v\n", guId)
		return guId, nil
	}
	if err := resourceService.Add(); err != nil {
		fmt.Printf("error: %v\n", err)
		return nil, errors.New("ERROR_ADD_RESOURCE_FAIL")
	}

	// If a parent node is set, the parent's sub-node list must be updated.
	if resourceService.PreNode != "" {
		parent := resource_service.Resource{
			GuId: resourceService.PreNode,
		}
		// First check that the parent exists; a failed check is reported
		// rather than silently treated as "no parent".
		parentExists, err := parent.ExistByGuId()
		if err != nil {
			fmt.Printf("error: %v\n", err)
			return nil, errors.New("ERROR_ADD_RESOURCE_FAIL")
		}
		if parentExists {
			// Load the parent's current sub-node list.
			parentInfo, err := parent.GetDeviceInfo()
			if err != nil || parentInfo == nil {
				return nil, errors.New("ERROR_ADD_RESOURCE_FAIL")
			}
			var subNodes []string
			if parentInfo.SubNode != "" {
				subNodes = strings.Split(parentInfo.SubNode, "|")
			}
			// Do not list the same child twice.
			alreadyLinked := false
			for _, node := range subNodes {
				if node == guId {
					alreadyLinked = true
					break
				}
			}
			if !alreadyLinked {
				subNodes = append(subNodes, guId)
				update := resource_service.Resource{
					GuId:    resourceService.PreNode,
					SubNode: strings.Join(subNodes, "|"),
				}
				if err := update.UpdateSubNode(); err != nil {
					return nil, errors.New("ERROR_ADD_RESOURCE_FAIL")
				}
			}
		}
	}

	fmt.Printf("new guid: %v\n", guId)
	return guId, nil
}
// deleteResource is the RPC handler that removes a single resource.
//
// args[0] must be a JSON string describing the resource. The resource's
// "remove" tag decides what happens: when its value is "auto" or empty the
// resource is deleted, otherwise its status is set to
// spongeregister.StatusTerminated. A non-string argument yields an error
// instead of a panic.
func deleteResource(args ...interface{}) (interface{}, error) {
	if len(args) < 1 {
		return "", fmt.Errorf("args is nil")
	}
	fmt.Printf("deleteResource Start\n")
	payload, ok := args[0].(string)
	if !ok {
		return "", fmt.Errorf("deleteResource: expected a JSON string argument, got %T", args[0])
	}
	fmt.Println(payload)
	resourceService := convertToResource(payload)

	tag, err := resourceService.GetResourceTagByKey("remove")
	if err != nil {
		// The lookup failure is logged only; processing continues with the
		// tag value that was returned.
		// NOTE(review): confirm that deleting after a failed lookup is intended.
		fmt.Printf("Guid:%s, GetResourceTagByKey error: %v\n", resourceService.GuId, err)
		logging.Info("Guid:%s, GetResourceTagByKey error: %v\n", resourceService.GuId, err)
	}
	if tag.TagValue == "auto" || tag.TagValue == "" {
		return "", resourceService.DeleteResource()
	}

	resourceService.Status = spongeregister.StatusTerminated
	if err := resourceService.UpdateStatus(); err != nil {
		fmt.Printf("resourceService.UpdateStatus error: %v\n", err)
		return "", err
	}
	return "", nil
}
// deleteResources is the RPC handler that decodes args[0] (a JSON string
// describing a resource) and calls DeleteResources on it. A non-string
// argument yields an error instead of a panic.
func deleteResources(args ...interface{}) (interface{}, error) {
	if len(args) < 1 {
		return "", fmt.Errorf("args is nil")
	}
	fmt.Printf("deleteResources Start\n")
	payload, ok := args[0].(string)
	if !ok {
		return "", fmt.Errorf("deleteResources: expected a JSON string argument, got %T", args[0])
	}
	fmt.Println(payload)
	resourceService := convertToResource(payload)
	return "", resourceService.DeleteResources()
}
// reconnect is the RPC handler that decodes args[0] (a JSON string describing
// a resource), sets its status to 1 and persists it with UpdateStatus. A
// non-string argument yields an error instead of a panic.
func reconnect(args ...interface{}) (interface{}, error) {
	if len(args) < 1 {
		return "", fmt.Errorf("args is nil")
	}
	fmt.Printf("reconnect Start\n")
	payload, ok := args[0].(string)
	if !ok {
		return "", fmt.Errorf("reconnect: expected a JSON string argument, got %T", args[0])
	}
	fmt.Println(payload)
	resourceService := convertToResource(payload)
	// NOTE(review): 1 is a raw status value; confirm which spongeregister
	// status constant it corresponds to.
	resourceService.Status = 1
	return "", resourceService.UpdateStatus()
}
// renewalDevice is the RPC handler that decodes args[0] (a JSON string
// describing a resource) and hands it to updateRedis. A non-string argument
// yields an error instead of a panic.
func renewalDevice(args ...interface{}) (interface{}, error) {
	if len(args) < 1 {
		return "", fmt.Errorf("args is nil")
	}
	fmt.Printf("renewalDevice Start\n")
	payload, ok := args[0].(string)
	if !ok {
		return "", fmt.Errorf("renewalDevice: expected a JSON string argument, got %T", args[0])
	}
	fmt.Println(payload)
	resourceService := convertToResource(payload)
	return "", updateRedis(resourceService)
}
// isGuid reports whether guid is a non-empty string made up only of the
// characters 0-9 and a-f. Upper-case hex digits are rejected. The empty string
// is rejected too, so an empty key is never treated as a valid GUID.
func isGuid(guid string) bool {
	if guid == "" {
		return false
	}
	for _, v := range guid {
		isDigit := '0' <= v && v <= '9'
		isLowerHex := 'a' <= v && v <= 'f'
		if !isDigit && !isLowerHex {
			return false
		}
	}
	return true
}
// updateStatus is the RPC handler that applies a batch of status and metric
// updates.
//
// args[0] must be a JSON string holding a spongeregister.SyncInfo. For every
// entry in Guids that passes isGuid:
//
//   - If a cached sync info exists in redis and equals the incoming one, only
//     the cache entry is refreshed. If it differs, or cannot be read, the
//     cache is refreshed and status and metric info are written to the
//     database.
//   - If no cached entry exists, the cache is filled and the metric info is
//     updated or inserted depending on CheckMetricExist.
//   - Finally, if the stored resource's status differs from the incoming one,
//     the status is updated and, when syncPushFlag is on, a resource sync
//     message is sent through agentNervous.
//
// Returns "" on success. Database errors abort the batch and are returned;
// cache and push failures are only logged.
func updateStatus(args ...interface{}) (interface{}, error) {
	if len(args) < 1 {
		return "", fmt.Errorf("args is nil")
	}
	fmt.Printf("updateStatus Start\n")
	payload, ok := args[0].(string)
	if !ok {
		return "", fmt.Errorf("updateStatus: expected a JSON string argument, got %T", args[0])
	}
	fmt.Println(payload)
	syncInfo := convertToSyncInfo(payload)
	if syncInfo == nil {
		return "", fmt.Errorf("updateStatus: empty sync info")
	}

	for guid, status := range syncInfo.Guids {
		fmt.Printf("Guid: %s, status: %d\n", guid, status)
		if !isGuid(guid) {
			continue
		}
		res := &resource_service.Resource{
			GuId:     guid,
			Status:   status,
			CpuSet:   syncInfo.Metrics[guid].CpuSet,
			CpuRatio: syncInfo.Metrics[guid].CpuRatio,
		}
		cache := cache_service.Resource{
			GuId:   guid,
			Status: status,
		}
		key := cache.GetSyncInfoKey()

		if gredis.Exists(key) {
			// The key exists in redis. Decode into a fresh value on every
			// iteration so data from a previous GUID cannot leak into the
			// comparison.
			var cached spongeregister.SyncInfo
			unchanged := false
			data, err := gredis.Get(key)
			if err != nil {
				logging.Error(err)
			} else if err := json.Unmarshal(data, &cached); err != nil {
				logging.Error(err)
			} else {
				// Compare value with value: comparing the pointer with the
				// value is always false under reflect.DeepEqual.
				unchanged = reflect.DeepEqual(*syncInfo, cached)
			}

			// The cache entry is refreshed in both cases.
			if err := gredis.Set(key, syncInfo, 3600); err != nil {
				logging.Error(err)
			}
			if !unchanged {
				// Values differ: update the database as well.
				if err := res.UpdateStatus(); err != nil {
					fmt.Printf("Update status error: %v\n", err)
					return nil, err
				}
				if err := res.UpdateMetricInfo(); err != nil {
					fmt.Printf("UpdateMetricInfo failed: %v\n", err)
					return nil, err
				}
			}
		} else {
			// The key does not exist in redis.
			if err := gredis.Set(key, syncInfo, 3600); err != nil {
				logging.Error(err)
			}
			exists, err := res.CheckMetricExist()
			if err != nil {
				fmt.Printf("error: %v\n", err)
				return nil, errors.New("ERROR_EXIST_SYNCINFO_FAIL")
			}
			if exists {
				// metric_info already has this GUID: update it.
				if err := res.UpdateMetricInfo(); err != nil {
					fmt.Printf("UpdateMetricInfo failed: %v\n", err)
					return nil, err
				}
			} else {
				// metric_info has no row for this GUID: insert one.
				if err := res.InsertMetricInfo(); err != nil {
					fmt.Printf("InsertMetricInfo failed: %v\n", err)
					return nil, err
				}
			}
		}

		info, err := res.GetResourceInfo()
		if err != nil {
			fmt.Printf("GetDeviceInfo failed error: %v\n", err)
			return "", err
		}
		if info == nil {
			// Nothing stored for this GUID; there is no status to reconcile.
			continue
		}
		fmt.Printf("Info SlaveStatus %v\n", info.Status)
		if info.Status == status || info.GuId == "" {
			continue
		}

		info.Status = status
		if err := res.UpdateStatus(); err != nil {
			fmt.Printf("Guid:%s, UpdateStatus error: %v\n", info.GuId, err)
			return nil, err
		}

		if syncPushFlag != define.SPONGE_SYNC_PUSH_ON {
			continue
		}
		if agentNervous == nil {
			// SyncAgent has not been called; there is nothing to push through.
			fmt.Printf("rpc call error:%v\n", "agentNervous is not initialised")
			continue
		}
		bts, err := json.Marshal(convertToResourceSync(info))
		if err != nil {
			// Do not push an empty payload when encoding failed.
			fmt.Printf("json.Marshal error: %v\n", err)
			continue
		}
		if _, err := agentNervous.RPCCallCustom(define.RPCCommonGuidOfManager, tryTime, tryInterval, define.RPCFunctionNameOfManagerReceiveResourceSyncMessageFromSponge, string(bts)); err != nil {
			fmt.Printf("rpc call error:%v\n", err)
		}
	}
	return "", nil
}
// listTagGroupInfos is the RPC handler that lists alive agents: resources with
// OType 1 and Status 2 that carry the tag system=agent, together with their
// tags. The result is returned as a JSON-encoded spongeregister.ListResources.
//
// A cached result in redis is used when it can be read and decoded; otherwise
// the list is rebuilt from the database and cached for 120 seconds. A failed
// database query is returned as an error and is not cached. If the tags of
// some resource cannot be loaded, the failure is logged, that resource is
// returned without tags and the result is not cached.
//
// args is currently unused; a tag-group search filter (see
// convertToTagGroupSearchInfo) is not applied.
func listTagGroupInfos(args ...interface{}) (interface{}, error) {
	fmt.Printf("listTagGroupInfos Start\n")

	res := &resource_service.Resource{
		OType:  1,
		Status: 2,
	}
	tagKey := "system"
	tagValue := "agent"

	newResponse := func() *spongeregister.ListResources {
		return &spongeregister.ListResources{
			Resources: make(map[string]spongeregister.Resource),
		}
	}
	resp := newResponse()

	cache := cache_service.Resource{
		OType:    res.OType,
		Status:   res.Status,
		TagKey:   tagKey,
		TagValue: tagValue,
	}
	key := cache.GetAliveAgentKey()

	cacheHit := false
	if gredis.Exists(key) {
		data, err := gredis.Get(key)
		if err == nil {
			err = json.Unmarshal(data, resp)
		}
		if err != nil {
			// Unreadable cache entry: fall back to the database with a clean
			// response instead of returning an empty or partial list.
			logging.Error(err)
			resp = newResponse()
		} else {
			cacheHit = true
		}
	}

	if !cacheHit {
		resInfor, err := res.GetTagGroupInfos(tagKey, tagValue)
		if err != nil {
			return nil, err
		}
		complete := true
		for _, infor := range resInfor {
			temp := &resource_service.Resource{
				GuId: infor.GuId,
			}
			tagInfos, err := temp.GetTagInfosByGuId()
			if err != nil {
				logging.Error(err)
				complete = false
			}
			tags := make([]spongeregister.Tag, 0, len(tagInfos))
			for _, tag := range tagInfos {
				tags = append(tags, spongeregister.Tag{
					Key:   tag.TagKey,
					Type:  tag.TagType,
					Value: tag.TagValue,
				})
			}
			resp.Resources[infor.GuId] = spongeregister.Resource{
				GuId:        infor.GuId,
				Description: infor.Description,
				Status:      infor.Status,
				Area:        infor.Area,
				PreNode:     infor.PreNode,
				SubNode:     infor.SubNode,
				OType:       infor.OType,
				IsControl:   infor.IsControl,
				IsMonitor:   infor.IsMonitor,
				Tags:        tags,
			}
		}
		// Only cache results that were built without any lookup failure.
		if complete {
			if err := gredis.Set(key, resp, 120); err != nil {
				logging.Error(err)
			}
		}
	}

	bts, err := json.Marshal(resp)
	if err != nil {
		return nil, err
	}
	fmt.Printf("list agent alive: %s\n", string(bts))
	return string(bts), nil
}
// updateTag is the RPC handler that creates or updates one tag of a resource.
//
// args[0] must be a JSON string holding a spongeregister.SyncTagInfo (GuId and
// Tag). If the resource already has a tag with the same key, that tag is
// updated; otherwise the tag is created exactly once. The original loop
// created the tag once for every existing tag whose key differed.
//
// Returns "" on success. A missing or non-string argument and any lookup or
// write failure are returned as errors.
func updateTag(args ...interface{}) (interface{}, error) {
	if len(args) < 1 {
		return "", fmt.Errorf("args is nil")
	}
	fmt.Printf("updateTag Start\n")
	payload, ok := args[0].(string)
	if !ok {
		return "", fmt.Errorf("updateTag: expected a JSON string argument, got %T", args[0])
	}
	fmt.Println(payload)

	syncTagInfo := convertToSyncTagInfo(payload)
	tag := syncTagInfo.Tag
	res := &resource_service.Resource{
		GuId: syncTagInfo.GuId,
	}

	// Load the tags currently stored for this GUID.
	tags, err := res.GetTagInfosByGuId()
	if err != nil {
		return "", err
	}
	// The resource record is needed by both the create and the update path, so
	// a failed lookup is reported before either is attempted.
	resource, err := res.GetDeviceInfo()
	if err != nil {
		return "", err
	}

	keyExists := false
	for _, existing := range tags {
		if existing.TagKey == tag.Key {
			keyExists = true
			break
		}
	}

	if keyExists {
		// The key is already present: update its value.
		if err := res.UpdateTagInfo(tag, resource); err != nil {
			return "", err
		}
		return "", nil
	}
	// The key is not present yet: insert it.
	if err := res.CreateTagInfo(tag, resource); err != nil {
		return "", err
	}
	return "", nil
}
// convertToResourceSync builds the sync message for a resource, copying its
// descriptive fields and deriving the operation from its status:
// StatusReady -> OpADD, StatusPause -> OpUPD, StatusTerminated -> OpDEL.
// Every other status, including StatusPending and StatusError, leaves Op empty.
//
// NOTE(review): Go switch cases do not fall through, so StatusPending and
// StatusError produce an empty Op. Confirm this is intended rather than a
// C-style fall-through to the following case.
func convertToResourceSync(resource *models.Resource) *spongeregister.ResourceSync {
	msg := &spongeregister.ResourceSync{
		GuId:        resource.GuId,
		Description: resource.Description,
		Status:      resource.Status,
		OType:       resource.OType,
		IsControl:   resource.IsControl,
		IsMonitor:   resource.IsMonitor,
		// AutoRemove is intentionally not copied.
		Area:    resource.Area,
		PreNode: resource.PreNode,
		SubNode: resource.SubNode,
	}

	switch resource.Status {
	case spongeregister.StatusPending:
		// No operation for pending resources.
	case spongeregister.StatusReady:
		msg.Op = spongeregister.OpADD
	case spongeregister.StatusPause:
		msg.Op = spongeregister.OpUPD
	case spongeregister.StatusError:
		// No operation for resources in error.
	case spongeregister.StatusTerminated:
		msg.Op = spongeregister.OpDEL
	}
	return msg
}
// convertToSyncInfo decodes args, which must be a JSON string, into a
// spongeregister.SyncInfo. The result is never nil. A decode failure is
// logged and whatever was decoded up to that point is returned.
func convertToSyncInfo(args interface{}) *spongeregister.SyncInfo {
	data := []byte(args.(string))
	info := &spongeregister.SyncInfo{}
	if err := json.Unmarshal(data, info); err != nil {
		fmt.Printf("convertToSyncInfo: json.Unmarshal error: %v\n", err)
	}
	return info
}
// convertToSyncTagInfo decodes args, which must be a JSON string, into a
// spongeregister.SyncTagInfo. The result is never nil. A decode failure is
// logged and whatever was decoded up to that point is returned.
func convertToSyncTagInfo(args interface{}) *spongeregister.SyncTagInfo {
	data := []byte(args.(string))
	info := &spongeregister.SyncTagInfo{}
	if err := json.Unmarshal(data, info); err != nil {
		fmt.Printf("convertToSyncTagInfo: json.Unmarshal error: %v\n", err)
	}
	return info
}
// convertToTagGroupSearchInfo decodes args, which must be a JSON string, into
// a spongeregister.TagGroupSearchInfo. The result is never nil. A decode
// failure is logged and whatever was decoded up to that point is returned.
func convertToTagGroupSearchInfo(args interface{}) *spongeregister.TagGroupSearchInfo {
	data := []byte(args.(string))
	info := &spongeregister.TagGroupSearchInfo{}
	if err := json.Unmarshal(data, info); err != nil {
		fmt.Printf("convertToTagGroupSearchInfo: json.Unmarshal error: %v\n", err)
	}
	return info
}
// convertToResource decodes args, which must be a JSON string holding a
// spongeregister.Resource, into the service-layer resource_service.Resource.
//
// The result is never nil. A decode failure is logged and the fields decoded
// so far are used. Tag values that are not strings are converted with
// fmt.Sprint instead of panicking; a missing (nil) tag value becomes the empty
// string.
func convertToResource(args interface{}) *resource_service.Resource {
	data := []byte(args.(string))
	resource := &spongeregister.Resource{}
	if err := json.Unmarshal(data, resource); err != nil {
		fmt.Printf("convertToResource: json.Unmarshal error: %v\n", err)
	}

	tags := make([]resource_service.Tag, 0, len(resource.Tags))
	for _, tempTag := range resource.Tags {
		var tag resource_service.Tag
		tag.Key = tempTag.Key
		tag.Type = tempTag.Type
		switch v := tempTag.Value.(type) {
		case string:
			tag.Value = v
		case nil:
			// Leave the value empty.
		default:
			tag.Value = fmt.Sprint(v)
		}
		tags = append(tags, tag)
	}

	return &resource_service.Resource{
		GuId:        resource.GuId,
		Description: resource.Description,
		Status:      resource.Status,
		OType:       resource.OType,
		IsControl:   resource.IsControl,
		IsMonitor:   resource.IsMonitor,
		// AutoRemove is intentionally not copied.
		Area:    resource.Area,
		PreNode: resource.PreNode,
		SubNode: resource.SubNode,
		Tags:    tags,
	}
}
// updateRedis refreshes the cached status of a resource (redis key = GUID,
// expiry argument 3600) and writes the status to the database when it differs
// from the cached one or when no usable cached value exists.
//
// A failed cache read or an undecodable cached value is treated as a cache
// miss instead of panicking. Returns the error from UpdateStatus, or nil when
// the cached status already matches.
func updateRedis(resourceService *resource_service.Resource) error {
	guId := resourceService.GuId
	status := resourceService.Status
	res := &resource_service.Resource{
		GuId:   guId,
		Status: status,
	}

	cached, known := 0, false
	if gredis.Exists(guId) {
		raw, err := gredis.Get(guId)
		if err != nil {
			fmt.Printf("gredis.Get error: %v\n", err)
		} else {
			cached, known = decodeCachedStatus(raw)
		}
	}

	// The cache entry is rewritten on every call, which also renews its expiry.
	if err := gredis.Set(guId, status, 3600); err != nil {
		logging.Error(err)
	}

	if known && cached == status {
		return nil
	}
	return res.UpdateStatus()
}

// decodeCachedStatus extracts an integer status from a cached redis payload.
// Other handlers in this file read gredis values with json.Unmarshal, so JSON
// is tried first; the original 8-byte big-endian decoding is kept as a
// fallback. The boolean is false when neither decoding applies.
// NOTE(review): confirm the encoding gredis.Set actually uses.
func decodeCachedStatus(raw []byte) (int, bool) {
	var status int
	if err := json.Unmarshal(raw, &status); err == nil {
		return status, true
	}
	if len(raw) >= 8 {
		return int(binary.BigEndian.Uint64(raw)), true
	}
	return 0, false
}
// syncStatus loads the resource identified by guId and pushes a resource sync
// message for it through agentNervous.
//
// Every failure (lookup, missing resource, encoding, uninitialised controller,
// RPC call) is logged and ends the attempt; nothing is returned to the caller.
// The function no longer continues with a nil resource or an empty payload.
func syncStatus(guId string, status int) {
	res := &resource_service.Resource{
		GuId:   guId,
		Status: status,
	}
	info, err := res.GetDeviceInfo()
	if err != nil {
		fmt.Printf("get deviceInfo error: %v\n", err)
		return
	}
	if info == nil {
		fmt.Printf("get deviceInfo error: %v\n", "no resource found for guid "+guId)
		return
	}

	bts, err := json.Marshal(convertToResourceSync(info))
	if err != nil {
		fmt.Printf("json.Marshal error: %v\n", err)
		return
	}

	if agentNervous == nil {
		// SyncAgent has not been called; there is nothing to push through.
		fmt.Printf("rpc call error:%v\n", "agentNervous is not initialised")
		return
	}
	if _, err := agentNervous.RPCCallCustom(define.RPCCommonGuidOfManager, tryTime, tryInterval, define.RPCFunctionNameOfManagerReceiveResourceSyncMessageFromSponge, string(bts)); err != nil {
		fmt.Printf("rpc call error:%v\n", err)
	}
}
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。