1 Star 0 Fork 0

dream_hat/dreamgo

加入 Gitee
与超过 1400万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
克隆/下载
svr.go 17.37 KB
一键复制 编辑 原始数据 按行查看 历史
dream_hat 提交于 2023-12-20 10:57 +08:00 . init
12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755
package dream
import (
"bytes"
"encoding/binary"
"encoding/json"
"errors"
"io"
"math/rand"
"net"
"sync"
"sync/atomic"
"time"
"unsafe"
)
// PACKAGE_FLAG_CONFIG is the byte prefix that isCONFIGPack looks for at the
// start of a packet.
const PACKAGE_FLAG_CONFIG = "@VIOXYCFG:" // prescribed format of the settings data: headerxyid,ip,dataformat-
// XY_REGISTER is the protocol id of a client registration request. runRecv also
// uses it as the id of the message that carries the three random key words.
const XY_REGISTER = 30001
// XY_REGISTER_RSP is the protocol id of the reply sent after a registration
// was accepted by IRegisterInterface.OnClientRegister.
const XY_REGISTER_RSP = 30002
// XY_CONFIG is a protocol id that runRecv answers with an xyAuthRsp message
// (only when a register callback is installed).
const XY_CONFIG = 30003
// xyAuthRsp is the protocol id of the reply to XY_CONFIG.
const xyAuthRsp = 30006
// INewConnInterface is the optional callback for freshly accepted connections.
type INewConnInterface interface {
// OnNewClientConnect is called by the accept loop for every accepted
// connection, before the goroutine that serves the connection is started.
OnNewClientConnect(c IClient)
}
// IRegisterInterface is the optional callback that validates a client
// registration (an XY_REGISTER packet).
type IRegisterInterface interface {
// OnClientRegister receives the values that runRecv reads from the JSON body
// of the registration packet: "id", "processid", "type" and "sign".
// Returning false makes runRecv call BreakConnect on the client.
OnClientRegister(c IClient, id int32, processId int32, tpc int, auth string) bool
}
// IClientInterface is the mandatory callback set for traffic of a connection.
type IClientInterface interface {
// OnClientPackage is called by runRecv for every received packet that is not
// handled internally. xy is 0 when the connection does not use protocol ids;
// otherwise data is the payload that follows the protocol id.
OnClientPackage(c IClient, data []byte, xy XY)
// OnClientClose is called once when the receive loop of the client ends.
OnClientClose(c IClient)
}
// IClient is the handle to one connected peer that is handed to the callbacks
// declared in this file.
type IClient interface {
Ip() string
SetSilent() // switch to silent mode; the client will never deliver received data again
BreakConnect() // break the connection
SetUserData(k string, d interface{}) // store a value; the caller is responsible for the safety of the data itself
UserData(k string, def interface{}) interface{} // fetch a value; the caller is responsible for the safety of the data itself
UserDataNum(k string, def int64) int64 // fetch a numeric value; the caller is responsible for the safety of the data itself
UserDataSss(k string, def string) string // fetch a string value; the caller is responsible for the safety of the data itself
CloseCause() int // reason why the connection was closed
SendXY(xy XY, pack []byte) int
SendPackage(pack []byte) int
}
// sSvrSettings holds the framing and timeout settings of a connection.
type sSvrSettings struct {
	rtimeouts, wtimeouts uint32 // read / write timeouts handed to the recv/send helpers
	headerlen            int    // one of 0, 1, 2 or 137
	headerxyid           bool   // whether every packet starts with a protocol id
	header0lensplit      []byte
}

// defsvrsettings is the template copied into every new client.
var defsvrsettings sSvrSettings

