代码拉取完成,页面将自动刷新
同步操作将从 Gitee 极速下载/NSQ 强制同步,此操作会覆盖自 Fork 仓库以来所做的任何修改,且无法恢复!!!
确定后同步将在后台操作,完成时将刷新页面,请耐心等待。
package main
import (
"bufio"
"flag"
"log"
"net"
"runtime"
"sync"
"sync/atomic"
"time"
"github.com/bitly/go-nsq"
)
var (
runfor = flag.Duration("runfor", 10*time.Second, "duration of time to run")
tcpAddress = flag.String("nsqd-tcp-address", "127.0.0.1:4150", "<addr>:<port> to connect to nsqd")
topic = flag.String("topic", "sub_bench", "topic to receive messages on")
size = flag.Int("size", 200, "size of messages")
batchSize = flag.Int("batch-size", 200, "batch size of messages")
deadline = flag.String("deadline", "", "deadline to start the benchmark run")
)
var totalMsgCount int64
func main() {
flag.Parse()
var wg sync.WaitGroup
log.SetPrefix("[bench_writer] ")
msg := make([]byte, *size)
batch := make([][]byte, *batchSize)
for i := range batch {
batch[i] = msg
}
goChan := make(chan int)
rdyChan := make(chan int)
for j := 0; j < runtime.GOMAXPROCS(0); j++ {
wg.Add(1)
go func() {
pubWorker(*runfor, *tcpAddress, *batchSize, batch, *topic, rdyChan, goChan)
wg.Done()
}()
<-rdyChan
}
if *deadline != "" {
t, err := time.Parse("2006-01-02 15:04:05", *deadline)
if err != nil {
log.Fatal(err)
}
d := t.Sub(time.Now())
log.Printf("sleeping until %s (%s)", t, d)
time.Sleep(d)
}
start := time.Now()
close(goChan)
wg.Wait()
end := time.Now()
duration := end.Sub(start)
tmc := atomic.LoadInt64(&totalMsgCount)
log.Printf("duration: %s - %.03fmb/s - %.03fops/s - %.03fus/op",
duration,
float64(tmc*int64(*size))/duration.Seconds()/1024/1024,
float64(tmc)/duration.Seconds(),
float64(duration/time.Microsecond)/float64(tmc))
}
func pubWorker(td time.Duration, tcpAddr string, batchSize int, batch [][]byte, topic string, rdyChan chan int, goChan chan int) {
conn, err := net.DialTimeout("tcp", tcpAddr, time.Second)
if err != nil {
panic(err.Error())
}
conn.Write(nsq.MagicV2)
rw := bufio.NewReadWriter(bufio.NewReader(conn), bufio.NewWriter(conn))
rdyChan <- 1
<-goChan
var msgCount int64
endTime := time.Now().Add(td)
for {
cmd, _ := nsq.MultiPublish(topic, batch)
_, err := cmd.WriteTo(rw)
if err != nil {
panic(err.Error())
}
err = rw.Flush()
if err != nil {
panic(err.Error())
}
resp, err := nsq.ReadResponse(rw)
if err != nil {
panic(err.Error())
}
frameType, data, err := nsq.UnpackResponse(resp)
if err != nil {
panic(err.Error())
}
if frameType == nsq.FrameTypeError {
panic(string(data))
}
msgCount += int64(len(batch))
if time.Now().After(endTime) {
break
}
}
atomic.AddInt64(&totalMsgCount, msgCount)
}
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。