2929
3030import java .io .IOException ;
3131import java .net .InetSocketAddress ;
32+ import java .nio .ByteBuffer ;
33+ import java .util .List ;
3234import java .util .Set ;
3335import java .util .concurrent .Future ;
3436import java .util .concurrent .RejectedExecutionException ;
4345import org .apache .hc .core5 .function .Decorator ;
4446import org .apache .hc .core5 .function .Resolver ;
4547import org .apache .hc .core5 .http .ConnectionClosedException ;
48+ import org .apache .hc .core5 .http .EntityDetails ;
49+ import org .apache .hc .core5 .http .Header ;
4650import org .apache .hc .core5 .http .HttpException ;
4751import org .apache .hc .core5 .http .HttpHost ;
52+ import org .apache .hc .core5 .http .HttpResponse ;
4853import org .apache .hc .core5 .http .impl .DefaultAddressResolver ;
4954import org .apache .hc .core5 .http .impl .bootstrap .AsyncRequester ;
5055import org .apache .hc .core5 .http .nio .AsyncClientExchangeHandler ;
5156import org .apache .hc .core5 .http .nio .AsyncPushConsumer ;
5257import org .apache .hc .core5 .http .nio .AsyncRequestProducer ;
5358import org .apache .hc .core5 .http .nio .AsyncResponseConsumer ;
59+ import org .apache .hc .core5 .http .nio .CapacityChannel ;
60+ import org .apache .hc .core5 .http .nio .DataStreamChannel ;
5461import org .apache .hc .core5 .http .nio .HandlerFactory ;
62+ import org .apache .hc .core5 .http .nio .RequestChannel ;
5563import org .apache .hc .core5 .http .nio .command .RequestExecutionCommand ;
5664import org .apache .hc .core5 .http .nio .command .ShutdownCommand ;
5765import org .apache .hc .core5 .http .nio .ssl .TlsStrategy ;
@@ -183,14 +191,69 @@ private void execute(
183191 if (request .getAuthority () == null ) {
184192 request .setAuthority (new URIAuthority (host ));
185193 }
186- if (request .getScheme () == null ) {
187- request .setScheme (host .getSchemeName ());
188- }
189194 connPool .getSession (host , timeout , new FutureCallback <IOSession >() {
190195
191196 @ Override
192197 public void completed (final IOSession ioSession ) {
198+ final AsyncClientExchangeHandler handlerProxy = new AsyncClientExchangeHandler () {
199+
200+ @ Override
201+ public void releaseResources () {
202+ exchangeHandler .releaseResources ();
203+ }
204+
205+ @ Override
206+ public void produceRequest (final RequestChannel channel , final HttpContext httpContext ) throws HttpException , IOException {
207+ channel .sendRequest (request , entityDetails , httpContext );
208+ }
209+
210+ @ Override
211+ public int available () {
212+ return exchangeHandler .available ();
213+ }
214+
215+ @ Override
216+ public void produce (final DataStreamChannel channel ) throws IOException {
217+ exchangeHandler .produce (channel );
218+ }
219+
220+ @ Override
221+ public void consumeInformation (final HttpResponse response , final HttpContext httpContext ) throws HttpException , IOException {
222+ exchangeHandler .consumeInformation (response , httpContext );
223+ }
224+
225+ @ Override
226+ public void consumeResponse (
227+ final HttpResponse response , final EntityDetails entityDetails , final HttpContext httpContext ) throws HttpException , IOException {
228+ exchangeHandler .consumeResponse (response , entityDetails , httpContext );
229+ }
230+
231+ @ Override
232+ public void updateCapacity (final CapacityChannel capacityChannel ) throws IOException {
233+ exchangeHandler .updateCapacity (capacityChannel );
234+ }
235+
236+ @ Override
237+ public void consume (final ByteBuffer src ) throws IOException {
238+ exchangeHandler .consume (src );
239+ }
240+
241+ @ Override
242+ public void streamEnd (final List <? extends Header > trailers ) throws HttpException , IOException {
243+ exchangeHandler .streamEnd (trailers );
244+ }
193245
246+ @ Override
247+ public void cancel () {
248+ exchangeHandler .cancel ();
249+ }
250+
251+ @ Override
252+ public void failed (final Exception cause ) {
253+ exchangeHandler .failed (cause );
254+ }
255+
256+ };
194257 final int max = maxRequestsPerConnection ;
195258 if (max > 0 ) {
196259 final int current = ioSession .getPendingCommandCount ();
@@ -201,24 +264,20 @@ public void completed(final IOSession ioSession) {
201264 return ;
202265 }
203266 }
204-
205267 final Timeout socketTimeout = ioSession .getSocketTimeout ();
206- final RequestExecutionCommand command = new RequestExecutionCommand (
207- exchangeHandler ,
208- pushHandlerFactory ,
209- context ,
210- streamControl -> {
211- cancellableDependency .setDependency (streamControl );
212- if (socketTimeout != null ) {
213- streamControl .setTimeout (socketTimeout );
214- }
215- });
216-
217- ioSession .enqueue (command , Command .Priority .NORMAL );
218-
268+ ioSession .enqueue (new RequestExecutionCommand (
269+ handlerProxy ,
270+ pushHandlerFactory ,
271+ context ,
272+ streamControl -> {
273+ cancellableDependency .setDependency (streamControl );
274+ if (socketTimeout != null ) {
275+ streamControl .setTimeout (socketTimeout );
276+ }
277+ }),
278+ Command .Priority .NORMAL );
219279 if (!ioSession .isOpen ()) {
220280 exchangeHandler .failed (new ConnectionClosedException ());
221- exchangeHandler .releaseResources ();
222281 }
223282 }
224283
@@ -309,34 +368,4 @@ public final <T> Future<T> execute(
309368 public H2ConnPool getConnPool () {
310369 return connPool ;
311370 }
312-
313- /**
314- * Cancellable that can be wired to the stream control once it becomes available.
315- */
316- private static final class CancellableExecution implements Cancellable , CancellableDependency {
317-
318- private volatile Cancellable dependency ;
319-
320- @ Override
321- public void setDependency (final Cancellable dependency ) {
322- this .dependency = dependency ;
323- }
324-
325- @ Override
326- public boolean isCancelled () {
327- return false ;
328- }
329-
330- @ Override
331- public boolean cancel () {
332- final Cancellable local = this .dependency ;
333- if (local != null ) {
334- local .cancel ();
335- return true ;
336- }
337- return false ;
338- }
339-
340- }
341-
342371}
0 commit comments