@@ -62,6 +62,11 @@ object DataSourceReadOptions {
"(or) Read Optimized mode (obtain latest view, based on base files) (or) Snapshot mode " +
"(obtain latest view, by merging base and (if any) log files)")

val QUERY_USE_DATABASE: ConfigProperty[Boolean] = ConfigProperty
.key("hoodie.query.use.database")
Contributor:
Sorry, I am a little confused about this config and its use case.

@dongkelun (Contributor, Author) on Nov 30, 2022:

@leesf This configuration is exercised in the test cases. The main consideration is that different databases can have tables with the same name, such as db1.table1 and db2.table1. If both tables are queried in the same session and I only want to set incremental query parameters for db1.table1:

set hoodie.table1.datasource.query.type=incremental;
set hoodie.table1.datasource.read.begin.instanttime=20221130163703640;

With only these two settings, although I intend to query only db1.table1 incrementally, an incremental query is also performed when querying db2.table1. That is not the effect I expected, so I added this parameter:

set hoodie.query.use.database = true;
set hoodie.db1.table1.datasource.query.type=incremental;
set hoodie.db1.table1.datasource.read.begin.instanttime=20221130163703640;

With these settings, the incremental query applies only to db1.table1. This configuration defaults to false, which keeps it consistent with the Hive incremental query parameter.
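
To make the expected behavior concrete, here is a minimal sketch of such a session (the table names and instant time are illustrative):

set hoodie.query.use.database = true;
set hoodie.db1.table1.datasource.query.type = incremental;
set hoodie.db1.table1.datasource.read.begin.instanttime = 20221130163703640;
select * from db1.table1; -- incremental: only rows committed after the begin instant
select * from db2.table1; -- unaffected: regular snapshot query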

@dongkelun (Contributor, Author):

The PR for the Hive incremental query: #4083

@leesf (Contributor) on Dec 1, 2022:

If it only affects incremental queries, maybe hoodie.query.incremental.database is a better name? Or does it also affect other types of queries? In that case we need to add more test cases.

@dongkelun (Contributor, Author):

It also affects other query types. I can add test cases for the other query types.

@dongkelun (Contributor, Author):

@leesf Hello, I have added test cases for the other query types.

.defaultValue(false)
.withDocumentation("Whether to add database name to qualify table name when setting parameters in Spark SQL query")

Contributor:

Does this modification have anything to do with the PR title?

@dongkelun (Contributor, Author):

@xiarixiaoyao The title does not reflect this change because setting parameters this way was not supported previously. Adding this parameter mainly keeps consistency with the Hive incremental query config HoodieHiveUtils.HOODIE_INCREMENTAL_USE_DATABASE, chiefly to handle the case where different databases have tables with the same name.

The reason this is not described in detail in the PR is that it was uncertain whether the community would approve this form of query. If necessary, I can add a detailed description to the PR. In addition, the test cases originally covered only incremental queries, not the other query types; if necessary, I can add more detailed test cases.
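
For reference, a sketch of the Hive-side counterpart this mirrors (keys quoted from memory of HoodieHiveUtils; verify against #4083 before relying on them):

set hoodie.incremental.use.database = true;
set hoodie.db1.table1.consume.mode = INCREMENTAL;
set hoodie.db1.table1.consume.start.timestamp = 20221130163703640;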

val INCREMENTAL_FORMAT_LATEST_STATE_VAL = "latest_state"
val INCREMENTAL_FORMAT_CDC_VAL = "cdc"
val INCREMENTAL_FORMAT: ConfigProperty[String] = ConfigProperty
@@ -41,6 +41,7 @@ import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode, SparkSession}

import scala.collection.JavaConverters._
import scala.collection.mutable

/**
* Hoodie Spark Datasource, for reading and writing hoodie tables
@@ -93,15 +94,6 @@ class DefaultSource extends RelationProvider
Seq.empty
}

// Add default options for unspecified read options keys.
val parameters = (if (globPaths.nonEmpty) {
Map(
"glob.paths" -> globPaths.mkString(",")
)
} else {
Map()
}) ++ DataSourceOptionsHelper.parametersWithReadDefaults(optParams)

// Get the table base path
val tablePath = if (globPaths.nonEmpty) {
DataSourceUtils.getTablePath(fs, globPaths.toArray)
@@ -111,10 +103,38 @@ class DefaultSource extends RelationProvider
log.info("Obtained hudi table path: " + tablePath)

val metaClient = HoodieTableMetaClient.builder().setConf(fs.getConf).setBasePath(tablePath).build()
val sqlConf = sqlContext.sparkSession.sessionState.conf.getAllConfs
val queryConf = getQueryConf(sqlConf, metaClient)

// Add default options for unspecified read options keys.
val parameters = (if (globPaths.nonEmpty) {
Map(
"glob.paths" -> globPaths.mkString(",")
)
} else {
Map()
}) ++ DataSourceOptionsHelper.parametersWithReadDefaults(queryConf ++ optParams)

DefaultSource.createRelation(sqlContext, metaClient, schema, globPaths, parameters)
}
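
Note on the reordered block above: because optParams is appended last, options passed explicitly to the datasource still override the SET-based session conf, and parametersWithReadDefaults only fills in keys that neither map supplies. A toy check of the Scala ++ semantics this relies on:

// Later operands win in Scala's ++, so explicit optParams beat session-derived queryConf.
val queryConf = Map("hoodie.datasource.query.type" -> "incremental")
val optParams = Map("hoodie.datasource.query.type" -> "snapshot")
assert((queryConf ++ optParams)("hoodie.datasource.query.type") == "snapshot")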

def getQueryConf(conf: Map[String, String], metaClient: HoodieTableMetaClient): Map[String, String] = {
val queryConf = new mutable.HashMap[String, String]

val isQueryUseDatabase = conf.getOrElse(QUERY_USE_DATABASE.key(), QUERY_USE_DATABASE.defaultValue().toString).toBoolean
val tableConfig = metaClient.getTableConfig
// Qualify the table name with its database only when hoodie.query.use.database is enabled
val tableName = if (isQueryUseDatabase) {
s"${tableConfig.getDatabaseName}.${tableConfig.getTableName}"
} else {
tableConfig.getTableName
}

// Strip the table-name segment so per-table session configs become ordinary read options
conf.keySet.filter(_.contains(s"$tableName.")).foreach { key =>
queryConf.put(key.replace(s"$tableName.", ""), conf(key))
}
queryConf.toMap
}
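
To illustrate the effect in isolation, a minimal standalone sketch of the key rewriting getQueryConf performs (the session conf contents are hypothetical):

// Hypothetical session conf; only keys qualified with "db1.table1." are picked up.
val sessionConf = Map(
"hoodie.query.use.database" -> "true",
"hoodie.db1.table1.datasource.query.type" -> "incremental",
"hoodie.db1.table1.datasource.read.begin.instanttime" -> "20221130163703640",
"hoodie.table1.datasource.query.type" -> "snapshot") // ignored: lacks the db1. qualifier
val qualifiedName = "db1.table1" // what metaClient.getTableConfig would yield here
val extracted = sessionConf.collect {
case (key, value) if key.contains(s"$qualifiedName.") => key.replace(s"$qualifiedName.", "") -> value
}
// extracted == Map(
//   "hoodie.datasource.query.type" -> "incremental",
//   "hoodie.datasource.read.begin.instanttime" -> "20221130163703640")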

def getValidCommits(metaClient: HoodieTableMetaClient): String = {
metaClient
.getCommitsAndCompactionTimeline.filterCompletedInstants.getInstantsAsStream.toArray().map(_.asInstanceOf[HoodieInstant].getFileName).mkString(",")
@@ -0,0 +1,204 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.hudi

class TestQueryTable extends HoodieSparkSqlTestBase {

test("Test incremental query with set parameters") {
val tableName = generateTableName

spark.sql(
s"""
|create table $tableName (
| id int,
| name string,
| price double,
| ts long,
| dt string
|) using hudi
| partitioned by (dt)
| options (
| primaryKey = 'id',
| preCombineField = 'ts',
| type = 'cow'
| )
|""".stripMargin)
spark.sql(s"insert into $tableName values (1,'a1', 10, 1000, '2022-11-25')")
spark.sql(s"insert into $tableName values (2,'a2', 20, 2000, '2022-11-25')")
spark.sql(s"insert into $tableName values (3,'a3', 30, 3000, '2022-11-26')")
spark.sql(s"insert into $tableName values (4,'a4', 40, 4000, '2022-12-26')")
spark.sql(s"insert into $tableName values (5,'a5', 50, 5000, '2022-12-27')")

checkAnswer(s"select id, name, price, ts, dt from $tableName")(
Seq(1, "a1", 10.0, 1000, "2022-11-25"),
Seq(2, "a2", 20.0, 2000, "2022-11-25"),
Seq(3, "a3", 30.0, 3000, "2022-11-26"),
Seq(4, "a4", 40.0, 4000, "2022-12-26"),
Seq(5, "a5", 50.0, 5000, "2022-12-27")
)


import spark.implicits._
val commits = spark.sql(s"call show_commits(table => '$tableName')").select("commit_time").map(k => k.getString(0)).take(10)
val beginTime = commits(commits.length - 2)

spark.sql(s"set hoodie.$tableName.datasource.query.type = incremental")
spark.sql(s"set hoodie.$tableName.datasource.read.begin.instanttime = $beginTime")
spark.sql(s"refresh table $tableName")
checkAnswer(s"select id, name, price, ts, dt from $tableName")(
Seq(3, "a3", 30.0, 3000, "2022-11-26"),
Seq(4, "a4", 40.0, 4000, "2022-12-26"),
Seq(5, "a5", 50.0, 5000, "2022-12-27")
)

spark.sql(s"set hoodie.query.use.database = true")
spark.sql(s"refresh table $tableName")
val cnt = spark.sql(s"select * from $tableName").count()
assertResult(5)(cnt)

spark.sql(s"set hoodie.default.$tableName.datasource.query.type = incremental")
spark.sql(s"set hoodie.default.$tableName.datasource.read.begin.instanttime = $beginTime")
val endTime = commits(1)
spark.sql(s"set hoodie.default.$tableName.datasource.read.end.instanttime = $endTime")
spark.sql(s"refresh table $tableName")
checkAnswer(s"select id, name, price, ts, dt from $tableName")(
Seq(3, "a3", 30.0, 3000, "2022-11-26"),
Seq(4, "a4", 40.0, 4000, "2022-12-26")
)

spark.sql(s"set hoodie.default.$tableName.datasource.read.incr.path.glob = /dt=2022-11*/*")
spark.sql(s"refresh table $tableName")
checkAnswer(s"select id, name, price, ts, dt from $tableName")(
Seq(3, "a3", 30.0, 3000, "2022-11-26")
)

spark.conf.unset("hoodie.query.use.database")
spark.conf.unset(s"hoodie.$tableName.datasource.query.type")
spark.conf.unset(s"hoodie.default.$tableName.datasource.query.type")
}

test("Test snapshot query with set parameters") {
val tableName = generateTableName

spark.sql(
s"""
|create table $tableName (
| id int,
| name string,
| price double,
| ts long,
| dt string
|) using hudi
| partitioned by (dt)
| options (
| primaryKey = 'id',
| preCombineField = 'ts',
| type = 'cow'
| )
|""".stripMargin)

spark.sql(s"insert into $tableName values (1,'a1', 10, 1000, '2022-11-25')")
spark.sql(s"insert into $tableName values (2,'a2', 20, 2000, '2022-11-25')")
spark.sql(s"update $tableName set price = 22 where id = 2")

checkAnswer(s"select id, name, price, ts, dt from $tableName")(
Seq(1, "a1", 10.0, 1000, "2022-11-25"),
Seq(2, "a2", 22.0, 2000, "2022-11-25")
)

import spark.implicits._
val commits = spark.sql(s"call show_commits(table => '$tableName')").select("commit_time").map(k => k.getString(0)).take(10)
val beginTime = commits(commits.length - 2)

spark.sql(s"set $tableName.as.of.instant = $beginTime")
spark.sql(s"refresh table $tableName")
checkAnswer(s"select id, name, price, ts, dt from $tableName where id = 2")(
Seq(2, "a2", 20, 2000, "2022-11-25")
)
spark.sql(s"set hoodie.query.use.database = true")
spark.sql(s"refresh table $tableName")
checkAnswer(s"select id, name, price, ts, dt from $tableName where id = 2")(
Seq(2, "a2", 22, 2000, "2022-11-25")
)

spark.sql(s"set default.$tableName.as.of.instant = $beginTime")
spark.sql(s"refresh table $tableName")
checkAnswer(s"select id, name, price, ts, dt from $tableName where id = 2")(
Seq(2, "a2", 20, 2000, "2022-11-25")
)

spark.conf.unset("hoodie.query.use.database")
spark.conf.unset(s"hoodie.$tableName.datasource.query.type")
spark.conf.unset(s"hoodie.default.$tableName.datasource.query.type")
}

test("Test read_optimized query with set parameters") {
val tableName = generateTableName

spark.sql(
s"""
|create table $tableName (
| id int,
| name string,
| price double,
| ts long,
| dt string
|) using hudi
| partitioned by (dt)
| options (
| primaryKey = 'id',
| preCombineField = 'ts',
| type = 'mor'
| )
|""".stripMargin)

spark.sql(s"insert into $tableName values (1,'a1', 10, 1000, '2022-11-25')")
spark.sql(s"insert into $tableName values (2,'a2', 20, 2000, '2022-11-25')")
spark.sql(s"update $tableName set price = 22 where id = 2")

checkAnswer(s"select id, name, price, ts, dt from $tableName")(
Seq(1, "a1", 10.0, 1000, "2022-11-25"),
Seq(2, "a2", 22.0, 2000, "2022-11-25")
)

spark.sql(s"set hoodie.$tableName.datasource.query.type = read_optimized")
spark.sql(s"refresh table $tableName")
checkAnswer(s"select id, name, price, ts, dt from $tableName")(
Seq(1, "a1", 10.0, 1000, "2022-11-25"),
Seq(2, "a2", 20.0, 2000, "2022-11-25")
)

spark.sql(s"set hoodie.query.use.database = true")
spark.sql(s"refresh table $tableName")
checkAnswer(s"select id, name, price, ts, dt from $tableName")(
Seq(1, "a1", 10.0, 1000, "2022-11-25"),
Seq(2, "a2", 22.0, 2000, "2022-11-25")
)

spark.sql(s"set hoodie.default.$tableName.datasource.query.type = read_optimized")
spark.sql(s"refresh table $tableName")
checkAnswer(s"select id, name, price, ts, dt from $tableName")(
Seq(1, "a1", 10.0, 1000, "2022-11-25"),
Seq(2, "a2", 20.0, 2000, "2022-11-25")
)

spark.conf.unset("hoodie.query.use.database")
spark.conf.unset(s"hoodie.$tableName.datasource.query.type")
spark.conf.unset(s"hoodie.default.$tableName.datasource.query.type")
}
}