From f05fe0b89d39bcdfc777ff0a3eb600e5e82191fa Mon Sep 17 00:00:00 2001 From: Yifei Huang Date: Mon, 21 Jan 2019 18:22:17 -0800 Subject: [PATCH 1/3] compiles ` --- .../shuffle/api/ShufflePartitionReader.java | 5 ++++- .../external/ExternalShuffleDataIO.java | 5 +---- .../ExternalShufflePartitionReader.java | 21 ++++++++++++------- .../external/ExternalShuffleReadSupport.java | 13 +----------- .../shuffle/BlockStoreShuffleReader.scala | 10 ++++++--- .../apache/spark/SplitFilesShuffleIO.scala | 2 +- 6 files changed, 27 insertions(+), 29 deletions(-) diff --git a/core/src/main/java/org/apache/spark/shuffle/api/ShufflePartitionReader.java b/core/src/main/java/org/apache/spark/shuffle/api/ShufflePartitionReader.java index 59eae0a782200..817d213cd8cc1 100644 --- a/core/src/main/java/org/apache/spark/shuffle/api/ShufflePartitionReader.java +++ b/core/src/main/java/org/apache/spark/shuffle/api/ShufflePartitionReader.java @@ -17,9 +17,12 @@ package org.apache.spark.shuffle.api; +import org.apache.spark.storage.ShuffleLocation; + import java.io.InputStream; +import java.util.Optional; public interface ShufflePartitionReader { - InputStream fetchPartition(int reduceId); + InputStream fetchPartition(int reduceId, Optional shuffleLocation); } diff --git a/core/src/main/java/org/apache/spark/shuffle/external/ExternalShuffleDataIO.java b/core/src/main/java/org/apache/spark/shuffle/external/ExternalShuffleDataIO.java index ac20d13de6f2c..2a0a39e4b82e4 100644 --- a/core/src/main/java/org/apache/spark/shuffle/external/ExternalShuffleDataIO.java +++ b/core/src/main/java/org/apache/spark/shuffle/external/ExternalShuffleDataIO.java @@ -21,7 +21,6 @@ public class ExternalShuffleDataIO implements ShuffleDataIO { private static SecurityManager securityManager; private static String hostname; private static int port; - private static MapOutputTracker mapOutputTracker; public ExternalShuffleDataIO( SparkConf sparkConf) { @@ -37,15 +36,13 @@ public void initialize() { securityManager = env.securityManager(); hostname = blockManager.getRandomShuffleHost(); port = blockManager.getRandomShufflePort(); - mapOutputTracker = env.mapOutputTracker(); // TODO: Register Driver and Executor } @Override public ShuffleReadSupport readSupport() { return new ExternalShuffleReadSupport( - conf, context, securityManager.isAuthenticationEnabled(), - securityManager, mapOutputTracker); + conf, context, securityManager.isAuthenticationEnabled(), securityManager); } @Override diff --git a/core/src/main/java/org/apache/spark/shuffle/external/ExternalShufflePartitionReader.java b/core/src/main/java/org/apache/spark/shuffle/external/ExternalShufflePartitionReader.java index 8aefac239e97f..f027832639250 100644 --- a/core/src/main/java/org/apache/spark/shuffle/external/ExternalShufflePartitionReader.java +++ b/core/src/main/java/org/apache/spark/shuffle/external/ExternalShufflePartitionReader.java @@ -4,14 +4,17 @@ import org.apache.spark.network.client.TransportClientFactory; import org.apache.spark.network.shuffle.protocol.OpenShufflePartition; import org.apache.spark.shuffle.api.ShufflePartitionReader; +import org.apache.spark.storage.ShuffleLocation; import org.apache.spark.util.ByteBufferInputStream; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import scala.compat.java8.OptionConverters; import java.io.ByteArrayInputStream; import java.io.InputStream; import java.nio.ByteBuffer; import java.util.Arrays; +import java.util.Optional; public class ExternalShufflePartitionReader implements ShufflePartitionReader { @@ -19,34 +22,36 @@ public class ExternalShufflePartitionReader implements ShufflePartitionReader { LoggerFactory.getLogger(ExternalShufflePartitionReader.class); private final TransportClientFactory clientFactory; - private final String hostName; - private final int port; private final String appId; private final int shuffleId; private final int mapId; public ExternalShufflePartitionReader( TransportClientFactory clientFactory, - String hostName, - int port, String appId, int shuffleId, int mapId) { this.clientFactory = clientFactory; - this.hostName = hostName; - this.port = port; this.appId = appId; this.shuffleId = shuffleId; this.mapId = mapId; } @Override - public InputStream fetchPartition(int reduceId) { + public InputStream fetchPartition(int reduceId, Optional shuffleLocation) { + assert shuffleLocation.isPresent() && shuffleLocation.get() instanceof ExternalShuffleLocation; + ExternalShuffleLocation externalShuffleLocation = (ExternalShuffleLocation) shuffleLocation.get(); + logger.info(String.format("Found external shuffle location on node: %s:%d", + externalShuffleLocation.getShuffleHostname(), + externalShuffleLocation.getShufflePort())); + String hostname = externalShuffleLocation.getShuffleHostname(); + int port = externalShuffleLocation.getShufflePort(); + OpenShufflePartition openMessage = new OpenShufflePartition(appId, shuffleId, mapId, reduceId); TransportClient client = null; try { - client = clientFactory.createUnmanagedClient(hostName, port); + client = clientFactory.createUnmanagedClient(hostname, port); String requestID = String.format( "read-%s-%d-%d-%d", appId, shuffleId, mapId, reduceId); client.setClientId(requestID); diff --git a/core/src/main/java/org/apache/spark/shuffle/external/ExternalShuffleReadSupport.java b/core/src/main/java/org/apache/spark/shuffle/external/ExternalShuffleReadSupport.java index 9e7ff55f47741..a671b80904ed0 100644 --- a/core/src/main/java/org/apache/spark/shuffle/external/ExternalShuffleReadSupport.java +++ b/core/src/main/java/org/apache/spark/shuffle/external/ExternalShuffleReadSupport.java @@ -26,19 +26,16 @@ public class ExternalShuffleReadSupport implements ShuffleReadSupport { private final TransportContext context; private final boolean authEnabled; private final SecretKeyHolder secretKeyHolder; - private final MapOutputTracker mapOutputTracker; public ExternalShuffleReadSupport( TransportConf conf, TransportContext context, boolean authEnabled, - SecretKeyHolder secretKeyHolder, - MapOutputTracker mapOutputTracker) { + SecretKeyHolder secretKeyHolder) { this.conf = conf; this.context = context; this.authEnabled = authEnabled; this.secretKeyHolder = secretKeyHolder; - this.mapOutputTracker = mapOutputTracker; } @Override @@ -48,17 +45,9 @@ public ShufflePartitionReader newPartitionReader(String appId, int shuffleId, in if (authEnabled) { bootstraps.add(new AuthClientBootstrap(conf, appId, secretKeyHolder)); } - Optional maybeShuffleLocation = OptionConverters.toJava(mapOutputTracker.getShuffleLocation(shuffleId, mapId, 0)); - assert maybeShuffleLocation.isPresent(); - ExternalShuffleLocation externalShuffleLocation = (ExternalShuffleLocation) maybeShuffleLocation.get(); - logger.info(String.format("Found external shuffle location on node: %s:%d", - externalShuffleLocation.getShuffleHostname(), - externalShuffleLocation.getShufflePort())); TransportClientFactory clientFactory = context.createClientFactory(bootstraps); try { return new ExternalShufflePartitionReader(clientFactory, - externalShuffleLocation.getShuffleHostname(), - externalShuffleLocation.getShufflePort(), appId, shuffleId, mapId); diff --git a/core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala b/core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala index 0974c91392744..70c76d5948153 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala @@ -17,8 +17,10 @@ package org.apache.spark.shuffle +import scala.compat.java8.OptionConverters + import org.apache.spark._ -import org.apache.spark.internal.{config, Logging} +import org.apache.spark.internal.{Logging, config} import org.apache.spark.serializer.SerializerManager import org.apache.spark.shuffle.api.ShuffleReadSupport import org.apache.spark.storage._ @@ -54,10 +56,12 @@ private[spark] class BlockStoreShuffleReader[K, C]( blockIds.map { case blockId@ShuffleBlockId(_, _, reduceId) => (blockId, serializerManager.wrapStream(blockId, - reader.fetchPartition(reduceId))) + reader.fetchPartition(reduceId, OptionConverters.toJava( + mapOutputTracker.getShuffleLocation(handle.shuffleId, mapId, reduceId))))) case dataBlockId@ShuffleDataBlockId(_, _, reduceId) => (dataBlockId, serializerManager.wrapStream(dataBlockId, - reader.fetchPartition(reduceId))) + reader.fetchPartition(reduceId, OptionConverters.toJava( + mapOutputTracker.getShuffleLocation(handle.shuffleId, mapId, reduceId))))) case invalid => throw new IllegalArgumentException(s"Invalid block id $invalid") } diff --git a/core/src/test/scala/org/apache/spark/SplitFilesShuffleIO.scala b/core/src/test/scala/org/apache/spark/SplitFilesShuffleIO.scala index 097d1e406dc04..579fc9a45ba9b 100644 --- a/core/src/test/scala/org/apache/spark/SplitFilesShuffleIO.scala +++ b/core/src/test/scala/org/apache/spark/SplitFilesShuffleIO.scala @@ -33,7 +33,7 @@ class SplitFilesShuffleIO(conf: SparkConf) extends ShuffleDataIO { override def initialize(): Unit = {} override def readSupport(): ShuffleReadSupport = (appId: String, shuffleId: Int, mapId: Int) => { - reduceId: Int => { + (reduceId: Int, shuffleLocation: Optional[ShuffleLocation]) => { new FileInputStream(resolvePartitionFile(appId, shuffleId, mapId, reduceId)) } } From aa4bbfad3eed7d5626d08a56e13cac31b964f785 Mon Sep 17 00:00:00 2001 From: Yifei Huang Date: Tue, 22 Jan 2019 21:48:42 -0800 Subject: [PATCH 2/3] fix UnsafeShuffleWriter --- .../spark/shuffle/sort/UnsafeShuffleWriter.java | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java b/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java index 9eddfc924404e..97f34bf460495 100644 --- a/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java +++ b/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java @@ -548,6 +548,10 @@ private CommittedPartition[] mergeSpillsWithPluggableWriter( ShufflePartitionWriter writer = mapOutputWriter.newPartitionWriter(partition); try { try (OutputStream partitionOutput = writer.openPartitionStream()) { + OutputStream partitionOutputStream = partitionOutput; + if (compressionCodec != null) { + partitionOutputStream = compressionCodec.compressedOutputStream(partitionOutput); + } for (int i = 0; i < spills.length; i++) { final long partitionLengthInSpill = spills[i].partitionLengths[partition]; if (partitionLengthInSpill > 0) { @@ -560,7 +564,7 @@ private CommittedPartition[] mergeSpillsWithPluggableWriter( partitionInputStream = compressionCodec.compressedInputStream(partitionInputStream); } - Utils.copyStream(partitionInputStream, partitionOutput, false, false); + Utils.copyStream(partitionInputStream, partitionOutputStream, false, false); } finally { partitionInputStream.close(); } @@ -621,7 +625,11 @@ private CommittedPartition[] writeSingleSpillFileUsingPluggableWriter( partitionInputStream = compressionCodec.compressedInputStream(partitionInputStream); } try (OutputStream partitionOutput = writer.openPartitionStream()) { - Utils.copyStream(partitionInputStream, partitionOutput, false, false); + OutputStream partitionOutputStream = partitionOutput; + if (compressionCodec != null) { + partitionOutputStream = compressionCodec.compressedOutputStream(partitionOutput); + } + Utils.copyStream(partitionInputStream, partitionOutputStream, false, false); } } catch (Exception e) { try { @@ -637,6 +645,7 @@ private CommittedPartition[] writeSingleSpillFileUsingPluggableWriter( writeMetrics.incBytesWritten(committedPartitions[partition].length()); } threwException = false; + mapOutputWriter.commitAllPartitions(); } catch (Exception e) { try { mapOutputWriter.abort(e); From f08be9871eac7d4be6673ed66fee2ed05a02d331 Mon Sep 17 00:00:00 2001 From: Yifei Huang Date: Tue, 22 Jan 2019 21:51:14 -0800 Subject: [PATCH 3/3] remove unnecessary changes --- .../spark/shuffle/external/ExternalShufflePartitionReader.java | 1 - .../org/apache/spark/shuffle/BlockStoreShuffleReader.scala | 2 +- 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/core/src/main/java/org/apache/spark/shuffle/external/ExternalShufflePartitionReader.java b/core/src/main/java/org/apache/spark/shuffle/external/ExternalShufflePartitionReader.java index 1f7aeacd00351..10f1b71008472 100644 --- a/core/src/main/java/org/apache/spark/shuffle/external/ExternalShufflePartitionReader.java +++ b/core/src/main/java/org/apache/spark/shuffle/external/ExternalShufflePartitionReader.java @@ -8,7 +8,6 @@ import org.apache.spark.util.ByteBufferInputStream; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import scala.compat.java8.OptionConverters; import java.io.ByteArrayInputStream; import java.io.InputStream; diff --git a/core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala b/core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala index 70c76d5948153..caeecedc5d36e 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala @@ -20,7 +20,7 @@ package org.apache.spark.shuffle import scala.compat.java8.OptionConverters import org.apache.spark._ -import org.apache.spark.internal.{Logging, config} +import org.apache.spark.internal.{config, Logging} import org.apache.spark.serializer.SerializerManager import org.apache.spark.shuffle.api.ShuffleReadSupport import org.apache.spark.storage._