[HUDI-6466] Fix spark insert overwrite partitioned table with dynamic partition #9113
danny0405 merged 4 commits into apache:master from
Conversation
Seems like a huge behavior change; we may not have time to fix it for release 0.14.0. cc @boneanxs, can you help with the review here?

Modifying this piece of code caused many tests to fail due to the delete-old-base-path logic, so I just reverted it and will handle it in another PR.
@hudi-bot run azure
Rebased on master to fix the pipeline.
@hudi-bot run azure
@flashJd I noticed this issue before. Yes, this is a behavior change, but Spark SQL also behaves the same way, i.e.
Since we need the ability to insert overwrite the whole partitioned table, why not use a config to enable it and keep the semantics forward compatible, while not losing the dynamic partition capability?
You can still use dynamic partitions, in this way: insert overwrite hudi_cow_pt_tbl partition(dt, hh) select 13, 'a13', 1100, '2021-12-09', '12'. The main point is: do we consider
Also, this change keeps the behavior consistent with Spark SQL.
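To make the static vs. dynamic distinction concrete, here is a small Python sketch (not Hudi code; the rows and partition key are hypothetical) of which rows survive an INSERT OVERWRITE without a PARTITION clause under each mode:

```python
def insert_overwrite(table, new_rows, mode, part_key):
    """Model INSERT OVERWRITE semantics on a list-of-dicts table.

    STATIC: the whole table is replaced by the new rows.
    DYNAMIC: only partitions present in the new rows are replaced.
    """
    if mode == "STATIC":
        return list(new_rows)
    touched = {row[part_key] for row in new_rows}
    kept = [row for row in table if row[part_key] not in touched]
    return kept + list(new_rows)

old = [{"dt": "2021-12-08", "id": 1}, {"dt": "2021-12-09", "id": 2}]
new = [{"dt": "2021-12-09", "id": 13}]

# STATIC drops the 2021-12-08 partition as well; DYNAMIC keeps it.
print(insert_overwrite(old, new, "STATIC", "dt"))
print(insert_overwrite(old, new, "DYNAMIC", "dt"))
```

This is exactly the data-loss concern in the thread: a user expecting DYNAMIC semantics but getting STATIC ones silently loses the untouched partitions.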
I'm confused why -- insert overwrite partitioned table with static partition
How about following the Spark behavior? We should respect the Spark configuration. It appears this is also how Iceberg works.
Agreed, I will check the logic to respect the Spark configuration.
Do you mean using Hudi's own parameter to control the dynamic partition behavior?
Wondering how Iceberg handles the syncing of different engine behaviors. |
Yeah, but from the engine's perspective, Hudi is just a format, and it is also reasonable for the engine to control the deletion behavior.
I only know that in Spark this is controlled by the computing engine.
@boneanxs @KnightChess @danny0405
I looked into how Iceberg and Delta handle this.
Iceberg: like @flashJd mentioned before:
Delta: roughly the same as Iceberg, but Delta has an extra Delta-specific configuration to control this, so
I tend to favor Iceberg's behavior, since it's clearer and sufficient for users to control this, and adding a new configuration would increase the learning curve for users, especially if they are already familiar with
private class HoodieV1WriteBuilder(writeOptions: CaseInsensitiveStringMap,
hoodieCatalogTable: HoodieCatalogTable,
spark: SparkSession)
extends SupportsTruncate with SupportsDynamicOverwrite with SupportsOverwrite with ProvidesHoodieConfig {
// ...
override def overwriteDynamicPartitions(): WriteBuilder = {
overwriteDynamic = true
this
}
}

then you need to update the method
For the V1 write path:
1) SupportsDynamicOverwrite needs the table capability BATCH_WRITE, which is not supported now.
Add the ability
I mean we also need to add an option in |
To summarize what we discussed, when we come across an insert overwrite:
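The decision the thread converges on can be sketched as a small, engine-agnostic Python model (the function and parameter names are hypothetical, mirroring the Scala deduceIsOverwriteTable logic quoted later in this discussion):

```python
def deduce_is_overwrite_table(operation, partition_fields, partition_spec, overwrite_mode):
    """Return True to overwrite the whole table, False to overwrite partitions."""
    # An explicit operation config wins outright.
    if operation == "insert_overwrite_table":
        return True
    if operation == "insert_overwrite":
        return False
    # A non-partitioned table is always overwritten as a whole.
    if not partition_fields:
        return True
    # An explicit PARTITION clause targets specific partitions.
    if partition_spec:
        return False
    # Otherwise fall back to the static/dynamic overwrite mode.
    mode = overwrite_mode.upper()
    if mode == "STATIC":
        return True
    if mode == "DYNAMIC":
        return False
    raise ValueError(f"Illegal overwrite mode: {overwrite_mode}")
```

For example, `deduce_is_overwrite_table("", ["dt", "hh"], {}, "static")` returns True (overwrite the whole table), while the same call with "dynamic" returns False.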
@flashJd Since we already have a conclusion, do you have interest in fixing it for release 0.14.0?
Yeah, sorry for the late fix, I'll fix it today as we discussed
As the behavior is aligned with Spark but not forward compatible, we should call out this change in the release notes for 0.14.0.
@boneanxs Can you help take a look again ~
boneanxs left a comment:
Great job! Overall LGTM from my side, just 2 minor comments to refine the code.
def deduceIsOverwriteTable(sparkSession: SparkSession,
    catalogTable: HoodieCatalogTable,
    partitionSpec: Map[String, Option[String]]): Boolean = {
  val operation = sparkSession.sqlContext.getConf(OPERATION.key, "")

Better to use combineOptions() here to keep the sources of Hudi configuration consistent with the others.
      true
    } else if (operation.nonEmpty && operation.equals(INSERT_OVERWRITE_OPERATION_OPT_VAL)) {
      false
    } else {

Using a Scala match expression could be clearer:
operation match {
case INSERT_OVERWRITE_TABLE_OPERATION_OPT_VAL =>
true
case INSERT_OPERATION_OPT_VAL =>
false
case _ =>
// NonPartitioned table always insert overwrite whole table
if (catalogTable.partitionFields.isEmpty) {
true
} else {
// Insert overwrite partitioned table with PARTITION clause will always insert overwrite the specific partition
if (partitionSpec.nonEmpty) {
false
} else {
// If hoodie.datasource.overwrite.mode configured, respect it
val hoodieOverwriteMode = sparkSession.sqlContext.getConf(OVERWRITE_MODE.key,
sparkSession.sqlContext.getConf(PARTITION_OVERWRITE_MODE.key)).toUpperCase()
hoodieOverwriteMode match {
case "STATIC" =>
true
case "DYNAMIC" =>
false
case _ =>
throw new IllegalArgumentException("xxx")
}
}
}
}
Thanks for the review, I'll refine the code later
      false
    } else {
      // If hoodie.datasource.overwrite.mode configured, respect it, otherwise respect spark.sql.sources.partitionOverwriteMode
      val hoodieOverwriteMode = sparkSession.sqlContext.getConf(OVERWRITE_MODE.key,

should be:

val hoodieOverwriteMode = combinedOpts.getOrElse(OVERWRITE_MODE.key,
  sparkSession.sqlContext.getConf(PARTITION_OVERWRITE_MODE.key)).toUpperCase()
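The suggested precedence (Hudi's own option first, then Spark's session conf) can be modeled with plain dicts in Python; the key names are the real config keys, while the dict lookups are just stand-ins for combinedOpts and the Spark SQL conf:

```python
def resolve_overwrite_mode(combined_opts, sql_conf):
    # Prefer hoodie.datasource.overwrite.mode if the user set it; otherwise
    # fall back to Spark's partitionOverwriteMode (whose default is "static").
    mode = combined_opts.get(
        "hoodie.datasource.overwrite.mode",
        sql_conf.get("spark.sql.sources.partitionOverwriteMode", "static"))
    return mode.upper()
```

With this ordering, a Hudi-level setting always wins, and users who never touch Hudi configs get exactly the Spark SQL behavior they already know.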
@hudi-bot run azure
@danny0405 @flashJd our version contains this PR, but insert overwrite on a partitioned table also triggers overwriting the whole table, not overwriting the partition.
The default behavior is to overwrite the whole table; use the config to switch to overwriting partitions, see the tests.
Change Logs
When upgrading Hudi from 0.12.2 to 0.13.1, I found Spark's capability of insert overwrite on a partitioned table with dynamic partitions was lost,
see #8283 (comment):
It will cause serious data problems when upgrading to 0.13.1: users will delete all data by mistake.
As #7365 (comment) mentioned, insert_overwrite_table will override the entire table, while insert_overwrite_partition will overwrite only matching partitions. Now we can only use static partition syntax to realize insert_overwrite_partition semantics.
Impact
insert_overwrite_table semantics with a partitioned table, as [HUDI-5317] Fix insert overwrite table for partitioned table #7365 (comment) mentioned, using a config (set hoodie.datasource.write.operation = insert_overwrite_table)
Risk level (write none, low medium or high below)
Medium
Documentation Update
N/A
Contributor's checklist