代码拉取完成,页面将自动刷新
// Copyright 2017 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package owner
import (
"fmt"
"math"
"os"
"strconv"
"sync/atomic"
"time"
"github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/clientv3/concurrency"
"github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
"github.com/coreos/etcd/mvcc/mvccpb"
"github.com/juju/errors"
"github.com/pingcap/tidb/metrics"
"github.com/pingcap/tidb/terror"
"github.com/pingcap/tidb/util"
log "github.com/sirupsen/logrus"
"golang.org/x/net/context"
"google.golang.org/grpc"
)
const (
	// newSessionRetryInterval is the wait between two consecutive failed
	// attempts to create an etcd session in NewSession.
	newSessionRetryInterval = 200 * time.Millisecond
	// logIntervalCnt is the number of failed attempts between two warning logs
	// in NewSession (3s / 200ms = 15), i.e. roughly one warning every 3 seconds
	// of continuous retrying.
	logIntervalCnt = int(3 * time.Second / newSessionRetryInterval)
)
// Manager is used to campaign the owner and manage the owner information.
type Manager interface {
	// ID returns the ID this manager campaigns with.
	ID() string
	// IsOwner reports whether this manager currently considers itself the owner.
	IsOwner() bool
	// SetOwner records whether this manager is the owner.
	SetOwner(isOwner bool)
	// GetOwnerID gets the ID of the current owner.
	GetOwnerID(ctx context.Context) (string, error)
	// CampaignOwner starts campaigning to become the owner. It returns an error
	// if the campaign could not be started.
	CampaignOwner(ctx context.Context) error
	// Cancel cancels this manager's campaign.
	Cancel()
}
const (
	// NewSessionDefaultRetryCnt is the default number of attempts made when
	// creating a new session.
	NewSessionDefaultRetryCnt = 3
	// NewSessionRetryUnlimited is the retry count used to retry session
	// creation (effectively) without limit.
	// NOTE(review): math.MaxInt64 only fits in an int on 64-bit platforms;
	// passing it as an int argument does not compile for 32-bit targets.
	NewSessionRetryUnlimited = math.MaxInt64
)
// ownerManager represents the structure which is used for electing owner.
// It is the etcd-backed implementation of Manager.
type ownerManager struct {
	// owner is 1 while this manager considers itself the owner and 0 otherwise.
	// In this file it is only accessed through sync/atomic (IsOwner/SetOwner).
	owner int32
	// id is the ID of the manager; it is the value campaigned with.
	id string
	// key is the etcd election prefix campaigned on and read by GetOwnerID.
	key string
	// prompt labels log prefixes and metrics.
	prompt string
	// etcdCli is the client used for sessions, elections, watches and revokes.
	etcdCli *clientv3.Client
	// cancel is invoked by Cancel.
	cancel context.CancelFunc
}
// NewOwnerManager creates a new Manager that campaigns for key through etcdCli.
// prompt labels the manager's logs and metrics, id is the value it campaigns
// with, and cancel is the function invoked by the returned Manager's Cancel.
func NewOwnerManager(etcdCli *clientv3.Client, prompt, id, key string, cancel context.CancelFunc) Manager {
	mgr := &ownerManager{id: id, key: key, prompt: prompt}
	mgr.etcdCli = etcdCli
	mgr.cancel = cancel
	return mgr
}
// ID implements Manager.ID interface. It returns the id given to NewOwnerManager.
func (m *ownerManager) ID() string {
	return m.id
}

// IsOwner implements Manager.IsOwner interface.
// The owner flag is read with sync/atomic, pairing with SetOwner.
func (m *ownerManager) IsOwner() bool {
	const ownerFlag int32 = 1
	return atomic.LoadInt32(&m.owner) == ownerFlag
}

// SetOwner implements Manager.SetOwner interface.
// It atomically stores 1 when isOwner is true and 0 otherwise.
func (m *ownerManager) SetOwner(isOwner bool) {
	var flag int32
	if isOwner {
		flag = 1
	}
	atomic.StoreInt32(&m.owner, flag)
}

