[HUDI-4178] Addressing performance regressions in Spark DataSourceV2 Integration #5737
Changes from all commits
**`JFunction.scala`** (new file, `@@ -0,0 +1,33 @@`)

```scala
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hudi.util

/**
 * Utility allowing for seamless conversion b/w Java/Scala functional primitives
 */
object JFunction {

  def toScala[T, R](f: java.util.function.Function[T, R]): T => R =
    (t: T) => f.apply(t)

  def toJava[T](f: T => Unit): java.util.function.Consumer[T] =
    new java.util.function.Consumer[T] {
      override def accept(t: T): Unit = f.apply(t)
    }
}
```
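A quick, hedged sketch of how such a shim is typically used, assuming the `JFunction` object above is on the classpath (the surrounding names `JFunctionExample`, `javaFn`, and `consumer` are illustrative only, not part of this PR):

```scala
import java.util.function.{Consumer, Function => JavaFunction}
import org.apache.hudi.util.JFunction

object JFunctionExample extends App {
  // A Java-style Function, as commonly handed out by Java APIs
  val javaFn: JavaFunction[Int, String] = new JavaFunction[Int, String] {
    override def apply(i: Int): String = s"value=$i"
  }

  // Adapt it into a Scala function so it composes with Scala collections
  val scalaFn: Int => String = JFunction.toScala(javaFn)
  println(Seq(1, 2, 3).map(scalaFn)) // List(value=1, value=2, value=3)

  // And the reverse: expose a Scala closure as a java.util.function.Consumer
  val consumer: Consumer[String] = JFunction.toJava((s: String) => println(s))
  consumer.accept("hello")
}
```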
**`SparkAdapter.scala`**

```diff
@@ -28,14 +28,13 @@ import org.apache.spark.sql.catalyst.expressions.Expression
 import org.apache.spark.sql.catalyst.parser.ParserInterface
 import org.apache.spark.sql.catalyst.plans.JoinType
 import org.apache.spark.sql.catalyst.plans.logical.{Join, LogicalPlan, SubqueryAlias}
+import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.catalyst.{AliasIdentifier, TableIdentifier}
+import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
 import org.apache.spark.sql.execution.datasources.{FilePartition, LogicalRelation, PartitionedFile, SparkParsePartitionUtil}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.DataType
-import org.apache.spark.sql.{HoodieCatalystExpressionUtils, Row, SparkSession}
+import org.apache.spark.sql.{Row, SparkSession}
-import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat

 import java.util.Locale
@@ -141,8 +140,8 @@ trait SparkAdapter extends Serializable {
                     maxSplitBytes: Long): Seq[FilePartition]

   def isHoodieTable(table: LogicalPlan, spark: SparkSession): Boolean = {
-    tripAlias(table) match {
-      case LogicalRelation(_, _, Some(tbl), _) => isHoodieTable(tbl)
+    unfoldSubqueryAliases(table) match {
+      case LogicalRelation(_, _, Some(table), _) => isHoodieTable(table)
       case relation: UnresolvedRelation =>
         isHoodieTable(toTableIdentifier(relation), spark)
       case _ => false
@@ -162,20 +161,15 @@ trait SparkAdapter extends Serializable {
     isHoodieTable(table)
   }

-  def tripAlias(plan: LogicalPlan): LogicalPlan = {
+  protected def unfoldSubqueryAliases(plan: LogicalPlan): LogicalPlan = {
     plan match {
       case SubqueryAlias(_, relation: LogicalPlan) =>
-        tripAlias(relation)
+        unfoldSubqueryAliases(relation)
       case other =>
         other
     }
   }

-  /**
-   * Create custom resolution Rule to deal with alter command for hudi.
-   */
-  def createResolveHudiAlterTableCommand(sparkSession: SparkSession): Rule[LogicalPlan]
-
   /**
    * Create instance of [[ParquetFileFormat]]
    */
```

**Review thread** (on the removed `createResolveHudiAlterTableCommand`):

**Contributor:** Does this affect …

**Contributor (Author):** This has actually just been moved to …
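For readers less familiar with Catalyst, the rename from `tripAlias` to `unfoldSubqueryAliases` is purely descriptive: the method recursively strips nested `SubqueryAlias` wrappers off a logical plan. A toy sketch of the same recursion, using hypothetical stand-in case classes rather than Catalyst's real plan nodes:

```scala
// Stand-ins for Catalyst plan nodes, defined purely for this illustration
sealed trait Plan
case class Alias(name: String, child: Plan) extends Plan // plays the role of SubqueryAlias
case class Relation(table: String) extends Plan          // plays the role of LogicalRelation

// Recursively peel nested aliases until a non-alias node is reached,
// mirroring the shape of unfoldSubqueryAliases above
def unfoldAliases(plan: Plan): Plan = plan match {
  case Alias(_, child) => unfoldAliases(child)
  case other           => other
}

// A doubly-aliased relation unfolds to the underlying relation
assert(unfoldAliases(Alias("a", Alias("b", Relation("hudi_tbl")))) == Relation("hudi_tbl"))
```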
**`HoodieBaseRelation.scala`**

```diff
@@ -34,7 +34,7 @@ import org.apache.hudi.common.table.view.HoodieTableFileSystemView
 import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient, TableSchemaResolver}
 import org.apache.hudi.common.util.StringUtils
 import org.apache.hudi.common.util.ValidationUtils.checkState
-import org.apache.hudi.internal.schema.InternalSchema
+import org.apache.hudi.internal.schema.{HoodieSchemaException, InternalSchema}
 import org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter
 import org.apache.hudi.io.storage.HoodieHFileReader
 import org.apache.spark.execution.datasources.HoodieInMemoryFileIndex
@@ -74,7 +74,7 @@ case class HoodieTableState(tablePath: String,
 abstract class HoodieBaseRelation(val sqlContext: SQLContext,
                                   val metaClient: HoodieTableMetaClient,
                                   val optParams: Map[String, String],
-                                  userSchema: Option[StructType])
+                                  schemaSpec: Option[StructType])
   extends BaseRelation
     with FileRelation
     with PrunedFilteredScan
@@ -128,24 +128,28 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
    */
   protected lazy val (tableAvroSchema: Schema, internalSchema: InternalSchema) = {
     val schemaResolver = new TableSchemaResolver(metaClient)
-    val avroSchema = Try(schemaResolver.getTableAvroSchema) match {
-      case Success(schema) => schema
-      case Failure(e) =>
-        logWarning("Failed to fetch schema from the table", e)
-        // If there is no commit in the table, we can't get the schema
-        // t/h [[TableSchemaResolver]], fallback to the provided [[userSchema]] instead.
-        userSchema match {
-          case Some(s) => convertToAvroSchema(s)
-          case _ => throw new IllegalArgumentException("User-provided schema is required in case the table is empty")
-        }
+    val avroSchema: Schema = schemaSpec.map(convertToAvroSchema).getOrElse {
+      Try(schemaResolver.getTableAvroSchema) match {
+        case Success(schema) => schema
+        case Failure(e) =>
+          logError("Failed to fetch schema from the table", e)
+          throw new HoodieSchemaException("Failed to fetch schema from the table")
+      }
     }
-    // try to find internalSchema
-    val internalSchemaFromMeta = try {
-      schemaResolver.getTableInternalSchemaFromCommitMetadata.orElse(InternalSchema.getEmptyInternalSchema)
-    } catch {
-      case _: Exception => InternalSchema.getEmptyInternalSchema
-    }
+
+    val internalSchema: InternalSchema = if (!isSchemaEvolutionEnabled) {
+      InternalSchema.getEmptyInternalSchema
```
**Review thread** (on the schema-evolution gating):

**Contributor:** Does this mean that the user needs to make sure the schema-evolution config provided by the user is consistent with what's in the table (e.g., if the table has an evolved schema, while the …

**Contributor (Author):** Correct. Users will now have to specify the Schema Evolution config for both writes and reads.
Continuing the same hunk:

```diff
+    } else {
+      Try(schemaResolver.getTableInternalSchemaFromCommitMetadata) match {
+        case Success(internalSchemaOpt) =>
+          toScalaOption(internalSchemaOpt).getOrElse(InternalSchema.getEmptyInternalSchema)
+        case Failure(e) =>
+          logWarning("Failed to fetch internal-schema from the table", e)
+          InternalSchema.getEmptyInternalSchema
+      }
+    }
-    (avroSchema, internalSchemaFromMeta)
+
+    (avroSchema, internalSchema)
   }

   protected lazy val tableStructSchema: StructType = AvroConversionUtils.convertAvroSchemaToStructType(tableAvroSchema)
```
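The reworked schema resolution boils down to: prefer the caller-provided schema, otherwise resolve from the table, otherwise fail fast. A self-contained sketch of that shape (a generic distillation, not the Hudi API; `resolveSchema` is a hypothetical helper):

```scala
import scala.util.{Failure, Success, Try}

// Prefer the provided value; otherwise attempt resolution, failing fast on error
def resolveSchema[S](provided: Option[S])(resolve: => S): S =
  provided.getOrElse {
    Try(resolve) match {
      case Success(schema) => schema
      case Failure(e) =>
        throw new IllegalStateException("Failed to fetch schema from the table", e)
    }
  }

// The by-name `resolve` argument is never evaluated when a schema is provided up front
val s = resolveSchema(Some("user-schema"))(sys.error("never called"))
assert(s == "user-schema")
```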
```diff
@@ -503,6 +507,15 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,

   private def prunePartitionColumns(dataStructSchema: StructType): StructType =
     StructType(dataStructSchema.filterNot(f => partitionColumns.contains(f.name)))

+  private def isSchemaEvolutionEnabled = {
+    // NOTE: Schema evolution could be configured both through the optional read parameters
+    // as well as through the Spark Session configuration (e.g., for Spark SQL)
+    optParams.getOrElse(DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.key,
+      DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.defaultValue.toString).toBoolean ||
+      sparkSession.conf.get(DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.key,
+        DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.defaultValue.toString).toBoolean
+  }
 }

 object HoodieBaseRelation extends SparkAdapterSupport {
```
**Contributor:** nit: redundant empty line?
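As for `isSchemaEvolutionEnabled` above, it ORs together two configuration sources: the per-read options map and the session-level configuration. The lookup pattern, distilled into a self-contained sketch (plain maps stand in for `optParams` and the Spark session conf; `flagEnabled` is a hypothetical helper):

```scala
// A flag counts as enabled if it is set in either the per-read options
// or the session-level configuration; the default applies otherwise.
def flagEnabled(key: String,
                default: Boolean,
                readOpts: Map[String, String],
                sessionConf: Map[String, String]): Boolean =
  readOpts.getOrElse(key, default.toString).toBoolean ||
    sessionConf.getOrElse(key, default.toString).toBoolean

// Example: enabled via the session conf even though the read options omit it
assert(flagEnabled("schema.on.read", default = false,
  readOpts = Map.empty, sessionConf = Map("schema.on.read" -> "true")))
```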