1 Star 0 Fork 0

zhuchance / kubernetes

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
server.go 32.44 KB
一键复制 编辑 原始数据 按行查看 历史
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959
/*
Copyright 2014 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package server
import (
"crypto/tls"
"errors"
"fmt"
"io"
"net"
"net/http"
"net/http/pprof"
"reflect"
"strconv"
"strings"
"sync"
"time"
restful "github.com/emicklei/go-restful"
"github.com/golang/glog"
cadvisorapi "github.com/google/cadvisor/info/v1"
cadvisorapiv2 "github.com/google/cadvisor/info/v2"
"github.com/prometheus/client_golang/prometheus"
"k8s.io/kubernetes/pkg/api"
apierrs "k8s.io/kubernetes/pkg/api/errors"
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/api/v1"
"k8s.io/kubernetes/pkg/api/validation"
"k8s.io/kubernetes/pkg/auth/authenticator"
"k8s.io/kubernetes/pkg/auth/authorizer"
"k8s.io/kubernetes/pkg/healthz"
"k8s.io/kubernetes/pkg/httplog"
"k8s.io/kubernetes/pkg/kubelet/cm"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/kubelet/server/portforward"
"k8s.io/kubernetes/pkg/kubelet/server/remotecommand"
"k8s.io/kubernetes/pkg/kubelet/server/stats"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/types"
"k8s.io/kubernetes/pkg/util/configz"
"k8s.io/kubernetes/pkg/util/flushwriter"
"k8s.io/kubernetes/pkg/util/httpstream"
"k8s.io/kubernetes/pkg/util/httpstream/spdy"
"k8s.io/kubernetes/pkg/util/limitwriter"
utilruntime "k8s.io/kubernetes/pkg/util/runtime"
"k8s.io/kubernetes/pkg/volume"
)
// Server is a http.Handler which exposes kubelet functionality over HTTP.
type Server struct {
auth AuthInterface
host HostInterface
restfulCont containerInterface
resourceAnalyzer stats.ResourceAnalyzer
runtime kubecontainer.Runtime
}
type TLSOptions struct {
Config *tls.Config
CertFile string
KeyFile string
}
// containerInterface defines the restful.Container functions used on the root container
type containerInterface interface {
Add(service *restful.WebService) *restful.Container
Handle(path string, handler http.Handler)
Filter(filter restful.FilterFunction)
ServeHTTP(w http.ResponseWriter, r *http.Request)
RegisteredWebServices() []*restful.WebService
// RegisteredHandlePaths returns the paths of handlers registered directly with the container (non-web-services)
// Used to test filters are being applied on non-web-service handlers
RegisteredHandlePaths() []string
}
// filteringContainer delegates all Handle(...) calls to Container.HandleWithFilter(...),
// so we can ensure restful.FilterFunctions are used for all handlers
type filteringContainer struct {
*restful.Container
registeredHandlePaths []string
}
func (a *filteringContainer) Handle(path string, handler http.Handler) {
a.HandleWithFilter(path, handler)
a.registeredHandlePaths = append(a.registeredHandlePaths, path)
}
func (a *filteringContainer) RegisteredHandlePaths() []string {
return a.registeredHandlePaths
}
// ListenAndServeKubeletServer initializes a server to respond to HTTP network requests on the Kubelet.
func ListenAndServeKubeletServer(
host HostInterface,
resourceAnalyzer stats.ResourceAnalyzer,
address net.IP,
port uint,
tlsOptions *TLSOptions,
auth AuthInterface,
enableDebuggingHandlers bool,
runtime kubecontainer.Runtime) {
glog.Infof("Starting to listen on %s:%d", address, port)
handler := NewServer(host, resourceAnalyzer, auth, enableDebuggingHandlers, runtime)
s := &http.Server{
Addr: net.JoinHostPort(address.String(), strconv.FormatUint(uint64(port), 10)),
Handler: &handler,
MaxHeaderBytes: 1 << 20,
}
if tlsOptions != nil {
s.TLSConfig = tlsOptions.Config
glog.Fatal(s.ListenAndServeTLS(tlsOptions.CertFile, tlsOptions.KeyFile))
} else {
glog.Fatal(s.ListenAndServe())
}
}
// ListenAndServeKubeletReadOnlyServer initializes a server to respond to HTTP network requests on the Kubelet.
func ListenAndServeKubeletReadOnlyServer(host HostInterface, resourceAnalyzer stats.ResourceAnalyzer, address net.IP, port uint, runtime kubecontainer.Runtime) {
glog.V(1).Infof("Starting to listen read-only on %s:%d", address, port)
s := NewServer(host, resourceAnalyzer, nil, false, runtime)
server := &http.Server{
Addr: net.JoinHostPort(address.String(), strconv.FormatUint(uint64(port), 10)),
Handler: &s,
MaxHeaderBytes: 1 << 20,
}
glog.Fatal(server.ListenAndServe())
}
// AuthInterface contains all methods required by the auth filters
type AuthInterface interface {
authenticator.Request
authorizer.RequestAttributesGetter
authorizer.Authorizer
}
// HostInterface contains all the kubelet methods required by the server.
// For testablitiy.
type HostInterface interface {
GetContainerInfo(podFullName string, uid types.UID, containerName string, req *cadvisorapi.ContainerInfoRequest) (*cadvisorapi.ContainerInfo, error)
GetContainerInfoV2(name string, options cadvisorapiv2.RequestOptions) (map[string]cadvisorapiv2.ContainerInfo, error)
GetRawContainerInfo(containerName string, req *cadvisorapi.ContainerInfoRequest, subcontainers bool) (map[string]*cadvisorapi.ContainerInfo, error)
GetCachedMachineInfo() (*cadvisorapi.MachineInfo, error)
GetPods() []*api.Pod
GetRunningPods() ([]*api.Pod, error)
GetPodByName(namespace, name string) (*api.Pod, bool)
RunInContainer(name string, uid types.UID, container string, cmd []string) ([]byte, error)
ExecInContainer(name string, uid types.UID, container string, cmd []string, in io.Reader, out, err io.WriteCloser, tty bool) error
AttachContainer(name string, uid types.UID, container string, in io.Reader, out, err io.WriteCloser, tty bool) error
GetKubeletContainerLogs(podFullName, containerName string, logOptions *api.PodLogOptions, stdout, stderr io.Writer) error
ServeLogs(w http.ResponseWriter, req *http.Request)
PortForward(name string, uid types.UID, port uint16, stream io.ReadWriteCloser) error
StreamingConnectionIdleTimeout() time.Duration
ResyncInterval() time.Duration
GetHostname() string
GetNode() (*api.Node, error)
GetNodeConfig() cm.NodeConfig
LatestLoopEntryTime() time.Time
ImagesFsInfo() (cadvisorapiv2.FsInfo, error)
RootFsInfo() (cadvisorapiv2.FsInfo, error)
ListVolumesForPod(podUID types.UID) (map[string]volume.Volume, bool)
PLEGHealthCheck() (bool, error)
}
// NewServer initializes and configures a kubelet.Server object to handle HTTP requests.
func NewServer(
host HostInterface,
resourceAnalyzer stats.ResourceAnalyzer,
auth AuthInterface,
enableDebuggingHandlers bool,
runtime kubecontainer.Runtime) Server {
server := Server{
host: host,
resourceAnalyzer: resourceAnalyzer,
auth: auth,
restfulCont: &filteringContainer{Container: restful.NewContainer()},
runtime: runtime,
}
if auth != nil {
server.InstallAuthFilter()
}
server.InstallDefaultHandlers()
if enableDebuggingHandlers {
server.InstallDebuggingHandlers()
}
return server
}
// InstallAuthFilter installs authentication filters with the restful Container.
func (s *Server) InstallAuthFilter() {
s.restfulCont.Filter(func(req *restful.Request, resp *restful.Response, chain *restful.FilterChain) {
// Authenticate
u, ok, err := s.auth.AuthenticateRequest(req.Request)
if err != nil {
glog.Errorf("Unable to authenticate the request due to an error: %v", err)
resp.WriteErrorString(http.StatusUnauthorized, "Unauthorized")
return
}
if !ok {
resp.WriteErrorString(http.StatusUnauthorized, "Unauthorized")
return
}
// Get authorization attributes
attrs := s.auth.GetRequestAttributes(u, req.Request)
// Authorize
if err := s.auth.Authorize(attrs); err != nil {
msg := fmt.Sprintf("Forbidden (user=%s, verb=%s, namespace=%s, resource=%s)", u.GetName(), attrs.GetVerb(), attrs.GetNamespace(), attrs.GetResource())
glog.V(2).Info(msg)
resp.WriteErrorString(http.StatusForbidden, msg)
return
}
// Continue
chain.ProcessFilter(req, resp)
})
}
// InstallDefaultHandlers registers the default set of supported HTTP request
// patterns with the restful Container.
func (s *Server) InstallDefaultHandlers() {
healthz.InstallHandler(s.restfulCont,
healthz.PingHealthz,
healthz.NamedCheck("syncloop", s.syncLoopHealthCheck),
healthz.NamedCheck("pleg", s.plegHealthCheck),
)
var ws *restful.WebService
ws = new(restful.WebService)
ws.
Path("/pods").
Produces(restful.MIME_JSON)
ws.Route(ws.GET("").
To(s.getPods).
Operation("getPods"))
s.restfulCont.Add(ws)
s.restfulCont.Add(stats.CreateHandlers(s.host, s.resourceAnalyzer))
s.restfulCont.Handle("/metrics", prometheus.Handler())
ws = new(restful.WebService)
ws.
Path("/spec/").
Produces(restful.MIME_JSON)
ws.Route(ws.GET("").
To(s.getSpec).
Operation("getSpec").
Writes(cadvisorapi.MachineInfo{}))
s.restfulCont.Add(ws)
}
const pprofBasePath = "/debug/pprof/"
// InstallDeguggingHandlers registers the HTTP request patterns that serve logs or run commands/containers
func (s *Server) InstallDebuggingHandlers() {
var ws *restful.WebService
ws = new(restful.WebService)
ws.
Path("/run")
ws.Route(ws.POST("/{podNamespace}/{podID}/{containerName}").
To(s.getRun).
Operation("getRun"))
ws.Route(ws.POST("/{podNamespace}/{podID}/{uid}/{containerName}").
To(s.getRun).
Operation("getRun"))
s.restfulCont.Add(ws)
ws = new(restful.WebService)
ws.
Path("/exec")
ws.Route(ws.GET("/{podNamespace}/{podID}/{containerName}").
To(s.getExec).
Operation("getExec"))
ws.Route(ws.POST("/{podNamespace}/{podID}/{containerName}").
To(s.getExec).
Operation("getExec"))
ws.Route(ws.GET("/{podNamespace}/{podID}/{uid}/{containerName}").
To(s.getExec).
Operation("getExec"))
ws.Route(ws.POST("/{podNamespace}/{podID}/{uid}/{containerName}").
To(s.getExec).
Operation("getExec"))
s.restfulCont.Add(ws)
ws = new(restful.WebService)
ws.
Path("/attach")
ws.Route(ws.GET("/{podNamespace}/{podID}/{containerName}").
To(s.getAttach).
Operation("getAttach"))
ws.Route(ws.POST("/{podNamespace}/{podID}/{containerName}").
To(s.getAttach).
Operation("getAttach"))
ws.Route(ws.GET("/{podNamespace}/{podID}/{uid}/{containerName}").
To(s.getAttach).
Operation("getAttach"))
ws.Route(ws.POST("/{podNamespace}/{podID}/{uid}/{containerName}").
To(s.getAttach).
Operation("getAttach"))
s.restfulCont.Add(ws)
ws = new(restful.WebService)
ws.
Path("/portForward")
ws.Route(ws.POST("/{podNamespace}/{podID}").
To(s.getPortForward).
Operation("getPortForward"))
ws.Route(ws.POST("/{podNamespace}/{podID}/{uid}").
To(s.getPortForward).
Operation("getPortForward"))
s.restfulCont.Add(ws)
ws = new(restful.WebService)
ws.
Path("/logs/")
ws.Route(ws.GET("").
To(s.getLogs).
Operation("getLogs"))
ws.Route(ws.GET("/{logpath:*}").
To(s.getLogs).
Operation("getLogs"))
s.restfulCont.Add(ws)
ws = new(restful.WebService)
ws.
Path("/containerLogs")
ws.Route(ws.GET("/{podNamespace}/{podID}/{containerName}").
To(s.getContainerLogs).
Operation("getContainerLogs"))
s.restfulCont.Add(ws)
configz.InstallHandler(s.restfulCont)
handlePprofEndpoint := func(req *restful.Request, resp *restful.Response) {
name := strings.TrimPrefix(req.Request.URL.Path, pprofBasePath)
switch name {
case "profile":
pprof.Profile(resp, req.Request)
case "symbol":
pprof.Symbol(resp, req.Request)
case "cmdline":
pprof.Cmdline(resp, req.Request)
default:
pprof.Index(resp, req.Request)
}
}
// Setup pporf handlers.
ws = new(restful.WebService).Path(pprofBasePath)
ws.Route(ws.GET("/{subpath:*}").To(func(req *restful.Request, resp *restful.Response) {
handlePprofEndpoint(req, resp)
})).Doc("pprof endpoint")
s.restfulCont.Add(ws)
// The /runningpods endpoint is used for testing only.
ws = new(restful.WebService)
ws.
Path("/runningpods/").
Produces(restful.MIME_JSON)
ws.Route(ws.GET("").
To(s.getRunningPods).
Operation("getRunningPods"))
s.restfulCont.Add(ws)
}
type httpHandler struct {
f func(w http.ResponseWriter, r *http.Request)
}
func (h *httpHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
h.f(w, r)
}
// Checks if kubelet's sync loop that updates containers is working.
func (s *Server) syncLoopHealthCheck(req *http.Request) error {
duration := s.host.ResyncInterval() * 2
minDuration := time.Minute * 5
if duration < minDuration {
duration = minDuration
}
enterLoopTime := s.host.LatestLoopEntryTime()
if !enterLoopTime.IsZero() && time.Now().After(enterLoopTime.Add(duration)) {
return fmt.Errorf("Sync Loop took longer than expected.")
}
return nil
}
// Checks if pleg, which lists pods periodically, is healthy.
func (s *Server) plegHealthCheck(req *http.Request) error {
if ok, err := s.host.PLEGHealthCheck(); !ok {
return fmt.Errorf("PLEG took longer than expected: %v", err)
}
return nil
}
// getContainerLogs handles containerLogs request against the Kubelet
func (s *Server) getContainerLogs(request *restful.Request, response *restful.Response) {
podNamespace := request.PathParameter("podNamespace")
podID := request.PathParameter("podID")
containerName := request.PathParameter("containerName")
if len(podID) == 0 {
// TODO: Why return JSON when the rest return plaintext errors?
// TODO: Why return plaintext errors?
response.WriteError(http.StatusBadRequest, fmt.Errorf(`{"message": "Missing podID."}`))
return
}
if len(containerName) == 0 {
// TODO: Why return JSON when the rest return plaintext errors?
response.WriteError(http.StatusBadRequest, fmt.Errorf(`{"message": "Missing container name."}`))
return
}
if len(podNamespace) == 0 {
// TODO: Why return JSON when the rest return plaintext errors?
response.WriteError(http.StatusBadRequest, fmt.Errorf(`{"message": "Missing podNamespace."}`))
return
}
query := request.Request.URL.Query()
// backwards compatibility for the "tail" query parameter
if tail := request.QueryParameter("tail"); len(tail) > 0 {
query["tailLines"] = []string{tail}
// "all" is the same as omitting tail
if tail == "all" {
delete(query, "tailLines")
}
}
// container logs on the kubelet are locked to the v1 API version of PodLogOptions
logOptions := &api.PodLogOptions{}
if err := api.ParameterCodec.DecodeParameters(query, v1.SchemeGroupVersion, logOptions); err != nil {
response.WriteError(http.StatusBadRequest, fmt.Errorf(`{"message": "Unable to decode query."}`))
return
}
logOptions.TypeMeta = unversioned.TypeMeta{}
if errs := validation.ValidatePodLogOptions(logOptions); len(errs) > 0 {
response.WriteError(apierrs.StatusUnprocessableEntity, fmt.Errorf(`{"message": "Invalid request."}`))
return
}
pod, ok := s.host.GetPodByName(podNamespace, podID)
if !ok {
response.WriteError(http.StatusNotFound, fmt.Errorf("pod %q does not exist\n", podID))
return
}
// Check if containerName is valid.
containerExists := false
for _, container := range pod.Spec.Containers {
if container.Name == containerName {
containerExists = true
}
}
if !containerExists {
for _, container := range pod.Spec.InitContainers {
if container.Name == containerName {
containerExists = true
}
}
}
if !containerExists {
response.WriteError(http.StatusNotFound, fmt.Errorf("container %q not found in pod %q\n", containerName, podID))
return
}
if _, ok := response.ResponseWriter.(http.Flusher); !ok {
response.WriteError(http.StatusInternalServerError, fmt.Errorf("unable to convert %v into http.Flusher, cannot show logs\n", reflect.TypeOf(response)))
return
}
fw := flushwriter.Wrap(response.ResponseWriter)
if logOptions.LimitBytes != nil {
fw = limitwriter.New(fw, *logOptions.LimitBytes)
}
response.Header().Set("Transfer-Encoding", "chunked")
if err := s.host.GetKubeletContainerLogs(kubecontainer.GetPodFullName(pod), containerName, logOptions, fw, fw); err != nil {
if err != limitwriter.ErrMaximumWrite {
response.WriteError(http.StatusBadRequest, err)
}
return
}
}
// encodePods creates an api.PodList object from pods and returns the encoded
// PodList.
func encodePods(pods []*api.Pod) (data []byte, err error) {
podList := new(api.PodList)
for _, pod := range pods {
podList.Items = append(podList.Items, *pod)
}
// TODO: this needs to be parameterized to the kubelet, not hardcoded. Depends on Kubelet
// as API server refactor.
// TODO: Locked to v1, needs to be made generic
codec := api.Codecs.LegacyCodec(unversioned.GroupVersion{Group: api.GroupName, Version: "v1"})
return runtime.Encode(codec, podList)
}
// getPods returns a list of pods bound to the Kubelet and their spec.
func (s *Server) getPods(request *restful.Request, response *restful.Response) {
pods := s.host.GetPods()
data, err := encodePods(pods)
if err != nil {
response.WriteError(http.StatusInternalServerError, err)
return
}
writeJsonResponse(response, data)
}
// getRunningPods returns a list of pods running on Kubelet. The list is
// provided by the container runtime, and is different from the list returned
// by getPods, which is a set of desired pods to run.
func (s *Server) getRunningPods(request *restful.Request, response *restful.Response) {
pods, err := s.host.GetRunningPods()
if err != nil {
response.WriteError(http.StatusInternalServerError, err)
return
}
data, err := encodePods(pods)
if err != nil {
response.WriteError(http.StatusInternalServerError, err)
return
}
writeJsonResponse(response, data)
}
// getLogs handles logs requests against the Kubelet.
func (s *Server) getLogs(request *restful.Request, response *restful.Response) {
s.host.ServeLogs(response, request.Request)
}
// getSpec handles spec requests against the Kubelet.
func (s *Server) getSpec(request *restful.Request, response *restful.Response) {
info, err := s.host.GetCachedMachineInfo()
if err != nil {
response.WriteError(http.StatusInternalServerError, err)
return
}
response.WriteEntity(info)
}
func getContainerCoordinates(request *restful.Request) (namespace, pod string, uid types.UID, container string) {
namespace = request.PathParameter("podNamespace")
pod = request.PathParameter("podID")
if uidStr := request.PathParameter("uid"); uidStr != "" {
uid = types.UID(uidStr)
}
container = request.PathParameter("containerName")
return
}
// getAttach handles requests to attach to a container.
func (s *Server) getAttach(request *restful.Request, response *restful.Response) {
podNamespace, podID, uid, container := getContainerCoordinates(request)
pod, ok := s.host.GetPodByName(podNamespace, podID)
if !ok {
response.WriteError(http.StatusNotFound, fmt.Errorf("pod does not exist"))
return
}
remotecommand.ServeAttach(response.ResponseWriter,
request.Request,
s.host,
kubecontainer.GetPodFullName(pod),
uid,
container,
s.host.StreamingConnectionIdleTimeout(),
remotecommand.DefaultStreamCreationTimeout,
remotecommand.SupportedStreamingProtocols)
}
// getExec handles requests to run a command inside a container.
func (s *Server) getExec(request *restful.Request, response *restful.Response) {
podNamespace, podID, uid, container := getContainerCoordinates(request)
pod, ok := s.host.GetPodByName(podNamespace, podID)
if !ok {
response.WriteError(http.StatusNotFound, fmt.Errorf("pod does not exist"))
return
}
remotecommand.ServeExec(response.ResponseWriter,
request.Request,
s.host,
kubecontainer.GetPodFullName(pod),
uid,
container,
s.host.StreamingConnectionIdleTimeout(),
remotecommand.DefaultStreamCreationTimeout,
remotecommand.SupportedStreamingProtocols)
}
// getRun handles requests to run a command inside a container.
func (s *Server) getRun(request *restful.Request, response *restful.Response) {
podNamespace, podID, uid, container := getContainerCoordinates(request)
pod, ok := s.host.GetPodByName(podNamespace, podID)
if !ok {
response.WriteError(http.StatusNotFound, fmt.Errorf("pod does not exist"))
return
}
command := strings.Split(request.QueryParameter("cmd"), " ")
data, err := s.host.RunInContainer(kubecontainer.GetPodFullName(pod), uid, container, command)
if err != nil {
response.WriteError(http.StatusInternalServerError, err)
return
}
writeJsonResponse(response, data)
}
func getPodCoordinates(request *restful.Request) (namespace, pod string, uid types.UID) {
namespace = request.PathParameter("podNamespace")
pod = request.PathParameter("podID")
if uidStr := request.PathParameter("uid"); uidStr != "" {
uid = types.UID(uidStr)
}
return
}
// Derived from go-restful writeJSON.
func writeJsonResponse(response *restful.Response, data []byte) {
if data == nil {
response.WriteHeader(http.StatusOK)
// do not write a nil representation
return
}
response.Header().Set(restful.HEADER_ContentType, restful.MIME_JSON)
response.WriteHeader(http.StatusOK)
if _, err := response.Write(data); err != nil {
glog.Errorf("Error writing response: %v", err)
}
}
// PortForwarder knows how to forward content from a data stream to/from a port
// in a pod.
type PortForwarder interface {
// PortForwarder copies data between a data stream and a port in a pod.
PortForward(name string, uid types.UID, port uint16, stream io.ReadWriteCloser) error
}
// getPortForward handles a new restful port forward request. It determines the
// pod name and uid and then calls ServePortForward.
func (s *Server) getPortForward(request *restful.Request, response *restful.Response) {
podNamespace, podID, uid := getPodCoordinates(request)
pod, ok := s.host.GetPodByName(podNamespace, podID)
if !ok {
response.WriteError(http.StatusNotFound, fmt.Errorf("pod does not exist"))
return
}
podName := kubecontainer.GetPodFullName(pod)
ServePortForward(response.ResponseWriter, request.Request, s.host, podName, uid, s.host.StreamingConnectionIdleTimeout(), remotecommand.DefaultStreamCreationTimeout)
}
// ServePortForward handles a port forwarding request. A single request is
// kept alive as long as the client is still alive and the connection has not
// been timed out due to idleness. This function handles multiple forwarded
// connections; i.e., multiple `curl http://localhost:8888/` requests will be
// handled by a single invocation of ServePortForward.
func ServePortForward(w http.ResponseWriter, req *http.Request, portForwarder PortForwarder, podName string, uid types.UID, idleTimeout time.Duration, streamCreationTimeout time.Duration) {
supportedPortForwardProtocols := []string{portforward.PortForwardProtocolV1Name}
_, err := httpstream.Handshake(req, w, supportedPortForwardProtocols)
// negotiated protocol isn't currently used server side, but could be in the future
if err != nil {
// Handshake writes the error to the client
utilruntime.HandleError(err)
return
}
streamChan := make(chan httpstream.Stream, 1)
glog.V(5).Infof("Upgrading port forward response")
upgrader := spdy.NewResponseUpgrader()
conn := upgrader.UpgradeResponse(w, req, portForwardStreamReceived(streamChan))
if conn == nil {
return
}
defer conn.Close()
glog.V(5).Infof("(conn=%p) setting port forwarding streaming connection idle timeout to %v", conn, idleTimeout)
conn.SetIdleTimeout(idleTimeout)
h := &portForwardStreamHandler{
conn: conn,
streamChan: streamChan,
streamPairs: make(map[string]*portForwardStreamPair),
streamCreationTimeout: streamCreationTimeout,
pod: podName,
uid: uid,
forwarder: portForwarder,
}
h.run()
}
// portForwardStreamReceived is the httpstream.NewStreamHandler for port
// forward streams. It checks each stream's port and stream type headers,
// rejecting any streams that with missing or invalid values. Each valid
// stream is sent to the streams channel.
func portForwardStreamReceived(streams chan httpstream.Stream) func(httpstream.Stream, <-chan struct{}) error {
return func(stream httpstream.Stream, replySent <-chan struct{}) error {
// make sure it has a valid port header
portString := stream.Headers().Get(api.PortHeader)
if len(portString) == 0 {
return fmt.Errorf("%q header is required", api.PortHeader)
}
port, err := strconv.ParseUint(portString, 10, 16)
if err != nil {
return fmt.Errorf("unable to parse %q as a port: %v", portString, err)
}
if port < 1 {
return fmt.Errorf("port %q must be > 0", portString)
}
// make sure it has a valid stream type header
streamType := stream.Headers().Get(api.StreamType)
if len(streamType) == 0 {
return fmt.Errorf("%q header is required", api.StreamType)
}
if streamType != api.StreamTypeError && streamType != api.StreamTypeData {
return fmt.Errorf("invalid stream type %q", streamType)
}
streams <- stream
return nil
}
}
// portForwardStreamHandler is capable of processing multiple port forward
// requests over a single httpstream.Connection.
type portForwardStreamHandler struct {
conn httpstream.Connection
streamChan chan httpstream.Stream
streamPairsLock sync.RWMutex
streamPairs map[string]*portForwardStreamPair
streamCreationTimeout time.Duration
pod string
uid types.UID
forwarder PortForwarder
}
// getStreamPair returns a portForwardStreamPair for requestID. This creates a
// new pair if one does not yet exist for the requestID. The returned bool is
// true if the pair was created.
func (h *portForwardStreamHandler) getStreamPair(requestID string) (*portForwardStreamPair, bool) {
h.streamPairsLock.Lock()
defer h.streamPairsLock.Unlock()
if p, ok := h.streamPairs[requestID]; ok {
glog.V(5).Infof("(conn=%p, request=%s) found existing stream pair", h.conn, requestID)
return p, false
}
glog.V(5).Infof("(conn=%p, request=%s) creating new stream pair", h.conn, requestID)
p := newPortForwardPair(requestID)
h.streamPairs[requestID] = p
return p, true
}
// monitorStreamPair waits for the pair to receive both its error and data
// streams, or for the timeout to expire (whichever happens first), and then
// removes the pair.
func (h *portForwardStreamHandler) monitorStreamPair(p *portForwardStreamPair, timeout <-chan time.Time) {
select {
case <-timeout:
err := fmt.Errorf("(conn=%v, request=%s) timed out waiting for streams", h.conn, p.requestID)
utilruntime.HandleError(err)
p.printError(err.Error())
case <-p.complete:
glog.V(5).Infof("(conn=%v, request=%s) successfully received error and data streams", h.conn, p.requestID)
}
h.removeStreamPair(p.requestID)
}
// hasStreamPair returns a bool indicating if a stream pair for requestID
// exists.
func (h *portForwardStreamHandler) hasStreamPair(requestID string) bool {
h.streamPairsLock.RLock()
defer h.streamPairsLock.RUnlock()
_, ok := h.streamPairs[requestID]
return ok
}
// removeStreamPair removes the stream pair identified by requestID from streamPairs.
func (h *portForwardStreamHandler) removeStreamPair(requestID string) {
h.streamPairsLock.Lock()
defer h.streamPairsLock.Unlock()
delete(h.streamPairs, requestID)
}
// requestID returns the request id for stream.
func (h *portForwardStreamHandler) requestID(stream httpstream.Stream) string {
requestID := stream.Headers().Get(api.PortForwardRequestIDHeader)
if len(requestID) == 0 {
glog.V(5).Infof("(conn=%p) stream received without %s header", h.conn, api.PortForwardRequestIDHeader)
// If we get here, it's because the connection came from an older client
// that isn't generating the request id header
// (https://github.com/kubernetes/kubernetes/blob/843134885e7e0b360eb5441e85b1410a8b1a7a0c/pkg/client/unversioned/portforward/portforward.go#L258-L287)
//
// This is a best-effort attempt at supporting older clients.
//
// When there aren't concurrent new forwarded connections, each connection
// will have a pair of streams (data, error), and the stream IDs will be
// consecutive odd numbers, e.g. 1 and 3 for the first connection. Convert
// the stream ID into a pseudo-request id by taking the stream type and
// using id = stream.Identifier() when the stream type is error,
// and id = stream.Identifier() - 2 when it's data.
//
// NOTE: this only works when there are not concurrent new streams from
// multiple forwarded connections; it's a best-effort attempt at supporting
// old clients that don't generate request ids. If there are concurrent
// new connections, it's possible that 1 connection gets streams whose IDs
// are not consecutive (e.g. 5 and 9 instead of 5 and 7).
streamType := stream.Headers().Get(api.StreamType)
switch streamType {
case api.StreamTypeError:
requestID = strconv.Itoa(int(stream.Identifier()))
case api.StreamTypeData:
requestID = strconv.Itoa(int(stream.Identifier()) - 2)
}
glog.V(5).Infof("(conn=%p) automatically assigning request ID=%q from stream type=%s, stream ID=%d", h.conn, requestID, streamType, stream.Identifier())
}
return requestID
}
// run is the main loop for the portForwardStreamHandler. It processes new
// streams, invoking portForward for each complete stream pair. The loop exits
// when the httpstream.Connection is closed.
func (h *portForwardStreamHandler) run() {
glog.V(5).Infof("(conn=%p) waiting for port forward streams", h.conn)
Loop:
for {
select {
case <-h.conn.CloseChan():
glog.V(5).Infof("(conn=%p) upgraded connection closed", h.conn)
break Loop
case stream := <-h.streamChan:
requestID := h.requestID(stream)
streamType := stream.Headers().Get(api.StreamType)
glog.V(5).Infof("(conn=%p, request=%s) received new stream of type %s", h.conn, requestID, streamType)
p, created := h.getStreamPair(requestID)
if created {
go h.monitorStreamPair(p, time.After(h.streamCreationTimeout))
}
if complete, err := p.add(stream); err != nil {
msg := fmt.Sprintf("error processing stream for request %s: %v", requestID, err)
utilruntime.HandleError(errors.New(msg))
p.printError(msg)
} else if complete {
go h.portForward(p)
}
}
}
}
// portForward invokes the portForwardStreamHandler's forwarder.PortForward
// function for the given stream pair.
func (h *portForwardStreamHandler) portForward(p *portForwardStreamPair) {
defer p.dataStream.Close()
defer p.errorStream.Close()
portString := p.dataStream.Headers().Get(api.PortHeader)
port, _ := strconv.ParseUint(portString, 10, 16)
glog.V(5).Infof("(conn=%p, request=%s) invoking forwarder.PortForward for port %s", h.conn, p.requestID, portString)
err := h.forwarder.PortForward(h.pod, h.uid, uint16(port), p.dataStream)
glog.V(5).Infof("(conn=%p, request=%s) done invoking forwarder.PortForward for port %s", h.conn, p.requestID, portString)
if err != nil {
msg := fmt.Errorf("error forwarding port %d to pod %s, uid %v: %v", port, h.pod, h.uid, err)
utilruntime.HandleError(msg)
fmt.Fprint(p.errorStream, msg.Error())
}
}
// portForwardStreamPair represents the error and data streams for a port
// forwarding request.
type portForwardStreamPair struct {
lock sync.RWMutex
requestID string
dataStream httpstream.Stream
errorStream httpstream.Stream
complete chan struct{}
}
// newPortForwardPair creates a new portForwardStreamPair.
func newPortForwardPair(requestID string) *portForwardStreamPair {
return &portForwardStreamPair{
requestID: requestID,
complete: make(chan struct{}),
}
}
// add adds the stream to the portForwardStreamPair. If the pair already
// contains a stream for the new stream's type, an error is returned. add
// returns true if both the data and error streams for this pair have been
// received.
func (p *portForwardStreamPair) add(stream httpstream.Stream) (bool, error) {
p.lock.Lock()
defer p.lock.Unlock()
switch stream.Headers().Get(api.StreamType) {
case api.StreamTypeError:
if p.errorStream != nil {
return false, errors.New("error stream already assigned")
}
p.errorStream = stream
case api.StreamTypeData:
if p.dataStream != nil {
return false, errors.New("data stream already assigned")
}
p.dataStream = stream
}
complete := p.errorStream != nil && p.dataStream != nil
if complete {
close(p.complete)
}
return complete, nil
}
// printError writes s to p.errorStream if p.errorStream has been set.
func (p *portForwardStreamPair) printError(s string) {
p.lock.RLock()
defer p.lock.RUnlock()
if p.errorStream != nil {
fmt.Fprint(p.errorStream, s)
}
}
// ServeHTTP responds to HTTP requests on the Kubelet.
func (s *Server) ServeHTTP(w http.ResponseWriter, req *http.Request) {
defer httplog.NewLogged(req, &w).StacktraceWhen(
httplog.StatusIsNot(
http.StatusOK,
http.StatusMovedPermanently,
http.StatusTemporaryRedirect,
http.StatusNotFound,
http.StatusSwitchingProtocols,
),
).Log()
s.restfulCont.ServeHTTP(w, req)
}
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
Go
1
https://gitee.com/meoom/kubernetes.git
git@gitee.com:meoom/kubernetes.git
meoom
kubernetes
kubernetes
v1.3.10

搜索帮助

344bd9b3 5694891 D2dac590 5694891