1 Star 0 Fork 0

zhuchance / kubernetes

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
scheduler.go 43.73 KB
一键复制 编辑 原始数据 按行查看 历史
12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package scheduler
import (
"errors"
"fmt"
"math"
"math/rand"
"net"
"os/user"
"sync"
"sync/atomic"
"time"
"github.com/gogo/protobuf/proto"
log "github.com/golang/glog"
"github.com/mesos/mesos-go/auth"
"github.com/mesos/mesos-go/detector"
mesos "github.com/mesos/mesos-go/mesosproto"
util "github.com/mesos/mesos-go/mesosutil"
"github.com/mesos/mesos-go/mesosutil/process"
"github.com/mesos/mesos-go/messenger"
"github.com/mesos/mesos-go/upid"
"github.com/pborman/uuid"
"golang.org/x/net/context"
)
// Timing parameters for authentication and framework registration.
const (
	// authTimeout is the timeout interval for a single authentication attempt.
	authTimeout = time.Second * 5
	// registrationRetryIntervalMax caps the registration backoff; it is a float64
	// count of nanoseconds because the backoff is computed with math.Min.
	registrationRetryIntervalMax = float64(time.Minute)
	// registrationBackoffFactor is the backoff bound handed to the first
	// registration attempt.
	registrationBackoffFactor = time.Second * 2
)
// authenticationCanceledError is the error reported for an authentication
// attempt that was canceled before the login completed.
var authenticationCanceledError = errors.New("authentication canceled")
// ErrDriverAborted is an error type with an optional, human readable Reason.
type ErrDriverAborted struct {
	Reason string
}

// Error implements the error interface: it yields Reason when one was set and
// the generic text "driver-aborted" otherwise.
func (err *ErrDriverAborted) Error() string {
	if reason := err.Reason; reason != "" {
		return reason
	}
	return "driver-aborted"
}
// authenticationAttempt tracks the progress of an authentication attempt and
// prevents multiple close() ops against its signalling chan. Every method is
// safe to invoke on a nil receiver.
type authenticationAttempt struct {
	done     chan struct{}
	doneOnce sync.Once
}

// cancel closes the done chan, at most once; it does nothing for a nil receiver.
func (a *authenticationAttempt) cancel() {
	if a == nil {
		return
	}
	a.doneOnce.Do(func() { close(a.done) })
}

