代码拉取完成,页面将自动刷新
package dream
import (
"bytes"
"encoding/binary"
"encoding/json"
"errors"
"io"
"math/rand"
"net"
"sync"
"sync/atomic"
"time"
"unsafe"
)
// Wire-protocol constants.
const (
	// PACKAGE_FLAG_CONFIG is the byte prefix that marks a configuration
	// packet (see isCONFIGPack). The original note gives the settings data
	// format as "headerxyid,ip,dataformat-".
	PACKAGE_FLAG_CONFIG = "@VIOXYCFG:"

	// XY_REGISTER is used in both directions by runRecv: the server sends it
	// with the three random key words, and a client sends it with its JSON
	// registration data.
	XY_REGISTER = 30001
	// XY_REGISTER_RSP is the reply sent after a successful registration.
	XY_REGISTER_RSP = 30002
	// XY_CONFIG is answered by runRecv with an xyAuthRsp packet.
	XY_CONFIG = 30003
	// xyAuthRsp is the reply id for XY_CONFIG.
	xyAuthRsp = 30006
)
// INewConnInterface is the optional callback for freshly accepted
// connections. The accept loop invokes it after the client object has been
// allocated and before the client's run goroutine is started.
type INewConnInterface interface {
	OnNewClientConnect(c IClient)
}
// IRegisterInterface validates client registrations.
//
// The receive loop calls OnClientRegister when an XY_REGISTER packet carries
// a non-zero "id"; id, processId, tpc and auth are taken from the JSON keys
// "id", "processid", "type" and "sign". Returning false makes the receive
// loop break the connection.
type IRegisterInterface interface {
	OnClientRegister(c IClient, id int32, processId int32, tpc int, auth string) bool
}
// IClientInterface receives the traffic and the close notification of a
// client connection.
type IClientInterface interface {
	// OnClientPackage is called by the receive loop for every packet that is
	// not handled internally. xy is 0 for connections without a protocol id.
	OnClientPackage(c IClient, data []byte, xy XY)
	// OnClientClose is called once the receive loop has ended.
	OnClientClose(c IClient)
}
// IClient is the handle on a connected client that is passed to callbacks.
type IClient interface {
	// Ip returns the remote address string of the connection.
	Ip() string
	// SetSilent switches the client to silent mode; it will never receive
	// data again.
	SetSilent()
	// BreakConnect interrupts the connection.
	BreakConnect()
	// SetUserData stores a value; callers must take care of data-safety
	// issues themselves.
	SetUserData(k string, d interface{})
	// UserData fetches a value, or def when the key is absent; callers must
	// take care of data-safety issues themselves.
	UserData(k string, def interface{}) interface{}
	// UserDataNum fetches a numeric value as int64, or def; callers must
	// take care of data-safety issues themselves.
	UserDataNum(k string, def int64) int64
	// UserDataSss fetches a string value, or def; callers must take care of
	// data-safety issues themselves.
	UserDataSss(k string, def string) string
	// CloseCause reports why the connection was closed.
	CloseCause() int
	// SendXY queues a packet tagged with protocol id xy.
	SendXY(xy XY, pack []byte) int
	// SendPackage queues a packet without a protocol id.
	SendPackage(pack []byte) int
}
// sSvrSettings groups the framing and timeout settings of a connection.
type sSvrSettings struct {
	rtimeouts       uint32 // timeout value handed to the receive helpers (unit not visible here)
	wtimeouts       uint32 // timeout value handed to loopSend (unit not visible here)
	headerlen       int    // 0,1,2,137 (original note); the defaults use 137, labelled "automatic"
	headerxyid      bool   // true when packets carry an XY protocol id
	header0lensplit []byte
}

// defsvrsettings holds the defaults that are copied into every new client.
var defsvrsettings sSvrSettings

