Fetch the repository succeeded.
package db
import (
"context"
"crypto/tls"
"fmt"
"net"
"net/http"
"strings"
"time"
commonconfig "gitee.com/h79/goutils/common/config"
"gitee.com/h79/goutils/common/logger"
commonoption "gitee.com/h79/goutils/common/option"
"gitee.com/h79/goutils/common/result"
commontls "gitee.com/h79/goutils/common/tls"
"gitee.com/h79/goutils/dao/config"
"gitee.com/h79/goutils/dao/log"
"gitee.com/h79/goutils/dao/option"
"gitee.com/h79/goutils/dao/util"
"github.com/olivere/elastic/v7"
)
type esClient struct {
Client *elastic.Client
Version string
}
var _ EsDatabase = (*elasticGroup)(nil)
type elasticGroup struct {
selectName string
esMap map[string]*esClient
}
func WithEsClientOption(f func(cfg *config.Elastic) []elastic.ClientOptionFunc) commonoption.Option {
return esClientFunc(f)
}
type esClientFunc func(cfg *config.Elastic) []elastic.ClientOptionFunc
func (t esClientFunc) String() string {
return "es:client"
}
func (t esClientFunc) Type() int { return option.EsClientOpt }
func (t esClientFunc) Value() interface{} { return t }
func esClientFuncExist(opts ...commonoption.Option) esClientFunc {
if r, ok := commonoption.Exist(option.EsClientOpt, opts...); ok {
return r.Value().(esClientFunc)
}
return nil
}
func UseEsClientOption(cnf *config.Elastic, opts ...commonoption.Option) []elastic.ClientOptionFunc {
fn := esClientFuncExist(opts...)
if fn != nil {
return fn(cnf)
}
return []elastic.ClientOptionFunc{}
}
func NewES(conf []config.Elastic, opts ...commonoption.Option) (EsDatabase, error) {
logger.I("Elastic", "config= %#v", conf)
conns := &elasticGroup{esMap: map[string]*esClient{}}
for _, cfg := range conf {
err := conns.Create(cfg, opts...)
if err != nil {
return nil, err
}
}
return conns, nil
}
func esSecret(cfg *config.Elastic, master int, opts ...commonoption.Option) {
sec := option.UseSecret(cfg.Name, master, opts...)
if !sec.HasValid() {
return
}
if sec.User != "" {
cfg.User = sec.User
}
if sec.Pwd != "" {
cfg.Pwd = sec.Pwd
}
}
var esTlsFunc = func(cnf *commontls.Tls) (*tls.Config, error) {
if cnf.Key == "false" || cnf.Key == "" {
return nil, nil
}
if strings.EqualFold(cnf.Key, "skip-verify") {
return &tls.Config{InsecureSkipVerify: true}, nil
}
return NewTlsConfig(cnf)
}
func (conns *elasticGroup) Create(cfg config.Elastic, opts ...commonoption.Option) error {
if len(cfg.Host) <= 0 {
return fmt.Errorf("elastic host is empty")
}
esSecret(&cfg, 0, opts...)
interval := cfg.Interval
if interval <= 0 {
interval = 10
} else if interval > 3600 {
interval = 3600
}
interval = interval * time.Second
url := cfg.Host[0]
errLog := &log.EsLogger{
LogLevel: logger.ErrorLevel,
Level: cfg.Logger.LogLevel,
}
log.UseEsLogger(errLog, opts...)
infoLog := &log.EsLogger{
LogLevel: logger.InfoLevel,
Level: cfg.Logger.LogLevel,
}
log.UseEsLogger(infoLog, opts...)
traceLog := &log.EsLogger{
LogLevel: logger.DebugLevel,
Level: cfg.Logger.LogLevel,
}
log.UseEsLogger(traceLog, opts...)
fn := tlsFuncExist(opts...)
if fn == nil {
fn = esTlsFunc
}
tlsCfg, err := fn(&cfg.Tls)
if err != nil {
return err
}
dia := &net.Dialer{
Timeout: 30 * time.Second,
KeepAlive: 30 * time.Second,
}
httpClient := &http.Client{
Transport: &http.Transport{
Proxy: http.ProxyFromEnvironment,
DialContext: dia.DialContext,
ForceAttemptHTTP2: true,
MaxIdleConns: 100,
IdleConnTimeout: 90 * time.Second,
TLSHandshakeTimeout: 10 * time.Second,
ExpectContinueTimeout: 1 * time.Second,
TLSClientConfig: tlsCfg,
},
}
var options = []elastic.ClientOptionFunc{
elastic.SetURL(cfg.Host...),
elastic.SetBasicAuth(cfg.User, cfg.Pwd),
elastic.SetHealthcheckInterval(interval),
elastic.SetMaxRetries(cfg.MaxRetries),
elastic.SetSniff(cfg.Sniff),
elastic.SetErrorLog(errLog),
elastic.SetInfoLog(infoLog),
elastic.SetTraceLog(traceLog),
elastic.SetHttpClient(httpClient),
}
options = append(options, UseEsClientOption(&cfg, opts...)...)
client, err := elastic.NewClient(options...)
if err != nil {
util.Alarm(result.ErrEsClientInternal, "", fmt.Sprintf("New Elastic(%s)", cfg.Name), err)
return err
}
_, _, err = client.Ping(url).Do(context.Background())
if err != nil {
util.Alarm(result.ErrEsPingInternal, "", fmt.Sprintf("Ping Elastic(%s)", cfg.Name), err)
return err
}
ver, err := client.ElasticsearchVersion(url)
logger.I("Elastic", "Name= %s,Version= %s", cfg.Name, ver)
conns.esMap[cfg.Name] = &esClient{Client: client, Version: cfg.Version}
if commonconfig.RegisterConfig != nil {
commonconfig.RegisterConfig("ES:Err|"+cfg.Name, errLog.HandlerConfig)
commonconfig.RegisterConfig("ES:Info|"+cfg.Name, infoLog.HandlerConfig)
commonconfig.RegisterConfig("ES:Trace|"+cfg.Name, traceLog.HandlerConfig)
}
return nil
}
func (conns *elasticGroup) Get(name string) (*elastic.Client, string, error) {
if len(name) == 0 {
name = conns.selectName
}
es, exists := conns.esMap[name]
if exists == true {
return es.Client, es.Version, nil
}
return nil, "", result.Error(result.ErrNotFound, "Not found")
}
func (conns *elasticGroup) Do(name string, call func(es *elastic.Client, version string) error) error {
if len(name) == 0 {
name = conns.selectName
}
if es, exists := conns.esMap[name]; exists == true {
return call(es.Client, es.Version)
}
return result.Error(result.ErrNotFound, "Not found")
}
func (conns *elasticGroup) Close(name string) error {
if len(name) == 0 {
name = conns.selectName
}
es, exists := conns.esMap[name]
if exists == true {
es.Client.Stop()
delete(conns.esMap, name)
}
return nil
}
func (conns *elasticGroup) CloseAll() {
for _, es := range conns.esMap {
es.Client.Stop()
}
conns.esMap = nil
}
func (conns *elasticGroup) Select(name string) {
conns.selectName = name
}
func (conns *elasticGroup) GetSelector() string {
return conns.selectName
}
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。