4 Star 5 Fork 4

Plato/Service-Box-go

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
克隆/下载
zk_infra.go 11.99 KB
一键复制 编辑 原始数据 按行查看 历史
package zookeeper
import (
"errors"
"gitee.com/dennis-kk/service-box-go/common"
perror "gitee.com/dennis-kk/service-box-go/util/errors"
"gitee.com/dennis-kk/service-box-go/util/service_infra"
"gitee.com/dennis-kk/service-box-go/util/slog"
"github.com/go-zookeeper/zk"
"sync"
"sync/atomic"
"time"
)
var (
defaultTimeout = 5 * time.Second //default timeout for connect zookeeper server
once sync.Once
)
func init() {
once.Do(func() {
service_infra.RegisterServiceInfra("zookeeper", makeZKServiceInfra)
})
}
type zkClientStatus int
const (
zkDisConnected zkClientStatus = iota
zkConnected
)
// zkServiceInfra implement of service infra interface with zookeeper
type zkServiceInfra struct {
opts *service_infra.Options //zookeeper options
conn *zk.Conn //zookeeper connection
logger *zkLogHelper //zk hook logger
status zkClientStatus // zookeeper client connect status
zkWatcher <-chan zk.Event //zookeeper watcher event
zkMainCh chan *zkEventData //zookeeper write by net tick read by main loop
zkNetCh chan *zkEventData //write by main loop, read by net loop
varietyEH service_infra.ServiceEventHandle //listener map, ket service path, value function
closeSign chan struct{}
wg sync.WaitGroup
}
func makeZKServiceInfra() service_infra.IServiceInfra {
return &zkServiceInfra{
opts: &service_infra.Options{},
conn: nil,
logger: makeZkLogHelper(0),
status: zkDisConnected,
zkWatcher: nil,
zkMainCh: make(chan *zkEventData, 16),
zkNetCh: make(chan *zkEventData, 16),
varietyEH: nil,
closeSign: make(chan struct{}, 1),
wg: sync.WaitGroup{},
}
}
func (sf *zkServiceInfra) Init(opts ...service_infra.Option) error {
for _, opt := range opts {
opt(sf.opts)
}
sf.opts.PreProcess()
return nil
}
// Start start zookeeper client, connect to zookeeper server
func (sf *zkServiceInfra) Start() error {
var err error
sf.conn, sf.zkWatcher, err = zk.Connect(sf.opts.Hosts, defaultTimeout, zk.WithLogger(sf.logger))
if err != nil {
slog.Error("[zookeeper] connect to zookeeper servers %v error %v !", sf.opts.Hosts, err)
return err
}
t := time.NewTimer(defaultTimeout)
run := true
for run {
select {
case <-t.C:
return errors.New("connect to server time out")
default:
status := sf.conn.State()
if status == zk.StateConnected || status == zk.StateHasSession {
run = false
}
}
}
atomic.StoreUint32(&sf.logger.open, 1)
// 预先创建好前缀路径
err = sf.createParentPath(sf.opts.Prefix)
if err != nil {
return err
}
sf.status = zkConnected
sf.wg.Add(1)
go sf.loop()
return nil
}
func (sf *zkServiceInfra) Stop() error {
if sf == nil {
// has dis connected to server
return nil
}
sf.closeSign <- struct{}{}
close(sf.closeSign)
sf.wg.Wait()
sf.conn.Close()
return nil
}
func (sf *zkServiceInfra) Tick() {
for {
select {
case zkEvent := <-sf.zkMainCh:
sf.dealLoopEvent(zkEvent)
default:
return
}
}
}
func (sf *zkServiceInfra) FindServiceAsync(svsName string) error {
if !isZKConnValid(sf.conn) {
slog.Error("[Zookeeper] invalid zookeeper connection state: %d ", sf.conn.State())
return common.ErrServiceFinderConn
}
event := &zkEventData{
eType: zkFindService,
name: svsName,
}
event.path = preProcessSvsPath(sf.opts.Prefix, svsName)
//TODO get timer with object pool
//阻塞发送消息,最多一秒
timer := time.NewTimer(1 * time.Second)
select {
case sf.zkNetCh <- event:
return nil
case <-timer.C:
return common.ErrFindServiceTimeout
}
}
func (sf *zkServiceInfra) FindService(svsName string) (*service_infra.ServiceInfo, error) {
if !isZKConnValid(sf.conn) {
slog.Error("[Zookeeper] invalid zookeeper connection state: %d ", sf.conn.State())
return nil, nil
}
//pre deal svsName
svsName = preProcessSvsPath(sf.opts.Prefix, svsName)
exists, _, err := sf.conn.Exists(svsName)
if err != nil {
return nil, err
}
if !exists {
return nil, nil
}
children, _, err := sf.conn.Children(svsName)
if err != nil {
return nil, err
}
if len(children) == 0 {
return nil, nil
}
info := &service_infra.ServiceInfo{
Hosts: children,
}
return info, nil
}
func (sf *zkServiceInfra) RegisterService(svsName string, info *service_infra.ServiceInfo) error {
if !isZKConnValid(sf.conn) {
slog.Error("[Zookeeper] invalid service connect while service %s register ", svsName)
return nil
}
if info == nil {
//FIXME add common err and add log
return nil
}
//pre deal svsName
svsName = preProcessSvsPath(sf.opts.Prefix, svsName)
//check node exits
exists, _, err := sf.conn.Exists(svsName)
if err != nil {
sf.logger.Printf("register service %s error %v ", svsName, err)
return err
}
//zk: ephemeral nodes may not have children
//主节点是临时节点的话是不能创建子节点
if exists == false {
//TODO check path from cache
for i, v := range svsName {
//create parents path
if v == '/' && i != 0 {
if err = sf.tryCreateZKPath(svsName[:i]); err != nil {
return err
}
}
}
_, err = sf.conn.Create(svsName, nil, 0, zk.WorldACL(zk.PermAll))
if err != nil {
return err
}
}
// set service info
for _, v := range info.Hosts {
//service path
path := svsName + "/" + v
exists, _, err = sf.conn.Exists(path)
if err != nil {
slog.Error("[Zookeeper] %s check path exists error %v", path, err)
return err
}
if exists {
err = sf.conn.Delete(path, -1)
if err != nil {
slog.Error("[Zookeeper] %s delete path error %v", path, err)
return err
}
}
_, err = sf.conn.Create(path, nil, zk.FlagEphemeral, zk.WorldACL(zk.PermAll))
if err != nil {
slog.Error("[Zookeeper] create path %s error %v", path, err)
return err
}
}
return nil
}
func (sf *zkServiceInfra) UnRegisterService(svsName string, info *service_infra.ServiceInfo) error {
if sf == nil || !isZKConnValid(sf.conn) {
return perror.InvalidZKConn
}
if info == nil {
//FIXME add common err and err log
return nil
}
//pre deal service name path
svsName = preProcessSvsPath(sf.opts.Prefix, svsName)
for _, v := range info.Hosts {
err := sf.conn.Delete(svsName+"/"+v, -1)
if err != nil {
return err
}
}
return nil
}
func (sf *zkServiceInfra) AddListener(handle service_infra.ServiceEventHandle) error {
sf.varietyEH = handle
return nil
}
func (sf *zkServiceInfra) DelListener() error {
if sf == nil {
//TODO add common log
return nil
}
sf.varietyEH = nil
return nil
}
func (sf *zkServiceInfra) IsConnected() bool {
return sf.status == zkConnected
}
func (sf *zkServiceInfra) loop() {
var reason string
defer func() {
sf.logger.Printf("exit zkServiceInfra loop with %s", reason)
sf.wg.Done()
}()
for {
select {
case zkEvent := <-sf.zkNetCh:
if zkEvent == nil {
reason = "zkNetCh has been closed"
// Deal EOF
return
}
sf.dealLoopEvent(zkEvent)
case zkEvent, ok := <-sf.zkWatcher:
if !ok {
reason = "zkWatcher has been closed"
sf.logger.Printf("zookeeper event channel has been closed !")
return
}
if zkEvent.Err != nil {
reason = zkEvent.Err.Error()
sf.logger.Printf("get zookeeper error event %d , err: %q", zkEvent.State, zkEvent.Err.Error())
return
}
switch zkEvent.State {
case zk.StateExpired:
reason = "receive StateExpired "
sf.logger.Printf("zookeeper node %s expired %d !", zkEvent.Path, zkEvent.Type)
return
default:
sf.dealZKEvent(zkEvent)
}
case <-sf.closeSign:
reason = "closing signal"
//优雅关闭
return
}
}
}
func (sf *zkServiceInfra) createParentPath(path string) error {
if path == "/" {
return nil
}
//check node exits
exists, _, err := sf.conn.Exists(path)
if err != nil {
sf.logger.Printf("register service %s error %v ", path, err)
return err
}
//zk: ephemeral nodes may not have children
//主节点是临时节点的话是不能创建子节点
if exists == false {
for i, v := range path {
//create parents path
if v == '/' && i != 0 {
if err = sf.tryCreateZKPath(path[:i]); err != nil {
return err
}
}
}
_, err = sf.conn.Create(path, nil, 0, zk.WorldACL(zk.PermAll))
if err != nil {
return err
}
}
return nil
}
func (sf *zkServiceInfra) tryCreateZKPath(path string) error {
exists, _, err := sf.conn.Exists(path)
if err != nil {
//TODO add error log
return err
}
if exists {
//path has create
return nil
}
// try create zookeeper nodes
_, err = sf.conn.Create(path, nil, zk.FlagEphemeral, zk.WorldACL(zk.PermAll))
if err != nil {
//TODO add error log
return err
}
return nil
}
func (sf *zkServiceInfra) dealZKEvent(event zk.Event) {
switch event.Type {
case zk.EventNodeCreated:
//add new zookeeper node
sf.onNodeCreated(event.Path)
case zk.EventNodeDataChanged:
//node data has changed
case zk.EventNodeChildrenChanged:
//node's children changed, This is only generated by watches on the child list of a node. These watches
//are set using get_children
sf.onChildrenChanged(event.Path)
case zk.EventNodeDeleted:
//node has been deleted, generated by watches on nodes. These watches
//are set using zoo_exists and zoo_get.
sf.onNodeDeleted(event.Path)
}
}
func (sf *zkServiceInfra) dealLoopEvent(event *zkEventData) {
switch event.eType {
case zkFindService:
sf.doFindService(event)
case zkRegisterService:
sf.doRegisterService(event)
case zkChangeHost:
sf.doChangeService(event)
case zkDeleteHost:
sf.doDeleteService(event)
default:
slog.Warn("[Zookeeper] unsupported event type %d ", event.eType)
}
}
func (sf *zkServiceInfra) doFindService(event *zkEventData) {
//check exits
isExists, _, _, err := sf.conn.ExistsW(event.path)
if !isExists {
return
}
event.eType = zkChangeHost
if err != nil {
event.err = err
sf.zkMainCh <- event
return
}
//not use zookeeper stat and event channel yet
event.hosts, _, _, event.err = sf.conn.ChildrenW(event.path)
if len(event.hosts) == 0 {
return
}
sf.zkMainCh <- event
}
func (sf *zkServiceInfra) doRegisterService(event *zkEventData) {
exists, _, err := sf.conn.Exists(event.path)
if err != nil {
//TODO add log
return
}
if exists == false {
//TODO check path from cache
for i, v := range event.path {
//create parents path
if v == '/' && i != 0 {
if err = sf.tryCreateZKPath(event.path[:i]); err != nil {
return
}
}
}
_, err = sf.conn.Create(event.path, nil, zk.FlagEphemeral, zk.WorldACL(zk.PermAll))
if err != nil {
return
}
}
// set service info
for _, v := range event.hosts {
_, err = sf.conn.Create(event.path+"/"+v, nil, zk.FlagEphemeral, zk.WorldACL(zk.PermAll))
if err != nil {
//TODO add err log
return
}
}
}
func (sf *zkServiceInfra) doChangeService(event *zkEventData) {
if sf.varietyEH != nil {
if event.err != nil {
sf.varietyEH(event.name, service_infra.ServiceChange, nil, event.err)
return
}
info := &service_infra.ChangedInfo{
Hosts: event.hosts,
}
sf.varietyEH(event.name, service_infra.ServiceChange, info, nil)
}
}
func (sf *zkServiceInfra) doDeleteService(event *zkEventData) {
if sf.varietyEH != nil {
sf.varietyEH(event.name, service_infra.ServiceDelete, nil, event.err)
}
}
func (sf *zkServiceInfra) onNodeCreated(path string) {
//service's node has been created, add watcher to child
event := &zkEventData{
eType: zkChangeHost,
}
event.name = getNameFromPath(path)
event.hosts, _, _, event.err = sf.conn.ChildrenW(path)
if event.err == nil && len(event.hosts) == 0 {
return
}
sf.zkMainCh <- event
}
func (sf *zkServiceInfra) onChildrenChanged(path string) {
//get value,and re-add watch
event := &zkEventData{
eType: zkChangeHost,
name: getNameFromPath(path),
path: path,
}
event.hosts, _, _, event.err = sf.conn.ChildrenW(path)
sf.zkMainCh <- event
}
func (sf *zkServiceInfra) onNodeDeleted(path string) {
//re-add exists watcher
_, _, _, err := sf.conn.ExistsW(path)
//construct node delete event
event := &zkEventData{
eType: zkDeleteHost,
name: getNameFromPath(path),
path: path,
err: err,
}
sf.zkMainCh <- event
}
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
Go
1
https://gitee.com/dennis-kk/service-box-go.git
git@gitee.com:dennis-kk/service-box-go.git
dennis-kk
service-box-go
Service-Box-go
v0.5.23

搜索帮助