1- package scala .concurrent .java8
1+ package scala .compat .java8
22
3+ import scala .concurrent .java8 .FuturesConvertersImpl ._
34import scala .concurrent .{ Future , Promise , ExecutionContext , ExecutionContextExecutorService , ExecutionContextExecutor , impl }
45import java .util .concurrent .{ CompletionStage , Executor , ExecutorService , CompletableFuture }
56import scala .util .{ Try , Success , Failure }
@@ -33,54 +34,7 @@ import java.util.function.{ BiConsumer, Function ⇒ JF, Consumer, BiFunction }
3334 * final CompletionStage<Integer> cs2 = toJava(f2);
3435 * }}}
3536 */
36- object FutureConverter {
37-
38- private class CF [T ] extends CompletableFuture [T ] with (Try [T ] => Unit ) {
39- override def apply (t : Try [T ]): Unit = t match {
40- case Success (v) ⇒ complete(v)
41- case Failure (e) ⇒ completeExceptionally(e)
42- }
43-
44- /*
45- * Ensure that completions of this future cannot hold the Scala Future’s completer hostage.
46- */
47- override def thenApply [U ](fn : JF [_ >: T , _ <: U ]): CompletableFuture [U ] = thenApplyAsync(fn)
48- override def thenAccept (fn : Consumer [_ >: T ]): CompletableFuture [Void ] = thenAcceptAsync(fn)
49- override def thenRun (fn : Runnable ): CompletableFuture [Void ] = thenRunAsync(fn)
50- override def thenCombine [U , V ](cs : CompletionStage [_ <: U ], fn : BiFunction [_ >: T , _ >: U , _ <: V ]): CompletableFuture [V ] = thenCombineAsync(cs, fn)
51- override def thenAcceptBoth [U ](cs : CompletionStage [_ <: U ], fn : BiConsumer [_ >: T , _ >: U ]): CompletableFuture [Void ] = thenAcceptBothAsync(cs, fn)
52- override def runAfterBoth (cs : CompletionStage [_], fn : Runnable ): CompletableFuture [Void ] = runAfterBothAsync(cs, fn)
53- override def applyToEither [U ](cs : CompletionStage [_ <: T ], fn : JF [_ >: T , U ]): CompletableFuture [U ] = applyToEitherAsync(cs, fn)
54- override def acceptEither (cs : CompletionStage [_ <: T ], fn : Consumer [_ >: T ]): CompletableFuture [Void ] = acceptEitherAsync(cs, fn)
55- override def runAfterEither (cs : CompletionStage [_], fn : Runnable ): CompletableFuture [Void ] = runAfterEitherAsync(cs, fn)
56- override def thenCompose [U ](fn : JF [_ >: T , _ <: CompletionStage [U ]]): CompletableFuture [U ] = thenComposeAsync(fn)
57- override def whenComplete (fn : BiConsumer [_ >: T , _ >: Throwable ]): CompletableFuture [T ] = whenCompleteAsync(fn)
58- override def handle [U ](fn : BiFunction [_ >: T , Throwable , _ <: U ]): CompletableFuture [U ] = handleAsync(fn)
59- override def exceptionally (fn : JF [Throwable , _ <: T ]): CompletableFuture [T ] = {
60- val cf = new CompletableFuture [T ]
61- whenCompleteAsync(new BiConsumer [T , Throwable ] {
62- override def accept (t : T , e : Throwable ): Unit = {
63- if (e == null ) cf.complete(t)
64- else {
65- val n : AnyRef =
66- try {
67- fn(e).asInstanceOf [AnyRef ]
68- } catch {
69- case thr : Throwable ⇒ cf.completeExceptionally(thr); this
70- }
71- if (n ne this ) cf.complete(n.asInstanceOf [T ])
72- }
73- }
74- })
75- cf
76- }
77-
78- override def toCompletableFuture (): CompletableFuture [T ] =
79- throw new UnsupportedOperationException (" this CompletionStage represents a read-only Scala Future" )
80-
81- override def toString : String = super [CompletableFuture ].toString
82- }
83-
37+ object FutureConverters {
8438 /**
8539 * Returns a CompletionStage that will be completed with the same value or
8640 * exception as the given Scala Future when that completes. Since the Future is a read-only
@@ -98,18 +52,11 @@ object FutureConverter {
9852 */
9953 def toJava [T ](f : Future [T ]): CompletionStage [T ] = {
10054 val cf = new CF [T ]
101- implicit val ec = Future . InternalCallbackExecutor
55+ implicit val ec = InternalCallbackExecutor
10256 f onComplete cf
10357 cf
10458 }
10559
106- private class P [T ] extends impl.Promise .DefaultPromise [T ] with BiConsumer [T , Throwable ] {
107- override def accept (v : T , e : Throwable ): Unit = {
108- if (e == null ) complete(Success (v))
109- else complete(Failure (e))
110- }
111- }
112-
11360 /**
11461 * Returns a Scala Future that will be completed with the same value or
11562 * exception as the given CompletionStage when that completes. Transformations
@@ -208,4 +155,36 @@ object FutureConverter {
208155 */
209156 def failedPromise [T ](ex : Throwable ): Promise [T ] = Promise .failed(ex)
210157
158+ implicit class futureToCompletionStage [T ](val f : Future [T ]) extends AnyVal {
159+ /**
160+ * Returns a CompletionStage that will be completed with the same value or
161+ * exception as the given Scala Future when that completes. Since the Future is a read-only
162+ * representation, this CompletionStage does not support the
163+ * <code>toCompletableFuture</code> method. The semantics of Scala Future
164+ * demand that all callbacks are invoked asynchronously by default, therefore
165+ * the returned CompletionStage routes all calls to synchronous
166+ * transformations to their asynchronous counterparts, i.e.
167+ * <code>thenRun</code> will internally call <code>thenRunAsync</code>.
168+ *
169+ * @param f The Scala Future which may eventually supply the completion for
170+ * the returned CompletionStage
171+ * @return a CompletionStage that runs all callbacks asynchronously and does
172+ * not support the CompletableFuture interface
173+ */
174+ def toJava : CompletionStage [T ] = FutureConverters .toJava(f)
175+ }
176+
177+ implicit class completionStageToFuture [T ](val cs : CompletionStage [T ]) extends AnyVal {
178+ /**
179+ * Returns a Scala Future that will be completed with the same value or
180+ * exception as the given CompletionStage when that completes. Transformations
181+ * of the returned Future are executed asynchronously as specified by the
182+ * ExecutionContext that is given to the combinator methods.
183+ *
184+ * @param cs The CompletionStage which may eventually supply the completion
185+ * for the returned Scala Future
186+ * @return a Scala Future that represents the CompletionStage's completion
187+ */
188+ def toScala : Future [T ] = FutureConverters .toScala(cs)
189+ }
211190}
0 commit comments