1 Star 0 Fork 0

simon/smallnest

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
克隆/下载
mdns.go 4.93 KB
一键复制 编辑 原始数据 按行查看 历史
simon 提交于 2021-09-04 14:58 +08:00 . 旧版
package serverplugin
import (
"context"
"encoding/json"
"errors"
"fmt"
"net"
"net/url"
"os"
"strconv"
"strings"
"time"
"github.com/grandcat/zeroconf"
metrics "github.com/rcrowley/go-metrics"
)
// serviceMeta describes one registered service. The list of these records is
// JSON-encoded, query-escaped and published as the mDNS TXT record.
type serviceMeta struct {
// Service is the name the service was registered under.
Service string `json:"service,omitempty"`
// Meta is the service metadata in URL query-string form; the periodic
// refresh in Start parses it with url.ParseQuery and re-encodes it.
Meta string `json:"meta,omitempty"`
// ServiceAddress is copied from the plugin's ServiceAddress at registration.
ServiceAddress string `json:"service_address,omitempty"`
}
// MDNSRegisterPlugin implements mdns/dns-sd registry.
// It publishes the registered services through a zeroconf server and can
// periodically refresh the published record with metrics.
type MDNSRegisterPlugin struct {
// ServiceAddress is the service address in "network@host:port" form,
// for example tcp@127.0.0.1:8972 or quic@127.0.0.1:1234.
ServiceAddress string
// port is the port announced over mDNS. initMDNS overwrites it with the
// port parsed from ServiceAddress.
port int
// Metrics is the registry holding the "calls" and "connections" meters.
// When nil, no metrics are recorded or published.
Metrics metrics.Registry
// Registered services
Services []*serviceMeta
// UpdateInterval is the period of the refresh started by Start; a
// non-positive value disables the periodic refresh.
UpdateInterval time.Duration
// server is the zeroconf server; nil until initMDNS has run.
server *zeroconf.Server
// domain is the mDNS domain; the constructor defaults it to "local.".
domain string
// dying is closed by Stop to ask the refresh goroutine to exit.
dying chan struct{}
// done is closed by the refresh goroutine when it exits.
done chan struct{}
}
// NewMDNSRegisterPlugin returns a new MDNSRegisterPlugin.
//
// serviceAddress is the advertised address (for example "tcp@127.0.0.1:8972"),
// m is the optional metrics registry, and updateInterval is the period of the
// refresh performed by Start. An empty domain falls back to "local.".
func NewMDNSRegisterPlugin(serviceAddress string, port int, m metrics.Registry, updateInterval time.Duration, domain string) *MDNSRegisterPlugin {
	plugin := &MDNSRegisterPlugin{
		ServiceAddress: serviceAddress,
		port:           port,
		Metrics:        m,
		UpdateInterval: updateInterval,
		domain:         "local.",
		dying:          make(chan struct{}),
		done:           make(chan struct{}),
	}
	// Only override the default when the caller supplied a domain.
	if domain != "" {
		plugin.domain = domain
	}
	return plugin
}
// Start publishes any already-registered services over mDNS and, when
// UpdateInterval is positive, starts a background goroutine that refreshes the
// published TXT record on every tick.
//
// On each tick the mean rates of the "calls" and "connections" meters (when
// Metrics is set) are merged into every service's Meta, and the whole service
// list is re-encoded and pushed to the zeroconf server. The goroutine exits
// when the dying channel is closed; it then closes done and shuts the server
// down. Start always returns nil; a failure inside initMDNS panics.
//
// NOTE(review): the goroutine reads and mutates p.Services without any
// locking visible in this file — confirm Register/Unregister are not called
// concurrently with the refresh.
func (p *MDNSRegisterPlugin) Start() error {
	if p.server == nil && len(p.Services) != 0 {
		p.initMDNS()
	}
	if p.UpdateInterval <= 0 {
		return nil
	}

	ticker := time.NewTicker(p.UpdateInterval)
	go func() {
		// Release the ticker's resources once the goroutine exits.
		defer ticker.Stop()
		defer func() {
			// The server is created lazily, so it may still be nil here.
			if p.server != nil {
				p.server.Shutdown()
			}
		}()

		for {
			select {
			case <-p.dying:
				close(p.done)
				return
			case <-ticker.C:
				// Nothing has been announced yet; calling SetText on a
				// nil server would panic.
				if p.server == nil {
					continue
				}

				extra := make(map[string]string)
				if p.Metrics != nil {
					extra["calls"] = fmt.Sprintf("%.2f", metrics.GetOrRegisterMeter("calls", p.Metrics).RateMean())
					extra["connections"] = fmt.Sprintf("%.2f", metrics.GetOrRegisterMeter("connections", p.Metrics).RateMean())
				}

				// Set the same metrics for all services at this server.
				for _, sm := range p.Services {
					// A parse error is deliberately ignored: ParseQuery
					// still returns the pairs it managed to decode.
					v, _ := url.ParseQuery(sm.Meta)
					for key, value := range extra {
						v.Set(key, value)
					}
					sm.Meta = v.Encode()
				}

				ss, err := json.Marshal(p.Services)
				if err != nil {
					// Keep the previously published record rather than
					// publishing a broken one.
					continue
				}
				p.server.SetText([]string{url.QueryEscape(string(ss))})
			}
		}
	}()
	return nil
}
// Stop unregisters all services by shutting down the mDNS server and asks the
// background refresh goroutine to exit.
//
// It is safe to call when no server was ever created and when it has already
// been called. It only waits for the refresh goroutine when UpdateInterval is
// positive, because Start launches that goroutine only in that case; waiting
// unconditionally would block forever. It always returns nil.
//
// NOTE(review): if UpdateInterval is positive but Start was never called,
// the wait on done still blocks — confirm callers always pair Start and Stop.
func (p *MDNSRegisterPlugin) Stop() error {
	if p.server != nil {
		p.server.Shutdown()
	}

	select {
	case <-p.dying:
		// Already stopped; closing the channel again would panic.
		return nil
	default:
	}
	close(p.dying)

	if p.UpdateInterval > 0 {
		<-p.done
	}
	return nil
}
// initMDNS registers the "_rpcxservices" mDNS service under the local host
// name and stores the resulting zeroconf server in p.server.
//
// The announced port is parsed from ServiceAddress (an optional "network@"
// prefix is stripped first) and written to p.port. The TXT record carries the
// query-escaped JSON encoding of p.Services. It panics if the address cannot
// be split, the port is not a number, or the zeroconf registration fails.
func (p *MDNSRegisterPlugin) initMDNS() {
	// Drop the "network@" prefix, e.g. "tcp@127.0.0.1:8972" -> "127.0.0.1:8972".
	hostPort := p.ServiceAddress
	if at := strings.Index(hostPort, "@"); at > 0 {
		hostPort = hostPort[at+1:]
	}

	_, rawPort, splitErr := net.SplitHostPort(hostPort)
	if splitErr != nil {
		panic(splitErr)
	}

	port, convErr := strconv.Atoi(rawPort)
	p.port = port
	if convErr != nil {
		panic(convErr)
	}

	payload, _ := json.Marshal(p.Services)
	txt := []string{url.QueryEscape(string(payload))}
	hostname, _ := os.Hostname()

	srv, regErr := zeroconf.Register(hostname, "_rpcxservices", p.domain, p.port, txt, nil)
	if regErr != nil {
		panic(regErr)
	}
	p.server = srv
}
// HandleConnAccept counts an accepted client connection on the "connections"
// meter when a metrics registry is configured. The connection is returned
// unchanged and is always accepted.
func (p *MDNSRegisterPlugin) HandleConnAccept(conn net.Conn) (net.Conn, bool) {
	if p.Metrics == nil {
		return conn, true
	}
	connections := metrics.GetOrRegisterMeter("connections", p.Metrics)
	connections.Mark(1)
	return conn, true
}
// PreCall counts an incoming rpc call on the "calls" meter when a metrics
// registry is configured. The arguments are passed through untouched and the
// returned error is always nil.
func (p *MDNSRegisterPlugin) PreCall(_ context.Context, _, _ string, args interface{}) (interface{}, error) {
	if p.Metrics == nil {
		return args, nil
	}
	calls := metrics.GetOrRegisterMeter("calls", p.Metrics)
	calls.Mark(1)
	return args, nil
}
// Register adds a service to the published service list.
//
// The service is recorded with the plugin's ServiceAddress and the given
// metadata. If no mDNS server exists yet it is created through initMDNS
// (which panics on failure); otherwise the TXT record of the running server is
// replaced with the updated list. An empty or whitespace-only name is
// rejected with an error. rcvr is not used.
func (p *MDNSRegisterPlugin) Register(name string, rcvr interface{}, metadata string) (err error) {
	if strings.TrimSpace(name) == "" {
		return errors.New("Register service `name` can't be empty")
	}

	p.Services = append(p.Services, &serviceMeta{
		Service:        name,
		Meta:           metadata,
		ServiceAddress: p.ServiceAddress,
	})

	if p.server != nil {
		// A server is already announcing: just refresh its TXT record.
		encoded, _ := json.Marshal(p.Services)
		p.server.SetText([]string{url.QueryEscape(string(encoded))})
		return nil
	}

	// First registration: start announcing.
	p.initMDNS()
	return nil
}
// RegisterFunction registers a function-based service. It delegates to
// Register with serviceName, fn and metadata; fname is not used.
func (p *MDNSRegisterPlugin) RegisterFunction(serviceName, fname string, fn interface{}, metadata string) error {
return p.Register(serviceName, fn, metadata)
}
// Unregister removes every service called name from the published list and
// refreshes the TXT record of the running mDNS server.
//
// It is a no-op returning nil when no services are registered. An empty or
// whitespace-only name is rejected with an error. The server keeps running
// even when the last service is removed; when no server has been created yet,
// only the in-memory list is updated.
func (p *MDNSRegisterPlugin) Unregister(name string) (err error) {
	if len(p.Services) == 0 {
		return nil
	}
	if strings.TrimSpace(name) == "" {
		return errors.New("Unregister service `name` can't be empty")
	}

	// len(p.Services) >= 1 here, so the capacity is never negative.
	remaining := make([]*serviceMeta, 0, len(p.Services)-1)
	for _, sm := range p.Services {
		if sm.Service != name {
			remaining = append(remaining, sm)
		}
	}
	p.Services = remaining

	// Services is exported and may have been filled without ever creating
	// a server; there is nothing to refresh in that case.
	if p.server == nil {
		return nil
	}

	payload, err := json.Marshal(p.Services)
	if err != nil {
		return err
	}
	p.server.SetText([]string{url.QueryEscape(string(payload))})
	return nil
}
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
1
https://gitee.com/simon_git_code/smallnest.git
git@gitee.com:simon_git_code/smallnest.git
simon_git_code
smallnest
smallnest
e483c3e07d35

搜索帮助