[SPARK-24296][CORE] Replicate large blocks as a stream. #21451

Changes from 1 commit.
`org.apache.spark.network.shuffle.protocol.UploadBlockStream` (new file, 89 lines):

```java
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.network.shuffle.protocol;

import java.util.Arrays;

import com.google.common.base.Objects;
import io.netty.buffer.ByteBuf;

import org.apache.spark.network.protocol.Encoders;

// Needed by ScalaDoc. See SPARK-7726
import static org.apache.spark.network.shuffle.protocol.BlockTransferMessage.Type;

/**
 * A request to Upload a block, which the destintation should receive as a stream.
 *
 * The actual block data is not contained here.  It will be passed to the StreamCallbackWithID
 * that is returned from RpcHandler.receiveStream()
 */
public class UploadBlockStream extends BlockTransferMessage {
  public final String blockId;
  public final byte[] metadata;

  public UploadBlockStream(String blockId, byte[] metadata) {
    this.blockId = blockId;
    this.metadata = metadata;
  }

  @Override
  protected Type type() { return Type.UPLOAD_BLOCK_STREAM; }

  @Override
  public int hashCode() {
    int objectsHashCode = Objects.hashCode(blockId);
    return objectsHashCode * 41 + Arrays.hashCode(metadata);
  }

  @Override
  public String toString() {
    return Objects.toStringHelper(this)
      .add("blockId", blockId)
      .add("metadata size", metadata.length)
      .toString();
  }

  @Override
  public boolean equals(Object other) {
    if (other != null && other instanceof UploadBlockStream) {
      UploadBlockStream o = (UploadBlockStream) other;
      return Objects.equal(blockId, o.blockId)
        && Arrays.equals(metadata, o.metadata);
    }
    return false;
  }

  @Override
  public int encodedLength() {
    return Encoders.Strings.encodedLength(blockId)
      + Encoders.ByteArrays.encodedLength(metadata);
  }

  @Override
  public void encode(ByteBuf buf) {
    Encoders.Strings.encode(buf, blockId);
    Encoders.ByteArrays.encode(buf, metadata);
  }

  public static UploadBlockStream decode(ByteBuf buf) {
    String blockId = Encoders.Strings.decode(buf);
    byte[] metadata = Encoders.ByteArrays.decode(buf);
    return new UploadBlockStream(blockId, metadata);
  }
}
```
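To make the wire format concrete, here is a small round-trip sketch (not from the PR) that encodes an `UploadBlockStream` into a Netty heap buffer and decodes it back; the block id and metadata bytes are placeholder values:

```scala
import io.netty.buffer.Unpooled
import org.apache.spark.network.shuffle.protocol.UploadBlockStream

// Placeholder values: a block id plus opaque metadata bytes standing in for a
// serialized (StorageLevel, ClassTag) pair.
val msg = new UploadBlockStream("rdd_0_0", Array[Byte](1, 2, 3))

// encode()/decode() round-trip over a heap buffer; encodedLength() sizes it.
val buf = Unpooled.buffer(msg.encodedLength())
msg.encode(buf)
val decoded = UploadBlockStream.decode(buf)
assert(decoded == msg)  // equals() compares blockId and metadata contents

```

Note that `encodedLength()` and `encode()` cover only the message body; in the real path the surrounding `BlockTransferMessage` machinery also prepends a type tag byte (`UPLOAD_BLOCK_STREAM`) when building the full frame.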
Changes to the `org.apache.spark.internal.config` package object:

```diff
@@ -567,4 +567,10 @@ package object config {
     .intConf
     .checkValue(v => v > 0, "The value should be a positive integer.")
     .createWithDefault(2000)
+
+  private[spark] val MEMORY_MAP_LIMIT_FOR_TESTS =
+    ConfigBuilder("spark.storage.memoryMapLimitForTests")
+      .internal()
+      .bytesConf(ByteUnit.BYTE)
+      .createWithDefault(Int.MaxValue)
 }
```

Contributor review comment on `.internal()`: add a `.doc` that says it is for testing only.
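Since this config is internal and exists to make the chunked memory-mapping path testable, a plausible test setup (suite details hypothetical, not from the PR) would shrink the limit so a small block spans several mapped chunks:

```scala
import org.apache.spark.SparkConf

// Hypothetical test configuration: with the limit at 10 KB, a ~1 MB test block
// is memory-mapped as ~100 chunks, exercising the same multi-chunk path that a
// multi-gigabyte block would hit under the default of Int.MaxValue.
val conf = new SparkConf()
  .set("spark.storage.memoryMapLimitForTests", "10k")
```

The default of `Int.MaxValue` means production code maps a file in the largest chunks the JVM allows; tests can lower it to get multi-chunk behavior without multi-gigabyte fixtures.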
`NettyBlockRpcServer`:

```diff
@@ -26,9 +26,9 @@ import scala.reflect.ClassTag
 import org.apache.spark.internal.Logging
 import org.apache.spark.network.BlockDataManager
 import org.apache.spark.network.buffer.NioManagedBuffer
-import org.apache.spark.network.client.{RpcResponseCallback, TransportClient}
+import org.apache.spark.network.client.{RpcResponseCallback, StreamCallbackWithID, TransportClient}
 import org.apache.spark.network.server.{OneForOneStreamManager, RpcHandler, StreamManager}
-import org.apache.spark.network.shuffle.protocol.{BlockTransferMessage, OpenBlocks, StreamHandle, UploadBlock}
+import org.apache.spark.network.shuffle.protocol._
 import org.apache.spark.serializer.Serializer
 import org.apache.spark.storage.{BlockId, StorageLevel}

@@ -73,10 +73,32 @@ class NettyBlockRpcServer(
         }
         val data = new NioManagedBuffer(ByteBuffer.wrap(uploadBlock.blockData))
         val blockId = BlockId(uploadBlock.blockId)
+        logInfo(s"Receiving replicated block $blockId with level ${level} " +
+          s"from ${client.getSocketAddress}")
         blockManager.putBlockData(blockId, data, level, classTag)
         responseContext.onSuccess(ByteBuffer.allocate(0))
     }
   }

+  override def receiveStream(
+      client: TransportClient,
+      messageHeader: ByteBuffer,
+      responseContext: RpcResponseCallback): StreamCallbackWithID = {
+    val message =
+      BlockTransferMessage.Decoder.fromByteBuffer(messageHeader).asInstanceOf[UploadBlockStream]
+    val (level: StorageLevel, classTag: ClassTag[_]) = {
+      serializer
+        .newInstance()
+        .deserialize(ByteBuffer.wrap(message.metadata))
+        .asInstanceOf[(StorageLevel, ClassTag[_])]
+    }
+    val blockId = BlockId(message.blockId)
+    logInfo(s"Receiving replicated block $blockId with level ${level} as stream " +
+      s"from ${client.getSocketAddress}")
+    // This will return immediately, but will set up a callback on streamData which will still
+    // do all the processing in the netty thread.
+    blockManager.putBlockDataAsStream(blockId, level, classTag)
+  }

   override def getStreamManager(): StreamManager = streamManager
 }
```
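The `(StorageLevel, ClassTag)` pair deserialized in `receiveStream` has to be produced symmetrically on the sending side. A minimal sketch of that sender-side step, assuming the same `Serializer` on both ends (the helper name is hypothetical, not in the PR):

```scala
import java.nio.ByteBuffer
import scala.reflect.ClassTag
import org.apache.spark.serializer.Serializer
import org.apache.spark.storage.StorageLevel

// Hypothetical sender-side counterpart: serialize the (StorageLevel, ClassTag)
// pair into the opaque metadata field of UploadBlockStream, using the same
// Serializer instance the receiving RPC server holds.
def buildMetadata(
    serializer: Serializer,
    level: StorageLevel,
    classTag: ClassTag[_]): Array[Byte] = {
  val buf: ByteBuffer = serializer.newInstance().serialize((level, classTag))
  val bytes = new Array[Byte](buf.remaining())
  buf.get(bytes)
  bytes
}
```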
`BlockManager`:

```diff
@@ -20,7 +20,7 @@ package org.apache.spark.storage
 import java.io._
 import java.lang.ref.{ReferenceQueue => JReferenceQueue, WeakReference}
 import java.nio.ByteBuffer
-import java.nio.channels.Channels
+import java.nio.channels.{Channels, WritableByteChannel}
 import java.util.Collections
 import java.util.concurrent.ConcurrentHashMap

@@ -41,6 +41,7 @@ import org.apache.spark.memory.{MemoryManager, MemoryMode}
 import org.apache.spark.metrics.source.Source
 import org.apache.spark.network._
 import org.apache.spark.network.buffer.ManagedBuffer
+import org.apache.spark.network.client.StreamCallbackWithID
 import org.apache.spark.network.netty.SparkTransportConf
 import org.apache.spark.network.shuffle.{ExternalShuffleClient, TempFileManager}
 import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo

@@ -404,6 +405,47 @@ private[spark] class BlockManager(
     putBytes(blockId, new ChunkedByteBuffer(data.nioByteBuffer()), level)(classTag)
   }

+  override def putBlockDataAsStream(
+      blockId: BlockId,
+      level: StorageLevel,
+      classTag: ClassTag[_]): StreamCallbackWithID = {
+    // TODO if we're going to only put the data in the disk store, we should just write it directly
+    // to the final location, but that would require a deeper refactor of this code.  So instead
+    // we just write to a temp file, and call putBytes on the data in that file.
+    val tmpFile = diskBlockManager.createTempLocalBlock()._2
+    new StreamCallbackWithID {
+      val channel: WritableByteChannel = Channels.newChannel(new FileOutputStream(tmpFile))
+
+      override def getID: String = blockId.name
+
+      override def onData(streamId: String, buf: ByteBuffer): Unit = {
+        while (buf.hasRemaining) {
+          channel.write(buf)
+        }
+      }
+
+      override def onComplete(streamId: String): Unit = {
+        // Read the contents of the downloaded file as a buffer to put into the blockManager.
+        // Note this is all happening inside the netty thread as soon as it reads the end of the
+        // stream.
+        channel.close()
+        // TODO Even if we're only going to write the data to disk after this, we end up using a
+        // lot of memory here.  We won't get a jvm OOM, but might get killed by the
+        // OS / cluster manager.  We could at least read the tmp file as a stream.
+        val buffer = ChunkedByteBuffer.map(tmpFile,
+          conf.get(config.MEMORY_MAP_LIMIT_FOR_TESTS).toInt)
+        putBytes(blockId, buffer, level)(classTag)
+        tmpFile.delete()
+      }
+
+      override def onFailure(streamId: String, cause: Throwable): Unit = {
+        // the framework handles the connection itself, we just need to do local cleanup
+        channel.close()
+        tmpFile.delete()
+      }
+    }
+  }

   /**
    * Get the BlockStatus for the block identified by the given ID, if it exists.
    * NOTE: This is mainly for testing.
```
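To spell out the contract this callback relies on: the transport layer invokes `onData` once per arriving chunk (a `WritableByteChannel` may not drain a buffer in a single `write`, hence the loop), then exactly one of `onComplete` or `onFailure`. Below is an illustrative driver for that lifecycle, with assumed names rather than Spark's actual netty handler:

```scala
import java.nio.ByteBuffer
import org.apache.spark.network.client.StreamCallbackWithID

// Illustrative driver for the callback contract (not Spark's real netty code):
// feed each received frame to onData, then signal completion or failure once.
def drive(callback: StreamCallbackWithID, frames: Iterator[ByteBuffer]): Unit = {
  val id = callback.getID
  try {
    frames.foreach(frame => callback.onData(id, frame))
    callback.onComplete(id)      // triggers the putBytes + tmpFile cleanup above
  } catch {
    case t: Throwable =>
      callback.onFailure(id, t)  // local cleanup only; the connection is handled by the framework
  }
}
```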
```diff
@@ -665,7 +707,7 @@ private[spark] class BlockManager(
     // TODO if we change this method to return the ManagedBuffer, then getRemoteValues
     // could just use the inputStream on the temp file, rather than memory-mapping the file.
     // Until then, replication can cause the process to use too much memory and get killed
-    // by the OS / cluster manager (not a java OOM, since its a memory-mapped file) even though
+    // by the OS / cluster manager (not a java OOM, since it's a memory-mapped file) even though
     // we've read the data to disk.
     logDebug(s"Getting remote block $blockId")
     require(blockId != null, "BlockId is null")

@@ -1349,12 +1391,16 @@ private[spark] class BlockManager(
       try {
         val onePeerStartTime = System.nanoTime
         logTrace(s"Trying to replicate $blockId of ${data.size} bytes to $peer")
+        // This thread keeps a lock on the block, so we do not want the netty thread to unlock
+        // the block when it finishes sending the message.
+        val buffer = new BlockManagerManagedBuffer(blockInfoManager, blockId, data, false,
+          unlockOnDeallocate = false)
         blockTransferService.uploadBlockSync(
           peer.host,
           peer.port,
           peer.executorId,
           blockId,
-          new BlockManagerManagedBuffer(blockInfoManager, blockId, data, false),
+          buffer,
           tLevel,
           classTag)
         logTrace(s"Replicated $blockId of ${data.size} bytes to $peer" +
```
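The streaming upload only helps if large blocks actually take the new path on the sending side. A hedged sketch of the dispatch this PR implies (names and threshold are assumptions, not the PR's exact code): small blocks keep the single `UploadBlock` RPC, large ones send an `UploadBlockStream` header followed by the body as a stream of frames.

```scala
import org.apache.spark.network.buffer.ManagedBuffer

// Hypothetical dispatch helper: `maxInMemUploadSize`, `sendAsRpc`, and
// `sendAsStream` are illustrative names, not the PR's exact code.
def uploadBlock(
    blockData: ManagedBuffer,
    maxInMemUploadSize: Long,
    sendAsRpc: ManagedBuffer => Unit,     // legacy path: whole block in one UploadBlock RPC
    sendAsStream: ManagedBuffer => Unit   // new path: UploadBlockStream header + streamed body
  ): Unit = {
  if (blockData.size() > maxInMemUploadSize) {
    sendAsStream(blockData)  // body is never materialized as one big in-memory array
  } else {
    sendAsRpc(blockData)     // small block: old single-message behavior
  }
}
```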
Reviewer comment on the `UploadBlockStream` Javadoc: nit: spelling `destination`.