Commit 346e46e

[SPARK-3902] Stabilize AsyncRDDActions; add Java API.
1 parent 72f36ee commit 346e46e

File tree

5 files changed: +254 -33 lines changed
core/src/main/java/org/apache/spark/api/java/JavaFutureAction.java

Lines changed: 33 additions & 0 deletions

@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.api.java;
+
+
+import java.util.List;
+import java.util.concurrent.Future;
+
+public interface JavaFutureAction<T> extends Future<T> {
+
+  /**
+   * Returns the job IDs run by the underlying async operation.
+   *
+   * This returns the current snapshot of the job list. Certain operations may run multiple
+   * jobs, so multiple calls to this method may return different lists.
+   */
+  List<Integer> jobIds();
+}
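To make the interface's contract concrete, here is a minimal usage sketch, not part of this commit: the class `FutureActionPoller` and its helper method are hypothetical names. It treats any async result generically, using only what `java.util.concurrent.Future` guarantees plus the `jobIds()` snapshot added here.

import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.apache.spark.api.java.JavaFutureAction;

public final class FutureActionPoller {
  // Waits up to timeoutSeconds for any async action, logging the Spark jobs
  // it has launched so far. jobIds() is a snapshot: actions such as take()
  // may launch more jobs later, so repeated calls can return longer lists.
  public static <T> T awaitLoudly(JavaFutureAction<T> action, long timeoutSeconds)
      throws InterruptedException, ExecutionException, TimeoutException {
    System.out.println("Jobs launched so far: " + action.jobIds());
    return action.get(timeoutSeconds, TimeUnit.SECONDS);
  }
}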

core/src/main/scala/org/apache/spark/FutureAction.scala

Lines changed: 70 additions & 15 deletions
@@ -17,20 +17,20 @@
 
 package org.apache.spark
 
-import scala.concurrent._
-import scala.concurrent.duration.Duration
-import scala.util.Try
+import java.util.concurrent.{CancellationException, ExecutionException, TimeUnit}
 
-import org.apache.spark.annotation.Experimental
+import org.apache.spark.api.java.JavaFutureAction
 import org.apache.spark.rdd.RDD
 import org.apache.spark.scheduler.{JobFailed, JobSucceeded, JobWaiter}
 
+import scala.concurrent._
+import scala.concurrent.duration.Duration
+import scala.util.{Failure, Try}
+
 /**
- * :: Experimental ::
  * A future for the result of an action to support cancellation. This is an extension of the
  * Scala Future interface to support cancellation.
  */
-@Experimental
 trait FutureAction[T] extends Future[T] {
   // Note that we redefine methods of the Future trait here explicitly so we can specify a different
   // documentation (with reference to the word "action").
@@ -69,6 +69,11 @@ trait FutureAction[T] extends Future[T] {
    */
   override def isCompleted: Boolean
 
+  /**
+   * Returns whether the action has been cancelled.
+   */
+  def isCancelled: Boolean
+
   /**
    * The value of this Future.
    *
@@ -96,15 +101,16 @@
 
 
 /**
- * :: Experimental ::
  * A [[FutureAction]] holding the result of an action that triggers a single job. Examples include
  * count, collect, reduce.
  */
-@Experimental
 class SimpleFutureAction[T] private[spark](jobWaiter: JobWaiter[_], resultFunc: => T)
   extends FutureAction[T] {
 
+  @volatile private var _cancelled: Boolean = false
+
   override def cancel() {
+    _cancelled = true
     jobWaiter.cancel()
   }
 
@@ -143,6 +149,8 @@ class SimpleFutureAction[T] private[spark](jobWaiter: JobWaiter[_], resultFunc:
   }
 
   override def isCompleted: Boolean = jobWaiter.jobFinished
+
+  override def isCancelled: Boolean = _cancelled
 
   override def value: Option[Try[T]] = {
     if (jobWaiter.jobFinished) {
@@ -164,12 +172,10 @@ class SimpleFutureAction[T] private[spark](jobWaiter: JobWaiter[_], resultFunc:
 
 
 /**
- * :: Experimental ::
  * A [[FutureAction]] for actions that could trigger multiple Spark jobs. Examples include take,
  * takeSample. Cancellation works by setting the cancelled flag to true and interrupting the
  * action thread if it is being blocked by a job.
  */
-@Experimental
 class ComplexFutureAction[T] extends FutureAction[T] {
 
   // Pointer to the thread that is executing the action. It is set when the action is run.
@@ -222,7 +228,7 @@ class ComplexFutureAction[T] extends FutureAction[T] {
     // If the action hasn't been cancelled yet, submit the job. The check and the submitJob
    // command need to be in an atomic block.
     val job = this.synchronized {
-      if (!cancelled) {
+      if (!isCancelled) {
         rdd.context.submitJob(rdd, processPartition, partitions, resultHandler, resultFunc)
       } else {
         throw new SparkException("Action has been cancelled")
@@ -243,10 +249,7 @@
     }
   }
 
-  /**
-   * Returns whether the promise has been cancelled.
-   */
-  def cancelled: Boolean = _cancelled
+  override def isCancelled: Boolean = _cancelled
 
   @throws(classOf[InterruptedException])
   @throws(classOf[scala.concurrent.TimeoutException])
@@ -271,3 +274,55 @@
   def jobIds = jobs
 
 }
+
+private[spark]
+class JavaFutureActionWrapper[S, T](futureAction: FutureAction[S], converter: S => T)
+  extends JavaFutureAction[T] {
+
+  import scala.collection.JavaConverters._
+
+  override def isCancelled: Boolean = futureAction.isCancelled
+
+  override def isDone: Boolean = {
+    // According to java.util.Future's Javadoc, this returns true if the task was completed,
+    // whether that completion was due to successful execution, an exception, or a cancellation.
+    futureAction.isCancelled || futureAction.isCompleted
+  }
+
+  override def jobIds(): java.util.List[java.lang.Integer] = {
+    new java.util.ArrayList(futureAction.jobIds.map(x => new Integer(x)).asJava)
+  }
+
+  private def getImpl(timeout: Duration): T = {
+    // This will throw TimeoutException on timeout:
+    Await.ready(futureAction, timeout)
+    futureAction.value.get match {
+      case scala.util.Success(value) => converter(value)
+      case Failure(exception) =>
+        if (isCancelled) {
+          throw new CancellationException(s"Job cancelled: ${exception.getMessage}")
+        } else {
+          // java.util.Future.get() wraps exceptions in ExecutionException
+          throw new ExecutionException("Exception thrown by job", exception)
+        }
+    }
+  }
+
+  override def get(): T = getImpl(Duration.Inf)
+
+  override def get(timeout: Long, unit: TimeUnit): T =
+    getImpl(Duration.fromNanos(unit.toNanos(timeout)))
+
+  override def cancel(mayInterruptIfRunning: Boolean): Boolean = {
+    if (isDone) {
+      // According to java.util.Future's Javadoc, this should return false if the task is completed.
+      false
+    } else {
+      // We're limited in terms of the semantics we can provide here; our cancellation is
+      // asynchronous and doesn't provide a mechanism to not cancel if the job is running.
+      futureAction.cancel()
+      true
+    }
+  }
+
+}
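As a sanity check on the wrapper's `java.util.concurrent.Future` semantics, the sketch below is a hypothetical example, not part of this commit, and assumes a local `JavaSparkContext`. It shows how `get()` surfaces cancellation as `CancellationException`, wraps job failures in `ExecutionException`, and how `cancel()` returns false once the action is done.

import java.util.Arrays;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;

import org.apache.spark.api.java.JavaFutureAction;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public final class FutureSemanticsDemo {
  public static void main(String[] args) throws Exception {
    JavaSparkContext sc = new JavaSparkContext("local[2]", "future-semantics-demo");
    JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));

    JavaFutureAction<Long> count = rdd.countAsync();
    try {
      System.out.println("count = " + count.get());
    } catch (CancellationException e) {
      // A cancelled action is reported this way, per the Future contract.
      System.out.println("cancelled");
    } catch (ExecutionException e) {
      // Failures inside the job are wrapped, again per the Future contract.
      System.out.println("failed: " + e.getCause());
    }

    // Once the action is done, cancel() must return false. Spark's cancellation
    // is asynchronous, so mayInterruptIfRunning is effectively ignored.
    System.out.println("cancel after completion: " + count.cancel(true));
    sc.stop();
  }
}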

core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala

Lines changed: 44 additions & 11 deletions
@@ -17,6 +17,7 @@
 
 package org.apache.spark.api.java
 
+import java.util
 import java.util.{Comparator, List => JList, Iterator => JIterator}
 import java.lang.{Iterable => JIterable, Long => JLong}
 
@@ -26,7 +27,7 @@ import scala.reflect.ClassTag
 import com.google.common.base.Optional
 import org.apache.hadoop.io.compress.CompressionCodec
 
-import org.apache.spark.{FutureAction, Partition, SparkContext, TaskContext}
+import org.apache.spark._
 import org.apache.spark.annotation.Experimental
 import org.apache.spark.api.java.JavaPairRDD._
 import org.apache.spark.api.java.JavaSparkContext.fakeClassTag
@@ -293,8 +294,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
    * Applies a function f to all elements of this RDD.
    */
   def foreach(f: VoidFunction[T]) {
-    val cleanF = rdd.context.clean((x: T) => f.call(x))
-    rdd.foreach(cleanF)
+    rdd.foreach(x => f.call(x))
   }
 
   /**
@@ -575,16 +575,49 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
   def name(): String = rdd.name
 
   /**
-   * :: Experimental ::
-   * The asynchronous version of the foreach action.
-   *
-   * @param f the function to apply to all the elements of the RDD
-   * @return a FutureAction for the action
+   * The asynchronous version of `count`, which returns a
+   * future for counting the number of elements in this RDD.
    */
-  @Experimental
-  def foreachAsync(f: VoidFunction[T]): FutureAction[Unit] = {
+  def countAsync(): JavaFutureAction[JLong] = {
+    import org.apache.spark.SparkContext._
+    new JavaFutureActionWrapper[Long, JLong](rdd.countAsync(), x => new JLong(x))
+  }
+
+  /**
+   * The asynchronous version of `collect`, which returns a future for
+   * retrieving an array containing all of the elements in this RDD.
+   */
+  def collectAsync(): JavaFutureAction[JList[T]] = {
+    import org.apache.spark.SparkContext._
+    new JavaFutureActionWrapper(rdd.collectAsync(), (x: Seq[T]) => new java.util.ArrayList(x))
+  }
+
+  /**
+   * The asynchronous version of the `take` action, which returns a
+   * future for retrieving the first `num` elements of this RDD.
+   */
+  def takeAsync(num: Int): JavaFutureAction[JList[T]] = {
     import org.apache.spark.SparkContext._
-    rdd.foreachAsync(x => f.call(x))
+    new JavaFutureActionWrapper(rdd.takeAsync(num), (x: Seq[T]) => new java.util.ArrayList(x))
   }
 
+  /**
+   * The asynchronous version of the `foreach` action, which
+   * applies a function f to all the elements of this RDD.
+   */
+  def foreachAsync(f: VoidFunction[T]): JavaFutureAction[Void] = {
+    import org.apache.spark.SparkContext._
+    new JavaFutureActionWrapper[Unit, Void](rdd.foreachAsync(x => f.call(x)),
+      { x => null.asInstanceOf[Void] })
+  }
+
+  /**
+   * The asynchronous version of the `foreachPartition` action, which
+   * applies a function f to each partition of this RDD.
+   */
+  def foreachPartitionAsync(f: VoidFunction[java.util.Iterator[T]]): JavaFutureAction[Void] = {
+    import org.apache.spark.SparkContext._
+    new JavaFutureActionWrapper[Unit, Void](rdd.foreachPartitionAsync(x => f.call(x)),
+      { x => null.asInstanceOf[Void] })
+  }
 }
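Taken together, an end-to-end sketch of the Java-facing async actions added above. This is a hypothetical example, not part of the commit; it assumes a local master and Java 8 lambdas for `VoidFunction`.

import java.util.Arrays;
import java.util.List;

import org.apache.spark.api.java.JavaFutureAction;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public final class AsyncActionsDemo {
  public static void main(String[] args) throws Exception {
    JavaSparkContext sc = new JavaSparkContext("local[2]", "async-actions-demo");
    JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5), 2);

    JavaFutureAction<Long> count = rdd.countAsync();          // runs one job
    JavaFutureAction<List<Integer>> take = rdd.takeAsync(3);  // may run several jobs
    JavaFutureAction<Void> touched =
        rdd.foreachAsync(x -> System.out.println("saw " + x));

    System.out.println("count = " + count.get());
    System.out.println("take(3) = " + take.get());
    touched.get();  // Void-valued actions resolve to null
    sc.stop();
  }
}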

core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala

Lines changed: 0 additions & 3 deletions
@@ -24,14 +24,11 @@ import scala.concurrent.ExecutionContext.Implicits.global
 import scala.reflect.ClassTag
 
 import org.apache.spark.{ComplexFutureAction, FutureAction, Logging}
-import org.apache.spark.annotation.Experimental
 
 /**
- * :: Experimental ::
  * A set of asynchronous RDD actions available through an implicit conversion.
 * Import `org.apache.spark.SparkContext._` at the top of your program to use these functions.
 */
-@Experimental
 class AsyncRDDActions[T: ClassTag](self: RDD[T]) extends Serializable with Logging {
 
   /**
