1 Star 1 Fork 0

宇宙蒙面侠X/github.com-olivere-elastic

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
backoff
cluster-test
config
uritemplates
.gitignore
.travis.yml
CHANGELOG-3.0.md
CONTRIBUTING.md
CONTRIBUTORS
ISSUE_TEMPLATE.md
LICENSE
README.md
bulk.go
bulk_delete_request.go
bulk_delete_request_test.go
bulk_index_request.go
bulk_index_request_test.go
bulk_processor.go
bulk_processor_test.go
bulk_request.go
bulk_test.go
bulk_update_request.go
bulk_update_request_test.go
canonicalize.go
canonicalize_test.go
clear_scroll.go
clear_scroll_test.go
client.go
client_test.go
cluster_health.go
cluster_health_test.go
cluster_state.go
cluster_state_test.go
cluster_stats.go
cluster_stats_test.go
connection.go
count.go
count_test.go
decoder.go
decoder_test.go
delete.go
delete_by_query.go
delete_by_query_test.go
delete_template.go
delete_template_test.go
delete_test.go
doc.go
errors.go
errors_test.go
example_test.go
exists.go
exists_test.go
explain.go
explain_test.go
fetch_source_context.go
fetch_source_context_test.go
field_stats.go
field_stats_test.go
geo_point.go
geo_point_test.go
get.go
get_template.go
get_template_test.go
get_test.go
highlight.go
highlight_test.go
index.go
index_test.go
indices_analyze.go
indices_analyze_test.go
indices_close.go
indices_close_test.go
indices_create.go
indices_create_test.go
indices_delete.go
indices_delete_template.go
indices_delete_test.go
indices_delete_warmer.go
indices_delete_warmer_test.go
indices_exists.go
indices_exists_template.go
indices_exists_template_test.go
indices_exists_test.go
indices_exists_type.go
indices_exists_type_test.go
indices_flush.go
indices_flush_test.go
indices_forcemerge.go
indices_forcemerge_test.go
indices_get.go
indices_get_aliases.go
indices_get_aliases_test.go
indices_get_mapping.go
indices_get_mapping_test.go
indices_get_settings.go
indices_get_settings_test.go
indices_get_template.go
indices_get_template_test.go
indices_get_test.go
indices_get_warmer.go
indices_get_warmer_test.go
indices_open.go
indices_open_test.go
indices_put_alias.go
indices_put_alias_test.go
indices_put_mapping.go
indices_put_mapping_test.go
indices_put_settings.go
indices_put_settings_test.go
indices_put_template.go
indices_put_warmer.go
indices_put_warmer_test.go
indices_refresh.go
indices_refresh_test.go
indices_stats.go
indices_stats_test.go
inner_hit.go
inner_hit_test.go
logger.go
mget.go
mget_test.go
msearch.go
msearch_test.go
mtermvectors.go
mtermvectors_test.go
nodes_info.go
nodes_info_test.go
nodes_stats.go
nodes_stats_test.go
optimize.go
optimize_test.go
percolate.go
percolate_test.go
ping.go
ping_test.go
plugins.go
plugins_test.go
query.go
reindex.go
reindex_test.go
reindexer.go
reindexer_test.go
request.go
request_test.go
rescore.go
rescorer.go
response.go
run-es.sh
scan.go
scan_test.go
script.go
script_test.go
scroll.go
scroll_test.go
search.go
search_aggs.go
search_aggs_bucket_children.go
search_aggs_bucket_children_test.go
search_aggs_bucket_date_histogram.go
search_aggs_bucket_date_histogram_test.go
search_aggs_bucket_date_range.go
search_aggs_bucket_date_range_test.go
search_aggs_bucket_filter.go
search_aggs_bucket_filter_test.go
search_aggs_bucket_filters.go
search_aggs_bucket_filters_test.go
search_aggs_bucket_geo_distance.go
search_aggs_bucket_geo_distance_test.go
search_aggs_bucket_geohash_grid.go
search_aggs_bucket_geohash_grid_test.go
search_aggs_bucket_global.go
search_aggs_bucket_global_test.go
search_aggs_bucket_histogram.go
search_aggs_bucket_histogram_test.go
search_aggs_bucket_missing.go
search_aggs_bucket_missing_test.go
search_aggs_bucket_nested.go
search_aggs_bucket_nested_test.go
search_aggs_bucket_range.go
search_aggs_bucket_range_test.go
search_aggs_bucket_reverse_nested.go
search_aggs_bucket_reverse_nested_test.go
search_aggs_bucket_sampler.go
search_aggs_bucket_sampler_test.go
search_aggs_bucket_significant_terms.go
search_aggs_bucket_significant_terms_test.go
search_aggs_bucket_terms.go
search_aggs_bucket_terms_test.go
search_aggs_metrics_avg.go
search_aggs_metrics_avg_test.go
search_aggs_metrics_cardinality.go
search_aggs_metrics_cardinality_test.go
search_aggs_metrics_extended_stats.go
search_aggs_metrics_extended_stats_test.go
search_aggs_metrics_geo_bounds.go
search_aggs_metrics_geo_bounds_test.go
search_aggs_metrics_max.go
search_aggs_metrics_max_test.go
search_aggs_metrics_min.go
search_aggs_metrics_min_test.go
search_aggs_metrics_percentile_ranks.go
search_aggs_metrics_percentile_ranks_test.go
search_aggs_metrics_percentiles.go
search_aggs_metrics_percentiles_test.go
search_aggs_metrics_stats.go
search_aggs_metrics_stats_test.go
search_aggs_metrics_sum.go
search_aggs_metrics_sum_test.go
search_aggs_metrics_top_hits.go
search_aggs_metrics_top_hits_test.go
search_aggs_metrics_value_count.go
search_aggs_metrics_value_count_test.go
search_aggs_pipeline_avg_bucket.go
search_aggs_pipeline_avg_bucket_test.go
search_aggs_pipeline_bucket_script.go
search_aggs_pipeline_bucket_script_test.go
search_aggs_pipeline_bucket_selector.go
search_aggs_pipeline_bucket_selector_test.go
search_aggs_pipeline_cumulative_sum.go
search_aggs_pipeline_cumulative_sum_test.go
search_aggs_pipeline_derivative.go
search_aggs_pipeline_derivative_test.go
search_aggs_pipeline_max_bucket.go
search_aggs_pipeline_max_bucket_test.go
search_aggs_pipeline_min_bucket.go
search_aggs_pipeline_min_bucket_test.go
search_aggs_pipeline_mov_avg.go
search_aggs_pipeline_mov_avg_test.go
search_aggs_pipeline_serial_diff.go
search_aggs_pipeline_serial_diff_test.go
search_aggs_pipeline_stats_bucket.go
search_aggs_pipeline_stats_bucket_test.go
search_aggs_pipeline_sum_bucket.go
search_aggs_pipeline_sum_bucket_test.go
search_aggs_pipeline_test.go
search_aggs_test.go
search_queries_bool.go
search_queries_bool_test.go
search_queries_boosting.go
search_queries_boosting_test.go
search_queries_common_terms.go
search_queries_common_terms_test.go
search_queries_constant_score.go
search_queries_constant_score_test.go
search_queries_dis_max.go
search_queries_dis_max_test.go
search_queries_exists.go
search_queries_exists_test.go
search_queries_fsq.go
search_queries_fsq_score_funcs.go
search_queries_fsq_test.go
search_queries_fuzzy.go
search_queries_fuzzy_test.go
search_queries_geo_bounding_box.go
search_queries_geo_bounding_box_test.go
search_queries_geo_distance.go
search_queries_geo_distance_test.go
search_queries_geo_polygon.go
search_queries_geo_polygon_test.go
search_queries_has_child.go
search_queries_has_child_test.go
search_queries_has_parent.go
search_queries_has_parent_test.go
search_queries_ids.go
search_queries_ids_test.go
search_queries_indices.go
search_queries_indices_test.go
search_queries_match.go
search_queries_match_all.go
search_queries_match_all_test.go
search_queries_match_test.go
search_queries_missing.go
search_queries_missing_test.go
search_queries_more_like_this.go
search_queries_more_like_this_test.go
search_queries_multi_match.go
search_queries_multi_match_test.go
search_queries_nested.go
search_queries_nested_test.go
search_queries_not.go
search_queries_not_test.go
search_queries_prefix.go
search_queries_prefix_test.go
search_queries_query_string.go
search_queries_query_string_test.go
search_queries_range.go
search_queries_range_test.go
search_queries_raw_string.go
search_queries_raw_string_test.go
search_queries_regexp.go
search_queries_regexp_test.go
search_queries_script.go
search_queries_script_test.go
search_queries_simple_query_string.go
search_queries_simple_query_string_test.go
search_queries_template_query.go
search_queries_template_query_test.go
search_queries_term.go
search_queries_term_test.go
search_queries_terms.go
search_queries_terms_test.go
search_queries_type.go
search_queries_type_test.go
search_queries_wildcard.go
search_queries_wildcard_test.go
search_request.go
search_request_test.go
search_source.go
search_source_test.go
search_suggester_test.go
search_template.go
search_templates_test.go
search_test.go
setup_test.go
sort.go
sort_test.go
suggest.go
suggest_field.go
suggest_field_test.go
suggest_test.go
suggester.go
suggester_completion.go
suggester_completion_fuzzy.go
suggester_completion_fuzzy_test.go
suggester_completion_test.go
suggester_context.go
suggester_context_category.go
suggester_context_category_test.go
suggester_context_geo.go
suggester_context_geo_test.go
suggester_phrase.go
suggester_phrase_test.go
suggester_term.go
suggester_term_test.go
tasks_cancel.go
tasks_cancel_test.go
tasks_list.go
tasks_list_test.go
termvectors.go
termvectors_test.go
update.go
update_by_query.go
update_by_query_test.go
update_test.go
克隆/下载
bulk_processor.go 15.56 KB
一键复制 编辑 原始数据 按行查看 历史
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541
// Copyright 2012-2016 Oliver Eilhard. All rights reserved.
// Use of this source code is governed by a MIT-license.
// See http://olivere.mit-license.org/license.txt for details.
package elastic
import (
"sync"
"sync/atomic"
"time"
"gopkg.in/olivere/elastic.v3/backoff"
)
// BulkProcessorService allows to easily process bulk requests. It allows setting
// policies when to flush new bulk requests, e.g. based on a number of actions,
// on the size of the actions, and/or to flush periodically. It also allows
// to control the number of concurrent bulk requests allowed to be executed
// in parallel.
//
// BulkProcessorService, by default, commits either every 1000 requests or when the
// (estimated) size of the bulk requests exceeds 5 MB. However, it does not
// commit periodically. BulkProcessorService also does retry by default, using
// an exponential backoff algorithm.
//
// The caller is responsible for setting the index and type on every
// bulk request added to BulkProcessorService.
//
// BulkProcessorService takes ideas from the BulkProcessor of the
// Elasticsearch Java API as documented in
// https://www.elastic.co/guide/en/elasticsearch/client/java-api/current/java-docs-bulk-processor.html.
//
// BulkProcessorService is a builder: configure it via the chainable setters
// below, then call Do to obtain a running BulkProcessor.
type BulkProcessorService struct {
	c              *Client        // Elasticsearch client used for all commits
	beforeFn       BulkBeforeFunc // called before each commit (optional)
	afterFn        BulkAfterFunc  // called after each commit, on success and failure (optional)
	name           string         // name of processor
	numWorkers     int            // # of workers (>= 1)
	bulkActions    int            // # of requests after which to commit
	bulkSize       int            // # of bytes after which to commit
	flushInterval  time.Duration  // periodic flush interval
	wantStats      bool           // indicates whether to gather statistics
	initialTimeout time.Duration  // initial wait time before retry on errors
	maxTimeout     time.Duration  // max time to wait for retry on errors
}
// NewBulkProcessorService returns a builder for a BulkProcessor using the
// given client. Defaults: one worker, commit after 1000 actions or 5 MB,
// retry starting at 200ms and backing off up to 10s, no periodic flush.
func NewBulkProcessorService(client *Client) *BulkProcessorService {
	return &BulkProcessorService{
		c:              client,
		numWorkers:     1,
		bulkActions:    1000,
		bulkSize:       5 << 20, // 5 MB
		initialTimeout: 200 * time.Millisecond,
		maxTimeout:     10 * time.Second,
	}
}
// BulkBeforeFunc defines the signature of callbacks that are executed
// before a commit to Elasticsearch. The executionId is unique per commit
// attempt and requests holds the batch about to be sent.
type BulkBeforeFunc func(executionId int64, requests []BulkableRequest)

