1 Star 0 Fork 0

Cruvie Kang / kk_go_kit

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
kk_minio.go 5.81 KB
一键复制 编辑 原始数据 按行查看 历史
cruvie 提交于 2024-04-11 21:21 . update
package kk_minio
import (
"context"
"errors"
"gitee.com/cruvie/kk_go_kit/kk_file"
"gitee.com/cruvie/kk_go_kit/kk_stage"
"gitee.com/cruvie/kk_go_kit/kk_sync"
"github.com/minio/minio-go/v7"
"github.com/minio/minio-go/v7/pkg/credentials"
"golang.org/x/sync/errgroup"
"io"
"log/slog"
)
// MinioClient is the package-level MinIO client used by every operation in
// this file. It is assigned by InitMinIO and is nil until InitMinIO has run.
var MinioClient *minio.Client
// InitMinIO builds a MinIO client for endpoint using static V4 credentials
// (accessKeyID / secretAccessKey, empty session token) and stores it in the
// package-level MinioClient.
//
// useSSL is forwarded as minio.Options.Secure. If the client cannot be
// constructed, the error is logged and the function panics with that error.
func InitMinIO(stage *kk_stage.Stage, endpoint string, accessKeyID string, secretAccessKey string, useSSL bool) {
	client, err := minio.New(endpoint, &minio.Options{
		Creds:  credentials.NewStaticV4(accessKeyID, secretAccessKey, ""),
		Secure: useSSL,
	})
	if err != nil {
		slog.Error("connect to minio failed", kk_stage.NewLog(stage).Error(err).Args()...)
		// Panic with the real cause. panic(nil) carries no information and,
		// before Go 1.21, is indistinguishable from "no panic" in recover().
		panic(err)
	}
	MinioClient = client
}
// OPObject bundles the parameters for a single object operation. Which fields
// are required depends on the function it is passed to.
type OPObject struct {
	// BucketName is the target bucket, ObjectName the object key inside it,
	// and LocalFilePath the local source file read by UploadFile.
	BucketName, ObjectName, LocalFilePath string
	// Reader supplies the payload for UploadFileStream.
	Reader io.Reader
	// ObjectSize is the payload size handed to PutObject by UploadFileStream.
	ObjectSize int64
	// ContentType is sent as the object's content type on upload.
	ContentType string
}
// UploadFile uploads the local file at opObject.LocalFilePath to the bucket
// opObject.BucketName under the key opObject.ObjectName (destination path plus
// file name), using opObject.ContentType as the content type.
//
// BucketName, ObjectName, LocalFilePath and ContentType must all be non-empty,
// otherwise an error is returned before any request is made. The outcome is
// logged and the error from FPutObject (nil on success) is returned.
//
// API reference: https://min.io/docs/minio/linux/developers/go/API.html#PutObject
func UploadFile(stage *kk_stage.Stage, opObject OPObject) (err error) {
	for _, required := range []string{
		opObject.BucketName,
		opObject.ObjectName,
		opObject.LocalFilePath,
		opObject.ContentType,
	} {
		if required == "" {
			return errors.New("required parameter is empty")
		}
	}

	putOpts := minio.PutObjectOptions{ContentType: opObject.ContentType}
	info, err := MinioClient.FPutObject(context.Background(),
		opObject.BucketName, opObject.ObjectName, opObject.LocalFilePath, putOpts)
	if err != nil {
		slog.Error("upload file failed", kk_stage.NewLog(stage).Error(err).Args()...)
		return err
	}

	logArgs := kk_stage.NewLog(stage).Error(err).
		Any("object", opObject).
		Any("size(MB)", kk_file.ConvertFileSizeMB(info.Size)).
		Args()
	slog.Info("upload file success", logArgs...)
	return err
}
// UploadFileStream uploads opObject.ObjectSize bytes read from opObject.Reader
// to the bucket opObject.BucketName under the key opObject.ObjectName
// (destination path plus file name), using opObject.ContentType.
//
// BucketName, ObjectName and ContentType must be non-empty, Reader must be
// non-nil and ObjectSize must be non-zero; otherwise an error is returned
// before any request is made. The outcome is logged and the error from
// PutObject (nil on success) is returned.
func UploadFileStream(stage *kk_stage.Stage, opObject OPObject) (err error) {
	complete := opObject.BucketName != "" &&
		opObject.ObjectName != "" &&
		opObject.Reader != nil &&
		opObject.ObjectSize != 0 &&
		opObject.ContentType != ""
	if !complete {
		return errors.New("required parameter is empty")
	}

	putOpts := minio.PutObjectOptions{ContentType: opObject.ContentType}
	info, err := MinioClient.PutObject(context.Background(),
		opObject.BucketName, opObject.ObjectName, opObject.Reader, opObject.ObjectSize, putOpts)
	if err != nil {
		slog.Error("upload fileStream failed", kk_stage.NewLog(stage).Any("object", opObject).Error(err).Args()...)
		return err
	}

	logArgs := kk_stage.NewLog(stage).
		Any("object", opObject).Error(err).
		Any("size(MB)", kk_file.ConvertFileSizeMB(info.Size)).
		Args()
	slog.Info("upload fileStream success", logArgs...)
	return err
}
// GetFileStream opens the object opObject.ObjectName in bucket
// opObject.BucketName and returns it as a readable stream.
//
// BucketName and ObjectName must be non-empty, otherwise an error is returned
// before any request is made. A failure from GetObject is logged and returned.
//
// The returned reader is the *minio.Object itself and is still open. The
// caller owns it and should close it when done, for example:
//
//	if c, ok := r.(io.Closer); ok {
//		defer c.Close()
//	}
func GetFileStream(stage *kk_stage.Stage, opObject OPObject) (io.Reader, error) {
	if opObject.BucketName == "" || opObject.ObjectName == "" {
		return nil, errors.New("required parameter is empty")
	}
	object, err := MinioClient.GetObject(
		context.Background(),
		opObject.BucketName,
		opObject.ObjectName,
		minio.GetObjectOptions{})
	if err != nil {
		slog.Error("get fileStream failed", kk_stage.NewLog(stage).Any("object", opObject).Error(err).Args()...)
		return nil, err
	}
	// Do not close the object here: closing before returning would hand the
	// caller a stream whose reads fail.
	return object, nil
}
// RemoveObject deletes the object opObject.ObjectName from the bucket
// opObject.BucketName, with the GovernanceBypass option enabled.
//
// BucketName and ObjectName must be non-empty, otherwise an error is returned
// before any request is made. The outcome is logged and the error from the
// client (nil on success) is returned.
func RemoveObject(stage *kk_stage.Stage, opObject OPObject) (err error) {
	if opObject.BucketName == "" || opObject.ObjectName == "" {
		return errors.New("required parameter is empty")
	}

	err = MinioClient.RemoveObject(context.Background(),
		opObject.BucketName, opObject.ObjectName,
		minio.RemoveObjectOptions{GovernanceBypass: true})
	if err != nil {
		slog.Error("RemoveObject failed", kk_stage.NewLog(stage).Any("object", opObject).Error(err).Args()...)
		return err
	}

	slog.Info("RemoveObject success", kk_stage.NewLog(stage).Any("object", opObject).Error(err).Args()...)
	return err
}
// RemoveIncompleteObject calls RemoveIncompleteUpload for the key
// opObject.ObjectName in the bucket opObject.BucketName.
//
// BucketName and ObjectName must be non-empty, otherwise an error is returned
// before any request is made. The outcome is logged and the error from the
// client (nil on success) is returned.
func RemoveIncompleteObject(stage *kk_stage.Stage, opObject OPObject) (err error) {
	if opObject.BucketName == "" || opObject.ObjectName == "" {
		return errors.New("required parameter is empty")
	}

	err = MinioClient.RemoveIncompleteUpload(context.Background(), opObject.BucketName, opObject.ObjectName)
	if err != nil {
		slog.Error("RemoveIncompleteUpload failed", kk_stage.NewLog(stage).Error(err).Any("object", opObject).Args()...)
		return err
	}

	slog.Info("RemoveIncompleteUpload success", kk_stage.NewLog(stage).Error(err).Any("object", opObject).Args()...)
	return err
}
// StatObject fetches metadata for the object opObject.ObjectName in the bucket
// opObject.BucketName; it can be used to check that the object exists and is
// accessible.
//
// Note the return order: the error comes first, the info second. On success
// the error is nil and info points to the retrieved ObjectInfo; on failure
// (including empty BucketName/ObjectName) info is nil. Client failures are
// logged.
func StatObject(stage *kk_stage.Stage, opObject OPObject) (err error, info *minio.ObjectInfo) {
	if opObject.BucketName == "" || opObject.ObjectName == "" {
		return errors.New("required parameter is empty"), nil
	}

	stat, statErr := MinioClient.StatObject(context.Background(),
		opObject.BucketName, opObject.ObjectName, minio.StatObjectOptions{})
	if statErr != nil {
		slog.Error("StatObject failed", kk_stage.NewLog(stage).Error(statErr).Any("object", opObject).Args()...)
		return statErr, nil
	}
	return nil, &stat
}
// BatchOperator runs operator once per element of opObjects, each call in its
// own goroutine, and waits for all of them to finish.
//
// Every operator result is passed to errs.Join, and the value of errs.Error()
// is returned after all goroutines complete (presumably the combination of
// all failures; see kk_sync for the exact semantics). One failing operator
// does not prevent the others from running.
func BatchOperator(stage *kk_stage.Stage, opObjects []OPObject, operator func(stage *kk_stage.Stage, opObject OPObject) (err error)) (err error) {
	var g errgroup.Group
	errs := kk_sync.NewError()
	for _, object := range opObjects {
		// Per-iteration copy: before Go 1.22 the loop variable is shared by
		// all iterations, so the goroutines could all observe the last element.
		object := object
		g.Go(func() error {
			errs.Join(operator(stage, object))
			// Failures are gathered in errs, so the group itself never fails.
			return nil
		})
	}
	// Wait only synchronizes here; its error is always nil by construction.
	_ = g.Wait()
	return errs.Error()
}
// RemoveObjectAndIncompleteObject calls RemoveObject and then
// RemoveIncompleteObject for the same opObject. Both calls are always made,
// and their errors are combined with errors.Join (nil when both succeed).
func RemoveObjectAndIncompleteObject(stage *kk_stage.Stage, opObject OPObject) (err error) {
	return errors.Join(
		RemoveObject(stage, opObject),
		RemoveIncompleteObject(stage, opObject),
	)
}
1
https://gitee.com/cruvie/kk_go_kit.git
git@gitee.com:cruvie/kk_go_kit.git
cruvie
kk_go_kit
kk_go_kit
6c1fc534a942

搜索帮助