4 Star 6 Fork 5

Humpback / humpback-center

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
cluster.go 47.60 KB
一键复制 编辑 原始数据 按行查看 历史
12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542
package cluster
import "gitee.com/humpbacks/common/models"
import "gitee.com/humpbacks/humpback-center/notify"
import "gitee.com/humpbacks/humpback-center/cluster/storage"
import "gitee.com/humpbacks/humpback-center/cluster/types"
import "github.com/humpback/discovery"
import "github.com/humpback/discovery/backends"
import "github.com/humpback/gounits/logger"
import "github.com/humpback/gounits/system"
import (
"fmt"
"math/rand"
"reflect"
"sort"
"strconv"
"strings"
"sync"
"time"
)
var pendingWaitForInterval = time.Duration(time.Second * 5)
// pendingContainer is exported
// pendingContainer records a container create request together with the group
// it belongs to — presumably held while placement is in flight (TODO confirm:
// the map cluster.pendingContainers is not read in this chunk).
type pendingContainer struct {
	GroupID string           // ID of the owning cluster group
	Name    string           // container name
	Config  models.Container // full creation config for the container
}
// Server is exported
// Server identifies a group member host by Name and/or IP. Either field may
// be empty; matching code throughout this file treats an empty field as
// "not specified" (see the `server.IP != ""` / `server.Name != ""` guards).
type Server struct {
	Name string `json:"Name"`
	IP   string `json:"IP"`
}
// Group is exported
// Group is a named set of servers managed as one unit.
// Servers: cluster group's servers.
// ContactInfo: cluster manager contactinfo.
type Group struct {
	ID          string   `json:"ID"`
	Name        string   `json:"Name"`
	IsCluster   bool     `json:"IsCluster"`
	Location    string   `json:"ClusterLocation"` // note: JSON key differs from the field name
	Servers     []Server `json:"Servers"`
	ContactInfo string   `json:"ContactInfo"`
}
// Cluster is exported
// Cluster is the central coordinator: it tracks discovered engines, the groups
// they belong to, cached container configs, and pending engine/migration work.
// The embedded RWMutex guards the engines, groups and pendingContainers maps.
type Cluster struct {
	sync.RWMutex
	Location         string                  // optional cluster location tag ("location" opt)
	NotifySender     *notify.NotifySender    // outbound notification sender
	Discovery        *discovery.Discovery    // node discovery service (required, see NewCluster)
	overcommitRatio  float64                 // resource overcommit ratio, default 0.05
	overcommitRatio_ struct{}                // (placeholder removed)
	createRetry      int64                   // container create retry count, >= 0
	removeDelay      time.Duration           // "removedelay" option
	recoveryInterval time.Duration           // period of recoveryContainersLoop, default 150s
	randSeed         *rand.Rand              // seeded RNG — consumer not visible in this chunk
	nodeCache        *types.NodeCache        // discovery entry-key -> node data
	configCache      *ContainersConfigCache  // persisted meta/container configuration
	migtatorCache    *MigrateContainersCache // container migration bookkeeping (name typo preserved)
	enginesPool      *EnginesPool            // async engine add/remove worker pool
	hooksProcessor   *HooksProcessor         // webhook event dispatcher
	storageDriver    *storage.DataStorage    // node storage backend
	pendingContainers map[string]*pendingContainer // create requests awaiting placement — TODO confirm keying
	engines          map[string]*Engine      // engines keyed by IP (see GetEngine)
	groups           map[string]*Group       // groups keyed by Group.ID
	stopCh           chan struct{}           // closed in Stop to end watch/recovery loops
}
// NewCluster is exported
// NewCluster builds a Cluster from driver options, wiring in the notify sender
// and the discovery service. It fails when discovery is nil or when the config
// cache / data storage cannot be created.
func NewCluster(driverOpts system.DriverOpts, notifySender *notify.NotifySender, discovery *discovery.Discovery) (*Cluster, error) {
	if discovery == nil {
		return nil, ErrClusterDiscoveryInvalid
	}
	// overcommit: values <= -1 are rejected (default kept); values in (-1, 0)
	// are accepted with a warning; values >= 0 are accepted silently.
	overcommitratio := 0.05
	if val, ret := driverOpts.Float("overcommit", ""); ret {
		if val <= -1 {
			logger.WARN("[#cluster#] set overcommit should be larger than -1, %f is invalid.", val)
		} else {
			if val < 0 {
				logger.WARN("[#cluster#] opts, -1 < overcommit < 0 will make center take less resource than docker engine offers.")
			}
			overcommitratio = val
		}
	}
	createretry := int64(0)
	if val, ret := driverOpts.Int("createretry", ""); ret {
		if val < 0 {
			logger.WARN("[#cluster#] set createretry should be larger than or equal to 0, %d is invalid.", val)
		} else {
			createretry = val
		}
	}
	// The three duration options share one parser; a missing or malformed
	// value silently falls back to its default (original behavior).
	removedelay := durationOpt(driverOpts, "removedelay", 0)
	migratedelay := durationOpt(driverOpts, "migratedelay", 30*time.Second)
	recoveryInterval := durationOpt(driverOpts, "recoveryinterval", 150*time.Second)
	clusterLocation := ""
	if val, ret := driverOpts.String("location", ""); ret {
		clusterLocation = strings.TrimSpace(val)
	}
	cacheRoot := ""
	if val, ret := driverOpts.String("cacheroot", ""); ret {
		cacheRoot = val
	}
	enginesPool := NewEnginesPool()
	migrateContainersCache := NewMigrateContainersCache(migratedelay)
	configCache, err := NewContainersConfigCache(cacheRoot)
	if err != nil {
		return nil, err
	}
	dataPath := "./data"
	if val, ret := driverOpts.String("datapath", ""); ret {
		dataPath = strings.TrimSpace(val)
	}
	storageDriver, err := storage.NewDataStorage(dataPath)
	if err != nil {
		return nil, err
	}
	cluster := &Cluster{
		Location:          clusterLocation,
		NotifySender:      notifySender,
		Discovery:         discovery,
		overcommitRatio:   overcommitratio,
		createRetry:       createretry,
		removeDelay:       removedelay,
		recoveryInterval:  recoveryInterval,
		randSeed:          rand.New(rand.NewSource(time.Now().UTC().UnixNano())),
		nodeCache:         types.NewNodeCache(),
		configCache:       configCache,
		migtatorCache:     migrateContainersCache,
		enginesPool:       enginesPool,
		hooksProcessor:    NewHooksProcessor(),
		storageDriver:     storageDriver,
		pendingContainers: make(map[string]*pendingContainer),
		engines:           make(map[string]*Engine),
		groups:            make(map[string]*Group),
		stopCh:            make(chan struct{}),
	}
	enginesPool.SetCluster(cluster)
	migrateContainersCache.SetCluster(cluster)
	return cluster, nil
}

