diff --git a/asyncTool-core/src/main/java/com/jd/platform/async/wrapper/WorkerWrapper.java b/asyncTool-core/src/main/java/com/jd/platform/async/wrapper/WorkerWrapper.java index bc07acc55e821a115350fde729fd423105158277..db522b5069cee1bf144ec06c80343e48175e8f0c 100755 --- a/asyncTool-core/src/main/java/com/jd/platform/async/wrapper/WorkerWrapper.java +++ b/asyncTool-core/src/main/java/com/jd/platform/async/wrapper/WorkerWrapper.java @@ -20,6 +20,7 @@ import com.jd.platform.async.wrapper.strategy.skip.SkipStrategy; import java.util.*; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiConsumer; @@ -264,16 +265,16 @@ public abstract class WorkerWrapper { } } }; - final Runnable __function__callbackResultOfFalse_beginNext = - () -> { - __function__callbackResult.accept(false); + final Consumer __function__callbackResultOfFalse_beginNext = + (success) -> { + __function__callbackResult.accept(success); beginNext(executorService, now, remainTime, group); }; final BiConsumer __function__fastFail_callbackResult$false_beginNext = (fastFail_isTimeout, fastFail_exception) -> { boolean isEndsNormally = fastFail_exception instanceof EndsNormallyException; fastFail(fastFail_isTimeout && !isEndsNormally, fastFail_exception, isEndsNormally); - __function__callbackResultOfFalse_beginNext.run(); + __function__callbackResultOfFalse_beginNext.accept(false); }; final Runnable __function__doWork = () -> { @@ -281,8 +282,12 @@ public abstract class WorkerWrapper { try { if (fire(group)) { if (setState(state, WORKING, AFTER_WORK)) { - __function__callbackResult.accept(true); - beginNext(executorService, now, remainTime, group); + __function__callbackResultOfFalse_beginNext.accept(true); + } + }else { + //如果任务超时,需要将最后那个超时任务设置为超时异常结束的 + if (setState(state, WORKING, ERROR)) { + __function__fastFail_callbackResult$false_beginNext.accept(true, new TimeoutException()); } } } catch (Exception e) { @@ -352,7 +357,6 @@ public abstract class WorkerWrapper { case TAKE_REST: //FIXME 等待200毫秒重新投入线程池,主要为了调起最后一个任务 Thread.sleep(200L); - System.out.println(id+"进入休息"); executorService.submit(() -> this.work(executorService, fromWrapper, remainTime - (SystemClock.now() - now), group)); return; diff --git a/asyncTool-core/src/test/java/v15/cases/Case1.java b/asyncTool-core/src/test/java/v15/cases/Case1.java index 5711ca14c8085fe4dbacb626c4e70694f79a2563..d737d4e44923287d8370b11ddb354dcff9a96806 100644 --- a/asyncTool-core/src/test/java/v15/cases/Case1.java +++ b/asyncTool-core/src/test/java/v15/cases/Case1.java @@ -21,7 +21,7 @@ class Case1 { try { if ("F".equals(id)) { System.out.println("wrapper(id=" + id + ") is working"); - Thread.sleep(100); + Thread.sleep(2000); } else { System.out.println("wrapper(id=" + id + ") is worki444ng"); } @@ -63,7 +63,7 @@ class Case1 { ) .build(); try { - Async.work(5000, a, d).awaitFinish(); + Async.work(1000, a, d).awaitFinish(); } catch (InterruptedException e) { e.printStackTrace(); } diff --git a/asyncTool-openutil/src/main/java/com/jd/platform/async/openutil/timer/HashedWheelTimer.java b/asyncTool-openutil/src/main/java/com/jd/platform/async/openutil/timer/HashedWheelTimer.java index 5492b726d3a9d155a741125e9ccc9f4c00a9189e..fbb6d1dc311719acb2d8579122207497b8dbe7f1 100644 --- a/asyncTool-openutil/src/main/java/com/jd/platform/async/openutil/timer/HashedWheelTimer.java +++ b/asyncTool-openutil/src/main/java/com/jd/platform/async/openutil/timer/HashedWheelTimer.java @@ -315,7 +315,7 @@ public class HashedWheelTimer extends AbstractWheelTimer { startTimeInitialized.countDown(); do { - //TODO 时间轮这里结束不了任务 + //TODO 时间轮这里一直执行,结束不了任务 final long deadline = waitForNextTick(); if (deadline > 0) { int idx = (int) (tick & mask);