2 Star 0 Fork 0

slh92 / plugin-sip

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
tcp_server.go 3.59 KB
一键复制 编辑 原始数据 按行查看 历史
slh92 提交于 2024-04-07 18:21 . 初始化代码
package transport
import (
"fmt"
"io"
"net"
"sync"
"time"
)
// TCPServer is the TCP implementation of the transport server. It holds the
// listening socket, the per-client sessions and the packet queues that connect
// the network goroutines to the rest of the program.
type TCPServer struct {
	Statistic

	// addr is the listen address in ":port" form, as built by NewTCPServer.
	addr string
	// listener is assigned by StartAndWait once net.Listen succeeds.
	listener net.Listener
	// readChan carries packets read from clients; see ReadPacketChan.
	readChan chan *Packet
	// writeChan carries packets queued by WritePacket for the writer goroutine.
	writeChan chan *Packet
	// done is the stop signal the writer goroutine selects on.
	done chan struct{}
	// Keepalive is the flag reported by IsKeepalive.
	Keepalive bool
	// sessions is keyed by the client's remote address string; the code that
	// reads entries back asserts each value as *Connection.
	sessions sync.Map
}
// NewTCPServer returns a TCP server configured to listen on every interface
// at the given port. Nothing is bound until StartAndWait is called. The read
// and write queues are each buffered for 10 packets.
func NewTCPServer(port uint16, keepalive bool) IServer {
	const queueSize = 10

	srv := new(TCPServer)
	srv.addr = fmt.Sprintf(":%d", port)
	srv.Keepalive = keepalive
	srv.readChan = make(chan *Packet, queueSize)
	srv.writeChan = make(chan *Packet, queueSize)
	srv.done = make(chan struct{})
	return srv
}
// IsReliable always reports true for the TCP server.
func (s *TCPServer) IsReliable() bool {
	const reliable = true
	return reliable
}
// Name returns a human-readable label for the server that includes its
// listen address.
func (s *TCPServer) Name() string {
	return "tcp server at:" + s.addr
}
// IsKeepalive reports the value of the server's Keepalive flag.
func (s *TCPServer) IsKeepalive() bool {
	enabled := s.Keepalive
	return enabled
}
// StartAndWait binds the listening socket on s.addr, starts the writer
// goroutine, and then runs the accept loop on the calling goroutine.
//
// It returns the error from net.Listen if binding fails, or the first
// non-temporary error reported by Accept. The listener is closed when the
// function returns. Each accepted connection is registered in s.sessions under
// its remote address and served by its own handlerSession goroutine.
func (s *TCPServer) StartAndWait() error {
	var err error
	s.listener, err = net.Listen("tcp", s.addr)
	//s.listener, err = tls.Listen("tcp", s.tcpAddr, tlsConfig)
	if err != nil {
		fmt.Println("TCP Listen failed:", err)
		return err
	}
	defer s.listener.Close()
	fmt.Println("start tcp server at: ", s.addr)

	// Heartbeat goroutine.
	if s.Keepalive {
		//TODO:start heartbeat thread
	}

	// Writer goroutine: drains writeChan and forwards each packet to the
	// session registered for the packet's address, until done is closed.
	go func() {
		for {
			select {
			case p := <-s.writeChan:
				if p == nil || p.Addr == nil {
					// Nothing to route; skip instead of panicking below.
					continue
				}
				key := p.Addr.String()
				val, ok := s.sessions.Load(key)
				if !ok {
					// The peer is unknown or already gone. Drop only this
					// packet: returning here would stop every future write.
					fmt.Println("tcp write dropped, no session for:", key)
					continue
				}
				c, ok := val.(*Connection)
				if !ok || c == nil {
					continue
				}
				conn := *c
				if _, werr := conn.Write(p.Data); werr != nil {
					fmt.Println("tcp write err:", werr)
				}
			case <-s.done:
				return
			}
		}
	}()

	// Accept loop. tempDelay lives outside the loop so that consecutive
	// temporary failures really back off (5ms, 10ms, ... capped at 1s); it is
	// reset after every successful Accept.
	var tempDelay time.Duration // how long to sleep on accept failure
	for {
		conn, err := s.listener.Accept()
		if err != nil {
			fmt.Println("accept err :", err.Error())
			// Retry on temporary errors, following the net/http server.
			if ne, ok := err.(net.Error); ok && ne.Temporary() {
				if tempDelay == 0 {
					tempDelay = 5 * time.Millisecond
				} else {
					tempDelay *= 2
				}
				if maxDelay := 1 * time.Second; tempDelay > maxDelay {
					tempDelay = maxDelay
				}
				time.Sleep(tempDelay)
				continue
			}
			fmt.Println("accept error, retry failed & exit.")
			return err
		}
		tempDelay = 0

		// conn.SetReadDeadline(time.Now().Add(600 * time.Second))
		session := newTCPConnection(conn)
		address := session.RemoteAddr().String()
		// Store a pointer: the writer goroutine, CloseOne and Close all read
		// the entry back as *Connection.
		s.sessions.Store(address, &session)
		fmt.Printf("new tcp client remoteAddr: %v\n", address)
		go s.handlerSession(&session)
	}
}
// handlerSession is the per-client read loop. It reads from the connection
// until EOF or another read error, publishing every chunk of received bytes
// on s.readChan as a Packet tagged with the client's remote address.
//
// On exit, including after a panic, the session is released through CloseOne.
func (s *TCPServer) handlerSession(c *Connection) {
	conn := *c
	remote := conn.RemoteAddr()
	addrStr := remote.String()

	// Deferred calls run last-in first-out: CloseOne runs first, then the
	// recover below, which therefore also catches a panic raised by CloseOne.
	defer func() {
		if r := recover(); r != nil {
			fmt.Println("client receiver handler panic: ", r)
		}
	}()
	defer s.CloseOne(addrStr)

	buf := make([]byte, 2048)
	for {
		n, err := conn.Read(buf)
		if n > 0 {
			// buf is reused by the next Read while the packet may still be
			// waiting in the channel, so hand the consumer its own copy.
			data := make([]byte, n)
			copy(data, buf[:n])
			s.readChan <- &Packet{
				Addr: remote,
				Data: data,
			}
		}
		switch {
		case err == nil:
			continue
		case err == io.EOF:
			fmt.Printf("io.EOF,client close --- remoteAddr: %v\n", remote)
			return
		default:
			fmt.Println("client other err: ", err)
			fmt.Printf("client other err --- remoteAddr: %v\n", addrStr)
			return
		}
	}
}
// CloseOne closes the session registered under addr and removes it from the
// session table. It does nothing when no session is registered for addr.
func (s *TCPServer) CloseOne(addr string) {
	val, ok := s.sessions.Load(addr)
	if !ok {
		return
	}
	// Remove the entry whatever its stored form, so that a value which cannot
	// be closed does not linger in the table.
	s.sessions.Delete(addr)

	// The entry is accepted both as a *Connection and as a plain Connection:
	// an unchecked assertion to *Connection panics on the latter.
	// Close errors are ignored on purpose; the peer is often gone already.
	switch c := val.(type) {
	case *Connection:
		if c != nil {
			conn := *c
			_ = conn.Close()
		}
	case Connection:
		_ = c.Close()
	}
}
// ReadPacketChan exposes the server's read queue as a receive-only channel.
func (s *TCPServer) ReadPacketChan() <-chan *Packet {
	var incoming <-chan *Packet = s.readChan
	return incoming
}
// WritePacket places packet on the server's write queue. The send blocks
// while the queue is full.
func (s *TCPServer) WritePacket(packet *Packet) {
	queue := s.writeChan
	queue <- packet
}
// Close shuts the server down: it closes the listener, if one was created, so
// that the accept loop in StartAndWait stops taking new clients, then closes
// and unregisters every client session. It always returns nil.
//
// NOTE(review): s.done is not closed here, so the writer goroutine started by
// StartAndWait keeps running — confirm whether shutdown should signal it.
func (s *TCPServer) Close() error {
	if s.listener != nil {
		// The error is ignored: the listener may already have been closed by
		// StartAndWait returning.
		_ = s.listener.Close()
	}

	// Before the TCP service exits, every client connection is closed.
	s.sessions.Range(func(key, value interface{}) bool {
		// The entry is accepted both as a *Connection and as a plain
		// Connection: an unchecked assertion to *Connection panics on the
		// latter. Close errors are ignored as best-effort cleanup.
		switch c := value.(type) {
		case *Connection:
			if c != nil {
				conn := *c
				_ = conn.Close()
			}
		case Connection:
			_ = c.Close()
		}
		s.sessions.Delete(key)
		return true
	})
	return nil
}
// Conn always returns nil for the TCP server.
func (s *TCPServer) Conn() *Connection {
	var none *Connection
	return none
}
Go
1
https://gitee.com/slh1992/plugin-sip.git
git@gitee.com:slh1992/plugin-sip.git
slh1992
plugin-sip
plugin-sip
v1.3.9

搜索帮助

53164aa7 5694891 3bd8fe86 5694891