1 Star 1 Fork 0

OSSCore/OSS.DataFlow

Create your Gitee Account
Explore and code with more than 12 million developers,Free private repositories !:)
Sign up
Clone or Download
contribute
Sync branch
Cancel
Notice: Creating folder will generate an empty file .keep, because not support in Git
Loading...
README
Apache-2.0

OSS.DataFlow中间件

系统重构解耦的过程涉及不同领域服务分拆,或同一服务下实时响应部分和非响应部分分拆,分解后的各部分通过异步消息的流转传递,完成整体的业务逻辑,但是频繁的在业务层面直接调用不同消息队列的SDK,不够简洁
OSS.Dataflow主要实现异步消息传递的过程抽象,在业务层面提供消息发布订阅的统一抽象接口,在业务逻辑分支之间,以简单的调用完成消息的传递,和具体的消息存储触发实现无关。
同时,在底层的存储和触发层面提取接口,能够在系统的全局适配具体的消息基础设施。在这些接口之上,实现事件处理器,通过消息的重复投放,实现事件执行的容错补充机制。

详细说明文档:https://www.yuque.com/osscore/nfbh0u/aydlz9

消息流业务使用

使用时可以通过Nuget直接安装,也可以通过命令行: Install-Package OSS.DataFlow
组件的使用非常简单,只需要关注:

a. 消息发布者接口,由组件注册时返回,供业务方法调用传入消息体。
b. 消息订阅(消费)者接口实现或委托方法,在组件注册时传入。

具体示例:
a. 消息的发布订阅独立调用示例

	// 全局初始化,注入订阅者实现
	const string msgPSKey = "Publisher-Subscriber-MsgKey";
	DataFlowFactory.RegisterSubscriber<MsgData>(msgPSKey, async (data) =>
            {
                // 当前通过注入消费的委托方法,也可通过接口实现
                // DoSomething(data);
                return true;
            });

	//	获取发布者接口
	private static readonly IDataPublisher publisher = DataFlowFactory.CreatePublisher(); 

	//  业务方法中发布消息
	await publisher.Publish(msgPSKey,new MsgData() {name = "test"});

b. 消息的流式调用示例

	// 直接注册消费实现并获取消息发布接口
	private static readonly IDataPublisher _delegateFlowpusher = 
        DataFlowFactory.RegisterFlow<MsgData>("delegate_flow",async (data) =>
            {
                // 当前通过注入消费的委托方法,也可通过接口实现
                // DoSomething(data);
                return true;
            });

	// 业务方法中发布消息
    await _delegateFlowpusher.Publish("normal_flow",new MsgData() {name = "test"});

如上,只需要获取发布者,并注入消费实现,即可完成整个消息的异步消费处理,同一个消息key可以注册多个消费实现,当有消息进入消费时,会并发处理。

消息底层存储适配扩展

前边介绍了业务接口的使用,和具体消息队列或数据库等隔离,这是对接业务层面的使用。因为业务场景不同,不同的项目对消息的响应速度和处理机制又各有需求,所以 OSS.DataFlow 同样提供了对接消息产品的扩展接口,方便使用者适配已有消息基础设施。

  1. 消息存储适配接口
    对于事件消息处理,需要关注两件事情:接收存储 和 消费触发。在类库中提供了 DataFlowManager 消息流管理类,用户可以通过实现IDataPublisherProvider接口,完成具体的存储实现。
    同时在不同的消息产品触发消费时(比如数据库定时任务或者RabbitMQ消费), 调用通知方法(NotifySubscriber ),来触发通过类库注册的具体的业务订阅处理。
    // 消息流核心部件管理者
    public static class DataFlowManager
    {
        /// <summary>
        /// 自定义 数据流发布(存储)实现的 提供者
        /// </summary>
        public static IDataPublisherProvider PublisherProvider { get; set; }

        /// <summary>
        ///  通过自定义消息触发机制通知订阅者
        ///     调用时请做异常拦截,防止脏数据导致 msgData 类型错误
        /// </summary>
        /// <param name="msgDataKey"></param>
        /// <param name="msgData">消息内容,自定义触发时,请注意和注册订阅者的消费数据类型转换安全</param>
        /// <returns></returns>
        public static Task<bool> NotifySubscriber(string msgDataKey, object msgData)
        {
            ....
        }
    }

