
Commit 87dd537

[SPARK-5549] Define TaskContext interface in Scala.

So the interface documentation shows up in ScalaDoc.

Parent: c306555

4 files changed: 100 additions & 54 deletions

core/src/main/java/org/apache/spark/TaskContext.java renamed to core/src/main/scala/org/apache/spark/TaskContext.scala

Lines changed: 53 additions & 49 deletions
@@ -15,112 +15,116 @@
  * limitations under the License.
  */
 
-package org.apache.spark;
+package org.apache.spark
 
-import java.io.Serializable;
+import java.io.Serializable
 
-import scala.Function0;
-import scala.Function1;
-import scala.Unit;
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.executor.TaskMetrics
+import org.apache.spark.util.TaskCompletionListener
 
-import org.apache.spark.annotation.DeveloperApi;
-import org.apache.spark.executor.TaskMetrics;
-import org.apache.spark.util.TaskCompletionListener;
 
-/**
- * Contextual information about a task which can be read or mutated during
- * execution. To access the TaskContext for a running task use
- * TaskContext.get().
- */
-public abstract class TaskContext implements Serializable {
+object TaskContext {
   /**
    * Return the currently active TaskContext. This can be called inside of
    * user functions to access contextual information about running tasks.
    */
-  public static TaskContext get() {
-    return taskContext.get();
-  }
+  def get(): TaskContext = taskContext.get
+
+  private val taskContext: ThreadLocal[TaskContext] = new ThreadLocal[TaskContext]
+
+  private[spark] def setTaskContext(tc: TaskContext): Unit = taskContext.set(tc)
+
+  private[spark] def unset(): Unit = taskContext.remove()
+}
 
-  private static ThreadLocal<TaskContext> taskContext =
-    new ThreadLocal<TaskContext>();
 
-  static void setTaskContext(TaskContext tc) {
-    taskContext.set(tc);
-  }
+/**
+ * Contextual information about a task which can be read or mutated during
+ * execution. To access the TaskContext for a running task, use:
+ * {{{
+ *   org.apache.spark.TaskContext.get()
+ * }}}
+ */
+abstract class TaskContext extends Serializable {
+  // Note: TaskContext must NOT define a get method. Otherwise it will prevent the Scala compiler
+  // from generating a static get method (based on the companion object's get method).
 
-  static void unset() {
-    taskContext.remove();
-  }
+  // Note: getters in this class are defined with parentheses to maintain backward compatibility.
 
   /**
-   * Whether the task has completed.
+   * Returns true if the task has completed.
    */
-  public abstract boolean isCompleted();
+  def isCompleted(): Boolean
 
   /**
-   * Whether the task has been killed.
+   * Returns true if the task has been killed.
    */
-  public abstract boolean isInterrupted();
+  def isInterrupted(): Boolean
 
-  /** @deprecated use {@link #isRunningLocally()} */
-  @Deprecated
-  public abstract boolean runningLocally();
+  /** @deprecated use { @link #isRunningLocally()}*/
+  @deprecated("1.2.0", "use isRunningLocally")
+  def runningLocally(): Boolean
 
-  public abstract boolean isRunningLocally();
+  /**
+   * Returns true if the task is running locally in the driver program.
+   * @return
+   */
+  def isRunningLocally(): Boolean
 
   /**
-   * Add a (Java friendly) listener to be executed on task completion.
+   * Adds a (Java friendly) listener to be executed on task completion.
    * This will be called in all situation - success, failure, or cancellation.
    * An example use is for HadoopRDD to register a callback to close the input stream.
    */
-  public abstract TaskContext addTaskCompletionListener(TaskCompletionListener listener);
+  def addTaskCompletionListener(listener: TaskCompletionListener): TaskContext
 
   /**
-   * Add a listener in the form of a Scala closure to be executed on task completion.
+   * Adds a listener in the form of a Scala closure to be executed on task completion.
    * This will be called in all situations - success, failure, or cancellation.
    * An example use is for HadoopRDD to register a callback to close the input stream.
    */
-  public abstract TaskContext addTaskCompletionListener(final Function1<TaskContext, Unit> f);
+  def addTaskCompletionListener(f: (TaskContext) => Unit): TaskContext
 
   /**
-   * Add a callback function to be executed on task completion. An example use
+   * Adds a callback function to be executed on task completion. An example use
    * is for HadoopRDD to register a callback to close the input stream.
    * Will be called in any situation - success, failure, or cancellation.
    *
-   * @deprecated use {@link #addTaskCompletionListener(scala.Function1)}
+   * @deprecated use { @link #addTaskCompletionListener(scala.Function1)}
    *
    * @param f Callback function.
    */
-  @Deprecated
-  public abstract void addOnCompleteCallback(final Function0<Unit> f);
+  @deprecated("1.2.0", "use addTaskCompletionListener")
+  def addOnCompleteCallback(f: () => Unit)
 
   /**
    * The ID of the stage that this task belong to.
   */
-  public abstract int stageId();
+  def stageId(): Int
 
   /**
    * The ID of the RDD partition that is computed by this task.
   */
-  public abstract int partitionId();
+  def partitionId(): Int
 
   /**
    * How many times this task has been attempted. The first task attempt will be assigned
    * attemptNumber = 0, and subsequent attempts will have increasing attempt numbers.
   */
-  public abstract int attemptNumber();
+  def attemptNumber(): Int
 
-  /** @deprecated use {@link #taskAttemptId()}; it was renamed to avoid ambiguity. */
-  @Deprecated
-  public abstract long attemptId();
+  /** @deprecated use { @link #taskAttemptId()}; it was renamed to avoid ambiguity. */
+  @deprecated("1.3.0", "use attemptNumber")
+  def attemptId(): Long
 
   /**
    * An ID that is unique to this task attempt (within the same SparkContext, no two task attempts
    * will share the same attempt ID). This is roughly equivalent to Hadoop's TaskAttemptID.
   */
-  public abstract long taskAttemptId();
+  def taskAttemptId(): Long
 
   /** ::DeveloperApi:: */
   @DeveloperApi
-  public abstract TaskMetrics taskMetrics();
+  def taskMetrics(): TaskMetrics
 }
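
A note on the comment in the new class body: the Scala compiler emits a static forwarder on a class for each public method of its companion object, but only when the class itself declares no member with the same name. That is what keeps TaskContext.get() callable as a static method from Java. A minimal sketch of the pattern, with illustrative names (Holder and current are not from this commit):

    object Holder {
      private val local: ThreadLocal[Holder] = new ThreadLocal[Holder]

      // Compiles to a static Holder.current() visible from Java, as long as
      // the class below does not also define a member named current.
      def current(): Holder = local.get
    }

    abstract class Holder extends Serializable {
      // Defining def current() here would suppress the static forwarder,
      // which is what the "must NOT define a get method" note warns about.
    }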

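For context, here is how the companion object's get() is reached from user code once this change is in place. This is a hedged sketch: the local-mode SparkContext setup is illustrative and not part of the diff.

    import org.apache.spark.{SparkConf, SparkContext, TaskContext}

    object TaskContextGetExample {
      def main(args: Array[String]): Unit = {
        // Illustrative setup; any SparkContext works.
        val sc = new SparkContext(new SparkConf().setAppName("tc-get").setMaster("local[2]"))
        val rdd = sc.parallelize(1 to 100, 4)

        // Inside a task, TaskContext.get() returns the context of the running task.
        val tagged = rdd.map { x =>
          val tc = TaskContext.get()
          (tc.stageId(), tc.partitionId(), tc.attemptNumber(), x)
        }
        tagged.take(4).foreach(println)
        sc.stop()
      }
    }
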
core/src/main/scala/org/apache/spark/TaskContextImpl.scala

Lines changed: 4 additions & 4 deletions
@@ -33,7 +33,7 @@ private[spark] class TaskContextImpl(
   with Logging {
 
   // For backwards-compatibility; this method is now deprecated as of 1.3.0.
-  override def attemptId: Long = taskAttemptId
+  override def attemptId(): Long = taskAttemptId
 
   // List of callback functions to execute when the task completes.
   @transient private val onCompleteCallbacks = new ArrayBuffer[TaskCompletionListener]
@@ -87,10 +87,10 @@ private[spark] class TaskContextImpl(
     interrupted = true
   }
 
-  override def isCompleted: Boolean = completed
+  override def isCompleted(): Boolean = completed
 
-  override def isRunningLocally: Boolean = runningLocally
+  override def isRunningLocally(): Boolean = runningLocally
 
-  override def isInterrupted: Boolean = interrupted
+  override def isInterrupted(): Boolean = interrupted
 }
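
The only change in this file is adding empty parentheses to the overrides so they match the parenthesized declarations in the new Scala interface. A small sketch of the distinction, using illustrative names (Status and isDone are not from this commit):

    abstract class Status extends Serializable {
      // Declared with an empty parameter list: Java callers see boolean isDone().
      def isDone(): Boolean
    }

    class StatusImpl extends Status {
      private var done = false
      // The override keeps the empty parameter list so its Scala signature
      // matches the parent declaration, as TaskContextImpl now does.
      override def isDone(): Boolean = done
    }

    object StatusCheck {
      def main(args: Array[String]): Unit = {
        val s = new StatusImpl
        println(s.isDone())  // explicit parentheses, as Java must write it
        println(s.isDone)    // Scala of this era also permits auto-application
      }
    }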

core/src/test/java/org/apache/spark/util/JavaTaskCompletionListenerImpl.java renamed to core/src/test/java/test/org/apache/spark/JavaTaskCompletionListenerImpl.java

Lines changed: 2 additions & 1 deletion
@@ -15,9 +15,10 @@
  * limitations under the License.
  */
 
-package org.apache.spark.util;
+package test.org.apache.spark;
 
 import org.apache.spark.TaskContext;
+import org.apache.spark.util.TaskCompletionListener;
 
 
 /**
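
The moved test class implements org.apache.spark.util.TaskCompletionListener, which declares a single onTaskCompletion callback. For reference, an equivalent listener in Scala is just as small; the logging body below is illustrative:

    import org.apache.spark.TaskContext
    import org.apache.spark.util.TaskCompletionListener

    // Fires once per task, whether the task succeeded, failed, or was cancelled.
    class LoggingCompletionListener extends TaskCompletionListener {
      @volatile var completed = false

      override def onTaskCompletion(context: TaskContext): Unit = {
        completed = true
        println(s"task ${context.taskAttemptId()} in partition ${context.partitionId()} finished")
      }
    }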
core/src/test/java/test/org/apache/spark/JavaTaskContextCompileCheck.java (new file)

Lines changed: 41 additions & 0 deletions
@@ -0,0 +1,41 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package test.org.apache.spark;
19+
20+
import org.apache.spark.TaskContext;
21+
22+
/**
23+
* Something to make sure that TaskContext can be used in Java.
24+
*/
25+
public class JavaTaskContextCompileCheck {
26+
27+
public static void test() {
28+
TaskContext tc = TaskContext.get();
29+
30+
tc.isCompleted();
31+
tc.isInterrupted();
32+
tc.isRunningLocally();
33+
34+
tc.addTaskCompletionListener(new JavaTaskCompletionListenerImpl());
35+
36+
tc.attemptNumber();
37+
tc.partitionId();
38+
tc.stageId();
39+
tc.taskAttemptId();
40+
}
41+
}
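
The compile check above exercises the Java-facing surface of the new Scala interface. The closure overload of addTaskCompletionListener is its Scala-facing counterpart; a hedged usage sketch follows (the SparkContext parameter and job shape are illustrative, not part of the commit):

    import org.apache.spark.{SparkContext, TaskContext}

    object ScalaTaskContextCheck {
      def run(sc: SparkContext): Unit = {
        sc.parallelize(1 to 10, 2).foreachPartition { iter =>
          val tc = TaskContext.get()
          // The (TaskContext) => Unit overload defined in the new interface.
          tc.addTaskCompletionListener((ctx: TaskContext) =>
            println(s"partition ${ctx.partitionId()} done"))
          iter.foreach(_ => ())
        }
      }
    }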
