4 Star 0 Fork 0

wanttobeamaster/elasticell

加入 Gitee
与超过 1400万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
克隆/下载
proxy.go 15.21 KB
一键复制 编辑 原始数据 按行查看 历史
wanttobeamaster 提交于 2021-04-27 14:37 +08:00 . add timeout version
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675
package proxy
import (
"context"
"encoding/json"
"fmt"
"io"
"sort"
"strings"
"sync"
"sync/atomic"
"time"
"gitee.com/wanttobeamaster/elasticell/pkg/pb/metapb"
"gitee.com/wanttobeamaster/elasticell/pkg/pb/pdpb"
"gitee.com/wanttobeamaster/elasticell/pkg/pb/raftcmdpb"
"gitee.com/wanttobeamaster/elasticell/pkg/pd"
"gitee.com/wanttobeamaster/elasticell/pkg/pdapi"
"gitee.com/wanttobeamaster/elasticell/pkg/util"
"github.com/fagongzi/goetty"
"github.com/fagongzi/goetty/protocol/redis"
"github.com/fagongzi/log"
"github.com/fagongzi/util/task"
"github.com/fagongzi/util/uuid"
"github.com/pkg/errors"
)
const (
	// batch is the maximum number of queued requests drained from a
	// task.Queue per Get call in the handling loops.
	batch = 64
	// shortForm is the timestamp layout appended to values by the
	// "set testtime" debug path in handleReq.
	shortForm = "2006-01-02 15:04:05.000000"
)
var (
	// pingReq is the shared keep-alive request forwarded to backend
	// stores via the pings channel; it has no UUID and no session.
	pingReq = &raftcmdpb.Request{
		Cmd: [][]byte{[]byte("ping")},
	}
)
// req is one client command in flight through the proxy: the originating
// redis session, the raft-level request forwarded to a backend store, and
// retry/response bookkeeping.
type req struct {
	rs      *redisSession      // session that issued the command; nil for internal requests (e.g. ping)
	raftReq *raftcmdpb.Request // request forwarded to the backend store
	retries int                // number of times this request has been re-queued
	// rspOk is signalled by done/errorDone once a response has been
	// written to the session; the timeout goroutine armed in
	// addToForward waits on it.
	rspOk chan bool
}
// newReqUUID builds a req for the given session using the supplied UUID.
func newReqUUID(id []byte, cmd redis.Command, rs *redisSession) *req {
	return &req{
		raftReq: &raftcmdpb.Request{
			UUID: id,
			Cmd:  cmd,
		},
		rs:      rs,
		retries: 0,
		// buffered (size 1) so done/errorDone can signal without blocking
		rspOk: make(chan bool, 1),
	}
}
// newReq builds a req for cmd with a freshly generated UUID.
func newReq(cmd redis.Command, rs *redisSession) *req {
	id := newID()
	return newReqUUID(id, cmd, rs)
}
// newID returns a new random (version 4) UUID as raw bytes.
func newID() []byte {
	id := uuid.NewV4()
	return id.Bytes()
}
// errorDone writes err back to the client session (if any) and signals
// the per-request timeout watcher that a response has been delivered.
//
// NOTE(review): rspOk is closed by the timeout goroutine in addToForward;
// a send arriving after the timeout fired would panic on a closed
// channel — confirm a late done/errorDone cannot happen.
func (r *req) errorDone(err error) {
	if r.rs != nil {
		r.rs.errorResp(err)
		r.rspOk <- true // wake the timeout watcher
	}
}
// done writes rsp back to the client session (if any) and signals the
// per-request timeout watcher that a response has been delivered.
//
// NOTE(review): same closed-channel hazard as errorDone — rspOk is
// closed by the timeout goroutine in addToForward; confirm responses
// are single-shot.
func (r *req) done(rsp *raftcmdpb.Response) {
	if r.rs != nil {
		r.rs.resp(rsp)
		r.rspOk <- true // wake the timeout watcher
	}
}
// RedisProxy is a redis-protocol proxy that routes client commands to the
// store leading the cell that owns each key. It keeps a local mirror of
// the cluster topology (cells, stores, leader addresses) synced from the
// PD through a watcher. The embedded RWMutex guards the topology caches.
type RedisProxy struct {
	sync.RWMutex
	cfg             *Cfg
	svr             *goetty.Server // client-facing redis listener (e.g. :6379)
	pdClient        *pd.Client     // pd client
	watcher         *pd.Watcher    // pd topology watcher (notify addr, e.g. :6380)
	aggregationCmds map[string]func(*redisSession, redis.Command) (bool, error) // commands the proxy may answer itself (mget, bm*)
	supportCmds     map[string]struct{}                                         // whitelist of accepted command names
	keyConvertFun   func([]byte, func([]byte) metapb.Cell) metapb.Cell          // maps a key to its owning cell
	ranges          *util.CellTree
	stores          map[uint64]*metapb.Store
	cellLeaderAddrs map[uint64]string   // cell id -> leader peer store addr
	bcs             map[string]*backend // store addr -> backend connection
	routing         *routing            // request uuid -> in-flight req
	syncEpoch       uint64              // bumped on every topology refresh
	reqs            []*task.Queue       // per-worker forward queues
	retries         *task.Queue         // requests parked until the topology changes
	pings           chan string
	bcAddrs         []string // sorted store addrs, used for round-robin dispatch
	rrNext          int64    // round-robin cursor over bcAddrs
	ctx             context.Context
	cancel          context.CancelFunc
	stopOnce        sync.Once
	stopWG          sync.WaitGroup
	stopC           chan struct{}
}
// NewRedisProxy returns a redis proxy built from cfg: a PD client, the
// client-facing redis listener, a PD topology watcher, and the routing
// and cache structures. The blocking init sequence runs before this
// returns. Exits the process via log.Fatalf if the PD client cannot be
// created.
func NewRedisProxy(cfg *Cfg) *RedisProxy {
	client, err := pd.NewClient(fmt.Sprintf("proxy-%s", cfg.Addr), cfg.PDAddrs...)
	if err != nil {
		log.Fatalf("bootstrap: create pd client failed, errors:\n%+v", err)
	}
	// listener with a redis-protocol decoder; EmptyEncoder on the write side
	redisSvr := goetty.NewServer(cfg.Addr,
		goetty.WithServerDecoder(redis.NewRedisDecoder()),
		goetty.WithServerEncoder(goetty.NewEmptyEncoder()))
	watcher := pd.NewWatcher(client,
		cfg.AddrNotify,
		cfg.Advertise_AddrNotify,
		pd.EventFlagCell|pd.EventFlagStore|pd.EventInit,
		time.Duration(cfg.WatcherHeartbeatSec)*time.Second)
	p := &RedisProxy{
		pdClient:        client,
		cfg:             cfg,
		svr:             redisSvr,
		watcher:         watcher,
		aggregationCmds: make(map[string]func(*redisSession, redis.Command) (bool, error)),
		supportCmds:     make(map[string]struct{}),
		routing:         newRouting(),
		ranges:          util.NewCellTree(),
		stores:          make(map[uint64]*metapb.Store),
		cellLeaderAddrs: make(map[uint64]string),
		bcs:             make(map[string]*backend),
		stopC:           make(chan struct{}),
		reqs:            make([]*task.Queue, cfg.WorkerCount),
		retries:         &task.Queue{},
		pings:           make(chan string),
		bcAddrs:         make([]string, 0),
	}
	p.init()
	return p
}
// Start launches the stop listener and the request-handling loops, then
// blocks serving the redis listener until it stops or fails.
func (p *RedisProxy) Start() error {
	go p.listenToStop()
	go p.readyToHandleReq(p.ctx)
	return p.svr.Start(p.doConnection)
}
// Stop requests shutdown and blocks until doStop has finished.
func (p *RedisProxy) Stop() {
	p.stopWG.Add(1)
	p.stopC <- struct{}{}
	p.stopWG.Wait()
}
// listenToStop blocks until a stop is requested, then performs shutdown.
func (p *RedisProxy) listenToStop() {
	<-p.stopC
	p.doStop()
}
// init performs the one-time bootstrap: root cancellation context,
// key-to-cell conversion mode, command tables, PD watcher, and the
// per-worker request queues.
func (p *RedisProxy) init() {
	p.ctx, p.cancel = context.WithCancel(context.TODO())
	p.initKeyConvert()
	p.initSupportCMDs()
	p.initWatcher()
	p.initQueues()
}
// doStop shuts the proxy down exactly once: closes every backend store
// connection, stops the PD watcher and the redis listener, and cancels
// the root context so the handling loops exit.
func (p *RedisProxy) doStop() {
	p.stopOnce.Do(func() {
		defer p.stopWG.Done()
		log.Infof("stop: start to stop redis proxy")
		// NOTE(review): p.bcs is iterated without the lock here — confirm
		// nothing can still be adding connections during shutdown.
		for _, bc := range p.bcs {
			bc.close(true)
			log.Infof("stop: store connection closed, addr=<%s>", bc.addr)
		}
		p.watcher.Stop()
		p.svr.Stop()
		log.Infof("stop: tcp listen stopped")
		p.cancel()
	})
}
// initWatcher starts the PD watcher and spawns a goroutine that applies
// topology events to the local caches: a full ranges+stores refresh on
// EventInit, a single-range refresh on every cell event. Store
// up/down/tombstone events are received but currently ignored.
// Exits the process via log.Fatalf if the watcher cannot start.
func (p *RedisProxy) initWatcher() {
	err := p.watcher.Start()
	if err != nil {
		log.Fatalf("bootstrap: init watcher failed, errors:\n%+v", err)
	}
	go func() {
		for {
			// Ready blocks until the next event; it returns an error
			// once the watcher is stopped, which ends this goroutine.
			event, err := p.watcher.Ready()
			if err != nil {
				return
			}
			switch event.Event {
			case pd.EventInit:
				p.refreshRanges()
				p.refreshStores()
			case pd.EventCellCreated:
				p.refreshRange(event.CellEvent.Range)
			case pd.EventCellLeaderChanged:
				p.refreshRange(event.CellEvent.Range)
			case pd.EventCellRangeChaned:
				p.refreshRange(event.CellEvent.Range)
			case pd.EventCellPeersChaned:
				p.refreshRange(event.CellEvent.Range)
			case pd.EventStoreUp:
			case pd.EventStoreDown:
			case pd.EventStoreTombstone:
			}
		}
	}()
}
// initQueues allocates one task queue per configured worker; p.reqs was
// sized to cfg.WorkerCount in NewRedisProxy.
func (p *RedisProxy) initQueues() {
	for i := range p.reqs {
		p.reqs[i] = &task.Queue{}
	}
}
// doConnection serves one client connection: it starts the session's
// write loop, then reads commands until EOF — rejecting unsupported
// commands, letting aggregation handlers (mget, bm*) intercept theirs,
// and forwarding everything else to the worker queues.
func (p *RedisProxy) doConnection(session goetty.IOSession) error {
	addr := session.RemoteAddr()
	log.Infof("redis-[%s]: connected", addr)
	// every client has 2 goroutines: this read loop and the session write loop
	rs := newSession(session, time.Duration(p.cfg.Timeout)*time.Second)
	go rs.writeLoop()
	defer rs.close()
	for {
		r, err := session.Read()
		if err != nil {
			if err == io.EOF {
				// client closed the connection normally
				return nil
			}
			log.Errorf("redis-[%s]: read from cli failed, errors\n %+v",
				addr,
				err)
			return err
		}
		cmd := r.(redis.Command)
		if log.DebugEnabled() {
			log.Debugf("redis-[%s]: read a cmd: %s", addr, cmd.ToString())
		}
		cmdStr := cmd.CmdString()
		_, ok := p.supportCmds[cmdStr]
		if !ok {
			rs.errorResp(fmt.Errorf("command not support: %s", cmdStr))
			continue
		}
		// aggregation commands may be answered by the proxy itself;
		// `need` == true means forwarding is skipped for this command
		if fn, ok := p.aggregationCmds[cmdStr]; ok {
			need, err := fn(rs, cmd)
			if err != nil {
				rs.errorResp(err)
				continue
			}
			if need {
				continue
			}
		}
		p.addToForward(newReq(cmd, rs))
	}
}
// initKeyConvert asks the PD for the cluster init parameters and selects
// the key-to-cell conversion strategy: util.Uint64Convert when the
// cluster was bootstrapped with more than one initial cell,
// util.NoConvert otherwise. Exits the process via log.Fatalf on PD or
// decode errors.
func (p *RedisProxy) initKeyConvert() {
	rsp, err := p.pdClient.GetInitParams(context.TODO(), new(pdpb.GetInitParamsReq))
	if err != nil {
		log.Fatalf("bootstrap: get init params failed, errors:\n%+v", err)
	}
	// default used when the PD returns no params payload
	params := &pdapi.InitParams{
		InitCellCount: 1,
	}
	if len(rsp.Params) > 0 {
		err = json.Unmarshal(rsp.Params, params)
		if err != nil {
			// fixed: this message previously said "create pd client
			// failed", copy-pasted from the bootstrap path — it actually
			// reports a params decode failure
			log.Fatalf("bootstrap: parse init params failed, errors:\n%+v", err)
		}
	}
	if params.InitCellCount > 1 {
		p.keyConvertFun = util.Uint64Convert
	} else {
		p.keyConvertFun = util.NoConvert
	}
}
// initSupportCMDs builds the supported-command whitelist from config and
// registers the aggregation handlers the proxy implements itself.
func (p *RedisProxy) initSupportCMDs() {
	for _, cmd := range p.cfg.SupportCMDs {
		p.supportCmds[cmd] = struct{}{}
	}
	// kv
	p.aggregationCmds["mget"] = p.doMGet
	// bitmap
	p.aggregationCmds["bmand"] = p.doBMAnd
	p.aggregationCmds["bmor"] = p.doBMOr
	p.aggregationCmds["bmxor"] = p.doBMXor
	p.aggregationCmds["bmandnot"] = p.doBMAndNot
}
// refreshStores reloads the full store list from the PD and replaces the
// round-robin backend address list (p.bcAddrs), sorted for deterministic
// ordering. On a PD error the previous list is kept.
func (p *RedisProxy) refreshStores() {
	rsp, err := p.pdClient.ListStore(context.TODO(), &pdpb.ListStoreReq{})
	if err != nil {
		log.Errorf("ListStore failed with error\n%+v", err)
		// bug fix: the previous code fell through and dereferenced the
		// nil rsp (rsp.Stores), panicking whenever ListStore failed
		return
	}
	stores := make([]string, 0, len(rsp.Stores))
	for _, s := range rsp.Stores {
		stores = append(stores, s.ClientAddress)
	}
	sort.Strings(stores)
	p.Lock()
	p.bcAddrs = stores
	p.Unlock()
}
// refreshRanges pulls the complete cell-range snapshot from the PD,
// rebuilds the local caches, and bumps syncEpoch when done. If another
// refresh already advanced the epoch since this one started, it skips.
func (p *RedisProxy) refreshRanges() {
	old := p.getSyncEpoch()
	log.Infof("pd-sync: try to sync, epoch=<%d>", old)
	// NOTE(review): p.syncEpoch is read here without the lock — this is
	// a best-effort skip and a data race under -race; confirm intended.
	if old < p.syncEpoch {
		log.Infof("pd-sync: already sync, skip, old=<%d> new=<%d>", old, p.syncEpoch)
		return
	}
	p.Lock()
	// NOTE(review): a blocking PD RPC and a possible Fatalf both happen
	// while holding the write lock, stalling all routing lookups.
	rsp, err := p.pdClient.GetLastRanges(context.TODO(), &pdpb.GetLastRangesReq{})
	if err != nil {
		log.Fatalf("bootstrap: get cell ranges from pd failed, errors:\n%+v", err)
	}
	p.clean()
	for _, r := range rsp.Ranges {
		p.doRefreshRange(r)
	}
	p.syncEpoch++
	log.Infof("pd-sync: sync complete, epoch=%d", p.syncEpoch)
	p.Unlock()
}
// refreshRange applies a single cell-range update and bumps the sync
// epoch, under the write lock.
func (p *RedisProxy) refreshRange(r *pdpb.Range) {
	p.Lock()
	defer p.Unlock()
	p.doRefreshRange(r)
	p.syncEpoch++
}
// doRefreshRange installs one range into the caches: updates the cell
// tree, the cell's leader address, and the leader store record.
// Caller must hold the write lock.
func (p *RedisProxy) doRefreshRange(r *pdpb.Range) {
	p.ranges.Update(r.Cell)
	p.cellLeaderAddrs[r.Cell.ID] = r.LeaderStore.ClientAddress
	p.stores[r.LeaderStore.ID] = &r.LeaderStore
}
// clean resets the store cache ahead of a full re-sync. Caller must hold
// the write lock. NOTE(review): cellLeaderAddrs and the cell tree are
// not cleared here, only overwritten entry-by-entry by doRefreshRange —
// confirm stale entries for removed cells are acceptable.
func (p *RedisProxy) clean() {
	p.stores = make(map[uint64]*metapb.Store)
}
// getSyncEpoch returns the current topology sync epoch under the read
// lock.
func (p *RedisProxy) getSyncEpoch() uint64 {
	p.RLock()
	defer p.RUnlock()
	return p.syncEpoch
}
// addToPing queues a ping to the given store address; blocks until the
// readyToHandleReq loop receives it (pings is unbuffered).
func (p *RedisProxy) addToPing(target string) {
	p.pings <- target
}
// retry counts the attempt and parks r on the retry queue, where it
// waits for a topology change before being forwarded again.
func (p *RedisProxy) retry(r *req) {
	r.retries++
	p.retries.Put(r)
}
// addToForward routes r to a worker queue: commands without a key go to
// queue 0, keyed commands are mapped to a queue by the owning cell's ID.
// It also arms a per-request timeout watcher that answers "Timeout" to
// the client if no response arrives within the session timeout.
// Called by doConnection and by the retry path in readyToHandleReq.
func (p *RedisProxy) addToForward(r *req) {
	if r.raftReq == nil {
		log.Fatalf("bug: raft req cannot be nil")
	}
	// timeout watcher: waits for whichever comes first — a delivered
	// response (rspOk, signalled by done/errorDone) or the ticker firing
	go func(r *req) {
		ticker := time.NewTicker(r.rs.timeout)
		defer ticker.Stop()
		select {
		case ok := <-r.rspOk: // response delivered in time
			if ok {
				close(r.rspOk)
				return
			}
		case <-ticker.C:
			// synthesize a timeout response for the client
			rsp := &raftcmdpb.Response{
				UUID:          r.raftReq.UUID,
				Type:          r.raftReq.Type,
				SessionID:     r.raftReq.SessionID,
				ErrorResult:   []byte("Timeout"),
				OriginRequest: r.raftReq,
			}
			// the put/delete pair removes any routing entry registered
			// by forwardTo, so a late real response is dropped in onResp
			p.routing.put(r.raftReq.UUID, r)
			rr := p.routing.delete(r.raftReq.UUID)
			rr.done(rsp)
			// NOTE(review): done sends on rspOk (absorbed by its 1-slot
			// buffer) just before this close; a second response after
			// the close would panic — confirm responses are single-shot.
			close(r.rspOk)
			return
		}
	}(r)
	if r.retries == 0 {
		// first attempt: stamp with the current topology epoch
		r.raftReq.Epoch = p.getSyncEpoch()
	}
	if len(r.raftReq.Cmd) <= 1 {
		// no key to route on
		p.reqs[0].Put(r)
		return
	}
	c := p.ranges.Search(r.raftReq.Cmd[1]) // cell owning this command's key
	// assumes WorkerCount is a power of two so the mask is valid — TODO confirm
	index := (p.cfg.WorkerCount - 1) & c.ID
	p.reqs[index].Put(r)
}
// readyToHandleReq spawns the worker goroutines — one per request queue
// plus one for the retry queue — then loops dispatching pings until ctx
// is cancelled, at which point it disposes all queues and closes the
// ping channel.
func (p *RedisProxy) readyToHandleReq(ctx context.Context) {
	for _, q := range p.reqs {
		go func(q *task.Queue) {
			log.Infof("bootstrap: handle redis command started")
			items := make([]interface{}, batch, batch)
			for {
				n, err := q.Get(batch, items)
				if nil != err {
					// queue disposed: shutting down
					log.Infof("stop: handle redis command stopped")
					return
				}
				for i := int64(0); i < n; i++ {
					r := items[i].(*req)
					p.handleReq(r)
				}
			}
		}(q)
	}
	go func() {
		log.Infof("bootstrap: handle redis retries command started")
		items := make([]interface{}, batch, batch)
		for {
			n, err := p.retries.Get(batch, items)
			if nil != err {
				log.Infof("stop: handle redis retries command stopped")
				return
			}
			failed := 0
			for i := int64(0); i < n; i++ {
				r := items[i].(*req)
				if r.raftReq.Epoch < p.getSyncEpoch() {
					// topology changed since the request was stamped:
					// re-route it with the fresh epoch
					p.addToForward(r)
				} else {
					// epoch unchanged, nothing new to try: park it back.
					// TODO: add a max-retry limit — with no topology
					// change this re-queues the same request forever.
					p.retries.Put(r)
					failed++
				}
			}
			if failed > 0 {
				// back off so an unchanged epoch doesn't spin the CPU
				time.Sleep(time.Millisecond * 10)
			}
		}
	}()
	for {
		select {
		case <-ctx.Done():
			for _, q := range p.reqs {
				q.Dispose()
			}
			close(p.pings)
			log.Infof("stop: handle redis command stopped")
			return
		case target := <-p.pings:
			if target != "" {
				// best-effort keep-alive; the forward error is ignored
				p.forwardTo(target, &req{
					raftReq: pingReq,
				})
			}
		}
	}
}
// handleReq resolves the target store for one request and forwards it:
// keyless commands go to an arbitrary store, "query" commands are
// round-robined across all stores, and everything else goes to the
// leader of the cell owning the key. Unresolvable targets and forward
// failures are re-queued via retry. Called from the worker loops in
// readyToHandleReq.
func (p *RedisProxy) handleReq(r *req) {
	if r.retries > 0 {
		// a retried request waits until the topology epoch catches up
		// with the epoch it was stamped with (changed from >= to > so
		// an equal epoch no longer blocks the retry)
		if r.raftReq.Epoch > p.getSyncEpoch() {
			p.retries.Put(r)
			return
		}
	}
	target := ""
	var cellID uint64
	if len(r.raftReq.Cmd) <= 1 {
		target = p.getRandomStoreAddr()
	} else if strings.ToLower(string(r.raftReq.Cmd[0])) == "query" {
		log.Info("xiaoxiao : proxy.go 534 : query cmd")
		target = p.getRRStoreAddr()
	} else {
		// debug hook: "set testtime <value>" appends a send timestamp
		// to the value — presumably for end-to-end latency inspection;
		// TODO confirm against the consumer of this key
		if len(r.raftReq.Cmd) >= 3 && string(r.raftReq.Cmd[1]) == "testtime" && strings.ToLower(string(r.raftReq.Cmd[0])) == "set" {
			t := time.Now()
			temp := time.Date(t.Year(), t.Month(), t.Day(), t.Hour(), t.Minute(), t.Second(), t.Nanosecond(), time.Local)
			timestap := temp.Format(shortForm)
			timestap += "/"
			r.raftReq.Cmd[2] = append(r.raftReq.Cmd[2], []byte(timestap)...)
		}
		target, cellID = p.getLeaderStoreAddr(r.raftReq.Cmd[1])
	}
	if log.DebugEnabled() {
		log.Debugf("req: handle req, uuid=<%+v>, cell=<%d>, bc=<%s> times=<%d> cmd=<%v>",
			r.raftReq.UUID,
			cellID,
			target,
			r.retries,
			r.raftReq.Cmd)
	}
	if target == "" {
		// no leader known yet for this key: wait for a topology update
		log.Debugf("req: leader not found for key, uuid=<%+v> key=<%v>",
			r.raftReq.UUID,
			r.raftReq.Cmd[1])
		p.retry(r)
		return
	}
	err := p.forwardTo(target, r)
	if err != nil {
		log.Errorf("req: forward failed, uuid=<%+v> error=<%s>",
			r.raftReq.UUID,
			err)
		p.retry(r)
		return
	}
}
// forwardTo stamps r with the current topology epoch, registers it in
// the routing table (only when a client session is waiting), and appends
// it to the backend queue for addr. If queueing fails the routing entry
// is removed so the caller can safely retry.
func (p *RedisProxy) forwardTo(addr string, r *req) error {
	bc, err := p.getConn(addr)
	if err != nil {
		log.Info("xiaoxiao : proxy.go : forwardTo() : p.getConn(addr) ", err)
		return errors.Wrapf(err, "getConn")
	}
	r.raftReq.Epoch = p.getSyncEpoch()
	if nil != r.rs {
		// only requests with a client waiting need a routing entry
		p.routing.put(r.raftReq.UUID, r)
	}
	if nil != r.raftReq && len(r.raftReq.UUID) > 0 {
		log.Debugf("req: added to backend queue, uuid=<%+v>", r.raftReq.UUID)
	}
	err = bc.addReq(r)
	if err != nil {
		p.routing.delete(r.raftReq.UUID)
		return errors.Wrapf(err, "writeTo")
	}
	return nil
}
// onResp delivers a backend response to the in-flight request it belongs
// to. RaftError responses are re-queued for retry; responses whose
// request is no longer routed (client gone, or the timeout watcher
// already answered) are dropped with a debug log.
func (p *RedisProxy) onResp(rsp *raftcmdpb.Response) {
	r := p.routing.delete(rsp.UUID)
	if r != nil {
		if rsp.Type == raftcmdpb.RaftError {
			p.retry(r)
			return
		}
		r.done(rsp)
	} else if len(rsp.UUID) > 0 {
		// fixed typo in the log message: "ingore" -> "ignore"
		log.Debugf("redis-resp: client maybe closed, ignore resp, uuid=<%+v>",
			rsp.UUID)
	}
}
// search returns the cell whose key range contains value. NOTE(review):
// reads the cell tree without taking the lock — callers such as
// getLeaderStoreAddr hold it; confirm all call sites do.
func (p *RedisProxy) search(value []byte) metapb.Cell {
	return p.ranges.Search(value)
}
// getLeaderStoreAddr maps key to its owning cell and returns the leader
// store address for that cell together with the cell ID, under the read
// lock. The address is "" when no leader is known for the cell.
func (p *RedisProxy) getLeaderStoreAddr(key []byte) (string, uint64) {
	p.RLock()
	defer p.RUnlock()
	cell := p.keyConvertFun(key, p.search)
	return p.cellLeaderAddrs[cell.ID], cell.ID
}
// getRandomStoreAddr returns the leader address of the first cell in the
// range tree, or "" when no cell is known yet (callers treat "" as
// "retry later").
func (p *RedisProxy) getRandomStoreAddr() string {
	p.RLock()
	defer p.RUnlock()
	var target *metapb.Cell
	p.ranges.Ascend(func(cell *metapb.Cell) bool {
		target = cell
		return false // stop after the first cell
	})
	if target == nil {
		// bug fix: previously dereferenced a nil target when the range
		// tree was empty (before the first PD sync), panicking
		return ""
	}
	return p.cellLeaderAddrs[target.ID]
}
// getRRStoreAddr picks the next store address in round-robin order, or
// "" when no store addresses are known yet.
func (p *RedisProxy) getRRStoreAddr() (target string) {
	p.RLock()
	total := int64(len(p.bcAddrs))
	if total == 0 {
		p.RUnlock()
		return
	}
	// rrNext only ever grows; the index is reduced modulo the current
	// list length. NOTE(review): an int64 overflow would make idx
	// negative and panic on the slice index — theoretical at realistic
	// request rates, but worth a guard.
	idx := atomic.AddInt64(&p.rrNext, 1) - 1
	if idx >= total {
		idx %= total
	}
	target = p.bcAddrs[idx]
	p.RUnlock()
	// NOTE(review): p.bcAddrs is read below after RUnlock — a data race
	// with refreshStores; these per-request Info logs also look like
	// leftover debug output.
	log.Info("xiaoxiao : proxy.go 653 : getRRAtoreAddr ", idx, target)
	log.Info("xiaoxiao : proxy.go 654 : getRRAtoreAddr bcaddrs ", p.bcAddrs)
	log.Debugf("getRRStoreAddr picked #%d of %v", idx, p.bcAddrs)
	return
}
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
Go
1
https://gitee.com/wanttobeamaster/elasticell.git
git@gitee.com:wanttobeamaster/elasticell.git
wanttobeamaster
elasticell
elasticell
8b1bff0b0046

搜索帮助