Ai
1 Star 1 Fork 0

carlzyhuang/framework

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
克隆/下载
dispatcher.go 2.59 KB
一键复制 编辑 原始数据 按行查看 历史
huangzhiyong 提交于 2025-10-24 15:00 +08:00 . 仓库地址修改
package websocket
import (
"context"
"fmt"
"time"
"gitee.com/carlzyhuang/framework/rpc/websocket/protocol"
"google.golang.org/protobuf/proto"
)
// methodHandler is the signature of a registered method's entry point.
//
// srv is the service implementation the method was registered with, ctx is
// the session the request arrived on, and dec decodes the request payload
// into the value passed to it. The returned response is what Dispatch
// marshals into the reply; a non-nil err aborts the dispatch.
type methodHandler func(srv any, ctx SessionContext, dec func(any) error) (response any, err error)
// MethodDesc represents an RPC service's method specification.
type MethodDesc struct {
// MethodName is the method's name; in this file it is only used in the
// duplicate-registration panic message.
MethodName string
// MessageID is the key under which the method is registered and by which
// Dispatch looks it up.
MessageID uint32
// Handler is invoked by Dispatch for every request carrying MessageID.
Handler methodHandler
serviceIdx int // index of the owning implementation in serviceInfo.serviceImpl; set by registerServiceInfo
}
// ServiceDesc represents an RPC service's specification.
//
// Only Methods is read by the registration code in this file; the other
// fields are carried along unchanged.
type ServiceDesc struct {
// ServiceName is the service's name.
ServiceName string
// HandlerType is not inspected in this file.
// NOTE(review): presumably a typed nil pointer to the service interface,
// used elsewhere to check an implementation's type — confirm.
HandlerType any
// Methods lists the methods that registerServiceInfo indexes by MessageID.
Methods []MethodDesc
// Metadata is opaque to the code in this file.
Metadata any
}
// serviceInfo wraps information about registered services. It is built up
// from ServiceDesc values via registerServiceInfo and is what Dispatch
// consults to route a request.
type serviceInfo struct {
// Contains the implementations of the registered services, in
// registration order. MethodDesc.serviceIdx indexes into this slice.
serviceImpl []any
// Maps a message ID to the descriptor of the method that handles it.
methods map[uint32]*MethodDesc
}
// registerServiceInfo records ss as the implementation of the service
// described by sd and indexes every method of sd by its MessageID.
//
// Each method descriptor is copied before its serviceIdx is set, so the
// caller's ServiceDesc is never modified and the same descriptor can safely
// be registered with more than one serviceInfo.
//
// It panics if a MessageID is already registered or appears twice in sd.
// All methods are validated before anything is stored, so a panic leaves s
// exactly as it was.
func (s *serviceInfo) registerServiceInfo(sd *ServiceDesc, ss any) {
	// Index that ss will occupy once it is appended below.
	idx := len(s.serviceImpl)

	// Stage the new entries first; s is only touched after every ID is
	// known to be unique.
	pending := make(map[uint32]*MethodDesc, len(sd.Methods))
	for i := range sd.Methods {
		d := sd.Methods[i] // private copy; do not alias the caller's slice
		d.serviceIdx = idx

		_, registered := s.methods[d.MessageID] // reading a nil map is safe
		_, repeated := pending[d.MessageID]
		if registered || repeated {
			panic(fmt.Sprintf("duplicate message ID %d for method %s", d.MessageID, d.MethodName))
		}
		pending[d.MessageID] = &d
	}

	// Tolerate a zero-value serviceInfo that did not come from newServiceInfo.
	if s.methods == nil {
		s.methods = make(map[uint32]*MethodDesc, len(pending))
	}
	s.serviceImpl = append(s.serviceImpl, ss)
	for id, d := range pending {
		s.methods[id] = d
	}
}
// newServiceInfo returns an empty registry: it holds no service
// implementations and has an initialized, empty method table.
func newServiceInfo() *serviceInfo {
	return &serviceInfo{
		serviceImpl: make([]any, 0),
		methods:     map[uint32]*MethodDesc{},
	}
}
// Dispatch routes a request packet to the handler registered for its message
// ID and wraps the handler's result in a response packet.
//
// The handler is called after a context derived from the session's context
// with a fixed 3-second timeout has been handed to session.WithContext. On
// success the response header echoes the request's message ID, sequence and
// version, carries the current time in Unix milliseconds, and the status
// code is WS_OK.
//
// A nil packet and an error are returned when no method is registered for
// the message ID, when the handler returns an error, when the handler's
// decode target or result is not a proto.Message, or when the result cannot
// be marshaled.
func (s *serviceInfo) Dispatch(session SessionContext, msg *protocol.RequestPacket) (*protocol.ResponsePacket, error) {
	// TODO: the timeout is hard-coded; mind the per-request cost of deriving
	// a context and make the duration configurable.
	const handlerTimeout = 3 * time.Second

	header := msg.GetHeader()
	id := header.GetMessageId()

	method, ok := s.methods[id]
	if !ok {
		return nil, fmt.Errorf("method not found: %d", id)
	}

	// decode is handed to the handler so it can unmarshal the request
	// payload into its own request type. A non-proto target is reported as
	// an error instead of panicking on the type assertion.
	decode := func(v any) error {
		target, ok := v.(proto.Message)
		if !ok {
			return fmt.Errorf("decode target %T is not a proto.Message", v)
		}
		return proto.Unmarshal(msg.Payload, target)
	}

	ctx, cancel := context.WithTimeout(session.Context(), handlerTimeout)
	defer cancel()
	// NOTE(review): any value returned by WithContext is discarded here. If
	// WithContext stores ctx on the session, the session keeps a cancelled
	// context after this function returns — confirm against SessionContext.
	session.WithContext(ctx)

	resp, err := method.Handler(s.serviceImpl[method.serviceIdx], session, decode)
	if err != nil {
		return nil, err
	}

	// A nil or non-proto result is a handler bug; surface it as an error
	// rather than letting the type assertion panic.
	out, ok := resp.(proto.Message)
	if !ok {
		return nil, fmt.Errorf("handler for message %d returned %T, which is not a proto.Message", id, resp)
	}
	payload, err := proto.Marshal(out)
	if err != nil {
		return nil, err
	}

	return &protocol.ResponsePacket{
		Header: &protocol.Header{
			MessageId: id,
			Sequence:  header.GetSequence(),
			Timestamp: uint64(time.Now().UnixMilli()),
			Version:   header.GetVersion(),
		},
		Code:    int32(protocol.StatusCode_WS_OK),
		Payload: payload,
	}, nil
}
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
Go
1
https://gitee.com/carlzyhuang/framework.git
git@gitee.com:carlzyhuang/framework.git
carlzyhuang
framework
framework
v0.0.18

搜索帮助