diff --git a/examples/run_grpc/client_grpc.go b/examples/run_grpc/client_grpc.go index e76ea50971a80815d2fe0c1935d6a90eff13905f..80801203388e4e8055580bad50ad064c1ef9ec87 100644 --- a/examples/run_grpc/client_grpc.go +++ b/examples/run_grpc/client_grpc.go @@ -18,11 +18,9 @@ func main() { logger := v1log.NewZapLog("example", logPath, nil) c := v1config.RpcClientConfig{ - RpcWay: "grpc", - RpcAddr: "127.0.0.1:40001", // RpcDisWay=none 时直连rpc + RpcServer: "127.0.0.1:40001", // RpcDisWay=none 时直连rpc RpcDisWay: "etcd", RpcDisServer: "127.0.0.1:2379", - RpcTimeout: 3, } opts := []v1clientrpc.RemoteOption{} @@ -34,7 +32,7 @@ func main() { v1clientrpc.OptConnTimeout(c.RpcTimeout)) } - conn, err := v1clientrpc.NewRemoteConn("admin", c.RpcAddr, opts...) + conn, err := v1clientrpc.NewRemoteConn("admin", c.RpcServer, opts...) if err != nil { logger.Error("new remote conn err:%s", err) return diff --git a/examples/run_grpc/server_grpc.go b/examples/run_grpc/server_grpc.go index c00fcbbb3fb1919595d3f3fa40341959d88e0751..9853595aae19146c291cb34c0a5ca1b3ada80ca2 100644 --- a/examples/run_grpc/server_grpc.go +++ b/examples/run_grpc/server_grpc.go @@ -25,31 +25,22 @@ func main() { logPath := fmt.Sprintf("./runtime/logs/%s.log", filepath.Base(os.Args[0])) logger := v1log.NewZapLog("example", logPath, nil) - c := v1config.GRpcServerConfig{ - RpcLisAddr: ":" + port, - RpcRegWay: "etcd", - RpcRegServer: "0.0.0.0:2379", + c := &v1config.RpcServerConfig{ + RpcLisAddr: ":" + port, + RpcRegWay: "etcd", + RpcRegServer: "0.0.0.0:2379", + RpcRequestAddr: "127.0.0.1:" + port, } - requestIp:="127.0.0.1" - rpcPort:="" - - server, err := v1rpc.NewGRPCServer("admin", c.RpcLisAddr, - v1rpc.WithOpsHandler(func(server *grpc.Server) { - s := NewDemoServer() - pb.RegisterUserSrServer(server, s) - }), - v1rpc.WithOpsRequestIp(requestIp), - v1rpc.WithOpsRequestPort(rpcPort), - ) + server, err := v1rpc.NewGRPCServer("admin", c) if err != nil { logger.Error("run server error:%s" + err.Error()) return } - switch c.RpcRegWay { - case v1config.RpcRegByEtcd: - etcd, _ := v1rpc.NewEtcdSrvRegister(c.RpcRegServer) - server.AddServiceRegister(etcd) + server.AddLogger(logger) + server.HandlerFunc = func(server *grpc.Server) { + s := NewDemoServer() + pb.RegisterUserSrServer(server, s) } server.Run() diff --git a/go.mod b/go.mod index 384e9ccd62094ade033409ecdec3af9c4837af11..8151dd4c9329106393ce83fcf98d3c386a03d884 100644 --- a/go.mod +++ b/go.mod @@ -4,6 +4,7 @@ go 1.12 require ( github.com/dgrijalva/jwt-go v3.2.0+incompatible + github.com/go-redis/redis/v7 v7.4.1 github.com/go-sql-driver/mysql v1.6.0 github.com/julienschmidt/httprouter v1.3.0 github.com/natefinch/lumberjack v2.0.0+incompatible diff --git a/go.sum b/go.sum index 7a7115ac54991e2febe7d310ba8498b328e4aa19..f4ec7fe86b73109b651a3b6f69ab6071e990691a 100644 --- a/go.sum +++ b/go.sum @@ -36,6 +36,8 @@ github.com/envoyproxy/go-control-plane v0.9.9-0.20201210154907-fd9021fe5dad/go.m github.com/envoyproxy/go-control-plane v0.9.9-0.20210217033140-668b12f5399d/go.mod h1:cXg6YxExXjJnVBQHBLXeUAgxn2UodCpnH306RInaBQk= github.com/envoyproxy/go-control-plane v0.9.9-0.20210512163311-63b5d3c536b0/go.mod h1:hliV/p42l8fGbc6Y9bQ70uLwIvmJyVE5k4iMKlh8wCQ= github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= +github.com/fsnotify/fsnotify v1.4.7 h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV9I= +github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04= github.com/go-kit/kit v0.8.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as= github.com/go-kit/kit v0.9.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as= @@ -43,6 +45,8 @@ github.com/go-kit/log v0.1.0/go.mod h1:zbhenjAZHb184qTLMA9ZjW7ThYL0H2mk7Q6pNt4vb github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9GBnD5lWE= github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk= github.com/go-logfmt/logfmt v0.5.0/go.mod h1:wCYkCAKZfumFQihp8CzCvQ3paCTfi41vtzG1KdI/P7A= +github.com/go-redis/redis/v7 v7.4.1 h1:PASvf36gyUpr2zdOUS/9Zqc80GbM+9BDyiJSJDDOrTI= +github.com/go-redis/redis/v7 v7.4.1/go.mod h1:JDNMw23GTyLNC4GZu9njt15ctBQVn7xjRfnwdHj/Dcg= github.com/go-sql-driver/mysql v1.6.0 h1:BCTh4TKNUYmOmMUcQ3IipzF5prigylS7XXjEkfCHuOE= github.com/go-sql-driver/mysql v1.6.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg= github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= @@ -79,6 +83,8 @@ github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/ github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0/go.mod h1:8NvIoxWQoOIhqOTXgfV/d3M/q6VIi02HzZEHgUlZvzk= github.com/grpc-ecosystem/grpc-gateway v1.16.0/go.mod h1:BDjrQk3hbvj6Nolgz8mAMFbcEtjT1g+wF4CSlocrBnw= +github.com/hpcloud/tail v1.0.0 h1:nfCOvKYfkgYP8hkirhJocXT2+zOD8yUNjXaWfTlyFKI= +github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX5e0EB2j4= github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU= github.com/json-iterator/go v1.1.10/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4= @@ -105,6 +111,11 @@ github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRW github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= github.com/natefinch/lumberjack v2.0.0+incompatible h1:4QJd3OLAMgj7ph+yZTuX13Ld4UpgHp07nNdFX7mqFfM= github.com/natefinch/lumberjack v2.0.0+incompatible/go.mod h1:Wi9p2TTF5DG5oU+6YfsmYQpsTIOm0B1VNzQg9Mw6nPk= +github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= +github.com/onsi/ginkgo v1.10.1 h1:q/mM8GF/n0shIN8SaAZ0V+jnLPzen6WIVZdiwrRlMlo= +github.com/onsi/ginkgo v1.10.1/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= +github.com/onsi/gomega v1.7.0 h1:XPnZz8VVBHjVsy1vzJmRwIcSwiUO+JFfrv/xGiigmME= +github.com/onsi/gomega v1.7.0/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY= github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= @@ -172,6 +183,7 @@ golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20181114220301-adae6a3d119a/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20190108225652-1e06a53dbb7e/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20190213061140-3a22650c66bd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= @@ -179,6 +191,7 @@ golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20190613194153-d28f0bde5980/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20190923162816-aa69164e4478/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20200625001655-4c5254603344/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA= golang.org/x/net v0.0.0-20200822124328-c89045814202/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA= @@ -198,10 +211,12 @@ golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20181116152217-5ac8a444bdc5/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190422165155-953cdadca894/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20191010194322-b09406accb47/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200106162015-b016eb3dc98e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200615200032-f1bc736245b1/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= @@ -270,8 +285,12 @@ gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLks gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 h1:YR8cESwS4TdDjEe65xsg0ogRM/Nc3DYOhEAlW+xobZo= gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/fsnotify.v1 v1.4.7 h1:xOHLXZwVvI9hhs+cLKq5+I5onOuwQLhQwiu63xxlHs4= +gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys= gopkg.in/natefinch/lumberjack.v2 v2.0.0 h1:1Lc07Kr7qY4U2YPouBjpCLxpiyxIVoxqXgkXLknAOE8= gopkg.in/natefinch/lumberjack.v2 v2.0.0/go.mod h1:l0ndWWf7gzL7RNwBG7wST/UCcT4T24xpD6X8LsfU/+k= +gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ= +gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw= gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.3/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= diff --git a/src/v1/clients/metrics/indicator.go b/src/v1/clients/metrics/indicator.go new file mode 100644 index 0000000000000000000000000000000000000000..9b6e619f97ed9f0bf9b0977ee0ffa8edf05841ce --- /dev/null +++ b/src/v1/clients/metrics/indicator.go @@ -0,0 +1,13 @@ +package metrics + +//指标器接口 +type IIndicator interface { + Report(data map[string]*IndicData) //上报数据 +} + +//单个指标数据 +type IndicData struct { + Name string + Value float64 + Unit string +} \ No newline at end of file diff --git a/src/v1/clients/metrics/indicator_log.go b/src/v1/clients/metrics/indicator_log.go new file mode 100644 index 0000000000000000000000000000000000000000..ab285ef96cbc5cf6e88c75bcde833f0868015ed7 --- /dev/null +++ b/src/v1/clients/metrics/indicator_log.go @@ -0,0 +1,35 @@ +package metrics + +import "log" + +type LogIndicator struct { + label string + ch chan map[string]*IndicData +} + +//指标上报器-日志 +func NewLogIndicator(label string) *LogIndicator { + st := &LogIndicator{ + label: label, + ch: make(chan map[string]*IndicData, 10), + } + go st.bgReport() + return st +} + +func (l *LogIndicator) bgReport() { + for { + select { + case data := <-l.ch: + for _, v := range data { + log.Printf("[Statistical-%s]%s: %f %s", l.label, v.Name, v.Value, v.Unit) + } + } + } +} + +//上报数据 +func (l *LogIndicator) Report(data map[string]*IndicData) { + l.ch <- data +} + diff --git a/src/v1/clients/metrics/indicator_promethus.go b/src/v1/clients/metrics/indicator_promethus.go new file mode 100644 index 0000000000000000000000000000000000000000..eb1fff9818ead1ddde456d6f326b6162d50ba201 --- /dev/null +++ b/src/v1/clients/metrics/indicator_promethus.go @@ -0,0 +1,18 @@ +package metrics + +//todo 指标上报至prometheus +type PrometheusIndicator struct { + label string + ch chan map[string]*IndicData +} + +func NewPrometheusIndicator(label string) *PrometheusIndicator { + return &PrometheusIndicator{ + label: label, + } +} + +//上报数据 +func (l *PrometheusIndicator) Report(data map[string]*IndicData) { + //todo +} diff --git a/src/v1/clients/metrics/statistical.go b/src/v1/clients/metrics/statistical.go new file mode 100644 index 0000000000000000000000000000000000000000..e7b3ef71de8861205f955d989c843a23574e9d73 --- /dev/null +++ b/src/v1/clients/metrics/statistical.go @@ -0,0 +1,148 @@ +package metrics + +import ( + "fmt" + "sync" + "time" +) + +// 统计器接口 +type IOpStatistical interface { + //AddReqs(count int64, time int64) //添加请求统计(请求次数,请求时间) + //AddOneReq(time int64) + + IStatistical + RegIndicator(IIndicator) //注册指标器 +} + +// 统计器接口 +type IStatistical interface { + Push(name string, value float64, opts ...StsOpt) IStatistical +} + +//聚合方法 +type AggrMethodType string + +const AggrSum AggrMethodType = "sum" //求和 +const AggrAvg AggrMethodType = "avg" //求平均 +const AggrLast AggrMethodType = "last" //取最后 + +//统计可选项类型 +type StsOpt func(*dataValues) + +//统计使用单位 +func StsWithUnit(unit string) StsOpt { + return func(d *dataValues) { + d.unit = unit + } +} + +//统计使用聚合方法 +func StsWithAggr(aggr AggrMethodType) StsOpt { + return func(d *dataValues) { + d.aggrMethod = aggr + } +} + +//统计值序列 +type dataValues struct { + aggrMethod AggrMethodType + name string + unit string + values []float64 +} + +//计算出的值,最终为一个统计指标 +func (d *dataValues) value() *IndicData { + l := len(d.values) + v := &IndicData{} + if l < 1 { + return v + } + v.Unit = d.unit + v.Name = d.name + + switch d.aggrMethod { + case AggrLast: + //取最后 + v.Value = d.values[l-1] + case AggrAvg: + //求平均 + sum := float64(0) + for _, x := range d.values { + sum += x + } + v.Value = sum / float64(l) + case AggrSum: + //求和 + sum := float64(0) + for _, x := range d.values { + sum += x + } + v.Value = sum + return v + } + return v +} + +//统计器 +type OpStatistical struct { + label string + interval int64 // 统计间隔时间,单位:秒,如:每1秒统计一次 + + sync.RWMutex + + indicDataValues map[string]*dataValues //待统计的指标数据values + indicators []IIndicator //注册的指标上报器数组 + indCh chan map[string]*IndicData //指标管道 +} + +func NewOpStatistical(label string, interval int64) *OpStatistical { + if interval <= 0 { + interval = 1 + } + st := &OpStatistical{ + label: label, + interval: interval, + indCh: make(chan map[string]*IndicData, 20), + } + st.reset() + return st +} + +func (st *OpStatistical) Start() { + go st.run() +} + +func (st *OpStatistical) run() { + //间隔统计 + t := time.NewTicker(time.Second * time.Duration(st.interval)) + + for { + select { + case <-t.C: + // + st.runOnce() + } + } +} + +// +func (st *OpStatistical) runOnce() { + values := st.reset() + + m := map[string]*IndicData{} + for k, data := range values { + m[k] = data.value() + fmt.Printf("now st: %s,%+v", k, m[k]) + } + st.indCh <- m +} + +func (st *OpStatistical) reset() map[string]*dataValues { + st.Lock() + values := st.indicDataValues + st.indicDataValues = make(map[string]*dataValues) + st.Unlock() + return values +} \ No newline at end of file diff --git a/src/v1/config/rpc.go b/src/v1/config/rpc.go index 3f30dedda7700698931fd7965753dcdc221eeab7..1b7d26c2e17d99aca7ed719457d65715497aa4e9 100644 --- a/src/v1/config/rpc.go +++ b/src/v1/config/rpc.go @@ -9,18 +9,16 @@ const RpcByHttp = "http" const RpcByGRpc = "grpc" //服务端grpc配置 -type GRpcServerConfig struct { +type RpcServerConfig struct { RpcLisAddr string `yaml:"RpcLisAddr"` //grpc启动端口,如 ":4001" - RpcRegWay string `yaml:"RpcRegWay"` //服务注册方式:none/etcd + RpcRegWay string `yaml:"RpcRegWay"` //服务注册方式:none/etcd RpcRegServer string `yaml:"RpcRegServer"` //服务注册服务端地址,如etcd地址 + RpcRequestAddr string `yaml:"RpcRequestAddr"` //服务注册服务端地址,如etcd地址 } -//客户端rpc配置 type RpcClientConfig struct { - RpcWay string `yaml:"RpcWay"` //rpc服务方式,如:http/grpc - RpcTimeout int64 `yaml:"RpcTimeout"` //rpc超时时间,单位:秒 - RpcAddr string `yaml:"RpcAddr"` //rpc请求地址 - + RpcServer string `yaml:"RpcServer"` //rpc访问地址 + RpcTimeout int64 `yaml:"RpcTimeout"` //rpc访问超时时间 RpcDisWay string `yaml:"RpcDisWay"` //服务发现方式:none/etcd - RpcDisServer string `yaml:"RpcDisServer"` //服务发现服务端地址,如etcd地址 + RpcDisServer string `yaml:"RpcDisServer"` //服务发现方式:none/etcd } diff --git a/src/v1/grpcserver/server_grpc.go b/src/v1/grpcserver/server_grpc.go index cd33703f81767223e8243ddce7328e377c7eb990..b2e09f05030cfed91a25d3264ebbb358597a25f9 100644 --- a/src/v1/grpcserver/server_grpc.go +++ b/src/v1/grpcserver/server_grpc.go @@ -2,8 +2,9 @@ package grpcserver import ( "gitee.com/scottq/go-framework/src/utils" + "gitee.com/scottq/go-framework/src/v1/config" + "gitee.com/scottq/go-framework/src/v1/log" "google.golang.org/grpc" - "log" "net" "strings" ) @@ -17,20 +18,22 @@ type GRPCServer struct { HandlerFunc HandlerFunc serviceRegister IServiceRegister - - RequestIp string - RequestPort string + conf *config.RpcServerConfig + log.InvokeLog } -func NewGRPCServer(name string, addr string, ops ...ServerOps) (*GRPCServer, error) { +func NewGRPCServer(name string, conf *config.RpcServerConfig) (*GRPCServer, error) { server := &GRPCServer{ name: name, - listenAddr: addr, + listenAddr: conf.RpcLisAddr, s: grpc.NewServer(), + conf: conf, } - for i, _ := range ops { - ops[i](server) + switch conf.RpcRegWay { + case config.RpcRegByEtcd: + etcd, _ := NewEtcdSrvRegister(conf.RpcRegServer) + server.AddServiceRegister(etcd) } return server, nil @@ -53,16 +56,16 @@ func (svr *GRPCServer) Run() error { func (svr *GRPCServer) run() error { lis, err := net.Listen("tcp", svr.listenAddr) if err != nil { - log.Fatalf("failed to listen: %v", err) + svr.Fatal("failed to listen: %v", err) } if svr.HandlerFunc != nil { svr.HandlerFunc(svr.s) } - log.Printf("[%s]rpc running at %s\n", svr.name, svr.listenAddr) + svr.Info("[%s]rpc running at %s\n", svr.name, svr.listenAddr) if err := svr.s.Serve(lis); err != nil { - log.Fatalf("failed to serve: %v", err) + svr.Fatal("failed to serve: %v", err) return err } @@ -70,23 +73,19 @@ func (svr *GRPCServer) run() error { } func (svr *GRPCServer) registerService() error { - var err error - RequestIp := svr.RequestIp - RequestPort := svr.RequestPort + regServer := svr.conf.RpcRequestAddr - if RequestIp == "" { - RequestIp, err = utils.GetLocalIP() + if regServer == "" { + RequestIp, err := utils.GetLocalIP() if err != nil { return err } - } - if RequestPort == "" { - RequestPort = strings.Split(svr.listenAddr, ":")[1] + RequestPort := strings.Split(svr.listenAddr, ":")[1] + regServer = RequestIp + ":" + RequestPort } if svr.serviceRegister != nil { - regServer := RequestIp + ":" + RequestPort - log.Printf("[%s]rpc register addr %s\n", svr.name, regServer) + svr.Info("[%s]rpc register addr %s\n", svr.name, regServer) return svr.serviceRegister.Register(svr.name, regServer) } diff --git a/src/v1/grpcserver/server_grpc_opt.go b/src/v1/grpcserver/server_grpc_opt.go deleted file mode 100644 index eecb9047cb3b90706962c6b8ed1729489fae23b7..0000000000000000000000000000000000000000 --- a/src/v1/grpcserver/server_grpc_opt.go +++ /dev/null @@ -1,22 +0,0 @@ -package grpcserver - -type ServerOps = func(*GRPCServer) - -var WithOpsRequestIp = func(ip string) ServerOps { - return func(server *GRPCServer) { - server.RequestIp = ip - } -} - -var WithOpsRequestPort = func(port string) ServerOps { - return func(server *GRPCServer) { - server.RequestPort = port - } -} - -var WithOpsHandler = func(h HandlerFunc) ServerOps { - return func(server *GRPCServer) { - server.HandlerFunc = h - } -} - diff --git a/src/v1/libs/lockers/ifs.go b/src/v1/libs/lockers/ifs.go new file mode 100644 index 0000000000000000000000000000000000000000..29849fb812140cabe443ea9b376ecf6ee8cef53d --- /dev/null +++ b/src/v1/libs/lockers/ifs.go @@ -0,0 +1,8 @@ +package lockers + +type DistributeLocker interface { + Lock(key string, ttl int64) (bool, error) + Release(key string) error + Delay(key string, ttl int64) error + TTL(key string) (int64, error) +} diff --git a/src/v1/libs/lockers/redis.go b/src/v1/libs/lockers/redis.go new file mode 100644 index 0000000000000000000000000000000000000000..93042b85d7e1ce976cc45183a43721d07d2a01f8 --- /dev/null +++ b/src/v1/libs/lockers/redis.go @@ -0,0 +1,55 @@ +package lockers + +import ( + "fmt" + "github.com/go-redis/redis/v7" + "os" + "time" +) + +type RedisDisLocker struct { + rds *redis.Client + lockValue string +} + +func NewRedisDisLocker(rds *redis.Client) (DistributeLocker, error) { + name, err := os.Hostname() + if err != nil { + name = fmt.Sprintf("unknow_%d", time.Now().UnixNano()) + } + return &RedisDisLocker{ + rds: rds, + lockValue: "locked_on_" + name, + }, nil +} + +// ttl: 秒级 +func (l *RedisDisLocker) Lock(key string, ttl int64) (bool, error) { + _, err := l.rds.Do("set", key, l.lockValue, "NX", "EX", ttl).Result() + if err == redis.Nil { + return false, nil + } else if err != nil { + return false, err + } + return true, nil +} + +func (l *RedisDisLocker) Release(key string) error { + luaDelete := redis.NewScript(`if redis.call("get", KEYS[1]) == ARGV[1] then return redis.call("del", KEYS[1]) else return 0 end`) + _, err := luaDelete.Run(l.rds, []string{key}, l.lockValue).Result() + return err +} + +// ttl: 秒级 +func (l *RedisDisLocker) Delay(key string, ttl int64) error { + luaRefresh := redis.NewScript(`if redis.call("get", KEYS[1]) == ARGV[1] then return redis.call("pexpire", KEYS[1], ARGV[2]) else return 0 end`) + _, err := luaRefresh.Run(l.rds, []string{key}, l.lockValue, ttl*1000).Result() + return err +} + +// 返回 毫秒级 +func (l *RedisDisLocker) TTL(key string) (int64, error) { + luaPTTL := redis.NewScript(`if redis.call("get", KEYS[1]) == ARGV[1] then return redis.call("pttl", KEYS[1]) else return -3 end`) + t, err := luaPTTL.Run(l.rds, []string{key}, l.lockValue).Result() + return t.(int64), err +} diff --git a/src/v1/libs/lockers/redis_test.go b/src/v1/libs/lockers/redis_test.go new file mode 100644 index 0000000000000000000000000000000000000000..271703366baad507f1c60f2654f3b134efe17381 --- /dev/null +++ b/src/v1/libs/lockers/redis_test.go @@ -0,0 +1,106 @@ +package lockers + +import ( + "fmt" + "github.com/go-redis/redis/v7" + "sync" + "testing" + "time" +) + +func TestRedisDisLocker_Lock(t *testing.T) { + + rds := getRedisClient("127.0.0.1:6379") + locker, err := NewRedisDisLocker(rds) + if err != nil { + t.Fatalf("redis conn fail: %s", err) + } + lockKey := "lock_1" + total := 0 + max := 10 + + var wg sync.WaitGroup + // 抢锁测试 + // 本地维护一个total,启动多协程对此值进行累加 + // 打印流程,并对结果校验 + testFunc := func(tag string) { + for { + ok, err := locker.Lock(lockKey, 30) + if err != nil { + t.Fatalf("lock fail:%s", err) + break + } + if !ok { + continue + } + if total >= max { + err = locker.Release(lockKey) + if err != nil { + t.Fatalf("release fail:%s", err) + break + } + break + } + t.Logf("total: %s,%d", tag, total) + time.Sleep(time.Second) //加上sleep可以看到明显的流程 + total++ + err = locker.Release(lockKey) + if err != nil { + t.Fatalf("release fail:%s", err) + break + } + } + } + for i := 1; i <= 10; i++ { + x := i + wg.Add(1) + go func() { + defer wg.Done() + testFunc(fmt.Sprintf("tag_%d", x)) + }() + } + wg.Wait() + + if total != max { + t.Fatalf("locker verify fail:%d", total) + } +} + +func TestRedisDisLocker_Delay(t *testing.T) { + + rds := getRedisClient("127.0.0.1:6379") + locker, err := NewRedisDisLocker(rds) + if err != nil { + t.Fatalf("redis conn fail: %s", err) + } + tl := time.Minute * 10 + ttl := int64(tl / time.Second) + lockKey := "lock_2" + + println(lockKey,ttl) + + if ok, err := locker.Lock(lockKey, ttl); err != nil { + t.Fatalf("lock error:%s", err) + } else if !ok { + t.Fatalf("lock fail:%s", err) + } + + for i := 1; i <= 10; i++ { + err := locker.Delay(lockKey, ttl) + if err != nil { + t.Fatalf("delay err:%s", err) + continue + } + t.Logf("delay success") + time.Sleep(time.Second * 3) + } +} +func getRedisClient(addr string) *redis.Client { + opt := &redis.Options{ + Addr: addr, + Password: "", // no password set + DB: 0, // use default DB + } + + return redis.NewClient(opt) +}