2 Star 0 Fork 0

上海网仕科技 / moleculer-go

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
actionCatalog.go 6.20 KB
一键复制 编辑 原始数据 按行查看 历史
package registry
import (
"fmt"
"runtime/debug"
"strings"
"sync"
"gitee.com/anystreaming/moleculer-go"
"gitee.com/anystreaming/moleculer-go/payload"
"gitee.com/anystreaming/moleculer-go/service"
"gitee.com/anystreaming/moleculer-go/strategy"
log "github.com/sirupsen/logrus"
)
type ActionEntry struct {
targetNodeID string
action *service.Action
isLocal bool
service *service.Service
logger *log.Entry
}
type actionsMap map[string][]ActionEntry
type ActionCatalog struct {
actions sync.Map
logger *log.Entry
}
func CreateActionCatalog(logger *log.Entry) *ActionCatalog {
return &ActionCatalog{actions: sync.Map{}, logger: logger}
}
var actionCallRecovery = true //TODO extract this to a Config - useful to turn for Debug in tests.
type ActionError struct {
message string
stack string
action string
}
func (e *ActionError) Error() string {
return e.message
}
func (e *ActionError) Stack() string {
return e.stack
}
func (e *ActionError) Action() string {
return e.action
}
// catchActionError is called defered after invoking a local action
// if there is an error (recover () != nil) this functions log the error and stack track and encapsulate
// the error inside a moleculer.Payload
func (actionEntry *ActionEntry) catchActionError(context moleculer.BrokerContext, result chan moleculer.Payload) {
if !actionCallRecovery {
return
}
if err := recover(); err != nil {
stackTrace := string(debug.Stack())
actionEntry.logger.Error("Action failed: ", context.ActionName(), "\n[Error]: ", err, "\n[Stack Trace]: ", stackTrace)
errT, isError := err.(error)
msg := ""
if isError {
msg = errT.Error()
} else {
msg = fmt.Sprint(err)
}
result <- payload.New(&ActionError{msg, stackTrace, actionEntry.action.Name()})
}
}
func (actionEntry *ActionEntry) invokeLocalAction(context moleculer.BrokerContext) chan moleculer.Payload {
result := make(chan moleculer.Payload, 1)
actionEntry.logger.Trace("Before Invoking action: ", context.ActionName(), " params: ", context.Payload())
go func() {
defer actionEntry.catchActionError(context, result)
handler := actionEntry.action.Handler()
actionResult := handler(context.(moleculer.Context), context.Payload())
actionEntry.logger.Trace("After Invoking action: ", context.ActionName(), " result: ", actionResult)
result <- payload.New(actionResult)
}()
return result
}
func (actionEntry ActionEntry) TargetNodeID() string {
return actionEntry.targetNodeID
}
func (actionEntry ActionEntry) IsLocal() bool {
return actionEntry.isLocal
}
func (actionEntry ActionEntry) Service() *service.Service {
return actionEntry.service
}
func (actionCatalog *ActionCatalog) listByName() map[string][]ActionEntry {
result := make(map[string][]ActionEntry)
actionCatalog.actions.Range(func(key, value interface{}) bool {
result[key.(string)] = value.([]ActionEntry)
return true
})
return result
}
// Add a new action to the catalog.
func (actionCatalog *ActionCatalog) Add(action service.Action, serv *service.Service, local bool) {
entry := ActionEntry{serv.NodeID(), &action, local, serv, actionCatalog.logger}
name := action.FullName()
ver := serv.Version()
if ver != "" && !strings.HasPrefix(name, ver) {
name = service.JoinVersionToName(name, ver)
}
list, exists := actionCatalog.actions.Load(name)
if !exists {
list = []ActionEntry{entry}
} else {
list = append(list.([]ActionEntry), entry)
}
actionCatalog.actions.Store(name, list)
}
func (actionCatalog *ActionCatalog) Update(nodeID string, fullname string, updates map[string]interface{}) {
//TODO .. the only thing that can be udpated is the Action Schema (validation) and that does not exist yet
}
// RemoveByNode remove actions for the given nodeID.
func (actionCatalog *ActionCatalog) RemoveByNode(nodeID string) {
actionCatalog.actions.Range(func(key, value interface{}) bool {
name := key.(string)
actions := value.([]ActionEntry)
var toKeep []ActionEntry
for _, action := range actions {
if action.targetNodeID != nodeID {
toKeep = append(toKeep, action)
}
}
if len(toKeep) == 0 {
actionCatalog.actions.Delete(name)
} else {
actionCatalog.actions.Store(name, toKeep)
}
return true
})
}
func (actionCatalog *ActionCatalog) Remove(nodeID string, name string) {
value, exists := actionCatalog.actions.Load(name)
if !exists {
return
}
actions := value.([]ActionEntry)
var toKeep []ActionEntry
for _, action := range actions {
if action.targetNodeID != nodeID {
toKeep = append(toKeep, action)
}
}
actionCatalog.actions.Store(name, toKeep)
}
func (actionCatalog *ActionCatalog) NextFromNode(actionName string, nodeID string) *ActionEntry {
list, exists := actionCatalog.actions.Load(actionName)
if !exists {
return nil
}
actions := list.([]ActionEntry)
for _, action := range actions {
if action.targetNodeID == nodeID {
return &action
}
}
return nil
}
func (actionCatalog *ActionCatalog) printDebugActions() {
allActions := []string{}
actionCatalog.actions.Range(func(key, value interface{}) bool {
action := key.(string)
if strings.Index(action, "$node") == -1 {
allActions = append(allActions, action)
}
return true
})
fmt.Println("actions: ", strings.Join(allActions, ", "))
fmt.Println("")
}
// Next find all actions registered in this node and use the strategy to select and return the best one to be called.
func (actionCatalog *ActionCatalog) Next(actionName string, stg strategy.Strategy) *ActionEntry {
actions := actionCatalog.Find(actionName)
if actions == nil {
actionCatalog.logger.Debug("actionCatalog.Next() action not found: ", actionName, " actionCatalog.actions: ", actionCatalog.actions)
return nil
}
nodes := make([]strategy.Selector, len(actions))
for index, action := range actions {
nodes[index] = action
if action.IsLocal() {
return &action
}
}
if selected := stg.Select(nodes); selected != nil {
entry := (*selected).(ActionEntry)
return &entry
}
actionCatalog.logger.Debug("actionCatalog.Next() no entries selected for name: ", actionName, " actionCatalog.actions: ", actionCatalog.actions)
return nil
}
func (actionCatalog *ActionCatalog) Find(name string) []ActionEntry {
list, exists := actionCatalog.actions.Load(name)
if !exists {
return nil
}
return list.([]ActionEntry)
}
1
https://gitee.com/anystreaming/moleculer-go.git
git@gitee.com:anystreaming/moleculer-go.git
anystreaming
moleculer-go
moleculer-go
v0.0.7

搜索帮助