Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 59 additions & 0 deletions core/src/main/java/org/zstack/core/workflow/SimpleFlowChain.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import org.zstack.core.CoreGlobalProperty;
import org.zstack.core.Platform;
import org.zstack.core.errorcode.ErrorFacade;
import org.zstack.header.core.AsyncBackup;
import org.zstack.header.core.progress.ProgressFlowChainProcessorFactory;
import org.zstack.header.core.workflow.*;
import org.zstack.header.errorcode.ErrorCode;
Expand All @@ -23,6 +24,7 @@
import java.util.*;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;
import java.util.function.Function;

import static org.zstack.core.Platform.inerr;
Expand Down Expand Up @@ -62,6 +64,7 @@ public class SimpleFlowChain implements FlowTrigger, FlowRollback, FlowChain, Fl
private List<List<Runnable>> afterDone = new ArrayList<>();
private List<List<Runnable>> afterError = new ArrayList<>();
private List<List<Runnable>> afterFinal = new ArrayList<>();
private List<AsyncBackup> asyncBackups = new ArrayList<>();

@Autowired(required = false)
ProgressFlowChainProcessorFactory progressFactory;
Expand Down Expand Up @@ -131,11 +134,20 @@ public SimpleFlowChain() {
id = "FCID_" + Platform.getUuid().substring(0, 8);
}

public SimpleFlowChain(String chainName) {
this();
this.name = chainName;
}

public SimpleFlowChain(Map<String, Object> data) {
id = "FCID_" + Platform.getUuid().substring(0, 8);
this.data.putAll(data);
}

public static SimpleFlowChain of(String chainName) {
return new SimpleFlowChain(chainName);
}