// inProgress reports whether the receiver is non-nil and its done chan has not
// been closed yet.
func (a *authenticationAttempt) inProgress() bool {
	if a == nil {
		return false
	}
	select {
	case <-a.done:
		return false
	default:
	}
	return true
}
// DriverConfig carries the settings consumed by NewMesosSchedulerDriver.
// Scheduler, Master and Framework are mandatory: the constructor returns an
// error when any of them is missing.
type DriverConfig struct {
// Scheduler receives the driver's callbacks (required).
Scheduler Scheduler
// Framework describes the framework (required); the constructor works on a clone of it.
Framework *mesos.FrameworkInfo
// Master is the master location (required); it is handed to the default detector.
Master string
Credential *mesos.Credential // optional
WithAuthContext func(context.Context) context.Context // required when Credential != nil
// HostnameOverride is passed to util.GetHostname to determine the hostname.
HostnameOverride string // optional
// BindingAddress, BindingPort and PublishedAddress are only consumed by the
// default messenger, i.e. when NewMessenger is nil.
BindingAddress net.IP // optional
BindingPort uint16 // optional
PublishedAddress net.IP // optional
// NewMessenger overrides how the driver's messenger is created.
NewMessenger func() (messenger.Messenger, error) // optional
// NewDetector overrides how the master detector is created; the default is detector.New(Master).
NewDetector func() (detector.Master, error) // optional
}
// MesosSchedulerDriver is a concrete implementation of a SchedulerDriver that
// connects a Scheduler with a Mesos master. The MesosSchedulerDriver is
// thread-safe.
//
// Note that scheduler failover is supported in Mesos. After a
// scheduler is registered with Mesos it may failover (to a new
// process on the same machine or across multiple machines) by
// creating a new driver with the ID given to it in
// Scheduler.Registered().
//
// The driver is responsible for invoking the Scheduler callbacks as
// it communicates with the Mesos master.
//
// Note that blocking on the MesosSchedulerDriver (e.g., via
// MesosSchedulerDriver.Join) doesn't affect the scheduler callbacks
// in any way because they are handled by a different goroutine.
//
// TODO(yifan): examples.
// See src/examples/test_framework.cpp for an example of using the
// MesosSchedulerDriver.
type MesosSchedulerDriver struct {
// masterPid is the PID of the most recently detected master (saved for downstream ops).
masterPid *upid.UPID
// frameworkInfo is the driver's private clone of the configured FrameworkInfo.
frameworkInfo *mesos.FrameworkInfo
// self is the driver's own UPID, taken from the messenger when the driver starts.
self *upid.UPID
// stopCh is selected on by the scheduler event queue and by the registration retry loop.
stopCh chan struct{}
status mesos.Status
messenger messenger.Messenger
masterDetector detector.Master
// connected is set upon (re-)registration and cleared when the master changes.
connected bool
// connection is replaced with a fresh UUID upon every (re-)registration.
connection uuid.UUID
// failoverTimeout is the framework's failover timeout in nanoseconds; 0 when none was configured.
failoverTimeout float64
// failover is true when the driver was created with a non-empty framework ID; cleared upon (re-)registration.
failover bool
cache *schedCache
updates map[string]*mesos.StatusUpdate // Key is a UUID string.
tasks map[string]*mesos.TaskInfo // Key is a UUID string.
credential *mesos.Credential
authenticated bool
// authenticating is the most recently started authentication attempt, if any.
authenticating *authenticationAttempt
// reauthenticate requests a fresh authentication attempt once the canceled one reports back.
reauthenticate bool
withAuthContext func(context.Context) context.Context
dispatch func(context.Context, *upid.UPID, proto.Message) error // send a message somewhere
started chan struct{} // signal chan that closes upon a successful call to Start()
eventLock sync.RWMutex // guard for all driver state
withScheduler func(f func(s Scheduler)) // execute some func with respect to the given scheduler
done chan struct{} // signal chan that closes when no more events will be processed
}
// NewMesosSchedulerDriver creates a new mesos scheduler driver from the given
// config: scheduler callbacks, framework info, master address and an optional
// credential.
//
// The configured FrameworkInfo is cloned, never modified. A missing user
// defaults to the current OS user ("" if it cannot be determined) and a
// missing hostname defaults to util.GetHostname(config.HostnameOverride).
//
// An error (and a nil driver) is returned when a required config field is
// missing, or when creating the master detector, creating the messenger or
// initializing the driver fails.
func NewMesosSchedulerDriver(config DriverConfig) (initializedDriver *MesosSchedulerDriver, err error) {
	switch {
	case config.Scheduler == nil:
		err = fmt.Errorf("Scheduler callbacks required.")
	case config.Master == "":
		err = fmt.Errorf("Missing master location URL.")
	case config.Framework == nil:
		err = fmt.Errorf("FrameworkInfo must be provided.")
	case config.Credential != nil && config.WithAuthContext == nil:
		err = fmt.Errorf("WithAuthContext must be provided when Credential != nil")
	}
	if err != nil {
		return
	}

	framework := proto.Clone(config.Framework).(*mesos.FrameworkInfo)

	// set default userid
	if framework.GetUser() == "" {
		current, userErr := user.Current()
		switch {
		case userErr != nil:
			log.Warningf("Failed to obtain username: %v\n", userErr)
			framework.User = proto.String("")
		case current == nil:
			log.Warningln("Failed to obtain username.")
			framework.User = proto.String("")
		default:
			framework.User = proto.String(current.Username)
		}
	}

	// default hostname
	hostname := util.GetHostname(config.HostnameOverride)
	if framework.GetHostname() == "" {
		framework.Hostname = proto.String(hostname)
	}

	driver := &MesosSchedulerDriver{
		frameworkInfo:   framework,
		stopCh:          make(chan struct{}),
		status:          mesos.Status_DRIVER_NOT_STARTED,
		cache:           newSchedCache(),
		credential:      config.Credential,
		failover:        framework.Id != nil && len(framework.Id.GetValue()) > 0,
		withAuthContext: config.WithAuthContext,
		started:         make(chan struct{}),
		done:            make(chan struct{}),
	}

	if framework.FailoverTimeout != nil && *framework.FailoverTimeout > 0 {
		// the framework specifies seconds, the driver keeps nanoseconds
		driver.failoverTimeout = *framework.FailoverTimeout * float64(time.Second)
		log.V(1).Infof("found failover_timeout = %v", time.Duration(driver.failoverTimeout))
	}

	newDetector := config.NewDetector
	if newDetector == nil {
		newDetector = func() (detector.Master, error) {
			return detector.New(config.Master)
		}
	}
	newMessenger := config.NewMessenger
	if newMessenger == nil {
		newMessenger = func() (messenger.Messenger, error) {
			proc := process.New("scheduler")
			return messenger.ForHostname(proc, hostname, config.BindingAddress, config.BindingPort, config.PublishedAddress)
		}
	}

	// initialize new detector, then the messenger, then the driver itself.
	if driver.masterDetector, err = newDetector(); err != nil {
		return
	}
	if driver.messenger, err = newMessenger(); err != nil {
		return
	}
	if err = driver.init(); err != nil {
		return
	}

	// Start the scheduler event queue last: makeWithScheduler spawns a goroutine
	// that only terminates once the queue is closed via the driver, so starting
	// it before a failing step above would leak that goroutine along with the
	// discarded driver. Handlers look up driver.withScheduler when they run, and
	// the messenger is not started before Start(), so the later assignment is safe.
	driver.withScheduler = driver.makeWithScheduler(config.Scheduler)
	initializedDriver = driver
	return
}
// makeWithScheduler starts a goroutine that invokes queued callbacks against cs, one
// at a time and in the order they were queued, and returns the func that is used to
// queue such callbacks. The goroutine closes driver.done once the queue has been
// closed and drained.
//
// The returned func expects eventLock to be held by the caller: it releases the lock
// while it attempts to enqueue and re-acquires it before returning.
func (driver *MesosSchedulerDriver) makeWithScheduler(cs Scheduler) func(func(Scheduler)) {
// mechanism that allows us to asynchronously invoke scheduler callbacks, but in a manner
// such that the callback invocations are serialized. useful because this will decouple the
// goroutine executing a messenger callback from the goroutine executing a scheduler callback,
// while preserving the serialization semantics for each type of callback handling.
// we use a chan to maintain the order of callback invocations; this is important for maintaining
// the order in which status updates are processed.
schedQueue := make(chan func(s Scheduler))
go func() {
defer func() {
close(driver.done)
log.V(1).Infoln("finished processing scheduler events")
}()
for f := range schedQueue {
f(cs)
}
}()
var schedLock sync.Mutex // synchronize write access to schedQueue
// abort flips to 1 (exactly once) when the queue is being closed
abort := int32(0)
// assume that when withScheduler is invoked eventLock is locked
return func(f func(s Scheduler)) {
const timeout = 1 * time.Second
t := time.NewTimer(timeout)
defer t.Stop()
// trySend makes one time-bounded attempt to enqueue f; it returns true when no
// further attempt is needed (f was queued, or the driver is stopping/aborted).
trySend := func() (done bool) {
// don't block while attempting to enqueue a scheduler op; this could
// take a while depending upon the external scheduler implementation.
// also, it allows for multiple go-routines to re-compete for the lock
// every so often - this avoids indefinitely blocking a call to Abort().
driver.eventLock.Unlock()
schedLock.Lock()
defer func() {
schedLock.Unlock()
driver.eventLock.Lock()
}()
if atomic.LoadInt32(&abort) == 1 {
// can't send anymore
return true
}
// try to write to event queue...
select {
case schedQueue <- f:
done = true
case <-driver.stopCh:
done = true
case <-t.C:
}
// if stopping then close out the queue (keeping this check separate from
// the above on purpose! otherwise we could miss the close signal)
select {
case <-driver.stopCh:
// the compare-and-swap guarantees that schedQueue is closed by one caller only
if atomic.CompareAndSwapInt32(&abort, 0, 1) {
defer close(schedQueue)
log.V(1).Infoln("stopping scheduler event queue..")
// one last attempt, before we run out of time
select {
case schedQueue <- f:
case <-t.C:
}
}
default:
}
return
}
for !trySend() {
t.Reset(timeout) // TODO(jdef) add jitter to this
}
// have to do this outside trySend because here we're guarded by eventLock; it's ok
// if this happens more then once.
if atomic.LoadInt32(&abort) == 1 {
driver.withScheduler = func(f func(_ Scheduler)) {}
}
}
}
// init wires the driver to its messenger: outbound messages are dispatched
// through messenger.Send and one handler is installed per inbound message
// type. Every handler runs with eventLock held, which serializes all callbacks
// coming from the messenger. It always returns nil.
func (driver *MesosSchedulerDriver) init() error {
	log.Infof("Initializing mesos scheduler driver\n")
	driver.dispatch = driver.messenger.Send

	// withEventLock wraps a handler so that it executes under eventLock.
	withEventLock := func(handle messenger.MessageHandler) messenger.MessageHandler {
		return func(from *upid.UPID, msg proto.Message) {
			driver.eventLock.Lock()
			defer driver.eventLock.Unlock()
			handle(from, msg)
		}
	}

	// handlers, listed in installation order.
	routes := []struct {
		handler messenger.MessageHandler
		message proto.Message
	}{
		{driver.frameworkRegistered, &mesos.FrameworkRegisteredMessage{}},
		{driver.frameworkReregistered, &mesos.FrameworkReregisteredMessage{}},
		{driver.resourcesOffered, &mesos.ResourceOffersMessage{}},
		{driver.resourceOfferRescinded, &mesos.RescindResourceOfferMessage{}},
		{driver.statusUpdated, &mesos.StatusUpdateMessage{}},
		{driver.slaveLost, &mesos.LostSlaveMessage{}},
		{driver.frameworkMessageRcvd, &mesos.ExecutorToFrameworkMessage{}},
		{driver.frameworkErrorRcvd, &mesos.FrameworkErrorMessage{}},
		{driver.exitedExecutor, &mesos.ExitedExecutorMessage{}},
		{driver.handleMasterChanged, &mesos.InternalMasterChangeDetected{}},
		{driver.handleAuthenticationResult, &mesos.InternalAuthenticationResult{}},
	}
	for _, route := range routes {
		driver.messenger.Install(withEventLock(route.handler), route.message)
	}
	return nil
}
// handleMasterChanged is the lead master detection callback. Messages are
// ignored when the driver is aborted or when they were not sent by the driver
// itself. A connected driver is disconnected first (the scheduler is told via
// Disconnected); afterwards, if a master was detected, its PID is recorded and
// authentication is attempted.
func (driver *MesosSchedulerDriver) handleMasterChanged(from *upid.UPID, pbMsg proto.Message) {
	if driver.status == mesos.Status_DRIVER_ABORTED {
		log.Info("Ignoring master change because the driver is aborted.")
		return
	}
	if !from.Equal(driver.self) {
		log.Errorf("ignoring master changed message received from upid '%v'", from)
		return
	}

	// Reconnect every time a master is detected.
	if driver.connected {
		log.V(3).Info("Disconnecting scheduler.")
		driver.masterPid = nil
		driver.withScheduler(func(s Scheduler) { s.Disconnected(driver) })
	}

	detected := pbMsg.(*mesos.InternalMasterChangeDetected).Master
	driver.connected = false
	driver.authenticated = false

	if detected == nil {
		log.Infoln("No master detected.")
		return
	}

	log.Infof("New master %s detected\n", detected.GetPid())
	leader, err := upid.Parse(detected.GetPid())
	if err != nil {
		panic("Unable to parse Master's PID value.") // this should not happen.
	}
	driver.masterPid = leader // save for downstream ops.
	driver.tryAuthentication()
}
// tryAuthentication expects to be guarded by eventLock.
//
// It panics when the driver is already authenticated and does nothing when there is
// no master. An attempt that is still in progress is canceled and flagged for
// re-authentication. Otherwise, with a credential configured, authentication runs in
// a separate goroutine that reports its outcome by routing an
// InternalAuthenticationResult to the driver itself; without a credential the driver
// is marked as authenticated and registration starts right away.
func (driver *MesosSchedulerDriver) tryAuthentication() {
if driver.authenticated {
// programming error
panic("already authenticated")
}
masterPid := driver.masterPid // save for referencing later in goroutine
if masterPid == nil {
log.Info("skipping authentication attempt because we lost the master")
return
}
if driver.authenticating.inProgress() {
// authentication is in progress, try to cancel it (we may be too late already)
driver.authenticating.cancel()
driver.reauthenticate = true
return
}
if driver.credential != nil {
// authentication can block and we don't want to hold up the messenger loop
authenticating := &authenticationAttempt{done: make(chan struct{})}
go func() {
// marks this attempt as finished, no matter how the goroutine ends
defer authenticating.cancel()
result := &mesos.InternalAuthenticationResult{
//TODO(jdef): is this really needed?
Success: proto.Bool(false),
Completed: proto.Bool(false),
Pid: proto.String(masterPid.String()),
}
// don't reference driver.authenticating here since it may have changed
if err := driver.authenticate(masterPid, authenticating); err != nil {
log.Errorf("Scheduler failed to authenticate: %v\n", err)
// only an explicit authentication failure counts as a completed attempt
if err == auth.AuthenticationFailed {
result.Completed = proto.Bool(true)
}
} else {
result.Completed = proto.Bool(true)
result.Success = proto.Bool(true)
}
// deliver the result to ourselves over the messenger
pid := driver.messenger.UPID()
driver.messenger.Route(context.TODO(), &pid, result)
}()
driver.authenticating = authenticating
} else {
log.Infoln("No credentials were provided. " +
"Attempting to register scheduler without authentication.")
driver.authenticated = true
go driver.doReliableRegistration(float64(registrationBackoffFactor))
}
}
// handleAuthenticationResult processes the outcome of a background
// authentication attempt. Results are ignored unless the driver is running and
// the message was sent by the driver itself. A result that is stale
// (re-authentication was requested, the attempt did not complete, or it refers
// to a different master) triggers a new attempt; a successful one marks the
// driver as authenticated and starts registration.
func (driver *MesosSchedulerDriver) handleAuthenticationResult(from *upid.UPID, pbMsg proto.Message) {
	switch {
	case driver.status != mesos.Status_DRIVER_RUNNING:
		log.V(1).Info("ignoring authenticate because driver is not running")
		return
	case !from.Equal(driver.self):
		log.Errorf("ignoring authentication result message received from upid '%v'", from)
		return
	case driver.authenticated:
		// programming error
		panic("already authenticated")
	case driver.masterPid == nil:
		log.Infoln("ignoring authentication result because master is lost")
		driver.authenticating.cancel() // cancel any in-progress background attempt

		// disable future retries until we get a new master
		driver.reauthenticate = false
		return
	}

	result := pbMsg.(*mesos.InternalAuthenticationResult)
	stale := driver.reauthenticate || !result.GetCompleted() || driver.masterPid.String() != result.GetPid()
	if stale {
		log.Infof("failed to authenticate with master %v: master changed", driver.masterPid)
		driver.authenticating.cancel() // cancel any in-progress background authentication
		driver.reauthenticate = false
		driver.tryAuthentication()
		return
	}

	if ok := result.GetSuccess(); !ok {
		log.Errorf("master %v refused authentication", driver.masterPid)
		return
	}

	driver.authenticated = true
	go driver.doReliableRegistration(float64(registrationBackoffFactor))
}
// ------------------------- Accessors ----------------------- //
// Status reports the driver's current status, read under the event read-lock.
func (driver *MesosSchedulerDriver) Status() mesos.Status {
	driver.eventLock.RLock()
	current := driver.status
	driver.eventLock.RUnlock()
	return current
}
// Running reports whether the driver is in the DRIVER_RUNNING state.
func (driver *MesosSchedulerDriver) Running() bool {
	driver.eventLock.RLock()
	current := driver.status
	driver.eventLock.RUnlock()
	return current == mesos.Status_DRIVER_RUNNING
}
// Connected reports whether the driver has a registered (and authenticated, if
// enabled) connection to the leading mesos master.
func (driver *MesosSchedulerDriver) Connected() bool {
	driver.eventLock.RLock()
	isConnected := driver.connected
	driver.eventLock.RUnlock()
	return isConnected
}
// stopped reports whether the driver status is anything other than
// DRIVER_RUNNING; it expects to be guarded by eventLock.
func (driver *MesosSchedulerDriver) stopped() bool {
	running := driver.status == mesos.Status_DRIVER_RUNNING
	return !running
}
// ---------------------- Handlers for Events from Master --------------- //
// frameworkRegistered handles a FrameworkRegisteredMessage. The message is
// ignored when the driver is aborted, already connected or stopped, or when it
// was not sent by the current master. Otherwise the master-assigned framework
// ID is stored, the driver becomes connected and the scheduler's Registered
// callback is queued.
func (driver *MesosSchedulerDriver) frameworkRegistered(from *upid.UPID, pbMsg proto.Message) {
	log.V(2).Infoln("Handling scheduler driver framework registered event.")

	registered := pbMsg.(*mesos.FrameworkRegisteredMessage)
	masterInfo := registered.GetMasterInfo()
	masterPid := masterInfo.GetPid()
	frameworkID := registered.GetFrameworkId()

	switch {
	case driver.status == mesos.Status_DRIVER_ABORTED:
		log.Infof("ignoring FrameworkRegisteredMessage from master %s, driver is aborted", masterPid)
		return
	case driver.connected:
		log.Infoln("ignoring FrameworkRegisteredMessage from master, driver is already connected", masterPid)
		return
	case driver.stopped():
		log.Infof("ignoring FrameworkRegisteredMessage from master %s, driver is stopped", masterPid)
		return
	case !driver.masterPid.Equal(from):
		log.Warningf("ignoring framework registered message because it was sent from '%v' instead of leading master '%v'", from, driver.masterPid)
		return
	}

	log.Infof("Framework registered with ID=%s\n", frameworkID.GetValue())
	driver.frameworkInfo.Id = frameworkID // generated by master.

	driver.connected = true
	driver.failover = false
	driver.connection = uuid.NewUUID()
	driver.withScheduler(func(s Scheduler) { s.Registered(driver, frameworkID, masterInfo) })
}
// frameworkReregistered handles a FrameworkReregisteredMessage. The message is
// ignored when the driver is aborted or already connected, or when it was not
// sent by the current master. Otherwise the driver becomes connected and the
// scheduler's Reregistered callback is queued.
func (driver *MesosSchedulerDriver) frameworkReregistered(from *upid.UPID, pbMsg proto.Message) {
	log.V(1).Infoln("Handling Scheduler re-registered event.")
	reregistered := pbMsg.(*mesos.FrameworkReregisteredMessage)

	switch {
	case driver.status == mesos.Status_DRIVER_ABORTED:
		log.Infoln("Ignoring FrameworkReregisteredMessage from master, driver is aborted!")
		return
	case driver.connected:
		log.Infoln("Ignoring FrameworkReregisteredMessage from master,driver is already connected!")
		return
	case !driver.masterPid.Equal(from):
		log.Warningf("ignoring framework re-registered message because it was sent from '%v' instead of leading master '%v'", from, driver.masterPid)
		return
	}

	// TODO(vv) detect if message was from leading-master (sched.cpp)
	log.Infof("Framework re-registered with ID [%s] ", reregistered.GetFrameworkId().GetValue())
	driver.connected = true
	driver.failover = false
	driver.connection = uuid.NewUUID()

	driver.withScheduler(func(s Scheduler) { s.Reregistered(driver, reregistered.GetMasterInfo()) })
}
// resourcesOffered handles a ResourceOffersMessage. The message is ignored
// when the driver is aborted or not connected, or when the number of slave
// PIDs does not match the number of offers. Every offer whose slave PID can be
// parsed is cached together with that PID; afterwards all offers of the
// message are handed to the scheduler's ResourceOffers callback.
func (driver *MesosSchedulerDriver) resourcesOffered(from *upid.UPID, pbMsg proto.Message) {
	log.V(2).Infoln("Handling resource offers.")

	msg := pbMsg.(*mesos.ResourceOffersMessage)
	if driver.status == mesos.Status_DRIVER_ABORTED {
		log.Infoln("Ignoring ResourceOffersMessage, the driver is aborted!")
		return
	}

	if !driver.connected {
		log.Infoln("Ignoring ResourceOffersMessage, the driver is not connected!")
		return
	}

	pidStrings := msg.GetPids()
	if len(pidStrings) != len(msg.Offers) {
		log.Errorln("Ignoring offers, Offer count does not match Slave PID count.")
		return
	}

	for i, offer := range msg.Offers {
		pid, err := upid.Parse(pidStrings[i])
		if err != nil {
			// report the input that failed to parse: the parse result itself is
			// not meaningful when an error is returned.
			log.Warningf("Failed to parse offer PID '%v': %v", pidStrings[i], err)
			continue
		}
		driver.cache.putOffer(offer, pid)
		log.V(2).Infof("Cached offer %s from SlavePID %s", offer.Id.GetValue(), pid)
	}

	driver.withScheduler(func(s Scheduler) { s.ResourceOffers(driver, msg.Offers) })
}
// resourceOfferRescinded handles a RescindResourceOfferMessage. The message is
// ignored when the driver is aborted or not connected. Otherwise the offer is
// dropped from the cache and the scheduler's OfferRescinded callback is queued.
func (driver *MesosSchedulerDriver) resourceOfferRescinded(from *upid.UPID, pbMsg proto.Message) {
	log.V(1).Infoln("Handling resource offer rescinded.")

	msg := pbMsg.(*mesos.RescindResourceOfferMessage)

	if driver.status == mesos.Status_DRIVER_ABORTED {
		log.Infoln("Ignoring RescindResourceOfferMessage, the driver is aborted!")
		return
	}

	if !driver.connected {
		// name the message that is actually being ignored (the log used to
		// mention ResourceOffersMessage)
		log.Infoln("Ignoring RescindResourceOfferMessage, the driver is not connected!")
		return
	}

	// TODO(vv) check for leading master (see sched.cpp)

	log.V(1).Infoln("Rescinding offer ", msg.OfferId.GetValue())
	driver.cache.removeOffer(msg.OfferId)
	driver.withScheduler(func(s Scheduler) { s.OfferRescinded(driver, msg.OfferId) })
}
// send dispatches msg to the given upid in a separate goroutine and waits for
// the outcome, which is returned to the caller. The context handed to the
// dispatcher is canceled once send returns.
func (driver *MesosSchedulerDriver) send(upid *upid.UPID, msg proto.Message) error {
	//TODO(jdef) should implement timeout here
	ctx, cancel := context.WithCancel(context.TODO())
	defer cancel()

	outcome := make(chan error, 1)
	go func() {
		outcome <- driver.dispatch(ctx, upid, msg)
	}()

	select {
	case err := <-outcome:
		return err
	case <-ctx.Done():
		<-outcome // wait for Send(...)
		return ctx.Err()
	}
}
// statusUpdated expects to be guarded by eventLock.
//
// It handles a StatusUpdateMessage: updates are ignored unless the driver is running
// and, for updates that were not sent by the driver itself, unless the driver is
// connected and the sender is the current master. The task status is passed to the
// scheduler's StatusUpdate callback; afterwards an acknowledgement is sent to the
// master for updates that carry a UUID and did not originate from this driver.
func (driver *MesosSchedulerDriver) statusUpdated(from *upid.UPID, pbMsg proto.Message) {
msg := pbMsg.(*mesos.StatusUpdateMessage)
if driver.status != mesos.Status_DRIVER_RUNNING {
log.V(1).Infoln("Ignoring StatusUpdate message, the driver is not running!")
return
}
// updates that the driver sent to itself skip the connection and origin checks
if !from.Equal(driver.self) {
if !driver.connected {
log.V(1).Infoln("Ignoring StatusUpdate message, the driver is not connected!")
return
}
if !driver.masterPid.Equal(from) {
log.Warningf("ignoring status message because it was sent from '%v' instead of leading master '%v'", from, driver.masterPid)
return
}
}
log.V(2).Infof("Received status update from %q status source %q", from.String(), msg.GetPid())
status := msg.Update.GetStatus()
// see https://github.com/apache/mesos/blob/master/src/sched/sched.cpp#L887
// If the update does not have a 'uuid', it does not need
// acknowledging. However, prior to 0.23.0, the update uuid
// was required and always set. We also don't want to ACK updates
// that were internally generated. In 0.24.0, we can rely on the
// update uuid check here, until then we must still check for
// this being sent from the driver (from == UPID()) or from
// the master (pid == UPID()).
// TODO(vinod): Get rid of this logic in 0.25.0 because master
// and slave correctly set task status in 0.24.0.
if clearUUID := len(msg.Update.Uuid) == 0 || from.Equal(driver.self) || msg.GetPid() == driver.self.String(); clearUUID {
status.Uuid = nil
} else {
status.Uuid = msg.Update.Uuid
}
driver.withScheduler(func(s Scheduler) { s.StatusUpdate(driver, status) })
// withScheduler releases eventLock while it enqueues the callback, hence the
// driver status is checked again before acknowledging.
if driver.status == mesos.Status_DRIVER_ABORTED {
log.V(1).Infoln("Not sending StatusUpdate ACK, the driver is aborted!")
return
}
// Send StatusUpdate Acknowledgement; see above for the rules.
// Only send ACK if update was not from this driver and spec'd a UUID; this is compat w/ 0.23+
ackRequired := len(msg.Update.Uuid) > 0 && !from.Equal(driver.self) && msg.GetPid() != driver.self.String()
if ackRequired {
ackMsg := &mesos.StatusUpdateAcknowledgementMessage{
SlaveId: msg.Update.SlaveId,
FrameworkId: driver.frameworkInfo.Id,
TaskId: msg.Update.Status.TaskId,
Uuid: msg.Update.Uuid,
}
log.V(2).Infof("Sending ACK for status update %+v to %q", *msg.Update, from.String())
if err := driver.send(driver.masterPid, ackMsg); err != nil {
log.Errorf("Failed to send StatusUpdate ACK message: %v", err)
return
}
} else {
log.V(2).Infof("Not sending ACK, update is not from slave %q", from.String())
}
}
// exitedExecutor handles an ExitedExecutorMessage. The message is ignored when
// the driver is aborted or not connected; otherwise the scheduler's
// ExecutorLost callback is queued with the executor ID, slave ID and exit
// status taken from the message.
func (driver *MesosSchedulerDriver) exitedExecutor(from *upid.UPID, pbMsg proto.Message) {
	log.V(1).Infoln("Handling ExitedExceutor event.")
	msg := pbMsg.(*mesos.ExitedExecutorMessage)

	if driver.status == mesos.Status_DRIVER_ABORTED {
		log.V(1).Infoln("Ignoring ExitedExecutor message, the driver is aborted!")
		return
	}

	if !driver.connected {
		log.V(1).Infoln("Ignoring ExitedExecutor message, the driver is not connected!")
		return
	}

	status := msg.GetStatus()
	// Infof, not Infoln: the message uses formatting verbs, which Infoln would
	// print verbatim followed by the raw arguments.
	log.V(2).Infof("Lost executor %q from slave %q for framework %q with status %d",
		msg.GetExecutorId().GetValue(),
		msg.GetSlaveId().GetValue(),
		msg.GetFrameworkId().GetValue(),
		status)
	driver.withScheduler(func(s Scheduler) { s.ExecutorLost(driver, msg.GetExecutorId(), msg.GetSlaveId(), int(status)) })
}
// slaveLost handles a LostSlaveMessage. The message is ignored when the driver
// is aborted or not connected; otherwise the slave's PID is removed from the
// cache and the scheduler's SlaveLost callback is queued.
func (driver *MesosSchedulerDriver) slaveLost(from *upid.UPID, pbMsg proto.Message) {
	log.V(1).Infoln("Handling LostSlave event.")

	lost := pbMsg.(*mesos.LostSlaveMessage)

	switch {
	case driver.status == mesos.Status_DRIVER_ABORTED:
		log.V(1).Infoln("Ignoring LostSlave message, the driver is aborted!")
		return
	case !driver.connected:
		log.V(1).Infoln("Ignoring LostSlave message, the driver is not connected!")
		return
	}

	// TODO(VV) - detect leading master (see sched.cpp)

	log.V(2).Infoln("Lost slave ", lost.SlaveId.GetValue())
	driver.cache.removeSlavePid(lost.SlaveId)

	driver.withScheduler(func(s Scheduler) { s.SlaveLost(driver, lost.SlaveId) })
}
// frameworkMessageRcvd handles an ExecutorToFrameworkMessage: unless the
// driver is aborted, the message data is forwarded (as a string) to the
// scheduler's FrameworkMessage callback.
func (driver *MesosSchedulerDriver) frameworkMessageRcvd(from *upid.UPID, pbMsg proto.Message) {
	log.V(1).Infoln("Handling framework message event.")

	incoming := pbMsg.(*mesos.ExecutorToFrameworkMessage)

	if aborted := driver.status == mesos.Status_DRIVER_ABORTED; aborted {
		log.V(1).Infoln("Ignoring framwork message, the driver is aborted!")
		return
	}

	log.V(1).Infoln("Received Framwork Message ", incoming.String())

	driver.withScheduler(func(s Scheduler) {
		s.FrameworkMessage(driver, incoming.ExecutorId, incoming.SlaveId, string(incoming.Data))
	})
}
// frameworkErrorRcvd handles a FrameworkErrorMessage by passing the message
// text on to driver.error.
func (driver *MesosSchedulerDriver) frameworkErrorRcvd(from *upid.UPID, pbMsg proto.Message) {
	log.V(1).Infoln("Handling framework error event.")
	failure := pbMsg.(*mesos.FrameworkErrorMessage)
	reason := failure.GetMessage()
	driver.error(reason)
}
// ---------------------- Interface Methods ---------------------- //
// Start starts the scheduler driver while holding eventLock.
// It returns immediately if an error occurs within the start sequence.
func (driver *MesosSchedulerDriver) Start() (mesos.Status, error) {
	driver.eventLock.Lock()
	defer driver.eventLock.Unlock()

	status, err := driver.start()
	return status, err
}
// start is expected to be guarded by eventLock. It refuses to run more than
// once and requires the driver to be in the DRIVER_NOT_STARTED state. It then
// starts the messenger, records the driver's own PID, switches the status to
// DRIVER_RUNNING, closes the started chan and, if a master detector is
// present, begins master detection.
func (driver *MesosSchedulerDriver) start() (mesos.Status, error) {
	select {
	case <-driver.started:
		return driver.status, errors.New("Unable to Start: driver has already been started once.")
	default: // proceed
	}

	log.Infoln("Starting the scheduler driver...")

	if expected := mesos.Status_DRIVER_NOT_STARTED; driver.status != expected {
		return driver.status, fmt.Errorf("Unable to Start, expecting driver status %s, but is %s:", expected, driver.status)
	}

	// Start the messenger.
	if err := driver.messenger.Start(); err != nil {
		log.Errorf("Scheduler failed to start the messenger: %v\n", err)
		return driver.status, err
	}

	self := driver.messenger.UPID()
	driver.self = &self
	driver.status = mesos.Status_DRIVER_RUNNING
	close(driver.started)

	log.Infof("Mesos scheduler driver started with PID=%v", driver.self)

	// master changes are routed to ourselves over the messenger
	onMasterChanged := detector.OnMasterChanged(func(info *mesos.MasterInfo) {
		changed := &mesos.InternalMasterChangeDetected{
			Master: info,
		}
		driver.messenger.Route(context.TODO(), driver.self, changed)
	})

	if driver.masterDetector == nil {
		return driver.status, nil
	}

	// register with Detect() AFTER we have a self pid from the messenger, otherwise things get ugly
	// because our internal messaging depends on it. detector callbacks are routed over the messenger
	// bus, maintaining serial (concurrency-safe) callback execution.
	log.V(1).Infof("starting master detector %T: %+v", driver.masterDetector, driver.masterDetector)
	driver.masterDetector.Detect(onMasterChanged)
	log.V(2).Infoln("master detector started")
	return driver.status, nil
}
// authenticate against the spec'd master pid using the configured authenticationProvider.
// the authentication process is canceled upon either cancelation of authenticating, or
// else because it timed out (authTimeout).
//
// It returns the context's error when the context is done first,
// authenticationCanceledError when the attempt was canceled, and otherwise
// whatever the login returned (nil upon success). In every case it waits for
// the login goroutine to finish before returning.
//
// TODO(jdef) perhaps at some point in the future this will get pushed down into
// the messenger layer (e.g. to use HTTP-based authentication). We'd probably still
// specify the callback.Handler here, along with the user-selected authentication
// provider. Perhaps in the form of some messenger.AuthenticationConfig.
func (driver *MesosSchedulerDriver) authenticate(pid *upid.UPID, authenticating *authenticationAttempt) error {
	log.Infof("authenticating with master %v", pid)
	ctx, cancel := context.WithTimeout(context.Background(), authTimeout)
	// release the timeout context on every return path, including the one taken
	// when the (possibly derived) context is done; calling cancel more than once
	// is harmless.
	defer cancel()

	handler := &CredentialHandler{
		pid:        pid,
		client:     driver.self,
		credential: driver.credential,
	}
	ctx = driver.withAuthContext(ctx)
	ctx = auth.WithParentUPID(ctx, *driver.self)

	ch := make(chan error, 1)
	go func() { ch <- auth.Login(ctx, handler) }()
	select {
	case <-ctx.Done():
		<-ch
		return ctx.Err()
	case <-authenticating.done:
		// cancel before waiting, so that the login is interrupted
		cancel()
		<-ch
		return authenticationCanceledError
	case e := <-ch:
		return e
	}
}
// doReliableRegistration keeps calling registerOnce until it reports that no
// further attempt is needed, or until stopCh becomes readable. Between
// attempts it sleeps for a random duration in [0, maxBackoff); maxBackoff is
// capped by registrationRetryIntervalMax (and by a tenth of the failover
// timeout when one is set) and doubles after every wait.
func (driver *MesosSchedulerDriver) doReliableRegistration(maxBackoff float64) {
	for {
		if !driver.registerOnce() {
			return
		}
		maxBackoff = math.Min(maxBackoff, registrationRetryIntervalMax)

		// If failover timeout is present, bound the maximum backoff
		// by 1/10th of the failover timeout.
		if driver.failoverTimeout > 0 {
			maxBackoff = math.Min(maxBackoff, driver.failoverTimeout/10.0)
		}

		// Determine the delay for next attempt by picking a random
		// duration between 0 and 'maxBackoff' (jitter).
		delay := time.Duration(maxBackoff * rand.Float64())
		log.V(1).Infof("will retry registration in %v if necessary", delay)

		// One timer per iteration. It is stopped explicitly on the exit path
		// rather than with defer: a defer inside this loop would only run when
		// the function returns, accumulating one pending call per retry.
		t := time.NewTimer(delay)
		select {
		case <-driver.stopCh:
			t.Stop()
			return
		case <-t.C:
			// timer already fired, nothing to stop
			maxBackoff *= 2
		}
	}
}
// registerOnce sends a single (re)registration request to the current master
// and returns true if the caller should attempt another registration later.
// It returns false without sending anything when the driver is stopped,
// already connected, has no master PID, or has a credential but is not yet
// authenticated.
//
// It is *not* guarded by eventLock: all access to mutable members of
// MesosSchedulerDriver should be explicitly synchronized.
func (driver *MesosSchedulerDriver) registerOnce() bool {
	var (
		failover bool
		pid      *upid.UPID
		info     *mesos.FrameworkInfo
	)

	// snapshot copies the state needed for the request under the read lock and
	// reports whether a registration request should be sent at all.
	snapshot := func() bool {
		driver.eventLock.RLock()
		defer driver.eventLock.RUnlock()

		if driver.stopped() || driver.connected || driver.masterPid == nil || (driver.credential != nil && !driver.authenticated) {
			log.V(1).Infof("skipping registration request: stopped=%v, connected=%v, authenticated=%v",
				driver.stopped(), driver.connected, driver.authenticated)
			return false
		}
		failover = driver.failover
		pid = driver.masterPid
		info = proto.Clone(driver.frameworkInfo).(*mesos.FrameworkInfo)
		return true
	}
	if !snapshot() {
		return false
	}

	// register framework
	var message proto.Message
	switch {
	case len(info.GetId().GetValue()) > 0:
		// not the first time, or failing over
		log.V(1).Infof("Reregistering with master: %v", pid)
		message = &mesos.ReregisterFrameworkMessage{
			Framework: info,
			Failover:  proto.Bool(failover),
		}
	default:
		log.V(1).Infof("Registering with master: %v", pid)
		message = &mesos.RegisterFrameworkMessage{
			Framework: info,
		}
	}

	if sendErr := driver.send(pid, message); sendErr != nil {
		log.Errorf("failed to send RegisterFramework message: %v", sendErr)
		// a failed send stops the driver; a retry is still reported to the caller
		if _, stopErr := driver.Stop(failover); stopErr != nil {
			log.Errorf("failed to stop scheduler driver: %v", stopErr)
		}
	}
	return true
}
// Join acquires eventLock and delegates to join, which blocks until the
// driver is done. It should follow a call to Start().
func (driver *MesosSchedulerDriver) Join() (mesos.Status, error) {
	driver.eventLock.Lock()
	defer driver.eventLock.Unlock()

	stat, err := driver.join()
	return stat, err
}
// join blocks until a receive on driver.done succeeds and then returns the
// driver status observed at that point. It returns an error immediately if
// the driver is not in DRIVER_RUNNING status.
//
// join expects to be guarded by eventLock: the lock is released while waiting
// and re-acquired before returning, so callers still hold it afterwards.
func (driver *MesosSchedulerDriver) join() (stat mesos.Status, err error) {
	if stat = driver.status; stat != mesos.Status_DRIVER_RUNNING {
		err = fmt.Errorf("Unable to Join, expecting driver status %s, but is %s", mesos.Status_DRIVER_RUNNING, stat)
		return
	}

	// drop the lock so other driver operations can proceed while we wait
	driver.eventLock.Unlock()
	defer func() {
		// re-acquire for the caller and report the status as of completion
		driver.eventLock.Lock()
		stat = driver.status
	}()

	// A plain blocking receive replaces the former one-second timer loop: the
	// timer branch did nothing but re-enter the wait, so the only effect was a
	// needless wakeup every second.
	<-driver.done
	return
}
// Run acquires eventLock and delegates to run, which starts the driver and
// then waits for it to be stopped or aborted.
func (driver *MesosSchedulerDriver) Run() (mesos.Status, error) {
	driver.eventLock.Lock()
	defer driver.eventLock.Unlock()

	stat, err := driver.run()
	return stat, err
}
// run starts the driver and, if that succeeds with DRIVER_RUNNING status,
// joins it. When start fails the driver is stopped with the start error as
// the cause and the result of stop is returned.
//
// NOTE(review): if stop itself returns an error, that error is what the
// caller sees rather than the original start error.
//
// run expects to be guarded by eventLock.
func (driver *MesosSchedulerDriver) run() (mesos.Status, error) {
	switch stat, err := driver.start(); {
	case err != nil:
		return driver.stop(err, false)
	case stat != mesos.Status_DRIVER_RUNNING:
		return stat, fmt.Errorf("Unable to Run, expecting driver status %s, but is %s:", mesos.Status_DRIVER_RUNNING, driver.status)
	}

	log.Infoln("Scheduler driver running. Waiting to be stopped.")
	return driver.join()
}
// Stop acquires eventLock and stops the driver with no error cause. The
// framework is only unregistered from the master when failover is false.
func (driver *MesosSchedulerDriver) Stop(failover bool) (mesos.Status, error) {
	driver.eventLock.Lock()
	defer driver.eventLock.Unlock()

	stat, err := driver.stop(nil, failover)
	return stat, err
}
// stop shuts the driver down. When the driver is connected and failover is
// false it first sends an UnregisterFrameworkMessage to the master; if that
// send fails the driver ends up DRIVER_ABORTED, otherwise DRIVER_STOPPED.
// It returns an error without doing anything if the driver is not running.
//
// stop expects to be guarded by eventLock.
func (driver *MesosSchedulerDriver) stop(cause error, failover bool) (mesos.Status, error) {
	log.Infoln("Stopping the scheduler driver")
	if driver.status != mesos.Status_DRIVER_RUNNING {
		return driver.status, fmt.Errorf("Unable to Stop, expected driver status %s, but is %s", mesos.Status_DRIVER_RUNNING, driver.status)
	}

	// nothing to unregister: go straight to stopping the messenger
	if failover || !driver.connected {
		return driver._stop(cause, mesos.Status_DRIVER_STOPPED)
	}

	// unregister the framework
	log.Infoln("Unregistering the scheduler driver")
	unregister := &mesos.UnregisterFrameworkMessage{
		FrameworkId: driver.frameworkInfo.Id,
	}

	//TODO(jdef) this is actually a little racy: we send an 'unregister' message but then
	// immediately afterward the messenger is stopped in driver._stop(). so the unregister message
	// may not actually end up being sent out.
	if err := driver.send(driver.masterPid, unregister); err != nil {
		log.Errorf("Failed to send UnregisterFramework message while stopping driver: %v\n", err)
		if cause == nil {
			cause = &ErrDriverAborted{}
		}
		return driver._stop(cause, mesos.Status_DRIVER_ABORTED)
	}

	// NOTE(review): presumably a grace period so the unregister message can
	// leave before the messenger is stopped — confirm.
	time.Sleep(2 * time.Second)

	// stop messenger
	return driver._stop(cause, mesos.Status_DRIVER_STOPPED)
}
// _stop records stopStatus as the driver status, marks the driver as
// disconnected and stops the messenger. After that, unless stopCh was already
// closed, it closes stopCh and hands one last callback to withScheduler:
// Scheduler.Error carrying cause when cause is non-nil, or a no-op otherwise.
// It returns stopStatus together with the error from stopping the messenger.
//
// _stop expects to be guarded by eventLock.
func (driver *MesosSchedulerDriver) _stop(cause error, stopStatus mesos.Status) (mesos.Status, error) {
	// runs last (deferred), i.e. after the messenger has been stopped
	notifyScheduler := func() {
		select {
		case <-driver.stopCh:
			// already closed by an earlier stop; nothing more to signal
			return
		default:
		}
		close(driver.stopCh)

		if cause == nil {
			// send a noop func, withScheduler needs to see that stopCh is closed
			log.V(1).Infof("Sending kill signal to withScheduler")
			driver.withScheduler(func(_ Scheduler) {})
			return
		}
		log.V(1).Infof("Sending error via withScheduler: %v", cause)
		driver.withScheduler(func(s Scheduler) { s.Error(driver, cause.Error()) })
	}
	defer notifyScheduler()

	driver.status = stopStatus
	driver.connected = false

	// stop messenger
	log.Info("stopping messenger")
	stopErr := driver.messenger.Stop()
	log.Infof("Stop() complete with status %v error %v", stopStatus, stopErr)
	return stopStatus, stopErr
}
// Abort acquires eventLock and aborts the driver with no error cause.
func (driver *MesosSchedulerDriver) Abort() (stat mesos.Status, err error) {
	driver.eventLock.Lock()
	defer driver.eventLock.Unlock()

	stat, err = driver.abort(nil)
	return stat, err
}
// abort moves the driver to DRIVER_ABORTED. A connected driver is stopped via
// stop (with failover=true, so no unregister message is sent) and any error
// from that is returned; a disconnected driver just has its messenger
// stopped. The master detector, if any, is cancelled on the way out.
//
// abort expects to be guarded by eventLock.
func (driver *MesosSchedulerDriver) abort(cause error) (stat mesos.Status, err error) {
	if driver.masterDetector != nil {
		defer driver.masterDetector.Cancel()
	}

	log.Infof("Aborting framework [%+v]", driver.frameworkInfo.Id)

	if driver.connected {
		_, err = driver.stop(cause, true)
	} else if stopErr := driver.messenger.Stop(); stopErr != nil {
		// Best effort: the abort itself proceeds regardless, but a messenger
		// that failed to stop should not go unnoticed.
		log.Errorf("failed to stop messenger while aborting driver: %v", stopErr)
	}

	// stop() leaves its own status behind; the final word is always ABORTED
	stat = mesos.Status_DRIVER_ABORTED
	driver.status = stat
	return
}
// LaunchTasks sends a LaunchTasksMessage for the given offers and tasks to
// the master. Tasks whose executor has no framework id get the driver's
// framework id filled in. For every offer found in the cache, the slave PID
// of each task placed on that offer's slave is remembered for later direct
// messaging; every listed offer is then dropped from the cache.
//
// If the driver is disconnected, or the send fails, each task is reported as
// TASK_LOST through pushLostTask and an error is returned. An error is also
// returned, without any side effects, when the driver is not running.
func (driver *MesosSchedulerDriver) LaunchTasks(offerIds []*mesos.OfferID, tasks []*mesos.TaskInfo, filters *mesos.Filters) (mesos.Status, error) {
	driver.eventLock.Lock()
	defer driver.eventLock.Unlock()

	switch {
	case driver.status != mesos.Status_DRIVER_RUNNING:
		return driver.status, fmt.Errorf("Unable to LaunchTasks, expected driver status %s, but got %s", mesos.Status_DRIVER_RUNNING, driver.status)
	case !driver.connected:
		log.Infoln("Ignoring LaunchTasks message, disconnected from master.")
		// Send statusUpdate with status=TASK_LOST for each task.
		// See sched.cpp L#823
		for _, lost := range tasks {
			driver.pushLostTask(lost, "Master is disconnected")
		}
		return driver.status, fmt.Errorf("Not connected to master. Tasks marked as lost.")
	}

	// Set TaskInfo.executor.framework_id, if it's missing.
	launchable := make([]*mesos.TaskInfo, len(tasks))
	for i, t := range tasks {
		if exec := t.Executor; exec != nil && exec.FrameworkId == nil {
			exec.FrameworkId = driver.frameworkInfo.Id
		}
		launchable[i] = t
	}

	for _, offerId := range offerIds {
		for _, t := range launchable {
			// Keep only the slave PIDs where we run tasks so we can send
			// framework messages directly.
			switch {
			case !driver.cache.containsOffer(offerId):
				log.Warningf("Attempting to launch task %s with unknown offer %s\n", t.TaskId.GetValue(), offerId.GetValue())
			case driver.cache.getOffer(offerId).offer.SlaveId.Equal(t.SlaveId):
				// cache the tasked slave, for future communication
				slavePid := driver.cache.getOffer(offerId).slavePid
				driver.cache.putSlavePid(t.SlaveId, slavePid)
			default:
				log.Warningf("Attempting to launch task %s with the wrong slaveId offer %s\n", t.TaskId.GetValue(), t.SlaveId.GetValue())
			}
		}
		// the offer is consumed whether or not it was known
		driver.cache.removeOffer(offerId)
	}

	// launch tasks
	launch := &mesos.LaunchTasksMessage{
		FrameworkId: driver.frameworkInfo.Id,
		OfferIds:    offerIds,
		Tasks:       launchable,
		Filters:     filters,
	}
	if sendErr := driver.send(driver.masterPid, launch); sendErr != nil {
		for _, lost := range tasks {
			driver.pushLostTask(lost, "Unable to launch tasks: "+sendErr.Error())
		}
		log.Errorf("Failed to send LaunchTask message: %v\n", sendErr)
		return driver.status, sendErr
	}
	return driver.status, nil
}
// pushLostTask fabricates a TASK_LOST status update for taskInfo, with why as
// its message, and feeds it to the driver's own statusUpdated handler as if
// it had arrived from the driver's own PID.
//
// pushLostTask expects to be guarded by eventLock.
func (driver *MesosSchedulerDriver) pushLostTask(taskInfo *mesos.TaskInfo, why string) {
	// Executor is optional on a TaskInfo (LaunchTasks nil-checks it as well),
	// so only read the executor id when an executor is actually present;
	// otherwise the update simply carries no executor id.
	var executorID *mesos.ExecutorID
	if taskInfo.Executor != nil {
		executorID = taskInfo.Executor.ExecutorId
	}

	msg := &mesos.StatusUpdateMessage{
		Update: &mesos.StatusUpdate{
			FrameworkId: driver.frameworkInfo.Id,
			Status: &mesos.TaskStatus{
				TaskId:  taskInfo.TaskId,
				State:   mesos.TaskState_TASK_LOST.Enum(),
				Source:  mesos.TaskStatus_SOURCE_MASTER.Enum(),
				Message: proto.String(why),
				Reason:  mesos.TaskStatus_REASON_MASTER_DISCONNECTED.Enum(),
			},
			SlaveId:    taskInfo.SlaveId,
			ExecutorId: executorID,
			Timestamp:  proto.Float64(float64(time.Now().Unix())),
		},
		Pid: proto.String(driver.self.String()),
	}

	// put it on internal channel
	// will cause handler to push to attached Scheduler
	driver.statusUpdated(driver.self, msg)
}
// KillTask sends a KillTaskMessage for taskId to the master. It returns an
// error if the driver is not running, is disconnected from the master, or the
// message cannot be sent.
func (driver *MesosSchedulerDriver) KillTask(taskId *mesos.TaskID) (mesos.Status, error) {
	driver.eventLock.Lock()
	defer driver.eventLock.Unlock()

	switch {
	case driver.status != mesos.Status_DRIVER_RUNNING:
		return driver.status, fmt.Errorf("Unable to KillTask, expecting driver status %s, but got %s", mesos.Status_DRIVER_RUNNING, driver.status)
	case !driver.connected:
		log.Infoln("Ignoring kill task message, disconnected from master.")
		return driver.status, fmt.Errorf("Not connected to master")
	}

	kill := &mesos.KillTaskMessage{
		FrameworkId: driver.frameworkInfo.Id,
		TaskId:      taskId,
	}
	sendErr := driver.send(driver.masterPid, kill)
	if sendErr != nil {
		log.Errorf("Failed to send KillTask message: %v\n", sendErr)
		return driver.status, sendErr
	}
	return driver.status, nil
}
// RequestResources sends a ResourceRequestMessage carrying requests to the
// master. It returns an error if the driver is not running, is disconnected
// from the master, or the message cannot be sent.
func (driver *MesosSchedulerDriver) RequestResources(requests []*mesos.Request) (mesos.Status, error) {
	driver.eventLock.Lock()
	defer driver.eventLock.Unlock()

	switch {
	case driver.status != mesos.Status_DRIVER_RUNNING:
		return driver.status, fmt.Errorf("Unable to RequestResources, expecting driver status %s, but got %s", mesos.Status_DRIVER_RUNNING, driver.status)
	case !driver.connected:
		log.Infoln("Ignoring request resource message, disconnected from master.")
		return driver.status, fmt.Errorf("Not connected to master")
	}

	request := &mesos.ResourceRequestMessage{
		FrameworkId: driver.frameworkInfo.Id,
		Requests:    requests,
	}
	sendErr := driver.send(driver.masterPid, request)
	if sendErr != nil {
		log.Errorf("Failed to send ResourceRequest message: %v\n", sendErr)
		return driver.status, sendErr
	}
	return driver.status, nil
}
// DeclineOffer declines offerId by launching an empty task list against it.
func (driver *MesosSchedulerDriver) DeclineOffer(offerId *mesos.OfferID, filters *mesos.Filters) (mesos.Status, error) {
	// NOTE: don't lock eventLock here because we're delegating to LaunchTasks() and that does it for us
	declined := []*mesos.OfferID{offerId}
	noTasks := []*mesos.TaskInfo{}

	// launching an empty task list will decline the offer
	return driver.LaunchTasks(declined, noTasks, filters)
}
// ReviveOffers sends a ReviveOffersMessage to the master. It returns an error
// if the driver is not running, is disconnected from the master, or the
// message cannot be sent.
func (driver *MesosSchedulerDriver) ReviveOffers() (mesos.Status, error) {
	driver.eventLock.Lock()
	defer driver.eventLock.Unlock()

	switch {
	case driver.status != mesos.Status_DRIVER_RUNNING:
		return driver.status, fmt.Errorf("Unable to ReviveOffers, expecting driver status %s, but got %s", mesos.Status_DRIVER_RUNNING, driver.status)
	case !driver.connected:
		log.Infoln("Ignoring revive offers message, disconnected from master.")
		return driver.status, fmt.Errorf("Not connected to master.")
	}

	revive := &mesos.ReviveOffersMessage{
		FrameworkId: driver.frameworkInfo.Id,
	}
	sendErr := driver.send(driver.masterPid, revive)
	if sendErr != nil {
		log.Errorf("Failed to send ReviveOffers message: %v\n", sendErr)
		return driver.status, sendErr
	}
	return driver.status, nil
}
// SendFrameworkMessage sends data to the executor executorId on slave slaveId
// as a FrameworkToExecutorMessage. The message goes directly to the slave
// when its PID is in the cache, and to the master otherwise. When the cached
// slave PID equals the driver's own PID nothing is sent and the call
// succeeds. An error is returned if the driver is not running, is
// disconnected from the master, or the send fails.
func (driver *MesosSchedulerDriver) SendFrameworkMessage(executorId *mesos.ExecutorID, slaveId *mesos.SlaveID, data string) (mesos.Status, error) {
	driver.eventLock.Lock()
	defer driver.eventLock.Unlock()

	switch {
	case driver.status != mesos.Status_DRIVER_RUNNING:
		return driver.status, fmt.Errorf("Unable to SendFrameworkMessage, expecting driver status %s, but got %s", mesos.Status_DRIVER_RUNNING, driver.status)
	case !driver.connected:
		log.Infoln("Ignoring send framework message, disconnected from master.")
		return driver.status, fmt.Errorf("Not connected to master")
	}

	message := &mesos.FrameworkToExecutorMessage{
		SlaveId:     slaveId,
		FrameworkId: driver.frameworkInfo.Id,
		ExecutorId:  executorId,
		Data:        []byte(data),
	}

	// Use list of cached slaveIds from previous offers.
	// Send frameworkMessage directly to cached slave, otherwise to master.
	if !driver.cache.containsSlavePid(slaveId) {
		// slavePid not cached, send to master.
		if err := driver.send(driver.masterPid, message); err != nil {
			log.Errorf("Failed to send framework to executor message: %v\n", err)
			return driver.status, err
		}
		return driver.status, nil
	}

	slavePid := driver.cache.getSlavePid(slaveId)
	if slavePid.Equal(driver.self) {
		// the cached "slave" is this driver itself: nothing to deliver
		return driver.status, nil
	}
	if err := driver.send(slavePid, message); err != nil {
		log.Errorf("Failed to send framework to executor message: %v\n", err)
		return driver.status, err
	}
	return driver.status, nil
}
// ReconcileTasks sends a ReconcileTasksMessage carrying statuses to the
// master. It returns an error if the driver is not running, is disconnected
// from the master, or the message cannot be sent.
func (driver *MesosSchedulerDriver) ReconcileTasks(statuses []*mesos.TaskStatus) (mesos.Status, error) {
	driver.eventLock.Lock()
	defer driver.eventLock.Unlock()

	switch {
	case driver.status != mesos.Status_DRIVER_RUNNING:
		return driver.status, fmt.Errorf("Unable to ReconcileTasks, expecting driver status %s, but got %s", mesos.Status_DRIVER_RUNNING, driver.status)
	case !driver.connected:
		log.Infoln("Ignoring send Reconcile Tasks message, disconnected from master.")
		return driver.status, fmt.Errorf("Not connected to master.")
	}

	reconcile := &mesos.ReconcileTasksMessage{
		FrameworkId: driver.frameworkInfo.Id,
		Statuses:    statuses,
	}
	sendErr := driver.send(driver.masterPid, reconcile)
	if sendErr != nil {
		log.Errorf("Failed to send reconcile tasks message: %v\n", sendErr)
		return driver.status, sendErr
	}
	return driver.status, nil
}
// error aborts the driver with an ErrDriverAborted whose Reason is err,
// unless the driver is already in DRIVER_ABORTED status, in which case the
// message is only logged.
//
// error expects to be guarded by eventLock.
func (driver *MesosSchedulerDriver) error(err string) {
	if driver.status != mesos.Status_DRIVER_ABORTED {
		driver.abort(&ErrDriverAborted{Reason: err})
		return
	}
	log.V(3).Infoln("Ignoring error message, the driver is aborted!")
}
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
Go
1
https://gitee.com/meoom/kubernetes.git
git@gitee.com:meoom/kubernetes.git
meoom
kubernetes
kubernetes
v1.1.2-beta.0

搜索帮助

344bd9b3 5694891 D2dac590 5694891