// init fills in the package-wide default settings.
func init() {
	defsvrsettings = sSvrSettings{
		rtimeouts:       10 * 60,
		wtimeouts:       60,
		headerlen:       137, // automatic
		headerxyid:      true,
		header0lensplit: []byte{0},
	}
}
// onePackWithXY couples a packet buffer with its protocol id.
type onePackWithXY struct {
xy XY
pack []byte
}
// packPool recycles onePackWithXY values; see newPack.
var packPool = sync.Pool{
	New: func() interface{} { return new(onePackWithXY) },
}
// newPack takes a onePackWithXY from packPool and gives it a fresh,
// zero-filled buffer of len bytes.
//
// The protocol id is reset as well: an object that was put back into the pool
// would otherwise hand its previous xy to the next user.
func newPack(len uint32) *onePackWithXY {
	p := packPool.Get().(*onePackWithXY)
	p.xy = 0
	p.pack = make([]byte, len)
	return p
}
// newZuPack returns a new onePackWithXY whose buffer is a single zero byte.
//
// NOTE(review): both arguments are ignored and the pool is not used; confirm
// with the callers whether this is intentional.
func newZuPack(pack []byte, xy XY) *onePackWithXY {
	res := new(onePackWithXY)
	res.pack = make([]byte, 1)
	return res
}
// sMyGroupEvent is one item of the sMyGroup event channel: either a packet of
// a client (isclose == false) or the notification that the client closed.
type sMyGroupEvent struct {
client *sMyClient
isclose bool
pack []byte
}
// allocMyClientsPushItem returns a new event with every field at its zero
// value (no client, not a close event, no packet).
func allocMyClientsPushItem() *sMyGroupEvent {
	return new(sMyGroupEvent)
}
// sMyGroup funnels packet and close events of clients into one channel.
type sMyGroup struct {
cbIf IClientInterface
ch_event chan *sMyGroupEvent // receives the events produced by pushPackage / pushClose
ch_exit chan bool
}
// pushPackage queues a packet event for client on the group's event channel.
//
// It panics with "MyClientsInterface:pushPackage" when the event cannot be
// queued within 500 microseconds. The event is built before the timer is
// started, and the timer is stopped on return so it does not linger after a
// successful send.
func (g *sMyGroup) pushPackage(client *sMyClient, pack []byte) {
	ev := allocMyClientsPushItem()
	ev.client = client
	ev.isclose = false
	ev.pack = pack

	timeout := time.NewTimer(time.Microsecond * 500)
	defer timeout.Stop()
	select {
	case g.ch_event <- ev:
	case <-timeout.C:
		panic("MyClientsInterface:pushPackage")
	}
}
// pushClose queues a close event for client on the group's event channel.
//
// It panics with "MyClient:pushClose" when the event cannot be queued within
// 500 microseconds. The event is built before the timer is started, and the
// timer is stopped on return so it does not linger after a successful send.
func (g *sMyGroup) pushClose(client *sMyClient) {
	ev := allocMyClientsPushItem()
	ev.client = client
	ev.isclose = true
	ev.pack = nil

	timeout := time.NewTimer(time.Microsecond * 500)
	defer timeout.Stop()
	select {
	case g.ch_event <- ev:
	case <-timeout.C:
		panic("MyClient:pushClose")
	}
}
// sConnData is the per-connection bookkeeping shared by the client type: a
// mutex-guarded key/value store plus the last send/receive errors and the
// remote address.
//
// initUserData must be called before any of the accessor methods, because the
// mutex is held through a pointer.
type sConnData struct {
	dat          map[string]interface{}
	lock         *sync.Mutex
	sendlastfail error
	recvlastfail error
	ip           string
}

// initUserData allocates the mutex and an empty store.
func (p *sConnData) initUserData() {
	p.lock = &sync.Mutex{}
	p.dat = make(map[string]interface{})
}

// SetUserData stores d under key k.
//
// The store is re-created when it was dropped by freeUserData; writing to the
// nil map used to panic in that situation.
func (p *sConnData) SetUserData(k string, d interface{}) {
	p.lock.Lock()
	defer p.lock.Unlock()
	if p.dat == nil {
		p.dat = make(map[string]interface{})
	}
	p.dat[k] = d
}

// UserData returns the value stored under k, or def when the key is absent.
func (p *sConnData) UserData(k string, def interface{}) interface{} {
	p.lock.Lock()
	defer p.lock.Unlock()
	if v, ok := p.dat[k]; ok {
		return v
	}
	return def
}

// UserDataNum returns the value stored under k converted to int64. def is
// returned when the key is absent or the value is not one of Go's built-in
// integer or floating-point types. Floats are truncated by the conversion.
func (p *sConnData) UserDataNum(k string, def int64) int64 {
	p.lock.Lock()
	defer p.lock.Unlock()
	switch v := p.dat[k].(type) {
	case int:
		return int64(v)
	case int8:
		return int64(v)
	case int16:
		return int64(v)
	case int32:
		return int64(v)
	case int64:
		return v
	case uint:
		return int64(v)
	case uint8:
		return int64(v)
	case uint16:
		return int64(v)
	case uint32:
		return int64(v)
	case uint64:
		return int64(v)
	case float32:
		return int64(v)
	case float64:
		return int64(v)
	}
	// Missing key (nil interface) or unsupported type.
	return def
}

