1 Star 0 Fork 0

andrew.zhang / libgo

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
svr2ply.go 13.70 KB
一键复制 编辑 原始数据 按行查看 历史
andrew.zhang 提交于 2022-10-31 06:14 . 整理以前代码中...
package netrpc
import (
"fmt"
"reflect"
"runtime"
"sync"
"time"
"unsafe"
"gitee.com/cosmiczh/libgo/loglv"
"gitee.com/cosmiczh/libgo/zbidx"
"gitee.com/cosmiczh/libgo/zbsync"
)
// NewSVR2ply allocates a zero-valued SVR2ply and initializes it through the
// SVR2ply method, forwarding outer unchanged. See that method for the
// requirements placed on outer.
func NewSVR2ply(outer any) *SVR2ply {
	ply := &SVR2ply{}
	return ply.SVR2ply(outer)
}
// -----------------For var g_k1k2mgr = &k1k2mgr
// k1k2 is the bookkeeping record shared by every SVR2ply whose outer object
// has the same type. One record is created per outer type name by
// (*SVR2ply).SVR2ply and stored in g_k1k2mgr.m_ClsOfOut.
type k1k2 struct {
outname string // outer type name (reflect.Type.String()); also the key in k1k2mgr.m_ClsOfOut
m_Key1 uint32 // call ut.GetPeerIdx(idx) on this value to obtain the index (this file uses zbidx.GetPeerIdx)
m_Key2Aloc zbidx.IdxAloc // allocator that hands out the per-player m_myptr2peer values
}
// Key1 converts the stored m_Key1 value into its index form by passing it
// through zbidx.GetPeerIdx.
func (this *k1k2) Key1() int {
	idx := zbidx.GetPeerIdx(this.m_Key1)
	return idx
}
// k1k2mgr is the registry type behind g_k1k2mgr: it allocates first-level
// keys, maps outer type names to their k1k2 record, and holds the
// two-dimensional table of live SVR2ply objects.
type k1k2mgr struct {
m_Key1Aloc zbidx.IdxAloc
zbsync.RWMutex // protects the two fields below
m_ClsOfOut map[string]*k1k2 // key: outer class name, i.e. k1k2.outname
m_ClsEs [][]*SVR2ply // key1: zbidx.GetPeerIdx(k1k2.m_Key1), key2: zbidx.GetPeerIdx(k1k2.m_Key2Aloc.AlocPeer())
}
// g_k1k2mgr is the package-wide k1k2mgr instance. It starts with a fresh
// first-level key allocator, an empty (non-nil) name map and an empty
// (non-nil) player table.
var g_k1k2mgr = &k1k2mgr{
	m_Key1Aloc: zbidx.NewIdxAloc(false),
	m_ClsOfOut: map[string]*k1k2{},
	m_ClsEs:    [][]*SVR2ply{},
}
// -----------------End var g_k1k2mgr = &k1k2mgr
// msgdir_relation is the association record for one message direction: the
// socket currently bound to that direction plus the peer's numeric pointer.
type msgdir_relation struct {
m_svr2sock *SVR2sock // socket bound to this direction; nil when no relation exists
m_peerptr2me uint32 // numeric pointer the peer handed to me: with it I can uniquely tag the peer, and when it is passed back the peer can uniquely identify me
}
// SVR2ply is the server-side representation of a player object. It is meant
// to be wrapped by an "outer" object whose pointer is handed to the SVR2ply
// initializer.
type SVR2ply struct {
m_outer_iptr uintptr // records the outer object's pointer without adding a reference to it, which avoids the trouble of a reference cycle
m_outer_type reflect.Type // element type of the outer pointer (set by the SVR2ply initializer)
m_k1k2 *k1k2 // record shared by all players with the same outer type
m_msgdir_rela []msgdir_relation // index: msgdir
m_rela2count zbsync.Int32 // incremented in newSockRelation, decremented in DelSockRelation
m_myptr2peer uint32 // initial 0 is an invalid value; produced on the first NewSockRelation and reset to 0 on the last DelSockRelation
m_outer any // initial nil is an invalid value; produced on the first NewSockRelation and reset to nil on the last DelSockRelation
m_waitname []map[string]func(retpb Message) (breakwait bool) // one map per active wait4ret call, keyed by the awaited message names
m_wait4fun chan func(idx int) (retpb Message, breakwait bool) // functions received and invoked by wait4ret with its wait-level index
mx_waitfun sync.Once // guards the one-time creation of m_wait4fun and m_postask
mx_break zbsync.RWMutex // read-locked by PostTask while sending; write-locked where m_postask is replaced or closed from a posted task
m_postask chan func() (_break bool, new_tmout_ms int32) // posted tasks consumed by wait4ret; nil once stopped
m_goid int64 // goroutine id recorded by AiMain; 0 until AiMain has run
}
// SVR2ply initializes the receiver and returns it.
//
// outer must be a pointer to the object that wraps this SVR2ply. Its address
// is stored as a plain uintptr (no reference is kept) and its element type is
// recorded so the pointer can be rebuilt later. The k1k2 record for the outer
// type name is looked up in g_k1k2mgr, or created and registered on first
// use.
//
// Panics with "必须是外包对象的指针." when outer is nil or not a pointer.
func (this *SVR2ply) SVR2ply(outer any) *SVR2ply {
	// Validate the kind BEFORE calling Value.Pointer: reflect panics with its
	// own error for nil or non-pointer-like values, which would hide the
	// intended message.
	l_type := reflect.TypeOf(outer)
	if l_type == nil || l_type.Kind() != reflect.Ptr {
		panic("必须是外包对象的指针.")
	}
	this.m_outer_iptr = reflect.ValueOf(outer).Pointer()
	this.m_outer_type = l_type.Elem()
	l_outname := this.m_outer_type.String()

	// Fast path: look the record up under the read lock. m_ClsOfOut is
	// guarded by the manager's RWMutex, and an unlocked map read racing with
	// the insert below is a fatal error in Go.
	l_known, l_found := func() (*k1k2, bool) {
		defer g_k1k2mgr.RLock()()
		l_rec, found := g_k1k2mgr.m_ClsOfOut[l_outname]
		return l_rec, found
	}()
	if l_found {
		this.m_k1k2 = l_known
	} else {
		func() {
			defer g_k1k2mgr.Lock()()
			// Re-check under the write lock: another goroutine may have
			// registered the same type between the two lock acquisitions.
			if l_rec, found := g_k1k2mgr.m_ClsOfOut[l_outname]; found {
				this.m_k1k2 = l_rec
				return
			}
			this.m_k1k2 = &k1k2{
				m_Key2Aloc: zbidx.NewIdxAloc(false),
				m_Key1:     g_k1k2mgr.m_Key1Aloc.AlocPeer(),
				outname:    l_outname,
			}
			g_k1k2mgr.m_ClsOfOut[l_outname] = this.m_k1k2
		}()
	}

	this.m_msgdir_rela = make([]msgdir_relation, 0)
	this.m_rela2count.Store(0)
	this.m_myptr2peer = 0
	this.m_outer = nil
	return this
}
// GetMyPtr returns the numeric pointer by which the peer identifies this
// player (m_myptr2peer). It is 0 while no socket relation exists.
func (this *SVR2ply) GetMyPtr() uint32 {
	return this.m_myptr2peer
}
// GetOuter returns the pointer to the outer (wrapping) object; callers can
// convert it to the concrete outer pointer type with a type assertion. It is
// nil while no socket relation exists.
func (this *SVR2ply) GetOuter() any {
	return this.m_outer
}
// Get_ISonline reports whether a socket is currently bound for the message
// direction given by msgdir.
func (this *SVR2ply) Get_ISonline(msgdir GetMsgDir) bool {
	dir := int(msgdir.GetMsgDir())
	if dir >= len(this.m_msgdir_rela) {
		return false
	}
	return this.m_msgdir_rela[dir].m_svr2sock != nil
}
// GetSVR2sock returns the socket bound to the message direction given by
// msgdir, or nil when the direction is out of range or has no socket.
func (this *SVR2ply) GetSVR2sock(msgdir GetMsgDir) *SVR2sock {
	dir := int(msgdir.GetMsgDir())
	if dir >= len(this.m_msgdir_rela) {
		return nil
	}
	return this.m_msgdir_rela[dir].m_svr2sock
}
// GetPeerPtr returns the peer's numeric pointer recorded for the message
// direction given by msgdir, or INVALID_peer2iptr when that direction is out
// of range.
func (this *SVR2ply) GetPeerPtr(msgdir GetMsgDir) (peerptr2me uint32) {
	dir := int(msgdir.GetMsgDir())
	if dir >= len(this.m_msgdir_rela) {
		return INVALID_peer2iptr
	}
	return this.m_msgdir_rela[dir].m_peerptr2me
}
// Get2PeerNum returns the allocation count reported by the second-level key
// allocator of this player's k1k2 record. When the record is missing it
// returns -int(zbidx.IdxBitwise).
func (this *SVR2ply) Get2PeerNum() int {
	rec := this.m_k1k2
	if rec == nil {
		return -int(zbidx.IdxBitwise)
	}
	return rec.m_Key2Aloc.GetAlocNum()
}
// SetPeerPtr stores the numeric pointer used locally to tag the peer for the
// message direction given by msgdir. Directions beyond the current relation
// table are silently ignored.
func (this *SVR2ply) SetPeerPtr(msgdir GetMsgDir, peerptr2me uint32) {
	dir := int(msgdir.GetMsgDir())
	if dir >= len(this.m_msgdir_rela) {
		return
	}
	this.m_msgdir_rela[dir].m_peerptr2me = peerptr2me
}
// NewSockRelation binds svr2sock to this player for the socket's message
// direction and returns the socket and peer pointer that were previously
// recorded for that direction.
//
// peerptr2me == INVALID_peer2iptr denotes an invalid value and is ignored;
// any other value is stored as the peer pointer for that direction.
func (this *SVR2ply) NewSockRelation(svr2sock *SVR2sock, peerptr2me uint32) (old_svr2sock *SVR2sock, old_peerptr2me uint32) {
	prevSock, prevPtr := this.newSockRelation(svr2sock, false)
	if peerptr2me == INVALID_peer2iptr {
		return prevSock, prevPtr
	}
	this.m_msgdir_rela[svr2sock.GetMsgDir()].m_peerptr2me = peerptr2me
	return prevSock, prevPtr
}
// newSockRelation is the worker behind NewSockRelation.
//
// It verifies that svr2sock belongs to the same outer class as this player
// (adopting this player's class on a socket that has none yet), grows the
// relation table to cover the socket's message direction, captures the
// previous socket / peer pointer for that direction and removes an existing
// relation. Then:
//   - it == false: the work is handed to svr2sock.AddPlayer(this);
//   - it == true:  the socket is stored, the relation count is incremented,
//     and on the first relation the outer pointer is rebuilt, a peer-facing
//     pointer is allocated and the player is registered in g_k1k2mgr.m_ClsEs.
//
// Panics when the socket is already tied to a different outer class.
func (this *SVR2ply) newSockRelation(svr2sock *SVR2sock, it bool) (old_svr2sock *SVR2sock, old_peerptr2me uint32) {
	// Unlocked check first, repeated after taking the socket's lock.
	if svr2sock.m_k1k2 == nil {
		func() {
			unlock := svr2sock.Lock()
			defer unlock()
			if svr2sock.m_k1k2 == nil {
				svr2sock.m_k1k2 = this.m_k1k2
			}
		}()
	}
	if svr2sock.m_k1k2 != this.m_k1k2 {
		panic(fmt.Errorf("sockek[%s]与本类[%s]相异,不能关联", svr2sock.m_k1k2.outname, this.m_k1k2.outname))
	}

	dir := svr2sock.GetMsgDir() // message direction represented by the socket
	this.enlarge(dir)
	old_svr2sock = this.m_msgdir_rela[dir].m_svr2sock
	old_peerptr2me = this.m_msgdir_rela[dir].m_peerptr2me
	if old_svr2sock != nil {
		// DelSockRelation only reads the message direction from its argument.
		this.DelSockRelation(svr2sock)
	}

	if !it {
		svr2sock.AddPlayer(this)
		return old_svr2sock, old_peerptr2me
	}

	this.m_msgdir_rela[dir].m_svr2sock = svr2sock
	if this.m_rela2count.Inc() != 1 {
		return old_svr2sock, old_peerptr2me
	}

	// First relation: materialize the outer pointer and publish this player.
	this.m_outer = reflect.NewAt(this.m_outer_type, unsafe.Pointer(this.m_outer_iptr)).Interface()
	this.m_myptr2peer = this.m_k1k2.m_Key2Aloc.AlocPeer()
	row := zbidx.GetPeerIdx(this.m_k1k2.m_Key1)
	col := zbidx.GetPeerIdx(this.m_myptr2peer)

	unlock := g_k1k2mgr.Lock()
	defer unlock()
	if row >= len(g_k1k2mgr.m_ClsEs) {
		missingRows := row + 1 - len(g_k1k2mgr.m_ClsEs)
		g_k1k2mgr.m_ClsEs = append(g_k1k2mgr.m_ClsEs, make([][]*SVR2ply, missingRows)...)
	}
	if col >= len(g_k1k2mgr.m_ClsEs[row]) {
		missingCols := col + 1 - len(g_k1k2mgr.m_ClsEs[row])
		g_k1k2mgr.m_ClsEs[row] = append(g_k1k2mgr.m_ClsEs[row], make([]*SVR2ply, missingCols)...)
	}
	g_k1k2mgr.m_ClsEs[row][col] = this
	return old_svr2sock, old_peerptr2me
}
// DelSockRelation removes the socket relation for the message direction
// given by msgdir.
//
// Return value:
//   - 0 when the direction is out of range;
//   - 0 after a relation was actually removed;
//   - GetMyPtr() when the direction had no socket bound.
//
// When the last relation disappears the player is unregistered from
// g_k1k2mgr.m_ClsEs, its peer-facing pointer is freed and both m_myptr2peer
// and m_outer are reset. A negative relation count is logged as fatal.
func (this *SVR2ply) DelSockRelation(msgdir GetMsgDir) (myptr2peer uint32) {
	dir := int(msgdir.GetMsgDir())
	if dir >= len(this.m_msgdir_rela) {
		return 0
	}
	sock := this.m_msgdir_rela[dir].m_svr2sock
	if sock == nil {
		return this.GetMyPtr()
	}

	this.m_msgdir_rela[dir].m_svr2sock = nil
	sock.DelPlayer(this)

	switch remaining := this.m_rela2count.Dec(); {
	case remaining == 0:
		row := zbidx.GetPeerIdx(this.m_k1k2.m_Key1)
		col := zbidx.GetPeerIdx(this.m_myptr2peer)
		func() {
			unlock := g_k1k2mgr.Lock()
			defer unlock()
			g_k1k2mgr.m_ClsEs[row][col] = nil
		}()
		this.m_k1k2.m_Key2Aloc.FreePeer(this.m_myptr2peer)
		this.m_myptr2peer = 0
		this.m_outer = nil
	case remaining < 0:
		loglv.Fta.Stackf("引用数严重异常:%d", remaining)
	}
	return 0
}
// clear drops the socket stored for message direction msgdir and resets the
// outer pointer to nil. msgdir must already be inside the relation table; no
// bounds check is performed.
func (this *SVR2ply) clear(msgdir uint8) {
	this.m_msgdir_rela[msgdir].m_svr2sock, this.m_outer = nil, nil
}
// enlarge grows the relation table with zero-valued entries until msgdir is a
// valid index. It does nothing when the table is already large enough.
func (this *SVR2ply) enlarge(msgdir uint8) {
	missing := int(msgdir) + 1 - len(this.m_msgdir_rela)
	if missing <= 0 {
		return
	}
	this.m_msgdir_rela = append(this.m_msgdir_rela, make([]msgdir_relation, missing)...)
}
// SendPack sends wpk over the socket bound to the message direction given by
// msgdir. The socket's writetail is applied to the packet first (with this
// player and memptr); its error, if any, is returned as is. An error is also
// returned when no socket is bound for that direction.
func (this *SVR2ply) SendPack(msgdir GetMsgDir, wpk *WPacket, memptr ...uint32) error {
	dir := int(msgdir.GetMsgDir())
	if dir >= len(this.m_msgdir_rela) || this.m_msgdir_rela[dir].m_svr2sock == nil {
		return fmt.Errorf("SVR2ply与SVR2sock的关联尚未建立或已断开")
	}
	sock := this.m_msgdir_rela[dir].m_svr2sock
	if tailErr := sock.writetail(false, this, wpk, "", memptr...); tailErr != nil {
		return tailErr
	}
	return sock.Datasock.SendPack(wpk)
}
// AsynProt forwards an asynchronous protocol call (timeout tmot_ms, request
// in_para, completion callback acb) to the socket bound to the message
// direction given by msgdir. It returns an error when no socket is bound.
func (this *SVR2ply) AsynProt(tmot_ms uint32, msgdir GetMsgDir, in_para Message, acb func(retcode int8, retpb Message)) error {
	dir := int(msgdir.GetMsgDir())
	if dir >= len(this.m_msgdir_rela) || this.m_msgdir_rela[dir].m_svr2sock == nil {
		return fmt.Errorf("SVR2ply与SVR2sock的关联尚未建立或已断开")
	}
	return this.m_msgdir_rela[dir].m_svr2sock.AsynProt(tmot_ms, this, in_para, acb)
}
// SyncProt forwards a synchronous protocol call (timeout tmot_ms, request
// in_para) to the socket bound to the message direction given by msgdir and
// returns its result. It returns (-120, nil) when no socket is bound.
func (this *SVR2ply) SyncProt(tmot_ms uint32, msgdir GetMsgDir, in_para Message) (retcode int8, retpb Message) {
	dir := int(msgdir.GetMsgDir())
	if dir >= len(this.m_msgdir_rela) || this.m_msgdir_rela[dir].m_svr2sock == nil {
		return -120, nil
	}
	return this.m_msgdir_rela[dir].m_svr2sock.SyncProt(tmot_ms, this, in_para)
}
// SendP2All forwards pb to the SendP2All method of the socket bound to the
// message direction given by msgdir. It returns an error when no socket is
// bound.
func (this *SVR2ply) SendP2All(msgdir GetMsgDir, pb Message) error {
	dir := int(msgdir.GetMsgDir())
	if dir >= len(this.m_msgdir_rela) || this.m_msgdir_rela[dir].m_svr2sock == nil {
		return fmt.Errorf("SVR2ply与SVR2sock的关联尚未建立或已断开")
	}
	return this.m_msgdir_rela[dir].m_svr2sock.SendP2All(pb)
}
// SendProt sends the protocol message pb on behalf of this player through
// the socket bound to the message direction given by msgdir, passing memptr
// along. It returns an error when no socket is bound.
func (this *SVR2ply) SendProt(msgdir GetMsgDir, pb Message, memptr ...uint32) error {
	dir := int(msgdir.GetMsgDir())
	if dir >= len(this.m_msgdir_rela) || this.m_msgdir_rela[dir].m_svr2sock == nil {
		return fmt.Errorf("SVR2ply与SVR2sock的关联尚未建立或已断开")
	}
	return this.m_msgdir_rela[dir].m_svr2sock.SendProt(this, pb, memptr...)
}
// SendProt4Ret sends pb via SendProt and then waits for a reply through
// wait4ret.
//
// Result codes produced here:
//   - -2: a goroutine id was recorded by AiMain and the caller runs on a
//     different goroutine;
//   - -1: SendProt failed.
//
// Every other result comes from wait4ret. The wait channels are created on
// first use.
func (this *SVR2ply) SendProt4Ret(msgdir GetMsgDir, pb Message, tmout_ms int32, canbreak bool, retmsg []string, callback func(retpb Message) (breakwait bool)) (retpb Message, err int8) {
	if owner := this.m_goid; owner != 0 && owner != runtime.Goid() {
		return nil, -2
	}
	this.mx_waitfun.Do(func() {
		this.m_wait4fun = make(chan func(int) (Message, bool), 2000)
		this.m_postask = make(chan func() (bool, int32), 100)
	})
	if sendErr := this.SendProt(msgdir, pb); sendErr != nil {
		return nil, -1
	}
	return this.wait4ret(tmout_ms, canbreak, retmsg, callback)
}
// Wait4Ret waits for a reply through wait4ret without sending anything
// first. It returns (nil, -2) when a goroutine id was recorded by AiMain and
// the caller runs on a different goroutine. The wait channels are created on
// first use.
func (this *SVR2ply) Wait4Ret(tmout_ms int32, canbreak bool, retmsg []string, callback func(retpb Message) (breakwait bool)) (retpb Message, err int8) {
	if owner := this.m_goid; owner != 0 && owner != runtime.Goid() {
		return nil, -2
	}
	this.mx_waitfun.Do(func() {
		this.m_wait4fun = make(chan func(int) (Message, bool), 2000)
		this.m_postask = make(chan func() (bool, int32), 100)
	})
	return this.wait4ret(tmout_ms, canbreak, retmsg, callback)
}
// wait4ret blocks until one of the awaited replies arrives, the wait is
// interrupted, or the timeout elapses.
//
// It pushes a new wait level (a map from every name in retmsg to callback)
// onto m_waitname and pops it again on return. While the deadline has not
// passed and m_postask is non-nil it selects on:
//   - m_postask (only when canbreak is true): runs the posted task; the task
//     may interrupt the wait or set a new timeout measured from now;
//   - m_wait4fun: runs the received function with this wait level's index;
//   - a timer for the time remaining.
//
// Result codes:
//   - 0:    a reply was obtained (retpb is non-nil);
//   - -3:   the received function asked to stop waiting but supplied no reply;
//   - 99:   interrupted by a posted task; the caller should clean up;
//   - 100:  timed out (also returned when m_postask is nil);
//   - -100: one of the channels yielded nil (channel closed).
//
// A negative tmout_ms is treated as 0.
func (this *SVR2ply) wait4ret(tmout_ms int32, canbreak bool, retmsg []string, callback func(retpb Message) (breakwait bool)) (retpb Message, err int8) {
	names := make(map[string]func(retpb Message) (breakwait bool), len(retmsg))
	for _, name := range retmsg {
		names[name] = callback
	}
	level := len(this.m_waitname)
	this.m_waitname = append(this.m_waitname, names)
	defer func() {
		// Pop this level (and anything stacked above it) unless the stack
		// was already cut shorter elsewhere.
		if level < len(this.m_waitname) {
			this.m_waitname = this.m_waitname[:level]
		}
	}()

	if tmout_ms < 0 {
		tmout_ms = 0
	}
	now := time.Now()
	deadline := now.Add(time.Duration(tmout_ms) * time.Millisecond)
	for now.Before(deadline) {
		tasks := this.m_postask
		if tasks == nil {
			break
		}
		if !canbreak {
			// A receive from a nil channel never becomes ready, which
			// disables the posted-task case for non-interruptible waits.
			tasks = nil
		}
		select {
		case task := <-tasks:
			if task == nil { // channel was closed
				return nil, -100
			}
			stop, newTmout := task()
			if stop {
				return nil, 99 // interrupted; the outer layer should clean up
			}
			if newTmout >= 0 {
				deadline = time.Now().Add(time.Duration(newTmout) * time.Millisecond)
			}
		case fn := <-this.m_wait4fun:
			if fn == nil { // channel was closed
				return nil, -100
			}
			if reply, done := fn(level); done {
				if reply == nil {
					return nil, -3
				}
				return reply, 0 // got the awaited message
			}
		case <-time.After(deadline.Sub(now)):
			return nil, 100 // timed out
		}
		now = time.Now()
	}
	return nil, 100 // timed out
}
// PostTask queues a task on m_postask for the waiting side to run (wait4ret
// receives from m_postask and calls what it receives).
//
// Nothing is queued when cancel_all is false and task is nil, or when
// m_postask is nil. Otherwise a wrapper closure is sent; when it is later
// run it yields:
//   - (cancel_all, -1) if task is nil;
//   - task()'s results otherwise, except that a false "cancel" result from
//     task is replaced by cancel_all.
// If the resulting cancel flag is true the wrapper also discards all wait
// levels (m_waitname = nil) and, when m_postask still holds pending entries,
// swaps in a fresh channel of capacity 100 to drop them.
func (this *SVR2ply) PostTask(cancel_all bool, task func() (cancel_all bool, new_tmout_ms int32)) {
if cancel_all || task != nil {
// The read lock is taken here and released when PostTask returns, so it is
// held across the channel send below.
// NOTE(review): a send that blocks on a full channel keeps the read lock held — confirm this cannot stall the writers below.
defer this.mx_break.RLock()()
if this.m_postask != nil {
this.m_postask <- func() (ret_cancel_all bool, ret_new_tmout_ms int32) {
if task == nil {
ret_cancel_all, ret_new_tmout_ms = cancel_all, -1
} else if ret_cancel_all, ret_new_tmout_ms = task(); !ret_cancel_all {
// task did not ask to cancel: fall back to the caller's cancel_all flag.
ret_cancel_all = cancel_all
}
if ret_cancel_all {
// The init statement clears the wait levels unconditionally; the condition
// then decides whether pending posted tasks must be dropped as well.
if this.m_waitname = nil; this.m_postask != nil && len(this.m_postask) > 0 {
// Write lock acquired here; the deferred unlock runs when this closure returns.
defer this.mx_break.Lock()()
this.m_postask = make(chan func() (bool, int32), 100)
}
}
return
}
}
}
}
// AiMain runs the player's main loop on the calling goroutine.
//
// It creates the wait channels on first use, records the calling goroutine's
// id in m_goid, invokes fun_init once when it is non-nil, and then calls
// fun_idle repeatedly for as long as m_postask is non-nil. A deferred
// loglv.Fta.Recoverf call is installed before fun_init runs.
func (this *SVR2ply) AiMain(fun_init, fun_idle func()) {
	this.mx_waitfun.Do(func() {
		this.m_wait4fun = make(chan func(int) (Message, bool), 2000)
		this.m_postask = make(chan func() (bool, int32), 100)
	})
	this.m_goid = runtime.Goid()
	defer loglv.Fta.Recoverf("SVR2ply::AiMain() exception.")

	if fun_init != nil {
		fun_init()
	}
	for {
		if this.m_postask == nil {
			return
		}
		fun_idle()
	}
}
// AiRunning reports whether the caller is on the goroutine recorded by
// AiMain and m_postask is still non-nil.
func (this *SVR2ply) AiRunning() bool {
	if this.m_goid != runtime.Goid() {
		return false
	}
	return this.m_postask != nil
}
// AiStop stops the loop started by AiMain by closing m_postask and setting
// it to nil.
//
// When called on the goroutine recorded by AiMain, a cancel-all task is
// posted and the channel is closed immediately. From any other goroutine a
// task is posted that closes the channel under the mx_break write lock when
// the waiting side runs it.
//
// Calling AiStop again after the channel has been closed is a no-op: closing
// a nil channel would panic, so both paths check for nil first.
func (this *SVR2ply) AiStop() {
	if this.m_goid == runtime.Goid() {
		this.PostTask(true, nil)
		if this.m_postask != nil {
			close(this.m_postask)
			this.m_postask = nil
		}
		return
	}
	this.PostTask(true, func() (cancel_all bool, new_tmout_ms int32) {
		defer this.mx_break.Lock()()
		// Guard against a second stop task running after the first one has
		// already closed the channel.
		if this.m_postask != nil {
			close(this.m_postask)
			this.m_postask = nil
		}
		return true, -1
	})
}
Go
1
https://gitee.com/andrewzh/libgo.git
git@gitee.com:andrewzh/libgo.git
andrewzh
libgo
libgo
v1.0.3

搜索帮助