4 Star 5 Fork 4

Plato/Service-Box-go

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
克隆/下载
http_proxy.go 24.21 KB
一键复制 编辑 原始数据 按行查看 历史
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935
package http_proxy
import (
"context"
"errors"
"fmt"
"net/http"
"regexp"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/unrolled/secure"
"gitee.com/dennis-kk/rpc-go-backend/idlrpc/pkg/protocol"
"gitee.com/dennis-kk/service-box-go/internal/descriptor"
"gitee.com/dennis-kk/service-box-go/internal/jsonpb"
"gitee.com/dennis-kk/service-box-go/util/slog"
"gitee.com/dennis-kk/service-box-go/util/tools"
"github.com/gin-gonic/gin"
)
const (
defaultJsonPbCoder = "jsonpbcpp"
addressLex = `(https?)?(://)?([0-9]+\.[0-9]+\.[0-9]+\.[0-9]+:[0-9]+)`
rpcContextName = "rpc_context"
)
const (
StatusClosed = iota //服务已经关闭
StatusResolved //服务正在运行
StatusUpdating //服务正在更新
StatusClosing //服务正在关闭
)
var (
ErrInvalidHttpAddress = errors.New("invalid http address ")
ErrNoEffectiveServicePath = errors.New("no effective service config path ")
ErrInitServiceDBError = errors.New("create service data base error ")
ErrRequestNotFound = errors.New("http call info not found ")
ErrRequestExpired = errors.New("the http request has expired ")
ErrRoutePathExist = errors.New("route path already exists")
ErrCertificateNotExist = errors.New("certificate file does not exist")
ErrInvalidTokenType = errors.New("invalid token type")
)
var (
// hostExp 是匹配host配置的正则表达式
hostExp *regexp.Regexp
)
type (
// Response http 回包结构
Response map[string]interface{}
// RouteHandle 自定义 api 方法处理原型,在独立协程中运行
RouteHandle func(*Context)
//GroupRouteInfo 添加GroupAPI
GroupRouteInfo struct {
Method string // method type, GET POST PUT
Path string //API 路径
Handle RouteHandle //处理函数
}
// CallQueue http 调用请求队列
CallQueue chan IRpcRequest
//ProxyRequestHandle http 请求转换为rpc 请求后回调函数
// 成功返回 call id 和 nil 错误; 失败 则返回0 和具体错误
ProxyRequestHandle func(req *protocol.ProxyRequestPackage) (uint32, error)
//HttpProxy http 代理,启动http服务器,将收到http 转化为 plato rpc协议
HttpProxy struct {
engine *gin.Engine //http router引擎
server *http.Server //http 服务器
db *descriptor.DataBase //服务配置路径
writer *ProxyWriter //日志包装器
coder jsonpb.IJsonPbCoder //pb 编码器
address string //监听端口
tlsKey string //https 私钥
tlsPem string //https 公钥
isSecure bool //是否开启https
shutdownTimeout time.Duration //停机超时时间
httpQueue CallQueue //http api 请求队列 生产者 http api; 消费者 主协程
onHttpCall ProxyRequestHandle //请求回调响应函数
callCache map[uint32]IRpcRequest //请求缓存函数
apis map[string]bool //已经添加的api, 避免重复添加
status int32 //http proxy 服务状态
rw sync.RWMutex //读写锁
changedService map[string]*descriptor.ServiceDescriptor //探测到磁盘中变化
opt *Options //http 配置
}
//ProxyWriter 日志包装器
ProxyWriter struct {
logger slog.BoxLogger
}
)
func init() {
hostExp = regexp.MustCompile(addressLex)
}
func MakeHttpProxy(opts ...Option) *HttpProxy {
options := NewDefaultOptions()
for _, opt := range opts {
opt(options)
}
hp := &HttpProxy{
status: StatusClosed,
isSecure: false,
opt: options,
}
err := hp.Init(options)
if err != nil {
panic(err)
}
return hp
}
func SemicolonMiddleware() gin.HandlerFunc {
return func(c *gin.Context) {
rawQuery := c.Request.URL.RawQuery
if rawQuery != "" && strings.Contains(rawQuery, ";") {
c.Request.URL.RawQuery = strings.ReplaceAll(rawQuery, ";", "%3B")
}
c.Next()
}
}
func (pw *ProxyWriter) Write(p []byte) (n int, err error) {
if pw.logger != nil {
pw.logger.Info(string(p))
}
return len(p), nil
}
// Init 初始化配置
func (hp *HttpProxy) Init(option *Options) error {
if err := hp.initResources(option); err != nil {
return err
}
if err := hp.initHttpServer(option); err != nil {
return err
}
if err := hp.initJsonPbCoder(option); err != nil {
return err
}
if err := hp.initServiceDB(option); err != nil {
return err
}
return nil
}
func (hp *HttpProxy) SetOptions(options ...Option) {
for _, opt := range options {
opt(hp.opt)
}
}
func (hp *HttpProxy) SetHttpRequestHandle(handle ProxyRequestHandle) {
hp.onHttpCall = handle
}
func (hp *HttpProxy) OnRpcResponse(ret *protocol.ProxyRespPackage) error {
// 解析获取调用的头信息
call, ok := hp.callCache[ret.Header.CallID]
if call == nil || !ok {
hp.opt.logger.Warn("call info %d not found !", ret.Header.CallID)
return ErrRequestNotFound
}
defer func() {
call.finish()
delete(hp.callCache, ret.Header.CallID)
}()
//检查call 是否已经过期被关闭
if call.isFinished() {
hp.opt.logger.Warn("service %q:%q request %d has expired", call.getUuid(), call.getMethodId(), ret.Header.CallID)
return ErrRequestExpired
}
// 编码为json 字符串
resp := &responseInfo{
status: http.StatusOK,
body: ret.Buffer,
}
switch ret.Header.ErrorCode {
case protocol.IDL_SUCCESS:
resp.status = http.StatusOK
case protocol.IDL_SERVICE_ERROR:
resp.status = http.StatusInternalServerError
case protocol.IDL_SERVICE_NOT_FOUND:
resp.status = http.StatusNotFound
case protocol.IDL_RPC_TIME_OUT:
resp.status = http.StatusGatewayTimeout
case protocol.IDL_RPC_LIMIT:
resp.status = http.StatusForbidden
default:
resp.status = http.StatusInternalServerError
}
call.doRet(hp, resp)
return nil
}
func (hp *HttpProxy) Start() error {
// 从内存加载服务
if err := hp.loadServiceFromDB(); err != nil {
return err
}
//启动db 模块
if err := hp.db.Start(); err != nil {
return err
}
hp.server = &http.Server{
Addr: hp.address,
Handler: hp.engine,
}
ch := make(chan struct{})
go func() {
ch <- struct{}{}
if hp.isSecure {
if err := hp.server.ListenAndServeTLS(hp.tlsPem, hp.tlsKey); err != http.ErrServerClosed {
hp.opt.logger.Fatal("start https server at %q error %q", hp.address, err.Error())
}
hp.opt.logger.Info("start https server at %s ", hp.address)
} else {
if err := hp.server.ListenAndServe(); err != nil && err != http.ErrServerClosed {
hp.opt.logger.Fatal("start http server at %q error %q", hp.address, err.Error())
}
hp.opt.logger.Info("start http server at %s ", hp.address)
}
}()
// 等待服务器完全启动成功
<-ch
atomic.StoreInt32(&hp.status, StatusResolved)
return nil
}
func (hp *HttpProxy) Tick() {
//主线程业务调用
hp.onTick()
//热更逻辑
hp.onServiceReload()
}
// IsResolved 返回是否可以正常提供服务的状态
func (hp *HttpProxy) IsResolved() bool {
return atomic.LoadInt32(&hp.status) == StatusResolved
}
func (hp *HttpProxy) ShutDown() {
// 设置状态不在接受新的请求
atomic.StoreInt32(&hp.status, StatusClosing)
// 优雅关闭
ctx, cancel := context.WithTimeout(context.Background(), hp.shutdownTimeout)
defer cancel()
// 关停服务器
if err := hp.server.Shutdown(ctx); err != nil {
hp.opt.logger.Error("shutdown http server error %q", err.Error())
} else {
hp.opt.logger.Info("stop http server successful !")
}
// 关闭管道
close(hp.httpQueue)
atomic.StoreInt32(&hp.status, StatusClosed)
// 卸载 依赖模块
_ = hp.coder.UnInit()
hp.db.ShutDown()
}
// AddRoute 添加自定义的API路径path, 执行函数handle, 以及是否需要使用auth 进行鉴权,鉴权函数可以为空
func (hp *HttpProxy) AddRoute(method string, path string, handle RouteHandle, auth Authenticator) error {
// 加锁
hp.rw.Lock()
defer hp.rw.Unlock()
//return hp.addRouteUnsafe(method, path, handle, auth)
if _, ok := hp.apis[path]; ok {
return ErrRoutePathExist
}
var ginHandles []gin.HandlerFunc
//ctx := &Context{}
if auth != nil {
ginHandles = append(ginHandles, withAuthHandleWrapper(nil, auth))
ginHandles = append(ginHandles, withRouteRpcContextHandlerWrapper(nil, handle))
} else {
ginHandles = append(ginHandles, withRouteHandleWrapper(nil, handle))
}
// 添加api
hp.engine.Handle(method, path, ginHandles...)
// api 加入列表
hp.apis[path] = true
return nil
}
func (hp *HttpProxy) CallPlatoRpcRaw(name string, method string, rawData []byte) ([]byte, error) {
rpcAbs := hp.db.GetRpcAbstract(name, method)
if rpcAbs == nil {
return nil, fmt.Errorf("service %q method %q not found", name, method)
}
pCall := makeRpcCall(rpcAbs.Uuid, rpcAbs.MethodId, name, method)
pCall.data = rawData
// 发送到主协程
hp.onRequest(pCall)
// 启动定时器
timer := time.NewTimer(time.Duration(rpcAbs.Timeout*1e6) * time.Millisecond)
// 总是清理定时器
defer timer.Stop()
// 阻塞等待调用返回
select {
case resp, ok := <-pCall.ret():
if !ok {
return nil, errors.New("service unavailable")
}
if resp == nil {
//管道关闭
return nil, errors.New("service unavailable")
}
if resp.err == nil {
return resp.body, nil
} else {
return nil, resp.err
}
case <-timer.C:
// 关闭请求
pCall.finish()
return nil, errors.New("service call timeout")
}
}
// CallPlatoRpc 原生调用plato rpc协议
func (hp *HttpProxy) CallPlatoRpc(serviceName string, methodName string, params string) ([]byte, error) {
rpcAbs := hp.db.GetRpcAbstract(serviceName, methodName)
if rpcAbs == nil {
return nil, fmt.Errorf("service %q method %q not found", serviceName, methodName)
}
pCall := makeHttpCall(rpcAbs.Uuid, rpcAbs.MethodId, serviceName, methodName)
pCall.body = []byte(params)
// 发送到主协程
hp.onRequest(pCall)
// 启动定时器
timer := time.NewTimer(time.Duration(rpcAbs.Timeout*1e6) * time.Millisecond)
// 总是清理定时器
defer timer.Stop()
// 阻塞等待调用返回
select {
case resp, ok := <-pCall.ret():
// 管道关闭
if !ok || resp == nil {
return nil, errors.New("service unavailable")
}
if resp.err == nil {
return resp.body, nil
} else {
return nil, resp.err
}
case <-timer.C:
pCall.finish()
return nil, errors.New("service call timeout")
}
}
func (hp *HttpProxy) addRouteUnsafe(method string, path string, handle RouteHandle, auth Authenticator) error {
// 是否存在
if _, ok := hp.apis[path]; ok {
return ErrRoutePathExist
}
var ginHandles []gin.HandlerFunc
//ctx := &Context{}
if auth != nil {
ginHandles = append(ginHandles, withAuthHandleWrapper(nil, auth))
ginHandles = append(ginHandles, withRouteRpcContextHandlerWrapper(nil, handle))
} else {
ginHandles = append(ginHandles, withRouteHandleWrapper(nil, handle))
}
// 添加api
hp.engine.Handle(method, path, ginHandles...)
// api 加入列表
hp.apis[path] = true
return nil
}
// AddGroupRoute 添加自定义groupName, 使用auth 函数对组内所有 handle api进行鉴权, 鉴权函数可以为空
func (hp *HttpProxy) AddGroupRoute(groupName string, auth Authenticator, routes ...*GroupRouteInfo) error {
// 计算实际uri地址路径
var uris []string
for _, route := range routes {
uris = append(uris, fmt.Sprintf("%s/%s", groupName, route.Path))
}
// 上锁
hp.rw.Lock()
defer hp.rw.Unlock()
for _, uri := range uris {
if _, ok := hp.apis[uri]; ok {
// 已经有api存在
hp.opt.logger.Warn("%s has been added to router tree", uri)
return ErrRoutePathExist
}
}
var group *gin.RouterGroup
//ctx := &Context{}
if auth != nil {
group = hp.engine.Group(groupName, withAuthHandleWrapper(nil, auth))
} else {
group = hp.engine.Group(groupName)
}
for _, route := range routes {
if auth != nil {
group.Handle(route.Method, route.Path, withRouteRpcContextHandlerWrapper(nil, route.Handle))
} else {
group.Handle(route.Method, route.Path, withRouteHandleWrapper(nil, route.Handle))
}
}
// 记录添加过的api
for _, uri := range uris {
hp.apis[uri] = true
}
return nil
}
// initResources 初始化 http proxy 需要的资源
func (hp *HttpProxy) initResources(option *Options) error {
// 初始化 channel
if option.CacheSize == 0 {
hp.httpQueue = make(CallQueue, 1024)
} else {
hp.httpQueue = make(CallQueue, option.CacheSize)
}
hp.callCache = make(map[uint32]IRpcRequest)
hp.changedService = make(map[string]*descriptor.ServiceDescriptor)
hp.apis = make(map[string]bool)
return nil
}
func (hp *HttpProxy) initHttpServer(option *Options) error {
// 初始化引擎
hp.engine = gin.New()
// 日志包装,设置日志格式
if option.logger != nil {
hp.opt.logger = option.logger
hp.writer = &ProxyWriter{
logger: option.logger.Children(
slog.WithAppName("http_proxy"),
),
}
gin.DefaultWriter = hp.writer
}
// 设置跨域配置
hp.engine.Use(corsMiddleware)
// 设置日志格式
hp.engine.Use(gin.LoggerWithFormatter(customLogFormatter))
// 设置故障恢复
hp.engine.Use(httpProxyWithLogger(option.logger))
// TODO 设置链路跟踪
// 设置分号转换
hp.engine.Use(SemicolonMiddleware())
// 检查端口配置是否合理
if len(option.Address) <= 0 {
hp.opt.logger.Warn("not effective server address config : <nil>")
return ErrInvalidHttpAddress
}
// 匹配用户配置的格式
matchStr := hostExp.FindStringSubmatch(option.Address)
// 没有匹配到正确格式
if len(matchStr) == 0 {
hp.opt.logger.Warn("invalid http proxy address config %q", option.Address)
return ErrInvalidHttpAddress
}
switch len(matchStr) {
case 2:
// ip:port 格式
hp.address = option.Address
case 4:
// https|http:://ip:port 格式, 设置https 证书处理
hp.address = matchStr[3]
if matchStr[1] == "https" {
//检查配置的私钥和证书是否存在
if !tools.IsRegularFile(option.SSLKey) || !tools.IsRegularFile(option.SSLPem) {
return ErrCertificateNotExist
}
hp.isSecure = true
hp.engine.Use(getTlsMiddleware(hp.address))
hp.tlsPem = option.SSLPem
hp.tlsKey = option.SSLKey
}
default:
hp.opt.logger.Warn("invalid http proxy address config %q", option.Address)
return ErrInvalidHttpAddress
}
if option.Mode == "debug" {
hp.engine.GET("/hello", func(c *gin.Context) {
c.String(200, "Hello Plato!!")
})
}
// 设置模式 debug release test
gin.SetMode(option.Mode)
// 设置关机超时时间
if option.ShutdownTimeout == 0 {
option.ShutdownTimeout = 5000
}
hp.shutdownTimeout = time.Duration(option.ShutdownTimeout) * time.Millisecond
return nil
}
// initJsonPbCoder 初始化pb 编码器
func (hp *HttpProxy) initJsonPbCoder(option *Options) error {
if len(option.ServicePath) <= 0 {
return ErrNoEffectiveServicePath
}
if !tools.IsEffectiveDir(option.ServicePath) {
hp.opt.logger.Warn("%q not a effective config path ", option.ServicePath)
return ErrNoEffectiveServicePath
}
if option.PbCoderName == "" {
option.PbCoderName = defaultJsonPbCoder
}
coder, err := jsonpb.MakeJsonPbWithType(option.PbCoderName, jsonpb.WithConfigPath(option.ServicePath), jsonpb.WithPbVersion(option.PbVersion))
if err != nil {
return err
}
hp.coder = coder
return nil
}
func (hp *HttpProxy) initServiceDB(option *Options) error {
// 初始化数据库
hp.db = descriptor.MakeDataBase()
if hp.db == nil {
hp.opt.logger.Warn("create service database error ")
return ErrInitServiceDBError
}
err := hp.db.Init(
descriptor.WithLogger(option.logger),
descriptor.WithConfigPath(option.ServicePath),
descriptor.WithConfigSuffix(".json"),
)
if err != nil {
hp.db.ShutDown()
hp.db = nil
hp.opt.logger.Warn("init service error %q", err.Error())
}
hp.db.AddWatcher(hp.onServiceChange)
return err
}
func (hp *HttpProxy) loadServiceFromDB() error {
err := hp.db.RangeServices(func(serviceUuid uint64, sd *descriptor.ServiceDescriptor, err error) error {
return hp.addServiceAPI(serviceUuid, sd)
})
if err != nil {
hp.opt.logger.Warn("load http gateway api error %q", err.Error())
}
return err
}
func (hp *HttpProxy) onRequest(req IRpcRequest) {
hp.httpQueue <- req
}
func (hp *HttpProxy) addServiceAPI(uuid uint64, service *descriptor.ServiceDescriptor) error {
URITemplate := "/%s/%s"
for _, m := range service.Methods {
if m.Oneway {
hp.opt.logger.Info("skip one way method %q in %q", service.Name, m.Name)
continue
}
// 公共方法直接加入
if m.Public || hp.opt.InternalMode {
uri := fmt.Sprintf(URITemplate, service.Name, m.Name)
// 检查是否已经加载过了,加载过了跳过
if _, ok := hp.apis[uri]; ok {
hp.opt.logger.Info("skip public %s, it has been loaded ", uri)
return nil
}
s := &serviceRoute{
hp: hp,
service: service.Name,
serviceUuid: uuid,
method: m.Name,
methodId: m.Index,
timeout: time.Duration(m.TimeOut * 1e6),
}
_ = hp.addRouteUnsafe(http.MethodPost, uri, s.onRequest, nil)
hp.opt.logger.Info("new public API %s was added to http gateway with given timeout %d", uri, m.TimeOut)
} else if m.Protected {
uri := fmt.Sprintf(URITemplate, service.Name, m.Name)
// 检查是否已经加载过了,加载过了跳过
if _, ok := hp.apis[uri]; ok {
hp.opt.logger.Info("skip protected %s, it has been loaded ", uri)
return nil
}
s := &serviceRoute{
hp: hp,
service: service.Name,
serviceUuid: uuid,
method: m.Name,
methodId: m.Index,
timeout: time.Duration(m.TimeOut * 1e6),
}
_ = hp.addRouteUnsafe(http.MethodPost, uri, s.onRequest, hp.opt.authenticator)
hp.opt.logger.Info("new protected API %s was added to http gateway with given timeout %d", uri, m.TimeOut)
} else {
//private 方法禁止访问
hp.opt.logger.Info("ignore private %s:%s method !", service.Name, m.Name)
}
}
return nil
}
// onServiceChange 响应服务文件状态变化,如果磁盘中文件变动,api自动更新
func (hp *HttpProxy) onServiceChange(changeType int, services []*descriptor.ServiceDescriptor) error {
switch changeType {
case descriptor.ServiceRemove:
for _, s := range services {
hp.opt.logger.Warn("unsupported service's %s deleted ", s.Name)
}
return nil
case descriptor.ServiceModify:
//service changed !
fallthrough
case descriptor.ServiceAdded:
//service added !
hp.rw.Lock()
defer hp.rw.Unlock()
for _, service := range services {
hp.changedService[service.Uuid] = service
hp.opt.logger.Info("receive service %s:%s change ", service.Uuid, service.Name)
}
}
return nil
}
func (hp *HttpProxy) onServiceReload() {
var needReload bool
hp.rw.RLock()
needReload = len(hp.changedService) != 0
hp.rw.RUnlock()
if !needReload {
return
}
hp.opt.logger.Info("start reload http proxy API ...")
hp.rw.RLock()
changedService := make(map[string]*descriptor.ServiceDescriptor)
for uid, service := range hp.changedService {
changedService[uid] = service
}
hp.changedService = make(map[string]*descriptor.ServiceDescriptor)
hp.rw.RUnlock()
// 准备进入热更新流程, 设置服务器状态
atomic.StoreInt32(&hp.status, StatusUpdating)
// 上写锁 更新api
hp.rw.Lock()
for uid, service := range changedService {
// 转换uuid
serviceUuid, err := strconv.ParseInt(uid, 10, 64)
if err != nil {
hp.opt.logger.Warn("parse service's uuid %q err %q", uid, err.Error())
continue
}
if err := hp.addServiceAPI(uint64(serviceUuid), service); err != nil {
hp.opt.logger.Warn("reload service %s %s error !", uid, service.Name)
} else {
hp.opt.logger.Info("service %s has been reloaded", service.Uuid)
}
}
hp.rw.Unlock()
//重新加载json pb
err := hp.coder.Restart()
if err != nil {
hp.opt.logger.Fatal("re-start json pb coder error %s", err.Error())
return
}
// 重新更新为可以服务状态
atomic.StoreInt32(&hp.status, StatusResolved)
hp.opt.logger.Info("hot-update http proxy API successfully!")
}
func (hp *HttpProxy) onTick() {
for {
select {
case call, ok := <-hp.httpQueue:
if !ok {
// 管道已经关闭
return
}
if call == nil {
// 管道关闭,或出现错误直接退出循环
return
}
// 检查 request 状态,可能在队列中太久了,已经超时了
if call.isFinished() {
//hp.opt.logger.Warn("http request /%s/%s has been closed!", call.service, call.method)
hp.opt.logger.Warn("[http] %d:%d request has been closed!", call.getUuid(), call.getMethodId())
return
}
// 转化为需要返回的数据
buffer, err := call.doRpcRaw(hp)
if err != nil {
// 返回错误信息
ret := &responseInfo{
status: http.StatusBadRequest,
err: err,
}
call.doRet(hp, ret)
return
}
// 组装调用头
msg := &protocol.ProxyRequestPackage{
Header: &protocol.RpcProxyCallHeader{
RpcMsgHeader: protocol.RpcMsgHeader{
Type: protocol.ProxyRequestMsg,
Length: uint32(protocol.ProxyCallHeadSize + len(buffer)),
},
ServiceUUID: call.getUuid(),
MethodID: call.getMethodId(),
ServerID: 0,
//CallId 这里不赋值,由外部调用赋值
},
Buffer: buffer,
}
callId, err := hp.onHttpCall(msg)
// 出现错误,直接返回
if err != nil || callId == 0 {
ret := &responseInfo{
status: http.StatusInternalServerError,
err: err,
}
call.doRet(hp, ret)
hp.opt.logger.Warn("transport http request /%d/%d error %q", call.getUuid(), call.getMethodId(), err.Error())
return
}
call.setCallId(callId)
// 无事发生,结束调用
hp.callCache[callId] = call
default:
// 没有需要处理的消息,结束循环
return
}
}
}
func corsMiddleware(c *gin.Context) {
method := c.Request.Method
origin := c.Request.Header.Get("Origin")
if origin != "" {
//接收客户端发送的origin
c.Header("Access-Control-Allow-Origin", "*") // 可将将 * 替换为指定的域名
// 服务器支持的所有跨域请求的方法
c.Header("Access-Control-Allow-Methods", "POST, GET, OPTIONS, PUT, DELETE, UPDATE")
// 允许跨域设置可以返回其他子段,可以自定义字段
c.Header("Access-Control-Allow-Headers", "Origin, X-Requested-With, Content-Type, Accept, Authorization")
// 允许浏览器(客户端)可以解析的头部
c.Header("Access-Control-Expose-Headers", "Content-Length, Access-Control-Allow-Origin, Access-Control-Allow-Headers, Cache-Control, Content-Language, Content-Type")
// 允许客户端传递校验信息比如 cookie
c.Header("Access-Control-Allow-Credentials", "true")
}
if method == "OPTIONS" {
c.AbortWithStatus(http.StatusNoContent)
}
c.Next()
}
// getTlsMiddleware 获取tls 校验中间件
func getTlsMiddleware(host string) gin.HandlerFunc {
return func(c *gin.Context) {
secureMiddleware := secure.New(secure.Options{
SSLRedirect: true,
SSLHost: host,
})
err := secureMiddleware.Process(c.Writer, c.Request)
// If there was an error, do not continue.
if err != nil {
c.JSON(http.StatusBadRequest, gin.H{
"error": err.Error(),
})
return
}
c.Next()
}
}
func withRouteHandleWrapper(_ *Context, route RouteHandle) gin.HandlerFunc {
return func(c *gin.Context) {
ctx := &Context{
Context: c,
}
route(ctx)
//ctx.Context = nil
}
}
func withRouteRpcContextHandlerWrapper(_ *Context, route RouteHandle) gin.HandlerFunc {
return func(c *gin.Context) {
ctx := &Context{
Context: c,
}
// 尝试从context加载 user's rpc context
if userData, ok := c.Get(rpcContextName); ok {
ctx.rpcCtx = userData.(*RpcContext)
}
route(ctx)
//ctx.Context = nil
}
}
func withAuthHandleWrapper(_ *Context, authHandle Authenticator) gin.HandlerFunc {
return func(c *gin.Context) {
// 无鉴权函数
if authHandle == nil {
c.Next()
return
}
// 构造context
ctx := &Context{
Context: c,
}
if err := authHandle(ctx); err != nil {
c.Abort()
c.JSON(http.StatusUnauthorized, gin.H{
"error": err.Error(),
})
return
}
// 如果设置了User's rpc context, 则保存到gin 框架的context中
if ctx.rpcCtx != nil {
c.Set(rpcContextName, ctx.rpcCtx)
}
c.Next()
}
}
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
Go
1
https://gitee.com/dennis-kk/service-box-go.git
git@gitee.com:dennis-kk/service-box-go.git
dennis-kk
service-box-go
Service-Box-go
v0.5.23

搜索帮助