From 5ddd9d49cac297d24f7b0391e317e61a65054eea Mon Sep 17 00:00:00 2001 From: Arturo Bernal Date: Fri, 18 Jul 2025 15:04:51 +0200 Subject: [PATCH] HTTPCLIENT-1822: async transparent content decompression. Add DeflatingAsyncEntityProducer and InflatingAsyncEntityCunsumer using Deflater / Inflater API directly --- .../methods/DeflatingAsyncEntityProducer.java | 239 ++++++++++++++++++ .../methods/InflatingAsyncDataConsumer.java | 129 ++++++++++ .../async/ContentCompressionAsyncExec.java | 204 +++++++++++++++ .../impl/async/HttpAsyncClientBuilder.java | 57 +++++ .../DeflatingAsyncEntityProducerTest.java | 127 ++++++++++ .../InflatingAsyncEntityConsumerTest.java | 210 +++++++++++++++ .../AsyncClientDeflateCompressionExample.java | 105 ++++++++ ...syncClientInflateDecompressionExample.java | 104 ++++++++ .../TestContentCompressionAsyncExec.java | 188 ++++++++++++++ 9 files changed, 1363 insertions(+) create mode 100644 httpclient5/src/main/java/org/apache/hc/client5/http/async/methods/DeflatingAsyncEntityProducer.java create mode 100644 httpclient5/src/main/java/org/apache/hc/client5/http/async/methods/InflatingAsyncDataConsumer.java create mode 100644 httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/ContentCompressionAsyncExec.java create mode 100644 httpclient5/src/test/java/org/apache/hc/client5/http/async/methods/DeflatingAsyncEntityProducerTest.java create mode 100644 httpclient5/src/test/java/org/apache/hc/client5/http/async/methods/InflatingAsyncEntityConsumerTest.java create mode 100644 httpclient5/src/test/java/org/apache/hc/client5/http/examples/AsyncClientDeflateCompressionExample.java create mode 100644 httpclient5/src/test/java/org/apache/hc/client5/http/examples/AsyncClientInflateDecompressionExample.java create mode 100644 httpclient5/src/test/java/org/apache/hc/client5/http/impl/async/TestContentCompressionAsyncExec.java diff --git a/httpclient5/src/main/java/org/apache/hc/client5/http/async/methods/DeflatingAsyncEntityProducer.java b/httpclient5/src/main/java/org/apache/hc/client5/http/async/methods/DeflatingAsyncEntityProducer.java new file mode 100644 index 0000000000..820d0482c8 --- /dev/null +++ b/httpclient5/src/main/java/org/apache/hc/client5/http/async/methods/DeflatingAsyncEntityProducer.java @@ -0,0 +1,239 @@ +/* + * ==================================================================== + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * ==================================================================== + * + * This software consists of voluntary contributions made by many + * individuals on behalf of the Apache Software Foundation. For more + * information on the Apache Software Foundation, please see + * . + * + */ +package org.apache.hc.client5.http.async.methods; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Collections; +import java.util.List; +import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.zip.Deflater; + +import org.apache.hc.core5.http.Header; +import org.apache.hc.core5.http.nio.AsyncEntityProducer; +import org.apache.hc.core5.http.nio.DataStreamChannel; +import org.apache.hc.core5.util.Args; + +/** + * {@code AsyncEntityProducer} that streams the output of another producer + * through the raw DEFLATE compression algorithm. + * + *

The delegate’s bytes are read in small chunks, compressed with + * {@link java.util.zip.Deflater} and written immediately to the HTTP I/O + * layer. Memory use is therefore bounded even for very large request + * entities.

+ * + * @since 5.6 + */ +public final class DeflatingAsyncEntityProducer implements AsyncEntityProducer { + + /** + * inbound copy‐buffer + */ + private static final int IN_BUF = 8 * 1024; + /** + * outbound staging buffer + */ + private static final int OUT_BUF = 8 * 1024; + + private final AsyncEntityProducer delegate; + private final String contentType; + private final Deflater deflater = new Deflater(Deflater.DEFAULT_COMPRESSION, /*nowrap=*/true); + + /** + * holds compressed bytes not yet sent downstream + */ + private final ByteBuffer pending = ByteBuffer.allocate(OUT_BUF); + private final byte[] in = new byte[IN_BUF]; + + private final AtomicBoolean delegateEnded = new AtomicBoolean(false); + private boolean finished = false; + + public DeflatingAsyncEntityProducer(final AsyncEntityProducer delegate) { + this.delegate = Args.notNull(delegate, "delegate"); + this.contentType = delegate.getContentType(); + /* place pending into “read-mode” with no data */ + pending.flip(); + } + + // ------------------------------------------------------------------ metadata + + @Override + public boolean isRepeatable() { + return false; + } + + @Override + public long getContentLength() { + return -1; + } // unknown + + @Override + public String getContentType() { + return contentType; + } + + @Override + public String getContentEncoding() { + return "deflate"; + } + + @Override + public boolean isChunked() { + return true; + } + + @Override + public Set getTrailerNames() { + return Collections.emptySet(); + } + + @Override + public int available() { + if (pending.hasRemaining()) { + return pending.remaining(); + } + return delegate.available(); + } + + // ------------------------------------------------------------------ core + + @Override + public void produce(final DataStreamChannel channel) throws IOException { + /* 1 — flush any leftover compressed bytes first */ + if (flushPending(channel)) { + return; // back-pressure: outer channel could not accept more + } + + /* 2 — pull more data from delegate */ + delegate.produce(new InnerChannel(channel)); + + /* 3 — if delegate ended, finish the deflater */ + if (delegateEnded.get() && !finished) { + deflater.finish(); + deflateToPending(); + flushPending(channel); + if (!pending.hasRemaining()) { + finished = true; + channel.endStream(); + } + } + } + + /** + * copy as much as possible from {@link #pending} to the wire + */ + private boolean flushPending(final DataStreamChannel ch) throws IOException { + while (pending.hasRemaining()) { + final int written = ch.write(pending); + if (written == 0) { + return true; // back-pressure + } + } + pending.clear().flip(); // no data left → empty read-mode + return false; + } + + /** + * drain {@link #deflater} into {@link #pending} + */ + private void deflateToPending() { + /* switch pending to write-mode */ + pending.compact(); + final byte[] out = pending.array(); + int total; + do { + total = deflater.deflate(out, pending.position(), pending.remaining(), + Deflater.NO_FLUSH); + pending.position(pending.position() + total); + if (!pending.hasRemaining() && total > 0) { + /* buffer full: grow to the next power of two */ + final ByteBuffer bigger = ByteBuffer.allocate(pending.capacity() * 2); + pending.flip(); + bigger.put(pending); + pending.clear(); + pending.put(bigger); + } + } while (total > 0); + pending.flip(); // back to read-mode + } + + // ------------------------------------------------------------------ inner channel that receives raw bytes + + private final class InnerChannel implements DataStreamChannel { + private final DataStreamChannel outer; + + InnerChannel(final DataStreamChannel outer) { + this.outer = outer; + } + + @Override + public void requestOutput() { + outer.requestOutput(); + } + + @Override + public int write(final ByteBuffer src) throws IOException { + int consumed = 0; + while (src.hasRemaining()) { + final int chunk = Math.min(src.remaining(), in.length); + src.get(in, 0, chunk); + deflater.setInput(in, 0, chunk); + consumed += chunk; + deflateToPending(); + if (flushPending(outer)) { // honour back-pressure + break; + } + } + return consumed; + } + + @Override + public void endStream() { + delegateEnded.set(true); + } + + @Override + public void endStream(final List trailers) { + endStream(); + } + } + + // ------------------------------------------------------------------ error / cleanup + + @Override + public void failed(final Exception cause) { + delegate.failed(cause); + } + + @Override + public void releaseResources() { + delegate.releaseResources(); + deflater.end(); + } +} diff --git a/httpclient5/src/main/java/org/apache/hc/client5/http/async/methods/InflatingAsyncDataConsumer.java b/httpclient5/src/main/java/org/apache/hc/client5/http/async/methods/InflatingAsyncDataConsumer.java new file mode 100644 index 0000000000..76a846877d --- /dev/null +++ b/httpclient5/src/main/java/org/apache/hc/client5/http/async/methods/InflatingAsyncDataConsumer.java @@ -0,0 +1,129 @@ +/* + * ==================================================================== + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * ==================================================================== + * + * This software consists of voluntary contributions made by many + * individuals on behalf of the Apache Software Foundation. For more + * information on the Apache Software Foundation, please see + * . + * + */ +package org.apache.hc.client5.http.async.methods; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.zip.DataFormatException; +import java.util.zip.Inflater; + +import org.apache.hc.core5.http.Header; +import org.apache.hc.core5.http.HttpException; +import org.apache.hc.core5.http.nio.AsyncDataConsumer; +import org.apache.hc.core5.http.nio.CapacityChannel; + +/** + *

Streaming, non-blocking {@link AsyncDataConsumer} that transparently + * inflates a response whose {@code Content-Encoding} is {@code deflate}. + * The decoded bytes are pushed straight to the wrapped downstream consumer + * while honouring reactor back-pressure.

+ * + *

The implementation understands both formats that exist “in the wild”: the + * raw DEFLATE stream (RFC 1951) and the zlib-wrapped variant (RFC 1950). + * If the caller does not specify which one to expect, the first two bytes of + * the stream are inspected and the proper decoder is chosen automatically.

+ * + *

No {@code InputStream}/{@code OutputStream} buffering is used; memory + * footprint is bounded and suitable for very large payloads.

+ * + * @since 5.6 + */ +public final class InflatingAsyncDataConsumer implements AsyncDataConsumer { + + private final AsyncDataConsumer downstream; + private final Boolean nowrapHint; + private Inflater inflater; + private boolean formatChosen; + private final byte[] out = new byte[8 * 1024]; + private final AtomicBoolean closed = new AtomicBoolean(false); + + public InflatingAsyncDataConsumer( + final AsyncDataConsumer downstream, final Boolean nowrapHint) { + this.downstream = downstream; + this.nowrapHint = nowrapHint; + this.inflater = new Inflater(nowrapHint == null || nowrapHint); + } + + @Override + public void updateCapacity(final CapacityChannel ch) throws IOException { + downstream.updateCapacity(ch); + } + + @Override + public void consume(final ByteBuffer src) throws IOException { + if (closed.get()) { + return; + } + + if (nowrapHint == null && !formatChosen && src.remaining() >= 2) { + src.mark(); + final int b0 = src.get() & 0xFF; + final int b1 = src.get() & 0xFF; + src.reset(); + final boolean zlib = b0 == 0x78 && + (b1 == 0x01 || b1 == 0x5E || b1 == 0x9C || b1 == 0xDA); + if (zlib) { + inflater.end(); + inflater = new Inflater(false); + } + formatChosen = true; + } + + final byte[] in = new byte[src.remaining()]; + src.get(in); + inflater.setInput(in); + + try { + int n; + while ((n = inflater.inflate(out)) > 0) { + downstream.consume(ByteBuffer.wrap(out, 0, n)); + } + if (inflater.needsDictionary()) { + throw new IOException("Deflate dictionary required"); + } + } catch (final DataFormatException ex) { + throw new IOException("Corrupt DEFLATE stream", ex); + } + } + + @Override + public void streamEnd(final List trailers) + throws HttpException, IOException { + if (closed.compareAndSet(false, true)) { + inflater.end(); + downstream.streamEnd(trailers); + } + } + + @Override + public void releaseResources() { + inflater = null; + downstream.releaseResources(); + } +} diff --git a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/ContentCompressionAsyncExec.java b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/ContentCompressionAsyncExec.java new file mode 100644 index 0000000000..c70d6a6df2 --- /dev/null +++ b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/ContentCompressionAsyncExec.java @@ -0,0 +1,204 @@ +/* + * ==================================================================== + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * ==================================================================== + * + * This software consists of voluntary contributions made by many + * individuals on behalf of the Apache Software Foundation. For more + * information on the Apache Software Foundation, please see + * . + * + */ +package org.apache.hc.client5.http.impl.async; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.Locale; +import java.util.Set; +import java.util.function.UnaryOperator; + +import org.apache.hc.client5.http.async.AsyncExecCallback; +import org.apache.hc.client5.http.async.AsyncExecChain; +import org.apache.hc.client5.http.async.AsyncExecChainHandler; +import org.apache.hc.client5.http.async.methods.InflatingAsyncDataConsumer; +import org.apache.hc.client5.http.entity.compress.ContentCoding; +import org.apache.hc.client5.http.protocol.HttpClientContext; +import org.apache.hc.core5.annotation.Contract; +import org.apache.hc.core5.annotation.Internal; +import org.apache.hc.core5.annotation.ThreadingBehavior; +import org.apache.hc.core5.http.EntityDetails; +import org.apache.hc.core5.http.Header; +import org.apache.hc.core5.http.HeaderElement; +import org.apache.hc.core5.http.HttpException; +import org.apache.hc.core5.http.HttpHeaders; +import org.apache.hc.core5.http.HttpRequest; +import org.apache.hc.core5.http.HttpResponse; +import org.apache.hc.core5.http.config.Lookup; +import org.apache.hc.core5.http.config.RegistryBuilder; +import org.apache.hc.core5.http.message.BasicHeaderValueParser; +import org.apache.hc.core5.http.message.MessageSupport; +import org.apache.hc.core5.http.message.ParserCursor; +import org.apache.hc.core5.http.nio.AsyncDataConsumer; +import org.apache.hc.core5.http.nio.AsyncEntityProducer; +import org.apache.hc.core5.util.Args; + +@Contract(threading = ThreadingBehavior.STATELESS) +@Internal +public final class ContentCompressionAsyncExec implements AsyncExecChainHandler { + + private final Lookup> decoders; + private final Header acceptEncoding; + private final boolean ignoreUnknown; + + public ContentCompressionAsyncExec( + final LinkedHashMap> decoderMap, + final boolean ignoreUnknown) { + + Args.notEmpty(decoderMap, "Decoder map"); + + this.acceptEncoding = MessageSupport.headerOfTokens( + HttpHeaders.ACCEPT_ENCODING, new ArrayList<>(decoderMap.keySet())); + + final RegistryBuilder> rb = RegistryBuilder.create(); + decoderMap.forEach(rb::register); + this.decoders = rb.build(); + this.ignoreUnknown = ignoreUnknown; + } + + /** + * default = DEFLATE only + */ + public ContentCompressionAsyncExec() { + final LinkedHashMap> map = new LinkedHashMap<>(); + map.put(ContentCoding.DEFLATE.token(), + d -> new InflatingAsyncDataConsumer(d, null)); + this.acceptEncoding = MessageSupport.headerOfTokens( + HttpHeaders.ACCEPT_ENCODING, Collections.singletonList(ContentCoding.DEFLATE.token())); + this.decoders = RegistryBuilder.>create() + .register(ContentCoding.DEFLATE.token(), map.get(ContentCoding.DEFLATE.token())) + .build(); + this.ignoreUnknown = true; + } + + + @Override + public void execute( + final HttpRequest request, + final AsyncEntityProducer producer, + final AsyncExecChain.Scope scope, + final AsyncExecChain chain, + final AsyncExecCallback cb) throws IOException, HttpException { + + final HttpClientContext ctx = scope != null ? scope.clientContext : HttpClientContext.create(); + final boolean enabled = ctx.getRequestConfigOrDefault().isContentCompressionEnabled(); + + if (enabled && !request.containsHeader(HttpHeaders.ACCEPT_ENCODING)) { + request.addHeader(MessageSupport.headerOfTokens( + HttpHeaders.ACCEPT_ENCODING, Collections.singletonList("deflate"))); + } + + chain.proceed(request, producer, scope, new AsyncExecCallback() { + + @Override + public AsyncDataConsumer handleResponse(final HttpResponse rsp, + final EntityDetails details) + throws HttpException, IOException { + + if (!enabled) { + return cb.handleResponse(rsp, details); + } + + final String coding = details != null ? details.getContentEncoding() : null; + + EntityDetails patched = details; + if (coding != null) { + patched = wrapEntityDetails(details); + } + AsyncDataConsumer downstream = cb.handleResponse(rsp, patched); + + if (coding != null) { + final HeaderElement[] el = BasicHeaderValueParser.INSTANCE + .parseElements(coding, new ParserCursor(0, coding.length())); + for (int i = el.length - 1; i >= 0; i--) { + final String token = el[i].getName().toLowerCase(Locale.ROOT); + if ("identity".equals(token) || token.isEmpty()) { + continue; + } + final UnaryOperator op = decoders.lookup(token); + if (op != null) { + downstream = op.apply(downstream); + } else if (!ignoreUnknown) { + throw new HttpException("Unsupported Content-Encoding: " + token); + } + } + rsp.removeHeaders(HttpHeaders.CONTENT_ENCODING); + rsp.removeHeaders(HttpHeaders.CONTENT_LENGTH); + rsp.removeHeaders(HttpHeaders.CONTENT_MD5); + } + return downstream; + } + + @Override + public void handleInformationResponse(final HttpResponse r) + throws HttpException, IOException { + cb.handleInformationResponse(r); + } + + @Override + public void completed() { + cb.completed(); + } + + @Override + public void failed(final Exception ex) { + cb.failed(ex); + } + }); + } + + private static EntityDetails wrapEntityDetails(final EntityDetails original) { + return new EntityDetails() { + @Override + public long getContentLength() { + return -1; + } + + @Override + public String getContentType() { + return original.getContentType(); + } + + @Override + public String getContentEncoding() { + return null; + } + + @Override + public boolean isChunked() { + return true; + } + + @Override + public Set getTrailerNames() { + return original.getTrailerNames(); + } + }; + } +} diff --git a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/HttpAsyncClientBuilder.java b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/HttpAsyncClientBuilder.java index bb73238e45..028604744e 100644 --- a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/HttpAsyncClientBuilder.java +++ b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/HttpAsyncClientBuilder.java @@ -33,10 +33,12 @@ import java.security.PrivilegedAction; import java.util.ArrayList; import java.util.Collection; +import java.util.LinkedHashMap; import java.util.LinkedList; import java.util.List; import java.util.concurrent.ThreadFactory; import java.util.function.Function; +import java.util.function.UnaryOperator; import org.apache.hc.client5.http.AuthenticationStrategy; import org.apache.hc.client5.http.ConnectionKeepAliveStrategy; @@ -96,6 +98,7 @@ import org.apache.hc.core5.http.config.Lookup; import org.apache.hc.core5.http.config.NamedElementChain; import org.apache.hc.core5.http.config.RegistryBuilder; +import org.apache.hc.core5.http.nio.AsyncDataConsumer; import org.apache.hc.core5.http.nio.command.ShutdownCommand; import org.apache.hc.core5.http.protocol.DefaultHttpProcessor; import org.apache.hc.core5.http.protocol.HttpContext; @@ -259,6 +262,16 @@ private ExecInterceptorEntry( private ProxySelector proxySelector; + /** + * Maps {@code Content-Encoding} tokens to decoder factories in insertion order. + */ + private LinkedHashMap> contentDecoderMap; + + /** + * When {@code true} the client skips all transparent response decompression. + */ + private boolean contentCompressionDisabled; + public static HttpAsyncClientBuilder create() { return new HttpAsyncClientBuilder(); } @@ -845,6 +858,37 @@ public final HttpAsyncClientBuilder evictIdleConnections(final TimeValue maxIdle return this; } + + /** + * Replaces the current decoder registry with {@code contentDecoderMap}. + * + *

The map’s insertion order defines the {@code Accept-Encoding} + * preference list sent on every request.

+ * + * @param contentDecoderMap a non-empty {@link LinkedHashMap} whose keys are + * lower-case coding tokens and whose values create + * a fresh wrapper consumer for each response + * @return {@code this} builder instance + * @since 5.6 + */ + public HttpAsyncClientBuilder setContentDecoderMap( + final LinkedHashMap> contentDecoderMap) { + this.contentDecoderMap = contentDecoderMap; + return this; + } + + /** + * Disables transparent response decompression for the client produced by + * this builder. + * + * @return {@code this} builder instance + * @since 5.6 + */ + public HttpAsyncClientBuilder disableContentCompression() { + this.contentCompressionDisabled = true; + return this; + } + /** * Request exec chain customization and extension. *

@@ -1001,6 +1045,19 @@ public CloseableHttpAsyncClient build() { ChainElement.RETRY.name()); } + if (!contentCompressionDisabled) { + if (contentDecoderMap != null && !contentDecoderMap.isEmpty()) { + execChainDefinition.addFirst( + new ContentCompressionAsyncExec(contentDecoderMap, true), + ChainElement.COMPRESS.name()); + } else { + execChainDefinition.addFirst( + new ContentCompressionAsyncExec(), + ChainElement.COMPRESS.name()); + } + } + + HttpRoutePlanner routePlannerCopy = this.routePlanner; if (routePlannerCopy == null) { SchemePortResolver schemePortResolverCopy = this.schemePortResolver; diff --git a/httpclient5/src/test/java/org/apache/hc/client5/http/async/methods/DeflatingAsyncEntityProducerTest.java b/httpclient5/src/test/java/org/apache/hc/client5/http/async/methods/DeflatingAsyncEntityProducerTest.java new file mode 100644 index 0000000000..01c22a2ac3 --- /dev/null +++ b/httpclient5/src/test/java/org/apache/hc/client5/http/async/methods/DeflatingAsyncEntityProducerTest.java @@ -0,0 +1,127 @@ +/* + * ==================================================================== + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * ==================================================================== + * + * This software consists of voluntary contributions made by many + * individuals on behalf of the Apache Software Foundation. For more + * information on the Apache Software Foundation, please see + * . + * + */ +package org.apache.hc.client5.http.async.methods; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.io.ByteArrayOutputStream; +import java.nio.ByteBuffer; +import java.util.List; +import java.util.zip.Inflater; + +import org.apache.hc.core5.http.ContentType; +import org.apache.hc.core5.http.Header; +import org.apache.hc.core5.http.nio.AsyncEntityProducer; +import org.apache.hc.core5.http.nio.DataStreamChannel; +import org.apache.hc.core5.http.nio.entity.StringAsyncEntityProducer; +import org.junit.jupiter.api.Test; + +class DeflatingAsyncEntityProducerTest { + + /** + * DataStreamChannel that accepts at most maxChunk bytes per write. + */ + private static final class ThrottledChannel implements DataStreamChannel { + private final ByteArrayOutputStream sink = new ByteArrayOutputStream(); + private final int maxChunk; + + ThrottledChannel(final int max) { + this.maxChunk = max; + } + + @Override + public void requestOutput() { + } + + @Override + public int write(final ByteBuffer src) { + final int len = Math.min(src.remaining(), maxChunk); + final byte[] tmp = new byte[len]; + src.get(tmp); + sink.write(tmp, 0, len); + return len; + } + + @Override + public void endStream() { + } + + @Override + public void endStream(final List trailers) { + } + + byte[] data() { + return sink.toByteArray(); + } + } + + private static String longText() { + final String seed = "lorem ipsum äëïöü – "; + final StringBuilder sb = new StringBuilder(seed.length() * 3000); + for (int i = 0; i < 3000; i++) { + sb.append(seed); + } + return sb.toString(); + } + + @Test + void roundTrip() throws Exception { + final String text = longText(); + + final AsyncEntityProducer raw = + new StringAsyncEntityProducer(text, ContentType.TEXT_PLAIN); + final DeflatingAsyncEntityProducer def = + new DeflatingAsyncEntityProducer(raw); + + final ThrottledChannel ch = new ThrottledChannel(1024); + + while (def.available() > 0) { + def.produce(ch); + } + + final byte[] compressed = ch.data(); + assertTrue(compressed.length > 0); + + // Inflate (raw DEFLATE) + final Inflater inflater = new Inflater(true); + inflater.setInput(compressed); + final ByteArrayOutputStream out = new ByteArrayOutputStream(); + final byte[] buf = new byte[8192]; + while (!inflater.finished()) { + final int n = inflater.inflate(buf); + if (n == 0 && inflater.needsInput()) break; + out.write(buf, 0, n); + } + inflater.end(); + + assertEquals(text, out.toString("UTF-8")); + assertEquals("deflate", def.getContentEncoding()); + assertTrue(def.isChunked()); + assertEquals(-1, def.getContentLength()); + } +} diff --git a/httpclient5/src/test/java/org/apache/hc/client5/http/async/methods/InflatingAsyncEntityConsumerTest.java b/httpclient5/src/test/java/org/apache/hc/client5/http/async/methods/InflatingAsyncEntityConsumerTest.java new file mode 100644 index 0000000000..fca30dc825 --- /dev/null +++ b/httpclient5/src/test/java/org/apache/hc/client5/http/async/methods/InflatingAsyncEntityConsumerTest.java @@ -0,0 +1,210 @@ +/* + * ==================================================================== + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * ==================================================================== + * + * This software consists of voluntary contributions made by many + * individuals on behalf of the Apache Software Foundation. For more + * information on the Apache Software Foundation, please see + * . + * + */ + +package org.apache.hc.client5.http.async.methods; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; + +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.util.Collections; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.function.UnaryOperator; +import java.util.zip.Deflater; + +import org.apache.hc.client5.http.impl.async.ContentCompressionAsyncExec; +import org.apache.hc.core5.concurrent.FutureCallback; +import org.apache.hc.core5.http.EntityDetails; +import org.apache.hc.core5.http.Header; +import org.apache.hc.core5.http.nio.AsyncDataConsumer; +import org.apache.hc.core5.http.nio.AsyncEntityConsumer; +import org.apache.hc.core5.http.nio.CapacityChannel; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; + +class InflatingAsyncDataConsumerTest { + + /** + * Collects bytes and decodes to UTF-8 once at the end. + */ + private static final class ByteCollector implements AsyncEntityConsumer { + + private final java.io.ByteArrayOutputStream buf = new java.io.ByteArrayOutputStream(); + private FutureCallback callback; + + @Override + public void streamStart(final EntityDetails ed, final FutureCallback cb) { + this.callback = cb; + } + + @Override + public void updateCapacity(final CapacityChannel c) { + } + + @Override + public void consume(final ByteBuffer src) { + final byte[] tmp = new byte[src.remaining()]; + src.get(tmp); + buf.write(tmp, 0, tmp.length); + } + + @Override + public void streamEnd(final List t) { + if (callback != null) { + callback.completed(getContent()); + } + } + + @Override + public void failed(final Exception cause) { + throw new RuntimeException(cause); + } + + @Override + public void releaseResources() { + } + + @Override + public String getContent() { + return new String(buf.toByteArray(), StandardCharsets.UTF_8); + } + } + + private static String ORIGINAL; + + @BeforeAll + static void buildText() { + final String seed = "Hello ✈ DEF-in-late 🎈! "; + final StringBuilder sb = new StringBuilder(seed.length() * 4000); + for (int i = 0; i < 4000; i++) { + sb.append(seed); + } + ORIGINAL = sb.toString(); + } + + private static byte[] compress(final boolean nowrap) { + final Deflater def = new Deflater(Deflater.DEFAULT_COMPRESSION, nowrap); + final byte[] src = ORIGINAL.getBytes(StandardCharsets.UTF_8); + def.setInput(src); + def.finish(); + + final byte[] buf = new byte[4096]; + final java.io.ByteArrayOutputStream out = new java.io.ByteArrayOutputStream(); + while (!def.finished()) { + final int n = def.deflate(buf); + out.write(buf, 0, n); + } + def.end(); + return out.toByteArray(); + } + + @Test + void inflateRawAndZlib() throws Exception { + + for (final boolean nowrap : new boolean[]{true, false}) { + + final byte[] compressed = compress(nowrap); + + final ByteCollector inner = new ByteCollector(); + final InflatingAsyncDataConsumer inflating = + new InflatingAsyncDataConsumer(inner, Boolean.valueOf(nowrap)); + + final CountDownLatch done = new CountDownLatch(1); + final FutureCallback cb = new FutureCallback() { + @Override + public void completed(final String result) { + done.countDown(); + } + + @Override + public void failed(final Exception ex) { + fail(ex); + } + + @Override + public void cancelled() { + fail("cancelled"); + } + }; + + /* minimal EntityDetails stub */ + final EntityDetails details = new EntityDetails() { + @Override + public long getContentLength() { + return compressed.length; + } + + @Override + public String getContentType() { + return "text/plain"; + } + + @Override + public String getContentEncoding() { + return "deflate"; + } + + @Override + public boolean isChunked() { + return false; + } + + @Override + public Set getTrailerNames() { + return new HashSet<>(); + } + }; + inner.streamStart(details, cb); + + for (int off = 0; off < compressed.length; off += 1024) { + final int n = Math.min(1024, compressed.length - off); + inflating.consume(ByteBuffer.wrap(compressed, off, n)); + } + inflating.streamEnd(Collections.emptyList()); + + assertTrue(done.await(2, TimeUnit.SECONDS), "callback timeout"); + assertEquals(ORIGINAL, inner.getContent(), + (nowrap ? "raw" : "zlib") + " inflate mismatch"); + } + } + + @Test + void unknownEncodingMapFlag() throws Exception { + final LinkedHashMap> map = new LinkedHashMap<>(); + map.put("deflate", d -> new InflatingAsyncDataConsumer(d, null)); + final ContentCompressionAsyncExec exec = new ContentCompressionAsyncExec(map, false); + assertNotNull(exec); + } +} diff --git a/httpclient5/src/test/java/org/apache/hc/client5/http/examples/AsyncClientDeflateCompressionExample.java b/httpclient5/src/test/java/org/apache/hc/client5/http/examples/AsyncClientDeflateCompressionExample.java new file mode 100644 index 0000000000..0c203b4281 --- /dev/null +++ b/httpclient5/src/test/java/org/apache/hc/client5/http/examples/AsyncClientDeflateCompressionExample.java @@ -0,0 +1,105 @@ +/* + * ==================================================================== + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * ==================================================================== + * + * This software consists of voluntary contributions made by many + * individuals on behalf of the Apache Software Foundation. For more + * information on the Apache Software Foundation, please see + * . + * + */ +package org.apache.hc.client5.http.examples; + +import java.util.concurrent.Future; + +import org.apache.hc.client5.http.async.methods.DeflatingAsyncEntityProducer; +import org.apache.hc.client5.http.async.methods.SimpleHttpRequest; +import org.apache.hc.client5.http.async.methods.SimpleRequestBuilder; +import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient; +import org.apache.hc.client5.http.impl.async.HttpAsyncClients; +import org.apache.hc.core5.concurrent.FutureCallback; +import org.apache.hc.core5.http.ContentType; +import org.apache.hc.core5.http.HttpResponse; +import org.apache.hc.core5.http.Message; +import org.apache.hc.core5.http.nio.AsyncEntityProducer; +import org.apache.hc.core5.http.nio.entity.StringAsyncEntityConsumer; +import org.apache.hc.core5.http.nio.entity.StringAsyncEntityProducer; +import org.apache.hc.core5.http.nio.support.BasicRequestProducer; +import org.apache.hc.core5.http.nio.support.BasicResponseConsumer; + +/** + * Demonstrates how to POST a JSON body compressed on-the-fly with + * {@link DeflatingAsyncEntityProducer}. The program talks to + * httpbin, which echoes your request, + * making it easy to verify that the {@code Content-Encoding: deflate} + * header was honoured. + * + *

Key take-aways:

+ *
    + *
  1. Wrap any existing {@code AsyncEntityProducer} in the compressor.
  2. + *
  3. The {@code Content-Encoding} header is added automatically by HttpClient if not set in the request.
  4. + *
  5. The producer is streaming: huge payloads are never fully buffered.
  6. + *
+ * + * @since 5.6 + */ +public class AsyncClientDeflateCompressionExample { + + public static void main(final String[] args) throws Exception { + try (final CloseableHttpAsyncClient client = HttpAsyncClients.createDefault()) { + client.start(); + + final String json = "{\"msg\":\"hello deflated world\"}"; + final AsyncEntityProducer raw = + new StringAsyncEntityProducer(json, ContentType.APPLICATION_JSON); + + final AsyncEntityProducer deflated = new DeflatingAsyncEntityProducer(raw); + + final SimpleHttpRequest request = SimpleRequestBuilder + .post("https://httpbin.org/post") + .build(); + + final Future> future = client.execute( + /* works in every 5.x version ↓ */ + new BasicRequestProducer(request, deflated), + new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), + new FutureCallback>() { + + @Override + public void completed(final Message result) { + System.out.println("HTTP " + result.getHead().getCode()); + System.out.println(result.getBody()); + } + + @Override + public void failed(final Exception cause) { + System.out.println(request + "->" + cause); + } + + @Override + public void cancelled() { + System.out.println(request + " cancelled"); + } + }); + + + future.get(); + } + } +} diff --git a/httpclient5/src/test/java/org/apache/hc/client5/http/examples/AsyncClientInflateDecompressionExample.java b/httpclient5/src/test/java/org/apache/hc/client5/http/examples/AsyncClientInflateDecompressionExample.java new file mode 100644 index 0000000000..c5eb1fd014 --- /dev/null +++ b/httpclient5/src/test/java/org/apache/hc/client5/http/examples/AsyncClientInflateDecompressionExample.java @@ -0,0 +1,104 @@ +/* + * ==================================================================== + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * ==================================================================== + * + * This software consists of voluntary contributions made by many + * individuals on behalf of the Apache Software Foundation. For more + * information on the Apache Software Foundation, please see + * . + * + */ +package org.apache.hc.client5.http.examples; + +import java.util.concurrent.Future; + +import org.apache.hc.client5.http.async.methods.SimpleHttpRequest; +import org.apache.hc.client5.http.async.methods.SimpleRequestBuilder; +import org.apache.hc.client5.http.async.methods.SimpleRequestProducer; +import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient; +import org.apache.hc.client5.http.impl.async.HttpAsyncClients; +import org.apache.hc.core5.concurrent.FutureCallback; +import org.apache.hc.core5.http.HttpResponse; +import org.apache.hc.core5.http.Message; +import org.apache.hc.core5.http.nio.entity.StringAsyncEntityConsumer; +import org.apache.hc.core5.http.nio.support.BasicResponseConsumer; + +/** + * Demonstrates transparent response decompression added in + * HttpClient 5.6. The async builder inserts + * {@code ContentCompressionAsyncExec}, which + * automatically wraps whatever + * {@link org.apache.hc.core5.http.nio.AsyncDataConsumer} the user provides + * with an {@code InflatingAsyncDataConsumer} whenever the server sends + * {@code Content-Encoding: deflate}. + * + *

In the DEFLATE algorithm the server deflates (compresses) + * the payload; the client then inflates (decompresses) it. + * The example calls https://httpbin.org/deflate, + * whose response is deliberately compressed. Because decompression is now + * part of the execution pipeline, the inner + * {@link org.apache.hc.core5.http.nio.entity.StringAsyncEntityConsumer} + * receives plain UTF-8 text and the program prints the readable JSON.

+ * + *

If you need to stream the inflated bytes into a file or a custom parser, + * just replace the {@code StringAsyncEntityConsumer} with your own + * {@code AsyncDataConsumer}; no further changes are required.

+ * + * @since 5.6 + */ + +public class AsyncClientInflateDecompressionExample { + + public static void main(final String[] args) throws Exception { + + /* The default builder now contains ContentCompressionAsyncExec, + so transparent decompression is automatic. */ + try (final CloseableHttpAsyncClient client = HttpAsyncClients.createDefault()) { + client.start(); + + final SimpleHttpRequest request = + SimpleRequestBuilder.get("https://httpbin.org/deflate").build(); + + final Future> future = client.execute( + SimpleRequestProducer.create(request), + new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), + new FutureCallback>() { + + @Override + public void completed(final Message result) { + System.out.println(request.getRequestUri() + + " -> " + result.getHead().getCode()); + System.out.println("Decompressed body:\n" + result.getBody()); + } + + @Override + public void failed(final Exception ex) { + System.err.println(request + "->" + ex); + } + + @Override + public void cancelled() { + System.out.println(request + " cancelled"); + } + }); + + future.get(); // wait for completion + } + } +} diff --git a/httpclient5/src/test/java/org/apache/hc/client5/http/impl/async/TestContentCompressionAsyncExec.java b/httpclient5/src/test/java/org/apache/hc/client5/http/impl/async/TestContentCompressionAsyncExec.java new file mode 100644 index 0000000000..88a01f0b70 --- /dev/null +++ b/httpclient5/src/test/java/org/apache/hc/client5/http/impl/async/TestContentCompressionAsyncExec.java @@ -0,0 +1,188 @@ +/* + * ==================================================================== + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * ==================================================================== + * + * This software consists of voluntary contributions made by many + * individuals on behalf of the Apache Software Foundation. For more + * information on the Apache Software Foundation, please see + * . + * + */ +package org.apache.hc.client5.http.impl.async; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertSame; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.ArgumentMatchers.same; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.util.LinkedHashMap; +import java.util.function.UnaryOperator; + +import org.apache.hc.client5.http.HttpRoute; +import org.apache.hc.client5.http.async.AsyncExecCallback; +import org.apache.hc.client5.http.async.AsyncExecChain; +import org.apache.hc.client5.http.async.AsyncExecRuntime; +import org.apache.hc.client5.http.async.methods.InflatingAsyncDataConsumer; +import org.apache.hc.client5.http.config.RequestConfig; +import org.apache.hc.client5.http.protocol.HttpClientContext; +import org.apache.hc.core5.concurrent.CancellableDependency; +import org.apache.hc.core5.http.EntityDetails; +import org.apache.hc.core5.http.HttpException; +import org.apache.hc.core5.http.HttpHeaders; +import org.apache.hc.core5.http.HttpHost; +import org.apache.hc.core5.http.HttpRequest; +import org.apache.hc.core5.http.HttpResponse; +import org.apache.hc.core5.http.Method; +import org.apache.hc.core5.http.message.BasicHttpRequest; +import org.apache.hc.core5.http.message.BasicHttpResponse; +import org.apache.hc.core5.http.nio.AsyncDataConsumer; +import org.apache.hc.core5.http.nio.AsyncEntityProducer; +import org.apache.hc.core5.http.nio.entity.StringAsyncEntityConsumer; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.ArgumentCaptor; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; + +class TestContentCompressionAsyncExec { + + @Mock + private AsyncExecChain execChain; + @Mock + private AsyncEntityProducer entityProducer; + @Mock + private AsyncExecCallback originalCb; + @Mock + private AsyncExecRuntime execRuntime; + @Mock + private CancellableDependency dependency; + + private HttpClientContext context; + private AsyncExecChain.Scope scope; + private ContentCompressionAsyncExec impl; + + @BeforeEach + void init() { + MockitoAnnotations.openMocks(this); + + final HttpHost target = new HttpHost("somehost", 80); + final HttpRequest req = new BasicHttpRequest(Method.GET, "/"); + context = HttpClientContext.create(); + + scope = new AsyncExecChain.Scope( + "test", + new HttpRoute(target), + req, + dependency, + context, + execRuntime); // 6-arg deprecated ctor + + impl = new ContentCompressionAsyncExec(); // default = deflate + } + + private AsyncExecCallback executeAndCapture(final HttpRequest request) throws Exception { + final ArgumentCaptor cap = ArgumentCaptor.forClass(AsyncExecCallback.class); + doNothing().when(execChain).proceed(eq(request), eq(entityProducer), eq(scope), cap.capture()); + impl.execute(request, entityProducer, scope, execChain, originalCb); + return cap.getValue(); + } + + @Test + void testAcceptEncodingAdded() throws Exception { + final HttpRequest request = new BasicHttpRequest(Method.GET, "/path"); + executeAndCapture(request); + assertTrue(request.containsHeader(HttpHeaders.ACCEPT_ENCODING)); + assertEquals("deflate", request.getFirstHeader(HttpHeaders.ACCEPT_ENCODING).getValue()); + } + + @Test + void testDeflateConsumerInserted() throws Exception { + final HttpRequest request = new BasicHttpRequest(Method.GET, "/"); + final AsyncExecCallback cb = executeAndCapture(request); + + final HttpResponse rsp = new BasicHttpResponse(200, "OK"); + final EntityDetails details = mock(EntityDetails.class); + when(details.getContentEncoding()).thenReturn("deflate"); + + final AsyncDataConsumer downstream = new StringAsyncEntityConsumer(); + when(originalCb.handleResponse(same(rsp), same(details))).thenReturn(downstream); + + final AsyncDataConsumer wrapped = cb.handleResponse(rsp, details); + + assertNotNull(wrapped); + assertTrue(wrapped instanceof InflatingAsyncDataConsumer); + assertFalse(rsp.containsHeader(HttpHeaders.CONTENT_ENCODING)); + } + + @Test + void testIdentityIsNoOp() throws Exception { + final HttpRequest request = new BasicHttpRequest(Method.GET, "/"); + final AsyncExecCallback cb = executeAndCapture(request); + + final HttpResponse rsp = new BasicHttpResponse(200, "OK"); + final EntityDetails details = mock(EntityDetails.class); + when(details.getContentEncoding()).thenReturn("identity"); + + final AsyncDataConsumer downstream = new StringAsyncEntityConsumer(); + /* accept any EntityDetails instance */ + when(originalCb.handleResponse(eq(rsp), any(EntityDetails.class))).thenReturn(downstream); + + assertSame(downstream, cb.handleResponse(rsp, details)); + } + + @Test + void testUnknownEncodingRejectedWhenFlagFalse() throws Exception { + final LinkedHashMap> map = new LinkedHashMap<>(); + map.put("deflate", new UnaryOperator() { + @Override + public AsyncDataConsumer apply(final AsyncDataConsumer d) { + return new InflatingAsyncDataConsumer(d, null); + } + }); + impl = new ContentCompressionAsyncExec(map, /*ignoreUnknown*/ false); + + final HttpRequest request = new BasicHttpRequest(Method.GET, "/"); + final AsyncExecCallback cb = executeAndCapture(request); + + final HttpResponse rsp = new BasicHttpResponse(200, "OK"); + final EntityDetails details = mock(EntityDetails.class); + when(details.getContentEncoding()).thenReturn("whatever"); + + assertThrows(HttpException.class, () -> cb.handleResponse(rsp, details)); + } + + @Test + void testCompressionDisabledViaRequestConfig() throws Exception { + context.setRequestConfig(RequestConfig.custom() + .setContentCompressionEnabled(false) + .build()); + final HttpRequest request = new BasicHttpRequest(Method.GET, "/"); + executeAndCapture(request); + + assertFalse(request.containsHeader(HttpHeaders.ACCEPT_ENCODING)); + } +} \ No newline at end of file