From e94872f2d8211a068ca94b5d943bc8d9ffc5ae19 Mon Sep 17 00:00:00 2001 From: Arturo Bernal Date: Fri, 25 Jul 2025 18:27:20 +0200 Subject: [PATCH] Add fully-streaming async GZIP producer/consumer --- .../methods/GzipAsyncEntityConsumer.java | 379 ++++++++++++++++++ .../methods/GzipAsyncEntityProducer.java | 266 ++++++++++++ .../methods/GzipAsyncEntityConsumerTest.java | 290 ++++++++++++++ .../methods/GzipAsyncEntityProducerTest.java | 125 ++++++ .../AsyncClientGzipCompressionExample.java | 86 ++++ .../AsyncClientGzipDecompressionExample.java | 79 ++++ 6 files changed, 1225 insertions(+) create mode 100644 httpclient5/src/main/java/org/apache/hc/client5/http/async/methods/GzipAsyncEntityConsumer.java create mode 100644 httpclient5/src/main/java/org/apache/hc/client5/http/async/methods/GzipAsyncEntityProducer.java create mode 100644 httpclient5/src/test/java/org/apache/hc/client5/http/async/methods/GzipAsyncEntityConsumerTest.java create mode 100644 httpclient5/src/test/java/org/apache/hc/client5/http/async/methods/GzipAsyncEntityProducerTest.java create mode 100644 httpclient5/src/test/java/org/apache/hc/client5/http/examples/AsyncClientGzipCompressionExample.java create mode 100644 httpclient5/src/test/java/org/apache/hc/client5/http/examples/AsyncClientGzipDecompressionExample.java diff --git a/httpclient5/src/main/java/org/apache/hc/client5/http/async/methods/GzipAsyncEntityConsumer.java b/httpclient5/src/main/java/org/apache/hc/client5/http/async/methods/GzipAsyncEntityConsumer.java new file mode 100644 index 0000000000..fc670d02f6 --- /dev/null +++ b/httpclient5/src/main/java/org/apache/hc/client5/http/async/methods/GzipAsyncEntityConsumer.java @@ -0,0 +1,379 @@ +/* + * ==================================================================== + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * ==================================================================== + * + * This software consists of voluntary contributions made by many + * individuals on behalf of the Apache Software Foundation. For more + * information on the Apache Software Foundation, please see + * . + * + */ +package org.apache.hc.client5.http.async.methods; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayDeque; +import java.util.List; +import java.util.Queue; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.zip.CRC32; +import java.util.zip.DataFormatException; +import java.util.zip.Inflater; + +import org.apache.hc.core5.concurrent.FutureCallback; +import org.apache.hc.core5.http.EntityDetails; +import org.apache.hc.core5.http.Header; +import org.apache.hc.core5.http.HttpException; +import org.apache.hc.core5.http.nio.AsyncEntityConsumer; +import org.apache.hc.core5.http.nio.CapacityChannel; +import org.apache.hc.core5.util.Args; + +/** + * Streaming, non-blocking consumer for responses encoded with + * {@code Content-Encoding: gzip}. + * + *

The implementation follows the public GZIP specification:

+ *
    + *
  1. Verifies the fixed 10-byte header (ID1 0x1F, ID2 0x8B, CM 8).
  2. + *
  3. Parses / skips optional sections signalled by the FLG bits
    + * (FEXTRA, FNAME, FCOMMENT, FHCRC, FTEXT).
  4. + *
  5. Inflates the raw DEFLATE blocks (nowrap=true) while + * streaming the plain bytes to an inner consumer.
  6. + *
  7. Collects the 8-byte trailer (CRC-32 & ISIZE) and validates it + * on {@link #streamEnd(List)}.
  8. + *
+ * + * @param result type produced by the wrapped inner consumer + * @since 5.6 + */ +public final class GzipAsyncEntityConsumer implements AsyncEntityConsumer { + + private static final int FTEXT = 1; // not used, informative only + private static final int FHCRC = 2; + private static final int FEXTRA = 4; + private static final int FNAME = 8; + private static final int FCOMMENT = 16; + + private static final int OUT_BUF = 8 * 1024; // inflate buffer + + private final AsyncEntityConsumer inner; + private final Inflater inflater = new Inflater(true); // raw DEFLATE blocks + private final CRC32 crc32 = new CRC32(); + + private final byte[] out = new byte[OUT_BUF]; + private final Queue trailerBuf = new ArrayDeque(8); + + private final Queue hdrFixed = new ArrayDeque(10); + private int flg = 0; + private int extraRemaining = -1; + private boolean wantHdrCrc = false; + private int hdrCrcCalc = 0; // incremental CRC-16 + + private boolean headerDone = false; + private long uncompressed = 0; + + /* ---------- completion plumbing ---------- */ + private FutureCallback cb; + private final AtomicBoolean completed = new AtomicBoolean(false); + + public GzipAsyncEntityConsumer(final AsyncEntityConsumer inner) { + this.inner = Args.notNull(inner, "inner"); + } + + @Override + public void streamStart(final EntityDetails entityDetails, + final FutureCallback resultCb) + throws HttpException, IOException { + + if (entityDetails.getContentEncoding() != null + && !"gzip".equalsIgnoreCase(entityDetails.getContentEncoding())) { + throw new HttpException("Unsupported content-encoding: " + + entityDetails.getContentEncoding()); + } + this.cb = resultCb; + inner.streamStart(entityDetails, resultCb); + } + + @Override + public void updateCapacity(final CapacityChannel channel) throws IOException { + channel.update(Integer.MAX_VALUE); + inner.updateCapacity(channel); + } + + @Override + public void consume(final ByteBuffer src) throws IOException { + try { + while (src.hasRemaining()) { + + if (!headerDone) { + if (!parseHeader(src)) { + return; // need more bytes + } + continue; // fall through + } + + final int n = src.remaining(); + final byte[] chunk = new byte[n]; + src.get(chunk); + + inflater.setInput(chunk); + inflateLoop(); + + for (int i = Math.max(0, n - 8); i < n; i++) { + trailerBuf.add(chunk[i]); + if (trailerBuf.size() > 8) trailerBuf.remove(); + } + } + } catch (final DataFormatException ex) { + throw new IOException("Corrupt GZIP stream", ex); + } + } + + /** + * Incrementally parses the variable-length part of a GZIP header. + * + *

The method consumes bytes from {@code src} until either the header is + * finished (returns {@code true}) or the buffer runs empty + * (returns {@code false}). It keeps state across calls so it can be fed + * arbitrarily-sized chunks straight from the I/O reactor.

+ * + *

Handled sections, in order:

+ *
    + *
  • Fixed 10-byte header  ID1, ID2, CM, FLG, MTIME, XFL, OS.
  • + *
  • {@code FEXTRA} – extra field whose length is a little-endian 16-bit value.
  • + *
  • {@code FNAME} – zero-terminated original file-name.
  • + *
  • {@code FCOMMENT} – zero-terminated comment string.
  • + *
  • {@code FHCRC} – CRC-16 over everything after the fixed + * 10-byte header (checked against {@link #hdrCrcCalc}).
  • + *
+ * + *

Every time a byte is consumed, {@link #updateHdrCrc(byte)} is called + * so that the running CRC-16 is maintained whenever {@code FHCRC} is set.

+ * + * @param src inbound buffer containing header bytes + * @return {@code true} once the entire header is parsed, otherwise + * {@code false} (caller must supply more data) + * @throws IllegalStateException if the header is malformed + */ + private boolean parseHeader(final ByteBuffer src) { + + while (src.hasRemaining() && !headerDone) { + + /* 0----- consume the mandatory 10-byte fixed header */ + if (hdrFixed.size() < 10) { + final byte b = src.get(); + hdrFixed.add(b); + updateHdrCrc(b); + + if (hdrFixed.size() == 10) { + final byte[] h = new byte[10]; + int i = 0; + for (final Byte bb : hdrFixed) h[i++] = bb; + + if (h[0] != (byte) 0x1F || h[1] != (byte) 0x8B || h[2] != 8) { + throw new IllegalStateException("Not a GZIP header"); + } + flg = h[3] & 0xFF; + if ((flg & 0xE0) != 0) { // bits 5-7 reserved + throw new IllegalStateException("Reserved GZIP flag bits set: " + flg); + } + wantHdrCrc = (flg & FHCRC) != 0; + } + continue; + } + + if ((flg & FEXTRA) != 0) { + if (extraRemaining < 0) { // read length (2 B LE) + if (src.remaining() < 2) { + return false; + } + final int lo = src.get() & 0xFF; + updateHdrCrc((byte) lo); + final int hi = src.get() & 0xFF; + updateHdrCrc((byte) hi); + extraRemaining = (hi << 8) | lo; + if (extraRemaining == 0) { + flg &= ~FEXTRA; + } + continue; + } + final int skip = Math.min(extraRemaining, src.remaining()); + for (int i = 0; i < skip; i++) { + updateHdrCrc(src.get()); + } + extraRemaining -= skip; + if (extraRemaining == 0) { + flg &= ~FEXTRA; + } + continue; + } + + if ((flg & FNAME) != 0) { + while (src.hasRemaining()) { + final byte b = src.get(); + updateHdrCrc(b); + if (b == 0) { + flg &= ~FNAME; + break; + } + } + continue; + } + + if ((flg & FCOMMENT) != 0) { + while (src.hasRemaining()) { + final byte b = src.get(); + updateHdrCrc(b); + if (b == 0) { + flg &= ~FCOMMENT; + break; + } + } + continue; + } + + if (wantHdrCrc) { + if (src.remaining() < 2) { + return false; + } + final byte lo = src.get(); + final byte hi = src.get(); + final int expect = ((hi & 0xFF) << 8) | (lo & 0xFF); + if ((hdrCrcCalc & 0xFFFF) != expect) { + throw new IllegalStateException("Header CRC16 mismatch"); + } + wantHdrCrc = false; // consumed + continue; + } + + /* header complete */ + headerDone = true; + } + return headerDone; + } + + /** + * Updates the running CRC-16 used when the {@code FHCRC} flag is present. + * + *

The polynomial is the “reverse” 0xA001 (the same as Modbus / CRC-16-IBM), + * which is exactly what RFC 1952 requires for the header checksum. + * The algorithm is intentionally implemented bit-by-bit so it needs no + * temporary tables and can run on any JVM version.

+ * + * @param b the header byte just consumed + */ + private void updateHdrCrc(final byte b) { + if (!wantHdrCrc) { + return; + } + hdrCrcCalc ^= b & 0xFF; + for (int k = 0; k < 8; k++) { + hdrCrcCalc = (hdrCrcCalc & 1) != 0 + ? (hdrCrcCalc >>> 1) ^ 0xA001 + : hdrCrcCalc >>> 1; + } + } + + + /** + * Pulls as many plain bytes as currently available from {@link #inflater}, + * streams them to the wrapped {@code inner} consumer, and updates the + * running CRC-32 / ISIZE counters needed for trailer validation. + * + * @throws DataFormatException if the underlying DEFLATE stream is corrupt + * @throws IOException if {@code inner.consume(...)} throws + */ + private void inflateLoop() throws IOException, DataFormatException { + int n; + while ((n = inflater.inflate(out)) > 0) { + crc32.update(out, 0, n); + uncompressed += n; + inner.consume(ByteBuffer.wrap(out, 0, n)); + } + } + + /** + * Called once the upstream I/O reactor signals end-of-stream. + * + *
    + *
  1. Drains any remaining compressed bytes (via {@link #inflateLoop()}).
  2. + *
  3. Collects the eight-byte trailer from {@link #trailerBuf}.
  4. + *
  5. Verifies CRC-32 and ISIZE against the values accumulated while + * inflating.
  6. + *
  7. Propagates {@code streamEnd()} to the wrapped consumer and fires + * the user callback.
  8. + *
+ * + * @throws HttpException on protocol errors (e.g. wrong encoding) + * @throws IOException on corrupt streams or checksum mismatch + */ + @Override + public void streamEnd(final List trailers) + throws HttpException, IOException { + + try { + inflateLoop(); + } catch (final DataFormatException ex) { + throw new IOException("Corrupt GZIP stream", ex); + } + if (trailerBuf.size() != 8) { + throw new IOException("Truncated GZIP trailer"); + } + + final byte[] tail = new byte[8]; + for (int i = 0; i < 8; i++) { + tail[i] = trailerBuf.remove(); + } + + final long crcVal = ((tail[3] & 0xFFL) << 24) | ((tail[2] & 0xFFL) << 16) + | ((tail[1] & 0xFFL) << 8) | (tail[0] & 0xFFL); + final long isz = ((tail[7] & 0xFFL) << 24) | ((tail[6] & 0xFFL) << 16) + | ((tail[5] & 0xFFL) << 8) | (tail[4] & 0xFFL); + + if (crcVal != crc32.getValue()) { + throw new IOException("CRC-32 mismatch"); + } + if (isz != (uncompressed & 0xFFFFFFFFL)) { + throw new IOException("ISIZE mismatch"); + } + + inner.streamEnd(trailers); + completed.set(true); + if (cb != null) cb.completed(inner.getContent()); + } + + @Override + public T getContent() { + return inner.getContent(); + } + + @Override + public void failed(final Exception cause) { + if (completed.compareAndSet(false, true) && cb != null) { + cb.failed(cause); + } + inner.failed(cause); + } + + @Override + public void releaseResources() { + inflater.end(); + inner.releaseResources(); + } +} diff --git a/httpclient5/src/main/java/org/apache/hc/client5/http/async/methods/GzipAsyncEntityProducer.java b/httpclient5/src/main/java/org/apache/hc/client5/http/async/methods/GzipAsyncEntityProducer.java new file mode 100644 index 0000000000..d6e1a71774 --- /dev/null +++ b/httpclient5/src/main/java/org/apache/hc/client5/http/async/methods/GzipAsyncEntityProducer.java @@ -0,0 +1,266 @@ +/* + * ==================================================================== + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * ==================================================================== + * + * This software consists of voluntary contributions made by many + * individuals on behalf of the Apache Software Foundation. For more + * information on the Apache Software Foundation, please see + * . + * + */ +package org.apache.hc.client5.http.async.methods; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Collections; +import java.util.List; +import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.zip.CRC32; +import java.util.zip.Deflater; + +import org.apache.hc.core5.http.ContentType; +import org.apache.hc.core5.http.Header; +import org.apache.hc.core5.http.nio.AsyncEntityProducer; +import org.apache.hc.core5.http.nio.DataStreamChannel; +import org.apache.hc.core5.util.Args; + +/** + * Streams the output of another {@link AsyncEntityProducer} through GZIP + * compression without blocking I/O classes. A minimal 10-byte header is + * emitted first, deflate blocks follow, then an 8-byte trailer carrying the + * CRC-32 and the uncompressed byte count. + * + *

The producer honours back-pressure: if {@code DataStreamChannel.write()} + * returns 0, compression stops until the reactor calls {@code requestOutput()} + * again.

+ * + * @since 5.6 + */ +public final class GzipAsyncEntityProducer implements AsyncEntityProducer { + + private static final int IN_BUF = 8 * 1024; + private static final int OUT_BUF = 8 * 1024; + + private static final byte[] GZIP_HEADER = { + (byte) 0x1F, (byte) 0x8B, // ID1-ID2 + 8, // CM = deflate + 0, // FLG = no extras + 0, 0, 0, 0, // MTIME = 0 + 0, // XFL + (byte) 255 // OS = unknown + }; + + private final AsyncEntityProducer delegate; + private final ContentType contentType; + + private final Deflater deflater = new Deflater(Deflater.DEFAULT_COMPRESSION, true); + private final CRC32 crc = new CRC32(); + + private final byte[] inBuf = new byte[IN_BUF]; + private final byte[] outBuf = new byte[OUT_BUF]; + private final ByteBuffer pending = ByteBuffer.allocate(OUT_BUF * 2); + + private boolean headerSent = false; + private boolean trailerSent = false; + private long uncompressed = 0; + private final AtomicBoolean delegateEnded = new AtomicBoolean(false); + + public GzipAsyncEntityProducer(final AsyncEntityProducer delegate) { + this.delegate = Args.notNull(delegate, "delegate"); + this.contentType = ContentType.parse(delegate.getContentType()); + pending.flip(); // empty read-mode + } + + + @Override + public boolean isRepeatable() { + return false; + } + + @Override + public long getContentLength() { + return -1; + } + + @Override + public String getContentType() { + return contentType.toString(); + } + + @Override + public String getContentEncoding() { + return "gzip"; + } + + @Override + public boolean isChunked() { + return true; + } + + @Override + public Set getTrailerNames() { + return Collections.emptySet(); + } + + @Override + public int available() { + return pending.hasRemaining() ? pending.remaining() : delegate.available(); + } + + @Override + public void produce(final DataStreamChannel ch) throws IOException { + + if (flushPending(ch)) { + return; + } + + if (!headerSent) { + writeBytes(GZIP_HEADER); + headerSent = true; + if (flushPending(ch)) { + return; + } + } + + delegate.produce(new InnerChannel(ch)); + + if (delegateEnded.get() && !trailerSent) { + + deflater.finish(); + + while (!deflater.finished()) { + deflateToPending(Deflater.NO_FLUSH); + if (flushPending(ch)) { + return; + } + } + writeTrailer(); + trailerSent = true; + flushPending(ch); + + if (!pending.hasRemaining()) { + ch.endStream(); + } + } + } + + + private final class InnerChannel implements DataStreamChannel { + private final DataStreamChannel outer; + + InnerChannel(final DataStreamChannel outer) { + this.outer = outer; + } + + @Override + public void requestOutput() { + outer.requestOutput(); + } + + @Override + public int write(final ByteBuffer src) throws IOException { + int consumed = 0; + while (src.hasRemaining()) { + final int len = Math.min(src.remaining(), inBuf.length); + src.get(inBuf, 0, len); + crc.update(inBuf, 0, len); + uncompressed += len; + + deflater.setInput(inBuf, 0, len); + /* SYNC_FLUSH closes the current block so no stray bits linger */ + deflateToPending(Deflater.SYNC_FLUSH); + + if (flushPending(outer)) { + return consumed + len; + } + consumed += len; + } + return consumed; + } + + @Override + public void endStream() { + delegateEnded.set(true); + } + + @Override + public void endStream(final List t) { + endStream(); + } + } + + + private void deflateToPending(final int flushMode) { + pending.compact(); + while (true) { + final int n = deflater.deflate(outBuf, 0, outBuf.length, flushMode); + if (n == 0) { + break; + } + pending.put(outBuf, 0, n); + } + pending.flip(); + } + + private void writeBytes(final byte[] src) { + pending.compact(); + pending.put(src); + pending.flip(); + } + + private void writeTrailer() { + pending.compact(); + writeIntLE((int) crc.getValue()); + writeIntLE((int) (uncompressed & 0xFFFFFFFFL)); + pending.flip(); + } + + private void writeIntLE(final int v) { + pending.put((byte) v); + pending.put((byte) (v >> 8)); + pending.put((byte) (v >> 16)); + pending.put((byte) (v >> 24)); + } + + /** + * @return {@code true} if transport is full and caller must stop. + */ + private boolean flushPending(final DataStreamChannel ch) throws IOException { + while (pending.hasRemaining()) { + if (ch.write(pending) == 0) { + return true; // back-pressure + } + } + return false; + } + + /* ---------- boiler-plate ---------- */ + + @Override + public void failed(final Exception ex) { + delegate.failed(ex); + } + + @Override + public void releaseResources() { + delegate.releaseResources(); + deflater.end(); + } +} diff --git a/httpclient5/src/test/java/org/apache/hc/client5/http/async/methods/GzipAsyncEntityConsumerTest.java b/httpclient5/src/test/java/org/apache/hc/client5/http/async/methods/GzipAsyncEntityConsumerTest.java new file mode 100644 index 0000000000..f5235c4597 --- /dev/null +++ b/httpclient5/src/test/java/org/apache/hc/client5/http/async/methods/GzipAsyncEntityConsumerTest.java @@ -0,0 +1,290 @@ +/* + * ==================================================================== + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * ==================================================================== + * + * This software consists of voluntary contributions made by many + * individuals on behalf of the Apache Software Foundation. For more + * information on the Apache Software Foundation, please see + * . + * + */ +package org.apache.hc.client5.http.async.methods; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; + +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.zip.CRC32; +import java.util.zip.Deflater; +import java.util.zip.GZIPOutputStream; + +import org.apache.hc.core5.concurrent.FutureCallback; +import org.apache.hc.core5.http.EntityDetails; +import org.apache.hc.core5.http.Header; +import org.apache.hc.core5.http.nio.AsyncEntityConsumer; +import org.apache.hc.core5.http.nio.CapacityChannel; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; + +class GzipAsyncEntityConsumerTest { + + private static String ORIGINAL; + + @BeforeAll + static void build() { + final String seed = "inflate me 🎉 "; + final StringBuilder sb = new StringBuilder(seed.length() * 3000); + for (int i = 0; i < 3000; i++) { + sb.append(seed); + } + ORIGINAL = sb.toString(); + } + + private static byte[] gzip(final byte[] data) throws Exception { + final java.io.ByteArrayOutputStream out = new java.io.ByteArrayOutputStream(); + final GZIPOutputStream gos = new GZIPOutputStream(out); + gos.write(data); + gos.close(); + return out.toByteArray(); + } + + /** + * Build a gzip frame with optional FEXTRA / FNAME / FCOMMENT / FHCRC flags. + */ + private static byte[] customGzip(final byte[] data, + final boolean extra, + final boolean name, + final boolean comment, + final boolean hcrc) throws Exception { + + /* --------- compress payload (raw deflate) --------- */ + final Deflater def = new Deflater(Deflater.DEFAULT_COMPRESSION, true); + def.setInput(data); + def.finish(); + final byte[] buf = new byte[8192]; + final java.io.ByteArrayOutputStream deflated = new java.io.ByteArrayOutputStream(); + while (!def.finished()) { + final int n = def.deflate(buf); + deflated.write(buf, 0, n); + } + def.end(); + final byte[] deflatedBytes = deflated.toByteArray(); + + /* --------- construct header --------- */ + final java.io.ByteArrayOutputStream out = new java.io.ByteArrayOutputStream(); + final CRC32 crc32 = new CRC32(); + crc32.update(data, 0, data.length); // trailer CRC-32 + + final java.io.ByteArrayOutputStream hdr = new java.io.ByteArrayOutputStream(); + hdr.write(0x1F); + hdr.write(0x8B); // ID1, ID2 + hdr.write(8); // CM = deflate + int flg = 0; + if (extra) { + flg |= 4; + } + if (name) { + flg |= 8; + } + if (comment) { + flg |= 16; + } + if (hcrc) { + flg |= 2; + } + hdr.write(flg); // FLG + hdr.write(new byte[]{0, 0, 0, 0}); // MTIME + hdr.write(0); // XFL + hdr.write(0xFF); // OS + + if (extra) { + hdr.write(4); + hdr.write(0); // XLEN = 4 + hdr.write(new byte[]{1, 2, 3, 4}); + } + if (name) { + hdr.write("file.txt".getBytes(StandardCharsets.ISO_8859_1)); + hdr.write(0); + } + if (comment) { + hdr.write("comment".getBytes(StandardCharsets.ISO_8859_1)); + hdr.write(0); + } + byte[] hdrBytes = hdr.toByteArray(); + + if (hcrc) { + /* ---------- CRC-16 over *optional* part only (bytes 10 .. n-1) ---------- */ + int crc16 = 0; + for (int i = 10; i < hdrBytes.length; i++) { // skip fixed 10-byte header + final int b = hdrBytes[i] & 0xFF; + crc16 ^= b; + for (int k = 0; k < 8; k++) { + crc16 = (crc16 & 1) != 0 ? (crc16 >>> 1) ^ 0xA001 : (crc16 >>> 1); + } + } + hdr.write(crc16 & 0xFF); + hdr.write((crc16 >>> 8) & 0xFF); + hdrBytes = hdr.toByteArray(); // final header incl. CRC-16 + } + + out.write(hdrBytes); + out.write(deflatedBytes); + + /* --------- trailer --------- */ + writeIntLE(out, (int) crc32.getValue()); + writeIntLE(out, data.length); + + return out.toByteArray(); + } + + private static void writeIntLE(final java.io.ByteArrayOutputStream out, final int v) { + out.write(v); + out.write(v >>> 8); + out.write(v >>> 16); + out.write(v >>> 24); + } + + private static class BytesCollector implements AsyncEntityConsumer { + private final java.io.ByteArrayOutputStream buf = new java.io.ByteArrayOutputStream(); + + @Override + public void streamStart(final EntityDetails ed, final FutureCallback cb) { + } + + @Override + public void updateCapacity(final CapacityChannel c) { + } + + @Override + public void consume(final ByteBuffer src) { + final byte[] b = new byte[src.remaining()]; + src.get(b); + buf.write(b, 0, b.length); + } + + @Override + public void streamEnd(final List t) { + } + + @Override + public void failed(final Exception cause) { + throw new RuntimeException(cause); + } + + @Override + public void releaseResources() { + } + + @Override + public String getContent() { + return new String(buf.toByteArray(), StandardCharsets.UTF_8); + } + } + + private static void runInflateTest(final byte[] wire) throws Exception { + final BytesCollector inner = new BytesCollector(); + final GzipAsyncEntityConsumer inflating = new GzipAsyncEntityConsumer<>(inner); + + final CountDownLatch done = new CountDownLatch(1); + final FutureCallback cb = new FutureCallback() { + @Override + public void completed(final String r) { + done.countDown(); + } + + @Override + public void failed(final Exception ex) { + fail(ex); + } + + @Override + public void cancelled() { + fail("cancelled"); + } + }; + + inflating.streamStart(new EntityDetails() { + public long getContentLength() { + return wire.length; + } + + public String getContentType() { + return "text/plain"; + } + + public String getContentEncoding() { + return "gzip"; + } + + public boolean isChunked() { + return false; + } + + public Set getTrailerNames() { + return new HashSet<>(); + } + }, cb); + + for (int off = 0; off < wire.length; off += 777) { + final int n = Math.min(777, wire.length - off); + inflating.consume(ByteBuffer.wrap(wire, off, n)); + } + inflating.streamEnd(Collections.
emptyList()); + + assertTrue(done.await(2, TimeUnit.SECONDS)); + assertEquals(ORIGINAL, inner.getContent()); + } + + @Test + void fullInflate() throws Exception { + runInflateTest(gzip(ORIGINAL.getBytes(StandardCharsets.UTF_8))); + } + + @Test + void headerExtra() throws Exception { + runInflateTest(customGzip(ORIGINAL.getBytes(StandardCharsets.UTF_8), + true, false, false, false)); + } + + @Test + void headerName() throws Exception { + runInflateTest(customGzip(ORIGINAL.getBytes(StandardCharsets.UTF_8), + false, true, false, false)); + } + + @Test + void headerComment() throws Exception { + runInflateTest(customGzip(ORIGINAL.getBytes(StandardCharsets.UTF_8), + false, false, true, false)); + } + + @Test + void headerCrc16() throws Exception { + runInflateTest(customGzip(ORIGINAL.getBytes(StandardCharsets.UTF_8), + true, true, true, true)); + } +} diff --git a/httpclient5/src/test/java/org/apache/hc/client5/http/async/methods/GzipAsyncEntityProducerTest.java b/httpclient5/src/test/java/org/apache/hc/client5/http/async/methods/GzipAsyncEntityProducerTest.java new file mode 100644 index 0000000000..0aa1d6c142 --- /dev/null +++ b/httpclient5/src/test/java/org/apache/hc/client5/http/async/methods/GzipAsyncEntityProducerTest.java @@ -0,0 +1,125 @@ +/* + * ==================================================================== + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * ==================================================================== + * + * This software consists of voluntary contributions made by many + * individuals on behalf of the Apache Software Foundation. For more + * information on the Apache Software Foundation, please see + * . + * + */ +package org.apache.hc.client5.http.async.methods; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.util.List; +import java.util.zip.GZIPInputStream; + +import org.apache.hc.core5.http.ContentType; +import org.apache.hc.core5.http.Header; +import org.apache.hc.core5.http.nio.AsyncEntityProducer; +import org.apache.hc.core5.http.nio.DataStreamChannel; +import org.apache.hc.core5.http.nio.entity.StringAsyncEntityProducer; +import org.junit.jupiter.api.Test; + +class GzipAsyncEntityProducerTest { + + private static final class CollectingChannel implements DataStreamChannel { + private final ByteArrayOutputStream sink = new ByteArrayOutputStream(); + private final int maxChunk; + + CollectingChannel(final int maxChunk) { + this.maxChunk = maxChunk; + } + + @Override + public void requestOutput() { /* not used in unit test */ } + + @Override + public int write(final ByteBuffer src) { + final int len = Math.min(src.remaining(), maxChunk); + final byte[] tmp = new byte[len]; + src.get(tmp); + sink.write(tmp, 0, len); + return len; + } + + @Override + public void endStream() { + } + + @Override + public void endStream(final List trailers) { + } + + byte[] toByteArray() { + return sink.toByteArray(); + } + } + + private static String buildLargeText() { + final String unit = "GZIP round-trip ✓ "; + final StringBuilder sb = new StringBuilder(unit.length() * 2000); + for (int i = 0; i < 2000; i++) { + sb.append(unit); + } + return sb.toString(); + } + + @Test + void roundTrip() throws Exception { + + final String original = buildLargeText(); + + final AsyncEntityProducer raw = + new StringAsyncEntityProducer(original, ContentType.TEXT_PLAIN); + final GzipAsyncEntityProducer gzip = + new GzipAsyncEntityProducer(raw); + + final CollectingChannel channel = new CollectingChannel(1024); + + /* drive the producer until it reports no more data */ + while (gzip.available() > 0) { + gzip.produce(channel); + } + + final byte[] wire = channel.toByteArray(); + assertTrue(wire.length > 0, "producer wrote no data"); + + /* inflate using JDK's GZIPInputStream to verify correctness */ + final ByteArrayOutputStream out = new ByteArrayOutputStream(); + final byte[] buf = new byte[4096]; + final GZIPInputStream gis = new GZIPInputStream(new ByteArrayInputStream(wire)); + int n; + while ((n = gis.read(buf)) != -1) { + out.write(buf, 0, n); + } + gis.close(); + + final String roundTrip = out.toString(StandardCharsets.UTF_8.name()); + assertEquals(original, roundTrip, "round-tripped text differs"); + assertEquals("gzip", gzip.getContentEncoding(), "wrong Content-Encoding"); + assertTrue(gzip.isChunked(), "producer must be chunked"); + } +} diff --git a/httpclient5/src/test/java/org/apache/hc/client5/http/examples/AsyncClientGzipCompressionExample.java b/httpclient5/src/test/java/org/apache/hc/client5/http/examples/AsyncClientGzipCompressionExample.java new file mode 100644 index 0000000000..557f9ee57c --- /dev/null +++ b/httpclient5/src/test/java/org/apache/hc/client5/http/examples/AsyncClientGzipCompressionExample.java @@ -0,0 +1,86 @@ +/* + * ==================================================================== + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * ==================================================================== + * + * This software consists of voluntary contributions made by many + * individuals on behalf of the Apache Software Foundation. For more + * information on the Apache Software Foundation, please see + * . + * + */ + +package org.apache.hc.client5.http.examples; + +import java.util.concurrent.Future; + +import org.apache.hc.client5.http.async.methods.GzipAsyncEntityProducer; +import org.apache.hc.client5.http.async.methods.SimpleHttpRequest; +import org.apache.hc.client5.http.async.methods.SimpleRequestBuilder; +import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient; +import org.apache.hc.client5.http.impl.async.HttpAsyncClients; +import org.apache.hc.core5.concurrent.FutureCallback; +import org.apache.hc.core5.http.ContentType; +import org.apache.hc.core5.http.HttpResponse; +import org.apache.hc.core5.http.Message; +import org.apache.hc.core5.http.nio.AsyncEntityProducer; +import org.apache.hc.core5.http.nio.entity.StringAsyncEntityConsumer; +import org.apache.hc.core5.http.nio.entity.StringAsyncEntityProducer; +import org.apache.hc.core5.http.nio.support.BasicRequestProducer; +import org.apache.hc.core5.http.nio.support.BasicResponseConsumer; +import org.apache.hc.core5.io.CloseMode; + +/** + * POST a JSON body compressed on-the-fly with GZIP. + */ +public class AsyncClientGzipCompressionExample { + + public static void main(final String[] args) throws Exception { + try (final CloseableHttpAsyncClient client = HttpAsyncClients.createDefault()) { + client.start(); + + final String payload = "{\"msg\":\"hello gzip world\"}"; + final AsyncEntityProducer raw = new StringAsyncEntityProducer(payload, ContentType.APPLICATION_JSON); + final AsyncEntityProducer gzip = new GzipAsyncEntityProducer(raw); + + final SimpleHttpRequest request = SimpleRequestBuilder + .post("https://httpbin.org/post") + .addHeader("Content-Encoding", "gzip") + .build(); + + final Future> future = client.execute( + new BasicRequestProducer(request, gzip), + new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), + new FutureCallback>() { + public void completed(final Message msg) { + System.out.println(msg.getHead().getCode() + " " + msg.getBody()); + } + + public void failed(final Exception cause) { + System.out.println(request + "->" + cause); + } + + public void cancelled() { + System.out.println(request + " cancelled"); + } + }); + future.get(); + client.close(CloseMode.GRACEFUL); + } + } +} diff --git a/httpclient5/src/test/java/org/apache/hc/client5/http/examples/AsyncClientGzipDecompressionExample.java b/httpclient5/src/test/java/org/apache/hc/client5/http/examples/AsyncClientGzipDecompressionExample.java new file mode 100644 index 0000000000..37b26545f3 --- /dev/null +++ b/httpclient5/src/test/java/org/apache/hc/client5/http/examples/AsyncClientGzipDecompressionExample.java @@ -0,0 +1,79 @@ +/* + * ==================================================================== + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * ==================================================================== + * + * This software consists of voluntary contributions made by many + * individuals on behalf of the Apache Software Foundation. For more + * information on the Apache Software Foundation, please see + * . + * + */ + +package org.apache.hc.client5.http.examples; + +import java.util.concurrent.Future; + +import org.apache.hc.client5.http.async.methods.GzipAsyncEntityConsumer; +import org.apache.hc.client5.http.async.methods.SimpleHttpRequest; +import org.apache.hc.client5.http.async.methods.SimpleRequestBuilder; +import org.apache.hc.client5.http.async.methods.SimpleRequestProducer; +import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient; +import org.apache.hc.client5.http.impl.async.HttpAsyncClients; +import org.apache.hc.core5.concurrent.FutureCallback; +import org.apache.hc.core5.http.HttpResponse; +import org.apache.hc.core5.http.Message; +import org.apache.hc.core5.http.nio.entity.StringAsyncEntityConsumer; +import org.apache.hc.core5.http.nio.support.BasicResponseConsumer; + +/** + * GET /gzip from httpbin and inflate on the fly. + */ +public class AsyncClientGzipDecompressionExample { + + public static void main(final String[] args) throws Exception { + try (final CloseableHttpAsyncClient client = HttpAsyncClients.createDefault()) { + client.start(); + + final SimpleHttpRequest request = + SimpleRequestBuilder.get("https://httpbin.org/gzip").build(); + + final GzipAsyncEntityConsumer unzip = + new GzipAsyncEntityConsumer<>(new StringAsyncEntityConsumer()); + + final Future> future = client.execute( + SimpleRequestProducer.create(request), + new BasicResponseConsumer<>(unzip), + new FutureCallback>() { + public void completed(final Message msg) { + System.out.println(msg.getHead().getCode() + " OK"); + System.out.println("inflated len: " + msg.getBody().length()); + } + + public void failed(final Exception cause) { + System.out.println(request + "->" + cause); + } + + public void cancelled() { + System.out.println(request + " cancelled"); + } + }); + future.get(); + } + } +}