代码拉取完成,页面将自动刷新
// Package client
// @program: file_client_go
// @author: ygt
// @create: 2022/8/1 9:49
package client
import (
"bytes"
"encoding/json"
"errors"
"github.com/panjf2000/ants/v2"
"io"
"io/ioutil"
"log"
"mime/multipart"
"net/http"
"net/url"
"os"
"path/filepath"
"strconv"
"strings"
"sync"
)
// fc is the field-less receiver type whose methods (UploadF, DownloadF,
// DeleteF) implement the fu interface.
type fc struct{}
// FClient is the package-level client value, built once by newC when the
// package is initialised.
var FClient = newC()
// newC creates a fresh fc and returns it behind the fu interface.
func newC() fu {
	var client fc
	return &client
}
// fu lists the file operations exposed by the client: upload, download and
// remote delete.
type fu interface {
// UploadF uploads a file.
//
// param:
// filePath absolute path of the file to upload
// uploadUrl server upload address
// mergeUrl merge address
// clientId unique client id (user id / user name / ...)
// return:
// FileUploadResponseInfo the upload result
UploadF(filePath string, uploadUrl string, mergeUrl string, clientId string) FileUploadResponseInfo
// DownloadF downloads a file.
//
// param:
// path local directory to download into; created if it does not exist
// uri server download address
// return:
// absolute path of the downloaded file, error
DownloadF(path string, uri string) (string, error)
// DeleteF deletes a remote file.
//
// param:
// uri: delete request URL, built from the file id
// return:
// bool: whether the delete succeeded
// string: success/failure message
DeleteF(uri string) (bool, string)
}
// UploadF uploads the file at filePath in chunks and asks the server to merge
// them.
//
// The server is first queried (check) for the upload state. The listed terminal
// states are only logged; allowMerge triggers the merge right away; every other
// state uploads the chunks named in NotYetUpload through a worker pool, queries
// the state again and merges if the server then allows it.
//
// param:
// filePath absolute path of the file to upload
// uploadUrl server address used for the state query and handed to the chunk tasks
// mergeUrl server address used for the merge request
// clientId unique client id (user id / user name / ...)
// return:
// FileUploadResponseInfo the decoded server response; when a local step fails
// (empty file, request error, undecodable response, pool creation) Msg carries
// the reason
func (fcc *fc) UploadF(filePath string, uploadUrl string, mergeUrl string, clientId string) FileUploadResponseInfo {
	log.Println("进入方法,开始上传")

	var fur FileUploadResponseInfo

	_, _, size := getFileMetadata(filePath)
	if size == 0 {
		// Empty files are rejected. Report that to the caller instead of
		// terminating the whole process from library code.
		// NOTE(review): State keeps its zero value here; callers should look at Msg.
		log.Println("禁止上传空文件")
		fur.Msg = "禁止上传空文件"
		return fur
	}
	identifier := Ic.GetFileIdentifier(filePath)
	totalChunks := getTotalChunks(size)

	result, err := check(filePath, uploadUrl, 1, totalChunks, size, identifier, clientId)
	if err != nil {
		// Without a server answer there is nothing meaningful to decode or upload.
		log.Println(err)
		fur.Msg = err.Error()
		return fur
	}
	log.Println(string(result))
	if err = json.Unmarshal(result, &fur); err != nil {
		log.Println(err)
		fur.Msg = err.Error()
		return fur
	}

	switch fur.State {
	case success:
		log.Println("上传成功")
	case remoteStoreCreatBucketFailed:
		log.Println("操作远程存储创建桶失败")
	case createLocalTempDirFailed:
		log.Println("服务端创建本地临时目录失败")
	case initDbFailed:
		log.Println("初始化数据库失败")
	case allowMerge:
		// Every chunk is already on the server; only the merge is left.
		log.Println("合并中...")
		fur = m(filePath, mergeUrl, identifier, totalChunks, size, clientId)
	default:
		// breakpointContinuation and any state not listed above: upload the
		// chunks the server reports as missing.
		pool, poolErr := ants.NewPoolWithFunc(threadNum, taskFunc, ants.WithPreAlloc(true))
		if poolErr != nil {
			log.Println(poolErr)
			fur.Msg = poolErr.Error()
			return fur
		}
		defer pool.Release()

		var wg sync.WaitGroup
		wg.Add(len(fur.NotYetUpload))
		for _, v := range fur.NotYetUpload {
			task := &up{
				filePath:    filePath,
				uploadUri:   uploadUrl,
				i:           int64(v),
				totalChunks: totalChunks,
				identifier:  identifier,
				clientId:    clientId,
				wg:          &wg,
			}
			// Invoke blocks while all workers are busy, so the pool itself
			// bounds the concurrency; no extra goroutine per chunk is needed.
			if invokeErr := pool.Invoke(task); invokeErr != nil {
				log.Println(invokeErr)
				// The task was never handed to a worker, so nothing else will
				// release its slot in the WaitGroup (taskFunc is expected to
				// call wg.Done for every task it actually runs).
				wg.Done()
			}
		}
		wg.Wait()

		// Ask the server again to find out whether all chunks arrived.
		result, err = check(filePath, uploadUrl, 1, totalChunks, size, identifier, clientId)
		if err != nil {
			log.Println(err)
			fur.Msg = err.Error()
			return fur
		}
		if err = json.Unmarshal(result, &fur); err != nil {
			log.Println(err)
			fur.Msg = err.Error()
			return fur
		}
		switch fur.State {
		case success:
			log.Println("上传成功")
		case allowMerge:
			log.Println("合并中...")
			fur = m(filePath, mergeUrl, identifier, totalChunks, size, clientId)
		case breakpointContinuation:
			// All tasks have finished and the server still reports missing
			// chunks, so the upload is considered failed.
			log.Println("上传失败")
			fur.Msg = "上传失败"
		}
	}
	return fur
}
// DownloadF fetches uri with a GET request and stores the response body as a
// file inside the local directory path.
//
// param:
// path local directory to download into; it is validated by checkPath (the
// original note says a missing directory is created — that happens in
// checkPath, which is not shown here)
// uri server download address
// return:
// path of the written file (directory returned by checkPath joined with the
// file name taken from the response headers), error
func (fcc *fc) DownloadF(path string, uri string) (string, error) {
	ok, dir := checkPath(path)
	if !ok {
		// On failure checkPath returns the reason in its second result.
		return "", errors.New(dir)
	}

	req, err := http.NewRequest(http.MethodGet, uri, nil)
	if err != nil {
		return "", err
	}
	resp, err := getHttpClient().Do(req)
	if err != nil {
		return "", err
	}
	defer resp.Body.Close()

	// Any non-2xx answer is an error page, not file content; never write it to disk.
	if resp.StatusCode < 200 || resp.StatusCode > 299 {
		// Best effort: the body is only used as the error text.
		body, _ := ioutil.ReadAll(resp.Body)
		msg := string(body)
		if msg == "" {
			msg = resp.Status
		}
		log.Println(msg)
		return "", errors.New(msg)
	}

	// The name comes from the server; keep only its last path element so it
	// cannot point outside dir.
	fileName := filepath.Base(getFileNameFromHeader(resp))
	if fileName == "." || fileName == ".." || fileName == string(filepath.Separator) {
		return "", errors.New("response does not carry a usable file name")
	}
	fileFullPath := filepath.Join(dir, fileName)

	out, err := os.Create(fileFullPath)
	if err != nil {
		log.Printf("创建文件失败:error : %v\n", err)
		return "", err
	}

	_, err = io.Copy(out, resp.Body)
	// Close explicitly so that a failed flush is reported as a write error.
	if closeErr := out.Close(); err == nil {
		err = closeErr
	}
	if err != nil {
		log.Printf("写入文件失败:error : %v\n", err)
		// Best effort: do not leave a truncated file behind.
		_ = os.Remove(fileFullPath)
		return "", err
	}
	return fileFullPath, nil
}
// DeleteF deletes a remote file by sending a DELETE request to uri and
// decoding the JSON answer.
//
// param:
// uri: delete request URL, built from the file id
// return:
// bool: true only when the decoded Data field equals "ok"
// string: the Msg field of the answer, or the error text when the request,
// the body read or the JSON decoding fails
func (fcc *fc) DeleteF(uri string) (bool, string) {
	req, err := http.NewRequest(http.MethodDelete, uri, nil)
	if err != nil {
		log.Println("构造请求失败,err:", err)
		return false, err.Error()
	}
	resp, err := getHttpClient().Do(req)
	if err != nil {
		log.Println("发送请求失败,err:", err)
		return false, err.Error()
	}
	defer resp.Body.Close()

	body, err := ioutil.ReadAll(resp.Body)
	if err != nil {
		log.Println("读取响应失败,err:", err)
		return false, err.Error()
	}

	var fdr FileDeleteResponseInfo
	if err = json.Unmarshal(body, &fdr); err != nil {
		log.Println("解析响应失败,err:", err)
		return false, err.Error()
	}
	return fdr.Data == "ok", fdr.Msg
}
// m sends the merge request, decodes the server's answer and logs the outcome.
//
// When the server answers allowMerge the merge is requested one more time and
// that second answer is the one returned. If the request or the decoding
// fails, Msg of the returned value carries the error text.
func m(filePath string, mergeUrl string, identifier string, totalChunks float64, size int64, clientId string) FileUploadResponseInfo {
	var fur FileUploadResponseInfo

	// At most two attempts: the initial merge plus one retry on allowMerge.
	for attempt := 0; attempt < 2; attempt++ {
		result, err := merge(filePath, mergeUrl, identifier, totalChunks, size, clientId)
		if err != nil {
			log.Println(err)
			fur.Msg = err.Error()
			return fur
		}
		// Decode into a fresh value so fields of an earlier answer cannot
		// leak into the result of the retry.
		var decoded FileUploadResponseInfo
		if err = json.Unmarshal(result, &decoded); err != nil {
			log.Println(err)
			fur.Msg = err.Error()
			return fur
		}
		fur = decoded
		if fur.State != allowMerge {
			break
		}
	}

	switch fur.State {
	case success:
		log.Println("合并成功,上传成功")
	case denyMerge:
		log.Println("未上传完成,禁止合并")
	case mergeFailed:
		log.Println("合并失败")
	case verifyFailed:
		log.Println("校验唯一标识(xxh)失败")
	case uploadRemoteStoreFailed:
		log.Println("合并完成,上传远程存储失败")
	}
	return fur
}
// check queries the server for the upload state of a file.
//
// It sends a GET to uploadUrl with the file name, identifier, chunk number,
// chunk count, total size and client id as query parameters and returns the
// raw response body.
func check(filePath string, uploadUrl string, chunkNumber int64, totalChunks float64, size int64, identifier string, clientId string) ([]byte, error) {
	_, name, _ := getFileMetadata(filePath)

	request, err := http.NewRequest(http.MethodGet, uploadUrl, nil)
	if err != nil {
		return nil, err
	}

	// Parameters are appended to whatever query uploadUrl already carries.
	params := [][2]string{
		{"fileName", name},
		{"identifier", identifier},
		{"chunkNumber", strconv.FormatInt(chunkNumber, 10)},
		{"totalChunks", strconv.FormatFloat(totalChunks, 'f', -1, 64)},
		{"totalSize", strconv.FormatInt(size, 10)},
		{"clientId", clientId},
	}
	query := request.URL.Query()
	for _, pair := range params {
		query.Add(pair[0], pair[1])
	}
	request.URL.RawQuery = query.Encode()

	response, err := getHttpClient().Do(request)
	if err != nil {
		return nil, err
	}
	defer response.Body.Close()

	payload, readErr := ioutil.ReadAll(response.Body)
	if readErr != nil {
		return nil, readErr
	}
	return payload, nil
}
// multipartF uploads one chunk of the file as a multipart/form-data POST and
// returns the raw response body.
//
// The chunk is read from filePath starting at offset. Every chunk is chunkSize
// bytes long except the last one (chunkNumber == totalChunks), which carries
// the remaining bytes. Besides the "file" part the form contains fileName,
// identifier, totalSize, chunkNumber, totalChunks, currentChunkSize (the
// number of bytes actually read) and clientId.
func multipartF(filePath string, uploadUrl string, chunkNumber, offset int64, totalChunks float64, identifier string, clientId string) ([]byte, error) {
	_, fileName, size := getFileMetadata(filePath)

	f, err := os.Open(filePath)
	if err != nil {
		return nil, err
	}
	defer f.Close()

	bufLen := int64(chunkSize)
	if chunkNumber == int64(totalChunks) && size-chunkNumber*chunkSize < chunkSize {
		// The last chunk holds whatever is left after the preceding ones.
		bufLen = size - (chunkNumber-1)*chunkSize
	}
	if bufLen < 0 {
		return nil, errors.New("chunk number is beyond the end of the file")
	}
	buf := make([]byte, bufLen)

	// ReadAt reports io.EOF when fewer than len(buf) bytes were available;
	// that is not a failure, the bytes that were read are still uploaded.
	n, err := f.ReadAt(buf, offset)
	if err != nil && !errors.Is(err, io.EOF) {
		return nil, err
	}

	bodyBuf := &bytes.Buffer{}
	bodyWriter := multipart.NewWriter(bodyBuf)
	fw, err := bodyWriter.CreateFormFile("file", filepath.Base(filePath))
	if err != nil {
		return nil, err
	}
	// Send only the bytes actually read so the payload matches currentChunkSize.
	if _, err = fw.Write(buf[:n]); err != nil {
		return nil, err
	}

	fields := [][2]string{
		{"fileName", fileName},
		{"identifier", identifier},
		{"totalSize", strconv.FormatInt(size, 10)},
		{"chunkNumber", strconv.FormatInt(chunkNumber, 10)},
		{"totalChunks", strconv.FormatFloat(totalChunks, 'f', -1, 64)},
		{"currentChunkSize", strconv.Itoa(n)},
		{"clientId", clientId},
	}
	for _, field := range fields {
		if err = bodyWriter.WriteField(field[0], field[1]); err != nil {
			return nil, err
		}
	}
	// Close writes the terminating boundary; the body is incomplete without it.
	if err = bodyWriter.Close(); err != nil {
		log.Println(err)
		return nil, err
	}

	req, err := http.NewRequest(http.MethodPost, uploadUrl, bodyBuf)
	if err != nil {
		return nil, err
	}
	req.Header.Set("Content-Type", bodyWriter.FormDataContentType())

	resp, err := getHttpClient().Do(req)
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()

	body, err := ioutil.ReadAll(resp.Body)
	if err != nil {
		return nil, err
	}
	return body, nil
}
// merge asks the server to merge the uploaded chunks of a file.
//
// It POSTs fileName, identifier, totalSize, totalChunks and clientId as an
// application/x-www-form-urlencoded body to mergeUrl and returns the raw
// response body.
func merge(filePath string, mergeUrl string, identifier string, totalChunks float64, size int64, clientId string) ([]byte, error) {
	_, name, _ := getFileMetadata(filePath)

	form := url.Values{
		"fileName":    {name},
		"identifier":  {identifier},
		"totalSize":   {strconv.FormatInt(size, 10)},
		"totalChunks": {strconv.FormatFloat(totalChunks, 'f', -1, 64)},
		"clientId":    {clientId},
	}

	request, err := http.NewRequest(http.MethodPost, mergeUrl, strings.NewReader(form.Encode()))
	if err != nil {
		return nil, err
	}
	request.Header.Set("Content-Type", "application/x-www-form-urlencoded")

	response, err := getHttpClient().Do(request)
	if err != nil {
		return nil, err
	}
	defer response.Body.Close()

	payload, readErr := ioutil.ReadAll(response.Body)
	if readErr != nil {
		return nil, readErr
	}
	return payload, nil
}
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。