代码拉取完成,页面将自动刷新
package main
import (
"context"
"fmt"
"gitee.com/micro-tools/micro"
"gitee.com/micro-tools/micro/module"
"gitee.com/micro-tools/wf/extend/utils/gmicro/registry"
"gitee.com/micro-tools/wf/extend/utils/gmicro/registry/etcd"
"gitee.com/micro-tools/wf/frame/g"
"gitee.com/micro-tools/wf/os/glog"
"gitee.com/micro-tools/wf/text/gstr"
"gitee.com/micro-tools/wf/util/gconv"
consulapi "github.com/hashicorp/consul/api"
"github.com/mitchellh/consulstructure"
"go.etcd.io/etcd/clientv3"
"reflect"
)
const (
TracingName = "Blocking Query"
TracingURL = "172.16.13.6:6831"
ConsulURL = "127.0.0.1:8500"
EtcdURL = "127.0.0.1:2379"
)
func main() {
go func() {
type Config struct {
Blocking string `json:"blocking,omitempty"`
}
EtcdBlockingQuery(&Config{}, "/corex/", func(value interface{}) {
var val Config
_ = gconv.Struct(value, &val)
glog.Info(val)
})
//ConsulBlockingQuery(&Config{}, func(value interface{}) {
// var val Config
// _ = gconv.Struct(value, &val)
// glog.Info(val)
//})
}()
rs := etcd.NewRegistry(func(options *registry.Options) {
options.Address = []string{EtcdURL}
})
app := micro.Create(
module.Debug(false),
module.Tracing(module.Trace{
Status: true,
Name: TracingName,
URL: TracingURL,
}),
module.Registry(rs), //指定服务发现
)
err := app.Run(New())
if err != nil {
app.Logger().Errorf("%s", err.Error())
}
}
func ConsulBlockingQuery(targetPointer interface{}, prefix string, call func(value interface{})) {
// Create our decoder
updateCh := make(chan interface{})
errCh := make(chan error)
decoder := &consulstructure.Decoder{
Target: targetPointer,
UpdateCh: updateCh,
ErrCh: errCh,
Prefix: prefix,
Consul: &consulapi.Config{Address: ConsulURL},
}
// Run the decoder and wait for changes
go decoder.Run()
for {
select {
case v := <-updateCh:
call(v)
case err := <-errCh:
fmt.Printf("Error: %s\n", err)
}
}
}
func EtcdBlockingQuery(targetPointer interface{}, prefix string, call func(value interface{})) {
raw := GetStructTagName(targetPointer)
if len(raw) == 0 {
return
}
cli, err := clientv3.New(clientv3.Config{
Endpoints: []string{EtcdURL},
})
if err != nil {
return
}
var wkey string
var opts []clientv3.OpOption
if prefix != "" {
wkey = prefix
opts = append(opts, clientv3.WithPrefix())
} else {
wkey = ""
opts = append(opts, clientv3.WithPrefix())
}
ctx := context.Background()
for {
select {
case r := <-cli.Watch(ctx, wkey, opts...):
for _, v := range r.Events {
mk := gconv.String(v.Kv.Key)
mk = gstr.Replace(mk, prefix, "")
for _, k := range raw {
if k == mk {
call(g.Map{k: interface{}(v.Kv.Value)})
break
}
}
}
}
}
}
// GetStructTagName 获取struct的field的tag内容
func GetStructTagName(pointer interface{}) []string {
types := reflect.TypeOf(pointer)
numField := types.Elem().NumField()
var tagValues = make([]string, numField)
for i := 0; i < numField; i++ {
tagValue := types.Elem().Field(i).Tag.Get("json")
if tagValue == "" {
continue
}
tagValues[i] = gstr.Split(tagValue, ",")[0]
}
return tagValues
}
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。