代码拉取完成,页面将自动刷新
package main
import (
"context"
"encoding/binary"
"flag"
"fmt"
"hash/crc32"
"math/rand"
"os"
"sync"
"sync/atomic"
"time"
"gitee.com/unignss/ntrip/nmea"
"gitee.com/unignss/ntrip/ntripclient"
)
// GlobalStats 存储 1K 客户端的汇总统计指标
type GlobalStats struct {
ConnectCount uint64
FailCount uint64
BytesSent uint64
BytesRecv uint64
PacketCount uint64
ErrorCount uint64
}
var (
stats GlobalStats
globalReqID uint64
)
func main() {
durationStr := flag.String("duration", "1m", "测试执行时长 (例如: 1m, 2m, 30s)")
totalClients := flag.Int("total", 1000, "模拟客户端总数")
flag.Parse()
execDuration, err := time.ParseDuration(*durationStr)
if err != nil {
fmt.Printf("无效的时长参数: %v\n", err)
os.Exit(1)
}
fmt.Printf(">>> 启动 %d NTRIP 压力测试 | 计划时长: %v <<<\n", *totalClients, execDuration)
ctx, cancel := context.WithTimeout(context.Background(), execDuration)
defer cancel()
var wg sync.WaitGroup
for i := 1; i <= *totalClients; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
runSimulatedClient(ctx, id)
}(i)
// 进一步放缓启动速度,避免冲击
if i%20 == 0 {
time.Sleep(100 * time.Millisecond)
}
}
// 监控协程:每秒打印一次即时带宽
go func() {
var lastRecv uint64
ticker := time.NewTicker(1 * time.Second)
defer ticker.Stop()
fmt.Println("ID | 状态 | 即时下行流量")
fmt.Println("-----------------------------")
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
current := atomic.LoadUint64(&stats.BytesRecv)
diff := current - lastRecv
lastRecv = current
fmt.Printf("ALL | 运行中... | %10.2f KB/s\n", float64(diff)/1024.0)
}
}
}()
// 等待执行结束或超时
<-ctx.Done()
fmt.Println("\n>>> 测试时长已到,正在收尾报告... <<<")
// 给一点点时间让客户端处理完最后的包
time.Sleep(1 * time.Second)
printFinalReport(execDuration)
}
func runSimulatedClient(ctx context.Context, id int) {
var user string
var startLat, startLon float64
if id <= 500 {
user = fmt.Sprintf("wh%03d", id)
startLat, startLon = 30.5, 114.3
} else {
user = fmt.Sprintf("gz%03d", id-500)
startLat, startLon = 23.1, 113.2
}
client := ntripclient.NewNtripClient()
client.SetHostPort("127.0.0.1", 2101)
client.SetMountPoint("MP1")
client.SetAuth(user, "pass")
var currentReqID uint64
buffer := make([]byte, 0, 4000)
client.SetHandler(ntripclient.EventHandler{
OnBeforeConnect: func() {
newID := atomic.AddUint64(&globalReqID, 1)
client.SetHeader("X-Request-ID", fmt.Sprintf("%d", newID))
currentReqID = newID
buffer = buffer[:0] // 重置缓冲区
},
OnConnectSuccess: func() {
atomic.AddUint64(&stats.ConnectCount, 1)
if id == 1 {
fmt.Printf("[Success] Client %d connected successfully. Sending initial GGA...\n", id)
}
// 立即上报一次位置,触发 Caster 判定 Region
lat := startLat + (rand.Float64()-0.5)*0.01
lon := startLon + (rand.Float64()-0.5)*0.01
g := nmea.NewNmeaGGA()
g.Type = "GPGGA"
g.Quality = "4"
g.SetLat(lat)
g.SetLng(lon)
g.UpdateUTCTime()
raw := g.Encode()
if err := client.SendGGA(raw); err == nil {
atomic.AddUint64(&stats.BytesSent, uint64(len(raw)))
if id == 1 {
fmt.Printf("[Log] Client %d sent initial GGA: %s", id, raw)
}
}
},
OnConnectErr: func(err error) {
atomic.AddUint64(&stats.FailCount, 1)
if id == 1 {
fmt.Printf("[Error] Client %d connection failed: %v\n", id, err)
}
},
OnData: func(data []byte) {
atomic.AddUint64(&stats.BytesRecv, uint64(len(data)))
buffer = append(buffer, data...)
for len(buffer) >= 2000 {
pkt := buffer[:2000]
receivedCRC := binary.BigEndian.Uint32(pkt[1996:])
calcCRC := crc32.ChecksumIEEE(pkt[:1996])
if receivedCRC == calcCRC {
reqID := binary.BigEndian.Uint64(pkt[4:12])
if reqID == currentReqID {
atomic.AddUint64(&stats.PacketCount, 1)
} else {
atomic.AddUint64(&stats.ErrorCount, 1)
if id == 1 {
fmt.Printf("[Error] ReqID mismatch: Got %d, Expected %d\n", reqID, currentReqID)
}
}
} else {
atomic.AddUint64(&stats.ErrorCount, 1)
if id == 1 {
fmt.Printf("[Error] CRC mismatch: Got %08X, Calc %08X\n", receivedCRC, calcCRC)
}
}
buffer = buffer[2000:]
}
},
})
client.Start()
defer client.Stop()
// 定位上报逻辑
ticker := time.NewTicker(10 * time.Second)
defer ticker.Stop()
gga := nmea.NewNmeaGGA()
gga.Type = "GPGGA"
gga.Quality = "4"
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
lat := startLat + (rand.Float64()-0.5)*0.01
lon := startLon + (rand.Float64()-0.5)*0.01
gga.SetLat(lat)
gga.SetLng(lon)
gga.UpdateUTCTime()
raw := gga.Encode()
if err := client.SendGGA(raw); err == nil {
atomic.AddUint64(&stats.BytesSent, uint64(len(raw)))
}
}
}
}
func printFinalReport(duration time.Duration) {
fmt.Println("\n==========================================")
fmt.Println(" NTRIP 1K 并发测试 最终报表 ")
fmt.Println("==========================================")
fmt.Printf("测试总时长: %v\n", duration)
fmt.Printf("建立连接总数: %d\n", atomic.LoadUint64(&stats.ConnectCount))
fmt.Printf("连接失败次数: %d\n", atomic.LoadUint64(&stats.FailCount))
fmt.Printf("有效数据包: %d\n", atomic.LoadUint64(&stats.PacketCount))
fmt.Printf("异常/CRC错误: %d\n", atomic.LoadUint64(&stats.ErrorCount))
fmt.Println("------------------------------------------")
sent := atomic.LoadUint64(&stats.BytesSent)
recv := atomic.LoadUint64(&stats.BytesRecv)
fmt.Printf("累计发送流量: %.2f MB\n", float64(sent)/(1024*1024))
fmt.Printf("累计接收流量: %.2f MB\n", float64(recv)/(1024*1024))
fmt.Printf("平均下行带宽: %.2f KB/s\n", float64(recv)/1024.0/duration.Seconds())
fmt.Println("==========================================")
}
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。