Conversation

@turboFei (Member) commented Jul 3, 2020

What changes were proposed in this pull request?

For dynamic partition overwrite, the working dir is .spark-staging-{jobId}.
Task file names are formatted as part-$taskId-$jobId$ext, regardless of the task attempt id.
Each task writes its output to:

  • .spark-staging-{jobId}/partitionPath1/taskFileName1
  • .spark-staging-{jobId}/partitionPath2/taskFileName2
  • ...
  • .spark-staging-{jobId}/partitionPathN/taskFileNameN

If speculation is enabled, several task attempts with the same taskId but different attemptIds may write to the same file concurrently.
On a DistributedFileSystem (HDFS), only one client may hold the lease to write a file; if two tasks try to write the same file, an exception like "no lease on inode" is thrown. The sketch below shows why the paths collide.
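
A minimal sketch of the path construction described above (the helper name and parameters are illustrative, not the PR's actual code); note that nothing in the path depends on the attempt id:

```scala
// Hypothetical helper: builds a task's output path under the dynamic
// partition overwrite staging dir, following the naming described above.
def stagingTaskFilePath(
    outputDir: String,
    jobId: String,          // UUID of the write job
    partitionPath: String,  // e.g. "part1=2/part2=2"
    taskId: Int,
    ext: String): String = {
  // The file name encodes only taskId and jobId -- no attempt id -- so two
  // attempts of the same task (e.g. a speculative copy) target the same path.
  val fileName = f"part-$taskId%05d-$jobId$ext"
  s"$outputDir/.spark-staging-$jobId/$partitionPath/$fileName"
}
```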

Even when speculation is disabled, if a task is aborted due to an executor OOM, its output is not cleaned up.
When a new task attempt is then launched to write the same file, a FileAlreadyExistsException is thrown, because Parquet disallows overwriting:

Caused by: org.apache.hadoop.fs.FileAlreadyExistsException: /user/hive/warehouse/t2/.spark-staging-1f1efbfd-7e20-4e0f-a49c-a7fa3eae4cb1/part1=2/part2=2/part-00000-1f1efbfd-7e20-4e0f-a49c-a7fa3eae4cb1.c000.snappy.parquet for client 127.0.0.1 already exists
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFileInternal(FSNamesystem.java:2578)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFileInt(FSNamesystem.java:2465)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFile(FSNamesystem.java:2349)
at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.create(NameNodeRpcServer.java:624)
at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.create(ClientNamenodeProtocolServerSideTranslatorPB.java:398)

This is a critical issue and causes job failures.

In this PR, we define a Spark staging output committer to fix this issue (a rough sketch follows the list):

  1. Set a working path under the staging dir, named partitionPath-attemptId.
  2. After the task completes, rename partitionPath-attemptId/fileName to partitionPath/fileName.
  3. Leverage the OutputCommitCoordinator to coordinate the task commits.
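
A minimal sketch of the task-side flow, assuming commit permission comes from the OutputCommitCoordinator (class and method names here are illustrative, not the PR's actual SparkStagingOutputCommitter):

```scala
import org.apache.hadoop.fs.{FileSystem, Path}

// Illustrative sketch only: the real committer integrates with Hadoop's
// OutputCommitter API and Spark's OutputCommitCoordinator.
class StagingCommitterSketch(stagingDir: Path, attemptId: Int) {

  // Step 1: each attempt writes under its own directory,
  // .spark-staging-{jobId}/partitionPath-attemptId, so two attempts of the
  // same task never touch the same file.
  def workPath(partitionPath: String): Path =
    new Path(stagingDir, s"$partitionPath-$attemptId")

  // Step 2: on commit, move this attempt's files into the shared partition
  // directory. Step 3 (the OutputCommitCoordinator) ensures only one attempt
  // per task reaches this point.
  def commitTask(fs: FileSystem, partitionPath: String): Unit = {
    val src = workPath(partitionPath)
    val dst = new Path(stagingDir, partitionPath)
    fs.mkdirs(dst)
    fs.listStatus(src).foreach { status =>
      fs.rename(status.getPath, new Path(dst, status.getPath.getName))
    }
    fs.delete(src, true)
  }

  // A failed or aborted attempt just drops its own directory; it cannot
  // corrupt another attempt's output.
  def abortTask(fs: FileSystem, partitionPath: String): Unit =
    fs.delete(workPath(partitionPath), true)
}
```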

Why are the changes needed?

Without this PR, dynamic partition overwrite operations might fail.
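
For example, a hypothetical repro in spark-shell (the table names t2/src and the speculative failure are assumptions for illustration):

```scala
// Repro sketch; assumes spark-shell was started with
//   --conf spark.speculation=true
// and that a partitioned table t2 and a source table src already exist.
spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")

// If a speculative (or OOM-retried) attempt writes the same staging file as
// the original attempt, this can fail with FileAlreadyExistsException or
// "no lease on inode".
spark.sql("INSERT OVERWRITE TABLE t2 PARTITION (part1, part2) SELECT * FROM src")
```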

Does this PR introduce any user-facing change?

No.

How was this patch tested?

Added UT.

turboFei changed the title [SPARK-27194][SPARK-29302][SQL] Define a spark staing committer to resolve FileAlreadyExistingException → [WIP][SPARK-27194][SPARK-29302][SQL] Define a spark staing committer to resolve FileAlreadyExistingException on Jul 3, 2020
turboFei changed the title [WIP][SPARK-27194][SPARK-29302][SQL] Define a spark staing committer to resolve FileAlreadyExistingException → [SPARK-27194][SPARK-29302][SQL] Define a spark staging committer to resolve FileAlreadyExistingException on Jul 3, 2020
@turboFei (Member, Author) commented Jul 3, 2020

gentle ping @cloud-fan

Hi, we found a new solution to fix these issues when dynamic partition overwrite is enabled:

  1. FileAlreadyExistsException when an executor crashed
  2. a task conflicting with its speculative attempt

In this PR, we define a new type of OutputCommitter and leverage the OutputCommitCoordinator to coordinate the task commits.
Could you kindly give some suggestions?

turboFei force-pushed the SPARK-27194-custom-committer branch 3 times, most recently from 07066e7 to 8f202c9 on July 3, 2020 04:08
@turboFei (Member, Author) commented Jul 3, 2020

also cc @Ngone51

@turboFei (Member, Author) commented Jul 6, 2020

turboFei force-pushed the SPARK-27194-custom-committer branch 2 times, most recently from cd42f42 to b09a665 on July 6, 2020 06:10
@dongjoon-hyun (Member) commented

ok to test

@SparkQA commented Jul 6, 2020

Test build #125132 has started for PR 28989 at commit b09a665.

@shaneknapp (Contributor) commented

test this please

@SparkQA commented Jul 7, 2020

Test build #125143 has finished for PR 28989 at commit b09a665.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • class SparkStagingOutputCommitter(

@turboFei (Member, Author) commented Jul 7, 2020

Will try to fix it.

turboFei changed the title [SPARK-27194][SPARK-29302][SQL] Define a spark staging committer to resolve FileAlreadyExistingException → [WIP][SPARK-27194][SPARK-29302][SQL] Define a spark staging committer to resolve FileAlreadyExistingException on Jul 7, 2020
turboFei force-pushed the SPARK-27194-custom-committer branch from b09a665 to 0492977 on July 7, 2020 08:11
@SparkQA commented Jul 7, 2020

Test build #125196 has finished for PR 28989 at commit 0492977.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • class SparkStagingOutputCommitter(

@SparkQA commented Jul 7, 2020

Test build #125205 has finished for PR 28989 at commit bc7d2b5.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Jul 7, 2020

Test build #125212 has finished for PR 28989 at commit a7a8d4b.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

turboFei changed the title [WIP][SPARK-27194][SPARK-29302][SQL] Define a spark staging committer to resolve FileAlreadyExistingException → [SPARK-27194][SPARK-29302][SQL] Define a spark staging committer to resolve FileAlreadyExistingException on Jul 8, 2020
@SparkQA commented Jul 8, 2020

Test build #125275 has finished for PR 28989 at commit 0d722a9.

  • This patch fails to generate documentation.
  • This patch merges cleanly.
  • This patch adds no public classes.

@turboFei (Member, Author) commented Jul 9, 2020

Closing this.
I was wrong about that: I thought taskAttemptContext.getTaskAttemptID.getId was the same as Spark's per-task attempt number, so it would create at most a few (up to the largest attempt number) staging partition dirs per task.
But taskAttemptContext.getTaskAttemptID.getId is also a globally unique id, so this approach would create multiple staging partition dirs for each task.
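
A small illustration of the distinction (the values are made up; this is not code from this PR):

```scala
import org.apache.hadoop.mapreduce.{JobID, TaskAttemptID, TaskID, TaskType}

// Two attempts of the same task. Per the realization above, the trailing id
// Spark passes in comes from a globally unique attempt counter, not a small
// per-task retry number, so partitionPath-<id> staging dirs differ for every
// attempt.
val job  = new JobID("20200709", 1)
val task = new TaskID(job, TaskType.MAP, 0)
println(new TaskAttemptID(task, 3)) // attempt_20200709_0001_m_000000_3
println(new TaskAttemptID(task, 4)) // attempt_20200709_0001_m_000000_4
// Distinct ids => distinct partitionPath-3 and partitionPath-4 directories.
```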

Prefer #29000 instead.

turboFei closed this Jul 9, 2020