diff --git a/httpclient5/src/main/java/org/apache/hc/client5/http/async/methods/DeflatingAsyncEntityProducer.java b/httpclient5/src/main/java/org/apache/hc/client5/http/async/methods/DeflatingAsyncEntityProducer.java
new file mode 100644
index 0000000000..820d0482c8
--- /dev/null
+++ b/httpclient5/src/main/java/org/apache/hc/client5/http/async/methods/DeflatingAsyncEntityProducer.java
@@ -0,0 +1,239 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation. For more
+ * information on the Apache Software Foundation, please see
+ *
The delegate’s bytes are read in small chunks, compressed with + * {@link java.util.zip.Deflater} and written immediately to the HTTP I/O + * layer. Memory use is therefore bounded even for very large request + * entities.
+ * + * @since 5.6 + */ +public final class DeflatingAsyncEntityProducer implements AsyncEntityProducer { + + /** + * inbound copy‐buffer + */ + private static final int IN_BUF = 8 * 1024; + /** + * outbound staging buffer + */ + private static final int OUT_BUF = 8 * 1024; + + private final AsyncEntityProducer delegate; + private final String contentType; + private final Deflater deflater = new Deflater(Deflater.DEFAULT_COMPRESSION, /*nowrap=*/true); + + /** + * holds compressed bytes not yet sent downstream + */ + private final ByteBuffer pending = ByteBuffer.allocate(OUT_BUF); + private final byte[] in = new byte[IN_BUF]; + + private final AtomicBoolean delegateEnded = new AtomicBoolean(false); + private boolean finished = false; + + public DeflatingAsyncEntityProducer(final AsyncEntityProducer delegate) { + this.delegate = Args.notNull(delegate, "delegate"); + this.contentType = delegate.getContentType(); + /* place pending into “read-mode” with no data */ + pending.flip(); + } + + // ------------------------------------------------------------------ metadata + + @Override + public boolean isRepeatable() { + return false; + } + + @Override + public long getContentLength() { + return -1; + } // unknown + + @Override + public String getContentType() { + return contentType; + } + + @Override + public String getContentEncoding() { + return "deflate"; + } + + @Override + public boolean isChunked() { + return true; + } + + @Override + public SetStreaming, non-blocking {@link AsyncDataConsumer} that transparently + * inflates a response whose {@code Content-Encoding} is {@code deflate}. + * The decoded bytes are pushed straight to the wrapped downstream consumer + * while honouring reactor back-pressure.
+ * + *The implementation understands both formats that exist “in the wild”: the + * raw DEFLATE stream (RFC 1951) and the zlib-wrapped variant (RFC 1950). + * If the caller does not specify which one to expect, the first two bytes of + * the stream are inspected and the proper decoder is chosen automatically.
+ * + *No {@code InputStream}/{@code OutputStream} buffering is used; memory + * footprint is bounded and suitable for very large payloads.
+ * + * @since 5.6 + */ +public final class InflatingAsyncDataConsumer implements AsyncDataConsumer { + + private final AsyncDataConsumer downstream; + private final Boolean nowrapHint; + private Inflater inflater; + private boolean formatChosen; + private final byte[] out = new byte[8 * 1024]; + private final AtomicBoolean closed = new AtomicBoolean(false); + + public InflatingAsyncDataConsumer( + final AsyncDataConsumer downstream, final Boolean nowrapHint) { + this.downstream = downstream; + this.nowrapHint = nowrapHint; + this.inflater = new Inflater(nowrapHint == null || nowrapHint); + } + + @Override + public void updateCapacity(final CapacityChannel ch) throws IOException { + downstream.updateCapacity(ch); + } + + @Override + public void consume(final ByteBuffer src) throws IOException { + if (closed.get()) { + return; + } + + if (nowrapHint == null && !formatChosen && src.remaining() >= 2) { + src.mark(); + final int b0 = src.get() & 0xFF; + final int b1 = src.get() & 0xFF; + src.reset(); + final boolean zlib = b0 == 0x78 && + (b1 == 0x01 || b1 == 0x5E || b1 == 0x9C || b1 == 0xDA); + if (zlib) { + inflater.end(); + inflater = new Inflater(false); + } + formatChosen = true; + } + + final byte[] in = new byte[src.remaining()]; + src.get(in); + inflater.setInput(in); + + try { + int n; + while ((n = inflater.inflate(out)) > 0) { + downstream.consume(ByteBuffer.wrap(out, 0, n)); + } + if (inflater.needsDictionary()) { + throw new IOException("Deflate dictionary required"); + } + } catch (final DataFormatException ex) { + throw new IOException("Corrupt DEFLATE stream", ex); + } + } + + @Override + public void streamEnd(final List extends Header> trailers) + throws HttpException, IOException { + if (closed.compareAndSet(false, true)) { + inflater.end(); + downstream.streamEnd(trailers); + } + } + + @Override + public void releaseResources() { + inflater = null; + downstream.releaseResources(); + } +} diff --git a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/ContentCompressionAsyncExec.java b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/ContentCompressionAsyncExec.java new file mode 100644 index 0000000000..c70d6a6df2 --- /dev/null +++ b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/ContentCompressionAsyncExec.java @@ -0,0 +1,204 @@ +/* + * ==================================================================== + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * ==================================================================== + * + * This software consists of voluntary contributions made by many + * individuals on behalf of the Apache Software Foundation. For more + * information on the Apache Software Foundation, please see + *The map’s insertion order defines the {@code Accept-Encoding} + * preference list sent on every request.
+ * + * @param contentDecoderMap a non-empty {@link LinkedHashMap} whose keys are + * lower-case coding tokens and whose values create + * a fresh wrapper consumer for each response + * @return {@code this} builder instance + * @since 5.6 + */ + public HttpAsyncClientBuilder setContentDecoderMap( + final LinkedHashMap
@@ -1001,6 +1045,19 @@ public CloseableHttpAsyncClient build() {
ChainElement.RETRY.name());
}
+ if (!contentCompressionDisabled) {
+ if (contentDecoderMap != null && !contentDecoderMap.isEmpty()) {
+ execChainDefinition.addFirst(
+ new ContentCompressionAsyncExec(contentDecoderMap, true),
+ ChainElement.COMPRESS.name());
+ } else {
+ execChainDefinition.addFirst(
+ new ContentCompressionAsyncExec(),
+ ChainElement.COMPRESS.name());
+ }
+ }
+
+
HttpRoutePlanner routePlannerCopy = this.routePlanner;
if (routePlannerCopy == null) {
SchemePortResolver schemePortResolverCopy = this.schemePortResolver;
diff --git a/httpclient5/src/test/java/org/apache/hc/client5/http/async/methods/DeflatingAsyncEntityProducerTest.java b/httpclient5/src/test/java/org/apache/hc/client5/http/async/methods/DeflatingAsyncEntityProducerTest.java
new file mode 100644
index 0000000000..01c22a2ac3
--- /dev/null
+++ b/httpclient5/src/test/java/org/apache/hc/client5/http/async/methods/DeflatingAsyncEntityProducerTest.java
@@ -0,0 +1,127 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation. For more
+ * information on the Apache Software Foundation, please see
+ * Key take-aways: In the DEFLATE algorithm the server deflates (compresses)
+ * the payload; the client then inflates (decompresses) it.
+ * The example calls https://httpbin.org/deflate,
+ * whose response is deliberately compressed. Because decompression is now
+ * part of the execution pipeline, the inner
+ * {@link org.apache.hc.core5.http.nio.entity.StringAsyncEntityConsumer}
+ * receives plain UTF-8 text and the program prints the readable JSON. If you need to stream the inflated bytes into a file or a custom parser,
+ * just replace the {@code StringAsyncEntityConsumer} with your own
+ * {@code AsyncDataConsumer}; no further changes are required.maxChunk bytes per write.
+ */
+ private static final class ThrottledChannel implements DataStreamChannel {
+ private final ByteArrayOutputStream sink = new ByteArrayOutputStream();
+ private final int maxChunk;
+
+ ThrottledChannel(final int max) {
+ this.maxChunk = max;
+ }
+
+ @Override
+ public void requestOutput() {
+ }
+
+ @Override
+ public int write(final ByteBuffer src) {
+ final int len = Math.min(src.remaining(), maxChunk);
+ final byte[] tmp = new byte[len];
+ src.get(tmp);
+ sink.write(tmp, 0, len);
+ return len;
+ }
+
+ @Override
+ public void endStream() {
+ }
+
+ @Override
+ public void endStream(final List extends Header> trailers) {
+ }
+
+ byte[] data() {
+ return sink.toByteArray();
+ }
+ }
+
+ private static String longText() {
+ final String seed = "lorem ipsum äëïöü – ";
+ final StringBuilder sb = new StringBuilder(seed.length() * 3000);
+ for (int i = 0; i < 3000; i++) {
+ sb.append(seed);
+ }
+ return sb.toString();
+ }
+
+ @Test
+ void roundTrip() throws Exception {
+ final String text = longText();
+
+ final AsyncEntityProducer raw =
+ new StringAsyncEntityProducer(text, ContentType.TEXT_PLAIN);
+ final DeflatingAsyncEntityProducer def =
+ new DeflatingAsyncEntityProducer(raw);
+
+ final ThrottledChannel ch = new ThrottledChannel(1024);
+
+ while (def.available() > 0) {
+ def.produce(ch);
+ }
+
+ final byte[] compressed = ch.data();
+ assertTrue(compressed.length > 0);
+
+ // Inflate (raw DEFLATE)
+ final Inflater inflater = new Inflater(true);
+ inflater.setInput(compressed);
+ final ByteArrayOutputStream out = new ByteArrayOutputStream();
+ final byte[] buf = new byte[8192];
+ while (!inflater.finished()) {
+ final int n = inflater.inflate(buf);
+ if (n == 0 && inflater.needsInput()) break;
+ out.write(buf, 0, n);
+ }
+ inflater.end();
+
+ assertEquals(text, out.toString("UTF-8"));
+ assertEquals("deflate", def.getContentEncoding());
+ assertTrue(def.isChunked());
+ assertEquals(-1, def.getContentLength());
+ }
+}
diff --git a/httpclient5/src/test/java/org/apache/hc/client5/http/async/methods/InflatingAsyncEntityConsumerTest.java b/httpclient5/src/test/java/org/apache/hc/client5/http/async/methods/InflatingAsyncEntityConsumerTest.java
new file mode 100644
index 0000000000..fca30dc825
--- /dev/null
+++ b/httpclient5/src/test/java/org/apache/hc/client5/http/async/methods/InflatingAsyncEntityConsumerTest.java
@@ -0,0 +1,210 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation. For more
+ * information on the Apache Software Foundation, please see
+ *
+ *
+ *
+ * @since 5.6
+ */
+public class AsyncClientDeflateCompressionExample {
+
+ public static void main(final String[] args) throws Exception {
+ try (final CloseableHttpAsyncClient client = HttpAsyncClients.createDefault()) {
+ client.start();
+
+ final String json = "{\"msg\":\"hello deflated world\"}";
+ final AsyncEntityProducer raw =
+ new StringAsyncEntityProducer(json, ContentType.APPLICATION_JSON);
+
+ final AsyncEntityProducer deflated = new DeflatingAsyncEntityProducer(raw);
+
+ final SimpleHttpRequest request = SimpleRequestBuilder
+ .post("https://httpbin.org/post")
+ .build();
+
+ final Future