// Cancel implements Manager.Cancel interface by invoking the cancel function
// that was passed to NewOwnerManager.
func (m *ownerManager) Cancel() {
	cancelFn := m.cancel
	cancelFn()
}
// ManagerSessionTTL is the etcd session's TTL in seconds. It's exported for testing.
// It defaults to 60 and may be overridden at init time from the
// tidb_manager_ttl environment variable (see setManagerSessionTTL). The same
// number of seconds is also used as the timeout when revoking a session lease.
var ManagerSessionTTL = 60
// setManagerSessionTTL sets the ManagerSessionTTL value from the
// tidb_manager_ttl environment variable; it's used for testing.
//
// An unset or empty variable leaves ManagerSessionTTL untouched. A value that
// is not an integer, or is not positive, is rejected with an error and
// ManagerSessionTTL keeps its previous value. A non-positive TTL would also be
// used as a zero/negative timeout for lease revocation, which expires at once.
func setManagerSessionTTL() error {
	ttlStr := os.Getenv("tidb_manager_ttl")
	if len(ttlStr) == 0 {
		return nil
	}
	ttl, err := strconv.Atoi(ttlStr)
	if err != nil {
		return errors.Trace(err)
	}
	if ttl <= 0 {
		return errors.Errorf("invalid tidb_manager_ttl %d, it must be positive", ttl)
	}
	ManagerSessionTTL = ttl
	return nil
}
// NewSession creates a new etcd session whose lease has the given ttl (seconds).
//
// It makes up to retryCnt attempts, waiting newSessionRetryInterval between
// consecutive failures and logging a warning once every logIntervalCnt
// failures. Before each attempt it consults contextDone, so it gives up early
// (returning that error) when ctx is done or when the previous attempt's error
// indicates the context or the client connection is closing. The wait between
// attempts also aborts as soon as ctx is done, and no wait happens after the
// final attempt. A non-positive retryCnt is rejected with an error instead of
// returning a nil session together with a nil error.
func NewSession(ctx context.Context, logPrefix string, etcdCli *clientv3.Client, retryCnt, ttl int) (*concurrency.Session, error) {
	if retryCnt <= 0 {
		return nil, errors.Errorf("%s invalid retry count %d for creating etcd session", logPrefix, retryCnt)
	}
	var err error
	var etcdSession *concurrency.Session
	failedCnt := 0
	for i := 0; i < retryCnt; i++ {
		if err = contextDone(ctx, err); err != nil {
			return etcdSession, errors.Trace(err)
		}

		// gofail: var closeClient bool
		// if closeClient {
		// etcdCli.Close()
		// }

		// gofail: var closeGrpc bool
		// if closeGrpc {
		// etcdCli.ActiveConnection().Close()
		// }

		startTime := time.Now()
		etcdSession, err = concurrency.NewSession(etcdCli,
			concurrency.WithTTL(ttl), concurrency.WithContext(ctx))
		metrics.NewSessionHistogram.WithLabelValues(logPrefix, metrics.RetLabel(err)).Observe(time.Since(startTime).Seconds())
		if err == nil {
			break
		}
		if failedCnt%logIntervalCnt == 0 {
			log.Warnf("%s failed to new session to etcd, err %v", logPrefix, err)
		}
		failedCnt++

		// There is no point in waiting after the final attempt.
		if i == retryCnt-1 {
			break
		}
		// Wait before retrying, but stop immediately if ctx is done.
		select {
		case <-ctx.Done():
			return etcdSession, errors.Trace(ctx.Err())
		case <-time.After(newSessionRetryInterval):
		}
	}
	return etcdSession, errors.Trace(err)
}
// CampaignOwner implements Manager.CampaignOwner interface.
//
// It creates an etcd session (making up to NewSessionDefaultRetryCnt attempts)
// and then runs the campaign loop in a background goroutine that stops when
// ctx is done. It returns an error only if the session cannot be created.
func (m *ownerManager) CampaignOwner(ctx context.Context) error {
	logPrefix := fmt.Sprintf("[%s] %s", m.prompt, m.key)
	session, err := NewSession(ctx, logPrefix, m.etcdCli, NewSessionDefaultRetryCnt, ManagerSessionTTL)
	if err != nil {
		return errors.Trace(err)
	}
	// The loop is bound directly to ctx. Deriving a context with
	// context.WithCancel and discarding its cancel func (as flagged by
	// go vet's lostcancel check) would only leak the derived context, since
	// nothing could ever cancel it independently of ctx.
	go m.campaignLoop(ctx, session)
	return nil
}
// campaignLoop repeatedly campaigns for ownership of m.key with etcdSession
// until ctx is done or a replacement session cannot be created.
//
// Each iteration:
//   - counts the previous iteration's error (if any) in CampaignOwnerCounter;
//   - if etcdSession is done, creates a new one (retrying without limit) and,
//     if that fails, revokes the old lease and exits;
//   - if ctx is done, revokes the current lease and exits;
//   - on ErrLeaseNotFound, closes the session (presumably so that a later
//     iteration observes it as done and replaces it — confirm with etcd docs);
//   - otherwise campaigns, checks that this manager is the leader and, while
//     watchOwner runs, marks itself as the owner.
//
// A panic inside the loop is recovered, logged and counted instead of
// propagating out of the goroutine.
func (m *ownerManager) campaignLoop(ctx context.Context, etcdSession *concurrency.Session) {
	defer func() {
		if r := recover(); r != nil {
			buf := util.GetStack()
			log.Errorf("[%s] recover panic:%v, %s", m.prompt, r, buf)
			metrics.PanicCounter.WithLabelValues(metrics.LabelDDLOwner).Inc()
		}
	}()

	logPrefix := fmt.Sprintf("[%s] %s ownerManager %s", m.prompt, m.key, m.id)
	var err error
	for {
		if err != nil {
			metrics.CampaignOwnerCounter.WithLabelValues(m.prompt, err.Error()).Inc()
		}

		select {
		case <-etcdSession.Done():
			log.Infof("%s etcd session is done, creates a new one", logPrefix)
			leaseID := etcdSession.Lease()
			etcdSession, err = NewSession(ctx, logPrefix, m.etcdCli, NewSessionRetryUnlimited, ManagerSessionTTL)
			if err != nil {
				log.Infof("%s break campaign loop, NewSession err %v", logPrefix, err)
				m.revokeSession(logPrefix, leaseID)
				return
			}
		case <-ctx.Done():
			m.revokeSession(logPrefix, etcdSession.Lease())
			return
		default:
		}

		// If the etcd server turns clocks forward, the following case may occur.
		// The etcd server deletes this session's lease ID, but etcd session doesn't find it.
		// In this time if we do the campaign operation, the etcd server will return ErrLeaseNotFound.
		if terror.ErrorEqual(err, rpctypes.ErrLeaseNotFound) {
			if etcdSession != nil {
				err = etcdSession.Close()
				log.Infof("%s etcd session encounters the error of lease not found, closes it err %s", logPrefix, err)
			}
			continue
		}

		elec := concurrency.NewElection(etcdSession, m.key)
		err = elec.Campaign(ctx, m.id)
		if err != nil {
			log.Infof("%s failed to campaign, err %v", logPrefix, err)
			continue
		}

		// Assign with "=" (not ":=") so the loop-wide err is updated. With ":="
		// a new err would shadow it and GetOwnerInfo failures would never be
		// counted by CampaignOwnerCounter at the top of the next iteration.
		var ownerKey string
		ownerKey, err = GetOwnerInfo(ctx, elec, logPrefix, m.id)
		if err != nil {
			continue
		}

		m.SetOwner(true)
		m.watchOwner(ctx, etcdSession, ownerKey)
		m.SetOwner(false)

		metrics.CampaignOwnerCounter.WithLabelValues(m.prompt, metrics.NoLongerOwner).Inc()
		log.Warnf("%s isn't the owner", logPrefix)
	}
}
// revokeSession revokes leaseID on a best-effort basis and logs the outcome.
// The revoke call is bounded by ManagerSessionTTL seconds; per the original
// rationale, if revoking takes longer than the TTL the lease expires anyway.
func (m *ownerManager) revokeSession(logPrefix string, leaseID clientv3.LeaseID) {
	timeout := time.Duration(ManagerSessionTTL) * time.Second
	revokeCtx, cancelRevoke := context.WithTimeout(context.Background(), timeout)
	_, revokeErr := m.etcdCli.Revoke(revokeCtx, leaseID)
	cancelRevoke()
	log.Infof("%s break campaign loop, revoke err %v", logPrefix, revokeErr)
}
// GetOwnerID implements Manager.GetOwnerID interface.
//
// It reads m.key with clientv3.WithFirstCreate (the key with the oldest
// creation revision in the range) and returns that key's value. When nothing
// is found it returns concurrency.ErrElectionNoLeader.
func (m *ownerManager) GetOwnerID(ctx context.Context) (string, error) {
	opts := clientv3.WithFirstCreate()
	resp, err := m.etcdCli.Get(ctx, m.key, opts...)
	switch {
	case err != nil:
		return "", errors.Trace(err)
	case len(resp.Kvs) == 0:
		return "", concurrency.ErrElectionNoLeader
	}
	first := resp.Kvs[0]
	return string(first.Value), nil
}
// GetOwnerInfo gets the owner information.
//
// It asks elec for the current leader and returns the leader's etcd key when
// the leader's value equals id. If the leader cannot be read, the (traced)
// error from elec.Leader is returned; if another manager is the leader, an
// "ownerInfoNotMatch" error is returned.
func GetOwnerInfo(ctx context.Context, elec *concurrency.Election, logPrefix, id string) (string, error) {
	resp, err := elec.Leader(ctx)
	if err != nil {
		// If no leader elected currently, it returns ErrElectionNoLeader.
		log.Infof("%s failed to get leader, err %v", logPrefix, err)
		return "", errors.Trace(err)
	}

	leader := resp.Kvs[0]
	ownerID := string(leader.Value)
	log.Infof("%s, owner is %v", logPrefix, ownerID)
	if ownerID == id {
		return string(leader.Key), nil
	}

	log.Warnf("%s isn't the owner", logPrefix)
	return "", errors.New("ownerInfoNotMatch")
}
// watchOwner watches the owner key and blocks until one of the following
// happens, counting the reason in WatchOwnerCounter:
//   - the watch channel is closed or the watch is canceled;
//   - a DELETE event is received for key;
//   - etcdSession is done;
//   - ctx is done.
//
// The watch runs on a context derived from ctx that is canceled when this
// function returns. The etcd client keeps a watcher's resources until its
// context is canceled. Binding the watch directly to the long-lived ctx would
// leak one watcher every time ownership is lost while ctx is still alive.
func (m *ownerManager) watchOwner(ctx context.Context, etcdSession *concurrency.Session, key string) {
	logPrefix := fmt.Sprintf("[%s] ownerManager %s watch owner key %v", m.prompt, m.id, key)
	log.Debugf("%s", logPrefix)

	watchCtx, cancelWatch := context.WithCancel(ctx)
	defer cancelWatch()
	watchCh := m.etcdCli.Watch(watchCtx, key)
	for {
		select {
		case resp, ok := <-watchCh:
			if !ok {
				metrics.WatchOwnerCounter.WithLabelValues(m.prompt, metrics.WatcherClosed).Inc()
				log.Infof("%s watcher is closed, no owner", logPrefix)
				return
			}
			if resp.Canceled {
				metrics.WatchOwnerCounter.WithLabelValues(m.prompt, metrics.Cancelled).Inc()
				log.Infof("%s canceled, no owner", logPrefix)
				return
			}
			for _, ev := range resp.Events {
				if ev.Type == mvccpb.DELETE {
					metrics.WatchOwnerCounter.WithLabelValues(m.prompt, metrics.Deleted).Inc()
					log.Infof("%s failed, owner is deleted", logPrefix)
					return
				}
			}
		case <-etcdSession.Done():
			metrics.WatchOwnerCounter.WithLabelValues(m.prompt, metrics.SessionDone).Inc()
			return
		case <-ctx.Done():
			metrics.WatchOwnerCounter.WithLabelValues(m.prompt, metrics.CtxDone).Inc()
			return
		}
	}
}
// init applies the tidb_manager_ttl override, logging (but otherwise ignoring)
// any error so that package initialization never fails because of it.
func init() {
	if ttlErr := setManagerSessionTTL(); ttlErr != nil {
		log.Warnf("set manager session TTL failed %v", ttlErr)
	}
}
// contextDone reports whether the caller should stop working.
//
// It returns the (traced) ctx error when ctx is done. Otherwise it returns err
// (traced) when err is context.Canceled, context.DeadlineExceeded or
// grpc.ErrClientConnClosing, and nil for any other err, including nil.
func contextDone(ctx context.Context, err error) error {
	select {
	case <-ctx.Done():
		return errors.Trace(ctx.Err())
	default:
	}

	// Sometimes the ctx isn't closed although the etcd client already is; such
	// errors are treated as if the context were done.
	// TODO: Make sure ctx is closed with etcd client.
	switch {
	case terror.ErrorEqual(err, context.Canceled),
		terror.ErrorEqual(err, context.DeadlineExceeded),
		terror.ErrorEqual(err, grpc.ErrClientConnClosing):
		return errors.Trace(err)
	default:
		return nil
	}
}
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。