1 Star 0 Fork 0

张旭 / surguard-go

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
zookeeper.go 7.02 KB
一键复制 编辑 原始数据 按行查看 历史
张旭 提交于 2023-12-21 09:48 . add sm2 algorithm support
package device
import (
"encoding/hex"
"fmt"
"sort"
"strings"
"sync"
"time"
"github.com/go-zookeeper/zk"
)
type ZkDiscovery struct {
conn *zk.Conn
device *Device
children []string
childrenData map[string]string
root string
exitCh chan chan bool
sync.RWMutex
}
func (zkCli *ZkDiscovery) GetData(pk NoisePublicKey) (string, error) {
path := fmt.Sprintf("%s/%s", zkCli.root, hex.EncodeToString(pk[:]))
data, _, err := zkCli.conn.Get(path)
if err != nil {
zkCli.device.log.Errorf("ZooKeeper: failed to get data of path %s: %s", path, err)
return "", err
}
return string(data), nil
}
func (zkCli *ZkDiscovery) ExistPeer(pk NoisePublicKey) (bool, error) {
pathExist, _, err := zkCli.conn.Exists(fmt.Sprintf("%s/%s", zkCli.root, hex.EncodeToString(pk[:])))
if err != nil {
return false, err
}
return pathExist, nil
}
func (zkCli *ZkDiscovery) AddPeer(pk NoisePublicKey, endPoints string) error {
var flags int32 = zk.FlagEphemeral
// 获取访问控制权限
acls := zk.WorldACL(zk.PermAll)
path, err := zkCli.conn.Create(fmt.Sprintf("%s/%s", zkCli.root, hex.EncodeToString(pk[:])), []byte(endPoints), flags, acls)
if err != nil {
zkCli.device.log.Errorf("ZooKeeper: create peer connection failed: %s", err)
return err
}
zkCli.device.log.Verbosef("ZooKeeper: create peer connection with path: %s->%s", path, endPoints)
return nil
}
func (zkCli *ZkDiscovery) RemovePeer(pk NoisePublicKey) error {
_, sate, _ := zkCli.conn.Get(fmt.Sprintf("%s/%s", zkCli.root, hex.EncodeToString(pk[:])))
err := zkCli.conn.Delete(fmt.Sprintf("%s/%s", zkCli.root, hex.EncodeToString(pk[:])), sate.Version)
if err != nil {
zkCli.device.log.Errorf("ZooKeeper: remove peer connection failed: %s", err)
return err
}
return nil
}
func CreateZkDiscovery(urls string, device *Device, root string) (*ZkDiscovery, error) {
zkServers := strings.Split(urls, ",")
zkPath := root // 监控的ZooKeeper路径
// 连接到ZooKeeper服务器
conn, _, err := zk.Connect(zkServers, time.Second*5)
if err != nil {
return nil, err
}
zkCli := &ZkDiscovery{conn: conn, device: device, root: zkPath, childrenData: make(map[string]string), exitCh: make(chan chan bool)}
//创建/surguard路径
pathExist, _, err := zkCli.conn.Exists(zkCli.root)
if err != nil {
zkCli.device.log.Errorf("ZooKeeper: check existence of root path failed: %s", err)
return nil, err
}
if !pathExist {
var flags int32 = 0
// 获取访问控制权限
acls := zk.WorldACL(zk.PermAll)
_, err := zkCli.conn.Create(zkCli.root, []byte{}, flags, acls)
if err != nil {
pathExist, _, err2 := zkCli.conn.Exists(zkCli.root)
if err2 != nil {
return nil, err2
}
if !pathExist {
return nil, err
}
}
}
// 创建Watcher
paths, _, childCh, err := zkCli.conn.ChildrenW(zkCli.root)
if err != nil {
zkCli.device.log.Errorf("ZooKeeper: get and check existence of child path failed: %s", err)
return nil, err
}
// for i, _ := range paths {
// paths[i] = strings.Split(paths[i], "-")[1]
// paths[i] = paths[i][0:66]
// }
initialMap := make(map[string]string)
for _, path := range paths {
data, _, err := zkCli.conn.Get(fmt.Sprintf("%s/%s", zkCli.root, path))
if err != nil {
zkCli.device.log.Errorf("ZooKeeper: failed to get data of path %s: %s", path, err)
continue
}
pkb, err := hex.DecodeString(path)
if err != nil {
zkCli.device.log.Errorf("ZooKeeper: failed to decode path %s: %s", path, err)
continue
}
if device.staticIdentity.publicKey.Equals(NoisePublicKey(pkb)) {
continue
}
zkCli.device.AddPeer(NoisePublicKey(pkb), string(data))
zkCli.children = append(zkCli.children, path)
zkCli.childrenData[path] = string(data)
initialMap[path] = string(data)
}
zkCli.device.zkChangedCallback(initialMap, nil)
// 启动监控子节点变化的协程
go func() {
var ch chan bool
OUTER:
for {
select {
case ch = <-zkCli.exitCh:
break OUTER
case event := <-childCh:
//zkCli.device.log.Verbosef("Event type: %s", event.Type)
if event.Type == zk.EventNodeChildrenChanged {
// 获取更新后的子节点列表
newChildren, _, newChildCh, err := conn.ChildrenW(zkCli.root)
if err != nil {
zkCli.device.log.Errorf("ZooKeeper: error when watch children on path %s: %s", zkCli.root, err)
}
// for i, _ := range newChildren {
// newChildren[i] = strings.Split(newChildren[i], "-")[1]
// newChildren[i] = newChildren[i][0:66]
// }
sort.Strings(newChildren) // 按字典顺序排序
// 查找新增的节点
addedNodes := diff(newChildren, zkCli.children)
//zkCli.device.log.Verbosef("ZooKeeper: Added nodes: %s", strings.Join(addedNodes, ", "))
// 查找删除的节点
deletedNodes := diff(zkCli.children, newChildren)
//zkCli.device.log.Verbosef("ZooKeeper: Deleted nodes: %s", strings.Join(deletedNodes, ", "))
addedMap := make(map[string]string)
deletedMap := make(map[string]string)
for _, delPath := range deletedNodes {
delPK, err := hex.DecodeString(delPath)
if err != nil {
zkCli.device.log.Errorf("ZooKeeper: decode public key from delete path %s failed: %s", delPath, err)
continue
}
if device.staticIdentity.publicKey.Equals(NoisePublicKey(delPK)) {
zkCli.device.log.Errorf("ZooKeeper: delete current node: %s", delPath)
continue
}
zkCli.device.DeletePeer(NoisePublicKey(delPK))
delData := zkCli.childrenData[delPath]
deletedMap[delPath] = zkCli.childrenData[delPath]
delete(zkCli.childrenData, delPath)
zkCli.device.log.Verbosef("ZooKeeper: Deleted node: %s->%s", delPath, delData)
}
for _, addPath := range addedNodes {
addPK, err := hex.DecodeString(addPath)
if err != nil {
zkCli.device.log.Errorf("ZooKeeper: decode public key from added path %s failed: %s", addPath, err)
continue
}
if device.staticIdentity.publicKey.Equals(NoisePublicKey(addPK)) {
zkCli.device.log.Errorf("ZooKeeper: add current node: %s", addPath)
continue
}
data, _, err := zkCli.conn.Get(fmt.Sprintf("%s/%s", zkCli.root, addPath))
if err != nil {
zkCli.device.log.Errorf("ZooKeeper: failed to get endpoint of add path %s: %s", addPath, err)
continue
}
zkCli.device.AddPeer(NoisePublicKey(addPK), string(data))
zkCli.childrenData[addPath] = string(data)
addedMap[addPath] = string(data)
zkCli.device.log.Verbosef("ZooKeeper: Added node: %s->%s", addPath, zkCli.childrenData[addPath])
}
// 更新子节点列表和Watcher
zkCli.children = newChildren
childCh = newChildCh
zkCli.device.zkChangedCallback(addedMap, deletedMap)
}
}
}
ch <- true
}()
return zkCli, nil
}
func (zkCli *ZkDiscovery) Close() {
ch := make(chan bool)
zkCli.exitCh <- ch
<-ch
}
// diff 返回a和b之间的差异元素(即a有但b没有的元素)
func diff(a, b []string) []string {
var diff []string
for _, x := range a {
i := sort.SearchStrings(b, x)
if i == len(b) || b[i] != x {
diff = append(diff, x)
}
}
return diff
}
1
https://gitee.com/aurawing/surguard-go.git
git@gitee.com:aurawing/surguard-go.git
aurawing
surguard-go
surguard-go
v0.3.0

搜索帮助