3 Star 0 Fork 0

GiteeStudio / codis

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
main.go 8.18 KB
一键复制 编辑 原始数据 按行查看 历史
江二十三 提交于 2019-11-04 14:34 . use go mod
// Copyright 2016 CodisLabs. All Rights Reserved.
// Licensed under the MIT (MIT-LICENSE.txt) license.
package main
import (
"encoding/json"
"fmt"
"io/ioutil"
"net"
"net/http"
"net/http/httputil"
"net/url"
"os"
"path/filepath"
"runtime"
"sort"
"strconv"
"sync"
"time"
"github.com/docopt/docopt-go"
"github.com/go-martini/martini"
"github.com/martini-contrib/render"
"gitee.com/oscstudio/codis/pkg/models"
"gitee.com/oscstudio/codis/pkg/utils"
"gitee.com/oscstudio/codis/pkg/utils/errors"
"gitee.com/oscstudio/codis/pkg/utils/log"
"gitee.com/oscstudio/codis/pkg/utils/rpc"
"gitee.com/oscstudio/codis/pkg/utils/sync2/atomic2"
)
// roundTripper is the HTTP transport shared by every reverse proxy created in
// ReverseProxy.reload. It is assigned once, in init.
var roundTripper http.RoundTripper
// init builds the package-wide HTTP transport and publishes it through
// roundTripper.
//
// Every outbound dial is bounded by a 10 second timeout, and each successful
// dial is counted and reported at debug level. A background goroutine, which
// runs for the lifetime of the process, closes the transport's idle
// connections once a minute.
func init() {
	var dialCount atomic2.Int64

	dial := func(network, addr string) (net.Conn, error) {
		conn, err := net.DialTimeout(network, addr, 10*time.Second)
		if err != nil {
			return conn, err
		}
		// NOTE(review): presumably Incr returns the post-increment value,
		// which would make the logged index zero-based — confirm in atomic2.
		log.Debugf("rpc: dial new connection to [%d] %s - %s",
			dialCount.Incr()-1, network, addr)
		return conn, nil
	}

	transport := &http.Transport{Dial: dial}
	roundTripper = transport

	// Periodically drop idle upstream connections.
	go func() {
		for {
			time.Sleep(time.Minute)
			transport.CloseIdleConnections()
		}
	}()
}
// main is the entry point of codis-fe. It parses the command line, configures
// logging and GOMAXPROCS, resolves the assets directory, selects a
// ConfigLoader (static file or coordinator backed), wires up the martini
// handlers and finally serves HTTP on the --listen address.
func main() {
// The usage text is also the docopt grammar that the command line is parsed
// against, so its wording is part of the program's behavior.
const usage = `
Usage:
codis-fe [--ncpu=N] [--log=FILE] [--log-level=LEVEL] [--assets-dir=PATH] [--pidfile=FILE] (--dashboard-list=FILE|--zookeeper=ADDR [--zookeeper-auth=USR:PWD]|--etcd=ADDR [--etcd-auth=USR:PWD]|--filesystem=ROOT) --listen=ADDR
codis-fe --version
Options:
--ncpu=N set runtime.GOMAXPROCS to N, default is runtime.NumCPU().
-d FILE, --dashboard-list=FILE set list of dashboard, can be generated by codis-admin.
-l FILE, --log=FILE set path/name of daliy rotated log file.
--log-level=LEVEL set the log-level, should be INFO,WARN,DEBUG or ERROR, default is INFO.
--listen=ADDR set the listen address.
`
d, err := docopt.Parse(usage, nil, true, "", false)
if err != nil {
log.PanicError(err, "parse arguments failed")
}
// --version: print the build information and exit without starting anything.
if d["--version"].(bool) {
fmt.Println("version:", utils.Version)
fmt.Println("compile:", utils.Compile)
return
}
// --log: replace the standard logger with one writing to a rolling file
// created with log.DailyRolling.
if s, ok := utils.Argument(d, "--log"); ok {
w, err := log.NewRollingFile(s, log.DailyRolling)
if err != nil {
log.PanicErrorf(err, "open log file %s failed", s)
} else {
log.StdLog = log.New(w, "")
}
}
// The log level defaults to INFO; --log-level overrides it and a value that
// SetLevelString rejects is reported through log.Panicf.
log.SetLevel(log.LevelInfo)
if s, ok := utils.Argument(d, "--log-level"); ok {
if !log.SetLevelString(s) {
log.Panicf("option --log-level = %s", s)
}
}
// --ncpu sets GOMAXPROCS explicitly; otherwise the number of CPUs is used.
if n, ok := utils.ArgumentInteger(d, "--ncpu"); ok {
runtime.GOMAXPROCS(n)
} else {
runtime.GOMAXPROCS(runtime.NumCPU())
}
log.Warnf("set ncpu = %d", runtime.GOMAXPROCS(0))
// --listen is a required element of the usage grammar above.
listen := utils.ArgumentMust(d, "--listen")
log.Warnf("set listen = %s", listen)
// Resolve the assets directory: --assets-dir made absolute, or else an
// "assets" directory next to the executable named by os.Args[0].
var assets string
if s, ok := utils.Argument(d, "--assets-dir"); ok {
abspath, err := filepath.Abs(s)
if err != nil {
log.PanicErrorf(err, "get absolute path of %s failed", s)
}
assets = abspath
} else {
binpath, err := filepath.Abs(filepath.Dir(os.Args[0]))
if err != nil {
log.PanicErrorf(err, "get path of binary failed")
}
assets = filepath.Join(binpath, "assets")
}
log.Warnf("set assets = %s", assets)
// Refuse to start when index.html cannot be stat'ed inside the assets
// directory.
indexFile := filepath.Join(assets, "index.html")
if _, err := os.Stat(indexFile); err != nil {
log.PanicErrorf(err, "get stat of %s failed", indexFile)
}
// Choose where the dashboard list comes from: a static JSON file, or one of
// the coordinators (zookeeper, etcd, filesystem) accessed via models.Client.
var loader ConfigLoader
if d["--dashboard-list"] != nil {
file := utils.ArgumentMust(d, "--dashboard-list")
loader = &StaticLoader{file}
log.Warnf("set --dashboard-list = %s", file)
} else {
var coordinator struct {
name string
addr string
auth string
}
switch {
case d["--zookeeper"] != nil:
coordinator.name = "zookeeper"
coordinator.addr = utils.ArgumentMust(d, "--zookeeper")
if d["--zookeeper-auth"] != nil {
coordinator.auth = utils.ArgumentMust(d, "--zookeeper-auth")
}
case d["--etcd"] != nil:
coordinator.name = "etcd"
coordinator.addr = utils.ArgumentMust(d, "--etcd")
if d["--etcd-auth"] != nil {
coordinator.auth = utils.ArgumentMust(d, "--etcd-auth")
}
case d["--filesystem"] != nil:
// No auth option exists for the filesystem coordinator in the usage grammar.
coordinator.name = "filesystem"
coordinator.addr = utils.ArgumentMust(d, "--filesystem")
default:
log.Panicf("invalid coordinator")
}
log.Warnf("set --%s = %s", coordinator.name, coordinator.addr)
// NOTE(review): time.Minute is passed as the last argument of NewClient,
// presumably a timeout — confirm its exact meaning in package models.
c, err := models.NewClient(coordinator.name, coordinator.addr, coordinator.auth, time.Minute)
if err != nil {
log.PanicErrorf(err, "create '%s' client to '%s' failed", coordinator.name, coordinator.addr)
}
defer c.Close()
loader = &DynamicLoader{c}
}
// router maps dashboard names to reverse proxies built from the loader.
router := NewReverseProxy(loader)
// Middleware stack: panic recovery, the render middleware, and static file
// serving from the assets directory with request logging skipped.
m := martini.New()
m.Use(martini.Recovery())
m.Use(render.Renderer())
m.Use(martini.Static(assets, martini.StaticOptions{SkipLogging: true}))
r := martini.NewRouter()
// GET /list returns the known dashboard names, sorted, as a JSON response.
// GetNames is also the only call that refreshes the route table.
r.Get("/list", func() (int, string) {
names := router.GetNames()
sort.Sort(sort.StringSlice(names))
return rpc.ApiResponseJson(names)
})
// Every other request is forwarded to the dashboard named by the "forward"
// query parameter; an unknown or missing name is answered with 403 Forbidden.
r.Any("/**", func(w http.ResponseWriter, req *http.Request) {
name := req.URL.Query().Get("forward")
if p := router.GetProxy(name); p != nil {
p.ServeHTTP(w, req)
} else {
w.WriteHeader(http.StatusForbidden)
}
})
m.MapTo(r, (*martini.Routes)(nil))
m.Action(r.Handle)
// Bind the listening socket before writing the pid file.
l, err := net.Listen("tcp", listen)
if err != nil {
log.PanicErrorf(err, "listen %s failed", listen)
}
defer l.Close()
// Writing the pid file is best-effort: failures are reported through
// log.WarnErrorf and startup continues. When the write succeeds, removal of
// the file is deferred.
if s, ok := utils.Argument(d, "--pidfile"); ok {
if pidfile, err := filepath.Abs(s); err != nil {
log.WarnErrorf(err, "parse pidfile = '%s' failed", s)
} else if err := ioutil.WriteFile(pidfile, []byte(strconv.Itoa(os.Getpid())), 0644); err != nil {
log.WarnErrorf(err, "write pidfile = '%s' failed", pidfile)
} else {
defer func() {
if err := os.Remove(pidfile); err != nil {
log.WarnErrorf(err, "remove pidfile = '%s' failed", pidfile)
}
}()
log.Warnf("option --pidfile = %s", pidfile)
}
}
// Serve the martini stack at "/" on the listener; Serve blocks until it
// fails, and that failure is reported through log.PanicErrorf.
h := http.NewServeMux()
h.Handle("/", m)
hs := &http.Server{Handler: h}
if err := hs.Serve(l); err != nil {
log.PanicErrorf(err, "serve %s failed", listen)
}
}
// ConfigLoader supplies the set of reverse-proxy targets. Reload returns a map
// whose keys are used as route names and whose values are used as upstream
// hosts by ReverseProxy.reload, or an error when the set cannot be obtained.
type ConfigLoader interface {
Reload() (map[string]string, error)
}
// StaticLoader is a ConfigLoader that reads its targets from the JSON file at
// path each time Reload is called.
type StaticLoader struct {
path string
}
// Reload reads the file at l.path and decodes it as a JSON array of objects
// with "name" and "dashboard" string fields, returning a map from name to
// dashboard.
//
// A read or decode failure is returned wrapped by errors.Trace. Array elements
// that are JSON null decode to nil pointers and are skipped, so a file such as
// `[null]` yields an empty map instead of a nil-pointer panic. When a name
// occurs more than once, the last entry wins.
func (l *StaticLoader) Reload() (map[string]string, error) {
	b, err := ioutil.ReadFile(l.path)
	if err != nil {
		return nil, errors.Trace(err)
	}
	var list []*struct {
		Name      string `json:"name"`
		Dashboard string `json:"dashboard"`
	}
	if err := json.Unmarshal(b, &list); err != nil {
		return nil, errors.Trace(err)
	}
	m := make(map[string]string, len(list))
	for _, e := range list {
		// encoding/json turns a null element into a nil pointer.
		if e == nil {
			continue
		}
		m[e.Name] = e.Dashboard
	}
	return m, nil
}
// DynamicLoader is a ConfigLoader that discovers its targets through a
// models.Client each time Reload is called.
type DynamicLoader struct {
client models.Client
}
// Reload lists the entries under models.CodisDir and, for each one, reads the
// data stored at models.LockPath(product), where product is the base name of
// the entry. The data is decoded as a models.Topom and its AdminAddr becomes
// the target for that product.
//
// Only a failure of the listing itself is returned (wrapped by errors.Trace).
// A product whose data cannot be read or decoded is logged as a warning and
// left out of the result, as is a product whose read returns nil data.
func (l *DynamicLoader) Reload() (map[string]string, error) {
	entries, err := l.client.List(models.CodisDir, false)
	if err != nil {
		return nil, errors.Trace(err)
	}

	addrs := make(map[string]string)
	for _, entry := range entries {
		product := filepath.Base(entry)

		data, err := l.client.Read(models.LockPath(product), false)
		if err != nil {
			log.WarnErrorf(err, "read topom of product %s failed", product)
			continue
		}
		if data == nil {
			continue
		}

		topom := &models.Topom{}
		if err := json.Unmarshal(data, topom); err != nil {
			log.WarnErrorf(err, "decode json failed")
			continue
		}
		addrs[product] = topom.AdminAddr
	}
	return addrs, nil
}
// ReverseProxy keeps a table of named reverse proxies that is rebuilt from a
// ConfigLoader. The embedded mutex is taken by GetProxy and GetNames around
// every access to the table.
type ReverseProxy struct {
sync.Mutex
// loadAt is the time the most recent reload attempt finished.
loadAt time.Time
// loader provides the name-to-host mapping used to build routes.
loader ConfigLoader
// routes maps a dashboard name to the proxy that forwards to it.
routes map[string]*httputil.ReverseProxy
}
// NewReverseProxy returns a ReverseProxy backed by loader. The route table
// starts out empty; the loader is not consulted until the first reload.
func NewReverseProxy(loader ConfigLoader) *ReverseProxy {
	return &ReverseProxy{
		loader: loader,
		routes: make(map[string]*httputil.ReverseProxy),
	}
}
// reload rebuilds r.routes from r.loader unless the previous attempt finished
// less than d ago. It does no locking of its own; GetNames calls it with the
// mutex held.
//
// The new table is built on the side and swapped in only when the loader
// succeeds. A failing loader is logged and leaves the previously known routes
// in place, rather than emptying the table and making every dashboard
// unreachable until the next successful reload. Entries with an empty name or
// an empty host are ignored. loadAt is updated after every attempt, successful
// or not, so a failing loader is retried no sooner than d later.
func (r *ReverseProxy) reload(d time.Duration) {
	if time.Since(r.loadAt) < d {
		return
	}
	// Stamp the attempt on every exit path below.
	defer func() { r.loadAt = time.Now() }()

	m, err := r.loader.Reload()
	if err != nil {
		log.WarnErrorf(err, "reload reverse proxy failed")
		return
	}

	routes := make(map[string]*httputil.ReverseProxy, len(m))
	for name, host := range m {
		if name == "" || host == "" {
			continue
		}
		p := httputil.NewSingleHostReverseProxy(&url.URL{Scheme: "http", Host: host})
		p.Transport = roundTripper
		routes[name] = p
	}
	r.routes = routes
}
// GetProxy returns the reverse proxy registered under name, or nil when there
// is none. It never triggers a reload: the route table is refreshed only by
// GetNames.
func (r *ReverseProxy) GetProxy(name string) *httputil.ReverseProxy {
	r.Lock()
	proxy := r.routes[name]
	r.Unlock()
	return proxy
}
// GetNames refreshes the route table if the last reload attempt is at least
// five seconds old, then returns the names of all known routes. The order is
// unspecified (map iteration order), and the result is nil when there are no
// routes.
func (r *ReverseProxy) GetNames() []string {
	r.Lock()
	defer r.Unlock()

	r.reload(5 * time.Second)

	var names []string
	for name := range r.routes {
		names = append(names, name)
	}
	return names
}
1
https://gitee.com/oscstudio/codis.git
git@gitee.com:oscstudio/codis.git
oscstudio
codis
codis
bd29e9d9901b

搜索帮助