@@ -59,8 +59,9 @@ public final class BasicClientExchangeHandler<T> implements AsyncClientExchangeH
5959 private final AsyncRequestProducer requestProducer ;
6060 private final AsyncResponseConsumer <T > responseConsumer ;
6161 private final AtomicBoolean completed ;
62- private final FutureCallback <T > resultCallback ;
6362 private final AtomicBoolean outputTerminated ;
63+ private final AtomicBoolean inputTerminated ;
64+ private final FutureCallback <T > resultCallback ;
6465
6566 public BasicClientExchangeHandler (
6667 final AsyncRequestProducer requestProducer ,
@@ -71,6 +72,7 @@ public BasicClientExchangeHandler(
7172 this .completed = new AtomicBoolean ();
7273 this .resultCallback = resultCallback ;
7374 this .outputTerminated = new AtomicBoolean ();
75+ this .inputTerminated = new AtomicBoolean ();
7476 }
7577
7678 @ Override
@@ -100,64 +102,31 @@ public void consumeInformation(final HttpResponse response, final HttpContext ht
100102 @ Override
101103 public void consumeResponse (final HttpResponse response , final EntityDetails entityDetails , final HttpContext httpContext ) throws HttpException , IOException {
102104 if (response .getCode () >= HttpStatus .SC_CLIENT_ERROR ) {
103- outputTerminated .set (true );
104- requestProducer .releaseResources ();
105+ releaseRequestProducer ();
105106 }
106107 responseConsumer .consumeResponse (response , entityDetails , httpContext , new FutureCallback <T >() {
107108
108109 @ Override
109110 public void completed (final T result ) {
110- if (completed .compareAndSet (false , true )) {
111- try {
112- if (resultCallback != null ) {
113- resultCallback .completed (result );
114- }
115- } finally {
116- internalReleaseResources ();
117- }
118- }
111+ completedInternal (result );
119112 }
120113
121114 @ Override
122115 public void failed (final Exception ex ) {
123- if (completed .compareAndSet (false , true )) {
124- try {
125- if (resultCallback != null ) {
126- resultCallback .failed (ex );
127- }
128- } finally {
129- internalReleaseResources ();
130- }
131- }
116+ failedInternal (ex );
132117 }
133118
134119 @ Override
135120 public void cancelled () {
136- if (completed .compareAndSet (false , true )) {
137- try {
138- if (resultCallback != null ) {
139- resultCallback .cancelled ();
140- }
141- } finally {
142- internalReleaseResources ();
143- }
144- }
121+ cancelledInternal ();
145122 }
146123
147124 });
148125 }
149126
150127 @ Override
151128 public void cancel () {
152- if (completed .compareAndSet (false , true )) {
153- try {
154- if (resultCallback != null ) {
155- resultCallback .cancelled ();
156- }
157- } finally {
158- internalReleaseResources ();
159- }
160- }
129+ cancelledInternal ();
161130 }
162131
163132 @ Override
@@ -178,28 +147,77 @@ public void streamEnd(final List<? extends Header> trailers) throws HttpExceptio
178147 @ Override
179148 public void failed (final Exception cause ) {
180149 try {
181- requestProducer .failed (cause );
182- responseConsumer .failed (cause );
150+ if (inputTerminated .get ()) {
151+ responseConsumer .failed (cause );
152+ }
153+ if (!outputTerminated .get ()) {
154+ requestProducer .failed (cause );
155+ }
183156 } finally {
184- if (completed .compareAndSet (false , true )) {
185- try {
186- if (resultCallback != null ) {
187- resultCallback .failed (cause );
188- }
189- } finally {
190- internalReleaseResources ();
157+ failedInternal (cause );
158+ }
159+ }
160+
161+ private void completedInternal (final T result ) {
162+ if (completed .compareAndSet (false , true )) {
163+ try {
164+ if (resultCallback != null ) {
165+ resultCallback .completed (result );
191166 }
167+ } finally {
168+ releaseResourcesInternal ();
192169 }
193170 }
194171 }
195172
196- private void internalReleaseResources () {
197- requestProducer .releaseResources ();
198- responseConsumer .releaseResources ();
173+ private void failedInternal (final Exception ex ) {
174+ if (completed .compareAndSet (false , true )) {
175+ try {
176+ if (resultCallback != null ) {
177+ resultCallback .failed (ex );
178+ }
179+ } finally {
180+ releaseResourcesInternal ();
181+ }
182+ }
183+ }
184+
185+ private void cancelledInternal () {
186+ if (completed .compareAndSet (false , true )) {
187+ try {
188+ if (resultCallback != null ) {
189+ resultCallback .cancelled ();
190+ }
191+ } finally {
192+ releaseResourcesInternal ();
193+ }
194+ }
195+ }
196+
197+ private void releaseResponseConsumer () {
198+ if (inputTerminated .compareAndSet (false , true )) {
199+ responseConsumer .releaseResources ();
200+ }
201+ }
202+
203+ private void releaseRequestProducer () {
204+ if (outputTerminated .compareAndSet (false , true )) {
205+ requestProducer .releaseResources ();
206+ }
207+ }
208+
209+ private void releaseResourcesInternal () {
210+ releaseRequestProducer ();
211+ releaseResponseConsumer ();
199212 }
200213
201214 @ Override
202215 public void releaseResources () {
216+ // Note even though the message exchange has been fully
217+ // completed on the transport level, the response
218+ // consumer may still be busy consuming and digesting
219+ // the response message
220+ releaseRequestProducer ();
203221 }
204222
205223}
0 commit comments