-
Notifications
You must be signed in to change notification settings - Fork 1.1k
Add gRPC outbound gateway for client calls #10684
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
artembilan
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Missed reference to the respective issue: #10622.
From branch name to the Fixes: sentence in the commit message.
4e4d0ae to
6d8595c
Compare
artembilan
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is good start.
Feel free to reach me offline for any clarifications.
Thank you!
...on-grpc/src/main/java/org/springframework/integration/grpc/outbound/GrpcOutboundGateway.java
Outdated
Show resolved
Hide resolved
...on-grpc/src/main/java/org/springframework/integration/grpc/outbound/GrpcOutboundGateway.java
Outdated
Show resolved
Hide resolved
...on-grpc/src/main/java/org/springframework/integration/grpc/outbound/GrpcOutboundGateway.java
Outdated
Show resolved
Hide resolved
a4de223 to
0611e36
Compare
artembilan
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks promising!
Thanks
...on-grpc/src/main/java/org/springframework/integration/grpc/outbound/GrpcOutboundGateway.java
Show resolved
Hide resolved
...on-grpc/src/main/java/org/springframework/integration/grpc/outbound/GrpcOutboundGateway.java
Show resolved
Hide resolved
...on-grpc/src/main/java/org/springframework/integration/grpc/outbound/GrpcOutboundGateway.java
Outdated
Show resolved
Hide resolved
...on-grpc/src/main/java/org/springframework/integration/grpc/outbound/GrpcOutboundGateway.java
Outdated
Show resolved
Hide resolved
| // Sends a greeting | ||
| rpc SayHello(HelloRequest) returns (HelloReply) {} | ||
|
|
||
| } No newline at end of file |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think new line is still has to be in the end even in this file.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Was not addressed.
...on-grpc/src/main/java/org/springframework/integration/grpc/outbound/GrpcOutboundGateway.java
Outdated
Show resolved
Hide resolved
artembilan
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we are very close.
Probably after the next commit, I'll pull it for a local final review.
Thanks
...on-grpc/src/main/java/org/springframework/integration/grpc/outbound/GrpcOutboundGateway.java
Show resolved
Hide resolved
| */ | ||
| public class GrpcOutboundGateway extends AbstractReplyProducingMessageHandler { | ||
|
|
||
| private final FunctionExpression<Message<?>> defaultFunctionExpression = new FunctionExpression<>( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why cannot we rely on the boolean methodNameExpressionSet instead?
And have this new FunctionExpression as default value for the methodNameExpression.
| } | ||
|
|
||
| @Override | ||
| protected void doInit() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is confusing method order.
I'd prefer to have setters just after constructor and then init methods.
| * Set an {@link Expression} to resolve the gRPC method name at runtime. | ||
| * <p> | ||
| * If not provided, the gateway will auto-detect the method when the service | ||
| * descriptor contains exactly one method. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please, also mention here some way that the default expression is against GrpcHeaders.SERVICE_METHOD.
| } | ||
|
|
||
| /** | ||
| * Set the method name of the gRPC method. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Some tautology is in this sentence:
Set the name of the gRPC method to call.
?
| === Attribute Expressions | ||
|
|
||
| Set the CloudEvents' attributes of `id`, `source`, `type`, `dataSchema`, and `subject` through SpEL expressions. | ||
| Set the CloudEvents attributes of `id`, `source`, `type`, `dataSchema`, and `subject` through SpEL expressions. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not sure what CloudEvents changes are doing in this PR, but that's OK.
How about to use a singular: Set the CloudEvent attributes ?
With plural it does not sound.
...org/springframework/integration/grpc/outbound/GrpcClientOutboundGatewayMultiMethodTests.java
Show resolved
Hide resolved
| } | ||
|
|
||
| @Bean | ||
| ManagedChannel channel() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
DITTO
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's discuss. I'm unclear on the issue.
| ** `Collection<RequestType>` | ||
| ** Single `RequestType` object | ||
| + | ||
| Client streaming methods return a `Mono<ResponseType>` with the single aggregated response, while bidirectional streaming methods return a `Flux<ResponseType>` with multiple responses. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think its correct to talk about an aggregation from the gateway perspective.
That is just gRPC prtocol nuance what exactly is returned for the client streaming call.
Not a fact that a single result is an aggregation of request.
Just summary?
I mean this sentence should be a bit generic and don't mislead end-users for something what our gateway doesn't do.
| GrpcOutboundGateway gateway = new GrpcOutboundGateway(channel, TestHelloWorldGrpc.class); | ||
| // Method name determined from GrpcHeaders.SERVICE_METHOD header at runtime | ||
| gateway.setMethodNameExpression(new FunctionExpression<>( | ||
| msg -> msg.getHeaders().get(GrpcHeaders.SERVICE_METHOD, String.class))); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is actually a default expression.
Please, consider to mention that in the first item where you talk about auto-detection.
And here use some other expression, e.g. payload.class.simpleName
Implement `GrpcOutboundGateway` to bridge Spring Integration messaging with gRPC client invocations. This component supports four gRPC communication patterns: - Unary: single request/single response - Server streaming: single request/multiple responses - Client streaming: multiple requests/single response - Bidirectional streaming: multiple requests/multiple responses The implementation includes two main components: `GrpcOutboundGateway`: Extends `AbstractReplyProducingMessageHandler` to provide request-reply semantics for gRPC clients. Features configurable async timeout (defaults to 10 seconds) and automatic method type detection based on the gRPC `MethodDescriptor`. The gateway automatically determines the invocation strategy: blocking calls for unary operations and reactive Flux-based responses for streaming operations. For streaming patterns, the component uses Project Reactor's `Sinks.Many` to bridge gRPC's `StreamObserver` callback model with Spring Integration's reactive message handling. Build configuration updated to include necessary gRPC API dependencies for both compilation and testing. Reference documentation extended with examples demonstrating all four communication patterns and configuration options. What's New section updated to highlight this new feature. Fixes spring-projects#10622
Simplify the `GrpcOutboundGateway` constructor to require only channel and service class, making method name configuration optional. The gateway now auto-detects the method name when the service descriptor contains exactly one method, improving developer experience for single-method services. Key changes: Constructor simplified from three parameters (channel, service class, method expression) to two (channel, service class). Method name can now be configured via `setMethodNameExpression()` setter or omitted entirely for auto-detection. CLIENT_STREAMING pattern now correctly returns `Mono` instead of `Flux`, reflecting gRPC semantics where multiple requests produce a single aggregated response. New `invokeClientStreaming()` method uses `Sinks.One` for proper single-value handling. Test suite expanded with `GrpcClientOutboundGatewaySingleMethodTests` demonstrating auto-detection capability. Existing tests reorganized into `GrpcClientOutboundGatewayMultiMethodTests` and enhanced with CLIENT_STREAMING verification. New test proto file added for single-method service testing. Documentation updated to reflect simplified API, auto-detection feature, and correct return types for each gRPC pattern. Method name configuration section added with examples for explicit, dynamic, and auto-detected scenarios. Fix grammar in the grpc adoc rebased
Implement runtime method name resolution in `GrpcOutboundGateway` to support flexible gRPC invocation patterns. The gateway now evaluates method names dynamically per request instead of at initialization. Key changes: * Replace static method resolution with per-request evaluation * Add `setMethodName` setter for literal method name configuration * Support `GrpcHeaders.SERVICE_METHOD` header for method specification * Extend client streaming to accept Mono, POJO, Stream, and Collection * Separate streaming methods (server vs bidirectional) into distinct handlers * Convert logger calls to lambda expressions for deferred evaluation * Add javadoc for request/response patterns by method type
* Simplify method name resolution by tracking whether the expression was explicitly set. This allows proper auto-detection when only one method exists in the service descriptor. * Reorder private methods to group related functionality together. Move `invokeBidirectionalStreaming()` before `invokeServerStreaming()` and relocate `getMethodDescriptor()` closer to the main handler method. * Add `@SuppressWarnings` for unavoidable unchecked casts and raw types in dynamic method invocation paths. * Update test configurations to use shared `TestInProcessConfiguration` and `@EnableIntegration` annotation. * Remove CloudEvents documentation update. * Update gRPC documentation to clarify method resolution behavior and change the Expression example to a SpelExpression vs. the default expression. * Add array payload support for gRPC streaming * Add test coverage for client streaming methods that accept array payloads. * The new test verifies that an array of `HelloRequest` objects can be passed to a CLIENT_STREAMING method and properly converted to a Flux for transmission.
c315573 to
9a74cd8
Compare
artembilan
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Great!
thank you !
...on-grpc/src/main/java/org/springframework/integration/grpc/outbound/GrpcOutboundGateway.java
Outdated
Show resolved
Hide resolved
...on-grpc/src/main/java/org/springframework/integration/grpc/outbound/GrpcOutboundGateway.java
Outdated
Show resolved
Hide resolved
...on-grpc/src/main/java/org/springframework/integration/grpc/outbound/GrpcOutboundGateway.java
Outdated
Show resolved
Hide resolved
| * <ol> | ||
| * <li>Evaluates the {@code methodNameExpression} to determine which gRPC method to invoke. | ||
| * The method name can be resolved from the message headers (via {@link org.springframework.integration.grpc.GrpcHeaders#SERVICE_METHOD}), | ||
| * a literal expression, or a custom expression.</li> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Was not addressed
| * <li><b>Client Streaming:</b> Returns a {@link Mono} with the single response</li> | ||
| * <li><b>Bidirectional Streaming:</b> Returns a {@link Flux} of response messages</li> | ||
| * </ul> | ||
| * @param requestMessage the message containing the gRPC request payload. The payload type depends on the |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ditto
...on-grpc/src/main/java/org/springframework/integration/grpc/outbound/GrpcOutboundGateway.java
Outdated
Show resolved
Hide resolved
...rg/springframework/integration/grpc/outbound/GrpcClientOutboundGatewaySingleMethodTests.java
Outdated
Show resolved
Hide resolved
...org/springframework/integration/grpc/outbound/GrpcClientOutboundGatewayMultiMethodTests.java
Outdated
Show resolved
Hide resolved
...org/springframework/integration/grpc/outbound/GrpcClientOutboundGatewayMultiMethodTests.java
Outdated
Show resolved
Hide resolved
...rg/springframework/integration/grpc/outbound/GrpcClientOutboundGatewaySingleMethodTests.java
Outdated
Show resolved
Hide resolved
* Change UNARY gRPC invocations from blocking `blockingUnaryCall()` to async `asyncUnaryCall()` for consistency. All method types now return reactive types: `Mono` for UNARY and CLIENT_STREAMING, `Flux` for SERVER_STREAMING and BIDI_STREAMING. * Extract `createSingleSinkResponseObserver()` helper method to reduce code duplication between UNARY and CLIENT_STREAMING implementations. * Simplify `setMethodName()` to delegate to `setMethodNameExpression()` instead of duplicating flag-setting logic. * Replace `Iterator` usage with `List.get(0)` * Replace switch expression in `invokeStreamingCall()` with if-else r * Update tests to handle `Mono` return type from UNARY methods. Remove unused imports and simplify test helper methods. * Update reference documentation to reflect that all method types return reactive types instead of UNARY returning values directly.
...on-grpc/src/main/java/org/springframework/integration/grpc/outbound/GrpcOutboundGateway.java
Outdated
Show resolved
Hide resolved
...on-grpc/src/main/java/org/springframework/integration/grpc/outbound/GrpcOutboundGateway.java
Outdated
Show resolved
Hide resolved
...on-grpc/src/main/java/org/springframework/integration/grpc/outbound/GrpcOutboundGateway.java
Show resolved
Hide resolved
...on-grpc/src/main/java/org/springframework/integration/grpc/outbound/GrpcOutboundGateway.java
Outdated
Show resolved
Hide resolved
...on-grpc/src/main/java/org/springframework/integration/grpc/outbound/GrpcOutboundGateway.java
Show resolved
Hide resolved
...on-grpc/src/main/java/org/springframework/integration/grpc/outbound/GrpcOutboundGateway.java
Outdated
Show resolved
Hide resolved
...on-grpc/src/main/java/org/springframework/integration/grpc/outbound/GrpcOutboundGateway.java
Outdated
Show resolved
Hide resolved
...on-grpc/src/main/java/org/springframework/integration/grpc/outbound/GrpcOutboundGateway.java
Outdated
Show resolved
Hide resolved
|
|
||
| Object request = requestMessage.getPayload(); | ||
|
|
||
| this.logger.debug(() -> "Invoking gRPC method '" + methodDescriptor.getBareMethodName() + |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we use the whole methodDescriptor here?
Perhaps that one includes a service name, too not just method.
I mean if we have several GrpcOutboundGateway against different gRPC services, it might be confusing just to see a method name in logs.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Was not addressed.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OK. We can live with that.
I just expected something like:
this.logger.debug(() -> "Invoking gRPC method '" + methodDescriptor + "' for request: " + request
But if you think that method name and service is enough, then OK.
Although, it looks like we could use then getFullMethodName() instead of that pair.
...on-grpc/src/main/java/org/springframework/integration/grpc/outbound/GrpcOutboundGateway.java
Outdated
Show resolved
Hide resolved
* Add `isAsync()` configuration option to control whether UNARY gRPC methods return `Mono` or direct response objects. When async mode is enabled, unary calls use `asyncUnaryCall()` and return `Mono<T>`. When disabled (default), unary calls use `blockingUnaryCall()` and return the response object directly. * Restore `invokeBlocking()` method for synchronous unary invocations to maintain backward compatibility with existing blocking behavior. * Add service name to debug logging for better traceability when invoking gRPC methods. * Simplify `invokeStreamingCall()` to use early return pattern instead of intermediate variable assignment. * Fix `createSingleSinkResponseObserver()` to call `tryEmitEmpty()` on completion for proper `Mono` handling. * Update Javadoc to document dual return type behavior for UNARY methods based on async configuration. Clarify method name resolution fallback logic in `setMethodName()` Javadoc. * Add test coverage for both async and blocking modes. Split single method test into separate async and blocking test cases. Add new test for async unary calls in multi-method test suite. * Update asciidoc to establish default mode of gateway is asynchronous * Update asciidoc to discuss Unary returns mono by default and result object if asnyc is set to false
artembilan
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As a bonus I think we need an integration test involving flows for both server and client based on our both gateways.
| this.grpcServiceClass = grpcServiceClass; | ||
| Method getServiceDescriptor = ClassUtils.getMethod(this.grpcServiceClass, "getServiceDescriptor"); | ||
| this.serviceDescriptor = (ServiceDescriptor) Objects.requireNonNull(ReflectionUtils.invokeMethod(getServiceDescriptor, null)); | ||
| this.setAsync(true); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No this. on method calls.
And we need to state in the class Javadocs that this gateway is an async by default.
|
|
||
| Object request = requestMessage.getPayload(); | ||
|
|
||
| this.logger.debug(() -> "Invoking gRPC method '" + methodDescriptor.getBareMethodName() + |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Was not addressed.
| return responseSink.asFlux(); | ||
| } | ||
|
|
||
| @SuppressWarnings({"NullAway", "unchecked", "rawtypes"}) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We probably need to revise this suppressions.
|
|
||
| }; | ||
|
|
||
| ClientCall call = this.channel.newCall(methodDescriptor, CallOptions.DEFAULT); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wonder if we need to introduce a setter for custom CallOptions
| return responseSink.asMono(); | ||
| } | ||
|
|
||
| @SuppressWarnings({"NullAway", "unchecked"}) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
DITTO
| } | ||
| ---- | ||
|
|
||
| 3. **Dynamic resolution** from message headers using `setMethodNameExpression()`: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This does not reflect reality of the target expression.
However I agree that we need one more item to mention the default expression when many methods in the service.
| ---- | ||
| + | ||
| If no `methodName` or `methodNameExpression` is configured and there is only one method offered by the service then, that method's name will be used. | ||
| If no `methodName` or `methodNameExpression` is configured and the service has more than one method, the gateway uses the `GrpcHeaders.SERVICE_METHOD` header to determine which method to invoke. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think both of these sentences are not required after all of those list items.
Especially, when we also move GrpcHeaders.SERVICE_METHOD default expression explanation over there into its own item.
|
|
||
| The `GrpcOutboundGateway` automatically detects the method type from the `MethodDescriptor` and handles the invocation appropriately: | ||
|
|
||
| * **Unary** methods accept a single gRPC request message , returning a `Mono<ResponseType>` by default containing the response object. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think comma needs to stick to the word it is used after.
| The `GrpcOutboundGateway` automatically detects the method type from the `MethodDescriptor` and handles the invocation appropriately: | ||
|
|
||
| * **Unary** methods accept a single gRPC request message , returning a `Mono<ResponseType>` by default containing the response object. | ||
| If `async` is set to `true` then the response object is returned. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is something not true, since gateway produces a Mono in case of async.
I guess both these sentences need some rewording to explain behavior for async flag.
|
Provide a sample of what you would like to see for this comment. The update contains method and service name. |
Enhance the gRPC outbound gateway to support configurable call options: - Add `setCallOptions()` method with default `CallOptions.DEFAULT` - Apply call options to all RPC invocations (unary, streaming, bidi) - Improve debug logging to show full method descriptor - Remove unnecessary `NullAway` suppressions Refactor test organization and improve test naming: - Move inner classes to be nested within configuration classes - remove `setupGateway()` - Make validation methods static for better test structure - Rename test methods to remove "Proto" suffix Update documentation to clarify: - Default async mode is `true` for the gateway - Method resolution fallback using `GrpcHeaders.SERVICE_METHOD` header - Fix internal link to use xref notation - Document array support for client streaming requests
| + | ||
| If no `methodName` or `methodNameExpression` is configured and there is only one method offered by the service then, that method's name will be used. | ||
| If no `methodName` or `methodNameExpression` is configured and the service has more than one method, the gateway uses the `GrpcHeaders.SERVICE_METHOD` header to determine which method to invoke. | ||
| 4. **Default method resolution** if method name or method name expression is configured and there are multiple methods offered by the service then, the gateway uses the `GrpcHeaders.SERVICE_METHOD` header to determine which method to invoke. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think not is missed between is and configured.
I know I'm not good with English but that comma after then and before the looks suspicious.
I would expand the sentence a bit:
uses the `GrpcHeaders.SERVICE_METHOD` header resolution from the request message
Otherwise it is not clear what exactly that header belongs to.
| ** `Mono<RequestType>` | ||
| ** `Stream<RequestType>` | ||
| ** `Collection<RequestType>` | ||
| ** `Array of RequestTypes` |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If we go code snippet, then let's give a proper example to readers:
RequestType[]
!
| } | ||
|
|
||
| @SuppressWarnings({"NullAway", "unchecked", "rawtypes"}) | ||
| @SuppressWarnings({"unchecked", "rawtypes"}) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OK. I'll play with these rawtypes on merge.
Cannot judge properly from GH 😄
Sorry for the noise!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'll submit another commit getting rid of the rawtypes.
| private Object invokeServerStreaming(Object request, MethodDescriptor<?, ?> methodDescriptor) { | ||
| Sinks.Many<Object> responseSink = Sinks.many().unicast().onBackpressureBuffer(); | ||
|
|
||
| StreamObserver<Object> responseObserver = new StreamObserver<>() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks like both invokeBidirectionalStreaming and this invokeServerStreaming do the same around responseObserver.
Can we extract that into an utility method like we have with the createSingleSinkResponseObserver?
| } | ||
|
|
||
| private Object invokeBlockingUnary(MethodDescriptor<Object, Object> method, Object request) { | ||
| return ClientCalls.blockingUnaryCall(this.channel, method, this.callOptions, request); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Doesn't look like we need this single-line utility method.
Why don't call ClientCalls.blockingUnaryCall directly in the invokeUnary?
| Sinks.One<Object> responseSink = Sinks.one(); | ||
|
|
||
| StreamObserver<Object> responseObserver = createSingleSinkResponseObserver(responseSink); | ||
| handleRequestAsFlux(request, responseObserver, method); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this is wrong here.
Perhaps copy/paste artifact.
The request has to go to UNARY as is, not as a streaming wrapper.
The responseObserver is handled by the gRPC client.
Not sure how tests works since you call streaming for unary...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In this case we are handling the async unary as a flux that converts to a mono.
i.e. a stream is a stream even if it only returns one Async item. We can discuss.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That is not correct. This is unary and there is a dedicated API you call in the end of this method. Why do we call streaming here?
You have already converted response observer to Mono and you propagate it down to asyncUnary call. Why do we do streaming if that was not requested?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see the mistake. Thank you for bringing it to my attention.
|
|
||
| NOTE: GrpcOutboundGateway is asynchronous by default. | ||
| However, to set the gateway to synchronous mode (specifically for the Unary gRPC communication pattern) use the `setAsync` method. | ||
| More can read about asynchronous service activators xref:service-activator.adoc#async-service-activator[here]. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This sentence is a bit awkward.
And I'm not a fun of here links.
Why just don't say like:
See more information in the xref:service-activator.adoc#async-service-activator[].
?
| } | ||
| ---- | ||
|
|
||
| 3. **Dynamic resolution** establish expression to determine method name `setMethodNameExpression()`: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This sentence does not compile in my head.
How about:
**Dynamic resolution** via `setMethodNameExpression()`:
?
| } | ||
| ---- | ||
| + | ||
| 4. **Default method resolution** if neither a method name nor a method name expression is configured, and the service offers multiple methods, the gateway uses the `GrpcHeaders.SERVICE_METHOD` header to determine which method to invoke. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Still missed from the request message context.
"Uses header" of what?
Imagine I'm new with Spring Integration, please, give me enough info to understand what is going on.
| The `GrpcOutboundGateway` automatically detects the method type from the `MethodDescriptor` and handles the invocation appropriately: | ||
|
|
||
| * **Unary** methods accept a single gRPC request message, returning a `Mono<ResponseType>` by default containing the response object. | ||
| The default `async` mode of the gateway is `true`. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this sentence is bit of context.
Feels like something else is missed in between.
How about just to say in the first sentence:
returning a `Mono<ResponseType>` containing the response object in async mode (by default).
Then this second sentence is redundant and the next one makes sense.
Extract duplicated `StreamObserver` creation into helper methods: - Add `createStreamSinkResponseObserver()` for streaming responses - Consolidate bidirectional and server streaming observer logic - Inline `invokeBlockingUnary()` into `invokeUnary()` caller - Remove unnecessary `handleRequestAsFlux()` call in async unary Polish documentation for clarity: - Simplify async service activator xref link - Clarify how message headers work in message name resolution - Remove redundant wording about async mode default
artembilan
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OK. Pulling local for final review, clean up and merge.
Thanks
| } | ||
| ---- | ||
| + | ||
| 4. **Default method resolution** if neither a method name nor a method name expression is configured, and the service offers multiple methods, the gateway will look for the `GrpcHeaders.SERVICE_METHOD` key in the headers of the message (if present) to determine which method to invoke. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if present what?
Message?
But it is always there because that is a request message for this message handler.
|
|
||
| The `GrpcOutboundGateway` automatically detects the method type from the `MethodDescriptor` and handles the invocation appropriately: | ||
|
|
||
| * **Unary** methods accept a single gRPC request message, returning a `Mono<ResponseType>` by default containing the response object in async mode (by default). |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do we have several "by default" in one sentence?
|
Merged after some cleanup: 1f8caf7 Thank you! |
Implement
GrpcOutboundGatewayto bridge Spring Integration messaging with gRPC client invocations. This component supports four gRPC communication patterns:The implementation includes two main components:
GrpcOutboundGateway: ExtendsAbstractReplyProducingMessageHandlerto provide request-reply semantics for gRPC clients. Features configurable async timeout (defaults to 10 seconds) and automatic method type detection based on the gRPCMethodDescriptor.The gateway automatically determines the invocation strategy: blocking calls for unary operations and reactive Flux-based responses for streaming operations. For streaming patterns, the component uses Project Reactor's
Sinks.Manyto bridge gRPC'sStreamObservercallback model with Spring Integration's reactive message handling.Build configuration updated to include necessary gRPC API dependencies for both compilation and testing. Reference documentation extended with examples demonstrating all four communication patterns and configuration options. What's New section updated to highlight this new feature.
Fixes #10622