# OSS.PipeLine **Repository Path**: osscore/OSS.PipeLine ## Basic Information - **Project Name**: OSS.PipeLine - **Description**: 流式事件处理,微服务下业务生命周期管理,强化业务的流程管理,建立业务操作边界,打造标准化的业务执行单元,提高代码复用。 - **Primary Language**: Unknown - **License**: GPL-3.0 - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 4 - **Forks**: 3 - **Created**: 2021-06-01 - **Last Updated**: 2026-04-19 ## Categories & Tags **Categories**: process-engine **Tags**: None ## README # OSS.PipeLine 轻量级 .NET 流程引擎与数据管道库,用于构建异步数据处理管道和业务流程。 ## 项目组成 | 库 | 描述 | |---|---| | **OSS.DataPipe** | 轻量级消息管道库,支持生产-消费模式和基于消息管道的自动重试事件管理 | | **OSS.Pipeline** | 轻量级流程引擎,提供 Activity、Branch、Convertor、Enumerator 组件 | ## 架构关系 ``` ┌─────────────────────────────────────────────────────────────┐ │ OSS.Pipeline │ │ (Workflow Engine - Activity, Branch, Convertor, Enumerator)│ └───────────────────────┬─────────────────────────────────────┘ │ depends on ▼ ┌─────────────────────────────────────────────────────────────┐ │ OSS.DataPipe │ │ (Message Pipe - Producer/Consumer with Retry Support) │ └─────────────────────────────────────────────────────────────┘ ``` ## 安装 ```bash # NuGet 安装 Install-Package OSS.DataPipe Install-Package OSS.Pipeline ``` --- ## OSS.DataPipe 使用指南 ### 核心接口 | 接口 | 描述 | |------|------| | `IDataProducer` | 数据生产者,通过 `Push()` 推送数据 | | `IDataConsumer` | 数据消费者,通过 `Pop()` 消费数据 | | `IDataPipeProvider` | 扩展接口,支持自定义队列实现 | ### 基本使用 #### 1. 创建数据生产者 ```csharp using OSS.DataPipe; // 推荐使用静态全局定义(当前未指定特定队列实现,内部会使用默认内存队列) private static IDataProducer producer = DataPipeFactory.CreateProducer(async (data) => { Console.WriteLine($"处理数据: {data}"); return true; // 返回 true 表示成功,false 会触发重试 }); ``` #### 2. 推送数据 ```csharp await producer.Push("测试数据"); ``` #### 3. 资源释放 如果通过 IDataPipeProvider 接口自定义消息队列机制,关联需要释放的资源,使用: ```csharp DataPipeFactory.Release(producer); ``` #### 自定义队列 Provider 实现 `IDataPipeProvider` 接口可集成 RabbitMQ、Kafka 等消息队列: ```csharp // 自定义消息Provider, 根据传入 SourceCode ,提供不同的自定义消息队列实现 internal class CustomDataPipeProvider : IDataPipeProvider { public IDataProducer? CreateProducer(IDataConsumer consumer, DataPipeOption option) { if (option.SourceCode == "CustomQueue001") return new CustomQueue001(consumer); return null; } // 如果需要,可自定义实现释放方法 public bool Release(IDataProducer input) => true; } // 自定义消息队列实现 internal class CustomQueue001 : IDataProducer { public IDataConsumer Consumer { get; } public CustomQueue001(IDataConsumer consumer) { Consumer = consumer; } public Task Push(TData data, CancellationToken? ct = null) { // to do something,such as pushing to RabbitMQ // for testing ,consume immediately return Consumer.Pop(data, ct); } } // 全局注入 Provider DataPipeFactory.PipeProvider = new CustomDataPipeProvider(); ``` ### 事件重试 (RetryProcessor) 此处理器主要作用是提供对事件方法的包装,在异常或者执行返回需要重试标识的基础上,根据重试次数设置,将入参推入消息管道进行多次消费。 ```csharp using OSS.DataPipe.Event; // 正常业务方法 private static Task DoSomething(string input) { Console.WriteLine($"execution input:{input}"); throw new Exception(); } // 定义 RetryProcessor, 自动重试 3 次 public static readonly RetryProcessor _eventRetryProcessor = new(DoSomething, new RetryPolicy(3)); // 具体调用: _eventRetryProcessor.Process("sssss"); ``` 同时,该 Processor 提供多种扩展 -- todo 创建 DataPipe.Processor.md 文档,并添加链接到这里,详细介绍 RetryProcessor 的其他使用方法 --- ## OSS.Pipeline 使用指南 每个业务流程都是由多个具体的业务事件组成,如果把每个业务事件当做一个Pipe, OSS.Pipeline的主要作用就是提供基础框架,在不侵入业务方法的基础上,将每个独立的事件方法串联成一个完整的业务流程。 在这个流程中,上游 Pipe 的输出作为下游 Pipe 的输入,只需要 上游节点输出类型和下游节点输入类型保持一致(继承BasePassiveActivity的被动节点除外),即可通过Pipeline将两个节点串联起来。 ### 组件类型 | 组件 | PipeType | 描述 | |------|----------|------| | **Activity** | `Activity` | 基本执行单元,处理业务逻辑 | | **BranchGateway** | `BranchGateway` | 条件分支,根据条件路由到不同路径 | | **Convertor** | `Convertor` | 类型转换器,转换数据类型 | | **Enumerator** | `Enumerator` | 枚举器,将集合展开为单个元素 | ### 构建简单流程 假设当前存在一个业务流程 step1: 输入 - string, 输出 - string step2: 输入 - string, 输出 - int step3: 输入 - int, 输出 - string 快速示例如下: ```csharp using OSS.Pipeline; var startPipe = new SimpleActivity("step1", async (input) => { Console.WriteLine($"{input} -> 步骤1完成"); return "Y"; }); var endPipe = startPipe.Append("step2", async (resStr) => { Console.WriteLine($"{resStr} -> 步骤2完成"); return resStr=="Y"?10:0; }) .Append("step3", async (count) => { Console.WriteLine($"{(count==10?"Succeeded":"Failed")} -> 步骤3完成"); return string.Empty; }); var pipeline = PipelineBuilder.Build("简单流程", startPipe, endPipe); var result = await pipeline.Run("测试流程"); ``` 或者: ```csharp var pipeline = PipelineBuilder.Build( "简单流程", "step1", async (input) => { Console.WriteLine($"{input} -> 步骤1完成"); return "Y"; }, start => { var endPipe = startPipe.Append("step2", async (resStr) => { Console.WriteLine($"{resStr} -> 步骤2完成"); return resStr=="Y"?10:0; }) .Append("step3", async (count) => { Console.WriteLine($"{(count==10?"Succeeded":"Failed")} -> 步骤3完成"); return string.Empty; }); return endPipe; } ); await pipeline.Run("测试流程"); ``` ### 分支流程 ```csharp // 创建分支网关 var branchGate = new BranchPipe("审核结果分支"); // 连接到主流程 createActivity .Append(reduceInventoryActivity) .Append(auditActivity) .Append(branchGate); // 分支1:审核通过 branchGate.Append( result => result.IsSuccess(), // 条件 new NotifyEmailActivity()); // 满足条件时执行 // 分支2:审核失败 branchGate.Append( "退回库存", result => !result.IsSuccess(), // 条件 async (result) => { await ReturnInventoryAsync(result); return new NotifyResult("已退回库存"); }); var pipeline = PipelineBuilder.Build("订单流程", createActivity, tailActivity); ``` ### 类型转换器 ```csharp var convertor = orderActivity.Append( order => new OrderDto { Id = order.Id, Amount = order.Amount }, "OrderToDto"); ``` ### 配置组件重试 ```csharp var activity = new SimpleActivity("外部API", async (input) => { return await CallExternalApiAsync(input); }) .SetRetryPolicy(new RetryPolicy { RetryTimes = 3, AutoReleaseDataPipe = true }); ``` ### 流程监控 ```csharp public class MyMonitor : IPipeMonitor { public async Task Monitor(MonitorDataItem data) { Console.WriteLine($"[{data.stage}] {data.pipe_code}"); // data.pipe_type - 组件类型 // data.input - 输入参数 // data.output - 输出结果 // data.executed_times - 执行次数 // data.exception - 异常信息 } } pipeline.SetMonitor(new MyMonitor()); ``` ### 自定义 Activity 组件 ```csharp public class CreateOrderActivity : BaseActivity { public CreateOrderActivity() : base("CreateOrder") { } protected override async Task> Executing( CreateOrderReq para, CancellationToken ct) { var order = await _orderService.CreateAsync(para); return new EventResult(order); // 成功 // return new EventResult(true, "创建失败"); // 需要重试 } protected override Task FinallySucceeded(CreateOrderReq para, OrderInfo result, CancellationToken ct) { // 成功后的处理 return Task.CompletedTask; } protected override Task FinallyFailed(CreateOrderReq para, RetryOutput result, CancellationToken ct) { // 最终失败后的处理 return Task.CompletedTask; } } ``` ### 被动 Activity 被动 Activity 不需要输入参数,通过 `AppendPassive` 添加到分支: ```csharp branchGate.AppendPassive("清理缓存", async (result) => { await ClearCacheAsync(result); return result; }); ``` --- ## 应用场景 | 场景 | DataPipe | Pipeline | |------|:--------:|:--------:| | 消息队列处理 | ✓ | | | 事件驱动架构 | ✓ | ✓ | | 订单处理流程 | | ✓ | | 审批工作流 | | ✓ | | 数据同步/ETL | ✓ | ✓ | | 任务编排 | | ✓ | --- ## 构建与测试 ```bash # 构建解决方案 dotnet build src/OSS.PipeLine.sln # 运行测试示例 dotnet run --project src/Tests/OSS.Pipeline.ConsoleTests/OSS.Pipeline.ConsoleTests.csproj ``` ## 目录结构 ``` src/ ├── OSS.DataPipe/ # 消息管道库 │ ├── Interface/ # 核心接口 │ ├── DefaultQueue/ # 默认内存队列 │ └── Event/ # 重试系统 ├── OSS.Pipeline/ # 流程引擎库 │ ├── Base/ # 基础抽象 │ ├── Pipeline/ # 流程构建器 │ └── Pipes/ # 组件实现 │ ├── Activity/ # 活动组件 │ ├── Branch/ # 分支组件 │ ├── Convertor/ # 转换器组件 │ └── Enumerator/ # 枚举器组件 └── Tests/ # 测试项目 ``` ## 详细文档 | 文档 | 描述 | |------|------| | [OSS.DataPipe 详细文档](docs/DataPipe.md) | 核心组件、重试机制、自定义队列实现 | | [OSS.Pipeline 详细文档](docs/Pipeline.md) | 组件类型、流程构建、监控配置 | ## 许可证 MIT License