# zip-archive

**Repository Path**: mrbus/zip-archive

## Basic Information

- **Project Name**: zip-archive
- **Description**: 可控多线程高效ZIP压缩
- **Primary Language**: Java
- **License**: Not specified
- **Default Branch**: master
- **Homepage**: None
- **GVP Project**: No

## Statistics

- **Stars**: 6
- **Forks**: 0
- **Created**: 2020-12-26
- **Last Updated**: 2024-10-25

## Categories & Tags

**Categories**: Uncategorized

**Tags**: None

## README

## 基于Apache开源commons-compress实现可控多线程压缩

### 目的

+ `Java`自带`ZipEntry`压缩方式太慢,严重影响压缩效率。
+ `commons-compress`实现的`ZipArchiveEntry`效率足够高,但会使用当前服务器全部可用线程,导致压缩过程中占用`CPU`资源过高,严重影响正常业务进行。

### 解决办法

+ 通过控制`ParallelScatterZipCreator`使用的线程数,控制压缩使用的CPU线程,在提升压缩性能的同时减少对正常业务的影响。
+ 通过实际测试结果选择合适的线程数。

### 实现

+ 源码:[`https://gitee.com/mrbus/zip-archive.git`](https://gitee.com/mrbus/zip-archive.git)
+ 基础jar包
```
<dependency>
    <groupId>org.apache.commons</groupId>
    <artifactId>commons-compress</artifactId>
    <version>1.20</version>
</dependency>
```
+ 可根据服务器实际测试数据选择合适的线程数,选择最优方案。
+ `ZipArchiveCreator.java`
```
import org.apache.commons.compress.archivers.zip.ParallelScatterZipCreator;
import org.apache.commons.compress.parallel.ScatterGatherBackingStoreSupplier;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * 多线程压缩线程池
* date:2020-12-26 13:57 * * @author MrBUS * @version 1.0 * @since JDK 1.8 */ public class ZipArchiveCreator extends ParallelScatterZipCreator { public ZipArchiveCreator() { this(Runtime.getRuntime().availableProcessors()); } public ZipArchiveCreator(int nThreads) { this(Executors.newFixedThreadPool(nThreads)); } public ZipArchiveCreator(ExecutorService executorService) { super(executorService); } public ZipArchiveCreator(ExecutorService executorService, ScatterGatherBackingStoreSupplier backingStoreSupplier) { super(executorService, backingStoreSupplier); } } ``` + `ZipArchiveInputStreamSupplier.java` ``` import org.apache.commons.compress.parallel.InputStreamSupplier; import java.io.File; import java.io.FileInputStream; import java.io.FileNotFoundException; import java.io.InputStream; /** * 多线程输入流
* date:2020-12-26 13:57 * * @author MrBUS * @version 1.0 * @since JDK 1.8 */ public class ZipArchiveInputStreamSupplier implements InputStreamSupplier { private final File file; public ZipArchiveInputStreamSupplier(File file) { this.file = file; } @Override public InputStream get() { try { return file.isDirectory() ? new ZipArchiveNullInputStream(0) : new FileInputStream(file); } catch (FileNotFoundException e) { e.printStackTrace(); } return null; } } ``` + `ZipArchiveNullInputStream.java` ``` import java.io.EOFException; import java.io.IOException; import java.io.InputStream; /** * 空流
* date:2020-12-26 13:57 * * @author MrBUS * @version 1.0 * @since JDK 1.8 */ public class ZipArchiveNullInputStream extends InputStream { private final long size; private long position; private long mark; private long readlimit; private boolean eof; private final boolean throwEofException; private final boolean markSupported; public ZipArchiveNullInputStream(long size) { this(size, true, false); } public ZipArchiveNullInputStream(long size, boolean markSupported, boolean throwEofException) { this.mark = -1L; this.size = size; this.markSupported = markSupported; this.throwEofException = throwEofException; } public long getPosition() { return this.position; } public long getSize() { return this.size; } @Override public int available() { long avail = this.size - this.position; if (avail <= 0L) { return 0; } else { return avail > 2147483647L ? 2147483647 : (int)avail; } } @Override public void close() throws IOException { this.eof = false; this.position = 0L; this.mark = -1L; } @Override public synchronized void mark(int readlimit) { if (!this.markSupported) { throw new UnsupportedOperationException("Mark not supported"); } else { this.mark = this.position; this.readlimit = (long)readlimit; } } @Override public boolean markSupported() { return this.markSupported; } @Override public int read() throws IOException { if (this.eof) { throw new IOException("Read after end of file"); } else if (this.position == this.size) { return this.doEndOfFile(); } else { ++this.position; return this.processByte(); } } @Override public int read(byte[] bytes) throws IOException { return this.read(bytes, 0, bytes.length); } @Override public int read(byte[] bytes, int offset, int length) throws IOException { if (this.eof) { throw new IOException("Read after end of file"); } else if (this.position == this.size) { return this.doEndOfFile(); } else { this.position += (long)length; int returnLength = length; if (this.position > this.size) { returnLength = length - (int)(this.position - 
this.size); this.position = this.size; } this.processBytes(bytes, offset, returnLength); return returnLength; } } @Override public synchronized void reset() throws IOException { if (!this.markSupported) { throw new UnsupportedOperationException("Mark not supported"); } else if (this.mark < 0L) { throw new IOException("No position has been marked"); } else if (this.position > this.mark + this.readlimit) { throw new IOException("Marked position [" + this.mark + "] is no longer valid - passed the read limit [" + this.readlimit + "]"); } else { this.position = this.mark; this.eof = false; } } @Override public long skip(long numberOfBytes) throws IOException { if (this.eof) { throw new IOException("Skip after end of file"); } else if (this.position == this.size) { return (long)this.doEndOfFile(); } else { this.position += numberOfBytes; long returnLength = numberOfBytes; if (this.position > this.size) { returnLength = numberOfBytes - (this.position - this.size); this.position = this.size; } return returnLength; } } protected int processByte() { return 0; } protected void processBytes(byte[] bytes, int offset, int length) { } private int doEndOfFile() throws EOFException { this.eof = true; if (this.throwEofException) { throw new EOFException(); } else { return -1; } } } ``` + `ZipArchiveScatterOutputStream.java` ``` import org.apache.commons.compress.archivers.zip.ScatterZipOutputStream; import org.apache.commons.compress.archivers.zip.ZipArchiveEntry; import org.apache.commons.compress.archivers.zip.ZipArchiveEntryRequest; import org.apache.commons.compress.archivers.zip.ZipArchiveOutputStream; import org.apache.commons.compress.parallel.InputStreamSupplier; import java.io.File; import java.io.IOException; import java.util.concurrent.ExecutionException; /** * 多线程并行输出流
* date:2020-12-26 13:57 * * @author MrBUS * @version 1.0 * @since JDK 1.8 */ public class ZipArchiveScatterOutputStream { /** * 需要 压缩文件/文件夹 路径 */ private String directoryPath; /** * */ private final ZipArchiveCreator creator; /** * 多线程输出流 */ private final ScatterZipOutputStream output; public ZipArchiveScatterOutputStream(String directoryPath) throws IOException { this(directoryPath, Runtime.getRuntime().availableProcessors()); } public ZipArchiveScatterOutputStream(String directoryPath, int nThreads) throws IOException { this.directoryPath = directoryPath; this.creator = new ZipArchiveCreator(nThreads); this.output = ScatterZipOutputStream.fileBased(File.createTempFile("whatever-preffix", ".whatever")); } public void addEntry(ZipArchiveEntry entry, InputStreamSupplier supplier) throws IOException { if (entry.isDirectory() && !entry.isUnixSymlink()) { output.addArchiveEntry(ZipArchiveEntryRequest.createZipArchiveEntryRequest(entry, supplier)); } else { creator.addArchiveEntry(entry, supplier); } } public void writeTo(ZipArchiveOutputStream archiveOutput) throws IOException, ExecutionException, InterruptedException { output.writeTo(archiveOutput); output.close(); creator.writeTo(archiveOutput); } public String getDirectoryPath() { return directoryPath; } public ZipArchiveCreator getCreator() { return creator; } public ScatterZipOutputStream getOutput() { return output; } } ``` + `ZipArchiveUtils.java` ``` import org.apache.commons.compress.archivers.zip.ZipArchiveEntry; import org.apache.commons.compress.archivers.zip.ZipArchiveOutputStream; import org.apache.commons.compress.parallel.InputStreamSupplier; import java.io.File; import java.io.IOException; import java.util.Objects; import java.util.zip.ZipEntry; /** * 多线程并行压缩工具
* date:2020-12-26 13:57 * * @author MrBUS * @version 1.0 * @since JDK 1.8 */ public class ZipArchiveUtils { /** * 限制最大使用线程 */ private static final int MAX_THREADS = 4; /** * 默认使用线程数百分比:67% */ private static final double DEFAULT_THREADS_RATIO = 0.67; /** * 创建压缩文件 * * @param directoryPath 需压缩文件夹/文件 * @param zipPath 压缩包路径 + 文件名 */ public void createZip(String directoryPath, String zipPath) { this.createZip(directoryPath, zipPath, ZipEntry.DEFLATED, getAvailableThreads(DEFAULT_THREADS_RATIO)); } /** * 创建压缩文件 * * @param directoryPath 需压缩文件夹/文件 * @param zipPath 压缩包路径 + 文件名 * @param nThreads 线程数 */ public void createZip(String directoryPath, String zipPath, int nThreads) { this.createZip(directoryPath, zipPath, ZipEntry.DEFLATED, nThreads); } /** * 创建压缩文件 * * @param directoryPath 需压缩文件夹/文件 * @param zipPath 压缩包路径 + 文件名 * @param availableThreadsRatio 可用线程比例 */ public void createZip(String directoryPath, String zipPath, double availableThreadsRatio) { this.createZip(directoryPath, zipPath, ZipEntry.DEFLATED, getAvailableThreads(availableThreadsRatio)); } /** * 创建压缩文件 * * @param directoryPath 需压缩文件夹/文件 * @param zipPath 压缩包路径 + 文件名 * @param method 压缩方式:ZipEntry.DEFLATED: 压缩/ZipEntry.STORED:不压缩 * @param nThreads 线程数 */ public void createZip(String directoryPath, String zipPath, int method, int nThreads) { try { File zipFile = new File(zipPath); File dstFolder = new File(zipFile.getParent()); if (!dstFolder.isDirectory()) { dstFolder.mkdirs(); } File rootDir = new File(directoryPath); ZipArchiveScatterOutputStream scatterOutput = new ZipArchiveScatterOutputStream(rootDir.getAbsolutePath(), nThreads); compress(rootDir, scatterOutput, getFileName(zipFile.getName()), method); ZipArchiveOutputStream archiveOutput = new ZipArchiveOutputStream(zipFile); scatterOutput.writeTo(archiveOutput); archiveOutput.close(); } catch (Exception e) { System.err.println("压缩异常"); e.printStackTrace(); } } /** * 压缩文件 * * @param dir 压缩文件 * @param output ZipArchive多线程输出流 * @param zipName 压缩包名称 * @param 
method 压缩方式:ZipEntry.DEFLATED: 压缩/ZipEntry.STORED:不压缩 * @throws IOException 流异常 */ private void compress(File dir, ZipArchiveScatterOutputStream output, String zipName, int method) throws IOException { if (dir == null) { return; } if (dir.isFile()) { addEntry(zipName, dir, output, method); return; } if (Objects.requireNonNull(dir.listFiles()).length == 0) { String fileName = zipName + dir.getAbsolutePath().replace(output.getDirectoryPath(), "") + File.separator; addEntry(fileName, dir, output, method); return; } for (File file : Objects.requireNonNull(dir.listFiles())) { if (file.isDirectory()) { compress(file, output, zipName, method); } else { String fileName = zipName + file.getParent().replace(output.getDirectoryPath(), "") + File.separator + file.getName(); addEntry(fileName, file, output, method); } } } /** * 添加目录/文件 * * @param filePath 压缩文件路径 * @param file 压缩文件 * @param output ZipArchive多线程输出流 * @param method 压缩方式:ZipEntry.DEFLATED: 压缩/ZipEntry.STORED:不压缩 * @throws IOException 流异常 */ private void addEntry(String filePath, File file, ZipArchiveScatterOutputStream output, int method) throws IOException { ZipArchiveEntry archiveEntry = new ZipArchiveEntry(filePath); archiveEntry.setMethod(method); InputStreamSupplier supplier = new ZipArchiveInputStreamSupplier(file); output.addEntry(archiveEntry, supplier); } /** * 获取无后缀文件名 * * @param fileName 文件名 * @return 无后缀文件名 */ private String getFileName(String fileName) { if (fileName == null || fileName.length() <= 1 || !fileName.contains(".")) { return fileName; } return fileName.substring(0, fileName.lastIndexOf(".")); } /** * 计算可用线程 * * @param ratio 使用线程比率 * @return 可用线程 */ private int getAvailableThreads(double ratio) { int availableProcessors = Runtime.getRuntime().availableProcessors(); int nThreads = (int) (availableProcessors * ratio); if (nThreads <= 0) { return 1; } else if (nThreads > MAX_THREADS) { return Math.min(MAX_THREADS, availableProcessors); } return Math.min(nThreads, availableProcessors); } } ``` 
### Test ``` /** *
* date:2020/12/26 14:55 * * @author MrBUS * @version 1.0 * @since JDK 1.8 */ public class ZipArchiveTest { public static void main(String[] args) { // 要压缩文件 String directoryPath = "E:/zip-test"; // 压缩后路径 String zipPath = "E:/zip-archive/zip-archive-" + System.currentTimeMillis() + "-"; for (int i = Runtime.getRuntime().availableProcessors(); i > 0; i--) { long beginTime = System.currentTimeMillis(); new ZipArchiveUtils().createZip(directoryPath, zipPath + i + ".zip", i); long endTime = System.currentTimeMillis(); System.out.println("使用 " + i + " 线程,压缩耗时:" + (endTime - beginTime) + "毫秒"); } } } ``` ### 总结 以上仅代表个人理解,仅供参考。希望能帮助到有需要的同学。个人技术程度有限,有不对的地方望谅解。觉得有用就给作者点个赞吧,谢谢。