diff --git a/.gitignore b/.gitignore
index 66d02f66facab7272d4a74c46dfecb5793fa1aa2..991259a79777dba9e44ae0ab16918a6af62ebe9b 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,32 +1,32 @@
-HELP.md
-target/
+**/HELP.md
+**/target/
!.mvn/wrapper/maven-wrapper.jar
!**/src/main/**
!**/src/test/**
### STS ###
-.apt_generated
-.classpath
-.factorypath
-.project
-.settings
-.springBeans
-.sts4-cache
+**/.apt_generated
+**/.classpath
+**/.factorypath
+**/.project
+**/.settings
+**/.springBeans
+**/.sts4-cache
### IntelliJ IDEA ###
-.idea
-*.iws
-*.iml
-*.ipr
+**/.idea
+**/*.iws
+**/*.iml
+**/*.ipr
### NetBeans ###
-/nbproject/private/
-/nbbuild/
-/dist/
-/nbdist/
-/.nb-gradle/
-build/
+**/nbproject/private/
+**/nbbuild/
+**/dist/
+**/nbdist/
+**/.nb-gradle/
+**/build/
### VS Code ###
-.vscode/
+**/.vscode/
diff --git a/QuickStart.md b/QuickStart.md
index 125a6d9a0d981cb0a0baa1628c12c1a2ec004f14..ec1e1db5d60048d8b6d18625513065c2e916c24b 100644
--- a/QuickStart.md
+++ b/QuickStart.md
@@ -4,6 +4,8 @@
代码不多,直接拷贝包过去即可。
+#### 旧稳定版本v1.4
+
京东同事通过引用如下maven来使用。
```xml
@@ -34,11 +36,49 @@
```
-# 使用说明
+#### 最新版本v1.5(不稳定)
+
+
+
+从gitee上下载仓库到本地,切换到`dev`分支,然后maven安装到本地仓库。
+
+```bash
+git clone https://gitee.com/jd-platform-opensource/asyncTool.git
+cd ./asyncTool
+git checkout dev
+mvn install
+```
+
+在项目中引入依赖。
+
+```xml
+
+
+ asyncTool-core
+ com.jd.platform
+ 1.5.1-SNAPSHOT
+
+```
+
+# 任务编排
+
+> `asyncTool-core`核心模块提供了核心功能——任务编排
+>
+> 以下文档基于版本:
+>
+> ```xml
+>
+>
+> com.jd.platform
+> asyncTool-core
+> 1.5.1-SNAPSHOT
+>
+>
+> ```
### 基本组件
-worker: 一个最小的任务执行单元。通常是一个网络调用,或一段耗时操作。
+`IWorker`: 一个最小的任务执行单元。通常是一个网络调用,或一段耗时操作。
T,V两个泛型,分别是入参和出参类型。
@@ -63,7 +103,7 @@ public interface IWorker {
V action(T object, Map allWrappers);
/**
- * 超时、异常时,返回的默认值
+ * 超时、异常、跳过时,返回的默认值
*
* @return 默认值
*/
@@ -73,8 +113,7 @@ public interface IWorker {
}
```
-
-callBack:对每个worker的回调。worker执行完毕后,会回调该接口,带着执行成功、失败、原始入参、和详细的结果。
+`ICallback`:对每个worker的回调。worker执行完毕后,会回调该接口,带着执行成功、失败、原始入参、和详细的结果。
```java
/**
@@ -117,6 +156,85 @@ WorkerWrapper w0 = WorkerWrapper.builder()
通过这一个类看一下,action里就是你的耗时操作,begin就是任务开始执行时的回调,result就是worker执行完毕后的回调。当你组合了多个执行单元时,每一步的执行,都在掌控之内。失败了,还会有自定义的默认值。这是CompleteableFuture无法做到的。
+### 如何构造WorkerWrapper?
+
+##### 推荐Builder模式
+
+如果刚开始使用这个框架,则推荐使用如下方式进行构造:
+
+```java
+WorkerWrapper.builder()
+ .id()
+ // 其他属性略。
+ // 请在《简单示例》与《设置WorkerWrapper属性》中慢慢感受详细内容。
+ // 因为这里地方小,写不下。
+```
+
+##### 复杂的快速构造
+
+> 不推荐新手使用。
+>
+> 不推荐在业务中使用,使用Builder模式代码更加简洁,且会检查参数,不必节省这些性能。
+>
+> 该对象的构造方法不会检查属性。
+
+在对WorkerWrapper属性有充足了解后,可使用“直接设置属性 + 关系图”的方式快速构造wrapper。
+
+建议在扩展功能的时候使用该构造器,以提高效率。但是请记得检查参数。
+
+以下为示例:
+
+```java
+class Case9 {
+ public static void main(String[] args) throws ExecutionException, InterruptedException {
+ DirectedGraph, Object> graph = DirectedGraph.synchronizedDigraph(new CommonDirectedGraph<>());
+ QuickBuildWorkerWrapper
*/
- private static ThreadPoolExecutor COMMON_POOL;
+ private static volatile ThreadPoolExecutor COMMON_POOL;
/**
* 在以前(及现在)的版本中:
@@ -133,7 +168,7 @@ public class Async {
* lastExecutorService = new AtomicReference<>(null);
/**
* 该方法将会返回{@link #COMMON_POOL},如果还未初始化则会懒加载初始化后再返回。
@@ -151,9 +186,11 @@ public class Async {
new ThreadFactory() {
private final AtomicLong threadCount = new AtomicLong(0);
+ @SuppressWarnings("NullableProblems")
@Override
public Thread newThread(Runnable r) {
- Thread t = new Thread(r, "asyncTool-commonPool-thread-" + threadCount.getAndIncrement());
+ Thread t = new Thread(r,
+ "asyncTool-commonPool-thread-" + threadCount.getAndIncrement());
t.setDaemon(true);
return t;
}
@@ -183,11 +220,12 @@ public class Async {
/**
* @param now 是否立即关闭
- * @throws IllegalStateException 如果尚未调用过{@link #getCommonPool()},即没有使用过“使用默认线程池”的方法,该方法会抛出空指针异常。
+ * @return 如果尚未调用过{@link #getCommonPool()},即没有初始化默认线程池,返回false。否则返回true。
*/
- public static synchronized void shutDownCommonPool(boolean now) {
+ @SuppressWarnings("unused")
+ public static synchronized boolean shutDownCommonPool(boolean now) {
if (COMMON_POOL == null) {
- throw new IllegalStateException("COMMON_POOL Not initialized yet");
+ return false;
}
if (!COMMON_POOL.isShutdown()) {
if (now) {
@@ -196,6 +234,52 @@ public class Async {
COMMON_POOL.shutdown();
}
}
+ return true;
+ }
+
+ // ========================= deprecated =========================
+
+ /**
+ * 同步执行一次任务。
+ *
+ * @return 只要执行未超时,就返回true。
+ * @deprecated 已经被 {@link #work(long, ExecutorService, Collection, String)}方法取代。
+ */
+ @Deprecated
+ public static boolean beginWork(long timeout,
+ ExecutorService executorService,
+ Collection extends WorkerWrapper, ?>> workerWrappers)
+ throws InterruptedException {
+ final OnceWork work = work(timeout, executorService, workerWrappers);
+ work.awaitFinish();
+ return work.hasTimeout();
+ }
+
+ /**
+ * 同步执行一次任务。
+ * 如果想自定义线程池,请传pool。不自定义的话,就走默认的COMMON_POOL
+ *
+ * @deprecated 已经被 {@link #work(long, ExecutorService, Collection, String)}方法取代。
+ */
+ @Deprecated
+ public static boolean beginWork(long timeout, ExecutorService executorService, WorkerWrapper... workerWrapper)
+ throws ExecutionException, InterruptedException {
+ if (workerWrapper == null || workerWrapper.length == 0) {
+ return false;
+ }
+ Set workerWrappers = Arrays.stream(workerWrapper).collect(Collectors.toSet());
+ //noinspection unchecked
+ return beginWork(timeout, executorService, workerWrappers);
+ }
+
+ /**
+ * 同步阻塞,直到所有都完成,或失败
+ *
+ * @deprecated 已经被 {@link #work(long, ExecutorService, Collection, String)}方法取代。
+ */
+ @Deprecated
+ public static boolean beginWork(long timeout, WorkerWrapper... workerWrapper) throws ExecutionException, InterruptedException {
+ return beginWork(timeout, getCommonPool(), workerWrapper);
}
/**
@@ -209,8 +293,9 @@ public class Async {
*/
@Deprecated
public static void shutDown() {
- if (lastExecutorService != COMMON_POOL) {
- shutDown(lastExecutorService);
+ final ExecutorService last = lastExecutorService.get();
+ if (last != COMMON_POOL) {
+ shutDown(last);
}
}
@@ -221,7 +306,7 @@ public class Async {
* @deprecated 没啥用的方法,要关闭线程池还不如直接调用线程池的关闭方法,避免歧义。
*/
@Deprecated
- public static void shutDown(ExecutorService executorService) {
+ public static void shutDown(@Nullable ExecutorService executorService) {
if (executorService != null) {
executorService.shutdown();
}
diff --git a/src/main/java/com/jd/platform/async/executor/PollingCenter.java b/asyncTool-core/src/main/java/com/jd/platform/async/executor/PollingCenter.java
similarity index 85%
rename from src/main/java/com/jd/platform/async/executor/PollingCenter.java
rename to asyncTool-core/src/main/java/com/jd/platform/async/executor/PollingCenter.java
index 66d4cabdad615566616bf85c10b0b3229b5d9228..2dc1829fb05b28c634efd7144c44fb9c691d6c17 100644
--- a/src/main/java/com/jd/platform/async/executor/PollingCenter.java
+++ b/asyncTool-core/src/main/java/com/jd/platform/async/executor/PollingCenter.java
@@ -1,9 +1,9 @@
package com.jd.platform.async.executor;
-import com.jd.platform.async.util.timer.Timeout;
-import com.jd.platform.async.util.timer.TimerTask;
-import com.jd.platform.async.util.timer.HashedWheelTimer;
-import com.jd.platform.async.util.timer.Timer;
+import com.jd.platform.async.openutil.timer.HashedWheelTimer;
+import com.jd.platform.async.openutil.timer.Timeout;
+import com.jd.platform.async.openutil.timer.Timer;
+import com.jd.platform.async.openutil.timer.TimerTask;
import com.jd.platform.async.wrapper.WorkerWrapperGroup;
import java.util.Set;
@@ -12,7 +12,7 @@ import java.util.concurrent.TimeUnit;
/**
* 检查{@link WorkerWrapperGroup}是否调用完成的轮询中心。
- *
+ * 内部使用时间轮进行轮询。
*
* ===========================================================================================
*
@@ -27,11 +27,11 @@ import java.util.concurrent.TimeUnit;
* 这是旧版本(v1.4及以前)中可能会引发线程耗尽bug的情况,在test/v15.wrappertest中示例testThreadPolling_V14Bug说明了这个bug
* 线程数:2
* A(5ms)--B1(10ms) ---|--> C1(5ms)
- * . \ | (B1、B2全部完成可执行C1、C2)
+ * . \ | (B1、B2任一完成可执行C1、C2)
* . ---> B2(20ms) --|--> C2(5ms)
*
* }
- * 线程1执行了A,然后在{@link java.util.concurrent.CompletableFuture#allOf(CompletableFuture[])}等待B1与B2执行完成。
+ * 线程1执行了A,然后在{@link CompletableFuture#allOf(CompletableFuture[])}等待B1与B2执行完成。
* 线程2执行了B1或B2中的一个,也在allOf方法等待C1、C2完成。
* 结果没有线程执行C和B2了,导致超时而死,并且这个线程池线程有可能被耗尽。
* >
@@ -65,7 +65,7 @@ public class PollingCenter {
thread.setDaemon(true);
return thread;
},
- 4,
+ 1,
TimeUnit.MILLISECONDS,
1024);
diff --git a/src/main/java/com/jd/platform/async/executor/timer/SystemClock.java b/asyncTool-core/src/main/java/com/jd/platform/async/executor/timer/SystemClock.java
similarity index 97%
rename from src/main/java/com/jd/platform/async/executor/timer/SystemClock.java
rename to asyncTool-core/src/main/java/com/jd/platform/async/executor/timer/SystemClock.java
index e65dd8578d3318f2375747d738eddf77e7a7bf8e..6cba50a28e46c86301cb0b7e1136db091bccda8d 100644
--- a/src/main/java/com/jd/platform/async/executor/timer/SystemClock.java
+++ b/asyncTool-core/src/main/java/com/jd/platform/async/executor/timer/SystemClock.java
@@ -2,7 +2,6 @@ package com.jd.platform.async.executor.timer;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
diff --git a/asyncTool-core/src/main/java/com/jd/platform/async/worker/OnceWork.java b/asyncTool-core/src/main/java/com/jd/platform/async/worker/OnceWork.java
new file mode 100644
index 0000000000000000000000000000000000000000..611638be5d03cf51dcd6b7d96569707b1878fb74
--- /dev/null
+++ b/asyncTool-core/src/main/java/com/jd/platform/async/worker/OnceWork.java
@@ -0,0 +1,375 @@
+package com.jd.platform.async.worker;
+
+import com.jd.platform.async.executor.timer.SystemClock;
+import com.jd.platform.async.wrapper.WorkerWrapper;
+import com.jd.platform.async.wrapper.WorkerWrapperGroup;
+
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+/**
+ * 一次工作结果的总接口。
+ *
+ * @author tcsnzh[zh.jobs@foxmail.com] create this in 2021/5/25-下午3:22
+ */
+public interface OnceWork {
+ /**
+ * 返回唯一的workId
+ */
+ String workId();
+
+ /**
+ * 判断是否结束。因超时而结束也算结束。
+ */
+ boolean isFinish();
+
+ /**
+ * 同步等待到结束。
+ */
+ void awaitFinish() throws InterruptedException;
+
+ /**
+ * 判断是否超时
+ *
+ * @return 如果尚未结束或已结束但未超时,返回false。已结束且已经超时返回true。
+ */
+ boolean hasTimeout();
+
+ /**
+ * 判断是否全部wrapper都处于 执行成功 或 跳过。
+ *
+ * @return 如果已经结束,所有wrapper都成功或跳过返回true,否则返回false。如果尚未结束,返回false。
+ */
+ default boolean allSuccess() {
+ if (!isFinish()) {
+ return false;
+ }
+ return getWrappers().values().stream().allMatch(wrapper -> {
+ final ResultState state = wrapper.getWorkResult().getResultState();
+ return state == ResultState.SUCCESS || state == ResultState.DEFAULT;
+ });
+ }
+
+ /**
+ * 获取全部参与到工作中的wrapper。
+ */
+ Map> getWrappers();
+
+ /**
+ * 获取{@link WorkResult#getResultState()}为{@link ResultState#SUCCESS}的wrapper。
+ */
+ default Map> getSuccessWrappers() {
+ return getWrappersOfState(ResultState.SUCCESS);
+ }
+
+ /**
+ * 获取状态于这些state中的wrapper。
+ *
+ * @param ofState 状态列表
+ * @return 返回Map
+ */
+ default Map> getWrappersOfState(ResultState... ofState) {
+ final HashSet states = new HashSet<>(Arrays.asList(ofState));
+ return getWrappers().entrySet().stream()
+ .filter(entry -> states.contains(entry.getValue().getWorkResult().getResultState()))
+ .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+ }
+
+ /**
+ * 获取启动时间
+ */
+ long getStartTime();
+
+ /**
+ * 获取结束时间
+ *
+ * @return 如果超时,返回超时的时刻。如果尚未结束,则抛出异常。
+ * @throws IllegalStateException 尚未结束,抛出异常。
+ */
+ long getFinishTime();
+
+ /**
+ * @return 已经取消完成
+ */
+ boolean isCancelled();
+
+ /**
+ * @return 是否正在取消中
+ */
+ boolean isWaitingCancel();
+
+ /**
+ * 请求异步取消。
+ */
+ void pleaseCancel();
+
+ /**
+ * 同步等待取消完成。
+ */
+ default void pleaseCancelAndAwaitFinish() throws InterruptedException {
+ if (!isCancelled() && !isWaitingCancel()) {
+ pleaseCancel();
+ }
+ awaitFinish();
+ }
+
+ /**
+ * @return 返回 {@link AsFuture}封装对象。
+ */
+ default AsFuture asFuture() {
+ return new AsFuture(this, limitTime -> limitTime / 16);
+ }
+
+ /**
+ * @param sleepCheckInterval 为防止线程爆炸,在{@link Future#get(long, TimeUnit)}方法时使用隔一段时间检查一次。
+ * 该Function的参数为总超时毫秒值,返回值为检查时间间隔。
+ * @return 返回 {@link AsFuture}封装对象。
+ */
+ default AsFuture asFuture(Function sleepCheckInterval) {
+ return new AsFuture(this, sleepCheckInterval);
+ }
+
+ // static
+
+ /**
+ * 空任务
+ */
+ static OnceWork emptyWork(String workId) {
+ return new EmptyWork(workId);
+ }
+
+ // class
+
+ class AsFuture implements Future