2121import java .util .List ;
2222import java .util .Map ;
2323
24+ import java .util .Collection ;
2425import java .util .concurrent .BlockingQueue ;
2526import java .util .concurrent .ConcurrentHashMap ;
2627import java .util .concurrent .LinkedBlockingQueue ;
3132import org .ros2 .rcljava .RCLJava ;
3233import org .ros2 .rcljava .client .Client ;
3334import org .ros2 .rcljava .common .JNIUtils ;
35+ import org .ros2 .rcljava .events .EventHandler ;
3436import org .ros2 .rcljava .executors .AnyExecutable ;
3537import org .ros2 .rcljava .executors .Executor ;
3638import org .ros2 .rcljava .interfaces .MessageDefinition ;
3739import org .ros2 .rcljava .interfaces .ServiceDefinition ;
3840import org .ros2 .rcljava .node .ComposableNode ;
41+ import org .ros2 .rcljava .publisher .Publisher ;
3942import org .ros2 .rcljava .service .RMWRequestId ;
4043import org .ros2 .rcljava .service .Service ;
4144import org .ros2 .rcljava .subscription .Subscription ;
@@ -64,6 +67,8 @@ public class BaseExecutor {
6467
6568 private List <Map .Entry <Long , Client >> clientHandles = new ArrayList <Map .Entry <Long , Client >>();
6669
70+ private List <Map .Entry <Long , EventHandler >> eventHandles = new ArrayList <Map .Entry <Long , EventHandler >>();
71+
6772 protected void addNode (ComposableNode node ) {
6873 this .nodes .add (node );
6974 }
@@ -158,20 +163,34 @@ protected void executeAnyExecutable(AnyExecutable anyExecutable) {
158163 }
159164 clientHandles .remove (anyExecutable .client .getHandle ());
160165 }
166+
167+ if (anyExecutable .eventHandler != null ) {
168+ anyExecutable .eventHandler .executeCallback ();
169+ eventHandles .remove (anyExecutable .eventHandler .getHandle ());
170+ }
161171 }
162172
163173 protected void waitForWork (long timeout ) {
164174 this .subscriptionHandles .clear ();
165175 this .timerHandles .clear ();
166176 this .serviceHandles .clear ();
167177 this .clientHandles .clear ();
178+ this .eventHandles .clear ();
168179
169180 for (ComposableNode node : this .nodes ) {
170181 for (Subscription <MessageDefinition > subscription : node .getNode ().getSubscriptions ()) {
171182 this .subscriptionHandles .add (new AbstractMap .SimpleEntry <Long , Subscription >(
172183 subscription .getHandle (), subscription ));
173184 }
174185
186+ for (Publisher publisher : node .getNode ().getPublishers ()) {
187+ Collection <EventHandler > eventHandlers = publisher .getEventHandlers ();
188+ for (EventHandler eventHandler : eventHandlers ) {
189+ this .eventHandles .add (new AbstractMap .SimpleEntry <Long , EventHandler >(
190+ eventHandler .getHandle (), eventHandler ));
191+ }
192+ }
193+
175194 for (Timer timer : node .getNode ().getTimers ()) {
176195 this .timerHandles .add (new AbstractMap .SimpleEntry <Long , Timer >(timer .getHandle (), timer ));
177196 }
@@ -191,6 +210,7 @@ protected void waitForWork(long timeout) {
191210 int timersSize = 0 ;
192211 int clientsSize = 0 ;
193212 int servicesSize = 0 ;
213+ int eventsSize = this .eventHandles .size ();
194214
195215 for (ComposableNode node : this .nodes ) {
196216 subscriptionsSize += node .getNode ().getSubscriptions ().size ();
@@ -205,7 +225,9 @@ protected void waitForWork(long timeout) {
205225
206226 long waitSetHandle = nativeGetZeroInitializedWaitSet ();
207227 long contextHandle = RCLJava .getDefaultContext ().getHandle ();
208- nativeWaitSetInit (waitSetHandle , contextHandle , subscriptionsSize , 0 , timersSize , clientsSize , servicesSize , 0 );
228+ nativeWaitSetInit (
229+ waitSetHandle , contextHandle , subscriptionsSize , 0 ,
230+ timersSize , clientsSize , servicesSize , eventsSize );
209231
210232 nativeWaitSetClear (waitSetHandle );
211233
@@ -225,6 +247,10 @@ protected void waitForWork(long timeout) {
225247 nativeWaitSetAddClient (waitSetHandle , entry .getKey ());
226248 }
227249
250+ for (Map .Entry <Long , EventHandler > entry : this .eventHandles ) {
251+ nativeWaitSetAddEvent (waitSetHandle , entry .getKey ());
252+ }
253+
228254 nativeWait (waitSetHandle , timeout );
229255
230256 for (int i = 0 ; i < this .subscriptionHandles .size (); ++i ) {
@@ -251,6 +277,12 @@ protected void waitForWork(long timeout) {
251277 }
252278 }
253279
280+ for (int i = 0 ; i < this .eventHandles .size (); ++i ) {
281+ if (!nativeWaitSetEventIsReady (waitSetHandle , i )) {
282+ this .eventHandles .get (i ).setValue (null );
283+ }
284+ }
285+
254286 Iterator <Map .Entry <Long , Subscription >> subscriptionIterator =
255287 this .subscriptionHandles .iterator ();
256288 while (subscriptionIterator .hasNext ()) {
@@ -284,6 +316,14 @@ protected void waitForWork(long timeout) {
284316 }
285317 }
286318
319+ Iterator <Map .Entry <Long , EventHandler >> eventIterator = this .eventHandles .iterator ();
320+ while (eventIterator .hasNext ()) {
321+ Map .Entry <Long , EventHandler > entry = eventIterator .next ();
322+ if (entry .getValue () == null ) {
323+ eventIterator .remove ();
324+ }
325+ }
326+
287327 nativeDisposeWaitSet (waitSetHandle );
288328 }
289329
@@ -325,6 +365,14 @@ protected AnyExecutable getNextExecutable() {
325365 }
326366 }
327367
368+ for (Map .Entry <Long , EventHandler > entry : this .eventHandles ) {
369+ if (entry .getValue () != null ) {
370+ anyExecutable .eventHandler = entry .getValue ();
371+ entry .setValue (null );
372+ return anyExecutable ;
373+ }
374+ }
375+
328376 return null ;
329377 }
330378
@@ -405,6 +453,8 @@ private static native MessageDefinition nativeTake(
405453
406454 private static native void nativeWaitSetAddTimer (long waitSetHandle , long timerHandle );
407455
456+ private static native void nativeWaitSetAddEvent (long waitSetHandle , long eventHandle );
457+
408458 private static native RMWRequestId nativeTakeRequest (long serviceHandle ,
409459 long requestFromJavaConverterHandle , long requestToJavaConverterHandle ,
410460 long requestDestructorHandle , MessageDefinition requestMessage );
@@ -421,6 +471,8 @@ private static native RMWRequestId nativeTakeResponse(long clientHandle,
421471
422472 private static native boolean nativeWaitSetTimerIsReady (long waitSetHandle , long index );
423473
474+ private static native boolean nativeWaitSetEventIsReady (long waitSetHandle , long index );
475+
424476 private static native boolean nativeWaitSetServiceIsReady (long waitSetHandle , long index );
425477
426478 private static native boolean nativeWaitSetClientIsReady (long waitSetHandle , long index );
0 commit comments