@@ -27,7 +27,9 @@ import org.apache.spark.sql.hudi.SparkAdapter
 trait SparkAdapterSupport {

   lazy val sparkAdapter: SparkAdapter = {
-    val adapterClass = if (HoodieSparkUtils.isSpark3) {
+    val adapterClass = if (HoodieSparkUtils.gteqSpark3_2) {
+      "org.apache.spark.sql.adapter.Spark3_2Adapter"
+    } else if (HoodieSparkUtils.isSpark3_0 || HoodieSparkUtils.isSpark3_1) {
       "org.apache.spark.sql.adapter.Spark3Adapter"
     } else {
       "org.apache.spark.sql.adapter.Spark2Adapter"
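
The hunk above only chooses a fully qualified class name per Spark version. As a minimal sketch (the actual instantiation code is outside this diff), such a name could be turned into a `SparkAdapter` instance via reflection:

```scala
// Hypothetical helper: reflectively instantiate the version-specific adapter.
// The diff only shows how the class name is selected, not how it is loaded.
def loadAdapter(adapterClass: String): SparkAdapter =
  Class.forName(adapterClass)
    .getDeclaredConstructor()
    .newInstance()
    .asInstanceOf[SparkAdapter]
```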

@@ -85,6 +85,16 @@ trait SparkAdapter extends Serializable {
  def getInsertIntoChildren(plan: LogicalPlan):
    Option[(LogicalPlan, Map[String, Option[String]], LogicalPlan, Boolean, Boolean)]

  /**
   * Returns true if the logical plan is a TimeTravelRelation LogicalPlan.
   */
  def isRelationTimeTravel(plan: LogicalPlan): Boolean

  /**
   * Get the members of the TimeTravelRelation LogicalPlan.
   */
  def getRelationTimeTravel(plan: LogicalPlan): Option[(LogicalPlan, Option[Expression], Option[String])]

  /**
   * Create an Insert Into LogicalPlan.
   */

hudi-spark-datasource/README.md (18 additions, 0 deletions)
@@ -36,3 +36,21 @@ file that supports spark sql on spark 2.x version.
has no class since hudi only supports spark 2.4.4 version, and it acts as the placeholder when packaging hudi-spark-bundle module.
* hudi-spark3-common is the module that contains the code that would be reused between spark3.x versions.
* hudi-spark-common is the module that contains the code that would be reused between spark2.x and spark3.x versions.

## Description of Time Travel
* `HoodieSpark3_2ExtendedSqlAstBuilder` is forked from Spark 3.2's `org.apache.spark.sql.catalyst.parser.AstBuilder`, with comments marking the copied code and an additional `withTimeTravel` method.
* `SqlBase.g4` is forked from Spark 3.2's parser grammar, with comments marking the copied code, and adds the Spark SQL syntax `TIMESTAMP AS OF` and `VERSION AS OF`.

### Spark versions supporting Time Travel

| Version | Supported |
| ------- | --------- |
| 2.4.x | No |
| 3.0.x | No |
| 3.1.2 | No |
| 3.2.0 | Yes |
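
As an illustration (not part of this diff), a minimal sketch of querying a Hudi table as of an earlier commit instant; the session settings, table name, and instant value are assumptions:

```scala
import org.apache.spark.sql.SparkSession

object TimeTravelExample extends App {
  // Assumes a Spark 3.2 runtime with the Hudi Spark bundle on the classpath,
  // since the table above shows 3.2.0 as the only supported version.
  val spark = SparkSession.builder()
    .master("local[2]")
    .appName("hudi-time-travel")
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .config("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension")
    .getOrCreate()

  // `h0` is a hypothetical Hudi table; the instant is a commit timestamp
  // taken from the table's timeline.
  spark.sql("select id, name, price, ts from h0 TIMESTAMP AS OF '20220214091215'").show()
}
```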

### About upgrading Time Travel
Spark 3.3 supports the time travel syntax natively; see [SPARK-37219](https://issues.apache.org/jira/browse/SPARK-37219).
Once Spark 3.3 is released, the following hudi-spark3 files will be removed:
* `HoodieSpark3_2ExtendedSqlAstBuilder.scala`, `HoodieSpark3_2ExtendedSqlParser.scala`, `TimeTravelRelation.scala`, `SqlBase.g4`, `HoodieSqlBase.g4`

@@ -20,14 +20,15 @@ package org.apache.spark.sql.hudi.analysis
 import org.apache.hudi.DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL
 import org.apache.hudi.common.model.HoodieRecord
 import org.apache.hudi.common.util.ReflectionUtils
-import org.apache.hudi.{HoodieSparkUtils, SparkAdapterSupport}
-import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedStar}
+import org.apache.hudi.{DataSourceReadOptions, HoodieSparkUtils, SparkAdapterSupport}
+import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedRelation, UnresolvedStar}
 import org.apache.spark.sql.catalyst.catalog.{CatalogUtils, HoodieCatalogTable}
 import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference, Expression, GenericInternalRow, Literal, NamedExpression}
 import org.apache.spark.sql.catalyst.plans.Inner
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.execution.command._
-import org.apache.spark.sql.execution.datasources.{CreateTable, LogicalRelation}
+import org.apache.spark.sql.execution.datasources.{CreateTable, DataSource, LogicalRelation}
 import org.apache.spark.sql.hudi.HoodieSqlCommonUtils.{getTableIdentifier, removeMetaFields}
 import org.apache.spark.sql.hudi.HoodieSqlUtils._
 import org.apache.spark.sql.hudi.command._

@@ -113,6 +114,7 @@ case class HoodieAnalysis(sparkSession: SparkSession) extends Rule[LogicalPlan]
        case _ =>
          l
      }

      // Convert to CreateHoodieTableAsSelectCommand
      case CreateTable(table, mode, Some(query))
        if query.resolved && sparkAdapter.isHoodieTable(table) =>

@@ -396,6 +398,37 @@ case class HoodieResolveReferences(sparkSession: SparkSession) extends Rule[LogicalPlan]
          l
        }

      case l if sparkAdapter.isRelationTimeTravel(l) =>
        val (plan: UnresolvedRelation, timestamp, version) =
          sparkAdapter.getRelationTimeTravel(l).get

        if (timestamp.isEmpty && version.nonEmpty) {
          throw new AnalysisException(
            "version expression is not supported for time travel")
        }

        val tableIdentifier = sparkAdapter.toTableIdentifier(plan)
        if (sparkAdapter.isHoodieTable(tableIdentifier, sparkSession)) {
          val hoodieCatalogTable = HoodieCatalogTable(sparkSession, tableIdentifier)
          val table = hoodieCatalogTable.table
          val pathOption = table.storage.locationUri.map("path" -> CatalogUtils.URIToString(_))
          val instantOption = Map(
            DataSourceReadOptions.TIME_TRAVEL_AS_OF_INSTANT.key -> timestamp.get.toString())
          val dataSource =
            DataSource(
              sparkSession,
              userSpecifiedSchema = if (table.schema.isEmpty) None else Some(table.schema),
              partitionColumns = table.partitionColumnNames,
              bucketSpec = table.bucketSpec,
              className = table.provider.get,
              options = table.storage.properties ++ pathOption ++ instantOption,
              catalogTable = Some(table))

          LogicalRelation(dataSource.resolveRelation(checkFilesExist = false), table)
        } else {
          l
        }
      case p => p
    }
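
The rule above rewrites a `TIMESTAMP AS OF` relation into a plain `LogicalRelation` whose read options carry the requested instant. A rough DataFrame-level equivalent, assuming `TIME_TRAVEL_AS_OF_INSTANT` resolves to the option key `as.of.instant` (table path and instant value are hypothetical):

```scala
// What the resolved plan effectively reads, expressed via the DataFrame API.
val df = spark.read.format("hudi")
  .option("as.of.instant", "20220214091215") // key assumed from DataSourceReadOptions
  .load("/tmp/hudi/h0")
```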

@@ -0,0 +1,241 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.hudi

import org.apache.hudi.HoodieSparkUtils
import org.apache.hudi.common.table.HoodieTableMetaClient

class TestTimeTravelTable extends TestHoodieSqlBase {
  test("Test Insert and Update Record with time travel") {
    if (HoodieSparkUtils.gteqSpark3_2) {
      withTempDir { tmp =>
        val tableName1 = generateTableName
        spark.sql(
          s"""
             |create table $tableName1 (
             | id int,
             | name string,
             | price double,
             | ts long
             |) using hudi
             | tblproperties (
             | type = 'cow',
             | primaryKey = 'id',
             | preCombineField = 'ts'
             | )
             | location '${tmp.getCanonicalPath}/$tableName1'
       """.stripMargin)

        spark.sql(s"insert into $tableName1 values(1, 'a1', 10, 1000)")

        val metaClient1 = HoodieTableMetaClient.builder()
          .setBasePath(s"${tmp.getCanonicalPath}/$tableName1")
          .setConf(spark.sessionState.newHadoopConf())
          .build()
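        // metaClient1 exposes the table's timeline; the last completed commit
        // fetched below is the instant written by the first insert, and is the
        // point the time-travel query reads back to.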

        val instant1 = metaClient1.getActiveTimeline.getAllCommitsTimeline
          .lastInstant().get().getTimestamp

        spark.sql(s"insert into $tableName1 values(1, 'a2', 20, 2000)")

        checkAnswer(s"select id, name, price, ts from $tableName1")(
          Seq(1, "a2", 20.0, 2000)
        )

        // time travel from instant1
        checkAnswer(
          s"select id, name, price, ts from $tableName1 TIMESTAMP AS OF '$instant1'")(
          Seq(1, "a1", 10.0, 1000)
        )
      }
    }
  }

test("Test Insert Into Records with time travel To new Table") {
if (HoodieSparkUtils.gteqSpark3_2) {
withTempDir { tmp =>
// Create Non-Partitioned table
val tableName1 = generateTableName
spark.sql(
s"""
|create table $tableName1 (
| id int,
| name string,
| price double,
| ts long
|) using hudi
| tblproperties (
| type = 'cow',
| primaryKey = 'id',
| preCombineField = 'ts'
| )
| location '${tmp.getCanonicalPath}/$tableName1'
""".stripMargin)

spark.sql(s"insert into $tableName1 values(1, 'a1', 10, 1000)")

val metaClient1 = HoodieTableMetaClient.builder()
.setBasePath(s"${tmp.getCanonicalPath}/$tableName1")
.setConf(spark.sessionState.newHadoopConf())
.build()

val instant1 = metaClient1.getActiveTimeline.getAllCommitsTimeline
.lastInstant().get().getTimestamp


val tableName2 = generateTableName
// Create a partitioned table
spark.sql(
s"""
|create table $tableName2 (
| id int,
| name string,
| price double,
| ts long,
| dt string
|) using hudi
| tblproperties (primaryKey = 'id')
| partitioned by (dt)
| location '${tmp.getCanonicalPath}/$tableName2'
""".stripMargin)

// Insert into dynamic partition
spark.sql(
s"""
| insert into $tableName2
| select id, name, price, ts, '2022-02-14' as dt
| from $tableName1 TIMESTAMP AS OF '$instant1'
""".stripMargin)
checkAnswer(s"select id, name, price, ts, dt from $tableName2")(
Seq(1, "a1", 10.0, 1000, "2022-02-14")
)

// Insert into static partition
spark.sql(
s"""
| insert into $tableName2 partition(dt = '2022-02-15')
| select 2 as id, 'a2' as name, price, ts
| from $tableName1 TIMESTAMP AS OF '$instant1'
""".stripMargin)
checkAnswer(
s"select id, name, price, ts, dt from $tableName2")(
Seq(1, "a1", 10.0, 1000, "2022-02-14"),
Seq(2, "a2", 10.0, 1000, "2022-02-15")
)
}
}
}

test("Test Two Table's Union Join with time travel") {
if (HoodieSparkUtils.gteqSpark3_2) {
withTempDir { tmp =>
Seq("cow", "mor").foreach { tableType =>
val tableName = generateTableName

val basePath = tmp.getCanonicalPath
val tableName1 = tableName + "_1"
val tableName2 = tableName + "_2"
val path1 = s"$basePath/$tableName1"
val path2 = s"$basePath/$tableName2"

spark.sql(
s"""
|create table $tableName1 (
| id int,
| name string,
| price double,
| ts long
|) using hudi
| tblproperties (
| type = '$tableType',
| primaryKey = 'id',
| preCombineField = 'ts'
| )
| location '$path1'
""".stripMargin)

spark.sql(
s"""
|create table $tableName2 (
| id int,
| name string,
| price double,
| ts long
|) using hudi
| tblproperties (
| type = '$tableType',
| primaryKey = 'id',
| preCombineField = 'ts'
| )
| location '$path2'
""".stripMargin)

spark.sql(s"insert into $tableName1 values(1, 'a1', 10, 1000)")
spark.sql(s"insert into $tableName1 values(2, 'a2', 20, 1000)")

checkAnswer(s"select id, name, price, ts from $tableName1")(
Seq(1, "a1", 10.0, 1000),
Seq(2, "a2", 20.0, 1000)
)

checkAnswer(s"select id, name, price, ts from $tableName1")(
Seq(1, "a1", 10.0, 1000),
Seq(2, "a2", 20.0, 1000)
)

spark.sql(s"insert into $tableName2 values(3, 'a3', 10, 1000)")
spark.sql(s"insert into $tableName2 values(4, 'a4', 20, 1000)")

checkAnswer(s"select id, name, price, ts from $tableName2")(
Seq(3, "a3", 10.0, 1000),
Seq(4, "a4", 20.0, 1000)
)

val metaClient1 = HoodieTableMetaClient.builder()
.setBasePath(path1)
.setConf(spark.sessionState.newHadoopConf())
.build()

val metaClient2 = HoodieTableMetaClient.builder()
.setBasePath(path2)
.setConf(spark.sessionState.newHadoopConf())
.build()

val instant1 = metaClient1.getActiveTimeline.getAllCommitsTimeline
.lastInstant().get().getTimestamp

val instant2 = metaClient2.getActiveTimeline.getAllCommitsTimeline
.lastInstant().get().getTimestamp

val sql =
s"""
|select id, name, price, ts from $tableName1 TIMESTAMP AS OF '$instant1' where id=1
|union
|select id, name, price, ts from $tableName2 TIMESTAMP AS OF '$instant2' where id>1
|""".stripMargin

checkAnswer(sql)(
Seq(1, "a1", 10.0, 1000),
Seq(3, "a3", 10.0, 1000),
Seq(4, "a4", 20.0, 1000)
)
}
}
}
}
}

@@ -87,6 +87,7 @@ assignmentList
assignment
    : key=qualifiedName EQ value=expression
    ;

qualifiedNameList
    : qualifiedName (',' qualifiedName)*
    ;

@@ -137,4 +137,18 @@ class Spark2Adapter extends SparkAdapter {
    closePartition()
    partitions.toSeq
  }

  /**
   * Returns true if the logical plan is a TimeTravelRelation LogicalPlan.
   */
  override def isRelationTimeTravel(plan: LogicalPlan): Boolean = {
    false
  }

  /**
   * Get the members of the TimeTravelRelation LogicalPlan.
   */
  override def getRelationTimeTravel(plan: LogicalPlan): Option[(LogicalPlan, Option[Expression], Option[String])] = {
    throw new IllegalStateException("Should not call getRelationTimeTravel for Spark 2")
  }
}
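
For contrast, the Spark 3.2 side (not shown in this diff) presumably pattern-matches on the `TimeTravelRelation` node produced by the extended parser. A hedged sketch, assuming `TimeTravelRelation` carries `table`, `timestamp`, and `version` members (the field names are assumptions, not copied from the PR):

```scala
// Hypothetical Spark3_2Adapter overrides; TimeTravelRelation's exact shape is
// inferred from the README's description of TimeTravelRelation.scala.
override def isRelationTimeTravel(plan: LogicalPlan): Boolean =
  plan.isInstanceOf[TimeTravelRelation]

override def getRelationTimeTravel(plan: LogicalPlan): Option[(LogicalPlan, Option[Expression], Option[String])] =
  plan match {
    case t: TimeTravelRelation => Some((t.table, t.timestamp, t.version))
    case _ => None
  }
```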