Skip to content

Commit be96e83

Browse files
committed
Merge pull request #7 from rkuhn/wip-future-converters
add scala.concurrent.java8.FutureConverter
2 parents 0339ec1 + e781cf0 commit be96e83

File tree

6 files changed

+601
-1
lines changed

6 files changed

+601
-1
lines changed

.gitignore

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1 +1,7 @@
11
target
2+
.cache
3+
.classpath
4+
.project
5+
.settings/
6+
.target/
7+
bin/

build.sbt

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,8 @@ test in Test := {
3333
(test in Test).value
3434
}
3535

36+
testOptions += Tests.Argument(TestFrameworks.JUnit, "-v", "-a")
37+
3638
sourceGenerators in Compile <+= sourceManaged in Compile map { dir =>
3739
def write(name: String, content: String) = {
3840
val f = dir / "java" / "scala" / "compat" / "java8" / s"${name}.java"
@@ -59,3 +61,24 @@ initialize := {
5961
if (Set("1.5", "1.6", "1.7") contains specVersion)
6062
sys.error("Java 8 or higher is required for this project.")
6163
}
64+
65+
lazy val JavaDoc = config("genjavadoc") extend Compile
66+
67+
inConfig(JavaDoc)(Defaults.configSettings) ++ Seq(
68+
packageDoc in Compile <<= packageDoc in JavaDoc,
69+
sources in JavaDoc <<= (target, compile in Compile, sources in Compile) map ((t, c, s) =>
70+
(t / "java" ** "*.java").get ++ s.filter(_.getName.endsWith(".java"))
71+
),
72+
javacOptions in JavaDoc := Seq(),
73+
artifactName in packageDoc in JavaDoc := ((sv, mod, art) => "" + mod.name + "_" + sv.binary + "-" + mod.revision + "-javadoc.jar"),
74+
libraryDependencies += compilerPlugin("com.typesafe.genjavadoc" %% "genjavadoc-plugin" % "0.5" cross CrossVersion.full),
75+
scalacOptions in Compile <+= target map (t => "-P:genjavadoc:out=" + (t / "java"))
76+
)
77+
78+
initialCommands :=
79+
"""|import scala.concurrent._
80+
|import ExecutionContext.Implicits.global
81+
|import java.util.concurrent.{CompletionStage,CompletableFuture}
82+
|import scala.concurrent.java8.FutureConverter._
83+
|""".stripMargin
84+
Lines changed: 211 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,211 @@
1+
package scala.concurrent.java8
2+
3+
import scala.concurrent.{ Future, Promise, ExecutionContext, ExecutionContextExecutorService, ExecutionContextExecutor, impl }
4+
import java.util.concurrent.{ CompletionStage, Executor, ExecutorService, CompletableFuture }
5+
import scala.util.{ Try, Success, Failure }
6+
import java.util.function.{ BiConsumer, Function JF, Consumer, BiFunction }
7+
8+
/**
9+
* This class contains static methods which convert between Java CompletionStage
10+
* and Scala Future. This is useful when mediating between Scala and Java
11+
* libraries with asynchronous APIs.
12+
*
13+
* Note that the bridge is implemented at the read-only side of asynchronous
14+
* handles, namely scala.concurrent.Future instead of scala.concurrent.Promise
15+
* and CompletionStage instead of CompletableFuture. This is intentional, as
16+
* the semantics of bridging the write-handles would be prone to race
17+
* conditions; if both ends (CompletableFuture and Promise) are completed
18+
* independently at the same time, they may contain different values afterwards.
19+
* For this reason, <code>toCompletableFuture()</code> is not supported on the
20+
* created CompletionStages.
21+
*
22+
* Example usage:
23+
*
24+
* {{{
25+
* import java.util.concurrent.CompletionStage;
26+
* import scala.concurrent.Future;
27+
* import static scala.concurrent.java8.FutureConverter.*;
28+
*
29+
* final CompletionStage<String> cs = ... // from an async Java API
30+
* final Future<String> f = toScala(cs);
31+
* ...
32+
* final Future<Integer> f2 = ... // from an async Scala API
33+
* final CompletionStage<Integer> cs2 = toJava(f2);
34+
* }}}
35+
*/
36+
object FutureConverter {
37+
38+
private class CF[T] extends CompletableFuture[T] with (Try[T] => Unit) {
39+
override def apply(t: Try[T]): Unit = t match {
40+
case Success(v) complete(v)
41+
case Failure(e) completeExceptionally(e)
42+
}
43+
44+
/*
45+
* Ensure that completions of this future cannot hold the Scala Future’s completer hostage.
46+
*/
47+
override def thenApply[U](fn: JF[_ >: T, _ <: U]): CompletableFuture[U] = thenApplyAsync(fn)
48+
override def thenAccept(fn: Consumer[_ >: T]): CompletableFuture[Void] = thenAcceptAsync(fn)
49+
override def thenRun(fn: Runnable): CompletableFuture[Void] = thenRunAsync(fn)
50+
override def thenCombine[U, V](cs: CompletionStage[_ <: U], fn: BiFunction[_ >: T, _ >: U, _ <: V]): CompletableFuture[V] = thenCombineAsync(cs, fn)
51+
override def thenAcceptBoth[U](cs: CompletionStage[_ <: U], fn: BiConsumer[_ >: T, _ >: U]): CompletableFuture[Void] = thenAcceptBothAsync(cs, fn)
52+
override def runAfterBoth(cs: CompletionStage[_], fn: Runnable): CompletableFuture[Void] = runAfterBothAsync(cs, fn)
53+
override def applyToEither[U](cs: CompletionStage[_ <: T], fn: JF[_ >: T, U]): CompletableFuture[U] = applyToEitherAsync(cs, fn)
54+
override def acceptEither(cs: CompletionStage[_ <: T], fn: Consumer[_ >: T]): CompletableFuture[Void] = acceptEitherAsync(cs, fn)
55+
override def runAfterEither(cs: CompletionStage[_], fn: Runnable): CompletableFuture[Void] = runAfterEitherAsync(cs, fn)
56+
override def thenCompose[U](fn: JF[_ >: T, _ <: CompletionStage[U]]): CompletableFuture[U] = thenComposeAsync(fn)
57+
override def whenComplete(fn: BiConsumer[_ >: T, _ >: Throwable]): CompletableFuture[T] = whenCompleteAsync(fn)
58+
override def handle[U](fn: BiFunction[_ >: T, Throwable, _ <: U]): CompletableFuture[U] = handleAsync(fn)
59+
override def exceptionally(fn: JF[Throwable, _ <: T]): CompletableFuture[T] = {
60+
val cf = new CompletableFuture[T]
61+
whenCompleteAsync(new BiConsumer[T, Throwable] {
62+
override def accept(t: T, e: Throwable): Unit = {
63+
if (e == null) cf.complete(t)
64+
else {
65+
val n: AnyRef =
66+
try {
67+
fn(e).asInstanceOf[AnyRef]
68+
} catch {
69+
case thr: Throwable cf.completeExceptionally(thr); this
70+
}
71+
if (n ne this) cf.complete(n.asInstanceOf[T])
72+
}
73+
}
74+
})
75+
cf
76+
}
77+
78+
override def toCompletableFuture(): CompletableFuture[T] =
79+
throw new UnsupportedOperationException("this CompletionStage represents a read-only Scala Future")
80+
81+
override def toString: String = super[CompletableFuture].toString
82+
}
83+
84+
/**
85+
* Returns a CompletionStage that will be completed with the same value or
86+
* exception as the given Scala Future when that completes. Since the Future is a read-only
87+
* representation, this CompletionStage does not support the
88+
* <code>toCompletableFuture</code> method. The semantics of Scala Future
89+
* demand that all callbacks are invoked asynchronously by default, therefore
90+
* the returned CompletionStage routes all calls to synchronous
91+
* transformations to their asynchronous counterparts, i.e.
92+
* <code>thenRun</code> will internally call <code>thenRunAsync</code>.
93+
*
94+
* @param f The Scala Future which may eventually supply the completion for
95+
* the returned CompletionStage
96+
* @return a CompletionStage that runs all callbacks asynchronously and does
97+
* not support the CompletableFuture interface
98+
*/
99+
def toJava[T](f: Future[T]): CompletionStage[T] = {
100+
val cf = new CF[T]
101+
implicit val ec = Future.InternalCallbackExecutor
102+
f onComplete cf
103+
cf
104+
}
105+
106+
private class P[T] extends impl.Promise.DefaultPromise[T] with BiConsumer[T, Throwable] {
107+
override def accept(v: T, e: Throwable): Unit = {
108+
if (e == null) complete(Success(v))
109+
else complete(Failure(e))
110+
}
111+
}
112+
113+
/**
114+
* Returns a Scala Future that will be completed with the same value or
115+
* exception as the given CompletionStage when that completes. Transformations
116+
* of the returned Future are executed asynchronously as specified by the
117+
* ExecutionContext that is given to the combinator methods.
118+
*
119+
* @param cs The CompletionStage which may eventually supply the completion
120+
* for the returned Scala Future
121+
* @return a Scala Future that represents the CompletionStage's completion
122+
*/
123+
def toScala[T](cs: CompletionStage[T]): Future[T] = {
124+
val p = new P[T]
125+
cs whenComplete p
126+
p.future
127+
}
128+
129+
/**
130+
* Creates an ExecutionContext from a given ExecutorService, using the given
131+
* Consumer for reporting errors. The latter can be created as in the
132+
* following example:
133+
*
134+
* {{{
135+
* final ExecutionContext ec = Converter.fromExecutorService(es, thr -> thr.printStackTrace());
136+
* }}}
137+
*
138+
* @param e an ExecutorService
139+
* @param reporter a Consumer for reporting errors during execution
140+
* @return an ExecutionContext backed by the given ExecutorService
141+
*/
142+
def fromExecutorService(e: ExecutorService, reporter: Consumer[Throwable]): ExecutionContextExecutorService =
143+
ExecutionContext.fromExecutorService(e, reporter.accept)
144+
145+
/**
146+
* Creates an ExecutionContext from a given ExecutorService, using the
147+
* default reporter for uncaught exceptions which will just call
148+
* <code>.printStackTrace()</code>.
149+
*
150+
* @param e an ExecutorService
151+
* @return an ExecutionContext backed by the given ExecutorService
152+
*/
153+
def fromExecutorService(e: ExecutorService): ExecutionContextExecutorService =
154+
ExecutionContext.fromExecutorService(e, ExecutionContext.defaultReporter)
155+
156+
/**
157+
* Creates an ExecutionContext from a given Executor, using the given
158+
* Consumer for reporting errors. The latter can be created as in the
159+
* following example:
160+
*
161+
* {{{
162+
* final ExecutionContext ec = Converter.fromExecutor(es, thr -> thr.printStackTrace());
163+
* }}}
164+
*
165+
* @param e an Executor
166+
* @param reporter a Consumer for reporting errors during execution
167+
* @return an ExecutionContext backed by the given Executor
168+
*/
169+
def fromExecutor(e: Executor, reporter: Consumer[Throwable]): ExecutionContextExecutor =
170+
ExecutionContext.fromExecutor(e, reporter.accept)
171+
172+
/**
173+
* Creates an ExecutionContext from a given Executor, using the
174+
* default reporter for uncaught exceptions which will just call
175+
* <code>.printStackTrace()</code>.
176+
*
177+
* @param e an Executor
178+
* @return an ExecutionContext backed by the given Executor
179+
*/
180+
def fromExecutor(e: Executor): ExecutionContextExecutor =
181+
ExecutionContext.fromExecutor(e, ExecutionContext.defaultReporter)
182+
183+
/**
184+
* Return the global ExecutionContext for Scala Futures.
185+
*
186+
* @return the ExecutionContext
187+
*/
188+
def globalExecutionContext: ExecutionContext = ExecutionContext.global
189+
190+
/**
191+
* Construct an empty <code>scala.concurrent.Promise</code>.
192+
*
193+
* @return a Promise which is not yet completed
194+
*/
195+
def promise[T](): Promise[T] = Promise()
196+
197+
/**
198+
* Construct an already fulfilled <code>scala.concurrent.Promise</code> which holds the given value.
199+
*
200+
* @return the fulfilled Promise
201+
*/
202+
def keptPromise[T](v: T): Promise[T] = Promise.successful(v)
203+
204+
/**
205+
* Construct an already fulfilled <code>scala.concurrent.Promise</code> which holds the given failure.
206+
*
207+
* @return the fulfilled Promise
208+
*/
209+
def failedPromise[T](ex: Throwable): Promise[T] = Promise.failed(ex)
210+
211+
}
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
package scala.concurrent
2+
3+
import java.util.concurrent.CompletionStage
4+
5+
package object java8 {
6+
7+
implicit class futureToCompletionStage[T](val f: Future[T]) extends AnyVal {
8+
/**
9+
* Returns a CompletionStage that will be completed with the same value or
10+
* exception as the given Scala Future when that completes. Since the Future is a read-only
11+
* representation, this CompletionStage does not support the
12+
* <code>toCompletableFuture</code> method. The semantics of Scala Future
13+
* demand that all callbacks are invoked asynchronously by default, therefore
14+
* the returned CompletionStage routes all calls to synchronous
15+
* transformations to their asynchronous counterparts, i.e.
16+
* <code>thenRun</code> will internally call <code>thenRunAsync</code>.
17+
*
18+
* @param f The Scala Future which may eventually supply the completion for
19+
* the returned CompletionStage
20+
* @return a CompletionStage that runs all callbacks asynchronously and does
21+
* not support the CompletableFuture interface
22+
*/
23+
def toJava: CompletionStage[T] = FutureConverter.toJava(f)
24+
}
25+
26+
implicit class completionStageToFuture[T](val cs: CompletionStage[T]) extends AnyVal {
27+
/**
28+
* Returns a Scala Future that will be completed with the same value or
29+
* exception as the given CompletionStage when that completes. Transformations
30+
* of the returned Future are executed asynchronously as specified by the
31+
* ExecutionContext that is given to the combinator methods.
32+
*
33+
* @param cs The CompletionStage which may eventually supply the completion
34+
* for the returned Scala Future
35+
* @return a Scala Future that represents the CompletionStage's completion
36+
*/
37+
def toScala: Future[T] = FutureConverter.toScala(cs)
38+
}
39+
}

src/test/java/scala/compat/java8/LambdaTest.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,9 @@
44
package scala.compat.java8;
55

66
import scala.runtime.*;
7-
import static scala.compat.java8.TestAPI.*;
87
import static scala.compat.java8.JFunction.*;
8+
import static scala.compat.java8.TestAPI.*;
9+
910
import org.junit.Test;
1011

1112
public class LambdaTest {

0 commit comments

Comments
 (0)