Fetch the repository succeeded.
package component
import (
"bytes"
"context"
"crypto/tls"
"encoding/json"
"fmt"
"gitee.com/kzangv/gsf-fof/component/define"
"gitee.com/kzangv/gsf-fof/gsf"
"gitee.com/kzangv/gsf-fof/logger"
"github.com/elastic/go-elasticsearch/v8"
"github.com/urfave/cli/v2"
"net"
"net/http"
"net/url"
"strings"
"time"
)
// EsApiLog is a logging hook for Elasticsearch API calls. It is invoked
// before a request is sent with the operation name (for example "_search"),
// the target index and the request body, and it returns a function that the
// caller runs once the request has finished.
type EsApiLog func(op, idx, body string) func()
// EsClient bundles an Elasticsearch client with its per-instance
// configuration and a reference to the owning ElasticSearch component.
type EsClient struct {
// DB is the underlying go-elasticsearch client; it is populated by Load and
// reset to nil when Load fails.
DB *elasticsearch.Client
// Cfg holds this client's own settings (address, credentials, idle
// connection count).
Cfg define.EntityEs
// ref points at the owning component; it is set by Init and supplies the
// shared timeout settings and the logger.
ref *ElasticSearch
// IndexSuffix is appended to every index name before a search. Init sets it
// to "_test" for the local and debug environments.
IndexSuffix string
}
// Init binds the client to its owning component and, for the local and debug
// environments, makes every index name resolve to its "_test" variant.
func (c *EsClient) Init(env int, cc *ElasticSearch) {
	c.ref = cc
	if env == gsf.EnvLocal || env == gsf.EnvDebug {
		c.IndexSuffix = "_test"
	}
}
// Config returns a pointer to the client's own configuration, so callers can
// read or modify it in place.
func (c *EsClient) Config() *define.EntityEs {
	cfg := &c.Cfg
	return cfg
}
// Ref returns the owning ElasticSearch component that was supplied to Init
// (nil if Init has not been called yet).
func (c *EsClient) Ref() *ElasticSearch {
	owner := c.ref
	return owner
}
// CliFlags returns the command-line flags that configure this client. The
// flag names are derived from name:
//
//   - es-<name>-max-cnt: maximum number of idle connections.
//   - es-<name>-dsn: connection string, either "host" or
//     "user:password@host". User and password must be URL-query-escaped, so
//     that ':' and '@' inside them do not clash with the separators.
//
// Each flag's Action writes the parsed value into c.Cfg. The DSN action
// returns an error (and leaves c.Cfg untouched) when the DSN is malformed or
// the credentials cannot be unescaped.
func (c *EsClient) CliFlags(name string) []cli.Flag {
	return []cli.Flag{
		&cli.IntFlag{
			Name:  fmt.Sprintf("es-%s-max-cnt", name),
			Usage: fmt.Sprintf("es (%s) idle max count", name),
			Action: func(_ *cli.Context, i int) error {
				c.Cfg.MaxIdleCnt = i
				return nil
			},
		},
		&cli.StringFlag{
			Name:  fmt.Sprintf("es-%s-dsn", name),
			Usage: fmt.Sprintf("es(%s) DSN (format: `{{user}}:{{password}}@{{host}}`)", name),
			Action: func(_ *cli.Context, val string) error {
				dsn := strings.Split(val, "@")
				switch len(dsn) {
				case 1:
					// No credentials: the whole value is the address.
					c.Cfg.Addr = dsn[0]
					return nil
				case 2:
					auth := strings.Split(dsn[0], ":")
					if len(auth) != 2 {
						return fmt.Errorf("ElasticSearch DSN is invalid(%s)", val)
					}
					user, err := url.QueryUnescape(auth[0])
					if err != nil {
						return fmt.Errorf("ElasticSearch decode user is faild(%s), err-msg: %w", val, err)
					}
					password, err := url.QueryUnescape(auth[1])
					if err != nil {
						return fmt.Errorf("ElasticSearch decode user is faild(%s), err-msg: %w", val, err)
					}
					// dsn[0] is "user:password"; the address is the part after '@'.
					c.Cfg.Addr = dsn[1]
					c.Cfg.User, c.Cfg.Password = user, password
					return nil
				default:
					return fmt.Errorf("ElasticSearch DSN is invalid(%s)", val)
				}
			},
		},
	}
}
// Load builds the Elasticsearch client from c.Cfg (address, credentials, idle
// connection count) and the owning component's shared timeout settings, and
// stores it in c.DB. name is only used to label the error message.
//
// Init must have been called first, because the timeouts are read through the
// owning component; if it was not, Load returns an error instead of
// panicking. On any failure c.DB is nil and the returned error wraps the
// underlying cause.
func (c *EsClient) Load(name string) (err error) {
	if c.ref == nil {
		c.DB = nil
		return fmt.Errorf("Load ElasticSearch (%s) Failed (Error: %s) ", name, "client is not initialized (Init was not called)")
	}

	connTimeout := time.Duration(c.ref.Cfg.MaxConnTime) * time.Second
	dialer := &net.Dialer{
		Timeout:   connTimeout,
		KeepAlive: time.Duration(c.ref.Cfg.KeepAliveTime) * time.Second,
	}

	db, newErr := elasticsearch.NewClient(elasticsearch.Config{
		Addresses: []string{c.Cfg.Addr},
		Username:  c.Cfg.User,
		Password:  c.Cfg.Password,
		Transport: &http.Transport{
			// NOTE(review): certificate verification is disabled, which allows
			// man-in-the-middle attacks on TLS connections. Confirm this is
			// intended before using it against production clusters.
			TLSClientConfig:     &tls.Config{InsecureSkipVerify: true},
			DisableCompression:  true,
			DialContext:         dialer.DialContext,
			MaxIdleConns:        c.Cfg.MaxIdleCnt,
			MaxIdleConnsPerHost: c.Cfg.MaxIdleCnt,
			IdleConnTimeout:     time.Duration(c.ref.Cfg.MaxIdleTime) * time.Second,
			// The TLS handshake gets half of the connection time budget.
			TLSHandshakeTimeout:   connTimeout / 2,
			ExpectContinueTimeout: time.Second,
		},
	})
	if newErr != nil {
		c.DB = nil
		// %w keeps the original error reachable through errors.Is / errors.As.
		return fmt.Errorf("Load ElasticSearch (%s) Failed (Error: %w) ", name, newErr)
	}
	c.DB = db
	return nil
}
// Search runs a search request against idx with the given JSON body, using
// the client's own ApiLog as the logging hook. It is a convenience wrapper
// around SearchLog and returns exactly what SearchLog returns.
func (c *EsClient) Search(idx string, body string, receive interface{}) (cnt uint64, err error) {
	cnt, err = c.SearchLog(c.ApiLog, idx, body, receive)
	return cnt, err
}
// SearchLog runs a search request and decodes the matching documents.
//
// l is called before the request with the operation name "_search", the
// effective index name and the body; the function it returns runs when
// SearchLog returns. idx is the base index name, to which c.IndexSuffix is
// appended. body is the raw request body. receive is installed as the "hits"
// target of the response envelope, so the decoder fills it in; presumably it
// should be a pointer to a slice of the document wrapper type (confirm
// against callers).
//
// On HTTP 200 it returns the total hit count reported by Elasticsearch. On
// any other status it returns an error carrying the reason from the response
// body, or the status code when the body contains no reason. Transport and
// decoding errors are returned as they are.
func (c *EsClient) SearchLog(l EsApiLog, idx string, body string, receive interface{}) (cnt uint64, err error) {
	db, idx := c.DB, idx+c.IndexSuffix
	defer l("_search", idx, body)()

	resp, err := db.Search(
		db.Search.WithIndex(idx),
		db.Search.WithBody(bytes.NewBufferString(body)),
		db.Search.WithContext(context.Background()),
	)
	if err != nil {
		return 0, err
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		raw := define.EsRespError{}
		if err = json.NewDecoder(resp.Body).Decode(&raw); err != nil {
			return 0, err
		}
		if raw.Error.Reason == "" {
			// Never hand back an error with an empty message.
			return 0, fmt.Errorf("elasticsearch search failed with status %d", resp.StatusCode)
		}
		// The reason is server-provided text, so it must not be used as the
		// format string itself: a '%' in it would be misread as a verb.
		return 0, fmt.Errorf("%s", raw.Error.Reason)
	}

	data := define.EsRespSearch{}
	data.Hits.Hits = receive
	if err = json.NewDecoder(resp.Body).Decode(&data); err != nil {
		return 0, err
	}
	return data.Hits.Total.Value, nil
}
// ApiLog is the default EsApiLog hook. It records the time at which it is
// called and returns a function that, when run, measures the elapsed time in
// milliseconds and logs the call through the owning component's logger: as a
// warning when the elapsed time exceeds the configured slow threshold,
// otherwise as an info line, and then only if the logger level is exactly
// logger.Info.
func (c *EsClient) ApiLog(op, idx, body string) func() {
	startedAt := time.Now()
	return func() {
		// Dividing by time.Millisecond leaves a Duration whose integer value
		// is a count of milliseconds, which is what the "%d ms" format prints.
		elapsedMs := time.Since(startedAt) / time.Millisecond
		slowLimit := time.Duration(c.ref.Cfg.SlowThreshold)
		switch {
		case elapsedMs > slowLimit: // query exceeded the slow threshold
			c.ref.Log.WarnForce(" ES[%d ms]: %s/%s -- %s", elapsedMs, idx, op, body)
		case c.ref.Log.Level() == logger.Info:
			c.ref.Log.InfoForce(" ES[%d ms]: %s/%s -- %s", elapsedMs, idx, op, body)
		}
	}
}
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。