1 Star 0 Fork 0

瑞哥/util

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
QueueInt64.go 5.75 KB
一键复制 编辑 原始数据 按行查看 历史
瑞哥 提交于 2023-03-22 00:54 . 还原成 0.0.38版本
package rdb
import (
"context"
"fmt"
"github.com/redis/go-redis/v9"
"strconv"
"sync"
)
type QueueInt64 interface {
TopicKey() string
RedisClient() *redis.Client
SetQueueInt64MaxLength(max uint64)
LPush(value int64) error
RPush(value int64) error
LPushList(list []int64) error
RPushList(list []int64) error
LPop() (int64, error)
RPop() (int64, error)
LPopList(count int) ([]int64, error)
RPopList(count int) ([]int64, error)
Count() (int64, error)
GetAll() ([]int64, error)
ClearAll() error
}
type _queueInt64 struct {
Topic string //主题
Redis *redis.Client //Redis客户端
MaxLength uint64 //队列最大容量
RWL sync.RWMutex //读写锁
}
func NewQueueInt64(client *redis.Client, topic string) QueueInt64 {
return &_queueInt64{
Topic: topic,
Redis: client,
RWL: sync.RWMutex{},
}
}
// TopicKey 返回默认生成的主题key
func (q *_queueInt64) TopicKey() string {
return fmt.Sprint("QueueInt64_", q.Topic)
}
// RedisClient 返回绑定的Redis客户端
func (q *_queueInt64) RedisClient() *redis.Client {
return q.Redis
}
// SetQueueInt64MaxLength 队列最大容量长度,0则无限长。
// 超出这个长度的时候,插入数据的同时,会往返方向移出,直到剩余长度等于最大容量长度。
func (q *_queueInt64) SetQueueInt64MaxLength(max uint64) {
q.RWL.Lock()
q.MaxLength = max
q.RWL.Unlock()
}
// LPush 左边插入 1 个
func (q *_queueInt64) LPush(value int64) error {
q.RWL.Lock()
err := q.Redis.LPush(context.Background(), q.TopicKey(), value).Err()
if q.MaxLength > 0 && err == nil {
result, _ := q.Redis.LLen(context.Background(), q.TopicKey()).Result()
if uint64(result)-q.MaxLength > 0 {
for i := 0; i < int(uint64(result)-q.MaxLength); i++ {
q.Redis.RPop(context.Background(), q.TopicKey())
}
}
}
q.RWL.Unlock()
return err
}
// RPush 右边插入 1 个
func (q *_queueInt64) RPush(value int64) error {
q.RWL.Lock()
err := q.Redis.RPush(context.Background(), q.TopicKey(), value).Err()
if q.MaxLength > 0 && err == nil {
result, _ := q.Redis.LLen(context.Background(), q.TopicKey()).Result()
if uint64(result)-q.MaxLength > 0 {
for i := 0; i < int(uint64(result)-q.MaxLength); i++ {
q.Redis.LPop(context.Background(), q.TopicKey())
}
}
}
q.RWL.Unlock()
return err
}
// LPushList 左边插入 N 个
func (q *_queueInt64) LPushList(list []int64) error {
q.RWL.Lock()
var temp []interface{}
for _, d := range list {
temp = append(temp, d)
}
err := q.Redis.LPush(context.Background(), q.TopicKey(), temp...).Err()
if q.MaxLength > 0 && err == nil {
result, _ := q.Redis.LLen(context.Background(), q.TopicKey()).Result()
if uint64(result)+uint64(len(list))-q.MaxLength > 0 {
for i := 0; i < int(uint64(result)+uint64(len(list))-q.MaxLength); i++ {
q.Redis.RPop(context.Background(), q.TopicKey())
}
}
}
q.RWL.Unlock()
return err
}
// RPushList 右边插入 N 个
func (q *_queueInt64) RPushList(list []int64) error {
q.RWL.Lock()
var temp []interface{}
for _, d := range list {
temp = append(temp, d)
}
err := q.Redis.RPush(context.Background(), q.TopicKey(), temp...).Err()
if q.MaxLength > 0 && err == nil {
result, _ := q.Redis.LLen(context.Background(), q.TopicKey()).Result()
if uint64(result)+uint64(len(list))-q.MaxLength > 0 {
for i := 0; i < int(uint64(result)+uint64(len(list))-q.MaxLength); i++ {
q.Redis.LPop(context.Background(), q.TopicKey())
}
}
}
q.RWL.Unlock()
return err
}
// LPop 左边移出 1 个
func (q *_queueInt64) LPop() (int64, error) {
q.RWL.Lock()
result, err := q.Redis.LPop(context.Background(), q.TopicKey()).Result()
q.RWL.Unlock()
val, _ := strconv.ParseInt(result, 10, 64)
return val, err
}
// RPop 右边移出 1 个
func (q *_queueInt64) RPop() (int64, error) {
q.RWL.Lock()
result, err := q.Redis.RPop(context.Background(), q.TopicKey()).Result()
q.RWL.Unlock()
val, _ := strconv.ParseInt(result, 10, 64)
return val, err
}
// LPopList 左边移出 N 个
func (q *_queueInt64) LPopList(count int) ([]int64, error) {
q.RWL.Lock()
var result []int64
var j = 0
for i := 0; i < count; i++ {
j++
temp, err := q.Redis.LPop(context.Background(), q.TopicKey()).Result()
if err != nil || j > count*2 {
if err == redis.Nil || j > count*2 {
break
}
i--
continue
}
val, _ := strconv.ParseInt(temp, 10, 64)
result = append(result, val)
}
q.RWL.Unlock()
if len(result) <= 0 {
return result, fmt.Errorf("empty")
}
return result, nil
}
// RPopList 右边移出 N 个
func (q *_queueInt64) RPopList(count int) ([]int64, error) {
q.RWL.Lock()
var result []int64
var j = 0
for i := 0; i < count; i++ {
j++
temp, err := q.Redis.RPop(context.Background(), q.TopicKey()).Result()
if err != nil || j > count*2 {
if err == redis.Nil || j > count*2 {
break
}
i--
continue
}
val, _ := strconv.ParseInt(temp, 10, 64)
result = append(result, val)
}
q.RWL.Unlock()
if len(result) <= 0 {
return result, fmt.Errorf("empty")
}
return result, nil
}
// Count 获取队列当前长度
func (q *_queueInt64) Count() (int64, error) {
q.RWL.RLock()
defer q.RWL.RUnlock()
result, err := q.Redis.LLen(context.Background(), q.TopicKey()).Result()
if err == nil || err == redis.Nil {
return result, nil
}
return result, err
}
// GetAll 获取队列中所有的记录
func (q *_queueInt64) GetAll() ([]int64, error) {
q.RWL.RLock()
result, err := q.Redis.LRange(context.Background(), q.TopicKey(), 0, -1).Result()
q.RWL.RUnlock()
var temp []int64
for _, d := range result {
val, _ := strconv.ParseInt(d, 10, 64)
temp = append(temp, val)
}
return temp, err
}
// ClearAll 清空所有的记录(直接删除key)
func (q *_queueInt64) ClearAll() error {
q.RWL.Lock()
err := q.Redis.Del(context.Background(), q.TopicKey()).Err()
q.RWL.Unlock()
return err
}
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
Go
1
https://gitee.com/ruige_fun/util.git
git@gitee.com:ruige_fun/util.git
ruige_fun
util
util
v0.1.4

搜索帮助

23e8dbc6 1850385 7e0993f3 1850385