// UserDataSss returns the string stored under k, or def when the key is
// absent or the value is not a string.
func (p *sConnData) UserDataSss(k string, def string) string {
	p.lock.Lock()
	defer p.lock.Unlock()
	if s, ok := p.dat[k].(string); ok {
		return s
	}
	return def
}

// freeUserData drops the store and clears the recorded send/receive errors.
func (p *sConnData) freeUserData() {
	p.lock.Lock()
	defer p.lock.Unlock()
	p.dat = nil
	p.sendlastfail = nil
	p.recvlastfail = nil
}
// sMyClient is the state of one accepted connection. It implements IClient.
type sMyClient struct {
sConnData // user data store, last send/receive errors, remote address
sSvrSettings // per-connection copy of the default framing/timeout settings
c net.Conn // underlying connection
ch_sendlist chan []byte // outgoing packets, consumed by runSend
setsilent int32 // 1 after SetSilent; accessed atomically
setbreaked int32 // 1 after BreakConnect; accessed atomically
exited int32 // 1 once the receive loop has ended; accessed atomically
cbCheck IRegisterInterface // optional registration callback
cbIf IClientInterface // packet / close callbacks
araRandnum []uint32 // three key words used for ARACrpt
aracrptstate bool // true once traffic is transformed with ARACrpt
}
// allocOneClient builds the client state for an accepted connection.
//
// The client starts with an empty user-data store, a send queue of 100
// packets, all flags cleared, encryption off and a copy of the package-wide
// default settings. The remote address of c is captured for Ip.
func allocOneClient(c net.Conn, cbIf IClientInterface, cbCheck IRegisterInterface) *sMyClient {
	client := new(sMyClient)
	client.c = c
	client.cbIf = cbIf
	client.cbCheck = cbCheck
	client.araRandnum = make([]uint32, 3)

	client.initUserData()
	client.ip = c.RemoteAddr().String()
	client.ch_sendlist = make(chan []byte, 100)

	// Every field of the settings struct is taken from the defaults.
	client.sSvrSettings = defsvrsettings
	return client
}
// freeClient clears the references held by a client that has already been
// shut down, so the objects behind them can be collected.
//
// The connection and the send queue must have been set to nil by the caller;
// this is checked with Assert.
func freeClient(p *sMyClient) {
	Assert(p.c == nil, "freeClient1")
	Assert(p.ch_sendlist == nil, "freeClient2")

	p.ch_sendlist, p.cbCheck, p.cbIf = nil, nil, nil

	// The user data must be released while the mutex still exists.
	p.freeUserData()
	p.lock = nil
}
// Ip returns the remote address string that was captured when the client was
// created.
func (p *sMyClient) Ip() string {
	addr := p.ip
	return addr
}
// SetSilent raises the silent flag. From then on the receive loop still reads
// packets but no longer hands them to any callback. The flag is never cleared.
func (p *sMyClient) SetSilent() {
	const on int32 = 1
	atomic.StoreInt32(&p.setsilent, on)
}
// BreakConnect raises the break flag. The receive loop checks the flag before
// reading the next packet header and ends when it is set; the socket itself is
// not closed here.
func (p *sMyClient) BreakConnect() {
	const on int32 = 1
	atomic.StoreInt32(&p.setbreaked, on)
}
// CloseCause reports why the connection ended:
//
//	0    the receive loop has not exited yet
//	1    the peer closed the connection (receive failed with io.EOF)
//	2    the last receive failed with a timeout
//	3    the last send failed with a timeout
//	9999 any other reason
//
// Timeouts are detected through the net.Error interface. The previous
// unchecked assertion to *net.OpError panicked for every other error type,
// including the plain "XY limit" error stored by the receive loop.
func (p *sMyClient) CloseCause() int {
	if atomic.LoadInt32(&p.exited) != 1 {
		return 0
	}
	timedOut := func(err error) bool {
		ne, ok := err.(net.Error) // false for nil and for non-network errors
		return ok && ne.Timeout()
	}
	switch {
	case p.recvlastfail == io.EOF:
		return 1
	case timedOut(p.recvlastfail):
		return 2
	case timedOut(p.sendlastfail):
		return 3
	}
	return 9999
}
// SendPackage queues pack for sending on a connection that does NOT use
// protocol ids (headerxyid must be false).
//
// Return values:
//
//	 0  the packet was queued
//	 1  the send queue stayed full for 500 microseconds
//	-1  the connection has already exited
//	-2  headerlen is -1
//
// The queue timer is started only after the packet has been built and
// transformed. Previously it was started first, so a slow transform could let
// the timer expire and the select could then report a timeout although the
// queue had room. The timer is also stopped on return.
func (p *sMyClient) SendPackage(pack []byte) int {
	Assert(pack != nil, "MyClient::SendPackage nil")
	if p.headerlen == -1 {
		return -2
	}
	Assert(p.headerlen == 0 || p.headerlen == 1 || p.headerlen == 2 || p.headerlen == 4 || p.headerlen == 137, "MyClient::SendPackage limit")
	AssertImposible(p.headerlen == 1 && len(pack) > 0xff, "MyClient::SendPackage 1 limit")
	AssertImposible(p.headerlen == 2 && len(pack) > 0xffff, "MyClient::SendPackage 2 limit")
	Assert(!p.headerxyid, "MyClient headerxyid SendPackage")
	if atomic.LoadInt32(&p.exited) == 1 {
		return -1
	}

	pk := allocPackWithInstall(false, 0, pack, p.headerlen)
	if p.aracrptstate {
		// Everything after the first byte of the framed packet is transformed.
		crypt := newARACrpt()
		crypt.SetKey(p.araRandnum[0], p.araRandnum[1], p.araRandnum[2])
		body := pk[1:]
		crypt.TransformString(&body, uint32(len(pk)-1))
	}

	timeout := time.NewTimer(time.Microsecond * 500)
	defer timeout.Stop()
	select {
	case p.ch_sendlist <- pk:
		return 0
	case <-timeout.C:
		return 1
	}
}
// SendXY queues pack, tagged with protocol id xy, for sending on a connection
// that uses protocol ids (headerxyid must be true).
//
// Return values:
//
//	 0  the packet was queued
//	 1  the send queue stayed full for 500 microseconds
//	-1  the connection has already exited
//	-2  headerlen is -1
//
// The queue timer is started only after the packet has been built and
// transformed. Previously it was started first, so a slow transform could let
// the timer expire and the select could then report a timeout although the
// queue had room. The timer is also stopped on return.
func (p *sMyClient) SendXY(xy XY, pack []byte) int {
	Assert(pack != nil, "MyClient::SendXY nil")
	if p.headerlen == -1 {
		Log("send return")
		return -2
	}
	Assert(p.headerlen == 0 || p.headerlen == 1 || p.headerlen == 2 || p.headerlen == 4 || p.headerlen == 137, "MyClient::SendPackage limit")
	AssertImposible(p.headerlen == 1 && len(pack) > 0xff, "MyClient::SendXY 1 limit")
	AssertImposible(p.headerlen == 2 && len(pack) > 0xffff, "MyClient::SendXY 2 limit")
	Assert(p.headerxyid, "MyClient headerxyid SendXY")
	if atomic.LoadInt32(&p.exited) == 1 {
		return -1
	}

	pk := allocPackWithInstall(true, xy, pack, p.headerlen)
	if p.aracrptstate {
		// Everything after the first byte of the framed packet is transformed.
		crypt := newARACrpt()
		crypt.SetKey(p.araRandnum[0], p.araRandnum[1], p.araRandnum[2])
		body := pk[1:]
		crypt.TransformString(&body, uint32(len(pk)-1))
	}

	timeout := time.NewTimer(time.Microsecond * 500)
	defer timeout.Stop()
	select {
	case p.ch_sendlist <- pk:
		return 0
	case <-timeout.C:
		return 1
	}
}
// SendXY_zore queues a single zero byte for sending. The receive loop uses it
// to answer an empty packet on a connection with protocol ids.
//
// Return values match SendXY: 0 queued, 1 queue full for 500 microseconds,
// -1 connection exited, -2 headerlen is -1. The queue timer is stopped on
// return so it does not linger after a successful send.
func (p *sMyClient) SendXY_zore() int {
	if p.headerlen == -1 {
		Log("send return")
		return -2
	}
	Assert(p.headerlen == 0 || p.headerlen == 1 || p.headerlen == 2 || p.headerlen == 4 || p.headerlen == 137, "MyClient::SendPackage limit")
	Assert(p.headerxyid, "MyClient headerxyid SendXY")
	if atomic.LoadInt32(&p.exited) == 1 {
		return -1
	}

	pk := []byte{0}
	timeout := time.NewTimer(time.Microsecond * 500)
	defer timeout.Stop()
	select {
	case p.ch_sendlist <- pk:
		return 0
	case <-timeout.C:
		return 1
	}
}
// runSend is the send loop of one connection. It writes every queued packet to
// the connection and stops on a nil packet (the shutdown sentinel, or the zero
// value of a closed queue) or on the first send error, which is recorded in
// sendlastfail.
//
// wg.Done is deferred so the goroutine that waits on wg is released even when
// the loop panics and the panic is swallowed by CrashSnap; previously such a
// panic left the waiter blocked forever.
func (p *sMyClient) runSend(wg *sync.WaitGroup) {
	defer func() { CrashSnap(recover()) }()
	defer wg.Done()
	for {
		buf := <-p.ch_sendlist
		if buf == nil { // nil packet or closed channel
			return
		}
		if err := loopSend(p.c, buf, p.wtimeouts); err != nil {
			p.sendlastfail = err
			Logi(LOG_KERL, "SVR:MyClient runSend[", err, "]退出,未触发MyClient关闭,会导致对方ReadTimeOut,需改")
			return
		}
		freePack(buf)
	}
}
// byPACKAGE_FLAG_CONFIG is PACKAGE_FLAG_CONFIG as a byte slice.
var byPACKAGE_FLAG_CONFIG = []byte(PACKAGE_FLAG_CONFIG)

