当前仓库属于暂停状态,部分功能使用受限,详情请查阅 仓库状态说明
2 Star 0 Fork 1

JUMEI_ARCH / go-plugins
暂停

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
watcher.go 2.55 KB
一键复制 编辑 原始数据 按行查看 历史
Asim Aslam 提交于 2018-02-19 20:27 . watch branch
package eureka
import (
"errors"
"time"
"github.com/hudl/fargo"
"github.com/micro/go-micro/registry"
)
// eurekaWatcher is the registry.Watcher returned by newWatcher. It turns
// application updates obtained through a fargo connection into
// registry.Result values that are handed out by Next.
type eurekaWatcher struct {
// conn is the connection used to list applications (GetApps) and to
// subscribe to per-application updates (ScheduleAppUpdates).
conn fargoConnection
// exit is closed by Stop; the poll and watch goroutines and Next all
// select on it to know when to give up.
exit chan bool
// results carries results from the watch goroutines to Next. It is
// created unbuffered by newWatcher.
results chan *registry.Result
}
// newWatcher builds a registry.Watcher on top of the given fargo connection.
//
// The supplied options are applied to a registry.WatchOptions value. When a
// service name is set, updates are scheduled for that one application only;
// otherwise a poll goroutine is started that watches every application it
// finds.
func newWatcher(conn fargoConnection, opts ...registry.WatchOption) registry.Watcher {
	var options registry.WatchOptions
	for _, apply := range opts {
		apply(&options)
	}

	watcher := &eurekaWatcher{
		conn:    conn,
		exit:    make(chan bool),
		results: make(chan *registry.Result),
	}

	// no service requested: watch all services
	if len(options.Service) == 0 {
		go watcher.poll()
		return watcher
	}

	// watch a single service
	stop := make(chan struct{})
	updates := conn.ScheduleAppUpdates(options.Service, false, stop)
	go watcher.watch(updates, stop)

	// close the channel handed to ScheduleAppUpdates once the watcher exits
	go func() {
		<-watcher.exit
		close(stop)
	}()

	return watcher
}
// poll runs until the watcher is stopped. Every 10 seconds it lists the
// applications known to the connection and, for each application name it has
// not seen before, schedules updates and starts a watch goroutine.
//
// All scheduled updates share a single done channel, which is closed when
// the exit channel is closed.
func (e *eurekaWatcher) poll() {
	// list service ticker; stopped on return so it is not leaked
	t := time.NewTicker(time.Second * 10)
	defer t.Stop()

	done := make(chan struct{})
	// application names that already have a watch goroutine
	services := make(map[string]<-chan fargo.AppUpdate)

	for {
		select {
		case <-e.exit:
			close(done)
			return
		case <-t.C:
			apps, err := e.conn.GetApps()
			if err != nil {
				// best effort: try again on the next tick
				continue
			}

			for _, app := range apps {
				if _, ok := services[app.Name]; ok {
					continue
				}

				ch := e.conn.ScheduleAppUpdates(app.Name, false, done)
				services[app.Name] = ch
				go e.watch(ch, done)
			}
		}
	}
}
// watch consumes application updates from ch and publishes one
// registry.Result per instance on the results channel.
//
// Instances with status UP produce an "update" action; OUTOFSERVICE, UNKNOWN
// and DOWN produce "delete"; any other status is skipped. Updates carrying an
// error are ignored. The loop returns when the exit channel or done is
// closed, or when ch itself is closed.
func (e *eurekaWatcher) watch(ch <-chan fargo.AppUpdate, done chan struct{}) {
	for {
		select {
		// exit on exit
		case <-e.exit:
			return
		// exit on done
		case <-done:
			return
		// process updates
		case u, ok := <-ch:
			// a closed channel yields zero-value updates forever; stop
			// instead of dereferencing a nil App below
			if !ok {
				return
			}
			if u.Err != nil || u.App == nil {
				continue
			}

			// process instances independently
			for _, instance := range u.App.Instances {
				var action string

				switch instance.Status {
				// update
				case fargo.UP:
					action = "update"
				// delete
				case fargo.OUTOFSERVICE, fargo.UNKNOWN, fargo.DOWN:
					action = "delete"
				// skip
				default:
					continue
				}

				// construct the service with a single node
				service := appToService(&fargo.Application{
					Name:      u.App.Name,
					Instances: []*fargo.Instance{instance},
				})
				if len(service) == 0 {
					continue
				}

				// in case we get bounced during processing
				// check exit channels
				select {
				// send the update
				case e.results <- &registry.Result{Action: action, Service: service[0]}:
				case <-done:
					return
				case <-e.exit:
					return
				}
			}
		}
	}
}
// Next blocks until a result is available on the results channel and returns
// it. Once the exit channel has been closed it may instead return a nil
// result together with a "watcher stopped" error.
func (e *eurekaWatcher) Next() (*registry.Result, error) {
	select {
	case res := <-e.results:
		return res, nil
	case <-e.exit:
		return nil, errors.New("watcher stopped")
	}
}
// Stop closes the exit channel, which the poll and watch goroutines and Next
// all select on. Calling Stop again after the channel is closed is a no-op
// rather than a "close of closed channel" panic.
//
// NOTE(review): the check-then-close below is not atomic, so Stop is still
// not safe to call from several goroutines at the same time.
func (e *eurekaWatcher) Stop() {
	select {
	case <-e.exit:
		// already stopped
		return
	default:
		close(e.exit)
	}
}
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
Go
1
https://gitee.com/JMArch/go-plugins.git
git@gitee.com:JMArch/go-plugins.git
JMArch
go-plugins
go-plugins
v0.9.3

搜索帮助

344bd9b3 5694891 D2dac590 5694891