2 Star 0 Fork 0

hero/momo

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
克隆/下载
queue_controller.go 3.30 KB
一键复制 编辑 原始数据 按行查看 历史
hero 提交于 2024-11-04 16:53 . add:igin
package controllers
import (
"context"
"fmt"
"runtime"
"sync"
queue "gitee.com/linqwen/momo/app/srv/queue/service"
"gitee.com/linqwen/momo/igin"
"gitee.com/linqwen/momo/utils"
)
// QueueRoutes registers the queue management endpoints on the "queue"
// sub-group of r.
func QueueRoutes(r igin.IRouterGroup) {
	g := r.Group("queue")

	// Listing endpoints.
	g.GET("/", List)
	g.GET("/consumers/", getConsumerStatusAll)

	// Per-consumer control and status endpoints.
	g.POST("/consumer/:name/start/", setConsumerStart)
	g.POST("/consumer/:name/stop", setConsumerStop)
	g.GET("/consumer/:name/", getConsumerStatusByName)

	// Publishing endpoint.
	g.POST("/publisher/:name", setPublisher)
}
// Consumer state used by setConsumerStart and setConsumerStop.
var (
	// consumerLock serializes the start and stop handlers.
	consumerLock sync.Mutex
	// consumerCtx is the context created by the most recent start request.
	consumerCtx context.Context
	// cancelConsumer cancels consumerCtx; it is nil until a start request
	// has assigned it.
	cancelConsumer context.CancelFunc
)
// setConsumerStart handles the "start consumer" request for the queue named
// by the ":name" path parameter.
//
// It responds with a 400 error when the queue does not exist or when its
// consumer is already flagged as running. Otherwise it creates a fresh
// cancellable context, marks the consumer as running with a count of 3, and
// responds with the queue status.
func setConsumerStart(c igin.IContext) {
	consumerLock.Lock()
	defer consumerLock.Unlock()

	name := c.Param("name")
	q := queue.GetQueueByName(name)
	if q == nil {
		utils.ResponseApiError(c, 400, "error", fmt.Sprintf("%s :queue is not exist", name))
		// Stop here: q is nil and every call below would dereference it.
		return
	}
	if q.GetConsumerRunning() {
		utils.ResponseApiError(c, 400, "error", fmt.Sprintf("Consumer: %s is already running", name))
		return
	}

	// NOTE(review): consumerCtx/cancelConsumer are package-level and shared
	// by all queues, so starting a second queue replaces the cancel func of
	// the first. Confirm whether a per-queue context is intended.
	consumerCtx, cancelConsumer = context.WithCancel(context.Background())

	// Start 3 consumer goroutines concurrently (currently disabled).
	// for i := 0; i < 3; i++ {
	// 	go q.RunConsume(consumerCtx, queue.CreateData)
	// }
	q.SetConsumerRunning(true)
	q.SetConsumerCount(3)
	utils.ResponseApiOk(c, "OK", q.GetStatus())
}
// setConsumerStop handles the "stop consumer" request for the queue named by
// the ":name" path parameter.
//
// It responds with a 400 error when the queue is missing or is not a
// *queue.MemoryQueue[any], or when its consumer is not flagged as running.
// Otherwise it cancels the shared consumer context, clears the running flag,
// and reports the resulting running state.
func setConsumerStop(c igin.IContext) {
	name := c.Param("name")
	q := queue.GetQueueByName(name)

	// Try to convert the interface value to the concrete MemoryQueue type.
	// A nil q also fails this assertion, so no separate nil check is needed.
	memoryQueue, ok := q.(*queue.MemoryQueue[any])
	if !ok {
		// The queue is missing or is not a MemoryQueue; report an error.
		utils.ResponseApiError(c, 400, "error", fmt.Sprintf("%s :queue is not exist", name))
		return
	}

	consumerLock.Lock()
	defer consumerLock.Unlock()

	if !memoryQueue.ConsumerRunning {
		utils.ResponseApiError(c, 400, "error", fmt.Sprintf("%s :Consumer is not running", name))
		return
	}

	// cancelConsumer is only assigned by setConsumerStart. Guard against a
	// queue whose running flag was set without going through that handler,
	// because calling a nil func value panics.
	if cancelConsumer != nil {
		cancelConsumer()
	}
	memoryQueue.SetConsumerRunning(false)
	utils.ResponseApiOk(c, "OK", fmt.Sprintf("Consumer:%s Running is : %v", name, q.GetConsumerRunning()))
}
// getConsumerStatusByName responds with the status of the queue named by the
// ":name" path parameter, or with a 400 error when no such queue exists.
//
// As a side effect it prints the current goroutine count to stdout.
func getConsumerStatusByName(c igin.IContext) {
	name := c.Param("name")
	q := queue.GetQueueByName(name)
	if q == nil {
		utils.ResponseApiError(c, 400, "error", fmt.Sprintf("%s :queue is not exist", name))
		// Stop here: q is nil and q.GetStatus() below would dereference it.
		return
	}

	// Report the current number of goroutines.
	numGoroutines := runtime.NumGoroutine()
	fmt.Printf("当前活跃的 goroutine 数量:%d\n", numGoroutines)

	utils.ResponseApiOk(c, "OK", q.GetStatus())
}
// getConsumerStatusAll responds with everything returned by queue.GetQueues.
func getConsumerStatusAll(c igin.IContext) {
	utils.ResponseApiOk(c, "OK", queue.GetQueues())
}
// setPublisher enqueues the integers 0 through 9 onto the queue named by the
// ":name" path parameter and responds with the queue status.
//
// It responds with a 400 error, and nothing else, when the queue does not
// exist or when an Enqueue call fails; items enqueued before the failure are
// left in the queue.
func setPublisher(c igin.IContext) {
	name := c.Param("name")
	q := queue.GetQueueByName(name)
	if q == nil {
		utils.ResponseApiError(c, 400, "error", fmt.Sprintf("%s :queue is not exist", name))
		// Stop here: q is nil and the calls below would dereference it.
		return
	}

	ctx := context.Background()
	for i := 0; i < 10; i++ {
		if err := q.Enqueue(ctx, i); err != nil {
			utils.ResponseApiError(c, 400, "error", err)
			// Return rather than break so the OK response below is not
			// written on top of the error response.
			return
		}
	}
	utils.ResponseApiOk(c, "OK", q.GetStatus())
}
// List responds with every queue returned by queue.GetQueues, wrapped in a
// single-page utils.PaginationResult.
//
// All queues are returned in one response; Pages is derived from Total and
// Size so the pagination metadata is internally consistent.
func List(c igin.IContext) {
	const pageSize = 100

	q := queue.GetQueues()
	total := len(q)

	// Pre-size the slice so an empty result is an empty slice rather than nil.
	records := make([]any, 0, total)
	for _, item := range q {
		records = append(records, item)
	}

	result := utils.PaginationResult{
		Current: 1,
		Size:    pageSize,
		Total:   total,
		// Ceiling division: the number of pages needed for total items.
		Pages:   (total + pageSize - 1) / pageSize,
		Records: records,
	}
	utils.ResponseApiOk(c, "ok", result)
}
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
Go
1
https://gitee.com/linqwen/momo.git
git@gitee.com:linqwen/momo.git
linqwen
momo
momo
v1.1.14

搜索帮助