1 Star 0 Fork 0

fpy-go / plugin

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
manager.go 6.74 KB
一键复制 编辑 原始数据 按行查看 历史
晴空 提交于 2024-01-31 15:19 . 实现Grpc和rpc创建Demo
package extend
import (
"context"
"errors"
"fmt"
"gitee.com/fpy-go/plugin/pkg/extend/consts/ProtocolType"
"gitee.com/fpy-go/plugin/pkg/extend/module/grpc"
"gitee.com/fpy-go/plugin/pkg/extend/module/rpc"
"github.com/gogf/gf/v2/encoding/gyaml"
"github.com/gogf/gf/v2/frame/g"
"os/exec"
"path/filepath"
"strings"
"sync"
"github.com/hashicorp/go-plugin"
)
type PluginInfo struct {
ID string
Path string
Protocol string
Client *plugin.Client
}
func NewManager(ptype, glob, dir string, rpcPluginImpl plugin.Plugin, grpcPluginImpl plugin.Plugin) *Manager {
manager := &Manager{
Type: ptype,
Glob: glob,
Path: dir,
Plugins: map[string]*PluginInfo{},
rpcPluginImpl: rpcPluginImpl,
grpcPluginImpl: grpcPluginImpl,
}
return manager
}
// Manager 为不同类型的插件,管理的生命周期
type Manager struct {
Type string // 管理器处理的插件类型的id
Glob string // 全局的插件文件名
Path string // 插件路径
Plugins map[string]*PluginInfo // 插件信息列表
initialized bool // 是否初始化
rpcPluginImpl plugin.Plugin // RPC插件实现虚拟接口
grpcPluginImpl plugin.Plugin // GRPC插件实现虚拟接口
}
func (m *Manager) Init() error {
//发现插件绝对路径
plugins, err := plugin.Discover(m.Glob, m.Path)
if err != nil {
return err
}
//获取所有插件信息
for _, p := range plugins {
var id string
id, err = m.validPath(p)
if err != nil {
continue
}
var protocol string
if strings.HasPrefix(id, ProtocolType.RPC) {
protocol = ProtocolType.RPC
} else if strings.HasPrefix(id, ProtocolType.GRPC) {
protocol = ProtocolType.GRPC
} else {
return errors.New("插件类型(GRPC/RPC)错误!")
}
//添加到插件信息
m.Plugins[id] = &PluginInfo{
ID: id,
Path: p,
Protocol: protocol,
}
}
m.initialized = true
return nil
}
func (m *Manager) Launch() error {
for id, info := range m.Plugins {
fmt.Printf("注册插件 type=%s, id=%s, impl=%s \n", m.Type, id, info.Path)
// 创建新的客户端
// 两种方式选其一
// 以exec.Command方式启动插件进程,并创建宿主机进程和插件进程的连接
// 或者使用Reattach连接到现有进程,需提供Reattach信息
set, err := m.pluginMap(id, info.Protocol)
if err != nil {
return err
}
client := plugin.NewClient(&plugin.ClientConfig{
HandshakeConfig: rpc.HandshakeConfig,
Plugins: set,
//创建新进程,或使用Reattach连接到现有进程中
Cmd: exec.Command(info.Path),
AllowedProtocols: []plugin.Protocol{
plugin.ProtocolNetRPC, plugin.ProtocolGRPC},
})
if _, ok := m.Plugins[id]; !ok {
// 如果没有找到,忽略?
continue
}
pinfo := m.Plugins[id]
pinfo.Client = client
}
return nil
}
func (m *Manager) LaunchPlugin(p string) error {
// 必须调用manager的kill方法,关闭client和server的连接,并关闭server服务,否则删除对应的插件文件会失败
id, err := m.kill(p)
var protocol string
var config plugin.HandshakeConfig
if strings.HasPrefix(id, ProtocolType.RPC) {
protocol = ProtocolType.RPC
config = rpc.HandshakeConfig
} else if strings.HasPrefix(id, ProtocolType.GRPC) {
protocol = ProtocolType.GRPC
config = grpc.HandshakeConfig
} else {
return errors.New("插件类型(GRPC/RPC)错误!")
}
//添加到插件信息
m.Plugins[id] = &PluginInfo{
ID: id,
Path: p,
Protocol: protocol,
}
fmt.Printf("注册插件 type=%s, id=%s, impl=%s \n", m.Type, id, m.Plugins[id].Path)
// 创建新的客户端
// 两种方式选其一
// 以exec.Command方式启动插件进程,并创建宿主机进程和插件进程的连接
// 或者使用Reattach连接到现有进程,需提供Reattach信息
set, err := m.pluginMap(id, protocol)
if err != nil {
return err
}
client := plugin.NewClient(&plugin.ClientConfig{
HandshakeConfig: config,
Plugins: set,
//创建新进程,或使用Reattach连接到现有进程中
Cmd: exec.Command(p),
AllowedProtocols: []plugin.Protocol{
plugin.ProtocolNetRPC, plugin.ProtocolGRPC},
})
pinfo := m.Plugins[id]
pinfo.Client = client
return err
}
// 在重新加载插件之前必须调用此方法,否则删除对应的插件文件会失败
func (m *Manager) kill(p string) (string, error) {
id, err := m.validPath(p)
// 关闭客户端连接且关闭服务器服务
if m.Plugins[id] != nil {
m.Plugins[id].Client.Kill()
}
return id, err
}
// 验证文件路径名称是否符合规范并返回名称
func (m *Manager) validPath(p string) (string, error) {
_, file := filepath.Split(p)
globAsterix := strings.LastIndex(m.Glob, "*")
trim := m.Glob[0:globAsterix]
id := strings.TrimPrefix(file, trim)
return id, nil
}
func (m *Manager) Dispose() {
var wg sync.WaitGroup
for _, pinfo := range m.Plugins {
wg.Add(1)
go func(client *plugin.Client) {
// 关闭client,释放相关资源,终止插件子程序的运行
client.Kill()
wg.Done()
}(pinfo.Client)
}
wg.Wait()
}
func (m *Manager) GetInterface(id string) (interface{}, string, error) {
if _, ok := m.Plugins[id]; !ok {
return nil, "", errors.New("在注册的GRPC插件中找不到插件ID! " + id)
}
//获取注册插件客户端 plugin.Client
client := m.Plugins[id].Client
protocol := m.Plugins[id].Protocol
// 返回协议grpc客户端,用于后续通信
rpcClient, err := client.Client()
if err != nil {
return nil, "", err
}
// 根据指定插件名称分配新实例
raw, err := rpcClient.Dispense(id)
if err != nil {
return nil, "", err
}
return raw, protocol, nil
}
// pluginMap 插件名称到插件对象的映射关系
func (m *Manager) pluginMap(id string, protocol string) (plugin.PluginSet, error) {
if protocol == ProtocolType.RPC {
return m.rpcPluginMap(id), nil
} else if protocol == ProtocolType.GRPC {
return m.grpcPluginMap(id), nil
}
return map[string]plugin.Plugin{}, errors.New("插件类型(GRPC/RPC)错误!")
}
// rpcPluginMap 插件名称到插件对象的映射关系
func (m *Manager) rpcPluginMap(id string) map[string]plugin.Plugin {
pMap := map[string]plugin.Plugin{}
pMap[id] = m.rpcPluginImpl
return pMap
}
// grpcPluginMap 插件名称到插件对象的映射关系
func (m *Manager) grpcPluginMap(id string) map[string]plugin.Plugin {
pMap := map[string]plugin.Plugin{}
pMap[id] = m.grpcPluginImpl
return pMap
}
func getPluginsConfigData(pluginType, pluginName string) (res map[interface{}]interface{}, err error) {
key := "plugins" + pluginType + pluginName
pcgData, err := g.Redis().Do(context.TODO(), "GET", key)
err = gyaml.DecodeTo([]byte(pcgData.String()), &res)
return
}
1
https://gitee.com/fpy-go/plugin.git
git@gitee.com:fpy-go/plugin.git
fpy-go
plugin
plugin
345896415f40

搜索帮助

53164aa7 5694891 3bd8fe86 5694891