# downloadThread **Repository Path**: extraU/downloadThread ## Basic Information - **Project Name**: downloadThread - **Description**: 本项目利用ForkJoin模式简单的实现了多线程下载 - **Primary Language**: Java - **License**: Apache-2.0 - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 0 - **Forks**: 0 - **Created**: 2018-01-24 - **Last Updated**: 2020-12-19 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README # downloadThread 本项目利用ForkJoin模式简单的实现了多线程下载 # 起因 本人在看了 [Java实现多线程下载文件](https://blog.dubby.cn/detail.html?id=9090) 这篇博客后突发想法而做。 那篇文章利用Callable类和线程池实现多线程下载。其原理是:先使用head方法查询得到对应文件的Content-Length,然后拆分成多个部分,交由多个线程去处理,使用"Range", "bytes=" + start + "-" + end这个header来指定下载文件的哪个部分。然后交给一个线程池实现多线程。每一个线程是实现了Callable类,这是一个带有返回结果的线程。返回一个Future类,这样实现了异步调用。而我的改进则是采用ForkJoin模式实现。 # ForkJoin Fork/Join是Java7后提供的并行执行任务的框架。其主要思想就是将一个大任务拆分为一个个小任务,再将这些结果汇合成最终结果。它的思想和MapReduce框架思想有些相似,都是将任务拆分为小任务,然后再聚合各个结果。Fork/Join框架更像是MapReduce框架的单机版。 实现一个Fork/Join模型的过程是:向ForkJoinPool类提交一个ForkJoinTask任务,ForkJoinTask任务支持fork()方法将任务进行分解和join()方法等待小任务的执行完成。ForkJoinTask主要有两个子类:1、RecursiveActive,一个没有返回任务值的任务;2、RecursiveTask,一个带有返回值的任务。 Fork/Join框架的优势是:1、将大任务拆分成小任务,充分利用线程优势;2、线程A执行完成后如果线程B还有任务未执行完成,线程A会协助线程B一起执行任务(其原理是:线程执行自己的任务时从任务顶部开始拿数据,而执行其它线程任务时从底部开始拿数据这样避免冲突)。 # 改造 我的改造选用RecursiveTask实现一个带有返回值的DownloadThread2类。 ```java @Override protected Map compute() { Map resultMap = null; try { URL url = new URL(urlString); HttpURLConnection httpURLConnection = (HttpURLConnection) url.openConnection(); httpURLConnection.setRequestMethod("GET"); if(number == 1) { download(httpURLConnection); } else { resultMap = devide(httpURLConnection); } } catch (MalformedURLException e) { e.printStackTrace(); } catch (ProtocolException e) { e.printStackTrace(); } catch (IOException e) { e.printStackTrace(); } return resultMap; } ``` 继承RecursiveTask类需要实现compute()方法。这个方法是任务计算的主要方法。本例中number个数不为1时执行将任务拆分成小任务,否则执行下载。 拆分成小任务的方法如下: ```java //分解成小任务 private Map devide(HttpURLConnection httpURLConnection) { Map resultMap = new HashMap(); String contentLengthStr = httpURLConnection.getHeaderField("Content-Length"); //获取下载文件的大小 long contentLength = Long.parseLong(contentLengthStr); if(contentLength > 1024*1024) { System.out.println("Content-Length\t" + (contentLength/1024/1024) + " MB"); } else if(contentLength > 1024) { System.out.println("Content-Length\t" + (contentLength/1024) + " KB"); } else { System.out.println("Content-Length\t" + contentLength + " B"); } ArrayList tasks = new ArrayList(); long tempLength = contentLength / number; //对任务进行拆分 for(int i = 0; i < number; ++i) { start = end + 1; end = end + tempLength; if(i == number -1) { end = contentLength; } System.out.println("start:\t" + start + "\tend:\t" + end); DownloadThread2 downloadThread2 = new DownloadThread2(1, start, end, urlString, fileName); tasks.add(downloadThread2); downloadThread2.fork(); //创建子任务 } for(int i = 0;i < tasks.size(); i++) { DownloadThread2 downloadThread2 = tasks.get(i); downloadThread2.join(); //等待子任务完成 resultMap.put(i, downloadThread2.downloadTemp); } return resultMap; } ``` 上面方法实现的内容是:首先获取要下载文件的大小,再根据传入的number值将任务分解成number个,先获取每一个子任务要下载的文件的区域,然后使用fork方法创建子任务;创建完子任务后等待子任务完成后,将结果记录在resultMap中并返回,在主函数中将下载的各个小文件拼成一个完整的文件。 主函数实现如下: ```java public static void main(String[] args) { long startTimestamp = System.currentTimeMillis(); if(args == null || args.length < 2) { System.out.println("please input the file url and save load."); return; } final String urlString = args[0]; path = args[1]; //指定线程数 int number = 5; if(args.length >= 3) { number = Integer.parseInt(args[2]); } System.out.println("Download start, url is \"" + urlString +"\""); if(!"".equals(path)) { prefix = path + "/" + prefix ; } ForkJoinPool pool = new ForkJoinPool(); try { DownloadThread2 downloadThread2 = new DownloadThread2(number, 0L, -1L, urlString, prefix); ForkJoinTask> result = pool.submit(downloadThread2); //提交任务 Map resultMap = result.get(); System.out.println(); String filename = urlString.substring(urlString.lastIndexOf("/") + 1); if(!"".equals(path)) { filename = path + "/" + filename; } //合并文件 RandomAccessFile resultFile = new RandomAccessFile(filename, "rw"); for(int i = 0; i < number; i++) { DownloadTemp temp = resultMap.get(i); RandomAccessFile tempFile = new RandomAccessFile(temp.getFileName(), "r"); tempFile.getChannel().transferTo(0, tempFile.length(), resultFile.getChannel()); tempFile.close(); } resultFile.close(); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } catch (FileNotFoundException e) { e.printStackTrace(); } catch (IOException e) { e.printStackTrace(); } finally { pool.shutdown(); } long completedTimestamp = System.currentTimeMillis(); System.out.println(); System.out.println("Cost " + (completedTimestamp - startTimestamp)/1000 + " s"); } ``` 主函数主要实现三个功能:1、对传入的参数进行判断;2、建立ForkJoinPool线程池,实现一个下载任务,通过submit()方法将这个任务提交线程池,线程池会返回一个携带结果的任务通过get方法获取前面一个方法的结果resultMap。3、reultMap带有所有下载的文件的信息,根据这些信息将这些文件合并成一个文件,就是最终我们要下载的文件。 # 写在最后 文章和代码之中难免有些不合理或错误的地方,欢迎批评指教。本文代码见: https://gitee.com/extraU/downloadThread