1 Star 0 Fork 2

何吕 / volantmq

forked from JUMEI_ARCH / volantmq 
Create your Gitee Account
Explore and code with more than 8 million developers. Free private repositories!
Sign up
Clone or Download
admin.go 7.15 KB
Copy Edit Raw Blame History
hawklin authored 2018-06-20 17:49 . bugfix
package admin
import (
"crypto/tls"
"crypto/x509"
"encoding/json"
"errors"
"fmt"
"io"
"io/ioutil"
"net/http"
"time"
"github.com/VolantMQ/volantmq/admin/config"
"github.com/VolantMQ/volantmq/auth"
"github.com/VolantMQ/volantmq/configuration"
"github.com/VolantMQ/volantmq/trace"
"go.uber.org/zap"
)
// HttpsAdmin is the HTTPS administration endpoint. It embeds the admin
// configuration, so fields such as Addr, ServerCert, ServerKey, SessionsMgr
// and AuthMgr are accessed directly on the receiver.
type HttpsAdmin struct {
	// Embedded configuration supplied to NewHttpsAdmin.
	*adminConfig.AdminConfig
	// log is the logger used by every handler in this type.
	log *zap.Logger
	// server is the HTTP server built by NewHttpsAdmin and run by Start.
	server *http.Server
}
// listClients handles /v1/clients/list. With a non-empty "id" form value it
// asks the session manager for that single client, otherwise for all clients.
// Each callback invocation writes one "Client(...)" line to the response; the
// subscription map is printed only when it is non-nil. Failures are logged
// and reported to the caller as an "error:" line.
func (a *HttpsAdmin) listClients(w http.ResponseWriter, req *http.Request) {
	if parseErr := req.ParseForm(); parseErr != nil {
		a.log.Error("parse url fail", zap.Error(parseErr))
		io.WriteString(w, "error:"+parseErr.Error()+"\n")
		return
	}

	// printClient is handed to the session manager as the per-client callback.
	printClient := func(id string, remote string, clean bool, user string, create string, topics map[string]byte) {
		if topics == nil {
			fmt.Fprintf(w, "Client(id=%s, user=%s, remote=%s, clean=%t, create=%s)\n", id, user, remote, clean, create)
			return
		}
		fmt.Fprintf(w, "Client(id=%s, user=%s, remote=%s, clean=%t, create=%s, subscribe=%v)\n", id, user, remote, clean, create, topics)
	}

	var listErr error
	if clientID := req.Form.Get("id"); clientID != "" {
		listErr = a.SessionsMgr.ListOneClient(clientID, printClient)
	} else {
		listErr = a.SessionsMgr.ListAllClients(printClient)
	}
	if listErr == nil {
		return
	}

	a.log.Error("list client fail", zap.Error(listErr))
	io.WriteString(w, "error:"+listErr.Error()+"\n")
}
// kickClients handles /v1/clients/kick. It requires a non-empty "id" form
// value and passes it to SessionsMgr.KickOff. The response body is
// "success" on success or an "error:" line otherwise.
func (a *HttpsAdmin) kickClients(w http.ResponseWriter, req *http.Request) {
	if parseErr := req.ParseForm(); parseErr != nil {
		a.log.Error("parse url fail", zap.Error(parseErr))
		io.WriteString(w, "error:"+parseErr.Error()+"\n")
		return
	}

	clientID := req.Form.Get("id")
	if clientID == "" {
		a.log.Error("miss id param")
		io.WriteString(w, "error:miss id param\n")
		return
	}

	if kickErr := a.SessionsMgr.KickOff(clientID); kickErr != nil {
		a.log.Error("kick client fail", zap.String("clientID", clientID), zap.Error(kickErr))
		io.WriteString(w, "error:"+kickErr.Error()+"\n")
		return
	}

	io.WriteString(w, "success\n")
}
// listUsers handles /v1/users/list. It passes a callback to
// AuthMgr.ListUsers; each invocation writes one "User(...)" line to the
// response. A failure is logged and reported as an "error:" line.
//
// NOTE(review): the callback writes the pwd value it receives straight into
// the response; confirm what AuthMgr supplies there.
func (a *HttpsAdmin) listUsers(w http.ResponseWriter, req *http.Request) {
	printUser := func(user string, pwd string, status string, topic string, access string) {
		fmt.Fprintf(w, "User(name=%s, password=%s, topic=%s, status=%s, access=%s)\n", user, pwd, topic, status, access)
	}

	if listErr := a.AuthMgr.ListUsers(printUser); listErr != nil {
		a.log.Error("list user fail", zap.Error(listErr))
		io.WriteString(w, "error:"+listErr.Error()+"\n")
	}
}
// addUsers handles /v1/users/add. It accepts only POST, decodes the JSON
// request body into an auth.UserParam, requires User, Password, Status, Topic
// and Access to be non-empty, and then calls AuthMgr.AddUser. The response
// body is "success" on success or an "error:" line otherwise.
//
// The request body is capped at maxBodyBytes so a client cannot make the
// handler buffer an arbitrarily large payload, and the password is masked
// before the parameters are written to the log.
func (a *HttpsAdmin) addUsers(w http.ResponseWriter, req *http.Request) {
	// Upper bound for the JSON payload; a user record is far smaller.
	const maxBodyBytes = 1 << 20

	if req.Method != http.MethodPost {
		a.log.Error("method unsupport", zap.String("method", req.Method))
		io.WriteString(w, "error:only support POST method\n")
		return
	}

	// MaxBytesReader makes ReadAll fail once the limit is exceeded instead of
	// reading the whole body into memory.
	body, err := ioutil.ReadAll(http.MaxBytesReader(w, req.Body, maxBodyBytes))
	if err != nil {
		a.log.Error("read data fail", zap.Error(err))
		io.WriteString(w, "error:"+err.Error()+"\n")
		return
	}

	param := &auth.UserParam{}
	if err = json.Unmarshal(body, param); err != nil {
		a.log.Error("unmarshal param fail", zap.Error(err))
		io.WriteString(w, "error:"+err.Error()+"\n")
		return
	}

	// Log a copy with the password masked so credentials never reach the log.
	// An empty password stays empty so a "miss param" entry still shows it.
	logParam := *param
	if logParam.Password != "" {
		logParam.Password = "******"
	}

	if param.User == "" ||
		param.Password == "" ||
		param.Status == "" ||
		param.Topic == "" ||
		param.Access == "" {
		a.log.Error("miss param", zap.Any("param", &logParam))
		// The response format is kept as-is for existing clients; it only
		// echoes back what the caller just sent.
		fmt.Fprintf(w, "error:miss param(%s,%s,%s,%s,%s)\n", param.User, param.Password, param.Status, param.Topic, param.Access)
		return
	}

	if err = a.AuthMgr.AddUser(param); err != nil {
		a.log.Error("add user fail", zap.Any("param", &logParam), zap.Error(err))
		io.WriteString(w, "error:"+err.Error()+"\n")
		return
	}

	io.WriteString(w, "success\n")
}
// delUsers handles /v1/users/del. It requires a non-empty "id" form value
// and passes it to AuthMgr.DelUser. The response body is "success" on
// success or an "error:" line otherwise.
func (a *HttpsAdmin) delUsers(w http.ResponseWriter, req *http.Request) {
	if parseErr := req.ParseForm(); parseErr != nil {
		a.log.Error("parse url fail", zap.Error(parseErr))
		io.WriteString(w, "error:"+parseErr.Error()+"\n")
		return
	}

	userID := req.Form.Get("id")
	if userID == "" {
		a.log.Error("miss id param")
		io.WriteString(w, "error:miss id param\n")
		return
	}

	if delErr := a.AuthMgr.DelUser(userID); delErr != nil {
		// The log field key is "clientID" even though the value is the user id.
		a.log.Error("del user fail", zap.String("clientID", userID), zap.Error(delErr))
		io.WriteString(w, "error:"+delErr.Error()+"\n")
		return
	}

	io.WriteString(w, "success\n")
}
// traceLog handles /v1/trace. It reads the "onoff", "id" and "topic" form
// values and applies the first rule that matches, in this order:
//
//  1. onoff == "false": trace.GetInstance().SetOff()
//  2. id is non-empty: trace.GetInstance().SetClientID(id)
//  3. topic is non-empty: trace.GetInstance().SetTopic(topic)
//
// If all three values are empty the request is rejected as missing a
// parameter; if none of the rules match it is rejected as invalid.
func (a *HttpsAdmin) traceLog(w http.ResponseWriter, req *http.Request) {
	if parseErr := req.ParseForm(); parseErr != nil {
		io.WriteString(w, "error:"+parseErr.Error()+"\n")
		return
	}

	onoff := req.Form.Get("onoff")
	clientID := req.Form.Get("id")
	topic := req.Form.Get("topic")

	if onoff == "" && clientID == "" && topic == "" {
		a.log.Error("miss onoff param")
		io.WriteString(w, "error:miss onoff param\n")
		return
	}

	switch {
	case onoff == "false":
		trace.GetInstance().SetOff()
	case clientID != "":
		trace.GetInstance().SetClientID(clientID)
	case topic != "":
		trace.GetInstance().SetTopic(topic)
	default:
		// Reached when onoff is set to something other than "false" and
		// neither id nor topic was supplied.
		io.WriteString(w, "invalid param\n")
		a.log.Error("invalid param", zap.String("onoff", onoff), zap.String("clientID", clientID), zap.String("topic", topic))
		return
	}

	io.WriteString(w, "success(clientID:"+clientID+", topic:"+topic+")\n")
}
// logOps handles /v1/log. It maps the "level" form value ("debug", "info",
// "warn" or "error") to the matching zap level and passes it to
// configuration.SetLevel. A missing or unsupported level is logged and
// reported as an "error:" line; otherwise the response body is "success".
func (a *HttpsAdmin) logOps(w http.ResponseWriter, req *http.Request) {
	if parseErr := req.ParseForm(); parseErr != nil {
		io.WriteString(w, "error:"+parseErr.Error()+"\n")
		return
	}

	level := req.Form.Get("level")
	if level == "" {
		a.log.Error("miss level param")
		io.WriteString(w, "error:miss level param\n")
		return
	}

	// The initial value is never used as-is: every accepted case assigns
	// explicitly and the default case returns before SetLevel is reached.
	target := zap.InfoLevel
	switch level {
	case "debug":
		target = zap.DebugLevel
	case "info":
		target = zap.InfoLevel
	case "warn":
		target = zap.WarnLevel
	case "error":
		target = zap.ErrorLevel
	default:
		a.log.Error("level unsupport", zap.String("level", level))
		io.WriteString(w, "error:level("+level+") unsupport\n")
		return
	}

	configuration.SetLevel(target)
	io.WriteString(w, "success\n")
}
// NewHttpsAdmin builds the admin endpoint from config. It registers the
// /v1/... handlers on a fresh ServeMux and prepares an http.Server bound to
// config.Addr with 2-second read and write timeouts. The server requires and
// verifies client certificates against a pool loaded from the PEM file at
// config.ServerCert. The server is not started; call Start for that.
//
// It returns an error when config is nil, when the certificate file cannot
// be read, or when no certificate can be parsed from it.
func NewHttpsAdmin(config *adminConfig.AdminConfig) (*HttpsAdmin, error) {
	// Without this guard the promoted-field accesses below would panic.
	if config == nil {
		return nil, errors.New("admin config is nil")
	}

	a := &HttpsAdmin{
		AdminConfig: config,
		log:         configuration.GetLogger().Named("admin"),
	}

	mux := http.NewServeMux()
	mux.HandleFunc("/v1/clients/list", a.listClients)
	mux.HandleFunc("/v1/clients/kick", a.kickClients)
	mux.HandleFunc("/v1/users/list", a.listUsers)
	mux.HandleFunc("/v1/users/add", a.addUsers)
	mux.HandleFunc("/v1/users/del", a.delUsers)
	mux.HandleFunc("/v1/trace", a.traceLog)
	mux.HandleFunc("/v1/log", a.logOps)

	// The server certificate file doubles as the trust anchor for client
	// certificates.
	certBytes, err := ioutil.ReadFile(a.ServerCert)
	if err != nil {
		return nil, err
	}
	clientCertPool := x509.NewCertPool()
	if !clientCertPool.AppendCertsFromPEM(certBytes) {
		return nil, errors.New("Unable to add certificate to certificate pool")
	}

	a.server = &http.Server{
		Addr:         a.Addr,
		ReadTimeout:  2 * time.Second,
		WriteTimeout: 2 * time.Second,
		Handler:      mux,
		TLSConfig: &tls.Config{
			ClientCAs:  clientCertPool,
			ClientAuth: tls.RequireAndVerifyClientCert,
			// TLS 1.0 and 1.1 are deprecated; require at least TLS 1.2.
			MinVersion: tls.VersionTLS12,
		},
	}
	return a, nil
}
// Start launches the HTTPS server in a background goroutine using the
// configured ServerCert and ServerKey files, and returns nil immediately.
// The config argument is not used. A listen/serve failure is only logged,
// never returned to the caller.
func (a *HttpsAdmin) Start(config interface{}) error {
	go func(s *http.Server) {
		err := s.ListenAndServeTLS(a.ServerCert, a.ServerKey)
		// http.ErrServerClosed is what ListenAndServeTLS returns after
		// Close/Shutdown; that is a normal stop, not a startup failure.
		if err != nil && err != http.ErrServerClosed {
			a.log.Error("start admin fail", zap.Error(err))
		}
	}(a.server)
	return nil
}
// Close stops the admin endpoint by calling Close on the underlying
// http.Server and returns whatever error that call reports.
func (a *HttpsAdmin) Close() error {
	srv := a.server
	return srv.Close()
}

Comment ( 0 )

Sign in to post a comment

Go
1
https://gitee.com/kaifazhe/volantmq.git
git@gitee.com:kaifazhe/volantmq.git
kaifazhe
volantmq
volantmq
v0.0.4

Search