代码拉取完成,页面将自动刷新
package wall
//发送文件不合并
import (
"bytes"
"encoding/binary"
"errors"
"io/ioutil"
"net"
"time"
"gitee.com/haodreams/golib/logs"
"gitee.com/haodreams/libs/sock"
"gitee.com/haodreams/libs/config"
"github.com/pierrec/lz4"
)
// MaskLz4 .
const (
MaskLz4 = 0x8000
MaskNone = 0x7ff
MaxSize = 32*1024 - 1
)
// Send 发送
type Send struct {
interval int //扫描间隔
limitSpeed int //限速 K byte/s
ackChar byte //确认字符
remoteServer string //远程服务器地址
conn *net.TCPConn // tcp 连接
bytePerMS int //一毫秒最多发送的字节数
compressionLevel int //压缩级别
timeout time.Duration
}
// NewSend .
func NewSend(conf config.Configer) *Send {
s := new(Send)
s.limitSpeed = conf.DefaultInt("send_limit_speed", 0) //不限速
s.interval = conf.DefaultInt("send_scan_interval", 5)
s.remoteServer = conf.String("remote_server")
s.ackChar = byte(conf.DefaultInt("ack_char", 0))
s.timeout = time.Duration(conf.DefaultInt("timeout", 30)) * time.Second
s.compressionLevel = conf.DefaultInt("compression_level", 5)
if s.remoteServer == "" {
logs.Error("服务器地址为空。")
return nil
}
s.bytePerMS = s.limitSpeed * 1024 / 1000 //每毫秒发送的字节数
return s
}
// Reconnect 重新连接
func (m *Send) Reconnect() (err error) {
if m.conn == nil {
logs.Info("Connect to server,", m.remoteServer)
m.conn, err = sock.CreateTcpConnect(m.remoteServer)
if err != nil {
m.conn = nil
return
}
logs.Info("Connect to server success")
}
return
}
// Write .
func (m *Send) Write(data []byte) (num int, err error) {
num = len(data)
i := 0
for ; i < num; i += MaxSize {
pos := i + MaxSize
if pos > num {
pos = num
}
err = m.write(data[i:pos])
if err != nil {
return
}
}
if i < num {
err = m.write(data[i:num])
if err != nil {
return
}
}
return
}
// 发送一个数据包
func (m *Send) write(data []byte) (err error) {
//接收应答帧
mask, data := Lz4Encode(data)
l := uint16(len(data))
l |= mask
head := make([]byte, 2)
binary.BigEndian.PutUint16(head, l)
_, err = m.conn.Write(head)
if err != nil {
return
}
_, err = m.conn.Write(data)
if err != nil {
return
}
//30秒内要对数据包做出应答
err = m.conn.SetReadDeadline(time.Now().Add(m.timeout))
if err != nil {
m.conn.Close()
m.conn = nil
return
}
ack := make([]byte, 1)
_, err = m.conn.Read(ack)
if err != nil {
m.conn.Close()
m.conn = nil
return
}
if ack[0] != m.ackChar {
err = errors.New("ACK ERROR")
}
return
}
// Lz4Encode 最大32k
func Lz4Encode(data []byte, level ...int) (mask uint16, b []byte) {
buf := bytes.NewBuffer(nil)
zw := lz4.NewWriter(buf)
zw.BlockMaxSize = MaxSize
zw.CompressionLevel = 0
if len(level) > 0 {
zw.CompressionLevel = level[0]
}
_, err := zw.Write(data)
if err != nil {
return 0, data
}
err = zw.Close()
if err != nil {
return 0, data
}
if len(data) < buf.Len() {
return 0, data
}
return MaskLz4, buf.Bytes()
}
// Lz4Decode .
func Lz4Decode(data []byte) (b []byte, err error) {
mask := binary.BigEndian.Uint16(data)
if mask&MaskLz4 == MaskLz4 {
return data[2:], nil
}
zr := lz4.NewReader(bytes.NewBuffer(data[2:]))
b, err = ioutil.ReadAll(zr)
return
}
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。