From 0f98d35d9dbdf2f15a835f27119f2fe90d6532d0 Mon Sep 17 00:00:00 2001 From: Antonio Perez Dieppa Date: Tue, 12 May 2026 15:39:41 +0100 Subject: [PATCH] refactor: introduce PipelineRun aggregate for runner-planner protocol (#904) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Replace the (List, ExecutionResultBuilder) pair carried through the runner with a single mutable aggregate (PipelineRun) that owns in-flight state and produces the ExecuteResponseData. - New `internal.core.pipeline.run`: **PipelineRun**, **StageRun**, **StageState** (polymorphic; Failed carries ErrorInfo). - API change: `ExecutionPlanner.getNextExecution(List)` → `getNextExecution(PipelineRun)`. CommunityExecutionPlanner and CloudExecutionPlanner migrated. - Intent-named mutators: `markStageStarted`, `markStageCompleted(result)`, `markStageFailed(exception)` — aggregate derives ErrorInfo from StageExecutionException. - Removed **ExecutablePipeline** (trivial wrapper) and **ExecutionResultBuilder**; ExecutionPlan holds `List` directly; `PipelineRun.toResponse()` builds the report. No observable behavior change: runner still fail-fasts on StageExecutionException; response shape and event firing points are identical. --- .../cloud/planner/CloudExecutionPlanner.java | 4 +- .../planner/CloudExecutionPlannerTest.java | 17 +- .../AbstractPipelineTraverseOperation.java | 55 ++--- .../result/ExecutionResultBuilder.java | 165 --------------- .../execution/ExecutablePipeline.java | 40 ---- .../core/pipeline/run/PipelineRun.java | 189 ++++++++++++++++++ .../internal/core/pipeline/run/StageRun.java | 56 ++++++ .../core/pipeline/run/StageState.java | 69 +++++++ .../internal/core/plan/ExecutionPlan.java | 16 +- .../internal/core/plan/ExecutionPlanner.java | 6 +- .../community/CommunityExecutionPlanner.java | 10 +- ...AbstractPipelineTraverseOperationTest.java | 7 +- .../operation/ExecuteApplyOperationTest.java | 1 + .../operation/ValidateApplyOperationTest.java | 10 +- .../core/pipeline/run/PipelineRunTest.java | 122 +++++++++++ .../run/PipelineRunToResponseTest.java | 158 +++++++++++++++ .../core/pipeline/run/StageStateTest.java | 54 +++++ .../CommunityExecutionPlannerTest.java | 7 +- 18 files changed, 721 insertions(+), 265 deletions(-) delete mode 100644 core/flamingock-core/src/main/java/io/flamingock/internal/core/operation/result/ExecutionResultBuilder.java delete mode 100644 core/flamingock-core/src/main/java/io/flamingock/internal/core/pipeline/execution/ExecutablePipeline.java create mode 100644 core/flamingock-core/src/main/java/io/flamingock/internal/core/pipeline/run/PipelineRun.java create mode 100644 core/flamingock-core/src/main/java/io/flamingock/internal/core/pipeline/run/StageRun.java create mode 100644 core/flamingock-core/src/main/java/io/flamingock/internal/core/pipeline/run/StageState.java create mode 100644 core/flamingock-core/src/test/java/io/flamingock/internal/core/pipeline/run/PipelineRunTest.java create mode 100644 core/flamingock-core/src/test/java/io/flamingock/internal/core/pipeline/run/PipelineRunToResponseTest.java create mode 100644 core/flamingock-core/src/test/java/io/flamingock/internal/core/pipeline/run/StageStateTest.java diff --git a/cloud/flamingock-cloud/src/main/java/io/flamingock/cloud/planner/CloudExecutionPlanner.java b/cloud/flamingock-cloud/src/main/java/io/flamingock/cloud/planner/CloudExecutionPlanner.java index 21e18b3d8..3126c8f59 100644 --- a/cloud/flamingock-cloud/src/main/java/io/flamingock/cloud/planner/CloudExecutionPlanner.java +++ b/cloud/flamingock-cloud/src/main/java/io/flamingock/cloud/planner/CloudExecutionPlanner.java @@ -35,6 +35,7 @@ import io.flamingock.internal.core.external.store.lock.LockException; import io.flamingock.internal.core.pipeline.execution.ExecutableStage; import io.flamingock.internal.core.pipeline.loaded.stage.AbstractLoadedStage; +import io.flamingock.internal.core.pipeline.run.PipelineRun; import io.flamingock.internal.util.log.FlamingockLoggerFactory; import org.slf4j.Logger; @@ -78,7 +79,8 @@ public CloudExecutionPlanner(RunnerId runnerId, } @Override - public ExecutionPlan getNextExecution(List loadedStages) throws LockException { + public ExecutionPlan getNextExecution(PipelineRun pipelineRun) throws LockException { + List loadedStages = pipelineRun.getLoadedStages(); AuditMarkSnapshot snapshot = buildAuditMarkSnapshot(); diff --git a/cloud/flamingock-cloud/src/test/java/io/flamingock/cloud/planner/CloudExecutionPlannerTest.java b/cloud/flamingock-cloud/src/test/java/io/flamingock/cloud/planner/CloudExecutionPlannerTest.java index fdbcaecba..4c03dfef6 100644 --- a/cloud/flamingock-cloud/src/test/java/io/flamingock/cloud/planner/CloudExecutionPlannerTest.java +++ b/cloud/flamingock-cloud/src/test/java/io/flamingock/cloud/planner/CloudExecutionPlannerTest.java @@ -36,6 +36,7 @@ import io.flamingock.internal.core.plan.ExecutionPlan; import io.flamingock.internal.core.pipeline.loaded.stage.AbstractLoadedStage; import io.flamingock.internal.core.pipeline.loaded.stage.DefaultLoadedStage; +import io.flamingock.internal.core.pipeline.run.PipelineRun; import io.flamingock.internal.util.TimeService; import io.flamingock.internal.util.id.RunnerId; import io.flamingock.core.cloud.changes._001__CloudChange1; @@ -110,7 +111,7 @@ void shouldReturnAbortPlanWhenServerReturnsAbort() { List stages = Collections.singletonList( new DefaultLoadedStage("stage-1", StageType.DEFAULT, Collections.singletonList(change1))); - ExecutionPlan plan = planner.getNextExecution(stages); + ExecutionPlan plan = planner.getNextExecution(PipelineRun.of(stages)); assertTrue(plan.isAborted()); assertThrows(ManualInterventionRequiredException.class, plan::validate); @@ -131,7 +132,7 @@ void shouldReturnAbortPlanWhenServerReturnsAbortWithNoMIChanges() { List stages = Collections.singletonList( new DefaultLoadedStage("stage-1", StageType.DEFAULT, Collections.singletonList(change1))); - ExecutionPlan plan = planner.getNextExecution(stages); + ExecutionPlan plan = planner.getNextExecution(PipelineRun.of(stages)); assertTrue(plan.isAborted()); assertThrows(io.flamingock.internal.common.core.error.FlamingockException.class, plan::validate); @@ -163,7 +164,7 @@ void shouldIncludeAuditMarksInExecutionRequest() { List stages = Collections.singletonList( new DefaultLoadedStage("stage-1", StageType.DEFAULT, Arrays.asList(change1, change2))); - planner.getNextExecution(stages); + planner.getNextExecution(PipelineRun.of(stages)); ArgumentCaptor requestCaptor = ArgumentCaptor.forClass(ExecutionPlanRequest.class); verify(client).createExecution(requestCaptor.capture(), any(), anyLong()); @@ -191,7 +192,7 @@ void shouldSendNoneStatusWhenNoMarks() { List stages = Collections.singletonList( new DefaultLoadedStage("stage-1", StageType.DEFAULT, Collections.singletonList(change1))); - planner.getNextExecution(stages); + planner.getNextExecution(PipelineRun.of(stages)); ArgumentCaptor requestCaptor = ArgumentCaptor.forClass(ExecutionPlanRequest.class); verify(client).createExecution(requestCaptor.capture(), any(), anyLong()); @@ -221,7 +222,7 @@ void shouldClearMarksWhenResponseHasSynchronizedMarksTrue() { List stages = Collections.singletonList( new DefaultLoadedStage("stage-1", StageType.DEFAULT, Arrays.asList(change1, change2))); - planner.getNextExecution(stages); + planner.getNextExecution(PipelineRun.of(stages)); verify(marker1).clearMark(change1.getId()); verify(marker2).clearMark(change2.getId()); @@ -243,7 +244,7 @@ void shouldNotClearMarksWhenResponseHasSynchronizedMarksFalse() { List stages = Collections.singletonList( new DefaultLoadedStage("stage-1", StageType.DEFAULT, Collections.singletonList(change1))); - planner.getNextExecution(stages); + planner.getNextExecution(PipelineRun.of(stages)); verify(marker1, never()).clearMark(any()); } @@ -264,7 +265,7 @@ void shouldClearMarksRegardlessOfResponseAction() { List stages = Collections.singletonList( new DefaultLoadedStage("stage-1", StageType.DEFAULT, Collections.singletonList(change1))); - planner.getNextExecution(stages); + planner.getNextExecution(PipelineRun.of(stages)); verify(marker1).clearMark(change1.getId()); } @@ -286,7 +287,7 @@ void shouldNotClearNewMarksWrittenAfterRequest() { List stages = Collections.singletonList( new DefaultLoadedStage("stage-1", StageType.DEFAULT, Arrays.asList(change1, change2))); - planner.getNextExecution(stages); + planner.getNextExecution(PipelineRun.of(stages)); // Only change1 should be cleared (was in snapshot), not change2 verify(marker1).clearMark(change1.getId()); diff --git a/core/flamingock-core/src/main/java/io/flamingock/internal/core/operation/AbstractPipelineTraverseOperation.java b/core/flamingock-core/src/main/java/io/flamingock/internal/core/operation/AbstractPipelineTraverseOperation.java index cdec7078a..6ed835bb2 100644 --- a/core/flamingock-core/src/main/java/io/flamingock/internal/core/operation/AbstractPipelineTraverseOperation.java +++ b/core/flamingock-core/src/main/java/io/flamingock/internal/core/operation/AbstractPipelineTraverseOperation.java @@ -18,9 +18,7 @@ import io.flamingock.internal.common.core.error.FlamingockException; import io.flamingock.internal.common.core.recovery.ManualInterventionRequiredException; import io.flamingock.internal.common.core.error.PendingChangesException; -import io.flamingock.internal.common.core.response.data.ErrorInfo; import io.flamingock.internal.common.core.response.data.ExecuteResponseData; -import io.flamingock.internal.common.core.response.data.StageResult; import io.flamingock.internal.core.event.EventPublisher; import io.flamingock.internal.core.event.model.impl.PipelineCompletedEvent; import io.flamingock.internal.core.event.model.impl.PipelineFailedEvent; @@ -30,7 +28,6 @@ import io.flamingock.internal.core.event.model.impl.StageStartedEvent; import io.flamingock.internal.core.operation.execute.ExecuteArgs; import io.flamingock.internal.core.operation.execute.ExecuteResult; -import io.flamingock.internal.core.operation.result.ExecutionResultBuilder; import io.flamingock.internal.core.pipeline.execution.ExecutableStage; import io.flamingock.internal.core.pipeline.execution.ExecutionContext; import io.flamingock.internal.core.pipeline.execution.OrphanExecutionContext; @@ -38,6 +35,7 @@ import io.flamingock.internal.core.pipeline.execution.StageExecutor; import io.flamingock.internal.core.pipeline.loaded.LoadedPipeline; import io.flamingock.internal.core.pipeline.loaded.stage.AbstractLoadedStage; +import io.flamingock.internal.core.pipeline.run.PipelineRun; import io.flamingock.internal.core.plan.ExecutionPlan; import io.flamingock.internal.core.plan.ExecutionPlanner; import io.flamingock.internal.core.external.store.lock.Lock; @@ -96,7 +94,7 @@ public ExecuteResult execute(ExecuteArgs args) { } catch (OperationException operationException) { throw operationException; } catch (Throwable throwable) { - throw processAndGetFlamingockException(throwable, null); + throw processAndGetFlamingockException(throwable); } finally { this.finalizer.run(); } @@ -122,11 +120,13 @@ private ExecuteResponseData execute(LoadedPipeline pipeline) throws FlamingockEx logger.info("Flamingock execution started [stages={} changes={}]", stageCount, changeCount); eventPublisher.publish(new PipelineStartedEvent()); - ExecutionResultBuilder resultBuilder = new ExecutionResultBuilder().startTimer(); + + PipelineRun pipelineRun = PipelineRun.of(pipeline); + pipelineRun.start(); do { - List stages = validateAndGetExecutableStages(pipeline); - try (ExecutionPlan execution = executionPlanner.getNextExecution(stages)) { + validateAndGetExecutableStages(pipeline); + try (ExecutionPlan execution = executionPlanner.getNextExecution(pipelineRun)) { // Validate execution plan for manual intervention requirements // This centralized validation ensures both community and cloud paths are validated execution.validate(); @@ -135,10 +135,8 @@ private ExecuteResponseData execute(LoadedPipeline pipeline) throws FlamingockEx if (validateOnlyMode()) { throw new PendingChangesException(); } - execution.applyOnEach((executionId, lock, executableStage) -> { - StageResult stageResult = runStage(executionId, lock, executableStage); - resultBuilder.addStage(stageResult); - }); + execution.applyOnEach((executionId, lock, executableStage) -> + runStage(executionId, lock, executableStage, pipelineRun)); } else { break; } @@ -154,15 +152,24 @@ private ExecuteResponseData execute(LoadedPipeline pipeline) throws FlamingockEx } break; } catch (StageExecutionException e) { - resultBuilder.addStage(e.getResult()); - resultBuilder.stopTimer().failed(); - resultBuilder.error(ErrorInfo.fromThrowable(e.getCause(), e.getFailedChangeId(), e.getResult().getStageId())); - throw OperationException.fromExisting(e.getCause(), resultBuilder.build()); + // Defensive: runStage normally records the failure into pipelineRun before + // rethrowing. If for some reason this exception arrives here without that + // having happened (e.g. thrown by something other than runStage), make sure + // the failure is reflected in the response. + String stageName = e.getResult() != null ? e.getResult().getStageName() : null; + if (stageName != null) { + io.flamingock.internal.core.pipeline.run.StageRun stageRun = pipelineRun.getStageRun(stageName); + if (stageRun != null && !stageRun.getState().isFailed()) { + pipelineRun.markStageFailed(stageName, e); + } + } + pipelineRun.stop(); + throw OperationException.fromExisting(e.getCause(), pipelineRun.toResponse()); } } while (true); - resultBuilder.stopTimer().success(); - ExecuteResponseData result = resultBuilder.build(); + pipelineRun.stop(); + ExecuteResponseData result = pipelineRun.toResponse(); logger.info("Flamingock execution completed [duration={}ms applied={} skipped={}]", result.getTotalDurationMs(), result.getAppliedChanges(), result.getSkippedChanges()); @@ -172,29 +179,31 @@ private ExecuteResponseData execute(LoadedPipeline pipeline) throws FlamingockEx return result; } - private StageResult runStage(String executionId, Lock lock, ExecutableStage executableStage) { + private void runStage(String executionId, Lock lock, ExecutableStage executableStage, PipelineRun pipelineRun) { try { - return startStage(executionId, lock, executableStage); + startStage(executionId, lock, executableStage, pipelineRun); } catch (StageExecutionException exception) { + pipelineRun.markStageFailed(executableStage.getName(), exception); eventPublisher.publish(new StageFailedEvent(exception)); eventPublisher.publish(new PipelineFailedEvent(exception)); throw exception; } catch (Throwable generalException) { - throw processAndGetFlamingockException(generalException, null); + throw processAndGetFlamingockException(generalException); } } - private StageResult startStage(String executionId, Lock lock, ExecutableStage executableStage) throws StageExecutionException { + private void startStage(String executionId, Lock lock, ExecutableStage executableStage, PipelineRun pipelineRun) throws StageExecutionException { + pipelineRun.markStageStarted(executableStage.getName()); eventPublisher.publish(new StageStartedEvent()); logger.debug("Applied state to process:\n{}", executableStage); ExecutionContext executionContext = new ExecutionContext(executionId, orphanExecutionContext.getHostname(), orphanExecutionContext.getMetadata()); StageExecutor.Output executionOutput = stageExecutor.executeStage(executableStage, executionContext, lock); + pipelineRun.markStageCompleted(executableStage.getName(), executionOutput.getResult()); eventPublisher.publish(new StageCompletedEvent(executionOutput)); - return executionOutput.getResult(); } - private FlamingockException processAndGetFlamingockException(Throwable exception, ExecutionResultBuilder resultBuilder) throws FlamingockException { + private FlamingockException processAndGetFlamingockException(Throwable exception) throws FlamingockException { FlamingockException flamingockException; if (exception instanceof OperationException) { OperationException pipelineException = (OperationException) exception; diff --git a/core/flamingock-core/src/main/java/io/flamingock/internal/core/operation/result/ExecutionResultBuilder.java b/core/flamingock-core/src/main/java/io/flamingock/internal/core/operation/result/ExecutionResultBuilder.java deleted file mode 100644 index 4c668ec44..000000000 --- a/core/flamingock-core/src/main/java/io/flamingock/internal/core/operation/result/ExecutionResultBuilder.java +++ /dev/null @@ -1,165 +0,0 @@ -/* - * Copyright 2026 Flamingock (https://www.flamingock.io) - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.flamingock.internal.core.operation.result; - -import io.flamingock.internal.common.core.response.data.ChangeResult; -import io.flamingock.internal.common.core.response.data.ChangeStatus; -import io.flamingock.internal.common.core.response.data.ErrorInfo; -import io.flamingock.internal.common.core.response.data.ExecuteResponseData; -import io.flamingock.internal.common.core.response.data.ExecutionStatus; -import io.flamingock.internal.common.core.response.data.StageResult; -import io.flamingock.internal.common.core.response.data.StageStatus; - -import java.time.Duration; -import java.time.Instant; -import java.time.LocalDateTime; -import java.time.ZoneId; -import java.util.ArrayList; -import java.util.List; - -/** - * Builder for creating ExecuteResponseData instances from pipeline execution data. - */ -public class ExecutionResultBuilder { - - private ExecutionStatus status; - private LocalDateTime startTime; - private LocalDateTime endTime; - private final List stages = new ArrayList<>(); - private ErrorInfo error; - - public ExecutionResultBuilder() { - } - - public ExecutionResultBuilder startTimer() { - this.startTime = LocalDateTime.now(); - return this; - } - - public ExecutionResultBuilder stopTimer() { - this.endTime = LocalDateTime.now(); - return this; - } - - public ExecutionResultBuilder addStage(StageResult stage) { - this.stages.add(stage); - return this; - } - - public ExecutionResultBuilder status(ExecutionStatus status) { - this.status = status; - return this; - } - - public ExecutionResultBuilder success() { - this.status = ExecutionStatus.SUCCESS; - return this; - } - - public ExecutionResultBuilder failed() { - this.status = ExecutionStatus.FAILED; - return this; - } - - public ExecutionResultBuilder partial() { - this.status = ExecutionStatus.PARTIAL; - return this; - } - - public ExecutionResultBuilder noChanges() { - this.status = ExecutionStatus.NO_CHANGES; - return this; - } - - public ExecutionResultBuilder error(ErrorInfo error) { - this.error = error; - return this; - } - - public ExecutionResultBuilder error(Throwable throwable, String changeId, String stageId) { - this.error = ErrorInfo.fromThrowable(throwable, changeId, stageId); - return this; - } - - public ExecuteResponseData build() { - long durationMs = 0; - if (startTime != null && endTime != null) { - durationMs = Duration.between(startTime, endTime).toMillis(); - } - - // Calculate aggregate counts from stages - int totalStages = stages.size(); - int completedStages = 0; - int failedStages = 0; - int totalChanges = 0; - int appliedChanges = 0; - int skippedChanges = 0; - int failedChanges = 0; - - for (StageResult stage : stages) { - if (stage.getStatus() == StageStatus.COMPLETED) { - completedStages++; - } else if (stage.getStatus() == StageStatus.FAILED) { - failedStages++; - } - - for (ChangeResult change : stage.getChanges()) { - totalChanges++; - if (change.getStatus() == ChangeStatus.APPLIED) { - appliedChanges++; - } else if (change.getStatus() == ChangeStatus.ALREADY_APPLIED) { - skippedChanges++; - } else if (change.getStatus() == ChangeStatus.FAILED) { - failedChanges++; - } - } - } - - // Determine status if not explicitly set - if (status == null) { - if (failedChanges > 0) { - status = ExecutionStatus.FAILED; - } else if (appliedChanges > 0) { - status = ExecutionStatus.SUCCESS; - } else { - status = ExecutionStatus.NO_CHANGES; - } - } - - Instant startInstant = startTime != null - ? startTime.atZone(ZoneId.systemDefault()).toInstant() - : null; - Instant endInstant = endTime != null - ? endTime.atZone(ZoneId.systemDefault()).toInstant() - : null; - - return ExecuteResponseData.builder() - .status(status) - .startTime(startInstant) - .endTime(endInstant) - .totalDurationMs(durationMs) - .totalStages(totalStages) - .completedStages(completedStages) - .failedStages(failedStages) - .totalChanges(totalChanges) - .appliedChanges(appliedChanges) - .skippedChanges(skippedChanges) - .failedChanges(failedChanges) - .stages(stages) - .error(error) - .build(); - } -} diff --git a/core/flamingock-core/src/main/java/io/flamingock/internal/core/pipeline/execution/ExecutablePipeline.java b/core/flamingock-core/src/main/java/io/flamingock/internal/core/pipeline/execution/ExecutablePipeline.java deleted file mode 100644 index 55bffa250..000000000 --- a/core/flamingock-core/src/main/java/io/flamingock/internal/core/pipeline/execution/ExecutablePipeline.java +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Copyright 2023 Flamingock (https://www.flamingock.io) - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.flamingock.internal.core.pipeline.execution; - -import java.util.List; - -public class ExecutablePipeline { - - - private final List stages; - - public ExecutablePipeline(List stages) { - this.stages = stages; - } - - public Iterable getExecutableStages() { - return stages; - } - - public boolean isExecutionRequired() { - return stages - .stream() - .anyMatch(ExecutableStage::isExecutionRequired); - } - - -} diff --git a/core/flamingock-core/src/main/java/io/flamingock/internal/core/pipeline/run/PipelineRun.java b/core/flamingock-core/src/main/java/io/flamingock/internal/core/pipeline/run/PipelineRun.java new file mode 100644 index 000000000..49d2fc08c --- /dev/null +++ b/core/flamingock-core/src/main/java/io/flamingock/internal/core/pipeline/run/PipelineRun.java @@ -0,0 +1,189 @@ +/* + * Copyright 2026 Flamingock (https://www.flamingock.io) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.flamingock.internal.core.pipeline.run; + +import io.flamingock.internal.common.core.response.data.ChangeResult; +import io.flamingock.internal.common.core.response.data.ChangeStatus; +import io.flamingock.internal.common.core.response.data.ErrorInfo; +import io.flamingock.internal.common.core.response.data.ExecuteResponseData; +import io.flamingock.internal.common.core.response.data.ExecutionStatus; +import io.flamingock.internal.common.core.response.data.StageResult; +import io.flamingock.internal.common.core.response.data.StageStatus; +import io.flamingock.internal.core.pipeline.execution.StageExecutionException; +import io.flamingock.internal.core.pipeline.loaded.LoadedPipeline; +import io.flamingock.internal.core.pipeline.loaded.stage.AbstractLoadedStage; + +import java.time.Duration; +import java.time.Instant; +import java.util.ArrayList; +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +public class PipelineRun { + + public static PipelineRun of(LoadedPipeline pipeline) { + List stages = new ArrayList<>(); + pipeline.getSystemStage().ifPresent(stages::add); + stages.addAll(pipeline.getStages()); + return of(stages); + } + + public static PipelineRun of(List stages) { + List runs = new ArrayList<>(); + for (AbstractLoadedStage stage : stages) { + runs.add(new StageRun(stage)); + } + return new PipelineRun(runs); + } + + private final List stageRuns; + private final Map byName; + private Instant startedAt; + private Instant stoppedAt; + + private PipelineRun(List stageRuns) { + this.stageRuns = Collections.unmodifiableList(stageRuns); + Map index = new LinkedHashMap<>(); + for (StageRun run : stageRuns) { + index.put(run.getName(), run); + } + this.byName = Collections.unmodifiableMap(index); + } + + public List getStageRuns() { + return stageRuns; + } + + public StageRun getStageRun(String name) { + return byName.get(name); + } + + public List getLoadedStages() { + return stageRuns.stream() + .map(StageRun::getLoadedStage) + .collect(Collectors.toList()); + } + + public void start() { + this.startedAt = Instant.now(); + } + + public void stop() { + this.stoppedAt = Instant.now(); + } + + public void markStageStarted(String stageName) { + lookupOrThrow(stageName).setState(StageState.STARTED); + } + + public void markStageCompleted(String stageName, StageResult result) { + StageRun run = lookupOrThrow(stageName); + run.setState(StageState.COMPLETED); + run.setResult(result); + } + + public void markStageFailed(String stageName, StageExecutionException exception) { + StageResult result = exception.getResult(); + ErrorInfo errorInfo = ErrorInfo.fromThrowable( + exception.getCause(), + exception.getFailedChangeId(), + result.getStageId() + ); + StageRun run = lookupOrThrow(stageName); + run.setState(StageState.failed(errorInfo)); + run.setResult(result); + } + + private StageRun lookupOrThrow(String stageName) { + StageRun run = byName.get(stageName); + if (run == null) { + throw new IllegalArgumentException("Unknown stage: " + stageName); + } + return run; + } + + public ExecuteResponseData toResponse() { + List stages = new ArrayList<>(); + int totalStages = 0; + int completedStages = 0; + int failedStages = 0; + int totalChanges = 0; + int appliedChanges = 0; + int skippedChanges = 0; + int failedChanges = 0; + ErrorInfo error = null; + boolean anyFailed = false; + + for (StageRun stageRun : stageRuns) { + StageResult stageResult = stageRun.getResult().orElse(null); + if (stageResult == null) { + continue; + } + stages.add(stageResult); + totalStages++; + + if (stageResult.getStatus() == StageStatus.COMPLETED) { + completedStages++; + } else if (stageResult.getStatus() == StageStatus.FAILED) { + failedStages++; + } + + if (stageResult.getChanges() != null) { + for (ChangeResult change : stageResult.getChanges()) { + totalChanges++; + if (change.getStatus() == ChangeStatus.APPLIED) { + appliedChanges++; + } else if (change.getStatus() == ChangeStatus.ALREADY_APPLIED) { + skippedChanges++; + } else if (change.getStatus() == ChangeStatus.FAILED) { + failedChanges++; + } + } + } + + if (stageRun.getState().isFailed()) { + anyFailed = true; + if (error == null) { + error = stageRun.getState().getErrorInfo().orElse(null); + } + } + } + + ExecutionStatus status = anyFailed ? ExecutionStatus.FAILED : ExecutionStatus.SUCCESS; + long durationMs = (startedAt != null && stoppedAt != null) + ? Duration.between(startedAt, stoppedAt).toMillis() + : 0L; + + return ExecuteResponseData.builder() + .status(status) + .startTime(startedAt) + .endTime(stoppedAt) + .totalDurationMs(durationMs) + .totalStages(totalStages) + .completedStages(completedStages) + .failedStages(failedStages) + .totalChanges(totalChanges) + .appliedChanges(appliedChanges) + .skippedChanges(skippedChanges) + .failedChanges(failedChanges) + .stages(stages) + .error(error) + .build(); + } +} diff --git a/core/flamingock-core/src/main/java/io/flamingock/internal/core/pipeline/run/StageRun.java b/core/flamingock-core/src/main/java/io/flamingock/internal/core/pipeline/run/StageRun.java new file mode 100644 index 000000000..5566070f5 --- /dev/null +++ b/core/flamingock-core/src/main/java/io/flamingock/internal/core/pipeline/run/StageRun.java @@ -0,0 +1,56 @@ +/* + * Copyright 2026 Flamingock (https://www.flamingock.io) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.flamingock.internal.core.pipeline.run; + +import io.flamingock.internal.common.core.response.data.StageResult; +import io.flamingock.internal.core.pipeline.loaded.stage.AbstractLoadedStage; + +import java.util.Optional; + +public class StageRun { + + private final AbstractLoadedStage loadedStage; + private StageState state = StageState.NOT_STARTED; + private StageResult result; + + public StageRun(AbstractLoadedStage loadedStage) { + this.loadedStage = loadedStage; + } + + public String getName() { + return loadedStage.getName(); + } + + public AbstractLoadedStage getLoadedStage() { + return loadedStage; + } + + public StageState getState() { + return state; + } + + public Optional getResult() { + return Optional.ofNullable(result); + } + + void setState(StageState state) { + this.state = state; + } + + void setResult(StageResult result) { + this.result = result; + } +} diff --git a/core/flamingock-core/src/main/java/io/flamingock/internal/core/pipeline/run/StageState.java b/core/flamingock-core/src/main/java/io/flamingock/internal/core/pipeline/run/StageState.java new file mode 100644 index 000000000..f2178cd41 --- /dev/null +++ b/core/flamingock-core/src/main/java/io/flamingock/internal/core/pipeline/run/StageState.java @@ -0,0 +1,69 @@ +/* + * Copyright 2026 Flamingock (https://www.flamingock.io) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.flamingock.internal.core.pipeline.run; + +import io.flamingock.internal.common.core.response.data.ErrorInfo; + +import java.util.Optional; + +public abstract class StageState { + + public static final StageState NOT_STARTED = new NotStarted(); + public static final StageState STARTED = new Started(); + public static final StageState COMPLETED = new Completed(); + + public static StageState failed(ErrorInfo info) { + return new Failed(info); + } + + private StageState() { + } + + public boolean isFailed() { + return false; + } + + public Optional getErrorInfo() { + return Optional.empty(); + } + + private static final class NotStarted extends StageState { + } + + private static final class Started extends StageState { + } + + private static final class Completed extends StageState { + } + + private static final class Failed extends StageState { + private final ErrorInfo info; + + Failed(ErrorInfo info) { + this.info = info; + } + + @Override + public boolean isFailed() { + return true; + } + + @Override + public Optional getErrorInfo() { + return Optional.of(info); + } + } +} diff --git a/core/flamingock-core/src/main/java/io/flamingock/internal/core/plan/ExecutionPlan.java b/core/flamingock-core/src/main/java/io/flamingock/internal/core/plan/ExecutionPlan.java index 996fe4051..f6b1ffcb8 100644 --- a/core/flamingock-core/src/main/java/io/flamingock/internal/core/plan/ExecutionPlan.java +++ b/core/flamingock-core/src/main/java/io/flamingock/internal/core/plan/ExecutionPlan.java @@ -17,7 +17,6 @@ import io.flamingock.internal.util.TriConsumer; import io.flamingock.internal.core.external.store.lock.Lock; -import io.flamingock.internal.core.pipeline.execution.ExecutablePipeline; import io.flamingock.internal.core.pipeline.execution.ExecutableStage; import io.flamingock.internal.core.change.executable.ExecutableChange; import io.flamingock.internal.common.core.error.FlamingockException; @@ -49,7 +48,7 @@ public static ExecutionPlan ABORT(List stages) { private final Lock lock; - private final ExecutablePipeline pipeline; + private final List executableStages; private final boolean aborted; @@ -65,7 +64,7 @@ private ExecutionPlan(String executionId, Lock lock, boolean aborted, List getExecutableStages() { + return executableStages; } public void applyOnEach(TriConsumer consumer) { if (isExecutionRequired()) { - pipeline.getExecutableStages() - .forEach(executableStage -> consumer.accept(executionId, lock, executableStage)); + executableStages.forEach(executableStage -> consumer.accept(executionId, lock, executableStage)); } } @@ -105,7 +103,7 @@ public void validate() { String firstStageName = "unknown"; boolean hasStages = false; - for (ExecutableStage stage : pipeline.getExecutableStages()) { + for (ExecutableStage stage : executableStages) { if (!hasStages) { firstStageName = stage.getName(); hasStages = true; diff --git a/core/flamingock-core/src/main/java/io/flamingock/internal/core/plan/ExecutionPlanner.java b/core/flamingock-core/src/main/java/io/flamingock/internal/core/plan/ExecutionPlanner.java index 14083cdb1..8d95e9a12 100644 --- a/core/flamingock-core/src/main/java/io/flamingock/internal/core/plan/ExecutionPlanner.java +++ b/core/flamingock-core/src/main/java/io/flamingock/internal/core/plan/ExecutionPlanner.java @@ -16,13 +16,11 @@ package io.flamingock.internal.core.plan; import io.flamingock.internal.core.external.store.lock.LockException; -import io.flamingock.internal.core.pipeline.loaded.stage.AbstractLoadedStage; - -import java.util.List; +import io.flamingock.internal.core.pipeline.run.PipelineRun; public abstract class ExecutionPlanner { - abstract public ExecutionPlan getNextExecution(List loadedStages) throws LockException; + public abstract ExecutionPlan getNextExecution(PipelineRun pipelineRun) throws LockException; } diff --git a/core/flamingock-core/src/main/java/io/flamingock/internal/core/plan/community/CommunityExecutionPlanner.java b/core/flamingock-core/src/main/java/io/flamingock/internal/core/plan/community/CommunityExecutionPlanner.java index 316b8dfc6..207a3121c 100644 --- a/core/flamingock-core/src/main/java/io/flamingock/internal/core/plan/community/CommunityExecutionPlanner.java +++ b/core/flamingock-core/src/main/java/io/flamingock/internal/core/plan/community/CommunityExecutionPlanner.java @@ -31,6 +31,7 @@ import io.flamingock.internal.core.external.store.lock.LockRefreshDaemon; import io.flamingock.internal.core.pipeline.execution.ExecutableStage; import io.flamingock.internal.core.pipeline.loaded.stage.AbstractLoadedStage; +import io.flamingock.internal.core.pipeline.run.PipelineRun; import io.flamingock.internal.core.external.store.audit.community.CommunityAuditReader; import io.flamingock.internal.util.id.RunnerId; import io.flamingock.internal.util.TimeService; @@ -114,12 +115,15 @@ public CommunityExecutionPlanner(RunnerId instanceId, *

Error Handling: If any exception occurs after acquiring the lock, the lock is released * in the catch block to prevent lock leaks.

* - * @param loadedStages the list of loaded stages containing all defined changes + * @param pipelineRun the in-flight run aggregate; this implementation reads the loaded + * stages from it ({@code pipelineRun.getLoadedStages()}) and does not + * yet consult per-stage state (deferred to a later phase) * @return ExecutionPlan containing either stages to execute (with lock held) or CONTINUE (no lock) * @throws LockException if unable to acquire the distributed lock within the configured timeout */ @Override - public ExecutionPlan getNextExecution(List loadedStages) throws LockException { + public ExecutionPlan getNextExecution(PipelineRun pipelineRun) throws LockException { + List loadedStages = pipelineRun.getLoadedStages(); Map initialSnapshot = auditReader.getAuditSnapshotByChangeId(); logger.debug("Pulled initial remote state:\n{}", initialSnapshot); @@ -140,6 +144,8 @@ public ExecutionPlan getNextExecution(List loadedStages) th List validatedStages = buildExecutableStages(loadedStages, validatedSnapshot); + //TODO add hasManualInterventionChanges here too, after lock acquisition + Optional nextStageOpt = getFirstExecutableStage(validatedStages); if (!nextStageOpt.isPresent()) { diff --git a/core/flamingock-core/src/test/java/io/flamingock/internal/core/operation/AbstractPipelineTraverseOperationTest.java b/core/flamingock-core/src/test/java/io/flamingock/internal/core/operation/AbstractPipelineTraverseOperationTest.java index c55c38003..6770a4547 100644 --- a/core/flamingock-core/src/test/java/io/flamingock/internal/core/operation/AbstractPipelineTraverseOperationTest.java +++ b/core/flamingock-core/src/test/java/io/flamingock/internal/core/operation/AbstractPipelineTraverseOperationTest.java @@ -26,6 +26,7 @@ import io.flamingock.internal.core.pipeline.execution.StageExecutor; import io.flamingock.internal.core.pipeline.loaded.LoadedPipeline; import io.flamingock.internal.core.pipeline.loaded.stage.AbstractLoadedStage; +import io.flamingock.internal.core.pipeline.run.PipelineRun; import io.flamingock.internal.core.plan.ExecutionPlan; import io.flamingock.internal.core.plan.ExecutionPlanner; import io.flamingock.internal.util.id.RunnerId; @@ -35,7 +36,7 @@ import java.util.Collections; import static org.junit.jupiter.api.Assertions.*; -import static org.mockito.ArgumentMatchers.anyList; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.*; class AbstractPipelineTraverseOperationTest { @@ -48,7 +49,7 @@ void shouldThrowManualInterventionWhenPlannerReturnsAbort() { ExecutionPlan abortPlan = ExecutionPlan.ABORT(Collections.singletonList(stage)); ExecutionPlanner planner = mock(ExecutionPlanner.class); - when(planner.getNextExecution(anyList())).thenReturn(abortPlan); + when(planner.getNextExecution(any(PipelineRun.class))).thenReturn(abortPlan); LoadedPipeline pipeline = mockPipeline(); ExecuteApplyOperation operation = buildOperation(planner); @@ -67,7 +68,7 @@ void shouldNotExecuteChangesWhenPlanIsAbort() { ExecutionPlan abortPlan = ExecutionPlan.ABORT(Collections.singletonList(stage)); ExecutionPlanner planner = mock(ExecutionPlanner.class); - when(planner.getNextExecution(anyList())).thenReturn(abortPlan); + when(planner.getNextExecution(any(PipelineRun.class))).thenReturn(abortPlan); StageExecutor stageExecutor = mock(StageExecutor.class); LoadedPipeline pipeline = mockPipeline(); diff --git a/core/flamingock-core/src/test/java/io/flamingock/internal/core/operation/ExecuteApplyOperationTest.java b/core/flamingock-core/src/test/java/io/flamingock/internal/core/operation/ExecuteApplyOperationTest.java index 0fb2677b3..bef3b7ccb 100644 --- a/core/flamingock-core/src/test/java/io/flamingock/internal/core/operation/ExecuteApplyOperationTest.java +++ b/core/flamingock-core/src/test/java/io/flamingock/internal/core/operation/ExecuteApplyOperationTest.java @@ -156,6 +156,7 @@ void shouldThrowOperationExceptionOnStageFailure() throws Exception { when(pipeline.getSystemStage()).thenReturn(java.util.Optional.empty()); when(pipeline.getStages()).thenReturn(Collections.singletonList(loadedStage)); + when(loadedStage.getName()).thenReturn("stage-1"); when(loadedStage.getChanges()).thenReturn(Collections.singletonList(loadedChange)); when(executionPlanner.getNextExecution(any())).thenReturn(executionPlan); diff --git a/core/flamingock-core/src/test/java/io/flamingock/internal/core/operation/ValidateApplyOperationTest.java b/core/flamingock-core/src/test/java/io/flamingock/internal/core/operation/ValidateApplyOperationTest.java index e69ff32fb..b64b78cdc 100644 --- a/core/flamingock-core/src/test/java/io/flamingock/internal/core/operation/ValidateApplyOperationTest.java +++ b/core/flamingock-core/src/test/java/io/flamingock/internal/core/operation/ValidateApplyOperationTest.java @@ -21,7 +21,6 @@ import io.flamingock.internal.core.operation.execute.ExecuteArgs; import io.flamingock.internal.core.operation.execute.ExecuteResult; import io.flamingock.internal.core.operation.validate.ValidateApplyOperation; -import io.flamingock.internal.core.pipeline.execution.ExecutablePipeline; import io.flamingock.internal.core.pipeline.execution.ExecutableStage; import io.flamingock.internal.core.pipeline.execution.OrphanExecutionContext; import io.flamingock.internal.core.pipeline.execution.StageExecutor; @@ -137,10 +136,7 @@ void shouldThrowPendingChangesExceptionWhenPendingChangesExist() throws Exceptio ExecutableStage executableStage = mock(ExecutableStage.class); doReturn(pendingChanges).when(executableStage).getChanges(); - ExecutablePipeline executablePipeline = mock(ExecutablePipeline.class); - when(executablePipeline.getExecutableStages()).thenReturn(Collections.singletonList(executableStage)); - - ExecutionPlan executionPlan = mockPendingPlan(executablePipeline); + ExecutionPlan executionPlan = mockPendingPlan(Collections.singletonList(executableStage)); when(pipeline.getSystemStage()).thenReturn(java.util.Optional.empty()); when(pipeline.getStages()).thenReturn(Collections.singletonList(loadedStage)); @@ -174,10 +170,10 @@ private ExecutionPlan mockNoPendingPlan() { * Creates an ExecutionPlan mock where execution is required * (i.e., there are pending changes) and the pipeline exposes the given executable stages. */ - private ExecutionPlan mockPendingPlan(ExecutablePipeline executablePipeline) { + private ExecutionPlan mockPendingPlan(List stages) { ExecutionPlan plan = mock(ExecutionPlan.class); when(plan.isExecutionRequired()).thenReturn(true); - when(plan.getPipeline()).thenReturn(executablePipeline); + when(plan.getExecutableStages()).thenReturn(stages); doNothing().when(plan).validate(); doNothing().when(plan).close(); return plan; diff --git a/core/flamingock-core/src/test/java/io/flamingock/internal/core/pipeline/run/PipelineRunTest.java b/core/flamingock-core/src/test/java/io/flamingock/internal/core/pipeline/run/PipelineRunTest.java new file mode 100644 index 000000000..f1d3fed65 --- /dev/null +++ b/core/flamingock-core/src/test/java/io/flamingock/internal/core/pipeline/run/PipelineRunTest.java @@ -0,0 +1,122 @@ +/* + * Copyright 2026 Flamingock (https://www.flamingock.io) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.flamingock.internal.core.pipeline.run; + +import io.flamingock.internal.common.core.response.data.ErrorInfo; +import io.flamingock.internal.common.core.response.data.StageResult; +import io.flamingock.internal.common.core.response.data.StageStatus; +import io.flamingock.internal.core.pipeline.execution.StageExecutionException; +import io.flamingock.internal.core.pipeline.loaded.stage.AbstractLoadedStage; +import org.junit.jupiter.api.Test; + +import java.util.Arrays; +import java.util.List; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertSame; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +class PipelineRunTest { + + @Test + void buildsOneStageRunPerStageInOrderAllNotStarted() { + AbstractLoadedStage a = mockStage("alpha"); + AbstractLoadedStage b = mockStage("beta"); + + PipelineRun pipelineRun = PipelineRun.of(Arrays.asList(a, b)); + + List runs = pipelineRun.getStageRuns(); + assertEquals(2, runs.size()); + assertEquals("alpha", runs.get(0).getName()); + assertEquals("beta", runs.get(1).getName()); + assertSame(StageState.NOT_STARTED, runs.get(0).getState()); + assertSame(StageState.NOT_STARTED, runs.get(1).getState()); + } + + @Test + void markStageStartedUpdatesTheRightStage() { + AbstractLoadedStage a = mockStage("alpha"); + AbstractLoadedStage b = mockStage("beta"); + + PipelineRun pipelineRun = PipelineRun.of(Arrays.asList(a, b)); + pipelineRun.markStageStarted("alpha"); + + assertSame(StageState.STARTED, pipelineRun.getStageRun("alpha").getState()); + assertSame(StageState.NOT_STARTED, pipelineRun.getStageRun("beta").getState()); + } + + @Test + void markStageFailedDerivesErrorInfoFromException() { + AbstractLoadedStage a = mockStage("alpha"); + StageResult stageResult = StageResult.builder() + .stageId("alpha") + .stageName("alpha") + .status(StageStatus.FAILED) + .build(); + RuntimeException cause = new RuntimeException("boom"); + StageExecutionException exception = StageExecutionException.fromResult(cause, stageResult, "change-1"); + + PipelineRun pipelineRun = PipelineRun.of(java.util.Collections.singletonList(a)); + pipelineRun.markStageFailed("alpha", exception); + + StageState state = pipelineRun.getStageRun("alpha").getState(); + assertTrue(state.isFailed()); + ErrorInfo info = state.getErrorInfo().get(); + assertEquals("change-1", info.getChangeId()); + assertEquals("alpha", info.getStageId()); + } + + @Test + void markStageStartedThrowsForUnknownStage() { + AbstractLoadedStage a = mockStage("alpha"); + PipelineRun pipelineRun = PipelineRun.of(java.util.Collections.singletonList(a)); + + assertThrows( + IllegalArgumentException.class, + () -> pipelineRun.markStageStarted("missing")); + } + + @Test + void getStageRunReturnsNullForUnknownStage() { + AbstractLoadedStage a = mockStage("alpha"); + PipelineRun pipelineRun = PipelineRun.of(java.util.Collections.singletonList(a)); + + assertNull(pipelineRun.getStageRun("missing")); + } + + @Test + void getLoadedStagesReturnsTheUnderlyingStagesInOrder() { + AbstractLoadedStage a = mockStage("alpha"); + AbstractLoadedStage b = mockStage("beta"); + + PipelineRun pipelineRun = PipelineRun.of(Arrays.asList(a, b)); + + List loaded = pipelineRun.getLoadedStages(); + assertEquals(2, loaded.size()); + assertSame(a, loaded.get(0)); + assertSame(b, loaded.get(1)); + } + + private static AbstractLoadedStage mockStage(String name) { + AbstractLoadedStage stage = mock(AbstractLoadedStage.class); + when(stage.getName()).thenReturn(name); + return stage; + } +} diff --git a/core/flamingock-core/src/test/java/io/flamingock/internal/core/pipeline/run/PipelineRunToResponseTest.java b/core/flamingock-core/src/test/java/io/flamingock/internal/core/pipeline/run/PipelineRunToResponseTest.java new file mode 100644 index 000000000..352a790a4 --- /dev/null +++ b/core/flamingock-core/src/test/java/io/flamingock/internal/core/pipeline/run/PipelineRunToResponseTest.java @@ -0,0 +1,158 @@ +/* + * Copyright 2026 Flamingock (https://www.flamingock.io) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.flamingock.internal.core.pipeline.run; + +import io.flamingock.internal.common.core.response.data.ChangeResult; +import io.flamingock.internal.common.core.response.data.ChangeStatus; +import io.flamingock.internal.common.core.response.data.ErrorInfo; +import io.flamingock.internal.common.core.response.data.ExecuteResponseData; +import io.flamingock.internal.common.core.response.data.ExecutionStatus; +import io.flamingock.internal.common.core.response.data.StageResult; +import io.flamingock.internal.common.core.response.data.StageStatus; +import io.flamingock.internal.core.pipeline.execution.StageExecutionException; +import io.flamingock.internal.core.pipeline.loaded.stage.AbstractLoadedStage; +import org.junit.jupiter.api.Test; + +import java.util.Arrays; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +class PipelineRunToResponseTest { + + @Test + void emptyRunYieldsSuccessWithZeroCounts() { + PipelineRun pipelineRun = PipelineRun.of(java.util.Collections.emptyList()); + pipelineRun.start(); + pipelineRun.stop(); + + ExecuteResponseData response = pipelineRun.toResponse(); + + assertEquals(ExecutionStatus.SUCCESS, response.getStatus()); + assertEquals(0, response.getTotalStages()); + assertEquals(0, response.getCompletedStages()); + assertEquals(0, response.getFailedStages()); + assertEquals(0, response.getTotalChanges()); + assertNull(response.getError()); + assertNotNull(response.getStartTime()); + assertNotNull(response.getEndTime()); + } + + @Test + void twoCompletedStagesYieldsSuccessAndCountersRollUp() { + AbstractLoadedStage stageA = mockStage("alpha"); + AbstractLoadedStage stageB = mockStage("beta"); + PipelineRun pipelineRun = PipelineRun.of(Arrays.asList(stageA, stageB)); + + pipelineRun.start(); + pipelineRun.markStageCompleted("alpha", completedStageResult("alpha", 2 /*applied*/, 1 /*skipped*/)); + pipelineRun.markStageCompleted("beta", completedStageResult("beta", 1, 0)); + pipelineRun.stop(); + + ExecuteResponseData response = pipelineRun.toResponse(); + + assertEquals(ExecutionStatus.SUCCESS, response.getStatus()); + assertEquals(2, response.getTotalStages()); + assertEquals(2, response.getCompletedStages()); + assertEquals(0, response.getFailedStages()); + assertEquals(4, response.getTotalChanges()); // 2+1 + 1+0 + assertEquals(3, response.getAppliedChanges()); // 2 + 1 + assertEquals(1, response.getSkippedChanges()); // 1 + 0 + assertEquals(0, response.getFailedChanges()); + assertNull(response.getError()); + } + + @Test + void oneCompletedAndOneFailedYieldsFailedAndPipelineErrorMatchesStage() { + AbstractLoadedStage stageA = mockStage("alpha"); + AbstractLoadedStage stageB = mockStage("beta"); + PipelineRun pipelineRun = PipelineRun.of(Arrays.asList(stageA, stageB)); + + RuntimeException betaCause = new RuntimeException("boom"); + StageExecutionException betaException = StageExecutionException.fromResult( + betaCause, failedStageResult("beta"), "change-b1"); + + pipelineRun.start(); + pipelineRun.markStageCompleted("alpha", completedStageResult("alpha", 1, 0)); + pipelineRun.markStageFailed("beta", betaException); + pipelineRun.stop(); + + ExecuteResponseData response = pipelineRun.toResponse(); + + assertEquals(ExecutionStatus.FAILED, response.getStatus()); + assertEquals(2, response.getTotalStages()); + assertEquals(1, response.getCompletedStages()); + assertEquals(1, response.getFailedStages()); + assertEquals(2, response.getTotalChanges()); // 1 applied + 1 failed + assertEquals(1, response.getAppliedChanges()); + assertEquals(1, response.getFailedChanges()); + ErrorInfo error = response.getError(); + assertEquals("change-b1", error.getChangeId()); + assertEquals("beta", error.getStageId()); + assertEquals("RuntimeException", error.getErrorType()); + assertEquals("boom", error.getMessage()); + } + + @Test + void stagesNeverReachingTerminalStateAreNotReported() { + AbstractLoadedStage stageA = mockStage("alpha"); + AbstractLoadedStage stageB = mockStage("beta"); + PipelineRun pipelineRun = PipelineRun.of(Arrays.asList(stageA, stageB)); + + pipelineRun.start(); + pipelineRun.markStageCompleted("alpha", completedStageResult("alpha", 1, 0)); + // beta never advances past NOT_STARTED — no StageResult attached + pipelineRun.stop(); + + ExecuteResponseData response = pipelineRun.toResponse(); + + assertEquals(1, response.getTotalStages()); + assertEquals(1, response.getCompletedStages()); + assertEquals(1, response.getStages().size()); + } + + private static AbstractLoadedStage mockStage(String name) { + AbstractLoadedStage stage = mock(AbstractLoadedStage.class); + when(stage.getName()).thenReturn(name); + return stage; + } + + private static StageResult completedStageResult(String name, int applied, int skipped) { + StageResult.Builder builder = StageResult.builder() + .stageId(name) + .stageName(name) + .status(StageStatus.COMPLETED); + for (int i = 0; i < applied; i++) { + builder.addChange(ChangeResult.builder().changeId(name + "-a" + i).status(ChangeStatus.APPLIED).build()); + } + for (int i = 0; i < skipped; i++) { + builder.addChange(ChangeResult.builder().changeId(name + "-s" + i).status(ChangeStatus.ALREADY_APPLIED).build()); + } + return builder.build(); + } + + private static StageResult failedStageResult(String name) { + return StageResult.builder() + .stageId(name) + .stageName(name) + .status(StageStatus.FAILED) + .addChange(ChangeResult.builder().changeId(name + "-f0").status(ChangeStatus.FAILED).build()) + .build(); + } +} diff --git a/core/flamingock-core/src/test/java/io/flamingock/internal/core/pipeline/run/StageStateTest.java b/core/flamingock-core/src/test/java/io/flamingock/internal/core/pipeline/run/StageStateTest.java new file mode 100644 index 000000000..dd70b6cd1 --- /dev/null +++ b/core/flamingock-core/src/test/java/io/flamingock/internal/core/pipeline/run/StageStateTest.java @@ -0,0 +1,54 @@ +/* + * Copyright 2026 Flamingock (https://www.flamingock.io) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.flamingock.internal.core.pipeline.run; + +import io.flamingock.internal.common.core.response.data.ErrorInfo; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertSame; +import static org.junit.jupiter.api.Assertions.assertTrue; + +class StageStateTest { + + @Test + void notStartedIsNotFailedAndHasNoErrorInfo() { + assertFalse(StageState.NOT_STARTED.isFailed()); + assertFalse(StageState.NOT_STARTED.getErrorInfo().isPresent()); + } + + @Test + void startedIsNotFailedAndHasNoErrorInfo() { + assertFalse(StageState.STARTED.isFailed()); + assertFalse(StageState.STARTED.getErrorInfo().isPresent()); + } + + @Test + void completedIsNotFailedAndHasNoErrorInfo() { + assertFalse(StageState.COMPLETED.isFailed()); + assertFalse(StageState.COMPLETED.getErrorInfo().isPresent()); + } + + @Test + void failedCarriesItsErrorInfo() { + ErrorInfo info = new ErrorInfo("RuntimeException", "boom", "change-1", "stage-1"); + StageState state = StageState.failed(info); + + assertTrue(state.isFailed()); + assertTrue(state.getErrorInfo().isPresent()); + assertSame(info, state.getErrorInfo().get()); + } +} diff --git a/core/flamingock-core/src/test/java/io/flamingock/internal/core/plan/community/CommunityExecutionPlannerTest.java b/core/flamingock-core/src/test/java/io/flamingock/internal/core/plan/community/CommunityExecutionPlannerTest.java index d2fbe0cae..41b092a3c 100644 --- a/core/flamingock-core/src/test/java/io/flamingock/internal/core/plan/community/CommunityExecutionPlannerTest.java +++ b/core/flamingock-core/src/test/java/io/flamingock/internal/core/plan/community/CommunityExecutionPlannerTest.java @@ -25,6 +25,7 @@ import io.flamingock.internal.core.external.store.lock.community.CommunityLockService; import io.flamingock.internal.core.plan.ExecutionPlan; import io.flamingock.internal.core.pipeline.loaded.stage.AbstractLoadedStage; +import io.flamingock.internal.core.pipeline.run.PipelineRun; import io.flamingock.internal.core.change.loaded.AbstractLoadedChange; import io.flamingock.internal.util.id.RunnerId; import org.junit.jupiter.api.BeforeEach; @@ -78,7 +79,7 @@ void shouldReturnAbortWithoutAcquiringLockWhenManualInterventionRequired() { snapshot.put("change-1", buildAuditEntry("change-1", AuditEntry.Status.FAILED, AuditTxType.NON_TX)); when(auditReader.getAuditSnapshotByChangeId()).thenReturn(snapshot); - ExecutionPlan plan = planner.getNextExecution(Collections.singletonList(stage)); + ExecutionPlan plan = planner.getNextExecution(PipelineRun.of(Collections.singletonList(stage))); assertTrue(plan.isAborted()); verify(lockService, never()).upsert(any(), any(), anyLong()); @@ -94,7 +95,7 @@ void shouldAcquireLockAndReturnExecutionPlanWhenNoManualIntervention() { when(lockService.upsert(any(), any(), anyLong())) .thenReturn(new LockAcquisition(RunnerId.fromString("test-runner"), 60000L)); - ExecutionPlan plan = planner.getNextExecution(Collections.singletonList(stage)); + ExecutionPlan plan = planner.getNextExecution(PipelineRun.of(Collections.singletonList(stage))); assertFalse(plan.isAborted()); assertTrue(plan.isExecutionRequired()); @@ -111,7 +112,7 @@ void shouldReturnContinueWithoutLockWhenAllChangesApplied() { snapshot.put("change-1", buildAuditEntry("change-1", AuditEntry.Status.APPLIED, null)); when(auditReader.getAuditSnapshotByChangeId()).thenReturn(snapshot); - ExecutionPlan plan = planner.getNextExecution(Collections.singletonList(stage)); + ExecutionPlan plan = planner.getNextExecution(PipelineRun.of(Collections.singletonList(stage))); assertFalse(plan.isAborted()); assertFalse(plan.isExecutionRequired());