5 Star 12 Fork 2

Gitee 极速下载 / grpc-go

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
此仓库是为了提升国内下载速度的镜像仓库,每日同步一次。 原始仓库: https://github.com/grpc/grpc-go
克隆/下载
balancer_conn_wrappers.go 6.38 KB
一键复制 编辑 原始数据 按行查看 历史
Menghan Li 提交于 2017-11-13 13:44 . Eliminate race on ac.acbw (#1665)
/*
*
* Copyright 2017 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package grpc
import (
"sync"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/resolver"
)
// scStateUpdate contains the subConn and the new state it changed to.
type scStateUpdate struct {
sc balancer.SubConn
state connectivity.State
}
// scStateUpdateBuffer is an unbounded channel for scStateChangeTuple.
// TODO make a general purpose buffer that uses interface{}.
type scStateUpdateBuffer struct {
c chan *scStateUpdate
mu sync.Mutex
backlog []*scStateUpdate
}
func newSCStateUpdateBuffer() *scStateUpdateBuffer {
return &scStateUpdateBuffer{
c: make(chan *scStateUpdate, 1),
}
}
func (b *scStateUpdateBuffer) put(t *scStateUpdate) {
b.mu.Lock()
defer b.mu.Unlock()
if len(b.backlog) == 0 {
select {
case b.c <- t:
return
default:
}
}
b.backlog = append(b.backlog, t)
}
func (b *scStateUpdateBuffer) load() {
b.mu.Lock()
defer b.mu.Unlock()
if len(b.backlog) > 0 {
select {
case b.c <- b.backlog[0]:
b.backlog[0] = nil
b.backlog = b.backlog[1:]
default:
}
}
}
// get returns the channel that receives a recvMsg in the buffer.
//
// Upon receiving, the caller should call load to send another
// scStateChangeTuple onto the channel if there is any.
func (b *scStateUpdateBuffer) get() <-chan *scStateUpdate {
return b.c
}
// resolverUpdate contains the new resolved addresses or error if there's
// any.
type resolverUpdate struct {
addrs []resolver.Address
err error
}
// ccBalancerWrapper is a wrapper on top of cc for balancers.
// It implements balancer.ClientConn interface.
type ccBalancerWrapper struct {
cc *ClientConn
balancer balancer.Balancer
stateChangeQueue *scStateUpdateBuffer
resolverUpdateCh chan *resolverUpdate
done chan struct{}
}
func newCCBalancerWrapper(cc *ClientConn, b balancer.Builder, bopts balancer.BuildOptions) *ccBalancerWrapper {
ccb := &ccBalancerWrapper{
cc: cc,
stateChangeQueue: newSCStateUpdateBuffer(),
resolverUpdateCh: make(chan *resolverUpdate, 1),
done: make(chan struct{}),
}
go ccb.watcher()
ccb.balancer = b.Build(ccb, bopts)
return ccb
}
// watcher balancer functions sequencially, so the balancer can be implemeneted
// lock-free.
func (ccb *ccBalancerWrapper) watcher() {
for {
select {
case t := <-ccb.stateChangeQueue.get():
ccb.stateChangeQueue.load()
ccb.balancer.HandleSubConnStateChange(t.sc, t.state)
case t := <-ccb.resolverUpdateCh:
ccb.balancer.HandleResolvedAddrs(t.addrs, t.err)
case <-ccb.done:
}
select {
case <-ccb.done:
ccb.balancer.Close()
return
default:
}
}
}
func (ccb *ccBalancerWrapper) close() {
close(ccb.done)
}
func (ccb *ccBalancerWrapper) handleSubConnStateChange(sc balancer.SubConn, s connectivity.State) {
// When updating addresses for a SubConn, if the address in use is not in
// the new addresses, the old ac will be tearDown() and a new ac will be
// created. tearDown() generates a state change with Shutdown state, we
// don't want the balancer to receive this state change. So before
// tearDown() on the old ac, ac.acbw (acWrapper) will be set to nil, and
// this function will be called with (nil, Shutdown). We don't need to call
// balancer method in this case.
if sc == nil {
return
}
ccb.stateChangeQueue.put(&scStateUpdate{
sc: sc,
state: s,
})
}
func (ccb *ccBalancerWrapper) handleResolvedAddrs(addrs []resolver.Address, err error) {
select {
case <-ccb.resolverUpdateCh:
default:
}
ccb.resolverUpdateCh <- &resolverUpdate{
addrs: addrs,
err: err,
}
}
func (ccb *ccBalancerWrapper) NewSubConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (balancer.SubConn, error) {
grpclog.Infof("ccBalancerWrapper: new subconn: %v", addrs)
ac, err := ccb.cc.newAddrConn(addrs)
if err != nil {
return nil, err
}
acbw := &acBalancerWrapper{ac: ac}
ac.mu.Lock()
ac.acbw = acbw
ac.mu.Unlock()
return acbw, nil
}
func (ccb *ccBalancerWrapper) RemoveSubConn(sc balancer.SubConn) {
grpclog.Infof("ccBalancerWrapper: removing subconn")
acbw, ok := sc.(*acBalancerWrapper)
if !ok {
return
}
ccb.cc.removeAddrConn(acbw.getAddrConn(), errConnDrain)
}
func (ccb *ccBalancerWrapper) UpdateBalancerState(s connectivity.State, p balancer.Picker) {
grpclog.Infof("ccBalancerWrapper: updating state and picker called by balancer: %v, %p", s, p)
ccb.cc.csMgr.updateState(s)
ccb.cc.blockingpicker.updatePicker(p)
}
func (ccb *ccBalancerWrapper) Target() string {
return ccb.cc.target
}
// acBalancerWrapper is a wrapper on top of ac for balancers.
// It implements balancer.SubConn interface.
type acBalancerWrapper struct {
mu sync.Mutex
ac *addrConn
}
func (acbw *acBalancerWrapper) UpdateAddresses(addrs []resolver.Address) {
grpclog.Infof("acBalancerWrapper: UpdateAddresses called with %v", addrs)
acbw.mu.Lock()
defer acbw.mu.Unlock()
if !acbw.ac.tryUpdateAddrs(addrs) {
cc := acbw.ac.cc
acbw.ac.mu.Lock()
// Set old ac.acbw to nil so the Shutdown state update will be ignored
// by balancer.
//
// TODO(bar) the state transition could be wrong when tearDown() old ac
// and creating new ac, fix the transition.
acbw.ac.acbw = nil
acbw.ac.mu.Unlock()
acState := acbw.ac.getState()
acbw.ac.tearDown(errConnDrain)
if acState == connectivity.Shutdown {
return
}
ac, err := cc.newAddrConn(addrs)
if err != nil {
grpclog.Warningf("acBalancerWrapper: UpdateAddresses: failed to newAddrConn: %v", err)
return
}
acbw.ac = ac
ac.mu.Lock()
ac.acbw = acbw
ac.mu.Unlock()
if acState != connectivity.Idle {
ac.connect(false)
}
}
}
func (acbw *acBalancerWrapper) Connect() {
acbw.mu.Lock()
defer acbw.mu.Unlock()
acbw.ac.connect(false)
}
func (acbw *acBalancerWrapper) getAddrConn() *addrConn {
acbw.mu.Lock()
defer acbw.mu.Unlock()
return acbw.ac
}
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
1
https://gitee.com/mirrors/grpc-go.git
git@gitee.com:mirrors/grpc-go.git
mirrors
grpc-go
grpc-go
v1.7.5

搜索帮助

344bd9b3 5694891 D2dac590 5694891