feat(map): Fix map replay, wait-inside-map, and concurrency race conditions#229
ayushiahjolia wants to merge 4 commits into main
Unit tests failed. This is due to the race condition introduced in the following change. Removing this change should fix the tests.
When all map branches call wait(), every branch thread deregisters and suspendExecution() fires. However, the parent thread blocked on completionFuture.join() in BaseConcurrentOperation.get() is stuck forever: no branch will ever complete to trigger onChildContextComplete and finalize the map's completionFuture. runUntilCompleteOrSuspend races completionFuture against executionExceptionFuture, so when suspension fires, the parent thread is freed via SuspendExecutionException instead of blocking indefinitely.
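The racing idea described above can be sketched with plain `CompletableFuture`s. This is a minimal illustration, not the SDK's implementation: `SuspendSignal`, `raceAgainstSuspension`, and `joinOrReport` are hypothetical names standing in for `SuspendExecutionException` and `runUntilCompleteOrSuspend`, and the sketch assumes suspension is signalled by completing a shared future.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

public class SuspendRaceSketch {
    /** Hypothetical stand-in for the SDK's SuspendExecutionException. */
    static class SuspendSignal extends RuntimeException {}

    /**
     * Returns a new future that settles as soon as either input settles.
     * Neither input future is completed or otherwise modified by the race.
     */
    static <T> CompletableFuture<T> raceAgainstSuspension(
            CompletableFuture<T> completion, CompletableFuture<?> suspension) {
        CompletableFuture<T> raced = new CompletableFuture<>();
        completion.whenComplete((value, error) -> {
            if (error != null) {
                raced.completeExceptionally(error);
            } else {
                raced.complete(value);
            }
        });
        // Any settlement of the suspension future is treated as a suspension.
        suspension.whenComplete((ignored, error) ->
                raced.completeExceptionally(error != null ? error : new SuspendSignal()));
        return raced;
    }

    /** Joins the raced future; returns "suspended" if suspension won the race. */
    static String joinOrReport(
            CompletableFuture<String> completion, CompletableFuture<?> suspension) {
        try {
            return raceAgainstSuspension(completion, suspension).join();
        } catch (CompletionException e) {
            if (e.getCause() instanceof SuspendSignal) {
                return "suspended";
            }
            throw e;
        }
    }

    public static void main(String[] args) {
        CompletableFuture<String> completion = new CompletableFuture<>(); // never completes
        CompletableFuture<Void> suspension = new CompletableFuture<>();
        suspension.completeExceptionally(new SuspendSignal());
        System.out.println(joinOrReport(completion, suspension)); // prints "suspended"
    }
}
```

Because the race produces a derived future, a caller joining it is released on suspension while `completion` itself stays pending.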
// so when all active threads suspend (e.g., wait inside map branches), the
// SuspendExecutionException propagates and this thread is freed - preventing thread leaks
// on shared executor pools across invocations.
executionManager.runUntilCompleteOrSuspend(completionFuture).join();
This is not the correct behavior. On suspension, it will unblock every thread waiting for this operation to complete.
public R get() {
    var op = waitForOperationCompletion();
    var executionManager = getContext().getExecutionManager();
    var threadContext = getCurrentThreadContext();
Why don't we just call waitForOperationCompletion here?
public JacksonSerDes() {
    this(new ObjectMapper()
            .registerModule(new JavaTimeModule())
            .registerModule(new AwsSdkV2Module())
Do not add this module to the user's SerDes. Why do we need it?
sendOperationUpdateAsync(
        OperationUpdate.builder().action(OperationAction.START).subType(subType.getValue()));
var parentThreadContext = getCurrentThreadContext();
this.rootContext = getContext().createChildContext(getOperationId(), getName());
Why do we initialize here?
protected CompletionReason getCompletionReason() {
    return completionReason;
    synchronized (lock) {
Why do we need a lock to access a field? Same below.
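One possible alternative to locking the getter, sketched under assumptions: if the field is only written while holding the lock and readers need just the latest value of that single field, declaring it `volatile` gives the required visibility without taking the lock on reads. The class `CompletionReasonSketch`, the `Reason` enum values, and `recordResult` are hypothetical and not taken from the PR. Whether this applies to the real code depends on whether readers must see the field consistently with other lock-guarded state.

```java
public class CompletionReasonSketch {
    /** Hypothetical completion reasons; the PR's actual enum values are not shown. */
    enum Reason { ALL_SUCCEEDED, HAD_FAILURES }

    private final Object lock = new Object();

    // Guarded by lock: counters that must be updated together.
    private int succeeded;
    private int failed;

    // Written only under lock; volatile makes each write visible to unlocked readers.
    private volatile Reason completionReason;

    /** Lock-free read: returns null until all branches have reported. */
    Reason getCompletionReason() {
        return completionReason;
    }

    /** Records one branch result; sets the reason once all branches have reported. */
    void recordResult(boolean success, int totalBranches) {
        synchronized (lock) {
            if (success) {
                succeeded++;
            } else {
                failed++;
            }
            if (completionReason == null && succeeded + failed == totalBranches) {
                completionReason = (failed == 0) ? Reason.ALL_SUCCEEDED : Reason.HAD_FAILURES;
            }
        }
    }
}
```

The compound update (counters plus reason) still needs the lock; only the single-field read is lock-free.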
} else if (op.status() == OperationStatus.STARTED) {
    // All branches suspended (e.g., wait inside map branches) - propagate suspension.
    // onChildContextSuspended completed completionFuture when activeBranches hit 0.
    throw new SuspendExecutionException();
When completionFuture is completed, the status can't be STARTED.
By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
Issue Link, if available
#39
Description
Demo/Screenshots
Checklist
Testing
Unit Tests
Have unit tests been written for these changes? N/A
Integration Tests
Have integration tests been written for these changes? Yes
Examples
Has a new example been added for the change? (if applicable) Yes