diff --git a/httpclient5/pom.xml b/httpclient5/pom.xml index 4a949d24fc..b97e1d21ec 100644 --- a/httpclient5/pom.xml +++ b/httpclient5/pom.xml @@ -116,7 +116,7 @@ com.github.luben zstd-jni - test + true diff --git a/httpclient5/src/main/java/org/apache/hc/client5/http/async/methods/DeflatingZstdEntityProducer.java b/httpclient5/src/main/java/org/apache/hc/client5/http/async/methods/DeflatingZstdEntityProducer.java new file mode 100644 index 0000000000..938d6d30ca --- /dev/null +++ b/httpclient5/src/main/java/org/apache/hc/client5/http/async/methods/DeflatingZstdEntityProducer.java @@ -0,0 +1,352 @@ +/* + * ==================================================================== + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * ==================================================================== + * + * This software consists of voluntary contributions made by many + * individuals on behalf of the Apache Software Foundation. For more + * information on the Apache Software Foundation, please see + * . 
+ * + */ + +package org.apache.hc.client5.http.async.methods; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayDeque; +import java.util.Collections; +import java.util.Deque; +import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; + +import com.github.luben.zstd.ZstdDirectBufferCompressingStream; + +import org.apache.hc.core5.http.Header; +import org.apache.hc.core5.http.nio.AsyncEntityProducer; +import org.apache.hc.core5.http.nio.DataStreamChannel; +import org.apache.hc.core5.util.Args; + +/** + * {@code AsyncEntityProducer} that compresses the bytes produced by a delegate entity + * into a single Zstandard (zstd) frame + * on the fly. + * + *

This producer wraps a {@link org.apache.hc.core5.http.nio.AsyncEntityProducer} and + * performs streaming, ByteBuffer-to-ByteBuffer compression as the delegate writes to the + * provided {@link org.apache.hc.core5.http.nio.DataStreamChannel}. No {@code InputStream} + * is used in the client pipeline.

+ * + *

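Metadata reported by this producer: content encoding is always {@code zstd}; content length is {@code -1} (unknown after compression); the entity is always chunked; no trailers are declared; content type and repeatability are taken from the delegate.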

+ * + * + *

Usage

+ *
{@code
+ * AsyncEntityProducer plain = new StringAsyncEntityProducer("payload", ContentType.TEXT_PLAIN);
+ * AsyncEntityProducer zstd  = new DeflatingZstdEntityProducer(plain);
+ *
+ * SimpleHttpRequest req = SimpleRequestBuilder.post("http://localhost/echo")
+ *     .setHeader(HttpHeaders.CONTENT_ENCODING, "zstd") // inform the server
+ *     .build();
+ *
+ * client.execute(
+ *     new BasicRequestProducer(req, zstd),
+ *     new BasicResponseConsumer<>(new StringAsyncEntityConsumer()),
+ *     null);
+ * }
+ * + *

Behavior

+ * + * + *

Constructors

+ * + * + *

Thread-safety

+ *

Not thread-safe; one instance per message exchange.

+ * + *

Runtime dependency

+ *

Requires {@code com.github.luben:zstd-jni} on the classpath.

+ * + * @see org.apache.hc.client5.http.async.methods.InflatingZstdDataConsumer + * @see org.apache.hc.core5.http.nio.support.BasicRequestProducer + * @see org.apache.hc.core5.http.nio.entity.StringAsyncEntityProducer + * @see org.apache.hc.client5.http.impl.async.ContentCompressionAsyncExec + * @since 5.6 + */ +public final class DeflatingZstdEntityProducer implements AsyncEntityProducer { + + private static final int IN_BUF = 64 * 1024; + private static final int OUT_BUF_DEFAULT = 128 * 1024; + + private final AsyncEntityProducer delegate; + + /** + * Direct staging for heap inputs. + */ + private final ByteBuffer inDirect = ByteBuffer.allocateDirect(IN_BUF); + + /** + * Pending compressed output buffers, ready to write (pos=0..limit). + */ + private final Deque pending = new ArrayDeque<>(); + + /** + * Current output buffer owned by zstd; replaced when it overflows or flushes. + */ + private ByteBuffer outBuf; + + /** + * Zstd compressor stream. + */ + private ZstdDirectBufferCompressingStream zstream; + + private volatile boolean upstreamEnded = false; + private volatile boolean finished = false; + private final AtomicBoolean released = new AtomicBoolean(false); + + private final int level; + private final int outCap; + + public DeflatingZstdEntityProducer(final AsyncEntityProducer delegate) { + this(delegate, 3); // default compression level + } + + public DeflatingZstdEntityProducer(final AsyncEntityProducer delegate, final int level) { + this.delegate = Args.notNull(delegate, "delegate"); + this.level = level; + inDirect.limit(0); + + // Pick a sensible out buffer size (at least the recommended size). 
+ final int rec = ZstdDirectBufferCompressingStream.recommendedOutputBufferSize(); + this.outCap = Math.max(OUT_BUF_DEFAULT, rec); + outBuf = ByteBuffer.allocateDirect(outCap); + } + + @Override + public boolean isRepeatable() { + return delegate.isRepeatable(); + } + + @Override + public long getContentLength() { + return -1; + } // unknown after compression + + @Override + public String getContentType() { + return delegate.getContentType(); + } + + @Override + public String getContentEncoding() { + return "zstd"; + } + + @Override + public boolean isChunked() { + return true; + } + + @Override + public Set getTrailerNames() { + return Collections.emptySet(); + } + + @Override + public int available() { + if (!pending.isEmpty()) { + final ByteBuffer head = pending.peekFirst(); + return head != null ? head.remaining() : 1; + } + // Delegate ended but we still must write zstd frame epilogue (produced on close()). + if (upstreamEnded && !finished) { + // Return a positive value to keep the reactor calling produce(). 
+ return OUT_BUF_DEFAULT; + } + return delegate.available(); + } + + @Override + public void produce(final DataStreamChannel chan) throws IOException { + ensureStreamInitialized(); + + // 1) flush anything already compressed + if (!flushPending(chan)) { + return; // back-pressure; we'll be called again + } + if (finished) { + return; + } + + // 2) pull more input from delegate (this will drive compression via Inner.write) + delegate.produce(new Inner(chan)); + + // 3) If upstream ended, finish the frame and drain everything + if (upstreamEnded && !finished) { + try { + zstream.close(); // triggers flushBuffer for remaining + frame trailer + } finally { + // fall through + } + + if (!flushPending(chan)) { + // trailer not fully sent yet; wait for next turn + return; + } + finished = true; + chan.endStream(); + } + } + + private void ensureStreamInitialized() throws IOException { + if (zstream != null) { + return; + } + // Create the compressor; override flushBuffer to queue full buffers. + zstream = new ZstdDirectBufferCompressingStream(outBuf, level) { + @Override + protected ByteBuffer flushBuffer(final ByteBuffer toFlush) throws IOException { + toFlush.flip(); + if (toFlush.hasRemaining()) { + pending.addLast(toFlush); // queue for network write + } + // hand a fresh direct buffer back to the compressor + outBuf = ByteBuffer.allocateDirect(outCap); + return outBuf; + } + }; + } + + /** + * Try to write as much of the pending compressed data as the channel accepts. + */ + private boolean flushPending(final DataStreamChannel chan) throws IOException { + while (!pending.isEmpty()) { + final ByteBuffer head = pending.peekFirst(); + while (head.hasRemaining()) { + final int n = chan.write(head); + if (n == 0) { + // back-pressure: ask to be called again + chan.requestOutput(); + return false; + } + } + pending.removeFirst(); // this buffer fully sent + } + return true; + } + + /** + * Compress the bytes in {@code src} (may be heap or direct). 
+ */ + private int compressFrom(final ByteBuffer src) throws IOException { + final int before = src.remaining(); + if (src.isDirect()) { + zstream.compress(src); + } else { + // Stage heap → direct in chunks + while (src.hasRemaining()) { + inDirect.compact(); + final int take = Math.min(inDirect.remaining(), src.remaining()); + final int oldLimit = src.limit(); + src.limit(src.position() + take); + inDirect.put(src); + src.limit(oldLimit); + inDirect.flip(); + zstream.compress(inDirect); + } + } + // The compressor calls flushBuffer() as needed; new buffers are queued in 'pending'. + return before - src.remaining(); + } + + private final class Inner implements DataStreamChannel { + private final DataStreamChannel downstream; + + Inner(final DataStreamChannel downstream) { + this.downstream = downstream; + } + + @Override + public void requestOutput() { + downstream.requestOutput(); + } + + @Override + public int write(final ByteBuffer src) throws IOException { + final int consumed = compressFrom(src); + // Try to flush any buffers the compressor just queued + if (!flushPending(downstream)) { + // Not all data could be written now; ensure we get another callback + downstream.requestOutput(); + } + return consumed; + } + + @Override + public void endStream() { + upstreamEnded = true; + // We will finalize and flush in the outer produce(); make sure it runs again soon. 
+ downstream.requestOutput(); + } + + @Override + public void endStream(final java.util.List trailers) { + endStream(); + } + } + + @Override + public void failed(final Exception cause) { + delegate.failed(cause); + } + + @Override + public void releaseResources() { + if (released.compareAndSet(false, true)) { + try { + try { + if (zstream != null) { + zstream.close(); + } + } catch (final IOException ignore) { + } + } finally { + delegate.releaseResources(); + } + } + } +} diff --git a/httpclient5/src/main/java/org/apache/hc/client5/http/async/methods/InflatingZstdDataConsumer.java b/httpclient5/src/main/java/org/apache/hc/client5/http/async/methods/InflatingZstdDataConsumer.java new file mode 100644 index 0000000000..cd773d712d --- /dev/null +++ b/httpclient5/src/main/java/org/apache/hc/client5/http/async/methods/InflatingZstdDataConsumer.java @@ -0,0 +1,149 @@ +/* + * ==================================================================== + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * ==================================================================== + * + * This software consists of voluntary contributions made by many + * individuals on behalf of the Apache Software Foundation. 
For more + * information on the Apache Software Foundation, please see + * . + * + */ +package org.apache.hc.client5.http.async.methods; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; + +import com.github.luben.zstd.ZstdDecompressCtx; + +import org.apache.hc.core5.http.Header; +import org.apache.hc.core5.http.HttpException; +import org.apache.hc.core5.http.nio.AsyncDataConsumer; +import org.apache.hc.core5.http.nio.CapacityChannel; + +/** + * {@code AsyncDataConsumer} that inflates Zstandard (zstd) content codings. + * + *

This consumer accepts compressed bytes and forwards decompressed data to a downstream + * {@link org.apache.hc.core5.http.nio.AsyncDataConsumer}. It is intended to be installed by the + * client execution chain when a response carries {@code Content-Encoding: zstd}. Applications + * normally do not instantiate it directly—enable content compression (default) and let + * {@code ContentCompressionAsyncExec} wire it automatically.

+ * + *

Behavior

+ *
    + *
  • Streams decompression as data arrives; does not require the full message in memory.
  • + *
  • Updates the downstream with plain bytes; the client removes the original + * {@code Content-Encoding} and related headers.
  • + *
  • On malformed input it throws an {@link java.io.IOException} with a descriptive message.
  • + *
  • {@link #releaseResources()} must be called to free native resources.
  • + *
+ * + *

Typical wiring (automatic)

+ *
{@code
+ * // Enabled by default; when the response has Content-Encoding: zstd
+ * // the execution chain wraps the application consumer:
+ * client = HttpAsyncClients.createDefault();
+ * }
+ * + *

Direct use (advanced)

+ *
{@code
+ * AsyncDataConsumer app = ...; // where you want plain bytes
+ * AsyncDataConsumer zstd = new InflatingZstdDataConsumer(app);
+ * // feed zstd.consume(ByteBuffer) with compressed bytes
+ * }
+ * + * @since 5.6 + */ + +public final class InflatingZstdDataConsumer implements AsyncDataConsumer { + + private static final int IN_BUF = 64 * 1024; + private static final int OUT_BUF = 128 * 1024; // ~ZSTD_DStreamOutSize(); tweak if you like. :contentReference[oaicite:3]{index=3} + + private final AsyncDataConsumer downstream; + private final ZstdDecompressCtx dctx = new ZstdDecompressCtx(); + + private final ByteBuffer inDirect = ByteBuffer.allocateDirect(IN_BUF); + private final ByteBuffer outDirect = ByteBuffer.allocateDirect(OUT_BUF); + + private final AtomicBoolean closed = new AtomicBoolean(false); + + public InflatingZstdDataConsumer(final AsyncDataConsumer downstream) { + this.downstream = downstream; + inDirect.limit(0); + outDirect.limit(0); + } + + @Override + public void updateCapacity(final CapacityChannel c) throws IOException { + downstream.updateCapacity(c); + } + + @Override + public void consume(final ByteBuffer src) throws IOException { + if (closed.get()) { + return; + } + + // Copy any incoming bytes into the direct input buffer, draining as we go. + while (src.hasRemaining()) { + inDirect.compact(); + final int take = Math.min(inDirect.remaining(), src.remaining()); + final int oldLimit = src.limit(); + src.limit(src.position() + take); + inDirect.put(src); + src.limit(oldLimit); + inDirect.flip(); + + // Pull decompressed bytes until we either need more input or downstream back-pressures + while (inDirect.hasRemaining()) { + outDirect.compact(); + // Streaming decompress: fills outDirect from inDirect; returns when either + // input exhausted or output buffer full. 
+ dctx.decompressDirectByteBufferStream(outDirect, inDirect); + outDirect.flip(); + if (outDirect.hasRemaining()) { + downstream.consume(outDirect); + if (outDirect.hasRemaining()) { + // downstream applied back-pressure; stop here, we’ll resume on next callback + return; + } + } else { + break; // need more input + } + } + } + } + + @Override + public void streamEnd(final List trailers) throws HttpException, IOException { + if (closed.compareAndSet(false, true)) { + dctx.close(); + downstream.streamEnd(trailers); + } + } + + @Override + public void releaseResources() { + dctx.close(); + downstream.releaseResources(); + } +} \ No newline at end of file diff --git a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/ZstdRuntime.java b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/ZstdRuntime.java new file mode 100644 index 0000000000..caf394af8f --- /dev/null +++ b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/ZstdRuntime.java @@ -0,0 +1,69 @@ +/* + * ==================================================================== + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ * ==================================================================== + * + * This software consists of voluntary contributions made by many + * individuals on behalf of the Apache Software Foundation. For more + * information on the Apache Software Foundation, please see + * . + * + */ +package org.apache.hc.client5.http.impl; + + +import org.apache.hc.core5.annotation.Contract; +import org.apache.hc.core5.annotation.Internal; +import org.apache.hc.core5.annotation.ThreadingBehavior; + +/** + * Utility to detect availability of the Zstandard JNI runtime on the classpath. + *

+ * Used by the async client implementation to conditionally register + * zstd encoders/decoders without creating a hard dependency on {@code zstd-jni}. + *

+ * + *

This class performs a lightweight reflective check and intentionally avoids + * linking to JNI classes at compile time to prevent {@link LinkageError}s when + * the optional dependency is absent.

+ * + * @since 5.6 + */ +@Internal +@Contract(threading = ThreadingBehavior.STATELESS) +public final class ZstdRuntime { + + private static final String ZSTD = "com.github.luben.zstd.Zstd"; + + private ZstdRuntime() { + } + + /** + * @return {@code true} if {@code com.github.luben.zstd.Zstd} can be loaded + * by the current class loader; {@code false} otherwise + */ + public static boolean available() { + try { + Class.forName(ZSTD, false, ZstdRuntime.class.getClassLoader()); + return true; + } catch (ClassNotFoundException | LinkageError ex) { + return false; + } + } +} + diff --git a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/ContentCompressionAsyncExec.java b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/ContentCompressionAsyncExec.java index dea6d7d7e3..41e69505c1 100644 --- a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/ContentCompressionAsyncExec.java +++ b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/ContentCompressionAsyncExec.java @@ -27,6 +27,7 @@ package org.apache.hc.client5.http.impl.async; import java.io.IOException; +import java.util.ArrayList; import java.util.Arrays; import java.util.LinkedHashMap; import java.util.List; @@ -38,7 +39,9 @@ import org.apache.hc.client5.http.async.AsyncExecChainHandler; import org.apache.hc.client5.http.async.methods.InflatingAsyncDataConsumer; import org.apache.hc.client5.http.async.methods.InflatingGzipDataConsumer; +import org.apache.hc.client5.http.async.methods.InflatingZstdDataConsumer; import org.apache.hc.client5.http.entity.compress.ContentCoding; +import org.apache.hc.client5.http.impl.ZstdRuntime; import org.apache.hc.client5.http.impl.ContentCodingSupport; import org.apache.hc.client5.http.protocol.HttpClientContext; import org.apache.hc.core5.annotation.Contract; @@ -61,6 +64,7 @@ public final class ContentCompressionAsyncExec implements AsyncExecChainHandler { private final Lookup> decoders; + private final List acceptTokens; 
public ContentCompressionAsyncExec( final LinkedHashMap> decoderMap, @@ -71,6 +75,7 @@ public ContentCompressionAsyncExec( final RegistryBuilder> rb = RegistryBuilder.create(); decoderMap.forEach(rb::register); this.decoders = rb.build(); + this.acceptTokens = new ArrayList<>(decoderMap.keySet()); } /** @@ -78,15 +83,25 @@ public ContentCompressionAsyncExec( */ public ContentCompressionAsyncExec() { final LinkedHashMap> map = new LinkedHashMap<>(); - map.put(ContentCoding.DEFLATE.token(), - d -> new InflatingAsyncDataConsumer(d, null)); + map.put(ContentCoding.DEFLATE.token(), d -> new InflatingAsyncDataConsumer(d, null)); map.put(ContentCoding.GZIP.token(), InflatingGzipDataConsumer::new); map.put(ContentCoding.X_GZIP.token(), InflatingGzipDataConsumer::new); - this.decoders = RegistryBuilder.>create() - .register(ContentCoding.GZIP.token(), map.get(ContentCoding.GZIP.token())) - .register(ContentCoding.X_GZIP.token(), map.get(ContentCoding.X_GZIP.token())) - .register(ContentCoding.DEFLATE.token(), map.get(ContentCoding.DEFLATE.token())) - .build(); + + final RegistryBuilder> rb = + RegistryBuilder.>create() + .register(ContentCoding.GZIP.token(), InflatingGzipDataConsumer::new) + .register(ContentCoding.X_GZIP.token(), InflatingGzipDataConsumer::new) + .register(ContentCoding.DEFLATE.token(), d -> new InflatingAsyncDataConsumer(d, null)); + + // Add zstd only when zstd-jni is present (no reflection needed) + final List tokens = new ArrayList<>(Arrays.asList("gzip", "x-gzip", "deflate")); + if (ZstdRuntime.available()) { + rb.register(ContentCoding.ZSTD.token(), InflatingZstdDataConsumer::new); + tokens.add("zstd"); + } + + this.decoders = rb.build(); + this.acceptTokens = tokens; } @@ -102,8 +117,7 @@ public void execute( final boolean enabled = ctx.getRequestConfigOrDefault().isContentCompressionEnabled(); if (enabled && !request.containsHeader(HttpHeaders.ACCEPT_ENCODING)) { - request.addHeader(MessageSupport.headerOfTokens( - HttpHeaders.ACCEPT_ENCODING, 
Arrays.asList("gzip", "x-gzip", "deflate"))); + request.addHeader(MessageSupport.headerOfTokens(HttpHeaders.ACCEPT_ENCODING, acceptTokens)); } chain.proceed(request, producer, scope, new AsyncExecCallback() { diff --git a/httpclient5/src/test/java/org/apache/hc/client5/http/async/methods/InflatingZstdDataConsumerTest.java b/httpclient5/src/test/java/org/apache/hc/client5/http/async/methods/InflatingZstdDataConsumerTest.java new file mode 100644 index 0000000000..3c6d85f1de --- /dev/null +++ b/httpclient5/src/test/java/org/apache/hc/client5/http/async/methods/InflatingZstdDataConsumerTest.java @@ -0,0 +1,137 @@ +/* + * ==================================================================== + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * ==================================================================== + * + * This software consists of voluntary contributions made by many + * individuals on behalf of the Apache Software Foundation. For more + * information on the Apache Software Foundation, please see + * . 
+ * + */ +package org.apache.hc.client5.http.async.methods; + + +import static org.junit.jupiter.api.Assertions.assertArrayEquals; + +import java.io.ByteArrayOutputStream; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.util.Collections; +import java.util.List; + +import com.github.luben.zstd.Zstd; + +import org.apache.commons.compress.compressors.zstandard.ZstdCompressorOutputStream; +import org.apache.hc.core5.http.Header; +import org.apache.hc.core5.http.nio.AsyncDataConsumer; +import org.apache.hc.core5.http.nio.CapacityChannel; +import org.junit.jupiter.api.Test; + +
class InflatingZstdDataConsumerTest {

    /** Data compressed in one shot by zstd-jni must inflate back to the original bytes. */
    @Test
    void zstdjni_to_consumer_roundtrip() throws Exception {
        final byte[] plain = buildPayload('A', 20_000).getBytes(StandardCharsets.UTF_8);

        final byte[] compressed = Zstd.compress(plain);

        assertArrayEquals(plain, inflate(compressed));
    }

    /** Data compressed by Apache Commons Compress must inflate back to the original bytes. */
    @Test
    void commons_to_consumer_roundtrip() throws Exception {
        final byte[] plain = buildPayload('B', 20_000).getBytes(StandardCharsets.UTF_8);

        final ByteArrayOutputStream baos = new ByteArrayOutputStream();
        try (final ZstdCompressorOutputStream zout = new ZstdCompressorOutputStream(baos)) {
            zout.write(plain);
        }
        final byte[] compressed = baos.toByteArray();

        assertArrayEquals(plain, inflate(compressed));
    }

    // ---- helpers ----

    /**
     * Runs {@code compressed} through a fresh consumer and returns what reached the sink.
     * The consumer is released even when feeding fails.
     */
    private static byte[] inflate(final byte[] compressed) throws Exception {
        final CollectingConsumer sink = new CollectingConsumer();
        final InflatingZstdDataConsumer infl = new InflatingZstdDataConsumer(sink);
        try {
            feedInChunks(infl, compressed);
            infl.streamEnd(Collections.<Header>emptyList());
        } finally {
            infl.releaseResources();
        }
        return sink.toByteArray();
    }

    /**
     * Feeds {@code data} in growing chunks; the last chunk size is unbounded, so all
     * bytes are delivered.
     */
    private static void feedInChunks(final InflatingZstdDataConsumer infl, final byte[] data) throws Exception {
        int off = 0;
        final int[] chunks = new int[]{1, 3, 7, 19, 64, 257, 1024, 4096, 16384, Integer.MAX_VALUE};
        for (final int size : chunks) {
            if (off >= data.length) {
                break;
            }
            final int n = Math.min(size, data.length - off);
            infl.consume(ByteBuffer.wrap(data, off, n));
            off += n;
        }
    }

    /** Builds {@code lines} lines of text, each prefixed with {@code ch} and the line index. */
    private static String buildPayload(final char ch, final int lines) {
        final StringBuilder sb = new StringBuilder(lines * 48);
        for (int i = 0; i < lines; i++) {
            sb.append(ch).append(i).append(" The quick brown fox jumps over the lazy dog.\n");
        }
        return sb.toString();
    }

    /**
     * Downstream that collects decompressed bytes.
     */
    static final class CollectingConsumer implements AsyncDataConsumer {
        private final ByteArrayOutputStream out = new ByteArrayOutputStream();

        @Override
        public void updateCapacity(final CapacityChannel c) { /* no-op */ }

        @Override
        public void consume(final ByteBuffer src) {
            // Drain the whole buffer.
            final int n = src.remaining();
            final byte[] tmp = new byte[n];
            src.get(tmp);
            out.write(tmp, 0, n);
        }

        @Override
        public void streamEnd(final List<? extends Header> trailers) {
        }

        @Override
        public void releaseResources() {
        }

        byte[] toByteArray() {
            return out.toByteArray();
        }
    }
}
diff --git a/httpclient5/src/test/java/org/apache/hc/client5/http/examples/AsyncClientServerZstdExample.java b/httpclient5/src/test/java/org/apache/hc/client5/http/examples/AsyncClientServerZstdExample.java new file mode 100644 index 0000000000..b0303e95df --- /dev/null +++ b/httpclient5/src/test/java/org/apache/hc/client5/http/examples/AsyncClientServerZstdExample.java @@ -0,0 +1,199 @@ +/* + * ==================================================================== + * Licensed to the Apache Software Foundation (ASF) under one + * or more
contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * ==================================================================== + * + * This software consists of voluntary contributions made by many + * individuals on behalf of the Apache Software Foundation. For more + * information on the Apache Software Foundation, please see + * . 
+ * + */ + +package org.apache.hc.client5.http.examples; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.nio.charset.StandardCharsets; +import java.util.concurrent.Future; + +import org.apache.commons.compress.compressors.CompressorException; +import org.apache.commons.compress.compressors.CompressorInputStream; +import org.apache.commons.compress.compressors.CompressorOutputStream; +import org.apache.commons.compress.compressors.CompressorStreamFactory; +import org.apache.hc.client5.http.async.methods.SimpleHttpRequest; +import org.apache.hc.client5.http.async.methods.SimpleRequestBuilder; +import org.apache.hc.client5.http.entity.compress.ContentCoding; +import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient; +import org.apache.hc.client5.http.impl.async.HttpAsyncClients; +import org.apache.hc.core5.http.ClassicHttpRequest; +import org.apache.hc.core5.http.ClassicHttpResponse; +import org.apache.hc.core5.http.ContentType; +import org.apache.hc.core5.http.HttpEntity; +import org.apache.hc.core5.http.HttpHeaders; +import org.apache.hc.core5.http.HttpResponse; +import org.apache.hc.core5.http.HttpStatus; +import org.apache.hc.core5.http.Message; +import org.apache.hc.core5.http.impl.bootstrap.HttpServer; +import org.apache.hc.core5.http.impl.bootstrap.ServerBootstrap; +import org.apache.hc.core5.http.io.HttpRequestHandler; +import org.apache.hc.core5.http.io.entity.ByteArrayEntity; +import org.apache.hc.core5.http.io.entity.StringEntity; +import org.apache.hc.core5.http.nio.entity.BasicAsyncEntityProducer; +import org.apache.hc.core5.http.nio.entity.StringAsyncEntityConsumer; +import org.apache.hc.core5.http.nio.support.BasicRequestProducer; +import org.apache.hc.core5.http.nio.support.BasicResponseConsumer; +import org.apache.hc.core5.http.protocol.HttpContext; +import org.apache.hc.core5.io.CloseMode; + +/** + * Example: end-to-end Zstandard (zstd) response decompression with the async 
client. + * + *

This example starts a tiny {@code HttpCore 5} classic server that always replies + * with {@code Content-Encoding: zstd}. The async client is the regular + * {@link org.apache.hc.client5.http.impl.async.HttpAsyncClients#createDefault() default} client. + * When the response carries the {@code zstd} content-coding, the execution chain + * (see {@code ContentCompressionAsyncExec}) transparently installs + * {@link org.apache.hc.client5.http.async.methods.InflatingZstdDataConsumer} and delivers + * decoded bytes to the application entity consumer. As a consequence the client removes + * {@code Content-Encoding}, {@code Content-Length} and {@code Content-MD5} headers and the + * application sees plain text.

+ * + *

This example uses Apache Commons Compress to build the zstd request payload sent by the client and, + * in the embedded server, to decode the request and produce the zstd response; the client's transparent + * response decoding does not depend on it and relies solely on HttpComponents.

+ * + *

What this demonstrates

+ *
    + *
  • How to validate transparent zstd decoding without hitting external endpoints.
  • + *
  • That the async client strips {@code Content-Encoding} on success.
  • + *
  • No special client wiring is required beyond enabling content compression (on by default).
  • + *
+ * + *

Expected output

+ *
{@code
+ * Status : 200
+ * C-E : (stripped by client)
+ * Response: Hello Zstandard world!
+ * }
+ * + * @since 5.6 + */ +public final class AsyncClientServerZstdExample { + + public static void main(final String[] args) throws Exception { + final HttpServer server = ServerBootstrap.bootstrap() + .setListenerPort(0) + .setCanonicalHostName("localhost") + .register("/echo", new EchoHandler()) + .create(); + server.start(); + final int port = server.getLocalPort(); + final String url = "http://localhost:" + port + "/echo"; + + try (final CloseableHttpAsyncClient client = HttpAsyncClients.createDefault()) { + client.start(); + + final String requestBody = "Hello Zstandard world!"; + final byte[] reqCompressed = zstdCompress(requestBody.getBytes(StandardCharsets.UTF_8)); + + final SimpleHttpRequest post = SimpleRequestBuilder + .post(url) + .setHeader(HttpHeaders.CONTENT_TYPE, ContentType.TEXT_PLAIN.toString()) + .setHeader(HttpHeaders.CONTENT_ENCODING, ContentCoding.ZSTD.token()) + .build(); + + final Future> f = client.execute( + new BasicRequestProducer(post, + new BasicAsyncEntityProducer(reqCompressed, ContentType.APPLICATION_OCTET_STREAM)), + new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), + null); + + final Message msg = f.get(); + System.out.println("Status : " + msg.getHead().getCode()); + System.out.println("C-E : " + (msg.getHead().getFirstHeader("Content-Encoding") != null + ? msg.getHead().getFirstHeader("Content-Encoding").getValue() + : "(stripped by client)")); + System.out.println("Response: " + msg.getBody()); + } finally { + server.close(CloseMode.GRACEFUL); + } + } + + /** + * Classic echo: decode request C-E:zstd, respond with zstd too. 
+ */ + private static final class EchoHandler implements HttpRequestHandler { + @Override + public void handle(final ClassicHttpRequest request, + final ClassicHttpResponse response, + final HttpContext context) throws IOException { + try { + final HttpEntity entity = request.getEntity(); + final byte[] data; + if (entity != null && ContentCoding.ZSTD.token().equalsIgnoreCase(getCE(entity))) { + try (final InputStream in = entity.getContent(); + final CompressorInputStream zin = new CompressorStreamFactory() + .createCompressorInputStream(ContentCoding.ZSTD.token(), in)) { + data = readAll(zin); + } catch (final CompressorException e) { + response.setCode(HttpStatus.SC_BAD_REQUEST); + response.setEntity(new StringEntity("Bad zstd request", StandardCharsets.UTF_8)); + return; + } + } else { + data = entity != null ? readAll(entity.getContent()) : new byte[0]; + } + + final byte[] z = zstdCompress(data); + response.setCode(HttpStatus.SC_OK); + response.addHeader(HttpHeaders.CONTENT_TYPE, ContentType.TEXT_PLAIN.toString()); + response.addHeader(HttpHeaders.CONTENT_ENCODING, ContentCoding.ZSTD.token()); + response.setEntity(new ByteArrayEntity(z, ContentType.APPLICATION_OCTET_STREAM)); + } catch (final Exception ex) { + response.setCode(HttpStatus.SC_INTERNAL_SERVER_ERROR); + response.setEntity(new StringEntity("Server error", StandardCharsets.UTF_8)); + } + } + + private static String getCE(final HttpEntity entity) { + return entity.getContentEncoding(); + } + + private static byte[] readAll(final InputStream in) throws IOException { + final ByteArrayOutputStream buf = new ByteArrayOutputStream(); + final byte[] tmp = new byte[8192]; + int n; + while ((n = in.read(tmp)) != -1) buf.write(tmp, 0, n); + return buf.toByteArray(); + } + } + + private static byte[] zstdCompress(final byte[] plain) throws IOException { + final ByteArrayOutputStream baos = new ByteArrayOutputStream(); + try (final CompressorOutputStream out = new CompressorStreamFactory() + 
.createCompressorOutputStream(ContentCoding.ZSTD.token(), baos)) { + out.write(plain); + } catch (final CompressorException e) { + throw new IOException("zstd compressor not available", e); + } + return baos.toByteArray(); + } +} diff --git a/httpclient5/src/test/java/org/apache/hc/client5/http/examples/AsyncClientZstdCompressionExample.java b/httpclient5/src/test/java/org/apache/hc/client5/http/examples/AsyncClientZstdCompressionExample.java new file mode 100644 index 0000000000..a1fd1b82cf --- /dev/null +++ b/httpclient5/src/test/java/org/apache/hc/client5/http/examples/AsyncClientZstdCompressionExample.java @@ -0,0 +1,192 @@ +/* + * ==================================================================== + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * ==================================================================== + * + * This software consists of voluntary contributions made by many + * individuals on behalf of the Apache Software Foundation. For more + * information on the Apache Software Foundation, please see + * . 
+ * + */ +package org.apache.hc.client5.http.examples; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.nio.charset.StandardCharsets; +import java.util.concurrent.Future; + +import org.apache.commons.compress.compressors.CompressorException; +import org.apache.commons.compress.compressors.CompressorStreamFactory; +import org.apache.hc.client5.http.async.methods.DeflatingZstdEntityProducer; +import org.apache.hc.client5.http.async.methods.SimpleHttpRequest; +import org.apache.hc.client5.http.async.methods.SimpleRequestBuilder; +import org.apache.hc.client5.http.entity.compress.ContentCoding; +import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient; +import org.apache.hc.client5.http.impl.async.HttpAsyncClients; +import org.apache.hc.core5.http.ClassicHttpRequest; +import org.apache.hc.core5.http.ClassicHttpResponse; +import org.apache.hc.core5.http.ContentType; +import org.apache.hc.core5.http.HttpHeaders; +import org.apache.hc.core5.http.HttpResponse; +import org.apache.hc.core5.http.HttpStatus; +import org.apache.hc.core5.http.Message; +import org.apache.hc.core5.http.impl.bootstrap.HttpServer; +import org.apache.hc.core5.http.impl.bootstrap.ServerBootstrap; +import org.apache.hc.core5.http.io.HttpRequestHandler; +import org.apache.hc.core5.http.io.entity.ByteArrayEntity; +import org.apache.hc.core5.http.io.entity.StringEntity; +import org.apache.hc.core5.http.nio.entity.StringAsyncEntityConsumer; +import org.apache.hc.core5.http.nio.entity.StringAsyncEntityProducer; +import org.apache.hc.core5.http.nio.support.BasicRequestProducer; +import org.apache.hc.core5.http.nio.support.BasicResponseConsumer; +import org.apache.hc.core5.http.protocol.HttpContext; +import org.apache.hc.core5.io.CloseMode; + +/** + * Example: streaming a request body compressed with Zstandard (zstd) using the async client. + * + *

This example is self-contained and needs no external endpoint. It starts a tiny classic + * {@code HttpCore 5} server on an ephemeral port that decodes requests carrying + * {@code Content-Encoding: zstd} (via Apache Commons Compress) and echoes the text back, + * itself zstd-encoded; the client decodes that response transparently. + * The async client builds the request body with + * {@link org.apache.hc.core5.http.nio.entity.StringAsyncEntityProducer} and wraps it in + * {@link org.apache.hc.client5.http.async.methods.DeflatingZstdEntityProducer} so the payload + * is compressed on the fly as it is streamed to the wire (no {@code InputStream} + * in the client pipeline).

+ * + *

What it demonstrates

+ *
    + *
  • How to send a POST request whose entity is compressed with zstd while streaming.
  • + *
  • How to advertise the request coding with {@code Content-Encoding: zstd}.
  • + *
  • Chunked transfer with proper back-pressure handling on the client side.
  • + *
+ * + *

Outline

+ *
    + *
  1. Start an embedded classic server that accepts {@code /echo}, decodes the + * zstd-encoded request body, and replies with the same text, again zstd-encoded + * ({@code Content-Encoding: zstd}).
  2. + *
  3. Create the default async client.
  4. + *
  5. Wrap the plain entity in {@link org.apache.hc.client5.http.async.methods.DeflatingZstdEntityProducer} + * and send it with {@link org.apache.hc.core5.http.nio.support.BasicRequestProducer}.
  6. + *
  7. Read the echoed response as a string; the client has already decoded it to normal, uncompressed text.
  8. + *
+ * + *

Expected output

+ *
{@code
+ * Request : Hello Zstandard request body!
+ * Status  : 200
+ * Response: Hello Zstandard request body!
+ * }
+ * + *

Notes

+ *
    + *
  • The client path is fully NIO and ByteBuffer-based; no {@code InputStream} is used.
  • + *
  • The embedded server in this example uses Apache Commons Compress only to decode + * incoming zstd and to encode its reply for demonstration; production servers may do this differently.
  • + *
  • Ensure {@code com.github.luben:zstd-jni} is on the runtime classpath for the client.
  • + *
+ * + * @since 5.6 + */ + +public final class AsyncClientZstdCompressionExample { + + public static void main(final String[] args) throws Exception { + // --- tiny classic server that decodes zstd requests and echoes plain text --- + final HttpServer server = ServerBootstrap.bootstrap() + .setListenerPort(0) + .setCanonicalHostName("localhost") + .register("/echo", new EchoHandler()) + .create(); + server.start(); + final int port = server.getLocalPort(); + final String url = "http://localhost:" + port + "/echo"; + + try (CloseableHttpAsyncClient client = HttpAsyncClients.createDefault()) { + client.start(); + + final String payload = "Hello Zstandard request body!"; + System.out.println("Request : " + payload); + + final StringAsyncEntityProducer plain = new StringAsyncEntityProducer(payload, ContentType.TEXT_PLAIN); + final DeflatingZstdEntityProducer zstd = new DeflatingZstdEntityProducer(plain); + + final SimpleHttpRequest post = SimpleRequestBuilder.post(url) + // header is optional; BasicRequestProducer will use producer metadata too + .setHeader(HttpHeaders.CONTENT_ENCODING, ContentCoding.ZSTD.token()) + .setHeader(HttpHeaders.CONTENT_TYPE, ContentType.TEXT_PLAIN.toString()) + .build(); + + final Future> f = client.execute( + new BasicRequestProducer(post, zstd), + new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), + null); + + final Message msg = f.get(); + System.out.println("Status : " + msg.getHead().getCode()); + System.out.println("Response: " + msg.getBody()); + } finally { + server.close(CloseMode.GRACEFUL); + } + } + + /** + * Classic echo handler that decodes request Content-Encoding: zstd and returns plain text. 
+ */ + private static final class EchoHandler implements HttpRequestHandler { + + @Override + public void handle( + final ClassicHttpRequest request, + final ClassicHttpResponse response, + final HttpContext context) throws IOException { + + try (InputStream in = new CompressorStreamFactory() + .createCompressorInputStream(ContentCoding.ZSTD.token(), request.getEntity().getContent())) { + + final byte[] data = readAll(in); + final String text = new String(data, StandardCharsets.UTF_8); + + response.setCode(HttpStatus.SC_OK); + response.addHeader("Content-Encoding", ContentCoding.ZSTD.token()); + + final ByteArrayOutputStream baos = new ByteArrayOutputStream(); + try (final OutputStream zstdOut = new CompressorStreamFactory() + .createCompressorOutputStream("zstd", baos)) { + zstdOut.write(text.getBytes(StandardCharsets.UTF_8)); + } + response.setEntity(new ByteArrayEntity(baos.toByteArray(), ContentType.TEXT_PLAIN)); + } catch (final CompressorException ex) { + response.setCode(HttpStatus.SC_BAD_REQUEST); + response.setEntity(new StringEntity("Unable to process compressed payload", StandardCharsets.UTF_8)); + } + } + + private static byte[] readAll(final InputStream in) throws IOException { + final ByteArrayOutputStream bos = new ByteArrayOutputStream(); + final byte[] buf = new byte[8192]; + int n; + while ((n = in.read(buf)) != -1) bos.write(buf, 0, n); + return bos.toByteArray(); + } + } +} diff --git a/httpclient5/src/test/java/org/apache/hc/client5/http/impl/async/TestContentCompressionAsyncExec.java b/httpclient5/src/test/java/org/apache/hc/client5/http/impl/async/TestContentCompressionAsyncExec.java index 730d70402f..690bc07c10 100644 --- a/httpclient5/src/test/java/org/apache/hc/client5/http/impl/async/TestContentCompressionAsyncExec.java +++ b/httpclient5/src/test/java/org/apache/hc/client5/http/impl/async/TestContentCompressionAsyncExec.java @@ -116,7 +116,7 @@ void testAcceptEncodingAdded() throws Exception { final HttpRequest request = new 
BasicHttpRequest(Method.GET, "/path"); executeAndCapture(request); assertTrue(request.containsHeader(HttpHeaders.ACCEPT_ENCODING)); - assertEquals("gzip, x-gzip, deflate", request.getFirstHeader(HttpHeaders.ACCEPT_ENCODING).getValue()); + assertEquals("gzip, x-gzip, deflate, zstd", request.getFirstHeader(HttpHeaders.ACCEPT_ENCODING).getValue()); } @Test