// BulkAfterFunc defines the signature of callbacks that are executed
// after a commit to Elasticsearch. The err parameter signals an error;
// response may be nil when the commit failed.
type BulkAfterFunc func(executionId int64, requests []BulkableRequest, response *BulkResponse, err error)
// Before registers a callback that is invoked before bulk requests get
// committed to Elasticsearch. It returns the service for chaining.
func (s *BulkProcessorService) Before(fn BulkBeforeFunc) *BulkProcessorService {
	s.beforeFn = fn
	return s
}
// After registers a callback that is invoked once bulk requests have been
// committed to Elasticsearch. The callback runs on both successful commits
// and failures. It returns the service for chaining.
func (s *BulkProcessorService) After(fn BulkAfterFunc) *BulkProcessorService {
	s.afterFn = fn
	return s
}
// Name sets an optional identifier for this bulk processor; it appears in
// error log messages. It returns the service for chaining.
func (s *BulkProcessorService) Name(name string) *BulkProcessorService {
	s.name = name
	return s
}
// Workers sets the number of concurrent workers committing bulk requests.
// Defaults to 1; values below 1 are clamped to 1 when the processor starts.
// It returns the service for chaining.
func (s *BulkProcessorService) Workers(num int) *BulkProcessorService {
	s.numWorkers = num
	return s
}
// BulkActions sets the number of queued actions that triggers a commit.
// Defaults to 1000; pass -1 to disable the action-count trigger.
// It returns the service for chaining.
func (s *BulkProcessorService) BulkActions(bulkActions int) *BulkProcessorService {
	s.bulkActions = bulkActions
	return s
}
// BulkSize sets the estimated payload size (in bytes) that triggers a commit.
// Defaults to 5 MB; pass -1 to disable the size trigger.
// It returns the service for chaining.
func (s *BulkProcessorService) BulkSize(bulkSize int) *BulkProcessorService {
	s.bulkSize = bulkSize
	return s
}
// FlushInterval enables a periodic flush of outstanding requests at the end
// of each interval. Disabled by default (zero interval). To operate purely
// time-driven, set both BulkActions and BulkSize to -1 and choose a
// meaningful interval here. It returns the service for chaining.
func (s *BulkProcessorService) FlushInterval(interval time.Duration) *BulkProcessorService {
	s.flushInterval = interval
	return s
}
// Stats enables or disables statistics gathering while the processor runs.
// Disabled by default; retrieve gathered numbers via BulkProcessor.Stats.
// It returns the service for chaining.
func (s *BulkProcessorService) Stats(wantStats bool) *BulkProcessorService {
	s.wantStats = wantStats
	return s
}
// Do creates a new BulkProcessor from the configured settings and starts it.
// The returned BulkProcessor is a running instance that accepts bulk
// requests and commits them to Elasticsearch, spreading the work across one
// or more workers.
//
// You can interoperate with the BulkProcessor returned by Do, e.g. Start and
// Stop (or Close) it.
//
// Each call to Do yields a fresh BulkProcessor; BulkProcessorService is just
// a builder, so calling Do repeatedly is usually not what you want.
func (s *BulkProcessorService) Do() (*BulkProcessor, error) {
	processor := newBulkProcessor(
		s.c,
		s.beforeFn,
		s.afterFn,
		s.name,
		s.numWorkers,
		s.bulkActions,
		s.bulkSize,
		s.flushInterval,
		s.wantStats,
		s.initialTimeout,
		s.maxTimeout)
	if err := processor.Start(); err != nil {
		return nil, err
	}
	return processor, nil
}
// -- Bulk Processor Statistics --

