Skip to content

Commit 72a00a0

Browse files
Add Zstandard to async content compression (#703)
Wire InflatingZstdDataConsumer; add DeflatingZstdEntityProducer for streaming requests
1 parent 3947600 commit 72a00a0

File tree

9 files changed

+1123
-11
lines changed

9 files changed

+1123
-11
lines changed

httpclient5/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,7 @@
116116
<dependency>
117117
<groupId>com.github.luben</groupId>
118118
<artifactId>zstd-jni</artifactId>
119-
<scope>test</scope>
119+
<optional>true</optional>
120120
</dependency>
121121
</dependencies>
122122

Lines changed: 352 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,352 @@
1+
/*
2+
* ====================================================================
3+
* Licensed to the Apache Software Foundation (ASF) under one
4+
* or more contributor license agreements. See the NOTICE file
5+
* distributed with this work for additional information
6+
* regarding copyright ownership. The ASF licenses this file
7+
* to you under the Apache License, Version 2.0 (the
8+
* "License"); you may not use this file except in compliance
9+
* with the License. You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing,
14+
* software distributed under the License is distributed on an
15+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16+
* KIND, either express or implied. See the License for the
17+
* specific language governing permissions and limitations
18+
* under the License.
19+
* ====================================================================
20+
*
21+
* This software consists of voluntary contributions made by many
22+
* individuals on behalf of the Apache Software Foundation. For more
23+
* information on the Apache Software Foundation, please see
24+
* <http://www.apache.org/>.
25+
*
26+
*/
27+
28+
package org.apache.hc.client5.http.async.methods;
29+
30+
import java.io.IOException;
31+
import java.nio.ByteBuffer;
32+
import java.util.ArrayDeque;
33+
import java.util.Collections;
34+
import java.util.Deque;
35+
import java.util.Set;
36+
import java.util.concurrent.atomic.AtomicBoolean;
37+
38+
import com.github.luben.zstd.ZstdDirectBufferCompressingStream;
39+
40+
import org.apache.hc.core5.http.Header;
41+
import org.apache.hc.core5.http.nio.AsyncEntityProducer;
42+
import org.apache.hc.core5.http.nio.DataStreamChannel;
43+
import org.apache.hc.core5.util.Args;
44+
45+
/**
46+
* {@code AsyncEntityProducer} that compresses the bytes produced by a delegate entity
47+
* into a single <a href="https://www.rfc-editor.org/rfc/rfc8878">Zstandard</a> (zstd) frame
48+
* on the fly.
49+
*
50+
* <p>This producer wraps a {@link org.apache.hc.core5.http.nio.AsyncEntityProducer} and
51+
* performs streaming, ByteBuffer-to-ByteBuffer compression as the delegate writes to the
52+
* provided {@link org.apache.hc.core5.http.nio.DataStreamChannel}. No {@code InputStream}
53+
* is used in the client pipeline.</p>
54+
*
55+
* <p>Metadata reported by this producer:</p>
56+
* <ul>
57+
* <li>{@link #getContentEncoding()} returns {@code "zstd"}.</li>
58+
* <li>{@link #getContentLength()} returns {@code -1} (unknown after compression).</li>
59+
* <li>{@link #isChunked()} returns {@code true} (requests are typically sent chunked).</li>
60+
* </ul>
61+
*
62+
* <h3>Usage</h3>
63+
* <pre>{@code
64+
* AsyncEntityProducer plain = new StringAsyncEntityProducer("payload", ContentType.TEXT_PLAIN);
65+
* AsyncEntityProducer zstd = new DeflatingZstdEntityProducer(plain);
66+
*
67+
* SimpleHttpRequest req = SimpleRequestBuilder.post("http://localhost/echo")
68+
* .setHeader(HttpHeaders.CONTENT_ENCODING, "zstd") // inform the server
69+
* .build();
70+
*
71+
* client.execute(
72+
* new BasicRequestProducer(req, zstd),
73+
* new BasicResponseConsumer<>(new StringAsyncEntityConsumer()),
74+
* null);
75+
* }</pre>
76+
*
77+
* <h3>Behavior</h3>
78+
* <ul>
79+
* <li><b>Streaming & back-pressure:</b> compressed output is staged in direct
80+
* {@link java.nio.ByteBuffer}s and written only when the channel accepts bytes.
81+
* When {@code DataStreamChannel.write(...)} returns {@code 0}, the producer pauses and
82+
* requests another output turn.</li>
83+
* <li><b>Finalization:</b> after the delegate signals {@code endStream()}, this producer emits
84+
* the zstd frame epilogue and then calls {@code DataStreamChannel.endStream()}.</li>
85+
* <li><b>Repeatability:</b> repeatable only if the delegate is repeatable.</li>
86+
* <li><b>Headers:</b> callers are responsible for sending {@code Content-Encoding: zstd} on
87+
* the request if required by the server. Content length is not known in advance.</li>
88+
* <li><b>Resources:</b> invoke {@link #releaseResources()} to free native compressor resources.</li>
89+
* </ul>
90+
*
91+
* <h3>Constructors</h3>
92+
* <ul>
93+
* <li>{@code DeflatingZstdEntityProducer(delegate)} – uses a default compression level.</li>
94+
* <li>{@code DeflatingZstdEntityProducer(delegate, level)} – explicitly sets the zstd level.</li>
95+
* </ul>
96+
*
97+
* <h3>Thread-safety</h3>
98+
* <p>Not thread-safe; one instance per message exchange.</p>
99+
*
100+
* <h3>Runtime dependency</h3>
101+
* <p>Requires {@code com.github.luben:zstd-jni} on the classpath.</p>
102+
*
103+
* @see org.apache.hc.client5.http.async.methods.InflatingZstdDataConsumer
104+
* @see org.apache.hc.core5.http.nio.support.BasicRequestProducer
105+
* @see org.apache.hc.core5.http.nio.entity.StringAsyncEntityProducer
106+
* @see org.apache.hc.client5.http.impl.async.ContentCompressionAsyncExec
107+
* @since 5.6
108+
*/
109+
public final class DeflatingZstdEntityProducer implements AsyncEntityProducer {
110+
111+
private static final int IN_BUF = 64 * 1024;
112+
private static final int OUT_BUF_DEFAULT = 128 * 1024;
113+
114+
private final AsyncEntityProducer delegate;
115+
116+
/**
117+
* Direct staging for heap inputs.
118+
*/
119+
private final ByteBuffer inDirect = ByteBuffer.allocateDirect(IN_BUF);
120+
121+
/**
122+
* Pending compressed output buffers, ready to write (pos=0..limit).
123+
*/
124+
private final Deque<ByteBuffer> pending = new ArrayDeque<>();
125+
126+
/**
127+
* Current output buffer owned by zstd; replaced when it overflows or flushes.
128+
*/
129+
private ByteBuffer outBuf;
130+
131+
/**
132+
* Zstd compressor stream.
133+
*/
134+
private ZstdDirectBufferCompressingStream zstream;
135+
136+
private volatile boolean upstreamEnded = false;
137+
private volatile boolean finished = false;
138+
private final AtomicBoolean released = new AtomicBoolean(false);
139+
140+
private final int level;
141+
private final int outCap;
142+
143+
public DeflatingZstdEntityProducer(final AsyncEntityProducer delegate) {
144+
this(delegate, 3); // default compression level
145+
}
146+
147+
public DeflatingZstdEntityProducer(final AsyncEntityProducer delegate, final int level) {
148+
this.delegate = Args.notNull(delegate, "delegate");
149+
this.level = level;
150+
inDirect.limit(0);
151+
152+
// Pick a sensible out buffer size (at least the recommended size).
153+
final int rec = ZstdDirectBufferCompressingStream.recommendedOutputBufferSize();
154+
this.outCap = Math.max(OUT_BUF_DEFAULT, rec);
155+
outBuf = ByteBuffer.allocateDirect(outCap);
156+
}
157+
158+
@Override
159+
public boolean isRepeatable() {
160+
return delegate.isRepeatable();
161+
}
162+
163+
@Override
164+
public long getContentLength() {
165+
return -1;
166+
} // unknown after compression
167+
168+
@Override
169+
public String getContentType() {
170+
return delegate.getContentType();
171+
}
172+
173+
@Override
174+
public String getContentEncoding() {
175+
return "zstd";
176+
}
177+
178+
@Override
179+
public boolean isChunked() {
180+
return true;
181+
}
182+
183+
@Override
184+
public Set<String> getTrailerNames() {
185+
return Collections.emptySet();
186+
}
187+
188+
@Override
189+
public int available() {
190+
if (!pending.isEmpty()) {
191+
final ByteBuffer head = pending.peekFirst();
192+
return head != null ? head.remaining() : 1;
193+
}
194+
// Delegate ended but we still must write zstd frame epilogue (produced on close()).
195+
if (upstreamEnded && !finished) {
196+
// Return a positive value to keep the reactor calling produce().
197+
return OUT_BUF_DEFAULT;
198+
}
199+
return delegate.available();
200+
}
201+
202+
@Override
203+
public void produce(final DataStreamChannel chan) throws IOException {
204+
ensureStreamInitialized();
205+
206+
// 1) flush anything already compressed
207+
if (!flushPending(chan)) {
208+
return; // back-pressure; we'll be called again
209+
}
210+
if (finished) {
211+
return;
212+
}
213+
214+
// 2) pull more input from delegate (this will drive compression via Inner.write)
215+
delegate.produce(new Inner(chan));
216+
217+
// 3) If upstream ended, finish the frame and drain everything
218+
if (upstreamEnded && !finished) {
219+
try {
220+
zstream.close(); // triggers flushBuffer for remaining + frame trailer
221+
} finally {
222+
// fall through
223+
}
224+
225+
if (!flushPending(chan)) {
226+
// trailer not fully sent yet; wait for next turn
227+
return;
228+
}
229+
finished = true;
230+
chan.endStream();
231+
}
232+
}
233+
234+
private void ensureStreamInitialized() throws IOException {
235+
if (zstream != null) {
236+
return;
237+
}
238+
// Create the compressor; override flushBuffer to queue full buffers.
239+
zstream = new ZstdDirectBufferCompressingStream(outBuf, level) {
240+
@Override
241+
protected ByteBuffer flushBuffer(final ByteBuffer toFlush) throws IOException {
242+
toFlush.flip();
243+
if (toFlush.hasRemaining()) {
244+
pending.addLast(toFlush); // queue for network write
245+
}
246+
// hand a fresh direct buffer back to the compressor
247+
outBuf = ByteBuffer.allocateDirect(outCap);
248+
return outBuf;
249+
}
250+
};
251+
}
252+
253+
/**
254+
* Try to write as much of the pending compressed data as the channel accepts.
255+
*/
256+
private boolean flushPending(final DataStreamChannel chan) throws IOException {
257+
while (!pending.isEmpty()) {
258+
final ByteBuffer head = pending.peekFirst();
259+
while (head.hasRemaining()) {
260+
final int n = chan.write(head);
261+
if (n == 0) {
262+
// back-pressure: ask to be called again
263+
chan.requestOutput();
264+
return false;
265+
}
266+
}
267+
pending.removeFirst(); // this buffer fully sent
268+
}
269+
return true;
270+
}
271+
272+
/**
273+
* Compress the bytes in {@code src} (may be heap or direct).
274+
*/
275+
private int compressFrom(final ByteBuffer src) throws IOException {
276+
final int before = src.remaining();
277+
if (src.isDirect()) {
278+
zstream.compress(src);
279+
} else {
280+
// Stage heap → direct in chunks
281+
while (src.hasRemaining()) {
282+
inDirect.compact();
283+
final int take = Math.min(inDirect.remaining(), src.remaining());
284+
final int oldLimit = src.limit();
285+
src.limit(src.position() + take);
286+
inDirect.put(src);
287+
src.limit(oldLimit);
288+
inDirect.flip();
289+
zstream.compress(inDirect);
290+
}
291+
}
292+
// The compressor calls flushBuffer() as needed; new buffers are queued in 'pending'.
293+
return before - src.remaining();
294+
}
295+
296+
private final class Inner implements DataStreamChannel {
297+
private final DataStreamChannel downstream;
298+
299+
Inner(final DataStreamChannel downstream) {
300+
this.downstream = downstream;
301+
}
302+
303+
@Override
304+
public void requestOutput() {
305+
downstream.requestOutput();
306+
}
307+
308+
@Override
309+
public int write(final ByteBuffer src) throws IOException {
310+
final int consumed = compressFrom(src);
311+
// Try to flush any buffers the compressor just queued
312+
if (!flushPending(downstream)) {
313+
// Not all data could be written now; ensure we get another callback
314+
downstream.requestOutput();
315+
}
316+
return consumed;
317+
}
318+
319+
@Override
320+
public void endStream() {
321+
upstreamEnded = true;
322+
// We will finalize and flush in the outer produce(); make sure it runs again soon.
323+
downstream.requestOutput();
324+
}
325+
326+
@Override
327+
public void endStream(final java.util.List<? extends Header> trailers) {
328+
endStream();
329+
}
330+
}
331+
332+
@Override
333+
public void failed(final Exception cause) {
334+
delegate.failed(cause);
335+
}
336+
337+
@Override
338+
public void releaseResources() {
339+
if (released.compareAndSet(false, true)) {
340+
try {
341+
try {
342+
if (zstream != null) {
343+
zstream.close();
344+
}
345+
} catch (final IOException ignore) {
346+
}
347+
} finally {
348+
delegate.releaseResources();
349+
}
350+
}
351+
}
352+
}

0 commit comments

Comments
 (0)