Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
f427ab5
Register map output metadata upon committing shuffle map tasks.
mccheah Mar 17, 2020
65d077d
Add a mock version for tests.
mccheah May 23, 2020
c2882ab
Update manifests
mccheah Jun 3, 2020
dc6f853
Add a test for specifically checking map output registration and backup
mccheah Jun 3, 2020
529122f
Return map output commit message from single spill writer
mccheah Jun 23, 2020
d20a0ee
Address comments and fix build
mccheah Jun 23, 2020
dc8d15c
Temporarily remove tests
mccheah Jun 23, 2020
e340bca
Address comments
mccheah Jul 30, 2020
edd5c05
Merge remote-tracking branch 'origin/master' into register-map-output…
mccheah Jul 30, 2020
7baa3d2
Fix Java linting errors
mccheah Jul 31, 2020
cce67ab
Fix infinite loop.
mccheah Jul 31, 2020
409bebb
Fix checkstyle
mccheah Jul 31, 2020
a6fabd2
Fix test
mccheah Aug 1, 2020
3c66353
Invoke super.afterEach in ShuffleDriverComponentsSuite
mccheah Aug 3, 2020
b88d724
Fix build
mccheah Aug 3, 2020
3505af8
Merge remote-tracking branch 'origin/master' into register-map-output…
mccheah Aug 3, 2020
89bb528
Address comments.
mccheah Aug 10, 2020
0da9e22
Address comments
mccheah Sep 10, 2020
2b5108f
Merge remote-tracking branch 'origin/master' into register-map-output…
mccheah Sep 10, 2020
1e10577
Merge branch 'master' into pr/28618
attilapiros Oct 15, 2020
cac0e9e
apply Attila's review comments
attilapiros Oct 16, 2020
861f089
remove unused import
attilapiros Oct 16, 2020
e210160
fix mima
attilapiros Oct 17, 2020
1fc86f6
forgotten commas
attilapiros Oct 17, 2020
a09ced3
Merge branch 'master' into updated_28618
attilapiros Jan 11, 2021
3835ed8
applying review comments
attilapiros Jan 26, 2021
88d370e
Merge remote-tracking branch 'parent/master' into updated_28618
attilapiros Jan 26, 2021
6f571a2
fix unidoc's Java API documentation generation
attilapiros Jan 26, 2021
b350258
Merge branch 'master' into updated_28618
attilapiros Jan 27, 2021
abcf8f3
Merge branch 'master' into updated_28618
attilapiros Mar 8, 2021
8e54b41
update mima excludes
attilapiros Mar 8, 2021
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,10 @@
<groupId>org.scala-lang.modules</groupId>
<artifactId>scala-xml_${scala.binary.version}</artifactId>
</dependency>
<dependency>
<groupId>org.scala-lang.modules</groupId>
<artifactId>scala-java8-compat_${scala.binary.version}</artifactId>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
Expand Down
12 changes: 10 additions & 2 deletions core/src/main/java/org/apache/spark/shuffle/api/ShuffleDataIO.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.spark.shuffle.api;

import java.util.Map;

import org.apache.spark.annotation.Private;

/**
Expand Down Expand Up @@ -44,12 +46,18 @@ public interface ShuffleDataIO {
/**
* Called once on executor processes to bootstrap the shuffle data storage modules that
* are only invoked on the executors.
*
* @param appId The Spark application id
* @param execId The unique identifier of the executor being initialized
* @param extraConfigs Extra configs that were returned by
* {@link ShuffleDriverComponents#getAddedExecutorSparkConf()}
*/
ShuffleExecutorComponents executor();
ShuffleExecutorComponents initializeShuffleExecutorComponents(
String appId, String execId, Map<String, String> extraConfigs);

/**
* Called once on the driver process to bootstrap the shuffle metadata modules that
* are maintained by the driver.
*/
ShuffleDriverComponents driver();
ShuffleDriverComponents initializeShuffleDriverComponents();
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import java.util.Map;

import org.apache.spark.annotation.Private;
import org.apache.spark.shuffle.api.metadata.NoOpShuffleOutputTracker;
import org.apache.spark.shuffle.api.metadata.ShuffleOutputTracker;

/**
* :: Private ::
Expand All @@ -29,36 +31,21 @@
public interface ShuffleDriverComponents {

/**
* Called once in the driver to bootstrap this module that is specific to this application.
* This method is called before submitting executor requests to the cluster manager.
*
* This method should prepare the module with its shuffle components i.e. registering against
* external file servers or shuffle services, or creating tables in a shuffle
* storage database.
* Provide additional configuration for the executors when their plugin system is initialized
* via {@link ShuffleDataIO#initializeShuffleExecutorComponents(String, String, Map)}
*
* @return additional SparkConf settings necessary for initializing the executor components.
* This would include configurations that cannot be statically set on the application, like
* the host:port of external services for shuffle storage.
*/
Map<String, String> initializeApplication();
Map<String, String> getAddedExecutorSparkConf();

/**
* Called once at the end of the Spark application to clean up any existing shuffle state.
*/
void cleanupApplication();

/**
* Called once per shuffle id when the shuffle id is first generated for a shuffle stage.
*
* @param shuffleId The unique identifier for the shuffle stage.
*/
default void registerShuffle(int shuffleId) {}

