Conversation

@southernriver southernriver commented Aug 19, 2022

Change Logs

NA

Impact

NA

Risk level (write none, low, medium or high below)

medium

Contributor's checklist

  • Read through contributor's guide
  • Change Logs and Impact were stated clearly
  • Adequate tests were added if applicable
  • CI passed

@southernriver southernriver changed the title Spark3.2: Support DataSourceV2 Read [HUDI-4449]: Support DataSourceV2 Read for Spark3.2 Aug 19, 2022
@southernriver southernriver changed the title [HUDI-4449]: Support DataSourceV2 Read for Spark3.2 [HUDI-4449] Support DataSourceV2 Read for Spark3.2 Aug 19, 2022
@XuQianJin-Stars XuQianJin-Stars self-assigned this Aug 19, 2022
}

override protected def test(testName: String, testTags: Tag*)(testFun: => Any /* Assertion */)(implicit pos: source.Position): Unit = {
override protected def test(testName: String, testTags: Tag*)(testFun: => Any /* Assertion */)(implicit pos: org.scalactic.source.Position): Unit = {
Contributor

unnecessary change?

Author

Since I have introduced the org.apache.spark.sql.hudi.source package directory for HoodieBatchScan, this is just to avoid a conflict.

test("Test Query None Partitioned Table") {
withTempDir { tmp =>
val tableName = generateTableName
spark.conf.set("hoodie.datasource.v2.read.enable", "true")
Contributor

@leesf leesf Aug 20, 2022

do we also support spark.sql("set hoodie.datasource.v2.read.enable = true")?

Author

going to support this.
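
(For reference, a minimal sketch of why both paths can work, assuming the flag is simply read from the session conf; the object and method names below are illustrative, not from this PR. Both spark.conf.set and the SQL SET command write into the session's runtime conf, so a single lookup covers both.)

import org.apache.spark.sql.SparkSession

object V2ReadFlag {
  // Key name taken from this PR; treating "unset" as disabled is an assumption.
  val Key = "hoodie.datasource.v2.read.enable"

  def isV2ReadEnabled(spark: SparkSession): Boolean =
    spark.conf.getOption(Key).exists(_.trim.equalsIgnoreCase("true"))
}

// Either of these should then enable the flag for the current session:
//   spark.conf.set("hoodie.datasource.v2.read.enable", "true")
//   spark.sql("set hoodie.datasource.v2.read.enable = true")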

Seq(2, "a2", 12.0, 1000)
)

assertThrows[HoodieDuplicateKeyException] {
Contributor

this is unnecessary?

}

override def pruneColumns(structType: StructType): Unit = {
// TODO support prune columns.
Contributor

able to support column pruning in this PR?

Author

I will open another PR to support this.
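
(A rough sketch of the direction such a follow-up could take, using Spark's SupportsPushDownRequiredColumns mix-in; everything here other than the Spark interfaces is illustrative, not this PR's code.)

import org.apache.spark.sql.connector.read.{Scan, ScanBuilder, SupportsPushDownRequiredColumns}
import org.apache.spark.sql.types.StructType

class PruningScanBuilderSketch(fullSchema: StructType) extends ScanBuilder
    with SupportsPushDownRequiredColumns {

  // Schema the scan will actually read; starts out as the full table schema.
  private var prunedSchema: StructType = fullSchema

  override def pruneColumns(requiredSchema: StructType): Unit = {
    // Spark passes only the columns the query plan actually needs.
    prunedSchema = requiredSchema
  }

  override def build(): Scan = buildScan(prunedSchema)

  // Placeholder for building something like HoodieBatchScan over the narrowed schema.
  private def buildScan(schema: StructType): Scan = ???
}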

}

override def createReaderFactory(): PartitionReaderFactory = {
hadoopConf.set(ParquetInputFormat.READ_SUPPORT_CLASS, classOf[ParquetReadSupport].getName)
Contributor

not support ORC?

Author

In my general testing, I found that upstream Spark SQL does not currently support the ORC format here; it may need another PR to support this.

options: Map[String, String],
@transient hadoopConf: Configuration) extends Batch with Scan {

override def planInputPartitions(): Array[InputPartition] = {
Contributor

is the code below copied from the Spark codebase?

Author

For the COW Parquet datasource, we can reuse part of this code.
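
(For orientation, a rough sketch of what a V2 planInputPartitions for a COW Parquet table can boil down to, written against Spark 3.2 internals; this is not the PR's code, and it does not split large files by block.)

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.connector.read.InputPartition
import org.apache.spark.sql.execution.datasources.{FilePartition, HadoopFsRelation, PartitionDirectory, PartitionedFile}

// List files through the relation's FileIndex and bin-pack them into FilePartitions,
// the same grouping the V1 FileSourceScanExec uses. One file per PartitionedFile.
def planInputPartitionsSketch(spark: SparkSession, relation: HadoopFsRelation): Array[InputPartition] = {
  val selected: Seq[PartitionDirectory] = relation.location.listFiles(Nil, Nil)
  val maxSplitBytes = FilePartition.maxSplitBytes(spark, selected)
  val files: Seq[PartitionedFile] = selected.flatMap { dir =>
    dir.files.map(f => PartitionedFile(dir.values, f.getPath.toUri.toString, 0, f.getLen))
  }
  FilePartition.getFilePartitions(spark, files, maxSplitBytes).toArray[InputPartition]
}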

@leesf leesf self-assigned this Aug 20, 2022
Contributor

leesf commented Aug 20, 2022

is this PR only aimed at supporting Spark 3.2.0 and not Spark 3.2.1/3.2.2 or 3.3.0? Or are there follow-up tasks to support other versions?

@southernriver
Author

@leesf thanks for your time reviewing this. I found that there are several problems with this PR, since upstream has reverted the V2 datasource:

class Spark3DefaultSource extends DefaultSource with DataSourceRegister /* with TableProvider */ {
I'll update soon.

@yihua yihua added the engine:spark Spark integration label Aug 23, 2022
Contributor

yihua commented Aug 23, 2022

@alexeykudinkin FYI

* Convert Filter to Catalyst Expression. If the conversion succeeds, return a non-empty
* Option[Expression]; otherwise return None.
*/
def convertToCatalystExpression(filter: Filter, tableSchema: StructType): Option[Expression] = {
Contributor

Let's make sure we're not duplicating utilities we already have (there's HoodieCatalystExpressionUtils doing exactly the same thing)

Author

fixed.
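
(For context, a small illustration of what such a Filter-to-Catalyst translation does for two simple filter types; this is a sketch only, and the PR now delegates to the existing HoodieCatalystExpressionUtils instead.)

import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
import org.apache.spark.sql.catalyst.expressions.{EqualTo, Expression, GreaterThan, Literal}
import org.apache.spark.sql.sources
import org.apache.spark.sql.types.StructType

// Translate a handful of V1 source filters into Catalyst expressions; anything
// unsupported falls through as None so the caller can keep it as a post-scan filter.
def convertToCatalystExpressionSketch(filter: sources.Filter, tableSchema: StructType): Option[Expression] =
  filter match {
    case sources.EqualTo(attr, value) if tableSchema.fieldNames.contains(attr) =>
      Some(EqualTo(UnresolvedAttribute(attr), Literal(value)))
    case sources.GreaterThan(attr, value) if tableSchema.fieldNames.contains(attr) =>
      Some(GreaterThan(UnresolvedAttribute(attr), Literal(value)))
    case _ => None
  }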

/**
* Create Like expression.
*/
def createLike(left: Expression, right: Expression): Expression
Contributor

What do we need this one for?
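
(For context, a plausible Spark 3.2 body for such an adapter method, sketched here rather than taken from the PR; Spark 3.x's Like expression takes an explicit escape character, which is presumably why a factory helper is handy.)

import org.apache.spark.sql.catalyst.expressions.{Expression, Like}

// Sketch: build a Catalyst Like expression using the default backslash escape character.
def createLike(left: Expression, right: Expression): Expression =
  Like(left, right, '\\')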

* NOTE: This method is specific to Spark 3.2.0
*/
private def createParquetFilters(args: Any*): ParquetFilters = {
def createParquetFilters(args: Any*): ParquetFilters = {
Contributor

Why are we down-grading these?

Author

For SparkBatch, I have reused some functions from outside the object Spark32PlusHoodieParquetFileFormat.

Contributor

Sorry, I don't think I understand. Can you please elaborate? Where are these used?

override def build(): Scan = {
val relation = new DefaultSource().createRelation(new SQLContext(spark), options)
relation match {
case HadoopFsRelation(location, partitionSchema, dataSchema, _, _, options) =>
Contributor

The whole idea of the V2 integration is bypassing V1 concepts such as HadoopFsRelation, which unfortunately are very limiting for Hudi features.

What's our plan here?

Contributor

Agree, it is still a V1 scan here.

Author

It's really a huge amount of work to reimplement the DataSourceV2 APIs for all Hudi tables. My initial idea is to use the lower-level V2 read interface first, to support features like SupportsPushDownFilters and SupportsRuntimeFiltering, and I will continue to follow up on the implementation of a new HudiTableScan.
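
(A rough sketch of that incremental direction: a ScanBuilder that accepts filter push-down while the rest of the read path stays as it is. Only the Spark interfaces are real here; the class and the placeholder toHoodieScan are illustrative, not this PR's code.)

import org.apache.spark.sql.connector.read.{Scan, ScanBuilder, SupportsPushDownFilters}
import org.apache.spark.sql.sources.Filter

class HoodieFilterPushDownSketch extends ScanBuilder with SupportsPushDownFilters {

  private var pushed: Array[Filter] = Array.empty

  override def pushFilters(filters: Array[Filter]): Array[Filter] = {
    // Remember everything and hand it all back as post-scan filters, i.e. the
    // push-down is advisory only; a real implementation would split the set.
    pushed = filters
    filters
  }

  override def pushedFilters(): Array[Filter] = pushed

  override def build(): Scan = toHoodieScan(pushed)

  // Placeholder for whatever Scan the eventual implementation builds.
  private def toHoodieScan(filters: Array[Filter]): Scan = ???
}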

Contributor

Agreed, it's no small effort, and we should definitely think about how we can approach it incrementally.
But we also need to be mindful that we can't keep things in a transitory state for long -- we can't have the integration spread b/w V1 and V2; we have to commit and migrate fully either way, w/ a clear plan for deprecating the one we move away from.

I'd suggest that for a feature of this scale we should actually write a proper RFC to cover all of these concerns. What do you think?

Contributor

BTW this will be greatly helped by RFC-64, which we're currently working on, so we should sync and sequence our efforts to make sure there's minimal to no duplication in what we do.

// TODO: if you move this into the closure it reverts to the default values.
// If true, enable using the custom RecordReader for parquet. This only works for
// a subset of the types (no complex types).
val resultSchema: StructType = StructType(partitionSchema.fields ++ requiredSchema.fields)
Contributor

Do we really need to clone the whole of Hudi's custom ParquetFileFormat impl here?

Author

There is a minor change in how createReaderFactory is constructed; we may extend it to support more accurate task planning.

Contributor

Sorry, I'm still not sure I understand what we're copying this for. Can you please elaborate on what exactly you're trying to modify here?

We should copy code from Spark only in exceptional circumstances when there's just no other way around it. Otherwise we should avoid doing that at all costs.


spark.sql(s"set hoodie.datasource.v2.read.enable=true")

val query =String.format("SELECT f.id, f.price, f.ts, f.dt, f.name FROM %s f JOIN dim d ON f.name = d.name AND d.id = 1 ORDER BY id", tableName)
Contributor

= String

@southernriver
Author

@leesf @alexeykudinkin @XuQianJin-Stars Thanks for your time. I've been busy with other things these days, sorry for the late reply.

Collaborator

hudi-bot commented Nov 5, 2022

CI report:

Bot commands

@hudi-bot supports the following commands:
  • @hudi-bot run azure: re-run the last Azure build

new HoodieBatchScanBuilder(spark, hoodieCatalogTable, scanOptions)
}

private def buildHoodieScanConfig(caseInsensitiveStringMap: CaseInsensitiveStringMap,
Contributor

We can inline this method (it's trivial)

@alexeykudinkin alexeykudinkin changed the title [HUDI-4449] Support DataSourceV2 Read for Spark3.2 [WIP][HUDI-4449] Support DataSourceV2 Read for Spark3.2 Nov 10, 2022
@vinothchandar vinothchandar self-assigned this Feb 16, 2023
@github-actions github-actions bot added the size:L PR with lines of changes in (300, 1000] label Feb 26, 2024
Contributor

@yihua yihua left a comment

Closing this stale PR. We're revisiting Spark Datasource V2 support in a new effort. cc @geserdugarov

@yihua yihua closed this Dec 13, 2025
@github-project-automation github-project-automation bot moved this from 🆕 New to ✅ Done in Hudi PR Support Dec 13, 2025

Labels

  • big-needle-movers
  • engine:spark (Spark integration)
  • priority:critical (Production degraded; pipelines stalled)
  • size:L (PR with lines of changes in (300, 1000])
