diff --git a/httpclient5-testing/src/test/java/org/apache/hc/client5/testing/async/AbstractClassicOverAsyncIntegrationTestBase.java b/httpclient5-testing/src/test/java/org/apache/hc/client5/testing/async/AbstractClassicOverAsyncIntegrationTestBase.java new file mode 100644 index 0000000000..9972761394 --- /dev/null +++ b/httpclient5-testing/src/test/java/org/apache/hc/client5/testing/async/AbstractClassicOverAsyncIntegrationTestBase.java @@ -0,0 +1,80 @@ +/* + * ==================================================================== + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * ==================================================================== + * + * This software consists of voluntary contributions made by many + * individuals on behalf of the Apache Software Foundation. For more + * information on the Apache Software Foundation, please see + * . + * + */ + +package org.apache.hc.client5.testing.async; + +import java.net.InetSocketAddress; +import java.util.function.Consumer; + +import org.apache.hc.client5.http.impl.classic.CloseableHttpClient; +import org.apache.hc.client5.http.impl.compat.ClassicToAsyncAdaptor; +import org.apache.hc.client5.testing.extension.async.ClientProtocolLevel; +import org.apache.hc.client5.testing.extension.async.ServerProtocolLevel; +import org.apache.hc.client5.testing.extension.async.TestAsyncClient; +import org.apache.hc.client5.testing.extension.async.TestAsyncClientBuilder; +import org.apache.hc.client5.testing.extension.async.TestAsyncResources; +import org.apache.hc.client5.testing.extension.async.TestAsyncServer; +import org.apache.hc.client5.testing.extension.async.TestAsyncServerBootstrap; +import org.apache.hc.core5.http.HttpHost; +import org.apache.hc.core5.http.URIScheme; +import org.apache.hc.core5.util.Timeout; +import org.junit.jupiter.api.extension.RegisterExtension; + +abstract class AbstractClassicOverAsyncIntegrationTestBase { + + public static final Timeout TIMEOUT = Timeout.ofMinutes(1); + + @RegisterExtension + private final TestAsyncResources testResources; + + protected AbstractClassicOverAsyncIntegrationTestBase(final URIScheme scheme, + final ClientProtocolLevel clientProtocolLevel, + final ServerProtocolLevel serverProtocolLevel) { + this.testResources = new TestAsyncResources(scheme, clientProtocolLevel, serverProtocolLevel, TIMEOUT); + } + + public void configureServer(final Consumer serverCustomizer) { + testResources.configureServer(serverCustomizer); + } + + public HttpHost startServer() throws Exception { + final TestAsyncServer server = testResources.server(); + final InetSocketAddress inetSocketAddress = server.start(); + return new HttpHost(testResources.scheme().id, "localhost", inetSocketAddress.getPort()); + } + + public void configureClient(final Consumer clientCustomizer) { + testResources.configureClient(clientCustomizer); + } + + public CloseableHttpClient startClient() throws Exception { + final TestAsyncClient client = testResources.client(); + client.start(); + return new ClassicToAsyncAdaptor(client, TIMEOUT); + } + +} diff --git a/httpclient5-testing/src/test/java/org/apache/hc/client5/testing/async/ClassicOverAsyncIntegrationTests.java b/httpclient5-testing/src/test/java/org/apache/hc/client5/testing/async/ClassicOverAsyncIntegrationTests.java new file mode 100644 index 0000000000..4537d69d5d --- /dev/null +++ b/httpclient5-testing/src/test/java/org/apache/hc/client5/testing/async/ClassicOverAsyncIntegrationTests.java @@ -0,0 +1,77 @@ +/* + * ==================================================================== + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * ==================================================================== + * + * This software consists of voluntary contributions made by many + * individuals on behalf of the Apache Software Foundation. For more + * information on the Apache Software Foundation, please see + * . + * + */ +package org.apache.hc.client5.testing.async; + +import org.apache.hc.client5.testing.extension.async.ClientProtocolLevel; +import org.apache.hc.client5.testing.extension.async.ServerProtocolLevel; +import org.apache.hc.core5.http.URIScheme; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Nested; + +class ClassicOverAsyncIntegrationTests { + + @Nested + @DisplayName("Fundamentals") + class Fundamentals extends TestClassicOverAsyncHttp1 { + + public Fundamentals() { + super(URIScheme.HTTP, ClientProtocolLevel.STANDARD, ServerProtocolLevel.STANDARD); + } + + } + + @Nested + @DisplayName("Fundamentals (TLS)") + class FundamentalsTls extends TestClassicOverAsyncHttp1 { + + public FundamentalsTls() { + super(URIScheme.HTTPS, ClientProtocolLevel.STANDARD, ServerProtocolLevel.STANDARD); + } + + } + + @Nested + @DisplayName("Fundamentals (HTTP/2)") + class FundamentalsH2 extends TestClassicOverAsync { + + public FundamentalsH2() { + super(URIScheme.HTTP, ClientProtocolLevel.H2_ONLY, ServerProtocolLevel.H2_ONLY); + } + + } + + @Nested + @DisplayName("Fundamentals (HTTP/2, TLS)") + class FundamentalsH2Tls extends TestClassicOverAsync { + + public FundamentalsH2Tls() { + super(URIScheme.HTTPS, ClientProtocolLevel.H2_ONLY, ServerProtocolLevel.H2_ONLY); + } + + } + +} \ No newline at end of file diff --git a/httpclient5-testing/src/test/java/org/apache/hc/client5/testing/async/TestClassicOverAsync.java b/httpclient5-testing/src/test/java/org/apache/hc/client5/testing/async/TestClassicOverAsync.java new file mode 100644 index 0000000000..ac06bdf527 --- /dev/null +++ b/httpclient5-testing/src/test/java/org/apache/hc/client5/testing/async/TestClassicOverAsync.java @@ -0,0 +1,216 @@ +/* + * ==================================================================== + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * ==================================================================== + * + * This software consists of voluntary contributions made by many + * individuals on behalf of the Apache Software Foundation. For more + * information on the Apache Software Foundation, please see + * . + * + */ +package org.apache.hc.client5.testing.async; + +import java.io.IOException; +import java.util.Queue; +import java.util.Random; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; + +import org.apache.hc.client5.http.impl.classic.CloseableHttpClient; +import org.apache.hc.client5.testing.Result; +import org.apache.hc.client5.testing.extension.async.ClientProtocolLevel; +import org.apache.hc.client5.testing.extension.async.ServerProtocolLevel; +import org.apache.hc.core5.http.ClassicHttpRequest; +import org.apache.hc.core5.http.ContentType; +import org.apache.hc.core5.http.HttpHost; +import org.apache.hc.core5.http.URIScheme; +import org.apache.hc.core5.http.io.entity.ByteArrayEntity; +import org.apache.hc.core5.http.io.entity.EntityUtils; +import org.apache.hc.core5.http.io.support.ClassicRequestBuilder; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; + +public abstract class TestClassicOverAsync extends AbstractClassicOverAsyncIntegrationTestBase { + + public TestClassicOverAsync(final URIScheme scheme, + final ClientProtocolLevel clientProtocolLevel, + final ServerProtocolLevel serverProtocolLevel) { + super(scheme, clientProtocolLevel, serverProtocolLevel); + } + + @ValueSource(ints = {0, 2048, 10240}) + @ParameterizedTest(name = "{displayName}; content length: {0}") + void testSequentialGetRequests(final int contentSize) throws Exception { + configureServer(bootstrap -> + bootstrap.register("/random/*", AsyncRandomHandler::new)); + final HttpHost target = startServer(); + + final CloseableHttpClient client = startClient(); + + final int n = 10; + for (int i = 0; i < n; i++) { + client.execute( + ClassicRequestBuilder.get() + .setHttpHost(target) + .setPath("/random/" + contentSize) + .build(), + response -> { + Assertions.assertEquals(200, response.getCode()); + final byte[] bytes = EntityUtils.toByteArray(response.getEntity()); + Assertions.assertNotNull(bytes); + Assertions.assertEquals(contentSize, bytes.length); + return null; + }); + } + } + + @ValueSource(ints = {0, 2048, 10240}) + @ParameterizedTest(name = "{displayName}; content length: {0}") + void testConcurrentGetRequests(final int contentSize) throws Exception { + configureServer(bootstrap -> + bootstrap.register("/random/*", AsyncRandomHandler::new)); + final HttpHost target = startServer(); + + final CloseableHttpClient client = startClient(); + + final int n = 10; + + final ExecutorService executorService = Executors.newFixedThreadPool(n); + final CountDownLatch countDownLatch = new CountDownLatch(n); + final Queue>> resultQueue = new ConcurrentLinkedQueue<>(); + for (int i = 0; i < n; i++) { + resultQueue.add(executorService.submit(() -> { + final ClassicHttpRequest request = ClassicRequestBuilder.get() + .setHttpHost(target) + .setPath("/random/" + contentSize) + .build(); + try { + return client.execute( + request, + response -> { + final byte[] bytes = EntityUtils.toByteArray(response.getEntity()); + countDownLatch.countDown(); + return new Result<>(request, response, bytes); + }); + } catch (final RuntimeException | IOException ex) { + countDownLatch.countDown(); + return new Result<>(request, ex); + } + })); + } + Assertions.assertTrue(countDownLatch.await(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit())); + Assertions.assertEquals(n, resultQueue.size()); + for (final Future> future : resultQueue) { + final Result result = future.get(); + Assertions.assertNotNull(result.response); + Assertions.assertEquals(200, result.response.getCode()); + Assertions.assertNotNull(result.content); + Assertions.assertEquals(contentSize, result.content.length); + } + + executorService.shutdownNow(); + } + + @ValueSource(ints = {0, 2048, 10240}) + @ParameterizedTest(name = "{displayName}; content length: {0}") + void testSequentialPostRequests(final int contentSize) throws Exception { + configureServer(bootstrap -> + bootstrap.register("/echo/*", AsyncEchoHandler::new)); + final HttpHost target = startServer(); + + final CloseableHttpClient client = startClient(); + + final int n = 10; + for (int i = 0; i < n; i++) { + final byte[] temp = new byte[contentSize]; + new Random(System.currentTimeMillis()).nextBytes(temp); + client.execute( + ClassicRequestBuilder.post() + .setHttpHost(target) + .setPath("/echo/") + .setEntity(new ByteArrayEntity(temp, ContentType.DEFAULT_BINARY)) + .build(), + response -> { + Assertions.assertEquals(200, response.getCode()); + final byte[] bytes = EntityUtils.toByteArray(response.getEntity()); + Assertions.assertNotNull(bytes); + Assertions.assertArrayEquals(temp, bytes); + return null; + }); + } + } + + @ValueSource(ints = {0, 2048, 10240}) + @ParameterizedTest(name = "{displayName}; content length: {0}") + void testConcurrentPostRequests(final int contentSize) throws Exception { + configureServer(bootstrap -> + bootstrap.register("/echo/*", AsyncEchoHandler::new)); + final HttpHost target = startServer(); + + final CloseableHttpClient client = startClient(); + + final int n = 10; + + final ExecutorService executorService = Executors.newFixedThreadPool(n); + final CountDownLatch countDownLatch = new CountDownLatch(n); + final Queue>> resultQueue = new ConcurrentLinkedQueue<>(); + for (int i = 0; i < n; i++) { + final byte[] temp = new byte[contentSize]; + new Random(System.currentTimeMillis()).nextBytes(temp); + resultQueue.add(executorService.submit(() -> { + final ClassicHttpRequest request = ClassicRequestBuilder.post() + .setHttpHost(target) + .setPath("/echo/") + .setEntity(new ByteArrayEntity(temp, ContentType.DEFAULT_BINARY)) + .build(); + try { + return client.execute( + request, + response -> { + final byte[] bytes = EntityUtils.toByteArray(response.getEntity()); + countDownLatch.countDown(); + return new Result<>(request, response, bytes); + }); + } catch (final RuntimeException | IOException ex) { + countDownLatch.countDown(); + return new Result<>(request, ex); + } + })); + } + Assertions.assertTrue(countDownLatch.await(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit())); + Assertions.assertEquals(n, resultQueue.size()); + for (final Future> future : resultQueue) { + final Result result = future.get(); + if (result.exception != null) { + Assertions.fail(result.exception); + } + Assertions.assertNotNull(result.response); + Assertions.assertEquals(200, result.response.getCode()); + Assertions.assertNotNull(result.content); + Assertions.assertEquals(contentSize, result.content.length); + } + + executorService.shutdownNow(); + } + +} \ No newline at end of file diff --git a/httpclient5-testing/src/test/java/org/apache/hc/client5/testing/async/TestClassicOverAsyncHttp1.java b/httpclient5-testing/src/test/java/org/apache/hc/client5/testing/async/TestClassicOverAsyncHttp1.java new file mode 100644 index 0000000000..8f6ba5bf08 --- /dev/null +++ b/httpclient5-testing/src/test/java/org/apache/hc/client5/testing/async/TestClassicOverAsyncHttp1.java @@ -0,0 +1,83 @@ +/* + * ==================================================================== + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * ==================================================================== + * + * This software consists of voluntary contributions made by many + * individuals on behalf of the Apache Software Foundation. For more + * information on the Apache Software Foundation, please see + * . + * + */ +package org.apache.hc.client5.testing.async; + +import java.util.Random; + +import org.apache.hc.client5.http.impl.classic.CloseableHttpClient; +import org.apache.hc.client5.testing.extension.async.ClientProtocolLevel; +import org.apache.hc.client5.testing.extension.async.ServerProtocolLevel; +import org.apache.hc.core5.http.ContentType; +import org.apache.hc.core5.http.HttpHeaders; +import org.apache.hc.core5.http.HttpHost; +import org.apache.hc.core5.http.URIScheme; +import org.apache.hc.core5.http.io.entity.ByteArrayEntity; +import org.apache.hc.core5.http.io.entity.EntityUtils; +import org.apache.hc.core5.http.io.support.ClassicRequestBuilder; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; + +public abstract class TestClassicOverAsyncHttp1 extends TestClassicOverAsync { + + public TestClassicOverAsyncHttp1(final URIScheme scheme, + final ClientProtocolLevel clientProtocolLevel, + final ServerProtocolLevel serverProtocolLevel) { + super(scheme, clientProtocolLevel, serverProtocolLevel); + } + + @ValueSource(ints = {0, 2048, 10240}) + @ParameterizedTest(name = "{displayName}; content length: {0}") + void testSequentialPostRequestsNoKeepAlive(final int contentSize) throws Exception { + configureServer(bootstrap -> + bootstrap.register("/echo/*", AsyncEchoHandler::new)); + final HttpHost target = startServer(); + + final CloseableHttpClient client = startClient(); + + final int n = 10; + for (int i = 0; i < n; i++) { + final byte[] temp = new byte[contentSize]; + new Random(System.currentTimeMillis()).nextBytes(temp); + client.execute( + ClassicRequestBuilder.post() + .setHttpHost(target) + .setPath("/echo/") + .addHeader(HttpHeaders.CONNECTION, "Close") + .setEntity(new ByteArrayEntity(temp, ContentType.DEFAULT_BINARY)) + .build(), + response -> { + Assertions.assertEquals(200, response.getCode()); + final byte[] bytes = EntityUtils.toByteArray(response.getEntity()); + Assertions.assertNotNull(bytes); + Assertions.assertArrayEquals(temp, bytes); + return null; + }); + } + } + +} \ No newline at end of file diff --git a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/HttpAsyncClients.java b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/HttpAsyncClients.java index 22e62f7b4b..d7d87cb330 100644 --- a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/HttpAsyncClients.java +++ b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/HttpAsyncClients.java @@ -33,9 +33,12 @@ import org.apache.hc.client5.http.config.TlsConfig; import org.apache.hc.client5.http.impl.DefaultClientConnectionReuseStrategy; import org.apache.hc.client5.http.impl.DefaultSchemePortResolver; +import org.apache.hc.client5.http.impl.classic.CloseableHttpClient; +import org.apache.hc.client5.http.impl.compat.ClassicToAsyncAdaptor; import org.apache.hc.client5.http.impl.nio.PoolingAsyncClientConnectionManagerBuilder; import org.apache.hc.client5.http.nio.AsyncClientConnectionManager; import org.apache.hc.client5.http.ssl.DefaultClientTlsStrategy; +import org.apache.hc.core5.annotation.Experimental; import org.apache.hc.core5.concurrent.DefaultThreadFactory; import org.apache.hc.core5.function.Supplier; import org.apache.hc.core5.http.config.CharCodingConfig; @@ -54,6 +57,7 @@ import org.apache.hc.core5.http2.protocol.H2RequestTargetHost; import org.apache.hc.core5.reactor.IOEventHandlerFactory; import org.apache.hc.core5.reactor.IOReactorConfig; +import org.apache.hc.core5.util.Timeout; import org.apache.hc.core5.util.VersionInfo; /** @@ -349,4 +353,23 @@ public static HandlerFactory pushRouter(final RequestRouter + * This feature is considered experimental and may be discontinued in the future. + * + * @param client async client + * @param operationTimeout maximum period an operation can block awaiting input / output. + * + * @since 5.5 + */ + @Experimental + public static CloseableHttpClient classic(final CloseableHttpAsyncClient client, final Timeout operationTimeout) { + client.start(); + return new ClassicToAsyncAdaptor(client, operationTimeout); + } + } diff --git a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/InternalAbstractHttpAsyncClient.java b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/InternalAbstractHttpAsyncClient.java index 74bae9c246..8e50cc4a62 100644 --- a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/InternalAbstractHttpAsyncClient.java +++ b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/InternalAbstractHttpAsyncClient.java @@ -362,7 +362,7 @@ public void failed(final Exception cause) { } }); - }, context); + }, clientContext); } catch (final HttpException | IOException | IllegalStateException ex) { future.failed(ex); } diff --git a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/classic/CloseableDelegate.java b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/classic/CloseableDelegate.java new file mode 100644 index 0000000000..ae2d33de30 --- /dev/null +++ b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/classic/CloseableDelegate.java @@ -0,0 +1,45 @@ +/* + * ==================================================================== + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * ==================================================================== + * + * This software consists of voluntary contributions made by many + * individuals on behalf of the Apache Software Foundation. For more + * information on the Apache Software Foundation, please see + * . + * + */ + +package org.apache.hc.client5.http.impl.classic; + +import java.io.Closeable; +import java.io.IOException; + +import org.apache.hc.core5.annotation.Internal; +import org.apache.hc.core5.io.CloseMode; + +/** + * @since 5.5 + */ +@Internal +@FunctionalInterface +public interface CloseableDelegate { + + void close(Closeable closeable, CloseMode closeMode) throws IOException; + +} diff --git a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/classic/CloseableHttpResponse.java b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/classic/CloseableHttpResponse.java index 32b5a57ae9..7f66413190 100644 --- a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/classic/CloseableHttpResponse.java +++ b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/classic/CloseableHttpResponse.java @@ -50,7 +50,7 @@ public final class CloseableHttpResponse implements ClassicHttpResponse, ModalCloseable { private final ClassicHttpResponse response; - private final ExecRuntime execRuntime; + private final CloseableDelegate closeableDelegate; /** * @since 5.4 @@ -60,14 +60,45 @@ public static CloseableHttpResponse adapt(final ClassicHttpResponse response) { if (response == null) { return null; } - return response instanceof CloseableHttpResponse - ? (CloseableHttpResponse) response - : new CloseableHttpResponse(response, null); + return response instanceof CloseableHttpResponse ? + (CloseableHttpResponse) response : + new CloseableHttpResponse(response); + } + + /** + * @since 5.5 + */ + @Internal + public static CloseableHttpResponse create(final ClassicHttpResponse response, + final CloseableDelegate closeableDelegate) { + if (response == null) { + return null; + } + return new CloseableHttpResponse(response, closeableDelegate); + } + + CloseableHttpResponse(final ClassicHttpResponse response) { + this.response = Args.notNull(response, "Response"); + this.closeableDelegate = null; + } + + CloseableHttpResponse(final ClassicHttpResponse response, final CloseableDelegate closeableDelegate) { + this.response = Args.notNull(response, "Response"); + this.closeableDelegate = closeableDelegate; } CloseableHttpResponse(final ClassicHttpResponse response, final ExecRuntime execRuntime) { this.response = Args.notNull(response, "Response"); - this.execRuntime = execRuntime; + this.closeableDelegate = (closeable, closeMode) -> { + try { + if (closeMode == CloseMode.GRACEFUL) { + response.close(); + } + execRuntime.disconnectEndpoint(); + } finally { + execRuntime.discardEndpoint(); + } + }; } @Override @@ -201,15 +232,8 @@ public Iterator
headerIterator(final String name) { } private void doClose(final CloseMode closeMode) throws IOException { - if (execRuntime != null) { - try { - if (closeMode == CloseMode.GRACEFUL) { - response.close(); - } - execRuntime.disconnectEndpoint(); - } finally { - execRuntime.discardEndpoint(); - } + if (closeableDelegate != null) { + closeableDelegate.close(response, closeMode); } else { response.close(); } diff --git a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/classic/MainClientExec.java b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/classic/MainClientExec.java index f51cbb55bd..43121e46fd 100644 --- a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/classic/MainClientExec.java +++ b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/classic/MainClientExec.java @@ -170,7 +170,7 @@ public ClassicHttpResponse execute( if (entity == null || !entity.isStreaming()) { // connection not needed and (assumed to be) in re-usable state execRuntime.releaseEndpoint(); - return new CloseableHttpResponse(response, null); + return new CloseableHttpResponse(response); } return new CloseableHttpResponse(response, execRuntime); } catch (final ConnectionShutdownException ex) { diff --git a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/classic/MinimalHttpClient.java b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/classic/MinimalHttpClient.java index cceb61af3a..8d4417d6a5 100644 --- a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/classic/MinimalHttpClient.java +++ b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/classic/MinimalHttpClient.java @@ -158,7 +158,7 @@ protected CloseableHttpResponse doExecute( if (entity == null || !entity.isStreaming()) { // connection not needed and (assumed to be) in re-usable state execRuntime.releaseEndpoint(); - return new CloseableHttpResponse(response, null); + return new CloseableHttpResponse(response); } ResponseEntityProxy.enhance(response, execRuntime); return new CloseableHttpResponse(response, execRuntime); diff --git a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/compat/AbstractSharedBuffer.java b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/compat/AbstractSharedBuffer.java new file mode 100644 index 0000000000..a6abb489d9 --- /dev/null +++ b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/compat/AbstractSharedBuffer.java @@ -0,0 +1,118 @@ +/* + * ==================================================================== + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * ==================================================================== + * + * This software consists of voluntary contributions made by many + * individuals on behalf of the Apache Software Foundation. For more + * information on the Apache Software Foundation, please see + * . + * + */ +package org.apache.hc.client5.http.impl.compat; + +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReentrantLock; + +import org.apache.hc.core5.annotation.Internal; +import org.apache.hc.core5.http.impl.nio.ExpandableBuffer; +import org.apache.hc.core5.util.Args; + +/** + * TODO: to be replaced by core functionality + */ +@Internal +abstract class AbstractSharedBuffer extends ExpandableBuffer { + + final ReentrantLock lock; + final Condition condition; + + volatile boolean endStream; + volatile boolean aborted; + + public AbstractSharedBuffer(final ReentrantLock lock, final int initialBufferSize) { + super(initialBufferSize); + this.lock = Args.notNull(lock, "Lock"); + this.condition = lock.newCondition(); + } + + @Override + public boolean hasData() { + lock.lock(); + try { + return super.hasData(); + } finally { + lock.unlock(); + } + } + + @Override + public int capacity() { + lock.lock(); + try { + return super.capacity(); + } finally { + lock.unlock(); + } + } + + @Override + public int length() { + lock.lock(); + try { + return super.length(); + } finally { + lock.unlock(); + } + } + + public void abort() { + lock.lock(); + try { + endStream = true; + aborted = true; + condition.signalAll(); + } finally { + lock.unlock(); + } + } + + public void reset() { + if (aborted) { + return; + } + lock.lock(); + try { + setInputMode(); + buffer().clear(); + endStream = false; + } finally { + lock.unlock(); + } + } + + public boolean isEndStream() { + lock.lock(); + try { + return endStream && !super.hasData(); + } finally { + lock.unlock(); + } + } + +} diff --git a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/compat/ClassicToAsyncAdaptor.java b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/compat/ClassicToAsyncAdaptor.java new file mode 100644 index 0000000000..f1bfa36b3a --- /dev/null +++ b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/compat/ClassicToAsyncAdaptor.java @@ -0,0 +1,110 @@ +/* + * ==================================================================== + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * ==================================================================== + * + * This software consists of voluntary contributions made by many + * individuals on behalf of the Apache Software Foundation. For more + * information on the Apache Software Foundation, please see + * . + * + */ + +package org.apache.hc.client5.http.impl.compat; + +import java.io.IOException; +import java.io.InterruptedIOException; +import java.util.concurrent.Future; + +import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient; +import org.apache.hc.client5.http.impl.classic.CloseableHttpClient; +import org.apache.hc.client5.http.impl.classic.CloseableHttpResponse; +import org.apache.hc.core5.annotation.Contract; +import org.apache.hc.core5.annotation.Experimental; +import org.apache.hc.core5.annotation.Internal; +import org.apache.hc.core5.annotation.ThreadingBehavior; +import org.apache.hc.core5.concurrent.CancellableDependency; +import org.apache.hc.core5.http.ClassicHttpRequest; +import org.apache.hc.core5.http.ClassicHttpResponse; +import org.apache.hc.core5.http.HttpHost; +import org.apache.hc.core5.http.protocol.HttpContext; +import org.apache.hc.core5.io.CloseMode; +import org.apache.hc.core5.util.Timeout; + +/** + * {@link CloseableHttpClient} implementation backed by {@link CloseableHttpAsyncClient} + * acting as a compatibility bridge with the classic APIs based on the standard + * {@link java.io.InputStream} / {@link java.io.OutputStream} model. + * + * @since 5.5 + */ +@Experimental +@Internal +@Contract(threading = ThreadingBehavior.SAFE_CONDITIONAL) +public class ClassicToAsyncAdaptor extends CloseableHttpClient { + + private final CloseableHttpAsyncClient client; + private final Timeout operationTimeout; + + public ClassicToAsyncAdaptor(final CloseableHttpAsyncClient client, final Timeout operationTimeout) { + super(); + this.client = client; + this.operationTimeout = operationTimeout; + } + + @Override + protected CloseableHttpResponse doExecute( + final HttpHost target, + final ClassicHttpRequest request, + final HttpContext context) throws IOException { + final ClassicToAsyncRequestProducer requestProducer = new ClassicToAsyncRequestProducer(request, operationTimeout); + final ClassicToAsyncResponseConsumer responseConsumer = new ClassicToAsyncResponseConsumer(operationTimeout); + final Future resultFuture = client.execute(target, requestProducer, responseConsumer, null, context, null); + if (request instanceof CancellableDependency) { + ((CancellableDependency) request).setDependency(() -> resultFuture.cancel(true)); + } + try { + requestProducer.blockWaiting().execute(); + final ClassicHttpResponse response = responseConsumer.blockWaiting(); + return CloseableHttpResponse.create(response, + (closeable, closeMode) -> { + try { + if (closeMode == CloseMode.GRACEFUL) { + closeable.close(); + } + } finally { + resultFuture.cancel(true); + } + }); + } catch (final InterruptedException ex) { + Thread.currentThread().interrupt(); + throw new InterruptedIOException(); + } + } + + @Override + public void close() throws IOException { + client.close(); + } + + @Override + public void close(final CloseMode closeMode) { + client.close(closeMode); + } + +} diff --git a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/compat/ClassicToAsyncRequestProducer.java b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/compat/ClassicToAsyncRequestProducer.java new file mode 100644 index 0000000000..2baaea5aa7 --- /dev/null +++ b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/compat/ClassicToAsyncRequestProducer.java @@ -0,0 +1,198 @@ +/* + * ==================================================================== + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * ==================================================================== + * + * This software consists of voluntary contributions made by many + * individuals on behalf of the Apache Software Foundation. For more + * information on the Apache Software Foundation, please see + * . + * + */ +package org.apache.hc.client5.http.impl.compat; + +import java.io.IOException; +import java.io.InterruptedIOException; +import java.io.OutputStream; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicReference; + +import org.apache.hc.core5.annotation.Experimental; +import org.apache.hc.core5.annotation.Internal; +import org.apache.hc.core5.http.ClassicHttpRequest; +import org.apache.hc.core5.http.HttpEntity; +import org.apache.hc.core5.http.HttpException; +import org.apache.hc.core5.http.nio.AsyncRequestProducer; +import org.apache.hc.core5.http.nio.DataStreamChannel; +import org.apache.hc.core5.http.nio.RequestChannel; +import org.apache.hc.core5.http.protocol.HttpContext; +import org.apache.hc.core5.util.Args; +import org.apache.hc.core5.util.Asserts; +import org.apache.hc.core5.util.Timeout; + +/** + * TODO: to be replaced by core functionality + */ +@Experimental +@Internal +class ClassicToAsyncRequestProducer implements AsyncRequestProducer { + + private final ClassicHttpRequest request; + private final int initialBufferSize; + private final Timeout timeout; + private final CountDownLatch countDownLatch; + private final AtomicReference bufferRef; + private final AtomicReference exceptionRef; + + private volatile boolean repeatable; + + public interface IORunnable { + + void execute() throws IOException; + + } + + public ClassicToAsyncRequestProducer(final ClassicHttpRequest request, final int initialBufferSize, final Timeout timeout) { + this.request = Args.notNull(request, "HTTP request"); + this.initialBufferSize = Args.positive(initialBufferSize, "Initial buffer size"); + this.timeout = timeout; + this.countDownLatch = new CountDownLatch(1); + this.bufferRef = new AtomicReference<>(); + this.exceptionRef = new AtomicReference<>(); + } + + public ClassicToAsyncRequestProducer(final ClassicHttpRequest request, final Timeout timeout) { + this(request, ClassicToAsyncSupport.INITIAL_BUF_SIZE, timeout); + } + + void propagateException() throws IOException { + final Exception ex = exceptionRef.getAndSet(null); + if (ex != null) { + ClassicToAsyncSupport.rethrow(ex); + } + } + + public IORunnable blockWaiting() throws IOException, InterruptedException { + if (timeout == null) { + countDownLatch.await(); + } else { + if (!countDownLatch.await(timeout.getDuration(), timeout.getTimeUnit())) { + throw new InterruptedIOException("Timeout blocked waiting for output (" + timeout + ")"); + } + } + propagateException(); + final SharedOutputBuffer outputBuffer = bufferRef.get(); + return () -> { + final HttpEntity requestEntity = request.getEntity(); + if (requestEntity != null) { + try (final InternalOutputStream outputStream = new InternalOutputStream(outputBuffer)) { + requestEntity.writeTo(outputStream); + } + } + }; + } + + @Override + public void sendRequest(final RequestChannel channel, final HttpContext context) throws HttpException, IOException { + final HttpEntity requestEntity = request.getEntity(); + final SharedOutputBuffer buffer = requestEntity != null ? new SharedOutputBuffer(initialBufferSize) : null; + bufferRef.set(buffer); + repeatable = requestEntity == null || requestEntity.isRepeatable(); + channel.sendRequest(request, requestEntity, null); + countDownLatch.countDown(); + } + + @Override + public boolean isRepeatable() { + return repeatable; + } + + @Override + public int available() { + final SharedOutputBuffer buffer = bufferRef.get(); + if (buffer != null) { + return buffer.length(); + } + return 0; + } + + @Override + public void produce(final DataStreamChannel channel) throws IOException { + final SharedOutputBuffer buffer = bufferRef.get(); + if (buffer != null) { + buffer.flush(channel); + } + } + + @Override + public void failed(final Exception cause) { + try { + exceptionRef.set(cause); + } finally { + countDownLatch.countDown(); + } + } + + @Override + public void releaseResources() { + } + + class InternalOutputStream extends OutputStream { + + private final SharedOutputBuffer buffer; + + public InternalOutputStream(final SharedOutputBuffer buffer) { + Asserts.notNull(buffer, "Shared buffer"); + this.buffer = buffer; + } + + @Override + public void close() throws IOException { + propagateException(); + this.buffer.writeCompleted(timeout); + } + + @Override + public void flush() throws IOException { + propagateException(); + } + + @Override + public void write(final byte[] b, final int off, final int len) throws IOException { + propagateException(); + this.buffer.write(b, off, len, timeout); + } + + @Override + public void write(final byte[] b) throws IOException { + propagateException(); + if (b == null) { + return; + } + this.buffer.write(b, 0, b.length, timeout); + } + + @Override + public void write(final int b) throws IOException { + propagateException(); + this.buffer.write(b, timeout); + } + + } + +} diff --git a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/compat/ClassicToAsyncResponseConsumer.java b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/compat/ClassicToAsyncResponseConsumer.java new file mode 100644 index 0000000000..2a47e741d3 --- /dev/null +++ b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/compat/ClassicToAsyncResponseConsumer.java @@ -0,0 +1,331 @@ +/* + * ==================================================================== + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * ==================================================================== + * + * This software consists of voluntary contributions made by many + * individuals on behalf of the Apache Software Foundation. For more + * information on the Apache Software Foundation, please see + * . + * + */ +package org.apache.hc.client5.http.impl.compat; + +import java.io.IOException; +import java.io.InputStream; +import java.io.InterruptedIOException; +import java.io.OutputStream; +import java.nio.ByteBuffer; +import java.util.Collections; +import java.util.List; +import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicReference; + +import org.apache.hc.core5.annotation.Experimental; +import org.apache.hc.core5.annotation.Internal; +import org.apache.hc.core5.concurrent.FutureCallback; +import org.apache.hc.core5.function.Supplier; +import org.apache.hc.core5.http.ClassicHttpResponse; +import org.apache.hc.core5.http.EntityDetails; +import org.apache.hc.core5.http.Header; +import org.apache.hc.core5.http.HttpEntity; +import org.apache.hc.core5.http.HttpException; +import org.apache.hc.core5.http.HttpResponse; +import org.apache.hc.core5.http.io.entity.AbstractHttpEntity; +import org.apache.hc.core5.http.io.support.ClassicResponseBuilder; +import org.apache.hc.core5.http.nio.AsyncResponseConsumer; +import org.apache.hc.core5.http.nio.CapacityChannel; +import org.apache.hc.core5.http.protocol.HttpContext; +import org.apache.hc.core5.io.Closer; +import org.apache.hc.core5.util.Args; +import org.apache.hc.core5.util.Asserts; +import org.apache.hc.core5.util.Timeout; + +/** + * TODO: to be replaced by core functionality + */ +@Experimental +@Internal +class ClassicToAsyncResponseConsumer implements AsyncResponseConsumer { + + static class ResponseData { + + final HttpResponse head; + final EntityDetails entityDetails; + + ResponseData(final HttpResponse head, + final EntityDetails entityDetails) { + this.head = head; + this.entityDetails = entityDetails; + } + + } + + private final int initialBufferSize; + private final Timeout timeout; + private final CountDownLatch countDownLatch; + private final AtomicReference responseRef; + private final AtomicReference> callbackRef; + private final AtomicReference bufferRef; + private final AtomicReference exceptionRef; + + public ClassicToAsyncResponseConsumer(final int initialBufferSize, final Timeout timeout) { + this.initialBufferSize = Args.positive(initialBufferSize, "Initial buffer size"); + this.timeout = timeout; + this.countDownLatch = new CountDownLatch(1); + this.responseRef = new AtomicReference<>(); + this.callbackRef = new AtomicReference<>(); + this.bufferRef = new AtomicReference<>(); + this.exceptionRef = new AtomicReference<>(); + } + + public ClassicToAsyncResponseConsumer(final Timeout timeout) { + this(ClassicToAsyncSupport.INITIAL_BUF_SIZE, timeout); + } + + void propagateException() throws IOException { + final Exception ex = exceptionRef.getAndSet(null); + if (ex != null) { + ClassicToAsyncSupport.rethrow(ex); + } + } + + void fireComplete() throws IOException { + final FutureCallback callback = callbackRef.getAndSet(null); + if (callback != null) { + callback.completed(null); + } + } + + public ClassicHttpResponse blockWaiting() throws IOException, InterruptedException { + if (timeout == null) { + countDownLatch.await(); + } else { + if (!countDownLatch.await(timeout.getDuration(), timeout.getTimeUnit())) { + throw new InterruptedIOException("Timeout blocked waiting for input (" + timeout + ")"); + } + } + propagateException(); + final ResponseData r = responseRef.getAndSet(null); + Asserts.notNull(r, "HTTP response is missing"); + final SharedInputBuffer inputBuffer = bufferRef.get(); + return ClassicResponseBuilder.create(r.head.getCode()) + .setHeaders(r.head.getHeaders()) + .setVersion(r.head.getVersion()) + .setEntity(r.entityDetails != null ? + new IncomingHttpEntity(new InternalInputStream(inputBuffer), r.entityDetails) : + null) + .build(); + } + + @Override + public void consumeResponse(final HttpResponse asyncResponse, + final EntityDetails entityDetails, + final HttpContext context, + final FutureCallback resultCallback) throws HttpException, IOException { + callbackRef.set(resultCallback); + final ResponseData responseData = new ResponseData(asyncResponse, entityDetails); + responseRef.set(responseData); + if (entityDetails != null) { + bufferRef.set(new SharedInputBuffer(initialBufferSize)); + } else { + fireComplete(); + } + countDownLatch.countDown(); + } + + @Override + public void informationResponse(final HttpResponse response, + final HttpContext context) throws HttpException, IOException { + } + + @Override + public final void updateCapacity(final CapacityChannel capacityChannel) throws IOException { + final SharedInputBuffer buffer = bufferRef.get(); + if (buffer != null) { + buffer.updateCapacity(capacityChannel); + } + } + + @Override + public final void consume(final ByteBuffer src) throws IOException { + final SharedInputBuffer buffer = bufferRef.get(); + if (buffer != null) { + buffer.fill(src); + } + } + + @Override + public final void streamEnd(final List trailers) throws HttpException, IOException { + final SharedInputBuffer buffer = bufferRef.get(); + if (buffer != null) { + buffer.markEndStream(); + } + } + + @Override + public final void failed(final Exception cause) { + try { + exceptionRef.set(cause); + } finally { + countDownLatch.countDown(); + } + } + + @Override + public void releaseResources() { + } + + class InternalInputStream extends InputStream { + + private final SharedInputBuffer buffer; + + InternalInputStream(final SharedInputBuffer buffer) { + super(); + Args.notNull(buffer, "Input buffer"); + this.buffer = buffer; + } + + @Override + public int available() throws IOException { + propagateException(); + return this.buffer.length(); + } + + @Override + public int read(final byte[] b, final int off, final int len) throws IOException { + propagateException(); + if (len == 0) { + return 0; + } + final int bytesRead = this.buffer.read(b, off, len, timeout); + if (bytesRead == -1) { + fireComplete(); + } + return bytesRead; + } + + @Override + public int read(final byte[] b) throws IOException { + propagateException(); + if (b == null) { + return 0; + } + final int bytesRead = this.buffer.read(b, 0, b.length, timeout); + if (bytesRead == -1) { + fireComplete(); + } + return bytesRead; + } + + @Override + public int read() throws IOException { + propagateException(); + final int b = this.buffer.read(timeout); + if (b == -1) { + fireComplete(); + } + return b; + } + + @Override + public void close() throws IOException { + // read and discard the remainder of the message + final byte[] tmp = new byte[1024]; + do { + /* empty */ + } while (read(tmp) >= 0); + super.close(); + } + + } + + static class IncomingHttpEntity implements HttpEntity { + + private final InputStream content; + private final EntityDetails entityDetails; + + IncomingHttpEntity(final InputStream content, final EntityDetails entityDetails) { + this.content = content; + this.entityDetails = entityDetails; + } + + @Override + public boolean isRepeatable() { + return false; + } + + @Override + public boolean isChunked() { + return entityDetails.isChunked(); + } + + @Override + public long getContentLength() { + return entityDetails.getContentLength(); + } + + @Override + public String getContentType() { + return entityDetails.getContentType(); + } + + @Override + public String getContentEncoding() { + return entityDetails.getContentEncoding(); + } + + @Override + public InputStream getContent() throws IOException, IllegalStateException { + return content; + } + + @Override + public boolean isStreaming() { + return content != null; + } + + @Override + public void writeTo(final OutputStream outStream) throws IOException { + AbstractHttpEntity.writeTo(this, outStream); + } + + @Override + public Supplier> getTrailers() { + return null; + } + + @Override + public Set getTrailerNames() { + return Collections.emptySet(); + } + + @Override + public void close() throws IOException { + Closer.close(content); + } + + @Override + public String toString() { + return entityDetails.toString(); + } + + } + +} diff --git a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/compat/ClassicToAsyncSupport.java b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/compat/ClassicToAsyncSupport.java new file mode 100644 index 0000000000..31c74b2cd8 --- /dev/null +++ b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/compat/ClassicToAsyncSupport.java @@ -0,0 +1,53 @@ +/* + * ==================================================================== + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * ==================================================================== + * + * This software consists of voluntary contributions made by many + * individuals on behalf of the Apache Software Foundation. For more + * information on the Apache Software Foundation, please see + * . + * + */ + +package org.apache.hc.client5.http.impl.compat; + +import java.io.IOException; + +import org.apache.hc.core5.http.HttpException; + +final class ClassicToAsyncSupport { + + final static int INITIAL_BUF_SIZE = 2048; + + static void rethrow(final Throwable ex) throws IOException { + if (ex instanceof Error) { + throw (Error) ex; + } else if (ex instanceof RuntimeException) { + throw (RuntimeException) ex; + } else if (ex instanceof IOException) { + throw new TransportException((IOException) ex); + } else if (ex instanceof HttpException) { + throw new ProtocolException((HttpException) ex); + } else { + // Unexpected exception type + throw new IllegalStateException(ex); + } + } + +} diff --git a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/compat/IOCallback.java b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/compat/IOCallback.java new file mode 100644 index 0000000000..e9582d61dc --- /dev/null +++ b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/compat/IOCallback.java @@ -0,0 +1,42 @@ +/* + * ==================================================================== + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * ==================================================================== + * + * This software consists of voluntary contributions made by many + * individuals on behalf of the Apache Software Foundation. For more + * information on the Apache Software Foundation, please see + * . + * + */ + +package org.apache.hc.client5.http.impl.compat; + +import java.io.IOException; + +/** + * Abstract I/O callback. + * + * @param the type of the input to the operation. + * @since 5.0 + */ +interface IOCallback { + + void execute(T object) throws IOException; + +} diff --git a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/compat/ProtocolException.java b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/compat/ProtocolException.java new file mode 100644 index 0000000000..b054ebbb07 --- /dev/null +++ b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/compat/ProtocolException.java @@ -0,0 +1,45 @@ +/* + * ==================================================================== + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * ==================================================================== + * + * This software consists of voluntary contributions made by many + * individuals on behalf of the Apache Software Foundation. For more + * information on the Apache Software Foundation, please see + * . + * + */ + +package org.apache.hc.client5.http.impl.compat; + +import java.io.IOException; + +import org.apache.hc.core5.annotation.Internal; +import org.apache.hc.core5.http.HttpException; + +/** + * TODO: to be replaced by core functionality + */ +@Internal +public class ProtocolException extends IOException { + + public ProtocolException(final HttpException ex) { + super(ex); + } + +} diff --git a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/compat/SharedInputBuffer.java b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/compat/SharedInputBuffer.java new file mode 100644 index 0000000000..5bd0835482 --- /dev/null +++ b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/compat/SharedInputBuffer.java @@ -0,0 +1,200 @@ +/* + * ==================================================================== + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * ==================================================================== + * + * This software consists of voluntary contributions made by many + * individuals on behalf of the Apache Software Foundation. For more + * information on the Apache Software Foundation, please see + * . + * + */ +package org.apache.hc.client5.http.impl.compat; + +import java.io.IOException; +import java.io.InterruptedIOException; +import java.nio.ByteBuffer; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.ReentrantLock; + +import org.apache.hc.core5.annotation.Internal; +import org.apache.hc.core5.http.nio.CapacityChannel; +import org.apache.hc.core5.http.nio.support.classic.ContentInputBuffer; +import org.apache.hc.core5.util.Timeout; + +/** + * TODO: to be replaced by core functionality + */ +@Internal +final class SharedInputBuffer extends AbstractSharedBuffer implements ContentInputBuffer { + + private final int initialBufferSize; + private final AtomicInteger capacityIncrement; + + private volatile CapacityChannel capacityChannel; + + public SharedInputBuffer(final ReentrantLock lock, final int initialBufferSize) { + super(lock, initialBufferSize); + this.initialBufferSize = initialBufferSize; + this.capacityIncrement = new AtomicInteger(0); + } + + public SharedInputBuffer(final int bufferSize) { + this(new ReentrantLock(), bufferSize); + } + + public int fill(final ByteBuffer src) { + lock.lock(); + try { + setInputMode(); + ensureAdjustedCapacity(buffer().position() + src.remaining()); + buffer().put(src); + final int remaining = buffer().remaining(); + condition.signalAll(); + return remaining; + } finally { + lock.unlock(); + } + } + + private void incrementCapacity() throws IOException { + if (capacityChannel != null) { + final int increment = capacityIncrement.getAndSet(0); + if (increment > 0) { + capacityChannel.update(increment); + } + } + } + + public void updateCapacity(final CapacityChannel capacityChannel) throws IOException { + lock.lock(); + try { + this.capacityChannel = capacityChannel; + setInputMode(); + if (buffer().position() == 0) { + capacityChannel.update(initialBufferSize); + } + } finally { + lock.unlock(); + } + } + + private void awaitInput(final Timeout timeout) throws InterruptedIOException { + if (!buffer().hasRemaining()) { + setInputMode(); + while (buffer().position() == 0 && !endStream && !aborted) { + try { + if (timeout == null) { + condition.await(); + } else { + if (!condition.await(timeout.getDuration(), timeout.getTimeUnit())) { + throw new InterruptedIOException("Timeout blocked waiting for input (" + timeout + ")"); + } + } + } catch (final InterruptedException ex) { + Thread.currentThread().interrupt(); + throw new InterruptedIOException(ex.getMessage()); + } + } + setOutputMode(); + } + } + + private void ensureNotAborted() throws InterruptedIOException { + if (aborted) { + throw new InterruptedIOException("Operation aborted"); + } + } + + @Override + public int read() throws IOException { + return read(null); + } + + /** + * @since 5.4 + */ + public int read(final Timeout timeout) throws IOException { + lock.lock(); + try { + setOutputMode(); + awaitInput(timeout); + ensureNotAborted(); + if (!buffer().hasRemaining() && endStream) { + return -1; + } + final int b = buffer().get() & 0xff; + capacityIncrement.incrementAndGet(); + if (!buffer().hasRemaining()) { + incrementCapacity(); + } + return b; + } finally { + lock.unlock(); + } + } + + @Override + public int read(final byte[] b, final int off, final int len) throws IOException { + return read(b, off, len, null); + } + + /** + * @since 5.4 + */ + public int read(final byte[] b, final int off, final int len, final Timeout timeout) throws IOException { + if (len == 0) { + return 0; + } + lock.lock(); + try { + setOutputMode(); + awaitInput(timeout); + ensureNotAborted(); + if (!buffer().hasRemaining() && endStream) { + return -1; + } + final int chunk = Math.min(buffer().remaining(), len); + buffer().get(b, off, chunk); + capacityIncrement.addAndGet(chunk); + if (!buffer().hasRemaining()) { + incrementCapacity(); + } + return chunk; + } finally { + lock.unlock(); + } + } + + public void markEndStream() { + if (endStream) { + return; + } + lock.lock(); + try { + if (!endStream) { + endStream = true; + capacityChannel = null; + condition.signalAll(); + } + } finally { + lock.unlock(); + } + } + +} diff --git a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/compat/SharedOutputBuffer.java b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/compat/SharedOutputBuffer.java new file mode 100644 index 0000000000..f75188cacc --- /dev/null +++ b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/compat/SharedOutputBuffer.java @@ -0,0 +1,216 @@ +/* + * ==================================================================== + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * ==================================================================== + * + * This software consists of voluntary contributions made by many + * individuals on behalf of the Apache Software Foundation. For more + * information on the Apache Software Foundation, please see + * . + * + */ +package org.apache.hc.client5.http.impl.compat; + +import java.io.IOException; +import java.io.InterruptedIOException; +import java.nio.ByteBuffer; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.ReentrantLock; + +import org.apache.hc.core5.annotation.Internal; +import org.apache.hc.core5.http.nio.DataStreamChannel; +import org.apache.hc.core5.http.nio.support.classic.ContentOutputBuffer; +import org.apache.hc.core5.util.Timeout; + +/** + * TODO: to be replaced by core functionality + */ +@Internal +final class SharedOutputBuffer extends AbstractSharedBuffer implements ContentOutputBuffer { + + private final AtomicBoolean endStreamPropagated; + private volatile DataStreamChannel dataStreamChannel; + private volatile boolean hasCapacity; + + public SharedOutputBuffer(final ReentrantLock lock, final int initialBufferSize) { + super(lock, initialBufferSize); + this.hasCapacity = false; + this.endStreamPropagated = new AtomicBoolean(); + } + + public SharedOutputBuffer(final int bufferSize) { + this(new ReentrantLock(), bufferSize); + } + + public void flush(final DataStreamChannel channel) throws IOException { + lock.lock(); + try { + dataStreamChannel = channel; + hasCapacity = true; + setOutputMode(); + if (buffer().hasRemaining()) { + dataStreamChannel.write(buffer()); + } + if (!buffer().hasRemaining() && endStream) { + propagateEndStream(); + } + condition.signalAll(); + } finally { + lock.unlock(); + } + } + + private void ensureNotAborted() throws InterruptedIOException { + if (aborted) { + throw new InterruptedIOException("Operation aborted"); + } + } + + /** + * @since 5.4 + */ + public void write(final byte[] b, final int off, final int len, final Timeout timeout) throws IOException { + final ByteBuffer src = ByteBuffer.wrap(b, off, len); + lock.lock(); + try { + ensureNotAborted(); + setInputMode(); + while (src.hasRemaining()) { + // always buffer small chunks + if (src.remaining() < 1024 && buffer().remaining() > src.remaining()) { + buffer().put(src); + } else { + if (buffer().position() > 0 || dataStreamChannel == null) { + waitFlush(timeout); + } + if (buffer().position() == 0 && dataStreamChannel != null) { + final int bytesWritten = dataStreamChannel.write(src); + if (bytesWritten == 0) { + hasCapacity = false; + waitFlush(timeout); + } + } + } + } + } finally { + lock.unlock(); + } + } + + @Override + public void write(final byte[] b, final int off, final int len) throws IOException { + write(b, off, len, null); + } + + /** + * @since 5.4 + */ + public void write(final int b, final Timeout timeout) throws IOException { + lock.lock(); + try { + ensureNotAborted(); + setInputMode(); + if (!buffer().hasRemaining()) { + waitFlush(timeout); + } + buffer().put((byte)b); + } finally { + lock.unlock(); + } + } + + @Override + public void write(final int b) throws IOException { + write(b, null); + } + + /** + * @since 5.4 + */ + public void writeCompleted(final Timeout timeout) throws IOException { + if (endStream) { + return; + } + lock.lock(); + try { + if (!endStream) { + endStream = true; + if (dataStreamChannel != null) { + setOutputMode(); + if (buffer().hasRemaining()) { + dataStreamChannel.requestOutput(); + waitEndStream(timeout); + } else { + propagateEndStream(); + } + } + } + } finally { + lock.unlock(); + } + } + + @Override + public void writeCompleted() throws IOException { + writeCompleted(null); + } + + private void waitFlush(final Timeout timeout) throws InterruptedIOException { + if (dataStreamChannel != null) { + dataStreamChannel.requestOutput(); + } + setOutputMode(); + while (buffer().hasRemaining() || !hasCapacity) { + ensureNotAborted(); + waitForSignal(timeout); + } + setInputMode(); + } + + private void waitEndStream(final Timeout timeout) throws InterruptedIOException { + if (dataStreamChannel != null) { + dataStreamChannel.requestOutput(); + } + while (!endStreamPropagated.get() && !aborted) { + waitForSignal(timeout); + } + } + + private void waitForSignal(final Timeout timeout) throws InterruptedIOException { + try { + if (timeout == null) { + condition.await(); + } else { + if (!condition.await(timeout.getDuration(), timeout.getTimeUnit())) { + aborted = true; + throw new InterruptedIOException("Timeout blocked waiting for output (" + timeout + ")"); + } + } + } catch (final InterruptedException ex) { + Thread.currentThread().interrupt(); + throw new InterruptedIOException(ex.getMessage()); + } + } + + private void propagateEndStream() throws IOException { + if (endStreamPropagated.compareAndSet(false, true)) { + dataStreamChannel.endStream(); + } + } + +} diff --git a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/compat/TransportException.java b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/compat/TransportException.java new file mode 100644 index 0000000000..b44e2b7794 --- /dev/null +++ b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/compat/TransportException.java @@ -0,0 +1,44 @@ +/* + * ==================================================================== + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * ==================================================================== + * + * This software consists of voluntary contributions made by many + * individuals on behalf of the Apache Software Foundation. For more + * information on the Apache Software Foundation, please see + * . + * + */ + +package org.apache.hc.client5.http.impl.compat; + +import java.io.IOException; + +import org.apache.hc.core5.annotation.Internal; + +/** + * TODO: to be replaced by core functionality + */ +@Internal +public class TransportException extends IOException { + + public TransportException(final IOException ex) { + super(ex); + } + +} diff --git a/httpclient5/src/test/java/org/apache/hc/client5/http/examples/ClientClassicOverAsync.java b/httpclient5/src/test/java/org/apache/hc/client5/http/examples/ClientClassicOverAsync.java new file mode 100644 index 0000000000..dd6d61c23f --- /dev/null +++ b/httpclient5/src/test/java/org/apache/hc/client5/http/examples/ClientClassicOverAsync.java @@ -0,0 +1,79 @@ +/* + * ==================================================================== + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * ==================================================================== + * + * This software consists of voluntary contributions made by many + * individuals on behalf of the Apache Software Foundation. For more + * information on the Apache Software Foundation, please see + * . + * + */ + +package org.apache.hc.client5.http.examples; + +import java.io.BufferedReader; +import java.io.InputStreamReader; +import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; + +import org.apache.hc.client5.http.classic.methods.HttpGet; +import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient; +import org.apache.hc.client5.http.impl.async.HttpAsyncClients; +import org.apache.hc.client5.http.impl.classic.CloseableHttpClient; +import org.apache.hc.core5.annotation.Experimental; +import org.apache.hc.core5.http.ContentType; +import org.apache.hc.core5.http.HttpEntity; +import org.apache.hc.core5.http.message.StatusLine; +import org.apache.hc.core5.util.Timeout; + +/** + * This example demonstrates create an {@link CloseableHttpClient} adaptor over + * {@link CloseableHttpAsyncClient} providing compatibility with the classic APIs + * based on the {@link java.io.InputStream} / {@link java.io.OutputStream} model + */ +@Experimental +public class ClientClassicOverAsync { + + public static void main(final String[] args) throws Exception { + try (final CloseableHttpClient httpclient = HttpAsyncClients.classic( + HttpAsyncClients.createDefault(), + Timeout.ofMinutes(1))) { + final HttpGet httpget = new HttpGet("http://httpbin.org/get"); + System.out.println("Executing request " + httpget.getMethod() + " " + httpget.getUri()); + httpclient.execute(httpget, response -> { + System.out.println("----------------------------------------"); + System.out.println(httpget + "->" + new StatusLine(response)); + final HttpEntity entity = response.getEntity(); + if (entity != null) { + final ContentType contentType = ContentType.parseLenient(entity.getContentType()); + final Charset charset = ContentType.getCharset(contentType, StandardCharsets.UTF_8); + try (BufferedReader reader = new BufferedReader(new InputStreamReader(entity.getContent(), charset))) { + String line; + while ((line = reader.readLine()) != null) { + System.out.println(line); + } + } + } + return null; + }); + } + } + +} +