1 Star 0 Fork 0

h79 / goutils

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
es.go 11.91 KB
一键复制 编辑 原始数据 按行查看 历史
huqiuyun 提交于 2024-03-16 19:29 . elastic wrapper
package es
import (
"context"
"encoding/json"
"errors"
"fmt"
"github.com/olivere/elastic/v7"
)
// esNotFoundErr reports whether err should be treated as "no data" rather
// than as a failure: either Elasticsearch answered with a not-found status,
// or the error details carry the reason "all shards failed".
//
// Errors that are not *elastic.Error (network failures, context
// cancellation, a nil error, ...) and elastic errors without details are
// reported as false instead of panicking on a failed type assertion or a nil
// dereference.
func esNotFoundErr(err error) bool {
	if err == nil {
		return false
	}
	if elastic.IsNotFound(err) {
		return true
	}
	var esErr *elastic.Error
	if !errors.As(err, &esErr) || esErr == nil || esErr.Details == nil {
		return false
	}
	return esErr.Details.Reason == "all shards failed"
}
// defaultMapper decodes the Source of a search hit into a *T using
// json.Unmarshal. On a decode failure it returns a nil pointer together with
// the error.
func defaultMapper[T any](sh *elastic.SearchHit) (t *T, err error) {
	if err = json.Unmarshal(sh.Source, &t); err != nil {
		return nil, err
	}
	return t, nil
}
// Search runs the query described by filter and passes every hit of the
// response to valueFunc.
//
// It returns resp.TotalHits() on success. An error classified by
// esNotFoundErr is treated as an empty result and yields (0, nil).
//
// filter must be non-nil and name an index; both are validated before the
// request is built, so an invalid filter never reaches BuildQuery.
// valueFunc may be nil when only the total is of interest.
func Search(ctx context.Context, client *elastic.Client, filter *SearchQuery, valueFunc func(*elastic.SearchHit)) (int64, error) {
	if filter == nil {
		return 0, fmt.Errorf("查询条件不能为空")
	}
	if len(filter.Index) == 0 {
		return 0, fmt.Errorf("索引不能为空")
	}
	service, err := BuildQuery(ctx, client, filter)
	if err != nil {
		return 0, err
	}
	resp, err := service.Do(ctx)
	if err != nil {
		if esNotFoundErr(err) {
			return 0, nil
		}
		return 0, err
	}
	total := resp.TotalHits()
	if total == 0 {
		return 0, nil
	}
	if valueFunc != nil {
		for _, hit := range resp.Hits.Hits {
			valueFunc(hit)
		}
	}
	return total, nil
}
// SearchAny runs the query described by filter and decodes the Source of
// each hit into a T.
//
// It returns the decoded documents, the total reported by Search, and an
// error. Hits that fail to decode are left out of the returned slice; the
// first such failure is returned (wrapped with the hit ID) instead of being
// silently discarded, so callers can tell a short page from undecodable
// data. An error from Search itself takes precedence.
func SearchAny[T any](ctx context.Context, client *elastic.Client, filter *SearchQuery) ([]*T, int64, error) {
	var (
		list   []*T
		mapErr error
	)
	total, err := Search(ctx, client, filter, func(hit *elastic.SearchHit) {
		v, decErr := defaultMapper[T](hit)
		if decErr != nil {
			// Remember only the first failure; keep decoding the rest.
			if mapErr == nil {
				mapErr = fmt.Errorf("decoding hit %q: %w", hit.Id, decErr)
			}
			return
		}
		list = append(list, v)
	})
	if err != nil {
		return list, total, err
	}
	return list, total, mapErr
}
// BatchAdd creates the given documents in index with a single bulk request.
// Each map key is used as the document ID and the value as the document
// body. The request is sent with refresh=true.
//
// An empty datas map is a no-op: the bulk service refuses to send a request
// without actions, so nothing is sent and nil is returned.
//
// The bulk API reports per-document failures inside an otherwise successful
// response; these are surfaced as an error describing the first failed item
// rather than being dropped.
func BatchAdd[T any](ctx context.Context, client *elastic.Client, index string, datas map[string]*T) error {
	if len(datas) == 0 {
		return nil
	}
	bulk := client.Bulk().Index(index)
	for id, doc := range datas {
		bulk.Add(elastic.NewBulkCreateRequest().Id(id).Doc(doc))
	}
	resp, err := bulk.Refresh("true").Do(ctx)
	if err != nil {
		return err
	}
	if resp == nil {
		return nil
	}
	if failed := resp.Failed(); len(failed) > 0 {
		first := failed[0]
		reason := ""
		if first.Error != nil {
			reason = first.Error.Reason
		}
		return fmt.Errorf("bulk create: %d of %d items failed, first id %q (status %d): %s",
			len(failed), len(datas), first.Id, first.Status, reason)
	}
	return nil
}
// BuildQuery turns filter into a ready-to-run search service: it applies the
// bool query produced by filter.ToQuery, the sorters, and paging.
//
// Paging rules:
//   - Size > 0: PageIndex is treated as a 1-based page number (values below
//     1 are normalised to 1 and written back to filter) and the request uses
//     from = (PageIndex-1)*Size.
//   - otherwise: the request size is filter.MaxWindows, which is set to
//     5000000 on filter when it is zero.
//
// NOTE(review): 5000000 is far above Elasticsearch's documented default
// index.max_result_window; presumably the target indices raise that
// setting — confirm.
//
// A nil filter yields an error instead of a nil-pointer panic. ctx is
// accepted for signature compatibility and is not used here.
func BuildQuery(ctx context.Context, client *elastic.Client, filter *SearchQuery) (*elastic.SearchService, error) {
	if filter == nil {
		return nil, errors.New("filter 不能为空")
	}
	// Reuse ToQuery so the bool-query assembly lives in exactly one place.
	service := client.Search().Index(filter.Index).Query(filter.ToQuery())
	if len(filter.Sorters) > 0 {
		service = service.SortBy(filter.Sorters...)
	}
	if filter.Size > 0 {
		if filter.PageIndex < 1 {
			filter.PageIndex = 1
		}
		from := (filter.PageIndex - 1) * filter.Size
		return service.From(from).Size(filter.Size), nil
	}
	if filter.MaxWindows == 0 {
		filter.MaxWindows = 5000000
	}
	return service.Size(filter.MaxWindows), nil
}
// BatchUpdate applies partial-document updates to index with a single bulk
// request. Each map key is the document ID and the value holds the fields to
// merge into that document. The request is sent with refresh=true.
//
// An empty datas map is a no-op: the bulk service refuses to send a request
// without actions, so nothing is sent and nil is returned.
//
// The bulk API reports per-document failures inside an otherwise successful
// response; these are surfaced as an error describing the first failed item
// rather than being dropped.
func BatchUpdate(ctx context.Context, client *elastic.Client, index string, datas map[string]map[string]interface{}) error {
	if len(datas) == 0 {
		return nil
	}
	bulk := client.Bulk().Index(index)
	for id, fields := range datas {
		bulk.Add(elastic.NewBulkUpdateRequest().Id(id).Doc(fields))
	}
	resp, err := bulk.Refresh("true").Do(ctx)
	if err != nil {
		return err
	}
	if resp == nil {
		return nil
	}
	if failed := resp.Failed(); len(failed) > 0 {
		first := failed[0]
		reason := ""
		if first.Error != nil {
			reason = first.Error.Reason
		}
		return fmt.Errorf("bulk update: %d of %d items failed, first id %q (status %d): %s",
			len(failed), len(datas), first.Id, first.Status, reason)
	}
	return nil
}
// UpdateWhere updates documents selected by a query: it sets the given
// fields on every document matching csr, using an update-by-query request
// with refresh=true.
//
// For each key k in data the generated script contains the statement
// ctx._source['k']=params['k'], and data is passed as the script parameters.
// Keys are interpolated into the script source unescaped, so they must be
// trusted field names.
//
// It returns an error when csr is nil or names no index, when the request
// fails, or, with the message "not update", when the response does not
// report at least one updated document.
func UpdateWhere(ctx context.Context, client *elastic.Client, csr *SearchQuery, data map[string]interface{}) error {
	if csr == nil || len(csr.Index) == 0 {
		return errors.New("index 不能为空")
	}
	// Produces e.g. ctx._source['a']=params['a'];ctx._source['b']=params['b']
	script := ""
	for k := range data {
		if script != "" {
			script += ";"
		}
		script += fmt.Sprintf("ctx._source['%s']=params['%s']", k, k)
	}
	esScript := elastic.NewScript(script).Params(data)
	res, err := client.UpdateByQuery(csr.Index).Query(csr.ToQuery()).Script(esScript).Refresh("true").Do(ctx)
	if err != nil {
		return err
	}
	if res.Updated >= 1 && res.Total >= 1 {
		return nil
	}
	return errors.New("not update")
}
// BatchDelete removes the documents with the given ids from index using a
// single bulk request sent with refresh=true.
//
// An empty ids slice is a no-op: the bulk service refuses to send a request
// without actions, so nothing is sent and nil is returned.
//
// Documents that do not exist are not an error: a not-found error for the
// whole request and per-item 404 results are both ignored. Any other
// per-item failure reported inside the bulk response is returned as an error
// describing the first such item.
func BatchDelete(ctx context.Context, client *elastic.Client, index string, ids []string) error {
	if len(ids) == 0 {
		return nil
	}
	bulk := client.Bulk().Index(index)
	for _, id := range ids {
		bulk.Add(elastic.NewBulkDeleteRequest().Id(id))
	}
	resp, err := bulk.Refresh("true").Do(ctx)
	if err != nil {
		if elastic.IsNotFound(err) {
			return nil
		}
		return err
	}
	if resp == nil {
		return nil
	}
	for _, item := range resp.Failed() {
		// 404: the document was already absent, which is the desired state.
		if item == nil || item.Status == 404 {
			continue
		}
		reason := ""
		if item.Error != nil {
			reason = item.Error.Reason
		}
		return fmt.Errorf("bulk delete: id %q failed (status %d): %s", item.Id, item.Status, reason)
	}
	return nil
}
// Agg computes an aggregation: it registers agg under the name "agg" on the
// search built from csr, requests zero hits, and hands the raw aggregation
// result to valueFunc.
//
// It returns nil without calling valueFunc when the request fails with an
// error classified by esNotFoundErr or when the response carries no "agg"
// entry; otherwise it returns whatever valueFunc returns.
func Agg(ctx context.Context, client *elastic.Client, csr *SearchQuery, agg elastic.Aggregation, valueFunc func(json.RawMessage) error) (err error) {
	searchService, err := BuildQuery(ctx, client, csr)
	if err != nil {
		return err
	}
	// Size(0): only the aggregation result is wanted, not the hits.
	searchResult, err := searchService.Size(0).Aggregation("agg", agg).Do(ctx)
	switch {
	case err == nil:
		// fall through to result handling below
	case esNotFoundErr(err):
		return nil
	default:
		return err
	}
	if raw, found := searchResult.Aggregations["agg"]; found {
		return valueFunc(raw)
	}
	return nil
}
// BuildIntTermsQuery builds a terms query on field name matching any of the
// int64 values in list. It returns nil when list is empty.
func BuildIntTermsQuery(name string, list []int64) (query *elastic.TermsQuery) {
	if len(list) == 0 {
		return nil
	}
	// NewTermsQuery is variadic over interface{}, so box each value first.
	values := make([]interface{}, 0, len(list))
	for _, v := range list {
		values = append(values, v)
	}
	return elastic.NewTermsQuery(name, values...)
}
// BuildTermsQuery builds a terms query on field name matching any of the
// values in list. It returns nil when list is empty.
func BuildTermsQuery[T any](name string, list []T) (query *elastic.TermsQuery) {
	n := len(list)
	if n == 0 {
		return nil
	}
	// NewTermsQuery is variadic over interface{}, so box each value first.
	boxed := make([]interface{}, n)
	for i := range list {
		boxed[i] = list[i]
	}
	query = elastic.NewTermsQuery(name, boxed...)
	return query
}
// BuildTermQuery builds a term query matching field name against val.
func BuildTermQuery[T any](name string, val T) (query *elastic.TermQuery) {
	query = elastic.NewTermQuery(name, val)
	return query
}
// BuildExistsQuery builds an exists query for field name.
func BuildExistsQuery(name string) (query *elastic.ExistsQuery) {
	query = elastic.NewExistsQuery(name)
	return query
}
// MustTermsQueryIf appends a terms query on field name to search.MustQuery
// when ok is true and list is non-empty; otherwise search is left untouched.
// It always returns search so calls can be chained.
func MustTermsQueryIf[T any](ok bool, search *SearchQuery, name string, list []T) *SearchQuery {
	if ok && len(list) > 0 {
		terms := BuildTermsQuery(name, list)
		search.MustQuery = append(search.MustQuery, terms)
	}
	return search
}
// BuildWildcardQuery builds a bool query whose should clauses are one
// wildcard query per field in names, each using the pattern "*term*".
func BuildWildcardQuery(names []string, term string) (query *elastic.BoolQuery) {
	clauses := make([]elastic.Query, 0, len(names))
	for _, field := range names {
		pattern := fmt.Sprintf("*%v*", term)
		clauses = append(clauses, elastic.NewWildcardQuery(field, pattern))
	}
	query = elastic.NewBoolQuery().Should(clauses...)
	return query
}
// BuildNestedWildcardQuery wraps a single-field wildcard query (field name,
// pattern "*card*") in a nested query on path.
func BuildNestedWildcardQuery(path string, name string, card string) (query *elastic.NestedQuery) {
	inner := BuildWildcardQuery([]string{name}, card)
	query = elastic.NewNestedQuery(path, inner)
	return query
}
// BuildNestedTermsQuery wraps a terms query on field name in a nested query
// on path. It returns nil when list is empty.
func BuildNestedTermsQuery[T any](path string, name string, list []T) (query *elastic.NestedQuery) {
	if len(list) > 0 {
		inner := BuildTermsQuery(name, list)
		return elastic.NewNestedQuery(path, inner)
	}
	return nil
}
// SortField describes one sort criterion: the field to order by and the
// direction. It is converted into a field sorter by SearchQuery.buildSort.
type SortField struct {
// OrderField is the name of the field to sort on.
OrderField string `json:"order_field"`
// Desc selects descending order when true and ascending order when false.
Desc bool `json:"desc"`
}
// SearchQuery collects the parts of a bool query together with the target
// index, sorters and paging options. The clause slices are combined by
// ToQuery; the remaining fields are consumed by BuildQuery.
type SearchQuery struct {
// Index is the name of the index to query.
Index string
// MustQuery holds the must clauses of the bool query.
MustQuery []elastic.Query
// MustNotQuery holds the must_not clauses of the bool query.
MustNotQuery []elastic.Query
// ShouldQuery holds the should clauses of the bool query.
ShouldQuery []elastic.Query
// Filters holds the filter clauses of the bool query.
Filters []elastic.Query
// Sorters are applied to the search in order when non-empty.
Sorters []elastic.Sorter
// PageIndex is the 1-based page number used when Size > 0; BuildQuery
// rewrites values below 1 to 1.
PageIndex int
// Size is the page size. When it is not positive, BuildQuery requests
// MaxWindows results instead of paging.
Size int
// MaxWindows is the result size used when Size is not positive; BuildQuery
// sets it to 5000000 when it is zero.
MaxWindows int
}
// ToQuery assembles the must, must_not, should and filter clauses of cs into
// a single bool query.
//
// When there are should clauses but no must and no must_not clauses,
// minimum_should_match is set to "1" so that at least one should clause has
// to match.
func (cs *SearchQuery) ToQuery() elastic.Query {
	q := elastic.NewBoolQuery().
		Must(cs.MustQuery...).
		MustNot(cs.MustNotQuery...).
		Should(cs.ShouldQuery...).
		Filter(cs.Filters...)

	hasShould := len(cs.ShouldQuery) > 0
	hasMustOrMustNot := len(cs.MustQuery) != 0 || len(cs.MustNotQuery) != 0
	if hasShould && !hasMustOrMustNot {
		q.MinimumShouldMatch("1")
	}
	return q
}
// FilterDeleteQuery excludes deleted documents by appending the filter
// clause is_delete == 0. It returns cs so calls can be chained.
func (cs *SearchQuery) FilterDeleteQuery() *SearchQuery {
	notDeleted := elastic.NewTermQuery("is_delete", 0)
	if cs.Filters == nil {
		cs.Filters = make([]elastic.Query, 0)
	}
	cs.Filters = append(cs.Filters, notDeleted)
	return cs
}
// FilterDeleteQueryIf appends the filter clause is_delete == 0 when
// filterDel is true. In either case a nil Filters slice is replaced by an
// empty one. It returns cs so calls can be chained.
func (cs *SearchQuery) FilterDeleteQueryIf(filterDel bool) *SearchQuery {
	if cs.Filters == nil {
		cs.Filters = make([]elastic.Query, 0)
	}
	if filterDel {
		notDeleted := elastic.NewTermQuery("is_delete", 0)
		cs.Filters = append(cs.Filters, notDeleted)
	}
	return cs
}
// MustWildcardQueryIf appends a must clause matching field name against the
// wildcard pattern "*term*" when ok is true. It returns cs for chaining.
func (cs *SearchQuery) MustWildcardQueryIf(ok bool, name string, term string) *SearchQuery {
	if ok {
		pattern := fmt.Sprintf("*%v*", term)
		cs.MustQuery = append(cs.MustQuery, elastic.NewWildcardQuery(name, pattern))
	}
	return cs
}
// MustTermQueryIf appends a must clause requiring field name to equal term
// when ok is true. It returns cs for chaining.
func (cs *SearchQuery) MustTermQueryIf(ok bool, name string, term interface{}) *SearchQuery {
	if ok {
		clause := elastic.NewTermQuery(name, term)
		cs.MustQuery = append(cs.MustQuery, clause)
	}
	return cs
}
// MustNotTermQueryIf appends a must_not clause excluding documents whose
// field name equals term when ok is true. It returns cs for chaining.
func (cs *SearchQuery) MustNotTermQueryIf(ok bool, name string, term interface{}) *SearchQuery {
	if ok {
		clause := elastic.NewTermQuery(name, term)
		cs.MustNotQuery = append(cs.MustNotQuery, clause)
	}
	return cs
}
// MustBitScriptQueryIf adds a bitwise test as a must clause: when ok is
// true, it appends a script query that matches documents for which
// (doc[name].value & val) == val, i.e. every bit set in val is also set in
// the field.
//
// The clause is appended to MustQuery. Appending to MustNotQuery and storing
// the result in MustQuery would discard the existing must clauses and copy
// the must_not clauses into must. It returns cs for chaining.
func (cs *SearchQuery) MustBitScriptQueryIf(ok bool, name string, val int32) *SearchQuery {
	if !ok {
		return cs
	}
	script := elastic.NewScript(fmt.Sprintf("(doc['%v'].value&%d)==%d", name, val, val))
	cs.MustQuery = append(cs.MustQuery, elastic.NewScriptQuery(script))
	return cs
}
// TryMustTermsQuery appends a must clause matching field name against any
// of the strings in list. An empty list leaves cs unchanged. It returns cs
// for chaining.
func (cs *SearchQuery) TryMustTermsQuery(name string, list []string) *SearchQuery {
	if len(list) > 0 {
		terms := BuildTermsQuery(name, list)
		cs.MustQuery = append(cs.MustQuery, terms)
	}
	return cs
}
// MustTermsQueryIf is the string-slice method form of the package-level
// MustTermsQueryIf: it delegates to that function with cs as the target.
func (cs *SearchQuery) MustTermsQueryIf(ok bool, name string, list []string) *SearchQuery {
	return MustTermsQueryIf[string](ok, cs, name, list)
}
// MustTermsUint8QueryIf is the []uint8 method form of the package-level
// MustTermsQueryIf: it delegates to that function with cs as the target.
func (cs *SearchQuery) MustTermsUint8QueryIf(ok bool, name string, list []uint8) *SearchQuery {
	return MustTermsQueryIf[uint8](ok, cs, name, list)
}
// MustTermsQueryInt32If is the []int32 method form of the package-level
// MustTermsQueryIf: it delegates to that function with cs as the target.
func (cs *SearchQuery) MustTermsQueryInt32If(ok bool, name string, list []int32) *SearchQuery {
	return MustTermsQueryIf[int32](ok, cs, name, list)
}
// MustTermsQueryIntIf is the []int method form of the package-level
// MustTermsQueryIf: it delegates to that function with cs as the target.
func (cs *SearchQuery) MustTermsQueryIntIf(ok bool, name string, list []int) *SearchQuery {
	return MustTermsQueryIf[int](ok, cs, name, list)
}
// TryMustNotTermsQuery appends a must_not clause excluding documents whose
// field name matches any of the strings in list. An empty list leaves cs
// unchanged. It returns cs for chaining.
func (cs *SearchQuery) TryMustNotTermsQuery(name string, list []string) *SearchQuery {
	if len(list) > 0 {
		terms := BuildTermsQuery(name, list)
		cs.MustNotQuery = append(cs.MustNotQuery, terms)
	}
	return cs
}
// TryMustIntTermsQuery appends a must clause matching field name against
// any of the int64 values in list. An empty list leaves cs unchanged. It
// returns cs for chaining.
func (cs *SearchQuery) TryMustIntTermsQuery(name string, list []int64) *SearchQuery {
	if len(list) > 0 {
		terms := BuildIntTermsQuery(name, list)
		cs.MustQuery = append(cs.MustQuery, terms)
	}
	return cs
}
// ExistsQueryIf appends a must clause requiring field name to exist when ok
// is true. It returns cs for chaining.
func (cs *SearchQuery) ExistsQueryIf(ok bool, name string) *SearchQuery {
	if ok {
		exists := BuildExistsQuery(name)
		cs.MustQuery = append(cs.MustQuery, exists)
	}
	return cs
}
// TryMustNestedStringTermsQuery appends a must clause holding a nested terms
// query, e.g. path "to" with name "to.id". An empty list leaves cs
// unchanged. It returns cs for chaining.
func (cs *SearchQuery) TryMustNestedStringTermsQuery(path string, name string, list []string) *SearchQuery {
	if len(list) > 0 {
		nested := BuildNestedTermsQuery(path, name, list)
		cs.MustQuery = append(cs.MustQuery, nested)
	}
	return cs
}
// TryMustNestedWildcardQuery appends a must clause holding a nested wildcard
// query, e.g. path "to" with name "to.id", using card as the search term.
// The clause is added unconditionally. It returns cs for chaining.
func (cs *SearchQuery) TryMustNestedWildcardQuery(path string, name string, card string) *SearchQuery {
	nested := BuildNestedWildcardQuery(path, name, card)
	cs.MustQuery = append(cs.MustQuery, nested)
	return cs
}
// TryMustNestedInt32TermsQuery appends a must clause holding a nested terms
// query over int32 values, e.g. path "to" with name "to.id". An empty list
// leaves cs unchanged. It returns cs for chaining.
func (cs *SearchQuery) TryMustNestedInt32TermsQuery(path string, name string, list []int32) *SearchQuery {
	if len(list) > 0 {
		nested := BuildNestedTermsQuery(path, name, list)
		cs.MustQuery = append(cs.MustQuery, nested)
	}
	return cs
}
// TryMustRangeQuery appends a must clause restricting field name to an
// inclusive range. A bound is applied only when it is positive: from > 0
// adds ">= from", to > 0 adds "<= to". When neither bound is positive, cs is
// left unchanged. It returns cs for chaining.
func (cs *SearchQuery) TryMustRangeQuery(name string, from int64, to int64) *SearchQuery {
	hasLower, hasUpper := from > 0, to > 0
	if !hasLower && !hasUpper {
		return cs
	}
	rng := elastic.NewRangeQuery(name)
	if hasLower {
		rng = rng.Gte(from)
	}
	if hasUpper {
		rng = rng.Lte(to)
	}
	cs.MustQuery = append(cs.MustQuery, rng)
	return cs
}
// MustRangeFromQueryIf appends a one-sided range clause on field name to the
// must clauses when ok is true. optType selects the comparison against from:
//
//	1  field >  from
//	2  field >= from
//	3  field <  from
//	4  field <= from
//
// Any other optType leaves cs unchanged instead of appending a range clause
// that carries no bound. It returns cs for chaining.
func (cs *SearchQuery) MustRangeFromQueryIf(ok bool, name string, from int64, optType int64) *SearchQuery {
	if !ok {
		return cs
	}
	q := elastic.NewRangeQuery(name)
	switch optType {
	case 1:
		q = q.Gt(from)
	case 2:
		q = q.Gte(from)
	case 3:
		q = q.Lt(from)
	case 4:
		q = q.Lte(from)
	default:
		// Unknown operator: there is no meaningful bound to apply.
		return cs
	}
	cs.MustQuery = append(cs.MustQuery, q)
	return cs
}
// MustWildcardOrQueryIf performs a multi-field fuzzy match: when ok is true,
// it appends one must clause that is a bool query whose should clauses are a
// wildcard query "*term*" per field in names, which ORs the fields together.
// It returns cs for chaining.
func (cs *SearchQuery) MustWildcardOrQueryIf(ok bool, names []string, term string) *SearchQuery {
	if ok {
		alternatives := make([]elastic.Query, 0, len(names))
		for _, field := range names {
			pattern := fmt.Sprintf("*%v*", term)
			alternatives = append(alternatives, elastic.NewWildcardQuery(field, pattern))
		}
		anyField := elastic.NewBoolQuery().Should(alternatives...)
		cs.MustQuery = append(cs.MustQuery, anyField)
	}
	return cs
}
// TrySort adds one sorter per entry of list, in order. When list is empty
// the default dft is used instead, and when dft is nil as well, cs is left
// unchanged. It returns cs for chaining.
func (cs *SearchQuery) TrySort(list []*SortField, dft *SortField) *SearchQuery {
	if len(list) > 0 {
		for i := range list {
			cs.buildSort(list[i])
		}
		return cs
	}
	if dft != nil {
		return cs.buildSort(dft)
	}
	return cs
}
// buildSort appends a field sorter for field to cs.Sorters, ascending unless
// field.Desc is set. A nil field is ignored. It returns cs for chaining.
func (cs *SearchQuery) buildSort(field *SortField) *SearchQuery {
	if field != nil {
		ascending := !field.Desc
		sorter := elastic.NewFieldSort(field.OrderField).Order(ascending)
		cs.Sorters = append(cs.Sorters, sorter)
	}
	return cs
}
Go
1
https://gitee.com/h79/goutils.git
git@gitee.com:h79/goutils.git
h79
goutils
goutils
v1.20.65

搜索帮助