// init fills defsvrsettings with the package defaults.
func init() {
	defsvrsettings = sSvrSettings{
		rtimeouts:       10 * 60,
		wtimeouts:       60,
		headerlen:       137, // automatic
		headerxyid:      true,
		header0lensplit: []byte{0},
	}
}
// onePackWithXY couples a packet payload with its XY protocol id.
type onePackWithXY struct {
	xy   XY
	pack []byte
}
// packPool recycles onePackWithXY values; empty ones are created on demand.
var packPool = sync.Pool{
	New: func() interface{} { return new(onePackWithXY) },
}
// newPack takes a onePackWithXY from packPool and gives it a fresh,
// zero-filled payload of len bytes.
//
// The xy field is reset as well: a recycled item would otherwise keep the
// protocol id of its previous use.
func newPack(len uint32) *onePackWithXY {
	p := packPool.Get().(*onePackWithXY)
	p.xy = 0
	p.pack = make([]byte, len)
	return p
}
// newZuPack returns a new (non-pooled) onePackWithXY whose payload is a
// single zero byte.
//
// NOTE(review): both arguments are ignored and xy is left at its zero
// value — confirm this is intended and not an unfinished stub.
func newZuPack(pack []byte, xy XY) *onePackWithXY {
	item := new(onePackWithXY)
	item.pack = make([]byte, 1)
	return item
}
// sMyGroupEvent is one item of a group's event channel: either a packet
// received from client or, when isclose is set, the notice that client
// has closed.
type sMyGroupEvent struct {
	client  *sMyClient
	isclose bool
	pack    []byte
}
// allocMyClientsPushItem returns a new event with every field at its zero
// value (no client, not a close event, no payload).
func allocMyClientsPushItem() *sMyGroupEvent {
	return &sMyGroupEvent{}
}
// sMyGroup funnels client events into a single channel.
type sMyGroup struct {
	cbIf     IClientInterface
	ch_event chan *sMyGroupEvent // filled by pushPackage and pushClose
	ch_exit  chan bool
}
// pushPackage queues a packet event for client on the group's event
// channel.
//
// The send is given 500µs; if the channel cannot take the event in that
// time the function panics, as before. The timer is now stopped on return
// so it does not stay pending after a successful send.
func (g *sMyGroup) pushPackage(client *sMyClient, pack []byte) {
	ev := allocMyClientsPushItem()
	ev.client = client
	ev.isclose = false
	ev.pack = pack

	timeout := time.NewTimer(time.Microsecond * 500)
	defer timeout.Stop()

	select {
	case g.ch_event <- ev:
	case <-timeout.C:
		panic("MyClientsInterface:pushPackage")
	}
}
// pushClose queues a close event for client on the group's event channel.
//
// The send is given 500µs; if the channel cannot take the event in that
// time the function panics, as before. The timer is now stopped on return
// so it does not stay pending after a successful send.
func (g *sMyGroup) pushClose(client *sMyClient) {
	ev := allocMyClientsPushItem()
	ev.client = client
	ev.isclose = true
	ev.pack = nil

	timeout := time.NewTimer(time.Microsecond * 500)
	defer timeout.Stop()

	select {
	case g.ch_event <- ev:
	case <-timeout.C:
		panic("MyClient:pushClose")
	}
}
// sConnData is the per-connection state shared by client objects: a
// mutex-guarded key/value store, the last send/receive errors and the
// peer address.
type sConnData struct {
	dat          map[string]interface{}
	lock         *sync.Mutex
	sendlastfail error
	recvlastfail error
	ip           string
}

// initUserData allocates the store and its mutex. It must run before any of
// the accessors below, which lock through the mutex pointer.
func (p *sConnData) initUserData() {
	p.dat = make(map[string]interface{})
	p.lock = new(sync.Mutex)
}

// fetchUserData reads the raw value stored under k while holding the lock.
func (p *sConnData) fetchUserData(k string) (interface{}, bool) {
	p.lock.Lock()
	defer p.lock.Unlock()
	raw, found := p.dat[k]
	return raw, found
}

// SetUserData stores d under key k.
func (p *sConnData) SetUserData(k string, d interface{}) {
	p.lock.Lock()
	defer p.lock.Unlock()
	p.dat[k] = d
}

// UserData returns the value stored under k, or def when k is absent.
func (p *sConnData) UserData(k string, def interface{}) interface{} {
	if raw, found := p.fetchUserData(k); found {
		return raw
	}
	return def
}

// UserDataNum returns the value stored under k converted to int64. Any
// built-in integer or float type is accepted (floats are truncated by the
// conversion); def is returned when k is absent or holds another type.
func (p *sConnData) UserDataNum(k string, def int64) int64 {
	raw, found := p.fetchUserData(k)
	if !found {
		return def
	}
	switch n := raw.(type) {
	case int:
		return int64(n)
	case int8:
		return int64(n)
	case int16:
		return int64(n)
	case int32:
		return int64(n)
	case int64:
		return n
	case uint:
		return int64(n)
	case uint8:
		return int64(n)
	case uint16:
		return int64(n)
	case uint32:
		return int64(n)
	case uint64:
		return int64(n)
	case float32:
		return int64(n)
	case float64:
		return int64(n)
	}
	return def
}

