1 Star 0 Fork 12

hlwqds / nsq

forked from Gitee 极速下载 / NSQ 
加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
bench_reader.go 2.90 KB
一键复制 编辑 原始数据 按行查看 历史
Matt Reiferson 提交于 2015-04-12 13:00 . s/bitly/nsqio
package main
import (
"bufio"
"flag"
"log"
"net"
"runtime"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/nsqio/go-nsq"
)
var (
runfor = flag.Duration("runfor", 10*time.Second, "duration of time to run")
tcpAddress = flag.String("nsqd-tcp-address", "127.0.0.1:4150", "<addr>:<port> to connect to nsqd")
size = flag.Int("size", 200, "size of messages")
topic = flag.String("topic", "sub_bench", "topic to receive messages on")
channel = flag.String("channel", "ch", "channel to receive messages on")
deadline = flag.String("deadline", "", "deadline to start the benchmark run")
rdy = flag.Int("rdy", 2500, "RDY count to use")
)
var totalMsgCount int64
func main() {
flag.Parse()
var wg sync.WaitGroup
log.SetPrefix("[bench_reader] ")
goChan := make(chan int)
rdyChan := make(chan int)
workers := runtime.GOMAXPROCS(0)
for j := 0; j < workers; j++ {
wg.Add(1)
go func(id int) {
subWorker(*runfor, workers, *tcpAddress, *topic, *channel, rdyChan, goChan, id)
wg.Done()
}(j)
<-rdyChan
}
if *deadline != "" {
t, err := time.Parse("2006-01-02 15:04:05", *deadline)
if err != nil {
log.Fatal(err)
}
d := t.Sub(time.Now())
log.Printf("sleeping until %s (%s)", t, d)
time.Sleep(d)
}
start := time.Now()
close(goChan)
wg.Wait()
end := time.Now()
duration := end.Sub(start)
tmc := atomic.LoadInt64(&totalMsgCount)
log.Printf("duration: %s - %.03fmb/s - %.03fops/s - %.03fus/op",
duration,
float64(tmc*int64(*size))/duration.Seconds()/1024/1024,
float64(tmc)/duration.Seconds(),
float64(duration/time.Microsecond)/float64(tmc))
}
func subWorker(td time.Duration, workers int, tcpAddr string, topic string, channel string, rdyChan chan int, goChan chan int, id int) {
conn, err := net.DialTimeout("tcp", tcpAddr, time.Second)
if err != nil {
panic(err.Error())
}
conn.Write(nsq.MagicV2)
rw := bufio.NewReadWriter(bufio.NewReader(conn), bufio.NewWriter(conn))
ci := make(map[string]interface{})
ci["short_id"] = "test"
ci["long_id"] = "test"
cmd, _ := nsq.Identify(ci)
cmd.WriteTo(rw)
nsq.Subscribe(topic, channel).WriteTo(rw)
rdyChan <- 1
<-goChan
nsq.Ready(*rdy).WriteTo(rw)
rw.Flush()
nsq.ReadResponse(rw)
nsq.ReadResponse(rw)
var msgCount int64
go func() {
time.Sleep(td)
conn.Close()
}()
for {
resp, err := nsq.ReadResponse(rw)
if err != nil {
if strings.Contains(err.Error(), "use of closed network connection") {
break
}
panic(err.Error())
}
frameType, data, err := nsq.UnpackResponse(resp)
if err != nil {
panic(err.Error())
}
if frameType == nsq.FrameTypeError {
panic(string(data))
} else if frameType == nsq.FrameTypeResponse {
continue
}
msg, err := nsq.DecodeMessage(data)
if err != nil {
panic(err.Error())
}
nsq.Finish(msg.ID).WriteTo(rw)
msgCount++
if float64(msgCount%int64(*rdy)) > float64(*rdy)*0.75 {
rw.Flush()
}
}
atomic.AddInt64(&totalMsgCount, msgCount)
}
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
Go
1
https://gitee.com/hlwqds/nsq.git
git@gitee.com:hlwqds/nsq.git
hlwqds
nsq
nsq
v0.3.6

搜索帮助

344bd9b3 5694891 D2dac590 5694891