3 Star 0 Fork 0

Gitee 极速下载 / gitlab-workhorsesource

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
此仓库是为了提升国内下载速度的镜像仓库,每日同步一次。 原始仓库: https://gitlab.com/gitlab-org/gitlab-workhorse
克隆/下载
object.go 3.46 KB
一键复制 编辑 原始数据 按行查看 历史
Alessio Caiazza 提交于 2018-06-07 08:49 . Objectstorage ETag checking
package objectstore
import (
"context"
"fmt"
"io"
"io/ioutil"
"net"
"net/http"
"time"
"gitlab.com/gitlab-org/gitlab-workhorse/internal/helper"
)
// httpTransport defines a http.Transport with values
// that are more restrictive than for http.DefaultTransport,
// they define shorter TLS Handshake, and more agressive connection closing
// to prevent the connection hanging and reduce FD usage
var httpTransport = &http.Transport{
Proxy: http.ProxyFromEnvironment,
DialContext: (&net.Dialer{
Timeout: 30 * time.Second,
KeepAlive: 10 * time.Second,
}).DialContext,
MaxIdleConns: 2,
IdleConnTimeout: 30 * time.Second,
TLSHandshakeTimeout: 10 * time.Second,
ExpectContinueTimeout: 10 * time.Second,
ResponseHeaderTimeout: 30 * time.Second,
}
var httpClient = &http.Client{
Transport: httpTransport,
}
type StatusCodeError error
// Object represents an object on a S3 compatible Object Store service.
// It can be used as io.WriteCloser for uploading an object
type Object struct {
// PutURL is a presigned URL for PutObject
PutURL string
// DeleteURL is a presigned URL for RemoveObject
DeleteURL string
uploader
}
// NewObject opens an HTTP connection to Object Store and returns an Object pointer that can be used for uploading.
func NewObject(ctx context.Context, putURL, deleteURL string, deadline time.Time, size int64) (*Object, error) {
return newObject(ctx, putURL, deleteURL, deadline, size, true)
}
func newObject(ctx context.Context, putURL, deleteURL string, deadline time.Time, size int64, metrics bool) (*Object, error) {
started := time.Now()
pr, pw := io.Pipe()
// we should prevent pr.Close() otherwise it may shadow error set with pr.CloseWithError(err)
req, err := http.NewRequest(http.MethodPut, putURL, ioutil.NopCloser(pr))
if err != nil {
if metrics {
objectStorageUploadRequestsRequestFailed.Inc()
}
return nil, fmt.Errorf("PUT %q: %v", helper.ScrubURLParams(putURL), err)
}
req.ContentLength = size
req.Header.Set("Content-Type", "application/octet-stream")
uploadCtx, cancelFn := context.WithDeadline(ctx, deadline)
o := &Object{
PutURL: putURL,
DeleteURL: deleteURL,
uploader: newMD5Uploader(uploadCtx, pw),
}
if metrics {
objectStorageUploadsOpen.Inc()
}
go func() {
// wait for the upload to finish
<-o.ctx.Done()
if metrics {
objectStorageUploadTime.Observe(time.Since(started).Seconds())
}
// wait for provided context to finish before performing cleanup
<-ctx.Done()
o.delete()
}()
go func() {
defer cancelFn()
if metrics {
defer objectStorageUploadsOpen.Dec()
}
defer func() {
// This will be returned as error to the next write operation on the pipe
pr.CloseWithError(o.uploadError)
}()
req = req.WithContext(o.ctx)
resp, err := httpClient.Do(req)
if err != nil {
if metrics {
objectStorageUploadRequestsRequestFailed.Inc()
}
o.uploadError = fmt.Errorf("PUT request %q: %v", helper.ScrubURLParams(o.PutURL), err)
return
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
if metrics {
objectStorageUploadRequestsInvalidStatus.Inc()
}
o.uploadError = StatusCodeError(fmt.Errorf("PUT request %v returned: %s", helper.ScrubURLParams(o.PutURL), resp.Status))
return
}
o.extractETag(resp.Header.Get("ETag"))
if o.etag != o.md5Sum() {
o.uploadError = fmt.Errorf("ETag mismatch. expected %q got %q", o.md5Sum(), o.etag)
return
}
}()
return o, nil
}
func (o *Object) delete() {
o.syncAndDelete(o.DeleteURL)
}
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
1
https://gitee.com/mirrors/gitlab-workhorsesource.git
git@gitee.com:mirrors/gitlab-workhorsesource.git
mirrors
gitlab-workhorsesource
gitlab-workhorsesource
v5.2.0

搜索帮助

344bd9b3 5694891 D2dac590 5694891