4 Star 6 Fork 3

王军 / golib

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
stream.go 6.28 KB
一键复制 编辑 原始数据 按行查看 历史
王军 提交于 2023-03-17 09:32 . 整理分类
package stream
import (
"encoding/binary"
"net"
"strings"
"time"
"gitee.com/haodreams/libs/ee"
"gitee.com/haodreams/golib/logs"
"gitee.com/haodreams/golib/lz4file"
)
// Packet size limits and header layout used by the stream framing.
const (
	// PktMaxSize is the largest packet buffer size (7 MiB).
	PktMaxSize = 7 * 1024 * 1024

	// PktMinSize is the smallest packet buffer size.
	PktMinSize = 1 << 10

	// PktDefaultSize is the default packet buffer size (512 KiB).
	PktDefaultSize = 512 << 10

	// PktMask selects the length bits (low 23 bits) of the 3-byte size field.
	PktMask = 0x7fffff

	// pktLz4Mask is the top bit of the 3-byte size field; Send sets it when
	// the payload was replaced by the output of lz4file.Encode.
	pktLz4Mask = 0x800000

	// MagicHead is the magic number written at the start of every packet.
	MagicHead = 0x574a1029

	// headLen is the header length in bytes: 4-byte magic + 3-byte size field.
	headLen = 7

	// posSize is the offset of the size field inside the header.
	posSize = 4
)
// All communication follows a question-and-answer (request/response) pattern.

