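// NOTE: web-page notice captured together with the source; it is not part of
// the program: "Code fetch complete; the page will refresh automatically."
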
package onedrive
import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"gitee.com/wuzheng0709/backend-gopkg/infrastructure/pkg/conf"
	"io"
	"io/ioutil"
	"net/http"
	"net/url"
	"path"
	"strconv"
	"strings"
	"time"
	"gitee.com/wuzheng0709/backend-gopkg/infrastructure/pkg/cache"
	"gitee.com/wuzheng0709/backend-gopkg/infrastructure/pkg/filesystem/chunk"
	"gitee.com/wuzheng0709/backend-gopkg/infrastructure/pkg/filesystem/chunk/backoff"
	"gitee.com/wuzheng0709/backend-gopkg/infrastructure/pkg/filesystem/fsctx"
	model "gitee.com/wuzheng0709/backend-gopkg/infrastructure/pkg/models"
	"gitee.com/wuzheng0709/backend-gopkg/infrastructure/pkg/mq"
	"gitee.com/wuzheng0709/backend-gopkg/infrastructure/pkg/request"
	"gitee.com/wuzheng0709/backend-gopkg/infrastructure/pkg/util"
)
const (
	// SmallFileSize is the largest file size, in bytes (4 MiB), that Upload
	// sends through the single-request simple upload API.
	SmallFileSize uint64 = 4 * 1024 * 1024
	// ChunkSize is the chunk size, in bytes (10 MiB), for chunked uploads
	// relayed through the server.
	ChunkSize uint64 = 10 * 1024 * 1024
	// ListRetry is the number of times a failed list request is retried.
	ListRetry       = 1
	// chunkRetrySleep is the pause between retries of a failed chunk upload.
	chunkRetrySleep = time.Second * 5
)
// GetSourcePath returns the item's path relative to the drive root: the parent
// reference path with its "/drive/root:" prefix removed, joined with the item
// name, stripped of its leading slash and percent-decoded. An empty string is
// returned when the path cannot be decoded.
func (info *FileInfo) GetSourcePath() string {
	parent := strings.TrimPrefix(info.ParentReference.Path, "/drive/root:")
	joined := path.Join(parent, info.Name)
	relative := strings.TrimPrefix(joined, "/")

	decoded, err := url.PathUnescape(relative)
	if err != nil {
		return ""
	}
	return decoded
}
// Error implements the error interface by returning the message carried by
// the embedded API error.
func (e RespError) Error() string {
	message := e.APIError.Message
	return message
}
// getRequestURL builds the absolute request URL for the API path api by
// appending it to the configured endpoint URL. Unless an option turns it off,
// the configured driver resource segment is inserted between the endpoint
// path and api. An empty string is returned when the endpoint URL cannot be
// parsed.
func (client *Client) getRequestURL(api string, opts ...Option) string {
	cfg := newDefaultOption()
	for i := range opts {
		opts[i].apply(cfg)
	}

	endpoint, _ := url.Parse(client.Endpoints.EndpointURL)
	if endpoint == nil {
		// Parsing failed; url.Parse returns a nil URL together with its error.
		return ""
	}

	segments := []string{endpoint.Path}
	if cfg.useDriverResource {
		segments = append(segments, client.Endpoints.DriverResource)
	}
	segments = append(segments, api)

	endpoint.Path = path.Join(segments...)
	return endpoint.String()
}
// ListChildren lists the direct children of the folder at path, where an
// empty path or "/" addresses the drive root. The request asks for a very
// large page ($top=999999999) and returns the entries of that single response.
//
// A failed request is retried up to ListRetry times after a five second
// pause. The number of retries already performed travels in ctx under
// fsctx.RetryCtx. The pause honours ctx: when ctx is cancelled while waiting,
// ListChildren returns ctx.Err() immediately instead of sleeping on.
func (client *Client) ListChildren(ctx context.Context, path string) ([]FileInfo, error) {
	var requestURL string
	dst := strings.TrimPrefix(path, "/")
	if dst == "" {
		requestURL = client.getRequestURL("root/children")
	} else {
		requestURL = client.getRequestURL("root:/" + dst + ":/children")
	}

	res, err := client.requestWithStr(ctx, "GET", requestURL+"?$top=999999999", "", 200)
	if err != nil {
		retried := 0
		if v, ok := ctx.Value(fsctx.RetryCtx).(int); ok {
			retried = v
		}
		if retried >= ListRetry {
			return nil, err
		}
		retried++
		util.Log().Debug("路径[%s]列取请求失败[%s],5秒钟后重试", path, err)

		// Wait before retrying, but give up at once if the caller cancels.
		wait := time.NewTimer(5 * time.Second)
		select {
		case <-ctx.Done():
			wait.Stop()
			return nil, ctx.Err()
		case <-wait.C:
		}

		return client.ListChildren(context.WithValue(ctx, fsctx.RetryCtx, retried), path)
	}

	var listing ListResponse
	if decodeErr := json.Unmarshal([]byte(res), &listing); decodeErr != nil {
		return nil, decodeErr
	}
	return listing.Value, nil
}
// Meta fetches the metadata of a single item, with its thumbnails expanded.
// The item is addressed by id when id is non-empty; otherwise it is addressed
// by path relative to the drive root.
func (client *Client) Meta(ctx context.Context, id string, path string) (*FileInfo, error) {
	api := "items/" + id
	if id == "" {
		api = "root:/" + strings.TrimPrefix(path, "/")
	}
	requestURL := client.getRequestURL(api)

	raw, reqErr := client.requestWithStr(ctx, "GET", requestURL+"?expand=thumbnails", "", 200)
	if reqErr != nil {
		return nil, reqErr
	}

	info := new(FileInfo)
	if err := json.Unmarshal([]byte(raw), info); err != nil {
		return nil, err
	}
	return info, nil
}
// CreateUploadSession opens a chunked upload session for the file at dst
// (relative to the drive root) and returns the URL that chunks must be sent
// to. The conflict behaviour placed in the request body comes from opts.
func (client *Client) CreateUploadSession(ctx context.Context, dst string, opts ...Option) (string, error) {
	cfg := newDefaultOption()
	for i := range opts {
		opts[i].apply(cfg)
	}

	requestURL := client.getRequestURL("root:/" + strings.TrimPrefix(dst, "/") + ":/createUploadSession")

	item := map[string]interface{}{
		"@microsoft.graph.conflictBehavior": cfg.conflictBehavior,
	}
	// The marshalling error is discarded, exactly as before.
	payload, _ := json.Marshal(map[string]map[string]interface{}{"item": item})

	raw, reqErr := client.requestWithStr(ctx, "POST", requestURL, string(payload), 200)
	if reqErr != nil {
		return "", reqErr
	}

	var session UploadSessionResponse
	if err := json.Unmarshal([]byte(raw), &session); err != nil {
		return "", err
	}
	return session.UploadURL, nil
}
// GetSiteIDByURL resolves a SharePoint site URL to its site ID. The host name
// and the slash-trimmed path of siteUrl are combined into a
// "sites/{host}:/{path}" request that bypasses the driver resource segment.
func (client *Client) GetSiteIDByURL(ctx context.Context, siteUrl string) (string, error) {
	parsed, err := url.Parse(siteUrl)
	if err != nil {
		return "", err
	}

	api := fmt.Sprintf("sites/%s:/%s", parsed.Hostname(), strings.Trim(parsed.Path, "/"))
	requestURL := client.getRequestURL(api, WithDriverResource(false))

	raw, reqErr := client.requestWithStr(ctx, "GET", requestURL, "", 200)
	if reqErr != nil {
		return "", reqErr
	}

	var site Site
	if err := json.Unmarshal([]byte(raw), &site); err != nil {
		return "", err
	}
	return site.ID, nil
}
// GetUploadSessionStatus queries the upload session behind uploadURL and
// returns its decoded state.
func (client *Client) GetUploadSessionStatus(ctx context.Context, uploadURL string) (*UploadSessionResponse, error) {
	raw, reqErr := client.requestWithStr(ctx, "GET", uploadURL, "", 200)
	if reqErr != nil {
		return nil, reqErr
	}

	session := new(UploadSessionResponse)
	if err := json.Unmarshal([]byte(raw), session); err != nil {
		return nil, err
	}
	return session, nil
}
// UploadChunk sends one chunk to the upload session at uploadURL. The request
// carries the chunk's Content-Range header, drops the Authorization and
// Content-Type headers, and has no timeout. For the last chunk it returns
// (nil, nil) without decoding the response; for any other chunk it returns
// the decoded session state.
func (client *Client) UploadChunk(ctx context.Context, uploadURL string, content io.Reader, current *chunk.ChunkGroup) (*UploadSessionResponse, error) {
	reqOpts := []request.Option{
		request.WithContentLength(current.Length()),
		request.WithHeader(http.Header{
			"Content-Range": {current.RangeHeader()},
		}),
		request.WithoutHeader([]string{"Authorization", "Content-Type"}),
		request.WithTimeout(0),
	}

	raw, reqErr := client.request(ctx, "PUT", uploadURL, content, reqOpts...)
	if reqErr != nil {
		return nil, fmt.Errorf("failed to upload OneDrive chunk #%d: %w", current.Index(), reqErr)
	}

	if current.IsLast() {
		return nil, nil
	}

	progress := new(UploadSessionResponse)
	if err := json.Unmarshal([]byte(raw), progress); err != nil {
		return nil, err
	}
	return progress, nil
}
// Upload stores file at the save path recorded in its info.
//
// Files no larger than SmallFileSize go through the simple upload API in a
// single request. Larger files are uploaded chunk by chunk through an upload
// session, using the chunk size configured on the client's policy and a
// constant backoff between chunk retries.
//
// An existing file at the destination is replaced only when the file's mode
// has the fsctx.Overwrite flag set; otherwise the conflict behaviour "fail"
// is requested.
func (client *Client) Upload(ctx context.Context, file fsctx.FileHeader) error {
	fileInfo := file.Info()

	// Decide whether an existing file may be overwritten.
	overwrite := "fail"
	if fileInfo.Mode&fsctx.Overwrite == fsctx.Overwrite {
		overwrite = "replace"
	}

	dst := fileInfo.SavePath
	// Compare sizes as uint64: narrowing to int truncates on 32-bit builds and
	// could route a multi-gigabyte file to the simple upload path.
	size := uint64(fileInfo.Size)

	// Small file: use the simple upload API.
	if size <= SmallFileSize {
		_, err := client.SimpleUpload(ctx, dst, file, int64(size), WithConflictBehavior(overwrite))
		return err
	}

	// Large file: create an upload session and send the file in chunks.
	uploadURL, err := client.CreateUploadSession(ctx, dst, WithConflictBehavior(overwrite))
	if err != nil {
		return err
	}

	// Initial chunk groups
	chunks := chunk.NewChunkGroup(file, client.Policy.OptionsSerialized.ChunkSize, &backoff.ConstantBackoff{
		Max:   model.GetIntSetting("chunk_retries", 5),
		Sleep: chunkRetrySleep,
	}, model.IsTrueVal(model.GetSettingByName("use_temp_chunk_buffer")))

	uploadFunc := func(current *chunk.ChunkGroup, content io.Reader) error {
		_, err := client.UploadChunk(ctx, uploadURL, content, current)
		return err
	}

	// upload chunks
	for chunks.Next() {
		if err := chunks.Process(uploadFunc); err != nil {
			return fmt.Errorf("failed to upload chunk #%d: %w", chunks.Index(), err)
		}
	}

	return nil
}
// DeleteUploadSession cancels the upload session behind uploadURL by sending
// it a DELETE request.
func (client *Client) DeleteUploadSession(ctx context.Context, uploadURL string) error {
	if _, respErr := client.requestWithStr(ctx, "DELETE", uploadURL, "", 204); respErr != nil {
		return respErr
	}
	// Return a literal nil: handing back the nil *RespError itself would give
	// the caller a non-nil error interface.
	return nil
}
// SimpleUpload uploads body, which is size bytes long, to dst (relative to the
// drive root) in a single PUT request with no timeout, and returns the decoded
// upload result. The conflict behaviour sent as a query parameter comes from
// opts.
func (client *Client) SimpleUpload(ctx context.Context, dst string, body io.Reader, size int64, opts ...Option) (*UploadResult, error) {
	cfg := newDefaultOption()
	for i := range opts {
		opts[i].apply(cfg)
	}

	requestURL := client.getRequestURL("root:/"+strings.TrimPrefix(dst, "/")+":/content") +
		"?@microsoft.graph.conflictBehavior=" + cfg.conflictBehavior

	raw, reqErr := client.request(ctx, "PUT", requestURL, body,
		request.WithContentLength(size),
		request.WithTimeout(0),
	)
	if reqErr != nil {
		return nil, reqErr
	}

	result := new(UploadResult)
	if err := json.Unmarshal([]byte(raw), result); err != nil {
		return nil, err
	}
	return result, nil
}
// BatchDelete deletes the given files by splitting dst into groups of at most
// 20 entries and passing each group to Delete, one group after another. It
// returns the files that could not be deleted across all groups together with
// the first error encountered; errors from later groups do not replace it.
//
// No request is issued for an empty dst, and no empty trailing group is sent
// when len(dst) is a multiple of 20.
func (client *Client) BatchDelete(ctx context.Context, dst []string) ([]string, error) {
	// Maximum number of entries handed to a single Delete call.
	const batchSize = 20

	failed := make([]string, 0, len(dst))
	var firstErr error

	for start := 0; start < len(dst); start += batchSize {
		end := start + batchSize
		if end > len(dst) {
			end = len(dst)
		}

		res, err := client.Delete(ctx, dst[start:end])
		failed = append(failed, res...)
		if err != nil && firstErr == nil {
			firstErr = err
		}
	}

	return failed, firstErr
}
// Delete removes the given files with a single $batch request and returns the
// files that could not be deleted. The original comment notes that the API
// limits a batch to 20 entries; this function does not enforce that itself.
//
// When the request or the decoding of its response fails, every entry of dst
// is reported as failed. When individual deletions fail, their IDs are
// returned together with ErrDeleteFile.
func (client *Client) Delete(ctx context.Context, dst []string) ([]string, error) {
	payload := client.makeBatchDeleteRequestsBody(dst)
	batchURL := client.getRequestURL("$batch", WithDriverResource(false))

	raw, reqErr := client.requestWithStr(ctx, "POST", batchURL, payload, 200)
	if reqErr != nil {
		return dst, reqErr
	}

	var batch BatchResponses
	if err := json.Unmarshal([]byte(raw), &batch); err != nil {
		return dst, err
	}

	// Collect the entries whose deletion did not succeed.
	failed := getDeleteFailed(&batch)
	if len(failed) == 0 {
		return failed, nil
	}
	return failed, ErrDeleteFile
}
// getDeleteFailed returns the IDs of the batch responses whose status is
// neither 204 nor 404. The result is never nil.
func getDeleteFailed(res *BatchResponses) []string {
	failed := make([]string, 0, len(res.Responses))
	for i := range res.Responses {
		item := res.Responses[i]
		switch item.Status {
		case 204, 404:
			// Not counted as a failure.
		default:
			failed = append(failed, item.ID)
		}
	}
	return failed
}
// makeBatchDeleteRequestsBody builds the JSON body of a batch delete request.
// Each file becomes one DELETE sub-request whose ID is the file path without
// its leading slash and whose URL is the escaped path
// "/{driver resource}/root:/{file}".
func (client *Client) makeBatchDeleteRequestsBody(files []string) string {
	requests := make([]BatchRequest, 0, len(files))
	for _, file := range files {
		id := strings.TrimPrefix(file, "/")

		target, _ := url.Parse("/" + client.Endpoints.DriverResource + "/root:/")
		target.Path = path.Join(target.Path, id)

		requests = append(requests, BatchRequest{
			ID:     id,
			Method: "DELETE",
			URL:    target.EscapedPath(),
		})
	}

	encoded, _ := json.Marshal(BatchRequests{Requests: requests})
	return string(encoded)
}
// GetThumbURL returns the URL of the "large" thumbnail of the file at dst
// (relative to the drive root). The w and h parameters are accepted for
// interface compatibility but are not used to build the request.
//
// The URL is taken from the top-level "url" of the response when present;
// otherwise from the "large" entry of a single-element value list. An error
// is returned when neither location yields a URL, including when the
// response has an unexpected shape.
func (client *Client) GetThumbURL(ctx context.Context, dst string, w, h uint) (string, error) {
	dst = strings.TrimPrefix(dst, "/")
	requestURL := client.getRequestURL("root:/"+dst+":/thumbnails/0") + "/large"

	res, err := client.requestWithStr(ctx, "GET", requestURL, "", 200)
	if err != nil {
		return "", err
	}

	var thumbRes ThumbResponse
	if decodeErr := json.Unmarshal([]byte(res), &thumbRes); decodeErr != nil {
		return "", decodeErr
	}

	if thumbRes.URL != "" {
		return thumbRes.URL, nil
	}

	if len(thumbRes.Value) == 1 {
		// Checked assertions: a response whose "large" entry is not an object,
		// or whose "url" is not a string, must not panic.
		if large, ok := thumbRes.Value[0]["large"].(map[string]interface{}); ok {
			if thumbURL, ok := large["url"].(string); ok {
				return thumbURL, nil
			}
		}
	}

	return "", errors.New("无法生成缩略图")
}
// MonitorUpload watches a client-side chunked upload until it finishes, is
// found to be invalid, or its session expires. It blocks until one of these
// happens:
//
//   - a message arrives on the callback channel subscribed under callbackKey:
//     the upload is complete and the function returns;
//   - ttl seconds have passed since monitoring started: the upload session is
//     deleted and an empty placeholder file is written to path;
//   - a periodic status check (every "onedrive_monitor_timeout" seconds)
//     reports "itemNotFound": the callback gets "onedrive_callback_check"
//     more seconds to arrive, after which the callback cache entry and the
//     uploaded file are deleted;
//   - a periodic status check shows that the upload has not started or that
//     the expected size differs from size: the session is deleted and an
//     empty placeholder file is written to path.
//
// The expiry timer is created once, before the loop. Re-creating it on every
// pass would restart the countdown after each status check, so the expiry
// branch could never fire when the check interval is shorter than ttl.
func (client *Client) MonitorUpload(uploadURL, callbackKey, path string, size uint64, ttl int64) {
	// Channel on which the completion callback is announced.
	callbackChan := mq.GlobalMQ.Subscribe(callbackKey, 1)
	defer mq.GlobalMQ.Unsubscribe(callbackKey, callbackChan)

	timeout := model.GetIntSetting("onedrive_monitor_timeout", 600)
	interval := model.GetIntSetting("onedrive_callback_check", 20)

	// cancelWithPlaceholder deletes the upload session and then overwrites the
	// target with an empty file. Per the original author's observation, the
	// client can keep uploading after the session is cancelled, and the
	// placeholder is there to block that. Deleting the session is best effort,
	// so its error is intentionally ignored.
	cancelWithPlaceholder := func() {
		client.DeleteUploadSession(context.Background(), uploadURL)
		_, err := client.SimpleUpload(context.Background(), path, strings.NewReader(""), 0, WithConflictBehavior("replace"))
		if err != nil {
			util.Log().Debug("无法创建占位文件,%s", err)
		}
	}

	// Fires once, ttl seconds after monitoring starts.
	expiry := time.NewTimer(time.Duration(ttl) * time.Second)
	defer expiry.Stop()

	for {
		select {
		case <-callbackChan:
			util.Log().Debug("客户端完成回调")
			return
		case <-expiry.C:
			// The session expired before the upload completed.
			cancelWithPlaceholder()
			return
		case <-time.After(time.Duration(timeout) * time.Second):
			util.Log().Debug("检查上传情况")
			status, err := client.GetUploadSessionStatus(context.Background(), uploadURL)

			if err != nil {
				var resErr *RespError
				if errors.As(err, &resErr) && resErr.APIError.Code == "itemNotFound" {
					util.Log().Debug("上传会话已完成,稍后检查回调")
					select {
					case <-time.After(time.Duration(interval) * time.Second):
						util.Log().Warning("未发送回调,删除文件")
						cache.Deletes([]string{callbackKey}, "callback_")
						_, err = client.Delete(context.Background(), []string{path})
						if err != nil {
							util.Log().Warning("无法删除未回调的文件,%s", err)
						}
					case <-callbackChan:
						util.Log().Debug("客户端完成回调")
					}
					return
				}
				util.Log().Debug("无法获取上传会话状态,继续下一轮,%s", err.Error())
				continue
			}

			// The status was fetched; inspect the last expected range.
			if len(status.NextExpectedRanges) == 0 {
				continue
			}
			sizeRange := strings.Split(
				status.NextExpectedRanges[len(status.NextExpectedRanges)-1],
				"-",
			)
			if len(sizeRange) != 2 {
				continue
			}
			// A parse failure leaves uploadFullSize at 0, as before.
			uploadFullSize, _ := strconv.ParseUint(sizeRange[1], 10, 64)
			if (sizeRange[0] == "0" && sizeRange[1] == "") || uploadFullSize+1 != size {
				util.Log().Debug("未开始上传或文件大小不一致,取消上传会话")
				cancelWithPlaceholder()
				return
			}
		}
	}
}
// sysError wraps a local (non-API) error in a RespError whose code is
// "system" and whose message is the text of err.
func sysError(err error) *RespError {
	wrapped := new(RespError)
	wrapped.APIError.Code = "system"
	wrapped.APIError.Message = err.Error()
	return wrapped
}
// request refreshes the client's credential, sends an HTTP request with the
// bearer token and a JSON content type appended after the caller's options,
// and returns the response body. Local failures (credential refresh,
// transport, reading the body, undecodable error payload) are reported as
// "system" errors via sysError; a status outside 2xx is reported as the
// decoded API error.
func (client *Client) request(ctx context.Context, method string, url string, body io.Reader, option ...request.Option) (string, *RespError) {
	// Obtain a valid credential first.
	if credErr := client.UpdateCredential(ctx, conf.SystemConfig.Mode == "slave"); credErr != nil {
		return "", sysError(credErr)
	}

	defaultHeader := http.Header{
		"Authorization": {"Bearer " + client.Credential.AccessToken},
		"Content-Type":  {"application/json"},
	}
	option = append(option,
		request.WithHeader(defaultHeader),
		request.WithContext(ctx),
		//request.WithTPSLimit(
		//	fmt.Sprintf("policy_%d", client.Policy.ID),
		//	client.Policy.OptionsSerialized.TPSLimit,
		//	client.Policy.OptionsSerialized.TPSLimitBurst,
		//),
	)

	// Send the request.
	res := client.Request.Request(method, url, body, option...)
	if res.Err != nil {
		return "", sysError(res.Err)
	}

	respBody, readErr := res.GetResponse()
	if readErr != nil {
		return "", sysError(readErr)
	}

	// Any 2xx status counts as success.
	if code := res.Response.StatusCode; code >= 200 && code < 300 {
		return respBody, nil
	}

	// Otherwise the body is expected to carry an API error description.
	apiErr := new(RespError)
	if decodeErr := json.Unmarshal([]byte(respBody), apiErr); decodeErr != nil {
		util.Log().Debug("Onedrive返回未知响应[%s]", respBody)
		return "", sysError(decodeErr)
	}
	return "", apiErr
}
// requestWithStr sends a request whose body is the given string, setting the
// content length to the byte length of body, and returns the response body.
// The expectedCode parameter is not consulted here; success is decided by
// request.
func (client *Client) requestWithStr(ctx context.Context, method string, url string, body string, expectedCode int) (string, *RespError) {
	length := int64(len(body))
	reader := ioutil.NopCloser(strings.NewReader(body))
	return client.request(ctx, method, url, reader, request.WithContentLength(length))
}
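// NOTE: the lines below are web-page text captured together with the source;
// they are not part of the program.
//
// "This section may contain content unsuitable for display, so the page does
// not show it. You can review and modify it with the editing tools."
// "If you confirm the content involves no inappropriate language, pure
// advertising, violence, vulgar or pornographic material, infringement,
// piracy, false or worthless content, or anything violating applicable laws
// and regulations, you may submit an appeal and we will handle it promptly."
// "Ma Jiancang AI Assistant"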