// durationOpt parses the named duration option, returning def when the option
// is absent or not a valid time.ParseDuration string.
func durationOpt(driverOpts system.DriverOpts, key string, def time.Duration) time.Duration {
	if val, ret := driverOpts.String(key, ""); ret {
		if dur, err := time.ParseDuration(val); err == nil {
			return dur
		}
	}
	return def
}
// Start is exported
// Start opens the storage driver, initializes the container config cache,
// begins watching discovery nodes and launches the recovery loop.
// It returns ErrClusterDiscoveryInvalid when no discovery service is set.
func (cluster *Cluster) Start() error {
	if err := cluster.storageDriver.Open(); err != nil {
		return err
	}
	cluster.configCache.Init()
	if cluster.Discovery == nil {
		return ErrClusterDiscoveryInvalid
	}
	if cluster.Location != "" {
		logger.INFO("[#cluster#] cluster location: %s", cluster.Location)
	}
	logger.INFO("[#cluster#] discovery service watching...")
	cluster.Discovery.WatchNodes(cluster.stopCh, cluster.watchDiscoveryHandleFunc)
	cluster.hooksProcessor.Start()
	go cluster.recoveryContainersLoop()
	return nil
}
// Stop is exported
// Cluster stop
// close discovery service
// stop pendEngines loop
// Closing stopCh ends the discovery watch (see Start) and any loops that
// select on it; the pool, hooks processor and storage are then shut down.
func (cluster *Cluster) Stop() {
	close(cluster.stopCh)
	cluster.enginesPool.Release()
	cluster.hooksProcessor.Close()
	cluster.storageDriver.Close()
	logger.INFO("[#cluster#] discovery service closed.")
}
// GetMetaDataEngines is exported
// GetMetaDataEngines resolves a meta ID to its metadata plus the engines of
// the group that owns it. It fails when either lookup comes back empty.
func (cluster *Cluster) GetMetaDataEngines(metaid string) (*MetaData, []*Engine, error) {
	meta := cluster.GetMetaData(metaid)
	if meta == nil {
		return nil, nil, ErrClusterMetaDataNotFound
	}
	groupEngines := cluster.GetGroupEngines(meta.GroupID)
	if groupEngines == nil {
		return nil, nil, ErrClusterGroupNotFound
	}
	return meta, groupEngines, nil
}
// GetMetaData is exported
// GetMetaData returns the cached metadata for metaid, or nil when unknown
// (thin delegation to the config cache).
func (cluster *Cluster) GetMetaData(metaid string) *MetaData {
	return cluster.configCache.GetMetaData(metaid)
}
// GetMetaBase is exported
// GetMetaBase returns the MetaBase portion of the meta's metadata,
// or nil when the meta is unknown.
func (cluster *Cluster) GetMetaBase(metaid string) *MetaBase {
	meta := cluster.GetMetaData(metaid)
	if meta == nil {
		return nil
	}
	return &meta.MetaBase
}
// GetEngine is exported
// GetEngine returns the engine registered under ip, or nil when absent.
func (cluster *Cluster) GetEngine(ip string) *Engine {
	cluster.RLock()
	defer cluster.RUnlock()
	// Missing keys yield the zero value, which for *Engine is nil —
	// exactly the "not found" result the original returned explicitly.
	return cluster.engines[ip]
}
// GetGroups is exported
// GetGroups returns a snapshot slice of all groups (never nil).
func (cluster *Cluster) GetGroups() []*Group {
	cluster.RLock()
	defer cluster.RUnlock()
	snapshot := make([]*Group, 0, len(cluster.groups))
	for _, g := range cluster.groups {
		snapshot = append(snapshot, g)
	}
	return snapshot
}
// GetEngineGroups is exported
// GetEngineGroups returns every group that references the engine by IP or by
// name, de-duplicated. The original iterated the whole groups map twice (once
// per match kind); a single pass collects both match lists, keeping IP matches
// ahead of name matches as before.
func (cluster *Cluster) GetEngineGroups(engine *Engine) []*Group {
	cluster.RLock()
	defer cluster.RUnlock()
	ipMatched := []*Group{}
	nameMatched := []*Group{}
	for _, group := range cluster.groups {
		for _, server := range group.Servers {
			if server.IP != "" && server.IP == engine.IP {
				ipMatched = append(ipMatched, group)
				break
			}
		}
		for _, server := range group.Servers {
			if server.Name != "" && server.Name == engine.Name {
				nameMatched = append(nameMatched, group)
				break
			}
		}
	}
	groups := append(ipMatched, nameMatched...)
	return removeDuplicatesGroups(groups)
}
// GetServerOfEngines is exported
// GetServerOfEngines resolves a Server to its live engine; when the engine is
// not connected it falls back to node storage, or synthesizes a placeholder,
// and in either fallback case marks the result disconnected.
// Fix: the original read cluster.engines without the RLock that every other
// accessor in this file takes — a data race with concurrent map mutation.
func (cluster *Cluster) GetServerOfEngines(server Server) *Engine {
	cluster.RLock()
	engine := searchServerOfEngines(server, cluster.engines)
	cluster.RUnlock()
	if engine == nil {
		engine = searchServerOfStorage(server, cluster.storageDriver.NodeStorage)
		if engine == nil {
			engine = &Engine{
				Name:         server.Name,
				IP:           server.IP,
				EngineLabels: map[string]string{},
				NodeLabels:   map[string]string{},
			}
		}
		engine.StateText = stateText[StateDisconnected]
		engine.state = StateDisconnected
	}
	return engine
}
// GetGroupAllEngines is exported
// Returns all engine under group and contains offline (cluster engines not exists.)
// Offline members are looked up in node storage, or synthesized as empty
// placeholders, and marked disconnected either way.
func (cluster *Cluster) GetGroupAllEngines(groupid string) []*Engine {
	cluster.RLock()
	defer cluster.RUnlock()
	group, found := cluster.groups[groupid]
	if !found {
		return nil
	}
	engines := make([]*Engine, 0, len(group.Servers))
	for _, server := range group.Servers {
		engine := searchServerOfEngines(server, cluster.engines)
		if engine == nil {
			if engine = searchServerOfStorage(server, cluster.storageDriver.NodeStorage); engine == nil {
				engine = &Engine{
					Name:         server.Name,
					IP:           server.IP,
					EngineLabels: map[string]string{},
					NodeLabels:   map[string]string{},
				}
			}
			engine.StateText = stateText[StateDisconnected]
			engine.state = StateDisconnected
		}
		engines = append(engines, engine)
	}
	return removeDuplicatesEngines(engines)
}
// GetGroupEngines is exported
// Returns pairs engine under group and cluster engines is exists
// (only currently-connected members; offline servers are skipped).
func (cluster *Cluster) GetGroupEngines(groupid string) []*Engine {
	cluster.RLock()
	defer cluster.RUnlock()
	group, found := cluster.groups[groupid]
	if !found {
		return nil
	}
	engines := []*Engine{}
	for _, server := range group.Servers {
		engine := searchServerOfEngines(server, cluster.engines)
		if engine != nil {
			engines = append(engines, engine)
		}
	}
	return removeDuplicatesEngines(engines)
}
// InGroupsContains is exported
// InGroupsContains reports whether any group references a server with the
// given IP or the given name (empty search values never match an empty field).
// The original walked the whole groups map twice — once for IP, once for
// name; one pass with a combined predicate yields the identical result.
func (cluster *Cluster) InGroupsContains(ip string, name string) bool {
	cluster.RLock()
	defer cluster.RUnlock()
	for _, group := range cluster.groups {
		for _, server := range group.Servers {
			if (server.IP != "" && server.IP == ip) || (server.Name != "" && server.Name == name) {
				return true
			}
		}
	}
	return false
}
// GetMetaEnginesContainers is exported
// GetMetaEnginesContainers assembles a GroupContainer view for one meta: its
// settings copied from metaData, plus one EngineContainer per base config
// found on a healthy engine that owns the meta (first engine wins).
func (cluster *Cluster) GetMetaEnginesContainers(metaData *MetaData, metaEngines map[string]*Engine) *types.GroupContainer {
	result := &types.GroupContainer{
		GroupID:       metaData.GroupID,
		MetaID:        metaData.MetaID,
		IsRemoveDelay: metaData.IsRemoveDelay,
		IsRecovery:    metaData.IsRecovery,
		Instances:     metaData.Instances,
		Placement:     metaData.Placement,
		WebHooks:      metaData.WebHooks,
		Config:        metaData.Config,
		Containers:    make([]*types.EngineContainer, 0),
		CreateAt:      metaData.CreateAt,
		LastUpdateAt:  metaData.LastUpdateAt,
	}
	for _, baseConfig := range cluster.configCache.GetMetaDataBaseConfigs(metaData.MetaID) {
		for _, engine := range metaEngines {
			if !engine.IsHealthy() || !engine.HasMeta(metaData.MetaID) {
				continue
			}
			container := engine.Container(baseConfig.ID)
			if container == nil {
				continue
			}
			result.Containers = append(result.Containers, &types.EngineContainer{
				IP:        engine.IP,
				HostName:  engine.Name,
				Container: container.Config.Container,
			})
			break
		}
	}
	return result
}
// RefreshEnginesContainers is exported
// RefreshEnginesContainers refreshes container state on every healthy engine
// concurrently and blocks until all refreshes finish.
func (cluster *Cluster) RefreshEnginesContainers(metaEngines map[string]*Engine) {
	var wg sync.WaitGroup
	for _, engine := range metaEngines {
		if !engine.IsHealthy() {
			continue
		}
		wg.Add(1)
		// engine is passed as an argument so each goroutine gets its own copy.
		go func(e *Engine) {
			defer wg.Done()
			e.RefreshContainers()
		}(engine)
	}
	wg.Wait()
}
// GetGroupAllContainers is exported
// GetGroupAllContainers gathers, for every meta of the group, the healthy
// engines that carry it, then builds one GroupContainer per meta.
func (cluster *Cluster) GetGroupAllContainers(groupid string) *types.GroupContainers {
	groupMetaData := cluster.configCache.GetGroupMetaData(groupid)
	metaEngines := map[string]*Engine{}
	for _, metaData := range groupMetaData {
		_, engines, err := cluster.GetMetaDataEngines(metaData.MetaID)
		if err != nil {
			continue
		}
		for _, engine := range engines {
			if engine.IsHealthy() && engine.HasMeta(metaData.MetaID) {
				metaEngines[engine.IP] = engine
			}
		}
	}
	//cluster.RefreshEnginesContainers(metaEngines)
	groupContainers := types.GroupContainers{}
	for _, metaData := range groupMetaData {
		if groupContainer := cluster.GetMetaEnginesContainers(metaData, metaEngines); groupContainer != nil {
			groupContainers = append(groupContainers, groupContainer)
		}
	}
	return &groupContainers
}
// GetGroupContainers is exported
// GetGroupContainers builds the GroupContainer view for a single meta,
// or nil when the meta or its group cannot be resolved.
func (cluster *Cluster) GetGroupContainers(metaid string) *types.GroupContainer {
	meta, engines, err := cluster.GetMetaDataEngines(metaid)
	if err != nil {
		return nil
	}
	healthy := map[string]*Engine{}
	for _, engine := range engines {
		if engine.IsHealthy() && engine.HasMeta(metaid) {
			healthy[engine.IP] = engine
		}
	}
	//cluster.RefreshEnginesContainers(metaEngines)
	return cluster.GetMetaEnginesContainers(meta, healthy)
}
// GetGroup is exported
// GetGroup returns the group registered under groupid, or nil when absent.
func (cluster *Cluster) GetGroup(groupid string) *Group {
	cluster.RLock()
	defer cluster.RUnlock()
	// A missing key yields nil, which is the original "not found" result.
	return cluster.groups[groupid]
}
// SetGroup is exported
// SetGroup creates the group when unknown, or overwrites the stored
// definition, then reconciles the engines pool outside the lock:
// servers dropped from the group (and not referenced by any other group)
// are removed; new servers are queued for engine creation.
func (cluster *Cluster) SetGroup(group *Group) {
	// Server names are compared elsewhere after upper-casing; normalize here.
	nSize := len(group.Servers)
	for i := 0; i < nSize; i++ {
		group.Servers[i].Name = strings.ToUpper(group.Servers[i].Name)
	}
	addServers := []Server{}
	removeServers := []Server{}
	cluster.Lock()
	pGroup, ret := cluster.groups[group.ID]
	if !ret {
		// New group: only servers already present in the discovery node cache
		// are scheduled for engine creation.
		pGroup = group
		cluster.groups[group.ID] = pGroup
		logger.INFO("[#cluster#] group created %s %s (%d)", pGroup.ID, pGroup.Name, len(pGroup.Servers))
		for _, server := range pGroup.Servers {
			ipOrName := selectIPOrName(server.IP, server.Name)
			if nodeData := cluster.nodeCache.Get(ipOrName); nodeData != nil {
				addServers = append(addServers, server)
			}
		}
	} else {
		// Existing group: replace the definition, then diff old vs new server
		// lists. compareRemoveServers / compareAddServers decide equivalence
		// (their matching rules live elsewhere in the package).
		origins := pGroup.Servers
		pGroup.Name = group.Name
		pGroup.Location = group.Location
		pGroup.Servers = group.Servers
		pGroup.IsCluster = group.IsCluster
		pGroup.ContactInfo = group.ContactInfo
		logger.INFO("[#cluster#] group changed %s %s (%d)", pGroup.ID, pGroup.Name, len(pGroup.Servers))
		// Origin servers with no counterpart in the new list -> removal set.
		for _, originServer := range origins {
			found := false
			for _, newServer := range group.Servers {
				if ret := compareRemoveServers(cluster.nodeCache, originServer, newServer); ret {
					found = true
					break
				}
			}
			if !found {
				removeServers = append(removeServers, originServer)
			}
		}
		// New servers with no counterpart in the origin list -> addition set.
		for _, newServer := range group.Servers {
			found := false
			for _, originServer := range origins {
				if ret := compareAddServers(cluster.nodeCache, originServer, newServer); ret {
					found = true
					break
				}
			}
			if !found {
				addServers = append(addServers, newServer)
			}
		}
	}
	cluster.Unlock()
	// Pool updates happen after releasing the lock: RemoveEngine only fires
	// when the node is no longer referenced by ANY group.
	for _, server := range removeServers {
		if nodeData := cluster.nodeCache.Get(selectIPOrName(server.IP, server.Name)); nodeData != nil {
			if ret := cluster.InGroupsContains(nodeData.IP, nodeData.Name); !ret {
				logger.INFO("[#cluster#] group %s remove server to pendengines %s\t%s", pGroup.ID, server.IP, server.Name)
				cluster.enginesPool.RemoveEngine(server.IP, server.Name)
			} else {
				// after recovery containers, need to clear migrator cache of meta container ?
				// Migrator StartEngineContainers(groupid, engine)... ?
			}
		}
	}
	for _, server := range addServers {
		logger.INFO("[#cluster#] group %s append server to pendengines %s\t%s", pGroup.ID, server.IP, server.Name)
		cluster.enginesPool.AddEngine(server.IP, server.Name)
		/*
			if cluster is engine exists ? {
				// Migrator CancelEngineContainers(groupid, engine)... // add to group, this group migrator cancel.
			}
		*/
	}
}
// RemoveGroup is exported
// RemoveGroup tears a group down: cancels its migrations, removes all of its
// metas' containers concurrently, deletes the group entry, and finally drops
// engines that no longer belong to any group. Returns false when the group
// (or its engine list) cannot be found.
func (cluster *Cluster) RemoveGroup(groupid string) bool {
	engines := cluster.GetGroupEngines(groupid)
	if engines == nil {
		logger.WARN("[#cluster#] remove group %s not found.", groupid)
		return false
	}
	// remove group migrator's all meta.
	cluster.migtatorCache.RemoveGroup(groupid)
	// get group all metaData and clean metaData containers.
	// One goroutine per meta; the empty containerid removes ALL of the
	// meta's containers (see removeContainers usage elsewhere in this file).
	wgroup := sync.WaitGroup{}
	groupMetaData := cluster.configCache.GetGroupMetaData(groupid)
	for _, metaData := range groupMetaData {
		wgroup.Add(1)
		go func(mdata *MetaData) {
			cluster.removeContainers(mdata, "")
			cluster.configCache.RemoveMetaData(mdata.MetaID)
			cluster.submitHookEvent(mdata, RemoveMetaEvent)
			wgroup.Done()
		}(metaData)
	}
	wgroup.Wait()
	// remove metadata and group to cluster.
	cluster.configCache.RemoveGroupMetaData(groupid)
	cluster.Lock()
	delete(cluster.groups, groupid) // remove group
	logger.INFO("[#cluster#] removed group %s", groupid)
	cluster.Unlock()
	// remove engine to engines pool.
	for _, engine := range engines {
		if engine.IsHealthy() {
			if ret := cluster.InGroupsContains(engine.IP, engine.Name); !ret {
				// engine does not belong to the any groups, remove to cluster.
				logger.INFO("[#cluster#] group %s remove server to pendengines %s\t%s", groupid, engine.IP, engine.Name)
				cluster.enginesPool.RemoveEngine(engine.IP, engine.Name)
			}
		}
	}
	return true
}
// SetServerNodeLabels is exported
// SetServerNodeLabels persists node labels for the server and, when the
// engine is currently online, updates the in-memory copy too. Metas with
// placement constraints are flagged so their available-node sets are
// recomputed later.
func (cluster *Cluster) SetServerNodeLabels(server Server, labels map[string]string) error {
	engine := searchServerOfStorage(server, cluster.storageDriver.NodeStorage)
	if engine == nil {
		return ErrClusterServerNotFound
	}
	err := cluster.storageDriver.NodeStorage.SetNodeLabels(engine.IP, labels)
	if err == nil {
		//update engine node-labels in memory
		if engine = cluster.GetEngine(engine.IP); engine != nil {
			originalLabels := engine.NodeLabelsPairs()
			if !reflect.DeepEqual(originalLabels, labels) {
				logger.INFO("[#cluster#] set %s(%s) node-labels, %+v", engine.IP, engine.Name, labels)
				engine.SetNodeLabelsPairs(labels)
				for _, metaid := range engine.MetaIds() {
					metaData := cluster.GetMetaData(metaid)
					// len(nil slice) == 0, so the old explicit nil check on
					// Constraints was redundant.
					if metaData != nil && len(metaData.Placement.Constraints) > 0 {
						logger.INFO("[#cluster#] meta %s enable available nodes changed.", metaid)
						cluster.configCache.SetAvailableNodesChanged(metaid, true)
					}
				}
			}
		}
	}
	return err
}
// watchDiscoveryHandleFunc reacts to discovery watch callbacks: removed
// entries take their engines out of the pool (unless the same IP is still
// registered under another discovery key), added entries are cached and
// queued for engine creation. A batched watch-event notification is sent
// at the end. Fix: replaced the `== false` comparison with idiomatic `!`.
func (cluster *Cluster) watchDiscoveryHandleFunc(added backends.Entries, removed backends.Entries, err error) {
	if err != nil {
		logger.ERROR("[#cluster#] discovery watch error:%s", err.Error())
		return
	}
	if len(added) == 0 && len(removed) == 0 {
		return
	}
	watchEngines := WatchEngines{}
	logger.INFO("[#cluster#] discovery watch removed:%d added:%d.", len(removed), len(added))
	for _, entry := range removed {
		nodeData, err := deCodeEntry(entry)
		if err != nil {
			logger.ERROR("[#cluster#] discovery watch removed decode error: %s", err.Error())
			continue
		}
		logger.INFO("[#cluster#] discovery watch, remove to pendengines %s\t%s", nodeData.IP, nodeData.Name)
		// Only drop the engine when no other discovery key still maps to this IP.
		if !cluster.nodeCache.ContainsOtherKey(entry.Key, nodeData.IP) {
			watchEngines = append(watchEngines, NewWatchEngine(nodeData.IP, nodeData.Name, StateDisconnected))
			cluster.enginesPool.RemoveEngine(nodeData.IP, nodeData.Name)
		}
		cluster.nodeCache.Remove(entry.Key)
	}
	for _, entry := range added {
		nodeData, err := deCodeEntry(entry)
		if err != nil {
			logger.ERROR("[#cluster#] discovery service watch added decode error: %s", err.Error())
			continue
		}
		logger.INFO("[#cluster#] discovery watch, append to pendengines %s\t%s", nodeData.IP, nodeData.Name)
		watchEngines = append(watchEngines, NewWatchEngine(nodeData.IP, nodeData.Name, StateHealthy))
		cluster.nodeCache.Add(entry.Key, nodeData)
		cluster.enginesPool.AddEngine(nodeData.IP, nodeData.Name)
	}
	cluster.NotifyGroupEnginesWatchEvent("cluster discovery some engines state changed.", watchEngines)
}
// OperateContainer is exported
// OperateContainer applies an action to one container, resolving its meta
// from the container ID first. Returns the meta ID alongside the result.
func (cluster *Cluster) OperateContainer(containerid string, action string) (string, *types.OperatedContainers, error) {
	meta := cluster.configCache.GetMetaDataOfContainer(containerid)
	if meta == nil {
		return "", nil, ErrClusterContainerNotFound
	}
	operated, err := cluster.OperateContainers(meta.MetaID, containerid, action)
	return meta.MetaID, operated, err
}
// OperateContainers is exported
// if containerid is empty string so operate metaid's all containers
// (a non-empty containerid operates only that container, then stops).
func (cluster *Cluster) OperateContainers(metaid string, containerid string, action string) (*types.OperatedContainers, error) {
	metaData, engines, err := cluster.validateMetaData(metaid)
	if err != nil {
		logger.ERROR("[#cluster#] %s meta %s error, %s", action, metaid, err.Error())
		return nil, err
	}
	operatedContainers := types.OperatedContainers{}
	// The labeled break replaces the original foundContainer flag:
	// once the targeted container has been handled, stop scanning engines.
search:
	for _, engine := range engines {
		for _, container := range engine.Containers(metaData.MetaID) {
			if containerid == "" || container.Info.ID == containerid {
				var opErr error
				if engine.IsHealthy() {
					if opErr = engine.OperateContainer(models.ContainerOperate{Action: action, Container: container.Info.ID}); opErr != nil {
						logger.ERROR("[#cluster#] engine %s, %s container error:%s", engine.IP, action, opErr.Error())
					}
				} else {
					opErr = fmt.Errorf("engine state is %s", engine.State())
				}
				operatedContainers = operatedContainers.SetOperatedPair(engine.IP, engine.Name, container.Info.ID, action, opErr)
			}
			if container.Info.ID == containerid {
				break search
			}
		}
	}
	cluster.submitHookEvent(metaData, OperateMetaEvent)
	return &operatedContainers, nil
}
// actionContainers applies action to every container in the map whose engine
// is known (nil engines are skipped). On the first failure it returns the
// offending container ID and the error; on success it returns ("", nil).
func (cluster *Cluster) actionContainers(action string, engineContainers map[string]*Engine) (string, error) {
	for containerid, engine := range engineContainers {
		if engine == nil {
			continue
		}
		if err := engine.OperateContainer(models.ContainerOperate{Action: action, Container: containerid}); err != nil {
			return containerid, err
		}
	}
	return "", nil
}
// upgradeContainers re-creates a meta's containers with a new config.
// Sequence: map each base config to its hosting engine; when the network mode
// or published ports could collide, stop the old containers first; create the
// new set; on failure roll back (delete partial creations, restart old ones);
// on success delete the old containers. Returns the upgrade pairs.
func (cluster *Cluster) upgradeContainers(metaData *MetaData, engines []*Engine, config models.Container) (*types.UpgradeContainers, error) {
	priorities := NewEnginePriorities(metaData, engines)
	logger.INFO("[#cluster#] upgrade %s containers, priorities %s", config.Name, priorities.EngineStrings())
	// Map container ID -> hosting engine; nil when no healthy engine has it.
	engineContainers := map[string]*Engine{}
	for _, baseConfig := range metaData.BaseConfigs {
		var e *Engine
		for _, engine := range engines {
			if engine.IsHealthy() && engine.HasContainer(baseConfig.ID) {
				e = engine
				break
			}
		}
		engineContainers[baseConfig.ID] = e
	}
	// Non-bridge/nat network modes, or any published public port, mean the
	// old and new containers cannot coexist — stop old ones first.
	afterStop := false
	if config.NetworkMode != "bridge" && config.NetworkMode != "nat" {
		afterStop = true
	}
	if !afterStop {
		for _, portBindings := range metaData.Config.Ports {
			if portBindings.PublicPort != 0 {
				afterStop = true
				break
			}
		}
	}
	if afterStop { //after stop old tag containers.
		if containerid, err := cluster.actionContainers("stop", engineContainers); err != nil {
			// The container that failed to stop is dropped from the meta and
			// from the rollback set; the rest are restarted.
			cluster.configCache.RemoveContainerBaseConfig(metaData.MetaID, containerid)
			delete(engineContainers, containerid)
			logger.ERROR("[#cluster#] upgrade %s after-stop containers error, %s", metaData.MetaID, err.Error())
			cluster.actionContainers("start", engineContainers) //restart old tag containers.
			return nil, err
		}
	}
	createdContainers, err := cluster.createContainers(metaData, metaData.Instances, priorities, config)
	if err != nil {
		// Rollback: remove whatever was created, then restart the old
		// containers if they were stopped above.
		for _, container := range createdContainers {
			if engine := cluster.GetEngine(container.IP); engine != nil {
				engine.RemoveContainer(container.ID)
			}
		}
		if afterStop { //restart old tag containers.
			cluster.actionContainers("start", engineContainers)
		}
		return nil, err
	}
	//remove old tag containers.
	// Containers with no hosting engine only get their base config purged.
	for container, engine := range engineContainers {
		if engine != nil {
			engine.RemoveContainer(container)
		} else {
			cluster.configCache.RemoveContainerBaseConfig(metaData.MetaID, container)
		}
	}
	upgradeContainers := types.UpgradeContainers{}
	for _, container := range createdContainers {
		upgradeContainers = upgradeContainers.SetUpgradePair(container.IP, container.HostName, container.Container)
	}
	return &upgradeContainers, nil
}
// UpgradeContainers is exported
// UpgradeContainers re-creates all of a meta's containers with the image
// switched to imagetag, then persists the new tag and fires the hook event.
func (cluster *Cluster) UpgradeContainers(metaid string, imagetag string) (*types.UpgradeContainers, error) {
	metaData, engines, err := cluster.validateMetaData(metaid)
	if err != nil {
		logger.ERROR("[#cluster#] upgrade meta %s error, %s", metaid, err.Error())
		return nil, err
	}
	if metaData.ImageTag == imagetag {
		return nil, fmt.Errorf("upgrade meta %s cancel, this tag has already in cluster", metaid)
	}
	// NOTE(review): LastIndex assumes the image reference always carries a
	// tag; an untagged image from a port-qualified registry ("host:5000/app")
	// would have its port replaced here — confirm callers always supply tags.
	config := metaData.Config
	tagIndex := strings.LastIndex(config.Image, ":")
	if tagIndex <= 0 {
		return nil, fmt.Errorf("upgrade %s config tag invalid", metaid)
	}
	config.Image = config.Image[:tagIndex] + ":" + imagetag
	upgraded, err := cluster.upgradeContainers(metaData, engines, config)
	if err != nil {
		return nil, fmt.Errorf("upgrade %s failure, %s", metaid, err.Error())
	}
	//save new tag to meta file.
	cluster.configCache.SetImageTag(metaid, imagetag)
	cluster.submitHookEvent(metaData, UpgradeMetaEvent)
	return upgraded, nil
}
// RemoveContainer is exported
// RemoveContainer removes one container, resolving its meta from the
// container ID first. Returns the meta ID alongside the result.
func (cluster *Cluster) RemoveContainer(containerid string) (string, *types.RemovedContainers, error) {
	meta := cluster.configCache.GetMetaDataOfContainer(containerid)
	if meta == nil {
		return "", nil, ErrClusterContainerNotFound
	}
	removed, err := cluster.RemoveContainers(meta.MetaID, containerid)
	return meta.MetaID, removed, err
}
// RemoveContainersOfMetaName is exported
// remove meta's all containers
// The meta is resolved by group ID + meta name rather than meta ID.
func (cluster *Cluster) RemoveContainersOfMetaName(groupid string, metaname string) (string, *types.RemovedContainers, error) {
	meta := cluster.configCache.GetMetaDataOfName(groupid, metaname)
	if meta == nil {
		return "", nil, ErrClusterMetaDataNotFound
	}
	removed, err := cluster.RemoveContainers(meta.MetaID, "")
	if err != nil {
		return "", nil, err
	}
	return meta.MetaID, removed, nil
}
// RemoveContainers is exported
// if containerid is empty string so remove metaid's all containers
// After removal the meta itself is deleted when everything was removed
// (empty containerid) or when no base configs remain.
func (cluster *Cluster) RemoveContainers(metaid string, containerid string) (*types.RemovedContainers, error) {
	metaData, _, err := cluster.validateMetaData(metaid)
	if err != nil {
		logger.ERROR("[#cluster#] remove meta %s error, %s", metaid, err.Error())
		return nil, err
	}
	logger.INFO("[#cluster#] remove meta %s %s", metaid, containerid)
	removedContainers := cluster.removeContainers(metaData, containerid)
	cluster.submitHookEvent(metaData, RemoveMetaEvent)
	// Re-read the meta: removeContainers may have mutated its base configs.
	if refreshed := cluster.configCache.GetMetaData(metaData.MetaID); refreshed != nil {
		if containerid == "" || len(refreshed.BaseConfigs) == 0 {
			cluster.configCache.RemoveMetaData(refreshed.MetaID)
		}
	}
	return removedContainers, nil
}
// RecoveryContainers is exported
// RecoveryContainers reconciles a meta toward its desired instance count:
// base configs whose containers exist on no healthy engine are purged, then
// containers are created or reduced until the count matches Instances.
// Metas with IsRecovery disabled are rejected.
func (cluster *Cluster) RecoveryContainers(metaid string) error {
	metaData, engines, err := cluster.validateMetaData(metaid)
	if err != nil {
		return fmt.Errorf("recovery meta %s %s", metaid, err)
	}
	if !metaData.IsRecovery {
		return fmt.Errorf("recovery meta %s is disabled", metaData.MetaID)
	}
	// Drop base configs that no healthy engine can account for.
	baseConfigs := cluster.configCache.GetMetaDataBaseConfigs(metaData.MetaID)
	for _, baseConfig := range baseConfigs {
		if baseConfig.ID != "" {
			found := false
			for _, engine := range engines {
				if engine.IsHealthy() && engine.HasContainer(baseConfig.ID) {
					found = true
					break
				}
			}
			if !found { //clean meta invalid container.
				cluster.configCache.RemoveContainerBaseConfig(metaData.MetaID, baseConfig.ID)
				logger.WARN("[#cluster#] recovery meta %s remove invalid container %s", metaData.MetaID, ShortContainerID(baseConfig.ID))
			}
		}
	}
	if len(engines) > 0 {
		// -1 appears to be the cache's "unknown" sentinel — TODO confirm
		// against GetMetaDataBaseConfigsCount.
		baseConfigsCount := cluster.configCache.GetMetaDataBaseConfigsCount(metaData.MetaID)
		if baseConfigsCount != -1 && metaData.Instances != baseConfigsCount {
			var err error
			if metaData.Instances > baseConfigsCount {
				_, err = cluster.createContainers(metaData, metaData.Instances-baseConfigsCount, nil, metaData.Config)
			} else {
				cluster.reduceContainers(metaData, baseConfigsCount-metaData.Instances)
			}
			cluster.submitHookEvent(metaData, RecoveryMetaEvent)
			cluster.NotifyGroupMetaContainersEvent("Cluster Meta Containers Recovered.", err, metaData.MetaID)
		}
	}
	return nil
}
// UpdateContainers is exported
// It re-targets meta metaid to the given instances/webhooks/placement/config,
// persists the new settings to the config cache, then reconciles running
// containers: a full re-create when config, placement or available nodes
// changed, an append/reduce when only the instance count changed. On success
// it returns the containers present for the meta across healthy engines.
func (cluster *Cluster) UpdateContainers(metaid string, instances int, webhooks types.WebHooks, placement types.Placement, config models.Container, updateOption types.UpdateOption) (*types.CreatedContainers, error) {
	if instances < 0 {
		logger.ERROR("[#cluster#] update meta %s error, %s", metaid, ErrClusterContainersInstancesInvalid)
		return nil, ErrClusterContainersInstancesInvalid
	}
	metaData, engines, err := cluster.validateMetaData(metaid)
	if err != nil {
		logger.ERROR("[#cluster#] update meta %s error, %s", metaid, err.Error())
		return nil, err
	}
	// An empty config name means "keep the meta's current container config".
	if config.Name == "" {
		config = metaData.Config
	}
	originalConfig := metaData.Config
	originalPlacement := metaData.Placement
	imageTag := getImageTag(config.Image)
	cluster.configCache.SetMetaData(metaid, instances, webhooks, placement, config, updateOption.IsRemoveDelay, updateOption.IsRecovery)
	cluster.configCache.SetImageTag(metaid, imageTag)
	// Re-read to pick up the persisted state (including BaseConfigs).
	metaData = cluster.configCache.GetMetaData(metaid)
	if metaData == nil {
		return nil, fmt.Errorf("update invalid meta %s", metaid)
	}
	if len(engines) > 0 {
		originalInstances := len(metaData.BaseConfigs)
		if instances == 0 { //clear containers.
			logger.INFO("[#cluster#] update %s containers, reduce instances to %d.", config.Name, instances)
			cluster.reduceContainers(metaData, originalInstances)
		} else {
			placementCompared := reflect.DeepEqual(originalPlacement, placement)
			availableNodesChanged := metaData.AvailableNodesChanged
			if !reflect.DeepEqual(originalConfig, config) || !placementCompared || availableNodesChanged {
				//config or placement changed, re-create all containers.
				logger.INFO("[#cluster#] update %s containers, re-create %d instances.", config.Name, instances)
				// Same instance count with unchanged placement/nodes: build
				// engine priorities — presumably to keep containers on the
				// engines currently hosting them; confirm in NewEnginePriorities.
				var priorities *EnginePriorities
				if originalInstances == instances && placementCompared && !availableNodesChanged {
					priorities = NewEnginePriorities(metaData, engines)
					logger.INFO("[#cluster#] update %s containers, priorities %s", config.Name, priorities.EngineStrings())
				}
				cluster.reduceContainers(metaData, originalInstances)
				_, err = cluster.createContainers(metaData, instances, priorities, metaData.Config)
			} else { //instances changed only.
				if originalInstances < instances {
					logger.INFO("[#cluster#] update %s containers, instances changed only, append %d instances.", config.Name, instances-originalInstances)
					_, err = cluster.createContainers(metaData, instances-originalInstances, nil, metaData.Config)
				} else {
					logger.INFO("[#cluster#] update %s containers, instances changed only, reduce %d containers.", config.Name, originalInstances-instances)
					cluster.reduceContainers(metaData, originalInstances-instances)
				}
			}
		}
	}
	// One-shot flag: consume the available-nodes-changed marker after use.
	if metaData.AvailableNodesChanged {
		cluster.configCache.SetAvailableNodesChanged(metaid, false)
		logger.INFO("[#cluster#] meta %s disable available nodes changed.", metaid)
	}
	cluster.submitHookEvent(metaData, UpdateMetaEvent)
	if err == nil {
		// Collect the meta's containers across all healthy engines as result.
		createdContainers := types.CreatedContainers{}
		for _, engine := range engines {
			if engine.IsHealthy() {
				containers := engine.Containers(metaData.MetaID)
				for _, container := range containers {
					createdContainers = createdContainers.SetCreatedPair(engine.IP, engine.Name, container.Config.Container)
				}
			}
		}
		return &createdContainers, nil
	}
	return nil, err
}
// CreateContainers is exported
// It creates instances containers named config.Name in group groupid.
// Without IsReCreate the name must be unique within the group. With
// IsReCreate an existing meta of the same name is either removed first
// (ForceRemove, or an unchanged image tag) or reused via reCreateContainers
// (different image tag). Returns the meta ID and the created containers.
func (cluster *Cluster) CreateContainers(groupid string, instances int, webhooks types.WebHooks, placement types.Placement, config models.Container, createOption types.CreateOption) (string, *types.CreatedContainers, error) {
	if instances <= 0 {
		return "", nil, ErrClusterContainersInstancesInvalid
	}
	group := cluster.GetGroup(groupid)
	engines := cluster.GetGroupEngines(groupid)
	if group == nil || engines == nil {
		logger.ERROR("[#cluster#] create containers %s error, %s", config.Name, ErrClusterGroupNotFound)
		return "", nil, ErrClusterGroupNotFound
	}
	if len(engines) == 0 {
		logger.ERROR("[#cluster#] create containers %s error, %s", config.Name, ErrClusterNoEngineAvailable)
		return "", nil, ErrClusterNoEngineAvailable
	}
	var (
		metaID  string
		bCreate bool = true // false: reuse the existing meta via re-create path
	)
	if !createOption.IsReCreate {
		// NOTE: "cehck" is a typo in the helper's name; kept as-is here since
		// renaming would touch its definition and all call sites at once.
		if ret := cluster.cehckContainerNameUniqueness(groupid, config.Name); !ret {
			logger.ERROR("[#cluster#] create containers %s error, %s", config.Name, ErrClusterCreateContainerNameConflict)
			return "", nil, ErrClusterCreateContainerNameConflict
		}
	} else {
		if metaData := cluster.configCache.GetMetaDataOfName(groupid, config.Name); metaData != nil {
			// Inherit the old webhooks when the caller did not supply any.
			if len(webhooks) == 0 {
				webhooks = metaData.WebHooks
			}
			if createOption.ForceRemove {
				cluster.RemoveContainers(metaData.MetaID, "")
				time.Sleep(pendingWaitForInterval)
			} else {
				imageTag := getImageTag(config.Image)
				if metaData.ImageTag == imageTag {
					// Same image tag: wait until no operation on this name is
					// pending, then remove the old meta and create from scratch.
					logger.WARN("[#cluster#] re-create %s containers %s tag %s eq.", metaData.MetaID, config.Name, imageTag)
					for {
						time.Sleep(pendingWaitForInterval)
						if !cluster.containsPendingContainers(groupid, config.Name) {
							break
						}
						logger.WARN("[#cluster#] re-create %s containers %s pending...", metaData.MetaID, config.Name)
					}
					cluster.RemoveContainers(metaData.MetaID, "")
					time.Sleep(pendingWaitForInterval)
				} else {
					// Different tag: update the existing meta instead of creating.
					metaID = metaData.MetaID
					bCreate = false
				}
			}
		}
	}
	createdContainers := types.CreatedContainers{}
	if bCreate {
		metaData, err := cluster.configCache.CreateMetaData(groupid, instances, webhooks, placement, config, createOption.IsRemoveDelay, createOption.IsRecovery)
		if err != nil {
			// A name conflict raced in; fall back to the re-create path.
			// NOTE(review): this relies on CreateMetaData returning a non-nil
			// metaData alongside the conflict error — verify, else metaData.MetaID panics.
			if strings.Contains(err.Error(), "create meta conflict") {
				newMetaID, containers, err := cluster.reCreateContainers(groupid, metaData.MetaID, instances, webhooks, placement, config, createOption)
				if err != nil {
					return "", nil, err
				}
				createdContainers = *containers
				return newMetaID, &createdContainers, nil
			}
			logger.ERROR("[#cluster#] create containers %s error, %s", config.Name, ErrClusterContainersMetaCreateFailure)
			return "", nil, ErrClusterContainersMetaCreateFailure
		}
		createdContainers, err = cluster.createContainers(metaData, instances, nil, config)
		if len(createdContainers) == 0 {
			// Nothing got created: roll the meta back out of the cache.
			cluster.configCache.RemoveMetaData(metaData.MetaID)
			var resultErr string
			if err != nil {
				resultErr = err.Error()
			}
			logger.ERROR("[#cluster#] create containers %s error, %s", config.Name, ErrClusterCreateContainerFailure)
			return "", nil, fmt.Errorf("%s, %s\n", ErrClusterCreateContainerFailure.Error(), resultErr)
		}
		metaID = metaData.MetaID
		cluster.submitHookEvent(metaData, CreateMetaEvent)
	} else {
		newMetaID, containers, err := cluster.reCreateContainers(groupid, metaID, instances, webhooks, placement, config, createOption)
		if err != nil {
			return "", nil, err
		}
		metaID = newMetaID
		createdContainers = *containers
	}
	return metaID, &createdContainers, nil
}
// reCreateContainers re-creates the containers of an existing meta by routing
// through UpdateContainers, waiting out any pending operation on the name
// first. It retries up to cluster.createRetry times when the meta is busy
// ("containers state is setting") or was replaced underneath us ("metadata
// not found" — in which case the current meta ID for the same name is looked
// up and used). (The previous header comment named this "reContainers" and
// called it exported; both were wrong.)
func (cluster *Cluster) reCreateContainers(groupid string, metaID string, instances int, webhooks types.WebHooks, placement types.Placement, config models.Container, createOption types.CreateOption) (string, *types.CreatedContainers, error) {
	retries := cluster.createRetry
RECREATE:
	// Block until no pending create/remove on this container name remains.
	for {
		if !cluster.containsPendingContainers(groupid, config.Name) {
			break
		}
		time.Sleep(pendingWaitForInterval)
		logger.WARN("[#cluster#] re-create %s containers %s pending...", metaID, config.Name)
	}
	updateOption := types.UpdateOption{
		IsRemoveDelay: createOption.IsRemoveDelay,
		IsRecovery:    createOption.IsRecovery,
	}
	containers, err := cluster.UpdateContainers(metaID, instances, webhooks, placement, config, updateOption)
	if err != nil || len(*containers) == 0 {
		if err != nil {
			if strings.Contains(err.Error(), "cluster containers state is setting") {
				retries = retries - 1
				if retries > 0 {
					time.Sleep(pendingWaitForInterval)
					goto RECREATE
				}
			} else if strings.Contains(err.Error(), "cluster metadata not found") {
				retries = retries - 1
				if retries > 0 {
					// The meta may have been re-created under a new ID; retry
					// against whatever meta now owns this name.
					pMetaData := cluster.configCache.GetMetaDataOfName(groupid, config.Name)
					if pMetaData != nil && pMetaData.MetaID != metaID {
						metaID = pMetaData.MetaID
						time.Sleep(pendingWaitForInterval)
						goto RECREATE
					}
				}
			}
		}
		logger.ERROR("[#cluster#] re-create %s containers %s error, %s", metaID, config.Name, ErrClusterCreateContainerFailure)
		return "", nil, ErrClusterCreateContainerFailure
	}
	return metaID, containers, nil
}
// reduceContainers removes up to the given number of containers belonging to
// metaData, one at a time. The meta's config name is held in
// pendingContainers for the duration so concurrent requests on the same name
// back off; individual removal failures are logged and skipped.
func (cluster *Cluster) reduceContainers(metaData *MetaData, instances int) {
	name := metaData.Config.Name
	cluster.Lock()
	cluster.pendingContainers[name] = &pendingContainer{
		GroupID: metaData.GroupID,
		Name:    name,
		Config:  metaData.Config,
	}
	cluster.Unlock()
	for i := 0; i < instances; i++ {
		_, _, err := cluster.reduceContainer(metaData)
		if err != nil {
			logger.ERROR("[#cluster#] reduce container %s, error:%s", name, err.Error())
		}
	}
	cluster.Lock()
	delete(cluster.pendingContainers, name)
	cluster.Unlock()
}
// reduceContainer removes a single container of the meta: candidate
// engine/container pairs are ranked (sort order decides which goes first)
// and the top-ranked container is removed from its engine.
func (cluster *Cluster) reduceContainer(metaData *MetaData) (*Engine, *Container, error) {
	groupEngines := cluster.GetGroupEngines(metaData.GroupID)
	if len(groupEngines) == 0 {
		return nil, nil, ErrClusterNoEngineAvailable
	}
	candidates := selectReduceEngines(metaData.MetaID, groupEngines)
	if len(candidates) == 0 {
		return nil, nil, ErrClusterNoEngineAvailable
	}
	sort.Sort(candidates)
	chosen := candidates[0]
	engine := chosen.Engine()
	container := chosen.ReduceContainer()
	if err := engine.RemoveContainer(container.Info.ID); err != nil {
		return nil, nil, err
	}
	return engine, container, nil
}
// removeContainers is exported
// It removes the meta's containers group-wide. An empty containerid removes
// every container of the meta; otherwise only the matching container is
// removed and iteration stops once it has been seen. Unhealthy engines are
// not contacted — their state is recorded as the per-container error in the
// returned RemovedContainers. The meta name is held in pendingContainers for
// the duration.
func (cluster *Cluster) removeContainers(metaData *MetaData, containerid string) *types.RemovedContainers {
	cluster.Lock()
	cluster.pendingContainers[metaData.Config.Name] = &pendingContainer{
		GroupID: metaData.GroupID,
		Name:    metaData.Config.Name,
		Config:  metaData.Config,
	}
	cluster.Unlock()
	removedContainers := types.RemovedContainers{}
	if engines := cluster.GetGroupEngines(metaData.GroupID); engines != nil {
		foundContainer := false
		for _, engine := range engines {
			// Stop scanning further engines once the single target was found.
			if foundContainer {
				break
			}
			containers := engine.Containers(metaData.MetaID)
			for _, container := range containers {
				if containerid == "" || container.Info.ID == containerid {
					var err error
					if engine.IsHealthy() {
						if err = engine.RemoveContainer(container.Info.ID); err != nil {
							logger.ERROR("[#cluster#] engine %s, remove container error:%s", engine.IP, err.Error())
						}
					} else {
						// Engine unreachable: record its state as the error.
						err = fmt.Errorf("engine state is %s", engine.State())
					}
					removedContainers = removedContainers.SetRemovedPair(engine.IP, engine.Name, container.Info.ID, err)
				}
				// Never true when containerid == "" (remove-all mode).
				if container.Info.ID == containerid {
					foundContainer = true
					break
				}
			}
		}
	}
	cluster.Lock()
	delete(cluster.pendingContainers, metaData.Config.Name)
	cluster.Unlock()
	return &removedContainers
}
// createContainers launches instances containers for metaData across the
// group's engines. The config name is registered in pendingContainers for
// the duration so concurrent requests on the same name are deferred.
// Partial success is possible: the created containers are returned together
// with the last unrecovered error, if any.
func (cluster *Cluster) createContainers(metaData *MetaData, instances int, priorities *EnginePriorities, config models.Container) (types.CreatedContainers, error) {
	cluster.Lock()
	cluster.pendingContainers[config.Name] = &pendingContainer{
		GroupID: metaData.GroupID,
		Name:    config.Name,
		Config:  config,
	}
	cluster.Unlock()
	var resultErr error
	createdContainers := types.CreatedContainers{}
	filter := NewEnginesFilter()
	for ; instances > 0; instances-- {
		index := cluster.configCache.MakeContainerIdleIndex(metaData.MetaID)
		if index < 0 {
			continue
		}
		indexStr := strconv.Itoa(index)
		containerConfig := config
		containerConfig.Name = "CLUSTER-" + metaData.GroupID[:8] + "-" + containerConfig.Name + "-" + indexStr
		// Build each container's env on a fresh backing array. The previous
		// code appended directly onto config.Env's slice: when config.Env had
		// spare capacity, later iterations rewrote the same backing array
		// that configs of already-created containers still referenced.
		env := make([]string, 0, len(config.Env)+5)
		env = append(env, config.Env...)
		env = append(env, "HUMPBACK_CLUSTER_GROUPID="+metaData.GroupID)
		env = append(env, "HUMPBACK_CLUSTER_METAID="+metaData.MetaID)
		env = append(env, "HUMPBACK_CLUSTER_CONTAINER_INDEX="+indexStr)
		env = append(env, "HUMPBACK_CLUSTER_CONTAINER_ORIGINALNAME="+containerConfig.Name)
		if cluster.Location != "" {
			env = append(env, "HUMPBACK_CLUSTER_LOCATION="+cluster.Location)
		}
		containerConfig.Env = env
		engine, container, err := cluster.createContainer(metaData, filter, priorities, containerConfig)
		if err != nil {
			// "no engine available" and "... not found" are not retried.
			// (strings.Contains replaces the former strings.Index(...) >= 0.)
			if err == ErrClusterNoEngineAvailable || strings.Contains(err.Error(), " not found") {
				resultErr = err
				logger.ERROR("[#cluster#] create container %s, error:%s", containerConfig.Name, err.Error())
				continue
			}
			logger.ERROR("[#cluster#] engine %s, create container %s, error:%s", engine.IP, containerConfig.Name, err.Error())
			// Retry on other engines; the failed one is recorded in filter.
			var retries int64
			for ; retries < cluster.createRetry && err != nil; retries++ {
				engine, container, err = cluster.createContainer(metaData, filter, nil, containerConfig)
			}
			if err != nil {
				resultErr = err
				if err == ErrClusterNoEngineAvailable {
					logger.ERROR("[#cluster#] create container %s, error:%s", containerConfig.Name, err.Error())
				} else {
					logger.ERROR("[#cluster#] engine %s, create container %s, error:%s", engine.IP, containerConfig.Name, err.Error())
				}
				continue
			}
		}
		createdContainers = createdContainers.SetCreatedPair(engine.IP, engine.Name, container.Config.Container)
	}
	cluster.Lock()
	delete(cluster.pendingContainers, config.Name)
	cluster.Unlock()
	return createdContainers, resultErr
}
// createContainer creates a single container for metaData on one engine:
// an explicit priority engine when one is supplied and selectable, otherwise
// an engine chosen via health/weight filtering and placement constraints.
// Healthy engines already hosting the meta are registered on filter as alloc
// engines; a failed create marks the engine as failed before returning.
func (cluster *Cluster) createContainer(metaData *MetaData, filter *EnginesFilter, priorities *EnginePriorities, config models.Container) (*Engine, *Container, error) {
	groupEngines := cluster.GetGroupEngines(metaData.GroupID)
	if len(groupEngines) == 0 {
		return nil, nil, ErrClusterNoEngineAvailable
	}
	for _, candidate := range groupEngines {
		if candidate.IsHealthy() && candidate.HasMeta(metaData.MetaID) {
			filter.SetAllocEngine(candidate)
		}
	}
	var target *Engine
	if priorities != nil {
		target = priorities.Select()
	}
	if target == nil {
		healthy := cluster.selectEngines(groupEngines, filter, config)
		if len(healthy) == 0 {
			return nil, nil, ErrClusterNoEngineAvailable
		}
		placed := cluster.selectPlacementEngines(healthy, filter, &metaData.Placement)
		if len(placed) == 0 {
			return nil, nil, ErrClusterNoEngineAvailable
		}
		target = placed[0]
	}
	container, err := target.CreateContainer(config)
	if err != nil {
		filter.SetFailEngine(target)
		return target, nil, err
	}
	return target, container, nil
}
// selectEngines is exported
// It narrows engines to healthy ones, orders them by weight when weighting
// applies, then applies filter: if filtering removes every candidate it
// falls back to the filter's alloc engines (engines already hosting the
// meta, per createContainer) and shuffles the fallback so repeated attempts
// do not always hit the same engine first.
func (cluster *Cluster) selectEngines(engines []*Engine, filter *EnginesFilter, config models.Container) []*Engine {
	selectEngines := []*Engine{}
	for _, engine := range engines {
		if engine.IsHealthy() {
			selectEngines = append(selectEngines, engine)
		}
	}
	if len(selectEngines) == 0 {
		return selectEngines //return empty engines
	}
	// Weight ordering ("Weightd" typo belongs to the helper defined elsewhere).
	weightedEngines := selectWeightdEngines(selectEngines, config)
	if len(weightedEngines) > 0 {
		sort.Sort(weightedEngines)
		selectEngines = weightedEngines.Engines()
	}
	if len(selectEngines) > 0 {
		filterEngines := filter.Filter(selectEngines)
		if len(filterEngines) > 0 {
			selectEngines = filterEngines
		} else {
			// Everything was filtered out: prefer alloc engines when present.
			filterEngines = filter.AllocEngines()
			if len(filterEngines) > 0 {
				selectEngines = filterEngines
			}
			// Fisher-Yates shuffle of the fallback candidates.
			for i := len(selectEngines) - 1; i > 0; i-- {
				j := cluster.randSeed.Intn(i + 1)
				selectEngines[i], selectEngines[j] = selectEngines[j], selectEngines[i]
			}
		}
	}
	return selectEngines
}
// selectPlacementEngines is exported
// It applies placement constraints to engines. With no constraints the input
// set is returned unchanged. When constraints eliminate every engine, the
// filter's alloc engines are used; failing that, already-failed engines are
// returned so the create attempt surfaces the real create error instead of
// a generic "no engine available". Either fallback set is shuffled.
func (cluster *Cluster) selectPlacementEngines(engines []*Engine, filter *EnginesFilter, placement *types.Placement) []*Engine {
	selectEngines := []*Engine{}
	if placement.Constraints != nil && len(placement.Constraints) > 0 {
		constraints, err := ParseConstraints(placement.Constraints)
		if err != nil {
			logger.ERROR("[#cluster#] placement engines error, %s", err.Error())
			return selectEngines //return empty engines
		}
		for _, engine := range engines {
			ret := MatchConstraints(constraints, engine)
			if ret {
				selectEngines = append(selectEngines, engine)
				logger.INFO("[#cluster#] placement engines, %s(%s)", engine.IP, engine.Name)
			} else {
				logger.INFO("[#cluster#] placement engines filter, %s(%s)", engine.IP, engine.Name)
			}
		}
		if len(selectEngines) == 0 {
			filterEngines := filter.AllocEngines()
			if len(filterEngines) > 0 {
				logger.INFO("[#cluster#] placement select alloc engines")
				selectEngines = filterEngines
			} else {
				//last retry fail! select a fail engine, throw create error,
				//prevent return cluster no engine available error.
				logger.INFO("[#cluster#] placement select fail engines")
				selectEngines = filter.FailEngines()
			}
			// Fisher-Yates shuffle of the fallback candidates.
			for i := len(selectEngines) - 1; i > 0; i-- {
				j := cluster.randSeed.Intn(i + 1)
				selectEngines[i], selectEngines[j] = selectEngines[j], selectEngines[i]
			}
		}
	} else {
		selectEngines = engines
		logger.INFO("[#cluster#] skip placement engines.")
	}
	return selectEngines
}
// containsPendingContainers reports whether a create/remove operation for
// the given group and container name is currently in flight.
func (cluster *Cluster) containsPendingContainers(groupid string, name string) bool {
	cluster.RLock()
	defer cluster.RUnlock()
	for _, pending := range cluster.pendingContainers {
		if pending.GroupID == groupid && pending.Name == name {
			return true
		}
	}
	return false
}
// cehckContainerNameUniqueness reports whether the container name is free
// within the group: neither pending nor owned by an existing meta.
// NOTE(review): "cehck" is a typo, kept to preserve the call sites' contract.
func (cluster *Cluster) cehckContainerNameUniqueness(groupid string, name string) bool {
	if cluster.containsPendingContainers(groupid, name) {
		return false
	}
	return cluster.configCache.GetMetaDataOfName(groupid, name) == nil
}
// validateMetaData resolves the meta and its group engines, then verifies the
// meta is neither migrating nor the target of a pending container operation.
func (cluster *Cluster) validateMetaData(metaid string) (*MetaData, []*Engine, error) {
	metaData, engines, err := cluster.GetMetaDataEngines(metaid)
	if err != nil {
		return nil, nil, err
	}
	if cluster.migtatorCache.Contains(metaData.MetaID) {
		return nil, nil, ErrClusterContainersMigrating
	}
	if cluster.containsPendingContainers(metaData.GroupID, metaData.Config.Name) {
		return nil, nil, ErrClusterContainersSetting
	}
	return metaData, engines, nil
}
// submitHookEvent gathers the meta's containers from all healthy group
// engines and hands them to the hooks processor. No-op when the meta has no
// webhooks configured.
func (cluster *Cluster) submitHookEvent(metaData *MetaData, hookEvent HookEvent) {
	if len(metaData.WebHooks) == 0 {
		return
	}
	payload := HookContainers{}
	for _, engine := range cluster.GetGroupEngines(metaData.GroupID) {
		if !engine.IsHealthy() {
			continue
		}
		for _, container := range engine.Containers(metaData.MetaID) {
			payload = append(payload, &HookContainer{
				IP:        engine.IP,
				Name:      engine.Name,
				Container: container.Config.Container,
			})
		}
	}
	cluster.hooksProcessor.SubmitHook(metaData.MetaBase, payload, hookEvent)
}
// recoveryContainersLoop periodically reconciles every known meta by calling
// RecoveryContainers until stopCh fires. The wait restarts only after a full
// pass completes (interval measured from end of pass, not fixed-rate).
func (cluster *Cluster) recoveryContainersLoop() {
	for {
		// A one-shot Timer is the right primitive here: the previous code
		// allocated a new Ticker each iteration and stopped it after a single
		// tick, which is exactly a Timer with extra bookkeeping.
		timer := time.NewTimer(cluster.recoveryInterval)
		select {
		case <-timer.C:
			metaids := []string{}
			metaEngines := make(map[string]*Engine)
			// Collect all meta IDs plus the healthy engines hosting them.
			for _, group := range cluster.GetGroups() {
				groupMetaData := cluster.configCache.GetGroupMetaData(group.ID)
				for _, metaData := range groupMetaData {
					metaids = append(metaids, metaData.MetaID)
					if _, engines, err := cluster.GetMetaDataEngines(metaData.MetaID); err == nil {
						for _, engine := range engines {
							if engine.IsHealthy() && engine.HasMeta(metaData.MetaID) {
								metaEngines[engine.IP] = engine
							}
						}
					}
				}
			}
			if len(metaids) > 0 {
				// Refresh engine state once, then reconcile each meta.
				cluster.RefreshEnginesContainers(metaEngines)
				for _, metaid := range metaids {
					if err := cluster.RecoveryContainers(metaid); err != nil {
						logger.ERROR("[#cluster#] recovery containers error, %s", err.Error())
					}
				}
			}
		case <-cluster.stopCh:
			timer.Stop()
			return
		}
	}
}
Go
1
https://gitee.com/humpbacks/humpback-center.git
git@gitee.com:humpbacks/humpback-center.git
humpbacks
humpback-center
humpback-center
b392570bc023

搜索帮助