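// [Hosting-page notice captured with the file; not part of the source] Code fetch complete; the page will refresh automatically.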
// The MIT License (MIT)
//
// Copyright (c) 2015 xtaci
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in all
// copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
// SOFTWARE.
package kcp
import (
"container/heap"
"runtime"
"sync"
"time"
)
// SystemTimedSched is the library level timed-scheduler
var SystemTimedSched *TimedSched = NewTimedSched(runtime.NumCPU())
// timedFunc couples a callback with the instant at which it becomes due.
type timedFunc struct {
	execute func()
	ts      time.Time
}

// timedFuncHeap is a min-heap of timedFunc ordered by due time. It implements
// heap.Interface, so after heap operations element 0 is the earliest task.
type timedFuncHeap []timedFunc

// Len reports how many tasks are stored in the heap.
func (h timedFuncHeap) Len() int {
	return len(h)
}

// Less reports whether the task at index i is due strictly before the one at
// index j.
func (h timedFuncHeap) Less(i, j int) bool {
	return h[j].ts.After(h[i].ts)
}

// Swap exchanges the tasks at indexes i and j.
func (h timedFuncHeap) Swap(i, j int) {
	h[j], h[i] = h[i], h[j]
}

// Push appends x, which must be a timedFunc, to the end of the slice.
// It is meant to be called through heap.Push.
func (h *timedFuncHeap) Push(x interface{}) {
	item := x.(timedFunc)
	*h = append(*h, item)
}

// Pop removes the last element of the slice and returns it.
// It is meant to be called through heap.Pop.
func (h *timedFuncHeap) Pop() interface{} {
	last := len(*h) - 1
	item := (*h)[last]
	// Drop the closure reference kept in the backing array so the callback
	// can be garbage collected once the caller is done with it.
	(*h)[last].execute = nil
	*h = (*h)[:last]
	return item
}
// TimedSched represents the control struct for the timed parallel scheduler.
//
// Tasks registered with Put are first queued in prependTasks, then moved by
// the prepend goroutine onto chTask, from which the sched goroutines receive
// them and run them once their deadline has passed.
type TimedSched struct {
// prependTasks holds tasks queued by Put that have not yet been handed to
// a scheduling goroutine.
prependTasks []timedFunc
// prependLock guards prependTasks.
prependLock sync.Mutex
// chPrependNotify signals the prepend goroutine that prependTasks has new
// entries; Put sends on it without blocking.
chPrependNotify chan struct{}
// tasks will be distributed through chTask
chTask chan timedFunc
// dieOnce makes sure die is closed only once by Close.
dieOnce sync.Once
// die is closed by Close to stop the scheduler's goroutines.
die chan struct{}
}
// NewTimedSched creates a parallel scheduler with the given parallelization.
//
// It starts 'parallel' scheduling goroutines plus one front-desk goroutine
// that feeds them. A non-positive 'parallel' is raised to 1: with no
// scheduling goroutine nothing would ever receive from chTask, so no task
// registered through Put could ever run.
//
// Call Close to stop the goroutines started here.
func NewTimedSched(parallel int) *TimedSched {
	if parallel < 1 {
		parallel = 1
	}

	ts := new(TimedSched)
	ts.chTask = make(chan timedFunc)
	ts.die = make(chan struct{})
	// Capacity 1 lets Put leave a single pending wake-up without blocking.
	ts.chPrependNotify = make(chan struct{}, 1)

	for i := 0; i < parallel; i++ {
		go ts.sched()
	}
	go ts.prepend()
	return ts
}
// sched is a goroutine to schedule and execute timed tasks.
//
// Each sched goroutine owns a private min-heap of pending tasks and a single
// timer armed for the earliest deadline in that heap. Tasks received from
// chTask whose deadline has already passed run immediately; the others are
// pushed onto the heap. The goroutine returns when die is closed; tasks still
// in the heap at that point are not executed.
func (ts *TimedSched) sched() {
	timer := time.NewTimer(0)
	defer timer.Stop()

	var tasks timedFuncHeap
	// drained is true when the latest timer expiry has already been received
	// from timer.C, i.e. there is no stale value left in the channel.
	drained := false
	for {
		select {
		case task := <-ts.chTask:
			now := time.Now()
			if now.After(task.ts) {
				// already delayed! execute immediately
				task.execute()
				continue
			}

			heap.Push(&tasks, task)
			// Properly reset the timer to trigger based on the top element.
			// If the timer already fired and its value was not consumed,
			// drain it first so the stale expiry is not mistaken for the
			// new one.
			if !timer.Stop() && !drained {
				<-timer.C
			}
			timer.Reset(tasks[0].ts.Sub(now))
			drained = false

		case now := <-timer.C:
			drained = true
			for tasks.Len() > 0 {
				if !now.After(tasks[0].ts) {
					// Re-arm relative to the current time rather than the
					// expiry time 'now': the callbacks executed above may
					// have taken a while, and that time must not be added
					// to the next task's delay. A non-positive duration
					// makes the timer fire right away.
					timer.Reset(time.Until(tasks[0].ts))
					drained = false
					break
				}
				heap.Pop(&tasks).(timedFunc).execute()
			}

		case <-ts.die:
			return
		}
	}
}
// prepend is the front-desk goroutine that registers tasks.
//
// Whenever it is notified, it moves everything queued in prependTasks into a
// private batch while holding prependLock, then hands the batch to the
// scheduling goroutines one task at a time through chTask. It returns as soon
// as die is closed, even in the middle of a batch.
func (ts *TimedSched) prepend() {
	var batch []timedFunc
	for {
		// Wait for new work or for shutdown.
		select {
		case <-ts.die:
			return
		case <-ts.chPrependNotify:
		}

		ts.prependLock.Lock()
		pending := ts.prependTasks
		// Grow the private batch only when needed so its backing array is
		// reused across rounds.
		if cap(batch) < cap(pending) {
			batch = make([]timedFunc, 0, cap(pending))
		}
		batch = batch[:len(pending)]
		copy(batch, pending)
		for i := range pending {
			// Release the closures still referenced by the shared slice.
			pending[i].execute = nil
		}
		ts.prependTasks = pending[:0]
		ts.prependLock.Unlock()

		// Distribute outside the lock so Put is never blocked by slow
		// scheduling goroutines.
		for i := range batch {
			select {
			case ts.chTask <- batch[i]:
				// Release the closure once it has been handed over.
				batch[i].execute = nil
			case <-ts.die:
				return
			}
		}
		batch = batch[:0]
	}
}
// Put registers a function 'f' to be executed at 'deadline'.
//
// Put only queues the task and wakes the front-desk goroutine; it does not
// wait for 'f' to run. 'f' is later called from one of the scheduling
// goroutines, immediately if 'deadline' has already passed by the time the
// task reaches it. 'f' must not be nil, since the scheduling goroutine calls
// it unconditionally.
//
// Once the scheduler has been closed, Put discards the task: no goroutine is
// left to drain the queue, so keeping it would only retain 'f' forever.
func (ts *TimedSched) Put(f func(), deadline time.Time) {
	select {
	case <-ts.die:
		return
	default:
	}

	ts.prependLock.Lock()
	ts.prependTasks = append(ts.prependTasks, timedFunc{execute: f, ts: deadline})
	ts.prependLock.Unlock()

	// Non-blocking wake-up: if a notification is already pending, the
	// front-desk goroutine will pick this task up in the same round.
	select {
	case ts.chPrependNotify <- struct{}{}:
	default:
	}
}
// Close terminates this scheduler by closing the die channel, which the
// scheduler's goroutines watch to know when to return. It is safe to call
// Close more than once; only the first call has an effect.
func (ts *TimedSched) Close() {
	shutdown := func() {
		close(ts.die)
	}
	ts.dieOnce.Do(shutdown)
}
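// [Hosting-page notice captured with the file; not part of the source] This section may contain content unsuitable for display, so the page does not show it. You can review and modify it using the relevant editing functions.
// [Hosting-page notice, continued] If you confirm that the content involves no inappropriate language, pure advertising, violence, vulgar or pornographic material, infringement, piracy, falsehood, worthless content, or anything violating applicable national laws and regulations, you may click submit to appeal, and it will be handled as soon as possible.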