关于 IDataPublisherProvider

 	public interface IDataPublisherProvider
    {
        /// <summary>
        /// 数据发布者
        /// </summary>
        /// <param name="option"></param>
        /// <returns> 返回消息发布接口实现 </returns>
        IDataPublisher CreatePublisher(DataPublisherOption option);
    }

	/// <summary>
    ///  数据的发布者
    /// </summary>
    public interface IDataPublisher
    {
        /// <summary>
        /// 推进数据(存储具体消息队列或者数据库实现)
        /// </summary>
        /// <param name="dataKey"></param>
        /// <param name="data"></param>
        /// <returns>是否推入成功</returns>
        Task<bool> Publish<TData>(string dataKey,TData data);
    }

可以看到 IDataPublisher 接口负责具体的存储实现,可以根据 DataPublisherOption 的 source_name 业务属性实现对不同业务需求返回不同的具体实现。

  1. 默认实现介绍

借助.Net 自身的内存消息队列,在类库中提供了默认的内部消息存储转发实现(内存级别),使用者可以自行实现扩展相关接口并进行全局配置。
内置的.Net Core消息队列, 设置了默认1个队列,最大并发为32线程。 如果需要可以通过设置DataPublisherOption的source_name,类库将会为每个source_name 创建独立的内存队列。

回流(重复执行)事件处理器

在有些比较重要的业务处理中,如果发生异常(如网络超时)等操作,会要求过段时间后重复执行进行错误补偿,借助OSS.DataFlow 类库的消息存储和转发接口,提供了 FlowEventProcessor<TIn, TOut> 的事件处理器,在事件处理器内部完成了异常拦截重试的封装。
具体的过程就是当异常发生时,处理器通过将入参包装(FlowEventInput)通过消息流保存,具体的重试触发实现间隔,由开发者根据 MsgKey和参数 FlowEventInput 自行定制实现即可。

一. 使用示例:

 	// 具体执行事件
    public class TestEvent:IFlowEvent<TestCount, TestCount>
    {
        public Task<TestCount> Execute(TestCount input)
        {
            input.count++;
            if (input.count < 10)
            {
                throw new ArgumentException("小于当前要求的条件");
            }
            return Task.FromResult(input);
        }

        public Task Failed(TestCount input)
        {
             Console.WriteLine(input.count);
             return Task.CompletedTask;
        }
    }
    public class TestCount
    {
        public int count { get; set; }
    }

	//  单元测试方法
   	[TestMethod]
   	public async Task DataStackTest()
   	{
       var o = new FlowEventOption()
       {
           event_msg_key = "Test_flow_event_msg",
           flow_retry_times = 4, // 经过消息流重试次数
           func_retry_times = 1  //  当前执行方法内部直接串联循环重试次数
       };
       var flowProcessor = new FlowEventProcessor<TestCount,TestCount>(new TestEvent(), o);
       
       var countPara    = new TestCount() {count = 0};

       var countRes = await flowProcessor.Process(countPara);
       Assert.IsNull(countRes); // 首次抛出异常,拦截返回空

       await Task.Delay(5000);  // 异步消息队列消费缓冲

       // 总执行次数 = (flow_retry_times+1)*(func_retry_times+1) = (4+1)*(1+1) = 10
       Assert.IsTrue(countPara.count==10);// 默认消息队列实现是内存级,countPara引用不变
   	}

二. 使用介绍
上例中 TestCount为默认入参和出参类型。 继承至 IFlowEvent<TestCount, TestCount> 的 TestEvent 为具体执行事件,其定义如下:

 	public interface IFlowEvent<in TIn,TOut>
    {
        /// <summary>
        /// 具体事件执行
        /// </summary>
        /// <param name="input"></param>
        /// <returns></returns>
        Task<TOut> Execute(TIn input);

        /// <summary>
        ///   最终失败执行方法
        /// </summary>
        /// <param name="input"></param>
        /// <returns></returns>
        Task Failed(TIn input);
    }

