Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
d6a3cbe
[MINOR][DOC] Add "completedStages" metircs for namespace=appStatus
Mar 17, 2019
4132c98
[MINOR][CORE] spark.diskStore.subDirectories <= 0 should throw Exception
lcqzte10192193 Mar 17, 2019
c324e1d
[SPARK-27122][CORE] Jetty classes must not be return via getters in o…
ajithme Mar 17, 2019
1bc481b
[SPARK-27070] Improve performance of DefaultPartitionCoalescer
fitermay Mar 17, 2019
fc88d3d
[SPARK-27164][CORE] RDD.countApprox on empty RDDs schedules jobs whic…
ajithme Mar 17, 2019
f9180f8
[SPARK-26979][PYTHON] Add missing string column name support for some…
amello-palantir Mar 17, 2019
dbcb479
[SPARK-27161][SQL] improve the document of SQL keywords
cloud-fan Mar 18, 2019
e348f14
[SPARK-26811][SQL] Add capabilities to v2.Table
rdblue Mar 18, 2019
7043aee
[SPARK-27112][CORE] : Create a resource ordering between threads to r…
Mar 18, 2019
5564fe5
[SPARK-27178][K8S] add nss to the spark/k8s Dockerfile
shaneknapp Mar 18, 2019
26e9849
[SPARK-27195][SQL][TEST] Add AvroReadSchemaSuite
dongjoon-hyun Mar 19, 2019
28d35c8
[SPARK-27162][SQL] Add new method asCaseSensitiveMap in CaseInsensiti…
gengliangwang Mar 19, 2019
a8af23d
[SPARK-27193][SQL] CodeFormatter should format multiple comment lines…
Ngone51 Mar 19, 2019
d5c08fc
[SPARK-26555][SQL] make ScalaReflection subtype checking thread safe
Mar 19, 2019
901c740
[SPARK-27161][SQL][FOLLOWUP] Drops non-keywords from docs/sql-keyword…
maropu Mar 19, 2019
e402de5
[SPARK-26176][SQL] Verify column names for CTAS with `STORED AS`
sujith71955 Mar 19, 2019
99c427b
[SPARK-27168][SQL][TEST] Add docker integration test for MsSql server
zhulipeng Mar 19, 2019
6783831
[SPARK-27179][BUILD] Exclude javax.ws.rs:jsr311-api from hadoop-client
wangyum Mar 19, 2019
8b0aa59
[SPARK-26288][CORE] add initRegisteredExecutorsDB
weixiuli Mar 19, 2019
c99463d
[SPARK-26979][PYTHON][FOLLOW-UP] Make binary math/string functions ta…
HyukjinKwon Mar 19, 2019
4d52477
[SPARK-27197][SQL][TEST] Add ReadNestedSchemaTest for file-based data…
dongjoon-hyun Mar 20, 2019
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,11 @@ public ExternalShuffleBlockHandler(TransportConf conf, File registeredExecutorFi
new ExternalShuffleBlockResolver(conf, registeredExecutorFile));
}

@VisibleForTesting
public ExternalShuffleBlockResolver getBlockResolver() {
return blockManager;
}

