Ai
1 Star 0 Fork 0

dale/公共包

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
克隆/下载
httpsvr.go 16.99 KB
一键复制 编辑 原始数据 按行查看 历史
dale 提交于 2021-11-21 14:58 +08:00 . 增加httpapi
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622
package httpsvr
import (
"bytes"
"encoding/json"
"encoding/xml"
"errors"
"fmt"
"io"
"mime"
"mime/multipart"
"net/http"
"os"
"regexp"
"runtime"
"strconv"
"strings"
"time"
"gitee.com/daledi/public/mlog"
"github.com/julienschmidt/httprouter"
"github.com/prometheus/client_golang/prometheus"
"gopkg.in/mgo.v2/bson"
)
//Router 自定义http router
type Router struct {
httprouter.Router
serverVer string
accessLog *mlog.Logger
PanicHandler func(*Context)
ResponseHandler func(*Context)
Prefix string
ErrLog *mlog.Logger
// Registry prometheus.Registerer
RequestsInFlight prometheus.Gauge
ConcurrencyLimitExceeded prometheus.Counter
inFlightSem chan struct{}
PrometheusList PrometheusList
}
// type ResponseWriter interface {
// http.ResponseWriter
//
// Status() int
// // Returns the number of bytes already written into the response http body.
// // See Written()
// Size() int
// // Writes the string into the response body.
// WriteString(string) (int, error)
//
// // Returns true if the response body was already written.
// Written() bool
//
// // Forces to write the http header (status code + headers).
// WriteHeader()
// }
//ResponseWriter 增加status字段
type ResponseWriter struct {
http.ResponseWriter
statusCode int
size int
}
//Context http会话期间的session
type Context struct {
Req *http.Request
Writer *ResponseWriter
Params httprouter.Params
Keys map[string]interface{}
Response interface{}
Begin time.Time
NoLog bool
OpLog *bytes.Buffer
Body *bytes.Buffer
BodyLen int64
BodyMap map[string]interface{}
ContextType string
Perm uint
// Econf *common.EopsConf
}
//APIInfo api信息
type APIInfo struct {
Handler Handle
Name string
// Perm 权限
Perm uint
}
// RouterList api路由列表
type RouterList struct {
API map[string]APIInfo
Privilege Handle
}
type PrometheusList struct {
RequestsInFlight prometheus.Gauge
ConcurrencyLimitExceeded prometheus.Counter
RequestsTotal *prometheus.CounterVec
RequestsURLTotal *prometheus.CounterVec
}
const (
noWritten = -1
maxBodySize = int64(10 << 20)
// TypeJson http的响应内容类型:json
defaultType = "application/octet-stream"
TypeJson = "application/json; charset=UTF-8"
TypePlain = "text/plain; charset=UTF-8"
TypeHtml = "text/html; charset=UTF-8"
)
//Handle 增加全局配置信息
type Handle func(ctx *Context)
//type Handle func(http.ResponseWriter, *http.Request, httprouter.Params, interface{})
//NewRouteList 初始化路由表
func NewRouteList() *RouterList {
apilist := make(map[string]APIInfo)
route := RouterList{API: apilist}
return &route
}
// NewRouter 创建一个新的router
func NewRouter(sver string, alog *mlog.Logger) *Router {
concurrency := runtime.GOMAXPROCS(0)
router := &Router{
Router: *httprouter.New(),
accessLog: alog,
// errLog: elog,
serverVer: "DDW/1.0",
// Registry: prometheus.DefaultRegisterer,
PrometheusList: PrometheusList{},
inFlightSem: make(chan struct{}, concurrency),
}
// TODO(beorn7): For now, this hardcodes the method="get" label. Other
// methods should get the same instrumentation.
// prometheus.MustRegister(requestsInFlight)
// prometheus.MustRegister(concurrencyLimitExceeded)
// prometheus.MustRegister(prometheus.NewBuildInfoCollector())
// if err := router.Registry.Register(requestsInFlight); err != nil {
// return nil
// }
// if err := router.Registry.Register(concurrencyLimitExceeded); err != nil {
// return nil
// }
if sver != "" {
router.serverVer = sver
}
router.NotFound()
router.SetupPrometheus()
// router.Router.Handle("GET", "/metrics", func(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
// promhttp.Handler().ServeHTTP(w, r)
// })
return router
}
func (router *Router) SetupPrometheus() {
requestsInFlight := prometheus.NewGauge(prometheus.GaugeOpts{
Name: "http_requests_in_flight",
Help: "Current number of HTTP requests being processed.",
ConstLabels: prometheus.Labels{"method": "get"},
})
concurrencyLimitExceeded := prometheus.NewCounter(prometheus.CounterOpts{
Name: "http_concurrency_limit_exceeded_total",
Help: "Total number of times an HTTP request failed because the concurrency limit was reached.",
ConstLabels: prometheus.Labels{"method": "get"},
})
RequestsTotal := prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "http_requests_total",
Help: "Total number of scrapes by HTTP status code.",
},
[]string{"code"},
)
RequestsURLTotal := prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "http_requests_url_total",
Help: "Total number of scrapes by HTTP URL and status code.",
},
[]string{"code", "url"},
)
RequestsTotal.WithLabelValues("200")
RequestsTotal.WithLabelValues("500")
RequestsTotal.WithLabelValues("502")
RequestsTotal.WithLabelValues("503")
router.PrometheusList.RequestsInFlight = requestsInFlight
router.PrometheusList.ConcurrencyLimitExceeded = concurrencyLimitExceeded
router.PrometheusList.RequestsTotal = RequestsTotal
router.PrometheusList.RequestsURLTotal = RequestsURLTotal
prometheus.MustRegister(requestsInFlight)
prometheus.MustRegister(concurrencyLimitExceeded)
prometheus.MustRegister(RequestsTotal)
prometheus.MustRegister(RequestsURLTotal)
}
//NewResponseWriter 创建新实例
func NewResponseWriter(w http.ResponseWriter) *ResponseWriter {
return &ResponseWriter{w, http.StatusOK, noWritten}
}
//WriteHeader 记录status
func (lrw *ResponseWriter) WriteHeader(code int) {
if !lrw.Written() {
lrw.size = 0
lrw.statusCode = code
lrw.ResponseWriter.WriteHeader(code)
}
}
//AbortWithStatus 记录status
func (lrw *ResponseWriter) AbortWithStatus(code int) {
lrw.size = 0
lrw.statusCode = code
lrw.ResponseWriter.WriteHeader(code)
}
//Write 记录status
func (lrw *ResponseWriter) Write(data []byte) (n int, err error) {
if !lrw.Written() {
n, err = lrw.ResponseWriter.Write(data)
lrw.size += n
}
return
}
//WriteString 获取Written
func (lrw *ResponseWriter) WriteString(s string) (n int, err error) {
n, err = io.WriteString(lrw.ResponseWriter, s)
lrw.size += n
return
}
//Written 获取Written
func (lrw *ResponseWriter) Written() bool {
return lrw.size != noWritten
}
//Status 获取status
func (lrw *ResponseWriter) Status() int {
return lrw.statusCode
}
//Size 获取size
func (lrw *ResponseWriter) Size() int {
return lrw.size
}
//ServeHTTP 增加日志输出
func (router *Router) ServeHTTP(w http.ResponseWriter, req *http.Request) {
lw := NewResponseWriter(w)
lw.Header().Set("Server", router.serverVer)
//lw.Header().Set("Content-Type", "application/json; charset=UTF-8")
h := router.limitHandler(http.HandlerFunc(router.Router.ServeHTTP))
// h := http.HandlerFunc(router.Router.ServeHTTP)
h.ServeHTTP(lw, req)
}
//AddRoute 增加route信息
func (router *Router) AddRoute(method, path string, perm uint, conf interface{}, handlers ...Handle) {
router.Router.Handle(method, path, func(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
ctx := &Context{
Req: r,
Writer: w.(*ResponseWriter),
Params: ps,
Keys: make(map[string]interface{}),
Begin: time.Now(),
OpLog: bytes.NewBuffer([]byte{}),
Body: bytes.NewBuffer([]byte{}),
ContextType: "application/octet-stream",
Perm: perm,
}
ctx.Keys["conf"] = conf
if router.PanicHandler != nil {
defer router.PanicHandler(ctx)
}
for _, handle := range handlers {
handle(ctx)
if w.(*ResponseWriter).Written() || ctx.Response != nil {
break
}
}
if router.ResponseHandler != nil {
router.ResponseHandler(ctx)
}
router.responseDefaultHandler(ctx)
})
}
// PutRouterList 添加routerlist到路由表
func (router *Router) PutRouterList(rl *RouterList, conf interface{}) {
for k, v := range rl.API {
t := strings.Split(k, "|")
method := string(t[1])
path := string(t[0])
if router.Prefix != "" {
path = router.Prefix + path
}
if v.Perm > 0 && rl.Privilege != nil {
router.AddRoute(method, path, v.Perm, conf, rl.Privilege, v.Handler)
} else {
router.AddRoute(method, path, v.Perm, conf, v.Handler)
// router.AddRoute(string(t[1]), fmt.Sprintf("%s/:ver%s", econf.GOPT.Prefix, string(t[0])), econf, v.Handler)
}
if h, _, _ := router.Lookup("OPTIONS", path); h == nil {
router.AddRoute("OPTIONS", path, v.Perm, nil, nullHandler)
}
}
}
func nullHandler(ctx *Context) {
ctx.Response = ""
}
func (router *Router) limitHandler(h http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
if req.Method == http.MethodGet { // Only limit concurrency of GETs.
select {
case router.inFlightSem <- struct{}{}: // All good, carry on.
router.PrometheusList.RequestsInFlight.Inc()
defer func() {
<-router.inFlightSem
router.PrometheusList.RequestsInFlight.Dec()
}()
default:
router.PrometheusList.ConcurrencyLimitExceeded.Inc()
http.Error(w, fmt.Sprintf(
"Limit of concurrent GET requests reached (%d), try again later.\n", cap(router.inFlightSem),
), http.StatusServiceUnavailable)
return
}
}
h.ServeHTTP(w, req)
})
}
//NotFound 增加route信息
func (router *Router) NotFound() {
router.Router.NotFound = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
ctx := &Context{
Req: r,
Writer: w.(*ResponseWriter),
Keys: make(map[string]interface{}),
Begin: time.Now(),
}
// ctx.Keys["econf"] = conf
ctx.Writer.Header().Set("Content-Type", "text/plain; charset=utf-8")
ctx.Writer.WriteHeader(http.StatusNotFound)
if router.PanicHandler != nil {
defer router.PanicHandler(ctx)
}
router.responseDefaultHandler(ctx)
})
}
// responseDefaultHandler 回复客户端的处理器
func (router *Router) responseDefaultHandler(ctx *Context) {
defer func() {
if !ctx.NoLog && router.accessLog != nil {
isArgs := "?"
if ctx.Req.URL.RawQuery == "" {
isArgs = ""
}
status := strconv.Itoa(ctx.Writer.Status())
router.PrometheusList.RequestsTotal.WithLabelValues(status).Inc()
router.PrometheusList.RequestsURLTotal.WithLabelValues(status, ctx.Req.URL.Path).Inc()
router.accessLog.Printf("%s %s %s %s%s%s %d %s %v %v\n",
ctx.Begin.Format(time.RFC3339),
ctx.RemoteAddr(false),
ctx.Req.Method,
ctx.Req.URL.Path,
isArgs,
ctx.Req.URL.RawQuery,
ctx.Writer.Status(),
http.StatusText(ctx.Writer.Status()),
ctx.Keys["code"],
time.Since(ctx.Begin))
}
}()
if ctx.Response == nil {
ctx.Writer.WriteHeader(http.StatusNoContent)
return
}
ctx.Keys["code"] = 0
var out []byte
switch v := ctx.Response.(type) {
case []byte:
out = v
if ctx.ContextType == defaultType {
ctx.ContextType = TypePlain
}
case string:
out = []byte(v)
if ctx.ContextType == defaultType {
ctx.ContextType = TypePlain
}
case []string:
out = []byte(strings.Join(v, "\n"))
if ctx.ContextType == defaultType {
ctx.ContextType = TypePlain
}
case int:
if v > 0 && v < 1000 && v != 200 {
ctx.Writer.WriteHeader(v)
return
}
msg := NewRespMsg(v)
// ctx.Writer.Header().Set("X-Code", strconv.Itoa(v))
ctx.Response = msg
// tmp, err := json.Marshal(msg)
// if err != nil {
// ctx.Writer.WriteHeader(http.StatusResetContent)
// return
// }
// out = tmp
ctx.ContextType = TypeJson
case *RespMsg:
ctx.ContextType = TypeJson
default:
ctx.Writer.WriteHeader(http.StatusResetContent)
return
}
if ctx.ContextType == TypeJson {
msg := ctx.Response.(*RespMsg)
ctx.Keys["code"] = msg.Code
tmp, err := json.Marshal(ctx.Response)
if err != nil {
ctx.Writer.WriteHeader(http.StatusResetContent)
return
}
out = tmp
}
ctx.Writer.Header().Set("Content-Type", ctx.ContextType)
ctx.Writer.Write(out)
}
//RemoteAddr 获取客户端IP
func (ctx *Context) RemoteAddr(xff bool) string {
addr := ctx.Req.RemoteAddr
if i := strings.IndexByte(addr, ':'); i > 0 {
addr = ctx.Req.RemoteAddr[:i]
}
if xff {
if xffip := ctx.Req.Header.Get("X-Forwarded-For"); xffip != "" {
addr = xffip
}
}
return addr
}
//ReqBody 获取请求中的Body
func (ctx *Context) ReqBody(out interface{}) (ecode int, err error) {
ecode = 0
if ctx.Body.Len() == 0 {
clen := ctx.Req.Header.Get("Content-Length")
var bodyLen int64
if bodyLen, err = strconv.ParseInt(clen, 10, 0); err == nil {
if bodyLen > maxBodySize {
// ctx.Writer.WriteHeader(http.StatusRequestEntityTooLarge)
ecode = http.StatusRequestEntityTooLarge
err = errors.New("StatusRequestEntityTooLarge")
return
}
}
reader := io.LimitReader(ctx.Req.Body, maxBodySize+1)
// b, err = ioutil.ReadAll(reader)
ctx.BodyLen, err = ctx.Body.ReadFrom(reader)
if err != nil {
ecode = StatusBodyInvalid
return
}
}
if out == nil {
return
}
ct := ctx.Req.Header.Get("Content-Type")
// RFC 2616, section 7.2.1 - empty type
// SHOULD be treated as application/octet-stream
if ct == "" {
ct = "application/octet-stream"
}
ct, _, _ = mime.ParseMediaType(ct)
switch ct {
case "text/xml":
if err = xml.Unmarshal(ctx.Body.Bytes(), out); err != nil {
ecode = StatusBodyInvalid
}
case "application/json":
if err = json.Unmarshal(ctx.Body.Bytes(), out); err != nil {
ecode = StatusBodyInvalid
}
default:
}
return
}
// var Chunking = errors.New("invalid chunking")
//Upload 文件上传处理。兼容chunked方式。
/*
不是chunk方式上传时,保存的文件名为随机生成。
chunk方式上传时,保存的文件名为提交的文件名。这种方式上传的文件名不能重名。
返回的参数:提交的文件名,保存的文件名,错误信息。当chunk方式上传时,在最后一个chunk请求处理
之前,返回的错误信息是:Chunking,表示chunk未完成。
*/
func (ctx *Context) Upload(savedir, allowExt string) (string, string, error) {
mr, err := ctx.Req.MultipartReader()
if err != nil {
return "", "", err
}
var part *multipart.Part
part, err = mr.NextPart()
if err != nil {
return "", "", err
}
//savedir := econf.GOPT.WorkDir + "/" + project + "/" + cluster + "/" + app
// var dirst os.FileInfo
if _, err = os.Stat(savedir); err != nil {
if err := os.MkdirAll(savedir, 0755); err != nil {
err = fmt.Errorf("mkdir %s failed: %v", savedir, err)
return "", "", err
}
}
for {
if part.FormName() == "files" {
break
}
part, err = mr.NextPart()
if err != nil {
return "", "", err
}
}
postname := part.FileName()
reg := regexp.MustCompile(allowExt)
tmpname := "multipart-" + bson.NewObjectId().Hex()
if postname != "" && reg.MatchString(postname) {
// var b bytes.Buffer
// io.CopyN(&b, part, 1048576+1)
// econf.LogDebug("[%s] upload file %s with content: %s\n", logKind, fname, b.String())
/*
Content-Range:bytes 0-10485759/202286246
Content-Type:multipart/form-data; boundary=----WebKitFormBoundaryduq7g7PEGTVOEwWh
------WebKitFormBoundarydCgVRryb2qJPD2kR
Content-Disposition: form-data; name="checkbox1"
csv
------WebKitFormBoundarydCgVRryb2qJPD2kR
Content-Disposition: form-data; name="files"; filename="test.csv"
Content-Type: text/csv
------WebKitFormBoundarydCgVRryb2qJPD2kR--
*/
// wf := bufio.NewWriter(os.Stdout)
// defer wf.Flush()
var chunkStart, chunkEnd, chunkTotal int64
chunk := ctx.Req.Header.Get("Content-Range")
if chunk != "" {
i := strings.IndexByte(chunk[6:], '-')
j := strings.IndexByte(chunk[6:], '/')
chunkStart, _ = strconv.ParseInt(chunk[6:6+i], 10, 0)
chunkEnd, _ = strconv.ParseInt(chunk[6+i+1:6+j], 10, 0)
chunkTotal, _ = strconv.ParseInt(chunk[6+j+1:], 10, 0)
if i == -1 || j == -1 {
err = errors.New("InvalidRequest: " + chunk)
return "", "", err
}
tmpname = postname
}
//econf.LogDebug("[%s] chunk info: %s\n", logKind, chunk)
fname := savedir + "/" + tmpname
var f *os.File
var fstat os.FileInfo
if chunkStart == 0 {
f, err = os.OpenFile(fname, os.O_RDWR|os.O_CREATE|os.O_EXCL, 0644)
} else {
fstat, err = os.Stat(fname)
if err != nil {
err = errors.New("no first chunk request")
return "", "", err
}
if fstat.Size() != chunkStart {
err = errors.New("file size not eq total chunk size")
return "", "", err
}
// fmt.Fprintf(wf, "%s size: %d\n", fstat.Name(), fstat.Size())
f, err = os.OpenFile(fname, os.O_APPEND|os.O_RDWR, 0644)
}
if err != nil {
err = fmt.Errorf("open file %s failed: %v", fname, err)
return "", "", err
}
// var wn int64
if chunkEnd > 0 {
_, err = io.CopyN(f, part, chunkEnd-chunkStart)
} else {
_, err = io.Copy(f, part)
}
if cerr := f.Close(); err == nil {
err = cerr
}
// fmt.Fprintf(wf, "write size: %d\n", wn)
if err != nil {
err = fmt.Errorf("Write file %s failed: %v", fname, err)
os.Remove(f.Name())
return "", "", err
}
//文件全部上传完成
if (chunkTotal != 0 && chunkEnd == chunkTotal) || chunkTotal == 0 {
return postname, tmpname, nil
} else {
return postname, tmpname, errors.New("invalid chunking")
}
}
err = fmt.Errorf("InvalidRequest")
return "", "", err
}
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
Go
1
https://gitee.com/daledi/public.git
git@gitee.com:daledi/public.git
daledi
public
公共包
v1.0.0

搜索帮助