代码拉取完成,页面将自动刷新
package main
import (
"context"
"fmt"
"gitee.com/liujinsuo/redis_queue"
"gitee.com/liujinsuo/tool/msg_group"
"github.com/go-redis/redis/v8"
"log"
"os"
"os/signal"
"reflect"
"strconv"
"syscall"
"time"
)
var NewMsgGroup = &msg_group.Graceful{}
var rdb = &redis.Client{}
func testType() {
{
type T2 struct {
}
var a T2
t := reflect.TypeOf(a)
fmt.Printf("string=%v kind=%v name=%v\n", t.String(), t.Kind(), t.Name())
}
{
type T3 struct {
}
var a T3
t := reflect.TypeOf(a)
fmt.Printf("string=%v kind=%v name=%v\n", t.String(), t.Kind(), t.Name())
}
{
type T3 int
var a T3
t := reflect.TypeOf(a)
fmt.Printf("string=%v kind=%v name=%v\n", t.String(), t.Kind(), t.Name())
}
{
type T3 int
var a T3
t := reflect.TypeOf(a)
fmt.Printf("string=%v kind=%v name=%v\n", t.String(), t.Kind(), t.Name())
}
{
type T4 msg_group.Graceful
var a T4
t := reflect.TypeOf(a)
fmt.Printf("string=%v kind=%v name=%v\n", t.String(), t.Kind(), t.Name())
}
}
func main() {
//testType()
//os.Exit(1)
log.SetFlags(log.LstdFlags | log.Lshortfile)
rdb = redis.NewClient(&redis.Options{
Addr: "127.0.0.1:6379",
Password: "", // no password set
DB: 3, // use default DB
})
if err := rdb.Ping(context.Background()).Err(); err != nil {
panic(err)
}
NewMsgGroup = msg_group.NewGraceful()
options := msg_group.Options{
Rdb: rdb,
Graceful: NewMsgGroup,
Namespace: "tool_test", //
Expire: time.Second * 100, //队列超时时间,超时队列自动删除,消费者自动退出
QueueMaxLen: 1000, //队列最大元素个数,超出自动删除旧的数据
GroupStartID: redis_queue.GroupStartIDAll, //消费起始位置 只消费新数据,还是消费所有数据
ClaimMinIdle: time.Second * 10, //认领 待处理数据 的最小空闲时长,业务不同值也不同,主要看对消息的处理速度
ClaimCount: 100, //每次认领的消息个数,主要是为了提高性能
ConsumerCount: 100, //每次消费的数据个数,消费数据多可减少与redis的交互,节省流量
ConsumerBlock: time.Second * 2, //BLOCK为关键字,表示设置XREAD为阻塞模式,默认是非阻塞模式,milliseconds表示具体阻塞的时间。同XREAD命令。
ErrLogFunc: func(err error) {
log.Printf("错误 %+v", err)
},
InfoLogFunc: func(msg string) {
log.Println(msg)
},
}
mg := msg_group.NewMsgGroup[string]("bet", options, com)
if err := mg.Init(); err != nil {
log.Printf("初始化消息队列失败")
}
mg2 := msg_group.NewMsgGroup[string]("bet1", options, com)
if err := mg.Init(); err != nil {
log.Printf("初始化消息队列失败")
}
fmt.Println(mg2)
for i := 0; i < 2; i++ {
go func(i int) {
for j := 0; j < 100; j++ {
log.Printf("写数据user_id=%v data=%v\n", i, j)
fn1(strconv.Itoa(i), strconv.Itoa(j), mg)
//time.Sleep(time.Second)
}
}(i)
}
log.Printf("启动成功")
quit := make(chan os.Signal)
signal.Notify(quit, syscall.SIGHUP, syscall.SIGINT, syscall.SIGQUIT, syscall.SIGTERM)
sign := <-quit //阻塞等待结束信号
log.Printf("服务器关闭1 %v", sign)
NewMsgGroup.Stop()
log.Printf("服务器关闭2 %v", sign)
NewMsgGroup.Wait()
}
func fn1(UserID string, data string, m *msg_group.MsgGroup[string]) {
QueueName := fmt.Sprintf("queue_%v", UserID)
if err := m.Publish(QueueName, data); err != nil {
log.Printf("添加数据失败 queueName=%v err=", err)
}
log.Printf("添加数据成功 queueName=%v", QueueName)
}
func com(dataList []string) error {
time.Sleep(time.Second * 1) //数据处理时长5秒
log.Printf("处理数据完成 data=%+v\n", dataList)
return nil
}
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。