37 Star 411 Fork 76

GVPrancher/rancher

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
克隆/下载
controller_multiclusterapp.go 20.19 KB
一键复制 编辑 原始数据 按行查看 历史
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638
package multiclusterapp
import (
"context"
"fmt"
"github.com/rancher/types/user"
"reflect"
"strings"
"time"
"github.com/rancher/rancher/pkg/clustermanager"
"github.com/rancher/rancher/pkg/controllers/management/globalnamespacerbac"
"github.com/rancher/rancher/pkg/ref"
"github.com/sirupsen/logrus"
"github.com/rancher/rancher/pkg/namespace"
"github.com/rancher/types/apis/management.cattle.io/v3"
pv3 "github.com/rancher/types/apis/project.cattle.io/v3"
"github.com/rancher/types/config"
"golang.org/x/sync/errgroup"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/tools/cache"
)
const (
globalScopeAnswersKey = "global"
creatorIDAnn = "field.cattle.io/creatorId"
MultiClusterAppIDSelector = "mcapp"
mcAppLabel = "io.cattle.field/multiClusterAppId"
)
type MCAppManager struct {
apps pv3.AppInterface
appLister pv3.AppLister
multiClusterApps v3.MultiClusterAppInterface
multiClusterAppRevisions v3.MultiClusterAppRevisionInterface
multiClusterAppRevisionLister v3.MultiClusterAppRevisionLister
templateVersionLister v3.CatalogTemplateVersionLister
projectLister v3.ProjectLister
clusterLister v3.ClusterLister
userManager user.Manager
ctx context.Context
}
func StartMCAppManagementController(ctx context.Context, mgmt *config.ManagementContext, clusterManager *clustermanager.Manager) {
management := mgmt.Management
mcApps := management.MultiClusterApps("")
m := &MCAppManager{
ctx: ctx,
apps: mgmt.Project.Apps(""),
appLister: mgmt.Project.Apps("").Controller().Lister(),
multiClusterApps: mcApps,
multiClusterAppRevisions: management.MultiClusterAppRevisions(""),
multiClusterAppRevisionLister: management.MultiClusterAppRevisions("").Controller().Lister(),
projectLister: management.Projects("").Controller().Lister(),
clusterLister: management.Clusters("").Controller().Lister(),
templateVersionLister: management.CatalogTemplateVersions("").Controller().Lister(),
userManager: mgmt.UserManager,
}
mcAppTickerData = map[string]*IntervalData{}
m.multiClusterApps.AddHandler(ctx, "multi-cluster-app-controller", m.sync)
}
func (m *MCAppManager) sync(key string, mcapp *v3.MultiClusterApp) (runtime.Object, error) {
if mcapp == nil || mcapp.DeletionTimestamp != nil {
_, mcappName, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
return mcapp, err
}
deleteContext(mcappName)
return m.deleteApps(mcappName, mcapp)
}
var creatorID string
// creatorID is the username of the service account created for this multiclusterapp
systemUser, err := m.userManager.EnsureUser(fmt.Sprintf("system://%s", mcapp.Name), "System account for Multiclusterapp "+mcapp.Name)
if err != nil {
return nil, err
}
creatorID = systemUser.Name
answerMap, err := m.createAnswerMap(mcapp.Spec.Answers)
if err != nil {
return mcapp, err
}
externalID, mcapp, err := m.getExternalID(mcapp)
if err != nil {
return mcapp, err
}
mcapp = mcapp.DeepCopy()
if err := m.reconcileTargetsForDelete(mcapp); err != nil {
return mcapp, err
}
changed, err := m.isChanged(mcapp)
if err != nil {
return mcapp, err
}
toUpdate := false
if changed {
toUpdate, err = m.toUpdate(mcapp)
if err != nil {
return mcapp, err
}
}
batchSize := len(mcapp.Spec.Targets)
if toUpdate && mcapp.Spec.UpgradeStrategy.RollingUpdate != nil {
if mcapp.Spec.UpgradeStrategy.RollingUpdate.Interval != 0 {
batchSize = mcapp.Spec.UpgradeStrategy.RollingUpdate.BatchSize
}
}
resp, err := m.createApps(mcapp, externalID, answerMap, creatorID, batchSize, toUpdate)
if err != nil {
return resp.object, err
}
if !changed {
if mcapp.Status.RevisionName == "" {
return m.setRevisionAndUpdate(mcapp, creatorID)
}
return mcapp, nil
}
if resp.count == len(mcapp.Spec.Targets) && v3.MultiClusterAppConditionInstalled.IsUnknown(mcapp) &&
v3.MultiClusterAppConditionInstalled.GetMessage(mcapp) == "upgrading" {
deleteContext(mcapp.Name)
return m.setRevisionAndUpdate(mcapp, creatorID)
}
if !toUpdate || resp.remaining <= 0 {
return mcapp, nil
}
for i, app := range resp.updateApps {
if _, err := m.updateApp(app, answerMap, externalID, resp.projects[i]); err != nil {
return mcapp, err
}
resp.remaining--
if resp.remaining == 0 {
break
}
}
setInstalledUnknown(mcapp)
upd, err := m.updateCondition(mcapp, setInstalledUnknown)
if err != nil {
return mcapp, err
}
storeContext(m.ctx, mcapp, m.multiClusterApps)
return upd, err
}
type Response struct {
object *v3.MultiClusterApp
projects []string
updateApps []*pv3.App
remaining int
count int
}
func (m *MCAppManager) createApps(mcapp *v3.MultiClusterApp, externalID string, answerMap map[string]map[string]string,
creatorID string, batchSize int, toUpdate bool) (*Response, error) {
var mcappToUpdate *v3.MultiClusterApp
var updateApps []*pv3.App
var projects []string
ann := map[string]string{
creatorIDAnn: creatorID,
}
set := labels.Set(map[string]string{MultiClusterAppIDSelector: mcapp.Name})
resp := &Response{object: mcapp}
updateBatchSize := batchSize
count := 0
// for all targets, create the App{} instance, so that helm controller App lifecycle can pick it up
// only one app per project named mcapp-{{mcapp.Name}}
for ind, t := range mcapp.Spec.Targets {
split := strings.SplitN(t.ProjectName, ":", 2)
if len(split) != 2 {
return resp, fmt.Errorf("error in splitting project ID %v", t.ProjectName)
}
projectNS := split[1]
// check if this app already exists
if t.AppName != "" {
app, err := m.appLister.Get(projectNS, t.AppName)
if err != nil || app == nil {
return resp, fmt.Errorf("error %v getting app %s in %s", err, t.AppName, projectNS)
}
if val, ok := app.Labels[MultiClusterAppIDSelector]; !ok || val != mcapp.Name {
return resp, fmt.Errorf("app %s in %s missing multi cluster app label", t.AppName, projectNS)
}
appUpdated := false
if app.Spec.ExternalID == externalID {
if reflect.DeepEqual(app.Spec.Answers, getAnswerMap(answerMap, t.ProjectName)) {
appUpdated = true
}
}
if appUpdated {
count++
if !pv3.AppConditionInstalled.IsTrue(app) || !pv3.AppConditionDeployed.IsTrue(app) {
toUpdate = false
updateApps = []*pv3.App{}
}
continue
}
if toUpdate && updateBatchSize > 0 {
updateApps = append(updateApps, app)
projects = append(projects, t.ProjectName)
updateBatchSize--
}
continue
}
if batchSize > 0 {
appName, mcapp, err := m.createApp(mcapp, answerMap, ann, set, projectNS, creatorID, externalID, t.ProjectName)
if err != nil {
return resp, fmt.Errorf("error %v in creating multiclusterapp: %v", err, mcapp)
}
if appName != "" {
if mcappToUpdate == nil {
mcappToUpdate = mcapp.DeepCopy()
}
mcappToUpdate.Spec.Targets[ind].AppName = appName
batchSize--
count++
}
}
}
if mcappToUpdate != nil && !reflect.DeepEqual(mcapp, mcappToUpdate) {
upd, err := m.multiClusterApps.Update(mcappToUpdate)
if err != nil {
resp.object = mcappToUpdate
return resp, err
}
resp.object = upd
}
resp.updateApps = updateApps
resp.projects = projects
resp.count = count
resp.remaining = batchSize
return resp, nil
}
func (m *MCAppManager) updateApp(app *pv3.App, answerMap map[string]map[string]string, externalID string, projectName string) (*pv3.App, error) {
app = app.DeepCopy()
app.Spec.Answers = getAnswerMap(answerMap, projectName)
app.Spec.ExternalID = externalID
updatedObj, err := m.apps.Update(app)
if err != nil && apierrors.IsConflict(err) {
_, projectNS := ref.Parse(projectName)
for i := 0; i < 5; i++ {
latestObj, err := m.apps.GetNamespaced(projectNS, app.Name, metav1.GetOptions{})
if err != nil {
return latestObj, err
}
latestToUpdate := latestObj.DeepCopy()
latestToUpdate.Spec.Answers = getAnswerMap(answerMap, projectName)
latestToUpdate.Spec.ExternalID = externalID
updated, err := m.apps.Update(latestToUpdate)
if err != nil && apierrors.IsConflict(err) {
time.Sleep(5 * time.Millisecond)
continue
}
return updated, err
}
return app, err
}
return updatedObj, err
}
func (m *MCAppManager) createRevision(mcapp *v3.MultiClusterApp, creatorID string) (*v3.MultiClusterAppRevision, error) {
ownerReference := metav1.OwnerReference{
APIVersion: "management.cattle.io/v3",
Kind: globalnamespacerbac.MultiClusterAppResource,
Name: mcapp.Name,
UID: mcapp.UID,
}
revision := &v3.MultiClusterAppRevision{
ObjectMeta: metav1.ObjectMeta{
Annotations: map[string]string{
creatorIDAnn: creatorID,
},
OwnerReferences: []metav1.OwnerReference{ownerReference},
},
}
revision.GenerateName = "mcapprevision-"
revision.Labels = map[string]string{
mcAppLabel: mcapp.Name,
}
revision.Answers = mcapp.Spec.Answers
revision.TemplateVersionName = mcapp.Spec.TemplateVersionName
revision.Namespace = namespace.GlobalNamespace
return m.multiClusterAppRevisions.Create(revision)
}
func (m *MCAppManager) setRevisionAndUpdate(mcapp *v3.MultiClusterApp, creatorID string) (*v3.MultiClusterApp, error) {
latestMcApp, err := m.multiClusterApps.GetNamespaced(namespace.GlobalNamespace, mcapp.Name, metav1.GetOptions{})
if err != nil {
return mcapp, err
}
if latestMcApp.Status.RevisionName != "" {
currRevision, err := m.multiClusterAppRevisionLister.Get(namespace.GlobalNamespace, latestMcApp.Status.RevisionName)
if err != nil {
return mcapp, err
}
if currRevision.TemplateVersionName == mcapp.Spec.TemplateVersionName &&
reflect.DeepEqual(currRevision.Answers, mcapp.Spec.Answers) {
return mcapp, nil
}
mcapp = latestMcApp
}
setInstalledDone(mcapp)
rev, err := m.createRevision(mcapp, creatorID)
if err != nil {
return mcapp, err
}
mcapp.Status.RevisionName = rev.Name
return m.updateCondition(mcapp, setInstalledDone)
}
func (m *MCAppManager) isChanged(mcapp *v3.MultiClusterApp) (bool, error) {
if mcapp.Status.RevisionName == "" {
return false, nil
}
mcappRevision, err := m.multiClusterAppRevisionLister.Get(namespace.GlobalNamespace, mcapp.Status.RevisionName)
if err != nil {
return false, err
}
if mcapp.Spec.TemplateVersionName != mcappRevision.TemplateVersionName {
return true, nil
}
if !reflect.DeepEqual(mcapp.Spec.Answers, mcappRevision.Answers) {
return true, nil
}
return false, nil
}
func (m *MCAppManager) toUpdate(mcapp *v3.MultiClusterApp) (bool, error) {
if v3.MultiClusterAppConditionInstalled.IsUnknown(mcapp) && v3.MultiClusterAppConditionInstalled.GetMessage(mcapp) == "upgrading" {
lastUpdated, err := time.Parse(time.RFC3339, v3.MultiClusterAppConditionInstalled.GetLastUpdated(mcapp))
if err != nil {
return false, err
}
interval := 0
if mcapp.Spec.UpgradeStrategy.RollingUpdate != nil {
interval = mcapp.Spec.UpgradeStrategy.RollingUpdate.Interval
}
if time.Since(lastUpdated) < time.Duration(interval)*time.Second {
return false, nil
}
}
return true, nil
}
func (m *MCAppManager) createApp(mcapp *v3.MultiClusterApp, answerMap map[string]map[string]string, ann map[string]string,
set map[string]string, projectNS string, creatorID string, externalID string, projectName string) (string, *v3.MultiClusterApp, error) {
nsName := getAppNamespaceName(mcapp.Name, projectNS)
app, err := m.appLister.Get(projectNS, nsName)
if err != nil {
if !apierrors.IsNotFound(err) {
return "", mcapp, err
}
toCreate := pv3.App{
ObjectMeta: metav1.ObjectMeta{
Name: nsName,
Namespace: projectNS,
Annotations: ann,
Labels: set,
},
Spec: pv3.AppSpec{
ProjectName: projectName,
TargetNamespace: nsName,
ExternalID: externalID,
MultiClusterAppName: mcapp.Name,
Answers: getAnswerMap(answerMap, projectName),
},
}
// Now create the App instance
app, err = m.apps.Create(&toCreate)
if err != nil && !apierrors.IsAlreadyExists(err) {
return "", mcapp, err
}
}
return app.Name, mcapp, nil
}
func getAnswerMap(answerMap map[string]map[string]string, projectName string) map[string]string {
// find answers for this project, if not found then try finding for the cluster this project belongs to, else finally use the global scoped answer
answers := map[string]string{}
if len(answerMap) > 0 {
if ans, ok := answerMap[projectName]; ok {
return ans
}
// find the answers for the cluster of this project
split := strings.SplitN(projectName, ":", 2)
clusterName := split[0]
if ans, ok := answerMap[clusterName]; ok {
return ans
}
if ans, ok := answerMap[globalScopeAnswersKey]; ok {
return ans
}
}
return answers
}
// deleteApps finds all apps created by this multiclusterapp and deletes them
func (m *MCAppManager) deleteApps(mcAppName string, mcapp *v3.MultiClusterApp) (runtime.Object, error) {
// get all apps with label "multiClusterAppId" = name of this app
appsToDelete := []*pv3.App{}
set := labels.Set(map[string]string{MultiClusterAppIDSelector: mcAppName})
var err error
if mcapp == nil {
appsToDelete, err = m.getAllApps(mcAppName)
if err != nil {
return nil, err
}
} else {
for _, t := range mcapp.Spec.Targets {
split := strings.SplitN(t.ProjectName, ":", 2)
if len(split) != 2 {
return mcapp, fmt.Errorf("error in splitting project ID %v", t.ProjectName)
}
projectNS := split[1]
apps, err := m.appLister.List(projectNS, set.AsSelector())
if err != nil {
return nil, err
}
appsToDelete = append(appsToDelete, apps...)
}
}
if err := m.delete(appsToDelete); err != nil {
return nil, err
}
return nil, nil
}
func (m *MCAppManager) getAllApps(mcAppName string) ([]*pv3.App, error) {
// to get all apps, get all clusters first, then get all apps in all projects of all clusters
allApps := []*pv3.App{}
set := labels.Set(map[string]string{MultiClusterAppIDSelector: mcAppName})
clusters, err := m.clusterLister.List("", labels.NewSelector())
if err != nil {
return allApps, err
}
for _, c := range clusters {
projects, err := m.projectLister.List(c.Name, labels.NewSelector())
if err != nil {
return allApps, err
}
for _, p := range projects {
apps, err := m.appLister.List(p.Name, set.AsSelector())
if err != nil {
return allApps, err
}
allApps = append(allApps, apps...)
}
}
return allApps, err
}
func (m *MCAppManager) reconcileTargetsForDelete(mcapp *v3.MultiClusterApp) error {
existingApps := map[string]bool{}
set := labels.Set(map[string]string{MultiClusterAppIDSelector: mcapp.Name})
for _, t := range mcapp.Spec.Targets {
split := strings.SplitN(t.ProjectName, ":", 2)
if len(split) != 2 {
return fmt.Errorf("error in splitting project ID %v", t.ProjectName)
}
projectNS := split[1]
apps, err := m.appLister.List(projectNS, set.AsSelector())
if err != nil {
return err
}
for _, app := range apps {
existingApps[app.Namespace] = true
}
}
allApps, err := m.getAllApps(mcapp.Name)
if err != nil {
return err
}
toDelete := []*pv3.App{}
for _, app := range allApps {
if _, ok := existingApps[app.Namespace]; !ok {
toDelete = append(toDelete, app)
}
}
if len(toDelete) > 0 {
logrus.Debugf("deleting apps for mcapp %s toDelete %v", mcapp.Name, toDelete)
}
return m.delete(toDelete)
}
func (m *MCAppManager) delete(appsToDelete []*pv3.App) error {
var g errgroup.Group
for ind := range appsToDelete {
app := appsToDelete[ind]
g.Go(func() error {
if err := m.apps.DeleteNamespaced(app.Namespace, app.Name, &metav1.DeleteOptions{}); err != nil && !apierrors.IsNotFound(err) {
return err
}
return nil
})
}
if err := g.Wait(); err != nil {
return err
}
return nil
}
func (m *MCAppManager) updateCondition(mcappToUpdate *v3.MultiClusterApp, setCondition func(mcapp *v3.MultiClusterApp)) (*v3.MultiClusterApp, error) {
updatedObj, err := m.multiClusterApps.Update(mcappToUpdate)
if err != nil && apierrors.IsConflict(err) {
// retry 5 times
for i := 0; i < 5; i++ {
latestMcApp, err := m.multiClusterApps.GetNamespaced(namespace.GlobalNamespace, mcappToUpdate.Name, metav1.GetOptions{})
if err != nil {
return latestMcApp, err
}
latestToUpdate := latestMcApp.DeepCopy()
for ind, t := range mcappToUpdate.Spec.Targets {
if t.AppName != "" {
latestToUpdate.Spec.Targets[ind].AppName = t.AppName
}
}
latestToUpdate.Status.RevisionName = mcappToUpdate.Status.RevisionName
setCondition(latestToUpdate)
updatedMcApp, err := m.multiClusterApps.Update(latestToUpdate)
if err != nil && apierrors.IsConflict(err) {
time.Sleep(5 * time.Millisecond)
continue
}
return updatedMcApp, err
}
return mcappToUpdate, err
}
return updatedObj, err
}
func setInstalledUnknown(mcapp *v3.MultiClusterApp) {
v3.MultiClusterAppConditionInstalled.Unknown(mcapp)
v3.MultiClusterAppConditionInstalled.Message(mcapp, "upgrading")
v3.MultiClusterAppConditionInstalled.LastUpdated(mcapp, time.Now().Format(time.RFC3339))
}
func setInstalledDone(mcapp *v3.MultiClusterApp) {
v3.MultiClusterAppConditionInstalled.True(mcapp)
v3.MultiClusterAppConditionInstalled.Message(mcapp, "")
}
func (m *MCAppManager) createAnswerMap(answers []v3.Answer) (map[string]map[string]string, error) {
// create a map, where key is the projectID or clusterID, or "global" if neither is provided, and value is the actual answer values
// Global scoped answers will have all questions. Project/cluster scoped will only have override keys. So we'll first create a global map,
// and then merge with project/cluster map
answerMap := make(map[string]map[string]string)
globalAnswersMap := make(map[string]string)
for _, a := range answers {
if a.ProjectName == "" && a.ClusterName == "" {
globalAnswersMap = a.Values
answerMap[globalScopeAnswersKey] = make(map[string]string)
answerMap[globalScopeAnswersKey] = a.Values
}
}
for _, a := range answers {
if a.ClusterName != "" {
// Using k8s labels.Merge, since by definition:
// Merge combines given maps, and does not check for any conflicts between the maps. In case of conflicts, second map (labels2) wins
// And we want cluster level keys to override keys from global/cluster for that cluster
clusterLabels := labels.Merge(globalAnswersMap, a.Values)
answerMap[a.ClusterName] = make(map[string]string)
answerMap[a.ClusterName] = clusterLabels
}
}
for _, a := range answers {
if a.ProjectName != "" {
// check if answers for the cluster of this project are provided
split := strings.SplitN(a.ProjectName, ":", 2)
if len(split) != 2 {
return answerMap, fmt.Errorf("error in splitting project name: %v", a.ProjectName)
}
clusterName := split[0]
// Using k8s labels.Merge, since by definition:
// Merge combines given maps, and does not check for any conflicts between the maps. In case of conflicts, second map (labels2) wins
// And we want project level keys to override keys from global level for that project
projectLabels := make(map[string]string)
if val, ok := answerMap[clusterName]; ok {
projectLabels = labels.Merge(val, a.Values)
} else {
projectLabels = labels.Merge(globalAnswersMap, a.Values)
}
answerMap[a.ProjectName] = make(map[string]string)
answerMap[a.ProjectName] = projectLabels
}
}
return answerMap, nil
}
// getExternalID gets the TemplateVersion.Spec.ExternalID field
func (m *MCAppManager) getExternalID(mcapp *v3.MultiClusterApp) (string, *v3.MultiClusterApp, error) {
// create the externalID field, it's also present on the templateVersion. So get the templateVersion and read its externalID field
split := strings.SplitN(mcapp.Spec.TemplateVersionName, ":", 2)
templateVersionNamespace := split[0]
templateVersionName := split[1]
tv, err := m.templateVersionLister.Get(templateVersionNamespace, templateVersionName)
if err != nil {
return "", mcapp, err
}
if tv == nil {
return "", mcapp, fmt.Errorf("invalid templateVersion provided: %v", mcapp.Spec.TemplateVersionName)
}
externalID := tv.Spec.ExternalID
return externalID, mcapp, nil
}
func getAppNamespaceName(mcappName, projectNS string) string {
return fmt.Sprintf("%s-%s", mcappName, projectNS)
}
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
Go
1
https://gitee.com/rancher/rancher.git
git@gitee.com:rancher/rancher.git
rancher
rancher
rancher
v2.2.2-rc2

搜索帮助

0d507c66 1850385 C8b1a773 1850385