代码拉取完成,页面将自动刷新
package glib
import (
"context"
"errors"
"io"
"net/http"
"sync"
"time"
)
type ParallelFailType uint8
const (
ParallelFailTypeAll ParallelFailType = 1
ParallelFailTypeOne ParallelFailType = 2
)
type ParallelHandle func(cancelCtx context.Context) error
type ParallelErrorHandle func(err error)
type ParallelFunc func() error
type ParallelHttpNewRequest func(cancelCtx context.Context) (*http.Request, error)
type ParallelHttpResponseFormat func(io.ReadCloser) error
type _ParallelItem struct {
handle ParallelHandle
eHandle ParallelErrorHandle
}
type ParallelMgt struct {
handles []*_ParallelItem
lock sync.Mutex
}
func (r *ParallelMgt) AddFunc(handle ParallelFunc) int {
return r.AddHandles(ParallelGetFunc(handle), nil)
}
func (r *ParallelMgt) AddHttpRequest(client *http.Client, newReqHandler ParallelHttpNewRequest, reqFormatHandler ParallelHttpResponseFormat) int {
return r.AddHandles(ParallelGetHttpRequest(client, newReqHandler, reqFormatHandler), nil)
}
// AddHandles
// @handle 需要处理 cancelCtx 事件
func (r *ParallelMgt) AddHandles(handle ParallelHandle, errHandle ParallelErrorHandle) int {
if r.handles == nil {
r.handles = make([]*_ParallelItem, 0, 4)
}
defer r.lock.Unlock()
r.lock.Lock()
r.handles = append(r.handles, &_ParallelItem{handle: handle, eHandle: errHandle})
return len(r.handles)
}
func (r *ParallelMgt) Run(timeout time.Duration, failType ParallelFailType) []error {
errs := make([]error, 0, len(r.handles))
wg := sync.WaitGroup{}
wg.Add(len(r.handles))
var (
cancel context.CancelFunc
ctx context.Context
)
if timeout > 0 {
ctx, cancel = context.WithTimeout(context.Background(), timeout)
} else {
ctx, cancel = context.WithCancel(context.Background())
}
defer cancel()
for k := range r.handles {
go func(v *_ParallelItem) {
defer wg.Done()
if err := v.handle(ctx); err != nil {
if v.eHandle != nil {
v.eHandle(err)
}
defer r.lock.Unlock()
r.lock.Lock()
errs = append(errs, err)
if failType == ParallelFailTypeOne && cancel != nil {
cancel()
}
}
}(r.handles[k])
}
wg.Wait()
return errs
}
func ParallelGetFunc(handle ParallelFunc) ParallelHandle {
return func(cancelCtx context.Context) error {
proc := make(chan error, 1)
go func() { defer close(proc); proc <- handle() }()
var err error
select {
case <-cancelCtx.Done():
err = cancelCtx.Err()
case err, _ = <-proc:
}
return err
}
}
func ParallelGetHttpRequest(client *http.Client, newReqHandler ParallelHttpNewRequest, reqFormatHandler ParallelHttpResponseFormat) ParallelHandle {
return func(cancelCtx context.Context) error {
req, err := newReqHandler(cancelCtx)
if err != nil {
return err
}
resp, err := client.Do(req)
if err != nil {
return errors.New("[" + req.URL.String() + "] " + err.Error())
}
defer resp.Body.Close()
err = reqFormatHandler(resp.Body)
if err != nil {
return errors.New("[" + req.URL.String() + "] " + err.Error())
}
return nil
}
}
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。