resultCb)
+ throws HttpException, IOException {
+
+ if (entityDetails.getContentEncoding() != null
+ && !"gzip".equalsIgnoreCase(entityDetails.getContentEncoding())) {
+ throw new HttpException("Unsupported content-encoding: "
+ + entityDetails.getContentEncoding());
+ }
+ this.cb = resultCb;
+ inner.streamStart(entityDetails, resultCb);
+ }
+
+ @Override
+ public void updateCapacity(final CapacityChannel channel) throws IOException {
+ channel.update(Integer.MAX_VALUE);
+ inner.updateCapacity(channel);
+ }
+
+ @Override
+ public void consume(final ByteBuffer src) throws IOException {
+ try {
+ while (src.hasRemaining()) {
+
+ if (!headerDone) {
+ if (!parseHeader(src)) {
+ return; // need more bytes
+ }
+ continue; // fall through
+ }
+
+ final int n = src.remaining();
+ final byte[] chunk = new byte[n];
+ src.get(chunk);
+
+ inflater.setInput(chunk);
+ inflateLoop();
+
+ for (int i = Math.max(0, n - 8); i < n; i++) {
+ trailerBuf.add(chunk[i]);
+ if (trailerBuf.size() > 8) trailerBuf.remove();
+ }
+ }
+ } catch (final DataFormatException ex) {
+ throw new IOException("Corrupt GZIP stream", ex);
+ }
+ }
+
+ /**
+ * Incrementally parses the variable-length part of a GZIP header.
+ *
+ * The method consumes bytes from {@code src} until either the header is
+ * finished (returns {@code true}) or the buffer runs empty
+ * (returns {@code false}). It keeps state across calls so it can be fed
+ * arbitrarily-sized chunks straight from the I/O reactor.
+ *
+ * Handled sections, in order:
+ *
+ * - Fixed 10-byte header ID1, ID2, CM, FLG, MTIME, XFL, OS.
+ * - {@code FEXTRA} – extra field whose length is a little-endian 16-bit value.
+ * - {@code FNAME} – zero-terminated original file-name.
+ * - {@code FCOMMENT} – zero-terminated comment string.
+ * - {@code FHCRC} – CRC-16 over everything after the fixed
+ * 10-byte header (checked against {@link #hdrCrcCalc}).
+ *
+ *
+ * Every time a byte is consumed, {@link #updateHdrCrc(byte)} is called
+ * so that the running CRC-16 is maintained whenever {@code FHCRC} is set.
+ *
+ * @param src inbound buffer containing header bytes
+ * @return {@code true} once the entire header is parsed, otherwise
+ * {@code false} (caller must supply more data)
+ * @throws IllegalStateException if the header is malformed
+ */
+ private boolean parseHeader(final ByteBuffer src) {
+
+ while (src.hasRemaining() && !headerDone) {
+
+ /* 0----- consume the mandatory 10-byte fixed header */
+ if (hdrFixed.size() < 10) {
+ final byte b = src.get();
+ hdrFixed.add(b);
+ updateHdrCrc(b);
+
+ if (hdrFixed.size() == 10) {
+ final byte[] h = new byte[10];
+ int i = 0;
+ for (final Byte bb : hdrFixed) h[i++] = bb;
+
+ if (h[0] != (byte) 0x1F || h[1] != (byte) 0x8B || h[2] != 8) {
+ throw new IllegalStateException("Not a GZIP header");
+ }
+ flg = h[3] & 0xFF;
+ if ((flg & 0xE0) != 0) { // bits 5-7 reserved
+ throw new IllegalStateException("Reserved GZIP flag bits set: " + flg);
+ }
+ wantHdrCrc = (flg & FHCRC) != 0;
+ }
+ continue;
+ }
+
+ if ((flg & FEXTRA) != 0) {
+ if (extraRemaining < 0) { // read length (2 B LE)
+ if (src.remaining() < 2) {
+ return false;
+ }
+ final int lo = src.get() & 0xFF;
+ updateHdrCrc((byte) lo);
+ final int hi = src.get() & 0xFF;
+ updateHdrCrc((byte) hi);
+ extraRemaining = (hi << 8) | lo;
+ if (extraRemaining == 0) {
+ flg &= ~FEXTRA;
+ }
+ continue;
+ }
+ final int skip = Math.min(extraRemaining, src.remaining());
+ for (int i = 0; i < skip; i++) {
+ updateHdrCrc(src.get());
+ }
+ extraRemaining -= skip;
+ if (extraRemaining == 0) {
+ flg &= ~FEXTRA;
+ }
+ continue;
+ }
+
+ if ((flg & FNAME) != 0) {
+ while (src.hasRemaining()) {
+ final byte b = src.get();
+ updateHdrCrc(b);
+ if (b == 0) {
+ flg &= ~FNAME;
+ break;
+ }
+ }
+ continue;
+ }
+
+ if ((flg & FCOMMENT) != 0) {
+ while (src.hasRemaining()) {
+ final byte b = src.get();
+ updateHdrCrc(b);
+ if (b == 0) {
+ flg &= ~FCOMMENT;
+ break;
+ }
+ }
+ continue;
+ }
+
+ if (wantHdrCrc) {
+ if (src.remaining() < 2) {
+ return false;
+ }
+ final byte lo = src.get();
+ final byte hi = src.get();
+ final int expect = ((hi & 0xFF) << 8) | (lo & 0xFF);
+ if ((hdrCrcCalc & 0xFFFF) != expect) {
+ throw new IllegalStateException("Header CRC16 mismatch");
+ }
+ wantHdrCrc = false; // consumed
+ continue;
+ }
+
+ /* header complete */
+ headerDone = true;
+ }
+ return headerDone;
+ }
+
+ /**
+ * Updates the running CRC-16 used when the {@code FHCRC} flag is present.
+ *
+ * The polynomial is the “reverse” 0xA001 (the same as Modbus / CRC-16-IBM),
+ * which is exactly what RFC 1952 requires for the header checksum.
+ * The algorithm is intentionally implemented bit-by-bit so it needs no
+ * temporary tables and can run on any JVM version.
+ *
+ * @param b the header byte just consumed
+ */
+ private void updateHdrCrc(final byte b) {
+ if (!wantHdrCrc) {
+ return;
+ }
+ hdrCrcCalc ^= b & 0xFF;
+ for (int k = 0; k < 8; k++) {
+ hdrCrcCalc = (hdrCrcCalc & 1) != 0
+ ? (hdrCrcCalc >>> 1) ^ 0xA001
+ : hdrCrcCalc >>> 1;
+ }
+ }
+
+
+ /**
+ * Pulls as many plain bytes as currently available from {@link #inflater},
+ * streams them to the wrapped {@code inner} consumer, and updates the
+ * running CRC-32 / ISIZE counters needed for trailer validation.
+ *
+ * @throws DataFormatException if the underlying DEFLATE stream is corrupt
+ * @throws IOException if {@code inner.consume(...)} throws
+ */
+ private void inflateLoop() throws IOException, DataFormatException {
+ int n;
+ while ((n = inflater.inflate(out)) > 0) {
+ crc32.update(out, 0, n);
+ uncompressed += n;
+ inner.consume(ByteBuffer.wrap(out, 0, n));
+ }
+ }
+
+ /**
+ * Called once the upstream I/O reactor signals end-of-stream.
+ *
+ *
+ * - Drains any remaining compressed bytes (via {@link #inflateLoop()}).
+ * - Collects the eight-byte trailer from {@link #trailerBuf}.
+ * - Verifies CRC-32 and ISIZE against the values accumulated while
+ * inflating.
+ * - Propagates {@code streamEnd()} to the wrapped consumer and fires
+ * the user callback.
+ *
+ *
+ * @throws HttpException on protocol errors (e.g. wrong encoding)
+ * @throws IOException on corrupt streams or checksum mismatch
+ */
+ @Override
+ public void streamEnd(final List extends Header> trailers)
+ throws HttpException, IOException {
+
+ try {
+ inflateLoop();
+ } catch (final DataFormatException ex) {
+ throw new IOException("Corrupt GZIP stream", ex);
+ }
+ if (trailerBuf.size() != 8) {
+ throw new IOException("Truncated GZIP trailer");
+ }
+
+ final byte[] tail = new byte[8];
+ for (int i = 0; i < 8; i++) {
+ tail[i] = trailerBuf.remove();
+ }
+
+ final long crcVal = ((tail[3] & 0xFFL) << 24) | ((tail[2] & 0xFFL) << 16)
+ | ((tail[1] & 0xFFL) << 8) | (tail[0] & 0xFFL);
+ final long isz = ((tail[7] & 0xFFL) << 24) | ((tail[6] & 0xFFL) << 16)
+ | ((tail[5] & 0xFFL) << 8) | (tail[4] & 0xFFL);
+
+ if (crcVal != crc32.getValue()) {
+ throw new IOException("CRC-32 mismatch");
+ }
+ if (isz != (uncompressed & 0xFFFFFFFFL)) {
+ throw new IOException("ISIZE mismatch");
+ }
+
+ inner.streamEnd(trailers);
+ completed.set(true);
+ if (cb != null) cb.completed(inner.getContent());
+ }
+
+ @Override
+ public T getContent() {
+ return inner.getContent();
+ }
+
+ @Override
+ public void failed(final Exception cause) {
+ if (completed.compareAndSet(false, true) && cb != null) {
+ cb.failed(cause);
+ }
+ inner.failed(cause);
+ }
+
+ @Override
+ public void releaseResources() {
+ inflater.end();
+ inner.releaseResources();
+ }
+}
diff --git a/httpclient5/src/main/java/org/apache/hc/client5/http/async/methods/GzipAsyncEntityProducer.java b/httpclient5/src/main/java/org/apache/hc/client5/http/async/methods/GzipAsyncEntityProducer.java
new file mode 100644
index 0000000000..d6e1a71774
--- /dev/null
+++ b/httpclient5/src/main/java/org/apache/hc/client5/http/async/methods/GzipAsyncEntityProducer.java
@@ -0,0 +1,266 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation. For more
+ * information on the Apache Software Foundation, please see
+ * .
+ *
+ */
+package org.apache.hc.client5.http.async.methods;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.zip.CRC32;
+import java.util.zip.Deflater;
+
+import org.apache.hc.core5.http.ContentType;
+import org.apache.hc.core5.http.Header;
+import org.apache.hc.core5.http.nio.AsyncEntityProducer;
+import org.apache.hc.core5.http.nio.DataStreamChannel;
+import org.apache.hc.core5.util.Args;
+
+/**
+ * Streams the output of another {@link AsyncEntityProducer} through GZIP
+ * compression without blocking I/O classes. A minimal 10-byte header is
+ * emitted first, deflate blocks follow, then an 8-byte trailer carrying the
+ * CRC-32 and the uncompressed byte count.
+ *
+ * The producer honours back-pressure: if {@code DataStreamChannel.write()}
+ * returns 0, compression stops until the reactor calls {@code requestOutput()}
+ * again.
+ *
+ * @since 5.6
+ */
+public final class GzipAsyncEntityProducer implements AsyncEntityProducer {
+
+ private static final int IN_BUF = 8 * 1024;
+ private static final int OUT_BUF = 8 * 1024;
+
+ private static final byte[] GZIP_HEADER = {
+ (byte) 0x1F, (byte) 0x8B, // ID1-ID2
+ 8, // CM = deflate
+ 0, // FLG = no extras
+ 0, 0, 0, 0, // MTIME = 0
+ 0, // XFL
+ (byte) 255 // OS = unknown
+ };
+
+ private final AsyncEntityProducer delegate;
+ private final ContentType contentType;
+
+ private final Deflater deflater = new Deflater(Deflater.DEFAULT_COMPRESSION, true);
+ private final CRC32 crc = new CRC32();
+
+ private final byte[] inBuf = new byte[IN_BUF];
+ private final byte[] outBuf = new byte[OUT_BUF];
+ private final ByteBuffer pending = ByteBuffer.allocate(OUT_BUF * 2);
+
+ private boolean headerSent = false;
+ private boolean trailerSent = false;
+ private long uncompressed = 0;
+ private final AtomicBoolean delegateEnded = new AtomicBoolean(false);
+
+ public GzipAsyncEntityProducer(final AsyncEntityProducer delegate) {
+ this.delegate = Args.notNull(delegate, "delegate");
+ this.contentType = ContentType.parse(delegate.getContentType());
+ pending.flip(); // empty read-mode
+ }
+
+
+ @Override
+ public boolean isRepeatable() {
+ return false;
+ }
+
+ @Override
+ public long getContentLength() {
+ return -1;
+ }
+
+ @Override
+ public String getContentType() {
+ return contentType.toString();
+ }
+
+ @Override
+ public String getContentEncoding() {
+ return "gzip";
+ }
+
+ @Override
+ public boolean isChunked() {
+ return true;
+ }
+
+ @Override
+ public Set getTrailerNames() {
+ return Collections.emptySet();
+ }
+
+ @Override
+ public int available() {
+ return pending.hasRemaining() ? pending.remaining() : delegate.available();
+ }
+
+ @Override
+ public void produce(final DataStreamChannel ch) throws IOException {
+
+ if (flushPending(ch)) {
+ return;
+ }
+
+ if (!headerSent) {
+ writeBytes(GZIP_HEADER);
+ headerSent = true;
+ if (flushPending(ch)) {
+ return;
+ }
+ }
+
+ delegate.produce(new InnerChannel(ch));
+
+ if (delegateEnded.get() && !trailerSent) {
+
+ deflater.finish();
+
+ while (!deflater.finished()) {
+ deflateToPending(Deflater.NO_FLUSH);
+ if (flushPending(ch)) {
+ return;
+ }
+ }
+ writeTrailer();
+ trailerSent = true;
+ flushPending(ch);
+
+ if (!pending.hasRemaining()) {
+ ch.endStream();
+ }
+ }
+ }
+
+
+ private final class InnerChannel implements DataStreamChannel {
+ private final DataStreamChannel outer;
+
+ InnerChannel(final DataStreamChannel outer) {
+ this.outer = outer;
+ }
+
+ @Override
+ public void requestOutput() {
+ outer.requestOutput();
+ }
+
+ @Override
+ public int write(final ByteBuffer src) throws IOException {
+ int consumed = 0;
+ while (src.hasRemaining()) {
+ final int len = Math.min(src.remaining(), inBuf.length);
+ src.get(inBuf, 0, len);
+ crc.update(inBuf, 0, len);
+ uncompressed += len;
+
+ deflater.setInput(inBuf, 0, len);
+ /* SYNC_FLUSH closes the current block so no stray bits linger */
+ deflateToPending(Deflater.SYNC_FLUSH);
+
+ if (flushPending(outer)) {
+ return consumed + len;
+ }
+ consumed += len;
+ }
+ return consumed;
+ }
+
+ @Override
+ public void endStream() {
+ delegateEnded.set(true);
+ }
+
+ @Override
+ public void endStream(final List extends Header> t) {
+ endStream();
+ }
+ }
+
+
+ private void deflateToPending(final int flushMode) {
+ pending.compact();
+ while (true) {
+ final int n = deflater.deflate(outBuf, 0, outBuf.length, flushMode);
+ if (n == 0) {
+ break;
+ }
+ pending.put(outBuf, 0, n);
+ }
+ pending.flip();
+ }
+
+ private void writeBytes(final byte[] src) {
+ pending.compact();
+ pending.put(src);
+ pending.flip();
+ }
+
+ private void writeTrailer() {
+ pending.compact();
+ writeIntLE((int) crc.getValue());
+ writeIntLE((int) (uncompressed & 0xFFFFFFFFL));
+ pending.flip();
+ }
+
+ private void writeIntLE(final int v) {
+ pending.put((byte) v);
+ pending.put((byte) (v >> 8));
+ pending.put((byte) (v >> 16));
+ pending.put((byte) (v >> 24));
+ }
+
+ /**
+ * @return {@code true} if transport is full and caller must stop.
+ */
+ private boolean flushPending(final DataStreamChannel ch) throws IOException {
+ while (pending.hasRemaining()) {
+ if (ch.write(pending) == 0) {
+ return true; // back-pressure
+ }
+ }
+ return false;
+ }
+
+ /* ---------- boiler-plate ---------- */
+
+ @Override
+ public void failed(final Exception ex) {
+ delegate.failed(ex);
+ }
+
+ @Override
+ public void releaseResources() {
+ delegate.releaseResources();
+ deflater.end();
+ }
+}
diff --git a/httpclient5/src/test/java/org/apache/hc/client5/http/async/methods/GzipAsyncEntityConsumerTest.java b/httpclient5/src/test/java/org/apache/hc/client5/http/async/methods/GzipAsyncEntityConsumerTest.java
new file mode 100644
index 0000000000..f5235c4597
--- /dev/null
+++ b/httpclient5/src/test/java/org/apache/hc/client5/http/async/methods/GzipAsyncEntityConsumerTest.java
@@ -0,0 +1,290 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation. For more
+ * information on the Apache Software Foundation, please see
+ * .
+ *
+ */
+package org.apache.hc.client5.http.async.methods;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.zip.CRC32;
+import java.util.zip.Deflater;
+import java.util.zip.GZIPOutputStream;
+
+import org.apache.hc.core5.concurrent.FutureCallback;
+import org.apache.hc.core5.http.EntityDetails;
+import org.apache.hc.core5.http.Header;
+import org.apache.hc.core5.http.nio.AsyncEntityConsumer;
+import org.apache.hc.core5.http.nio.CapacityChannel;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+class GzipAsyncEntityConsumerTest {
+
+ private static String ORIGINAL;
+
+ @BeforeAll
+ static void build() {
+ final String seed = "inflate me 🎉 ";
+ final StringBuilder sb = new StringBuilder(seed.length() * 3000);
+ for (int i = 0; i < 3000; i++) {
+ sb.append(seed);
+ }
+ ORIGINAL = sb.toString();
+ }
+
+ private static byte[] gzip(final byte[] data) throws Exception {
+ final java.io.ByteArrayOutputStream out = new java.io.ByteArrayOutputStream();
+ final GZIPOutputStream gos = new GZIPOutputStream(out);
+ gos.write(data);
+ gos.close();
+ return out.toByteArray();
+ }
+
+ /**
+ * Build a gzip frame with optional FEXTRA / FNAME / FCOMMENT / FHCRC flags.
+ */
+ private static byte[] customGzip(final byte[] data,
+ final boolean extra,
+ final boolean name,
+ final boolean comment,
+ final boolean hcrc) throws Exception {
+
+ /* --------- compress payload (raw deflate) --------- */
+ final Deflater def = new Deflater(Deflater.DEFAULT_COMPRESSION, true);
+ def.setInput(data);
+ def.finish();
+ final byte[] buf = new byte[8192];
+ final java.io.ByteArrayOutputStream deflated = new java.io.ByteArrayOutputStream();
+ while (!def.finished()) {
+ final int n = def.deflate(buf);
+ deflated.write(buf, 0, n);
+ }
+ def.end();
+ final byte[] deflatedBytes = deflated.toByteArray();
+
+ /* --------- construct header --------- */
+ final java.io.ByteArrayOutputStream out = new java.io.ByteArrayOutputStream();
+ final CRC32 crc32 = new CRC32();
+ crc32.update(data, 0, data.length); // trailer CRC-32
+
+ final java.io.ByteArrayOutputStream hdr = new java.io.ByteArrayOutputStream();
+ hdr.write(0x1F);
+ hdr.write(0x8B); // ID1, ID2
+ hdr.write(8); // CM = deflate
+ int flg = 0;
+ if (extra) {
+ flg |= 4;
+ }
+ if (name) {
+ flg |= 8;
+ }
+ if (comment) {
+ flg |= 16;
+ }
+ if (hcrc) {
+ flg |= 2;
+ }
+ hdr.write(flg); // FLG
+ hdr.write(new byte[]{0, 0, 0, 0}); // MTIME
+ hdr.write(0); // XFL
+ hdr.write(0xFF); // OS
+
+ if (extra) {
+ hdr.write(4);
+ hdr.write(0); // XLEN = 4
+ hdr.write(new byte[]{1, 2, 3, 4});
+ }
+ if (name) {
+ hdr.write("file.txt".getBytes(StandardCharsets.ISO_8859_1));
+ hdr.write(0);
+ }
+ if (comment) {
+ hdr.write("comment".getBytes(StandardCharsets.ISO_8859_1));
+ hdr.write(0);
+ }
+ byte[] hdrBytes = hdr.toByteArray();
+
+ if (hcrc) {
+ /* ---------- CRC-16 over *optional* part only (bytes 10 .. n-1) ---------- */
+ int crc16 = 0;
+ for (int i = 10; i < hdrBytes.length; i++) { // skip fixed 10-byte header
+ final int b = hdrBytes[i] & 0xFF;
+ crc16 ^= b;
+ for (int k = 0; k < 8; k++) {
+ crc16 = (crc16 & 1) != 0 ? (crc16 >>> 1) ^ 0xA001 : (crc16 >>> 1);
+ }
+ }
+ hdr.write(crc16 & 0xFF);
+ hdr.write((crc16 >>> 8) & 0xFF);
+ hdrBytes = hdr.toByteArray(); // final header incl. CRC-16
+ }
+
+ out.write(hdrBytes);
+ out.write(deflatedBytes);
+
+ /* --------- trailer --------- */
+ writeIntLE(out, (int) crc32.getValue());
+ writeIntLE(out, data.length);
+
+ return out.toByteArray();
+ }
+
+ private static void writeIntLE(final java.io.ByteArrayOutputStream out, final int v) {
+ out.write(v);
+ out.write(v >>> 8);
+ out.write(v >>> 16);
+ out.write(v >>> 24);
+ }
+
+ private static class BytesCollector implements AsyncEntityConsumer {
+ private final java.io.ByteArrayOutputStream buf = new java.io.ByteArrayOutputStream();
+
+ @Override
+ public void streamStart(final EntityDetails ed, final FutureCallback cb) {
+ }
+
+ @Override
+ public void updateCapacity(final CapacityChannel c) {
+ }
+
+ @Override
+ public void consume(final ByteBuffer src) {
+ final byte[] b = new byte[src.remaining()];
+ src.get(b);
+ buf.write(b, 0, b.length);
+ }
+
+ @Override
+ public void streamEnd(final List extends Header> t) {
+ }
+
+ @Override
+ public void failed(final Exception cause) {
+ throw new RuntimeException(cause);
+ }
+
+ @Override
+ public void releaseResources() {
+ }
+
+ @Override
+ public String getContent() {
+ return new String(buf.toByteArray(), StandardCharsets.UTF_8);
+ }
+ }
+
+ private static void runInflateTest(final byte[] wire) throws Exception {
+ final BytesCollector inner = new BytesCollector();
+ final GzipAsyncEntityConsumer inflating = new GzipAsyncEntityConsumer<>(inner);
+
+ final CountDownLatch done = new CountDownLatch(1);
+ final FutureCallback cb = new FutureCallback() {
+ @Override
+ public void completed(final String r) {
+ done.countDown();
+ }
+
+ @Override
+ public void failed(final Exception ex) {
+ fail(ex);
+ }
+
+ @Override
+ public void cancelled() {
+ fail("cancelled");
+ }
+ };
+
+ inflating.streamStart(new EntityDetails() {
+ public long getContentLength() {
+ return wire.length;
+ }
+
+ public String getContentType() {
+ return "text/plain";
+ }
+
+ public String getContentEncoding() {
+ return "gzip";
+ }
+
+ public boolean isChunked() {
+ return false;
+ }
+
+ public Set getTrailerNames() {
+ return new HashSet<>();
+ }
+ }, cb);
+
+ for (int off = 0; off < wire.length; off += 777) {
+ final int n = Math.min(777, wire.length - off);
+ inflating.consume(ByteBuffer.wrap(wire, off, n));
+ }
+ inflating.streamEnd(Collections.emptyList());
+
+ assertTrue(done.await(2, TimeUnit.SECONDS));
+ assertEquals(ORIGINAL, inner.getContent());
+ }
+
+ @Test
+ void fullInflate() throws Exception {
+ runInflateTest(gzip(ORIGINAL.getBytes(StandardCharsets.UTF_8)));
+ }
+
+ @Test
+ void headerExtra() throws Exception {
+ runInflateTest(customGzip(ORIGINAL.getBytes(StandardCharsets.UTF_8),
+ true, false, false, false));
+ }
+
+ @Test
+ void headerName() throws Exception {
+ runInflateTest(customGzip(ORIGINAL.getBytes(StandardCharsets.UTF_8),
+ false, true, false, false));
+ }
+
+ @Test
+ void headerComment() throws Exception {
+ runInflateTest(customGzip(ORIGINAL.getBytes(StandardCharsets.UTF_8),
+ false, false, true, false));
+ }
+
+ @Test
+ void headerCrc16() throws Exception {
+ runInflateTest(customGzip(ORIGINAL.getBytes(StandardCharsets.UTF_8),
+ true, true, true, true));
+ }
+}
diff --git a/httpclient5/src/test/java/org/apache/hc/client5/http/async/methods/GzipAsyncEntityProducerTest.java b/httpclient5/src/test/java/org/apache/hc/client5/http/async/methods/GzipAsyncEntityProducerTest.java
new file mode 100644
index 0000000000..0aa1d6c142
--- /dev/null
+++ b/httpclient5/src/test/java/org/apache/hc/client5/http/async/methods/GzipAsyncEntityProducerTest.java
@@ -0,0 +1,125 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation. For more
+ * information on the Apache Software Foundation, please see
+ * .
+ *
+ */
+package org.apache.hc.client5.http.async.methods;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.zip.GZIPInputStream;
+
+import org.apache.hc.core5.http.ContentType;
+import org.apache.hc.core5.http.Header;
+import org.apache.hc.core5.http.nio.AsyncEntityProducer;
+import org.apache.hc.core5.http.nio.DataStreamChannel;
+import org.apache.hc.core5.http.nio.entity.StringAsyncEntityProducer;
+import org.junit.jupiter.api.Test;
+
+class GzipAsyncEntityProducerTest {
+
+ private static final class CollectingChannel implements DataStreamChannel {
+ private final ByteArrayOutputStream sink = new ByteArrayOutputStream();
+ private final int maxChunk;
+
+ CollectingChannel(final int maxChunk) {
+ this.maxChunk = maxChunk;
+ }
+
+ @Override
+ public void requestOutput() { /* not used in unit test */ }
+
+ @Override
+ public int write(final ByteBuffer src) {
+ final int len = Math.min(src.remaining(), maxChunk);
+ final byte[] tmp = new byte[len];
+ src.get(tmp);
+ sink.write(tmp, 0, len);
+ return len;
+ }
+
+ @Override
+ public void endStream() {
+ }
+
+ @Override
+ public void endStream(final List extends Header> trailers) {
+ }
+
+ byte[] toByteArray() {
+ return sink.toByteArray();
+ }
+ }
+
+ private static String buildLargeText() {
+ final String unit = "GZIP round-trip ✓ ";
+ final StringBuilder sb = new StringBuilder(unit.length() * 2000);
+ for (int i = 0; i < 2000; i++) {
+ sb.append(unit);
+ }
+ return sb.toString();
+ }
+
+ @Test
+ void roundTrip() throws Exception {
+
+ final String original = buildLargeText();
+
+ final AsyncEntityProducer raw =
+ new StringAsyncEntityProducer(original, ContentType.TEXT_PLAIN);
+ final GzipAsyncEntityProducer gzip =
+ new GzipAsyncEntityProducer(raw);
+
+ final CollectingChannel channel = new CollectingChannel(1024);
+
+ /* drive the producer until it reports no more data */
+ while (gzip.available() > 0) {
+ gzip.produce(channel);
+ }
+
+ final byte[] wire = channel.toByteArray();
+ assertTrue(wire.length > 0, "producer wrote no data");
+
+ /* inflate using JDK's GZIPInputStream to verify correctness */
+ final ByteArrayOutputStream out = new ByteArrayOutputStream();
+ final byte[] buf = new byte[4096];
+ final GZIPInputStream gis = new GZIPInputStream(new ByteArrayInputStream(wire));
+ int n;
+ while ((n = gis.read(buf)) != -1) {
+ out.write(buf, 0, n);
+ }
+ gis.close();
+
+ final String roundTrip = out.toString(StandardCharsets.UTF_8.name());
+ assertEquals(original, roundTrip, "round-tripped text differs");
+ assertEquals("gzip", gzip.getContentEncoding(), "wrong Content-Encoding");
+ assertTrue(gzip.isChunked(), "producer must be chunked");
+ }
+}
diff --git a/httpclient5/src/test/java/org/apache/hc/client5/http/examples/AsyncClientGzipCompressionExample.java b/httpclient5/src/test/java/org/apache/hc/client5/http/examples/AsyncClientGzipCompressionExample.java
new file mode 100644
index 0000000000..557f9ee57c
--- /dev/null
+++ b/httpclient5/src/test/java/org/apache/hc/client5/http/examples/AsyncClientGzipCompressionExample.java
@@ -0,0 +1,86 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation. For more
+ * information on the Apache Software Foundation, please see
+ * .
+ *
+ */
+
+package org.apache.hc.client5.http.examples;
+
+import java.util.concurrent.Future;
+
+import org.apache.hc.client5.http.async.methods.GzipAsyncEntityProducer;
+import org.apache.hc.client5.http.async.methods.SimpleHttpRequest;
+import org.apache.hc.client5.http.async.methods.SimpleRequestBuilder;
+import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient;
+import org.apache.hc.client5.http.impl.async.HttpAsyncClients;
+import org.apache.hc.core5.concurrent.FutureCallback;
+import org.apache.hc.core5.http.ContentType;
+import org.apache.hc.core5.http.HttpResponse;
+import org.apache.hc.core5.http.Message;
+import org.apache.hc.core5.http.nio.AsyncEntityProducer;
+import org.apache.hc.core5.http.nio.entity.StringAsyncEntityConsumer;
+import org.apache.hc.core5.http.nio.entity.StringAsyncEntityProducer;
+import org.apache.hc.core5.http.nio.support.BasicRequestProducer;
+import org.apache.hc.core5.http.nio.support.BasicResponseConsumer;
+import org.apache.hc.core5.io.CloseMode;
+
+/**
+ * POST a JSON body compressed on-the-fly with GZIP.
+ */
+public class AsyncClientGzipCompressionExample {
+
+ public static void main(final String[] args) throws Exception {
+ try (final CloseableHttpAsyncClient client = HttpAsyncClients.createDefault()) {
+ client.start();
+
+ final String payload = "{\"msg\":\"hello gzip world\"}";
+ final AsyncEntityProducer raw = new StringAsyncEntityProducer(payload, ContentType.APPLICATION_JSON);
+ final AsyncEntityProducer gzip = new GzipAsyncEntityProducer(raw);
+
+ final SimpleHttpRequest request = SimpleRequestBuilder
+ .post("https://httpbin.org/post")
+ .addHeader("Content-Encoding", "gzip")
+ .build();
+
+ final Future> future = client.execute(
+ new BasicRequestProducer(request, gzip),
+ new BasicResponseConsumer<>(new StringAsyncEntityConsumer()),
+ new FutureCallback>() {
+ public void completed(final Message msg) {
+ System.out.println(msg.getHead().getCode() + " " + msg.getBody());
+ }
+
+ public void failed(final Exception cause) {
+ System.out.println(request + "->" + cause);
+ }
+
+ public void cancelled() {
+ System.out.println(request + " cancelled");
+ }
+ });
+ future.get();
+ client.close(CloseMode.GRACEFUL);
+ }
+ }
+}
diff --git a/httpclient5/src/test/java/org/apache/hc/client5/http/examples/AsyncClientGzipDecompressionExample.java b/httpclient5/src/test/java/org/apache/hc/client5/http/examples/AsyncClientGzipDecompressionExample.java
new file mode 100644
index 0000000000..37b26545f3
--- /dev/null
+++ b/httpclient5/src/test/java/org/apache/hc/client5/http/examples/AsyncClientGzipDecompressionExample.java
@@ -0,0 +1,79 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation. For more
+ * information on the Apache Software Foundation, please see
+ * .
+ *
+ */
+
+package org.apache.hc.client5.http.examples;
+
+import java.util.concurrent.Future;
+
+import org.apache.hc.client5.http.async.methods.GzipAsyncEntityConsumer;
+import org.apache.hc.client5.http.async.methods.SimpleHttpRequest;
+import org.apache.hc.client5.http.async.methods.SimpleRequestBuilder;
+import org.apache.hc.client5.http.async.methods.SimpleRequestProducer;
+import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient;
+import org.apache.hc.client5.http.impl.async.HttpAsyncClients;
+import org.apache.hc.core5.concurrent.FutureCallback;
+import org.apache.hc.core5.http.HttpResponse;
+import org.apache.hc.core5.http.Message;
+import org.apache.hc.core5.http.nio.entity.StringAsyncEntityConsumer;
+import org.apache.hc.core5.http.nio.support.BasicResponseConsumer;
+
+/**
+ * GET /gzip from httpbin and inflate on the fly.
+ */
+public class AsyncClientGzipDecompressionExample {
+
+ public static void main(final String[] args) throws Exception {
+ try (final CloseableHttpAsyncClient client = HttpAsyncClients.createDefault()) {
+ client.start();
+
+ final SimpleHttpRequest request =
+ SimpleRequestBuilder.get("https://httpbin.org/gzip").build();
+
+ final GzipAsyncEntityConsumer unzip =
+ new GzipAsyncEntityConsumer<>(new StringAsyncEntityConsumer());
+
+ final Future> future = client.execute(
+ SimpleRequestProducer.create(request),
+ new BasicResponseConsumer<>(unzip),
+ new FutureCallback>() {
+ public void completed(final Message msg) {
+ System.out.println(msg.getHead().getCode() + " OK");
+ System.out.println("inflated len: " + msg.getBody().length());
+ }
+
+ public void failed(final Exception cause) {
+ System.out.println(request + "->" + cause);
+ }
+
+ public void cancelled() {
+ System.out.println(request + " cancelled");
+ }
+ });
+ future.get();
+ }
+ }
+}