2 Star 0 Fork 0

wuzheng0709 / backend-gopkg

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
handler.go 19.50 KB
一键复制 编辑 原始数据 按行查看 历史
HCY 提交于 2024-05-13 15:15 . [REV] projectApi

package ecm
import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"gitee.com/wuzheng0709/backend-gopkg/infrastructure/config"
"gitee.com/wuzheng0709/backend-gopkg/infrastructure/connector/redis"
"gitee.com/wuzheng0709/backend-gopkg/infrastructure/pkg/gin/log"
"gitee.com/wuzheng0709/backend-gopkg/infrastructure/pkg/httpclient"
"gitee.com/wuzheng0709/backend-gopkg/infrastructure/pkg/toolfunc"
"github.com/gin-gonic/gin"
uuid "github.com/satori/go.uuid"
"io"
"math/rand"
"mime/multipart"
"net/http"
"net/url"
"path"
"path/filepath"
"strings"
"time"
"gitee.com/wuzheng0709/backend-gopkg/infrastructure/pkg/filesystem/chunk"
"gitee.com/wuzheng0709/backend-gopkg/infrastructure/pkg/filesystem/chunk/backoff"
"gitee.com/wuzheng0709/backend-gopkg/infrastructure/pkg/filesystem/fsctx"
"gitee.com/wuzheng0709/backend-gopkg/infrastructure/pkg/filesystem/response"
model "gitee.com/wuzheng0709/backend-gopkg/infrastructure/pkg/models"
"gitee.com/wuzheng0709/backend-gopkg/infrastructure/pkg/request"
"gitee.com/wuzheng0709/backend-gopkg/infrastructure/pkg/serializer"
"gitee.com/wuzheng0709/backend-gopkg/infrastructure/pkg/util"
"github.com/HFO4/aliyun-oss-go-sdk/oss"
)
// Redis key prefixes used by the ECM driver.
const (
	// RedisECMPre prefixes the Redis hash that holds the upload parameters of
	// a single chunk; the upload id is appended to it.
	RedisECMPre = "ecm_"
	// ecmTokenPre prefixes the Redis key under which an ECM login token is
	// cached; the login name is appended to it.
	ecmTokenPre = "ecm_token_"
)
// UploadPolicy describes an ECM upload policy as it is serialized to JSON.
type UploadPolicy struct {
	// Expiration is emitted under the "expiration" key.
	Expiration string `json:"expiration"`
	// Conditions is emitted under the "conditions" key; its elements are
	// free-form values.
	Conditions []interface{} `json:"conditions"`
}
// CheckAndCreateDocInfoResp is the JSON envelope decoded from the ECM
// CheckAndCreateDocInfo call.
type CheckAndCreateDocInfoResp struct {
	// AvailableSizes is decoded from the "availableSizes" key.
	AvailableSizes int `json:"availableSizes"`
	// Data carries the details of the document that was checked/created.
	Data CheckAndCreateDocInfoData `json:"data"`
	// Reason is used as the error message when Result is non-zero.
	Reason string `json:"reason"`
	// Result is the status code; callers in this file treat 0 as success.
	Result int `json:"result"`
}
// CheckAndCreateDocInfoData is the "data" payload of a CheckAndCreateDocInfo
// response. The field names mirror the JSON keys; in this file only FileId,
// RegionHash and RegionId are read back when building an upload.
type CheckAndCreateDocInfoData struct {
	FileId           int    `json:"FileId"`
	FileVerId        int    `json:"FileVerId"`
	IsSupportMultiTd bool   `json:"IsSupportMultiTd"`
	OperaterId       int    `json:"OperaterId"`
	ParentFolderId   int    `json:"ParentFolderId"`
	RegionHash       string `json:"RegionHash"`
	RegionId         int    `json:"RegionId"`
	RegionType       int    `json:"RegionType"`
	RegionUrl        string `json:"RegionUrl"`
	StoragePlatform  int    `json:"StoragePlatform"`
}
// CallbackPolicy describes an upload callback as it is serialized to JSON.
type CallbackPolicy struct {
	// CallbackURL is emitted under the "callbackUrl" key.
	CallbackURL string `json:"callbackUrl"`
	// CallbackBody is emitted under the "callbackBody" key.
	CallbackBody string `json:"callbackBody"`
	// CallbackBodyType is emitted under the "callbackBodyType" key.
	CallbackBodyType string `json:"callbackBodyType"`
}
// CreateUploadSessionService is the request payload for obtaining an upload
// credential. Path, Size, Name and Type are mandatory according to their
// binding tags.
type CreateUploadSessionService struct {
	// Path is forwarded to ECM as the document's "fullPath".
	Path string `json:"path" binding:"required"`
	// Size is the total file size; it drives how the upload is chunked.
	Size uint64 `json:"size" binding:"required"`
	// Name is the file name; its extension is derived from the last dot.
	Name string `json:"name" binding:"required"`
	// Type is required by binding but not read anywhere in this file.
	Type string `json:"type" binding:"required"`
	// Md5 is optional and not read anywhere in this file.
	Md5 string `json:"md5"`
	// PublicRoot is forwarded to ECM as the target "folderId".
	PublicRoot string `json:"public_root"`
}
// Driver is the storage-policy adapter for ECM.
type Driver struct {
	// Policy is the storage policy this driver operates on.
	Policy *model.Policy

	// client and bucket are the OSS SDK handles used by the object-storage
	// style methods (List, Put, Delete, ...).
	// NOTE(review): NewDriver does not populate them — confirm they are set
	// elsewhere before those methods are called.
	client *oss.Client
	bucket *oss.Bucket

	// HTTPClient is the request client created by NewDriver.
	HTTPClient request.Client
	// UrlVal is not read by any code visible in this file.
	UrlVal url.Values
	// GinCtx is the gin context of the request that created the driver; it is
	// used to resolve the ECM login token.
	GinCtx *gin.Context
	// ReqParm is the upload-session request the driver was created for.
	ReqParm CreateUploadSessionService
}
// Source is a placeholder: it ignores every argument and always returns an
// empty URL together with a nil error.
// NOTE(review): presumably this should yield a download URL for path — confirm
// and implement.
func (handler *Driver) Source(ctx context.Context, path string, url url.URL, ttl int64, isDownload bool, speed int) (string, error) {
	// TODO: implement me
	return "", nil
}
// key is the private type of the context keys declared by this package.
type key int

