Skip to content

Commit e94872f

Browse files
committed
Add fully-streaming async GZIP producer/consumer
1 parent 9d38e5f commit e94872f

6 files changed

Lines changed: 1225 additions & 0 deletions

File tree

Lines changed: 379 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,379 @@
1+
/*
2+
* ====================================================================
3+
* Licensed to the Apache Software Foundation (ASF) under one
4+
* or more contributor license agreements. See the NOTICE file
5+
* distributed with this work for additional information
6+
* regarding copyright ownership. The ASF licenses this file
7+
* to you under the Apache License, Version 2.0 (the
8+
* "License"); you may not use this file except in compliance
9+
* with the License. You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing,
14+
* software distributed under the License is distributed on an
15+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16+
* KIND, either express or implied. See the License for the
17+
* specific language governing permissions and limitations
18+
* under the License.
19+
* ====================================================================
20+
*
21+
* This software consists of voluntary contributions made by many
22+
* individuals on behalf of the Apache Software Foundation. For more
23+
* information on the Apache Software Foundation, please see
24+
* <http://www.apache.org/>.
25+
*
26+
*/
27+
package org.apache.hc.client5.http.async.methods;
28+
29+
import java.io.IOException;
30+
import java.nio.ByteBuffer;
31+
import java.util.ArrayDeque;
32+
import java.util.List;
33+
import java.util.Queue;
34+
import java.util.concurrent.atomic.AtomicBoolean;
35+
import java.util.zip.CRC32;
36+
import java.util.zip.DataFormatException;
37+
import java.util.zip.Inflater;
38+
39+
import org.apache.hc.core5.concurrent.FutureCallback;
40+
import org.apache.hc.core5.http.EntityDetails;
41+
import org.apache.hc.core5.http.Header;
42+
import org.apache.hc.core5.http.HttpException;
43+
import org.apache.hc.core5.http.nio.AsyncEntityConsumer;
44+
import org.apache.hc.core5.http.nio.CapacityChannel;
45+
import org.apache.hc.core5.util.Args;
46+
47+
/**
48+
* Streaming, non-blocking consumer for responses encoded with
49+
* {@code Content-Encoding: gzip}.
50+
*
51+
* <p>The implementation follows the public GZIP specification:</p>
52+
* <ol>
53+
* <li>Verifies the fixed 10-byte header (ID1 0x1F, ID2 0x8B, CM 8).</li>
54+
* <li>Parses / skips optional sections signalled by the FLG bits<br>
55+
* (FEXTRA, FNAME, FCOMMENT, FHCRC, FTEXT).</li>
56+
* <li>Inflates the raw DEFLATE blocks (<em>nowrap=true</em>) while
57+
* streaming the plain bytes to an inner consumer.</li>
58+
* <li>Collects the 8-byte trailer (CRC-32 &amp; ISIZE) and validates it
59+
* on {@link #streamEnd(List)}.</li>
60+
* </ol>
61+
*
62+
* @param <T> result type produced by the wrapped inner consumer
63+
* @since 5.6
64+
*/
65+
public final class GzipAsyncEntityConsumer<T> implements AsyncEntityConsumer<T> {
66+
67+
private static final int FTEXT = 1; // not used, informative only
68+
private static final int FHCRC = 2;
69+
private static final int FEXTRA = 4;
70+
private static final int FNAME = 8;
71+
private static final int FCOMMENT = 16;
72+
73+
private static final int OUT_BUF = 8 * 1024; // inflate buffer
74+
75+
private final AsyncEntityConsumer<T> inner;
76+
private final Inflater inflater = new Inflater(true); // raw DEFLATE blocks
77+
private final CRC32 crc32 = new CRC32();
78+
79+
private final byte[] out = new byte[OUT_BUF];
80+
private final Queue<Byte> trailerBuf = new ArrayDeque<Byte>(8);
81+
82+
private final Queue<Byte> hdrFixed = new ArrayDeque<Byte>(10);
83+
private int flg = 0;
84+
private int extraRemaining = -1;
85+
private boolean wantHdrCrc = false;
86+
private int hdrCrcCalc = 0; // incremental CRC-16
87+
88+
private boolean headerDone = false;
89+
private long uncompressed = 0;
90+
91+
/* ---------- completion plumbing ---------- */
92+
private FutureCallback<T> cb;
93+
private final AtomicBoolean completed = new AtomicBoolean(false);
94+
95+
public GzipAsyncEntityConsumer(final AsyncEntityConsumer<T> inner) {
96+
this.inner = Args.notNull(inner, "inner");
97+
}
98+
99+
@Override
100+
public void streamStart(final EntityDetails entityDetails,
101+
final FutureCallback<T> resultCb)
102+
throws HttpException, IOException {
103+
104+
if (entityDetails.getContentEncoding() != null
105+
&& !"gzip".equalsIgnoreCase(entityDetails.getContentEncoding())) {
106+
throw new HttpException("Unsupported content-encoding: "
107+
+ entityDetails.getContentEncoding());
108+
}
109+
this.cb = resultCb;
110+
inner.streamStart(entityDetails, resultCb);
111+
}
112+
113+
@Override
114+
public void updateCapacity(final CapacityChannel channel) throws IOException {
115+
channel.update(Integer.MAX_VALUE);
116+
inner.updateCapacity(channel);
117+
}
118+
119+
@Override
120+
public void consume(final ByteBuffer src) throws IOException {
121+
try {
122+
while (src.hasRemaining()) {
123+
124+
if (!headerDone) {
125+
if (!parseHeader(src)) {
126+
return; // need more bytes
127+
}
128+
continue; // fall through
129+
}
130+
131+
final int n = src.remaining();
132+
final byte[] chunk = new byte[n];
133+
src.get(chunk);
134+
135+
inflater.setInput(chunk);
136+
inflateLoop();
137+
138+
for (int i = Math.max(0, n - 8); i < n; i++) {
139+
trailerBuf.add(chunk[i]);
140+
if (trailerBuf.size() > 8) trailerBuf.remove();
141+
}
142+
}
143+
} catch (final DataFormatException ex) {
144+
throw new IOException("Corrupt GZIP stream", ex);
145+
}
146+
}
147+
148+
/**
149+
* Incrementally parses the variable-length part of a GZIP header.
150+
*
151+
* <p>The method consumes bytes from {@code src} until either the header is
152+
* finished (returns {@code true}) or the buffer runs empty
153+
* (returns {@code false}). It keeps state across calls so it can be fed
154+
* arbitrarily-sized chunks straight from the I/O reactor.</p>
155+
*
156+
* <p>Handled sections, in order:</p>
157+
* <ul>
158+
* <li><strong>Fixed 10-byte header</strong> &nbsp;ID1, ID2, CM, FLG, MTIME, XFL, OS.</li>
159+
* <li>{@code FEXTRA} – extra field whose length is a little-endian 16-bit value.</li>
160+
* <li>{@code FNAME} – zero-terminated original file-name.</li>
161+
* <li>{@code FCOMMENT} – zero-terminated comment string.</li>
162+
* <li>{@code FHCRC} – CRC-16 over everything <em>after</em> the fixed
163+
* 10-byte header (checked against {@link #hdrCrcCalc}).</li>
164+
* </ul>
165+
*
166+
* <p>Every time a byte is consumed, {@link #updateHdrCrc(byte)} is called
167+
* so that the running CRC-16 is maintained whenever {@code FHCRC} is set.</p>
168+
*
169+
* @param src inbound buffer containing header bytes
170+
* @return {@code true} once the entire header is parsed, otherwise
171+
* {@code false} (caller must supply more data)
172+
* @throws IllegalStateException if the header is malformed
173+
*/
174+
private boolean parseHeader(final ByteBuffer src) {
175+
176+
while (src.hasRemaining() && !headerDone) {
177+
178+
/* 0----- consume the mandatory 10-byte fixed header */
179+
if (hdrFixed.size() < 10) {
180+
final byte b = src.get();
181+
hdrFixed.add(b);
182+
updateHdrCrc(b);
183+
184+
if (hdrFixed.size() == 10) {
185+
final byte[] h = new byte[10];
186+
int i = 0;
187+
for (final Byte bb : hdrFixed) h[i++] = bb;
188+
189+
if (h[0] != (byte) 0x1F || h[1] != (byte) 0x8B || h[2] != 8) {
190+
throw new IllegalStateException("Not a GZIP header");
191+
}
192+
flg = h[3] & 0xFF;
193+
if ((flg & 0xE0) != 0) { // bits 5-7 reserved
194+
throw new IllegalStateException("Reserved GZIP flag bits set: " + flg);
195+
}
196+
wantHdrCrc = (flg & FHCRC) != 0;
197+
}
198+
continue;
199+
}
200+
201+
if ((flg & FEXTRA) != 0) {
202+
if (extraRemaining < 0) { // read length (2 B LE)
203+
if (src.remaining() < 2) {
204+
return false;
205+
}
206+
final int lo = src.get() & 0xFF;
207+
updateHdrCrc((byte) lo);
208+
final int hi = src.get() & 0xFF;
209+
updateHdrCrc((byte) hi);
210+
extraRemaining = (hi << 8) | lo;
211+
if (extraRemaining == 0) {
212+
flg &= ~FEXTRA;
213+
}
214+
continue;
215+
}
216+
final int skip = Math.min(extraRemaining, src.remaining());
217+
for (int i = 0; i < skip; i++) {
218+
updateHdrCrc(src.get());
219+
}
220+
extraRemaining -= skip;
221+
if (extraRemaining == 0) {
222+
flg &= ~FEXTRA;
223+
}
224+
continue;
225+
}
226+
227+
if ((flg & FNAME) != 0) {
228+
while (src.hasRemaining()) {
229+
final byte b = src.get();
230+
updateHdrCrc(b);
231+
if (b == 0) {
232+
flg &= ~FNAME;
233+
break;
234+
}
235+
}
236+
continue;
237+
}
238+
239+
if ((flg & FCOMMENT) != 0) {
240+
while (src.hasRemaining()) {
241+
final byte b = src.get();
242+
updateHdrCrc(b);
243+
if (b == 0) {
244+
flg &= ~FCOMMENT;
245+
break;
246+
}
247+
}
248+
continue;
249+
}
250+
251+
if (wantHdrCrc) {
252+
if (src.remaining() < 2) {
253+
return false;
254+
}
255+
final byte lo = src.get();
256+
final byte hi = src.get();
257+
final int expect = ((hi & 0xFF) << 8) | (lo & 0xFF);
258+
if ((hdrCrcCalc & 0xFFFF) != expect) {
259+
throw new IllegalStateException("Header CRC16 mismatch");
260+
}
261+
wantHdrCrc = false; // consumed
262+
continue;
263+
}
264+
265+
/* header complete */
266+
headerDone = true;
267+
}
268+
return headerDone;
269+
}
270+
271+
/**
272+
* Updates the running CRC-16 used when the {@code FHCRC} flag is present.
273+
*
274+
* <p>The polynomial is the “reverse” 0xA001 (the same as Modbus / CRC-16-IBM),
275+
* which is exactly what RFC 1952 requires for the header checksum.
276+
* The algorithm is intentionally implemented bit-by-bit so it needs no
277+
* temporary tables and can run on any JVM version.</p>
278+
*
279+
* @param b the header byte just consumed
280+
*/
281+
private void updateHdrCrc(final byte b) {
282+
if (!wantHdrCrc) {
283+
return;
284+
}
285+
hdrCrcCalc ^= b & 0xFF;
286+
for (int k = 0; k < 8; k++) {
287+
hdrCrcCalc = (hdrCrcCalc & 1) != 0
288+
? (hdrCrcCalc >>> 1) ^ 0xA001
289+
: hdrCrcCalc >>> 1;
290+
}
291+
}
292+
293+
294+
/**
295+
* Pulls as many plain bytes as currently available from {@link #inflater},
296+
* streams them to the wrapped {@code inner} consumer, and updates the
297+
* running CRC-32 / ISIZE counters needed for trailer validation.
298+
*
299+
* @throws DataFormatException if the underlying DEFLATE stream is corrupt
300+
* @throws IOException if {@code inner.consume(...)} throws
301+
*/
302+
private void inflateLoop() throws IOException, DataFormatException {
303+
int n;
304+
while ((n = inflater.inflate(out)) > 0) {
305+
crc32.update(out, 0, n);
306+
uncompressed += n;
307+
inner.consume(ByteBuffer.wrap(out, 0, n));
308+
}
309+
}
310+
311+
/**
312+
* Called once the upstream I/O reactor signals end-of-stream.
313+
*
314+
* <ol>
315+
* <li>Drains any remaining compressed bytes (via {@link #inflateLoop()}).</li>
316+
* <li>Collects the eight-byte trailer from {@link #trailerBuf}.</li>
317+
* <li>Verifies CRC-32 and ISIZE against the values accumulated while
318+
* inflating.</li>
319+
* <li>Propagates {@code streamEnd()} to the wrapped consumer and fires
320+
* the user callback.</li>
321+
* </ol>
322+
*
323+
* @throws HttpException on protocol errors (e.g. wrong encoding)
324+
* @throws IOException on corrupt streams or checksum mismatch
325+
*/
326+
@Override
327+
public void streamEnd(final List<? extends Header> trailers)
328+
throws HttpException, IOException {
329+
330+
try {
331+
inflateLoop();
332+
} catch (final DataFormatException ex) {
333+
throw new IOException("Corrupt GZIP stream", ex);
334+
}
335+
if (trailerBuf.size() != 8) {
336+
throw new IOException("Truncated GZIP trailer");
337+
}
338+
339+
final byte[] tail = new byte[8];
340+
for (int i = 0; i < 8; i++) {
341+
tail[i] = trailerBuf.remove();
342+
}
343+
344+
final long crcVal = ((tail[3] & 0xFFL) << 24) | ((tail[2] & 0xFFL) << 16)
345+
| ((tail[1] & 0xFFL) << 8) | (tail[0] & 0xFFL);
346+
final long isz = ((tail[7] & 0xFFL) << 24) | ((tail[6] & 0xFFL) << 16)
347+
| ((tail[5] & 0xFFL) << 8) | (tail[4] & 0xFFL);
348+
349+
if (crcVal != crc32.getValue()) {
350+
throw new IOException("CRC-32 mismatch");
351+
}
352+
if (isz != (uncompressed & 0xFFFFFFFFL)) {
353+
throw new IOException("ISIZE mismatch");
354+
}
355+
356+
inner.streamEnd(trailers);
357+
completed.set(true);
358+
if (cb != null) cb.completed(inner.getContent());
359+
}
360+
361+
@Override
362+
public T getContent() {
363+
return inner.getContent();
364+
}
365+
366+
@Override
367+
public void failed(final Exception cause) {
368+
if (completed.compareAndSet(false, true) && cb != null) {
369+
cb.failed(cause);
370+
}
371+
inner.failed(cause);
372+
}
373+
374+
@Override
375+
public void releaseResources() {
376+
inflater.end();
377+
inner.releaseResources();
378+
}
379+
}

0 commit comments

Comments
 (0)