Merged
58 commits
864d1cd  initial API (yifeih, Mar 20, 2019)
c88751c  wip (yifeih, Mar 21, 2019)
9af216f  wip (yifeih, Mar 22, 2019)
a35b826  initial implementation of reader (yifeih, Mar 26, 2019)
14c47ae  fix based on comments (yifeih, Mar 26, 2019)
5bb4c32  fix java lang import and delete unneeded class (yifeih, Mar 26, 2019)
584e6c8  address initial comments (yifeih, Mar 27, 2019)
0292fe2  fix unit tests (yifeih, Mar 27, 2019)
71c2cc7  java checkstyle (yifeih, Mar 27, 2019)
43c377c  fix tests (yifeih, Mar 27, 2019)
9fc6a60  address some comments (yifeih, Mar 28, 2019)
45172a5  blah (yifeih, Mar 28, 2019)
4e5652b  address more comments (yifeih, Mar 28, 2019)
a35d8fe  Use decorators to customize how the read metrics reporter is instanti… (mccheah, Apr 1, 2019)
1a09ebe  blah (yifeih, Apr 2, 2019)
c149d24  initial tests (yifeih, Apr 2, 2019)
672d473  Revert "initial tests" (yifeih, Apr 3, 2019)
e0a3289  initial impl (yifeih, Apr 3, 2019)
1e89b3f  get shuffle reader tests to pass (yifeih, Apr 3, 2019)
495c7bd  update (yifeih, Apr 3, 2019)
88a03cb  tests (yifeih, Apr 3, 2019)
741deed  style (yifeih, Apr 3, 2019)
76c0381  Merge branch 'spark-25299' into yh/reader-api (yifeih, Apr 3, 2019)
c7c52b0  hook up executor components (yifeih, Apr 4, 2019)
897c0bf  fix compile (yifeih, Apr 4, 2019)
34eaaf6  remove unnecessary fields (yifeih, Apr 4, 2019)
0548800  remove unused (yifeih, Apr 4, 2019)
0637e70  refactor retrying iterator (yifeih, Apr 4, 2019)
f069dc1  remove unused import (yifeih, Apr 4, 2019)
0bba677  fix some comments (yifeih, Apr 5, 2019)
a82a725  null check (yifeih, Apr 5, 2019)
ac392a1  refactor interface (yifeih, Apr 5, 2019)
53dd94b  refactor API (yifeih, Apr 5, 2019)
4c0c791  shuffle iterator style (yifeih, Apr 5, 2019)
84f7931  add some javadocs for interfaces (yifeih, Apr 5, 2019)
b59efb5  attach apache headers (yifeih, Apr 5, 2019)
aba8a94  remove unused imports (yifeih, Apr 5, 2019)
5ef59b6  remove another import (yifeih, Apr 5, 2019)
49a1901  fix reader (yifeih, Apr 5, 2019)
8c6c09c  fix imports (yifeih, Apr 5, 2019)
6370b41  add exception comment for retry API (yifeih, Apr 10, 2019)
c442b63  address some comments (yifeih, Apr 10, 2019)
2c1272a  address comments (yifeih, Apr 10, 2019)
2758a5c  Merge branch 'spark-25299' into yh/reader-api (yifeih, Apr 19, 2019)
bd349ca  resolve conflicts (yifeih, Apr 19, 2019)
653f67c  style (yifeih, Apr 19, 2019)
9f53839  address some comments (yifeih, Apr 19, 2019)
94275fd  style (yifeih, Apr 20, 2019)
26e97c1  refactor API (yifeih, Apr 20, 2019)
91db776  cleanup (yifeih, Apr 20, 2019)
f0fa7b8  fix tests and style (yifeih, Apr 22, 2019)
50c8fc3  style (yifeih, Apr 22, 2019)
4aa4b6e  reorder result for test? (yifeih, Apr 22, 2019)
7d23f47  wip (yifeih, Apr 26, 2019)
363d4ab  address comments (yifeih, Apr 29, 2019)
bb7fa4c  style (yifeih, Apr 29, 2019)
711109b  cleanup tests (yifeih, Apr 29, 2019)
04a135c  Remove unused class (mccheah, Apr 30, 2019)
org/apache/spark/api/shuffle/ShuffleBlockInfo.java (new file)
@@ -0,0 +1,79 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.api.shuffle;

import org.apache.spark.api.java.Optional;

import java.util.Objects;

/**
* :: Experimental ::
* An object describing a shuffle block along with its length and optional location metadata.
* @since 3.0.0
*/
public class ShuffleBlockInfo {
private final int shuffleId;
private final int mapId;
private final int reduceId;
private final long length;
private final Optional<ShuffleLocation> shuffleLocation;

public ShuffleBlockInfo(int shuffleId, int mapId, int reduceId, long length,
Optional<ShuffleLocation> shuffleLocation) {
this.shuffleId = shuffleId;
this.mapId = mapId;
this.reduceId = reduceId;
this.length = length;
this.shuffleLocation = shuffleLocation;
}

public int getShuffleId() {
return shuffleId;
}

public int getMapId() {
return mapId;
}

public int getReduceId() {
return reduceId;
}

public long getLength() {
return length;
}

public Optional<ShuffleLocation> getShuffleLocation() {
return shuffleLocation;
}

@Override
public boolean equals(Object other) {
return other instanceof ShuffleBlockInfo
&& shuffleId == ((ShuffleBlockInfo) other).shuffleId
&& mapId == ((ShuffleBlockInfo) other).mapId
&& reduceId == ((ShuffleBlockInfo) other).reduceId
&& length == ((ShuffleBlockInfo) other).length
&& Objects.equals(shuffleLocation, ((ShuffleBlockInfo) other).shuffleLocation);
}

@Override
public int hashCode() {
return Objects.hash(shuffleId, mapId, reduceId, length, shuffleLocation);
}
}
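For reference, the sketch below shows how a caller might construct these descriptors. It is an illustration only and not part of this PR: the shuffle/map/reduce IDs and the length are made-up values, and the empty Optional corresponds to a block whose location is unknown.

// Hypothetical usage sketch (not part of this PR); all values below are placeholders.
import java.util.Arrays;
import java.util.List;

import org.apache.spark.api.java.Optional;
import org.apache.spark.api.shuffle.ShuffleBlockInfo;

public class ShuffleBlockInfoExample {
  public static void main(String[] args) {
    // A block with no known location, mirroring the case where mapShuffleLocations is null.
    ShuffleBlockInfo block = new ShuffleBlockInfo(
        /* shuffleId */ 0, /* mapId */ 3, /* reduceId */ 7, /* length */ 1024L,
        Optional.empty());

    // equals/hashCode compare all five fields, so an identical descriptor is equal.
    ShuffleBlockInfo sameBlock = new ShuffleBlockInfo(0, 3, 7, 1024L, Optional.empty());
    System.out.println(block.equals(sameBlock)); // expected: true

    // A reader implementation receives an Iterable of these descriptors to fetch.
    List<ShuffleBlockInfo> toFetch = Arrays.asList(block, sameBlock);
    System.out.println(toFetch.size() + " blocks, first length " + toFetch.get(0).getLength());
  }
}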
org/apache/spark/api/shuffle/ShuffleExecutorComponents.java
@@ -30,4 +30,6 @@ public interface ShuffleExecutorComponents {
void initializeExecutor(String appId, String execId);

ShuffleWriteSupport writes();

ShuffleReadSupport reads();
}
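To make the shape of this contract concrete, here is a hypothetical plugin skeleton, assuming the three methods visible in this hunk are the whole interface. Every name outside the Spark API (MyShuffleExecutorComponents, the toy read support) is invented, and the write path is deliberately left unimplemented because ShuffleWriteSupport's methods are not shown in this diff.

// Hypothetical plugin skeleton (illustration only).
import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;

import org.apache.spark.api.shuffle.ShuffleBlockInfo;
import org.apache.spark.api.shuffle.ShuffleExecutorComponents;
import org.apache.spark.api.shuffle.ShuffleReadSupport;
import org.apache.spark.api.shuffle.ShuffleWriteSupport;

public class MyShuffleExecutorComponents implements ShuffleExecutorComponents {
  private String appId;
  private String execId;

  @Override
  public void initializeExecutor(String appId, String execId) {
    // Per-executor setup (remote clients, caches, ...) would happen here.
    this.appId = appId;
    this.execId = execId;
  }

  @Override
  public ShuffleWriteSupport writes() {
    // Write path is out of scope for this sketch.
    throw new UnsupportedOperationException("write path not sketched here");
  }

  @Override
  public ShuffleReadSupport reads() {
    // ShuffleReadSupport has a single method, so a lambda is enough for a toy reader
    // that hands back one empty stream per requested block.
    return blockMetadata -> {
      List<InputStream> streams = new ArrayList<>();
      for (ShuffleBlockInfo block : blockMetadata) {
        streams.add(new ByteArrayInputStream(new byte[0]));
      }
      return streams;
    };
  }
}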
org/apache/spark/api/shuffle/ShuffleLocation.java
@@ -21,5 +21,4 @@
* Marker interface representing a location of a shuffle block. Implementations of shuffle readers
* and writers are expected to cast this down to an implementation-specific representation.
*/
public interface ShuffleLocation {
}
public interface ShuffleLocation {}
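Because ShuffleLocation is only a marker, each shuffle implementation supplies its own concrete location type and casts back down to it when reading. Below is a hypothetical host/port example, invented purely for illustration; it is not the DefaultMapShuffleLocations class used by the built-in implementation elsewhere in this PR.

// Hypothetical ShuffleLocation implementation (illustration only).
import java.util.Objects;

import org.apache.spark.api.shuffle.ShuffleLocation;

public class HostPortShuffleLocation implements ShuffleLocation {
  private final String host;
  private final int port;

  public HostPortShuffleLocation(String host, int port) {
    this.host = host;
    this.port = port;
  }

  public String getHost() { return host; }

  public int getPort() { return port; }

  // Readers cast ShuffleLocation down to this type; value equality also keeps
  // ShuffleBlockInfo.equals/hashCode meaningful when locations are compared.
  @Override
  public boolean equals(Object other) {
    return other instanceof HostPortShuffleLocation
        && host.equals(((HostPortShuffleLocation) other).host)
        && port == ((HostPortShuffleLocation) other).port;
  }

  @Override
  public int hashCode() {
    return Objects.hash(host, port);
  }
}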
org/apache/spark/api/shuffle/ShuffleReadSupport.java (new file)
@@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.api.shuffle;

import org.apache.spark.annotation.Experimental;

import java.io.IOException;
import java.io.InputStream;

/**
* :: Experimental ::
* An interface for reading shuffle records.
* @since 3.0.0
*/
@Experimental
public interface ShuffleReadSupport {
/**
* Returns an {@link Iterable} of {@link InputStream}s that iterates through the shuffle
* block data, given an iterable of the shuffle blocks to fetch.
*/
Iterable<InputStream> getPartitionReaders(Iterable<ShuffleBlockInfo> blockMetadata)
throws IOException;
}
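A minimal caller-side sketch, assuming a ShuffleReadSupport instance is already available. The block coordinates are placeholders; a real caller derives them from map output metadata, as the BlockStoreShuffleReader change later in this diff does.

// Hypothetical consumer of ShuffleReadSupport (illustration only).
import java.io.IOException;
import java.io.InputStream;
import java.util.Arrays;
import java.util.List;

import org.apache.spark.api.java.Optional;
import org.apache.spark.api.shuffle.ShuffleBlockInfo;
import org.apache.spark.api.shuffle.ShuffleReadSupport;

public class ReadSupportExample {
  static long countBytes(ShuffleReadSupport readSupport) throws IOException {
    // Placeholder coordinates; real callers build these from the MapOutputTracker.
    List<ShuffleBlockInfo> blocks = Arrays.asList(
        new ShuffleBlockInfo(0, 0, 5, 128L, Optional.empty()),
        new ShuffleBlockInfo(0, 1, 5, 256L, Optional.empty()));

    long total = 0;
    // Drain every stream handed back for the requested blocks.
    for (InputStream stream : readSupport.getPartitionReaders(blocks)) {
      try (InputStream in = stream) {
        while (in.read() != -1) {
          total++;
        }
      }
    }
    return total;
  }
}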
org/apache/spark/shuffle/sort/io/DefaultShuffleExecutorComponents.java
@@ -17,18 +17,24 @@

package org.apache.spark.shuffle.sort.io;

import org.apache.spark.MapOutputTracker;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkEnv;
import org.apache.spark.api.shuffle.ShuffleExecutorComponents;
import org.apache.spark.api.shuffle.ShuffleReadSupport;
import org.apache.spark.api.shuffle.ShuffleWriteSupport;
import org.apache.spark.serializer.SerializerManager;
import org.apache.spark.shuffle.IndexShuffleBlockResolver;
import org.apache.spark.shuffle.io.DefaultShuffleReadSupport;
import org.apache.spark.storage.BlockManager;

public class DefaultShuffleExecutorComponents implements ShuffleExecutorComponents {

private final SparkConf sparkConf;
private BlockManager blockManager;
private IndexShuffleBlockResolver blockResolver;
private MapOutputTracker mapOutputTracker;
private SerializerManager serializerManager;

public DefaultShuffleExecutorComponents(SparkConf sparkConf) {
this.sparkConf = sparkConf;
@@ -37,15 +43,30 @@ public DefaultShuffleExecutorComponents(SparkConf sparkConf) {
@Override
public void initializeExecutor(String appId, String execId) {
blockManager = SparkEnv.get().blockManager();
mapOutputTracker = SparkEnv.get().mapOutputTracker();
serializerManager = SparkEnv.get().serializerManager();
blockResolver = new IndexShuffleBlockResolver(sparkConf, blockManager);
}

@Override
public ShuffleWriteSupport writes() {
checkInitialized();
return new DefaultShuffleWriteSupport(sparkConf, blockResolver, blockManager.shuffleServerId());
}

@Override
public ShuffleReadSupport reads() {
checkInitialized();
return new DefaultShuffleReadSupport(blockManager,
mapOutputTracker,
serializerManager,
sparkConf);
}

private void checkInitialized() {
if (blockResolver == null) {
throw new IllegalStateException(
"Executor components must be initialized before getting writers.");
"Executor components must be initialized before getting writers.");
}
return new DefaultShuffleWriteSupport(sparkConf, blockResolver, blockManager.shuffleServerId());
}
}
core/src/main/scala/org/apache/spark/MapOutputTracker.scala (13 additions & 8 deletions)
@@ -283,7 +283,7 @@ private[spark] abstract class MapOutputTracker(conf: SparkConf) extends Logging

// For testing
def getMapSizesByShuffleLocation(shuffleId: Int, reduceId: Int)
: Iterator[(ShuffleLocation, Seq[(BlockId, Long)])] = {
: Iterator[(Option[ShuffleLocation], Seq[(BlockId, Long)])] = {
getMapSizesByShuffleLocation(shuffleId, reduceId, reduceId + 1)
}

@@ -297,7 +297,7 @@ private[spark] abstract class MapOutputTracker(conf: SparkConf) extends Logging
* describing the shuffle blocks that are stored at that block manager.
*/
def getMapSizesByShuffleLocation(shuffleId: Int, startPartition: Int, endPartition: Int)
: Iterator[(ShuffleLocation, Seq[(BlockId, Long)])]
: Iterator[(Option[ShuffleLocation], Seq[(BlockId, Long)])]

/**
* Deletes map output status information for the specified shuffle stage.
@@ -647,7 +647,7 @@ private[spark] class MapOutputTrackerMaster(
// Get blocks sizes by executor Id. Note that zero-sized blocks are excluded in the result.
// This method is only called in local-mode.
def getMapSizesByShuffleLocation(shuffleId: Int, startPartition: Int, endPartition: Int)
: Iterator[(ShuffleLocation, Seq[(BlockId, Long)])] = {
: Iterator[(Option[ShuffleLocation], Seq[(BlockId, Long)])] = {
logDebug(s"Fetching outputs for shuffle $shuffleId, partitions $startPartition-$endPartition")
shuffleStatuses.get(shuffleId) match {
case Some (shuffleStatus) =>
@@ -684,7 +684,7 @@ private[spark] class MapOutputTrackerWorker(conf: SparkConf) extends MapOutputTracker

// Get blocks sizes by executor Id. Note that zero-sized blocks are excluded in the result.
override def getMapSizesByShuffleLocation(shuffleId: Int, startPartition: Int, endPartition: Int)
: Iterator[(ShuffleLocation, Seq[(BlockId, Long)])] = {
: Iterator[(Option[ShuffleLocation], Seq[(BlockId, Long)])] = {
logDebug(s"Fetching outputs for shuffle $shuffleId, partitions $startPartition-$endPartition")
val statuses = getStatuses(shuffleId)
try {
@@ -873,9 +873,9 @@ private[spark] object MapOutputTracker extends Logging {
shuffleId: Int,
startPartition: Int,
endPartition: Int,
statuses: Array[MapStatus]): Iterator[(ShuffleLocation, Seq[(BlockId, Long)])] = {
statuses: Array[MapStatus]): Iterator[(Option[ShuffleLocation], Seq[(BlockId, Long)])] = {
assert (statuses != null)
val splitsByAddress = new HashMap[ShuffleLocation, ListBuffer[(BlockId, Long)]]
val splitsByAddress = new HashMap[Option[ShuffleLocation], ListBuffer[(BlockId, Long)]]
for ((status, mapId) <- statuses.iterator.zipWithIndex) {
if (status == null) {
val errorMessage = s"Missing an output location for shuffle $shuffleId"
Expand All @@ -885,9 +885,14 @@ private[spark] object MapOutputTracker extends Logging {
for (part <- startPartition until endPartition) {
val size = status.getSizeForBlock(part)
if (size != 0) {
val shuffleLoc = status.mapShuffleLocations.getLocationForBlock(part)
splitsByAddress.getOrElseUpdate(shuffleLoc, ListBuffer()) +=
if (status.mapShuffleLocations == null) {
splitsByAddress.getOrElseUpdate(Option.empty, ListBuffer()) +=
((ShuffleBlockId(shuffleId, mapId, part), size))
} else {
val shuffleLoc = status.mapShuffleLocations.getLocationForBlock(part)
splitsByAddress.getOrElseUpdate(Option.apply(shuffleLoc), ListBuffer()) +=
((ShuffleBlockId(shuffleId, mapId, part), size))
}
}
}
}
core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala (10 additions & 2 deletions)
@@ -56,6 +56,8 @@ class TaskMetrics private[spark] () extends Serializable {
private val _diskBytesSpilled = new LongAccumulator
private val _peakExecutionMemory = new LongAccumulator
private val _updatedBlockStatuses = new CollectionAccumulator[(BlockId, BlockStatus)]
private var _decorFunc: TempShuffleReadMetrics => TempShuffleReadMetrics =
Predef.identity[TempShuffleReadMetrics]

/**
* Time taken on the executor to deserialize this task.
@@ -187,11 +189,17 @@ class TaskMetrics private[spark] () extends Serializable {
* be lost.
*/
private[spark] def createTempShuffleReadMetrics(): TempShuffleReadMetrics = synchronized {
val readMetrics = new TempShuffleReadMetrics
tempShuffleReadMetrics += readMetrics
val tempShuffleMetrics = new TempShuffleReadMetrics
val readMetrics = _decorFunc(tempShuffleMetrics)
tempShuffleReadMetrics += tempShuffleMetrics
readMetrics
}

private[spark] def decorateTempShuffleReadMetrics(
decorFunc: TempShuffleReadMetrics => TempShuffleReadMetrics): Unit = synchronized {
_decorFunc = decorFunc
}

/**
* Merge values across all temporary [[ShuffleReadMetrics]] into `_shuffleReadMetrics`.
* This is expected to be called on executor heartbeat and at the end of a task.
org/apache/spark/shuffle/BlockStoreShuffleReader.scala

@@ -17,11 +17,18 @@
package org.apache.spark.shuffle

import java.io.InputStream

import scala.collection.JavaConverters._

import org.apache.spark._
import org.apache.spark.api.java.Optional
import org.apache.spark.api.shuffle.{ShuffleBlockInfo, ShuffleReadSupport}
import org.apache.spark.internal.{config, Logging}
import org.apache.spark.io.CompressionCodec
import org.apache.spark.serializer.SerializerManager
import org.apache.spark.shuffle.sort.DefaultMapShuffleLocations
import org.apache.spark.storage.{BlockId, BlockManager, ShuffleBlockFetcherIterator}
import org.apache.spark.shuffle.io.DefaultShuffleReadSupport
import org.apache.spark.storage.{ShuffleBlockFetcherIterator, ShuffleBlockId}
import org.apache.spark.util.CompletionIterator
import org.apache.spark.util.collection.ExternalSorter

@@ -35,40 +42,68 @@ private[spark] class BlockStoreShuffleReader[K, C](
endPartition: Int,
context: TaskContext,
readMetrics: ShuffleReadMetricsReporter,
shuffleReadSupport: ShuffleReadSupport,
serializerManager: SerializerManager = SparkEnv.get.serializerManager,
blockManager: BlockManager = SparkEnv.get.blockManager,
mapOutputTracker: MapOutputTracker = SparkEnv.get.mapOutputTracker)
mapOutputTracker: MapOutputTracker = SparkEnv.get.mapOutputTracker,
sparkConf: SparkConf = SparkEnv.get.conf)
extends ShuffleReader[K, C] with Logging {

private val dep = handle.dependency

private val compressionCodec = CompressionCodec.createCodec(sparkConf)

private val compressShuffle = sparkConf.get(config.SHUFFLE_COMPRESS)

/** Read the combined key-values for this reduce task */
override def read(): Iterator[Product2[K, C]] = {
val wrappedStreams = new ShuffleBlockFetcherIterator(
context,
blockManager.shuffleClient,
blockManager,
mapOutputTracker.getMapSizesByShuffleLocation(handle.shuffleId, startPartition, endPartition)
.map {
case (loc: DefaultMapShuffleLocations, blocks: Seq[(BlockId, Long)]) =>
(loc.getBlockManagerId, blocks)
case _ =>
throw new UnsupportedOperationException("Not allowed to using non-default map shuffle" +
" locations yet.")
},
serializerManager.wrapStream,
// Note: we use getSizeAsMb when no suffix is provided for backwards compatibility
SparkEnv.get.conf.get(config.REDUCER_MAX_SIZE_IN_FLIGHT) * 1024 * 1024,
SparkEnv.get.conf.get(config.REDUCER_MAX_REQS_IN_FLIGHT),
SparkEnv.get.conf.get(config.REDUCER_MAX_BLOCKS_IN_FLIGHT_PER_ADDRESS),
SparkEnv.get.conf.get(config.MAX_REMOTE_BLOCK_SIZE_FETCH_TO_MEM),
SparkEnv.get.conf.get(config.SHUFFLE_DETECT_CORRUPT),
readMetrics).toCompletionIterator
val streamsIterator =
shuffleReadSupport.getPartitionReaders(new Iterable[ShuffleBlockInfo] {
override def iterator: Iterator[ShuffleBlockInfo] = {
mapOutputTracker
.getMapSizesByShuffleLocation(handle.shuffleId, startPartition, endPartition)
.flatMap { shuffleLocationInfo =>
shuffleLocationInfo._2.map { blockInfo =>
val block = blockInfo._1.asInstanceOf[ShuffleBlockId]
new ShuffleBlockInfo(
block.shuffleId,
block.mapId,
block.reduceId,
blockInfo._2,
Optional.ofNullable(shuffleLocationInfo._1.orNull))
}
}
}
}.asJava).iterator()

val serializerInstance = dep.serializer.newInstance()
val retryingWrappedStreams = new Iterator[InputStream] {
override def hasNext: Boolean = streamsIterator.hasNext

// Create a key/value iterator for each stream
val recordIter = wrappedStreams.flatMap { case (blockId, wrappedStream) =>
override def next(): InputStream = {
var returnStream: InputStream = null
while (streamsIterator.hasNext && returnStream == null) {
if (shuffleReadSupport.isInstanceOf[DefaultShuffleReadSupport]) {
// The default implementation checks for corrupt streams, so it will already have
// decompressed/decrypted the bytes
returnStream = streamsIterator.next()
} else {
val nextStream = streamsIterator.next()
returnStream = if (compressShuffle) {
compressionCodec.compressedInputStream(
serializerManager.wrapForEncryption(nextStream))
} else {
serializerManager.wrapForEncryption(nextStream)
}
}
}
if (returnStream == null) {
throw new IllegalStateException("Expected shuffle reader iterator to return a stream")
}
returnStream
[Inline review thread]

Reviewer: Defensive programming: if wrappedStreams didn't have a next value and returnStream is null, it means we ran out of elements despite the underlying iterator claiming that there were indeed more elements, which indicates that retrying didn't work properly. Let's check that returnStream is not null here and report an error message accordingly.

Author (yifeih): Hmm, retrying not working properly would result in either wrappedStreams.next() or wrappedStreams.retryLastBlock() throwing a FetchFailedException, but you're right, we aren't guaranteed that other implementations will do this correctly. Will add the check.

Reviewer: Ping here? Think the last push doesn't address this.

Reviewer: Never mind, I was looking at a stale diff.
}
}

val serializerInstance = dep.serializer.newInstance()
val recordIter = retryingWrappedStreams.flatMap { wrappedStream =>
// Note: the asKeyValueIterator below wraps a key/value iterator inside of a
// NextIterator. The NextIterator makes sure that close() is called on the
// underlying InputStream when all records have been read.