
Conversation

@turboFei turboFei commented Sep 15, 2019

What changes were proposed in this pull request?

Case:
Application appA runs insert overwrite table table_a with static partition overwrite.
It is killed while committing tasks because one task hangs,
and part of its committed task output is left under /path/table_a/_temporary/0/.

Then we run application appB, which runs insert overwrite table table_a with dynamic partition overwrite.
It executes successfully,
but it also commits the leftover data under /path/table_a/_temporary/0/ to the destination dir.

In this PR, we skip the FileOutputCommitter.{setupJob, commitJob, abortJob} operations when dynamicPartitionOverwrite is true.
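A rough sketch of the shape of the change (simplified, not the exact diff; names follow HadoopMapReduceCommitProtocol in the Spark source):

  // Only delegate to the Hadoop FileOutputCommitter when we are NOT doing a
  // dynamic partition overwrite. In that mode Spark moves files out of its own
  // .spark-staging-<jobId> dir, so it must not commit whatever an earlier
  // killed application left under <table>/_temporary/0.
  override def commitJob(jobContext: JobContext, taskCommits: Seq[TaskCommitMessage]): Unit = {
    if (!dynamicPartitionOverwrite) {
      committer.commitJob(jobContext)
    }
    // ... dynamic partition handling (renaming staged partition dirs into
    // place) stays as before ...
  }

setupJob and abortJob are guarded the same way.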

Why are the changes needed?

Data may be corrupted without this PR.

Does this PR introduce any user-facing change?

No.

How was this patch tested?

Existing Unit Test.

wangyum commented Sep 15, 2019

ok to test

SparkQA commented Sep 15, 2019

Test build #110617 has finished for PR 25795 at commit c3aff4e.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

test("SPARK-29037: Spark gives duplicate result when an application was killed") {
Member

Thanks for adding this. This looks like a good IT. If possible,

  • Can we have a concise UT in the core module, because you are touching HadoopMapReduceCommitProtocol?
  • Otherwise, can we move this to the sql/core module instead of sql/hive, because you are using parquet?

Member

@dongjoon-hyun dongjoon-hyun left a comment

Ur, @turboFei. Unfortunately, the UT seems to pass without your PR. Could you check that?

$ git diff master --stat
 sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala | 30 ++++++++++++++++++++++++++++++
 1 file changed, 30 insertions(+)

$ build/sbt "hive/testOnly *.HiveQuerySuite" -Phive
...
[info] - SPARK-29037: Spark gives duplicate result when an application was killed (1 second, 177 milliseconds)
[info] ScalaTest
[info] Run completed in 51 seconds, 894 milliseconds.
[info] Total number of tests run: 137
[info] Suites: completed 1, aborted 0
[info] Tests: succeeded 137, failed 0, canceled 0, ignored 2, pending 0
[info] All tests passed.
[info] Passed: Total 137, Failed 0, Errors 0, Passed 137, Ignored 2
[success] Total time: 233 s, completed Sep 15, 2019 2:21:42 PM

@HyukjinKwon
Member

retest this please

@turboFei
Member Author

@dongjoon-hyun
Thanks for your suggestion.
I have moved the UT to the sql/core module.
I also found that the committer here is an instance of org.apache.parquet.hadoop.ParquetOutputCommitter, and ParquetOutputCommitter is only used in the sql/core module.
So I think we only need to add the UT to the sql/core module.
Can you help take a look again? Thanks in advance!

SparkQA commented Sep 16, 2019

Test build #110637 has finished for PR 25795 at commit 804ac8c.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@turboFei
Member Author

The failed test org.apache.spark.SparkContextSuite passed locally.
retest please.

SparkQA commented Sep 17, 2019

Test build #110725 has finished for PR 25795 at commit 144ec63.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

wangyum commented Sep 17, 2019

cc @vanzin @steveloughran

@dongjoon-hyun
Member

Hi, @cloud-fan and @gatorsmile.
Could you review this PR, please?

@dongjoon-hyun dongjoon-hyun removed their request for review September 17, 2019 06:17
val taskAttemptContext = new TaskAttemptContextImpl(jobContext.getConfiguration, taskAttemptId)
committer = setupCommitter(taskAttemptContext)
if (!dynamicPartitionOverwrite) {
  committer.setupJob(jobContext)
}
Contributor

We need to add comments to explain it. It looks to me that the hadoop output committer doesn't support concurrent writing to the same directory by design, so there is nothing we can do on the Spark side.

The fix here is to avoid using the hadoop output committer when dynamicPartitionOverwrite=true. I'm fine with this fix.

BTW, when writing a partitioned table with dynamicPartitionOverwrite=false, can we support it as well?

Contributor

also cc @advancedxy

Contributor

and for a non-partitioned table, can we clean up the staging dir when the job is killed?

Member Author

@turboFei turboFei Sep 17, 2019

@advancedxy and I have discussed offline about writing a partitioned table with dynamicPartitionOverwrite=false.
He suggested that we can add a JobAttemptPath (_temporary/0) existence check when dynamicPartitionOverwrite=false.

Member Author

For a non-partitioned table, dynamicPartitionOverwrite is false and the staging dir is under the JobAttemptPath (_temporary/0), so I think the staging dir will be cleaned up by FileOutputCommitter.abortJob().

Contributor

I think the staging dir will be cleaned up by FileOutputCommitter.abortJob().

Why can't it be cleaned when dynamicPartitionOverwrite=true?

Member Author

When a job is killed, its staging dir can be cleaned up by the abortJob method.
But when an application is killed, its jobs' staging dirs will not be cleaned up gracefully.

Member Author

I think the staging dir will be cleaned up by FileOutputCommitter.abortJob().

Why it can't be cleaned when dynamicPartitionOverwrite=true?

For the case in the PR description: it happens when appA (static partition overwrite) is killed and its staging dir is not cleaned up gracefully; appB then commits part of appA's results.

Contributor

OK, so we can't rely on the job cleanup, and ideally we should use a different staging dir for each job.

That said, it seems we can't fix the problem for non-partitioned tables if we continue to use the hadoop output committer.

Member Author

@turboFei turboFei Sep 17, 2019

Yes, a solution proposed by @advancedxy is to add a job attempt path existence check for non-partitioned tables and static partition overwrite operations.

Also, the implementation of InsertIntoHiveTable uses a different staging dir for each job.

SparkQA commented Sep 17, 2019

Test build #110760 has finished for PR 25795 at commit 979fbe4.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan
Contributor

thanks @steveloughran for the detailed explanation!

It seems very complicated to make Spark support concurrent and object-store-friendly file writing. We should ask users to try object-store-first data structures such as Delta and Iceberg.

I'd like to narrow down the scope. Spark should at least guarantee data correctness: duplicated data should not happen. @turboFei @advancedxy is it possible to detect concurrent writes and fail the query?

@turboFei
Member Author

turboFei commented Sep 17, 2019

I think it is feasible to detect concurrent writes.
As we know, when dynamicPartitionOverwrite is true, the job attempt path is .spark-staging-${UUID};
when dynamicPartitionOverwrite is false, it is _temporary.
We just need to check the existence of these paths and throw an exception to fail fast when concurrent writes are detected, covering the combinations below (a rough sketch of such a check follows this comment):

  • dynamicPartitionOverwrite and dynamicPartitionOverwrite
  • dynamicPartitionOverwrite and staticPartitionOverwrite/non-partitioned table write
  • staticPartitionOverwrite/non-partitioned table write and staticPartitionOverwrite/non-partitioned table write

@advancedxy @cloud-fan
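A rough sketch of such an existence check (a hypothetical helper, not code from this PR; it only illustrates the listing-based detection described above):

  import org.apache.hadoop.fs.{FileSystem, Path}

  // Fail fast if another write to this table output dir appears to be in
  // flight: a dynamic partition overwrite leaves .spark-staging-*, every
  // other write mode leaves _temporary.
  def assertNoConcurrentWrite(fs: FileSystem, tableOutput: Path): Unit = {
    val inFlight = fs.listStatus(tableOutput).map(_.getPath.getName).filter { name =>
      name == "_temporary" || name.startsWith(".spark-staging-")
    }
    if (inFlight.nonEmpty) {
      throw new java.io.IOException(
        s"Concurrent write detected under $tableOutput: ${inFlight.mkString(", ")}")
    }
  }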

@cloud-fan
Contributor

When dynamicPartitionOverwrite=true we still set up the output committer; will that create the _temporary dir?

@cloud-fan
Contributor

BTW .spark-staging-${UUID} is not a concern. UUID should never conflict.

@turboFei
Member Author

turboFei commented Sep 17, 2019

when dynamicPartitionOverwrite=true, we still set up the output committer, will that create the _temporary dir?

No, setupCommitter will not create the _temporary dir; it is created by committer.setupJob(jobContext).

BTW .spark-staging-${UUID} is not a concern. UUID should never conflict.

Yes, the UUID is not a concern, but we can check whether the directory name starts with .spark-staging-.

@advancedxy
Contributor

We just need check the existences of these paths and throw exception to fast fail when concurrent writes are detected, Include:

  • dynamicPartitionOverwrite and dynamicPartitionOverwrite
  • dynamicPartitionOverwrite and staticPartitionOverwrite/non-partitioned table write
  • staticPartitionOverwrite/non-partitioned table write and staticPartitionOverwrite/non-partitioned table write

Yeah, this is what we discussed offline. To fail fast, Spark should check output existence to detect concurrent writes to the same partitions.

It seems very complicated to make Spark support concurrent and object-store-friendly file writing. We should ask users to try object-store-first data structures such as Delta and Iceberg.

Yes, it's hard for Spark to support concurrent writes. But the scope of #25739 and this one is to support concurrent writes to different partitions, which can be a nice feature and not that hard to achieve.

@cloud-fan
Contributor

@advancedxy can you give a complete proposal for it? Basically the requirements:

  1. concurrent writing to the same non-partitioned table should fail
  2. concurrent writing to different partitions should succeed
  3. concurrent writing to the same partition should fail

SparkQA commented Sep 17, 2019

Test build #110799 has finished for PR 25795 at commit f0b6c84.

  • This patch fails build dependency tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@turboFei turboFei changed the title [SPARK-29037][Core] Spark gives duplicate result when an application was killed [WIP][SPARK-29037][Core] Spark gives duplicate result when an application was killed Sep 17, 2019
@advancedxy
Contributor

@advancedxy can you give a complete proposal for it?

All right, I think the requirements can be split into two parts:

  1. Support concurrent writes to different locations (partitions).
    This is achieved by setting a different output path for different writes (see #25739).

  2. Detect concurrent writes to the same location and fail fast.
    This can be achieved during the setupJob stage. We can check the existence of the output path as FileOutputFormat does. If the output path already exists, it must have been created by another concurrent write job or left by a previous failed/killed job. We can throw an exception with the possible reasons and fail the current job. Of course, we cannot simply check the output path passed to the JobConf, as $table_output is expected to exist (unless the table is being created for the first time); $table_output/_temporary/$app_attempt_num could be a good candidate (see the sketch after this list).

    One more thing to do in Spark: it should infer the YARN application attempt number when running in YARN mode. Currently, the app attempt number is always 0 when writing.
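A minimal sketch of that setupJob-time check (illustrative only; fs, tableOutput and appAttemptNum are assumed names, not code from this PR):

  import org.apache.hadoop.fs.Path

  // Assume fs is the table's FileSystem and appAttemptNum the inferred attempt.
  val jobAttemptPath = new Path(tableOutput, s"_temporary/$appAttemptNum")
  if (fs.exists(jobAttemptPath)) {
    throw new java.io.IOException(
      s"$jobAttemptPath already exists. It was either created by a concurrent " +
        "write job or left behind by a previous failed/killed job; clean it up " +
        "(or wait for the other job to finish) before retrying.")
  }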

I believe this proposal covers concurrent writes and the case in this PR. WDYT @cloud-fan, @turboFei and @wangyum?

@cloud-fan
Contributor

SGTM

@cloud-fan
Contributor

So #25739 is to support concurrent writes to different locations, and this PR is to detect concurrent writes to the same location and fail fast?

@advancedxy
Contributor

So #25739 is to support concurrent writes to different locations, and this PR is to detect concurrent writes to the same location and fail fast?

I am fine with that as long as @turboFei is ok.

@turboFei
Member Author

turboFei commented Sep 18, 2019

So #25739 is to support concurrent writes to different locations, and this PR is to detect concurrent writes to the same location and fail fast?

I am fine with that as long as @turboFei is ok.

@turboFei
Member Author

turboFei commented Sep 18, 2019

Then when dynamicPartitionOverwrite=false, if I detect that a subdirectory under $table_output/_temporary exists, it should fail fast.

To my knowledge, in this PR, when dynamicPartitionOverwrite is false:
Case:
tbl_a(c1 int, c2 int) partitioned by (c2)
For the SQL insert overwrite table tbl_a select ...,
I cannot detect its destination partitions.
So I think that if a subdirectory under $table_output/_temporary exists, we should fail fast.

@turboFei
Member Author

I have discussed this with @advancedxy offline, and the solution is clear to me now.
Thanks, @advancedxy. I will complete this PR after #25739 is merged.
Thanks again.

@cloud-fan
Contributor

cloud-fan commented Sep 19, 2019

We can check the existence of output path like the FileOutputFormat did.

How does it work for dynamicPartitionOverwrite=true? The output path is the staging dir, which is unique. Can we detect 2 jobs that are writing to the same table with dynamicPartitionOverwrite=true?

@advancedxy
Contributor

How does it work for dynamicPartitionOverwrite=true? The output path is staging dir which is unique. Can we detect 2 jobs that are writing to the same table with dynamicPartitionOverwrite=true?

Looks like there is no easy way to detect concurrent writes with dynamicPartitionOverwrite=true.
Maybe we should notify users about that limitation.

Users won't get duplicated results in this case, but the result could be messed up (some parts replaced by another write while other parts remain) when writing concurrently, since no transaction is involved.

@steveloughran
Contributor

@cloud-fan wrote

Seems it's very complicated to make Spark support concurrent and object-store friendly files writing.

Mostly it's about S3, that being the hard one to work with. GCS is consistent; file rename O(1); dir rename atomic O(files). Azure Datalake Gen2 (ABFS connector) works exactly like a real filesystem. But they all have latency issues, directory listing overheads, and the other little costs that make switching to them tricky.

S3 is really a fault injection framework for all your assumptions about what a store is.

We should ask users to try object-store-first Data structures such as Delta and Iceberg.

+1

What those object-store-first data structures do promise is not just correctness in the presence of that fault-injection layer; they also scale well in a world where enumerating files under a directory tree is no longer a low-to-medium-cost operation.

w.r.t dynamic partition updates, for AWS S3 the best thing to do may be to say "don't"; at least not this way. But you still have to get it right everywhere else.

Spark should at least guarantee data correctness.

I concur.

Now, one thing stepping through the FileOutputFormat code in a debugger while taking notes showed me is that the critical commit algorithms everyone relies on are barely documented and under-tested when it comes to all their failure paths. And once you realise that the v2 commit algorithm doesn't meet the expectations of MR or Spark, you worry about the correctness of any application which has used it.

Now seems a good time not just to get the dynamic partition overwrite code to work, but to specify that algorithm well enough to be able to convince others of its correctness on a consistent filesystem. I did try to start with a TLA+ spec of what S3 did, but after Lamport told me I misunderstood the nature of "eventually" I gave up. I hear nice things about Isabelle/HOL these days; maybe someone sufficiently motivated could try to use it.

BTW, if you haven't read it, look at Stocator: A High Performance Object Store Connector for Spark by @gilv et al., who use task ID information in the file names to know what to roll back in the presence of failure. That is: rather than trying to get commit right, they get abort right.

(returns to debugging a console log where, somehow, cached 404s in the S3 load balancer are still causing problems despite HADOOP-16490 fixing it).

@turboFei
Member Author

turboFei commented Sep 20, 2019

@cloud-fan
For dynamicPartitionOverwrite=true, I propose my thought here:

We can unify the staging dir for both dynamic and static insert overwrite, based on the specified static partition key-value pairs.

  private def stagingDir = {
    val stagingPath = ".spark-staging-" + jobId + "/" +
      staticPartitionKVS.map(kv => "sp_" + kv._1 + "=" + kv._2)
        .mkString(File.separator)
    new Path(path, stagingPath)
  }

For example below:

ta (c1 int, p1 int, p2 int, p3 int) partitioned by (p1, p2, p3)

insert overwrite table ta partition(p1=1,p2,p3) select ...
// stagingDir: .spark-staging-${UUID}/sp_p1=1

insert overwrite table ta partition(p1=1,p2=2,p3) select ...
// stagingDir: .spark-staging-${UUID}/sp_p1=1/sp_p2=2

insert overwrite table ta select ...
// stagingDir: .spark-staging-${UUID}

The stagingDir will be cleaned up after the job finishes.

Before each insert, we should check the paths whose names start with .spark-staging and find the longest path with the sp_ prefix.

For two paths A and B: if A is fully contained by B, or B is fully contained by A, they cannot write concurrently (a sketch of this check follows the examples below).

For example:

A: 'sp_p1=1/sp_p2=2' B: 'sp_p1=1' A is fully contained by B.

A: '' B: 'sp_p1=1' A contains all other paths.

A: 'sp_p1=1/sp_p2=2' B: 'sp_p1=1/sp_p2=3' A and B do not contain each other.
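A small sketch of that containment check (a hypothetical helper; the arguments are the sp_ segments of each staging path relative to the staging root):

  // Two staging sub-paths conflict iff one is a (possibly empty) prefix of the
  // other; "" is a prefix of everything, so a whole-table write conflicts with
  // every other write to the table.
  def conflicts(a: String, b: String): Boolean = {
    val as = a.split('/').filterNot(_.isEmpty)
    val bs = b.split('/').filterNot(_.isEmpty)
    as.zip(bs).forall { case (x, y) => x == y }
  }

  conflicts("sp_p1=1/sp_p2=2", "sp_p1=1")          // true  -> must not run together
  conflicts("", "sp_p1=1")                         // true  -> must not run together
  conflicts("sp_p1=1/sp_p2=2", "sp_p1=1/sp_p2=3")  // false -> may run concurrently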

Whether to use a UUID in the staging dir is determined by the implementation of #25739.

@advancedxy

In addition, this PR was originally dedicated to resolving the duplicate result issue.

I have created a new PR, #25863, for the duplicate result issue; hope it makes sense. Thanks.

@cloud-fan
Contributor

Before per insert, we should check the path whose name is started with .spark-staging and find the longest path with sp_ prefix.

The problem here is: how do you detect insert overwrite table ta select ...? .spark-staging-${UUID} always exists if there is an ongoing write.

@turboFei
Member Author

turboFei commented Sep 20, 2019

The problem here is that, how do you detect insert overwrite table ta select ...? .spark-staging-${UUID} always exists if there is an ongoing write.

We will find the longest path with the sp_ prefix.
For insert overwrite table ta select ..., its output path is .spark-staging-${UUID} and its longest path with the sp_ prefix is "".
Then we can detect that there is a write operation on the whole table, and all other write operations on this table are not allowed.

@steveloughran
Contributor

Is there any way to block on the commit process to ensure that exactly one job can be committing at the same time, e.g. some lease file with a timestamp inside whose (unexpired) presence is a sign that someone else is committing? For all filesystems where create(path, overwrite=false) is atomic, you can be confident when you create the file that you have permission to commit. For others, you either need to fail, spin, or decide when the lock is so out of date that you could delete it and go for it yourself.

(I'm ignoring inconsistent storage without CRUD consistency or atomic creates, obviously)
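A sketch of that lease-file idea against the Hadoop FileSystem API (illustrative only; expiry/staleness handling is omitted):

  import org.apache.hadoop.fs.{FileSystem, Path}

  // On filesystems where create(path, overwrite = false) is atomic, whoever
  // creates the lock file wins the right to commit.
  def tryAcquireCommitLock(fs: FileSystem, lock: Path): Boolean = {
    try {
      val out = fs.create(lock, false)            // fails if the file already exists
      out.writeLong(System.currentTimeMillis())   // timestamp for staleness checks
      out.close()
      true
    } catch {
      case _: java.io.IOException => false        // includes FileAlreadyExistsException
    }
  }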

@hkkolodner

Thanks @steveloughran for mentioning our paper on Stocator. (This is a better reference -- Stocator: Providing High Performance and Fault Tolerance for Apache Spark Over Object Storage.)

It is better to characterize Stocator w.r.t. commit rather than abort. Stocator only commits: the last commit wins, and any parts that do not belong are ignored when the partition is read (they can also be deleted at that time to reclaim space, or space can be reclaimed by an independent garbage-collection process).

This can be achieved by writing a manifest at the time of commit that lists the parts belonging to the commit, e.g. by extending the _SUCCESS file/object. Then, when the partition is read, only the parts listed in the manifest of the most recent commit are read. Maybe a similar solution would be easier here, rather than trying to ensure only one commit at a time.
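A tiny sketch of that manifest-at-commit idea (hypothetical; the real Stocator implementation differs):

  import java.nio.charset.StandardCharsets
  import org.apache.hadoop.fs.{FileSystem, Path}

  // Record which part files belong to this commit in the _SUCCESS marker;
  // readers then ignore any part not listed in the latest manifest.
  def commitWithManifest(fs: FileSystem, dir: Path, parts: Seq[String]): Unit = {
    val out = fs.create(new Path(dir, "_SUCCESS"), true)  // overwrite: last commit wins
    out.write(parts.mkString("\n").getBytes(StandardCharsets.UTF_8))
    out.close()
  }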

@steveloughran
Contributor

@hkkolodner -thanks for the clarification.

FWIW, in apache/hadoop#1442 I am actually stripping back on enumerating all files in the _SUCCESS manifest I've been creating (for test validation only), because at a sufficiently large terasort and TPC-DS scale you end up with memory issues in job commits, which makes my colleagues trying to do these things unhappy. I may be overreacting (it's happening at an earlier stage), but I'm just being thorough in reviewing data structures built in a commit. And even if there's enough heap, I don't want to force a many-MB upload at the end of every query, as that becomes a bottleneck of its own.

Like I say, the post-directory-listing table layouts are the inevitable future. That directory tree has been great: tool-neutral, easy to navigate by hand, etc. But it hits scale limits even in HDFS, serious perf limits in higher-latency stores, and the commit-by-rename mechanism is running out of steam.

I don't have enough experience of any of the alternatives to have any strong opinions, except to conclude that yes, they are inevitable.

@github-actions

github-actions bot commented Jan 5, 2020

We're closing this PR because it hasn't been updated in a while.
This isn't a judgement on the merit of the PR in any way. It's just
a way of keeping the PR queue manageable.

If you'd like to revive this PR, please reopen it!

@github-actions github-actions bot added the Stale label Jan 5, 2020
@turboFei turboFei closed this Jan 5, 2020
