代码拉取完成,页面将自动刷新
package mtask
import (
"gitee.com/dennis-mxx/mxx-core-v2/mlogger"
"os"
"os/signal"
"sync"
"syscall"
"time"
)
// Worker is a unit of work that can be handed to a WorkerChain through
// SubmitWorker.
type Worker interface {
// Run performs the work. chainId is the index of the worker goroutine that
// picked the task up.
Run(chainId int)
}
// WorkerChain is a fixed-size pool of worker goroutines that receive tasks
// from a shared channel and execute them.
//
// NOTE(review): stat and chainStat are read and written by several goroutines
// without any synchronization in the code visible here — confirm whether that
// is acceptable for the callers.
type WorkerChain struct {
// num is the number of worker goroutines launched by Start.
num int
// chain carries submitted tasks to the workers; NewWorker creates it
// unbuffered.
chain chan func(chainId int)
// chainStat has one slot per worker: 1 while that worker is executing a
// task, 0 otherwise.
chainStat []int
// wg is initialized by NewWorker but is not otherwise used in the code
// visible here.
wg sync.WaitGroup
// stat is the running flag: Start sets it to true, Stop sets it to false, and
// each worker leaves its loop once it observes false.
stat bool
// catch is called with the value recovered from a task that panicked.
catch func(err any)
}
// NewWorker builds a WorkerChain with num worker goroutines and starts it
// before returning it to the caller.
//
// catch is called with the recovered value whenever a task panics. When catch
// is nil, a default handler that logs the value through mlogger is installed
// instead. The task channel is unbuffered.
func NewWorker(num int, catch func(err any)) *WorkerChain {
	handler := catch
	if handler == nil {
		handler = func(err any) {
			mlogger.Out.Error("WorkerChain DoRunner fail >>> ", err)
		}
	}

	// The remaining fields (wg, stat) start from their zero values; Start
	// flips stat to true below.
	pool := &WorkerChain{
		num:       num,
		chainStat: make([]int, num),
		chain:     make(chan func(chainId int)),
		catch:     handler,
	}
	pool.Start()
	return pool
}
// Start marks the pool as running and launches one worker goroutine for every
// index in [0, num).
//
// NewWorker already calls Start, so calling it again on a running pool is a
// no-op. Launching a second set of workers would make two goroutines share
// each chainStat slot, and IsEmpty/Await could then report the pool idle while
// a task is still executing.
//
// Start does not re-create the task channel, so it cannot revive a pool whose
// channel has already been closed by Stop.
func (ce *WorkerChain) Start() {
	if ce.stat {
		return
	}
	ce.stat = true
	for i := 0; i < ce.num; i++ {
		ce.start(i)
	}
}
// start spawns the worker goroutine for slot index. The goroutine calls
// doRunner repeatedly for as long as ce.stat is true and returns the first
// time it sees stat as false.
func (ce *WorkerChain) start(index int) {
	worker := func() {
		for ce.stat {
			ce.doRunner(index)
		}
	}
	go worker()
}
// doRunner waits for one task on the channel and runs it on behalf of the
// worker in slot index.
//
// While the task executes, chainStat[index] is 1; it is put back to 0 when the
// task returns or panics. A panic raised by the task is recovered and handed
// to ce.catch. A nil task — which is also what a closed channel yields — is
// skipped without touching chainStat.
func (ce *WorkerChain) doRunner(index int) {
	// Registered first so it runs last: by the time catch is called the slot
	// has already been reset by the deferred function below.
	defer func() {
		if recovered := recover(); recovered != nil {
			ce.catch(recovered)
		}
	}()

	task := <-ce.chain
	if task == nil {
		return
	}

	ce.chainStat[index] = 1
	defer func() {
		ce.chainStat[index] = 0
	}()
	task(index)
}
// Stop shuts the pool down: it clears the running flag and closes the task
// channel, so workers blocked waiting for a task wake up, see the flag and
// exit. A task that is already executing is not interrupted.
//
// Stop is idempotent: calling it on a pool that is not running does nothing,
// because closing the channel a second time would panic.
func (ce *WorkerChain) Stop() {
	if !ce.stat {
		return
	}
	ce.stat = false
	// Guard on the channel itself: close panics on a nil channel, whereas the
	// catch handler has no bearing on whether closing is safe.
	if ce.chain != nil {
		close(ce.chain)
	}
}
// Submit hands fn to the pool without blocking the caller.
//
// The hand-off runs in its own goroutine, which blocks until the task channel
// accepts the task. fn is later called with the index of the worker that runs
// it. A nil fn is accepted and skipped by the worker.
//
// If the task channel has been closed (the pool was stopped), the send panics.
// That panic is recovered here and reported through the pool's catch handler:
// it occurs in a goroutine the caller cannot guard, so left unrecovered it
// would terminate the whole process.
func (ce *WorkerChain) Submit(fn func(chainId int)) {
	go func() {
		defer func() {
			if err := recover(); err != nil && ce.catch != nil {
				ce.catch(err)
			}
		}()
		ce.chain <- fn
	}()
}
// SubmitWorker hands fn.Run to the pool without blocking the caller.
//
// The hand-off runs in its own goroutine, which blocks until the task channel
// accepts the task. Run is later called with the index of the worker that
// executes it.
//
// Two failures can occur inside that goroutine: evaluating fn.Run panics when
// fn is nil, and the send panics when the task channel has been closed (the
// pool was stopped). Both are recovered here and reported through the pool's
// catch handler; left unrecovered they would terminate the whole process,
// because the caller has no way to guard this goroutine.
func (ce *WorkerChain) SubmitWorker(fn Worker) {
	go func() {
		defer func() {
			if err := recover(); err != nil && ce.catch != nil {
				ce.catch(err)
			}
		}()
		ce.chain <- fn.Run
	}()
}
// Await blocks until the pool has looked idle for a sustained period.
//
// It polls IsEmpty: once the pool reports empty, it is re-checked three more
// times at 500ms intervals, and Await returns only if every check agrees. Any
// busy observation is followed by a 3s pause before polling starts over.
//
// IsEmpty only reflects tasks that a worker is currently executing, so tasks
// that were submitted but not yet picked up by a worker are not waited for.
func (ce *WorkerChain) Await() {
	for {
		idle := ce.IsEmpty()
		for recheck := 0; idle && recheck < 3; recheck++ {
			time.Sleep(500 * time.Millisecond)
			idle = ce.IsEmpty()
		}
		if idle {
			return
		}
		time.Sleep(3 * time.Second)
	}
}
// IsEmpty reports whether no worker is currently executing a task, i.e. no
// slot in chainStat holds the value 1.
func (ce *WorkerChain) IsEmpty() bool {
	for _, state := range ce.chainStat {
		if state == 1 {
			return false
		}
	}
	return true
}
// Block parks the calling goroutine until the process receives SIGTERM or
// SIGINT. It does not touch the pool's state.
func (ce *WorkerChain) Block() {
	interrupts := make(chan os.Signal, 2)
	signal.Notify(interrupts, syscall.SIGTERM, syscall.SIGINT)
	<-interrupts
}
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。