Skip to content

Conversation

@advancedxy
Copy link
Contributor

@advancedxy advancedxy commented Sep 10, 2019

What changes were proposed in this pull request?

This commit enables concurrent writes to different partitions in the same table with dynamicPartitionOverwrite enabled. Currently Spark uses table's location as output when writing to the table, which would conflict each other(multiple OutputCommitters operating on the same output dir) when writing to the same table concurrently. In this commit, we set OutputCommitter's output to stagingDir to avoid collision when dynamicPartitionOverwrite is enabled.

Why are the changes needed?

This is an improvement of user case.

Does this PR introduce any user-facing change?

Yes, users can expect success concurrent write to the same table

How was this patch tested?

Added two tests and existing tests for regression.

…ferent

partitions in the same table. This commit only enables concurrent writes
with dynamicPartitionOverwrite enabled.
@advancedxy
Copy link
Contributor Author

I labeled this as [WIP] as I think we can also enable concurrent writes to the same table with dynamicPartitionOverwrite disabled. When we are writing to table dynamically in the strict mode, we can support concurrent writes to different static partitions. However that would require more changes and I'd like to know others opinions.

cc @cloud-fan @koertkuipers

@dongjoon-hyun
Copy link
Member

ok to test

@dongjoon-hyun dongjoon-hyun changed the title [WIP][SPARK-28945] support concurrent dynamic partition writes to different partitions in the same table [WIP][SPARK-28945][CORE][SQL] Support concurrent dynamic partition writes to different partitions in the same table Sep 10, 2019
@SparkQA
Copy link

SparkQA commented Sep 10, 2019

Test build #110389 has finished for PR 25739 at commit 1de7d30.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@advancedxy
Copy link
Contributor Author

Test build #110389 has finished for PR 25739 at commit 1de7d30.
This patch fails Spark unit tests.
This patch merges cleanly.
This patch adds no public classes.

Looks unrelated...

@koertkuipers
Copy link
Contributor

I labeled this as [WIP] as I think we can also enable concurrent writes to the same table with dynamicPartitionOverwrite disabled. When we are writing to table dynamically in the strict mode, we can support concurrent writes to different static partitions. However that would require more changes and I'd like to know others opinions.

cc @cloud-fan @koertkuipers

what would concurrent writes to the same table with dynamicPartitionOverwrite disabled look like? i have a hard time coming up with a useful example of this.

@advancedxy
Copy link
Contributor Author

what would concurrent writes to the same table with dynamicPartitionOverwrite disabled look like? i have a hard time coming up with a useful example of this.

Suppose we have a table with partition columns (day, hour, action). It would be useful to support
concurrent writes to Partition(day=20190910, hour=01, action), Partition(day=20190910, hour=02, action) and etc. Or to Partitions: Partition(day=20190910, hour, action), Partition(day=20190909, hour, action) and etc.

The concurrent write would be succeeded as long as the static partitions share the same size.

@koertkuipers
Copy link
Contributor

I labeled this as [WIP] as I think we can also enable concurrent writes to the same table with dynamicPartitionOverwrite disabled. When we are writing to table dynamically in the strict mode, we can support concurrent writes to different static partitions. However that would require more changes and I'd like to know others opinions.

cc @cloud-fan @koertkuipers

what would concurrent writes to the same table with dynamicPartitionOverwrite disabled look like? i have a hard time coming up with a useful example of this.

what would concurrent writes to the same table with dynamicPartitionOverwrite disabled look like? i have a hard time coming up with a useful example of this.

Suppose we have a table with partition columns (day, hour, action). It would be useful to support
concurrent writes to Partition(day=20190910, hour=01, action), Partition(day=20190910, hour=02, action) and etc. Or to Partitions: Partition(day=20190910, hour, action), Partition(day=20190909, hour, action) and etc.

The concurrent write would be succeeded as long as the static partitions share the same size.

my understanding is that currently if you don't have dynamic partition overwrite enabled it will always delete all partitions before writing. i don't see concurrency being useful in this situation.

the example you give sounds interesting to me but its inconsistent with how i currently know static partition overwrite to function. would this be a new feature?

@advancedxy
Copy link
Contributor Author

my understanding is that currently if you don't have dynamic partition overwrite enabled it will always delete all partitions before writing.

IIRC, dynamic partition writing without dynamicPartitionOverwrite enabled, Spark will delete matching partitions(partitions prefixed by the static partition), not all partitions.

For example, when insert overwrite Partition(day=20190910, hour=01, action), Spark will first delete all the partitions under $some/path/to/table/location/day=20190910/hour=01

@koertkuipers
Copy link
Contributor

my understanding is that currently if you don't have dynamic partition overwrite enabled it will always delete all partitions before writing.

IIRC, dynamic partition writing without dynamicPartitionOverwrite enabled, Spark will delete matching partitions(partitions prefixed by the static partition), not all partitions.

