1 Star 0 Fork 0

powerpaas / rke

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
etcd.go 22.51 KB
一键复制 编辑 原始数据 按行查看 历史
galal-hussein 提交于 2019-04-08 18:46 . Add :z to backup volume bind mounts
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600
package services
import (
"fmt"
"path"
"strings"
"time"
"context"
etcdclient "github.com/coreos/etcd/client"
"github.com/docker/docker/api/types/container"
"github.com/pkg/errors"
"github.com/rancher/rke/docker"
"github.com/rancher/rke/hosts"
"github.com/rancher/rke/log"
"github.com/rancher/rke/pki"
"github.com/rancher/rke/util"
v3 "github.com/rancher/types/apis/management.cattle.io/v3"
"github.com/sirupsen/logrus"
"golang.org/x/sync/errgroup"
"k8s.io/client-go/util/cert"
)
const (
EtcdSnapshotPath = "/opt/rke/etcd-snapshots/"
EtcdRestorePath = "/opt/rke/etcd-snapshots-restore/"
EtcdDataDir = "/var/lib/rancher/etcd/"
EtcdInitWaitTime = 10
EtcdSnapshotWaitTime = 5
)
func RunEtcdPlane(
ctx context.Context,
etcdHosts []*hosts.Host,
etcdNodePlanMap map[string]v3.RKEConfigNodePlan,
localConnDialerFactory hosts.DialerFactory,
prsMap map[string]v3.PrivateRegistry,
updateWorkersOnly bool,
alpineImage string,
es v3.ETCDService,
certMap map[string]pki.CertificatePKI) error {
log.Infof(ctx, "[%s] Building up etcd plane..", ETCDRole)
for _, host := range etcdHosts {
if updateWorkersOnly {
continue
}
etcdProcess := etcdNodePlanMap[host.Address].Processes[EtcdContainerName]
imageCfg, hostCfg, _ := GetProcessConfig(etcdProcess)
if err := docker.DoRunContainer(ctx, host.DClient, imageCfg, hostCfg, EtcdContainerName, host.Address, ETCDRole, prsMap); err != nil {
return err
}
if *es.Snapshot == true {
if err := RunEtcdSnapshotSave(ctx, host, prsMap, alpineImage, EtcdSnapshotContainerName, false, es); err != nil {
return err
}
if err := pki.SaveBackupBundleOnHost(ctx, host, alpineImage, EtcdSnapshotPath, prsMap); err != nil {
return err
}
} else {
if err := docker.DoRemoveContainer(ctx, host.DClient, EtcdSnapshotContainerName, host.Address); err != nil {
return err
}
}
if err := createLogLink(ctx, host, EtcdContainerName, ETCDRole, alpineImage, prsMap); err != nil {
return err
}
}
log.Infof(ctx, "[%s] Successfully started etcd plane.. Checking etcd cluster health", ETCDRole)
clientCert := cert.EncodeCertPEM(certMap[pki.KubeNodeCertName].Certificate)
clientkey := cert.EncodePrivateKeyPEM(certMap[pki.KubeNodeCertName].Key)
var healthy bool
for _, host := range etcdHosts {
_, _, healthCheckURL := GetProcessConfig(etcdNodePlanMap[host.Address].Processes[EtcdContainerName])
if healthy = isEtcdHealthy(ctx, localConnDialerFactory, host, clientCert, clientkey, healthCheckURL); healthy {
break
}
}
if !healthy {
return fmt.Errorf("[etcd] Etcd Cluster is not healthy")
}
return nil
}
func RestartEtcdPlane(ctx context.Context, etcdHosts []*hosts.Host) error {
log.Infof(ctx, "[%s] Restarting up etcd plane..", ETCDRole)
var errgrp errgroup.Group
hostsQueue := util.GetObjectQueue(etcdHosts)
for w := 0; w < WorkerThreads; w++ {
errgrp.Go(func() error {
var errList []error
for host := range hostsQueue {
runHost := host.(*hosts.Host)
if err := docker.DoRestartContainer(ctx, runHost.DClient, EtcdContainerName, runHost.Address); err != nil {
errList = append(errList, err)
}
}
return util.ErrList(errList)
})
}
if err := errgrp.Wait(); err != nil {
return err
}
log.Infof(ctx, "[%s] Successfully restarted etcd plane..", ETCDRole)
return nil
}
func RemoveEtcdPlane(ctx context.Context, etcdHosts []*hosts.Host, force bool) error {
log.Infof(ctx, "[%s] Tearing down etcd plane..", ETCDRole)
var errgrp errgroup.Group
hostsQueue := util.GetObjectQueue(etcdHosts)
for w := 0; w < WorkerThreads; w++ {
errgrp.Go(func() error {
var errList []error
for host := range hostsQueue {
runHost := host.(*hosts.Host)
if err := docker.DoRemoveContainer(ctx, runHost.DClient, EtcdContainerName, runHost.Address); err != nil {
errList = append(errList, err)
}
if !runHost.IsWorker || !runHost.IsControl || force {
// remove unschedulable kubelet on etcd host
if err := removeKubelet(ctx, runHost); err != nil {
errList = append(errList, err)
}
if err := removeKubeproxy(ctx, runHost); err != nil {
errList = append(errList, err)
}
if err := removeNginxProxy(ctx, runHost); err != nil {
errList = append(errList, err)
}
if err := removeSidekick(ctx, runHost); err != nil {
errList = append(errList, err)
}
}
}
return util.ErrList(errList)
})
}
if err := errgrp.Wait(); err != nil {
return err
}
log.Infof(ctx, "[%s] Successfully tore down etcd plane..", ETCDRole)
return nil
}
func AddEtcdMember(ctx context.Context, toAddEtcdHost *hosts.Host, etcdHosts []*hosts.Host, localConnDialerFactory hosts.DialerFactory, cert, key []byte) error {
log.Infof(ctx, "[add/%s] Adding member [etcd-%s] to etcd cluster", ETCDRole, toAddEtcdHost.HostnameOverride)
peerURL := fmt.Sprintf("https://%s:2380", toAddEtcdHost.InternalAddress)
added := false
for _, host := range etcdHosts {
if host.Address == toAddEtcdHost.Address {
continue
}
etcdClient, err := getEtcdClient(ctx, host, localConnDialerFactory, cert, key)
if err != nil {
logrus.Debugf("Failed to create etcd client for host [%s]: %v", host.Address, err)
continue
}
memAPI := etcdclient.NewMembersAPI(etcdClient)
if _, err := memAPI.Add(ctx, peerURL); err != nil {
logrus.Debugf("Failed to Add etcd member [%s] from host: %v", host.Address, err)
continue
}
added = true
break
}
if !added {
return fmt.Errorf("Failed to add etcd member [etcd-%s] to etcd cluster", toAddEtcdHost.HostnameOverride)
}
log.Infof(ctx, "[add/%s] Successfully Added member [etcd-%s] to etcd cluster", ETCDRole, toAddEtcdHost.HostnameOverride)
return nil
}
func RemoveEtcdMember(ctx context.Context, etcdHost *hosts.Host, etcdHosts []*hosts.Host, localConnDialerFactory hosts.DialerFactory, cert, key []byte) error {
log.Infof(ctx, "[remove/%s] Removing member [etcd-%s] from etcd cluster", ETCDRole, etcdHost.HostnameOverride)
var mID string
removed := false
for _, host := range etcdHosts {
etcdClient, err := getEtcdClient(ctx, host, localConnDialerFactory, cert, key)
if err != nil {
logrus.Debugf("Failed to create etcd client for host [%s]: %v", host.Address, err)
continue
}
memAPI := etcdclient.NewMembersAPI(etcdClient)
members, err := memAPI.List(ctx)
if err != nil {
logrus.Debugf("Failed to list etcd members from host [%s]: %v", host.Address, err)
continue
}
for _, member := range members {
if member.Name == fmt.Sprintf("etcd-%s", etcdHost.HostnameOverride) {
mID = member.ID
break
}
}
if err := memAPI.Remove(ctx, mID); err != nil {
logrus.Debugf("Failed to list etcd members from host [%s]: %v", host.Address, err)
continue
}
removed = true
break
}
if !removed {
return fmt.Errorf("Failed to delete etcd member [etcd-%s] from etcd cluster", etcdHost.HostnameOverride)
}
log.Infof(ctx, "[remove/%s] Successfully removed member [etcd-%s] from etcd cluster", ETCDRole, etcdHost.HostnameOverride)
return nil
}
func ReloadEtcdCluster(ctx context.Context, readyEtcdHosts []*hosts.Host, newHost *hosts.Host, localConnDialerFactory hosts.DialerFactory, cert, key []byte, prsMap map[string]v3.PrivateRegistry, etcdNodePlanMap map[string]v3.RKEConfigNodePlan, alpineImage string) error {
imageCfg, hostCfg, _ := GetProcessConfig(etcdNodePlanMap[newHost.Address].Processes[EtcdContainerName])
if err := docker.DoRunContainer(ctx, newHost.DClient, imageCfg, hostCfg, EtcdContainerName, newHost.Address, ETCDRole, prsMap); err != nil {
return err
}
if err := createLogLink(ctx, newHost, EtcdContainerName, ETCDRole, alpineImage, prsMap); err != nil {
return err
}
time.Sleep(EtcdInitWaitTime * time.Second)
var healthy bool
for _, host := range readyEtcdHosts {
_, _, healthCheckURL := GetProcessConfig(etcdNodePlanMap[host.Address].Processes[EtcdContainerName])
if healthy = isEtcdHealthy(ctx, localConnDialerFactory, host, cert, key, healthCheckURL); healthy {
break
}
}
if !healthy {
return fmt.Errorf("[etcd] Etcd Cluster is not healthy")
}
return nil
}
func IsEtcdMember(ctx context.Context, etcdHost *hosts.Host, etcdHosts []*hosts.Host, localConnDialerFactory hosts.DialerFactory, cert, key []byte) (bool, error) {
var listErr error
peerURL := fmt.Sprintf("https://%s:2380", etcdHost.InternalAddress)
for _, host := range etcdHosts {
if host.Address == etcdHost.Address {
continue
}
etcdClient, err := getEtcdClient(ctx, host, localConnDialerFactory, cert, key)
if err != nil {
listErr = errors.Wrapf(err, "Failed to create etcd client for host [%s]", host.Address)
logrus.Debugf("Failed to create etcd client for host [%s]: %v", host.Address, err)
continue
}
memAPI := etcdclient.NewMembersAPI(etcdClient)
members, err := memAPI.List(ctx)
if err != nil {
listErr = errors.Wrapf(err, "Failed to create etcd client for host [%s]", host.Address)
logrus.Debugf("Failed to list etcd cluster members [%s]: %v", etcdHost.Address, err)
continue
}
for _, member := range members {
if strings.Contains(member.PeerURLs[0], peerURL) {
logrus.Infof("[etcd] member [%s] is already part of the etcd cluster", etcdHost.Address)
return true, nil
}
}
// reset the list of errors to handle new hosts
listErr = nil
break
}
if listErr != nil {
return false, listErr
}
return false, nil
}
func RunEtcdSnapshotSave(ctx context.Context, etcdHost *hosts.Host, prsMap map[string]v3.PrivateRegistry, etcdSnapshotImage string, name string, once bool, es v3.ETCDService) error {
log.Infof(ctx, "[etcd] Saving snapshot [%s] on host [%s]", name, etcdHost.Address)
backupCmd := "etcd-backup"
restartPolicy := "always"
if !util.IsRancherBackupSupported(etcdSnapshotImage) {
backupCmd = "rolling-backup"
}
imageCfg := &container.Config{
Cmd: []string{
"/opt/rke-tools/rke-etcd-backup",
backupCmd,
"save",
"--cacert", pki.GetCertPath(pki.CACertName),
"--cert", pki.GetCertPath(pki.KubeNodeCertName),
"--key", pki.GetKeyPath(pki.KubeNodeCertName),
"--name", name,
"--endpoints=" + etcdHost.InternalAddress + ":2379",
},
Image: etcdSnapshotImage,
}
if once {
imageCfg.Cmd = append(imageCfg.Cmd, "--once")
restartPolicy = "no"
} else if es.BackupConfig == nil {
imageCfg.Cmd = append(imageCfg.Cmd, "--retention="+es.Retention)
imageCfg.Cmd = append(imageCfg.Cmd, "--creation="+es.Creation)
}
if es.BackupConfig != nil {
imageCfg = configS3BackupImgCmd(ctx, imageCfg, es.BackupConfig)
}
hostCfg := &container.HostConfig{
Binds: []string{
fmt.Sprintf("%s:/backup:z", EtcdSnapshotPath),
fmt.Sprintf("%s:/etc/kubernetes:z", path.Join(etcdHost.PrefixPath, "/etc/kubernetes"))},
NetworkMode: container.NetworkMode("host"),
RestartPolicy: container.RestartPolicy{Name: restartPolicy},
}
if once {
if err := docker.DoRemoveContainer(ctx, etcdHost.DClient, EtcdSnapshotOnceContainerName, etcdHost.Address); err != nil {
return err
}
if err := docker.DoRunContainer(ctx, etcdHost.DClient, imageCfg, hostCfg, EtcdSnapshotOnceContainerName, etcdHost.Address, ETCDRole, prsMap); err != nil {
return err
}
status, _, stderr, err := docker.GetContainerOutput(ctx, etcdHost.DClient, EtcdSnapshotOnceContainerName, etcdHost.Address)
if status != 0 || err != nil {
if removeErr := docker.RemoveContainer(ctx, etcdHost.DClient, etcdHost.Address, EtcdSnapshotOnceContainerName); removeErr != nil {
log.Warnf(ctx, "Failed to remove container [%s]: %v", removeErr)
}
if err != nil {
return err
}
return fmt.Errorf("Failed to take one-time snapshot, exit code [%d]: %v", status, stderr)
}
return docker.RemoveContainer(ctx, etcdHost.DClient, etcdHost.Address, EtcdSnapshotOnceContainerName)
}
if err := docker.DoRemoveContainer(ctx, etcdHost.DClient, EtcdSnapshotContainerName, etcdHost.Address); err != nil {
return err
}
if err := docker.DoRunContainer(ctx, etcdHost.DClient, imageCfg, hostCfg, EtcdSnapshotContainerName, etcdHost.Address, ETCDRole, prsMap); err != nil {
return err
}
// check if the container exited with error
snapshotCont, err := docker.InspectContainer(ctx, etcdHost.DClient, etcdHost.Address, EtcdSnapshotContainerName)
if err != nil {
return err
}
time.Sleep(EtcdSnapshotWaitTime * time.Second)
if snapshotCont.State.Status == "exited" || snapshotCont.State.Restarting {
log.Warnf(ctx, "Etcd rolling snapshot container failed to start correctly")
return docker.RemoveContainer(ctx, etcdHost.DClient, etcdHost.Address, EtcdSnapshotContainerName)
}
return nil
}
func DownloadEtcdSnapshotFromS3(ctx context.Context, etcdHost *hosts.Host, prsMap map[string]v3.PrivateRegistry, etcdSnapshotImage string, name string, es v3.ETCDService) error {
log.Infof(ctx, "[etcd] Get snapshot [%s] on host [%s]", name, etcdHost.Address)
s3Backend := es.BackupConfig.S3BackupConfig
if len(s3Backend.Endpoint) == 0 || len(s3Backend.BucketName) == 0 {
return fmt.Errorf("failed to get snapshot [%s] from s3 on host [%s], invalid s3 configurations", name, etcdHost.Address)
}
imageCfg := &container.Config{
Cmd: []string{
"/opt/rke-tools/rke-etcd-backup",
"etcd-backup",
"download",
"--name", name,
"--s3-backup=true",
"--s3-endpoint=" + s3Backend.Endpoint,
"--s3-accessKey=" + s3Backend.AccessKey,
"--s3-secretKey=" + s3Backend.SecretKey,
"--s3-bucketName=" + s3Backend.BucketName,
"--s3-region=" + s3Backend.Region,
},
Image: etcdSnapshotImage,
}
hostCfg := &container.HostConfig{
Binds: []string{
fmt.Sprintf("%s:/backup:z", EtcdSnapshotPath),
fmt.Sprintf("%s:/etc/kubernetes:z", path.Join(etcdHost.PrefixPath, "/etc/kubernetes"))},
NetworkMode: container.NetworkMode("host"),
RestartPolicy: container.RestartPolicy{Name: "always"},
}
if err := docker.DoRemoveContainer(ctx, etcdHost.DClient, EtcdDownloadBackupContainerName, etcdHost.Address); err != nil {
return err
}
if err := docker.DoRunContainer(ctx, etcdHost.DClient, imageCfg, hostCfg, EtcdDownloadBackupContainerName, etcdHost.Address, ETCDRole, prsMap); err != nil {
return err
}
status, _, stderr, err := docker.GetContainerOutput(ctx, etcdHost.DClient, EtcdDownloadBackupContainerName, etcdHost.Address)
if status != 0 || err != nil {
if removeErr := docker.RemoveContainer(ctx, etcdHost.DClient, etcdHost.Address, EtcdDownloadBackupContainerName); removeErr != nil {
log.Warnf(ctx, "Failed to remove container [%s]: %v", removeErr)
}
if err != nil {
return err
}
return fmt.Errorf("Failed to download etcd snapshot from s3, exit code [%d]: %v", status, stderr)
}
return docker.RemoveContainer(ctx, etcdHost.DClient, etcdHost.Address, EtcdDownloadBackupContainerName)
}
func RestoreEtcdSnapshot(ctx context.Context, etcdHost *hosts.Host, prsMap map[string]v3.PrivateRegistry, etcdRestoreImage, snapshotName, initCluster string) error {
log.Infof(ctx, "[etcd] Restoring [%s] snapshot on etcd host [%s]", snapshotName, etcdHost.Address)
nodeName := pki.GetEtcdCrtName(etcdHost.InternalAddress)
snapshotPath := fmt.Sprintf("%s%s", EtcdSnapshotPath, snapshotName)
// make sure that restore path is empty otherwise etcd restore will fail
imageCfg := &container.Config{
Cmd: []string{
"sh", "-c", strings.Join([]string{
"rm -rf", EtcdRestorePath,
"&& /usr/local/bin/etcdctl",
fmt.Sprintf("--endpoints=[%s:2379]", etcdHost.InternalAddress),
"--cacert", pki.GetCertPath(pki.CACertName),
"--cert", pki.GetCertPath(nodeName),
"--key", pki.GetKeyPath(nodeName),
"snapshot", "restore", snapshotPath,
"--data-dir=" + EtcdRestorePath,
"--name=etcd-" + etcdHost.HostnameOverride,
"--initial-cluster=" + initCluster,
"--initial-cluster-token=etcd-cluster-1",
"--initial-advertise-peer-urls=https://" + etcdHost.InternalAddress + ":2380",
"&& mv", EtcdRestorePath + "*", EtcdDataDir,
"&& rm -rf", EtcdRestorePath,
}, " "),
},
Env: []string{"ETCDCTL_API=3"},
Image: etcdRestoreImage,
}
hostCfg := &container.HostConfig{
Binds: []string{
"/opt/rke/:/opt/rke/:z",
fmt.Sprintf("%s:/var/lib/rancher/etcd:z", path.Join(etcdHost.PrefixPath, "/var/lib/etcd")),
fmt.Sprintf("%s:/etc/kubernetes:z", path.Join(etcdHost.PrefixPath, "/etc/kubernetes"))},
NetworkMode: container.NetworkMode("host"),
}
if err := docker.DoRemoveContainer(ctx, etcdHost.DClient, EtcdRestoreContainerName, etcdHost.Address); err != nil {
return err
}
if err := docker.DoRunContainer(ctx, etcdHost.DClient, imageCfg, hostCfg, EtcdRestoreContainerName, etcdHost.Address, ETCDRole, prsMap); err != nil {
return err
}
status, err := docker.WaitForContainer(ctx, etcdHost.DClient, etcdHost.Address, EtcdRestoreContainerName)
if err != nil {
return err
}
if status != 0 {
containerLog, _, err := docker.GetContainerLogsStdoutStderr(ctx, etcdHost.DClient, EtcdRestoreContainerName, "5", false)
if err != nil {
return err
}
if err := docker.RemoveContainer(ctx, etcdHost.DClient, etcdHost.Address, EtcdRestoreContainerName); err != nil {
return err
}
// printing the restore container's logs
return fmt.Errorf("Failed to run etcd restore container, exit status is: %d, container logs: %s", status, containerLog)
}
return docker.RemoveContainer(ctx, etcdHost.DClient, etcdHost.Address, EtcdRestoreContainerName)
}
func GetEtcdSnapshotChecksum(ctx context.Context, etcdHost *hosts.Host, prsMap map[string]v3.PrivateRegistry, alpineImage, snapshotName string) (string, error) {
var checksum string
var err error
snapshotPath := fmt.Sprintf("%s%s", EtcdSnapshotPath, snapshotName)
imageCfg := &container.Config{
Cmd: []string{
"sh", "-c", strings.Join([]string{
"md5sum", snapshotPath,
"|", "cut", "-f1", "-d' '", "|", "tr", "-d", "'\n'"}, " "),
},
Image: alpineImage,
}
hostCfg := &container.HostConfig{
Binds: []string{
"/opt/rke/:/opt/rke/:z",
}}
if err := docker.DoRunContainer(ctx, etcdHost.DClient, imageCfg, hostCfg, EtcdChecksumContainerName, etcdHost.Address, ETCDRole, prsMap); err != nil {
return checksum, err
}
if _, err := docker.WaitForContainer(ctx, etcdHost.DClient, etcdHost.Address, EtcdChecksumContainerName); err != nil {
return checksum, err
}
_, checksum, err = docker.GetContainerLogsStdoutStderr(ctx, etcdHost.DClient, EtcdChecksumContainerName, "1", false)
if err != nil {
return checksum, err
}
if err := docker.RemoveContainer(ctx, etcdHost.DClient, etcdHost.Address, EtcdChecksumContainerName); err != nil {
return checksum, err
}
return checksum, nil
}
func configS3BackupImgCmd(ctx context.Context, imageCfg *container.Config, bc *v3.BackupConfig) *container.Config {
cmd := []string{
"--creation=" + fmt.Sprintf("%dh", bc.IntervalHours),
"--retention=" + fmt.Sprintf("%dh", bc.Retention*bc.IntervalHours),
}
if bc.S3BackupConfig != nil {
cmd = append(cmd, []string{
"--s3-backup=true",
"--s3-endpoint=" + bc.S3BackupConfig.Endpoint,
"--s3-accessKey=" + bc.S3BackupConfig.AccessKey,
"--s3-secretKey=" + bc.S3BackupConfig.SecretKey,
"--s3-bucketName=" + bc.S3BackupConfig.BucketName,
"--s3-region=" + bc.S3BackupConfig.Region,
}...)
}
imageCfg.Cmd = append(imageCfg.Cmd, cmd...)
return imageCfg
}
func StartBackupServer(ctx context.Context, etcdHost *hosts.Host, prsMap map[string]v3.PrivateRegistry, etcdSnapshotImage string, name string) error {
log.Infof(ctx, "[etcd] starting backup server on host [%s]", etcdHost.Address)
imageCfg := &container.Config{
Cmd: []string{
"/opt/rke-tools/rke-etcd-backup",
"etcd-backup",
"serve",
"--name", name,
"--cacert", pki.GetCertPath(pki.CACertName),
"--cert", pki.GetCertPath(pki.KubeNodeCertName),
"--key", pki.GetKeyPath(pki.KubeNodeCertName),
},
Image: etcdSnapshotImage,
}
hostCfg := &container.HostConfig{
Binds: []string{
fmt.Sprintf("%s:/backup:z", EtcdSnapshotPath),
fmt.Sprintf("%s:/etc/kubernetes:z", path.Join(etcdHost.PrefixPath, "/etc/kubernetes"))},
NetworkMode: container.NetworkMode("host"),
RestartPolicy: container.RestartPolicy{Name: "no"},
}
if err := docker.DoRemoveContainer(ctx, etcdHost.DClient, EtcdServeBackupContainerName, etcdHost.Address); err != nil {
return err
}
if err := docker.DoRunContainer(ctx, etcdHost.DClient, imageCfg, hostCfg, EtcdServeBackupContainerName, etcdHost.Address, ETCDRole, prsMap); err != nil {
return err
}
time.Sleep(EtcdSnapshotWaitTime * time.Second)
container, err := docker.InspectContainer(ctx, etcdHost.DClient, etcdHost.Address, EtcdServeBackupContainerName)
if err != nil {
return err
}
if !container.State.Running {
containerLog, _, err := docker.GetContainerLogsStdoutStderr(ctx, etcdHost.DClient, EtcdServeBackupContainerName, "1", false)
if err != nil {
return err
}
if err := docker.RemoveContainer(ctx, etcdHost.DClient, etcdHost.Address, EtcdServeBackupContainerName); err != nil {
return err
}
// printing the restore container's logs
return fmt.Errorf("Failed to run backup server container, container logs: %s", containerLog)
}
return nil
}
func DownloadEtcdSnapshotFromBackupServer(ctx context.Context, etcdHost *hosts.Host, prsMap map[string]v3.PrivateRegistry, etcdSnapshotImage, name string, backupServer *hosts.Host) error {
log.Infof(ctx, "[etcd] Get snapshot [%s] on host [%s]", name, etcdHost.Address)
imageCfg := &container.Config{
Cmd: []string{
"/opt/rke-tools/rke-etcd-backup",
"etcd-backup",
"download",
"--name", name,
"--local-endpoint", backupServer.InternalAddress,
"--cacert", pki.GetCertPath(pki.CACertName),
"--cert", pki.GetCertPath(pki.KubeNodeCertName),
"--key", pki.GetKeyPath(pki.KubeNodeCertName),
},
Image: etcdSnapshotImage,
}
hostCfg := &container.HostConfig{
Binds: []string{
fmt.Sprintf("%s:/backup:z", EtcdSnapshotPath),
fmt.Sprintf("%s:/etc/kubernetes:z", path.Join(etcdHost.PrefixPath, "/etc/kubernetes"))},
NetworkMode: container.NetworkMode("host"),
RestartPolicy: container.RestartPolicy{Name: "on-failure"},
}
if err := docker.DoRemoveContainer(ctx, etcdHost.DClient, EtcdDownloadBackupContainerName, etcdHost.Address); err != nil {
return err
}
if err := docker.DoRunContainer(ctx, etcdHost.DClient, imageCfg, hostCfg, EtcdDownloadBackupContainerName, etcdHost.Address, ETCDRole, prsMap); err != nil {
return err
}
status, _, stderr, err := docker.GetContainerOutput(ctx, etcdHost.DClient, EtcdDownloadBackupContainerName, etcdHost.Address)
if status != 0 || err != nil {
if removeErr := docker.RemoveContainer(ctx, etcdHost.DClient, etcdHost.Address, EtcdDownloadBackupContainerName); removeErr != nil {
log.Warnf(ctx, "Failed to remove container [%s]: %v", removeErr)
}
if err != nil {
return err
}
return fmt.Errorf("Failed to download etcd snapshot from backup server [%s], exit code [%d]: %v", backupServer.Address, status, stderr)
}
return docker.RemoveContainer(ctx, etcdHost.DClient, etcdHost.Address, EtcdDownloadBackupContainerName)
}
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
1
https://gitee.com/powerpaas/rke.git
git@gitee.com:powerpaas/rke.git
powerpaas
rke
rke
v0.2.2

搜索帮助

344bd9b3 5694891 D2dac590 5694891