// UserDataSss returns the string stored under k, or def when k is absent or
// does not hold a string.
func (p *sConnData) UserDataSss(k string, def string) string {
	raw, _ := p.fetchUserData(k)
	if s, isString := raw.(string); isString {
		return s
	}
	return def
}

// freeUserData drops the store and the recorded errors. The mutex itself is
// kept, so reads keep working and fall back to their defaults.
func (p *sConnData) freeUserData() {
	p.lock.Lock()
	defer p.lock.Unlock()
	p.dat, p.sendlastfail, p.recvlastfail = nil, nil, nil
}
// sMyClient is the server-side state of one accepted connection.
type sMyClient struct {
	sConnData
	sSvrSettings

	c           net.Conn
	ch_sendlist chan []byte // outgoing packets, consumed by runSend

	// Flags accessed through sync/atomic.
	setsilent  int32 // non-zero: received packets are no longer dispatched
	setbreaked int32 // non-zero: the receive loop stops at its next iteration
	exited     int32 // set to 1 when the receive loop has ended

	cbCheck IRegisterInterface
	cbIf    IClientInterface

	araRandnum   []uint32 // three key words used to key ARACrpt
	aracrptstate bool     // true once payloads are passed through ARACrpt
}
// allocOneClient builds the client object for connection c.
//
// The settings are copied from defsvrsettings, the send queue holds up to
// 100 packets, the three key words start at zero with encryption off, and
// the peer address string is cached for Ip.
func allocOneClient(c net.Conn, cbIf IClientInterface, cbCheck IRegisterInterface) *sMyClient {
	client := &sMyClient{
		sSvrSettings: defsvrsettings,
		c:            c,
		ch_sendlist:  make(chan []byte, 100),
		cbCheck:      cbCheck,
		cbIf:         cbIf,
		araRandnum:   make([]uint32, 3),
	}
	client.initUserData()
	client.ip = c.RemoteAddr().String()
	return client
}
// freeClient releases the references held by a client. The caller is
// expected to have cleared the connection and the send queue already (both
// are asserted to be nil).
func freeClient(p *sMyClient) {
	Assert(p.c == nil, "freeClient1")
	Assert(p.ch_sendlist == nil, "freeClient2")

	p.ch_sendlist, p.cbCheck, p.cbIf = nil, nil, nil

	// freeUserData needs the mutex, so the lock is dropped only afterwards.
	p.freeUserData()
	p.lock = nil
}
// Ip returns the peer address string that was cached when the client was
// created.
func (mc *sMyClient) Ip() string {
	return mc.ip
}
// SetSilent raises the silent flag; the receive loop stops dispatching
// packets once it sees it.
func (mc *sMyClient) SetSilent() {
	atomic.StoreInt32(&mc.setsilent, 1)
}
// BreakConnect raises the break flag. The receive loop checks it at the top
// of every iteration and ends when it is set; the socket itself is not
// touched here.
func (mc *sMyClient) BreakConnect() {
	atomic.StoreInt32(&mc.setbreaked, 1)
}
// CloseCause reports why the connection ended:
//
//	0    the receive loop has not exited yet
//	1    the peer closed the connection (io.EOF on receive)
//	2    the last receive failed with a timeout
//	3    the last send failed with a timeout
//	9999 any other reason
//
// Errors are probed through the net.Error interface instead of an unchecked
// assertion to *net.OpError, so errors of other types (for example the
// "XY limit" error recorded by the receive loop) no longer cause a panic.
func (p *sMyClient) CloseCause() int {
	if atomic.LoadInt32(&p.exited) != 1 {
		return 0
	}
	if p.recvlastfail == io.EOF {
		return 1
	}
	if ne, ok := p.recvlastfail.(net.Error); ok && ne.Timeout() {
		return 2
	}
	if ne, ok := p.sendlastfail.(net.Error); ok && ne.Timeout() {
		return 3
	}
	return 9999
}
// SendPackage queues pack for sending on a connection that does not use
// protocol ids.
//
// Returns:
//
//	 0  the packet was queued
//	 1  the send queue did not accept the packet within 500µs
//	-1  the client has already exited
//	-2  headerlen is -1, nothing is sent
//
// Changes from the previous version: the timer is stopped on return, it is
// started only after the packet has been built and encrypted (so the whole
// 500µs is available for queueing), and an empty framed packet no longer
// panics in the encryption step.
func (mc *sMyClient) SendPackage(pack []byte) int {
	Assert(pack != nil, "MyClient::SendPackage nil")
	if mc.headerlen == -1 {
		return -2
	}
	Assert(mc.headerlen == 0 || mc.headerlen == 1 || mc.headerlen == 2 || mc.headerlen == 4 || mc.headerlen == 137, "MyClient::SendPackage limit")
	AssertImposible(mc.headerlen == 1 && len(pack) > 0xff, "MyClient::SendPackage 1 limit")
	AssertImposible(mc.headerlen == 2 && len(pack) > 0xffff, "MyClient::SendPackage 2 limit")
	Assert(!mc.headerxyid, "MyClient headerxyid SendPackage")
	if atomic.LoadInt32(&mc.exited) == 1 {
		return -1
	}

	pk := allocPackWithInstall(false, 0, pack, mc.headerlen)
	if mc.aracrptstate && len(pk) > 1 {
		// Everything after the first byte is transformed with a cipher that
		// is freshly keyed for every packet.
		crypt := newARACrpt()
		crypt.SetKey(mc.araRandnum[0], mc.araRandnum[1], mc.araRandnum[2])
		body := pk[1:]
		crypt.TransformString(&body, uint32(len(body)))
	}

	timeout := time.NewTimer(time.Microsecond * 500)
	defer timeout.Stop()
	select {
	case mc.ch_sendlist <- pk:
		return 0
	case <-timeout.C:
		return 1
	}
}
// SendXY queues pack, tagged with protocol id xy, for sending on a
// connection that uses protocol ids.
//
// Returns:
//
//	 0  the packet was queued
//	 1  the send queue did not accept the packet within 500µs
//	-1  the client has already exited
//	-2  headerlen is -1, nothing is sent
//
// Changes from the previous version: the timer is stopped on return, it is
// started only after the packet has been built and encrypted (so the whole
// 500µs is available for queueing), and an empty framed packet no longer
// panics in the encryption step.
func (mc *sMyClient) SendXY(xy XY, pack []byte) int {
	Assert(pack != nil, "MyClient::SendXY nil")
	if mc.headerlen == -1 {
		Log("send return")
		return -2
	}
	Assert(mc.headerlen == 0 || mc.headerlen == 1 || mc.headerlen == 2 || mc.headerlen == 4 || mc.headerlen == 137, "MyClient::SendPackage limit")
	AssertImposible(mc.headerlen == 1 && len(pack) > 0xff, "MyClient::SendXY 1 limit")
	AssertImposible(mc.headerlen == 2 && len(pack) > 0xffff, "MyClient::SendXY 2 limit")
	Assert(mc.headerxyid, "MyClient headerxyid SendXY")
	if atomic.LoadInt32(&mc.exited) == 1 {
		return -1
	}

	pk := allocPackWithInstall(true, xy, pack, mc.headerlen)
	if mc.aracrptstate && len(pk) > 1 {
		// Everything after the first byte is transformed with a cipher that
		// is freshly keyed for every packet.
		crypt := newARACrpt()
		crypt.SetKey(mc.araRandnum[0], mc.araRandnum[1], mc.araRandnum[2])
		body := pk[1:]
		crypt.TransformString(&body, uint32(len(body)))
	}

	timeout := time.NewTimer(time.Microsecond * 500)
	defer timeout.Stop()
	select {
	case mc.ch_sendlist <- pk:
		return 0
	case <-timeout.C:
		return 1
	}
}
// SendXY_zore queues a single zero byte. The receive loop calls it in reply
// to a zero-length packet on a connection that uses protocol ids.
//
// Returns:
//
//	 0  the byte was queued
//	 1  the send queue did not accept it within 500µs
//	-1  the client has already exited
//	-2  headerlen is -1, nothing is sent
//
// The timer is now stopped on return instead of being left pending.
func (mc *sMyClient) SendXY_zore() int {
	if mc.headerlen == -1 {
		Log("send return")
		return -2
	}
	Assert(mc.headerlen == 0 || mc.headerlen == 1 || mc.headerlen == 2 || mc.headerlen == 4 || mc.headerlen == 137, "MyClient::SendPackage limit")
	Assert(mc.headerxyid, "MyClient headerxyid SendXY")
	if atomic.LoadInt32(&mc.exited) == 1 {
		return -1
	}

	pk := []byte{0}

	timeout := time.NewTimer(time.Microsecond * 500)
	defer timeout.Stop()
	select {
	case mc.ch_sendlist <- pk:
		return 0
	case <-timeout.C:
		return 1
	}
}
// runSend is the sender goroutine of a client. It writes every queued packet
// to the connection until it receives nil from the queue (the nil sentinel
// or a closed channel) and then signals wg.
//
// Changes from the previous version:
//   - wg.Done is deferred, so a panic inside the loop (which is recovered
//     here) can no longer leave the waiter blocked forever.
//   - After a send error the goroutine no longer exits with the queue
//     unattended. It records the error, asks the receive loop to stop via
//     BreakConnect, and keeps draining (and discarding) queued packets until
//     the sentinel arrives, so the queue cannot fill up and block whoever
//     posts the sentinel.
//   - Every dequeued buffer is released with freePack, including the one
//     whose send failed.
//
// The log text is kept unchanged so existing log searches still match.
func (mc *sMyClient) runSend(wg *sync.WaitGroup) {
	// Registered first, therefore executed last: the waiter is released only
	// after a possible panic has been recovered and reported.
	defer wg.Done()
	defer func() { CrashSnap(recover()) }()

	failed := false
	for {
		buf := <-mc.ch_sendlist
		if buf == nil { // nil pack or chan close
			return
		}
		if !failed {
			if err := loopSend(mc.c, buf, mc.wtimeouts); err != nil {
				failed = true
				mc.sendlastfail = err
				Logi(LOG_KERL, "SVR:MyClient runSend[", err, "]退出,未触发MyClient关闭,会导致对方ReadTimeOut,需改")
				mc.BreakConnect()
			}
		}
		freePack(buf)
	}
}
// byPACKAGE_FLAG_CONFIG is PACKAGE_FLAG_CONFIG as bytes, converted once.
var byPACKAGE_FLAG_CONFIG = []byte(PACKAGE_FLAG_CONFIG)

