diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
index c3825e3426cb8..46e1fe3cf82bb 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
@@ -62,6 +62,11 @@ object DataSourceReadOptions {
       "(or) Read Optimized mode (obtain latest view, based on base files) (or) Snapshot mode " +
       "(obtain latest view, by merging base and (if any) log files)")
 
+  val QUERY_USE_DATABASE: ConfigProperty[Boolean] = ConfigProperty
+    .key("hoodie.query.use.database")
+    .defaultValue(false)
+    .withDocumentation("Whether table-scoped query parameters set via Spark SQL SET commands must qualify the table name with its database name, e.g. hoodie.<database>.<table>.datasource.query.type instead of hoodie.<table>.datasource.query.type")
+
   val INCREMENTAL_FORMAT_LATEST_STATE_VAL = "latest_state"
   val INCREMENTAL_FORMAT_CDC_VAL = "cdc"
   val INCREMENTAL_FORMAT: ConfigProperty[String] = ConfigProperty
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
index ad1e2059bf5ea..ad6b0b1b57e96 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
@@ -41,6 +41,7 @@ import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode, SparkSession}
 
 import scala.collection.JavaConverters._
+import scala.collection.mutable
 
 /**
  * Hoodie Spark Datasource, for reading and writing hoodie tables
@@ -93,15 +94,6 @@ class DefaultSource extends RelationProvider
       Seq.empty
     }
 
-    // Add default options for unspecified read options keys.
-    val parameters = (if (globPaths.nonEmpty) {
-      Map(
-        "glob.paths" -> globPaths.mkString(",")
-      )
-    } else {
-      Map()
-    }) ++ DataSourceOptionsHelper.parametersWithReadDefaults(optParams)
-
     // Get the table base path
     val tablePath = if (globPaths.nonEmpty) {
      DataSourceUtils.getTablePath(fs, globPaths.toArray)
@@ -111,10 +103,45 @@ class DefaultSource extends RelationProvider
     log.info("Obtained hudi table path: " + tablePath)
 
     val metaClient = HoodieTableMetaClient.builder().setConf(fs.getConf).setBasePath(tablePath).build()
+    val sqlConf = sqlContext.sparkSession.sessionState.conf.getAllConfs
+    val queryConf = getQueryConf(sqlConf, metaClient)
+
+    // Add default options for unspecified read options keys.
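+    // queryConf is applied before optParams, so options passed explicitly to the
+    // datasource still take precedence over parameters picked up from the session conf.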
+    val parameters = (if (globPaths.nonEmpty) {
+      Map(
+        "glob.paths" -> globPaths.mkString(",")
+      )
+    } else {
+      Map()
+    }) ++ DataSourceOptionsHelper.parametersWithReadDefaults(queryConf ++ optParams)
 
     DefaultSource.createRelation(sqlContext, metaClient, schema, globPaths, parameters)
   }
 
+  /**
+   * Collects table-scoped read options from the Spark session conf: every key containing
+   * "<tableName>." (or "<databaseName>.<tableName>." when hoodie.query.use.database is
+   * enabled) is returned with that table qualifier stripped.
+   */
+  def getQueryConf(conf: Map[String, String], metaClient: HoodieTableMetaClient): Map[String, String] = {
+    val queryConf = new mutable.HashMap[String, String]
+
+    val isQueryUseDatabase = conf.getOrElse(QUERY_USE_DATABASE.key(), QUERY_USE_DATABASE.defaultValue().toString).toBoolean
+    val tableConfig = metaClient.getTableConfig
+    val tableName = if (isQueryUseDatabase) {
+      s"${tableConfig.getDatabaseName}.${tableConfig.getTableName}"
+    } else {
+      tableConfig.getTableName
+    }
+
+    conf.keySet.filter(_.contains(s"$tableName.")).foreach { key =>
+      queryConf.put(key.replace(s"$tableName.", ""), conf(key))
+    }
+    queryConf.toMap
+  }
+
   def getValidCommits(metaClient: HoodieTableMetaClient): String = {
     metaClient
       .getCommitsAndCompactionTimeline.filterCompletedInstants.getInstantsAsStream.toArray().map(_.asInstanceOf[HoodieInstant].getFileName).mkString(",")
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestQueryTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestQueryTable.scala
new file mode 100644
index 0000000000000..e43d82d0008cb
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestQueryTable.scala
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi
+
+class TestQueryTable extends HoodieSparkSqlTestBase {
+
+  test("Test incremental query with set parameters") {
+    val tableName = generateTableName
+
+    spark.sql(
+      s"""
+         |create table $tableName (
+         |  id int,
+         |  name string,
+         |  price double,
+         |  ts long,
+         |  dt string
+         |) using hudi
+         | partitioned by (dt)
+         | options (
+         |  primaryKey = 'id',
+         |  preCombineField = 'ts',
+         |  type = 'cow'
+         | )
+         |""".stripMargin)
+    spark.sql(s"insert into $tableName values (1, 'a1', 10, 1000, '2022-11-25')")
+    spark.sql(s"insert into $tableName values (2, 'a2', 20, 2000, '2022-11-25')")
+    spark.sql(s"insert into $tableName values (3, 'a3', 30, 3000, '2022-11-26')")
+    spark.sql(s"insert into $tableName values (4, 'a4', 40, 4000, '2022-12-26')")
+    spark.sql(s"insert into $tableName values (5, 'a5', 50, 5000, '2022-12-27')")
+
+    checkAnswer(s"select id, name, price, ts, dt from $tableName")(
+      Seq(1, "a1", 10.0, 1000, "2022-11-25"),
+      Seq(2, "a2", 20.0, 2000, "2022-11-25"),
+      Seq(3, "a3", 30.0, 3000, "2022-11-26"),
+      Seq(4, "a4", 40.0, 4000, "2022-12-26"),
+      Seq(5, "a5", 50.0, 5000, "2022-12-27")
+    )
+
+    import spark.implicits._
+    val commits = spark.sql(s"call show_commits(table => '$tableName')").select("commit_time").map(k => k.getString(0)).take(10)
+    val beginTime = commits(commits.length - 2)
+
+    spark.sql(s"set hoodie.$tableName.datasource.query.type = incremental")
+    spark.sql(s"set hoodie.$tableName.datasource.read.begin.instanttime = $beginTime")
+    spark.sql(s"refresh table $tableName")
+    checkAnswer(s"select id, name, price, ts, dt from $tableName")(
+      Seq(3, "a3", 30.0, 3000, "2022-11-26"),
+      Seq(4, "a4", 40.0, 4000, "2022-12-26"),
+      Seq(5, "a5", 50.0, 5000, "2022-12-27")
+    )
+
+    spark.sql(s"set hoodie.query.use.database = true")
+    spark.sql(s"refresh table $tableName")
+    val cnt = spark.sql(s"select * from $tableName").count()
+    assertResult(5)(cnt)
+
+    spark.sql(s"set hoodie.default.$tableName.datasource.query.type = incremental")
+    spark.sql(s"set hoodie.default.$tableName.datasource.read.begin.instanttime = $beginTime")
+    val endTime = commits(1)
+    spark.sql(s"set hoodie.default.$tableName.datasource.read.end.instanttime = $endTime")
+    spark.sql(s"refresh table $tableName")
+    checkAnswer(s"select id, name, price, ts, dt from $tableName")(
+      Seq(3, "a3", 30.0, 3000, "2022-11-26"),
+      Seq(4, "a4", 40.0, 4000, "2022-12-26")
+    )
+
+    spark.sql(s"set hoodie.default.$tableName.datasource.read.incr.path.glob = /dt=2022-11*/*")
+    spark.sql(s"refresh table $tableName")
+    checkAnswer(s"select id, name, price, ts, dt from $tableName")(
+      Seq(3, "a3", 30.0, 3000, "2022-11-26")
+    )
+
+    spark.conf.unset("hoodie.query.use.database")
+    spark.conf.unset(s"hoodie.$tableName.datasource.query.type")
+    spark.conf.unset(s"hoodie.$tableName.datasource.read.begin.instanttime")
+    spark.conf.unset(s"hoodie.default.$tableName.datasource.query.type")
+    spark.conf.unset(s"hoodie.default.$tableName.datasource.read.begin.instanttime")
+    spark.conf.unset(s"hoodie.default.$tableName.datasource.read.end.instanttime")
+    spark.conf.unset(s"hoodie.default.$tableName.datasource.read.incr.path.glob")
+  }
+
+  test("Test snapshot query with set parameters") {
+    val tableName = generateTableName
+
+    spark.sql(
+      s"""
+         |create table $tableName (
+         |  id int,
+         |  name string,
+         |  price double,
+         |  ts long,
+         |  dt string
+         |) using hudi
+         | partitioned by (dt)
+         | options (
+         |  primaryKey = 'id',
+         |  preCombineField = 'ts',
+         |  type = 'cow'
+         | )
+         |""".stripMargin)
+
+    spark.sql(s"insert into $tableName values (1, 'a1', 10, 1000, '2022-11-25')")
+    spark.sql(s"insert into $tableName values (2, 'a2', 20, 2000, '2022-11-25')")
+    spark.sql(s"update $tableName set price = 22 where id = 2")
+
+    checkAnswer(s"select id, name, price, ts, dt from $tableName")(
+      Seq(1, "a1", 10.0, 1000, "2022-11-25"),
+      Seq(2, "a2", 22.0, 2000, "2022-11-25")
+    )
+
+    import spark.implicits._
+    val commits = spark.sql(s"call show_commits(table => '$tableName')").select("commit_time").map(k => k.getString(0)).take(10)
+    val beginTime = commits(commits.length - 2)
+
+    spark.sql(s"set $tableName.as.of.instant = $beginTime")
+    spark.sql(s"refresh table $tableName")
+    checkAnswer(s"select id, name, price, ts, dt from $tableName where id = 2")(
+      Seq(2, "a2", 20.0, 2000, "2022-11-25")
+    )
+    spark.sql(s"set hoodie.query.use.database = true")
+    spark.sql(s"refresh table $tableName")
+    checkAnswer(s"select id, name, price, ts, dt from $tableName where id = 2")(
+      Seq(2, "a2", 22.0, 2000, "2022-11-25")
+    )
+
+    spark.sql(s"set default.$tableName.as.of.instant = $beginTime")
+    spark.sql(s"refresh table $tableName")
+    checkAnswer(s"select id, name, price, ts, dt from $tableName where id = 2")(
+      Seq(2, "a2", 20.0, 2000, "2022-11-25")
+    )
+
+    spark.conf.unset("hoodie.query.use.database")
+    spark.conf.unset(s"$tableName.as.of.instant")
+    spark.conf.unset(s"default.$tableName.as.of.instant")
+  }
+
+  test("Test read_optimized query with set parameters") {
+    val tableName = generateTableName
+
+    spark.sql(
+      s"""
+         |create table $tableName (
+         |  id int,
+         |  name string,
+         |  price double,
+         |  ts long,
+         |  dt string
+         |) using hudi
+         | partitioned by (dt)
+         | options (
+         |  primaryKey = 'id',
+         |  preCombineField = 'ts',
+         |  type = 'mor'
+         | )
+         |""".stripMargin)
+
+    spark.sql(s"insert into $tableName values (1, 'a1', 10, 1000, '2022-11-25')")
+    spark.sql(s"insert into $tableName values (2, 'a2', 20, 2000, '2022-11-25')")
+    spark.sql(s"update $tableName set price = 22 where id = 2")
+
+    checkAnswer(s"select id, name, price, ts, dt from $tableName")(
+      Seq(1, "a1", 10.0, 1000, "2022-11-25"),
+      Seq(2, "a2", 22.0, 2000, "2022-11-25")
+    )
+
+    spark.sql(s"set hoodie.$tableName.datasource.query.type = read_optimized")
+    spark.sql(s"refresh table $tableName")
+    checkAnswer(s"select id, name, price, ts, dt from $tableName")(
+      Seq(1, "a1", 10.0, 1000, "2022-11-25"),
+      Seq(2, "a2", 20.0, 2000, "2022-11-25")
+    )
+
+    spark.sql(s"set hoodie.query.use.database = true")
+    spark.sql(s"refresh table $tableName")
+    checkAnswer(s"select id, name, price, ts, dt from $tableName")(
+      Seq(1, "a1", 10.0, 1000, "2022-11-25"),
+      Seq(2, "a2", 22.0, 2000, "2022-11-25")
+    )
+
+    spark.sql(s"set hoodie.default.$tableName.datasource.query.type = read_optimized")
+    spark.sql(s"refresh table $tableName")
+    checkAnswer(s"select id, name, price, ts, dt from $tableName")(
+      Seq(1, "a1", 10.0, 1000, "2022-11-25"),
+      Seq(2, "a2", 20.0, 2000, "2022-11-25")
+    )
+
+    spark.conf.unset("hoodie.query.use.database")
+    spark.conf.unset(s"hoodie.$tableName.datasource.query.type")
+    spark.conf.unset(s"hoodie.default.$tableName.datasource.query.type")
+  }
+}