代码拉取完成,页面将自动刷新
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package app
import (
"math/rand"
"net"
"net/http"
"net/http/pprof"
"os"
goruntime "runtime"
"strconv"
"strings"
"time"
"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apiserver/pkg/server/healthz"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
clientset "k8s.io/client-go/kubernetes"
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
restclient "k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/tools/leaderelection"
"k8s.io/client-go/tools/leaderelection/resourcelock"
"k8s.io/client-go/tools/record"
"k8s.io/kubernetes/cmd/cloud-controller-manager/app/options"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/cloudprovider"
"k8s.io/kubernetes/pkg/controller"
cloudcontrollers "k8s.io/kubernetes/pkg/controller/cloud"
routecontroller "k8s.io/kubernetes/pkg/controller/route"
servicecontroller "k8s.io/kubernetes/pkg/controller/service"
"k8s.io/kubernetes/pkg/util/configz"
"github.com/golang/glog"
"github.com/prometheus/client_golang/prometheus"
"github.com/spf13/cobra"
"github.com/spf13/pflag"
)
const (
// Jitter used when starting controller managers
ControllerStartJitter = 1.0
)
// NewCloudControllerManagerCommand creates a *cobra.Command object with default parameters
func NewCloudControllerManagerCommand() *cobra.Command {
s := options.NewCloudControllerManagerServer()
s.AddFlags(pflag.CommandLine)
cmd := &cobra.Command{
Use: "cloud-controller-manager",
Long: `The Cloud controller manager is a daemon that embeds
the cloud specific control loops shipped with Kubernetes.`,
Run: func(cmd *cobra.Command, args []string) {
},
}
return cmd
}
// resyncPeriod computes the time interval a shared informer waits before resyncing with the api server
func resyncPeriod(s *options.CloudControllerManagerServer) func() time.Duration {
return func() time.Duration {
factor := rand.Float64() + 1
return time.Duration(float64(s.MinResyncPeriod.Nanoseconds()) * factor)
}
}
// Run runs the ExternalCMServer. This should never exit.
func Run(s *options.CloudControllerManagerServer, cloud cloudprovider.Interface) error {
if c, err := configz.New("componentconfig"); err == nil {
c.Set(s.KubeControllerManagerConfiguration)
} else {
glog.Errorf("unable to register configz: %s", err)
}
kubeconfig, err := clientcmd.BuildConfigFromFlags(s.Master, s.Kubeconfig)
if err != nil {
return err
}
// Set the ContentType of the requests from kube client
kubeconfig.ContentConfig.ContentType = s.ContentType
// Override kubeconfig qps/burst settings from flags
kubeconfig.QPS = s.KubeAPIQPS
kubeconfig.Burst = int(s.KubeAPIBurst)
kubeClient, err := clientset.NewForConfig(restclient.AddUserAgent(kubeconfig, "cloud-controller-manager"))
if err != nil {
glog.Fatalf("Invalid API configuration: %v", err)
}
leaderElectionClient := kubernetes.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "leader-election"))
// Start the external controller manager server
go startHTTP(s)
recorder := createRecorder(kubeClient)
run := func(stop <-chan struct{}) {
rootClientBuilder := controller.SimpleControllerClientBuilder{
ClientConfig: kubeconfig,
}
var clientBuilder controller.ControllerClientBuilder
if s.UseServiceAccountCredentials {
clientBuilder = controller.SAControllerClientBuilder{
ClientConfig: restclient.AnonymousClientConfig(kubeconfig),
CoreClient: kubeClient.CoreV1(),
AuthenticationClient: kubeClient.Authentication(),
Namespace: "kube-system",
}
} else {
clientBuilder = rootClientBuilder
}
err := StartControllers(s, kubeconfig, clientBuilder, stop, recorder, cloud)
glog.Fatalf("error running controllers: %v", err)
panic("unreachable")
}
if !s.LeaderElection.LeaderElect {
run(nil)
panic("unreachable")
}
// Identity used to distinguish between multiple cloud controller manager instances
id, err := os.Hostname()
if err != nil {
return err
}
// Lock required for leader election
rl := resourcelock.EndpointsLock{
EndpointsMeta: metav1.ObjectMeta{
Namespace: "kube-system",
Name: "cloud-controller-manager",
},
Client: leaderElectionClient.CoreV1(),
LockConfig: resourcelock.ResourceLockConfig{
Identity: id + "-external-cloud-controller",
EventRecorder: recorder,
},
}
// Try and become the leader and start cloud controller manager loops
leaderelection.RunOrDie(leaderelection.LeaderElectionConfig{
Lock: &rl,
LeaseDuration: s.LeaderElection.LeaseDuration.Duration,
RenewDeadline: s.LeaderElection.RenewDeadline.Duration,
RetryPeriod: s.LeaderElection.RetryPeriod.Duration,
Callbacks: leaderelection.LeaderCallbacks{
OnStartedLeading: run,
OnStoppedLeading: func() {
glog.Fatalf("leaderelection lost")
},
},
})
panic("unreachable")
}
// StartControllers starts the cloud specific controller loops.
func StartControllers(s *options.CloudControllerManagerServer, kubeconfig *restclient.Config, clientBuilder controller.ControllerClientBuilder, stop <-chan struct{}, recorder record.EventRecorder, cloud cloudprovider.Interface) error {
// Function to build the kube client object
client := func(serviceAccountName string) clientset.Interface {
return clientBuilder.ClientOrDie(serviceAccountName)
}
if cloud != nil {
// Initialize the cloud provider with a reference to the clientBuilder
cloud.Initialize(clientBuilder)
}
versionedClient := client("shared-informers")
sharedInformers := informers.NewSharedInformerFactory(versionedClient, resyncPeriod(s)())
// Start the CloudNodeController
nodeController := cloudcontrollers.NewCloudNodeController(
sharedInformers.Core().V1().Nodes(),
client("cloud-node-controller"), cloud,
s.NodeMonitorPeriod.Duration,
s.NodeStatusUpdateFrequency.Duration)
nodeController.Run()
time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
// Start the PersistentVolumeLabelController
pvlController := cloudcontrollers.NewPersistentVolumeLabelController(client("pvl-controller"), cloud)
threads := 5
go pvlController.Run(threads, stop)
time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
// Start the service controller
serviceController, err := servicecontroller.New(
cloud,
client("service-controller"),
sharedInformers.Core().V1().Services(),
sharedInformers.Core().V1().Nodes(),
s.ClusterName,
)
if err != nil {
glog.Errorf("Failed to start service controller: %v", err)
} else {
go serviceController.Run(stop, int(s.ConcurrentServiceSyncs))
time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
}
// If CIDRs should be allocated for pods and set on the CloudProvider, then start the route controller
if s.AllocateNodeCIDRs && s.ConfigureCloudRoutes {
if routes, ok := cloud.Routes(); !ok {
glog.Warning("configure-cloud-routes is set, but cloud provider does not support routes. Will not configure cloud provider routes.")
} else {
var clusterCIDR *net.IPNet
if len(strings.TrimSpace(s.ClusterCIDR)) != 0 {
_, clusterCIDR, err = net.ParseCIDR(s.ClusterCIDR)
if err != nil {
glog.Warningf("Unsuccessful parsing of cluster CIDR %v: %v", s.ClusterCIDR, err)
}
}
routeController := routecontroller.New(routes, client("route-controller"), sharedInformers.Core().V1().Nodes(), s.ClusterName, clusterCIDR)
go routeController.Run(stop, s.RouteReconciliationPeriod.Duration)
time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
}
} else {
glog.Infof("Will not configure cloud provider routes for allocate-node-cidrs: %v, configure-cloud-routes: %v.", s.AllocateNodeCIDRs, s.ConfigureCloudRoutes)
}
// If apiserver is not running we should wait for some time and fail only then. This is particularly
// important when we start apiserver and controller manager at the same time.
err = wait.PollImmediate(time.Second, 10*time.Second, func() (bool, error) {
if _, err = restclient.ServerAPIVersions(kubeconfig); err == nil {
return true, nil
}
glog.Errorf("Failed to get api versions from server: %v", err)
return false, nil
})
if err != nil {
glog.Fatalf("Failed to get api versions from server: %v", err)
}
sharedInformers.Start(stop)
select {}
}
func startHTTP(s *options.CloudControllerManagerServer) {
mux := http.NewServeMux()
healthz.InstallHandler(mux)
if s.EnableProfiling {
mux.HandleFunc("/debug/pprof/", pprof.Index)
mux.HandleFunc("/debug/pprof/profile", pprof.Profile)
mux.HandleFunc("/debug/pprof/symbol", pprof.Symbol)
mux.HandleFunc("/debug/pprof/trace", pprof.Trace)
if s.EnableContentionProfiling {
goruntime.SetBlockProfileRate(1)
}
}
configz.InstallHandler(mux)
mux.Handle("/metrics", prometheus.Handler())
server := &http.Server{
Addr: net.JoinHostPort(s.Address, strconv.Itoa(int(s.Port))),
Handler: mux,
}
glog.Fatal(server.ListenAndServe())
}
func createRecorder(kubeClient *clientset.Clientset) record.EventRecorder {
eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartLogging(glog.Infof)
eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: v1core.New(kubeClient.CoreV1().RESTClient()).Events("")})
return eventBroadcaster.NewRecorder(api.Scheme, v1.EventSource{Component: "cloud-controller-manager"})
}
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。