// isCONFIGPack reports whether by starts with the configuration marker.
func isCONFIGPack(by []byte) bool {
	marker := byPACKAGE_FLAG_CONFIG
	if len(by) < len(marker) {
		return false
	}
	return string(by[:len(marker)]) == string(marker)
}
// runRecv is the receive loop of a client. It reads one packet per
// iteration until the break flag is set or a receive fails, handles the
// built-in protocol ids itself and hands everything else to cbIf. When the
// loop ends it marks the client as exited and calls OnClientClose.
//
// NOTE(review): the local names `len` and `io` shadow the builtin and the
// io package inside this function.
// NOTE(review): the `break` paths that follow allocPack leave the loop
// without calling freePack on the buffer — confirm whether that matters for
// the allocator behind allocPack.
func (self *sMyClient) runRecv() {
	header := make([]byte, 7)
	for atomic.LoadInt32(&self.setbreaked) == 0 {
		len, err := recvPackageHeader(self.c, self.headerlen, self.rtimeouts, header)
		if err != nil {
			self.recvlastfail = err
			break
		}
		var xy XY = 0
		if self.headerxyid {
			// A zero-length packet is answered with a single zero byte and
			// otherwise ignored (presumably a keep-alive — confirm).
			if len == 0 && err == nil {
				Log("SendXY_zore")
				self.SendXY_zore()
				continue
			}
			// The payload must at least hold the protocol id.
			if len < uint32(unsafe.Sizeof(xy)) {
				self.recvlastfail = errors.New("XY limit")
				break
			}
			//len += uint32(unsafe.Sizeof(xy))
		}
		var pack []byte = nil
		if len > 0 {
			pack = allocPack(len)
			var err error
			if isValidHenlen(self.headerlen) {
				Log("loopRecv ", len)
				err = loopRecv(self.c, pack, self.rtimeouts)
			} else {
				// tryRecv reports how many bytes it read and pack is cut to
				// that length.
				// NOTE(review): len is not updated here, so if read < len the
				// TransformString call below (which indexes up to len) and the
				// pack[sizeof(xy):] slicing further down look like they can go
				// out of range — confirm.
				var read int
				read, err = tryRecv(self.c, pack, self.rtimeouts)
				pack = pack[0:read]
			}
			if err != nil {
				self.recvlastfail = err
				break
			}
			if self.aracrptstate {
				// The cipher is re-keyed from araRandnum for every packet.
				var temp = newARACrpt()
				temp.SetKey(self.araRandnum[0], self.araRandnum[1], self.araRandnum[2])
				temp.TransformString(&pack, len)
			}
		}
		if self.headerxyid {
			// Same condition as the check before the read; len has not
			// changed in between.
			if len < uint32(unsafe.Sizeof(xy)) {
				self.recvlastfail = errors.New("XY limit")
				break
			}
			// The protocol id is the little-endian value at the start of the
			// payload. The error returned by binary.Read is not checked.
			io := bytes.NewBuffer(pack)
			binary.Read(io, binary.LittleEndian, &xy)
		}
		// In silent mode packets are still read but no longer dispatched.
		if atomic.LoadInt32(&self.setsilent) == 0 {
			if self.headerxyid {
				///// other protocols may need to be handled here
				if xy == XY_CONFIG && self.cbCheck != nil {
					// Reply with two little-endian int32 values (0, 1).
					// After Reset the buffer is empty but still backed by buf,
					// so the 8 bytes written below land in buf itself, which
					// is what gets sent.
					buf := make([]byte, 8)
					io := bytes.NewBuffer(buf)
					io.Reset()
					binary.Write(io, binary.LittleEndian, int32(0))
					binary.Write(io, binary.LittleEndian, int32(1))
					self.SendXY(xyAuthRsp, buf)
				} else if xy == 30000 && self.cbCheck != nil {
					// Key exchange: draw three random key words and send them
					// in an XY_REGISTER packet laid out as int8(0), int16(12)
					// and the three uint32 words (15 bytes, filled into buf
					// through the same Reset trick as above).
					// NOTE(review): the words come from math/rand, which is
					// not a cryptographically secure source.
					self.araRandnum[0] = rand.Uint32()
					self.araRandnum[1] = rand.Uint32()
					self.araRandnum[2] = rand.Uint32()
					buf := make([]byte, 15)
					io := bytes.NewBuffer(buf)
					io.Reset()
					binary.Write(io, binary.LittleEndian, int8(0))
					binary.Write(io, binary.LittleEndian, int16(12))
					binary.Write(io, binary.LittleEndian, self.araRandnum[0])
					binary.Write(io, binary.LittleEndian, self.araRandnum[1])
					binary.Write(io, binary.LittleEndian, self.araRandnum[2])
					// The flag is raised only after SendXY has built the key
					// packet; SendXY encrypts only when the flag is already
					// set.
					self.SendXY(XY_REGISTER, buf)
					self.aracrptstate = true
				} else if xy == XY_REGISTER && self.cbCheck != nil {
					// Registration: the bytes after the protocol id are JSON
					// with the keys "type", "id", "processid" and "sign".
					jr := NewJSON()
					err = json.Unmarshal(pack[int(unsafe.Sizeof(xy)):], &jr)
					if err == nil {
						jrd := JSONRead{JSONT: jr}
						tpc := jrd.Num("type")
						sid := jrd.Num("id")
						processId := jrd.Num("processid")
						authss := jrd.Sss("sign")
						if sid != 0 {
							cked := self.cbCheck.OnClientRegister(self, int32(sid), int32(processId), int(tpc), authss)
							if cked {
								// Accepted: answer with {"id": sid, "error": 0}.
								j := NewJSON()
								j["id"] = sid
								j["error"] = 0
								b, _ := json.Marshal(j)
								self.SendXY(XY_REGISTER_RSP, b)
							} else {
								// Rejected by the callback.
								self.BreakConnect()
							}
						} else {
							// An id of 0 is not accepted.
							self.BreakConnect()
						}
					} else {
						// The payload is not valid JSON.
						self.BreakConnect()
					}
				} else {
					// Application packet: pass on the payload without the
					// leading protocol id.
					self.cbIf.OnClientPackage(self, pack[int(unsafe.Sizeof(xy)):], xy)
				}
			} else {
				// Connections without protocol ids deliver the raw payload
				// with xy 0.
				self.cbIf.OnClientPackage(self, pack, 0)
			}
		}
		if pack != nil {
			freePack(pack)
		}
	}
	// From here on the Send* methods refuse new packets.
	atomic.StoreInt32(&self.exited, 1)
	self.cbIf.OnClientClose(self)
}
// run drives one client connection: it starts the sender goroutine, runs the
// receive loop on the calling goroutine and, once that loop has ended, stops
// the sender, closes the connection and releases the client.
//
// wgg, when non-nil, is signalled when run returns.
//
// Changes from the previous version:
//   - wgg.Done is deferred, so a panic (recovered below) can no longer leave
//     the owner of wgg waiting forever.
//   - The nil sentinel is posted with a select that also watches for the
//     sender having finished. If the sender has already stopped after a send
//     error and the queue is full, the plain channel send used before would
//     block forever.
//   - Completion of the sender is observed through a channel that is closed
//     when runSend returns, instead of relying on runSend calling wg.Done.
func (p *sMyClient) run(wgg *sync.WaitGroup) {
	if wgg != nil {
		defer wgg.Done()
	}
	defer func() { CrashSnap(recover()) }()

	wg := &sync.WaitGroup{}
	wg.Add(1)
	sendDone := make(chan struct{})
	go func() {
		defer close(sendDone)
		p.runSend(wg)
	}()

	p.runRecv()

	select {
	case p.ch_sendlist <- nil: // sender flushes the queue, then sees the sentinel
	case <-sendDone: // sender is already gone, nothing left to signal
	}
	<-sendDone

	Logi(LOG_KERL, "SVR:MyClient runRecv 退出[", p.recvlastfail, ",", p.sendlastfail, "],通过sendlist关闭SendRun并Conn.Close(还有点问题)")
	close(p.ch_sendlist)
	p.c.Close()
	p.c = nil
	p.ch_sendlist = nil
	freeClient(p) // helps to avoid memory leaks to some extent
}
// IServer is a server that can be started and stopped; it is the union of
// the IRun and IStop interfaces.
type IServer interface {
	IRun
	IStop
}
// sMyServer is the TCP server behind IServer.
type sMyServer struct {
	tcplisten net.Listener
	wg        *sync.WaitGroup // incremented per accepted client, passed to its run

	cbConn  INewConnInterface // optional, may be nil
	cbCheck IRegisterInterface
	cbIf    IClientInterface

	clients_inbox uint // number of connections accepted so far
	addr          string
}
// accept is the listener loop. Every accepted connection gets a client
// object, is announced through cbConn (when set) and is then served by its
// own goroutine. The loop ends at the first Accept error, which is also how
// Stop terminates it (by closing the listener).
//
// wg, when non-nil, is signalled when accept returns.
//
// Changes from the previous version:
//   - wg.Done is deferred, so a panic inside the loop (recovered below) no
//     longer leaves the waiter blocked forever.
//   - p.wg is incremented directly before the client goroutine is started
//     rather than before the connect callback, so a panicking callback
//     cannot leave p.wg with a count that nobody will ever release.
func (p *sMyServer) accept(wg *sync.WaitGroup) {
	if wg != nil {
		defer wg.Done()
	}
	defer func() { CrashSnap(recover()) }()

	Logi(LOG_KERL, "Listening ", p.addr, " ...")
	for {
		c, err := p.tcplisten.Accept()
		if err != nil {
			break
		}
		mycnt := allocOneClient(c, p.cbIf, p.cbCheck)
		p.clients_inbox++
		if p.cbConn != nil {
			p.cbConn.OnNewClientConnect(mycnt)
		}
		p.wg.Add(1)
		go mycnt.run(p.wg)
	}
	Logi(LOG_KERL, " ...Listening ", p.addr, " exit")
}
// ServiceConfig is an empty placeholder type; none of its methods does
// anything yet.
type ServiceConfig struct{}

