diff --git a/core/flamingock-core-commons/src/main/java/io/flamingock/internal/common/core/recovery/RecoveryIssue.java b/core/flamingock-core-commons/src/main/java/io/flamingock/internal/common/core/recovery/RecoveryIssue.java index 12e267c4d..7414f3fe1 100644 --- a/core/flamingock-core-commons/src/main/java/io/flamingock/internal/common/core/recovery/RecoveryIssue.java +++ b/core/flamingock-core-commons/src/main/java/io/flamingock/internal/common/core/recovery/RecoveryIssue.java @@ -15,16 +15,20 @@ */ package io.flamingock.internal.common.core.recovery; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; + /** * Represents a recovery issue that requires manual intervention during pipeline execution. * This domain object is specifically designed for recovery scenarios and avoids the * inappropriate use of audit-specific objects for recovery purposes. */ public class RecoveryIssue { - + private final String changeId; - - public RecoveryIssue(String changeId) { + + @JsonCreator + public RecoveryIssue(@JsonProperty("changeId") String changeId) { this.changeId = changeId; } diff --git a/core/flamingock-core-commons/src/main/java/io/flamingock/internal/common/core/response/data/ErrorInfo.java b/core/flamingock-core-commons/src/main/java/io/flamingock/internal/common/core/response/data/ErrorInfo.java index 6cdc3abcc..560afdd6f 100644 --- a/core/flamingock-core-commons/src/main/java/io/flamingock/internal/common/core/response/data/ErrorInfo.java +++ b/core/flamingock-core-commons/src/main/java/io/flamingock/internal/common/core/response/data/ErrorInfo.java @@ -15,23 +15,33 @@ */ package io.flamingock.internal.common.core.response.data; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + /** * Contains error information when an execution fails. + * + *

{@code changeIds} carries the change identifiers associated with the error. It may be + * empty (e.g., lock or validate-time failures with no specific change), a single id (most + * stage-level failures), or several (e.g., a stage blocked by manual intervention because + * multiple changes need recovery). */ public class ErrorInfo { private String errorType; private String message; - private String changeId; + private List changeIds; private String stageId; public ErrorInfo() { + this.changeIds = new ArrayList<>(); } - public ErrorInfo(String errorType, String message, String changeId, String stageId) { + public ErrorInfo(String errorType, String message, List changeIds, String stageId) { this.errorType = errorType; this.message = message; - this.changeId = changeId; + this.changeIds = changeIds != null ? changeIds : new ArrayList<>(); this.stageId = stageId; } @@ -51,12 +61,12 @@ public void setMessage(String message) { this.message = message; } - public String getChangeId() { - return changeId; + public List getChangeIds() { + return changeIds; } - public void setChangeId(String changeId) { - this.changeId = changeId; + public void setChangeIds(List changeIds) { + this.changeIds = changeIds != null ? changeIds : new ArrayList<>(); } public String getStageId() { @@ -68,11 +78,13 @@ public void setStageId(String stageId) { } /** - * Creates an ErrorInfo from a Throwable. + * Creates an ErrorInfo from a Throwable. Pass {@link Collections#emptyList()} when no + * specific change is associated with the failure, or {@link Collections#singletonList(Object)} + * for single-change cases. */ - public static ErrorInfo fromThrowable(Throwable throwable, String changeId, String stageId) { + public static ErrorInfo fromThrowable(Throwable throwable, List changeIds, String stageId) { String errorType = throwable.getClass().getSimpleName(); String message = throwable.getMessage(); - return new ErrorInfo(errorType, message, changeId, stageId); + return new ErrorInfo(errorType, message, changeIds, stageId); } } diff --git a/core/flamingock-core-commons/src/main/java/io/flamingock/internal/common/core/response/data/StageResult.java b/core/flamingock-core-commons/src/main/java/io/flamingock/internal/common/core/response/data/StageResult.java index 66cd383c9..a66bb34d6 100644 --- a/core/flamingock-core-commons/src/main/java/io/flamingock/internal/common/core/response/data/StageResult.java +++ b/core/flamingock-core-commons/src/main/java/io/flamingock/internal/common/core/response/data/StageResult.java @@ -25,18 +25,19 @@ public class StageResult { private String stageId; private String stageName; - private StageStatus status; + private StageState state; private long durationMs; private List changes; public StageResult() { this.changes = new ArrayList<>(); + this.state = StageState.NOT_STARTED; } private StageResult(Builder builder) { this.stageId = builder.stageId; this.stageName = builder.stageName; - this.status = builder.status; + this.state = builder.state != null ? builder.state : StageState.NOT_STARTED; this.durationMs = builder.durationMs; this.changes = builder.changes != null ? builder.changes : new ArrayList<>(); } @@ -57,12 +58,12 @@ public void setStageName(String stageName) { this.stageName = stageName; } - public StageStatus getStatus() { - return status; + public StageState getState() { + return state; } - public void setStatus(StageStatus status) { - this.status = status; + public void setState(StageState state) { + this.state = state; } public long getDurationMs() { @@ -82,11 +83,11 @@ public void setChanges(List changes) { } public boolean isFailed() { - return status == StageStatus.FAILED; + return state.isFailed(); } public boolean isCompleted() { - return status == StageStatus.COMPLETED; + return state.isCompleted(); } public int getAppliedCount() { @@ -111,10 +112,23 @@ public static Builder builder() { return new Builder(); } + /** + * Creates a builder pre-populated from an existing result (useful for transitions that + * preserve identity fields and tweak the state). + */ + public static Builder builder(StageResult source) { + return new Builder() + .stageId(source.stageId) + .stageName(source.stageName) + .state(source.state) + .durationMs(source.durationMs) + .changes(new ArrayList<>(source.changes)); + } + public static class Builder { private String stageId; private String stageName; - private StageStatus status; + private StageState state; private long durationMs; private List changes = new ArrayList<>(); @@ -128,8 +142,8 @@ public Builder stageName(String stageName) { return this; } - public Builder status(StageStatus status) { - this.status = status; + public Builder state(StageState state) { + this.state = state; return this; } diff --git a/core/flamingock-core-commons/src/main/java/io/flamingock/internal/common/core/response/data/StageState.java b/core/flamingock-core-commons/src/main/java/io/flamingock/internal/common/core/response/data/StageState.java new file mode 100644 index 000000000..e331b8131 --- /dev/null +++ b/core/flamingock-core-commons/src/main/java/io/flamingock/internal/common/core/response/data/StageState.java @@ -0,0 +1,159 @@ +/* + * Copyright 2026 Flamingock (https://www.flamingock.io) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.flamingock.internal.common.core.response.data; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonSubTypes; +import com.fasterxml.jackson.annotation.JsonTypeInfo; +import io.flamingock.internal.common.core.recovery.RecoveryIssue; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Optional; +import java.util.stream.Collectors; + +@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property = "type") +@JsonSubTypes({ + @JsonSubTypes.Type(value = StageState.NotStarted.class, name = "NOT_STARTED"), + @JsonSubTypes.Type(value = StageState.Started.class, name = "STARTED"), + @JsonSubTypes.Type(value = StageState.Completed.class, name = "COMPLETED"), + @JsonSubTypes.Type(value = StageState.Failed.class, name = "FAILED"), + @JsonSubTypes.Type(value = StageState.BlockedForMI.class, name = "BLOCKED_MANUAL_INTERVENTION") +}) +public abstract class StageState { + + public static final StageState NOT_STARTED = new NotStarted(); + public static final StageState STARTED = new Started(); + public static final StageState COMPLETED = new Completed(); + + public static StageState failed(ErrorInfo info) { + return new Failed(info); + } + + public static StageState blockedManualIntervention(String stageName, List issues) { + List changeIds = issues.stream() + .map(RecoveryIssue::getChangeId) + .collect(Collectors.toList()); + ErrorInfo errorInfo = new ErrorInfo( + "MANUAL_INTERVENTION_REQUIRED", + "Manual intervention required", + changeIds, + stageName + ); + return new BlockedForMI(errorInfo, issues); + } + + StageState() { + } + + public boolean isNotStarted() { + return false; + } + + public boolean isStarted() { + return false; + } + + public boolean isCompleted() { + return false; + } + + public boolean isFailed() { + return false; + } + + public boolean isBlockedForManualIntervention() { + return false; + } + + public Optional getErrorInfo() { + return Optional.empty(); + } + + public List getRecoveryIssues() { + return Collections.emptyList(); + } + + static final class NotStarted extends StageState { + @Override + public boolean isNotStarted() { + return true; + } + } + + static final class Started extends StageState { + @Override + public boolean isStarted() { + return true; + } + } + + static final class Completed extends StageState { + @Override + public boolean isCompleted() { + return true; + } + } + + static class Failed extends StageState { + private final ErrorInfo errorInfo; + + @JsonCreator + Failed(@JsonProperty("errorInfo") ErrorInfo errorInfo) { + this.errorInfo = errorInfo; + } + + @Override + public boolean isFailed() { + return true; + } + + @Override + public Optional getErrorInfo() { + return Optional.ofNullable(errorInfo); + } + + @JsonProperty("errorInfo") + ErrorInfo serialisedErrorInfo() { + return errorInfo; + } + } + + static final class BlockedForMI extends Failed { + private final List recoveryIssues; + + @JsonCreator + BlockedForMI( + @JsonProperty("errorInfo") ErrorInfo errorInfo, + @JsonProperty("recoveryIssues") List recoveryIssues) { + super(errorInfo); + this.recoveryIssues = Collections.unmodifiableList(new ArrayList<>(recoveryIssues)); + } + + @Override + public boolean isBlockedForManualIntervention() { + return true; + } + + @Override + @JsonProperty("recoveryIssues") + public List getRecoveryIssues() { + return recoveryIssues; + } + } +} diff --git a/core/flamingock-core-commons/src/main/java/io/flamingock/internal/common/core/response/data/StageStatus.java b/core/flamingock-core-commons/src/main/java/io/flamingock/internal/common/core/response/data/StageStatus.java deleted file mode 100644 index 7367ce6df..000000000 --- a/core/flamingock-core-commons/src/main/java/io/flamingock/internal/common/core/response/data/StageStatus.java +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Copyright 2026 Flamingock (https://www.flamingock.io) - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.flamingock.internal.common.core.response.data; - -/** - * Status of a stage execution. - */ -public enum StageStatus { - /** - * Stage completed successfully with all changes applied. - */ - COMPLETED, - - /** - * Stage failed during execution. - */ - FAILED, - - /** - * Stage was skipped (all changes already applied). - */ - SKIPPED, - - /** - * Stage was not started (due to prior failure). - */ - NOT_STARTED -} diff --git a/core/flamingock-core-commons/src/test/java/io/flamingock/internal/common/core/response/ResponseSerializationTest.java b/core/flamingock-core-commons/src/test/java/io/flamingock/internal/common/core/response/ResponseSerializationTest.java index acc4d1367..8efcaa1eb 100644 --- a/core/flamingock-core-commons/src/test/java/io/flamingock/internal/common/core/response/ResponseSerializationTest.java +++ b/core/flamingock-core-commons/src/test/java/io/flamingock/internal/common/core/response/ResponseSerializationTest.java @@ -23,7 +23,7 @@ import io.flamingock.internal.common.core.response.data.ExecuteResponseData; import io.flamingock.internal.common.core.response.data.ExecutionStatus; import io.flamingock.internal.common.core.response.data.StageResult; -import io.flamingock.internal.common.core.response.data.StageStatus; +import io.flamingock.internal.common.core.response.data.StageState; import io.flamingock.internal.util.JsonObjectMapper; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.DisplayName; @@ -71,7 +71,7 @@ void shouldSerializeAndDeserializeExecuteResponseData() throws Exception { .addStage(StageResult.builder() .stageId("stage-1") .stageName("Stage One") - .status(StageStatus.COMPLETED) + .state(StageState.COMPLETED) .durationMs(500) .addChange(ChangeResult.builder() .changeId("change-001") @@ -97,7 +97,7 @@ void shouldSerializeAndDeserializeExecuteResponseData() throws Exception { assertEquals(0, deserialized.getFailedChanges()); assertEquals(1, deserialized.getStages().size()); assertEquals("stage-1", deserialized.getStages().get(0).getStageId()); - assertEquals(StageStatus.COMPLETED, deserialized.getStages().get(0).getStatus()); + assertTrue(deserialized.getStages().get(0).getState().isCompleted()); assertEquals(1, deserialized.getStages().get(0).getChanges().size()); assertEquals("change-001", deserialized.getStages().get(0).getChanges().get(0).getChangeId()); } diff --git a/core/flamingock-core-commons/src/test/java/io/flamingock/internal/common/core/response/data/StageStateSerializationTest.java b/core/flamingock-core-commons/src/test/java/io/flamingock/internal/common/core/response/data/StageStateSerializationTest.java new file mode 100644 index 000000000..09091064a --- /dev/null +++ b/core/flamingock-core-commons/src/test/java/io/flamingock/internal/common/core/response/data/StageStateSerializationTest.java @@ -0,0 +1,120 @@ +/* + * Copyright 2026 Flamingock (https://www.flamingock.io) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.flamingock.internal.common.core.response.data; + +import com.fasterxml.jackson.databind.ObjectMapper; +import io.flamingock.internal.common.core.recovery.RecoveryIssue; +import io.flamingock.internal.util.JsonObjectMapper; +import org.junit.jupiter.api.Test; + +import java.util.Arrays; +import java.util.List; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + +class StageStateSerializationTest { + + private final ObjectMapper mapper = JsonObjectMapper.DEFAULT_INSTANCE.copy(); + + @Test + void roundTripNotStarted() throws Exception { + String json = mapper.writeValueAsString(StageState.NOT_STARTED); + assertTrue(json.contains("\"type\":\"NOT_STARTED\"")); + + StageState back = mapper.readValue(json, StageState.class); + assertTrue(back.isNotStarted()); + assertFalse(back.isFailed()); + } + + @Test + void roundTripStarted() throws Exception { + String json = mapper.writeValueAsString(StageState.STARTED); + assertTrue(json.contains("\"type\":\"STARTED\"")); + + StageState back = mapper.readValue(json, StageState.class); + assertTrue(back.isStarted()); + } + + @Test + void roundTripCompleted() throws Exception { + String json = mapper.writeValueAsString(StageState.COMPLETED); + assertTrue(json.contains("\"type\":\"COMPLETED\"")); + + StageState back = mapper.readValue(json, StageState.class); + assertTrue(back.isCompleted()); + } + + @Test + void roundTripFailed() throws Exception { + ErrorInfo info = new ErrorInfo("RuntimeException", "boom", java.util.Collections.singletonList("change-1"), "stage-1"); + StageState failed = StageState.failed(info); + + String json = mapper.writeValueAsString(failed); + assertTrue(json.contains("\"type\":\"FAILED\"")); + assertTrue(json.contains("\"errorInfo\"")); + + StageState back = mapper.readValue(json, StageState.class); + assertTrue(back.isFailed()); + ErrorInfo backInfo = back.getErrorInfo().orElseThrow(AssertionError::new); + assertEquals("RuntimeException", backInfo.getErrorType()); + assertEquals("boom", backInfo.getMessage()); + assertEquals(java.util.Collections.singletonList("change-1"), backInfo.getChangeIds()); + assertEquals("stage-1", backInfo.getStageId()); + } + + @Test + void roundTripBlockedForMI() throws Exception { + List issues = Arrays.asList(new RecoveryIssue("c1"), new RecoveryIssue("c2")); + StageState blocked = StageState.blockedManualIntervention("stage-1", issues); + + String json = mapper.writeValueAsString(blocked); + assertTrue(json.contains("\"type\":\"BLOCKED_MANUAL_INTERVENTION\"")); + assertTrue(json.contains("\"errorInfo\"")); // inherited from Failed + assertTrue(json.contains("\"recoveryIssues\"")); + + StageState back = mapper.readValue(json, StageState.class); + assertTrue(back.isBlockedForManualIntervention()); + assertTrue(back.isFailed()); + // ErrorInfo round-tripped + ErrorInfo info = back.getErrorInfo().orElseThrow(AssertionError::new); + assertEquals("MANUAL_INTERVENTION_REQUIRED", info.getErrorType()); + assertEquals("stage-1", info.getStageId()); + // Recovery issues round-tripped + assertEquals(2, back.getRecoveryIssues().size()); + assertEquals("c1", back.getRecoveryIssues().get(0).getChangeId()); + assertEquals("c2", back.getRecoveryIssues().get(1).getChangeId()); + } + + @Test + void roundTripStageResultWithFailedState() throws Exception { + ErrorInfo info = new ErrorInfo("RuntimeException", "boom", java.util.Collections.singletonList("change-1"), "stage-1"); + StageResult result = StageResult.builder() + .stageId("stage-1") + .stageName("stage-1") + .state(StageState.failed(info)) + .durationMs(123) + .build(); + + String json = mapper.writeValueAsString(result); + StageResult back = mapper.readValue(json, StageResult.class); + + assertEquals("stage-1", back.getStageId()); + assertTrue(back.getState().isFailed()); + assertEquals("boom", back.getState().getErrorInfo().orElseThrow(AssertionError::new).getMessage()); + } +} diff --git a/core/flamingock-core/src/main/java/io/flamingock/internal/core/operation/AbstractPipelineTraverseOperation.java b/core/flamingock-core/src/main/java/io/flamingock/internal/core/operation/AbstractPipelineTraverseOperation.java index 6ed835bb2..c9a600f92 100644 --- a/core/flamingock-core/src/main/java/io/flamingock/internal/core/operation/AbstractPipelineTraverseOperation.java +++ b/core/flamingock-core/src/main/java/io/flamingock/internal/core/operation/AbstractPipelineTraverseOperation.java @@ -141,7 +141,7 @@ private ExecuteResponseData execute(LoadedPipeline pipeline) throws FlamingockEx break; } } catch (LockException exception) { - + pipelineRun.markPipelineFailed(exception); eventPublisher.publish(new StageFailedEvent(exception)); eventPublisher.publish(new PipelineFailedEvent(exception)); if (throwExceptionIfCannotObtainLock) { @@ -165,6 +165,20 @@ private ExecuteResponseData execute(LoadedPipeline pipeline) throws FlamingockEx } pipelineRun.stop(); throw OperationException.fromExisting(e.getCause(), pipelineRun.toResponse()); + } catch (ManualInterventionRequiredException miEx) { + // Per-stage attribution: each RecoveryIssue is mapped to its owning stage and that + // stage's state becomes BLOCKED_MANUAL_INTERVENTION with the subset of issues + // affecting it. The blocked stages carry their own ErrorInfo, so no pipeline-level + // mark is needed. + pipelineRun.markStagesBlockedFromMI(miEx); + throw miEx; + } catch (FlamingockException e) { + // Validate-time exceptions (aborted FlamingockException, PendingChangesException) + // and any FlamingockException that bubbles up from runStage's generic-throwable + // handler. Stage-level failures (if any) have already been recorded; pipeline-level + // marking is idempotent (first-wins) and toResponse() keeps the stage error as primary. + pipelineRun.markPipelineFailed(e); + throw e; } } while (true); @@ -188,6 +202,7 @@ private void runStage(String executionId, Lock lock, ExecutableStage executableS eventPublisher.publish(new PipelineFailedEvent(exception)); throw exception; } catch (Throwable generalException) { + pipelineRun.markStageFailed(executableStage.getName(), generalException); throw processAndGetFlamingockException(generalException); } } @@ -203,7 +218,7 @@ private void startStage(String executionId, Lock lock, ExecutableStage executabl eventPublisher.publish(new StageCompletedEvent(executionOutput)); } - private FlamingockException processAndGetFlamingockException(Throwable exception) throws FlamingockException { +private FlamingockException processAndGetFlamingockException(Throwable exception) throws FlamingockException { FlamingockException flamingockException; if (exception instanceof OperationException) { OperationException pipelineException = (OperationException) exception; diff --git a/core/flamingock-core/src/main/java/io/flamingock/internal/core/operation/result/StageResultBuilder.java b/core/flamingock-core/src/main/java/io/flamingock/internal/core/operation/result/StageResultBuilder.java index 197577c35..fe50bdcad 100644 --- a/core/flamingock-core/src/main/java/io/flamingock/internal/core/operation/result/StageResultBuilder.java +++ b/core/flamingock-core/src/main/java/io/flamingock/internal/core/operation/result/StageResultBuilder.java @@ -16,8 +16,9 @@ package io.flamingock.internal.core.operation.result; import io.flamingock.internal.common.core.response.data.ChangeResult; +import io.flamingock.internal.common.core.response.data.ErrorInfo; import io.flamingock.internal.common.core.response.data.StageResult; -import io.flamingock.internal.common.core.response.data.StageStatus; +import io.flamingock.internal.common.core.response.data.StageState; import java.time.Duration; import java.time.LocalDateTime; @@ -31,7 +32,7 @@ public class StageResultBuilder { private String stageId; private String stageName; - private StageStatus status; + private StageState state; private long durationMs; private List changes = new ArrayList<>(); private LocalDateTime startTime; @@ -61,28 +62,28 @@ public StageResultBuilder stopTimer() { return this; } - public StageResultBuilder status(StageStatus status) { - this.status = status; + public StageResultBuilder state(StageState state) { + this.state = state; return this; } public StageResultBuilder completed() { - this.status = StageStatus.COMPLETED; + this.state = StageState.COMPLETED; return this; } public StageResultBuilder failed() { - this.status = StageStatus.FAILED; + this.state = StageState.failed(null); return this; } - public StageResultBuilder skipped() { - this.status = StageStatus.SKIPPED; + public StageResultBuilder failed(ErrorInfo errorInfo) { + this.state = StageState.failed(errorInfo); return this; } public StageResultBuilder notStarted() { - this.status = StageStatus.NOT_STARTED; + this.state = StageState.NOT_STARTED; return this; } @@ -109,7 +110,7 @@ public StageResult build() { return StageResult.builder() .stageId(stageId) .stageName(stageName) - .status(status) + .state(state) .durationMs(durationMs) .changes(changes) .build(); diff --git a/core/flamingock-core/src/main/java/io/flamingock/internal/core/pipeline/run/PipelineRun.java b/core/flamingock-core/src/main/java/io/flamingock/internal/core/pipeline/run/PipelineRun.java index 49d2fc08c..e161c5a9b 100644 --- a/core/flamingock-core/src/main/java/io/flamingock/internal/core/pipeline/run/PipelineRun.java +++ b/core/flamingock-core/src/main/java/io/flamingock/internal/core/pipeline/run/PipelineRun.java @@ -15,13 +15,16 @@ */ package io.flamingock.internal.core.pipeline.run; +import io.flamingock.internal.common.core.recovery.ManualInterventionRequiredException; +import io.flamingock.internal.common.core.recovery.RecoveryIssue; import io.flamingock.internal.common.core.response.data.ChangeResult; import io.flamingock.internal.common.core.response.data.ChangeStatus; import io.flamingock.internal.common.core.response.data.ErrorInfo; import io.flamingock.internal.common.core.response.data.ExecuteResponseData; import io.flamingock.internal.common.core.response.data.ExecutionStatus; import io.flamingock.internal.common.core.response.data.StageResult; -import io.flamingock.internal.common.core.response.data.StageStatus; +import io.flamingock.internal.common.core.response.data.StageState; +import io.flamingock.internal.core.change.loaded.AbstractLoadedChange; import io.flamingock.internal.core.pipeline.execution.StageExecutionException; import io.flamingock.internal.core.pipeline.loaded.LoadedPipeline; import io.flamingock.internal.core.pipeline.loaded.stage.AbstractLoadedStage; @@ -33,6 +36,7 @@ import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.stream.Collectors; public class PipelineRun { @@ -56,6 +60,7 @@ public static PipelineRun of(List stages) { private final Map byName; private Instant startedAt; private Instant stoppedAt; + private ErrorInfo pipelineError; private PipelineRun(List stageRuns) { this.stageRuns = Collections.unmodifiableList(stageRuns); @@ -89,25 +94,90 @@ public void stop() { } public void markStageStarted(String stageName) { - lookupOrThrow(stageName).setState(StageState.STARTED); + StageRun run = lookupOrThrow(stageName); + run.setResult(StageResult.builder(run.getResult()) + .state(StageState.STARTED) + .build()); } public void markStageCompleted(String stageName, StageResult result) { - StageRun run = lookupOrThrow(stageName); - run.setState(StageState.COMPLETED); - run.setResult(result); + // Incoming result from the executor already carries state=COMPLETED. + lookupOrThrow(stageName).setResult(result); } public void markStageFailed(String stageName, StageExecutionException exception) { - StageResult result = exception.getResult(); + StageResult resultFromExecutor = exception.getResult(); ErrorInfo errorInfo = ErrorInfo.fromThrowable( exception.getCause(), - exception.getFailedChangeId(), - result.getStageId() + Collections.singletonList(exception.getFailedChangeId()), + resultFromExecutor.getStageId() ); StageRun run = lookupOrThrow(stageName); - run.setState(StageState.failed(errorInfo)); - run.setResult(result); + run.setResult(StageResult.builder(resultFromExecutor) + .state(StageState.failed(errorInfo)) + .build()); + } + + /** + * Stage-scope failure for non-{@link StageExecutionException} Throwables (no enriched + * {@link StageResult} from the executor; we rebuild the existing one with a Failed state). + */ + public void markStageFailed(String stageName, Throwable cause) { + StageRun run = lookupOrThrow(stageName); + ErrorInfo errorInfo = ErrorInfo.fromThrowable(cause, Collections.emptyList(), stageName); + run.setResult(StageResult.builder(run.getResult()) + .state(StageState.failed(errorInfo)) + .build()); + } + + /** + * Attributes a {@link ManualInterventionRequiredException} to its owning stages. Each + * {@link RecoveryIssue} is resolved to the stage that owns the change, and that stage's + * state becomes {@link StageState#blockedManualIntervention(String, List)} with the subset + * of issues that affect it. + * + * @throws IllegalStateException if any issue's changeId can't be resolved to a loaded + * stage (signals a planner/loaded-stage inconsistency). + */ + public void markStagesBlockedFromMI(ManualInterventionRequiredException exception) { + Map> issuesByStage = new LinkedHashMap<>(); + for (RecoveryIssue issue : exception.getConflictingChanges()) { + String stageName = findStageForChange(issue.getChangeId()); + issuesByStage.computeIfAbsent(stageName, k -> new ArrayList<>()).add(issue); + } + for (Map.Entry> entry : issuesByStage.entrySet()) { + StageRun run = lookupOrThrow(entry.getKey()); + run.setResult(StageResult.builder(run.getResult()) + .state(StageState.blockedManualIntervention(entry.getKey(), entry.getValue())) + .build()); + } + } + + private String findStageForChange(String changeId) { + for (StageRun stageRun : stageRuns) { + for (AbstractLoadedChange change : stageRun.getLoadedStage().getChanges()) { + if (changeId.equals(change.getId())) { + return stageRun.getName(); + } + } + } + throw new IllegalStateException( + "Cannot attribute manual-intervention issue: changeId '" + changeId + + "' is not part of any loaded stage in this run. " + + "This indicates a planner/loaded-stage inconsistency."); + } + + /** + * Pipeline-wide failure marker. Idempotent: first call wins. + */ + public void markPipelineFailed(Throwable cause) { + if (this.pipelineError == null) { + this.pipelineError = ErrorInfo.fromThrowable(cause, Collections.emptyList(), null); + } + } + + public Optional getPipelineError() { + return Optional.ofNullable(pipelineError); } private StageRun lookupOrThrow(String stageName) { @@ -131,17 +201,19 @@ public ExecuteResponseData toResponse() { boolean anyFailed = false; for (StageRun stageRun : stageRuns) { - StageResult stageResult = stageRun.getResult().orElse(null); - if (stageResult == null) { - continue; - } + StageResult stageResult = stageRun.getResult(); stages.add(stageResult); totalStages++; - if (stageResult.getStatus() == StageStatus.COMPLETED) { + StageState state = stageResult.getState(); + if (state.isCompleted()) { completedStages++; - } else if (stageResult.getStatus() == StageStatus.FAILED) { + } else if (state.isFailed()) { failedStages++; + anyFailed = true; + if (error == null) { + error = state.getErrorInfo().orElse(null); + } } if (stageResult.getChanges() != null) { @@ -156,16 +228,13 @@ public ExecuteResponseData toResponse() { } } } - - if (stageRun.getState().isFailed()) { - anyFailed = true; - if (error == null) { - error = stageRun.getState().getErrorInfo().orElse(null); - } - } } - ExecutionStatus status = anyFailed ? ExecutionStatus.FAILED : ExecutionStatus.SUCCESS; + if (error == null) { + error = this.pipelineError; + } + boolean pipelineFailed = anyFailed || this.pipelineError != null; + ExecutionStatus status = pipelineFailed ? ExecutionStatus.FAILED : ExecutionStatus.SUCCESS; long durationMs = (startedAt != null && stoppedAt != null) ? Duration.between(startedAt, stoppedAt).toMillis() : 0L; diff --git a/core/flamingock-core/src/main/java/io/flamingock/internal/core/pipeline/run/StageRun.java b/core/flamingock-core/src/main/java/io/flamingock/internal/core/pipeline/run/StageRun.java index 5566070f5..a196dcc5e 100644 --- a/core/flamingock-core/src/main/java/io/flamingock/internal/core/pipeline/run/StageRun.java +++ b/core/flamingock-core/src/main/java/io/flamingock/internal/core/pipeline/run/StageRun.java @@ -16,18 +16,21 @@ package io.flamingock.internal.core.pipeline.run; import io.flamingock.internal.common.core.response.data.StageResult; +import io.flamingock.internal.common.core.response.data.StageState; import io.flamingock.internal.core.pipeline.loaded.stage.AbstractLoadedStage; -import java.util.Optional; - public class StageRun { private final AbstractLoadedStage loadedStage; - private StageState state = StageState.NOT_STARTED; private StageResult result; public StageRun(AbstractLoadedStage loadedStage) { this.loadedStage = loadedStage; + this.result = StageResult.builder() + .stageId(loadedStage.getName()) + .stageName(loadedStage.getName()) + .state(StageState.NOT_STARTED) + .build(); } public String getName() { @@ -38,16 +41,12 @@ public AbstractLoadedStage getLoadedStage() { return loadedStage; } - public StageState getState() { - return state; + public StageResult getResult() { + return result; } - public Optional getResult() { - return Optional.ofNullable(result); - } - - void setState(StageState state) { - this.state = state; + public StageState getState() { + return result.getState(); } void setResult(StageResult result) { diff --git a/core/flamingock-core/src/main/java/io/flamingock/internal/core/pipeline/run/StageState.java b/core/flamingock-core/src/main/java/io/flamingock/internal/core/pipeline/run/StageState.java deleted file mode 100644 index f2178cd41..000000000 --- a/core/flamingock-core/src/main/java/io/flamingock/internal/core/pipeline/run/StageState.java +++ /dev/null @@ -1,69 +0,0 @@ -/* - * Copyright 2026 Flamingock (https://www.flamingock.io) - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.flamingock.internal.core.pipeline.run; - -import io.flamingock.internal.common.core.response.data.ErrorInfo; - -import java.util.Optional; - -public abstract class StageState { - - public static final StageState NOT_STARTED = new NotStarted(); - public static final StageState STARTED = new Started(); - public static final StageState COMPLETED = new Completed(); - - public static StageState failed(ErrorInfo info) { - return new Failed(info); - } - - private StageState() { - } - - public boolean isFailed() { - return false; - } - - public Optional getErrorInfo() { - return Optional.empty(); - } - - private static final class NotStarted extends StageState { - } - - private static final class Started extends StageState { - } - - private static final class Completed extends StageState { - } - - private static final class Failed extends StageState { - private final ErrorInfo info; - - Failed(ErrorInfo info) { - this.info = info; - } - - @Override - public boolean isFailed() { - return true; - } - - @Override - public Optional getErrorInfo() { - return Optional.of(info); - } - } -} diff --git a/core/flamingock-core/src/test/java/io/flamingock/internal/core/operation/AbstractPipelineTraverseOperationTest.java b/core/flamingock-core/src/test/java/io/flamingock/internal/core/operation/AbstractPipelineTraverseOperationTest.java index 6770a4547..464a39934 100644 --- a/core/flamingock-core/src/test/java/io/flamingock/internal/core/operation/AbstractPipelineTraverseOperationTest.java +++ b/core/flamingock-core/src/test/java/io/flamingock/internal/core/operation/AbstractPipelineTraverseOperationTest.java @@ -18,6 +18,7 @@ import io.flamingock.internal.common.core.recovery.ManualInterventionRequiredException; import io.flamingock.internal.common.core.recovery.action.ChangeAction; import io.flamingock.internal.core.change.executable.ExecutableChange; +import io.flamingock.internal.core.change.loaded.AbstractLoadedChange; import io.flamingock.internal.core.event.EventPublisher; import io.flamingock.internal.core.operation.execute.ExecuteApplyOperation; import io.flamingock.internal.core.operation.execute.ExecuteArgs; @@ -51,7 +52,7 @@ void shouldThrowManualInterventionWhenPlannerReturnsAbort() { ExecutionPlanner planner = mock(ExecutionPlanner.class); when(planner.getNextExecution(any(PipelineRun.class))).thenReturn(abortPlan); - LoadedPipeline pipeline = mockPipeline(); + LoadedPipeline pipeline = mockPipeline("change-1"); ExecuteApplyOperation operation = buildOperation(planner); ManualInterventionRequiredException ex = assertThrows( @@ -95,9 +96,19 @@ private static ExecuteApplyOperation buildOperation(ExecutionPlanner planner, St ); } - private static LoadedPipeline mockPipeline() { + private static LoadedPipeline mockPipeline(String... loadedChangeIds) { + AbstractLoadedStage loadedStage = mock(AbstractLoadedStage.class); + when(loadedStage.getName()).thenReturn("stage-1"); + java.util.List loadedChanges = new java.util.ArrayList<>(); + for (String id : loadedChangeIds) { + AbstractLoadedChange ch = mock(AbstractLoadedChange.class); + when(ch.getId()).thenReturn(id); + loadedChanges.add(ch); + } + when(loadedStage.getChanges()).thenReturn(loadedChanges); + LoadedPipeline pipeline = mock(LoadedPipeline.class); - when(pipeline.getStages()).thenReturn(Collections.singletonList(mock(AbstractLoadedStage.class))); + when(pipeline.getStages()).thenReturn(Collections.singletonList(loadedStage)); when(pipeline.getSystemStage()).thenReturn(java.util.Optional.empty()); doNothing().when(pipeline).validate(); return pipeline; diff --git a/core/flamingock-core/src/test/java/io/flamingock/internal/core/operation/ExecuteApplyOperationTest.java b/core/flamingock-core/src/test/java/io/flamingock/internal/core/operation/ExecuteApplyOperationTest.java index bef3b7ccb..aea58b6c8 100644 --- a/core/flamingock-core/src/test/java/io/flamingock/internal/core/operation/ExecuteApplyOperationTest.java +++ b/core/flamingock-core/src/test/java/io/flamingock/internal/core/operation/ExecuteApplyOperationTest.java @@ -19,7 +19,7 @@ import io.flamingock.internal.common.core.response.data.ChangeStatus; import io.flamingock.internal.common.core.response.data.ExecutionStatus; import io.flamingock.internal.common.core.response.data.StageResult; -import io.flamingock.internal.common.core.response.data.StageStatus; +import io.flamingock.internal.common.core.response.data.StageState; import io.flamingock.internal.core.event.EventPublisher; import io.flamingock.internal.core.operation.execute.ExecuteApplyOperation; import io.flamingock.internal.core.operation.execute.ExecuteArgs; @@ -173,7 +173,7 @@ private StageResult createSuccessStageResult(String stageId, int applied, int sk StageResult.Builder builder = StageResult.builder() .stageId(stageId) .stageName(stageId) - .status(StageStatus.COMPLETED) + .state(StageState.COMPLETED) .durationMs(100); for (int i = 0; i < applied; i++) { @@ -198,7 +198,7 @@ private StageResult createFailedStageResult(String stageId, String failedChangeI return StageResult.builder() .stageId(stageId) .stageName(stageId) - .status(StageStatus.FAILED) + .state(StageState.failed(null)) .durationMs(50) .addChange(ChangeResult.builder() .changeId(failedChangeId) diff --git a/core/flamingock-core/src/test/java/io/flamingock/internal/core/pipeline/run/PipelineRunTest.java b/core/flamingock-core/src/test/java/io/flamingock/internal/core/pipeline/run/PipelineRunTest.java index f1d3fed65..6c0fb3c81 100644 --- a/core/flamingock-core/src/test/java/io/flamingock/internal/core/pipeline/run/PipelineRunTest.java +++ b/core/flamingock-core/src/test/java/io/flamingock/internal/core/pipeline/run/PipelineRunTest.java @@ -15,9 +15,12 @@ */ package io.flamingock.internal.core.pipeline.run; +import io.flamingock.internal.common.core.recovery.ManualInterventionRequiredException; +import io.flamingock.internal.common.core.recovery.RecoveryIssue; import io.flamingock.internal.common.core.response.data.ErrorInfo; import io.flamingock.internal.common.core.response.data.StageResult; -import io.flamingock.internal.common.core.response.data.StageStatus; +import io.flamingock.internal.common.core.response.data.StageState; +import io.flamingock.internal.core.change.loaded.AbstractLoadedChange; import io.flamingock.internal.core.pipeline.execution.StageExecutionException; import io.flamingock.internal.core.pipeline.loaded.stage.AbstractLoadedStage; import org.junit.jupiter.api.Test; @@ -26,6 +29,7 @@ import java.util.List; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertSame; import static org.junit.jupiter.api.Assertions.assertThrows; @@ -68,7 +72,7 @@ void markStageFailedDerivesErrorInfoFromException() { StageResult stageResult = StageResult.builder() .stageId("alpha") .stageName("alpha") - .status(StageStatus.FAILED) + .state(StageState.failed(null)) .build(); RuntimeException cause = new RuntimeException("boom"); StageExecutionException exception = StageExecutionException.fromResult(cause, stageResult, "change-1"); @@ -79,7 +83,7 @@ void markStageFailedDerivesErrorInfoFromException() { StageState state = pipelineRun.getStageRun("alpha").getState(); assertTrue(state.isFailed()); ErrorInfo info = state.getErrorInfo().get(); - assertEquals("change-1", info.getChangeId()); + assertEquals(java.util.Collections.singletonList("change-1"), info.getChangeIds()); assertEquals("alpha", info.getStageId()); } @@ -101,6 +105,93 @@ void getStageRunReturnsNullForUnknownStage() { assertNull(pipelineRun.getStageRun("missing")); } + @Test + void markStageFailedWithThrowableSynthesisesErrorInfo() { + AbstractLoadedStage a = mockStage("alpha"); + PipelineRun pipelineRun = PipelineRun.of(java.util.Collections.singletonList(a)); + + RuntimeException cause = new RuntimeException("boom"); + pipelineRun.markStageFailed("alpha", cause); + + StageState state = pipelineRun.getStageRun("alpha").getState(); + assertTrue(state.isFailed()); + ErrorInfo info = state.getErrorInfo().get(); + assertEquals("alpha", info.getStageId()); + assertEquals("RuntimeException", info.getErrorType()); + assertEquals("boom", info.getMessage()); + } + + @Test + void markPipelineFailedIsIdempotentFirstWins() { + AbstractLoadedStage a = mockStage("alpha"); + PipelineRun pipelineRun = PipelineRun.of(java.util.Collections.singletonList(a)); + + pipelineRun.markPipelineFailed(new RuntimeException("first")); + pipelineRun.markPipelineFailed(new IllegalStateException("second")); + + ErrorInfo info = pipelineRun.getPipelineError().get(); + assertEquals("RuntimeException", info.getErrorType()); + assertEquals("first", info.getMessage()); + } + + @Test + void getPipelineErrorIsEmptyUntilMarked() { + AbstractLoadedStage a = mockStage("alpha"); + PipelineRun pipelineRun = PipelineRun.of(java.util.Collections.singletonList(a)); + + assertFalse(pipelineRun.getPipelineError().isPresent()); + } + + @Test + void markStagesBlockedFromMIGroupsIssuesByOwningStage() { + AbstractLoadedChange alphaChange1 = mockChange("alpha-c1"); + AbstractLoadedChange alphaChange2 = mockChange("alpha-c2"); + AbstractLoadedChange betaChange = mockChange("beta-c1"); + + AbstractLoadedStage alpha = mockStageWithChanges("alpha", alphaChange1, alphaChange2); + AbstractLoadedStage beta = mockStageWithChanges("beta", betaChange); + AbstractLoadedStage gamma = mockStageWithChanges("gamma"); // no MI'd changes + + PipelineRun pipelineRun = PipelineRun.of(Arrays.asList(alpha, beta, gamma)); + + List issues = Arrays.asList( + new RecoveryIssue("alpha-c1"), + new RecoveryIssue("alpha-c2"), + new RecoveryIssue("beta-c1")); + ManualInterventionRequiredException miEx = new ManualInterventionRequiredException(issues, "alpha"); + + pipelineRun.markStagesBlockedFromMI(miEx); + + StageState alphaState = pipelineRun.getStageRun("alpha").getState(); + assertTrue(alphaState.isBlockedForManualIntervention()); + assertEquals(2, alphaState.getRecoveryIssues().size()); + + StageState betaState = pipelineRun.getStageRun("beta").getState(); + assertTrue(betaState.isBlockedForManualIntervention()); + assertEquals(1, betaState.getRecoveryIssues().size()); + assertEquals("beta-c1", betaState.getRecoveryIssues().get(0).getChangeId()); + + // No MI'd changes for gamma → stays NOT_STARTED. + assertSame(StageState.NOT_STARTED, pipelineRun.getStageRun("gamma").getState()); + } + + @Test + void markStagesBlockedFromMIThrowsForUnknownChange() { + AbstractLoadedStage alpha = mockStageWithChanges("alpha", mockChange("alpha-c1")); + PipelineRun pipelineRun = PipelineRun.of(java.util.Collections.singletonList(alpha)); + + ManualInterventionRequiredException miEx = new ManualInterventionRequiredException( + Arrays.asList(new RecoveryIssue("ghost-change")), "alpha"); + + IllegalStateException ex = assertThrows( + IllegalStateException.class, + () -> pipelineRun.markStagesBlockedFromMI(miEx)); + assertTrue(ex.getMessage().contains("ghost-change")); + + // No partial state — the alpha stage stays NOT_STARTED. + assertSame(StageState.NOT_STARTED, pipelineRun.getStageRun("alpha").getState()); + } + @Test void getLoadedStagesReturnsTheUnderlyingStagesInOrder() { AbstractLoadedStage a = mockStage("alpha"); @@ -119,4 +210,17 @@ private static AbstractLoadedStage mockStage(String name) { when(stage.getName()).thenReturn(name); return stage; } + + private static AbstractLoadedStage mockStageWithChanges(String name, AbstractLoadedChange... changes) { + AbstractLoadedStage stage = mock(AbstractLoadedStage.class); + when(stage.getName()).thenReturn(name); + when(stage.getChanges()).thenReturn(Arrays.asList(changes)); + return stage; + } + + private static AbstractLoadedChange mockChange(String id) { + AbstractLoadedChange change = mock(AbstractLoadedChange.class); + when(change.getId()).thenReturn(id); + return change; + } } diff --git a/core/flamingock-core/src/test/java/io/flamingock/internal/core/pipeline/run/PipelineRunToResponseTest.java b/core/flamingock-core/src/test/java/io/flamingock/internal/core/pipeline/run/PipelineRunToResponseTest.java index 352a790a4..26d7e2f71 100644 --- a/core/flamingock-core/src/test/java/io/flamingock/internal/core/pipeline/run/PipelineRunToResponseTest.java +++ b/core/flamingock-core/src/test/java/io/flamingock/internal/core/pipeline/run/PipelineRunToResponseTest.java @@ -15,13 +15,16 @@ */ package io.flamingock.internal.core.pipeline.run; +import io.flamingock.internal.common.core.recovery.ManualInterventionRequiredException; +import io.flamingock.internal.common.core.recovery.RecoveryIssue; import io.flamingock.internal.common.core.response.data.ChangeResult; import io.flamingock.internal.common.core.response.data.ChangeStatus; import io.flamingock.internal.common.core.response.data.ErrorInfo; import io.flamingock.internal.common.core.response.data.ExecuteResponseData; import io.flamingock.internal.common.core.response.data.ExecutionStatus; import io.flamingock.internal.common.core.response.data.StageResult; -import io.flamingock.internal.common.core.response.data.StageStatus; +import io.flamingock.internal.common.core.response.data.StageState; +import io.flamingock.internal.core.change.loaded.AbstractLoadedChange; import io.flamingock.internal.core.pipeline.execution.StageExecutionException; import io.flamingock.internal.core.pipeline.loaded.stage.AbstractLoadedStage; import org.junit.jupiter.api.Test; @@ -31,6 +34,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -103,28 +107,105 @@ void oneCompletedAndOneFailedYieldsFailedAndPipelineErrorMatchesStage() { assertEquals(1, response.getAppliedChanges()); assertEquals(1, response.getFailedChanges()); ErrorInfo error = response.getError(); - assertEquals("change-b1", error.getChangeId()); + assertEquals(java.util.Collections.singletonList("change-b1"), error.getChangeIds()); assertEquals("beta", error.getStageId()); assertEquals("RuntimeException", error.getErrorType()); assertEquals("boom", error.getMessage()); } @Test - void stagesNeverReachingTerminalStateAreNotReported() { + void pipelineFailedWithNoStageFailuresYieldsFailedAndPipelineError() { + AbstractLoadedStage stageA = mockStage("alpha"); + PipelineRun pipelineRun = PipelineRun.of(java.util.Collections.singletonList(stageA)); + + pipelineRun.start(); + pipelineRun.markPipelineFailed(new RuntimeException("lock not acquired")); + pipelineRun.stop(); + + ExecuteResponseData response = pipelineRun.toResponse(); + + assertEquals(ExecutionStatus.FAILED, response.getStatus()); + // Stage was NOT_STARTED — it appears in response.stages with state=NOT_STARTED, but + // it doesn't count toward failedStages/completedStages. + assertEquals(1, response.getTotalStages()); + assertEquals(0, response.getCompletedStages()); + assertEquals(0, response.getFailedStages()); + assertTrue(response.getStages().get(0).getState().isNotStarted()); + ErrorInfo error = response.getError(); + assertEquals("RuntimeException", error.getErrorType()); + assertEquals("lock not acquired", error.getMessage()); + } + + @Test + void stageErrorTakesPrecedenceOverPipelineError() { + AbstractLoadedStage stageA = mockStage("alpha"); + PipelineRun pipelineRun = PipelineRun.of(java.util.Collections.singletonList(stageA)); + + RuntimeException stageCause = new RuntimeException("stage boom"); + StageExecutionException stageException = StageExecutionException.fromResult( + stageCause, failedStageResult("alpha"), "change-1"); + + pipelineRun.start(); + pipelineRun.markStageFailed("alpha", stageException); + pipelineRun.markPipelineFailed(new IllegalStateException("late pipeline error")); + pipelineRun.stop(); + + ExecuteResponseData response = pipelineRun.toResponse(); + + assertEquals(ExecutionStatus.FAILED, response.getStatus()); + ErrorInfo error = response.getError(); + // Stage-level error wins over pipeline-level + assertEquals("RuntimeException", error.getErrorType()); + assertEquals("stage boom", error.getMessage()); + assertEquals("alpha", error.getStageId()); + } + + @Test + void blockedForMIStageIsTrackedInResponseWithBlockedState() { + AbstractLoadedChange c1 = mockChange("c1"); + AbstractLoadedStage alpha = mockStageWithChanges("alpha", c1); + PipelineRun pipelineRun = PipelineRun.of(java.util.Collections.singletonList(alpha)); + + pipelineRun.start(); + ManualInterventionRequiredException miEx = new ManualInterventionRequiredException( + java.util.Arrays.asList(new RecoveryIssue("c1")), "alpha"); + pipelineRun.markStagesBlockedFromMI(miEx); + pipelineRun.stop(); + + ExecuteResponseData response = pipelineRun.toResponse(); + + // Blocked stage appears in response.stages with its BlockedForMI state (carrying the + // RecoveryIssue list). It also contributes to failedStages because BlockedForMI.isFailed() + // returns true (preserves today's FAILED pipeline status). + assertEquals(1, response.getTotalStages()); + assertEquals(1, response.getStages().size()); + assertEquals(1, response.getFailedStages()); + assertTrue(response.getStages().get(0).getState().isBlockedForManualIntervention()); + assertEquals(1, response.getStages().get(0).getState().getRecoveryIssues().size()); + // Pipeline-level failure surfaces via pipelineError as today. + assertEquals(ExecutionStatus.FAILED, response.getStatus()); + assertNotNull(response.getError()); + } + + @Test + void notStartedStagesAreReportedInResponseWithNotStartedState() { AbstractLoadedStage stageA = mockStage("alpha"); AbstractLoadedStage stageB = mockStage("beta"); PipelineRun pipelineRun = PipelineRun.of(Arrays.asList(stageA, stageB)); pipelineRun.start(); pipelineRun.markStageCompleted("alpha", completedStageResult("alpha", 1, 0)); - // beta never advances past NOT_STARTED — no StageResult attached + // beta never advances past NOT_STARTED — still appears in the response. pipelineRun.stop(); ExecuteResponseData response = pipelineRun.toResponse(); - assertEquals(1, response.getTotalStages()); + assertEquals(2, response.getTotalStages()); assertEquals(1, response.getCompletedStages()); - assertEquals(1, response.getStages().size()); + assertEquals(0, response.getFailedStages()); + assertEquals(2, response.getStages().size()); + assertTrue(response.getStages().get(0).getState().isCompleted()); + assertTrue(response.getStages().get(1).getState().isNotStarted()); } private static AbstractLoadedStage mockStage(String name) { @@ -133,11 +214,24 @@ private static AbstractLoadedStage mockStage(String name) { return stage; } + private static AbstractLoadedStage mockStageWithChanges(String name, AbstractLoadedChange... changes) { + AbstractLoadedStage stage = mock(AbstractLoadedStage.class); + when(stage.getName()).thenReturn(name); + when(stage.getChanges()).thenReturn(Arrays.asList(changes)); + return stage; + } + + private static AbstractLoadedChange mockChange(String id) { + AbstractLoadedChange change = mock(AbstractLoadedChange.class); + when(change.getId()).thenReturn(id); + return change; + } + private static StageResult completedStageResult(String name, int applied, int skipped) { StageResult.Builder builder = StageResult.builder() .stageId(name) .stageName(name) - .status(StageStatus.COMPLETED); + .state(StageState.COMPLETED); for (int i = 0; i < applied; i++) { builder.addChange(ChangeResult.builder().changeId(name + "-a" + i).status(ChangeStatus.APPLIED).build()); } @@ -151,7 +245,7 @@ private static StageResult failedStageResult(String name) { return StageResult.builder() .stageId(name) .stageName(name) - .status(StageStatus.FAILED) + .state(StageState.failed(null)) .addChange(ChangeResult.builder().changeId(name + "-f0").status(ChangeStatus.FAILED).build()) .build(); } diff --git a/core/flamingock-core/src/test/java/io/flamingock/internal/core/pipeline/run/StageStateTest.java b/core/flamingock-core/src/test/java/io/flamingock/internal/core/pipeline/run/StageStateTest.java index dd70b6cd1..62e5b4919 100644 --- a/core/flamingock-core/src/test/java/io/flamingock/internal/core/pipeline/run/StageStateTest.java +++ b/core/flamingock-core/src/test/java/io/flamingock/internal/core/pipeline/run/StageStateTest.java @@ -15,9 +15,15 @@ */ package io.flamingock.internal.core.pipeline.run; +import io.flamingock.internal.common.core.recovery.RecoveryIssue; import io.flamingock.internal.common.core.response.data.ErrorInfo; +import io.flamingock.internal.common.core.response.data.StageState; import org.junit.jupiter.api.Test; +import java.util.Arrays; +import java.util.List; + +import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertSame; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -44,11 +50,43 @@ void completedIsNotFailedAndHasNoErrorInfo() { @Test void failedCarriesItsErrorInfo() { - ErrorInfo info = new ErrorInfo("RuntimeException", "boom", "change-1", "stage-1"); + ErrorInfo info = new ErrorInfo("RuntimeException", "boom", java.util.Collections.singletonList("change-1"), "stage-1"); StageState state = StageState.failed(info); assertTrue(state.isFailed()); assertTrue(state.getErrorInfo().isPresent()); assertSame(info, state.getErrorInfo().get()); } + + @Test + void blockedForMICarriesSynthesizedErrorInfoAndRecoveryIssues() { + List issues = Arrays.asList(new RecoveryIssue("c1"), new RecoveryIssue("c2")); + StageState state = StageState.blockedManualIntervention("stage-1", issues); + + assertTrue(state.isBlockedForManualIntervention()); + assertTrue(state.isFailed()); // inherited from Failed + + // ErrorInfo is now synthesized at construction (inherited surface from Failed). + assertTrue(state.getErrorInfo().isPresent()); + ErrorInfo info = state.getErrorInfo().get(); + assertEquals("MANUAL_INTERVENTION_REQUIRED", info.getErrorType()); + assertEquals("stage-1", info.getStageId()); + // Change IDs are now structured in the ErrorInfo.changeIds list. + assertEquals(Arrays.asList("c1", "c2"), info.getChangeIds()); + + // Recovery issues are still available for callers that want the richer detail + // (today equivalent to changeIds, but the type carries more shape for future evolution). + assertEquals(2, state.getRecoveryIssues().size()); + assertEquals("c1", state.getRecoveryIssues().get(0).getChangeId()); + assertEquals("c2", state.getRecoveryIssues().get(1).getChangeId()); + } + + @Test + void nonBlockedStatesAreNotBlockedForMI() { + assertFalse(StageState.NOT_STARTED.isBlockedForManualIntervention()); + assertFalse(StageState.STARTED.isBlockedForManualIntervention()); + assertFalse(StageState.COMPLETED.isBlockedForManualIntervention()); + assertFalse(StageState.failed(new ErrorInfo("E", "m", java.util.Collections.singletonList("c"), "s")) + .isBlockedForManualIntervention()); + } }