# flink

**Repository Path**: longsebo/flink

## Basic Information

- **Project Name**: flink
- **Description**: Moved over from GitHub
- **Primary Language**: Java
- **License**: Apache-2.0
- **Default Branch**: master
- **Homepage**: None
- **GVP Project**: No

## Statistics

- **Stars**: 0
- **Forks**: 0
- **Created**: 2024-09-18
- **Last Updated**: 2024-09-21

## Categories & Tags

**Categories**: Uncategorized

**Tags**: None

## README

# flink: ways to load external JARs when starting a Job

### Introduction

Apache Flink offers several ways to load external JAR files when a Job is started. They let you bring in custom UDFs (user-defined functions) or other dependencies. The common ones are listed below.

### Contact

If you have questions, add me on WeChat (ID: lita2lz) or scan the QR code:

![WeChat QR code](docs/assets/weixinhao.png)
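## Methods

1. **Start the Job directly with the `flink run` command**

   You can use the `flink run` command-line tool to specify the Job's main class and the location of an external JAR. For example:

   ```
   flink run -d -c com.example.YourMainClass -C file:///path/to/external-jar.jar /path/to/your-job.jar
   ```

   Here `-c` specifies the Job's entry class, `-d` runs the Job in detached (background) mode, and `-C` (`--classpath`) adds the URL of an external JAR to the user-code classpath. The URL must include a protocol such as `file://` and be accessible on all nodes. Options must come before the Job JAR, because everything after it is passed to the program as arguments.

2. **Submit the Job through the Flink Web UI**

   If you use the Flink Web UI, you can pick the JAR to submit directly in the interface. Open "Submit New Job", upload the JAR you want to run, and configure the Job.

3. **Use Flink's YARN Session mode**

   If you run Flink on YARN in Session mode, extra JARs can be supplied through the configuration file or through command-line options. One option that Flink documents for this is `yarn.ship-files`, which ships local files or directories to the YARN cluster when the session is started. For example, in `flink-conf.yaml`:

   ```
   yarn.ship-files: /path/to/external-jars
   ```

4. **Pass the JARs on the command line in YARN mode**

   When running on YARN, the external JAR can also be specified on the command line. For a per-job cluster:

   ```
   flink run -t yarn-per-job -C file:///path/to/external-jar.jar /path/to/your-job.jar
   ```

   Or, when starting a session, ship a directory of JARs with it:

   ```
   yarn-session.sh -d -t /path/to/external-jars
   ```

5. **Package the JARs into the main Job JAR**

   Another approach is to bundle all dependency JARs into a single JAR (a fat JAR) and use it as the main Job JAR. Then no external JAR has to be specified at runtime.

Notes:

- Make sure the external JARs are compatible with your Flink version.
- In YARN mode, make sure the external JARs are actually distributed to every TaskManager.
- If you run into memory problems, you may need to adjust the YARN or Flink configuration, for example by increasing the available memory or changing the parallelism.

You may have noticed that the methods above specify one JAR at a time. If a Job depends on many JARs, say 20 or 30, passing them one by one with `-C` is impractical. Method 5 can merge multiple JARs into one, but when I tried it, the classes still could not be found at runtime. So here is a sixth method.

6. **Modify the Flink source code to support a JAR directory option**

   With a JAR directory option, you put all JARs into one directory, however many there are, and a single command-line option covers them. The added code loops over the directory and adds every JAR in it to the classpath. This saves effort, and the JARs do not conflict with those of the system or of other Jobs.

The following uses Flink 1.13 as an example. First download the source from Git: https://github.com/apache/flink.git. Two files are modified, `CliFrontendParser.java` and `ProgramOptions.java`. Both live in the `flink-clients` module, but at runtime they are part of `flink-dist.jar`, so after the change you need to replace the two corresponding classes inside `flink-dist.jar`.

**CliFrontendParser.java**

Note: the changes are marked with `// added` comments (`//新增地方` in the original source).

```
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.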
 * The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.client.cli;

import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;

import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.DefaultParser;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;

import javax.annotation.Nullable;

import java.util.Collection;
import java.util.List;
import java.util.stream.Collectors;

/**
 * A simple command line parser (based on Apache Commons CLI) that extracts command line options.
 */
public class CliFrontendParser {

    static final Option HELP_OPTION =
            new Option(
                    "h",
                    "help",
                    false,
                    "Show the help message for the CLI Frontend or the action.");

    static final Option JAR_OPTION = new Option("j", "jarfile", true, "Flink program JAR file.");

    static final Option CLASS_OPTION =
            new Option(
                    "c",
                    "class",
                    true,
                    "Class with the program entry point (\"main()\" method). Only needed if the "
                            + "JAR file does not specify the class in its manifest.");

    static final Option CLASSPATH_OPTION =
            new Option(
                    "C",
                    "classpath",
                    true,
                    "Adds a URL to each user code "
                            + "classloader on all nodes in the cluster. The paths must specify a protocol (e.g. file://) and be "
                            + "accessible on all nodes (e.g. by means of a NFS share). You can use this option multiple "
                            + "times for specifying more than one URL. The protocol must be supported by the "
                            + "{@link java.net.URLClassLoader}.");

    public static final Option PARALLELISM_OPTION =
            new Option(
                    "p",
                    "parallelism",
                    true,
                    "The parallelism with which to run the program. Optional flag to override the default value "
                            + "specified in the configuration.");

    public static final Option DETACHED_OPTION =
            new Option("d", "detached", false, "If present, runs " + "the job in detached mode");

    public static final Option SHUTDOWN_IF_ATTACHED_OPTION =
            new Option(
                    "sae",
                    "shutdownOnAttachedExit",
                    false,
                    "If the job is submitted in attached mode, perform a best-effort cluster shutdown "
                            + "when the CLI is terminated abruptly, e.g., in response to a user interrupt, such as typing Ctrl + C.");

    // ********** added: start **********
    static final Option JARDIR_OPTION =
            new Option(
                    "jd",
                    "jardir",
                    true,
                    "Adds all JAR files in the given directory to each user code "
                            + "classloader on all nodes in the cluster. The directory must exist and be "
                            + "accessible on all nodes (e.g. by means of a NFS share). You can use this option multiple "
                            + "times for specifying more than one directory.");
    // ********** added: end **********

    /**
     * @deprecated use non-prefixed variant {@link #DETACHED_OPTION} for both YARN and non-YARN
     *     deployments
     */
    @Deprecated
    public static final Option YARN_DETACHED_OPTION =
            new Option(
                    "yd",
                    "yarndetached",
                    false,
                    "If present, runs "
                            + "the job in detached mode (deprecated; use non-YARN specific option instead)");

    public static final Option ARGS_OPTION =
            new Option(
                    "a",
                    "arguments",
                    true,
                    "Program arguments. Arguments can also be added without -a, simply as trailing parameters.");

    public static final Option ADDRESS_OPTION =
            new Option(
                    "m",
                    "jobmanager",
                    true,
                    "Address of the JobManager to which to connect. "
                            + "Use this flag to connect to a different JobManager than the one specified in the configuration.");

    public static final Option SAVEPOINT_PATH_OPTION =
            new Option(
                    "s",
                    "fromSavepoint",
                    true,
                    "Path to a savepoint to restore the job from (for example hdfs:///flink/savepoint-1537).");

    public static final Option SAVEPOINT_ALLOW_NON_RESTORED_OPTION =
            new Option(
                    "n",
                    "allowNonRestoredState",
                    false,
                    "Allow to skip savepoint state that cannot be restored. "
                            + "You need to allow this if you removed an operator from your "
                            + "program that was part of the program when the savepoint was triggered.");

    static final Option SAVEPOINT_DISPOSE_OPTION =
            new Option("d", "dispose", true, "Path of savepoint to dispose.");

    // list specific options
    static final Option RUNNING_OPTION =
            new Option("r", "running", false, "Show only running programs and their JobIDs");

    static final Option SCHEDULED_OPTION =
            new Option("s", "scheduled", false, "Show only scheduled programs and their JobIDs");

    static final Option ALL_OPTION =
            new Option("a", "all", false, "Show all programs and their JobIDs");

    static final Option ZOOKEEPER_NAMESPACE_OPTION =
            new Option(
                    "z",
                    "zookeeperNamespace",
                    true,
                    "Namespace to create the Zookeeper sub-paths for high availability mode");

    static final Option CANCEL_WITH_SAVEPOINT_OPTION =
            new Option(
                    "s",
                    "withSavepoint",
                    true,
                    "**DEPRECATION WARNING**: "
                            + "Cancelling a job with savepoint is deprecated. Use \"stop\" instead. \n Trigger"
                            + " savepoint and cancel job. The target directory is optional. If no directory is "
                            + "specified, the configured default directory ("
                            + CheckpointingOptions.SAVEPOINT_DIRECTORY.key()
                            + ") is used.");

    public static final Option STOP_WITH_SAVEPOINT_PATH =
            new Option(
                    "p",
                    "savepointPath",
                    true,
                    "Path to the savepoint (for example hdfs:///flink/savepoint-1537). "
                            + "If no directory is specified, the configured default will be used (\""
                            + CheckpointingOptions.SAVEPOINT_DIRECTORY.key()
                            + "\").");

    public static final Option STOP_AND_DRAIN =
            new Option(
                    "d",
                    "drain",
                    false,
                    "Send MAX_WATERMARK before taking the savepoint and stopping the pipelne.");

    public static final Option PY_OPTION =
            new Option(
                    "py",
                    "python",
                    true,
                    "Python script with the program entry point. "
                            + "The dependent resources can be configured with the `--pyFiles` option.");

    public static final Option PYFILES_OPTION =
            new Option(
                    "pyfs",
                    "pyFiles",
                    true,
                    "Attach custom files for job. The standard resource file suffixes such as .py/.egg/.zip/.whl or directory are all supported. "
                            + "These files will be added to the PYTHONPATH of both the local client and the remote python UDF worker. "
                            + "Files suffixed with .zip will be extracted and added to PYTHONPATH. "
                            + "Comma (',') could be used as the separator to specify multiple files "
                            + "(e.g., --pyFiles file:///tmp/myresource.zip,hdfs:///$namenode_address/myresource2.zip).");

    public static final Option PYMODULE_OPTION =
            new Option(
                    "pym",
                    "pyModule",
                    true,
                    "Python module with the program entry point. "
                            + "This option must be used in conjunction with `--pyFiles`.");

    public static final Option PYREQUIREMENTS_OPTION =
            new Option(
                    "pyreq",
                    "pyRequirements",
                    true,
                    "Specify a requirements.txt file which defines the third-party dependencies. "
                            + "These dependencies will be installed and added to the PYTHONPATH of the python UDF worker. "
                            + "A directory which contains the installation packages of these dependencies could be specified "
                            + "optionally. Use '#' as the separator if the optional parameter exists "
                            + "(e.g., --pyRequirements file:///tmp/requirements.txt#file:///tmp/cached_dir).");

    public static final Option PYARCHIVE_OPTION =
            new Option(
                    "pyarch",
                    "pyArchives",
                    true,
                    "Add python archive files for job. The archive files will be extracted to the working directory "
                            + "of python UDF worker. Currently only zip-format is supported. For each archive file, a target directory "
                            + "be specified. If the target directory name is specified, the archive file will be extracted to a "
                            + "directory with the specified name. Otherwise, the archive file will be extracted to a "
                            + "directory with the same name of the archive file. The files uploaded via this option are accessible "
                            + "via relative path. '#' could be used as the separator of the archive file path and the target directory "
                            + "name. Comma (',') could be used as the separator to specify multiple archive files. "
                            + "This option can be used to upload the virtual environment, the data files used in Python UDF "
                            + "(e.g., --pyArchives file:///tmp/py37.zip,file:///tmp/data.zip#data --pyExecutable "
                            + "py37.zip/py37/bin/python). The data files could be accessed in Python UDF, e.g.: "
                            + "f = open('data/data.txt', 'r').");

    public static final Option PYEXEC_OPTION =
            new Option(
                    "pyexec",
                    "pyExecutable",
                    true,
                    "Specify the path of the python interpreter used to execute the python UDF worker "
                            + "(e.g.: --pyExecutable /usr/local/bin/python3). "
                            + "The python UDF worker depends on Python 3.6+, Apache Beam (version == 2.27.0), "
                            + "Pip (version >= 7.1.0) and SetupTools (version >= 37.0.0). "
                            + "Please ensure that the specified environment meets the above requirements.");

    static {
        HELP_OPTION.setRequired(false);

        JAR_OPTION.setRequired(false);
        JAR_OPTION.setArgName("jarfile");

        CLASS_OPTION.setRequired(false);
        CLASS_OPTION.setArgName("classname");

        CLASSPATH_OPTION.setRequired(false);
        CLASSPATH_OPTION.setArgName("url");

        ADDRESS_OPTION.setRequired(false);
        ADDRESS_OPTION.setArgName("host:port");

        PARALLELISM_OPTION.setRequired(false);
        PARALLELISM_OPTION.setArgName("parallelism");

        DETACHED_OPTION.setRequired(false);
        SHUTDOWN_IF_ATTACHED_OPTION.setRequired(false);
        YARN_DETACHED_OPTION.setRequired(false);
        JARDIR_OPTION.setRequired(false); // added

        ARGS_OPTION.setRequired(false);
        ARGS_OPTION.setArgName("programArgs");
        ARGS_OPTION.setArgs(Option.UNLIMITED_VALUES);

        RUNNING_OPTION.setRequired(false);
        SCHEDULED_OPTION.setRequired(false);

        SAVEPOINT_PATH_OPTION.setRequired(false);
        SAVEPOINT_PATH_OPTION.setArgName("savepointPath");

        SAVEPOINT_ALLOW_NON_RESTORED_OPTION.setRequired(false);

        ZOOKEEPER_NAMESPACE_OPTION.setRequired(false);
        ZOOKEEPER_NAMESPACE_OPTION.setArgName("zookeeperNamespace");

        CANCEL_WITH_SAVEPOINT_OPTION.setRequired(false);
        CANCEL_WITH_SAVEPOINT_OPTION.setArgName("targetDirectory");
        CANCEL_WITH_SAVEPOINT_OPTION.setOptionalArg(true);

        STOP_WITH_SAVEPOINT_PATH.setRequired(false);
        STOP_WITH_SAVEPOINT_PATH.setArgName("savepointPath");
        STOP_WITH_SAVEPOINT_PATH.setOptionalArg(true);

        STOP_AND_DRAIN.setRequired(false);

        PY_OPTION.setRequired(false);
        PY_OPTION.setArgName("pythonFile");

        PYFILES_OPTION.setRequired(false);
        PYFILES_OPTION.setArgName("pythonFiles");

        PYMODULE_OPTION.setRequired(false);
        PYMODULE_OPTION.setArgName("pythonModule");

        PYREQUIREMENTS_OPTION.setRequired(false);

        PYARCHIVE_OPTION.setRequired(false);

        PYEXEC_OPTION.setRequired(false);
    }

    static final Options RUN_OPTIONS = getRunCommandOptions();

    private static Options buildGeneralOptions(Options options) {
        options.addOption(HELP_OPTION);
        // backwards compatibility: ignore verbose flag (-v)
        options.addOption(new Option("v", "verbose", false, "This option is deprecated."));
        return options;
    }

    private static Options getProgramSpecificOptions(Options options) {
        options.addOption(JAR_OPTION);
        options.addOption(CLASS_OPTION);
        options.addOption(CLASSPATH_OPTION);
        options.addOption(PARALLELISM_OPTION);
        options.addOption(ARGS_OPTION);
        options.addOption(DETACHED_OPTION);
        options.addOption(SHUTDOWN_IF_ATTACHED_OPTION);
        options.addOption(YARN_DETACHED_OPTION);
        options.addOption(PY_OPTION);
        options.addOption(PYFILES_OPTION);
        options.addOption(PYMODULE_OPTION);
        options.addOption(PYREQUIREMENTS_OPTION);
        options.addOption(PYARCHIVE_OPTION);
        options.addOption(PYEXEC_OPTION);
        options.addOption(JARDIR_OPTION); // added
        return options;
    }

    private static Options getProgramSpecificOptionsWithoutDeprecatedOptions(Options options) {
        options.addOption(CLASS_OPTION);
        options.addOption(CLASSPATH_OPTION);
        options.addOption(PARALLELISM_OPTION);
        options.addOption(DETACHED_OPTION);
        options.addOption(SHUTDOWN_IF_ATTACHED_OPTION);
        options.addOption(PY_OPTION);
        options.addOption(PYFILES_OPTION);
        options.addOption(PYMODULE_OPTION);
        options.addOption(PYREQUIREMENTS_OPTION);
        options.addOption(PYARCHIVE_OPTION);
        options.addOption(PYEXEC_OPTION);
        options.addOption(JARDIR_OPTION); // added
        return options;
    }

    public static Options getRunCommandOptions() {
        Options options = buildGeneralOptions(new Options());
        options = getProgramSpecificOptions(options);
        options.addOption(SAVEPOINT_PATH_OPTION);
        return options.addOption(SAVEPOINT_ALLOW_NON_RESTORED_OPTION);
    }

    static Options getInfoCommandOptions() {
        Options options = buildGeneralOptions(new Options());
        return getProgramSpecificOptions(options);
    }

    static Options getListCommandOptions() {
        Options options = buildGeneralOptions(new Options());
        options.addOption(ALL_OPTION);
        options.addOption(RUNNING_OPTION);
        return options.addOption(SCHEDULED_OPTION);
    }

    static Options getCancelCommandOptions() {
        Options options = buildGeneralOptions(new Options());
        return options.addOption(CANCEL_WITH_SAVEPOINT_OPTION);
    }

    static Options getStopCommandOptions() {
        return buildGeneralOptions(new Options())
                .addOption(STOP_WITH_SAVEPOINT_PATH)
                .addOption(STOP_AND_DRAIN);
    }

    static Options getSavepointCommandOptions() {
        Options options = buildGeneralOptions(new Options());
        options.addOption(SAVEPOINT_DISPOSE_OPTION);
        return options.addOption(JAR_OPTION);
    }

    // --------------------------------------------------------------------------------------------
    //  Help
    // --------------------------------------------------------------------------------------------

    private static Options getRunOptionsWithoutDeprecatedOptions(Options options) {
        Options o = getProgramSpecificOptionsWithoutDeprecatedOptions(options);
        o.addOption(SAVEPOINT_PATH_OPTION);
        return o.addOption(SAVEPOINT_ALLOW_NON_RESTORED_OPTION);
    }

    private static Options getInfoOptionsWithoutDeprecatedOptions(Options options) {
        options.addOption(CLASS_OPTION);
        options.addOption(PARALLELISM_OPTION);
        return options;
    }

    private static Options getListOptionsWithoutDeprecatedOptions(Options options) {
        options.addOption(RUNNING_OPTION);
        options.addOption(ALL_OPTION);
        options.addOption(SCHEDULED_OPTION);
        return options;
    }

    private static Options getCancelOptionsWithoutDeprecatedOptions(Options options) {
        return options.addOption(CANCEL_WITH_SAVEPOINT_OPTION);
    }

    private static Options getStopOptionsWithoutDeprecatedOptions(Options options) {
        return options.addOption(STOP_WITH_SAVEPOINT_PATH).addOption(STOP_AND_DRAIN);
    }

    private static Options getSavepointOptionsWithoutDeprecatedOptions(Options options) {
        options.addOption(SAVEPOINT_DISPOSE_OPTION);
        options.addOption(JAR_OPTION);
        return options;
    }

    /** Prints the help for the client. */
    public static void printHelp(Collection<CustomCommandLine> customCommandLines) {
        System.out.println("./flink <ACTION> [OPTIONS] [ARGUMENTS]");
        System.out.println();
        System.out.println("The following actions are available:");

        printHelpForRun(customCommandLines);
        printHelpForRunApplication(customCommandLines);
        printHelpForInfo();
        printHelpForList(customCommandLines);
        printHelpForStop(customCommandLines);
        printHelpForCancel(customCommandLines);
        printHelpForSavepoint(customCommandLines);

        System.out.println();
    }

    public static void printHelpForRun(Collection<CustomCommandLine> customCommandLines) {
        HelpFormatter formatter = new HelpFormatter();
        formatter.setLeftPadding(5);
        formatter.setWidth(80);

        System.out.println("\nAction \"run\" compiles and runs a program.");
        System.out.println("\n  Syntax: run [OPTIONS] <jar-file> <arguments>");
        formatter.setSyntaxPrefix("  \"run\" action options:");
        formatter.printHelp(" ", getRunOptionsWithoutDeprecatedOptions(new Options()));

        printCustomCliOptions(customCommandLines, formatter, true);

        System.out.println();
    }

    public static void printHelpForRunApplication(
            Collection<CustomCommandLine> customCommandLines) {
        HelpFormatter formatter = new HelpFormatter();
        formatter.setLeftPadding(5);
        formatter.setWidth(80);

        System.out.println("\nAction \"run-application\" runs an application in Application Mode.");
        System.out.println("\n  Syntax: run-application [OPTIONS] <jar-file> <arguments>");
        formatter.setSyntaxPrefix("  \"run-application\" action options:");

        // Only GenericCLI works with application mode, the other CLIs will be phased out
        // in the future
        List<CustomCommandLine> filteredCommandLines =
                customCommandLines.stream()
                        .filter((cli) -> cli instanceof GenericCLI)
                        .collect(Collectors.toList());

        printCustomCliOptions(filteredCommandLines, formatter, true);
        System.out.println();
    }

    public static void printHelpForInfo() {
        HelpFormatter formatter = new HelpFormatter();
        formatter.setLeftPadding(5);
        formatter.setWidth(80);

        System.out.println(
                "\nAction \"info\" shows the optimized execution plan of the program (JSON).");
        System.out.println("\n  Syntax: info [OPTIONS] <jar-file> <arguments>");
        formatter.setSyntaxPrefix("  \"info\" action options:");
        formatter.printHelp(" ", getInfoOptionsWithoutDeprecatedOptions(new Options()));

        System.out.println();
    }

    public static void printHelpForList(Collection<CustomCommandLine> customCommandLines) {
        HelpFormatter formatter = new HelpFormatter();
        formatter.setLeftPadding(5);
        formatter.setWidth(80);

        System.out.println("\nAction \"list\" lists running and scheduled programs.");
        System.out.println("\n  Syntax: list [OPTIONS]");
        formatter.setSyntaxPrefix("  \"list\" action options:");
        formatter.printHelp(" ", getListOptionsWithoutDeprecatedOptions(new Options()));

        printCustomCliOptions(customCommandLines, formatter, false);

        System.out.println();
    }

    public static void printHelpForStop(Collection<CustomCommandLine> customCommandLines) {
        HelpFormatter formatter = new HelpFormatter();
        formatter.setLeftPadding(5);
        formatter.setWidth(80);

        System.out.println(
                "\nAction \"stop\" stops a running program with a savepoint (streaming jobs only).");
        System.out.println("\n  Syntax: stop [OPTIONS] <Job ID>");
        formatter.setSyntaxPrefix("  \"stop\" action options:");
        formatter.printHelp(" ", getStopOptionsWithoutDeprecatedOptions(new Options()));

        printCustomCliOptions(customCommandLines, formatter, false);

        System.out.println();
    }

    public static void printHelpForCancel(Collection<CustomCommandLine> customCommandLines) {
        HelpFormatter formatter = new HelpFormatter();
        formatter.setLeftPadding(5);
        formatter.setWidth(80);

        System.out.println("\nAction \"cancel\" cancels a running program.");
        System.out.println("\n  Syntax: cancel [OPTIONS] <Job ID>");
        formatter.setSyntaxPrefix("  \"cancel\" action options:");
        formatter.printHelp(" ", getCancelOptionsWithoutDeprecatedOptions(new Options()));

        printCustomCliOptions(customCommandLines, formatter, false);

        System.out.println();
    }

    public static void printHelpForSavepoint(Collection<CustomCommandLine> customCommandLines) {
        HelpFormatter formatter = new HelpFormatter();
        formatter.setLeftPadding(5);
        formatter.setWidth(80);

        System.out.println(
                "\nAction \"savepoint\" triggers savepoints for a running job or disposes existing ones.");
        System.out.println("\n  Syntax: savepoint [OPTIONS] <Job ID> [<target directory>]");
        formatter.setSyntaxPrefix("  \"savepoint\" action options:");
        formatter.printHelp(" ", getSavepointOptionsWithoutDeprecatedOptions(new Options()));

        printCustomCliOptions(customCommandLines, formatter, false);

        System.out.println();
    }

    /**
     * Prints custom cli options.
     *
     * @param formatter The formatter to use for printing
     * @param runOptions True if the run options should be printed, False to print only general
     *     options
     */
    private static void printCustomCliOptions(
            Collection<CustomCommandLine> customCommandLines,
            HelpFormatter formatter,
            boolean runOptions) {
        // prints options from all available command-line classes
        for (CustomCommandLine cli : customCommandLines) {
            formatter.setSyntaxPrefix("  Options for " + cli.getId() + " mode:");
            Options customOpts = new Options();
            cli.addGeneralOptions(customOpts);
            if (runOptions) {
                cli.addRunOptions(customOpts);
            }
            formatter.printHelp(" ", customOpts);
            System.out.println();
        }
    }

    public static SavepointRestoreSettings createSavepointRestoreSettings(CommandLine commandLine) {
        if (commandLine.hasOption(SAVEPOINT_PATH_OPTION.getOpt())) {
            String savepointPath = commandLine.getOptionValue(SAVEPOINT_PATH_OPTION.getOpt());
            boolean allowNonRestoredState =
                    commandLine.hasOption(SAVEPOINT_ALLOW_NON_RESTORED_OPTION.getOpt());
            return SavepointRestoreSettings.forPath(savepointPath, allowNonRestoredState);
        } else {
            return SavepointRestoreSettings.none();
        }
    }

    // --------------------------------------------------------------------------------------------
    //  Line Parsing
    // --------------------------------------------------------------------------------------------

    public static CommandLine parse(Options options, String[] args, boolean stopAtNonOptions)
            throws CliArgsException {
        final DefaultParser parser = new DefaultParser();

        try {
            return parser.parse(options, args, stopAtNonOptions);
        } catch (ParseException e) {
            throw new CliArgsException(e.getMessage());
        }
    }

    /**
     * Merges the given {@link Options} into a new Options object.
     *
     * @param optionsA options to merge, can be null if none
     * @param optionsB options to merge, can be null if none
     * @return
     */
    public static Options mergeOptions(@Nullable Options optionsA, @Nullable Options optionsB) {
        final Options resultOptions = new Options();
        if (optionsA != null) {
            for (Option option : optionsA.getOptions()) {
                resultOptions.addOption(option);
            }
        }

        if (optionsB != null) {
            for (Option option : optionsB.getOptions()) {
                resultOptions.addOption(option);
            }
        }

        return resultOptions;
    }
}
```

**ProgramOptions.java**

Note: the changes are marked with `// added` comments (`//新增地方` in the original source).

```
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.client.cli;

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.configuration.ConfigUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;

import org.apache.commons.cli.CommandLine;

import java.io.File;
import java.io.FilenameFilter;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import static org.apache.flink.client.cli.CliFrontendParser.ARGS_OPTION;
import static org.apache.flink.client.cli.CliFrontendParser.CLASSPATH_OPTION;
import static org.apache.flink.client.cli.CliFrontendParser.CLASS_OPTION;
import static org.apache.flink.client.cli.CliFrontendParser.DETACHED_OPTION;
import static org.apache.flink.client.cli.CliFrontendParser.JARDIR_OPTION; // added
import static org.apache.flink.client.cli.CliFrontendParser.JAR_OPTION;
import static org.apache.flink.client.cli.CliFrontendParser.PARALLELISM_OPTION;
import static org.apache.flink.client.cli.CliFrontendParser.SHUTDOWN_IF_ATTACHED_OPTION;
import static org.apache.flink.client.cli.CliFrontendParser.YARN_DETACHED_OPTION;
import static org.apache.flink.client.cli.ProgramOptionsUtils.containsPythonDependencyOptions;
import static org.apache.flink.client.cli.ProgramOptionsUtils.createPythonProgramOptions;
import static org.apache.flink.client.cli.ProgramOptionsUtils.isPythonEntryPoint;

/** Base class for command line options that refer to a JAR file program. */
public class ProgramOptions extends CommandLineOptions {

    private String jarFilePath;

    protected String entryPointClass;

    private final List<URL> classpaths;

    private final String[] programArgs;

    private final int parallelism;

    private final boolean detachedMode;

    private final boolean shutdownOnAttachedExit;

    private final SavepointRestoreSettings savepointSettings;

    protected ProgramOptions(CommandLine line) throws CliArgsException {
        super(line);

        this.entryPointClass =
                line.hasOption(CLASS_OPTION.getOpt())
                        ? line.getOptionValue(CLASS_OPTION.getOpt())
                        : null;

        this.jarFilePath =
                line.hasOption(JAR_OPTION.getOpt())
                        ? line.getOptionValue(JAR_OPTION.getOpt())
                        : null;

        this.programArgs = extractProgramArgs(line);

        List<URL> classpaths = new ArrayList<URL>();
        if (line.hasOption(CLASSPATH_OPTION.getOpt())) {
            for (String path : line.getOptionValues(CLASSPATH_OPTION.getOpt())) {
                try {
                    classpaths.add(new URL(path));
                } catch (MalformedURLException e) {
                    throw new CliArgsException("Bad syntax for classpath: " + path);
                }
            }
        }

        // ***** added: start *****
        // Add every JAR found in each directory given via -jd/--jardir to the classpath.
        if (line.hasOption(JARDIR_OPTION.getOpt())) {
            for (String path : line.getOptionValues(JARDIR_OPTION.getOpt())) {
                try {
                    classpaths.addAll(loadAllJarFromPathURl(path));
                } catch (MalformedURLException e) {
                    throw new CliArgsException("Bad syntax for jar dir: " + path);
                }
            }
        }
        // ***** added: end *****

        this.classpaths = classpaths;

        if (line.hasOption(PARALLELISM_OPTION.getOpt())) {
            String parString = line.getOptionValue(PARALLELISM_OPTION.getOpt());
            try {
                parallelism = Integer.parseInt(parString);
                if (parallelism <= 0) {
                    throw new NumberFormatException();
                }
            } catch (NumberFormatException e) {
                throw new CliArgsException(
                        "The parallelism must be a positive number: " + parString);
            }
        } else {
            parallelism = ExecutionConfig.PARALLELISM_DEFAULT;
        }

        detachedMode =
                line.hasOption(DETACHED_OPTION.getOpt())
                        || line.hasOption(YARN_DETACHED_OPTION.getOpt());
        shutdownOnAttachedExit = line.hasOption(SHUTDOWN_IF_ATTACHED_OPTION.getOpt());

        this.savepointSettings = CliFrontendParser.createSavepointRestoreSettings(line);
    }

    // ***** added: start *****
    private List<URL> loadAllJarFromPathURl(String path) throws MalformedURLException {
        List<URL> urls = new ArrayList<>();
        System.out.println("jar dir:" + path);

        // The directory to search.
        File directory = new File(path);

        // Keep only the files whose name ends with ".jar".
        File[] jarFiles =
                directory.listFiles(
                        new FilenameFilter() {
                            @Override
                            public boolean accept(File dir, String name) {
                                return name.toLowerCase().endsWith(".jar");
                            }
                        });

        // listFiles() returns null when the path is not a readable directory,
        // so check before touching the array.
        if (jarFiles == null) {
            System.out.println("jar dir does not exist or is not a directory:" + path);
            return urls;
        }
        System.out.println("jarFiles len:" + jarFiles.length);

        // Convert each JAR file that was found to a URL.
        for (File jarFile : jarFiles) {
            System.out.println(jarFile.getAbsolutePath());
            urls.add(jarFile.toURI().toURL());
        }
        return urls;
    }
    // ***** added: end *****

    protected String[] extractProgramArgs(CommandLine line) {
        String[] args =
                line.hasOption(ARGS_OPTION.getOpt())
                        ? line.getOptionValues(ARGS_OPTION.getOpt())
                        : line.getArgs();

        if (args.length > 0 && !line.hasOption(JAR_OPTION.getOpt())) {
            jarFilePath = args[0];
            args = Arrays.copyOfRange(args, 1, args.length);
        }

        return args;
    }

    public void validate() throws CliArgsException {
        // Java program should be specified a JAR file
        if (getJarFilePath() == null) {
            throw new CliArgsException("Java program should be specified a JAR file.");
        }
    }

    public String getJarFilePath() {
        return jarFilePath;
    }

    public String getEntryPointClassName() {
        return entryPointClass;
    }

    public List<URL> getClasspaths() {
        return classpaths;
    }

    public String[] getProgramArgs() {
        return programArgs;
    }

    public int getParallelism() {
        return parallelism;
    }

    public boolean getDetachedMode() {
        return detachedMode;
    }

    public boolean isShutdownOnAttachedExit() {
        return shutdownOnAttachedExit;
    }

    public SavepointRestoreSettings getSavepointRestoreSettings() {
        return savepointSettings;
    }

    public void applyToConfiguration(Configuration configuration) {
        if (getParallelism() != ExecutionConfig.PARALLELISM_DEFAULT) {
            configuration.setInteger(CoreOptions.DEFAULT_PARALLELISM, getParallelism());
        }

        configuration.setBoolean(DeploymentOptions.ATTACHED, !getDetachedMode());
        configuration.setBoolean(
                DeploymentOptions.SHUTDOWN_IF_ATTACHED, isShutdownOnAttachedExit());
        ConfigUtils.encodeCollectionToConfig(
                configuration, PipelineOptions.CLASSPATHS, getClasspaths(), URL::toString);
        SavepointRestoreSettings.toConfiguration(getSavepointRestoreSettings(), configuration);
    }

    public static ProgramOptions create(CommandLine line) throws CliArgsException {
        if (isPythonEntryPoint(line) || containsPythonDependencyOptions(line)) {
            return createPythonProgramOptions(line);
        } else {
            return new ProgramOptions(line);
        }
    }
}
```

Does it actually work? Let's write some verification code, in two parts.

(1) The Flink Job code

This is a copy of the WordCount example with the class renamed. It calls a class from the JAR built in part (2).

TestLoadExtJar.java

```
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.
 * See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.examples.java.testloadextjar;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.MultipleParameterTool;
import org.apache.flink.examples.java.wordcount.util.WordCountData;
import org.apache.flink.util.Collector;
import org.apache.flink.util.Preconditions;

import com.test.A;

/**
 * Implements the "WordCount" program that computes a simple word occurrence histogram over text
 * files.
 *
 * <p>The input is a plain text file with lines separated by newline characters.
 *
 * <p>Usage: <code>WordCount --input &lt;path&gt; --output &lt;path&gt;</code><br>
 * If no parameters are provided, the program is run with default data from {@link WordCountData}.
 *
 * <p>This example shows how to:
 *
 * <ul>
 *   <li>write a simple Flink program.
 *   <li>use Tuple data types.
 *   <li>write and use user-defined functions.
 * </ul>
 */
public class TestLoadExtJar {

    // *************************************************************************
    //     PROGRAM
    // *************************************************************************

    public static void main(String[] args) throws Exception {

        final MultipleParameterTool params = MultipleParameterTool.fromArgs(args);

        // set up the execution environment
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // call into the external JAR; this fails if com.test.A is not on the classpath
        A a = new A();
        a.test();

        // make parameters available in the web interface
        env.getConfig().setGlobalJobParameters(params);

        // get input data
        DataSet<String> text = null;
        if (params.has("input")) {
            // union all the inputs from text files
            for (String input : params.getMultiParameterRequired("input")) {
                if (text == null) {
                    text = env.readTextFile(input);
                } else {
                    text = text.union(env.readTextFile(input));
                }
            }
            Preconditions.checkNotNull(text, "Input DataSet should not be null.");
        } else {
            // get default test text data
            System.out.println("Executing WordCount example with default input data set.");
            System.out.println("Use --input to specify file input.");
            text = WordCountData.getDefaultTextLineDataSet(env);
        }

        DataSet<Tuple2<String, Integer>> counts =
                // split up the lines in pairs (2-tuples) containing: (word,1)
                text.flatMap(new Tokenizer())
                        // group by the tuple field "0" and sum up tuple field "1"
                        .groupBy(0)
                        .sum(1);

        // emit result
        if (params.has("output")) {
            counts.writeAsCsv(params.get("output"), "\n", " ");
            // execute program
            env.execute("WordCount Example");
        } else {
            System.out.println("Printing result to stdout. Use --output to specify output path.");
            counts.print();
        }
    }

    // *************************************************************************
    //     USER FUNCTIONS
    // *************************************************************************

    /**
     * Implements the string tokenizer that splits sentences into words as a user-defined
     * FlatMapFunction. The function takes a line (String) and splits it into multiple pairs in the
     * form of "(word,1)" ({@code Tuple2<String, Integer>}).
     */
    public static final class Tokenizer
            implements FlatMapFunction<String, Tuple2<String, Integer>> {

        @Override
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
            // normalize and split the line
            String[] tokens = value.toLowerCase().split("\\W+");

            // emit the pairs
            for (String token : tokens) {
                if (token.length() > 0) {
                    out.collect(new Tuple2<>(token, 1));
                }
            }
        }
    }
}
```

(2) Simulated third-party code

Use IDEA to create a Maven project containing a very simple class `A` for `TestLoadExtJar` to call.

A.java

```
package com.test;

public class A {
    public void test() {
        System.out.println("A");
    }
}
```

Then package both projects as JARs. Suppose the TestLoadExtJar example is packaged as `TestLoadExtJar.jar` and the simulated third-party code as `testcallextjar-1.0-SNAPSHOT.jar`, which is placed in the `/usr/local/flink-1.13.0/extlib` directory.

Now run the Job under Flink. Run it the original way first and look at the error, then add the `-jd` option pointing at the JAR directory and check whether that fixes it.

Without the `-jd` option:

![Run without the -jd option](docs/assets/1.png)

As you can see, it reports that the class was not found.

With the `-jd` option:

![Run with the -jd option](docs/assets/2.png)

Now the Job runs.

## Finally

If you have questions or want to get in touch, add me on WeChat (see the contact section above).
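
## Appendix: trying the directory scan outside Flink

The directory scan that the `-jd` option of method 6 relies on can be tried on its own, without rebuilding Flink. The sketch below is a minimal standalone version, assuming Java 8 or later. The class name `JarDirScanner` and the method `listJarUrls` are hypothetical names chosen for illustration; they are not part of Flink or of the patch above. It uses `java.nio.file.Files.list` rather than `File.listFiles`, so a missing directory gives an empty list instead of a `null` array.

```java
import java.io.IOException;
import java.net.URL;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/** Hypothetical helper (not part of Flink): lists the JAR files of one directory as URLs. */
public class JarDirScanner {

    /**
     * Returns the URLs of all regular files ending in ".jar" (case-insensitive) that sit
     * directly inside {@code dir}, sorted by path. Subdirectories are not searched.
     * Returns an empty list if {@code dir} is not a directory.
     */
    public static List<URL> listJarUrls(Path dir) throws IOException {
        if (!Files.isDirectory(dir)) {
            return Collections.emptyList();
        }
        List<Path> jars;
        // Files.list holds an open directory handle, so close the stream when done.
        try (Stream<Path> entries = Files.list(dir)) {
            jars = entries
                    .filter(Files::isRegularFile)
                    .filter(p -> p.getFileName().toString().toLowerCase().endsWith(".jar"))
                    .sorted()
                    .collect(Collectors.toList());
        }
        List<URL> urls = new ArrayList<>();
        for (Path jar : jars) {
            // toUri() yields a file: URI, which converts to the URL form -C expects.
            urls.add(jar.toUri().toURL());
        }
        return urls;
    }

    public static void main(String[] args) throws IOException {
        // Scan the directory given as the first argument, or the current directory.
        Path dir = Paths.get(args.length > 0 ? args[0] : ".");
        for (URL url : listJarUrls(dir)) {
            System.out.println(url);
        }
    }
}
```

After compiling, `java JarDirScanner /usr/local/flink-1.13.0/extlib` prints one `file:` URL per JAR in that directory. Each printed URL could also be passed to an unmodified `flink run` through a separate `-C` option.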