-
Notifications
You must be signed in to change notification settings - Fork 594
HDDS-14571. Remove synchronized methods from XceiverClientGrpc #9718
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
HDDS-14571. Remove synchronized methods from XceiverClientGrpc #9718
Conversation
… channel termination.
…channel termination handling.
…cleaner channel termination handling." This reverts commit 7b6b0ee.
…onized modifier from close method.
…ronized methods in XceiverClientGrpc.
hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java
Show resolved
Hide resolved
| } | ||
| LOG.debug("Connecting to server : {}; nodes in pipeline : {}, ", dn, pipeline.getNodes()); | ||
|
|
||
| ManagedChannel channel = createChannel(dn, port).build(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'd rather do something like
ManagedChannel channel = channels.computeIfAbsent(dn.getId(), createChannel(dn, port).build());
| XceiverClientProtocolServiceGrpc.newStub(channel); | ||
| asyncStubs.put(dn.getID(), asyncStub); | ||
| channels.put(dn.getID(), channel); | ||
| XceiverClientProtocolServiceStub asyncStub = XceiverClientProtocolServiceGrpc.newStub(channel); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
And
asyncStubs.computeIfAbsent(dn.getID(), XceiverClientProtocolServiceGrpc.newStub(channel));
| asyncStubs.put(dn.getID(), asyncStub); | ||
| channels.put(dn.getID(), channel); | ||
| XceiverClientProtocolServiceStub asyncStub = XceiverClientProtocolServiceGrpc.newStub(channel); | ||
| synchronized (this) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this block would not be needed if we use atomic map operation in comments above,
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this should be atomic if we want to use it to check the state without synchronized block
|
@yandrey321 please try to avoid "Add single comment", rather "Start review" and submit all comments in a single batch via "Review changes" |
hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java
Show resolved
Hide resolved
hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java
Show resolved
Hide resolved
…o improve channel management and reduce synchronization.
… state management in XceiverClientGrpc.
|
Thanks for the review @yandrey321. I have updated the PR. Could you please take a look again? |
| asyncStubs.put(dn.getID(), asyncStub); | ||
| channels.put(dn.getID(), channel); | ||
|
|
||
| asyncStubs.computeIfAbsent(dn.getID(), dnId -> XceiverClientProtocolServiceGrpc.newStub(channel)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we need to remove stub if associated channel was terminated otherwise here the old stub would be reused.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The async stub is being cleared above. Won't that be sufficient?
channels.computeIfPresent(dn.getID(), (dnId, channel) -> {
if (channel.isTerminated() || channel.isShutdown()) {
asyncStubs.remove(dnId); // clearing the stubs here
return null; // removes from channels map
}
return channel;
});There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
missed that part
| .collect(Collectors.toSet()); | ||
| LOG.warn("Channels {} did not terminate within timeout.", failedChannels); | ||
|
|
||
| channels.keySet().removeIf(e -> !failedChannels.contains(e)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we need to wipe channels and stubs from the map even if they timed out on shutdown. We dont have retry mechanism to close them afterwards and we cannot reuse channels after calling shutdown on them.
…nel termination and improve log clarity.
yandrey321
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
lgtm
What changes were proposed in this pull request?
This PR originates from https://issues.apache.org/jira/browse/HDDS-14454 refactors the code to use ConcurrentHashMaps and remove the synchronized methods that are used in XceiverClientGrpc
What is the link to the Apache JIRA
https://issues.apache.org/jira/browse/HDDS-14571
How was this patch tested?
CI: https://github.com/ptlrs/ozone/actions/runs/21737324867