// Stream wraps a TCP connection and exchanges framed packets over it.
// Each packet is a headLen-byte header (magic number + 3-byte size field)
// followed by the payload.
type Stream struct {
// head is the header region: the first headLen bytes of buf.
head []byte
//options byte
// user and pwd are the credentials exchanged during the handshake.
user string
pwd string
// maxSize is the payload capacity; buf is allocated as maxSize+headLen bytes.
maxSize int
// host is the address dialed by connect (client mode).
host string
// buf is the backing buffer holding header and payload back to back.
buf []byte
// data is the payload region: buf after the first headLen bytes.
data []byte
// CreateTime is the creation time.
// NOTE(review): not written anywhere in this file; unit/epoch to be confirmed.
CreateTime int64
// LastConnectTime is the time of the last connection.
// NOTE(review): not written anywhere in this file; unit/epoch to be confirmed.
LastConnectTime int64
//recvBytes int64
//sendBytes int64
// clientIP is only valid in server mode.
//clientIP string
// conn is the underlying TCP connection; nil when no connection is held.
conn *net.TCPConn
// isGood is set by Open on success and cleared by Close and by failed
// Send/Recv calls.
isGood bool
}
// NewClient creates a Stream in client mode. No connection is made here;
// call Open to dial host and perform the handshake.
//
// bufSize is the requested payload capacity and is clamped to the range
// [PktMinSize, PktMaxSize]. user and password are the credentials sent to
// the server during the handshake.
func NewClient(user, password, host string, bufSize int) *Stream {
	size := bufSize
	switch {
	case size < PktMinSize:
		size = PktMinSize
	case size > PktMaxSize:
		size = PktMaxSize
	}

	// One allocation holds the header followed by the payload area.
	frame := make([]byte, size+headLen)
	return &Stream{
		user:    user,
		pwd:     password,
		host:    host,
		maxSize: size,
		buf:     frame,
		head:    frame[:headLen],
		data:    frame[headLen:],
	}
}
// NewServer creates a Stream in server mode on an accepted connection and
// performs the server side of the handshake.
//
// The first packet from the client carries the requested buffer size in its
// first 3 bytes, followed by "user\npassword". If check is non-nil it is
// called with the parsed credentials and a non-nil result rejects the client.
// On success the reply "OK" is sent and the returned Stream uses a buffer of
// the size the client asked for.
//
// On every failure the connection is closed and a nil Stream is returned
// together with the error; where possible the error text is first sent to the
// client.
func NewServer(conn *net.TCPConn, check func(string, string) *ee.Error) (*Stream, *ee.Error) {
	m := new(Stream)
	m.conn = conn
	// Start with the minimum buffer; it is re-allocated once the client has
	// told us the size it wants.
	m.maxSize = PktMinSize
	m.buf = make([]byte, m.maxSize+headLen)
	m.head = m.buf[:headLen]
	m.data = m.buf[headLen:]
	//conn.SetNoDelay(true)
	// Keep-alive is best effort; a failure here does not prevent the handshake.
	conn.SetKeepAlive(true)
	conn.SetKeepAlivePeriod(60 * time.Second)

	data, err := m.Recv()
	if err != nil {
		// Close here as well, so that all failure paths release the socket.
		conn.Close()
		return nil, err
	}
	// The first 3 bytes are the packet size; user name and password follow,
	// separated by '\n'.
	if len(data) < 3 {
		conn.Close()
		return nil, ee.NewError("Packet is invalid")
	}
	// Read the size before anything is sent: data may alias the stream buffer,
	// which Send overwrites.
	size := getInt(data)

	if check != nil {
		var user, pwd string
		// strings.Split with a non-empty separator always yields at least one
		// element, so only the 1, 2 and "too many" cases exist.
		switch parts := strings.Split(string(data[3:]), "\n"); len(parts) {
		case 1:
			user = parts[0]
		case 2:
			user, pwd = parts[0], parts[1]
		default:
			err = ee.NewError("Invalid user or name")
		}
		if err == nil {
			err = check(user, pwd)
		}
		if err != nil {
			// Best effort: tell the client why it was rejected.
			m.Send([]byte(err.String()))
			conn.Close()
			return nil, err
		}
		m.user, m.pwd = user, pwd
	}

	if size < PktMinSize {
		err = ee.NewError("Buffer size too small")
		// Best effort: tell the client why it was rejected.
		m.Send([]byte(err.String()))
		conn.Close()
		return nil, err
	}
	m.maxSize = size
	m.buf = make([]byte, m.maxSize+headLen)
	m.head = m.buf[:headLen]
	m.data = m.buf[headLen:]

	// The handshake only succeeded if the acknowledgement reached the socket.
	if err = m.Send([]byte("OK")); err != nil {
		conn.Close()
		return nil, err
	}
	// Mark the stream usable so IsGood reports true in server mode too.
	m.isGood = true
	return m, nil
}
// GetSize returns the maximum payload size of the stream, i.e. the capacity
// of its buffer excluding the packet header.
func (m *Stream) GetSize() int {
	size := m.maxSize
	return size
}
// getInt decodes a 3-byte big-endian integer from the start of b and strips
// the flag bit with PktMask. It panics if b holds fewer than 3 bytes.
func getInt(b []byte) int {
	lo := uint32(b[2]) // touch the last byte first, as a bounds check
	mid := uint32(b[1])
	hi := uint32(b[0])
	v := hi<<16 | mid<<8 | lo
	return int(v & PktMask)
}
// putInt stores the low 24 bits of val into b[0..2] in big-endian order.
// It panics if b holds fewer than 3 bytes.
func putInt(b []byte, val int) {
	for i := 0; i < 3; i++ {
		// Shifts of 16, 8 and 0 bits for bytes 0, 1 and 2.
		b[i] = byte(val >> uint(16-8*i))
	}
}
// connect dials m.host and performs the client side of the handshake.
//
// The handshake packet carries the requested buffer size in its first 3
// bytes, followed by "user\npassword". The server answers "OK" on success;
// any other answer is returned as the error text.
//
// It returns nil on success. On failure after the dial succeeded, m.conn is
// left set to the new connection; Close (or the next Open) releases it.
func (m *Stream) connect() (er *ee.Error) {
	if m.conn != nil {
		// Close instead of merely dropping the reference, so the old socket
		// is not leaked.
		m.conn.Close()
		m.conn = nil
	}
	c, err := net.DialTimeout("tcp", m.host, time.Second*30)
	if err != nil {
		return ee.NewError(err.Error())
	}
	conn, ok := c.(*net.TCPConn)
	if !ok {
		c.Close()
		return ee.NewError("not tcp connect")
	}
	//conn.SetNoDelay(true)
	// Keep-alive is best effort; a failure here does not prevent the handshake.
	conn.SetKeepAlive(true)
	conn.SetKeepAlivePeriod(60 * time.Second)
	m.conn = conn

	// Handshake packet: 3-byte buffer size, then "user\npassword".
	buf := make([]byte, 3, 3+len(m.user)+1+len(m.pwd))
	putInt(buf, m.maxSize)
	buf = append(buf, m.user...)
	buf = append(buf, '\n')
	buf = append(buf, m.pwd...)
	if er = m.Send(buf); er != nil {
		return er
	}

	var data []byte
	data, er = m.Recv()
	// Check the Recv result itself, not the (always nil) dial error.
	if er != nil {
		return er
	}
	if ack := string(data); ack != "OK" {
		return ee.NewError(ack)
	}
	return nil
}
// IsGood reports whether the stream holds a connection and no failure has
// been recorded on it.
func (m *Stream) IsGood() bool {
	if m.conn == nil {
		return false
	}
	return m.isGood
}
// Open closes any existing connection, then dials the server and runs the
// handshake. The stream is marked good only when that succeeds; otherwise
// the handshake error is returned.
func (m *Stream) Open() *ee.Error {
	m.Close()
	if err := m.connect(); err != nil {
		return err
	}
	m.isGood = true
	return nil
}
// Close closes the underlying connection, if there is one, and marks the
// stream as not good. It always returns nil.
func (m *Stream) Close() (er *ee.Error) {
	if m.conn == nil {
		return nil
	}
	m.conn.Close()
	m.conn, m.isGood = nil, false
	return nil
}
// Send writes data to the peer as one packet.
//
// Payloads longer than 256 bytes are passed through lz4file.Encode; the
// encoded form is used, and the compression flag set in the size field, only
// when it is actually shorter than the original.
//
// An error is returned when the stream has no connection, when encoding
// fails, when the (possibly compressed) payload does not fit into the stream
// buffer, or when the write fails. The stream is marked not good when there
// is no connection or the write fails.
func (m *Stream) Send(data []byte) (er *ee.Error) {
	if m.conn == nil {
		m.isGood = false
		return ee.NewError("Connect is not initialzie")
	}

	num := len(data)
	mask := 0
	if num > 256 { // payloads of 256 bytes or less are never compressed
		buf, err := lz4file.Encode(data)
		if err != nil {
			return ee.NewError(err.Error())
		}
		if len(buf) < num {
			logs.Debug("压缩后大小:", len(buf), "源数据包大小:", num, "压缩掉的字节:", num-len(buf))
			data = buf
			mask = pktLz4Mask
			num = len(data)
		}
	}

	// Without this check copy would silently truncate the payload and the
	// slice expression below would panic.
	if num > len(m.data) {
		return ee.NewError("Packet is too large")
	}

	// Header: 4-byte magic number, then 3-byte length with the compression flag.
	binary.BigEndian.PutUint32(m.head, MagicHead)
	putInt(m.head[posSize:], num|mask)
	copy(m.data, data)
	if _, err := m.conn.Write(m.buf[:num+headLen]); err != nil {
		m.isGood = false
		return ee.NewError(err.Error())
	}
	return nil
}
// Recv reads one packet from the peer and returns its payload.
//
// It returns (nil, nil) for a packet with an empty payload. When the
// compression flag is set in the header, the payload is passed through
// lz4file.Decode and the decoded bytes are returned. Otherwise the returned
// slice aliases the stream's internal buffer and is only valid until the next
// Send or Recv on this stream.
//
// On any error the returned data is nil. The stream is marked not good when
// there is no connection, when a read fails, and when the header is invalid
// or announces a payload larger than the buffer. In the last two cases the
// byte stream can no longer be trusted to be aligned on a packet boundary.
func (m *Stream) Recv() (data []byte, er *ee.Error) {
	if m.conn == nil {
		m.isGood = false
		return nil, ee.NewError("Connect is not initialzie")
	}

	if _, er = readTimeout(m.conn, m.head, time.Second*300); er != nil {
		m.isGood = false
		return nil, er
	}
	if binary.BigEndian.Uint32(m.head) != MagicHead {
		m.isGood = false
		return nil, ee.NewError("Check head error")
	}

	num := getInt(m.head[posSize:])
	if num == 0 {
		return nil, nil
	}
	// The length comes from the peer; reject it instead of panicking on the
	// slice expression below.
	if num > len(m.data) {
		m.isGood = false
		return nil, ee.NewError("Packet is too large")
	}

	payload := m.data[:num]
	if _, er = readTimeout(m.conn, payload, time.Second*300); er != nil {
		m.isGood = false
		return nil, er
	}

	// The top bit of the size field is the compression flag (pktLz4Mask).
	if (m.head[posSize] & 0x80) == 0 {
		return payload, nil
	}

	// Decompress.
	decoded, err := lz4file.Decode(payload)
	if err != nil {
		return nil, ee.NewError(err.Error())
	}
	return decoded, nil
}
// readTimeout reads from conn until data is completely filled.
//
// timeout is an idle timeout: the read deadline is set to now+timeout before
// every read, so it is renewed whenever bytes arrive.
//
// It returns the number of bytes read. er is non-nil when setting the
// deadline or reading fails (including on timeout); num then holds the bytes
// received so far.
func readTimeout(conn *net.TCPConn, data []byte, timeout time.Duration) (num int, er *ee.Error) {
	for num < len(data) {
		// timeout is already a time.Duration; it must not be scaled by
		// time.Millisecond again.
		if err := conn.SetReadDeadline(time.Now().Add(timeout)); err != nil {
			return num, ee.NewError(err.Error())
		}
		n, err := conn.Read(data[num:])
		// Count bytes delivered together with an error as well.
		num += n
		if err != nil {
			return num, ee.NewError(err.Error())
		}
	}
	return num, nil
}
Go
1
https://gitee.com/haodreams/golib.git
git@gitee.com:haodreams/golib.git
haodreams
golib
golib
212a885c2c3f

搜索帮助