Fetch the repository succeeded.
package tcpclient
import (
"context"
"fmt"
"net"
"time"
"github.com/sirupsen/logrus"
)
type TcpClient interface {
Start() error
Stop()
Write([]byte)
}
type Router interface {
Read([]byte)
Connection()
Disconnection()
}
type Client struct {
Name string
Ip string
Port int
stop bool
cancel func()
ctx context.Context
conn *net.TCPConn
router Router
wrt chan []byte
}
func NewClient(name string, ip string, port int, router Router) TcpClient {
return &Client{
Name: name,
Ip: ip,
Port: port,
stop: false,
cancel: nil,
ctx: nil,
conn: nil,
router: router,
wrt: make(chan []byte, 100),
}
}
func (c *Client) Start() error {
addr, err := net.ResolveTCPAddr("tcp4", fmt.Sprintf("%s:%d", c.Ip, c.Port))
if err != nil {
logrus.Errorf("ResolveTCPAddr failed with error(%v)", err)
return err
}
ctx, cancel := context.WithCancel(context.Background())
c.cancel = cancel
c.ctx = ctx
go func() {
for {
// 如果core一直连接不上,而且用户又主动调用Stop函数,则在这边进行返回
select {
case <-c.ctx.Done():
return
default:
}
conn, err := net.DialTCP("tcp4", nil, addr)
if err != nil {
logrus.Errorf("DialTCP failed, addr[%v]", addr)
time.Sleep(time.Second)
continue
}
c.stop = false
logrus.Debugf("DialTCP success, addr[%v]", addr)
c.conn = conn
_ctx, _cancel := context.WithCancel(context.Background())
// 读取tcp
go func() {
buf := make([]byte, 4096)
c.conn.SetReadBuffer(4096)
for {
n, err := c.conn.Read(buf)
if n == 0 || err != nil {
// 连接出问题了,也可能是正常关闭了
logrus.Debugf("conn read failed (%v)", err)
_cancel()
return
}
logrus.Debugf("conn read len(%v)(%v)", n, len(buf))
c.router.Read(buf[:n])
}
}()
// 写入tcp
go func() {
for {
select {
case <-_ctx.Done():
return
case b := <-c.wrt:
n, err := c.conn.Write(b)
if err != nil || n != len(b) {
logrus.Errorf("conn write failed (%v)", err)
_cancel()
return
}
logrus.Debugf("conn write (%v) bytes", n)
}
}
}()
c.router.Connection()
// 说明断开了连接或者准备停止连接
done := false
select {
case <-ctx.Done():
logrus.Debugf("client stop")
done = true
case <-_ctx.Done():
}
conn.Close()
c.router.Disconnection()
if done {
return
} else {
<-time.After(time.Second)
}
}
}()
return nil
}
func (c *Client) Stop() {
c.stop = true
c.cancel()
}
func (c *Client) Write(bytes []byte) {
for {
select {
case c.wrt <- bytes:
return
case <-time.After(time.Second):
logrus.Errorf("write waiting, message full")
}
}
}
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。