代码拉取完成,页面将自动刷新
package controllers
import (
"context"
"fmt"
"runtime"
"sync"
queue "gitee.com/linqwen/momo/app/srv/queue/service"
"gitee.com/linqwen/momo/igin"
"gitee.com/linqwen/momo/utils"
)
func QueueRoutes(r igin.IRouterGroup) {
router := r.Group("queue")
{
router.GET("/", List)
router.GET("/consumers/", getConsumerStatusAll)
router.POST("/consumer/:name/start/", setConsumerStart)
router.POST("/consumer/:name/stop", setConsumerStop)
router.GET("/consumer/:name/", getConsumerStatusByName)
router.POST("/publisher/:name", setPublisher)
}
}
var (
consumerCtx context.Context
cancelConsumer context.CancelFunc
consumerLock sync.Mutex
)
func setConsumerStart(c igin.IContext) {
consumerLock.Lock()
defer consumerLock.Unlock()
name := c.Param("name")
q := queue.GetQueueByName(name)
if q == nil {
utils.ResponseApiError(c, 400, "error", fmt.Sprintf("%s :queue is not exist", name))
}
if q.GetConsumerRunning() {
utils.ResponseApiError(c, 400, "error", fmt.Sprintf("Consumer: %s is already running", name))
return
}
consumerCtx, cancelConsumer = context.WithCancel(context.Background())
// 启动3个消费者goroutine并发执行
// for i := 0; i < 3; i++ {
// go q.RunConsume(consumerCtx, queue.CreateData)
// }
q.SetConsumerRunning(true)
q.SetConsumerCount(3)
utils.ResponseApiOk(c, "OK", q.GetStatus())
}
func setConsumerStop(c igin.IContext) {
name := c.Param("name")
q := queue.GetQueueByName(name)
// 尝试将接口类型转换为具体的 MemoryQueue 类型
memoryQueue, ok := q.(*queue.MemoryQueue[any])
if !ok {
// 如果转换失败,说明队列不是 MemoryQueue 类型,返回错误
utils.ResponseApiError(c, 400, "error", fmt.Sprintf("%s :queue is not exist", name))
return
}
consumerLock.Lock()
defer consumerLock.Unlock()
if !memoryQueue.ConsumerRunning {
utils.ResponseApiError(c, 400, "error", fmt.Sprintf("%s :Consumer is not running", name))
return
}
cancelConsumer()
memoryQueue.SetConsumerRunning(false)
utils.ResponseApiOk(c, "OK", fmt.Sprintf("Consumer:%s Running is : %v", name, q.GetConsumerRunning()))
}
func getConsumerStatusByName(c igin.IContext) {
name := c.Param("name")
q := queue.GetQueueByName(name)
if q == nil {
utils.ResponseApiError(c, 400, "error", fmt.Sprintf("%s :queue is not exist", name))
}
// 获取当前 goroutine 数量
numGoroutines := runtime.NumGoroutine()
fmt.Printf("当前活跃的 goroutine 数量:%d\n", numGoroutines)
utils.ResponseApiOk(c, "OK", q.GetStatus())
}
func getConsumerStatusAll(c igin.IContext) {
q := queue.GetQueues()
utils.ResponseApiOk(c, "OK", q)
}
func setPublisher(c igin.IContext) {
name := c.Param("name")
q := queue.GetQueueByName(name)
if q == nil {
utils.ResponseApiError(c, 400, "error", fmt.Sprintf("%s :queue is not exist", name))
}
ctx := context.Background()
for i := 0; i < 10; i++ {
if err := q.Enqueue(ctx, i); err != nil {
utils.ResponseApiError(c, 400, "error", err)
break
}
}
utils.ResponseApiOk(c, "OK", q.GetStatus())
}
// ListCache 列出所有缓存
func List(c igin.IContext) {
q := queue.GetQueues()
var records []interface{}
for _, item := range q {
records = append(records, item)
}
result := utils.PaginationResult{
Current: 1,
Size: 100,
Total: len(q),
Pages: len(q) / 2,
Records: records,
}
utils.ResponseApiOk(c, "ok", result)
}
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。