// AddHander currently does nothing.
func (cfg *ServiceConfig) AddHander(id int) {}

// AddClientHander currently does nothing.
func (cfg *ServiceConfig) AddClientHander(cmd int16) {}

// AddHttpHander currently does nothing.
func (cfg *ServiceConfig) AddHttpHander(file string) {}
// Stop closes the listener, which makes the accept loop end at its next
// Accept call. The error returned by Close is discarded.
func (s *sMyServer) Stop() {
	_ = s.tcplisten.Close()
}
// Run starts the accept loop on its own goroutine and returns immediately.
// A non-nil wg is incremented here and handed to the loop, which signals it
// when it ends.
func (s *sMyServer) Run(wg *sync.WaitGroup) {
	if wg != nil {
		wg.Add(1)
	}
	go s.accept(wg)
}
// ServeStartupAll opens a TCP listener on addr and returns a server that is
// ready to Run.
//
// cbConn and cbCheck may be nil; cbIf is checked with AssertImposible. When
// the listener cannot be opened the error is logged and nil is returned.
func ServeStartupAll(addr string, cbConn INewConnInterface, cbIf IClientInterface, cbCheck IRegisterInterface) IServer {
	AssertImposible(cbIf == nil, "ServeStartupAll::handler nil")

	listener, err := net.Listen("tcp", addr)
	if err != nil {
		Logi(LOG_KERL, "err:Listen(", addr, ") ", err)
		return nil
	}

	srv := new(sMyServer)
	srv.tcplisten = listener
	srv.wg = new(sync.WaitGroup)
	srv.addr = addr
	srv.cbConn = cbConn
	srv.cbCheck = cbCheck
	srv.cbIf = cbIf
	return srv
}
// ServeStartup is ServeStartupAll without a registration callback.
func ServeStartup(addr string, cbConn INewConnInterface, cbIf IClientInterface) IServer {
	var noCheck IRegisterInterface
	return ServeStartupAll(addr, cbConn, cbIf, noCheck)
}
// ARACrpt is a small stream cipher built from three 32-bit shift registers
// (A, B and C). For every output bit register A is stepped; its low bit
// decides whether B or C is stepped as well, and the output bit is the XOR
// of the latest bits shifted out of B and C. Data is XORed with the
// resulting key stream, so running the same transformation again from the
// same key state restores the original bytes.
type ARACrpt struct {
	// Register state (the key).
	m_LFSR_A uint32
	m_LFSR_B uint32
	m_LFSR_C uint32
	// Feedback masks applied when a register shifts out a 1.
	m_Mask_A uint32
	m_Mask_B uint32
	m_Mask_C uint32
	// AND masks applied after shifting out a 0.
	m_Rot0_A uint32
	m_Rot0_B uint32
	m_Rot0_C uint32
	// OR masks applied after shifting out a 1.
	m_Rot1_A uint32
	m_Rot1_B uint32
	m_Rot1_C uint32
}

