
Commit 2e58dbd

Merge remote-tracking branch 'upstream/master' into dataTypeAndSchema

Conflicts:
    sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/dataTypes.scala
    sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala

2 parents: b8b7db4 + c2048a5

194 files changed: +1728 additions, -398 deletions

README.md

Lines changed: 9 additions & 2 deletions
@@ -1,6 +1,13 @@
 # Apache Spark
 
-Lightning-Fast Cluster Computing - <http://spark.apache.org/>
+Spark is a fast and general cluster computing system for Big Data. It provides
+high-level APIs in Scala, Java, and Python, and an optimized engine that
+supports general computation graphs for data analysis. It also supports a
+rich set of higher-level tools including Spark SQL for SQL and structured
+data processing, MLLib for machine learning, GraphX for graph processing,
+and Spark Streaming.
+
+<http://spark.apache.org/>
 
 
 ## Online Documentation
@@ -81,7 +88,7 @@ versions without YARN, use:
     $ sbt/sbt -Dhadoop.version=2.0.0-mr1-cdh4.2.0 assembly
 
 For Apache Hadoop 2.2.X, 2.1.X, 2.0.X, 0.23.x, Cloudera CDH MRv2, and other Hadoop versions
-with YARN, also set `SPARK_YARN=true`:
+with YARN, also set `-Pyarn`:
 
     # Apache Hadoop 2.0.5-alpha
     $ sbt/sbt -Dhadoop.version=2.0.5-alpha -Pyarn assembly

core/pom.xml

Lines changed: 4 additions & 0 deletions
@@ -114,6 +114,10 @@
       <groupId>org.xerial.snappy</groupId>
       <artifactId>snappy-java</artifactId>
     </dependency>
+    <dependency>
+      <groupId>net.jpountz.lz4</groupId>
+      <artifactId>lz4</artifactId>
+    </dependency>
     <dependency>
       <groupId>com.twitter</groupId>
       <artifactId>chill_${scala.binary.version}</artifactId>
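The new `net.jpountz.lz4:lz4` artifact backs the LZ4 codec added further down in this commit (see CompressionCodec.scala); no `<version>` appears here, so it is managed in the parent pom. For comparison, a rough sbt equivalent might look like the line below; the version number is an assumption for illustration, not taken from this commit.

```scala
// build.sbt sketch; "1.2.0" is illustrative and not pinned by this diff.
libraryDependencies += "net.jpountz.lz4" % "lz4" % "1.2.0"
```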

core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 10 additions & 1 deletion
@@ -1531,7 +1531,16 @@ object SparkContext extends Logging {
           throw new SparkException("YARN mode not available ?", e)
         }
       }
-      val backend = new CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem)
+      val backend = try {
+        val clazz =
+          Class.forName("org.apache.spark.scheduler.cluster.YarnClusterSchedulerBackend")
+        val cons = clazz.getConstructor(classOf[TaskSchedulerImpl], classOf[SparkContext])
+        cons.newInstance(scheduler, sc).asInstanceOf[CoarseGrainedSchedulerBackend]
+      } catch {
+        case e: Exception => {
+          throw new SparkException("YARN mode not available ?", e)
+        }
+      }
       scheduler.initialize(backend)
       scheduler
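The change above swaps a hard-coded CoarseGrainedSchedulerBackend for one loaded reflectively, so core can hand off to the YARN-specific backend without a compile-time dependency on the YARN module. A minimal, self-contained sketch of that pattern; the `PluggableBackend` trait and the class-name argument are hypothetical and only illustrate the technique:

```scala
// Sketch of loading an implementation by name at runtime, mirroring the reflection
// pattern in the diff above. Nothing here depends on Spark.
trait PluggableBackend {
  def start(): Unit
}

object ReflectiveLoadExample {
  def load(className: String): PluggableBackend = {
    try {
      val clazz = Class.forName(className)              // resolved only at runtime
      val cons = clazz.getConstructor()                 // look up the expected constructor
      cons.newInstance().asInstanceOf[PluggableBackend] // instantiate and cast
    } catch {
      case e: Exception =>
        throw new RuntimeException(s"Backend class $className not available", e)
    }
  }
}
```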

core/src/main/scala/org/apache/spark/TestUtils.scala

Lines changed: 2 additions & 2 deletions
@@ -92,8 +92,8 @@ private[spark] object TestUtils {
   def createCompiledClass(className: String, destDir: File, value: String = ""): File = {
     val compiler = ToolProvider.getSystemJavaCompiler
     val sourceFile = new JavaSourceFromString(className,
-      "public class " + className + " { @Override public String toString() { " +
-        "return \"" + value + "\";}}")
+      "public class " + className + " implements java.io.Serializable {" +
+        " @Override public String toString() { return \"" + value + "\"; }}")
 
     // Calling this outputs a class file in pwd. It's easier to just rename the file than
     // build a custom FileManager that controls the output location.
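The generated test class now implements `java.io.Serializable`, presumably so instances can be serialized, for example when captured in task closures during tests. A small sketch of the source string the helper now produces; the `Hello`/`hi` values are made up for illustration:

```scala
// Mirrors the string concatenation in the diff above; prints the Java source that
// createCompiledClass would compile for className = "Hello", value = "hi".
object GeneratedSourceSketch {
  def source(className: String, value: String): String =
    "public class " + className + " implements java.io.Serializable {" +
      " @Override public String toString() { return \"" + value + "\"; }}"

  def main(args: Array[String]): Unit =
    println(source("Hello", "hi"))
}
```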

core/src/main/scala/org/apache/spark/deploy/master/Master.scala

Lines changed: 5 additions & 0 deletions
@@ -57,6 +57,7 @@ private[spark] class Master(
   def createDateFormat = new SimpleDateFormat("yyyyMMddHHmmss") // For application IDs
   val WORKER_TIMEOUT = conf.getLong("spark.worker.timeout", 60) * 1000
   val RETAINED_APPLICATIONS = conf.getInt("spark.deploy.retainedApplications", 200)
+  val RETAINED_DRIVERS = conf.getInt("spark.deploy.retainedDrivers", 200)
   val REAPER_ITERATIONS = conf.getInt("spark.dead.worker.persistence", 15)
   val RECOVERY_DIR = conf.get("spark.deploy.recoveryDirectory", "")
   val RECOVERY_MODE = conf.get("spark.deploy.recoveryMode", "NONE")
@@ -741,6 +742,10 @@ private[spark] class Master(
       case Some(driver) =>
         logInfo(s"Removing driver: $driverId")
         drivers -= driver
+        if (completedDrivers.size >= RETAINED_DRIVERS) {
+          val toRemove = math.max(RETAINED_DRIVERS / 10, 1)
+          completedDrivers.trimStart(toRemove)
+        }
         completedDrivers += driver
         persistenceEngine.removeDriver(driver)
         driver.state = finalState
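The new `spark.deploy.retainedDrivers` setting bounds the Master's list of completed drivers: once the list reaches the cap, the oldest ~10% (at least one entry) is dropped before appending, so memory use stays bounded on long-running masters. A self-contained sketch of just that retention step; the names and the use of String driver IDs are illustrative:

```scala
import scala.collection.mutable.ArrayBuffer

object RetainedDriversSketch {
  val retainedDrivers = 200 // default of spark.deploy.retainedDrivers in the diff above

  // Keep at most `retainedDrivers` completed entries, trimming the oldest ones first.
  def recordCompleted(completed: ArrayBuffer[String], driverId: String): Unit = {
    if (completed.size >= retainedDrivers) {
      val toRemove = math.max(retainedDrivers / 10, 1)
      completed.trimStart(toRemove)
    }
    completed += driverId
  }
}
```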

core/src/main/scala/org/apache/spark/io/CompressionCodec.scala

Lines changed: 22 additions & 0 deletions
@@ -20,6 +20,7 @@ package org.apache.spark.io
 import java.io.{InputStream, OutputStream}
 
 import com.ning.compress.lzf.{LZFInputStream, LZFOutputStream}
+import net.jpountz.lz4.{LZ4BlockInputStream, LZ4BlockOutputStream}
 import org.xerial.snappy.{SnappyInputStream, SnappyOutputStream}
 
 import org.apache.spark.SparkConf
@@ -59,6 +60,27 @@ private[spark] object CompressionCodec {
 }
 
 
+/**
+ * :: DeveloperApi ::
+ * LZ4 implementation of [[org.apache.spark.io.CompressionCodec]].
+ * Block size can be configured by `spark.io.compression.lz4.block.size`.
+ *
+ * Note: The wire protocol for this codec is not guaranteed to be compatible across versions
+ * of Spark. This is intended for use as an internal compression utility within a single Spark
+ * application.
+ */
+@DeveloperApi
+class LZ4CompressionCodec(conf: SparkConf) extends CompressionCodec {
+
+  override def compressedOutputStream(s: OutputStream): OutputStream = {
+    val blockSize = conf.getInt("spark.io.compression.lz4.block.size", 32768)
+    new LZ4BlockOutputStream(s, blockSize)
+  }
+
+  override def compressedInputStream(s: InputStream): InputStream = new LZ4BlockInputStream(s)
+}
+
+
 /**
  * :: DeveloperApi ::
  * LZF implementation of [[org.apache.spark.io.CompressionCodec]].
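To use the new codec from an application, point `spark.io.compression.codec` at it and, optionally, tune the block size documented above (32768 bytes by default). A minimal configuration sketch; it uses the fully qualified class name rather than assuming a short `lz4` alias exists in this version:

```scala
import org.apache.spark.SparkConf

object Lz4CodecConfSketch {
  // Keys come from the codec added above; 32768 bytes is its documented default block size.
  val conf: SparkConf = new SparkConf()
    .set("spark.io.compression.codec", "org.apache.spark.io.LZ4CompressionCodec")
    .set("spark.io.compression.lz4.block.size", "65536") // optional: raise from 32 KB to 64 KB
}
```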

core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala

Lines changed: 9 additions & 9 deletions
@@ -353,9 +353,9 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
    * Group the values for each key in the RDD into a single sequence. Allows controlling the
    * partitioning of the resulting key-value pair RDD by passing a Partitioner.
    *
-   * Note: If you are grouping in order to perform an aggregation (such as a sum or average) over
-   * each key, using [[PairRDDFunctions.reduceByKey]] or [[PairRDDFunctions.combineByKey]]
-   * will provide much better performance.
+   * Note: This operation may be very expensive. If you are grouping in order to perform an
+   * aggregation (such as a sum or average) over each key, using [[PairRDDFunctions.aggregateByKey]]
+   * or [[PairRDDFunctions.reduceByKey]] will provide much better performance.
    */
   def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])] = {
     // groupByKey shouldn't use map side combine because map side combine does not
@@ -373,9 +373,9 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
    * Group the values for each key in the RDD into a single sequence. Hash-partitions the
    * resulting RDD with into `numPartitions` partitions.
    *
-   * Note: If you are grouping in order to perform an aggregation (such as a sum or average) over
-   * each key, using [[PairRDDFunctions.reduceByKey]] or [[PairRDDFunctions.combineByKey]]
-   * will provide much better performance.
+   * Note: This operation may be very expensive. If you are grouping in order to perform an
+   * aggregation (such as a sum or average) over each key, using [[PairRDDFunctions.aggregateByKey]]
+   * or [[PairRDDFunctions.reduceByKey]] will provide much better performance.
    */
   def groupByKey(numPartitions: Int): RDD[(K, Iterable[V])] = {
     groupByKey(new HashPartitioner(numPartitions))
@@ -462,9 +462,9 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
    * Group the values for each key in the RDD into a single sequence. Hash-partitions the
    * resulting RDD with the existing partitioner/parallelism level.
    *
-   * Note: If you are grouping in order to perform an aggregation (such as a sum or average) over
-   * each key, using [[PairRDDFunctions.reduceByKey]] or [[PairRDDFunctions.combineByKey]]
-   * will provide much better performance,
+   * Note: This operation may be very expensive. If you are grouping in order to perform an
+   * aggregation (such as a sum or average) over each key, using [[PairRDDFunctions.aggregateByKey]]
+   * or [[PairRDDFunctions.reduceByKey]] will provide much better performance.
    */
   def groupByKey(): RDD[(K, Iterable[V])] = {
     groupByKey(defaultPartitioner(self))
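The rewritten note steers users toward combiner-based operators. As a concrete illustration of the recommendation (a sketch with a local SparkContext and made-up data), a per-key average via aggregateByKey combines values map-side instead of shuffling every value for a key the way groupByKey does:

```scala
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._ // brings in PairRDDFunctions in this Spark version

object PerKeyAverageSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext("local", "per-key-average")
    val pairs = sc.parallelize(Seq(("a", 1.0), ("a", 3.0), ("b", 2.0)))

    // Discouraged for aggregations: every value for a key crosses the network.
    val viaGroup = pairs.groupByKey().mapValues(vs => vs.sum / vs.size)

    // Recommended: carry a (sum, count) accumulator per key instead.
    val viaAggregate = pairs
      .aggregateByKey((0.0, 0L))(
        (acc, v) => (acc._1 + v, acc._2 + 1),
        (a, b) => (a._1 + b._1, a._2 + b._2))
      .mapValues { case (sum, count) => sum / count }

    viaAggregate.collect().foreach(println)
    sc.stop()
  }
}
```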

core/src/main/scala/org/apache/spark/rdd/RDD.scala

Lines changed: 12 additions & 0 deletions
@@ -509,20 +509,32 @@ abstract class RDD[T: ClassTag](
   /**
    * Return an RDD of grouped items. Each group consists of a key and a sequence of elements
    * mapping to that key.
+   *
+   * Note: This operation may be very expensive. If you are grouping in order to perform an
+   * aggregation (such as a sum or average) over each key, using [[PairRDDFunctions.aggregateByKey]]
+   * or [[PairRDDFunctions.reduceByKey]] will provide much better performance.
    */
   def groupBy[K](f: T => K)(implicit kt: ClassTag[K]): RDD[(K, Iterable[T])] =
     groupBy[K](f, defaultPartitioner(this))
 
   /**
    * Return an RDD of grouped elements. Each group consists of a key and a sequence of elements
    * mapping to that key.
+   *
+   * Note: This operation may be very expensive. If you are grouping in order to perform an
+   * aggregation (such as a sum or average) over each key, using [[PairRDDFunctions.aggregateByKey]]
+   * or [[PairRDDFunctions.reduceByKey]] will provide much better performance.
    */
   def groupBy[K](f: T => K, numPartitions: Int)(implicit kt: ClassTag[K]): RDD[(K, Iterable[T])] =
     groupBy(f, new HashPartitioner(numPartitions))
 
   /**
    * Return an RDD of grouped items. Each group consists of a key and a sequence of elements
    * mapping to that key.
+   *
+   * Note: This operation may be very expensive. If you are grouping in order to perform an
+   * aggregation (such as a sum or average) over each key, using [[PairRDDFunctions.aggregateByKey]]
+   * or [[PairRDDFunctions.reduceByKey]] will provide much better performance.
    */
   def groupBy[K](f: T => K, p: Partitioner)(implicit kt: ClassTag[K], ord: Ordering[K] = null)
       : RDD[(K, Iterable[T])] = {
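The same caution now appears on the un-keyed groupBy overloads. For the common case of counting group sizes, the reduceByKey form avoids materializing each group (a sketch with hypothetical data):

```scala
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._ // PairRDDFunctions conversions in this Spark version

object GroupSizeSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext("local", "group-sizes")
    val words = sc.parallelize(Seq("spark", "scala", "spark", "sql"))

    // Expensive: builds the full Iterable for every group just to take its size.
    val sizesViaGroupBy = words.groupBy(identity).mapValues(_.size)

    // Cheaper: combine counts map-side with reduceByKey.
    val sizesViaReduce = words.map(w => (w, 1)).reduceByKey(_ + _)

    sizesViaReduce.collect().foreach(println)
    sc.stop()
  }
}
```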

core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala

Lines changed: 1 addition & 0 deletions
@@ -30,4 +30,5 @@ private[spark] trait SchedulerBackend {
 
   def killTask(taskId: Long, executorId: String, interruptThread: Boolean): Unit =
     throw new UnsupportedOperationException
+  def isReady(): Boolean = true
 }

core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala

Lines changed: 15 additions & 0 deletions
@@ -145,6 +145,10 @@ private[spark] class TaskSchedulerImpl(
     }
   }
 
+  override def postStartHook() {
+    waitBackendReady()
+  }
+
   override def submitTasks(taskSet: TaskSet) {
     val tasks = taskSet.tasks
     logInfo("Adding task set " + taskSet.id + " with " + tasks.length + " tasks")
@@ -437,6 +441,17 @@ private[spark] class TaskSchedulerImpl(
 
   // By default, rack is unknown
   def getRackForHost(value: String): Option[String] = None
+
+  private def waitBackendReady(): Unit = {
+    if (backend.isReady) {
+      return
+    }
+    while (!backend.isReady) {
+      synchronized {
+        this.wait(100)
+      }
+    }
+  }
 }
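Together with the new `SchedulerBackend.isReady()` default above, `postStartHook` now blocks the scheduler until its backend reports readiness, polling roughly every 100 ms. A self-contained sketch of the intended interplay; the `ReadyAfterRegistration` backend and its registration threshold are hypothetical, and the real YARN backend in this merge applies its own readiness criteria:

```scala
// A toy backend that becomes ready once enough "executors" have registered, plus a
// polling wait that mirrors waitBackendReady() in the diff above.
trait BackendLike {
  def isReady(): Boolean = true // matches the new SchedulerBackend default
}

class ReadyAfterRegistration(expected: Int) extends BackendLike {
  @volatile private var registered = 0
  def executorRegistered(): Unit = { registered += 1 } // single writer thread in this sketch
  override def isReady(): Boolean = registered >= expected
}

object WaitBackendReadySketch {
  def main(args: Array[String]): Unit = {
    val backend = new ReadyAfterRegistration(expected = 2)
    val registrar = new Thread(new Runnable {
      def run(): Unit = {
        Thread.sleep(200); backend.executorRegistered()
        Thread.sleep(200); backend.executorRegistered()
      }
    })
    registrar.start()
    while (!backend.isReady()) Thread.sleep(100) // poll, as postStartHook/waitBackendReady does
    println("backend ready; the scheduler can start offering resources")
    registrar.join()
  }
}
```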
