代码拉取完成,页面将自动刷新
package websocket
import (
"context"
"fmt"
"time"
"gitee.com/carlzyhuang/framework/rpc/websocket/protocol"
"google.golang.org/protobuf/proto"
)
// methodHandler is the adapter signature through which Dispatch invokes a
// registered method. srv is the service implementation the method was
// registered with, ctx is the caller's session, and dec unmarshals the request
// payload into the value passed to it. Dispatch serializes the returned
// response with proto.Marshal, so handlers invoked through it are expected to
// return a proto.Message.
type methodHandler func(srv any, ctx SessionContext, dec func(any) error) (response any, err error)
// MethodDesc represents an RPC service's method specification.
type MethodDesc struct {
// MethodName is the method's name; it is reported in the panic message when
// registration finds a duplicate MessageID.
MethodName string
// MessageID is the key requests are routed by: Dispatch looks a method up
// using the message ID carried in the request header.
MessageID uint32
// Handler is the function Dispatch calls to run the method.
Handler methodHandler
serviceIdx int // index of the owning implementation in serviceInfo.serviceImpl; assigned during registration
}
// ServiceDesc represents an RPC service's specification.
type ServiceDesc struct {
// ServiceName is the service's name. It is not read in this part of the file.
ServiceName string
// HandlerType is not read in this part of the file.
// NOTE(review): presumably a pointer to the service interface, used to
// type-check implementations as in gRPC's ServiceDesc — confirm.
HandlerType any
// Methods lists the service's methods; registerServiceInfo registers each of
// them under its MessageID.
Methods []MethodDesc
// Metadata is not read in this part of the file.
// NOTE(review): presumably free-form descriptor metadata — confirm.
Metadata any
}
// serviceInfo wraps information about registered services. It is built from
// ServiceDesc values for internal purposes and is what Dispatch consults to
// route a request.
type serviceInfo struct {
// serviceImpl holds the registered service implementations, one entry per
// registerServiceInfo call, in registration order.
serviceImpl []any
// methods maps a message ID to the descriptor of the method that handles it.
methods map[uint32]*MethodDesc
}
// registerServiceInfo adds the service implementation ss and every method
// described by sd to s.
//
// ss is appended to s.serviceImpl, and each method is stored in s.methods under
// its MessageID, tagged with the index of ss so that Dispatch can find the
// implementation to call. Every descriptor is copied before it is tagged: sd is
// left unmodified, so the same ServiceDesc can be registered with several
// serviceInfo values without one registration overwriting the index another
// one relies on.
//
// It panics if a MessageID in sd is already registered in s.
func (s *serviceInfo) registerServiceInfo(sd *ServiceDesc, ss any) {
	// Tolerate a zero-value serviceInfo; writing to a nil map would panic.
	if s.methods == nil {
		s.methods = make(map[uint32]*MethodDesc, len(sd.Methods))
	}

	idx := len(s.serviceImpl)
	s.serviceImpl = append(s.serviceImpl, ss)

	for i := range sd.Methods {
		// Private copy: never alias or mutate the caller's descriptor.
		desc := sd.Methods[i]
		if _, exists := s.methods[desc.MessageID]; exists {
			panic(fmt.Sprintf("duplicate message ID %d for method %s", desc.MessageID, desc.MethodName))
		}
		desc.serviceIdx = idx
		s.methods[desc.MessageID] = &desc
	}
}
// newServiceInfo returns an empty serviceInfo whose implementation list and
// method table are both allocated, so it is ready for registerServiceInfo.
func newServiceInfo() *serviceInfo {
	return &serviceInfo{
		serviceImpl: make([]any, 0),
		methods:     map[uint32]*MethodDesc{},
	}
}
// Dispatch routes a request packet to the handler registered for its message
// ID and wraps the handler's result in a response packet.
//
// The handler is looked up by the message ID in msg's header. It is called with
// the service implementation it was registered with, the session, and a decoder
// that unmarshals msg.Payload into the value the handler supplies. On success
// the response echoes the request's message ID, sequence and version, carries
// the current time in Unix milliseconds and status WS_OK, and holds the
// marshaled handler response as its payload.
//
// An error (and a nil response) is returned when no method is registered for
// the message ID, when the handler fails, when the handler's decode target or
// its response is not a proto.Message, or when marshaling the response fails.
func (s *serviceInfo) Dispatch(session SessionContext, msg *protocol.RequestPacket) (*protocol.ResponsePacket, error) {
	// TODO: make the per-request timeout configurable and measure its cost.
	const handlerTimeout = 3 * time.Second

	messageID := msg.GetHeader().GetMessageId()
	method, ok := s.methods[messageID]
	if !ok {
		return nil, fmt.Errorf("method not found: %d", messageID)
	}

	decode := func(v any) error {
		// A checked assertion turns a mis-wired handler into an error instead
		// of a panic on the connection's goroutine.
		target, ok := v.(proto.Message)
		if !ok {
			return fmt.Errorf("method %s: decode target is %T, not a proto.Message", method.MethodName, v)
		}
		return proto.Unmarshal(msg.Payload, target)
	}

	// Bound the handler with a fresh timeout derived from the session context.
	ctx, cancel := context.WithTimeout(session.Context(), handlerTimeout)
	defer cancel()
	// NOTE(review): any value returned by WithContext is discarded, and ctx is
	// cancelled as soon as Dispatch returns. Confirm that WithContext installs
	// ctx on the session in place and that leaving the cancelled context on the
	// session afterwards is intended.
	session.WithContext(ctx)

	resp, err := method.Handler(s.serviceImpl[method.serviceIdx], session, decode)
	if err != nil {
		return nil, err
	}

	// A handler returning nil or a non-protobuf value must not crash dispatch.
	respMsg, ok := resp.(proto.Message)
	if !ok {
		return nil, fmt.Errorf("method %s: handler returned %T, not a proto.Message", method.MethodName, resp)
	}
	payload, err := proto.Marshal(respMsg)
	if err != nil {
		return nil, err
	}

	reqHeader := msg.GetHeader()
	return &protocol.ResponsePacket{
		Header: &protocol.Header{
			MessageId: reqHeader.GetMessageId(),
			Sequence:  reqHeader.GetSequence(),
			Timestamp: uint64(time.Now().UnixMilli()),
			Version:   reqHeader.GetVersion(),
		},
		Code:    int32(protocol.StatusCode_WS_OK),
		Payload: payload,
	}, nil
}
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。