FlowEventOption为当前事件执行重试等参数项。其定义如下:

 	public class FlowEventOption
    {
        /// <summary>
        ///  保存事件消息的key(用来保存消息到消息流订阅重试)
        /// </summary>
        public string event_msg_key { get; set; }

        /// <summary>
        ///  异常时 推送消息到消息流,通过订阅方式重试运行次数,默认:0
        /// </summary>
        public int flow_retry_times { get; set; } = 1;

        /// <summary>
        /// 异常时 在推送消息流之前,在当前执行方法内部直接串联循环重试次数,默认:1
        /// </summary>
        public int func_retry_times { get; set; } = 0;

        /// <summary>
        ///  消息流的可选项
        /// </summary>
        public DataFlowOption flow_option { get; set; } 
    }
Apache License Version 2.0, January 2004 http://www.apache.org/licenses/ TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION 1. Definitions. "License" shall mean the terms and conditions for use, reproduction, and distribution as defined by Sections 1 through 9 of this document. "Licensor" shall mean the copyright owner or entity authorized by the copyright owner that is granting the License. "Legal Entity" shall mean the union of the acting entity and all other entities that control, are controlled by, or are under common control with that entity. For the purposes of this definition, "control" means (i) the power, direct or indirect, to cause the direction or management of such entity, whether by contract or otherwise, or (ii) ownership of fifty percent (50%) or more of the outstanding shares, or (iii) beneficial ownership of such entity. "You" (or "Your") shall mean an individual or Legal Entity exercising permissions granted by this License. "Source" form shall mean the preferred form for making modifications, including but not limited to software source code, documentation source, and configuration files. "Object" form shall mean any form resulting from mechanical transformation or translation of a Source form, including but not limited to compiled object code, generated documentation, and conversions to other media types. "Work" shall mean the work of authorship, whether in Source or Object form, made available under the License, as indicated by a copyright notice that is included in or attached to the work (an example is provided in the Appendix below). "Derivative Works" shall mean any work, whether in Source or Object form, that is based on (or derived from) the Work and for which the editorial revisions, annotations, elaborations, or other modifications represent, as a whole, an original work of authorship. For the purposes of this License, Derivative Works shall not include works that remain separable from, or merely link (or bind by name) to the interfaces of, the Work and Derivative Works thereof. "Contribution" shall mean any work of authorship, including the original version of the Work and any modifications or additions to that Work or Derivative Works thereof, that is intentionally submitted to Licensor for inclusion in the Work by the copyright owner or by an individual or Legal Entity authorized to submit on behalf of the copyright owner. For the purposes of this definition, "submitted" means any form of electronic, verbal, or written communication sent to the Licensor or its representatives, including but not limited to communication on electronic mailing lists, source code control systems, and issue tracking systems that are managed by, or on behalf of, the Licensor for the purpose of discussing and improving the Work, but excluding communication that is conspicuously marked or otherwise designated in writing by the copyright owner as "Not a Contribution." "Contributor" shall mean Licensor and any individual or Legal Entity on behalf of whom a Contribution has been received by Licensor and subsequently incorporated within the Work. 2. Grant of Copyright License. Subject to the terms and conditions of this License, each Contributor hereby grants to You a perpetual, worldwide, non-exclusive, no-charge, royalty-free, irrevocable copyright license to reproduce, prepare Derivative Works of, publicly display, publicly perform, sublicense, and distribute the Work and such Derivative Works in Source or Object form. 3. Grant of Patent License. Subject to the terms and conditions of this License, each Contributor hereby grants to You a perpetual, worldwide, non-exclusive, no-charge, royalty-free, irrevocable (except as stated in this section) patent license to make, have made, use, offer to sell, sell, import, and otherwise transfer the Work, where such license applies only to those patent claims licensable by such Contributor that are necessarily infringed by their Contribution(s) alone or by combination of their Contribution(s) with the Work to which such Contribution(s) was submitted. If You institute patent litigation against any entity (including a cross-claim or counterclaim in a lawsuit) alleging that the Work or a Contribution incorporated within the Work constitutes direct or contributory patent infringement, then any patent licenses granted to You under this License for that Work shall terminate as of the date such litigation is filed. 4. Redistribution. You may reproduce and distribute copies of the Work or Derivative Works thereof in any medium, with or without modifications, and in Source or Object form, provided that You meet the following conditions: (a) You must give any other recipients of the Work or Derivative Works a copy of this License; and (b) You must cause any modified files to carry prominent notices stating that You changed the files; and (c) You must retain, in the Source form of any Derivative Works that You distribute, all copyright, patent, trademark, and attribution notices from the Source form of the Work, excluding those notices that do not pertain to any part of the Derivative Works; and (d) If the Work includes a "NOTICE" text file as part of its distribution, then any Derivative Works that You distribute must include a readable copy of the attribution notices contained within such NOTICE file, excluding those notices that do not pertain to any part of the Derivative Works, in at least one of the following places: within a NOTICE text file distributed as part of the Derivative Works; within the Source form or documentation, if provided along with the Derivative Works; or, within a display generated by the Derivative Works, if and wherever such third-party notices normally appear. The contents of the NOTICE file are for informational purposes only and do not modify the License. You may add Your own attribution notices within Derivative Works that You distribute, alongside or as an addendum to the NOTICE text from the Work, provided that such additional attribution notices cannot be construed as modifying the License. You may add Your own copyright statement to Your modifications and may provide additional or different license terms and conditions for use, reproduction, or distribution of Your modifications, or for any such Derivative Works as a whole, provided Your use, reproduction, and distribution of the Work otherwise complies with the conditions stated in this License. 5. Submission of Contributions. Unless You explicitly state otherwise, any Contribution intentionally submitted for inclusion in the Work by You to the Licensor shall be under the terms and conditions of this License, without any additional terms or conditions. Notwithstanding the above, nothing herein shall supersede or modify the terms of any separate license agreement you may have executed with Licensor regarding such Contributions. 6. Trademarks. This License does not grant permission to use the trade names, trademarks, service marks, or product names of the Licensor, except as required for reasonable and customary use in describing the origin of the Work and reproducing the content of the NOTICE file. 7. Disclaimer of Warranty. Unless required by applicable law or agreed to in writing, Licensor provides the Work (and each Contributor provides its Contributions) on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied, including, without limitation, any warranties or conditions of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A PARTICULAR PURPOSE. You are solely responsible for determining the appropriateness of using or redistributing the Work and assume any risks associated with Your exercise of permissions under this License. 8. Limitation of Liability. In no event and under no legal theory, whether in tort (including negligence), contract, or otherwise, unless required by applicable law (such as deliberate and grossly negligent acts) or agreed to in writing, shall any Contributor be liable to You for damages, including any direct, indirect, special, incidental, or consequential damages of any character arising as a result of this License or out of the use or inability to use the Work (including but not limited to damages for loss of goodwill, work stoppage, computer failure or malfunction, or any and all other commercial damages or losses), even if such Contributor has been advised of the possibility of such damages. 9. Accepting Warranty or Additional Liability. While redistributing the Work or Derivative Works thereof, You may choose to offer, and charge a fee for, acceptance of support, warranty, indemnity, or other liability obligations and/or rights consistent with this License. However, in accepting such obligations, You may act only on Your own behalf and on Your sole responsibility, not on behalf of any other Contributor, and only if You agree to indemnify, defend, and hold each Contributor harmless for any liability incurred by, or claims asserted against, such Contributor by reason of your accepting any such warranty or additional liability. END OF TERMS AND CONDITIONS APPENDIX: How to apply the Apache License to your work. To apply the Apache License to your work, attach the following boilerplate notice, with the fields enclosed by brackets "[]" replaced with your own identifying information. (Don't include the brackets!) The text should be enclosed in the appropriate comment syntax for the file format. We also recommend that a file or class name and description of purpose be included on the same "printed page" as the copyright notice for easier identification within third-party archives. Copyright [yyyy] [name of copyright owner] Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

About

主要实现异步消息传递的过程抽象,在业务层面提供消息发布订阅的统一抽象接口,在业务逻辑分支之间,以简单的调用完成消息的传递,和具体的消息存储触发实现无关。在底层的存储和触发层面提取接口,能够在系统的全局适配具体的消息基础设施。 expand collapse
C#
Apache-2.0
Cancel

Releases

No release

Contributors

All

Activities

Load More
can not load any more
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
1
https://gitee.com/osscore/oss.dataflow.git
git@gitee.com:osscore/oss.dataflow.git
osscore
oss.dataflow
OSS.DataFlow
main

Search

344bd9b3 5694891 D2dac590 5694891