getClasspaths() {
return classpaths;
}
/** Returns the arguments that are forwarded to the user program's {@code main} method. */
public String[] getProgramArgs() {
    return this.programArgs;
}
/** Returns the requested job parallelism, or the framework default if none was set. */
public int getParallelism() {
    return this.parallelism;
}
/** Returns {@code true} when the job should be submitted in detached (fire-and-forget) mode. */
public boolean getDetachedMode() {
    return this.detachedMode;
}
/** Returns {@code true} when the cluster should shut down once an attached client exits. */
public boolean isShutdownOnAttachedExit() {
    return this.shutdownOnAttachedExit;
}
/** Returns the savepoint restore settings parsed from the command line. */
public SavepointRestoreSettings getSavepointRestoreSettings() {
    return this.savepointSettings;
}
/**
 * Writes these CLI program options into the given {@link Configuration}.
 *
 * @param configuration the configuration that receives the parallelism, attach/shutdown
 *     flags, classpath entries, and savepoint restore settings
 */
public void applyToConfiguration(Configuration configuration) {
    // Only override the configured default parallelism when the user set one explicitly.
    final int requestedParallelism = getParallelism();
    if (requestedParallelism != ExecutionConfig.PARALLELISM_DEFAULT) {
        configuration.setInteger(CoreOptions.DEFAULT_PARALLELISM, requestedParallelism);
    }

    // ATTACHED is the inverse of detached mode.
    final boolean attached = !getDetachedMode();
    configuration.setBoolean(DeploymentOptions.ATTACHED, attached);
    configuration.setBoolean(DeploymentOptions.SHUTDOWN_IF_ATTACHED, isShutdownOnAttachedExit());

    // URLs are serialized to strings for the string-list config option.
    ConfigUtils.encodeCollectionToConfig(
            configuration, PipelineOptions.CLASSPATHS, getClasspaths(), URL::toString);
    SavepointRestoreSettings.toConfiguration(getSavepointRestoreSettings(), configuration);
}
/**
 * Creates the appropriate {@code ProgramOptions} variant for the parsed command line:
 * Python-specific options when the entry point or any dependency option is Python,
 * otherwise the plain Java variant.
 *
 * @param line the parsed command line
 * @throws CliArgsException if the command-line arguments are invalid
 */
public static ProgramOptions create(CommandLine line) throws CliArgsException {
    final boolean python = isPythonEntryPoint(line) || containsPythonDependencyOptions(line);
    return python ? createPythonProgramOptions(line) : new ProgramOptions(line);
}
}
```
到底行不行呢?接下来我们编写验证代码来确认,分为两部分:
(1).flink job代码
这里直接复制wordcount例子,改个类名,然后调用(2)的jar中一个类
TestLoadExtJar.java
```
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.examples.java.testloadextjar;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.MultipleParameterTool;
import org.apache.flink.examples.java.wordcount.util.WordCountData;
import org.apache.flink.util.Collector;
import org.apache.flink.util.Preconditions;
import com.test.A;
/**
 * Implements the "WordCount" program that computes a simple word occurrence histogram over text
 * files.
 *
 * <p>The input is a plain text file with lines separated by newline characters.
 *
 * <p>Usage: <code>WordCount --input &lt;path&gt; --output &lt;path&gt;</code><br>
 * If no parameters are provided, the program is run with default data from {@link WordCountData}.
 *
 * <p>This example shows how to:
 *
 * <ul>
 *   <li>write a simple Flink program.
 *   <li>use Tuple data types.
 *   <li>write and use user-defined functions.
 * </ul>
 */
public class TestLoadExtJar {
// *************************************************************************
// PROGRAM
// *************************************************************************
public static void main(String[] args) throws Exception {
final MultipleParameterTool params = MultipleParameterTool.fromArgs(args);
// set up the execution environment
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
A a = new A();
a.test();
// make parameters available in the web interface
env.getConfig().setGlobalJobParameters(params);
// get input data
DataSet text = null;
if (params.has("input")) {
// union all the inputs from text files
for (String input : params.getMultiParameterRequired("input")) {
if (text == null) {
text = env.readTextFile(input);
} else {
text = text.union(env.readTextFile(input));
}
}
Preconditions.checkNotNull(text, "Input DataSet should not be null.");
} else {
// get default test text data
System.out.println("Executing WordCount example with default input data set.");
System.out.println("Use --input to specify file input.");
text = WordCountData.getDefaultTextLineDataSet(env);
}
DataSet> counts =
// split up the lines in pairs (2-tuples) containing: (word,1)
text.flatMap(new Tokenizer())
// group by the tuple field "0" and sum up tuple field "1"
.groupBy(0)
.sum(1);
// emit result
if (params.has("output")) {
counts.writeAsCsv(params.get("output"), "\n", " ");
// execute program
env.execute("WordCount Example");
} else {
System.out.println("Printing result to stdout. Use --output to specify output path.");
counts.print();
}
}
// *************************************************************************
// USER FUNCTIONS
// *************************************************************************
/**
* Implements the string tokenizer that splits sentences into words as a user-defined
* FlatMapFunction. The function takes a line (String) and splits it into multiple pairs in the
* form of "(word,1)" ({@code Tuple2}).
*/
public static final class Tokenizer
implements FlatMapFunction> {
@Override
public void flatMap(String value, Collector> out) {
// normalize and split the line
String[] tokens = value.toLowerCase().split("\\W+");
// emit the pairs
for (String token : tokens) {
if (token.length() > 0) {
out.collect(new Tuple2<>(token, 1));
}
}
}
}
}
```
(2)模拟第三方代码
使用idea 创建一个maven项目,创建一个A类很简单,供TestLoadExtJar 调用
A.java
```
package com.test;
/**
 * Minimal class packaged into a separate jar so the Flink job can verify that
 * external (third-party) classes are loadable at runtime.
 */
public class A {

    /** Prints the marker string "A" to stdout to prove this class was loaded and invoked. */
    public void test() {
        final String marker = "A";
        System.out.println(marker);
    }
}
```
然后把TestLoadExtJar,模拟第三方代码这两个项目打包jar,假如TestLoadExtJar例子打包为TestLoadExtJar.jar 模拟第三方代码打包为testcallextjar-1.0-SNAPSHOT.jar,放在/usr/local/flink-1.13.0/extlib目录下
然后在flink 下运行,先用原来方式运行,然后看报错信息,再加jd参数,指定jar目录,看看能否解决
未加jd参数:

看到没,报类没找到
加了jd参数

然后就可以执行了
## 最后
如果有问题或想沟通,可以加微信,见前面