1 Star 0 Fork 2

何吕 / volantmq

forked from JUMEI_ARCH / volantmq 
Create your Gitee Account
Explore and code with more than 12 million developers,Free private repositories !:)
Sign up
Clone or Download
tcp.go 3.13 KB
Copy Edit Raw Blame History
hawklin authored 2018-05-22 16:03 . 日志
package transport
import (
"crypto/tls"
"net"
"time"
"fmt"
"github.com/VolantMQ/volantmq/configuration"
"go.uber.org/zap"
"runtime/debug"
)
// ConfigTCP configuration of tcp transport
type ConfigTCP struct {
Scheme string
CertFile string
KeyFile string
transport *Config
}
type tcp struct {
baseConfig
listener net.Listener
tlsConfig *tls.Config
}
// NewConfigTCP allocate new transport config for tcp transport
// Use of this function is preferable instead of direct allocation of ConfigTCP
func NewConfigTCP(transport *Config) *ConfigTCP {
scheme := "tcp"
if transport.Host == "0.0.0.0" {
scheme = "tcp4"
}
return &ConfigTCP{
Scheme: scheme,
transport: transport,
}
}
// NewTCP create new tcp transport
func NewTCP(config *ConfigTCP, internal *InternalConfig) (Provider, error) {
l := &tcp{}
l.quit = make(chan struct{})
l.protocol = config.Scheme
l.InternalConfig = *internal
l.config = *config.transport
l.log = configuration.GetLogger().Named("server.transport.tcp")
var err error
if config.CertFile != "" && config.KeyFile != "" {
l.tlsConfig = &tls.Config{
Certificates: make([]tls.Certificate, 1),
}
l.tlsConfig.Certificates[0], err = tls.LoadX509KeyPair(config.CertFile, config.KeyFile)
if err != nil {
l.tlsConfig = nil
return nil, err
}
}
var ln net.Listener
if ln, err = net.Listen(config.Scheme, config.transport.Host+":"+config.transport.Port); err != nil {
return nil, err
}
if l.tlsConfig != nil {
l.listener = NewTLSListener(ln, l.tlsConfig)
} else {
l.listener = ln
}
return l, nil
}
// Close
func (l *tcp) Close() error {
var err error
l.onceStop.Do(func() {
close(l.quit)
err = l.listener.Close()
l.onConnection.Wait()
})
return err
}
//func (l *tcp) Protocol() string {
// return "tcp"
//}
func (l *tcp) Serve() error {
var tempDelay time.Duration // how long to sleep on accept failure
for {
var conn net.Conn
var err error
if conn, err = l.listener.Accept(); err != nil {
// http://zhen.org/blog/graceful-shutdown-of-go-net-dot-listeners/
select {
case <-l.quit:
return nil
default:
}
// Borrowed from go1.3.3/src/pkg/net/http/server.go:1699
if ne, ok := err.(net.Error); ok && ne.Temporary() {
if tempDelay == 0 {
tempDelay = 5 * time.Millisecond
} else {
tempDelay *= 2
}
if max := 1 * time.Second; tempDelay > max {
tempDelay = max
}
l.log.Error("Couldn't accept connection. Retrying",
zap.Error(err),
zap.Duration("retryIn", tempDelay))
time.Sleep(tempDelay)
continue
}
return err
}
l.log.Debug("accepted new connection.", zap.Stringer("client_addr", conn.RemoteAddr()))
l.onConnection.Add(1)
go func(cn net.Conn) {
defer func() {
sysErr := recover()
if sysErr != nil {
sysErr = fmt.Errorf("%v. trace: %s", sysErr, debug.Stack())
l.log.Error("connection handling routine crashed.", zap.Error(sysErr.(error)))
}
l.onConnection.Done()
}()
if conn, err := newConnTCP(cn, l.Metric.Bytes()); err != nil {
l.log.Error("Couldn't create connection interface", zap.Error(err))
} else {
l.handleConnection(conn)
}
}(conn)
}
}
Go
1
https://gitee.com/kaifazhe/volantmq.git
git@gitee.com:kaifazhe/volantmq.git
kaifazhe
volantmq
volantmq
v0.0.4

Search

53164aa7 5694891 3bd8fe86 5694891