1 Star 0 Fork 0

青文杰 / mongo-go-driver

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
collection.go 30.05 KB
一键复制 编辑 原始数据 按行查看 历史
12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
7127812791280128112821283128412851286128712881289129012911292129312941295129612971298
// Copyright (C) MongoDB, Inc. 2017-present.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License. You may obtain
// a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
package mongo
import (
"context"
"errors"
"strings"
"github.com/mongodb/mongo-go-driver/bson/bsoncodec"
"github.com/mongodb/mongo-go-driver/mongo/options"
"github.com/mongodb/mongo-go-driver/mongo/readconcern"
"github.com/mongodb/mongo-go-driver/mongo/readpref"
"github.com/mongodb/mongo-go-driver/mongo/writeconcern"
"github.com/mongodb/mongo-go-driver/x/bsonx"
"github.com/mongodb/mongo-go-driver/x/mongo/driver"
"github.com/mongodb/mongo-go-driver/x/mongo/driver/session"
"github.com/mongodb/mongo-go-driver/x/network/command"
"github.com/mongodb/mongo-go-driver/x/network/description"
)
// Collection performs operations on a given collection.
//
// A Collection is built by newCollection from a parent Database. Its read
// concern, write concern, read preference and registry start out as the
// database's values and may be overridden through CollectionOptions.
type Collection struct {
// client is the Client taken from the parent database.
client *Client
// db is the database that contains this collection.
db *Database
// name is the collection's name; namespace() pairs it with db.name.
name string
// readConcern is sent with read commands unless the session reports a
// transaction in progress.
readConcern *readconcern.ReadConcern
// writeConcern is sent with write commands unless the session reports a
// running transaction.
writeConcern *writeconcern.WriteConcern
// readPreference is attached to read commands and feeds readSelector.
readPreference *readpref.ReadPref
// readSelector composes the read-preference selector with the client's
// local-threshold latency selector.
readSelector description.ServerSelector
// writeSelector composes the write selector with the client's
// local-threshold latency selector.
writeSelector description.ServerSelector
// registry is the BSON codec registry used to transform caller-supplied
// documents and to build cursors and single results.
registry *bsoncodec.Registry
}
// newCollection creates the Collection called name within db.
//
// Read concern, write concern, read preference and registry are inherited
// from db; any of them that is set in the merged opts replaces the inherited
// value. The read selector combines the effective read preference with the
// client's local latency threshold, and the write selector combines the write
// selector with that same threshold.
func newCollection(db *Database, name string, opts ...*options.CollectionOptions) *Collection {
	merged := options.MergeCollectionOptions(opts...)

	// Start from the database defaults, then apply explicit overrides.
	coll := &Collection{
		client:         db.client,
		db:             db,
		name:           name,
		readConcern:    db.readConcern,
		writeConcern:   db.writeConcern,
		readPreference: db.readPreference,
		registry:       db.registry,
	}
	if merged.ReadConcern != nil {
		coll.readConcern = merged.ReadConcern
	}
	if merged.WriteConcern != nil {
		coll.writeConcern = merged.WriteConcern
	}
	if merged.ReadPreference != nil {
		coll.readPreference = merged.ReadPreference
	}
	if merged.Registry != nil {
		coll.registry = merged.Registry
	}

	coll.readSelector = description.CompositeSelector([]description.ServerSelector{
		description.ReadPrefSelector(coll.readPreference),
		description.LatencySelector(db.client.localThreshold),
	})
	coll.writeSelector = description.CompositeSelector([]description.ServerSelector{
		description.WriteSelector(),
		description.LatencySelector(db.client.localThreshold),
	})

	return coll
}
// copy returns a new Collection holding the same field values as coll. The
// copy is shallow: pointer and interface fields refer to the same underlying
// values as the original.
func (coll *Collection) copy() *Collection {
	dup := *coll
	return &dup
}
// Clone creates a copy of this collection with updated options, if any are
// given.
//
// Each of read concern, write concern, read preference and registry is
// replaced only when the merged opts set it. The read selector is always
// rebuilt from the resulting read preference. The returned error is always
// nil.
func (coll *Collection) Clone(opts ...*options.CollectionOptions) (*Collection, error) {
	cloned := coll.copy()
	overrides := options.MergeCollectionOptions(opts...)

	if rc := overrides.ReadConcern; rc != nil {
		cloned.readConcern = rc
	}
	if wc := overrides.WriteConcern; wc != nil {
		cloned.writeConcern = wc
	}
	if rp := overrides.ReadPreference; rp != nil {
		cloned.readPreference = rp
	}
	if reg := overrides.Registry; reg != nil {
		cloned.registry = reg
	}

	// The read selector depends on the read preference, which may have changed.
	selectors := []description.ServerSelector{
		description.ReadPrefSelector(cloned.readPreference),
		description.LatencySelector(cloned.client.localThreshold),
	}
	cloned.readSelector = description.CompositeSelector(selectors)

	return cloned, nil
}
// Name returns the name of the collection.
func (coll *Collection) Name() string {
	name := coll.name
	return name
}
// namespace returns the namespace of the collection, built from the parent
// database's name and the collection's own name.
func (coll *Collection) namespace() command.Namespace {
	dbName, collName := coll.db.name, coll.name
	return command.NewNamespace(dbName, collName)
}
// Database returns the database that contains the collection.
func (coll *Collection) Database() *Database {
	parent := coll.db
	return parent
}
// BulkWrite performs a bulk write operation.
//
// models must be non-empty and must not contain nil entries; ErrEmptySlice or
// ErrNilDocument is returned otherwise. A nil ctx is replaced with
// context.Background(). When the driver reports an error, an empty (non-nil)
// BulkWriteResult is returned alongside it.
//
// See https://docs.mongodb.com/manual/core/bulk-write-operations/.
func (coll *Collection) BulkWrite(ctx context.Context, models []WriteModel,
	opts ...*options.BulkWriteOptions) (*BulkWriteResult, error) {

	if len(models) == 0 {
		return nil, ErrEmptySlice
	}
	if ctx == nil {
		ctx = context.Background()
	}

	sess := sessionFromContext(ctx)
	if err := coll.client.ValidSession(sess); err != nil {
		return nil, err
	}

	// Convert every public model into the driver-level representation.
	converted := make([]driver.WriteModel, 0, len(models))
	for _, m := range models {
		if m == nil {
			return nil, ErrNilDocument
		}
		converted = append(converted, m.convertModel())
	}

	res, err := driver.BulkWrite(
		ctx,
		coll.namespace(),
		converted,
		coll.client.topology,
		coll.writeSelector,
		coll.client.id,
		coll.client.topology.SessionPool,
		coll.client.retryWrites,
		sess,
		coll.writeConcern,
		coll.client.clock,
		coll.registry,
		opts...,
	)

	switch bwErr := err.(type) {
	case nil:
		return &BulkWriteResult{
			InsertedCount: res.InsertedCount,
			MatchedCount:  res.MatchedCount,
			ModifiedCount: res.ModifiedCount,
			DeletedCount:  res.DeletedCount,
			UpsertedCount: res.UpsertedCount,
			UpsertedIDs:   res.UpsertedIDs,
		}, nil
	case driver.BulkWriteException:
		// Translate the driver-level exception into the public error type.
		return &BulkWriteResult{}, BulkWriteException{
			WriteConcernError: convertWriteConcernError(bwErr.WriteConcernError),
			WriteErrors:       convertBulkWriteErrors(bwErr.WriteErrors),
		}
	default:
		return &BulkWriteResult{}, replaceTopologyErr(err)
	}
}
// InsertOne inserts a single document into the collection.
//
// A nil ctx is replaced with context.Background(). The document is converted
// with transformAndEnsureID, and the ID value it reports is returned in the
// InsertOneResult. The collection's write concern is omitted while the session
// reports a running transaction. Nil entries in opts are ignored.
//
// A result is only returned when processWriteError sets the rrOne bit;
// otherwise the result is nil and only the error is returned.
func (coll *Collection) InsertOne(ctx context.Context, document interface{},
	opts ...*options.InsertOneOptions) (*InsertOneResult, error) {

	if ctx == nil {
		ctx = context.Background()
	}

	doc, insertedID, err := transformAndEnsureID(coll.registry, document)
	if err != nil {
		return nil, err
	}

	sess := sessionFromContext(ctx)
	err = coll.client.ValidSession(sess)
	if err != nil {
		return nil, err
	}

	wc := coll.writeConcern
	if sess != nil && sess.TransactionRunning() {
		wc = nil
	}

	oldns := coll.namespace()
	cmd := command.Insert{
		NS:           command.Namespace{DB: oldns.DB, Collection: oldns.Collection},
		Docs:         []bsonx.Doc{doc},
		WriteConcern: wc,
		Session:      sess,
		Clock:        coll.client.clock,
	}

	// driver.Insert takes InsertManyOptions, so translate each InsertOneOptions
	// into its InsertManyOptions equivalent. Nil entries are skipped instead of
	// being dereferenced, which would panic.
	insertOpts := make([]*options.InsertManyOptions, 0, len(opts))
	for _, opt := range opts {
		if opt == nil {
			continue
		}
		imOpt := options.InsertMany()
		imOpt.BypassDocumentValidation = opt.BypassDocumentValidation
		insertOpts = append(insertOpts, imOpt)
	}

	res, err := driver.Insert(
		ctx, cmd,
		coll.client.topology,
		coll.writeSelector,
		coll.client.id,
		coll.client.topology.SessionPool,
		coll.client.retryWrites,
		insertOpts...,
	)

	rr, err := processWriteError(res.WriteConcernError, res.WriteErrors, err)
	if rr&rrOne == 0 {
		return nil, err
	}
	return &InsertOneResult{InsertedID: insertedID}, err
}
// InsertMany inserts the provided documents.
//
// documents must be non-empty (ErrEmptySlice) and must not contain nil entries
// (ErrNilDocument). A nil ctx is replaced with context.Background(). The
// collection's write concern is omitted while the session reports a running
// transaction.
//
// The returned InsertManyResult lists the ID value reported by
// transformAndEnsureID for every document, in input order. It is also returned
// alongside ErrUnacknowledgedWrite and alongside a BulkWriteException built
// from the server's write errors or write concern error. Any other driver
// error yields a nil result.
func (coll *Collection) InsertMany(ctx context.Context, documents []interface{},
	opts ...*options.InsertManyOptions) (*InsertManyResult, error) {

	if ctx == nil {
		ctx = context.Background()
	}
	if len(documents) == 0 {
		return nil, ErrEmptySlice
	}

	insertedIDs := make([]interface{}, 0, len(documents))
	bsonDocs := make([]bsonx.Doc, 0, len(documents))
	for _, d := range documents {
		if d == nil {
			return nil, ErrNilDocument
		}
		converted, id, err := transformAndEnsureID(coll.registry, d)
		if err != nil {
			return nil, err
		}
		bsonDocs = append(bsonDocs, converted)
		insertedIDs = append(insertedIDs, id)
	}

	sess := sessionFromContext(ctx)
	if err := coll.client.ValidSession(sess); err != nil {
		return nil, err
	}

	wc := coll.writeConcern
	if sess != nil && sess.TransactionRunning() {
		wc = nil
	}

	ns := coll.namespace()
	insertCmd := command.Insert{
		NS:           command.Namespace{DB: ns.DB, Collection: ns.Collection},
		Docs:         bsonDocs,
		WriteConcern: wc,
		Session:      sess,
		Clock:        coll.client.clock,
	}

	res, err := driver.Insert(
		ctx, insertCmd,
		coll.client.topology,
		coll.writeSelector,
		coll.client.id,
		coll.client.topology.SessionPool,
		coll.client.retryWrites,
		opts...,
	)
	if err == command.ErrUnacknowledgedWrite {
		return &InsertManyResult{InsertedIDs: insertedIDs}, ErrUnacknowledgedWrite
	}
	if err != nil {
		return nil, replaceTopologyErr(err)
	}

	if len(res.WriteErrors) == 0 && res.WriteConcernError == nil {
		return &InsertManyResult{InsertedIDs: insertedIDs}, nil
	}

	// Report per-document failures and any write concern failure together.
	writeErrs := make([]BulkWriteError, len(res.WriteErrors))
	for i, we := range res.WriteErrors {
		writeErrs[i] = BulkWriteError{
			WriteError{
				Index:   we.Index,
				Code:    we.Code,
				Message: we.ErrMsg,
			},
			nil,
		}
	}
	return &InsertManyResult{InsertedIDs: insertedIDs}, BulkWriteException{
		WriteErrors:       writeErrs,
		WriteConcernError: convertWriteConcernError(res.WriteConcernError),
	}
}
// DeleteOne deletes a single document from the collection.
//
// filter is converted with the collection's registry and sent as a delete
// statement with limit 1. A nil ctx is replaced with context.Background(). The
// collection's write concern is omitted while the session reports a running
// transaction.
//
// A DeleteResult is only returned when processWriteError sets the rrOne bit;
// otherwise the result is nil and only the error is returned.
func (coll *Collection) DeleteOne(ctx context.Context, filter interface{},
	opts ...*options.DeleteOptions) (*DeleteResult, error) {

	if ctx == nil {
		ctx = context.Background()
	}

	query, err := transformDocument(coll.registry, filter)
	if err != nil {
		return nil, err
	}
	stmt := bsonx.Doc{
		{"q", bsonx.Document(query)},
		{"limit", bsonx.Int32(1)},
	}

	sess := sessionFromContext(ctx)
	if err = coll.client.ValidSession(sess); err != nil {
		return nil, err
	}

	wc := coll.writeConcern
	if sess != nil && sess.TransactionRunning() {
		wc = nil
	}

	ns := coll.namespace()
	deleteCmd := command.Delete{
		NS:           command.Namespace{DB: ns.DB, Collection: ns.Collection},
		Deletes:      []bsonx.Doc{stmt},
		WriteConcern: wc,
		Session:      sess,
		Clock:        coll.client.clock,
	}

	res, err := driver.Delete(
		ctx, deleteCmd,
		coll.client.topology,
		coll.writeSelector,
		coll.client.id,
		coll.client.topology.SessionPool,
		coll.client.retryWrites,
		opts...,
	)

	rr, err := processWriteError(res.WriteConcernError, res.WriteErrors, err)
	if rr&rrOne != 0 {
		return &DeleteResult{DeletedCount: int64(res.N)}, err
	}
	return nil, err
}
// DeleteMany deletes multiple documents from the collection.
//
// filter is converted with the collection's registry and sent as a delete
// statement with limit 0. A nil ctx is replaced with context.Background(). The
// collection's write concern is omitted while the session reports a running
// transaction.
//
// A DeleteResult is only returned when processWriteError sets the rrMany bit;
// otherwise the result is nil and only the error is returned.
func (coll *Collection) DeleteMany(ctx context.Context, filter interface{},
	opts ...*options.DeleteOptions) (*DeleteResult, error) {

	if ctx == nil {
		ctx = context.Background()
	}

	query, err := transformDocument(coll.registry, filter)
	if err != nil {
		return nil, err
	}
	stmt := bsonx.Doc{
		{"q", bsonx.Document(query)},
		{"limit", bsonx.Int32(0)},
	}

	sess := sessionFromContext(ctx)
	if err = coll.client.ValidSession(sess); err != nil {
		return nil, err
	}

	wc := coll.writeConcern
	if sess != nil && sess.TransactionRunning() {
		wc = nil
	}

	ns := coll.namespace()
	deleteCmd := command.Delete{
		NS:           command.Namespace{DB: ns.DB, Collection: ns.Collection},
		Deletes:      []bsonx.Doc{stmt},
		WriteConcern: wc,
		Session:      sess,
		Clock:        coll.client.clock,
	}

	// Unlike DeleteOne, false is passed where DeleteOne passes the client's
	// retryWrites setting (presumably because multi-document deletes are not
	// retryable; confirm against the driver).
	res, err := driver.Delete(
		ctx, deleteCmd,
		coll.client.topology,
		coll.writeSelector,
		coll.client.id,
		coll.client.topology.SessionPool,
		false,
		opts...,
	)

	rr, err := processWriteError(res.WriteConcernError, res.WriteErrors, err)
	if rr&rrMany != 0 {
		return &DeleteResult{DeletedCount: int64(res.N)}, err
	}
	return nil, err
}
// updateOrReplaceOne sends a single-document (multi=false) update statement
// built from filter and update, using the supplied session. It is shared by
// UpdateOne and ReplaceOne, which validate their inputs before calling it.
//
// Driver errors other than command.ErrUnacknowledgedWrite abort immediately
// with a nil result. Otherwise an UpdateResult is assembled and returned only
// when processWriteError sets the rrOne bit.
func (coll *Collection) updateOrReplaceOne(ctx context.Context, filter,
	update bsonx.Doc, sess *session.Client, opts ...*options.UpdateOptions) (*UpdateResult, error) {

	// TODO: should session be taken from ctx or left as argument?
	if ctx == nil {
		ctx = context.Background()
	}

	stmt := bsonx.Doc{
		{"q", bsonx.Document(filter)},
		{"u", bsonx.Document(update)},
		{"multi", bsonx.Boolean(false)},
	}

	wc := coll.writeConcern
	if sess != nil && sess.TransactionRunning() {
		wc = nil
	}

	ns := coll.namespace()
	updateCmd := command.Update{
		NS:           command.Namespace{DB: ns.DB, Collection: ns.Collection},
		Docs:         []bsonx.Doc{stmt},
		WriteConcern: wc,
		Session:      sess,
		Clock:        coll.client.clock,
	}

	r, err := driver.Update(
		ctx, updateCmd,
		coll.client.topology,
		coll.writeSelector,
		coll.client.id,
		coll.client.topology.SessionPool,
		coll.client.retryWrites,
		opts...,
	)
	if err != nil && err != command.ErrUnacknowledgedWrite {
		return nil, replaceTopologyErr(err)
	}

	result := &UpdateResult{
		MatchedCount:  r.MatchedCount,
		ModifiedCount: r.ModifiedCount,
		UpsertedCount: int64(len(r.Upserted)),
	}
	if upserted := r.Upserted; len(upserted) > 0 {
		// Record the upserted ID and take it back out of the matched count.
		result.UpsertedID = upserted[0].ID
		result.MatchedCount--
	}

	rr, err := processWriteError(r.WriteConcernError, r.WriteErrors, err)
	if rr&rrOne != 0 {
		return result, err
	}
	return nil, err
}
// UpdateOne updates a single document in the collection.
//
// filter and update are converted with the collection's registry, and update
// must pass ensureDollarKey. A nil ctx is replaced with context.Background().
// The session found in ctx is validated against the client before the request
// is handed to updateOrReplaceOne.
func (coll *Collection) UpdateOne(ctx context.Context, filter interface{}, update interface{},
	opts ...*options.UpdateOptions) (*UpdateResult, error) {

	if ctx == nil {
		ctx = context.Background()
	}

	filterDoc, err := transformDocument(coll.registry, filter)
	if err != nil {
		return nil, err
	}
	updateDoc, err := transformDocument(coll.registry, update)
	if err != nil {
		return nil, err
	}
	if err = ensureDollarKey(updateDoc); err != nil {
		return nil, err
	}

	sess := sessionFromContext(ctx)
	if err = coll.client.ValidSession(sess); err != nil {
		return nil, err
	}

	return coll.updateOrReplaceOne(ctx, filterDoc, updateDoc, sess, opts...)
}
// UpdateMany updates multiple documents in the collection.
//
// filter and update are converted with the collection's registry, and update
// must pass ensureDollarKey. The update statement is sent with multi=true. A
// nil ctx is replaced with context.Background(). The collection's write
// concern is omitted while the session reports a running transaction.
//
// Driver errors other than command.ErrUnacknowledgedWrite abort immediately
// with a nil result. Otherwise an UpdateResult is assembled and returned only
// when processWriteError sets the rrMany bit.
func (coll *Collection) UpdateMany(ctx context.Context, filter interface{}, update interface{},
	opts ...*options.UpdateOptions) (*UpdateResult, error) {

	if ctx == nil {
		ctx = context.Background()
	}

	filterDoc, err := transformDocument(coll.registry, filter)
	if err != nil {
		return nil, err
	}
	updateDoc, err := transformDocument(coll.registry, update)
	if err != nil {
		return nil, err
	}
	if err = ensureDollarKey(updateDoc); err != nil {
		return nil, err
	}

	stmt := bsonx.Doc{
		{"q", bsonx.Document(filterDoc)},
		{"u", bsonx.Document(updateDoc)},
		{"multi", bsonx.Boolean(true)},
	}

	sess := sessionFromContext(ctx)
	if err = coll.client.ValidSession(sess); err != nil {
		return nil, err
	}

	wc := coll.writeConcern
	if sess != nil && sess.TransactionRunning() {
		wc = nil
	}

	ns := coll.namespace()
	updateCmd := command.Update{
		NS:           command.Namespace{DB: ns.DB, Collection: ns.Collection},
		Docs:         []bsonx.Doc{stmt},
		WriteConcern: wc,
		Session:      sess,
		Clock:        coll.client.clock,
	}

	// false is passed where the single-document path passes the client's
	// retryWrites setting.
	r, err := driver.Update(
		ctx, updateCmd,
		coll.client.topology,
		coll.writeSelector,
		coll.client.id,
		coll.client.topology.SessionPool,
		false,
		opts...,
	)
	if err != nil && err != command.ErrUnacknowledgedWrite {
		return nil, replaceTopologyErr(err)
	}

	result := &UpdateResult{
		MatchedCount:  r.MatchedCount,
		ModifiedCount: r.ModifiedCount,
		UpsertedCount: int64(len(r.Upserted)),
	}
	// TODO(skriptble): Is this correct? Do we only return the first upserted ID for an UpdateMany?
	if upserted := r.Upserted; len(upserted) > 0 {
		result.UpsertedID = upserted[0].ID
		result.MatchedCount--
	}

	rr, err := processWriteError(r.WriteConcernError, r.WriteErrors, err)
	if rr&rrMany != 0 {
		return result, err
	}
	return nil, err
}
// ReplaceOne replaces a single document in the collection.
//
// filter and replacement are converted with the collection's registry. The
// replacement is rejected when its first key begins with '$'. A nil ctx is
// replaced with context.Background(). The session found in ctx is validated
// against the client before the request is handed to updateOrReplaceOne.
//
// Each ReplaceOptions is translated into an UpdateOptions carrying the same
// BypassDocumentValidation, Collation and Upsert values. Nil entries in opts
// are ignored.
func (coll *Collection) ReplaceOne(ctx context.Context, filter interface{},
	replacement interface{}, opts ...*options.ReplaceOptions) (*UpdateResult, error) {

	if ctx == nil {
		ctx = context.Background()
	}

	f, err := transformDocument(coll.registry, filter)
	if err != nil {
		return nil, err
	}

	r, err := transformDocument(coll.registry, replacement)
	if err != nil {
		return nil, err
	}

	// Only the first key is inspected here.
	if len(r) > 0 && strings.HasPrefix(r[0].Key, "$") {
		return nil, errors.New("replacement document cannot contains keys beginning with '$")
	}

	sess := sessionFromContext(ctx)
	err = coll.client.ValidSession(sess)
	if err != nil {
		return nil, err
	}

	updateOptions := make([]*options.UpdateOptions, 0, len(opts))
	for _, opt := range opts {
		// Skip nil entries rather than dereferencing them, which would panic.
		if opt == nil {
			continue
		}
		uOpts := options.Update()
		uOpts.BypassDocumentValidation = opt.BypassDocumentValidation
		uOpts.Collation = opt.Collation
		uOpts.Upsert = opt.Upsert
		updateOptions = append(updateOptions, uOpts)
	}

	return coll.updateOrReplaceOne(ctx, f, r, sess, updateOptions...)
}
// Aggregate runs an aggregation framework pipeline.
//
// pipeline is converted with transformAggregatePipeline and opts are merged
// into a single AggregateOptions. A nil ctx is replaced with
// context.Background(). The write concern is omitted while the session reports
// a running transaction, and the read concern is omitted while it reports a
// transaction in progress. Both the read and write selectors are handed to the
// driver.
//
// See https://docs.mongodb.com/manual/aggregation/.
func (coll *Collection) Aggregate(ctx context.Context, pipeline interface{},
	opts ...*options.AggregateOptions) (*Cursor, error) {

	if ctx == nil {
		ctx = context.Background()
	}

	stages, err := transformAggregatePipeline(coll.registry, pipeline)
	if err != nil {
		return nil, err
	}
	merged := options.MergeAggregateOptions(opts...)

	sess := sessionFromContext(ctx)
	if err = coll.client.ValidSession(sess); err != nil {
		return nil, err
	}

	wc, rc := coll.writeConcern, coll.readConcern
	if sess != nil {
		if sess.TransactionRunning() {
			wc = nil
		}
		if sess.TransactionInProgress() {
			rc = nil
		}
	}

	ns := coll.namespace()
	aggCmd := command.Aggregate{
		NS:           command.Namespace{DB: ns.DB, Collection: ns.Collection},
		Pipeline:     stages,
		ReadPref:     coll.readPreference,
		WriteConcern: wc,
		ReadConcern:  rc,
		Session:      sess,
		Clock:        coll.client.clock,
	}

	batch, err := driver.Aggregate(
		ctx, aggCmd,
		coll.client.topology,
		coll.readSelector,
		coll.writeSelector,
		coll.client.id,
		coll.client.topology.SessionPool,
		coll.registry,
		merged,
	)
	if err != nil {
		return nil, replaceTopologyErr(err)
	}

	cur, err := newCursor(batch, coll.registry)
	return cur, replaceTopologyErr(err)
}
// Count gets the number of documents matching the filter.
//
// filter is converted with the collection's registry and sent as the query of
// a count command. A nil ctx is replaced with context.Background(). The read
// concern is omitted while the session reports a transaction in progress.
func (coll *Collection) Count(ctx context.Context, filter interface{},
	opts ...*options.CountOptions) (int64, error) {

	if ctx == nil {
		ctx = context.Background()
	}

	query, err := transformDocument(coll.registry, filter)
	if err != nil {
		return 0, err
	}

	sess := sessionFromContext(ctx)
	if err = coll.client.ValidSession(sess); err != nil {
		return 0, err
	}

	rc := coll.readConcern
	if sess != nil && sess.TransactionInProgress() {
		rc = nil
	}

	ns := coll.namespace()
	countCmd := command.Count{
		NS:          command.Namespace{DB: ns.DB, Collection: ns.Collection},
		Query:       query,
		ReadPref:    coll.readPreference,
		ReadConcern: rc,
		Session:     sess,
		Clock:       coll.client.clock,
	}

	n, err := driver.Count(
		ctx, countCmd,
		coll.client.topology,
		coll.readSelector,
		coll.client.id,
		coll.client.topology.SessionPool,
		coll.registry,
		opts...,
	)
	return n, replaceTopologyErr(err)
}
// CountDocuments gets the number of documents matching the filter.
//
// opts are merged and, together with filter, turned into an aggregation
// pipeline by countDocumentsAggregatePipeline. A nil ctx is replaced with
// context.Background(). The read concern is omitted while the session reports
// a transaction in progress.
func (coll *Collection) CountDocuments(ctx context.Context, filter interface{},
	opts ...*options.CountOptions) (int64, error) {

	if ctx == nil {
		ctx = context.Background()
	}

	merged := options.MergeCountOptions(opts...)
	stages, err := countDocumentsAggregatePipeline(coll.registry, filter, merged)
	if err != nil {
		return 0, err
	}

	sess := sessionFromContext(ctx)
	if err = coll.client.ValidSession(sess); err != nil {
		return 0, err
	}

	rc := coll.readConcern
	if sess != nil && sess.TransactionInProgress() {
		rc = nil
	}

	ns := coll.namespace()
	countCmd := command.CountDocuments{
		NS:          command.Namespace{DB: ns.DB, Collection: ns.Collection},
		Pipeline:    stages,
		ReadPref:    coll.readPreference,
		ReadConcern: rc,
		Session:     sess,
		Clock:       coll.client.clock,
	}

	n, err := driver.CountDocuments(
		ctx, countCmd,
		coll.client.topology,
		coll.readSelector,
		coll.client.id,
		coll.client.topology.SessionPool,
		coll.registry,
		merged,
	)
	return n, replaceTopologyErr(err)
}
// EstimatedDocumentCount gets an estimate of the count of documents in a
// collection using collection metadata.
//
// It issues a count command with an empty query. A nil ctx is replaced with
// context.Background(). The read concern is omitted while the session reports
// a transaction in progress.
//
// Only MaxTime is taken from opts. When several options set it, the last one
// wins; nil options and options without a MaxTime are ignored.
func (coll *Collection) EstimatedDocumentCount(ctx context.Context,
	opts ...*options.EstimatedDocumentCountOptions) (int64, error) {

	if ctx == nil {
		ctx = context.Background()
	}

	sess := sessionFromContext(ctx)

	err := coll.client.ValidSession(sess)
	if err != nil {
		return 0, err
	}

	rc := coll.readConcern
	if sess != nil && (sess.TransactionInProgress()) {
		rc = nil
	}

	oldns := coll.namespace()
	cmd := command.Count{
		NS:          command.Namespace{DB: oldns.DB, Collection: oldns.Collection},
		Query:       bsonx.Doc{},
		ReadPref:    coll.readPreference,
		ReadConcern: rc,
		Session:     sess,
		Clock:       coll.client.clock,
	}

	// Translate to CountOptions. Guard both the option pointer and its MaxTime
	// pointer: dereferencing either one while nil would panic.
	countOpts := options.Count()
	for _, opt := range opts {
		if opt == nil || opt.MaxTime == nil {
			continue
		}
		countOpts = countOpts.SetMaxTime(*opt.MaxTime)
	}

	count, err := driver.Count(
		ctx, cmd,
		coll.client.topology,
		coll.readSelector,
		coll.client.id,
		coll.client.topology.SessionPool,
		coll.registry,
		countOpts,
	)

	return count, replaceTopologyErr(err)
}
// Distinct finds the distinct values for a specified field across a single
// collection.
//
// filter is converted with the collection's registry and sent as the query of
// a distinct command on fieldName. A nil ctx is replaced with
// context.Background(). The read concern is omitted while the session reports
// a transaction in progress.
func (coll *Collection) Distinct(ctx context.Context, fieldName string, filter interface{},
	opts ...*options.DistinctOptions) ([]interface{}, error) {

	if ctx == nil {
		ctx = context.Background()
	}

	query, err := transformDocument(coll.registry, filter)
	if err != nil {
		return nil, err
	}

	sess := sessionFromContext(ctx)
	if err = coll.client.ValidSession(sess); err != nil {
		return nil, err
	}

	rc := coll.readConcern
	if sess != nil && sess.TransactionInProgress() {
		rc = nil
	}

	ns := coll.namespace()
	distinctCmd := command.Distinct{
		NS:          command.Namespace{DB: ns.DB, Collection: ns.Collection},
		Field:       fieldName,
		Query:       query,
		ReadPref:    coll.readPreference,
		ReadConcern: rc,
		Session:     sess,
		Clock:       coll.client.clock,
	}

	res, err := driver.Distinct(
		ctx, distinctCmd,
		coll.client.topology,
		coll.readSelector,
		coll.client.id,
		coll.client.topology.SessionPool,
		opts...,
	)
	if err == nil {
		return res.Values, nil
	}
	return nil, replaceTopologyErr(err)
}
// Find finds the documents matching a model.
//
// filter is converted with the collection's registry and sent as the filter of
// a find command; the resulting batch cursor is wrapped in a Cursor. A nil ctx
// is replaced with context.Background(). The read concern is omitted while the
// session reports a transaction in progress.
func (coll *Collection) Find(ctx context.Context, filter interface{},
	opts ...*options.FindOptions) (*Cursor, error) {

	if ctx == nil {
		ctx = context.Background()
	}

	query, err := transformDocument(coll.registry, filter)
	if err != nil {
		return nil, err
	}

	sess := sessionFromContext(ctx)
	if err = coll.client.ValidSession(sess); err != nil {
		return nil, err
	}

	rc := coll.readConcern
	if sess != nil && sess.TransactionInProgress() {
		rc = nil
	}

	ns := coll.namespace()
	findCmd := command.Find{
		NS:          command.Namespace{DB: ns.DB, Collection: ns.Collection},
		Filter:      query,
		ReadPref:    coll.readPreference,
		ReadConcern: rc,
		Session:     sess,
		Clock:       coll.client.clock,
	}

	batch, err := driver.Find(
		ctx, findCmd,
		coll.client.topology,
		coll.readSelector,
		coll.client.id,
		coll.client.topology.SessionPool,
		coll.registry,
		opts...,
	)
	if err != nil {
		return nil, replaceTopologyErr(err)
	}

	cur, err := newCursor(batch, coll.registry)
	return cur, replaceTopologyErr(err)
}
// FindOne returns up to one document that matches the model.
//
// filter is converted with the collection's registry and sent as the filter of
// a find command. A nil ctx is replaced with context.Background(). The read
// concern is omitted while the session reports a transaction in progress.
//
// Each FindOneOptions is translated field by field into a FindOptions. Nil
// entries in opts are ignored. Errors are never returned directly; they are
// carried inside the returned SingleResult.
func (coll *Collection) FindOne(ctx context.Context, filter interface{},
	opts ...*options.FindOneOptions) *SingleResult {

	if ctx == nil {
		ctx = context.Background()
	}

	f, err := transformDocument(coll.registry, filter)
	if err != nil {
		return &SingleResult{err: err}
	}

	sess := sessionFromContext(ctx)

	err = coll.client.ValidSession(sess)
	if err != nil {
		return &SingleResult{err: err}
	}

	rc := coll.readConcern
	if sess != nil && (sess.TransactionInProgress()) {
		rc = nil
	}

	oldns := coll.namespace()
	cmd := command.Find{
		NS:          command.Namespace{DB: oldns.DB, Collection: oldns.Collection},
		Filter:      f,
		ReadPref:    coll.readPreference,
		ReadConcern: rc,
		Session:     sess,
		Clock:       coll.client.clock,
	}

	findOpts := make([]*options.FindOptions, 0, len(opts))
	for _, opt := range opts {
		// Skip nil entries rather than dereferencing them, which would panic.
		if opt == nil {
			continue
		}
		findOpts = append(findOpts, &options.FindOptions{
			AllowPartialResults: opt.AllowPartialResults,
			BatchSize:           opt.BatchSize,
			Collation:           opt.Collation,
			Comment:             opt.Comment,
			CursorType:          opt.CursorType,
			Hint:                opt.Hint,
			Max:                 opt.Max,
			MaxAwaitTime:        opt.MaxAwaitTime,
			Min:                 opt.Min,
			NoCursorTimeout:     opt.NoCursorTimeout,
			OplogReplay:         opt.OplogReplay,
			Projection:          opt.Projection,
			ReturnKey:           opt.ReturnKey,
			ShowRecordID:        opt.ShowRecordID,
			Skip:                opt.Skip,
			Snapshot:            opt.Snapshot,
			Sort:                opt.Sort,
		})
	}

	batchCursor, err := driver.Find(
		ctx, cmd,
		coll.client.topology,
		coll.readSelector,
		coll.client.id,
		coll.client.topology.SessionPool,
		coll.registry,
		findOpts...,
	)
	if err != nil {
		return &SingleResult{err: replaceTopologyErr(err)}
	}

	cursor, err := newCursor(batchCursor, coll.registry)
	return &SingleResult{cur: cursor, reg: coll.registry, err: replaceTopologyErr(err)}
}
// FindOneAndDelete finds a single document and deletes it, returning the
// original in result.
//
// filter is converted with the collection's registry. A nil ctx is replaced
// with context.Background(). The write concern is omitted while the session
// reports a running transaction. Errors are carried inside the returned
// SingleResult.
func (coll *Collection) FindOneAndDelete(ctx context.Context, filter interface{},
	opts ...*options.FindOneAndDeleteOptions) *SingleResult {

	if ctx == nil {
		ctx = context.Background()
	}

	query, err := transformDocument(coll.registry, filter)
	if err != nil {
		return &SingleResult{err: err}
	}

	sess := sessionFromContext(ctx)
	if err = coll.client.ValidSession(sess); err != nil {
		return &SingleResult{err: err}
	}

	ns := coll.namespace()
	wc := coll.writeConcern
	if sess != nil && sess.TransactionRunning() {
		wc = nil
	}

	fadCmd := command.FindOneAndDelete{
		NS:           command.Namespace{DB: ns.DB, Collection: ns.Collection},
		Query:        query,
		WriteConcern: wc,
		Session:      sess,
		Clock:        coll.client.clock,
	}

	res, err := driver.FindOneAndDelete(
		ctx, fadCmd,
		coll.client.topology,
		coll.writeSelector,
		coll.client.id,
		coll.client.topology.SessionPool,
		coll.client.retryWrites,
		coll.registry,
		opts...,
	)
	if err == nil {
		return &SingleResult{rdr: res.Value, reg: coll.registry}
	}
	return &SingleResult{err: replaceTopologyErr(err)}
}
// FindOneAndReplace finds a single document and replaces it, returning either
// the original or the replaced document.
//
// filter and replacement are converted with the collection's registry. The
// replacement is rejected when its first key begins with '$'. A nil ctx is
// replaced with context.Background(). The write concern is omitted while the
// session reports a running transaction. Errors are carried inside the
// returned SingleResult.
func (coll *Collection) FindOneAndReplace(ctx context.Context, filter interface{},
	replacement interface{}, opts ...*options.FindOneAndReplaceOptions) *SingleResult {

	if ctx == nil {
		ctx = context.Background()
	}

	query, err := transformDocument(coll.registry, filter)
	if err != nil {
		return &SingleResult{err: err}
	}
	repl, err := transformDocument(coll.registry, replacement)
	if err != nil {
		return &SingleResult{err: err}
	}

	// Only the first key is inspected here.
	if len(repl) > 0 && strings.HasPrefix(repl[0].Key, "$") {
		return &SingleResult{err: errors.New("replacement document cannot contains keys beginning with '$")}
	}

	sess := sessionFromContext(ctx)
	if err = coll.client.ValidSession(sess); err != nil {
		return &SingleResult{err: err}
	}

	wc := coll.writeConcern
	if sess != nil && sess.TransactionRunning() {
		wc = nil
	}

	ns := coll.namespace()
	farCmd := command.FindOneAndReplace{
		NS:           command.Namespace{DB: ns.DB, Collection: ns.Collection},
		Query:        query,
		Replacement:  repl,
		WriteConcern: wc,
		Session:      sess,
		Clock:        coll.client.clock,
	}

	res, err := driver.FindOneAndReplace(
		ctx, farCmd,
		coll.client.topology,
		coll.writeSelector,
		coll.client.id,
		coll.client.topology.SessionPool,
		coll.client.retryWrites,
		coll.registry,
		opts...,
	)
	if err == nil {
		return &SingleResult{rdr: res.Value, reg: coll.registry}
	}
	return &SingleResult{err: replaceTopologyErr(err)}
}
// FindOneAndUpdate finds a single document and updates it, returning either
// the original or the updated.
//
// filter and update are converted with the collection's registry, and update
// must pass ensureDollarKey. A nil ctx is replaced with context.Background().
// The write concern is omitted while the session reports a running
// transaction. Errors are carried inside the returned SingleResult.
func (coll *Collection) FindOneAndUpdate(ctx context.Context, filter interface{},
	update interface{}, opts ...*options.FindOneAndUpdateOptions) *SingleResult {

	if ctx == nil {
		ctx = context.Background()
	}

	query, err := transformDocument(coll.registry, filter)
	if err != nil {
		return &SingleResult{err: err}
	}
	updateDoc, err := transformDocument(coll.registry, update)
	if err != nil {
		return &SingleResult{err: err}
	}
	if err = ensureDollarKey(updateDoc); err != nil {
		return &SingleResult{err: err}
	}

	sess := sessionFromContext(ctx)
	if err = coll.client.ValidSession(sess); err != nil {
		return &SingleResult{err: err}
	}

	wc := coll.writeConcern
	if sess != nil && sess.TransactionRunning() {
		wc = nil
	}

	ns := coll.namespace()
	fauCmd := command.FindOneAndUpdate{
		NS:           command.Namespace{DB: ns.DB, Collection: ns.Collection},
		Query:        query,
		Update:       updateDoc,
		WriteConcern: wc,
		Session:      sess,
		Clock:        coll.client.clock,
	}

	res, err := driver.FindOneAndUpdate(
		ctx, fauCmd,
		coll.client.topology,
		coll.writeSelector,
		coll.client.id,
		coll.client.topology.SessionPool,
		coll.client.retryWrites,
		coll.registry,
		opts...,
	)
	if err == nil {
		return &SingleResult{rdr: res.Value, reg: coll.registry}
	}
	return &SingleResult{err: replaceTopologyErr(err)}
}
// Watch returns a change stream cursor used to receive notifications of
// changes to the collection.
//
// This method is preferred to running a raw aggregation with a $changeStream
// stage because it supports resumability in the case of some errors. The
// collection must have read concern majority or no read concern for a change
// stream to be created successfully.
func (coll *Collection) Watch(ctx context.Context, pipeline interface{},
	opts ...*options.ChangeStreamOptions) (*ChangeStream, error) {

	// All of the work is delegated to newChangeStream.
	stream, err := newChangeStream(ctx, coll, pipeline, opts...)
	return stream, err
}
// Indexes returns an IndexView bound to this collection.
func (coll *Collection) Indexes() IndexView {
	view := IndexView{coll: coll}
	return view
}
// Drop drops this collection from database.
//
// A nil ctx is replaced with context.Background(). The write concern is
// omitted while the session reports a running transaction. An error for which
// command.IsNotFound reports true is swallowed, so that case returns nil.
func (coll *Collection) Drop(ctx context.Context) error {
	if ctx == nil {
		ctx = context.Background()
	}

	sess := sessionFromContext(ctx)
	if err := coll.client.ValidSession(sess); err != nil {
		return err
	}

	wc := coll.writeConcern
	if sess != nil && sess.TransactionRunning() {
		wc = nil
	}

	dropCmd := command.DropCollection{
		DB:           coll.db.name,
		Collection:   coll.name,
		WriteConcern: wc,
		Session:      sess,
		Clock:        coll.client.clock,
	}

	_, err := driver.DropCollection(
		ctx, dropCmd,
		coll.client.topology,
		coll.writeSelector,
		coll.client.id,
		coll.client.topology.SessionPool,
	)
	if err == nil || command.IsNotFound(err) {
		return nil
	}
	return replaceTopologyErr(err)
}
1
https://gitee.com/qingwenjie/mongo-go-driver.git
git@gitee.com:qingwenjie/mongo-go-driver.git
qingwenjie
mongo-go-driver
mongo-go-driver
v0.3.0

搜索帮助