// BulkProcessorStats contains various statistics of a bulk processor
// while it is running. Use the Stats func to return it while running.
// Counters are only populated when stats gathering was enabled via
// BulkProcessorService.Stats(true).
type BulkProcessorStats struct {
	Flushed   int64 // number of times the flush interval has been invoked
	Committed int64 // # of times workers committed bulk requests
	Indexed   int64 // # of requests indexed
	Created   int64 // # of requests that ES reported as creates (201)
	Updated   int64 // # of requests that ES reported as updates
	Deleted   int64 // # of requests that ES reported as deletes
	Succeeded int64 // # of requests that ES reported as successful
	Failed    int64 // # of requests that ES reported as failed

	Workers []*BulkProcessorWorkerStats // stats for each worker
}
// BulkProcessorWorkerStats represents per-worker statistics.
type BulkProcessorWorkerStats struct {
	Queued       int64         // # of requests queued in this worker
	LastDuration time.Duration // duration of last commit
}
// newBulkProcessorStats returns a zeroed BulkProcessorStats with one
// per-worker stats slot allocated for each of the given workers.
func newBulkProcessorStats(workers int) *BulkProcessorStats {
	stats := &BulkProcessorStats{
		Workers: make([]*BulkProcessorWorkerStats, workers),
	}
	for i := range stats.Workers {
		stats.Workers[i] = &BulkProcessorWorkerStats{}
	}
	return stats
}
// dup returns a deep copy of the statistics, including per-worker entries,
// so callers can read a consistent snapshot without holding any lock.
func (st *BulkProcessorStats) dup() *BulkProcessorStats {
	copied := &BulkProcessorStats{
		Flushed:   st.Flushed,
		Committed: st.Committed,
		Indexed:   st.Indexed,
		Created:   st.Created,
		Updated:   st.Updated,
		Deleted:   st.Deleted,
		Succeeded: st.Succeeded,
		Failed:    st.Failed,
	}
	for _, w := range st.Workers {
		copied.Workers = append(copied.Workers, w.dup())
	}
	return copied
}
// dup returns a copy of the per-worker statistics. The struct contains only
// value fields, so a shallow copy is a complete copy.
func (st *BulkProcessorWorkerStats) dup() *BulkProcessorWorkerStats {
	clone := *st
	return &clone
}
// -- Bulk Processor --