// newARACrpt returns a cipher with the built-in masks and the default key.
func newARACrpt() *ARACrpt {
	return &ARACrpt{
		m_LFSR_A: 0x13579BDF,
		m_LFSR_B: 0x2468ACE0,
		m_LFSR_C: 0xFDB97531,
		m_Mask_A: 0x80000062,
		m_Mask_B: 0x40000020,
		m_Mask_C: 0x10000002,
		m_Rot0_A: 0x7FFFFFFF,
		m_Rot0_B: 0x3FFFFFFF,
		m_Rot0_C: 0x0FFFFFFF,
		m_Rot1_A: 0x80000000,
		m_Rot1_B: 0xC0000000,
		m_Rot1_C: 0xF0000000,
	}
}

// SetKey loads the three registers. A zero word is replaced by the default
// value of that register.
func (ara *ARACrpt) SetKey(key1 uint32, key2 uint32, key3 uint32) {
	if key1 == 0 {
		key1 = 0x13579BDF
	}
	if key2 == 0 {
		key2 = 0x2468ACE0
	}
	if key3 == 0 {
		key3 = 0xFDB97531
	}
	ara.m_LFSR_A, ara.m_LFSR_B, ara.m_LFSR_C = key1, key2, key3
}

// GetKey returns the current state of the three registers.
func (ara *ARACrpt) GetKey() (uint32, uint32, uint32) {
	return ara.m_LFSR_A, ara.m_LFSR_B, ara.m_LFSR_C
}

