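// NOTE: residue from the code-hosting web page, not part of the original
// source: "Code pull complete; the page will refresh automatically."
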
package flows
import (
"sync"
"time"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/packetbeat/publish"
)
// worker runs a single user-supplied function in its own goroutine and
// provides the plumbing to signal it to stop and to wait for it to exit.
type worker struct {
	// wg is incremented by Start and decremented by finished; Stop waits on it.
	wg sync.WaitGroup
	// done is closed by Stop to tell the running function to return.
	done chan struct{}
	// run is the function executed in the goroutine launched by Start. It
	// receives the worker itself so it can use sleep, tick and periodically.
	run func(*worker)
}
// spool buffers events and hands them to a publisher in batches.
type spool struct {
	// pub receives each batch via PublishFlows.
	pub publish.Flows
	// events is the current, not yet published batch. Its capacity is the
	// batch size: publish flushes once len(events) reaches cap(events).
	events []common.MapStr
}
// newWorker creates a worker that will execute fn once Start is called.
// The returned worker owns a fresh, open done channel; nothing is started
// here.
func newWorker(fn func(w *worker)) *worker {
	wrk := new(worker)
	wrk.done = make(chan struct{})
	wrk.run = fn
	return wrk
}
// Start launches the worker's run function in a new goroutine. The wait
// group is incremented before the goroutine is spawned so that Stop can
// wait for it; finished is deferred so the wait group is released even if
// run returns early.
func (w *worker) Start() {
	debugf("start flows worker")

	loop := func() {
		defer w.finished()
		w.run(w)
	}

	w.wg.Add(1)
	go loop()
}
// Stop signals the worker to terminate and blocks until its goroutine has
// exited.
//
// The done channel is closed unconditionally, so Stop must be called at most
// once per worker: a second call would panic on the double close.
func (w *worker) Stop() {
	debugf("stop flows worker")

	// Signal first, then wait for finished to release the wait group.
	close(w.done)
	w.wg.Wait()

	debugf("stopped flows worker")
}
// finished marks the worker goroutine as complete. It is deferred by the
// goroutine started in Start.
//
// The wait group is released before the log line is written, so a caller
// blocked in Stop may resume before the message appears.
func (w *worker) finished() {
	w.wg.Done()

	logp.Info("flows worker loop stopped")
}
// sleep blocks for duration d or until the worker is asked to stop,
// whichever comes first.
//
// It returns true if the full duration elapsed and false if the done channel
// was closed while waiting, so callers can use it as a loop condition.
func (w *worker) sleep(d time.Duration) bool {
	// Use an explicit timer instead of time.After so that the timer is
	// released immediately when done wins the select, rather than lingering
	// until d has elapsed.
	timer := time.NewTimer(d)
	defer timer.Stop()

	select {
	case <-w.done:
		return false
	case <-timer.C:
		return true
	}
}
// tick blocks until the ticker t fires or the worker is asked to stop.
//
// It returns true when a tick was received and false when the done channel
// was closed.
func (w *worker) tick(t *time.Ticker) bool {
	fired := false
	select {
	case <-t.C:
		fired = true
	case <-w.done:
		// stop requested; report false
	}
	return fired
}
// periodically invokes fn once per tick interval until the worker is stopped
// or fn returns a non-nil error.
//
// The error returned by fn only terminates the loop; it is not reported to
// the caller. tick must be positive, as required by time.NewTicker.
func (w *worker) periodically(tick time.Duration, fn func() error) {
	defer debugf("stop periodic loop")

	ticker := time.NewTicker(tick)
	// Release the ticker's resources on every exit path; without this the
	// ticker would keep running after the loop has returned.
	defer ticker.Stop()

	for w.tick(ticker) {
		if err := fn(); err != nil {
			return
		}
	}
}
// init prepares the spool for use: it records the publisher and allocates an
// empty event buffer whose capacity sz is the batch size used by publish.
func (s *spool) init(pub publish.Flows, sz int) {
	s.pub = pub

	buf := make([]common.MapStr, 0, sz)
	s.events = buf
}
// publish appends event to the pending batch and flushes the batch as soon
// as the buffer has been filled to its capacity.
func (s *spool) publish(event common.MapStr) {
	s.events = append(s.events, event)

	// Still room left in the buffer: keep collecting.
	if len(s.events) < cap(s.events) {
		return
	}
	s.flush()
}
// flush hands all pending events to the publisher and starts a new, empty
// batch of the same capacity. It is a no-op when nothing is pending.
func (s *spool) flush() {
	batch := s.events
	if len(batch) == 0 {
		return
	}

	s.pub.PublishFlows(batch)

	// A fresh slice is allocated instead of truncating the old one.
	// NOTE(review): presumably because the publisher may keep a reference to
	// the published batch - confirm against publish.Flows.
	s.events = make([]common.MapStr, 0, cap(batch))
}
// gcd returns the greatest common divisor of a and b, computed with the
// binary GCD algorithm (shifts and subtractions only).
//
// If either argument is negative the result is 0. If one argument is zero
// the other is returned, so gcd(0, 0) is 0.
func gcd(a, b int64) int64 {
	if a < 0 || b < 0 {
		return 0
	}
	if a == b || b == 0 {
		return a
	}
	if a == 0 {
		return b
	}

	// Strip the powers of two shared by both operands and remember how many
	// were removed.
	var twos uint
	for (a|b)&1 == 0 {
		a >>= 1
		b >>= 1
		twos++
	}

	// Any remaining factors of two in a are not common; drop them so that a
	// stays odd from here on.
	for a&1 == 0 {
		a >>= 1
	}

	// b is non-zero on entry; the loop ends once the difference reaches zero.
	for b != 0 {
		for b&1 == 0 {
			b >>= 1
		}
		// Both values are odd now. Keep the smaller one in a so that the
		// subtraction below never goes negative.
		if b < a {
			a, b = b, a
		}
		b -= a
	}

	// Put the shared factors of two back.
	return a << twos
}
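// NOTE: residue from the code-hosting web page, not part of the original
// source: "This section may contain content that is unsuitable for display,
// so the page does not show it. You can review and modify it using the
// relevant editing functions."
// "If you confirm that the content does not involve inappropriate language,
// pure advertising, violence, vulgar or pornographic material, infringement,
// piracy, false or worthless content, or content that violates applicable
// laws and regulations, you can click submit to appeal, and we will handle it
// as soon as possible."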