1 Star 1 Fork 0

leoIamOk/geek-university-bigdata-training-camp

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
该仓库未声明开源许可证文件(LICENSE),使用请关注具体项目描述及其代码上游依赖。
克隆/下载
SparkDistCP.java 3.86 KB
一键复制 编辑 原始数据 按行查看 历史
lei.zhu 提交于 4年前 . update
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import java.io.IOException;
import java.io.InputStream;
import java.util.*;
/**
 * Minimal DistCp-style copier built on Spark: lists every file below a source
 * root, recreates the directory layout below a target root, and copies the
 * files in parallel Spark partitions.
 *
 * <p>Command line: {@code <sourceRoot> <targetRoot> <maxConcurrency> <ignoreFailure>}.
 *
 * <p>NOTE: the Spark master is pinned to {@code local[*]} in the static
 * initializer, and the copy tasks read the static Hadoop {@link Configuration}
 * directly. That sharing relies on driver and tasks living in the same JVM.
 *
 * <p>Only directories that directly contain at least one file are created
 * explicitly; the file listing is the sole source of directory names.
 */
public class SparkDistCP {
    private static final String USAGE =
            "Usage: SparkDistCP <sourceRoot> <targetRoot> <maxConcurrency> <ignoreFailure>";

    private static final SparkConf sparkConf;
    private static final SparkContext sparkContext;
    private static final JavaSparkContext javaSparkContext;
    private static final Configuration configuration;

    static {
        sparkConf = new SparkConf();
        sparkConf.set("spark.master", "local[*]");
        sparkConf.set("spark.app.name", "localrun");
        sparkContext = SparkContext.getOrCreate(sparkConf);
        javaSparkContext = new JavaSparkContext(sparkContext);
        configuration = sparkContext.hadoopConfiguration();
    }

    /**
     * Entry point.
     *
     * @param args {@code args[0]} source root (e.g. {@code file:///tmp/source/}),
     *             {@code args[1]} target root (e.g. {@code file:///tmp/target/}),
     *             {@code args[2]} number of partitions used for the copy,
     *             {@code args[3]} {@code true} to skip files that fail to copy
     * @throws IOException              if listing the source or preparing the target fails
     * @throws IllegalArgumentException if arguments are missing or the concurrency is not positive
     */
    public static void main(String[] args) throws IOException {
        if (args == null || args.length < 4) {
            throw new IllegalArgumentException(USAGE);
        }
        String sourceRootPathStr = args[0];
        String targetRootPathStr = args[1];
        int maxConcurrency = Integer.parseInt(args[2]);
        boolean ignoreFailure = Boolean.parseBoolean(args[3]);

        JavaRDD<String> sourceFileListRDD =
                getSourceFileLists(sourceRootPathStr, targetRootPathStr, maxConcurrency);

        sourceFileListRDD.foreachPartition(sourceFileIterator -> {
            FileSystem sourceFileSystem = new Path(sourceRootPathStr).getFileSystem(configuration);
            FileSystem targetFileSystem = new Path(targetRootPathStr).getFileSystem(configuration);
            // Listed file paths are fully qualified; qualify the root the same way so
            // that relativizing a file against it actually strips the prefix.
            Path qualifiedSourceRoot = sourceFileSystem.makeQualified(new Path(sourceRootPathStr));

            while (sourceFileIterator.hasNext()) {
                String sourceFilePath = sourceFileIterator.next();
                try {
                    copyFile(sourceFileSystem, targetFileSystem, qualifiedSourceRoot,
                            targetRootPathStr, new Path(sourceFilePath));
                } catch (IOException | RuntimeException e) {
                    if (!ignoreFailure) {
                        throw e;
                    }
                    // Best-effort mode: keep going, but leave a trace of what was skipped.
                    System.err.println("Skipping " + sourceFilePath + " after copy failure: " + e);
                }
            }
        });
    }

