代码拉取完成,页面将自动刷新
同步操作将从 JUMEI_ARCH/volantmq 强制同步,此操作会覆盖自 Fork 仓库以来所做的任何修改,且无法恢复!!!
确定后同步将在后台操作,完成时将刷新页面,请耐心等待。
package packet
import "encoding/binary"
type sizeCallback func() int
type encodeCallback func([]byte) (int, error)
type decodeCallback func([]byte) (int, error)
type header struct {
cb struct {
encode encodeCallback
decode decodeCallback
size sizeCallback
}
properties property
packetID []byte
remLen int32
mFlags byte
mType Type
version ProtocolVersion
// the unix nano timestamp on creating the publish packet, it should be unique in all packets.
createdAt int64
decodePacket int64
pubTopicTree int64
findSubscriber int64
pushQueue int64
popQueue int64
endAt int64
}
const (
offsetPacketType byte = 0x04
//offsetPublishFlagRetain byte = 0x00
offsetPublishFlagQoS byte = 0x01
//offsetPublishFlagDup byte = 0x03
offsetConnFlagWillQoS byte = 0x03
//offsetSubscribeOps byte = 0x06
//offsetSubscriptionQoS byte = 0x00
offsetSubscriptionNL byte = 0x02
offsetSubscriptionRAP byte = 0x03
offsetSubscriptionRetainHandling byte = 0x04
//offsetSubscriptionReserved byte = 0x06
)
const (
maskMessageFlags byte = 0x0F
maskConnFlagUsername byte = 0x80
maskConnFlagPassword byte = 0x40
maskConnFlagWillRetain byte = 0x20
maskConnFlagWillQos byte = 0x18
maskConnFlagWill byte = 0x04
maskConnFlagClean byte = 0x02
maskConnFlagReserved byte = 0x01
maskPublishFlagRetain byte = 0x01
maskPublishFlagQoS byte = 0x06
maskPublishFlagDup byte = 0x08
maskSubscriptionQoS byte = 0x03
maskSubscriptionNL byte = 0x04
maskSubscriptionRAP byte = 0x08
maskSubscriptionRetainHandling byte = 0x30
maskSubscriptionReserved byte = 0xC0
)
// GetCreateTimeStamp get the unixnano timestamp on creating the packet.
// (this is not part of the protocols, just for server side internal functions.)
func (h *header) GetCreateTimestamp() int64 {
return h.createdAt
}
func (h *header) SetCreateTimestamp(t int64) {
h.createdAt = t
}
func (h *header) GetDecodePacket() int64 {
return h.decodePacket
}
func (h *header) SetDecodePacket(t int64) {
h.decodePacket = t
}
func (h *header) GetPubTopicTree() int64 {
return h.pubTopicTree
}
func (h *header) SetPubTopicTree(t int64) {
h.pubTopicTree = t
}
func (h *header) GetFindSubscriber() int64 {
return h.findSubscriber
}
func (h *header) SetFindSubscriber(t int64) {
h.findSubscriber = t
}
func (h *header) GetPushQueue() int64 {
return h.pushQueue
}
func (h *header) SetPushQueue(t int64) {
h.pushQueue = t
}
func (h *header) GetPopQueue() int64 {
return h.popQueue
}
func (h *header) SetPopQueue(t int64) {
h.popQueue = t
}
func (h *header) GetEndTimestamp() int64 {
return h.endAt
}
func (h *header) SetEndTimestamp(t int64) {
h.endAt = t
}
// Name returns a string representation of the message type. Examples include
// "PUBLISH", "SUBSCRIBE", and others. This is statically defined for each of
// the message types and cannot be changed.
func (h *header) Name() string {
return h.Type().Name()
}
// Desc returns a string description of the message type. For example, a
// CONNECT message would return "Client request to connect to Server." These
// descriptions are statically defined (copied from the MQTT spec) and cannot
// be changed.
func (h *header) Desc() string {
return h.Type().Desc()
}
// Type returns the MessageType of the Message
func (h *header) Type() Type {
return h.mType
}
// Flags returns the fixed header flags for this message.
func (h *header) Flags() byte {
return h.mFlags
}
// RemainingLength returns the length of the non-fixed-header part of the message.
func (h *header) RemainingLength() int32 {
return h.remLen
}
func (h *header) Version() ProtocolVersion {
return h.version
}
func (h *header) ID() (IDType, error) {
if len(h.packetID) == 0 {
return 0, ErrNotSet
}
return IDType(binary.BigEndian.Uint16(h.packetID)), nil
}
func (h *header) Encode(to []byte) (int, error) {
expectedSize, err := h.Size()
if err != nil {
return 0, err
}
if expectedSize > len(to) {
return expectedSize, ErrInsufficientBufferSize
}
offset := 0
to[offset] = byte(h.mType<<offsetPacketType) | h.mFlags
offset++
offset += binary.PutUvarint(to[offset:], uint64(h.remLen))
var n int
n, err = h.cb.encode(to[offset:])
offset += n
return offset, err
}
func (h *header) SetVersion(v ProtocolVersion) {
h.version = v
}
// Size of message
func (h *header) Size() (int, error) {
ml := h.cb.size()
if err := h.setRemainingLength(int32(ml)); err != nil {
return 0, err
}
return h.size() + ml, nil
}
func (h *header) PropertyGet(id PropertyID) PropertyToType {
if h.version != ProtocolV50 {
return nil
}
return h.properties.Get(id)
}
func (h *header) PropertySet(id PropertyID, val interface{}) error {
if h.version != ProtocolV50 {
return ErrNotSupported
}
return h.properties.Set(h.mType, id, val)
}
func (h *header) PropertyForEach(f func(PropertyID, PropertyToType)) error {
if h.version != ProtocolV50 {
return ErrNotSupported
}
h.properties.ForEach(f)
return nil
}
func (h *header) setPacketID(id IDType) {
if len(h.packetID) == 0 {
h.packetID = make([]byte, 2)
}
binary.BigEndian.PutUint16(h.packetID, uint16(id))
}
func (h *header) decodePacketID(src []byte) int {
if len(h.packetID) == 0 {
h.packetID = make([]byte, 2)
}
return copy(h.packetID, src)
}
func (h *header) encodePacketID(dst []byte) int {
return copy(dst, h.packetID)
}
// setRemainingLength sets the length of the non-fixed-header part of the message.
// It returns error if the length is greater than 268435455, which is the max
// message length as defined by the MQTT spec.
func (h *header) setRemainingLength(remLen int32) error {
if remLen > maxRemainingLength || remLen < 0 {
return ErrInvalidLength
}
h.remLen = remLen
return nil
}
func (h *header) getHeader() *header {
return h
}
// size of header
// this function must be invoked after successful call to setRemainingLength
func (h *header) size() int {
// message type and flags byte
total := 1
return total + uvarintCalc(uint32(h.remLen))
}
// setType sets the message type of this message. It also correctly sets the
// default flags for the message type. It returns an error if the type is invalid.
func (h *header) setType(t Type) {
// Notice we don't set the message to be dirty when we are not allocating a new
// buffer. In this case, it means the buffer is probably a sub-slice of another
// slice. If that's the case, then during encoding we would have copied the whole
// backing buffer anyway.
h.mType = t
h.mFlags = t.DefaultFlags()
}
// decode reads fixed header and remaining length
// if decode successful size of decoded data provided
// if error happened offset points to error place
func (h *header) decode(from []byte) (int, error) {
offset := 0
// decode and validate fixed header
//h.mTypeFlags = src[total]
h.mType = Type(from[offset] >> offsetPacketType)
h.mFlags = from[offset] & maskMessageFlags
reject := false
// [MQTT-2.2.2-1]
if h.mType != PUBLISH && h.mFlags != h.mType.DefaultFlags() {
reject = true
} else {
if !QosType((h.mFlags & maskPublishFlagQoS) >> offsetPublishFlagQoS).IsValid() {
reject = true
}
}
if reject {
rejectCode := CodeRefusedServerUnavailable
if h.version == ProtocolV50 {
rejectCode = CodeMalformedPacket
}
return offset, rejectCode
}
offset++
remLen, m := uvarint(from[offset:])
if m <= 0 {
return offset, ErrInsufficientDataSize
}
offset += m
h.remLen = int32(remLen)
// verify if buffer has enough space for whole message
// if not return expected size
if int(h.remLen) > len(from[offset:]) {
return offset + int(h.remLen), ErrInsufficientDataSize
}
var err error
if h.cb.decode != nil {
var msgTotal int
msgTotal, err = h.cb.decode(from[offset:])
offset += msgTotal
}
return offset, err
}
// uvarint decodes a uint32 from buf and returns that value and the
// number of bytes read (> 0). If an error occurred, the value is 0
// and the number of bytes n is <= 0 meaning:
//
// n == 0: buf too small
// n < 0: value larger than 32 bits (overflow)
// and -n is the number of bytes read
//
// copied from binary.Uvariant
func uvarint(buf []byte) (uint32, int) {
var x uint32
var s uint
for i, b := range buf {
if b < 0x80 {
if i > 4 || i == 4 && b > 1 {
return 0, -(i + 1) // overflow
}
return x | uint32(b)<<s, i + 1
}
x |= uint32(b&0x7f) << s
s += 7
}
return 0, 0
}
func uvarintCalc(x uint32) int {
i := 0
for x >= 0x80 {
x >>= 7
i++
}
return i + 1
}
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。