1 Star 0 Fork 0

0x43/dubbo-go

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
克隆/下载
balancer_priority.go 12.77 KB
一键复制 编辑 原始数据 按行查看 历史
0x43 提交于 2024-12-28 00:42 . first commit
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/*
*
* Copyright 2021 gRPC authors.
*
*/
package priority
import (
"errors"
"time"
)
import (
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/balancer/base"
"google.golang.org/grpc/connectivity"
)
var (
// ErrAllPrioritiesRemoved is returned by the picker when there's no priority available.
ErrAllPrioritiesRemoved = errors.New("no priority is provided, all priorities are removed")
// DefaultPriorityInitTimeout is the timeout after which if a priority is
// not READY, the next will be started. It's exported to be overridden by
// tests.
DefaultPriorityInitTimeout = 10 * time.Second
)
// syncPriority handles priority after a config update. It makes sure the
// balancer state (started or not) is in sync with the priorities (even in
// tricky cases where a child is moved from a priority to another).
//
// It's guaranteed that after this function returns:
// - If some child is READY, it is childInUse, and all lower priorities are
// closed.
// - If some child is newly started(in Connecting for the first time), it is
// childInUse, and all lower priorities are closed.
// - Otherwise, the lowest priority is childInUse (none of the children is
// ready, and the overall state is not ready).
//
// Steps:
// - If all priorities were deleted, unset childInUse (to an empty string), and
// set parent ClientConn to TransientFailure
// - Otherwise, Scan all children from p0, and check balancer stats:
// - For any of the following cases:
// - If balancer is not started (not built), this is either a new child
// with high priority, or a new builder for an existing child.
// - If balancer is READY
// - If this is the lowest priority
// - do the following:
// - if this is not the old childInUse, override picker so old picker is no
// longer used.
// - switch to it (because all higher priorities are neither new or Ready)
// - forward the new addresses and config
//
// Caller must hold b.mu.
func (b *priorityBalancer) syncPriority() {
// Everything was removed by the update.
if len(b.priorities) == 0 {
b.childInUse = ""
b.priorityInUse = 0
// Stop the init timer. This can happen if the only priority is removed
// shortly after it's added.
b.stopPriorityInitTimer()
b.cc.UpdateState(balancer.State{
ConnectivityState: connectivity.TransientFailure,
Picker: base.NewErrPicker(ErrAllPrioritiesRemoved),
})
return
}
for p, name := range b.priorities {
child, ok := b.children[name]
if !ok {
b.logger.Errorf("child with name %q is not found in children", name)
continue
}
if !child.started ||
child.state.ConnectivityState == connectivity.Ready ||
child.state.ConnectivityState == connectivity.Idle ||
p == len(b.priorities)-1 {
if b.childInUse != "" && b.childInUse != child.name {
// childInUse was set and is different from this child, will
// change childInUse later. We need to update picker here
// immediately so parent stops using the old picker.
b.cc.UpdateState(child.state)
}
b.logger.Infof("switching to (%q, %v) in syncPriority", child.name, p)
b.switchToChild(child, p)
child.sendUpdate()
break
}
}
}
// Stop priorities [p+1, lowest].
//
// Caller must hold b.mu.
func (b *priorityBalancer) stopSubBalancersLowerThanPriority(p int) {
for i := p + 1; i < len(b.priorities); i++ {
name := b.priorities[i]
child, ok := b.children[name]
if !ok {
b.logger.Errorf("child with name %q is not found in children", name)
continue
}
child.stop()
}
}
// switchToChild does the following:
// - stop all child with lower priorities
// - if childInUse is not this child
// - set childInUse to this child
// - stops init timer
// - if this child is not started, start it, and start a init timer
//
// Note that it does NOT send the current child state (picker) to the parent
// ClientConn. The caller needs to send it if necessary.
//
// this can be called when
// 1. first update, start p0
// 2. an update moves a READY child from a lower priority to higher
// 2. a different builder is updated for this child
// 3. a high priority goes Failure, start next
// 4. a high priority init timeout, start next
//
// Caller must hold b.mu.
func (b *priorityBalancer) switchToChild(child *childBalancer, priority int) {
// Stop lower priorities even if childInUse is same as this child. It's
// possible this child was moved from a priority to another.
b.stopSubBalancersLowerThanPriority(priority)
// If this child is already in use, do nothing.
//
// This can happen:
// - all priorities are not READY, an config update always triggers switch
// to the lowest. In this case, the lowest child could still be connecting,
// so we don't stop the init timer.
// - a high priority is READY, an config update always triggers switch to
// it.
if b.childInUse == child.name && child.started {
return
}
b.childInUse = child.name
b.priorityInUse = priority
// Init timer is always for childInUse. Since we are switching to a
// different child, we will stop the init timer no matter what. If this
// child is not started, we will start the init timer later.
b.stopPriorityInitTimer()
if !child.started {
child.start()
// Need this local variable to capture timerW in the AfterFunc closure
// to check the stopped boolean.
timerW := &timerWrapper{}
b.priorityInitTimer = timerW
timerW.timer = time.AfterFunc(DefaultPriorityInitTimeout, func() {
b.mu.Lock()
defer b.mu.Unlock()
if timerW.stopped {
return
}
b.priorityInitTimer = nil
// Switch to the next priority if there's any.
if pNext := priority + 1; pNext < len(b.priorities) {
nameNext := b.priorities[pNext]
if childNext, ok := b.children[nameNext]; ok {
b.switchToChild(childNext, pNext)
childNext.sendUpdate()
}
}
})
}
}
// handleChildStateUpdate start/close priorities based on the connectivity
// state.
func (b *priorityBalancer) handleChildStateUpdate(childName string, s balancer.State) {
b.mu.Lock()
defer b.mu.Unlock()
if b.done.HasFired() {
return
}
priority, ok := b.childToPriority[childName]
if !ok {
b.logger.Errorf("priority: received picker update with unknown child %v", childName)
return
}
if b.childInUse == "" {
b.logger.Errorf("priority: no child is in use when picker update is received")
return
}
// priorityInUse is higher than this priority.
if b.priorityInUse < priority {
// Lower priorities should all be closed, this is an unexpected update.
// Can happen if the child policy sends an update after we tell it to
// close.
b.logger.Warnf("priority: received picker update from priority %v, lower than priority in use %v", priority, b.priorityInUse)
return
}
// Update state in child. The updated picker will be sent to parent later if
// necessary.
child, ok := b.children[childName]
if !ok {
b.logger.Errorf("priority: child balancer not found for child %v, priority %v", childName, priority)
return
}
oldState := child.state.ConnectivityState
child.state = s
switch s.ConnectivityState {
case connectivity.Ready, connectivity.Idle:
// Note that idle is also handled as if it's Ready. It will close the
// lower priorities (which will be kept in a cache, not deleted), and
// new picks will use the Idle picker.
b.handlePriorityWithNewStateReady(child, priority)
case connectivity.TransientFailure:
b.handlePriorityWithNewStateTransientFailure(child, priority)
case connectivity.Connecting:
b.handlePriorityWithNewStateConnecting(child, priority, oldState)
default:
// New state is Shutdown, should never happen. Don't forward.
}
}
// handlePriorityWithNewStateReady handles state Ready from a higher or equal
// priority.
//
// An update with state Ready:
// - If it's from higher priority:
// - Switch to this priority
// - Forward the update
//
// - If it's from priorityInUse:
// - Forward only
//
// Caller must make sure priorityInUse is not higher than priority.
//
// Caller must hold mu.
func (b *priorityBalancer) handlePriorityWithNewStateReady(child *childBalancer, priority int) {
// If one priority higher or equal to priorityInUse goes Ready, stop the
// init timer. If update is from higher than priorityInUse, priorityInUse
// will be closed, and the init timer will become useless.
b.stopPriorityInitTimer()
// priorityInUse is lower than this priority, switch to this.
if b.priorityInUse > priority {
b.logger.Infof("Switching priority from %v to %v, because latter became Ready", b.priorityInUse, priority)
b.switchToChild(child, priority)
}
// Forward the update since it's READY.
b.cc.UpdateState(child.state)
}
// handlePriorityWithNewStateTransientFailure handles state TransientFailure
// from a higher or equal priority.
//
// An update with state TransientFailure:
// - If it's from a higher priority:
// - Do not forward, and do nothing
//
// - If it's from priorityInUse:
// - If there's no lower:
// - Forward and do nothing else
// - If there's a lower priority:
// - Switch to the lower
// - Forward the lower child's state
// - Do NOT forward this update
//
// Caller must make sure priorityInUse is not higher than priority.
//
// Caller must hold mu.
func (b *priorityBalancer) handlePriorityWithNewStateTransientFailure(child *childBalancer, priority int) {
// priorityInUse is lower than this priority, do nothing.
if b.priorityInUse > priority {
return
}
// priorityInUse sends a failure. Stop its init timer.
b.stopPriorityInitTimer()
priorityNext := priority + 1
if priorityNext >= len(b.priorities) {
// Forward this update.
b.cc.UpdateState(child.state)
return
}
b.logger.Infof("Switching priority from %v to %v, because former became TransientFailure", priority, priorityNext)
nameNext := b.priorities[priorityNext]
childNext := b.children[nameNext]
b.switchToChild(childNext, priorityNext)
b.cc.UpdateState(childNext.state)
childNext.sendUpdate()
}
// handlePriorityWithNewStateConnecting handles state Connecting from a higher
// than or equal priority.
//
// An update with state Connecting:
// - If it's from a higher priority
// - Do nothing
//
// - If it's from priorityInUse, the behavior depends on previous state.
//
// When new state is Connecting, the behavior depends on previous state. If the
// previous state was Ready, this is a transition out from Ready to Connecting.
// Assuming there are multiple backends in the same priority, this mean we are
// in a bad situation and we should failover to the next priority (Side note:
// the current connectivity state aggregating algorithm (e.g. round-robin) is
// not handling this right, because if many backends all go from Ready to
// Connecting, the overall situation is more like TransientFailure, not
// Connecting).
//
// If the previous state was Idle, we don't do anything special with failure,
// and simply forward the update. The init timer should be in process, will
// handle failover if it timeouts. If the previous state was TransientFailure,
// we do not forward, because the lower priority is in use.
//
// Caller must make sure priorityInUse is not higher than priority.
//
// Caller must hold mu.
func (b *priorityBalancer) handlePriorityWithNewStateConnecting(child *childBalancer, priority int, oldState connectivity.State) {
// priorityInUse is lower than this priority, do nothing.
if b.priorityInUse > priority {
return
}
switch oldState {
case connectivity.Ready:
// Handling transition from Ready to Connecting, is same as handling
// TransientFailure. There's no need to stop the init timer, because it
// should have been stopped when state turned Ready.
priorityNext := priority + 1
if priorityNext >= len(b.priorities) {
// Forward this update.
b.cc.UpdateState(child.state)
return
}
b.logger.Infof("Switching priority from %v to %v, because former became TransientFailure", priority, priorityNext)
nameNext := b.priorities[priorityNext]
childNext := b.children[nameNext]
b.switchToChild(childNext, priorityNext)
b.cc.UpdateState(childNext.state)
childNext.sendUpdate()
case connectivity.Idle:
b.cc.UpdateState(child.state)
default:
// Old state is Connecting, TransientFailure or Shutdown. Don't forward.
}
}
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
1
https://gitee.com/git4chen/dubbo-go.git
git@gitee.com:git4chen/dubbo-go.git
git4chen
dubbo-go
dubbo-go
v1.0.0

搜索帮助