
Conversation

@yui2010 (Contributor) commented Dec 25, 2020


What is the purpose of the pull request

Implement partition pruning to skip unneeded data.

Brief change log

  • Based on the current datasource implementation of the MOR snapshot query
  • Reuse PartitioningAwareFileIndex#listFiles

Verify this pull request

(Please pick either of the following options)

This pull request is a trivial rework / code cleanup without any test coverage.

(or)

This pull request is already covered by existing tests, such as (please describe tests).

(or)

This change added tests and can be verified as follows:

(example:)

  • Added integration tests for end-to-end.
  • Added HoodieClientWriteTest to verify the change.
  • Manually verified the change by running a job locally.

Committer checklist

  • Has a corresponding JIRA in PR title & commit

  • Commit message is descriptive of the change

  • CI is green

  • Necessary doc changes done or have another open PR

  • For large changes, please consider breaking it into sub-tasks under an umbrella JIRA.

@codecov-io commented Dec 25, 2020

Codecov Report

Merging #2378 (295290e) into master (97864a4) will increase coverage by 18.23%.
The diff coverage is n/a.

Impacted file tree graph

@@              Coverage Diff              @@
##             master    #2378       +/-   ##
=============================================
+ Coverage     51.19%   69.43%   +18.23%     
+ Complexity     3226      363     -2863     
=============================================
  Files           438       53      -385     
  Lines         20089     1963    -18126     
  Branches       2068      235     -1833     
=============================================
- Hits          10285     1363     -8922     
+ Misses         8958      466     -8492     
+ Partials        846      134      -712     
| Flag | Coverage Δ | Complexity Δ |
| --- | --- | --- |
| hudicli | ? | ? |
| hudiclient | ? | ? |
| hudicommon | ? | ? |
| hudiflink | ? | ? |
| hudihadoopmr | ? | ? |
| hudisparkdatasource | ? | ? |
| hudisync | ? | ? |
| huditimelineservice | ? | ? |
| hudiutilities | 69.43% <ø> (-0.04%) | 0.00 <ø> (ø) |

Flags with carried forward coverage won't be shown.

| Impacted Files | Coverage Δ | Complexity Δ |
| --- | --- | --- |
| ...hudi/utilities/sources/helpers/KafkaOffsetGen.java | 85.84% <0.00%> (-2.94%) | 20.00% <0.00%> (+4.00%) ⬇️ |
| ...in/java/org/apache/hudi/utilities/UtilHelpers.java | 64.53% <0.00%> (-1.17%) | 33.00% <0.00%> (+1.00%) ⬇️ |
| ...ies/sources/helpers/DatePartitionPathSelector.java | 54.68% <0.00%> (-0.16%) | 13.00% <0.00%> (ø%) |
| ...apache/hudi/utilities/deltastreamer/DeltaSync.java | 70.00% <0.00%> (ø) | 52.00% <0.00%> (+2.00%) |
| ...s/deltastreamer/HoodieMultiTableDeltaStreamer.java | 78.39% <0.00%> (ø) | 18.00% <0.00%> (ø%) |
| ...rg/apache/hudi/metadata/HoodieMetadataPayload.java | | |
| ...pache/hudi/hadoop/realtime/HoodieParquetSerde.java | | |
| ...i/common/util/collection/ExternalSpillableMap.java | | |
| .../src/main/java/org/apache/hudi/dla/util/Utils.java | | |
| ...n/java/org/apache/hudi/hadoop/InputSplitUtils.java | | |

... and 376 more

@garyli1019 (Member) commented
Hi @yui2010, thanks for your contribution. I am trying to understand the context a bit better.
Is there any issue with the current implementation that prevents the filters from being pushed down?
What is the advantage of using CatalystScan over PrunedFilteredScan? I am a little bit concerned about the Unstable marker on this API.

@yui2010 (Contributor, Author) commented Dec 28, 2020

Hi @garyli1019, sorry for the late reply.

1. About partition pruning: it skips unneeded data. For example, given the following partitions:
   /hudi_ws/order/dt=20200801
   /hudi_ws/order/dt=20200802
   ... ...
   /hudi_ws/order/dt=20200831
   a query like "select * from order where dt > '20200820'" will start 31 tasks, and the 20 tasks whose partitions are in
   {20200801,...,20200820} do not need to run. If we support partition pruning (currently implemented in Spark by 1. the built-in
   FileSourceStrategy and 2. the Spark 3 DataSource V2, see https://issues.apache.org/jira/browse/SPARK-30428), the unneeded
   partition data is skipped and only the 11 tasks whose partitions are in {20200821,...,20200831} run, which is more efficient.

2. About 'CatalystScan rather than PrunedFilteredScan':
   This is just for the convenience of passing the partitionFilters parameter to the listFiles method:
   PartitioningAwareFileIndex#listFiles(partitionFilters: Seq[Expression], dataFilters: Seq[Expression])
   CatalystScan is marked 'Experimental', as you mentioned.
   Both CatalystScan and PrunedFilteredScan are planned through DataSourceStrategy. I investigated it; there is no special logic,
   they just pass different parameters, and there is a use case here: https://github.com/Huawei-Spark/Spark-SQL-on-HBase/blob/master/src/main/scala/org/apache/spark/sql/hbase/HBaseRelation.scala
   While implementing this I was also unsure about: 1. FileSourceStrategy applies partitionKeyFilters but DataSourceStrategy does not;
   2. whether to use CatalystScan or PrunedFilteredScan; 3. how to split allPredicates into partition filters and data filters.
   Do you have any suggestions on supporting partition pruning? A sketch of the intended split follows.
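For illustration, a minimal sketch (trait and helper names here are hypothetical, not the PR code) of how a CatalystScan-based relation can split the pushed-down Expressions and hand the partition filters to PartitioningAwareFileIndex#listFiles:

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
import org.apache.spark.sql.execution.datasources.{PartitionDirectory, PartitioningAwareFileIndex}
import org.apache.spark.sql.sources.{BaseRelation, CatalystScan}

trait PartitionPruningScan extends BaseRelation with CatalystScan {
  def fileIndex: PartitioningAwareFileIndex
  def partitionColumnNames: Set[String]

  // CatalystScan hands us catalyst Expressions, so predicates that reference only
  // partition columns can be separated out and used to prune whole directories.
  override def buildScan(requiredColumns: Seq[Attribute], filters: Seq[Expression]): RDD[Row] = {
    val (partitionFilters, dataFilters) =
      filters.partition(_.references.forall(a => partitionColumnNames.contains(a.name)))
    val prunedPartitions: Seq[PartitionDirectory] = fileIndex.listFiles(partitionFilters, dataFilters)
    // only files under the surviving partitions would feed the MOR file splits
    buildSplitsRDD(prunedPartitions, requiredColumns, dataFilters)
  }

  // hypothetical helper that turns the pruned listing into the scan RDD
  protected def buildSplitsRDD(partitions: Seq[PartitionDirectory],
                               requiredColumns: Seq[Attribute],
                               dataFilters: Seq[Expression]): RDD[Row]
}
```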

@garyli1019 (Member) left a comment

@yui2010 Thanks for your explanation. Currently, PrunedFilteredScan only supports filter pushdown and column pruning, but doesn't handle partition pruning. And I guess CatalystScan will just do the same here?

IMO, if we need to support partition pruning, we will need to add partition information to the PartitionedFile when we MergeOnReadSnapshotRelation.buildFileIndex(), and pass the partitionSchema into the parquet reader, so that we know which file comes from which partition. Currently, when we list the files, we don't know where each file comes from, so I guess we can't actually prune the files based on the partition? WDYT?

val requiredSchemaParquetReader = new ParquetFileFormat().buildReaderWithPartitionValues(
sparkSession = sqlContext.sparkSession,
dataSchema = tableStructSchema,
partitionSchema = StructType(Nil),
Member:

I guess we need to somehow handle the partition here.
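A rough sketch of the suggested change, assuming hypothetical helper and parameter names: pass the inferred partition schema instead of StructType(Nil), so the parquet reader can append the partition values to each row:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.datasources.PartitionedFile
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
import org.apache.spark.sql.sources.Filter
import org.apache.spark.sql.types.StructType

// Hypothetical helper: build the base-file reader with the inferred partition schema
// instead of StructType(Nil), so the reader appends partition values to each row.
def buildBaseFileReader(spark: SparkSession,
                        tableStructSchema: StructType,
                        partitionSchema: StructType,   // e.g. a "dt" StringType field inferred from dt=YYYYMMDD folders
                        requiredStructSchema: StructType,
                        pushedFilters: Seq[Filter],
                        options: Map[String, String]): PartitionedFile => Iterator[InternalRow] = {
  new ParquetFileFormat().buildReaderWithPartitionValues(
    sparkSession = spark,
    dataSchema = tableStructSchema,
    partitionSchema = partitionSchema,       // was StructType(Nil) in the snippet above
    requiredSchema = requiredStructSchema,
    filters = pushedFilters,
    options = options,
    hadoopConf = spark.sessionState.newHadoopConf()
  )
}
```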

@yui2010 (Contributor, Author), Jan 6, 2021:

Hi @garyli1019, sorry for the late reply.
I don't completely understand the part "if we need to support the partition pruning, we will need to add partition information to the partitionedFile" that you mentioned.

I think we can support partition pruning in buildFileIndex without additional info, unless I misunderstood your description.

The current implementation supports partition pruning with two main steps (a condensed sketch follows the example below):

  1. val inMemoryFileIndex = HoodieSparkUtils.createInMemoryFileIndex(sqlContext.sparkSession, globPaths)

     InMemoryFileIndex#refresh0 will list all the leaf files, for example:

     /hudi/data/order/dt=20200610/a.parquet
     /hudi/data/order/dt=20200807/b.parquet
     /hudi/data/order/dt=20200808/c.parquet
     /hudi/data/order/dt=20200808/c_1.log
     /hudi/data/order/dt=20200808/d.parquet
     /hudi/data/order/dt=20200809/e.parquet

  2. inMemoryFileIndex.listFiles(filters, filters)

     PartitioningAwareFileIndex#inferPartitioning() will infer the partition columns and path info, for example:

       PartitionSpec(
         partitionColumns = StructType(
           StructField(name = "dt", dataType = StringType, nullable = true)),
         partitions = Seq(
           Partition(values = Row("20200610"), path = "hdfs://hudi/data/order/dt=20200610"),
           Partition(values = Row("20200807"), path = "hdfs://hudi/data/order/dt=20200807"),
           Partition(values = Row("20200808"), path = "hdfs://hudi/data/order/dt=20200808"),
           Partition(values = Row("20200809"), path = "hdfs://hudi/data/order/dt=20200809")))

     PartitioningAwareFileIndex#prunePartitions will then filter out the unneeded paths through the partitionFilters parameter.

If our SQL is "select rkey,date from test_order where dt == '20200808'", we will get Partition(values = Row("20200808"), path = "hdfs://hudi/data/order/dt=20200808"),
and MergeOnReadSnapshotRelation#buildFileIndex will return two HoodieMergeOnReadFileSplits (c.parquet and d.parquet).
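A condensed sketch of these two steps (the dt filter is just the example predicate above; HoodieSparkUtils.createInMemoryFileIndex is the helper added in this PR, other names are illustrative):

```scala
import org.apache.hadoop.fs.Path
import org.apache.hudi.HoodieSparkUtils
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, Literal}
import org.apache.spark.sql.execution.datasources.PartitionDirectory
import org.apache.spark.sql.types.StringType

def listPrunedPartitions(spark: SparkSession, globPaths: Seq[Path]): Seq[PartitionDirectory] = {
  // step 1: list all leaf files under the glob paths
  val inMemoryFileIndex = HoodieSparkUtils.createInMemoryFileIndex(spark, globPaths)

  // dt = '20200808' expressed as a catalyst Expression over the inferred partition column
  val dtFilter = EqualTo(AttributeReference("dt", StringType, nullable = true)(), Literal("20200808"))

  // step 2: listFiles -> inferPartitioning + prunePartitions; only dt=20200808 survives,
  // so buildFileIndex only builds the two HoodieMergeOnReadFileSplits (c.parquet and d.parquet)
  inMemoryFileIndex.listFiles(Seq(dtFilter), Seq(dtFilter))
}
```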

Some test-case log output follows:

12657 [main] INFO  org.apache.spark.sql.execution.datasources.InMemoryFileIndex  - Selected 1 partitions out of 4, pruned 75.0% partitions.
12711 [main] INFO  org.apache.hudi.common.table.view.HoodieTableFileSystemView  - Adding file-groups for partition :dt=20200808, #FileGroups=2
73169 [main] INFO  org.apache.spark.scheduler.DAGScheduler  - Job 1 finished: show at HudiAnalyticSql.java:281, took 24.457325 s
+----+-------------------+
|rkey|date               |
+----+-------------------+
|4   |2020-08-08 09:58:48|
|20  |2020-08-08 16:55:59|
|21  |2020-08-08 17:53:59|
+----+-------------------+

So we can support partition pruning in buildFileIndex without additional info.
You also mentioned "pass the partitionSchema into the parquet reader": do you mean the query result should include the partition columns?

I agree with you that PrunedFilteredScan and CatalystScan only support filter pushdown and column pruning.

About the implementation in Spark 2.x DataSource V1 and V2 (not supported), we have two options:
1. In order to reuse PartitioningAwareFileIndex#listFiles, convert filter: Array[Filter] to filter: Seq[Expression] (see the sketch below).
2. Implement our own listFiles(filter: Array[Filter]).
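A rough sketch of option 1 with a hypothetical helper (only a couple of Filter types shown): translate the Filters that reference partition columns into catalyst Expressions so that PartitioningAwareFileIndex#listFiles can be reused:

```scala
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression, Literal}
import org.apache.spark.sql.catalyst.expressions.{EqualTo => CatalystEqualTo, GreaterThan => CatalystGreaterThan}
import org.apache.spark.sql.sources
import org.apache.spark.sql.types.StructType

// hypothetical helper: only filters that reference a partition column are translated
def toPartitionExpressions(filters: Array[sources.Filter], partitionSchema: StructType): Seq[Expression] = {
  val partitionAttrs = partitionSchema.fields
    .map(f => f.name -> AttributeReference(f.name, f.dataType, f.nullable)())
    .toMap
  filters.toSeq.collect {
    case sources.EqualTo(attr, value) if partitionAttrs.contains(attr) =>
      CatalystEqualTo(partitionAttrs(attr), Literal(value))
    case sources.GreaterThan(attr, value) if partitionAttrs.contains(attr) =>
      CatalystGreaterThan(partitionAttrs(attr), Literal(value))
    // other Filter types would be translated similarly
  }
}
```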

About the implementation in Spark 3.x (supported, https://issues.apache.org/jira/browse/SPARK-30428),
we can implement a MergeOnReadSnapshotScan like CSVScan.

The current implementation is a simplified way (it has been deployed to our test cluster). Shall we discuss the details further?

Member:

Hi @yui2010, what does your dataset look like? Does it have a dt column? The partitioning I am referring to is: when you spark.read.format('hudi').load(basePath) and your dataset folder structure looks like basePath/dt=20201010, then Spark is able to append a dt column to your dataset. When you do something like df.filter(dt=20201010), Spark will go to this partition and read the files. What is your workflow for loading your data and passing the partition information to Spark?
In order to get more information about this implementation, would you write a test to demo the partition pruning?

Contributor (Author):

Hi @garyli1019, very sorry for the late reply. I've been very busy these days because of the end of the year.
I have submitted a test case, testPrunePartitions, to demo this.

val globPaths = globPathIfNecessary(fs, qualified)
globPaths
})
val filteredGlobPaths = globPaths.filterNot( path => TablePathUtils.isHoodieMetaPath(path.toString) || shouldFilterOut(path.getName))
@yui2010 (Contributor, Author), Feb 4, 2021:

There are two reasons for filtering out the hoodie meta path:

  1. If our load path looks like basePath/*/*, it will load all the .hoodie/*.deltacommit files and cause Spark to do many fs.listStatus calls, which is inefficient.
  2. Loading all the .hoodie/*.deltacommit files will cause an exception, because discoveredBasePaths.distinct.size is 2 when we use Spark's listFiles to prune partitions.

Contributor:

Minor: why two lines? Just
globPaths.filterNot(path => TablePathUtils.isHoodieMetaPath(path.toString) || shouldFilterOut(path.getName))
as the last line should suffice?

@nsivabalan (Contributor) left a comment

I don't have much experience with this code. Will let Gary preside over the review.

return getTablePathFromPartitionPath(fs, directory);
}

public static Boolean isHoodieMetaPath(String path) {
Contributor:

minor: isHoodieMeta"data"Path

@nsivabalan added the priority:critical (Production degraded; pipelines stalled) label on Feb 11, 2021
* @return list of absolute file paths
*/
def checkAndGlobPathIfNecessary(paths: Seq[String], fs: FileSystem): Seq[Path] = {
val globPaths =
Contributor:

Can we enhance the javadocs (lines 76, 77) to convey that we do partition pruning within this method as well?


new InMemoryFileIndex(sparkSession, globbedPaths, Map(), Option.empty, fileStatusCache)
}

def createInMemoryFileIndex(sparkSession: SparkSession, userSpecifiedSchema: Option[StructType], parameters: Map[String, String], globbedPaths: Seq[Path]): InMemoryFileIndex = {
Contributor:

Do we still need the other method (lines 94 to 97) since we have this new method?

}

// just for test
def getFileIndexPaths = fileIndex.map(x => x.dataFile.get.filePath)
Contributor:

Is there an annotation like VisibleOnlyForTests, as we have in Java? Not a big fan of introducing public methods in source code just for test purposes.

import org.apache.spark.sql.execution.datasources.DataSourceStrategy
import org.apache.spark.sql.sources.{BaseRelation, Filter}

object PushDownUtils {
Contributor:

java docs.

val globPaths: Seq[Path],
val metaClient: HoodieTableMetaClient)
extends BaseRelation with PrunedFilteredScan with Logging {
extends BaseRelation with CatalystScan with Logging {
Contributor:

@garyli1019: what's the counterpart of this file for COW? I understand this PR is for MOR, but just for my understanding.

Member:

We don't have an independent class counterpart for COW, since we just need to pass a filter to a normal parquet reader.
https://github.com/apache/hudi/blob/master/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/DefaultSource.scala#L165

dataGen.setPartitionPaths(Array("20150316","20150317","20160315"));
val records1 = recordsToStrings(dataGen.generateInserts("001", 100)).toList
val inputDF1 = spark.read.json(spark.sparkContext.parallelize(records1, 2))
inputDF1.write.format("org.apache.hudi")
Member:

Thanks for the test, I see how it works now, so we are actually talking about two different partition pruning mechanisms.
In this PR, the input data frame already has a partition field before the write, and Spark maps the partition field to the actual partition folder in the file system.
What I was talking about is the case where the input data frame does not have a partition field, and Spark appends a partition field while reading. This is how the partition discovery of parquet files works: https://spark.apache.org/docs/2.4.0/sql-data-sources-parquet.html#partition-discovery. If the data frame has a field with the same name, then it will throw an error.
@yui2010 Did I understand correctly?
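For illustration, the read pattern being described (the base path and the dt column are placeholder examples):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

val spark = SparkSession.builder().master("local[*]").appName("partition-pruning-demo").getOrCreate()
val basePath = "/tmp/hudi_trips"  // folder layout: basePath/dt=20201010/..., basePath/dt=20201011/...

val df = spark.read.format("hudi").load(basePath)  // partition discovery appends a `dt` column
val pruned = df.filter(col("dt") === "20201010")   // ideally only files under dt=20201010 are scanned
```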

@yui2010 (Contributor, Author), Mar 8, 2021:

Hi @garyli1019 and @nsivabalan,
yes, the case where the partition column schema is not part of the data schema does in fact exist, and I wrote a test case (testPrunePartitionsWithAppendedPartitionsSchema) to cover it.

// we have three partition values: "2015/03/16", "2015/03/17", "2016/03/15"; the path view is as follows:
// path
// └── to
//     └── table
//         ├── year=2015
//         │   └── month=03
//         │       ├── day=16
//         │       │   └── data.parquet
//         │       └── day=17
//         │           └── data.parquet
//         └── year=2016
//             └── month=03
//                 └── day=15
//                     └── data.parquet

And I have a problem with the misplacement of the returned data:

val sql = "select month, seconds_since_epoch, year from mor_test_partition_table where year < '2016' and month > '01'"   (selecting seconds_since_epoch, year, month instead is ok)
spark.sql(sql).show(1)

It returned an incorrect result:

+----------+-------------------+----+
|month     |seconds_since_epoch|year|
+----------+-------------------+----+
|1556774336|2015               |3   |
+----------+-------------------+----+

When we use spark.read.format("parquet"):

val sql = "select month, seconds_since_epoch, year from mor_test_partition_table where year < '2016' and month > '01'"
spark.sql(sql).show(1)

it returned the correct result:

+-----+-------------------+----+
|month|seconds_since_epoch|year|
+-----+-------------------+----+
|3    |7574769756561934803|2015|
+-----+-------------------+----+

I investigated the Spark code and found that the cause of this incorrect result is the difference between RowDataSourceScanExec (data source scans, e.g. format("org.apache.hudi")) and FileSourceScanExec (Spark native scans, e.g. format("parquet")):

  1. When Spark queries parquet, the ParquetRecordReader returns the InternalRow values for the data schema columns (not the partition schema columns) and appends the partition values:

VectorizedParquetRecordReader

    if (partitionColumns != null) {
      int partitionIdx = sparkSchema.fields().length;
      for (int i = 0; i < partitionColumns.fields().length; i++) {
        ColumnVectorUtils.populate(columnVectors[i + partitionIdx], partitionValues, i);
        columnVectors[i + partitionIdx].setIsConstant();
      }
    }
  2. RowDataSourceScanExec and FileSourceScanExec both have output: Seq[Attribute], and there is a default correspondence rule: the order of the columnVectors corresponds to the order of the output attributes.

FileSourceScanExec uses the readDataColumns ++ partitionColumns order:

      val readDataColumns =
        dataColumns
          .filter(requiredAttributes.contains)
          .filterNot(partitionColumns.contains)
      val outputSchema = readDataColumns.toStructType
      logInfo(s"Output Data Schema: ${outputSchema.simpleString(5)}")

      val outputAttributes = readDataColumns ++ partitionColumns

      val scan =
        FileSourceScanExec(
          fsRelation,
          outputAttributes,
          outputSchema,
          partitionKeyFilters.toSeq,
          bucketSet,
          dataFilters,
          table.map(_.identifier))

such as: columnVectors[0] ~ output(0), columnVectors[1] ~ output(1), ...

RowDataSourceScanExec uses the requiredColumns order:

/** Physical plan node for scanning data from a relation. */
case class RowDataSourceScanExec(
    fullOutput: Seq[Attribute],
    requiredColumnsIndex: Seq[Int],
    filters: Set[Filter],
    handledFilters: Set[Filter],
    rdd: RDD[InternalRow],
    @transient relation: BaseRelation,
    override val tableIdentifier: Option[TableIdentifier])
  extends DataSourceScanExec {

  def output: Seq[Attribute] = requiredColumnsIndex.map(fullOutput)
  3. BufferedRowIterator will rebuild the InternalRow value order from the projectList's index into output:
     val exprs = projectList.map(x => BindReferences.bindReference[Expression](x, child.output))

So FileSourceScanExec, using the readDataColumns ++ partitionColumns order, returns the correct result:

project_month ~ output(2) ~ columnVectors[2]
project_seconds_since_epoch ~ output(0) ~ columnVectors[0]
project_year ~ output(1) ~ columnVectors[1]

+-----+-------------------+----+
|month|seconds_since_epoch|year|
+-----+-------------------+----+
|3    |7574769756561934803|2015|
+-----+-------------------+----+

RowDataSourceScanExec, using the requiredColumns order, returns an incorrect result:

project_month ~ output(0) ~ columnVectors[0] ~ seconds_since_epoch value
project_seconds_since_epoch ~ output(1) ~ columnVectors[1] ~ year value
project_year ~ output(2) ~ columnVectors[2] ~ month value

+----------+-------------------+----+
|month     |seconds_since_epoch|year|
+----------+-------------------+----+
|1556774336|2015               |3   |
+----------+-------------------+----+

So it is very difficult to achieve this with the Spark 2.x DataSource V1 code (there is no API to override RowDataSourceScanExec's output attributes), if I understand correctly.
I also researched DataSource V2; some information follows:

  1. DataSource V2 API in Spark 2.x: https://issues.apache.org/jira/browse/SPARK-15689
     We can implement a HoodieFileDataSourceReader#readSchema method, and after pruneColumns we can return the readDataColumns ++ partitionColumns order (see the sketch below).

  2. DataSource V2 API in Spark 3.x: https://issues.apache.org/jira/browse/SPARK-22386
     The implementation may be easier because there are already some native cases: https://issues.apache.org/jira/browse/SPARK-30428

So will the community consider introducing V2 in the long run?
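For reference, a minimal sketch of the Spark 2.4 DataSource V2 reader shape mentioned in point 1 above (all names are illustrative, not actual Hudi code): readSchema can return the pruned data columns followed by the partition columns, so the scan's output order matches the rows that are produced:

```scala
import java.util.{Collections, List => JList}

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.sources.v2.reader.{DataSourceReader, InputPartition, SupportsPushDownRequiredColumns}
import org.apache.spark.sql.types.StructType

class HoodieFileDataSourceReader(dataSchema: StructType, partitionSchema: StructType)
  extends DataSourceReader with SupportsPushDownRequiredColumns {

  private var requiredDataSchema: StructType = dataSchema

  override def pruneColumns(requiredSchema: StructType): Unit = {
    // keep only the requested data columns; the partition columns are appended in readSchema
    requiredDataSchema = StructType(requiredSchema.filterNot(f => partitionSchema.fieldNames.contains(f.name)))
  }

  // readDataColumns ++ partitionColumns order, like FileSourceScanExec
  override def readSchema(): StructType = StructType(requiredDataSchema ++ partitionSchema)

  override def planInputPartitions(): JList[InputPartition[InternalRow]] =
    Collections.emptyList[InputPartition[InternalRow]]() // file listing / split planning would go here
}
```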

Member:

Hi @yui2010, thanks for this detailed explanation. Will look into it in the next few days.
Regarding DataSource V2: yes, the community is planning to rewrite the data source API in V2 for better Spark 3 support. If you are interested, contributions are always welcome!

@nsivabalan (Contributor) commented

@garyli1019: what's the status of this PR in general? When do you think we can get it landed?

@garyli1019 (Member) commented

@garyli1019: what's the status of this PR in general? When do you think we can get it landed?

@nsivabalan my concern is that CatalystScan is not a stable API; I'm not sure if this could be a problem. We need a Spark expert to take a look here. @umehrot2 @zhedoubushishi @pengzhiwei2018 are any of you able to take a look?

@yui2010 (Contributor, Author) commented Mar 13, 2021

Added a testPrunePartitionsWithMisplacedReturnData test case to demo the misplacement of the returned data.

@nsivabalan removed their assignment on Apr 5, 2021
@vinothchandar self-assigned this on Apr 19, 2021
@vinothchandar (Member) commented

Does #2926 overlap with this? @yui2010, @pengzhiwei2018 any thoughts?

@yui2010 (Contributor, Author) commented Jun 9, 2021

Does #2926 overlap with this? @yui2010, @pengzhiwei2018 any thoughts?

Yes. I noted @pengzhiwei2018's good implementation. There may still be the problem of result disorder for the situation where the input data frame does not have a partition field, as Gary mentioned (a different partition pruning mechanism).
This is due to the Spark DataSource V1 framework, which I described in the previous large section; we can handle it better in DataSource V2. We can close this PR now.

@vinothchandar added the status:in-progress (Work in progress) label on Jun 27, 2021
@hudi-bot (Collaborator) commented Jun 27, 2021

CI report:

Bot commands: @hudi-bot supports the following commands:
  • @hudi-bot run travis: re-run the last Travis build
  • @hudi-bot run azure: re-run the last Azure build

@garyli1019 (Member) commented

@yui2010 thanks for your contribution. I think appending the partition column to the data source table should be the ultimate way. Closing this PR for now.

@garyli1019 closed this on Jun 30, 2021

Labels

priority:critical (Production degraded; pipelines stalled), status:in-progress (Work in progress)


7 participants