Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,29 @@ public enum TimestampType implements Serializable {

protected final boolean encodePartitionPath;

/**
* Supported configs.
*/
public static class Config {

// One value from TimestampType above
public static final String TIMESTAMP_TYPE_FIELD_PROP = "hoodie.deltastreamer.keygen.timebased.timestamp.type";
public static final String INPUT_TIME_UNIT =
"hoodie.deltastreamer.keygen.timebased.timestamp.scalar.time.unit";
//This prop can now accept list of input date formats.
public static final String TIMESTAMP_INPUT_DATE_FORMAT_PROP =
"hoodie.deltastreamer.keygen.timebased.input.dateformat";
public static final String TIMESTAMP_INPUT_DATE_FORMAT_LIST_DELIMITER_REGEX_PROP = "hoodie.deltastreamer.keygen.timebased.input.dateformat.list.delimiter.regex";
public static final String TIMESTAMP_INPUT_TIMEZONE_FORMAT_PROP = "hoodie.deltastreamer.keygen.timebased.input.timezone";
public static final String TIMESTAMP_OUTPUT_DATE_FORMAT_PROP =
"hoodie.deltastreamer.keygen.timebased.output.dateformat";
//still keeping this prop for backward compatibility so that functionality for existing users does not break.
public static final String TIMESTAMP_TIMEZONE_FORMAT_PROP =
"hoodie.deltastreamer.keygen.timebased.timezone";
public static final String TIMESTAMP_OUTPUT_TIMEZONE_FORMAT_PROP = "hoodie.deltastreamer.keygen.timebased.output.timezone";
static final String DATE_TIME_PARSER_PROP = "hoodie.deltastreamer.keygen.datetime.parser.class";
}

public TimestampBasedAvroKeyGenerator(TypedProperties config) throws IOException {
this(config, config.getString(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key()),
config.getString(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,11 @@ import org.apache.spark.sql.hudi.SparkAdapter
trait SparkAdapterSupport {

lazy val sparkAdapter: SparkAdapter = {
val adapterClass = if (HoodieSparkUtils.isSpark3) {
val adapterClass = if (HoodieSparkUtils.isSpark3_1) {
"org.apache.spark.sql.adapter.Spark3_1Adapter"
} else if (HoodieSparkUtils.isSpark3_2) {
"org.apache.spark.sql.adapter.Spark3_2Adapter"
} else if (HoodieSparkUtils.isSpark3) {
"org.apache.spark.sql.adapter.Spark3Adapter"
} else {
"org.apache.spark.sql.adapter.Spark2Adapter"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

package org.apache.spark.sql.hudi

import org.apache.hudi.HoodieSparkUtils.sparkAdapter
import org.apache.hudi.client.utils.SparkRowSerDe
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
import org.apache.spark.sql.catalyst.catalog.CatalogTable
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,19 @@

package org.apache.spark.sql.hudi.analysis

import org.apache.hudi.DataSourceReadOptions
import org.apache.hudi.DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL
import org.apache.hudi.common.model.HoodieRecord
import org.apache.hudi.common.util.ReflectionUtils
import org.apache.hudi.{HoodieSparkUtils, SparkAdapterSupport}
import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedStar}
import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedRelation, UnresolvedStar}
import org.apache.spark.sql.catalyst.catalog.{CatalogUtils, HoodieCatalogTable}
import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference, Expression, Literal, NamedExpression}
import org.apache.spark.sql.catalyst.plans.Inner
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.command._
import org.apache.spark.sql.execution.datasources.{CreateTable, LogicalRelation}
import org.apache.spark.sql.execution.datasources.{CreateTable, DataSource, LogicalRelation}
import org.apache.spark.sql.hudi.HoodieSqlCommonUtils.{getTableIdentifier, removeMetaFields}
import org.apache.spark.sql.hudi.HoodieSqlUtils._
import org.apache.spark.sql.hudi.command._
Expand Down Expand Up @@ -110,6 +112,7 @@ case class HoodieAnalysis(sparkSession: SparkSession) extends Rule[LogicalPlan]
case _ =>
l
}

// Convert to CreateHoodieTableAsSelectCommand
case CreateTable(table, mode, Some(query))
if query.resolved && sparkAdapter.isHoodieTable(table) =>
Expand All @@ -133,7 +136,7 @@ case class HoodieAnalysis(sparkSession: SparkSession) extends Rule[LogicalPlan]
// Convert to CompactionShowHoodiePathCommand
case CompactionShowOnPath(path, limit) =>
CompactionShowHoodiePathCommand(path, limit)
case _=> plan
case _ => plan
}
}
}
Expand Down Expand Up @@ -350,6 +353,35 @@ case class HoodieResolveReferences(sparkSession: SparkSession) extends Rule[Logi
l
}

case TimeTravelRelation(plan: UnresolvedRelation, timestamp, version) =>
// TODO: How to use version to perform time travel?
if (timestamp.isEmpty && version.nonEmpty) {
throw new AnalysisException(
"version expression is not support for time travel")
}

val tableIdentifier = sparkAdapter.toTableIdentifier(plan)
if (sparkAdapter.isHoodieTable(tableIdentifier, sparkSession)) {
val hoodieCatalogTable = HoodieCatalogTable(sparkSession, tableIdentifier)
val table = hoodieCatalogTable.table
val pathOption = table.storage.locationUri.map("path" -> CatalogUtils.URIToString(_))
val instantOption = Map(
DataSourceReadOptions.TIME_TRAVEL_AS_OF_INSTANT.key -> timestamp.get.toString())
val dataSource =
DataSource(
sparkSession,
userSpecifiedSchema = if (table.schema.isEmpty) None else Some(table.schema),
partitionColumns = table.partitionColumnNames,
bucketSpec = table.bucketSpec,
className = table.provider.get,
options = table.storage.properties ++ pathOption ++ instantOption,
catalogTable = Some(table))

LogicalRelation(dataSource.resolveRelation(checkFilesExist = false), table)
} else {
plan
}

case p => p
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.hudi

import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.{UnresolvedRelation, UnresolvedStar}
import org.apache.spark.sql.catalyst.expressions.Literal
import org.apache.spark.sql.catalyst.plans.logical.{Project, TimeTravelRelation}

class TestTimeTravelParser extends TestHoodieSqlBase {
private val parser = spark.sessionState.sqlParser

test("time travel of timestamp") {
val timeTravelPlan1 = parser.parsePlan("SELECT * FROM A.B " +
"TIMESTAMP AS OF '2019-01-29 00:37:58'")

assertResult(Project(Seq(UnresolvedStar(None)),
TimeTravelRelation(
UnresolvedRelation(new TableIdentifier("B", Option.apply("A"))),
Some(Literal("2019-01-29 00:37:58")),
None))) {
timeTravelPlan1
}

val timeTravelPlan2 = parser.parsePlan("SELECT * FROM A.B " +
"TIMESTAMP AS OF 1643119574")

assertResult(Project(Seq(UnresolvedStar(None)),
TimeTravelRelation(
UnresolvedRelation(new TableIdentifier("B", Option.apply("A"))),
Some(Literal(1643119574)),
None))) {
timeTravelPlan2
}
}

test("time travel of version") {
val timeTravelPlan1 = parser.parsePlan("SELECT * FROM A.B " +
"VERSION AS OF 'Snapshot123456789'")

assertResult(Project(Seq(UnresolvedStar(None)),
TimeTravelRelation(
UnresolvedRelation(new TableIdentifier("B", Option.apply("A"))),
None,
Some("Snapshot123456789")))) {
timeTravelPlan1
}

val timeTravelPlan2 = parser.parsePlan("SELECT * FROM A.B " +
"VERSION AS OF 'Snapshot01'")

assertResult(Project(Seq(UnresolvedStar(None)),
TimeTravelRelation(
UnresolvedRelation(new TableIdentifier("B", Option.apply("A"))),
None,
Some("Snapshot01")))) {
timeTravelPlan2
}
}
}
Loading