diff --git a/WorkFlowCore/WorkFlowCore.Framework/Conditions/WorkStepCondition.cs b/WorkFlowCore/WorkFlowCore.Framework/Conditions/WorkStepCondition.cs new file mode 100644 index 0000000000000000000000000000000000000000..a6ebc3d4b134019cf1344ef58cea62b86ff6e906 --- /dev/null +++ b/WorkFlowCore/WorkFlowCore.Framework/Conditions/WorkStepCondition.cs @@ -0,0 +1,93 @@ +using Newtonsoft.Json.Linq; +using System; +using System.Collections.Generic; +using System.Text; +using System.Text.RegularExpressions; +using WorkFlowCore.Conditions; + +namespace WorkFlowCore.Framework.Conditions +{ + [Condition("审批步骤一般条件处理器", "!=|>=|<=|<|>|==|⊆|∈,参数被 value() 方法包含时解析,用于解析当前审批步骤的表单")] + public class WorkStepCondition : ICondition + { + private VType GetValue(string expression,string formData) + { + //被 value() 方法包含时,需要解析,否则原值返回 + var regex = new Regex(@"value\((?.*)\)"); + + if (regex.IsMatch(expression)) + + { + JObject jObject = JObject.Parse(formData); + var token = jObject.SelectToken(regex.Match(expression).Groups["value"].Value); + return token.Value(); + } + else return (VType)Convert.ChangeType(expression,typeof(VType)); + } + public bool CanAccept(ConditionInput input) + { + //不填就不处理 + if(string.IsNullOrEmpty(input.CurrentWorkStep.FormData)) return true; + try + { + //简单的表达式解析 + + var queryExpressions = input.Expression.Split(new string[] { "&&" }, StringSplitOptions.None); + var regexy = new Regex("(?!=|>=|<=|<|>|==|⊆|∈)"); + + var paramList = new List(); + + foreach (var expression in queryExpressions) + { + if (!regexy.IsMatch(expression)) continue; + var operation = regexy.Match(expression).Groups["operation"].Value; + + var expressionInfo = expression.Split(new string[] { operation }, StringSplitOptions.None); + if (expressionInfo.Length != 2) continue; + + var key = expressionInfo[0].Trim(); + var keyValue = GetValue(key, input.CurrentWorkStep.FormData) ;//TODO 函数解析key获取数据 + var value = expressionInfo[1].Trim(); + var valueValue = GetValue(value, input.CurrentWorkStep.FormData);//TODO 函数解析value获取数据 + var result = false; + + switch (operation) + { + case "!=": + result = !keyValue.Equals(valueValue); + break; + case ">=": + result = decimal.Parse(keyValue)>=decimal.Parse(valueValue); + break; + case "<=": + result = decimal.Parse(keyValue) <= decimal.Parse(valueValue); + break; + case ">": + result = decimal.Parse(keyValue) > decimal.Parse(valueValue); + break; + case "<": + result = decimal.Parse(keyValue) < decimal.Parse(valueValue); + break; + case "==": + result = keyValue.Equals(valueValue); + break; + case "⊆"://包含 + result = keyValue.Contains(valueValue); + break; + case "∈"://属于 + result = valueValue.Contains(keyValue); + break; + default: + break; + } + if (!result) return false; + } + return true; + } + catch (Exception) + { + return false; + } + } + } +} diff --git a/WorkFlowCore/WorkFlowCore.Framework/EventHandlers/AutoHandleStepsEventHandler.cs b/WorkFlowCore/WorkFlowCore.Framework/EventHandlers/AutoHandleStepsEventHandler.cs new file mode 100644 index 0000000000000000000000000000000000000000..e7269b63cac8803bd4365bdffccc50829fa2dbab --- /dev/null +++ b/WorkFlowCore/WorkFlowCore.Framework/EventHandlers/AutoHandleStepsEventHandler.cs @@ -0,0 +1,28 @@ +using System; +using System.Collections.Generic; +using System.Text; +using WorkFlowCore.Common.EventBus; +using WorkFlowCore.EventData; +using WorkFlowCore.WorkTasks; + +namespace WorkFlowCore.Framework.EventHandlers +{ + public class AutoHandleStepsEventHandler : IEventHandler + { + private readonly WorkflowManager workTaskManager; + + public AutoHandleStepsEventHandler(WorkflowManager workTaskManager) + { + this.workTaskManager = workTaskManager; + } + + public void Handle(AutoHandleStepsEventData data) + { + if (data == null || data.Steps == null) return; + foreach (var step in data.Steps) + { + workTaskManager.PassApprove(step.Id, data.Comment, string.Empty).Wait(); + } + } + } +} diff --git a/WorkFlowCore/WorkFlowCore.Framework/WorkFlowCoreFrameworkService.cs b/WorkFlowCore/WorkFlowCore.Framework/WorkFlowCoreFrameworkService.cs index a8f72cd349bae393f88de95582135c8459538281..473988e19404faf628df4747c4f725603f48e023 100644 --- a/WorkFlowCore/WorkFlowCore.Framework/WorkFlowCoreFrameworkService.cs +++ b/WorkFlowCore/WorkFlowCore.Framework/WorkFlowCoreFrameworkService.cs @@ -65,6 +65,7 @@ namespace WorkFlowCore.Framework //事件处理 services.AddScoped(); services.AddScoped(); + services.AddScoped(); var assembly = typeof(WorkFlowCoreFrameworkService).Assembly; services.Replace(new ServiceDescriptor(typeof(IWorkflowSession),typeof(DefaultSession), ServiceLifetime.Scoped)); diff --git a/WorkFlowCore/WorkFlowCore.Host/Controllers/WorkFlowController.cs b/WorkFlowCore/WorkFlowCore.Host/Controllers/WorkFlowController.cs index d34e253b96f720dc9573eafe35e9cdb81050a158..3c96111f68bbc56dcb1bad9a1eefa0ac8420dedf 100644 --- a/WorkFlowCore/WorkFlowCore.Host/Controllers/WorkFlowController.cs +++ b/WorkFlowCore/WorkFlowCore.Host/Controllers/WorkFlowController.cs @@ -220,7 +220,7 @@ namespace WorkFlowCore.Host.Controllers [HttpPost("PassProve")] public async Task>> PassProve(ProveInput input) { - var proveResult = await workflowManager.PassApprove(input.StepId, input.Comment, input.ResourceIds); + var proveResult = await workflowManager.PassApprove(input.StepId, input.Comment, input.ResourceIds,input.FormData); if (proveResult.Code == ProveResult.ProveResultCode.SUCCESS) return OutputDto.Succeed(proveResult.WorkSteps); return OutputDto.Failed>(proveResult.Msg); @@ -233,7 +233,7 @@ namespace WorkFlowCore.Host.Controllers [HttpPost("RejectProve")] public async Task>> RejectProve(ProveInput input) { - var proveResult = await workflowManager.RejectApprove(input.StepId, input.Comment, input.ResourceIds); + var proveResult = await workflowManager.RejectApprove(input.StepId, input.Comment, input.ResourceIds, input.FormData); if (proveResult.Code == ProveResult.ProveResultCode.SUCCESS) return OutputDto.Succeed(proveResult.WorkSteps); return OutputDto.Failed>(proveResult.Msg); diff --git a/WorkFlowCore/WorkFlowCore.Host/ViewModels/WorkFlowCore/ProveInput.cs b/WorkFlowCore/WorkFlowCore.Host/ViewModels/WorkFlowCore/ProveInput.cs index 420bf8b091bd196b165022dbb4c0331502c6ff3e..5681d05988021a8692f756911c72d20511886276 100644 --- a/WorkFlowCore/WorkFlowCore.Host/ViewModels/WorkFlowCore/ProveInput.cs +++ b/WorkFlowCore/WorkFlowCore.Host/ViewModels/WorkFlowCore/ProveInput.cs @@ -23,5 +23,9 @@ namespace WorkFlowCore.Host.ViewModels.WorkFlowCore /// 附件id集合 /// public string ResourceIds { get; set; } + /// + /// 审批表单,与发起流程的表单是有区别 + /// + public string FormData { get; set; } } } diff --git a/WorkFlowCore/WorkFlowCore.Host/WorkFlowCore.Host.csproj b/WorkFlowCore/WorkFlowCore.Host/WorkFlowCore.Host.csproj index 7aed17f987449720b203713acf60aa78dbf29aeb..721d5762a3011d5a604aef3484c706c68d3aecdb 100644 --- a/WorkFlowCore/WorkFlowCore.Host/WorkFlowCore.Host.csproj +++ b/WorkFlowCore/WorkFlowCore.Host/WorkFlowCore.Host.csproj @@ -65,4 +65,8 @@ + + + + diff --git a/WorkFlowCore/WorkFlowCore/EventBus/BaseEventData.cs b/WorkFlowCore/WorkFlowCore/EventBus/BaseEventData.cs new file mode 100644 index 0000000000000000000000000000000000000000..40840deccbfa523f02e8bedc7d0d6a91a8dc3df9 --- /dev/null +++ b/WorkFlowCore/WorkFlowCore/EventBus/BaseEventData.cs @@ -0,0 +1,10 @@ +using System; +using System.Collections.Generic; +using System.Text; + +namespace WorkFlowCore.EventBus +{ + public class BaseEventData + { + } +} diff --git a/WorkFlowCore/WorkFlowCore/EventBus/EventBusManager.cs b/WorkFlowCore/WorkFlowCore/EventBus/EventBusManager.cs new file mode 100644 index 0000000000000000000000000000000000000000..d1de8cd90f80c64e867c31f808f5c8dbd16901bc --- /dev/null +++ b/WorkFlowCore/WorkFlowCore/EventBus/EventBusManager.cs @@ -0,0 +1,41 @@ +using Microsoft.Extensions.DependencyInjection; +using System; +using System.Collections.Generic; +using System.Text; + +namespace WorkFlowCore.EventBus +{ + /// + /// 全局静态事件帮助类,便于在其它非注入渠道发起事件 + /// + public class EventBusManager + { + private static IServiceProvider serviceProvider; + internal static void Init(IServiceProvider serviceProvider) + { + EventBusManager.serviceProvider = serviceProvider; + } + + public IEventBus Instance() + { + return (IEventBus)serviceProvider.GetService(typeof(IEventBus)); + } + + public void Trigger(TData data) where TData:BaseEventData + { + if (data == null) return; + var services =serviceProvider.GetServices(); + foreach (var service in services) + { + try + { + service.Trigger(data); + } + catch (Exception ex) + { + Console.Error.WriteLine(ex.ToString()); + } + } + } + } +} diff --git a/WorkFlowCore/WorkFlowCore/EventBus/EventBusService.cs b/WorkFlowCore/WorkFlowCore/EventBus/EventBusService.cs new file mode 100644 index 0000000000000000000000000000000000000000..95027bc13332945da8034deffb4041ca307506f3 --- /dev/null +++ b/WorkFlowCore/WorkFlowCore/EventBus/EventBusService.cs @@ -0,0 +1,53 @@ +using WorkFlowCore.EventBus.Implements.Kafka; +using Microsoft.Extensions.DependencyInjection; +using System; +using System.Collections.Generic; +using System.Reflection; +using System.Text; +using Microsoft.AspNetCore.Builder; +using Microsoft.Extensions.Configuration; + +namespace WorkFlowCore.EventBus +{ + public static class EventBusService + { + + public static IServiceCollection AddDefautEventBus(this IServiceCollection services,params Assembly[] assemblies) + { + services.AddSingleton(typeof(IEventBus), typeof(DefaultEventBus)); + services.AddSingleton(typeof(DefaultEventBus)); + foreach (var assembly in assemblies) + { + DefaultEventBus.RegistSubscriptions(assembly); + } + services.AddSingleton(); + return services; + } + public static IServiceCollection AddKafkaEventBus(this IServiceCollection services, Action options) + { + services.AddSingleton(typeof(IEventBus), typeof(KafkaEventBus)); + services.AddSingleton(typeof(KafkaEventBus)); + var config = new KafkaEventConfig(); + options?.Invoke(config); + services.AddSingleton(provider => config); + services.AddSingleton(); + return services; + } + public static IApplicationBuilder InitGlobalEventBus(this IApplicationBuilder app) + { + //注册普通事件,该事件订阅在单应用有效无法分布式 + EventBusManager.Init(app.ApplicationServices); + + //注册kafka作为分布式事件 + var kafkaEventBus = app.ApplicationServices.GetService(); + var config = app.ApplicationServices.GetService(); + var configuration = app.ApplicationServices.GetService(); + Console.WriteLine("servers:" + configuration["KafkaBootstrapServers"]); + if (kafkaEventBus!=null&&config!=null && config.RegisterAssemblies != null) + kafkaEventBus.RegistSubscriptions(config.RegisterAssemblies); + + return app; + + } + } +} diff --git a/WorkFlowCore/WorkFlowCore/EventBus/IEventBus.cs b/WorkFlowCore/WorkFlowCore/EventBus/IEventBus.cs new file mode 100644 index 0000000000000000000000000000000000000000..60f2940f33e74171f4104de4e4d5ccec207f8718 --- /dev/null +++ b/WorkFlowCore/WorkFlowCore/EventBus/IEventBus.cs @@ -0,0 +1,15 @@ +using System; +using System.Collections.Generic; +using System.Text; + +namespace WorkFlowCore.EventBus +{ + public interface IEventBus + { + void SubscribeEventHandler(Type eventDataType, Type handlerType); + void UnsubscribeEventHandler(Type eventDataType, Type handlerType); + void SubscribeEventHandler() where THandler : IEventHandler where TData : BaseEventData; + void UnsubscribeEventHandler() where THandler : IEventHandler where TData : BaseEventData; + void Trigger(TData data); + } +} diff --git a/WorkFlowCore/WorkFlowCore/EventBus/IEventHandler.cs b/WorkFlowCore/WorkFlowCore/EventBus/IEventHandler.cs new file mode 100644 index 0000000000000000000000000000000000000000..4f92c5ddbc00eaa65f35c91cd881ccd754e3c7c4 --- /dev/null +++ b/WorkFlowCore/WorkFlowCore/EventBus/IEventHandler.cs @@ -0,0 +1,16 @@ +using System; +using System.Collections.Generic; +using System.Text; + +namespace WorkFlowCore.EventBus +{ + public interface IEventHandler + { + + } + + public interface IEventHandler: IEventHandler where TData:BaseEventData + { + void Handle(TData data); + } +} diff --git a/WorkFlowCore/WorkFlowCore/EventBus/Implements/Default/DefaultEventBus.cs b/WorkFlowCore/WorkFlowCore/EventBus/Implements/Default/DefaultEventBus.cs new file mode 100644 index 0000000000000000000000000000000000000000..85588cfdd9b2c3ab2e3e13a54ecff9fb44022227 --- /dev/null +++ b/WorkFlowCore/WorkFlowCore/EventBus/Implements/Default/DefaultEventBus.cs @@ -0,0 +1,123 @@ +using Microsoft.Extensions.DependencyInjection; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Reflection; +using System.Text; + +namespace WorkFlowCore.EventBus +{ + public class DefaultEventBus : IEventBus + { + private IServiceProvider serviceProvider; + private static object objLock = new object(); + + public DefaultEventBus(IServiceProvider serviceProvider) + { + this.serviceProvider = serviceProvider; + } + + private static Dictionary> eventSubscribes; + static DefaultEventBus() + { + eventSubscribes = new Dictionary>(); + } + + private static void Subscribe(Type eventDataType, Type handlerType) + { + lock (objLock) + { + if (!eventSubscribes.ContainsKey(eventDataType)) + eventSubscribes.Add(eventDataType, new List { }); + } + + eventSubscribes[eventDataType].Add(handlerType); + + + } + + private static void Unsubscribe(Type eventDataType, Type handlerType) + { + if (!eventSubscribes.ContainsKey(eventDataType)) return; + + if (eventSubscribes[eventDataType].Contains(handlerType)) + eventSubscribes[eventDataType].Remove(handlerType); + } + + public static void Subscribe() where THandler : IEventHandler where TData:BaseEventData + { + Subscribe(typeof(TData), typeof(THandler)); + } + public static void Subscribe() where EventHandler : IEventHandler + { + var handlerType = typeof(EventHandler); + Subscribe(handlerType); + } + + public static void Subscribe(Type handlerType) + { + var interfaceType = handlerType.GetInterfaces().FirstOrDefault(i => i.IsGenericType); + if (interfaceType != null) + { + var dataType = interfaceType.GetGenericArguments()[0]; + Subscribe(dataType, handlerType); + } + else throw new Exception($"{handlerType.FullName} 需实现 {typeof(IEventHandler<>).FullName}"); + } + + + + /// + /// 从 程序集注册 + /// + /// + public static void RegistSubscriptions(params Assembly[] assemblies) + { + foreach (var assembly in assemblies) + { + var types = assembly.GetTypes().Where(t => typeof(IEventHandler).IsAssignableFrom(t)); + + foreach (var type in types) + { + Subscribe(type); + } + } + } + + public void Trigger(TData data) + { + var eventDataType =typeof(TData); + if (!eventSubscribes.ContainsKey(eventDataType)) return; + var handlerTypes = eventSubscribes[eventDataType]; + handlerTypes.ForEach(handlerType => + { + using(var scope = serviceProvider.CreateScope()) + { + var handler = scope.ServiceProvider.GetService(handlerType); + handlerType.GetMethod("Handle", new Type[] { eventDataType }).Invoke(handler, new object[] { data }); + } + + }); + } + + public void SubscribeEventHandler(Type eventDataType, Type handlerType) + { + Subscribe(eventDataType, handlerType); + } + + public void SubscribeEventHandler() where THandler : IEventHandler where TData : BaseEventData + { + Subscribe(); + } + + public void UnsubscribeEventHandler(Type eventDataType, Type handlerType) + { + Unsubscribe(eventDataType, handlerType); + } + + public void UnsubscribeEventHandler() where THandler : IEventHandler where TData : BaseEventData + { + Unsubscribe(typeof(TData), typeof(THandler)); + } + } +} diff --git a/WorkFlowCore/WorkFlowCore/EventBus/Implements/Kafka/KafkaEventBus.cs b/WorkFlowCore/WorkFlowCore/EventBus/Implements/Kafka/KafkaEventBus.cs new file mode 100644 index 0000000000000000000000000000000000000000..5485ed0653993960e41101dfc74599fa119978ce --- /dev/null +++ b/WorkFlowCore/WorkFlowCore/EventBus/Implements/Kafka/KafkaEventBus.cs @@ -0,0 +1,222 @@ +using WorkFlowCore.EventBus.Implements.Kafka; +using Confluent.Kafka; +using Microsoft.Extensions.DependencyInjection; +using Newtonsoft.Json; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Reflection; +using System.Text; +using System.Threading; +using System.Threading.Tasks; + +namespace WorkFlowCore.EventBus +{ + public class KafkaEventBus : IEventBus + { + private IServiceProvider serviceProvider; + private readonly KafkaEventConfig eventConfig; + private static object objLock = new object(); + + public KafkaEventBus(IServiceProvider serviceProvider, KafkaEventConfig eventConfig) + { + this.serviceProvider = serviceProvider; + this.eventConfig = eventConfig; + Console.WriteLine(eventConfig.Servers); + } + + private static Dictionary> eventSubscribes; + private static Dictionary eventSubscribeCancellationTokenSources; + static KafkaEventBus() + { + eventSubscribes = new Dictionary>(); + eventSubscribeCancellationTokenSources = new Dictionary(); + } + + private void SubscribeAsync(Type eventDataType, Type handlerType) + { + + var subscribesKey = eventDataType.FullName + handlerType.FullName; + + + if (eventSubscribes.ContainsKey(subscribesKey)) return; + //不做标记的不处理 + var topicAttr = eventDataType.GetCustomAttribute(); + if (topicAttr == null) return; + var toptic = string.IsNullOrEmpty(topicAttr.Topic)?eventDataType.FullName: topicAttr.Topic; + + var groupIdAttr = handlerType.GetCustomAttribute(); + if (groupIdAttr == null) return; + var groupId = string.IsNullOrEmpty(groupIdAttr.GroupId)?handlerType.FullName: groupIdAttr.GroupId; + var conf = new ConsumerConfig + { + GroupId = groupId, + BootstrapServers = eventConfig.Servers, + AutoOffsetReset = AutoOffsetReset.Earliest, + EnableAutoCommit = false, + }; + + CancellationTokenSource cts = new CancellationTokenSource(); + var c = new ConsumerBuilder(conf).Build(); + c.Subscribe(toptic); + lock (objLock) + { + if (!eventSubscribes.ContainsKey(subscribesKey)) + eventSubscribes.Add(subscribesKey, c); + if (!eventSubscribeCancellationTokenSources.ContainsKey(subscribesKey)) + eventSubscribeCancellationTokenSources.Add(subscribesKey, cts); + } + + try + { + while (!cts.IsCancellationRequested) + { + try + { + var cr = c.Consume(cts.Token); + Console.WriteLine($"Consumed message '{cr.Message}' at: '{cr.TopicPartitionOffset}'."); + + var data = JsonConvert.DeserializeObject(cr.Message.Value, eventDataType); + using (var scope = serviceProvider.CreateScope()) + { + var handler = scope.ServiceProvider.GetService(handlerType); + handlerType.GetMethod("Handle", new Type[] { eventDataType }).Invoke(handler, new object[] { data }); + } + c.Commit(cr); + } + catch (ConsumeException e) + { + Console.WriteLine($"Error occured: {e.Error.Reason}"); + } + catch (Exception e) + { + Console.WriteLine($"Error occured: {e.ToString()}"); + } + } + } + catch (OperationCanceledException) + { + // Ensure the consumer leaves the group cleanly and final offsets are committed. + c.Close(); + } + finally + { + if (c != null) + { + c.Close(); + c.Dispose(); + } + } + } + + + private void Subscribe(Type eventDataType, Type handlerType) + { + Task.Run(() => + { + SubscribeAsync(eventDataType, handlerType); + }); + } + + + + private void Unsubscribe(Type eventDataType, Type handlerType) + { + var subscribesKey = eventDataType.FullName + handlerType.FullName; + + if (eventSubscribes.ContainsKey(subscribesKey)) eventSubscribes[subscribesKey].Unsubscribe(); + if (eventSubscribeCancellationTokenSources.ContainsKey(subscribesKey)) eventSubscribeCancellationTokenSources[subscribesKey].Cancel(); + + } + + public void Subscribe() where THandler : IEventHandler where TData : BaseEventData + { + Subscribe(typeof(TData), typeof(THandler)); + } + public void Subscribe() where EventHandler : IEventHandler + { + var handlerType = typeof(EventHandler); + Subscribe(handlerType); + } + + public void Subscribe(Type handlerType) + { + var interfaceType = handlerType.GetInterfaces().FirstOrDefault(i => i.IsGenericType); + if (interfaceType != null) + { + var dataType = interfaceType.GetGenericArguments()[0]; + Subscribe(dataType, handlerType); + } + else throw new Exception($"{handlerType.FullName} 需实现 {typeof(IEventHandler<>).FullName}"); + } + + + + /// + /// 从 程序集注册 + /// + /// + public void RegistSubscriptions(params Assembly[] assemblies) + { + foreach (var assembly in assemblies) + { + var types = assembly.GetTypes().Where(t => typeof(IEventHandler).IsAssignableFrom(t)); + + foreach (var type in types) + { + Subscribe(type); + } + } + } + + private void TriggerEvent(TData data) + { + if (data == null) return; + var conf = new ProducerConfig { BootstrapServers = eventConfig.Servers }; + + Action> handler = r => + Console.WriteLine(!r.Error.IsError + ? $"Delivered message to {r.TopicPartitionOffset}" + : $"Delivery Error: {r.Error.Reason}"); + //不做标记不处理 + var topicAttr = typeof(TData).GetCustomAttribute(); + var toptic = topicAttr != null ? topicAttr.Topic : typeof(TData).FullName; + + using (var p = new ProducerBuilder(conf).Build()) + { + p.Produce(toptic, new Message { Value = JsonConvert.SerializeObject(data) }, handler); + + // wait for up to 10 seconds for any inflight messages to be delivered. + p.Flush(TimeSpan.FromSeconds(10)); + } + } + + public void Trigger(TData data) + { + Task.Run(() => + { + TriggerEvent(data); + }); + } + + public void SubscribeEventHandler(Type eventDataType, Type handlerType) + { + Subscribe(eventDataType, handlerType); + } + + public void SubscribeEventHandler() where THandler : IEventHandler where TData : BaseEventData + { + Subscribe(); + } + + public void UnsubscribeEventHandler(Type eventDataType, Type handlerType) + { + Unsubscribe(eventDataType, handlerType); + } + + public void UnsubscribeEventHandler() where THandler : IEventHandler where TData : BaseEventData + { + Unsubscribe(typeof(TData), typeof(THandler)); + } + } +} diff --git a/WorkFlowCore/WorkFlowCore/EventBus/Implements/Kafka/KafkaEventConfig.cs b/WorkFlowCore/WorkFlowCore/EventBus/Implements/Kafka/KafkaEventConfig.cs new file mode 100644 index 0000000000000000000000000000000000000000..2c262d2ec31b5c3b62ac31848f287c9c39a3c680 --- /dev/null +++ b/WorkFlowCore/WorkFlowCore/EventBus/Implements/Kafka/KafkaEventConfig.cs @@ -0,0 +1,13 @@ +using System; +using System.Collections.Generic; +using System.Reflection; +using System.Text; + +namespace WorkFlowCore.EventBus.Implements.Kafka +{ + public class KafkaEventConfig + { + public string Servers { get; set; } + public Assembly[] RegisterAssemblies { get; set; } + } +} diff --git a/WorkFlowCore/WorkFlowCore/EventBus/Implements/Kafka/KafkaEventConsumerAttribute.cs b/WorkFlowCore/WorkFlowCore/EventBus/Implements/Kafka/KafkaEventConsumerAttribute.cs new file mode 100644 index 0000000000000000000000000000000000000000..4223a8cbac80bf2d615e5f552d3c59dab30bcd95 --- /dev/null +++ b/WorkFlowCore/WorkFlowCore/EventBus/Implements/Kafka/KafkaEventConsumerAttribute.cs @@ -0,0 +1,16 @@ +using System; +using System.Collections.Generic; +using System.Text; + +namespace WorkFlowCore.EventBus.Implements.Kafka +{ + public class KafkaEventConsumerAttribute : Attribute + { + public string GroupId { get; set; } + + public KafkaEventConsumerAttribute(string groupId=null) + { + GroupId = groupId; + } + } +} diff --git a/WorkFlowCore/WorkFlowCore/EventBus/Implements/Kafka/KafkaEventTopicAttribute.cs b/WorkFlowCore/WorkFlowCore/EventBus/Implements/Kafka/KafkaEventTopicAttribute.cs new file mode 100644 index 0000000000000000000000000000000000000000..a9c5643ddae60cd09432a00e56177ce388c7fb29 --- /dev/null +++ b/WorkFlowCore/WorkFlowCore/EventBus/Implements/Kafka/KafkaEventTopicAttribute.cs @@ -0,0 +1,16 @@ +using System; +using System.Collections.Generic; +using System.Text; + +namespace WorkFlowCore.EventBus.Implements.Kafka +{ + public class KafkaEventTopicAttribute: Attribute + { + public string Topic { get; set; } + + public KafkaEventTopicAttribute(string topic=null) + { + Topic = topic; + } + } +} diff --git a/WorkFlowCore/WorkFlowCore/WorkflowManager.cs b/WorkFlowCore/WorkFlowCore/WorkflowManager.cs index e76c1a17b30caa9f30348a5eafafd9790df6638e..6d074c9401085b1a80c88c19745d822d0174106f 100644 --- a/WorkFlowCore/WorkFlowCore/WorkflowManager.cs +++ b/WorkFlowCore/WorkFlowCore/WorkflowManager.cs @@ -318,6 +318,8 @@ namespace WorkFlowCore var workflowVersion = await GetWorkflowVersion(workTask.WorkflowId.Id, workTask.WorkflowId.VersionId); var preSteps = (await workStepRepository.GetListAsync(ws => ws.GroupId == currentWorkStep.PreStepGroupId)).Select(s => s.ToWorkStep()).ToList(); + + //按节点和审批人员取最新的审批记录 preSteps = preSteps.GroupBy(s => s.NodeId + s.HandleUser.ToString()).Select(g => g.OrderByDescending(step => step.CreationTime).First()).ToList(); var rejectNodes = new List(); @@ -454,7 +456,8 @@ namespace WorkFlowCore using (var unitOfWork = unitOfWorkManager.Begin()) { await workTaskRepository.UpdateAsync(workTask.ToWorkTaskInfo(workTaskInfo)); - await SendTasks(workTask, steps, workTask.FormData); + steps.ForEach(step => step.SetFormData(workTask.FormData)); + await SendTasks(workTask, steps); unitOfWork.Commit(); } @@ -512,7 +515,7 @@ namespace WorkFlowCore if (currentWorkStep.IsHandled) return ProveResult.Failed("步骤已处理!"); - + //派发下个节点任务 @@ -520,6 +523,9 @@ namespace WorkFlowCore var workTask = workTaskInfo.ToWorkTask(); var currentNode = await GetNodeByWorkflowIdAndNodeId(workTask.WorkflowId, currentWorkStep.NodeId); var steps = new List(); + + //在根据条件获取下一组审批步骤之前,先指定当前的审批表单(当前提交的),以便从更多维度决定下一组处理的节点 + currentWorkStep.SetFormData(formData); var nextNodes = await GetNextNodes(currentNode, workTask, currentWorkStep); //更新当前处理节点为已处理 @@ -585,7 +591,7 @@ namespace WorkFlowCore if (steps.Count == 0 && currentNode.NodeType != WorkNodeType.End) return ProveResult.Failed("找不到可以处理的下一个步骤!"); - await SendTasks(workTask, steps, formData); + await SendTasks(workTask, steps); if (!unitOfWork.Commit()) ProveResult.Failed("提交失败"); @@ -650,6 +656,9 @@ namespace WorkFlowCore await workStepRepository.UpdateAsync(currentWorkStep.ToWorkStepInfo(currentWorkStepInfo)); var steps = new List(); + //在根据条件获取下一组审批步骤之前,先指定当前的审批表单(当前提交的),以便从更多维度决定下一组处理的节点 + currentWorkStep.SetFormData(formData); + var nextNodes = await GetNextNodes(currentNode, workTask, currentWorkStep); //如果下一个节点是会签,则同组的步骤都要撤回 if (nextNodes.Count == 1 && nextNodes[0].NodeType == WorkNodeType.Sign) @@ -673,7 +682,7 @@ namespace WorkFlowCore await CheckAndTaskPendding(workTask, workTaskInfo, startNode); } - await SendTasks(workTask, steps, formData); + await SendTasks(workTask, steps); return unitOfWork.Commit(ProveResult.Succeed(steps), ProveResult.Failed("提交失败")); } @@ -981,15 +990,13 @@ namespace WorkFlowCore /// /// /// - /// /// - private async Task SendTasks(WorkTask workTask, List workSteps, string formData = null) + private async Task SendTasks(WorkTask workTask, List workSteps) { foreach (var item in workSteps) { item.IsHandled = false; - item.SetFormData(formData); await workStepRepository.InsertAsync(item.ToWorkStepInfo()); //TODO 发布开启任务事件 //TODO 发布开启消息 diff --git a/web/src/views/workflows/workFlowSimulation/index.vue b/web/src/views/workflows/workFlowSimulation/index.vue index 41debc6df7b9a4bfe9f8a407bd472c8de895672c..bcee055e15c9211ff62aeda8b046c88b30b6d8c5 100644 --- a/web/src/views/workflows/workFlowSimulation/index.vue +++ b/web/src/views/workflows/workFlowSimulation/index.vue @@ -94,6 +94,14 @@
+ +
+