代码拉取完成,页面将自动刷新
同步操作将从 JUMEI_ARCH/volantmq 强制同步,此操作会覆盖自 Fork 仓库以来所做的任何修改,且无法恢复!!!
确定后同步将在后台操作,完成时将刷新页面,请耐心等待。
package transport
import (
"errors"
"sync"
"time"
"github.com/VolantMQ/volantmq/auth"
"github.com/VolantMQ/volantmq/clients"
"github.com/VolantMQ/volantmq/packet"
"github.com/VolantMQ/volantmq/ratelimit/type"
"github.com/VolantMQ/volantmq/routines"
"github.com/VolantMQ/volantmq/systree"
"github.com/VolantMQ/volantmq/trace"
"go.uber.org/zap"
)
// Config is base configuration object used by all transports
type Config struct {
// AuthManager
AuthManager *auth.Manager
RateLimiter RateType.RateLimiter
// Port tcp port to listen on
Port string
// Host
Host string
}
// InternalConfig used by server implementation to configure internal specific needs
type InternalConfig struct {
// AllowedVersions what protocol version server will handle
// If not set than defaults to 0x3 and 0x04
AllowedVersions map[packet.ProtocolVersion]bool
Sessions *clients.Manager
Metric systree.Metric
// ConnectTimeout The number of seconds to wait for the CONNACK message before disconnecting.
// If not set then default to 2 seconds.
ConnectTimeout int
// KeepAlive The number of seconds to keep the connection live if there's no data.
// If not set then defaults to 5 minutes.
KeepAlive int
}
type baseConfig struct {
InternalConfig
config Config
onConnection sync.WaitGroup // nolint: structcheck
onceStop sync.Once // nolint: structcheck
quit chan struct{} // nolint: structcheck
log *zap.Logger
protocol string
}
// Provider is interface that all of transports must implement
type Provider interface {
Protocol() string
Serve() error
Close() error
Port() string
}
// Port return tcp port used by transport
func (c *baseConfig) Port() string {
return c.config.Port
}
// Protocol return protocol name used by transport
func (c *baseConfig) Protocol() string {
return c.protocol
}
// handleConnection is for the broker to handle an incoming connection from a client
func (c *baseConfig) handleConnection(conn conn) {
if c == nil {
c.log.Error("Invalid connection type")
return
}
var err error
defer func() {
if err != nil {
conn.Close() // nolint: errcheck, gas
}
}()
// To establish a connection, we must
// 1. Read and decode the message.ConnectMessage from the wire
// 2. If no decoding errors, then authenticate using username and password.
// Otherwise, write out to the wire message.ConnackMessage with
// appropriate error.
// 3. If authentication is successful, then either create a new session or
// retrieve existing session
// 4. Write out to the wire a successful message.ConnackMessage message
// Read the CONNECT message from the wire, if error, then check to see if it's
// a CONNACK error. If it's CONNACK error, send the proper CONNACK error back
// to client. Exit regardless of error type.
conn.SetReadDeadline(time.Now().Add(time.Second * time.Duration(c.ConnectTimeout))) // nolint: errcheck, gas
overflow := c.config.RateLimiter.WaitToken()
var req packet.Provider
var buf []byte
if buf, err = routines.GetMessageBuffer(conn); err != nil {
c.log.Error("Couldn't get CONNECT message", zap.String("remote", conn.RemoteAddr().String()), zap.Error(err))
return
}
var reason packet.ReasonCode
var hasErrReason bool
if req, _, err = packet.Decode(packet.ProtocolV50, buf); err != nil {
if reason, hasErrReason = err.(packet.ReasonCode); !hasErrReason {
c.log.Error("Couldn't decode message", zap.String("remote", conn.RemoteAddr().String()), zap.Error(err))
return
}
}
c.Metric.Packets().Received(req.Type())
if overflow == false {
reason = packet.CodeServerBusy
hasErrReason = true
if r, ok := req.(*packet.Connect); ok {
c.log.Error("rearch rate limit",
zap.Float64("speed", c.config.RateLimiter.Limit()),
zap.Int("burst", c.config.RateLimiter.Burst()),
zap.String("clientID", string(r.ClientID())),
zap.String("remote", conn.RemoteAddr().String()),
)
}
} else {
if r, ok := req.(*packet.Connect); ok {
c.log.Debug("pass rate limit",
zap.Float64("speed", c.config.RateLimiter.Limit()),
zap.Int("burst", c.config.RateLimiter.Burst()),
zap.String("clientID", string(r.ClientID())),
zap.String("remote", conn.RemoteAddr().String()),
)
}
}
if err == nil || hasErrReason {
// Disable read deadline. Will set it later if keep-alive interval is bigger than 0
conn.SetReadDeadline(time.Time{}) // nolint: errcheck
switch r := req.(type) {
case *packet.Connect:
clientID := string(r.ClientID())
m, _ := packet.New(req.Version(), packet.CONNACK)
resp, _ := m.(*packet.ConnAck)
if hasErrReason {
username, _ := r.Credentials()
systreeConnStatus := &systree.ClientConnectStatus{
Username: string(username),
Timestamp: time.Now().Unix(),
Address: conn.RemoteAddr().String(),
Protocol: r.Version(),
ConnAckCode: reason,
CleanSession: r.IsClean(),
}
c.Sessions.Systree.Clients().Connected(clientID, systreeConnStatus)
c.Sessions.Systree.Clients().Disconnected(clientID, reason, true)
c.Metric.Packets().Sent(resp.Type())
c.log.Error("client login fail", zap.String("clientID", clientID), zap.String("reason", reason.Error()), zap.String("remote", conn.RemoteAddr().String()))
resp.SetReturnCode(reason)
routines.WriteMessage(conn, resp)
return
}
if trace.GetInstance().Status(clientID, "") {
c.log.Info("client connect", zap.String("clientID", clientID), zap.String("remote", conn.RemoteAddr().String()))
} else {
c.log.Debug("client connect", zap.String("clientID", clientID), zap.String("remote", conn.RemoteAddr().String()))
}
// If protocol version is not in allowed list then give reject and pass control to session manager
// to handle response
if allowed, ok := c.AllowedVersions[r.Version()]; !ok || !allowed {
reason = packet.CodeRefusedUnacceptableProtocolVersion
if r.Version() == packet.ProtocolV50 {
reason = packet.CodeUnsupportedProtocol
}
} else {
user, pass := r.Credentials()
if status := c.config.AuthManager.Password(string(user), string(pass)); status == auth.StatusAllow {
reason = packet.CodeSuccess
} else {
reason = packet.CodeRefusedBadUsernameOrPassword
if req.Version() == packet.ProtocolV50 {
reason = packet.CodeBadUserOrPassword
}
c.log.Warn("bad username or password", zap.String("clientID", clientID))
}
}
resp.SetReturnCode(reason) // nolint: errcheck
err = c.Sessions.NewSession(
&clients.StartConfig{
Req: r,
Resp: resp,
Conn: conn,
Auth: c.config.AuthManager,
})
if err != nil {
c.log.Error("Failed to create session.", zap.ByteString("ClientID", r.ClientID()), zap.Error(err))
}
default:
c.log.Error("Unexpected message type",
zap.String("expected", "CONNECT"),
zap.String("received", r.Type().Name()))
err = errors.New("unexpected message type")
}
}
}
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。