Skip to content

feat(map): Fix map replay, wait-inside-map, and concurrency race conditions#229

Open
ayushiahjolia wants to merge 4 commits intomainfrom
map_bug_fixes
Open

feat(map): Fix map replay, wait-inside-map, and concurrency race conditions#229
ayushiahjolia wants to merge 4 commits intomainfrom
map_bug_fixes

Conversation

@ayushiahjolia
Copy link
Contributor

@ayushiahjolia ayushiahjolia commented Mar 18, 2026

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.

Issue Link, if available

#39

Description

  • Fix BaseConcurrentOperation to properly handle suspension when branches call wait(), add synchronized concurrency control, and move parent checkpoint from get() into the last-completing-branch callback.
  • Convert MapResult/MapResultItem to records with ErrorObject for checkpoint-replay durability. Remove throws Exception from MapFunction.
  • Added more integration tests.

Demo/Screenshots

Screenshot 2026-03-18 at 2 15 30 PM

Checklist

  • I have filled out every section of the PR template
  • I have thoroughly tested this change

Testing

Unit Tests

Have unit tests been written for these changes? N/A

Integration Tests

Have integration tests been written for these changes? Yes

Examples

Has a new example been added for the change? (if applicable) Yes

@ayushiahjolia ayushiahjolia marked this pull request as ready for review March 18, 2026 22:08
@ayushiahjolia ayushiahjolia requested a review from a team March 18, 2026 22:08
@zhongkechen
Copy link
Contributor

zhongkechen commented Mar 18, 2026

unit tests failed:

[ERROR] Errors: 
[ERROR]   ParallelOperationTest.branchCreation_multipleBranchesAllCreated:106 » IllegalState Cannot add items to a completed operation

This is due to the race condition introduced in the following change:

-        // Block until operation completes. No-op if the future is already completed.
-        completionFuture.join();
+        // Block until operation completes or execution suspends.
+        // Using runUntilCompleteOrSuspend races completionFuture against executionExceptionFuture,
+        // so when all active threads suspend (e.g., wait inside map branches), the
+        // SuspendExecutionException propagates and this thread is freed — preventing thread leaks
+        // on shared executor pools across invocations.
+        executionManager.runUntilCompleteOrSuspend(completionFuture).join();

Removing this change should fix the tests

@ayushiahjolia
Copy link
Contributor Author

ayushiahjolia commented Mar 19, 2026

When all map branches call wait(), every branch thread deregisters and suspendExecution() fires. But the parent thread sitting on completionFuture.join() in BaseConcurrentOperation.get() is stuck forever - no branch will ever complete to trigger onChildContextComplete and finalize the map's completionFuture. runUntilCompleteOrSuspend races completionFuture against executionExceptionFuture, so when suspension fires, the parent thread gets freed via SuspendExecutionException instead of blocking indefinitely.

// so when all active threads suspend (e.g., wait inside map branches), the
// SuspendExecutionException propagates and this thread is freed — preventing thread leaks
// on shared executor pools across invocations.
executionManager.runUntilCompleteOrSuspend(completionFuture).join();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not the correct behavior. This will unblock all the threads waiting for completion of this operation when suspension.

public R get() {
var op = waitForOperationCompletion();
var executionManager = getContext().getExecutionManager();
var threadContext = getCurrentThreadContext();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why don't we just call waitForOpeartionCompletion here?

public JacksonSerDes() {
this(new ObjectMapper()
.registerModule(new JavaTimeModule())
.registerModule(new AwsSdkV2Module())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do not add this module to user's serDes. Why do we need this?

sendOperationUpdateAsync(
OperationUpdate.builder().action(OperationAction.START).subType(subType.getValue()));
var parentThreadContext = getCurrentThreadContext();
this.rootContext = getContext().createChildContext(getOperationId(), getName());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we initialize here?


protected CompletionReason getCompletionReason() {
return completionReason;
synchronized (lock) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need a lock to access a field? Same below

} else if (op.status() == OperationStatus.STARTED) {
// All branches suspended (e.g., wait inside map branches) — propagate suspension.
// onChildContextSuspended completed completionFuture when activeBranches hit 0.
throw new SuspendExecutionException();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When completionFuture is completed, the status can't be STARTED

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants