[HUDI-6315] Feature flag for disabling prepped merge. #9203
Conversation
nsivabalan left a comment
Let's also parameterize some tests for MIT (MERGE INTO) as well.
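A minimal sketch of what that parameterization could look like, assuming `spark` is the test's SparkSession; `runMergeIntoAndAssert()` is a hypothetical stand-in for the existing MERGE INTO test body, and the key name follows the one settled on later in this thread.

```scala
// Run the same MERGE INTO assertions with the optimized/prepped flow on and off.
Seq(true, false).foreach { optimizedSqlEnabled =>
  spark.sql(s"set hoodie.spark.sql.optimized.writes.enable=$optimizedSqlEnabled")
  runMergeIntoAndAssert() // hypothetical placeholder for the existing test body
}
```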
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieInternalConfig.java
We can set ENABLE_PREPPED_MERGE_WRITES in MergeIntoHoodieTableCommand using hoodie.spark.sql.writes.optimized.enable. We don't need to introduce another public config that way.
Parameterized test case to test with both.
Moved.
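A rough sketch of the idea above (not the actual MergeIntoHoodieTableCommand code): the single public flag decides whether the prepped MERGE INTO path is used, and the command forwards that decision through an internal-only option, so no second public config is needed. `userOpts` is a hypothetical placeholder for the options the command already assembles; the key names follow those quoted elsewhere in this review.

```scala
val userOpts: Map[String, String] = Map.empty // placeholder for options the command already has

// Public flag (default true) chosen by the user.
val optimizedWrites =
  userOpts.getOrElse("hoodie.spark.sql.writes.optimized.enable", "true").toBoolean

// Internal-only key passed down to the writer; never exposed as a public config.
val writeParams = userOpts + ("_hoodie.datasource.merge.into.prepped" -> optimizedWrites.toString)
```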
@hudi-bot run azure

@hudi-bot run azure
I realized we have two different configs across MIT and UPDATE/DELETEs. Not a good user experience imo; let's unify them.
So, we will have one config "hoodie.spark.sql.optimized.writes.enable" in HoodieWriteConfig for users to enable or disable the optimized flow. Default value is true.
But internally, we can use two different internal configs, one for MIT (_hoodie.datasource.merge.into.prepped) and one for UPDATE and DELETEs (_hoodie.datasource.writes.prepped).
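A rough sketch of that layout, with the key names as written in this comment (the real definitions are Java in HoodieWriteConfig/HoodieInternalConfig; this is only a Scala-flavored illustration):

```scala
import org.apache.hudi.common.config.ConfigProperty

// Single public flag, default true, to enable or disable the optimized flow.
val SPARK_SQL_OPTIMIZED_WRITES: ConfigProperty[String] = ConfigProperty
  .key("hoodie.spark.sql.optimized.writes.enable")
  .defaultValue("true")
  .withDocumentation("Controls whether optimized (prepped) writes are used for Spark SQL MERGE INTO, UPDATE and DELETE.")

// Internal-only keys set by the SQL commands, not documented for users.
val MERGE_INTO_PREPPED_KEY = "_hoodie.datasource.merge.into.prepped" // MERGE INTO
val WRITES_PREPPED_KEY = "_hoodie.datasource.writes.prepped"         // UPDATE / DELETE
```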
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
.withDocumentation("Controls whether spark sql optimized update is enabled.")
.withDocumentation("Controls whether spark sql prepped update and delete is enabled.")

val ENABLE_OPTIMIZED_SQL_MERGE_WRITES: ConfigProperty[String] = ConfigProperty
I am also thinking, from a user standpoint we should have just one config to enable or disable the optimized flow (irrespective of whether it's MIT or updates or deletes).
But internally we can use different configs if we wish to differentiate MIT from the rest.
Fixed.
| + "The class must be a subclass of `org.apache.hudi.callback.HoodieClientInitCallback`." | ||
| + "By default, no Hudi client init callback is executed."); | ||
|
|
||
| public static final String WRITE_PREPPED_MERGE_KEY = "_hoodie.datasource.merge.into.prepped"; |
Can we add Javadocs calling out the purpose of this?
Sorry, let's name this:
_hoodie.spark.sql.merge.into.prepped
Let's name the variable
SPARK_SQL_MERGE_INTO_PREPPED
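Putting the rename and the requested doc comment together, roughly like this (the actual constant is a Java `public static final String` in HoodieWriteConfig; shown here as a Scala-style sketch with illustrative wording):

```scala
/**
 * Internal config, set only by the Spark SQL MERGE INTO command. A boolean value
 * indicating that the records being written are already prepped; not intended to
 * be set by users.
 */
val SPARK_SQL_MERGE_INTO_PREPPED: String = "_hoodie.spark.sql.merge.into.prepped"
```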
Fixed.
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
val df = if (optParams.getOrDefault(DATASOURCE_WRITE_PREPPED_KEY,
  optParams.getOrDefault(SQL_MERGE_INTO_WRITES.key(), SQL_MERGE_INTO_WRITES.defaultValue().toString))
  .equalsIgnoreCase("true")) {
val df = if (optParams.getOrDefault(DATASOURCE_WRITE_PREPPED_KEY, "false").toBoolean || optParams.getOrDefault(WRITE_PREPPED_MERGE_KEY, "false").toBoolean) {
Let's also rename the variable
DATASOURCE_WRITE_PREPPED_KEY to
SPARK_SQL_WRITE_PREPPED_KEY
And the config key should be
_hoodie.spark.sql.writes.prepped
Fixed.
| test("Test pkless complex merge cond") { | ||
| withRecordType()(withTempDir { tmp => | ||
| spark.sql("set hoodie.payload.combined.schema.validate = true") | ||
| spark.sql("set hoodie.spark.sql.optimized.merge.enable=true") |
don't we need to fix all these?
Fixed.
@hudi-bot run azure
public static HoodieIndex createIndex(HoodieWriteConfig config) {
  boolean mergeIntoWrites = config.getProps().getBoolean(HoodieInternalConfig.SQL_MERGE_INTO_WRITES.key(),
      HoodieInternalConfig.SQL_MERGE_INTO_WRITES.defaultValue());
  boolean mergeIntoWrites = config.getProps().getBoolean(HoodieWriteConfig.SPARK_SQL_MERGE_INTO_PREPPED_KEY, false);
Let's align the var name with the config:
boolean sqlMergeIntoPrepped
Replaced all instances of mergeIntoWrites with sqlMergeIntoPrepped (except for the original reference in deduceWriterSchema).
 * Config key with boolean value that indicates whether record being written is already prepped.
 */
val DATASOURCE_WRITE_PREPPED_KEY = "_hoodie.datasource.write.prepped";
val SPARK_SQL_WRITE_PREPPED_KEY = "_hoodie.spark.sql.writes.prepped";
Minor:
"SPARK_SQL_WRITES_PREPPED_KEY"
Fixed.
val mergeIntoWrites = parameters.getOrDefault(SQL_MERGE_INTO_WRITES.key(),
  SQL_MERGE_INTO_WRITES.defaultValue.toString).toBoolean
val isPrepped = hoodieConfig.getBooleanOrDefault(SPARK_SQL_WRITE_PREPPED_KEY, false)
val mergeIntoWrites = parameters.getOrDefault(SPARK_SQL_MERGE_INTO_PREPPED_KEY, "false").toBoolean
Let's fix the var name.
Fixed.
hoodieConfig.setDefaultValue(DROP_PARTITION_COLUMNS)
hoodieConfig.setDefaultValue(KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED)
hoodieConfig.setDefaultValue(ENABLE_OPTIMIZED_SQL_WRITES)
hoodieConfig.setDefaultValue(SPARK_SQL_OPTIMIZED_WRITES)
Let's check if this is really required.
Removed. From what I can see it is not really needed.
...udi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
.option(DataSourceWriteOptions.ASYNC_CLUSTERING_ENABLE().key(), "true")
.option(DataSourceWriteOptions.ENABLE_OPTIMIZED_SQL_WRITES().key(), "true")
.option(DataSourceWriteOptions.SPARK_SQL_OPTIMIZED_WRITES().key(), "true")
For defaults, let's try to use the actual default instead of hard-coding it:
DataSourceWriteOptions.SPARK_SQL_OPTIMIZED_WRITES().default()
Fixed.
// test with optimized sql writes enabled / disabled.
spark.sql(s"set hoodie.spark.sql.writes.optimized.enable=$optimizedSqlEnabled")
spark.sql(s"set hoodie.spark.sql.optimized.writes.enable=$optimizedSqlEnabled")
Let's see if we can use the variable to avoid any missteps.
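For example, something like the following, reusing the constant quoted earlier in this diff instead of retyping the key (assuming `spark` and `optimizedSqlEnabled` come from the surrounding test):

```scala
import org.apache.hudi.DataSourceWriteOptions

spark.sql(s"set ${DataSourceWriteOptions.SPARK_SQL_OPTIMIZED_WRITES.key()}=$optimizedSqlEnabled")
```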
Fixed.
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
...udi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
@hudi-bot run azure

CI failed due to known flaky tests. Going ahead with merging.
Change Logs
Add user-defined feature flag for disabling prepped merge.
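From a user's perspective the flag is a single setting (key name as introduced in this PR; the default is true, so setting it to false falls back to the non-prepped path):

```scala
// Disable the optimized/prepped write path for Spark SQL MERGE INTO / UPDATE / DELETE.
spark.sql("set hoodie.spark.sql.optimized.writes.enable=false")
```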
Impact
New feature flag: ENABLE_OPTIMIZED_MERGE_WRITES
Risk level (write none, low medium or high below)
Low
Documentation Update
Describe any necessary documentation update if there is any new feature, config, or user-facing change. Attach the ticket number here and follow the instructions to make changes to the website.
Contributor's checklist