// BulkProcessor encapsulates a task that accepts bulk requests and
// orchestrates committing them to Elasticsearch via one or more workers.
//
// BulkProcessor is returned by setting up a BulkProcessorService and
// calling the Do method.
type BulkProcessor struct {
	c           *Client
	beforeFn    BulkBeforeFunc // invoked before each commit
	afterFn     BulkAfterFunc  // invoked after each commit (success or failure)
	name        string
	bulkActions int
	bulkSize    int
	numWorkers  int
	executionId int64                // incremented atomically per commit; passed to callbacks
	requestsC   chan BulkableRequest // unbuffered; Add sends, workers receive; closed by Close
	workerWg    sync.WaitGroup       // waits for all workers to drain on Close
	workers     []*bulkWorker
	flushInterval  time.Duration
	flusherStopC   chan struct{} // stop/ack handshake channel for the flusher goroutine; nil when flushing is disabled
	wantStats      bool
	initialTimeout time.Duration // initial wait time before retry on errors
	maxTimeout     time.Duration // max time to wait for retry on errors

	startedMu sync.Mutex // guards the following block
	started   bool

	statsMu sync.Mutex // guards the following block
	stats   *BulkProcessorStats
}
// newBulkProcessor assembles a BulkProcessor from the settings collected by
// BulkProcessorService. It does not start any goroutines; call Start for that.
func newBulkProcessor(
	client *Client,
	beforeFn BulkBeforeFunc,
	afterFn BulkAfterFunc,
	name string,
	numWorkers int,
	bulkActions int,
	bulkSize int,
	flushInterval time.Duration,
	wantStats bool,
	initialTimeout time.Duration,
	maxTimeout time.Duration) *BulkProcessor {
	p := &BulkProcessor{
		c:              client,
		beforeFn:       beforeFn,
		afterFn:        afterFn,
		name:           name,
		numWorkers:     numWorkers,
		bulkActions:    bulkActions,
		bulkSize:       bulkSize,
		flushInterval:  flushInterval,
		wantStats:      wantStats,
		initialTimeout: initialTimeout,
		maxTimeout:     maxTimeout,
	}
	return p
}
// Start starts the bulk processor. If the processor is already started,
// nil is returned.
//
// Start spawns numWorkers worker goroutines plus, when a flush interval is
// configured, one flusher goroutine. All of them are stopped by Close.
func (p *BulkProcessor) Start() error {
	p.startedMu.Lock()
	defer p.startedMu.Unlock()

	if p.started {
		return nil
	}

	// We must have at least one worker.
	if p.numWorkers < 1 {
		p.numWorkers = 1
	}

	// Fresh channel and stats on every (re)start so a stopped processor
	// can be started again.
	p.requestsC = make(chan BulkableRequest)
	p.executionId = 0
	p.stats = newBulkProcessorStats(p.numWorkers)

	// Create and start up workers.
	p.workers = make([]*bulkWorker, p.numWorkers)
	for i := 0; i < p.numWorkers; i++ {
		p.workerWg.Add(1)
		p.workers[i] = newBulkWorker(p, i)
		go p.workers[i].work()
	}

	// Start the ticker for flush (if enabled)
	if int64(p.flushInterval) > 0 {
		p.flusherStopC = make(chan struct{})
		go p.flusher(p.flushInterval)
	}

	p.started = true

	return nil
}
// Stop is an alias for Close.
func (p *BulkProcessor) Stop() error {
	return p.Close()
}
// Close stops the bulk processor previously started with Do.
// If it is already stopped, this is a no-op and nil is returned.
//
// By implementing Close, BulkProcessor implements the io.Closer interface.
func (p *BulkProcessor) Close() error {
	p.startedMu.Lock()
	defer p.startedMu.Unlock()

	// Already stopped? Do nothing.
	if !p.started {
		return nil
	}

	// Stop flusher (if enabled): send the stop signal, wait for the
	// flusher's acknowledgement on the same channel, then tear it down.
	// The order matters — the flusher echoes back before returning.
	if p.flusherStopC != nil {
		p.flusherStopC <- struct{}{}
		<-p.flusherStopC
		close(p.flusherStopC)
		p.flusherStopC = nil
	}

	// Stop all workers: closing requestsC makes each worker commit its
	// outstanding requests and exit; Wait blocks until they are all done.
	close(p.requestsC)
	p.workerWg.Wait()

	p.started = false

	return nil
}
// Stats returns a snapshot of the bulk processor statistics.
// Collecting stats must be enabled first by calling Stats(true) on
// the service that created this processor. The snapshot is a deep copy,
// taken under the stats lock, so the caller may inspect it freely.
func (p *BulkProcessor) Stats() BulkProcessorStats {
	p.statsMu.Lock()
	snapshot := p.stats.dup()
	p.statsMu.Unlock()
	return *snapshot
}
// Add adds a single request to commit by the BulkProcessorService.
//
// The caller is responsible for setting the index and type on the request.
//
// Add blocks until a worker picks the request up (requestsC is unbuffered).
// NOTE(review): calling Add after Close sends on a closed channel and will
// panic — confirm callers never add after stopping the processor.
func (p *BulkProcessor) Add(request BulkableRequest) {
	p.requestsC <- request
}
// Flush manually asks all workers to commit their outstanding requests.
// It returns only when all workers acknowledge completion.
//
// Always returns nil; the error result exists for interface symmetry.
func (p *BulkProcessor) Flush() error {
	p.statsMu.Lock()
	p.stats.Flushed++
	p.statsMu.Unlock()

	// Signal each worker in turn and wait for its ack before moving on,
	// so workers flush sequentially, not concurrently.
	for _, w := range p.workers {
		w.flushC <- struct{}{}
		<-w.flushAckC // wait for completion
	}

	return nil
}
// flusher is a single goroutine that periodically asks all workers to
// commit their outstanding bulk requests. It is only started if
// FlushInterval is greater than 0.
func (p *BulkProcessor) flusher(interval time.Duration) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	for {
		select {
		case <-ticker.C: // Periodic flush
			p.Flush() // TODO swallow errors here?

		case <-p.flusherStopC:
			// Echo the stop signal back so Close knows we are done,
			// then exit. Close owns closing the channel afterwards.
			p.flusherStopC <- struct{}{}
			return
		}
	}
}
// -- Bulk Worker --

