Closed

25 commits
f427ab5
Register map output metadata upon committing shuffle map tasks.
mccheah Mar 17, 2020
65d077d
Add a mock version for tests.
mccheah May 23, 2020
c2882ab
Update manifests
mccheah Jun 3, 2020
dc6f853
Add a test for specifically checking map output registration and backup
mccheah Jun 3, 2020
529122f
Return map output commit message from single spill writer
mccheah Jun 23, 2020
d20a0ee
Address comments and fix build
mccheah Jun 23, 2020
dc8d15c
Temporarily remove tests
mccheah Jun 23, 2020
e340bca
Address comments
mccheah Jul 30, 2020
edd5c05
Merge remote-tracking branch 'origin/master' into register-map-output…
mccheah Jul 30, 2020
7baa3d2
Fix Java linting errors
mccheah Jul 31, 2020
cce67ab
Fix infinite loop.
mccheah Jul 31, 2020
409bebb
Fix checkstyle
mccheah Jul 31, 2020
a6fabd2
Fix test
mccheah Aug 1, 2020
3c66353
Invoke super.afterEach in ShuffleDriverComponentsSuite
mccheah Aug 3, 2020
b88d724
Fix build
mccheah Aug 3, 2020
3505af8
Merge remote-tracking branch 'origin/master' into register-map-output…
mccheah Aug 3, 2020
89bb528
Address comments.
mccheah Aug 10, 2020
0da9e22
Address comments
mccheah Sep 10, 2020
2b5108f
Merge remote-tracking branch 'origin/master' into register-map-output…
mccheah Sep 10, 2020
1e10577
Merge branch 'master' into pr/28618
attilapiros Oct 15, 2020
cac0e9e
apply Attila's review comments
attilapiros Oct 16, 2020
861f089
remove unused import
attilapiros Oct 16, 2020
a6d974c
Merge pull request #15 from attilapiros/updated_28618
mccheah Oct 16, 2020
e210160
fix mima
attilapiros Oct 17, 2020
f69cba7
Merge pull request #16 from attilapiros/updated_28618
mccheah Oct 17, 2020
4 changes: 4 additions & 0 deletions core/pom.xml
@@ -229,6 +229,10 @@
       <groupId>org.scala-lang.modules</groupId>
       <artifactId>scala-xml_${scala.binary.version}</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.scala-lang.modules</groupId>
+      <artifactId>scala-java8-compat_${scala.binary.version}</artifactId>
+    </dependency>
     <dependency>
       <groupId>org.scala-lang</groupId>
       <artifactId>scala-library</artifactId>
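For context on why this dependency appears: the Java shuffle writers later in this diff convert the java.util.Optional returned by MapOutputCommitMessage#getMapOutputMetadata() into a Scala Option via scala-java8-compat's OptionConverters. A minimal, self-contained illustration of that conversion (the demo class and method names here are ours, not part of the patch):

  import java.util.Optional;

  import scala.Option;
  import scala.compat.java8.OptionConverters;

  final class OptionConvertersDemo {
    // Bridges a Java Optional into a Scala Option, the same conversion
    // BypassMergeSortShuffleWriter performs on the optional map output metadata.
    static Option<String> toScalaOption(Optional<String> javaOpt) {
      return OptionConverters.toScala(javaOpt);
    }
  }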
12 changes: 10 additions & 2 deletions core/src/main/java/org/apache/spark/shuffle/api/ShuffleDataIO.java
@@ -17,6 +17,8 @@
 
 package org.apache.spark.shuffle.api;
 
+import java.util.Map;
+
 import org.apache.spark.annotation.Private;
 
 /**
@@ -44,12 +46,18 @@ public interface ShuffleDataIO {
   /**
    * Called once on executor processes to bootstrap the shuffle data storage modules that
    * are only invoked on the executors.
+   *
+   * @param appId The Spark application id
+   * @param execId The unique identifier of the executor being initialized
+   * @param extraConfigs Extra configs that were returned by
+   *                     {@link ShuffleDriverComponents#getAddedExecutorSparkConf()}
    */
-  ShuffleExecutorComponents executor();
+  ShuffleExecutorComponents initializeShuffleExecutorComponents(
+      String appId, String execId, Map<String, String> extraConfigs);
 
   /**
    * Called once on driver process to bootstrap the shuffle metadata modules that
    * are maintained by the driver.
    */
-  ShuffleDriverComponents driver();
+  ShuffleDriverComponents initializeShuffleDriverComponents();
 }
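To make the renamed entry points concrete, here is a small sketch of the intended call order (the helper class is ours, not part of the patch): the driver components are created first, the extra conf they return is shipped to executors, and each executor passes it back into the executor-side bootstrap.

  import java.util.Map;

  import org.apache.spark.shuffle.api.ShuffleDataIO;
  import org.apache.spark.shuffle.api.ShuffleDriverComponents;
  import org.apache.spark.shuffle.api.ShuffleExecutorComponents;

  final class ShuffleDataIOBootstrapSketch {
    // Driver side, once per application: build the driver components and
    // collect the extra executor conf they want to ship.
    static Map<String, String> bootstrapDriver(ShuffleDataIO dataIO) {
      ShuffleDriverComponents driver = dataIO.initializeShuffleDriverComponents();
      return driver.getAddedExecutorSparkConf();
    }

    // Executor side, once per executor: the driver-provided configs come back
    // in through extraConfigs.
    static ShuffleExecutorComponents bootstrapExecutor(
        ShuffleDataIO dataIO, String appId, String execId, Map<String, String> extraConfigs) {
      return dataIO.initializeShuffleExecutorComponents(appId, execId, extraConfigs);
    }
  }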
core/src/main/java/org/apache/spark/shuffle/api/ShuffleDriverComponents.java
@@ -20,6 +20,8 @@
 import java.util.Map;
 
 import org.apache.spark.annotation.Private;
+import org.apache.spark.shuffle.api.metadata.NoOpShuffleOutputTracker;
+import org.apache.spark.shuffle.api.metadata.ShuffleOutputTracker;
 
 /**
  * :: Private ::
@@ -29,36 +31,21 @@
 public interface ShuffleDriverComponents {
 
   /**
-   * Called once in the driver to bootstrap this module that is specific to this application.
-   * This method is called before submitting executor requests to the cluster manager.
-   *
-   * This method should prepare the module with its shuffle components i.e. registering against
-   * an external file servers or shuffle services, or creating tables in a shuffle
-   * storage data database.
+   * Provides additional configuration for the executors when their plugin system is initialized
+   * via {@link ShuffleDataIO#initializeShuffleExecutorComponents(String, String, Map)}.
    *
    * @return additional SparkConf settings necessary for initializing the executor components.
    * This would include configurations that cannot be statically set on the application, like
    * the host:port of external services for shuffle storage.
    */
-  Map<String, String> initializeApplication();
+  Map<String, String> getAddedExecutorSparkConf();
 
   /**
    * Called once at the end of the Spark application to clean up any existing shuffle state.
    */
   void cleanupApplication();
 
-  /**
-   * Called once per shuffle id when the shuffle id is first generated for a shuffle stage.
-   *
-   * @param shuffleId The unique identifier for the shuffle stage.
-   */
-  default void registerShuffle(int shuffleId) {}
-
-  /**
-   * Removes shuffle data associated with the given shuffle.
-   *
-   * @param shuffleId The unique identifier for the shuffle stage.
-   * @param blocking Whether this call should block on the deletion of the data.
-   */
-  default void removeShuffle(int shuffleId, boolean blocking) {}
+  default ShuffleOutputTracker shuffleOutputTracker() {
+    return new NoOpShuffleOutputTracker();
+  }
 }
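A minimal driver-side implementation under the revised interface might look like the sketch below (the class name and config key are made up for illustration). Leaving shuffleOutputTracker() un-overridden picks up the NoOpShuffleOutputTracker default introduced by this patch.

  import java.util.Collections;
  import java.util.Map;

  import org.apache.spark.shuffle.api.ShuffleDriverComponents;

  public class ExampleDriverComponents implements ShuffleDriverComponents {
    @Override
    public Map<String, String> getAddedExecutorSparkConf() {
      // Values only known once the driver is up - e.g. the address of an
      // external shuffle storage service - travel to executors via this map.
      return Collections.singletonMap("spark.shuffle.example.storage.address", "host:1234");
    }

    @Override
    public void cleanupApplication() {
      // Tear down application-wide shuffle state (external tables, files, ...).
    }

    // shuffleOutputTracker() is intentionally not overridden, so the default
    // NoOpShuffleOutputTracker is used and no output metadata is tracked.
  }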
core/src/main/java/org/apache/spark/shuffle/api/ShuffleExecutorComponents.java
@@ -18,7 +18,6 @@
 package org.apache.spark.shuffle.api;
 
 import java.io.IOException;
-import java.util.Map;
 import java.util.Optional;
 
 import org.apache.spark.annotation.Private;
@@ -32,17 +31,6 @@
 @Private
 public interface ShuffleExecutorComponents {
 
-  /**
-   * Called once per executor to bootstrap this module with state that is specific to
-   * that executor, specifically the application ID and executor ID.
-   *
-   * @param appId The Spark application id
-   * @param execId The unique identifier of the executor being initialized
-   * @param extraConfigs Extra configs that were returned by
-   *                     {@link ShuffleDriverComponents#initializeApplication()}
-   */
-  void initializeExecutor(String appId, String execId, Map<String, String> extraConfigs);
-
   /**
    * Called once per map task to create a writer that will be responsible for persisting all the
    * partitioned bytes written by that map task.
core/src/main/java/org/apache/spark/shuffle/api/SingleSpillShuffleMapOutputWriter.java
@@ -21,6 +21,7 @@
 import java.io.IOException;
 
 import org.apache.spark.annotation.Private;
+import org.apache.spark.shuffle.api.metadata.MapOutputCommitMessage;
 
 /**
  * Optional extension for partition writing that is optimized for transferring a single
@@ -32,5 +33,6 @@ public interface SingleSpillShuffleMapOutputWriter {
   /**
    * Transfer a file that contains the bytes of all the partitions written by this map task.
    */
-  void transferMapSpillFile(File mapOutputFile, long[] partitionLengths) throws IOException;
+  MapOutputCommitMessage transferMapSpillFile(File mapOutputFile, long[] partitionLengths)
+      throws IOException;
 }
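The signature change means single-spill transfers now report a commit message instead of returning void. Below is a sketch of a local-disk style implementation; note that MapOutputCommitMessage's constructors are not shown in this diff, so MapOutputCommitMessage.of(partitionLengths) is our assumption about its factory method, and the class itself is illustrative only.

  import java.io.File;
  import java.io.IOException;
  import java.nio.file.Files;
  import java.nio.file.StandardCopyOption;

  import org.apache.spark.shuffle.api.SingleSpillShuffleMapOutputWriter;
  import org.apache.spark.shuffle.api.metadata.MapOutputCommitMessage;

  public class LocalSingleSpillWriter implements SingleSpillShuffleMapOutputWriter {
    private final File committedOutputFile;

    public LocalSingleSpillWriter(File committedOutputFile) {
      this.committedOutputFile = committedOutputFile;
    }

    @Override
    public MapOutputCommitMessage transferMapSpillFile(
        File mapOutputFile, long[] partitionLengths) throws IOException {
      // Move the single spill file to its final location, then report the
      // partition lengths back; this example attaches no extra metadata.
      Files.move(mapOutputFile.toPath(), committedOutputFile.toPath(),
          StandardCopyOption.REPLACE_EXISTING);
      return MapOutputCommitMessage.of(partitionLengths); // assumed factory method
    }
  }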
core/src/main/java/org/apache/spark/shuffle/api/metadata/NoOpShuffleOutputTracker.java (new file)
@@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.shuffle.api.metadata;

/**
* An implementation of shuffle output tracking that does not keep track of any shuffle state.
*/
public class NoOpShuffleOutputTracker implements ShuffleOutputTracker {

  @Override
  public void registerShuffle(int shuffleId) {}

  @Override
  public void unregisterShuffle(int shuffleId, boolean blocking) {}

  @Override
  public void registerMapOutput(
      int shuffleId, int mapIndex, long mapId, MapOutputMetadata mapOutputMetadata) {}

  @Override
  public void removeMapOutput(int shuffleId, int mapIndex, long mapId) {}
}
core/src/main/java/org/apache/spark/shuffle/api/metadata/ShuffleOutputTracker.java (new file)
@@ -0,0 +1,82 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.shuffle.api.metadata;

/**
* :: Private ::
*
 * A plugin that can monitor the storage of shuffle data from map tasks, and can provide
 * metadata to shuffle readers to aid their reading of shuffle blocks in reduce tasks.
 * <p>
 * {@link MapOutputMetadata} instances provided from the plugin tree's implementation of
 * {@link org.apache.spark.shuffle.api.ShuffleMapOutputWriter} are sent to this module's map output
 * metadata registration method in {@link #registerMapOutput(int, int, long, MapOutputMetadata)}.
 * <p>
 * Implementations MUST be thread-safe. Spark will invoke methods in this module in parallel.
 * <p>
 * A singleton instance of this module is instantiated on the driver via
 * {@link ShuffleDriverComponents#shuffleOutputTracker()}.
 */

Contributor: assume this is on the driver, might be nice just to mention.

Contributor: Add a note about what locking semantics are when the methods are invoked?

Contributor Author (mccheah): I'm not sure what can be added. Can you give an example of guidance we could give to developers?

Contributor (mridulm, Sep 10, 2020): As an example, registerShuffle is invoked with the write lock held, while unregisterShuffle is invoked without any lock. As a public API we are exposing, the locking semantics need to be elaborated, so that users can evaluate what can be done safely/without contention and what needs additional protection (for example, updateMapOutput can be performed without contention from registerShuffle or registerMapOutput).

Additionally, it also informs users about how to design their implementations. For example, if an implementation makes RPC calls (or other blocking calls) from registerMapOutput, there could be nontrivial side effects in the driver.

(I just picked some API calls to illustrate; we need to elaborate for each.)

Contributor Author (mccheah): Is it sufficient to simply say that all APIs need to have exclusive access from each other, at least locking on the shuffle id level?

Contributor: unregisterShuffle currently does not lock?

Contributor Author (mccheah): Again, that's up to the implementation to handle, no?

Contributor: I was not referring to how implementations manage their state/locking (if they have additional state/coordination required, you are right, it has to be handled there - state managed across shuffles as an example for some custom impl).

I was referring to the guarantees/expectations that implementations have from Spark (MapOutputTracker currently) when these methods are invoked. We have to document this, so that implementations are aware of the MT-safety guarantees we provide (and at what granularity) and can make reasonable assumptions about what can/can't be done in an implementation.

For example, as mentioned above, registerShuffle is invoked only with the write lock held for a particular shuffle, while unregisterShuffle does not have that guarantee.

Contributor Author (mccheah): The whole point of just saying that implementations must be thread-safe is that implementations cannot make any guarantees about how the caller will invoke this API. Even if we can say that the current implementation of MapOutputTracker would guarantee locking per shuffle id, for example, that isn't an API guarantee we can realistically make, since MapOutputTracker is a module internal to Spark.

I think saying that there is no guarantee about concurrent access to all methods suffices. But then that basically means "the implementation should be thread-safe". Is that reasonable, or do we need to be more granular than that?
public interface ShuffleOutputTracker {

  /**
   * Called when a new shuffle stage is going to be run.
   *
   * @param shuffleId the unique identifier for the new shuffle stage
   */
  void registerShuffle(int shuffleId);

Contributor: A callback to signal completion (or failure) of shuffle for a shuffleId will help ShuffleDriverComponents coordinate with external services to commit (or clean up) shuffle metadata/files.

Contributor Author (mccheah): I'm hesitant to make significant API changes given how late this is in the review stage - let's add it as a follow-up if there are concrete use cases that require it down the road.

Contributor: Looking at the currently designed API, this is a missing gap.

Contributor Author (mccheah): Again - I don't want to add it here, since it would require further integration of such a new API call in the rest of the Spark codebase, which I am holding as out of scope for this patch.

Contributor (mridulm, Sep 12, 2020): This is a gap in the proposed API, which limits the effectiveness of leveraging it for other shuffle use cases. +CC @otterc PTAL if, without this, shuffle changes can be made for your patches. An alternative would be to do this in subsequent or other efforts to complete the API.

Contributor Author (mccheah): This API isn't complete as of this patch anyway, so I'd prefer that functionality to be deferred, since the complexity being added here is already pretty significant. Can we file a follow-up JIRA for it and go from there?

  /**
   * Called when the shuffle with the given id is unregistered because it will no longer
   * be used by Spark tasks.
   *
   * @param shuffleId the unique identifier for the shuffle stage to be unregistered
   * @param blocking  whether this call should block on the removal of the shuffle state
   */
  void unregisterShuffle(int shuffleId, boolean blocking);

  /**
   * Called when a map task completes, and the map output writer has provided metadata to be
   * persisted by this shuffle output tracker.
   *
   * @param shuffleId         the unique identifier for the shuffle stage that the map task
   *                          is a part of
   * @param mapIndex          the map index of the map task in its shuffle map stage - not
   *                          necessarily unique across multiple attempts of this task
   * @param mapId             the identifier for this map task, which is unique even across
   *                          multiple attempts at this task
   * @param mapOutputMetadata metadata about the map output data's storage returned by the
   *                          map task's writer
   */
  void registerMapOutput(
      int shuffleId, int mapIndex, long mapId, MapOutputMetadata mapOutputMetadata);

  /**
   * Called when the given map output is discarded and will no longer be used in future Spark
   * shuffles.
   *
   * @param shuffleId the unique identifier for the shuffle stage that the map task is a
   *                  part of
   * @param mapIndex  the map index of the map task whose output is being discarded - not
   *                  necessarily unique across multiple attempts of this task
   * @param mapId     the identifier for the map task whose output is being discarded, which
   *                  is unique even across multiple attempts at this task
   */
  void removeMapOutput(int shuffleId, int mapIndex, long mapId);
}
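Since the interface leaves concurrency guarantees to the caller (see the thread above), a defensive implementation should be safe under arbitrary interleavings. Below is a small in-memory sketch built on concurrent collections; the class is our own illustration, not part of the patch, and a real plugin would persist metadata to external storage rather than just record map IDs.

  import java.util.Map;
  import java.util.Set;
  import java.util.concurrent.ConcurrentHashMap;

  import org.apache.spark.shuffle.api.metadata.MapOutputMetadata;
  import org.apache.spark.shuffle.api.metadata.ShuffleOutputTracker;

  public class InMemoryShuffleOutputTracker implements ShuffleOutputTracker {
    // shuffleId -> mapIds with registered output; concurrent structures because
    // Spark may invoke these methods in parallel.
    private final Map<Integer, Set<Long>> outputsByShuffle = new ConcurrentHashMap<>();

    @Override
    public void registerShuffle(int shuffleId) {
      outputsByShuffle.putIfAbsent(shuffleId, ConcurrentHashMap.newKeySet());
    }

    @Override
    public void unregisterShuffle(int shuffleId, boolean blocking) {
      // Removal from an in-memory map is synchronous, so `blocking` has no
      // effect here; a tracker doing asynchronous cleanup would honor it.
      outputsByShuffle.remove(shuffleId);
    }

    @Override
    public void registerMapOutput(
        int shuffleId, int mapIndex, long mapId, MapOutputMetadata mapOutputMetadata) {
      Set<Long> mapIds = outputsByShuffle.get(shuffleId);
      if (mapIds != null) {
        mapIds.add(mapId);
      }
    }

    @Override
    public void removeMapOutput(int shuffleId, int mapIndex, long mapId) {
      Set<Long> mapIds = outputsByShuffle.get(shuffleId);
      if (mapIds != null) {
        mapIds.remove(mapId);
      }
    }
  }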
core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java
@@ -30,6 +30,7 @@
 import scala.Product2;
 import scala.Tuple2;
 import scala.collection.Iterator;
+import scala.compat.java8.OptionConverters;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.io.Closeables;
@@ -39,17 +40,18 @@
 import org.apache.spark.Partitioner;
 import org.apache.spark.ShuffleDependency;
 import org.apache.spark.SparkConf;
+import org.apache.spark.scheduler.MapTaskResult;
 import org.apache.spark.shuffle.api.ShuffleExecutorComponents;
 import org.apache.spark.shuffle.api.ShuffleMapOutputWriter;
 import org.apache.spark.shuffle.api.ShufflePartitionWriter;
 import org.apache.spark.shuffle.api.WritableByteChannelWrapper;
 import org.apache.spark.internal.config.package$;
 import org.apache.spark.scheduler.MapStatus;
 import org.apache.spark.scheduler.MapStatus$;
 import org.apache.spark.serializer.Serializer;
 import org.apache.spark.serializer.SerializerInstance;
 import org.apache.spark.shuffle.ShuffleWriteMetricsReporter;
 import org.apache.spark.shuffle.ShuffleWriter;
+import org.apache.spark.shuffle.api.metadata.MapOutputCommitMessage;
 import org.apache.spark.storage.*;
 import org.apache.spark.util.Utils;
@@ -92,8 +94,8 @@ final class BypassMergeSortShuffleWriter<K, V> extends ShuffleWriter<K, V> {
   /** Array of file writers, one for each partition */
   private DiskBlockObjectWriter[] partitionWriters;
   private FileSegment[] partitionWriterSegments;
-  @Nullable private MapStatus mapStatus;
-  private long[] partitionLengths;
+  @Nullable private MapTaskResult taskResult;
+  private MapOutputCommitMessage mapOutputCommitMessage;
 
   /**
    * Are we in the process of stopping? Because map tasks can call stop() with success = true
@@ -130,9 +132,13 @@ public void write(Iterator<Product2<K, V>> records) throws IOException {
         .createMapOutputWriter(shuffleId, mapId, numPartitions);
     try {
       if (!records.hasNext()) {
-        partitionLengths = mapOutputWriter.commitAllPartitions().getPartitionLengths();
-        mapStatus = MapStatus$.MODULE$.apply(
-          blockManager.shuffleServerId(), partitionLengths, mapId);
+        mapOutputCommitMessage = mapOutputWriter.commitAllPartitions();
+        taskResult = new MapTaskResult(
+            MapStatus$.MODULE$.apply(
+                blockManager.shuffleServerId(),
+                mapOutputCommitMessage.getPartitionLengths(),
+                mapId),
+            OptionConverters.toScala(mapOutputCommitMessage.getMapOutputMetadata()));
         return;
       }
       final SerializerInstance serInstance = serializer.newInstance();

Contributor: As these lines are repeating you could extract them into a new def, like:

  protected void setTaskResult(MapOutputCommitMessage mapOutputCommitMessage) {
    taskResult = new MapTaskResult(
        MapStatus$.MODULE$.apply(
            blockManager.shuffleServerId(),
            mapOutputCommitMessage.getPartitionLengths(),
            mapId),
        OptionConverters.toScala(mapOutputCommitMessage.getMapOutputMetadata()));
  }

With the help of this new def and Mockito's spy you can even get rid of storing the mapOutputCommitMessage for testing purposes only, but it has a price (this class cannot be final); for details you can check: attilapiros@f4578a3

Contributor Author (mccheah): Ack - didn't address this in my latest patch, but will get around to this.
@@ -164,9 +170,13 @@ public void write(Iterator<Product2<K, V>> records) throws IOException {
        }
      }
 
-      partitionLengths = writePartitionedData(mapOutputWriter);
-      mapStatus = MapStatus$.MODULE$.apply(
-        blockManager.shuffleServerId(), partitionLengths, mapId);
+      mapOutputCommitMessage = writePartitionedData(mapOutputWriter);
+      taskResult = new MapTaskResult(
+          MapStatus$.MODULE$.apply(
+              blockManager.shuffleServerId(),
+              mapOutputCommitMessage.getPartitionLengths(),
+              mapId),
+          OptionConverters.toScala(mapOutputCommitMessage.getMapOutputMetadata()));
     } catch (Exception e) {
       try {
         mapOutputWriter.abort(e);
@@ -179,16 +189,17 @@ public void write(Iterator<Product2<K, V>> records) throws IOException {
   }
 
   @VisibleForTesting
-  long[] getPartitionLengths() {
-    return partitionLengths;
+  MapOutputCommitMessage getMapOutputCommitMessage() {
+    return mapOutputCommitMessage;
   }
 
   /**
    * Concatenate all of the per-partition files into a single combined file.
    *
    * @return array of lengths, in bytes, of each partition of the file (used by map output tracker).
    */
-  private long[] writePartitionedData(ShuffleMapOutputWriter mapOutputWriter) throws IOException {
+  private MapOutputCommitMessage writePartitionedData(ShuffleMapOutputWriter mapOutputWriter)
+      throws IOException {
     // Track location of the partition starts in the output file
     if (partitionWriters != null) {
       final long writeStartTime = System.nanoTime();
@@ -219,7 +230,7 @@ private long[] writePartitionedData(ShuffleMapOutputWriter mapOutputWriter) throws IOException {
       }
       partitionWriters = null;
     }
-    return mapOutputWriter.commitAllPartitions().getPartitionLengths();
+    return mapOutputWriter.commitAllPartitions();
   }
 
   private void writePartitionedDataWithChannel(
@@ -259,16 +270,16 @@ private void writePartitionedDataWithStream(File file, ShufflePartitionWriter writer)
   }
 
   @Override
-  public Option<MapStatus> stop(boolean success) {
+  public Option<MapTaskResult> stop(boolean success) {
     if (stopping) {
       return None$.empty();
     } else {
       stopping = true;
       if (success) {
-        if (mapStatus == null) {
+        if (taskResult == null) {
           throw new IllegalStateException("Cannot call stop(true) without having called write()");
         }
-        return Option.apply(mapStatus);
+        return Option.apply(taskResult);
       } else {
         // The map task failed, so delete our output data.
         if (partitionWriters != null) {