@@ -68,7 +68,7 @@ trait ColumnStatsIndexSupport extends SparkAdapterSupport {
if (targetColumns.nonEmpty) {
readColumnStatsIndexForColumnsInternal(spark, targetColumns, metadataConfig, tableBasePath)
} else {
readFullColumnStatsIndexInternal(spark, tableBasePath)
readFullColumnStatsIndexInternal(spark, metadataConfig, tableBasePath)
}
}

@@ -181,10 +181,11 @@ trait ColumnStatsIndexSupport extends SparkAdapterSupport {
spark.createDataFrame(transposedRDD, indexSchema)
}

private def readFullColumnStatsIndexInternal(spark: SparkSession, tableBasePath: String) = {
private def readFullColumnStatsIndexInternal(spark: SparkSession, metadataConfig: HoodieMetadataConfig, tableBasePath: String): DataFrame = {
val metadataTablePath = HoodieTableMetadata.getMetadataTableBasePath(tableBasePath)
// Read Metadata Table's Column Stats Index into Spark's [[DataFrame]]
spark.read.format("org.apache.hudi")
.options(metadataConfig.getProps.asScala)
.load(s"$metadataTablePath/${MetadataPartitionType.COLUMN_STATS.getPartitionPath}")
}

@@ -85,11 +85,6 @@ case class HoodieFileIndex(spark: SparkSession,

override def rootPaths: Seq[Path] = queryPaths.asScala

def isDataSkippingEnabled: Boolean = {
options.getOrElse(DataSourceReadOptions.ENABLE_DATA_SKIPPING.key(),
spark.sessionState.conf.getConfString(DataSourceReadOptions.ENABLE_DATA_SKIPPING.key(), "false")).toBoolean
}

/**
* Returns the FileStatus for all the base files (excluding log files). This should be used only for
* cases where Spark directly fetches the list of files via HoodieFileIndex or for read optimized query logic
@@ -196,12 +191,20 @@
* @return list of pruned (data-skipped) candidate base-files' names
*/
private def lookupCandidateFilesInMetadataTable(queryFilters: Seq[Expression]): Try[Option[Set[String]]] = Try {
if (!isDataSkippingEnabled || queryFilters.isEmpty || !HoodieTableMetadataUtil.getCompletedMetadataPartitions(metaClient.getTableConfig)
.contains(HoodieTableMetadataUtil.PARTITION_NAME_COLUMN_STATS)) {
// NOTE: Data Skipping is only effective when it references columns that are indexed within
// the Column Stats Index (CSI). The following cases cannot be effectively handled by Data Skipping:
//    - Expressions on a top-level column's nested fields (for ex, filters like "struct.field > 0",
//      since CSI only contains stats for top-level columns, in this case for "struct")
//    - Any expression not directly referencing a top-level column (for ex, sub-queries, since there's
//      nothing CSI in particular could be applied to)
lazy val queryReferencedColumns = collectReferencedColumns(spark, queryFilters, schema)

if (!isMetadataTableEnabled || !isColumnStatsIndexAvailable || !isDataSkippingEnabled) {
Contributor:
Given that CSI does not have stats for non-top-level columns, if a predicate references both top-level and non-top-level columns, are we going to skip leveraging CSI? Since for the non-top-level columns we have to visit all data files anyway?

Contributor:
Also, how do we deduce which columns have been indexed in the MDT CSI?
For example, we have two flows:
a. hoodie.metadata.index.column.stats.all_columns.enable = true, wherein all columns will be indexed.
b. hoodie.metadata.index.column.stats.column.list set to the list of columns to be indexed.

So, when we are looking to apply data skipping on the query side, should we check these configs and decide whether a particular column is indexed by CSI or not?
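For reference, a minimal sketch of the two write-side flows named above — the two column-stats keys are copied from this comment, and treating them as the exact option names in a given Hudi release is an assumption:

// Flow (a): index column stats for every column (key as quoted above; assumed)
val allColumnsWriteOpts = Map(
  "hoodie.metadata.enable" -> "true",
  "hoodie.metadata.index.column.stats.enable" -> "true",
  "hoodie.metadata.index.column.stats.all_columns.enable" -> "true"
)

// Flow (b): index column stats only for an explicit list of columns (key as quoted above; assumed)
val columnListWriteOpts = Map(
  "hoodie.metadata.enable" -> "true",
  "hoodie.metadata.index.column.stats.enable" -> "true",
  "hoodie.metadata.index.column.stats.column.list" -> "id,str" // hypothetical list of columns
)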

Contributor Author:
> Given that CSI does not have stats for non-top-level columns, if a predicate references both top-level and non-top-level columns, are we going to skip leveraging CSI? Since for the non-top-level columns we have to visit all data files anyway?

It depends on the predicate, but we will at least try to leverage it to filter on the top-level columns only.

> So, when we are looking to apply data skipping on the query side, should we check these configs and decide whether a particular column is indexed by CSI or not?

We can't do that; we have to play by what's actually in the index. This is handled when we execute the filter against the lookup table -- if the index doesn't contain the column referenced by the filter, the filter will simply match all of the files.
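As a rough illustration of that behaviour, here's a simplified, hypothetical model of the candidate-file lookup (not Hudi's actual implementation): a file can be pruned only when the index actually carries stats for the filtered column; a column absent from the index matches every file.

// Simplified model: per-file min/max ranges for whichever columns happen to be indexed.
case class ColumnRange(min: Int, max: Int)
case class FileStats(fileName: String, ranges: Map[String, ColumnRange])

// An equality filter `column = value` prunes a file only when stats exist for that column
// and the value falls outside the file's [min, max] range; otherwise the file stays a candidate.
def candidateFiles(files: Seq[FileStats], column: String, value: Int): Set[String] =
  files.filter { file =>
    file.ranges.get(column) match {
      case Some(r) => r.min <= value && value <= r.max // indexed: prune files whose range excludes the value
      case None    => true                             // not indexed: keep the file as a candidate
    }
  }.map(_.fileName).toSet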

Contributor Author:
However, your question made me realize that we're currently deriving the index schema incorrectly. Let me address that.

Contributor Author:
@nsivabalan I'm addressing this problem in a separate PR to avoid overloading this one: #5275

validateConfig()
Option.empty
} else if (queryFilters.isEmpty || queryReferencedColumns.isEmpty) {
Option.empty
} else {
val queryReferencedColumns = collectReferencedColumns(spark, queryFilters, schema)

val colStatsDF: DataFrame = readColumnStatsIndex(spark, basePath, metadataConfig, queryReferencedColumns)

// Persist DF to avoid re-computing column statistics unraveling
@@ -245,13 +248,27 @@

override def refresh(): Unit = super.refresh()

override def inputFiles: Array[String] = {
val fileStatusList = allFiles
fileStatusList.map(_.getPath.toString).toArray
}
override def inputFiles: Array[String] =
allFiles.map(_.getPath.toString).toArray

override def sizeInBytes: Long = {
cachedFileSize
override def sizeInBytes: Long = cachedFileSize

private def isColumnStatsIndexAvailable =
HoodieTableMetadataUtil.getCompletedMetadataPartitions(metaClient.getTableConfig)
.contains(HoodieTableMetadataUtil.PARTITION_NAME_COLUMN_STATS)

private def isDataSkippingEnabled: Boolean =
options.getOrElse(DataSourceReadOptions.ENABLE_DATA_SKIPPING.key(),
spark.sessionState.conf.getConfString(DataSourceReadOptions.ENABLE_DATA_SKIPPING.key(), "false")).toBoolean

private def isMetadataTableEnabled: Boolean = metadataConfig.enabled()
private def isColumnStatsIndexEnabled: Boolean = metadataConfig.isColumnStatsIndexEnabled

private def validateConfig(): Unit = {
if (isDataSkippingEnabled && (!isMetadataTableEnabled || !isColumnStatsIndexEnabled)) {
logWarning("Data skipping requires both the Metadata Table and the Column Stats Index to be enabled! " +
s"(isMetadataTableEnabled = ${isMetadataTableEnabled}, isColumnStatsIndexEnabled = ${isColumnStatsIndexEnabled})")
}
}
}

@@ -25,10 +25,9 @@ import org.apache.hudi.client.HoodieJavaWriteClient
import org.apache.hudi.client.common.HoodieJavaEngineContext
import org.apache.hudi.common.config.HoodieMetadataConfig
import org.apache.hudi.common.engine.EngineType
import org.apache.hudi.common.fs.FSUtils
import org.apache.hudi.common.model.{HoodieRecord, HoodieTableQueryType, HoodieTableType}
import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
import org.apache.hudi.common.model.{HoodieRecord, HoodieTableType}
import org.apache.hudi.common.table.view.HoodieTableFileSystemView
import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
import org.apache.hudi.common.testutils.HoodieTestTable.makeNewCommitTime
import org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings
import org.apache.hudi.common.testutils.{HoodieTestDataGenerator, HoodieTestUtils}
@@ -38,17 +37,15 @@ import org.apache.hudi.config.{HoodieStorageConfig, HoodieWriteConfig}
import org.apache.hudi.keygen.ComplexKeyGenerator
import org.apache.hudi.keygen.TimestampBasedAvroKeyGenerator.TimestampType
import org.apache.hudi.keygen.constant.KeyGeneratorOptions.Config
import org.apache.hudi.metadata.{HoodieTableMetadata, MetadataPartitionType}
import org.apache.hudi.testutils.{HoodieClientTestBase, SparkClientFunctionalTestHarness}
import org.apache.hudi.testutils.HoodieClientTestBase
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, EqualTo, GreaterThanOrEqual, LessThan, Literal}
import org.apache.spark.sql.execution.datasources.{NoopCache, PartitionDirectory}
import org.apache.spark.sql.functions.{lit, struct}
import org.apache.spark.sql.types.{IntegerType, StringType}
import org.apache.spark.sql.{DataFrameWriter, Row, SaveMode, SparkSession}
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.{BeforeEach, Tag, Test}
import org.junit.jupiter.api.{BeforeEach, Test}
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.{Arguments, CsvSource, MethodSource, ValueSource}

@@ -343,16 +340,19 @@ class TestHoodieFileIndex extends HoodieClientTestBase {
import _spark.implicits._
val inputDF = tuples.toDF("id", "inv_id", "str", "rand")

val writeMetadataOpts = Map(
HoodieMetadataConfig.ENABLE.key -> "true",
HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key -> "true"
)

val opts = Map(
"hoodie.insert.shuffle.parallelism" -> "4",
"hoodie.upsert.shuffle.parallelism" -> "4",
HoodieWriteConfig.TBL_NAME.key -> "hoodie_test",
RECORDKEY_FIELD.key -> "id",
PRECOMBINE_FIELD.key -> "id",
HoodieMetadataConfig.ENABLE.key -> "true",
HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key -> "true",
HoodieTableConfig.POPULATE_META_FIELDS.key -> "true"
)
) ++ writeMetadataOpts

// If there are any failures in the Data Skipping flow, test should fail
spark.sqlContext.setConf(DataSkippingFailureMode.configName, DataSkippingFailureMode.Strict.value);
@@ -368,26 +368,46 @@

metaClient = HoodieTableMetaClient.reload(metaClient)

val props = Map[String, String](
"path" -> basePath,
QUERY_TYPE.key -> QUERY_TYPE_SNAPSHOT_OPT_VAL,
DataSourceReadOptions.ENABLE_DATA_SKIPPING.key -> "true",
// NOTE: Metadata Table has to be enabled on the read path as well
HoodieMetadataConfig.ENABLE.key -> "true",
HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key -> "true"
)

val fileIndex = HoodieFileIndex(spark, metaClient, Option.empty, props, NoopCache)

val allFilesPartitions = fileIndex.listFiles(Seq(), Seq())
assertEquals(10, allFilesPartitions.head.files.length)

// We're selecting a single file that contains the "id" == 1 row, of which there should be
// strictly one. Given that 1 is the minimum possible value, Data Skipping should be able to
// truncate the search space to just a single file
val dataFilter = EqualTo(AttributeReference("id", IntegerType, nullable = false)(), Literal(1))
val filteredPartitions = fileIndex.listFiles(Seq(), Seq(dataFilter))
assertEquals(1, filteredPartitions.head.files.length)
case class TestCase(enableMetadata: Boolean,
enableColumnStats: Boolean,
enableDataSkipping: Boolean)

val testCases: Seq[TestCase] =
TestCase(enableMetadata = false, enableColumnStats = false, enableDataSkipping = false) ::
TestCase(enableMetadata = false, enableColumnStats = false, enableDataSkipping = true) ::
TestCase(enableMetadata = true, enableColumnStats = false, enableDataSkipping = true) ::
TestCase(enableMetadata = false, enableColumnStats = true, enableDataSkipping = true) ::
TestCase(enableMetadata = true, enableColumnStats = true, enableDataSkipping = true) ::
Nil

for (testCase <- testCases) {
val readMetadataOpts = Map(
// NOTE: Metadata Table has to be enabled on the read path as well
HoodieMetadataConfig.ENABLE.key -> testCase.enableMetadata.toString,
HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key -> testCase.enableColumnStats.toString,
HoodieTableConfig.POPULATE_META_FIELDS.key -> "true"
)

val props = Map[String, String](
"path" -> basePath,
QUERY_TYPE.key -> QUERY_TYPE_SNAPSHOT_OPT_VAL,
DataSourceReadOptions.ENABLE_DATA_SKIPPING.key -> testCase.enableDataSkipping.toString
) ++ readMetadataOpts

val fileIndex = HoodieFileIndex(spark, metaClient, Option.empty, props, NoopCache)

val allFilesPartitions = fileIndex.listFiles(Seq(), Seq())
assertEquals(10, allFilesPartitions.head.files.length)

if (testCase.enableDataSkipping && testCase.enableMetadata) {
// We're selecting a single file that contains the "id" == 1 row, of which there should be
// strictly one. Given that 1 is the minimum possible value, Data Skipping should be able to
// truncate the search space to just a single file
val dataFilter = EqualTo(AttributeReference("id", IntegerType, nullable = false)(), Literal(1))
val filteredPartitions = fileIndex.listFiles(Seq(), Seq(dataFilter))
assertEquals(1, filteredPartitions.head.files.length)
}
}
}

private def attribute(partition: String): AttributeReference = {
@@ -411,6 +431,7 @@ class TestHoodieFileIndex extends HoodieClientTestBase {
}

object TestHoodieFileIndex {

def keyGeneratorParameters(): java.util.stream.Stream[Arguments] = {
java.util.stream.Stream.of(
Arguments.arguments(null.asInstanceOf[String]),
@@ -19,14 +19,15 @@
package org.apache.hudi.functional

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, LocatedFileStatus, Path}
import org.apache.hadoop.fs.{LocatedFileStatus, Path}
import org.apache.hudi.ColumnStatsIndexSupport.composeIndexSchema
import org.apache.hudi.DataSourceWriteOptions.{PRECOMBINE_FIELD, RECORDKEY_FIELD}
import org.apache.hudi.HoodieConversionUtils.toProperties
import org.apache.hudi.common.config.HoodieMetadataConfig
import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
import org.apache.hudi.common.util.ParquetUtils
import org.apache.hudi.config.{HoodieStorageConfig, HoodieWriteConfig}
import org.apache.hudi.functional.TestColumnStatsIndex.ColumnStatsTestCase
import org.apache.hudi.testutils.HoodieClientTestBase
import org.apache.hudi.{ColumnStatsIndexSupport, DataSourceWriteOptions}
import org.apache.spark.sql._
@@ -35,7 +36,7 @@ import org.apache.spark.sql.types._
import org.junit.jupiter.api.Assertions.{assertEquals, assertNotNull, assertTrue}
import org.junit.jupiter.api._
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.ValueSource
import org.junit.jupiter.params.provider.{Arguments, MethodSource}

import java.math.BigInteger
import java.sql.{Date, Timestamp}
@@ -72,19 +73,25 @@
}

@ParameterizedTest
@ValueSource(booleans = Array(true, false))
def testMetadataColumnStatsIndex(forceFullLogScan: Boolean): Unit = {
@MethodSource(Array("testMetadataColumnStatsIndexParams"))
def testMetadataColumnStatsIndex(testCase: ColumnStatsTestCase): Unit = {
val metadataOpts = Map(
HoodieMetadataConfig.ENABLE.key -> "true",
HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key -> "true"
)

val opts = Map(
"hoodie.insert.shuffle.parallelism" -> "4",
"hoodie.upsert.shuffle.parallelism" -> "4",
HoodieWriteConfig.TBL_NAME.key -> "hoodie_test",
RECORDKEY_FIELD.key -> "c1",
PRECOMBINE_FIELD.key -> "c1",
HoodieMetadataConfig.ENABLE.key -> "true",
HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key -> "true",
HoodieMetadataConfig.ENABLE_FULL_SCAN_LOG_FILES.key -> forceFullLogScan.toString,
// NOTE: Currently this setting is used differently by the different MT partitions:
//    - Files: uses it
//    - Column Stats: does NOT use it (defaults to doing "point-lookups")
HoodieMetadataConfig.ENABLE_FULL_SCAN_LOG_FILES.key -> testCase.forceFullLogScan.toString,
HoodieTableConfig.POPULATE_META_FIELDS.key -> "true"
)
) ++ metadataOpts

setTableName("hoodie_test")
initMetaClient()
@@ -108,10 +115,17 @@
metaClient = HoodieTableMetaClient.reload(metaClient)

val metadataConfig = HoodieMetadataConfig.newBuilder()
.fromProperties(toProperties(opts))
.fromProperties(toProperties(metadataOpts))
.build()

val colStatsDF = readColumnStatsIndex(spark, basePath, metadataConfig, sourceTableSchema.fieldNames)
val targetColumnsToRead: Seq[String] = {
// Providing an empty seq of columns to [[readColumnStatsIndex]] will lead to the whole
// MT being read and subsequently filtered
if (testCase.readFullMetadataTable) Seq.empty
else sourceTableSchema.fieldNames
}

val colStatsDF = readColumnStatsIndex(spark, basePath, metadataConfig, targetColumnsToRead)
val transposedColStatsDF = transposeColumnStatsIndex(spark, colStatsDF, sourceTableSchema.fieldNames, sourceTableSchema)

val expectedColStatsSchema = composeIndexSchema(sourceTableSchema.fieldNames, sourceTableSchema)
@@ -151,7 +165,7 @@

metaClient = HoodieTableMetaClient.reload(metaClient)

val updatedColStatsDF = readColumnStatsIndex(spark, basePath, metadataConfig, sourceTableSchema.fieldNames)
val updatedColStatsDF = readColumnStatsIndex(spark, basePath, metadataConfig, targetColumnsToRead)
val transposedUpdatedColStatsDF = transposeColumnStatsIndex(spark, updatedColStatsDF, sourceTableSchema.fieldNames, sourceTableSchema)

val expectedColStatsIndexUpdatedDF =
@@ -243,26 +257,6 @@
)
}

def bootstrapParquetInputTableFromJSON(sourceJSONTablePath: String, targetParquetTablePath: String): Unit = {
val jsonInputDF =
// NOTE: Schema here is provided for validation that the input data is in the appropriate format
spark.read
.schema(sourceTableSchema)
.json(sourceJSONTablePath)

jsonInputDF
.sort("c1")
.repartition(4, new Column("c1"))
.write
.format("parquet")
.mode("overwrite")
.save(targetParquetTablePath)

val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)
// Have to cleanup additional artefacts of Spark write
fs.delete(new Path(targetParquetTablePath, "_SUCCESS"), false)
}

private def generateRandomDataFrame(spark: SparkSession): DataFrame = {
val sourceTableSchema =
new StructType()
@@ -316,3 +310,14 @@
}

}

object TestColumnStatsIndex {

case class ColumnStatsTestCase(forceFullLogScan: Boolean, readFullMetadataTable: Boolean)

def testMetadataColumnStatsIndexParams: java.util.stream.Stream[Arguments] =
java.util.stream.Stream.of(
Arguments.arguments(ColumnStatsTestCase(forceFullLogScan = false, readFullMetadataTable = false)),
Arguments.arguments(ColumnStatsTestCase(forceFullLogScan = true, readFullMetadataTable = true))
)
}