Fetch the repository succeeded.
package component
import (
"bytes"
"context"
"fmt"
"gitee.com/kzangv/gsf-fof/component/define"
"gitee.com/kzangv/gsf-fof/logger"
"github.com/urfave/cli/v2"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
"go.mongodb.org/mongo-driver/mongo/readpref"
"go.mongodb.org/mongo-driver/x/mongo/driver/connstring"
"time"
)
type MongoApiLog func(tbl, op string, args [][2]interface{}) func()
type MongoClient struct {
DB *mongo.Database
Cfg define.EntityMongo
ref *Mongo
}
func (c *MongoClient) Ref() *Mongo {
return c.ref
}
func (c *MongoClient) CliFlags(name string) []cli.Flag {
return []cli.Flag{
&cli.StringFlag{
Name: fmt.Sprintf("mongo-%s-dsn", name),
Usage: fmt.Sprintf("Mongo(%s) DSN (format: `mongodb://[username:password@]host1[:port1][,...hostN[:portN]][/[defaultauthdb][?options]]`)", name),
Action: func(_ *cli.Context, dsn string) error {
c.Cfg.DSN = dsn
return nil
},
},
}
}
func (c *MongoClient) Init(env int, cc *Mongo) {
if c.Cfg.DSN == "" {
c.Cfg.DSN = fmt.Sprintf("mongodb://%s:%s@%s/%s", c.Cfg.User, c.Cfg.Password, c.Cfg.Addr, c.Cfg.Database)
}
c.ref = cc
}
func (c *MongoClient) Config() *define.EntityMongo {
return &c.Cfg
}
func (c *MongoClient) Load(name string) error {
dbName := name
if pv, err := connstring.ParseAndValidate(c.Cfg.DSN); err == nil {
if pv.Database != "" {
dbName = pv.Database
}
} else {
return err
}
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
connect, err := mongo.Connect(ctx, options.Client().
ApplyURI(c.Cfg.DSN).
SetMaxConnIdleTime(time.Second*time.Duration(c.ref.Cfg.MaxIdleTime)).
SetTimeout(time.Second*time.Duration(c.ref.Cfg.ReqMaxTime)).
SetServerSelectionTimeout(time.Second*time.Duration(c.ref.Cfg.SelectionTime)).
SetMinPoolSize(uint64(c.ref.Cfg.MinPoolSize)).
SetMaxPoolSize(uint64(c.ref.Cfg.MaxPoolSize)))
if err == nil {
err = connect.Ping(ctx, readpref.Primary())
}
if err != nil {
err = fmt.Errorf("Load Mongo (%s) Failed (Error: %s) ", name, err.Error())
} else {
c.DB = connect.Database(dbName)
}
return err
}
func (c *MongoClient) FindAll(list interface{}, table string, filter interface{}, opts ...*options.FindOptions) error {
return c.FindAllLog(c.ApiLog, list, table, filter, opts...)
}
func (c *MongoClient) FindAllLog(l MongoApiLog, list interface{}, table string, filter interface{}, opts ...*options.FindOptions) error {
defer l(table, "FindAll", [][2]interface{}{{"filter", filter}})()
cursor, err := c.DB.Collection(table).
Find(ctx, filter, opts...)
if err != nil {
c.ref.Log.Error(err.Error())
return err
}
//延迟关闭游标
defer func() {
if err = cursor.Close(context.TODO()); err != nil {
c.ref.Log.Error(err.Error())
}
}()
// 获取数据
err = cursor.All(ctx, list)
if err != nil {
return err
}
return nil
}
func (c *MongoClient) FindOne(list interface{}, table string, filter interface{}, opts ...*options.FindOneOptions) error {
return c.FindOneLog(c.ApiLog, list, table, filter, opts...)
}
func (c *MongoClient) FindOneLog(l MongoApiLog, data interface{}, table string, filter interface{}, opts ...*options.FindOneOptions) error {
defer l(table, "FindOne", [][2]interface{}{{"filter", filter}})()
result := c.DB.Collection(table).FindOne(ctx, filter, opts...)
// 获取数据
err := result.Decode(data)
if err != nil {
return err
}
return nil
}
func (c *MongoClient) Count(table string, pipeline interface{}, opts ...*options.CountOptions) int64 {
return c.CountLog(c.ApiLog, table, pipeline, opts...)
}
func (c *MongoClient) CountLog(l MongoApiLog, table string, pipeline interface{}, opts ...*options.CountOptions) int64 {
defer l(table, "Count", [][2]interface{}{{"pipeline", pipeline}})()
count, err := c.DB.Collection(table).
CountDocuments(ctx, pipeline, opts...)
if err != nil {
return 0
}
return count
}
func (c *MongoClient) AggregateAll(list interface{}, table string, pipeline interface{}, opts ...*options.AggregateOptions) error {
return c.AggregateAllLog(c.ApiLog, list, table, pipeline, opts...)
}
func (c *MongoClient) AggregateAllLog(l MongoApiLog, list interface{}, table string, pipeline interface{}, opts ...*options.AggregateOptions) error {
defer l(table, "Aggregate", [][2]interface{}{{"pipeline", pipeline}})()
cursor, err := c.DB.Collection(table).
Aggregate(ctx, pipeline, opts...)
if err != nil {
c.ref.Log.Error(err.Error())
return err
}
//延迟关闭游标
defer func() {
if err = cursor.Close(context.TODO()); err != nil {
c.ref.Log.Error(err.Error())
}
}()
// 获取数据
err = cursor.All(ctx, list)
if err != nil {
return err
}
return nil
}
func (c *MongoClient) Update(table string, filter, data interface{}, opts ...*options.UpdateOptions) (*mongo.UpdateResult, error) {
return c.UpdateLog(c.ApiLog, table, filter, data, opts...)
}
func (c *MongoClient) UpdateLog(l MongoApiLog, table string, filter, data interface{}, opts ...*options.UpdateOptions) (*mongo.UpdateResult, error) {
defer l(table, "Update", [][2]interface{}{{"filter", filter}, {"update", data}})()
return c.DB.Collection(table).UpdateOne(ctx, filter, data, opts...)
}
func (c *MongoClient) Insert(table string, data interface{}, opts ...*options.InsertOneOptions) (*mongo.InsertOneResult, error) {
return c.InsertLog(c.ApiLog, table, data, opts...)
}
func (c *MongoClient) InsertLog(l MongoApiLog, table string, data interface{}, opts ...*options.InsertOneOptions) (*mongo.InsertOneResult, error) {
defer l(table, "Insert", [][2]interface{}{{"insert", data}})()
return c.DB.Collection(table).InsertOne(ctx, data, opts...)
}
func (c *MongoClient) ApiLog(tbl, op string, args [][2]interface{}) func() {
bTm := time.Now()
return func() {
spendTime := time.Now().Sub(bTm) / time.Millisecond
var handler func(string, ...interface{})
if spendTime > time.Duration(c.ref.Cfg.SlowThreshold) { // 查询超过阈值
handler = c.ref.Log.WarnForce
} else if c.ref.Log.Level() == logger.Info {
handler = c.ref.Log.InfoForce
}
if handler != nil {
var format bytes.Buffer
format.WriteString("Mongo[%d ms]: %s[%s] -- ")
fArgs := make([]interface{}, 0, 2+len(args)*2)
fArgs = append(fArgs, spendTime, tbl, op)
fData := make(map[string]interface{}, len(args))
for i, l := 0, len(args); i < l; i++ {
fData[args[i][0].(string)] = args[i][1]
}
argStr, err := bson.MarshalExtJSON(fData, false, false)
if err != nil {
format.WriteString("(error): %s;")
fArgs = append(fArgs, err.Error())
} else {
format.WriteString("%s;")
fArgs = append(fArgs, string(argStr))
}
handler(format.String(), fArgs...)
}
}
}
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。