[SPARK-54663][CORE] Computes RowBasedChecksum in ShuffleWriters #50230
New file defining `RowBasedChecksum` (`@@ -0,0 +1,80 @@`):

```scala
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.shuffle.checksum

import scala.util.control.NonFatal

import org.apache.spark.internal.Logging

/**
 * A class for computing a checksum for input (key, value) pairs. The checksum is independent of
 * the order of the input (key, value) pairs: a checksum is computed for each row first, then the
 * XOR and SUM of all the row checksums are mixed together to form the final checksum.
 */
abstract class RowBasedChecksum() extends Serializable with Logging {
  private val ROTATE_POSITIONS = 27
  private var hasError: Boolean = false
  private var checksumXor: Long = 0
  private var checksumSum: Long = 0

  /**
   * Returns the checksum value, or the default checksum value (0) if any errors were
   * encountered during the checksum computation.
   */
  def getValue: Long = {
    if (!hasError) {
      // Rotate `checksumSum` before XOR-ing it with `checksumXor`, transforming the two values
      // into a single, strong composite checksum whose bit patterns are thoroughly mixed.
      checksumXor ^ rotateLeft(checksumSum)
    } else {
      0
    }
  }

  /** Updates the row-based checksum with the given (key, value) pair. Not thread safe. */
  def update(key: Any, value: Any): Unit = {
    if (!hasError) {
      try {
        val rowChecksumValue = calculateRowChecksum(key, value)
        checksumXor = checksumXor ^ rowChecksumValue
        checksumSum += rowChecksumValue
      } catch {
        case NonFatal(e) =>
          logError("Checksum computation encountered error: ", e)
          hasError = true
      }
    }
  }

  /** Computes and returns the checksum value for the given (key, value) pair. */
  protected def calculateRowChecksum(key: Any, value: Any): Long

  // Rotates the value left by `ROTATE_POSITIONS` bit positions, wrapping the
  // high-order bits around to the low end.
  private def rotateLeft(value: Long): Long = {
    (value << ROTATE_POSITIONS) | (value >>> (64 - ROTATE_POSITIONS))
  }
}

object RowBasedChecksum {
  def getAggregatedChecksumValue(rowBasedChecksums: Array[RowBasedChecksum]): Long = {
    Option(rowBasedChecksums)
      .map(_.foldLeft(0L)((acc, c) => acc * 31L + c.getValue))
      .getOrElse(0L)
  }
}
```
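For illustration, here is a minimal, hypothetical concrete subclass (not part of this PR) that simply hashes the (key, value) tuple. It demonstrates the contract: because `update` folds each row checksum into both an XOR and a SUM, the final value does not depend on the order in which pairs arrive. REPL-style for brevity:

```scala
import org.apache.spark.shuffle.checksum.RowBasedChecksum

// Hypothetical example subclass: the row checksum is just the tuple's hash.
class TupleHashRowBasedChecksum extends RowBasedChecksum {
  override protected def calculateRowChecksum(key: Any, value: Any): Long =
    (key, value).hashCode().toLong
}

val c1 = new TupleHashRowBasedChecksum
c1.update("k1", "v1")
c1.update("k2", "v2")

val c2 = new TupleHashRowBasedChecksum
c2.update("k2", "v2") // same pairs, opposite order
c2.update("k1", "v1")

assert(c1.getValue == c2.getValue) // order-independent by construction

// Per-partition checksums can then be folded into a single per-task value:
val aggregated = RowBasedChecksum.getAggregatedChecksumValue(Array(c1, c2))
```

On the design: XOR alone would let any duplicated row cancel itself out (`x ^ x == 0`), while a plain SUM leaves the low-order bits weakly mixed; rotating the SUM before XOR-ing the two combines their strengths. The aggregation in `getAggregatedChecksumValue`, by contrast, is a base-31 polynomial over the array, so it is deliberately order-dependent: partition indices are fixed, and swapping two partitions' data should change the task-level checksum.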
New file defining `ExposedBufferByteArrayOutputStream` (`@@ -0,0 +1,26 @@`):

```java
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.util;

import java.io.ByteArrayOutputStream;

/** Subclass of ByteArrayOutputStream that exposes `buf` directly. */
public final class ExposedBufferByteArrayOutputStream extends ByteArrayOutputStream {
  public ExposedBufferByteArrayOutputStream(int size) { super(size); }

  public byte[] getBuf() { return buf; }
}
```
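Exposing `buf` lets callers read the stream's backing array in place, avoiding the defensive copy that `ByteArrayOutputStream.toByteArray()` makes on every call. A hedged usage sketch (the variable names are illustrative, not from this PR):

```scala
import java.nio.charset.StandardCharsets

import org.apache.spark.util.ExposedBufferByteArrayOutputStream

// Hypothetical usage: serialize a row into the stream, then hash its bytes
// in place. Only the first size() bytes of the backing array are valid;
// getBuf() may return a larger, partially unused array.
val out = new ExposedBufferByteArrayOutputStream(64)
out.write("hello".getBytes(StandardCharsets.UTF_8))

val backing: Array[Byte] = out.getBuf
val validLength: Int = out.size()
// e.g. feed (backing, 0, validLength) to a hash function without copying
```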
Changes to the `ShuffleStatus` class (three hunks):

```diff
@@ -23,7 +23,7 @@ import java.util.concurrent.{ConcurrentHashMap, LinkedBlockingQueue, ThreadPoolE
 import java.util.concurrent.locks.ReentrantReadWriteLock
 
 import scala.collection
-import scala.collection.mutable.{HashMap, ListBuffer, Map}
+import scala.collection.mutable.{HashMap, ListBuffer, Map, Set}
 import scala.concurrent.{ExecutionContext, Future}
 import scala.concurrent.duration.Duration
 import scala.jdk.CollectionConverters._
@@ -99,6 +99,12 @@ private class ShuffleStatus(
    */
   val mapStatusesDeleted = new Array[MapStatus](numPartitions)
 
+  /**
+   * Keep the indices of the Map tasks whose checksums are different across retries.
+   * Exposed for testing.
+   */
+  private[spark] val checksumMismatchIndices: Set[Int] = Set()
+
   /**
    * MergeStatus for each shuffle partition when push-based shuffle is enabled. The index of the
    * array is the shuffle partition id (reduce id). Each value in the array is the MergeStatus for
@@ -169,6 +175,15 @@ private class ShuffleStatus(
     } else {
       mapIdToMapIndex.remove(currentMapStatus.mapId)
     }
+    logDebug(s"Checksum of map output for task ${status.mapId} is ${status.checksumValue}")
+
+    val preStatus =
+      if (mapStatuses(mapIndex) != null) mapStatuses(mapIndex) else mapStatusesDeleted(mapIndex)
+    if (preStatus != null && preStatus.checksumValue != status.checksumValue) {
+      logInfo(s"Checksum of map output changes from ${preStatus.checksumValue} to " +
+        s"${status.checksumValue} for task ${status.mapId}.")
+      checksumMismatchIndices.add(mapIndex)
+    }
     mapStatuses(mapIndex) = status
     mapIdToMapIndex(status.mapId) = mapIndex
   }
```

Review thread on the `checksumMismatchIndices.add(mapIndex)` line (resolved):

**Contributor:** There are three main cases here: … For the latter two, we don't need to track it in `checksumMismatchIndices`.

**Author (Contributor):** Yes, for case 1, we need to track the mismatches. The purpose of `checksumMismatchIndices` is that (in the next PR) we will roll back the downstream stages if we detect checksum mismatches in their upstream stages. For case 2, if downstream stages have not consumed the output, they have not started; the rollback is then a no-op, and it doesn't hurt to record the mismatches here. For case 3, I think we do need to record the mismatches. Consider a situation where all partitions of a stage have finished while some speculative tasks are still running. Since all outputs have been produced, the downstream stage can start and read the data. Later, some speculative tasks finish, and their new mapStatus overrides the old one with a new data location. In the downstream stage, the not-yet-started or retried tasks would read the new data, while the finished and still-running tasks would read the old data, resulting in inconsistency.

**Contributor:** It is unclear how … That is fair, this is indeed possible.
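The rollback the author describes lands in a follow-up PR. As a purely hypothetical sketch of that direction (`findDownstreamStages` and `rollbackStage` are invented placeholders; only `checksumMismatchIndices` comes from this change), a scheduler-side check might look like:

```scala
// Hypothetical sketch of the follow-up logic discussed above: when an
// upstream shuffle has recorded mismatching map-output checksums across
// retries, roll back downstream stages that may have read the old output.
def maybeRollbackDownstream(shuffleStatus: ShuffleStatus, stageId: Int): Unit = {
  if (shuffleStatus.checksumMismatchIndices.nonEmpty) {
    findDownstreamStages(stageId).foreach { downstream =>
      // Per the discussion, a rollback is a no-op for stages that have not
      // started yet (case 2), so triggering it unconditionally is safe.
      rollbackStage(downstream)
    }
  }
}
```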