From 105aebb7e0834b2bfaf94653b4a6b8483efb2349 Mon Sep 17 00:00:00 2001 From: TcSnZh Date: Sun, 6 Jun 2021 01:33:22 +0800 Subject: [PATCH] =?UTF-8?q?1.=E4=BF=AE=E5=A4=8D=E4=BA=86OnceWork.AsFuture.?= =?UTF-8?q?toString()=E6=96=B9=E6=B3=95=E5=BE=AA=E7=8E=AF=E8=B0=83?= =?UTF-8?q?=E7=94=A8=E8=87=AA=E8=BA=AB=E9=97=AE=E9=A2=98=E3=80=82=202.?= =?UTF-8?q?=E6=9B=B4=E6=96=B0=E4=BA=86=E6=96=87=E6=A1=A3=E6=B3=A8=E9=87=8A?= =?UTF-8?q?=E3=80=81=E4=B8=BA=E4=B8=80=E4=BA=9B=E5=BA=9F=E5=BC=83=E7=9A=84?= =?UTF-8?q?=E7=B1=BB=E3=80=81=E5=AD=97=E6=AE=B5=E3=80=81=E6=96=B9=E6=B3=95?= =?UTF-8?q?=E6=B7=BB=E5=8A=A0=E4=BA=86@Deprecated=E6=B3=A8=E8=A7=A3=203.?= =?UTF-8?q?=E5=B0=86=E6=AD=A3=E5=B8=B8=E7=BB=93=E6=9D=9F=EF=BC=88=E5=8F=96?= =?UTF-8?q?=E6=B6=88=E3=80=81=E8=B7=B3=E8=BF=87=EF=BC=89=E7=9A=84=E5=BC=82?= =?UTF-8?q?=E5=B8=B8=E6=8A=BD=E5=8F=96=E4=B8=BA=E6=8A=BD=E8=B1=A1=E7=B1=BB?= =?UTF-8?q?=EF=BC=8C=E5=B9=B6=E4=BF=AE=E6=94=B9=E4=BA=86=E4=B9=8B=E5=89=8D?= =?UTF-8?q?=E5=BC=82=E5=B8=B8=E5=8C=85=E7=9A=84=E4=B8=8D=E5=90=88=E7=90=86?= =?UTF-8?q?=E7=9A=84=E7=BB=A7=E6=89=BF=E9=80=BB=E8=BE=91=E3=80=82=204.?= =?UTF-8?q?=E4=BC=98=E5=8C=96=E4=BA=86=E5=85=B6=E4=BB=96=E7=9A=84=E4=B8=80?= =?UTF-8?q?=E4=BA=9B=E5=B0=8F=E7=BB=86=E8=8A=82=E3=80=82=E4=BE=8B=E5=A6=82?= =?UTF-8?q?=EF=BC=9A=E7=AD=96=E7=95=A5=E7=B1=BB=E7=9A=84toString()?= =?UTF-8?q?=E6=96=B9=E6=B3=95=E3=80=81=E9=BB=98=E8=AE=A4=E5=9B=9E=E8=B0=83?= =?UTF-8?q?=E4=BF=AE=E6=94=B9=E4=B8=BA=E5=8D=95=E4=BE=8B=E6=A8=A1=E5=BC=8F?= =?UTF-8?q?=E3=80=82?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../async/callback/DefaultCallback.java | 14 +- .../async/callback/DefaultGroupCallback.java | 7 +- .../jd/platform/async/callback/ICallback.java | 8 +- .../async/callback/IGroupCallback.java | 19 ++- .../async/callback/ITimeoutWorker.java | 8 ++ ...pedException.java => CancelException.java} | 10 +- .../exception/EndsNormallyException.java | 27 ++++ .../async/exception/SkippedException.java | 15 +-- .../com/jd/platform/async/executor/Async.java | 125 ++++++++++-------- .../jd/platform/async/worker/OnceWork.java | 20 ++- .../jd/platform/async/worker/WorkResult.java | 1 + .../wrapper/StableWorkerWrapperBuilder.java | 1 + .../platform/async/wrapper/WorkerWrapper.java | 31 +++-- .../async/wrapper/WorkerWrapperBuilder.java | 8 +- .../async/wrapper/WorkerWrapperGroup.java | 1 - .../wrapper/strategy/WrapperStrategy.java | 35 ++++- .../depend/DependMustStrategyMapper.java | 1 + .../DependOnUpWrapperStrategyMapper.java | 25 ++-- .../src/test/java/v15/cases/Case9.java | 4 +- 19 files changed, 227 insertions(+), 133 deletions(-) rename asyncTool-core/src/main/java/com/jd/platform/async/exception/{CancelSkippedException.java => CancelException.java} (44%) create mode 100644 asyncTool-core/src/main/java/com/jd/platform/async/exception/EndsNormallyException.java diff --git a/asyncTool-core/src/main/java/com/jd/platform/async/callback/DefaultCallback.java b/asyncTool-core/src/main/java/com/jd/platform/async/callback/DefaultCallback.java index e187359..2f65466 100755 --- a/asyncTool-core/src/main/java/com/jd/platform/async/callback/DefaultCallback.java +++ b/asyncTool-core/src/main/java/com/jd/platform/async/callback/DefaultCallback.java @@ -1,6 +1,5 @@ package com.jd.platform.async.callback; -import com.jd.platform.async.exception.SkippedException; import com.jd.platform.async.worker.WorkResult; /** @@ -9,9 +8,20 @@ import com.jd.platform.async.worker.WorkResult; * @author wuweifeng wrote on 2019-11-19. */ public class DefaultCallback implements ICallback { + private static final DefaultCallback instance = new DefaultCallback() { + @Override + public String toString() { + return "(DefaultCallback instance)"; + } + }; + + public static DefaultCallback getInstance() { + return instance; + } + @Override public void begin() { - + // do nothing } /** diff --git a/asyncTool-core/src/main/java/com/jd/platform/async/callback/DefaultGroupCallback.java b/asyncTool-core/src/main/java/com/jd/platform/async/callback/DefaultGroupCallback.java index b9485d3..a47baf3 100644 --- a/asyncTool-core/src/main/java/com/jd/platform/async/callback/DefaultGroupCallback.java +++ b/asyncTool-core/src/main/java/com/jd/platform/async/callback/DefaultGroupCallback.java @@ -7,15 +7,18 @@ import java.util.List; /** * @author wuweifeng wrote on 2019-12-27 * @version 1.0 + * @deprecated deprecate at version 1.5.1 , see {@link IGroupCallback} . */ +@SuppressWarnings("DeprecatedIsStillUsed") +@Deprecated public class DefaultGroupCallback implements IGroupCallback { @Override public void success(List workerWrappers) { - + // do nothing } @Override public void failure(List workerWrappers, Exception e) { - + // do nothing } } diff --git a/asyncTool-core/src/main/java/com/jd/platform/async/callback/ICallback.java b/asyncTool-core/src/main/java/com/jd/platform/async/callback/ICallback.java index fe0b505..4f56009 100755 --- a/asyncTool-core/src/main/java/com/jd/platform/async/callback/ICallback.java +++ b/asyncTool-core/src/main/java/com/jd/platform/async/callback/ICallback.java @@ -1,6 +1,7 @@ package com.jd.platform.async.callback; +import com.jd.platform.async.exception.EndsNormallyException; import com.jd.platform.async.exception.SkippedException; import com.jd.platform.async.worker.WorkResult; @@ -27,13 +28,16 @@ public interface ICallback { void result(boolean success, T param, WorkResult workResult); /** - * 提供常量选项:打印异常信息,跳过时的异常{@link SkippedException}不会打印。 + * 提供常量选项: + *

+ * 如果发生了异常,则打印异常信息。 + * 正常结束(例如取消、跳过)的异常{@link com.jd.platform.async.exception.EndsNormallyException}不会打印。 */ ICallback PRINT_EXCEPTION_STACK_TRACE = new ICallback() { @Override public void result(boolean success, Object param, WorkResult workResult) { Exception ex = workResult.getEx(); - if (ex != null && !(ex instanceof SkippedException)) { + if (ex != null && !(ex instanceof EndsNormallyException)) { ex.printStackTrace(); } } diff --git a/asyncTool-core/src/main/java/com/jd/platform/async/callback/IGroupCallback.java b/asyncTool-core/src/main/java/com/jd/platform/async/callback/IGroupCallback.java index 02880b4..4b4c81a 100755 --- a/asyncTool-core/src/main/java/com/jd/platform/async/callback/IGroupCallback.java +++ b/asyncTool-core/src/main/java/com/jd/platform/async/callback/IGroupCallback.java @@ -1,18 +1,35 @@ package com.jd.platform.async.callback; +import com.jd.platform.async.worker.OnceWork; import com.jd.platform.async.wrapper.WorkerWrapper; +import java.util.Collection; import java.util.List; +import java.util.concurrent.ExecutorService; /** - * 如果是异步执行整组的话,可以用这个组回调。不推荐使用 + * 如果是异步执行整组的话,可以用这个组回调。已经废弃 + * * @author wuweifeng wrote on 2019-11-19. + * @deprecated deprecate at version 1.5.1 + *

+ * please use {@link com.jd.platform.async.executor.Async#work(long, ExecutorService, Collection, String)}. + *

+ * 该方法返回的{@link OnceWork}句柄,默认不会同步等待结束, + * 这便替代了原先的 + * {@link com.jd.platform.async.executor.Async#beginWorkAsync(long, ExecutorService, IGroupCallback, WorkerWrapper[])} + *

+ * 需要同步等待的话调用{@link OnceWork#awaitFinish()}即可。 + *

*/ +@SuppressWarnings("DeprecatedIsStillUsed") +@Deprecated public interface IGroupCallback { /** * 成功后,可以从wrapper里去getWorkResult */ void success(List workerWrappers); + /** * 失败了,也可以从wrapper里去getWorkResult */ diff --git a/asyncTool-core/src/main/java/com/jd/platform/async/callback/ITimeoutWorker.java b/asyncTool-core/src/main/java/com/jd/platform/async/callback/ITimeoutWorker.java index be5e7ec..4706d17 100644 --- a/asyncTool-core/src/main/java/com/jd/platform/async/callback/ITimeoutWorker.java +++ b/asyncTool-core/src/main/java/com/jd/platform/async/callback/ITimeoutWorker.java @@ -2,11 +2,18 @@ package com.jd.platform.async.callback; /** * @author wuweifeng wrote on 2019-12-20 + * @author tcsnzh + * 远古时期的代码,估计也没人会使用。但我也不确定,因此标注废弃。 + *

+ * 难受的一比,为了屎山的兼容性要在代码里保留这么多屎盆。 * @version 1.0 + * @deprecated deprecated by version 1.5.1--SNAPSHOT */ +@Deprecated public interface ITimeoutWorker extends IWorker { /** * 每个worker都可以设置超时时间 + * * @return 毫秒超时时间 */ long timeOut(); @@ -14,6 +21,7 @@ public interface ITimeoutWorker extends IWorker { /** * 是否开启单个执行单元的超时功能(有时是一个group设置个超时,而不具备关心单个worker的超时) *

注意,如果开启了单个执行单元的超时检测,将使线程池数量多出一倍

+ * * @return 是否开启 */ boolean enableTimeOut(); diff --git a/asyncTool-core/src/main/java/com/jd/platform/async/exception/CancelSkippedException.java b/asyncTool-core/src/main/java/com/jd/platform/async/exception/CancelException.java similarity index 44% rename from asyncTool-core/src/main/java/com/jd/platform/async/exception/CancelSkippedException.java rename to asyncTool-core/src/main/java/com/jd/platform/async/exception/CancelException.java index 5532b74..7fc4b42 100644 --- a/asyncTool-core/src/main/java/com/jd/platform/async/exception/CancelSkippedException.java +++ b/asyncTool-core/src/main/java/com/jd/platform/async/exception/CancelException.java @@ -5,15 +5,11 @@ package com.jd.platform.async.exception; * * @author tcsnzh[zh.jobs@foxmail.com] create this in 2021/5/25-下午6:12 */ -public class CancelSkippedException extends SkippedException { - public CancelSkippedException() { +public class CancelException extends EndsNormallyException { + public CancelException() { } - public CancelSkippedException(String message) { + public CancelException(String message) { super(message); } - - public CancelSkippedException(String message, long skipAt) { - super(message, skipAt); - } } diff --git a/asyncTool-core/src/main/java/com/jd/platform/async/exception/EndsNormallyException.java b/asyncTool-core/src/main/java/com/jd/platform/async/exception/EndsNormallyException.java new file mode 100644 index 0000000..02d1d2d --- /dev/null +++ b/asyncTool-core/src/main/java/com/jd/platform/async/exception/EndsNormallyException.java @@ -0,0 +1,27 @@ +package com.jd.platform.async.exception; + +/** + * 该异常表示此任务单元是正常结束的。 + * + * @author tcsnzh[zh.jobs@foxmail.com] create this in 2021/6/5-下午11:57 + */ +public abstract class EndsNormallyException extends RuntimeException { + public EndsNormallyException() { + } + + public EndsNormallyException(String message) { + super(message); + } + + public EndsNormallyException(String message, Throwable cause) { + super(message, cause); + } + + public EndsNormallyException(Throwable cause) { + super(cause); + } + + public EndsNormallyException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) { + super(message, cause, enableSuppression, writableStackTrace); + } +} diff --git a/asyncTool-core/src/main/java/com/jd/platform/async/exception/SkippedException.java b/asyncTool-core/src/main/java/com/jd/platform/async/exception/SkippedException.java index 1c38e7a..c0ec602 100644 --- a/asyncTool-core/src/main/java/com/jd/platform/async/exception/SkippedException.java +++ b/asyncTool-core/src/main/java/com/jd/platform/async/exception/SkippedException.java @@ -1,30 +1,17 @@ package com.jd.platform.async.exception; -import com.jd.platform.async.executor.timer.SystemClock; - /** * 如果任务在执行之前,自己后面的任务已经执行完或正在被执行,则抛该exception * * @author wuweifeng wrote on 2020-02-18 * @version 1.0 */ -public class SkippedException extends RuntimeException { - private final long skipAt; - +public class SkippedException extends EndsNormallyException { public SkippedException() { this(null); } public SkippedException(String message) { - this(message, SystemClock.now()); - } - - public SkippedException(String message, long skipAt) { super(message); - this.skipAt = skipAt; - } - - public long getSkipAt() { - return skipAt; } } diff --git a/asyncTool-core/src/main/java/com/jd/platform/async/executor/Async.java b/asyncTool-core/src/main/java/com/jd/platform/async/executor/Async.java index c0c3d88..8948d8d 100644 --- a/asyncTool-core/src/main/java/com/jd/platform/async/executor/Async.java +++ b/asyncTool-core/src/main/java/com/jd/platform/async/executor/Async.java @@ -65,14 +65,13 @@ public class Async { } /** - * 核心方法。 - * 该方法不是同步阻塞执行的。如果想要同步阻塞执行,则调用返回值的{@link OnceWork#awaitFinish()}即可。 + * 核心方法。该方法不是同步阻塞执行的。如果想要同步阻塞执行,则调用返回值的{@link OnceWork#awaitFinish()}即可。 * * @param timeout 全组超时时间 * @param executorService 执行线程池 * @param workerWrappers 任务容器集合 * @param workId 本次工作id - * @return 返回 {@link OnceWork}封装对象。 + * @return 返回 {@link OnceWork}任务句柄对象。 */ public static OnceWork work(long timeout, ExecutorService executorService, @@ -95,59 +94,6 @@ public class Async { return onceWork; } - /** - * @deprecated 已经被 {@link #work(long, ExecutorService, Collection, String)}方法取代。 - */ - @SuppressWarnings("unused") - @Deprecated - public static void beginWorkAsync(long timeout, IGroupCallback groupCallback, WorkerWrapper... workerWrapper) { - beginWorkAsync(timeout, getCommonPool(), groupCallback, workerWrapper); - } - - /** - * 异步执行,直到所有都完成,或失败后,发起回调 - * - * @deprecated 已经被 {@link #work(long, ExecutorService, Collection, String)}方法取代。 - */ - @Deprecated - public static void beginWorkAsync(long timeout, ExecutorService executorService, IGroupCallback groupCallback, WorkerWrapper... workerWrapper) { - if (groupCallback == null) { - groupCallback = new DefaultGroupCallback(); - } - IGroupCallback finalGroupCallback = groupCallback; - if (executorService != null) { - executorService.submit(() -> { - try { - boolean success = beginWork(timeout, executorService, workerWrapper); - if (success) { - finalGroupCallback.success(Arrays.asList(workerWrapper)); - } else { - finalGroupCallback.failure(Arrays.asList(workerWrapper), new TimeoutException()); - } - } catch (ExecutionException | InterruptedException e) { - e.printStackTrace(); - finalGroupCallback.failure(Arrays.asList(workerWrapper), e); - } - }); - } else { - final ExecutorService commonPool = getCommonPool(); - commonPool.submit(() -> { - try { - boolean success = beginWork(timeout, commonPool, workerWrapper); - if (success) { - finalGroupCallback.success(Arrays.asList(workerWrapper)); - } else { - finalGroupCallback.failure(Arrays.asList(workerWrapper), new TimeoutException()); - } - } catch (ExecutionException | InterruptedException e) { - e.printStackTrace(); - finalGroupCallback.failure(Arrays.asList(workerWrapper), e); - } - }); - } - - } - // ========================= 设置/属性选项 ========================= /** @@ -156,7 +102,11 @@ public class Async { * 在v1.4及之前,该COMMON_POLL是被写死的。 *

* 自v1.5后: - * 该线程池会被懒加载。 + * 该线程池将会在第一次调用{@link #getCommonPool()}时懒加载。 + * tip: + * 要注意,{@link #work(long, WorkerWrapper[])}、{@link #work(long, Collection)}这些方法, + * 不传入线程池就会默认调用{@link #getCommonPool()},就会初始化线程池。 + *

* 该线程池将会给线程取名为asyncTool-commonPool-thread-0(数字不重复)。 *

*/ @@ -167,7 +117,11 @@ public class Async { * 当执行{@link #beginWork(long, ExecutorService, Collection)}方法时,ExecutorService将会被记录下来。 *

* 注意,这里是个static,也就是只能有一个线程池。用户自定义线程池时,也只能定义一个 + * + * @deprecated 不明意义、毫无用处的字段。记录之前使用的线程池没啥意义。 */ + @SuppressWarnings("DeprecatedIsStillUsed") + @Deprecated private static final AtomicReference lastExecutorService = new AtomicReference<>(null); /** @@ -186,7 +140,6 @@ public class Async { new ThreadFactory() { private final AtomicLong threadCount = new AtomicLong(0); - @SuppressWarnings("NullableProblems") @Override public Thread newThread(Runnable r) { Thread t = new Thread(r, @@ -212,6 +165,10 @@ public class Async { return COMMON_POOL; } + /** + * @deprecated 不明意义的输出信息的方法 + */ + @Deprecated public static String getThreadCount() { return "activeCount=" + COMMON_POOL.getActiveCount() + ",completedCount=" + COMMON_POOL.getCompletedTaskCount() + @@ -282,6 +239,58 @@ public class Async { return beginWork(timeout, getCommonPool(), workerWrapper); } + /** + * @deprecated 已经被 {@link #work(long, ExecutorService, Collection, String)}方法取代。 + */ + @Deprecated + public static void beginWorkAsync(long timeout, IGroupCallback groupCallback, WorkerWrapper... workerWrapper) { + beginWorkAsync(timeout, getCommonPool(), groupCallback, workerWrapper); + } + + /** + * 异步执行,直到所有都完成,或失败后,发起回调 + * + * @deprecated 已经被 {@link #work(long, ExecutorService, Collection, String)}方法取代。 + */ + @Deprecated + public static void beginWorkAsync(long timeout, ExecutorService executorService, IGroupCallback groupCallback, WorkerWrapper... workerWrapper) { + if (groupCallback == null) { + groupCallback = new DefaultGroupCallback(); + } + IGroupCallback finalGroupCallback = groupCallback; + if (executorService != null) { + executorService.submit(() -> { + try { + boolean success = beginWork(timeout, executorService, workerWrapper); + if (success) { + finalGroupCallback.success(Arrays.asList(workerWrapper)); + } else { + finalGroupCallback.failure(Arrays.asList(workerWrapper), new TimeoutException()); + } + } catch (ExecutionException | InterruptedException e) { + e.printStackTrace(); + finalGroupCallback.failure(Arrays.asList(workerWrapper), e); + } + }); + } else { + final ExecutorService commonPool = getCommonPool(); + commonPool.submit(() -> { + try { + boolean success = beginWork(timeout, commonPool, workerWrapper); + if (success) { + finalGroupCallback.success(Arrays.asList(workerWrapper)); + } else { + finalGroupCallback.failure(Arrays.asList(workerWrapper), new TimeoutException()); + } + } catch (ExecutionException | InterruptedException e) { + e.printStackTrace(); + finalGroupCallback.failure(Arrays.asList(workerWrapper), e); + } + }); + } + + } + /** * 关闭上次使用的线程池 * diff --git a/asyncTool-core/src/main/java/com/jd/platform/async/worker/OnceWork.java b/asyncTool-core/src/main/java/com/jd/platform/async/worker/OnceWork.java index 611638b..52dbb33 100644 --- a/asyncTool-core/src/main/java/com/jd/platform/async/worker/OnceWork.java +++ b/asyncTool-core/src/main/java/com/jd/platform/async/worker/OnceWork.java @@ -72,6 +72,9 @@ public interface OnceWork { */ default Map> getWrappersOfState(ResultState... ofState) { final HashSet states = new HashSet<>(Arrays.asList(ofState)); + if (states.isEmpty()) { + return new HashMap<>(1); + } return getWrappers().entrySet().stream() .filter(entry -> states.contains(entry.getValue().getWorkResult().getResultState())) .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); @@ -123,6 +126,8 @@ public interface OnceWork { } /** + * 返回{@link Future}视图。 + * * @param sleepCheckInterval 为防止线程爆炸,在{@link Future#get(long, TimeUnit)}方法时使用隔一段时间检查一次。 * 该Function的参数为总超时毫秒值,返回值为检查时间间隔。 * @return 返回 {@link AsFuture}封装对象。 @@ -152,16 +157,19 @@ public interface OnceWork { } /** - * 同步等待取消 + * 同步等待取消。 * - * @param ignore__mayInterruptIfRunning 该参数将被无视。因为暂未实现“修改允许打断属性”功能。 // todo 等待实现 + * @param ignore 该参数将被无视。因为暂未实现“修改允许打断属性”功能。todo : await implement */ @Override - public boolean cancel(boolean ignore__mayInterruptIfRunning) { + public boolean cancel(boolean ignore) { try { + if (onceWork.isFinish()) { + return false; + } onceWork.pleaseCancelAndAwaitFinish(); } catch (InterruptedException e) { - throw new RuntimeException("", e); + throw new RuntimeException("interrupted when await finish in : " + this, e); } return true; } @@ -189,7 +197,7 @@ public interface OnceWork { */ @Override public Map> get(long timeout, - @SuppressWarnings("NullableProblems") TimeUnit unit) + TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { final long millis = Objects.requireNonNull(unit).toMillis(timeout); final long interval = Math.max(1, Math.min(millis, sleepCheckInterval.apply(millis))); @@ -206,7 +214,7 @@ public interface OnceWork { @Override public String toString() { - return "(asFuture from " + this + ")"; + return "(asFuture from " + onceWork + ")@" + Integer.toHexString(this.hashCode()); } } diff --git a/asyncTool-core/src/main/java/com/jd/platform/async/worker/WorkResult.java b/asyncTool-core/src/main/java/com/jd/platform/async/worker/WorkResult.java index 1cf0f04..9a9d20a 100755 --- a/asyncTool-core/src/main/java/com/jd/platform/async/worker/WorkResult.java +++ b/asyncTool-core/src/main/java/com/jd/platform/async/worker/WorkResult.java @@ -28,6 +28,7 @@ public class WorkResult { * 返回不可修改的DEFAULT单例。 */ public static WorkResult defaultResult() { + //noinspection unchecked return (WorkResult) DEFAULT; } diff --git a/asyncTool-core/src/main/java/com/jd/platform/async/wrapper/StableWorkerWrapperBuilder.java b/asyncTool-core/src/main/java/com/jd/platform/async/wrapper/StableWorkerWrapperBuilder.java index 5c0da4a..c578dd7 100644 --- a/asyncTool-core/src/main/java/com/jd/platform/async/wrapper/StableWorkerWrapperBuilder.java +++ b/asyncTool-core/src/main/java/com/jd/platform/async/wrapper/StableWorkerWrapperBuilder.java @@ -348,6 +348,7 @@ class StableWorkerWrapperBuilder 0) { + //noinspection deprecation selfIsMustSet.forEach(next -> Optional.ofNullable(next.getWrapperStrategy().getDependMustStrategyMapper()) .ifPresent(mustMapper -> mustMapper.addDependMust(wrapper))); } diff --git a/asyncTool-core/src/main/java/com/jd/platform/async/wrapper/WorkerWrapper.java b/asyncTool-core/src/main/java/com/jd/platform/async/wrapper/WorkerWrapper.java index 51fcdcc..56f83b6 100755 --- a/asyncTool-core/src/main/java/com/jd/platform/async/wrapper/WorkerWrapper.java +++ b/asyncTool-core/src/main/java/com/jd/platform/async/wrapper/WorkerWrapper.java @@ -1,6 +1,7 @@ package com.jd.platform.async.wrapper; -import com.jd.platform.async.exception.CancelSkippedException; +import com.jd.platform.async.exception.CancelException; +import com.jd.platform.async.exception.EndsNormallyException; import com.jd.platform.async.worker.ResultState; import com.jd.platform.async.worker.WorkResult; import com.jd.platform.async.callback.DefaultCallback; @@ -113,7 +114,8 @@ public abstract class WorkerWrapper { this.id = id; //允许不设置回调 if (callback == null) { - callback = new DefaultCallback<>(); + //noinspection unchecked + callback = (ICallback) DefaultCallback.getInstance(); } this.callback = callback; this.allowInterrupt = allowInterrupt; @@ -237,7 +239,7 @@ public abstract class WorkerWrapper { */ public void cancel() { if (State.setState(state, states_of_beforeWorkingEnd, SKIP, null)) { - fastFail(false, new CancelSkippedException(), true); + fastFail(false, new CancelException(), true); } } @@ -270,7 +272,7 @@ public abstract class WorkerWrapper { callback.result(success, param, _workResult); } catch (Exception e) { if (setState(state, states_of_skipOrAfterWork, ERROR, null)) { - fastFail(false, e, _workResult.getEx() instanceof SkippedException); + fastFail(false, e, _workResult.getEx() instanceof EndsNormallyException); } } }; @@ -281,8 +283,8 @@ public abstract class WorkerWrapper { }; final BiConsumer __function__fastFail_callbackResult$false_beginNext = (fastFail_isTimeout, fastFail_exception) -> { - boolean isSkip = fastFail_exception instanceof SkippedException; - fastFail(fastFail_isTimeout && !isSkip, fastFail_exception, isSkip); + boolean isEndsNormally = fastFail_exception instanceof EndsNormallyException; + fastFail(fastFail_isTimeout && !isEndsNormally, fastFail_exception, isEndsNormally); __function__callbackResultOfFalse_beginNext.run(); }; final Runnable __function__doWork = @@ -413,13 +415,14 @@ public abstract class WorkerWrapper { * 快速失败。 * 该方法不负责检查状态,请自行控制。 * - * @param isTimeout 是否是因为超时而快速失败 - * @param e 设置异常信息到{@link WorkResult#getEx()} + * @param isTimeout 是否是因为超时而快速失败 + * @param e 设置异常信息到{@link WorkResult#getEx()} + * @param isEndsNormally 是否是因正常情况正常而结束,例如跳过{@link SkippedException}、取消{@link CancelException}。 */ - protected void fastFail(boolean isTimeout, Exception e, boolean isSkip) { + protected void fastFail(boolean isTimeout, Exception e, boolean isEndsNormally) { // 试图打断正在执行{@link IWorker#action(Object, Map)}的线程 Thread _doWorkingThread; - if ((_doWorkingThread = doWorkingThread.get()) != null + if ((_doWorkingThread = this.doWorkingThread.get()) != null // 不会打断自己 && !Objects.equals(Thread.currentThread(), _doWorkingThread)) { _doWorkingThread.interrupt(); @@ -427,7 +430,7 @@ public abstract class WorkerWrapper { // 尚未处理过结果则设置 workResult.compareAndSet(null, new WorkResult<>( worker.defaultValue(), - isTimeout ? ResultState.TIMEOUT : (isSkip ? ResultState.DEFAULT : ResultState.EXCEPTION), + isTimeout ? ResultState.TIMEOUT : (isEndsNormally ? ResultState.DEFAULT : ResultState.EXCEPTION), e )); } @@ -520,8 +523,9 @@ public abstract class WorkerWrapper { @Override public String toString() { - final StringBuilder sb = new StringBuilder(400) - .append("WorkerWrapper{id=").append(id) + final StringBuilder sb = new StringBuilder(256) + .append(this.getClass().getSimpleName()) + .append("{id=").append(id) .append(", state=").append(of(state.get())) .append(", param=").append(param) .append(", workResult=").append(workResult) @@ -571,6 +575,7 @@ public abstract class WorkerWrapper { this.dependOnUpWrapperStrategyMapper = dependOnUpWrapperStrategyMapper; } + @SuppressWarnings("deprecation") @Override public DependMustStrategyMapper getDependMustStrategyMapper() { return dependMustStrategyMapper; diff --git a/asyncTool-core/src/main/java/com/jd/platform/async/wrapper/WorkerWrapperBuilder.java b/asyncTool-core/src/main/java/com/jd/platform/async/wrapper/WorkerWrapperBuilder.java index e5307d9..36e7c56 100644 --- a/asyncTool-core/src/main/java/com/jd/platform/async/wrapper/WorkerWrapperBuilder.java +++ b/asyncTool-core/src/main/java/com/jd/platform/async/wrapper/WorkerWrapperBuilder.java @@ -16,6 +16,7 @@ import java.util.concurrent.TimeUnit; * * @author create by TcSnZh on 2021/5/4-下午1:26 */ +@SuppressWarnings("unused") public interface WorkerWrapperBuilder { /** * 设置唯一id。 @@ -56,6 +57,7 @@ public interface WorkerWrapperBuilder { */ SetDepend setDepend(); + @SuppressWarnings({"UnusedReturnValue"}) interface SetDepend { /** * 设置在本Wrapper之前的上游Wrapper。 @@ -112,10 +114,8 @@ public interface WorkerWrapperBuilder { * @param wrapper 需要设置特殊策略的Wrapper。 * @param strategy 特殊策略。 */ - @SuppressWarnings("UnusedReturnValue") SetDepend specialDependWrapper(DependOnUpWrapperStrategy strategy, WorkerWrapper wrapper); - @SuppressWarnings("unused") default SetDepend specialDependWrapper(DependOnUpWrapperStrategy strategy, WorkerWrapper... wrappers) { if (strategy == null || wrappers == null) { return this; @@ -158,7 +158,6 @@ public interface WorkerWrapperBuilder { return setDepend().wrapper(wrappers).end(); } - @SuppressWarnings("unused") default WorkerWrapperBuilder depends(Collection wrappers) { return setDepend().wrapper(wrappers).end(); } @@ -167,7 +166,6 @@ public interface WorkerWrapperBuilder { return setDepend().wrapper(wrappers).strategy(strategy).end(); } - @SuppressWarnings("unused") default WorkerWrapperBuilder depends(DependenceStrategy strategy, Collection wrappers) { return setDepend().wrapper(wrappers).strategy(strategy).end(); } @@ -208,7 +206,6 @@ public interface WorkerWrapperBuilder { */ SetNext mustToNextWrapper(WorkerWrapper wrapper); - @SuppressWarnings("unused") default SetNext requireToNextWrapper(WorkerWrapper wrapper, boolean must) { return must ? mustToNextWrapper(wrapper) : wrapper(wrapper); } @@ -234,7 +231,6 @@ public interface WorkerWrapperBuilder { return setNext().wrapper(wrappers).end(); } - @SuppressWarnings("unused") default WorkerWrapperBuilder nextOf(Collection wrappers) { return setNext().wrapper(wrappers).end(); } diff --git a/asyncTool-core/src/main/java/com/jd/platform/async/wrapper/WorkerWrapperGroup.java b/asyncTool-core/src/main/java/com/jd/platform/async/wrapper/WorkerWrapperGroup.java index 5ae630b..0badd58 100644 --- a/asyncTool-core/src/main/java/com/jd/platform/async/wrapper/WorkerWrapperGroup.java +++ b/asyncTool-core/src/main/java/com/jd/platform/async/wrapper/WorkerWrapperGroup.java @@ -1,6 +1,5 @@ package com.jd.platform.async.wrapper; -import com.jd.platform.async.exception.CancelSkippedException; import com.jd.platform.async.executor.PollingCenter; import java.util.Collection; diff --git a/asyncTool-core/src/main/java/com/jd/platform/async/wrapper/strategy/WrapperStrategy.java b/asyncTool-core/src/main/java/com/jd/platform/async/wrapper/strategy/WrapperStrategy.java index 044c8d9..54a3221 100644 --- a/asyncTool-core/src/main/java/com/jd/platform/async/wrapper/strategy/WrapperStrategy.java +++ b/asyncTool-core/src/main/java/com/jd/platform/async/wrapper/strategy/WrapperStrategy.java @@ -8,6 +8,7 @@ import com.jd.platform.async.wrapper.strategy.depend.DependenceStrategy; import com.jd.platform.async.wrapper.strategy.skip.SkipStrategy; import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; /** * @author create by TcSnZh on 2021/5/17-下午6:23 @@ -99,12 +100,33 @@ public interface WrapperStrategy extends DependenceStrategy, SkipStrategy { abstract class AbstractWrapperStrategy implements WrapperStrategy { @Override public String toString() { - return "WrapperStrategy{" + - "dependWrapperStrategyMapper=" + getDependWrapperStrategyMapper() + - ", dependMustStrategyMapper=" + getDependMustStrategyMapper() + - ", dependenceStrategy=" + getDependenceStrategy() + - ", skipStrategy=" + getSkipStrategy() + - '}'; + final StringBuilder sb = new StringBuilder(128) + .append(this.getClass().getSimpleName()).append('{'); + final AtomicBoolean needAppendSplit = new AtomicBoolean(); + appendNotNullProperty(sb, "dependWrapperStrategyMapper=", + getDependWrapperStrategyMapper(), needAppendSplit, ", "); + appendNotNullProperty(sb, "dependMustStrategyMapper=", + getDependMustStrategyMapper(), needAppendSplit, ", "); + appendNotNullProperty(sb, "dependenceStrategy=", + getDependenceStrategy(), needAppendSplit, ", "); + appendNotNullProperty(sb, "skipStrategy=", + getSkipStrategy(), needAppendSplit, ", "); + return sb.append('}').toString(); + } + + private static void appendNotNullProperty(StringBuilder sb, + String propPrefix, + Object prop, + AtomicBoolean needAppendSplit, + @SuppressWarnings("SameParameterValue") String split) { + if (prop == null) { + return; + } + if (needAppendSplit.get()) { + sb.append(split); + } + sb.append(propPrefix).append(prop); + needAppendSplit.set(true); } } @@ -117,6 +139,7 @@ public interface WrapperStrategy extends DependenceStrategy, SkipStrategy { return null; } + @SuppressWarnings("deprecation") @Override public DependMustStrategyMapper getDependMustStrategyMapper() { return null; diff --git a/asyncTool-core/src/main/java/com/jd/platform/async/wrapper/strategy/depend/DependMustStrategyMapper.java b/asyncTool-core/src/main/java/com/jd/platform/async/wrapper/strategy/depend/DependMustStrategyMapper.java index b4b1a24..064f046 100644 --- a/asyncTool-core/src/main/java/com/jd/platform/async/wrapper/strategy/depend/DependMustStrategyMapper.java +++ b/asyncTool-core/src/main/java/com/jd/platform/async/wrapper/strategy/depend/DependMustStrategyMapper.java @@ -14,6 +14,7 @@ import java.util.stream.Collectors; * * @author create by TcSnZh on 2021/5/4-下午1:24 */ +@SuppressWarnings("UnusedReturnValue") public class DependMustStrategyMapper implements DependenceStrategy { private final Set> mustDependSet = new LinkedHashSet<>(); diff --git a/asyncTool-core/src/main/java/com/jd/platform/async/wrapper/strategy/depend/DependOnUpWrapperStrategyMapper.java b/asyncTool-core/src/main/java/com/jd/platform/async/wrapper/strategy/depend/DependOnUpWrapperStrategyMapper.java index 476aa05..8836581 100644 --- a/asyncTool-core/src/main/java/com/jd/platform/async/wrapper/strategy/depend/DependOnUpWrapperStrategyMapper.java +++ b/asyncTool-core/src/main/java/com/jd/platform/async/wrapper/strategy/depend/DependOnUpWrapperStrategyMapper.java @@ -6,6 +6,7 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.stream.Collectors; +import java.util.stream.Stream; /** * 对不同的{@link WorkerWrapper}调用者实行个性化依赖响应策略。 @@ -27,7 +28,6 @@ public class DependOnUpWrapperStrategyMapper implements DependenceStrategy { @SuppressWarnings("UnusedReturnValue") public DependOnUpWrapperStrategyMapper putMapping(WorkerWrapper targetWrapper, DependOnUpWrapperStrategy strategy) { mapper.put(targetWrapper, strategy); - toStringCache = null; return this; } @@ -42,7 +42,7 @@ public class DependOnUpWrapperStrategyMapper implements DependenceStrategy { * @return 如果在mapper中有对fromWrapper的处理策略,则使用其进行判断。否则返回JUDGE_BY_AFTER交给下一个进行判断。 */ @Override - public DependenceAction.WithProperty judgeAction(Set> dependWrappers, + public DependenceAction.WithProperty judgeAction(Set> dependWrappers, WorkerWrapper thisWrapper, WorkerWrapper fromWrapper) { DependOnUpWrapperStrategy strategy = mapper.get(fromWrapper); @@ -52,19 +52,18 @@ public class DependOnUpWrapperStrategyMapper implements DependenceStrategy { return strategy.judge(fromWrapper); } - /** - * 缓存toString - */ - private String toStringCache; - @Override public String toString() { - if (toStringCache == null) { - toStringCache = "DependWrapperStrategyMapper{mapper=" + mapper.entrySet().stream() - .map(entry -> "{" + entry.getKey().getId() + ":" + entry.getValue() + "}") - .collect(Collectors.toList()) - + "}"; + final StringBuilder sb = new StringBuilder(64) + .append(this.getClass().getSimpleName()).append("{mapper="); + final Set, DependOnUpWrapperStrategy>> entrySet = mapper.entrySet(); + entrySet.forEach(entry -> { + sb.append(entry.getKey().getId()).append(':').append(entry.getValue()).append(", "); + }); + if (entrySet.size() > 0) { + final int length = sb.length(); + sb.delete(length - 2, length); } - return toStringCache; + return sb.append('}').toString(); } } diff --git a/asyncTool-core/src/test/java/v15/cases/Case9.java b/asyncTool-core/src/test/java/v15/cases/Case9.java index d58c4d7..672bded 100644 --- a/asyncTool-core/src/test/java/v15/cases/Case9.java +++ b/asyncTool-core/src/test/java/v15/cases/Case9.java @@ -24,7 +24,7 @@ class Case9 { System.out.println("I am IWorker 1"); return null; }, - new DefaultCallback<>(), + DefaultCallback.getInstance(), false, true, 100, @@ -38,7 +38,7 @@ class Case9 { System.out.println("I am IWorker 2"); return null; }, - new DefaultCallback<>(), + DefaultCallback.getInstance(), false, true, 100, -- Gitee