diff --git a/examples/run_grpc/client_grpc.go b/examples/run_grpc/client_grpc.go index a3b18bfb5c2e88aeaf4cb30052276acd0b882bbe..614f82b2cd896162e490acf8c32dbf1e004ec938 100644 --- a/examples/run_grpc/client_grpc.go +++ b/examples/run_grpc/client_grpc.go @@ -4,8 +4,8 @@ import ( "context" "fmt" "gitee.com/scottq/go-framework/grpcProtos/pb" - v1clientrpc "gitee.com/scottq/go-framework/src/v1/clients/grpc" v1config "gitee.com/scottq/go-framework/src/v1/config" + v1clientrpc "gitee.com/scottq/go-framework/src/v1/libs/clients/grpc" v1log "gitee.com/scottq/go-framework/src/v1/log" "google.golang.org/grpc" "os" diff --git a/examples_stc/db_mysql.go b/examples_stc/db_mysql.go index 40e619478f9cca78b80b97004714e9b81450e962..00e088fcca35a37fe5d8ec134951d923771682a0 100644 --- a/examples_stc/db_mysql.go +++ b/examples_stc/db_mysql.go @@ -2,8 +2,8 @@ package examples_stc import ( "database/sql" - "gitee.com/scottq/go-framework/src/v1/clients/dbs" v1config "gitee.com/scottq/go-framework/src/v1/config" + "gitee.com/scottq/go-framework/src/v1/libs/clients/dbs" ) type OpMysql struct { diff --git a/go.mod b/go.mod index 9f63a5c0e7209e9393c0a6840aba87a9a0edbeb9..2bc7ac8acf95beb152ff8cfd43f25d1e5d7d3f27 100644 --- a/go.mod +++ b/go.mod @@ -6,10 +6,15 @@ require ( github.com/dgrijalva/jwt-go v3.2.0+incompatible github.com/gin-gonic/gin v1.8.1 github.com/go-redis/redis/v7 v7.4.1 - github.com/go-sql-driver/mysql v1.6.0 + github.com/go-sql-driver/mysql v1.7.0 + github.com/google/uuid v1.1.2 + github.com/gorilla/websocket v1.5.0 + github.com/jinzhu/gorm v1.9.16 github.com/julienschmidt/httprouter v1.3.0 github.com/natefinch/lumberjack v2.0.0+incompatible + github.com/rfyiamcool/backoff v1.1.0 github.com/satori/go.uuid v1.2.0 + github.com/shopspring/decimal v1.3.1 go.etcd.io/etcd/client/v3 v3.5.0 go.uber.org/zap v1.17.0 golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4 @@ -17,6 +22,8 @@ require ( google.golang.org/grpc v1.40.0 google.golang.org/protobuf v1.28.0 gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b + gorm.io/driver/mysql v1.4.5 + gorm.io/gorm v1.23.8 ) require ( @@ -29,6 +36,8 @@ require ( github.com/goccy/go-json v0.9.7 // indirect github.com/gogo/protobuf v1.3.2 // indirect github.com/golang/protobuf v1.5.2 // indirect + github.com/jinzhu/inflection v1.0.0 // indirect + github.com/jinzhu/now v1.1.5 // indirect github.com/json-iterator/go v1.1.12 // indirect github.com/leodido/go-urn v1.2.1 // indirect github.com/mattn/go-isatty v0.0.14 // indirect diff --git a/go.sum b/go.sum index ee5fcda4134ebd5dc3fc7bc24ed7b70d4d45b110..13d5c9dc7e641a733a4afeabb2394ef4bbd86c45 100644 --- a/go.sum +++ b/go.sum @@ -3,11 +3,13 @@ cloud.google.com/go v0.34.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMT github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU= +github.com/PuerkitoBio/goquery v1.5.1/go.mod h1:GsLWisAFVj4WgDibEWF4pvYnkVQBpKBKeU+7zCJoLcc= github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk541a8SKzHPHnH3zbiI+7dagKZ0cgpgrD7Fyho= +github.com/andybalholm/cascadia v1.1.0/go.mod h1:GsXiBklL0woXo1j/WYWtSYYC4ouU9PqHO0sqidkEA4Y= github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY= github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q= github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8= @@ -27,6 +29,8 @@ github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ3 github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/denisenkom/go-mssqldb v0.0.0-20191124224453-732737034ffd h1:83Wprp6ROGeiHFAP8WJdI2RoxALQYgdllERc3N5N2DM= +github.com/denisenkom/go-mssqldb v0.0.0-20191124224453-732737034ffd/go.mod h1:xbL0rPBG9cCiLr28tMa8zpbdarY27NDyej4t/EjAShU= github.com/dgrijalva/jwt-go v3.2.0+incompatible h1:7qlOGliEKZXTDg6OTjfoBKDXWrumCAMpl/TFQ4/5kLM= github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ= github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk= @@ -37,6 +41,8 @@ github.com/envoyproxy/go-control-plane v0.9.9-0.20201210154907-fd9021fe5dad/go.m github.com/envoyproxy/go-control-plane v0.9.9-0.20210217033140-668b12f5399d/go.mod h1:cXg6YxExXjJnVBQHBLXeUAgxn2UodCpnH306RInaBQk= github.com/envoyproxy/go-control-plane v0.9.9-0.20210512163311-63b5d3c536b0/go.mod h1:hliV/p42l8fGbc6Y9bQ70uLwIvmJyVE5k4iMKlh8wCQ= github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= +github.com/erikstmartin/go-testdb v0.0.0-20160219214506-8d10e4a1bae5 h1:Yzb9+7DPaBjB8zlTR87/ElzFsnQfuHnVUVqpZZIcV5Y= +github.com/erikstmartin/go-testdb v0.0.0-20160219214506-8d10e4a1bae5/go.mod h1:a2zkGnVExMxdzMo3M0Hi/3sEU+cWnZpSni0O6/Yb/P0= github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04= github.com/gin-contrib/sse v0.1.0 h1:Y/yl/+YNO8GZSjAhjMsSuLt29uWRFHdHYUb5lYOV9qE= @@ -59,8 +65,9 @@ github.com/go-playground/validator/v10 v10.10.0 h1:I7mrTYv78z8k8VXa/qJlOlEXn/nBh github.com/go-playground/validator/v10 v10.10.0/go.mod h1:74x4gJWsvQexRdW8Pn3dXSGrTK4nAUsbPlLADvpJkos= github.com/go-redis/redis/v7 v7.4.1 h1:PASvf36gyUpr2zdOUS/9Zqc80GbM+9BDyiJSJDDOrTI= github.com/go-redis/redis/v7 v7.4.1/go.mod h1:JDNMw23GTyLNC4GZu9njt15ctBQVn7xjRfnwdHj/Dcg= -github.com/go-sql-driver/mysql v1.6.0 h1:BCTh4TKNUYmOmMUcQ3IipzF5prigylS7XXjEkfCHuOE= -github.com/go-sql-driver/mysql v1.6.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg= +github.com/go-sql-driver/mysql v1.5.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg= +github.com/go-sql-driver/mysql v1.7.0 h1:ueSltNNllEqE3qcWBTD0iQd3IpL/6U+mJxLkazJ7YPc= +github.com/go-sql-driver/mysql v1.7.0/go.mod h1:OXbVy3sEdcQ2Doequ6Z5BW6fXNQTmx+9S1MCJN5yJMI= github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= github.com/goccy/go-json v0.9.7 h1:IcB+Aqpx/iMHu5Yooh7jEzJk1JZ7Pjtmys2ukPr7EeM= github.com/goccy/go-json v0.9.7/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MGFi0w8I= @@ -68,6 +75,8 @@ github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5x github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ= github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= +github.com/golang-sql/civil v0.0.0-20190719163853-cb61b32ac6fe h1:lXe2qZdvpiX5WZkZR4hgp4KJVfY3nMkvmwbVkpv1rVY= +github.com/golang-sql/civil v0.0.0-20190719163853-cb61b32ac6fe/go.mod h1:8vg3r2VgvsThLBIFL93Qb5yWzgyZWhEmBwUJWevAkK0= github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A= github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= @@ -94,11 +103,22 @@ github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/ github.com/google/go-cmp v0.5.5 h1:Khx7svrCpmxxtHBq5j2mp/xVjsi8hQMfNLvJFAlrGgU= github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= +github.com/google/uuid v1.1.2 h1:EVhdT+1Kseyi1/pUmXKaFxYsDNy9RQYkMWRH68J/W7Y= github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWmnc= +github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0/go.mod h1:8NvIoxWQoOIhqOTXgfV/d3M/q6VIi02HzZEHgUlZvzk= github.com/grpc-ecosystem/grpc-gateway v1.16.0/go.mod h1:BDjrQk3hbvj6Nolgz8mAMFbcEtjT1g+wF4CSlocrBnw= github.com/hpcloud/tail v1.0.0 h1:nfCOvKYfkgYP8hkirhJocXT2+zOD8yUNjXaWfTlyFKI= github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= +github.com/jinzhu/gorm v1.9.16 h1:+IyIjPEABKRpsu/F8OvDPy9fyQlgsg2luMV2ZIH5i5o= +github.com/jinzhu/gorm v1.9.16/go.mod h1:G3LB3wezTOWM2ITLzPxEXgSkOXAntiLHS7UdBefADcs= +github.com/jinzhu/inflection v1.0.0 h1:K317FqzuhWc8YvSVlFMCCUb36O/S9MCKRDI7QkRKD/E= +github.com/jinzhu/inflection v1.0.0/go.mod h1:h+uFLlag+Qp1Va5pdKtLDYj+kHp5pxUVkryuEj+Srlc= +github.com/jinzhu/now v1.0.1/go.mod h1:d3SSVoowX0Lcu0IBviAWJpolVfI5UJVZZ7cO71lE/z8= +github.com/jinzhu/now v1.1.4/go.mod h1:d3SSVoowX0Lcu0IBviAWJpolVfI5UJVZZ7cO71lE/z8= +github.com/jinzhu/now v1.1.5 h1:/o9tlHleP7gOFmsnYNz3RGnqzefHA47wQpKrrdTIwXQ= +github.com/jinzhu/now v1.1.5/go.mod h1:d3SSVoowX0Lcu0IBviAWJpolVfI5UJVZZ7cO71lE/z8= github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX5e0EB2j4= github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU= github.com/json-iterator/go v1.1.10/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4= @@ -123,8 +143,12 @@ github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/leodido/go-urn v1.2.1 h1:BqpAaACuzVSgi/VLzGZIobT2z4v53pjosyNd9Yv6n/w= github.com/leodido/go-urn v1.2.1/go.mod h1:zt4jvISO2HfUBqxjfIshjdMTYS56ZS/qv49ictyFfxY= +github.com/lib/pq v1.1.1 h1:sJZmqHoEaY7f+NPP8pgLB/WxulyR3fewgCM2qaSlBb4= +github.com/lib/pq v1.1.1/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo= github.com/mattn/go-isatty v0.0.14 h1:yVuAays6BHfxijgZPzw+3Zlu5yQgKGP2/hcQbHb7S9Y= github.com/mattn/go-isatty v0.0.14/go.mod h1:7GGIvUiUoEMVVmxf/4nioHXj79iQHKdU27kJ6hsGG94= +github.com/mattn/go-sqlite3 v1.14.0 h1:mLyGNKR8+Vv9CAU7PphKa2hkEqxxhn8i32J6FPj1/QA= +github.com/mattn/go-sqlite3 v1.14.0/go.mod h1:JIl7NbARA7phWnGvh0LKTyg7S9BA+6gx71ShQilpsus= github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0= github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg= @@ -166,12 +190,16 @@ github.com/prometheus/procfs v0.0.0-20181005140218-185b4288413d/go.mod h1:c3At6R github.com/prometheus/procfs v0.0.2/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsTZCD3I8kEA= github.com/prometheus/procfs v0.1.3/go.mod h1:lV6e/gmhEcM9IjHGsFOCxxuZ+z1YqCvr4OA4YeYWdaU= github.com/prometheus/procfs v0.6.0/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1xBZuNvfVA= +github.com/rfyiamcool/backoff v1.1.0 h1:p7J7H2XGPedoD4P0lK7msSnHYz8AW7P/DW6ZIju+oP4= +github.com/rfyiamcool/backoff v1.1.0/go.mod h1:h0fxXIAsQcqeR8s9YLQjoLFOSazfQQCaQC24tL0yXkI= github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ= github.com/rogpeppe/go-internal v1.6.1/go.mod h1:xXDCJY+GAPziupqXw64V24skbSoqbTEfhy4qGm1nDQc= github.com/rogpeppe/go-internal v1.8.0 h1:FCbCCtXNOY3UtUuHUYaghJg4y7Fd14rXifAYUAtL9R8= github.com/rogpeppe/go-internal v1.8.0/go.mod h1:WmiCO8CzOY8rg0OYDC4/i/2WRWAB6poM+XZ2dLUbcbE= github.com/satori/go.uuid v1.2.0 h1:0uYX9dsZ2yD7q2RtLRtPSdGDWzjeM3TbMJP9utgA0ww= github.com/satori/go.uuid v1.2.0/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdhQKdks0= +github.com/shopspring/decimal v1.3.1 h1:2Usl1nmF/WZucqkFZhnfFYxxxu8LG21F6nPQBE5gKV8= +github.com/shopspring/decimal v1.3.1/go.mod h1:DKyhrW/HYNuLGql+MJL6WCR6knT2jwCFRcu2hWCYk4o= github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo= github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE= github.com/sirupsen/logrus v1.6.0/go.mod h1:7uNnSEd1DgxDLC74fIahvMZmmYsHGZGEOFrfsX/uA88= @@ -208,7 +236,9 @@ go.uber.org/zap v1.17.0 h1:MTjgFu6ZLKvY6Pvaqk97GlxNBuMpV4Hy/3P6tRGlI2U= go.uber.org/zap v1.17.0/go.mod h1:MXVU+bhUf/A7Xi2HNOnopQOrmycQ5Ih87HtOu4q5SSo= golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20190325154230-a5d413f7728c/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= +golang.org/x/crypto v0.0.0-20191205180655-e7c4368fe9dd/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20210711020723-a769d52b0f97 h1:/UOmuWzQfxxo9UtlXMwuQU8CMgg1eZXqTRwkSQJWKOI= golang.org/x/crypto v0.0.0-20210711020723-a769d52b0f97/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= @@ -221,6 +251,7 @@ golang.org/x/mod v0.1.1-0.20191105210325-c90efee705ee/go.mod h1:QqPTAvyqsEbceGzB golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= +golang.org/x/net v0.0.0-20180218175443-cbe0f9307d01/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= @@ -232,7 +263,9 @@ golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn golang.org/x/net v0.0.0-20190613194153-d28f0bde5980/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20190923162816-aa69164e4478/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20200202094626-16171245cfb2/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20200324143707-d3edc9973b7e/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A= golang.org/x/net v0.0.0-20200625001655-4c5254603344/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA= golang.org/x/net v0.0.0-20200822124328-c89045814202/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA= golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= @@ -352,6 +385,10 @@ gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b h1:h8qDotaEPuJATrMmW04NCwg7v22aHH28wwpauUhK9Oo= gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gorm.io/driver/mysql v1.4.5 h1:u1lytId4+o9dDaNcPCFzNv7h6wvmc92UjNk3z8enSBU= +gorm.io/driver/mysql v1.4.5/go.mod h1:SxzItlnT1cb6e1e4ZRpgJN2VYtcqJgqnHxWr4wsP8oc= +gorm.io/gorm v1.23.8 h1:h8sGJ+biDgBA1AD1Ha9gFCx7h8npU7AsLdlkX0n2TpE= +gorm.io/gorm v1.23.8/go.mod h1:l2lP/RyAtc1ynaTjFksBde/O8v9oOGIApu2/xRitmZk= honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= sigs.k8s.io/yaml v1.2.0/go.mod h1:yfXDCHCao9+ENCvLSE62v9VSji2MKu5jeNfTrofGhJc= diff --git a/src/utils/embedfs.go b/src/utils/embedfs.go new file mode 100644 index 0000000000000000000000000000000000000000..efa98913919cf6d9191ebf5761bfd43b924898ca --- /dev/null +++ b/src/utils/embedfs.go @@ -0,0 +1,32 @@ +package utils + +import "embed" + +//ScanEmbedFsDo 扫描文件操作 +func ScanEmbedFsDo(embFs embed.FS, fn func(file string, path string) error) error { + return scanFsDo(embFs, ".", fn) +} + +func scanFsDo(embFs embed.FS, dir string, fn func(file string, path string) error) error { + files, _ := embFs.ReadDir(dir) + for _, f := range files { + //文件路径 + path := dir + "/" + f.Name() + if dir == "." { + path = f.Name() + } + + //目录则继续扫描 + if f.IsDir() { + if err := scanFsDo(embFs, path, fn); err != nil { + return err + } + continue + } + //文件则func处理 + if err := fn(f.Name(), path); err != nil { + return err + } + } + return nil +} diff --git a/src/utils/gorm_test.go b/src/utils/gorm_test.go new file mode 100644 index 0000000000000000000000000000000000000000..5c5f8576c52b5c9a4d9cbd71c0fb0f50c89c53ee --- /dev/null +++ b/src/utils/gorm_test.go @@ -0,0 +1,92 @@ +package utils + +import ( + "github.com/jinzhu/gorm" + "github.com/shopspring/decimal" + "testing" + "time" +) + +func TestScopeCreateSQLv1916(t *testing.T) { + now := time.Now() + type User struct { + Account string + CreatedAt *time.Time + UpdatedAt *time.Time + } + value := &User{ + Account: "admin", + UpdatedAt: &(now), + } + //conn.Create(value) + //conn.SubQuery() + + db := TestingDB() + scope := db.Table("xxx").NewScope(value) + //scope := &gorm.Scope{Value: value} + scope.Set("gorm:insert_option", "ON CONFLICT(id) DO NOTHING") + ScopeCreateSQLv1916(scope) + + t.Logf("SQL: %s,%v", scope.SQL, scope.SQLVars) +} + +func TestScopeCreateSQLv1916_map(t *testing.T) { + now := time.Now() + value := map[string]interface{}{ + "account": "admin", + "updated_at": &now, + } + //conn.Create(value) + //conn.SubQuery() + + db := TestingDB() + scope := db.Table("xxx").NewScope(&value) + //scope := &gorm.Scope{Value: value} + scope.Set("gorm:insert_option", "ON CONFLICT(id) DO NOTHING") + ScopeCreateSQLv1916(scope) + + t.Logf("SQL: %s,%v", scope.SQL, scope.SQLVars) +} + +func TestScopeCreateSQLv1916_skipBindVar(t *testing.T) { + + now := time.Now() + type User struct { + Account string + UpdatedAt *time.Time + } + value := &User{Account: "admin", UpdatedAt: &(now)} + + db := TestingDB() + scope := db.Table("xxx").NewScope(&value) + //scope := &gorm.Scope{Value: value} + scope.InstanceSet("skip_bindvar", true) + scope.Set("gorm:insert_option", "ON CONFLICT(id) DO NOTHING") + ScopeCreateSQLv1916(scope) + + t.Logf("SQL: %s,%v", scope.SQL, scope.SQLVars) +} + +func TestInterfaceToSqlString(t *testing.T) { + now := time.Now() + s := "hello" + sts := struct { + Id int64 `json:"id"` + }{Id: 123} + var nill *int + decm := decimal.NewFromFloat(2.3444) + t.Logf("time: %s", InterfaceToSqlString(now)) + t.Logf("*time: %s", InterfaceToSqlString(&now)) + t.Logf("string: %s", InterfaceToSqlString(s)) + t.Logf("*string: %s", InterfaceToSqlString(&s)) + t.Logf("struct: %s", InterfaceToSqlString(sts)) + t.Logf("*struct: %s", InterfaceToSqlString(&sts)) + t.Logf("nil: %s", InterfaceToSqlString(nill)) + t.Logf("decimal: %s", InterfaceToSqlString(decm)) + t.Logf("*decimal: %s", InterfaceToSqlString(&decm)) +} + +func TestingDB() *gorm.DB { + db, _ := gorm.Open("postgres", "host=127.0.0.1 port=5432 user=postgres dbname=db_test password=root sslmode=disable") + return db +} diff --git a/src/utils/gorm_v1.9.16.go b/src/utils/gorm_v1.9.16.go new file mode 100644 index 0000000000000000000000000000000000000000..f5c1e73c8395202635f08b5b19fcba994fb601e3 --- /dev/null +++ b/src/utils/gorm_v1.9.16.go @@ -0,0 +1,179 @@ +package utils + +import ( + "fmt" + "github.com/jinzhu/gorm" + "reflect" + "strings" + "time" +) + +//ScopeCreateSQLv1916 为scope生成create sql和vars(适用于gorm v1.9.16版本) +// 无法用于map +// scope := db.NewScope(value) +// scope.Set("gorm:insert_option", "ON CONFLICT(id) DO NOTHING") +// ScopeCreateSQLv1916(scope) +// scope.SQL, scope.SQLVars +func ScopeCreateSQLv1916(scope *gorm.Scope) { + var ( + columns, placeholders []string + blankColumnsWithDefaultValue []string + ) + + changeableField := func(scope *gorm.Scope, field *gorm.Field) bool { + if selectAttrs := scope.SelectAttrs(); len(selectAttrs) > 0 { + for _, attr := range selectAttrs { + if field.Name == attr || field.DBName == attr { + return true + } + } + return false + } + + for _, attr := range scope.OmitAttrs() { + if field.Name == attr || field.DBName == attr { + return false + } + } + return true + } + + addExtraSpaceIfExist := func(str string) string { + if str != "" { + return " " + str + } + return "" + } + + for _, field := range scope.Fields() { + if changeableField(scope, field) { + if field.IsNormal && !field.IsIgnored { + if field.IsBlank && field.HasDefaultValue { + blankColumnsWithDefaultValue = append(blankColumnsWithDefaultValue, scope.Quote(field.DBName)) + scope.InstanceSet("gorm:blank_columns_with_default_value", blankColumnsWithDefaultValue) + } else if !field.IsPrimaryKey || !field.IsBlank { + columns = append(columns, scope.Quote(field.DBName)) + placeholders = append(placeholders, scope.AddToVars(field.Field.Interface())) + } + } else if field.Relationship != nil && field.Relationship.Kind == "belongs_to" { + for _, foreignKey := range field.Relationship.ForeignDBNames { + if foreignField, ok := scope.FieldByName(foreignKey); ok && !changeableField(scope, foreignField) { + columns = append(columns, scope.Quote(foreignField.DBName)) + placeholders = append(placeholders, scope.AddToVars(foreignField.Field.Interface())) + } + } + } + } + } + + var ( + returningColumn = "*" + quotedTableName = scope.QuotedTableName() + primaryField = scope.PrimaryField() + extraOption string + insertModifier string + ) + + if str, ok := scope.Get("gorm:insert_option"); ok { + extraOption = fmt.Sprint(str) + } + if str, ok := scope.Get("gorm:insert_modifier"); ok { + insertModifier = strings.ToUpper(fmt.Sprint(str)) + if insertModifier == "INTO" { + insertModifier = "" + } + } + + if primaryField != nil { + returningColumn = scope.Quote(primaryField.DBName) + } + + lastInsertIDOutputInterstitial := scope.Dialect().LastInsertIDOutputInterstitial(quotedTableName, returningColumn, columns) + var lastInsertIDReturningSuffix string + if lastInsertIDOutputInterstitial == "" { + lastInsertIDReturningSuffix = scope.Dialect().LastInsertIDReturningSuffix(quotedTableName, returningColumn) + } + + //特殊处理,支持不绑定参数的sql + if v, ok := scope.InstanceGet("nobind_sql"); ok && v.(bool) == true { + vars := []string{} + for _, va := range scope.SQLVars { + vars = append(vars, interfaceToSqlString(va)) + } + scope.Raw(fmt.Sprintf( + "INSERT%v INTO %v (%v)%v VALUES (%v)%v%v", + addExtraSpaceIfExist(insertModifier), + scope.QuotedTableName(), + strings.Join(columns, ","), + addExtraSpaceIfExist(lastInsertIDOutputInterstitial), + strings.Join(vars, ","), + addExtraSpaceIfExist(extraOption), + addExtraSpaceIfExist(lastInsertIDReturningSuffix), + )) + return + } + + scope.Raw(fmt.Sprintf( + "INSERT%v INTO %v (%v)%v VALUES (%v)%v%v", + addExtraSpaceIfExist(insertModifier), + scope.QuotedTableName(), + strings.Join(columns, ","), + addExtraSpaceIfExist(lastInsertIDOutputInterstitial), + strings.Join(placeholders, ","), + addExtraSpaceIfExist(extraOption), + addExtraSpaceIfExist(lastInsertIDReturningSuffix), + )) +} + +func InterfaceToSqlString(a interface{}) string { + return interfaceToSqlString(a) +} + +//转为sql用字符串 +func interfaceToSqlString(a interface{}) string { + if a == nil { + return "null" + } + t := reflect.ValueOf(a) + + //针对特殊的一些类型 + switch a.(type) { + case time.Time: + a = a.(time.Time).Format(time.RFC3339Nano) + t = reflect.ValueOf(a) + } + + switch t.Kind() { + case reflect.Int: + fallthrough + case reflect.Int8: + fallthrough + case reflect.Int16: + fallthrough + case reflect.Int32: + fallthrough + case reflect.Int64: + fallthrough + case reflect.Uint: + fallthrough + case reflect.Uint8: + fallthrough + case reflect.Uint16: + fallthrough + case reflect.Uint32: + fallthrough + case reflect.Uint64: + return fmt.Sprintf("%d", a) + case reflect.Float32: + fallthrough + case reflect.Float64: + return fmt.Sprintf("%f", a) + case reflect.Ptr: + if t.IsNil() { + return "null" + } + //指针类型的取值后在处理 + return interfaceToSqlString(t.Elem().Interface()) + } + return fmt.Sprintf("'%s'", a) +} diff --git a/src/utils/other.go b/src/utils/other.go index 559f80d4f6d178859f3dd91cb4a9f751ad4266e6..3ca05afb0d043fa8bdb25d3b28ce12f98029dbf0 100644 --- a/src/utils/other.go +++ b/src/utils/other.go @@ -51,11 +51,11 @@ func FmtPageParams(pageNum, pageSize int64) (int64, int64) { //比较版本,-1:小于,0:相等,1:大于 // v1.1.1 compare v1.2.1 func CompareVersion(ver, cVer string) int { - if strings.HasPrefix(ver,"v"){ - ver=ver[1:] + if strings.HasPrefix(ver, "v") { + ver = ver[1:] } - if strings.HasPrefix(cVer,"v"){ - cVer=cVer[1:] + if strings.HasPrefix(cVer, "v") { + cVer = cVer[1:] } verArr := strings.Split(ver, ".") //当前的版本 cVerArr := strings.Split(cVer, ".") //比较的版本 @@ -115,9 +115,9 @@ func Interface2Int64(d interface{}) int64 { var r int64 switch d.(type) { case float32: - r, _ = strconv.ParseInt(fmt.Sprint(d), 10, 64) + r = int64(d.(float32)) case float64: - r, _ = strconv.ParseInt(fmt.Sprint(d), 10, 64) + r = int64(d.(float64)) case int: r = int64(d.(int)) case int8: diff --git a/src/v1/grpcserver/server_grpc_dis_etcd.go b/src/v1/grpcserver/server_grpc_dis_etcd.go index 004592c418c8f6c9cc619a17d0a87133510a3c25..24a2167d95f660cf7b8134c0479b564a64527b5c 100644 --- a/src/v1/grpcserver/server_grpc_dis_etcd.go +++ b/src/v1/grpcserver/server_grpc_dis_etcd.go @@ -1,7 +1,7 @@ package grpcserver import ( - v1etcd "gitee.com/scottq/go-framework/src/v1/clients/etcd" + v1etcd "gitee.com/scottq/go-framework/src/v1/libs/clients/etcd" ) type EtcdRegisterFinder struct { diff --git a/src/v1/handler/db_version_mag_v2.go b/src/v1/handler/db_version_mag_v2.go index 6f78807913545fdbe3f4694653b6cbfddedcaf0a..1a89e3e1a6de0421c679c3f7fcb8643a6eeb9419 100644 --- a/src/v1/handler/db_version_mag_v2.go +++ b/src/v1/handler/db_version_mag_v2.go @@ -90,23 +90,16 @@ func (mag *VersionMagV2) AppendSqlContent(version string, content string) { } func (mag *VersionMagV2) AppendEmbedFS(fss embed.FS) error { - files, err := fss.ReadDir(".") - if err != nil { - return err - } - for _, f := range files { - if f.IsDir() { - continue - } - bytes, err := fss.ReadFile(f.Name()) + return utils.ScanEmbedFsDo(fss, func(file string, path string) error { + bytes, err := fss.ReadFile(path) if err != nil { return err } execSql := string(bytes) - mag.AppendSqlContent(f.Name(), execSql) - } - return nil + mag.AppendSqlContent(file, execSql) + return nil + }) } func (mag *VersionMagV2) SetVars(vars map[string]string) { diff --git a/src/v1/httpserver/server_http_ctx.go b/src/v1/httpserver/server_http_ctx.go index 24edddf69127b92355e88333b8fc8195fd1e0aef..b6f65ae30fc4ae3e00fb65475621c7cd8b70fa40 100644 --- a/src/v1/httpserver/server_http_ctx.go +++ b/src/v1/httpserver/server_http_ctx.go @@ -90,6 +90,9 @@ func (ctx *Ctx) FormJson(stc interface{}) error { switch contentType { case "application/json": + if r.Body == nil { + return nil + } decoder := json.NewDecoder(r.Body) err := decoder.Decode(&stc) if err != nil { @@ -131,6 +134,15 @@ func (ctx *Ctx) Param(name string) string { return ctx.routerParams.ByName(name) } +func (ctx *Ctx) SetParam(kv ...string) { + for i := 1; i < len(kv); i += 2 { + ctx.routerParams = append(ctx.routerParams, httprouter.Param{ + Key: kv[i-1], + Value: kv[i], + }) + } +} + func (ctx *Ctx) WriteHeader(code int) { ctx.writer.WriteHeader(code) } diff --git a/src/v1/httpserver/server_http_ctx_test.go b/src/v1/httpserver/server_http_ctx_test.go new file mode 100644 index 0000000000000000000000000000000000000000..02a4cf2417b228c3da553ef210af5ff7f45adeb9 --- /dev/null +++ b/src/v1/httpserver/server_http_ctx_test.go @@ -0,0 +1,29 @@ +package httpserver + +import ( + "net/http" + "net/http/httptest" + "testing" +) + +func TestHttpCtx(t *testing.T) { + r, _ := http.NewRequest(http.MethodGet, "/block/search", nil) + w := httptest.NewRecorder() + ctx := NewCtx(w, r) + + m := map[string]string{ + "aaa": "vvvv", + "bbb": "ddd", + "eee": "fff", + } + for k, v := range m { + ctx.SetParam(k, v, k, v) + } + + for k, v := range m { + if ctx.Param(k) != v { + t.Errorf("params err %s,%s", k, v) + } + t.Logf(ctx.Param(k)) + } +} diff --git a/src/v1/libs/clients/dbs/conn.go b/src/v1/libs/clients/dbs/conn.go new file mode 100644 index 0000000000000000000000000000000000000000..68c0f0e85a04887f6796b3cf4acc263776a80843 --- /dev/null +++ b/src/v1/libs/clients/dbs/conn.go @@ -0,0 +1,61 @@ +package dbs + +import ( + "database/sql" + v1config "gitee.com/scottq/go-framework/src/v1/config" + "github.com/go-sql-driver/mysql" + "net" + "time" +) + +func NewMysqlConn(c *v1config.DBConfig, fs ...func(config *mysql.Config)) (*sql.DB, error) { + dbConfig := mysql.NewConfig() + dbConfig.User = c.DbUser + dbConfig.Passwd = c.DbPassword + dbConfig.Net = "tcp" + dbConfig.Addr = net.JoinHostPort(c.DbHost, c.DbPort) + dbConfig.DBName = c.DbName + dbConfig.MultiStatements = true + dbConfig.RejectReadOnly = false + dbConfig.ParseTime = true + extParam := make(map[string]string) + if c.MaxConcatLen != "" { + extParam["group_concat_max_len"] = c.MaxConcatLen + } + dbConfig.Params = extParam + + if len(fs) > 0 { + for _, f := range fs { + f(dbConfig) + } + } + + newDb, err := sql.Open("mysql", dbConfig.FormatDSN()) + if err != nil { + return nil, err + } + + //默认 + newDb.SetMaxIdleConns(MaxIdleConns) + newDb.SetMaxOpenConns(MaxOpenConns) + newDb.SetConnMaxLifetime(MaxLifeTime) + newDb.SetConnMaxIdleTime(MaxIdleTime) + + if c.MaxIdleConns > 0 { + //预留并发链接数 + newDb.SetMaxIdleConns(c.MaxIdleConns) + } + if c.MaxOpenConns > 0 { + //最大支持链接 + newDb.SetMaxOpenConns(c.MaxOpenConns) + } + if c.MaxLifetime > 0 { + //每个链接最大生存时间 + newDb.SetConnMaxLifetime(time.Duration(c.MaxLifetime)) + } + if c.MaxIdleTime > 0 { + //每个链接最大空闲时间 + newDb.SetConnMaxIdleTime(time.Duration(c.MaxIdleTime)) + } + return newDb, nil +} diff --git a/src/v1/clients/dbs/consts.go b/src/v1/libs/clients/dbs/consts.go similarity index 100% rename from src/v1/clients/dbs/consts.go rename to src/v1/libs/clients/dbs/consts.go diff --git a/src/v1/clients/dbs/db_mysql.go b/src/v1/libs/clients/dbs/db_mysql.go similarity index 81% rename from src/v1/clients/dbs/db_mysql.go rename to src/v1/libs/clients/dbs/db_mysql.go index 6c38dfc0e6085ac34872c15145d93654083c5c94..c4e09358c5e4b4dc2e2a8a8d2f6edfd1fb02b2c0 100644 --- a/src/v1/clients/dbs/db_mysql.go +++ b/src/v1/libs/clients/dbs/db_mysql.go @@ -5,10 +5,7 @@ import ( "fmt" v1config "gitee.com/scottq/go-framework/src/v1/config" "github.com/go-sql-driver/mysql" - "log" - "net" "strings" - "time" ) type IDBMysql interface { @@ -24,8 +21,12 @@ type IDBMysql interface { CommitTrans() error RollbackTrans() error } + type DBMysql struct { + //db,正常操作db Edb *sql.DB + + //tx,初始为nil,开始事务后为对应事务上下文 Etx *sql.Tx } @@ -37,52 +38,14 @@ func (d *DBMysql) TX() *sql.Tx { return d.Etx } -func NewDBMysql(c v1config.DBConfig) (*DBMysql, error) { - dbConfig := mysql.NewConfig() - dbConfig.User = c.DbUser - dbConfig.Passwd = c.DbPassword - dbConfig.Net = "tcp" - dbConfig.Addr = net.JoinHostPort(c.DbHost, c.DbPort) - dbConfig.DBName = c.DbName - dbConfig.MultiStatements = true - dbConfig.RejectReadOnly = false - extParam := make(map[string]string) - if c.MaxConcatLen != "" { - extParam["group_concat_max_len"] = c.MaxConcatLen - } - dbConfig.Params = extParam - - newDb, err := sql.Open("mysql", dbConfig.FormatDSN()) +func NewDBMysql(c v1config.DBConfig, fs ...func(config *mysql.Config)) (*DBMysql, error) { + conn, err := NewMysqlConn(&c, fs...) if err != nil { - log.Fatalf("connect to db %s failed", dbConfig.FormatDSN()) return nil, err } - //默认 - newDb.SetMaxIdleConns(MaxIdleConns) - newDb.SetMaxOpenConns(MaxOpenConns) - newDb.SetConnMaxLifetime(MaxLifeTime) - newDb.SetConnMaxIdleTime(MaxIdleTime) - - if c.MaxIdleConns > 0 { - //预留并发链接数 - newDb.SetMaxIdleConns(c.MaxIdleConns) - } - if c.MaxOpenConns > 0 { - //最大支持链接 - newDb.SetMaxOpenConns(c.MaxOpenConns) - } - if c.MaxLifetime > 0 { - //每个链接最大生存时间 - newDb.SetConnMaxLifetime(time.Duration(c.MaxLifetime)) - } - if c.MaxIdleTime > 0 { - //每个链接最大空闲时间 - newDb.SetConnMaxIdleTime(time.Duration(c.MaxIdleTime)) - } - return &DBMysql{ - Edb: newDb, + Edb: conn, }, nil } @@ -188,10 +151,11 @@ func (d *DBMysql) ExecuteQuery(tableName string, fields map[string]interface{}, } fieldStr := strings.Join(fieldArr, ",") - orderByStr := strings.Join(orderBy, ",") - if orderByStr != "" { - orderByStr = "ORDER BY " + orderByStr - } + var orderByStr string + //orderByStr = strings.Join(orderBy, ",") + //if orderByStr != "" { + // orderByStr = "ORDER BY " + orderByStr + //} selectSql := fmt.Sprintf("SELECT %s FROM %s %s %s LIMIT 1", fieldStr, tableName, whereStr, orderByStr) @@ -325,3 +289,9 @@ func (d *DBMysql) RollbackTrans() error { } return d.Etx.Rollback() } + +func (d *DBMysql) WithDB(db *sql.DB) *DBMysql { + return &DBMysql{ + Edb: db, + } +} diff --git a/src/v1/libs/clients/dbs/db_mysql_test.go b/src/v1/libs/clients/dbs/db_mysql_test.go new file mode 100644 index 0000000000000000000000000000000000000000..90e9db67813cd0c9775c0dcfbb1df74223660b44 --- /dev/null +++ b/src/v1/libs/clients/dbs/db_mysql_test.go @@ -0,0 +1,50 @@ +package dbs + +import ( + v1config "gitee.com/scottq/go-framework/src/v1/config" + "testing" + "time" +) + +func TestDBMysql_Query(t *testing.T) { + db, err := NewDBMysql(getConfig()) + if err != nil { + t.Fatal(err) + return + } + var user struct { + Id int64 + Account string + } + exist, err := db.ExecuteQuery("user", map[string]interface{}{ + "id": &user.Id, + "account": &user.Account, + }, []string{ + "id=?", + }, []interface{}{ + 2, + }, []string{ + "order by id desc", + }) + if err != nil { + t.Fatal(err) + return + } + t.Logf("query %v %s", exist, err) + if exist { + t.Logf("user %v", user) + } +} + +func getConfig() v1config.DBConfig { + return v1config.DBConfig{ + DbHost: "127.0.0.1", + DbPort: "3306", + DbUser: "root", + DbPassword: "root", + DbName: "db_test", + MaxIdleConns: 8, + MaxOpenConns: 16, + MaxLifetime: int(30 * time.Minute), + } +} diff --git a/src/v1/libs/clients/dbs/gorm_mysql.go b/src/v1/libs/clients/dbs/gorm_mysql.go new file mode 100644 index 0000000000000000000000000000000000000000..41fd6a99779caa84f29c94b814e74e8abb5b2c55 --- /dev/null +++ b/src/v1/libs/clients/dbs/gorm_mysql.go @@ -0,0 +1,280 @@ +package dbs + +import ( + "context" + "database/sql" + "fmt" + v1config "gitee.com/scottq/go-framework/src/v1/config" + "github.com/go-sql-driver/mysql" + ormmysql "gorm.io/driver/mysql" + + "gorm.io/gorm" + "strings" +) + +type IGormMysql interface { + DB() *sql.DB + GormDB() *gorm.DB + GormTX() gorm.ConnPool + + ExecuteSearch(tableName string, fields []string, whereArr []string, whereArgs []interface{}, orderBy []string, pageNum, pageSize int64, rowsHandler SearchRowsHandler) (int64, int64, error) + ExecuteQuery(tableName string, fields map[string]interface{}, whereArr []string, whereArgs []interface{}, orderBy []string) (bool, error) + ExecuteCreate(tableName string, fields map[string]interface{}) (int64, error) + ExecuteUpdate(tableName string, fields map[string]interface{}, whereArr []string, whereArgs []interface{}) (int64, error) + + BeginTrans() (IGormMysql, error) + CommitTrans() error + RollbackTrans() error +} + +type GormMysql struct { + //db,正常操作db,开始事务后为begin后的gorm + Edb *gorm.DB + + //tx,初始为nil,开始事务后为对应事务上下文 + //可判断是否为nil来判断是否开始了事务 + Etx gorm.ConnPool +} + +//NewGormMysql gorm的mysql操作db +func NewGormMysql(c *v1config.DBConfig, fs ...func(config *mysql.Config)) (*GormMysql, error) { + conn, err := NewMysqlConn(c, fs...) + if err != nil { + return nil, err + } + db, err := gorm.Open(ormmysql.New(ormmysql.Config{Conn: conn}), &gorm.Config{}) + if err != nil { + return nil, err + } + + return &GormMysql{ + Edb: db, + }, nil +} + +func (d *GormMysql) ExecuteSearch(tableName string, fields []string, whereArr []string, whereArgs []interface{}, orderBy []string, pageNum, pageSize int64, rowsHandler SearchRowsHandler) (int64, int64, error) { + var err error + var total int64 + + if pageNum <= 0 { + pageNum = 1 + } + + if pageSize <= 0 || pageSize > 1000 { + pageSize = 1000 + } + + whereStr := "" + if len(whereArr) > 0 { + whereStr = "WHERE " + strings.Join(whereArr, " AND ") + } + + countSql := fmt.Sprintf( + "SELECT COUNT(id) AS total FROM `%s` %s LIMIT 1", + tableName, whereStr) + + //total + stmt, err := d.sqlConn().PrepareContext(context.Background(), countSql) + if err != nil { + return 0, total, err + } + defer stmt.Close() + + row := stmt.QueryRow(whereArgs...) + err = row.Scan(&total) + if err == sql.ErrNoRows { + return 0, 0, nil + } else if err != nil { + return 0, total, err + } + + if len(fields) <= 0 { + fields = append(fields, "*") + } + fieldsStr := strings.Join(fields, ",") + orderByStr := strings.Join(orderBy, ",") + if orderByStr != "" { + orderByStr = "ORDER BY " + orderByStr + } + searchSql := fmt.Sprintf( + "SELECT %s FROM `%s` %s %s LIMIT ? OFFSET ?", + fieldsStr, tableName, whereStr, orderByStr) + + stmt1, err := d.sqlConn().PrepareContext(context.Background(), searchSql) + if err != nil { + return 0, total, err + } + defer stmt1.Close() + + whereArgs = append(whereArgs, pageSize) + whereArgs = append(whereArgs, pageSize*(pageNum-1)) + + rows, err := stmt1.Query(whereArgs...) + if err != nil { + return 0, total, err + } + defer rows.Close() + //处理rows + err = rowsHandler(rows) + if err != nil { + return 0, total, err + } + + return 0, total, nil +} + +func (d *GormMysql) ExecuteQuery(tableName string, fields map[string]interface{}, whereArr []string, whereArgs []interface{}, orderBy []string) (bool, error) { + var err error + + var fieldArr = []string{} + var scanArr = []interface{}{} + for k, v := range fields { + fieldArr = append(fieldArr, fmt.Sprintf("`%s`", k)) + scanArr = append(scanArr, v) + } + + whereStr := "" + if len(whereArr) > 0 { + whereStr = "WHERE " + strings.Join(whereArr, " AND ") + } + fieldStr := strings.Join(fieldArr, ",") + + var orderByStr string + //orderByStr = strings.Join(orderBy, ",") + //if orderByStr != "" { + // orderByStr = "ORDER BY " + orderByStr + //} + selectSql := fmt.Sprintf("SELECT %s FROM %s %s %s LIMIT 1", + fieldStr, tableName, whereStr, orderByStr) + + stmt, err := d.sqlConn().PrepareContext(context.Background(), selectSql) + if err != nil { + return false, err + } + defer stmt.Close() + + row := stmt.QueryRow(whereArgs...) + err = row.Scan(scanArr...) + if err == sql.ErrNoRows { + return false, nil + } else if err != nil { + return false, err + } + + return true, nil +} + +func (d *GormMysql) ExecuteCreate(tableName string, fields map[string]interface{}) (int64, error) { + var err error + + var fieldArr = []string{} + var valueArr = []interface{}{} + for k, v := range fields { + fieldArr = append(fieldArr, fmt.Sprintf("`%s`=?", k)) + valueArr = append(valueArr, v) + } + + fieldStr := strings.Join(fieldArr, ",") + + insertSql := fmt.Sprintf("INSERT INTO %s SET %s", tableName, fieldStr) + stmt, err := d.sqlConn().PrepareContext(context.Background(), insertSql) + if err != nil { + return 0, err + } + defer stmt.Close() + + ret, err := stmt.Exec(valueArr...) + if err != nil { + return 0, err + } + + return ret.LastInsertId() +} + +func (d *GormMysql) ExecuteUpdate(tableName string, fields map[string]interface{}, whereArr []string, whereArgs []interface{}) (int64, error) { + var err error + + var fieldArr = []string{} + var valueArr = []interface{}{} + for k, v := range fields { + fieldArr = append(fieldArr, fmt.Sprintf("`%s`=?", k)) + valueArr = append(valueArr, v) + } + + fieldStr := strings.Join(fieldArr, ",") + + whereStr := "" + if len(whereArr) > 0 { + whereStr = "WHERE " + strings.Join(whereArr, " AND ") + } + + if len(whereArgs) > 0 { + for _, v := range whereArgs { + valueArr = append(valueArr, v) + } + } + + updateSql := fmt.Sprintf("UPDATE %s SET %s %s", tableName, fieldStr, whereStr) + stmt, err := d.sqlConn().PrepareContext(context.Background(), updateSql) + if err != nil { + return 0, err + } + defer stmt.Close() + + ret, err := stmt.Exec(valueArr...) + if err != nil { + return 0, err + } + + return ret.RowsAffected() +} + +func (d *GormMysql) DB() *sql.DB { + db, _ := d.Edb.DB() + return db +} + +func (d *GormMysql) GormDB() *gorm.DB { + return d.Edb +} + +func (d *GormMysql) GormTX() gorm.ConnPool { + return d.Etx +} + +func (d *GormMysql) BeginTrans() (IGormMysql, error) { + db := d.Edb.Begin() + return &GormMysql{ + Edb: db, + Etx: db.Statement.ConnPool, + }, nil +} + +func (d *GormMysql) CommitTrans() error { + if d.Etx == nil { + return fmt.Errorf("not begin trans") + } + return d.Edb.Commit().Error +} + +func (d *GormMysql) RollbackTrans() error { + if d.Etx == nil { + return fmt.Errorf("not begin trans") + } + return d.Edb.Rollback().Error +} + +func (d *GormMysql) WithDB(db *gorm.DB) *GormMysql { + return &GormMysql{ + Edb: db, + } +} + +//原生的conn操作 +func (d *GormMysql) sqlConn() gorm.ConnPool { + if d.Etx != nil { + return d.Etx + } + db, _ := d.Edb.DB() + return db +} diff --git a/src/v1/libs/clients/dbs/gorm_mysql_test.go b/src/v1/libs/clients/dbs/gorm_mysql_test.go new file mode 100644 index 0000000000000000000000000000000000000000..ca121a6ac1c68523adc8084b8210d10020ff4029 --- /dev/null +++ b/src/v1/libs/clients/dbs/gorm_mysql_test.go @@ -0,0 +1,34 @@ +package dbs + +import "testing" + +func TestGormMysql_Query(t *testing.T) { + c:=getConfig() + db, err := NewGormMysql(&c) + if err != nil { + t.Fatal(err) + return + } + var user struct { + Id int64 + Account string + } + exist, err := db.ExecuteQuery("user", map[string]interface{}{ + "id": &user.Id, + "account": &user.Account, + }, []string{ + "id=?", + }, []interface{}{ + 2, + }, []string{ + "order by id desc", + }) + if err != nil { + t.Fatal(err) + return + } + t.Logf("query %v %s", exist, err) + if exist { + t.Logf("user %v", user) + } +} diff --git a/src/v1/clients/etcd/client.go b/src/v1/libs/clients/etcd/client.go similarity index 100% rename from src/v1/clients/etcd/client.go rename to src/v1/libs/clients/etcd/client.go diff --git a/src/v1/clients/etcd/service_discovery.go b/src/v1/libs/clients/etcd/service_discovery.go similarity index 100% rename from src/v1/clients/etcd/service_discovery.go rename to src/v1/libs/clients/etcd/service_discovery.go diff --git a/src/v1/clients/etcd/service_register.go b/src/v1/libs/clients/etcd/service_register.go similarity index 100% rename from src/v1/clients/etcd/service_register.go rename to src/v1/libs/clients/etcd/service_register.go diff --git a/src/v1/clients/etcd/service_struct.go b/src/v1/libs/clients/etcd/service_struct.go similarity index 100% rename from src/v1/clients/etcd/service_struct.go rename to src/v1/libs/clients/etcd/service_struct.go diff --git a/src/v1/clients/grpc/client.go b/src/v1/libs/clients/grpc/client.go similarity index 96% rename from src/v1/clients/grpc/client.go rename to src/v1/libs/clients/grpc/client.go index 05fb9d727d5bf8aad3135ef7b1050050bdd32c2f..5e36ec181ca1f9e1ffa453d1d64ef52b33cf2eff 100644 --- a/src/v1/clients/grpc/client.go +++ b/src/v1/libs/clients/grpc/client.go @@ -3,7 +3,7 @@ package grpc import ( "context" "fmt" - v1etcd "gitee.com/scottq/go-framework/src/v1/clients/etcd" + v1etcd "gitee.com/scottq/go-framework/src/v1/libs/clients/etcd" "google.golang.org/grpc" "google.golang.org/grpc/balancer" "google.golang.org/grpc/balancer/base" diff --git a/src/v1/clients/grpc/option.go b/src/v1/libs/clients/grpc/option.go similarity index 100% rename from src/v1/clients/grpc/option.go rename to src/v1/libs/clients/grpc/option.go diff --git a/src/v1/clients/grpc/weight_balance.go b/src/v1/libs/clients/grpc/weight_balance.go similarity index 100% rename from src/v1/clients/grpc/weight_balance.go rename to src/v1/libs/clients/grpc/weight_balance.go diff --git a/src/v1/clients/metrics/indicator.go b/src/v1/libs/clients/metrics/indicator.go similarity index 73% rename from src/v1/clients/metrics/indicator.go rename to src/v1/libs/clients/metrics/indicator.go index 9b6e619f97ed9f0bf9b0977ee0ffa8edf05841ce..da65946a0093a3f21a84e2c63344f2fe99941a5b 100644 --- a/src/v1/clients/metrics/indicator.go +++ b/src/v1/libs/clients/metrics/indicator.go @@ -1,11 +1,11 @@ package metrics -//指标器接口 +//IIndicator 指标器接口 type IIndicator interface { Report(data map[string]*IndicData) //上报数据 } -//单个指标数据 +//IndicData 单个指标数据 type IndicData struct { Name string Value float64 diff --git a/src/v1/clients/metrics/indicator_log.go b/src/v1/libs/clients/metrics/indicator_log.go similarity index 100% rename from src/v1/clients/metrics/indicator_log.go rename to src/v1/libs/clients/metrics/indicator_log.go diff --git a/src/v1/clients/metrics/indicator_promethus.go b/src/v1/libs/clients/metrics/indicator_promethus.go similarity index 100% rename from src/v1/clients/metrics/indicator_promethus.go rename to src/v1/libs/clients/metrics/indicator_promethus.go diff --git a/src/v1/clients/metrics/statistical.go b/src/v1/libs/clients/metrics/statistical.go similarity index 100% rename from src/v1/clients/metrics/statistical.go rename to src/v1/libs/clients/metrics/statistical.go diff --git a/src/v1/comag/comag.go b/src/v1/libs/comag/comag.go similarity index 100% rename from src/v1/comag/comag.go rename to src/v1/libs/comag/comag.go diff --git a/src/v1/comag/comag_failallfail.go b/src/v1/libs/comag/comag_failallfail.go similarity index 93% rename from src/v1/comag/comag_failallfail.go rename to src/v1/libs/comag/comag_failallfail.go index 88f3ed48e07b71a6e721a82ff19ed88bbe4ad2f4..72bf7dca31d7ac188e94cb638ac4596a4b5f4931 100644 --- a/src/v1/comag/comag_failallfail.go +++ b/src/v1/libs/comag/comag_failallfail.go @@ -5,7 +5,7 @@ import ( "sync" ) -//CoMag 一次失败全部失败 +//FailAllFailCoMag 一次失败全部失败的CoMag type FailAllFailCoMag struct { coArr []CoFunc wg sync.WaitGroup diff --git a/src/v1/comag/comag_test.go b/src/v1/libs/comag/comag_test.go similarity index 100% rename from src/v1/comag/comag_test.go rename to src/v1/libs/comag/comag_test.go diff --git a/src/v1/libs/comag/support_rate.go b/src/v1/libs/comag/support_rate.go new file mode 100644 index 0000000000000000000000000000000000000000..093ef63e18767d7a21267f0b658fb8b669d1c371 --- /dev/null +++ b/src/v1/libs/comag/support_rate.go @@ -0,0 +1,76 @@ +package comag + +import ( + v1log "gitee.com/scottq/go-framework/src/v1/log" + "math" + "time" +) + +type CtlRater struct { + StopWorker + + Do func() + Rate int64 + + v1log.InvokeLog + + ch chan int +} + +func (ctl *CtlRater) Start() { + defer ctl.Info("do by rate stopped") + + if ctl.ch == nil { + ctl.ch = make(chan int, 64) + } + if ctl.Rate <= 0 { + ctl.Rate = 1 + } + + go ctl.ctlRate() + ctl.start() +} + +func (ctl *CtlRater) start() { + ctl.Info("ctl rate start work") + do := ctl.Do + for { + select { + case num, ok := <-ctl.ch: + if !ok { + return + } + if num <= 0 { + continue + } + if do != nil { + for i := 1; i <= num; i++ { + do() + } + } + ctl.Info("do success %d, func is %t", num, do != nil) + case <-ctl.StopNotify(): + return + } + } +} + +func (ctl *CtlRater) ctlRate() { + interval := time.Millisecond * 100 //单次执行间隔 + + ti := time.NewTicker(interval) + add := float64(ctl.Rate) / float64(time.Second/interval) //每次间隔增加的值 + var x float64 = 0 + + ctl.Debug("ctl rate will do every %s, every %.3f", interval, add) + for { + select { + case <-ti.C: + num := math.Floor(x + add) + x = x + add - num + ctl.ch <- int(num) + case <-ctl.StopNotify(): + return + } + } +} diff --git a/src/v1/libs/comag/support_stop.go b/src/v1/libs/comag/support_stop.go new file mode 100644 index 0000000000000000000000000000000000000000..d28437f8cd79206a375f6dd70d1b33507e14fd56 --- /dev/null +++ b/src/v1/libs/comag/support_stop.go @@ -0,0 +1,27 @@ +package comag + +import "sync/atomic" + +type StopWorker struct { + sig int32 //0:运行中,1:已停止 + ch chan bool +} + +func (w *StopWorker) Stop() { + if w.ch == nil { + w.ch = make(chan bool, 1) + } + atomic.StoreInt32(&w.sig, 1) + close(w.ch) +} + +func (w *StopWorker) StopNotify() chan bool { + if w.ch == nil { + w.ch = make(chan bool, 1) + } + return w.ch +} + +func (w *StopWorker) Stopped() bool { + return atomic.LoadInt32(&w.sig) == 1 +} diff --git a/src/v1/libs/comag/support_stop_test.go b/src/v1/libs/comag/support_stop_test.go new file mode 100644 index 0000000000000000000000000000000000000000..92cb9874473adeef0d0da2c9f2a70c0922112505 --- /dev/null +++ b/src/v1/libs/comag/support_stop_test.go @@ -0,0 +1,37 @@ +package comag + +import ( + "testing" + "time" +) + +func TestStopWorker(t *testing.T) { + w := StopWorker{} + t.Logf("start at %s", time.Now()) + + for i := 1; i <= 10; i++ { + id := i + go func() { + for !w.Stopped() { + time.Sleep(time.Millisecond * 500) + } + t.Logf("[%d]stoped at %s", id, time.Now()) + }() + + go func() { + for { + select { + case <-w.StopNotify(): + t.Logf("[%d]stoped notify %s", id, time.Now()) + return + } + } + }() + } + go func() { + time.Sleep(time.Second * 10) + w.Stop() + }() + + time.Sleep(time.Second * 20) +} diff --git a/src/v1/libs/lockers/ifs.go b/src/v1/libs/lockers/ifs.go index 29849fb812140cabe443ea9b376ecf6ee8cef53d..a8f2fc15048d85651abcd699a262493708b3a436 100644 --- a/src/v1/libs/lockers/ifs.go +++ b/src/v1/libs/lockers/ifs.go @@ -1,5 +1,6 @@ package lockers +//DistributeLocker 分布式锁接口 type DistributeLocker interface { Lock(key string, ttl int64) (bool, error) Release(key string) error diff --git a/src/v1/log/log.go b/src/v1/log/log.go index 9a08e2de7eb73858e5b110acf49aa952ed72cbd3..13bfa279794766167defd5b7f4533b5f279b58f1 100644 --- a/src/v1/log/log.go +++ b/src/v1/log/log.go @@ -35,7 +35,7 @@ type ILog interface { Fatal(log string, params ...interface{}) } -//fast use log +//InvokeLog fast use log type InvokeLog struct { logger ILog } @@ -44,6 +44,10 @@ func (ink *InvokeLog) AddLogger(logger ILog) { ink.logger = logger } +func (ink *InvokeLog) GetLogger() ILog { + return ink.logger +} + func (ink *InvokeLog) Log(level int8, msg string) { if ink.logger == nil { return diff --git a/src/v1/log/log_null.go b/src/v1/log/log_null.go new file mode 100644 index 0000000000000000000000000000000000000000..bb515067c8e610d123c895d4a26f7e1532dd855f --- /dev/null +++ b/src/v1/log/log_null.go @@ -0,0 +1,26 @@ +package log + +type nullLog struct{} + +func NewNullLog() *nullLog { + return &nullLog{} +} + +func (l nullLog) Debug(format string, params ...interface{}) { + return +} +func (l nullLog) Info(log string, params ...interface{}) { + return +} +func (l nullLog) Warn(log string, params ...interface{}) { + return +} +func (l nullLog) Error(log string, params ...interface{}) { + return +} +func (l nullLog) Panic(log string, params ...interface{}) { + return +} +func (l nullLog) Fatal(log string, params ...interface{}) { + return +} diff --git a/src/v1/log/log_null_test.go b/src/v1/log/log_null_test.go new file mode 100644 index 0000000000000000000000000000000000000000..ae358d97d17d0c3ac3c875ba8a609a5ecca6ab80 --- /dev/null +++ b/src/v1/log/log_null_test.go @@ -0,0 +1,9 @@ +package log + +import "testing" + +func TestNullLog(t *testing.T){ + l:=NewNullLog() + l.Warn("") +} + diff --git a/src/v1/log/log_zap.go b/src/v1/log/log_zap.go index 1d798fa69495aeadc31d288de7b605d5dda11f65..cf44bcb76e65ec272a3f92e0fd3d2e7606239e38 100644 --- a/src/v1/log/log_zap.go +++ b/src/v1/log/log_zap.go @@ -10,7 +10,7 @@ import ( "strings" ) -//default logger +//DefaultZapLog default logger type DefaultZapLog struct { logPath string config *ZapLogConfig //已废弃,请使用cnf @@ -18,7 +18,7 @@ type DefaultZapLog struct { zapLog *zap.Logger } -//日志配置,已经废弃,请使用config.LogConfig +//ZapLogConfig 日志配置,已经废弃,请使用config.LogConfig type ZapLogConfig struct { SyncStdOut bool //是否同步输出至标准输出 MaxSize int // 每个日志文件保存的大小 单位:M diff --git a/src/v1/wsserver/ws/client.go b/src/v1/wsserver/ws/client.go new file mode 100644 index 0000000000000000000000000000000000000000..bfa596f0933d7219a55a23f4e9a0d9aec023c58a --- /dev/null +++ b/src/v1/wsserver/ws/client.go @@ -0,0 +1,225 @@ +package ws + +import ( + "fmt" + "github.com/google/uuid" + "github.com/gorilla/websocket" + "net/http" + "sync" + "time" +) + +const ( + // Time allowed to write a message to the peer. + writeWait = 10 * time.Second + + // Time allowed to read the next pong message from the peer. + pongWait = 60 * time.Second + + // Send pings to peer with this period. Must be less than pongWait. + pingPeriod = (pongWait * 9) / 10 + + // Maximum message size allowed from peer. + maxMessageSize = 512 +) + +//UnExceptedCloseErr 不处理的关闭错误 +var UnExceptedCloseErr = []int{ + websocket.CloseGoingAway, + websocket.CloseAbnormalClosure, + + websocket.CloseInternalServerErr, // 网络问题 + websocket.CloseMandatoryExtension, // 命令扩展问题 + websocket.CloseMessageTooBig, // 消息大小太大 + websocket.ClosePolicyViolation, // 政策不允许而关闭 + websocket.CloseProtocolError, // 协议错误而关闭 + websocket.CloseTLSHandshake, // tls握手问题 +} + +var ( + newline = []byte{'\n'} + space = []byte{' '} +) + +var upGrader = websocket.Upgrader{ + ReadBufferSize: 1024, + WriteBufferSize: 1024, + CheckOrigin: func(r *http.Request) bool { + return true + }, +} + +// Client is a middleman between the websocket connection and the hub. +type Client struct { + uuid string + + hub *Hub + + // The websocket connection. + conn *websocket.Conn + + // Buffered channel of outbound messages. + send chan []byte + + sendMessage chan interface{} + + closeCh chan struct{} + closeOnce sync.Once + closed bool +} + +func (c *Client) ID() string { + return c.uuid +} + +// readPump pumps messages from the websocket connection to the hub. +// +// The application runs readPump in a per-connection goroutine. The application +// ensures that there is at most one reader on a connection by executing all +// reads from this goroutine. +func (c *Client) readPump() { + defer func() { + c.hub.unregister <- c + c.Close() + }() + c.conn.SetReadLimit(maxMessageSize) + c.conn.SetReadDeadline(time.Now().Add(pongWait)) + c.conn.SetPongHandler(func(string) error { + c.conn.SetReadDeadline(time.Now().Add(pongWait)) + return nil + }) + for { + select { + case <-c.closeCh: + return + default: + messageType, message, err := c.conn.ReadMessage() + if err != nil { + if websocket.IsUnexpectedCloseError(err, UnExceptedCloseErr...) { + return + } + logger.Error("ws read err: %v", err) + return + } + //logger.Info("messageType %d", messageType) + if ok, pong := c.hub.PongHandle(messageType, message); ok { + c.send <- pong + break + } + + c.hub.messageCh <- &WebSocketMessage{Client: c, Message: message} + } + + } +} + +func (c *Client) SendMessage(message interface{}) { + defer func() { + if err := recover(); err != nil { + //已close情况下无法发送则忽略 + if err == fmt.Errorf("send on closed channel") && c.closed { + return + } + logger.Error("send err %s", err) + } + }() + c.sendMessage <- message +} + +// writePump pumps messages from the hub to the websocket connection. +// +// A goroutine running writePump is started for each connection. The +// application ensures that there is at most one writer to a connection by +// executing all writes from this goroutine. +func (c *Client) writePump() { + ticker := time.NewTicker(pingPeriod) + defer func() { + ticker.Stop() + c.Close() + }() + for { + select { + case <-c.closeCh: + return + case message, ok := <-c.sendMessage: + //json消息 + c.conn.SetWriteDeadline(time.Now().Add(writeWait)) + if !ok { + // The hub closed the channel. + c.conn.WriteMessage(websocket.CloseMessage, []byte{}) + return + } + + if err := c.conn.WriteJSON(message); err != nil { + logger.Debug("write message err: %s", err) + return + } + //logger.Debug("send message end: %+v", message) + + case message, ok := <-c.send: + //二进制bytes消息 + c.conn.SetWriteDeadline(time.Now().Add(writeWait)) + if !ok { + // The hub closed the channel. + c.conn.WriteMessage(websocket.CloseMessage, []byte{}) + return + } + + w, err := c.conn.NextWriter(websocket.TextMessage) + if err != nil { + return + } + w.Write(message) + + // Add queued chat messages to the current websocket message. + n := len(c.send) + for i := 0; i < n; i++ { + w.Write(newline) + w.Write(<-c.send) + } + + if err := w.Close(); err != nil { + return + } + case <-ticker.C: + c.conn.SetWriteDeadline(time.Now().Add(writeWait)) + if err := c.conn.WriteMessage(websocket.PingMessage, nil); err != nil { + return + } + } + } +} + +//Close 主动断开 +func (c *Client) Close() { + c.closeOnce.Do(func() { + c.closed = true + close(c.closeCh) + close(c.send) + close(c.sendMessage) + logger.Info("ws closed %s", c.ID()) + }) +} + +// serveWs handles websocket requests from the peer. +func serveWs(hub *Hub, w http.ResponseWriter, r *http.Request) { + conn, err := upGrader.Upgrade(w, r, nil) + if err != nil { + w.Write([]byte(fmt.Sprintf("ws upgrade err: %s", err))) + return + } + client := &Client{ + uuid: fmt.Sprint(uuid.New()), + hub: hub, + conn: conn, + send: make(chan []byte, 256), + sendMessage: make(chan interface{}, 256), + closeCh: make(chan struct{}), + } + client.hub.register <- client + + // Allow collection of memory referenced by the caller by doing all work in + // new goroutines. + go client.writePump() + go client.readPump() +} diff --git a/src/v1/wsserver/ws/connector.go b/src/v1/wsserver/ws/connector.go new file mode 100644 index 0000000000000000000000000000000000000000..8d3ac4bfdb5a2010f3720e77f444d988bef3d30c --- /dev/null +++ b/src/v1/wsserver/ws/connector.go @@ -0,0 +1,151 @@ +package ws + +import ( + "encoding/json" + "fmt" + "github.com/gorilla/websocket" + "github.com/rfyiamcool/backoff" + "net/http" + "time" +) + +var wsDialer = websocket.Dialer{ + ReadBufferSize: 1024, + WriteBufferSize: 1024, + Proxy: http.ProxyFromEnvironment, +} + +type WsConnector interface { + SendJson(msg chan interface{}) + Send(msg []byte) + Start(reader chan []byte) error + Conn() *websocket.Conn +} + +// wsConnector ws连接器 +type wsConnector struct { + wsURL string + conn *websocket.Conn + reader chan []byte + sendCh chan []byte + retry int +} + +// NewFeatureWsConnector ws辅助连接器 +func NewFeatureWsConnector(url string, retry int) *wsConnector { + return &wsConnector{ + wsURL: url, + sendCh: make(chan []byte, 1), + retry: retry, + } +} + +func (connector *wsConnector) Conn() *websocket.Conn { + return connector.conn +} + +func (connector *wsConnector) Start(reader chan []byte) error { + err := connector.reConnect() + if err != nil { + logger.Error("ws reconnect fail! %s\n", connector.wsURL) + return err + } + logger.Info("ws start connector %s", connector.wsURL) + connector.reader = reader + + go connector.read() + go connector.write() + + return nil +} + +func (connector *wsConnector) SendJson(msg chan interface{}) { + p, _ := json.Marshal(msg) + connector.Send(p) +} + +func (connector *wsConnector) Send(msg []byte) { + connector.sendCh <- msg +} + +func (connector *wsConnector) read() { + defer func() { + logger.Info("ws connector write stop %s", connector.wsURL) + }() + for { + _, p, err := connector.conn.ReadMessage() + if err != nil { + logger.Info("ws reconnect be overdue! %s %s", err, connector.wsURL) + //断线重连 + err = connector.reConnect() + if err != nil { + logger.Error("ws reconnect fail! %s %s", err, connector.wsURL) + } + continue + } + + if connector.reader != nil { + connector.reader <- p + } + } +} + +func (connector *wsConnector) write() { + defer func() { + logger.Info("ws connector write stop %s", connector.wsURL) + }() + for { + select { + case p, ok := <-connector.sendCh: + if !ok { + return + } + if connector.conn == nil { + continue + } + if err := connector.conn.WriteMessage(websocket.TextMessage, p); err != nil { + logger.Error("write message err: %s %s", err, connector.wsURL) + } + } + } +} + +// connect 单次连接 +func (connector *wsConnector) connect() (*websocket.Conn, error) { + url := connector.wsURL + c, _, err := wsDialer.Dial(url, nil) + if err != nil { + return nil, err + } + return c, nil +} + +// reConnect 重试连接 +func (connector *wsConnector) reConnect() error { + //指数退避重连 + maxRecon := connector.retry + b := backoff.NewBackOff( + backoff.WithMinDelay(1*time.Second), + backoff.WithMaxDelay(10*time.Second), + backoff.WithFactor(2), + backoff.WithJitterFlag(true), + ) + var final error + + for index := 0; index <= maxRecon; { + c, err := connector.connect() + if err == nil { + connector.conn = c + logger.Info("ws connect success %s", connector.wsURL) + return nil + } + logger.Error("ws connect fail which %d %s !", index, err) + final = err + + if maxRecon > 0 { + index++ + } + time.Sleep(b.Duration()) + } + return fmt.Errorf("binance connect fail %s", final) +} diff --git a/src/v1/wsserver/ws/connector_test.go b/src/v1/wsserver/ws/connector_test.go new file mode 100644 index 0000000000000000000000000000000000000000..7cc75617ad66a0bd469971a5b04fb9aa3fe33cae --- /dev/null +++ b/src/v1/wsserver/ws/connector_test.go @@ -0,0 +1,17 @@ +package ws + +import "testing" + +func TestWsConnector(t *testing.T) { + ws := NewFeatureWsConnector("ws://127.0.0.1:8090/ws/backend", 0) + + reader := make(chan []byte, 128) + ws.Start(reader) + + for { + select { + case msg := <-reader: + t.Logf("receive %s", string(msg)) + } + } +} diff --git a/src/v1/wsserver/ws/feature_action.go b/src/v1/wsserver/ws/feature_action.go new file mode 100644 index 0000000000000000000000000000000000000000..6b1da85a3fd50214291dc8f57cbfc3e3d9e39b51 --- /dev/null +++ b/src/v1/wsserver/ws/feature_action.go @@ -0,0 +1,58 @@ +package ws + +import ( + "encoding/json" + "fmt" +) + +//FeatureActionsMgr ws操作事件管理器 +type FeatureActionsMgr struct { + actions map[string]IActionEvent +} + +func NewFeatureActionsMgr(actions ...IActionEvent) *FeatureActionsMgr { + actionMap := make(map[string]IActionEvent) + + for _, v := range actions { + actionMap[v.Name()] = v + } + + return &FeatureActionsMgr{ + actions: actionMap, + } +} + +//Parse 解析数据,数据格式满足一下 +func (mgr *FeatureActionsMgr) Parse(data []byte) (IActionEvent, error) { + m := make(map[string]interface{}, 1) + + if err := json.Unmarshal(data, &m); err != nil { + return nil, err + } + + //logger.Info("%s,%v", string(data), m) + if name, exist := m["action"]; exist { + return mgr.GetAction(name.(string)) + } + + return nil, fmt.Errorf("action not support") +} + +//ParseDo 解析数据并且执行 +func (mgr *FeatureActionsMgr) ParseDo(client *Client, data []byte) (IActionEvent, error) { + act, err := mgr.Parse(data) + if err != nil { + return nil, err + } + act.Do(client, data) + return act, nil +} + +//GetAction 获取对应action +func (mgr *FeatureActionsMgr) GetAction(name string) (IActionEvent, error) { + if mgr.actions[name] == nil { + return nil, fmt.Errorf("action not support %s", name) + } + + return mgr.actions[name], nil +} diff --git a/src/v1/wsserver/ws/feature_action_event.go b/src/v1/wsserver/ws/feature_action_event.go new file mode 100644 index 0000000000000000000000000000000000000000..b07c30af805420894a18c9cb58f2783ed56e64f0 --- /dev/null +++ b/src/v1/wsserver/ws/feature_action_event.go @@ -0,0 +1,30 @@ +package ws + +//IActionEvent 定义单个action操作时间 +type IActionEvent interface { + Name() string + Do(client *Client, data []byte) +} + +type ActionEvent struct { + name string + event func(*Client, []byte) +} + +func NewActionEvent(name string, f func(client *Client, data []byte)) *ActionEvent { + return &ActionEvent{ + name: name, + event: f, + } +} + +func (act *ActionEvent) Name() string { + return act.name +} + +func (act *ActionEvent) Do(client *Client, data []byte) { + if act.event == nil { + return + } + act.event(client, data) +} diff --git a/src/v1/wsserver/ws/feature_action_test.go b/src/v1/wsserver/ws/feature_action_test.go new file mode 100644 index 0000000000000000000000000000000000000000..9dbc1ab3425f2e92d9d91b8d715ea125ae6f23fd --- /dev/null +++ b/src/v1/wsserver/ws/feature_action_test.go @@ -0,0 +1,56 @@ +package ws + +import ( + "fmt" + "testing" +) + +func TestFeatureActionsMgr_Parse(t *testing.T) { + mgr := getActionsMgr() + actions := []string{ + "subscribe", + "unsubscribe", + "aaa", + } + + for _, s := range actions { + _, err := mgr.ParseDo(nil, actionData(s)) + t.Logf("action %s,err %s", s, err) + } +} + +func TestFeatureActionsMgr_subscribe(t *testing.T) { + mgr := getActionsMgr() + actions := []string{ + "subscribe", + "unsubscribe", + "aaa", + } + + for _, s := range actions { + _, err := mgr.ParseDo(nil, actionData(s)) + t.Logf("action %s,err %s", s, err) + } +} + +//subscribeData subscribe订阅的数据格式 +func actionData(name string) []byte { + return []byte(fmt.Sprintf(` +{"action":"%s","args":[1,2,3],"subscribe":"aaa"} +`, name)) +} + +func getActionsMgr() *FeatureActionsMgr { + actions := []IActionEvent{ + NewActionEvent("subscribe", func(client *Client, data []byte) { + fmt.Printf("do subscribe %s\n", data) + }), + NewActionEvent("unsubscribe", func(client *Client, data []byte) { + fmt.Printf("do unsubscribe %s\n", data) + }), + NewActionEvent("auth", func(client *Client, data []byte) { + fmt.Printf("auth %s\n", data) + }), + } + return NewFeatureActionsMgr(actions...) +} diff --git a/src/v1/wsserver/ws/feature_broadcast.go b/src/v1/wsserver/ws/feature_broadcast.go new file mode 100644 index 0000000000000000000000000000000000000000..ffb82a8bfa384a057ac4802d3fce42fc5897824e --- /dev/null +++ b/src/v1/wsserver/ws/feature_broadcast.go @@ -0,0 +1,10 @@ +package ws + +//BroadcastFeature 广播辅助功能 +type BroadcastFeature struct{} + +func (b BroadcastFeature) Send(clients []*Client, msg interface{}) { + for _, c := range clients { + c.SendMessage(msg) + } +} diff --git a/src/v1/wsserver/ws/feature_clientmgr.go b/src/v1/wsserver/ws/feature_clientmgr.go new file mode 100644 index 0000000000000000000000000000000000000000..025ef990d99bc916d21a1a098381325398553705 --- /dev/null +++ b/src/v1/wsserver/ws/feature_clientmgr.go @@ -0,0 +1,65 @@ +package ws + +import "sync" + +type IClientsMgr interface { + AddClient(c *Client) + DeleteClient(c *Client) + + Clients() []*Client //return all clients + Conns() int64 //return clients num +} + +func NewClientsMgr() *ClientsMgr { + return &ClientsMgr{ + clients: make(map[*Client]bool, 128), + } +} + +//ClientsMgr 联机客户端管理 +type ClientsMgr struct { + clients map[*Client]bool + + //总连接数量 + conn int64 + + mu sync.RWMutex +} + +func (w *ClientsMgr) AddClient(c *Client) { + w.mu.Lock() + w.clients[c] = true + w.conn += int64(1) + w.mu.Unlock() +} + +func (w *ClientsMgr) DeleteClient(c *Client) { + w.mu.RLock() + _, exist := w.clients[c] + w.mu.RUnlock() + + if !exist { + return + } + + w.mu.Lock() + delete(w.clients, c) + w.conn -= int64(1) + w.mu.Unlock() +} + +func (w *ClientsMgr) Clients() []*Client { + clients := make([]*Client, 0) + + w.mu.RLock() + for c := range w.clients { + clients = append(clients, c) + } + w.mu.RUnlock() + + return clients +} + +func (w *ClientsMgr) Conns() int64 { + return w.conn +} diff --git a/src/v1/wsserver/ws/feature_clientmgr_test.go b/src/v1/wsserver/ws/feature_clientmgr_test.go new file mode 100644 index 0000000000000000000000000000000000000000..80f7787df2f9cb50e53f6f62d16c1f12c60cf3f4 --- /dev/null +++ b/src/v1/wsserver/ws/feature_clientmgr_test.go @@ -0,0 +1,18 @@ +package ws + +import "testing" + +func TestClientsMgr(t *testing.T) { + mgr := NewClientsMgr() + + print := func() { + t.Logf("count %d", mgr.Conns()) + t.Logf("clients %+v", mgr.Clients()) + } + a := &Client{} + mgr.AddClient(a) + mgr.AddClient(&Client{}) + print() + mgr.DeleteClient(a) + print() +} diff --git a/src/v1/wsserver/ws/feature_online.go b/src/v1/wsserver/ws/feature_online.go new file mode 100644 index 0000000000000000000000000000000000000000..9946db48e3bbeae671f5ab9281f4964ab1783513 --- /dev/null +++ b/src/v1/wsserver/ws/feature_online.go @@ -0,0 +1,67 @@ +package ws + +import "sync" + +//IOnline 联机在线管理(已废弃,请使用 IClientsMgr ) +type IOnline interface { + AddClient(c *Client) + DeleteClient(c *Client) + + Clients() []*Client //return all clients + Conns() int64 //return clients num +} + +func NewOnlineWorker() *OnlineWorker { + return &OnlineWorker{ + clients: make(map[*Client]bool, 128), + } +} + +//OnlineWorker 联机管理worker +type OnlineWorker struct { + clients map[*Client]bool + + //总连接数量 + conn int64 + + mu sync.RWMutex +} + +func (w *OnlineWorker) AddClient(c *Client) { + w.mu.Lock() + w.clients[c] = true + w.conn += int64(1) + w.mu.Unlock() +} + +func (w *OnlineWorker) DeleteClient(c *Client) { + w.mu.RLock() + _, exist := w.clients[c] + w.mu.RUnlock() + + if !exist { + return + } + + w.mu.Lock() + delete(w.clients, c) + w.conn -= int64(1) + w.mu.Unlock() + +} + +func (w *OnlineWorker) Clients() []*Client { + clients := make([]*Client, 0) + + w.mu.RLock() + for c := range w.clients { + clients = append(clients, c) + } + w.mu.RUnlock() + + return clients +} + +func (w *OnlineWorker) Conns() int64 { + return w.conn +} diff --git a/src/v1/wsserver/ws/feature_subscribe.go b/src/v1/wsserver/ws/feature_subscribe.go new file mode 100644 index 0000000000000000000000000000000000000000..da4939b3ea202139e90ee922101cb3dcd4225d43 --- /dev/null +++ b/src/v1/wsserver/ws/feature_subscribe.go @@ -0,0 +1,133 @@ +package ws + +import "encoding/json" + +const GroupKey = "group" +const SubscribeKey = "subscribe" +const UnSubscribeKey = "unsubscribe" + +//FeatureSubscribeMgr ws订阅功能,负责解析和管理订阅Group +type FeatureSubscribeMgr struct { + groups map[string]ISubscribeGroup +} + +func NewFeatureSubscribeMgr(groups ...ISubscribeGroup) *FeatureSubscribeMgr { + gm := make(map[string]ISubscribeGroup, len(groups)) + for _, v := range groups { + gm[v.Name()] = v + } + + return &FeatureSubscribeMgr{ + groups: gm, + } +} + +//Parse 解析订阅数据,返回参数: +// group,对应group +// bool,true:加入订阅,false:退出订阅 +// 如果对应数据格式是group的则返回的bool不准确,由上层自己判断 +// 支持的数据格式1: {"action":"subscribe","group":"Trade"} +// 上面这个数据格式适用于配合 FeatureActionsMgr 使用 +// 支持的数据格式2: {"subscribe":"Trade"} +// 支持的数据格式3: {"unsubscribe":"Trade"} +func (sub *FeatureSubscribeMgr) Parse(data []byte) (ISubscribeGroup, bool) { + m := make(map[string]string, 10) + //m["1"] = "1" + + if err := json.Unmarshal(data, &m); err != nil { + logger.Error("parse err %s", err) + return nil, false + } + + //logger.Info("%s,%v", string(data), m) + if name, exist := m[GroupKey]; exist { + return sub.groups[name], true + } + if name, exist := m[SubscribeKey]; exist { + //返回对应group,group=nil是不加入,!=nil则是真正加入 + return sub.groups[name], true + } + if name, exist := m[UnSubscribeKey]; exist { + //返回对应group,group=nil则也是不加入的 + return sub.groups[name], false + } + + return nil, false +} + +//ParseJoinGroup 解析并自动加入group,返回: +// 对应group +// true:订阅加入,false:订阅退出 +func (sub *FeatureSubscribeMgr) ParseJoinGroup(client *Client, data []byte) (ISubscribeGroup, bool) { + group, join := sub.Parse(data) + if group == nil { + return nil, join + } + if join { + sub.JoinGroup(client, group.Name()) + } else { + sub.QuitGroup(client, group.Name()) + } + return group, join +} + +//QuitAllGroup 退出所有group +func (sub *FeatureSubscribeMgr) QuitAllGroup(client *Client) { + for _, c := range sub.groups { + c.DeleteClient(client) + } +} + +//JoinGroup 加入指定group +func (sub *FeatureSubscribeMgr) JoinGroup(client *Client, name string) { + if g, exist := sub.groups[name]; exist { + logger.Info("subscribe group [%s]", g.Name()) + g.AddClient(client) + } +} + +//QuitGroup 退出指定group +func (sub *FeatureSubscribeMgr) QuitGroup(client *Client, name string) { + if g, exist := sub.groups[name]; exist { + logger.Info("unsubscribe group [%s]", g.Name()) + g.DeleteClient(client) + } +} + +//Groups 所有groups +func (sub *FeatureSubscribeMgr) Groups() map[string]ISubscribeGroup { + return sub.groups +} + +//Group 指定group +func (sub *FeatureSubscribeMgr) Group(name string) ISubscribeGroup { + return sub.groups[name] +} + +//Subscribe 订阅方法,用于只处理订阅数据 +func (sub *FeatureSubscribeMgr) Subscribe(client *Client, data []byte) ISubscribeGroup { + g, _ := sub.Parse(data) + if g != nil { + sub.JoinGroup(client, g.Name()) + } + return g +} + +//UnSubscribe 取消订阅方法,用于只处理取消订阅数据 +func (sub *FeatureSubscribeMgr) UnSubscribe(client *Client, data []byte) ISubscribeGroup { + g, _ := sub.Parse(data) + if g != nil { + sub.QuitGroup(client, g.Name()) + } + return g +} + +func (sub *FeatureSubscribeMgr) SubscribeAction(client *Client, data []byte) { + g := sub.Subscribe(client, data) + logger.Debug("subscribe group %t", g == nil) +} + +func (sub *FeatureSubscribeMgr) UnSubscribeAction(client *Client, data []byte) { + g := sub.UnSubscribe(client, data) + logger.Debug("unsubscribe group %t", g == nil) +} diff --git a/src/v1/wsserver/ws/feature_subscribe_group.go b/src/v1/wsserver/ws/feature_subscribe_group.go new file mode 100644 index 0000000000000000000000000000000000000000..15d0b69401351f7fd34173169eb1441b94d1e7ec --- /dev/null +++ b/src/v1/wsserver/ws/feature_subscribe_group.go @@ -0,0 +1,32 @@ +package ws + +//ISubscribeGroup 订阅频道组功能 +type ISubscribeGroup interface { + Name() string + IClientsMgr +} + +//FeatureSubscribeGroup ws订阅Group功能 +type FeatureSubscribeGroup struct { + group string + IClientsMgr +} + +func NewFeatureSubscribeGroup(name ...string) *FeatureSubscribeGroup { + n := "" + if len(name) > 0 { + n = name[0] + } + return &FeatureSubscribeGroup{ + group: n, + IClientsMgr: NewClientsMgr(), + } +} + +func (g *FeatureSubscribeGroup) Name() string { + return g.group +} + +func (g *FeatureSubscribeGroup) SetName(n string) { + g.group = n +} diff --git a/src/v1/wsserver/ws/feature_subscribe_test.go b/src/v1/wsserver/ws/feature_subscribe_test.go new file mode 100644 index 0000000000000000000000000000000000000000..25202f6fde081e4072bcd292c85cc57ffe24d746 --- /dev/null +++ b/src/v1/wsserver/ws/feature_subscribe_test.go @@ -0,0 +1,50 @@ +package ws + +import ( + "fmt" + "testing" +) + +func TestWsSubscribe_ParseJoin(t *testing.T) { + clients := []*Client{ + &Client{}, + &Client{}, + &Client{}, + &Client{}, + } + sub := getSubscribeMgr() + + testGroups := []string{ + "G1", "C1", "G2", "DD", + } + for _, name := range testGroups { + g, join := sub.ParseJoinGroup(clients[0], subscribeData(name)) + t.Logf("join %s %t", g, join) + } +} + +//subscribeData subscribe订阅的数据格式 +func subscribeData(name string) []byte { + return []byte(fmt.Sprintf(` +{"subscribe":"%s"} +`, name)) +} + +//unsubscribeData unsubscribe取消订阅的数据格式 +func unsubscribeData(name string) []byte { + return []byte(fmt.Sprintf(` +{"unsubscribe":"%s"} +`, name)) +} + +func getSubscribeMgr() *FeatureSubscribeMgr { + groups := []ISubscribeGroup{ + NewFeatureSubscribeGroup("G1"), + NewFeatureSubscribeGroup("G2"), + NewFeatureSubscribeGroup("A1"), + NewFeatureSubscribeGroup("A2"), + NewFeatureSubscribeGroup("A3"), + } + + return NewFeatureSubscribeMgr(groups...) +} diff --git a/src/v1/wsserver/ws/hub.go b/src/v1/wsserver/ws/hub.go new file mode 100644 index 0000000000000000000000000000000000000000..7bfa44b2032521acffa152aa53068d823a010cca --- /dev/null +++ b/src/v1/wsserver/ws/hub.go @@ -0,0 +1,160 @@ +package ws + +import ( + v1log "gitee.com/scottq/go-framework/src/v1/log" + "net/http" + "sync" +) + +type IWorkerMgr interface { + SetHub(h *Hub) //设置hub + Online(client *Client) //client 上线 + Close(client *Client) //client close + + Pong(messageType int, message []byte) (bool, []byte) //判断消息是否ping消息并返回pong应答消息体 + Receive(cmd *WebSocketMessage) //接收消息处理 +} + +type Hub struct { + //cmd + messageCh chan *WebSocketMessage + + // Register requests from the clients. + register chan *Client + + // Unregister requests from clients. + unregister chan *Client + + // + worker IWorkerMgr + IClientsMgr + + //hub quite service chan and do once + closed chan struct{} + closeOnce sync.Once +} + +func NewHub() *Hub { + return &Hub{ + messageCh: make(chan *WebSocketMessage, 128), + register: make(chan *Client), + unregister: make(chan *Client), + closed: make(chan struct{}, 0), + IClientsMgr: NewClientsMgr(), + } +} + +func (h *Hub) Run() { + //尝试多个协程运行 + for i := 1; i <= 10; i++ { + go h.run() + } +} + +func (h *Hub) Stop() { + defer logger.Info("ws hub shutdown") + + h.closed <- struct{}{} + for { + select { + case _, ok := <-h.closed: + if !ok { + return + } + } + } +} + +func (h *Hub) SetWorker(worker IWorkerMgr) { + h.worker = worker +} + +//用于ws联机的http方法 +func (h *Hub) ServeHTTP(w http.ResponseWriter, r *http.Request) { + serveWs(h, w, r) +} + +//运行 +func (h *Hub) run() { + defer logger.Info("ws hub stop") + defer h.closeOnce.Do(func() { + close(h.register) + close(h.closed) + close(h.unregister) + close(h.messageCh) + }) + + for { + select { + case <-h.closed: + //系统停止 + h.handleQuit() + return + case client, ok := <-h.register: + if !ok { + return + } + //联机上线 + h.handleOnline(client) + case client, ok := <-h.unregister: + if !ok { + return + } + //下线 + h.handleClose(client) + case message, ok := <-h.messageCh: + if !ok { + return + } + //消息处理 + h.handleMessage(message) + } + } +} + +func (h *Hub) handleQuit() { + clients := h.Clients() + + logger.Info("ws clients quite %d", len(clients)) + + for _, c := range clients { + h.handleClose(c) + } +} + +func (h *Hub) handleClose(client *Client) { + h.DeleteClient(client) + + if h.worker != nil { + h.worker.Close(client) + } + client.Close() +} + +func (h *Hub) handleOnline(client *Client) { + h.AddClient(client) + + if h.worker != nil { + h.worker.Online(client) + } +} + +func (h *Hub) handleMessage(cmd *WebSocketMessage) { + if h.worker != nil { + h.worker.Receive(cmd) + } +} + +func (h *Hub) PongHandle(messageType int, message []byte) (bool, []byte) { + if h.worker != nil { + return h.worker.Pong(messageType, message) + } + return false, []byte{} +} + +//logger +var logger v1log.ILog = v1log.NewNullLog() + +func SetLogger(log v1log.ILog) { + logger = log +} diff --git a/src/v1/wsserver/ws/message.go b/src/v1/wsserver/ws/message.go new file mode 100644 index 0000000000000000000000000000000000000000..88425ec481fa8c3cabe94f9c870ba17421a492fe --- /dev/null +++ b/src/v1/wsserver/ws/message.go @@ -0,0 +1,9 @@ +package ws + +//WebSocketMessage ws接收消息 +type WebSocketMessage struct { + MessageType int `json:"-"` + Message []byte `json:"-"` + + Client *Client `json:"-"` +}