3030import java .nio .ByteBuffer ;
3131import java .util .List ;
3232import java .util .concurrent .atomic .AtomicBoolean ;
33+ import java .util .concurrent .atomic .AtomicReference ;
3334
3435import org .apache .hc .core5 .http .EntityDetails ;
3536import org .apache .hc .core5 .http .Header ;
@@ -65,13 +66,12 @@ class ClientH2StreamHandler implements H2StreamHandler {
6566 private final AsyncClientExchangeHandler exchangeHandler ;
6667 private final HandlerFactory <AsyncPushConsumer > pushHandlerFactory ;
6768 private final HttpCoreContext context ;
69+ private final AtomicReference <MessageState > requestState ;
70+ private final AtomicReference <MessageState > responseState ;
6871 private final AtomicBoolean requestCommitted ;
6972 private final AtomicBoolean failed ;
7073 private final AtomicBoolean done ;
7174
72- private volatile MessageState requestState ;
73- private volatile MessageState responseState ;
74-
7575 ClientH2StreamHandler (
7676 final H2StreamChannel outputChannel ,
7777 final HttpProcessor httpProcessor ,
@@ -95,13 +95,13 @@ public int write(final ByteBuffer src) throws IOException {
9595 @ Override
9696 public void endStream (final List <? extends Header > trailers ) throws IOException {
9797 outputChannel .endStream (trailers );
98- requestState = MessageState .COMPLETE ;
98+ requestState . set ( MessageState .COMPLETE ) ;
9999 }
100100
101101 @ Override
102102 public void endStream () throws IOException {
103103 outputChannel .endStream ();
104- requestState = MessageState .COMPLETE ;
104+ requestState . set ( MessageState .COMPLETE ) ;
105105 }
106106
107107 };
@@ -113,8 +113,8 @@ public void endStream() throws IOException {
113113 this .requestCommitted = new AtomicBoolean ();
114114 this .failed = new AtomicBoolean ();
115115 this .done = new AtomicBoolean ();
116- this .requestState = MessageState .HEADERS ;
117- this .responseState = MessageState .HEADERS ;
116+ this .requestState = new AtomicReference <>( MessageState .HEADERS ) ;
117+ this .responseState = new AtomicReference <>( MessageState .HEADERS ) ;
118118 }
119119
120120 @ Override
@@ -124,7 +124,7 @@ public HandlerFactory<AsyncPushConsumer> getPushHandlerFactory() {
124124
125125 @ Override
126126 public boolean isOutputReady () {
127- switch (requestState ) {
127+ switch (requestState . get () ) {
128128 case HEADERS :
129129 return true ;
130130 case BODY :
@@ -146,14 +146,14 @@ private void commitRequest(final HttpRequest request, final EntityDetails entity
146146 connMetrics .incrementRequestCount ();
147147
148148 if (entityDetails == null ) {
149- requestState = MessageState .COMPLETE ;
149+ requestState . set ( MessageState .COMPLETE ) ;
150150 } else {
151151 final Header h = request .getFirstHeader (HttpHeaders .EXPECT );
152152 final boolean expectContinue = h != null && HeaderElements .CONTINUE .equalsIgnoreCase (h .getValue ());
153153 if (expectContinue ) {
154- requestState = MessageState .ACK ;
154+ requestState . set ( MessageState .ACK ) ;
155155 } else {
156- requestState = MessageState .BODY ;
156+ requestState . set ( MessageState .BODY ) ;
157157 exchangeHandler .produce (dataChannel );
158158 }
159159 }
@@ -164,7 +164,7 @@ private void commitRequest(final HttpRequest request, final EntityDetails entity
164164
165165 @ Override
166166 public void produceOutput () throws HttpException , IOException {
167- switch (requestState ) {
167+ switch (requestState . get () ) {
168168 case HEADERS :
169169 exchangeHandler .produceRequest ((request , entityDetails , httpContext ) -> commitRequest (request , entityDetails ), context );
170170 break ;
@@ -184,7 +184,7 @@ public void consumeHeader(final List<Header> headers, final boolean endStream) t
184184 if (done .get ()) {
185185 throw new ProtocolException ("Unexpected message headers" );
186186 }
187- switch (responseState ) {
187+ switch (responseState . get () ) {
188188 case HEADERS :
189189 final HttpResponse response = DefaultH2ResponseConverter .INSTANCE .convert (headers );
190190 final int status = response .getCode ();
@@ -194,9 +194,9 @@ public void consumeHeader(final List<Header> headers, final boolean endStream) t
194194 if (status > HttpStatus .SC_CONTINUE && status < HttpStatus .SC_SUCCESS ) {
195195 exchangeHandler .consumeInformation (response , context );
196196 }
197- if (requestState == MessageState .ACK ) {
197+ if (requestState . get () == MessageState .ACK ) {
198198 if (status == HttpStatus .SC_CONTINUE || status >= HttpStatus .SC_SUCCESS ) {
199- requestState = MessageState .BODY ;
199+ requestState . set ( MessageState .BODY ) ;
200200 exchangeHandler .produce (dataChannel );
201201 }
202202 }
@@ -210,10 +210,10 @@ public void consumeHeader(final List<Header> headers, final boolean endStream) t
210210 connMetrics .incrementResponseCount ();
211211
212212 exchangeHandler .consumeResponse (response , entityDetails , context );
213- responseState = endStream ? MessageState .COMPLETE : MessageState .BODY ;
213+ responseState . set ( endStream ? MessageState .COMPLETE : MessageState .BODY ) ;
214214 break ;
215215 case BODY :
216- responseState = MessageState .COMPLETE ;
216+ responseState . set ( MessageState .COMPLETE ) ;
217217 exchangeHandler .streamEnd (headers );
218218 break ;
219219 default :
@@ -228,14 +228,14 @@ public void updateInputCapacity() throws IOException {
228228
229229 @ Override
230230 public void consumeData (final ByteBuffer src , final boolean endStream ) throws HttpException , IOException {
231- if (done .get () || responseState != MessageState .BODY ) {
231+ if (done .get () || responseState . get () != MessageState .BODY ) {
232232 throw new ProtocolException ("Unexpected message data" );
233233 }
234234 if (src != null ) {
235235 exchangeHandler .consume (src );
236236 }
237237 if (endStream ) {
238- responseState = MessageState .COMPLETE ;
238+ responseState . set ( MessageState .COMPLETE ) ;
239239 exchangeHandler .streamEnd (null );
240240 }
241241 }
@@ -261,17 +261,17 @@ public void failed(final Exception cause) {
261261 @ Override
262262 public void releaseResources () {
263263 if (done .compareAndSet (false , true )) {
264- responseState = MessageState .COMPLETE ;
265- requestState = MessageState .COMPLETE ;
264+ responseState . set ( MessageState .COMPLETE ) ;
265+ requestState . set ( MessageState .COMPLETE ) ;
266266 exchangeHandler .releaseResources ();
267267 }
268268 }
269269
270270 @ Override
271271 public String toString () {
272272 return "[" +
273- "requestState=" + requestState +
274- ", responseState=" + responseState +
273+ "requestState=" + requestState . get () +
274+ ", responseState=" + responseState . get () +
275275 ']' ;
276276 }
277277
0 commit comments