diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java index c1ebef7bb6110..4b747d3a77c00 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java @@ -1465,7 +1465,7 @@ protected final HoodieTable initTable(WriteOperationType operationType, Option PARTITION_METAFILE_USE_BASE_FORMAT = ConfigProperty .key("hoodie.partition.metafile.use.base.format") .defaultValue(false) - .withDocumentation("If true, partition metafiles are saved in the same format as basefiles for this dataset (e.g. Parquet / ORC). " + .withDocumentation("If true, partition metafiles are saved in the same format as base-files for this dataset (e.g. Parquet / ORC). " + "If false (default) partition metafiles are saved as properties files."); public static final ConfigProperty<Boolean> DROP_PARTITION_COLUMNS = ConfigProperty diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java index a65be689a498c..251a990d87c04 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java @@ -30,7 +30,6 @@ import org.apache.hudi.common.model.HoodieRecordPayload; import org.apache.hudi.common.model.HoodieTableType; import org.apache.hudi.common.model.HoodieTimelineTimeZone; -import org.apache.hudi.common.model.WriteOperationType; import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; import org.apache.hudi.common.table.timeline.HoodieArchivedTimeline; import org.apache.hudi.common.table.timeline.HoodieInstant; @@ -377,16 +376,15 @@ public HoodieArchivedTimeline getArchivedTimeline(String startTs) { /** * Validate table properties. * @param properties Properties from writeConfig. - * @param operationType operation type to be executed. */ - public void validateTableProperties(Properties properties, WriteOperationType operationType) { - // once meta fields are disabled, it cant be re-enabled for a given table. + public void validateTableProperties(Properties properties) { + // Once meta fields are disabled, it can't be re-enabled for a given table. if (!getTableConfig().populateMetaFields() && Boolean.parseBoolean((String) properties.getOrDefault(HoodieTableConfig.POPULATE_META_FIELDS.key(), HoodieTableConfig.POPULATE_META_FIELDS.defaultValue()))) { throw new HoodieException(HoodieTableConfig.POPULATE_META_FIELDS.key() + " already disabled for the table. 
Can't be re-enabled back"); } - // meta fields can be disabled only with SimpleKeyGenerator + // Meta fields can be disabled only when {@code SimpleKeyGenerator} is used if (!getTableConfig().populateMetaFields() && !properties.getProperty(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME.key(), "org.apache.hudi.keygen.SimpleKeyGenerator") .equals("org.apache.hudi.keygen.SimpleKeyGenerator")) { @@ -698,7 +696,7 @@ public static class PropertyBuilder { private Boolean urlEncodePartitioning; private HoodieTimelineTimeZone commitTimeZone; private Boolean partitionMetafileUseBaseFormat; - private Boolean dropPartitionColumnsWhenWrite; + private Boolean shouldDropPartitionColumns; private String metadataPartitions; private String inflightMetadataPartitions; @@ -820,8 +818,8 @@ public PropertyBuilder setPartitionMetafileUseBaseFormat(Boolean useBaseFormat) return this; } - public PropertyBuilder setDropPartitionColumnsWhenWrite(Boolean dropPartitionColumnsWhenWrite) { - this.dropPartitionColumnsWhenWrite = dropPartitionColumnsWhenWrite; + public PropertyBuilder setShouldDropPartitionColumns(Boolean shouldDropPartitionColumns) { + this.shouldDropPartitionColumns = shouldDropPartitionColumns; return this; } @@ -933,15 +931,12 @@ public PropertyBuilder fromProperties(Properties properties) { if (hoodieConfig.contains(HoodieTableConfig.PARTITION_METAFILE_USE_BASE_FORMAT)) { setPartitionMetafileUseBaseFormat(hoodieConfig.getBoolean(HoodieTableConfig.PARTITION_METAFILE_USE_BASE_FORMAT)); } - if (hoodieConfig.contains(HoodieTableConfig.DROP_PARTITION_COLUMNS)) { - setDropPartitionColumnsWhenWrite(hoodieConfig.getBoolean(HoodieTableConfig.DROP_PARTITION_COLUMNS)); + setShouldDropPartitionColumns(hoodieConfig.getBoolean(HoodieTableConfig.DROP_PARTITION_COLUMNS)); } - if (hoodieConfig.contains(HoodieTableConfig.TABLE_METADATA_PARTITIONS)) { setMetadataPartitions(hoodieConfig.getString(HoodieTableConfig.TABLE_METADATA_PARTITIONS)); } - if (hoodieConfig.contains(HoodieTableConfig.TABLE_METADATA_PARTITIONS_INFLIGHT)) { setInflightMetadataPartitions(hoodieConfig.getString(HoodieTableConfig.TABLE_METADATA_PARTITIONS_INFLIGHT)); } @@ -1026,15 +1021,12 @@ public Properties build() { if (null != partitionMetafileUseBaseFormat) { tableConfig.setValue(HoodieTableConfig.PARTITION_METAFILE_USE_BASE_FORMAT, partitionMetafileUseBaseFormat.toString()); } - - if (null != dropPartitionColumnsWhenWrite) { - tableConfig.setValue(HoodieTableConfig.DROP_PARTITION_COLUMNS, Boolean.toString(dropPartitionColumnsWhenWrite)); + if (null != shouldDropPartitionColumns) { + tableConfig.setValue(HoodieTableConfig.DROP_PARTITION_COLUMNS, Boolean.toString(shouldDropPartitionColumns)); } - if (null != metadataPartitions) { tableConfig.setValue(HoodieTableConfig.TABLE_METADATA_PARTITIONS, metadataPartitions); } - if (null != inflightMetadataPartitions) { tableConfig.setValue(HoodieTableConfig.TABLE_METADATA_PARTITIONS_INFLIGHT, inflightMetadataPartitions); } diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/internal/DataSourceInternalWriterHelper.java b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/internal/DataSourceInternalWriterHelper.java index 7ca62d1520957..5aa82642de62e 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/internal/DataSourceInternalWriterHelper.java +++ b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/internal/DataSourceInternalWriterHelper.java @@ -66,7 +66,7 @@ public DataSourceInternalWriterHelper(String 
instantTime, HoodieWriteConfig writ writeClient.startCommitWithTime(instantTime); this.metaClient = HoodieTableMetaClient.builder().setConf(configuration).setBasePath(writeConfig.getBasePath()).build(); - this.metaClient.validateTableProperties(writeConfig.getProps(), WriteOperationType.BULK_INSERT); + this.metaClient.validateTableProperties(writeConfig.getProps()); this.hoodieTable = HoodieSparkTable.create(writeConfig, new HoodieSparkEngineContext(new JavaSparkContext(sparkSession.sparkContext())), metaClient); } diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyRelation.scala index 5414a228c7317..3c667d2b427e7 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyRelation.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyRelation.scala @@ -114,21 +114,21 @@ class BaseFileOnlyRelation(sqlContext: SQLContext, * rule; you can find more details in HUDI-3896) */ def toHadoopFsRelation: HadoopFsRelation = { - // We're delegating to Spark to append partition values to every row only in cases - // when these corresponding partition-values are not persisted w/in the data file itself - val shouldAppendPartitionColumns = shouldOmitPartitionColumns - - val (tableFileFormat, formatClassName) = metaClient.getTableConfig.getBaseFileFormat match { - case HoodieFileFormat.PARQUET => - (sparkAdapter.createHoodieParquetFileFormat(shouldAppendPartitionColumns).get, HoodieParquetFileFormat.FILE_FORMAT_ID) - case HoodieFileFormat.ORC => (new OrcFileFormat, "orc") - } + val (tableFileFormat, formatClassName) = + metaClient.getTableConfig.getBaseFileFormat match { + case HoodieFileFormat.ORC => (new OrcFileFormat, "orc") + case HoodieFileFormat.PARQUET => + // We're delegating to Spark to append partition values to every row only in cases + // when these corresponding partition-values are not persisted w/in the data file itself + val parquetFileFormat = sparkAdapter.createHoodieParquetFileFormat(shouldExtractPartitionValuesFromPartitionPath).get + (parquetFileFormat, HoodieParquetFileFormat.FILE_FORMAT_ID) + } if (globPaths.isEmpty) { // NOTE: There are currently 2 ways partition values could be fetched: // - Source columns (producing the values used for physical partitioning) will be read // from the data file - // - Values parsed from the actual partition pat would be appended to the final dataset + // - Values parsed from the actual partition path would be appended to the final dataset // // In the former case, we don't need to provide the partition-schema to the relation, // therefore we simply stub it w/ empty schema and use full table-schema as the one being @@ -136,7 +136,7 @@ class BaseFileOnlyRelation(sqlContext: SQLContext, // // In the latter, we have to specify proper partition schema as well as "data"-schema, essentially // being a table-schema with all partition columns stripped out - val (partitionSchema, dataSchema) = if (shouldAppendPartitionColumns) { + val (partitionSchema, dataSchema) = if (shouldExtractPartitionValuesFromPartitionPath) { (fileIndex.partitionSchema, fileIndex.dataSchema) } else { (StructType(Nil), tableStructSchema) diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala index 
30bbaeceb1b9a..0d4c7cf184ddc 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala @@ -18,14 +18,16 @@ package org.apache.hudi import org.apache.hudi.DataSourceReadOptions.{QUERY_TYPE, QUERY_TYPE_READ_OPTIMIZED_OPT_VAL, QUERY_TYPE_SNAPSHOT_OPT_VAL} +import org.apache.hudi.HoodieConversionUtils.toScalaOption import org.apache.hudi.common.config.{ConfigProperty, HoodieConfig} import org.apache.hudi.common.fs.ConsistencyGuardConfig import org.apache.hudi.common.model.{HoodieTableType, WriteOperationType} import org.apache.hudi.common.table.HoodieTableConfig import org.apache.hudi.common.util.Option +import org.apache.hudi.common.util.ValidationUtils.checkState import org.apache.hudi.config.{HoodieClusteringConfig, HoodieWriteConfig} import org.apache.hudi.hive.util.ConfigUtils -import org.apache.hudi.hive.{HiveSyncConfig, HiveSyncTool, MultiPartKeysValueExtractor, NonPartitionedExtractor, SlashEncodedDayPartitionValueExtractor} +import org.apache.hudi.hive.{HiveSyncConfig, HiveSyncTool} import org.apache.hudi.keygen.constant.KeyGeneratorOptions import org.apache.hudi.keygen.{ComplexKeyGenerator, CustomKeyGenerator, NonpartitionedKeyGenerator, SimpleKeyGenerator} import org.apache.hudi.sync.common.HoodieSyncConfig @@ -45,6 +47,7 @@ import scala.language.implicitConversions * Options supported for reading hoodie tables. */ object DataSourceReadOptions { + import DataSourceOptionsHelper._ val QUERY_TYPE_SNAPSHOT_OPT_VAL = "snapshot" val QUERY_TYPE_READ_OPTIMIZED_OPT_VAL = "read_optimized" @@ -124,6 +127,15 @@ object DataSourceReadOptions { .withDocumentation("Enables data-skipping allowing queries to leverage indexes to reduce the search space by " + "skipping over files") + val EXTRACT_PARTITION_VALUES_FROM_PARTITION_PATH: ConfigProperty[Boolean] = + ConfigProperty.key("hoodie.datasource.read.extract.partition.values.from.path") + .defaultValue(false) + .sinceVersion("0.11.0") + .withDocumentation("When set to true, values for partition columns (partition values) will be extracted" + + " from physical partition path (default Spark behavior). When set to false partition values will be" + + " read from the data file (in Hudi partition columns are persisted by default)." 
+ + " This config is a fallback allowing to preserve existing behavior, and should not be used otherwise.") + val INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN_FOR_NON_EXISTING_FILES: ConfigProperty[String] = ConfigProperty .key("hoodie.datasource.read.incr.fallback.fulltablescan.enable") .defaultValue("false") @@ -185,6 +197,8 @@ object DataSourceReadOptions { */ object DataSourceWriteOptions { + import DataSourceOptionsHelper._ + val BULK_INSERT_OPERATION_OPT_VAL = WriteOperationType.BULK_INSERT.value val INSERT_OPERATION_OPT_VAL = WriteOperationType.INSERT.value val UPSERT_OPERATION_OPT_VAL = WriteOperationType.UPSERT.value @@ -471,10 +485,7 @@ object DataSourceWriteOptions { .sinceVersion("0.9.0") .withDocumentation("This class is used by kafka client to deserialize the records") - val DROP_PARTITION_COLUMNS: ConfigProperty[Boolean] = ConfigProperty - .key(HoodieTableConfig.DROP_PARTITION_COLUMNS.key()) - .defaultValue(HoodieTableConfig.DROP_PARTITION_COLUMNS.defaultValue().booleanValue()) - .withDocumentation(HoodieTableConfig.DROP_PARTITION_COLUMNS.doc()) + val DROP_PARTITION_COLUMNS: ConfigProperty[Boolean] = HoodieTableConfig.DROP_PARTITION_COLUMNS /** @deprecated Use {@link HIVE_ASSUME_DATE_PARTITION} and its methods instead */ @Deprecated @@ -774,4 +785,23 @@ object DataSourceOptionsHelper { override def apply (input: From): To = function (input) } } + + implicit def convert[T, U](prop: ConfigProperty[T])(implicit converter: T => U): ConfigProperty[U] = { + checkState(prop.hasDefaultValue) + var newProp: ConfigProperty[U] = ConfigProperty.key(prop.key()) + .defaultValue(converter(prop.defaultValue())) + .withDocumentation(prop.doc()) + .withAlternatives(prop.getAlternatives.asScala: _*) + + newProp = toScalaOption(prop.getSinceVersion) match { + case Some(version) => newProp.sinceVersion(version) + case None => newProp + } + newProp = toScalaOption(prop.getDeprecatedVersion) match { + case Some(version) => newProp.deprecatedAfter(version) + case None => newProp + } + + newProp + } } diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala index 2fd1da595036f..f776d08ec9827 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala @@ -149,8 +149,36 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext, protected val partitionColumns: Array[String] = tableConfig.getPartitionFields.orElse(Array.empty) - protected val shouldOmitPartitionColumns: Boolean = - metaClient.getTableConfig.shouldDropPartitionColumns && partitionColumns.nonEmpty + /** + * Controls whether partition values (ie values of partition columns) should be + *
+ *   1. Extracted from partition path and appended to individual rows read from the data file (we
+ *      delegate this to Spark's [[ParquetFileFormat]])
+ *   2. Read from the data-file as is (by default Hudi persists all columns including partition ones)
+ * + * This flag is only relevant in conjunction with the usage of the [["hoodie.datasource.write.drop.partition.columns"]] + * config, when Hudi will NOT be persisting partition columns in the data file, and therefore values for + * such partition columns (i.e. "partition values") will have to be parsed from the partition path, and appended + * to every row of the fetched dataset. + * + * NOTE: Partition values extracted from the partition path might deviate from the values of the original + * partition columns: for example, if the original partition column was [[ts]], bearing an epoch + * timestamp, and [[TimestampBasedKeyGenerator]] was used to generate a partition path of the format + * [["yyyy/mm/dd"]], the appended partition value will bear that format verbatim as it was used in the + * partition path, meaning that the string value "2022/01/01" will be appended, and not its original + * representation + */ + protected val shouldExtractPartitionValuesFromPartitionPath: Boolean = { + // Controls whether partition columns (which are the source for the partition path values) should + // be omitted from persistence in the data files. On the read path it affects whether partition values (values + // of partition columns) will be read from the data file or extracted from the partition path + val shouldOmitPartitionColumns = metaClient.getTableConfig.shouldDropPartitionColumns && partitionColumns.nonEmpty + val shouldExtractPartitionValueFromPath = + optParams.getOrElse(DataSourceReadOptions.EXTRACT_PARTITION_VALUES_FROM_PARTITION_PATH.key, + DataSourceReadOptions.EXTRACT_PARTITION_VALUES_FROM_PARTITION_PATH.defaultValue.toString).toBoolean + shouldOmitPartitionColumns || shouldExtractPartitionValueFromPath + } /** * NOTE: PLEASE READ THIS CAREFULLY @@ -228,7 +256,6 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext, val fileSplits = collectFileSplits(partitionFilters, dataFilters) - val tableAvroSchemaStr = if (internalSchema.isEmptySchema) tableAvroSchema.toString else AvroInternalSchemaConverter.convert(internalSchema, tableAvroSchema.getName).toString @@ -367,7 +394,7 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext, protected def getPartitionColumnsAsInternalRow(file: FileStatus): InternalRow = { try { val tableConfig = metaClient.getTableConfig - if (shouldOmitPartitionColumns) { + if (shouldExtractPartitionValuesFromPartitionPath) { val relativePath = new URI(metaClient.getBasePath).relativize(new URI(file.getPath.getParent.toString)).toString val hiveStylePartitioningEnabled = tableConfig.getHiveStylePartitioningEnable.toBoolean if (hiveStylePartitioningEnabled) { @@ -420,9 +447,6 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext, hadoopConf = hadoopConf ) - // We're delegating to Spark to append partition values to every row only in cases - // when these corresponding partition-values are not persisted w/in the data file itself - val shouldAppendPartitionColumns = shouldOmitPartitionColumns val parquetReader = HoodieDataSourceHelper.buildHoodieParquetReader( sparkSession = spark, dataSchema = dataSchema.structTypeSchema, @@ -431,7 +455,9 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext, filters = filters, options = options, hadoopConf = hadoopConf, - appendPartitionValues = shouldAppendPartitionColumns + // We're delegating to Spark to append partition values to every row only in cases + // when these corresponding partition-values are not persisted w/in the data file itself + appendPartitionValues = 
shouldExtractPartitionValuesFromPartitionPath ) partitionedFile => { @@ -448,7 +474,7 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext, private def tryPrunePartitionColumns(tableSchema: HoodieTableSchema, requiredSchema: HoodieTableSchema): (StructType, HoodieTableSchema, HoodieTableSchema) = { - if (shouldOmitPartitionColumns) { + if (shouldExtractPartitionValuesFromPartitionPath) { val partitionSchema = StructType(partitionColumns.map(StructField(_, StringType))) val prunedDataStructSchema = prunePartitionColumns(tableSchema.structTypeSchema) val prunedRequiredSchema = prunePartitionColumns(requiredSchema.structTypeSchema) diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieDataSourceHelper.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieDataSourceHelper.scala index 1fc9e70a5a522..e430364be9423 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieDataSourceHelper.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieDataSourceHelper.scala @@ -50,7 +50,6 @@ object HoodieDataSourceHelper extends PredicateHelper with SparkAdapterSupport { options: Map[String, String], hadoopConf: Configuration, appendPartitionValues: Boolean = false): PartitionedFile => Iterator[InternalRow] = { - val parquetFileFormat: ParquetFileFormat = sparkAdapter.createHoodieParquetFileFormat(appendPartitionValues).get val readParquetFile: PartitionedFile => Iterator[Any] = parquetFileFormat.buildReaderWithPartitionValues( sparkSession = sparkSession, diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala index 38062aa80209b..c5204fcd8e9e4 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala @@ -160,7 +160,7 @@ object HoodieSparkSqlWriter { .setHiveStylePartitioningEnable(hoodieConfig.getBoolean(HIVE_STYLE_PARTITIONING)) .setUrlEncodePartitioning(hoodieConfig.getBoolean(URL_ENCODE_PARTITIONING)) .setPartitionMetafileUseBaseFormat(useBaseFormatMetaFile) - .setDropPartitionColumnsWhenWrite(hoodieConfig.getBooleanOrDefault(HoodieTableConfig.DROP_PARTITION_COLUMNS)) + .setShouldDropPartitionColumns(hoodieConfig.getBooleanOrDefault(HoodieTableConfig.DROP_PARTITION_COLUMNS)) .setCommitTimezone(HoodieTimelineTimeZone.valueOf(hoodieConfig.getStringOrDefault(HoodieTableConfig.TIMELINE_TIMEZONE))) .initTable(sparkContext.hadoopConfiguration, path) tableConfig = tableMetaClient.getTableConfig diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieParquetFileFormat.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieParquetFileFormat.scala index dbb62d089ece8..a52e9335fe374 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieParquetFileFormat.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieParquetFileFormat.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.datasources.parquet import org.apache.hadoop.conf.Configuration -import org.apache.hudi.SparkAdapterSupport 
+import org.apache.hudi.{DataSourceReadOptions, SparkAdapterSupport} import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.execution.datasources.PartitionedFile @@ -41,14 +41,16 @@ class HoodieParquetFileFormat extends ParquetFileFormat with SparkAdapterSupport filters: Seq[Filter], options: Map[String, String], hadoopConf: Configuration): PartitionedFile => Iterator[InternalRow] = { + val shouldExtractPartitionValuesFromPartitionPath = + options.getOrElse(DataSourceReadOptions.EXTRACT_PARTITION_VALUES_FROM_PARTITION_PATH.key, + DataSourceReadOptions.EXTRACT_PARTITION_VALUES_FROM_PARTITION_PATH.defaultValue.toString).toBoolean + sparkAdapter - .createHoodieParquetFileFormat(appendPartitionValues = false).get + .createHoodieParquetFileFormat(shouldExtractPartitionValuesFromPartitionPath).get .buildReaderWithPartitionValues(sparkSession, dataSchema, partitionSchema, requiredSchema, filters, options, hadoopConf) } } object HoodieParquetFileFormat { - val FILE_FORMAT_ID = "hoodie-parquet" - } diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala index 28a6dcdcd60c9..a088d4e01be52 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala @@ -749,7 +749,7 @@ class TestCOWDataSource extends HoodieClientTestBase { @ParameterizedTest @ValueSource(booleans = Array(true, false)) - def testCopyOnWriteWithDropPartitionColumns(enableDropPartitionColumns: Boolean) { + def testCopyOnWriteWithDroppedPartitionColumns(enableDropPartitionColumns: Boolean) { val records1 = recordsToStrings(dataGen.generateInsertsContainsAllPartitions("000", 100)).toList val inputDF1 = spark.read.json(spark.sparkContext.parallelize(records1, 2)) inputDF1.write.format("org.apache.hudi") @@ -900,7 +900,7 @@ class TestCOWDataSource extends HoodieClientTestBase { @ParameterizedTest @ValueSource(booleans = Array(true, false)) - def testHoodieBaseFileOnlyViewRelation(useGlobbing: Boolean): Unit = { + def testPartitionColumnsProperHandling(useGlobbing: Boolean): Unit = { val _spark = spark import _spark.implicits._ @@ -935,18 +935,41 @@ class TestCOWDataSource extends HoodieClientTestBase { basePath } - val res = spark.read.format("hudi").load(path) + // Case #1: Partition columns are read from the data file + val firstDF = spark.read.format("hudi").load(path) - assert(res.count() == 2) + assert(firstDF.count() == 2) // data_date is the partition field. Persist to the parquet file using the origin values, and read it. 
assertEquals( - res.select("data_date").map(_.get(0).toString).collect().sorted.toSeq, - Seq("2018-09-23", "2018-09-24") + Seq("2018-09-23", "2018-09-24"), + firstDF.select("data_date").map(_.get(0).toString).collect().sorted.toSeq ) assertEquals( - res.select("_hoodie_partition_path").map(_.get(0).toString).collect().sorted.toSeq, - Seq("2018/09/23", "2018/09/24") + Seq("2018/09/23", "2018/09/24"), + firstDF.select("_hoodie_partition_path").map(_.get(0).toString).collect().sorted.toSeq ) + + // Case #2: Partition columns are extracted from the partition path + // + // NOTE: This case is only relevant when globbing is NOT used, since when globbing is used Spark + // won't be able to infer partitioning properly + if (!useGlobbing) { + val secondDF = spark.read.format("hudi") + .option(DataSourceReadOptions.EXTRACT_PARTITION_VALUES_FROM_PARTITION_PATH.key, "true") + .load(path) + + assert(secondDF.count() == 2) + + // data_date is the partition field. Here its values are extracted from the partition path, so they carry the path format rather than the original values. + assertEquals( + Seq("2018/09/23", "2018/09/24"), + secondDF.select("data_date").map(_.get(0).toString).collect().sorted.toSeq + ) + assertEquals( + Seq("2018/09/23", "2018/09/24"), + secondDF.select("_hoodie_partition_path").map(_.get(0).toString).collect().sorted.toSeq + ) + } } } diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java index a9d67a0d723ef..b086a6c9edbab 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java @@ -281,7 +281,7 @@ public void refreshTimeline() throws IOException { .setPreCombineField(cfg.sourceOrderingField) .setPartitionMetafileUseBaseFormat(props.getBoolean(HoodieTableConfig.PARTITION_METAFILE_USE_BASE_FORMAT.key(), HoodieTableConfig.PARTITION_METAFILE_USE_BASE_FORMAT.defaultValue())) - .setDropPartitionColumnsWhenWrite(isDropPartitionColumns()) + .setShouldDropPartitionColumns(isDropPartitionColumns()) .initTable(new Configuration(jssc.hadoopConfiguration()), cfg.targetBasePath); } @@ -377,7 +377,7 @@ public Pair>> readFromSource( SimpleKeyGenerator.class.getName())) .setPartitionMetafileUseBaseFormat(props.getBoolean(HoodieTableConfig.PARTITION_METAFILE_USE_BASE_FORMAT.key(), HoodieTableConfig.PARTITION_METAFILE_USE_BASE_FORMAT.defaultValue())) - .setDropPartitionColumnsWhenWrite(isDropPartitionColumns()) + .setShouldDropPartitionColumns(isDropPartitionColumns()) .initTable(new Configuration(jssc.hadoopConfiguration()), cfg.targetBasePath); }
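For orientation, here is a minimal usage sketch (not part of the patch) of how the write-side "hoodie.datasource.write.drop.partition.columns" flag and the new read-side "hoodie.datasource.read.extract.partition.values.from.path" fallback introduced above would typically be exercised together from Spark. The SparkSession spark, the input DataFrame df with columns uuid, ts and data_date, the table name and the target path are all hypothetical.

```scala
// Minimal sketch, assuming a SparkSession `spark` and a DataFrame `df` with columns
// `uuid`, `ts` and `data_date` (all hypothetical), with the hudi-spark bundle on the classpath.
import org.apache.spark.sql.SaveMode

val basePath = "/tmp/hudi_partition_cols_demo"

// Write side: do not persist the partition column into the data files
// (the key is now shared between HoodieTableConfig.DROP_PARTITION_COLUMNS and
// DataSourceWriteOptions.DROP_PARTITION_COLUMNS).
df.write.format("hudi")
  .option("hoodie.table.name", "partition_cols_demo")
  .option("hoodie.datasource.write.recordkey.field", "uuid")
  .option("hoodie.datasource.write.precombine.field", "ts")
  .option("hoodie.datasource.write.partitionpath.field", "data_date")
  .option("hoodie.datasource.write.drop.partition.columns", "true")
  .mode(SaveMode.Overwrite)
  .save(basePath)

// Read side: partition values are re-derived from the partition path. The new flag below
// forces this behavior even when partition columns are persisted in the data files.
val readDF = spark.read.format("hudi")
  .option("hoodie.datasource.read.extract.partition.values.from.path", "true")
  .load(basePath)

readDF.select("data_date", "_hoodie_partition_path").show(false)
```

As the test change above notes, extracting partition values from the partition path is only meaningful for non-globbed loads (Spark cannot infer partitioning from glob patterns), and the extracted values carry the partition-path format rather than the original column values.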