// isCONFIGPack reports whether by starts with the configuration marker.
func isCONFIGPack(by []byte) bool {
	n := len(byPACKAGE_FLAG_CONFIG)
	if len(by) < n {
		return false
	}
	// Comparing the converted strings does not allocate.
	return string(by[:n]) == string(byPACKAGE_FLAG_CONFIG)
}
// runRecv is the receive loop of one connection. Until BreakConnect is called
// or a receive fails it reads one packet per iteration, undoes the ARACrpt
// transform when it is active, and then either answers the packet itself
// (XY_CONFIG, key request 30000, XY_REGISTER - only when a register callback
// is installed) or hands it to OnClientPackage. Nothing is dispatched while
// the client is silent. When the loop ends the client is marked as exited and
// OnClientClose is called.
//
// Fixes compared with the previous version:
//   - after a short read from tryRecv the actual number of bytes is used from
//     then on; the announced length used to drive the transform and the
//     payload slicing, which could index past the end of the packet;
//   - a failure to decode the protocol id ends the connection instead of
//     silently continuing with id 0;
//   - the fixed-size replies are written straight into their buffers.
func (p *sMyClient) runRecv() {
	header := make([]byte, 7)
	for atomic.LoadInt32(&p.setbreaked) == 0 {
		size, err := recvPackageHeader(p.c, p.headerlen, p.rtimeouts, header)
		if err != nil {
			p.recvlastfail = err
			break
		}

		var xy XY = 0
		xySize := uint32(unsafe.Sizeof(xy))
		if p.headerxyid {
			if size == 0 {
				// An empty packet is answered with a single zero byte.
				Log("SendXY_zore")
				p.SendXY_zore()
				continue
			}
			if size < xySize {
				p.recvlastfail = errors.New("XY limit")
				break
			}
		}

		var pack []byte
		if size > 0 {
			pack = allocPack(size)
			var rerr error
			if isValidHenlen(p.headerlen) {
				Log("loopRecv ", size)
				rerr = loopRecv(p.c, pack, p.rtimeouts)
			} else {
				var read int
				read, rerr = tryRecv(p.c, pack, p.rtimeouts)
				pack = pack[0:read]
				size = uint32(read) // only the bytes actually read count
			}
			if rerr != nil {
				p.recvlastfail = rerr
				break
			}
			if p.aracrptstate {
				crypt := newARACrpt()
				crypt.SetKey(p.araRandnum[0], p.araRandnum[1], p.araRandnum[2])
				crypt.TransformString(&pack, size)
			}
		}

		if p.headerxyid {
			// Re-checked because a short read may have shrunk the packet.
			if size < xySize {
				p.recvlastfail = errors.New("XY limit")
				break
			}
			if derr := binary.Read(bytes.NewReader(pack), binary.LittleEndian, &xy); derr != nil {
				p.recvlastfail = derr
				break
			}
		}

		if atomic.LoadInt32(&p.setsilent) == 0 {
			// Other protocols may need to be handled here as well.
			switch {
			case !p.headerxyid:
				p.cbIf.OnClientPackage(p, pack, 0)
			case p.cbCheck != nil && xy == XY_CONFIG:
				// Reply body: int32(0) followed by int32(1), little endian.
				buf := make([]byte, 8)
				binary.LittleEndian.PutUint32(buf[0:], 0)
				binary.LittleEndian.PutUint32(buf[4:], 1)
				p.SendXY(xyAuthRsp, buf)
			case p.cbCheck != nil && xy == 30000:
				p.araRandnum[0] = rand.Uint32()
				p.araRandnum[1] = rand.Uint32()
				p.araRandnum[2] = rand.Uint32()
				// Reply body: int8(0), int16(12), then the three key words.
				buf := make([]byte, 15)
				buf[0] = 0
				binary.LittleEndian.PutUint16(buf[1:], 12)
				binary.LittleEndian.PutUint32(buf[3:], p.araRandnum[0])
				binary.LittleEndian.PutUint32(buf[7:], p.araRandnum[1])
				binary.LittleEndian.PutUint32(buf[11:], p.araRandnum[2])
				p.SendXY(XY_REGISTER, buf)
				// Switched on only after the key message has been queued, so
				// the key message itself is not transformed.
				p.aracrptstate = true
			case p.cbCheck != nil && xy == XY_REGISTER:
				if !p.handleRegister(pack[xySize:]) {
					p.BreakConnect()
				}
			default:
				p.cbIf.OnClientPackage(p, pack[xySize:], xy)
			}
		}

		if pack != nil {
			freePack(pack)
		}
	}
	atomic.StoreInt32(&p.exited, 1)
	p.cbIf.OnClientClose(p)
}

