
Commit f02d3f8

Author: Marcelo Vanzin

Merge branch 'master' into SPARK-2933

Conflicts:
    yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
    yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
    yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
    yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala

2 parents f581122 + 7eb9cbc, commit f02d3f8

39 files changed: +756, -86 lines

core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala

Lines changed: 27 additions & 39 deletions
@@ -17,8 +17,7 @@
 
 package org.apache.spark.broadcast
 
-import java.io.{ByteArrayOutputStream, ByteArrayInputStream, InputStream,
-  ObjectInputStream, ObjectOutputStream, OutputStream}
+import java.io._
 
 import scala.reflect.ClassTag
 import scala.util.Random
@@ -53,10 +52,8 @@ private[spark] class TorrentBroadcast[T: ClassTag](
 
   private val broadcastId = BroadcastBlockId(id)
 
-  TorrentBroadcast.synchronized {
-    SparkEnv.get.blockManager.putSingle(
-      broadcastId, value_, StorageLevel.MEMORY_AND_DISK, tellMaster = false)
-  }
+  SparkEnv.get.blockManager.putSingle(
+    broadcastId, value_, StorageLevel.MEMORY_AND_DISK, tellMaster = false)
 
   @transient private var arrayOfBlocks: Array[TorrentBlock] = null
   @transient private var totalBlocks = -1
@@ -91,18 +88,14 @@ private[spark] class TorrentBroadcast[T: ClassTag](
     // Store meta-info
     val metaId = BroadcastBlockId(id, "meta")
     val metaInfo = TorrentInfo(null, totalBlocks, totalBytes)
-    TorrentBroadcast.synchronized {
-      SparkEnv.get.blockManager.putSingle(
-        metaId, metaInfo, StorageLevel.MEMORY_AND_DISK, tellMaster = true)
-    }
+    SparkEnv.get.blockManager.putSingle(
+      metaId, metaInfo, StorageLevel.MEMORY_AND_DISK, tellMaster = true)
 
     // Store individual pieces
     for (i <- 0 until totalBlocks) {
       val pieceId = BroadcastBlockId(id, "piece" + i)
-      TorrentBroadcast.synchronized {
-        SparkEnv.get.blockManager.putSingle(
-          pieceId, tInfo.arrayOfBlocks(i), StorageLevel.MEMORY_AND_DISK, tellMaster = true)
-      }
+      SparkEnv.get.blockManager.putSingle(
+        pieceId, tInfo.arrayOfBlocks(i), StorageLevel.MEMORY_AND_DISK, tellMaster = true)
     }
   }
 
@@ -165,21 +158,20 @@ private[spark] class TorrentBroadcast[T: ClassTag](
     val metaId = BroadcastBlockId(id, "meta")
     var attemptId = 10
     while (attemptId > 0 && totalBlocks == -1) {
-      TorrentBroadcast.synchronized {
-        SparkEnv.get.blockManager.getSingle(metaId) match {
-          case Some(x) =>
-            val tInfo = x.asInstanceOf[TorrentInfo]
-            totalBlocks = tInfo.totalBlocks
-            totalBytes = tInfo.totalBytes
-            arrayOfBlocks = new Array[TorrentBlock](totalBlocks)
-            hasBlocks = 0
-
-          case None =>
-            Thread.sleep(500)
-        }
+      SparkEnv.get.blockManager.getSingle(metaId) match {
+        case Some(x) =>
+          val tInfo = x.asInstanceOf[TorrentInfo]
+          totalBlocks = tInfo.totalBlocks
+          totalBytes = tInfo.totalBytes
+          arrayOfBlocks = new Array[TorrentBlock](totalBlocks)
+          hasBlocks = 0
+
+        case None =>
+          Thread.sleep(500)
       }
       attemptId -= 1
     }
+
     if (totalBlocks == -1) {
       return false
     }
@@ -192,17 +184,15 @@ private[spark] class TorrentBroadcast[T: ClassTag](
     val recvOrder = new Random().shuffle(Array.iterate(0, totalBlocks)(_ + 1).toList)
     for (pid <- recvOrder) {
       val pieceId = BroadcastBlockId(id, "piece" + pid)
-      TorrentBroadcast.synchronized {
-        SparkEnv.get.blockManager.getSingle(pieceId) match {
-          case Some(x) =>
-            arrayOfBlocks(pid) = x.asInstanceOf[TorrentBlock]
-            hasBlocks += 1
-            SparkEnv.get.blockManager.putSingle(
-              pieceId, arrayOfBlocks(pid), StorageLevel.MEMORY_AND_DISK, tellMaster = true)
+      SparkEnv.get.blockManager.getSingle(pieceId) match {
+        case Some(x) =>
+          arrayOfBlocks(pid) = x.asInstanceOf[TorrentBlock]
+          hasBlocks += 1
+          SparkEnv.get.blockManager.putSingle(
+            pieceId, arrayOfBlocks(pid), StorageLevel.MEMORY_AND_DISK, tellMaster = true)
 
-          case None =>
-            throw new SparkException("Failed to get " + pieceId + " of " + broadcastId)
-        }
+        case None =>
+          throw new SparkException("Failed to get " + pieceId + " of " + broadcastId)
       }
     }
 
@@ -291,9 +281,7 @@ private[broadcast] object TorrentBroadcast extends Logging {
    * If removeFromDriver is true, also remove these persisted blocks on the driver.
    */
   def unpersist(id: Long, removeFromDriver: Boolean, blocking: Boolean) = {
-    synchronized {
-      SparkEnv.get.blockManager.master.removeBroadcast(id, removeFromDriver, blocking)
-    }
+    SparkEnv.get.blockManager.master.removeBroadcast(id, removeFromDriver, blocking)
   }
 }

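The change above drops the coarse TorrentBroadcast.synchronized (and object-level synchronized) blocks around the individual BlockManager putSingle/getSingle/removeBroadcast calls, leaving locking to the block manager itself. For orientation only, here is a minimal PySpark sketch of the driver-side usage that ends up on these code paths; the spark.broadcast.factory setting and the toy lookup table are illustrative assumptions, not part of this commit.

# Minimal sketch, not part of this commit: broadcast usage that exercises the
# TorrentBroadcast paths shown above. The spark.broadcast.factory key and its
# value are an assumption about how the torrent implementation is selected.
from pyspark import SparkConf, SparkContext

conf = (SparkConf()
        .setAppName("BroadcastSketch")
        .set("spark.broadcast.factory",
             "org.apache.spark.broadcast.TorrentBroadcastFactory"))
sc = SparkContext(conf=conf)

lookup = sc.broadcast({"a": 1, "b": 2, "c": 3})  # driver stores the value once
total = (sc.parallelize(["a", "b", "b", "c"])
           .map(lambda k: lookup.value[k])       # executors read the broadcast
           .sum())
print(total)

sc.stop()
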
dev/run-tests

Lines changed: 13 additions & 4 deletions
@@ -58,7 +58,7 @@ if [ -n "$AMPLAB_JENKINS" ]; then
   diffs=`git diff --name-only master | grep "^sql/"`
   if [ -n "$diffs" ]; then
     echo "Detected changes in SQL. Will run Hive test suite."
-    export _RUN_SQL_TESTS=true # exported for PySpark tests
+    _RUN_SQL_TESTS=true
   fi
 fi
@@ -89,13 +89,22 @@ echo "========================================================================="
 echo "Running Spark unit tests"
 echo "========================================================================="
 
+# Build Spark; we always build with Hive because the PySpark SparkSQL tests need it.
+# echo "q" is needed because sbt on encountering a build file with failure
+# (either resolution or compilation) prompts the user for input either q, r,
+# etc to quit or retry. This echo is there to make it not block.
+BUILD_MVN_PROFILE_ARGS="$SBT_MAVEN_PROFILES_ARGS -Phive -Phive-thriftserver "
+echo -e "q\n" | sbt/sbt $BUILD_MVN_PROFILE_ARGS clean package assembly/assembly | \
+  grep -v -e "info.*Resolving" -e "warn.*Merging" -e "info.*Including"
+
+# If the Spark SQL tests are enabled, run the tests with the Hive profiles enabled:
 if [ -n "$_RUN_SQL_TESTS" ]; then
   SBT_MAVEN_PROFILES_ARGS="$SBT_MAVEN_PROFILES_ARGS -Phive -Phive-thriftserver"
 fi
-# echo "q" is needed because sbt on encountering a build file with failure
-# (either resolution or compilation) prompts the user for input either q, r,
+# echo "q" is needed because sbt on encountering a build file with failure
+# (either resolution or compilation) prompts the user for input either q, r,
 # etc to quit or retry. This echo is there to make it not block.
-echo -e "q\n" | sbt/sbt $SBT_MAVEN_PROFILES_ARGS clean package assembly/assembly test | \
+echo -e "q\n" | sbt/sbt $SBT_MAVEN_PROFILES_ARGS test | \
   grep -v -e "info.*Resolving" -e "warn.*Merging" -e "info.*Including"
 
 echo ""

docs/mllib-decision-tree.md

Lines changed: 2 additions & 2 deletions
@@ -84,8 +84,8 @@ Section 9.2.4 in
 [Elements of Statistical Machine Learning](http://statweb.stanford.edu/~tibs/ElemStatLearn/) for
 details). For example, for a binary classification problem with one categorical feature with three
 categories A, B and C with corresponding proportion of label 1 as 0.2, 0.6 and 0.4, the categorical
-features are ordered as A followed by C followed B or A, B, C. The two split candidates are A \| C, B
-and A , B \| C where \| denotes the split. A similar heuristic is used for multiclass classification
+features are ordered as A followed by C followed B or A, C, B. The two split candidates are A \| C, B
+and A , C \| B where \| denotes the split. A similar heuristic is used for multiclass classification
 when `$2^(M-1)-1$` is greater than the number of bins -- the impurity for each categorical feature value
 is used for ordering.

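The corrected text orders the three categories by their proportion of label 1 (A = 0.2, C = 0.4, B = 0.6), giving the order A, C, B and the two contiguous split candidates A | C, B and A, C | B. A short Python sketch of that ordering heuristic, using the proportions from the doc's example (the variable names are illustrative, not MLlib code), follows.

# Worked example of the ordering heuristic described in the doc change above.
# The proportions come from the example (A: 0.2, B: 0.6, C: 0.4).
label1_proportion = {"A": 0.2, "B": 0.6, "C": 0.4}

# Order categories by increasing proportion of label 1: A, C, B.
ordered = sorted(label1_proportion, key=label1_proportion.get)

# With M ordered categories there are only M - 1 contiguous split candidates
# to evaluate, instead of 2^(M-1) - 1 arbitrary subsets.
splits = [(ordered[:i], ordered[i:]) for i in range(1, len(ordered))]

print(ordered)  # ['A', 'C', 'B']
print(splits)   # [(['A'], ['C', 'B']), (['A', 'C'], ['B'])]
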
examples/src/main/python/als.py

Lines changed: 2 additions & 0 deletions
@@ -97,3 +97,5 @@ def update(i, vec, mat, ratings):
         error = rmse(R, ms, us)
         print "Iteration %d:" % i
         print "\nRMSE: %5.4f\n" % error
+
+    sc.stop()

examples/src/main/python/cassandra_inputformat.py

Lines changed: 2 additions & 0 deletions
@@ -77,3 +77,5 @@
     output = cass_rdd.collect()
     for (k, v) in output:
         print (k, v)
+
+    sc.stop()

examples/src/main/python/cassandra_outputformat.py

Lines changed: 2 additions & 0 deletions
@@ -81,3 +81,5 @@
         conf=conf,
         keyConverter="org.apache.spark.examples.pythonconverters.ToCassandraCQLKeyConverter",
         valueConverter="org.apache.spark.examples.pythonconverters.ToCassandraCQLValueConverter")
+
+    sc.stop()

examples/src/main/python/hbase_inputformat.py

Lines changed: 2 additions & 0 deletions
@@ -71,3 +71,5 @@
     output = hbase_rdd.collect()
     for (k, v) in output:
         print (k, v)
+
+    sc.stop()

examples/src/main/python/hbase_outputformat.py

Lines changed: 2 additions & 0 deletions
@@ -63,3 +63,5 @@
         conf=conf,
         keyConverter="org.apache.spark.examples.pythonconverters.StringToImmutableBytesWritableConverter",
         valueConverter="org.apache.spark.examples.pythonconverters.StringListToPutConverter")
+
+    sc.stop()

examples/src/main/python/kmeans.py

Lines changed: 2 additions & 0 deletions
@@ -77,3 +77,5 @@ def closestPoint(p, centers):
             kPoints[x] = y
 
     print "Final centers: " + str(kPoints)
+
+    sc.stop()

examples/src/main/python/logistic_regression.py

Lines changed: 2 additions & 0 deletions
@@ -80,3 +80,5 @@ def add(x, y):
         w -= points.map(lambda m: gradient(m, w)).reduce(add)
 
     print "Final w: " + str(w)
+
+    sc.stop()

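Each of the Python example diffs above makes the same fix: the script now calls sc.stop() when it is done, so the SparkContext and its executors are shut down cleanly instead of relying on interpreter exit. The pattern they all follow is roughly this sketch (the app name and the toy job are placeholders; only the final sc.stop() mirrors the commit):

# Sketch of the shutdown pattern added to the examples above; everything here
# is a placeholder except the final sc.stop(), which is what the commit adds.
from pyspark import SparkContext

sc = SparkContext(appName="ExampleApp")
total = sc.parallelize(range(100)).sum()
print(total)

sc.stop()  # release executors and shut the context down cleanly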