1 Star 0 Fork 0

andrew.zhang / libgo

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
svr2msg.go 8.26 KB
一键复制 编辑 原始数据 按行查看 历史
andrew.zhang 提交于 2022-10-31 06:14 . 整理以前代码中...
package netrpc
import (
"fmt"
"math"
"reflect"
"strconv"
"gitee.com/cosmiczh/libgo/loglv"
"gitee.com/cosmiczh/libgo/zbutil"
)
// svrpkti is the registry entry stored for one message type registered through RegSvrMsg.
type svrpkti struct {
// m_inpb_isptr is true when the message was registered as a pointer value and false
// when it was registered as a plain struct value.
m_inpb_isptr bool
// m_inpb_type is the message's type: the pointed-to element type for pointer
// registrations, the struct type itself otherwise.
m_inpb_type reflect.Type
// m_inpb_func is the handler invoked with the decoded message (inpb) and any
// remaining trailing uint32 values (memptr).
m_inpb_func func(svr2sock *SVR2sock, svr2ply *SVR2ply, inpb Message, memptr ...uint32)
}
// Handler registries, indexed first by message direction (msgdir).
// Both slices start empty with a fixed capacity of 10; enlarge_pkt grows their
// length on demand and refuses any msgdir at or beyond that capacity.
var (
	// g_namefunc_pkt: [msgdir] -> message type name -> handler entry.
	g_namefunc_pkt = make([]map[string]*svrpkti, 0, 10)
	// g_codefunc_pkt: [msgdir] -> numeric message code -> handler entry.
	g_codefunc_pkt = make([]map[uint16]*svrpkti, 0, 10)
)
// enlarge_pkt makes sure both handler registries have an initialised map at
// index msgdir, growing them as needed.
//
// It returns an error (and changes nothing) when msgdir is not below the
// capacity of g_namefunc_pkt. Existing entries are never touched.
func enlarge_pkt(msgdir uint8) error {
	want := int(msgdir) + 1
	if limit := cap(g_namefunc_pkt); want > limit {
		return fmt.Errorf("msgdir:%d too big(>%d)", msgdir, limit-1)
	}
	// Append one ready-to-use map per missing direction.
	for len(g_namefunc_pkt) < want {
		g_namefunc_pkt = append(g_namefunc_pkt, make(map[string]*svrpkti, 1))
	}
	for len(g_codefunc_pkt) < want {
		g_codefunc_pkt = append(g_codefunc_pkt, make(map[uint16]*svrpkti, 1))
	}
	return nil
}
// RegSvrMsg registers msg_func as the handler for the message type of regpb
// in direction msgdir.
//
// regpb must be a struct or a pointer; its type name (reflect.Type.String of
// the struct / element type) is the registry key. When regpb.GetMsgCode()
// passes the zbutil.Between(code, 1, math.MaxUint16) check, the handler is
// also stored under that code.
//
// It panics when regpb is neither a pointer nor a struct, when the type name
// is empty, when enlarge_pkt rejects msgdir, or when the same type name is
// already registered for msgdir.
func RegSvrMsg(msgdir uint8, regpb Message, msg_func func(svr2sock *SVR2sock, svr2ply *SVR2ply, inpb Message, memptr ...uint32)) {
	// The stored handler simply forwards to the caller-supplied function.
	forward := func(svr2sock *SVR2sock, svr2ply *SVR2ply, inpb Message, memptr ...uint32) {
		msg_func(svr2sock, svr2ply, inpb, memptr...)
	}
	entry := &svrpkti{m_inpb_func: forward}

	switch typ := reflect.TypeOf(regpb); typ.Kind() {
	case reflect.Ptr:
		entry.m_inpb_isptr, entry.m_inpb_type = true, typ.Elem()
	case reflect.Struct:
		entry.m_inpb_isptr, entry.m_inpb_type = false, typ
	default:
		panic("regpb struct error.")
	}

	name := entry.m_inpb_type.String()
	if name == "" {
		panic(fmt.Sprintf("注册的协议类型错误:无协议名"))
	}
	if err := enlarge_pkt(msgdir); err != nil {
		panic(err.Error())
	}

	byName := g_namefunc_pkt[msgdir]
	if _, dup := byName[name]; dup {
		panic(fmt.Sprintf("重复注册的消息:%s", name))
	}
	byName[name] = entry

	// Code-based lookup is only populated for codes accepted by the range check.
	if code := regpb.GetMsgCode(); zbutil.Between(code, 1, math.MaxUint16) {
		g_codefunc_pkt[msgdir][code] = entry
	}
}
// OnProcPack decodes one received packet (rpk) from datasock and routes it to a handler.
//
// Flow, as visible in this function:
//  1. Return immediately if datasock has no SVR2sock.
//  2. Read a uint16 from the packet tail (ReverseReadUInt2). math.MaxUint16 marks a
//     broadcast packet: the protocol name is resolved and m_msgsvc.BroadCastMsg is run
//     through m_msgsvc.SwitchRoutine with a snapshot of the socket's SVR2ply list.
//  3. Otherwise the value is the number of uint32 values to read from the tail
//     (ReverseReadUInt4). The first one is looked up with FindSVR2ply; when a SVR2ply
//     is found that value is dropped from the slice passed on to handlers.
//  4. The handler entry is looked up by name (m_pktcmd_size == 0) or by message code.
//  5. Dispatch happens through SwitchRoutine, or - when the SVR2ply has a non-nil
//     m_wait4fun channel - by a non-blocking send of a closure into that channel.
func (this *SVR2base) OnProcPack(datasock *Datasock, rpk *RPacket) {
var l_svr2sock *SVR2sock = nil
// Packets arriving on a Datasock without an SVR2sock are ignored.
if svr2sock := datasock.GetSVR2sock(); svr2sock == nil {
return
} else {
l_svr2sock = svr2sock
}
l_appendcount := uint16(0)
// NOTE(review): arguments of a deferred call are evaluated when the defer statement
// executes, so l_appendcount is passed as 0 here no matter what is read on the next
// line. Wrapping the call in a closure would presumably stop Recoverf from calling
// recover() directly - confirm against loglv before changing this.
defer loglv.War.Recoverf("来自[%s]的非法包[附加了%d个8字节]", datasock.GetPeerAddr(), l_appendcount)
l_appendcount = rpk.ReverseReadUInt2()
// math.MaxUint16 is the marker for a broadcast packet.
if l_appendcount == math.MaxUint16 {
var l_protname string = ""
// m_pktcmd_size == 0: the protocol is identified by a name string in the packet;
// otherwise by a command code read with ReadCmd.
if l_svr2sock.m_pktcmd_size == 0 {
l_protname = rpk.ReadString()
} else if l_msgcode := rpk.ReadCmd(); zbutil.Between(l_msgcode, 1, math.MaxUint16) {
if ty, found := findProtByCode(l_msgcode); found {
l_protname = ty.String()
} else {
// loglv.Inf.Printf("Recv a unknonw broadcast proto that come from %s,msgcode:%d", datasock.GetPeerAddr(), l_msgcode)
// return
// Unknown code: fall back to a synthetic "MsgCode:<n>" name.
l_protname = "MsgCode:" + strconv.Itoa(int(l_msgcode))
}
}
l_dbo_svr2sock := l_svr2sock
this.m_msgsvc.SwitchRoutine(l_dbo_svr2sock, nil, l_protname, func() {
// Copy the SVR2ply list into a fresh slice inside a helper so the deferred
// function returned by RLock() (presumably the unlock) runs before BroadCastMsg.
l_svr2plys := func() []*SVR2ply {
defer l_svr2sock.RLock()()
l_svr2plys := make([]*SVR2ply, 0, len(l_svr2sock.m_SVR2plys))
for _, svr2ply := range l_svr2sock.m_SVR2plys {
l_svr2plys = append(l_svr2plys, svr2ply)
}
return l_svr2plys
}()
defer loglv.Fta.Recoverf("协议(%s)BroadCastMsg函数执行发生异常", l_protname)
this.m_msgsvc.BroadCastMsg(l_dbo_svr2sock, l_svr2plys, l_protname, rpk)
})
return
}
// Non-broadcast packet: l_appendcount uint32 values follow at the packet tail.
l_memptr := make([]uint32, 0, l_appendcount)
for i := uint16(0); i < l_appendcount; i++ {
l_memptr = append(l_memptr, rpk.ReverseReadUInt4())
}
var l_svr2ply *SVR2ply = nil
// The first trailing value is tried as a SVR2ply key; it is consumed only when a
// SVR2ply is actually found, otherwise it stays in l_memptr.
if len(l_memptr) > 0 {
l_svr2ply, _ = l_svr2sock.FindSVR2ply(l_memptr[0])
if l_svr2ply != nil {
l_memptr = l_memptr[1:]
}
}
l_msgi, l_protype, l_isptr, l_protname := (*svrpkti)(nil), reflect.Type(nil), false, ""
var registered bool = false
// NOTE(review): g_namefunc_pkt / g_codefunc_pkt are indexed by this.m_msgdir without a
// length check; an out-of-range index would panic (presumably caught by the deferred
// Recoverf above) - confirm that RegSvrMsg always runs for this direction first.
if l_svr2sock.m_pktcmd_size == 0 {
l_protname = rpk.ReadString()
l_msgi, registered = g_namefunc_pkt[this.m_msgdir][l_protname]
// Registered handlers carry their own type info; otherwise fall back to
// findProtByName, treating the message as a pointer type.
if found := false; registered {
l_protype, l_isptr = l_msgi.m_inpb_type, l_msgi.m_inpb_isptr
} else if l_protype, found = findProtByName(l_protname); found {
l_isptr = true
} else {
// loglv.Err.Printf("Recv a unknonw proto that come from %s,protname:%s", datasock.GetPeerAddr(), l_protname)
// return
}
} else if l_msgcode := rpk.ReadCmd(); zbutil.Between(l_msgcode, 1, math.MaxUint16) {
l_msgi, registered = g_codefunc_pkt[this.m_msgdir][l_msgcode]
if found := false; registered {
l_protype, l_isptr = l_msgi.m_inpb_type, l_msgi.m_inpb_isptr
l_protname = l_protype.String()
} else if l_protype, found = findProtByCode(l_msgcode); found {
l_isptr = true
l_protname = l_protype.String()
} else {
l_protname = "MsgCode:" + strconv.Itoa(int(l_msgcode))
// loglv.Inf.Printf("Recv a unknonw proto that come from %s,msgcode:%d", datasock.GetPeerAddr(), l_msgcode)
// return
}
}
// When the Between check above is false nothing is resolved: l_protype stays nil and
// l_protname stays empty.
l_pb_parsed, l_pb_msg, l_pb_err := false, Message(nil), error(nil)
// lfun_parse decodes the message body at most once and caches the result in l_pb_msg.
// It returns nil when the type is unknown or Unmarshal fails (both are logged).
lfun_parse := func() Message {
if l_pb_parsed {
return l_pb_msg
}
l_pb_parsed = true
if l_protype == nil {
loglv.Err.Printf("\n\t协议(%s)的protobuf解析出现错误:not defined protocol.\n", l_protname)
return l_pb_msg
}
// NOTE(review): in the non-pointer branch Interface() yields a copy of the zero
// struct value, so Unmarshal receives a non-pointer - confirm Unmarshal supports that.
if l_isptr {
l_pb_msg = (reflect.New(l_protype).Interface()).(Message)
} else {
l_pb_msg = (reflect.New(l_protype).Elem().Interface()).(Message)
}
if l_pb_err = Unmarshal(rpk.ReadSequence(), l_pb_msg); l_pb_err != nil {
l_pb_msg = nil
loglv.Err.Printf("\n\t协议(%s)的protobuf解析出现错误:%v\n", l_protname, l_pb_err)
return l_pb_msg
}
return l_pb_msg
}
l_dbo_svr2sock := l_svr2sock
// lfun_proc is the normal dispatch: unregistered messages go to NotRegMessge together
// with the lazy parser; registered ones are parsed and, only if parsing succeeded,
// passed to the registered handler.
lfun_proc := func() {
if !registered {
defer loglv.Fta.Recoverf("协议(%s)NotRegMessge函数执行发生异常", l_protname)
this.m_msgsvc.NotRegMessge(l_dbo_svr2sock, l_svr2ply, l_protname, rpk, lfun_parse, l_memptr...)
} else if lfun_parse() != nil {
defer loglv.Fta.Recoverf("协议(%s)注册函数执行发生异常", l_protname)
l_msgi.m_inpb_func(l_dbo_svr2sock, l_svr2ply, l_pb_msg, l_memptr...)
}
}
// No SVR2ply, or a SVR2ply without a wait channel: dispatch normally.
if l_svr2ply == nil {
this.m_msgsvc.SwitchRoutine(l_dbo_svr2sock, l_svr2ply, l_protname, lfun_proc)
} else if svr2ply := l_svr2ply; svr2ply.m_wait4fun == nil {
this.m_msgsvc.SwitchRoutine(l_dbo_svr2sock, l_svr2ply, l_protname, lfun_proc)
} else {
// Non-blocking hand-off of a closure to the receiver of m_wait4fun. The closure
// is called with idx, which it uses as an index into svr2ply.m_waitname.
select {
case svr2ply.m_wait4fun <- func(idx int) (retpb Message, breakwait bool) {
if idx >= len(svr2ply.m_waitname) {
// idx is out of range of m_waitname: dispatch normally.
this.m_msgsvc.SwitchRoutine(l_dbo_svr2sock, l_svr2ply, l_protname, lfun_proc)
return nil, false
} else if callback, found := svr2ply.m_waitname[idx][l_protname]; !found {
// This protocol name is not in the wait set: dispatch normally.
this.m_msgsvc.SwitchRoutine(l_dbo_svr2sock, l_svr2ply, l_protname, lfun_proc)
return nil, false
} else if callback == nil {
// In the wait set with no callback: return the parsed message with
// breakwait=true; a registered handler is additionally dispatched.
// NOTE(review): lfun_proc also calls lfun_parse; whether that can run
// concurrently with the call below depends on SwitchRoutine - confirm.
if registered {
this.m_msgsvc.SwitchRoutine(l_dbo_svr2sock, l_svr2ply, l_protname, lfun_proc)
}
return lfun_parse(), true
} else {
// In the wait set with a callback: the callback's result is returned as breakwait.
lfun_parse()
return l_pb_msg, callback(l_pb_msg)
}
}:
default:
// The channel send would block: the message is dropped and an error is logged.
loglv.Err.Printf("### Goid:%d blocked(push Message)... lose %s", svr2ply.m_goid, l_protname)
}
}
}
// NotRegMessge is SVR2base's handler for messages that have no registered
// handler: it only writes a warning containing the socket's message
// direction and the message name. svr2ply, rpk, parse and memptr are unused.
func (this *SVR2base) NotRegMessge(svr2sock *SVR2sock, svr2ply *SVR2ply, msg string, rpk *RPacket, parse func() Message, memptr ...uint32) {
	msgdir := svr2sock.GetMsgDir()
	loglv.War.Println("尚未注册的消息:", msgdir, "/", msg)
}
// BroadCastMsg is SVR2base's handler for broadcast messages: it only writes a
// warning containing the socket's message direction and the message name.
// svr2plys and rpk are unused.
func (this *SVR2base) BroadCastMsg(svr2sock *SVR2sock, svr2plys []*SVR2ply, msg string, rpk *RPacket) {
	msgdir := svr2sock.GetMsgDir()
	loglv.War.Println("未处理的广播消息:", msgdir, "/", msg)
}
// SendProt packs pb with packprot, lets writetail process the packet (first
// argument false, with svr2ply and memptr), and sends the result through
// this.Datasock.SendPack. The first error encountered is returned as is.
func (this *SVR2sock) SendProt(svr2ply *SVR2ply, pb Message, memptr ...uint32) error {
	wpk, protName, packErr := this.packprot(pb)
	if packErr != nil {
		return packErr
	}
	if tailErr := this.writetail(false, svr2ply, wpk, protName, memptr...); tailErr != nil {
		return tailErr
	}
	return this.Datasock.SendPack(wpk)
}
// SendP2All packs pb with packprot, lets writetail process the packet (first
// argument true, no SVR2ply and no extra values - presumably the broadcast
// form), and sends the result through this.Datasock.SendPack. The first error
// encountered is returned as is.
func (this *SVR2sock) SendP2All(pb Message) error {
	wpk, protName, packErr := this.packprot(pb)
	if packErr != nil {
		return packErr
	}
	if tailErr := this.writetail(true, nil, wpk, protName); tailErr != nil {
		return tailErr
	}
	return this.Datasock.SendPack(wpk)
}
Go
1
https://gitee.com/andrewzh/libgo.git
git@gitee.com:andrewzh/libgo.git
andrewzh
libgo
libgo
v1.0.3

搜索帮助