diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala index 04fc02b740dd0..df5e2777cbe05 100644 --- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala +++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala @@ -26,7 +26,6 @@ import org.apache.hudi.common.model.HoodieRecord import org.apache.spark.SPARK_VERSION import org.apache.spark.rdd.RDD import org.apache.spark.sql.DataFrame -import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.execution.SQLConfInjectingRDD import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.{StringType, StructField, StructType} diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieCatalystExpressionUtils.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieCatalystExpressionUtils.scala index f2d6f0381a471..1bdea9f93e572 100644 --- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieCatalystExpressionUtils.scala +++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieCatalystExpressionUtils.scala @@ -168,6 +168,26 @@ object HoodieCatalystExpressionUtils { } } + /** + * Converts an array of [[Filter]]s into Catalyst [[Expression]]s joined by And. Returns a + * non-empty Option[Expression] only if every filter is converted successfully, otherwise None. + */ + def convertToCatalystExpression(filters: Array[Filter], + tableSchema: StructType): Option[Expression] = { + val expressions = filters.map(convertToCatalystExpression(_, tableSchema)) + if (expressions.forall(p => p.isDefined)) { + if (expressions.isEmpty) { + None + } else if (expressions.length == 1) { + expressions.head + } else { + Some(expressions.map(_.get).reduce(org.apache.spark.sql.catalyst.expressions.And)) + } + } else { + None + } + } + /** * Converts [[Filter]] to Catalyst [[Expression]] */ diff --git a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieCommonConfig.java b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieCommonConfig.java index 00ff7e5683307..7d40382723c04 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieCommonConfig.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieCommonConfig.java @@ -41,6 +41,11 @@ public class HoodieCommonConfig extends HoodieConfig { .noDefaultValue() .withDocumentation("The query instant for time travel.
Without specified this option, we query the latest snapshot."); + public static final ConfigProperty READ_SUPPORT_V2_ENABLE = ConfigProperty + .key("hoodie.datasource.v2.read.enable") + .defaultValue(false) + .withDocumentation("If set to true, the query statement will use v2 to read"); + public static final ConfigProperty RECONCILE_SCHEMA = ConfigProperty .key("hoodie.datasource.write.reconcile.schema") .defaultValue(false) diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala index 57cab09377fa1..fda7944c032b2 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala @@ -152,6 +152,8 @@ object DataSourceReadOptions { val SCHEMA_EVOLUTION_ENABLED: ConfigProperty[Boolean] = HoodieCommonConfig.SCHEMA_EVOLUTION_ENABLE + val READ_SUPPORT_V2_ENABLE: ConfigProperty[Boolean] = HoodieCommonConfig.READ_SUPPORT_V2_ENABLE + /** @deprecated Use {@link QUERY_TYPE} and its methods instead */ @Deprecated val QUERY_TYPE_OPT_KEY = QUERY_TYPE.key() diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/HoodieSparkSqlTestBase.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/HoodieSparkSqlTestBase.scala index e7848320ff354..0f40e516a226b 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/HoodieSparkSqlTestBase.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/HoodieSparkSqlTestBase.scala @@ -26,7 +26,6 @@ import org.apache.spark.sql.catalyst.util.DateTimeUtils import org.apache.spark.sql.{Row, SparkSession} import org.apache.spark.util.Utils import org.joda.time.DateTimeZone -import org.scalactic.source import org.scalatest.{BeforeAndAfterAll, FunSuite, Tag} import java.io.File @@ -76,7 +75,7 @@ class HoodieSparkSqlTestBase extends FunSuite with BeforeAndAfterAll { } } - override protected def test(testName: String, testTags: Tag*)(testFun: => Any /* Assertion */)(implicit pos: source.Position): Unit = { + override protected def test(testName: String, testTags: Tag*)(testFun: => Any /* Assertion */)(implicit pos: org.scalactic.source.Position): Unit = { super.test(testName, testTags: _*)( try { testFun diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestQueryTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestQueryTable.scala new file mode 100644 index 0000000000000..aa9a6464fd8d3 --- /dev/null +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestQueryTable.scala @@ -0,0 +1,415 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.hudi + +import org.apache.commons.lang3.StringUtils +import org.apache.hudi.HoodieSparkUtils +import org.apache.hudi.exception.{HoodieDuplicateKeyException, HoodieException} +import org.apache.spark.sql.functions.lit + +class TestQueryTable extends HoodieSparkSqlTestBase { + + test("Test PruneColumns") { + withTempDir { tmp => + val tableName = generateTableName + // Create a partitioned table + spark.sql( + s""" + |create table $tableName ( + | id int, + | name string, + | price double, + | ts long, + | dt string + |) using hudi + | tblproperties (primaryKey = 'id') + | partitioned by (dt) + | location '${tmp.getCanonicalPath}/$tableName' + """.stripMargin) + // Insert overwrite dynamic partition + spark.sql( + s""" + | insert overwrite table $tableName + | select 1 as id, 'a1' as name, 10 as price, 1000 as ts, '2021-01-05' as dt + """.stripMargin) + + // Insert overwrite dynamic partition + spark.sql( + s""" + | insert overwrite table $tableName + | select 2 as id, 'a2' as name, 10 as price, 1000 as ts, '2021-01-06' as dt + """.stripMargin) + + // Insert overwrite static partition + spark.sql( + s""" + | insert into table $tableName partition(dt = '2021-01-05') + | select * from (select 2 , 'a2', 12, 1000) limit 10 + """.stripMargin) + + spark.sql(s"set hoodie.datasource.v2.read.enable=true") + + val query = s"select id, name from $tableName " + + s"where dt ='2021-01-05' and id = 1" + + spark.sql(query).explain(true) + + checkAnswer(query)( + Seq(1, "a1") + ) + spark.sql(s"set hoodie.datasource.v2.read.enable=false") + + } + + } + + test("Test RuntimeFiltering") { + withTempDir { tmp => + val tableName = generateTableName + // Create a partitioned table + println(spark.version) + spark.sql( + s""" + |create table $tableName ( + | id int, + | name string, + | price double, + | ts long, + | dt string + |) using hudi + | tblproperties (primaryKey = 'id', type = 'cow') + | partitioned by (name) + | location '${tmp.getCanonicalPath}' + """.stripMargin) + // Insert into dynamic partition + spark.sql( + s""" + | insert into $tableName + | select 1 as id, 'a1' as name, 10 as price, 1000 as ts, '2021-01-05' as dt + """.stripMargin) + + spark.sql( + s""" + | insert into $tableName + | select 10 as id, 'a10' as name, 100 as price, 10000 as ts, '2021-01-06' as dt + """.stripMargin) + + + spark.sql( + s""" + | insert into $tableName + | select 2 as id, 'a2' as name, 20 as price, 2000 as ts, '2021-02-05' as dt + """.stripMargin) + + val dimDf = spark.range(1, 4) + .withColumn("name", lit("a1")) + .select("id", "name") + + dimDf.show(false) + + spark.sql("CREATE TABLE dim (id int, name string) USING parquet") + dimDf.coalesce(1).write.mode("append").insertInto("dim") + + + spark.sql(s"set hoodie.datasource.v2.read.enable=true") + + val query = String.format("SELECT f.id, f.price, f.ts, f.dt, f.name FROM %s f JOIN dim d ON f.name = d.name AND d.id = 1 ORDER BY id", tableName) + val output = spark.sql("EXPLAIN EXTENDED " + query).collectAsList() + val actualFilterCount = StringUtils.countMatches(output.get(0).getString(0), "dynamicpruningexpression") + checkAnswer(query)( + Seq(1, 10.0, 1000, "2021-01-05", "a1") + ) + + if (HoodieSparkUtils.isSpark3_2) { + assertResult(actualFilterCount)(1) + } + + spark.sql("DROP TABLE IF EXISTS dim") + spark.sql(s"set hoodie.datasource.v2.read.enable=false") + } + } + + test("Test Query None Partitioned Table") { + withTempDir 
{ tmp => + val tableName = generateTableName + spark.sql(s"set hoodie.sql.insert.mode=strict") + spark.sql(s"set hoodie.datasource.v2.read.enable=true") + + // Create none partitioned cow table + spark.sql( + s""" + |create table $tableName ( + | id int, + | name string, + | price double, + | ts long + |) using hudi + | location '${tmp.getCanonicalPath}/$tableName' + | tblproperties ( + | type = 'cow', + | primaryKey = 'id', + | preCombineField = 'ts' + | ) + """.stripMargin) + + spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)") + + checkAnswer(s"select id, name, price, ts from $tableName")( + Seq(1, "a1", 10.0, 1000) + ) + + spark.sql(s"insert into $tableName select 2, 'a2', 12, 1000") + checkAnswer(s"select id, name, price, ts from $tableName")( + Seq(1, "a1", 10.0, 1000), + Seq(2, "a2", 12.0, 1000) + ) + + checkAnswer(s"select id, name from default.$tableName where id = 1")( + Seq(1, "a1") + ) + + assertThrows[HoodieDuplicateKeyException] { + try { + spark.sql(s"insert into $tableName select 1, 'a1', 10, 1000") + } catch { + case e: Exception => + var root: Throwable = e + while (root.getCause != null) { + root = root.getCause + } + throw root + } + } + + // Create table with dropDup is true + val tableName2 = generateTableName + spark.sql("set hoodie.datasource.write.insert.drop.duplicates = true") + spark.sql( + s""" + |create table $tableName2 ( + | id int, + | name string, + | price double, + | ts long + |) using hudi + | location '${tmp.getCanonicalPath}/$tableName2' + | tblproperties ( + | type = 'cow', + | primaryKey = 'id', + | preCombineField = 'ts' + | ) + """.stripMargin) + spark.sql(s"insert into $tableName2 values(1, 'a1', 10, 1000)") + // This record will be drop when dropDup is true + spark.sql(s"insert into $tableName2 values(2, 'a2', 1000, 1000)") + checkAnswer(s"select id, name, price, ts from $tableName2 where name = 'a1'")( + Seq(1, "a1", 10.0, 1000) + ) + + // disable this config to avoid affect other test in this class. 
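Note: the duplicate-key assertion above re-throws the innermost cause with a hand-written while-loop, and the same pattern appears again in the read-exception test later in this file. A small helper along these lines could factor it out; rootCause is a hypothetical name, not something this patch adds:

// Hypothetical helper (not in this patch): walk an exception chain down to its root cause,
// which is what the assertThrows blocks in this suite do with a manual while-loop.
@scala.annotation.tailrec
private def rootCause(t: Throwable): Throwable =
  if (t.getCause == null || (t.getCause eq t)) t else rootCause(t.getCause)

// Possible usage inside a test:
//   assertThrows[HoodieDuplicateKeyException] {
//     try spark.sql(s"insert into $tableName select 1, 'a1', 10, 1000")
//     catch { case e: Exception => throw rootCause(e) }
//   }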
+ spark.sql("set hoodie.datasource.write.insert.drop.duplicates = false") + spark.sql(s"set hoodie.sql.insert.mode=upsert") + spark.sql(s"set hoodie.datasource.v2.read.enable=false") + } + } + + test("Test Query Partitioned Table") { + withTempDir { tmp => + val tableName = generateTableName + // Create a partitioned table + spark.sql( + s""" + |create table $tableName ( + | id int, + | name string, + | price double, + | ts long, + | dt string + |) using hudi + | tblproperties (primaryKey = 'id') + | partitioned by (dt) + | location '${tmp.getCanonicalPath}/$tableName' + """.stripMargin) + // Insert overwrite dynamic partition + spark.sql( + s""" + | insert overwrite table $tableName + | select 1 as id, 'a1' as name, 10 as price, 1000 as ts, '2021-01-05' as dt + """.stripMargin) + + // Insert overwrite dynamic partition + spark.sql( + s""" + | insert overwrite table $tableName + | select 2 as id, 'a2' as name, 10 as price, 1000 as ts, '2021-01-06' as dt + """.stripMargin) + + // Insert overwrite static partition + spark.sql( + s""" + | insert overwrite table $tableName partition(dt = '2021-01-05') + | select * from (select 2 , 'a2', 12, 1000) limit 10 + """.stripMargin) + + // Insert data from another table + val tblNonPartition = generateTableName + spark.sql( + s""" + | create table $tblNonPartition ( + | id int, + | name string, + | price double, + | ts long + | ) using hudi + | tblproperties (primaryKey = 'id') + | location '${tmp.getCanonicalPath}/$tblNonPartition' + """.stripMargin) + spark.sql(s"insert into $tblNonPartition select 1, 'a1', 10, 1000") + spark.sql( + s""" + | insert overwrite table $tableName partition(dt ='2021-01-04') + | select * from $tblNonPartition limit 10 + """.stripMargin) + + spark.sql( + s""" + | insert overwrite table $tableName + | select id + 2, name, price, ts , '2021-01-04' from $tblNonPartition limit 10 + """.stripMargin) + + // test insert overwrite non-partitioned table + spark.sql(s"insert overwrite table $tblNonPartition select 2, 'a2', 10, 1000") + + spark.sql(s"set hoodie.datasource.v2.read.enable=true") + + checkAnswer(s"select id, name, price, ts, dt from $tableName order by id,dt")( + Seq(2, "a2", 10.0, 1000, "2021-01-06"), + Seq(2, "a2", 12.0, 1000, "2021-01-05"), + Seq(3, "a1", 10.0, 1000, "2021-01-04") + ) + + checkAnswer(s"select id, name, price, ts, dt from $tableName " + + s"where dt >='2021-01-05' and dt <= '2021-01-06' order by id,dt")( + Seq(2, "a2", 12.0, 1000, "2021-01-05"), + Seq(2, "a2", 10.0, 1000, "2021-01-06") + ) + + checkAnswer(s"select id, name, price, ts from $tblNonPartition")( + Seq(2, "a2", 10.0, 1000) + ) + + spark.sql(s"set hoodie.datasource.v2.read.enable=false") + } + } + + test("Test Qeury Exception") { + val tableName = generateTableName + spark.sql(s"set hoodie.datasource.v2.read.enable=true") + spark.sql( + s""" + |create table $tableName ( + | id int, + | name string, + | price double, + | dt string + |) using hudi + | tblproperties (primaryKey = 'id', type = 'mor') + | partitioned by (dt) + """.stripMargin) + + if (HoodieSparkUtils.isSpark3_2) { + assertThrows[HoodieException] { + try { + spark.sql(s"select * from $tableName").show() + } catch { + case e: Exception => + var root: Throwable = e + while (root.getCause != null) { + root = root.getCause + } + throw root + } + } + } + spark.sql(s"set hoodie.datasource.v2.read.enable=false") + } + + test("Test Query SQL Join") { + withTempDir { tmp => + val tableName1 = generateTableName + // Create a partitioned table + spark.sql( + s""" + |create table $tableName1 
( + | id int, + | name string, + | price double, + | ts long, + | dt string + |) using hudi + | tblproperties (primaryKey = 'id', type = 'cow') + | partitioned by (dt) + | location '${tmp.getCanonicalPath}/$tableName1' + """.stripMargin) + // Insert into dynamic partition + spark.sql( + s""" + | insert into $tableName1 + | select 1 as id, 'a1' as name, 10 as price, 1000 as ts, '2021-01-05' as dt + """.stripMargin) + + spark.sql( + s""" + | insert into $tableName1 + | select 10 as id, 'a111' as name, 10 as price, 1000 as ts, '2021-01-06' as dt + """.stripMargin) + + spark.sql( + s""" + | insert into $tableName1 + | select 2 as id, 'a2' as name, 20 as price, 2000 as ts, '2021-02-05' as dt + """.stripMargin) + + val dimDf = spark.range(1, 4) + .withColumn("name", lit("a1")) + .select("id", "name") + + val tableName2 = generateTableName + spark.sql( + s""" + |create table $tableName2 ( + | id int, + | name string + |) using hudi + | tblproperties (primaryKey = 'id', type = 'cow') + | location '${tmp.getCanonicalPath}/$tableName2' + """.stripMargin) + + dimDf.coalesce(1) + .write.format("org.apache.hudi").mode("append").insertInto(tableName2) + + spark.sql(s"set hoodie.datasource.v2.read.enable=true") + val query =String.format("SELECT f.id, f.name, f.ts, f.dt" + + " FROM %s f JOIN %s d ON f.name = d.name AND d.id = 1 ORDER BY id", tableName1, tableName2) + checkAnswer(query)( + Seq(1, "a1", 1000, "2021-01-05") + ) + spark.sql(s"set hoodie.datasource.v2.read.enable=false") + } + } +} diff --git a/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/HoodieSpark3CatalystPlanUtils.scala b/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/HoodieSpark3CatalystPlanUtils.scala index 552eb320161ff..583ed576df61b 100644 --- a/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/HoodieSpark3CatalystPlanUtils.scala +++ b/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/HoodieSpark3CatalystPlanUtils.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql import org.apache.hudi.spark3.internal.ReflectUtil import org.apache.spark.sql.catalyst.analysis.{TableOutputResolver, UnresolvedRelation} -import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet, ProjectionOverSchema} +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet, Expression, Like, ProjectionOverSchema} import org.apache.spark.sql.catalyst.plans.JoinType import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoStatement, Join, JoinHint, LogicalPlan} import org.apache.spark.sql.catalyst.{AliasIdentifier, TableIdentifier} diff --git a/hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark32PlusHoodieParquetFileFormat.scala b/hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark32PlusHoodieParquetFileFormat.scala index 52d450029e3bd..b76e392e5aac6 100644 --- a/hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark32PlusHoodieParquetFileFormat.scala +++ b/hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark32PlusHoodieParquetFileFormat.scala @@ -402,7 +402,7 @@ object Spark32PlusHoodieParquetFileFormat { /** * NOTE: This method is specific to Spark 3.2.0 */ - private def createParquetFilters(args: Any*): ParquetFilters = { + def createParquetFilters(args: 
Any*): ParquetFilters = { // NOTE: ParquetFilters ctor args contain Scala enum, therefore we can't look it // up by arg types, and have to instead rely on the number of args based on individual class; // the ctor order is not guaranteed @@ -414,7 +414,7 @@ object Spark32PlusHoodieParquetFileFormat { /** * NOTE: This method is specific to Spark 3.2.0 */ - private def createParquetReadSupport(args: Any*): ParquetReadSupport = { + def createParquetReadSupport(args: Any*): ParquetReadSupport = { // NOTE: ParquetReadSupport ctor args contain Scala enum, therefore we can't look it // up by arg types, and have to instead rely on the number of args based on individual class; // the ctor order is not guaranteed @@ -426,7 +426,7 @@ object Spark32PlusHoodieParquetFileFormat { /** * NOTE: This method is specific to Spark 3.2.0 */ - private def createVectorizedParquetRecordReader(args: Any*): VectorizedParquetRecordReader = { + def createVectorizedParquetRecordReader(args: Any*): VectorizedParquetRecordReader = { // NOTE: ParquetReadSupport ctor args contain Scala enum, therefore we can't look it // up by arg types, and have to instead rely on the number of args based on individual class; // the ctor order is not guaranteed @@ -445,7 +445,7 @@ object Spark32PlusHoodieParquetFileFormat { } } - private def rebuildFilterFromParquet(oldFilter: Filter, fileSchema: InternalSchema, querySchema: InternalSchema): Filter = { + def rebuildFilterFromParquet(oldFilter: Filter, fileSchema: InternalSchema, querySchema: InternalSchema): Filter = { if (fileSchema == null || querySchema == null) { oldFilter } else { diff --git a/hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieSpark3Analysis.scala b/hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieSpark3Analysis.scala index 10d2208edc446..370d2d90fcb27 100644 --- a/hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieSpark3Analysis.scala +++ b/hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieSpark3Analysis.scala @@ -17,8 +17,8 @@ package org.apache.spark.sql.hudi.analysis +import org.apache.hudi.{DataSourceReadOptions, DefaultSource, SparkAdapterSupport} import org.apache.hudi.common.table.HoodieTableMetaClient -import org.apache.hudi.{DefaultSource, SparkAdapterSupport} import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis.{ResolvedTable, UnresolvedPartitionSpec} import org.apache.spark.sql.catalyst.catalog.{CatalogTable, HoodieCatalogTable} @@ -52,12 +52,13 @@ class HoodieDataSourceV2ToV1Fallback(sparkSession: SparkSession) extends Rule[Lo with ProvidesHoodieConfig { override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsDown { - case v2r @ DataSourceV2Relation(v2Table: HoodieInternalV2Table, _, _, _, _) => + case v2r @ DataSourceV2Relation(v2Table: HoodieInternalV2Table, _, _, _, _) + if !(sparkSession.conf.get(DataSourceReadOptions.READ_SUPPORT_V2_ENABLE.key, + DataSourceReadOptions.READ_SUPPORT_V2_ENABLE.defaultValue.toString).toBoolean) => val output = v2r.output val catalogTable = v2Table.catalogTable.map(_ => v2Table.v1Table) val relation = new DefaultSource().createRelation(new SQLContext(sparkSession), buildHoodieConfig(v2Table.hoodieCatalogTable), v2Table.hoodieCatalogTable.tableSchema) - LogicalRelation(relation, output, catalogTable, isStreaming = false) } } diff --git 
a/hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/hudi/catalog/HoodieCatalog.scala b/hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/hudi/catalog/HoodieCatalog.scala index eeef56d3cff74..412d9d6d75416 100644 --- a/hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/hudi/catalog/HoodieCatalog.scala +++ b/hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/hudi/catalog/HoodieCatalog.scala @@ -117,6 +117,9 @@ class HoodieCatalog extends DelegatingCatalogExtension val schemaEvolutionEnabled: Boolean = spark.sessionState.conf.getConfString(DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.key, DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.defaultValue.toString).toBoolean + val supportV2ReadEnabled: Boolean = spark.sessionState.conf.getConfString(DataSourceReadOptions.READ_SUPPORT_V2_ENABLE.key, + DataSourceReadOptions.READ_SUPPORT_V2_ENABLE.defaultValue.toString).toBoolean + // NOTE: PLEASE READ CAREFULLY // // Since Hudi relations don't currently implement DS V2 Read API, we by default fallback to V1 here. @@ -124,7 +127,7 @@ class HoodieCatalog extends DelegatingCatalogExtension // where V2 API have to be used. Currently only such use-case is using of Schema Evolution feature // // Check out HUDI-4178 for more details - if (schemaEvolutionEnabled) { + if (schemaEvolutionEnabled || supportV2ReadEnabled) { v2Table } else { v2Table.v1TableWrapper diff --git a/hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/hudi/catalog/HoodieInternalV2Table.scala b/hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/hudi/catalog/HoodieInternalV2Table.scala index 9968095f3a5d3..8914c89aadf78 100644 --- a/hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/hudi/catalog/HoodieInternalV2Table.scala +++ b/hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/hudi/catalog/HoodieInternalV2Table.scala @@ -23,8 +23,10 @@ import org.apache.spark.sql.catalyst.catalog.{CatalogTable, HoodieCatalogTable} import org.apache.spark.sql.connector.catalog.TableCapability._ import org.apache.spark.sql.connector.catalog._ import org.apache.spark.sql.connector.expressions.{FieldReference, IdentityTransform, Transform} +import org.apache.spark.sql.connector.read.ScanBuilder import org.apache.spark.sql.connector.write._ import org.apache.spark.sql.hudi.ProvidesHoodieConfig +import org.apache.spark.sql.hudi.source.HoodieBatchScanBuilder import org.apache.spark.sql.sources.{Filter, InsertableRelation} import org.apache.spark.sql.types.StructType import org.apache.spark.sql.util.CaseInsensitiveStringMap @@ -32,13 +34,14 @@ import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession} import java.util import scala.collection.JavaConverters.{mapAsJavaMapConverter, setAsJavaSetConverter} +import scala.jdk.CollectionConverters.mapAsScalaMapConverter case class HoodieInternalV2Table(spark: SparkSession, path: String, catalogTable: Option[CatalogTable] = None, tableIdentifier: Option[String] = None, options: CaseInsensitiveStringMap = CaseInsensitiveStringMap.empty()) - extends Table with SupportsWrite with V2TableWithV1Fallback { + extends Table with SupportsWrite with SupportsRead with V2TableWithV1Fallback with ProvidesHoodieConfig { lazy val hoodieCatalogTable: HoodieCatalogTable = if (catalogTable.isDefined) { HoodieCatalogTable(spark, catalogTable.get) @@ -82,6 +85,15 @@ case class 
HoodieInternalV2Table(spark: SparkSession, }.toArray } + override def newScanBuilder(caseInsensitiveStringMap: CaseInsensitiveStringMap): ScanBuilder = { + val scanOptions = buildHoodieScanConfig(caseInsensitiveStringMap, hoodieCatalogTable) + new HoodieBatchScanBuilder(spark, hoodieCatalogTable, scanOptions) + } + + private def buildHoodieScanConfig(caseInsensitiveStringMap: CaseInsensitiveStringMap, + hoodieCatalogTable: HoodieCatalogTable): Map[String, String] = { + buildHoodieConfig(hoodieCatalogTable) ++ caseInsensitiveStringMap.asCaseSensitiveMap().asScala + } } private class HoodieV1WriteBuilder(writeOptions: CaseInsensitiveStringMap, diff --git a/hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/hudi/source/HoodieBatchScanBuilder.scala b/hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/hudi/source/HoodieBatchScanBuilder.scala new file mode 100644 index 0000000000000..a41e8b4152455 --- /dev/null +++ b/hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/hudi/source/HoodieBatchScanBuilder.scala @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.hudi.source + +import org.apache.hudi.DataSourceReadOptions.QUERY_TYPE +import org.apache.hudi.exception.HoodieException +import org.apache.hudi.{DataSourceOptionsHelper, DefaultSource} +import org.apache.spark.sql.{HoodieCatalystExpressionUtils, SQLContext, SparkSession} +import org.apache.spark.sql.catalyst.catalog.HoodieCatalogTable +import org.apache.spark.sql.catalyst.expressions.Expression +import org.apache.spark.sql.connector.read.{Scan, ScanBuilder, SupportsPushDownFilters, SupportsPushDownRequiredColumns} +import org.apache.spark.sql.execution.datasources.HadoopFsRelation +import org.apache.spark.sql.sources.Filter +import org.apache.spark.sql.types.StructType + + +class HoodieBatchScanBuilder(spark: SparkSession, + hoodieCatalogTable: HoodieCatalogTable, + options: Map[String, String]) + extends ScanBuilder with SupportsPushDownFilters with SupportsPushDownRequiredColumns { + @transient lazy val hadoopConf = { + // Hadoop Configurations are case sensitive. 
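Note on buildHoodieScanConfig above: the table-level Hudi config from buildHoodieConfig is merged with the per-scan options, and Scala's Map ++ lets the right-hand side win on duplicate keys, so options passed to an individual read override the table defaults. A quick, self-contained check of that precedence (the key and values are only illustrative):

// Map ++ gives the right-hand operand precedence on duplicate keys.
val tableLevel = Map("hoodie.datasource.query.type" -> "snapshot")
val perScan    = Map("hoodie.datasource.query.type" -> "read_optimized")
assert((tableLevel ++ perScan)("hoodie.datasource.query.type") == "read_optimized")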
+ spark.sessionState.newHadoopConfWithOptions(options) + } + + private var filterExpressions: Option[Expression] = None + + private var filterArrays: Array[Filter] = Array.empty + + private var expectedSchema: StructType= hoodieCatalogTable.tableSchema + + override def build(): Scan = { + val relation = new DefaultSource().createRelation(new SQLContext(spark), options) + relation match { + case HadoopFsRelation(location, partitionSchema, dataSchema, _, _, options) => + val selectedPartitions = location.listFiles(Seq.empty, filterExpressions.toList) + SparkScan(spark, hoodieCatalogTable.catalogTableName, selectedPartitions, dataSchema, partitionSchema, + expectedSchema, filterArrays, options, hadoopConf) + case _ => + val isBootstrappedTable = hoodieCatalogTable.metaClient.getTableConfig.getBootstrapBasePath.isPresent + val tableType = hoodieCatalogTable.metaClient.getTableType + val parameters = DataSourceOptionsHelper.parametersWithReadDefaults(options) + val queryType = parameters(QUERY_TYPE.key) + throw new HoodieException("Hoodie do not support read with DataSource V2 for (tableType, queryType, " + + "isBootstrappedTable) = (" + tableType + "," + queryType + "," + isBootstrappedTable + ")") + } + } + + override def pushFilters(filters: Array[Filter]): Array[Filter] = { + this.filterExpressions = HoodieCatalystExpressionUtils.convertToCatalystExpression(filters, expectedSchema) + this.filterArrays = filters + filters + } + + override def pushedFilters(): Array[Filter] = { + filterArrays + } + + override def pruneColumns(requiredSchema: StructType): Unit = { + expectedSchema = requiredSchema + } +} diff --git a/hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/hudi/source/SparkBatch.scala b/hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/hudi/source/SparkBatch.scala new file mode 100644 index 0000000000000..696b86c3565ea --- /dev/null +++ b/hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/hudi/source/SparkBatch.scala @@ -0,0 +1,351 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.hudi.source + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.Path +import org.apache.hadoop.mapred.FileSplit +import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl +import org.apache.hadoop.mapreduce.{JobID, RecordReader, TaskAttemptID, TaskID, TaskType} +import org.apache.hudi.HoodieSparkUtils +import org.apache.hudi.client.utils.SparkInternalSchemaConverter +import org.apache.hudi.common.fs.FSUtils +import org.apache.hudi.common.util.InternalSchemaCache +import org.apache.hudi.common.util.StringUtils.isNullOrEmpty +import org.apache.hudi.common.util.collection.Pair +import org.apache.hudi.internal.schema.action.InternalSchemaMerger +import org.apache.hudi.internal.schema.utils.SerDeHelper +import org.apache.parquet.filter2.compat.FilterCompat +import org.apache.parquet.filter2.predicate.FilterApi +import org.apache.parquet.format.converter.ParquetMetadataConverter.SKIP_ROW_GROUPS +import org.apache.parquet.hadoop.{ParquetInputFormat, ParquetRecordReader} +import org.apache.spark.TaskContext +import org.apache.spark.broadcast.Broadcast +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.util.DateTimeUtils +import org.apache.spark.sql.connector.read.{Batch, InputPartition, PartitionReader, PartitionReaderFactory} +import org.apache.spark.sql.execution.PartitionedFileUtil +import org.apache.spark.sql.execution.datasources.parquet.{ParquetFilters, ParquetFooterReader, ParquetOptions, ParquetReadSupport, ParquetWriteSupport, Spark32PlusDataSourceUtils, Spark32PlusHoodieVectorizedParquetRecordReader, VectorizedParquetRecordReader} +import org.apache.spark.sql.execution.datasources.parquet.Spark32PlusHoodieParquetFileFormat.{createParquetFilters, createParquetReadSupport, createVectorizedParquetRecordReader, pruneInternalSchema, rebuildFilterFromParquet} +import org.apache.spark.sql.execution.datasources.v2.{FilePartitionReaderFactory, PartitionReaderWithPartitionValues} +import org.apache.spark.sql.execution.datasources.{DataSourceUtils, FilePartition, PartitionDirectory, PartitionedFile} +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.sources.Filter +import org.apache.spark.sql.types.{AtomicType, DataType, StructType} +import org.apache.spark.util.SerializableConfiguration + +import java.net.URI + +class SparkBatch(spark: SparkSession, + selectedPartitions: Seq[PartitionDirectory], + partitionSchema: StructType, + requiredSchema: StructType, + filters: Seq[Filter], + options: Map[String, String], + @transient hadoopConf: Configuration) extends Batch with Serializable { + + override def planInputPartitions(): Array[InputPartition] = { + // TODO support more accurate task planning. 
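For orientation on the TODO above: Spark's FilePartition.maxSplitBytes, used right below, boils down to roughly the following (a simplified sketch of the Spark-side formula driven by the spark.sql.files.* settings, not code from this PR):

// Split size is capped by spark.sql.files.maxPartitionBytes and floored by
// spark.sql.files.openCostInBytes, aiming to spread the total scan bytes across the
// requested parallelism.
def approxMaxSplitBytes(maxPartitionBytes: Long,  // spark.sql.files.maxPartitionBytes
                        openCostInBytes: Long,    // spark.sql.files.openCostInBytes
                        totalScanBytes: Long,
                        minPartitionNum: Int): Long = {
  val bytesPerCore = totalScanBytes / math.max(minPartitionNum, 1)
  math.min(maxPartitionBytes, math.max(openCostInBytes, bytesPerCore))
}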
+ val maxSplitBytes = FilePartition.maxSplitBytes(spark, selectedPartitions) + val splitFiles = selectedPartitions.flatMap { partition => + partition.files.flatMap { file => + val filePath = file.getPath + PartitionedFileUtil.splitFiles( + sparkSession = spark, + file = file, + filePath = filePath, + isSplitable = true, + maxSplitBytes = maxSplitBytes, + partitionValues = partition.values + ) + }.toArray.sortBy(_.length)(implicitly[Ordering[Long]].reverse) + } + FilePartition.getFilePartitions(spark, splitFiles, maxSplitBytes).toArray + + } + + override def createReaderFactory(): PartitionReaderFactory = { + hadoopConf.set(ParquetInputFormat.READ_SUPPORT_CLASS, classOf[ParquetReadSupport].getName) + hadoopConf.set( + ParquetReadSupport.SPARK_ROW_REQUESTED_SCHEMA, + requiredSchema.json) + hadoopConf.set( + ParquetWriteSupport.SPARK_ROW_SCHEMA, + requiredSchema.json) + hadoopConf.set( + SQLConf.SESSION_LOCAL_TIMEZONE.key, + spark.sessionState.conf.sessionLocalTimeZone) + hadoopConf.setBoolean( + SQLConf.NESTED_SCHEMA_PRUNING_ENABLED.key, + spark.sessionState.conf.nestedSchemaPruningEnabled) + hadoopConf.setBoolean( + SQLConf.CASE_SENSITIVE.key, + spark.sessionState.conf.caseSensitiveAnalysis) + + ParquetWriteSupport.setSchema(requiredSchema, hadoopConf) + + // Sets flags for `ParquetToSparkSchemaConverter` + hadoopConf.setBoolean( + SQLConf.PARQUET_BINARY_AS_STRING.key, + spark.sessionState.conf.isParquetBinaryAsString) + hadoopConf.setBoolean( + SQLConf.PARQUET_INT96_AS_TIMESTAMP.key, + spark.sessionState.conf.isParquetINT96AsTimestamp) + val broadcastedConf = spark.sparkContext.broadcast( + new SerializableConfiguration(hadoopConf)) + val sqlConf = spark.sessionState.conf + ReaderFactory(sqlConf, broadcastedConf) + } + + case class ReaderFactory(sqlConf: SQLConf, + broadcastedConf: Broadcast[SerializableConfiguration]) extends FilePartitionReaderFactory { + // TODO: if you move this into the closure it reverts to the default values. + // If true, enable using the custom RecordReader for parquet. This only works for + // a subset of the types (no complex types). 
+ val resultSchema: StructType = StructType(partitionSchema.fields ++ requiredSchema.fields) + val enableOffHeapColumnVector: Boolean = sqlConf.offHeapColumnVectorEnabled + val enableVectorizedReader: Boolean = + sqlConf.parquetVectorizedReaderEnabled && + resultSchema.forall(_.dataType.isInstanceOf[AtomicType]) + val enableRecordFilter: Boolean = sqlConf.parquetRecordFilterEnabled + val timestampConversion: Boolean = sqlConf.isParquetINT96TimestampConversion + val capacity: Int = sqlConf.parquetVectorizedReaderBatchSize + val enableParquetFilterPushDown: Boolean = sqlConf.parquetFilterPushDown + // Whole stage codegen (PhysicalRDD) is able to deal with batches directly + val pushDownDate: Boolean = sqlConf.parquetFilterPushDownDate + val pushDownTimestamp = sqlConf.parquetFilterPushDownTimestamp + val pushDownDecimal = sqlConf.parquetFilterPushDownDecimal + val pushDownStringStartWith = sqlConf.parquetFilterPushDownStringStartWith + val pushDownInFilterThreshold = sqlConf.parquetFilterPushDownInFilterThreshold + val isCaseSensitive = sqlConf.caseSensitiveAnalysis + val parquetOptions = new ParquetOptions(options, spark.sessionState.conf) + val datetimeRebaseModeInRead = parquetOptions.datetimeRebaseModeInRead + val int96RebaseModeInRead = parquetOptions.int96RebaseModeInRead + + override def buildReader(partitionedFile: PartitionedFile): PartitionReader[InternalRow] = { + val reader = createReader(partitionedFile) + + val fileReader = new PartitionReader[InternalRow] { + override def next(): Boolean = reader.nextKeyValue() + + override def get(): InternalRow = reader.getCurrentValue.asInstanceOf[InternalRow] + + override def close(): Unit = reader.close() + } + + new PartitionReaderWithPartitionValues(fileReader, requiredSchema, + partitionSchema, partitionedFile.partitionValues) + } + + def createReader(partitionedFile: PartitionedFile): RecordReader[Void, _ >: InternalRow <: AnyRef] = { + + val sharedConf = broadcastedConf.value.value + val internalSchemaStr = sharedConf.get(SparkInternalSchemaConverter.HOODIE_QUERY_SCHEMA) + // For Spark DataSource v1, there's no Physical Plan projection/schema pruning w/in Spark itself, + // therefore it's safe to do schema projection here + if (!isNullOrEmpty(internalSchemaStr)) { + val prunedInternalSchemaStr = + pruneInternalSchema(internalSchemaStr, requiredSchema) + sharedConf.set(SparkInternalSchemaConverter.HOODIE_QUERY_SCHEMA, prunedInternalSchemaStr) + } + + val filePath = new Path(new URI(partitionedFile.filePath)) + val split = new FileSplit(filePath, partitionedFile.start, partitionedFile.length, Array.empty[String]) + + // Internal schema has to be pruned at this point + val querySchemaOption = SerDeHelper.fromJson(internalSchemaStr) + + val shouldUseInternalSchema = !isNullOrEmpty(internalSchemaStr) && querySchemaOption.isPresent + + val tablePath = sharedConf.get(SparkInternalSchemaConverter.HOODIE_TABLE_PATH) + val fileSchema = if (shouldUseInternalSchema) { + val commitInstantTime = FSUtils.getCommitTime(filePath.getName).toLong; + val validCommits = sharedConf.get(SparkInternalSchemaConverter.HOODIE_VALID_COMMITS_LIST) + InternalSchemaCache.getInternalSchemaByVersionId(commitInstantTime, tablePath, sharedConf, if (validCommits == null) "" else validCommits) + } else { + null + } + + lazy val footerFileMetaData = + ParquetFooterReader.readFooter(sharedConf, filePath, SKIP_ROW_GROUPS).getFileMetaData + // Try to push down filters when filter push-down is enabled. 
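The block below rewrites the pushed-down filters for schema evolution and then converts them to Parquet predicates; the combining step is a common Option pattern: filters that cannot be converted are dropped via flatMap and the survivors are AND-ed, yielding None when nothing is convertible. A tiny generic illustration of just that step (stand-in types, not Parquet's FilterPredicate):

// Keep only the convertible predicates and AND them together; None means "no pushed filter".
def combinePushable[P](converted: Seq[Option[P]])(and: (P, P) => P): Option[P] =
  converted.flatten.reduceOption(and)

// combinePushable(Seq(Some("a > 1"), None, Some("b = 2")))(_ + " AND " + _)
//   => Some("a > 1 AND b = 2")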
+ val pushed = if (enableParquetFilterPushDown) { + val parquetSchema = footerFileMetaData.getSchema + val parquetFilters = if (HoodieSparkUtils.gteqSpark3_2_1) { + // NOTE: Below code could only be compiled against >= Spark 3.2.1, + // and unfortunately won't compile against Spark 3.2.0 + // However this code is runtime-compatible w/ both Spark 3.2.0 and >= Spark 3.2.1 + val datetimeRebaseSpec = + DataSourceUtils.datetimeRebaseSpec(footerFileMetaData.getKeyValueMetaData.get, datetimeRebaseModeInRead) + new ParquetFilters( + parquetSchema, + pushDownDate, + pushDownTimestamp, + pushDownDecimal, + pushDownStringStartWith, + pushDownInFilterThreshold, + isCaseSensitive, + datetimeRebaseSpec) + } else { + // Spark 3.2.0 + val datetimeRebaseMode = + Spark32PlusDataSourceUtils.datetimeRebaseMode(footerFileMetaData.getKeyValueMetaData.get, datetimeRebaseModeInRead) + createParquetFilters( + parquetSchema, + pushDownDate, + pushDownTimestamp, + pushDownDecimal, + pushDownStringStartWith, + pushDownInFilterThreshold, + isCaseSensitive, + datetimeRebaseMode) + } + filters.map(rebuildFilterFromParquet(_, fileSchema, querySchemaOption.orElse(null))) + // Collects all converted Parquet filter predicates. Notice that not all predicates can be + // converted (`ParquetFilters.createFilter` returns an `Option`). That's why a `flatMap` + // is used here. + .flatMap(parquetFilters.createFilter) + .reduceOption(FilterApi.and) + } else { + None + } + + // PARQUET_INT96_TIMESTAMP_CONVERSION says to apply timezone conversions to int96 timestamps' + // *only* if the file was created by something other than "parquet-mr", so check the actual + // writer here for this file. We have to do this per-file, as each file in the table may + // have different writers. + // Define isCreatedByParquetMr as function to avoid unnecessary parquet footer reads. + def isCreatedByParquetMr: Boolean = + footerFileMetaData.getCreatedBy().startsWith("parquet-mr") + + val convertTz = + if (timestampConversion && !isCreatedByParquetMr) { + Some(DateTimeUtils.getZoneId(sharedConf.get(SQLConf.SESSION_LOCAL_TIMEZONE.key))) + } else { + None + } + + val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 0), 0) + + // Clone new conf + val hadoopAttemptConf = new Configuration(sharedConf) + val typeChangeInfos: java.util.Map[Integer, Pair[DataType, DataType]] = if (shouldUseInternalSchema) { + val mergedInternalSchema = new InternalSchemaMerger(fileSchema, querySchemaOption.get(), true, true).mergeSchema() + val mergedSchema = SparkInternalSchemaConverter.constructSparkSchemaFromInternalSchema(mergedInternalSchema) + + hadoopAttemptConf.set(ParquetReadSupport.SPARK_ROW_REQUESTED_SCHEMA, mergedSchema.json) + + SparkInternalSchemaConverter.collectTypeChangedCols(querySchemaOption.get(), mergedInternalSchema) + } else { + new java.util.HashMap() + } + + val hadoopAttemptContext = + new TaskAttemptContextImpl(hadoopAttemptConf, attemptId) + + // Try to push down filters when filter push-down is enabled. + // Notice: This push-down is RowGroups level, not individual records. 
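As the comment above says, the predicate handed to ParquetInputFormat.setFilterPredicate below only prunes whole row groups; individual records are filtered only when the record-level filter is enabled and FilterCompat wraps the reader further down, and Spark still re-applies the original filters on top of the scan. The two session settings involved, as one might toggle them (usage sketch, assumes a live SparkSession named spark):

// Controls whether `pushed` is computed at all (row-group pruning).
spark.conf.set("spark.sql.parquet.filterPushdown", "true")
// Controls enableRecordFilter, i.e. whether FilterCompat wraps the ParquetRecordReader (off by default).
spark.conf.set("spark.sql.parquet.recordLevelFilter.enabled", "true")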
+ if (pushed.isDefined) { + ParquetInputFormat.setFilterPredicate(hadoopAttemptContext.getConfiguration, pushed.get) + } + val taskContext = Option(TaskContext.get()) + if (enableVectorizedReader) { + val vectorizedReader = + if (shouldUseInternalSchema) { + val int96RebaseSpec = + DataSourceUtils.int96RebaseSpec(footerFileMetaData.getKeyValueMetaData.get, int96RebaseModeInRead) + val datetimeRebaseSpec = + DataSourceUtils.datetimeRebaseSpec(footerFileMetaData.getKeyValueMetaData.get, datetimeRebaseModeInRead) + new Spark32PlusHoodieVectorizedParquetRecordReader( + convertTz.orNull, + datetimeRebaseSpec.mode.toString, + datetimeRebaseSpec.timeZone, + int96RebaseSpec.mode.toString, + int96RebaseSpec.timeZone, + enableOffHeapColumnVector && taskContext.isDefined, + capacity, + typeChangeInfos) + } else if (HoodieSparkUtils.gteqSpark3_2_1) { + // NOTE: Below code could only be compiled against >= Spark 3.2.1, + // and unfortunately won't compile against Spark 3.2.0 + // However this code is runtime-compatible w/ both Spark 3.2.0 and >= Spark 3.2.1 + val int96RebaseSpec = + DataSourceUtils.int96RebaseSpec(footerFileMetaData.getKeyValueMetaData.get, int96RebaseModeInRead) + val datetimeRebaseSpec = + DataSourceUtils.datetimeRebaseSpec(footerFileMetaData.getKeyValueMetaData.get, datetimeRebaseModeInRead) + new VectorizedParquetRecordReader( + convertTz.orNull, + datetimeRebaseSpec.mode.toString, + datetimeRebaseSpec.timeZone, + int96RebaseSpec.mode.toString, + int96RebaseSpec.timeZone, + enableOffHeapColumnVector && taskContext.isDefined, + capacity) + } else { + // Spark 3.2.0 + val datetimeRebaseMode = + Spark32PlusDataSourceUtils.datetimeRebaseMode(footerFileMetaData.getKeyValueMetaData.get, datetimeRebaseModeInRead) + val int96RebaseMode = + Spark32PlusDataSourceUtils.int96RebaseMode(footerFileMetaData.getKeyValueMetaData.get, int96RebaseModeInRead) + createVectorizedParquetRecordReader( + convertTz.orNull, + datetimeRebaseMode.toString, + int96RebaseMode.toString, + enableOffHeapColumnVector && taskContext.isDefined, + capacity) + } + vectorizedReader.initialize(split, hadoopAttemptContext) + vectorizedReader + } else { + val readSupport = if (HoodieSparkUtils.gteqSpark3_2_1) { + // ParquetRecordReader returns InternalRow + // NOTE: Below code could only be compiled against >= Spark 3.2.1, + // and unfortunately won't compile against Spark 3.2.0 + // However this code is runtime-compatible w/ both Spark 3.2.0 and >= Spark 3.2.1 + val int96RebaseSpec = + DataSourceUtils.int96RebaseSpec(footerFileMetaData.getKeyValueMetaData.get, int96RebaseModeInRead) + val datetimeRebaseSpec = + DataSourceUtils.datetimeRebaseSpec(footerFileMetaData.getKeyValueMetaData.get, datetimeRebaseModeInRead) + new ParquetReadSupport( + convertTz, + enableVectorizedReader = false, + datetimeRebaseSpec, + int96RebaseSpec) + } else { + val datetimeRebaseMode = + Spark32PlusDataSourceUtils.datetimeRebaseMode(footerFileMetaData.getKeyValueMetaData.get, datetimeRebaseModeInRead) + val int96RebaseMode = + Spark32PlusDataSourceUtils.int96RebaseMode(footerFileMetaData.getKeyValueMetaData.get, int96RebaseModeInRead) + createParquetReadSupport( + convertTz, + /* enableVectorizedReader = */ false, + datetimeRebaseMode, + int96RebaseMode) + } + val baseReader = if (pushed.isDefined && enableRecordFilter) { + val parquetFilter = FilterCompat.get(pushed.get, null) + new ParquetRecordReader[InternalRow](readSupport, parquetFilter) + } else { + new ParquetRecordReader[InternalRow](readSupport) + } + baseReader + } + } + } +} diff 
--git a/hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/hudi/source/SparkScan.scala b/hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/hudi/source/SparkScan.scala new file mode 100644 index 0000000000000..e43756a365db8 --- /dev/null +++ b/hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/hudi/source/SparkScan.scala @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.hudi.source + +import org.apache.hadoop.conf.Configuration +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.connector.expressions.{Expressions, NamedReference} +import org.apache.spark.sql.connector.read.{Batch, Scan, SupportsRuntimeFiltering} +import org.apache.spark.sql.execution.datasources.PartitionDirectory +import org.apache.spark.sql.sources.{Filter, In} +import org.apache.spark.sql.types.StructType + +case class SparkScan(spark: SparkSession, + hoodieTableName: String, + selectedPartitions: Seq[PartitionDirectory], + tableSchema: StructType, + partitionSchema: StructType, + requiredSchema: StructType, + filters: Seq[Filter], + options: Map[String, String], + @transient hadoopConf: Configuration) + extends SparkBatch ( + spark, + selectedPartitions, + partitionSchema, + requiredSchema, + filters, + options, + hadoopConf) with Scan with SupportsRuntimeFiltering { + + override def readSchema(): StructType = { + requiredSchema + } + + override def toBatch: Batch = this + + override def description(): String = { + hoodieTableName + ", PushedFilters: " + filters.mkString("[", ", ", "], ") + } + + override def filterAttributes(): Array[NamedReference] = { + val scanFields = readSchema().fields.map(_.name).toSet + + val namedReference = partitionSchema.fields.filter(field => scanFields.contains(field.name)) + .map(field => Expressions.column(field.name)) + namedReference + } + + override def filter(filters: Array[Filter]): Unit = { + // TODO need to filter out irrelevant data files + } + +}
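filter() above is intentionally left as a TODO. Purely as a hypothetical sketch of one direction it could take (not part of this PR): drop the partition directories whose partition values do not satisfy the In filters Spark supplies for dynamic partition pruning. String-typed partition columns are assumed for brevity.

import org.apache.spark.sql.execution.datasources.PartitionDirectory
import org.apache.spark.sql.sources.{Filter, In}
import org.apache.spark.sql.types.StructType

// Hypothetical pruning of selected partitions by the runtime In filters; columns not found in
// the partition schema are ignored, and partition values are read as strings.
def prunePartitions(selected: Seq[PartitionDirectory],
                    partitionSchema: StructType,
                    runtimeFilters: Array[Filter]): Seq[PartitionDirectory] = {
  val inFilters = runtimeFilters.collect { case f: In => f }
  selected.filter { dir =>
    inFilters.forall { case In(attribute, values) =>
      partitionSchema.getFieldIndex(attribute).forall { ordinal =>
        values.contains(dir.values.getString(ordinal))
      }
    }
  }
}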