diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java index b68cf97e9aa35..ac75997ea5c92 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java @@ -74,6 +74,7 @@ import java.io.IOException; import java.nio.charset.StandardCharsets; import java.text.ParseException; +import java.util.Collection; import java.util.HashMap; import java.util.Iterator; import java.util.List; @@ -159,10 +160,14 @@ public List upsertPreppedRecords(List> preppedRecor initTable(WriteOperationType.UPSERT, Option.ofNullable(instantTime)); table.validateUpsertSchema(); preWrite(instantTime, WriteOperationType.UPSERT_PREPPED, table.getMetaClient()); - final HoodieWriteHandle writeHandle = getOrCreateWriteHandle(preppedRecords.get(0), getConfig(), - instantTime, table, preppedRecords.listIterator()); - HoodieWriteMetadata> result = ((HoodieFlinkTable) table).upsertPrepped(context, writeHandle, instantTime, preppedRecords); - return postWrite(result, instantTime, table); + Map>> preppedRecordsByFileId = preppedRecords.stream().parallel() + .collect(Collectors.groupingBy(r -> r.getCurrentLocation().getFileId())); + return preppedRecordsByFileId.values().stream().parallel().map(records -> { + final HoodieWriteHandle writeHandle = getOrCreateWriteHandle(records.get(0), getConfig(), + instantTime, table, records.listIterator()); + HoodieWriteMetadata> result = ((HoodieFlinkTable) table).upsertPrepped(context, writeHandle, instantTime, records); + return postWrite(result, instantTime, table); + }).flatMap(Collection::stream).collect(Collectors.toList()); } @Override diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java index a436963bfcfb5..7dc5deb791d4d 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java @@ -236,6 +236,13 @@ private FlinkOptions() { .noDefaultValue() .withDescription("End commit instant for reading, the commit time format should be 'yyyyMMddHHmmss'"); + public static final ConfigOption READ_DATA_SKIPPING_ENABLED = ConfigOptions + .key("read.data.skipping.enabled") + .booleanType() + .defaultValue(false) + .withDescription("Enables data-skipping allowing queries to leverage indexes to reduce the search space by" + + "skipping over files"); + // ------------------------------------------------------------------------ // Write Options // ------------------------------------------------------------------------ diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/HadoopConfigurations.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/HadoopConfigurations.java index d15ef280f532a..8666151fe492d 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/HadoopConfigurations.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/HadoopConfigurations.java @@ -18,11 +18,11 @@ package org.apache.hudi.configuration; +import org.apache.hudi.util.FlinkClientUtil; + import 
org.apache.flink.configuration.Configuration; import org.apache.hadoop.fs.Path; -import org.apache.hudi.util.FlinkClientUtil; - import java.util.Map; /** diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/meta/CkpMetadata.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/meta/CkpMetadata.java index f059c7050ca56..5062ab0cd3c17 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/meta/CkpMetadata.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/meta/CkpMetadata.java @@ -18,7 +18,6 @@ package org.apache.hudi.sink.meta; -import org.apache.flink.configuration.Configuration; import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.util.ValidationUtils; @@ -26,6 +25,7 @@ import org.apache.hudi.configuration.HadoopConfigurations; import org.apache.hudi.exception.HoodieException; +import org.apache.flink.configuration.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.slf4j.Logger; diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/HiveSyncContext.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/HiveSyncContext.java index 9fc5323d46a2d..8973c405f3b49 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/HiveSyncContext.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/HiveSyncContext.java @@ -18,7 +18,6 @@ package org.apache.hudi.sink.utils; -import org.apache.flink.annotation.VisibleForTesting; import org.apache.hudi.aws.sync.AwsGlueCatalogSyncTool; import org.apache.hudi.common.config.SerializableConfiguration; import org.apache.hudi.common.fs.FSUtils; @@ -29,6 +28,7 @@ import org.apache.hudi.hive.ddl.HiveSyncMode; import org.apache.hudi.table.format.FilePathUtils; +import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.configuration.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.hive.conf.HiveConf; diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/NonThrownExecutor.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/NonThrownExecutor.java index 535e05f687635..4364d1d16d665 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/NonThrownExecutor.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/NonThrownExecutor.java @@ -155,6 +155,10 @@ private Supplier getActionString(String actionName, Object... actionPara // ------------------------------------------------------------------------- // Inner Class // ------------------------------------------------------------------------- + + /** + * The exception hook. 
+ */ public interface ExceptionHook { void apply(String errMsg, Throwable t); } diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/FileIndex.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/FileIndex.java index d7125b414352d..92396c1820e49 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/FileIndex.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/FileIndex.java @@ -21,14 +21,24 @@ import org.apache.hudi.client.common.HoodieFlinkEngineContext; import org.apache.hudi.common.config.HoodieMetadataConfig; import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.configuration.FlinkOptions; import org.apache.hudi.configuration.HadoopConfigurations; +import org.apache.hudi.source.stats.ColumnStatsIndices; +import org.apache.hudi.source.stats.ExpressionEvaluator; +import org.apache.hudi.util.DataTypeUtils; +import org.apache.hudi.util.ExpressionUtils; import org.apache.hudi.util.StreamerUtil; import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.configuration.Configuration; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.expressions.ResolvedExpression; +import org.apache.flink.table.types.logical.RowType; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Path; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import javax.annotation.Nullable; @@ -40,6 +50,7 @@ import java.util.Map; import java.util.Properties; import java.util.Set; +import java.util.stream.Collectors; /** * A file index which supports listing files efficiently through metadata table. @@ -47,19 +58,26 @@ *

It caches the partition paths to avoid redundant look up. */ public class FileIndex { + private static final Logger LOG = LoggerFactory.getLogger(FileIndex.class); + private final Path path; + private final RowType rowType; private final HoodieMetadataConfig metadataConfig; - private List partitionPaths; // cache of partition paths + private final boolean dataSkippingEnabled; + private List partitionPaths; // cache of partition paths + private List filters; // push down filters private final boolean tableExists; - private FileIndex(Path path, Configuration conf) { + private FileIndex(Path path, Configuration conf, RowType rowType) { this.path = path; + this.rowType = rowType; this.metadataConfig = metadataConfig(conf); + this.dataSkippingEnabled = conf.getBoolean(FlinkOptions.READ_DATA_SKIPPING_ENABLED); this.tableExists = StreamerUtil.tableExists(path.toString(), HadoopConfigurations.getHadoopConf(conf)); } - public static FileIndex instance(Path path, Configuration conf) { - return new FileIndex(path, conf); + public static FileIndex instance(Path path, Configuration conf, RowType rowType) { + return new FileIndex(path, conf, rowType); } /** @@ -119,9 +137,17 @@ public FileStatus[] getFilesInPartitions() { return new FileStatus[0]; } String[] partitions = getOrBuildPartitionPaths().stream().map(p -> fullPartitionPath(path, p)).toArray(String[]::new); - return FSUtils.getFilesInPartitions(HoodieFlinkEngineContext.DEFAULT, metadataConfig, path.toString(), + FileStatus[] allFileStatus = FSUtils.getFilesInPartitions(HoodieFlinkEngineContext.DEFAULT, metadataConfig, path.toString(), partitions, "/tmp/") .values().stream().flatMap(Arrays::stream).toArray(FileStatus[]::new); + Set candidateFiles = candidateFilesInMetadataTable(allFileStatus); + if (candidateFiles == null) { + // no need to filter by col stats or error occurs. + return allFileStatus; + } + return Arrays.stream(allFileStatus).parallel() + .filter(fileStatus -> candidateFiles.contains(fileStatus.getPath().getName())) + .toArray(FileStatus[]::new); } /** @@ -159,10 +185,96 @@ public void setPartitionPaths(@Nullable Set partitionPaths) { } } + /** + * Sets up pushed down filters. + */ + public void setFilters(List filters) { + if (filters.size() > 0) { + this.filters = new ArrayList<>(filters); + } + } + // ------------------------------------------------------------------------- // Utilities // ------------------------------------------------------------------------- + /** + * Computes pruned list of candidate base-files' names based on provided list of data filters. + * conditions, by leveraging Metadata Table's Column Statistics index (hereon referred as ColStats for brevity) + * bearing "min", "max", "num_nulls" statistics for all columns. + * + *

NOTE: This method has to return complete set of candidate files, since only provided candidates will + * ultimately be scanned as part of query execution. Hence, this method has to maintain the + * invariant of conservatively including every base-file's name, that is NOT referenced in its index. + * + *

The {@code filters} must all be simple. + * + * @return list of pruned (data-skipped) candidate base-files' names + */ + @Nullable + private Set candidateFilesInMetadataTable(FileStatus[] allFileStatus) { + // NOTE: Data Skipping is only effective when it references columns that are indexed w/in + // the Column Stats Index (CSI). Following cases could not be effectively handled by Data Skipping: + // - Expressions on top-level column's fields (ie, for ex filters like "struct.field > 0", since + // CSI only contains stats for top-level columns, in this case for "struct") + // - Any expression not directly referencing top-level column (for ex, sub-queries, since there's + // nothing CSI in particular could be applied for) + if (!metadataConfig.enabled() || !dataSkippingEnabled) { + validateConfig(); + return null; + } + if (this.filters == null || this.filters.size() == 0) { + return null; + } + String[] referencedCols = ExpressionUtils.referencedColumns(filters); + if (referencedCols.length == 0) { + return null; + } + try { + final List colStats = ColumnStatsIndices.readColumnStatsIndex(path.toString(), metadataConfig, referencedCols); + final Pair, String[]> colStatsTable = ColumnStatsIndices.transposeColumnStatsIndex(colStats, referencedCols, rowType); + List transposedColStats = colStatsTable.getLeft(); + String[] queryCols = colStatsTable.getRight(); + if (queryCols.length == 0) { + // the indexed columns have no intersection with the referenced columns, returns early + return null; + } + RowType.RowField[] queryFields = DataTypeUtils.projectRowFields(rowType, queryCols); + + Set allIndexedFileNames = transposedColStats.stream().parallel() + .map(row -> row.getString(0).toString()) + .collect(Collectors.toSet()); + Set candidateFileNames = transposedColStats.stream().parallel() + .filter(row -> ExpressionEvaluator.filterExprs(filters, row, queryFields)) + .map(row -> row.getString(0).toString()) + .collect(Collectors.toSet()); + + // NOTE: Col-Stats Index isn't guaranteed to have complete set of statistics for every + // base-file: since it's bound to clustering, which could occur asynchronously + // at arbitrary point in time, and is not likely to be touching all the base files. + // + // To close that gap, we manually compute the difference b/w all indexed (by col-stats-index) + // files and all outstanding base-files, and make sure that all base files not + // represented w/in the index are included in the output of this method + Set nonIndexedFileNames = Arrays.stream(allFileStatus) + .map(fileStatus -> fileStatus.getPath().getName()).collect(Collectors.toSet()); + nonIndexedFileNames.removeAll(allIndexedFileNames); + + candidateFileNames.addAll(nonIndexedFileNames); + return candidateFileNames; + } catch (Throwable throwable) { + LOG.warn("Read column stats for data skipping error", throwable); + return null; + } + } + + private void validateConfig() { + if (dataSkippingEnabled && !metadataConfig.enabled()) { + LOG.warn("Data skipping requires Metadata Table to be enabled! " + + "isMetadataTableEnabled = {}", metadataConfig.enabled()); + } + } + /** * Returns all the relative partition paths. 
* diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java index 94eeefcd36df3..d4b23c3134875 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java @@ -35,6 +35,7 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.core.fs.Path; +import org.apache.flink.table.types.logical.RowType; import org.apache.hadoop.fs.FileStatus; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -77,6 +78,7 @@ public class IncrementalInputSplits implements Serializable { private static final Logger LOG = LoggerFactory.getLogger(IncrementalInputSplits.class); private final Configuration conf; private final Path path; + private final RowType rowType; private final long maxCompactionMemoryInBytes; // for partition pruning private final Set requiredPartitions; @@ -86,11 +88,13 @@ public class IncrementalInputSplits implements Serializable { private IncrementalInputSplits( Configuration conf, Path path, + RowType rowType, long maxCompactionMemoryInBytes, @Nullable Set requiredPartitions, boolean skipCompaction) { this.conf = conf; this.path = path; + this.rowType = rowType; this.maxCompactionMemoryInBytes = maxCompactionMemoryInBytes; this.requiredPartitions = requiredPartitions; this.skipCompaction = skipCompaction; @@ -167,7 +171,7 @@ public Result inputSplits( if (instantRange == null) { // reading from the earliest, scans the partitions and files directly. - FileIndex fileIndex = FileIndex.instance(new org.apache.hadoop.fs.Path(path.toUri()), conf); + FileIndex fileIndex = FileIndex.instance(new org.apache.hadoop.fs.Path(path.toUri()), conf, rowType); if (this.requiredPartitions != null) { // apply partition push down fileIndex.setPartitionPaths(this.requiredPartitions); @@ -349,6 +353,7 @@ public static Result instance(List inputSplits, String en public static class Builder { private Configuration conf; private Path path; + private RowType rowType; private long maxCompactionMemoryInBytes; // for partition pruning private Set requiredPartitions; @@ -368,6 +373,11 @@ public Builder path(Path path) { return this; } + public Builder rowType(RowType rowType) { + this.rowType = rowType; + return this; + } + public Builder maxCompactionMemoryInBytes(long maxCompactionMemoryInBytes) { this.maxCompactionMemoryInBytes = maxCompactionMemoryInBytes; return this; @@ -384,7 +394,8 @@ public Builder skipCompaction(boolean skipCompaction) { } public IncrementalInputSplits build() { - return new IncrementalInputSplits(Objects.requireNonNull(this.conf), Objects.requireNonNull(this.path), + return new IncrementalInputSplits( + Objects.requireNonNull(this.conf), Objects.requireNonNull(this.path), Objects.requireNonNull(this.rowType), this.maxCompactionMemoryInBytes, this.requiredPartitions, this.skipCompaction); } } diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java index 012bd093818a5..3318cecf10369 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java +++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java @@ -36,6 +36,7 @@ import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; import org.apache.flink.streaming.api.functions.source.RichSourceFunction; import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.table.types.logical.RowType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -99,6 +100,7 @@ public class StreamReadMonitoringFunction public StreamReadMonitoringFunction( Configuration conf, Path path, + RowType rowType, long maxCompactionMemoryInBytes, @Nullable Set requiredPartitionPaths) { this.conf = conf; @@ -107,6 +109,7 @@ public StreamReadMonitoringFunction( this.incrementalInputSplits = IncrementalInputSplits.builder() .conf(conf) .path(path) + .rowType(rowType) .maxCompactionMemoryInBytes(maxCompactionMemoryInBytes) .requiredPartitions(requiredPartitionPaths) .skipCompaction(conf.getBoolean(FlinkOptions.READ_STREAMING_SKIP_COMPACT)) diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/stats/ColumnStatsIndices.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/stats/ColumnStatsIndices.java new file mode 100644 index 0000000000000..d033c1d8745bf --- /dev/null +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/stats/ColumnStatsIndices.java @@ -0,0 +1,377 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.source.stats; + +import org.apache.hudi.avro.model.HoodieMetadataRecord; +import org.apache.hudi.client.common.HoodieFlinkEngineContext; +import org.apache.hudi.common.config.HoodieMetadataConfig; +import org.apache.hudi.common.data.HoodieData; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.table.view.FileSystemViewStorageConfig; +import org.apache.hudi.common.util.ValidationUtils; +import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.common.util.hash.ColumnIndexID; +import org.apache.hudi.exception.HoodieException; +import org.apache.hudi.metadata.HoodieMetadataPayload; +import org.apache.hudi.metadata.HoodieTableMetadata; +import org.apache.hudi.metadata.HoodieTableMetadataUtil; +import org.apache.hudi.util.AvroSchemaConverter; +import org.apache.hudi.util.AvroToRowDataConverters; +import org.apache.hudi.util.RowDataProjection; + +import org.apache.avro.generic.GenericRecord; +import org.apache.flink.table.data.GenericRowData; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.StringData; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.table.types.logical.TimeType; +import org.apache.flink.table.types.logical.TimestampType; + +import java.io.IOException; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.SortedMap; +import java.util.TreeMap; +import java.util.TreeSet; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static org.apache.hudi.common.util.ValidationUtils.checkState; + +/** + * Utilities for abstracting away heavy-lifting of interactions with Metadata Table's Column Stats Index, + * providing convenient interfaces to read it, transpose, etc. + */ +public class ColumnStatsIndices { + private static final DataType METADATA_DATA_TYPE = getMetadataDataType(); + private static final DataType COL_STATS_DATA_TYPE = getColStatsDataType(); + private static final int[] COL_STATS_TARGET_POS = getColStatsTargetPos(); + + // the column schema: + // |- file_name: string + // |- min_val: row + // |- max_val: row + // |- null_cnt: long + // |- val_cnt: long + // |- column_name: string + private static final int ORD_FILE_NAME = 0; + private static final int ORD_MIN_VAL = 1; + private static final int ORD_MAX_VAL = 2; + private static final int ORD_NULL_CNT = 3; + private static final int ORD_VAL_CNT = 4; + private static final int ORD_COL_NAME = 5; + + private ColumnStatsIndices() { + } + + public static List readColumnStatsIndex(String basePath, HoodieMetadataConfig metadataConfig, String[] targetColumns) { + // NOTE: If specific columns have been provided, we can considerably trim down amount of data fetched + // by only fetching Column Stats Index records pertaining to the requested columns. 
+ // Otherwise, we fall back to read whole Column Stats Index + ValidationUtils.checkArgument(targetColumns.length > 0, + "Column stats is only valid when push down filters have referenced columns"); + final List metadataRows = readColumnStatsIndexByColumns(basePath, targetColumns, metadataConfig); + return projectNestedColStatsColumns(metadataRows); + } + + private static List projectNestedColStatsColumns(List rows) { + int pos = HoodieMetadataRecord.SCHEMA$.getField(HoodieMetadataPayload.SCHEMA_FIELD_ID_COLUMN_STATS).pos(); + RowDataProjection projection = RowDataProjection.instanceV2((RowType) COL_STATS_DATA_TYPE.getLogicalType(), COL_STATS_TARGET_POS); + return rows.stream().parallel() + .map(row -> { + RowData columnStatsField = row.getRow(pos, 9); + return projection.project(columnStatsField); + }).collect(Collectors.toList()); + } + + /** + * Transposes and converts the raw table format of the Column Stats Index representation, + * where each row/record corresponds to individual (column, file) pair, into the table format + * where each row corresponds to single file with statistic for individual columns collated + * w/in such row: + *

+   * <p>Metadata Table Column Stats Index format:
+   *
+   * <pre>
+   *  +---------------------------+------------+------------+------------+-------------+
+   *  |        fileName           | columnName |  minValue  |  maxValue  |  num_nulls  |
+   *  +---------------------------+------------+------------+------------+-------------+
+   *  | one_base_file.parquet     |          A |          1 |         10 |           0 |
+   *  | another_base_file.parquet |          A |        -10 |          0 |           5 |
+   *  +---------------------------+------------+------------+------------+-------------+
+   * </pre>
+   *
+   * <p>Returned table format
+   *
+   * <pre>
+   *  +---------------------------+------------+------------+-------------+
+   *  |          file             | A_minValue | A_maxValue | A_nullCount |
+   *  +---------------------------+------------+------------+-------------+
+   *  | one_base_file.parquet     |          1 |         10 |           0 |
+   *  | another_base_file.parquet |        -10 |          0 |           5 |
+   *  +---------------------------+------------+------------+-------------+
+   * </pre>
+   *

+ * NOTE: Column Stats Index might potentially contain statistics for many columns (if not all), while + * query at hand might only be referencing a handful of those. As such, we collect all the + * column references from the filtering expressions, and only transpose records corresponding to the + * columns referenced in those + * + * @param colStats RowData list bearing raw Column Stats Index table + * @param queryColumns target columns to be included into the final table + * @param tableSchema schema of the source data table + * @return reshaped table according to the format outlined above + */ + public static Pair, String[]> transposeColumnStatsIndex(List colStats, String[] queryColumns, RowType tableSchema) { + + Map tableFieldTypeMap = tableSchema.getFields().stream() + .collect(Collectors.toMap(RowType.RowField::getName, RowType.RowField::getType)); + + // NOTE: We have to collect list of indexed columns to make sure we properly align the rows + // w/in the transposed dataset: since some files might not have all the columns indexed + // either due to the Column Stats Index config changes, schema evolution, etc. we have + // to make sure that all the rows w/in transposed data-frame are properly padded (with null + // values) for such file-column combinations + Set indexedColumns = colStats.stream().map(row -> row.getString(ORD_COL_NAME) + .toString()).collect(Collectors.toSet()); + + // NOTE: We're sorting the columns to make sure final index schema matches layout + // of the transposed table + TreeSet sortedTargetColumns = Arrays.stream(queryColumns).sorted() + .filter(indexedColumns::contains) + .collect(Collectors.toCollection(TreeSet::new)); + + Map> fileNameToRows = colStats.stream().parallel() + .filter(row -> sortedTargetColumns.contains(row.getString(ORD_COL_NAME).toString())) + .map(row -> { + if (row.isNullAt(ORD_MIN_VAL) && row.isNullAt(ORD_MAX_VAL)) { + // Corresponding row could be null in either of the 2 cases + // - Column contains only null values (in that case both min/max have to be nulls) + // - This is a stubbed Column Stats record (used as a tombstone) + return row; + } else { + String colName = row.getString(ORD_COL_NAME).toString(); + LogicalType colType = tableFieldTypeMap.get(colName); + return unpackMinMaxVal(row, colType); + } + }).collect(Collectors.groupingBy(rowData -> rowData.getString(ORD_FILE_NAME))); + + return Pair.of(foldRowsByFiles(sortedTargetColumns, fileNameToRows), sortedTargetColumns.toArray(new String[0])); + } + + private static List foldRowsByFiles( + TreeSet sortedTargetColumns, + Map> fileNameToRows) { + return fileNameToRows.values().stream().parallel().map(rows -> { + // Rows seq is always non-empty (otherwise it won't be grouped into) + StringData fileName = rows.get(0).getString(ORD_FILE_NAME); + long valueCount = rows.get(0).getLong(ORD_VAL_CNT); + + // To properly align individual rows (corresponding to a file) w/in the transposed projection, we need + // to align existing column-stats for individual file with the list of expected ones for the + // whole transposed projection (a superset of all files) + Map columnRowsMap = rows.stream() + .collect(Collectors.toMap(row -> row.getString(ORD_COL_NAME).toString(), row -> row)); + SortedMap alignedColumnRowsMap = new TreeMap<>(); + sortedTargetColumns.forEach(col -> alignedColumnRowsMap.put(col, columnRowsMap.get(col))); + + List columnStats = alignedColumnRowsMap.values().stream().map(row -> { + if (row == null) { + // NOTE: Since we're assuming missing column to essentially contain 
exclusively + // null values, we set null-count to be equal to value-count (this behavior is + // consistent with reading non-existent columns from Parquet) + return Tuple3.of(null, null, valueCount); + } else { + GenericRowData gr = (GenericRowData) row; + return Tuple3.of(gr.getField(ORD_MIN_VAL), gr.getField(ORD_MAX_VAL), gr.getField(ORD_NULL_CNT)); + } + }).collect(Collectors.toList()); + GenericRowData foldedRow = new GenericRowData(2 + 3 * columnStats.size()); + foldedRow.setField(0, fileName); + foldedRow.setField(1, valueCount); + for (int i = 0; i < columnStats.size(); i++) { + Tuple3 stats = columnStats.get(i); + int startPos = 2 + 3 * i; + foldedRow.setField(startPos, stats.f0); + foldedRow.setField(startPos + 1, stats.f1); + foldedRow.setField(startPos + 2, stats.f2); + } + return foldedRow; + }).collect(Collectors.toList()); + } + + private static RowData unpackMinMaxVal( + RowData row, + LogicalType colType) { + + RowData minValueStruct = row.getRow(ORD_MIN_VAL, 1); + RowData maxValueStruct = row.getRow(ORD_MAX_VAL, 1); + + checkState(minValueStruct != null && maxValueStruct != null, + "Invalid Column Stats record: either both min/max have to be null, or both have to be non-null"); + + Object minValue = tryUnpackNonNullVal(minValueStruct, colType); + Object maxValue = tryUnpackNonNullVal(maxValueStruct, colType); + + // the column schema: + // |- file_name: string + // |- min_val: row + // |- max_val: row + // |- null_cnt: long + // |- val_cnt: long + // |- column_name: string + + GenericRowData unpackedRow = new GenericRowData(row.getArity()); + unpackedRow.setField(0, row.getString(0)); + unpackedRow.setField(1, minValue); + unpackedRow.setField(2, maxValue); + unpackedRow.setField(3, row.getLong(3)); + unpackedRow.setField(4, row.getLong(4)); + unpackedRow.setField(5, row.getString(5)); + + return unpackedRow; + } + + private static Object tryUnpackNonNullVal(RowData rowData, LogicalType colType) { + for (int i = 0; i < rowData.getArity(); i++) { + // row data converted from avro is definitely generic. 
+ Object nested = ((GenericRowData) rowData).getField(i); + if (nested != null) { + return doUnpack(nested, colType); + } + } + return null; + } + + private static Object doUnpack(Object rawVal, LogicalType logicalType) { + // fix time unit + switch (logicalType.getTypeRoot()) { + case TIME_WITHOUT_TIME_ZONE: + TimeType timeType = (TimeType) logicalType; + if (timeType.getPrecision() == 3) { + // the precision in HoodieMetadata is 6 + rawVal = ((Long) rawVal) / 1000; + } else if (timeType.getPrecision() == 9) { + rawVal = ((Long) rawVal) * 1000; + } + break; + case TIMESTAMP_WITHOUT_TIME_ZONE: + TimestampType timestampType = (TimestampType) logicalType; + if (timestampType.getPrecision() == 3) { + // the precision in HoodieMetadata is 6 + rawVal = ((Long) rawVal) / 1000; + } else if (timestampType.getPrecision() == 9) { + rawVal = ((Long) rawVal) * 1000; + } + break; + default: + // no operation + } + AvroToRowDataConverters.AvroToRowDataConverter converter = AvroToRowDataConverters.createConverter(logicalType); + return converter.convert(rawVal); + } + + private static List readColumnStatsIndexByColumns( + String basePath, + String[] targetColumns, + HoodieMetadataConfig metadataConfig) { + + // Read Metadata Table's Column Stats Index into Flink's RowData list by + // - Fetching the records from CSI by key-prefixes (encoded column names) + // - Deserializing fetched records into [[RowData]]s + HoodieTableMetadata metadataTable = HoodieTableMetadata.create( + HoodieFlinkEngineContext.DEFAULT, + metadataConfig, basePath, + FileSystemViewStorageConfig.SPILLABLE_DIR.defaultValue()); + + // TODO encoding should be done internally w/in HoodieBackedTableMetadata + List encodedTargetColumnNames = Arrays.stream(targetColumns) + .map(colName -> new ColumnIndexID(colName).asBase64EncodedString()).collect(Collectors.toList()); + + HoodieData> records = + metadataTable.getRecordsByKeyPrefixes(encodedTargetColumnNames, HoodieTableMetadataUtil.PARTITION_NAME_COLUMN_STATS); + + org.apache.hudi.util.AvroToRowDataConverters.AvroToRowDataConverter converter = + AvroToRowDataConverters.createRowConverter((RowType) METADATA_DATA_TYPE.getLogicalType()); + return records.collectAsList().stream().parallel().map(record -> { + // schema and props are ignored for generating metadata record from the payload + // instead, the underlying file system, or bloom filter, or columns stats metadata (part of payload) are directly used + GenericRecord genericRecord; + try { + genericRecord = (GenericRecord) record.getData().getInsertValue(null, null).orElse(null); + } catch (IOException e) { + throw new HoodieException("Exception while getting insert value from metadata payload"); + } + return (RowData) converter.convert(genericRecord); + } + ).collect(Collectors.toList()); + } + + // ------------------------------------------------------------------------- + // Utilities + // ------------------------------------------------------------------------- + private static class Tuple3 { + public Object f0; + public Object f1; + public Object f2; + + private Tuple3(Object f0, Object f1, Object f2) { + this.f0 = f0; + this.f1 = f1; + this.f2 = f2; + } + + public static Tuple3 of(Object f0, Object f1, Object f2) { + return new Tuple3(f0, f1, f2); + } + } + + private static DataType getMetadataDataType() { + return AvroSchemaConverter.convertToDataType(HoodieMetadataRecord.SCHEMA$); + } + + private static DataType getColStatsDataType() { + int pos = 
HoodieMetadataRecord.SCHEMA$.getField(HoodieMetadataPayload.SCHEMA_FIELD_ID_COLUMN_STATS).pos(); + return METADATA_DATA_TYPE.getChildren().get(pos); + } + + // the column schema: + // |- file_name: string + // |- min_val: row + // |- max_val: row + // |- null_cnt: long + // |- val_cnt: long + // |- column_name: string + private static int[] getColStatsTargetPos() { + RowType colStatsRowType = (RowType) COL_STATS_DATA_TYPE.getLogicalType(); + return Stream.of( + HoodieMetadataPayload.COLUMN_STATS_FIELD_FILE_NAME, + HoodieMetadataPayload.COLUMN_STATS_FIELD_MIN_VALUE, + HoodieMetadataPayload.COLUMN_STATS_FIELD_MAX_VALUE, + HoodieMetadataPayload.COLUMN_STATS_FIELD_NULL_COUNT, + HoodieMetadataPayload.COLUMN_STATS_FIELD_VALUE_COUNT, + HoodieMetadataPayload.COLUMN_STATS_FIELD_COLUMN_NAME) + .mapToInt(colStatsRowType::getFieldIndex) + .toArray(); + } +} diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/stats/ExpressionEvaluator.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/stats/ExpressionEvaluator.java new file mode 100644 index 0000000000000..a0162856fa921 --- /dev/null +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/stats/ExpressionEvaluator.java @@ -0,0 +1,552 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.source.stats; + +import org.apache.hudi.common.util.ValidationUtils; +import org.apache.hudi.util.ExpressionUtils; + +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.expressions.CallExpression; +import org.apache.flink.table.expressions.Expression; +import org.apache.flink.table.expressions.FieldReferenceExpression; +import org.apache.flink.table.expressions.ResolvedExpression; +import org.apache.flink.table.expressions.ValueLiteralExpression; +import org.apache.flink.table.functions.BuiltInFunctionDefinitions; +import org.apache.flink.table.functions.FunctionDefinition; +import org.apache.flink.table.types.logical.DecimalType; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.RowType; + +import javax.validation.constraints.NotNull; + +import java.math.BigDecimal; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +/** + * Tool to evaluate the {@link org.apache.flink.table.expressions.ResolvedExpression}s. + */ +public class ExpressionEvaluator { + private static final int IN_PREDICATE_LIMIT = 200; + + /** + * Filter the index row with specific data filters and query fields. 
+ * + * @param filters The pushed down data filters + * @param indexRow The index row + * @param queryFields The query fields referenced by the filters + * @return true if the index row should be considered as a candidate + */ + public static boolean filterExprs(List filters, RowData indexRow, RowType.RowField[] queryFields) { + for (ResolvedExpression filter : filters) { + if (!Evaluator.bindCall((CallExpression) filter, indexRow, queryFields).eval()) { + return false; + } + } + return true; + } + + /** + * Used for deciding whether the literal values match the column stats. + * The evaluator can be nested. + */ + public abstract static class Evaluator { + // the constant literal value + protected Object val; + + // column stats + protected Object minVal; + protected Object maxVal; + protected long nullCnt = 0; + + // referenced field type + protected LogicalType type; + + /** + * Binds the evaluator with specific call expression. + * + *

Three steps to bind the call: + * 1. map the evaluator instance; + * 2. bind the field reference; + * 3. bind the column stats. + * + *

Normalize the expression to simplify the following decision logic: + * always put the literal expression in the right. + */ + public static Evaluator bindCall(CallExpression call, RowData indexRow, RowType.RowField[] queryFields) { + FunctionDefinition funDef = call.getFunctionDefinition(); + List childExprs = call.getChildren(); + + boolean normalized = childExprs.get(0) instanceof FieldReferenceExpression; + final Evaluator evaluator; + + if (BuiltInFunctionDefinitions.NOT.equals(funDef)) { + evaluator = Not.getInstance(); + Evaluator childEvaluator = bindCall((CallExpression) childExprs.get(0), indexRow, queryFields); + return ((Not) evaluator).bindEvaluator(childEvaluator); + } + + if (BuiltInFunctionDefinitions.AND.equals(funDef)) { + evaluator = And.getInstance(); + Evaluator evaluator1 = bindCall((CallExpression) childExprs.get(0), indexRow, queryFields); + Evaluator evaluator2 = bindCall((CallExpression) childExprs.get(1), indexRow, queryFields); + return ((And) evaluator).bindEvaluator(evaluator1, evaluator2); + } + + if (BuiltInFunctionDefinitions.OR.equals(funDef)) { + evaluator = Or.getInstance(); + Evaluator evaluator1 = bindCall((CallExpression) childExprs.get(0), indexRow, queryFields); + Evaluator evaluator2 = bindCall((CallExpression) childExprs.get(1), indexRow, queryFields); + return ((Or) evaluator).bindEvaluator(evaluator1, evaluator2); + } + + // handle IN specifically + if (BuiltInFunctionDefinitions.IN.equals(funDef)) { + ValidationUtils.checkState(normalized, "The IN expression expects to be normalized"); + evaluator = In.getInstance(); + FieldReferenceExpression rExpr = (FieldReferenceExpression) childExprs.get(0); + evaluator.bindFieldReference(rExpr); + ((In) evaluator).bindVals(getInLiteralVals(childExprs)); + return evaluator.bindColStats(indexRow, queryFields, rExpr); + } + + // handle unary operators + if (BuiltInFunctionDefinitions.IS_NULL.equals(funDef)) { + FieldReferenceExpression rExpr = (FieldReferenceExpression) childExprs.get(0); + return IsNull.getInstance() + .bindFieldReference(rExpr) + .bindColStats(indexRow, queryFields, rExpr); + } else if (BuiltInFunctionDefinitions.IS_NOT_NULL.equals(funDef)) { + FieldReferenceExpression rExpr = (FieldReferenceExpression) childExprs.get(0); + return IsNotNull.getInstance() + .bindFieldReference(rExpr) + .bindColStats(indexRow, queryFields, rExpr); + } + + // handle binary operators + if (BuiltInFunctionDefinitions.EQUALS.equals(funDef)) { + evaluator = EqualTo.getInstance(); + } else if (BuiltInFunctionDefinitions.NOT_EQUALS.equals(funDef)) { + evaluator = NotEqualTo.getInstance(); + } else if (BuiltInFunctionDefinitions.LESS_THAN.equals(funDef)) { + evaluator = normalized ? LessThan.getInstance() : GreaterThan.getInstance(); + } else if (BuiltInFunctionDefinitions.GREATER_THAN.equals(funDef)) { + evaluator = normalized ? GreaterThan.getInstance() : LessThan.getInstance(); + } else if (BuiltInFunctionDefinitions.LESS_THAN_OR_EQUAL.equals(funDef)) { + evaluator = normalized ? LessThanOrEqual.getInstance() : GreaterThanOrEqual.getInstance(); + } else if (BuiltInFunctionDefinitions.GREATER_THAN_OR_EQUAL.equals(funDef)) { + evaluator = normalized ? GreaterThanOrEqual.getInstance() : LessThanOrEqual.getInstance(); + } else { + throw new AssertionError("Unexpected function definition " + funDef); + } + FieldReferenceExpression rExpr = normalized + ? (FieldReferenceExpression) childExprs.get(0) + : (FieldReferenceExpression) childExprs.get(1); + ValueLiteralExpression vExpr = normalized + ? 
(ValueLiteralExpression) childExprs.get(1) + : (ValueLiteralExpression) childExprs.get(0); + evaluator + .bindFieldReference(rExpr) + .bindVal(vExpr) + .bindColStats(indexRow, queryFields, rExpr); + return evaluator; + } + + public Evaluator bindColStats( + RowData indexRow, + RowType.RowField[] queryFields, + FieldReferenceExpression expr) { + int colPos = -1; + for (int i = 0; i < queryFields.length; i++) { + if (expr.getName().equals(queryFields[i].getName())) { + colPos = i; + } + } + ValidationUtils.checkState(colPos != -1, "Can not find column " + expr.getName()); + int startPos = 2 + colPos * 3; + LogicalType colType = queryFields[colPos].getType(); + Object minVal = indexRow.isNullAt(startPos) ? null : getValAsJavaObj(indexRow, startPos, colType); + Object maxVal = indexRow.isNullAt(startPos + 1) ? null : getValAsJavaObj(indexRow, startPos + 1, colType); + long nullCnt = indexRow.getLong(startPos + 2); + + this.minVal = minVal; + this.maxVal = maxVal; + this.nullCnt = nullCnt; + return this; + } + + public Evaluator bindVal(ValueLiteralExpression vExpr) { + this.val = ExpressionUtils.getValueFromLiteral(vExpr); + return this; + } + + public Evaluator bindFieldReference(FieldReferenceExpression expr) { + this.type = expr.getOutputDataType().getLogicalType(); + return this; + } + + public abstract boolean eval(); + } + + /** + * To evaluate = expr. + */ + public static class EqualTo extends Evaluator { + + public static EqualTo getInstance() { + return new EqualTo(); + } + + @Override + public boolean eval() { + if (this.minVal == null || this.maxVal == null || this.val == null) { + return false; + } + if (compare(this.minVal, this.val, this.type) > 0) { + return false; + } + return compare(this.maxVal, this.val, this.type) >= 0; + } + } + + /** + * To evaluate <> expr. + */ + public static class NotEqualTo extends Evaluator { + public static NotEqualTo getInstance() { + return new NotEqualTo(); + } + + @Override + public boolean eval() { + // because the bounds are not necessarily a min or max value, this cannot be answered using + // them. notEq(col, X) with (X, Y) doesn't guarantee that X is a value in col. + return true; + } + } + + /** + * To evaluate IS NULL expr. + */ + public static class IsNull extends Evaluator { + public static IsNull getInstance() { + return new IsNull(); + } + + @Override + public boolean eval() { + return this.nullCnt > 0; + } + } + + /** + * To evaluate IS NOT NULL expr. + */ + public static class IsNotNull extends Evaluator { + public static IsNotNull getInstance() { + return new IsNotNull(); + } + + @Override + public boolean eval() { + // should consider FLOAT/DOUBLE & NAN + return this.minVal != null || this.nullCnt <= 0; + } + } + + /** + * To evaluate < expr. + */ + public static class LessThan extends Evaluator { + public static LessThan getInstance() { + return new LessThan(); + } + + @Override + public boolean eval() { + if (this.minVal == null) { + return false; + } + return compare(this.minVal, this.val, this.type) < 0; + } + } + + /** + * To evaluate > expr. + */ + public static class GreaterThan extends Evaluator { + public static GreaterThan getInstance() { + return new GreaterThan(); + } + + @Override + public boolean eval() { + if (this.maxVal == null) { + return false; + } + return compare(this.maxVal, this.val, this.type) > 0; + } + } + + /** + * To evaluate <= expr. 
+ */ + public static class LessThanOrEqual extends Evaluator { + public static LessThanOrEqual getInstance() { + return new LessThanOrEqual(); + } + + @Override + public boolean eval() { + if (this.minVal == null) { + return false; + } + return compare(this.minVal, this.val, this.type) <= 0; + } + } + + /** + * To evaluate >= expr. + */ + public static class GreaterThanOrEqual extends Evaluator { + public static GreaterThanOrEqual getInstance() { + return new GreaterThanOrEqual(); + } + + @Override + public boolean eval() { + if (this.maxVal == null) { + return false; + } + return compare(this.maxVal, this.val, this.type) >= 0; + } + } + + /** + * To evaluate IN expr. + */ + public static class In extends Evaluator { + public static In getInstance() { + return new In(); + } + + private Object[] vals; + + @Override + public boolean eval() { + if (this.minVal == null) { + return false; // values are all null and literalSet cannot contain null. + } + + if (vals.length > IN_PREDICATE_LIMIT) { + // skip evaluating the predicate if the number of values is too big + return true; + } + + vals = Arrays.stream(vals).filter(v -> compare(this.minVal, v, this.type) <= 0).toArray(); + if (vals.length == 0) { // if all values are less than lower bound, rows cannot match. + return false; + } + + vals = Arrays.stream(vals).filter(v -> compare(this.maxVal, v, this.type) >= 0).toArray(); + if (vals.length == 0) { // if all remaining values are greater than upper bound, rows cannot match. + return false; + } + + return true; + } + + public void bindVals(Object... vals) { + this.vals = vals; + } + } + + // component predicate + /** + * To evaluate NOT expr. + */ + public static class Not extends Evaluator { + public static Not getInstance() { + return new Not(); + } + + private Evaluator evaluator; + + @Override + public boolean eval() { + return !this.evaluator.eval(); + } + + public Evaluator bindEvaluator(Evaluator evaluator) { + this.evaluator = evaluator; + return this; + } + } + + /** + * To evaluate AND expr. + */ + public static class And extends Evaluator { + public static And getInstance() { + return new And(); + } + + private Evaluator[] evaluators; + + @Override + public boolean eval() { + for (Evaluator evaluator : evaluators) { + if (!evaluator.eval()) { + return false; + } + } + return true; + } + + public Evaluator bindEvaluator(Evaluator... evaluators) { + this.evaluators = evaluators; + return this; + } + } + + /** + * To evaluate OR expr. + */ + public static class Or extends Evaluator { + public static Or getInstance() { + return new Or(); + } + + private Evaluator[] evaluators; + + @Override + public boolean eval() { + for (Evaluator evaluator : evaluators) { + if (evaluator.eval()) { + return true; + } + } + return false; + } + + public Evaluator bindEvaluator(Evaluator... 
evaluators) { + this.evaluators = evaluators; + return this; + } + } + + // ------------------------------------------------------------------------- + // Utilities + // ------------------------------------------------------------------------- + + private static int compare(@NotNull Object val1, @NotNull Object val2, LogicalType logicalType) { + switch (logicalType.getTypeRoot()) { + case TIMESTAMP_WITHOUT_TIME_ZONE: + case TIME_WITHOUT_TIME_ZONE: + case DATE: + return ((Long) val1).compareTo((Long) val2); + case BOOLEAN: + return ((Boolean) val1).compareTo((Boolean) val2); + case TINYINT: + case SMALLINT: + case INTEGER: + return ((Integer) val1).compareTo((Integer) val2); + case FLOAT: + return ((Float) val1).compareTo((Float) val2); + case DOUBLE: + return ((Double) val1).compareTo((Double) val2); + case BINARY: + case VARBINARY: + return compareBytes((byte[]) val1, (byte[]) val2); + case CHAR: + case VARCHAR: + return ((String) val1).compareTo((String) val2); + case DECIMAL: + return ((BigDecimal) val1).compareTo((BigDecimal) val2); + default: + throw new UnsupportedOperationException("Unsupported type: " + logicalType); + } + } + + private static int compareBytes(byte[] v1, byte[] v2) { + int len1 = v1.length; + int len2 = v2.length; + int lim = Math.min(len1, len2); + + int k = 0; + while (k < lim) { + byte c1 = v1[k]; + byte c2 = v2[k]; + if (c1 != c2) { + return c1 - c2; + } + k++; + } + return len1 - len2; + } + + /** + * Returns the IN expression literal values. + */ + private static Object[] getInLiteralVals(List childExprs) { + List vals = new ArrayList<>(); + for (int i = 1; i < childExprs.size(); i++) { + vals.add(ExpressionUtils.getValueFromLiteral((ValueLiteralExpression) childExprs.get(i))); + } + return vals.toArray(); + } + + /** + * Returns the value as Java object at position {@code pos} of row {@code indexRow}. + */ + private static Object getValAsJavaObj(RowData indexRow, int pos, LogicalType colType) { + switch (colType.getTypeRoot()) { + // NOTE: Since we can't rely on Avro's "date", and "timestamp-micros" logical-types, we're + // manually encoding corresponding values as int and long w/in the Column Stats Index and + // here we have to decode those back into corresponding logical representation. 
+ case TIMESTAMP_WITHOUT_TIME_ZONE: + case TIME_WITHOUT_TIME_ZONE: + case DATE: + return indexRow.getLong(pos); + // NOTE: All integral types of size less than Int are encoded as Ints in MT + case BOOLEAN: + return indexRow.getBoolean(pos); + case TINYINT: + case SMALLINT: + case INTEGER: + return indexRow.getInt(pos); + case FLOAT: + return indexRow.getFloat(pos); + case DOUBLE: + return indexRow.getDouble(pos); + case BINARY: + case VARBINARY: + return indexRow.getBinary(pos); + case CHAR: + case VARCHAR: + return indexRow.getString(pos).toString(); + case DECIMAL: + DecimalType decimalType = (DecimalType) colType; + return indexRow.getDecimal(pos, decimalType.getPrecision(), decimalType.getScale()).toBigDecimal(); + default: + throw new UnsupportedOperationException("Unsupported type: " + colType); + } + } +} diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java index 930cbfe3d871a..97c63ac2f9458 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java @@ -67,7 +67,7 @@ public SinkRuntimeProvider getSinkRuntimeProvider(Context context) { .getCheckpointConfig().getCheckpointTimeout(); conf.setLong(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT, ckpTimeout); - RowType rowType = (RowType) schema.toSourceRowDataType().notNull().getLogicalType(); + RowType rowType = (RowType) schema.toSinkRowDataType().notNull().getLogicalType(); // bulk_insert mode final String writeOperation = this.conf.get(FlinkOptions.OPERATION); diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java index e76bb29bdf50a..6fd7c6b462bf2 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java @@ -20,6 +20,7 @@ import org.apache.hudi.avro.HoodieAvroUtils; import org.apache.hudi.common.model.BaseFile; +import org.apache.hudi.common.model.HoodieBaseFile; import org.apache.hudi.common.model.HoodieCommitMetadata; import org.apache.hudi.common.model.HoodieLogFile; import org.apache.hudi.common.model.HoodieTableType; @@ -34,7 +35,6 @@ import org.apache.hudi.configuration.HadoopConfigurations; import org.apache.hudi.configuration.OptionsResolver; import org.apache.hudi.exception.HoodieException; -import org.apache.hudi.hadoop.HoodieROTablePathFilter; import org.apache.hudi.source.FileIndex; import org.apache.hudi.source.IncrementalInputSplits; import org.apache.hudi.source.StreamReadMonitoringFunction; @@ -46,13 +46,12 @@ import org.apache.hudi.table.format.mor.MergeOnReadTableState; import org.apache.hudi.util.AvroSchemaConverter; import org.apache.hudi.util.ChangelogModes; +import org.apache.hudi.util.ExpressionUtils; import org.apache.hudi.util.InputFormats; import org.apache.hudi.util.StreamerUtil; import org.apache.avro.Schema; import org.apache.flink.annotation.VisibleForTesting; -import org.apache.flink.api.common.io.FileInputFormat; -import org.apache.flink.api.common.io.FilePathFilter; import org.apache.flink.api.common.io.InputFormat; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.configuration.Configuration; @@ -73,7 +72,6 @@ 
import org.apache.flink.table.connector.source.abilities.SupportsPartitionPushDown; import org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown; import org.apache.flink.table.data.RowData; -import org.apache.flink.table.expressions.Expression; import org.apache.flink.table.expressions.ResolvedExpression; import org.apache.flink.table.runtime.types.TypeInfoDataTypeConverter; import org.apache.flink.table.types.DataType; @@ -117,6 +115,7 @@ public class HoodieTableSource implements private final long maxCompactionMemoryInBytes; private final ResolvedSchema schema; + private final RowType tableRowType; private final Path path; private final List partitionKeys; private final String defaultPartName; @@ -125,7 +124,7 @@ public class HoodieTableSource implements private int[] requiredPos; private long limit; - private List filters; + private List filters; private List> requiredPartitions; @@ -147,21 +146,22 @@ public HoodieTableSource( @Nullable List> requiredPartitions, @Nullable int[] requiredPos, @Nullable Long limit, - @Nullable List filters) { + @Nullable List filters) { this.schema = schema; + this.tableRowType = (RowType) schema.toPhysicalRowDataType().notNull().getLogicalType(); this.path = path; this.partitionKeys = partitionKeys; this.defaultPartName = defaultPartName; this.conf = conf; - this.fileIndex = FileIndex.instance(this.path, this.conf); this.requiredPartitions = requiredPartitions; this.requiredPos = requiredPos == null - ? IntStream.range(0, schema.toPhysicalRowDataType().getChildren().size()).toArray() + ? IntStream.range(0, this.tableRowType.getFieldCount()).toArray() : requiredPos; this.limit = limit == null ? NO_LIMIT_CONSTANT : limit; this.filters = filters == null ? Collections.emptyList() : filters; this.hadoopConf = HadoopConfigurations.getHadoopConf(conf); this.metaClient = StreamerUtil.metaClientForReader(conf, hadoopConf); + this.fileIndex = FileIndex.instance(this.path, this.conf, this.tableRowType); this.maxCompactionMemoryInBytes = StreamerUtil.getMaxCompactionMemoryInBytes(conf); } @@ -181,12 +181,12 @@ public DataStream produceDataStream(StreamExecutionEnvironment execEnv) (TypeInformation) TypeInfoDataTypeConverter.fromDataTypeToTypeInfo(getProducedDataType()); if (conf.getBoolean(FlinkOptions.READ_AS_STREAMING)) { StreamReadMonitoringFunction monitoringFunction = new StreamReadMonitoringFunction( - conf, FilePathUtils.toFlinkPath(path), maxCompactionMemoryInBytes, getRequiredPartitionPaths()); + conf, FilePathUtils.toFlinkPath(path), tableRowType, maxCompactionMemoryInBytes, getRequiredPartitionPaths()); InputFormat inputFormat = getInputFormat(true); OneInputStreamOperatorFactory factory = StreamReadOperator.factory((MergeOnReadInputFormat) inputFormat); SingleOutputStreamOperator source = execEnv.addSource(monitoringFunction, getSourceOperatorName("split_monitor")) .setParallelism(1) - .keyBy(inputSplit -> inputSplit.getFileId()) + .keyBy(MergeOnReadInputSplit::getFileId) .transform("split_reader", typeInfo, factory) .setParallelism(conf.getInteger(FlinkOptions.READ_TASKS)); return new DataStreamSource<>(source); @@ -219,7 +219,8 @@ public String asSummaryString() { @Override public Result applyFilters(List filters) { - this.filters = new ArrayList<>(filters); + this.filters = filters.stream().filter(ExpressionUtils::isSimpleCallExpression).collect(Collectors.toList()); + this.fileIndex.setFilters(this.filters); // refuse all the filters now return SupportsFilterPushDown.Result.of(Collections.emptyList(), new ArrayList<>(filters)); 
} @@ -262,13 +263,6 @@ private DataType getProducedDataType() { .bridgedTo(RowData.class); } - private List> getOrFetchPartitions() { - if (requiredPartitions == null) { - requiredPartitions = listPartitions().orElse(Collections.emptyList()); - } - return requiredPartitions; - } - private String getSourceOperatorName(String operatorName) { String[] schemaFieldNames = this.schema.getColumnNames().toArray(new String[0]); List fields = Arrays.stream(this.requiredPos) @@ -366,7 +360,9 @@ private List buildFileIndex() { return baseFileOnlyInputFormat(); case FlinkOptions.QUERY_TYPE_INCREMENTAL: IncrementalInputSplits incrementalInputSplits = IncrementalInputSplits.builder() - .conf(conf).path(FilePathUtils.toFlinkPath(path)) + .conf(conf) + .path(FilePathUtils.toFlinkPath(path)) + .rowType(this.tableRowType) .maxCompactionMemoryInBytes(maxCompactionMemoryInBytes) .requiredPartitions(getRequiredPartitionPaths()).build(); final IncrementalInputSplits.Result result = incrementalInputSplits.inputSplits(metaClient, hadoopConf); @@ -439,11 +435,18 @@ private MergeOnReadInputFormat mergeOnReadInputFormat( } private InputFormat baseFileOnlyInputFormat() { - final Path[] paths = getReadPaths(); - if (paths.length == 0) { + final FileStatus[] fileStatuses = getReadFiles(); + if (fileStatuses.length == 0) { return InputFormats.EMPTY_INPUT_FORMAT; } - FileInputFormat format = new CopyOnWriteInputFormat( + + HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(metaClient, + metaClient.getCommitsAndCompactionTimeline().filterCompletedInstants(), fileStatuses); + Path[] paths = fsView.getLatestBaseFiles() + .map(HoodieBaseFile::getFileStatus) + .map(FileStatus::getPath).toArray(Path[]::new); + + return new CopyOnWriteInputFormat( FilePathUtils.toFlinkPaths(paths), this.schema.getColumnNames().toArray(new String[0]), this.schema.getColumnDataTypes().toArray(new DataType[0]), @@ -453,12 +456,10 @@ private MergeOnReadInputFormat mergeOnReadInputFormat( getParquetConf(this.conf, this.hadoopConf), this.conf.getBoolean(FlinkOptions.UTC_TIMEZONE) ); - format.setFilesFilter(new LatestFileFilter(this.hadoopConf)); - return format; } private Schema inferSchemaFromDdl() { - Schema schema = AvroSchemaConverter.convertToSchema(this.schema.toPhysicalRowDataType().getLogicalType()); + Schema schema = AvroSchemaConverter.convertToSchema(this.tableRowType); return HoodieAvroUtils.addMetadataFields(schema, conf.getBoolean(FlinkOptions.CHANGELOG_ENABLED)); } @@ -498,23 +499,13 @@ public void reset() { * Get the reader paths with partition path expanded. */ @VisibleForTesting - public Path[] getReadPaths() { - return partitionKeys.isEmpty() - ? 
new Path[] {path} - : FilePathUtils.partitionPath2ReadPath(path, partitionKeys, getOrFetchPartitions(), - conf.getBoolean(FlinkOptions.HIVE_STYLE_PARTITIONING)); - } - - private static class LatestFileFilter extends FilePathFilter { - private final HoodieROTablePathFilter hoodieFilter; - - public LatestFileFilter(org.apache.hadoop.conf.Configuration hadoopConf) { - this.hoodieFilter = new HoodieROTablePathFilter(hadoopConf); - } - - @Override - public boolean filterPath(org.apache.flink.core.fs.Path filePath) { - return !this.hoodieFilter.accept(new Path(filePath.toUri())); + public FileStatus[] getReadFiles() { + Set requiredPartitionPaths = getRequiredPartitionPaths(); + fileIndex.setPartitionPaths(requiredPartitionPaths); + List relPartitionPaths = fileIndex.getOrBuildPartitionPaths(); + if (relPartitionPaths.size() == 0) { + return new FileStatus[0]; } + return fileIndex.getFilesInPartitions(); } } diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FilePathUtils.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FilePathUtils.java index 99efa0b36a7ae..d03c5aac272e8 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FilePathUtils.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FilePathUtils.java @@ -323,6 +323,10 @@ public static List> getPartitions( public static LinkedHashMap validateAndReorderPartitions( Map partitionKVs, List partitionKeys) { + if (partitionKeys.size() == 0) { + // in case the partition fields are not in schema + return new LinkedHashMap<>(partitionKVs); + } LinkedHashMap map = new LinkedHashMap<>(); for (String k : partitionKeys) { if (!partitionKVs.containsKey(k)) { diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/AvroSchemaConverter.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/AvroSchemaConverter.java index 73c7341780b4d..11324038ab943 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/AvroSchemaConverter.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/AvroSchemaConverter.java @@ -39,6 +39,7 @@ import org.apache.flink.table.types.logical.utils.LogicalTypeChecks; import java.util.List; +import java.util.stream.Collectors; /** * Converts an Avro schema into Flink's type information. It uses {@link org.apache.flink.api.java.typeutils.RowTypeInfo} for @@ -96,9 +97,24 @@ public static DataType convertToDataType(Schema schema) { actualSchema = schema.getTypes().get(0); nullable = false; } else { + List nonNullTypes = schema.getTypes().stream() + .filter(s -> s.getType() != Schema.Type.NULL) + .collect(Collectors.toList()); + nullable = schema.getTypes().size() > nonNullTypes.size(); + + // use Kryo for serialization + DataType rawDataType = new AtomicDataType( + new TypeInformationRawType<>(false, Types.GENERIC(Object.class))) + .notNull(); + + if (recordTypesOfSameNumFields(nonNullTypes)) { + DataType converted = DataTypes.ROW( + DataTypes.FIELD("wrapper", rawDataType)) + .notNull(); + return nullable ? converted.nullable() : converted; + } // use Kryo for serialization - return new AtomicDataType( - new TypeInformationRawType<>(false, Types.GENERIC(Object.class))); + return nullable ? rawDataType.nullable() : rawDataType; } DataType converted = convertToDataType(actualSchema); return nullable ? 
converted.nullable() : converted; @@ -155,6 +171,20 @@ public static DataType convertToDataType(Schema schema) { } } + /** + * Returns true if all the types are RECORD types with the same number of fields. + */ + private static boolean recordTypesOfSameNumFields(List types) { + if (types == null || types.size() == 0) { + return false; + } + if (types.stream().anyMatch(s -> s.getType() != Schema.Type.RECORD)) { + return false; + } + int numFields = types.get(0).getFields().size(); + return types.stream().allMatch(s -> s.getFields().size() == numFields); + } + /** * Converts Flink SQL {@link LogicalType} (can be nested) into an Avro schema. * diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/AvroToRowDataConverters.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/AvroToRowDataConverters.java index 8855f87bd230f..558bb41f90490 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/AvroToRowDataConverters.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/AvroToRowDataConverters.java @@ -106,7 +106,7 @@ private static AvroToRowDataConverter createNullableConverter(LogicalType type) /** * Creates a runtime converter which assuming input object is not null. */ - private static AvroToRowDataConverter createConverter(LogicalType type) { + public static AvroToRowDataConverter createConverter(LogicalType type) { switch (type.getTypeRoot()) { case NULL: return avroObject -> null; @@ -121,6 +121,7 @@ private static AvroToRowDataConverter createConverter(LogicalType type) { case INTERVAL_DAY_TIME: // long case FLOAT: // float case DOUBLE: // double + case RAW: return avroObject -> avroObject; case DATE: return AvroToRowDataConverters::convertToDate; @@ -143,7 +144,6 @@ private static AvroToRowDataConverter createConverter(LogicalType type) { case MAP: case MULTISET: return createMapConverter(type); - case RAW: default: throw new UnsupportedOperationException("Unsupported type: " + type); } diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/DataTypeUtils.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/DataTypeUtils.java index eb188ad554d1a..60d457370a9f1 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/DataTypeUtils.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/DataTypeUtils.java @@ -23,8 +23,11 @@ import org.apache.flink.table.types.DataType; import org.apache.flink.table.types.logical.LogicalType; import org.apache.flink.table.types.logical.LogicalTypeRoot; +import org.apache.flink.table.types.logical.RowType; import org.apache.flink.table.types.logical.TimestampType; +import java.util.Arrays; + /** * Utilities for {@link org.apache.flink.table.types.DataType}. */ @@ -58,4 +61,12 @@ public static boolean isDateType(DataType type) { public static boolean isDatetimeType(DataType type) { return isTimestampType(type) || isDateType(type); } + + /** + * Projects the row fields with the given names. 
+ */ + public static RowType.RowField[] projectRowFields(RowType rowType, String[] names) { + int[] fieldIndices = Arrays.stream(names).mapToInt(rowType::getFieldIndex).toArray(); + return Arrays.stream(fieldIndices).mapToObj(i -> rowType.getFields().get(i)).toArray(RowType.RowField[]::new); + } } diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/ExpressionUtils.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/ExpressionUtils.java new file mode 100644 index 0000000000000..20473acdcda6b --- /dev/null +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/ExpressionUtils.java @@ -0,0 +1,180 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.util; + +import org.apache.flink.table.expressions.CallExpression; +import org.apache.flink.table.expressions.Expression; +import org.apache.flink.table.expressions.FieldReferenceExpression; +import org.apache.flink.table.expressions.ResolvedExpression; +import org.apache.flink.table.expressions.ValueLiteralExpression; +import org.apache.flink.table.functions.BuiltInFunctionDefinitions; +import org.apache.flink.table.functions.FunctionDefinition; +import org.apache.flink.table.types.logical.LogicalType; + +import javax.annotation.Nullable; + +import java.math.BigDecimal; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.LocalTime; +import java.time.ZoneOffset; +import java.time.temporal.ChronoField; +import java.util.Arrays; +import java.util.List; + +/** + * Utilities for expression resolving. + */ +public class ExpressionUtils { + + /** + * Collects the referenced columns from the given expressions; + * only simple call expressions are supported. + */ + public static String[] referencedColumns(List exprs) { + return exprs.stream() + .map(ExpressionUtils::getReferencedColumns) + .filter(columns -> columns.length > 0) + .flatMap(Arrays::stream) + .distinct() // deduplication + .toArray(String[]::new); + } + + /** + * Returns whether the given expression is a simple call expression: + * a binary call with one operand being a field reference and the other operand + * being a literal. 
+ */ + public static boolean isSimpleCallExpression(Expression expr) { + if (!(expr instanceof CallExpression)) { + return false; + } + CallExpression callExpression = (CallExpression) expr; + FunctionDefinition funcDef = callExpression.getFunctionDefinition(); + // simple call list: + // NOT AND OR IN EQUALS NOT_EQUALS IS_NULL IS_NOT_NULL LESS_THAN GREATER_THAN + // LESS_THAN_OR_EQUAL GREATER_THAN_OR_EQUAL + + if (funcDef == BuiltInFunctionDefinitions.NOT + || funcDef == BuiltInFunctionDefinitions.AND + || funcDef == BuiltInFunctionDefinitions.OR) { + return callExpression.getChildren().stream() + .allMatch(ExpressionUtils::isSimpleCallExpression); + } + if (!(funcDef == BuiltInFunctionDefinitions.IN + || funcDef == BuiltInFunctionDefinitions.EQUALS + || funcDef == BuiltInFunctionDefinitions.NOT_EQUALS + || funcDef == BuiltInFunctionDefinitions.IS_NULL + || funcDef == BuiltInFunctionDefinitions.IS_NOT_NULL + || funcDef == BuiltInFunctionDefinitions.LESS_THAN + || funcDef == BuiltInFunctionDefinitions.GREATER_THAN + || funcDef == BuiltInFunctionDefinitions.LESS_THAN_OR_EQUAL + || funcDef == BuiltInFunctionDefinitions.GREATER_THAN_OR_EQUAL)) { + return false; + } + // handle IN + if (funcDef == BuiltInFunctionDefinitions.IN) { + // In expression RHS operands are always literals + return true; + } + // handle unary operator + if (funcDef == BuiltInFunctionDefinitions.IS_NULL + || funcDef == BuiltInFunctionDefinitions.IS_NOT_NULL) { + return callExpression.getChildren().stream() + .allMatch(e -> e instanceof FieldReferenceExpression); + } + // handle binary operator + return isFieldReferenceAndLiteral(callExpression.getChildren()); + } + + private static boolean isFieldReferenceAndLiteral(List exprs) { + if (exprs.size() != 2) { + return false; + } + final Expression expr0 = exprs.get(0); + final Expression expr1 = exprs.get(1); + return expr0 instanceof FieldReferenceExpression && expr1 instanceof ValueLiteralExpression + || expr0 instanceof ValueLiteralExpression && expr1 instanceof FieldReferenceExpression; + } + + private static String[] getReferencedColumns(ResolvedExpression expression) { + CallExpression callExpr = (CallExpression) expression; + FunctionDefinition funcDef = callExpr.getFunctionDefinition(); + if (funcDef == BuiltInFunctionDefinitions.NOT + || funcDef == BuiltInFunctionDefinitions.AND + || funcDef == BuiltInFunctionDefinitions.OR) { + return callExpr.getChildren().stream() + .map(e -> getReferencedColumns((ResolvedExpression) e)) + .flatMap(Arrays::stream) + .toArray(String[]::new); + } + + return expression.getChildren().stream() + .filter(expr -> expr instanceof FieldReferenceExpression) + .map(expr -> ((FieldReferenceExpression) expr).getName()) + .toArray(String[]::new); + } + + /** + * Returns the value with given value literal expression. + * + *

Returns null if the value cannot be parsed as the output data type correctly; + * call {@code ValueLiteralExpression.isNull} first to decide whether + * the literal is NULL. + */ + @Nullable + public static Object getValueFromLiteral(ValueLiteralExpression expr) { + LogicalType logicalType = expr.getOutputDataType().getLogicalType(); + switch (logicalType.getTypeRoot()) { + case TIMESTAMP_WITHOUT_TIME_ZONE: + return expr.getValueAs(LocalDateTime.class) + .map(ldt -> ldt.toInstant(ZoneOffset.UTC).toEpochMilli()) + .orElse(null); + case TIME_WITHOUT_TIME_ZONE: + return expr.getValueAs(LocalTime.class) + .map(lt -> lt.get(ChronoField.MILLI_OF_DAY)) + .orElse(null); + case DATE: + return expr.getValueAs(LocalDate.class) + .map(LocalDate::toEpochDay) + .orElse(null); + // NOTE: All integral types of size less than Int are encoded as Ints in MT + case BOOLEAN: + return expr.getValueAs(Boolean.class).orElse(null); + case TINYINT: + case SMALLINT: + case INTEGER: + return expr.getValueAs(Integer.class).orElse(null); + case FLOAT: + return expr.getValueAs(Float.class).orElse(null); + case DOUBLE: + return expr.getValueAs(Double.class).orElse(null); + case BINARY: + case VARBINARY: + return expr.getValueAs(byte[].class).orElse(null); + case CHAR: + case VARCHAR: + return expr.getValueAs(String.class).orElse(null); + case DECIMAL: + return expr.getValueAs(BigDecimal.class).orElse(null); + default: + throw new UnsupportedOperationException("Unsupported type: " + logicalType); + } + } +} diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/RowDataProjection.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/RowDataProjection.java index 048f286fa0eca..8076d982b9919 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/RowDataProjection.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/RowDataProjection.java @@ -26,6 +26,8 @@ import org.apache.flink.table.types.logical.RowType; import java.io.Serializable; +import java.util.Arrays; +import java.util.List; /** * Utilities to project the row data with given positions. 
@@ -51,6 +53,12 @@ public static RowDataProjection instance(RowType rowType, int[] positions) { return new RowDataProjection(types, positions); } + public static RowDataProjection instanceV2(RowType rowType, int[] positions) { + List fieldTypes = rowType.getChildren(); + final LogicalType[] types = Arrays.stream(positions).mapToObj(fieldTypes::get).toArray(LogicalType[]::new); + return new RowDataProjection(types, positions); + } + public static RowDataProjection instance(LogicalType[] types, int[] positions) { return new RowDataProjection(types, positions); } diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/ViewStorageProperties.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/ViewStorageProperties.java index 91662e47077c7..a5e9f31145618 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/ViewStorageProperties.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/ViewStorageProperties.java @@ -18,12 +18,12 @@ package org.apache.hudi.util; -import org.apache.flink.configuration.Configuration; import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.table.view.FileSystemViewStorageConfig; import org.apache.hudi.configuration.HadoopConfigurations; import org.apache.hudi.exception.HoodieIOException; +import org.apache.flink.configuration.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestHiveSyncContext.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestHiveSyncContext.java index 7bfaade59ea26..5226f55bffe9e 100644 --- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestHiveSyncContext.java +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestHiveSyncContext.java @@ -18,11 +18,10 @@ package org.apache.hudi.sink.utils; -import org.apache.flink.configuration.Configuration; - import org.apache.hudi.configuration.FlinkOptions; import org.apache.hudi.hive.HiveSyncConfig; +import org.apache.flink.configuration.Configuration; import org.junit.jupiter.api.Test; import java.lang.reflect.Method; diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestFileIndex.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestFileIndex.java index 334df5961314d..00bf17cd13951 100644 --- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestFileIndex.java +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestFileIndex.java @@ -58,7 +58,7 @@ void testFileListingUsingMetadata(boolean hiveStylePartitioning) throws Exceptio conf.setBoolean(FlinkOptions.METADATA_ENABLED, true); conf.setBoolean(FlinkOptions.HIVE_STYLE_PARTITIONING, hiveStylePartitioning); TestData.writeData(TestData.DATA_SET_INSERT, conf); - FileIndex fileIndex = FileIndex.instance(new Path(tempFile.getAbsolutePath()), conf); + FileIndex fileIndex = FileIndex.instance(new Path(tempFile.getAbsolutePath()), conf, TestConfigurations.ROW_TYPE); List partitionKeys = Collections.singletonList("partition"); List> partitions = fileIndex.getPartitions(partitionKeys, "default", hiveStylePartitioning); assertTrue(partitions.stream().allMatch(m -> m.size() == 1)); @@ -79,7 +79,7 @@ void testFileListingUsingMetadataNonPartitionedTable() throws Exception { 
conf.setString(FlinkOptions.KEYGEN_CLASS_NAME, NonpartitionedAvroKeyGenerator.class.getName()); conf.setBoolean(FlinkOptions.METADATA_ENABLED, true); TestData.writeData(TestData.DATA_SET_INSERT, conf); - FileIndex fileIndex = FileIndex.instance(new Path(tempFile.getAbsolutePath()), conf); + FileIndex fileIndex = FileIndex.instance(new Path(tempFile.getAbsolutePath()), conf, TestConfigurations.ROW_TYPE); List partitionKeys = Collections.singletonList(""); List> partitions = fileIndex.getPartitions(partitionKeys, "default", false); assertThat(partitions.size(), is(0)); @@ -94,7 +94,7 @@ void testFileListingUsingMetadataNonPartitionedTable() throws Exception { void testFileListingEmptyTable(boolean enableMetadata) { Configuration conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath()); conf.setBoolean(FlinkOptions.METADATA_ENABLED, enableMetadata); - FileIndex fileIndex = FileIndex.instance(new Path(tempFile.getAbsolutePath()), conf); + FileIndex fileIndex = FileIndex.instance(new Path(tempFile.getAbsolutePath()), conf, TestConfigurations.ROW_TYPE); List partitionKeys = Collections.singletonList("partition"); List> partitions = fileIndex.getPartitions(partitionKeys, "default", false); assertThat(partitions.size(), is(0)); diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/stats/TestColumnStatsIndices.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/stats/TestColumnStatsIndices.java new file mode 100644 index 0000000000000..837f419248636 --- /dev/null +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/stats/TestColumnStatsIndices.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.source.stats; + +import org.apache.hudi.common.config.HoodieMetadataConfig; +import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.configuration.FlinkOptions; +import org.apache.hudi.utils.TestConfigurations; +import org.apache.hudi.utils.TestData; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.table.data.GenericRowData; +import org.apache.flink.table.data.RowData; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +import java.io.File; +import java.util.Arrays; +import java.util.Comparator; +import java.util.List; +import java.util.stream.Collectors; + +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.jupiter.api.Assertions.assertThrows; + +/** + * Test cases for {@link ColumnStatsIndices}. 
+ */ +public class TestColumnStatsIndices { + @TempDir + File tempFile; + + @Test + void testTransposeColumnStatsIndex() throws Exception { + final String path = tempFile.getAbsolutePath(); + Configuration conf = TestConfigurations.getDefaultConf(path); + conf.setBoolean(FlinkOptions.METADATA_ENABLED, true); + conf.setBoolean(FlinkOptions.READ_DATA_SKIPPING_ENABLED, true); + conf.setString("hoodie.metadata.index.column.stats.enable", "true"); + + HoodieMetadataConfig metadataConfig = HoodieMetadataConfig.newBuilder() + .enable(true) + .withMetadataIndexColumnStats(true) + .build(); + TestData.writeData(TestData.DATA_SET_INSERT, conf); + + // explicit query columns + String[] queryColumns1 = {"uuid", "age"}; + List indexRows1 = ColumnStatsIndices.readColumnStatsIndex(path, metadataConfig, queryColumns1); + Pair, String[]> transposedIndexTable1 = ColumnStatsIndices + .transposeColumnStatsIndex(indexRows1, queryColumns1, TestConfigurations.ROW_TYPE); + assertThat("The schema columns should sort by natural order", + Arrays.toString(transposedIndexTable1.getRight()), is("[age, uuid]")); + List transposed1 = filterOutFileNames(transposedIndexTable1.getLeft()); + assertThat(transposed1.size(), is(4)); + final String expected = "[" + + "+I(2,18,20,0,id5,id6,0), " + + "+I(2,23,33,0,id1,id2,0), " + + "+I(2,31,53,0,id3,id4,0), " + + "+I(2,44,56,0,id7,id8,0)]"; + assertThat(transposed1.toString(), is(expected)); + + // no query columns, only for tests + assertThrows(IllegalArgumentException.class, + () -> ColumnStatsIndices.readColumnStatsIndex(path, metadataConfig, new String[0])); + } + + private static List filterOutFileNames(List indexRows) { + return indexRows.stream().map(row -> { + GenericRowData gr = (GenericRowData) row; + GenericRowData converted = new GenericRowData(gr.getArity() - 1); + for (int i = 1; i < gr.getArity(); i++) { + converted.setField(i - 1, gr.getField(i)); + } + return converted; + }) + // sort by age min values + .sorted(Comparator.comparingInt(r -> r.getInt(1))) + .collect(Collectors.toList()); + } +} diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/stats/TestExpressionEvaluator.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/stats/TestExpressionEvaluator.java new file mode 100644 index 0000000000000..0c7012e2ee475 --- /dev/null +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/stats/TestExpressionEvaluator.java @@ -0,0 +1,374 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.source.stats; + +import org.apache.hudi.utils.TestData; + +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.StringData; +import org.apache.flink.table.data.TimestampData; +import org.apache.flink.table.expressions.FieldReferenceExpression; +import org.apache.flink.table.expressions.ValueLiteralExpression; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.logical.RowType; +import org.junit.jupiter.api.Test; + +import java.util.Arrays; +import java.util.List; + +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + +/** + * Test cases for {@link ExpressionEvaluator}. + */ +public class TestExpressionEvaluator { + private static final DataType ROW_DATA_TYPE = DataTypes.ROW( + DataTypes.FIELD("f_tinyint", DataTypes.TINYINT()), + DataTypes.FIELD("f_smallint", DataTypes.SMALLINT()), + DataTypes.FIELD("f_int", DataTypes.INT()), + DataTypes.FIELD("f_long", DataTypes.BIGINT()), + DataTypes.FIELD("f_float", DataTypes.FLOAT()), + DataTypes.FIELD("f_double", DataTypes.DOUBLE()), + DataTypes.FIELD("f_boolean", DataTypes.BOOLEAN()), + DataTypes.FIELD("f_decimal", DataTypes.DECIMAL(10, 2)), + DataTypes.FIELD("f_bytes", DataTypes.VARBINARY(10)), + DataTypes.FIELD("f_string", DataTypes.VARCHAR(10)), + DataTypes.FIELD("f_time", DataTypes.TIME(3)), + DataTypes.FIELD("f_date", DataTypes.DATE()), + DataTypes.FIELD("f_timestamp", DataTypes.TIMESTAMP(3)) + ).notNull(); + private static final DataType INDEX_ROW_DATA_TYPE = DataTypes.ROW( + DataTypes.FIELD("file_name", DataTypes.STRING()), + DataTypes.FIELD("value_cnt", DataTypes.BIGINT()), + DataTypes.FIELD("f_int_min", DataTypes.INT()), + DataTypes.FIELD("f_int_max", DataTypes.INT()), + DataTypes.FIELD("f_int_null_cnt", DataTypes.BIGINT()), + DataTypes.FIELD("f_string_min", DataTypes.VARCHAR(10)), + DataTypes.FIELD("f_string_max", DataTypes.VARCHAR(10)), + DataTypes.FIELD("f_string_null_cnt", DataTypes.BIGINT()), + DataTypes.FIELD("f_timestamp_min", DataTypes.TIMESTAMP(3)), + DataTypes.FIELD("f_timestamp_max", DataTypes.TIMESTAMP(3)), + DataTypes.FIELD("f_timestamp_null_cnt", DataTypes.BIGINT()) + ).notNull(); + + private static final RowType INDEX_ROW_TYPE = (RowType) INDEX_ROW_DATA_TYPE.getLogicalType(); + + @Test + void testEqualTo() { + ExpressionEvaluator.EqualTo equalTo = ExpressionEvaluator.EqualTo.getInstance(); + FieldReferenceExpression rExpr = new FieldReferenceExpression("f_int", DataTypes.INT(), 2, 2); + ValueLiteralExpression vExpr = new ValueLiteralExpression(12); + + RowData indexRow1 = intIndexRow(11, 13); + equalTo.bindFieldReference(rExpr) + .bindVal(vExpr) + .bindColStats(indexRow1, queryFields(2), rExpr); + assertTrue(equalTo.eval(), "11 < 12 < 13"); + + RowData indexRow2 = intIndexRow(12, 13); + equalTo.bindColStats(indexRow2, queryFields(2), rExpr); + assertTrue(equalTo.eval(), "12 <= 12 < 13"); + + RowData indexRow3 = intIndexRow(11, 12); + equalTo.bindColStats(indexRow3, queryFields(2), rExpr); + assertTrue(equalTo.eval(), "11 < 12 <= 12"); + + RowData indexRow4 = intIndexRow(10, 11); + equalTo.bindColStats(indexRow4, queryFields(2), rExpr); + assertFalse(equalTo.eval(), "11 < 12"); + + RowData indexRow5 = intIndexRow(13, 14); + equalTo.bindColStats(indexRow5, queryFields(2), rExpr); + assertFalse(equalTo.eval(), "12 < 13"); + + RowData indexRow6 = intIndexRow(null, null); + equalTo.bindColStats(indexRow6, queryFields(2), rExpr); 
+ assertFalse(equalTo.eval(), "12 <> null"); + + equalTo.bindVal(new ValueLiteralExpression(null, DataTypes.INT())); + assertFalse(equalTo.eval(), "null <> null"); + } + + @Test + void testNotEqualTo() { + ExpressionEvaluator.NotEqualTo notEqualTo = ExpressionEvaluator.NotEqualTo.getInstance(); + FieldReferenceExpression rExpr = new FieldReferenceExpression("f_int", DataTypes.INT(), 2, 2); + ValueLiteralExpression vExpr = new ValueLiteralExpression(12); + + RowData indexRow1 = intIndexRow(11, 13); + notEqualTo.bindFieldReference(rExpr) + .bindVal(vExpr) + .bindColStats(indexRow1, queryFields(2), rExpr); + assertTrue(notEqualTo.eval(), "11 <> 12 && 12 <> 13"); + + RowData indexRow2 = intIndexRow(12, 13); + notEqualTo.bindColStats(indexRow2, queryFields(2), rExpr); + assertTrue(notEqualTo.eval(), "12 <> 13"); + + RowData indexRow3 = intIndexRow(11, 12); + notEqualTo.bindColStats(indexRow3, queryFields(2), rExpr); + assertTrue(notEqualTo.eval(), "11 <> 12"); + + RowData indexRow4 = intIndexRow(10, 11); + notEqualTo.bindColStats(indexRow4, queryFields(2), rExpr); + assertTrue(notEqualTo.eval(), "10 <> 12 and 11 < 12"); + + RowData indexRow5 = intIndexRow(13, 14); + notEqualTo.bindColStats(indexRow5, queryFields(2), rExpr); + assertTrue(notEqualTo.eval(), "12 <> 13 and 12 <> 14"); + + RowData indexRow6 = intIndexRow(null, null); + notEqualTo.bindColStats(indexRow6, queryFields(2), rExpr); + assertTrue(notEqualTo.eval(), "12 <> null"); + + notEqualTo.bindVal(new ValueLiteralExpression(null, DataTypes.INT())); + assertTrue(notEqualTo.eval(), "null <> null"); + } + + @Test + void testIsNull() { + ExpressionEvaluator.IsNull isNull = ExpressionEvaluator.IsNull.getInstance(); + FieldReferenceExpression rExpr = new FieldReferenceExpression("f_int", DataTypes.INT(), 2, 2); + + RowData indexRow1 = intIndexRow(11, 13); + isNull.bindFieldReference(rExpr) + .bindColStats(indexRow1, queryFields(2), rExpr); + assertTrue(isNull.eval(), "2 nulls"); + + RowData indexRow2 = intIndexRow(12, 13, 0L); + isNull.bindColStats(indexRow2, queryFields(2), rExpr); + assertFalse(isNull.eval(), "0 nulls"); + } + + @Test + void testIsNotNull() { + ExpressionEvaluator.IsNotNull isNotNull = ExpressionEvaluator.IsNotNull.getInstance(); + FieldReferenceExpression rExpr = new FieldReferenceExpression("f_int", DataTypes.INT(), 2, 2); + + RowData indexRow1 = intIndexRow(11, 13); + isNotNull.bindFieldReference(rExpr) + .bindColStats(indexRow1, queryFields(2), rExpr); + assertTrue(isNotNull.eval(), "min 11 is not null"); + + RowData indexRow2 = intIndexRow(null, null, 0L); + isNotNull.bindColStats(indexRow2, queryFields(2), rExpr); + assertTrue(isNotNull.eval(), "min is null and 0 nulls"); + } + + @Test + void testLessThan() { + ExpressionEvaluator.LessThan lessThan = ExpressionEvaluator.LessThan.getInstance(); + FieldReferenceExpression rExpr = new FieldReferenceExpression("f_int", DataTypes.INT(), 2, 2); + ValueLiteralExpression vExpr = new ValueLiteralExpression(12); + + RowData indexRow1 = intIndexRow(11, 13); + lessThan.bindFieldReference(rExpr) + .bindVal(vExpr) + .bindColStats(indexRow1, queryFields(2), rExpr); + assertTrue(lessThan.eval(), "12 < 13"); + + RowData indexRow2 = intIndexRow(12, 13); + lessThan.bindColStats(indexRow2, queryFields(2), rExpr); + assertFalse(lessThan.eval(), "min 12 = 12"); + + RowData indexRow3 = intIndexRow(11, 12); + lessThan.bindColStats(indexRow3, queryFields(2), rExpr); + assertTrue(lessThan.eval(), "11 < 12"); + + RowData indexRow4 = intIndexRow(10, 11); + lessThan.bindColStats(indexRow4, 
queryFields(2), rExpr); + assertTrue(lessThan.eval(), "11 < 12"); + + RowData indexRow5 = intIndexRow(13, 14); + lessThan.bindColStats(indexRow5, queryFields(2), rExpr); + assertFalse(lessThan.eval(), "12 < min 13"); + + RowData indexRow6 = intIndexRow(null, null); + lessThan.bindColStats(indexRow6, queryFields(2), rExpr); + assertFalse(lessThan.eval(), "12 <> null"); + + lessThan.bindVal(new ValueLiteralExpression(null, DataTypes.INT())); + assertFalse(lessThan.eval(), "null <> null"); + } + + @Test + void testGreaterThan() { + ExpressionEvaluator.GreaterThan greaterThan = ExpressionEvaluator.GreaterThan.getInstance(); + FieldReferenceExpression rExpr = new FieldReferenceExpression("f_int", DataTypes.INT(), 2, 2); + ValueLiteralExpression vExpr = new ValueLiteralExpression(12); + + RowData indexRow1 = intIndexRow(11, 13); + greaterThan.bindFieldReference(rExpr) + .bindVal(vExpr) + .bindColStats(indexRow1, queryFields(2), rExpr); + assertTrue(greaterThan.eval(), "12 < 13"); + + RowData indexRow2 = intIndexRow(12, 13); + greaterThan.bindColStats(indexRow2, queryFields(2), rExpr); + assertTrue(greaterThan.eval(), "12 < 13"); + + RowData indexRow3 = intIndexRow(11, 12); + greaterThan.bindColStats(indexRow3, queryFields(2), rExpr); + assertFalse(greaterThan.eval(), "max 12 = 12"); + + RowData indexRow4 = intIndexRow(10, 11); + greaterThan.bindColStats(indexRow4, queryFields(2), rExpr); + assertFalse(greaterThan.eval(), "max 11 < 12"); + + RowData indexRow5 = intIndexRow(13, 14); + greaterThan.bindColStats(indexRow5, queryFields(2), rExpr); + assertTrue(greaterThan.eval(), "12 < 13"); + + RowData indexRow6 = intIndexRow(null, null); + greaterThan.bindColStats(indexRow6, queryFields(2), rExpr); + assertFalse(greaterThan.eval(), "12 <> null"); + + greaterThan.bindVal(new ValueLiteralExpression(null, DataTypes.INT())); + assertFalse(greaterThan.eval(), "null <> null"); + } + + @Test + void testLessThanOrEqual() { + ExpressionEvaluator.LessThanOrEqual lessThanOrEqual = ExpressionEvaluator.LessThanOrEqual.getInstance(); + FieldReferenceExpression rExpr = new FieldReferenceExpression("f_int", DataTypes.INT(), 2, 2); + ValueLiteralExpression vExpr = new ValueLiteralExpression(12); + + RowData indexRow1 = intIndexRow(11, 13); + lessThanOrEqual.bindFieldReference(rExpr) + .bindVal(vExpr) + .bindColStats(indexRow1, queryFields(2), rExpr); + assertTrue(lessThanOrEqual.eval(), "11 < 12"); + + RowData indexRow2 = intIndexRow(12, 13); + lessThanOrEqual.bindColStats(indexRow2, queryFields(2), rExpr); + assertTrue(lessThanOrEqual.eval(), "min 12 = 12"); + + RowData indexRow3 = intIndexRow(11, 12); + lessThanOrEqual.bindColStats(indexRow3, queryFields(2), rExpr); + assertTrue(lessThanOrEqual.eval(), "max 12 = 12"); + + RowData indexRow4 = intIndexRow(10, 11); + lessThanOrEqual.bindColStats(indexRow4, queryFields(2), rExpr); + assertTrue(lessThanOrEqual.eval(), "max 11 < 12"); + + RowData indexRow5 = intIndexRow(13, 14); + lessThanOrEqual.bindColStats(indexRow5, queryFields(2), rExpr); + assertFalse(lessThanOrEqual.eval(), "12 < 13"); + + RowData indexRow6 = intIndexRow(null, null); + lessThanOrEqual.bindColStats(indexRow6, queryFields(2), rExpr); + assertFalse(lessThanOrEqual.eval(), "12 <> null"); + + lessThanOrEqual.bindVal(new ValueLiteralExpression(null, DataTypes.INT())); + assertFalse(lessThanOrEqual.eval(), "null <> null"); + } + + @Test + void testGreaterThanOrEqual() { + ExpressionEvaluator.GreaterThanOrEqual greaterThanOrEqual = ExpressionEvaluator.GreaterThanOrEqual.getInstance(); + 
FieldReferenceExpression rExpr = new FieldReferenceExpression("f_int", DataTypes.INT(), 2, 2); + ValueLiteralExpression vExpr = new ValueLiteralExpression(12); + + RowData indexRow1 = intIndexRow(11, 13); + greaterThanOrEqual.bindFieldReference(rExpr) + .bindVal(vExpr) + .bindColStats(indexRow1, queryFields(2), rExpr); + assertTrue(greaterThanOrEqual.eval(), "12 < 13"); + + RowData indexRow2 = intIndexRow(12, 13); + greaterThanOrEqual.bindColStats(indexRow2, queryFields(2), rExpr); + assertTrue(greaterThanOrEqual.eval(), "min 12 = 12"); + + RowData indexRow3 = intIndexRow(11, 12); + greaterThanOrEqual.bindColStats(indexRow3, queryFields(2), rExpr); + assertTrue(greaterThanOrEqual.eval(), "max 12 = 12"); + + RowData indexRow4 = intIndexRow(10, 11); + greaterThanOrEqual.bindColStats(indexRow4, queryFields(2), rExpr); + assertFalse(greaterThanOrEqual.eval(), "max 11 < 12"); + + RowData indexRow5 = intIndexRow(13, 14); + greaterThanOrEqual.bindColStats(indexRow5, queryFields(2), rExpr); + assertTrue(greaterThanOrEqual.eval(), "12 < 13"); + + RowData indexRow6 = intIndexRow(null, null); + greaterThanOrEqual.bindColStats(indexRow6, queryFields(2), rExpr); + assertFalse(greaterThanOrEqual.eval(), "12 <> null"); + + greaterThanOrEqual.bindVal(new ValueLiteralExpression(null, DataTypes.INT())); + assertFalse(greaterThanOrEqual.eval(), "null <> null"); + } + + @Test + void testIn() { + ExpressionEvaluator.In in = ExpressionEvaluator.In.getInstance(); + FieldReferenceExpression rExpr = new FieldReferenceExpression("f_int", DataTypes.INT(), 2, 2); + + RowData indexRow1 = intIndexRow(11, 13); + in.bindFieldReference(rExpr) + .bindColStats(indexRow1, queryFields(2), rExpr); + in.bindVals(12); + assertTrue(in.eval(), "11 < 12 < 13"); + + RowData indexRow2 = intIndexRow(12, 13); + in.bindColStats(indexRow2, queryFields(2), rExpr); + assertTrue(in.eval(), "min 12 = 12"); + + RowData indexRow3 = intIndexRow(11, 12); + in.bindColStats(indexRow3, queryFields(2), rExpr); + assertTrue(in.eval(), "max 12 = 12"); + + RowData indexRow4 = intIndexRow(10, 11); + in.bindColStats(indexRow4, queryFields(2), rExpr); + assertFalse(in.eval(), "max 11 < 12"); + + RowData indexRow5 = intIndexRow(13, 14); + in.bindColStats(indexRow5, queryFields(2), rExpr); + assertFalse(in.eval(), "12 < 13"); + + RowData indexRow6 = intIndexRow(null, null); + in.bindColStats(indexRow6, queryFields(2), rExpr); + assertFalse(in.eval(), "12 <> null"); + + in.bindVals((Object) null); + assertFalse(in.eval(), "null <> null"); + } + + private static RowData intIndexRow(Integer minVal, Integer maxVal) { + return intIndexRow(minVal, maxVal, 2L); + } + + private static RowData intIndexRow(Integer minVal, Integer maxVal, Long nullCnt) { + return indexRow(StringData.fromString("f1"), 100L, + minVal, maxVal, nullCnt, + StringData.fromString("1"), StringData.fromString("100"), 5L, + TimestampData.fromEpochMillis(1), TimestampData.fromEpochMillis(100), 3L); + } + + private static RowData indexRow(Object... fields) { + return TestData.insertRow(INDEX_ROW_TYPE, fields); + } + + private static RowType.RowField[] queryFields(int... 
pos) { + List fields = ((RowType) ROW_DATA_TYPE.getLogicalType()).getFields(); + return Arrays.stream(pos).mapToObj(fields::get).toArray(RowType.RowField[]::new); + } +} diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java index 0c423df6b7bdb..c1a24a270128d 100644 --- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java @@ -1256,6 +1256,37 @@ void testBuiltinFunctionWithCatalog(String operation) { assertRowsEquals(partitionResult, "[+I[1, 2022-02-02]]"); } + @Test + void testWriteAndReadWithDataSkipping() { + TableEnvironment tableEnv = batchTableEnv; + String hoodieTableDDL = sql("t1") + .option(FlinkOptions.PATH, tempFile.getAbsolutePath()) + .option(FlinkOptions.METADATA_ENABLED, true) + .option("hoodie.metadata.index.column.stats.enable", true) + .option(FlinkOptions.READ_DATA_SKIPPING_ENABLED, true) + .end(); + tableEnv.executeSql(hoodieTableDDL); + + execInsertSql(tableEnv, TestSQL.INSERT_T1); + + List result1 = CollectionUtil.iterableToList( + () -> tableEnv.sqlQuery("select * from t1").execute().collect()); + assertRowsEquals(result1, TestData.DATA_SET_SOURCE_INSERT); + // apply filters + List result2 = CollectionUtil.iterableToList( + () -> tableEnv.sqlQuery("select * from t1 where uuid > 'id5' and age > 20").execute().collect()); + assertRowsEquals(result2, "[" + + "+I[id7, Bob, 44, 1970-01-01T00:00:07, par4], " + + "+I[id8, Han, 56, 1970-01-01T00:00:08, par4]]"); + // filter by timestamp + List result3 = CollectionUtil.iterableToList( + () -> tableEnv.sqlQuery("select * from t1 where ts > TIMESTAMP '1970-01-01 00:00:05'").execute().collect()); + assertRowsEquals(result3, "[" + + "+I[id6, Emma, 20, 1970-01-01T00:00:06, par3], " + + "+I[id7, Bob, 44, 1970-01-01T00:00:07, par4], " + + "+I[id8, Han, 56, 1970-01-01T00:00:08, par4]]"); + } + // ------------------------------------------------------------------------- // Utilities // ------------------------------------------------------------------------- diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableSource.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableSource.java index 8ee18a9601b2f..924ce929b0307 100644 --- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableSource.java +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableSource.java @@ -28,6 +28,7 @@ import org.apache.flink.api.common.io.InputFormat; import org.apache.flink.configuration.Configuration; import org.apache.flink.table.data.RowData; +import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Path; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.function.ThrowingSupplier; @@ -38,7 +39,6 @@ import java.io.File; import java.util.Arrays; import java.util.Collections; -import java.util.Comparator; import java.util.HashMap; import java.util.Map; import java.util.stream.Collectors; @@ -76,22 +76,18 @@ void testGetReadPaths() throws Exception { Arrays.asList(conf.getString(FlinkOptions.PARTITION_PATH_FIELD).split(",")), "default-par", conf); - Path[] paths = tableSource.getReadPaths(); - assertNotNull(paths); - String[] names = Arrays.stream(paths).map(Path::getName) - 
.sorted(Comparator.naturalOrder()).toArray(String[]::new); - assertThat(Arrays.toString(names), is("[par1, par2, par3, par4]")); + FileStatus[] fileStatuses = tableSource.getReadFiles(); + assertNotNull(fileStatuses); + assertThat(fileStatuses.length, is(4)); // apply partition pruning Map partitions = new HashMap<>(); partitions.put("partition", "par1"); tableSource.applyPartitions(Collections.singletonList(partitions)); - Path[] paths2 = tableSource.getReadPaths(); - assertNotNull(paths2); - String[] names2 = Arrays.stream(paths2).map(Path::getName) - .sorted(Comparator.naturalOrder()).toArray(String[]::new); - assertThat(Arrays.toString(names2), is("[par1]")); + FileStatus[] fileStatuses2 = tableSource.getReadFiles(); + assertNotNull(fileStatuses2); + assertThat(fileStatuses2.length, is(1)); } @Test diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestAvroSchemaConverter.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestAvroSchemaConverter.java new file mode 100644 index 0000000000000..e56541ed57cb3 --- /dev/null +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestAvroSchemaConverter.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.utils; + +import org.apache.hudi.avro.model.HoodieMetadataRecord; +import org.apache.hudi.metadata.HoodieMetadataPayload; +import org.apache.hudi.util.AvroSchemaConverter; + +import org.apache.avro.Schema; +import org.apache.flink.table.types.DataType; +import org.junit.jupiter.api.Test; + +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.MatcherAssert.assertThat; + +/** + * Test cases for {@link org.apache.hudi.util.AvroSchemaConverter}. + */ +public class TestAvroSchemaConverter { + @Test + void testUnionSchemaWithMultipleRecordTypes() { + Schema schema = HoodieMetadataRecord.SCHEMA$; + DataType dataType = AvroSchemaConverter.convertToDataType(schema); + int pos = HoodieMetadataRecord.SCHEMA$.getField(HoodieMetadataPayload.SCHEMA_FIELD_ID_COLUMN_STATS).pos(); + final String expected = "ROW<" + + "`fileName` STRING, " + + "`columnName` STRING, " + + "`minValue` ROW<`wrapper` RAW('java.lang.Object', ?) NOT NULL>, " + + "`maxValue` ROW<`wrapper` RAW('java.lang.Object', ?) 
NOT NULL>, " + + "`valueCount` BIGINT, " + + "`nullCount` BIGINT, " + + "`totalSize` BIGINT, " + + "`totalUncompressedSize` BIGINT, " + + "`isDeleted` BOOLEAN NOT NULL>"; + assertThat(dataType.getChildren().get(pos).toString(), is(expected)); + } +} diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestConfigurations.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestConfigurations.java index f2e8f1ab67a7c..a5b7e368a8856 100644 --- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestConfigurations.java +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestConfigurations.java @@ -318,6 +318,9 @@ public String end() { } } + /** + * Tool to construct the catalog DDL. + */ public static class Catalog { private final String catalogName; private String catalogPath = "."; diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestUtils.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestUtils.java index 2d55b4001c0c8..d20c25866200f 100644 --- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestUtils.java +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestUtils.java @@ -97,6 +97,6 @@ public static String getSplitPartitionPath(MergeOnReadInputSplit split) { public static StreamReadMonitoringFunction getMonitorFunc(Configuration conf) { final String basePath = conf.getString(FlinkOptions.PATH); - return new StreamReadMonitoringFunction(conf, new Path(basePath), 1024 * 1024L, null); + return new StreamReadMonitoringFunction(conf, new Path(basePath), TestConfigurations.ROW_TYPE, 1024 * 1024L, null); } } diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestViewStorageProperties.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestViewStorageProperties.java index 121a1c6785f30..2e3bc7383f553 100644 --- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestViewStorageProperties.java +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestViewStorageProperties.java @@ -18,11 +18,11 @@ package org.apache.hudi.utils; -import org.apache.flink.configuration.Configuration; import org.apache.hudi.common.table.view.FileSystemViewStorageConfig; import org.apache.hudi.common.table.view.FileSystemViewStorageType; import org.apache.hudi.util.ViewStorageProperties; +import org.apache.flink.configuration.Configuration; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir;