diff --git a/core/flamingock-core-commons/src/main/java/io/flamingock/internal/common/core/recovery/RecoveryIssue.java b/core/flamingock-core-commons/src/main/java/io/flamingock/internal/common/core/recovery/RecoveryIssue.java
index 12e267c4d..7414f3fe1 100644
--- a/core/flamingock-core-commons/src/main/java/io/flamingock/internal/common/core/recovery/RecoveryIssue.java
+++ b/core/flamingock-core-commons/src/main/java/io/flamingock/internal/common/core/recovery/RecoveryIssue.java
@@ -15,16 +15,20 @@
*/
package io.flamingock.internal.common.core.recovery;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
/**
* Represents a recovery issue that requires manual intervention during pipeline execution.
* This domain object is specifically designed for recovery scenarios and avoids the
* inappropriate use of audit-specific objects for recovery purposes.
*/
public class RecoveryIssue {
-
+
private final String changeId;
-
- public RecoveryIssue(String changeId) {
+
+ @JsonCreator
+ public RecoveryIssue(@JsonProperty("changeId") String changeId) {
this.changeId = changeId;
}
diff --git a/core/flamingock-core-commons/src/main/java/io/flamingock/internal/common/core/response/data/ErrorInfo.java b/core/flamingock-core-commons/src/main/java/io/flamingock/internal/common/core/response/data/ErrorInfo.java
index 6cdc3abcc..560afdd6f 100644
--- a/core/flamingock-core-commons/src/main/java/io/flamingock/internal/common/core/response/data/ErrorInfo.java
+++ b/core/flamingock-core-commons/src/main/java/io/flamingock/internal/common/core/response/data/ErrorInfo.java
@@ -15,23 +15,33 @@
*/
package io.flamingock.internal.common.core.response.data;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
/**
* Contains error information when an execution fails.
+ *
+ *
{@code changeIds} carries the change identifiers associated with the error. It may be
+ * empty (e.g., lock or validate-time failures with no specific change), a single id (most
+ * stage-level failures), or several (e.g., a stage blocked by manual intervention because
+ * multiple changes need recovery).
*/
public class ErrorInfo {
private String errorType;
private String message;
- private String changeId;
+ private List changeIds;
private String stageId;
public ErrorInfo() {
+ this.changeIds = new ArrayList<>();
}
- public ErrorInfo(String errorType, String message, String changeId, String stageId) {
+ public ErrorInfo(String errorType, String message, List changeIds, String stageId) {
this.errorType = errorType;
this.message = message;
- this.changeId = changeId;
+ this.changeIds = changeIds != null ? changeIds : new ArrayList<>();
this.stageId = stageId;
}
@@ -51,12 +61,12 @@ public void setMessage(String message) {
this.message = message;
}
- public String getChangeId() {
- return changeId;
+ public List getChangeIds() {
+ return changeIds;
}
- public void setChangeId(String changeId) {
- this.changeId = changeId;
+ public void setChangeIds(List changeIds) {
+ this.changeIds = changeIds != null ? changeIds : new ArrayList<>();
}
public String getStageId() {
@@ -68,11 +78,13 @@ public void setStageId(String stageId) {
}
/**
- * Creates an ErrorInfo from a Throwable.
+ * Creates an ErrorInfo from a Throwable. Pass {@link Collections#emptyList()} when no
+ * specific change is associated with the failure, or {@link Collections#singletonList(Object)}
+ * for single-change cases.
*/
- public static ErrorInfo fromThrowable(Throwable throwable, String changeId, String stageId) {
+ public static ErrorInfo fromThrowable(Throwable throwable, List changeIds, String stageId) {
String errorType = throwable.getClass().getSimpleName();
String message = throwable.getMessage();
- return new ErrorInfo(errorType, message, changeId, stageId);
+ return new ErrorInfo(errorType, message, changeIds, stageId);
}
}
diff --git a/core/flamingock-core-commons/src/main/java/io/flamingock/internal/common/core/response/data/StageResult.java b/core/flamingock-core-commons/src/main/java/io/flamingock/internal/common/core/response/data/StageResult.java
index 66cd383c9..a66bb34d6 100644
--- a/core/flamingock-core-commons/src/main/java/io/flamingock/internal/common/core/response/data/StageResult.java
+++ b/core/flamingock-core-commons/src/main/java/io/flamingock/internal/common/core/response/data/StageResult.java
@@ -25,18 +25,19 @@ public class StageResult {
private String stageId;
private String stageName;
- private StageStatus status;
+ private StageState state;
private long durationMs;
private List changes;
public StageResult() {
this.changes = new ArrayList<>();
+ this.state = StageState.NOT_STARTED;
}
private StageResult(Builder builder) {
this.stageId = builder.stageId;
this.stageName = builder.stageName;
- this.status = builder.status;
+ this.state = builder.state != null ? builder.state : StageState.NOT_STARTED;
this.durationMs = builder.durationMs;
this.changes = builder.changes != null ? builder.changes : new ArrayList<>();
}
@@ -57,12 +58,12 @@ public void setStageName(String stageName) {
this.stageName = stageName;
}
- public StageStatus getStatus() {
- return status;
+ public StageState getState() {
+ return state;
}
- public void setStatus(StageStatus status) {
- this.status = status;
+ public void setState(StageState state) {
+ this.state = state;
}
public long getDurationMs() {
@@ -82,11 +83,11 @@ public void setChanges(List changes) {
}
public boolean isFailed() {
- return status == StageStatus.FAILED;
+ return state.isFailed();
}
public boolean isCompleted() {
- return status == StageStatus.COMPLETED;
+ return state.isCompleted();
}
public int getAppliedCount() {
@@ -111,10 +112,23 @@ public static Builder builder() {
return new Builder();
}
+ /**
+ * Creates a builder pre-populated from an existing result (useful for transitions that
+ * preserve identity fields and tweak the state).
+ */
+ public static Builder builder(StageResult source) {
+ return new Builder()
+ .stageId(source.stageId)
+ .stageName(source.stageName)
+ .state(source.state)
+ .durationMs(source.durationMs)
+ .changes(new ArrayList<>(source.changes));
+ }
+
public static class Builder {
private String stageId;
private String stageName;
- private StageStatus status;
+ private StageState state;
private long durationMs;
private List changes = new ArrayList<>();
@@ -128,8 +142,8 @@ public Builder stageName(String stageName) {
return this;
}
- public Builder status(StageStatus status) {
- this.status = status;
+ public Builder state(StageState state) {
+ this.state = state;
return this;
}
diff --git a/core/flamingock-core-commons/src/main/java/io/flamingock/internal/common/core/response/data/StageState.java b/core/flamingock-core-commons/src/main/java/io/flamingock/internal/common/core/response/data/StageState.java
new file mode 100644
index 000000000..e331b8131
--- /dev/null
+++ b/core/flamingock-core-commons/src/main/java/io/flamingock/internal/common/core/response/data/StageState.java
@@ -0,0 +1,159 @@
+/*
+ * Copyright 2026 Flamingock (https://www.flamingock.io)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.flamingock.internal.common.core.response.data;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonSubTypes;
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import io.flamingock.internal.common.core.recovery.RecoveryIssue;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property = "type")
+@JsonSubTypes({
+ @JsonSubTypes.Type(value = StageState.NotStarted.class, name = "NOT_STARTED"),
+ @JsonSubTypes.Type(value = StageState.Started.class, name = "STARTED"),
+ @JsonSubTypes.Type(value = StageState.Completed.class, name = "COMPLETED"),
+ @JsonSubTypes.Type(value = StageState.Failed.class, name = "FAILED"),
+ @JsonSubTypes.Type(value = StageState.BlockedForMI.class, name = "BLOCKED_MANUAL_INTERVENTION")
+})
+public abstract class StageState {
+
+ public static final StageState NOT_STARTED = new NotStarted();
+ public static final StageState STARTED = new Started();
+ public static final StageState COMPLETED = new Completed();
+
+ public static StageState failed(ErrorInfo info) {
+ return new Failed(info);
+ }
+
+ public static StageState blockedManualIntervention(String stageName, List issues) {
+ List changeIds = issues.stream()
+ .map(RecoveryIssue::getChangeId)
+ .collect(Collectors.toList());
+ ErrorInfo errorInfo = new ErrorInfo(
+ "MANUAL_INTERVENTION_REQUIRED",
+ "Manual intervention required",
+ changeIds,
+ stageName
+ );
+ return new BlockedForMI(errorInfo, issues);
+ }
+
+ StageState() {
+ }
+
+ public boolean isNotStarted() {
+ return false;
+ }
+
+ public boolean isStarted() {
+ return false;
+ }
+
+ public boolean isCompleted() {
+ return false;
+ }
+
+ public boolean isFailed() {
+ return false;
+ }
+
+ public boolean isBlockedForManualIntervention() {
+ return false;
+ }
+
+ public Optional getErrorInfo() {
+ return Optional.empty();
+ }
+
+ public List getRecoveryIssues() {
+ return Collections.emptyList();
+ }
+
+ static final class NotStarted extends StageState {
+ @Override
+ public boolean isNotStarted() {
+ return true;
+ }
+ }
+
+ static final class Started extends StageState {
+ @Override
+ public boolean isStarted() {
+ return true;
+ }
+ }
+
+ static final class Completed extends StageState {
+ @Override
+ public boolean isCompleted() {
+ return true;
+ }
+ }
+
+ static class Failed extends StageState {
+ private final ErrorInfo errorInfo;
+
+ @JsonCreator
+ Failed(@JsonProperty("errorInfo") ErrorInfo errorInfo) {
+ this.errorInfo = errorInfo;
+ }
+
+ @Override
+ public boolean isFailed() {
+ return true;
+ }
+
+ @Override
+ public Optional getErrorInfo() {
+ return Optional.ofNullable(errorInfo);
+ }
+
+ @JsonProperty("errorInfo")
+ ErrorInfo serialisedErrorInfo() {
+ return errorInfo;
+ }
+ }
+
+ static final class BlockedForMI extends Failed {
+ private final List recoveryIssues;
+
+ @JsonCreator
+ BlockedForMI(
+ @JsonProperty("errorInfo") ErrorInfo errorInfo,
+ @JsonProperty("recoveryIssues") List recoveryIssues) {
+ super(errorInfo);
+ this.recoveryIssues = Collections.unmodifiableList(new ArrayList<>(recoveryIssues));
+ }
+
+ @Override
+ public boolean isBlockedForManualIntervention() {
+ return true;
+ }
+
+ @Override
+ @JsonProperty("recoveryIssues")
+ public List getRecoveryIssues() {
+ return recoveryIssues;
+ }
+ }
+}
diff --git a/core/flamingock-core-commons/src/main/java/io/flamingock/internal/common/core/response/data/StageStatus.java b/core/flamingock-core-commons/src/main/java/io/flamingock/internal/common/core/response/data/StageStatus.java
deleted file mode 100644
index 7367ce6df..000000000
--- a/core/flamingock-core-commons/src/main/java/io/flamingock/internal/common/core/response/data/StageStatus.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Copyright 2026 Flamingock (https://www.flamingock.io)
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.flamingock.internal.common.core.response.data;
-
-/**
- * Status of a stage execution.
- */
-public enum StageStatus {
- /**
- * Stage completed successfully with all changes applied.
- */
- COMPLETED,
-
- /**
- * Stage failed during execution.
- */
- FAILED,
-
- /**
- * Stage was skipped (all changes already applied).
- */
- SKIPPED,
-
- /**
- * Stage was not started (due to prior failure).
- */
- NOT_STARTED
-}
diff --git a/core/flamingock-core-commons/src/test/java/io/flamingock/internal/common/core/response/ResponseSerializationTest.java b/core/flamingock-core-commons/src/test/java/io/flamingock/internal/common/core/response/ResponseSerializationTest.java
index acc4d1367..8efcaa1eb 100644
--- a/core/flamingock-core-commons/src/test/java/io/flamingock/internal/common/core/response/ResponseSerializationTest.java
+++ b/core/flamingock-core-commons/src/test/java/io/flamingock/internal/common/core/response/ResponseSerializationTest.java
@@ -23,7 +23,7 @@
import io.flamingock.internal.common.core.response.data.ExecuteResponseData;
import io.flamingock.internal.common.core.response.data.ExecutionStatus;
import io.flamingock.internal.common.core.response.data.StageResult;
-import io.flamingock.internal.common.core.response.data.StageStatus;
+import io.flamingock.internal.common.core.response.data.StageState;
import io.flamingock.internal.util.JsonObjectMapper;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.DisplayName;
@@ -71,7 +71,7 @@ void shouldSerializeAndDeserializeExecuteResponseData() throws Exception {
.addStage(StageResult.builder()
.stageId("stage-1")
.stageName("Stage One")
- .status(StageStatus.COMPLETED)
+ .state(StageState.COMPLETED)
.durationMs(500)
.addChange(ChangeResult.builder()
.changeId("change-001")
@@ -97,7 +97,7 @@ void shouldSerializeAndDeserializeExecuteResponseData() throws Exception {
assertEquals(0, deserialized.getFailedChanges());
assertEquals(1, deserialized.getStages().size());
assertEquals("stage-1", deserialized.getStages().get(0).getStageId());
- assertEquals(StageStatus.COMPLETED, deserialized.getStages().get(0).getStatus());
+ assertTrue(deserialized.getStages().get(0).getState().isCompleted());
assertEquals(1, deserialized.getStages().get(0).getChanges().size());
assertEquals("change-001", deserialized.getStages().get(0).getChanges().get(0).getChangeId());
}
diff --git a/core/flamingock-core-commons/src/test/java/io/flamingock/internal/common/core/response/data/StageStateSerializationTest.java b/core/flamingock-core-commons/src/test/java/io/flamingock/internal/common/core/response/data/StageStateSerializationTest.java
new file mode 100644
index 000000000..09091064a
--- /dev/null
+++ b/core/flamingock-core-commons/src/test/java/io/flamingock/internal/common/core/response/data/StageStateSerializationTest.java
@@ -0,0 +1,120 @@
+/*
+ * Copyright 2026 Flamingock (https://www.flamingock.io)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.flamingock.internal.common.core.response.data;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import io.flamingock.internal.common.core.recovery.RecoveryIssue;
+import io.flamingock.internal.util.JsonObjectMapper;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+class StageStateSerializationTest {
+
+ private final ObjectMapper mapper = JsonObjectMapper.DEFAULT_INSTANCE.copy();
+
+ @Test
+ void roundTripNotStarted() throws Exception {
+ String json = mapper.writeValueAsString(StageState.NOT_STARTED);
+ assertTrue(json.contains("\"type\":\"NOT_STARTED\""));
+
+ StageState back = mapper.readValue(json, StageState.class);
+ assertTrue(back.isNotStarted());
+ assertFalse(back.isFailed());
+ }
+
+ @Test
+ void roundTripStarted() throws Exception {
+ String json = mapper.writeValueAsString(StageState.STARTED);
+ assertTrue(json.contains("\"type\":\"STARTED\""));
+
+ StageState back = mapper.readValue(json, StageState.class);
+ assertTrue(back.isStarted());
+ }
+
+ @Test
+ void roundTripCompleted() throws Exception {
+ String json = mapper.writeValueAsString(StageState.COMPLETED);
+ assertTrue(json.contains("\"type\":\"COMPLETED\""));
+
+ StageState back = mapper.readValue(json, StageState.class);
+ assertTrue(back.isCompleted());
+ }
+
+ @Test
+ void roundTripFailed() throws Exception {
+ ErrorInfo info = new ErrorInfo("RuntimeException", "boom", java.util.Collections.singletonList("change-1"), "stage-1");
+ StageState failed = StageState.failed(info);
+
+ String json = mapper.writeValueAsString(failed);
+ assertTrue(json.contains("\"type\":\"FAILED\""));
+ assertTrue(json.contains("\"errorInfo\""));
+
+ StageState back = mapper.readValue(json, StageState.class);
+ assertTrue(back.isFailed());
+ ErrorInfo backInfo = back.getErrorInfo().orElseThrow(AssertionError::new);
+ assertEquals("RuntimeException", backInfo.getErrorType());
+ assertEquals("boom", backInfo.getMessage());
+ assertEquals(java.util.Collections.singletonList("change-1"), backInfo.getChangeIds());
+ assertEquals("stage-1", backInfo.getStageId());
+ }
+
+ @Test
+ void roundTripBlockedForMI() throws Exception {
+ List issues = Arrays.asList(new RecoveryIssue("c1"), new RecoveryIssue("c2"));
+ StageState blocked = StageState.blockedManualIntervention("stage-1", issues);
+
+ String json = mapper.writeValueAsString(blocked);
+ assertTrue(json.contains("\"type\":\"BLOCKED_MANUAL_INTERVENTION\""));
+ assertTrue(json.contains("\"errorInfo\"")); // inherited from Failed
+ assertTrue(json.contains("\"recoveryIssues\""));
+
+ StageState back = mapper.readValue(json, StageState.class);
+ assertTrue(back.isBlockedForManualIntervention());
+ assertTrue(back.isFailed());
+ // ErrorInfo round-tripped
+ ErrorInfo info = back.getErrorInfo().orElseThrow(AssertionError::new);
+ assertEquals("MANUAL_INTERVENTION_REQUIRED", info.getErrorType());
+ assertEquals("stage-1", info.getStageId());
+ // Recovery issues round-tripped
+ assertEquals(2, back.getRecoveryIssues().size());
+ assertEquals("c1", back.getRecoveryIssues().get(0).getChangeId());
+ assertEquals("c2", back.getRecoveryIssues().get(1).getChangeId());
+ }
+
+ @Test
+ void roundTripStageResultWithFailedState() throws Exception {
+ ErrorInfo info = new ErrorInfo("RuntimeException", "boom", java.util.Collections.singletonList("change-1"), "stage-1");
+ StageResult result = StageResult.builder()
+ .stageId("stage-1")
+ .stageName("stage-1")
+ .state(StageState.failed(info))
+ .durationMs(123)
+ .build();
+
+ String json = mapper.writeValueAsString(result);
+ StageResult back = mapper.readValue(json, StageResult.class);
+
+ assertEquals("stage-1", back.getStageId());
+ assertTrue(back.getState().isFailed());
+ assertEquals("boom", back.getState().getErrorInfo().orElseThrow(AssertionError::new).getMessage());
+ }
+}
diff --git a/core/flamingock-core/src/main/java/io/flamingock/internal/core/operation/AbstractPipelineTraverseOperation.java b/core/flamingock-core/src/main/java/io/flamingock/internal/core/operation/AbstractPipelineTraverseOperation.java
index 6ed835bb2..c9a600f92 100644
--- a/core/flamingock-core/src/main/java/io/flamingock/internal/core/operation/AbstractPipelineTraverseOperation.java
+++ b/core/flamingock-core/src/main/java/io/flamingock/internal/core/operation/AbstractPipelineTraverseOperation.java
@@ -141,7 +141,7 @@ private ExecuteResponseData execute(LoadedPipeline pipeline) throws FlamingockEx
break;
}
} catch (LockException exception) {
-
+ pipelineRun.markPipelineFailed(exception);
eventPublisher.publish(new StageFailedEvent(exception));
eventPublisher.publish(new PipelineFailedEvent(exception));
if (throwExceptionIfCannotObtainLock) {
@@ -165,6 +165,20 @@ private ExecuteResponseData execute(LoadedPipeline pipeline) throws FlamingockEx
}
pipelineRun.stop();
throw OperationException.fromExisting(e.getCause(), pipelineRun.toResponse());
+ } catch (ManualInterventionRequiredException miEx) {
+ // Per-stage attribution: each RecoveryIssue is mapped to its owning stage and that
+ // stage's state becomes BLOCKED_MANUAL_INTERVENTION with the subset of issues
+ // affecting it. The blocked stages carry their own ErrorInfo, so no pipeline-level
+ // mark is needed.
+ pipelineRun.markStagesBlockedFromMI(miEx);
+ throw miEx;
+ } catch (FlamingockException e) {
+ // Validate-time exceptions (aborted FlamingockException, PendingChangesException)
+ // and any FlamingockException that bubbles up from runStage's generic-throwable
+ // handler. Stage-level failures (if any) have already been recorded; pipeline-level
+ // marking is idempotent (first-wins) and toResponse() keeps the stage error as primary.
+ pipelineRun.markPipelineFailed(e);
+ throw e;
}
} while (true);
@@ -188,6 +202,7 @@ private void runStage(String executionId, Lock lock, ExecutableStage executableS
eventPublisher.publish(new PipelineFailedEvent(exception));
throw exception;
} catch (Throwable generalException) {
+ pipelineRun.markStageFailed(executableStage.getName(), generalException);
throw processAndGetFlamingockException(generalException);
}
}
@@ -203,7 +218,7 @@ private void startStage(String executionId, Lock lock, ExecutableStage executabl
eventPublisher.publish(new StageCompletedEvent(executionOutput));
}
- private FlamingockException processAndGetFlamingockException(Throwable exception) throws FlamingockException {
+private FlamingockException processAndGetFlamingockException(Throwable exception) throws FlamingockException {
FlamingockException flamingockException;
if (exception instanceof OperationException) {
OperationException pipelineException = (OperationException) exception;
diff --git a/core/flamingock-core/src/main/java/io/flamingock/internal/core/operation/result/StageResultBuilder.java b/core/flamingock-core/src/main/java/io/flamingock/internal/core/operation/result/StageResultBuilder.java
index 197577c35..fe50bdcad 100644
--- a/core/flamingock-core/src/main/java/io/flamingock/internal/core/operation/result/StageResultBuilder.java
+++ b/core/flamingock-core/src/main/java/io/flamingock/internal/core/operation/result/StageResultBuilder.java
@@ -16,8 +16,9 @@
package io.flamingock.internal.core.operation.result;
import io.flamingock.internal.common.core.response.data.ChangeResult;
+import io.flamingock.internal.common.core.response.data.ErrorInfo;
import io.flamingock.internal.common.core.response.data.StageResult;
-import io.flamingock.internal.common.core.response.data.StageStatus;
+import io.flamingock.internal.common.core.response.data.StageState;
import java.time.Duration;
import java.time.LocalDateTime;
@@ -31,7 +32,7 @@ public class StageResultBuilder {
private String stageId;
private String stageName;
- private StageStatus status;
+ private StageState state;
private long durationMs;
private List changes = new ArrayList<>();
private LocalDateTime startTime;
@@ -61,28 +62,28 @@ public StageResultBuilder stopTimer() {
return this;
}
- public StageResultBuilder status(StageStatus status) {
- this.status = status;
+ public StageResultBuilder state(StageState state) {
+ this.state = state;
return this;
}
public StageResultBuilder completed() {
- this.status = StageStatus.COMPLETED;
+ this.state = StageState.COMPLETED;
return this;
}
public StageResultBuilder failed() {
- this.status = StageStatus.FAILED;
+ this.state = StageState.failed(null);
return this;
}
- public StageResultBuilder skipped() {
- this.status = StageStatus.SKIPPED;
+ public StageResultBuilder failed(ErrorInfo errorInfo) {
+ this.state = StageState.failed(errorInfo);
return this;
}
public StageResultBuilder notStarted() {
- this.status = StageStatus.NOT_STARTED;
+ this.state = StageState.NOT_STARTED;
return this;
}
@@ -109,7 +110,7 @@ public StageResult build() {
return StageResult.builder()
.stageId(stageId)
.stageName(stageName)
- .status(status)
+ .state(state)
.durationMs(durationMs)
.changes(changes)
.build();
diff --git a/core/flamingock-core/src/main/java/io/flamingock/internal/core/pipeline/run/PipelineRun.java b/core/flamingock-core/src/main/java/io/flamingock/internal/core/pipeline/run/PipelineRun.java
index 49d2fc08c..e161c5a9b 100644
--- a/core/flamingock-core/src/main/java/io/flamingock/internal/core/pipeline/run/PipelineRun.java
+++ b/core/flamingock-core/src/main/java/io/flamingock/internal/core/pipeline/run/PipelineRun.java
@@ -15,13 +15,16 @@
*/
package io.flamingock.internal.core.pipeline.run;
+import io.flamingock.internal.common.core.recovery.ManualInterventionRequiredException;
+import io.flamingock.internal.common.core.recovery.RecoveryIssue;
import io.flamingock.internal.common.core.response.data.ChangeResult;
import io.flamingock.internal.common.core.response.data.ChangeStatus;
import io.flamingock.internal.common.core.response.data.ErrorInfo;
import io.flamingock.internal.common.core.response.data.ExecuteResponseData;
import io.flamingock.internal.common.core.response.data.ExecutionStatus;
import io.flamingock.internal.common.core.response.data.StageResult;
-import io.flamingock.internal.common.core.response.data.StageStatus;
+import io.flamingock.internal.common.core.response.data.StageState;
+import io.flamingock.internal.core.change.loaded.AbstractLoadedChange;
import io.flamingock.internal.core.pipeline.execution.StageExecutionException;
import io.flamingock.internal.core.pipeline.loaded.LoadedPipeline;
import io.flamingock.internal.core.pipeline.loaded.stage.AbstractLoadedStage;
@@ -33,6 +36,7 @@
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.stream.Collectors;
public class PipelineRun {
@@ -56,6 +60,7 @@ public static PipelineRun of(List stages) {
private final Map byName;
private Instant startedAt;
private Instant stoppedAt;
+ private ErrorInfo pipelineError;
private PipelineRun(List stageRuns) {
this.stageRuns = Collections.unmodifiableList(stageRuns);
@@ -89,25 +94,90 @@ public void stop() {
}
public void markStageStarted(String stageName) {
- lookupOrThrow(stageName).setState(StageState.STARTED);
+ StageRun run = lookupOrThrow(stageName);
+ run.setResult(StageResult.builder(run.getResult())
+ .state(StageState.STARTED)
+ .build());
}
public void markStageCompleted(String stageName, StageResult result) {
- StageRun run = lookupOrThrow(stageName);
- run.setState(StageState.COMPLETED);
- run.setResult(result);
+ // Incoming result from the executor already carries state=COMPLETED.
+ lookupOrThrow(stageName).setResult(result);
}
public void markStageFailed(String stageName, StageExecutionException exception) {
- StageResult result = exception.getResult();
+ StageResult resultFromExecutor = exception.getResult();
ErrorInfo errorInfo = ErrorInfo.fromThrowable(
exception.getCause(),
- exception.getFailedChangeId(),
- result.getStageId()
+ Collections.singletonList(exception.getFailedChangeId()),
+ resultFromExecutor.getStageId()
);
StageRun run = lookupOrThrow(stageName);
- run.setState(StageState.failed(errorInfo));
- run.setResult(result);
+ run.setResult(StageResult.builder(resultFromExecutor)
+ .state(StageState.failed(errorInfo))
+ .build());
+ }
+
+ /**
+ * Stage-scope failure for non-{@link StageExecutionException} Throwables (no enriched
+ * {@link StageResult} from the executor; we rebuild the existing one with a Failed state).
+ */
+ public void markStageFailed(String stageName, Throwable cause) {
+ StageRun run = lookupOrThrow(stageName);
+ ErrorInfo errorInfo = ErrorInfo.fromThrowable(cause, Collections.emptyList(), stageName);
+ run.setResult(StageResult.builder(run.getResult())
+ .state(StageState.failed(errorInfo))
+ .build());
+ }
+
+ /**
+ * Attributes a {@link ManualInterventionRequiredException} to its owning stages. Each
+ * {@link RecoveryIssue} is resolved to the stage that owns the change, and that stage's
+ * state becomes {@link StageState#blockedManualIntervention(String, List)} with the subset
+ * of issues that affect it.
+ *
+ * @throws IllegalStateException if any issue's changeId can't be resolved to a loaded
+ * stage (signals a planner/loaded-stage inconsistency).
+ */
+ public void markStagesBlockedFromMI(ManualInterventionRequiredException exception) {
+ Map> issuesByStage = new LinkedHashMap<>();
+ for (RecoveryIssue issue : exception.getConflictingChanges()) {
+ String stageName = findStageForChange(issue.getChangeId());
+ issuesByStage.computeIfAbsent(stageName, k -> new ArrayList<>()).add(issue);
+ }
+ for (Map.Entry> entry : issuesByStage.entrySet()) {
+ StageRun run = lookupOrThrow(entry.getKey());
+ run.setResult(StageResult.builder(run.getResult())
+ .state(StageState.blockedManualIntervention(entry.getKey(), entry.getValue()))
+ .build());
+ }
+ }
+
+ private String findStageForChange(String changeId) {
+ for (StageRun stageRun : stageRuns) {
+ for (AbstractLoadedChange change : stageRun.getLoadedStage().getChanges()) {
+ if (changeId.equals(change.getId())) {
+ return stageRun.getName();
+ }
+ }
+ }
+ throw new IllegalStateException(
+ "Cannot attribute manual-intervention issue: changeId '" + changeId
+ + "' is not part of any loaded stage in this run. "
+ + "This indicates a planner/loaded-stage inconsistency.");
+ }
+
+ /**
+ * Pipeline-wide failure marker. Idempotent: first call wins.
+ */
+ public void markPipelineFailed(Throwable cause) {
+ if (this.pipelineError == null) {
+ this.pipelineError = ErrorInfo.fromThrowable(cause, Collections.emptyList(), null);
+ }
+ }
+
+ public Optional getPipelineError() {
+ return Optional.ofNullable(pipelineError);
}
private StageRun lookupOrThrow(String stageName) {
@@ -131,17 +201,19 @@ public ExecuteResponseData toResponse() {
boolean anyFailed = false;
for (StageRun stageRun : stageRuns) {
- StageResult stageResult = stageRun.getResult().orElse(null);
- if (stageResult == null) {
- continue;
- }
+ StageResult stageResult = stageRun.getResult();
stages.add(stageResult);
totalStages++;
- if (stageResult.getStatus() == StageStatus.COMPLETED) {
+ StageState state = stageResult.getState();
+ if (state.isCompleted()) {
completedStages++;
- } else if (stageResult.getStatus() == StageStatus.FAILED) {
+ } else if (state.isFailed()) {
failedStages++;
+ anyFailed = true;
+ if (error == null) {
+ error = state.getErrorInfo().orElse(null);
+ }
}
if (stageResult.getChanges() != null) {
@@ -156,16 +228,13 @@ public ExecuteResponseData toResponse() {
}
}
}
-
- if (stageRun.getState().isFailed()) {
- anyFailed = true;
- if (error == null) {
- error = stageRun.getState().getErrorInfo().orElse(null);
- }
- }
}
- ExecutionStatus status = anyFailed ? ExecutionStatus.FAILED : ExecutionStatus.SUCCESS;
+ if (error == null) {
+ error = this.pipelineError;
+ }
+ boolean pipelineFailed = anyFailed || this.pipelineError != null;
+ ExecutionStatus status = pipelineFailed ? ExecutionStatus.FAILED : ExecutionStatus.SUCCESS;
long durationMs = (startedAt != null && stoppedAt != null)
? Duration.between(startedAt, stoppedAt).toMillis()
: 0L;
diff --git a/core/flamingock-core/src/main/java/io/flamingock/internal/core/pipeline/run/StageRun.java b/core/flamingock-core/src/main/java/io/flamingock/internal/core/pipeline/run/StageRun.java
index 5566070f5..a196dcc5e 100644
--- a/core/flamingock-core/src/main/java/io/flamingock/internal/core/pipeline/run/StageRun.java
+++ b/core/flamingock-core/src/main/java/io/flamingock/internal/core/pipeline/run/StageRun.java
@@ -16,18 +16,21 @@
package io.flamingock.internal.core.pipeline.run;
import io.flamingock.internal.common.core.response.data.StageResult;
+import io.flamingock.internal.common.core.response.data.StageState;
import io.flamingock.internal.core.pipeline.loaded.stage.AbstractLoadedStage;
-import java.util.Optional;
-
public class StageRun {
private final AbstractLoadedStage loadedStage;
- private StageState state = StageState.NOT_STARTED;
private StageResult result;
public StageRun(AbstractLoadedStage loadedStage) {
this.loadedStage = loadedStage;
+ this.result = StageResult.builder()
+ .stageId(loadedStage.getName())
+ .stageName(loadedStage.getName())
+ .state(StageState.NOT_STARTED)
+ .build();
}
public String getName() {
@@ -38,16 +41,12 @@ public AbstractLoadedStage getLoadedStage() {
return loadedStage;
}
- public StageState getState() {
- return state;
+ public StageResult getResult() {
+ return result;
}
- public Optional getResult() {
- return Optional.ofNullable(result);
- }
-
- void setState(StageState state) {
- this.state = state;
+ public StageState getState() {
+ return result.getState();
}
void setResult(StageResult result) {
diff --git a/core/flamingock-core/src/main/java/io/flamingock/internal/core/pipeline/run/StageState.java b/core/flamingock-core/src/main/java/io/flamingock/internal/core/pipeline/run/StageState.java
deleted file mode 100644
index f2178cd41..000000000
--- a/core/flamingock-core/src/main/java/io/flamingock/internal/core/pipeline/run/StageState.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Copyright 2026 Flamingock (https://www.flamingock.io)
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.flamingock.internal.core.pipeline.run;
-
-import io.flamingock.internal.common.core.response.data.ErrorInfo;
-
-import java.util.Optional;
-
-public abstract class StageState {
-
- public static final StageState NOT_STARTED = new NotStarted();
- public static final StageState STARTED = new Started();
- public static final StageState COMPLETED = new Completed();
-
- public static StageState failed(ErrorInfo info) {
- return new Failed(info);
- }
-
- private StageState() {
- }
-
- public boolean isFailed() {
- return false;
- }
-
- public Optional getErrorInfo() {
- return Optional.empty();
- }
-
- private static final class NotStarted extends StageState {
- }
-
- private static final class Started extends StageState {
- }
-
- private static final class Completed extends StageState {
- }
-
- private static final class Failed extends StageState {
- private final ErrorInfo info;
-
- Failed(ErrorInfo info) {
- this.info = info;
- }
-
- @Override
- public boolean isFailed() {
- return true;
- }
-
- @Override
- public Optional getErrorInfo() {
- return Optional.of(info);
- }
- }
-}
diff --git a/core/flamingock-core/src/test/java/io/flamingock/internal/core/operation/AbstractPipelineTraverseOperationTest.java b/core/flamingock-core/src/test/java/io/flamingock/internal/core/operation/AbstractPipelineTraverseOperationTest.java
index 6770a4547..464a39934 100644
--- a/core/flamingock-core/src/test/java/io/flamingock/internal/core/operation/AbstractPipelineTraverseOperationTest.java
+++ b/core/flamingock-core/src/test/java/io/flamingock/internal/core/operation/AbstractPipelineTraverseOperationTest.java
@@ -18,6 +18,7 @@
import io.flamingock.internal.common.core.recovery.ManualInterventionRequiredException;
import io.flamingock.internal.common.core.recovery.action.ChangeAction;
import io.flamingock.internal.core.change.executable.ExecutableChange;
+import io.flamingock.internal.core.change.loaded.AbstractLoadedChange;
import io.flamingock.internal.core.event.EventPublisher;
import io.flamingock.internal.core.operation.execute.ExecuteApplyOperation;
import io.flamingock.internal.core.operation.execute.ExecuteArgs;
@@ -51,7 +52,7 @@ void shouldThrowManualInterventionWhenPlannerReturnsAbort() {
ExecutionPlanner planner = mock(ExecutionPlanner.class);
when(planner.getNextExecution(any(PipelineRun.class))).thenReturn(abortPlan);
- LoadedPipeline pipeline = mockPipeline();
+ LoadedPipeline pipeline = mockPipeline("change-1");
ExecuteApplyOperation operation = buildOperation(planner);
ManualInterventionRequiredException ex = assertThrows(
@@ -95,9 +96,19 @@ private static ExecuteApplyOperation buildOperation(ExecutionPlanner planner, St
);
}
- private static LoadedPipeline mockPipeline() {
+ private static LoadedPipeline mockPipeline(String... loadedChangeIds) {
+ AbstractLoadedStage loadedStage = mock(AbstractLoadedStage.class);
+ when(loadedStage.getName()).thenReturn("stage-1");
+ java.util.List loadedChanges = new java.util.ArrayList<>();
+ for (String id : loadedChangeIds) {
+ AbstractLoadedChange ch = mock(AbstractLoadedChange.class);
+ when(ch.getId()).thenReturn(id);
+ loadedChanges.add(ch);
+ }
+ when(loadedStage.getChanges()).thenReturn(loadedChanges);
+
LoadedPipeline pipeline = mock(LoadedPipeline.class);
- when(pipeline.getStages()).thenReturn(Collections.singletonList(mock(AbstractLoadedStage.class)));
+ when(pipeline.getStages()).thenReturn(Collections.singletonList(loadedStage));
when(pipeline.getSystemStage()).thenReturn(java.util.Optional.empty());
doNothing().when(pipeline).validate();
return pipeline;
diff --git a/core/flamingock-core/src/test/java/io/flamingock/internal/core/operation/ExecuteApplyOperationTest.java b/core/flamingock-core/src/test/java/io/flamingock/internal/core/operation/ExecuteApplyOperationTest.java
index bef3b7ccb..aea58b6c8 100644
--- a/core/flamingock-core/src/test/java/io/flamingock/internal/core/operation/ExecuteApplyOperationTest.java
+++ b/core/flamingock-core/src/test/java/io/flamingock/internal/core/operation/ExecuteApplyOperationTest.java
@@ -19,7 +19,7 @@
import io.flamingock.internal.common.core.response.data.ChangeStatus;
import io.flamingock.internal.common.core.response.data.ExecutionStatus;
import io.flamingock.internal.common.core.response.data.StageResult;
-import io.flamingock.internal.common.core.response.data.StageStatus;
+import io.flamingock.internal.common.core.response.data.StageState;
import io.flamingock.internal.core.event.EventPublisher;
import io.flamingock.internal.core.operation.execute.ExecuteApplyOperation;
import io.flamingock.internal.core.operation.execute.ExecuteArgs;
@@ -173,7 +173,7 @@ private StageResult createSuccessStageResult(String stageId, int applied, int sk
StageResult.Builder builder = StageResult.builder()
.stageId(stageId)
.stageName(stageId)
- .status(StageStatus.COMPLETED)
+ .state(StageState.COMPLETED)
.durationMs(100);
for (int i = 0; i < applied; i++) {
@@ -198,7 +198,7 @@ private StageResult createFailedStageResult(String stageId, String failedChangeI
return StageResult.builder()
.stageId(stageId)
.stageName(stageId)
- .status(StageStatus.FAILED)
+ .state(StageState.failed(null))
.durationMs(50)
.addChange(ChangeResult.builder()
.changeId(failedChangeId)
diff --git a/core/flamingock-core/src/test/java/io/flamingock/internal/core/pipeline/run/PipelineRunTest.java b/core/flamingock-core/src/test/java/io/flamingock/internal/core/pipeline/run/PipelineRunTest.java
index f1d3fed65..6c0fb3c81 100644
--- a/core/flamingock-core/src/test/java/io/flamingock/internal/core/pipeline/run/PipelineRunTest.java
+++ b/core/flamingock-core/src/test/java/io/flamingock/internal/core/pipeline/run/PipelineRunTest.java
@@ -15,9 +15,12 @@
*/
package io.flamingock.internal.core.pipeline.run;
+import io.flamingock.internal.common.core.recovery.ManualInterventionRequiredException;
+import io.flamingock.internal.common.core.recovery.RecoveryIssue;
import io.flamingock.internal.common.core.response.data.ErrorInfo;
import io.flamingock.internal.common.core.response.data.StageResult;
-import io.flamingock.internal.common.core.response.data.StageStatus;
+import io.flamingock.internal.common.core.response.data.StageState;
+import io.flamingock.internal.core.change.loaded.AbstractLoadedChange;
import io.flamingock.internal.core.pipeline.execution.StageExecutionException;
import io.flamingock.internal.core.pipeline.loaded.stage.AbstractLoadedStage;
import org.junit.jupiter.api.Test;
@@ -26,6 +29,7 @@
import java.util.List;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertSame;
import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -68,7 +72,7 @@ void markStageFailedDerivesErrorInfoFromException() {
StageResult stageResult = StageResult.builder()
.stageId("alpha")
.stageName("alpha")
- .status(StageStatus.FAILED)
+ .state(StageState.failed(null))
.build();
RuntimeException cause = new RuntimeException("boom");
StageExecutionException exception = StageExecutionException.fromResult(cause, stageResult, "change-1");
@@ -79,7 +83,7 @@ void markStageFailedDerivesErrorInfoFromException() {
StageState state = pipelineRun.getStageRun("alpha").getState();
assertTrue(state.isFailed());
ErrorInfo info = state.getErrorInfo().get();
- assertEquals("change-1", info.getChangeId());
+ assertEquals(java.util.Collections.singletonList("change-1"), info.getChangeIds());
assertEquals("alpha", info.getStageId());
}
@@ -101,6 +105,93 @@ void getStageRunReturnsNullForUnknownStage() {
assertNull(pipelineRun.getStageRun("missing"));
}
+ @Test
+ void markStageFailedWithThrowableSynthesisesErrorInfo() {
+ AbstractLoadedStage a = mockStage("alpha");
+ PipelineRun pipelineRun = PipelineRun.of(java.util.Collections.singletonList(a));
+
+ RuntimeException cause = new RuntimeException("boom");
+ pipelineRun.markStageFailed("alpha", cause);
+
+ StageState state = pipelineRun.getStageRun("alpha").getState();
+ assertTrue(state.isFailed());
+ ErrorInfo info = state.getErrorInfo().get();
+ assertEquals("alpha", info.getStageId());
+ assertEquals("RuntimeException", info.getErrorType());
+ assertEquals("boom", info.getMessage());
+ }
+
+ @Test
+ void markPipelineFailedIsIdempotentFirstWins() {
+ AbstractLoadedStage a = mockStage("alpha");
+ PipelineRun pipelineRun = PipelineRun.of(java.util.Collections.singletonList(a));
+
+ pipelineRun.markPipelineFailed(new RuntimeException("first"));
+ pipelineRun.markPipelineFailed(new IllegalStateException("second"));
+
+ ErrorInfo info = pipelineRun.getPipelineError().get();
+ assertEquals("RuntimeException", info.getErrorType());
+ assertEquals("first", info.getMessage());
+ }
+
+ @Test
+ void getPipelineErrorIsEmptyUntilMarked() {
+ AbstractLoadedStage a = mockStage("alpha");
+ PipelineRun pipelineRun = PipelineRun.of(java.util.Collections.singletonList(a));
+
+ assertFalse(pipelineRun.getPipelineError().isPresent());
+ }
+
+ @Test
+ void markStagesBlockedFromMIGroupsIssuesByOwningStage() {
+ AbstractLoadedChange alphaChange1 = mockChange("alpha-c1");
+ AbstractLoadedChange alphaChange2 = mockChange("alpha-c2");
+ AbstractLoadedChange betaChange = mockChange("beta-c1");
+
+ AbstractLoadedStage alpha = mockStageWithChanges("alpha", alphaChange1, alphaChange2);
+ AbstractLoadedStage beta = mockStageWithChanges("beta", betaChange);
+ AbstractLoadedStage gamma = mockStageWithChanges("gamma"); // no MI'd changes
+
+ PipelineRun pipelineRun = PipelineRun.of(Arrays.asList(alpha, beta, gamma));
+
+ List issues = Arrays.asList(
+ new RecoveryIssue("alpha-c1"),
+ new RecoveryIssue("alpha-c2"),
+ new RecoveryIssue("beta-c1"));
+ ManualInterventionRequiredException miEx = new ManualInterventionRequiredException(issues, "alpha");
+
+ pipelineRun.markStagesBlockedFromMI(miEx);
+
+ StageState alphaState = pipelineRun.getStageRun("alpha").getState();
+ assertTrue(alphaState.isBlockedForManualIntervention());
+ assertEquals(2, alphaState.getRecoveryIssues().size());
+
+ StageState betaState = pipelineRun.getStageRun("beta").getState();
+ assertTrue(betaState.isBlockedForManualIntervention());
+ assertEquals(1, betaState.getRecoveryIssues().size());
+ assertEquals("beta-c1", betaState.getRecoveryIssues().get(0).getChangeId());
+
+ // No MI'd changes for gamma → stays NOT_STARTED.
+ assertSame(StageState.NOT_STARTED, pipelineRun.getStageRun("gamma").getState());
+ }
+
+ @Test
+ void markStagesBlockedFromMIThrowsForUnknownChange() {
+ AbstractLoadedStage alpha = mockStageWithChanges("alpha", mockChange("alpha-c1"));
+ PipelineRun pipelineRun = PipelineRun.of(java.util.Collections.singletonList(alpha));
+
+ ManualInterventionRequiredException miEx = new ManualInterventionRequiredException(
+ Arrays.asList(new RecoveryIssue("ghost-change")), "alpha");
+
+ IllegalStateException ex = assertThrows(
+ IllegalStateException.class,
+ () -> pipelineRun.markStagesBlockedFromMI(miEx));
+ assertTrue(ex.getMessage().contains("ghost-change"));
+
+ // No partial state — the alpha stage stays NOT_STARTED.
+ assertSame(StageState.NOT_STARTED, pipelineRun.getStageRun("alpha").getState());
+ }
+
@Test
void getLoadedStagesReturnsTheUnderlyingStagesInOrder() {
AbstractLoadedStage a = mockStage("alpha");
@@ -119,4 +210,17 @@ private static AbstractLoadedStage mockStage(String name) {
when(stage.getName()).thenReturn(name);
return stage;
}
+
+ private static AbstractLoadedStage mockStageWithChanges(String name, AbstractLoadedChange... changes) {
+ AbstractLoadedStage stage = mock(AbstractLoadedStage.class);
+ when(stage.getName()).thenReturn(name);
+ when(stage.getChanges()).thenReturn(Arrays.asList(changes));
+ return stage;
+ }
+
+ private static AbstractLoadedChange mockChange(String id) {
+ AbstractLoadedChange change = mock(AbstractLoadedChange.class);
+ when(change.getId()).thenReturn(id);
+ return change;
+ }
}
diff --git a/core/flamingock-core/src/test/java/io/flamingock/internal/core/pipeline/run/PipelineRunToResponseTest.java b/core/flamingock-core/src/test/java/io/flamingock/internal/core/pipeline/run/PipelineRunToResponseTest.java
index 352a790a4..26d7e2f71 100644
--- a/core/flamingock-core/src/test/java/io/flamingock/internal/core/pipeline/run/PipelineRunToResponseTest.java
+++ b/core/flamingock-core/src/test/java/io/flamingock/internal/core/pipeline/run/PipelineRunToResponseTest.java
@@ -15,13 +15,16 @@
*/
package io.flamingock.internal.core.pipeline.run;
+import io.flamingock.internal.common.core.recovery.ManualInterventionRequiredException;
+import io.flamingock.internal.common.core.recovery.RecoveryIssue;
import io.flamingock.internal.common.core.response.data.ChangeResult;
import io.flamingock.internal.common.core.response.data.ChangeStatus;
import io.flamingock.internal.common.core.response.data.ErrorInfo;
import io.flamingock.internal.common.core.response.data.ExecuteResponseData;
import io.flamingock.internal.common.core.response.data.ExecutionStatus;
import io.flamingock.internal.common.core.response.data.StageResult;
-import io.flamingock.internal.common.core.response.data.StageStatus;
+import io.flamingock.internal.common.core.response.data.StageState;
+import io.flamingock.internal.core.change.loaded.AbstractLoadedChange;
import io.flamingock.internal.core.pipeline.execution.StageExecutionException;
import io.flamingock.internal.core.pipeline.loaded.stage.AbstractLoadedStage;
import org.junit.jupiter.api.Test;
@@ -31,6 +34,7 @@
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -103,28 +107,105 @@ void oneCompletedAndOneFailedYieldsFailedAndPipelineErrorMatchesStage() {
assertEquals(1, response.getAppliedChanges());
assertEquals(1, response.getFailedChanges());
ErrorInfo error = response.getError();
- assertEquals("change-b1", error.getChangeId());
+ assertEquals(java.util.Collections.singletonList("change-b1"), error.getChangeIds());
assertEquals("beta", error.getStageId());
assertEquals("RuntimeException", error.getErrorType());
assertEquals("boom", error.getMessage());
}
@Test
- void stagesNeverReachingTerminalStateAreNotReported() {
+ void pipelineFailedWithNoStageFailuresYieldsFailedAndPipelineError() {
+ AbstractLoadedStage stageA = mockStage("alpha");
+ PipelineRun pipelineRun = PipelineRun.of(java.util.Collections.singletonList(stageA));
+
+ pipelineRun.start();
+ pipelineRun.markPipelineFailed(new RuntimeException("lock not acquired"));
+ pipelineRun.stop();
+
+ ExecuteResponseData response = pipelineRun.toResponse();
+
+ assertEquals(ExecutionStatus.FAILED, response.getStatus());
+ // Stage was NOT_STARTED — it appears in response.stages with state=NOT_STARTED, but
+ // it doesn't count toward failedStages/completedStages.
+ assertEquals(1, response.getTotalStages());
+ assertEquals(0, response.getCompletedStages());
+ assertEquals(0, response.getFailedStages());
+ assertTrue(response.getStages().get(0).getState().isNotStarted());
+ ErrorInfo error = response.getError();
+ assertEquals("RuntimeException", error.getErrorType());
+ assertEquals("lock not acquired", error.getMessage());
+ }
+
+ @Test
+ void stageErrorTakesPrecedenceOverPipelineError() {
+ AbstractLoadedStage stageA = mockStage("alpha");
+ PipelineRun pipelineRun = PipelineRun.of(java.util.Collections.singletonList(stageA));
+
+ RuntimeException stageCause = new RuntimeException("stage boom");
+ StageExecutionException stageException = StageExecutionException.fromResult(
+ stageCause, failedStageResult("alpha"), "change-1");
+
+ pipelineRun.start();
+ pipelineRun.markStageFailed("alpha", stageException);
+ pipelineRun.markPipelineFailed(new IllegalStateException("late pipeline error"));
+ pipelineRun.stop();
+
+ ExecuteResponseData response = pipelineRun.toResponse();
+
+ assertEquals(ExecutionStatus.FAILED, response.getStatus());
+ ErrorInfo error = response.getError();
+ // Stage-level error wins over pipeline-level
+ assertEquals("RuntimeException", error.getErrorType());
+ assertEquals("stage boom", error.getMessage());
+ assertEquals("alpha", error.getStageId());
+ }
+
+ @Test
+ void blockedForMIStageIsTrackedInResponseWithBlockedState() {
+ AbstractLoadedChange c1 = mockChange("c1");
+ AbstractLoadedStage alpha = mockStageWithChanges("alpha", c1);
+ PipelineRun pipelineRun = PipelineRun.of(java.util.Collections.singletonList(alpha));
+
+ pipelineRun.start();
+ ManualInterventionRequiredException miEx = new ManualInterventionRequiredException(
+ java.util.Arrays.asList(new RecoveryIssue("c1")), "alpha");
+ pipelineRun.markStagesBlockedFromMI(miEx);
+ pipelineRun.stop();
+
+ ExecuteResponseData response = pipelineRun.toResponse();
+
+ // Blocked stage appears in response.stages with its BlockedForMI state (carrying the
+ // RecoveryIssue list). It also contributes to failedStages because BlockedForMI.isFailed()
+ // returns true (preserves today's FAILED pipeline status).
+ assertEquals(1, response.getTotalStages());
+ assertEquals(1, response.getStages().size());
+ assertEquals(1, response.getFailedStages());
+ assertTrue(response.getStages().get(0).getState().isBlockedForManualIntervention());
+ assertEquals(1, response.getStages().get(0).getState().getRecoveryIssues().size());
+ // Pipeline-level failure surfaces via pipelineError as today.
+ assertEquals(ExecutionStatus.FAILED, response.getStatus());
+ assertNotNull(response.getError());
+ }
+
+ @Test
+ void notStartedStagesAreReportedInResponseWithNotStartedState() {
AbstractLoadedStage stageA = mockStage("alpha");
AbstractLoadedStage stageB = mockStage("beta");
PipelineRun pipelineRun = PipelineRun.of(Arrays.asList(stageA, stageB));
pipelineRun.start();
pipelineRun.markStageCompleted("alpha", completedStageResult("alpha", 1, 0));
- // beta never advances past NOT_STARTED — no StageResult attached
+ // beta never advances past NOT_STARTED — still appears in the response.
pipelineRun.stop();
ExecuteResponseData response = pipelineRun.toResponse();
- assertEquals(1, response.getTotalStages());
+ assertEquals(2, response.getTotalStages());
assertEquals(1, response.getCompletedStages());
- assertEquals(1, response.getStages().size());
+ assertEquals(0, response.getFailedStages());
+ assertEquals(2, response.getStages().size());
+ assertTrue(response.getStages().get(0).getState().isCompleted());
+ assertTrue(response.getStages().get(1).getState().isNotStarted());
}
private static AbstractLoadedStage mockStage(String name) {
@@ -133,11 +214,24 @@ private static AbstractLoadedStage mockStage(String name) {
return stage;
}
+ private static AbstractLoadedStage mockStageWithChanges(String name, AbstractLoadedChange... changes) {
+ AbstractLoadedStage stage = mock(AbstractLoadedStage.class);
+ when(stage.getName()).thenReturn(name);
+ when(stage.getChanges()).thenReturn(Arrays.asList(changes));
+ return stage;
+ }
+
+ private static AbstractLoadedChange mockChange(String id) {
+ AbstractLoadedChange change = mock(AbstractLoadedChange.class);
+ when(change.getId()).thenReturn(id);
+ return change;
+ }
+
private static StageResult completedStageResult(String name, int applied, int skipped) {
StageResult.Builder builder = StageResult.builder()
.stageId(name)
.stageName(name)
- .status(StageStatus.COMPLETED);
+ .state(StageState.COMPLETED);
for (int i = 0; i < applied; i++) {
builder.addChange(ChangeResult.builder().changeId(name + "-a" + i).status(ChangeStatus.APPLIED).build());
}
@@ -151,7 +245,7 @@ private static StageResult failedStageResult(String name) {
return StageResult.builder()
.stageId(name)
.stageName(name)
- .status(StageStatus.FAILED)
+ .state(StageState.failed(null))
.addChange(ChangeResult.builder().changeId(name + "-f0").status(ChangeStatus.FAILED).build())
.build();
}
diff --git a/core/flamingock-core/src/test/java/io/flamingock/internal/core/pipeline/run/StageStateTest.java b/core/flamingock-core/src/test/java/io/flamingock/internal/core/pipeline/run/StageStateTest.java
index dd70b6cd1..62e5b4919 100644
--- a/core/flamingock-core/src/test/java/io/flamingock/internal/core/pipeline/run/StageStateTest.java
+++ b/core/flamingock-core/src/test/java/io/flamingock/internal/core/pipeline/run/StageStateTest.java
@@ -15,9 +15,15 @@
*/
package io.flamingock.internal.core.pipeline.run;
+import io.flamingock.internal.common.core.recovery.RecoveryIssue;
import io.flamingock.internal.common.core.response.data.ErrorInfo;
+import io.flamingock.internal.common.core.response.data.StageState;
import org.junit.jupiter.api.Test;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertSame;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -44,11 +50,43 @@ void completedIsNotFailedAndHasNoErrorInfo() {
@Test
void failedCarriesItsErrorInfo() {
- ErrorInfo info = new ErrorInfo("RuntimeException", "boom", "change-1", "stage-1");
+ ErrorInfo info = new ErrorInfo("RuntimeException", "boom", java.util.Collections.singletonList("change-1"), "stage-1");
StageState state = StageState.failed(info);
assertTrue(state.isFailed());
assertTrue(state.getErrorInfo().isPresent());
assertSame(info, state.getErrorInfo().get());
}
+
+ @Test
+ void blockedForMICarriesSynthesizedErrorInfoAndRecoveryIssues() {
+ List issues = Arrays.asList(new RecoveryIssue("c1"), new RecoveryIssue("c2"));
+ StageState state = StageState.blockedManualIntervention("stage-1", issues);
+
+ assertTrue(state.isBlockedForManualIntervention());
+ assertTrue(state.isFailed()); // inherited from Failed
+
+ // ErrorInfo is now synthesized at construction (inherited surface from Failed).
+ assertTrue(state.getErrorInfo().isPresent());
+ ErrorInfo info = state.getErrorInfo().get();
+ assertEquals("MANUAL_INTERVENTION_REQUIRED", info.getErrorType());
+ assertEquals("stage-1", info.getStageId());
+ // Change IDs are now structured in the ErrorInfo.changeIds list.
+ assertEquals(Arrays.asList("c1", "c2"), info.getChangeIds());
+
+ // Recovery issues are still available for callers that want the richer detail
+ // (today equivalent to changeIds, but the type carries more shape for future evolution).
+ assertEquals(2, state.getRecoveryIssues().size());
+ assertEquals("c1", state.getRecoveryIssues().get(0).getChangeId());
+ assertEquals("c2", state.getRecoveryIssues().get(1).getChangeId());
+ }
+
+ @Test
+ void nonBlockedStatesAreNotBlockedForMI() {
+ assertFalse(StageState.NOT_STARTED.isBlockedForManualIntervention());
+ assertFalse(StageState.STARTED.isBlockedForManualIntervention());
+ assertFalse(StageState.COMPLETED.isBlockedForManualIntervention());
+ assertFalse(StageState.failed(new ErrorInfo("E", "m", java.util.Collections.singletonList("c"), "s"))
+ .isBlockedForManualIntervention());
+ }
}