diff --git a/pom.xml b/pom.xml
index 145beae..da75f55 100644
--- a/pom.xml
+++ b/pom.xml
@@ -89,6 +89,12 @@
3.0.1
test
+
+ org.testcontainers
+ testcontainers
+ 2.0.4
+ test
+
@@ -133,4 +139,4 @@
-
\ No newline at end of file
+
diff --git a/src/main/java/io/vertx/httpproxy/impl/HttpUtils.java b/src/main/java/io/vertx/httpproxy/impl/HttpUtils.java
index c392206..707c5b7 100644
--- a/src/main/java/io/vertx/httpproxy/impl/HttpUtils.java
+++ b/src/main/java/io/vertx/httpproxy/impl/HttpUtils.java
@@ -12,7 +12,8 @@
import io.vertx.core.MultiMap;
import io.vertx.core.http.HttpHeaders;
-import io.vertx.core.http.HttpServerResponse;
+import io.vertx.core.http.HttpServerRequest;
+import io.vertx.core.http.HttpVersion;
import java.time.Instant;
import java.util.List;
@@ -54,8 +55,8 @@ static Instant dateHeader(MultiMap headers) {
}
}
- public static boolean trailersSupported(HttpServerResponse proxiedResponse) {
- return proxiedResponse.streamId() >= 0 // HTTP/2 and HTTP/3
- || proxiedResponse.isChunked(); // Required for HTTP/1.1
+ static boolean isNotHttp1x(HttpServerRequest request) {
+ HttpVersion httpVersion = request.connection().protocolVersion();
+ return httpVersion != HttpVersion.HTTP_1_0 && httpVersion != HttpVersion.HTTP_1_1;
}
}
diff --git a/src/main/java/io/vertx/httpproxy/impl/ProxiedRequest.java b/src/main/java/io/vertx/httpproxy/impl/ProxiedRequest.java
index a949d35..751f8b3 100644
--- a/src/main/java/io/vertx/httpproxy/impl/ProxiedRequest.java
+++ b/src/main/java/io/vertx/httpproxy/impl/ProxiedRequest.java
@@ -233,8 +233,9 @@ Future sendRequest() {
if (len >= 0) {
request.putHeader(CONTENT_LENGTH, Long.toString(len));
} else {
- Boolean isChunked = HttpUtils.isChunked(proxiedRequest.headers());
- request.setChunked(len == -1 && Boolean.TRUE == isChunked);
+ boolean isChunked = HttpUtils.isNotHttp1x(proxiedRequest)
+ || Boolean.TRUE == HttpUtils.isChunked(proxiedRequest.headers());
+ request.setChunked(isChunked);
}
Pipe pipe = body.stream().pipe();
diff --git a/src/main/java/io/vertx/httpproxy/impl/ProxiedResponse.java b/src/main/java/io/vertx/httpproxy/impl/ProxiedResponse.java
index 5c98d3b..3431722 100644
--- a/src/main/java/io/vertx/httpproxy/impl/ProxiedResponse.java
+++ b/src/main/java/io/vertx/httpproxy/impl/ProxiedResponse.java
@@ -273,7 +273,7 @@ private Future sendResponse(ReadStream body) {
// Only forward trailers if using the original backend response stream
if (body.equals(response)) {
MultiMap trailers = response.trailers();
- if (!trailers.isEmpty() && HttpUtils.trailersSupported(proxiedResponse)) {
+ if (!trailers.isEmpty() && (HttpUtils.isNotHttp1x(request.proxiedRequest()) || proxiedResponse.isChunked())) {
proxiedResponse.trailers().addAll(trailers);
}
}
diff --git a/src/test/java/io/vertx/tests/grpc/GrpcProxyIntegrationTest.java b/src/test/java/io/vertx/tests/grpc/GrpcProxyIntegrationTest.java
new file mode 100644
index 0000000..a34e621
--- /dev/null
+++ b/src/test/java/io/vertx/tests/grpc/GrpcProxyIntegrationTest.java
@@ -0,0 +1,156 @@
+/*
+ * Copyright (c) 2011-2026 Contributors to the Eclipse Foundation
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License 2.0 which is available at
+ * http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
+ * which is available at https://www.apache.org/licenses/LICENSE-2.0.
+ *
+ * SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
+ */
+package io.vertx.tests.grpc;
+
+import com.github.dockerjava.api.DockerClient;
+import com.github.dockerjava.api.command.WaitContainerCmd;
+import com.github.dockerjava.api.command.WaitContainerResultCallback;
+import io.netty.util.internal.PlatformDependent;
+import io.vertx.core.Vertx;
+import io.vertx.core.http.HttpClient;
+import io.vertx.core.http.HttpClientOptions;
+import io.vertx.core.http.HttpServer;
+import io.vertx.core.http.HttpServerOptions;
+import io.vertx.core.net.SocketAddress;
+import io.vertx.httpproxy.HttpProxy;
+import io.vertx.httpproxy.ProxyOptions;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.InternetProtocol;
+import org.testcontainers.containers.startupcheck.OneShotStartupCheckStrategy;
+import org.testcontainers.containers.wait.strategy.Wait;
+import org.testcontainers.images.builder.ImageFromDockerfile;
+
+import java.time.Duration;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assume.assumeFalse;
+
+public class GrpcProxyIntegrationTest {
+
+ private static final int GRPC_SERVER_PORT = 50051;
+ private static final int PROXY_PORT = 8080;
+
+ private Vertx vertx;
+ private HttpServer proxyServer;
+ private HttpClient httpClient;
+ private ServerContainer> grpcServerContainer;
+ private GenericContainer> grpcClientContainer;
+
+ @BeforeClass
+ public static void beforeClass() throws Exception {
+ assumeFalse("Cannot run Linux containers on Windows", PlatformDependent.isWindows());
+ }
+
+ @Before
+ public void setUp() throws Exception {
+ vertx = Vertx.vertx();
+ }
+
+ @After
+ public void tearDownContainers() {
+ if (grpcClientContainer != null) {
+ grpcClientContainer.stop();
+ }
+ if (grpcServerContainer != null) {
+ grpcServerContainer.stop();
+ }
+ if (proxyServer != null) {
+ proxyServer.close().await();
+ }
+ if (httpClient != null) {
+ httpClient.close().await();
+ }
+ if (vertx != null) {
+ vertx.close().await();
+ }
+ }
+
+ @Test
+ public void testGrpcThroughProxy() throws Exception {
+ startGrpcServer();
+ startProxy();
+ int exitCode = runGrpcClient();
+ assertEquals("gRPC client tests should pass", 0, exitCode);
+ }
+
+ private void startProxy() {
+ httpClient = vertx.createHttpClient(new HttpClientOptions()
+ .setProtocolVersion(io.vertx.core.http.HttpVersion.HTTP_2)
+ .setHttp2ClearTextUpgrade(false));
+
+ SocketAddress backend = SocketAddress.inetSocketAddress(grpcServerContainer.getMappedPort(GRPC_SERVER_PORT), grpcServerContainer.getHost());
+ HttpProxy proxy = HttpProxy.reverseProxy(new ProxyOptions(), httpClient).origin(backend);
+
+ proxyServer = vertx.createHttpServer(new HttpServerOptions()
+ .setPort(PROXY_PORT)
+ .setHost("0.0.0.0")
+ .setHttp2ClearTextEnabled(true))
+ .requestHandler(proxy)
+ .listen()
+ .await();
+ }
+
+ private void startGrpcServer() throws Exception {
+ grpcServerContainer = new ServerContainer<>(new ImageFromDockerfile("vertx-http-proxy-grpc-server", false)
+ .withFileFromClasspath("Dockerfile", "grpc/server/Dockerfile")
+ .withFileFromClasspath("server.js", "grpc/server/server.js")
+ .withFileFromClasspath("package.json", "grpc/package.json")
+ .withFileFromClasspath("test.proto", "grpc/test.proto"));
+ if (System.getProperties().containsKey("containerFixedPort")) {
+ grpcServerContainer.withFixedExposedPort(GRPC_SERVER_PORT, GRPC_SERVER_PORT);
+ } else {
+ grpcServerContainer.withExposedPorts(GRPC_SERVER_PORT);
+ }
+ grpcServerContainer
+ .withEnv("GRPC_PORT", String.valueOf(GRPC_SERVER_PORT))
+ .withEnv("GRPC_HOST", "0.0.0.0")
+ .waitingFor(Wait.forLogMessage(".*gRPC server listening.*", 1));
+
+ grpcServerContainer.start();
+ }
+
+ private int runGrpcClient() throws Exception {
+ grpcClientContainer = new GenericContainer<>(
+ new ImageFromDockerfile("vertx-http-proxy-grpc-client", false)
+ .withFileFromClasspath("Dockerfile", "grpc/client/Dockerfile")
+ .withFileFromClasspath("client.js", "grpc/client/client.js")
+ .withFileFromClasspath("package.json", "grpc/package.json")
+ .withFileFromClasspath("test.proto", "grpc/test.proto"))
+ .withNetworkMode("host")
+ .withEnv("GRPC_SERVER", String.format("localhost:%d", PROXY_PORT))
+ .withStartupCheckStrategy(new OneShotStartupCheckStrategy())
+ .withStartupTimeout(Duration.ofMinutes(3));
+
+ grpcClientContainer.start();
+
+ DockerClient dockerClient = grpcClientContainer.getDockerClient();
+ try (WaitContainerCmd cmd = dockerClient.waitContainerCmd(grpcClientContainer.getContainerId())) {
+ return cmd.exec(new WaitContainerResultCallback())
+ .awaitStatusCode();
+ }
+ }
+
+ private static class ServerContainer> extends GenericContainer {
+
+ public ServerContainer(java.util.concurrent.Future dockerImageName) {
+ super(dockerImageName);
+ }
+
+ public SELF withFixedExposedPort(int hostPort, int containerPort) {
+ super.addFixedExposedPort(hostPort, containerPort, InternetProtocol.TCP);
+ return self();
+ }
+ }
+}
diff --git a/src/test/java/module-info.java b/src/test/java/module-info.java
index 1d5c16e..951e787 100644
--- a/src/test/java/module-info.java
+++ b/src/test/java/module-info.java
@@ -4,4 +4,8 @@
requires io.vertx.testing.unit;
requires junit;
requires wiremock.standalone;
+ requires testcontainers;
+ requires com.github.dockerjava.api;
+ requires org.apache.commons.lang3;
+ requires io.netty.common;
}
diff --git a/src/test/resources/grpc/client/Dockerfile b/src/test/resources/grpc/client/Dockerfile
new file mode 100644
index 0000000..9f4d550
--- /dev/null
+++ b/src/test/resources/grpc/client/Dockerfile
@@ -0,0 +1,16 @@
+FROM fedora:43 AS base
+
+RUN dnf install -y nodejs npm && dnf clean all
+
+WORKDIR /app
+
+COPY package.json ./
+RUN npm install
+
+COPY test.proto ./
+
+FROM base AS client
+
+COPY client.js ./
+
+CMD ["node", "client.js"]
diff --git a/src/test/resources/grpc/client/client.js b/src/test/resources/grpc/client/client.js
new file mode 100644
index 0000000..a6c0fef
--- /dev/null
+++ b/src/test/resources/grpc/client/client.js
@@ -0,0 +1,159 @@
+const grpc = require('@grpc/grpc-js');
+const protoLoader = require('@grpc/proto-loader');
+const path = require('path');
+
+// Load the proto file
+const PROTO_PATH = path.join(__dirname, 'test.proto');
+const packageDefinition = protoLoader.loadSync(PROTO_PATH, {
+ keepCase: true,
+ longs: String,
+ enums: String,
+ defaults: true,
+ oneofs: true
+});
+
+const testProto = grpc.loadPackageDefinition(packageDefinition).testservice;
+
+// Get server address from environment or use default
+const serverAddress = process.env.GRPC_SERVER || 'localhost:50051';
+const client = new testProto.TestService(
+ serverAddress,
+ grpc.credentials.createInsecure()
+);
+
+let testsPassed = 0;
+let testsFailed = 0;
+
+// Test 1: Unary call
+function testUnaryCall() {
+ return new Promise((resolve, reject) => {
+ console.log('Testing unary call...');
+ client.UnaryCall({message: 'Hello from client'}, (err, response) => {
+ if (err) {
+ console.error('Unary call failed:', err);
+ testsFailed++;
+ reject(err);
+ } else {
+ console.log('Unary call response:', response.reply);
+ if (response.reply.includes('Hello from client')) {
+ console.log('✓ Unary call test PASSED');
+ testsPassed++;
+ resolve();
+ } else {
+ console.error('✗ Unary call test FAILED: unexpected response');
+ testsFailed++;
+ reject(new Error('Unexpected response'));
+ }
+ }
+ });
+ });
+}
+
+// Test 2: Server streaming call
+function testServerStreamingCall() {
+ return new Promise((resolve, reject) => {
+ console.log('Testing server streaming call...');
+ const call = client.ServerStreamingCall({message: 'Stream test', count: 3});
+ let receivedCount = 0;
+
+ call.on('data', (response) => {
+ console.log(`Received stream response ${response.index}:`, response.reply);
+ receivedCount++;
+ });
+
+ call.on('end', () => {
+ if (receivedCount === 3) {
+ console.log('✓ Server streaming call test PASSED');
+ testsPassed++;
+ resolve();
+ } else {
+ console.error(`✗ Server streaming call test FAILED: expected 3 responses, got ${receivedCount}`);
+ testsFailed++;
+ reject(new Error(`Expected 3 responses, got ${receivedCount}`));
+ }
+ });
+
+ call.on('error', (err) => {
+ console.error('Server streaming call failed:', err);
+ testsFailed++;
+ reject(err);
+ });
+ });
+}
+
+// Test 3: Bidirectional streaming call
+function testBidirectionalStreamingCall() {
+ return new Promise((resolve, reject) => {
+ console.log('Testing bidirectional streaming call...');
+ const call = client.BidirectionalStreamingCall();
+ let receivedCount = 0;
+ const messagesToSend = ['Message 1', 'Message 2', 'Message 3'];
+
+ call.on('data', (response) => {
+ console.log('Received bidi response:', response.reply);
+ receivedCount++;
+
+ if (receivedCount === messagesToSend.length) {
+ call.end();
+ }
+ });
+
+ call.on('end', () => {
+ if (receivedCount === messagesToSend.length) {
+ console.log('✓ Bidirectional streaming call test PASSED');
+ testsPassed++;
+ resolve();
+ } else {
+ console.error(`✗ Bidirectional streaming call test FAILED: expected ${messagesToSend.length} responses, got ${receivedCount}`);
+ testsFailed++;
+ reject(new Error(`Expected ${messagesToSend.length} responses, got ${receivedCount}`));
+ }
+ });
+
+ call.on('error', (err) => {
+ console.error('Bidirectional streaming call failed:', err);
+ testsFailed++;
+ reject(err);
+ });
+
+ // Send messages
+ messagesToSend.forEach((msg, index) => {
+ setTimeout(() => {
+ console.log(`Sending bidi message ${index + 1}:`, msg);
+ call.write({message: msg});
+ }, index * 100);
+ });
+ });
+}
+
+// Run all tests
+async function runTests() {
+ console.log('Starting gRPC client tests...');
+ console.log(`Connecting to server at: ${serverAddress}`);
+
+ try {
+ await testUnaryCall();
+ await testServerStreamingCall();
+ await testBidirectionalStreamingCall();
+
+ console.log('\n=== Test Results ===');
+ console.log(`Tests passed: ${testsPassed}`);
+ console.log(`Tests failed: ${testsFailed}`);
+
+ if (testsFailed === 0) {
+ console.log('All tests PASSED!');
+ process.exit(0);
+ } else {
+ console.log('Some tests FAILED!');
+ process.exit(1);
+ }
+ } catch (err) {
+ console.error('Test execution failed:', err);
+ console.log('\n=== Test Results ===');
+ console.log(`Tests passed: ${testsPassed}`);
+ console.log(`Tests failed: ${testsFailed}`);
+ process.exit(1);
+ }
+}
+
+runTests();
diff --git a/src/test/resources/grpc/package.json b/src/test/resources/grpc/package.json
new file mode 100644
index 0000000..bd08b9a
--- /dev/null
+++ b/src/test/resources/grpc/package.json
@@ -0,0 +1,14 @@
+{
+ "name": "grpc-test",
+ "version": "1.0.0",
+ "description": "gRPC test client and server for vertx-http-proxy",
+ "main": "server/server.js",
+ "scripts": {
+ "server": "node server/server.js",
+ "client": "node client/client.js"
+ },
+ "dependencies": {
+ "@grpc/grpc-js": "^1.9.0",
+ "@grpc/proto-loader": "^0.7.10"
+ }
+}
diff --git a/src/test/resources/grpc/server/Dockerfile b/src/test/resources/grpc/server/Dockerfile
new file mode 100644
index 0000000..3038a58
--- /dev/null
+++ b/src/test/resources/grpc/server/Dockerfile
@@ -0,0 +1,18 @@
+FROM fedora:43 AS base
+
+RUN dnf install -y nodejs npm && dnf clean all
+
+WORKDIR /app
+
+COPY package.json ./
+RUN npm install
+
+COPY test.proto ./
+
+FROM base AS server
+
+COPY server.js ./
+
+EXPOSE 50051
+
+CMD ["node", "server.js"]
diff --git a/src/test/resources/grpc/server/server.js b/src/test/resources/grpc/server/server.js
new file mode 100644
index 0000000..c627a84
--- /dev/null
+++ b/src/test/resources/grpc/server/server.js
@@ -0,0 +1,83 @@
+const grpc = require('@grpc/grpc-js');
+const protoLoader = require('@grpc/proto-loader');
+const path = require('path');
+
+// Load the proto file
+const PROTO_PATH = path.join(__dirname, 'test.proto');
+const packageDefinition = protoLoader.loadSync(PROTO_PATH, {
+ keepCase: true,
+ longs: String,
+ enums: String,
+ defaults: true,
+ oneofs: true
+});
+
+const testProto = grpc.loadPackageDefinition(packageDefinition).testservice;
+
+// Implement the service methods
+function unaryCall(call, callback) {
+ console.log('Received unary call:', call.request.message);
+ callback(null, {reply: `Echo: ${call.request.message}`});
+}
+
+function serverStreamingCall(call) {
+ const count = call.request.count || 5;
+ console.log(`Received server streaming call: ${call.request.message}, count: ${count}`);
+
+ for (let i = 0; i < count; i++) {
+ call.write({
+ reply: `Stream response ${i + 1}: ${call.request.message}`,
+ index: i
+ });
+ }
+ call.end();
+}
+
+function bidirectionalStreamingCall(call) {
+ console.log('Received bidirectional streaming call');
+
+ call.on('data', (request) => {
+ console.log('Received bidi message:', request.message);
+ call.write({
+ reply: `Bidi echo: ${request.message}`
+ });
+ });
+
+ call.on('end', () => {
+ console.log('Bidi stream ended');
+ call.end();
+ });
+
+ call.on('error', (err) => {
+ console.error('Bidi stream error:', err);
+ });
+}
+
+// Start the server
+function main() {
+ const server = new grpc.Server();
+
+ server.addService(testProto.TestService.service, {
+ UnaryCall: unaryCall,
+ ServerStreamingCall: serverStreamingCall,
+ BidirectionalStreamingCall: bidirectionalStreamingCall
+ });
+
+ const port = process.env.GRPC_PORT || '50051';
+ const host = process.env.GRPC_HOST || '0.0.0.0';
+ const address = `${host}:${port}`;
+
+ server.bindAsync(
+ address,
+ grpc.ServerCredentials.createInsecure(),
+ (err, port) => {
+ if (err) {
+ console.error('Failed to bind server:', err);
+ process.exit(1);
+ }
+ console.log(`gRPC server listening on ${address}`);
+ }
+ );
+}
+
+main();
diff --git a/src/test/resources/grpc/test.proto b/src/test/resources/grpc/test.proto
new file mode 100644
index 0000000..6216740
--- /dev/null
+++ b/src/test/resources/grpc/test.proto
@@ -0,0 +1,44 @@
+syntax = "proto3";
+
+package testservice;
+
+// Test service with three types of gRPC communication patterns
+service TestService {
+ // Unary RPC: Simple request/reply
+ rpc UnaryCall(UnaryRequest) returns (UnaryResponse);
+
+ // Server streaming RPC: Client sends one request, server sends stream of responses
+ rpc ServerStreamingCall(StreamRequest) returns (stream StreamResponse);
+
+ // Bidirectional streaming RPC: Both client and server send streams
+ rpc BidirectionalStreamingCall(stream BidiRequest) returns (stream BidiResponse);
+}
+
+// Messages for unary call
+message UnaryRequest {
+ string message = 1;
+}
+
+message UnaryResponse {
+ string reply = 1;
+}
+
+// Messages for server streaming
+message StreamRequest {
+ string message = 1;
+ int32 count = 2;
+}
+
+message StreamResponse {
+ string reply = 1;
+ int32 index = 2;
+}
+
+// Messages for bidirectional streaming
+message BidiRequest {
+ string message = 1;
+}
+
+message BidiResponse {
+ string reply = 1;
+}