// bulkWorker encapsulates a single worker, running in a goroutine,
// receiving bulk requests and eventually committing them to Elasticsearch.
// It is strongly bound to a BulkProcessor.
type bulkWorker struct {
	p           *BulkProcessor
	i           int           // worker index, used to address its slot in stats.Workers
	bulkActions int           // commit threshold by action count (copied from processor)
	bulkSize    int           // commit threshold by byte size (copied from processor)
	service     *BulkService  // accumulates requests between commits
	flushC      chan struct{} // receives manual flush signals from BulkProcessor.Flush
	flushAckC   chan struct{} // acknowledges completed flushes back to Flush
}
// newBulkWorker builds worker number i for processor p, wiring up its own
// BulkService and the flush signal/ack channel pair.
func newBulkWorker(p *BulkProcessor, i int) *bulkWorker {
	w := &bulkWorker{
		p:           p,
		i:           i,
		bulkActions: p.bulkActions,
		bulkSize:    p.bulkSize,
		service:     NewBulkService(p.c),
		flushC:      make(chan struct{}),
		flushAckC:   make(chan struct{}),
	}
	return w
}
// work waits for bulk requests and manual flush calls on the respective
// channels and is invoked as a goroutine when the bulk processor is started.
//
// It exits when the processor closes requestsC, committing any leftover
// requests first.
func (w *bulkWorker) work() {
	defer func() {
		w.p.workerWg.Done()
		// NOTE(review): flushC is closed here on the receiver side while
		// Flush sends on it; a Flush racing with Close could panic on a
		// send to a closed channel — confirm Flush is never called
		// concurrently with Close.
		close(w.flushAckC)
		close(w.flushC)
	}()

	var stop bool
	for !stop {
		select {
		case req, open := <-w.p.requestsC:
			if open {
				// Received a new request
				w.service.Add(req)
				if w.commitRequired() {
					w.commit() // TODO swallow errors here?
				}
			} else {
				// Channel closed: Stop. Commit whatever is left.
				stop = true
				if w.service.NumberOfActions() > 0 {
					w.commit() // TODO swallow errors here?
				}
			}

		case <-w.flushC:
			// Commit outstanding requests, then acknowledge so Flush
			// can proceed to the next worker.
			if w.service.NumberOfActions() > 0 {
				w.commit() // TODO swallow errors here?
			}
			w.flushAckC <- struct{}{}
		}
	}
}
// commit commits the bulk requests in the given service,
// invoking callbacks as specified.
//
// Failed commits are retried with exponential backoff between
// p.initialTimeout and p.maxTimeout; the error of the final attempt is
// returned and also handed to the after callback.
func (w *bulkWorker) commit() error {
	var res *BulkResponse

	// commitFunc will commit bulk requests and, on failure, be retried
	// via exponential backoff. res is captured so the last response is
	// visible after the retry loop.
	commitFunc := func() error {
		var err error
		res, err = w.service.Do()
		return err
	}
	// notifyFunc will be called if retry fails
	notifyFunc := func(err error, d time.Duration) {
		w.p.c.errorf("elastic: bulk processor %q failed but will retry in %v: %v", w.p.name, d, err)
	}

	id := atomic.AddInt64(&w.p.executionId, 1)

	// Update # documents in queue before eventual retries
	w.p.statsMu.Lock()
	if w.p.wantStats {
		w.p.stats.Workers[w.i].Queued = int64(len(w.service.requests))
	}
	w.p.statsMu.Unlock()

	// Save requests because they will be reset in commitFunc
	reqs := w.service.requests

	// Invoke before callback
	if w.p.beforeFn != nil {
		w.p.beforeFn(id, reqs)
	}

	// Commit bulk requests
	policy := backoff.NewExponentialBackoff(w.p.initialTimeout, w.p.maxTimeout).SendStop(true)
	err := backoff.RetryNotify(commitFunc, policy, notifyFunc)
	w.updateStats(res)
	if err != nil {
		w.p.c.errorf("elastic: bulk processor %q failed: %v", w.p.name, err)
	}

	// Invoke after callback, on success and on failure alike.
	if w.p.afterFn != nil {
		w.p.afterFn(id, reqs, res, err)
	}

	return err
}
// updateStats folds the outcome of a single bulk commit into the
// processor-wide statistics and this worker's per-worker slot.
// It is a no-op when res is nil (commit failed without a response) or
// when stats gathering is disabled.
//
// Note: the original code re-checked res != nil inside a branch that was
// already guarded by res != nil; the dead duplicate check has been removed.
func (w *bulkWorker) updateStats(res *BulkResponse) {
	if res == nil {
		return
	}
	w.p.statsMu.Lock()
	defer w.p.statsMu.Unlock()
	if !w.p.wantStats {
		return
	}
	w.p.stats.Committed++
	w.p.stats.Indexed += int64(len(res.Indexed()))
	w.p.stats.Created += int64(len(res.Created()))
	w.p.stats.Updated += int64(len(res.Updated()))
	w.p.stats.Deleted += int64(len(res.Deleted()))
	w.p.stats.Succeeded += int64(len(res.Succeeded()))
	w.p.stats.Failed += int64(len(res.Failed()))
	w.p.stats.Workers[w.i].Queued = int64(len(w.service.requests))
	w.p.stats.Workers[w.i].LastDuration = time.Duration(int64(res.Took)) * time.Millisecond
}
// commitRequired reports whether the accumulated requests must be committed,
// either because the number of queued actions or the estimated payload size
// has reached the thresholds configured on the BulkProcessorService.
// A negative threshold disables the corresponding trigger. Short-circuit
// evaluation preserves the original's behavior of skipping the size check
// when the action check already fires.
func (w *bulkWorker) commitRequired() bool {
	return (w.bulkActions >= 0 && w.service.NumberOfActions() >= w.bulkActions) ||
		(w.bulkSize >= 0 && w.service.EstimatedSizeInBytes() >= int64(w.bulkSize))
}
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
1
https://gitee.com/awol2010ex/github.com-olivere-elastic.git
git@gitee.com:awol2010ex/github.com-olivere-elastic.git
awol2010ex
github.com-olivere-elastic
github.com-olivere-elastic
v3.0.62

搜索帮助