1 Star 0 Fork 0

andrew.zhang / libgo

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
ioserver.go 6.12 KB
一键复制 编辑 原始数据 按行查看 历史
andrew.zhang 提交于 2022-10-31 06:14 . 整理以前代码中...
package netrpc
import (
"errors"
"fmt"
"net"
"sync/atomic"
"time"
"gitee.com/cosmiczh/libgo/zbgls"
"gitee.com/cosmiczh/libgo/zbrun"
"gitee.com/cosmiczh/libgo/zbsync"
)
// IOServer is the runtime object of the service: it holds the packet-header
// layout, the registered listeners, the data sockets and the traffic counters.
type IOServer struct {
	// Header field sizes as passed to SetPktParse (presumably in bytes — confirm).
	pktlen_size uint32
	pktses_size uint32
	pktcmd_size uint32
	// pkthead_size is pktlen_size + pktses_size (computed in SetPktParse).
	pkthead_size uint32
	// pkthcmd_size is pktlen_size + pktses_size + pktcmd_size (computed in SetPktParse).
	pkthcmd_size uint32

	// m_pktcombin is the runner handed to the constructor, or the package-level
	// default when none was supplied.
	m_pktcombin zbrun.Runner
	// m_isSelfNew is true when the constructor fell back to the package-level runner.
	m_isSelfNew bool
	// m_sendblock_kick is stored from the constructor argument of the same name.
	m_sendblock_kick bool
	// m_isenc is stored by SetPktParse (presumably an "encrypted" flag — confirm).
	m_isenc bool
	// m_recv_concurr decides whether SetReqChan keeps the supplied channel.
	m_recv_concurr bool
	// m_reqchan is the request channel installed by SetReqChan (nil when
	// m_recv_concurr is false).
	m_reqchan chan func()

	// Values stored from the constructor arguments of the same names.
	m_recvout_alive        int16
	m_sendout_alive        int16
	m_recv_delay_numMillis int

	// m_datasocks is the set of data sockets; GetSockNum reports its length.
	m_datasocks map[*Datasock]int
	// NOTE(review): presumably guards m_datasocks — not used in this section, confirm.
	m_MTXdatasock zbsync.Mutex

	// m_lsnsocks maps an "ip:port" key to its listener.
	m_lsnsocks map[string]*LsnSocket
	// m_MTXlsnsock is held around every access to m_lsnsocks in the listen/close methods.
	m_MTXlsnsock zbsync.Mutex

	// m_runonce is flipped from 0 to 1 by the first SetReqChan call.
	m_runonce zbsync.Int32
	// m_refcount and m_exitflag are exposed through GetRefCount / GetExitFlag.
	m_refcount zbsync.Int32
	m_exitflag zbsync.Int32

	// Byte counters returned by GetRecvBytes / GetSendBytes.
	m_recvbts atomic.Int64
	m_sendbts atomic.Int64

	// m_iosvc is the service implementation supplied to the constructor (never nil).
	m_iosvc IOSerivce
	// m_dbg is stored from the constructor's dbg argument.
	m_dbg bool
}
// thispktcombin is the package-level Runner that the IOServer constructor uses
// when it is called with a nil pktcombin argument.
// NOTE(review): no assignment to this variable is visible in this section, so it
// may still hold its zero value when the constructor reads it — confirm.
var thispktcombin zbrun.Runner
// IOServer is the constructor-style initializer: it fills in the receiver in
// place and returns it.
//
// Parameters:
//   - iosvc: service implementation; the call panics when it is nil.
//   - dbg, recv_concurr, sendblock_kick: stored in the matching m_* fields.
//   - pktcombin: runner to use; when nil the package-level thispktcombin is
//     used instead and m_isSelfNew is set to true.
//   - recvout_alive, sendout_alive, recv_delay_numMillis: stored as given.
//
// The request channel is cleared, both socket maps are re-created empty, and
// the run-once flag, reference count, exit flag and byte counters are reset to
// their zero values.
func (this *IOServer) IOServer(iosvc IOSerivce, dbg, recv_concurr, sendblock_kick bool, pktcombin zbrun.Runner,
	recvout_alive int16, sendout_alive int16, recv_delay_numMillis int) *IOServer {
	if iosvc == nil {
		panic("iosvc不能为空")
	}

	// Decide which runner to keep, remembering whether it was the fallback.
	l_runner, l_selfnew := pktcombin, false
	if l_runner == nil {
		l_runner, l_selfnew = thispktcombin, true
	}
	this.m_pktcombin = l_runner
	this.m_isSelfNew = l_selfnew

	// Plain copies of the caller's settings.
	this.m_iosvc = iosvc
	this.m_dbg = dbg
	this.m_sendblock_kick = sendblock_kick
	this.m_recv_concurr = recv_concurr
	this.m_reqchan = nil
	this.m_recvout_alive = recvout_alive
	this.m_sendout_alive = sendout_alive
	this.m_recv_delay_numMillis = recv_delay_numMillis

	// Fresh containers and zeroed counters/flags.
	this.m_datasocks = make(map[*Datasock]int, 100)
	this.m_lsnsocks = make(map[string]*LsnSocket, 5)
	this.m_runonce = zbsync.Int32{}
	this.m_refcount = zbsync.Int32{}
	this.m_exitflag = zbsync.Int32{}
	this.m_recvbts = atomic.Int64{}
	this.m_sendbts = atomic.Int64{}
	return this
}
// SetPktParse records the packet-header layout and the isenc flag.
//
// pktses_size must be 0 or 4 and pktcmd_size must be 0 or 2; any other value
// panics. pktlen_size is stored without validation. The two derived sizes are
// recomputed: pkthead_size = pktlen_size + pktses_size, and
// pkthcmd_size = pkthead_size + pktcmd_size.
func (this *IOServer) SetPktParse(isenc bool, pktlen_size, pktses_size, pktcmd_size uint32) {
	l_ses_ok := pktses_size == 0 || pktses_size == 4
	l_cmd_ok := pktcmd_size == 0 || pktcmd_size == 2
	if !l_ses_ok || !l_cmd_ok {
		panic("pktses_size/pktcmd_size设置错误.")
	}

	this.m_isenc = isenc
	this.pktlen_size = pktlen_size
	this.pktses_size = pktses_size
	this.pktcmd_size = pktcmd_size

	l_head := pktlen_size + pktses_size
	this.pkthead_size = l_head
	this.pkthcmd_size = l_head + pktcmd_size
}
// GetRefCount returns a pointer to the server's reference counter, so callers
// operate on the server's own counter rather than a copy.
func (this *IOServer) GetRefCount() *zbsync.Int32 {
	l_counter := &this.m_refcount
	return l_counter
}
// GetExitFlag returns a pointer to the server's exit flag, so callers operate
// on the server's own flag rather than a copy.
func (this *IOServer) GetExitFlag() *zbsync.Int32 {
	l_flag := &this.m_exitflag
	return l_flag
}
// Connect builds a "host:port" target from ip and port and delegates to
// Connect2.
//
// Parameters:
//   - ip: host to dial; may be empty (target becomes ":port"), a host name, or
//     an IP literal.
//   - port: TCP port.
//   - concted: callback forwarded unchanged to Connect2.
//
// Returns whatever Connect2 returns.
func (this *IOServer) Connect(ip string, port uint16, concted func(*Datasock)) (*Datasock, error) {
	// Default form; for an empty ip this yields ":port".
	l_target := fmt.Sprintf("%s:%d", ip, port)
	// A bare IPv6 literal must be bracketed ("[::1]:80"), otherwise address
	// resolution rejects the target. JoinHostPort adds the brackets only when
	// needed, so IPv4 literals produce exactly the same string as before.
	// Inputs that are not plain IP literals (host names, already-bracketed
	// addresses) keep the original formatting.
	if net.ParseIP(ip) != nil {
		l_target = net.JoinHostPort(ip, fmt.Sprintf("%d", port))
	}
	return this.Connect2(l_target, concted)
}
// Connect2 resolves target as a TCP address, dials it, wraps the connection in
// a Datasock bound to this server and starts before_recvloop for it through
// zbgls.GO.
//
// Parameters:
//   - target: "host:port" string accepted by net.ResolveTCPAddr.
//   - concted: callback handed to before_recvloop together with the new socket.
//
// Returns the new Datasock, or nil and the resolve/dial error.
// NOTE(review): the dial has no timeout or cancellation — confirm this is intended.
func (this *IOServer) Connect2(target string, concted func(*Datasock)) (*Datasock, error) {
	l_raddr, l_err := net.ResolveTCPAddr("tcp", target)
	if l_err != nil {
		return nil, l_err
	}

	l_conn, l_err := net.DialTCP("tcp", nil, l_raddr)
	if l_err != nil {
		return nil, l_err
	}

	l_sock := new(Datasock).Datasock(l_conn, nil, this)

	// IncDec32 receives the server's and the socket's reference counters and
	// returns a function that is deferred in the started function
	// (presumably it increments now and decrements on that call — confirm).
	l_release := zbsync.IncDec32(&this.m_refcount, &l_sock.m_refcount)
	zbgls.GO(func() {
		defer l_release()
		before_recvloop(l_sock, concted)
	})
	return l_sock, nil
}
// OpenListen opens a TCP listening port and registers it with the server.
//
// Parameters:
//   - ip: local address to bind; empty means all interfaces (":port").
//   - port: TCP port; 0 is rejected.
//
// Returns nil on success (and also when the receiver is nil), otherwise an
// error: "incorrect port", the address-resolution error, "has listened,please
// first close" when the same ip/port is already registered, or the listen
// error.
//
// If SetReqChan has already run (m_runonce != 0) the new listener's
// lsn_accept is called immediately; otherwise that happens on the first
// SetReqChan call.
func (this *IOServer) OpenListen(ip string, port uint16) error {
	if this == nil {
		return nil
	}
	if port == 0 {
		return errors.New("incorrect port")
	}

	// Registry key. It stays in the plain "ip:port" form because
	// CloseListenByAddr and CloseListenByPtr rebuild the key the same way.
	// For an empty ip this yields ":port".
	l_lsnkey := fmt.Sprintf("%s:%d", ip, port)

	// Address actually bound. A bare IPv6 literal has to be bracketed
	// ("[::1]:80") or resolution fails; JoinHostPort adds brackets only when
	// needed, so IPv4 literals and all non-literal inputs resolve exactly as
	// before.
	l_bindaddr := l_lsnkey
	if net.ParseIP(ip) != nil {
		l_bindaddr = net.JoinHostPort(ip, fmt.Sprintf("%d", port))
	}
	tcpAddr, err := net.ResolveTCPAddr("tcp", l_bindaddr)
	if err != nil {
		return err
	}

	// Lock() returns the unlock function; the registry stays locked until return.
	defer this.m_MTXlsnsock.Lock()()
	if _, found := this.m_lsnsocks[l_lsnkey]; found {
		return errors.New("has listened,please first close")
	}
	l_lsner, err := net.ListenTCP("tcp", tcpAddr)
	if err != nil {
		return err
	}
	l_lsnsock := &LsnSocket{m_lsnIP: ip, m_lsnPort: port, m_lsner: *l_lsner, m_iosvr: this, m_refcount: zbsync.Int32{}, m_exitflag: zbsync.Int32{}}
	this.m_lsnsocks[l_lsnkey] = l_lsnsock
	if this.m_runonce.Load() != 0 {
		l_lsnsock.lsn_accept()
	}
	return nil
}
// SetReqChan installs the request channel and, on its first call only, calls
// lsn_accept on every listener registered so far.
//
// The channel is kept only when the server was constructed with recv_concurr
// set; otherwise m_reqchan is forced to nil. The channel is stored before the
// listener lock is taken.
func (this *IOServer) SetReqChan(reqchan chan func()) {
	var l_chan chan func()
	if this.m_recv_concurr {
		l_chan = reqchan
	}
	this.m_reqchan = l_chan

	defer this.m_MTXlsnsock.Lock()()
	// Only the call that flips m_runonce from 0 to 1 proceeds.
	if !this.m_runonce.CompareAndSwap(0, 1) {
		return
	}
	for _, l_sock := range this.m_lsnsocks {
		l_sock.lsn_accept()
	}
}
// CloseListenByAddr closes the listening port registered under ip and port.
//
// It sets the listener's exit flag, polls every 10ms until the listener's
// reference count reaches zero, then removes it from the registry. The
// listener lock is held for the whole wait.
//
// Returns nil on success or when the receiver is nil, and "not open" when no
// listener is registered under that key.
func (this *IOServer) CloseListenByAddr(ip string, port uint16) error {
	if this == nil {
		return nil
	}
	l_key := fmt.Sprintf("%s:%d", ip, port)
	defer this.m_MTXlsnsock.Lock()()

	l_target, l_ok := this.m_lsnsocks[l_key]
	if !l_ok {
		return errors.New("not open")
	}

	l_target.m_exitflag.Store(1)
	for l_target.m_refcount.Load() != 0 {
		time.Sleep(10 * time.Millisecond)
	}
	delete(this.m_lsnsocks, l_key)
	return nil
}
// CloseListenByPtr closes the given listener, provided it is the one
// registered under its own IP and port.
//
// It sets the listener's exit flag, polls every 10ms until its reference count
// reaches zero, then removes it from the registry. The listener lock is held
// for the whole wait.
//
// Returns nil on success or when the receiver is nil; "null parameter" when
// lsn is nil; "incorrect parameter" when a different listener is registered
// under the same key; "not open" when nothing is registered under that key.
func (this *IOServer) CloseListenByPtr(lsn *LsnSocket) error {
	if this == nil {
		return nil
	}
	if lsn == nil {
		return errors.New("null parameter")
	}
	l_key := fmt.Sprintf("%s:%d", lsn.GetIP(), lsn.GetPort())
	defer this.m_MTXlsnsock.Lock()()

	l_registered, l_ok := this.m_lsnsocks[l_key]
	switch {
	case !l_ok:
		return errors.New("not open")
	case l_registered != lsn:
		return errors.New("incorrect parameter")
	}

	lsn.m_exitflag.Store(1)
	for lsn.m_refcount.Load() != 0 {
		time.Sleep(10 * time.Millisecond)
	}
	delete(this.m_lsnsocks, l_key)
	return nil
}
// CloseAllListen closes every registered listening port.
//
// All exit flags are set first, then each listener is polled every 10ms until
// its reference count reaches zero, and finally the registry is replaced by an
// empty map. The listener lock is held throughout. A nil receiver is a no-op.
func (this *IOServer) CloseAllListen() {
	if this == nil {
		return
	}
	defer this.m_MTXlsnsock.Lock()()

	// Pass 1: signal every listener before waiting on any of them.
	for _, l_sock := range this.m_lsnsocks {
		l_sock.m_exitflag.Store(1)
	}
	// Pass 2: wait for each listener to drop all of its references.
	for _, l_sock := range this.m_lsnsocks {
		for {
			if l_sock.m_refcount.Load() == 0 {
				break
			}
			time.Sleep(10 * time.Millisecond)
		}
	}
	this.m_lsnsocks = map[string]*LsnSocket{}
}
// GetSockNum returns the number of entries currently in m_datasocks.
// NOTE(review): the map is read here without taking m_MTXdatasock; if other
// goroutines modify the map concurrently this is a data race — confirm.
func (this *IOServer) GetSockNum() int {
	l_count := len(this.m_datasocks)
	return l_count
}
// GetSendBytes returns the current value of the send byte counter, read
// atomically.
func (this *IOServer) GetSendBytes() int64 {
	l_sent := this.m_sendbts.Load()
	return l_sent
}
// GetRecvBytes returns the current value of the receive byte counter, read
// atomically.
func (this *IOServer) GetRecvBytes() int64 {
	l_received := this.m_recvbts.Load()
	return l_received
}
Go
1
https://gitee.com/andrewzh/libgo.git
git@gitee.com:andrewzh/libgo.git
andrewzh
libgo
libgo
v1.0.3

搜索帮助