@mccheah
Contributor

@mccheah mccheah commented May 23, 2020

What changes were proposed in this pull request?

Adds a ShuffleOutputTracker API that can be used for managing shuffle metadata on the driver. Accepts map output metadata returned by the map output writers.

Requires #28616.

Why are the changes needed?

Part of the design as discussed in this document, and part of the wider effort of SPARK-25299.

Does this PR introduce any user-facing change?

Enables additional APIs for the shuffle storage plugin tree. Usage will become more apparent when the read side of the shuffle plugin tree is introduced.

How was this patch tested?

We've added a mock implementation of the shuffle plugin tree here, to prove that a Spark job using a different implementation of the plugin can use all of the plugin points for an alternative shuffle data storage solution. But we don't include it here, in order to minimize the diff and the code to review in this specific patch. See #28902.

@mccheah mccheah changed the title [WIP][SPARK-31801][API][SHUFFLE] Register map output metadata [SPARK-31801][WIP][API][SHUFFLE] Register map output metadata May 23, 2020
@SparkQA

SparkQA commented May 23, 2020

Test build #123022 has finished for PR 28618 at commit 98821b9.

  • This patch fails build dependency tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

* number of bytes written by the partition writer for that partition id.
*/
-long[] commitAllPartitions() throws IOException;
+MapOutputCommitMessage commitAllPartitions() throws IOException;

Suggest updating the related comment for this code: the return value is now an object instead of an array.

@SparkQA

SparkQA commented Jun 4, 2020

Test build #123502 has finished for PR 28618 at commit 25e98e7.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jun 4, 2020

Test build #123504 has finished for PR 28618 at commit 51df151.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jun 4, 2020

Test build #123507 has finished for PR 28618 at commit e7c9988.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

mapStatus = MapStatus$.MODULE$.apply(
blockManager.shuffleServerId(), partitionLengths, mapId);
mapOutputCommitMessage = mapOutputWriter.commitAllPartitions();
taskResult = new MapTaskResult(
Contributor

As these lines are repeated, you could extract them into a new method, like:

  protected void setTaskResult(MapOutputCommitMessage mapOutputCommitMessage) {
    taskResult = new MapTaskResult(
        MapStatus$.MODULE$.apply(
            blockManager.shuffleServerId(),
            mapOutputCommitMessage.getPartitionLengths(),
            mapId),
        OptionConverters.toScala(mapOutputCommitMessage.getMapOutputMetadata()));
  }

With the help of this new method and Mockito's spy you can even stop storing the mapOutputCommitMessage for testing purposes only. It has a price, though: this class can no longer be final. For details, see:
attilapiros@f4578a3

Contributor Author

Ack - I didn't address this in my latest patch but will get around to it.

Comment on lines 286 to 288
mapOutputCommitMessage = maybeMetadata.map(
metadata -> MapOutputCommitMessage.of(spills[0].partitionLengths, metadata))
.orElse(MapOutputCommitMessage.of(spills[0].partitionLengths));
Contributor

I cannot see why transferMapSpillFile cannot return a MapOutputCommitMessage; that would simplify this part:
attilapiros@289050e

Contributor Author

Hm, I think this was originally designed this way because we didn't want the single spill writer to set a list of partition lengths that was different from what was passed into the writer's transfer function. But, maybe we can wrap this with a preconditions check to ensure that the state remains consistent, and that's good enough along with Javadoc.
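
For illustration, a minimal sketch of the preconditions check described in the reply above. The names MapOutputCommitMessage, of, getPartitionLengths and getMapOutputMetadata come from the snippets quoted in this thread; the class bodies, the empty MapOutputMetadata interface, and the CommitMessageChecks helper are assumptions made only for this example and are not the code in the patch.

```java
import java.util.Arrays;
import java.util.Optional;

/** Hypothetical stand-in for the plugin-defined metadata type. */
interface MapOutputMetadata {}

/** Assumed shape: partition lengths plus optional plugin metadata. */
final class MapOutputCommitMessage {
  private final long[] partitionLengths;
  private final Optional<MapOutputMetadata> mapOutputMetadata;

  private MapOutputCommitMessage(long[] partitionLengths, Optional<MapOutputMetadata> metadata) {
    this.partitionLengths = partitionLengths;
    this.mapOutputMetadata = metadata;
  }

  static MapOutputCommitMessage of(long[] partitionLengths) {
    return new MapOutputCommitMessage(partitionLengths, Optional.empty());
  }

  static MapOutputCommitMessage of(long[] partitionLengths, MapOutputMetadata metadata) {
    return new MapOutputCommitMessage(partitionLengths, Optional.of(metadata));
  }

  long[] getPartitionLengths() {
    return partitionLengths;
  }

  Optional<MapOutputMetadata> getMapOutputMetadata() {
    return mapOutputMetadata;
  }
}

/** Hypothetical helper, not part of the patch. */
final class CommitMessageChecks {
  /**
   * Rejects a commit message whose partition lengths differ from the ones
   * the caller handed to the single spill writer.
   */
  static MapOutputCommitMessage requireSameLengths(
      long[] expected, MapOutputCommitMessage message) {
    if (!Arrays.equals(expected, message.getPartitionLengths())) {
      throw new IllegalStateException(
          "Single spill writer returned partition lengths that differ from its input");
    }
    return message;
  }

  public static void main(String[] args) {
    long[] lengths = {10L, 0L, 25L};
    MapOutputCommitMessage message =
        requireSameLengths(lengths, MapOutputCommitMessage.of(lengths.clone()));
    System.out.println("metadata present: " + message.getMapOutputMetadata().isPresent());
  }
}
```

With a check like this, the single spill writer could return the commit message directly while the caller still detects a mismatch in partition lengths.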

@attilapiros
Contributor

I have been thinking a lot about ShuffleDriverComponents and I have an idea for how to improve it.

The problem, I believe, is that this class tries to fulfil two very separate roles: to be a builder and the result of the building at the same time.

This is why we need this kind of check:

if (outputTracker == null) {
  throw new IllegalStateException("Driver components must be initialized before use");
}

If the building is cleanly separated from the result of the building, then we can be sure the prerequisites are fulfilled before use.

I would change it into the result of the building, in the following way:

  • the initializeApplication step (I mean the process of initialisation, not the returned Map) should be part of the building. The documentation of the ShuffleDataIO#driver method can be extended to mention that this is the right place to initialize.
  • the ShuffleDriverComponents could have a new method which gives back the "additional SparkConf settings necessary for initializing the executor components"; we could call it something like additionalExecutorConfigs. This new method would replace the old initializeApplication.

One more idea / question:

  • I do not see why the ShuffleOutputTracker is optional. Either we or the API user can provide an implementation where the methods are empty; this way the API is a bit simpler.

@mccheah what do you think?
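
To make the proposal concrete, here is a rough sketch of how the split could look. It is only an illustration under assumptions: the tracker method names (registerShuffle, unregisterShuffle), the ExampleShuffleDataIO class, and the config key are hypothetical, and the real interfaces in the patch may differ.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

/** Hypothetical tracker interface; the real ShuffleOutputTracker methods may differ. */
interface ShuffleOutputTracker {
  void registerShuffle(int shuffleId);

  void unregisterShuffle(int shuffleId, boolean blocking);
}

/** No-op tracker, so the components never need to expose an optional tracker. */
final class NoOpShuffleOutputTracker implements ShuffleOutputTracker {
  @Override
  public void registerShuffle(int shuffleId) {}

  @Override
  public void unregisterShuffle(int shuffleId, boolean blocking) {}
}

/** Result of the building: immutable and usable as soon as it exists. */
final class ShuffleDriverComponents {
  private final ShuffleOutputTracker outputTracker;
  private final Map<String, String> additionalExecutorConfigs;

  ShuffleDriverComponents(
      ShuffleOutputTracker outputTracker, Map<String, String> additionalExecutorConfigs) {
    this.outputTracker = Objects.requireNonNull(outputTracker);
    this.additionalExecutorConfigs =
        Collections.unmodifiableMap(new HashMap<>(additionalExecutorConfigs));
  }

  ShuffleOutputTracker outputTracker() {
    return outputTracker;
  }

  /** Replaces the Map previously returned by initializeApplication. */
  Map<String, String> additionalExecutorConfigs() {
    return additionalExecutorConfigs;
  }
}

/** Builder side: all initialization happens before the components are handed out. */
final class ExampleShuffleDataIO {
  ShuffleDriverComponents driver() {
    // Any driver-side setup would run here (hypothetical config key below).
    Map<String, String> configs = new HashMap<>();
    configs.put("example.storage.endpoint", "host:1234");
    return new ShuffleDriverComponents(new NoOpShuffleOutputTracker(), configs);
  }

  public static void main(String[] args) {
    ShuffleDriverComponents components = new ExampleShuffleDataIO().driver();
    components.outputTracker().registerShuffle(0);
    System.out.println(components.additionalExecutorConfigs());
  }
}
```

Because the components object can only be obtained fully built, the "must be initialized before use" check has nothing left to guard against.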

@SparkQA

SparkQA commented Jun 19, 2020

Test build #124261 has finished for PR 28618 at commit e7c9988.

  • This patch fails build dependency tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@dongjoon-hyun
Member

Retest this please.

@SparkQA

SparkQA commented Jun 20, 2020

Test build #124307 has finished for PR 28618 at commit e7c9988.

  • This patch fails build dependency tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@mccheah
Contributor Author

mccheah commented Jun 23, 2020

You bring up a good point. I can adjust the PR accordingly. It does seem like the components class handles both an initialization mode and a runtime mode, and it would be better to separate the two. Thanks for thinking critically about this!

@mccheah
Contributor Author

mccheah commented Jun 23, 2020

Also I think it makes sense for the executor side and the driver side to be mirrored.

@mccheah mccheah force-pushed the register-map-output-metadata branch from e7c9988 to dc8d15c Compare June 23, 2020 04:15
@mccheah
Contributor Author

mccheah commented Jun 23, 2020

I rebased on master in my latest patch. I also addressed your comments @attilapiros. Thanks for the feedback!

The diff was growing extremely large (> 1000 lines), so I removed all the tests for now. I'm going to open a separate patch with tests for this.

@mccheah mccheah changed the title [SPARK-31801][WIP][API][SHUFFLE] Register map output metadata [SPARK-31801][API][SHUFFLE] Register map output metadata Jun 23, 2020
@SparkQA

SparkQA commented Jun 23, 2020

Test build #124385 has finished for PR 28618 at commit dc8d15c.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds no public classes.

@attilapiros
Contributor

In BaseReceivedBlockHandlerSuite one parameter (a ShuffleOutputTracker) is missing but as the test does not do any shuffle we can pass a null as value (or a NoOpShuffleOutputTracker). With null:

index 0976494b6d..5f6a0f164f 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
@@ -70,7 +70,7 @@ abstract class BaseReceivedBlockHandlerSuite(enableEncryption: Boolean)
   val streamId = 1
   val securityMgr = new SecurityManager(conf, encryptionKey)
   val broadcastManager = new BroadcastManager(true, conf, securityMgr)
-  val mapOutputTracker = new MapOutputTrackerMaster(conf, broadcastManager, true)
+  val mapOutputTracker = new MapOutputTrackerMaster(conf, null, broadcastManager, true)
   val shuffleManager = new SortShuffleManager(conf)
   val serializer = new KryoSerializer(conf)
   var serializerManager = new SerializerManager(serializer, conf, encryptionKey)
 [info] ReceivedBlockHandlerSuite:
  [info] - BlockManagerBasedBlockHandler - store blocks (315 milliseconds)
  [info] - BlockManagerBasedBlockHandler - handle errors in storing block (5 milliseconds)
  [info] - WriteAheadLogBasedBlockHandler - store blocks (216 milliseconds)
  [info] - WriteAheadLogBasedBlockHandler - handle errors in storing block (13 milliseconds)
  [info] - WriteAheadLogBasedBlockHandler - clean old blocks (53 milliseconds)
  [info] - Test Block - count messages (121 milliseconds)
  [info] - Test Block - isFullyConsumed (24 milliseconds)
  [info] ReceivedBlockHandlerWithEncryptionSuite:
  [info] - BlockManagerBasedBlockHandler - store blocks (27 milliseconds)
  [info] - BlockManagerBasedBlockHandler - handle errors in storing block (2 milliseconds)
  [info] - WriteAheadLogBasedBlockHandler - store blocks (89 milliseconds)
  [info] - WriteAheadLogBasedBlockHandler - handle errors in storing block (8 milliseconds)
  [info] - WriteAheadLogBasedBlockHandler - clean old blocks (16 milliseconds)
  [info] - Test Block - count messages (71 milliseconds)
  [info] - Test Block - isFullyConsumed (14 milliseconds)
  [info] ScalaTest
  [info] Run completed in 4 seconds, 30 milliseconds.
  [info] Total number of tests run: 14
  [info] Suites: completed 2, aborted 0
  [info] Tests: succeeded 14, failed 0, canceled 0, ignored 0, pending 0
  [info] All tests passed.

Comment on lines 32 to 50
private var _driver: ShuffleDriverComponents = _
private var _executor: ShuffleExecutorComponents = _

def getOrCreateDriverComponents(): ShuffleDriverComponents = synchronized {
  if (_driver == null) {
    _driver = delegate.initializeShuffleDriverComponents()
  }
  _driver
}

def getOrCreateExecutorComponents(
    appId: String,
    execId: String,
    extraConfigs: Map[String, String]): ShuffleExecutorComponents = synchronized {
  if (_executor == null) {
    _executor = delegate.initializeShuffleExecutorComponents(appId, execId, extraConfigs.asJava)
  }
  _executor
}
Contributor

@attilapiros attilapiros Jun 26, 2020

I think we can improve the methods of this class, especially getOrCreateExecutorComponents: reading it, I have a feeling it is easy to misuse. For example, calling it twice with two different values (for any of its params) will always return the object generated by the first call.

But first, getOrCreateDriverComponents, which is the easier one as it has no input params at all. It could be replaced with a simple lazy val (which has been thread-safe for a long time: scala/bug#3007).

The same is true for getOrCreateExecutorComponents, as all its parameters basically come from a SparkEnv instance.

So my idea is:

Suggested change
-private var _driver: ShuffleDriverComponents = _
-private var _executor: ShuffleExecutorComponents = _
-def getOrCreateDriverComponents(): ShuffleDriverComponents = synchronized {
-  if (_driver == null) {
-    _driver = delegate.initializeShuffleDriverComponents()
-  }
-  _driver
-}
-def getOrCreateExecutorComponents(
-    appId: String,
-    execId: String,
-    extraConfigs: Map[String, String]): ShuffleExecutorComponents = synchronized {
-  if (_executor == null) {
-    _executor = delegate.initializeShuffleExecutorComponents(appId, execId, extraConfigs.asJava)
-  }
-  _executor
-}
+lazy val driverComponents = delegate.initializeShuffleDriverComponents()
+lazy val executorComponents = {
+  val env = SparkEnv.get
+  delegate.initializeShuffleExecutorComponents(
+    env.conf.getAppId,
+    env.executorId,
+    env.conf.getAllWithPrefix(ShuffleDataIOUtils.SHUFFLE_SPARK_CONF_PREFIX).toMap.asJava)
+}

I still have to test it. What is your opinion @mccheah ?
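
The pitfall described in this comment can be shown in a small standalone sketch. It is written in Java rather than Scala, and everything in it is hypothetical (the Lazy helper, ExecutorComponents, ComponentsHolder); the inputs are fixed in a constructor here, whereas the suggestion above reads them from SparkEnv.

```java
import java.util.function.Supplier;

/** Thread-safe memoizing supplier: the initializer runs at most once. */
final class Lazy<T> implements Supplier<T> {
  private final Supplier<T> initializer;
  private T value;
  private boolean initialized;

  Lazy(Supplier<T> initializer) {
    this.initializer = initializer;
  }

  @Override
  public synchronized T get() {
    if (!initialized) {
      value = initializer.get();
      initialized = true;
    }
    return value;
  }
}

/** Hypothetical stand-in for the executor components. */
final class ExecutorComponents {
  final String appId;
  final String execId;

  ExecutorComponents(String appId, String execId) {
    this.appId = appId;
    this.execId = execId;
  }
}

final class ComponentsHolder {
  private ExecutorComponents cached;
  private final Lazy<ExecutorComponents> executorComponents;

  ComponentsHolder(String appId, String execId) {
    // Inputs are fixed once, so the accessor below needs no arguments.
    this.executorComponents = new Lazy<>(() -> new ExecutorComponents(appId, execId));
  }

  /** Easy to misuse: arguments of every call after the first are silently ignored. */
  synchronized ExecutorComponents getOrCreate(String appId, String execId) {
    if (cached == null) {
      cached = new ExecutorComponents(appId, execId);
    }
    return cached;
  }

  /** No-argument accessor: there is nothing for a second caller to get wrong. */
  ExecutorComponents executorComponents() {
    return executorComponents.get();
  }

  public static void main(String[] args) {
    ComponentsHolder holder = new ComponentsHolder("app-1", "exec-1");
    holder.getOrCreate("app-1", "exec-1");
    // The second call asks for different values but gets the first object back.
    System.out.println(holder.getOrCreate("app-2", "exec-9").appId);
  }
}
```

Whether the memoization is expressed as a lazy val or as an explicit method matters less than removing the parameters from the accessor.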

Contributor Author

I certainly gave lazy val a consideration. I'm not entirely familiar with the most modern Scala conventions - if lazy is preferred over explicit initialization methods in general, then I'm ok with the above recommendation.

Contributor

I think transforming getOrCreateExecutorComponents into a method without any arguments is already a big win.
I can let go of the lazy val; it is just an implementation detail.

Contributor Author

LGTM, I'll make those changes. Thanks!

@SparkQA

SparkQA commented Sep 10, 2020

Test build #128474 has finished for PR 28618 at commit 2b5108f.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@mridulm
Contributor

mridulm commented Sep 13, 2020

Btw, noticed that updateMapOutput was added as part of executor decommission support. We will need to hook that into ShuffleOutputTracker as well.

+CC @holdenk

@otterc
Contributor

otterc commented Sep 14, 2020

I looked at the changes proposed here so that we can use the interfaces here for Push-based shuffle (SPIP and code).
In Push-based shuffle, we have introduced merge-statuses which represent all the map outputs that were merged into a larger block. These statuses are collected by the driver from the Shuffle Services.

I think we will be able to use the current ShuffleOutputTracker API. The implementation of this API could have the triggers for finalizing the shuffle merge.

I still have to wrap my head around how we can model mergeStatus as part of MapOutputMetadata. Multiple mapStatuses would point to a single mergeStatus, so this would introduce some complexity.

We may need to evolve them to fit the push-based shuffle use case. As long as we are open to potentially making some backward incompatible changes, these APIs look good to me for now.

@mccheah
Contributor Author

mccheah commented Sep 14, 2020

Btw, noticed that updateMapOutput was added as part of executor decommission support. We will need to hook that into ShuffleOutputTracker as well.

@mridulm Can we also do this as a follow-up? The main thing is, this patch is already at ~900 lines changed total (+s and -s combined) and I really don't want to increase its scope. This patch has already been stalled from merging for a while and I'd rather get something completed and have follow-up tasks than try to make the entire feature perfect in a single patch.

Can we add follow-up JIRA tasks that followed from the comments for purely additive changes to the API, and move forward with the scope of this patch as-is?

@mridulm
Contributor

mridulm commented Sep 14, 2020

Sure @mccheah, we can do that in follow-up work to keep things more bite-sized.
I was making sure we don't miss it, and tagging holden on this.

Contributor

@attilapiros attilapiros left a comment

LGTM (just a few nits)

final ShuffleMapOutputWriter mapWriter = shuffleExecutorComponents
.createMapOutputWriter(shuffleId, mapId, partitioner.numPartitions());
return mapWriter.commitAllPartitions().getPartitionLengths();
return mapWriter.commitAllPartitions();
Contributor

Nit:

Suggested change
-return mapWriter.commitAllPartitions();
+mapOutputCommitMessage = mapWriter.commitAllPartitions();

// output file would have already been counted as shuffle bytes written.
partitionLengths = spills[0].partitionLengths;
long[] partitionLengths = spills[0].partitionLengths;
logger.debug("Merge shuffle spills for mapId {} with length {}", mapId,
Contributor

Isn't this log a bit misleading? No merge is done here, as there is only one spill.

Even transferMapSpillFile says:

The map spill file already has the proper format, and it contains all of the partition data.
So just transfer it directly to the destination without any merging.

@holdenk
Contributor

holdenk commented Oct 6, 2020

Thanks for working on this :)

@tgravescs
Contributor

Sorry for my delay in getting back to this. Could you upmerge to latest?

@SparkQA

SparkQA commented Oct 16, 2020

Test build #129906 has finished for PR 28618 at commit a6d974c.

  • This patch fails MiMa tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Oct 16, 2020

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/34512/

@SparkQA

SparkQA commented Oct 16, 2020

Kubernetes integration test status success
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/34512/

@SparkQA

SparkQA commented Oct 17, 2020

Test build #129950 has finished for PR 28618 at commit f69cba7.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Oct 17, 2020

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/34555/

@SparkQA

SparkQA commented Oct 17, 2020

Kubernetes integration test status success
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/34555/

@holdenk
Contributor

holdenk commented Oct 24, 2020

Jenkins retest this please.

@SparkQA

SparkQA commented Oct 24, 2020

Test build #130236 has finished for PR 28618 at commit f69cba7.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Oct 24, 2020

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/34836/

@SparkQA

SparkQA commented Oct 24, 2020

Kubernetes integration test status success
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/34836/

@attilapiros
Contributor

To help @mccheah, who is busy with other projects, I cloned this PR as #30763.
I will keep that up-to-date with master and react to review comments.
So if this is fine with you, please continue the review discussions there.

@github-actions

We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable.
If you'd like to revive this PR, please reopen it and ask a committer to remove the Stale tag!

@github-actions github-actions bot added the Stale label Mar 26, 2021
@github-actions github-actions bot closed this Mar 27, 2021