1 Star 1 Fork 0

liuxuezhan/mylib

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
克隆/下载
socket.go 2.81 KB
一键复制 编辑 原始数据 按行查看 历史
liuxuezhan 提交于 4年前 . 'new gate'
package tcp
import (
"net"
"p15server/src/core"
"time"
)
// ============================================================================
const (
C_read_buf_size = 4096
C_max_write_size = 4096
C_qw_max_size = 10000
)
// ============================================================================
type Socket struct {
c net.Conn // underlying connection
qw chan *[]byte // write queue
cb_data func(buf []byte) // callback: data
cb_close func() // callback: close
rip string // remote ip
rport int32 // remote port
hb uint32 // heart beat in milliseconds
}
// ============================================================================
func new_socket(c net.Conn) *Socket {
return &Socket{
c: c,
qw: make(chan *[]byte, C_qw_max_size),
}
}
// ============================================================================
func (self *Socket) Close() {
self.c.Close()
}
func (self *Socket) Send(buf []byte) {
// ignore EPIPE
defer func() { recover() }()
// push to qw. kick if qw if full
select {
case self.qw <- &buf:
default:
self.Close()
}
}
func (self *Socket) TcpNoDelay(b bool) {
if c, ok := self.c.(*net.TCPConn); ok {
c.SetNoDelay(b)
}
}
func (self *Socket) RemoteAddr() string {
return self.c.RemoteAddr().String()
}
func (self *Socket) RemoteIP() string {
return self.rip
}
func (self *Socket) RemotePort() int32 {
return self.rport
}
func (self *Socket) HeartBeat(ms uint32) {
self.hb = ms
}
func (self *Socket) OnData(f func(buf []byte)) {
self.cb_data = f
}
func (self *Socket) OnClose(f func()) {
self.cb_close = f
}
// ============================================================================
func (self *Socket) parse_remote_addr() {
addr := self.c.RemoteAddr().String()
host, port, err := net.SplitHostPort(addr)
if err == nil {
self.rip = host
self.rport = core.Atoi32(port)
}
}
func (self *Socket) thr_read() {
defer func() {
self.Close()
close(self.qw)
self.cb_data = nil
self.cb_close = nil
}()
c := self.c
buf := make([]byte, C_read_buf_size)
for {
// set deadline
if self.hb > 0 {
c.SetReadDeadline(core.GetNowTime().Add(time.Duration(self.hb) * time.Millisecond))
}
// read
n, err := c.Read(buf)
if err != nil || n == 0 {
// event: close
if self.cb_close != nil {
self.cb_close()
}
break
}
// event: data
if self.cb_data != nil {
self.cb_data(buf[:n])
}
}
}
func (self *Socket) thr_write() {
c := self.c
qw := self.qw
for {
select {
case buf, ok := <-qw:
if !ok {
return
}
L := len(qw)
for L > 0 && len(*buf) < C_max_write_size {
*buf = append(*buf, *<-qw...)
L--
}
n, err := c.Write(*buf)
if err != nil {
return
}
if n != len(*buf) {
panic("socket write error!")
}
}
}
}
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
1
https://gitee.com/liuxuezhan/mylib.git
git@gitee.com:liuxuezhan/mylib.git
liuxuezhan
mylib
mylib
v1.0.8

搜索帮助

371d5123 14472233 46e8bd33 14472233