1 Star 0 Fork 0

huobowen/vulcans

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
克隆/下载
server.go 5.13 KB
一键复制 编辑 原始数据 按行查看 历史
huobowen 提交于 2025-04-27 11:47 +08:00 . 修改依赖配置
package tcp
import (
"context"
"encoding/hex"
"errors"
vulcanus "gitee.com/huobowen/vulcans"
managev1 "gitee.com/huobowen/vulcans/api/devmanage/v1"
"gitee.com/huobowen/vulcans/connection"
"gitee.com/huobowen/vulcans/transport"
"gitee.com/huobowen/vulcans/utils"
"gitee.com/huobowen/vulcans/utils/conversion"
"gitee.com/huobowen/vulcans/utils/date"
"github.com/go-kratos/kratos/v2/log"
"github.com/go-netty/go-netty"
"github.com/go-netty/go-netty/codec"
nettyUtils "github.com/go-netty/go-netty/utils"
)
// ServerOption mutates a Server during construction; NewServer applies the
// supplied options in the order they are passed.
type ServerOption func(*Server)
// Port returns a ServerOption that stores the given port number on the
// Server; Start later listens on that port.
func Port(port uint32) ServerOption {
	return func(srv *Server) { srv.port = port }
}
// MessageHandler is the go-netty handler that NewServer appends last to every
// child channel pipeline. It reacts to connection lifecycle events and, in
// HandleRead, decodes inbound frames and dispatches them to the data handler.
type MessageHandler struct {
// codec decodes a raw inbound frame into a device message (see HandleRead).
codec vulcanus.Codec
// handler processes each successfully decoded device message.
handler vulcanus.DataHandler
// ctx is the shared transport context; the handler methods reach the
// metadata sender (D.Kp), the device-config client (D.M1), the session
// store (Sm) and the device configuration (C.Device) through it.
ctx *transport.Context
}
// HandleActive logs the remote address of a newly accepted client connection.
// The event is not forwarded to any later handler.
func (h MessageHandler) HandleActive(ctx netty.ActiveContext) {
	remote := ctx.Channel().RemoteAddr()
	log.Infof("[%s] new client connection", remote)
}
// HandleException logs an exception raised on the connection and closes the
// channel when the exception's message is exactly "EOF". Any other exception
// is only logged: the channel is left open and the exception is not forwarded
// to a later handler.
//
// The device code used in the log line comes from the channel attachment,
// which HandleRead sets to the device code on the first decoded frame. A
// missing or non-string attachment is logged as an empty device code.
func (h MessageHandler) HandleException(ctx netty.ExceptionContext, ex netty.Exception) {
	// Comma-ok assertion: a nil or non-string attachment yields "" instead of
	// panicking inside the exception handler.
	deviceCode, _ := ctx.Attachment().(string)
	log.Warnf("[%s] [%s] exception in connection, err: %s", ctx.Channel().RemoteAddr(), deviceCode, ex)
	// The peer closing the connection surfaces as an "EOF" exception; close
	// our side too so the inactive handling runs.
	if ex.Error() == "EOF" {
		ctx.Channel().Close(ex)
	}
}
// HandleInactive runs when the connection becomes inactive. It logs the
// event and, when the channel had been bound to a device code (set by
// HandleRead on the first decoded frame), publishes a disconnect metadata
// record and removes the device from the session store. The event is then
// forwarded with ctx.HandleInactive so default processing still happens.
//
// A missing or non-string attachment is treated as "no device bound".
func (h MessageHandler) HandleInactive(ctx netty.InactiveContext, ex netty.Exception) {
	// Comma-ok assertion: a nil or non-string attachment yields "" instead of
	// panicking while the connection is being torn down.
	deviceCode, _ := ctx.Attachment().(string)
	// %v prints the same text as %s for a non-nil error and "<nil>" when ex
	// is nil, instead of the "%!s(<nil>)" noise %s would produce.
	log.Infof("[%s] [%s] connection inactive, err: %v", ctx.Channel().RemoteAddr(), deviceCode, ex)
	if deviceCode != "" {
		metadata := &vulcanus.Metadata{
			DeviceCode: deviceCode,
			Time:       date.Now(),
			Type:       "连接信息",
			Data:       "设备断开连接",
		}
		h.ctx.D.Kp.SendMetadata(utils.ToJsonString(metadata))
		h.ctx.Sm.Delete(deviceCode)
	}
	// Disconnected: the default processing is to close the connection.
	ctx.HandleInactive(ex)
}
// HandleRead processes one inbound frame from a device connection.
//
// Steps, in order:
//  1. Convert the message to bytes; an empty frame is ignored.
//  2. Decode the frame with h.codec; a frame that fails to decode is logged
//     (hex-encoded) and dropped.
//  3. Choose the device code: the channel attachment when one is set,
//     otherwise the code carried by the decoded message.
//  4. Fetch the device configuration. A call error drops the frame; a reply
//     code other than 200 closes the channel.
//  5. On the first frame of a connection (no attachment yet) store the device
//     code as the channel attachment and publish a "connected" record.
//  6. Register the device channel in the session store, publish the raw
//     device data, and pass the message to h.handler. A handler error is
//     logged and not propagated.
//
// The message is not forwarded to a later pipeline handler.
func (h MessageHandler) HandleRead(ctx netty.InboundContext, message netty.Message) {
	// This handler consumes the message itself rather than leaving it to the
	// next handler, so ctx.HandleRead(message) is intentionally not called.
	frame := nettyUtils.MustToBytes(message)
	if len(frame) == 0 {
		return
	}
	deviceMsg, err := h.codec.Decode(frame)
	if err != nil {
		log.Errorf("[%s] decode error [%s] metadata: [%s]", ctx.Channel().RemoteAddr(), err.Error(), hex.EncodeToString(frame))
		return
	}
	// Prefer the code already bound to this channel over the one in the frame.
	deviceCode := utils.If(ctx.Attachment() == nil, deviceMsg.DeviceCode, ctx.Attachment()).(string)
	// Fetch the device configuration.
	request := &managev1.GetDeviceConfigRequest{DeviceCode: deviceCode}
	reply, err := h.ctx.D.M1.GetDeviceConfig(context.Background(), request)
	if err != nil {
		log.Errorf("[%s] get device conf err: %s", deviceCode, err.Error())
		return
	}
	if reply.Code != 200 {
		// The format has three verbs (device code, remote address, frame hex);
		// the remote address was previously missing from the argument list.
		log.Warnf("[%s] [%s] not found device conf, metadata: [%s]", deviceCode, ctx.Channel().RemoteAddr(), hex.EncodeToString(frame))
		ctx.Channel().Close(errors.New(deviceCode + " not found device conf"))
		return
	}
	deviceConfig := reply.Data
	// First data frame on this connection.
	if ctx.Attachment() == nil {
		// Store the device code on the channel so later events can identify
		// the device.
		ctx.SetAttachment(deviceMsg.DeviceCode)
		metadata := &vulcanus.Metadata{
			DeviceCode: deviceMsg.DeviceCode,
			Time:       date.Now(),
			Type:       "连接信息",
			Data:       "设备连接",
		}
		h.ctx.D.Kp.SendMetadata(utils.ToJsonString(metadata))
	}
	deviceMsg.DeviceCode = ctx.Attachment().(string)
	deviceMsg.DeviceType = h.ctx.C.Device.DeviceType
	log.Infof("[%s] [%s] command: [%s] metadata: [%s]", deviceCode,
		ctx.Channel().RemoteAddr(), deviceMsg.Command, deviceMsg.Metadata)
	deviceMsg.DeviceChannel = &connection.DeviceChannel{DeviceCode: deviceCode, Channel: ctx.Channel()}
	// Save the device session information.
	h.ctx.Sm.Add(deviceMsg.DeviceChannel)
	deviceMsg.DeviceConfig = deviceConfig
	// Send the raw device data downstream.
	metadata := &vulcanus.Metadata{
		DeviceCode: deviceMsg.DeviceCode,
		Time:       date.Now(),
		Type:       "设备数据",
		Data:       deviceMsg.Metadata,
	}
	h.ctx.D.Kp.SendMetadata(utils.ToJsonString(metadata))
	// Handle the device command.
	err = h.handler.Handle(h.ctx, deviceMsg)
	if err != nil {
		log.Errorf("[%s] handle error: %s", deviceCode, err.Error())
	}
}
// Server is a TCP server wrapper around a go-netty bootstrap.
type Server struct {
// port is the TCP port Start listens on; it is set through the Port option.
port uint32
// b is the go-netty bootstrap built in NewServer and driven by Start and Stop.
b netty.Bootstrap
}
// NewServer builds a TCP Server and applies the given options to it.
//
// Every accepted child channel gets the same pipeline: each entry of codec,
// in slice order, followed by a MessageHandler that uses codec2 to decode
// frames and handler to process the decoded device messages. The server does
// not start listening until Start is called.
//
// NOTE(review): the same codec values are added to every channel's pipeline;
// confirm the supplied codecs are safe to share between connections.
func NewServer(ctx *transport.Context, codec []codec.Codec, codec2 vulcanus.Codec, handler vulcanus.DataHandler, opts ...ServerOption) *Server {
	server := new(Server)
	for i := range opts {
		opts[i](server)
	}

	// The handler is a plain value, so one instance can be handed to every
	// channel's pipeline.
	msgHandler := MessageHandler{ctx: ctx, codec: codec2, handler: handler}

	initChannel := func(ch netty.Channel) {
		pipeline := ch.Pipeline()
		for i := range codec {
			pipeline.AddLast(codec[i])
		}
		pipeline.AddLast(msgHandler)
	}

	// Create the bootstrap; listening and accepting begin in Start.
	server.b = netty.NewBootstrap(netty.WithChildInitializer(initChannel))
	return server
}
// Start listens on the configured port on all interfaces and returns the
// result of the bootstrap listener's Sync call. The ctx argument is not used.
func (s *Server) Start(ctx context.Context) error {
	log.Infof("[TCP] server listening on [%d]", s.port)
	addr := ":" + conversion.IntToStr(int(s.port))
	return s.b.Listen(addr).Sync()
}
// Stop shuts down the underlying go-netty bootstrap. The ctx argument is not
// used and the method always returns nil.
func (s *Server) Stop(ctx context.Context) error {
log.Info("[TCP] server stopping")
s.b.Shutdown()
return nil
}
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
Go
1
https://gitee.com/huobowen/vulcans.git
git@gitee.com:huobowen/vulcans.git
huobowen
vulcans
vulcans
33e1650dda83

搜索帮助