    /**
     * Copies one source file to the location below the target root that mirrors
     * its position below the source root, overwriting an existing target file.
     */
    private static void copyFile(FileSystem sourceFileSystem,
                                 FileSystem targetFileSystem,
                                 Path qualifiedSourceRoot,
                                 String targetRootPathStr,
                                 Path sourceFile) throws IOException {
        Path targetPath = new Path(targetRootPathStr, relativizeAgainstRoot(qualifiedSourceRoot, sourceFile));
        try (InputStream sourceInputStream = sourceFileSystem.open(sourceFile);
             FSDataOutputStream targetOutputStream = targetFileSystem.create(targetPath, true)) {
            IOUtils.copy(sourceInputStream, targetOutputStream);
        }
    }

    /**
     * Returns {@code child} expressed relative to {@code qualifiedRoot}.
     *
     * <p>{@link java.net.URI#relativize} hands back the child unchanged when it is
     * not below the root. Resolving such an absolute result against the target root
     * would point straight back at the source file, so that case is rejected.
     *
     * @throws IOException if {@code child} is not located below {@code qualifiedRoot}
     */
    private static Path relativizeAgainstRoot(Path qualifiedRoot, Path child) throws IOException {
        java.net.URI relative = qualifiedRoot.toUri().relativize(child.toUri());
        if (relative.isAbsolute() || relative.getPath().startsWith("/")) {
            throw new IOException("Path " + child + " is not located under source root " + qualifiedRoot);
        }
        // Build the Path from the URI itself (not its string form) so that
        // percent-encoded characters are decoded back into the real name.
        return new Path(relative);
    }

    /**
     * Lists all files below the source root (recursively), creates the matching
     * sub-directories below the target root, and returns the file list as an RDD.
     *
     * @param sourceRootPathStr root of the tree to copy
     * @param targetRootPathStr root under which the tree is recreated
     * @param maxConcurrency    number of RDD partitions; must be at least 1
     * @return RDD of fully qualified source file paths, split into {@code maxConcurrency} partitions
     * @throws IOException              if listing or directory creation fails
     * @throws IllegalArgumentException if {@code maxConcurrency} is less than 1
     */
    private static JavaRDD<String> getSourceFileLists(String sourceRootPathStr, String targetRootPathStr, int maxConcurrency) throws IOException {
        // Reject a bad partition count before touching the target file system.
        if (maxConcurrency < 1) {
            throw new IllegalArgumentException("maxConcurrency must be >= 1 but was " + maxConcurrency);
        }

        Path targetRootPath = new Path(targetRootPathStr);
        FileSystem sourceFileSystem = new Path(sourceRootPathStr).getFileSystem(configuration);
        FileSystem targetFileSystem = targetRootPath.getFileSystem(configuration);
        // Qualify the root so it compares equal to, and relativizes against, listed paths.
        Path sourceRootPath = sourceFileSystem.makeQualified(new Path(sourceRootPathStr));

        Set<Path> distinctDirPaths = new HashSet<>();
        List<String> fileList = new ArrayList<>();
        RemoteIterator<LocatedFileStatus> iterator = sourceFileSystem.listFiles(sourceRootPath, true);
        while (iterator.hasNext()) {
            Path filePath = iterator.next().getPath();
            distinctDirPaths.add(filePath.getParent());
            fileList.add(filePath.toString());
        }

        // The root itself maps onto the target root; only sub-directories need creating.
        distinctDirPaths.remove(sourceRootPath);

        FsPermission directoryPermission = new FsPermission(FsAction.ALL, FsAction.READ, FsAction.READ);
        for (Path distinctDirPath : distinctDirPaths) {
            Path relativeDirPath = relativizeAgainstRoot(sourceRootPath, distinctDirPath);
            targetFileSystem.mkdirs(new Path(targetRootPath, relativeDirPath), directoryPermission);
        }

        return javaSparkContext.parallelize(fileList, maxConcurrency);
    }
}
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
Java
1
https://gitee.com/leoIamOk/geek-university-bigdata-training-camp.git
git@gitee.com:leoIamOk/geek-university-bigdata-training-camp.git
leoIamOk
geek-university-bigdata-training-camp
geek-university-bigdata-training-camp
master

搜索帮助