Merged
Changes from all commits
37 commits
b3337d7
Revisited Data Skipping utils to accept broader scope of "foldable" e…
Mar 8, 2022
c7a83ea
Expanded scope even further to include any expression not referencing…
Mar 8, 2022
02faeea
Refactor Column Stats Index filter expression translation seq to supp…
Mar 9, 2022
8d4ee26
Added test applying Spark standard functions to source Data Table col…
Mar 9, 2022
a69ddbc
Grouped together logically equivalent expressions
Mar 9, 2022
e1a22bb
Generalize all DS patterns to accept "Single Attribute Expressions" (…
Mar 9, 2022
2ff9d9a
Added composite expression tests
Mar 9, 2022
aa5a54d
Added test for non-literal value expression;
Mar 9, 2022
b388434
Added tests for `like`, `not like` operators
Mar 9, 2022
b32ee08
Tightened up permitted transformation expression to only accept ones …
Mar 9, 2022
3a5c1c9
Rebased allowed transformation matching seq to match permitted transf…
Mar 9, 2022
eea06bf
Extracted Expression utils to `HoodieCatalystExpressionUtils`
Mar 10, 2022
59eef5b
Worked around bug in Spark not allowing to resolve expressions in a s…
Mar 11, 2022
3f67769
Added tests for composite expression (w/ nested function calls)
Mar 11, 2022
7cfc6ce
Added `HoodieSparkTypeUtils`;
Mar 11, 2022
f5d2213
Simplify expression resolution considerably
Mar 11, 2022
f34df69
Fixing incorrect casting
Mar 11, 2022
e7a2291
Tidying up java-docs
Mar 11, 2022
091a357
Fixing compilation
Mar 11, 2022
43d890a
Tidying up
Mar 15, 2022
c4ffcc2
Adding explicit type (Scala 2.11 not able to deduce it)
Mar 15, 2022
b0881ad
Tidying up
Mar 15, 2022
2394573
Scaffolded `HoodieCatalystExpressionUtils` as Spark-specific object;
Mar 15, 2022
26936a5
Bootstrapped Spark2 & Spark3 specific `HoodieCatalystExpressionUtils`
Mar 15, 2022
0c2b88c
Fixing refs
Mar 15, 2022
5572f82
Missing license
Mar 15, 2022
94e96fb
Rebasing refs in `DataSkippingUtils`
Mar 15, 2022
074ac87
Missing imports
Mar 15, 2022
9dd04d4
Fixing refs
Mar 15, 2022
0f0d114
Tidying up
Mar 15, 2022
d72cae3
Inlined `swapAttributeRefInExpr` util
Mar 15, 2022
468d7fc
Branched out `HoodieSpark3_2CatalystExpressionUtils` to support Spark…
Mar 15, 2022
54a72dc
`HoodieSpark3CatalystExpressionUtils` > `HoodieSpark3_1CatalystExpres…
Mar 15, 2022
fe59311
Rebased `Spark3Adapter` to become `BaseSpark3Adapter`;
Mar 15, 2022
b06fe0d
Dangling ref
Mar 15, 2022
d320e14
Fixed `ColumnStatsIndexHelper` handling min/max values from `HoodieCo…
Mar 23, 2022
5bdb3ee
Tidying up
Mar 25, 2022
Expand Up @@ -72,6 +72,7 @@
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

import static org.apache.hudi.common.util.ValidationUtils.checkState;
import static org.apache.hudi.util.DataTypeUtils.areCompatible;

public class ColumnStatsIndexHelper {
Expand Down Expand Up @@ -111,17 +112,17 @@ public static String getNumNullsColumnNameFor(String colName) {
* | another_base_file.parquet | -10 | 0 | 5 |
* +---------------------------+------------+------------+-------------+
* </pre>
*
* <p>
* NOTE: Currently {@link TimestampType} is not supported, since Parquet writer
* does not support statistics for it.
*
* <p>
* TODO leverage metadata table after RFC-27 lands
* @VisibleForTesting
*
* @param sparkSession encompassing Spark session
* @param baseFilesPaths list of base-files paths to be sourced for column-stats index
* @param sparkSession encompassing Spark session
* @param baseFilesPaths list of base-files paths to be sourced for column-stats index
* @param orderedColumnSchemas target ordered columns
* @return Spark's {@link Dataset} holding an index table
* @VisibleForTesting
*/
@Nonnull
public static Dataset<Row> buildColumnStatsTableFor(
Expand Down Expand Up @@ -223,13 +224,13 @@ public static Dataset<Row> buildColumnStatsTableFor(
* <li>Cleans up any residual index tables, that weren't cleaned up before</li>
* </ol>
*
* @param sparkSession encompassing Spark session
* @param sparkSession encompassing Spark session
* @param sourceTableSchema instance of {@link StructType} bearing source table's writer's schema
* @param sourceBaseFiles list of base-files to be indexed
* @param orderedCols target ordered columns
* @param indexFolderPath col-stats index folder path
* @param commitTime current operation commit instant
* @param completedCommits all previously completed commit instants
* @param sourceBaseFiles list of base-files to be indexed
* @param orderedCols target ordered columns
* @param indexFolderPath col-stats index folder path
* @param commitTime current operation commit instant
* @param completedCommits all previously completed commit instants
*/
public static void updateColumnStatsIndexFor(
@Nonnull SparkSession sparkSession,
Expand Down Expand Up @@ -424,57 +425,64 @@ private static String composeZIndexColName(String col, String statName) {
return String.format("%s_%s", col, statName);
}

private static Pair<Object, Object>
fetchMinMaxValues(
@Nonnull DataType colType,
@Nonnull HoodieColumnRangeMetadata<Comparable> colMetadata) {
private static Pair<Object, Object> fetchMinMaxValues(@Nonnull DataType colType,
@Nonnull HoodieColumnRangeMetadata<Comparable> colMetadata) {
Comparable<?> minValue = colMetadata.getMinValue();
Comparable<?> maxValue = colMetadata.getMaxValue();

checkState((minValue == null) == (maxValue == null), "Either both min/max values should be null or neither");

if (minValue == null || maxValue == null) {
return Pair.of(null, null);
}

if (colType instanceof IntegerType) {
return Pair.of(
new Integer(colMetadata.getMinValue().toString()),
new Integer(colMetadata.getMaxValue().toString())
new Integer(minValue.toString()),
new Integer(maxValue.toString())
);
} else if (colType instanceof DoubleType) {
return Pair.of(
new Double(colMetadata.getMinValue().toString()),
new Double(colMetadata.getMaxValue().toString())
new Double(minValue.toString()),
new Double(maxValue.toString())
);
} else if (colType instanceof StringType) {
return Pair.of(
colMetadata.getMinValue().toString(),
colMetadata.getMaxValue().toString());
minValue.toString(),
maxValue.toString());
} else if (colType instanceof DecimalType) {
return Pair.of(
new BigDecimal(colMetadata.getMinValue().toString()),
new BigDecimal(colMetadata.getMaxValue().toString()));
new BigDecimal(minValue.toString()),
new BigDecimal(maxValue.toString()));
} else if (colType instanceof DateType) {
return Pair.of(
java.sql.Date.valueOf(colMetadata.getMinValue().toString()),
java.sql.Date.valueOf(colMetadata.getMaxValue().toString()));
java.sql.Date.valueOf(minValue.toString()),
java.sql.Date.valueOf(maxValue.toString()));
} else if (colType instanceof LongType) {
return Pair.of(
new Long(colMetadata.getMinValue().toString()),
new Long(colMetadata.getMaxValue().toString()));
new Long(minValue.toString()),
new Long(maxValue.toString()));
} else if (colType instanceof ShortType) {
return Pair.of(
new Short(colMetadata.getMinValue().toString()),
new Short(colMetadata.getMaxValue().toString()));
new Short(minValue.toString()),
new Short(maxValue.toString()));
} else if (colType instanceof FloatType) {
return Pair.of(
new Float(colMetadata.getMinValue().toString()),
new Float(colMetadata.getMaxValue().toString()));
new Float(minValue.toString()),
new Float(maxValue.toString()));
} else if (colType instanceof BinaryType) {
return Pair.of(
((ByteBuffer) colMetadata.getMinValue()).array(),
((ByteBuffer) colMetadata.getMaxValue()).array());
((ByteBuffer) minValue).array(),
((ByteBuffer) maxValue).array());
} else if (colType instanceof BooleanType) {
return Pair.of(
Boolean.valueOf(colMetadata.getMinValue().toString()),
Boolean.valueOf(colMetadata.getMaxValue().toString()));
Boolean.valueOf(minValue.toString()),
Boolean.valueOf(maxValue.toString()));
} else if (colType instanceof ByteType) {
return Pair.of(
Byte.valueOf(colMetadata.getMinValue().toString()),
Byte.valueOf(colMetadata.getMaxValue().toString()));
} else {
Byte.valueOf(minValue.toString()),
Byte.valueOf(maxValue.toString()));
} else {
throw new HoodieException(String.format("Not support type: %s", colType));
}
}
Expand Down
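To make the column-stats index layout described in the javadoc above concrete, here is a minimal, self-contained Spark sketch (not Hudi code; the scratch path and the column names are illustrative assumptions) that derives the same per-file min/max/null-count shape:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, input_file_name, max, min, sum, when}

object ColumnStatsBuildSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").getOrCreate()
    import spark.implicits._

    // Write a tiny two-file Parquet dataset so the sketch is self-contained
    val path = "/tmp/col_stats_sketch" // scratch location, illustrative only
    Seq((Option(1), "a"), (Option(5), "b"), (Option.empty[Int], "c"), (Option(9), "d"))
      .toDF("c1", "tag")
      .repartition(2)
      .write.mode("overwrite").parquet(path)

    // One row per base file: min(c1), max(c1) and the null count of c1 --
    // the same shape as the index table shown in the javadoc above
    spark.read.parquet(path)
      .groupBy(input_file_name().as("file"))
      .agg(
        min(col("c1")).as("c1_minValue"),
        max(col("c1")).as("c1_maxValue"),
        sum(when(col("c1").isNull, 1).otherwise(0)).as("c1_num_nulls"))
      .show(truncate = false)

    spark.stop()
  }
}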
Expand Up @@ -27,10 +27,10 @@ import org.apache.spark.sql.hudi.SparkAdapter
trait SparkAdapterSupport {

lazy val sparkAdapter: SparkAdapter = {
val adapterClass = if (HoodieSparkUtils.gteqSpark3_2) {
val adapterClass = if (HoodieSparkUtils.isSpark3_2) {
"org.apache.spark.sql.adapter.Spark3_2Adapter"
} else if (HoodieSparkUtils.isSpark3_0 || HoodieSparkUtils.isSpark3_1) {
"org.apache.spark.sql.adapter.Spark3Adapter"
"org.apache.spark.sql.adapter.Spark3_1Adapter"
} else {
"org.apache.spark.sql.adapter.Spark2Adapter"
}
Expand Down
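As a rough usage sketch (the package paths are assumed; the flag and adapter names are the ones visible in this diff): any caller simply mixes in SparkAdapterSupport, and the lazy val above picks the adapter matching the runtime Spark version.

import org.apache.hudi.{HoodieSparkUtils, SparkAdapterSupport}

// Mixing in the trait is all a caller needs; `sparkAdapter` lazily resolves to
// Spark2Adapter, Spark3_1Adapter or Spark3_2Adapter based on the version flags above
object AdapterSelectionSketch extends SparkAdapterSupport {
  def main(args: Array[String]): Unit = {
    println(s"Running on Spark 3.2: ${HoodieSparkUtils.isSpark3_2}")
    println(s"Selected adapter:     ${sparkAdapter.getClass.getName}")
  }
}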
@@ -0,0 +1,127 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql
Member:
is this code adapted from somewhere? if so, can you please add source attribution

Contributor Author:
Nope, this is our code. Had to place it in spark.sql to access package-private API


import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedFunction}
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression, SubqueryExpression}
import org.apache.spark.sql.catalyst.plans.logical.{Filter, LocalRelation, LogicalPlan}
import org.apache.spark.sql.types.StructType

trait HoodieCatalystExpressionUtils {

/**
* Parses and resolves expression against the attributes of the given table schema.
*
* For example:
* <pre>
* ts > 1000 and ts <= 1500
* </pre>
* will be resolved as
* <pre>
* And(GreaterThan(ts#590L > 1000), LessThanOrEqual(ts#590L <= 1500))
* </pre>
*
* Where <pre>ts</pre> is a column of the provided [[tableSchema]]
*
* @param spark spark session
* @param exprString string representation of the expression to parse and resolve
* @param tableSchema table schema encompassing attributes to resolve against
* @return Resolved filter expression
*/
def resolveExpr(spark: SparkSession, exprString: String, tableSchema: StructType): Expression = {
Contributor Author:
Did not change

val expr = spark.sessionState.sqlParser.parseExpression(exprString)
resolveExpr(spark, expr, tableSchema)
}

/**
* Resolves provided expression (unless already resolved) against the attributes of the given table schema.
*
* For example:
* <pre>
* ts > 1000 and ts <= 1500
* </pre>
* will be resolved as
* <pre>
* And(GreaterThan(ts#590L > 1000), LessThanOrEqual(ts#590L <= 1500))
* </pre>
*
* Where <pre>ts</pre> is a column of the provided [[tableSchema]]
*
* @param spark spark session
* @param expr Catalyst expression to be resolved (if not yet)
* @param tableSchema table schema encompassing attributes to resolve against
* @return Resolved filter expression
*/
def resolveExpr(spark: SparkSession, expr: Expression, tableSchema: StructType): Expression = {
val analyzer = spark.sessionState.analyzer
val schemaFields = tableSchema.fields

val resolvedExpr = {
val plan: LogicalPlan = Filter(expr, LocalRelation(schemaFields.head, schemaFields.drop(1): _*))
analyzer.execute(plan).asInstanceOf[Filter].condition
}

if (!hasUnresolvedRefs(resolvedExpr)) {
resolvedExpr
} else {
throw new IllegalStateException("unresolved attribute")
}
}

/**
* Splits the given predicates into two sequences of predicates:
* - predicates that reference partition columns only (and involve no sub-query);
* - other predicates.
*
* @param sparkSession The spark session
* @param predicates The predicates to be split
* @param partitionColumns The partition columns
* @return (partitionFilters, dataFilters)
*/
def splitPartitionAndDataPredicates(sparkSession: SparkSession,
Contributor Author:
Did not change

predicates: Array[Expression],
partitionColumns: Array[String]): (Array[Expression], Array[Expression]) = {
// Validates that the provided names both resolve to the same entity
val resolvedNameEquals = sparkSession.sessionState.analyzer.resolver

predicates.partition(expr => {
// Checks whether the given expression only references partition columns (and involves no sub-query)
expr.references.forall(r => partitionColumns.exists(resolvedNameEquals(r.name, _))) &&
!SubqueryExpression.hasSubquery(expr)
})
}

/**
* Matches an expression iff
*
* <ol>
* <li>It references exactly one [[AttributeReference]]</li>
* <li>It contains only whitelisted transformations that preserve ordering of the source column [1]</li>
* </ol>
*
* [1] Preserving ordering is defined as follows: a transformation T is ordering-preserving if, whenever the
* values of the source column A are ordered as a1, a2, a3, ..., the mapped column B = T(A) keeps the same
* ordering b1, b2, b3, ... with b1 = T(a1), b2 = T(a2), ...
*/
def tryMatchAttributeOrderingPreservingTransformation(expr: Expression): Option[AttributeReference]

private def hasUnresolvedRefs(resolvedExpr: Expression): Boolean =
resolvedExpr.collectFirst {
case _: UnresolvedAttribute | _: UnresolvedFunction => true
}.isDefined
}
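Putting the utilities above together, a rough usage sketch (the adapter lookup, the schema and the predicate strings are illustrative assumptions; the method signatures follow the trait as defined in this file):

import org.apache.hudi.SparkAdapterSupport
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}

object ExpressionUtilsUsageSketch extends SparkAdapterSupport {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").getOrCreate()

    // Illustrative schema: "ts" is a data column, "dt" a partition column
    val schema = StructType(Seq(StructField("ts", LongType), StructField("dt", StringType)))
    val utils = sparkAdapter.createCatalystExpressionUtils()

    // Parse and resolve raw predicate strings against the schema, as described above
    val predicates = Array(
      utils.resolveExpr(spark, "dt = '2022-03-01'", schema),        // references the partition column only
      utils.resolveExpr(spark, "ts > 1000 and ts <= 1500", schema)) // references a data column

    // Split into (partitionFilters, dataFilters): the "dt" predicate should land in
    // the first array, the "ts" predicate in the second
    val (partitionFilters, dataFilters) =
      utils.splitPartitionAndDataPredicates(spark, predicates, Array("dt"))

    println(s"partition filters: ${partitionFilters.mkString(", ")}")
    println(s"data filters:      ${dataFilters.mkString(", ")}")
    spark.stop()
  }
}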
Expand Up @@ -32,16 +32,21 @@ import org.apache.spark.sql.catalyst.{AliasIdentifier, TableIdentifier}
import org.apache.spark.sql.execution.datasources.{FilePartition, LogicalRelation, PartitionedFile, SparkParsePartitionUtil}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.DataType
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.{HoodieCatalystExpressionUtils, Row, SparkSession}

import java.util.Locale

/**
* An interface to adapter the difference between spark2 and spark3
* in some spark related class.
* Interface adapting discrepancies and incompatibilities between different Spark versions
*/
trait SparkAdapter extends Serializable {

/**
* Creates instance of [[HoodieCatalystExpressionUtils]] providing for common utils operating
* on Catalyst Expressions
*/
def createCatalystExpressionUtils(): HoodieCatalystExpressionUtils

/**
* Creates instance of [[HoodieAvroSerializer]] providing for ability to serialize
* Spark's [[InternalRow]] into Avro payloads
Expand Down
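As a rough illustration of what the object returned by createCatalystExpressionUtils has to provide, here is a toy, deliberately narrow implementation of the trait's single abstract method; the handled cases are assumptions for illustration and are much narrower than the real per-Spark-version whitelists:

import org.apache.spark.sql.HoodieCatalystExpressionUtils
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Cast, Expression}

// Toy matcher: accepts a bare column reference, or a Cast of a column reference, on the
// (illustrative) assumption that such a cast keeps the source column's ordering.
object ToyCatalystExpressionUtils extends HoodieCatalystExpressionUtils {

  override def tryMatchAttributeOrderingPreservingTransformation(expr: Expression): Option[AttributeReference] =
    expr match {
      case attr: AttributeReference => Some(attr) // identity trivially preserves ordering
      case cast: Cast =>
        cast.child match {
          case attr: AttributeReference => Some(attr)
          case _ => None
        }
      case _ => None // everything else is rejected in this toy version
    }
}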
Expand Up @@ -20,7 +20,7 @@ package org.apache.hudi
import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, Path, PathFilter}
import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.hadoop.hbase.io.hfile.CacheConfig
import org.apache.hadoop.mapred.JobConf
import org.apache.hudi.HoodieBaseRelation.{getPartitionPath, isMetadataTable}
Expand All @@ -32,7 +32,6 @@ import org.apache.hudi.common.table.timeline.{HoodieInstant, HoodieTimeline}
import org.apache.hudi.common.table.view.HoodieTableFileSystemView
import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient, TableSchemaResolver}
import org.apache.hudi.common.util.StringUtils
import org.apache.hudi.hadoop.HoodieROTablePathFilter
import org.apache.hudi.io.storage.HoodieHFileReader
import org.apache.hudi.metadata.{HoodieMetadataPayload, HoodieTableMetadata}
import org.apache.spark.execution.datasources.HoodieInMemoryFileIndex
Expand All @@ -41,7 +40,7 @@ import org.apache.spark.rdd.RDD
import org.apache.spark.sql.avro.SchemaConverters
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Expression, SubqueryExpression}
import org.apache.spark.sql.execution.datasources.{FileStatusCache, PartitionDirectory, PartitionedFile}
import org.apache.spark.sql.execution.datasources.{FileStatusCache, PartitionedFile}
import org.apache.spark.sql.hudi.HoodieSqlCommonUtils
import org.apache.spark.sql.sources.{BaseRelation, Filter, PrunedFilteredScan}
import org.apache.spark.sql.types.StructType
Expand Down
Expand Up @@ -32,11 +32,10 @@ import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{And, Expression, Literal}
import org.apache.spark.sql.execution.datasources.{FileIndex, FileStatusCache, NoopCache, PartitionDirectory}
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.hudi.DataSkippingUtils.translateIntoColumnStatsIndexFilterExpr
import org.apache.spark.sql.hudi.HoodieSqlCommonUtils
import org.apache.spark.sql.hudi.{DataSkippingUtils, HoodieSqlCommonUtils}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{StringType, StructType}
import org.apache.spark.sql.{AnalysisException, Column, SparkSession}
import org.apache.spark.sql.{Column, SparkSession}
import org.apache.spark.unsafe.types.UTF8String

import java.text.SimpleDateFormat
Expand Down Expand Up @@ -244,21 +243,21 @@ case class HoodieFileIndex(spark: SparkSession,
// column references from the filtering expressions, and only transpose records corresponding to the
// columns referenced in those
val transposedColStatsDF =
queryReferencedColumns.map(colName =>
colStatsDF.filter(col(HoodieMetadataPayload.COLUMN_STATS_FIELD_COLUMN_NAME).equalTo(colName))
.select(targetColStatsIndexColumns.map(col): _*)
.withColumnRenamed(HoodieMetadataPayload.COLUMN_STATS_FIELD_NULL_COUNT, getNumNullsColumnNameFor(colName))
.withColumnRenamed(HoodieMetadataPayload.COLUMN_STATS_FIELD_MIN_VALUE, getMinColumnNameFor(colName))
.withColumnRenamed(HoodieMetadataPayload.COLUMN_STATS_FIELD_MAX_VALUE, getMaxColumnNameFor(colName))
)
.reduceLeft((left, right) =>
left.join(right, usingColumn = HoodieMetadataPayload.COLUMN_STATS_FIELD_FILE_NAME))
queryReferencedColumns.map(colName =>
colStatsDF.filter(col(HoodieMetadataPayload.COLUMN_STATS_FIELD_COLUMN_NAME).equalTo(colName))
.select(targetColStatsIndexColumns.map(col): _*)
.withColumnRenamed(HoodieMetadataPayload.COLUMN_STATS_FIELD_NULL_COUNT, getNumNullsColumnNameFor(colName))
.withColumnRenamed(HoodieMetadataPayload.COLUMN_STATS_FIELD_MIN_VALUE, getMinColumnNameFor(colName))
.withColumnRenamed(HoodieMetadataPayload.COLUMN_STATS_FIELD_MAX_VALUE, getMaxColumnNameFor(colName))
)
.reduceLeft((left, right) =>
left.join(right, usingColumn = HoodieMetadataPayload.COLUMN_STATS_FIELD_FILE_NAME))

// Persist DF to avoid re-computing column statistics unraveling
withPersistence(transposedColStatsDF) {
val indexSchema = transposedColStatsDF.schema
val indexFilter =
queryFilters.map(translateIntoColumnStatsIndexFilterExpr(_, indexSchema))
queryFilters.map(DataSkippingUtils.translateIntoColumnStatsIndexFilterExpr(_, indexSchema))
.reduce(And)

val allIndexedFileNames =
Expand Down
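To make the filtering step above tangible, a small self-contained sketch of how a transposed column-stats frame gets pruned (the stats column naming and the hand-translated predicate are illustrative; Hudi derives the real ones via the getMin/Max/NumNullsColumnNameFor helpers and translateIntoColumnStatsIndexFilterExpr):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

object DataSkippingSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").getOrCreate()
    import spark.implicits._

    // Transposed column-stats index: one row per base file, with min/max/null-count
    // columns for every column referenced by the query filters
    val transposedColStats = Seq(
      ("base_file_1.parquet", 1, 100, 0L),
      ("base_file_2.parquet", 200, 500, 3L)
    ).toDF("fileName", "c1_minValue", "c1_maxValue", "c1_num_nulls")

    // Query filter `c1 = 250`, hand-translated into a predicate over the stats columns:
    // a file can only contain matching rows if c1_minValue <= 250 <= c1_maxValue
    val indexFilter = col("c1_minValue") <= 250 && col("c1_maxValue") >= 250

    // Only base_file_2.parquet survives; base_file_1.parquet can be skipped entirely
    transposedColStats.filter(indexFilter).select("fileName").show(truncate = false)

    spark.stop()
  }
}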