[HUDI-5047] Add partition value in HoodieLogRecordReader when hoodie.datasource.write.drop.partition.columns=true #6986
Conversation
@xicm better to have some UT to know the effect of these changes.
Hi @YannByron, I added a UT. Please review when you are free. :)
...asource/hudi-spark/src/test/scala/org/apache/hudi/functional/cdc/TestCDCDataFrameSuite.scala
@hudi-bot run azure
    Pair.of(tableConfig.getRecordKeyFieldProp(), tableConfig.getPartitionFieldProp()));
    }
    this.partitionName = partitionName;
    this.partitionValues = getPartitionValues();
It's not a good practice to call non-static methods from the ctor, since it relies implicitly on the ordering of the initialization. Let's instead make this method static and pass it all values it needs.
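A minimal standalone sketch of the suggested shape (class and method names here are hypothetical, not the actual Hudi code): the ctor delegates to a static helper that receives everything it needs, so field initialization order no longer matters.

    // Hypothetical sketch: compute partition values via a static helper
    // instead of calling an instance method from the ctor.
    class PartitionAwareReader {
      private final String partitionName;
      private final String[] partitionValues;

      PartitionAwareReader(String partitionName, boolean hiveStylePartition) {
        this.partitionName = partitionName;
        // Static helper receives everything it needs as arguments.
        this.partitionValues = parsePartitionValues(partitionName, hiveStylePartition);
      }

      private static String[] parsePartitionValues(String partitionName, boolean hiveStylePartition) {
        if (partitionName == null || partitionName.isEmpty()) {
          return new String[0];
        }
        String[] parts = partitionName.split("/");
        if (!hiveStylePartition) {
          return parts;
        }
        // For hive-style paths like "dt=2022-10-01/hr=12", keep only the value part.
        return java.util.Arrays.stream(parts)
            .map(p -> p.split("=", 2))
            .filter(kv -> kv.length == 2)
            .map(kv -> kv[1])
            .toArray(String[]::new);
      }
    }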
    if (this.hiveStylePartition) {
      return Option.of(Arrays.stream(partitionValues)
          .map(partition -> partition.split("="))
          .filter(partition -> partition.length == 2)
Why do we filter on a specific length here?
    String[] partitionFields = this.partitionFields.get();
    String[] partitionValues = this.partitionValues.get();

    if (partitionFields.length == partitionValues.length) {
This condition should be asserted during init.
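A hedged sketch of what checking this at construction time could look like (field names hypothetical; a plain check is shown instead of any Hudi validation utility):

    // Sketch: validate the invariant once, at construction time, instead of
    // re-checking it for every record.
    private static void validatePartitionMetadata(String[] partitionFields, String[] partitionValues) {
      if (partitionFields.length != partitionValues.length) {
        throw new IllegalArgumentException(String.format(
            "Expected %d partition values for fields %s, but got %d",
            partitionFields.length, java.util.Arrays.toString(partitionFields), partitionValues.length));
      }
    }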
    // Append is not supported in LocalFileSystem. HDFS needs to be setup.
    Configuration conf = new Configuration();
    // lower heartbeat interval for fast recognition of DN
    conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, dfsBaseDir.getAbsolutePath());
I don't think we really need HDFS for this test. What's the idea behind doing it on HDFS?
    if (partitionFields.length == partitionValues.length) {
      for (int i = 0; i < partitionValues.length; i++) {
        record.put(schema.getField(partitionFields[i]).pos(), partitionValues[i]);
We can't just blindly inject strings into the record; we need to make sure we coerce them to the type of the column.
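A sketch of the kind of coercion meant here, assuming Avro GenericRecord fields with simple primitive types (illustrative only, not the PR's actual implementation; nullable columns, i.e. unions, and logical types such as date or decimal would need extra handling):

    import org.apache.avro.Schema;

    // Illustrative only: coerce a partition value, parsed from the file path
    // as a string, to the declared Avro type of the target column.
    final class PartitionValueCoercion {
      static Object coerce(String rawValue, Schema fieldSchema) {
        switch (fieldSchema.getType()) {
          case INT:     return Integer.parseInt(rawValue);
          case LONG:    return Long.parseLong(rawValue);
          case FLOAT:   return Float.parseFloat(rawValue);
          case DOUBLE:  return Double.parseDouble(rawValue);
          case BOOLEAN: return Boolean.parseBoolean(rawValue);
          case STRING:  return rawValue;
          default:
            throw new IllegalArgumentException("Unsupported partition column type: " + fieldSchema.getType());
        }
      }
    }

With something along these lines, the loop above would put the coerced value (e.g. PartitionValueCoercion.coerce(partitionValues[i], schema.getField(partitionFields[i]).schema())) into the record instead of the raw string.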
@alexeykudinkin Thanks for your advice. I updated the PR; please review again when you are free.
alexeykudinkin left a comment
Thanks for addressing the feedback @xicm!
What I've been thinking about in the background is that ideally we'd want to leverage the logic already implemented in SparkParsePartitionUtil, for example by making it more generic (i.e. engine-agnostic).
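A rough, purely hypothetical sketch of what an engine-agnostic entry point could look like (none of these names exist in Hudi; SparkParsePartitionUtil itself is Spark-specific):

    // Hypothetical interface, shown only to illustrate "engine-agnostic":
    // each engine supplies its own implementation, and the log record reader
    // depends on this abstraction rather than on Spark classes.
    interface PartitionPathParser {
      // Returns partition column values (in partition-field order) parsed from a
      // relative partition path such as "dt=2022-10-01/hr=12" or "2022-10-01/12".
      Object[] parsePartitionValues(String relativePartitionPath,
                                    String[] partitionFields,
                                    boolean hiveStylePartitioning);
    }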
    switch (newSchema.getType()) {
      case NULL:
      case BOOLEAN:
        if (oldSchema.getType() == Schema.Type.STRING) {
Appreciate your approach of extending existing functionality, but I think in this case we actually shouldn't be doing that: the conversions we're doing here aren't "canonical" (there's no permitted conversion from string to int in Spark, for example). As such, we'd rather keep this conversion limited in scope, applicable only to partition values, where it's both inevitable and actually sensible (since we know we're not trying to convert some random string to an int, but an int that was previously converted to a string).
Also, I'd suggest you check out Spark3ParsePartitionUtil.
Hi @alexeykudinkin, please review when you are available. If there are unreasonable implementations, correct me and I will fix them. Thanks. :)
@hudi-bot run azure
    }

    @SuppressWarnings("unchecked")
    protected HoodieMergedLogRecordScanner(FileSystem fs, String basePath, List<String> logFilePaths, Schema readerSchema,
Let's avoid copying the whole ctor. Instead, generalize the existing one and redirect this one to it.
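A generic sketch of the suggested pattern (hypothetical class and signatures, not the real HoodieMergedLogRecordScanner ctor): the existing ctor gains the new parameter, and the old signature simply delegates to it with a default.

    // Hypothetical sketch of ctor delegation instead of duplication.
    class Scanner {
      private final String basePath;
      private final String partitionName;  // may be null when not supplied

      // Generalized ctor: carries the new parameter.
      protected Scanner(String basePath, String partitionName) {
        this.basePath = basePath;
        this.partitionName = partitionName;
      }

      // Old signature redirects to the generalized ctor instead of copying its body.
      protected Scanner(String basePath) {
        this(basePath, null);
      }
    }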
    // Use scanV2 method.
    private boolean useScanV2 = false;

    private Object[] partitionValues;
Let's create an object akin to the PartitionPath we have in Spark.
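A minimal sketch of what such a value object could look like (hypothetical name and shape, shown only to illustrate the suggestion):

    // Hypothetical value object pairing the partition path with its parsed values,
    // instead of passing a bare Object[] around.
    final class ParsedPartition {
      private final String relativePath;
      private final Object[] values;

      ParsedPartition(String relativePath, Object[] values) {
        this.relativePath = relativePath;
        this.values = values;
      }

      String getRelativePath() { return relativePath; }

      Object[] getValues() { return values; }
    }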
    val partitionedBaseFile = baseFile.map { file =>
      val filePath = getFilePath(file.getFileStatus.getPath)
      PartitionedFile(getPartitionColumnsAsInternalRow(file.getFileStatus), filePath, 0, file.getFileLen)
Let's also update getPartitionColumnsAsInternalRow to reuse parsePartitionColumnValues instead
    }

    HoodieMergeOnReadFileSplit(partitionedBaseFile, logFiles)
    if (shouldExtractPartitionValuesFromPartitionPath && logFiles.nonEmpty) {
I don't think we actually need this -- log files are always co-located with the base file, and as such we can extract the partition values from there.
    logRecordScannerBuilder.withPartition(
      getRelativePartitionPath(new Path(tableState.tablePath), logFiles.head.getPath.getParent))

    if (logFiles.head.isInstanceOf[HoodieLogFileWithPartition]) {
Please check my comment below
    package org.apache.hudi.common.model;

    public class HoodieLogFileWithPartition extends HoodieLogFile {
Please check my comment below regarding this class
The log reader issue is caused by setting hoodie.datasource.write.drop.partition.columns=true, and the purpose of that setting is to solve the Hive query problem.
Change Logs
If hoodie.datasource.write.drop.partition.columns=true, the log reader does not add partition values to the record, so a query with the partition column in the where clause returns empty results.
This PR adds the partition values, extracted from the file path, to the record.
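For context, a minimal sketch of a Spark write that enables this option (the DataFrame df, table name, key/partition field names, and path are assumptions for illustration; the option keys are the standard Hudi datasource options):

    // Sketch: writing with partition columns dropped from the data files.
    // With this option, partition values must be re-derived from the partition path at read time.
    df.write()
      .format("hudi")
      .option("hoodie.table.name", "test_table")
      .option("hoodie.datasource.write.recordkey.field", "id")
      .option("hoodie.datasource.write.partitionpath.field", "dt")
      .option("hoodie.datasource.write.drop.partition.columns", "true")
      .mode("append")
      .save("/tmp/hudi/test_table");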
Impact
hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java
Risk level (write none, low medium or high below)
low
If medium or high, explain what verification was done to mitigate the risks.
Documentation Update
Describe any necessary documentation update if there is any new feature, config, or user-facing change
ticket number here and follow the instruction to make changes to the website.
Contributor's checklist