bulk_insert.go
// Copyright 2012-present Oliver Eilhard. All rights reserved.
// Use of this source code is governed by a MIT-license.
// See http://olivere.mit-license.org/license.txt for details.

// BulkInsert illustrates how to bulk insert documents into Elasticsearch.
//
// It uses two goroutines to do so. The first creates a simple document
// and sends it to the second via a channel. The second goroutine collects
// those documents, creates a bulk request that is added to a Bulk service
// and committed to Elasticsearch after reaching a number of documents.
// The number of documents after which a commit happens can be specified
// via the "bulk-size" flag.
//
// See https://www.elastic.co/guide/en/elasticsearch/reference/5.2/docs-bulk.html
// for details on the Bulk API in Elasticsearch.
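//
// Under the hood, each commit sends a newline-delimited body in which an
// action line is followed by the document source, roughly like this
// (the values below are placeholders):
//
//   {"index":{"_index":"warehouse","_type":"product","_id":"..."}}
//   {"id":"...","@timestamp":"2017-02-10T14:58:00Z"}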
//
// Example
//
// Bulk index 100,000 documents into the index "warehouse", type "product",
// committing every set of 1,000 documents.
//
// bulk_insert -index=warehouse -type=product -n=100000 -bulk-size=1000
//
package main

import (
	"encoding/base64"
	"errors"
	"flag"
	"fmt"
	"log"
	"math/rand"
	"sync/atomic"
	"time"

	"golang.org/x/net/context"
	"golang.org/x/sync/errgroup"
	"gopkg.in/olivere/elastic.v5"
)
func main() {
var (
url = flag.String("url", "http://localhost:9200", "Elasticsearch URL")
index = flag.String("index", "", "Elasticsearch index name")
typ = flag.String("type", "", "Elasticsearch type name")
sniff = flag.Bool("sniff", true, "Enable or disable sniffing")
n = flag.Int("n", 0, "Number of documents to bulk insert")
bulkSize = flag.Int("bulk-size", 0, "Number of documents to collect before committing")
)
flag.Parse()
log.SetFlags(0)
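// Seed math/rand so the generated document IDs differ between runs.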
rand.Seed(time.Now().UnixNano())
if *url == "" {
log.Fatal("missing url parameter")
}
if *index == "" {
log.Fatal("missing index parameter")
}
if *typ == "" {
log.Fatal("missing type parameter")
}
if *n <= 0 {
log.Fatal("n must be a positive number")
}
if *bulkSize <= 0 {
log.Fatal("bulk-size must be a positive number")
}
// Create an Elasticsearch client
client, err := elastic.NewClient(elastic.SetURL(*url), elastic.SetSniff(*sniff))
if err != nil {
log.Fatal(err)
}
// Setup a group of goroutines from the excellent errgroup package
g, ctx := errgroup.WithContext(context.TODO())
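// The derived ctx is canceled as soon as either goroutine returns an error,
// so the other goroutine can stop early via ctx.Done().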
// The first goroutine will emit documents and send them to the second
// goroutine via the docsc channel.
// The second goroutine will simply bulk insert the documents.
type doc struct {
ID string `json:"id"`
Timestamp time.Time `json:"@timestamp"`
}
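// The channel is unbuffered, so the producer blocks until the consumer
// has picked up each document.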
docsc := make(chan doc)
begin := time.Now()
// Goroutine to create documents
g.Go(func() error {
defer close(docsc)
buf := make([]byte, 32)
for i := 0; i < *n; i++ {
// Generate a random ID
_, err := rand.Read(buf)
if err != nil {
return err
}
id := base64.URLEncoding.EncodeToString(buf)
// Construct the document
d := doc{
ID: id,
Timestamp: time.Now(),
}
// Send over to 2nd goroutine, or cancel
select {
case docsc <- d:
case <-ctx.Done():
return ctx.Err()
}
}
return nil
})
// Second goroutine will consume the documents sent from the first and bulk insert into ES
var total uint64
g.Go(func() error {
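// Index and Type set the defaults used for every request added to this bulk service.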
bulk := client.Bulk().Index(*index).Type(*typ)
for d := range docsc {
// Simple progress
current := atomic.AddUint64(&total, 1)
dur := time.Since(begin).Seconds()
sec := int(dur)
pps := int64(float64(current) / dur)
fmt.Printf("%10d | %6d req/s | %02d:%02d\r", current, pps, sec/60, sec%60)
// Enqueue the document
bulk.Add(elastic.NewBulkIndexRequest().Id(d.ID).Doc(d))
if bulk.NumberOfActions() >= *bulkSize {
// Commit
res, err := bulk.Do(ctx)
if err != nil {
return err
}
if res.Errors {
// Look up the failed documents with res.Failed(), and e.g. recommit
return errors.New("bulk commit failed")
}
// "bulk" is reset after Do, so you can reuse it
}
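// Check for early termination without blocking; the default case keeps the loop going.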
select {
default:
case <-ctx.Done():
return ctx.Err()
}
}
// Commit the final batch before exiting
if bulk.NumberOfActions() > 0 {
_, err = bulk.Do(ctx)
if err != nil {
return err
}
}
return nil
})
// Wait until all goroutines are finished
if err := g.Wait(); err != nil {
log.Fatal(err)
}
// Final results
dur := time.Since(begin).Seconds()
sec := int(dur)
pps := int64(float64(total) / dur)
fmt.Printf("%10d | %6d req/s | %02d:%02d\n", total, pps, sec/60, sec%60)
}