1 Star 1 Fork 0

hh / etcd

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
server.go 11.86 KB
一键复制 编辑 原始数据 按行查看 历史
package server
import (
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"net/http/pprof"
"path"
"strings"
"time"
"github.com/coreos/etcd/third_party/github.com/goraft/raft"
"github.com/coreos/etcd/third_party/github.com/gorilla/mux"
etcdErr "github.com/coreos/etcd/error"
ehttp "github.com/coreos/etcd/http"
"github.com/coreos/etcd/log"
"github.com/coreos/etcd/metrics"
"github.com/coreos/etcd/mod"
uhttp "github.com/coreos/etcd/pkg/http"
"github.com/coreos/etcd/server/v1"
"github.com/coreos/etcd/server/v2"
"github.com/coreos/etcd/store"
_ "github.com/coreos/etcd/store/v2"
)
// This is the default implementation of the Server interface.
type Server struct {
Name string
url string
handler http.Handler
peerServer *PeerServer
registry *Registry
store store.Store
metrics *metrics.Bucket
trace bool
}
// Creates a new Server.
func New(name, url string, peerServer *PeerServer, registry *Registry, store store.Store, mb *metrics.Bucket) *Server {
s := &Server{
Name: name,
url: url,
store: store,
registry: registry,
peerServer: peerServer,
metrics: mb,
}
return s
}
func (s *Server) EnableTracing() {
s.trace = true
}
// The current state of the server in the cluster.
func (s *Server) State() string {
return s.peerServer.RaftServer().State()
}
// The node name of the leader in the cluster.
func (s *Server) Leader() string {
return s.peerServer.RaftServer().Leader()
}
// The current Raft committed index.
func (s *Server) CommitIndex() uint64 {
return s.peerServer.RaftServer().CommitIndex()
}
// The current Raft term.
func (s *Server) Term() uint64 {
return s.peerServer.RaftServer().Term()
}
// The server URL.
func (s *Server) URL() string {
return s.url
}
// PeerHost retrieves the host part of Peer URL for a given node name.
func (s *Server) PeerHost(name string) (string, bool) {
return s.registry.PeerHost(name)
}
// Retrives the Peer URL for a given node name.
func (s *Server) PeerURL(name string) (string, bool) {
return s.registry.PeerURL(name)
}
// ClientURL retrieves the Client URL for a given node name.
func (s *Server) ClientURL(name string) (string, bool) {
return s.registry.ClientURL(name)
}
// Returns a reference to the Store.
func (s *Server) Store() store.Store {
return s.store
}
func (s *Server) SetRegistry(registry *Registry) {
s.registry = registry
}
func (s *Server) SetStore(store store.Store) {
s.store = store
}
func (s *Server) installV1(r *mux.Router) {
s.handleFuncV1(r, "/v1/keys/{key:.*}", v1.GetKeyHandler).Methods("GET", "HEAD")
s.handleFuncV1(r, "/v1/keys/{key:.*}", v1.SetKeyHandler).Methods("POST", "PUT")
s.handleFuncV1(r, "/v1/keys/{key:.*}", v1.DeleteKeyHandler).Methods("DELETE")
s.handleFuncV1(r, "/v1/watch/{key:.*}", v1.WatchKeyHandler).Methods("GET", "HEAD", "POST")
s.handleFunc(r, "/v1/leader", s.GetLeaderHandler).Methods("GET", "HEAD")
s.handleFunc(r, "/v1/machines", s.GetPeersHandler).Methods("GET", "HEAD")
s.handleFunc(r, "/v1/peers", s.GetPeersHandler).Methods("GET", "HEAD")
s.handleFunc(r, "/v1/stats/self", s.GetStatsHandler).Methods("GET", "HEAD")
s.handleFunc(r, "/v1/stats/leader", s.GetLeaderStatsHandler).Methods("GET", "HEAD")
s.handleFunc(r, "/v1/stats/store", s.GetStoreStatsHandler).Methods("GET", "HEAD")
}
func (s *Server) installV2(r *mux.Router) {
r2 := mux.NewRouter()
r.PathPrefix("/v2").Handler(ehttp.NewLowerQueryParamsHandler(r2))
s.handleFuncV2(r2, "/v2/keys/{key:.*}", v2.GetHandler).Methods("GET", "HEAD")
s.handleFuncV2(r2, "/v2/keys/{key:.*}", v2.PostHandler).Methods("POST")
s.handleFuncV2(r2, "/v2/keys/{key:.*}", v2.PutHandler).Methods("PUT")
s.handleFuncV2(r2, "/v2/keys/{key:.*}", v2.DeleteHandler).Methods("DELETE")
s.handleFunc(r2, "/v2/leader", s.GetLeaderHandler).Methods("GET", "HEAD")
s.handleFunc(r2, "/v2/machines", s.GetPeersHandler).Methods("GET", "HEAD")
s.handleFunc(r2, "/v2/peers", s.GetPeersHandler).Methods("GET", "HEAD")
s.handleFunc(r2, "/v2/stats/self", s.GetStatsHandler).Methods("GET", "HEAD")
s.handleFunc(r2, "/v2/stats/leader", s.GetLeaderStatsHandler).Methods("GET", "HEAD")
s.handleFunc(r2, "/v2/stats/store", s.GetStoreStatsHandler).Methods("GET", "HEAD")
s.handleFunc(r2, "/v2/speedTest", s.SpeedTestHandler).Methods("GET", "HEAD")
s.handleFunc(r2, "/v2/migration/snapshot", s.SnapshotHandler).Methods("GET")
}
func (s *Server) installMod(r *mux.Router) {
r.PathPrefix("/mod").Handler(http.StripPrefix("/mod", mod.HttpHandler(s.URL())))
}
func (s *Server) installDebug(r *mux.Router) {
s.handleFunc(r, "/debug/metrics", s.GetMetricsHandler).Methods("GET", "HEAD")
r.HandleFunc("/debug/pprof", pprof.Index)
r.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline)
r.HandleFunc("/debug/pprof/profile", pprof.Profile)
r.HandleFunc("/debug/pprof/symbol", pprof.Symbol)
r.HandleFunc("/debug/pprof/{name}", pprof.Index)
}
// Adds a v1 server handler to the router.
func (s *Server) handleFuncV1(r *mux.Router, path string, f func(http.ResponseWriter, *http.Request, v1.Server) error) *mux.Route {
return s.handleFunc(r, path, func(w http.ResponseWriter, req *http.Request) error {
return f(w, req, s)
})
}
// Adds a v2 server handler to the router.
func (s *Server) handleFuncV2(r *mux.Router, path string, f func(http.ResponseWriter, *http.Request, v2.Server) error) *mux.Route {
return s.handleFunc(r, path, func(w http.ResponseWriter, req *http.Request) error {
return f(w, req, s)
})
}
type HEADResponseWriter struct {
http.ResponseWriter
}
func (w *HEADResponseWriter) Write([]byte) (int, error) {
return 0, nil
}
// Adds a server handler to the router.
func (s *Server) handleFunc(r *mux.Router, path string, f func(http.ResponseWriter, *http.Request) error) *mux.Route {
// Wrap the standard HandleFunc interface to pass in the server reference.
return r.HandleFunc(path, func(w http.ResponseWriter, req *http.Request) {
if req.Method == "HEAD" {
w = &HEADResponseWriter{w}
}
// Log request.
log.Debugf("[recv] %s %s %s [%s]", req.Method, s.URL(), req.URL.Path, req.RemoteAddr)
// Execute handler function and return error if necessary.
if err := f(w, req); err != nil {
if etcdErr, ok := err.(*etcdErr.Error); ok {
log.Debug("Return error: ", (*etcdErr).Error())
w.Header().Set("Content-Type", "application/json")
etcdErr.Write(w)
} else {
http.Error(w, err.Error(), http.StatusInternalServerError)
}
}
})
}
func (s *Server) HTTPHandler() http.Handler {
router := mux.NewRouter()
// Install the routes.
s.handleFunc(router, "/version", s.GetVersionHandler).Methods("GET")
s.installV1(router)
s.installV2(router)
// Mod is deprecated temporariy due to its unstable state.
// It would be added back later.
// s.installMod(router)
if s.trace {
s.installDebug(router)
}
return router
}
// Dispatch command to the current leader
func (s *Server) Dispatch(c raft.Command, w http.ResponseWriter, req *http.Request) error {
ps := s.peerServer
if ps.raftServer.State() == raft.Leader {
result, err := ps.raftServer.Do(c)
if err != nil {
return err
}
if result == nil {
return etcdErr.NewError(300, "Empty result from raft", s.Store().Index())
}
// response for raft related commands[join/remove]
if b, ok := result.([]byte); ok {
w.WriteHeader(http.StatusOK)
w.Write(b)
return nil
}
var b []byte
if strings.HasPrefix(req.URL.Path, "/v1") {
b, _ = json.Marshal(result.(*store.Event).Response(0))
w.WriteHeader(http.StatusOK)
} else {
e, _ := result.(*store.Event)
b, _ = json.Marshal(e)
w.Header().Set("Content-Type", "application/json")
// etcd index should be the same as the event index
// which is also the last modified index of the node
w.Header().Add("X-Etcd-Index", fmt.Sprint(e.Index()))
w.Header().Add("X-Raft-Index", fmt.Sprint(s.CommitIndex()))
w.Header().Add("X-Raft-Term", fmt.Sprint(s.Term()))
if e.IsCreated() {
w.WriteHeader(http.StatusCreated)
} else {
w.WriteHeader(http.StatusOK)
}
}
w.Write(b)
return nil
}
leader := ps.raftServer.Leader()
if leader == "" {
return etcdErr.NewError(300, "", s.Store().Index())
}
var url string
switch c.(type) {
case *JoinCommand, *RemoveCommand,
*SetClusterConfigCommand:
url, _ = ps.registry.PeerURL(leader)
default:
url, _ = ps.registry.ClientURL(leader)
}
uhttp.Redirect(url, w, req)
return nil
}
// Handler to return the current version of etcd.
func (s *Server) GetVersionHandler(w http.ResponseWriter, req *http.Request) error {
w.WriteHeader(http.StatusOK)
fmt.Fprintf(w, "etcd %s", ReleaseVersion)
return nil
}
// Handler to return the current leader's raft address
func (s *Server) GetLeaderHandler(w http.ResponseWriter, req *http.Request) error {
leader := s.peerServer.RaftServer().Leader()
if leader == "" {
return etcdErr.NewError(etcdErr.EcodeLeaderElect, "", s.Store().Index())
}
w.WriteHeader(http.StatusOK)
url, _ := s.registry.PeerURL(leader)
w.Write([]byte(url))
return nil
}
// Handler to return all the known peers in the current cluster.
func (s *Server) GetPeersHandler(w http.ResponseWriter, req *http.Request) error {
peers := s.registry.ClientURLs(s.peerServer.RaftServer().Leader(), s.Name)
w.WriteHeader(http.StatusOK)
w.Write([]byte(strings.Join(peers, ", ")))
return nil
}
// Retrieves stats on the Raft server.
func (s *Server) GetStatsHandler(w http.ResponseWriter, req *http.Request) error {
w.Header().Set("Content-Type", "application/json")
w.Write(s.peerServer.Stats())
return nil
}
// Retrieves stats on the leader.
func (s *Server) GetLeaderStatsHandler(w http.ResponseWriter, req *http.Request) error {
if s.peerServer.RaftServer().State() == raft.Leader {
w.Header().Set("Content-Type", "application/json")
w.Write(s.peerServer.PeerStats())
return nil
}
w.WriteHeader(http.StatusForbidden)
w.Write([]byte("not current leader"))
return nil
}
// Retrieves stats on the leader.
func (s *Server) GetStoreStatsHandler(w http.ResponseWriter, req *http.Request) error {
w.Header().Set("Content-Type", "application/json")
w.Write(s.store.JsonStats())
return nil
}
// Executes a speed test to evaluate the performance of update replication.
func (s *Server) SpeedTestHandler(w http.ResponseWriter, req *http.Request) error {
count := 1000
c := make(chan bool, count)
for i := 0; i < count; i++ {
go func() {
for j := 0; j < 10; j++ {
c := s.Store().CommandFactory().CreateSetCommand("foo", false, "bar", time.Unix(0, 0))
s.peerServer.RaftServer().Do(c)
}
c <- true
}()
}
for i := 0; i < count; i++ {
<-c
}
w.WriteHeader(http.StatusOK)
w.Write([]byte("speed test success"))
return nil
}
// SnapshotHandler forces etcd store to do a snapshot. If the disk parameter is set, the snapshot
// will be written to disk at data-dir/index-migrate.snap. Or the snapshot will be returned as
// http body.
func (s *Server) SnapshotHandler(w http.ResponseWriter, req *http.Request) error {
data, err := s.Store().Save()
if err != nil {
http.Error(w, "failed to create snapshot: "+err.Error(), http.StatusInternalServerError)
log.Warn("Failed to create snapshot:" + err.Error())
return nil
}
disk := req.FormValue("disk")
if disk == "true" {
name := fmt.Sprintf("%d-migrate.snap", s.peerServer.RaftServer().CommitIndex())
err = ioutil.WriteFile(path.Join(s.peerServer.RaftServer().Path(), name), data, 0600)
if err != nil {
http.Error(w, "failed to save snapshot: "+err.Error(), http.StatusInternalServerError)
log.Warn("server: failed to save snapshot: " + err.Error())
return nil
}
log.Infof("server: saved snapshot file %s successfully", name)
return nil
}
if disk != "" && disk != "false" {
http.Error(w, "invalid parameter: disk="+disk, http.StatusBadRequest)
return nil
}
_, err = w.Write(data)
if err != nil {
log.Warnf("server: failed to write snapshot to %s: %v", req.RemoteAddr, err.Error())
}
return nil
}
// Retrieves metrics from bucket
func (s *Server) GetMetricsHandler(w http.ResponseWriter, req *http.Request) error {
(*s.metrics).Dump(w)
return nil
}
1
https://gitee.com/w1229748769/etcd.git
git@gitee.com:w1229748769/etcd.git
w1229748769
etcd
etcd
v0.4.9

搜索帮助