Cloudbreak Flow engine is responsible for manage the different execution flows of Cloudbreak's microservices using the Spring StateMachine project and async event handling.
If you would like to use the Flow engine in your application, you must implement the following interfaces:
ApplicationFlowInformation
You must implement the following interfaces and abstract classes per flow:
FlowState
: define the states of the flowFlowEvent
: define the events between the flow statesAbstractFlowConfiguration
: configure the statemachine with its states and its transitions
FlowEdgeConfig
failureHandled
eventAbstractAction
implementations for the different flow states.You can implement the following interfaces for the flow to be more flexible:
CommonContext
.PayloadConverter
which can be used to convert different payloads to the one which is appropriate for the state's action.State
in the flow:
Action
associated with the state will send the trigger Event
to its Reactor Handler
.Let's define flow states for our flow. You have to implement the FlowState interface and define your state enums.
We will have the following states:
INIT_STATE
HELLO_WORLD_START_STATE
HELLO_WORLD_FINISHED_STATE
HELLO_WORLD_FAILED_STATE
FINAL_STATE
In code:
public enum HelloWorldState implements FlowState {
INIT_STATE,
HELLO_WORLD_START_STATE,
HELLO_WORLD_FINISHED_STATE,
HELLO_WORLD_FAILED_STATE,
FINAL_STATE
}
You can define restart actions for your flow, which will be used when you restart a failed flow.
For example, in the Data Lake service we are using FillInMemoryStateStoreRestartAction
.
It will update the data lake’s status in the DatalakeInMemoryStateStore
to CANCELLED
or POLLABLE
based on the status.
We are using this to cancel running flows. If you have a data lake install flow, and it is polling Cloudbreak for stack status, then if you send a terminate
request we cancel the installation flow. It happens through this in-memory store.
You can override restartAction
this way:
@Override
public Class<? extends RestartAction> restartAction() {
return FillInMemoryStateStoreRestartAction.class;
}
FlowEvent
s are sent to the flow engine, which changes its state based on these events.
public enum HelloWorldEvent implements FlowEvent {
HELLOWORLD_TRIGGER_EVENT,
HELLOWORLD_FINISHED_EVENT,
FINALIZE_HELLOWORLD_EVENT,
HELLOWORLD_SOMETHING_WENT_WRONG,
HELLOWORLD_FAILHANDLED_EVENT;
private String selector;
HelloWorldEvent() {
this.selector = name();
}
@Override
public String event() {
return selector;
}
}
We have to define which state follows which state. To do that, we implement a Flow config, which extends from AbstractFlowConfiguration
.
If you want it to be retryable, then implement RetryableFlowConfiguration
.
First define transition with Transition.Builder
class. It is a generic class with two type. First type is the state enum, second type is the event enum.
Define the default failure event, in our case it is HELLOWORLD_SOMETHING_WENT_WRONG
. Then you can define state changes in your flow. Every state changes needs
a from - to on which event and what event will be triggered on failure. It can be the defaultFailureEvent
or you can define a specific failure event like
HELLOWORLD_FIRST_STEP_WENT_WRONG_EVENT
in the second step.
You have to define flow edge config, it will define what is your init and final state, the default failure state and the fail handled event. We need fail handled event for restart. If this event arrives it means we handled failure correctly so it can be restarted.
getEvents
method should return your events, getInitEvents
should return your initial event(s) and getRetryableEvent
returns the event we can recover from.
@Component
public class HelloWorldFlowConfig extends AbstractFlowConfiguration<HelloWorldState, HelloWorldEvent> implements RetryableFlowConfiguration<HelloWorldEvent> {
private static final List<Transition<HelloWorldState, HelloWorldEvent>> TRANSITIONS = new Transition.Builder<HelloWorldState, HelloWorldEvent>()
.defaultFailureEvent(HELLOWORLD_SOMETHING_WENT_WRONG)
.from(INIT_STATE)
.to(HELLO_WORLD_FIRST_STEP_STATE)
.event(HELLOWORLD_TRIGGER_EVENT)
.noFailureEvent()
.from(HELLO_WORLD_FIRST_STEP_STATE)
.to(HELLO_WORLD_SECOND_STEP_STATE)
.event(HELLOWORLD_FIRST_STEP_FINISHED_EVENT)
.failureEvent(HELLOWORLD_FIRST_STEP_WENT_WRONG_EVENT)
.from(HELLO_WORLD_SECOND_STEP_STATE)
.to(HELLO_WORLD_FINISHED_STATE)
.event(HELLOWORLD_SECOND_STEP_FINISHED_EVENT)
.defaultFailureEvent()
.from(HELLO_WORLD_FINISHED_STATE)
.to(FINAL_STATE)
.event(FINALIZE_HELLOWORLD_EVENT)
.defaultFailureEvent()
.build();
private static final FlowEdgeConfig<HelloWorldState, HelloWorldEvent> EDGE_CONFIG = new FlowEdgeConfig<>(INIT_STATE, FINAL_STATE, HELLO_WORLD_FAILED_STATE, HELLOWORLD_FAILHANDLED_EVENT);
public HelloWorldFlowConfig() {
super(HelloWorldState.class, HelloWorldEvent.class);
}
@Override
public HelloWorldEvent[] getEvents() {
return HelloWorldEvent.values();
}
@Override
public HelloWorldEvent[] getInitEvents() {
return new HelloWorldEvent[] {
HELLOWORLD_TRIGGER_EVENT
};
}
@Override
public String getDisplayName() {
return "Hello world";
}
@Override
public HelloWorldEvent getRetryableEvent() {
return HELLOWORLD_FAILHANDLED_EVENT;
}
@Override
protected List<Transition<HelloWorldState, HelloWorldEvent>> getTransitions() {
return TRANSITIONS;
}
@Override
protected FlowEdgeConfig<HelloWorldState, HelloWorldEvent> getEdgeConfig() {
return EDGE_CONFIG;
}
}
We will implement the actions for flow states. Let’s see first state HELLO_WORLD_FIRST_STEP_STATE
. In flow config we have the following lines:
.from(INIT_STATE)
.to(HELLO_WORLD_FIRST_STEP_STATE)
.event(HELLOWORLD_TRIGGER_EVENT)
So flow will move from INIT_STATE
to HELLO_WORLD_FIRST_STEP_STATE
when HELLOWORLD_TRIGGER_EVENT
arrives. This is how you can move from one state to another.
As you can remember in FlowEdgeConfig
we defined the start event for our new flow:
@Override
public HelloWorldEvent[] getInitEvents() {
return new HelloWorldEvent[] {
HELLOWORLD_TRIGGER_EVENT
};
}
We will implement actions in HelloWorldActions
class. You can see below we have a Bean with name HELLO_WORLD_FIRST_STEP_STATE
. This is the first action in
our flow. In doExecute you can do the implementation for this state. This time we just print a log message and send an
HelloWorldFirstStepLongLastingTaskTriggerEvent
, but why we do this? This is where handlers come into picture.
@Configuration
public class HelloWorldActions {
private static final Logger LOGGER = LoggerFactory.getLogger(HelloWorldActions.class);
@Bean(name = "HELLO_WORLD_FIRST_STEP_STATE")
public Action<?, ?> firstStep() {
return new AbstractHelloWorldAction<>(HelloWorldFlowTrigger.class) {
@Override
protected void doExecute(HelloWorldContext context, HelloWorldFlowTrigger payload, Map<Object, Object> variables) {
LOGGER.info("Hello world first step in progress, we are sending an event for a handler, because it is a long lasting task");
HelloWorldFirstStepLongLastingTaskTriggerEvent taskTriggerEvent = new HelloWorldFirstStepLongLastingTaskTriggerEvent(payload.getResourceId());
sendEvent(context, taskTriggerEvent);
}
};
}
@Bean(name = "HELLO_WORLD_FIRST_STEP_FAILED_STATE")
public Action<?, ?> firstStepFailedAction() {
return new AbstractHelloWorldAction<>(HelloWorldFirstStepLongLastingTaskFailureResponse.class) {
@Override
protected void doExecute(HelloWorldContext context, HelloWorldFirstStepLongLastingTaskFailureResponse payload, Map<Object, Object> variables) {
sendEvent(context, HELLOWORLD_FAILHANDLED_EVENT.event(), payload);
}
};
}
@Bean(name = "HELLO_WORLD_SECOND_STEP_STATE")
public Action<?, ?> secondStep() {
return new AbstractHelloWorldAction<>(HelloWorldFirstStepLongLastingTaskSuccessResponse.class) {
@Override
protected void doExecute(HelloWorldContext context, HelloWorldFirstStepLongLastingTaskSuccessResponse payload, Map<Object, Object> variables) {
LOGGER.info("Hello world second step in progress..");
HelloWorldSecondStepSuccessful helloWorldSecondStepSuccessful = new HelloWorldSecondStepSuccessful(payload.getResourceId());
sendEvent(context, HELLOWORLD_SECOND_STEP_FINISHED_EVENT.event(), helloWorldSecondStepSuccessful);
}
};
}
@Bean(name = "HELLO_WORLD_FINISHED_STATE")
public Action<?, ?> finishedAction() {
return new AbstractHelloWorldAction<>(HelloWorldSecondStepSuccessful.class) {
@Override
protected void doExecute(HelloWorldContext context, HelloWorldSecondStepSuccessful payload, Map<Object, Object> variables) {
LOGGER.info("Hello world finished!");
sendEvent(context, FINALIZE_HELLOWORLD_EVENT.event(), payload);
}
};
}
@Bean(name = "HELLO_WORLD_FAILED_STATE")
public Action<?, ?> failedAction() {
return new AbstractHelloWorldAction<>(HelloWorldFailedEvent.class) {
@Override
protected void doExecute(HelloWorldContext context, HelloWorldFailedEvent payload, Map<Object, Object> variables) {
sendEvent(context, HELLOWORLD_FAILHANDLED_EVENT.event(), payload);
}
};
}
private abstract static class AbstractHelloWorldAction<P extends Payload> extends AbstractAction<HelloWorldState, HelloWorldEvent, HelloWorldContext, P> {
protected AbstractHelloWorldAction(Class<P> payloadClass) {
super(payloadClass);
}
@Override
protected HelloWorldContext createFlowContext(FlowParameters flowParameters, StateContext<HelloWorldState, HelloWorldEvent> stateContext, P payload) {
return new HelloWorldContext(flowParameters);
}
@Override
protected Object getFailurePayload(P payload, Optional<HelloWorldContext> flowContext, Exception ex) {
return new HelloWorldFailedEvent(payload.getResourceId(), ex);
}
}
}
getFailurePayload
method is called when somehow an unexpected exception happens in your action's doExecute
. It should return with your failure payload,
usually it contains an exception.
If you have to do something that can take a long time, like a remote call or polling something, then you shouldn’t do it in an action's doExecute
method.
If you do this then it will consume one reactor thread and after a while it can happen there will be no available threads for Reactor to process events for new
flows. For this reason you should create a Handler. Handlers should extend from ExceptionCatcherEventHandler
.
You have to implement selector method to define which event your handler will listen on. In our case this event is
HelloWorldFirstStepLongLastingTaskTriggerEvent
. You have to define a default failure event also, it will be sent if some unexpected and unhandled exception
happening in your handler. In doAccept
you can do your long lasting actions like polling or rest calls and you have to return with an event you want to send
back to reactor. In this scenario we send a HelloWorldFirstStepLongLastingTaskSuccessResponse
in successful case, and a
HelloWorldFirstStepLongLastingTaskFailureResponse
in failed case. It is very common to put results into these responses for next flow steps.
@Component
public class HelloWorldFirstStepLongLastingTaskHandler extends ExceptionCatcherEventHandler<HelloWorldFirstStepLongLastingTaskTriggerEvent> {
private static final Logger LOGGER = LoggerFactory.getLogger(HelloWorldFirstStepLongLastingTaskHandler.class);
@Override
public String selector() {
return HelloWorldFirstStepLongLastingTaskTriggerEvent.class.getSimpleName();
}
@Override
protected Selectable defaultFailureEvent(Long resourceId, Exception e) {
return new HelloWorldFirstStepLongLastingTaskFailureResponse(resourceId, e);
}
@Override
protected Selectable doAccept(HandlerEvent event) {
HelloWorldFirstStepLongLastingTaskTriggerEvent helloWorldReactorEvent = event.getData();
Long resourceId = helloWorldReactorEvent.getResourceId();
try {
LOGGER.info("Long lasting task execution...");
return new HelloWorldFirstStepLongLastingTaskSuccessResponse(resourceId);
} catch (RuntimeException ex) {
LOGGER.info("Long lasting task execution failed. Cause: ", ex);
return new HelloWorldFirstStepLongLastingTaskFailureResponse(resourceId, ex);
}
}
}
You can check the second step is HELLO_WORLD_SECOND_STEP_STATE
in our flow. It is waiting for a HelloWorldFirstStepLongLastingTaskSuccessResponse
payload,
we sent from handler previously. It is a very simple step also, it’s just print a log, then sends a HELLOWORLD_SECOND_STEP_FINISHED_EVENT
, so our flow can
step into HELLO_WORLD_FINISHED_STATE
where our flow ends.
Documentation is under construction...
Currently not supported, but spring statemachine supports fork and join states
Flows currently are not per state distributed. Spring statemachine has distributed implementation but not released yet: https://docs.spring.io/spring-statemachine/docs/2.1.3.RELEASE/reference/#sm-distributed
Cloudbreak’s flow engine has a concept for executing/connecting flows and executing them in the desired order with shared context that’s called flow chain.
Flow chains are not just able to connect different flows but also connect/execute other flow chains which makes it possible to orchestrate quite complex workflows with dynamic set up. In order to do this the com.sequenceiq.flow.core.chain.FlowEventChainFactory
interface needs to be implemented that orchestrate the contained flows, flow chains and similarly to the flows it is also event driven, just it configures the shared context with the necessary flows and chains dynamically.
As the Flows and their configuration are statically defined within the code base, those configs could be used to generate graphs as visualization of the underlying workflows for easier understanding. Unfortunately Flow Chains are the opposite, they are so dynamic that only with covering unit tests we found a way to build the underlying state machine and workflows to be able to have visualization about them. This way not all of them has a graph yet. There is a drive folder that's intended to be a store of flow and flow chain graphs by Cloudbreak versions.
So if you were able to generate for a fresh CB version with the following mechanism, then do not hesitate to copy the generated pictures to a new folder in the Drive folder.
A separate Make goal has been created to do this from the Cloudbreak project root. Of course if a new module/micros-service is introduced then it needs to updated after creating the generator caller class in the new module.
make generate-flow-graphs
Some of the most simple flow chains covered by the previous step by the more dynamic ones the unit test of the module need to be executed which could simply be done by a gradle clean build for example.
The previously mentioned steps only generate the graph descriptor files in DOT format and to have real visualization the following Make goal from the Cloudbreak project root should be executed:
make generate-flow-graph-pictures
As the Gradle tasks may erase the build directories of the modules the following execution order makes more sense:
./gradlew clean build
make generate-flow-graphs
make generate-flow-graph-pictures
A basic Util class has been introduced to do this on the easiest way. It looks like the following at the end of one of the unit tests. Example: com.sequenceiq.cloudbreak.core.flow2.chain.MultiHostgroupDownscaleFlowEventChainFactoryTest
FlowChainConfigGraphGeneratorUtil.generateFor(
underTest, //The FlowEventChainFactory instance with the built context
FLOW_CONFIGS_PACKAGE, //The package where the relevant Flow and Flow Chain configs are
flowTriggerQueue, //The flow trigger queue which is orchestrated by the tested factory during the test run
"FULL_DOWNSCALE"); //File name suffix to be able to differentiate graphs with the same factory but different context setup
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。