# flow-engine-azkaban **Repository Path**: big-data-xwb/flow-engine-azkaban ## Basic Information - **Project Name**: flow-engine-azkaban - **Description**: No description available - **Primary Language**: Unknown - **License**: Not specified - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 0 - **Forks**: 0 - **Created**: 2020-12-06 - **Last Updated**: 2020-12-19 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README # 可视化作业调度平台 #### 介绍 一个基于azkaban实现的可视化的作业调度平台。 基于插件组合和连线形成一个工作流。 依赖flow-core模块。 参考了开源项目kettle及kettle-web的设计思路和部分代码。 #### 使用说明 1 界面生成 前台页面基于mxgraph拖拉拽形成一个工作流然后保存xml文件到后台数据库。 后台解析xml文件,生成meta信息。 ``` FlowMetaCode flowMetaCode = new FlowMetaCode(); FlowMeta flowMeta = (FlowMeta) flowMetaCode.decode(xml); ``` 后台调用azkaban接口完成zip包上传,执行作业,定时调度等功能。 2 代码生成 ```java JobMeta job = new JobMeta(); SetVariableMeta meat = new SetVariableMeta(); meat.setVariable("hhhh"); job.setJobMetaInterface(meat); job.setEnabled(true); JobMeta job1 = new JobMeta(); SetVariableMeta meat1 = new SetVariableMeta(); meat1.setVariable("aaaa"); job1.setJobMetaInterface(meat1); job1.setEnabled(true); HopMeta hop = new HopMeta(); hop.setFrom(job); hop.setTo(job1); FlowMeta flowMeta = new FlowMeta(); flowMeta.addVertex(job); flowMeta.addVertex(job1); flowMeta.addEdge(hop); ``` #### xml样例 ``` ``` #### 插件开发 继承JobMetaCode完成code类的开发 ``` public class ShellCode extends JobMetaCode { @Override public JobMetaInterface getJobMetaInterface() { return new ShellMeta(); } @Override public void decode(JobMeta jobMeta, mxCell cell) { ShellMeta shellMeta = new ShellMeta(); shellMeta.setText(cell.getAttribute("text")); jobMeta.setJobMetaInterface(shellMeta); } @Override public void encode(JobMeta jobMeta, Element e) { ShellMeta shellMeta = (ShellMeta) jobMeta.getJobMetaInterface(); e.setAttribute("text", shellMeta.getText()); } } ``` 实现JobMetaInterface接口完成对应meta类的开发 ```java public class ShellMeta extends BaseJobMeta implements JobMetaInterface { private String text; // shell文本 public String getText() { return text; } public void setText(String text) { this.text = text; } @Override public void setDefault() { text = "#!/bin/bash"; } @Override public void makeFiles() throws IOException { // 生成sh文件 String sh = basePath + jobId + ".sh"; try(BufferedWriter shW = new BufferedWriter(new FileWriter(sh))) { shW.write(text); } files.add(new File(sh)); // 生成job文件 String job = basePath + jobId + ".job"; try(BufferedWriter jobW = new BufferedWriter(new FileWriter(job))) { jobW.write("type=command"); jobW.newLine(); jobW.write("command=sh "+ jobId + ".sh"); } files.add(new File(job)); } } ``` #### 插件注册 1 在源代码里面开发需要使用xml方式注册。 在resource目录下的azkaban-jobs.xml下添加对应job信息 ``` Shell 执行shell脚本 脚本 ./static/graph/job/Shell.png org.mxgraph.flow.engine.azkaban.core.jobs.shell.ShellCode ``` 2 在非源代码里开发需要使用jar方式注册。 2.1 对应code类的实现类上加上对应插件类型的注解。 ``` @Job( id = "setVariable", name = "设置变量", description = "设置变量", category = "变量", icon = "./static/graph/job/xxx.png" ) public class SetVariableCode extends JobMetaCode { ... } ``` 2.2 将项目打为jar包。 2.3 将生成的jar放置一个目录。 2.4 加载此目录下的插件 ```java String path = "E:\\workspace\\azkaban-plugin"; PluginFolder pluginFolder = new PluginFolder(path, false, true); // 添加插件目录 JobPluginType.getInstance().getPluginFolders().add(pluginFolder); Environment.addPluginType(JobPluginType.getInstance()); // 加载插件 Environment.init(); ``` #### 参与贡献 1. Fork 本仓库 2. 新建 Feat_xxx 分支 3. 提交代码 4. 新建 Pull Request 6. https://gitee.com/gitee-stars/