1 Star 0 Fork 0

liuxuezhan / go-plugins

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
grpc.go 10.08 KB
一键复制 编辑 原始数据 按行查看 历史
liuxuezhan 提交于 2021-08-18 17:35 . 'new'
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537
// Package grpc is a point to point grpc broker
package grpc
import (
"context"
"crypto/tls"
"errors"
"fmt"
"math/rand"
"net"
"strconv"
"strings"
"sync"
"time"
"github.com/google/uuid"
"gitee.com/liuxuezhan/go-micro-v1.18.0/broker"
"gitee.com/liuxuezhan/go-micro-v1.18.0/config/cmd"
merr "gitee.com/liuxuezhan/go-micro-v1.18.0/errors"
"gitee.com/liuxuezhan/go-micro-v1.18.0/registry"
"gitee.com/liuxuezhan/go-micro-v1.18.0/registry/cache"
maddr "gitee.com/liuxuezhan/go-micro-v1.18.0/util/addr"
"gitee.com/liuxuezhan/go-micro-v1.18.0/util/log"
mnet "gitee.com/liuxuezhan/go-micro-v1.18.0/util/net"
mls "gitee.com/liuxuezhan/go-micro-v1.18.0/util/tls"
proto "gitee.com/liuxuezhan/go-plugins/broker/grpc/proto"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
)
// grpcBroker is a point to point async broker
type grpcBroker struct {
id string
address string
opts broker.Options
srv *grpc.Server
r registry.Registry
sync.RWMutex
subscribers map[string][]*grpcSubscriber
running bool
exit chan chan error
}
type grpcHandler struct {
g *grpcBroker
}
type grpcSubscriber struct {
opts broker.SubscribeOptions
id string
topic string
fn broker.Handler
svc *registry.Service
hb *grpcBroker
}
type grpcEvent struct {
m *broker.Message
t string
}
var (
registryKey = "gitee.com/liuxuezhan/go-micro-v1.18.0/registry"
broadcastVersion = "ff.grpc.broadcast"
registerTTL = time.Minute
registerInterval = time.Second * 30
)
func init() {
rand.Seed(time.Now().Unix())
cmd.DefaultBrokers["grpc"] = NewBroker
}
func newConfig(config *tls.Config) *tls.Config {
if config == nil {
return &tls.Config{
InsecureSkipVerify: true,
}
}
return config
}
func newGRPCBroker(opts ...broker.Option) broker.Broker {
options := broker.Options{
Context: context.TODO(),
}
for _, o := range opts {
o(&options)
}
// set address
addr := ":0"
if len(options.Addrs) > 0 && len(options.Addrs[0]) > 0 {
addr = options.Addrs[0]
}
// get registry
reg, ok := options.Context.Value(registryKey).(registry.Registry)
if !ok {
reg = registry.DefaultRegistry
}
h := &grpcBroker{
id: "grpc-broker-" + uuid.New().String(),
address: addr,
opts: options,
r: reg,
srv: grpc.NewServer(),
subscribers: make(map[string][]*grpcSubscriber),
exit: make(chan chan error),
}
// specify the message handler
proto.RegisterBrokerServer(h.srv, &grpcHandler{h})
return h
}
func (h *grpcEvent) Ack() error {
return nil
}
func (h *grpcEvent) Message() *broker.Message {
return h.m
}
func (h *grpcEvent) Topic() string {
return h.t
}
func (h *grpcSubscriber) Options() broker.SubscribeOptions {
return h.opts
}
func (h *grpcSubscriber) Topic() string {
return h.topic
}
func (h *grpcSubscriber) Unsubscribe() error {
return h.hb.unsubscribe(h)
}
// The grpc handler
func (h *grpcHandler) Publish(ctx context.Context, msg *proto.Message) (*proto.Empty, error) {
if len(msg.Topic) == 0 {
return nil, merr.InternalServerError("go.micro.broker", "Topic not found")
}
m := &broker.Message{
Header: msg.Header,
Body: msg.Body,
}
p := &grpcEvent{m: m, t: msg.Topic}
h.g.RLock()
for _, subscriber := range h.g.subscribers[msg.Topic] {
if msg.Id == subscriber.id {
// sub is sync; crufty rate limiting
// so we don't hose the cpu
subscriber.fn(p)
}
}
h.g.RUnlock()
return new(proto.Empty), nil
}
func (h *grpcBroker) subscribe(s *grpcSubscriber) error {
h.Lock()
defer h.Unlock()
if err := h.r.Register(s.svc, registry.RegisterTTL(registerTTL)); err != nil {
return err
}
h.subscribers[s.topic] = append(h.subscribers[s.topic], s)
return nil
}
func (h *grpcBroker) unsubscribe(s *grpcSubscriber) error {
h.Lock()
defer h.Unlock()
var subscribers []*grpcSubscriber
// look for subscriber
for _, sub := range h.subscribers[s.topic] {
// deregister and skip forward
if sub.id == s.id {
_ = h.r.Deregister(sub.svc)
continue
}
// keep subscriber
subscribers = append(subscribers, sub)
}
// set subscribers
h.subscribers[s.topic] = subscribers
return nil
}
func (h *grpcBroker) run(l net.Listener) {
t := time.NewTicker(registerInterval)
defer t.Stop()
for {
select {
// heartbeat for each subscriber
case <-t.C:
h.RLock()
for _, subs := range h.subscribers {
for _, sub := range subs {
_ = h.r.Register(sub.svc, registry.RegisterTTL(registerTTL))
}
}
h.RUnlock()
// received exit signal
case ch := <-h.exit:
ch <- l.Close()
h.RLock()
for _, subs := range h.subscribers {
for _, sub := range subs {
_ = h.r.Deregister(sub.svc)
}
}
h.RUnlock()
return
}
}
}
func (h *grpcBroker) Address() string {
h.RLock()
defer h.RUnlock()
return h.address
}
func (h *grpcBroker) Connect() error {
h.RLock()
if h.running {
h.RUnlock()
return nil
}
h.RUnlock()
h.Lock()
defer h.Unlock()
var l net.Listener
var err error
if h.opts.Secure || h.opts.TLSConfig != nil {
config := h.opts.TLSConfig
fn := func(addr string) (net.Listener, error) {
if config == nil {
hosts := []string{addr}
// check if its a valid host:port
if host, _, err := net.SplitHostPort(addr); err == nil {
if len(host) == 0 {
hosts = maddr.IPs()
} else {
hosts = []string{host}
}
}
// generate a certificate
cert, err := mls.Certificate(hosts...)
if err != nil {
return nil, err
}
config = &tls.Config{Certificates: []tls.Certificate{cert}}
}
return tls.Listen("tcp", addr, config)
}
l, err = mnet.Listen(h.address, fn)
} else {
fn := func(addr string) (net.Listener, error) {
return net.Listen("tcp", addr)
}
l, err = mnet.Listen(h.address, fn)
}
if err != nil {
return err
}
log.Logf("Broker Listening on %s", l.Addr().String())
addr := h.address
h.address = l.Addr().String()
go h.srv.Serve(l)
go func() {
h.run(l)
h.Lock()
h.address = addr
h.Unlock()
}()
// get registry
reg, ok := h.opts.Context.Value(registryKey).(registry.Registry)
if !ok {
reg = registry.DefaultRegistry
}
// set cache
h.r = cache.New(reg)
// set running
h.running = true
return nil
}
func (h *grpcBroker) Disconnect() error {
h.RLock()
if !h.running {
h.RUnlock()
return nil
}
h.RUnlock()
h.Lock()
defer h.Unlock()
// stop cache
rc, ok := h.r.(cache.Cache)
if ok {
rc.Stop()
}
// exit and return err
ch := make(chan error)
h.exit <- ch
err := <-ch
// set not running
h.running = false
return err
}
func (h *grpcBroker) Init(opts ...broker.Option) error {
h.RLock()
if h.running {
h.RUnlock()
return errors.New("cannot init while connected")
}
h.RUnlock()
h.Lock()
defer h.Unlock()
for _, o := range opts {
o(&h.opts)
}
if len(h.opts.Addrs) > 0 && len(h.opts.Addrs[0]) > 0 {
h.address = h.opts.Addrs[0]
}
if len(h.id) == 0 {
h.id = "broker-" + uuid.New().String()
}
// get registry
reg, ok := h.opts.Context.Value(registryKey).(registry.Registry)
if !ok {
reg = registry.DefaultRegistry
}
// get cache
if rc, ok := h.r.(cache.Cache); ok {
rc.Stop()
}
// set registry
h.r = cache.New(reg)
return nil
}
func (h *grpcBroker) Options() broker.Options {
return h.opts
}
func (h *grpcBroker) Publish(topic string, msg *broker.Message, opts ...broker.PublishOption) error {
h.RLock()
s, err := h.r.GetService("topic:" + topic)
if err != nil {
h.RUnlock()
return err
}
h.RUnlock()
m := &proto.Message{
Topic: topic,
Header: make(map[string]string),
Body: msg.Body,
}
for k, v := range msg.Header {
m.Header[k] = v
}
pub := func(node *registry.Node, b *proto.Message) {
// get tls config
config := newConfig(h.opts.TLSConfig)
var opts []grpc.DialOption
// check if secure is added in metadata
if node.Metadata["secure"] == "true" {
opts = append(opts, grpc.WithTransportCredentials(credentials.NewTLS(config)))
} else {
opts = append(opts, grpc.WithInsecure())
}
m := &proto.Message{
Topic: b.Topic,
Id: node.Id,
Header: b.Header,
Body: b.Body,
}
// dial grpc connection
c, err := grpc.Dial(node.Address, opts...)
if err != nil {
log.Logf(err.Error())
return
}
// publish message
proto.NewBrokerClient(c).Publish(context.TODO(), m)
}
for _, service := range s {
// only process if we have nodes
if len(service.Nodes) == 0 {
continue
}
switch service.Version {
// broadcast version means broadcast to all nodes
case broadcastVersion:
for _, node := range service.Nodes {
// publish async
go pub(node, m)
}
default:
// select node to publish to
node := service.Nodes[rand.Int()%len(service.Nodes)]
// publish async
go pub(node, m)
}
}
return nil
}
func (h *grpcBroker) Subscribe(topic string, handler broker.Handler, opts ...broker.SubscribeOption) (broker.Subscriber, error) {
options := broker.NewSubscribeOptions(opts...)
// parse address for host, port
parts := strings.Split(h.Address(), ":")
host := strings.Join(parts[:len(parts)-1], ":")
port, _ := strconv.Atoi(parts[len(parts)-1])
addr, err := maddr.Extract(host)
if err != nil {
return nil, err
}
// create unique id
id := h.id + "." + uuid.New().String()
var secure bool
if h.opts.Secure || h.opts.TLSConfig != nil {
secure = true
}
// register service
node := &registry.Node{
Id: id,
Address: fmt.Sprintf("%s:%d", addr, port),
Metadata: map[string]string{
"secure": fmt.Sprintf("%t", secure),
},
}
// check for queue group or broadcast queue
version := options.Queue
if len(version) == 0 {
version = broadcastVersion
}
service := &registry.Service{
Name: "topic:" + topic,
Version: version,
Nodes: []*registry.Node{node},
}
// generate subscriber
subscriber := &grpcSubscriber{
opts: options,
hb: h,
id: id,
topic: topic,
fn: handler,
svc: service,
}
// subscribe now
if err := h.subscribe(subscriber); err != nil {
return nil, err
}
// return the subscriber
return subscriber, nil
}
func (h *grpcBroker) String() string {
return "grpc"
}
// NewBroker returns a new grpc broker
func NewBroker(opts ...broker.Option) broker.Broker {
return newGRPCBroker(opts...)
}
1
https://gitee.com/liuxuezhan/go-plugins.git
git@gitee.com:liuxuezhan/go-plugins.git
liuxuezhan
go-plugins
go-plugins
db1d4b8b101e

搜索帮助