diff --git a/examples/dbs/run_mysql.go b/examples/dbs/run_mysql.go new file mode 100644 index 0000000000000000000000000000000000000000..9dce43eb73f07ab23a0f50adf0b7972753f8104e --- /dev/null +++ b/examples/dbs/run_mysql.go @@ -0,0 +1,59 @@ +package main + +import ( + "gitee.com/scottq/go-framework/examples_stc" + v1config "gitee.com/scottq/go-framework/src/v1/config" + "log" + "time" +) + +func main() { + c := v1config.DBConfig{ + DbHost: "127.0.0.1", + DbPort: "3306", + DbName: "db_test", + DbUser: "root", + DbPassword: "root", + + MaxLifetime: 30 * int(time.Minute), + MaxIdleTime: 30 * int(time.Minute), + MaxOpenConns: 256, + MaxIdleConns: 80, + MaxConcatLen: "18446744073709551615", + } + log.Printf("config: %+v", c) + op, err := examples_stc.NewOpMysql(c) + if err != nil { + log.Fatalf("new op mysql err:%s", err) + return + } + + list, total, err := op.Query() + if err != nil { + log.Fatalf("query err:%s", err) + return + } + log.Printf("list: %v", list) + log.Printf("total: %d", total) + + + list, total, err = op.Query() + if err != nil { + log.Fatalf("query err:%s", err) + return + } + log.Printf("list: %v", list) + log.Printf("total: %d", total) + + // + newOp, err := op.BeginTrans() + list, total, err = newOp.Query() + if err != nil { + log.Fatalf("query err:%s", err) + return + } + defer newOp.RollbackTrans() + log.Printf("list: %v", list) + log.Printf("total: %d", total) + +} diff --git a/examples/run_grpc/client_grpc.go b/examples/run_grpc/client_grpc.go index 6a9abd82fdf18766948ad2cdcd3cb788aadfc51f..e76ea50971a80815d2fe0c1935d6a90eff13905f 100644 --- a/examples/run_grpc/client_grpc.go +++ b/examples/run_grpc/client_grpc.go @@ -5,6 +5,7 @@ import ( "fmt" "gitee.com/scottq/go-framework/grpcProtos/pb" v1clientrpc "gitee.com/scottq/go-framework/src/v1/clients/grpc" + v1config "gitee.com/scottq/go-framework/src/v1/config" v1log "gitee.com/scottq/go-framework/src/v1/log" "google.golang.org/grpc" "os" @@ -16,13 +17,24 @@ func main() { logPath := fmt.Sprintf("./runtime/logs/%s.log", filepath.Base(os.Args[0])) logger := v1log.NewZapLog("example", logPath, nil) + c := v1config.RpcClientConfig{ + RpcWay: "grpc", + RpcAddr: "127.0.0.1:40001", // RpcDisWay=none 时直连rpc + RpcDisWay: "etcd", + RpcDisServer: "127.0.0.1:2379", + RpcTimeout: 3, + } + opts := []v1clientrpc.RemoteOption{} - if false { - opts = append(opts, v1clientrpc.OptEtcdDiscovery("admin", "127.0.0.1:2379")) + switch c.RpcDisWay { + case v1config.RpcRegByEtcd: + opts = append(opts, + v1clientrpc.OptEtcdDiscovery(c.RpcDisServer), + v1clientrpc.OptConnTimeout(c.RpcTimeout)) } - conn, err := v1clientrpc.NewRemoteConn("127.0.0.1:40001", opts..., ) + conn, err := v1clientrpc.NewRemoteConn("admin", c.RpcAddr, opts...) if err != nil { logger.Error("new remote conn err:%s", err) return @@ -31,9 +43,9 @@ func main() { info := conn.GetState() logger.Info(info.String()) - c := NewRemoteClient(conn) + client := NewRemoteClient(conn) - resp, err := c.CheckAuth("12334") + resp, err := client.CheckAuth("12334") if err != nil { logger.Error("fail:%s", err) return diff --git a/examples/run_grpc/server_grpc.go b/examples/run_grpc/server_grpc.go index b41c22184ad949004e6b6df8e8944db2aa8e164c..c00fcbbb3fb1919595d3f3fa40341959d88e0751 100644 --- a/examples/run_grpc/server_grpc.go +++ b/examples/run_grpc/server_grpc.go @@ -2,8 +2,10 @@ package main import ( "context" + "flag" "fmt" "gitee.com/scottq/go-framework/grpcProtos/pb" + v1config "gitee.com/scottq/go-framework/src/v1/config" v1rpc "gitee.com/scottq/go-framework/src/v1/grpcserver" v1log "gitee.com/scottq/go-framework/src/v1/log" "google.golang.org/grpc" @@ -11,21 +13,42 @@ import ( "path/filepath" ) +var port = "" + +func init() { + flag.StringVar(&port, "port", "40001", "rpc lis port") +} + func main() { + flag.Parse() + logPath := fmt.Sprintf("./runtime/logs/%s.log", filepath.Base(os.Args[0])) logger := v1log.NewZapLog("example", logPath, nil) - server, err := v1rpc.NewGRPCServer("", ":40001") + c := v1config.GRpcServerConfig{ + RpcLisAddr: ":" + port, + RpcRegWay: "etcd", + RpcRegServer: "0.0.0.0:2379", + } + + requestIp:="127.0.0.1" + rpcPort:="" + + server, err := v1rpc.NewGRPCServer("admin", c.RpcLisAddr, + v1rpc.WithOpsHandler(func(server *grpc.Server) { + s := NewDemoServer() + pb.RegisterUserSrServer(server, s) + }), + v1rpc.WithOpsRequestIp(requestIp), + v1rpc.WithOpsRequestPort(rpcPort), + ) if err != nil { logger.Error("run server error:%s" + err.Error()) return } - server.HandlerFunc = func(server *grpc.Server) { - s := NewDemoServer() - pb.RegisterUserSrServer(server, s) - } - if false { - etcd, _ := v1rpc.NewEtcdSrvRegister("0.0.0.0:2379") + switch c.RpcRegWay { + case v1config.RpcRegByEtcd: + etcd, _ := v1rpc.NewEtcdSrvRegister(c.RpcRegServer) server.AddServiceRegister(etcd) } diff --git a/examples_stc/db_mysql.go b/examples_stc/db_mysql.go new file mode 100644 index 0000000000000000000000000000000000000000..ae371d729c689a2e5724e5c5fd08ed617211a72a --- /dev/null +++ b/examples_stc/db_mysql.go @@ -0,0 +1,78 @@ +package examples_stc + +import ( + "gitee.com/scottq/go-framework/src/v1/clients/dbs" + v1config "gitee.com/scottq/go-framework/src/v1/config" +) + +type OpMysql struct { + dbs.IDBMysql +} + +func NewOpMysql(c v1config.DBConfig) (*OpMysql, error) { + db, err := dbs.NewDBMysql(c) + if err != nil { + return nil, err + } + return &OpMysql{ + db, + }, nil +} + +func (op *OpMysql) Query() ([]map[string]interface{}, int64, error) { + rows, total, err := op.ExecuteSearch("admin", []string{ + "id", + "created_at", + "nickname", + "account", + }, []string{ + "id>1", + }, []interface{}{ + }, []string{ + "id desc", + }, 1, 10) + if err != nil { + return nil, 0, err + } + + list := []map[string]interface{}{} + for rows.Next() { + id := 0 + createdAt := "" + nickname := "" + account := "" + + err := rows.Scan(&id, &createdAt, &nickname, &account) + if err != nil { + return nil, 0, err + } + list = append(list, map[string]interface{}{ + "id": id, + "created_at": createdAt, + "nickname": nickname, + "account": account, + }) + } + err = rows.Close() + if err != nil { + return nil, 0, err + } + return list, total, nil +} + +func (op *OpMysql) BeginTrans() (*OpMysql, error) { + var err error + + newOp := &OpMysql{} + newOp.IDBMysql, err = op.IDBMysql.BeginTrans() + + return newOp, err +} + +func (op *OpMysql) CommitTrans() error { + return op.IDBMysql.CommitTrans() +} + +func (op *OpMysql) RollbackTrans() error { + return op.IDBMysql.RollbackTrans() +} \ No newline at end of file diff --git a/examples_stc/http_handler.go b/examples_stc/http_handler.go new file mode 100644 index 0000000000000000000000000000000000000000..7613d214416dcb4a199544d78c2e62b1dc31b0ed --- /dev/null +++ b/examples_stc/http_handler.go @@ -0,0 +1,7 @@ +package examples_stc + +import v1http "gitee.com/scottq/go-framework/src/v1/httpserver" + +func RouteHelloWord(ctx *v1http.Ctx) { + ctx.WriteStr("Hello World") +} diff --git a/src/v1/clients/dbs/db_mysql.go b/src/v1/clients/dbs/db_mysql.go new file mode 100644 index 0000000000000000000000000000000000000000..7c4fd0a02dd3c60caa7b158ba05aa82b86875737 --- /dev/null +++ b/src/v1/clients/dbs/db_mysql.go @@ -0,0 +1,313 @@ +package dbs + +import ( + "database/sql" + "fmt" + v1config "gitee.com/scottq/go-framework/src/v1/config" + "github.com/go-sql-driver/mysql" + "log" + "net" + "strings" + "time" +) + +type IDBMysql interface { + DB() *sql.DB + TX() *sql.Tx + + ExecuteSearch(tableName string, fields []string, whereArr []string, whereArgs []interface{}, orderBy []string, pageNum, pageSize int64) (*sql.Rows, int64, error) + ExecuteQuery(tableName string, fields map[string]interface{}, whereArr []string, whereArgs []interface{}, orderBy []string) (bool, error) + ExecuteCreate(tableName string, fields map[string]interface{}) (int64, error) + ExecuteUpdate(tableName string, fields map[string]interface{}, whereArr []string, whereArgs []interface{}) (int64, error) + + BeginTrans() (IDBMysql, error) + CommitTrans() error + RollbackTrans() error +} +type DBMysql struct { + Edb *sql.DB + Etx *sql.Tx +} + +func (d *DBMysql) DB() *sql.DB { + return d.Edb +} + +func (d *DBMysql) TX() *sql.Tx { + return d.Etx +} + +func NewDBMysql(c v1config.DBConfig) (*DBMysql, error) { + dbConfig := mysql.NewConfig() + dbConfig.User = c.DbUser + dbConfig.Passwd = c.DbPassword + dbConfig.Net = "tcp" + dbConfig.Addr = net.JoinHostPort(c.DbHost, c.DbPort) + dbConfig.DBName = c.DbName + dbConfig.MultiStatements = true + dbConfig.RejectReadOnly = false + extParam := make(map[string]string) + if c.MaxConcatLen != "" { + extParam["group_concat_max_len"] = c.MaxConcatLen + } + dbConfig.Params = extParam + + newDb, err := sql.Open("mysql", dbConfig.FormatDSN()) + if err != nil { + log.Fatalf("connect to db %s failed", dbConfig.FormatDSN()) + return nil, err + } + if c.MaxIdleConns > 0 { + //预留并发链接数 + newDb.SetMaxIdleConns(c.MaxIdleConns) + } + if c.MaxOpenConns > 0 { + //最大支持链接 + newDb.SetMaxOpenConns(c.MaxOpenConns) + } + if c.MaxLifetime > 0 { + //每个链接最大生存时间 + newDb.SetConnMaxLifetime(time.Duration(c.MaxLifetime)) + } + if c.MaxIdleTime > 0 { + //每个链接最大空闲时间 + newDb.SetConnMaxIdleTime(time.Duration(c.MaxIdleTime)) + } + + return &DBMysql{ + Edb: newDb, + }, nil +} + +func (d *DBMysql) ExecuteSearch(tableName string, fields []string, whereArr []string, whereArgs []interface{}, orderBy []string, pageNum, pageSize int64) (*sql.Rows, int64, error) { + var err error + var total int64 + + if pageNum <= 0 { + pageNum = 1 + } + + if pageSize <= 0 || pageSize > 1000 { + pageSize = 1000 + } + + whereStr := "" + if len(whereArr) > 0 { + whereStr = "WHERE " + strings.Join(whereArr, " AND ") + } + + countSql := fmt.Sprintf( + "SELECT COUNT(id) AS total FROM `%s` %s LIMIT 1", + tableName, whereStr) + + db := d.DB() + tx := d.TX() + + //total + var stmt *sql.Stmt + if tx != nil { + stmt, err = tx.Prepare(countSql) + } else { + stmt, err = db.Prepare(countSql) + } + if err != nil { + return nil, total, err + } + + row := stmt.QueryRow(whereArgs...) + err = row.Scan(&total) + if err == sql.ErrNoRows { + return nil, 0, nil + } else if err != nil { + return nil, total, err + } + if err:=stmt.Close();err!=nil{ + return nil,0,err + } + + if len(fields) <= 0 { + fields = append(fields, "*") + } + fieldsStr := strings.Join(fields, ",") + orderByStr := strings.Join(orderBy, ",") + if orderByStr != "" { + orderByStr = "ORDER BY " + orderByStr + } + searchSql := fmt.Sprintf( + "SELECT %s FROM `%s` %s %s LIMIT ? OFFSET ?", + fieldsStr, tableName, whereStr, orderByStr) + + var stmt1 *sql.Stmt + if tx != nil { + stmt1, err = tx.Prepare(searchSql) + } else { + stmt1, err = db.Prepare(searchSql) + } + if err != nil { + return nil, total, err + } + + whereArgs = append(whereArgs, pageSize) + whereArgs = append(whereArgs, pageSize*(pageNum-1)) + + rows, err := stmt1.Query(whereArgs...) + if err != nil { + return nil, total, err + } + + return rows, total, nil +} + +func (d *DBMysql) ExecuteQuery(tableName string, fields map[string]interface{}, whereArr []string, whereArgs []interface{}, orderBy []string) (bool, error) { + var err error + + var fieldArr = []string{} + var scanArr = []interface{}{} + for k, v := range fields { + fieldArr = append(fieldArr, fmt.Sprintf("`%s`", k)) + scanArr = append(scanArr, v) + } + + whereStr := "" + if len(whereArr) > 0 { + whereStr = "WHERE " + strings.Join(whereArr, " AND ") + } + fieldStr := strings.Join(fieldArr, ",") + + orderByStr := strings.Join(orderBy, ",") + if orderByStr != "" { + orderByStr = "ORDER BY " + orderByStr + } + selectSql := fmt.Sprintf("SELECT %s FROM %s %s %s LIMIT 1", + fieldStr, tableName, whereStr, orderByStr) + + db := d.DB() + tx := d.TX() + + var stmt *sql.Stmt + if tx != nil { + stmt, err = tx.Prepare(selectSql) + } else { + stmt, err = db.Prepare(selectSql) + } + if err != nil { + return false, err + } + defer stmt.Close() + + row := stmt.QueryRow(whereArgs...) + err = row.Scan(scanArr...) + if err == sql.ErrNoRows { + return false, nil + } else if err != nil { + return false, err + } + + return true, nil +} + +func (d *DBMysql) ExecuteCreate(tableName string, fields map[string]interface{}) (int64, error) { + var err error + + var fieldArr = []string{} + var valueArr = []interface{}{} + for k, v := range fields { + fieldArr = append(fieldArr, fmt.Sprintf("`%s`=?", k)) + valueArr = append(valueArr, v) + } + + fieldStr := strings.Join(fieldArr, ",") + + db := d.DB() + tx := d.TX() + + insertSql := fmt.Sprintf("INSERT INTO %s SET %s", tableName, fieldStr) + var stmt *sql.Stmt + if tx != nil { + stmt, err = tx.Prepare(insertSql) + } else { + stmt, err = db.Prepare(insertSql) + } + + if err != nil { + return 0, err + } + defer stmt.Close() + + ret, err := stmt.Exec(valueArr...) + if err != nil { + return 0, err + } + + return ret.LastInsertId() +} + +func (d *DBMysql) ExecuteUpdate(tableName string, fields map[string]interface{}, whereArr []string, whereArgs []interface{}) (int64, error) { + var err error + + var fieldArr = []string{} + var valueArr = []interface{}{} + for k, v := range fields { + fieldArr = append(fieldArr, fmt.Sprintf("`%s`=?", k)) + valueArr = append(valueArr, v) + } + + fieldStr := strings.Join(fieldArr, ",") + + whereStr := "" + if len(whereArr) > 0 { + whereStr = "WHERE " + strings.Join(whereArr, " AND ") + } + + if len(whereArgs) > 0 { + for _, v := range whereArgs { + valueArr = append(valueArr, v) + } + } + db := d.DB() + tx := d.TX() + + updateSql := fmt.Sprintf("UPDATE %s SET %s %s", tableName, fieldStr, whereStr) + var stmt *sql.Stmt + if tx != nil { + stmt, err = tx.Prepare(updateSql) + } else { + stmt, err = db.Prepare(updateSql) + } + if err != nil { + return 0, err + } + defer stmt.Close() + + ret, err := stmt.Exec(valueArr...) + if err != nil { + return 0, err + } + + return ret.RowsAffected() +} + +func (d *DBMysql) BeginTrans() (IDBMysql, error) { + tx, err := d.Edb.Begin() + if err != nil { + return nil, err + } + return &DBMysql{ + Edb: nil, + Etx: tx, + }, nil +} + +func (d *DBMysql) CommitTrans() error { + if d.Etx == nil { + return fmt.Errorf("not begin trans") + } + return d.Etx.Commit() +} + +func (d *DBMysql) RollbackTrans() error { + if d.Etx == nil { + return fmt.Errorf("not begin trans") + } + return d.Etx.Rollback() +} diff --git a/src/v1/clients/etcd/service_discovery.go b/src/v1/clients/etcd/service_discovery.go index 9472f38061bba4c167379eddcf0a6f74c855d7f5..ed38c6f77aaac306a999dc37b060692b6594d662 100644 --- a/src/v1/clients/etcd/service_discovery.go +++ b/src/v1/clients/etcd/service_discovery.go @@ -6,6 +6,7 @@ import ( clientv3 "go.etcd.io/etcd/client/v3" "google.golang.org/grpc/attributes" "google.golang.org/grpc/resolver" + "log" "strings" "sync" "time" @@ -94,7 +95,10 @@ func (sdv *ServiceDiscovery) syncCC() { addr = rInfo.SrInvoke } - addresses = append(addresses, resolver.Address{Addr: addr, Attributes: attrs}) + addresses = append(addresses, resolver.Address{ + Addr: addr, + Attributes: attrs, + }) } //sdv.cc.NewAddress(addresses) sdv.cc.UpdateState(resolver.State{ @@ -121,8 +125,7 @@ func (sdv *ServiceDiscovery) Scheme() string { } func (sdv *ServiceDiscovery) ResolveNow(options resolver.ResolveNowOptions) { - sdv.log("resolve now") - sdv.log("resolve now:%v", options) + sdv.log("connecting... %+v",options) return } @@ -136,5 +139,5 @@ func (sdv *ServiceDiscovery) log(s string, args ...interface{}) { if !strings.HasSuffix(s, "\n") { s = s + "\n" } - fmt.Printf(s, args...) + log.Printf(s, args...) } diff --git a/src/v1/clients/grpc/client.go b/src/v1/clients/grpc/client.go index 9ce5b06ddad19c0861e3947ad585eec4ce891f21..05fb9d727d5bf8aad3135ef7b1050050bdd32c2f 100644 --- a/src/v1/clients/grpc/client.go +++ b/src/v1/clients/grpc/client.go @@ -14,12 +14,13 @@ import ( type RemoteOption func(*RemoteConn) error type RemoteConn struct { + name string addr string balanceName string connTimeout int64 } -func NewRemoteConn(addr string, options ...RemoteOption) (*grpc.ClientConn, error) { +func NewRemoteConn(name string, addr string, options ...RemoteOption) (*grpc.ClientConn, error) { defer func() { if err := recover(); err != nil { @@ -27,6 +28,7 @@ func NewRemoteConn(addr string, options ...RemoteOption) (*grpc.ClientConn, erro } }() rConn := &RemoteConn{ + name: name, addr: addr, balanceName: "round_robin", connTimeout: 10, @@ -59,9 +61,12 @@ func OptConnTimeout(d int64) RemoteOption { } } -func OptEtcdDiscovery(name, addr string) RemoteOption { - +func OptEtcdDiscovery(addr string) RemoteOption { return func(conn *RemoteConn) error { + if addr == "" { + addr = conn.addr + } + d, err := v1etcd.NewServiceDiscovery(addr, 5) if err != nil { return err @@ -69,9 +74,9 @@ func OptEtcdDiscovery(name, addr string) RemoteOption { resolver.Register(d) b := &WeightBalance{} - balancer.Register(base.NewBalancerBuilder(b.Name(), b, base.Config{HealthCheck: true}), ) + balancer.Register(base.NewBalancerBuilder(b.Name(), b, base.Config{HealthCheck: true})) - conn.addr = fmt.Sprintf("%s://8.8.8.8/%s", d.Scheme(), name) + conn.addr = fmt.Sprintf("%s://8.8.8.8/%s", d.Scheme(), conn.name) conn.balanceName = b.Name() return nil } diff --git a/src/v1/clients/grpc/weight_balance.go b/src/v1/clients/grpc/weight_balance.go index 1e0091b483cfa4568b733b3fb8eceb6900e6cf92..facd056a319c5e14d7c7142adba1eab0753fa809 100644 --- a/src/v1/clients/grpc/weight_balance.go +++ b/src/v1/clients/grpc/weight_balance.go @@ -1,9 +1,9 @@ package grpc import ( - "fmt" "google.golang.org/grpc/balancer" "google.golang.org/grpc/balancer/base" + "log" "sync" ) @@ -24,7 +24,9 @@ func (b *WeightBalance) Build(info base.PickerBuildInfo) balancer.Picker { conns := []balancer.SubConn{} weights := []int64{} + i := 0 for subConn, subInfo := range info.ReadySCs { + i++ conns = append(conns, subConn) //fmt.Println("subConn", subConn) @@ -41,7 +43,7 @@ func (b *WeightBalance) Build(info base.PickerBuildInfo) balancer.Picker { w = 1 } - fmt.Println("weight", w) + log.Printf("rpc discovery %d: %s ,%d", i, subInfo.Address.Addr, weight) weights = append(weights, w) } b.mu.Lock() @@ -69,4 +71,3 @@ func (b *WeightBalance) Pick(info balancer.PickInfo) (balancer.PickResult, error func (b *WeightBalance) Name() string { return "my_weight_round" } - diff --git a/src/v1/config/db.go b/src/v1/config/db.go new file mode 100644 index 0000000000000000000000000000000000000000..7265d3e32f0eb8c1ad1f1d5c2186a9b23cbb4a9c --- /dev/null +++ b/src/v1/config/db.go @@ -0,0 +1,15 @@ +package config + +type DBConfig struct { + DbHost string `yaml:"DbHost"` + DbPort string `yaml:"DbPort"` + DbUser string `yaml:"DbUser"` + DbPassword string `yaml:"DbPassword"` + DbName string `yaml:"DbName"` + + MaxIdleConns int `default:"0" yaml:"MaxIdleConns"` //预留并发链接数 + MaxOpenConns int `default:"0" yaml:"MaxOpenConns"` //最大支持链接 + MaxLifetime int `default:"0" yaml:"ConnMaxLifetime"` //每个链接最大生存时间 + MaxIdleTime int `default:"0" yaml:"ConnMaxIdleTime"` //每个链接最大空闲时间 + MaxConcatLen string `default:"" yaml:"MaxConcatLen"` +} diff --git a/src/v1/config/rpc.go b/src/v1/config/rpc.go new file mode 100644 index 0000000000000000000000000000000000000000..603c4d29523605b2969bb3ae21ae1feb7b1e6172 --- /dev/null +++ b/src/v1/config/rpc.go @@ -0,0 +1,26 @@ +package config + +// rpc注册方式 +const RpcRegByNone = "none" +const RpcRegByEtcd = "etcd" + +// rpc 请求方式 +const RpcByHttp = "http" +const RpcByGRpc = "grpc" + +//服务端grpc配置 +type GRpcServerConfig struct { + RpcLisAddr string `yaml:"RpcLisAddr"` //grpc启动端口,如 ":4001" + RpcRegWay string `yaml:"RpcRegister"` //服务注册方式:none/etcd + RpcRegServer string `yaml:"RpcRegServer"` //服务注册服务端地址,如etcd地址 +} + +//客户端rpc配置 +type RpcClientConfig struct { + RpcWay string `yaml:"RpcWay"` //rpc服务方式 + RpcTimeout int64 `yaml:"RpcTimeout"` //rpc超时时间,单位:秒 + RpcAddr string `yaml:"RpcAddr"` //rpc请求地址 + + RpcDisWay string `yaml:"RpcDisWay"` //服务发现方式:none/etcd + RpcDisServer string `yaml:"RpcDisServer"` //服务发现服务端地址,如etcd地址 +} diff --git a/src/v1/grpcserver/server_grpc.go b/src/v1/grpcserver/server_grpc.go index e3b061146bcf92c12acb5db07a6f4b3f1f5a902c..cd33703f81767223e8243ddce7328e377c7eb990 100644 --- a/src/v1/grpcserver/server_grpc.go +++ b/src/v1/grpcserver/server_grpc.go @@ -5,6 +5,7 @@ import ( "google.golang.org/grpc" "log" "net" + "strings" ) type HandlerFunc = func(*grpc.Server) @@ -21,16 +22,24 @@ type GRPCServer struct { RequestPort string } -func NewGRPCServer(name string, addr string, ) (*GRPCServer, error) { - return &GRPCServer{ +func NewGRPCServer(name string, addr string, ops ...ServerOps) (*GRPCServer, error) { + server := &GRPCServer{ name: name, listenAddr: addr, s: grpc.NewServer(), - }, nil + } + + for i, _ := range ops { + ops[i](server) + } + + return server, nil } func (svr *GRPCServer) AddServiceRegister(r IServiceRegister) { - svr.serviceRegister = r + if r != nil { + svr.serviceRegister = r + } } func (svr *GRPCServer) Run() error { @@ -71,9 +80,14 @@ func (svr *GRPCServer) registerService() error { return err } } + if RequestPort == "" { + RequestPort = strings.Split(svr.listenAddr, ":")[1] + } if svr.serviceRegister != nil { - return svr.serviceRegister.Register(svr.name, RequestIp+":"+RequestPort) + regServer := RequestIp + ":" + RequestPort + log.Printf("[%s]rpc register addr %s\n", svr.name, regServer) + return svr.serviceRegister.Register(svr.name, regServer) } return nil diff --git a/src/v1/grpcserver/server_grpc_dis.go b/src/v1/grpcserver/server_grpc_dis.go index 1f2baf1a00ebd15b6f39bc45ceb97fe9f9ca0810..d76bf39a3a2bec6b34a5f98196a6ff69e5566a5c 100644 --- a/src/v1/grpcserver/server_grpc_dis.go +++ b/src/v1/grpcserver/server_grpc_dis.go @@ -1,7 +1,11 @@ package grpcserver - //服务发现者 接口 -type IServiceDiscovery interface{ - +type IServiceDiscovery interface { + Discovery() +} + +//服务注册者 接口 +type IServiceRegister interface { + Register(name string, addr string) error } diff --git a/src/v1/grpcserver/server_grpc_opt.go b/src/v1/grpcserver/server_grpc_opt.go new file mode 100644 index 0000000000000000000000000000000000000000..eecb9047cb3b90706962c6b8ed1729489fae23b7 --- /dev/null +++ b/src/v1/grpcserver/server_grpc_opt.go @@ -0,0 +1,22 @@ +package grpcserver + +type ServerOps = func(*GRPCServer) + +var WithOpsRequestIp = func(ip string) ServerOps { + return func(server *GRPCServer) { + server.RequestIp = ip + } +} + +var WithOpsRequestPort = func(port string) ServerOps { + return func(server *GRPCServer) { + server.RequestPort = port + } +} + +var WithOpsHandler = func(h HandlerFunc) ServerOps { + return func(server *GRPCServer) { + server.HandlerFunc = h + } +} + diff --git a/src/v1/grpcserver/server_grpc_reg.go b/src/v1/grpcserver/server_grpc_reg.go deleted file mode 100644 index 3d25d27ae7c4328d18e8f637208937c9b1f03926..0000000000000000000000000000000000000000 --- a/src/v1/grpcserver/server_grpc_reg.go +++ /dev/null @@ -1,7 +0,0 @@ -package grpcserver - - -//服务注册者 接口 -type IServiceRegister interface{ - Register(name string,addr string) error -}