1 Star 1 Fork 0

宇宙蒙面侠X/github.com-olivere-elastic

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
sliced_scroll.go 4.13 KB
一键复制 编辑 原始数据 按行查看 历史
Oliver Eilhard 提交于 2019-03-29 15:07 . Switch doc links to 6.7
// Copyright 2012-present Oliver Eilhard. All rights reserved.
// Use of this source code is governed by a MIT-license.
// See http://olivere.mit-license.org/license.txt for details.
// SlicedScroll illustrates scrolling through a set of documents
// in parallel. It uses the sliced scrolling feature introduced
// in Elasticsearch 5.0 to create a number of Goroutines, each
// scrolling through a slice of the total results. A second goroutine
// receives the hits from the set of goroutines scrolling through
// the slices and simply counts the total number and the number of
// documents received per slice.
//
// The speedup of sliced scrolling can be significant but is very
// dependent on the specific use case.
//
// See https://www.elastic.co/guide/en/elasticsearch/reference/6.7/search-request-scroll.html#sliced-scroll
// for details on sliced scrolling in Elasticsearch.
//
// Example
//
// Scroll with 4 parallel slices through an index called "products".
// Use "_uid" as the default field:
//
// sliced_scroll -index=products -n=4
//
package main
import (
"context"
"flag"
"fmt"
"io"
"log"
"sync"
"sync/atomic"
"time"
"golang.org/x/sync/errgroup"
"github.com/olivere/elastic"
)
func main() {
var (
url = flag.String("url", "http://localhost:9200", "Elasticsearch URL")
index = flag.String("index", "", "Elasticsearch index name")
typ = flag.String("type", "", "Elasticsearch type name")
field = flag.String("field", "", "Slice field (must be numeric)")
numSlices = flag.Int("n", 2, "Number of slices to use in parallel")
sniff = flag.Bool("sniff", true, "Enable or disable sniffing")
)
flag.Parse()
log.SetFlags(0)
if *url == "" {
log.Fatal("missing url parameter")
}
if *index == "" {
log.Fatal("missing index parameter")
}
if *numSlices <= 0 {
log.Fatal("n must be greater than zero")
}
// Create an Elasticsearch client
client, err := elastic.NewClient(elastic.SetURL(*url), elastic.SetSniff(*sniff))
if err != nil {
log.Fatal(err)
}
// Setup a group of goroutines from the excellent errgroup package
g, ctx := errgroup.WithContext(context.TODO())
// Hits channel will be sent to from the first set of goroutines and consumed by the second
type hit struct {
Slice int
Hit elastic.SearchHit
}
hitsc := make(chan hit)
begin := time.Now()
// Start a number of goroutines to parallelize scrolling
var wg sync.WaitGroup
for i := 0; i < *numSlices; i++ {
wg.Add(1)
slice := i
// Prepare the query
var query elastic.Query
if *typ == "" {
query = elastic.NewMatchAllQuery()
} else {
query = elastic.NewTypeQuery(*typ)
}
// Prepare the slice
sliceQuery := elastic.NewSliceQuery().Id(i).Max(*numSlices)
if *field != "" {
sliceQuery = sliceQuery.Field(*field)
}
// Start goroutine for this sliced scroll
g.Go(func() error {
defer wg.Done()
svc := client.Scroll(*index).Query(query).Slice(sliceQuery)
for {
res, err := svc.Do(ctx)
if err == io.EOF {
break
}
if err != nil {
return err
}
for _, searchHit := range res.Hits.Hits {
// Pass the hit to the hits channel, which will be consumed below
select {
case hitsc <- hit{Slice: slice, Hit: *searchHit}:
case <-ctx.Done():
return ctx.Err()
}
}
}
return nil
})
}
go func() {
// Wait until all scrolling is done
wg.Wait()
close(hitsc)
}()
// Second goroutine will consume the hits sent from the workers in first set of goroutines
var total uint64
totals := make([]uint64, *numSlices)
g.Go(func() error {
for hit := range hitsc {
// We simply count the hits here.
atomic.AddUint64(&totals[hit.Slice], 1)
current := atomic.AddUint64(&total, 1)
sec := int(time.Since(begin).Seconds())
fmt.Printf("%8d | %02d:%02d\r", current, sec/60, sec%60)
select {
default:
case <-ctx.Done():
return ctx.Err()
}
}
return nil
})
// Wait until all goroutines are finished
if err := g.Wait(); err != nil {
log.Fatal(err)
}
fmt.Printf("Scrolled through a total of %d documents in %v\n", total, time.Since(begin))
for i := 0; i < *numSlices; i++ {
fmt.Printf("Slice %2d received %d documents\n", i, totals[i])
}
}
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
1
https://gitee.com/awol2010ex/github.com-olivere-elastic.git
git@gitee.com:awol2010ex/github.com-olivere-elastic.git
awol2010ex
github.com-olivere-elastic
github.com-olivere-elastic
v6.2.18

搜索帮助

A270a887 8829481 3d7a4017 8829481