1 Star 0 Fork 0

kzangv/gsf-lib

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
克隆/下载
parallel.go 2.95 KB
一键复制 编辑 原始数据 按行查看 历史
kzangv 提交于 2023-04-21 08:42 +08:00 . fixed
package glib
import (
"context"
"errors"
"io"
"net/http"
"sync"
"time"
)
// ParallelFailType selects how ParallelMgt.Run reacts when one of its handles
// returns an error.
type ParallelFailType uint8
const (
// ParallelFailTypeAll is not checked by name in Run: with this value (or any
// value other than ParallelFailTypeOne) Run does not cancel the shared context
// when a handle fails, so the remaining handles keep running.
ParallelFailTypeAll ParallelFailType = 1
// ParallelFailTypeOne makes Run cancel the shared context as soon as any
// handle returns an error.
ParallelFailTypeOne ParallelFailType = 2
)
// ParallelHandle is a unit of work executed by ParallelMgt.Run. It receives the
// context shared by all handles of that Run call and is expected to return once
// that context is done.
type ParallelHandle func(cancelCtx context.Context) error
// ParallelErrorHandle is an optional callback that Run invokes with the non-nil
// error returned by the handle it was registered with.
type ParallelErrorHandle func(err error)
// ParallelFunc is a work function that takes no context; ParallelGetFunc adapts
// it to a ParallelHandle.
type ParallelFunc func() error
// ParallelHttpNewRequest builds the HTTP request sent by the handle returned
// from ParallelGetHttpRequest. It is given the handle's cancel context
// (presumably so the request can be bound to it; the caller decides).
type ParallelHttpNewRequest func(cancelCtx context.Context) (*http.Request, error)
// ParallelHttpResponseFormat consumes the response body of a request issued by
// ParallelGetHttpRequest, which closes the body after this function returns.
type ParallelHttpResponseFormat func(io.ReadCloser) error
// _ParallelItem pairs a registered handle with its optional error callback.
type _ParallelItem struct {
// handle is the work executed by Run.
handle ParallelHandle
// eHandle, when non-nil, is called by Run with the error returned by handle.
eHandle ParallelErrorHandle
}
// ParallelMgt collects handles via the Add* methods and executes them
// concurrently with Run. The handles slice is allocated lazily by AddHandles,
// so a zero ParallelMgt can be used directly. Because it embeds a sync.Mutex it
// must not be copied after first use.
type ParallelMgt struct {
// handles holds the registered work items in registration order.
handles []*_ParallelItem
// lock is held by AddHandles while it appends to handles.
lock sync.Mutex
}
// AddFunc registers a context-unaware function. The function is adapted with
// ParallelGetFunc and added without an error callback. The value returned by
// AddHandles is passed through unchanged.
func (r *ParallelMgt) AddFunc(handle ParallelFunc) int {
	wrapped := ParallelGetFunc(handle)
	return r.AddHandles(wrapped, nil)
}
// AddHttpRequest registers an HTTP call. The request is built by newReqHandler,
// sent with client, and its response body is handed to reqFormatHandler; the
// wiring is done by ParallelGetHttpRequest. No error callback is attached. The
// value returned by AddHandles is passed through unchanged.
func (r *ParallelMgt) AddHttpRequest(client *http.Client, newReqHandler ParallelHttpNewRequest, reqFormatHandler ParallelHttpResponseFormat) int {
	httpHandle := ParallelGetHttpRequest(client, newReqHandler, reqFormatHandler)
	return r.AddHandles(httpHandle, nil)
}
// AddHandles registers a handle together with an optional error callback.
//
// handle must react to cancellation of the context it receives, because Run
// waits for every handle to return. errHandle may be nil; when set, Run calls
// it with the error returned by handle.
//
// It returns the number of handles registered so far, including this one.
// AddHandles is safe for concurrent use.
func (r *ParallelMgt) AddHandles(handle ParallelHandle, errHandle ParallelErrorHandle) int {
	// Take the lock before touching r.handles at all: the lazy allocation
	// below reads and writes the same field that append does.
	r.lock.Lock()
	defer r.lock.Unlock()

	if r.handles == nil {
		r.handles = make([]*_ParallelItem, 0, 4)
	}
	r.handles = append(r.handles, &_ParallelItem{handle: handle, eHandle: errHandle})
	return len(r.handles)
}
// Run executes every registered handle in its own goroutine, waits for all of
// them to return, and reports the errors they produced.
//
// timeout bounds the context shared by the handles; a value <= 0 means no
// deadline. When failType is ParallelFailTypeOne the shared context is
// cancelled as soon as any handle returns an error, so the other handles can
// stop early; whatever errors they then return are collected as well. With any
// other failType the remaining handles are left to finish.
//
// The returned slice is never nil and is empty when every handle succeeded.
// Errors are appended in completion order, not registration order. Run does
// not return until every handle has returned, so handles must observe the
// context they are given.
func (r *ParallelMgt) Run(timeout time.Duration, failType ParallelFailType) []error {
	// Snapshot the handles under the lock so that a concurrent AddHandles can
	// neither race with the iteration nor make the WaitGroup count disagree
	// with the number of goroutines started.
	r.lock.Lock()
	handles := make([]*_ParallelItem, len(r.handles))
	copy(handles, r.handles)
	r.lock.Unlock()

	var (
		ctx    context.Context
		cancel context.CancelFunc
	)
	if timeout > 0 {
		ctx, cancel = context.WithTimeout(context.Background(), timeout)
	} else {
		ctx, cancel = context.WithCancel(context.Background())
	}
	defer cancel()

	var (
		wg     sync.WaitGroup
		errsMu sync.Mutex // guards errs
		errs   = make([]error, 0, len(handles))
	)
	wg.Add(len(handles))
	for _, item := range handles {
		go func(v *_ParallelItem) {
			defer wg.Done()

			err := v.handle(ctx)
			if err == nil {
				return
			}
			if v.eHandle != nil {
				v.eHandle(err)
			}

			errsMu.Lock()
			errs = append(errs, err)
			errsMu.Unlock()

			if failType == ParallelFailTypeOne {
				// Calling cancel more than once is harmless.
				cancel()
			}
		}(item)
	}
	wg.Wait()
	return errs
}
// ParallelGetFunc adapts a context-unaware ParallelFunc to a ParallelHandle.
//
// The returned handle starts handle in a separate goroutine and returns
// whichever comes first: the function's result, or cancelCtx.Err() once the
// context is done. Cancellation only stops the waiting; the function itself
// keeps running until it returns on its own.
func ParallelGetFunc(handle ParallelFunc) ParallelHandle {
	return func(cancelCtx context.Context) error {
		// Capacity 1 lets the worker deliver its result and exit even when
		// nobody is receiving any more because the context finished first.
		result := make(chan error, 1)
		go func() {
			defer close(result)
			result <- handle()
		}()

		select {
		case <-cancelCtx.Done():
			return cancelCtx.Err()
		case err := <-result:
			return err
		}
	}
}
// parallelHTTPError decorates an error with the URL of the request that
// produced it while keeping the cause reachable through errors.Is / errors.As.
type parallelHTTPError struct {
	url string
	err error
}