// handleRegister processes the JSON body of an XY_REGISTER packet. It returns
// true after the register callback accepted the client and the XY_REGISTER_RSP
// reply has been queued, and false when the body is not valid JSON, the id is
// 0, or the callback rejected the client.
func (p *sMyClient) handleRegister(body []byte) bool {
	jr := NewJSON()
	if err := json.Unmarshal(body, &jr); err != nil {
		return false
	}
	jrd := JSONRead{JSONT: jr}
	tpc := jrd.Num("type")
	sid := jrd.Num("id")
	processId := jrd.Num("processid")
	authss := jrd.Sss("sign")
	if sid == 0 {
		return false
	}
	if !p.cbCheck.OnClientRegister(p, int32(sid), int32(processId), int(tpc), authss) {
		return false
	}
	j := NewJSON()
	j["id"] = sid
	j["error"] = 0
	// The error is ignored as before; the reply holds only two numeric values.
	b, _ := json.Marshal(j)
	p.SendXY(XY_REGISTER_RSP, b)
	return true
}
// run serves one connection: it starts the send loop, runs the receive loop
// on the calling goroutine and, once that returns, stops the send loop, closes
// the connection and releases the client.
//
// Fixes compared with the previous version:
//   - wgg.Done is deferred, so the server's wait group is released even when
//     a panic is swallowed by CrashSnap;
//   - handing the nil sentinel to the send loop can no longer block forever:
//     when the send loop has already exited (for example after a send error)
//     and the queue is full, the wait is abandoned as soon as the send loop is
//     known to be gone.
func (p *sMyClient) run(wgg *sync.WaitGroup) {
	defer func() { CrashSnap(recover()) }()
	if wgg != nil {
		defer wgg.Done()
	}

	wg := &sync.WaitGroup{}
	wg.Add(1)
	go p.runSend(wg)
	p.runRecv()

	// sendDone is closed once the send loop has returned.
	sendDone := make(chan struct{})
	go func() {
		wg.Wait()
		close(sendDone)
	}()
	select {
	case p.ch_sendlist <- nil: // ask the send loop to stop
	case <-sendDone: // it is already gone
	}
	<-sendDone

	Logi(LOG_KERL, "SVR:MyClient runRecv 退出[", p.recvlastfail, ",", p.sendlastfail, "],通过sendlist关闭SendRun并Conn.Close(还有点问题)")
	close(p.ch_sendlist)
	p.c.Close()
	p.c = nil
	p.ch_sendlist = nil
	freeClient(p) // helps to avoid memory leaks to some extent
}
// IServer is what ServeStartupAll / ServeStartup return: the combination of
// the IRun and IStop interfaces, which are declared outside this file section.
type IServer interface {
IRun
IStop
}
// sMyServer is the TCP server created by ServeStartupAll.
type sMyServer struct {
tcplisten net.Listener // listening socket; closed by Stop
wg *sync.WaitGroup // counts the running client goroutines
cbConn INewConnInterface // optional new-connection callback
cbCheck IRegisterInterface // optional registration callback, passed to every client
cbIf IClientInterface // packet / close callbacks, passed to every client
clients_inbox uint // number of connections accepted so far
addr string // listen address, used for logging
}
// accept is the accept loop of the server. Every accepted connection gets its
// own client object and goroutine; the loop ends on the first Accept error
// (for example after Stop closed the listener).
//
// Fixes compared with the previous version:
//   - wg.Done is deferred, so the caller's wait group is released even when
//     the new-connection callback panics and CrashSnap swallows the panic;
//   - the client wait group is incremented right before the client goroutine
//     is started, so a panicking callback no longer leaves it unbalanced.
func (p *sMyServer) accept(wg *sync.WaitGroup) {
	defer func() { CrashSnap(recover()) }()
	if wg != nil {
		defer wg.Done()
	}

	Logi(LOG_KERL, "Listening ", p.addr, " ...")
	for {
		c, err := p.tcplisten.Accept()
		if err != nil {
			break
		}
		mycnt := allocOneClient(c, p.cbIf, p.cbCheck)
		p.clients_inbox++
		if p.cbConn != nil {
			p.cbConn.OnNewClientConnect(mycnt)
		}
		p.wg.Add(1)
		go mycnt.run(p.wg)
	}
	Logi(LOG_KERL, " ...Listening ", p.addr, " exit")
}
// ServiceConfig is an empty placeholder type; all of its methods are no-ops.
type ServiceConfig struct {
}
// AddHander currently does nothing.
func (p *ServiceConfig) AddHander(id int) {
}
// AddClientHander currently does nothing.
func (p *ServiceConfig) AddClientHander(cmd int16) {
}
// AddHttpHander currently does nothing.
func (p *ServiceConfig) AddHttpHander(file string) {
}
// Stop closes the listening socket, which makes the accept loop end on its
// next Accept error. Connections that are already open are not touched.
func (p *sMyServer) Stop() {
	ln := p.tcplisten
	ln.Close()
}
// Run starts the accept loop on its own goroutine and returns immediately.
// When wg is not nil it is incremented here, before the goroutine starts; the
// accept loop marks it done on exit.
func (p *sMyServer) Run(wg *sync.WaitGroup) {
	if wg == nil {
		go p.accept(nil)
		return
	}
	wg.Add(1)
	go p.accept(wg)
}
// ServeStartupAll opens a TCP listener on addr and returns a server that is
// ready to be started with Run.
//
// cbIf is mandatory (checked with AssertImposible); cbConn and cbCheck may be
// nil. When the listener cannot be opened the error is logged and nil is
// returned.
func ServeStartupAll(addr string, cbConn INewConnInterface, cbIf IClientInterface, cbCheck IRegisterInterface) IServer {
	AssertImposible(cbIf == nil, "ServeStartupAll::handler nil")

	ln, err := net.Listen("tcp", addr)
	if err != nil {
		Logi(LOG_KERL, "err:Listen(", addr, ") ", err)
		return nil
	}

	srv := new(sMyServer)
	srv.tcplisten = ln
	srv.wg = new(sync.WaitGroup)
	srv.addr = addr
	srv.cbConn = cbConn
	srv.cbCheck = cbCheck
	srv.cbIf = cbIf
	return srv
}
// ServeStartup is ServeStartupAll without a registration callback.
func ServeStartup(addr string, cbConn INewConnInterface, cbIf IClientInterface) IServer {
	var noCheck IRegisterInterface // nil: registration packets are not handled internally
	return ServeStartupAll(addr, cbConn, cbIf, noCheck)
}
// ARACrpt is a byte-wise XOR stream transform driven by three 32-bit shift
// registers (A, B, C). Register A is clocked for every key-stream bit and
// decides whether B or C is clocked as well. Because the key stream does not
// depend on the data, applying the transform twice with the same starting key
// restores the original bytes.
type ARACrpt struct {
	m_LFSR_A uint32
	m_LFSR_B uint32
	m_LFSR_C uint32
	m_Mask_A uint32
	m_Mask_B uint32
	m_Mask_C uint32
	m_Rot0_A uint32
	m_Rot0_B uint32
	m_Rot0_C uint32
	m_Rot1_A uint32
	m_Rot1_B uint32
	m_Rot1_C uint32
}

