1818
1919import io .netty .buffer .ByteBuf ;
2020import io .rsocket .internal .jctools .queues .MpscUnboundedArrayQueue ;
21+ import java .lang .reflect .Method ;
2122import java .util .Objects ;
2223import java .util .Queue ;
2324import java .util .concurrent .CancellationException ;
@@ -97,6 +98,8 @@ public final class UnboundedProcessor extends Flux<ByteBuf>
9798
9899 boolean outputFused ;
99100
101+ @ Nullable private static final Method THREAD_ID_METHOD = resolveThreadIdMethod ();
102+
100103 public UnboundedProcessor () {
101104 this (() -> {});
102105 }
@@ -141,7 +144,7 @@ public boolean tryEmitPrioritized(ByteBuf t) {
141144 }
142145
143146 if (!this .priorityQueue .offer (t )) {
144- onError (Operators .onOperatorError (null , Exceptions .failWithOverflow (), t , currentContext ()));
147+ tryEmitError (Operators .onOperatorError (null , Exceptions .failWithOverflow (), t , currentContext ()));
145148 release (t );
146149 return false ;
147150 }
@@ -177,7 +180,7 @@ public boolean tryEmitNormal(ByteBuf t) {
177180 }
178181
179182 if (!this .queue .offer (t )) {
180- onError (Operators .onOperatorError (null , Exceptions .failWithOverflow (), t , currentContext ()));
183+ tryEmitError (Operators .onOperatorError (null , Exceptions .failWithOverflow (), t , currentContext ()));
181184 release (t );
182185 return false ;
183186 }
@@ -207,126 +210,140 @@ public boolean tryEmitNormal(ByteBuf t) {
207210 return true ;
208211 }
209212
210- public boolean tryEmitFinal (ByteBuf t ) {
213+ public boolean tryEmitError (Throwable t ) {
214+ Objects .requireNonNull (t , "throwable must not be null" );
215+
211216 if (this .done || this .cancelled ) {
212- release ( t );
217+ Operators . onErrorDropped ( t , currentContext () );
213218 return false ;
214219 }
215220
216- this .last = t ;
221+ this .error = t ;
217222 this .done = true ;
218223
219- final long previousState = markValueAddedAndTerminated (this );
220- if (isFinalized (previousState )) {
221- this .clearSafely ();
224+ final long previousState = markTerminatedOrFinalized (this );
225+ if (isFinalized (previousState )
226+ || isDisposed (previousState )
227+ || isCancelled (previousState )
228+ || isTerminated (previousState )) {
229+ Operators .onErrorDropped (t , currentContext ());
222230 return false ;
223231 }
224232
225233 if (isSubscriberReady (previousState )) {
226234 if (this .outputFused ) {
227- // fast path for fusion
228- this .actual .onNext (null );
229- this .actual .onComplete ();
235+ // fast path for fusion scenario
236+ this .actual .onError (t );
230237 return true ;
231238 }
232239
233240 if (isWorkInProgress (previousState )) {
234241 return true ;
235242 }
236243
237- drainRegular ((previousState | FLAG_TERMINATED | FLAG_HAS_VALUE ) + 1 );
244+ if (!hasValue (previousState )) {
245+ // fast path no-values scenario
246+ this .actual .onError (t );
247+ return true ;
248+ }
249+
250+ drainRegular ((previousState | FLAG_TERMINATED ) + 1 );
238251 }
239252
240253 return true ;
241254 }
242255
243- @ Deprecated
244- public void onNextPrioritized (ByteBuf t ) {
245- tryEmitPrioritized (t );
246- }
247-
248- @ Override
249- @ Deprecated
250- public void onNext (ByteBuf t ) {
251- tryEmitNormal (t );
252- }
253-
254- @ Override
255- @ Deprecated
256- public void onError (Throwable t ) {
256+ public boolean tryEmitComplete () {
257257 if (this .done || this .cancelled ) {
258- Operators .onErrorDropped (t , currentContext ());
259- return ;
258+ return false ;
260259 }
261260
262- this .error = t ;
263261 this .done = true ;
264262
265263 final long previousState = markTerminatedOrFinalized (this );
266264 if (isFinalized (previousState )
267265 || isDisposed (previousState )
268266 || isCancelled (previousState )
269267 || isTerminated (previousState )) {
270- Operators .onErrorDropped (t , currentContext ());
271- return ;
268+ return false ;
272269 }
273270
274271 if (isSubscriberReady (previousState )) {
275272 if (this .outputFused ) {
276273 // fast path for fusion scenario
277- this .actual .onError ( t );
278- return ;
274+ this .actual .onComplete ( );
275+ return true ;
279276 }
280277
281278 if (isWorkInProgress (previousState )) {
282- return ;
279+ return true ;
283280 }
284281
285282 if (!hasValue (previousState )) {
286- // fast path no-values scenario
287- this .actual .onError (t );
288- return ;
283+ this .actual .onComplete ();
284+ return true ;
289285 }
290286
291287 drainRegular ((previousState | FLAG_TERMINATED ) + 1 );
292288 }
289+
290+ return true ;
293291 }
294292
295- @ Override
296- @ Deprecated
297- public void onComplete () {
293+ public boolean tryEmitFinal (ByteBuf t ) {
298294 if (this .done || this .cancelled ) {
299- return ;
295+ release (t );
296+ return false ;
300297 }
301298
299+ this .last = t ;
302300 this .done = true ;
303301
304- final long previousState = markTerminatedOrFinalized (this );
305- if (isFinalized (previousState )
306- || isDisposed (previousState )
307- || isCancelled (previousState )
308- || isTerminated (previousState )) {
309- return ;
302+ final long previousState = markValueAddedAndTerminated (this );
303+ if (isFinalized (previousState )) {
304+ this .clearSafely ();
305+ return false ;
310306 }
311307
312308 if (isSubscriberReady (previousState )) {
313309 if (this .outputFused ) {
314- // fast path for fusion scenario
310+ // fast path for fusion
311+ this .actual .onNext (null );
315312 this .actual .onComplete ();
316- return ;
313+ return true ;
317314 }
318315
319316 if (isWorkInProgress (previousState )) {
320- return ;
321- }
322-
323- if (!hasValue (previousState )) {
324- this .actual .onComplete ();
325- return ;
317+ return true ;
326318 }
327319
328- drainRegular ((previousState | FLAG_TERMINATED ) + 1 );
320+ drainRegular ((previousState | FLAG_TERMINATED | FLAG_HAS_VALUE ) + 1 );
329321 }
322+
323+ return true ;
324+ }
325+
326+ @ Deprecated
327+ public void onNextPrioritized (ByteBuf t ) {
328+ tryEmitPrioritized (t );
329+ }
330+
331+ @ Override
332+ @ Deprecated
333+ public void onNext (ByteBuf t ) {
334+ tryEmitNormal (t );
335+ }
336+
337+ @ Override
338+ @ Deprecated
339+ public void onError (Throwable t ) {
340+ tryEmitError (t );
341+ }
342+
343+ @ Override
344+ @ Deprecated
345+ public void onComplete () {
346+ tryEmitComplete ();
330347 }
331348
332349 void drainRegular (long expectedState ) {
@@ -1091,7 +1108,7 @@ static void log(
10911108 instance ,
10921109 action ,
10931110 action ,
1094- Thread . currentThread (). getId (),
1111+ currentThreadId (),
10951112 formatState (initialState , 64 ),
10961113 formatState (committedState , 64 )),
10971114 new RuntimeException ());
@@ -1101,7 +1118,7 @@ static void log(
11011118 "[%s][%s][%s][%s-%s]" ,
11021119 instance ,
11031120 action ,
1104- Thread . currentThread (). getId (),
1121+ currentThreadId (),
11051122 formatState (initialState , 64 ),
11061123 formatState (committedState , 64 )));
11071124 }
@@ -1130,7 +1147,7 @@ static void log(
11301147 instance ,
11311148 action ,
11321149 action ,
1133- Thread . currentThread (). getId (),
1150+ currentThreadId (),
11341151 formatState (initialState , 32 ),
11351152 formatState (committedState , 32 )),
11361153 new RuntimeException ());
@@ -1140,12 +1157,39 @@ static void log(
11401157 "[%s][%s][%s][%s-%s]" ,
11411158 instance ,
11421159 action ,
1143- Thread . currentThread (). getId (),
1160+ currentThreadId (),
11441161 formatState (initialState , 32 ),
11451162 formatState (committedState , 32 )));
11461163 }
11471164 }
11481165
1166+ private static long currentThreadId () {
1167+ Thread currentThread = Thread .currentThread ();
1168+ if (THREAD_ID_METHOD != null ) {
1169+ try {
1170+ return (long ) THREAD_ID_METHOD .invoke (currentThread );
1171+ } catch (Throwable ignore ) {
1172+ // Fallback to the legacy method below for older runtimes or reflective failures.
1173+ }
1174+ }
1175+
1176+ return legacyThreadId (currentThread );
1177+ }
1178+
1179+ @ Nullable
1180+ private static Method resolveThreadIdMethod () {
1181+ try {
1182+ return Thread .class .getMethod ("threadId" );
1183+ } catch (NoSuchMethodException e ) {
1184+ return null ;
1185+ }
1186+ }
1187+
1188+ @ SuppressWarnings ("deprecation" )
1189+ private static long legacyThreadId (Thread thread ) {
1190+ return thread .getId ();
1191+ }
1192+
11491193 static String formatState (long state , int size ) {
11501194 final String defaultFormat = Long .toBinaryString (state );
11511195 final StringBuilder formatted = new StringBuilder ();
0 commit comments