Merged
Changes from all commits (43 commits)
de37979
Converted all methods into static ones;
Jun 3, 2022
b1237ae
Pass all filters including partition ones to Column Stats evaluation seq
Jun 3, 2022
84d661b
Using `CachingPath`;
Jun 3, 2022
96e4915
Abstracted `ColumnStatsIndexSupport` as standalone component
Jun 8, 2022
509b8cd
Fixed tests
Jun 8, 2022
29e6724
Tidying up
Jun 8, 2022
d056a0d
Persist CS data-frame to avoid recomputing
Jun 8, 2022
843e6a3
Revert "Persist CS data-frame to avoid recomputing"
Jun 8, 2022
37bbbee
Avoid `InternalRow` > `Row` conversion
Jun 8, 2022
c3eb82e
XXX
Jun 8, 2022
9186ed1
Avoid serializing full MT payload, and instead just cherry-pick CSI p…
Jun 8, 2022
6913812
Use native `Dataset` API to make sure WSCG could collapse most of the…
Jun 8, 2022
102e485
Tidying up
Jun 8, 2022
6f1686f
Rollback `ColumnStatsIndexSupport` onto proper RDD API
Jun 8, 2022
afeb3a7
Add a flag to whether MT should load records as RDD/In-memory
Jun 10, 2022
054c933
Made `JFunction` converters implicit
Jun 10, 2022
27de4fe
XXX
Jun 10, 2022
ff9add6
Rebased `ColumnStatsIndexSupport` onto leveraging `HoodieData` to be …
Jun 10, 2022
31b53e2
Added more utils to create `Dataset`s based on `LocalRelation`;
Jun 10, 2022
7ccef0c
Fixed `TestColumnStatsIndex`
Jun 10, 2022
0ce4491
Fixed handling of non-indexed columns (to avoid reporting any stats f…
Jun 10, 2022
a67aa01
Fixing non-serializable task;
Jun 11, 2022
a953f0e
Rebased `HoodieFileIndex` onto the new API;
Jun 11, 2022
f9a4103
Added config to override whether CSI should be read in-memory or by Spark
Jun 11, 2022
a36a654
Extracted `shouldReadInMemory` utility to CSI
Jun 11, 2022
4d56e07
Moved `isIndexAvailable` check into `ColumnStatsIndexSupport`
Jun 13, 2022
a764186
Adding missing scala-docs
Jun 13, 2022
b23cfd2
Fixing tests
Jun 13, 2022
c953ab2
Missing changes to `BaseHoodieTableFileIndex`
Jun 13, 2022
f4c847a
Added caching to `ColumnStatsIndexSupport`
Jun 13, 2022
ea2a1e8
Test `COLUMN_STATS_INDEX_PROCESSING_MODE_OVERRIDE` working appropriately
Jun 13, 2022
93d7d63
Typo
Jun 14, 2022
4f665e4
Make caching w/in `ColumnStatsIndexSupport` configurable
Jun 14, 2022
f01fdd2
Fixed `shouldReadInMemory` check
Jun 14, 2022
f4f19c2
Fixed compilation
Jul 20, 2022
66bc580
Convert Scala lambdas to Java's ones explicitly (required for Scala 2…
Jul 21, 2022
9efbb07
Extracted `convertStorageLevelToString` to `SparkAdapter`
Jul 21, 2022
517b546
Fixed compilation
Jul 21, 2022
02719f9
Delay sanity assertions;
Jul 22, 2022
5538dde
Fixing tests
Jul 23, 2022
40cf8fe
Neutralize the config to be engine-agnostic (and not locked-in to Spark)
Jul 25, 2022
215bab0
Made in-memory projection threshold configurable
Jul 25, 2022
b1e993b
Tidying up
Jul 25, 2022
@@ -17,15 +17,43 @@

package org.apache.hudi.util

import org.apache.hudi.common.function.{SerializableFunction, SerializablePairFunction}
import org.apache.hudi.common.util.collection

import scala.language.implicitConversions

/**
* Utility allowing for seamless conversion b/w Java/Scala functional primitives
*/
object JFunction {

def toScala[T, R](f: java.util.function.Function[T, R]): T => R =
////////////////////////////////////////////////////////////
// From Java to Scala
////////////////////////////////////////////////////////////

implicit def toScala[T, R](f: java.util.function.Function[T, R]): T => R =
(t: T) => f.apply(t)

def toJava[T](f: T => Unit): java.util.function.Consumer[T] =
////////////////////////////////////////////////////////////
// From Scala to Java
////////////////////////////////////////////////////////////

implicit def toJavaFunction[T, R](f: Function[T, R]): java.util.function.Function[T, R] =
new java.util.function.Function[T, R] {
override def apply(t: T): R = f.apply(t)
}

implicit def toJavaSerializableFunction[T, R](f: Function[T, R]): SerializableFunction[T, R] =
new SerializableFunction[T, R] {
override def apply(t: T): R = f.apply(t)
}

implicit def toJavaSerializablePairFunction[T, K, V](f: Function[T, collection.Pair[K, V]]): SerializablePairFunction[T, K, V] =
new SerializablePairFunction[T, K, V] {
override def call(t: T): collection.Pair[K, V] = f.apply(t)
}

implicit def toJava[T](f: T => Unit): java.util.function.Consumer[T] =
new java.util.function.Consumer[T] {
override def accept(t: T): Unit = f.apply(t)
}
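Illustrative usage (not part of this diff): with these implicits imported, a plain Scala lambda can be passed wherever one of Hudi's Java functional interfaces is expected.

import org.apache.hudi.common.function.SerializableFunction
import org.apache.hudi.util.JFunction._

// The Scala lambda below is implicitly converted to Hudi's Java SerializableFunction,
// so it can be handed to Java-side APIs (e.g. HoodieData#map) without an anonymous class.
val extractLength: SerializableFunction[String, Integer] = (s: String) => Integer.valueOf(s.length)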
@@ -27,12 +27,16 @@ import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.catalyst.expressions.{Expression, InterpretedPredicate}
import org.apache.spark.sql.catalyst.parser.ParserInterface
import org.apache.spark.sql.catalyst.plans.JoinType
import org.apache.spark.sql.catalyst.plans.logical.{Join, LogicalPlan, SubqueryAlias}
import org.apache.spark.sql.catalyst.{AliasIdentifier, TableIdentifier}
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
import org.apache.spark.sql.execution.datasources.{FilePartition, LogicalRelation, PartitionedFile, SparkParsePartitionUtil}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.DataType
import org.apache.spark.sql.{HoodieCatalystExpressionUtils, HoodieCatalystPlansUtils, Row, SparkSession}
import org.apache.spark.storage.StorageLevel

import java.util.Locale

@@ -138,4 +142,9 @@ trait SparkAdapter extends Serializable {
* TODO move to HoodieCatalystExpressionUtils
*/
def createInterpretedPredicate(e: Expression): InterpretedPredicate

/**
* Converts instance of [[StorageLevel]] to a corresponding string
*/
def convertStorageLevelToString(level: StorageLevel): String
}
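Illustrative only: a possible shape for a concrete adapter's implementation of the new method (the actual per-Spark-version adapter code is not shown in this hunk).

import org.apache.spark.storage.StorageLevel

// Sketch of what a concrete SparkAdapter might do; the mapping mirrors Spark's built-in
// storage levels, and the fallback behavior here is an assumption for illustration.
def convertStorageLevelToString(level: StorageLevel): String = level match {
  case StorageLevel.NONE => "NONE"
  case StorageLevel.DISK_ONLY => "DISK_ONLY"
  case StorageLevel.MEMORY_ONLY => "MEMORY_ONLY"
  case StorageLevel.MEMORY_ONLY_SER => "MEMORY_ONLY_SER"
  case StorageLevel.MEMORY_AND_DISK => "MEMORY_AND_DISK"
  case StorageLevel.MEMORY_AND_DISK_SER => "MEMORY_AND_DISK_SER"
  case other => throw new IllegalArgumentException(s"Unsupported storage level: $other")
}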
@@ -1504,7 +1504,7 @@ public void testColStatsPrefixLookup() throws IOException {
// prefix search for column (_hoodie_record_key)
ColumnIndexID columnIndexID = new ColumnIndexID(HoodieRecord.RECORD_KEY_METADATA_FIELD);
List<HoodieRecord<HoodieMetadataPayload>> result = tableMetadata.getRecordsByKeyPrefixes(Collections.singletonList(columnIndexID.asBase64EncodedString()),
MetadataPartitionType.COLUMN_STATS.getPartitionPath()).collectAsList();
MetadataPartitionType.COLUMN_STATS.getPartitionPath(), true).collectAsList();

// there are 3 partitions in total and 2 commits. total entries should be 6.
assertEquals(result.size(), 6);
@@ -1515,7 +1515,7 @@ public void testColStatsPrefixLookup() throws IOException {
// prefix search for col(_hoodie_record_key) and first partition. only 2 files should be matched
PartitionIndexID partitionIndexID = new PartitionIndexID(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH);
result = tableMetadata.getRecordsByKeyPrefixes(Collections.singletonList(columnIndexID.asBase64EncodedString().concat(partitionIndexID.asBase64EncodedString())),
MetadataPartitionType.COLUMN_STATS.getPartitionPath()).collectAsList();
MetadataPartitionType.COLUMN_STATS.getPartitionPath(), true).collectAsList();
// 1 partition and 2 commits. total entries should be 2.
assertEquals(result.size(), 2);
result.forEach(entry -> {
@@ -1534,7 +1534,7 @@ public void testColStatsPrefixLookup() throws IOException {
// prefix search for column {commit time} and first partition
columnIndexID = new ColumnIndexID(HoodieRecord.COMMIT_TIME_METADATA_FIELD);
result = tableMetadata.getRecordsByKeyPrefixes(Collections.singletonList(columnIndexID.asBase64EncodedString().concat(partitionIndexID.asBase64EncodedString())),
MetadataPartitionType.COLUMN_STATS.getPartitionPath()).collectAsList();
MetadataPartitionType.COLUMN_STATS.getPartitionPath(), true).collectAsList();

// 1 partition and 2 commits. total entries should be 2.
assertEquals(result.size(), 2);
@@ -38,6 +38,7 @@

import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.hadoop.CachingPath;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

@@ -62,7 +63,7 @@
* <li>Query instant/range</li>
* </ul>
*/
public abstract class BaseHoodieTableFileIndex {

private static final Logger LOG = LogManager.getLogger(BaseHoodieTableFileIndex.class);

@@ -166,6 +167,11 @@ public Map<String, List<FileSlice>> listFileSlices() {
.collect(Collectors.toMap(e -> e.getKey().path, Map.Entry::getValue));
}

public int getFileSlicesCount() {
return cachedAllInputFileSlices.values().stream()
.mapToInt(List::size).sum();
}

protected List<PartitionPath> getAllQueryPartitionPaths() {
List<String> queryRelativePartitionPaths = queryPaths.stream()
.map(path -> FSUtils.getRelativePartitionPath(new Path(basePath), path))
@@ -349,10 +355,10 @@ public String getPath() {

Path fullPartitionPath(String basePath) {
if (!path.isEmpty()) {
return new Path(basePath, path);
return new CachingPath(basePath, path);
}

return new Path(basePath);
return new CachingPath(basePath);
}

@Override
@@ -187,6 +187,26 @@ public final class HoodieMetadataConfig extends HoodieConfig {
.sinceVersion("0.11.0")
.withDocumentation("Comma-separated list of columns for which column stats index will be built. If not set, all columns will be indexed");

public static final String COLUMN_STATS_INDEX_PROCESSING_MODE_IN_MEMORY = "in-memory";
public static final String COLUMN_STATS_INDEX_PROCESSING_MODE_ENGINE = "engine";

public static final ConfigProperty<String> COLUMN_STATS_INDEX_PROCESSING_MODE_OVERRIDE = ConfigProperty
.key(METADATA_PREFIX + ".index.column.stats.processing.mode.override")
Contributor: maybe .index.column.stats.processing.mode is ok.

Contributor (Author): Processing mode is not meant to be configured by default: this config is specifically to override this behavior.

.noDefaultValue()
.withValidValues(COLUMN_STATS_INDEX_PROCESSING_MODE_IN_MEMORY, COLUMN_STATS_INDEX_PROCESSING_MODE_ENGINE)
.sinceVersion("0.12.0")
.withDocumentation("By default Column Stats Index is automatically determining whether it should be read and processed either"
+ "'in-memory' (w/in executing process) or using Spark (on a cluster), based on some factors like the size of the Index "
Contributor: Don't forget to update the docs as well

+ "and how many columns are read. This config allows to override this behavior.");

public static final ConfigProperty<Integer> COLUMN_STATS_INDEX_IN_MEMORY_PROJECTION_THRESHOLD = ConfigProperty
.key(METADATA_PREFIX + ".index.column.stats.inMemory.projection.threshold")
.defaultValue(100000)
Contributor: Any perf numbers to support this default threshold?

Contributor (Author): Yes, reading Column Stats with fewer than 100k rows is faster in-memory than using Spark (AFAIR it was roughly about 1s vs 2s).

Contributor: How fast is it for in-memory?

.sinceVersion("0.12.0")
.withDocumentation("When reading Column Stats Index, if the size of the expected resulting projection is below the in-memory"
+ " threshold (counted by the # of rows), it will be attempted to be loaded \"in-memory\" (ie not using the execution engine"
+ " like Spark, Flink, etc). If the value is above the threshold execution engine will be used to compose the projection.");

public static final ConfigProperty<String> BLOOM_FILTER_INDEX_FOR_COLUMNS = ConfigProperty
.key(METADATA_PREFIX + ".index.bloom.filter.column.list")
.noDefaultValue()
@@ -246,6 +266,14 @@ public List<String> getColumnsEnabledForColumnStatsIndex() {
return StringUtils.split(getString(COLUMN_STATS_INDEX_FOR_COLUMNS), CONFIG_VALUES_DELIMITER);
}

public String getColumnStatsIndexProcessingModeOverride() {
return getString(COLUMN_STATS_INDEX_PROCESSING_MODE_OVERRIDE);
}

public Integer getColumnStatsIndexInMemoryProjectionThreshold() {
return getIntOrDefault(COLUMN_STATS_INDEX_IN_MEMORY_PROJECTION_THRESHOLD);
}

public List<String> getColumnsEnabledForBloomFilterIndex() {
return StringUtils.split(getString(BLOOM_FILTER_INDEX_FOR_COLUMNS), CONFIG_VALUES_DELIMITER);
}
@@ -168,7 +168,7 @@ public Map<Pair<String, String>, HoodieMetadataColumnStats> getColumnStats(final
}

@Override
public HoodieData<HoodieRecord<HoodieMetadataPayload>> getRecordsByKeyPrefixes(List<String> keyPrefixes, String partitionName) {
public HoodieData<HoodieRecord<HoodieMetadataPayload>> getRecordsByKeyPrefixes(List<String> keyPrefixes, String partitionName, boolean shouldLoadInMemory) {
throw new HoodieMetadataException("Unsupported operation: getRecordsByKeyPrefixes!");
}
}
@@ -29,6 +29,7 @@
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.config.SerializableConfiguration;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.data.HoodieListData;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.function.SerializableFunction;
import org.apache.hudi.common.model.FileSlice;
@@ -143,55 +144,56 @@ protected Option<HoodieRecord<HoodieMetadataPayload>> getRecordByKey(String key,

@Override
public HoodieData<HoodieRecord<HoodieMetadataPayload>> getRecordsByKeyPrefixes(List<String> keyPrefixes,
String partitionName) {
String partitionName,
boolean shouldLoadInMemory) {
// Sort the columns so that keys are looked up in order
List<String> sortedkeyPrefixes = new ArrayList<>(keyPrefixes);
Collections.sort(sortedkeyPrefixes);
List<String> sortedKeyPrefixes = new ArrayList<>(keyPrefixes);
Collections.sort(sortedKeyPrefixes);

// NOTE: Since we partition records to a particular file-group by full key, we will have
// to scan all file-groups for all key-prefixes as each of these might contain some
// records matching the key-prefix
List<FileSlice> partitionFileSlices =
HoodieTableMetadataUtil.getPartitionLatestMergedFileSlices(metadataMetaClient, partitionName);

return engineContext.parallelize(partitionFileSlices)
.flatMap(
(SerializableFunction<FileSlice, Iterator<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>>>) fileSlice -> {
// NOTE: Since this will be executed by executors, we can't access previously cached
// readers, and therefore have to always open new ones
Pair<HoodieFileReader, HoodieMetadataMergedLogRecordReader> readers =
openReaders(partitionName, fileSlice);
try {
List<Long> timings = new ArrayList<>();

HoodieFileReader baseFileReader = readers.getKey();
HoodieMetadataMergedLogRecordReader logRecordScanner = readers.getRight();

if (baseFileReader == null && logRecordScanner == null) {
// TODO: what do we do if both does not exist? should we throw an exception and let caller do the fallback ?
return Collections.emptyIterator();
}

boolean fullKeys = false;

Map<String, Option<HoodieRecord<HoodieMetadataPayload>>> logRecords =
readLogRecords(logRecordScanner, sortedkeyPrefixes, fullKeys, timings);

List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>> mergedRecords =
readFromBaseAndMergeWithLogRecords(baseFileReader, sortedkeyPrefixes, fullKeys, logRecords, timings, partitionName);

LOG.debug(String.format("Metadata read for %s keys took [baseFileRead, logMerge] %s ms",
sortedkeyPrefixes.size(), timings));

return mergedRecords.iterator();
} catch (IOException ioe) {
throw new HoodieIOException("Error merging records from metadata table for " + sortedkeyPrefixes.size() + " key : ", ioe);
} finally {
closeReader(readers);
}
return (shouldLoadInMemory ? HoodieListData.lazy(partitionFileSlices) : engineContext.parallelize(partitionFileSlices))
.flatMap((SerializableFunction<FileSlice, Iterator<HoodieRecord<HoodieMetadataPayload>>>) fileSlice -> {
// NOTE: Since this will be executed by executors, we can't access previously cached
// readers, and therefore have to always open new ones
Pair<HoodieFileReader, HoodieMetadataMergedLogRecordReader> readers =
openReaders(partitionName, fileSlice);

try {
List<Long> timings = new ArrayList<>();

HoodieFileReader baseFileReader = readers.getKey();
HoodieMetadataMergedLogRecordReader logRecordScanner = readers.getRight();

if (baseFileReader == null && logRecordScanner == null) {
// TODO: what do we do if both does not exist? should we throw an exception and let caller do the fallback ?
return Collections.emptyIterator();
}
)
.map(keyRecordPair -> keyRecordPair.getValue().orElse(null))

boolean fullKeys = false;

Map<String, Option<HoodieRecord<HoodieMetadataPayload>>> logRecords =
readLogRecords(logRecordScanner, sortedKeyPrefixes, fullKeys, timings);

List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>> mergedRecords =
readFromBaseAndMergeWithLogRecords(baseFileReader, sortedKeyPrefixes, fullKeys, logRecords, timings, partitionName);

LOG.debug(String.format("Metadata read for %s keys took [baseFileRead, logMerge] %s ms",
sortedKeyPrefixes.size(), timings));

return mergedRecords.stream()
.map(keyRecordPair -> keyRecordPair.getValue().orElse(null))
.iterator();
} catch (IOException ioe) {
throw new HoodieIOException("Error merging records from metadata table for " + sortedKeyPrefixes.size() + " key : ", ioe);
} finally {
closeReader(readers);
}
})
.filter(Objects::nonNull);
}
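For context, a hedged caller-side sketch (variable names and the sizing heuristic are assumptions, loosely modeled on this PR's ColumnStatsIndexSupport): decide whether the expected projection is small enough to load in-memory, then pass that decision through the new flag.

// Assumed inputs: fileIndex (BaseHoodieTableFileIndex), metadataConfig (HoodieMetadataConfig),
// targetColumns / encodedTargetColumnNames, and an initialized metadataTable (HoodieTableMetadata).
val shouldReadInMemory: Boolean =
  fileIndex.getFileSlicesCount * targetColumns.size <
    metadataConfig.getColumnStatsIndexInMemoryProjectionThreshold

val records = metadataTable.getRecordsByKeyPrefixes(
  encodedTargetColumnNames, HoodieTableMetadataUtil.PARTITION_NAME_COLUMN_STATS, shouldReadInMemory)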

@@ -170,7 +170,8 @@ Map<Pair<String, String>, HoodieMetadataColumnStats> getColumnStats(final List<P
* @return {@link HoodieData} of {@link HoodieRecord}s with records matching the passed in key prefixes.
*/
HoodieData<HoodieRecord<HoodieMetadataPayload>> getRecordsByKeyPrefixes(List<String> keyPrefixes,
String partitionName);
String partitionName,
boolean shouldLoadInMemory);
Contributor (@YannByron, Jul 23, 2022): Whether or not column statistics should be loaded in memory is a behavior of the metadata table. IMO, we can abstract a method named shouldLoadColumnStatsInMemory in HoodieTableMetadata; the sub-classes can implement it and use it in their own getRecordsByKeyPrefixes directly.

Contributor (Author): Good call. I think that would definitely make sense if we consider, in the future, making all methods configurable to load either in-memory or using Spark. Right now only this method is configurable, and it would be very misleading to elevate it to an instance-level config.


/**
* Get the instant time to which the metadata is synced w.r.t data timeline.
@@ -319,7 +319,7 @@ private static List<RowData> readColumnStatsIndexByColumns(
.map(colName -> new ColumnIndexID(colName).asBase64EncodedString()).collect(Collectors.toList());

HoodieData<HoodieRecord<HoodieMetadataPayload>> records =
metadataTable.getRecordsByKeyPrefixes(encodedTargetColumnNames, HoodieTableMetadataUtil.PARTITION_NAME_COLUMN_STATS);
metadataTable.getRecordsByKeyPrefixes(encodedTargetColumnNames, HoodieTableMetadataUtil.PARTITION_NAME_COLUMN_STATS, false);

org.apache.hudi.util.AvroToRowDataConverters.AvroToRowDataConverter converter =
AvroToRowDataConverters.createRowConverter((RowType) METADATA_DATA_TYPE.getLogicalType());