// newARACrpt returns a transform loaded with the built-in default key and the
// fixed mask / rotation constants.
func newARACrpt() *ARACrpt {
	return &ARACrpt{
		m_LFSR_A: 0x13579BDF,
		m_LFSR_B: 0x2468ACE0,
		m_LFSR_C: 0xFDB97531,
		m_Mask_A: 0x80000062,
		m_Mask_B: 0x40000020,
		m_Mask_C: 0x10000002,
		m_Rot0_A: 0x7FFFFFFF,
		m_Rot0_B: 0x3FFFFFFF,
		m_Rot0_C: 0x0FFFFFFF,
		m_Rot1_A: 0x80000000,
		m_Rot1_B: 0xC0000000,
		m_Rot1_C: 0xF0000000,
	}
}

// SetKey loads the three registers. A zero key word is replaced by the
// default value of its register.
func (ara *ARACrpt) SetKey(key1 uint32, key2 uint32, key3 uint32) {
	if key1 == 0 {
		key1 = 0x13579BDF
	}
	if key2 == 0 {
		key2 = 0x2468ACE0
	}
	if key3 == 0 {
		key3 = 0xFDB97531
	}
	ara.m_LFSR_A, ara.m_LFSR_B, ara.m_LFSR_C = key1, key2, key3
}

