1 Star 0 Fork 0

heliubei/etcd-package

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
该仓库未声明开源许可证文件(LICENSE),使用请关注具体项目描述及其代码上游依赖。
克隆/下载
watch_broadcasts.go 3.06 KB
一键复制 编辑 原始数据 按行查看 历史
// Copyright 2016 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package grpcproxy
import (
"sync"
)
type watchBroadcasts struct {
wp *watchProxy
// mu protects bcasts and watchers from the coalesce loop.
mu sync.Mutex
bcasts map[*watchBroadcast]struct{}
watchers map[*watcher]*watchBroadcast
updatec chan *watchBroadcast
donec chan struct{}
}
// maxCoalesceRecievers prevents a popular watchBroadcast from being coalseced.
const maxCoalesceReceivers = 5
func newWatchBroadcasts(wp *watchProxy) *watchBroadcasts {
wbs := &watchBroadcasts{
wp: wp,
bcasts: make(map[*watchBroadcast]struct{}),
watchers: make(map[*watcher]*watchBroadcast),
updatec: make(chan *watchBroadcast, 1),
donec: make(chan struct{}),
}
go func() {
defer close(wbs.donec)
for wb := range wbs.updatec {
wbs.coalesce(wb)
}
}()
return wbs
}
func (wbs *watchBroadcasts) coalesce(wb *watchBroadcast) {
if wb.size() >= maxCoalesceReceivers {
return
}
wbs.mu.Lock()
for wbswb := range wbs.bcasts {
if wbswb == wb {
continue
}
wb.mu.Lock()
wbswb.mu.Lock()
// 1. check if wbswb is behind wb so it won't skip any events in wb
// 2. ensure wbswb started; nextrev == 0 may mean wbswb is waiting
// for a current watcher and expects a create event from the server.
if wb.nextrev >= wbswb.nextrev && wbswb.responses > 0 {
for w := range wb.receivers {
wbswb.receivers[w] = struct{}{}
wbs.watchers[w] = wbswb
}
wb.receivers = nil
}
wbswb.mu.Unlock()
wb.mu.Unlock()
if wb.empty() {
delete(wbs.bcasts, wb)
wb.stop()
break
}
}
wbs.mu.Unlock()
}
func (wbs *watchBroadcasts) add(w *watcher) {
wbs.mu.Lock()
defer wbs.mu.Unlock()
// find fitting bcast
for wb := range wbs.bcasts {
if wb.add(w) {
wbs.watchers[w] = wb
return
}
}
// no fit; create a bcast
wb := newWatchBroadcast(wbs.wp, w, wbs.update)
wbs.watchers[w] = wb
wbs.bcasts[wb] = struct{}{}
}
// delete removes a watcher and returns the number of remaining watchers.
func (wbs *watchBroadcasts) delete(w *watcher) int {
wbs.mu.Lock()
defer wbs.mu.Unlock()
wb, ok := wbs.watchers[w]
if !ok {
panic("deleting missing watcher from broadcasts")
}
delete(wbs.watchers, w)
wb.delete(w)
if wb.empty() {
delete(wbs.bcasts, wb)
wb.stop()
}
return len(wbs.bcasts)
}
func (wbs *watchBroadcasts) stop() {
wbs.mu.Lock()
for wb := range wbs.bcasts {
wb.stop()
}
wbs.bcasts = nil
close(wbs.updatec)
wbs.mu.Unlock()
<-wbs.donec
}
func (wbs *watchBroadcasts) update(wb *watchBroadcast) {
select {
case wbs.updatec <- wb:
default:
}
}
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
1
https://gitee.com/heliubei/etcd-package.git
git@gitee.com:heliubei/etcd-package.git
heliubei
etcd-package
etcd-package
v3.3.16

搜索帮助