代码拉取完成,页面将自动刷新
package service
import (
"context"
"fmt"
"runtime"
"sync"
"time"
)
type IQueue[T any] interface {
// SetStatus 设置队列运行状态
SetConsumerRunning(status bool) bool
// Status 返回队列全部状态
GetStatus() interface{}
// 根据key查询对应的值
GetConsumerRunning() bool
// SetStatus 设置队列运行状态
SetConsumerCount(count int) int
// Enqueue 将项目放入队列
Enqueue(ctx context.Context, item T) error
// Dequeue 从队列中取出项目
Dequeue(ctx context.Context) (T, error)
// 启动消费者
RunConsume(ctx context.Context, message_processor func(T))
// Close 关闭队列
Close()
// WaitUntilEmpty 等待队列为空
WaitUntilEmpty()
}
type MemoryQueue[T any] struct {
Name string // 队列的名称
Capacity int
ConsumerRunning bool
ConsumerCount int
ConsumeSpeed int
queue chan T
wg sync.WaitGroup
}
func NewMemoryQueue[T any](name string, capacity int, consumeSpeed int) *MemoryQueue[T] {
return &MemoryQueue[T]{
Name: name,
Capacity: capacity,
ConsumerRunning: false,
ConsumerCount: 0,
ConsumeSpeed: consumeSpeed,
queue: make(chan T, capacity),
}
}
func (q *MemoryQueue[T]) SetConsumerRunning(status bool) bool {
q.ConsumerRunning = status
return q.ConsumerRunning
}
func (q *MemoryQueue[T]) GetStatus() interface{} {
// 获取当前 goroutine 数量
numGoroutines := runtime.NumGoroutine()
return map[string]interface{}{
"name": q.Name,
"size": len(q.queue),
"capacity": q.Capacity,
"consumerRunning": q.ConsumerRunning,
"consumerCount": q.ConsumerCount,
"consumeSpeed": q.ConsumeSpeed,
"goroutines": numGoroutines,
}
}
func (q *MemoryQueue[T]) GetConsumerRunning() bool {
return q.ConsumerRunning
}
func (q *MemoryQueue[T]) SetConsumerCount(count int) int {
q.ConsumerCount = count
return q.ConsumerCount
}
func (q *MemoryQueue[T]) GetConsumerCount() int {
return q.ConsumerCount
}
func (q *MemoryQueue[T]) Enqueue(ctx context.Context, item T) error {
select {
case <-ctx.Done():
return ctx.Err()
case q.queue <- item:
q.wg.Add(1)
return nil
}
}
// Dequeue 从队列中取出一个项目
func (q *MemoryQueue[T]) Dequeue(ctx context.Context) (T, error) {
select {
case <-ctx.Done():
// 如果上下文已被取消,返回空值和上下文错误
return *new(T), ctx.Err()
case item := <-q.queue:
q.wg.Done()
return item, nil
}
}
func (q *MemoryQueue[T]) Close() {
close(q.queue)
}
func (q *MemoryQueue[T]) WaitUntilEmpty() {
q.wg.Wait()
}
func (q *MemoryQueue[T]) RunConsume(ctx context.Context, message_processor func(T)) {
fmt.Println("Consumption started.")
for {
select {
case <-ctx.Done():
fmt.Println("Consumption stopped.")
return
default:
// 检查队列是否为空
if len(q.queue) == 0 {
// 如果队列为空,稍微延迟一段时间后继续检查
time.Sleep(time.Millisecond * 100)
continue
}
// 启动一个 goroutine 来异步执行 message_processor 操作
item, err := q.Dequeue(ctx)
fmt.Printf("我已从队列里移除item: %+v\n", item)
if err != nil {
fmt.Printf("Error consume data: %v\n", err)
return
}
go func() {
message_processor(item)
}()
// 控制消费速率,等待一段时间后再进行下一次消费
time.Sleep(time.Second / time.Duration(q.ConsumeSpeed))
}
}
}
// 声明并初始化全局的 IntQueue 变量
var IntQueue = NewMemoryQueue[any]("intQueue", 100, 1)
var StringQueue = NewMemoryQueue[any]("stringQueue", 100, 1)
var GeniricQueue = NewMemoryQueue[any]("geniricQueue", 100, 1)
func GetQueueByName(name string) IQueue[any] {
switch name {
case IntQueue.Name:
return IntQueue
case StringQueue.Name:
return StringQueue
case GeniricQueue.Name:
return GeniricQueue
default:
// 如果没有找到对应名称的队列,返回nil
return nil
}
}
func GetQueues() []IQueue[any] {
var queues []IQueue[any]
queues = append(queues, IntQueue, StringQueue, GeniricQueue)
return queues
}
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。