1 Star 0 Fork 1

freely随意/BaiduPCS-Go

forked from ttpc2008/BaiduPCS-Go 
加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
克隆/下载
worker.go 2.38 KB
一键复制 编辑 原始数据 按行查看 历史
konica 提交于 2018-08-04 22:27 . 更新错误处理, 增加搜索文件
package uploader
import (
"context"
"github.com/iikira/BaiduPCS-Go/pcsutil/waitgroup"
"github.com/oleiade/lane"
"os"
"time"
)
// worker holds the upload state of a single slice of the file.
type worker struct {
	id         int       // slice id handed to TmpFile when the slice is uploaded
	partOffset int64     // offset handed to TmpFile together with the slice
	splitUnit  SplitUnit // data source of this slice
	checksum   string    // checksum returned by TmpFile; empty while the slice is not uploaded
}

// workerList is an ordered collection of workers.
type workerList []*worker
// CheckSumList returns the checksum of every worker, in list order.
// Workers that have not been uploaded yet contribute an empty string.
// TODO: implement sort
func (werl *workerList) CheckSumList() []string {
	workers := *werl
	sums := make([]string, len(workers))
	for i := range workers {
		sums[i] = workers[i].checksum
	}
	return sums
}
// Readed returns the sum of the values reported by splitUnit.Readed()
// across all workers in the list.
func (werl *workerList) Readed() int64 {
	workers := *werl
	total := int64(0)
	for i := 0; i < len(workers); i++ {
		total += workers[i].splitUnit.Readed()
	}
	return total
}
// upload uploads every worker that has no checksum yet and then merges the
// uploaded slices by calling CreateSuperFile with all checksums.
//
// Flow:
//  1. Precreate is called; its error aborts the upload.
//  2. Pending workers are queued and dispatched to goroutines. The wait group
//     built from muer.parallel controls the concurrency.
//  3. A slice whose upload fails with a non-terminating error is rewound and
//     re-queued. A *MultiError with Terminated set cancels the whole upload.
//  4. Once nothing is queued or running, CreateSuperFile is called.
//
// It returns the terminating error if a worker reported one, context.Canceled
// if muer.canceled was closed without such an error, otherwise the error (if
// any) from Precreate or CreateSuperFile.
func (muer *MultiUploader) upload() (uperr error) {
	if err := muer.multiUpload.Precreate(); err != nil {
		return err
	}

	var (
		deque = lane.NewDeque()
		// Controls the number of concurrent uploads.
		wg = waitgroup.NewWaitGroup(muer.parallel)
		// One-shot token: only the goroutine that takes it may record the
		// terminating error and close muer.canceled. Without it, two workers
		// failing with Terminated at the same time would both close the
		// channel (panic) and both write uperr (data race).
		terminateToken = make(chan struct{}, 1)
	)
	terminateToken <- struct{}{}

	// Queue every worker that still lacks a checksum.
	for _, wer := range muer.workers {
		if wer.checksum == "" {
			deque.Append(wer)
		}
	}

dispatch:
	for {
		// Do not start further uploads once the upload has been canceled.
		select {
		case <-muer.canceled:
			break dispatch
		default:
		}

		// Sample the idle state BEFORE taking from the queue. Only running
		// workers re-queue slices, so "nothing was running, and afterwards
		// the queue is empty" really means all work is done. Checking in the
		// opposite order could miss a slice re-queued by a worker that
		// finished between the two checks.
		idle := wg.Parallel() == 0
		e := deque.Shift()
		if e == nil { // queue is empty
			if idle { // and nothing is running: finished
				break dispatch
			}
			time.Sleep(time.Second)
			continue
		}

		wer := e.(*worker)
		wg.AddDelta()
		go func() {
			defer wg.Done()

			ctx, cancel := context.WithCancel(context.Background())
			defer cancel() // stops TmpFile on every exit path

			var (
				doneChan = make(chan struct{})
				checksum string
				terr     error
			)
			go func() {
				checksum, terr = muer.multiUpload.TmpFile(ctx, wer.id, wer.partOffset, wer.splitUnit)
				close(doneChan)
			}()

			select {
			case <-muer.canceled:
				// Upload canceled; the deferred cancel() aborts TmpFile.
				return
			case <-doneChan:
				// TmpFile finished; checksum and terr are safe to read.
			}

			if terr != nil {
				if me, ok := terr.(*MultiError); ok && me.Terminated {
					// Terminating error: cancel the whole upload, exactly once.
					select {
					case <-terminateToken:
						uperr = me.Err
						// Skip the close if the channel is already closed
						// (presumably by a canceler outside this function —
						// not visible here).
						select {
						case <-muer.canceled:
						default:
							close(muer.canceled)
						}
					default:
						// Another worker already terminated the upload.
					}
					return
				}

				uploaderVerbose.Warnf("upload err: %s, id: %d\n", terr, wer.id)
				// Rewind the slice and queue it for another attempt.
				// NOTE(review): the result of Seek is discarded; confirm
				// whether a failed rewind should abort instead of retrying.
				wer.splitUnit.Seek(0, os.SEEK_SET)
				deque.Append(wer)
				return
			}

			wer.checksum = checksum

			// Notify about the state change without ever blocking: the
			// notification is dropped when the channel is nil or cannot
			// accept a value right now.
			select {
			case muer.updateInstanceStateChan <- struct{}{}:
			default:
			}
		}()
	}
	wg.Wait()

	select {
	case <-muer.canceled:
		if uperr != nil {
			return uperr
		}
		return context.Canceled
	default:
	}

	if cerr := muer.multiUpload.CreateSuperFile(muer.workers.CheckSumList()...); cerr != nil {
		return cerr
	}
	return nil
}
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
1
https://gitee.com/freelysuiyi/BaiduPCS-Go.git
git@gitee.com:freelysuiyi/BaiduPCS-Go.git
freelysuiyi
BaiduPCS-Go
BaiduPCS-Go
v3.5.4

搜索帮助