Skip to content

Commit f2eeac7

Browse files
authored
Refactor future implementation (#80)
Signed-off-by: Ivan Santiago Paunovic <ivanpauno@ekumenlabs.com>
1 parent 09c0e68 commit f2eeac7

File tree

16 files changed

+172
-166
lines changed

16 files changed

+172
-166
lines changed

rcljava/src/main/java/org/ros2/rcljava/RCLJava.java

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import java.util.Collection;
2323
import java.util.Map;
2424
import java.util.concurrent.ConcurrentSkipListMap;
25+
import java.util.concurrent.Future;
2526
import java.util.concurrent.LinkedBlockingQueue;
2627

2728
import org.ros2.rcljava.client.Client;
@@ -360,6 +361,34 @@ public static void spinSome(final ComposableNode composableNode) {
360361
getGlobalExecutor().removeNode(composableNode);
361362
}
362363

364+
public static void spinUntilComplete(final Node node, final Future future, long timeoutNs) {
365+
ComposableNode composableNode = new ComposableNode() {
366+
public Node getNode() {
367+
return node;
368+
}
369+
};
370+
RCLJava.spinUntilComplete(composableNode, future, timeoutNs);
371+
}
372+
373+
public static void spinUntilComplete(final Node node, final Future future)
374+
{
375+
RCLJava.spinUntilComplete(node, future, -1);
376+
}
377+
378+
public static void spinUntilComplete(
379+
final ComposableNode node, final Future future, long timeoutNs)
380+
{
381+
getGlobalExecutor().addNode(node);
382+
getGlobalExecutor().spinUntilComplete(future, timeoutNs);
383+
getGlobalExecutor().removeNode(node);
384+
}
385+
386+
public static void spinUntilComplete(
387+
final ComposableNode node, final Future future)
388+
{
389+
RCLJava.spinUntilComplete(node, future, -1);
390+
}
391+
363392
public static synchronized void shutdown() {
364393
cleanup();
365394
if (RCLJava.defaultContext != null) {

rcljava/src/main/java/org/ros2/rcljava/client/ClientImpl.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ public void accept(Future<V> input) {}
8787
long sequenceNumber = nativeSendClientRequest(
8888
handle, request.getFromJavaConverterInstance(),
8989
request.getDestructorInstance(), request);
90-
RCLFuture<V> future = new RCLFuture<V>(this.nodeReference);
90+
RCLFuture<V> future = new RCLFuture<V>();
9191

9292
Map.Entry<Consumer, RCLFuture> entry =
9393
new AbstractMap.SimpleEntry<Consumer, RCLFuture>(callback, future);

rcljava/src/main/java/org/ros2/rcljava/concurrent/RCLFuture.java

Lines changed: 12 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -15,80 +15,58 @@
1515

1616
package org.ros2.rcljava.concurrent;
1717

18+
import java.lang.Deprecated;
1819
import java.lang.ref.WeakReference;
1920
import java.util.concurrent.ExecutionException;
2021
import java.util.concurrent.Future;
2122
import java.util.concurrent.TimeUnit;
2223
import java.util.concurrent.TimeoutException;
2324

2425
import org.ros2.rcljava.RCLJava;
25-
import org.ros2.rcljava.executors.Executor;
2626
import org.ros2.rcljava.node.Node;
2727

2828
public class RCLFuture<V> implements Future<V> {
2929
private WeakReference<Node> nodeReference;
3030
private boolean done = false;
3131
private V value = null;
32-
private Executor executor = null;
3332

34-
public RCLFuture(final WeakReference<Node> nodeReference) {
35-
this.nodeReference = nodeReference;
36-
}
37-
38-
public RCLFuture(final Executor executor) {
39-
this.executor = executor;
40-
}
33+
public RCLFuture() {}
4134

42-
public final V get() throws InterruptedException, ExecutionException {
35+
public final synchronized V get() throws InterruptedException, ExecutionException {
4336
if(this.value != null) {
4437
return this.value;
4538
}
4639
while (RCLJava.ok() && !isDone()) {
47-
if (executor != null) {
48-
executor.spinOnce();
49-
} else {
50-
Node node = nodeReference.get();
51-
if (node == null) {
52-
return null; // TODO(esteve) do something
53-
}
54-
RCLJava.spinOnce(node);
55-
}
40+
this.wait();
5641
}
5742
return this.value;
5843
}
5944

60-
public final V get(final long timeout, final TimeUnit unit)
45+
public final synchronized V get(final long timeout, final TimeUnit unit)
6146
throws InterruptedException, ExecutionException, TimeoutException {
6247
if (isDone()) {
6348
return value;
6449
}
6550

66-
long endTime = TimeUnit.NANOSECONDS.convert(System.currentTimeMillis(), TimeUnit.MILLISECONDS);
67-
68-
long timeoutNS = TimeUnit.NANOSECONDS.convert(timeout, unit);
51+
long endTime = System.nanoTime();
52+
long timeoutNS = unit.toNanos(timeout);
6953

7054
if (timeoutNS > 0) {
7155
endTime += timeoutNS;
7256
}
7357

7458
while (RCLJava.ok()) {
75-
if (executor != null) {
76-
executor.spinOnce(timeoutNS);
77-
} else {
78-
Node node = nodeReference.get();
79-
if (node == null) {
80-
return null; // TODO(esteve) do something
81-
}
82-
RCLJava.spinOnce(node, timeoutNS);
83-
}
59+
this.wait(TimeUnit.NANOSECONDS.toMillis(timeoutNS), (int) (timeoutNS % 1000000l));
8460

8561
if (isDone()) {
8662
return value;
8763
}
8864

89-
long now = TimeUnit.NANOSECONDS.convert(System.currentTimeMillis(), TimeUnit.MILLISECONDS);
65+
long now = System.nanoTime();
9066
if (now >= endTime) {
9167
throw new TimeoutException();
68+
} else {
69+
timeoutNS = endTime - now;
9270
}
9371
}
9472
throw new InterruptedException();
@@ -109,5 +87,6 @@ public final boolean cancel(final boolean mayInterruptIfRunning) {
10987
public final synchronized void set(final V value) {
11088
this.value = value;
11189
done = true;
90+
this.notify();
11291
}
11392
}

rcljava/src/main/java/org/ros2/rcljava/executors/BaseExecutor.java

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515

1616
package org.ros2.rcljava.executors;
1717

18+
import java.lang.Math;
1819
import java.lang.SuppressWarnings;
1920
import java.util.AbstractMap;
2021
import java.util.ArrayList;
@@ -24,6 +25,7 @@
2425
import java.util.Collection;
2526
import java.util.concurrent.BlockingQueue;
2627
import java.util.concurrent.ConcurrentHashMap;
28+
import java.util.concurrent.Future;
2729
import java.util.concurrent.LinkedBlockingQueue;
2830

2931

@@ -439,6 +441,30 @@ private boolean maxDurationNotElapsed(long maxDurationNs, long startNs) {
439441
return false;
440442
}
441443

444+
public void spinUntilComplete(Future future, long maxDurationNs) {
445+
long startNs = System.nanoTime();
446+
// only use a blocking call to waitForWork when maxDurationNs < 0
447+
long waitTimeout = -1;
448+
if (maxDurationNs > 0) {
449+
// We cannot be waiting for work forever, if not we're not going to respect the passed timeout.
450+
// We can neither do a non-blocking call to waitForWork(), because if the future has not yet
451+
// been completed it will result in a busy loop.
452+
// Use an arbitrary timeout to relax cpu usage.
453+
waitTimeout = Math.min(maxDurationNs / 10, 10000000 /* 1ms*/);
454+
}
455+
while (RCLJava.ok() && (maxDurationNs < 0 || maxDurationNotElapsed(maxDurationNs, startNs))) {
456+
waitForWork(waitTimeout);
457+
AnyExecutable anyExecutable = getNextExecutable();
458+
while (anyExecutable != null) {
459+
executeAnyExecutable(anyExecutable);
460+
if (future.isDone()) {
461+
return;
462+
}
463+
anyExecutable = getNextExecutable();
464+
}
465+
}
466+
}
467+
442468
private void spinSomeImpl(long maxDurationNs, boolean exhaustive) {
443469
long startNs = System.nanoTime();
444470
boolean workAvailable = false;

rcljava/src/main/java/org/ros2/rcljava/executors/Executor.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515

1616
package org.ros2.rcljava.executors;
1717

18+
import java.util.concurrent.Future;
19+
1820
import org.ros2.rcljava.node.ComposableNode;
1921

2022
public interface Executor {
@@ -26,6 +28,10 @@ public interface Executor {
2628

2729
public void spinOnce(long timeout);
2830

31+
public void spinUntilComplete(Future future, long maxDurationNs);
32+
33+
public void spinUntilComplete(Future future);
34+
2935
public void spinSome();
3036

3137
public void spinSome(long maxDurationNs);

rcljava/src/main/java/org/ros2/rcljava/executors/MultiThreadedExecutor.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
import java.util.concurrent.Executors;
1919
import java.util.concurrent.ExecutorService;
20+
import java.util.concurrent.Future;
2021

2122
import org.ros2.rcljava.RCLJava;
2223
import org.ros2.rcljava.node.ComposableNode;
@@ -55,6 +56,14 @@ public void spinOnce(long timeout) {
5556
this.baseExecutor.spinOnce(timeout);
5657
}
5758

59+
public void spinUntilComplete(Future future, long timeoutNs) {
60+
this.baseExecutor.spinUntilComplete(future, timeoutNs);
61+
}
62+
63+
public void spinUntilComplete(Future future) {
64+
this.baseExecutor.spinUntilComplete(future, -1);
65+
}
66+
5867
public void spinSome() {
5968
this.spinSome(0);
6069
}

rcljava/src/main/java/org/ros2/rcljava/executors/SingleThreadedExecutor.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515

1616
package org.ros2.rcljava.executors;
1717

18+
import java.util.concurrent.Future;
19+
1820
import org.ros2.rcljava.RCLJava;
1921
import org.ros2.rcljava.node.ComposableNode;
2022
import org.ros2.rcljava.executors.BaseExecutor;
@@ -38,6 +40,14 @@ public void spinOnce(long timeout) {
3840
this.baseExecutor.spinOnce(timeout);
3941
}
4042

43+
public void spinUntilComplete(Future future, long timeoutNs) {
44+
this.baseExecutor.spinUntilComplete(future, timeoutNs);
45+
}
46+
47+
public void spinUntilComplete(Future future) {
48+
this.baseExecutor.spinUntilComplete(future, -1);
49+
}
50+
4151
public void spinSome() {
4252
this.spinSome(0);
4353
}

rcljava/src/main/java/org/ros2/rcljava/parameters/client/AsyncParametersClient.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import java.util.concurrent.Future;
2020

2121
import org.ros2.rcljava.consumers.Consumer;
22+
import org.ros2.rcljava.node.Node;
2223
import org.ros2.rcljava.parameters.ParameterType;
2324
import org.ros2.rcljava.parameters.ParameterVariant;
2425

@@ -59,4 +60,6 @@ public Future<List<rcl_interfaces.msg.ParameterDescriptor>> describeParameters(
5960
public Future<List<rcl_interfaces.msg.ParameterDescriptor>> describeParameters(
6061
final List<String> names,
6162
final Consumer<Future<List<rcl_interfaces.msg.ParameterDescriptor>>> callback);
63+
64+
public Node getNode();
6265
}

rcljava/src/main/java/org/ros2/rcljava/parameters/client/AsyncParametersClientImpl.java

Lines changed: 10 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -104,8 +104,7 @@ public Future<List<ParameterVariant>> getParameters(final List<String> names) {
104104

105105
public Future<List<ParameterVariant>> getParameters(
106106
final List<String> names, final Consumer<Future<List<ParameterVariant>>> callback) {
107-
final RCLFuture<List<ParameterVariant>> futureResult =
108-
new RCLFuture<List<ParameterVariant>>(new WeakReference<Node>(this.node));
107+
final RCLFuture<List<ParameterVariant>> futureResult = new RCLFuture<List<ParameterVariant>>();
109108
final rcl_interfaces.srv.GetParameters_Request request =
110109
new rcl_interfaces.srv.GetParameters_Request();
111110
request.setNames(names);
@@ -141,8 +140,7 @@ public Future<List<ParameterType>> getParameterTypes(final List<String> names) {
141140

142141
public Future<List<ParameterType>> getParameterTypes(
143142
final List<String> names, final Consumer<Future<List<ParameterType>>> callback) {
144-
final RCLFuture<List<ParameterType>> futureResult =
145-
new RCLFuture<List<ParameterType>>(new WeakReference<Node>(this.node));
143+
final RCLFuture<List<ParameterType>> futureResult = new RCLFuture<List<ParameterType>>();
146144
final rcl_interfaces.srv.GetParameterTypes_Request request =
147145
new rcl_interfaces.srv.GetParameterTypes_Request();
148146
request.setNames(names);
@@ -178,8 +176,7 @@ public Future<List<rcl_interfaces.msg.SetParametersResult>> setParameters(
178176
final List<ParameterVariant> parameters,
179177
final Consumer<Future<List<rcl_interfaces.msg.SetParametersResult>>> callback) {
180178
final RCLFuture<List<rcl_interfaces.msg.SetParametersResult>> futureResult =
181-
new RCLFuture<List<rcl_interfaces.msg.SetParametersResult>>(
182-
new WeakReference<Node>(this.node));
179+
new RCLFuture<List<rcl_interfaces.msg.SetParametersResult>>();
183180
final rcl_interfaces.srv.SetParameters_Request request =
184181
new rcl_interfaces.srv.SetParameters_Request();
185182
List<rcl_interfaces.msg.Parameter> requestParameters =
@@ -188,7 +185,6 @@ public Future<List<rcl_interfaces.msg.SetParametersResult>> setParameters(
188185
requestParameters.add(parameterVariant.toParameter());
189186
}
190187
request.setParameters(requestParameters);
191-
192188
setParametersClient.asyncSendRequest(
193189
request, new Consumer<Future<rcl_interfaces.srv.SetParameters_Response>>() {
194190
public void accept(final Future<rcl_interfaces.srv.SetParameters_Response> future) {
@@ -216,7 +212,7 @@ public Future<rcl_interfaces.msg.SetParametersResult> setParametersAtomically(
216212
final List<ParameterVariant> parameters,
217213
final Consumer<Future<rcl_interfaces.msg.SetParametersResult>> callback) {
218214
final RCLFuture<rcl_interfaces.msg.SetParametersResult> futureResult =
219-
new RCLFuture<rcl_interfaces.msg.SetParametersResult>(new WeakReference<Node>(this.node));
215+
new RCLFuture<rcl_interfaces.msg.SetParametersResult>();
220216
final rcl_interfaces.srv.SetParametersAtomically_Request request =
221217
new rcl_interfaces.srv.SetParametersAtomically_Request();
222218
List<rcl_interfaces.msg.Parameter> requestParameters =
@@ -253,7 +249,7 @@ public Future<rcl_interfaces.msg.ListParametersResult> listParameters(
253249
public Future<rcl_interfaces.msg.ListParametersResult> listParameters(final List<String> prefixes,
254250
long depth, final Consumer<Future<rcl_interfaces.msg.ListParametersResult>> callback) {
255251
final RCLFuture<rcl_interfaces.msg.ListParametersResult> futureResult =
256-
new RCLFuture<rcl_interfaces.msg.ListParametersResult>(new WeakReference<Node>(this.node));
252+
new RCLFuture<rcl_interfaces.msg.ListParametersResult>();
257253
final rcl_interfaces.srv.ListParameters_Request request =
258254
new rcl_interfaces.srv.ListParameters_Request();
259255
request.setPrefixes(prefixes);
@@ -286,8 +282,7 @@ public Future<List<rcl_interfaces.msg.ParameterDescriptor>> describeParameters(
286282
final List<String> names,
287283
final Consumer<Future<List<rcl_interfaces.msg.ParameterDescriptor>>> callback) {
288284
final RCLFuture<List<rcl_interfaces.msg.ParameterDescriptor>> futureResult =
289-
new RCLFuture<List<rcl_interfaces.msg.ParameterDescriptor>>(
290-
new WeakReference<Node>(this.node));
285+
new RCLFuture<List<rcl_interfaces.msg.ParameterDescriptor>>();
291286
final rcl_interfaces.srv.DescribeParameters_Request request =
292287
new rcl_interfaces.srv.DescribeParameters_Request();
293288
request.setNames(names);
@@ -309,4 +304,8 @@ public void accept(final Future<rcl_interfaces.srv.DescribeParameters_Response>
309304
});
310305
return futureResult;
311306
}
307+
308+
public Node getNode() {
309+
return this.node;
310+
}
312311
}

0 commit comments

Comments
 (0)