Merged
Changes from 23 commits
Commits (27)
316e84b
Make sure `TestCOWDataSource` properly invokes `HoodieSparkSessionExt…
Jun 2, 2022
71ffd84
Make sure `HoodieBaseRelation` always prefers `schemaSpec` over fetch…
Jun 2, 2022
1b2c6b2
Fixing `HoodieSpark3Analysis` rule to properly pass in schema from th…
Jun 2, 2022
20d1de9
Fixed missing conversion
Jun 2, 2022
82b3876
Added `LogicalPlan` caching for Hudi's relations
Jun 2, 2022
5725bd9
Properly override outputs of the cached `LogicalRelation`
Jun 3, 2022
1528d3e
Tidying up
Jun 3, 2022
1a14aeb
Make `HoodieCatalog` and `Spark3DefaultSource` return `V1Table` inste…
Jun 3, 2022
8549c42
Added `HoodieV1Table` extractor to match Hudi tables;
Jun 3, 2022
db7b14b
Fixing compilation
Jun 3, 2022
cba55db
Fix missing fixture of the `DateTimeZone.default`
Jun 3, 2022
81b1835
Reverting DSv2 APIs
Jun 3, 2022
21f8f01
Make Hudi resolve into either V1 or V2 relations based on whether Sch…
Jun 4, 2022
9fe849a
Restored resolution-rule to fallback from V2 Relation to V1
Jun 4, 2022
481fbf6
Rebased Schema Evolution commands resolution rule to use V2 table
Jun 4, 2022
0ad9182
Fixed schema evoluation rule injection seq
Jun 4, 2022
d5d10fb
Make sure V2 to V1 fallback is properly wired
Jun 4, 2022
191a003
Tidying up
Jun 4, 2022
3e6325d
Tidying up
Jun 4, 2022
1328ada
Fixed V2 to V1 fallback to also apply to Spark 3.1;
Jun 6, 2022
b20a466
Fixing compilation
Jun 6, 2022
f48b918
Reverting back invalid bifurcation b/w Spark 3.1 and 3.2
Jun 6, 2022
986d633
Make sure we're looking up configs appropriately
Jun 6, 2022
fb22256
Moved `SCHEMA_EVOLUTION_ENABLED` config to `HoodieCommonConfig`
Jun 6, 2022
b20712e
Fixed rules evaluation order
Jun 7, 2022
323dee0
Inline Alter Table commands resolving rules to control ordering of th…
Jun 7, 2022
477eaec
Avoid providing catalog schema in cases when Hive catalog is used
Jun 7, 2022
@@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi.util

/**
* Utility allowing for seamless conversion b/w Java/Scala functional primitives
*/
object JFunction {

def toScala[T, R](f: java.util.function.Function[T, R]): T => R =
(t: T) => f.apply(t)

def toJava[T](f: T => Unit): java.util.function.Consumer[T] =
new java.util.function.Consumer[T] {
override def accept(t: T): Unit = f.apply(t)
}

Contributor: nit: redundant empty line?

}
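For orientation, a small usage sketch of the new utility (the call sites below are illustrative, not taken from this PR):

import org.apache.hudi.util.JFunction

// Adapt a Java Function into a Scala function value
val javaFn: java.util.function.Function[String, String] = s => s.toUpperCase
val scalaFn: String => String = JFunction.toScala(javaFn)
scalaFn("hudi") // "HUDI"

// Adapt a Scala side-effecting function into a Java Consumer
val consumer: java.util.function.Consumer[String] = JFunction.toJava((s: String) => println(s))
consumer.accept("hudi")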
@@ -28,14 +28,13 @@ import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.parser.ParserInterface
import org.apache.spark.sql.catalyst.plans.JoinType
import org.apache.spark.sql.catalyst.plans.logical.{Join, LogicalPlan, SubqueryAlias}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.catalyst.{AliasIdentifier, TableIdentifier}
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
import org.apache.spark.sql.execution.datasources.{FilePartition, LogicalRelation, PartitionedFile, SparkParsePartitionUtil}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.DataType
import org.apache.spark.sql.{HoodieCatalystExpressionUtils, Row, SparkSession}
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat

import java.util.Locale

@@ -141,8 +140,8 @@ trait SparkAdapter extends Serializable {
maxSplitBytes: Long): Seq[FilePartition]

def isHoodieTable(table: LogicalPlan, spark: SparkSession): Boolean = {
tripAlias(table) match {
case LogicalRelation(_, _, Some(tbl), _) => isHoodieTable(tbl)
unfoldSubqueryAliases(table) match {
case LogicalRelation(_, _, Some(table), _) => isHoodieTable(table)
case relation: UnresolvedRelation =>
isHoodieTable(toTableIdentifier(relation), spark)
case _=> false
@@ -162,19 +161,19 @@
isHoodieTable(table)
}

def tripAlias(plan: LogicalPlan): LogicalPlan = {
protected def unfoldSubqueryAliases(plan: LogicalPlan): LogicalPlan = {
plan match {
case SubqueryAlias(_, relation: LogicalPlan) =>
tripAlias(relation)
unfoldSubqueryAliases(relation)
case other =>
other
}
}
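To illustrate what the unfolding buys (spark and sparkAdapter below are placeholder instances): an aliased reference to a Hudi table reaches this check wrapped in one or more SubqueryAlias nodes, which have to be stripped before the underlying relation can be matched.

// e.g. spark.table("hudi_tbl").as("t") analyzes to roughly:
//   SubqueryAlias(t, SubqueryAlias(hudi_tbl, LogicalRelation(_, _, Some(catalogTable), _)))
val plan = spark.table("hudi_tbl").as("t").queryExecution.analyzed
// unfoldSubqueryAliases peels the alias layers off; the CatalogTable is then inspected
val isHudi: Boolean = sparkAdapter.isHoodieTable(plan, spark)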

/**
* Create customresolutionRule to deal with alter command for hudi.
* Create custom Resolution Rule to deal with alter command for hudi.
*/
def createResolveHudiAlterTableCommand(sparkSession: SparkSession): Rule[LogicalPlan]
Contributor: Does this affect the ALTER TABLE command?

Contributor (Author): This has actually just been moved to HoodieAnalysis for tighter control of the rules ordering.

def createResolveHudiAlterTableCommand(): Option[SparkSession => Rule[LogicalPlan]] = None
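To illustrate the new extension point, a hedged sketch of how a version-specific adapter might supply the rule. The rule class name below is made up for illustration, and ReflectionUtils is assumed to be org.apache.hudi.common.util.ReflectionUtils (used the same way in HoodieAnalysis further down):

import org.apache.hudi.common.util.ReflectionUtils
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule

// SparkAdapter import omitted -- it lives in this trait's own package
trait ExampleSpark3Adapter extends SparkAdapter {
  override def createResolveHudiAlterTableCommand(): Option[SparkSession => Rule[LogicalPlan]] =
    // Load the Spark-3-only rule reflectively so this module stays version-agnostic
    Some(session => ReflectionUtils.loadClass(
      "org.apache.spark.sql.hudi.ResolveHudiAlterTableCommand", session).asInstanceOf[Rule[LogicalPlan]])
}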

/**
* Create instance of [[ParquetFileFormat]]
@@ -24,6 +24,7 @@
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hudi.HoodieConversionUtils;
import org.apache.hudi.avro.model.HoodieActionInstant;
import org.apache.hudi.avro.model.HoodieCleanMetadata;
import org.apache.hudi.avro.model.HoodieCleanerPlan;
@@ -73,12 +74,14 @@
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.WorkloadStat;
import org.apache.hudi.timeline.service.TimelineService;
import org.apache.hudi.util.JFunction;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.SparkSessionExtensions;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
@@ -98,6 +101,7 @@
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;

@@ -145,6 +149,10 @@ public static void tearDownAll() throws IOException {
FileSystem.closeAll();
}

protected Option<Consumer<SparkSessionExtensions>> getSparkSessionExtensionsInjector() {
return Option.empty();
}

@BeforeEach
public void setTestMethodName(TestInfo testInfo) {
if (testInfo.getTestMethod().isPresent()) {
@@ -186,16 +194,32 @@ public void cleanupResources() throws IOException {
* @param appName The specified application name.
*/
protected void initSparkContexts(String appName) {
Option<Consumer<SparkSessionExtensions>> sparkSessionExtensionsInjector =
getSparkSessionExtensionsInjector();

if (sparkSessionExtensionsInjector.isPresent()) {
// In case we need to inject extensions into the Spark session, we have
// to stop any session that might still be active, since Spark will try
// to re-use it
HoodieConversionUtils.toJavaOption(SparkSession.getActiveSession())
.ifPresent(SparkSession::stop);
}

// Initialize a local spark env
jsc = new JavaSparkContext(HoodieClientTestUtils.getSparkConfForTest(appName + "#" + testMethodName));
jsc.setLogLevel("ERROR");
hadoopConf = jsc.hadoopConfiguration();

// SQLContext stuff
sqlContext = new SQLContext(jsc);
hadoopConf = jsc.hadoopConfiguration();
context = new HoodieSparkEngineContext(jsc);
hadoopConf = context.getHadoopConf().get();
sparkSession = SparkSession.builder().config(jsc.getConf()).getOrCreate();

sparkSession = SparkSession.builder()
.withExtensions(JFunction.toScala(sparkSessionExtensions -> {
sparkSessionExtensionsInjector.ifPresent(injector -> injector.accept(sparkSessionExtensions));
return null;
}))
.config(jsc.getConf())
.getOrCreate();
sqlContext = new SQLContext(sparkSession);
}
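A hedged sketch of how a test could opt into the injection hook added above. The mixin below is hypothetical and written in Scala for brevity (package locations assumed); a concrete test would extend the Java harness shown in this diff and override the same method:

import java.util.function.Consumer
import org.apache.hudi.common.util.Option
import org.apache.hudi.util.JFunction
import org.apache.spark.sql.SparkSessionExtensions
import org.apache.spark.sql.hudi.HoodieSparkSessionExtension

trait InjectsHudiExtensions {
  // Mirrors the harness hook: returning a non-empty injector forces a fresh
  // SparkSession with HoodieSparkSessionExtension applied
  def getSparkSessionExtensionsInjector: Option[Consumer[SparkSessionExtensions]] =
    Option.of(JFunction.toJava[SparkSessionExtensions](extensions =>
      new HoodieSparkSessionExtension().apply(extensions)))
}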

/**
@@ -142,6 +142,9 @@ object DataSourceReadOptions {
.key("hoodie.datasource.read.incr.fallback.fulltablescan.enable")
.defaultValue("false")
.withDocumentation("When doing an incremental query whether we should fall back to full table scans if file does not exist.")

val SCHEMA_EVOLUTION_ENABLED: ConfigProperty[Boolean] = HoodieWriteConfig.SCHEMA_EVOLUTION_ENABLE
Member: We need to move HoodieWriteConfig.SCHEMA_EVOLUTION_ENABLE into a hudi-common config class so it's shared across writing and queries.

/** @deprecated Use {@link QUERY_TYPE} and its methods instead */
@Deprecated
val QUERY_TYPE_OPT_KEY = QUERY_TYPE.key()
@@ -34,7 +34,7 @@ import org.apache.hudi.common.table.view.HoodieTableFileSystemView
import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient, TableSchemaResolver}
import org.apache.hudi.common.util.StringUtils
import org.apache.hudi.common.util.ValidationUtils.checkState
import org.apache.hudi.internal.schema.InternalSchema
import org.apache.hudi.internal.schema.{HoodieSchemaException, InternalSchema}
import org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter
import org.apache.hudi.io.storage.HoodieHFileReader
import org.apache.spark.execution.datasources.HoodieInMemoryFileIndex
@@ -74,7 +74,7 @@ case class HoodieTableState(tablePath: String,
abstract class HoodieBaseRelation(val sqlContext: SQLContext,
val metaClient: HoodieTableMetaClient,
val optParams: Map[String, String],
userSchema: Option[StructType])
schemaSpec: Option[StructType])
extends BaseRelation
with FileRelation
with PrunedFilteredScan
@@ -128,24 +128,28 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
*/
protected lazy val (tableAvroSchema: Schema, internalSchema: InternalSchema) = {
val schemaResolver = new TableSchemaResolver(metaClient)
val avroSchema = Try(schemaResolver.getTableAvroSchema) match {
case Success(schema) => schema
case Failure(e) =>
logWarning("Failed to fetch schema from the table", e)
// If there is no commit in the table, we can't get the schema
// t/h [[TableSchemaResolver]], fallback to the provided [[userSchema]] instead.
userSchema match {
case Some(s) => convertToAvroSchema(s)
case _ => throw new IllegalArgumentException("User-provided schema is required in case the table is empty")
}
val avroSchema: Schema = schemaSpec.map(convertToAvroSchema).getOrElse {
Try(schemaResolver.getTableAvroSchema) match {
case Success(schema) => schema
case Failure(e) =>
logError("Failed to fetch schema from the table", e)
throw new HoodieSchemaException("Failed to fetch schema from the table")
}
}
// try to find internalSchema
val internalSchemaFromMeta = try {
schemaResolver.getTableInternalSchemaFromCommitMetadata.orElse(InternalSchema.getEmptyInternalSchema)
} catch {
case _: Exception => InternalSchema.getEmptyInternalSchema

val internalSchema: InternalSchema = if (!isSchemaEvolutionEnabled) {
InternalSchema.getEmptyInternalSchema
Contributor: Does this mean that the user needs to make sure the schema evolution related config they provide is consistent with what's in the table (e.g., if the table has an evolved schema while isSchemaEvolutionEnabled is derived as false, then the read result may be inconsistent)? Do we need to add docs and release notes for this change in expectations?

Contributor (Author): Correct. Users will now have to specify the Schema Evolution config for both writes and reads.

} else {
Try(schemaResolver.getTableInternalSchemaFromCommitMetadata) match {
case Success(internalSchemaOpt) =>
toScalaOption(internalSchemaOpt).getOrElse(InternalSchema.getEmptyInternalSchema)
case Failure(e) =>
logWarning("Failed to fetch internal-schema from the table", e)
InternalSchema.getEmptyInternalSchema
}
}
(avroSchema, internalSchemaFromMeta)

(avroSchema, internalSchema)
}

protected lazy val tableStructSchema: StructType = AvroConversionUtils.convertAvroSchemaToStructType(tableAvroSchema)
@@ -503,6 +507,15 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,

private def prunePartitionColumns(dataStructSchema: StructType): StructType =
StructType(dataStructSchema.filterNot(f => partitionColumns.contains(f.name)))

private def isSchemaEvolutionEnabled = {
// NOTE: Schema evolution could be configured both through the optional parameters as well as
// through the Spark Session configuration (for ex, for Spark SQL)
optParams.getOrElse(DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.key,
DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.defaultValue.toString).toBoolean ||
sparkSession.conf.get(DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.key,
DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.defaultValue.toString).toBoolean
}
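To make the lookup above concrete, a hedged read-side example (spark, df and basePath are placeholders):

import org.apache.hudi.DataSourceReadOptions

// An option passed on the read satisfies the first branch of the check above ...
val df = spark.read.format("hudi")
  .option(DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.key, "true")
  .load(basePath)

// ... while Spark SQL sessions can enable it session-wide instead
spark.conf.set(DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.key, "true")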
}

object HoodieBaseRelation extends SparkAdapterSupport {
@@ -33,17 +33,13 @@ class HoodieSparkSessionExtension extends (SparkSessionExtensions => Unit)
new HoodieCommonSqlParser(session, parser)
}

HoodieAnalysis.customResolutionRules().foreach { rule =>
HoodieAnalysis.customResolutionRules.foreach { ruleBuilder =>
extensions.injectResolutionRule { session =>
rule(session)
ruleBuilder(session)
}
}

extensions.injectResolutionRule { session =>
sparkAdapter.createResolveHudiAlterTableCommand(session)
}

HoodieAnalysis.customPostHocResolutionRules().foreach { rule =>
HoodieAnalysis.customPostHocResolutionRules.foreach { rule =>
extensions.injectPostHocResolutionRule { session =>
rule(session)
}
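For completeness, a minimal sketch of turning the extension on for a session, so that the parser and the resolution / post-hoc rules collected in HoodieAnalysis get injected (the local-mode settings are illustrative):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("hudi-extensions")
  .master("local[2]")
  // Registers HoodieSparkSessionExtension, which injects the rules above
  .config("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension")
  .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .getOrCreate()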
@@ -39,45 +39,58 @@ import org.apache.spark.sql.{AnalysisException, SparkSession}

import java.util
import scala.collection.JavaConverters._
import scala.collection.mutable.ListBuffer

object HoodieAnalysis {
def customResolutionRules(): Seq[SparkSession => Rule[LogicalPlan]] =
Seq(
type RuleBuilder = SparkSession => Rule[LogicalPlan]

def customResolutionRules: Seq[RuleBuilder] = {
val rules: ListBuffer[RuleBuilder] = ListBuffer(
// Default rules
session => HoodieResolveReferences(session),
session => HoodieAnalysis(session)
) ++ extraResolutionRules()
)

def customPostHocResolutionRules(): Seq[SparkSession => Rule[LogicalPlan]] =
Seq(
session => HoodiePostAnalysisRule(session)
) ++ extraPostHocResolutionRules()
rules ++= sparkAdapter.createResolveHudiAlterTableCommand().toSeq

def extraResolutionRules(): Seq[SparkSession => Rule[LogicalPlan]] = {
if (HoodieSparkUtils.gteqSpark3_2) {
val spark3AnalysisClass = "org.apache.spark.sql.hudi.analysis.HoodieSpark3Analysis"
val spark3Analysis: SparkSession => Rule[LogicalPlan] =
val spark3Analysis: RuleBuilder =
session => ReflectionUtils.loadClass(spark3AnalysisClass, session).asInstanceOf[Rule[LogicalPlan]]

val spark3ResolveReferences = "org.apache.spark.sql.hudi.analysis.HoodieSpark3ResolveReferences"
val spark3References: SparkSession => Rule[LogicalPlan] =
session => ReflectionUtils.loadClass(spark3ResolveReferences, session).asInstanceOf[Rule[LogicalPlan]]
val spark3ResolveReferencesClass = "org.apache.spark.sql.hudi.analysis.HoodieSpark3ResolveReferences"
val spark3ResolveReferences: RuleBuilder =
session => ReflectionUtils.loadClass(spark3ResolveReferencesClass, session).asInstanceOf[Rule[LogicalPlan]]

Seq(spark3Analysis, spark3References)
} else {
Seq.empty
rules ++= Seq(spark3Analysis, spark3ResolveReferences)

val dataSourceV2ToV1FallbackClass = "org.apache.spark.sql.hudi.analysis.HoodieDataSourceV2ToV1Fallback"
val dataSourceV2ToV1Fallback: RuleBuilder =
session => ReflectionUtils.loadClass(dataSourceV2ToV1FallbackClass, session).asInstanceOf[Rule[LogicalPlan]]

rules += dataSourceV2ToV1Fallback
}

rules
}

def extraPostHocResolutionRules(): Seq[SparkSession => Rule[LogicalPlan]] =
def customPostHocResolutionRules: Seq[RuleBuilder] = {
val rules: ListBuffer[RuleBuilder] = ListBuffer(
// Default rules
session => HoodiePostAnalysisRule(session)
)

if (HoodieSparkUtils.gteqSpark3_2) {
val spark3PostHocResolutionClass = "org.apache.spark.sql.hudi.analysis.HoodieSpark3PostAnalysisRule"
val spark3PostHocResolution: SparkSession => Rule[LogicalPlan] =
val spark3PostHocResolution: RuleBuilder =
session => ReflectionUtils.loadClass(spark3PostHocResolutionClass, session).asInstanceOf[Rule[LogicalPlan]]

Seq(spark3PostHocResolution)
} else {
Seq.empty
rules += spark3PostHocResolution
}

rules
}

}

/**