1 Star 0 Fork 0

zhangjungang/beats

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
watcher.go 7.11 KB
一键复制 编辑 原始数据 按行查看 历史
package kubernetes
import (
"context"
"errors"
"io"
"sync"
"time"
"github.com/elastic/beats/libbeat/common/bus"
"github.com/elastic/beats/libbeat/logp"
"github.com/ericchiang/k8s"
corev1 "github.com/ericchiang/k8s/api/v1"
)
// maxBackoff is the upper bound for the delay slept between retries.
const maxBackoff time.Duration = 30 * time.Second
// Watcher follows Kubernetes pod events, keeps the set of known pods in
// memory and lets callers subscribe to pod lifecycle notifications.
type Watcher interface {
	// Start begins watching the Kubernetes API for pod changes.
	Start() error
	// Stop ends watching the Kubernetes API.
	Stop()

	// ListenStart returns a bus listener for pod started events; each event
	// carries the pod under the `pod` key.
	ListenStart() bus.Listener
	// ListenUpdate returns a bus listener for pod updated events; each event
	// carries the pod under the `pod` key.
	ListenUpdate() bus.Listener
	// ListenStop returns a bus listener for pod stopped events; each event
	// carries the pod under the `pod` key.
	ListenStop() bus.Listener
}
// podWatcher implements Watcher. It mirrors the pods returned by the
// Kubernetes API (restricted by nodeFilter) in memory and publishes
// start/update/stop events for them on bus.
type podWatcher struct {
	// Guards pods and deleted; the other fields are not protected by it.
	sync.RWMutex
	// client is used to list and watch pods.
	client Client
	// syncPeriod is used by Start as the timeout of the initial pod sync.
	syncPeriod time.Duration
	// cleanupTimeout is how long a pod flagged as deleted is kept after its
	// last access; cleanupWorker also wakes up once per cleanupTimeout.
	cleanupTimeout time.Duration
	// nodeFilter is passed to every list/watch call (NewWatcher sets a
	// spec.nodeName field selector).
	nodeFilter k8s.Option
	// lastResourceVersion is the resource version the next list/watch starts
	// from; it is updated by the initial sync and by every watch event.
	lastResourceVersion string
	// ctx is passed to the API calls and checked by the background workers;
	// stop cancels it.
	ctx  context.Context
	stop context.CancelFunc
	// bus carries the start/update/stop pod events.
	bus bus.Bus
	pods    map[string]*Pod      // pod UID -> Pod
	deleted map[string]time.Time // pod UID flagged as deleted -> last access time
}
// Client is the subset of the Kubernetes API the pod watcher depends on.
type Client interface {
	// ListPods lists the pods of the given namespace, filtered by options.
	ListPods(ctx context.Context, namespace string, options ...k8s.Option) (*corev1.PodList, error)

	// WatchPods opens a watch on the pods of the given namespace, filtered by options.
	WatchPods(ctx context.Context, namespace string, options ...k8s.Option) (*k8s.CoreV1PodWatcher, error)
}
// NewWatcher builds a Watcher that keeps a local copy of the cluster pods
// scheduled on the given host. It does not contact the API; call Start to
// perform the initial sync and begin watching.
func NewWatcher(client Client, syncPeriod, cleanupTimeout time.Duration, host string) Watcher {
	ctx, cancel := context.WithCancel(context.Background())

	w := &podWatcher{
		client:              client,
		syncPeriod:          syncPeriod,
		cleanupTimeout:      cleanupTimeout,
		nodeFilter:          k8s.QueryParam("fieldSelector", "spec.nodeName="+host),
		lastResourceVersion: "0",
		ctx:                 ctx,
		stop:                cancel,
		bus:                 bus.New("kubernetes"),
		pods:                make(map[string]*Pod),
		deleted:             make(map[string]time.Time),
	}
	return w
}
// syncPods lists the pods matching nodeFilter, stores them in the local cache
// and publishes a "start" event for each listed pod. It records the list's
// resource version so the following watch resumes from it.
// It returns the ListPods error, if any, leaving the cache untouched.
func (p *podWatcher) syncPods() error {
	logp.Info("kubernetes: %s", "Performing a pod sync")
	pods, err := p.client.ListPods(
		p.ctx,
		"",
		p.nodeFilter,
		k8s.ResourceVersion(p.lastResourceVersion))
	if err != nil {
		return err
	}

	// Keep our own slice of the synced pods: the publishing goroutine below
	// must not range over p.pods, which watch and cleanupWorker mutate
	// concurrently under the lock.
	synced := make([]*Pod, 0, len(pods.Items))
	p.Lock()
	for _, apiPod := range pods.Items {
		pod := GetPod(apiPod)
		p.pods[pod.Metadata.UID] = pod
		synced = append(synced, pod)
	}
	p.Unlock()

	// Emit all start events asynchronously (avoid blocking if the bus gets blocked)
	go func() {
		for _, pod := range synced {
			p.bus.Publish(bus.Event{
				"start": true,
				"pod":   pod,
			})
		}
	}()

	// Store last version
	p.lastResourceVersion = pods.Metadata.GetResourceVersion()
	logp.Info("kubernetes: %s", "Pod sync done")
	return nil
}
// Start performs the initial pod sync and, once it succeeds, launches the
// background goroutines that watch the API for changes and expire pods
// flagged as deleted. If the sync fails or does not finish within
// syncPeriod, the watcher is stopped and an error is returned.
func (p *podWatcher) Start() error {
	// Make sure that events don't flow into the annotator before the initial
	// state is fully set up.
	// Buffered so the sync goroutine can always hand over its result and
	// exit, even when we stopped waiting for it after a timeout.
	syncErr := make(chan error, 1)
	go func() {
		syncErr <- p.syncPods()
	}()

	select {
	case <-time.After(p.syncPeriod):
		p.Stop()
		return errors.New("Timeout while doing initial Kubernetes pods sync")
	case err := <-syncErr:
		if err != nil {
			// Don't start watching from an unknown state.
			p.Stop()
			return err
		}
	}

	// Watch for new changes
	go p.watch()
	go p.cleanupWorker()
	return nil
}
// watch consumes pod events from the Kubernetes API, starting at
// lastResourceVersion, and keeps the local cache and the bus up to date.
// New pods trigger "start" events and already known pods trigger "update"
// events. Pods carrying a deletion timestamp are flagged in p.deleted for
// cleanupWorker. Failures to open a watch are retried with exponential
// backoff. The loop returns once the watcher context is canceled (Stop).
func (p *podWatcher) watch() {
	// Consecutive failures counter, drives the exponential backoff on retries
	var failures uint

	for {
		// Every API call fails on a canceled context, so without this check a
		// stopped watcher would keep retrying (and logging errors) forever.
		if p.ctx.Err() != nil {
			return
		}

		logp.Info("kubernetes: %s", "Watching API for pod events")
		watcher, err := p.client.WatchPods(p.ctx, "", p.nodeFilter, k8s.ResourceVersion(p.lastResourceVersion))
		if err != nil {
			// Watch failures are logged and retried: metadata retrieval should never stop.
			logp.Err("kubernetes: Watching API error %v", err)
			backoff(failures)
			failures++
			continue
		}

		for {
			_, apiPod, err := watcher.Next()
			if err != nil {
				watcher.Close()
				if p.ctx.Err() != nil {
					// The watcher was stopped; this error is just the cancellation.
					return
				}
				logp.Err("kubernetes: Watching API error %v", err)
				if !(err == io.EOF || err == io.ErrUnexpectedEOF) {
					// This is an error event which can be recovered by moving to the latest resource version
					logp.Info("kubernetes: Ignoring event, moving to most recent resource version")
					p.lastResourceVersion = ""
				}
				break
			}

			// Update last resource version and reset failure counter
			p.lastResourceVersion = apiPod.Metadata.GetResourceVersion()
			failures = 0

			pod := GetPod(apiPod)
			uid := pod.Metadata.UID

			if pod.Metadata.DeletionTimestamp != "" {
				// Pod deleted: flag it; cleanupWorker drops it after cleanupTimeout
				p.Lock()
				p.deleted[uid] = time.Now()
				p.Unlock()
				continue
			}

			// Pod added or updated. Check and store under a single lock so the
			// start/update decision cannot interleave with cleanupWorker.
			p.Lock()
			known := p.pods[uid] != nil
			p.pods[uid] = pod
			// un-delete if it's flagged (in case of update or recreation)
			delete(p.deleted, uid)
			p.Unlock()

			eventKind := "start"
			if known {
				eventKind = "update"
			}
			p.bus.Publish(bus.Event{
				eventKind: true,
				"pod":     pod,
			})
		}
	}
}
// backoff sleeps before retrying a failed operation. The delay is
// 2^failures seconds (1s, 2s, 4s, ...), capped at maxBackoff.
func backoff(failures uint) {
	// Double iteratively and stop as soon as the cap is reached. The former
	// (1 << failures) * time.Second overflowed int64 once failures reached 34,
	// producing zero or negative waits and turning retries into a busy loop.
	wait := time.Second
	for i := uint(0); i < failures && wait < maxBackoff; i++ {
		wait *= 2
	}
	if wait > maxBackoff {
		wait = maxBackoff
	}
	time.Sleep(wait)
}
// cleanupWorker wakes up every cleanupTimeout and fully removes the pods
// flagged as deleted whose last access is older than cleanupTimeout,
// publishing a "stop" event for each removed pod that was cached.
// It returns once the watcher context is canceled (Stop).
func (p *podWatcher) cleanupWorker() {
	for {
		// Wait a full period, but return promptly if the watcher is stopped
		select {
		case <-p.ctx.Done():
			return
		case <-time.After(p.cleanupTimeout):
		}

		// Select and remove timed out entries under one write lock, so an
		// entry un-deleted by watch in the meantime is never dropped.
		var stopped []*Pod
		timeout := time.Now().Add(-p.cleanupTimeout)
		p.Lock()
		for key, lastSeen := range p.deleted {
			if !lastSeen.Before(timeout) {
				continue
			}
			logp.Debug("kubernetes", "Removing pod %s after cool down timeout", key)
			// A pod can be flagged without ever being cached (first seen while
			// already terminating); don't publish a nil pod for it.
			if pod := p.pods[key]; pod != nil {
				stopped = append(stopped, pod)
			}
			delete(p.deleted, key)
			delete(p.pods, key)
		}
		p.Unlock()

		// Publish outside the lock, in case the bus blocks
		for _, pod := range stopped {
			p.bus.Publish(bus.Event{
				"stop": true,
				"pod":  pod,
			})
		}
	}
}
// Pod returns the cached pod with the given UID, or nil when it is not cached.
// Looking up a pod flagged as deleted refreshes its last access time, which
// postpones its removal by cleanupWorker.
func (p *podWatcher) Pod(uid string) *Pod {
	p.RLock()
	pod := p.pods[uid]
	_, deleted := p.deleted[uid]
	p.RUnlock()

	// Update deleted last access
	if deleted {
		p.Lock()
		// Re-check under the write lock: cleanupWorker may have removed the
		// entry after RUnlock, and re-adding it would leave a stale flag.
		if _, stillDeleted := p.deleted[uid]; stillDeleted {
			p.deleted[uid] = time.Now()
		}
		p.Unlock()
	}
	return pod
}
// ListenStart subscribes to pod started events. Every event delivered to the
// returned listener holds the started pod under the `pod` key.
func (p *podWatcher) ListenStart() bus.Listener {
	listener := p.bus.Subscribe("start")
	return listener
}
// ListenStop subscribes to pod stopped events. Every event delivered to the
// returned listener holds the stopped pod under the `pod` key.
func (p *podWatcher) ListenStop() bus.Listener {
	listener := p.bus.Subscribe("stop")
	return listener
}
// ListenUpdate subscribes to pod updated events. Every event delivered to the
// returned listener holds the updated pod under the `pod` key.
func (p *podWatcher) ListenUpdate() bus.Listener {
	listener := p.bus.Subscribe("update")
	return listener
}
// Stop cancels the context shared by the watcher's API calls and its
// background workers.
func (p *podWatcher) Stop() {
	cancel := p.stop
	cancel()
}
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
1
https://gitee.com/zhangjungang/beats.git
git@gitee.com:zhangjungang/beats.git
zhangjungang
beats
beats
v6.2.3

搜索帮助