const (
	// chunkRetrySleep is the fixed pause between retries of a failed chunk.
	chunkRetrySleep = 5 * time.Second
	// MultiPartUploadThreshold is the file size from which the server side
	// switches to multipart upload.
	MultiPartUploadThreshold uint64 = 5 << 30 // 5GB
	// VersionID is the context key identifying a file version. It is the
	// third entry of this const block, so iota gives it the value 2.
	VersionID key = iota
)
// NewDriver builds a Driver for the given policy, upload request and gin
// context. When the policy carries no chunk size, it is set to 25 MB on the
// policy itself (the caller's policy is modified). The returned error is
// always nil.
func NewDriver(policy *model.Policy, req CreateUploadSessionService, GinC *gin.Context) (*Driver, error) {
	const defaultChunkSize = 25 << 20 // 25 MB

	if policy.OptionsSerialized.ChunkSize == 0 {
		policy.OptionsSerialized.ChunkSize = defaultChunkSize
	}

	return &Driver{
		Policy:     policy,
		HTTPClient: request.NewClient(),
		GinCtx:     GinC,
		ReqParm:    req,
	}, nil
}
// CORS installs a single cross-origin rule on the policy's bucket: every
// origin and every request header is allowed for GET, POST, PUT, DELETE and
// HEAD, no response headers are exposed, and the rule may be cached for 3600
// seconds.
func (handler *Driver) CORS() error {
	allowAll := oss.CORSRule{
		AllowedOrigin: []string{"*"},
		AllowedMethod: []string{"GET", "POST", "PUT", "DELETE", "HEAD"},
		ExposeHeader:  []string{},
		AllowedHeader: []string{"*"},
		MaxAgeSeconds: 3600,
	}
	return handler.client.SetBucketCORS(handler.Policy.BucketName, []oss.CORSRule{allowAll})
}
// InitECMClient is meant to initialise the authenticated ECM client. It is
// currently a no-op: both arguments are ignored and nil is returned.
func (handler *Driver) InitECMClient(forceUsePublicEndpoint bool, cuss CreateUploadSessionService) error {
	// Nothing to initialise yet.
	return nil
}
// List enumerates the objects stored under base.
//
// base is normalised to a prefix without a leading slash and, unless empty,
// with a trailing slash. When recursive is false a "/" delimiter is used so
// that deeper levels are reported as directories (common prefixes) rather
// than as individual objects. Results are fetched in pages of up to 1000
// entries until the service reports no further marker.
//
// Directories are returned first, then files. Entries whose path cannot be
// made relative to the prefix are silently skipped. ctx is not used.
func (handler *Driver) List(ctx context.Context, base string, recursive bool) ([]response.Object, error) {
	prefix := strings.TrimPrefix(base, "/")
	if prefix != "" {
		prefix += "/"
	}

	delimiter := ""
	if !recursive {
		delimiter = "/"
	}

	// Collect every page of the listing.
	var (
		files  []oss.ObjectProperties
		dirs   []string
		marker string
	)
	for {
		page, err := handler.bucket.ListObjects(
			oss.Marker(marker),
			oss.Prefix(prefix),
			oss.MaxKeys(1000),
			oss.Delimiter(delimiter),
		)
		if err != nil {
			return nil, err
		}
		files = append(files, page.Objects...)
		dirs = append(dirs, page.CommonPrefixes...)

		if marker = page.NextMarker; marker == "" {
			break
		}
	}

	listing := make([]response.Object, 0, len(files)+len(dirs))

	// Directories: the service reports no modification time for a common
	// prefix, so the current time is used.
	for _, dir := range dirs {
		if rel, err := filepath.Rel(prefix, dir); err == nil {
			listing = append(listing, response.Object{
				Name:         path.Base(dir),
				RelativePath: filepath.ToSlash(rel),
				Size:         0,
				IsDir:        true,
				LastModify:   time.Now(),
			})
		}
	}

	// Files.
	for _, f := range files {
		if rel, err := filepath.Rel(prefix, f.Key); err == nil {
			listing = append(listing, response.Object{
				Name:         path.Base(f.Key),
				Source:       f.Key,
				RelativePath: filepath.ToSlash(rel),
				Size:         uint64(f.Size),
				IsDir:        false,
				LastModify:   f.LastModified,
			})
		}
	}

	return listing, nil
}
// Get is the entry point for reading a file's content as a stream.
//
// It is currently a stub: both ctx and path are ignored and (nil, nil) is
// returned, i.e. a nil reader without an error.
// NOTE(review): callers that use the returned reader without a nil check will
// panic — confirm whether an explicit "not implemented" error is wanted.
//
// The commented-out code below is the disabled implementation, kept for
// reference.
func (handler *Driver) Get(ctx context.Context, path string) (response.RSCloser, error) {
//// Disable caching by means of VersionID
//ctx = context.WithValue(ctx, VersionID, time.Now().UnixNano())
//
//// Use the private endpoint whenever possible
//ctx = context.WithValue(ctx, fsctx.ForceUsePublicEndpointCtx, false)
//
//// Resolve the source URL of the file
//downloadURL, err := handler.Source(
//	ctx,
//	path,
//	url.URL{},
//	int64(model.GetIntSetting("preview_timeout", 60)),
//	false,
//	0,
//)
//if err != nil {
//	return nil, err
//}
//
//// Fetch the file's data stream
//resp, err := handler.HTTPClient.Request(
//	"GET",
//	downloadURL,
//	nil,
//	request.WithContext(ctx),
//	request.WithTimeout(time.Duration(0)),
//).CheckHTTPResponse(200).GetRSCloser()
//if err != nil {
//	return nil, err
//}
//
//resp.SetFirstFakeChunk()
//
//// Try to determine the file size on our own
//if file, ok := ctx.Value(fsctx.FileModelCtx).(model.File); ok {
//	resp.SetContentLength(int64(file.Size))
//}
//
return nil, nil
}
// Put stores the given file stream at the save path recorded in its file
// info. The file is always closed before Put returns.
//
// Files smaller than MultiPartUploadThreshold are sent with a single
// PutObject call. Larger files go through a multipart upload: the stream is
// cut into chunks of the policy's chunk size, each chunk is uploaded as one
// part (retried with a constant backoff), and the upload is completed at the
// end. Overwriting an existing object is forbidden unless the file's mode has
// the Overwrite flag set. ctx is not used.
func (handler *Driver) Put(ctx context.Context, file fsctx.FileHeader) error {
	defer file.Close()

	info := file.Info()

	// Lifetime of the upload credential, in seconds.
	ttlSeconds := model.GetIntSetting("upload_session_timeout", 3600)

	allowOverwrite := info.Mode&fsctx.Overwrite == fsctx.Overwrite

	opts := []oss.Option{
		oss.Expires(time.Now().Add(time.Duration(ttlSeconds) * time.Second)),
		oss.ForbidOverWrite(!allowOverwrite),
	}

	// Small files: one request is enough.
	if info.Size < MultiPartUploadThreshold {
		return handler.bucket.PutObject(info.SavePath, file, opts...)
	}

	// Large files: multipart upload.
	upload, err := handler.bucket.InitiateMultipartUpload(info.SavePath, opts...)
	if err != nil {
		return fmt.Errorf("failed to initiate multipart upload: %w", err)
	}

	group := chunk.NewChunkGroup(file, handler.Policy.OptionsSerialized.ChunkSize, &backoff.ConstantBackoff{
		Max:   model.GetIntSetting("chunk_retries", 5),
		Sleep: chunkRetrySleep,
	}, model.IsTrueVal(model.GetSettingByName("use_temp_chunk_buffer")))

	// Part numbers are 1-based, chunk indexes 0-based.
	sendPart := func(current *chunk.ChunkGroup, content io.Reader) error {
		_, partErr := handler.bucket.UploadPart(upload, content, current.Length(), current.Index()+1)
		return partErr
	}

	for group.Next() {
		if procErr := group.Process(sendPart); procErr != nil {
			return fmt.Errorf("failed to upload chunk #%d: %w", group.Index(), procErr)
		}
	}

	_, err = handler.bucket.CompleteMultipartUpload(upload, oss.CompleteAll("yes"), oss.ForbidOverWrite(!allowOverwrite))
	return err
}
// Delete removes one or more files and returns the ones that were NOT
// deleted.
//
// If the delete request itself fails, the complete input list is returned
// along with the error. If the request succeeds but some files are missing
// from the reported deletions, those files are returned with a generic
// error. On full success an empty, non-nil slice is returned. ctx is not used.
func (handler *Driver) Delete(ctx context.Context, files []string) ([]string, error) {
	result, err := handler.bucket.DeleteObjects(files)
	if err != nil {
		return files, err
	}

	// Whatever was requested but not reported as deleted counts as failed.
	if remaining := util.SliceDifference(files, result.DeletedObjects); len(remaining) > 0 {
		return remaining, errors.New("删除失败")
	}

	return []string{}, nil
}
// Thumb is the entry point for fetching a file's thumbnail. It is currently a
// stub: ctx and path are ignored and (nil, nil) is returned.
func (handler *Driver) Thumb(ctx context.Context, path string) (*response.ContentResponse, error) {
	// No thumbnail support yet.
	return nil, nil
}
// signSourceURL produces a GET URL for path that is valid for ttl seconds.
//
// The URL is first signed through the bucket. For a non-private policy the
// credential-related and unsupported query parameters are then stripped, and
// if the policy defines a BaseURL its scheme and host replace those of the
// signed URL (e.g. a custom CDN domain). ctx is not used.
//
// It returns an error when signing fails or when either the signed URL or the
// policy's BaseURL cannot be parsed.
func (handler *Driver) signSourceURL(ctx context.Context, path string, ttl int64, options []oss.Option) (string, error) {
	signedURL, err := handler.bucket.SignURL(path, oss.HTTPGet, ttl, options...)
	if err != nil {
		return "", err
	}

	finalURL, err := url.Parse(signedURL)
	if err != nil {
		return "", err
	}

	// Public bucket: drop the key/signature and the parameters it does not
	// support.
	if !handler.Policy.IsPrivate {
		query := finalURL.Query()
		for _, name := range []string{
			// The OSS SDK that signs the URL names this parameter
			// "OSSAccessKeyId"; without removing it the access key id would
			// stay on the public URL while its signature is gone.
			"OSSAccessKeyId",
			// Kept for backward compatibility with the previous behavior.
			"ECMAccessKeyId",
			"Signature",
			"response-content-disposition",
			"x-oss-traffic-limit",
		} {
			query.Del(name)
		}
		finalURL.RawQuery = query.Encode()
	}

	// Swap in the user-defined (accelerated) domain, if any.
	if handler.Policy.BaseURL != "" {
		cdnURL, err := url.Parse(handler.Policy.BaseURL)
		if err != nil {
			return "", err
		}
		finalURL.Host = cdnURL.Host
		finalURL.Scheme = cdnURL.Scheme
	}

	return finalURL.String(), nil
}
// EcmUploadResp is the JSON body ECM returns for a chunk upload request.
type EcmUploadResp struct {
	// UploadId is decoded from the "uploadId" key.
	UploadId string `json:"uploadId"`
	// Filename is decoded from the "filename" key.
	Filename string `json:"filename"`
	// Status is the upload state; this file reacts to "Error" and "End".
	Status string `json:"status"`
	// Message is included in the error reported for an "Error" status.
	Message string `json:"message"`
	// Percent is decoded from the "percent" key.
	Percent int `json:"percent"`
	// ErrorCode is included in the error reported for an "Error" status.
	ErrorCode int `json:"errorCode"`
	// Tag is decoded from the "tag" key.
	Tag string `json:"tag"`
}
// Token prepares a chunked upload to ECM and returns the credential the
// client needs to send the chunks.
//
// It performs three steps:
//  1. obtain an ECM login token for the current request;
//  2. call ECM's CheckAndCreateDocInfo to register the document described by
//     the driver's request parameters;
//  3. for every chunk, store the parameters of that chunk in a Redis hash
//     (key RedisECMPre+uploadId, kept for 24 hours) and build the callback
//     URL the client uploads the chunk to.
//
// ttl and file are not used. An error is returned when the policy's chunk
// size is zero, when the token cannot be obtained, when the ECM call fails or
// reports a non-zero result, or when Redis cannot be written.
func (handler *Driver) Token(ctx context.Context, ttl int64, uploadSession *serializer.UploadSession, file fsctx.FileHeader) (*serializer.UploadCredential, error) {
	fileSize := handler.ReqParm.Size
	chunkSize := handler.Policy.OptionsSerialized.ChunkSize
	// Checked up front: a zero chunk size would divide by zero below, and by
	// then the document would already have been created on the ECM side.
	if chunkSize == 0 {
		return nil, errors.New("ecm: chunk size must be greater than zero")
	}

	// Step 1: login token.
	emcToken, err := GetEcmUserLoginIntegrationByUserLoginNameV1(handler.GinCtx, handler.Policy.Server, handler.Policy.SecretKey)
	if err != nil {
		return nil, err
	}

	// Step 2: register the document with ECM.
	fileName := handler.ReqParm.Name
	// The extension is whatever follows the last dot; empty without a dot.
	ext := ""
	if dot := strings.LastIndex(fileName, "."); dot >= 0 {
		ext = fileName[dot+1:]
	}

	checkForm := make(url.Values)
	checkForm.Set("token", emcToken)
	checkForm.Set("folderId", handler.ReqParm.PublicRoot) // the enterprise content library's default root folder id is 1
	checkForm.Set("fileName", fileName)
	checkForm.Set("fullPath", handler.ReqParm.Path)
	checkForm.Set("size", toolfunc.Uint642String(fileSize))
	checkForm.Set("fileModel", "UPDATE")
	//checkForm.Set("strategy", "overlayLatestVersion")
	checkForm.Set("strategy", "majorUpgrade")
	checkForm.Set("ext", ext)

	res, err := http.PostForm(fmt.Sprintf("%s/WebCore?module=RegionDocOperationApi&fun=CheckAndCreateDocInfo", handler.Policy.Server), checkForm)
	if err != nil {
		return nil, err
	}
	// The body must be closed or the underlying connection leaks.
	defer res.Body.Close()

	resBody, err := io.ReadAll(res.Body)
	if err != nil {
		return nil, err
	}
	var checkResp CheckAndCreateDocInfoResp
	if err = json.Unmarshal(resBody, &checkResp); err != nil {
		return nil, err
	}
	if checkResp.Result != 0 {
		return nil, errors.New(checkResp.Reason)
	}

	// Step 3: one Redis record and one upload URL per chunk.
	// Total number of chunks = ceil(fileSize / chunkSize). This matches the
	// number of loop iterations below, including when the size is an exact
	// multiple of the chunk size.
	chunks := fileSize / chunkSize
	if fileSize%chunkSize != 0 {
		chunks++
	}

	var urls []string
	remaining := fileSize
	for chunki := 0; remaining > 0; chunki++ {
		// blockSize is the size of this chunk; only the last one may be
		// smaller than chunkSize.
		var blockSize uint64
		remaining, blockSize = divideAndSplit(remaining, chunkSize)

		uploadId := uuid.NewV4().String()
		fields := make(map[string]interface{}, 13)
		fields["uploadUrl"] = fmt.Sprintf("%s/document/upload?token=%s", handler.Policy.Server, emcToken)
		fields["regionHash"] = checkResp.Data.RegionHash
		fields["uploadId"] = uploadId
		fields["regionId"] = fmt.Sprintf("%d", checkResp.Data.RegionId)
		fields["fileName"] = fileName
		fields["chunks"] = toolfunc.Uint642String(chunks)
		fields["chunk"] = toolfunc.Int2String(chunki)
		fields["blockSize"] = toolfunc.Uint642String(blockSize)
		fields["size"] = toolfunc.Uint642String(fileSize)
		fields["chunkSize"] = chunkSize
		// The key is spelled "fileld" (with an l); UploadPath2Ecm reads it
		// back under the same spelling, so it must not be "corrected" here
		// alone.
		fields["fileld"] = toolfunc.Int2String(checkResp.Data.FileId)
		fields["ecmurl"] = handler.Policy.OptionsSerialized.DownloadUrl
		fields["token"] = handler.Policy.OptionsSerialized.Token
		if _, err = redis.BatchHashSet(redis.ImRedisDB, RedisECMPre+uploadId, fields, 24*time.Hour); err != nil {
			log.Error("上传数据存入失败,err:", err.Error())
			return nil, err
		}

		callBackUrl := fmt.Sprintf("%sapi/knowledge/public/ecm/%s/%s", handler.Policy.OptionsSerialized.CallbackSer, uploadSession.Key, uploadId)
		urls = append(urls, callBackUrl)
	}

	return &serializer.UploadCredential{
		SessionID:   uploadSession.Key,
		ChunkSize:   chunkSize,
		UploadID:    "",
		UploadURLs:  urls,
		CompleteURL: "",
	}, nil
}
// UploadPath2Ecm uploads one chunk to ECM.
//
// The parameters of the chunk are loaded from the Redis hash
// RedisECMPre+uploadId, assembled into a multipart form together with
// fileByte (the chunk's content) and posted to ECM's document upload
// endpoint. sessionID is sent as the form's "uploadId".
//
// Return values:
//   - when ECM answers with status "End": the full download URL, the download
//     path relative to the ECM base URL, and a nil error;
//   - when ECM answers with status "Error": an error carrying the code,
//     status and message;
//   - for any other status: ("", "", nil).
//
// An error is also returned when Redis holds no parameters for uploadId or
// when building/sending the request fails.
func UploadPath2Ecm(fileByte []byte, uploadId, sessionID string) (string, string, error) {
	fields, err := redis.HashGetAll(redis.ImRedisDB, RedisECMPre+uploadId)
	if err != nil {
		return "", "", err
	}
	if len(fields) == 0 {
		return "", "", errors.New("未发现传入的参数")
	}

	ecmUrl := fields["ecmurl"]
	emcToken := fields["token"]
	uploadUrl := fmt.Sprintf("%s/document/upload?token=%s", ecmUrl, emcToken)

	// Assemble the multipart form.
	uploadForm := &bytes.Buffer{}
	uploadWriter := multipart.NewWriter(uploadForm)
	formFields := []struct{ name, value string }{
		{"uploadId", sessionID},
		{"regionHash", fields["regionHash"]},
		{"regionId", fields["regionId"]},
		{"fileName", fields["fileName"]},
		{"chunks", fields["chunks"]},
		{"chunk", fields["chunk"]},
		{"chunkSize", fields["chunkSize"]},
		{"size", fields["size"]},           // total file size
		{"blockSize", fields["blockSize"]}, // size of this chunk
	}
	for _, f := range formFields {
		if err = uploadWriter.WriteField(f.name, f.value); err != nil {
			return "", "", fmt.Errorf("写入表单字段%s错误:%w", f.name, err)
		}
	}

	part, err := uploadWriter.CreateFormFile("file", fields["fileName"])
	if err != nil {
		return "", "", fmt.Errorf("赋值表单文件流错误:%w", err)
	}
	if _, err = part.Write(fileByte); err != nil {
		return "", "", fmt.Errorf("写入表单文件流错误:%w", err)
	}
	// Close exactly once: every call appends the terminating boundary.
	if err = uploadWriter.Close(); err != nil {
		return "", "", fmt.Errorf("关闭Writer错误:%w", err)
	}

	// Send the chunk to ECM.
	header := http.Header{}
	header.Set("Content-Type", uploadWriter.FormDataContentType())
	resBody, _, err := httpclient.PublicRequireByPost(uploadUrl, header, uploadForm.Bytes())
	if err != nil {
		return "", "", fmt.Errorf("生成文件上传请求错误:%w", err)
	}

	var uploadResult EcmUploadResp
	if err = json.Unmarshal(resBody, &uploadResult); err != nil {
		return "", "", err
	}

	switch uploadResult.Status {
	case "Error":
		return "", "", fmt.Errorf("Code:%d, Status:%s, Message:%s", uploadResult.ErrorCode, uploadResult.Status, uploadResult.Message)
	case "End":
		// Build the ECM download path. The hash key is spelled "fileld"
		// because that is how Token stores it.
		ecmSourceName := fmt.Sprintf("downLoad/Index?fileIds=%s&r=%d&token=%s", fields["fileld"], rand.Intn(200), emcToken)
		return ecmUrl + ecmSourceName, ecmSourceName, nil
	}

	// Any other status yields no result and no error.
	return "", "", nil
}
// divideAndSplit takes the next chunk of at most divisor bytes off a payload
// of dividend bytes.
//
// It returns src, the number of bytes left after this chunk, and out, the
// size of this chunk: (dividend-divisor, divisor) while at least one full
// chunk remains, otherwise (0, dividend) for the final, shorter chunk.
//
// A divisor of 0 used to panic with an integer division by zero; it is now
// treated as "no chunking", so the whole remainder is returned as one chunk.
func divideAndSplit(dividend, divisor uint64) (src uint64, out uint64) {
	if divisor == 0 || dividend < divisor {
		return 0, dividend
	}
	return dividend - divisor, divisor
}
// GetEcmUserLoginIntegrationByUserLoginName requests a login token from the
// ECM UserLoginIntegrationByUserLoginName service at urlHost, authenticating
// with secretKey as the integration key.
//
// The token is taken from the "data" field of the JSON response. An error is
// returned when the request fails, the response is not valid JSON, or "data"
// is missing or null.
//
// NOTE(review): the login name is hard-coded to 13 and the IP address to
// 127.0.0.1 — confirm this is intended.
// NOTE(review): the info log and the error message below contain the request
// body (including the integration key) and the response body (including the
// token) — consider redacting them.
func GetEcmUserLoginIntegrationByUserLoginName(urlHost, secretKey string) (string, error) {
	ecmUrl := fmt.Sprintf("%s/api/services/Org/UserLoginIntegrationByUserLoginName", urlHost)

	body := map[string]interface{}{
		"loginName":      13,
		"ipAddress":      "127.0.0.1",
		"integrationKey": secretKey,
	}
	bodyByte, err := json.Marshal(body)
	if err != nil {
		return "", err
	}

	header := http.Header{}
	header.Set("Accept", "application/json")
	header.Set("Content-Type", "application/json")

	resBody, _, err := httpclient.PublicRequireByPost(ecmUrl, header, bodyByte)
	if err != nil {
		return "", err
	}
	log.Info("JWT ******", ecmUrl, "\nHeader: ", header, "\nReqest: ", string(bodyByte), "\nRespbody:", string(resBody), "\nerr:", err, "\n")

	var resp map[string]interface{}
	if err = json.Unmarshal(resBody, &resp); err != nil {
		return "", err
	}

	// A null "data" is rejected as well, in line with the V1 variant;
	// otherwise the formatted nil would be handed out as the token.
	token, ok := resp["data"]
	if !ok || token == nil {
		printStr := fmt.Sprintf("ecmUrl:%s;\nHeader:%s;\nReqest:%s;\nRespbody:%s", ecmUrl, header, string(bodyByte), string(resBody))
		return "", fmt.Errorf("未查询到Token。%s", printStr)
	}
	return fmt.Sprintf("%s", token), nil
}
// GetEcmUserLoginIntegrationByUserLoginNameV resolves the ECM login token for
// the request in c, using the ECM host and integration key from the global
// configuration.
func GetEcmUserLoginIntegrationByUserLoginNameV(c *gin.Context) (string, error) {
	host, integrationKey := config.C.ECM.UrlHost, config.C.ECM.IntergrationKey
	return GetEcmUserLoginIntegrationByUserLoginNameV1(c, host, integrationKey)
}
// GetEcmUserLoginIntegrationByUserLoginNameV1 returns the ECM login token for
// the account stored in the gin context under "ecmAccount".
//
// The token is looked up in Redis first (key ecmTokenPre+loginName). On a
// cache miss a new token is requested from the ECM
// UserLoginIntegrationByUserLoginName service at urlHost, using secretKey as
// the integration key and the request's X-Real-IP header as the client
// address. The new token is then cached in Redis; a failure to cache it is
// logged but does not fail the call.
//
// An error is returned when the request fails, the response is not valid
// JSON, or its "data" field is missing or null.
//
// TODO: token handling has a known flaw. If the token endpoint is called
// manually (e.g. while debugging the API), ECM replaces the token, so the one
// cached here stops working; the cached token must then be deleted from
// Redis by hand. Do not call the token endpoint manually in production unless
// a token validation strategy has been added.
//
// NOTE(review): the info log and the error message below contain the request
// body (including the integration key) and the response body (including the
// token) — consider redacting them.
func GetEcmUserLoginIntegrationByUserLoginNameV1(c *gin.Context, urlHost, secretKey string) (string, error) {
	loginName := c.GetString("ecmAccount")
	redisPre := ecmTokenPre + loginName

	// 1. Serve the token from the Redis cache when present.
	if redisToken := redis.ImRedisDB.Get(redisPre).Val(); redisToken != "" {
		return redisToken, nil
	}

	// 2. Nothing cached: ask ECM for a fresh token.
	ecmUrl := fmt.Sprintf("%s/api/services/Org/UserLoginIntegrationByUserLoginName", urlHost)
	body := map[string]interface{}{
		"loginName":      loginName,
		"ipAddress":      c.Request.Header.Get("X-Real-IP"),
		"integrationKey": secretKey,
	}
	bodyByte, err := json.Marshal(body)
	if err != nil {
		return "", err
	}

	header := http.Header{}
	header.Set("Accept", "application/json")
	header.Set("Content-Type", "application/json")

	resBody, _, err := httpclient.PublicRequireByPost(ecmUrl, header, bodyByte)
	if err != nil {
		return "", err
	}
	log.Info("JWT ******", ecmUrl, "\nHeader: ", header, "\nReqest: ", string(bodyByte), "\nRespbody:", string(resBody), "\nerr:", err, "\n")

	var resp map[string]interface{}
	if err = json.Unmarshal(resBody, &resp); err != nil {
		return "", err
	}

	token, ok := resp["data"]
	if !ok || token == nil {
		printStr := fmt.Sprintf("ecmUrl:%s;\nHeader:%s;\nReqest:%s;\nRespbody:%s", ecmUrl, header, string(bodyByte), string(resBody))
		return "", fmt.Errorf("未查询到Token。%s", printStr)
	}

	// 3. Cache the token. The lifetime is the number of seconds between now
	// and the time returned by util.Get24Time.
	// NOTE(review): presumably that is the end of the current day — confirm.
	lifetime := time.Duration(util.Get24Time(time.Now()).Unix()-time.Now().Unix()) * time.Second
	if setErr := redis.ImRedisDB.Set(redisPre, token, lifetime).Err(); setErr != nil {
		// Log the Redis error itself: err is nil at this point, so calling
		// err.Error() here would panic.
		log.Error("ecm token 写入 redis 失败,err:", setErr.Error())
	}

	return fmt.Sprintf("%s", token), nil
}
// CancelToken revokes an upload credential by aborting the multipart upload
// identified by the session's upload id and save path. ctx is not used.
func (handler *Driver) CancelToken(ctx context.Context, uploadSession *serializer.UploadSession) error {
	pending := oss.InitiateMultipartUploadResult{
		UploadID: uploadSession.UploadID,
		Key:      uploadSession.SavePath,
	}
	return handler.bucket.AbortMultipartUpload(pending, nil)
}
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
1
https://gitee.com/wuzheng0709/backend-gopkg.git
git@gitee.com:wuzheng0709/backend-gopkg.git
wuzheng0709
backend-gopkg
backend-gopkg
v1.3.7

搜索帮助

344bd9b3 5694891 D2dac590 5694891