// GetKey returns the current contents of the three registers.
func (ara *ARACrpt) GetKey() (uint32, uint32, uint32) {
	return ara.m_LFSR_A, ara.m_LFSR_B, ara.m_LFSR_C
}

// araClock advances one register by one step and returns the bit (0 or 1)
// that was in its lowest position before the step.
func araClock(reg *uint32, mask, rot0, rot1 uint32) uint32 {
	if *reg&1 != 0 {
		*reg = ((*reg ^ mask) >> 1) | rot1
		return 1
	}
	*reg = (*reg >> 1) & rot0
	return 0
}

// TransformChar XORs *cTarget with the next eight key-stream bits and advances
// the registers accordingly.
func (ara *ARACrpt) TransformChar(cTarget *byte) {
	// Until a register is clocked its output is its current lowest bit.
	outB := ara.m_LFSR_B & 1
	outC := ara.m_LFSR_C & 1

	var keystream byte
	for bit := 0; bit < 8; bit++ {
		if araClock(&ara.m_LFSR_A, ara.m_Mask_A, ara.m_Rot0_A, ara.m_Rot1_A) == 1 {
			outB = araClock(&ara.m_LFSR_B, ara.m_Mask_B, ara.m_Rot0_B, ara.m_Rot1_B)
		} else {
			outC = araClock(&ara.m_LFSR_C, ara.m_Mask_C, ara.m_Rot0_C, ara.m_Rot1_C)
		}
		keystream = keystream<<1 | byte(outB^outC)
	}
	*cTarget ^= keystream
}

// TransformString transforms the first cbsize bytes of *csTarget in place.
func (ara *ARACrpt) TransformString(csTarget *[]byte, cbsize uint32) {
	var i uint32
	for i < cbsize {
		ara.TransformChar(&(*csTarget)[i])
		i++
	}
}
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
Go
1
https://gitee.com/dream_hat/dreamgo.git
git@gitee.com:dream_hat/dreamgo.git
dream_hat
dreamgo
dreamgo
v1.1.2

搜索帮助