37 Star 396 Fork 71

GVPrancher/rancher

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
controller.go 16.34 KB
一键复制 编辑 原始数据 按行查看 历史
Dan Ramich 提交于 2019-05-14 13:54 . Fix writing file contents to disk
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609
package node
import (
"context"
"crypto/sha256"
"encoding/base32"
"encoding/json"
"fmt"
"io/ioutil"
"os"
"path"
"strings"
"time"
"github.com/pkg/errors"
"github.com/rancher/norman/objectclient"
"github.com/rancher/norman/types/convert"
"github.com/rancher/norman/types/values"
"github.com/rancher/rancher/pkg/api/customization/clusterregistrationtokens"
"github.com/rancher/rancher/pkg/encryptedstore"
"github.com/rancher/rancher/pkg/jailer"
"github.com/rancher/rancher/pkg/namespace"
"github.com/rancher/rancher/pkg/nodeconfig"
"github.com/rancher/rancher/pkg/ref"
"github.com/rancher/rancher/pkg/systemaccount"
corev1 "github.com/rancher/types/apis/core/v1"
v3 "github.com/rancher/types/apis/management.cattle.io/v3"
"github.com/rancher/types/config"
"github.com/sirupsen/logrus"
"golang.org/x/crypto/ssh"
v1 "k8s.io/api/core/v1"
kerror "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
typedv1 "k8s.io/client-go/kubernetes/typed/core/v1"
)
// Defaults and driver names referenced while initializing nodes.
const (
// defaultEngineInstallURL is applied in Create when the node template does
// not specify an engine install URL.
defaultEngineInstallURL = "https://releases.rancher.com/install-docker/17.03.2.sh"
// amazonec2 is the driver name Create compares against before calling
// setEc2ClusterIDTag on the raw driver config.
amazonec2 = "amazonec2"
)
// aliases maps, per node driver, a schema field name to the driver field name
// that replaces it. aliasToPath writes the schema field's contents to a file
// and stores the resulting file path under the driver field.
// The opposite of this lives in pkg/controllers/management/drivers/nodedriver/machine_driver.go
var aliases = map[string]map[string]string{
	"aliyunecs":    {"sshKeyContents": "sshKeypath"},
	"amazonec2":    {"sshKeyContents": "sshKeypath", "userdata": "userdata"},
	"azure":        {"customData": "customData"},
	"digitalocean": {"sshKeyContents": "sshKeyPath", "userdata": "userdata"},
	"exoscale":     {"sshKey": "sshKey", "userdata": "userdata"},
	"openstack":    {"privateKeyFile": "privateKeyFile"},
	"otc":          {"privateKeyFile": "privateKeyFile"},
	"packet":       {"userdata": "userdata"},
}
// Register builds the node Lifecycle from the management context and attaches
// it to the node client under the name "node-controller". The process exits
// via logrus.Fatal if the node config secret store cannot be created.
func Register(ctx context.Context, management *config.ManagementContext) {
	store, err := nodeconfig.NewStore(management.Core.Namespaces(""), management.Core)
	if err != nil {
		logrus.Fatal(err)
	}

	nodes := management.Management.Nodes("")

	lifecycle := &Lifecycle{
		systemAccountManager:      systemaccount.NewManager(management),
		secretStore:               store,
		nodeClient:                nodes,
		nodeTemplateClient:        management.Management.NodeTemplates(""),
		nodeTemplateGenericClient: management.Management.NodeTemplates("").ObjectClient().UnstructuredClient(),
		configMapGetter:           management.K8sClient.CoreV1(),
		clusterLister:             management.Management.Clusters("").Controller().Lister(),
		schemaLister:              management.Management.DynamicSchemas("").Controller().Lister(),
		credLister:                management.Core.Secrets("").Controller().Lister(),
		// Any non-empty CATTLE_DEV_MODE value enables dev mode.
		devMode: os.Getenv("CATTLE_DEV_MODE") != "",
	}

	nodes.AddLifecycle(ctx, "node-controller", lifecycle)
}
// Lifecycle holds the clients and listers used by the node lifecycle handlers
// (Create, Updated, Remove) that Register attaches to the node client.
type Lifecycle struct {
// systemAccountManager is used by deployAgent to get or create the system
// cluster token for the node's namespace.
systemAccountManager *systemaccount.Manager
// secretStore backs every nodeconfig.NewNodeConfig call in this file.
secretStore *encryptedstore.GenericEncryptedStore
// nodeTemplateGenericClient fetches node templates as unstructured objects so
// Create can read the driver-specific "<driver>Config" value.
nodeTemplateGenericClient objectclient.GenericClient
// nodeClient persists node updates (see provision).
nodeClient v3.NodeInterface
// nodeTemplateClient fetches typed node templates (see getNodeTemplate).
nodeTemplateClient v3.NodeTemplateInterface
// configMapGetter is populated by Register; it is not referenced by the code
// visible in this part of the file.
configMapGetter typedv1.ConfigMapsGetter
// clusterLister is used by isNodeInAppliedSpec to look up the node's cluster.
clusterLister v3.ClusterLister
// schemaLister is used by updateRawConfigFromCredential to look up the
// driver's config schema.
schemaLister v3.DynamicSchemaLister
// credLister is used by setCredFields to read cloud credential secrets.
credLister corev1.SecretLister
// devMode is true when CATTLE_DEV_MODE is set; jail creation is skipped.
devMode bool
}
// setupCustom fills obj.Status.NodeConfig and the node's internal address list
// from obj.Spec.CustomConfig. The SSH user falls back to "root" when the
// custom config leaves it empty, and the port is always "22".
func (m *Lifecycle) setupCustom(obj *v3.Node) {
	custom := obj.Spec.CustomConfig

	sshUser := custom.User
	if sshUser == "" {
		sshUser = "root"
	}

	nodeConfig := &v3.RKEConfigNode{
		NodeName:         obj.Namespace + ":" + obj.Name,
		HostnameOverride: obj.Spec.RequestedHostname,
		Address:          custom.Address,
		InternalAddress:  custom.InternalAddress,
		User:             sshUser,
		DockerSocket:     custom.DockerSocket,
		SSHKey:           custom.SSHKey,
		Labels:           custom.Label,
		Port:             "22",
		Role:             roles(obj),
	}
	obj.Status.NodeConfig = nodeConfig

	obj.Status.InternalNodeStatus.Addresses = []v1.NodeAddress{
		{
			Type:    v1.NodeInternalIP,
			Address: nodeConfig.Address,
		},
	}
}
// isCustom reports whether the node carries a custom config with a non-empty
// address.
func isCustom(obj *v3.Node) bool {
	custom := obj.Spec.CustomConfig
	if custom == nil {
		return false
	}
	return custom.Address != ""
}
// setWaiting sets the Registered condition's message to indicate the node is
// waiting to register with Kubernetes.
func (m *Lifecycle) setWaiting(node *v3.Node) {
	registered := v3.NodeConditionRegistered
	// NOTE(review): IsUnknown looks like a query and its result is discarded
	// here; confirm whether Unknown(node) was intended.
	registered.IsUnknown(node)
	registered.Message(node, "waiting to register with Kubernetes")
}
// Create initializes a new node under the Initialized condition.
//
// Custom nodes (see isCustom) get their node config from setupCustom and are
// validated with validateCustomHost. Nodes without a node template name are
// returned untouched. Template-based nodes copy the template spec into the
// node status, build the driver config from the template's "<driver>Config"
// value (plus cloud credential fields), and save it through nodeconfig.
func (m *Lifecycle) Create(obj *v3.Node) (runtime.Object, error) {
	if isCustom(obj) {
		m.setupCustom(obj)
		initialized, err := v3.NodeConditionInitialized.Once(obj, func() (runtime.Object, error) {
			validationErr := validateCustomHost(obj)
			if validationErr == nil {
				m.setWaiting(obj)
			}
			return obj, validationErr
		})
		return initialized.(*v3.Node), err
	}

	if obj.Spec.NodeTemplateName == "" {
		return obj, nil
	}

	initialized, err := v3.NodeConditionInitialized.Once(obj, func() (runtime.Object, error) {
		nodeTemplate, err := m.getNodeTemplate(obj.Spec.NodeTemplateName)
		if err != nil {
			return obj, err
		}

		obj.Status.NodeTemplateSpec = &nodeTemplate.Spec
		if obj.Spec.RequestedHostname == "" {
			obj.Spec.RequestedHostname = obj.Name
		}
		if spec := obj.Status.NodeTemplateSpec; spec.EngineInstallURL == "" {
			spec.EngineInstallURL = defaultEngineInstallURL
		}

		// The driver-specific config is only reachable through the
		// unstructured form of the template.
		unstructuredTemplate, err := m.nodeTemplateGenericClient.GetNamespaced(nodeTemplate.Namespace, nodeTemplate.Name, metav1.GetOptions{})
		if err != nil {
			return obj, err
		}
		templateData := unstructuredTemplate.(*unstructured.Unstructured).Object

		driverConfig, found := values.GetValue(templateData, nodeTemplate.Spec.Driver+"Config")
		if !found {
			return obj, fmt.Errorf("node config not specified")
		}

		if nodeTemplate.Spec.Driver == amazonec2 {
			setEc2ClusterIDTag(driverConfig, obj.Namespace)
		}

		if err := m.updateRawConfigFromCredential(templateData, driverConfig, nodeTemplate); err != nil {
			return obj, err
		}

		encoded, err := json.Marshal(driverConfig)
		if err != nil {
			return obj, errors.Wrap(err, "failed to marshal node driver config")
		}

		if !m.devMode {
			if jailErr := jailer.CreateJail(obj.Namespace); jailErr != nil {
				logrus.Infof("Create jail error: %v", jailErr)
				return nil, jailErr
			}
		}

		nodeConfig, err := nodeconfig.NewNodeConfig(m.secretStore, obj)
		if err != nil {
			return obj, errors.Wrap(err, "failed to save node driver config")
		}
		defer nodeConfig.Cleanup()

		nodeConfig.SetDriverConfig(string(encoded))
		return obj, nodeConfig.Save()
	})

	return initialized.(*v3.Node), err
}
// getNodeTemplate splits nodeTemplateName with ref.Parse and fetches the
// template from the resulting namespace and name.
func (m *Lifecycle) getNodeTemplate(nodeTemplateName string) (*v3.NodeTemplate, error) {
	templateNS, templateName := ref.Parse(nodeTemplateName)
	return m.nodeTemplateClient.GetNamespaced(templateNS, templateName, metav1.GetOptions{})
}
// Remove tears down a template-provisioned node under the Removed condition.
//
// Nodes without a template spec are returned untouched. Otherwise removal
// waits (by returning an error) while the node is still listed in the
// cluster's applied spec, then restores the node config and deletes the node
// if it still exists. The node config is removed when the handler returns.
func (m *Lifecycle) Remove(obj *v3.Node) (runtime.Object, error) {
	if obj.Status.NodeTemplateSpec == nil {
		return obj, nil
	}

	removed, err := v3.NodeConditionRemoved.DoUntilTrue(obj, func() (runtime.Object, error) {
		inAppliedSpec, err := m.isNodeInAppliedSpec(obj)
		switch {
		case err != nil:
			return obj, err
		case inAppliedSpec:
			return obj, errors.New("waiting for node to be removed from cluster")
		}

		if !m.devMode {
			if jailErr := jailer.CreateJail(obj.Namespace); jailErr != nil {
				logrus.Infof("Create jail error: %v", jailErr)
				return nil, jailErr
			}
		}

		nodeConfig, err := nodeconfig.NewNodeConfig(m.secretStore, obj)
		if err != nil {
			return obj, err
		}
		if err := nodeConfig.Restore(); err != nil {
			return obj, err
		}
		defer nodeConfig.Remove()

		exists, err := nodeExists(nodeConfig.Dir(), obj)
		if err != nil {
			return obj, err
		}
		if !exists {
			return obj, nil
		}

		logrus.Infof("Removing node %s", obj.Spec.RequestedHostname)
		if err := deleteNode(nodeConfig.Dir(), obj); err != nil {
			return obj, err
		}
		logrus.Infof("Removing node %s done", obj.Spec.RequestedHostname)

		return obj, nil
	})

	return removed.(*v3.Node), err
}
// provision runs the node driver's create command for obj and then deploys
// the agent onto the new node.
//
// driverConfig is the JSON-encoded driver config; nodeDir is the directory
// the driver command runs against. The returned node is the most recent
// version known to this function. It is never replaced by the result of a
// failed nodeClient.Update call.
func (m *Lifecycle) provision(driverConfig, nodeDir string, obj *v3.Node) (*v3.Node, error) {
	configRawMap := map[string]interface{}{}
	if err := json.Unmarshal([]byte(driverConfig), &configRawMap); err != nil {
		return obj, errors.Wrap(err, "failed to unmarshal node config")
	}

	// Provisioning takes a long time, so persist the node first so the user
	// sees its status. Keep the caller's node when the update fails: the value
	// returned alongside an error is not guaranteed to be usable.
	updated, err := m.nodeClient.Update(obj)
	if err != nil {
		return obj, err
	}
	obj = updated

	// Replace aliased schema fields (file contents) with file paths.
	if err := aliasToPath(obj.Status.NodeTemplateSpec.Driver, configRawMap, obj.Namespace); err != nil {
		return obj, err
	}

	createCommandsArgs := buildCreateCommand(obj, configRawMap)
	cmd, err := buildCommand(nodeDir, obj, createCommandsArgs)
	if err != nil {
		return obj, err
	}

	logrus.Infof("Provisioning node %s", obj.Spec.RequestedHostname)

	stdoutReader, stderrReader, err := startReturnOutput(cmd)
	if err != nil {
		return obj, err
	}
	defer stdoutReader.Close()
	defer stderrReader.Close()
	// Ensures the command is waited on when reportStatus fails; on the normal
	// path the explicit Wait below collects the exit status first.
	defer cmd.Wait()

	obj, err = m.reportStatus(stdoutReader, stderrReader, obj)
	if err != nil {
		return obj, err
	}

	if err := cmd.Wait(); err != nil {
		return obj, err
	}

	if err := m.deployAgent(nodeDir, obj); err != nil {
		return obj, err
	}

	logrus.Infof("Provisioning node %s done", obj.Spec.RequestedHostname)
	return obj, nil
}
// aliasToPath rewrites aliased fields of a driver config in place.
//
// For every schema field listed in aliases[driver] that is present in config,
// the field is removed. When it holds non-empty contents, those contents are
// written to a file and the file's path is stored under the mapped driver
// field. Files are named after the first 10 characters of the unpadded base32
// SHA-256 of the contents. They live in /opt/jail/<ns>, and the stored path is
// "/<name>", unless CATTLE_DEV_MODE is set. In dev mode they go to os.TempDir()
// and the full path is stored.
//
// A nil value is treated like empty contents. Any other non-string value
// yields an error instead of a panic.
func aliasToPath(driver string, config map[string]interface{}, ns string) error {
	devMode := os.Getenv("CATTLE_DEV_MODE") != ""
	baseDir := path.Join("/opt/jail", ns)
	if devMode {
		baseDir = os.TempDir()
	}

	// Nothing to do for drivers without aliased fields.
	fields, ok := aliases[driver]
	if !ok {
		return nil
	}

	hasher := sha256.New()
	for schemaField, driverField := range fields {
		fileRaw, present := config[schemaField]
		if !present {
			continue
		}

		fileContents, isString := fileRaw.(string)
		if !isString && fileRaw != nil {
			return fmt.Errorf("field %s of driver %s must be a string, got %T", schemaField, driver, fileRaw)
		}

		// Delete our aliased fields
		delete(config, schemaField)
		if fileContents == "" {
			continue
		}

		hasher.Reset()
		hasher.Write([]byte(fileContents))
		sha := base32.StdEncoding.WithPadding(-1).EncodeToString(hasher.Sum(nil))[:10]

		fullPath := path.Join(baseDir, sha)
		// NOTE(review): these files can hold private keys yet are written
		// world-readable; confirm which user the jailed driver runs as before
		// tightening the mode.
		if err := ioutil.WriteFile(fullPath, []byte(fileContents), 0644); err != nil {
			return err
		}

		// Add the field and path
		if devMode {
			config[driverField] = fullPath
		} else {
			config[driverField] = path.Join("/", sha)
		}
	}

	return nil
}
// deployAgent runs the agent command for obj in nodeDir. The command is
// derived from the system cluster token of the node's namespace. On failure
// the command's combined output is attached to the returned error.
func (m *Lifecycle) deployAgent(nodeDir string, obj *v3.Node) error {
	clusterToken, err := m.systemAccountManager.GetOrCreateSystemClusterToken(obj.Namespace)
	if err != nil {
		return err
	}

	agentArgs := buildAgentCommand(obj, clusterregistrationtokens.NodeCommand(clusterToken))
	agentCmd, err := buildCommand(nodeDir, obj, agentArgs)
	if err != nil {
		return err
	}

	if out, runErr := agentCmd.CombinedOutput(); runErr != nil {
		return errors.Wrap(runErr, string(out))
	}
	return nil
}
// ready restores the node's stored config, provisions the node in the
// background while saving the config every five seconds, and finally records
// the resulting config on the node under the ConfigSaved condition.
//
// The returned node is the latest version known to this function and is
// non-nil whenever obj is. A provisioning error takes precedence over an
// error from saving the config.
func (m *Lifecycle) ready(obj *v3.Node) (*v3.Node, error) {
	config, err := nodeconfig.NewNodeConfig(m.secretStore, obj)
	if err != nil {
		return obj, err
	}
	defer config.Cleanup()

	if err := config.Restore(); err != nil {
		return obj, err
	}

	driverConfig, err := config.DriverConfig()
	if err != nil {
		return obj, err
	}

	// Captured up front so the polling loop does not touch obj while the
	// provisioning goroutine is using it.
	hostname := obj.Spec.RequestedHostname

	type provisionResult struct {
		node *v3.Node
		err  error
	}

	// Provision in the background so we can poll and save the config
	done := make(chan provisionResult, 1)
	go func() {
		node, err := m.provision(driverConfig, config.Dir(), obj)
		done <- provisionResult{node: node, err: err}
	}()

	ticker := time.NewTicker(5 * time.Second)
	defer ticker.Stop()

	// Poll and save config
	var provisionErr error
outer:
	for {
		select {
		case res := <-done:
			// Never replace the node with nil; later steps dereference it.
			if res.node != nil {
				obj = res.node
			}
			provisionErr = res.err
			break outer
		case <-ticker.C:
			// Best effort: a failed intermediate save must not abort provisioning.
			if err := config.Save(); err != nil {
				logrus.Warnf("Failed to save node config for %s while provisioning: %v", hostname, err)
			}
		}
	}

	// The config is saved even when provisioning failed.
	newObj, saveError := v3.NodeConditionConfigSaved.Once(obj, func() (runtime.Object, error) {
		return m.saveConfig(config, config.FullDir(), obj)
	})
	obj = newObj.(*v3.Node)

	if provisionErr != nil {
		return obj, provisionErr
	}
	return obj, saveError
}
// Updated checks the node's labels and then provisions the node under the
// Provisioned condition. Nodes without a template spec are only marked as
// waiting to register. Template-based nodes get a jail (unless in dev mode)
// and are provisioned through ready.
func (m *Lifecycle) Updated(obj *v3.Node) (runtime.Object, error) {
	obj, err := m.checkLabels(obj)
	if err != nil {
		return obj, err
	}

	provisioned, err := v3.NodeConditionProvisioned.Once(obj, func() (runtime.Object, error) {
		if obj.Status.NodeTemplateSpec == nil {
			m.setWaiting(obj)
			return obj, nil
		}

		if !m.devMode {
			logrus.Infof("Creating jail for %v", obj.Namespace)
			if jailErr := jailer.CreateJail(obj.Namespace); jailErr != nil {
				logrus.Infof("Create jail error: %v", jailErr)
				return nil, jailErr
			}
		}

		readyNode, readyErr := m.ready(obj)
		if readyErr == nil {
			m.setWaiting(readyNode)
		}
		return readyNode, readyErr
	})

	return provisioned.(*v3.Node), err
}
// saveConfig saves the node config and then fills obj.Status.NodeConfig and
// the node's internal address list from it. The values come from the config
// (IP, internal IP, SSH user), the SSH key found under nodeDir, and the labels
// of the node's template. The role list falls back to "worker" when empty.
func (m *Lifecycle) saveConfig(config *nodeconfig.NodeConfig, nodeDir string, obj *v3.Node) (*v3.Node, error) {
	logrus.Infof("Generating and uploading node config %s", obj.Spec.RequestedHostname)
	if err := config.Save(); err != nil {
		return obj, err
	}

	address, err := config.IP()
	if err != nil {
		return obj, err
	}

	internalAddress, err := config.InternalIP()
	if err != nil {
		return obj, err
	}

	keyContents, err := getSSHKey(nodeDir, obj)
	if err != nil {
		return obj, err
	}

	user, err := config.SSHUser()
	if err != nil {
		return obj, err
	}

	// Saved a second time, after the lookups above.
	if err := config.Save(); err != nil {
		return obj, err
	}

	nodeTemplate, err := m.getNodeTemplate(obj.Spec.NodeTemplateName)
	if err != nil {
		return obj, err
	}

	nodeRoles := roles(obj)
	if len(nodeRoles) == 0 {
		nodeRoles = []string{"worker"}
	}

	obj.Status.NodeConfig = &v3.RKEConfigNode{
		NodeName:         obj.Namespace + ":" + obj.Name,
		Address:          address,
		InternalAddress:  internalAddress,
		User:             user,
		Role:             nodeRoles,
		HostnameOverride: obj.Spec.RequestedHostname,
		SSHKey:           keyContents,
		Labels:           nodeTemplate.Labels,
	}

	obj.Status.InternalNodeStatus.Addresses = []v1.NodeAddress{
		{
			Type:    v1.NodeInternalIP,
			Address: address,
		},
	}

	return obj, nil
}
// isNodeInAppliedSpec reports whether an etcd node is still listed, as
// "<namespace>:<name>", among the RKE nodes of its cluster's applied spec.
// It returns false for non-etcd nodes and when the cluster is missing, being
// deleted, or has no applied RKE config.
func (m *Lifecycle) isNodeInAppliedSpec(node *v3.Node) (bool, error) {
	// worker/controlplane nodes can just be immediately deleted
	if !node.Spec.Etcd {
		return false, nil
	}

	cluster, err := m.clusterLister.Get("", node.Namespace)
	if kerror.IsNotFound(err) && err != nil {
		return false, nil
	}
	if err != nil {
		return false, err
	}

	if cluster == nil ||
		cluster.DeletionTimestamp != nil ||
		cluster.Status.AppliedSpec.RancherKubernetesEngineConfig == nil {
		return false, nil
	}

	// The expected name always contains ":", so it can never match an RKE
	// node whose name is empty.
	wanted := node.Namespace + ":" + node.Name
	rkeNodes := cluster.Status.AppliedSpec.RancherKubernetesEngineConfig.Nodes
	for i := range rkeNodes {
		if rkeNodes[i].NodeName == wanted {
			return true, nil
		}
	}

	return false, nil
}
// validateCustomHost checks that a custom node is reachable over SSH on port
// 22 with the key from its custom config. Imported nodes are not checked.
//
// The SSH user falls back to "root" when the custom config leaves it empty,
// matching the default setupCustom stores in the node config. The connection
// is therefore validated with the same user that is recorded for the node.
//
// It returns an error when the key cannot be parsed or the connection fails.
func validateCustomHost(obj *v3.Node) error {
	if obj.Spec.Imported {
		return nil
	}

	customConfig := obj.Spec.CustomConfig
	signer, err := ssh.ParsePrivateKey([]byte(customConfig.SSHKey))
	if err != nil {
		return errors.Wrapf(err, "sshKey format is invalid")
	}

	user := customConfig.User
	if user == "" {
		user = "root"
	}

	config := &ssh.ClientConfig{
		User: user,
		Auth: []ssh.AuthMethod{
			ssh.PublicKeys(signer),
		},
		// Host keys are not verified; this only checks reachability and auth.
		HostKeyCallback: ssh.InsecureIgnoreHostKey(),
	}

	conn, err := ssh.Dial("tcp", customConfig.Address+":22", config)
	if err != nil {
		return errors.Wrapf(err, "Failed to validate ssh connection to address [%s]", customConfig.Address)
	}
	defer conn.Close()

	return nil
}
// roles lists the node's roles in the order etcd, controlplane, worker,
// according to the flags set in its spec. A node with no flag set is treated
// as a worker.
func roles(node *v3.Node) []string {
	candidates := []struct {
		enabled bool
		name    string
	}{
		{node.Spec.Etcd, "etcd"},
		{node.Spec.ControlPlane, "controlplane"},
		{node.Spec.Worker, "worker"},
	}

	var assigned []string
	for _, candidate := range candidates {
		if candidate.enabled {
			assigned = append(assigned, candidate.name)
		}
	}

	if len(assigned) == 0 {
		return []string{"worker"}
	}
	return assigned
}
// setCredFields copies values from a cloud credential secret into data.
//
// credID must have the form "<x>:<name>"; the secret <name> is read from the
// global namespace. Secret keys of the form "<something>Config-<field>" are
// copied as strings into data[<field>] when <field> is a key of fields.
// Nothing is copied when data converts to an empty map.
func (m *Lifecycle) setCredFields(data interface{}, fields map[string]v3.Field, credID string) error {
	idParts := strings.Split(credID, ":")
	if len(idParts) != 2 {
		return fmt.Errorf("invalid credential id %s", credID)
	}

	cred, err := m.credLister.Get(namespace.GlobalNamespace, idParts[1])
	if err != nil {
		return err
	}

	target := convert.ToMapInterface(data)
	if len(target) == 0 {
		return nil
	}

	for key, val := range cred.Data {
		keyParts := strings.Split(key, "-")
		if len(keyParts) != 2 || !strings.HasSuffix(keyParts[0], "Config") {
			continue
		}
		fieldName := keyParts[1]
		if _, known := fields[fieldName]; known {
			target[fieldName] = string(val)
		}
	}

	return nil
}
// updateRawConfigFromCredential fills rawConfig with fields from the cloud
// credential named in data's spec.cloudCredentialName, restricted to the
// resource fields of the driver's "<driver>config" schema. It does nothing
// when no credential name is set.
func (m *Lifecycle) updateRawConfigFromCredential(data map[string]interface{}, rawConfig interface{}, template *v3.NodeTemplate) error {
	credID := convert.ToString(values.GetValueN(data, "spec", "cloudCredentialName"))
	if credID == "" {
		return nil
	}

	driverSchema, err := m.schemaLister.Get("", template.Spec.Driver+"config")
	if err != nil {
		return err
	}

	logrus.Debugf("setCredFields for credentialName %s", credID)
	if credErr := m.setCredFields(rawConfig, driverSchema.Spec.ResourceFields, credID); credErr != nil {
		return errors.Wrap(credErr, "failed to set credential fields")
	}
	return nil
}
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
Go
1
https://gitee.com/rancher/rancher.git
git@gitee.com:rancher/rancher.git
rancher
rancher
rancher
v2.2.4-rc2

搜索帮助

344bd9b3 5694891 D2dac590 5694891