代码拉取完成,页面将自动刷新
package es
import (
"context"
"encoding/json"
"fmt"
"gitee.com/manoshi/go-util/conf"
"gitee.com/manoshi/go-util/exception"
"gitee.com/manoshi/go-util/log"
"gitee.com/manoshi/go-util/util"
"github.com/olivere/elastic/v7"
"sort"
"strings"
"time"
)
// GetIndexPre returns the prefix used when building log index names.
//
// The configured conf.Es.LogIndexNamePrefix is returned when it is non-empty;
// otherwise the package-level indexNamePrefix is returned.
func GetIndexPre() string {
	if configured := conf.Es.LogIndexNamePrefix; configured != "" {
		return configured
	}
	return indexNamePrefix
}
// getCurrentIndex builds the index name for the current moment: the prefix
// from GetIndexPre followed by util.FormatDates applied to time.Now().
func getCurrentIndex() string {
	prefix := GetIndexPre()
	suffix := util.FormatDates(time.Now())
	return prefix + suffix
}
// ServerLogData is one log entry as returned by QueryLogs.
//
// Id is filled from the Elasticsearch hit id; every other field is decoded
// from the hit source using the JSON names in the tags.
type ServerLogData struct {
	Id           string `json:"id"`
	ServerName   string `json:"serverName"`
	Env          string `json:"env"`
	TimeStamp    int64  `json:"timeStamp"` // presumably epoch milliseconds (mapped as "date" in CreateIndex) — TODO confirm
	ThreadNo     string `json:"threadNo"`
	ServerIp     string `json:"serverIp"`
	ClientIp     string `json:"clientIp"`
	Token        string `json:"token"`
	CustomerId   int32  `json:"customerId"`
	ClientSource string `json:"clientSource"` // all header information of the request
	Url          string `json:"url"`
	Method       string `json:"method"`
	Request      string `json:"request"`
	Response     string `json:"response"`
	DealTimes    int64  `json:"dealTimes"`
}
// ServerLogModel carries the same fields and JSON names as ServerLogData,
// except that it has no Id field.
//
// NOTE(review): presumably this is the document shape handed to InsertEsLog;
// nothing in this file constructs it, so confirm against the callers.
type ServerLogModel struct {
	ServerName   string `json:"serverName"`
	Env          string `json:"env"`
	TimeStamp    int64  `json:"timeStamp"`
	ThreadNo     string `json:"threadNo"`
	ServerIp     string `json:"serverIp"`
	ClientIp     string `json:"clientIp"`
	Token        string `json:"token"`
	CustomerId   int32  `json:"customerId"`
	ClientSource string `json:"clientSource"`
	Url          string `json:"url"`
	Method       string `json:"method"`
	Request      string `json:"request"`
	Response     string `json:"response"`
	DealTimes    int64  `json:"dealTimes"`
}
// CreateIndex makes sure indexName exists, creating it with the log mapping
// below when the existence check reports it as missing.
//
// When EsStatus is not 200 only a warning is logged and nothing is sent.
// A failed existence check sets EsStatus to 0. A failed create is logged and
// also sets EsStatus to 0, unless the error text contains
// "resource_already_exists_exception", in which case EsStatus is left as is.
//
// NOTE(review): the mapping declares "userId", while ServerLogData and
// ServerLogModel serialize the field as "customerId" — confirm which name is
// intended.
func CreateIndex(indexName string) {
	if EsStatus != 200 {
		log.Warn("elasticsearch was dead!")
		return
	}

	// The raw string is kept flush-left so its bytes are exactly the request body.
	const logIndexMapping = `{
"mappings": {
"properties": {
"env": {
"type": "keyword"
},
"serverName": {
"type": "keyword"
},
"timeStamp": {
"type": "date"
},
"threadNo": {
"type": "keyword"
},
"clientIp": {
"type": "keyword"
},
"serverIp": {
"type": "keyword"
},
"token": {
"type": "keyword"
},
"userId": {
"type": "integer"
},
"clientSource": {
"type": "text"
},
"url": {
"type": "keyword"
},
"method": {
"type": "keyword"
},
"request": {
"type": "text"
},
"response": {
"type": "text"
},
"dealTimes": {
"type": "integer"
}
}
}
}`

	found, err := client.IndexExists(indexName).Do(context.Background())
	if err != nil {
		log.Error(fmt.Sprintf("check elasticsearch index exists err:%s", err.Error()))
		EsStatus = 0
		return
	}
	if found {
		return
	}

	created, err := client.CreateIndex(indexName).BodyString(logIndexMapping).Do(context.Background())
	if err == nil {
		log.Info(fmt.Sprintf("create elasticsearch index :%t", created.Acknowledged))
		return
	}

	log.Error(fmt.Sprintf("create elasticsearch index err:%s", err.Error()))
	// An "already exists" failure does not mark the cluster as down.
	if !strings.Contains(err.Error(), "resource_already_exists_exception") {
		EsStatus = 0
	}
}
// InsertEsLog stores logInfo as a JSON document in the current log index
// (see getCurrentIndex), after making sure that index exists via CreateIndex.
//
// Nothing is sent when EsStatus is not 200, either on entry or after
// CreateIndex has run. A failed index request is logged and sets EsStatus to
// 0; a successful one is logged at debug level with the new document id.
func InsertEsLog(logInfo interface{}) {
	if EsStatus != 200 {
		log.Warn("elasticsearch was dead!")
		return
	}

	indexName := getCurrentIndex()
	CreateIndex(indexName)
	// CreateIndex sets EsStatus to 0 when the existence check or the creation
	// fails (and has already logged why). Indexing anyway would issue a second
	// request against a cluster just flagged as down and, where automatic index
	// creation is enabled, could create the index with dynamic mapping, not
	// the explicit one.
	if EsStatus != 200 {
		return
	}

	put, err := getRedisClient().Index().
		Index(indexName).
		BodyJson(logInfo).
		Do(context.Background())
	if err != nil {
		log.Error(fmt.Sprintf("insert elasticsearch log fail:%s", err.Error()))
		EsStatus = 0
		return
	}
	log.Debug(fmt.Sprintf("insert elasticsearch log success:%s", put.Id))
}
// GroupField runs a terms aggregation named "group_field" on field within
// indexName and returns the JSON encoding of the response's aggregations.
//
// Parameters:
//   - size: number of buckets requested; any non-positive value falls back to 30.
//   - minCount: passed to the aggregation as its minimum document count.
//   - excludeValues: terms added to the aggregation's exclude list.
//
// The search itself requests no hits (From 0, Size 0). When EsStatus is not
// 200 a warning is logged and nil is returned. Search and encoding errors are
// handed to exception.ThrowsErr.
//
// NOTE(review): the code after each exception.ThrowsErr call assumes it does
// not return normally for a non-nil error — confirm against that package.
func GroupField(indexName, field string, size, minCount int, excludeValues []string) []byte {
	if EsStatus != 200 {
		log.Warn("elasticsearch was dead!")
		return nil
	}
	// Treat negative sizes like zero, matching the size <= 0 handling in
	// QueryLogs, so an invalid bucket count is never sent.
	if size <= 0 {
		size = 30
	}

	aggregationQuery := elastic.NewTermsAggregation().Field(field).MinDocCount(minCount).Size(size)
	for _, excludeValue := range excludeValues {
		aggregationQuery.ExcludeValues(excludeValue)
	}

	if log.IsDebug() {
		// Best-effort diagnostics only: a failure to render the query must not
		// prevent the search, so these errors are deliberately ignored.
		source, _ := aggregationQuery.Source()
		queryByte, _ := json.Marshal(source)
		log.Info(fmt.Sprintf("查询条件:%s", string(queryByte)))
	}

	res, err := getRedisClient().Search(indexName).
		Aggregation("group_field", aggregationQuery).
		From(0).
		Size(0).
		Do(context.Background())
	if err != nil {
		exception.ThrowsErr(err)
	}

	bs, err := json.Marshal(res.Aggregations)
	if err != nil {
		exception.ThrowsErr(err)
	}
	return bs
}
// QueryParams describes one condition consumed by QueryLogs.
//
// Name is the field the condition applies to and Value is the value handed to
// the query constructor (converted with util.GetInterfaceToString for regexp
// conditions). Type selects the clause: QueryLogs compares it against the
// NewTermQuery, NewMatchPhraseQuery, NewRegexpQuery constants and their
// ...NotMust counterparts; any other Type is ignored there.
type QueryParams struct {
	Name  string
	Value interface{}
	Type  string
}
// QueryLogs searches indexName with a bool query built from params and
// returns the total hit count together with the decoded page of entries.
//
// Parameters:
//   - params: each entry adds one Must or MustNot clause chosen by its Type;
//     entries whose Type matches none of the known constants are skipped.
//   - sortsDesc: sort fields mapped to true for descending or false for
//     ascending. A map carries no order, so the fields are applied in
//     ascending name order to keep multi-field sorting stable between calls.
//   - from: offset of the first hit; negative values are treated as 0.
//   - size: page size; non-positive values fall back to 10.
//
// Each hit is decoded into a ServerLogData whose Id is set from the hit id,
// then added through util.AppendDeduplicate. When EsStatus is not 200 a
// warning is logged and (0, nil) is returned. Query, search and decoding
// errors are handed to exception.ThrowsErr.
//
// NOTE(review): the code after each exception.ThrowsErr call assumes it does
// not return normally for a non-nil error — confirm against that package.
func QueryLogs(indexName string, params []QueryParams, sortsDesc map[string]bool, from, size int) (total int64, data []ServerLogData) {
	if EsStatus != 200 {
		log.Warn("elasticsearch was dead!")
		return 0, nil
	}

	query := elastic.NewBoolQuery()
	for _, param := range params {
		switch param.Type {
		case NewTermQuery:
			query = query.Must(elastic.NewTermQuery(param.Name, param.Value))
		case NewMatchPhraseQuery:
			query = query.Must(elastic.NewMatchPhraseQuery(param.Name, param.Value))
		case NewRegexpQuery:
			query = query.Must(elastic.NewRegexpQuery(param.Name, util.GetInterfaceToString(param.Value)))
		case NewTermQueryNotMust:
			query = query.MustNot(elastic.NewTermQuery(param.Name, param.Value))
		case NewMatchPhraseQueryNotMust:
			query = query.MustNot(elastic.NewMatchPhraseQuery(param.Name, param.Value))
		case NewRegexpQueryNotMust:
			query = query.MustNot(elastic.NewRegexpQuery(param.Name, util.GetInterfaceToString(param.Value)))
		}
	}

	// Map iteration order is unspecified, and the order of sort clauses decides
	// their priority. Sorting the field names first makes repeated calls (for
	// example consecutive pages) use the same ordering.
	sortFields := make([]string, 0, len(sortsDesc))
	for field := range sortsDesc {
		sortFields = append(sortFields, field)
	}
	sort.Strings(sortFields)

	fieldSorts := make([]elastic.Sorter, 0, len(sortFields))
	for _, field := range sortFields {
		fieldSort := elastic.NewFieldSort(field)
		if sortsDesc[field] {
			fieldSort = fieldSort.Desc()
		}
		fieldSorts = append(fieldSorts, fieldSort)
	}

	if from < 0 {
		from = 0
	}
	if size <= 0 {
		size = 10
	}

	src, err := query.Source()
	if err != nil {
		exception.ThrowsErr(err)
	}
	log.Debugf("查询条件为:%s", util.GetInterfaceToString(src))

	res, err := getRedisClient().Search(indexName).Query(query).
		From(from).
		Size(size).
		SortBy(fieldSorts...).
		Do(context.Background())
	if err != nil {
		exception.ThrowsErr(err)
	}

	for _, hit := range res.Hits.Hits {
		serverLog := ServerLogData{}
		if err := util.ToObject(util.GetInterfaceToString(hit.Source), &serverLog); err != nil {
			exception.ThrowsErr(err)
		}
		// The document id is not part of the source, so copy it from the hit.
		serverLog.Id = hit.Id
		data = util.AppendDeduplicate(data, serverLog)
	}
	return res.TotalHits(), data
}
// IndexInfo is the per-index summary produced by AllIndex from one row of the
// cat-indices response: Name comes from the row's Index, Count from its
// DocsCount and Size from its StoreSize.
type IndexInfo struct {
	Name  string `json:"name"`
	Count int    `json:"count"`
	Size  string `json:"size"`
}
// AllIndex lists the indices reported by the cat-indices API as a
// []IndexInfo ordered by ascending index name.
//
// When EsStatus is not 200 a warning is logged and nil is returned. A request
// error is handed to exception.ThrowsErr.
func AllIndex() interface{} {
	if EsStatus != 200 {
		log.Warn("elasticsearch was dead!")
		return nil
	}

	rows, err := getRedisClient().CatIndices().Do(context.Background())
	if err != nil {
		exception.ThrowsErr(err)
	}

	// Collect the names for ordering and remember each row's summary by name.
	names := make([]string, 0, len(rows))
	infoByName := make(map[string]IndexInfo, len(rows))
	for _, row := range rows {
		names = append(names, row.Index)
		infoByName[row.Index] = IndexInfo{
			Name:  row.Index,
			Count: row.DocsCount,
			Size:  row.StoreSize,
		}
	}
	sort.Strings(names)

	infos := make([]IndexInfo, 0, len(names))
	for _, name := range names {
		infos = append(infos, infoByName[name])
	}
	return infos
}
// DeleteIndex issues a delete-index request for indexName and returns the
// client's response value.
//
// When EsStatus is not 200 a warning is logged and nil is returned. A request
// error is handed to exception.ThrowsErr.
func DeleteIndex(indexName string) interface{} {
	if EsStatus != 200 {
		log.Warn("elasticsearch was dead!")
		return nil
	}

	request := getRedisClient().DeleteIndex(indexName)
	resp, err := request.Do(context.Background())
	if err != nil {
		exception.ThrowsErr(err)
	}
	return resp
}
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。