1 Star 0 Fork 27

陈先乐 / erpc

forked from andeyalee / erpc 
加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
plugin.go 19.68 KB
一键复制 编辑 原始数据 按行查看 历史
panguncle 提交于 2020-03-17 10:20 . fix: typo, invaild -> invalid
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662
// Copyright 2015-2019 HenryLee. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package erpc
import (
"net"
"github.com/henrylee2cn/goutil"
"github.com/henrylee2cn/goutil/errors"
)
// Plug-ins during runtime
type (
// Plugin plugin background
Plugin interface {
Name() string
}
// PreNewPeerPlugin is executed before creating peer.
PreNewPeerPlugin interface {
Plugin
PreNewPeer(*PeerConfig, *PluginContainer) error
}
// PostNewPeerPlugin is executed after creating peer.
PostNewPeerPlugin interface {
Plugin
PostNewPeer(EarlyPeer) error
}
// PostRegPlugin is executed after registering handler.
PostRegPlugin interface {
Plugin
PostReg(*Handler) error
}
// PostListenPlugin is executed between listening and accepting.
PostListenPlugin interface {
Plugin
PostListen(net.Addr) error
}
// PostDialPlugin is executed after dialing.
PostDialPlugin interface {
Plugin
PostDial(sess PreSession, isRedial bool) *Status
}
// PostAcceptPlugin is executed after accepting connection.
PostAcceptPlugin interface {
Plugin
PostAccept(PreSession) *Status
}
// PreWriteCallPlugin is executed before writing CALL message.
PreWriteCallPlugin interface {
Plugin
PreWriteCall(WriteCtx) *Status
}
// PostWriteCallPlugin is executed after successful writing CALL message.
PostWriteCallPlugin interface {
Plugin
PostWriteCall(WriteCtx) *Status
}
// PreWriteReplyPlugin is executed before writing REPLY message.
PreWriteReplyPlugin interface {
Plugin
PreWriteReply(WriteCtx) *Status
}
// PostWriteReplyPlugin is executed after successful writing REPLY message.
PostWriteReplyPlugin interface {
Plugin
PostWriteReply(WriteCtx) *Status
}
// PreWritePushPlugin is executed before writing PUSH message.
PreWritePushPlugin interface {
Plugin
PreWritePush(WriteCtx) *Status
}
// PostWritePushPlugin is executed after successful writing PUSH message.
PostWritePushPlugin interface {
Plugin
PostWritePush(WriteCtx) *Status
}
// PreReadHeaderPlugin is executed before reading message header.
PreReadHeaderPlugin interface {
Plugin
PreReadHeader(PreCtx) error
}
// PostReadCallHeaderPlugin is executed after reading CALL message header.
PostReadCallHeaderPlugin interface {
Plugin
PostReadCallHeader(ReadCtx) *Status
}
// PreReadCallBodyPlugin is executed before reading CALL message body.
PreReadCallBodyPlugin interface {
Plugin
PreReadCallBody(ReadCtx) *Status
}
// PostReadCallBodyPlugin is executed after reading CALL message body.
PostReadCallBodyPlugin interface {
Plugin
PostReadCallBody(ReadCtx) *Status
}
// PostReadPushHeaderPlugin is executed after reading PUSH message header.
PostReadPushHeaderPlugin interface {
Plugin
PostReadPushHeader(ReadCtx) *Status
}
// PreReadPushBodyPlugin is executed before reading PUSH message body.
PreReadPushBodyPlugin interface {
Plugin
PreReadPushBody(ReadCtx) *Status
}
// PostReadPushBodyPlugin is executed after reading PUSH message body.
PostReadPushBodyPlugin interface {
Plugin
PostReadPushBody(ReadCtx) *Status
}
// PostReadReplyHeaderPlugin is executed after reading REPLY message header.
PostReadReplyHeaderPlugin interface {
Plugin
PostReadReplyHeader(ReadCtx) *Status
}
// PreReadReplyBodyPlugin is executed before reading REPLY message body.
PreReadReplyBodyPlugin interface {
Plugin
PreReadReplyBody(ReadCtx) *Status
}
// PostReadReplyBodyPlugin is executed after reading REPLY message body.
PostReadReplyBodyPlugin interface {
Plugin
PostReadReplyBody(ReadCtx) *Status
}
// PostDisconnectPlugin is executed after disconnection.
PostDisconnectPlugin interface {
Plugin
PostDisconnect(BaseSession) *Status
}
)
// PluginContainer a plugin container
type PluginContainer struct {
*pluginSingleContainer
left *pluginSingleContainer
middle *pluginSingleContainer
right *pluginSingleContainer
refreshTree func()
}
// newPluginContainer new a plugin container.
func newPluginContainer() *PluginContainer {
p := &PluginContainer{
pluginSingleContainer: newPluginSingleContainer(),
left: newPluginSingleContainer(),
middle: newPluginSingleContainer(),
right: newPluginSingleContainer(),
}
p.refreshTree = func() { p.refresh() }
return p
}
func (p *PluginContainer) cloneAndAppendMiddle(plugins ...Plugin) *PluginContainer {
middle := newPluginSingleContainer()
middle.plugins = append(p.middle.GetAll(), plugins...)
newPluginContainer := newPluginContainer()
newPluginContainer.middle = middle
newPluginContainer.left = p.left
newPluginContainer.right = p.right
newPluginContainer.refresh()
oldRefreshTree := p.refreshTree
p.refreshTree = func() {
oldRefreshTree()
newPluginContainer.refresh()
}
return newPluginContainer
}
// AppendLeft appends plugins on the left side of the pluginContainer.
func (p *PluginContainer) AppendLeft(plugins ...Plugin) {
p.left.appendLeft(plugins...)
p.refreshTree()
}
// AppendRight appends plugins on the right side of the pluginContainer.
func (p *PluginContainer) AppendRight(plugins ...Plugin) {
p.right.appendRight(plugins...)
p.refreshTree()
}
// Remove removes a plugin by it's name.
func (p *PluginContainer) Remove(pluginName string) error {
err := p.pluginSingleContainer.remove(pluginName)
if err != nil {
return err
}
p.left.remove(pluginName)
p.middle.remove(pluginName)
p.right.remove(pluginName)
p.refreshTree()
return nil
}
func (p *PluginContainer) refresh() {
count := len(p.left.plugins) + len(p.middle.plugins) + len(p.right.plugins)
allPlugins := make([]Plugin, count)
copy(allPlugins[0:], p.left.plugins)
copy(allPlugins[0+len(p.left.plugins):], p.middle.plugins)
copy(allPlugins[0+len(p.left.plugins)+len(p.middle.plugins):], p.right.plugins)
m := make(map[string]bool, count)
for _, plugin := range allPlugins {
if plugin == nil {
Fatalf("plugin cannot be nil!")
return
}
if m[plugin.Name()] {
Fatalf("repeat add plugin: %s", plugin.Name())
return
}
m[plugin.Name()] = true
}
p.pluginSingleContainer.plugins = allPlugins
}
// pluginSingleContainer plugins container.
type pluginSingleContainer struct {
plugins []Plugin
}
// newPluginSingleContainer new a plugin container.
func newPluginSingleContainer() *pluginSingleContainer {
return &pluginSingleContainer{
plugins: make([]Plugin, 0),
}
}
// appendLeft appends plugins on the left side of the pluginContainer.
func (p *pluginSingleContainer) appendLeft(plugins ...Plugin) {
if len(plugins) == 0 {
return
}
p.plugins = append(plugins, p.plugins...)
}
// appendRight appends plugins on the right side of the pluginContainer.
func (p *pluginSingleContainer) appendRight(plugins ...Plugin) {
if len(plugins) == 0 {
return
}
p.plugins = append(p.plugins, plugins...)
}
// GetByName returns a plugin instance by it's name.
func (p *pluginSingleContainer) GetByName(pluginName string) Plugin {
if p.plugins == nil {
return nil
}
for _, plugin := range p.plugins {
if plugin.Name() == pluginName {
return plugin
}
}
return nil
}
// GetAll returns all activated plugins.
func (p *pluginSingleContainer) GetAll() []Plugin {
return p.plugins
}
// remove removes a plugin by it's name.
func (p *pluginSingleContainer) remove(pluginName string) error {
if p.plugins == nil {
return errors.New("no plugins are registered yet")
}
if len(pluginName) == 0 {
//return error: cannot delete an unamed plugin
return errors.New("plugin with an empty name cannot be removed")
}
indexToRemove := -1
for i, plugin := range p.plugins {
if plugin.Name() == pluginName {
indexToRemove = i
break
}
}
if indexToRemove == -1 {
return errors.New("cannot remove a plugin which isn't exists")
}
p.plugins = append(p.plugins[:indexToRemove], p.plugins[indexToRemove+1:]...)
return nil
}
// PreNewPeer executes the defined plugins before creating peer.
func (p *PluginContainer) preNewPeer(peerConfig *PeerConfig) {
var err error
for _, plugin := range p.plugins {
if _plugin, ok := plugin.(PreNewPeerPlugin); ok {
if err = _plugin.PreNewPeer(peerConfig, p); err != nil {
Fatalf("[PreNewPeerPlugin:%s] %s", plugin.Name(), err.Error())
return
}
}
}
}
// PostNewPeer executes the defined plugins after creating peer.
func (p *pluginSingleContainer) postNewPeer(peer EarlyPeer) {
var err error
for _, plugin := range p.plugins {
if _plugin, ok := plugin.(PostNewPeerPlugin); ok {
if err = _plugin.PostNewPeer(peer); err != nil {
Fatalf("[PostNewPeerPlugin:%s] %s", plugin.Name(), err.Error())
return
}
}
}
}
// PostReg executes the defined plugins before registering handler.
func (p *pluginSingleContainer) postReg(h *Handler) {
var err error
for _, plugin := range p.plugins {
if _plugin, ok := plugin.(PostRegPlugin); ok {
if err = _plugin.PostReg(h); err != nil {
Fatalf("[PostRegPlugin:%s] register handler:%s %s, error:%s", plugin.Name(), h.RouterTypeName(), h.Name(), err.Error())
return
}
}
}
}
// PostListen is executed between listening and accepting.
func (p *pluginSingleContainer) postListen(addr net.Addr) {
var err error
for _, plugin := range p.plugins {
if _plugin, ok := plugin.(PostListenPlugin); ok {
if err = _plugin.PostListen(addr); err != nil {
Fatalf("[PostListenPlugin:%s] network:%s, addr:%s, error:%s", plugin.Name(), addr.Network(), addr.String(), err.Error())
return
}
}
}
return
}
// PostDial executes the defined plugins after dialing.
func (p *pluginSingleContainer) postDial(sess PreSession, isRedial bool) (stat *Status) {
var pluginName string
defer func() {
if p := recover(); p != nil {
Errorf("[PostDialPlugin:%s] network:%s, addr:%s, panic:%v\n%s", pluginName, sess.RemoteAddr().Network(), sess.RemoteAddr().String(), p, goutil.PanicTrace(2))
stat = statDialFailed.Copy(p)
}
}()
for _, plugin := range p.plugins {
if _plugin, ok := plugin.(PostDialPlugin); ok {
pluginName = plugin.Name()
if stat = _plugin.PostDial(sess, isRedial); !stat.OK() {
Debugf("[PostDialPlugin:%s] network:%s, addr:%s, is_redial:%v, error:%s",
pluginName, sess.RemoteAddr().Network(), sess.RemoteAddr().String(), isRedial, stat.String(),
)
return stat
}
}
}
return nil
}
// PostAccept executes the defined plugins after accepting connection.
func (p *pluginSingleContainer) postAccept(sess PreSession) (stat *Status) {
var pluginName string
defer func() {
if p := recover(); p != nil {
Errorf("[PostAcceptPlugin:%s] network:%s, addr:%s, panic:%v\n%s", pluginName, sess.RemoteAddr().Network(), sess.RemoteAddr().String(), p, goutil.PanicTrace(2))
stat = statInternalServerError.Copy(p)
}
}()
for _, plugin := range p.plugins {
if _plugin, ok := plugin.(PostAcceptPlugin); ok {
pluginName = plugin.Name()
if stat = _plugin.PostAccept(sess); !stat.OK() {
Debugf("[PostAcceptPlugin:%s] network:%s, addr:%s, error:%s", pluginName, sess.RemoteAddr().Network(), sess.RemoteAddr().String(), stat.String())
return stat
}
}
}
return nil
}
// PreWriteCall executes the defined plugins before writing CALL message.
func (p *pluginSingleContainer) preWriteCall(ctx WriteCtx) *Status {
var stat *Status
for _, plugin := range p.plugins {
if _plugin, ok := plugin.(PreWriteCallPlugin); ok {
if stat = _plugin.PreWriteCall(ctx); !stat.OK() {
Debugf("[PreWriteCallPlugin:%s] %s", plugin.Name(), stat.String())
return stat
}
}
}
return nil
}
// PostWriteCall executes the defined plugins after successful writing CALL message.
func (p *pluginSingleContainer) postWriteCall(ctx WriteCtx) *Status {
var stat *Status
for _, plugin := range p.plugins {
if _plugin, ok := plugin.(PostWriteCallPlugin); ok {
if stat = _plugin.PostWriteCall(ctx); !stat.OK() {
Errorf("[PostWriteCallPlugin:%s] %s", plugin.Name(), stat.String())
return stat
}
}
}
return nil
}
// PreWriteReply executes the defined plugins before writing REPLY message.
func (p *pluginSingleContainer) preWriteReply(ctx WriteCtx) {
var stat *Status
for _, plugin := range p.plugins {
if _plugin, ok := plugin.(PreWriteReplyPlugin); ok {
if stat = _plugin.PreWriteReply(ctx); !stat.OK() {
Errorf("[PreWriteReplyPlugin:%s] %s", plugin.Name(), stat.String())
return
}
}
}
}
// PostWriteReply executes the defined plugins after successful writing REPLY message.
func (p *pluginSingleContainer) postWriteReply(ctx WriteCtx) {
var stat *Status
for _, plugin := range p.plugins {
if _plugin, ok := plugin.(PostWriteReplyPlugin); ok {
if stat = _plugin.PostWriteReply(ctx); !stat.OK() {
Errorf("[PostWriteReplyPlugin:%s] %s", plugin.Name(), stat.String())
return
}
}
}
}
// PreWritePush executes the defined plugins before writing PUSH message.
func (p *pluginSingleContainer) preWritePush(ctx WriteCtx) *Status {
var stat *Status
for _, plugin := range p.plugins {
if _plugin, ok := plugin.(PreWritePushPlugin); ok {
if stat = _plugin.PreWritePush(ctx); !stat.OK() {
Debugf("[PreWritePushPlugin:%s] %s", plugin.Name(), stat.String())
return stat
}
}
}
return nil
}
// PostWritePush executes the defined plugins after successful writing PUSH message.
func (p *pluginSingleContainer) postWritePush(ctx WriteCtx) *Status {
var stat *Status
for _, plugin := range p.plugins {
if _plugin, ok := plugin.(PostWritePushPlugin); ok {
if stat = _plugin.PostWritePush(ctx); !stat.OK() {
Errorf("[PostWritePushPlugin:%s] %s", plugin.Name(), stat.String())
return stat
}
}
}
return nil
}
// PreReadHeader executes the defined plugins before reading message header.
func (p *pluginSingleContainer) preReadHeader(ctx PreCtx) error {
var err error
for _, plugin := range p.plugins {
if _plugin, ok := plugin.(PreReadHeaderPlugin); ok {
if err = _plugin.PreReadHeader(ctx); err != nil {
Debugf("[PreReadHeaderPlugin:%s] disconnected when reading: %s", plugin.Name(), err.Error())
return err
}
}
}
return nil
}
// PostReadCallHeader executes the defined plugins after reading CALL message header.
func (p *pluginSingleContainer) postReadCallHeader(ctx ReadCtx) *Status {
var stat *Status
for _, plugin := range p.plugins {
if _plugin, ok := plugin.(PostReadCallHeaderPlugin); ok {
if stat = _plugin.PostReadCallHeader(ctx); !stat.OK() {
Errorf("[PostReadCallHeaderPlugin:%s] %s", plugin.Name(), stat.String())
return stat
}
}
}
return nil
}
// PreReadCallBody executes the defined plugins before reading CALL message body.
func (p *pluginSingleContainer) preReadCallBody(ctx ReadCtx) *Status {
var stat *Status
for _, plugin := range p.plugins {
if _plugin, ok := plugin.(PreReadCallBodyPlugin); ok {
if stat = _plugin.PreReadCallBody(ctx); !stat.OK() {
Errorf("[PreReadCallBodyPlugin:%s] %s", plugin.Name(), stat.String())
return stat
}
}
}
return nil
}
// PostReadCallBody executes the defined plugins after reading CALL message body.
func (p *pluginSingleContainer) postReadCallBody(ctx ReadCtx) *Status {
var stat *Status
for _, plugin := range p.plugins {
if _plugin, ok := plugin.(PostReadCallBodyPlugin); ok {
if stat = _plugin.PostReadCallBody(ctx); !stat.OK() {
Errorf("[PostReadCallBodyPlugin:%s] %s", plugin.Name(), stat.String())
return stat
}
}
}
return nil
}
// PostReadPushHeader executes the defined plugins after reading PUSH message header.
func (p *pluginSingleContainer) postReadPushHeader(ctx ReadCtx) *Status {
var stat *Status
for _, plugin := range p.plugins {
if _plugin, ok := plugin.(PostReadPushHeaderPlugin); ok {
if stat = _plugin.PostReadPushHeader(ctx); !stat.OK() {
Errorf("[PostReadPushHeaderPlugin:%s] %s", plugin.Name(), stat.String())
return stat
}
}
}
return nil
}
// PreReadPushBody executes the defined plugins before reading PUSH message body.
func (p *pluginSingleContainer) preReadPushBody(ctx ReadCtx) *Status {
var stat *Status
for _, plugin := range p.plugins {
if _plugin, ok := plugin.(PreReadPushBodyPlugin); ok {
if stat = _plugin.PreReadPushBody(ctx); !stat.OK() {
Errorf("[PreReadPushBodyPlugin:%s] %s", plugin.Name(), stat.String())
return stat
}
}
}
return nil
}
// PostReadPushBody executes the defined plugins after reading PUSH message body.
func (p *pluginSingleContainer) postReadPushBody(ctx ReadCtx) *Status {
var stat *Status
for _, plugin := range p.plugins {
if _plugin, ok := plugin.(PostReadPushBodyPlugin); ok {
if stat = _plugin.PostReadPushBody(ctx); !stat.OK() {
Errorf("[PostReadPushBodyPlugin:%s] %s", plugin.Name(), stat.String())
return stat
}
}
}
return nil
}
// PostReadReplyHeader executes the defined plugins after reading REPLY message header.
func (p *pluginSingleContainer) postReadReplyHeader(ctx ReadCtx) *Status {
var stat *Status
for _, plugin := range p.plugins {
if _plugin, ok := plugin.(PostReadReplyHeaderPlugin); ok {
if stat = _plugin.PostReadReplyHeader(ctx); !stat.OK() {
Errorf("[PostReadReplyHeaderPlugin:%s] %s", plugin.Name(), stat.String())
return stat
}
}
}
return nil
}
// PreReadReplyBody executes the defined plugins before reading REPLY message body.
func (p *pluginSingleContainer) preReadReplyBody(ctx ReadCtx) *Status {
var stat *Status
for _, plugin := range p.plugins {
if _plugin, ok := plugin.(PreReadReplyBodyPlugin); ok {
if stat = _plugin.PreReadReplyBody(ctx); !stat.OK() {
Errorf("[PreReadReplyBodyPlugin:%s] %s", plugin.Name(), stat.String())
return stat
}
}
}
return nil
}
// PostReadReplyBody executes the defined plugins after reading REPLY message body.
func (p *pluginSingleContainer) postReadReplyBody(ctx ReadCtx) *Status {
var stat *Status
for _, plugin := range p.plugins {
if _plugin, ok := plugin.(PostReadReplyBodyPlugin); ok {
if stat = _plugin.PostReadReplyBody(ctx); !stat.OK() {
Errorf("[PostReadReplyBodyPlugin:%s] %s", plugin.Name(), stat.String())
return stat
}
}
}
return nil
}
// PostDisconnect executes the defined plugins after disconnection.
func (p *pluginSingleContainer) postDisconnect(sess BaseSession) *Status {
var stat *Status
for _, plugin := range p.plugins {
if _plugin, ok := plugin.(PostDisconnectPlugin); ok {
if stat = _plugin.PostDisconnect(sess); !stat.OK() {
Errorf("[PostDisconnectPlugin:%s] %s", plugin.Name(), stat.String())
return stat
}
}
}
return nil
}
func warnInvalidHandlerHooks(plugin []Plugin) {
for _, p := range plugin {
switch p.(type) {
case PreNewPeerPlugin:
Debugf("invalid PreNewPeerPlugin in router: %s", p.Name())
case PostNewPeerPlugin:
Debugf("invalid PostNewPeerPlugin in router: %s", p.Name())
case PostDialPlugin:
Debugf("invalid PostDialPlugin in router: %s", p.Name())
case PostAcceptPlugin:
Debugf("invalid PostAcceptPlugin in router: %s", p.Name())
case PreWriteCallPlugin:
Debugf("invalid PreWriteCallPlugin in router: %s", p.Name())
case PostWriteCallPlugin:
Debugf("invalid PostWriteCallPlugin in router: %s", p.Name())
case PreWritePushPlugin:
Debugf("invalid PreWritePushPlugin in router: %s", p.Name())
case PostWritePushPlugin:
Debugf("invalid PostWritePushPlugin in router: %s", p.Name())
case PreReadHeaderPlugin:
Debugf("invalid PreReadHeaderPlugin in router: %s", p.Name())
case PostReadCallHeaderPlugin:
Debugf("invalid PostReadCallHeaderPlugin in router: %s", p.Name())
case PostReadPushHeaderPlugin:
Debugf("invalid PostReadPushHeaderPlugin in router: %s", p.Name())
}
}
}
Go
1
https://gitee.com/chenxianle/erpc.git
git@gitee.com:chenxianle/erpc.git
chenxianle
erpc
erpc
master

搜索帮助