1 Star 0 Fork 0

newlife/gnatsd

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
克隆/下载
parser.go 13.47 KB
一键复制 编辑 原始数据 按行查看 历史
Tyler Treat 提交于 2017-06-07 17:40 . Fix data race
12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738
// Copyright 2012-2014 Apcera Inc. All rights reserved.
package server
import (
"fmt"
)
// pubArg holds the parsed arguments of the message that parse is currently
// working on. Its byte-slice fields may still reference the read buffer that
// was handed to parse; clonePubArg copies them into the client's scratch
// space when they have to survive until the next read.
type pubArg struct {
	subject []byte
	reply   []byte // left nil when absent; clonePubArg keeps it nil in that case
	sid     []byte // left nil when absent; clonePubArg keeps it nil in that case
	szb     []byte // NOTE(review): presumably the textual form of size as received — confirm against processPub
	size    int    // payload length in bytes; parse requires exactly size+LEN_CR_LF bytes before calling processMsg
}
// parseState is the state parse keeps between bytes and between calls, so
// that a control line or payload split across several buffers can be resumed.
type parseState struct {
	state   int                         // current parser state, one of the OP_* / *_ARG / MSG_* constants
	as      int                         // index into the current buffer where the argument or payload starts
	drop    int                         // number of trailing bytes to cut from the argument; set to 1 when '\r' is seen
	pa      pubArg                      // arguments of the message currently being parsed
	argBuf  []byte                      // holds control-line arguments that were split across buffers; nil otherwise
	msgBuf  []byte                      // holds a payload that was split across buffers; nil otherwise
	scratch [MAX_CONTROL_LINE_SIZE]byte // backing storage that argBuf and (when it fits) msgBuf are sliced from
}
// Parser states used by (*client).parse. Each OP_* value records how much of
// a protocol verb has been matched so far, and the *_ARG values mean parse is
// collecting the argument portion of that verb's control line. The values are
// assigned by iota, so the order of this list determines their numeric
// values; OP_START is 0.
const (
	OP_START = iota
	// "+OK"
	OP_PLUS
	OP_PLUS_O
	OP_PLUS_OK
	// "-ERR <arg>"
	OP_MINUS
	OP_MINUS_E
	OP_MINUS_ER
	OP_MINUS_ERR
	OP_MINUS_ERR_SPC
	MINUS_ERR_ARG
	// "CONNECT <arg>"
	OP_C
	OP_CO
	OP_CON
	OP_CONN
	OP_CONNE
	OP_CONNEC
	OP_CONNECT
	CONNECT_ARG
	// "PUB <arg>"
	OP_P
	OP_PU
	OP_PUB
	OP_PUB_SPC
	PUB_ARG
	// "PING"
	OP_PI
	OP_PIN
	OP_PING
	// "PONG"
	OP_PO
	OP_PON
	OP_PONG
	// Message payload following a PUB or MSG control line.
	MSG_PAYLOAD
	MSG_END
	// "SUB <arg>"
	OP_S
	OP_SU
	OP_SUB
	OP_SUB_SPC
	SUB_ARG
	// "UNSUB <arg>"
	OP_U
	OP_UN
	OP_UNS
	OP_UNSU
	OP_UNSUB
	OP_UNSUB_SPC
	UNSUB_ARG
	// "MSG <arg>"
	OP_M
	OP_MS
	OP_MSG
	OP_MSG_SPC
	MSG_ARG
	// "INFO <arg>"
	OP_I
	OP_IN
	OP_INF
	OP_INFO
	INFO_ARG
)
// parse feeds buf through the client's protocol state machine one byte at a
// time and dispatches every completed operation to the corresponding
// process* method (processPub, processSub, processUnsub, processPing,
// processPong, processConnect, processMsgArgs, processInfo, processErr and
// processMsg).
//
// Parser state lives on the client, so a control line or payload that is
// split across two calls is resumed on the next call: partial arguments are
// accumulated in c.argBuf and partial payloads in c.msgBuf.
//
// It returns nil when buf was consumed without error. Otherwise it returns
// the error from a process* handler, ErrAuthorization when an operation that
// does not start with 'C'/'c' begins while the auth timer is set,
// ErrMaxControlLine when a split control line grows beyond the configured
// maximum, or a formatted parser error for any unexpected byte.
func (c *client) parse(buf []byte) error {
	var i int
	var b byte

	mcl := MAX_CONTROL_LINE_SIZE
	if c.srv != nil {
		// Fetch the options once so the nil check and the field read
		// are performed on the same value.
		if opts := c.srv.getOpts(); opts != nil {
			mcl = opts.MaxControlLine
		}
	}

	// snapshot this, and reset when we receive a
	// proper CONNECT if needed.
	authSet := c.isAuthTimerSet()

	// Move to loop instead of range syntax to allow jumping of i
	for i = 0; i < len(buf); i++ {
		b = buf[i]

		switch c.state {
		case OP_START:
			// While the auth timer is set only an operation starting
			// with 'C'/'c' (CONNECT) is acceptable.
			if b != 'C' && b != 'c' && authSet {
				goto authErr
			}
			switch b {
			case 'P', 'p':
				c.state = OP_P
			case 'S', 's':
				c.state = OP_S
			case 'U', 'u':
				c.state = OP_U
			case 'M', 'm':
				// MSG is rejected for connections of type CLIENT.
				if c.typ == CLIENT {
					goto parseErr
				}
				c.state = OP_M
			case 'C', 'c':
				c.state = OP_C
			case 'I', 'i':
				c.state = OP_I
			case '+':
				c.state = OP_PLUS
			case '-':
				c.state = OP_MINUS
			default:
				goto parseErr
			}
		case OP_P:
			switch b {
			case 'U', 'u':
				c.state = OP_PU
			case 'I', 'i':
				c.state = OP_PI
			case 'O', 'o':
				c.state = OP_PO
			default:
				goto parseErr
			}
		case OP_PU:
			switch b {
			case 'B', 'b':
				c.state = OP_PUB
			default:
				goto parseErr
			}
		case OP_PUB:
			switch b {
			case ' ', '\t':
				c.state = OP_PUB_SPC
			default:
				goto parseErr
			}
		case OP_PUB_SPC:
			switch b {
			case ' ', '\t':
				continue
			default:
				c.state = PUB_ARG
				c.as = i
			}
		case PUB_ARG:
			switch b {
			case '\r':
				c.drop = 1
			case '\n':
				var arg []byte
				if c.argBuf != nil {
					// argBuf is not cleared here; it is released in
					// MSG_END once the message has been processed.
					arg = c.argBuf
				} else {
					arg = buf[c.as : i-c.drop]
				}
				if err := c.processPub(arg); err != nil {
					return err
				}
				// drop is a byte count, so reset it with a plain 0 like
				// every other state does.
				c.drop, c.as, c.state = 0, i+1, MSG_PAYLOAD
				// If we don't have a saved buffer then jump ahead with
				// the index. If this overruns what is left we fall out
				// and process split buffer.
				if c.msgBuf == nil {
					i = c.as + c.pa.size - LEN_CR_LF
				}
			default:
				if c.argBuf != nil {
					c.argBuf = append(c.argBuf, b)
				}
			}
		case MSG_PAYLOAD:
			if c.msgBuf != nil {
				// copy as much as we can to the buffer and skip ahead.
				toCopy := c.pa.size - len(c.msgBuf)
				avail := len(buf) - i
				if avail < toCopy {
					toCopy = avail
				}
				if toCopy > 0 {
					start := len(c.msgBuf)
					// This is needed for copy to work.
					c.msgBuf = c.msgBuf[:start+toCopy]
					copy(c.msgBuf[start:], buf[i:i+toCopy])
					// Update our index
					i = (i + toCopy) - 1
				} else {
					// Fall back to append if needed.
					c.msgBuf = append(c.msgBuf, b)
				}
				if len(c.msgBuf) >= c.pa.size {
					c.state = MSG_END
				}
			} else if i-c.as >= c.pa.size {
				c.state = MSG_END
			}
		case MSG_END:
			switch b {
			case '\n':
				if c.msgBuf != nil {
					c.msgBuf = append(c.msgBuf, b)
				} else {
					c.msgBuf = buf[c.as : i+1]
				}
				// strict check for proto
				if len(c.msgBuf) != c.pa.size+LEN_CR_LF {
					goto parseErr
				}
				c.processMsg(c.msgBuf)
				c.argBuf, c.msgBuf = nil, nil
				c.drop, c.as, c.state = 0, i+1, OP_START
			default:
				if c.msgBuf != nil {
					c.msgBuf = append(c.msgBuf, b)
				}
				continue
			}
		case OP_S:
			switch b {
			case 'U', 'u':
				c.state = OP_SU
			default:
				goto parseErr
			}
		case OP_SU:
			switch b {
			case 'B', 'b':
				c.state = OP_SUB
			default:
				goto parseErr
			}
		case OP_SUB:
			switch b {
			case ' ', '\t':
				c.state = OP_SUB_SPC
			default:
				goto parseErr
			}
		case OP_SUB_SPC:
			switch b {
			case ' ', '\t':
				continue
			default:
				c.state = SUB_ARG
				c.as = i
			}
		case SUB_ARG:
			switch b {
			case '\r':
				c.drop = 1
			case '\n':
				var arg []byte
				if c.argBuf != nil {
					arg = c.argBuf
					c.argBuf = nil
				} else {
					arg = buf[c.as : i-c.drop]
				}
				if err := c.processSub(arg); err != nil {
					return err
				}
				c.drop, c.as, c.state = 0, i+1, OP_START
			default:
				if c.argBuf != nil {
					c.argBuf = append(c.argBuf, b)
				}
			}
		case OP_U:
			switch b {
			case 'N', 'n':
				c.state = OP_UN
			default:
				goto parseErr
			}
		case OP_UN:
			switch b {
			case 'S', 's':
				c.state = OP_UNS
			default:
				goto parseErr
			}
		case OP_UNS:
			switch b {
			case 'U', 'u':
				c.state = OP_UNSU
			default:
				goto parseErr
			}
		case OP_UNSU:
			switch b {
			case 'B', 'b':
				c.state = OP_UNSUB
			default:
				goto parseErr
			}
		case OP_UNSUB:
			switch b {
			case ' ', '\t':
				c.state = OP_UNSUB_SPC
			default:
				goto parseErr
			}
		case OP_UNSUB_SPC:
			switch b {
			case ' ', '\t':
				continue
			default:
				c.state = UNSUB_ARG
				c.as = i
			}
		case UNSUB_ARG:
			switch b {
			case '\r':
				c.drop = 1
			case '\n':
				var arg []byte
				if c.argBuf != nil {
					arg = c.argBuf
					c.argBuf = nil
				} else {
					arg = buf[c.as : i-c.drop]
				}
				if err := c.processUnsub(arg); err != nil {
					return err
				}
				c.drop, c.as, c.state = 0, i+1, OP_START
			default:
				if c.argBuf != nil {
					c.argBuf = append(c.argBuf, b)
				}
			}
		case OP_PI:
			switch b {
			case 'N', 'n':
				c.state = OP_PIN
			default:
				goto parseErr
			}
		case OP_PIN:
			switch b {
			case 'G', 'g':
				c.state = OP_PING
			default:
				goto parseErr
			}
		case OP_PING:
			// Every byte up to the terminating '\n' is skipped.
			switch b {
			case '\n':
				c.processPing()
				c.drop, c.state = 0, OP_START
			}
		case OP_PO:
			switch b {
			case 'N', 'n':
				c.state = OP_PON
			default:
				goto parseErr
			}
		case OP_PON:
			switch b {
			case 'G', 'g':
				c.state = OP_PONG
			default:
				goto parseErr
			}
		case OP_PONG:
			// Every byte up to the terminating '\n' is skipped.
			switch b {
			case '\n':
				c.processPong()
				c.drop, c.state = 0, OP_START
			}
		case OP_C:
			switch b {
			case 'O', 'o':
				c.state = OP_CO
			default:
				goto parseErr
			}
		case OP_CO:
			switch b {
			case 'N', 'n':
				c.state = OP_CON
			default:
				goto parseErr
			}
		case OP_CON:
			switch b {
			case 'N', 'n':
				c.state = OP_CONN
			default:
				goto parseErr
			}
		case OP_CONN:
			switch b {
			case 'E', 'e':
				c.state = OP_CONNE
			default:
				goto parseErr
			}
		case OP_CONNE:
			switch b {
			case 'C', 'c':
				c.state = OP_CONNEC
			default:
				goto parseErr
			}
		case OP_CONNEC:
			switch b {
			case 'T', 't':
				c.state = OP_CONNECT
			default:
				goto parseErr
			}
		case OP_CONNECT:
			switch b {
			case ' ', '\t':
				continue
			default:
				c.state = CONNECT_ARG
				c.as = i
			}
		case CONNECT_ARG:
			switch b {
			case '\r':
				c.drop = 1
			case '\n':
				var arg []byte
				if c.argBuf != nil {
					arg = c.argBuf
					c.argBuf = nil
				} else {
					arg = buf[c.as : i-c.drop]
				}
				if err := c.processConnect(arg); err != nil {
					return err
				}
				c.drop, c.state = 0, OP_START
				// Reset notion on authSet
				authSet = c.isAuthTimerSet()
			default:
				if c.argBuf != nil {
					c.argBuf = append(c.argBuf, b)
				}
			}
		case OP_M:
			switch b {
			case 'S', 's':
				c.state = OP_MS
			default:
				goto parseErr
			}
		case OP_MS:
			switch b {
			case 'G', 'g':
				c.state = OP_MSG
			default:
				goto parseErr
			}
		case OP_MSG:
			switch b {
			case ' ', '\t':
				c.state = OP_MSG_SPC
			default:
				goto parseErr
			}
		case OP_MSG_SPC:
			switch b {
			case ' ', '\t':
				continue
			default:
				c.state = MSG_ARG
				c.as = i
			}
		case MSG_ARG:
			switch b {
			case '\r':
				c.drop = 1
			case '\n':
				var arg []byte
				if c.argBuf != nil {
					// argBuf is not cleared here; it is released in
					// MSG_END once the message has been processed.
					arg = c.argBuf
				} else {
					arg = buf[c.as : i-c.drop]
				}
				if err := c.processMsgArgs(arg); err != nil {
					return err
				}
				c.drop, c.as, c.state = 0, i+1, MSG_PAYLOAD
				// jump ahead with the index. If this overruns
				// what is left we fall out and process split
				// buffer.
				i = c.as + c.pa.size - 1
			default:
				if c.argBuf != nil {
					c.argBuf = append(c.argBuf, b)
				}
			}
		case OP_I:
			switch b {
			case 'N', 'n':
				c.state = OP_IN
			default:
				goto parseErr
			}
		case OP_IN:
			switch b {
			case 'F', 'f':
				c.state = OP_INF
			default:
				goto parseErr
			}
		case OP_INF:
			switch b {
			case 'O', 'o':
				c.state = OP_INFO
			default:
				goto parseErr
			}
		case OP_INFO:
			switch b {
			case ' ', '\t':
				continue
			default:
				c.state = INFO_ARG
				c.as = i
			}
		case INFO_ARG:
			switch b {
			case '\r':
				c.drop = 1
			case '\n':
				var arg []byte
				if c.argBuf != nil {
					arg = c.argBuf
					c.argBuf = nil
				} else {
					arg = buf[c.as : i-c.drop]
				}
				if err := c.processInfo(arg); err != nil {
					return err
				}
				c.drop, c.as, c.state = 0, i+1, OP_START
			default:
				if c.argBuf != nil {
					c.argBuf = append(c.argBuf, b)
				}
			}
		case OP_PLUS:
			switch b {
			case 'O', 'o':
				c.state = OP_PLUS_O
			default:
				goto parseErr
			}
		case OP_PLUS_O:
			switch b {
			case 'K', 'k':
				c.state = OP_PLUS_OK
			default:
				goto parseErr
			}
		case OP_PLUS_OK:
			// "+OK" triggers no handler; just wait for the end of line.
			switch b {
			case '\n':
				c.drop, c.state = 0, OP_START
			}
		case OP_MINUS:
			switch b {
			case 'E', 'e':
				c.state = OP_MINUS_E
			default:
				goto parseErr
			}
		case OP_MINUS_E:
			switch b {
			case 'R', 'r':
				c.state = OP_MINUS_ER
			default:
				goto parseErr
			}
		case OP_MINUS_ER:
			switch b {
			case 'R', 'r':
				c.state = OP_MINUS_ERR
			default:
				goto parseErr
			}
		case OP_MINUS_ERR:
			switch b {
			case ' ', '\t':
				c.state = OP_MINUS_ERR_SPC
			default:
				goto parseErr
			}
		case OP_MINUS_ERR_SPC:
			switch b {
			case ' ', '\t':
				continue
			default:
				c.state = MINUS_ERR_ARG
				c.as = i
			}
		case MINUS_ERR_ARG:
			switch b {
			case '\r':
				c.drop = 1
			case '\n':
				var arg []byte
				if c.argBuf != nil {
					arg = c.argBuf
					c.argBuf = nil
				} else {
					arg = buf[c.as : i-c.drop]
				}
				c.processErr(string(arg))
				c.drop, c.as, c.state = 0, i+1, OP_START
			default:
				if c.argBuf != nil {
					c.argBuf = append(c.argBuf, b)
				}
			}
		default:
			goto parseErr
		}
	}

	// Check for split buffer scenarios for any ARG state.
	if c.state == SUB_ARG || c.state == UNSUB_ARG || c.state == PUB_ARG ||
		c.state == MSG_ARG || c.state == MINUS_ERR_ARG ||
		c.state == CONNECT_ARG || c.state == INFO_ARG {
		// Setup a holder buffer to deal with split buffer scenario.
		if c.argBuf == nil {
			c.argBuf = c.scratch[:0]
			c.argBuf = append(c.argBuf, buf[c.as:i-c.drop]...)
		}
		// Check for violations of control line length here. Note that this is not
		// exact at all but the performance hit is too great to be precise, and
		// catching here should prevent memory exhaustion attacks.
		if len(c.argBuf) > mcl {
			c.sendErr("Maximum Control Line Exceeded")
			c.closeConnection()
			return ErrMaxControlLine
		}
	}

	// Check for split msg
	if (c.state == MSG_PAYLOAD || c.state == MSG_END) && c.msgBuf == nil {
		// We need to clone the pubArg if it is still referencing the
		// read buffer and we are not able to process the msg.
		if c.argBuf == nil {
			// Works also for MSG_ARG, when message comes from ROUTE.
			c.clonePubArg()
		}

		// If we will overflow the scratch buffer, just create a
		// new buffer to hold the split message.
		if c.pa.size > cap(c.scratch)-len(c.argBuf) {
			lrem := len(buf[c.as:])

			// Consider it a protocol error when the remaining payload
			// is larger than the reported size for PUB. It can happen
			// when processing incomplete messages from rogue clients.
			if lrem > c.pa.size+LEN_CR_LF {
				goto parseErr
			}
			c.msgBuf = make([]byte, lrem, c.pa.size+LEN_CR_LF)
			copy(c.msgBuf, buf[c.as:])
		} else {
			// Place the payload in scratch right after the saved arguments.
			c.msgBuf = c.scratch[len(c.argBuf):len(c.argBuf)]
			c.msgBuf = append(c.msgBuf, (buf[c.as:])...)
		}
	}

	return nil

authErr:
	c.authViolation()
	return ErrAuthorization

parseErr:
	c.sendErr("Unknown Protocol Operation")
	snip := protoSnippet(i, buf)
	err := fmt.Errorf("%s Parser ERROR, state=%d, i=%d: proto='%s...'",
		c.typeString(), c.state, i, snip)
	return err
}
// protoSnippet returns a quoted (%q) excerpt of buf beginning at start, for
// use in parser error messages. The excerpt is at most PROTO_SNIPPET_SIZE
// bytes long. When start is at or past the end of buf the literal `""` is
// returned. When fewer than PROTO_SNIPPET_SIZE bytes remain, the excerpt is
// cut at len(buf)-1, so the final byte of buf is not included.
func protoSnippet(start int, buf []byte) string {
	n := len(buf)
	if start >= n {
		return `""`
	}

	end := start + PROTO_SNIPPET_SIZE
	if end > n {
		end = n - 1
	}
	return fmt.Sprintf("%q", buf[start:end])
}
// clonePubArg copies the current pubArg fields out of the read buffer so they
// remain valid for the next read. It is used in the split buffer scenario,
// where the arguments were parsed from the existing read buffer but the
// message itself cannot be processed yet.
//
// subject, reply, sid and szb are appended back to back into c.scratch, which
// becomes c.argBuf, and each field is then re-pointed at its copy. reply and
// sid are left nil when they were nil on entry.
func (c *client) clonePubArg() {
	// Work out where each field ends inside the packed copy.
	subjEnd := len(c.pa.subject)
	replyEnd := subjEnd + len(c.pa.reply)
	sidEnd := replyEnd + len(c.pa.sid)

	held := c.scratch[:0]
	held = append(held, c.pa.subject...)
	held = append(held, c.pa.reply...)
	held = append(held, c.pa.sid...)
	held = append(held, c.pa.szb...)
	c.argBuf = held

	c.pa.subject = held[:subjEnd]
	if c.pa.reply != nil {
		c.pa.reply = held[subjEnd:replyEnd]
	}
	if c.pa.sid != nil {
		c.pa.sid = held[replyEnd:sidEnd]
	}
	c.pa.szb = held[sidEnd:]
}
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
1
https://gitee.com/newlife/gnatsd.git
git@gitee.com:newlife/gnatsd.git
newlife
gnatsd
gnatsd
v1.0.6

搜索帮助