1 Star 0 Fork 0

h79/goutils

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
resolver_conn_wrapper.go 2.77 KB
一键复制 编辑 原始数据 按行查看 历史
package client
import (
"gitee.com/h79/goutils/common/system"
"gitee.com/h79/goutils/thrift/resolver"
"sync"
"time"
)
// ccResolverWrapper wraps a resolver.Resolver on behalf of a Client. It is
// handed to the resolver builder as the resolver's callback target
// (UpdateState, ReportError, NewAddress) and forwards those callbacks to the
// Client, while also driving an optional background re-resolution loop.
type ccResolverWrapper struct {
// cc is the owning Client; resolver callbacks are forwarded to it.
cc *Client
// resolverMu guards resolver; it is held while the resolver is being built,
// re-resolved and closed.
resolverMu sync.Mutex
// resolver is the resolver produced by the builder in newCCResolverWrapper.
resolver resolver.Resolver
// curState is the most recent state stored by UpdateState / NewAddress.
curState resolver.State
// done is fired by close; callbacks check it and become no-ops afterwards.
done *system.Event
// pollingMu guards polling.
pollingMu sync.Mutex
// polling is non-nil while a polling loop is active; closing it stops the loop.
polling chan struct{}
}
// newCCResolverWrapper creates a ccResolverWrapper bound to cc and asks rb to
// build the underlying resolver for cc.parsedTarget, passing the wrapper
// itself to rb.Build. It returns the wrapper on success, or a nil wrapper
// together with the build error on failure.
func newCCResolverWrapper(cc *Client, rb resolver.Builder) (*ccResolverWrapper, error) {
	wrapper := &ccResolverWrapper{
		cc:   cc,
		done: system.NewEvent(),
	}

	// resolverMu stays locked for the whole build. The builder may call back
	// into the wrapper before Build returns (Build -> ReportError -> poll ->
	// resolveNow), and resolveNow reads wrapper.resolver; holding the lock
	// keeps that read from racing with the assignment below.
	wrapper.resolverMu.Lock()
	defer wrapper.resolverMu.Unlock()

	var err error
	if wrapper.resolver, err = rb.Build(cc.parsedTarget, wrapper); err != nil {
		return nil, err
	}
	return wrapper, nil
}
// resolveNow asks the wrapped resolver to re-resolve, unless the wrapper has
// already been closed (done has fired), in which case it does nothing.
//
// resolverMu is held for the duration of the call. It is released through
// defer so that a panic inside the resolver's ResolveNow cannot leave the
// mutex locked and block every later resolveNow/close call.
func (ccr *ccResolverWrapper) resolveNow() {
	ccr.resolverMu.Lock()
	defer ccr.resolverMu.Unlock()

	if ccr.done.HasFired() {
		return
	}
	ccr.resolver.ResolveNow()
}
// close shuts down the wrapped resolver and then fires done, after which the
// resolver callbacks and resolveNow become no-ops.
//
// resolverMu is held across both steps. It is released through defer so that
// a panic inside the resolver's Close cannot leave the mutex locked.
func (ccr *ccResolverWrapper) close() {
	ccr.resolverMu.Lock()
	defer ccr.resolverMu.Unlock()

	ccr.resolver.Close()
	ccr.done.Fire()
}
// stopPolling ends the active polling loop, if there is one, by closing the
// polling channel and clearing the field. It takes no lock itself; poll calls
// it while holding pollingMu.
func (ccr *ccResolverWrapper) stopPolling() {
	if ccr.polling == nil {
		// No loop is running; nothing to stop.
		return
	}
	close(ccr.polling)
	ccr.polling = nil
}
// poll starts or stops the background polling loop depending on err.
//
// A non-nil err stops any active loop. A nil err starts a loop unless one is
// already running. The loop body is handed to system.ChildRunning; on
// iteration i it calls resolveNow and then waits 2+5*i seconds (2s, 7s,
// 12s, ...) before the next iteration. The loop exits when its polling
// channel is closed, when the wrapper's done event fires, or when
// system.Closed() becomes readable.
//
// NOTE(review): an earlier comment described this as keyed on
// ErrBadResolverState, but the code only distinguishes nil from non-nil.
// Polling therefore starts on a nil error and stops on a non-nil one —
// confirm this polarity is intended.
func (ccr *ccResolverWrapper) poll(err error) {
	ccr.pollingMu.Lock()
	defer ccr.pollingMu.Unlock()

	if err != nil {
		ccr.stopPolling()
		return
	}
	if ccr.polling != nil {
		// already polling
		return
	}

	// Keep a local reference to the channel: stopPolling sets ccr.polling to
	// nil, and the loop must keep watching the channel it was started with.
	p := make(chan struct{})
	ccr.polling = p
	system.ChildRunning(func() {
		for i := 0; ; i++ {
			ccr.resolveNow()
			t := time.NewTimer(time.Second * time.Duration(2+i*5))
			select {
			case <-p:
				// Polling was stopped.
				t.Stop()
				return
			case <-ccr.done.Done():
				// Resolver has been closed.
				t.Stop()
				return
			case <-system.Closed():
				// Stop the timer here too, matching the other early exits.
				t.Stop()
				return
			case <-t.C:
				// Timer expired. Re-check the stop channel first so a stop
				// that raced with the timer does not trigger another resolve.
				select {
				case <-p:
					return
				default:
				}
				// Fall through to the next iteration and re-resolve.
			}
		}
	})
}
// UpdateState records s as the wrapper's current state, forwards it to the
// Client via updateResolverState, and passes the resulting error to poll.
// It is a no-op once the wrapper has been closed.
func (ccr *ccResolverWrapper) UpdateState(s resolver.State) {
	if closed := ccr.done.HasFired(); closed {
		return
	}
	ccr.curState = s
	updateErr := ccr.cc.updateResolverState(ccr.curState, nil)
	ccr.poll(updateErr)
}
// ReportError forwards err to the Client via updateResolverState together
// with an empty resolver.State, and passes the resulting error to poll.
// It is a no-op once the wrapper has been closed.
func (ccr *ccResolverWrapper) ReportError(err error) {
	if !ccr.done.HasFired() {
		var empty resolver.State
		ccr.poll(ccr.cc.updateResolverState(empty, err))
	}
}
// NewAddress replaces the address list of the wrapper's current state with
// adders, forwards the updated state to the Client via updateResolverState,
// and passes the resulting error to poll. Other fields of the current state
// are left as they were. It is a no-op once the wrapper has been closed.
func (ccr *ccResolverWrapper) NewAddress(adders []resolver.Address) {
	if closed := ccr.done.HasFired(); closed {
		return
	}
	ccr.curState.Addresses = adders
	updateErr := ccr.cc.updateResolverState(ccr.curState, nil)
	ccr.poll(updateErr)
}
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
Go
1
https://gitee.com/h79/goutils.git
git@gitee.com:h79/goutils.git
h79
goutils
goutils
v1.21.16

搜索帮助