9 changes: 8 additions & 1 deletion .github/workflows/bot.yml
@@ -18,16 +18,22 @@ jobs:
include:
- scala: "scala-2.11"
spark: "spark2"
skipModules: ""
- scala: "scala-2.11"
spark: "spark2,spark-shade-unbundle-avro"
skipModules: ""
- scala: "scala-2.12"
Contributor Author @leesf commented on Dec 31, 2021:

@xushiyan I find it very hard to keep compatibility between Spark 3.2.0 and Spark 3.0.x/3.1.x (there is no V1Write in Spark 3.0.x and 3.1.x) after we upgrade to Spark 3.2.0, so I commented out the workflow.

Member commented:

@leesf We introduced the build profiles spark3.0.x and spark3.1.x mostly because of Spark's own incompatibilities. I think we can set a rule here: to enable the v2 writer, users have to make sure they're on Spark 3.2+. Sounds good? In the future, we may gradually drop support for older Spark versions if their code deviates too far from the latest.

Contributor Author replied:

sounds good.
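
As a concrete illustration of the rule proposed above, the v2 write path could be gated on the running Spark version. This is a minimal sketch, not code from this PR; the `SparkVersionGate` object and `supportsV2Write` method are hypothetical names.

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical helper (not from the PR): gate the v2 write path on Spark 3.2+,
// reflecting the rule that V1Write, and therefore the v2 writer, needs Spark 3.2.0.
object SparkVersionGate {
  def supportsV2Write(spark: SparkSession): Boolean = {
    // spark.version looks like "3.2.0"
    val parts = spark.version.split("\\.")
    val (major, minor) = (parts(0).toInt, parts(1).toInt)
    major > 3 || (major == 3 && minor >= 2)
  }
}
```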

spark: "spark3,spark3.0.x"
skipModules: "!hudi-spark-datasource/hudi-spark3"
- scala: "scala-2.12"
spark: "spark3,spark3.0.x,spark-shade-unbundle-avro"
skipModules: "!hudi-spark-datasource/hudi-spark3"
- scala: "scala-2.12"
spark: "spark3,spark3.1.x"
skipModules: "!hudi-spark-datasource/hudi-spark3"
- scala: "scala-2.12"
spark: "spark3,spark3.1.x,spark-shade-unbundle-avro"
skipModules: "!hudi-spark-datasource/hudi-spark3"
- scala: "scala-2.12"
spark: "spark3"
- scala: "scala-2.12"
@@ -44,4 +50,5 @@ jobs:
env:
SCALA_PROFILE: ${{ matrix.scala }}
SPARK_PROFILE: ${{ matrix.spark }}
run: mvn install -P "$SCALA_PROFILE,$SPARK_PROFILE" -DskipTests=true -Dmaven.javadoc.skip=true -B -V
SKIP_MODULES: ${{ matrix.skipModules }}
run: mvn install -P "$SCALA_PROFILE,$SPARK_PROFILE" -pl "$SKIP_MODULES" -DskipTests=true -Dmaven.javadoc.skip=true -B -V
@@ -20,16 +20,19 @@ package org.apache.spark.sql.hudi

import org.apache.hudi.client.utils.SparkRowSerDe
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.parser.ParserInterface
import org.apache.spark.sql.catalyst.plans.JoinType
import org.apache.spark.sql.catalyst.plans.logical.{Join, LogicalPlan}
import org.apache.spark.sql.catalyst.plans.logical.{Join, LogicalPlan, SubqueryAlias}
import org.apache.spark.sql.catalyst.{AliasIdentifier, TableIdentifier}
import org.apache.spark.sql.execution.datasources.SparkParsePartitionUtil
import org.apache.spark.sql.execution.datasources.{LogicalRelation, SparkParsePartitionUtil}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.{Row, SparkSession}

import java.util.Locale

/**
* An interface to adapter the difference between spark2 and spark3
* in some spark related class.
@@ -44,12 +47,12 @@ trait SparkAdapter extends Serializable {
/**
* Convert a AliasIdentifier to TableIdentifier.
*/
def toTableIdentify(aliasId: AliasIdentifier): TableIdentifier
def toTableIdentifier(aliasId: AliasIdentifier): TableIdentifier

/**
* Convert a UnresolvedRelation to TableIdentifier.
*/
def toTableIdentify(relation: UnresolvedRelation): TableIdentifier
def toTableIdentifier(relation: UnresolvedRelation): TableIdentifier

/**
* Create Join logical plan.
@@ -92,4 +95,31 @@ trait SparkAdapter extends Serializable {
* ParserInterface#parseMultipartIdentifier is supported since spark3, for spark2 this should not be called.
*/
def parseMultipartIdentifier(parser: ParserInterface, sqlText: String): Seq[String]

def isHoodieTable(table: LogicalPlan, spark: SparkSession): Boolean = {
Contributor commented:

Is there any difference from hoodieSqlCommonUtils.isHoodieTable? I see that sometimes we use adapter.isHoodieTable and sometimes hoodieSqlCommonUtils.isHoodieTable.

Contributor Author replied:

> Is there any difference from hoodieSqlCommonUtils.isHoodieTable? I see that sometimes we use adapter.isHoodieTable and sometimes hoodieSqlCommonUtils.isHoodieTable.

In fact, hoodieSqlCommonUtils.isHoodieTable is used in the v1 code path to judge whether a table is a Hudi table, while adapter.isHoodieTable judges the same thing in the v2 code path; changing the name would make this easier to understand.

Contributor replied:

Thanks for your reply, and please correct me if I'm wrong. Can we just move the method implementation out of SparkAdapter: Spark2Adapter would judge whether a table is a Hudi table via the v1 code path, while Spark3Adapter would judge it via the v2 code path. That way, I think hoodieSqlCommonUtils.isHoodieTable could simply be removed.

Looking forward to this PR being merged as soon as possible; this is excellent work, as we can build many other features on top of DSV2. :)
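
A minimal sketch of that suggestion, assuming the Spark 3.x class paths (LogicalRelation, DataSourceV2Relation) and that a v2 Table exposes its provider through its properties; the object names are illustrative only, and this is not what the PR implements.

```scala
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation

// Illustrative helpers only: what a version-specific split of isHoodieTable could look like.
object Spark2TableDetection {
  // v1 path: the analyzer yields a LogicalRelation carrying a CatalogTable; check its provider.
  def isHoodieTable(plan: LogicalPlan): Boolean = plan match {
    case LogicalRelation(_, _, Some(catalogTable), _) =>
      catalogTable.provider.exists(_.equalsIgnoreCase("hudi"))
    case _ => false
  }
}

object Spark3TableDetection {
  // v2 path: a DataSourceV2Relation already carries the resolved Table and its properties
  // (assumption: the provider is recorded under the "provider" property key).
  def isHoodieTable(plan: LogicalPlan): Boolean = plan match {
    case v2: DataSourceV2Relation =>
      "hudi".equalsIgnoreCase(v2.table.properties().getOrDefault("provider", ""))
    case _ => false
  }
}
```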

tripAlias(table) match {
case LogicalRelation(_, _, Some(tbl), _) => isHoodieTable(tbl)
case relation: UnresolvedRelation =>
isHoodieTable(toTableIdentifier(relation), spark)
case _=> false
}
}

def tripAlias(plan: LogicalPlan): LogicalPlan = {
plan match {
case SubqueryAlias(_, relation: LogicalPlan) =>
tripAlias(relation)
case other =>
other
}
}

def isHoodieTable(table: CatalogTable): Boolean = {
table.provider.map(_.toLowerCase(Locale.ROOT)).orNull == "hudi"
}

def isHoodieTable(tableId: TableIdentifier, spark: SparkSession): Boolean = {
val table = spark.sessionState.catalog.getTableMetadata(tableId)
isHoodieTable(table)
}
}
42 changes: 42 additions & 0 deletions hudi-spark-datasource/hudi-spark-common/pom.xml
@@ -170,9 +170,51 @@
<version>${project.version}</version>
</dependency>

<!-- Spark -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scala.binary.version}</artifactId>
<exclusions>
<exclusion>
<groupId>javax.servlet</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_${scala.binary.version}</artifactId>
</dependency>

<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_${scala.binary.version}</artifactId>
</dependency>

<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_${scala.binary.version}</artifactId>
<classifier>tests</classifier>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scala.binary.version}</artifactId>
<classifier>tests</classifier>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-catalyst_${scala.binary.version}</artifactId>
<classifier>tests</classifier>
<scope>test</scope>
</dependency>

<!-- Spark (Packages) -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-avro_${scala.binary.version}</artifactId>
<scope>provided</scope>
</dependency>

<!-- Hoodie - Test -->
@@ -177,7 +177,7 @@ class DefaultSource extends RelationProvider
outputMode)
}

override def shortName(): String = "hudi"
override def shortName(): String = "hudi_v1"
Member commented:

Wouldn't this cause every job out there to be upgraded? Not sure we can afford to do this. I would also like to clearly understand whether the new v2 implementation will support ALL of the existing functionality or be a drop-in replacement for the current v1 implementation.

I think it's crucial to get aligned on this before we proceed further.
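
For context on the compatibility concern (standard Spark DataSource API usage, not code from this PR): existing jobs resolve the source by its registered short name, so renaming it from "hudi" to "hudi_v1" would require every such call site to change.

```scala
import org.apache.spark.sql.SparkSession

// Typical existing call sites that resolve the source by its short name "hudi".
object ShortNameUsageExample {
  def readAndAppend(spark: SparkSession, basePath: String): Unit = {
    val df = spark.read.format("hudi").load(basePath) // breaks if only "hudi_v1" is registered
    df.write.format("hudi").mode("append").save(basePath)
  }
}
```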


private def getBaseFileOnlyView(useHoodieFileIndex: Boolean,
sqlContext: SQLContext,
@@ -18,13 +18,13 @@

package org.apache.hudi

import org.apache.spark.{Partition, TaskContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.datasources.PartitionedFile
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.vectorized.ColumnarBatch
import org.apache.spark.{Partition, TaskContext}

class HoodieBootstrapRDD(@transient spark: SparkSession,
dataReadFunction: PartitionedFile => Iterator[Any],
@@ -50,7 +50,7 @@ import scala.collection.JavaConverters._
* else read data from tablePath using HoodiFileIndex.
* @param metaClient Hoodie table meta client
* @param optParams DataSource options passed by the user
*/
*/
class HoodieBootstrapRelation(@transient val _sqlContext: SQLContext,
val userSchema: StructType,
val globPaths: Option[Seq[Path]],
@@ -116,7 +116,7 @@ class HoodieBootstrapRelation(@transient val _sqlContext: SQLContext,
filters = if (requiredSkeletonSchema.isEmpty) filters else Seq() ,
options = Map.empty,
hadoopConf = _sqlContext.sparkSession.sessionState.newHadoopConf()
)
)

val skeletonReadFunction = new ParquetFileFormat()
.buildReaderWithPartitionValues(
@@ -35,7 +35,7 @@ import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils}
import org.apache.spark.sql.catalyst.{InternalRow, expressions}
import org.apache.spark.sql.execution.datasources.{FileIndex, FileStatusCache, NoopCache, PartitionDirectory}
import org.apache.spark.sql.hudi.DataSkippingUtils.createColumnStatsIndexFilterExpr
import org.apache.spark.sql.hudi.HoodieSqlUtils
import org.apache.spark.sql.hudi.HoodieSqlCommonUtils
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.sql.{AnalysisException, Column, SparkSession}
@@ -87,11 +87,11 @@ case class HoodieFileIndex(
private val tableType = metaClient.getTableType

private val specifiedQueryInstant = options.get(DataSourceReadOptions.TIME_TRAVEL_AS_OF_INSTANT.key)
.map(HoodieSqlUtils.formatQueryInstant)
.map(HoodieSqlCommonUtils.formatQueryInstant)

/**
* Get all completeCommits.
*/
*/
lazy val completedCommits = metaClient.getCommitsTimeline
.filterCompletedInstants().getInstants.iterator().toList.map(_.getTimestamp)

@@ -403,7 +403,7 @@ case class HoodieFileIndex(
cachedAllInputFileSlices = partitionFiles.map(p => {
val latestSlices = if (latestInstant.isPresent) {
fileSystemView.getLatestMergedFileSlicesBeforeOrOn(p._1.partitionPath, queryInstant.get)
.iterator().asScala.toSeq
.iterator().asScala.toSeq
} else {
Seq()
}
@@ -119,7 +119,7 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext,
}

private def logFileIterator(split: HoodieMergeOnReadFileSplit,
config: Configuration): Iterator[InternalRow] =
config: Configuration): Iterator[InternalRow] =
new Iterator[InternalRow] with Closeable {
private val tableAvroSchema = new Schema.Parser().parse(tableState.tableAvroSchema)
private val requiredAvroSchema = new Schema.Parser().parse(tableState.requiredAvroSchema)
@@ -168,8 +168,8 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext,
}

private def skipMergeFileIterator(split: HoodieMergeOnReadFileSplit,
baseFileIterator: Iterator[InternalRow],
config: Configuration): Iterator[InternalRow] =
baseFileIterator: Iterator[InternalRow],
config: Configuration): Iterator[InternalRow] =
new Iterator[InternalRow] with Closeable {
private val tableAvroSchema = new Schema.Parser().parse(tableState.tableAvroSchema)
private val requiredAvroSchema = new Schema.Parser().parse(tableState.requiredAvroSchema)
@@ -225,8 +225,8 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext,
}

private def payloadCombineFileIterator(split: HoodieMergeOnReadFileSplit,
baseFileIterator: Iterator[InternalRow],
config: Configuration): Iterator[InternalRow] =
baseFileIterator: Iterator[InternalRow],
config: Configuration): Iterator[InternalRow] =
new Iterator[InternalRow] with Closeable {
private val tableAvroSchema = new Schema.Parser().parse(tableState.tableAvroSchema)
private val requiredAvroSchema = new Schema.Parser().parse(tableState.requiredAvroSchema)
@@ -350,23 +350,23 @@ private object HoodieMergeOnReadRDD {
def scanLog(split: HoodieMergeOnReadFileSplit, logSchema: Schema, config: Configuration): HoodieMergedLogRecordScanner = {
val fs = FSUtils.getFs(split.tablePath, config)
HoodieMergedLogRecordScanner.newBuilder()
.withFileSystem(fs)
.withBasePath(split.tablePath)
.withLogFilePaths(split.logPaths.get.asJava)
.withReaderSchema(logSchema)
.withLatestInstantTime(split.latestCommit)
.withReadBlocksLazily(
Try(config.get(HoodieRealtimeConfig.COMPACTION_LAZY_BLOCK_READ_ENABLED_PROP,
HoodieRealtimeConfig.DEFAULT_COMPACTION_LAZY_BLOCK_READ_ENABLED).toBoolean)
.getOrElse(false))
.withReverseReader(false)
.withBufferSize(
config.getInt(HoodieRealtimeConfig.MAX_DFS_STREAM_BUFFER_SIZE_PROP,
HoodieRealtimeConfig.DEFAULT_MAX_DFS_STREAM_BUFFER_SIZE))
.withMaxMemorySizeInBytes(split.maxCompactionMemoryInBytes)
.withSpillableMapBasePath(
config.get(HoodieRealtimeConfig.SPILLABLE_MAP_BASE_PATH_PROP,
HoodieRealtimeConfig.DEFAULT_SPILLABLE_MAP_BASE_PATH))
.build()
.withFileSystem(fs)
.withBasePath(split.tablePath)
.withLogFilePaths(split.logPaths.get.asJava)
.withReaderSchema(logSchema)
.withLatestInstantTime(split.latestCommit)
.withReadBlocksLazily(
Try(config.get(HoodieRealtimeConfig.COMPACTION_LAZY_BLOCK_READ_ENABLED_PROP,
HoodieRealtimeConfig.DEFAULT_COMPACTION_LAZY_BLOCK_READ_ENABLED).toBoolean)
.getOrElse(false))
.withReverseReader(false)
.withBufferSize(
config.getInt(HoodieRealtimeConfig.MAX_DFS_STREAM_BUFFER_SIZE_PROP,
HoodieRealtimeConfig.DEFAULT_MAX_DFS_STREAM_BUFFER_SIZE))
.withMaxMemorySizeInBytes(split.maxCompactionMemoryInBytes)
.withSpillableMapBasePath(
config.get(HoodieRealtimeConfig.SPILLABLE_MAP_BASE_PATH_PROP,
HoodieRealtimeConfig.DEFAULT_SPILLABLE_MAP_BASE_PATH))
.build()
}
}
@@ -177,7 +177,7 @@ object HoodieSparkSqlWriter {
val client = hoodieWriteClient.getOrElse(DataSourceUtils.createHoodieClient(jsc,
null, path, tblName,
mapAsJavaMap(parameters - HoodieWriteConfig.AUTO_COMMIT_ENABLE.key)))
.asInstanceOf[SparkRDDWriteClient[HoodieRecordPayload[Nothing]]]
.asInstanceOf[SparkRDDWriteClient[HoodieRecordPayload[Nothing]]]

if (isAsyncCompactionEnabled(client, tableConfig, parameters, jsc.hadoopConfiguration())) {
asyncCompactionTriggerFn.get.apply(client)
@@ -487,9 +487,9 @@ object HoodieSparkSqlWriter {
val syncHiveSuccess =
if (hiveSyncEnabled || metaSyncEnabled) {
metaSync(sqlContext.sparkSession, hoodieConfig, basePath, df.schema)
} else {
true
}
} else {
true
}
(syncHiveSuccess, common.util.Option.ofNullable(instantTime))
}

@@ -619,7 +619,7 @@ object HoodieSparkSqlWriter {
tableConfig: HoodieTableConfig,
jsc: JavaSparkContext,
tableInstantInfo: TableInstantInfo
): (Boolean, common.util.Option[java.lang.String], common.util.Option[java.lang.String]) = {
): (Boolean, common.util.Option[java.lang.String], common.util.Option[java.lang.String]) = {
if(writeResult.getWriteStatuses.rdd.filter(ws => ws.hasErrors).isEmpty()) {
log.info("Proceeding to commit the write.")
val metaMap = parameters.filter(kv =>
Expand Down Expand Up @@ -715,7 +715,7 @@ object HoodieSparkSqlWriter {
}

private def mergeParamsAndGetHoodieConfig(optParams: Map[String, String],
tableConfig: HoodieTableConfig): (Map[String, String], HoodieConfig) = {
tableConfig: HoodieTableConfig): (Map[String, String], HoodieConfig) = {
val translatedOptions = DataSourceWriteOptions.translateSqlOptions(optParams)
val mergedParams = mutable.Map.empty ++ HoodieWriterUtils.parametersWithWriteDefaults(translatedOptions)
if (!mergedParams.contains(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME.key)
@@ -129,9 +129,9 @@ class HoodieStreamingSink(sqlContext: SQLContext,
case Success((false, commitOps, compactionInstantOps, clusteringInstant, client, tableConfig)) =>
log.error(s"Micro batch id=$batchId ended up with errors"
+ (commitOps.isPresent match {
case true => s" for commit=${commitOps.get()}"
case _ => s""
}))
case true => s" for commit=${commitOps.get()}"
case _ => s""
}))
if (ignoreFailedBatch) {
log.info(s"Ignore the errors and move on streaming as per " +
s"${DataSourceWriteOptions.STREAMING_IGNORE_FAILED_BATCH.key} configuration")
@@ -45,7 +45,7 @@ object HoodieWriterUtils {
*
* @param parameters
* @return
*/
*/
def parametersWithWriteDefaults(parameters: Map[String, String]): Map[String, String] = {
val globalProps = DFSPropertiesConfiguration.getGlobalProps.asScala
val props = new Properties()
@@ -123,7 +123,7 @@ object HoodieWriterUtils {
* Detects conflicts between new parameters and existing table configurations
*/
def validateTableConfig(spark: SparkSession, params: Map[String, String],
tableConfig: HoodieConfig): Unit = {
tableConfig: HoodieConfig): Unit = {
val resolver = spark.sessionState.conf.resolver
val diffConfigs = StringBuilder.newBuilder
params.foreach { case (key, value) =>
@@ -137,21 +137,21 @@
val datasourceRecordKey = params.getOrElse(RECORDKEY_FIELD.key(), null)
val tableConfigRecordKey = tableConfig.getString(HoodieTableConfig.RECORDKEY_FIELDS)
if (null != datasourceRecordKey && null != tableConfigRecordKey
&& datasourceRecordKey != tableConfigRecordKey) {
&& datasourceRecordKey != tableConfigRecordKey) {
diffConfigs.append(s"RecordKey:\t$datasourceRecordKey\t$tableConfigRecordKey\n")
}

val datasourcePreCombineKey = params.getOrElse(PRECOMBINE_FIELD.key(), null)
val tableConfigPreCombineKey = tableConfig.getString(HoodieTableConfig.PRECOMBINE_FIELD)
if (null != datasourcePreCombineKey && null != tableConfigPreCombineKey
&& datasourcePreCombineKey != tableConfigPreCombineKey) {
&& datasourcePreCombineKey != tableConfigPreCombineKey) {
diffConfigs.append(s"PreCombineKey:\t$datasourcePreCombineKey\t$tableConfigPreCombineKey\n")
}

val datasourceKeyGen = getOriginKeyGenerator(params)
val tableConfigKeyGen = tableConfig.getString(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME)
if (null != datasourceKeyGen && null != tableConfigKeyGen
&& datasourceKeyGen != tableConfigKeyGen) {
&& datasourceKeyGen != tableConfigKeyGen) {
diffConfigs.append(s"KeyGenerator:\t$datasourceKeyGen\t$tableConfigKeyGen\n")
}
}