From dbb5d06f339c6a69fad3a175faf3756cc5b29e7d Mon Sep 17 00:00:00 2001 From: Zhang Wenhao Date: Wed, 18 Mar 2026 18:29:52 +0800 Subject: [PATCH] [core]: improve SimpleFlowChain * Added named constructors, factory methods, `then/error/done` overloads, and asynchronous backup collection to SimpleFlowChain; * Added the FlowBuilder stream builder and the `Flow.of` factory method to Flow, allowing for the configuration of `skip/run/rollback` behavior and the generation of anonymous Flow implementations. Related: ZSV-5936 Change-Id: I776b7374716365687279697063726f7a7167736f --- .../zstack/core/workflow/SimpleFlowChain.java | 59 ++++++++++ .../org/zstack/header/core/workflow/Flow.java | 101 ++++++++++++++++++ 2 files changed, 160 insertions(+) diff --git a/core/src/main/java/org/zstack/core/workflow/SimpleFlowChain.java b/core/src/main/java/org/zstack/core/workflow/SimpleFlowChain.java index 4ee9498b1b9..683f2580154 100755 --- a/core/src/main/java/org/zstack/core/workflow/SimpleFlowChain.java +++ b/core/src/main/java/org/zstack/core/workflow/SimpleFlowChain.java @@ -8,6 +8,7 @@ import org.zstack.core.CoreGlobalProperty; import org.zstack.core.Platform; import org.zstack.core.errorcode.ErrorFacade; +import org.zstack.header.core.AsyncBackup; import org.zstack.header.core.progress.ProgressFlowChainProcessorFactory; import org.zstack.header.core.workflow.*; import org.zstack.header.errorcode.ErrorCode; @@ -23,6 +24,7 @@ import java.util.*; import java.util.Map.Entry; import java.util.concurrent.ConcurrentHashMap; +import java.util.function.Consumer; import java.util.function.Function; import static org.zstack.core.Platform.inerr; @@ -62,6 +64,7 @@ public class SimpleFlowChain implements FlowTrigger, FlowRollback, FlowChain, Fl private List> afterDone = new ArrayList<>(); private List> afterError = new ArrayList<>(); private List> afterFinal = new ArrayList<>(); + private List asyncBackups = new ArrayList<>(); @Autowired(required = false) ProgressFlowChainProcessorFactory progressFactory; @@ -131,11 +134,20 @@ public SimpleFlowChain() { id = "FCID_" + Platform.getUuid().substring(0, 8); } + public SimpleFlowChain(String chainName) { + this(); + this.name = chainName; + } + public SimpleFlowChain(Map data) { id = "FCID_" + Platform.getUuid().substring(0, 8); this.data.putAll(data); } + public static SimpleFlowChain of(String chainName) { + return new SimpleFlowChain(chainName); + } + @Override public List getFlows() { return flows; @@ -228,6 +240,12 @@ public SimpleFlowChain then(Flow flow) { return this; } + public SimpleFlowChain then(String flowName, Consumer consumer) { + return then(Flow.of(flowName) + .handle(consumer) + .build()); + } + public SimpleFlowChain ctxHandler(FlowContextHandler handler) { DebugUtils.Assert(contextHandler==null, "there has been an FlowContextHandler installed"); contextHandler = handler; @@ -240,6 +258,21 @@ public SimpleFlowChain error(FlowErrorHandler handler) { return this; } + @SuppressWarnings("rawtypes") + public SimpleFlowChain error(Consumer handler) { + AsyncBackup firstAsyncBackup = this.asyncBackups.isEmpty() ? null : this.asyncBackups.get(0); + AsyncBackup[] otherAsyncBackups = this.asyncBackups.isEmpty() ? new AsyncBackup[0] : + this.asyncBackups.subList(1, this.asyncBackups.size()).toArray(new AsyncBackup[0]); + + DebugUtils.Assert(handler != null, "handler of errorHandler should not be null"); + return error(new FlowErrorHandler(firstAsyncBackup, otherAsyncBackups) { + @Override + public void handle(ErrorCode errCode, Map data) { + handler.accept(errCode); + } + }); + } + @Override public FlowChain Finally(FlowFinallyHandler handler) { finallyHandler = handler; @@ -285,6 +318,17 @@ public Map getData() { return this.data; } + public SimpleFlowChain propagateExceptionTo(AsyncBackup... backups) { + DebugUtils.Assert(backups != null, "backups in methods propagateExceptionTo() must be not null"); + DebugUtils.Assert(Arrays.stream(backups).noneMatch(Objects::isNull), + "backups in propagateExceptionTo() should not contain null elements"); + DebugUtils.Assert(doneHandler == null, "propagateExceptionTo() must be called before SimpleFlowChain.done()"); + DebugUtils.Assert(errorHandler == null, "propagateExceptionTo() must be called before SimpleFlowChain.error()"); + DebugUtils.Assert(finallyHandler == null, "propagateExceptionTo() must be called before SimpleFlowChain.Finally()"); + this.asyncBackups.addAll(Arrays.asList(backups)); + return this; + } + @Override public SimpleFlowChain done(FlowDoneHandler handler) { DebugUtils.Assert(doneHandler==null, "there has been a FlowDoneHandler installed"); @@ -292,6 +336,21 @@ public SimpleFlowChain done(FlowDoneHandler handler) { return this; } + @SuppressWarnings("rawtypes") + public SimpleFlowChain done(Runnable runnable) { + AsyncBackup firstAsyncBackup = this.asyncBackups.isEmpty() ? null : this.asyncBackups.get(0); + AsyncBackup[] otherAsyncBackups = this.asyncBackups.isEmpty() ? new AsyncBackup[0] : + this.asyncBackups.subList(1, this.asyncBackups.size()).toArray(new AsyncBackup[0]); + + DebugUtils.Assert(runnable != null, "runnable of doneHandler should not be null"); + return done(new FlowDoneHandler(firstAsyncBackup, otherAsyncBackups) { + @Override + public void handle(Map data) { + runnable.run(); + } + }); + } + private void collectAfterRunnable(Flow flow) { List ad = FieldUtils.getAnnotatedFieldsOnThisClass(AfterDone.class, flow.getClass()); for (Field f : ad) { diff --git a/header/src/main/java/org/zstack/header/core/workflow/Flow.java b/header/src/main/java/org/zstack/header/core/workflow/Flow.java index 04f304af071..440769ecfe8 100755 --- a/header/src/main/java/org/zstack/header/core/workflow/Flow.java +++ b/header/src/main/java/org/zstack/header/core/workflow/Flow.java @@ -1,8 +1,12 @@ package org.zstack.header.core.workflow; +import org.zstack.utils.DebugUtils; import org.zstack.utils.FieldUtils; import java.util.Map; +import java.util.function.BiConsumer; +import java.util.function.Consumer; +import java.util.function.Predicate; public interface Flow { void run(FlowTrigger trigger, Map data); @@ -23,4 +27,101 @@ default String name() { } return String.format("%s", this.getClass().getSimpleName()); } + + @SuppressWarnings("rawtypes") + public static class FlowBuilder { + public final String flowName; + private Predicate skipPredicate; + private BiConsumer triggerConsumer; + private BiConsumer rollbackConsumer; + + private FlowBuilder(String flowName) { + DebugUtils.Assert(flowName != null, "flowName should not be null"); + this.flowName = flowName; + } + + public FlowBuilder withSkipPredicate(Predicate predicate) { + DebugUtils.Assert(predicate != null, "skipPredicate of FlowBuilder should not be null"); + this.skipPredicate = predicate; + return this; + } + + public FlowBuilder skipIf(Predicate predicate) { + return withSkipPredicate(predicate); + } + + public FlowBuilder runIf(Predicate predicate) { + DebugUtils.Assert(predicate != null, "predicate of FlowBuilder.runIf() should not be null"); + return withSkipPredicate(predicate.negate()); + } + + public FlowBuilder handle(BiConsumer consumer) { + DebugUtils.Assert(consumer != null, "consumer of FlowBuilder.handle() should not be null"); + this.triggerConsumer = consumer; + return this; + } + + public FlowBuilder handle(Consumer consumer) { + DebugUtils.Assert(consumer != null, "consumer of FlowBuilder.handle() should not be null"); + this.triggerConsumer = (trigger, data) -> consumer.accept(trigger); + return this; + } + + public FlowBuilder rollback(BiConsumer consumer) { + DebugUtils.Assert(consumer != null, "consumer of FlowBuilder.rollback() should not be null"); + this.rollbackConsumer = consumer; + return this; + } + + public FlowBuilder rollback(Consumer consumer) { + DebugUtils.Assert(consumer != null, "consumer of FlowBuilder.rollback() should not be null"); + this.rollbackConsumer = (trigger, data) -> consumer.accept(trigger); + return this; + } + + public Flow build() { + DebugUtils.Assert(triggerConsumer != null, "handle() must be called before build()"); + Predicate skipPredicateSnapshot = skipPredicate; + BiConsumer triggerConsumerSnapshot = triggerConsumer; + BiConsumer rollbackConsumerSnapshot = rollbackConsumer; + + return new Flow() { + @Override + public boolean skip(Map data) { + if (skipPredicateSnapshot == null) { + return false; + } + return skipPredicateSnapshot.test(data); + } + + @Override + public void run(FlowTrigger trigger, Map data) { + triggerConsumerSnapshot.accept(trigger, data); + } + + @Override + public void rollback(FlowRollback trigger, Map data) { + if (rollbackConsumerSnapshot == null) { + trigger.rollback(); + } else { + rollbackConsumerSnapshot.accept(trigger, data); + } + } + + @Override + public String name() { + return flowName; + } + + @Override + public String toString() { + return name(); + } + }; + } + } + + public static FlowBuilder of(String flowName) { + return new FlowBuilder(flowName); + } }