代码拉取完成,页面将自动刷新
package tcp
import (
"net"
"p15server/src/core"
"time"
)
// ============================================================================
// Sizing knobs for the per-connection reader and writer goroutines.
const (
	// C_read_buf_size is the length in bytes of the buffer that thr_read
	// passes to each Read call on the connection.
	C_read_buf_size = 4 * 1024

	// C_max_write_size is the byte threshold at which thr_write stops
	// merging further queued buffers into a single Write call.
	C_max_write_size = 4 * 1024

	// C_qw_max_size is the capacity, counted in queued buffers, of each
	// socket's write queue.
	C_qw_max_size = 10 * 1000
)
// ============================================================================
// Socket bundles one network connection with its outgoing write queue,
// the callbacks invoked by the read loop, and the cached peer address.
type Socket struct {
	// c is the underlying connection.
	c net.Conn
	// rip and rport hold the remote IP and port filled in by
	// parse_remote_addr.
	rip   string
	rport int32

	// qw is the queue of buffers waiting to be written to c.
	qw chan *[]byte

	// hb is the heartbeat interval in milliseconds; when it is non-zero the
	// read loop uses it as the read deadline for each Read.
	hb uint32

	// cb_data is called with every chunk of bytes read from c.
	cb_data func(buf []byte)
	// cb_close is called once when reading from c fails or returns no data.
	cb_close func()
}
// ============================================================================
// new_socket wraps c in a Socket and allocates its write queue with room
// for C_qw_max_size buffers. Callbacks, heartbeat and the cached remote
// address are left at their zero values.
func new_socket(c net.Conn) *Socket {
	s := new(Socket)
	s.c = c
	s.qw = make(chan *[]byte, C_qw_max_size)
	return s
}
// ============================================================================
// Close closes the underlying connection. The error returned by the
// connection's Close is discarded.
func (s *Socket) Close() {
	_ = s.c.Close()
}
// Send queues buf for the writer goroutine without blocking.
//
// If the write queue is full the connection is closed (the peer is kicked)
// and buf is dropped. Any panic raised while queueing is swallowed; in
// particular, thr_read closes the queue when it exits, and sending on a
// closed channel panics, so a Send racing with shutdown is a silent no-op.
func (s *Socket) Send(buf []byte) {
	// Swallow panics from the send below (e.g. send on a closed queue).
	defer func() { _ = recover() }()

	pkt := &buf
	select {
	case s.qw <- pkt:
		// queued for thr_write
	default:
		// queue is full: kick the peer
		s.Close()
	}
}
// TcpNoDelay forwards b to SetNoDelay on the underlying connection when it
// is a *net.TCPConn. For any other connection type it does nothing. The
// error returned by SetNoDelay is discarded.
func (s *Socket) TcpNoDelay(b bool) {
	tc, isTCP := s.c.(*net.TCPConn)
	if !isTCP {
		return
	}
	_ = tc.SetNoDelay(b)
}
// RemoteAddr returns the string form of the connection's remote address,
// queried from the underlying connection on every call.
func (s *Socket) RemoteAddr() string {
	peer := s.c.RemoteAddr()
	return peer.String()
}
// RemoteIP returns the cached remote host. It is the empty string until
// parse_remote_addr has successfully parsed the peer address.
func (s *Socket) RemoteIP() string {
	return s.rip
}
// RemotePort returns the cached remote port. It is zero until
// parse_remote_addr has successfully parsed the peer address.
func (s *Socket) RemotePort() int32 {
	return s.rport
}
// HeartBeat sets the heartbeat interval in milliseconds. The read loop
// checks this value before each read and, when it is greater than zero,
// sets a read deadline that far in the future. Zero disables the deadline
// update.
func (s *Socket) HeartBeat(ms uint32) {
	s.hb = ms
}
// OnData registers f as the data callback. The read loop calls it with
// each chunk of bytes read from the connection; the slice it receives
// aliases the read buffer, which is reused for the next read.
func (s *Socket) OnData(f func(buf []byte)) {
	s.cb_data = f
}
// OnClose registers f as the close callback. The read loop calls it when
// a read fails or returns zero bytes, just before the loop exits.
func (s *Socket) OnClose(f func()) {
	s.cb_close = f
}
// ============================================================================
// parse_remote_addr splits the connection's remote address into host and
// port and caches them in rip and rport. If the address cannot be split,
// both fields are left unchanged.
func (s *Socket) parse_remote_addr() {
	host, port, err := net.SplitHostPort(s.c.RemoteAddr().String())
	if err != nil {
		return
	}
	s.rip = host
	s.rport = core.Atoi32(port)
}
// thr_read is the connection's read loop. It reads into a fixed buffer of
// C_read_buf_size bytes and hands every non-empty chunk to the data
// callback. When a read fails or returns zero bytes it fires the close
// callback once and exits.
//
// On exit it closes the connection, closes the write queue (which lets
// thr_write drain and return) and clears both callbacks.
func (s *Socket) thr_read() {
	// Teardown, in this order: connection, write queue, callbacks.
	defer func() {
		s.Close()
		close(s.qw)
		s.cb_data = nil
		s.cb_close = nil
	}()

	conn := s.c
	rbuf := make([]byte, C_read_buf_size)
	for {
		// Refresh the read deadline when a heartbeat interval is set.
		if s.hb > 0 {
			deadline := core.GetNowTime().Add(time.Duration(s.hb) * time.Millisecond)
			conn.SetReadDeadline(deadline)
		}

		n, err := conn.Read(rbuf)
		if err == nil && n != 0 {
			// event: data. rbuf is reused on the next iteration, so the
			// callback sees a slice that is only valid during the call.
			if s.cb_data != nil {
				s.cb_data(rbuf[:n])
			}
			continue
		}

		// event: close (read error, deadline expiry, or empty read)
		if s.cb_close != nil {
			s.cb_close()
		}
		return
	}
}
// thr_write is the connection's write loop. It takes buffers off the write
// queue and writes them to the connection until the queue is closed and
// drained, or a write fails.
//
// Small buffers are coalesced: when more buffers are already queued and the
// current payload is shorter than C_max_write_size, queued buffers are
// merged into one Write call until the threshold is reached or the buffers
// that were pending at that moment are used up.
//
// Merging happens in a scratch buffer owned by this goroutine. Buffers
// handed to Send are never appended to, so spare capacity in a caller's
// backing array is never overwritten.
//
// On a write error, or a short write, the connection is closed and the loop
// exits. Closing makes the read loop observe the failure promptly instead
// of leaving the socket half-alive until the queue fills up.
func (s *Socket) thr_write() {
	c := s.c
	qw := s.qw

	// batch is reused across iterations to avoid reallocating on every merge.
	var batch []byte

	for first := range qw {
		out := *first

		// Snapshot how many buffers are queued right now; only those are
		// eligible for merging, so the receive below does not block.
		if pending := len(qw); pending > 0 && len(out) < C_max_write_size {
			batch = append(batch[:0], out...)
			for ; pending > 0 && len(batch) < C_max_write_size; pending-- {
				next, ok := <-qw
				if !ok {
					// Queue closed and drained: write what has been
					// gathered, then the outer range terminates.
					break
				}
				batch = append(batch, *next...)
			}
			out = batch
		}

		// io.Writer requires a non-nil error on a short write; the length
		// check is kept as a defensive guard, handled like any other failure.
		if n, err := c.Write(out); err != nil || n != len(out) {
			c.Close()
			return
		}
	}
}
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。