
Conversation

@WinkerDu (Contributor) commented Dec 20, 2019

What changes were proposed in this pull request?

This PR fixes an insert-overwrite-table error that occurs when multiple speculative task attempts run. It appends the task attempt id to the dynamic partition staging dir, so each speculative task attempt gets its own output dir instead of all attempts sharing the same one. commitTask in HadoopMapReduceCommitProtocol carries the attempt id in the TaskCommitMessage.
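A minimal sketch of the idea, assuming hypothetical names (`stagingDirFor` and the directory layout are illustrative, not the actual HadoopMapReduceCommitProtocol code): keying the staging directory by task attempt id gives each speculative attempt its own output path.

```scala
// Sketch only: per-attempt staging dirs; `stagingDirFor` and the layout
// below are assumptions for illustration, not the real Spark implementation.
object StagingDirSketch {
  // Staging dir keyed by task attempt id, so attempts never share a path.
  def stagingDirFor(basePath: String, attemptId: Int): String =
    s"$basePath/.spark-staging/$attemptId"

  def main(args: Array[String]): Unit = {
    val regular     = stagingDirFor("/warehouse/t", 0)
    val speculative = stagingDirFor("/warehouse/t", 1)
    // Distinct dirs mean the speculative attempt cannot hit
    // FileAlreadyExistsException on the regular attempt's output.
    assert(regular != speculative)
    println(s"$regular vs $speculative")
  }
}
```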

Why are the changes needed?

Insert overwrite into a DataSource table with dynamic partitions fails when multiple task attempts run. With one regular task attempt and one speculative attempt, the speculative attempt raises FileAlreadyExistsException because both attempts commit into the same staging dir.

Does this PR introduce any user-facing change?

How was this patch tested?

Added UT

@WinkerDu (Contributor, Author)

@xuanyuanking @LuciferYang @LinhongLiu please review.

@AmplabJenkins

Can one of the admins verify this patch?

@WinkerDu (Contributor, Author)

cc @cloud-fan

```diff
 if (hasValidPath) {
-  val (allAbsPathFiles, allPartitionPaths) =
-    taskCommits.map(_.obj.asInstanceOf[(Map[String, String], Set[String])]).unzip
+  val (allAbsPathFiles, allPartitionPaths, successAttemptIDs) =
```
@LuciferYang (Contributor) commented Dec 25, 2019
Should we add an implicit function like

`implicit def asPair(x: (Map[String, String], Set[String], String)) = (x._1, (x._2, x._3))`

before line 172? Then we could unzip taskCommits as (Map[String, String], (Set[String], String)) and eliminate the re-zip operation at line 188.

Is that right?
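A self-contained sketch of this suggestion, under assumed types (the real payload comes from TaskCommitMessage): with an implicit view from the 3-tuple to a nested pair in scope, unzip splits the commits in one pass and avoids a separate re-zip.

```scala
import scala.language.implicitConversions

object UnzipSketch {
  // Assumed commit payload shape: (absPathFiles, partitionPaths, attemptId).
  type Commit = (Map[String, String], Set[String], String)

  // The suggested implicit view: 3-tuple => nested pair, so `unzip` applies.
  implicit def asPair(x: Commit): (Map[String, String], (Set[String], String)) =
    (x._1, (x._2, x._3))

  def split(commits: Seq[Commit])
      : (Seq[Map[String, String]], Seq[Set[String]], Seq[String]) = {
    // `unzip` takes an implicit A => (A1, A2); `asPair` satisfies it.
    val (allAbsPathFiles, rest) = commits.unzip
    val (allPartitionPaths, successAttemptIDs) = rest.unzip
    (allAbsPathFiles, allPartitionPaths, successAttemptIDs)
  }

  def main(args: Array[String]): Unit = {
    val commits: Seq[Commit] = Seq(
      (Map("f1" -> "/abs/f1"), Set("p=1"), "attempt_0"),
      (Map("f2" -> "/abs/f2"), Set("p=2"), "attempt_1"))
    val (_, _, ids) = split(commits)
    assert(ids == Seq("attempt_0", "attempt_1"))
    println(ids.mkString(","))
  }
}
```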

@WinkerDu (Contributor, Author)

Sounds reasonable, I'll try the implicit conversion.

```scala
    .createOptional

  private[spark] val MAX_LOCAL_TASK_FAILURES = ConfigBuilder("spark.task.local.maxFailures")
    .doc("The max failure times for a task while SparkContext running in Local mode, " +
```
Contributor

How could you launch a speculative task when running in local mode?

@WinkerDu (Contributor, Author)

In the UT class InsertWithMultipleTaskAttemptSuite, I don't expect to launch a speculative task in local mode. Instead, I made a customized commit protocol named InsertExceptionCommitProtocol in InsertWithMultipleTaskAttemptSuite, which overrides the commitTask method to fail the first task commit on purpose and behave normally for subsequent commits. This scenario is similar to what happens with speculative tasks when one attempt fails.
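A hedged sketch of that test strategy (FlakyCommitProtocol is an illustrative stand-in, not the actual InsertExceptionCommitProtocol): a commit that fails on the first attempt and succeeds afterward exercises the retry path without launching real speculative tasks.

```scala
object RetryCommitSketch {
  // Illustrative stand-in for a commit protocol whose first commitTask fails.
  class FlakyCommitProtocol {
    private var failedOnce = false

    def commitTask(attemptId: Int): String =
      if (!failedOnce) {
        failedOnce = true
        throw new RuntimeException(s"injected failure for attempt $attemptId")
      } else {
        s"committed attempt $attemptId"
      }
  }

  def main(args: Array[String]): Unit = {
    val protocol = new FlakyCommitProtocol
    // First attempt fails by design; the retry commits cleanly, mimicking a
    // failed attempt followed by a successful second attempt.
    val result =
      try protocol.commitTask(0)
      catch { case _: RuntimeException => protocol.commitTask(1) }
    assert(result == "committed attempt 1")
    println(result)
  }
}
```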

@WinkerDu (Contributor, Author) commented Mar 5, 2020

cc @dongjoon-hyun, please review.

```scala
      // on the rename.
      fs.mkdirs(finalPartPath.getParent)
    }
    fs.rename(new Path(s"$stagingDir/$successAttemptID", part), finalPartPath)
```
Contributor

Do I understand correctly that `part` here is a directory (e.g. x=1/y=2), not a file, so a whole directory of files is being moved?
If so, couldn't multiple tasks write to the same partition, and wouldn't these moves then conflict with each other?

@ramesh-muthusamy (Contributor)

@WinkerDu I think the same issue is being worked on in the PR for ticket SPARK-27194 (https://issues.apache.org/jira/browse/SPARK-27194), #26339. Please validate.
