@@ -1465,7 +1465,7 @@ protected final HoodieTable initTable(WriteOperationType operationType, Option<S
}

// Validate table properties
metaClient.validateTableProperties(config.getProps(), operationType);
metaClient.validateTableProperties(config.getProps());
// Make sure that FS View is in sync
table.getHoodieView().sync();

@@ -193,7 +193,7 @@ public class HoodieTableConfig extends HoodieConfig {
public static final ConfigProperty<Boolean> PARTITION_METAFILE_USE_BASE_FORMAT = ConfigProperty
.key("hoodie.partition.metafile.use.base.format")
.defaultValue(false)
.withDocumentation("If true, partition metafiles are saved in the same format as basefiles for this dataset (e.g. Parquet / ORC). "
.withDocumentation("If true, partition metafiles are saved in the same format as base-files for this dataset (e.g. Parquet / ORC). "
+ "If false (default) partition metafiles are saved as properties files.");

public static final ConfigProperty<Boolean> DROP_PARTITION_COLUMNS = ConfigProperty
@@ -30,7 +30,6 @@
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.HoodieTimelineTimeZone;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieArchivedTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
@@ -377,16 +376,15 @@ public HoodieArchivedTimeline getArchivedTimeline(String startTs) {
/**
* Validate table properties.
* @param properties Properties from writeConfig.
* @param operationType operation type to be executed.
*/
public void validateTableProperties(Properties properties, WriteOperationType operationType) {
// once meta fields are disabled, it cant be re-enabled for a given table.
public void validateTableProperties(Properties properties) {
// Once meta fields are disabled, it cant be re-enabled for a given table.
if (!getTableConfig().populateMetaFields()
&& Boolean.parseBoolean((String) properties.getOrDefault(HoodieTableConfig.POPULATE_META_FIELDS.key(), HoodieTableConfig.POPULATE_META_FIELDS.defaultValue()))) {
throw new HoodieException(HoodieTableConfig.POPULATE_META_FIELDS.key() + " already disabled for the table. Can't be re-enabled back");
}

// meta fields can be disabled only with SimpleKeyGenerator
// Meta fields can be disabled only when {@code SimpleKeyGenerator} is used
if (!getTableConfig().populateMetaFields()
&& !properties.getProperty(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME.key(), "org.apache.hudi.keygen.SimpleKeyGenerator")
.equals("org.apache.hudi.keygen.SimpleKeyGenerator")) {
@@ -698,7 +696,7 @@ public static class PropertyBuilder {
private Boolean urlEncodePartitioning;
private HoodieTimelineTimeZone commitTimeZone;
private Boolean partitionMetafileUseBaseFormat;
private Boolean dropPartitionColumnsWhenWrite;
private Boolean shouldDropPartitionColumns;
private String metadataPartitions;
private String inflightMetadataPartitions;

@@ -820,8 +818,8 @@ public PropertyBuilder setPartitionMetafileUseBaseFormat(Boolean useBaseFormat)
return this;
}

public PropertyBuilder setDropPartitionColumnsWhenWrite(Boolean dropPartitionColumnsWhenWrite) {
this.dropPartitionColumnsWhenWrite = dropPartitionColumnsWhenWrite;
public PropertyBuilder setShouldDropPartitionColumns(Boolean shouldDropPartitionColumns) {
this.shouldDropPartitionColumns = shouldDropPartitionColumns;
return this;
}

Review comment (Contributor): Having "write" in the name makes it clear. If not, one could read it as "should drop partition columns when reading". So, I feel we can leave it as is.
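For orientation on the rename, a minimal sketch (assuming the usual `HoodieTableMetaClient.withPropertyBuilder()` entry point; table name, partition field, `hadoopConf`, and `basePath` are placeholders) of how the new setter would be invoked when bootstrapping a table:

```scala
// Sketch: creating table properties with the renamed setter.
// `hadoopConf` and `basePath` are assumed to be in scope.
val metaClient = HoodieTableMetaClient.withPropertyBuilder()
  .setTableType(HoodieTableType.COPY_ON_WRITE)
  .setTableName("my_table")
  .setPartitionFields("dt")
  .setShouldDropPartitionColumns(true) // was setDropPartitionColumnsWhenWrite(...)
  .initTable(hadoopConf, basePath)
```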

@@ -933,15 +931,12 @@ public PropertyBuilder fromProperties(Properties properties) {
if (hoodieConfig.contains(HoodieTableConfig.PARTITION_METAFILE_USE_BASE_FORMAT)) {
setPartitionMetafileUseBaseFormat(hoodieConfig.getBoolean(HoodieTableConfig.PARTITION_METAFILE_USE_BASE_FORMAT));
}

if (hoodieConfig.contains(HoodieTableConfig.DROP_PARTITION_COLUMNS)) {
setDropPartitionColumnsWhenWrite(hoodieConfig.getBoolean(HoodieTableConfig.DROP_PARTITION_COLUMNS));
setShouldDropPartitionColumns(hoodieConfig.getBoolean(HoodieTableConfig.DROP_PARTITION_COLUMNS));
}

if (hoodieConfig.contains(HoodieTableConfig.TABLE_METADATA_PARTITIONS)) {
setMetadataPartitions(hoodieConfig.getString(HoodieTableConfig.TABLE_METADATA_PARTITIONS));
}

if (hoodieConfig.contains(HoodieTableConfig.TABLE_METADATA_PARTITIONS_INFLIGHT)) {
setInflightMetadataPartitions(hoodieConfig.getString(HoodieTableConfig.TABLE_METADATA_PARTITIONS_INFLIGHT));
}
@@ -1026,15 +1021,12 @@ public Properties build() {
if (null != partitionMetafileUseBaseFormat) {
tableConfig.setValue(HoodieTableConfig.PARTITION_METAFILE_USE_BASE_FORMAT, partitionMetafileUseBaseFormat.toString());
}

if (null != dropPartitionColumnsWhenWrite) {
tableConfig.setValue(HoodieTableConfig.DROP_PARTITION_COLUMNS, Boolean.toString(dropPartitionColumnsWhenWrite));
if (null != shouldDropPartitionColumns) {
tableConfig.setValue(HoodieTableConfig.DROP_PARTITION_COLUMNS, Boolean.toString(shouldDropPartitionColumns));
}

if (null != metadataPartitions) {
tableConfig.setValue(HoodieTableConfig.TABLE_METADATA_PARTITIONS, metadataPartitions);
}

if (null != inflightMetadataPartitions) {
tableConfig.setValue(HoodieTableConfig.TABLE_METADATA_PARTITIONS_INFLIGHT, inflightMetadataPartitions);
}
@@ -66,7 +66,7 @@ public DataSourceInternalWriterHelper(String instantTime, HoodieWriteConfig writ
writeClient.startCommitWithTime(instantTime);

this.metaClient = HoodieTableMetaClient.builder().setConf(configuration).setBasePath(writeConfig.getBasePath()).build();
this.metaClient.validateTableProperties(writeConfig.getProps(), WriteOperationType.BULK_INSERT);
this.metaClient.validateTableProperties(writeConfig.getProps());
this.hoodieTable = HoodieSparkTable.create(writeConfig, new HoodieSparkEngineContext(new JavaSparkContext(sparkSession.sparkContext())), metaClient);
}

@@ -114,29 +114,29 @@ class BaseFileOnlyRelation(sqlContext: SQLContext,
* rule; you can find more details in HUDI-3896)
*/
def toHadoopFsRelation: HadoopFsRelation = {
// We're delegating to Spark to append partition values to every row only in cases
// when these corresponding partition-values are not persisted w/in the data file itself
val shouldAppendPartitionColumns = shouldOmitPartitionColumns

val (tableFileFormat, formatClassName) = metaClient.getTableConfig.getBaseFileFormat match {
case HoodieFileFormat.PARQUET =>
(sparkAdapter.createHoodieParquetFileFormat(shouldAppendPartitionColumns).get, HoodieParquetFileFormat.FILE_FORMAT_ID)
case HoodieFileFormat.ORC => (new OrcFileFormat, "orc")
}
val (tableFileFormat, formatClassName) =
metaClient.getTableConfig.getBaseFileFormat match {
case HoodieFileFormat.ORC => (new OrcFileFormat, "orc")
case HoodieFileFormat.PARQUET =>
// We're delegating to Spark to append partition values to every row only in cases
// when these corresponding partition-values are not persisted w/in the data file itself
val parquetFileFormat = sparkAdapter.createHoodieParquetFileFormat(shouldExtractPartitionValuesFromPartitionPath).get
(parquetFileFormat, HoodieParquetFileFormat.FILE_FORMAT_ID)
}

if (globPaths.isEmpty) {
// NOTE: There are currently 2 ways partition values could be fetched:
// - Source columns (producing the values used for physical partitioning) will be read
// from the data file
// - Values parsed from the actual partition pat would be appended to the final dataset
// - Values parsed from the actual partition path would be appended to the final dataset
//
// In the former case, we don't need to provide the partition-schema to the relation,
// therefore we simply stub it w/ empty schema and use full table-schema as the one being
// read from the data file.
//
// In the latter, we have to specify proper partition schema as well as "data"-schema, essentially
// being a table-schema with all partition columns stripped out
val (partitionSchema, dataSchema) = if (shouldAppendPartitionColumns) {
val (partitionSchema, dataSchema) = if (shouldExtractPartitionValuesFromPartitionPath) {
(fileIndex.partitionSchema, fileIndex.dataSchema)
} else {
(StructType(Nil), tableStructSchema)
@@ -18,14 +18,16 @@
package org.apache.hudi

import org.apache.hudi.DataSourceReadOptions.{QUERY_TYPE, QUERY_TYPE_READ_OPTIMIZED_OPT_VAL, QUERY_TYPE_SNAPSHOT_OPT_VAL}
import org.apache.hudi.HoodieConversionUtils.toScalaOption
import org.apache.hudi.common.config.{ConfigProperty, HoodieConfig}
import org.apache.hudi.common.fs.ConsistencyGuardConfig
import org.apache.hudi.common.model.{HoodieTableType, WriteOperationType}
import org.apache.hudi.common.table.HoodieTableConfig
import org.apache.hudi.common.util.Option
import org.apache.hudi.common.util.ValidationUtils.checkState
import org.apache.hudi.config.{HoodieClusteringConfig, HoodieWriteConfig}
import org.apache.hudi.hive.util.ConfigUtils
import org.apache.hudi.hive.{HiveSyncConfig, HiveSyncTool, MultiPartKeysValueExtractor, NonPartitionedExtractor, SlashEncodedDayPartitionValueExtractor}
import org.apache.hudi.hive.{HiveSyncConfig, HiveSyncTool}
import org.apache.hudi.keygen.constant.KeyGeneratorOptions
import org.apache.hudi.keygen.{ComplexKeyGenerator, CustomKeyGenerator, NonpartitionedKeyGenerator, SimpleKeyGenerator}
import org.apache.hudi.sync.common.HoodieSyncConfig
@@ -45,6 +47,7 @@ import scala.language.implicitConversions
* Options supported for reading hoodie tables.
*/
object DataSourceReadOptions {
import DataSourceOptionsHelper._

val QUERY_TYPE_SNAPSHOT_OPT_VAL = "snapshot"
val QUERY_TYPE_READ_OPTIMIZED_OPT_VAL = "read_optimized"
@@ -124,6 +127,15 @@ object DataSourceReadOptions {
.withDocumentation("Enables data-skipping allowing queries to leverage indexes to reduce the search space by " +
"skipping over files")

val EXTRACT_PARTITION_VALUES_FROM_PARTITION_PATH: ConfigProperty[Boolean] =
ConfigProperty.key("hoodie.datasource.read.extract.partition.values.from.path")
.defaultValue(false)
.sinceVersion("0.11.0")
.withDocumentation("When set to true, values for partition columns (partition values) will be extracted" +
" from physical partition path (default Spark behavior). When set to false partition values will be" +
" read from the data file (in Hudi partition columns are persisted by default)." +
" This config is a fallback allowing to preserve existing behavior, and should not be used otherwise.")

val INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN_FOR_NON_EXISTING_FILES: ConfigProperty[String] = ConfigProperty
.key("hoodie.datasource.read.incr.fallback.fulltablescan.enable")
.defaultValue("false")
@@ -185,6 +197,8 @@
*/
object DataSourceWriteOptions {

import DataSourceOptionsHelper._

val BULK_INSERT_OPERATION_OPT_VAL = WriteOperationType.BULK_INSERT.value
val INSERT_OPERATION_OPT_VAL = WriteOperationType.INSERT.value
val UPSERT_OPERATION_OPT_VAL = WriteOperationType.UPSERT.value
@@ -471,10 +485,7 @@
.sinceVersion("0.9.0")
.withDocumentation("This class is used by kafka client to deserialize the records")

val DROP_PARTITION_COLUMNS: ConfigProperty[Boolean] = ConfigProperty
.key(HoodieTableConfig.DROP_PARTITION_COLUMNS.key())
.defaultValue(HoodieTableConfig.DROP_PARTITION_COLUMNS.defaultValue().booleanValue())
.withDocumentation(HoodieTableConfig.DROP_PARTITION_COLUMNS.doc())
val DROP_PARTITION_COLUMNS: ConfigProperty[Boolean] = HoodieTableConfig.DROP_PARTITION_COLUMNS

/** @deprecated Use {@link HIVE_ASSUME_DATE_PARTITION} and its methods instead */
@Deprecated
@@ -774,4 +785,23 @@ object DataSourceOptionsHelper {
override def apply (input: From): To = function (input)
}
}

implicit def convert[T, U](prop: ConfigProperty[T])(implicit converter: T => U): ConfigProperty[U] = {
checkState(prop.hasDefaultValue)
var newProp: ConfigProperty[U] = ConfigProperty.key(prop.key())
.defaultValue(converter(prop.defaultValue()))
.withDocumentation(prop.doc())
.withAlternatives(prop.getAlternatives.asScala: _*)

newProp = toScalaOption(prop.getSinceVersion) match {
case Some(version) => newProp.sinceVersion(version)
case None => newProp
}
newProp = toScalaOption(prop.getDeprecatedVersion) match {
case Some(version) => newProp.deprecatedAfter(version)
case None => newProp
}

newProp
}

Review comment (Member): This could implicitly break when adding a new config with no default. I see this improves code quality, but we should avoid nice-to-have changes in a last-minute patch before release.

Reply (Contributor, author): If this breaks, it will break when the class is loaded, meaning that all the tests using the class would be broken, which is very easy to diagnose.
}
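For context, a sketch of where this implicit conversion is exercised in the same patch (mirroring the `DROP_PARTITION_COLUMNS` reassignment in `DataSourceWriteOptions` above); the `java.lang.Boolean => Boolean` converter is assumed to come from Scala's standard implicits:

```scala
// HoodieTableConfig.DROP_PARTITION_COLUMNS is a ConfigProperty[java.lang.Boolean].
// Assigning it to a ConfigProperty[Boolean] val triggers `convert`, which copies the key,
// default value, documentation, alternatives, and since/deprecated version metadata.
val DROP_PARTITION_COLUMNS: ConfigProperty[Boolean] = HoodieTableConfig.DROP_PARTITION_COLUMNS
```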
@@ -149,8 +149,36 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,

protected val partitionColumns: Array[String] = tableConfig.getPartitionFields.orElse(Array.empty)

protected val shouldOmitPartitionColumns: Boolean =
metaClient.getTableConfig.shouldDropPartitionColumns && partitionColumns.nonEmpty
/**
* Controls whether partition values (ie values of partition columns) should be
* <ol>
* <li>Extracted from partition path and appended to individual rows read from the data file (we
* delegate this to Spark's [[ParquetFileFormat]])</li>
* <li>Read from the data-file as is (by default Hudi persists all columns including partition ones)</li>
* </ol>
*
* This flag is only relevant in conjunction with the [["hoodie.datasource.write.drop.partition.columns"]]
* config, when Hudi will NOT be persisting partition columns in the data file, and therefore values for
* such partition columns (ie "partition values") will have to be parsed from the partition path and appended
* to every row of the fetched dataset.
*
* NOTE: Partition values extracted from the partition path might deviate from the values of the original
* partition columns: for example, if the original partition column was [[ts]], bearing an epoch
* timestamp, and [[TimestampBasedKeyGenerator]] generated a partition path of the format
* [["yyyy/mm/dd"]], the appended partition value would bear that format verbatim, meaning that the
* string value "2022/01/01" will be appended, and not the original representation
*/
protected val shouldExtractPartitionValuesFromPartitionPath: Boolean = {
// Controls whether partition columns (which are the source for the partition path values) should
// be omitted from persistence in the data files. On the read path it affects whether partition values (values
// of partition columns) will be read from the data file or extracted from the partition path
val shouldOmitPartitionColumns = metaClient.getTableConfig.shouldDropPartitionColumns && partitionColumns.nonEmpty
val shouldExtractPartitionValueFromPath =
optParams.getOrElse(DataSourceReadOptions.EXTRACT_PARTITION_VALUES_FROM_PARTITION_PATH.key,
DataSourceReadOptions.EXTRACT_PARTITION_VALUES_FROM_PARTITION_PATH.defaultValue.toString).toBoolean
shouldOmitPartitionColumns || shouldExtractPartitionValueFromPath
}
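To illustrate the caveat called out in the doc comment above, a hedged sketch (hypothetical table; assumes partition column `ts` was dropped on write and its partition path was generated by `TimestampBasedKeyGenerator`) of what a reader might observe:

```scala
// Sketch: the table was written with hoodie.datasource.write.drop.partition.columns=true,
// so `ts` is reconstructed from the partition path on read. The formatted path value is
// returned verbatim, not the original epoch value that produced it.
val df = spark.read.format("hudi").load(basePath)
df.select("ts").show(1, truncate = false)
// Illustrative output: "2022/01/01" (the path segment), not e.g. the epoch 1640995200
```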

/**
* NOTE: PLEASE READ THIS CAREFULLY
@@ -228,7 +256,6 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,

val fileSplits = collectFileSplits(partitionFilters, dataFilters)


val tableAvroSchemaStr =
if (internalSchema.isEmptySchema) tableAvroSchema.toString
else AvroInternalSchemaConverter.convert(internalSchema, tableAvroSchema.getName).toString
@@ -367,7 +394,7 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
protected def getPartitionColumnsAsInternalRow(file: FileStatus): InternalRow = {
try {
val tableConfig = metaClient.getTableConfig
if (shouldOmitPartitionColumns) {
if (shouldExtractPartitionValuesFromPartitionPath) {
val relativePath = new URI(metaClient.getBasePath).relativize(new URI(file.getPath.getParent.toString)).toString
val hiveStylePartitioningEnabled = tableConfig.getHiveStylePartitioningEnable.toBoolean
if (hiveStylePartitioningEnabled) {
@@ -420,9 +447,6 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
hadoopConf = hadoopConf
)

// We're delegating to Spark to append partition values to every row only in cases
// when these corresponding partition-values are not persisted w/in the data file itself
val shouldAppendPartitionColumns = shouldOmitPartitionColumns
val parquetReader = HoodieDataSourceHelper.buildHoodieParquetReader(
sparkSession = spark,
dataSchema = dataSchema.structTypeSchema,
@@ -431,7 +455,9 @@
filters = filters,
options = options,
hadoopConf = hadoopConf,
appendPartitionValues = shouldAppendPartitionColumns
// We're delegating to Spark to append partition values to every row only in cases
// when these corresponding partition-values are not persisted w/in the data file itself
appendPartitionValues = shouldExtractPartitionValuesFromPartitionPath
)

partitionedFile => {
@@ -448,7 +474,7 @@

private def tryPrunePartitionColumns(tableSchema: HoodieTableSchema,
requiredSchema: HoodieTableSchema): (StructType, HoodieTableSchema, HoodieTableSchema) = {
if (shouldOmitPartitionColumns) {
if (shouldExtractPartitionValuesFromPartitionPath) {
val partitionSchema = StructType(partitionColumns.map(StructField(_, StringType)))
val prunedDataStructSchema = prunePartitionColumns(tableSchema.structTypeSchema)
val prunedRequiredSchema = prunePartitionColumns(requiredSchema.structTypeSchema)
@@ -50,7 +50,6 @@ object HoodieDataSourceHelper extends PredicateHelper with SparkAdapterSupport {
options: Map[String, String],
hadoopConf: Configuration,
appendPartitionValues: Boolean = false): PartitionedFile => Iterator[InternalRow] = {

val parquetFileFormat: ParquetFileFormat = sparkAdapter.createHoodieParquetFileFormat(appendPartitionValues).get
val readParquetFile: PartitionedFile => Iterator[Any] = parquetFileFormat.buildReaderWithPartitionValues(
sparkSession = sparkSession,
@@ -160,7 +160,7 @@ object HoodieSparkSqlWriter {
.setHiveStylePartitioningEnable(hoodieConfig.getBoolean(HIVE_STYLE_PARTITIONING))
.setUrlEncodePartitioning(hoodieConfig.getBoolean(URL_ENCODE_PARTITIONING))
.setPartitionMetafileUseBaseFormat(useBaseFormatMetaFile)
.setDropPartitionColumnsWhenWrite(hoodieConfig.getBooleanOrDefault(HoodieTableConfig.DROP_PARTITION_COLUMNS))
.setShouldDropPartitionColumns(hoodieConfig.getBooleanOrDefault(HoodieTableConfig.DROP_PARTITION_COLUMNS))
.setCommitTimezone(HoodieTimelineTimeZone.valueOf(hoodieConfig.getStringOrDefault(HoodieTableConfig.TIMELINE_TIMEZONE)))
.initTable(sparkContext.hadoopConfiguration, path)
tableConfig = tableMetaClient.getTableConfig