Changes from all commits (23 commits)
d815654
[SPARK-9720] [ML] Identifiable types need UID in toString methods
BertrandDechoux Sep 14, 2015
32407bf
[SPARK-9899] [SQL] log warning for direct output committer with specu…
cloud-fan Sep 14, 2015
cf2821e
[SPARK-10584] [DOC] [SQL] Documentation about spark.sql.hive.metastor…
sarutak Sep 14, 2015
ce6f3f1
[SPARK-10194] [MLLIB] [PYSPARK] SGD algorithms need convergenceTol pa…
yanboliang Sep 14, 2015
8a634e9
[SPARK-10573] [ML] IndexToString output schema should be StringType
Sep 14, 2015
7e32387
[SPARK-10522] [SQL] Nanoseconds of Timestamp in Parquet should be pos…
Sep 14, 2015
64f0415
[SPARK-6981] [SQL] Factor out SparkPlanner and QueryExecution from SQ…
evacchi Sep 14, 2015
217e496
[SPARK-9996] [SPARK-9997] [SQL] Add local expand and NestedLoopJoin o…
zsxwing Sep 14, 2015
16b6d18
[SPARK-10594] [YARN] Remove reference to --num-executors, add --prope…
erickt Sep 14, 2015
4e2242b
[SPARK-10576] [BUILD] Move .java files out of src/main/scala
srowen Sep 14, 2015
ffbbc2c
[SPARK-10549] scala 2.11 spark on yarn with security - Repl doesn't work
Sep 14, 2015
fd1e8cd
[SPARK-10543] [CORE] Peak Execution Memory Quantile should be Per-tas…
saurfang Sep 14, 2015
7b6c856
[SPARK-10564] ThreadingSuite: assertion failures in threads don't fai…
Sep 14, 2015
1a09552
[SPARK-9851] Support submitting map stages individually in DAGScheduler
mateiz Sep 15, 2015
5520418
[SPARK-10542] [PYSPARK] fix serialize namedtuple
Sep 15, 2015
4ae4d54
[SPARK-9793] [MLLIB] [PYSPARK] PySpark DenseVector, SparseVector impl…
yanboliang Sep 15, 2015
610971e
[SPARK-10273] Add @since annotation to pyspark.mllib.feature
noel-smith Sep 15, 2015
a224935
[SPARK-10275] [MLLIB] Add @since annotation to pyspark.mllib.random
yu-iskw Sep 15, 2015
833be73
Small fixes to docs
Sep 15, 2015
6503c4b
[SPARK-10598] [DOCS]
insidedctm Sep 15, 2015
09b7e7c
Update version to 1.6.0-SNAPSHOT.
rxin Sep 15, 2015
c35fdcb
[SPARK-10491] [MLLIB] move RowMatrix.dspr to BLAS
hhbyyh Sep 15, 2015
0f9a01d
resolve conflicts
mengxr Sep 15, 2015
R/pkg/DESCRIPTION (2 changes: 1 addition & 1 deletion)
@@ -1,7 +1,7 @@
 Package: SparkR
 Type: Package
 Title: R frontend for Spark
-Version: 1.5.0
+Version: 1.6.0
 Date: 2013-09-09
 Author: The Apache Software Foundation
 Maintainer: Shivaram Venkataraman <[email protected]>
assembly/pom.xml (2 changes: 1 addition & 1 deletion)
@@ -21,7 +21,7 @@
   <parent>
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-parent_2.10</artifactId>
-    <version>1.5.0-SNAPSHOT</version>
+    <version>1.6.0-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>

bagel/pom.xml (2 changes: 1 addition & 1 deletion)
@@ -21,7 +21,7 @@
   <parent>
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-parent_2.10</artifactId>
-    <version>1.5.0-SNAPSHOT</version>
+    <version>1.6.0-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>

core/pom.xml (2 changes: 1 addition & 1 deletion)
@@ -21,7 +21,7 @@
   <parent>
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-parent_2.10</artifactId>
-    <version>1.5.0-SNAPSHOT</version>
+    <version>1.6.0-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>

core/src/main/scala/org/apache/spark/MapOutputStatistics.scala (27 changes: 27 additions & 0 deletions)
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+/**
+ * Holds statistics about the output sizes in a map stage. May become a DeveloperApi in the future.
+ *
+ * @param shuffleId ID of the shuffle
+ * @param bytesByPartitionId approximate number of output bytes for each map output partition
+ *                           (may be inexact due to use of compressed map statuses)
+ */
+private[spark] class MapOutputStatistics(val shuffleId: Int, val bytesByPartitionId: Array[Long])
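
MapOutputStatistics is just a holder, but its per-partition byte counts are the raw material for later planning decisions. A minimal sketch of how they might be inspected, e.g. to spot skew (the helper below is hypothetical, not part of this patch, and assumes it compiles inside the org.apache.spark package since the class is private[spark]):

package org.apache.spark

// Hypothetical helper: summarize a MapOutputStatistics to spot skewed partitions.
object MapOutputStatisticsSummary {
  def summarize(stats: MapOutputStatistics): String = {
    val sizes = stats.bytesByPartitionId
    val total = sizes.sum
    val largest = if (sizes.isEmpty) 0L else sizes.max
    s"shuffle ${stats.shuffleId}: $total bytes over ${sizes.length} partitions, " +
      s"largest partition $largest bytes"
  }
}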
core/src/main/scala/org/apache/spark/MapOutputTracker.scala (49 changes: 38 additions & 11 deletions)
@@ -18,6 +18,7 @@
 package org.apache.spark

 import java.io._
+import java.util.Arrays
 import java.util.concurrent.ConcurrentHashMap
 import java.util.zip.{GZIPInputStream, GZIPOutputStream}

@@ -132,13 +133,43 @@ private[spark] abstract class MapOutputTracker(conf: SparkConf) extends Logging
    * describing the shuffle blocks that are stored at that block manager.
    */
   def getMapSizesByExecutorId(shuffleId: Int, reduceId: Int)
-      : Seq[(BlockManagerId, Seq[(BlockId, Long)])] = {
+      : Seq[(BlockManagerId, Seq[(BlockId, Long)])] = {
     logDebug(s"Fetching outputs for shuffle $shuffleId, reduce $reduceId")
-    val startTime = System.currentTimeMillis
+    val statuses = getStatuses(shuffleId)
+    // Synchronize on the returned array because, on the driver, it gets mutated in place
+    statuses.synchronized {
+      return MapOutputTracker.convertMapStatuses(shuffleId, reduceId, statuses)
+    }
+  }
+
+  /**
+   * Return statistics about all of the outputs for a given shuffle.
+   */
+  def getStatistics(dep: ShuffleDependency[_, _, _]): MapOutputStatistics = {
+    val statuses = getStatuses(dep.shuffleId)
+    // Synchronize on the returned array because, on the driver, it gets mutated in place
+    statuses.synchronized {
+      val totalSizes = new Array[Long](dep.partitioner.numPartitions)
+      for (s <- statuses) {
+        for (i <- 0 until totalSizes.length) {
+          totalSizes(i) += s.getSizeForBlock(i)
+        }
+      }
+      new MapOutputStatistics(dep.shuffleId, totalSizes)
+    }
+  }
+
+  /**
+   * Get or fetch the array of MapStatuses for a given shuffle ID. NOTE: clients MUST synchronize
+   * on this array when reading it, because on the driver, we may be changing it in place.
+   *
+   * (It would be nice to remove this restriction in the future.)
+   */
+  private def getStatuses(shuffleId: Int): Array[MapStatus] = {
     val statuses = mapStatuses.get(shuffleId).orNull
     if (statuses == null) {
       logInfo("Don't have map outputs for shuffle " + shuffleId + ", fetching them")
+      val startTime = System.currentTimeMillis
       var fetchedStatuses: Array[MapStatus] = null
       fetching.synchronized {
         // Someone else is fetching it; wait for them to be done
@@ -160,7 +191,7 @@ private[spark] abstract class MapOutputTracker(conf: SparkConf) extends Logging
       }

       if (fetchedStatuses == null) {
-        // We won the race to fetch the output locs; do so
+        // We won the race to fetch the statuses; do so
         logInfo("Doing the fetch; tracker endpoint = " + trackerEndpoint)
         // This try-finally prevents hangs due to timeouts:
         try {
@@ -175,22 +206,18 @@ private[spark] abstract class MapOutputTracker(conf: SparkConf) extends Logging
           }
         }
       }
-      logDebug(s"Fetching map output location for shuffle $shuffleId, reduce $reduceId took " +
+      logDebug(s"Fetching map output statuses for shuffle $shuffleId took " +
         s"${System.currentTimeMillis - startTime} ms")

       if (fetchedStatuses != null) {
-        fetchedStatuses.synchronized {
-          return MapOutputTracker.convertMapStatuses(shuffleId, reduceId, fetchedStatuses)
-        }
+        return fetchedStatuses
       } else {
         logError("Missing all output locations for shuffle " + shuffleId)
         throw new MetadataFetchFailedException(
-          shuffleId, reduceId, "Missing all output locations for shuffle " + shuffleId)
+          shuffleId, -1, "Missing all output locations for shuffle " + shuffleId)
       }
     } else {
-      statuses.synchronized {
-        return MapOutputTracker.convertMapStatuses(shuffleId, reduceId, statuses)
-      }
+      return statuses
     }
   }

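getStatistics collapses every map task's sizes into a single per-partition array, which is exactly the input an adaptive planner needs. A hedged sketch of the kind of decision this enables (AdaptiveReducerSketch, pickNumReducers, and targetBytes are illustrative assumptions, not Spark API; it assumes the org.apache.spark package for access to the private[spark] class):

package org.apache.spark

object AdaptiveReducerSketch {
  // Hypothetical: choose a reducer count so that each reducer reads
  // roughly targetBytes of map output.
  def pickNumReducers(stats: MapOutputStatistics, targetBytes: Long = 64L * 1024 * 1024): Int = {
    val totalBytes = stats.bytesByPartitionId.sum
    math.max(1, math.ceil(totalBytes.toDouble / targetBytes).toInt)
  }
}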
core/src/main/scala/org/apache/spark/SparkContext.scala (17 changes: 17 additions & 0 deletions)
@@ -1984,6 +1984,23 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
     new SimpleFutureAction(waiter, resultFunc)
   }

+  /**
+   * Submit a map stage for execution. This is currently an internal API only, but might be
+   * promoted to DeveloperApi in the future.
+   */
+  private[spark] def submitMapStage[K, V, C](dependency: ShuffleDependency[K, V, C])
+      : SimpleFutureAction[MapOutputStatistics] = {
+    assertNotStopped()
+    val callSite = getCallSite()
+    var result: MapOutputStatistics = null
+    val waiter = dagScheduler.submitMapStage(
+      dependency,
+      (r: MapOutputStatistics) => { result = r },
+      callSite,
+      localProperties.get)
+    new SimpleFutureAction[MapOutputStatistics](waiter, result)
+  }
+
   /**
    * Cancel active jobs for the specified group. See [[org.apache.spark.SparkContext.setJobGroup]]
    * for more information.
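Since submitMapStage is private[spark], only Spark-internal code can call it. A hedged sketch of such a caller (the object and method names are assumptions; the returned SimpleFutureAction is a scala.concurrent.Future, so onComplete applies):

package org.apache.spark

import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.{Failure, Success}

// Hypothetical internal helper: run only the map side of a shuffle and report
// its output statistics once the stage completes.
object SubmitMapStageSketch {
  def runMapStage[K, V, C](sc: SparkContext, dep: ShuffleDependency[K, V, C]): Unit = {
    val future = sc.submitMapStage(dep) // SimpleFutureAction[MapOutputStatistics]
    future.onComplete {
      case Success(stats) => println(s"map stage wrote ${stats.bytesByPartitionId.sum} bytes")
      case Failure(e) => println(s"map stage failed: $e")
    }
  }
}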
core/src/main/scala/org/apache/spark/package.scala (2 changes: 1 addition & 1 deletion)
@@ -43,5 +43,5 @@ package org.apache

 package object spark {
   // For package docs only
-  val SPARK_VERSION = "1.5.0-SNAPSHOT"
+  val SPARK_VERSION = "1.6.0-SNAPSHOT"
 }
core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala (44 changes: 38 additions & 6 deletions)
@@ -1018,6 +1018,11 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
   /**
    * Output the RDD to any Hadoop-supported file system, using a Hadoop `OutputFormat` class
    * supporting the key and value types K and V in this RDD.
+   *
+   * Note that we should make sure our tasks are idempotent when speculation is enabled, i.e. do
+   * not use an output committer that writes data directly.
+   * There is an example in https://issues.apache.org/jira/browse/SPARK-10063 showing the bad
+   * result of using a direct output committer with speculation enabled.
    */
   def saveAsHadoopFile(
       path: String,
@@ -1030,10 +1035,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
     val hadoopConf = conf
     hadoopConf.setOutputKeyClass(keyClass)
     hadoopConf.setOutputValueClass(valueClass)
-    // Doesn't work in Scala 2.9 due to what may be a generics bug
-    // TODO: Should we uncomment this for Scala 2.10?
-    // conf.setOutputFormat(outputFormatClass)
-    hadoopConf.set("mapred.output.format.class", outputFormatClass.getName)
+    conf.setOutputFormat(outputFormatClass)
     for (c <- codec) {
       hadoopConf.setCompressMapOutput(true)
       hadoopConf.set("mapred.output.compress", "true")
@@ -1047,6 +1049,19 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
       hadoopConf.setOutputCommitter(classOf[FileOutputCommitter])
     }

+    // When speculation is on and the output committer class name contains "Direct", we should
+    // warn users that they may lose data if they are using a direct output committer.
+    val speculationEnabled = self.conf.getBoolean("spark.speculation", false)
+    val outputCommitterClass = hadoopConf.get("mapred.output.committer.class", "")
+    if (speculationEnabled && outputCommitterClass.contains("Direct")) {
+      val warningMessage =
+        s"$outputCommitterClass may be an output committer that writes data directly to " +
+          "the final location. Because speculation is enabled, this output committer may " +
+          "cause data loss (see the case in SPARK-10063). If possible, please use an output " +
+          "committer that does not have this behavior (e.g. FileOutputCommitter)."
+      logWarning(warningMessage)
+    }
+
     FileOutputFormat.setOutputPath(hadoopConf,
       SparkHadoopWriter.createPathFromString(path, hadoopConf))
     saveAsHadoopDataset(hadoopConf)
@@ -1057,6 +1072,11 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
    * Configuration object for that storage system. The Conf should set an OutputFormat and any
    * output paths required (e.g. a table name to write to) in the same way as it would be
    * configured for a Hadoop MapReduce job.
+   *
+   * Note that we should make sure our tasks are idempotent when speculation is enabled, i.e. do
+   * not use an output committer that writes data directly.
+   * There is an example in https://issues.apache.org/jira/browse/SPARK-10063 showing the bad
+   * result of using a direct output committer with speculation enabled.
    */
   def saveAsNewAPIHadoopDataset(conf: Configuration): Unit = self.withScope {
     // Rename this as hadoopConf internally to avoid shadowing (see SPARK-2038).
@@ -1115,6 +1135,20 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
     val jobAttemptId = newTaskAttemptID(jobtrackerID, stageId, isMap = true, 0, 0)
     val jobTaskContext = newTaskAttemptContext(wrappedConf.value, jobAttemptId)
     val jobCommitter = jobFormat.getOutputCommitter(jobTaskContext)
+
+    // When speculation is on and the output committer class name contains "Direct", we should
+    // warn users that they may lose data if they are using a direct output committer.
+    val speculationEnabled = self.conf.getBoolean("spark.speculation", false)
+    val outputCommitterClass = jobCommitter.getClass.getSimpleName
+    if (speculationEnabled && outputCommitterClass.contains("Direct")) {
+      val warningMessage =
+        s"$outputCommitterClass may be an output committer that writes data directly to " +
+          "the final location. Because speculation is enabled, this output committer may " +
+          "cause data loss (see the case in SPARK-10063). If possible, please use an output " +
+          "committer that does not have this behavior (e.g. FileOutputCommitter)."
+      logWarning(warningMessage)
+    }
+
     jobCommitter.setupJob(jobTaskContext)
     self.context.runJob(self, writeShard)
     jobCommitter.commitJob(jobTaskContext)
@@ -1129,7 +1163,6 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
   def saveAsHadoopDataset(conf: JobConf): Unit = self.withScope {
     // Rename this as hadoopConf internally to avoid shadowing (see SPARK-2038).
     val hadoopConf = conf
-    val wrappedConf = new SerializableConfiguration(hadoopConf)
     val outputFormatInstance = hadoopConf.getOutputFormat
     val keyClass = hadoopConf.getOutputKeyClass
     val valueClass = hadoopConf.getOutputValueClass
@@ -1157,7 +1190,6 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
     writer.preSetup()

     val writeToFile = (context: TaskContext, iter: Iterator[(K, V)]) => {
-      val config = wrappedConf.value
       // Hadoop wants a 32-bit task attempt ID, so if ours is bigger than Int.MaxValue, roll it
       // around by taking a mod. We expect that no task will be attempted 2 billion times.
       val taskAttemptId = (context.taskAttemptId % Int.MaxValue).toInt
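The new check only logs a warning; the job still runs. A user-facing sketch of the safe combination the message recommends (spark.speculation, JobConf.setOutputCommitter, and FileOutputCommitter are real APIs; the app name, master, and output path are illustrative):

import org.apache.hadoop.mapred.{FileOutputCommitter, JobConf, TextOutputFormat}
import org.apache.spark.{SparkConf, SparkContext}

// Sketch: keep speculation on, but commit through a committer that stages output
// in a temporary location and promotes it atomically, so duplicate speculative
// attempts cannot clobber the final data.
object SafeSpeculativeWrite {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf()
      .setAppName("safe-write")
      .setMaster("local[2]")
      .set("spark.speculation", "true"))
    val jobConf = new JobConf(sc.hadoopConfiguration)
    jobConf.setOutputCommitter(classOf[FileOutputCommitter])
    sc.parallelize(Seq(("a", 1), ("b", 2)))
      .saveAsHadoopFile("/tmp/safe-write-output",
        classOf[String], classOf[Int],
        classOf[TextOutputFormat[String, Int]], jobConf)
    sc.stop()
  }
}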
core/src/main/scala/org/apache/spark/scheduler/ActiveJob.scala (34 changes: 29 additions & 5 deletions)
@@ -23,18 +23,42 @@ import org.apache.spark.TaskContext
 import org.apache.spark.util.CallSite

 /**
- * Tracks information about an active job in the DAGScheduler.
+ * A running job in the DAGScheduler. Jobs can be of two types: a result job, which computes a
+ * ResultStage to execute an action, or a map-stage job, which computes the map outputs for a
+ * ShuffleMapStage before any downstream stages are submitted. The latter is used for adaptive
+ * query planning, to look at map output statistics before submitting later stages. We distinguish
+ * between these two types of jobs using the finalStage field of this class.
+ *
+ * Jobs are only tracked for "leaf" stages that clients directly submitted, through DAGScheduler's
+ * submitJob or submitMapStage methods. However, either type of job may cause the execution of
+ * other earlier stages (for RDDs in the DAG it depends on), and multiple jobs may share some of
+ * these previous stages. These dependencies are managed inside DAGScheduler.
+ *
+ * @param jobId A unique ID for this job.
+ * @param finalStage The stage that this job computes (either a ResultStage for an action or a
+ *                   ShuffleMapStage for submitMapStage).
+ * @param callSite Where this job was initiated in the user's program (shown on UI).
+ * @param listener A listener to notify if tasks in this job finish or the job fails.
+ * @param properties Scheduling properties attached to the job, such as fair scheduler pool name.
  */
 private[spark] class ActiveJob(
     val jobId: Int,
-    val finalStage: ResultStage,
-    val func: (TaskContext, Iterator[_]) => _,
-    val partitions: Array[Int],
+    val finalStage: Stage,
     val callSite: CallSite,
     val listener: JobListener,
     val properties: Properties) {

-  val numPartitions = partitions.length
+  /**
+   * Number of partitions we need to compute for this job. Note that result stages may not need
+   * to compute all partitions in their target RDD, for actions like first() and lookup().
+   */
+  val numPartitions = finalStage match {
+    case r: ResultStage => r.partitions.length
+    case m: ShuffleMapStage => m.rdd.partitions.length
+  }
+
+  /** Which partitions of the stage have finished */
   val finished = Array.fill[Boolean](numPartitions)(false)
+
   var numFinished = 0
 }
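
The numPartitions distinction is observable from ordinary jobs: first() schedules a result job over as few partitions as possible, while count() covers them all, which is exactly what the finalStage match above accounts for. A small runnable illustration (app name and master are illustrative):

import org.apache.spark.{SparkConf, SparkContext}

object PartitionCountDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setAppName("partition-count-demo").setMaster("local[2]"))
    val rdd = sc.parallelize(1 to 1000, numSlices = 8)
    // first() runs a result job over as few partitions as possible (often one),
    // so its ActiveJob.numPartitions can be smaller than rdd.partitions.length.
    val head = rdd.first()
    // count() must touch every partition, so its job covers all eight.
    val total = rdd.count()
    println(s"first = $head, count = $total, partitions = ${rdd.partitions.length}")
    sc.stop()
  }
}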