登录
注册
开源
企业版
高校版
搜索
帮助中心
使用条款
关于我们
开源
企业版
高校版
私有云
模力方舟
AI 队友
登录
注册
轻量养虾,开箱即用!低 Token + 稳定算力,Gitee & 模力方舟联合出品的 PocketClaw 正式开售!点击了解详情
代码拉取完成,页面将自动刷新
开源项目
>
程序开发
>
作业/任务调度
&&
捐赠
捐赠前请先登录
取消
前往登录
扫描微信二维码支付
取消
支付完成
支付提示
将跳转至支付宝完成支付
确定
取消
Watch
不关注
关注所有动态
仅关注版本发行动态
关注但不提醒动态
263
Star
2.5K
Fork
872
GVP
假诗人
/
PowerJob
代码
Issues
93
Pull Requests
3
Wiki
统计
流水线
服务
JavaDoc
质量分析
Jenkins for Gitee
腾讯云托管
腾讯云 Serverless
悬镜安全
阿里云 SAE
Codeblitz
SBOM
开发画像分析
我知道了,不再自动展开
更新失败,请稍后重试!
移除标识
内容风险标识
本任务被
标识为内容中包含有代码安全 Bug 、隐私泄露等敏感信息,仓库外成员不可访问
实现了datax任务的处理支持回馈给作者大佬
待办的
#I6WBZ5
kanon
创建于
2023-04-17 15:08
已经实现了对datax任务的支持,回馈给作者大大,抽象类 AbstractScriptProcessor 中有个变量 dataXPyPath 是处理器调起datax任务需要的脚本路径变量,因为这个类是抽象的通过配置文件从外部注入属性好像拿不到这个变量值,就这个小缺陷 :smiling_imp: ``` package tech.powerjob.official.processors.impl.script; import com.google.common.collect.Sets; import lombok.extern.slf4j.Slf4j; import org.apache.commons.io.FileUtils; import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.SystemUtils; import tech.powerjob.official.processors.CommonBasicProcessor; import tech.powerjob.official.processors.util.CommonUtils; import tech.powerjob.worker.core.processor.ProcessResult; import tech.powerjob.worker.core.processor.TaskContext; import tech.powerjob.worker.log.OmsLogger; import java.io.*; import java.net.URL; import java.nio.charset.Charset; import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.util.Set; import java.util.concurrent.ForkJoinPool; /** * 脚本处理器 * * @author tjq * @author Jiang Jining * @since 2020/4/16 */ @Slf4j public abstract class AbstractScriptProcessor extends CommonBasicProcessor { private static final ForkJoinPool POOL = new ForkJoinPool(4 * Runtime.getRuntime().availableProcessors()); private static final Set<String> DOWNLOAD_PROTOCOL = Sets.newHashSet("http", "https", "ftp"); protected static final String SH_SHELL = "/bin/sh"; protected static final String CMD_SHELL = "cmd.exe"; private String dataXPyPath = "D:\\develop\\datax\\bin\\datax.py";//指定datax安装目录下datax.py启动脚本路径 private static final String WORKER_DIR = System.getProperty("user.home") + "/powerjob/worker/official_script_processor/"; @Override protected ProcessResult process0(TaskContext context) throws Exception { OmsLogger omsLogger = context.getOmsLogger(); String scriptParams = CommonUtils.parseParams(context); omsLogger.info("[SYSTEM] ScriptProcessor start to process, params: {}", scriptParams); if (scriptParams == null) { String message = "[SYSTEM] ScriptParams is null, please check jobParam configuration."; omsLogger.warn(message); return new ProcessResult(false, message); } String scriptPath = prepareScriptFile(context.getInstanceId(), scriptParams); omsLogger.info("[SYSTEM] Generate executable file successfully, path: {}", scriptPath); if (SystemUtils.IS_OS_WINDOWS) { if (StringUtils.equals(getRunCommand(), SH_SHELL)) { String message = String.format("[SYSTEM] Current OS is %s where shell scripts cannot run.", SystemUtils.OS_NAME); omsLogger.warn(message); return new ProcessResult(false, message); } } // 授权 if (!SystemUtils.IS_OS_WINDOWS) { ProcessBuilder chmodPb = new ProcessBuilder("/bin/chmod", "755", scriptPath); // 等待返回,这里不可能导致死锁(shell产生大量数据可能导致死锁) chmodPb.start().waitFor(); omsLogger.info("[SYSTEM] chmod 755 authorization complete, ready to start execution~"); } // 2. 执行目标脚本,修改此处增加三元判断是否为datax类型任务 , 后期修改此处来实现传入参数 ProcessBuilder pb = StringUtils.equals(getRunCommand(), CMD_SHELL) ? new ProcessBuilder(getRunCommand(), "/c", scriptPath) : scriptPath.endsWith("json") ? new ProcessBuilder(getRunCommand(), dataXPyPath, scriptPath) : new ProcessBuilder(getRunCommand(), scriptPath); Process process = pb.start(); StringBuilder inputBuilder = new StringBuilder(); StringBuilder errorBuilder = new StringBuilder(); boolean success = true; String result; final Charset charset = getCharset(); try (InputStream is = process.getInputStream(); InputStream es = process.getErrorStream()) { POOL.execute(() -> copyStream(is, inputBuilder, omsLogger, charset)); POOL.execute(() -> copyStream(es, errorBuilder, omsLogger, charset)); success = process.waitFor() == 0; } catch (InterruptedException ie) { omsLogger.info("[SYSTEM] ScriptProcessor has been interrupted"); } finally { result = String.format("[INPUT]: %s;[ERROR]: %s", inputBuilder.toString(), errorBuilder.toString()); } return new ProcessResult(success, result); } private String prepareScriptFile(Long instanceId, String processorInfo) throws IOException { String scriptPath = WORKER_DIR + getScriptName(instanceId); File script = new File(scriptPath); if (script.exists()) { return scriptPath; } File dir = new File(script.getParent()); boolean success = dir.mkdirs(); success = script.createNewFile(); if (!success) { throw new RuntimeException("create script file failed"); } // 如果是下载链接,则从网络获取 for (String protocol : DOWNLOAD_PROTOCOL) { if (processorInfo.startsWith(protocol)) { FileUtils.copyURLToFile(new URL(processorInfo), script, 5000, 300000); return scriptPath; } } final Charset charset = getCharset(); if (charset != null) { try (Writer fstream = new OutputStreamWriter(Files.newOutputStream(script.toPath()), charset); BufferedWriter out = new BufferedWriter(fstream)) { out.write(processorInfo); out.flush(); } } else { try (FileWriter fw = new FileWriter(script); BufferedWriter bw = new BufferedWriter(fw)) { bw.write(processorInfo); bw.flush(); } } return scriptPath; } private static void copyStream(InputStream is, StringBuilder sb, OmsLogger omsLogger, Charset charset) { String line; try (BufferedReader br = new BufferedReader(new InputStreamReader(is, charset))) { while ((line = br.readLine()) != null) { sb.append(line); // 同步到在线日志 omsLogger.info(line); } } catch (Exception e) { log.warn("[ScriptProcessor] copyStream failed.", e); omsLogger.warn("[SYSTEM] copyStream failed.", e); sb.append("Exception: ").append(e); } } /** * 生成脚本名称 * * @param instanceId id of instance * @return 文件名称 */ protected abstract String getScriptName(Long instanceId); /** * 获取运行命令(eg,shell返回 /bin/sh) * * @return 执行脚本的命令 */ protected abstract String getRunCommand(); /** * 默认不指定 * * @return Charset */ protected Charset getCharset() { return StandardCharsets.UTF_8; } } ``` ``` package tech.powerjob.official.processors.impl.script; import org.springframework.stereotype.Component; @Component public class DataxProcessor extends AbstractScriptProcessor { @Override protected String getScriptName(Long instanceId) { return String.format("DataX_%d.json", instanceId); } @Override protected String getRunCommand() { return "python"; } } ```
已经实现了对datax任务的支持,回馈给作者大大,抽象类 AbstractScriptProcessor 中有个变量 dataXPyPath 是处理器调起datax任务需要的脚本路径变量,因为这个类是抽象的通过配置文件从外部注入属性好像拿不到这个变量值,就这个小缺陷 :smiling_imp: ``` package tech.powerjob.official.processors.impl.script; import com.google.common.collect.Sets; import lombok.extern.slf4j.Slf4j; import org.apache.commons.io.FileUtils; import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.SystemUtils; import tech.powerjob.official.processors.CommonBasicProcessor; import tech.powerjob.official.processors.util.CommonUtils; import tech.powerjob.worker.core.processor.ProcessResult; import tech.powerjob.worker.core.processor.TaskContext; import tech.powerjob.worker.log.OmsLogger; import java.io.*; import java.net.URL; import java.nio.charset.Charset; import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.util.Set; import java.util.concurrent.ForkJoinPool; /** * 脚本处理器 * * @author tjq * @author Jiang Jining * @since 2020/4/16 */ @Slf4j public abstract class AbstractScriptProcessor extends CommonBasicProcessor { private static final ForkJoinPool POOL = new ForkJoinPool(4 * Runtime.getRuntime().availableProcessors()); private static final Set<String> DOWNLOAD_PROTOCOL = Sets.newHashSet("http", "https", "ftp"); protected static final String SH_SHELL = "/bin/sh"; protected static final String CMD_SHELL = "cmd.exe"; private String dataXPyPath = "D:\\develop\\datax\\bin\\datax.py";//指定datax安装目录下datax.py启动脚本路径 private static final String WORKER_DIR = System.getProperty("user.home") + "/powerjob/worker/official_script_processor/"; @Override protected ProcessResult process0(TaskContext context) throws Exception { OmsLogger omsLogger = context.getOmsLogger(); String scriptParams = CommonUtils.parseParams(context); omsLogger.info("[SYSTEM] ScriptProcessor start to process, params: {}", scriptParams); if (scriptParams == null) { String message = "[SYSTEM] ScriptParams is null, please check jobParam configuration."; omsLogger.warn(message); return new ProcessResult(false, message); } String scriptPath = prepareScriptFile(context.getInstanceId(), scriptParams); omsLogger.info("[SYSTEM] Generate executable file successfully, path: {}", scriptPath); if (SystemUtils.IS_OS_WINDOWS) { if (StringUtils.equals(getRunCommand(), SH_SHELL)) { String message = String.format("[SYSTEM] Current OS is %s where shell scripts cannot run.", SystemUtils.OS_NAME); omsLogger.warn(message); return new ProcessResult(false, message); } } // 授权 if (!SystemUtils.IS_OS_WINDOWS) { ProcessBuilder chmodPb = new ProcessBuilder("/bin/chmod", "755", scriptPath); // 等待返回,这里不可能导致死锁(shell产生大量数据可能导致死锁) chmodPb.start().waitFor(); omsLogger.info("[SYSTEM] chmod 755 authorization complete, ready to start execution~"); } // 2. 执行目标脚本,修改此处增加三元判断是否为datax类型任务 , 后期修改此处来实现传入参数 ProcessBuilder pb = StringUtils.equals(getRunCommand(), CMD_SHELL) ? new ProcessBuilder(getRunCommand(), "/c", scriptPath) : scriptPath.endsWith("json") ? new ProcessBuilder(getRunCommand(), dataXPyPath, scriptPath) : new ProcessBuilder(getRunCommand(), scriptPath); Process process = pb.start(); StringBuilder inputBuilder = new StringBuilder(); StringBuilder errorBuilder = new StringBuilder(); boolean success = true; String result; final Charset charset = getCharset(); try (InputStream is = process.getInputStream(); InputStream es = process.getErrorStream()) { POOL.execute(() -> copyStream(is, inputBuilder, omsLogger, charset)); POOL.execute(() -> copyStream(es, errorBuilder, omsLogger, charset)); success = process.waitFor() == 0; } catch (InterruptedException ie) { omsLogger.info("[SYSTEM] ScriptProcessor has been interrupted"); } finally { result = String.format("[INPUT]: %s;[ERROR]: %s", inputBuilder.toString(), errorBuilder.toString()); } return new ProcessResult(success, result); } private String prepareScriptFile(Long instanceId, String processorInfo) throws IOException { String scriptPath = WORKER_DIR + getScriptName(instanceId); File script = new File(scriptPath); if (script.exists()) { return scriptPath; } File dir = new File(script.getParent()); boolean success = dir.mkdirs(); success = script.createNewFile(); if (!success) { throw new RuntimeException("create script file failed"); } // 如果是下载链接,则从网络获取 for (String protocol : DOWNLOAD_PROTOCOL) { if (processorInfo.startsWith(protocol)) { FileUtils.copyURLToFile(new URL(processorInfo), script, 5000, 300000); return scriptPath; } } final Charset charset = getCharset(); if (charset != null) { try (Writer fstream = new OutputStreamWriter(Files.newOutputStream(script.toPath()), charset); BufferedWriter out = new BufferedWriter(fstream)) { out.write(processorInfo); out.flush(); } } else { try (FileWriter fw = new FileWriter(script); BufferedWriter bw = new BufferedWriter(fw)) { bw.write(processorInfo); bw.flush(); } } return scriptPath; } private static void copyStream(InputStream is, StringBuilder sb, OmsLogger omsLogger, Charset charset) { String line; try (BufferedReader br = new BufferedReader(new InputStreamReader(is, charset))) { while ((line = br.readLine()) != null) { sb.append(line); // 同步到在线日志 omsLogger.info(line); } } catch (Exception e) { log.warn("[ScriptProcessor] copyStream failed.", e); omsLogger.warn("[SYSTEM] copyStream failed.", e); sb.append("Exception: ").append(e); } } /** * 生成脚本名称 * * @param instanceId id of instance * @return 文件名称 */ protected abstract String getScriptName(Long instanceId); /** * 获取运行命令(eg,shell返回 /bin/sh) * * @return 执行脚本的命令 */ protected abstract String getRunCommand(); /** * 默认不指定 * * @return Charset */ protected Charset getCharset() { return StandardCharsets.UTF_8; } } ``` ``` package tech.powerjob.official.processors.impl.script; import org.springframework.stereotype.Component; @Component public class DataxProcessor extends AbstractScriptProcessor { @Override protected String getScriptName(Long instanceId) { return String.format("DataX_%d.json", instanceId); } @Override protected String getRunCommand() { return "python"; } } ```
评论 (
2
)
登录
后才可以发表评论
状态
待办的
待办的
进行中
已完成
已关闭
负责人
未设置
标签
未设置
标签管理
里程碑
未关联里程碑
未关联里程碑
Pull Requests
未关联
未关联
关联的 Pull Requests 被合并后可能会关闭此 issue
分支
未关联
分支 (
-
)
标签 (
-
)
开始日期   -   截止日期
-
置顶选项
不置顶
置顶等级:高
置顶等级:中
置顶等级:低
优先级
不指定
严重
主要
次要
不重要
参与者(3)
Java
1
https://gitee.com/KFCFans/PowerJob.git
git@gitee.com:KFCFans/PowerJob.git
KFCFans
PowerJob
PowerJob
点此查找更多帮助
搜索帮助
Git 命令在线学习
如何在 Gitee 导入 GitHub 仓库
Git 仓库基础操作
企业版和社区版功能对比
SSH 公钥设置
如何处理代码冲突
仓库体积过大,如何减小?
如何找回被删除的仓库数据
Gitee 产品配额说明
GitHub仓库快速导入Gitee及同步更新
什么是 Release(发行版)
将 PHP 项目自动发布到 packagist.org
评论
仓库举报
回到顶部
登录提示
该操作需登录 Gitee 帐号,请先登录后再操作。
立即登录
没有帐号,去注册