For example, when insert overwrite Partition(day=20190910, hour=01, action), Spark will first delete all the partitions under $some/path/to/table/location/day=20190910/hour=01

when i need to overwrite a particular partition in a filesource such as parquet i will write directly to the partition path, e.g.:

df.write.format("parquet").save("some/path/to/table/location/day=20190910/hour=01")

but in that case concurrency already works, as the writers do not use the same baseDir.

i was not aware that there is an alternative syntax or way of doing this without dynamic partition overwrite. sorry for the confusion.

@advancedxy
Copy link
Contributor Author

i was not aware that there is an alternative syntax or way of doing this without dynamic partition overwrite. sorry for the confusion.

No worries. Consider the sql way of dynamic partition insertion:

insert overwrite table t partition(day='20190901', hour='01', action) 
select xxx, action from src

@koertkuipers
Copy link
Contributor

i was not aware that there is an alternative syntax or way of doing this without dynamic partition overwrite. sorry for the confusion.

No worries. Consider the sql way of dynamic partition insertion:

insert overwrite table t partition(day='20190901', hour='01', action) 
select xxx, action from src

ah ok we dont use sql syntax at all so thats why i was not aware of it

@koertkuipers
Copy link
Contributor

i was not aware that there is an alternative syntax or way of doing this without dynamic partition overwrite. sorry for the confusion.

No worries. Consider the sql way of dynamic partition insertion:

insert overwrite table t partition(day='20190901', hour='01', action) 
select xxx, action from src

ah ok we dont use sql syntax at all so thats why i was not aware of it

since i dont use this feature of overwriting partitions using static overwrite mode i do not have an opinion on it. however i am excited about dynamic partition overwrite with concurrent writers. thanks for this!

@advancedxy
Copy link
Contributor Author

Ping @cloud-fan, what do you think about concurrent writes to the same table with dynamicPartitionOverwrite disabled?

  1. Added in this pr
  2. Added in another pr
  3. neutral, nice to have(In my opinion, dynamicPartitionOverwrite is more reasonable, and should be the default behaviour in Spark 3.0)

}

protected def setupCommitter(context: TaskAttemptContext): OutputCommitter = {
// set output path to stagingDir to avoid potential collision of multiple concurrent write tasks
Copy link
Contributor

@cloud-fan cloud-fan Sep 16, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

when dynamicPartitionOverwrite=true, we already write files to the staging dir, see newTaskTempFile.

In fact, I don't see how the committer is related to the staging dir. If you look at commitTask and commitJob, we kind of manually commit the files in the staging dir, by moving it to the table dir.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In fact, I don't see how the committer is related to the staging dir. If you look at commitTask and commitJob, we kind of manually commit the files in the staging dir, by moving it to the table dir.

Yes, we manually commit files in the staging dir. The problem is in the HadoopMapReduceCommitProtocol's commitJob calls, it first calls committer.commitJob(jobContext), which relates to the output path passes to the JobContext.

override def commitJob(jobContext: JobContext, taskCommits: Seq[TaskCommitMessage]): Unit = {
committer.commitJob(jobContext)
if (hasValidPath) {
val (allAbsPathFiles, allPartitionPaths) =
taskCommits.map(_.obj.asInstanceOf[(Map[String, String], Set[String])]).unzip
val fs = stagingDir.getFileSystem(jobContext.getConfiguration)
val filesToMove = allAbsPathFiles.foldLeft(Map[String, String]())(_ ++ _)

The OutputCommitter cannot work correctly if multiple OutputCommitter working on the same output path( concurrent writes to different partition to the same table, as the output would be the same: the table output location). After changing the output path to the staging dir, concurrent jobs can have different output dirs.

@AmplabJenkins
Copy link

Can one of the admins verify this patch?

private def stagingDir = new Path(path, ".spark-staging-" + jobId)

/**
* Get the desired output path for the job. The output will be [[path]] when
Copy link
Contributor

@cloud-fan cloud-fan Sep 19, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The output will be [[path]] what does path mean here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the path is defined in the class parameter, and the comment for that is:

 * @param jobId the job's or stage's id
 * @param path the job's output path, or null if committer acts as a noop
 * @param dynamicPartitionOverwrite If true, Spark will overwrite partition directories at runtime
 *                                  dynamically, i.e., we first write files under a staging
 *                                  directory with partition path, e.g.
 *                                  /path/to/staging/a=1/b=1/xxx.parquet. When committing the job,
 *                                  we first clean up the corresponding partition directories at
 *                                  destination path, e.g. /path/to/destination/a=1/b=1, and move
 *                                  files from staging directory to the corresponding partition
 *                                  directories under destination path.

@advancedxy
Copy link
Contributor Author

I am closing this in favour of #25863, thanks for @turboFei's excellent job.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

6 participants