# ThreadForge **Repository Path**: feiyoulian/ThreadForge ## Basic Information - **Project Name**: ThreadForge - **Description**: No description available - **Primary Language**: Unknown - **License**: MIT - **Default Branch**: main - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 1 - **Forks**: 1 - **Created**: 2026-02-24 - **Last Updated**: 2026-04-24 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README # ThreadForge [![Maven Central](https://img.shields.io/maven-central/v/pub.lighting/threadforge-core?label=Maven%20Central)](https://search.maven.org/artifact/pub.lighting/threadforge-core) [![Java](https://img.shields.io/badge/Java-8%2B-007396)](https://adoptium.net/) [![License](https://img.shields.io/badge/License-MIT-green.svg)](https://github.com/wuuJiawei/ThreadForge/blob/main/LICENSE) ## 更新日志 - 完整更新记录:[`CHANGELOG.md`](./CHANGELOG.md) - 最新版本:`v1.1.0`(2026-02-19) - 最新版本摘要(v1.1.0): 1. 新增 `RetryPolicy`(支持 scope 默认重试与任务级覆盖) 2. 新增任务级超时(`Per-Task Timeout` / `TaskTimeoutException`) 3. 新增上下文传播(平台线程与虚拟线程) 4. 新增 OpenTelemetry 集成(`withOpenTelemetry(...)`) 5. 新增任务优先级调度(`TaskPriority` + `Scheduler.priority(...)`) ThreadForge 是一个减少多线程心智负担的结构化并发框架,目标是让并发代码更简单、更安全、更可观测。 > 先降低并发代码的认知成本,再追求吞吐与性能。 ## 初衷(Why ThreadForge) 在真实业务里,Java 的各种并发模型往往过于复杂,新手上手成本太高,现有的代码经过多线程的歪歪绕绕后也难以维护。 比如下面这些显而易见的问题: - 任务生命周期分散在多个类和线程,边界不清晰 - 失败传播、超时和取消逻辑重复且容易遗漏 - 清理动作(资源释放、回滚)分散,异常路径经常漏收口 - 观测信息(任务开始/结束/失败)不统一,排障成本高 ThreadForge 的目标是把这些分散的并发控制点收敛到一个可推理的模型里,让团队在维护并发代码时更省脑力。 ## ThreadForge 如何减少心智负担 - 结构化作用域:所有任务都归属 `ThreadScope`,生命周期有边界 - 默认安全策略:默认 `FAIL_FAST` + 默认 deadline + 自动取消/清理 - 统一失败语义:通过 `FailurePolicy` 明确不同场景的失败处理方式 - 自动上下文传播:`Context` 在提交/调度时自动捕获并传播 - 统一观测入口:通过 `ThreadHook` 和 `TaskInfo` 做生命周期埋点 - 内置低开销指标:默认聚合任务耗时与状态计数,可按需读取快照 - 跨 JDK 一致调用:JDK 21+ 优先虚拟线程,旧版本自动降级 ## 设计目标 - 默认行为正确:默认失败快速传播、默认超时、自动清理资源 - 结构化生命周期:任务必须绑定在 `ThreadScope` 内 - 跨 JDK 兼容:JDK 21+ 优先虚拟线程,其它版本自动降级 - 可观测:为任务启动/成功/失败/取消提供 hook ## 当前实现状态 - 结构化作用域:`ThreadScope` - 任务句柄:`Task` - 失败策略:`FailurePolicy` - 失败重试策略:`RetryPolicy` - 任务优先级:`TaskPriority` - 协作式取消:`CancellationToken` - 上下文传播:`Context` - 有界通道:`Channel` - 调度策略:`Scheduler` - 延迟/周期任务:`DelayScheduler` + `ScheduledTask` - OpenTelemetry 集成:`withOpenTelemetry(...)` + `OpenTelemetryHook` - 生命周期观测:`ThreadHook` + `TaskInfo` - 内置指标快照:`ScopeMetricsSnapshot` - 组合式编排 API:`Task.thenApply` / `Task.thenCompose` / `Task.exceptionally` ## 快速开始 ```java try (ThreadScope scope = ThreadScope.open()) { Task task = scope.submit("load-user", () -> "u-100"); String user = task.await(); } ``` ## 安装 Maven: ```xml pub.lighting threadforge-core 1.1.0 ``` Gradle: ```gradle implementation("pub.lighting:threadforge-core:1.1.0") ``` ## 核心 API ### ThreadScope ```java ThreadScope.open() .withScheduler(Scheduler.priority(8)) .withFailurePolicy(FailurePolicy.FAIL_FAST) .withRetryPolicy(RetryPolicy.noRetry()) .withDefaultTaskPriority(TaskPriority.NORMAL) .withOpenTelemetry() .withConcurrencyLimit(32) .withDeadline(Duration.ofSeconds(30)) .withHook(hook); ``` 任务提交: ```java Task submit(Callable callable) Task submit(String name, Callable callable) Task submit(Callable callable, TaskPriority taskPriority) Task submit(String name, Callable callable, TaskPriority taskPriority) Task submit(Callable callable, RetryPolicy retryPolicy) Task submit(String name, Callable callable, RetryPolicy retryPolicy) Task submit(Callable callable, Duration timeout) Task submit(String name, Callable callable, Duration timeout) Task submit(Callable callable, RetryPolicy retryPolicy, Duration timeout) Task submit(String name, Callable callable, RetryPolicy retryPolicy, Duration timeout) ``` 等待任务: ```java Outcome await(Collection> tasks) Outcome await(Task first, Task... rest) List awaitAll(Collection> tasks) List awaitAll(Task first, Task... rest) ScopeMetricsSnapshot metrics() ``` 调度任务: ```java ScheduledTask schedule(Duration delay, Callable callable) ScheduledTask schedule(Duration delay, Runnable runnable) ScheduledTask scheduleAtFixedRate(Duration initial, Duration period, Runnable runnable) ScheduledTask scheduleWithFixedDelay(Duration initial, Duration delay, Runnable runnable) ``` 优先级调度器: ```java Scheduler.priority(int size) ``` 清理回调: ```java scope.defer(() -> resource.close()); ``` ### Task ```java long id() String name() Task.State state() boolean isDone() boolean isCancelled() boolean isFailed() boolean cancel() T await() CompletableFuture toCompletableFuture() CompletableFuture thenApply(Function fn) CompletableFuture thenCompose(Function> fn) CompletableFuture exceptionally(Function fn) ``` ### FailurePolicy - `FAIL_FAST`:首个失败直接抛出,并取消其他任务 - `COLLECT_ALL`:等待全部结束,若有失败抛 `AggregateException` - `SUPERVISOR`:不自动取消,失败写入 `Outcome` - `CANCEL_OTHERS`:失败后取消其余任务,不直接抛出 - `IGNORE_ALL`:忽略失败,返回不含失败的 `Outcome` ### RetryPolicy - `RetryPolicy.noRetry()`:默认行为,只执行 1 次 - `RetryPolicy.attempts(n)`:最多执行 `n` 次(不含延迟) - `RetryPolicy.fixedDelay(n, delay)`:固定间隔重试 - `RetryPolicy.exponentialBackoff(n, initial, multiplier, max)`:指数退避 ### TaskPriority - `TaskPriority.HIGH` - `TaskPriority.NORMAL` - `TaskPriority.LOW` 说明: - 配合 `Scheduler.priority(...)` 使用时,队列会优先执行高优先级任务 - 同优先级按提交顺序执行(FIFO) - 在非优先级调度器下可正常运行,但不保证严格优先级顺序 ### Context ```java Context.put("traceId", "req-1001"); String traceId = Context.get("traceId"); Context.remove("traceId"); Context.clear(); Map values = Context.snapshot(); ``` - `Context` 会在 `submit/schedule` 时自动传播到任务线程(平台线程与虚拟线程都支持) - 任务结束后会自动恢复线程原始上下文,避免线程复用导致串值 ### OpenTelemetry - `withOpenTelemetry()`:使用默认 instrumentation name `io.threadforge` - `withOpenTelemetry("your.instrumentation.name")`:指定 instrumentation name - 若 classpath 缺少 OpenTelemetry API,会在启用时快速失败并提示依赖 ### ThreadHook ```java .withHook(new ThreadHook() { @Override public void onStart(TaskInfo info) {} @Override public void onSuccess(TaskInfo info, Duration duration) {} @Override public void onFailure(TaskInfo info, Throwable error, Duration duration) {} @Override public void onCancel(TaskInfo info, Duration duration) {} }); ``` ### ScopeMetricsSnapshot(内置指标) ```java ScopeMetricsSnapshot snapshot = scope.metrics(); long started = snapshot.started(); long succeeded = snapshot.succeeded(); long failed = snapshot.failed(); long cancelled = snapshot.cancelled(); long completed = snapshot.completed(); Duration total = snapshot.totalDuration(); Duration avg = snapshot.averageDuration(); Duration max = snapshot.maxDuration(); ``` ## 示例 ### 1. 并发 RPC 聚合 ```java try (ThreadScope scope = ThreadScope.open()) { Task user = scope.submit("user", () -> "u-100"); Task orders = scope.submit("orders", () -> 3); scope.await(user, orders); String profile = user.await() + ":" + orders.await(); } ``` ### 2. 并发度控制 ```java try (ThreadScope scope = ThreadScope.open().withConcurrencyLimit(50)) { List> tasks = new ArrayList<>(); for (int id : ids) { tasks.add(scope.submit(() -> externalApiCall(id))); } List values = scope.awaitAll(tasks); } ``` ### 3. 全量收集失败 ```java try (ThreadScope scope = ThreadScope.open().withFailurePolicy(FailurePolicy.SUPERVISOR)) { Task ok = scope.submit(() -> 1); Task bad = scope.submit(() -> { throw new IllegalStateException("boom"); }); Outcome outcome = scope.await(ok, bad); if (outcome.hasFailures()) { // 统一处理失败 } } ``` ### 4. 超时取消 ```java try (ThreadScope scope = ThreadScope.open().withDeadline(Duration.ofMillis(200))) { Task a = scope.submit(() -> rpcA()); Task b = scope.submit(() -> rpcB()); scope.await(a, b); } catch (ScopeTimeoutException timeout) { fallback(); } ``` ### 5. 任务级超时(Per-Task Timeout) ```java try (ThreadScope scope = ThreadScope.open() .withFailurePolicy(FailurePolicy.SUPERVISOR)) { Task slow = scope.submit( "slow-rpc", () -> callSlowRpc(), Duration.ofMillis(150) ); Outcome outcome = scope.await(slow); if (outcome.hasFailures() && outcome.failures().get(0) instanceof TaskTimeoutException) { // 只处理该任务超时,不影响其他任务继续执行 } } ``` ### 6. 上下文自动传播(Context Propagation) ```java Context.put("traceId", "req-1001"); try (ThreadScope scope = ThreadScope.open().withScheduler(Scheduler.fixed(4))) { Task trace = scope.submit(() -> Context.get("traceId")); assert "req-1001".equals(trace.await()); } ``` ### 7. 优先级队列 ```java try (ThreadScope scope = ThreadScope.open() .withScheduler(Scheduler.priority(4))) { Task low = scope.submit("low", () -> doLow(), TaskPriority.LOW); Task high = scope.submit("high", () -> doHigh(), TaskPriority.HIGH); scope.await(low, high); } ``` ### 8. 生产者-消费者 ```java try (ThreadScope scope = ThreadScope.open()) { Channel ch = Channel.bounded(128); scope.submit(() -> { for (int i = 1; i <= 5; i++) { ch.send(i); } ch.close(); return null; }); Task sum = scope.submit(() -> { int total = 0; for (Integer v : ch) { total += v; } return total; }); Integer result = sum.await(); } ``` ### 9. 延迟与周期任务 ```java try (ThreadScope scope = ThreadScope.open()) { scope.schedule(Duration.ofMillis(200), () -> doOnce()); ScheduledTask heartbeat = scope.scheduleAtFixedRate( Duration.ofMillis(0), Duration.ofSeconds(5), () -> reportHeartbeat() ); // 需要时可取消 heartbeat.cancel(); } ``` ### 10. 组合式写法 ```java try (ThreadScope scope = ThreadScope.open()) { Task base = scope.submit(() -> 21); Integer value = base .thenApply(v -> v * 2) .thenCompose(v -> CompletableFuture.completedFuture(v + 1)) .exceptionally(err -> 0) .join(); } ``` ### 11. OpenTelemetry 追踪 ```java try (ThreadScope scope = ThreadScope.open() .withOpenTelemetry("io.threadforge.demo")) { Task task = scope.submit("rpc-a", () -> callRemote()); task.await(); } ``` ### 12. 失败自动重试 ```java try (ThreadScope scope = ThreadScope.open() .withRetryPolicy(RetryPolicy.fixedDelay(3, Duration.ofMillis(50)))) { Task task = scope.submit("flaky-rpc", () -> callRemote()); scope.await(task); String value = task.await(); } ``` ## JDK 兼容性 - JDK 21+:`Scheduler.detect()` 会优先使用虚拟线程执行器 - JDK 8-20:自动降级到通用线程池执行 - 对外 API 保持一致,无需按 JDK 分叉业务代码 ## 构建与测试 ```bash mvn verify ``` 项目已启用 JaCoCo 覆盖率门禁:`LINE >= 80%`。 ## API 文档与 GitHub Wiki - 完整 API 文档(按功能目录分类):`docs/api/README.md` - Wiki 生成清单:`docs/api/wiki-manifest.tsv` - 生成本地 Wiki 页面: ```bash ./scripts/generate-github-wiki.sh ``` - 发布到 GitHub Wiki: ```bash ./scripts/publish-github-wiki.sh git@github.com:/.git ``` 默认会生成到 `docs/github-wiki/`,并包含 `Home.md` 与 `_Sidebar.md`。 ## License MIT,详见 `LICENSE`。