diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/util/JFunction.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/util/JFunction.scala index 4a7dca840876d..8a612f4da2c64 100644 --- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/util/JFunction.scala +++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/util/JFunction.scala @@ -17,15 +17,43 @@ package org.apache.hudi.util +import org.apache.hudi.common.function.{SerializableFunction, SerializablePairFunction} +import org.apache.hudi.common.util.collection + +import scala.language.implicitConversions + /** * Utility allowing for seamless conversion b/w Java/Scala functional primitives */ object JFunction { - def toScala[T, R](f: java.util.function.Function[T, R]): T => R = + //////////////////////////////////////////////////////////// + // From Java to Scala + //////////////////////////////////////////////////////////// + + implicit def toScala[T, R](f: java.util.function.Function[T, R]): T => R = (t: T) => f.apply(t) - def toJava[T](f: T => Unit): java.util.function.Consumer[T] = + //////////////////////////////////////////////////////////// + // From Scala to Java + //////////////////////////////////////////////////////////// + + implicit def toJavaFunction[T, R](f: Function[T, R]): java.util.function.Function[T, R] = + new java.util.function.Function[T, R] { + override def apply(t: T): R = f.apply(t) + } + + implicit def toJavaSerializableFunction[T, R](f: Function[T, R]): SerializableFunction[T, R] = + new SerializableFunction[T, R] { + override def apply(t: T): R = f.apply(t) + } + + implicit def toJavaSerializablePairFunction[T, K, V](f: Function[T, collection.Pair[K, V]]): SerializablePairFunction[T, K, V] = + new SerializablePairFunction[T, K, V] { + override def call(t: T): collection.Pair[K, V] = f.apply(t) + } + + implicit def toJava[T](f: T => Unit): java.util.function.Consumer[T] = new java.util.function.Consumer[T] { override def accept(t: T): Unit = f.apply(t) } diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala index cd30528798d66..24f4e6117a686 100644 --- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala +++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala @@ -27,12 +27,16 @@ import org.apache.spark.sql.catalyst.catalog.CatalogTable import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder import org.apache.spark.sql.catalyst.expressions.{Expression, InterpretedPredicate} import org.apache.spark.sql.catalyst.parser.ParserInterface +import org.apache.spark.sql.catalyst.plans.JoinType +import org.apache.spark.sql.catalyst.plans.logical.{Join, LogicalPlan, SubqueryAlias} +import org.apache.spark.sql.catalyst.{AliasIdentifier, TableIdentifier} import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias} import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat import org.apache.spark.sql.execution.datasources.{FilePartition, LogicalRelation, PartitionedFile, SparkParsePartitionUtil} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.DataType import org.apache.spark.sql.{HoodieCatalystExpressionUtils, HoodieCatalystPlansUtils, Row, SparkSession} +import org.apache.spark.storage.StorageLevel import java.util.Locale @@ -138,4 +142,9 @@ trait SparkAdapter extends 
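// Illustrative sketch (not part of the patch) of how the new JFunction converters are used from
// Scala call sites. The explicit toJavaSerializableFunction form is the one the
// ColumnStatsIndexSupport changes below rely on, since the patch notes that explicit conversion
// is required for Scala 2.11. Names and values here are hypothetical.
import org.apache.hudi.common.function.SerializableFunction
import org.apache.hudi.util.JFunction

object JFunctionUsageSketch {
  def main(args: Array[String]): Unit = {
    // Java -> Scala: adapt a java.util.function.Function into a plain Scala function
    val javaFn: java.util.function.Function[String, String] = new java.util.function.Function[String, String] {
      override def apply(s: String): String = s.toUpperCase
    }
    val scalaFn: String => String = JFunction.toScala(javaFn)
    println(scalaFn("hudi")) // HUDI

    // Scala -> Java: wrap a Scala lambda into Hudi's SerializableFunction so it can be shipped
    // to executors through the HoodieData / engine-context APIs
    val serializableFn: SerializableFunction[String, Int] =
      JFunction.toJavaSerializableFunction((s: String) => s.length)
    println(serializableFn.apply("hudi")) // 4
  }
}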
Serializable { * TODO move to HoodieCatalystExpressionUtils */ def createInterpretedPredicate(e: Expression): InterpretedPredicate + + /** + * Converts instance of [[StorageLevel]] to a corresponding string + */ + def convertStorageLevelToString(level: StorageLevel): String } diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java index e10c372be64c7..8828ceab6dcda 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java @@ -1504,7 +1504,7 @@ public void testColStatsPrefixLookup() throws IOException { // prefix search for column (_hoodie_record_key) ColumnIndexID columnIndexID = new ColumnIndexID(HoodieRecord.RECORD_KEY_METADATA_FIELD); List> result = tableMetadata.getRecordsByKeyPrefixes(Collections.singletonList(columnIndexID.asBase64EncodedString()), - MetadataPartitionType.COLUMN_STATS.getPartitionPath()).collectAsList(); + MetadataPartitionType.COLUMN_STATS.getPartitionPath(), true).collectAsList(); // there are 3 partitions in total and 2 commits. total entries should be 6. assertEquals(result.size(), 6); @@ -1515,7 +1515,7 @@ public void testColStatsPrefixLookup() throws IOException { // prefix search for col(_hoodie_record_key) and first partition. only 2 files should be matched PartitionIndexID partitionIndexID = new PartitionIndexID(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH); result = tableMetadata.getRecordsByKeyPrefixes(Collections.singletonList(columnIndexID.asBase64EncodedString().concat(partitionIndexID.asBase64EncodedString())), - MetadataPartitionType.COLUMN_STATS.getPartitionPath()).collectAsList(); + MetadataPartitionType.COLUMN_STATS.getPartitionPath(), true).collectAsList(); // 1 partition and 2 commits. total entries should be 2. assertEquals(result.size(), 2); result.forEach(entry -> { @@ -1534,7 +1534,7 @@ public void testColStatsPrefixLookup() throws IOException { // prefix search for column {commit time} and first partition columnIndexID = new ColumnIndexID(HoodieRecord.COMMIT_TIME_METADATA_FIELD); result = tableMetadata.getRecordsByKeyPrefixes(Collections.singletonList(columnIndexID.asBase64EncodedString().concat(partitionIndexID.asBase64EncodedString())), - MetadataPartitionType.COLUMN_STATS.getPartitionPath()).collectAsList(); + MetadataPartitionType.COLUMN_STATS.getPartitionPath(), true).collectAsList(); // 1 partition and 2 commits. total entries should be 2. assertEquals(result.size(), 2); diff --git a/hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java b/hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java index ec70653b9c124..0b2c34618ed6c 100644 --- a/hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java +++ b/hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java @@ -38,6 +38,7 @@ import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Path; +import org.apache.hudi.hadoop.CachingPath; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; @@ -62,7 +63,7 @@ *
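// One plausible way a concrete SparkAdapter implementation could satisfy the new
// convertStorageLevelToString contract declared above (a hypothetical sketch, not the code shipped
// with Hudi): map the well-known StorageLevel singletons back to the names accepted by
// StorageLevel.fromString, since HoodieData#persist in this patch takes the level as a string.
import org.apache.spark.storage.StorageLevel

def convertStorageLevelToStringSketch(level: StorageLevel): String = level match {
  case StorageLevel.NONE => "NONE"
  case StorageLevel.DISK_ONLY => "DISK_ONLY"
  case StorageLevel.MEMORY_ONLY => "MEMORY_ONLY"
  case StorageLevel.MEMORY_ONLY_SER => "MEMORY_ONLY_SER"
  case StorageLevel.MEMORY_AND_DISK => "MEMORY_AND_DISK"
  case StorageLevel.MEMORY_AND_DISK_SER => "MEMORY_AND_DISK_SER"
  case StorageLevel.OFF_HEAP => "OFF_HEAP"
  case other => throw new IllegalArgumentException(s"Unsupported StorageLevel: $other")
}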
  • Query instant/range
  • * */ -public abstract class BaseHoodieTableFileIndex { +public abstract class BaseHoodieTableFileIndex { private static final Logger LOG = LogManager.getLogger(BaseHoodieTableFileIndex.class); @@ -166,6 +167,11 @@ public Map> listFileSlices() { .collect(Collectors.toMap(e -> e.getKey().path, Map.Entry::getValue)); } + public int getFileSlicesCount() { + return cachedAllInputFileSlices.values().stream() + .mapToInt(List::size).sum(); + } + protected List getAllQueryPartitionPaths() { List queryRelativePartitionPaths = queryPaths.stream() .map(path -> FSUtils.getRelativePartitionPath(new Path(basePath), path)) @@ -349,10 +355,10 @@ public String getPath() { Path fullPartitionPath(String basePath) { if (!path.isEmpty()) { - return new Path(basePath, path); + return new CachingPath(basePath, path); } - return new Path(basePath); + return new CachingPath(basePath); } @Override diff --git a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java index 2cd08b9ae9805..b16373ef83436 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java @@ -187,6 +187,26 @@ public final class HoodieMetadataConfig extends HoodieConfig { .sinceVersion("0.11.0") .withDocumentation("Comma-separated list of columns for which column stats index will be built. If not set, all columns will be indexed"); + public static final String COLUMN_STATS_INDEX_PROCESSING_MODE_IN_MEMORY = "in-memory"; + public static final String COLUMN_STATS_INDEX_PROCESSING_MODE_ENGINE = "engine"; + + public static final ConfigProperty COLUMN_STATS_INDEX_PROCESSING_MODE_OVERRIDE = ConfigProperty + .key(METADATA_PREFIX + ".index.column.stats.processing.mode.override") + .noDefaultValue() + .withValidValues(COLUMN_STATS_INDEX_PROCESSING_MODE_IN_MEMORY, COLUMN_STATS_INDEX_PROCESSING_MODE_ENGINE) + .sinceVersion("0.12.0") + .withDocumentation("By default Column Stats Index is automatically determining whether it should be read and processed either" + + "'in-memory' (w/in executing process) or using Spark (on a cluster), based on some factors like the size of the Index " + + "and how many columns are read. This config allows to override this behavior."); + + public static final ConfigProperty COLUMN_STATS_INDEX_IN_MEMORY_PROJECTION_THRESHOLD = ConfigProperty + .key(METADATA_PREFIX + ".index.column.stats.inMemory.projection.threshold") + .defaultValue(100000) + .sinceVersion("0.12.0") + .withDocumentation("When reading Column Stats Index, if the size of the expected resulting projection is below the in-memory" + + " threshold (counted by the # of rows), it will be attempted to be loaded \"in-memory\" (ie not using the execution engine" + + " like Spark, Flink, etc). 
If the value is above the threshold execution engine will be used to compose the projection."); + public static final ConfigProperty BLOOM_FILTER_INDEX_FOR_COLUMNS = ConfigProperty .key(METADATA_PREFIX + ".index.bloom.filter.column.list") .noDefaultValue() @@ -246,6 +266,14 @@ public List getColumnsEnabledForColumnStatsIndex() { return StringUtils.split(getString(COLUMN_STATS_INDEX_FOR_COLUMNS), CONFIG_VALUES_DELIMITER); } + public String getColumnStatsIndexProcessingModeOverride() { + return getString(COLUMN_STATS_INDEX_PROCESSING_MODE_OVERRIDE); + } + + public Integer getColumnStatsIndexInMemoryProjectionThreshold() { + return getIntOrDefault(COLUMN_STATS_INDEX_IN_MEMORY_PROJECTION_THRESHOLD); + } + public List getColumnsEnabledForBloomFilterIndex() { return StringUtils.split(getString(BLOOM_FILTER_INDEX_FOR_COLUMNS), CONFIG_VALUES_DELIMITER); } diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java b/hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java index f029995ba0122..9877755b3c8c6 100644 --- a/hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java @@ -168,7 +168,7 @@ public Map, HoodieMetadataColumnStats> getColumnStats(final } @Override - public HoodieData> getRecordsByKeyPrefixes(List keyPrefixes, String partitionName) { + public HoodieData> getRecordsByKeyPrefixes(List keyPrefixes, String partitionName, boolean shouldLoadInMemory) { throw new HoodieMetadataException("Unsupported operation: getRecordsByKeyPrefixes!"); } } diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java index e96889f044a9a..f8a0389da3d4c 100644 --- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java @@ -29,6 +29,7 @@ import org.apache.hudi.common.config.HoodieMetadataConfig; import org.apache.hudi.common.config.SerializableConfiguration; import org.apache.hudi.common.data.HoodieData; +import org.apache.hudi.common.data.HoodieListData; import org.apache.hudi.common.engine.HoodieEngineContext; import org.apache.hudi.common.function.SerializableFunction; import org.apache.hudi.common.model.FileSlice; @@ -143,10 +144,11 @@ protected Option> getRecordByKey(String key, @Override public HoodieData> getRecordsByKeyPrefixes(List keyPrefixes, - String partitionName) { + String partitionName, + boolean shouldLoadInMemory) { // Sort the columns so that keys are looked up in order - List sortedkeyPrefixes = new ArrayList<>(keyPrefixes); - Collections.sort(sortedkeyPrefixes); + List sortedKeyPrefixes = new ArrayList<>(keyPrefixes); + Collections.sort(sortedKeyPrefixes); // NOTE: Since we partition records to a particular file-group by full key, we will have // to scan all file-groups for all key-prefixes as each of these might contain some @@ -154,44 +156,44 @@ public HoodieData> getRecordsByKeyPrefixes(L List partitionFileSlices = HoodieTableMetadataUtil.getPartitionLatestMergedFileSlices(metadataMetaClient, partitionName); - return engineContext.parallelize(partitionFileSlices) - .flatMap( - (SerializableFunction>>>>) fileSlice -> { - // NOTE: Since this will be executed by executors, we can't access previously cached - // readers, and therefore have to always open new ones - Pair 
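// Sketch (illustrative only) of how the two new HoodieMetadataConfig options above can be supplied
// as read options; the TestHoodieFileIndex changes further below pass them the same way. The
// SparkSession and table path are placeholders.
import org.apache.hudi.DataSourceReadOptions
import org.apache.hudi.common.config.HoodieMetadataConfig
import org.apache.spark.sql.{DataFrame, SparkSession}

def readWithColStatsOverride(spark: SparkSession, basePath: String): DataFrame = {
  spark.read.format("hudi")
    // data skipping is what consults the Column Stats Index during file pruning
    .option(DataSourceReadOptions.ENABLE_DATA_SKIPPING.key, "true")
    .option(HoodieMetadataConfig.ENABLE.key, "true")
    // force the projection to be read and processed on the driver, bypassing the row-count
    // heuristic; COLUMN_STATS_INDEX_PROCESSING_MODE_ENGINE would force on-cluster processing instead
    .option(HoodieMetadataConfig.COLUMN_STATS_INDEX_PROCESSING_MODE_OVERRIDE.key,
      HoodieMetadataConfig.COLUMN_STATS_INDEX_PROCESSING_MODE_IN_MEMORY)
    // or, instead of the override, tune the threshold (default: 100000 projected rows)
    .option(HoodieMetadataConfig.COLUMN_STATS_INDEX_IN_MEMORY_PROJECTION_THRESHOLD.key, "200000")
    .load(basePath)
}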
readers = - openReaders(partitionName, fileSlice); - try { - List timings = new ArrayList<>(); - - HoodieFileReader baseFileReader = readers.getKey(); - HoodieMetadataMergedLogRecordReader logRecordScanner = readers.getRight(); - - if (baseFileReader == null && logRecordScanner == null) { - // TODO: what do we do if both does not exist? should we throw an exception and let caller do the fallback ? - return Collections.emptyIterator(); - } - - boolean fullKeys = false; - - Map>> logRecords = - readLogRecords(logRecordScanner, sortedkeyPrefixes, fullKeys, timings); - - List>>> mergedRecords = - readFromBaseAndMergeWithLogRecords(baseFileReader, sortedkeyPrefixes, fullKeys, logRecords, timings, partitionName); - - LOG.debug(String.format("Metadata read for %s keys took [baseFileRead, logMerge] %s ms", - sortedkeyPrefixes.size(), timings)); - - return mergedRecords.iterator(); - } catch (IOException ioe) { - throw new HoodieIOException("Error merging records from metadata table for " + sortedkeyPrefixes.size() + " key : ", ioe); - } finally { - closeReader(readers); - } + return (shouldLoadInMemory ? HoodieListData.lazy(partitionFileSlices) : engineContext.parallelize(partitionFileSlices)) + .flatMap((SerializableFunction>>) fileSlice -> { + // NOTE: Since this will be executed by executors, we can't access previously cached + // readers, and therefore have to always open new ones + Pair readers = + openReaders(partitionName, fileSlice); + + try { + List timings = new ArrayList<>(); + + HoodieFileReader baseFileReader = readers.getKey(); + HoodieMetadataMergedLogRecordReader logRecordScanner = readers.getRight(); + + if (baseFileReader == null && logRecordScanner == null) { + // TODO: what do we do if both does not exist? should we throw an exception and let caller do the fallback ? + return Collections.emptyIterator(); } - ) - .map(keyRecordPair -> keyRecordPair.getValue().orElse(null)) + + boolean fullKeys = false; + + Map>> logRecords = + readLogRecords(logRecordScanner, sortedKeyPrefixes, fullKeys, timings); + + List>>> mergedRecords = + readFromBaseAndMergeWithLogRecords(baseFileReader, sortedKeyPrefixes, fullKeys, logRecords, timings, partitionName); + + LOG.debug(String.format("Metadata read for %s keys took [baseFileRead, logMerge] %s ms", + sortedKeyPrefixes.size(), timings)); + + return mergedRecords.stream() + .map(keyRecordPair -> keyRecordPair.getValue().orElse(null)) + .iterator(); + } catch (IOException ioe) { + throw new HoodieIOException("Error merging records from metadata table for " + sortedKeyPrefixes.size() + " key : ", ioe); + } finally { + closeReader(readers); + } + }) .filter(Objects::nonNull); } diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadata.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadata.java index a059b5784556c..ae871e3be0c03 100644 --- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadata.java +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadata.java @@ -170,7 +170,8 @@ Map, HoodieMetadataColumnStats> getColumnStats(final List
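// Sketch (hypothetical helper, for illustration) of what the new shouldLoadInMemory flag changes:
// only the container backing the HoodieData pipeline differs -- a driver-local lazy list vs. an
// engine-parallelized collection -- while the transformation chain stays identical.
import org.apache.hudi.common.data.{HoodieData, HoodieListData}
import org.apache.hudi.common.engine.HoodieEngineContext
import org.apache.hudi.common.model.FileSlice
import org.apache.hudi.util.JFunction

import scala.collection.JavaConverters._

def fileSliceIds(engineContext: HoodieEngineContext,
                 partitionFileSlices: java.util.List[FileSlice],
                 shouldLoadInMemory: Boolean): Seq[String] = {
  val fileSlices: HoodieData[FileSlice] =
    if (shouldLoadInMemory) {
      HoodieListData.`lazy`(partitionFileSlices) // evaluated within the invoking process (backticks: `lazy` is a Scala keyword)
    } else {
      engineContext.parallelize(partitionFileSlices) // evaluated on the cluster (e.g. RDD-backed)
    }

  fileSlices
    .map(JFunction.toJavaSerializableFunction((fs: FileSlice) => fs.getFileId))
    .collectAsList()
    .asScala
    .toList
}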

    > getRecordsByKeyPrefixes(List keyPrefixes, - String partitionName); + String partitionName, + boolean shouldLoadInMemory); /** * Get the instant time to which the metadata is synced w.r.t data timeline. diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/stats/ColumnStatsIndices.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/stats/ColumnStatsIndices.java index 8ec0eafde3f2c..75e10341db79a 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/stats/ColumnStatsIndices.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/stats/ColumnStatsIndices.java @@ -319,7 +319,7 @@ private static List readColumnStatsIndexByColumns( .map(colName -> new ColumnIndexID(colName).asBase64EncodedString()).collect(Collectors.toList()); HoodieData> records = - metadataTable.getRecordsByKeyPrefixes(encodedTargetColumnNames, HoodieTableMetadataUtil.PARTITION_NAME_COLUMN_STATS); + metadataTable.getRecordsByKeyPrefixes(encodedTargetColumnNames, HoodieTableMetadataUtil.PARTITION_NAME_COLUMN_STATS, false); org.apache.hudi.util.AvroToRowDataConverters.AvroToRowDataConverter converter = AvroToRowDataConverters.createRowConverter((RowType) METADATA_DATA_TYPE.getLogicalType()); diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ColumnStatsIndexSupport.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ColumnStatsIndexSupport.scala index b1e03f86ff807..58511f791ed78 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ColumnStatsIndexSupport.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ColumnStatsIndexSupport.scala @@ -17,66 +17,153 @@ package org.apache.hudi -import org.apache.avro.Schema.Parser -import org.apache.avro.generic.GenericRecord -import org.apache.hudi.ColumnStatsIndexSupport.{composeIndexSchema, deserialize, metadataRecordSchemaString, metadataRecordStructType, tryUnpackNonNullVal} +import org.apache.avro.Conversions.DecimalConversion +import org.apache.avro.generic.GenericData +import org.apache.hudi.ColumnStatsIndexSupport._ +import org.apache.hudi.HoodieCatalystUtils.{withPersistedData, withPersistedDataset} import org.apache.hudi.HoodieConversionUtils.toScalaOption -import org.apache.hudi.avro.model.HoodieMetadataRecord +import org.apache.hudi.avro.model._ import org.apache.hudi.client.common.HoodieSparkEngineContext import org.apache.hudi.common.config.HoodieMetadataConfig +import org.apache.hudi.common.data.HoodieData import org.apache.hudi.common.model.HoodieRecord +import org.apache.hudi.common.table.HoodieTableMetaClient import org.apache.hudi.common.table.view.FileSystemViewStorageConfig import org.apache.hudi.common.util.ValidationUtils.checkState +import org.apache.hudi.common.util.collection import org.apache.hudi.common.util.hash.ColumnIndexID import org.apache.hudi.data.HoodieJavaRDD import org.apache.hudi.metadata.{HoodieMetadataPayload, HoodieTableMetadata, HoodieTableMetadataUtil, MetadataPartitionType} +import org.apache.hudi.util.JFunction import org.apache.spark.api.java.JavaSparkContext -import org.apache.spark.rdd.RDD +import org.apache.spark.sql.HoodieUnsafeUtils.{createDataFrameFromInternalRows, createDataFrameFromRDD, createDataFrameFromRows} import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.util.DateTimeUtils import org.apache.spark.sql.functions.col import org.apache.spark.sql.types._ -import 
org.apache.spark.sql.{DataFrame, HoodieUnsafeRDDUtils, Row, SparkSession} +import org.apache.spark.sql.{DataFrame, Row, SparkSession} +import org.apache.spark.storage.StorageLevel +import java.nio.ByteBuffer import scala.collection.JavaConverters._ import scala.collection.immutable.TreeSet +import scala.collection.mutable.ListBuffer +import scala.collection.parallel.mutable.ParHashMap + +class ColumnStatsIndexSupport(spark: SparkSession, + tableSchema: StructType, + @transient metadataConfig: HoodieMetadataConfig, + @transient metaClient: HoodieTableMetaClient, + allowCaching: Boolean = false) { + + @transient private lazy val engineCtx = new HoodieSparkEngineContext(new JavaSparkContext(spark.sparkContext)) + @transient private lazy val metadataTable: HoodieTableMetadata = + HoodieTableMetadata.create(engineCtx, metadataConfig, metaClient.getBasePathV2.toString, FileSystemViewStorageConfig.SPILLABLE_DIR.defaultValue) + + @transient private lazy val cachedColumnStatsIndexViews: ParHashMap[Seq[String], DataFrame] = ParHashMap() + + // NOTE: Since [[metadataConfig]] is transient this has to be eagerly persisted, before this will be passed + // on to the executor + private val inMemoryProjectionThreshold = metadataConfig.getColumnStatsIndexInMemoryProjectionThreshold + + private lazy val indexedColumns: Set[String] = { + val customIndexedColumns = metadataConfig.getColumnsEnabledForColumnStatsIndex + // Column Stats Index could index either + // - The whole table + // - Only configured columns + if (customIndexedColumns.isEmpty) { + tableSchema.fieldNames.toSet + } else { + customIndexedColumns.asScala.toSet + } + } -/** - * Mixin trait abstracting away heavy-lifting of interactions with Metadata Table's Column Stats Index, - * providing convenient interfaces to read it, transpose, etc - */ -trait ColumnStatsIndexSupport extends SparkAdapterSupport { - - def readColumnStatsIndex(spark: SparkSession, - tableBasePath: String, - metadataConfig: HoodieMetadataConfig, - targetColumns: Seq[String] = Seq.empty): DataFrame = { - val targetColStatsIndexColumns = Seq( - HoodieMetadataPayload.COLUMN_STATS_FIELD_FILE_NAME, - HoodieMetadataPayload.COLUMN_STATS_FIELD_MIN_VALUE, - HoodieMetadataPayload.COLUMN_STATS_FIELD_MAX_VALUE, - HoodieMetadataPayload.COLUMN_STATS_FIELD_NULL_COUNT, - HoodieMetadataPayload.COLUMN_STATS_FIELD_VALUE_COUNT) - - val requiredMetadataIndexColumns = - (targetColStatsIndexColumns :+ HoodieMetadataPayload.COLUMN_STATS_FIELD_COLUMN_NAME).map(colName => - s"${HoodieMetadataPayload.SCHEMA_FIELD_ID_COLUMN_STATS}.${colName}") - - val metadataTableDF: DataFrame = { - // NOTE: If specific columns have been provided, we can considerably trim down amount of data fetched - // by only fetching Column Stats Index records pertaining to the requested columns. 
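// Why inMemoryProjectionThreshold above is materialized into a plain val: metadataConfig is marked
// @transient, so it is not shipped with serialized task closures and would be unavailable on the
// executors; eagerly extracting the primitive keeps any closure self-contained. Generic sketch with
// hypothetical names, not Hudi code:
class TransientCaptureSketch(@transient private val config: java.util.Properties) extends Serializable {
  // resolved once on the driver; the Int itself travels with any closure that references it
  private val threshold: Int = config.getProperty("projection.threshold", "100000").toInt

  // safe to call inside a task: only `threshold` is captured, never the @transient `config`
  def isSmallProjection(projectedRows: Long): Boolean = projectedRows < threshold
}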
- // Otherwise we fallback to read whole Column Stats Index - if (targetColumns.nonEmpty) { - readColumnStatsIndexForColumnsInternal(spark, targetColumns, metadataConfig, tableBasePath) - } else { - readFullColumnStatsIndexInternal(spark, metadataConfig, tableBasePath) - } + /** + * Returns true in cases when Column Stats Index is built and available as standalone partition + * w/in the Metadata Table + */ + def isIndexAvailable: Boolean = { + checkState(metadataConfig.enabled, "Metadata Table support has to be enabled") + metaClient.getTableConfig.getMetadataPartitions.contains(HoodieTableMetadataUtil.PARTITION_NAME_COLUMN_STATS) + } + + /** + * Determines whether it would be more optimal to read Column Stats Index a) in-memory of the invoking process, + * or b) executing it on-cluster via Spark [[Dataset]] and [[RDD]] APIs + */ + def shouldReadInMemory(fileIndex: HoodieFileIndex, queryReferencedColumns: Seq[String]): Boolean = { + Option(metadataConfig.getColumnStatsIndexProcessingModeOverride) match { + case Some(mode) => + mode == HoodieMetadataConfig.COLUMN_STATS_INDEX_PROCESSING_MODE_IN_MEMORY + case None => + fileIndex.getFileSlicesCount * queryReferencedColumns.length < inMemoryProjectionThreshold + } + } + + /** + * Loads view of the Column Stats Index in a transposed format where single row coalesces every columns' + * statistics for a single file, returning it as [[DataFrame]] + * + * Please check out scala-doc of the [[transpose]] method explaining this view in more details + */ + def loadTransposed[T](targetColumns: Seq[String], shouldReadInMemory: Boolean)(block: DataFrame => T): T = { + cachedColumnStatsIndexViews.get(targetColumns) match { + case Some(cachedDF) => + block(cachedDF) + + case None => + val colStatsRecords: HoodieData[HoodieMetadataColumnStats] = + loadColumnStatsIndexRecords(targetColumns, shouldReadInMemory) + + withPersistedData(colStatsRecords, StorageLevel.MEMORY_ONLY) { + val (transposedRows, indexSchema) = transpose(colStatsRecords, targetColumns) + val df = if (shouldReadInMemory) { + // NOTE: This will instantiate a [[Dataset]] backed by [[LocalRelation]] holding all of the rows + // of the transposed table in memory, facilitating execution of the subsequently chained operations + // on it locally (on the driver; all such operations are actually going to be performed by Spark's + // Optimizer) + createDataFrameFromRows(spark, transposedRows.collectAsList().asScala, indexSchema) + } else { + val rdd = HoodieJavaRDD.getJavaRDD(transposedRows) + spark.createDataFrame(rdd, indexSchema) + } + + if (allowCaching) { + cachedColumnStatsIndexViews.put(targetColumns, df) + // NOTE: Instead of collecting the rows from the index and hold them in memory, we instead rely + // on Spark as (potentially distributed) cache managing data lifecycle, while we simply keep + // the referenced to persisted [[DataFrame]] instance + df.persist(StorageLevel.MEMORY_ONLY) + + block(df) + } else { + withPersistedDataset(df) { + block(df) + } + } + } } + } - val colStatsDF = metadataTableDF.where(col(HoodieMetadataPayload.SCHEMA_FIELD_ID_COLUMN_STATS).isNotNull) - .select(requiredMetadataIndexColumns.map(col): _*) + /** + * Loads a view of the Column Stats Index in a raw format, returning it as [[DataFrame]] + * + * Please check out scala-doc of the [[transpose]] method explaining this view in more details + */ + def load(targetColumns: Seq[String] = Seq.empty, shouldReadInMemory: Boolean = false): DataFrame = { + // NOTE: If specific columns have been provided, we can considerably 
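// Caller-side sketch of the new ColumnStatsIndexSupport API above (mirroring how HoodieFileIndex
// uses it further below). With the default threshold of 100000, e.g. 2000 file slices * 3 query
// referenced columns = 6000 expected projected rows, so the index would be read "in-memory" on the
// driver unless the processing-mode override says otherwise. Method and argument names below are
// hypothetical.
import org.apache.hudi.{ColumnStatsIndexSupport, HoodieFileIndex}

def countFilesWithStats(columnStatsIndex: ColumnStatsIndexSupport,
                        fileIndex: HoodieFileIndex,
                        queryReferencedColumns: Seq[String]): Long = {
  require(columnStatsIndex.isIndexAvailable, "Column Stats Index partition is not available")
  val shouldReadInMemory = columnStatsIndex.shouldReadInMemory(fileIndex, queryReferencedColumns)
  // the transposed view (one row per file: min/max/nullCount per referenced column) is only kept
  // persisted for the duration of the block
  columnStatsIndex.loadTransposed(queryReferencedColumns, shouldReadInMemory) { transposedDF =>
    transposedDF.count()
  }
}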
trim down amount of data fetched + // by only fetching Column Stats Index records pertaining to the requested columns. + // Otherwise we fallback to read whole Column Stats Index + if (targetColumns.nonEmpty) { + loadColumnStatsIndexForColumnsInternal(targetColumns, shouldReadInMemory) + } else { + loadFullColumnStatsIndexInternal() + } + } - colStatsDF + def invalidateCaches(): Unit = { + cachedColumnStatsIndexViews.foreach { case (_, df) => df.unpersist() } + cachedColumnStatsIndexViews.clear() } /** @@ -112,154 +199,184 @@ trait ColumnStatsIndexSupport extends SparkAdapterSupport { * column references from the filtering expressions, and only transpose records corresponding to the * columns referenced in those * - * @param spark Spark session ref - * @param colStatsDF [[DataFrame]] bearing raw Column Stats Index table + * @param colStatsRecords [[HoodieData[HoodieMetadataColumnStats]]] bearing raw Column Stats Index records * @param queryColumns target columns to be included into the final table - * @param tableSchema schema of the source data table * @return reshaped table according to the format outlined above */ - def transposeColumnStatsIndex(spark: SparkSession, colStatsDF: DataFrame, queryColumns: Seq[String], tableSchema: StructType): DataFrame = { - val colStatsSchema = colStatsDF.schema - val colStatsSchemaOrdinalsMap = colStatsSchema.fields.zipWithIndex.map({ - case (field, ordinal) => (field.name, ordinal) - }).toMap - + private def transpose(colStatsRecords: HoodieData[HoodieMetadataColumnStats], queryColumns: Seq[String]): (HoodieData[Row], StructType) = { val tableSchemaFieldMap = tableSchema.fields.map(f => (f.name, f)).toMap - - val colNameOrdinal = colStatsSchemaOrdinalsMap(HoodieMetadataPayload.COLUMN_STATS_FIELD_COLUMN_NAME) - val minValueOrdinal = colStatsSchemaOrdinalsMap(HoodieMetadataPayload.COLUMN_STATS_FIELD_MIN_VALUE) - val maxValueOrdinal = colStatsSchemaOrdinalsMap(HoodieMetadataPayload.COLUMN_STATS_FIELD_MAX_VALUE) - val fileNameOrdinal = colStatsSchemaOrdinalsMap(HoodieMetadataPayload.COLUMN_STATS_FIELD_FILE_NAME) - val nullCountOrdinal = colStatsSchemaOrdinalsMap(HoodieMetadataPayload.COLUMN_STATS_FIELD_NULL_COUNT) - val valueCountOrdinal = colStatsSchemaOrdinalsMap(HoodieMetadataPayload.COLUMN_STATS_FIELD_VALUE_COUNT) - - // NOTE: We have to collect list of indexed columns to make sure we properly align the rows - // w/in the transposed dataset: since some files might not have all of the columns indexed - // either due to the Column Stats Index config changes, schema evolution, etc, we have - // to make sure that all of the rows w/in transposed data-frame are properly padded (with null - // values) for such file-column combinations - val indexedColumns: Seq[String] = colStatsDF.rdd.map(row => row.getString(colNameOrdinal)).distinct().collect() - // NOTE: We're sorting the columns to make sure final index schema matches layout // of the transposed table - val sortedTargetColumns = TreeSet(queryColumns.intersect(indexedColumns): _*) - - val transposedRDD = colStatsDF.rdd - .filter(row => sortedTargetColumns.contains(row.getString(colNameOrdinal))) - .map { row => - if (row.isNullAt(minValueOrdinal) && row.isNullAt(maxValueOrdinal)) { + val sortedTargetColumnsSet = TreeSet(queryColumns:_*) + val sortedTargetColumns = sortedTargetColumnsSet.toSeq + + // NOTE: This is a trick to avoid pulling all of [[ColumnStatsIndexSupport]] object into the lambdas' + // closures below + val indexedColumns = this.indexedColumns + + // Here we perform complex transformation 
which requires us to modify the layout of the rows + // of the dataset, and therefore we rely on low-level RDD API to avoid incurring encoding/decoding + // penalty of the [[Dataset]], since it's required to adhere to its schema at all times, while + // RDDs are not; + val transposedRows: HoodieData[Row] = colStatsRecords + // NOTE: Explicit conversion is required for Scala 2.11 + .filter(JFunction.toJavaSerializableFunction(r => sortedTargetColumnsSet.contains(r.getColumnName))) + .mapToPair(JFunction.toJavaSerializablePairFunction(r => { + if (r.getMinValue == null && r.getMaxValue == null) { // Corresponding row could be null in either of the 2 cases // - Column contains only null values (in that case both min/max have to be nulls) // - This is a stubbed Column Stats record (used as a tombstone) - row + collection.Pair.of(r.getFileName, r) } else { - val minValueStruct = row.getAs[Row](minValueOrdinal) - val maxValueStruct = row.getAs[Row](maxValueOrdinal) + val minValueWrapper = r.getMinValue + val maxValueWrapper = r.getMaxValue - checkState(minValueStruct != null && maxValueStruct != null, "Invalid Column Stats record: either both min/max have to be null, or both have to be non-null") + checkState(minValueWrapper != null && maxValueWrapper != null, "Invalid Column Stats record: either both min/max have to be null, or both have to be non-null") - val colName = row.getString(colNameOrdinal) + val colName = r.getColumnName val colType = tableSchemaFieldMap(colName).dataType - val (minValue, _) = tryUnpackNonNullVal(minValueStruct) - val (maxValue, _) = tryUnpackNonNullVal(maxValueStruct) - val rowValsSeq = row.toSeq.toArray + val minValue = deserialize(tryUnpackValueWrapper(minValueWrapper), colType) + val maxValue = deserialize(tryUnpackValueWrapper(maxValueWrapper), colType) + // Update min-/max-value structs w/ unwrapped values in-place - rowValsSeq(minValueOrdinal) = deserialize(minValue, colType) - rowValsSeq(maxValueOrdinal) = deserialize(maxValue, colType) + r.setMinValue(minValue) + r.setMaxValue(maxValue) - Row(rowValsSeq: _*) + collection.Pair.of(r.getFileName, r) } - } - .groupBy(r => r.getString(fileNameOrdinal)) - .foldByKey(Seq[Row]()) { - case (_, columnRowsSeq) => - // Rows seq is always non-empty (otherwise it won't be grouped into) - val fileName = columnRowsSeq.head.get(fileNameOrdinal) - val valueCount = columnRowsSeq.head.get(valueCountOrdinal) - - // To properly align individual rows (corresponding to a file) w/in the transposed projection, we need - // to align existing column-stats for individual file with the list of expected ones for the - // whole transposed projection (a superset of all files) - val columnRowsMap = columnRowsSeq.map(row => (row.getString(colNameOrdinal), row)).toMap - val alignedColumnRowsSeq = sortedTargetColumns.toSeq.map(columnRowsMap.get) - - val coalescedRowValuesSeq = - alignedColumnRowsSeq.foldLeft(Seq[Any](fileName, valueCount)) { - case (acc, opt) => - opt match { - case Some(columnStatsRow) => - acc ++ Seq(minValueOrdinal, maxValueOrdinal, nullCountOrdinal).map(ord => columnStatsRow.get(ord)) - case None => - // NOTE: Since we're assuming missing column to essentially contain exclusively - // null values, we set null-count to be equal to value-count (this behavior is - // consistent with reading non-existent columns from Parquet) - acc ++ Seq(null, null, valueCount) - } - } - - Seq(Row(coalescedRowValuesSeq:_*)) - } - .values - .flatMap(it => it) + })) + .groupByKey() + .map(JFunction.toJavaSerializableFunction(p => { + val 
columnRecordsSeq: Seq[HoodieMetadataColumnStats] = p.getValue.asScala.toSeq + val fileName: String = p.getKey + val valueCount: Long = columnRecordsSeq.head.getValueCount + + // To properly align individual rows (corresponding to a file) w/in the transposed projection, we need + // to align existing column-stats for individual file with the list of expected ones for the + // whole transposed projection (a superset of all files) + val columnRecordsMap = columnRecordsSeq.map(r => (r.getColumnName, r)).toMap + val alignedColStatRecordsSeq = sortedTargetColumns.map(columnRecordsMap.get) + + val coalescedRowValuesSeq = + alignedColStatRecordsSeq.foldLeft(ListBuffer[Any](fileName, valueCount)) { + case (acc, opt) => + opt match { + case Some(colStatRecord) => + acc ++= Seq(colStatRecord.getMinValue, colStatRecord.getMaxValue, colStatRecord.getNullCount) + case None => + // NOTE: This could occur in either of the following cases: + // 1. Column is not indexed in Column Stats Index: in this case we won't be returning + // any statistics for such column (ie all stats will be null) + // 2. Particular file does not have this particular column (which is indexed by Column Stats Index): + // in this case we're assuming missing column to essentially contain exclusively + // null values, we set min/max values as null and null-count to be equal to value-count (this + // behavior is consistent with reading non-existent columns from Parquet) + // + // This is a way to determine current column's index without explicit iteration (we're adding 3 stats / column) + val idx = acc.length / 3 + val colName = sortedTargetColumns(idx) + val indexed = indexedColumns.contains(colName) + + val nullCount = if (indexed) valueCount else null + + acc ++= Seq(null, null, nullCount) + } + } + + Row(coalescedRowValuesSeq:_*) + })) // NOTE: It's crucial to maintain appropriate ordering of the columns // matching table layout: hence, we cherry-pick individual columns // instead of simply filtering in the ones we're interested in the schema - val indexSchema = composeIndexSchema(sortedTargetColumns.toSeq, tableSchema) + val indexSchema = composeIndexSchema(sortedTargetColumns, tableSchema) + (transposedRows, indexSchema) + } - spark.createDataFrame(transposedRDD, indexSchema) + private def loadColumnStatsIndexForColumnsInternal(targetColumns: Seq[String], shouldReadInMemory: Boolean): DataFrame = { + val colStatsDF = { + val colStatsRecords: HoodieData[HoodieMetadataColumnStats] = loadColumnStatsIndexRecords(targetColumns, shouldReadInMemory) + // NOTE: Explicit conversion is required for Scala 2.11 + val catalystRows: HoodieData[InternalRow] = colStatsRecords.mapPartitions(JFunction.toJavaSerializableFunction(it => { + val converter = AvroConversionUtils.createAvroToInternalRowConverter(HoodieMetadataColumnStats.SCHEMA$, columnStatsRecordStructType) + it.asScala.map(r => converter(r).orNull).asJava + }), false) + + if (shouldReadInMemory) { + // NOTE: This will instantiate a [[Dataset]] backed by [[LocalRelation]] holding all of the rows + // of the transposed table in memory, facilitating execution of the subsequently chained operations + // on it locally (on the driver; all such operations are actually going to be performed by Spark's + // Optimizer) + createDataFrameFromInternalRows(spark, catalystRows.collectAsList().asScala, columnStatsRecordStructType) + } else { + createDataFrameFromRDD(spark, HoodieJavaRDD.getJavaRDD(catalystRows), columnStatsRecordStructType) + } + } + + 
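// Stand-alone illustration (plain Scala collections, hypothetical data) of the alignment performed
// in transpose above: stats are laid out in sortedTargetColumns order with 3 slots (min, max,
// nullCount) per column after the 2 leading fields, so acc.length / 3 recovers the ordinal of the
// column currently being padded. Indexed-but-missing columns get nullCount = valueCount, while
// columns that are not indexed at all yield no statistics whatsoever.
def alignRowSketch(fileName: String,
                   valueCount: Long,
                   statsByColumn: Map[String, (Any, Any, Long)], // column -> (min, max, nullCount)
                   sortedTargetColumns: Seq[String],
                   indexedColumns: Set[String]): Seq[Any] = {
  sortedTargetColumns.map(statsByColumn.get).foldLeft(Seq[Any](fileName, valueCount)) {
    case (acc, Some((min, max, nullCount))) => acc ++ Seq(min, max, nullCount)
    case (acc, None) =>
      // 2 leading fields + 3 appended values per processed column => integer division yields the ordinal
      val colName = sortedTargetColumns(acc.length / 3)
      val nullCount = if (indexedColumns.contains(colName)) valueCount else null
      acc ++ Seq(null, null, nullCount)
  }
}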
colStatsDF.select(targetColumnStatsIndexColumns.map(col): _*) } - private def readFullColumnStatsIndexInternal(spark: SparkSession, metadataConfig: HoodieMetadataConfig, tableBasePath: String): DataFrame = { - val metadataTablePath = HoodieTableMetadata.getMetadataTableBasePath(tableBasePath) + private def loadColumnStatsIndexRecords(targetColumns: Seq[String], shouldReadInMemory: Boolean): HoodieData[HoodieMetadataColumnStats] = { + // Read Metadata Table's Column Stats Index records into [[HoodieData]] container by + // - Fetching the records from CSI by key-prefixes (encoded column names) + // - Extracting [[HoodieMetadataColumnStats]] records + // - Filtering out nulls + checkState(targetColumns.nonEmpty) + + // TODO encoding should be done internally w/in HoodieBackedTableMetadata + val encodedTargetColumnNames = targetColumns.map(colName => new ColumnIndexID(colName).asBase64EncodedString()) + + val metadataRecords: HoodieData[HoodieRecord[HoodieMetadataPayload]] = + metadataTable.getRecordsByKeyPrefixes(encodedTargetColumnNames.asJava, HoodieTableMetadataUtil.PARTITION_NAME_COLUMN_STATS, shouldReadInMemory) + + val columnStatsRecords: HoodieData[HoodieMetadataColumnStats] = + // NOTE: Explicit conversion is required for Scala 2.11 + metadataRecords.map(JFunction.toJavaSerializableFunction(record => { + toScalaOption(record.getData.getInsertValue(null, null)) + .map(metadataRecord => metadataRecord.asInstanceOf[HoodieMetadataRecord].getColumnStatsMetadata) + .orNull + })) + .filter(JFunction.toJavaSerializableFunction(columnStatsRecord => columnStatsRecord != null)) + + columnStatsRecords + } + + private def loadFullColumnStatsIndexInternal(): DataFrame = { + val metadataTablePath = HoodieTableMetadata.getMetadataTableBasePath(metaClient.getBasePathV2.toString) // Read Metadata Table's Column Stats Index into Spark's [[DataFrame]] - spark.read.format("org.apache.hudi") + val colStatsDF = spark.read.format("org.apache.hudi") .options(metadataConfig.getProps.asScala) .load(s"$metadataTablePath/${MetadataPartitionType.COLUMN_STATS.getPartitionPath}") - } - - private def readColumnStatsIndexForColumnsInternal(spark: SparkSession, targetColumns: Seq[String], metadataConfig: HoodieMetadataConfig, tableBasePath: String) = { - val ctx = new HoodieSparkEngineContext(new JavaSparkContext(spark.sparkContext)) - // Read Metadata Table's Column Stats Index into Spark's [[DataFrame]] by - // - Fetching the records from CSI by key-prefixes (encoded column names) - // - Deserializing fetched records into [[InternalRow]]s - // - Composing [[DataFrame]] - val metadataTableDF = { - val metadataTable = HoodieTableMetadata.create(ctx, metadataConfig, tableBasePath, FileSystemViewStorageConfig.SPILLABLE_DIR.defaultValue) - - // TODO encoding should be done internally w/in HoodieBackedTableMetadata - val encodedTargetColumnNames = targetColumns.map(colName => new ColumnIndexID(colName).asBase64EncodedString()) - - val recordsRDD: RDD[HoodieRecord[HoodieMetadataPayload]] = - HoodieJavaRDD.getJavaRDD( - metadataTable.getRecordsByKeyPrefixes(encodedTargetColumnNames.asJava, HoodieTableMetadataUtil.PARTITION_NAME_COLUMN_STATS) - ) - - val catalystRowsRDD: RDD[InternalRow] = recordsRDD.mapPartitions { it => - val metadataRecordSchema = new Parser().parse(metadataRecordSchemaString) - val converter = AvroConversionUtils.createAvroToInternalRowConverter(metadataRecordSchema, metadataRecordStructType) - - it.map { record => - // schema and props are ignored for generating metadata record from the payload - // 
instead, the underlying file system, or bloom filter, or columns stats metadata (part of payload) are directly used - toScalaOption(record.getData.getInsertValue(null, null)) - .flatMap(avroRecord => converter(avroRecord.asInstanceOf[GenericRecord])) - .orNull - } - } + val requiredIndexColumns = + targetColumnStatsIndexColumns.map(colName => + col(s"${HoodieMetadataPayload.SCHEMA_FIELD_ID_COLUMN_STATS}.${colName}")) - HoodieUnsafeRDDUtils.createDataFrame(spark, catalystRowsRDD, metadataRecordStructType) - } - metadataTableDF + colStatsDF.where(col(HoodieMetadataPayload.SCHEMA_FIELD_ID_COLUMN_STATS).isNotNull) + .select(requiredIndexColumns: _*) } } object ColumnStatsIndexSupport { - private val metadataRecordSchemaString: String = HoodieMetadataRecord.SCHEMA$.toString - private val metadataRecordStructType: StructType = AvroConversionUtils.convertAvroSchemaToStructType(HoodieMetadataRecord.SCHEMA$) + private val expectedAvroSchemaValues = Set("BooleanWrapper", "IntWrapper", "LongWrapper", "FloatWrapper", "DoubleWrapper", + "BytesWrapper", "StringWrapper", "DateWrapper", "DecimalWrapper", "TimeMicrosWrapper", "TimestampMicrosWrapper") + + /** + * Target Column Stats Index columns which internally are mapped onto fields of the correspoding + * Column Stats record payload ([[HoodieMetadataColumnStats]]) persisted w/in Metadata Table + */ + private val targetColumnStatsIndexColumns = Seq( + HoodieMetadataPayload.COLUMN_STATS_FIELD_FILE_NAME, + HoodieMetadataPayload.COLUMN_STATS_FIELD_MIN_VALUE, + HoodieMetadataPayload.COLUMN_STATS_FIELD_MAX_VALUE, + HoodieMetadataPayload.COLUMN_STATS_FIELD_NULL_COUNT, + HoodieMetadataPayload.COLUMN_STATS_FIELD_VALUE_COUNT, + HoodieMetadataPayload.COLUMN_STATS_FIELD_COLUMN_NAME + ) + + private val columnStatsRecordStructType: StructType = AvroConversionUtils.convertAvroSchemaToStructType(HoodieMetadataColumnStats.SCHEMA$) /** * @VisibleForTesting @@ -300,13 +417,28 @@ object ColumnStatsIndexSupport { @inline private def composeColumnStatStructType(col: String, statName: String, dataType: DataType) = StructField(formatColName(col, statName), dataType, nullable = true, Metadata.empty) - private def tryUnpackNonNullVal(statStruct: Row): (Any, Int) = - statStruct.toSeq.zipWithIndex - .find(_._1 != null) - // NOTE: First non-null value will be a wrapper (converted into Row), bearing a single - // value - .map { case (value, ord) => (value.asInstanceOf[Row].get(0), ord)} - .getOrElse((null, -1)) + private def tryUnpackValueWrapper(valueWrapper: AnyRef): Any = { + valueWrapper match { + case w: BooleanWrapper => w.getValue + case w: IntWrapper => w.getValue + case w: LongWrapper => w.getValue + case w: FloatWrapper => w.getValue + case w: DoubleWrapper => w.getValue + case w: BytesWrapper => w.getValue + case w: StringWrapper => w.getValue + case w: DateWrapper => w.getValue + case w: DecimalWrapper => w.getValue + case w: TimeMicrosWrapper => w.getValue + case w: TimestampMicrosWrapper => w.getValue + + case r: GenericData.Record if expectedAvroSchemaValues.contains(r.getSchema.getName) => + r.get("value") + + case _ => throw new UnsupportedOperationException(s"Not recognized value wrapper type (${valueWrapper.getClass.getSimpleName})") + } + } + + val decConv = new DecimalConversion() private def deserialize(value: Any, dataType: DataType): Any = { dataType match { @@ -315,12 +447,37 @@ object ColumnStatsIndexSupport { // here we have to decode those back into corresponding logical representation. 
case TimestampType => DateTimeUtils.toJavaTimestamp(value.asInstanceOf[Long]) case DateType => DateTimeUtils.toJavaDate(value.asInstanceOf[Int]) - + // Standard types + case StringType => value + case BooleanType => value + // Numeric types + case FloatType => value + case DoubleType => value + case LongType => value + case IntegerType => value // NOTE: All integral types of size less than Int are encoded as Ints in MT case ShortType => value.asInstanceOf[Int].toShort case ByteType => value.asInstanceOf[Int].toByte - case _ => value + // TODO fix + case _: DecimalType => + value match { + case buffer: ByteBuffer => + val logicalType = DecimalWrapper.SCHEMA$.getField("value").schema().getLogicalType + decConv.fromBytes(buffer, null, logicalType) + case _ => value + } + case BinaryType => + value match { + case b: ByteBuffer => + val bytes = new Array[Byte](b.remaining) + b.get(bytes) + bytes + case other => other + } + + case _ => + throw new UnsupportedOperationException(s"Data type for the statistic value is not recognized $dataType") } } } diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieCatalystUtils.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieCatalystUtils.scala new file mode 100644 index 0000000000000..0f41dc1fff3f1 --- /dev/null +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieCatalystUtils.scala @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
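// Worked illustration (hypothetical values) of the tryUnpackValueWrapper + deserialize pair above:
// min/max values come out of the Avro wrapper records in their physical representation and have to
// be decoded back into the logical Spark type of the source column.
import org.apache.spark.sql.catalyst.util.DateTimeUtils

object ColStatsDeserializeSketch {
  def main(args: Array[String]): Unit = {
    // DateType is stored as days since epoch (Int): 18993 -> 2022-01-01
    println(DateTimeUtils.toJavaDate(18993))
    // TimestampType is stored as microseconds since epoch (Long): 2022-01-01T00:00:00Z
    println(DateTimeUtils.toJavaTimestamp(1640995200000000L))
    // Integral types narrower than Int are encoded as Ints in the metadata table and narrowed back
    val storedShort: Any = 42
    println(storedShort.asInstanceOf[Int].toShort)
  }
}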
+ */ + +package org.apache.hudi + +import org.apache.hudi.common.data.HoodieData +import org.apache.spark.sql.Dataset +import org.apache.spark.storage.StorageLevel +import org.apache.spark.storage.StorageLevel._ + +object HoodieCatalystUtils extends SparkAdapterSupport { + + /** + * Executes provided function while keeping provided [[Dataset]] instance persisted for the + * duration of the execution + * + * @param df target [[Dataset]] to be persisted + * @param level desired [[StorageLevel]] of the persistence + * @param f target function to be executed while [[Dataset]] is kept persisted + * @tparam T return value of the target function + * @return execution outcome of the [[f]] function + */ + def withPersistedDataset[T](df: Dataset[_], level: StorageLevel = MEMORY_AND_DISK)(f: => T): T = { + df.persist(level) + try { + f + } finally { + df.unpersist() + } + } + + /** + * Executes provided function while keeping provided [[HoodieData]] instance persisted for the + * duration of the execution + * + * @param data target [[Dataset]] to be persisted + * @param level desired [[StorageLevel]] of the persistence + * @param f target function to be executed while [[Dataset]] is kept persisted + * @tparam T return value of the target function + * @return execution outcome of the [[f]] function + */ + def withPersistedData[T](data: HoodieData[_], level: StorageLevel = MEMORY_AND_DISK)(f: => T): T = { + data.persist(sparkAdapter.convertStorageLevelToString(level)) + try { + f + } finally { + data.unpersist() + } + } +} diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala index dad7c1765020c..09f1fac2c874d 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala @@ -27,11 +27,10 @@ import org.apache.hudi.keygen.{BuiltinKeyGenerator, SparkKeyGeneratorInterface} import org.apache.hudi.table.BulkInsertPartitioner import org.apache.spark.internal.Logging import org.apache.spark.rdd.RDD -import org.apache.spark.sql.HoodieUnsafeRDDUtils.createDataFrame import org.apache.spark.sql.HoodieUnsafeRowUtils.{composeNestedFieldPath, getNestedInternalRowValue} import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.types.{StringType, StructField, StructType} -import org.apache.spark.sql.{DataFrame, Dataset, HoodieUnsafeRDDUtils, Row} +import org.apache.spark.sql.{DataFrame, Dataset, HoodieUnsafeUtils, Row} import org.apache.spark.unsafe.types.UTF8String import scala.collection.JavaConverters.asScalaBufferConverter @@ -92,9 +91,9 @@ object HoodieDatasetBulkInsertHelper extends Logging { val updatedDF = if (populateMetaFields && config.shouldCombineBeforeInsert) { val dedupedRdd = dedupeRows(prependedRdd, updatedSchema, config.getPreCombineField, SparkHoodieIndexFactory.isGlobalIndex(config)) - HoodieUnsafeRDDUtils.createDataFrame(df.sparkSession, dedupedRdd, updatedSchema) + HoodieUnsafeUtils.createDataFrameFromRDD(df.sparkSession, dedupedRdd, updatedSchema) } else { - HoodieUnsafeRDDUtils.createDataFrame(df.sparkSession, prependedRdd, updatedSchema) + HoodieUnsafeUtils.createDataFrameFromRDD(df.sparkSession, prependedRdd, updatedSchema) } val trimmedDF = if (shouldDropPartitionColumns) { diff --git 
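// Usage sketch for the HoodieCatalystUtils helpers above: both wrappers guarantee that unpersist()
// runs even when the wrapped block throws, so callers cannot leak cached data. The DataFrame
// argument and the computed summary are placeholders.
import org.apache.hudi.HoodieCatalystUtils.withPersistedDataset
import org.apache.spark.sql.DataFrame
import org.apache.spark.storage.StorageLevel

def summarizeTransposedIndex(transposedColStats: DataFrame): (Long, Long) =
  withPersistedDataset(transposedColStats, StorageLevel.MEMORY_ONLY) {
    // both actions reuse the persisted data instead of re-evaluating the underlying plan twice
    (transposedColStats.count(), transposedColStats.distinct().count())
  }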
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieDatasetUtils.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieDatasetUtils.scala deleted file mode 100644 index a6c689610b76d..0000000000000 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieDatasetUtils.scala +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hudi - -import org.apache.spark.sql.DataFrame -import org.apache.spark.storage.StorageLevel -import org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK - -object HoodieDatasetUtils { - - /** - * Executes provided function while keeping provided [[DataFrame]] instance persisted for the - * duration of the execution - * - * @param df target [[DataFrame]] to be persisted - * @param level desired [[StorageLevel]] of the persistence - * @param f target function to be executed while [[DataFrame]] is kept persisted - * @tparam T return value of the target function - * @return execution outcome of the [[f]] function - */ - def withPersistence[T](df: DataFrame, level: StorageLevel = MEMORY_AND_DISK)(f: => T): T = { - df.persist(level) - try { - f - } finally { - df.unpersist() - } - } -} diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala index bd15cb250fe95..4e158aaa86796 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala @@ -18,7 +18,6 @@ package org.apache.hudi import org.apache.hadoop.fs.{FileStatus, Path} -import org.apache.hudi.HoodieDatasetUtils.withPersistence import org.apache.hudi.HoodieFileIndex.{DataSkippingFailureMode, collectReferencedColumns, getConfigProperties} import org.apache.hudi.common.config.{HoodieMetadataConfig, TypedProperties} import org.apache.hudi.common.table.HoodieTableMetaClient @@ -26,7 +25,7 @@ import org.apache.hudi.common.util.StringUtils import org.apache.hudi.exception.HoodieException import org.apache.hudi.keygen.constant.KeyGeneratorOptions import org.apache.hudi.keygen.{TimestampBasedAvroKeyGenerator, TimestampBasedKeyGenerator} -import org.apache.hudi.metadata.{HoodieMetadataPayload, HoodieTableMetadataUtil} +import org.apache.hudi.metadata.HoodieMetadataPayload import org.apache.spark.internal.Logging import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.{And, Expression, Literal} @@ -35,7 +34,7 @@ import org.apache.spark.sql.hudi.DataSkippingUtils.translateIntoColumnStatsIndex import 
org.apache.spark.sql.hudi.HoodieSqlCommonUtils import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ -import org.apache.spark.sql.{Column, DataFrame, SparkSession} +import org.apache.spark.sql.{Column, SparkSession} import org.apache.spark.unsafe.types.UTF8String import java.text.SimpleDateFormat @@ -80,8 +79,9 @@ case class HoodieFileIndex(spark: SparkSession, specifiedQueryInstant = options.get(DataSourceReadOptions.TIME_TRAVEL_AS_OF_INSTANT.key).map(HoodieSqlCommonUtils.formatQueryInstant), fileStatusCache = fileStatusCache ) - with FileIndex - with ColumnStatsIndexSupport { + with FileIndex { + + @transient private lazy val columnStatsIndex = new ColumnStatsIndexSupport(spark, schema, metadataConfig, metaClient) override def rootPaths: Seq[Path] = queryPaths.asScala @@ -95,8 +95,9 @@ case class HoodieFileIndex(spark: SparkSession, */ def allFiles: Seq[FileStatus] = { cachedAllInputFileSlices.values.asScala.flatMap(_.asScala) - .filter(_.getBaseFile.isPresent) - .map(_.getBaseFile.get().getFileStatus) + .map(fs => fs.getBaseFile.orElse(null)) + .filter(_ != null) + .map(_.getFileStatus) .toSeq } @@ -196,64 +197,63 @@ case class HoodieFileIndex(spark: SparkSession, // nothing CSI in particular could be applied for) lazy val queryReferencedColumns = collectReferencedColumns(spark, queryFilters, schema) - if (!isMetadataTableEnabled || !isColumnStatsIndexAvailable || !isDataSkippingEnabled) { + if (!isMetadataTableEnabled || !isDataSkippingEnabled || !columnStatsIndex.isIndexAvailable) { validateConfig() Option.empty } else if (queryFilters.isEmpty || queryReferencedColumns.isEmpty) { Option.empty } else { - val colStatsDF: DataFrame = readColumnStatsIndex(spark, basePath, metadataConfig, queryReferencedColumns) - - // Persist DF to avoid re-computing column statistics unraveling - withPersistence(colStatsDF) { - val transposedColStatsDF: DataFrame = transposeColumnStatsIndex(spark, colStatsDF, queryReferencedColumns, schema) - - // Persist DF to avoid re-computing column statistics unraveling - withPersistence(transposedColStatsDF) { - val indexSchema = transposedColStatsDF.schema - val indexFilter = - queryFilters.map(translateIntoColumnStatsIndexFilterExpr(_, indexSchema)) - .reduce(And) - - val allIndexedFileNames = - transposedColStatsDF.select(HoodieMetadataPayload.COLUMN_STATS_FIELD_FILE_NAME) - .collect() - .map(_.getString(0)) - .toSet - - val prunedCandidateFileNames = - transposedColStatsDF.where(new Column(indexFilter)) - .select(HoodieMetadataPayload.COLUMN_STATS_FIELD_FILE_NAME) - .collect() - .map(_.getString(0)) - .toSet - - // NOTE: Col-Stats Index isn't guaranteed to have complete set of statistics for every - // base-file: since it's bound to clustering, which could occur asynchronously - // at arbitrary point in time, and is not likely to be touching all of the base files. - // - // To close that gap, we manually compute the difference b/w all indexed (by col-stats-index) - // files and all outstanding base-files, and make sure that all base files not - // represented w/in the index are included in the output of this method - val notIndexedFileNames = lookupFileNamesMissingFromIndex(allIndexedFileNames) - - Some(prunedCandidateFileNames ++ notIndexedFileNames) - } + // NOTE: Since executing on-cluster via Spark API has its own non-trivial amount of overhead, + // it's most often preferential to fetch Column Stats Index w/in the same process (usually driver), + // w/o resorting to on-cluster execution. 
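// Conceptual sketch of what the data-skipping path does with the transposed Column Stats view: a
// query predicate such as `c1 = 5` is translated (by translateIntoColumnStatsIndexFilterExpr) into
// a range check against that column's min/max statistics, and only files whose statistics can
// possibly satisfy it remain candidates. The "<col>_minValue"/"<col>_maxValue" names are indicative
// of the transposed layout rather than copied from the implementation.
import org.apache.hudi.metadata.HoodieMetadataPayload
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col

def pruneForEqualityPredicate(transposedColStats: DataFrame, column: String, value: Int): Set[String] = {
  val candidateFilter = col(s"${column}_minValue") <= value && col(s"${column}_maxValue") >= value
  transposedColStats
    .where(candidateFilter)
    .select(HoodieMetadataPayload.COLUMN_STATS_FIELD_FILE_NAME)
    .collect()
    .map(_.getString(0))
    .toSet
}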
+ // For that we use a simple-heuristic to determine whether we should read and process CSI in-memory or + // on-cluster: total number of rows of the expected projected portion of the index has to be below the + // threshold (of 100k records) + val shouldReadInMemory = columnStatsIndex.shouldReadInMemory(this, queryReferencedColumns) + + columnStatsIndex.loadTransposed(queryReferencedColumns, shouldReadInMemory) { transposedColStatsDF => + val indexSchema = transposedColStatsDF.schema + val indexFilter = + queryFilters.map(translateIntoColumnStatsIndexFilterExpr(_, indexSchema)) + .reduce(And) + + val allIndexedFileNames = + transposedColStatsDF.select(HoodieMetadataPayload.COLUMN_STATS_FIELD_FILE_NAME) + .collect() + .map(_.getString(0)) + .toSet + + val prunedCandidateFileNames = + transposedColStatsDF.where(new Column(indexFilter)) + .select(HoodieMetadataPayload.COLUMN_STATS_FIELD_FILE_NAME) + .collect() + .map(_.getString(0)) + .toSet + + // NOTE: Col-Stats Index isn't guaranteed to have complete set of statistics for every + // base-file: since it's bound to clustering, which could occur asynchronously + // at arbitrary point in time, and is not likely to be touching all of the base files. + // + // To close that gap, we manually compute the difference b/w all indexed (by col-stats-index) + // files and all outstanding base-files, and make sure that all base files not + // represented w/in the index are included in the output of this method + val notIndexedFileNames = lookupFileNamesMissingFromIndex(allIndexedFileNames) + + Some(prunedCandidateFileNames ++ notIndexedFileNames) } } } - override def refresh(): Unit = super.refresh() + override def refresh(): Unit = { + super.refresh() + columnStatsIndex.invalidateCaches() + } override def inputFiles: Array[String] = allFiles.map(_.getPath.toString).toArray override def sizeInBytes: Long = cachedFileSize - private def isColumnStatsIndexAvailable = - metaClient.getTableConfig.getMetadataPartitions - .contains(HoodieTableMetadataUtil.PARTITION_NAME_COLUMN_STATS) - private def isDataSkippingEnabled: Boolean = options.getOrElse(DataSourceReadOptions.ENABLE_DATA_SKIPPING.key(), spark.sessionState.conf.getConfString(DataSourceReadOptions.ENABLE_DATA_SKIPPING.key(), "false")).toBoolean diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/HoodieUnsafeRDDUtils.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/HoodieUnsafeUtils.scala similarity index 50% rename from hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/HoodieUnsafeRDDUtils.scala rename to hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/HoodieUnsafeUtils.scala index 8995701d5fc5c..bd7f2f54560e8 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/HoodieUnsafeRDDUtils.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/HoodieUnsafeUtils.scala @@ -21,17 +21,54 @@ package org.apache.spark.sql import org.apache.hudi.HoodieUnsafeRDD import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan} import org.apache.spark.sql.types.StructType import org.apache.spark.util.MutablePair /** * Suite of utilities helping in handling instances of [[HoodieUnsafeRDD]] */ -object HoodieUnsafeRDDUtils { +object HoodieUnsafeUtils { - // TODO scala-doc - def createDataFrame(spark: SparkSession, rdd: RDD[InternalRow], 
structType: StructType): DataFrame = - spark.internalCreateDataFrame(rdd, structType) + /** + * Creates [[DataFrame]] from the in-memory [[Seq]] of [[Row]]s with provided [[schema]] + * + * NOTE: [[DataFrame]] is based on [[LocalRelation]], entailing that most computations with it + * will be executed by Spark locally + * + * @param spark spark's session + * @param rows collection of rows to base [[DataFrame]] on + * @param schema target [[DataFrame]]'s schema + * @return + */ + def createDataFrameFromRows(spark: SparkSession, rows: Seq[Row], schema: StructType): DataFrame = + Dataset.ofRows(spark, LocalRelation.fromExternalRows(schema.toAttributes, rows)) + + /** + * Creates [[DataFrame]] from the in-memory [[Seq]] of [[InternalRow]]s with provided [[schema]] + * + * NOTE: [[DataFrame]] is based on [[LocalRelation]], entailing that most computations with it + * will be executed by Spark locally + * + * @param spark spark's session + * @param rows collection of rows to base [[DataFrame]] on + * @param schema target [[DataFrame]]'s schema + * @return + */ + def createDataFrameFromInternalRows(spark: SparkSession, rows: Seq[InternalRow], schema: StructType): DataFrame = + Dataset.ofRows(spark, LocalRelation(schema.toAttributes, rows)) + + + /** + * Creates [[DataFrame]] from the [[RDD]] of [[Row]]s with provided [[schema]] + * + * @param spark spark's session + * @param rdd RDD w/ [[Row]]s to base [[DataFrame]] on + * @param schema target [[DataFrame]]'s schema + * @return + */ + def createDataFrameFromRDD(spark: SparkSession, rdd: RDD[InternalRow], schema: StructType): DataFrame = + spark.internalCreateDataFrame(rdd, schema) /** * Canonical implementation of the [[RDD#collect]] for [[HoodieUnsafeRDD]], returning a properly diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieFileIndex.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieFileIndex.scala index 1d4dbfb1eace7..19027a47bfabc 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieFileIndex.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieFileIndex.scala @@ -369,8 +369,9 @@ class TestHoodieFileIndex extends HoodieClientTestBase { metaClient = HoodieTableMetaClient.reload(metaClient) case class TestCase(enableMetadata: Boolean, - enableColumnStats: Boolean, - enableDataSkipping: Boolean) + enableColumnStats: Boolean, + enableDataSkipping: Boolean, + columnStatsProcessingModeOverride: String = null) val testCases: Seq[TestCase] = TestCase(enableMetadata = false, enableColumnStats = false, enableDataSkipping = false) :: @@ -378,6 +379,8 @@ class TestHoodieFileIndex extends HoodieClientTestBase { TestCase(enableMetadata = true, enableColumnStats = false, enableDataSkipping = true) :: TestCase(enableMetadata = false, enableColumnStats = true, enableDataSkipping = true) :: TestCase(enableMetadata = true, enableColumnStats = true, enableDataSkipping = true) :: + TestCase(enableMetadata = true, enableColumnStats = true, enableDataSkipping = true, columnStatsProcessingModeOverride = HoodieMetadataConfig.COLUMN_STATS_INDEX_PROCESSING_MODE_IN_MEMORY) :: + TestCase(enableMetadata = true, enableColumnStats = true, enableDataSkipping = true, columnStatsProcessingModeOverride = HoodieMetadataConfig.COLUMN_STATS_INDEX_PROCESSING_MODE_ENGINE) :: Nil for (testCase <- testCases) { @@ -391,7 +394,8 @@ class TestHoodieFileIndex extends HoodieClientTestBase { val props = Map[String, String]( "path" -> basePath, 
QUERY_TYPE.key -> QUERY_TYPE_SNAPSHOT_OPT_VAL, - DataSourceReadOptions.ENABLE_DATA_SKIPPING.key -> testCase.enableDataSkipping.toString + DataSourceReadOptions.ENABLE_DATA_SKIPPING.key -> testCase.enableDataSkipping.toString, + HoodieMetadataConfig.COLUMN_STATS_INDEX_PROCESSING_MODE_OVERRIDE.key -> testCase.columnStatsProcessingModeOverride ) ++ readMetadataOpts val fileIndex = HoodieFileIndex(spark, metaClient, Option.empty, props, NoopCache) diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndex.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndex.scala index b982b1851c326..822d2051cb024 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndex.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndex.scala @@ -31,12 +31,12 @@ import org.apache.hudi.functional.TestColumnStatsIndex.ColumnStatsTestCase import org.apache.hudi.testutils.HoodieClientTestBase import org.apache.hudi.{ColumnStatsIndexSupport, DataSourceWriteOptions} import org.apache.spark.sql._ -import org.apache.spark.sql.functions.typedLit +import org.apache.spark.sql.functions.{col, lit, typedLit} import org.apache.spark.sql.types._ import org.junit.jupiter.api.Assertions.{assertEquals, assertNotNull, assertTrue} import org.junit.jupiter.api._ import org.junit.jupiter.params.ParameterizedTest -import org.junit.jupiter.params.provider.{Arguments, MethodSource} +import org.junit.jupiter.params.provider.{Arguments, ArgumentsSource, MethodSource, ValueSource} import java.math.BigInteger import java.sql.{Date, Timestamp} @@ -44,7 +44,7 @@ import scala.collection.JavaConverters._ import scala.util.Random @Tag("functional") -class TestColumnStatsIndex extends HoodieClientTestBase with ColumnStatsIndexSupport { +class TestColumnStatsIndex extends HoodieClientTestBase { var spark: SparkSession = _ val sourceTableSchema = @@ -119,35 +119,31 @@ class TestColumnStatsIndex extends HoodieClientTestBase with ColumnStatsIndexSup .fromProperties(toProperties(metadataOpts)) .build() - val requestedColumns: Seq[String] = { - // Providing empty seq of columns to [[readColumnStatsIndex]] will lead to the whole - // MT to be read, and subsequently filtered - if (testCase.readFullMetadataTable) Seq.empty - else sourceTableSchema.fieldNames - } + val requestedColumns: Seq[String] = sourceTableSchema.fieldNames - val colStatsDF = readColumnStatsIndex(spark, basePath, metadataConfig, requestedColumns) - val transposedColStatsDF = transposeColumnStatsIndex(spark, colStatsDF, sourceTableSchema.fieldNames, sourceTableSchema) + val columnStatsIndex = new ColumnStatsIndexSupport(spark, sourceTableSchema, metadataConfig, metaClient) val expectedColStatsSchema = composeIndexSchema(sourceTableSchema.fieldNames, sourceTableSchema) - // Match against expected column stats table - val expectedColStatsIndexTableDf = - spark.read - .schema(expectedColStatsSchema) - .json(getClass.getClassLoader.getResource("index/colstats/column-stats-index-table.json").toString) + columnStatsIndex.loadTransposed(requestedColumns, testCase.shouldReadInMemory) { transposedColStatsDF => + // Match against expected column stats table + val expectedColStatsIndexTableDf = + spark.read + .schema(expectedColStatsSchema) + .json(getClass.getClassLoader.getResource("index/colstats/column-stats-index-table.json").toString) - assertEquals(expectedColStatsIndexTableDf.schema, 
transposedColStatsDF.schema) - // NOTE: We have to drop the `fileName` column as it contains semi-random components - // that we can't control in this test. Nevertheless, since we manually verify composition of the - // ColStats Index by reading Parquet footers from individual Parquet files, this is not an issue - assertEquals(asJson(sort(expectedColStatsIndexTableDf)), asJson(sort(transposedColStatsDF.drop("fileName")))) + assertEquals(expectedColStatsIndexTableDf.schema, transposedColStatsDF.schema) + // NOTE: We have to drop the `fileName` column as it contains semi-random components + // that we can't control in this test. Nevertheless, since we manually verify composition of the + // ColStats Index by reading Parquet footers from individual Parquet files, this is not an issue + assertEquals(asJson(sort(expectedColStatsIndexTableDf)), asJson(sort(transposedColStatsDF.drop("fileName")))) - // Collect Column Stats manually (reading individual Parquet files) - val manualColStatsTableDF = - buildColumnStatsTableManually(basePath, sourceTableSchema.fieldNames, expectedColStatsSchema) + // Collect Column Stats manually (reading individual Parquet files) + val manualColStatsTableDF = + buildColumnStatsTableManually(basePath, sourceTableSchema.fieldNames, sourceTableSchema.fieldNames, expectedColStatsSchema) - assertEquals(asJson(sort(manualColStatsTableDF)), asJson(sort(transposedColStatsDF))) + assertEquals(asJson(sort(manualColStatsTableDF)), asJson(sort(transposedColStatsDF))) + } // do an upsert and validate val updateJSONTablePath = getClass.getClassLoader.getResource("index/colstats/another-input-table-json").toString @@ -166,26 +162,28 @@ class TestColumnStatsIndex extends HoodieClientTestBase with ColumnStatsIndexSup metaClient = HoodieTableMetaClient.reload(metaClient) - val updatedColStatsDF = readColumnStatsIndex(spark, basePath, metadataConfig, requestedColumns) - val transposedUpdatedColStatsDF = transposeColumnStatsIndex(spark, updatedColStatsDF, sourceTableSchema.fieldNames, sourceTableSchema) + val updatedColumnStatsIndex = new ColumnStatsIndexSupport(spark, sourceTableSchema, metadataConfig, metaClient) - val expectedColStatsIndexUpdatedDF = - spark.read - .schema(expectedColStatsSchema) - .json(getClass.getClassLoader.getResource("index/colstats/updated-column-stats-index-table.json").toString) + updatedColumnStatsIndex.loadTransposed(requestedColumns, testCase.shouldReadInMemory) { transposedUpdatedColStatsDF => + val expectedColStatsIndexUpdatedDF = + spark.read + .schema(expectedColStatsSchema) + .json(getClass.getClassLoader.getResource("index/colstats/updated-column-stats-index-table.json").toString) - assertEquals(expectedColStatsIndexUpdatedDF.schema, transposedUpdatedColStatsDF.schema) - assertEquals(asJson(sort(expectedColStatsIndexUpdatedDF)), asJson(sort(transposedUpdatedColStatsDF.drop("fileName")))) + assertEquals(expectedColStatsIndexUpdatedDF.schema, transposedUpdatedColStatsDF.schema) + assertEquals(asJson(sort(expectedColStatsIndexUpdatedDF)), asJson(sort(transposedUpdatedColStatsDF.drop("fileName")))) - // Collect Column Stats manually (reading individual Parquet files) - val manualUpdatedColStatsTableDF = - buildColumnStatsTableManually(basePath, sourceTableSchema.fieldNames, expectedColStatsSchema) + // Collect Column Stats manually (reading individual Parquet files) + val manualUpdatedColStatsTableDF = + buildColumnStatsTableManually(basePath, sourceTableSchema.fieldNames, sourceTableSchema.fieldNames, expectedColStatsSchema) - 
assertEquals(asJson(sort(manualUpdatedColStatsTableDF)), asJson(sort(transposedUpdatedColStatsDF))) + assertEquals(asJson(sort(manualUpdatedColStatsTableDF)), asJson(sort(transposedUpdatedColStatsDF))) + } } - @Test - def testMetadataColumnStatsIndexPartialProjection(): Unit = { + @ParameterizedTest + @ValueSource(booleans = Array(true, false)) + def testMetadataColumnStatsIndexPartialProjection(shouldReadInMemory: Boolean): Unit = { val targetColumnsToIndex = Seq("c1", "c2", "c3") val metadataOpts = Map( @@ -235,11 +233,11 @@ class TestColumnStatsIndex extends HoodieClientTestBase with ColumnStatsIndexSup // These are NOT indexed val requestedColumns = Seq("c4") - val emptyColStatsDF = readColumnStatsIndex(spark, basePath, metadataConfig, requestedColumns) - val emptyTransposedColStatsDF = transposeColumnStatsIndex(spark, emptyColStatsDF, requestedColumns, sourceTableSchema) + val columnStatsIndex = new ColumnStatsIndexSupport(spark, sourceTableSchema, metadataConfig, metaClient) - assertEquals(0, emptyColStatsDF.collect().length) - assertEquals(0, emptyTransposedColStatsDF.collect().length) + columnStatsIndex.loadTransposed(requestedColumns, shouldReadInMemory) { emptyTransposedColStatsDF => + assertEquals(0, emptyTransposedColStatsDF.collect().length) + } } //////////////////////////////////////////////////////////////////////// @@ -252,29 +250,27 @@ class TestColumnStatsIndex extends HoodieClientTestBase with ColumnStatsIndexSup // We have to include "c1", since we sort the expected outputs by this column val requestedColumns = Seq("c4", "c1") - val partialColStatsDF = readColumnStatsIndex(spark, basePath, metadataConfig, requestedColumns) - val partialTransposedColStatsDF = transposeColumnStatsIndex(spark, partialColStatsDF, requestedColumns, sourceTableSchema) - - val targetIndexedColumns = targetColumnsToIndex.intersect(requestedColumns) - val expectedColStatsSchema = composeIndexSchema(targetIndexedColumns, sourceTableSchema) - + val expectedColStatsSchema = composeIndexSchema(requestedColumns.sorted, sourceTableSchema) // Match against expected column stats table val expectedColStatsIndexTableDf = spark.read .schema(expectedColStatsSchema) .json(getClass.getClassLoader.getResource("index/colstats/partial-column-stats-index-table.json").toString) - assertEquals(expectedColStatsIndexTableDf.schema, partialTransposedColStatsDF.schema) - // NOTE: We have to drop the `fileName` column as it contains semi-random components - // that we can't control in this test. 
Nevertheless, since we manually verify composition of the - // ColStats Index by reading Parquet footers from individual Parquet files, this is not an issue - assertEquals(asJson(sort(expectedColStatsIndexTableDf)), asJson(sort(partialTransposedColStatsDF.drop("fileName")))) - // Collect Column Stats manually (reading individual Parquet files) val manualColStatsTableDF = - buildColumnStatsTableManually(basePath, targetIndexedColumns, expectedColStatsSchema) + buildColumnStatsTableManually(basePath, requestedColumns, targetColumnsToIndex, expectedColStatsSchema) + + val columnStatsIndex = new ColumnStatsIndexSupport(spark, sourceTableSchema, metadataConfig, metaClient) - assertEquals(asJson(sort(manualColStatsTableDF)), asJson(sort(partialTransposedColStatsDF))) + columnStatsIndex.loadTransposed(requestedColumns, shouldReadInMemory) { partialTransposedColStatsDF => + assertEquals(expectedColStatsIndexTableDf.schema, partialTransposedColStatsDF.schema) + // NOTE: We have to drop the `fileName` column as it contains semi-random components + // that we can't control in this test. Nevertheless, since we manually verify composition of the + // ColStats Index by reading Parquet footers from individual Parquet files, this is not an issue + assertEquals(asJson(sort(expectedColStatsIndexTableDf)), asJson(sort(partialTransposedColStatsDF.drop("fileName")))) + assertEquals(asJson(sort(manualColStatsTableDF)), asJson(sort(partialTransposedColStatsDF))) + } } //////////////////////////////////////////////////////////////////////// @@ -307,27 +303,26 @@ class TestColumnStatsIndex extends HoodieClientTestBase with ColumnStatsIndexSup val requestedColumns = sourceTableSchema.fieldNames - // Nevertheless, the last update was written with a new schema (that is a subset of the original table schema), - // we should be able to read CSI, which will be properly padded (with nulls) after transposition - val updatedColStatsDF = readColumnStatsIndex(spark, basePath, metadataConfig, requestedColumns) - val transposedUpdatedColStatsDF = transposeColumnStatsIndex(spark, updatedColStatsDF, requestedColumns, sourceTableSchema) - - val targetIndexedColumns = targetColumnsToIndex.intersect(requestedColumns) - val expectedColStatsSchema = composeIndexSchema(targetIndexedColumns, sourceTableSchema) - + val expectedColStatsSchema = composeIndexSchema(requestedColumns.sorted, sourceTableSchema) val expectedColStatsIndexUpdatedDF = spark.read .schema(expectedColStatsSchema) .json(getClass.getClassLoader.getResource("index/colstats/updated-partial-column-stats-index-table.json").toString) - assertEquals(expectedColStatsIndexUpdatedDF.schema, transposedUpdatedColStatsDF.schema) - assertEquals(asJson(sort(expectedColStatsIndexUpdatedDF)), asJson(sort(transposedUpdatedColStatsDF.drop("fileName")))) - // Collect Column Stats manually (reading individual Parquet files) val manualUpdatedColStatsTableDF = - buildColumnStatsTableManually(basePath, targetIndexedColumns, expectedColStatsSchema) + buildColumnStatsTableManually(basePath, requestedColumns, targetColumnsToIndex, expectedColStatsSchema) - assertEquals(asJson(sort(manualUpdatedColStatsTableDF)), asJson(sort(transposedUpdatedColStatsDF))) + val columnStatsIndex = new ColumnStatsIndexSupport(spark, sourceTableSchema, metadataConfig, metaClient) + + // Nevertheless, the last update was written with a new schema (that is a subset of the original table schema), + // we should be able to read CSI, which will be properly padded (with nulls) after transposition + 
columnStatsIndex.loadTransposed(requestedColumns, shouldReadInMemory) { transposedUpdatedColStatsDF => + assertEquals(expectedColStatsIndexUpdatedDF.schema, transposedUpdatedColStatsDF.schema) + + assertEquals(asJson(sort(expectedColStatsIndexUpdatedDF)), asJson(sort(transposedUpdatedColStatsDF.drop("fileName")))) + assertEquals(asJson(sort(manualUpdatedColStatsTableDF)), asJson(sort(transposedUpdatedColStatsDF))) + } } } @@ -370,7 +365,10 @@ class TestColumnStatsIndex extends HoodieClientTestBase with ColumnStatsIndexSup }) } - private def buildColumnStatsTableManually(tablePath: String, indexedCols: Seq[String], indexSchema: StructType) = { + private def buildColumnStatsTableManually(tablePath: String, + includedCols: Seq[String], + indexedCols: Seq[String], + indexSchema: StructType): DataFrame = { val files = { val it = fs.listFiles(new Path(tablePath), true) var seq = Seq[LocatedFileStatus]() @@ -387,15 +385,23 @@ class TestColumnStatsIndex extends HoodieClientTestBase with ColumnStatsIndexSup s"'${typedLit(file.getPath.getName)}' AS file" +: s"sum(1) AS valueCount" +: df.columns - .filter(col => indexedCols.contains(col)) + .filter(col => includedCols.contains(col)) .flatMap(col => { val minColName = s"${col}_minValue" val maxColName = s"${col}_maxValue" - Seq( - s"min($col) AS $minColName", - s"max($col) AS $maxColName", - s"sum(cast(isnull($col) AS long)) AS ${col}_nullCount" - ) + if (indexedCols.contains(col)) { + Seq( + s"min($col) AS $minColName", + s"max($col) AS $maxColName", + s"sum(cast(isnull($col) AS long)) AS ${col}_nullCount" + ) + } else { + Seq( + s"null AS $minColName", + s"null AS $maxColName", + s"null AS ${col}_nullCount" + ) + } }) df.selectExpr(exprs: _*) @@ -461,11 +467,13 @@ class TestColumnStatsIndex extends HoodieClientTestBase with ColumnStatsIndexSup object TestColumnStatsIndex { - case class ColumnStatsTestCase(forceFullLogScan: Boolean, readFullMetadataTable: Boolean) + case class ColumnStatsTestCase(forceFullLogScan: Boolean, shouldReadInMemory: Boolean) def testMetadataColumnStatsIndexParams: java.util.stream.Stream[Arguments] = java.util.stream.Stream.of( - Arguments.arguments(ColumnStatsTestCase(forceFullLogScan = false, readFullMetadataTable = false)), - Arguments.arguments(ColumnStatsTestCase(forceFullLogScan = true, readFullMetadataTable = true)) + Arguments.arguments(ColumnStatsTestCase(forceFullLogScan = false, shouldReadInMemory = true)), + Arguments.arguments(ColumnStatsTestCase(forceFullLogScan = false, shouldReadInMemory = false)), + Arguments.arguments(ColumnStatsTestCase(forceFullLogScan = true, shouldReadInMemory = false)), + Arguments.arguments(ColumnStatsTestCase(forceFullLogScan = true, shouldReadInMemory = true)) ) } diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestParquetColumnProjection.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestParquetColumnProjection.scala index b24a341d4a293..00ab7091445db 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestParquetColumnProjection.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestParquetColumnProjection.scala @@ -30,7 +30,7 @@ import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, DefaultSo import org.apache.parquet.hadoop.util.counters.BenchmarkCounter import org.apache.spark.internal.Logging import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.{Dataset, HoodieUnsafeRDDUtils, Row, SaveMode} 
+import org.apache.spark.sql.{Dataset, HoodieUnsafeUtils, Row, SaveMode} import org.junit.jupiter.api.Assertions.{assertEquals, fail} import org.junit.jupiter.api.{Disabled, Tag, Test} @@ -316,7 +316,7 @@ class TestParquetColumnProjection extends SparkClientFunctionalTestHarness with val (rows, bytesRead) = measureBytesRead { () => val rdd = relation.buildScan(targetColumns, Array.empty).asInstanceOf[HoodieUnsafeRDD] - HoodieUnsafeRDDUtils.collect(rdd) + HoodieUnsafeUtils.collect(rdd) } val targetRecordCount = tableState.targetRecordCount; diff --git a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/adapter/Spark2Adapter.scala b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/adapter/Spark2Adapter.scala index 30af252d2dc16..3c0282d710c2b 100644 --- a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/adapter/Spark2Adapter.scala +++ b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/adapter/Spark2Adapter.scala @@ -22,9 +22,13 @@ import org.apache.avro.Schema import org.apache.hudi.Spark2RowSerDe import org.apache.hudi.client.utils.SparkRowSerDe import org.apache.spark.sql.avro._ +import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder import org.apache.spark.sql.catalyst.expressions.{Expression, InterpretedPredicate} import org.apache.spark.sql.catalyst.parser.ParserInterface +import org.apache.spark.sql.catalyst.plans.JoinType +import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, Join, LogicalPlan} +import org.apache.spark.sql.catalyst.{AliasIdentifier, TableIdentifier} import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, Spark24HoodieParquetFileFormat} import org.apache.spark.sql.execution.datasources.{FilePartition, PartitionedFile, Spark2ParsePartitionUtil, SparkParsePartitionUtil} import org.apache.spark.sql.hudi.SparkAdapter @@ -32,6 +36,8 @@ import org.apache.spark.sql.hudi.parser.HoodieSpark2ExtendedSqlParser import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.DataType import org.apache.spark.sql.{HoodieCatalystExpressionUtils, HoodieCatalystPlansUtils, HoodieSpark2CatalystExpressionUtils, HoodieSpark2CatalystPlanUtils, Row, SparkSession} +import org.apache.spark.storage.StorageLevel +import org.apache.spark.storage.StorageLevel._ import scala.collection.mutable.ArrayBuffer @@ -115,4 +121,20 @@ class Spark2Adapter extends SparkAdapter { override def createInterpretedPredicate(e: Expression): InterpretedPredicate = { InterpretedPredicate.create(e) } + + override def convertStorageLevelToString(level: StorageLevel): String = level match { + case NONE => "NONE" + case DISK_ONLY => "DISK_ONLY" + case DISK_ONLY_2 => "DISK_ONLY_2" + case MEMORY_ONLY => "MEMORY_ONLY" + case MEMORY_ONLY_2 => "MEMORY_ONLY_2" + case MEMORY_ONLY_SER => "MEMORY_ONLY_SER" + case MEMORY_ONLY_SER_2 => "MEMORY_ONLY_SER_2" + case MEMORY_AND_DISK => "MEMORY_AND_DISK" + case MEMORY_AND_DISK_2 => "MEMORY_AND_DISK_2" + case MEMORY_AND_DISK_SER => "MEMORY_AND_DISK_SER" + case MEMORY_AND_DISK_SER_2 => "MEMORY_AND_DISK_SER_2" + case OFF_HEAP => "OFF_HEAP" + case _ => throw new IllegalArgumentException(s"Invalid StorageLevel: $level") + } } diff --git a/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/adapter/BaseSpark3Adapter.scala b/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/adapter/BaseSpark3Adapter.scala index 034d21dba4544..4f55039746785 
100644 --- a/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/adapter/BaseSpark3Adapter.scala +++ b/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/adapter/BaseSpark3Adapter.scala @@ -33,6 +33,8 @@ import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation import org.apache.spark.sql.hudi.SparkAdapter import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.{HoodieCatalystPlansUtils, HoodieSpark3CatalystPlanUtils, Row, SparkSession} +import org.apache.spark.storage.StorageLevel +import org.apache.spark.storage.StorageLevel.{DISK_ONLY, DISK_ONLY_2, DISK_ONLY_3, MEMORY_AND_DISK, MEMORY_AND_DISK_2, MEMORY_AND_DISK_SER, MEMORY_AND_DISK_SER_2, MEMORY_ONLY, MEMORY_ONLY_2, MEMORY_ONLY_SER, MEMORY_ONLY_SER_2, NONE, OFF_HEAP} import scala.util.control.NonFatal @@ -100,4 +102,24 @@ abstract class BaseSpark3Adapter extends SparkAdapter with Logging { override def createInterpretedPredicate(e: Expression): InterpretedPredicate = { Predicate.createInterpreted(e) } + + /** + * Converts instance of [[StorageLevel]] to a corresponding string + */ + override def convertStorageLevelToString(level: StorageLevel): String = level match { + case NONE => "NONE" + case DISK_ONLY => "DISK_ONLY" + case DISK_ONLY_2 => "DISK_ONLY_2" + case DISK_ONLY_3 => "DISK_ONLY_3" + case MEMORY_ONLY => "MEMORY_ONLY" + case MEMORY_ONLY_2 => "MEMORY_ONLY_2" + case MEMORY_ONLY_SER => "MEMORY_ONLY_SER" + case MEMORY_ONLY_SER_2 => "MEMORY_ONLY_SER_2" + case MEMORY_AND_DISK => "MEMORY_AND_DISK" + case MEMORY_AND_DISK_2 => "MEMORY_AND_DISK_2" + case MEMORY_AND_DISK_SER => "MEMORY_AND_DISK_SER" + case MEMORY_AND_DISK_SER_2 => "MEMORY_AND_DISK_SER_2" + case OFF_HEAP => "OFF_HEAP" + case _ => throw new IllegalArgumentException(s"Invalid StorageLevel: $level") + } }
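
The data-skipping branch in HoodieFileIndex above combines two sets of file names: the files retained by the column-stats filter, and the files that are not represented in the index at all (the index is populated by clustering, which runs asynchronously, so some base files may not be covered yet). A minimal, self-contained sketch of that contract, assuming plain Set[String] file names instead of Hudi's file-status types:

    object DataSkippingSketch {
      // Files pruned by the index filter are dropped, but files the index has
      // never seen must always remain candidates, otherwise data skipping
      // could silently drop matching rows.
      def candidateFiles(allIndexedFileNames: Set[String],
                         prunedCandidateFileNames: Set[String],
                         allBaseFileNames: Set[String]): Set[String] = {
        val notIndexedFileNames = allBaseFileNames -- allIndexedFileNames
        prunedCandidateFileNames ++ notIndexedFileNames
      }
    }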
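
ColumnStatsIndexSupport.shouldReadInMemory(this, queryReferencedColumns) decides whether the projected slice of the column-stats partition is small enough to be collected and transposed on the driver, or should be processed as a distributed Dataset; the comment above mentions a 100k-record threshold, and TestHoodieFileIndex additionally exercises an explicit override via HoodieMetadataConfig.COLUMN_STATS_INDEX_PROCESSING_MODE_OVERRIDE with the IN_MEMORY and ENGINE modes. A hedged sketch of that decision follows; the helper shape, the override string values and the default threshold are assumptions for illustration, not the actual Hudi implementation:

    object ProcessingModeSketch {
      // Hypothetical constants standing in for HoodieMetadataConfig's
      // COLUMN_STATS_INDEX_PROCESSING_MODE_IN_MEMORY / _ENGINE values
      val InMemory = "in-memory"
      val Engine = "engine"

      def shouldReadInMemory(expectedIndexRowCount: Long,
                             processingModeOverride: Option[String],
                             inMemoryThreshold: Long = 100000L): Boolean =
        processingModeOverride match {
          case Some(InMemory) => true
          case Some(Engine)   => false
          case _              => expectedIndexRowCount <= inMemoryThreshold
        }
    }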
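
HoodieUnsafeRDDUtils is renamed to HoodieUnsafeUtils and gains LocalRelation-backed factory methods. A usage sketch, assuming a local SparkSession and a hypothetical two-column schema (the helper lives in the org.apache.spark.sql package, as in the rename above):

    import org.apache.spark.sql.{HoodieUnsafeUtils, Row, SparkSession}
    import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

    object HoodieUnsafeUtilsExample extends App {
      val spark = SparkSession.builder().master("local[1]").appName("local-relation-example").getOrCreate()

      val schema = StructType(Seq(
        StructField("id", IntegerType, nullable = false),
        StructField("name", StringType, nullable = true)))

      // Backed by a LocalRelation, so simple operations on this DataFrame can be
      // evaluated locally instead of being scheduled as a distributed job
      val df = HoodieUnsafeUtils.createDataFrameFromRows(spark, Seq(Row(1, "a"), Row(2, "b")), schema)
      df.show()

      spark.stop()
    }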
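
The tests no longer mix in ColumnStatsIndexSupport and read/transpose the index themselves; they instantiate the support class and receive the transposed DataFrame inside a closure, which lets the class cache the loaded index and release it later via invalidateCaches(). A sketch of that call pattern, mirroring the tests above; the table setup (metadataConfig, metaClient, tableSchema) and the indexed columns c1/c2 are assumed:

    import org.apache.hudi.ColumnStatsIndexSupport
    import org.apache.hudi.common.config.HoodieMetadataConfig
    import org.apache.hudi.common.table.HoodieTableMetaClient
    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.types.StructType

    object ColumnStatsIndexUsage {
      def showColumnRanges(spark: SparkSession,
                           tableSchema: StructType,
                           metadataConfig: HoodieMetadataConfig,
                           metaClient: HoodieTableMetaClient): Unit = {
        val columnStatsIndex = new ColumnStatsIndexSupport(spark, tableSchema, metadataConfig, metaClient)

        // The transposed view is only meant to be used inside the closure, so the
        // support class can cache the loaded index data and drop it afterwards
        columnStatsIndex.loadTransposed(Seq("c1", "c2"), true) { transposed =>
          transposed.select("fileName", "c1_minValue", "c1_maxValue", "c1_nullCount").show()
        }
      }
    }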
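
buildColumnStatsTableManually now takes both the columns to include in the output and the columns that are actually indexed: a requested-but-not-indexed column (like c4 in the partial-projection test) is padded with null stats instead of computed aggregates, matching how transposition pads columns missing from the index. The per-column expression generation reduces to roughly:

    def columnStatExprs(col: String, indexed: Boolean): Seq[String] =
      if (indexed) {
        // Real aggregates for indexed columns
        Seq(
          s"min($col) AS ${col}_minValue",
          s"max($col) AS ${col}_maxValue",
          s"sum(cast(isnull($col) AS long)) AS ${col}_nullCount")
      } else {
        // Null padding for columns requested but not covered by the index
        Seq(
          s"null AS ${col}_minValue",
          s"null AS ${col}_maxValue",
          s"null AS ${col}_nullCount")
      }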
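
Both adapters map a StorageLevel back to its canonical constant name (the Spark 3 adapter additionally knows DISK_ONLY_3), which makes the level easy to carry around as a plain config string. Assuming an adapter instance is available, for example through Hudi's SparkAdapterSupport trait, the produced string round-trips through Spark's own StorageLevel.fromString:

    import org.apache.hudi.SparkAdapterSupport
    import org.apache.spark.storage.StorageLevel

    object StorageLevelRoundTrip extends SparkAdapterSupport {
      def main(args: Array[String]): Unit = {
        val level = StorageLevel.MEMORY_AND_DISK_SER
        val asString = sparkAdapter.convertStorageLevelToString(level) // "MEMORY_AND_DISK_SER"
        // Spark can parse the canonical name back into the same storage level
        assert(StorageLevel.fromString(asString) == level)
      }
    }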