@@ -44,12 +44,12 @@ trait SparkAdapter extends Serializable {
/**
* Convert a AliasIdentifier to TableIdentifier.
*/
def toTableIdentify(aliasId: AliasIdentifier): TableIdentifier
def toTableIdentifier(aliasId: AliasIdentifier): TableIdentifier

/**
* Convert a UnresolvedRelation to TableIdentifier.
*/
def toTableIdentify(relation: UnresolvedRelation): TableIdentifier
def toTableIdentifier(relation: UnresolvedRelation): TableIdentifier

/**
* Create Join logical plan.
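The rename above is purely mechanical (toTableIdentify → toTableIdentifier). For orientation, here is a minimal sketch of what an implementation of the renamed methods could look like on Spark 2.x; the catalyst field names (AliasIdentifier.identifier/database, UnresolvedRelation.tableIdentifier) and the object name are assumptions for illustration, not code from this PR:

```scala
import org.apache.spark.sql.catalyst.{AliasIdentifier, TableIdentifier}
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation

// Hypothetical Spark 2.4-style implementations of the renamed SparkAdapter methods.
// The real adapters live in the version-specific hudi-spark2/hudi-spark3 modules.
object TableIdentifierSketch {
  def toTableIdentifier(aliasId: AliasIdentifier): TableIdentifier =
    TableIdentifier(aliasId.identifier, aliasId.database)

  def toTableIdentifier(relation: UnresolvedRelation): TableIdentifier =
    relation.tableIdentifier
}
```

A Spark 3.x adapter would look different (its UnresolvedRelation carries a multipart identifier rather than a TableIdentifier), which is exactly why the conversion is hidden behind SparkAdapter.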
38 changes: 38 additions & 0 deletions hudi-spark-datasource/README.md
@@ -0,0 +1,38 @@
<!--
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
-->

# Description of the relationship between the modules

This repo contains the code that integrates Hudi with Spark. The repo is split into the following modules:

`hudi-spark`
`hudi-spark2`
`hudi-spark3`
`hudi-spark3.1.x`
`hudi-spark2-common`
`hudi-spark3-common`
`hudi-spark-common`

* hudi-spark is the module that contains the code shared by both the spark2 and spark3 versions; it also contains the antlr4
file that supports spark sql on spark 2.x versions.
* hudi-spark2 is the module that contains the code that is compatible with spark 2.x versions.
* hudi-spark3 is the module that contains the code that is compatible with spark 3.2.0 (and above) versions.
* hudi-spark3.1.x is the module that contains the code that is compatible with spark 3.1.x and spark 3.0.x versions.
* hudi-spark2-common is the module that contains the code that would be reused across spark 2.x versions; right now the module
has no classes, since hudi only supports spark 2.4.4, and it acts as a placeholder when packaging the hudi-spark-bundle module.
* hudi-spark3-common is the module that contains the code that would be reused across spark 3.x versions.
* hudi-spark-common is the module that contains the code that would be reused between spark 2.x and spark 3.x versions.
42 changes: 42 additions & 0 deletions hudi-spark-datasource/hudi-spark-common/pom.xml
@@ -170,9 +170,51 @@
<version>${project.version}</version>
</dependency>

<!-- Spark -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scala.binary.version}</artifactId>
<exclusions>
<exclusion>
<groupId>javax.servlet</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_${scala.binary.version}</artifactId>
</dependency>

<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_${scala.binary.version}</artifactId>
</dependency>

<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_${scala.binary.version}</artifactId>
<classifier>tests</classifier>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scala.binary.version}</artifactId>
<classifier>tests</classifier>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-catalyst_${scala.binary.version}</artifactId>
<classifier>tests</classifier>
<scope>test</scope>
</dependency>

<!-- Spark (Packages) -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-avro_${scala.binary.version}</artifactId>
<scope>provided</scope>
</dependency>

<!-- Hoodie - Test -->
@@ -177,7 +177,7 @@ class DefaultSource extends RelationProvider
outputMode)
}

override def shortName(): String = "hudi"
override def shortName(): String = "hudi_v1"

Member

@leesf i suppose this refactoring PR not meant to include this change?

Contributor Author

If not change the format, it would conflict with hudi-spark2/hudi-spark3.1.x/hudi-spark3 module format.

Member

would it conflict? Given we are extending DefaultSource and overriding shortName()?

Contributor Author (@leesf, Jan 7, 2022)

It is because, in the hudi-spark-bundle module, I used <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer"> <resource>META-INF/services/org.apache.spark.sql.sources.DataSourceRegister</resource> </transformer> to append the formats (hudi_v1 and hudi) into the DataSourceRegister service file, so it will conflict if the format is not changed. As to the PR itself, we would not need to change the format to hudi_v1 or use the AppendingTransformer. But when implementing the V2 codepath, I find it difficult to handle the incremental bootstrap table (…) as it passes a schema to Spark and is difficult to handle in the v2 codepath; after finding a good way to handle it, we would definitely delete the hudi_v1 format here.
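As a side note (a hedged illustration, not part of this diff): once the AppendingTransformer merges the two META-INF/services/org.apache.spark.sql.sources.DataSourceRegister files into the bundle, both short names resolve through Spark's DataSourceRegister lookup, roughly like this; the session setup and table path are placeholders:

```scala
import org.apache.spark.sql.SparkSession

// Placeholder session and path, for illustration only.
val spark = SparkSession.builder().appName("shortname-demo").master("local[*]").getOrCreate()

// Each format string is matched against a registered DataSourceRegister.shortName().
val viaV1   = spark.read.format("hudi_v1").load("/tmp/hudi_table") // this DefaultSource's overridden shortName
val viaHudi = spark.read.format("hudi").load("/tmp/hudi_table")    // the original short name
```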


private def getBaseFileOnlyView(useHoodieFileIndex: Boolean,
sqlContext: SQLContext,
@@ -35,7 +35,7 @@ import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils}
import org.apache.spark.sql.catalyst.{InternalRow, expressions}
import org.apache.spark.sql.execution.datasources.{FileIndex, FileStatusCache, NoopCache, PartitionDirectory}
import org.apache.spark.sql.hudi.DataSkippingUtils.createColumnStatsIndexFilterExpr
import org.apache.spark.sql.hudi.HoodieSqlUtils
import org.apache.spark.sql.hudi.HoodieSqlCommonUtils
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.sql.{AnalysisException, Column, SparkSession}
@@ -87,7 +87,7 @@ case class HoodieFileIndex(
private val tableType = metaClient.getTableType

private val specifiedQueryInstant = options.get(DataSourceReadOptions.TIME_TRAVEL_AS_OF_INSTANT.key)
.map(HoodieSqlUtils.formatQueryInstant)
.map(HoodieSqlCommonUtils.formatQueryInstant)

/**
* Get all completeCommits.
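As a hedged aside on where specifiedQueryInstant comes from: the option key is the constant already referenced above, while the path, instant value, and session setup below are placeholders, not code from this PR:

```scala
import org.apache.hudi.DataSourceReadOptions
import org.apache.spark.sql.SparkSession

// Sketch of a time-travel read. The supplied instant is what
// formatQueryInstant (now in HoodieSqlCommonUtils) normalizes before
// HoodieFileIndex filters the completed commits.
val spark = SparkSession.builder().appName("time-travel-demo").master("local[*]").getOrCreate()
val asOfRead = spark.read
  .format("hudi")
  .option(DataSourceReadOptions.TIME_TRAVEL_AS_OF_INSTANT.key, "20220101000000")
  .load("/tmp/hudi_table")
```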
@@ -119,7 +119,7 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext,
}

private def logFileIterator(split: HoodieMergeOnReadFileSplit,
config: Configuration): Iterator[InternalRow] =
config: Configuration): Iterator[InternalRow] =
new Iterator[InternalRow] with Closeable {
private val tableAvroSchema = new Schema.Parser().parse(tableState.tableAvroSchema)
private val requiredAvroSchema = new Schema.Parser().parse(tableState.requiredAvroSchema)
@@ -168,8 +168,8 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext,
}

private def skipMergeFileIterator(split: HoodieMergeOnReadFileSplit,
baseFileIterator: Iterator[InternalRow],
config: Configuration): Iterator[InternalRow] =
baseFileIterator: Iterator[InternalRow],
config: Configuration): Iterator[InternalRow] =
new Iterator[InternalRow] with Closeable {
private val tableAvroSchema = new Schema.Parser().parse(tableState.tableAvroSchema)
private val requiredAvroSchema = new Schema.Parser().parse(tableState.requiredAvroSchema)
@@ -225,8 +225,8 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext,
}

private def payloadCombineFileIterator(split: HoodieMergeOnReadFileSplit,
baseFileIterator: Iterator[InternalRow],
config: Configuration): Iterator[InternalRow] =
baseFileIterator: Iterator[InternalRow],
config: Configuration): Iterator[InternalRow] =
new Iterator[InternalRow] with Closeable {
private val tableAvroSchema = new Schema.Parser().parse(tableState.tableAvroSchema)
private val requiredAvroSchema = new Schema.Parser().parse(tableState.requiredAvroSchema)
@@ -350,23 +350,23 @@ private object HoodieMergeOnReadRDD {
def scanLog(split: HoodieMergeOnReadFileSplit, logSchema: Schema, config: Configuration): HoodieMergedLogRecordScanner = {
val fs = FSUtils.getFs(split.tablePath, config)
HoodieMergedLogRecordScanner.newBuilder()
.withFileSystem(fs)
.withBasePath(split.tablePath)
.withLogFilePaths(split.logPaths.get.asJava)
.withReaderSchema(logSchema)
.withLatestInstantTime(split.latestCommit)
.withReadBlocksLazily(
Try(config.get(HoodieRealtimeConfig.COMPACTION_LAZY_BLOCK_READ_ENABLED_PROP,
HoodieRealtimeConfig.DEFAULT_COMPACTION_LAZY_BLOCK_READ_ENABLED).toBoolean)
.getOrElse(false))
.withReverseReader(false)
.withBufferSize(
config.getInt(HoodieRealtimeConfig.MAX_DFS_STREAM_BUFFER_SIZE_PROP,
HoodieRealtimeConfig.DEFAULT_MAX_DFS_STREAM_BUFFER_SIZE))
.withMaxMemorySizeInBytes(split.maxCompactionMemoryInBytes)
.withSpillableMapBasePath(
config.get(HoodieRealtimeConfig.SPILLABLE_MAP_BASE_PATH_PROP,
HoodieRealtimeConfig.DEFAULT_SPILLABLE_MAP_BASE_PATH))
.build()
.withFileSystem(fs)
.withBasePath(split.tablePath)
.withLogFilePaths(split.logPaths.get.asJava)
.withReaderSchema(logSchema)
.withLatestInstantTime(split.latestCommit)
.withReadBlocksLazily(
Try(config.get(HoodieRealtimeConfig.COMPACTION_LAZY_BLOCK_READ_ENABLED_PROP,
HoodieRealtimeConfig.DEFAULT_COMPACTION_LAZY_BLOCK_READ_ENABLED).toBoolean)
.getOrElse(false))
.withReverseReader(false)
.withBufferSize(
config.getInt(HoodieRealtimeConfig.MAX_DFS_STREAM_BUFFER_SIZE_PROP,
HoodieRealtimeConfig.DEFAULT_MAX_DFS_STREAM_BUFFER_SIZE))
.withMaxMemorySizeInBytes(split.maxCompactionMemoryInBytes)
.withSpillableMapBasePath(
config.get(HoodieRealtimeConfig.SPILLABLE_MAP_BASE_PATH_PROP,
HoodieRealtimeConfig.DEFAULT_SPILLABLE_MAP_BASE_PATH))
.build()
}
}
@@ -177,7 +177,7 @@ object HoodieSparkSqlWriter {
val client = hoodieWriteClient.getOrElse(DataSourceUtils.createHoodieClient(jsc,
null, path, tblName,
mapAsJavaMap(parameters - HoodieWriteConfig.AUTO_COMMIT_ENABLE.key)))
.asInstanceOf[SparkRDDWriteClient[HoodieRecordPayload[Nothing]]]
.asInstanceOf[SparkRDDWriteClient[HoodieRecordPayload[Nothing]]]

if (isAsyncCompactionEnabled(client, tableConfig, parameters, jsc.hadoopConfiguration())) {
asyncCompactionTriggerFn.get.apply(client)
@@ -489,9 +489,9 @@ object HoodieSparkSqlWriter {
val syncHiveSuccess =
if (hiveSyncEnabled || metaSyncEnabled) {
metaSync(sqlContext.sparkSession, hoodieConfig, basePath, df.schema)
} else {
true
}
} else {
true
}
(syncHiveSuccess, common.util.Option.ofNullable(instantTime))
}

@@ -621,7 +621,7 @@ object HoodieSparkSqlWriter {
tableConfig: HoodieTableConfig,
jsc: JavaSparkContext,
tableInstantInfo: TableInstantInfo
): (Boolean, common.util.Option[java.lang.String], common.util.Option[java.lang.String]) = {
): (Boolean, common.util.Option[java.lang.String], common.util.Option[java.lang.String]) = {
if(writeResult.getWriteStatuses.rdd.filter(ws => ws.hasErrors).isEmpty()) {
log.info("Proceeding to commit the write.")
val metaMap = parameters.filter(kv =>
@@ -32,7 +32,7 @@ import org.apache.spark.sql.avro.SchemaConverters
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.datasources.{FileStatusCache, PartitionedFile}
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
import org.apache.spark.sql.hudi.HoodieSqlUtils
import org.apache.spark.sql.hudi.HoodieSqlCommonUtils
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.sources.{BaseRelation, Filter, PrunedFilteredScan}
import org.apache.spark.sql.types.StructType
@@ -99,7 +99,7 @@ class MergeOnReadSnapshotRelation(val sqlContext: SQLContext,
override def needConversion: Boolean = false

private val specifiedQueryInstant = optParams.get(DataSourceReadOptions.TIME_TRAVEL_AS_OF_INSTANT.key)
.map(HoodieSqlUtils.formatQueryInstant)
.map(HoodieSqlCommonUtils.formatQueryInstant)

override def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = {
log.debug(s" buildScan requiredColumns = ${requiredColumns.mkString(",")}")
@@ -30,8 +30,8 @@ import org.apache.spark.internal.Logging
import org.apache.spark.sql.{AnalysisException, SparkSession}
import org.apache.spark.sql.avro.SchemaConverters
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.hudi.{HoodieOptionConfig, HoodieSqlUtils}
import org.apache.spark.sql.hudi.HoodieSqlUtils._
import org.apache.spark.sql.hudi.{HoodieOptionConfig, HoodieSqlCommonUtils}
import org.apache.spark.sql.hudi.HoodieSqlCommonUtils._
import org.apache.spark.sql.types.{StructField, StructType}

import java.util.{Locale, Properties}
@@ -62,7 +62,7 @@ class HoodieCatalogTable(val spark: SparkSession, val table: CatalogTable) exten
* hoodie table's location.
* if create managed hoodie table, use `catalog.defaultTablePath`.
*/
val tableLocation: String = HoodieSqlUtils.getTableLocation(table, spark)
val tableLocation: String = HoodieSqlCommonUtils.getTableLocation(table, spark)

/**
* A flag to whether the hoodie table exists.
@@ -124,7 +124,7 @@ class HoodieCatalogTable(val spark: SparkSession, val table: CatalogTable) exten
/**
* The schema without hoodie meta fields
*/
lazy val tableSchemaWithoutMetaFields: StructType = HoodieSqlUtils.removeMetaFields(tableSchema)
lazy val tableSchemaWithoutMetaFields: StructType = HoodieSqlCommonUtils.removeMetaFields(tableSchema)

/**
* The schema of data fields
@@ -136,7 +136,7 @@ class HoodieCatalogTable(val spark: SparkSession, val table: CatalogTable) exten
/**
* The schema of data fields not including hoodie meta fields
*/
lazy val dataSchemaWithoutMetaFields: StructType = HoodieSqlUtils.removeMetaFields(dataSchema)
lazy val dataSchemaWithoutMetaFields: StructType = HoodieSqlCommonUtils.removeMetaFields(dataSchema)

/**
* The schema of partition fields
@@ -146,7 +146,7 @@ class HoodieCatalogTable(val spark: SparkSession, val table: CatalogTable) exten
/**
* All the partition paths
*/
def getAllPartitionPaths: Seq[String] = HoodieSqlUtils.getAllPartitionPaths(spark, table)
def getAllPartitionPaths: Seq[String] = HoodieSqlCommonUtils.getAllPartitionPaths(spark, table)

/**
* Check if table is a partitioned table
@@ -213,7 +213,7 @@ class HoodieCatalogTable(val spark: SparkSession, val table: CatalogTable) exten

case (CatalogTableType.MANAGED, true) =>
throw new AnalysisException(s"Can not create the managed table('$catalogTableName')" +
s". The associated location('$tableLocation') already exists.")
s". The associated location('$tableLocation') already exists.")
}
HoodieOptionConfig.validateTable(spark, finalSchema,
HoodieOptionConfig.mappingTableConfigToSqlOption(tableConfigs))
@@ -234,17 +234,17 @@ class HoodieCatalogTable(val spark: SparkSession, val table: CatalogTable) exten
val allPartitionPaths = getAllPartitionPaths
if (originTableConfig.contains(HoodieTableConfig.HIVE_STYLE_PARTITIONING_ENABLE.key)) {
extraConfig(HoodieTableConfig.HIVE_STYLE_PARTITIONING_ENABLE.key) =
originTableConfig(HoodieTableConfig.HIVE_STYLE_PARTITIONING_ENABLE.key)
originTableConfig(HoodieTableConfig.HIVE_STYLE_PARTITIONING_ENABLE.key)
} else {
extraConfig(HoodieTableConfig.HIVE_STYLE_PARTITIONING_ENABLE.key) =
String.valueOf(isHiveStyledPartitioning(allPartitionPaths, table))
String.valueOf(isHiveStyledPartitioning(allPartitionPaths, table))
}
if (originTableConfig.contains(HoodieTableConfig.URL_ENCODE_PARTITIONING.key)) {
extraConfig(HoodieTableConfig.URL_ENCODE_PARTITIONING.key) =
originTableConfig(HoodieTableConfig.URL_ENCODE_PARTITIONING.key)
originTableConfig(HoodieTableConfig.URL_ENCODE_PARTITIONING.key)
} else {
extraConfig(HoodieTableConfig.URL_ENCODE_PARTITIONING.key) =
String.valueOf(isUrlEncodeEnabled(allPartitionPaths, table))
String.valueOf(isUrlEncodeEnabled(allPartitionPaths, table))
}
} else {
extraConfig(HoodieTableConfig.HIVE_STYLE_PARTITIONING_ENABLE.key) = "true"
Expand All @@ -253,8 +253,8 @@ class HoodieCatalogTable(val spark: SparkSession, val table: CatalogTable) exten

if (originTableConfig.contains(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME.key)) {
extraConfig(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME.key) =
HoodieSparkKeyGeneratorFactory.convertToSparkKeyGenerator(
originTableConfig(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME.key))
HoodieSparkKeyGeneratorFactory.convertToSparkKeyGenerator(
originTableConfig(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME.key))
} else {
extraConfig(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME.key) = classOf[ComplexKeyGenerator].getCanonicalName
}
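To make the helper-object rename concrete, here is a hedged call-site sketch (the method names getTableLocation and removeMetaFields are taken from this diff; the wrapper function and its arguments are placeholders, not code from this PR):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.hudi.HoodieSqlCommonUtils
import org.apache.spark.sql.types.StructType

// Call sites that previously went through HoodieSqlUtils now use the renamed
// HoodieSqlCommonUtils; the caller supplies the session, catalog table and schema.
def describeHoodieTable(spark: SparkSession, table: CatalogTable, schema: StructType): Unit = {
  val location    = HoodieSqlCommonUtils.getTableLocation(table, spark) // resolved table base path
  val withoutMeta = HoodieSqlCommonUtils.removeMetaFields(schema)       // drop the _hoodie_* meta columns
  println(s"table at $location with data columns: ${withoutMeta.fieldNames.mkString(", ")}")
}
```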
@@ -18,7 +18,7 @@
package org.apache.spark.sql.hudi

import org.apache.hudi.DataSourceWriteOptions
import org.apache.hudi.common.model.{DefaultHoodieRecordPayload, HoodieTableType}
import org.apache.hudi.common.model.DefaultHoodieRecordPayload
import org.apache.hudi.common.table.HoodieTableConfig
import org.apache.hudi.common.util.ValidationUtils
import org.apache.spark.sql.SparkSession