
Conversation

@waywtdcc
Contributor

@waywtdcc waywtdcc commented Nov 24, 2022

Change Logs

Fix an issue where reading data through the HoodieHiveCatalog causes Spark writes to fail.
The original HoodieHiveCatalog read the table options only from the Hive table properties, not from the hoodie.properties file, so the table attributes were resolved incorrectly. As a result, the hoodie.datasource.write.hive_style_partitioning attribute became inconsistent with the real value.

Impact

Fixes an issue where reading data through the HoodieHiveCatalog causes Spark writes to fail.

Risk level (write none, low medium or high below)

low

Documentation Update

  • Fix an issue where reading data through the HoodieHiveCatalog causes Spark writes to fail

Contributor's checklist

  • Read through contributor's guide
  • Change Logs and Impact were stated clearly
  • Adequate tests were added if applicable
  • CI passed

parameters.put(PATH.key(), path);
Map<String, String> hoodieProps = loadFromHoodiePropertieFile(path, hiveConf);
parameters.putAll(TableOptionProperties.translateSparkTableProperties2Flink(hoodieProps));
if (!parameters.containsKey(FlinkOptions.HIVE_STYLE_PARTITIONING.key())) {
Contributor

Do we need to put the explicit table options here? Which function needs this config, and where do we miss it then?

Contributor
@danny0405 danny0405 Nov 24, 2022

We can read the table config through StreamerUtil.createMetaClient(xx).getTableConfig().

We actually get the original Hive table params first here: parameters = hiveTable.getParameters(). Do you mean there are options missing for Spark?
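
For illustration, a minimal sketch of reading the on-storage table config this way, assuming Hudi 0.12.x APIs and that StreamerUtil.createMetaClient is essentially a thin wrapper around HoodieTableMetaClient.builder(); the raw property key lookup and the "false" fallback are assumptions made for clarity, not the actual patch:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hudi.common.config.TypedProperties;
    import org.apache.hudi.common.table.HoodieTableMetaClient;

    final class TableConfigSketch {
      // Read the value Spark persisted under <basePath>/.hoodie/hoodie.properties,
      // falling back to "false" when the key is absent (assumed default for this sketch).
      static String hiveStylePartitioning(String basePath, Configuration hadoopConf) {
        HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
            .setConf(hadoopConf)   // Hadoop conf derived from the HiveConf
            .setBasePath(basePath) // Hudi table base path
            .build();
        TypedProperties props = metaClient.getTableConfig().getProps();
        return props.getProperty("hoodie.datasource.write.hive_style_partitioning", "false");
      }
    }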

Contributor Author

Yes. After Spark creates the table, some table properties are stored in .hoodie/hoodie.properties, such as hoodie.datasource.write.hive_style_partitioning. These properties are not included in the Hive table properties.

Contributor

But we only append Hive options here; curious why this can cause an error?

Contributor Author

Because Spark sets the hoodie.datasource.write.hive_style_partitioning property to true when creating a Hudi non-partitioned table, and records hoodie.datasource.write.hive_style_partitioning=true in hoodie.properties. HoodieHiveCatalog does not see this property, so it infers the value instead, and the inference is wrong: hoodie.datasource.write.hive_style_partitioning gets set to false and is uploaded to the Hive table options. As a result, Spark sees two different values for hoodie.datasource.write.hive_style_partitioning when reading the table, and reports an error.

Contributor Author

if (!parameters.containsKey(FlinkOptions.HIVE_STYLE_PARTITIONING.key())) {
  Path hoodieTablePath = new Path(path);
  // Infer hive-style partitioning by listing the directories under the table path,
  // skipping the .hoodie metadata folder and the default partition.
  boolean hiveStyle = Arrays.stream(FSUtils.getFs(hoodieTablePath, hiveConf).listStatus(hoodieTablePath))
      .map(fileStatus -> fileStatus.getPath().getName())
      .filter(f -> !f.equals(".hoodie") && !f.equals("default"))
      .anyMatch(FilePathUtils::isHiveStylePartitioning);
  parameters.put(FlinkOptions.HIVE_STYLE_PARTITIONING.key(), String.valueOf(hiveStyle));
}

--- The params in this code are not inferred from the hoodie.properties file, which leads to an inference error.
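
For comparison, a hedged sketch of a lookup that prefers the persisted table config and only falls back to directory inference; the helpers readHiveStyleFromTableConfig and inferHiveStyleFromPartitionDirs are hypothetical placeholders for the hoodie.properties lookup shown earlier and for the listing logic above:

    // Prefer the value persisted in .hoodie/hoodie.properties; only infer from the
    // directory layout when the table config does not carry the key.
    if (!parameters.containsKey(FlinkOptions.HIVE_STYLE_PARTITIONING.key())) {
      String fromTableConfig = readHiveStyleFromTableConfig(path, hiveConf); // hypothetical helper, may return null
      String hiveStyle = fromTableConfig != null
          ? fromTableConfig
          : String.valueOf(inferHiveStyleFromPartitionDirs(path, hiveConf)); // hypothetical wrapper around the listing above
      parameters.put(FlinkOptions.HIVE_STYLE_PARTITIONING.key(), hiveStyle);
    }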

Contributor

Got you. I think we should not put extra options like FlinkOptions.HIVE_STYLE_PARTITIONING here; the right fix is to supplement the table config options on the Flink read/write path. I have actually applied a patch.

@hudi-bot
Collaborator

CI report:

Bot commands: @hudi-bot supports the following commands:
  • @hudi-bot run azure: re-run the last Azure build

@waywtdcc waywtdcc requested a review from danny0405 November 28, 2022 08:31
@codope codope added the priority:high (Significant impact; potential bugs) and engine:flink (Flink integration) labels Nov 29, 2022
@codope codope changed the title [HUDI-5275][flink]Fix reading data using the HoodieHiveCatalog will cause the Spark write to fail [HUDI-5275] Fix reading data using the HoodieHiveCatalog will cause the Spark write to fail Nov 29, 2022
@codope codope added the engine:spark (Spark integration) and area:catalog (Catalog integration) labels Nov 29, 2022
@nsivabalan nsivabalan added the priority:blocker (Production down; release blocker) and release-0.12.2 (Patches targetted for 0.12.2) labels and removed the priority:high (Significant impact; potential bugs) label Dec 5, 2022
Member
@codope codope left a comment

Could you please add a test for this scenario?

@codope codope added the priority:critical (Production degraded; pipelines stalled) label and removed the priority:blocker (Production down; release blocker) label Dec 7, 2022
@waywtdcc
Contributor Author

@danny0405 Hello, do you have any comments on this PR?

@danny0405
Contributor

Yeah, I have reviewed it and applied a patch:
5275.zip

The idea of the fix is: we had better not add extra options like FlinkOptions.HIVE_STYLE_PARTITIONING when reading the table; a more proper way to fix the issue is to merge the table config options into the reader/writer path.
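
A minimal sketch of that merge, assuming the table config has been read into plain java.util.Properties (as in the earlier snippet); the helper name mergeTableConfig is hypothetical, and the only point illustrated is that explicitly set write options keep priority over values coming from hoodie.properties:

    import java.util.HashMap;
    import java.util.Map;
    import java.util.Properties;

    final class MergeSketch {
      // Hypothetical helper: copy table-config values only where the write config
      // does not already define the key, so explicit write options keep priority.
      static Map<String, String> mergeTableConfig(Map<String, String> writeOptions, Properties tableConfigProps) {
        Map<String, String> merged = new HashMap<>(writeOptions);
        for (String key : tableConfigProps.stringPropertyNames()) {
          merged.putIfAbsent(key, tableConfigProps.getProperty(key));
        }
        return merged;
      }
    }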

Can you apply the patch and add a test case in TestHoodieTableFactory covering 2 cases (to verify that the write config options always have higher priority):

  1. the table source merges built-in table config options that are not defined in the write config.
  2. the table source cannot override an existing write config option if the option value differs.

public static Map<String, String> loadFromHoodiePropertieFile(String basePath, Configuration hadoopConf) {
  // Load the table properties persisted under <basePath>/.hoodie/hoodie.properties.
  Path propertiesFilePath = getHoodiePropertiesFilePath(basePath);
  return getPropsFromFile(basePath, hadoopConf, propertiesFilePath);
}
Contributor

Hi, gentle ping :)

Contributor Author

Sorry, just saw it. Would it be better to replace loadFromHoodiePropertieFile with StreamerUtil.getTableConfig here?

@danny0405
Contributor

Because there has been no response for a long time and the code freeze is very near, I continued the work based on the patch and will land it once the CI passes: #7666

Closing this one instead.

@danny0405 danny0405 closed this Jan 13, 2023

Labels

  • area:catalog Catalog integration
  • engine:flink Flink integration
  • engine:spark Spark integration
  • priority:critical Production degraded; pipelines stalled
  • release-0.12.2 Patches targetted for 0.12.2
