@@ -54,6 +54,16 @@ class BaseFileOnlyRelation(sqlContext: SQLContext,

override type FileSplit = HoodieBaseFileSplit

// TODO(HUDI-3204) this overrides the behavior (exclusively) for COW tables to always extract
// partition values from the partition path
// For more details please check HUDI-4161
// NOTE: This override has to mirror the semantics of converting this Relation into [[HadoopFsRelation]],
// which is currently done in all cases except when Schema Evolution is enabled
override protected val shouldExtractPartitionValuesFromPartitionPath: Boolean = {
val enableSchemaOnRead = !internalSchema.isEmptySchema
!enableSchemaOnRead
}

override lazy val mandatoryFields: Seq[String] =
// TODO reconcile, record's key shouldn't be mandatory for base-file only relation
Seq(recordKeyField)
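
A minimal sketch (not part of this diff) of the practical effect of the override above, assuming a Spark session and a COW table under basePath whose partition column is named "partition" and which was written with a TimestampBasedKeyGenerator using the "yyyy/MM/dd" output format, as in the tests further down:

// With schema-on-read disabled, this Relation is converted into HadoopFsRelation and partition
// values are extracted from the partition path, so the partition column surfaces the physical
// path segments verbatim.
val snapshotDF = spark.read.format("hudi").load(basePath)

// e.g. "2022/01/01" rather than the original logical value "2022-01-01"
snapshotDF.select("partition").distinct().show(false)
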
@@ -171,7 +171,7 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
protected val shouldExtractPartitionValuesFromPartitionPath: Boolean = {
// Controls whether partition columns (which are the source for the partition path values) should
// be omitted from persistence in the data files. On the read path it affects whether partition values (values
// of partition columns) will be read from the data file ot extracted from partition path
// of partition columns) will be read from the data file or extracted from partition path
val shouldOmitPartitionColumns = metaClient.getTableConfig.shouldDropPartitionColumns && partitionColumns.nonEmpty
val shouldExtractPartitionValueFromPath =
optParams.getOrElse(DataSourceReadOptions.EXTRACT_PARTITION_VALUES_FROM_PARTITION_PATH.key,
@@ -419,7 +419,7 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
}
} catch {
case NonFatal(e) =>
logWarning(s"Failed to get the right partition InternalRow for file : ${file.toString}")
logWarning(s"Failed to get the right partition InternalRow for file: ${file.toString}", e)
InternalRow.empty
}
}
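
For completeness, a minimal sketch (not part of this diff) of setting the read option referenced in the hunk above explicitly; a Spark session and a Hudi table at basePath are assumed:

import org.apache.hudi.DataSourceReadOptions

// Forces partition values to be extracted from the partition path instead of being read from
// the data files. Note that the new COW-specific override in BaseFileOnlyRelation enables this
// behavior regardless of the option, unless schema-on-read (schema evolution) is in effect.
val df = spark.read.format("hudi")
  .option(DataSourceReadOptions.EXTRACT_PARTITION_VALUES_FROM_PARTITION_PATH.key, "true")
  .load(basePath)
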
@@ -108,9 +108,6 @@ case class HoodieFileIndex(spark: SparkSession,
* @return list of PartitionDirectory containing partition to base files mapping
*/
override def listFiles(partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Seq[PartitionDirectory] = {
val convertedPartitionFilters =

Contributor Author:
This has to be removed since partition values are now populated from the partition path as-is, and therefore users will have to query them as they are -- no conversion is needed anymore.

Member:
this is also used in org.apache.hudi.MergeOnReadSnapshotRelation#collectFileSplits

Member:
this means some behavior changes: users still set the timestamp keygen for writing, but query the physical partition paths directly without any keygen settings. It also means convertFilterForTimestampKeyGenerator() won't be needed?

Contributor Author:
MOR still does the conversion w/in the Relation itself

Contributor Author:
@xushiyan this reverts us back to behavior we had in 0.10 for COW -- original columns will be overridden with partition values by Spark

HoodieFileIndex.convertFilterForTimestampKeyGenerator(metaClient, partitionFilters)

// Look up candidate file names in the col-stats index, if all of the following conditions are true
// - Data-skipping is enabled
// - Col-Stats Index is present
@@ -144,7 +141,7 @@ case class HoodieFileIndex(spark: SparkSession,
Seq(PartitionDirectory(InternalRow.empty, candidateFiles))
} else {
// Prune the partition path by the partition filters
val prunedPartitions = prunePartition(cachedAllInputFileSlices.keySet.asScala.toSeq, convertedPartitionFilters)
val prunedPartitions = prunePartition(cachedAllInputFileSlices.keySet.asScala.toSeq, partitionFilters)
var totalFileSize = 0
var candidateFileSize = 0

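The behavior change discussed in the review thread above, as a minimal sketch (not part of this diff) against a COW table written with a TimestampBasedKeyGenerator ("yyyy/MM/dd" output format): since partition filters are now passed to prunePartition without being rewritten by convertFilterForTimestampKeyGenerator, a pruning predicate has to match the physical partition path as-is.

// Matches the partition path verbatim, so both partition pruning and the returned rows line up.
val pruned = spark.read.format("hudi")
  .load(basePath)
  .where("partition = '2022/01/01'")

// A filter on the original logical value, e.g. "partition = '2022-01-01'", no longer matches
// for COW snapshot queries; MOR still performs the conversion within its own Relation.
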
@@ -144,7 +144,7 @@ class TestCOWDataSource extends HoodieClientTestBase {
def testPrunePartitionForTimestampBasedKeyGenerator(): Unit = {
val options = commonOpts ++ Map(
"hoodie.compact.inline" -> "false",
DataSourceWriteOptions.TABLE_TYPE.key -> DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL,
DataSourceWriteOptions.TABLE_TYPE.key -> DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL,
DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key -> "org.apache.hudi.keygen.TimestampBasedKeyGenerator",
Config.TIMESTAMP_TYPE_FIELD_PROP -> "DATE_STRING",
Config.TIMESTAMP_OUTPUT_DATE_FORMAT_PROP -> "yyyy/MM/dd",
@@ -176,8 +176,11 @@ class TestCOWDataSource extends HoodieClientTestBase {

// snapshot query
val snapshotQueryRes = spark.read.format("hudi").load(basePath)
assertEquals(snapshotQueryRes.where("partition = '2022-01-01'").count, 20)
assertEquals(snapshotQueryRes.where("partition = '2022-01-02'").count, 30)
// TODO(HUDI-3204) we have to revert this to pre-existing behavior from 0.10
//assertEquals(snapshotQueryRes.where("partition = '2022-01-01'").count, 20)
//assertEquals(snapshotQueryRes.where("partition = '2022-01-02'").count, 30)
assertEquals(snapshotQueryRes.where("partition = '2022/01/01'").count, 20)
assertEquals(snapshotQueryRes.where("partition = '2022/01/02'").count, 30)

// incremental query
val incrementalQueryRes = spark.read.format("hudi")
@@ -961,10 +964,14 @@ class TestCOWDataSource extends HoodieClientTestBase {
assert(firstDF.count() == 2)

// data_date is the partition field. Persist to the parquet file using the origin values, and read it.
assertEquals(
Seq("2018-09-23", "2018-09-24"),
firstDF.select("data_date").map(_.get(0).toString).collect().sorted.toSeq
)
// TODO(HUDI-3204) we have to revert this to pre-existing behavior from 0.10
val expectedValues = if (useGlobbing) {
Seq("2018-09-23", "2018-09-24")
} else {
Seq("2018/09/23", "2018/09/24")
}

assertEquals(expectedValues, firstDF.select("data_date").map(_.get(0).toString).collect().sorted.toSeq)
assertEquals(
Seq("2018/09/23", "2018/09/24"),
firstDF.select("_hoodie_partition_path").map(_.get(0).toString).collect().sorted.toSeq
@@ -17,11 +17,10 @@

package org.apache.hudi.functional

import org.apache.avro.generic.GenericRecord
import org.apache.hadoop.fs.Path
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.common.config.HoodieMetadataConfig
import org.apache.hudi.common.model.{DefaultHoodieRecordPayload, HoodieRecord, HoodieRecordPayload, HoodieTableType}
import org.apache.hudi.common.model.{DefaultHoodieRecordPayload, HoodieTableType}
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.common.testutils.HoodieTestDataGenerator
import org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings
@@ -30,9 +29,8 @@ import org.apache.hudi.index.HoodieIndex.IndexType
import org.apache.hudi.keygen.NonpartitionedKeyGenerator
import org.apache.hudi.keygen.constant.KeyGeneratorOptions.Config
import org.apache.hudi.testutils.{DataSourceTestUtils, HoodieClientTestBase}
import org.apache.hudi.{AvroConversionUtils, DataSourceReadOptions, DataSourceWriteOptions, HoodieDataSourceHelpers, HoodieSparkUtils, SparkDatasetMixin}
import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, HoodieDataSourceHelpers, SparkDatasetMixin}
import org.apache.log4j.LogManager
import org.apache.spark.rdd.RDD
import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.BooleanType
@@ -41,7 +39,6 @@ import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.CsvSource

import java.util
import scala.collection.JavaConversions._
import scala.collection.JavaConverters._

@@ -864,8 +861,11 @@ class TestMORDataSource extends HoodieClientTestBase with SparkDatasetMixin {
val readOptimizedQueryRes = spark.read.format("hudi")
.option(DataSourceReadOptions.QUERY_TYPE.key, DataSourceReadOptions.QUERY_TYPE_READ_OPTIMIZED_OPT_VAL)
.load(basePath)
assertEquals(readOptimizedQueryRes.where("partition = '2022-01-01'").count, 50)
assertEquals(readOptimizedQueryRes.where("partition = '2022-01-02'").count, 60)
// TODO(HUDI-3204) we have to revert this to pre-existing behavior from 0.10
//assertEquals(readOptimizedQueryRes.where("partition = '2022-01-01'").count, 50)
//assertEquals(readOptimizedQueryRes.where("partition = '2022-01-02'").count, 60)
assertEquals(readOptimizedQueryRes.where("partition = '2022/01/01'").count, 50)
assertEquals(readOptimizedQueryRes.where("partition = '2022/01/02'").count, 60)

// incremental query
val incrementalQueryRes = spark.read.format("hudi")