From 3d686e1f46ce0d9b377ad7510d9756c553939cf8 Mon Sep 17 00:00:00 2001 From: Antonio Perez Dieppa Date: Thu, 21 May 2026 12:42:20 +0100 Subject: [PATCH 1/2] refactor: add stageBlock to cloud API --- cloud/flamingock-cloud-api/build.gradle.kts | 2 + .../api/request/ClientSubmissionRequest.java | 29 ++-- .../api/request/ExecutionPlanRequest.java | 4 +- .../cloud/api/request/StageBlockRequest.java | 74 +++++++++ ...entSubmissionRequestSerializationTest.java | 156 ++++++++++++++++++ .../planner/CloudExecutionPlanMapper.java | 37 +++-- .../planner/CloudExecutionPlanMapperTest.java | 79 ++++++++- .../planner/CloudExecutionPlannerTest.java | 49 +++++- .../cloud/deprecated/MockRunnerServerOld.java | 7 +- .../cloud/mock/MockExecutionPlanBuilder.java | 9 +- 10 files changed, 413 insertions(+), 33 deletions(-) create mode 100644 cloud/flamingock-cloud-api/src/main/java/io/flamingock/cloud/api/request/StageBlockRequest.java create mode 100644 cloud/flamingock-cloud-api/src/test/java/io/flamingock/cloud/api/request/ClientSubmissionRequestSerializationTest.java diff --git a/cloud/flamingock-cloud-api/build.gradle.kts b/cloud/flamingock-cloud-api/build.gradle.kts index 7edb2d7cf..6770e4e7c 100644 --- a/cloud/flamingock-cloud-api/build.gradle.kts +++ b/cloud/flamingock-cloud-api/build.gradle.kts @@ -4,8 +4,10 @@ dependencies { description = "Cloud Edition public API definitions" val coreApiVersion: String by extra +val jacksonVersion = "2.14.1" dependencies { api("io.flamingock:flamingock-core-api:${coreApiVersion}") + testImplementation("com.fasterxml.jackson.core:jackson-databind:${jacksonVersion}") } java { diff --git a/cloud/flamingock-cloud-api/src/main/java/io/flamingock/cloud/api/request/ClientSubmissionRequest.java b/cloud/flamingock-cloud-api/src/main/java/io/flamingock/cloud/api/request/ClientSubmissionRequest.java index baf4452b4..034ec2cc1 100644 --- a/cloud/flamingock-cloud-api/src/main/java/io/flamingock/cloud/api/request/ClientSubmissionRequest.java +++ b/cloud/flamingock-cloud-api/src/main/java/io/flamingock/cloud/api/request/ClientSubmissionRequest.java @@ -17,22 +17,31 @@ import java.util.List; +/** + * Payload submitted by the client describing the pipeline state for an execution-plan request. + * Stages are grouped into {@link StageBlockRequest}s where the block list order conveys the + * dependency order — {@code blocks.get(0)} must complete before {@code blocks.get(1)} may run. + * + *

Block membership is owned by the client's {@code PipelineRun.getStageBlocks()}; the server + * consumes the list as-is, with no {@code StageType}-based regrouping. + */ public class ClientSubmissionRequest { - private List stages; + + private List blocks; public ClientSubmissionRequest() { } - public ClientSubmissionRequest(List stages) { - this.stages = stages; + public ClientSubmissionRequest(List blocks) { + this.blocks = blocks; } - public List getStages() { - return stages; + public List getBlocks() { + return blocks; } - public void setStages(List stages) { - this.stages = stages; + public void setBlocks(List blocks) { + this.blocks = blocks; } @Override @@ -40,11 +49,11 @@ public boolean equals(Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; ClientSubmissionRequest that = (ClientSubmissionRequest) o; - return java.util.Objects.equals(stages, that.stages); + return java.util.Objects.equals(blocks, that.blocks); } @Override public int hashCode() { - return java.util.Objects.hash(stages); + return java.util.Objects.hash(blocks); } -} \ No newline at end of file +} diff --git a/cloud/flamingock-cloud-api/src/main/java/io/flamingock/cloud/api/request/ExecutionPlanRequest.java b/cloud/flamingock-cloud-api/src/main/java/io/flamingock/cloud/api/request/ExecutionPlanRequest.java index 6c2a00cfd..2e48dd88b 100644 --- a/cloud/flamingock-cloud-api/src/main/java/io/flamingock/cloud/api/request/ExecutionPlanRequest.java +++ b/cloud/flamingock-cloud-api/src/main/java/io/flamingock/cloud/api/request/ExecutionPlanRequest.java @@ -25,9 +25,9 @@ public class ExecutionPlanRequest { public ExecutionPlanRequest() { } - public ExecutionPlanRequest(long lockAcquiredForMillis, List stages) { + public ExecutionPlanRequest(long lockAcquiredForMillis, List blocks) { this.lockAcquiredForMillis = lockAcquiredForMillis; - this.clientSubmission = new ClientSubmissionRequest(stages); + this.clientSubmission = new ClientSubmissionRequest(blocks); } public void setClientSubmission(ClientSubmissionRequest clientSubmission) { diff --git a/cloud/flamingock-cloud-api/src/main/java/io/flamingock/cloud/api/request/StageBlockRequest.java b/cloud/flamingock-cloud-api/src/main/java/io/flamingock/cloud/api/request/StageBlockRequest.java new file mode 100644 index 000000000..24442d4e7 --- /dev/null +++ b/cloud/flamingock-cloud-api/src/main/java/io/flamingock/cloud/api/request/StageBlockRequest.java @@ -0,0 +1,74 @@ +/* + * Copyright 2026 Flamingock (https://www.flamingock.io) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.flamingock.cloud.api.request; + +import io.flamingock.api.StageType; + +import java.util.List; +import java.util.Objects; + +/** + * Structural unit of a cloud execution-plan submission: a block of stages that must complete + * before the next block in the {@code ClientSubmissionRequest.blocks} list may run. Block + * membership is owned by the client's {@code PipelineRun}; the server consumes the block list + * as-is and does NOT regroup stages by {@link #type}. + * + *

The {@code type} field is metadata (diagnostics, future use). Two blocks may share the + * same {@link StageType} — the server iterates the block list in order and never collapses by + * type. + */ +public class StageBlockRequest { + + private StageType type; + private List stages; + + public StageBlockRequest() { + } + + public StageBlockRequest(StageType type, List stages) { + this.type = type; + this.stages = stages; + } + + public StageType getType() { + return type; + } + + public void setType(StageType type) { + this.type = type; + } + + public List getStages() { + return stages; + } + + public void setStages(List stages) { + this.stages = stages; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + StageBlockRequest that = (StageBlockRequest) o; + return type == that.type && Objects.equals(stages, that.stages); + } + + @Override + public int hashCode() { + return Objects.hash(type, stages); + } +} diff --git a/cloud/flamingock-cloud-api/src/test/java/io/flamingock/cloud/api/request/ClientSubmissionRequestSerializationTest.java b/cloud/flamingock-cloud-api/src/test/java/io/flamingock/cloud/api/request/ClientSubmissionRequestSerializationTest.java new file mode 100644 index 000000000..672cdfa0d --- /dev/null +++ b/cloud/flamingock-cloud-api/src/test/java/io/flamingock/cloud/api/request/ClientSubmissionRequestSerializationTest.java @@ -0,0 +1,156 @@ +/* + * Copyright 2026 Flamingock (https://www.flamingock.io) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.flamingock.cloud.api.request; + +import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import io.flamingock.api.StageType; +import io.flamingock.cloud.api.vo.CloudStageStatus; +import io.flamingock.cloud.api.vo.CloudTargetSystemAuditMarkType; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +/** + * Wire-contract serialization tests for {@link ClientSubmissionRequest}. Pins the JSON shape + * consumed by the cloud server. A divergence here is the canonical "wire mismatch" symptom. + */ +class ClientSubmissionRequestSerializationTest { + + // Match production mapper configuration (see JsonObjectMapper.DEFAULT_INSTANCE in + // flamingock-java-general-util): unknown properties are silently ignored on the wire. + private static final ObjectMapper MAPPER = new ObjectMapper() + .disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES); + + @Test + @DisplayName("Serializes blocks in input order with type + stages") + void serializesBlocksWithTypeAndStages() throws Exception { + StageRequest sysStage = new StageRequest( + "system-stage", 0, CloudStageStatus.NOT_STARTED, + Collections.singletonList(new ChangeRequest("sys-c1", + CloudTargetSystemAuditMarkType.NONE, false))); + StageRequest userStage = new StageRequest( + "user-stage", 1, CloudStageStatus.NOT_STARTED, + Collections.singletonList(new ChangeRequest("user-c1", + CloudTargetSystemAuditMarkType.NONE, false))); + + ClientSubmissionRequest request = new ClientSubmissionRequest(Arrays.asList( + new StageBlockRequest(StageType.SYSTEM, Collections.singletonList(sysStage)), + new StageBlockRequest(StageType.DEFAULT, Collections.singletonList(userStage)))); + + JsonNode json = MAPPER.valueToTree(request); + + assertNotNull(json.get("blocks")); + assertEquals(2, json.get("blocks").size()); + + JsonNode block0 = json.get("blocks").get(0); + assertEquals("SYSTEM", block0.get("type").asText()); + assertEquals(1, block0.get("stages").size()); + assertEquals("system-stage", block0.get("stages").get(0).get("name").asText()); + + JsonNode block1 = json.get("blocks").get(1); + assertEquals("DEFAULT", block1.get("type").asText()); + assertEquals(1, block1.get("stages").size()); + assertEquals("user-stage", block1.get("stages").get(0).get("name").asText()); + } + + @Test + @DisplayName("Round-trips through Jackson preserving block order and contents") + void roundTripsPreservingBlockOrderAndContents() throws Exception { + ClientSubmissionRequest original = new ClientSubmissionRequest(Arrays.asList( + new StageBlockRequest(StageType.LEGACY, Collections.singletonList( + new StageRequest("legacy", 0, CloudStageStatus.COMPLETED, + Collections.singletonList(new ChangeRequest("legacy-c1", + CloudTargetSystemAuditMarkType.APPLIED, true))))), + new StageBlockRequest(StageType.DEFAULT, Arrays.asList( + new StageRequest("user-a", 1, CloudStageStatus.STARTED, + Collections.singletonList(new ChangeRequest("user-a-c1", + CloudTargetSystemAuditMarkType.NONE, false))), + new StageRequest("user-b", 2, CloudStageStatus.NOT_STARTED, + Collections.singletonList(new ChangeRequest("user-b-c1", + CloudTargetSystemAuditMarkType.NONE, false))))))); + + String json = MAPPER.writeValueAsString(original); + ClientSubmissionRequest deserialized = MAPPER.readValue(json, ClientSubmissionRequest.class); + + assertEquals(original, deserialized); + // Block order is significant and preserved verbatim. + assertEquals(2, deserialized.getBlocks().size()); + assertEquals(StageType.LEGACY, deserialized.getBlocks().get(0).getType()); + assertEquals(StageType.DEFAULT, deserialized.getBlocks().get(1).getType()); + assertEquals(2, deserialized.getBlocks().get(1).getStages().size()); + assertEquals("user-a", deserialized.getBlocks().get(1).getStages().get(0).getName()); + assertEquals("user-b", deserialized.getBlocks().get(1).getStages().get(1).getName()); + } + + @Test + @DisplayName("Same StageType repeated across multiple blocks is preserved on the wire (multi-block-same-type lock-in)") + void sameStageTypeAcrossMultipleBlocksIsPreserved() throws Exception { + StageRequest a = new StageRequest("user-a", 0, CloudStageStatus.NOT_STARTED, + Collections.singletonList(new ChangeRequest("a-c1", + CloudTargetSystemAuditMarkType.NONE, false))); + StageRequest b = new StageRequest("user-b", 1, CloudStageStatus.NOT_STARTED, + Collections.singletonList(new ChangeRequest("b-c1", + CloudTargetSystemAuditMarkType.NONE, false))); + + ClientSubmissionRequest request = new ClientSubmissionRequest(Arrays.asList( + new StageBlockRequest(StageType.DEFAULT, Collections.singletonList(a)), + new StageBlockRequest(StageType.DEFAULT, Collections.singletonList(b)))); + + String json = MAPPER.writeValueAsString(request); + ClientSubmissionRequest deserialized = MAPPER.readValue(json, ClientSubmissionRequest.class); + + // Two distinct blocks of the same StageType, NOT collapsed into one. + assertEquals(2, deserialized.getBlocks().size()); + assertEquals(StageType.DEFAULT, deserialized.getBlocks().get(0).getType()); + assertEquals(StageType.DEFAULT, deserialized.getBlocks().get(1).getType()); + assertEquals("user-a", deserialized.getBlocks().get(0).getStages().get(0).getName()); + assertEquals("user-b", deserialized.getBlocks().get(1).getStages().get(0).getName()); + } + + @Test + @DisplayName("Empty blocks list serializes and deserializes cleanly") + void emptyBlocksListRoundTrips() throws Exception { + ClientSubmissionRequest original = new ClientSubmissionRequest(Collections.emptyList()); + + String json = MAPPER.writeValueAsString(original); + ClientSubmissionRequest deserialized = MAPPER.readValue(json, ClientSubmissionRequest.class); + + assertNotNull(deserialized.getBlocks()); + assertTrue(deserialized.getBlocks().isEmpty()); + } + + @Test + @DisplayName("Old flat 'stages' field at top level is silently ignored (clean cut — no fallback)") + void oldFlatStagesFieldIsIgnored() throws Exception { + // Wire format from a hypothetical old client: top-level `stages` instead of `blocks`. + // The new server-side DTO has no `stages` getter/setter, so the field is dropped. + String oldFormat = "{\"stages\":[{\"name\":\"legacy-flat\",\"order\":0}]}"; + ClientSubmissionRequest deserialized = MAPPER.readValue(oldFormat, ClientSubmissionRequest.class); + + // No fallback — blocks is null (or empty) because the old field was ignored. + assertNull(deserialized.getBlocks()); + } +} diff --git a/cloud/flamingock-cloud/src/main/java/io/flamingock/cloud/planner/CloudExecutionPlanMapper.java b/cloud/flamingock-cloud/src/main/java/io/flamingock/cloud/planner/CloudExecutionPlanMapper.java index 739338cf2..2736aa40c 100644 --- a/cloud/flamingock-cloud/src/main/java/io/flamingock/cloud/planner/CloudExecutionPlanMapper.java +++ b/cloud/flamingock-cloud/src/main/java/io/flamingock/cloud/planner/CloudExecutionPlanMapper.java @@ -19,6 +19,7 @@ import io.flamingock.internal.util.TimeService; import io.flamingock.cloud.api.request.ExecutionPlanRequest; import io.flamingock.cloud.api.response.ExecutionPlanResponse; +import io.flamingock.cloud.api.request.StageBlockRequest; import io.flamingock.cloud.api.request.StageRequest; import io.flamingock.cloud.api.request.ChangeRequest; import io.flamingock.cloud.api.response.StageResponse; @@ -29,6 +30,7 @@ import io.flamingock.cloud.CloudApiMapper; import io.flamingock.internal.core.pipeline.run.PipelineRun; import io.flamingock.internal.core.pipeline.run.StageRun; +import io.flamingock.internal.core.pipeline.run.StageRunBlock; import io.flamingock.internal.common.core.targets.TargetSystemAuditMarkType; import io.flamingock.cloud.lock.CloudLockService; import io.flamingock.internal.core.configuration.core.CoreConfigurable; @@ -58,21 +60,30 @@ public static ExecutionPlanRequest toRequest(PipelineRun pipelineRun, long lockAcquiredForMillis, Map ongoingStatusesMap) { - List stageRuns = pipelineRun.getStageRuns(); - List requestStages = new ArrayList<>(stageRuns.size()); - for (int i = 0; i < stageRuns.size(); i++) { - StageRun stageRun = stageRuns.get(i); - AbstractLoadedStage currentStage = stageRun.getLoadedStage(); - List stageChanges = currentStage - .getChanges() - .stream() - .map(descriptor -> CloudExecutionPlanMapper.mapToChangeRequest(descriptor, ongoingStatusesMap)) - .collect(Collectors.toList()); - CloudStageStatus status = CloudApiMapper.toCloud(stageRun.getState()); - requestStages.add(new StageRequest(currentStage.getName(), i, status, stageChanges)); + // Walk the PipelineRun's block list verbatim — block grouping is owned by PipelineRun, + // not derived from StageType. Two blocks of the same StageType are preserved as two + // separate StageBlockRequests in input order. + List blocks = pipelineRun.getStageBlocks(); + List requestBlocks = new ArrayList<>(blocks.size()); + // Stage order index is global across the request — preserves the same ordering used + // before block-awareness (each stage's index in the flat run list). + int stageOrder = 0; + for (StageRunBlock block : blocks) { + List blockStages = new ArrayList<>(block.getStageRuns().size()); + for (StageRun stageRun : block.getStageRuns()) { + AbstractLoadedStage currentStage = stageRun.getLoadedStage(); + List stageChanges = currentStage + .getChanges() + .stream() + .map(descriptor -> CloudExecutionPlanMapper.mapToChangeRequest(descriptor, ongoingStatusesMap)) + .collect(Collectors.toList()); + CloudStageStatus status = CloudApiMapper.toCloud(stageRun.getState()); + blockStages.add(new StageRequest(currentStage.getName(), stageOrder++, status, stageChanges)); + } + requestBlocks.add(new StageBlockRequest(block.getType(), blockStages)); } - return new ExecutionPlanRequest(lockAcquiredForMillis, requestStages); + return new ExecutionPlanRequest(lockAcquiredForMillis, requestBlocks); } private static ChangeRequest mapToChangeRequest(AbstractLoadedChange descriptor, diff --git a/cloud/flamingock-cloud/src/test/java/io/flamingock/cloud/planner/CloudExecutionPlanMapperTest.java b/cloud/flamingock-cloud/src/test/java/io/flamingock/cloud/planner/CloudExecutionPlanMapperTest.java index 5595a4cdf..a351aee2c 100644 --- a/cloud/flamingock-cloud/src/test/java/io/flamingock/cloud/planner/CloudExecutionPlanMapperTest.java +++ b/cloud/flamingock-cloud/src/test/java/io/flamingock/cloud/planner/CloudExecutionPlanMapperTest.java @@ -210,7 +210,7 @@ void shouldMapOngoingStatusFromAuditMarksToChangeRequests() { ExecutionPlanRequest request = CloudExecutionPlanMapper.toRequest( PipelineRun.of(loadedStages), 60000L, ongoingStatusesMap); - Map marksByChangeId = request.getClientSubmission().getStages().get(0).getChanges().stream() + Map marksByChangeId = request.getClientSubmission().getBlocks().get(0).getStages().get(0).getChanges().stream() .collect(Collectors.toMap(ChangeRequest::getId, ChangeRequest::getOngoingStatus)); assertEquals(CloudTargetSystemAuditMarkType.APPLIED, marksByChangeId.get(change1.getId())); @@ -243,10 +243,12 @@ void shouldMapPerStageStatusFromPipelineRun() { pipelineRun, 60000L, Collections.emptyMap()); // Plain loop because Collectors.toMap rejects null values; NOT_STARTED maps to null. + // All four stages are typed DEFAULT (via the buildStage helper), so they end up in a + // single block; flatten the block list to iterate them. Map statusByName = new HashMap<>(); - for (StageRequest stageRequest : request.getClientSubmission().getStages()) { - statusByName.put(stageRequest.getName(), stageRequest.getStatus()); - } + request.getClientSubmission().getBlocks().forEach(block -> + block.getStages().forEach(stage -> + statusByName.put(stage.getName(), stage.getStatus()))); // NOT_STARTED is encoded as null on the wire (back-compat: missing field == not started). assertNull(statusByName.get("stage-not-started")); @@ -255,6 +257,75 @@ void shouldMapPerStageStatusFromPipelineRun() { assertEquals(CloudStageStatus.BLOCKED_MANUAL_INTERVENTION, statusByName.get("stage-blocked")); } + @Test + @DisplayName("toRequest() emits one block per StageRunBlock in dependency order with the right type and stage contents") + void toRequestEmitsOneBlockPerStageRunBlock() { + // Three stages of three different types — PipelineRun.of(...) groups them into + // SYSTEM -> LEGACY -> DEFAULT blocks (one stage each). + AbstractLoadedStage systemStage = new DefaultLoadedStage("system-stage", StageType.SYSTEM, + Collections.singletonList(change1)); + AbstractLoadedStage legacyStage = new DefaultLoadedStage("legacy-stage", StageType.LEGACY, + Collections.singletonList(change1)); + AbstractLoadedStage userStage = new DefaultLoadedStage("user-stage", StageType.DEFAULT, + Collections.singletonList(change1)); + + PipelineRun pipelineRun = PipelineRun.of(Arrays.asList(userStage, systemStage, legacyStage)); + + ExecutionPlanRequest request = CloudExecutionPlanMapper.toRequest( + pipelineRun, 60000L, Collections.emptyMap()); + + // Block order is dependency order, not input order: SYSTEM -> LEGACY -> DEFAULT. + List blocks = + request.getClientSubmission().getBlocks(); + assertEquals(3, blocks.size()); + assertEquals(StageType.SYSTEM, blocks.get(0).getType()); + assertEquals(1, blocks.get(0).getStages().size()); + assertEquals("system-stage", blocks.get(0).getStages().get(0).getName()); + assertEquals(StageType.LEGACY, blocks.get(1).getType()); + assertEquals("legacy-stage", blocks.get(1).getStages().get(0).getName()); + assertEquals(StageType.DEFAULT, blocks.get(2).getType()); + assertEquals("user-stage", blocks.get(2).getStages().get(0).getName()); + } + + @Test + @DisplayName("toRequest() preserves the global stage order index across blocks") + void toRequestPreservesGlobalStageOrderAcrossBlocks() { + // Two stages in the SYSTEM block, then one in DEFAULT — verify the per-stage `order` + // field increments globally across the flattened block sequence. + AbstractLoadedStage system1 = new DefaultLoadedStage("system-1", StageType.SYSTEM, + Collections.singletonList(change1)); + AbstractLoadedStage system2 = new DefaultLoadedStage("system-2", StageType.SYSTEM, + Collections.singletonList(change2)); + AbstractLoadedStage user1 = new DefaultLoadedStage("user-1", StageType.DEFAULT, + Collections.singletonList(change1)); + + PipelineRun pipelineRun = PipelineRun.of(Arrays.asList(system1, system2, user1)); + + ExecutionPlanRequest request = CloudExecutionPlanMapper.toRequest( + pipelineRun, 60000L, Collections.emptyMap()); + + List blocks = + request.getClientSubmission().getBlocks(); + assertEquals(2, blocks.size()); + assertEquals(StageType.SYSTEM, blocks.get(0).getType()); + assertEquals(0, blocks.get(0).getStages().get(0).getOrder()); + assertEquals(1, blocks.get(0).getStages().get(1).getOrder()); + assertEquals(StageType.DEFAULT, blocks.get(1).getType()); + assertEquals(2, blocks.get(1).getStages().get(0).getOrder()); + } + + @Test + @DisplayName("toRequest() produces an empty blocks list when the PipelineRun has no stages") + void toRequestEmptyPipelineRunYieldsEmptyBlocks() { + PipelineRun pipelineRun = PipelineRun.of(Collections.emptyList()); + + ExecutionPlanRequest request = CloudExecutionPlanMapper.toRequest( + pipelineRun, 60000L, Collections.emptyMap()); + + assertNotNull(request.getClientSubmission().getBlocks()); + assertTrue(request.getClientSubmission().getBlocks().isEmpty()); + } + private static DefaultLoadedStage buildStage(String name, AbstractLoadedChange... changes) { return new DefaultLoadedStage(name, StageType.DEFAULT, Arrays.asList(changes)); } diff --git a/cloud/flamingock-cloud/src/test/java/io/flamingock/cloud/planner/CloudExecutionPlannerTest.java b/cloud/flamingock-cloud/src/test/java/io/flamingock/cloud/planner/CloudExecutionPlannerTest.java index bbe3cbb6b..94468dd15 100644 --- a/cloud/flamingock-cloud/src/test/java/io/flamingock/cloud/planner/CloudExecutionPlannerTest.java +++ b/cloud/flamingock-cloud/src/test/java/io/flamingock/cloud/planner/CloudExecutionPlannerTest.java @@ -95,6 +95,51 @@ private CloudExecutionPlanner buildPlanner(List auditMa ); } + @Test + @DisplayName("Should send the multi-block PipelineRun on the wire, preserving block order and per-block stages") + void shouldSendMultiBlockPipelineRunOnTheWire() { + CloudExecutionPlanner planner = buildPlanner(Collections.emptyList()); + + // Stub the server to return CONTINUE so the planner doesn't loop or try to acquire a lock. + ExecutionPlanResponse continueResponse = new ExecutionPlanResponse(); + continueResponse.setAction(CloudExecutionAction.CONTINUE); + when(client.createExecution(any(), any(), anyLong())).thenReturn(continueResponse); + + // Build a PipelineRun with three blocks: SYSTEM (1 stage), LEGACY (1 stage), DEFAULT (2 stages). + AbstractLoadedStage systemStage = new DefaultLoadedStage("system-stage", StageType.SYSTEM, + Collections.singletonList(change1)); + AbstractLoadedStage legacyStage = new DefaultLoadedStage("legacy-stage", StageType.LEGACY, + Collections.singletonList(change1)); + AbstractLoadedStage userStageA = new DefaultLoadedStage("user-a", StageType.DEFAULT, + Collections.singletonList(change1)); + AbstractLoadedStage userStageB = new DefaultLoadedStage("user-b", StageType.DEFAULT, + Collections.singletonList(change2)); + + planner.getNextExecution(PipelineRun.of(Arrays.asList(systemStage, legacyStage, userStageA, userStageB))); + + ArgumentCaptor requestCaptor = ArgumentCaptor.forClass(ExecutionPlanRequest.class); + verify(client).createExecution(requestCaptor.capture(), any(), anyLong()); + + List blocks = + requestCaptor.getValue().getClientSubmission().getBlocks(); + + // Three blocks, in dependency order. + assertEquals(3, blocks.size()); + assertEquals(StageType.SYSTEM, blocks.get(0).getType()); + assertEquals(1, blocks.get(0).getStages().size()); + assertEquals("system-stage", blocks.get(0).getStages().get(0).getName()); + + assertEquals(StageType.LEGACY, blocks.get(1).getType()); + assertEquals(1, blocks.get(1).getStages().size()); + assertEquals("legacy-stage", blocks.get(1).getStages().get(0).getName()); + + // DEFAULT block contains both user stages, in input order. + assertEquals(StageType.DEFAULT, blocks.get(2).getType()); + assertEquals(2, blocks.get(2).getStages().size()); + assertEquals("user-a", blocks.get(2).getStages().get(0).getName()); + assertEquals("user-b", blocks.get(2).getStages().get(1).getName()); + } + @Test @DisplayName("Should return ABORT plan when server returns ABORT (regardless of change actions)") void shouldReturnAbortPlanWhenServerReturnsAbort() { @@ -170,7 +215,7 @@ void shouldIncludeAuditMarksInExecutionRequest() { verify(client).createExecution(requestCaptor.capture(), any(), anyLong()); ExecutionPlanRequest request = requestCaptor.getValue(); - Map marksByChangeId = request.getClientSubmission().getStages().get(0).getChanges().stream() + Map marksByChangeId = request.getClientSubmission().getBlocks().get(0).getStages().get(0).getChanges().stream() .collect(Collectors.toMap(ChangeRequest::getId, ChangeRequest::getOngoingStatus)); assertEquals(CloudTargetSystemAuditMarkType.APPLIED, marksByChangeId.get(change1.getId())); @@ -197,7 +242,7 @@ void shouldSendNoneStatusWhenNoMarks() { ArgumentCaptor requestCaptor = ArgumentCaptor.forClass(ExecutionPlanRequest.class); verify(client).createExecution(requestCaptor.capture(), any(), anyLong()); - ChangeRequest changeRequest = requestCaptor.getValue().getClientSubmission().getStages().get(0).getChanges().get(0); + ChangeRequest changeRequest = requestCaptor.getValue().getClientSubmission().getBlocks().get(0).getStages().get(0).getChanges().get(0); assertEquals(CloudTargetSystemAuditMarkType.NONE, changeRequest.getOngoingStatus()); } diff --git a/utils/test-util/src/main/java/io/flamingock/common/test/cloud/deprecated/MockRunnerServerOld.java b/utils/test-util/src/main/java/io/flamingock/common/test/cloud/deprecated/MockRunnerServerOld.java index 8afaba6b1..3bdf97736 100644 --- a/utils/test-util/src/main/java/io/flamingock/common/test/cloud/deprecated/MockRunnerServerOld.java +++ b/utils/test-util/src/main/java/io/flamingock/common/test/cloud/deprecated/MockRunnerServerOld.java @@ -403,7 +403,12 @@ private void mockReleaseLockEndpoint() { private ExecutionPlanRequest getExecutionPlanRequest(int index) { ExecutionPlanRequestResponse executionPlanRequestResponse = executionRequestResponses.get(index); List stages = executionExpectation != null ? executionExpectation.getStageRequest() : Collections.emptyList(); - return new ExecutionPlanRequest(executionPlanRequestResponse.getAcquiredForMillis(), stages); + // Deprecated mock — wrap the flat stages in a single DEFAULT block to satisfy the new + // wire shape. Existing deprecated-flow tests preserve behaviour; multi-block tests use + // MockExecutionPlanBuilder / MockRunnerServer directly. + List blocks = Collections.singletonList( + new io.flamingock.cloud.api.request.StageBlockRequest(io.flamingock.api.StageType.DEFAULT, stages)); + return new ExecutionPlanRequest(executionPlanRequestResponse.getAcquiredForMillis(), blocks); } private ExecutionPlanResponse getExecutionPlanResponse(int index) { diff --git a/utils/test-util/src/main/java/io/flamingock/common/test/cloud/mock/MockExecutionPlanBuilder.java b/utils/test-util/src/main/java/io/flamingock/common/test/cloud/mock/MockExecutionPlanBuilder.java index dd188fbad..eb724dca3 100644 --- a/utils/test-util/src/main/java/io/flamingock/common/test/cloud/mock/MockExecutionPlanBuilder.java +++ b/utils/test-util/src/main/java/io/flamingock/common/test/cloud/mock/MockExecutionPlanBuilder.java @@ -20,7 +20,9 @@ import io.flamingock.common.test.cloud.execution.ExecutionAwaitRequestResponseMock; import io.flamingock.common.test.cloud.execution.ExecutionBaseRequestResponseMock; import io.flamingock.common.test.cloud.execution.ExecutionPlanRequestResponseMock; +import io.flamingock.api.StageType; import io.flamingock.cloud.api.request.ExecutionPlanRequest; +import io.flamingock.cloud.api.request.StageBlockRequest; import io.flamingock.cloud.api.request.StageRequest; import io.flamingock.cloud.api.request.ChangeRequest; import io.flamingock.cloud.api.response.ExecutionPlanResponse; @@ -61,7 +63,12 @@ public ExecutionPlanRequest getRequest(ExecutionBaseRequestResponseMock requestR transformChangeRequests(stagePrototype.getChanges(), requestResponse)) ).collect(Collectors.toList()); - return new ExecutionPlanRequest(requestResponse.getAcquiredForMillis(), stages); + // Wrap the prototype's flat stages into a single DEFAULT block. Existing tests that + // don't model block structure preserve their behaviour; tests that need multi-block + // scenarios should construct StageBlockRequest lists directly. + List blocks = java.util.Collections.singletonList( + new StageBlockRequest(StageType.DEFAULT, stages)); + return new ExecutionPlanRequest(requestResponse.getAcquiredForMillis(), blocks); } public ExecutionPlanResponse getResponse(ExecutionBaseRequestResponseMock mockRequestResponse) { From e6cbc0819695f0904d06ecd1fb840652c8f7b279 Mon Sep 17 00:00:00 2001 From: Antonio Perez Dieppa Date: Thu, 21 May 2026 13:24:22 +0100 Subject: [PATCH 2/2] refactor: add stageBlock to cloud API --- build.gradle.kts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/build.gradle.kts b/build.gradle.kts index d1ab05f32..ddabb0e83 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -19,7 +19,7 @@ plugins { allprojects { group = "io.flamingock" - val declaredVersion = "1.3.0-SNAPSHOT" + val declaredVersion = "1.4.0-SNAPSHOT" version = VersionManager.resolveVersion(declaredVersion, project.hasProperty("release")) extra["generalUtilVersion"] = "1.5.3"