diff --git a/rfc/README.md b/rfc/README.md
index ca9e8782accc3..a9587d1d79cf3 100644
--- a/rfc/README.md
+++ b/rfc/README.md
@@ -61,7 +61,7 @@ The list of all RFCs can be found here.
 | 35 | [Make Flink MOR table writing streaming friendly](https://cwiki.apache.org/confluence/display/HUDI/RFC-35%3A+Make+Flink+MOR+table+writing+streaming+friendly) | `UNDER REVIEW` |
 | 36 | [HUDI Metastore Server](https://cwiki.apache.org/confluence/display/HUDI/%5BWIP%5D+RFC-36%3A+HUDI+Metastore+Server) | `UNDER REVIEW` |
 | 37 | [Hudi Metadata based Bloom Index](rfc-37/rfc-37.md) | `IN PROGRESS` |
-| 38 | [Spark Datasource V2 Integration] | `UNDER REVIEW` |
+| 38 | [Spark Datasource V2 Integration](./rfc-38/rfc-38.md) | `IN PROGRESS` |
 | 39 | [Incremental source for Debezium](./rfc-39/rfc-39.md) | `IN PROGRESS` |
 | 40 | [Hudi Connector for Trino](./rfc-40/rfc-40.md) | `IN PROGRESS` |
 | 41 | [Hudi Snowflake Integration] | `UNDER REVIEW` |
diff --git a/rfc/rfc-38/1.png b/rfc/rfc-38/1.png
new file mode 100644
index 0000000000000..44238888c9b81
Binary files /dev/null and b/rfc/rfc-38/1.png differ
diff --git a/rfc/rfc-38/rfc-38.md b/rfc/rfc-38/rfc-38.md
new file mode 100644
index 0000000000000..80cb5d7f75787
--- /dev/null
+++ b/rfc/rfc-38/rfc-38.md
@@ -0,0 +1,283 @@

# RFC-38: Spark Datasource V2 Integration

## Proposers

- @leesf

## Approvers

- @vinothchandar
- @xishiyan
- @YannByron

## Status

JIRA: https://issues.apache.org/jira/browse/HUDI-1297

## Abstract

Today, Hudi still uses the DataSource V1 API and, given the flexibility of RDDs, relies heavily on the RDD API for indexing, repartitioning, and so on. This works well under V1: through the DataSource V1 API, Hudi provides complete read, write, update, and automatic small-file handling capabilities. However, the DataSource V2 API has kept evolving and has now stabilized. Considering that the V1 API is aging and the Spark community no longer invests significant effort in maintaining it, this RFC proposes migrating to the DataSource V2 API, taking advantage of the richer pushdown filters it offers and integrating with [RFC-27](https://cwiki.apache.org/confluence/display/HUDI/RFC-27+Data+skipping+index+to+improve+query+performance) to provide more powerful query capabilities. It also lets Hudi benefit as the V2 API continues to evolve and be optimized.

## Background

The current Hudi read and write paths use the DataSource V1 API; the implementation class is `DefaultSource`:

```scala
/**
 * Hoodie Spark Datasource, for reading and writing hoodie tables
 */
class DefaultSource extends RelationProvider
  with SchemaRelationProvider
  with CreatableRelationProvider
  with DataSourceRegister
  with StreamSinkProvider
  with StreamSourceProvider
  with Serializable {
  ...
}
```
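This provider is exercised through the familiar `format("hudi")` calls. A minimal sketch of such an invocation (the option values, `inputDF`, and the base path are made-up examples for illustration):

```scala
import org.apache.spark.sql.SaveMode

// Illustrative only: a typical batch write that goes through the V1 DefaultSource path.
// inputDF is an existing DataFrame; table name, field names and path are examples.
inputDF.write.format("hudi")
  .option("hoodie.datasource.write.recordkey.field", "uuid")
  .option("hoodie.datasource.write.precombine.field", "ts")
  .option("hoodie.table.name", "example_table")
  .mode(SaveMode.Append)
  .save("/tmp/example_table")
```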
For batch writing, Spark invokes the following `createRelation` overload:

```scala
override def createRelation(sqlContext: SQLContext,
                            mode: SaveMode,
                            optParams: Map[String, String],
                            df: DataFrame): BaseRelation = {
  val parameters = HoodieWriterUtils.parametersWithWriteDefaults(optParams)
  val translatedOptions = DataSourceWriteOptions.translateSqlOptions(parameters)
  val dfWithoutMetaCols = df.drop(HoodieRecord.HOODIE_META_COLUMNS.asScala: _*)

  if (translatedOptions(OPERATION.key).equals(BOOTSTRAP_OPERATION_OPT_VAL)) {
    HoodieSparkSqlWriter.bootstrap(sqlContext, mode, translatedOptions, dfWithoutMetaCols)
  } else {
    HoodieSparkSqlWriter.write(sqlContext, mode, translatedOptions, dfWithoutMetaCols)
  }
  new HoodieEmptyRelation(sqlContext, dfWithoutMetaCols.schema)
}
```

For querying, the following method returns a `BaseRelation` (when no schema is provided):

```scala
override def createRelation(sqlContext: SQLContext,
                            parameters: Map[String, String]): BaseRelation = {
  createRelation(sqlContext, parameters, null)
}
```

For streaming writing and reading, `DefaultSource#createSink` and `DefaultSource#createSource` are called respectively.
In version 0.9.0, the bulk_insert row mode was introduced to speed up bulk_insert; it implements the `SupportsWrite` V2 API and uses `HoodieDataSourceInternalTable` for writing, but only the bulk_insert operation is supported at the moment.

## Implementation

Spark provides a complete V2 API, including `CatalogPlugin`, `SupportsWrite`, `SupportsRead`, and various pushdown interfaces such as `SupportsPushDownFilters`, `SupportsPushDownAggregates`, and `SupportsPushDownRequiredColumns`.

We would define the key abstraction, called `HoodieInternalV2Table`, which implements the `Table`, `SupportsWrite`, and `SupportsRead` interfaces to provide writing and reading capabilities.

### Writing Path

Hudi relies heavily on RDD APIs on the write path, for example to index records and determine whether a record is an update or an insert. Migrating this logic to a pure V2 write path would be a very large (possibly infeasible) refactoring, so we fall back to the V1 write path, since Spark 3.2.0 provides the `V1Write` interface to bridge the V1 and V2 APIs.

The writing path code snippet is shown below:

```scala
class HoodieInternalV2Table extends Table with SupportsWrite with V2TableWithV1Fallback {

  override def name(): String = {
    // table name
  }

  override def schema(): StructType = {
    // get hudi table schema
  }

  override def partitioning(): Array[Transform] = {
    // get partitioning of hudi table
  }

  override def capabilities(): Set[TableCapability] = {
    // Set(BATCH_WRITE, BATCH_READ, TRUNCATE, ...)
  }

  override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder = {
    // HoodieV1WriteBuilder
  }

  override def v1Table: CatalogTable = {
    // the V1 CatalogTable used when falling back, required by V2TableWithV1Fallback
  }
}
```
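Once `HoodieInternalV2Table` is resolvable through the catalog, batch writes issued through SQL or the `DataFrameWriterV2` API are routed to its `newWriteBuilder`. A hypothetical invocation (the database, table, and column names are made up, and `spark` is an existing `SparkSession`):

```scala
// Hypothetical usage once the V2 table can be resolved through the catalog;
// both forms end up calling HoodieInternalV2Table#newWriteBuilder.
val df = spark.range(10).selectExpr("id AS uuid", "current_timestamp() AS ts")
df.writeTo("hudi_db.hudi_table").append()
// or: spark.sql("INSERT INTO hudi_db.hudi_table SELECT ...")
```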
The definition of `HoodieV1WriteBuilder` is shown below:

```scala
private class HoodieV1WriteBuilder(writeOptions: CaseInsensitiveStringMap,
                                   hoodieCatalogTable: HoodieCatalogTable,
                                   spark: SparkSession)
  extends SupportsTruncate with SupportsOverwrite with ProvidesHoodieConfig {

  override def truncate(): HoodieV1WriteBuilder = {
    this
  }

  override def overwrite(filters: Array[Filter]): WriteBuilder = {
    this
  }

  override def build(): V1Write = new V1Write {
    override def toInsertableRelation: InsertableRelation = {
      // return an InsertableRelation that delegates to the existing V1 write path
      // (e.g. HoodieSparkSqlWriter.write)
    }
  }
}
```

### Querying Path

For V2 querying, Spark provides various pushdown interfaces, such as `SupportsPushDownFilters`, `SupportsPushDownAggregates`, `SupportsPushDownRequiredColumns`, `SupportsRuntimeFiltering` and so on, which are clearer and more flexible than the V1 interface. The V2 interface also provides the capability to read columnar files such as Parquet and ORC, and it lets the data source define its own splits and number of partitions, which opens the door to more accurate splits and faster queries on the Hudi side.
However, in the first stage we also fall back to the V1 read path, which means we convert the `DataSourceV2Relation` into a `DefaultSource`-based relation at analysis time to keep the changes well controlled.
The code snippet is shown below; the `HoodieSpark3Analysis` rule should be injected when the Spark version is 3.2.0 or later.

```scala
case class HoodieSpark3Analysis(sparkSession: SparkSession) extends Rule[LogicalPlan]
  with SparkAdapterSupport with ProvidesHoodieConfig {

  override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsDown {
    case dsv2 @ DataSourceV2Relation(d: HoodieInternalV2Table, _, _, _, _) =>
      val output = dsv2.output
      val catalogTable = if (d.catalogTable.isDefined) {
        Some(d.v1Table)
      } else {
        None
      }
      val relation = new DefaultSource().createRelation(new SQLContext(sparkSession),
        buildHoodieConfig(d.hoodieCatalogTable))
      LogicalRelation(relation, output, catalogTable, isStreaming = false)
  }
}
```

In the second stage, we would make use of the V2 reading interface and define `HoodieBatchScanBuilder` to provide the querying capability. The workflow of the querying process is shown in the figure below: the `PartitionReaderFactory` lives in the driver and the `PartitionReader` runs in the executors.

![](./1.png)

The querying path code sample is shown below:

```scala
class HoodieBatchScanBuilder extends ScanBuilder with SupportsPushDownFilters with SupportsPushDownRequiredColumns {
  override def build(): Scan = {
    // HoodieScan
  }

  override def pushFilters(filters: Array[Filter]): Array[Filter] = {
    // record the filters
  }

  override def pushedFilters(): Array[Filter] = {
    // pushed filters
  }

  override def pruneColumns(requiredSchema: StructType): Unit = {
    // record the pruned columns
  }
}
```
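To make the driver/executor split in the figure concrete, here is a minimal sketch of how the second-stage scan chain could be wired. The `HoodieScan` and `HoodiePartitionReaderFactory` names and the split-planning choice are illustrative assumptions, not interfaces fixed by this RFC:

```scala
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.connector.read.{Batch, InputPartition, PartitionReader, PartitionReaderFactory, Scan}
import org.apache.spark.sql.sources.Filter
import org.apache.spark.sql.types.StructType

// Sketch only: shows where each DataSource V2 read component runs.
class HoodieScan(requiredSchema: StructType, pushedFilters: Array[Filter]) extends Scan with Batch {

  override def readSchema(): StructType = requiredSchema

  override def toBatch: Batch = this

  // Planned on the driver: e.g. one InputPartition per file slice of the hudi table.
  override def planInputPartitions(): Array[InputPartition] = {
    // list file slices and wrap each one into a serializable InputPartition
    Array.empty
  }

  // Created on the driver, then serialized and shipped to the executors.
  override def createReaderFactory(): PartitionReaderFactory =
    new HoodiePartitionReaderFactory(requiredSchema, pushedFilters)
}

class HoodiePartitionReaderFactory(schema: StructType, filters: Array[Filter])
  extends PartitionReaderFactory {

  // Runs on the executor: builds the reader that iterates a single split.
  override def createReader(partition: InputPartition): PartitionReader[InternalRow] = {
    // open the base file (plus log files for MOR) of the split and iterate InternalRows
    new PartitionReader[InternalRow] {
      override def next(): Boolean = false
      override def get(): InternalRow = throw new NoSuchElementException()
      override def close(): Unit = {}
    }
  }
}
```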
### Table Meta Management

We implement the `CatalogPlugin` interface to manage the metadata of Hudi tables and define the core abstraction, called `HoodieCatalog`; the code sample is shown below:

```scala
class HoodieCatalog extends DelegatingCatalogExtension
  with StagingTableCatalog {

  override def loadTable(ident: Identifier): Table = {
    // HoodieDatasourceTable
  }

  override def createTable(ident: Identifier,
                           schema: StructType,
                           partitions: Array[Transform],
                           properties: util.Map[String, String]): Table = {
    // create hudi table
  }

  override def dropTable(ident: Identifier): Boolean = {
    // drop hudi table
  }

  override def alterTable(ident: Identifier, changes: TableChange*): Table = {
    // check schema compatibility
    // HoodieDatasourceTable
  }

  override def stageReplace(ident: Identifier,
                            schema: StructType,
                            partitions: Array[Transform],
                            properties: util.Map[String, String]): StagedTable = {
    // StagedHoodieTable
  }

  override def stageCreateOrReplace(ident: Identifier,
                                    schema: StructType,
                                    partitions: Array[Transform],
                                    properties: util.Map[String, String]): StagedTable = {
    // StagedHoodieTable
  }
}
```

Users would set the Spark session config `spark.sql.catalog.spark_catalog` to `org.apache.hudi.catalog.HoodieCatalog` to load the `HoodieCatalog` and manage Hudi tables through it (a configuration sketch is given at the end of this document).

## Rollout/Adoption Plan

- What impact (if any) will there be on existing users?

There is no impact on existing users, but users would specify the new catalog to manage Hudi tables or other tables.

- If we are changing behavior how will we phase out the older behavior?

We should keep compatibility with the V1 version and make the migration to the V2 API transparent for users.

## Test Plan

- [ ] PoC for catalog plugin
- [ ] PoC for writing path with UTs
- [ ] PoC for querying path with UTs
- [ ] E2E tests
- [ ] Benchmark for V1 and V2 writing and querying
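For reference, the configuration sketch referred to in the Table Meta Management section: a minimal way a user might enable the proposed catalog, assuming it ends up under the fully-qualified name `org.apache.hudi.catalog.HoodieCatalog` proposed above:

```scala
import org.apache.spark.sql.SparkSession

// Minimal sketch: register the proposed HoodieCatalog as the session catalog.
// The class name follows the proposal above and may change during implementation.
val spark = SparkSession.builder()
  .appName("hudi-dsv2-catalog-example")
  .config("spark.sql.catalog.spark_catalog", "org.apache.hudi.catalog.HoodieCatalog")
  // Hudi SQL support also relies on its session extension being installed.
  .config("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension")
  .getOrCreate()
```

Because `HoodieCatalog` extends `DelegatingCatalogExtension`, registering it as `spark_catalog` keeps existing table identifiers resolving as before, which matches the compatibility goal in the Rollout/Adoption Plan.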