diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/HiveSplitManager.java b/presto-hive/src/main/java/com/facebook/presto/hive/HiveSplitManager.java
index 24193850fa2c9..87a1528e0ffd8 100644
--- a/presto-hive/src/main/java/com/facebook/presto/hive/HiveSplitManager.java
+++ b/presto-hive/src/main/java/com/facebook/presto/hive/HiveSplitManager.java
@@ -33,6 +33,7 @@
 import com.facebook.presto.hive.metastore.Partition;
 import com.facebook.presto.hive.metastore.PartitionStatistics;
 import com.facebook.presto.hive.metastore.SemiTransactionalHiveMetastore;
+import com.facebook.presto.hive.metastore.StorageFormat;
 import com.facebook.presto.hive.metastore.Table;
 import com.facebook.presto.spi.ColumnHandle;
 import com.facebook.presto.spi.ConnectorSession;
@@ -55,7 +56,10 @@
 import com.google.common.collect.Lists;
 import com.google.common.collect.Ordering;
 import io.airlift.units.DataSize;
+import org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe;
 import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.apache.hudi.hadoop.HoodieParquetInputFormat;
+import org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat;
 import org.weakref.jmx.Managed;
 import org.weakref.jmx.Nested;
 
@@ -385,7 +389,18 @@ private Iterable<HivePartitionMetadata> getPartitionMetadata(
             }
         }
 
-        Optional<HiveStorageFormat> storageFormat = getHiveStorageFormat(table.getStorage().getStorageFormat());
+        StorageFormat storageFormat = table.getStorage().getStorageFormat();
+        Optional<HiveStorageFormat> hiveStorageFormat = getHiveStorageFormat(storageFormat);
+
+        Optional<HiveStorageFormat> resolvedHiveStorageFormat;
+
+        if (isUseParquetColumnNames(session)) {
+            // Use Hive Storage Format as Parquet if table is of HUDI format
+            resolvedHiveStorageFormat = (!hiveStorageFormat.isPresent() && isHudiFormat(storageFormat)) ? Optional.of(PARQUET) : hiveStorageFormat;
+        }
+        else {
+            resolvedHiveStorageFormat = hiveStorageFormat;
+        }
 
         Iterable<List<HivePartition>> partitionNameBatches = partitionExponentially(hivePartitions, minPartitionBatchSize, maxPartitionBatchSize);
         Iterable<List<HivePartitionMetadata>> partitionBatches = transform(partitionNameBatches, partitionBatch -> {
@@ -448,7 +463,7 @@ private Iterable<HivePartitionMetadata> getPartitionMetadata(
             if ((tableColumns == null) || (partitionColumns == null)) {
                 throw new PrestoException(HIVE_INVALID_METADATA, format("Table '%s' or partition '%s' has null columns", tableName, partitionName));
             }
-            TableToPartitionMapping tableToPartitionMapping = getTableToPartitionMapping(session, storageFormat, tableName, partitionName, tableColumns, partitionColumns);
+            TableToPartitionMapping tableToPartitionMapping = getTableToPartitionMapping(session, resolvedHiveStorageFormat, tableName, partitionName, tableColumns, partitionColumns);
 
             if (hiveBucketHandle.isPresent() && !hiveBucketHandle.get().isVirtuallyBucketed()) {
                 Optional<HiveBucketProperty> partitionBucketProperty = partition.getStorage().getBucketProperty();
@@ -603,6 +618,21 @@ private PrestoException tablePartitionColumnMismatchException(SchemaTableName ta
                 partitionType));
     }
 
+    /**
+     * This method is used to check if a table is of HUDI format
+     *
+     * @param storageFormat Table Storage Format
+     * @return true if table is of HUDI format, else false
+     */
+    private boolean isHudiFormat(StorageFormat storageFormat)
+    {
+        String serde = storageFormat.getSerDeNullable();
+        String inputFormat = storageFormat.getInputFormatNullable();
+        return serde != null && serde.equals(ParquetHiveSerDe.class.getName())
+                && (inputFormat != null && (inputFormat.equals(HoodieParquetInputFormat.class.getName())
+                || inputFormat.equals(HoodieParquetRealtimeInputFormat.class.getName())));
+    }
+
     private Map<String, PartitionSplitInfo> getPartitionSplitInfo(
             ConnectorSession session,
             SemiTransactionalHiveMetastore metastore,