Ai
3 Star 0 Fork 1

tym_hmm/mongo-tool

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
克隆/下载
csv.go 4.98 KB
一键复制 编辑 原始数据 按行查看 历史
天蝎儿 提交于 2021-11-03 19:30 +08:00 . 修改模块名称
// Copyright (C) MongoDB, Inc. 2014-present.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License. You may obtain
// a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
package mongoimport
import (
	gocsv "encoding/csv"
	"fmt"
	"io"
	"sync"

	"gitee.com/tym_hmm/mongo-tool/mongoimport/csv"
	"go.mongodb.org/mongo-driver/bson"
)
// CSVInputReader implements the InputReader interface for CSV input types.
// It is constructed by NewCSVInputReader. Its column specs are (re)populated by
// ReadAndValidateHeader or ReadAndValidateTypedHeader, and its records are
// decoded into BSON documents by StreamDocument.
type CSVInputReader struct {
// colSpecs is a list of column specifications in the BSON documents to be imported.
// It is replaced whenever a header is read by ReadAndValidateHeader or
// ReadAndValidateTypedHeader.
colSpecs []ColumnSpec
// csvReader is the underlying reader used to read data in from the CSV file.
// NewCSVInputReader configures it to accept a variable number of fields per
// record and to trim leading space from fields.
csvReader *csv.Reader
// csvRejectWriter is where coercion-failed rows are written, if applicable.
// Every CSVConverter produced by StreamDocument shares this single writer.
// encoding/csv.Writer buffers its output until Flush is called.
csvRejectWriter *gocsv.Writer
// csvRecord stores each line of input we read from the underlying reader;
// it is overwritten on every read by the reading goroutine in StreamDocument.
csvRecord []string
// numProcessed tracks the number of CSV records processed by the underlying reader.
// StreamDocument uses its current value as each record's index and increments
// it after each record, and once more before reporting a read error.
numProcessed uint64
// numDecoders is the number of concurrent goroutines to use for decoding; it
// also sizes the record channel buffer in StreamDocument.
numDecoders int
// embedded sizeTracker exposes the Size() method to check the number of bytes read so far
sizeTracker
// ignoreBlanks is whether empty fields should be ignored
ignoreBlanks bool
// useArrayIndexFields is whether field names include array indexes
useArrayIndexFields bool
}
// CSVConverter implements the Converter interface for CSV input.
// StreamDocument creates one CSVConverter value per record read. All converters
// created by the same CSVInputReader share its colSpecs slice and its reject writer.
type CSVConverter struct {
// colSpecs are the column specifications used to convert data.
colSpecs []ColumnSpec
// data holds the raw fields of a single CSV record.
data []string
// index is the record's position among the records streamed by its
// CSVInputReader (taken from numProcessed, which starts at 0).
index uint64
// ignoreBlanks and useArrayIndexFields are copied from the CSVInputReader and
// passed through to tokensToBSON.
ignoreBlanks bool
useArrayIndexFields bool
// rejectWriter receives data when conversion fails with a coercionError.
// It is the same *gocsv.Writer for every converter from one reader.
rejectWriter *gocsv.Writer
}
// NewCSVInputReader builds a CSVInputReader over in. The input is wrapped so that
// a leading byte-order mark is discarded and the number of bytes consumed is
// tracked (exposed through the embedded sizeTracker). Rows rejected during
// conversion are written as CSV to rejects. numDecoders is the number of
// decoding goroutines StreamDocument will use. ignoreBlanks and
// useArrayIndexFields are forwarded to every record conversion.
func NewCSVInputReader(colSpecs []ColumnSpec, in io.Reader, rejects io.Writer, numDecoders int, ignoreBlanks bool, useArrayIndexFields bool) *CSVInputReader {
	tracked := newSizeTrackingReader(newBomDiscardingReader(in))

	parser := csv.NewReader(tracked)
	// Records may have differing numbers of fields; a negative value disables the check.
	parser.FieldsPerRecord = -1
	parser.TrimLeadingSpace = true

	reader := &CSVInputReader{
		colSpecs:            colSpecs,
		csvReader:           parser,
		csvRejectWriter:     gocsv.NewWriter(rejects),
		numDecoders:         numDecoders,
		sizeTracker:         tracked,
		ignoreBlanks:        ignoreBlanks,
		useArrayIndexFields: useArrayIndexFields,
		// numProcessed is left at its zero value: no records have been read yet.
	}
	return reader
}
// ReadAndValidateHeader consumes the next record from the underlying reader and
// uses its fields as the header. It builds column specs from them with
// ParseAutoHeaders, stores the specs on r, and validates the resulting column
// names. It returns the read error or the validation error, if any.
func (r *CSVInputReader) ReadAndValidateHeader() (err error) {
	var header []string
	if header, err = r.csvReader.Read(); err != nil {
		return err
	}
	r.colSpecs = ParseAutoHeaders(header)
	names := ColumnNames(r.colSpecs)
	return validateReaderFields(names, r.useArrayIndexFields)
}
// ReadAndValidateTypedHeader consumes the next record from the underlying reader
// and parses its fields as typed column headers with ParseTypedHeaders, using
// parseGrace. The parsed specs are stored on r (even when parsing reports an
// error), and the resulting column names are then validated. It returns the
// first read, parse, or validation error encountered.
func (r *CSVInputReader) ReadAndValidateTypedHeader(parseGrace ParseGrace) (err error) {
	header, readErr := r.csvReader.Read()
	if readErr != nil {
		return readErr
	}
	specs, parseErr := ParseTypedHeaders(header, parseGrace)
	r.colSpecs = specs
	if parseErr != nil {
		return parseErr
	}
	names := ColumnNames(r.colSpecs)
	return validateReaderFields(names, r.useArrayIndexFields)
}
// StreamDocument reads records from the underlying CSV reader and streams the
// BSON documents decoded from them on readDocs. ordered indicates whether the
// documents should be streamed in read order. It is forwarded, together with
// r.numDecoders, to streamDocuments. Returns a non-nil error if streaming fails.
//
// Two goroutines report on csvErrChan:
//   - the record reader, which reports nil at io.EOF and otherwise a
//     "read error on entry #N" error;
//   - the streamDocuments runner.
//
// The result of channelQuorumError over those two reports is returned.
func (r *CSVInputReader) StreamDocument(ordered bool, readDocs chan bson.D) (retErr error) {
	csvRecordChan := make(chan Converter, r.numDecoders)
	// Buffered for both senders, so each goroutine can deliver its result and
	// exit even if channelQuorumError returns before receiving it.
	// NOTE(review): channelQuorumError is assumed to return early on the first
	// non-nil error. With an unbuffered channel the remaining sender would then
	// block forever and leak.
	csvErrChan := make(chan error, 2)

	// Record reader. Within this file it is the only writer of r.csvRecord and
	// r.numProcessed while streaming. It closes csvRecordChan on EOF or on a
	// read error so that streamDocuments can finish.
	// NOTE(review): if streamDocuments stops receiving from csvRecordChan early,
	// this goroutine can still block on the send below. Fixing that requires a
	// cancellation signal shared with streamDocuments.
	go func() {
		var err error
		for {
			r.csvRecord, err = r.csvReader.Read()
			if err != nil {
				close(csvRecordChan)
				if err == io.EOF {
					csvErrChan <- nil
				} else {
					// Report the failing entry using a 1-based count.
					r.numProcessed++
					csvErrChan <- fmt.Errorf("read error on entry #%v: %v", r.numProcessed, err)
				}
				return
			}
			csvRecordChan <- CSVConverter{
				colSpecs:            r.colSpecs,
				data:                r.csvRecord,
				index:               r.numProcessed,
				ignoreBlanks:        r.ignoreBlanks,
				useArrayIndexFields: r.useArrayIndexFields,
				rejectWriter:        r.csvRejectWriter,
			}
			r.numProcessed++
		}
	}()

	// Decoder runner: converts records to documents and sends them on readDocs.
	go func() {
		csvErrChan <- streamDocuments(ordered, r.numDecoders, csvRecordChan, readDocs)
	}()

	return channelQuorumError(csvErrChan, 2)
}
// Convert implements the Converter interface for CSV input. It turns the record
// held in c into a BSON document with tokensToBSON.
//
// A coercionError is not propagated. Instead, the offending record is handed to
// c.Print, and Convert reports success while returning whatever document
// tokensToBSON produced. Every other error is returned unchanged.
func (c CSVConverter) Convert() (b bson.D, err error) {
	doc, convErr := tokensToBSON(c.colSpecs, c.data, c.index, c.ignoreBlanks, c.useArrayIndexFields)
	if _, isCoercion := convErr.(coercionError); isCoercion {
		c.Print()
		return doc, nil
	}
	return doc, convErr
}
// rejectWriterMu serializes every write made through CSVConverter.Print.
// Convert (and therefore Print) may run on several decoding goroutines at once.
// All CSVConverters created by one CSVInputReader share the same *gocsv.Writer,
// and encoding/csv.Writer is not safe for concurrent use. Rejected rows are
// expected to be rare, so a single package-level lock is sufficient.
var rejectWriterMu sync.Mutex

// Print writes the converter's raw record as one CSV row to c.rejectWriter and
// flushes it immediately. encoding/csv.Writer buffers output until Flush, so
// without the flush a rejected row could be lost. Print is a no-op when no
// reject writer is set.
//
// Reject output is best-effort: write or flush failures are deliberately not
// propagated, so a problem with the reject destination cannot abort the import.
// NOTE(review): assumes the io.Writer wrapped by rejectWriter is non-nil; a
// gocsv.Writer built over a nil io.Writer panics on Flush.
func (c CSVConverter) Print() {
	if c.rejectWriter == nil {
		return
	}
	rejectWriterMu.Lock()
	defer rejectWriterMu.Unlock()
	// Best-effort: the error is intentionally ignored (see doc comment). Any
	// write or flush failure also remains observable via c.rejectWriter.Error().
	_ = c.rejectWriter.Write(c.data)
	c.rejectWriter.Flush()
}
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
1
https://gitee.com/tym_hmm/mongo-tool.git
git@gitee.com:tym_hmm/mongo-tool.git
tym_hmm
mongo-tool
mongo-tool
v1.0.5

搜索帮助