// TransformChar XORs *cTarget with the next eight key-stream bits (first
// generated bit in the most significant position) and advances the
// registers accordingly.
func (ara *ARACrpt) TransformChar(cTarget *byte) {
	var crypto byte
	// Until B or C is stepped for the first time, their current low bits
	// stand in as "last output".
	outB := ara.m_LFSR_B & 1
	outC := ara.m_LFSR_C & 1
	for bit := 0; bit < 8; bit++ {
		if ara.m_LFSR_A&1 != 0 {
			// A shifts out a 1: step B.
			ara.m_LFSR_A = ((ara.m_LFSR_A ^ ara.m_Mask_A) >> 1) | ara.m_Rot1_A
			if ara.m_LFSR_B&1 != 0 {
				ara.m_LFSR_B = ((ara.m_LFSR_B ^ ara.m_Mask_B) >> 1) | ara.m_Rot1_B
				outB = 1
			} else {
				ara.m_LFSR_B = (ara.m_LFSR_B >> 1) & ara.m_Rot0_B
				outB = 0
			}
		} else {
			// A shifts out a 0: step C.
			ara.m_LFSR_A = (ara.m_LFSR_A >> 1) & ara.m_Rot0_A
			if ara.m_LFSR_C&1 != 0 {
				ara.m_LFSR_C = ((ara.m_LFSR_C ^ ara.m_Mask_C) >> 1) | ara.m_Rot1_C
				outC = 1
			} else {
				ara.m_LFSR_C = (ara.m_LFSR_C >> 1) & ara.m_Rot0_C
				outC = 0
			}
		}
		crypto = crypto<<1 | byte(outB^outC)
	}
	*cTarget ^= crypto
}

// TransformString transforms the first cbsize bytes of *csTarget in place.
//
// A cbsize larger than the slice is clamped to the slice length and a nil
// csTarget is ignored; both used to end in a run-time panic.
func (ara *ARACrpt) TransformString(csTarget *[]byte, cbsize uint32) {
	if csTarget == nil {
		return
	}
	data := *csTarget
	n := len(data)
	if uint64(cbsize) < uint64(n) {
		n = int(cbsize)
	}
	for i := 0; i < n; i++ {
		ara.TransformChar(&data[i])
	}
}
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。