
Commit 42fb5fa

Merge branch 'master' into pypy

2 parents: cb2d724 + 27df6ce

26 files changed (+236, -336 lines)


core/src/main/scala/org/apache/spark/io/CompressionCodec.scala

Lines changed: 1 addition & 0 deletions
@@ -64,6 +64,7 @@ private[spark] object CompressionCodec {
   }
 
   val DEFAULT_COMPRESSION_CODEC = "snappy"
+  val ALL_COMPRESSION_CODECS = shortCompressionCodecNames.values.toSeq
 }
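
The new constant simply materializes the codec class names already registered in shortCompressionCodecNames. As a hedged sketch of how it could be used (the helper below is hypothetical and not part of this commit; it assumes the caller sits under the org.apache.spark package, since the object is private[spark]):

import org.apache.spark.io.CompressionCodec

// Hypothetical helper (not in this commit): check a fully qualified codec class
// name against the list now exposed as ALL_COMPRESSION_CODECS.
def isKnownCodec(codecClassName: String): Boolean =
  CompressionCodec.ALL_COMPRESSION_CODECS.contains(codecClassName)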

core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala

Lines changed: 2 additions & 76 deletions
@@ -33,16 +33,8 @@ import org.apache.spark.serializer.Serializer
 import org.apache.spark.util.Utils
 
 /**
- * A block fetcher iterator interface. There are two implementations:
- *
- * BasicBlockFetcherIterator: uses a custom-built NIO communication layer.
- * NettyBlockFetcherIterator: uses Netty (OIO) as the communication layer.
- *
- * Eventually we would like the two to converge and use a single NIO-based communication layer,
- * but extensive tests show that under some circumstances (e.g. large shuffles with lots of cores),
- * NIO would perform poorly and thus the need for the Netty OIO one.
+ * A block fetcher iterator interface for fetching shuffle blocks.
  */
-
 private[storage]
 trait BlockFetcherIterator extends Iterator[(BlockId, Option[Iterator[Any]])] with Logging {
   def initialize()

@@ -204,11 +196,8 @@ object BlockFetcherIterator {
       // any memory that might exceed our maxBytesInFlight
       for (id <- localBlocksToFetch) {
         try {
-          // getLocalFromDisk never return None but throws BlockException
-          val iter = getLocalFromDisk(id, serializer).get
-          // Pass 0 as size since it's not in flight
           readMetrics.localBlocksFetched += 1
-          results.put(new FetchResult(id, 0, () => iter))
+          results.put(new FetchResult(id, 0, () => getLocalFromDisk(id, serializer).get))
           logDebug("Got local block " + id)
         } catch {
           case e: Exception => {

@@ -262,67 +251,4 @@ object BlockFetcherIterator {
     }
   }
   // End of BasicBlockFetcherIterator
-
-  class NettyBlockFetcherIterator(
-      blockManager: BlockManager,
-      blocksByAddress: Seq[(BlockManagerId, Seq[(BlockId, Long)])],
-      serializer: Serializer,
-      readMetrics: ShuffleReadMetrics)
-    extends BasicBlockFetcherIterator(blockManager, blocksByAddress, serializer, readMetrics) {
-
-    override protected def sendRequest(req: FetchRequest) {
-      logDebug("Sending request for %d blocks (%s) from %s".format(
-        req.blocks.size, Utils.bytesToString(req.size), req.address.hostPort))
-      val cmId = new ConnectionManagerId(req.address.host, req.address.port)
-
-      bytesInFlight += req.size
-      val sizeMap = req.blocks.toMap // so we can look up the size of each blockID
-
-      // This could throw a TimeoutException. In that case we will just retry the task.
-      val client = blockManager.nettyBlockClientFactory.createClient(
-        cmId.host, req.address.nettyPort)
-      val blocks = req.blocks.map(_._1.toString)
-
-      client.fetchBlocks(
-        blocks,
-        new BlockClientListener {
-          override def onFetchFailure(blockId: String, errorMsg: String): Unit = {
-            logError(s"Could not get block(s) from $cmId with error: $errorMsg")
-            for ((blockId, size) <- req.blocks) {
-              results.put(new FetchResult(blockId, -1, null))
-            }
-          }
-
-          override def onFetchSuccess(blockId: String, data: ReferenceCountedBuffer): Unit = {
-            // Increment the reference count so the buffer won't be recycled.
-            // TODO: This could result in memory leaks when the task is stopped due to exception
-            // before the iterator is exhausted.
-            data.retain()
-            val buf = data.byteBuffer()
-            val blockSize = buf.remaining()
-            val bid = BlockId(blockId)
-
-            // TODO: remove code duplication between here and BlockManager.dataDeserialization.
-            results.put(new FetchResult(bid, sizeMap(bid), () => {
-              def createIterator: Iterator[Any] = {
-                val stream = blockManager.wrapForCompression(bid, data.inputStream())
-                serializer.newInstance().deserializeStream(stream).asIterator
-              }
-              new LazyInitIterator(createIterator) {
-                // Release the buffer when we are done traversing it.
-                override def close(): Unit = data.release()
-              }
-            }))
-
-            readMetrics.synchronized {
-              readMetrics.remoteBytesRead += blockSize
-              readMetrics.remoteBlocksFetched += 1
-            }
-            logDebug("Got remote block " + blockId + " after " + Utils.getUsedTimeMs(startTime))
-          }
-        }
-      )
-    }
-  }
-  // End of NettyBlockFetcherIterator
 }
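
Beyond deleting the Netty path, the local-fetch change makes the disk read lazy: instead of eagerly calling getLocalFromDisk and caching the iterator, the FetchResult now holds a () => ... thunk that runs only when the result is consumed. A standalone toy sketch of that distinction (illustrative only, not the Spark classes themselves):

// Eager vs. deferred evaluation, as used by the new FetchResult above.
def expensiveRead(id: String): Iterator[Int] = {
  println(s"reading $id from disk")   // stands in for getLocalFromDisk
  Iterator(1, 2, 3)
}

// Eager: the read happens immediately, even if the result is never consumed.
val eager: Iterator[Int] = expensiveRead("block_0")

// Deferred: wrapping the call in a thunk postpones the read until the
// consumer actually invokes it.
val deferred: () => Iterator[Int] = () => expensiveRead("block_1")
deferred().foreach(println)           // the read happens here, on demand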

core/src/main/scala/org/apache/spark/storage/BlockManager.scala

Lines changed: 5 additions & 58 deletions
@@ -32,8 +32,6 @@ import org.apache.spark._
 import org.apache.spark.executor._
 import org.apache.spark.io.CompressionCodec
 import org.apache.spark.network._
-import org.apache.spark.network.netty.client.BlockFetchingClientFactory
-import org.apache.spark.network.netty.server.BlockServer
 import org.apache.spark.serializer.Serializer
 import org.apache.spark.shuffle.ShuffleManager
 import org.apache.spark.util._

@@ -90,27 +88,8 @@ private[spark] class BlockManager(
     new TachyonStore(this, tachyonBlockManager)
   }
 
-  private val useNetty = conf.getBoolean("spark.shuffle.use.netty", false)
-
-  // If we use Netty for shuffle, start a new Netty-based shuffle sender service.
-  private[storage] val nettyBlockClientFactory: BlockFetchingClientFactory = {
-    if (useNetty) new BlockFetchingClientFactory(conf) else null
-  }
-
-  private val nettyBlockServer: BlockServer = {
-    if (useNetty) {
-      val server = new BlockServer(conf, this)
-      logInfo(s"Created NettyBlockServer binding to port: ${server.port}")
-      server
-    } else {
-      null
-    }
-  }
-
-  private val nettyPort: Int = if (useNetty) nettyBlockServer.port else 0
-
   val blockManagerId = BlockManagerId(
-    executorId, connectionManager.id.host, connectionManager.id.port, nettyPort)
+    executorId, connectionManager.id.host, connectionManager.id.port)
 
   // Max megabytes of data to keep in flight per reducer (to avoid over-allocating memory
   // for receiving shuffle outputs)

@@ -572,14 +551,8 @@ private[spark] class BlockManager(
       blocksByAddress: Seq[(BlockManagerId, Seq[(BlockId, Long)])],
       serializer: Serializer,
       readMetrics: ShuffleReadMetrics): BlockFetcherIterator = {
-    val iter =
-      if (conf.getBoolean("spark.shuffle.use.netty", false)) {
-        new BlockFetcherIterator.NettyBlockFetcherIterator(this, blocksByAddress, serializer,
-          readMetrics)
-      } else {
-        new BlockFetcherIterator.BasicBlockFetcherIterator(this, blocksByAddress, serializer,
-          readMetrics)
-      }
+    val iter = new BlockFetcherIterator.BasicBlockFetcherIterator(this, blocksByAddress, serializer,
+      readMetrics)
     iter.initialize()
     iter
   }

@@ -1066,40 +1039,14 @@ private[spark] class BlockManager(
       bytes: ByteBuffer,
       serializer: Serializer = defaultSerializer): Iterator[Any] = {
     bytes.rewind()
-
-    def getIterator: Iterator[Any] = {
-      val stream = wrapForCompression(blockId, new ByteBufferInputStream(bytes, true))
-      serializer.newInstance().deserializeStream(stream).asIterator
-    }
-
-    if (blockId.isShuffle) {
-      /* Reducer may need to read many local shuffle blocks and will wrap them into Iterators
-       * at the beginning. The wrapping will cost some memory (compression instance
-       * initialization, etc.). Reducer reads shuffle blocks one by one so we could do the
-       * wrapping lazily to save memory. */
-      class LazyProxyIterator(f: => Iterator[Any]) extends Iterator[Any] {
-        lazy val proxy = f
-        override def hasNext: Boolean = proxy.hasNext
-        override def next(): Any = proxy.next()
-      }
-      new LazyProxyIterator(getIterator)
-    } else {
-      getIterator
-    }
+    val stream = wrapForCompression(blockId, new ByteBufferInputStream(bytes, true))
+    serializer.newInstance().deserializeStream(stream).asIterator
   }
 
   def stop(): Unit = {
     connectionManager.stop()
     shuffleBlockManager.stop()
     diskBlockManager.stop()
-
-    if (nettyBlockClientFactory != null) {
-      nettyBlockClientFactory.stop()
-    }
-    if (nettyBlockServer != null) {
-      nettyBlockServer.stop()
-    }
-
     actorSystem.stop(slaveActor)
     blockInfo.clear()
     memoryStore.clear()
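
The LazyProxyIterator becomes unnecessary because laziness now lives in the FetchResult thunk upstream, so dataDeserialize can just build the stream and hand back a deserializing iterator. A self-contained toy of that simplified shape (plain JDK serialization stands in for Spark's Serializer; compression is omitted; not Spark code):

import java.io.{ByteArrayInputStream, EOFException, ObjectInputStream}
import java.nio.ByteBuffer

// Turn a ByteBuffer of serialized objects into an Iterator, with no lazy proxy wrapper.
def toyDeserialize(bytes: ByteBuffer): Iterator[Any] = {
  val arr = new Array[Byte](bytes.remaining())
  bytes.get(arr)
  val in = new ObjectInputStream(new ByteArrayInputStream(arr))
  Iterator
    .continually(try Some(in.readObject()) catch { case _: EOFException => None })
    .takeWhile(_.isDefined)
    .map(_.get)
}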

core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala

Lines changed: 7 additions & 13 deletions
@@ -36,11 +36,10 @@ import org.apache.spark.util.Utils
 class BlockManagerId private (
     private var executorId_ : String,
     private var host_ : String,
-    private var port_ : Int,
-    private var nettyPort_ : Int
+    private var port_ : Int
   ) extends Externalizable {
 
-  private def this() = this(null, null, 0, 0) // For deserialization only
+  private def this() = this(null, null, 0) // For deserialization only
 
   def executorId: String = executorId_
 

@@ -60,32 +59,28 @@
 
   def port: Int = port_
 
-  def nettyPort: Int = nettyPort_
-
   override def writeExternal(out: ObjectOutput) {
     out.writeUTF(executorId_)
     out.writeUTF(host_)
     out.writeInt(port_)
-    out.writeInt(nettyPort_)
   }
 
   override def readExternal(in: ObjectInput) {
     executorId_ = in.readUTF()
     host_ = in.readUTF()
     port_ = in.readInt()
-    nettyPort_ = in.readInt()
   }
 
   @throws(classOf[IOException])
   private def readResolve(): Object = BlockManagerId.getCachedBlockManagerId(this)
 
-  override def toString = "BlockManagerId(%s, %s, %d, %d)".format(executorId, host, port, nettyPort)
+  override def toString = s"BlockManagerId($executorId, $host, $port)"
 
-  override def hashCode: Int = (executorId.hashCode * 41 + host.hashCode) * 41 + port + nettyPort
+  override def hashCode: Int = (executorId.hashCode * 41 + host.hashCode) * 41 + port
 
   override def equals(that: Any) = that match {
     case id: BlockManagerId =>
-      executorId == id.executorId && port == id.port && host == id.host && nettyPort == id.nettyPort
+      executorId == id.executorId && port == id.port && host == id.host
     case _ =>
       false
   }

@@ -100,11 +95,10 @@ private[spark] object BlockManagerId {
   * @param execId ID of the executor.
   * @param host Host name of the block manager.
   * @param port Port of the block manager.
-  * @param nettyPort Optional port for the Netty-based shuffle sender.
   * @return A new [[org.apache.spark.storage.BlockManagerId]].
   */
-  def apply(execId: String, host: String, port: Int, nettyPort: Int) =
-    getCachedBlockManagerId(new BlockManagerId(execId, host, port, nettyPort))
+  def apply(execId: String, host: String, port: Int) =
+    getCachedBlockManagerId(new BlockManagerId(execId, host, port))
 
   def apply(in: ObjectInput) = {
     val obj = new BlockManagerId()
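
Dropping nettyPort has to touch writeExternal, readExternal, equals, hashCode, and toString together because Externalizable serialization replays the fields strictly in the order they were written. A toy Externalizable with the same three remaining fields makes the coupling visible (illustrative sketch only, not Spark code):

import java.io.{Externalizable, ObjectInput, ObjectOutput}

// Field order in writeExternal must match readExternal exactly; adding or removing
// a field means editing both methods, as in the BlockManagerId change above.
class ToyId(var executorId: String, var host: String, var port: Int) extends Externalizable {
  def this() = this(null, null, 0) // For deserialization only
  override def writeExternal(out: ObjectOutput): Unit = {
    out.writeUTF(executorId); out.writeUTF(host); out.writeInt(port)
  }
  override def readExternal(in: ObjectInput): Unit = {
    executorId = in.readUTF(); host = in.readUTF(); port = in.readInt()
  }
  override def toString = s"ToyId($executorId, $host, $port)"
}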

core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala

Lines changed: 29 additions & 8 deletions
@@ -65,8 +65,6 @@ private[spark] abstract class BlockObjectWriter(val blockId: BlockId) {
 
 /**
  * BlockObjectWriter which writes directly to a file on disk. Appends to the given file.
- * The given write metrics will be updated incrementally, but will not necessarily be current until
- * commitAndClose is called.
  */
 private[spark] class DiskBlockObjectWriter(
     blockId: BlockId,

@@ -75,6 +73,8 @@ private[spark] class DiskBlockObjectWriter(
     bufferSize: Int,
     compressStream: OutputStream => OutputStream,
     syncWrites: Boolean,
+    // These write metrics concurrently shared with other active BlockObjectWriter's who
+    // are themselves performing writes. All updates must be relative.
     writeMetrics: ShuffleWriteMetrics)
   extends BlockObjectWriter(blockId)
   with Logging

@@ -94,14 +94,30 @@ private[spark] class DiskBlockObjectWriter(
   private var fos: FileOutputStream = null
   private var ts: TimeTrackingOutputStream = null
   private var objOut: SerializationStream = null
+  private var initialized = false
+
+  /**
+   * Cursors used to represent positions in the file.
+   *
+   * xxxxxxxx|--------|---       |
+   *         ^        ^          ^
+   *         |        |          finalPosition
+   *         |        reportedPosition
+   *         initialPosition
+   *
+   * initialPosition: Offset in the file where we start writing. Immutable.
+   * reportedPosition: Position at the time of the last update to the write metrics.
+   * finalPosition: Offset where we stopped writing. Set on closeAndCommit() then never changed.
+   * -----: Current writes to the underlying file.
+   * xxxxx: Existing contents of the file.
+   */
   private val initialPosition = file.length()
   private var finalPosition: Long = -1
-  private var initialized = false
+  private var reportedPosition = initialPosition
 
   /** Calling channel.position() to update the write metrics can be a little bit expensive, so we
    * only call it every N writes */
   private var writesSinceMetricsUpdate = 0
-  private var lastPosition = initialPosition
 
   override def open(): BlockObjectWriter = {
     fos = new FileOutputStream(file, true)

@@ -140,17 +156,18 @@ private[spark] class DiskBlockObjectWriter(
       // serializer stream and the lower level stream.
       objOut.flush()
       bs.flush()
-      updateBytesWritten()
       close()
     }
     finalPosition = file.length()
+    // In certain compression codecs, more bytes are written after close() is called
+    writeMetrics.shuffleBytesWritten += (finalPosition - reportedPosition)
   }
 
   // Discard current writes. We do this by flushing the outstanding writes and then
   // truncating the file to its initial position.
   override def revertPartialWritesAndClose() {
     try {
-      writeMetrics.shuffleBytesWritten -= (lastPosition - initialPosition)
+      writeMetrics.shuffleBytesWritten -= (reportedPosition - initialPosition)
 
       if (initialized) {
         objOut.flush()

@@ -189,10 +206,14 @@ private[spark] class DiskBlockObjectWriter(
     new FileSegment(file, initialPosition, finalPosition - initialPosition)
   }
 
+  /**
+   * Report the number of bytes written in this writer's shuffle write metrics.
+   * Note that this is only valid before the underlying streams are closed.
+   */
   private def updateBytesWritten() {
     val pos = channel.position()
-    writeMetrics.shuffleBytesWritten += (pos - lastPosition)
-    lastPosition = pos
+    writeMetrics.shuffleBytesWritten += (pos - reportedPosition)
+    reportedPosition = pos
   }
 
   private def callWithTiming(f: => Unit) = {
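
The reportedPosition cursor exists so that every metrics update is a relative delta against a shared counter rather than an absolute position, which is what keeps concurrent writers from double-counting. A toy sketch of that bookkeeping pattern (standalone, not the Spark class):

import java.util.concurrent.atomic.AtomicLong

// Several writers share one metrics counter, so each writer only ever adds the
// delta since its last report (reportedPosition), never an absolute position.
final class ToyWriter(sharedBytesWritten: AtomicLong) {
  private var position: Long = 0L          // where the "file channel" currently is
  private var reportedPosition: Long = 0L  // position at the last metrics update

  def write(numBytes: Int): Unit = position += numBytes

  def updateBytesWritten(): Unit = {
    sharedBytesWritten.addAndGet(position - reportedPosition)
    reportedPosition = position
  }
}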

core/src/main/scala/org/apache/spark/util/JsonProtocol.scala

Lines changed: 2 additions & 4 deletions
@@ -295,8 +295,7 @@ private[spark] object JsonProtocol {
   def blockManagerIdToJson(blockManagerId: BlockManagerId): JValue = {
     ("Executor ID" -> blockManagerId.executorId) ~
     ("Host" -> blockManagerId.host) ~
-    ("Port" -> blockManagerId.port) ~
-    ("Netty Port" -> blockManagerId.nettyPort)
+    ("Port" -> blockManagerId.port)
   }
 
   def jobResultToJson(jobResult: JobResult): JValue = {

@@ -644,8 +643,7 @@
     val executorId = (json \ "Executor ID").extract[String]
     val host = (json \ "Host").extract[String]
     val port = (json \ "Port").extract[Int]
-    val nettyPort = (json \ "Netty Port").extract[Int]
-    BlockManagerId(executorId, host, port, nettyPort)
+    BlockManagerId(executorId, host, port)
   }
 
   def jobResultFromJson(json: JValue): JobResult = {
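
After this change the JSON for a BlockManagerId carries only three fields. A hedged standalone sketch of that shape using the same json4s DSL that JsonProtocol relies on (field values are made up for illustration):

import org.json4s.JsonDSL._
import org.json4s.jackson.JsonMethods._

// The trimmed BlockManagerId JSON: no "Netty Port" field anymore.
val json = ("Executor ID" -> "exec-1") ~ ("Host" -> "worker-03") ~ ("Port" -> 43210)
println(compact(render(json)))
// {"Executor ID":"exec-1","Host":"worker-03","Port":43210}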

core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala

Lines changed: 6 additions & 1 deletion
@@ -413,7 +413,12 @@ class ExternalAppendOnlyMap[K, V, C](
     extends Iterator[(K, C)]
   {
     private val batchOffsets = batchSizes.scanLeft(0L)(_ + _)  // Size will be batchSize.length + 1
-    assert(file.length() == batchOffsets(batchOffsets.length - 1))
+    assert(file.length() == batchOffsets.last,
+      "File length is not equal to the last batch offset:\n" +
+      s"    file length = ${file.length}\n" +
+      s"    last batch offset = ${batchOffsets.last}\n" +
+      s"    all batch offsets = ${batchOffsets.mkString(",")}"
+    )
 
     private var batchIndex = 0  // Which batch we're in
     private var fileStream: FileInputStream = null