1 Star 0 Fork 0

zhangjungang/beats

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
simple_transp.go 3.84 KB
一键复制 编辑 原始数据 按行查看 历史
Steffen Siering 提交于 2017-04-28 09:08 . Heartbeat event format (#4091)
package http
import (
"bufio"
"compress/gzip"
"errors"
"fmt"
"io"
"net"
"net/http"
"net/url"
"strings"
"github.com/elastic/beats/libbeat/outputs/transport"
)
const (
gzipEncoding = "gzip"
urlSchemaHTTP = "http"
urlSchemaHTTPS = "https"
)
// SimpleTransport contains the dialer and read/write callbacks
type SimpleTransport struct {
Dialer transport.Dialer
DisableCompression bool
OnStartWrite func()
OnEndWrite func()
OnStartRead func()
}
func (t *SimpleTransport) checkRequest(req *http.Request) error {
if req.URL == nil {
return errors.New("http: missing request URL")
}
if req.Header == nil {
return errors.New("http: missing request headers")
}
scheme := req.URL.Scheme
isHTTP := scheme == urlSchemaHTTP || scheme == urlSchemaHTTPS
if !isHTTP {
return fmt.Errorf("http: unsupported scheme %v", scheme)
}
if req.URL.Host == "" {
return errors.New("http: no host in URL")
}
return nil
}
// RoundTrip sets up goroutines to write the request and read the responses
func (t *SimpleTransport) RoundTrip(req *http.Request) (*http.Response, error) {
type readReturn struct {
resp *http.Response
err error
}
defer reqCloseBody(req)
if err := t.checkRequest(req); err != nil {
return nil, err
}
conn, err := t.Dialer.Dial("tcp", canonicalAddr(req.URL))
if err != nil {
return nil, err
}
defer conn.Close()
requestedGzip := false
if t.DisableCompression &&
req.Header.Get("Accept-Encoding") == "" &&
req.Header.Get("Range") == "" &&
req.Method != "HEAD" {
requestedGzip = true
req.Header.Add("Accept-Encoding", gzipEncoding)
defer req.Header.Del("Accept-Encoding")
}
done := req.Context().Done()
readerDone := make(chan readReturn, 1)
writerDone := make(chan error, 1)
// write request
go func() {
writerDone <- t.writeRequest(conn, req)
}()
// read response
go func() {
resp, err := t.readResponse(conn, req, requestedGzip)
readerDone <- readReturn{resp, err}
}()
select {
case err := <-writerDone:
if err != nil {
return nil, err
}
case <-done:
return nil, errors.New("http: request timed out before writing finished")
}
close(writerDone)
var ret readReturn
select {
case ret = <-readerDone:
break
case <-done:
return nil, errors.New("http: request timed out while waiting for response")
}
close(readerDone)
return ret.resp, ret.err
}
func (t *SimpleTransport) writeRequest(conn net.Conn, req *http.Request) error {
writer := bufio.NewWriter(conn)
t.sigStartWrite()
err := req.Write(writer)
if err == nil {
err = writer.Flush()
}
t.sigEndWrite()
return err
}
func (t *SimpleTransport) readResponse(
conn net.Conn,
req *http.Request,
requestedGzip bool,
) (*http.Response, error) {
reader := bufio.NewReader(conn)
resp, err := http.ReadResponse(reader, req)
if err != nil {
return nil, err
}
t.sigStartRead()
if requestedGzip && resp.Header.Get("Content-Encoding") == gzipEncoding {
resp.Header.Del("Content-Encoding")
resp.Header.Del("Content-Length")
resp.ContentLength = -1
unzipper, err := gzip.NewReader(resp.Body)
if err != nil {
resp.Body.Close()
return nil, err
}
resp.Body = struct {
io.Reader
io.Closer
}{unzipper, resp.Body}
}
return resp, nil
}
func (t *SimpleTransport) sigStartRead() { call(t.OnStartRead) }
func (t *SimpleTransport) sigStartWrite() { call(t.OnStartWrite) }
func (t *SimpleTransport) sigEndWrite() { call(t.OnEndWrite) }
func call(f func()) {
if f != nil {
f()
}
}
func reqCloseBody(req *http.Request) {
if req.Body != nil {
req.Body.Close()
}
}
func canonicalAddr(url *url.URL) string {
scheme, addr := url.Scheme, url.Host
if !hasPort(addr) {
portmap := map[string]string{
"http": "80",
"https": "443",
}
addr = net.JoinHostPort(addr, portmap[scheme])
}
return addr
}
func hasPort(s string) bool {
return strings.LastIndex(s, ":") > strings.LastIndex(s, "]")
}
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
1
https://gitee.com/zhangjungang/beats.git
git@gitee.com:zhangjungang/beats.git
zhangjungang
beats
beats
v6.2.3

搜索帮助