From 90cddd59ef6729998be96de2f79aceb502b21b0a Mon Sep 17 00:00:00 2001 From: wuweifeng10 Date: Thu, 11 Nov 2021 11:01:09 +0800 Subject: [PATCH 1/7] Merge branch 'V1.4' # Conflicts: # src/main/java/com/jd/platform/async/executor/Async.java # src/main/java/com/jd/platform/async/wrapper/WorkerWrapper.java --- src/main/java/com/jd/platform/async/executor/Async.java | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/src/main/java/com/jd/platform/async/executor/Async.java b/src/main/java/com/jd/platform/async/executor/Async.java index b4dba22..b5e1b5f 100644 --- a/src/main/java/com/jd/platform/async/executor/Async.java +++ b/src/main/java/com/jd/platform/async/executor/Async.java @@ -15,11 +15,10 @@ import java.util.stream.Collectors; * @version 1.0 */ public class Async { - private static final ThreadPoolExecutor COMMON_POOL = - new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors() * 2, 1024, - 15L, TimeUnit.SECONDS, - new LinkedBlockingQueue<>(), - (ThreadFactory) Thread::new); + /** + * 默认不定长线程池 + */ + private static final ThreadPoolExecutor COMMON_POOL = (ThreadPoolExecutor) Executors.newCachedThreadPool(); /** * 注意,这里是个static,也就是只能有一个线程池。用户自定义线程池时,也只能定义一个 */ -- Gitee From 38ee76c0c9781cdfa264cfc6688fde3e9a5b49d9 Mon Sep 17 00:00:00 2001 From: wuweifeng10 Date: Thu, 11 Nov 2021 11:02:02 +0800 Subject: [PATCH 2/7] 1 --- src/test/java/parallel/ParWorker5.java | 53 ++++++++++++++++++++++++++ src/test/java/parallel/ParWorker6.java | 53 ++++++++++++++++++++++++++ src/test/java/parallel/ParWorker7.java | 53 ++++++++++++++++++++++++++ 3 files changed, 159 insertions(+) create mode 100755 src/test/java/parallel/ParWorker5.java create mode 100755 src/test/java/parallel/ParWorker6.java create mode 100755 src/test/java/parallel/ParWorker7.java diff --git a/src/test/java/parallel/ParWorker5.java b/src/test/java/parallel/ParWorker5.java new file mode 100755 index 0000000..262e862 --- /dev/null +++ b/src/test/java/parallel/ParWorker5.java @@ -0,0 +1,53 @@ +package parallel; + + +import com.jd.platform.async.callback.ICallback; +import com.jd.platform.async.callback.IWorker; +import com.jd.platform.async.executor.timer.SystemClock; +import com.jd.platform.async.worker.WorkResult; +import com.jd.platform.async.wrapper.WorkerWrapper; + +import java.util.Map; + +/** + * @author wuweifeng wrote on 2019-11-20. + */ +public class ParWorker5 implements IWorker, ICallback { + private long sleepTime = 1000; + + public void setSleepTime(long sleepTime) { + this.sleepTime = sleepTime; + } + @Override + public String action(String object, Map allWrappers) { + try { + Thread.sleep(sleepTime); + } catch (InterruptedException e) { + e.printStackTrace(); + } + return "result = " + SystemClock.now() + "---param = " + object + " from 3"; + } + + + @Override + public String defaultValue() { + return "worker3--default"; + } + + @Override + public void begin() { + //System.out.println(Thread.currentThread().getName() + "- start --" + System.currentTimeMillis()); + } + + @Override + public void result(boolean success, String param, WorkResult workResult) { + if (success) { + System.out.println("callback worker3 success--" + SystemClock.now() + "----" + workResult.getResult() + + "-threadName:" +Thread.currentThread().getName()); + } else { + System.err.println("callback worker3 failure--" + SystemClock.now() + "----" + workResult.getResult() + + "-threadName:" +Thread.currentThread().getName()); + } + } + +} diff --git a/src/test/java/parallel/ParWorker6.java b/src/test/java/parallel/ParWorker6.java new file mode 100755 index 0000000..4aa6c71 --- /dev/null +++ b/src/test/java/parallel/ParWorker6.java @@ -0,0 +1,53 @@ +package parallel; + + +import com.jd.platform.async.callback.ICallback; +import com.jd.platform.async.callback.IWorker; +import com.jd.platform.async.executor.timer.SystemClock; +import com.jd.platform.async.worker.WorkResult; +import com.jd.platform.async.wrapper.WorkerWrapper; + +import java.util.Map; + +/** + * @author wuweifeng wrote on 2019-11-20. + */ +public class ParWorker6 implements IWorker, ICallback { + private long sleepTime = 1000; + + public void setSleepTime(long sleepTime) { + this.sleepTime = sleepTime; + } + @Override + public String action(String object, Map allWrappers) { + try { + Thread.sleep(sleepTime); + } catch (InterruptedException e) { + e.printStackTrace(); + } + return "result = " + SystemClock.now() + "---param = " + object + " from 3"; + } + + + @Override + public String defaultValue() { + return "worker3--default"; + } + + @Override + public void begin() { + //System.out.println(Thread.currentThread().getName() + "- start --" + System.currentTimeMillis()); + } + + @Override + public void result(boolean success, String param, WorkResult workResult) { + if (success) { + System.out.println("callback worker3 success--" + SystemClock.now() + "----" + workResult.getResult() + + "-threadName:" +Thread.currentThread().getName()); + } else { + System.err.println("callback worker3 failure--" + SystemClock.now() + "----" + workResult.getResult() + + "-threadName:" +Thread.currentThread().getName()); + } + } + +} diff --git a/src/test/java/parallel/ParWorker7.java b/src/test/java/parallel/ParWorker7.java new file mode 100755 index 0000000..2157995 --- /dev/null +++ b/src/test/java/parallel/ParWorker7.java @@ -0,0 +1,53 @@ +package parallel; + + +import com.jd.platform.async.callback.ICallback; +import com.jd.platform.async.callback.IWorker; +import com.jd.platform.async.executor.timer.SystemClock; +import com.jd.platform.async.worker.WorkResult; +import com.jd.platform.async.wrapper.WorkerWrapper; + +import java.util.Map; + +/** + * @author wuweifeng wrote on 2019-11-20. + */ +public class ParWorker7 implements IWorker, ICallback { + private long sleepTime = 1000; + + public void setSleepTime(long sleepTime) { + this.sleepTime = sleepTime; + } + @Override + public String action(String object, Map allWrappers) { + try { + Thread.sleep(sleepTime); + } catch (InterruptedException e) { + e.printStackTrace(); + } + return "result = " + SystemClock.now() + "---param = " + object + " from 3"; + } + + + @Override + public String defaultValue() { + return "worker3--default"; + } + + @Override + public void begin() { + //System.out.println(Thread.currentThread().getName() + "- start --" + System.currentTimeMillis()); + } + + @Override + public void result(boolean success, String param, WorkResult workResult) { + if (success) { + System.out.println("callback worker3 success--" + SystemClock.now() + "----" + workResult.getResult() + + "-threadName:" +Thread.currentThread().getName()); + } else { + System.err.println("callback worker3 failure--" + SystemClock.now() + "----" + workResult.getResult() + + "-threadName:" +Thread.currentThread().getName()); + } + } + +} -- Gitee From f4721761e5ade390597141233f0f32b8bd6d379e Mon Sep 17 00:00:00 2001 From: kyle <573984425@qq.com> Date: Fri, 11 Mar 2022 09:55:47 +0800 Subject: [PATCH 3/7] =?UTF-8?q?refactor:=20=E8=B0=83=E6=95=B4=E4=BB=A3?= =?UTF-8?q?=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../java/beforev14/parallel/ParWorker5.java | 13 +- .../java/beforev14/parallel/ParWorker6.java | 12 +- .../java/beforev14/parallel/ParWorker7.java | 12 +- .../src/test/java/v15/cases/Case1.java | 8 +- .../com/jd/platform/async/executor/Async.java | 157 ------------------ 5 files changed, 26 insertions(+), 176 deletions(-) delete mode 100644 src/main/java/com/jd/platform/async/executor/Async.java diff --git a/asyncTool-core/src/test/java/beforev14/parallel/ParWorker5.java b/asyncTool-core/src/test/java/beforev14/parallel/ParWorker5.java index 262e862..1a2670e 100755 --- a/asyncTool-core/src/test/java/beforev14/parallel/ParWorker5.java +++ b/asyncTool-core/src/test/java/beforev14/parallel/ParWorker5.java @@ -1,4 +1,4 @@ -package parallel; +package beforev14.parallel; import com.jd.platform.async.callback.ICallback; @@ -13,13 +13,15 @@ import java.util.Map; * @author wuweifeng wrote on 2019-11-20. */ public class ParWorker5 implements IWorker, ICallback { + private long sleepTime = 1000; public void setSleepTime(long sleepTime) { this.sleepTime = sleepTime; } + @Override - public String action(String object, Map allWrappers) { + public String action(String object, Map> allWrappers) { try { Thread.sleep(sleepTime); } catch (InterruptedException e) { @@ -28,7 +30,6 @@ public class ParWorker5 implements IWorker, ICallback, ICallback workResult) { if (success) { System.out.println("callback worker3 success--" + SystemClock.now() + "----" + workResult.getResult() - + "-threadName:" +Thread.currentThread().getName()); + + "-threadName:" + Thread.currentThread().getName()); } else { - System.err.println("callback worker3 failure--" + SystemClock.now() + "----" + workResult.getResult() - + "-threadName:" +Thread.currentThread().getName()); + System.err.println("callback worker3 failure--" + SystemClock.now() + "----" + workResult.getResult() + + "-threadName:" + Thread.currentThread().getName()); } } diff --git a/asyncTool-core/src/test/java/beforev14/parallel/ParWorker6.java b/asyncTool-core/src/test/java/beforev14/parallel/ParWorker6.java index 4aa6c71..8da8e2f 100755 --- a/asyncTool-core/src/test/java/beforev14/parallel/ParWorker6.java +++ b/asyncTool-core/src/test/java/beforev14/parallel/ParWorker6.java @@ -1,4 +1,4 @@ -package parallel; +package beforev14.parallel; import com.jd.platform.async.callback.ICallback; @@ -13,13 +13,15 @@ import java.util.Map; * @author wuweifeng wrote on 2019-11-20. */ public class ParWorker6 implements IWorker, ICallback { + private long sleepTime = 1000; public void setSleepTime(long sleepTime) { this.sleepTime = sleepTime; } + @Override - public String action(String object, Map allWrappers) { + public String action(String object, Map> allWrappers) { try { Thread.sleep(sleepTime); } catch (InterruptedException e) { @@ -43,10 +45,10 @@ public class ParWorker6 implements IWorker, ICallback workResult) { if (success) { System.out.println("callback worker3 success--" + SystemClock.now() + "----" + workResult.getResult() - + "-threadName:" +Thread.currentThread().getName()); + + "-threadName:" + Thread.currentThread().getName()); } else { - System.err.println("callback worker3 failure--" + SystemClock.now() + "----" + workResult.getResult() - + "-threadName:" +Thread.currentThread().getName()); + System.err.println("callback worker3 failure--" + SystemClock.now() + "----" + workResult.getResult() + + "-threadName:" + Thread.currentThread().getName()); } } diff --git a/asyncTool-core/src/test/java/beforev14/parallel/ParWorker7.java b/asyncTool-core/src/test/java/beforev14/parallel/ParWorker7.java index 2157995..1d3810d 100755 --- a/asyncTool-core/src/test/java/beforev14/parallel/ParWorker7.java +++ b/asyncTool-core/src/test/java/beforev14/parallel/ParWorker7.java @@ -1,4 +1,4 @@ -package parallel; +package beforev14.parallel; import com.jd.platform.async.callback.ICallback; @@ -13,13 +13,15 @@ import java.util.Map; * @author wuweifeng wrote on 2019-11-20. */ public class ParWorker7 implements IWorker, ICallback { + private long sleepTime = 1000; public void setSleepTime(long sleepTime) { this.sleepTime = sleepTime; } + @Override - public String action(String object, Map allWrappers) { + public String action(String object, Map> allWrappers) { try { Thread.sleep(sleepTime); } catch (InterruptedException e) { @@ -43,10 +45,10 @@ public class ParWorker7 implements IWorker, ICallback workResult) { if (success) { System.out.println("callback worker3 success--" + SystemClock.now() + "----" + workResult.getResult() - + "-threadName:" +Thread.currentThread().getName()); + + "-threadName:" + Thread.currentThread().getName()); } else { - System.err.println("callback worker3 failure--" + SystemClock.now() + "----" + workResult.getResult() - + "-threadName:" +Thread.currentThread().getName()); + System.err.println("callback worker3 failure--" + SystemClock.now() + "----" + workResult.getResult() + + "-threadName:" + Thread.currentThread().getName()); } } diff --git a/asyncTool-core/src/test/java/v15/cases/Case1.java b/asyncTool-core/src/test/java/v15/cases/Case1.java index c0a1b74..d8126f7 100644 --- a/asyncTool-core/src/test/java/v15/cases/Case1.java +++ b/asyncTool-core/src/test/java/v15/cases/Case1.java @@ -4,21 +4,22 @@ import com.jd.platform.async.executor.Async; import com.jd.platform.async.wrapper.WorkerWrapper; import com.jd.platform.async.wrapper.WorkerWrapperBuilder; -import java.util.concurrent.ExecutionException; - /** * 示例:简单示例--复杂点的 * * @author create by TcSnZh on 2021/5/8-下午10:29 */ class Case1 { + private static WorkerWrapperBuilder builder(String id) { return WorkerWrapper.builder() .id(id) .worker((param, allWrappers) -> { System.out.println("wrapper(id=" + id + ") is working"); try { - Thread.sleep(50); + if (!"F".equals(id)) { + Thread.sleep(50); + } } catch (InterruptedException e) { e.printStackTrace(); } @@ -58,5 +59,6 @@ class Case1 { wrapper(id=H) is working */ } + } diff --git a/src/main/java/com/jd/platform/async/executor/Async.java b/src/main/java/com/jd/platform/async/executor/Async.java deleted file mode 100644 index f88d50b..0000000 --- a/src/main/java/com/jd/platform/async/executor/Async.java +++ /dev/null @@ -1,157 +0,0 @@ -package com.jd.platform.async.executor; - - -import com.jd.platform.async.callback.DefaultGroupCallback; -import com.jd.platform.async.callback.IGroupCallback; -import com.jd.platform.async.wrapper.WorkerWrapper; - -import java.util.*; -import java.util.concurrent.*; -import java.util.stream.Collectors; - -/** - * 类入口,可以根据自己情况调整core线程的数量 - * @author wuweifeng wrote on 2019-12-18 - * @version 1.0 - */ -public class Async { - /** - * 默认不定长线程池 - */ - private static final ThreadPoolExecutor COMMON_POOL = (ThreadPoolExecutor) Executors.newCachedThreadPool(); - /** - * 注意,这里是个static,也就是只能有一个线程池。用户自定义线程池时,也只能定义一个 - */ - private static ExecutorService executorService; - - /** - * 出发点 - */ - public static boolean beginWork(long timeout, ExecutorService executorService, List workerWrappers) throws ExecutionException, InterruptedException { - if(workerWrappers == null || workerWrappers.size() == 0) { - return false; - } - //保存线程池变量 - Async.executorService = executorService; - //定义一个map,存放所有的wrapper,key为wrapper的唯一id,value是该wrapper,可以从value中获取wrapper的result - Map forParamUseWrappers = new ConcurrentHashMap<>(); - CompletableFuture[] futures = new CompletableFuture[workerWrappers.size()]; - for (int i = 0; i < workerWrappers.size(); i++) { - WorkerWrapper wrapper = workerWrappers.get(i); - futures[i] = CompletableFuture.runAsync(() -> wrapper.work(executorService, timeout, forParamUseWrappers), executorService); - } - try { - CompletableFuture.allOf(futures).get(timeout, TimeUnit.MILLISECONDS); - return true; - } catch (TimeoutException e) { - Set set = new HashSet<>(); - totalWorkers(workerWrappers, set); - for (WorkerWrapper wrapper : set) { - wrapper.stopNow(); - } - return false; - } - } - - /** - * 如果想自定义线程池,请传pool。不自定义的话,就走默认的COMMON_POOL - */ - public static boolean beginWork(long timeout, ExecutorService executorService, WorkerWrapper... workerWrapper) throws ExecutionException, InterruptedException { - if(workerWrapper == null || workerWrapper.length == 0) { - return false; - } - List workerWrappers = Arrays.stream(workerWrapper).collect(Collectors.toList()); - return beginWork(timeout, executorService, workerWrappers); - } - - /** - * 同步阻塞,直到所有都完成,或失败 - */ - public static boolean beginWork(long timeout, WorkerWrapper... workerWrapper) throws ExecutionException, InterruptedException { - return beginWork(timeout, COMMON_POOL, workerWrapper); - } - - public static void beginWorkAsync(long timeout, IGroupCallback groupCallback, WorkerWrapper... workerWrapper) { - beginWorkAsync(timeout, COMMON_POOL, groupCallback, workerWrapper); - } - - /** - * 异步执行,直到所有都完成,或失败后,发起回调 - */ - public static void beginWorkAsync(long timeout, ExecutorService executorService, IGroupCallback groupCallback, WorkerWrapper... workerWrapper) { - if (groupCallback == null) { - groupCallback = new DefaultGroupCallback(); - } - IGroupCallback finalGroupCallback = groupCallback; - if (executorService != null) { - executorService.submit(() -> { - try { - boolean success = beginWork(timeout, executorService, workerWrapper); - if (success) { - finalGroupCallback.success(Arrays.asList(workerWrapper)); - } else { - finalGroupCallback.failure(Arrays.asList(workerWrapper), new TimeoutException()); - } - } catch (ExecutionException | InterruptedException e) { - e.printStackTrace(); - finalGroupCallback.failure(Arrays.asList(workerWrapper), e); - } - }); - } else { - COMMON_POOL.submit(() -> { - try { - boolean success = beginWork(timeout, COMMON_POOL, workerWrapper); - if (success) { - finalGroupCallback.success(Arrays.asList(workerWrapper)); - } else { - finalGroupCallback.failure(Arrays.asList(workerWrapper), new TimeoutException()); - } - } catch (ExecutionException | InterruptedException e) { - e.printStackTrace(); - finalGroupCallback.failure(Arrays.asList(workerWrapper), e); - } - }); - } - - } - - /** - * 总共多少个执行单元 - */ - @SuppressWarnings("unchecked") - private static void totalWorkers(List workerWrappers, Set set) { - set.addAll(workerWrappers); - for (WorkerWrapper wrapper : workerWrappers) { - if (wrapper.getNextWrappers() == null) { - continue; - } - List wrappers = wrapper.getNextWrappers(); - totalWorkers(wrappers, set); - } - - } - - /** - * 关闭线程池 - */ - public static void shutDown() { - shutDown(executorService); - } - - /** - * 关闭线程池 - */ - public static void shutDown(ExecutorService executorService) { - if (executorService != null) { - executorService.shutdown(); - } else { - COMMON_POOL.shutdown(); - } - } - - public static String getThreadCount() { - return "activeCount=" + COMMON_POOL.getActiveCount() + - " completedCount " + COMMON_POOL.getCompletedTaskCount() + - " largestCount " + COMMON_POOL.getLargestPoolSize(); - } -} -- Gitee From 3b5a50c5bb8a919bafc7323cb35e8a4bff164a44 Mon Sep 17 00:00:00 2001 From: kyle <573984425@qq.com> Date: Sat, 12 Mar 2022 17:08:00 +0800 Subject: [PATCH 4/7] =?UTF-8?q?docs:=20ALL=5FDEPENDENCIES=5FALL=5FSUCCESS?= =?UTF-8?q?=E7=AE=97=E6=B3=95=E4=B8=8A=E6=9C=89=E7=BC=BA=E9=99=B7?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- QuickStart.md | 2 +- .../com/jd/platform/async/wrapper/WorkerWrapper.java | 1 + .../wrapper/strategy/depend/DependenceStrategy.java | 9 +++++++++ asyncTool-core/src/test/java/v15/cases/Case1.java | 6 +++--- 4 files changed, 14 insertions(+), 4 deletions(-) diff --git a/QuickStart.md b/QuickStart.md index 9f997ca..cd2bd34 100644 --- a/QuickStart.md +++ b/QuickStart.md @@ -1,4 +1,4 @@ -如果只是需要用这个框架,请往下看即可。如果需要深入了解这个框架是如何一步一步实现的,从接到需求,到每一步的思考,每个类为什么这么设计,为什么有这些方法,也就是如何从0到1开发出这个框架,作者在[csdn开了专栏](https://blog.csdn.net/tianyaleixiaowu/category_9637010.html)专门讲中间件如何从0开发,包括并不限于这个小框架。京东内部同事可在cf上搜索erp也能看到。 +如果只是需要用这个框架,请往下看即可。如果需要深入了解这个框架是如何一步一步实现的,从接到需求,到每一步的思考,每个类为什么这么设计,为什么有这些方法,也就是如何从0到1开发出这个框架,作者在[csdn开了专栏](https://blog.csdn.net/tianyaleixiaowu/category_9637010.html) 专门讲中间件如何从0开发,包括并不限于这个小框架。京东内部同事可在cf上搜索erp也能看到。 # 安装教程 diff --git a/asyncTool-core/src/main/java/com/jd/platform/async/wrapper/WorkerWrapper.java b/asyncTool-core/src/main/java/com/jd/platform/async/wrapper/WorkerWrapper.java index 56f83b6..1d09816 100755 --- a/asyncTool-core/src/main/java/com/jd/platform/async/wrapper/WorkerWrapper.java +++ b/asyncTool-core/src/main/java/com/jd/platform/async/wrapper/WorkerWrapper.java @@ -362,6 +362,7 @@ public abstract class WorkerWrapper { wrapperStrategy.judgeAction(getDependWrappers(), this, fromWrapper); switch (judge.getDependenceAction()) { case TAKE_REST: + System.out.println("TAKE_REST\t"+id+"\t"+fromWrapper.id); return; case FAST_FAIL: if (setState(state, STARTED, ERROR)) { diff --git a/asyncTool-core/src/main/java/com/jd/platform/async/wrapper/strategy/depend/DependenceStrategy.java b/asyncTool-core/src/main/java/com/jd/platform/async/wrapper/strategy/depend/DependenceStrategy.java index a075369..0b2c3df 100644 --- a/asyncTool-core/src/main/java/com/jd/platform/async/wrapper/strategy/depend/DependenceStrategy.java +++ b/asyncTool-core/src/main/java/com/jd/platform/async/wrapper/strategy/depend/DependenceStrategy.java @@ -80,6 +80,14 @@ public interface DependenceStrategy { * 被依赖的所有Wrapper都必须成功才能开始工作。 * 如果其中任一Wrapper还没有执行且不存在失败,则休息。 * 如果其中任一Wrapper失败则立即失败。 + * + * FIXME + * 这里有个问题, + * 假设任务A依赖B、C + * + * B执行时间比较长,A-B的线程和A-C的线程都检测到B的res==null(DEFAULT), + * 那么线程A就真的去休眠(TAKE_REST)而没有发起, + * 导致整个任务长时间无法结束 */ DependenceStrategy ALL_DEPENDENCIES_ALL_SUCCESS = new DependenceStrategy() { @Override @@ -103,6 +111,7 @@ public interface DependenceStrategy { } } if (hasWaiting) { + System.out.println(Thread.currentThread().getName()+"\thasWaiting\t"+thisWrapper.getId()+"\t"+fromWrapper.getId()); return DependenceAction.TAKE_REST.emptyProperty(); } return DependenceAction.START_WORK.emptyProperty(); diff --git a/asyncTool-core/src/test/java/v15/cases/Case1.java b/asyncTool-core/src/test/java/v15/cases/Case1.java index d8126f7..44547e3 100644 --- a/asyncTool-core/src/test/java/v15/cases/Case1.java +++ b/asyncTool-core/src/test/java/v15/cases/Case1.java @@ -17,13 +17,13 @@ class Case1 { .worker((param, allWrappers) -> { System.out.println("wrapper(id=" + id + ") is working"); try { - if (!"F".equals(id)) { + if ("F".equals(id)) { Thread.sleep(50); } } catch (InterruptedException e) { e.printStackTrace(); } - return null; + return id; }); } @@ -44,7 +44,7 @@ class Case1 { ) .build(); try { - Async.work(1000, a, d).awaitFinish(); + Async.work(10000, a, d).awaitFinish(); } catch (InterruptedException e) { e.printStackTrace(); } -- Gitee From db3a162d1a1a5c911a9cee6b2ff3a28e5eda196d Mon Sep 17 00:00:00 2001 From: kyle <573984425@qq.com> Date: Fri, 18 Mar 2022 16:20:45 +0800 Subject: [PATCH 5/7] =?UTF-8?q?fix=EF=BC=9A=E6=9C=80=E5=90=8E=E4=B8=80?= =?UTF-8?q?=E4=B8=AA=E4=BB=BB=E5=8A=A1=E4=B8=8D=E8=83=BD=E6=89=A7=E8=A1=8C?= =?UTF-8?q?=E7=9A=84=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../com/jd/platform/async/wrapper/WorkerWrapper.java | 4 +++- .../wrapper/strategy/depend/DependenceStrategy.java | 2 -- asyncTool-core/src/test/java/v15/cases/Case1.java | 9 ++++++--- .../platform/async/openutil/timer/HashedWheelTimer.java | 1 + 4 files changed, 10 insertions(+), 6 deletions(-) diff --git a/asyncTool-core/src/main/java/com/jd/platform/async/wrapper/WorkerWrapper.java b/asyncTool-core/src/main/java/com/jd/platform/async/wrapper/WorkerWrapper.java index 1d09816..b4d92c7 100755 --- a/asyncTool-core/src/main/java/com/jd/platform/async/wrapper/WorkerWrapper.java +++ b/asyncTool-core/src/main/java/com/jd/platform/async/wrapper/WorkerWrapper.java @@ -362,7 +362,9 @@ public abstract class WorkerWrapper { wrapperStrategy.judgeAction(getDependWrappers(), this, fromWrapper); switch (judge.getDependenceAction()) { case TAKE_REST: - System.out.println("TAKE_REST\t"+id+"\t"+fromWrapper.id); + //FIXME 等待200毫秒重新投入线程池,主要为了调起最后一个任务 + Thread.sleep(200L); + executorService.submit(() -> this.work(executorService, fromWrapper, remainTime, group)); return; case FAST_FAIL: if (setState(state, STARTED, ERROR)) { diff --git a/asyncTool-core/src/main/java/com/jd/platform/async/wrapper/strategy/depend/DependenceStrategy.java b/asyncTool-core/src/main/java/com/jd/platform/async/wrapper/strategy/depend/DependenceStrategy.java index 0b2c3df..e45aedc 100644 --- a/asyncTool-core/src/main/java/com/jd/platform/async/wrapper/strategy/depend/DependenceStrategy.java +++ b/asyncTool-core/src/main/java/com/jd/platform/async/wrapper/strategy/depend/DependenceStrategy.java @@ -1,6 +1,5 @@ package com.jd.platform.async.wrapper.strategy.depend; -import com.jd.platform.async.exception.SkippedException; import com.jd.platform.async.worker.ResultState; import com.jd.platform.async.worker.WorkResult; import com.jd.platform.async.wrapper.WorkerWrapper; @@ -111,7 +110,6 @@ public interface DependenceStrategy { } } if (hasWaiting) { - System.out.println(Thread.currentThread().getName()+"\thasWaiting\t"+thisWrapper.getId()+"\t"+fromWrapper.getId()); return DependenceAction.TAKE_REST.emptyProperty(); } return DependenceAction.START_WORK.emptyProperty(); diff --git a/asyncTool-core/src/test/java/v15/cases/Case1.java b/asyncTool-core/src/test/java/v15/cases/Case1.java index 44547e3..394353d 100644 --- a/asyncTool-core/src/test/java/v15/cases/Case1.java +++ b/asyncTool-core/src/test/java/v15/cases/Case1.java @@ -15,14 +15,17 @@ class Case1 { return WorkerWrapper.builder() .id(id) .worker((param, allWrappers) -> { - System.out.println("wrapper(id=" + id + ") is working"); try { if ("F".equals(id)) { - Thread.sleep(50); + System.out.println("wrapper(id=" + id + ") is working"); + Thread.sleep(5000); } } catch (InterruptedException e) { e.printStackTrace(); } + if ("F".equals(id)) { + System.out.println("wrapper(id=" + id + ") is worki444ng"); + } return id; }); } @@ -44,7 +47,7 @@ class Case1 { ) .build(); try { - Async.work(10000, a, d).awaitFinish(); + Async.work(1000, a, d).awaitFinish(); } catch (InterruptedException e) { e.printStackTrace(); } diff --git a/asyncTool-openutil/src/main/java/com/jd/platform/async/openutil/timer/HashedWheelTimer.java b/asyncTool-openutil/src/main/java/com/jd/platform/async/openutil/timer/HashedWheelTimer.java index 90df6e9..5492b72 100644 --- a/asyncTool-openutil/src/main/java/com/jd/platform/async/openutil/timer/HashedWheelTimer.java +++ b/asyncTool-openutil/src/main/java/com/jd/platform/async/openutil/timer/HashedWheelTimer.java @@ -315,6 +315,7 @@ public class HashedWheelTimer extends AbstractWheelTimer { startTimeInitialized.countDown(); do { + //TODO 时间轮这里结束不了任务 final long deadline = waitForNextTick(); if (deadline > 0) { int idx = (int) (tick & mask); -- Gitee From bf72136ab9e74292d72044269593e4935d583dd4 Mon Sep 17 00:00:00 2001 From: kyle <573984425@qq.com> Date: Sat, 19 Mar 2022 10:22:03 +0800 Subject: [PATCH 6/7] =?UTF-8?q?refactor:=20=E8=B0=83=E6=95=B4=E4=BB=A3?= =?UTF-8?q?=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- asyncTool-core/src/test/java/v15/cases/Case1.java | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/asyncTool-core/src/test/java/v15/cases/Case1.java b/asyncTool-core/src/test/java/v15/cases/Case1.java index 394353d..d917c6d 100644 --- a/asyncTool-core/src/test/java/v15/cases/Case1.java +++ b/asyncTool-core/src/test/java/v15/cases/Case1.java @@ -19,13 +19,12 @@ class Case1 { if ("F".equals(id)) { System.out.println("wrapper(id=" + id + ") is working"); Thread.sleep(5000); + }else { + System.out.println("wrapper(id=" + id + ") is worki444ng"); } } catch (InterruptedException e) { e.printStackTrace(); } - if ("F".equals(id)) { - System.out.println("wrapper(id=" + id + ") is worki444ng"); - } return id; }); } -- Gitee From 3b357d7140ee7e877d8b5b2bbae13cd604fe93dd Mon Sep 17 00:00:00 2001 From: kyle <573984425@qq.com> Date: Sat, 19 Mar 2022 10:46:53 +0800 Subject: [PATCH 7/7] =?UTF-8?q?fix:=20=E8=B0=83=E6=95=B4=E4=BB=A3=E7=A0=81?= =?UTF-8?q?=EF=BC=8C=E5=89=A9=E4=BD=99=E6=97=B6=E9=97=B4=E8=A6=81=E9=87=8D?= =?UTF-8?q?=E6=96=B0=E8=AE=A1=E7=AE=97?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../com/jd/platform/async/wrapper/WorkerWrapper.java | 2 +- asyncTool-core/src/test/java/v15/cases/Case1.java | 9 ++++++--- 2 files changed, 7 insertions(+), 4 deletions(-) diff --git a/asyncTool-core/src/main/java/com/jd/platform/async/wrapper/WorkerWrapper.java b/asyncTool-core/src/main/java/com/jd/platform/async/wrapper/WorkerWrapper.java index b4d92c7..42785b5 100755 --- a/asyncTool-core/src/main/java/com/jd/platform/async/wrapper/WorkerWrapper.java +++ b/asyncTool-core/src/main/java/com/jd/platform/async/wrapper/WorkerWrapper.java @@ -364,7 +364,7 @@ public abstract class WorkerWrapper { case TAKE_REST: //FIXME 等待200毫秒重新投入线程池,主要为了调起最后一个任务 Thread.sleep(200L); - executorService.submit(() -> this.work(executorService, fromWrapper, remainTime, group)); + executorService.submit(() -> this.work(executorService, fromWrapper, remainTime-(SystemClock.now()-now), group)); return; case FAST_FAIL: if (setState(state, STARTED, ERROR)) { diff --git a/asyncTool-core/src/test/java/v15/cases/Case1.java b/asyncTool-core/src/test/java/v15/cases/Case1.java index d917c6d..29059cb 100644 --- a/asyncTool-core/src/test/java/v15/cases/Case1.java +++ b/asyncTool-core/src/test/java/v15/cases/Case1.java @@ -1,6 +1,7 @@ package v15.cases; import com.jd.platform.async.executor.Async; +import com.jd.platform.async.executor.timer.SystemClock; import com.jd.platform.async.wrapper.WorkerWrapper; import com.jd.platform.async.wrapper.WorkerWrapperBuilder; @@ -18,8 +19,8 @@ class Case1 { try { if ("F".equals(id)) { System.out.println("wrapper(id=" + id + ") is working"); - Thread.sleep(5000); - }else { + Thread.sleep(12000); + } else { System.out.println("wrapper(id=" + id + ") is worki444ng"); } } catch (InterruptedException e) { @@ -30,6 +31,7 @@ class Case1 { } public static void main(String[] args) { + long now = SystemClock.now(); WorkerWrapper a = builder("A").build(); WorkerWrapper d; builder("H") @@ -46,10 +48,11 @@ class Case1 { ) .build(); try { - Async.work(1000, a, d).awaitFinish(); + Async.work(10000, a, d).awaitFinish(); } catch (InterruptedException e) { e.printStackTrace(); } + System.out.println("now:" + (SystemClock.now() - now)); /* 输出: wrapper(id=D) is working wrapper(id=A) is working -- Gitee