-
Notifications
You must be signed in to change notification settings - Fork 5.5k
Add support for partition schema evolution for HUDI tables #16348
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -33,6 +33,7 @@ | |
| import com.facebook.presto.hive.metastore.Partition; | ||
| import com.facebook.presto.hive.metastore.PartitionStatistics; | ||
| import com.facebook.presto.hive.metastore.SemiTransactionalHiveMetastore; | ||
| import com.facebook.presto.hive.metastore.StorageFormat; | ||
| import com.facebook.presto.hive.metastore.Table; | ||
| import com.facebook.presto.spi.ColumnHandle; | ||
| import com.facebook.presto.spi.ConnectorSession; | ||
|
|
@@ -55,7 +56,10 @@ | |
| import com.google.common.collect.Lists; | ||
| import com.google.common.collect.Ordering; | ||
| import io.airlift.units.DataSize; | ||
| import org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe; | ||
| import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo; | ||
| import org.apache.hudi.hadoop.HoodieParquetInputFormat; | ||
| import org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat; | ||
| import org.weakref.jmx.Managed; | ||
| import org.weakref.jmx.Nested; | ||
|
|
||
|
|
@@ -385,7 +389,18 @@ private Iterable<HivePartitionMetadata> getPartitionMetadata( | |
| } | ||
| } | ||
|
|
||
| Optional<HiveStorageFormat> storageFormat = getHiveStorageFormat(table.getStorage().getStorageFormat()); | ||
| StorageFormat storageFormat = table.getStorage().getStorageFormat(); | ||
| Optional<HiveStorageFormat> hiveStorageFormat = getHiveStorageFormat(storageFormat); | ||
|
|
||
| Optional<HiveStorageFormat> resolvedHiveStorageFormat; | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. How about:
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. @zhenxiao The variable `resolvedHiveStorageFormat` is used inside a lambda function, so it has to be effectively final. Due to this reason I had to use an else block. Let me know if you think I can improve it some other way.
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I might be missing something. Which lambda function is used for `resolvedHiveStorageFormat`? |
||
|
|
||
| if (isUseParquetColumnNames(session)) { | ||
| // Use Hive Storage Format as Parquet if table is of HUDI format | ||
| resolvedHiveStorageFormat = (!hiveStorageFormat.isPresent() && isHudiFormat(storageFormat)) ? Optional.of(PARQUET) : hiveStorageFormat; | ||
| } | ||
| else { | ||
| resolvedHiveStorageFormat = hiveStorageFormat; | ||
| } | ||
|
|
||
| Iterable<List<HivePartition>> partitionNameBatches = partitionExponentially(hivePartitions, minPartitionBatchSize, maxPartitionBatchSize); | ||
| Iterable<List<HivePartitionMetadata>> partitionBatches = transform(partitionNameBatches, partitionBatch -> { | ||
|
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @zhenxiao I was talking about this. You are right that resolvedHiveStorageFormat is only being used in the method getTableToPartitionMapping but that method is being called from this lambda function. The call is on the line 466 in the updated code.
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. get it. you are correct |
||
|
|
@@ -448,7 +463,7 @@ private Iterable<HivePartitionMetadata> getPartitionMetadata( | |
| if ((tableColumns == null) || (partitionColumns == null)) { | ||
| throw new PrestoException(HIVE_INVALID_METADATA, format("Table '%s' or partition '%s' has null columns", tableName, partitionName)); | ||
| } | ||
| TableToPartitionMapping tableToPartitionMapping = getTableToPartitionMapping(session, storageFormat, tableName, partitionName, tableColumns, partitionColumns); | ||
| TableToPartitionMapping tableToPartitionMapping = getTableToPartitionMapping(session, resolvedHiveStorageFormat, tableName, partitionName, tableColumns, partitionColumns); | ||
|
|
||
| if (hiveBucketHandle.isPresent() && !hiveBucketHandle.get().isVirtuallyBucketed()) { | ||
| Optional<HiveBucketProperty> partitionBucketProperty = partition.getStorage().getBucketProperty(); | ||
|
|
@@ -603,6 +618,21 @@ private PrestoException tablePartitionColumnMismatchException(SchemaTableName ta | |
| partitionType)); | ||
| } | ||
|
|
||
| /** | ||
| * Checks whether the given storage format describes a HUDI table. | ||
| * <p> | ||
| * The format is treated as HUDI only when its SerDe class name equals that of | ||
| * {@code ParquetHiveSerDe} and its input format class name equals that of either | ||
| * {@code HoodieParquetInputFormat} or {@code HoodieParquetRealtimeInputFormat}. | ||
| * A null SerDe or a null input format yields {@code false}. | ||
| * | ||
| * @param storageFormat Table Storage Format | ||
| * @return true if table is of HUDI format, else false | ||
| */ | ||
| private boolean isHudiFormat(StorageFormat storageFormat) | ||
| { | ||
| String serde = storageFormat.getSerDeNullable(); | ||
| String inputFormat = storageFormat.getInputFormatNullable(); | ||
| return serde != null && serde.equals(ParquetHiveSerDe.class.getName()) | ||
| && (inputFormat != null && (inputFormat.equals(HoodieParquetInputFormat.class.getName()) | ||
| || inputFormat.equals(HoodieParquetRealtimeInputFormat.class.getName()))); | ||
| } | ||
|
|
||
| private Map<String, PartitionSplitInfo> getPartitionSplitInfo( | ||
| ConnectorSession session, | ||
| SemiTransactionalHiveMetastore metastore, | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
shall we check session property or configuration here? according to the release note:
This is enabled when configuration property
`hive.parquet.use-column-names` or the hive catalog session property `parquet_use_column_names` is set to true. By default they are mapped by index.
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@zhenxiao Line 392 and Line 393 are required in any case. Only the next couple of lines are not mandatory if
`hive.parquet.use-column-names` is not set to true. But in any case the value of this variable is only used in one method, `getTableToPartitionMapping`, which has the required check at line 528. Anyway, I have added a check here as well; let me know if you feel it is not necessary.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
looks nice