
Commit ea5c047

Merge remote-tracking branch 'upstream/master' into mllib-stats-api-check

2 parents: dafebe2 + 3abd0c1

16 files changed: +613 -108 lines

core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala

Lines changed: 14 additions & 27 deletions

@@ -72,7 +72,6 @@ private[spark] class Worker(
   val APP_DATA_RETENTION_SECS = conf.getLong("spark.worker.cleanup.appDataTtl", 7 * 24 * 3600)
 
   val testing: Boolean = sys.props.contains("spark.testing")
-  val masterLock: Object = new Object()
   var master: ActorSelection = null
   var masterAddress: Address = null
   var activeMasterUrl: String = ""
@@ -145,18 +144,16 @@ private[spark] class Worker(
   }
 
   def changeMaster(url: String, uiUrl: String) {
-    masterLock.synchronized {
-      activeMasterUrl = url
-      activeMasterWebUiUrl = uiUrl
-      master = context.actorSelection(Master.toAkkaUrl(activeMasterUrl))
-      masterAddress = activeMasterUrl match {
-        case Master.sparkUrlRegex(_host, _port) =>
-          Address("akka.tcp", Master.systemName, _host, _port.toInt)
-        case x =>
-          throw new SparkException("Invalid spark URL: " + x)
-      }
-      connected = true
+    activeMasterUrl = url
+    activeMasterWebUiUrl = uiUrl
+    master = context.actorSelection(Master.toAkkaUrl(activeMasterUrl))
+    masterAddress = activeMasterUrl match {
+      case Master.sparkUrlRegex(_host, _port) =>
+        Address("akka.tcp", Master.systemName, _host, _port.toInt)
+      case x =>
+        throw new SparkException("Invalid spark URL: " + x)
     }
+    connected = true
   }
 
   def tryRegisterAllMasters() {
@@ -199,9 +196,7 @@ private[spark] class Worker(
       }
 
     case SendHeartbeat =>
-      masterLock.synchronized {
-        if (connected) { master ! Heartbeat(workerId) }
-      }
+      if (connected) { master ! Heartbeat(workerId) }
 
     case WorkDirCleanup =>
       // Spin up a separate thread (in a future) to do the dir cleanup; don't tie up worker actor
@@ -244,27 +239,21 @@ private[spark] class Worker(
           manager.start()
           coresUsed += cores_
           memoryUsed += memory_
-          masterLock.synchronized {
-            master ! ExecutorStateChanged(appId, execId, manager.state, None, None)
-          }
+          master ! ExecutorStateChanged(appId, execId, manager.state, None, None)
         } catch {
           case e: Exception => {
             logError("Failed to launch executor %s/%d for %s".format(appId, execId, appDesc.name))
            if (executors.contains(appId + "/" + execId)) {
               executors(appId + "/" + execId).kill()
               executors -= appId + "/" + execId
             }
-            masterLock.synchronized {
-              master ! ExecutorStateChanged(appId, execId, ExecutorState.FAILED, None, None)
-            }
+            master ! ExecutorStateChanged(appId, execId, ExecutorState.FAILED, None, None)
           }
         }
       }
 
     case ExecutorStateChanged(appId, execId, state, message, exitStatus) =>
-      masterLock.synchronized {
-        master ! ExecutorStateChanged(appId, execId, state, message, exitStatus)
-      }
+      master ! ExecutorStateChanged(appId, execId, state, message, exitStatus)
       val fullId = appId + "/" + execId
       if (ExecutorState.isFinished(state)) {
         executors.get(fullId) match {
@@ -330,9 +319,7 @@ private[spark] class Worker(
         case _ =>
           logDebug(s"Driver $driverId changed state to $state")
       }
-      masterLock.synchronized {
-        master ! DriverStateChanged(driverId, state, exception)
-      }
+      master ! DriverStateChanged(driverId, state, exception)
       val driver = drivers.remove(driverId).get
       finishedDrivers(driverId) = driver
       memoryUsed -= driver.driverDesc.mem
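
The diff above removes the `masterLock.synchronized` wrapper around every `master ! ...` send. Presumably this is safe because `Worker` is an Akka actor, and an actor processes one message at a time, so sends issued from inside `receive` need no extra synchronization. A minimal sketch of that pattern, with hypothetical names (not the Spark `Worker` itself):

    import akka.actor.{Actor, ActorSelection}

    // An actor handles messages sequentially, so mutating `connected` and
    // telling (!) another actor from inside receive needs no lock.
    class HeartbeatSender(master: ActorSelection) extends Actor {
      var connected = false
      def receive = {
        case "registered" => connected = true
        case "heartbeat"  => if (connected) master ! "Heartbeat"  // no synchronized block
      }
    }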

core/src/main/scala/org/apache/spark/rdd/DoubleRDDFunctions.scala

Lines changed: 11 additions & 4 deletions

@@ -95,7 +95,12 @@ class DoubleRDDFunctions(self: RDD[Double]) extends Logging with Serializable {
    * If the elements in RDD do not vary (max == min) always returns a single bucket.
    */
   def histogram(bucketCount: Int): Pair[Array[Double], Array[Long]] = {
-    // Compute the minimum and the maxium
+    // Scala's built-in range has issues. See #SI-8782
+    def customRange(min: Double, max: Double, steps: Int): IndexedSeq[Double] = {
+      val span = max - min
+      Range.Int(0, steps, 1).map(s => min + (s * span) / steps) :+ max
+    }
+    // Compute the minimum and the maximum
     val (max: Double, min: Double) = self.mapPartitions { items =>
       Iterator(items.foldRight(Double.NegativeInfinity,
         Double.PositiveInfinity)((e: Double, x: Pair[Double, Double]) =>
@@ -107,9 +112,11 @@ class DoubleRDDFunctions(self: RDD[Double]) extends Logging with Serializable {
       throw new UnsupportedOperationException(
         "Histogram on either an empty RDD or RDD containing +/-infinity or NaN")
     }
-    val increment = (max-min)/bucketCount.toDouble
-    val range = if (increment != 0) {
-      Range.Double.inclusive(min, max, increment)
+    val range = if (min != max) {
+      // Range.Double.inclusive(min, max, increment)
+      // The above code doesn't always work. See Scala bug #SI-8782.
+      // https://issues.scala-lang.org/browse/SI-8782
+      customRange(min, max, bucketCount)
     } else {
       List(min, min)
     }
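
For reference, a standalone copy of the helper introduced above, with one concrete evaluation; the resulting edges are exactly the buckets asserted in the histogram(8) test added in the next file:

    // Same workaround as in the diff: build steps + 1 bucket edges from integer
    // steps and append max explicitly, instead of Range.Double.inclusive (SI-8782).
    def customRange(min: Double, max: Double, steps: Int): IndexedSeq[Double] = {
      val span = max - min
      Range.Int(0, steps, 1).map(s => min + (s * span) / steps) :+ max
    }

    customRange(6.0, 99.0, 8)
    // Vector(6.0, 17.625, 29.25, 40.875, 52.5, 64.125, 75.75, 87.375, 99.0)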

core/src/test/scala/org/apache/spark/rdd/DoubleRDDSuite.scala

Lines changed: 23 additions & 0 deletions

@@ -245,6 +245,29 @@ class DoubleRDDSuite extends FunSuite with SharedSparkContext {
     assert(histogramBuckets === expectedHistogramBuckets)
   }
 
+  test("WorksWithoutBucketsForLargerDatasets") {
+    // Verify the case of slightly larger datasets
+    val rdd = sc.parallelize(6 to 99)
+    val (histogramBuckets, histogramResults) = rdd.histogram(8)
+    val expectedHistogramResults =
+      Array(12, 12, 11, 12, 12, 11, 12, 12)
+    val expectedHistogramBuckets =
+      Array(6.0, 17.625, 29.25, 40.875, 52.5, 64.125, 75.75, 87.375, 99.0)
+    assert(histogramResults === expectedHistogramResults)
+    assert(histogramBuckets === expectedHistogramBuckets)
+  }
+
+  test("WorksWithoutBucketsWithIrrationalBucketEdges") {
+    // Verify the case of buckets with irrational edges. See #SPARK-2862.
+    val rdd = sc.parallelize(6 to 99)
+    val (histogramBuckets, histogramResults) = rdd.histogram(9)
+    val expectedHistogramResults =
+      Array(11, 10, 11, 10, 10, 11, 10, 10, 11)
+    assert(histogramResults === expectedHistogramResults)
+    assert(histogramBuckets(0) === 6.0)
+    assert(histogramBuckets(9) === 99.0)
+  }
+
   // Test the failure mode with an invalid RDD
   test("ThrowsExceptionOnInvalidRDDs") {
     // infinity
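
A note on the second test's setup, using only the numbers in the test itself: with 9 buckets over the span 99 - 6 = 93, each bucket is 93.0 / 9 = 10.333... wide, so the interior edges are not exactly representable as doubles. This is the kind of input (see SPARK-2862) where Range.Double.inclusive could yield the wrong number of edges, which is presumably why only the first and last edges are asserted exactly:

    // Bucket width for rdd.histogram(9) over 6 to 99 in the test above:
    val width = (99.0 - 6.0) / 9   // 10.333333333333334, not exactly representable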

docs/sql-programming-guide.md

Lines changed: 5 additions & 0 deletions

@@ -605,6 +605,11 @@ Configuration of Hive is done by placing your `hive-site.xml` file in `conf/`.
 
 You may also use the beeline script that comes with Hive.
 
+To set a [Fair Scheduler](job-scheduling.html#fair-scheduler-pools) pool for a JDBC client session,
+users can set the `spark.sql.thriftserver.scheduler.pool` variable:
+
+    SET spark.sql.thriftserver.scheduler.pool=accounting;
+
 ### Migration Guide for Shark Users
 
 #### Reducer number

project/SparkBuild.scala

Lines changed: 0 additions & 1 deletion

@@ -228,7 +228,6 @@ object SQL {
 object Hive {
 
   lazy val settings = Seq(
-
     javaOptions += "-XX:MaxPermSize=1g",
     // Multiple queries rely on the TestHive singleton. See comments there for more details.
     parallelExecution in Test := false,

sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala

Lines changed: 4 additions & 0 deletions

@@ -32,6 +32,10 @@ private[spark] object SQLConf {
   val CODEGEN_ENABLED = "spark.sql.codegen"
   val DIALECT = "spark.sql.dialect"
   val PARQUET_BINARY_AS_STRING = "spark.sql.parquet.binaryAsString"
+  val PARQUET_CACHE_METADATA = "spark.sql.parquet.cacheMetadata"
+
+  // This is only used for the thriftserver
+  val THRIFTSERVER_POOL = "spark.sql.thriftserver.scheduler.pool"
 
   object Deprecated {
     val MAPRED_REDUCE_TASKS = "mapred.reduce.tasks"
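
For context, a hedged sketch of how a thrift server session might route the new pool key into Spark's fair scheduler; the helper below is hypothetical glue, but `SparkContext.setLocalProperty` and the `spark.scheduler.pool` property are the standard mechanism for selecting a pool:

    import org.apache.spark.SparkContext

    // Hypothetical: apply a per-session pool name (as read from the new
    // spark.sql.thriftserver.scheduler.pool key) to subsequent jobs by
    // setting the fair scheduler's local property.
    def applySessionPool(sc: SparkContext, pool: Option[String]): Unit = {
      pool.foreach(p => sc.setLocalProperty("spark.scheduler.pool", p))
    }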

sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala

Lines changed: 12 additions & 0 deletions

@@ -303,3 +303,15 @@ case class Intersect(left: SparkPlan, right: SparkPlan) extends BinaryNode {
     left.execute().map(_.copy()).intersection(right.execute().map(_.copy()))
   }
 }
+
+/**
+ * :: DeveloperApi ::
+ * A plan node that does nothing but lie about the output of its child. Used to splice a
+ * (hopefully structurally equivalent) tree from a different optimization sequence into an already
+ * resolved tree.
+ */
+@DeveloperApi
+case class OutputFaker(output: Seq[Attribute], child: SparkPlan) extends SparkPlan {
+  def children = child :: Nil
+  def execute() = child.execute()
+}
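
A hedged usage sketch of the new node (the plan values below are hypothetical, not from this commit): `OutputFaker` lets a replacement plan stand in for an already-resolved one while keeping the attribute ids the parent operators were resolved against.

    // Hypothetical splice: expose replacementPlan's rows under the output
    // attributes of resolvedPlan, so parents bound to resolvedPlan.output still line up.
    val spliced: SparkPlan = OutputFaker(resolvedPlan.output, replacementPlan)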
