# AsyncTool **Repository Path**: xzjsccz/async-tool ## Basic Information - **Project Name**: AsyncTool - **Description**: 异步任务调度工具,支持任务依赖、超时、重试、结果收集等功能,适合在服务端或桌面应用中快速构建复杂的任务流水线。学习京东开源项目使用c#写的一版,并在此基础上添加新的功能,使用简单方便,不用编写复杂的异步调度逻辑,只需关注业务代码即可 - **Primary Language**: C# - **License**: MIT - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 2 - **Forks**: 1 - **Created**: 2025-11-07 - **Last Updated**: 2026-06-03 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README # AsyncTool 项目文档 > 异步任务调度工具,支持任务依赖、优先级、并发控制、整体超时、单任务超时、失败重试、失败忽略、回调和结果收集。 ## 项目结构 | 模块 | 路径 | 说明 | | --- | --- | --- | | 核心调度 | `Async/Async.cs` | `Async.Start/StartAsync` 负责启动任务图、优先级调度、超时与清理 | | 任务定义 | `WorkJob/WorkJob.cs` | 使用 Builder 配置任务 ID、执行委托、依赖、优先级、重试、超时与回调 | | 任务异常 | `WorkJob/WorkJobExecutionException.cs` | 任务失败时抛出的专用异常,保留 `WorkJobId` 和原始异常 | | 工具类 | `AsyncUtil/AsyncUtil.cs` | 管理任务组令牌、任务组 ID 与任务集合缓存 | | 结果缓存 | `WorkJobResult/WorkJobResult.cs` | 保存任务业务结果与异常信息 | | 全局结果上下文 | `WorkJobResult/WorkJobResultContext.cs` | 按 `asyncId` 保存任务组结果 | | 示例程序 | `Program.cs` | 演示较复杂的任务依赖链 | 构建和运行: ```bash dotnet build dotnet run ``` 运行测试: ```bash dotnet test AsyncTool.Tests/AsyncTool.Tests.csproj ``` ## 核心 API 当前代码中没有 `AsyncOptions` 类。并发度通过 `StartAsync` 的第三个参数控制,任务回调通过 `WorkJob.WithCallBack(...)` 配置。 ```csharp Task Async.StartAsync( IEnumerable workJobs, long timeoutMilliseconds, int? maxDegreeOfParallelism = null); string Async.Start( IEnumerable workJobs, long timeoutMilliseconds, int? maxDegreeOfParallelism = null); ``` - `workJobs`:传入根任务或任务集合。调度器会根据依赖关系收集完整任务图。 - `timeoutMilliseconds`:任务组整体超时时间,超时会抛出 `TimeoutException`。 - `maxDegreeOfParallelism`:最大并发任务数。`null` 或 `<= 0` 时使用默认并发度。 ## 快速上手 下面示例构建 `load-config -> load-users -> generate-report` 的依赖链,并读取最终结果。 ```csharp using AsyncTool; using AsyncTool.Jobs; using AsyncTool.Results; var loadConfig = WorkJob.CreateBuilder() .WithId("load-config") .WithPriority(100) .WithWork(async () => { await Task.Delay(100); return (object)"config"; }) .WithCallBack( onBegin: context => Console.WriteLine($"{context.WorkJobId} started"), onCompleted: context => Console.WriteLine($"{context.WorkJobId} finished")) .Build(); var loadUsers = WorkJob.CreateBuilder() .WithId("load-users") .WithPriority(80) .WithWork(async context => { var config = context.GetResult("load-config"); await Task.Delay(100); return (object)$"users with {config}"; }) .Build(); var generateReport = WorkJob.CreateBuilder() .WithId("generate-report") .WithPriority(60) .WithWork(async context => { var users = context.GetResult("load-users"); await Task.Delay(100); return (object)$"report: {users}"; }) .Build(); loadConfig.Next(loadUsers); loadUsers.Next(generateReport); var asyncId = await Async.StartAsync( new[] { loadConfig }, timeoutMilliseconds: 3000, maxDegreeOfParallelism: 2); var resultContext = WorkJobResultContext.GetResult(asyncId); var report = resultContext?.GetResult("generate-report"); Console.WriteLine(report); ``` ## 依赖与结果 - 使用 `Next(...)` 建立强依赖。 - 一个任务可以连接多个后续任务。 - 一个任务也可以依赖多个前置任务,只有所有前置任务完成后才会执行。 - 结果通过任务 ID 读取,例如 `resultContext.GetResult("job-id")`。 - 异常通过 `resultContext.GetException("job-id")` 读取。 ```csharp var prepare = WorkJob.CreateBuilder() .WithId("prepare") .WithWork(async () => (object)"ready") .Build(); var fetchA = WorkJob.CreateBuilder() .WithId("fetch-a") .WithWork(async () => (object)"A") .Build(); var fetchB = WorkJob.CreateBuilder() .WithId("fetch-b") .WithWork(async () => (object)"B") .Build(); var aggregate = WorkJob.CreateBuilder() .WithId("aggregate") .WithWork(async context => { var a = context.GetResult("fetch-a"); var b = context.GetResult("fetch-b"); return (object)$"{a}+{b}"; }) .Build(); prepare.Next(fetchA, fetchB); fetchA.Next(aggregate); fetchB.Next(aggregate); var asyncId = await Async.StartAsync(new[] { prepare }, 5000, maxDegreeOfParallelism: 2); var result = WorkJobResultContext.GetResult(asyncId)?.GetResult("aggregate"); ``` ## 优先级与并发控制 `WithPriority` 的值越大优先级越高。调度器会在“依赖已满足”的任务集合中优先选择高优先级任务执行,同时受 `maxDegreeOfParallelism` 限制。 ```csharp var high = WorkJob.CreateBuilder() .WithId("high") .WithPriority(100) .WithWork(async () => (object)"high") .Build(); var low = WorkJob.CreateBuilder() .WithId("low") .WithPriority(10) .WithWork(async () => (object)"low") .Build(); await Async.StartAsync( new[] { low, high }, timeoutMilliseconds: 2000, maxDegreeOfParallelism: 1); ``` ## 超时与协作式取消 任务组整体超时由 `StartAsync` 的 `timeoutMilliseconds` 控制。若任务希望及时响应取消,可以使用 `WithCancellableWork(...)` 并在任务内传递或检查 `CancellationToken`。 ```csharp var slow = WorkJob.CreateBuilder() .WithId("slow") .WithCancellableWork(async cancellationToken => { await Task.Delay(3000, cancellationToken); return (object)"slow"; }) .Build(); try { await Async.StartAsync(new[] { slow }, timeoutMilliseconds: 1000); } catch (TimeoutException ex) { Console.WriteLine(ex.Message); } ``` 单个任务超时通过 `WithTimeout(milliseconds)` 配置。 ```csharp var timeoutJob = WorkJob.CreateBuilder() .WithId("timeout") .WithTimeout(200) .WithWork(async () => { await Task.Delay(1000); return (object)"too-slow"; }) .Build(); ``` ## 失败重试 `WithRetry(count)` 表示失败后最多重试 `count` 次。例如 `WithRetry(2)` 最多执行 3 次:首次执行 + 2 次重试。 ```csharp var attempt = 0; var retryJob = WorkJob.CreateBuilder() .WithId("retry") .WithRetry(2) .WithWork(async () => { attempt++; if (attempt < 3) { throw new InvalidOperationException("fail"); } await Task.Delay(100); return (object)"success"; }) .Build(); var asyncId = await Async.StartAsync(new[] { retryJob }, 5000); var result = WorkJobResultContext.GetResult(asyncId)?.GetResult("retry"); ``` ## 忽略失败 `WithIgnoreFail(true)` 会让任务失败后被视为已完成,后续依赖任务可以继续执行。失败异常不会作为业务结果写入,业务结果读取为 `null`;如需诊断,可通过 `GetException(jobId)` 获取异常。 ```csharp var optional = WorkJob.CreateBuilder() .WithId("optional") .WithIgnoreFail(true) .WithWork(async () => { throw new InvalidOperationException("optional failed"); }) .Build(); var next = WorkJob.CreateBuilder() .WithId("next") .WithWork(async context => { var optionalResult = context.GetResult("optional"); // null return (object)"continue"; }) .Build(); optional.Next(next); var asyncId = await Async.StartAsync(new[] { optional }, 5000); var exception = WorkJobResultContext.GetResult(asyncId)?.GetException("optional"); ``` ## 失败处理 普通任务失败时,调度器会抛出 `WorkJobExecutionException`。异常中包含失败任务 ID 和原始异常。 ```csharp try { await Async.StartAsync(new[] { retryJob }, 5000); } catch (WorkJobExecutionException ex) { Console.WriteLine(ex.WorkJobId); Console.WriteLine(ex.InnerException?.Message); } ``` ## 图校验与任务生命周期 启动前调度器会校验任务图: - 任务 ID 不能重复。 - 依赖图不能存在循环依赖。 - `WorkJob` 实例不可重复运行;一次任务组执行完成后,如需再次执行,请重新创建 `WorkJob` 实例。 ## 关于 `Stop` `Async.Stop(asyncId)` 会取消指定任务组、标记任务失败并清理缓存和结果。当前 `StartAsync` 是等待任务组完成后才返回 `asyncId`,因此通常通过整体超时和协作式取消控制任务生命周期。成功完成的任务组会自动清理内部 token/任务缓存,但结果会保留在 `WorkJobResultContext` 中供读取。