Ai
1 Star 0 Fork 0

k8sio/cnkit

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
该仓库未声明开源许可证文件(LICENSE),使用请关注具体项目描述及其代码上游依赖。
克隆/下载
delayqueue_test.go 9.04 KB
一键复制 编辑 原始数据 按行查看 历史
yuanhack 提交于 2024-09-06 11:23 +08:00 . delayqueue example verify
package delayqueue
import (
"context"
"strconv"
"sync"
"testing"
"time"
"github.com/alicebob/miniredis/v2"
"github.com/go-redis/redis/v8"
"github.com/stretchr/testify/assert"
)
// TestDelayQueue_consume drives consume() directly against a miniredis
// instance and verifies per-message delivery counts.
//
// The callback acknowledges even payloads (returns true) and rejects odd
// ones (returns false). Even payloads are therefore expected exactly once,
// odd payloads retryCount+1 times (first attempt plus every retry).
func TestDelayQueue_consume(t *testing.T) {
	minired, err := miniredis.Run()
	if err != nil {
		// Fail through the testing framework instead of panicking so the
		// failure is attributed to this test.
		t.Fatalf("start miniredis: %v", err)
	}
	defer minired.Close()
	redisCli := redis.NewUniversalClient(&redis.UniversalOptions{
		Addrs: []string{
			minired.Addr(),
		},
	})

	size := 10
	retryCount := 3
	deliveryCount := make(map[string]int, size)
	cb := func(s string) bool {
		deliveryCount[s]++
		// Payloads are produced by strconv.Itoa below, so parsing cannot fail.
		i, _ := strconv.ParseInt(s, 10, 64)
		return i%2 == 0
	}

	client := NewRedisV8Wrapper(redisCli)
	queue := NewQueue0("test111", client, UseHashTagKey()).
		WithCallback(cb).
		WithFetchInterval(time.Millisecond * 50).
		WithMaxConsumeDuration(0).
		WithFetchLimit(2)

	for i := 0; i < size; i++ {
		err := queue.SendDelayMsg(strconv.Itoa(i), 0, WithRetryCount(retryCount), WithMsgTTL(time.Hour))
		if err != nil {
			t.Error(err)
		}
	}
	for i := 0; i < 10*size; i++ {
		if err := queue.consume(); err != nil {
			t.Errorf("consume error: %v", err)
			return
		}
	}

	// Without this check the loop below passes vacuously when the callback
	// was never invoked (or only invoked for some of the messages).
	if len(deliveryCount) != size {
		t.Errorf("expect %d distinct messages delivered, actual %d", size, len(deliveryCount))
	}
	for k, v := range deliveryCount {
		i, _ := strconv.ParseInt(k, 10, 64)
		want := retryCount + 1
		if i%2 == 0 {
			want = 1
		}
		if v != want {
			t.Errorf("expect %d delivery, actual %d. key: %s", want, v, k)
		}
	}
}
// TestDelayQueueOnCluster sends size messages with zero delay to a queue
// backed by the Redis nodes at 127.0.0.1:7000-7002 and verifies that the
// callback is invoked once per message.
//
// The queue is created with UseHashTagKey(), matching the other
// NewQueue0 call sites in this file that pass it as an option.
// NOTE(review): this assumes UseHashTagKey makes all queue keys share one
// hash slot, which Redis Cluster requires for multi-key operations —
// confirm against the option's implementation.
func TestDelayQueueOnCluster(t *testing.T) {
	redisCli := redis.NewUniversalClient(&redis.UniversalOptions{
		Addrs: []string{
			"127.0.0.1:7000",
			"127.0.0.1:7001",
			"127.0.0.1:7002",
		},
	})
	// Best-effort cleanup; the result is intentionally not checked.
	redisCli.FlushDB(context.Background())

	size := 1000
	succeed := 0
	cb := func(s string) bool {
		succeed++
		return true
	}

	client := NewRedisV8Wrapper(redisCli)
	queue := NewQueue0("test", client, UseHashTagKey()).
		WithCallback(cb).
		WithFetchInterval(time.Millisecond * 50).
		WithMaxConsumeDuration(0).
		WithFetchLimit(2).
		WithConcurrent(1)

	for i := 0; i < size; i++ {
		err := queue.SendDelayMsg(strconv.Itoa(i), 0)
		if err != nil {
			t.Error(err)
		}
	}
	for i := 0; i < 10*size; i++ {
		if err := queue.consume(); err != nil {
			t.Errorf("consume error: %v", err)
			return
		}
	}
	if succeed != size {
		t.Errorf("msg not consumed: expect %d, actual %d", size, succeed)
	}
}
// TestDelayQueue_ConcurrentConsume runs consume() with WithConcurrent(4)
// against the Redis server at 127.0.0.1:6379 and verifies that every
// message is handed to the callback exactly once.
//
// The callback always returns true and records deliveries in a map that is
// guarded by a mutex, because it is configured to run concurrently.
func TestDelayQueue_ConcurrentConsume(t *testing.T) {
	redisCli := redis.NewClient(&redis.Options{
		Addr: "127.0.0.1:6379",
	})
	// Stale data from an earlier run would distort the delivery counts, so
	// a failed flush aborts the test.
	if err := redisCli.FlushDB(context.Background()).Err(); err != nil {
		t.Fatalf("flush db: %v", err)
	}

	size := 101 // use a prime number may found some hidden bugs ^_^
	retryCount := 3
	var mu sync.Mutex
	deliveryCount := make(map[string]int, size)
	cb := func(s string) bool {
		mu.Lock()
		deliveryCount[s]++
		mu.Unlock()
		return true
	}

	client := NewRedisV8Wrapper(redisCli)
	queue := NewQueue0("test", client, cb).
		WithFetchInterval(time.Millisecond * 50).
		WithMaxConsumeDuration(0).
		WithConcurrent(4)

	for i := 0; i < size; i++ {
		err := queue.SendDelayMsg(strconv.Itoa(i), 0, WithRetryCount(retryCount), WithMsgTTL(time.Hour))
		if err != nil {
			t.Error(err)
		}
	}
	for i := 0; i < 2*size; i++ {
		if err := queue.consume(); err != nil {
			t.Errorf("consume error: %v", err)
			return
		}
	}

	mu.Lock()
	defer mu.Unlock()
	// Without this check the loop below passes vacuously when some (or all)
	// messages never reached the callback.
	if len(deliveryCount) != size {
		t.Errorf("expect %d distinct messages delivered, actual %d", size, len(deliveryCount))
	}
	for k, v := range deliveryCount {
		if v != 1 {
			t.Errorf("expect 1 delivery, actual %d. key: %s", v, k)
		}
	}
}
// TestDelayQueue_StopConsume verifies that calling StopConsume from inside
// the callback stops the consumer after the in-flight work completes.
//
// The callback requests the stop when it has been entered size/5 times,
// sleeps for three seconds, then counts itself as finished. After the
// channel returned by StartConsume is released, exactly size/5 callbacks
// are expected to have finished.
//
// The queue runs with WithConcurrent(10), so the two counters are shared
// between concurrently running callbacks and must be guarded by a mutex.
func TestDelayQueue_StopConsume(t *testing.T) {
	size := 100
	redisCli := redis.NewClient(&redis.Options{
		Addr: "127.0.0.1:6379",
	})
	// Leftover messages would change how many callbacks run, so a failed
	// flush aborts the test.
	if err := redisCli.FlushDB(context.Background()).Err(); err != nil {
		t.Fatalf("flush db: %v", err)
	}

	var (
		queue    *DelayQueue
		mu       sync.Mutex // guards received and dofinish
		received int
		dofinish int
	)
	client := NewRedisV8Wrapper(redisCli)
	queue = NewQueue0("test", client, func(s string) bool {
		mu.Lock()
		received++
		shouldStop := received == size/5
		mu.Unlock()

		// StopConsume is called outside the critical section so the lock is
		// never held across a call into the queue.
		if shouldStop {
			queue.StopConsume()
			t.Log("send stop signal")
		}
		time.Sleep(3 * time.Second)

		mu.Lock()
		dofinish++
		mu.Unlock()
		return true
	}).WithDefaultRetryCount(0).
		WithFetchLimit(10).
		WithConcurrent(10).
		WithMaxConsumeDuration(10 * time.Second)

	for i := 0; i < size; i++ {
		err := queue.SendDelayMsg(strconv.Itoa(i), 0)
		if err != nil {
			t.Errorf("send message failed: %v", err)
		}
	}

	done := queue.StartConsume()
	<-done
	// The original author estimates roughly 30 seconds of wall time here.
	mu.Lock()
	finished := dofinish
	mu.Unlock()
	assert.Equal(t, size/5, finished)
}
// TestDelayQueue_Massive_Backlog enqueues size messages and then calls the
// internal transitions pending2Ready, ready2Unack and unack2Retry once
// each, in that order, against the Redis server at 127.0.0.1:6379.
//
// Afterwards it asserts that the sorted set at q.unAckKey is empty and that
// the list at q.retryKey holds exactly size entries.
func TestDelayQueue_Massive_Backlog(t *testing.T) {
	ctx := context.Background()
	redisCli := redis.NewClient(&redis.Options{
		Addr: "127.0.0.1:6379",
	})
	// The assertions below count keys, so leftovers from another test would
	// make them meaningless; abort if the flush fails.
	if err := redisCli.FlushDB(ctx).Err(); err != nil {
		t.Fatalf("flush db: %v", err)
	}

	size := 20000
	retryCount := 3
	cb := func(s string) bool {
		return false
	}
	client := NewRedisV8Wrapper(redisCli)
	q := NewQueue0("test", client, cb).
		WithFetchInterval(time.Millisecond * 50).
		WithMaxConsumeDuration(0).
		WithFetchLimit(0)

	for i := 0; i < size; i++ {
		err := q.SendDelayMsg(strconv.Itoa(i), 0, WithRetryCount(retryCount))
		if err != nil {
			t.Error(err)
		}
	}

	if err := q.pending2Ready(); err != nil {
		t.Fatal(err)
	}
	// consume; NilErr is tolerated here exactly as in the original check.
	if _, err := q.ready2Unack(); err != nil && err != NilErr {
		t.Fatal(err)
	}
	if err := q.unack2Retry(); err != nil {
		t.Fatal(err)
	}

	unackCard, err := redisCli.ZCard(ctx, q.unAckKey).Result()
	if err != nil {
		t.Fatal(err)
	}
	if unackCard != 0 {
		t.Fatalf("unack card should be 0, actual %d", unackCard)
	}

	retryLen, err := redisCli.LLen(ctx, q.retryKey).Result()
	if err != nil {
		t.Fatal(err)
	}
	if int(retryLen) != size {
		// This checks the retry list, not the unack set.
		t.Fatalf("retry list length should be %d, actual %d", size, retryLen)
	}
}
// TestDelayQueue_garbage_collect seeds three message ids into the set at
// q.garbageKey together with their message keys (from q.genMsgKey), calls
// q.garbageCollect(), and then asserts that both the message keys and the
// set members are gone.
//
// It runs against the Redis server at 127.0.0.1:6379.
func TestDelayQueue_garbage_collect(t *testing.T) {
	ctx := context.TODO()
	redisCli := redis.NewClient(&redis.Options{
		Addr: "127.0.0.1:6379",
	})
	// Start from an empty database so only the keys seeded below exist.
	if err := redisCli.FlushDB(context.Background()).Err(); err != nil {
		t.Fatalf("flush db: %v", err)
	}

	cb := func(s string) bool {
		return false
	}
	client := NewRedisV8Wrapper(redisCli)
	q := NewQueue0("test", client, cb).
		WithFetchInterval(time.Millisecond * 50).
		WithMaxConsumeDuration(0).
		WithFetchLimit(1)

	// SAdd takes ...interface{}, hence the element type.
	msgIds := []interface{}{"1", "2", "3"}
	if err := redisCli.SAdd(ctx, q.garbageKey, msgIds...).Err(); err != nil {
		t.Fatal(err)
	}
	for _, idStr := range msgIds {
		key := q.genMsgKey(idStr.(string))
		if err := redisCli.SetEX(ctx, key, "value", 60*time.Second).Err(); err != nil {
			t.Fatal(err)
		}
	}

	// Precondition: every message key and every garbage-set member exists.
	for _, idStr := range msgIds {
		key := q.genMsgKey(idStr.(string))
		exists, err := redisCli.Exists(ctx, key).Result()
		if err != nil {
			t.Fatal(err)
		}
		if exists != 1 {
			t.Errorf("key: %s should be exists", key)
			continue
		}
		has, err := redisCli.SIsMember(ctx, q.garbageKey, idStr).Result()
		if err != nil {
			t.Fatal(err)
		}
		if !has {
			t.Errorf("idStr: %s should be exists", idStr)
		}
	}

	q.garbageCollect()

	// Postcondition: both the message keys and the set members are removed.
	for _, idStr := range msgIds {
		key := q.genMsgKey(idStr.(string))
		exists, err := redisCli.Exists(ctx, key).Result()
		if err != nil {
			t.Fatal(err)
		}
		if exists == 1 {
			t.Errorf("key: %s should be deleted", key)
			continue
		}
		has, err := redisCli.SIsMember(ctx, q.garbageKey, idStr).Result()
		if err != nil {
			t.Fatal(err)
		}
		if has {
			t.Errorf("idStr: %s should be deleted", idStr)
		}
	}
}
// TestDelayQueue_delay checks that delayed messages are not handed to the
// callback before their delay has elapsed.
//
// Each message carries the Unix second at which it was sent. The handler
// counts every invocation and, separately, every invocation that happens at
// least delaySeconds after the embedded send time. The test passes only
// when both counters equal the number of messages sent. The handler stops
// the consumer once it has been invoked for the last message.
//
// It runs against the Redis server at 127.0.0.1:6379 and sleeps 100ms
// between sends.
func TestDelayQueue_delay(t *testing.T) {
	const (
		total        = 100
		delaySeconds = 6
	)

	rdb := redis.NewClient(&redis.Options{Addr: "127.0.0.1:6379"})
	rdb.FlushDB(context.Background())

	var (
		q       *DelayQueue
		handled int // callback invocations
		onTime  int // invocations that respected the delay
	)
	handler := func(payload string) bool {
		handled++
		if handled == total {
			q.StopConsume()
		}
		sentAt, _ := strconv.Atoi(payload)
		elapsed := time.Now().Unix() - int64(sentAt)
		if elapsed >= int64(delaySeconds) {
			onTime++
		}
		return true
	}

	wrapper := NewRedisV8Wrapper(rdb)
	q = NewQueue0("test", wrapper, handler).
		WithFetchInterval(time.Second).
		WithMaxConsumeDuration(0).
		WithFetchLimit(13)

	for n := 0; n < total; n++ {
		payload := strconv.Itoa(int(time.Now().Unix()))
		if err := q.SendDelayMsg(payload, time.Duration(delaySeconds)*time.Second); err != nil {
			t.Errorf("send message failed: %v", err)
		}
		time.Sleep(100 * time.Millisecond)
	}

	// Block until the consumer signals completion.
	<-q.StartConsume()

	if onTime != handled || onTime != total {
		t.Errorf("delay invalid")
	}
}
// TestDelayQueue_BatchSend enqueues size messages through
// SendDelayMsgBatch against a miniredis instance, drives consume()
// directly, and verifies per-message delivery counts.
//
// The callback acknowledges even payloads (returns true) and rejects odd
// ones (returns false). Even payloads are therefore expected exactly once,
// odd payloads retryCount+1 times (first attempt plus every retry).
func TestDelayQueue_BatchSend(t *testing.T) {
	minired, err := miniredis.Run()
	if err != nil {
		// Fail through the testing framework instead of panicking so the
		// failure is attributed to this test.
		t.Fatalf("start miniredis: %v", err)
	}
	defer minired.Close()
	redisCli := redis.NewUniversalClient(&redis.UniversalOptions{
		Addrs: []string{minired.Addr()},
	})

	size := 1000
	retryCount := 3
	deliveryCount := make(map[string]int, size)
	cb := func(s string) bool {
		deliveryCount[s]++
		// Payloads are produced by strconv.Itoa below, so parsing cannot fail.
		i, _ := strconv.ParseInt(s, 10, 64)
		return i%2 == 0
	}

	client := NewRedisV8Wrapper(redisCli)
	queue := NewQueue0("test", client, UseHashTagKey()).
		WithCallback(cb).
		WithFetchInterval(time.Millisecond * 50).
		WithMaxConsumeDuration(0).
		WithFetchLimit(2)

	msgs := make([]string, 0, size)
	for i := 0; i < size; i++ {
		msgs = append(msgs, strconv.Itoa(i))
	}
	err = queue.SendDelayMsgBatch(msgs, 0, 100, WithRetryCount(retryCount), WithMsgTTL(time.Hour))
	if err != nil {
		// Nothing meaningful can be verified if the batch was not sent.
		t.Fatal(err)
	}

	for i := 0; i < 10*size; i++ {
		if err := queue.consume(); err != nil {
			t.Errorf("consume error: %v", err)
			return
		}
	}

	// Without this check the loop below passes vacuously when the callback
	// was never invoked (or only invoked for some of the messages).
	if len(deliveryCount) != size {
		t.Errorf("expect %d distinct messages delivered, actual %d", size, len(deliveryCount))
	}
	for k, v := range deliveryCount {
		i, _ := strconv.ParseInt(k, 10, 64)
		want := retryCount + 1
		if i%2 == 0 {
			want = 1
		}
		if v != want {
			t.Errorf("expect %d delivery, actual %d. key: %s", want, v, k)
		}
	}
}
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
1
https://gitee.com/k8sio/cnkit.git
git@gitee.com:k8sio/cnkit.git
k8sio
cnkit
cnkit
master

搜索帮助