2 Star 0 Fork 0

hero/momo

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
克隆/下载
queue.go 3.99 KB
一键复制 编辑 原始数据 按行查看 历史
hero 提交于 2024-11-04 11:45 . upd:目录结构调整
package service
import (
"context"
"fmt"
"runtime"
"sync"
"time"
)
type IQueue[T any] interface {
// SetStatus 设置队列运行状态
SetConsumerRunning(status bool) bool
// Status 返回队列全部状态
GetStatus() interface{}
// 根据key查询对应的值
GetConsumerRunning() bool
// SetStatus 设置队列运行状态
SetConsumerCount(count int) int
// Enqueue 将项目放入队列
Enqueue(ctx context.Context, item T) error
// Dequeue 从队列中取出项目
Dequeue(ctx context.Context) (T, error)
// 启动消费者
RunConsume(ctx context.Context, message_processor func(T))
// Close 关闭队列
Close()
// WaitUntilEmpty 等待队列为空
WaitUntilEmpty()
}
type MemoryQueue[T any] struct {
Name string // 队列的名称
Capacity int
ConsumerRunning bool
ConsumerCount int
ConsumeSpeed int
queue chan T
wg sync.WaitGroup
}
func NewMemoryQueue[T any](name string, capacity int, consumeSpeed int) *MemoryQueue[T] {
return &MemoryQueue[T]{
Name: name,
Capacity: capacity,
ConsumerRunning: false,
ConsumerCount: 0,
ConsumeSpeed: consumeSpeed,
queue: make(chan T, capacity),
}
}
func (q *MemoryQueue[T]) SetConsumerRunning(status bool) bool {
q.ConsumerRunning = status
return q.ConsumerRunning
}
func (q *MemoryQueue[T]) GetStatus() interface{} {
// 获取当前 goroutine 数量
numGoroutines := runtime.NumGoroutine()
return map[string]interface{}{
"name": q.Name,
"size": len(q.queue),
"capacity": q.Capacity,
"consumerRunning": q.ConsumerRunning,
"consumerCount": q.ConsumerCount,
"consumeSpeed": q.ConsumeSpeed,
"goroutines": numGoroutines,
}
}
func (q *MemoryQueue[T]) GetConsumerRunning() bool {
return q.ConsumerRunning
}
func (q *MemoryQueue[T]) SetConsumerCount(count int) int {
q.ConsumerCount = count
return q.ConsumerCount
}
func (q *MemoryQueue[T]) GetConsumerCount() int {
return q.ConsumerCount
}
func (q *MemoryQueue[T]) Enqueue(ctx context.Context, item T) error {
select {
case <-ctx.Done():
return ctx.Err()
case q.queue <- item:
q.wg.Add(1)
return nil
}
}
// Dequeue 从队列中取出一个项目
func (q *MemoryQueue[T]) Dequeue(ctx context.Context) (T, error) {
select {
case <-ctx.Done():
// 如果上下文已被取消,返回空值和上下文错误
return *new(T), ctx.Err()
case item := <-q.queue:
q.wg.Done()
return item, nil
}
}
func (q *MemoryQueue[T]) Close() {
close(q.queue)
}
func (q *MemoryQueue[T]) WaitUntilEmpty() {
q.wg.Wait()
}
func (q *MemoryQueue[T]) RunConsume(ctx context.Context, message_processor func(T)) {
fmt.Println("Consumption started.")
for {
select {
case <-ctx.Done():
fmt.Println("Consumption stopped.")
return
default:
// 检查队列是否为空
if len(q.queue) == 0 {
// 如果队列为空,稍微延迟一段时间后继续检查
time.Sleep(time.Millisecond * 100)
continue
}
// 启动一个 goroutine 来异步执行 message_processor 操作
item, err := q.Dequeue(ctx)
fmt.Printf("我已从队列里移除item: %+v\n", item)
if err != nil {
fmt.Printf("Error consume data: %v\n", err)
return
}
go func() {
message_processor(item)
}()
// 控制消费速率,等待一段时间后再进行下一次消费
time.Sleep(time.Second / time.Duration(q.ConsumeSpeed))
}
}
}
// 声明并初始化全局的 IntQueue 变量
var IntQueue = NewMemoryQueue[any]("intQueue", 100, 1)
var StringQueue = NewMemoryQueue[any]("stringQueue", 100, 1)
var GeniricQueue = NewMemoryQueue[any]("geniricQueue", 100, 1)
func GetQueueByName(name string) IQueue[any] {
switch name {
case IntQueue.Name:
return IntQueue
case StringQueue.Name:
return StringQueue
case GeniricQueue.Name:
return GeniricQueue
default:
// 如果没有找到对应名称的队列,返回nil
return nil
}
}
func GetQueues() []IQueue[any] {
var queues []IQueue[any]
queues = append(queues, IntQueue, StringQueue, GeniricQueue)
return queues
}
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
Go
1
https://gitee.com/linqwen/momo.git
git@gitee.com:linqwen/momo.git
linqwen
momo
momo
v1.1.21

搜索帮助

0d507c66 1850385 C8b1a773 1850385