1 Star 0 Fork 0

邢楠 / toolbox

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
该仓库未声明开源许可证文件(LICENSE),使用请关注具体项目描述及其代码上游依赖。
克隆/下载
routinepool.go 2.96 KB
一键复制 编辑 原始数据 按行查看 历史
邢楠 提交于 2021-05-08 17:14 . # 148
package com
/**
* @Author : xingnan
* @Email : 457415936@qq.com
* @Time : 17:10
* @File : routinepool.go
* @Project : toolbox
* @Description : 并发调度
*/
import "sync"
// worker is a goroutine-backed executor that accepts client jobs one at a time.
type worker struct {
	// workerPool is the shared channel on which the worker publishes itself
	// whenever it is ready for another job.
	workerPool chan *worker
	// jobChannel delivers the next Job for this worker to run.
	jobChannel chan Job
	// stop carries the shutdown request to the worker and, in the opposite
	// direction, the worker's acknowledgement.
	stop chan struct{}
}
// start launches the worker's goroutine and returns immediately.
//
// On every iteration the goroutine first publishes the worker on workerPool
// to signal that it is idle, then blocks until either a job arrives on
// jobChannel (which it runs to completion) or a stop request arrives on stop.
// A stop request is acknowledged by sending a value back on the same stop
// channel, after which the goroutine exits.
func (w *worker) start() {
	go func() {
		for {
			// Advertise availability before waiting for work.
			w.workerPool <- w

			select {
			case task := <-w.jobChannel:
				task()
			case <-w.stop:
				// Acknowledge on the same channel so the requester can
				// wait for this goroutine to wind down.
				w.stop <- struct{}{}
				return
			}
		}
	}()
}
// newWorker builds a worker bound to the given pool channel. The worker gets
// its own unbuffered job and stop channels. The worker's goroutine is not
// started here; call start for that.
func newWorker(pool chan *worker) *worker {
	w := new(worker)
	w.workerPool = pool
	w.jobChannel = make(chan Job)
	w.stop = make(chan struct{})
	return w
}
// dispatcher accepts jobs from clients and hands each one to the first worker
// that becomes free.
type dispatcher struct {
	// workerPool receives workers as they become idle.
	workerPool chan *worker
	// jobQueue is the source of client jobs awaiting a worker.
	jobQueue chan Job
	// stop carries the shutdown request in and the completion
	// acknowledgement back out.
	stop chan struct{}
}
// dispatch is the dispatcher's main loop; it runs until a stop request arrives.
//
// For each job read from jobQueue it waits for an idle worker on workerPool
// and passes the job to that worker. When a value arrives on stop it takes
// cap(workerPool) workers off the pool one by one, sends each a stop request
// and waits for its acknowledgement, then acknowledges on its own stop
// channel and returns. Jobs still sitting in jobQueue at that point are not
// dispatched.
func (d *dispatcher) dispatch() {
	for {
		select {
		case next := <-d.jobQueue:
			// Block until some worker reports idle, then hand the job over.
			idle := <-d.workerPool
			idle.jobChannel <- next

		case <-d.stop:
			// Collect every worker as it becomes idle and shut it down,
			// waiting for each acknowledgement in turn.
			for remaining := cap(d.workerPool); remaining > 0; remaining-- {
				idle := <-d.workerPool
				idle.stop <- struct{}{}
				<-idle.stop
			}
			// Tell the requester that shutdown has completed.
			d.stop <- struct{}{}
			return
		}
	}
}
// newDispatcher wires a dispatcher to the given worker pool and job queue,
// starts one worker per slot of workerPool's capacity, launches the dispatch
// loop in its own goroutine, and returns the dispatcher.
func newDispatcher(workerPool chan *worker, jobQueue chan Job) *dispatcher {
	d := new(dispatcher)
	d.workerPool = workerPool
	d.jobQueue = jobQueue
	d.stop = make(chan struct{})

	// The pool's capacity determines how many workers exist.
	for remaining := cap(workerPool); remaining > 0; remaining-- {
		newWorker(workerPool).start()
	}

	go d.dispatch()
	return d
}
// Job represents a user request: a function to be executed by one of the
// pool's workers.
type Job func()
// RoutinePool is a pool of worker goroutines that run jobs submitted through
// JobQueue. Create one with NewRoutinePool.
type RoutinePool struct {
	// JobQueue is the channel on which callers submit jobs to the pool.
	JobQueue chan Job
	// dispatcher forwards queued jobs to idle workers.
	dispatcher *dispatcher
	// wg backs WaitCount, JobDone and WaitAll.
	wg sync.WaitGroup
	// NumberWorkers records the worker count set by NewRoutinePool.
	NumberWorkers int
}
// NewRoutinePool creates a pool of goroutine workers together with the
// dispatcher that feeds them.
//
//   - numWorkers: how many workers are started. Values below 1 are raised to 1,
//     because a pool without workers could never run a job.
//   - jobQueueLen: how many jobs JobQueue buffers before a send blocks.
//     Negative values are treated as 0 (an unbuffered queue).
//
// The returned pool exposes JobQueue, which callers use to send jobs to the
// pool; NumberWorkers holds the effective worker count.
func NewRoutinePool(numWorkers int, jobQueueLen int) *RoutinePool {
	// make panics on a negative channel capacity, and a zero-capacity worker
	// pool would start no workers at all, so sanitize both sizes up front.
	if numWorkers < 1 {
		numWorkers = 1
	}
	if jobQueueLen < 0 {
		jobQueueLen = 0
	}

	jobQueue := make(chan Job, jobQueueLen)
	workerPool := make(chan *worker, numWorkers)

	return &RoutinePool{
		JobQueue:      jobQueue,
		dispatcher:    newDispatcher(workerPool, jobQueue),
		NumberWorkers: numWorkers,
	}
}
// JobDone marks one job as finished by decrementing the pool's WaitGroup
// counter. Call it once per completed job when you rely on WaitAll.
//
// If you are not using WaitAll, the pool assumes you have your own way of
// synchronizing and this method is not needed.
func (p *RoutinePool) JobDone() {
	p.wg.Done()
}
// WaitCount registers how many jobs WaitAll should wait for by adding count
// to the pool's WaitGroup counter (the pool relies on WaitGroup
// Add/Done/Wait).
func (p *RoutinePool) WaitCount(count int) {
	p.wg.Add(count)
}
// WaitAll blocks until the pool's WaitGroup counter drops to zero, i.e. until
// JobDone has been called for every job registered through WaitCount.
func (p *RoutinePool) WaitAll() {
	p.wg.Wait()
}
// Release releases the resources used by the pool: it asks the dispatcher to
// stop and blocks until the dispatcher acknowledges on the same channel.
//
// The dispatcher goroutine exits after acknowledging, so Release is meant to
// be called once; a second call would find no receiver and block.
func (p *RoutinePool) Release() {
	stop := p.dispatcher.stop
	stop <- struct{}{} // request shutdown
	<-stop             // wait for the dispatcher's acknowledgement
}
Go
1
https://gitee.com/xingnan/toolbox.git
git@gitee.com:xingnan/toolbox.git
xingnan
toolbox
toolbox
master

搜索帮助