
Commit 5fdfc0d

Merge remote-tracking branch 'apache-github/master' into SPARK-7361

Conflicts:
	streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala

2 parents: 5870e2b + 9f1f9b1

146 files changed, +20877 −456 lines


.rat-excludes
Lines changed: 7 additions & 0 deletions

@@ -74,5 +74,12 @@ logs
 .*scalastyle-output.xml
 .*dependency-reduced-pom.xml
 known_translations
+json_expectation
+local-1422981759269/*
+local-1422981780767/*
+local-1425081759269/*
+local-1426533911241/*
+local-1426633911242/*
+local-1427397477963/*
 DESCRIPTION
 NAMESPACE

core/pom.xml
Lines changed: 9 additions & 2 deletions

@@ -228,6 +228,14 @@
       <artifactId>json4s-jackson_${scala.binary.version}</artifactId>
       <version>3.2.10</version>
     </dependency>
+    <dependency>
+      <groupId>com.sun.jersey</groupId>
+      <artifactId>jersey-server</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>com.sun.jersey</groupId>
+      <artifactId>jersey-core</artifactId>
+    </dependency>
     <dependency>
       <groupId>org.apache.mesos</groupId>
       <artifactId>mesos</artifactId>
@@ -273,7 +281,6 @@
     <dependency>
       <groupId>org.apache.ivy</groupId>
       <artifactId>ivy</artifactId>
-      <version>${ivy.version}</version>
     </dependency>
     <dependency>
       <groupId>oro</groupId>
@@ -362,7 +369,7 @@
     <dependency>
       <groupId>org.spark-project</groupId>
      <artifactId>pyrolite</artifactId>
-      <version>2.0.1</version>
+      <version>4.4</version>
     </dependency>
     <dependency>
       <groupId>net.sf.py4j</groupId>

core/src/main/java/org/apache/spark/JobExecutionStatus.java
Lines changed: 7 additions & 1 deletion

@@ -17,9 +17,15 @@
 
 package org.apache.spark;
 
+import org.apache.spark.util.EnumUtil;
+
 public enum JobExecutionStatus {
   RUNNING,
   SUCCEEDED,
   FAILED,
-  UNKNOWN
+  UNKNOWN;
+
+  public static JobExecutionStatus fromString(String str) {
+    return EnumUtil.parseIgnoreCase(JobExecutionStatus.class, str);
+  }
 }
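
A quick sanity check of the new fromString helper — a minimal Scala sketch calling the Java enum, not part of the commit:

    import org.apache.spark.JobExecutionStatus

    assert(JobExecutionStatus.fromString("running") == JobExecutionStatus.RUNNING)
    assert(JobExecutionStatus.fromString("Failed") == JobExecutionStatus.FAILED)
    // EnumUtil.parseIgnoreCase (added later in this commit) returns null for null input:
    assert(JobExecutionStatus.fromString(null) == null)
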
core/src/main/java/org/apache/spark/status/api/v1/ApplicationStatus.java
Lines changed: 30 additions & 0 deletions

@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.status.api.v1;
+
+import org.apache.spark.util.EnumUtil;
+
+public enum ApplicationStatus {
+  COMPLETED,
+  RUNNING;
+
+  public static ApplicationStatus fromString(String str) {
+    return EnumUtil.parseIgnoreCase(ApplicationStatus.class, str);
+  }
+
+}
core/src/main/java/org/apache/spark/status/api/v1/StageStatus.java
Lines changed: 31 additions & 0 deletions

@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.status.api.v1;
+
+import org.apache.spark.util.EnumUtil;
+
+public enum StageStatus {
+  ACTIVE,
+  COMPLETE,
+  FAILED,
+  PENDING;
+
+  public static StageStatus fromString(String str) {
+    return EnumUtil.parseIgnoreCase(StageStatus.class, str);
+  }
+}
core/src/main/java/org/apache/spark/status/api/v1/TaskSorting.java
Lines changed: 48 additions & 0 deletions

@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.status.api.v1;
+
+import org.apache.spark.util.EnumUtil;
+
+import java.util.HashSet;
+import java.util.Set;
+
+public enum TaskSorting {
+  ID,
+  INCREASING_RUNTIME("runtime"),
+  DECREASING_RUNTIME("-runtime");
+
+  private final Set<String> alternateNames;
+  private TaskSorting(String... names) {
+    alternateNames = new HashSet<String>();
+    for (String n: names) {
+      alternateNames.add(n);
+    }
+  }
+
+  public static TaskSorting fromString(String str) {
+    String lower = str.toLowerCase();
+    for (TaskSorting t: values()) {
+      if (t.alternateNames.contains(lower)) {
+        return t;
+      }
+    }
+    return EnumUtil.parseIgnoreCase(TaskSorting.class, str);
+  }
+
+}
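
How the alternate-name table interacts with the EnumUtil fallback — a hedged Scala sketch, not part of the commit; all three calls resolve successfully:

    import org.apache.spark.status.api.v1.TaskSorting

    TaskSorting.fromString("-runtime")            // DECREASING_RUNTIME, via alternateNames
    TaskSorting.fromString("id")                  // ID, via the case-insensitive fallback
    TaskSorting.fromString("InCreAsing_RunTime")  // INCREASING_RUNTIME, also via the fallback
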
core/src/main/java/org/apache/spark/util/EnumUtil.java
Lines changed: 38 additions & 0 deletions

@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.util;
+
+import com.google.common.base.Joiner;
+import org.apache.spark.annotation.Private;
+
+@Private
+public class EnumUtil {
+  public static <E extends Enum<E>> E parseIgnoreCase(Class<E> clz, String str) {
+    E[] constants = clz.getEnumConstants();
+    if (str == null) {
+      return null;
+    }
+    for (E e : constants) {
+      if (e.name().equalsIgnoreCase(str)) {
+        return e;
+      }
+    }
+    throw new IllegalArgumentException(
+      String.format("Illegal type='%s'. Supported type values: %s",
+        str, Joiner.on(", ").join(constants)));
+  }
+}
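
The error path of parseIgnoreCase lists every supported constant, which makes bad query parameters self-documenting. A minimal Scala sketch, not part of the commit:

    import org.apache.spark.JobExecutionStatus
    import org.apache.spark.util.EnumUtil

    try {
      EnumUtil.parseIgnoreCase(classOf[JobExecutionStatus], "bogus")
    } catch {
      case e: IllegalArgumentException =>
        // Illegal type='bogus'. Supported type values: RUNNING, SUCCEEDED, FAILED, UNKNOWN
        println(e.getMessage)
    }
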

core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
Lines changed: 29 additions & 0 deletions

@@ -21,7 +21,10 @@ import java.util.concurrent.TimeUnit
 
 import scala.collection.mutable
 
+import com.codahale.metrics.{Gauge, MetricRegistry}
+
 import org.apache.spark.scheduler._
+import org.apache.spark.metrics.source.Source
 import org.apache.spark.util.{ThreadUtils, Clock, SystemClock, Utils}
 
 /**
@@ -144,6 +147,9 @@ private[spark] class ExecutorAllocationManager(
   private val executor =
     ThreadUtils.newDaemonSingleThreadScheduledExecutor("spark-dynamic-executor-allocation")
 
+  // Metric source for ExecutorAllocationManager to expose internal status to MetricsSystem.
+  val executorAllocationManagerSource = new ExecutorAllocationManagerSource
+
   /**
    * Verify that the settings specified through the config are valid.
    * If not, throw an appropriate exception.
@@ -579,6 +585,29 @@
     }
   }
 
+  /**
+   * Metric source for ExecutorAllocationManager to expose its internal executor-allocation
+   * status to MetricsSystem.
+   * Note: these metrics rely heavily on the internal implementation of
+   * ExecutorAllocationManager; the metrics and their values may change whenever the internal
+   * implementation changes, so they are not stable across Spark versions.
+   */
+  private[spark] class ExecutorAllocationManagerSource extends Source {
+    val sourceName = "ExecutorAllocationManager"
+    val metricRegistry = new MetricRegistry()
+
+    private def registerGauge[T](name: String, value: => T, defaultValue: T): Unit = {
+      metricRegistry.register(MetricRegistry.name("executors", name), new Gauge[T] {
+        override def getValue: T = synchronized { Option(value).getOrElse(defaultValue) }
+      })
+    }
+
+    registerGauge("numberExecutorsToAdd", numExecutorsToAdd, 0)
+    registerGauge("numberExecutorsPendingToRemove", executorsPendingToRemove.size, 0)
+    registerGauge("numberAllExecutors", executorIds.size, 0)
+    registerGauge("numberTargetExecutors", numExecutorsTarget, 0)
+    registerGauge("numberMaxNeededExecutors", maxNumExecutorsNeeded(), 0)
+  }
 }
 
 private object ExecutorAllocationManager {
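
The registerGauge helper takes its value argument by name, so each metrics poll re-reads the live field rather than the value captured at registration time. A standalone sketch of that pattern against the same Codahale API — names here are illustrative, not part of the commit:

    import com.codahale.metrics.{Gauge, MetricRegistry}

    object GaugeSketch {
      private var pending = 0  // stand-in for a live counter such as executorsPendingToRemove.size
      val registry = new MetricRegistry()

      private def registerGauge[T](name: String, value: => T, defaultValue: T): Unit = {
        registry.register(MetricRegistry.name("executors", name), new Gauge[T] {
          // `value` is call-by-name: re-evaluated on every poll, with a default for nulls.
          override def getValue: T = Option(value).getOrElse(defaultValue)
        })
      }

      def main(args: Array[String]): Unit = {
        registerGauge("numberExecutorsPendingToRemove", pending, 0)
        pending = 3
        // Prints 3: the gauge reflects the current value, not the value at registration time.
        println(registry.getGauges.get("executors.numberExecutorsPendingToRemove").getValue)
      }
    }
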
core/src/main/scala/org/apache/spark/SizeEstimator.scala
Lines changed: 44 additions & 0 deletions

@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+import org.apache.spark.annotation.DeveloperApi
+
+/**
+ * Estimates the sizes of Java objects (number of bytes of memory they occupy), for use in
+ * memory-aware caches.
+ *
+ * Based on the following JavaWorld article:
+ * http://www.javaworld.com/javaworld/javaqa/2003-12/02-qa-1226-sizeof.html
+ */
+@DeveloperApi
+object SizeEstimator {
+  /**
+   * :: DeveloperApi ::
+   * Estimate the number of bytes that the given object takes up on the JVM heap. The estimate
+   * includes space taken up by objects referenced by the given object, their references, and so on
+   * and so forth.
+   *
+   * This is useful for determining the amount of heap space a broadcast variable will occupy on
+   * each executor or the amount of space each object will take when caching objects in
+   * deserialized form. This is not the same as the serialized size of the object, which will
+   * typically be much smaller.
+   */
+  @DeveloperApi
+  def estimate(obj: AnyRef): Long = org.apache.spark.util.SizeEstimator.estimate(obj)
+}
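
The new public SizeEstimator front-end simply forwards to the private util implementation, so user code can size objects destined for broadcast or caching. A hedged usage sketch, not part of the commit; the exact byte count is JVM-dependent:

    import org.apache.spark.SizeEstimator

    val bytes = SizeEstimator.estimate(Array.fill(1000)(0L))
    // Roughly 8 KB of longs plus array overhead; this is the deserialized in-memory
    // size, not the (typically much smaller) serialized size.
    println(s"estimated size: $bytes bytes")
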

core/src/main/scala/org/apache/spark/SparkContext.scala
Lines changed: 8 additions & 3 deletions

@@ -428,7 +428,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationClient
     _ui =
       if (conf.getBoolean("spark.ui.enabled", true)) {
         Some(SparkUI.createLiveUI(this, _conf, listenerBus, _jobProgressListener,
-          _env.securityManager,appName))
+          _env.securityManager,appName, startTime = startTime))
       } else {
         // For tests, do not enable the UI
         None
@@ -537,6 +537,9 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationClient
     _taskScheduler.postStartHook()
     _env.metricsSystem.registerSource(new DAGSchedulerSource(dagScheduler))
     _env.metricsSystem.registerSource(new BlockManagerSource(_env.blockManager))
+    _executorAllocationManager.foreach { e =>
+      _env.metricsSystem.registerSource(e.executorAllocationManagerSource)
+    }
 
     // Make sure the context is stopped if the user forgets about it. This avoids leaving
     // unfinished event logs around after the JVM exits cleanly. It doesn't help if the JVM
@@ -1676,7 +1679,8 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationClient
       partitions: Seq[Int],
       allowLocal: Boolean
     ): Array[U] = {
-    runJob(rdd, (context: TaskContext, iter: Iterator[T]) => func(iter), partitions, allowLocal)
+    val cleanedFunc = clean(func)
+    runJob(rdd, (ctx: TaskContext, it: Iterator[T]) => cleanedFunc(it), partitions, allowLocal)
   }
 
   /**
@@ -1730,7 +1734,8 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationClient
     val callSite = getCallSite
     logInfo("Starting job: " + callSite.shortForm)
     val start = System.nanoTime
-    val result = dagScheduler.runApproximateJob(rdd, func, evaluator, callSite, timeout,
+    val cleanedFunc = clean(func)
+    val result = dagScheduler.runApproximateJob(rdd, cleanedFunc, evaluator, callSite, timeout,
       localProperties.get)
     logInfo(
       "Job finished: " + callSite.shortForm + ", took " + (System.nanoTime - start) / 1e9 + " s")
