1 Star 0 Fork 0

lqinggang/psiphon-tunnel-core

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
克隆/下载
meekConn.go 42.81 KB
一键复制 编辑 原始数据 按行查看 历史
Rod Hynes 提交于 2019-01-25 00:58 +08:00 . Fix: comment typo
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299
/*
* Copyright (c) 2015, Psiphon Inc.
* All rights reserved.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
*/
package psiphon
import (
"bytes"
"context"
"crypto/rand"
"crypto/tls"
"encoding/base64"
"encoding/json"
"errors"
"fmt"
"io"
"io/ioutil"
"net"
"net/http"
"net/url"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/Psiphon-Labs/goarista/monotime"
"github.com/Psiphon-Labs/net/http2"
"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common"
"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/crypto/nacl/box"
"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/obfuscator"
"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/parameters"
"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/prng"
"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/protocol"
"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/upstreamproxy"
)
// MeekConn is based on meek-client.go from Tor and Psiphon:
//
// https://gitweb.torproject.org/pluggable-transports/meek.git/blob/HEAD:/meek-client/meek-client.go
// CC0 1.0 Universal
//
// https://bitbucket.org/psiphon/psiphon-circumvention-system/src/default/go/meek-client/meek-client.go
const (
MEEK_PROTOCOL_VERSION = 3
MEEK_MAX_REQUEST_PAYLOAD_LENGTH = 65536
)
// MeekConfig specifies the behavior of a MeekConn
type MeekConfig struct {
// ClientParameters is the active set of client parameters to use
// for the meek dial.
ClientParameters *parameters.ClientParameters
// DialAddress is the actual network address to dial to establish a
// connection to the meek server. This may be either a fronted or
// direct address. The address must be in the form "host:port",
// where host may be a domain name or IP address.
DialAddress string
// UseHTTPS indicates whether to use HTTPS (true) or HTTP (false).
UseHTTPS bool
// TLSProfile specifies the value for CustomTLSConfig.TLSProfile for all
// underlying TLS connections created by this meek connection.
TLSProfile string
// RandomizedTLSProfileSeed specifies the value for
// CustomTLSConfig.RandomizedTLSProfileSeed for all underlying TLS
// connections created by this meek connection.
RandomizedTLSProfileSeed *prng.Seed
// UseObfuscatedSessionTickets indicates whether to use obfuscated
// session tickets. Assumes UseHTTPS is true.
UseObfuscatedSessionTickets bool
// SNIServerName is the value to place in the TLS SNI server_name
// field when HTTPS is used.
SNIServerName string
// HostHeader is the value to place in the HTTP request Host header.
HostHeader string
// TransformedHostName records whether a hostname transformation is
// in effect. This value is used for stats reporting.
TransformedHostName bool
// ClientTunnelProtocol is the protocol the client is using. It's
// included in the meek cookie for optional use by the server, in
// cases where the server cannot unambiguously determine the
// tunnel protocol.
ClientTunnelProtocol string
// RoundTripperOnly sets the MeekConn to operate in round tripper
// mode, which is used for untunneled tactics requests. In this
// mode, a connection is established to the meek server as usual,
// but instead of relaying tunnel traffic, the RoundTrip function
// may be used to make requests. In this mode, no relay resources
// incuding buffers are allocated.
RoundTripperOnly bool
// The following values are used to create the obfuscated meek cookie.
MeekCookieEncryptionPublicKey string
MeekObfuscatedKey string
MeekObfuscatorPaddingSeed *prng.Seed
}
// MeekConn is a network connection that tunnels TCP over HTTP and supports "fronting". Meek sends
// client->server flow in HTTP request bodies and receives server->client flow in HTTP response bodies.
// Polling is used to achieve full duplex TCP.
//
// Fronting is an obfuscation technique in which the connection
// to a web server, typically a CDN, is indistinguishable from any other HTTPS connection to the generic
// "fronting domain" -- the HTTP Host header is used to route the requests to the actual destination.
// See https://trac.torproject.org/projects/tor/wiki/doc/meek for more details.
//
// MeekConn also operates in unfronted mode, in which plain HTTP connections are made without routing
// through a CDN.
type MeekConn struct {
clientParameters *parameters.ClientParameters
url *url.URL
additionalHeaders http.Header
cookie *http.Cookie
cachedTLSDialer *cachedTLSDialer
transport transporter
mutex sync.Mutex
isClosed bool
runCtx context.Context
stopRunning context.CancelFunc
relayWaitGroup *sync.WaitGroup
// For round tripper mode
roundTripperOnly bool
meekCookieEncryptionPublicKey string
meekObfuscatedKey string
meekObfuscatorPaddingSeed *prng.Seed
clientTunnelProtocol string
// For relay mode
fullReceiveBufferLength int
readPayloadChunkLength int
emptyReceiveBuffer chan *bytes.Buffer
partialReceiveBuffer chan *bytes.Buffer
fullReceiveBuffer chan *bytes.Buffer
emptySendBuffer chan *bytes.Buffer
partialSendBuffer chan *bytes.Buffer
fullSendBuffer chan *bytes.Buffer
}
// transporter is implemented by both http.Transport and upstreamproxy.ProxyAuthTransport.
type transporter interface {
CloseIdleConnections()
RoundTrip(req *http.Request) (resp *http.Response, err error)
}
// DialMeek returns an initialized meek connection. A meek connection is
// an HTTP session which does not depend on an underlying socket connection (although
// persistent HTTP connections are used for performance). This function does not
// wait for the connection to be "established" before returning. A goroutine
// is spawned which will eventually start HTTP polling.
// When frontingAddress is not "", fronting is used. This option assumes caller has
// already checked server entry capabilities.
func DialMeek(
ctx context.Context,
meekConfig *MeekConfig,
dialConfig *DialConfig) (meek *MeekConn, err error) {
runCtx, stopRunning := context.WithCancel(context.Background())
cleanupStopRunning := true
cleanupCachedTLSDialer := true
var cachedTLSDialer *cachedTLSDialer
// Cleanup in error cases
defer func() {
if cleanupStopRunning {
stopRunning()
}
if cleanupCachedTLSDialer && cachedTLSDialer != nil {
cachedTLSDialer.close()
}
}()
// Configure transport: HTTP or HTTPS
var scheme string
var transport transporter
var additionalHeaders http.Header
var proxyUrl func(*http.Request) (*url.URL, error)
if meekConfig.UseHTTPS {
// Custom TLS dialer:
//
// 1. ignores the HTTP request address and uses the fronting domain
// 2. optionally disables SNI -- SNI breaks fronting when used with certain CDNs.
// 3. skips verifying the server cert.
//
// Reasoning for #3:
//
// With a TLS MiM attack in place, and server certs verified, we'll fail to connect because the client
// will refuse to connect. That's not a successful outcome.
//
// With a MiM attack in place, and server certs not verified, we'll fail to connect if the MiM is actively
// targeting Psiphon and classifying the HTTP traffic by Host header or payload signature.
//
// However, in the case of a passive MiM that's just recording traffic or an active MiM that's targeting
// something other than Psiphon, the client will connect. This is a successful outcome.
//
// What is exposed to the MiM? The Host header does not contain a Psiphon server IP address, just an
// unrelated, randomly generated domain name which cannot be used to block direct connections. The
// Psiphon server IP is sent over meek, but it's in the encrypted cookie.
//
// The payload (user traffic) gets its confidentiality and integrity from the underlying SSH protocol.
// So, nothing is leaked to the MiM apart from signatures which could be used to classify the traffic
// as Psiphon to possibly block it; but note that not revealing that the client is Psiphon is outside
// our threat model; we merely seek to evade mass blocking by taking steps that require progressively
// more effort to block.
//
// There is a subtle attack remaining: an adversary that can MiM some CDNs but not others (and so can
// classify Psiphon traffic on some CDNs but not others) may throttle non-MiM CDNs so that our server
// selection always chooses tunnels to the MiM CDN (without any server cert verification, we won't
// exclusively connect to non-MiM CDNs); then the adversary kills the underlying TCP connection after
// some short period. This is partially mitigated by tactics mechanisms.
scheme = "https"
tlsConfig := &CustomTLSConfig{
ClientParameters: meekConfig.ClientParameters,
DialAddr: meekConfig.DialAddress,
Dial: NewTCPDialer(dialConfig),
SNIServerName: meekConfig.SNIServerName,
SkipVerify: true,
TLSProfile: meekConfig.TLSProfile,
RandomizedTLSProfileSeed: meekConfig.RandomizedTLSProfileSeed,
TrustedCACertificatesFilename: dialConfig.TrustedCACertificatesFilename,
}
tlsConfig.EnableClientSessionCache(meekConfig.ClientParameters)
if meekConfig.UseObfuscatedSessionTickets {
tlsConfig.ObfuscatedSessionTicketKey = meekConfig.MeekObfuscatedKey
}
tlsDialer := NewCustomTLSDialer(tlsConfig)
// Pre-dial one TLS connection in order to inspect the negotiated
// application protocol. Then we create an HTTP/2 or HTTP/1.1 transport
// depending on which protocol was negotiated. The TLS dialer
// is assumed to negotiate only "h2" or "http/1.1"; or not negotiate
// an application protocol.
//
// We cannot rely on net/http's HTTP/2 support since it's only
// activated when http.Transport.DialTLS returns a golang crypto/tls.Conn;
// e.g., https://github.com/golang/go/blob/c8aec4095e089ff6ac50d18e97c3f46561f14f48/src/net/http/transport.go#L1040
//
// The pre-dialed connection is stored in a cachedTLSDialer, which will
// return the cached pre-dialed connection to its first Dial caller, and
// use the tlsDialer for all other Dials.
//
// cachedTLSDialer.close() must be called on all exits paths from this
// function and in meek.Close() to ensure the cached conn is closed in
// any case where no Dial call is made.
//
// The pre-dial must be interruptible so that DialMeek doesn't block and
// hang/delay a shutdown or end of establishment. So the pre-dial uses
// the Controller's PendingConns, not the MeekConn PendingConns. For this
// purpose, a special preDialer is configured.
//
// Only one pre-dial attempt is made; there are no retries. This differs
// from relayRoundTrip, which retries and may redial for each retry.
// Retries at the pre-dial phase are less useful since there's no active
// session to preserve, and establishment will simply try another server.
// Note that the underlying TCPDial may still try multiple IP addreses when
// the destination is a domain and it resolves to multiple IP adresses.
// The pre-dial is made within the parent dial context, so that DialMeek
// may be interrupted. Subsequent dials are made within the meek round trip
// request context. Since http.DialTLS doesn't take a context argument
// (yet; as of Go 1.9 this issue is still open: https://github.com/golang/go/issues/21526),
// cachedTLSDialer is used as a conduit to send the request context.
// meekConn.relayRoundTrip sets its request context into cachedTLSDialer,
// and cachedTLSDialer.dial uses that context.
// As DialAddr is set in the CustomTLSConfig, no address is required here.
preConn, err := tlsDialer(ctx, "tcp", "")
if err != nil {
return nil, common.ContextError(err)
}
cachedTLSDialer = newCachedTLSDialer(preConn, tlsDialer)
if IsTLSConnUsingHTTP2(preConn) {
NoticeInfo("negotiated HTTP/2 for %s", meekConfig.DialAddress)
transport = &http2.Transport{
DialTLS: func(network, addr string, _ *tls.Config) (net.Conn, error) {
return cachedTLSDialer.dial(network, addr)
},
}
} else {
transport = &http.Transport{
DialTLS: func(network, addr string) (net.Conn, error) {
return cachedTLSDialer.dial(network, addr)
},
}
}
} else {
scheme = "http"
var dialer Dialer
// For HTTP, and when the meekConfig.DialAddress matches the
// meekConfig.HostHeader, we let http.Transport handle proxying.
// http.Transport will put the the HTTP server address in the HTTP
// request line. In this one case, we can use an HTTP proxy that does
// not offer CONNECT support.
if strings.HasPrefix(dialConfig.UpstreamProxyURL, "http://") &&
(meekConfig.DialAddress == meekConfig.HostHeader ||
meekConfig.DialAddress == meekConfig.HostHeader+":80") {
url, err := url.Parse(dialConfig.UpstreamProxyURL)
if err != nil {
return nil, common.ContextError(err)
}
proxyUrl = http.ProxyURL(url)
// Here, the dialer must use the address that http.Transport
// passes in (which will be proxy address).
copyDialConfig := new(DialConfig)
*copyDialConfig = *dialConfig
copyDialConfig.UpstreamProxyURL = ""
dialer = NewTCPDialer(copyDialConfig)
} else {
baseDialer := NewTCPDialer(dialConfig)
// The dialer ignores any address that http.Transport will pass in
// (derived from the HTTP request URL) and always dials
// meekConfig.DialAddress.
dialer = func(ctx context.Context, network, _ string) (net.Conn, error) {
return baseDialer(ctx, network, meekConfig.DialAddress)
}
}
httpTransport := &http.Transport{
Proxy: proxyUrl,
DialContext: dialer,
}
if proxyUrl != nil {
// Wrap transport with a transport that can perform HTTP proxy auth negotiation
transport, err = upstreamproxy.NewProxyAuthTransport(httpTransport, dialConfig.CustomHeaders)
if err != nil {
return nil, common.ContextError(err)
}
} else {
transport = httpTransport
}
}
url := &url.URL{
Scheme: scheme,
Host: meekConfig.HostHeader,
Path: "/",
}
if meekConfig.UseHTTPS {
host, _, err := net.SplitHostPort(meekConfig.DialAddress)
if err != nil {
return nil, common.ContextError(err)
}
additionalHeaders = map[string][]string{
"X-Psiphon-Fronting-Address": {host},
}
} else {
if proxyUrl == nil {
additionalHeaders = dialConfig.CustomHeaders
}
}
// The main loop of a MeekConn is run in the relay() goroutine.
// A MeekConn implements net.Conn concurrency semantics:
// "Multiple goroutines may invoke methods on a Conn simultaneously."
//
// Read() calls and relay() are synchronized by exchanging control of a single
// receiveBuffer (bytes.Buffer). This single buffer may be:
// - in the emptyReceiveBuffer channel when it is available and empty;
// - in the partialReadBuffer channel when it is available and contains data;
// - in the fullReadBuffer channel when it is available and full of data;
// - "checked out" by relay or Read when they are are writing to or reading from the
// buffer, respectively.
// relay() will obtain the buffer from either the empty or partial channel but block when
// the buffer is full. Read will obtain the buffer from the partial or full channel when
// there is data to read but block when the buffer is empty.
// Write() calls and relay() are synchronized in a similar way, using a single
// sendBuffer.
meek = &MeekConn{
clientParameters: meekConfig.ClientParameters,
url: url,
additionalHeaders: additionalHeaders,
cachedTLSDialer: cachedTLSDialer,
transport: transport,
isClosed: false,
runCtx: runCtx,
stopRunning: stopRunning,
relayWaitGroup: new(sync.WaitGroup),
roundTripperOnly: meekConfig.RoundTripperOnly,
}
// stopRunning and cachedTLSDialer will now be closed in meek.Close()
cleanupStopRunning = false
cleanupCachedTLSDialer = false
// Allocate relay resources, including buffers and running the relay
// go routine, only when running in relay mode.
if !meek.roundTripperOnly {
cookie, err := makeMeekCookie(
meek.clientParameters,
meekConfig.MeekCookieEncryptionPublicKey,
meekConfig.MeekObfuscatedKey,
meekConfig.MeekObfuscatorPaddingSeed,
meekConfig.ClientTunnelProtocol,
"")
if err != nil {
return nil, common.ContextError(err)
}
meek.cookie = cookie
p := meekConfig.ClientParameters.Get()
if p.Bool(parameters.MeekLimitBufferSizes) {
meek.fullReceiveBufferLength = p.Int(parameters.MeekLimitedFullReceiveBufferLength)
meek.readPayloadChunkLength = p.Int(parameters.MeekLimitedReadPayloadChunkLength)
} else {
meek.fullReceiveBufferLength = p.Int(parameters.MeekFullReceiveBufferLength)
meek.readPayloadChunkLength = p.Int(parameters.MeekReadPayloadChunkLength)
}
p = nil
meek.emptyReceiveBuffer = make(chan *bytes.Buffer, 1)
meek.partialReceiveBuffer = make(chan *bytes.Buffer, 1)
meek.fullReceiveBuffer = make(chan *bytes.Buffer, 1)
meek.emptySendBuffer = make(chan *bytes.Buffer, 1)
meek.partialSendBuffer = make(chan *bytes.Buffer, 1)
meek.fullSendBuffer = make(chan *bytes.Buffer, 1)
meek.emptyReceiveBuffer <- new(bytes.Buffer)
meek.emptySendBuffer <- new(bytes.Buffer)
meek.relayWaitGroup.Add(1)
go meek.relay()
} else {
meek.meekCookieEncryptionPublicKey = meekConfig.MeekCookieEncryptionPublicKey
meek.meekObfuscatedKey = meekConfig.MeekObfuscatedKey
meek.meekObfuscatorPaddingSeed = meekConfig.MeekObfuscatorPaddingSeed
meek.clientTunnelProtocol = meekConfig.ClientTunnelProtocol
}
return meek, nil
}
type cachedTLSDialer struct {
usedCachedConn int32
cachedConn net.Conn
requestCtx atomic.Value
dialer Dialer
}
func newCachedTLSDialer(cachedConn net.Conn, dialer Dialer) *cachedTLSDialer {
return &cachedTLSDialer{
cachedConn: cachedConn,
dialer: dialer,
}
}
func (c *cachedTLSDialer) setRequestContext(requestCtx context.Context) {
c.requestCtx.Store(requestCtx)
}
func (c *cachedTLSDialer) dial(network, addr string) (net.Conn, error) {
if atomic.CompareAndSwapInt32(&c.usedCachedConn, 0, 1) {
conn := c.cachedConn
c.cachedConn = nil
return conn, nil
}
ctx := c.requestCtx.Load().(context.Context)
if ctx == nil {
ctx = context.Background()
}
return c.dialer(ctx, network, addr)
}
func (c *cachedTLSDialer) close() {
if atomic.CompareAndSwapInt32(&c.usedCachedConn, 0, 1) {
c.cachedConn.Close()
c.cachedConn = nil
}
}
// Close terminates the meek connection. Close waits for the relay goroutine
// to stop (in relay mode) and releases HTTP transport resources.
// A mutex is required to support net.Conn concurrency semantics.
func (meek *MeekConn) Close() (err error) {
meek.mutex.Lock()
isClosed := meek.isClosed
meek.isClosed = true
meek.mutex.Unlock()
if !isClosed {
meek.stopRunning()
if meek.cachedTLSDialer != nil {
meek.cachedTLSDialer.close()
}
meek.relayWaitGroup.Wait()
meek.transport.CloseIdleConnections()
}
return nil
}
// IsClosed implements the Closer iterface. The return value
// indicates whether the MeekConn has been closed.
func (meek *MeekConn) IsClosed() bool {
meek.mutex.Lock()
isClosed := meek.isClosed
meek.mutex.Unlock()
return isClosed
}
// RoundTrip makes a request to the meek server and returns the response.
// A new, obfuscated meek cookie is created for every request. The specified
// end point is recorded in the cookie and is not exposed as plaintext in the
// meek traffic. The caller is responsible for obfuscating the request body.
//
// RoundTrip is not safe for concurrent use, and Close must not be called
// concurrently. The caller must ensure onlt one RoundTrip call is active
// at once and that it completes before calling Close.
//
// RoundTrip is only available in round tripper mode.
func (meek *MeekConn) RoundTrip(
ctx context.Context, endPoint string, requestBody []byte) ([]byte, error) {
if !meek.roundTripperOnly {
return nil, common.ContextError(errors.New("operation unsupported"))
}
cookie, err := makeMeekCookie(
meek.clientParameters,
meek.meekCookieEncryptionPublicKey,
meek.meekObfuscatedKey,
meek.meekObfuscatorPaddingSeed,
meek.clientTunnelProtocol,
endPoint)
if err != nil {
return nil, common.ContextError(err)
}
// Note:
//
// - multiple, concurrent RoundTrip calls are unsafe due to the
// meek.cachedTLSDialer.setRequestContext call in newRequest
//
// - concurrent Close and RoundTrip calls are unsafe as Close
// does not synchronize with RoundTrip before calling
// meek.transport.CloseIdleConnections(), so resources could
// be left open.
//
// At this time, RoundTrip is used for tactics in Controller and
// the concurrency constraints are satisfied.
request, cancelFunc, err := meek.newRequest(
ctx, cookie, bytes.NewReader(requestBody), 0)
if err != nil {
return nil, common.ContextError(err)
}
defer cancelFunc()
response, err := meek.transport.RoundTrip(request)
if err == nil {
defer response.Body.Close()
if response.StatusCode != http.StatusOK {
err = fmt.Errorf("unexpected response status code: %d", response.StatusCode)
}
}
if err != nil {
return nil, common.ContextError(err)
}
responseBody, err := ioutil.ReadAll(response.Body)
if err != nil {
return nil, common.ContextError(err)
}
return responseBody, nil
}
// Read reads data from the connection.
// net.Conn Deadlines are ignored. net.Conn concurrency semantics are supported.
func (meek *MeekConn) Read(buffer []byte) (n int, err error) {
if meek.roundTripperOnly {
return 0, common.ContextError(errors.New("operation unsupported"))
}
if meek.IsClosed() {
return 0, common.ContextError(errors.New("meek connection is closed"))
}
// Block until there is received data to consume
var receiveBuffer *bytes.Buffer
select {
case receiveBuffer = <-meek.partialReceiveBuffer:
case receiveBuffer = <-meek.fullReceiveBuffer:
case <-meek.runCtx.Done():
return 0, common.ContextError(errors.New("meek connection has closed"))
}
n, err = receiveBuffer.Read(buffer)
meek.replaceReceiveBuffer(receiveBuffer)
return n, err
}
// Write writes data to the connection.
// net.Conn Deadlines are ignored. net.Conn concurrency semantics are supported.
func (meek *MeekConn) Write(buffer []byte) (n int, err error) {
if meek.roundTripperOnly {
return 0, common.ContextError(errors.New("operation unsupported"))
}
if meek.IsClosed() {
return 0, common.ContextError(errors.New("meek connection is closed"))
}
// Repeats until all n bytes are written
n = len(buffer)
for len(buffer) > 0 {
// Block until there is capacity in the send buffer
var sendBuffer *bytes.Buffer
select {
case sendBuffer = <-meek.emptySendBuffer:
case sendBuffer = <-meek.partialSendBuffer:
case <-meek.runCtx.Done():
return 0, common.ContextError(errors.New("meek connection has closed"))
}
writeLen := MEEK_MAX_REQUEST_PAYLOAD_LENGTH - sendBuffer.Len()
if writeLen > 0 {
if writeLen > len(buffer) {
writeLen = len(buffer)
}
_, err = sendBuffer.Write(buffer[:writeLen])
buffer = buffer[writeLen:]
}
meek.replaceSendBuffer(sendBuffer)
}
return n, err
}
// LocalAddr is a stub implementation of net.Conn.LocalAddr
func (meek *MeekConn) LocalAddr() net.Addr {
return nil
}
// RemoteAddr is a stub implementation of net.Conn.RemoteAddr
func (meek *MeekConn) RemoteAddr() net.Addr {
return nil
}
// SetDeadline is a stub implementation of net.Conn.SetDeadline
func (meek *MeekConn) SetDeadline(t time.Time) error {
return common.ContextError(errors.New("not supported"))
}
// SetReadDeadline is a stub implementation of net.Conn.SetReadDeadline
func (meek *MeekConn) SetReadDeadline(t time.Time) error {
return common.ContextError(errors.New("not supported"))
}
// SetWriteDeadline is a stub implementation of net.Conn.SetWriteDeadline
func (meek *MeekConn) SetWriteDeadline(t time.Time) error {
return common.ContextError(errors.New("not supported"))
}
func (meek *MeekConn) replaceReceiveBuffer(receiveBuffer *bytes.Buffer) {
switch {
case receiveBuffer.Len() == 0:
meek.emptyReceiveBuffer <- receiveBuffer
case receiveBuffer.Len() >= meek.fullReceiveBufferLength:
meek.fullReceiveBuffer <- receiveBuffer
default:
meek.partialReceiveBuffer <- receiveBuffer
}
}
func (meek *MeekConn) replaceSendBuffer(sendBuffer *bytes.Buffer) {
switch {
case sendBuffer.Len() == 0:
meek.emptySendBuffer <- sendBuffer
case sendBuffer.Len() >= MEEK_MAX_REQUEST_PAYLOAD_LENGTH:
meek.fullSendBuffer <- sendBuffer
default:
meek.partialSendBuffer <- sendBuffer
}
}
// relay sends and receives tunneled traffic (payload). An HTTP request is
// triggered when data is in the write queue or at a polling interval.
// There's a geometric increase, up to a maximum, in the polling interval when
// no data is exchanged. Only one HTTP request is in flight at a time.
func (meek *MeekConn) relay() {
// Note: meek.Close() calls here in relay() are made asynchronously
// (using goroutines) since Close() will wait on this WaitGroup.
defer meek.relayWaitGroup.Done()
p := meek.clientParameters.Get()
interval := prng.JitterDuration(
p.Duration(parameters.MeekMinPollInterval),
p.Float(parameters.MeekMinPollIntervalJitter))
p = nil
timeout := time.NewTimer(interval)
defer timeout.Stop()
for {
timeout.Reset(interval)
// Block until there is payload to send or it is time to poll
var sendBuffer *bytes.Buffer
select {
case sendBuffer = <-meek.partialSendBuffer:
case sendBuffer = <-meek.fullSendBuffer:
case <-timeout.C:
// In the polling case, send an empty payload
case <-meek.runCtx.Done():
// Drop through to second Done() check
}
// Check Done() again, to ensure it takes precedence
select {
case <-meek.runCtx.Done():
return
default:
}
sendPayloadSize := 0
if sendBuffer != nil {
sendPayloadSize = sendBuffer.Len()
}
// relayRoundTrip will replace sendBuffer (by calling replaceSendBuffer). This
// is a compromise to conserve memory. Using a second buffer here, we could
// copy sendBuffer and immediately replace it, unblocking meekConn.Write() and
// allowing more upstream payload to immediately enqueue. Instead, the request
// payload is read directly from sendBuffer, including retries. Only once the
// server has acknowledged the request payload is sendBuffer replaced. This
// still allows meekConn.Write() to unblock before the round trip response is
// read.
receivedPayloadSize, err := meek.relayRoundTrip(sendBuffer)
if err != nil {
select {
case <-meek.runCtx.Done():
// In this case, meek.relayRoundTrip encountered Done(). Exit without
// logging error.
return
default:
}
NoticeAlert("%s", common.ContextError(err))
go meek.Close()
return
}
// Calculate polling interval. When data is received,
// immediately request more. Otherwise, schedule next
// poll with exponential back off. Jitter and coin
// flips are used to avoid trivial, static traffic
// timing patterns.
p := meek.clientParameters.Get()
if receivedPayloadSize > 0 || sendPayloadSize > 0 {
interval = 0
} else if interval == 0 {
interval = prng.JitterDuration(
p.Duration(parameters.MeekMinPollInterval),
p.Float(parameters.MeekMinPollIntervalJitter))
} else {
if p.WeightedCoinFlip(parameters.MeekApplyPollIntervalMultiplierProbability) {
interval =
time.Duration(float64(interval) *
p.Float(parameters.MeekPollIntervalMultiplier))
}
interval = prng.JitterDuration(
interval,
p.Float(parameters.MeekPollIntervalJitter))
if interval >= p.Duration(parameters.MeekMaxPollInterval) {
interval = prng.JitterDuration(
p.Duration(parameters.MeekMaxPollInterval),
p.Float(parameters.MeekMaxPollIntervalJitter))
}
}
p = nil
}
}
// readCloseSignaller is an io.ReadCloser wrapper for an io.Reader
// that is passed, as the request body, to http.Transport.RoundTrip.
// readCloseSignaller adds the AwaitClosed call, which is used
// to schedule recycling the buffer underlying the reader only after
// RoundTrip has called Close and will no longer use the buffer.
// See: https://golang.org/pkg/net/http/#RoundTripper
type readCloseSignaller struct {
context context.Context
reader io.Reader
closed chan struct{}
}
func NewReadCloseSignaller(
context context.Context,
reader io.Reader) *readCloseSignaller {
return &readCloseSignaller{
context: context,
reader: reader,
closed: make(chan struct{}, 1),
}
}
func (r *readCloseSignaller) Read(p []byte) (int, error) {
return r.reader.Read(p)
}
func (r *readCloseSignaller) Close() error {
select {
case r.closed <- *new(struct{}):
default:
}
return nil
}
func (r *readCloseSignaller) AwaitClosed() bool {
select {
case <-r.context.Done():
case <-r.closed:
return true
}
return false
}
// newRequest performs common request setup for both relay and round
// tripper modes.
//
// newRequest is not safe for concurrent calls due to its use of
// cachedTLSDialer.setRequestContext.
//
// The caller must call the returned cancelFunc.
func (meek *MeekConn) newRequest(
ctx context.Context,
cookie *http.Cookie,
body io.Reader,
contentLength int) (*http.Request, context.CancelFunc, error) {
var requestCtx context.Context
var cancelFunc context.CancelFunc
if ctx != nil {
requestCtx, cancelFunc = context.WithCancel(ctx)
} else {
// - meek.stopRunning() will abort a round trip in flight
// - round trip will abort if it exceeds timeout
requestCtx, cancelFunc = context.WithTimeout(
meek.runCtx,
meek.clientParameters.Get().Duration(parameters.MeekRoundTripTimeout))
}
// Ensure TLS dials are made within the current request context.
if meek.cachedTLSDialer != nil {
meek.cachedTLSDialer.setRequestContext(requestCtx)
}
request, err := http.NewRequest("POST", meek.url.String(), body)
if err != nil {
return nil, cancelFunc, common.ContextError(err)
}
request = request.WithContext(requestCtx)
// Content-Length may not be be set automatically due to the
// underlying type of requestBody.
if contentLength > 0 {
request.ContentLength = int64(contentLength)
}
meek.addAdditionalHeaders(request)
request.Header.Set("Content-Type", "application/octet-stream")
if cookie == nil {
cookie = meek.cookie
}
request.AddCookie(cookie)
return request, cancelFunc, nil
}
// relayRoundTrip configures and makes the actual HTTP POST request
func (meek *MeekConn) relayRoundTrip(sendBuffer *bytes.Buffer) (int64, error) {
// Retries are made when the round trip fails. This adds resiliency
// to connection interruption and intermittent failures.
//
// At least one retry is always attempted, and retries continue
// while still within a brief deadline -- 5 seconds, currently the
// deadline for an actively probed SSH connection to timeout. There
// is a brief delay between retries, allowing for intermittent
// failure states to resolve.
//
// Failure may occur at various stages of the HTTP request:
//
// 1. Before the request begins. In this case, the entire request
// may be rerun.
//
// 2. While sending the request payload. In this case, the client
// must resend its request payload. The server will not have
// relayed its partially received request payload.
//
// 3. After sending the request payload but before receiving
// a response. The client cannot distinguish between case 2 and
// this case, case 3. The client resends its payload and the
// server detects this and skips relaying the request payload.
//
// 4. While reading the response payload. The client will omit its
// request payload when retrying, as the server has already
// acknowledged it. The client will also indicate to the server
// the amount of response payload already received, and the
// server will skip resending the indicated amount of response
// payload.
//
// Retries are indicated to the server by adding a Range header,
// which includes the response payload resend position.
defer func() {
// Ensure sendBuffer is replaced, even in error code paths.
if sendBuffer != nil {
sendBuffer.Truncate(0)
meek.replaceSendBuffer(sendBuffer)
}
}()
retries := uint(0)
p := meek.clientParameters.Get()
retryDeadline := monotime.Now().Add(p.Duration(parameters.MeekRoundTripRetryDeadline))
retryDelay := p.Duration(parameters.MeekRoundTripRetryMinDelay)
retryMaxDelay := p.Duration(parameters.MeekRoundTripRetryMaxDelay)
retryMultiplier := p.Float(parameters.MeekRoundTripRetryMultiplier)
p = nil
serverAcknowledgedRequestPayload := false
receivedPayloadSize := int64(0)
for try := 0; ; try++ {
// Omit the request payload when retrying after receiving a
// partial server response.
var signaller *readCloseSignaller
var requestBody io.ReadCloser
contentLength := 0
if !serverAcknowledgedRequestPayload && sendBuffer != nil {
// sendBuffer will be replaced once the data is no longer needed,
// when RoundTrip calls Close on the Body; this allows meekConn.Write()
// to unblock and start buffering data for the next roung trip while
// still reading the current round trip response. signaller provides
// the hook for awaiting RoundTrip's call to Close.
signaller = NewReadCloseSignaller(meek.runCtx, bytes.NewReader(sendBuffer.Bytes()))
requestBody = signaller
contentLength = sendBuffer.Len()
}
request, cancelFunc, err := meek.newRequest(
nil,
nil,
requestBody,
contentLength)
if err != nil {
// Don't retry when can't initialize a Request
return 0, common.ContextError(err)
}
expectedStatusCode := http.StatusOK
// When retrying, add a Range header to indicate how much
// of the response was already received.
if try > 0 {
expectedStatusCode = http.StatusPartialContent
request.Header.Set("Range", fmt.Sprintf("bytes=%d-", receivedPayloadSize))
}
response, err := meek.transport.RoundTrip(request)
// Wait for RoundTrip to call Close on the request body, when
// there is one. This is necessary to ensure it's safe to
// subsequently replace sendBuffer in both the success and
// error cases.
if signaller != nil {
if !signaller.AwaitClosed() {
// AwaitClosed encountered Done(). Abort immediately. Do not
// replace sendBuffer, as we cannot be certain RoundTrip is
// done with it. MeekConn.Write will exit on Done and not hang
// awaiting sendBuffer.
sendBuffer = nil
return 0, common.ContextError(errors.New("meek connection has closed"))
}
}
if err != nil {
select {
case <-meek.runCtx.Done():
// Exit without retrying and without logging error.
return 0, common.ContextError(err)
default:
}
NoticeAlert("meek round trip failed: %s", err)
// ...continue to retry
}
if err == nil {
if response.StatusCode != expectedStatusCode &&
// Certain http servers return 200 OK where we expect 206, so accept that.
!(expectedStatusCode == http.StatusPartialContent && response.StatusCode == http.StatusOK) {
// Don't retry when the status code is incorrect
response.Body.Close()
return 0, common.ContextError(
fmt.Errorf(
"unexpected status code: %d instead of %d",
response.StatusCode, expectedStatusCode))
}
// Update meek session cookie
for _, c := range response.Cookies() {
if meek.cookie.Name == c.Name {
meek.cookie.Value = c.Value
break
}
}
// Received the response status code, so the server
// must have received the request payload.
serverAcknowledgedRequestPayload = true
// sendBuffer is now no longer required for retries, and the
// buffer may be replaced; this allows meekConn.Write() to unblock
// and start buffering data for the next round trip while still
// reading the current round trip response.
if sendBuffer != nil {
// Assumes signaller.AwaitClosed is called above, so
// sendBuffer will no longer be accessed by RoundTrip.
sendBuffer.Truncate(0)
meek.replaceSendBuffer(sendBuffer)
sendBuffer = nil
}
readPayloadSize, err := meek.readPayload(response.Body)
response.Body.Close()
// receivedPayloadSize is the number of response
// payload bytes received and relayed. A retry can
// resume after this position.
receivedPayloadSize += readPayloadSize
if err != nil {
NoticeAlert("meek read payload failed: %s", err)
// ...continue to retry
} else {
// Round trip completed successfully
break
}
}
// Release context resources now.
cancelFunc()
// Either the request failed entirely, or there was a failure
// streaming the response payload. Always retry once. Then
// retry if time remains; when the next delay exceeds the time
// remaining until the deadline, do not retry.
now := monotime.Now()
if retries >= 1 &&
(now.After(retryDeadline) || retryDeadline.Sub(now) <= retryDelay) {
return 0, common.ContextError(err)
}
retries += 1
delayTimer := time.NewTimer(retryDelay)
select {
case <-delayTimer.C:
case <-meek.runCtx.Done():
delayTimer.Stop()
return 0, common.ContextError(err)
}
// Increase the next delay, to back off and avoid excessive
// activity in conditions such as no network connectivity.
retryDelay = time.Duration(
float64(retryDelay) * retryMultiplier)
if retryDelay >= retryMaxDelay {
retryDelay = retryMaxDelay
}
}
return receivedPayloadSize, nil
}
// Add additional headers to the HTTP request using the same method we use for adding
// custom headers to HTTP proxy requests.
func (meek *MeekConn) addAdditionalHeaders(request *http.Request) {
for name, value := range meek.additionalHeaders {
// hack around special case of "Host" header
// https://golang.org/src/net/http/request.go#L474
// using URL.Opaque, see URL.RequestURI() https://golang.org/src/net/url/url.go#L915
if name == "Host" {
if len(value) > 0 {
if request.URL.Opaque == "" {
request.URL.Opaque = request.URL.Scheme + "://" + request.Host + request.URL.RequestURI()
}
request.Host = value[0]
}
} else {
request.Header[name] = value
}
}
}
// readPayload reads the HTTP response in chunks, making the read buffer available
// to MeekConn.Read() calls after each chunk; the intention is to allow bytes to
// flow back to the reader as soon as possible instead of buffering the entire payload.
//
// When readPayload returns an error, the totalSize output is remains valid -- it's the
// number of payload bytes successfully read and relayed.
func (meek *MeekConn) readPayload(
receivedPayload io.ReadCloser) (totalSize int64, err error) {
defer receivedPayload.Close()
totalSize = 0
for {
reader := io.LimitReader(receivedPayload, int64(meek.readPayloadChunkLength))
// Block until there is capacity in the receive buffer
var receiveBuffer *bytes.Buffer
select {
case receiveBuffer = <-meek.emptyReceiveBuffer:
case receiveBuffer = <-meek.partialReceiveBuffer:
case <-meek.runCtx.Done():
return 0, nil
}
// Note: receiveBuffer size may exceed meek.fullReceiveBufferLength by up to the size
// of one received payload. The meek.fullReceiveBufferLength value is just a guideline.
n, err := receiveBuffer.ReadFrom(reader)
meek.replaceReceiveBuffer(receiveBuffer)
totalSize += n
if err != nil {
return totalSize, common.ContextError(err)
}
if n == 0 {
break
}
}
return totalSize, nil
}
// makeCookie creates the cookie to be sent with initial meek HTTP request. The cookies
// contains obfuscated metadata, including meek version and other protocol information.
//
// In round tripper mode, the cookie contains the destination endpoint for the round
// trip request.
//
// In relay mode, the server will create a session using these values and send the session ID
// back to the client via Set-Cookie header. The client must use that value with
// all consequent HTTP requests.
//
// In plain HTTP meek protocols, the cookie is visible over the adversary network, so the
// cookie is encrypted and obfuscated.
//
// Obsolete meek cookie fields used by the legacy server stack are no longer sent. These
// include ServerAddress and SessionID.
func makeMeekCookie(
clientParameters *parameters.ClientParameters,
meekCookieEncryptionPublicKey string,
meekObfuscatedKey string,
meekObfuscatorPaddingPRNGSeed *prng.Seed,
clientTunnelProtocol string,
endPoint string,
) (cookie *http.Cookie, err error) {
cookieData := &protocol.MeekCookieData{
MeekProtocolVersion: MEEK_PROTOCOL_VERSION,
ClientTunnelProtocol: clientTunnelProtocol,
EndPoint: endPoint,
}
serializedCookie, err := json.Marshal(cookieData)
if err != nil {
return nil, common.ContextError(err)
}
// Encrypt the JSON data
// NaCl box is used for encryption. The peer public key comes from the server entry.
// Nonce is always all zeros, and is not sent in the cookie (the server also uses an all-zero nonce).
// http://nacl.cace-project.eu/box.html:
// "There is no harm in having the same nonce for different messages if the {sender, receiver} sets are
// different. This is true even if the sets overlap. For example, a sender can use the same nonce for two
// different messages if the messages are sent to two different public keys."
var nonce [24]byte
var publicKey [32]byte
decodedPublicKey, err := base64.StdEncoding.DecodeString(meekCookieEncryptionPublicKey)
if err != nil {
return nil, common.ContextError(err)
}
copy(publicKey[:], decodedPublicKey)
ephemeralPublicKey, ephemeralPrivateKey, err := box.GenerateKey(rand.Reader)
if err != nil {
return nil, common.ContextError(err)
}
box := box.Seal(nil, serializedCookie, &nonce, &publicKey, ephemeralPrivateKey)
encryptedCookie := make([]byte, 32+len(box))
copy(encryptedCookie[0:32], ephemeralPublicKey[0:32])
copy(encryptedCookie[32:], box)
maxPadding := clientParameters.Get().Int(parameters.MeekCookieMaxPadding)
// Obfuscate the encrypted data
obfuscator, err := obfuscator.NewClientObfuscator(
&obfuscator.ObfuscatorConfig{
Keyword: meekObfuscatedKey,
PaddingPRNGSeed: meekObfuscatorPaddingPRNGSeed,
MaxPadding: &maxPadding})
if err != nil {
return nil, common.ContextError(err)
}
obfuscatedCookie := obfuscator.SendSeedMessage()
seedLen := len(obfuscatedCookie)
obfuscatedCookie = append(obfuscatedCookie, encryptedCookie...)
obfuscator.ObfuscateClientToServer(obfuscatedCookie[seedLen:])
cookieNamePRNG, err := obfuscator.GetDerivedPRNG("meek-cookie-name")
if err != nil {
return nil, common.ContextError(err)
}
// Format the HTTP cookie
// The format is <random letter 'A'-'Z'>=<base64 data>, which is intended to match common cookie formats.
A := int('A')
Z := int('Z')
// letterIndex is integer in range [int('A'), int('Z')]
letterIndex := cookieNamePRNG.Intn(Z - A + 1)
return &http.Cookie{
Name: string(byte(A + letterIndex)),
Value: base64.StdEncoding.EncodeToString(obfuscatedCookie)},
nil
}
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
Go
1
https://gitee.com/lqinggang/psiphon-tunnel-core.git
git@gitee.com:lqinggang/psiphon-tunnel-core.git
lqinggang
psiphon-tunnel-core
psiphon-tunnel-core
v2.0.2

搜索帮助