// Error renders the message as "[<url>] <cause>".
func (e *parallelHTTPError) Error() string { return "[" + e.url + "] " + e.err.Error() }

// Unwrap exposes the underlying cause.
func (e *parallelHTTPError) Unwrap() error { return e.err }

// ParallelGetHttpRequest builds a ParallelHandle that performs one HTTP call.
//
// On each invocation the handle asks newReqHandler for a request (passing the
// cancel context), sends it with client, hands the response body to
// reqFormatHandler and closes the body afterwards. A nil client falls back to
// http.DefaultClient.
//
// The handle never attaches cancelCtx to the request itself, so the call is
// only cancellable if newReqHandler binds the context to the request it
// returns.
//
// Errors from newReqHandler are returned as-is. Errors from sending the
// request or from reqFormatHandler are returned with the message
// "[<url>] <cause>" and wrap the cause, so errors.Is(err, context.Canceled)
// and similar checks work. A nil request, or a request without a URL, yields
// an error instead of a panic.
func ParallelGetHttpRequest(client *http.Client, newReqHandler ParallelHttpNewRequest, reqFormatHandler ParallelHttpResponseFormat) ParallelHandle {
	if client == nil {
		client = http.DefaultClient
	}
	return func(cancelCtx context.Context) error {
		req, err := newReqHandler(cancelCtx)
		if err != nil {
			return err
		}
		if req == nil || req.URL == nil {
			return errors.New("parallel: request builder returned a request without a URL")
		}

		resp, err := client.Do(req)
		if err != nil {
			return &parallelHTTPError{url: req.URL.String(), err: err}
		}
		defer resp.Body.Close()

		if err := reqFormatHandler(resp.Body); err != nil {
			return &parallelHTTPError{url: req.URL.String(), err: err}
		}
		return nil
	}
}
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
Go
1
https://gitee.com/kzangv/gsf-lib.git
git@gitee.com:kzangv/gsf-lib.git
kzangv
gsf-lib
gsf-lib
v0.2.1

搜索帮助