/**
* Removes shuffle data associated with the given shuffle.
*
* @param shuffleId The unique identifier for the shuffle stage.
* @param blocking Whether this call should block on the deletion of the data.
*/
default void removeShuffle(int shuffleId, boolean blocking) {}
default ShuffleOutputTracker shuffleOutputTracker() {
return new NoOpShuffleOutputTracker();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
package org.apache.spark.shuffle.api;

import java.io.IOException;
import java.util.Map;
import java.util.Optional;

import org.apache.spark.annotation.Private;
Expand All @@ -32,17 +31,6 @@
@Private
public interface ShuffleExecutorComponents {

/**
* Called once per executor to bootstrap this module with state that is specific to
* that executor, specifically the application ID and executor ID.
*
* @param appId The Spark application id
* @param execId The unique identifier of the executor being initialized
* @param extraConfigs Extra configs that were returned by
* {@link ShuffleDriverComponents#initializeApplication()}
*/
void initializeExecutor(String appId, String execId, Map<String, String> extraConfigs);

/**
* Called once per map task to create a writer that will be responsible for persisting all the
* partitioned bytes written by that map task.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.io.IOException;

import org.apache.spark.annotation.Private;
import org.apache.spark.shuffle.api.metadata.MapOutputCommitMessage;

/**
* Optional extension for partition writing that is optimized for transferring a single
Expand All @@ -32,5 +33,6 @@ public interface SingleSpillShuffleMapOutputWriter {
/**
* Transfer a file that contains the bytes of all the partitions written by this map task.
*/
void transferMapSpillFile(File mapOutputFile, long[] partitionLengths) throws IOException;
MapOutputCommitMessage transferMapSpillFile(File mapOutputFile, long[] partitionLengths)
throws IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

import java.io.Serializable;

import org.apache.spark.annotation.Private;

/**
* :: Private ::
* An opaque metadata tag for registering the result of committing the output of a
Expand All @@ -27,4 +29,5 @@
* All implementations must be serializable since this is sent from the executors to
* the driver.
*/
@Private
public interface MapOutputMetadata extends Serializable {}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.shuffle.api.metadata;

/**
 * A {@link ShuffleOutputTracker} that retains no shuffle state: every callback is an empty
 * method.
 */
public class NoOpShuffleOutputTracker implements ShuffleOutputTracker {

  /** Does nothing; the shuffle registration is not recorded. */
  @Override
  public void registerShuffle(int shuffleId) {
    // Intentionally empty.
  }

  /** Does nothing, regardless of the value of {@code blocking}. */
  @Override
  public void unregisterShuffle(int shuffleId, boolean blocking) {
    // Intentionally empty.
  }

  /** Does nothing; the supplied map output metadata is discarded. */
  @Override
  public void registerMapOutput(
      int shuffleId,
      int mapIndex,
      long mapId,
      MapOutputMetadata mapOutputMetadata) {
    // Intentionally empty.
  }

  /** Does nothing; there is no recorded map output to remove. */
  @Override
  public void removeMapOutput(int shuffleId, int mapIndex, long mapId) {
    // Intentionally empty.
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.shuffle.api.metadata;

import org.apache.spark.annotation.Private;

/**
 * :: Private ::
 *
 * Plugin interface for monitoring where map tasks store their shuffle data, and for supplying
 * metadata that helps shuffle readers locate and read shuffle blocks in reduce tasks.
 * <p>
 * The {@link MapOutputMetadata} produced by the plugin tree's implementation of
 * {@link org.apache.spark.shuffle.api.ShuffleMapOutputWriter} is handed to this module through
 * {@link #registerMapOutput(int, int, long, MapOutputMetadata)}.
 * <p>
 * Implementations MUST be thread-safe, because Spark invokes the methods of this module in
 * parallel.
 * <p>
 * A single instance of this module is created on the driver via
 * {@link org.apache.spark.shuffle.api.ShuffleDriverComponents#shuffleOutputTracker()}.
 */
@Private
public interface ShuffleOutputTracker {

  /**
   * Invoked when a new shuffle stage is about to run.
   *
   * @param shuffleId the unique identifier for the new shuffle stage
   */
  void registerShuffle(int shuffleId);

  /**
   * Invoked when the shuffle with the given id is unregistered because Spark tasks will no
   * longer use it.
   *
   * @param shuffleId the unique identifier for the shuffle stage to be unregistered
   * @param blocking whether this call should block on the removal of the shuffle's data
   */
  void unregisterShuffle(int shuffleId, boolean blocking);

  /**
   * Invoked when a map task completes and its map output writer has returned metadata that
   * this shuffle output tracker should persist.
   *
   * @param shuffleId the unique identifier for the shuffle stage that the map task belongs to
   * @param mapIndex the index of the map task within its shuffle map stage; not necessarily
   *                 unique across multiple attempts of the task
   * @param mapId the identifier of the map task, unique even across multiple attempts of the
   *              task
   * @param mapOutputMetadata metadata describing the storage of the map output data, as
   *                          returned by the map task's writer
   */
  void registerMapOutput(
      int shuffleId,
      int mapIndex,
      long mapId,
      MapOutputMetadata mapOutputMetadata);

  /**
   * Invoked when the given map output is discarded and will no longer be used in future Spark
   * shuffles.
   *
   * @param shuffleId the unique identifier for the shuffle stage that the map task belongs to
   * @param mapIndex the index of the map task whose output is being discarded; not necessarily
   *                 unique across multiple attempts of the task
   * @param mapId the identifier of the map task whose output is being discarded, unique even
   *              across multiple attempts of the task
   */
  void removeMapOutput(int shuffleId, int mapIndex, long mapId);
}
Loading