Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ private[spark] class SortShuffleManager(conf: SparkConf) extends ShuffleManager
metrics: ShuffleWriteMetricsReporter): ShuffleWriter[K, V] = {
val mapTaskIds = taskIdMapsForShuffle.computeIfAbsent(
handle.shuffleId, _ => new OpenHashSet[Long](16))
mapTaskIds.synchronized { mapTaskIds.add(context.taskAttemptId()) }
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For more convenient review:

ShuffleMapTask#runTask
val mapId = if (SparkEnv.get.conf.get(config.SHUFFLE_USE_OLD_FETCH_PROTOCOL)) { partitionId } else context.taskAttemptId()

SortShuffleManager#getWriter
val mapTaskIds = taskIdMapsForShuffle.computeIfAbsent( handle.shuffleId, _ => new OpenHashSet[Long](16)) mapTaskIds.synchronized { mapTaskIds.add(context.taskAttemptId()) }

SortShuffleManager#unregisterShuffle
Option(taskIdMapsForShuffle.remove(shuffleId)).foreach { mapTaskIds => mapTaskIds.iterator.foreach { mapTaskId => shuffleBlockResolver.removeDataByMap(shuffleId, mapTaskId) } }

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@xuanyuanking do you have an opinion here? I think you wrote this piece of the code.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for pinging me, I'm reviewing #31664

mapTaskIds.synchronized { mapTaskIds.add(mapId) }
val env = SparkEnv.get
handle match {
case unsafeShuffleHandle: SerializedShuffleHandle[K @unchecked, V @unchecked] =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ class StringExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
test("concat") {
def testConcat(inputs: String*): Unit = {
val expected = if (inputs.contains(null)) null else inputs.mkString
checkEvaluation(Concat(inputs.map(Literal.create(_, StringType))), expected, EmptyRow)
checkEvaluation(Concat(inputs.map(Literal.create(_, StringType))), expected)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For reviewer : clean up unused code for code simplifications.
Method signature for checkEvaluation as follow:
checkEvaluation(expression: => Expression, expected: Any, inputRow: InternalRow = EmptyRow)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this test change related to the code change?

}

testConcat()
Expand All @@ -50,7 +50,7 @@ class StringExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
test("SPARK-22498: Concat should not generate codes beyond 64KB") {
val N = 5000
val strs = (1 to N).map(x => s"s$x")
checkEvaluation(Concat(strs.map(Literal.create(_, StringType))), strs.mkString, EmptyRow)
checkEvaluation(Concat(strs.map(Literal.create(_, StringType))), strs.mkString)
}

test("SPARK-22771 Check Concat.checkInputDataTypes results") {
Expand All @@ -73,7 +73,7 @@ class StringExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
case s: String => Literal.create(s, StringType)
}
val sepExpr = Literal.create(sep, StringType)
checkEvaluation(ConcatWs(sepExpr +: inputExprs), expected, EmptyRow)
checkEvaluation(ConcatWs(sepExpr +: inputExprs), expected)
}

// scalastyle:off
Expand All @@ -99,12 +99,12 @@ class StringExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
val sepExpr = Literal.create("#", StringType)
val strings1 = (1 to N).map(x => s"s$x")
val inputsExpr1 = strings1.map(Literal.create(_, StringType))
checkEvaluation(ConcatWs(sepExpr +: inputsExpr1), strings1.mkString("#"), EmptyRow)
checkEvaluation(ConcatWs(sepExpr +: inputsExpr1), strings1.mkString("#"))

val strings2 = (1 to N).map(x => Seq(s"s$x"))
val inputsExpr2 = strings2.map(Literal.create(_, ArrayType(StringType)))
checkEvaluation(
ConcatWs(sepExpr +: inputsExpr2), strings2.map(s => s(0)).mkString("#"), EmptyRow)
ConcatWs(sepExpr +: inputsExpr2), strings2.map(s => s(0)).mkString("#"))
}

test("elt") {
Expand Down