@Override
public List<Flow> getFlows() {
return flows;
Expand Down Expand Up @@ -228,6 +240,12 @@ public SimpleFlowChain then(Flow flow) {
return this;
}

public SimpleFlowChain then(String flowName, Consumer<FlowTrigger> consumer) {
return then(Flow.of(flowName)
.handle(consumer)
.build());
}

public SimpleFlowChain ctxHandler(FlowContextHandler handler) {
DebugUtils.Assert(contextHandler==null, "there has been an FlowContextHandler installed");
contextHandler = handler;
Expand All @@ -240,6 +258,21 @@ public SimpleFlowChain error(FlowErrorHandler handler) {
return this;
}

@SuppressWarnings("rawtypes")
public SimpleFlowChain error(Consumer<ErrorCode> handler) {
AsyncBackup firstAsyncBackup = this.asyncBackups.isEmpty() ? null : this.asyncBackups.get(0);
AsyncBackup[] otherAsyncBackups = this.asyncBackups.isEmpty() ? new AsyncBackup[0] :
this.asyncBackups.subList(1, this.asyncBackups.size()).toArray(new AsyncBackup[0]);

DebugUtils.Assert(handler != null, "handler of errorHandler should not be null");
return error(new FlowErrorHandler(firstAsyncBackup, otherAsyncBackups) {
@Override
public void handle(ErrorCode errCode, Map data) {
handler.accept(errCode);
}
});
}
Comment on lines +261 to +274
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

propagateExceptionTo() 目前只对新便捷重载生效,旧 handler 入口会静默失效

这里收集到的 asyncBackups 只被接到了 error(Consumer<ErrorCode>)done(Runnable) 上;如果调用方继续使用现有的 error(FlowErrorHandler)done(FlowDoneHandler)setFlow*Handler(...),链级别的 propagateExceptionTo() 配置不会生效,而且没有任何提示。建议要么统一在旧入口里做包装,要么在启用该配置后显式拒绝旧重载,避免链级配置静默无效。

Also applies to: 321-352

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@core/src/main/java/org/zstack/core/workflow/SimpleFlowChain.java` around
lines 261 - 274, The chain-level propagateExceptionTo() currently only wires
asyncBackups into the new convenience overloads (error(Consumer<ErrorCode>) and
done(Runnable)), leaving old handlers (error(FlowErrorHandler),
done(FlowDoneHandler), and any setFlow*Handler(...) variants) unaware of
asyncBackups; modify SimpleFlowChain so that when propagateExceptionTo() mode is
active you either (A) wrap legacy handlers (FlowErrorHandler, FlowDoneHandler)
with a thin adapter that receives the chain's asyncBackups (reuse AsyncBackup
firstAsyncBackup and otherAsyncBackups logic) and delegates to the original
handler, or (B) explicitly reject registration of legacy handlers by throwing an
IllegalStateException if propagateExceptionTo() is enabled; update both
error(FlowErrorHandler) and done(FlowDoneHandler)/setFlow*Handler(...) entry
points to apply the same wrapping or rejection so chain-level
propagateExceptionTo() behavior is consistent across all handler overloads.


@Override
public FlowChain Finally(FlowFinallyHandler handler) {
finallyHandler = handler;
Expand Down Expand Up @@ -285,13 +318,39 @@ public Map getData() {
return this.data;
}

public SimpleFlowChain propagateExceptionTo(AsyncBackup... backups) {
DebugUtils.Assert(backups != null, "backups in methods propagateExceptionTo() must be not null");
DebugUtils.Assert(Arrays.stream(backups).noneMatch(Objects::isNull),
"backups in propagateExceptionTo() should not contain null elements");
DebugUtils.Assert(doneHandler == null, "propagateExceptionTo() must be called before SimpleFlowChain.done()");
DebugUtils.Assert(errorHandler == null, "propagateExceptionTo() must be called before SimpleFlowChain.error()");
DebugUtils.Assert(finallyHandler == null, "propagateExceptionTo() must be called before SimpleFlowChain.Finally()");
this.asyncBackups.addAll(Arrays.asList(backups));
return this;
}

@Override
public SimpleFlowChain done(FlowDoneHandler handler) {
DebugUtils.Assert(doneHandler==null, "there has been a FlowDoneHandler installed");
doneHandler = handler;
return this;
}

@SuppressWarnings("rawtypes")
public SimpleFlowChain done(Runnable runnable) {
AsyncBackup firstAsyncBackup = this.asyncBackups.isEmpty() ? null : this.asyncBackups.get(0);
AsyncBackup[] otherAsyncBackups = this.asyncBackups.isEmpty() ? new AsyncBackup[0] :
this.asyncBackups.subList(1, this.asyncBackups.size()).toArray(new AsyncBackup[0]);

DebugUtils.Assert(runnable != null, "runnable of doneHandler should not be null");
return done(new FlowDoneHandler(firstAsyncBackup, otherAsyncBackups) {
@Override
public void handle(Map data) {
runnable.run();
}
});
}

private void collectAfterRunnable(Flow flow) {
List<Field> ad = FieldUtils.getAnnotatedFieldsOnThisClass(AfterDone.class, flow.getClass());
for (Field f : ad) {
Expand Down
101 changes: 101 additions & 0 deletions header/src/main/java/org/zstack/header/core/workflow/Flow.java
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
package org.zstack.header.core.workflow;

import org.zstack.utils.DebugUtils;
import org.zstack.utils.FieldUtils;

import java.util.Map;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Predicate;

public interface Flow {
void run(FlowTrigger trigger, Map data);
Expand All @@ -23,4 +27,101 @@ default String name() {
}
return String.format("%s", this.getClass().getSimpleName());
}

@SuppressWarnings("rawtypes")
public static class FlowBuilder {
public final String flowName;
private Predicate<Map> skipPredicate;
private BiConsumer<FlowTrigger, Map> triggerConsumer;
private BiConsumer<FlowRollback, Map> rollbackConsumer;

private FlowBuilder(String flowName) {
DebugUtils.Assert(flowName != null, "flowName should not be null");
this.flowName = flowName;
}

public FlowBuilder withSkipPredicate(Predicate<Map> predicate) {
DebugUtils.Assert(predicate != null, "skipPredicate of FlowBuilder should not be null");
this.skipPredicate = predicate;
return this;
}

public FlowBuilder skipIf(Predicate<Map> predicate) {
return withSkipPredicate(predicate);
}

public FlowBuilder runIf(Predicate<Map> predicate) {
DebugUtils.Assert(predicate != null, "predicate of FlowBuilder.runIf() should not be null");
return withSkipPredicate(predicate.negate());
}
Comment on lines +49 to +56
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

skipIf/runIfrollback(...) 的组合会回滚一个从未执行过的 flow

当前 SimpleFlowChain.runFlow() 的 skip 分支最终仍会走 next(),而 next() 会把 currentFlow 压进 rollBackFlows。这样一来,只要这里给可跳过的 flow 配了 rollback(...),下游失败时就会对一个从未执行过 run() 的步骤执行回滚。建议在执行器层面避免把被跳过的 flow 入栈,否则这个新 API 很容易产生错误补偿。

Also applies to: 70-109

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@header/src/main/java/org/zstack/header/core/workflow/Flow.java` around lines
49 - 56, The issue: flows skipped via FlowBuilder.skipIf/runIf are still being
pushed into rollBackFlows by SimpleFlowChain.runFlow()/next(), causing
rollback(...) to run for flows that never executed. Fix by ensuring skipped
flows are not enqueued for rollback: in SimpleFlowChain.runFlow()/next(), only
push currentFlow into rollBackFlows when its run() actually executes (or after
successful run start), or add an "executed" flag on the Flow instance and check
it before adding to rollBackFlows or before performing rollback; update the
logic that handles the skip branch to call next() without enqueuing the skipped
Flow (or pass a flag to next() to avoid push). Reference
FlowBuilder.skipIf/runIf, SimpleFlowChain.runFlow(), next(), rollBackFlows, and
rollback(...) when making the change.


public FlowBuilder handle(BiConsumer<FlowTrigger, Map> consumer) {
DebugUtils.Assert(consumer != null, "consumer of FlowBuilder.handle() should not be null");
this.triggerConsumer = consumer;
return this;
}

public FlowBuilder handle(Consumer<FlowTrigger> consumer) {
DebugUtils.Assert(consumer != null, "consumer of FlowBuilder.handle() should not be null");
this.triggerConsumer = (trigger, data) -> consumer.accept(trigger);
return this;
}

public FlowBuilder rollback(BiConsumer<FlowRollback, Map> consumer) {
DebugUtils.Assert(consumer != null, "consumer of FlowBuilder.rollback() should not be null");
this.rollbackConsumer = consumer;
return this;
}

public FlowBuilder rollback(Consumer<FlowRollback> consumer) {
DebugUtils.Assert(consumer != null, "consumer of FlowBuilder.rollback() should not be null");
this.rollbackConsumer = (trigger, data) -> consumer.accept(trigger);
return this;
}

public Flow build() {
DebugUtils.Assert(triggerConsumer != null, "handle() must be called before build()");
Predicate<Map> skipPredicateSnapshot = skipPredicate;
BiConsumer<FlowTrigger, Map> triggerConsumerSnapshot = triggerConsumer;
BiConsumer<FlowRollback, Map> rollbackConsumerSnapshot = rollbackConsumer;

return new Flow() {
@Override
public boolean skip(Map data) {
if (skipPredicateSnapshot == null) {
return false;
}
return skipPredicateSnapshot.test(data);
}

@Override
public void run(FlowTrigger trigger, Map data) {
triggerConsumerSnapshot.accept(trigger, data);
}

@Override
public void rollback(FlowRollback trigger, Map data) {
if (rollbackConsumerSnapshot == null) {
trigger.rollback();
} else {
rollbackConsumerSnapshot.accept(trigger, data);
}
}

@Override
public String name() {
return flowName;
}

@Override
public String toString() {
return name();
}
};
}
}

public static FlowBuilder of(String flowName) {
return new FlowBuilder(flowName);
}
}