/** Enables mocking out the StreamManager and BlockManager. */
@VisibleForTesting
public ExternalShuffleBlockHandler(
Expand Down
40 changes: 40 additions & 0 deletions core/benchmarks/CoalescedRDDBenchmark-results.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
================================================================================================
Coalesced RDD , large scale
================================================================================================

Java HotSpot(TM) 64-Bit Server VM 1.8.0_201-b09 on Windows 10 10.0
Intel64 Family 6 Model 63 Stepping 2, GenuineIntel
Coalesced RDD: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------------------------------
Coalesce Num Partitions: 100 Num Hosts: 1 346 364 24 0.3 3458.9 1.0X
Coalesce Num Partitions: 100 Num Hosts: 5 258 264 6 0.4 2579.0 1.3X
Coalesce Num Partitions: 100 Num Hosts: 10 242 249 7 0.4 2415.2 1.4X
Coalesce Num Partitions: 100 Num Hosts: 20 237 242 7 0.4 2371.7 1.5X
Coalesce Num Partitions: 100 Num Hosts: 40 230 231 1 0.4 2299.8 1.5X
Coalesce Num Partitions: 100 Num Hosts: 80 222 233 14 0.4 2223.0 1.6X
Coalesce Num Partitions: 500 Num Hosts: 1 659 665 5 0.2 6590.4 0.5X
Coalesce Num Partitions: 500 Num Hosts: 5 340 381 47 0.3 3395.2 1.0X
Coalesce Num Partitions: 500 Num Hosts: 10 279 307 47 0.4 2788.3 1.2X
Coalesce Num Partitions: 500 Num Hosts: 20 259 261 2 0.4 2591.9 1.3X
Coalesce Num Partitions: 500 Num Hosts: 40 241 250 15 0.4 2406.5 1.4X
Coalesce Num Partitions: 500 Num Hosts: 80 235 237 3 0.4 2349.9 1.5X
Coalesce Num Partitions: 1000 Num Hosts: 1 1050 1053 4 0.1 10503.2 0.3X
Coalesce Num Partitions: 1000 Num Hosts: 5 405 407 2 0.2 4049.5 0.9X
Coalesce Num Partitions: 1000 Num Hosts: 10 320 322 2 0.3 3202.7 1.1X
Coalesce Num Partitions: 1000 Num Hosts: 20 276 277 0 0.4 2762.3 1.3X
Coalesce Num Partitions: 1000 Num Hosts: 40 257 260 5 0.4 2571.2 1.3X
Coalesce Num Partitions: 1000 Num Hosts: 80 245 252 13 0.4 2448.9 1.4X
Coalesce Num Partitions: 5000 Num Hosts: 1 3099 3145 55 0.0 30988.6 0.1X
Coalesce Num Partitions: 5000 Num Hosts: 5 1037 1050 20 0.1 10374.4 0.3X
Coalesce Num Partitions: 5000 Num Hosts: 10 626 633 8 0.2 6261.8 0.6X
Coalesce Num Partitions: 5000 Num Hosts: 20 426 431 5 0.2 4258.6 0.8X
Coalesce Num Partitions: 5000 Num Hosts: 40 328 341 22 0.3 3275.4 1.1X
Coalesce Num Partitions: 5000 Num Hosts: 80 272 275 4 0.4 2721.4 1.3X
Coalesce Num Partitions: 10000 Num Hosts: 1 5516 5526 9 0.0 55156.8 0.1X
Coalesce Num Partitions: 10000 Num Hosts: 5 1956 1992 48 0.1 19560.9 0.2X
Coalesce Num Partitions: 10000 Num Hosts: 10 1045 1057 18 0.1 10447.4 0.3X
Coalesce Num Partitions: 10000 Num Hosts: 20 637 658 24 0.2 6373.2 0.5X
Coalesce Num Partitions: 10000 Num Hosts: 40 431 448 15 0.2 4312.9 0.8X
Coalesce Num Partitions: 10000 Num Hosts: 80 326 328 2 0.3 3263.4 1.1X


7 changes: 7 additions & 0 deletions core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -372,6 +372,13 @@
<classifier>tests</classifier>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-network-shuffle_${scala.binary.version}</artifactId>
<version>${project.version}</version>
<classifier>tests</classifier>
<scope>test</scope>
</dependency>

<!--
This spark-tags test-dep is needed even though it isn't used in this module, otherwise testing-cmds that exclude
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.spark.deploy

import java.io.File
import java.util.concurrent.CountDownLatch

import scala.collection.JavaConverters._
Expand Down Expand Up @@ -49,6 +50,8 @@ class ExternalShuffleService(sparkConf: SparkConf, securityManager: SecurityMana
private val enabled = sparkConf.get(config.SHUFFLE_SERVICE_ENABLED)
private val port = sparkConf.get(config.SHUFFLE_SERVICE_PORT)

private val registeredExecutorsDB = "registeredExecutors.ldb"

private val transportConf =
SparkTransportConf.fromSparkConf(sparkConf, "shuffle", numUsableCores = 0)
private val blockHandler = newShuffleBlockHandler(transportConf)
Expand All @@ -58,9 +61,29 @@ class ExternalShuffleService(sparkConf: SparkConf, securityManager: SecurityMana

private val shuffleServiceSource = new ExternalShuffleServiceSource

protected def findRegisteredExecutorsDBFile(dbName: String): File = {
val localDirs = sparkConf.getOption("spark.local.dir").map(_.split(",")).getOrElse(Array())
if (localDirs.length >= 1) {
new File(localDirs.find(new File(_, dbName).exists()).getOrElse(localDirs(0)), dbName)
} else {
logWarning(s"'spark.local.dir' should be set first when we use db in " +
s"ExternalShuffleService. Note that this only affects standalone mode.")
null
}
}

/** Get blockhandler */
def getBlockHandler: ExternalShuffleBlockHandler = {
blockHandler
}

/** Create a new shuffle block handler. Factored out for subclasses to override. */
protected def newShuffleBlockHandler(conf: TransportConf): ExternalShuffleBlockHandler = {
new ExternalShuffleBlockHandler(conf, null)
if (sparkConf.get(config.SHUFFLE_SERVICE_DB_ENABLED) && enabled) {
new ExternalShuffleBlockHandler(conf, findRegisteredExecutorsDBFile(registeredExecutorsDB))
} else {
new ExternalShuffleBlockHandler(conf, null)
}
}

/** Starts the external shuffle service if the user has configured us to. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -466,6 +466,15 @@ private[deploy] class Worker(
}.foreach { dir =>
logInfo(s"Removing directory: ${dir.getPath}")
Utils.deleteRecursively(dir)

// Remove some registeredExecutors information of DB in external shuffle service when
// #spark.shuffle.service.db.enabled=true, the one which comes to mind is, what happens
// if an application is stopped while the external shuffle service is down?
// So then it'll leave an entry in the DB and the entry should be removed.
if (conf.get(config.SHUFFLE_SERVICE_DB_ENABLED) &&
conf.get(config.SHUFFLE_SERVICE_ENABLED)) {
shuffleService.applicationRemoved(dir.getName)
}
}
}(cleanupThreadExecutor)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,7 @@ package object config {
.doc("Number of subdirectories inside each path listed in spark.local.dir for " +
"hashing Block files into.")
.intConf
.checkValue(_ > 0, "The number of subdirectories must be positive.")
.createWithDefault(64)

private[spark] val BLOCK_FAILURES_BEFORE_LOCATION_REFRESH =
Expand Down Expand Up @@ -358,6 +359,13 @@ package object config {
private[spark] val SHUFFLE_SERVICE_ENABLED =
ConfigBuilder("spark.shuffle.service.enabled").booleanConf.createWithDefault(false)

private[spark] val SHUFFLE_SERVICE_DB_ENABLED =
ConfigBuilder("spark.shuffle.service.db.enabled")
.doc("Whether to use db in ExternalShuffleService. Note that this only affects " +
"standalone mode.")
.booleanConf
.createWithDefault(true)

private[spark] val SHUFFLE_SERVICE_PORT =
ConfigBuilder("spark.shuffle.service.port").intConf.createWithDefault(7337)

Expand Down
35 changes: 18 additions & 17 deletions core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ private[spark] case class CoalescedRDDPartition(
val parentPreferredLocations = rdd.context.getPreferredLocs(rdd, p.index).map(_.host)
preferredLocation.exists(parentPreferredLocations.contains)
}
if (parents.size == 0) 0.0 else (loc.toDouble / parents.size.toDouble)
if (parents.isEmpty) 0.0 else loc.toDouble / parents.size.toDouble
}
}

Expand Down Expand Up @@ -91,7 +91,7 @@ private[spark] class CoalescedRDD[T: ClassTag](
pc.coalesce(maxPartitions, prev).zipWithIndex.map {
case (pg, i) =>
val ids = pg.partitions.map(_.index).toArray
new CoalescedRDDPartition(i, prev, ids, pg.prefLoc)
CoalescedRDDPartition(i, prev, ids, pg.prefLoc)
}
}

Expand All @@ -116,7 +116,7 @@ private[spark] class CoalescedRDD[T: ClassTag](
/**
* Returns the preferred machine for the partition. If split is of type CoalescedRDDPartition,
* then the preferred machine will be one which most parent splits prefer too.
* @param partition
* @param partition the partition for which to retrieve the preferred machine, if exists
* @return the machine most preferred by split
*/
override def getPreferredLocations(partition: Partition): Seq[String] = {
Expand Down Expand Up @@ -156,9 +156,12 @@ private[spark] class CoalescedRDD[T: ClassTag](

private class DefaultPartitionCoalescer(val balanceSlack: Double = 0.10)
extends PartitionCoalescer {
def compare(o1: PartitionGroup, o2: PartitionGroup): Boolean = o1.numPartitions < o2.numPartitions
def compare(o1: Option[PartitionGroup], o2: Option[PartitionGroup]): Boolean =
if (o1 == None) false else if (o2 == None) true else compare(o1.get, o2.get)

implicit object partitionGroupOrdering extends Ordering[PartitionGroup] {
override def compare(o1: PartitionGroup, o2: PartitionGroup): Int =
java.lang.Integer.compare(o1.numPartitions, o2.numPartitions)
}


val rnd = new scala.util.Random(7919) // keep this class deterministic

Expand All @@ -178,7 +181,7 @@ private class DefaultPartitionCoalescer(val balanceSlack: Double = 0.10)
prev.context.getPreferredLocs(prev, part.index).map(tl => tl.host)
}

class PartitionLocations(prev: RDD[_]) {
private class PartitionLocations(prev: RDD[_]) {

// contains all the partitions from the previous RDD that don't have preferred locations
val partsWithoutLocs = ArrayBuffer[Partition]()
Expand Down Expand Up @@ -213,15 +216,14 @@ private class DefaultPartitionCoalescer(val balanceSlack: Double = 0.10)
}

/**
* Sorts and gets the least element of the list associated with key in groupHash
* Gets the least element of the list associated with key in groupHash
* The returned PartitionGroup is the least loaded of all groups that represent the machine "key"
*
* @param key string representing a partitioned group on preferred machine key
* @return Option of [[PartitionGroup]] that has least elements for key
*/
def getLeastGroupHash(key: String): Option[PartitionGroup] = {
groupHash.get(key).map(_.sortWith(compare).head)
}
def getLeastGroupHash(key: String): Option[PartitionGroup] =
groupHash.get(key).filter(_.nonEmpty).map(_.min)

def addPartToPGroup(part: Partition, pgroup: PartitionGroup): Boolean = {
if (!initialHash.contains(part)) {
Expand All @@ -236,12 +238,12 @@ private class DefaultPartitionCoalescer(val balanceSlack: Double = 0.10)
* is assigned a preferredLocation. This uses coupon collector to estimate how many
* preferredLocations it must rotate through until it has seen most of the preferred
* locations (2 * n log(n))
* @param targetLen
* @param targetLen The number of desired partition groups
*/
def setupGroups(targetLen: Int, partitionLocs: PartitionLocations) {
// deal with empty case, just create targetLen partition groups with no preferred location
if (partitionLocs.partsWithLocs.isEmpty) {
(1 to targetLen).foreach(x => groupArr += new PartitionGroup())
(1 to targetLen).foreach(_ => groupArr += new PartitionGroup())
return
}

Expand Down Expand Up @@ -297,9 +299,8 @@ private class DefaultPartitionCoalescer(val balanceSlack: Double = 0.10)
partitionLocs: PartitionLocations): PartitionGroup = {
val slack = (balanceSlack * prev.partitions.length).toInt
// least loaded pref locs
val pref = currPrefLocs(p, prev).map(getLeastGroupHash(_)).sortWith(compare)
val prefPart = if (pref == Nil) None else pref.head

val pref = currPrefLocs(p, prev).flatMap(getLeastGroupHash)
val prefPart = if (pref.isEmpty) None else Some(pref.min)
val r1 = rnd.nextInt(groupArr.size)
val r2 = rnd.nextInt(groupArr.size)
val minPowerOfTwo = {
Expand Down Expand Up @@ -351,7 +352,7 @@ private class DefaultPartitionCoalescer(val balanceSlack: Double = 0.10)
val partIter = partitionLocs.partsWithLocs.iterator
groupArr.filter(pg => pg.numPartitions == 0).foreach { pg =>
while (partIter.hasNext && pg.numPartitions == 0) {
var (nxt_replica, nxt_part) = partIter.next()
var (_, nxt_part) = partIter.next()
if (!initialHash.contains(nxt_part)) {
pg.partitions += nxt_part
initialHash += nxt_part
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -767,10 +767,17 @@ private[spark] class DAGScheduler(
callSite: CallSite,
timeout: Long,
properties: Properties): PartialResult[R] = {
val listener = new ApproximateActionListener(rdd, func, evaluator, timeout)
val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]
val partitions = (0 until rdd.partitions.length).toArray
val jobId = nextJobId.getAndIncrement()
if (partitions.isEmpty) {
// Return immediately if the job is running 0 tasks
val time = clock.getTimeMillis()
listenerBus.post(SparkListenerJobStart(jobId, time, Seq[StageInfo](), properties))
listenerBus.post(SparkListenerJobEnd(jobId, time, JobSucceeded))
return new PartialResult(evaluator.currentResult(), true)
}
val listener = new ApproximateActionListener(rdd, func, evaluator, timeout)
val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]
eventProcessLoop.post(JobSubmitted(
jobId, rdd, func2, partitions, callSite, listener, SerializationUtils.clone(properties)))
listener.awaitResult() // Will throw an exception if the job fails
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
// Make fake resource offers on all executors
private def makeOffers() {
// Make sure no executor is killed while some task is launching on it
val taskDescs = CoarseGrainedSchedulerBackend.this.synchronized {
val taskDescs = withLock {
// Filter out executors under killing
val activeExecutors = executorDataMap.filterKeys(executorIsAlive)
val workOffers = activeExecutors.map {
Expand All @@ -284,7 +284,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
// Make fake resource offers on just one executor
private def makeOffers(executorId: String) {
// Make sure no executor is killed while some task is launching on it
val taskDescs = CoarseGrainedSchedulerBackend.this.synchronized {
val taskDescs = withLock {
// Filter out executors under killing
if (executorIsAlive(executorId)) {
val executorData = executorDataMap(executorId)
Expand Down Expand Up @@ -631,7 +631,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
force: Boolean): Seq[String] = {
logInfo(s"Requesting to kill executor(s) ${executorIds.mkString(", ")}")

val response = synchronized {
val response = withLock {
val (knownExecutors, unknownExecutors) = executorIds.partition(executorDataMap.contains)
unknownExecutors.foreach { id =>
logWarning(s"Executor to kill $id does not exist!")
Expand Down Expand Up @@ -730,6 +730,13 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp

protected def currentDelegationTokens: Array[Byte] = delegationTokens.get()

// SPARK-27112: We need to ensure that there is ordering of lock acquisition
// between TaskSchedulerImpl and CoarseGrainedSchedulerBackend objects in order to fix
// the deadlock issue exposed in SPARK-27112
private def withLock[T](fn: => T): T = scheduler.synchronized {
CoarseGrainedSchedulerBackend.this.synchronized { fn }
}

}

private[spark] object CoarseGrainedSchedulerBackend {
Expand Down
47 changes: 45 additions & 2 deletions core/src/main/scala/org/apache/spark/ui/WebUI.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,15 @@

package org.apache.spark.ui

import javax.servlet.http.HttpServletRequest
import java.util.EnumSet
import javax.servlet.DispatcherType
import javax.servlet.http.{HttpServlet, HttpServletRequest}

import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.HashMap
import scala.xml.Node

import org.eclipse.jetty.servlet.ServletContextHandler
import org.eclipse.jetty.servlet.{FilterHolder, FilterMapping, ServletContextHandler, ServletHolder}
import org.json4s.JsonAST.{JNothing, JValue}

import org.apache.spark.{SecurityManager, SparkConf, SSLOptions}
Expand Down Expand Up @@ -59,6 +61,10 @@ private[spark] abstract class WebUI(
def getTabs: Seq[WebUITab] = tabs
def getHandlers: Seq[ServletContextHandler] = handlers

def getDelegatingHandlers: Seq[DelegatingServletContextHandler] = {
handlers.map(new DelegatingServletContextHandler(_))
}

/** Attaches a tab to this UI, along with all of its attached pages. */
def attachTab(tab: WebUITab): Unit = {
tab.pages.foreach(attachPage)
Expand Down Expand Up @@ -95,6 +101,14 @@ private[spark] abstract class WebUI(
serverInfo.foreach(_.addHandler(handler, securityManager))
}

/** Attaches a handler to this UI. */
def attachHandler(contextPath: String, httpServlet: HttpServlet, pathSpec: String): Unit = {
val ctx = new ServletContextHandler()
ctx.setContextPath(contextPath)
ctx.addServlet(new ServletHolder(httpServlet), pathSpec)
attachHandler(ctx)
}

/** Detaches a handler from this UI. */
def detachHandler(handler: ServletContextHandler): Unit = synchronized {
handlers -= handler
Expand Down Expand Up @@ -193,3 +207,32 @@ private[spark] abstract class WebUIPage(var prefix: String) {
def render(request: HttpServletRequest): Seq[Node]
def renderJson(request: HttpServletRequest): JValue = JNothing
}

private[spark] class DelegatingServletContextHandler(handler: ServletContextHandler) {

def prependFilterMapping(
filterName: String,
spec: String,
types: EnumSet[DispatcherType]): Unit = {
val mapping = new FilterMapping()
mapping.setFilterName(filterName)
mapping.setPathSpec(spec)
mapping.setDispatcherTypes(types)
handler.getServletHandler.prependFilterMapping(mapping)
}

def addFilter(
filterName: String,
className: String,
filterParams: Map[String, String]): Unit = {
val filterHolder = new FilterHolder()
filterHolder.setName(filterName)
filterHolder.setClassName(className)
filterParams.foreach { case (k, v) => filterHolder.setInitParameter(k, v) }
handler.getServletHandler.addFilter(filterHolder)
}

def filterCount(): Int = {
handler.getServletHandler.getFilters.length
}
}
Loading