Conversation

@turboFei
Member

@turboFei turboFei commented Oct 31, 2019

What changes were proposed in this pull request?

For dynamic partition overwrite, the working directory is .spark-staging-{jobId}.
Task file names are formatted as part-$taskId-$jobId$ext, regardless of the task attempt id.
Each task writes its output to:

  • .spark-staging-{jobId}/partitionPath1/taskFileName1
  • .spark-staging-{jobId}/partitionPath2/taskFileName2
  • ...
  • .spark-staging-{jobId}/partitionPathN/taskFileNameN

If speculation is enabled, several task attempts with the same taskId but different attemptIds may write to the same file concurrently.
On DistributedFileSystem (HDFS), only one client may hold the lease to write a file; if two tasks try to write the same file, an exception like "No lease on inode" is thrown.

Even when speculation is disabled, if a task is aborted due to an executor OOM, its output is not cleaned up.
When a new task is then launched to write the same file, a FileAlreadyExistsException is thrown, because Parquet disallows overwriting:

Caused by: org.apache.hadoop.fs.FileAlreadyExistsException: /user/hive/warehouse/t2/.spark-staging-1f1efbfd-7e20-4e0f-a49c-a7fa3eae4cb1/part1=2/part2=2/part-00000-1f1efbfd-7e20-4e0f-a49c-a7fa3eae4cb1.c000.snappy.parquet for client 127.0.0.1 already exists
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFileInternal(FSNamesystem.java:2578)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFileInt(FSNamesystem.java:2465)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFile(FSNamesystem.java:2349)
at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.create(NameNodeRpcServer.java:624)
at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.create(ClientNamenodeProtocolServerSideTranslatorPB.java:398)

This is a critical issue that causes job failures.

In this PR, we fix the issue with the solution below:

  1. Each task attempt writes to a working path under the staging dir, named partitionPath-attemptId.
  2. After the task completes, rename partitionPath-attemptId/fileName to partitionPath/fileName.
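The two steps above can be sketched as follows. This is a minimal illustration in Java; the class name and helper methods are hypothetical, not Spark's actual (Scala) implementation:

```java
// Hypothetical sketch of the staging-path scheme described above.
// All names here (DynamicOverwritePaths, jobId, taskId, attemptId) are illustrative.
public class DynamicOverwritePaths {
    // Working dir for the whole job: ".spark-staging-<jobId>"
    static String stagingDir(String jobId) {
        return ".spark-staging-" + jobId;
    }

    // The task file name contains the taskId and jobId but NOT the attempt id,
    // which is why two attempts of the same task collide on the same file.
    static String taskFile(int taskId, String jobId, String ext) {
        return String.format("part-%05d-%s%s", taskId, jobId, ext);
    }

    // Fix, step 1: each attempt writes under "<partitionPath>-<attemptId>",
    // so concurrent attempts never touch the same file.
    static String attemptWorkingPath(String jobId, String partitionPath, int attemptId) {
        return stagingDir(jobId) + "/" + partitionPath + "-" + attemptId;
    }

    // Fix, step 2: on task commit, the file is renamed to the
    // attempt-free location "<stagingDir>/<partitionPath>/<fileName>".
    static String committedPath(String jobId, String partitionPath, String fileName) {
        return stagingDir(jobId) + "/" + partitionPath + "/" + fileName;
    }
}
```

Because the attempt id only appears in the directory name, the final rename collapses all attempts onto the same committed path, and only the committing attempt's output survives.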

Why are the changes needed?

Without this PR, dynamic partition overwrite operation might fail.

Does this PR introduce any user-facing change?

No.

How was this patch tested?

Added UT.

@turboFei
Member Author

turboFei commented Oct 31, 2019

cc @cloud-fan @advancedxy @viirya @wangyum
Can you help take a look? Thanks in advance.

@turboFei
Member Author

turboFei commented Nov 7, 2019

gentle ping @cloud-fan @advancedxy @viirya

@turboFei
Member Author

gentle ping @cloud-fan @advancedxy @viirya Could you help take a look? Thanks in advance!

@dbtsai
Member

dbtsai commented Nov 18, 2019

Jenkins, okay to test.

@turboFei
Member Author

retest this please.

@dongjoon-hyun
Member

ok to test

@SparkQA

SparkQA commented Dec 12, 2019

Test build #115200 has finished for PR 26339 at commit 286a87f.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@turboFei turboFei force-pushed the SPARK-27194-dynamic-cleanUp branch from 286a87f to 4c18493 on December 12, 2019 06:52
@SparkQA

SparkQA commented Dec 12, 2019

Test build #115221 has finished for PR 26339 at commit 4c18493.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@turboFei
Member Author

@dongjoon-hyun Tests passed. Could you help take a look? Thanks in advance!

@turboFei
Member Author

gentle ping @dongjoon-hyun @dbtsai @viirya

@ramesh-muthusamy
Contributor

@turboFei do we have test cases to cover the changes?

@turboFei
Member Author

@turboFei do we have test cases to cover the changes?

I'll try my best to figure out how to add a UT.

val fileName = stagingTaskFile.getName
val taskPartitionPath = getPartitionPath(stagingTaskFile)
val destFile = new Path(new Path(stagingDir, taskPartitionPath), fileName)
fs.rename(stagingTaskFile, destFile)
Contributor

fs.rename returns a boolean in specific failure cases; please handle the failure case.

Member Author

fixed
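For illustration, a rename that surfaces failures instead of ignoring the return value might look like the sketch below. It uses java.nio.file as a stand-in for Hadoop's FileSystem, and commitTaskFile is a hypothetical helper, not the PR's actual code:

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

public class RenameWithCheck {
    // Hadoop's FileSystem.rename signals some failures by returning false
    // rather than throwing, so the caller must check the result. With
    // java.nio.file, Files.move throws on failure instead; the Hadoop
    // equivalent of this method would be:
    //   if (!fs.rename(stagingTaskFile, destFile)) {
    //       throw new IOException("Failed to rename " + stagingTaskFile + " to " + destFile);
    //   }
    static void commitTaskFile(Path stagingTaskFile, Path destFile) throws IOException {
        Files.createDirectories(destFile.getParent()); // ensure the partition dir exists
        Files.move(stagingTaskFile, destFile);         // fails loudly instead of silently
    }
}
```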

@github-actions

We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable.
If you'd like to revive this PR, please reopen it and ask a committer to remove the Stale tag!

@dongjoon-hyun
Member

Can we have a test case for this PR?

@dongjoon-hyun
Member

ok to test

@SparkQA

SparkQA commented Mar 30, 2020

Test build #120566 has finished for PR 26339 at commit 4c18493.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@venkata91
Contributor

@dongjoon-hyun @turboFei Is this PR still being worked on? We are having similar issues in our platform for a while, it would be great if we can get this fixed soon.

@turboFei
Member Author

turboFei commented Apr 1, 2020

I will follow up on it. But I am not sure how to add a UT.
PS: I am a little busy this week.

@turboFei turboFei force-pushed the SPARK-27194-dynamic-cleanUp branch 3 times, most recently from 79d9443 to 9c9f39d on April 3, 2020 15:31
dir.map { d =>
new Path(new Path(stagingDir, d), filename).toString
if (dynamicPartitionOverwrite) {
val tempFile = new Path(dynamicStagingTaskPath(dir.get, taskContext), filename)
Member

nit: dir.get -> d

@Ngone51
Member

Ngone51 commented May 6, 2020

ping @cloud-fan

I think the solution given in this PR could work for the issue.

@SparkQA

SparkQA commented May 6, 2020

Test build #122368 has finished for PR 26339 at commit 100d0fe.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@maropu
Member

maropu commented May 6, 2020

retest this please

@SparkQA

SparkQA commented May 7, 2020

Test build #122382 has finished for PR 26339 at commit 100d0fe.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented May 8, 2020

Test build #122437 has finished for PR 26339 at commit 8feafc4.

  • This patch fails build dependency tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented May 8, 2020

Test build #122438 has finished for PR 26339 at commit f9ae20f.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@turboFei
Member Author

@jerryshao @cloud-fan could you help review this PR? It resolves a critical issue. Thanks in advance.

@turboFei
Member Author

also cc @jiangxb1987

@HyukjinKwon
Member

also cc @vanzin

@koertkuipers
Contributor

I am getting worried now that this won't make it into Spark 3.0.0.
This is a fault-tolerance bug in Spark. Not as serious as a correctness issue, but pretty high up there, I would say (what's the point of deploying a distributed fault-tolerant system if it's not fault tolerant?)...

@Ngone51
Member

Ngone51 commented May 22, 2020

@koertkuipers you could send your concern to the vote of Spark 3.0 release and see if PMC/committer would consider it as release blocker or not.

@koertkuipers
Contributor

@Ngone51 Yeah, I thought about doing that, but I don't want to slow down the Spark 3 release even more (and this is not a regression, I guess?). Now I am just hoping someone sees my messages here and reviews this before Spark 3.0.0 RC3!

@Ngone51
Member

Ngone51 commented May 22, 2020

The vote thread now has more eyes on it than this PR, and as you know, this PR has been somewhat overlooked for a while.

@turboFei turboFei force-pushed the SPARK-27194-dynamic-cleanUp branch from f9ae20f to de9f206 on June 18, 2020 07:03
…ion overwrite a task would conflict with its speculative task
@turboFei turboFei force-pushed the SPARK-27194-dynamic-cleanUp branch from de9f206 to 717d9a5 on June 18, 2020 07:04
@SparkQA

SparkQA commented Jun 18, 2020

Test build #124203 has finished for PR 26339 at commit 717d9a5.

  • This patch fails build dependency tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@dongjoon-hyun
Member

Retest this please

@SparkQA

SparkQA commented Jun 22, 2020

Test build #124343 has finished for PR 26339 at commit 717d9a5.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@LuciferYang
Contributor

@dongjoon-hyun @turboFei Is this PR still being worked on? We are having similar issues in our production environment, and I found there are similar PRs trying to solve this problem, such as #26090 and #26971.

@turboFei
Member Author

Gentle ping @dongjoon-hyun @dbtsai

@turboFei
Member Author

turboFei commented Jul 3, 2020

Closing this; I will create a new PR with a new solution. Thanks.

@turboFei turboFei closed this Jul 3, 2020
@koertkuipers
Contributor

Closing this; I will create a new PR with a new solution. Thanks.

why close this? did you find a better approach?

@turboFei
Member Author

turboFei commented Jul 3, 2020

Closing this; I will create a new PR with a new solution. Thanks.

why close this? did you find a better approach?

Hi, here is the new patch.
In the new solution, I define a new OutputCommitter.
I am still working on it.
#28989

@koertkuipers
Contributor

Closing this; I will create a new PR with a new solution. Thanks.

why close this? did you find a better approach?

Hi, here is the new patch.
In the new solution, I define a new OutputCommitter.
I am still working on it.
#28989

Thank you. Curious why you changed direction... is there anything wrong with the approach in this pull request? We were just about to start testing it at scale; that's why I ask.
Best

@turboFei
Member Author

turboFei commented Jul 3, 2020

Closing this; I will create a new PR with a new solution. Thanks.

why close this? did you find a better approach?

Hi, here is the new patch.
In the new solution, I define a new OutputCommitter.
I am still working on it.
#28989

Thank you. Curious why you changed direction... is there anything wrong with the approach in this pull request? We were just about to start testing it at scale; that's why I ask.
Best

In the original solution, when renaming a staging task file to its final file, we check whether the final file already exists and then decide whether to rename the staging task file.

The tricky part is that the final files may come from different tasks.

If the task output for a partition has multiple files (or in the bucketed table insert case), the data might be corrupted.

So, we need the OutputCommitCoordinator to help decide which task can commit.

In the new solution, we define a new output committer that leverages the OutputCommitCoordinator (by invoking SparkHadoopMapRedUtil.commitTask).
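The coordination idea described here can be sketched minimally as follows. This is an illustration of a first-attempt-wins commit protocol under the assumptions above, not Spark's actual OutputCommitCoordinator:

```java
import java.util.HashMap;
import java.util.Map;

// Minimal sketch of output-commit coordination: exactly one attempt per
// task is authorized to commit, so all committed (renamed) files for a
// task are guaranteed to come from a single attempt.
public class CommitCoordinatorSketch {
    // taskId -> the attemptId that was granted permission to commit
    private final Map<Integer, Integer> authorized = new HashMap<>();

    // Returns true only for the first attempt of a task that asks;
    // later attempts (e.g. speculative copies) are denied.
    synchronized boolean canCommit(int taskId, int attemptId) {
        Integer winner = authorized.putIfAbsent(taskId, attemptId);
        return winner == null || winner == attemptId;
    }
}
```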
