Conversation

@garyli1019
Member

What is the purpose of the pull request

This PR implements the Spark DataSource for MOR tables

Brief change log

  • Implemented realtimeRelation
  • Implemented HoodieRealtimeInputFormat on top of Spark SQL's internal ParquetFileFormat
  • Implemented HoodieParquetRecordReaderIterator and RecordReader

Verify this pull request

This change added tests and can be verified as follows:

  • Added TestRealtimeDataSource to verify this feature.

Committer checklist

  • Has a corresponding JIRA in PR title & commit

  • Commit message is descriptive of the change

  • CI is green

  • Necessary doc changes done or have another open PR

  • For large changes, please consider breaking it into sub-tasks under an umbrella JIRA.

@vinothchandar vinothchandar self-assigned this Jun 10, 2020
@vinothchandar
Member

@umehrot2 take a look as well?

@vinothchandar vinothchandar left a comment

@garyli1019 Took an initial pass.. Overall, I'd like to understand if we can wrap more rather than reusing code from other places.. Want to ensure COW/Snapshot is not affected in any way.

For the merging itself, I had a slightly different approach in mind, where we wrap ParquetFileFormat and use it for reading the base parquet file with Spark's own record reader, and we simply read the logs using our code... then the merge can be implemented as another Iterator..

High level flow looks good.. and very exciting to see this PR
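A minimal sketch of that merge-as-Iterator idea (not this PR's code: MergeIterator, logRecords, and recordKey are hypothetical names, and deletes/custom payloads are ignored):

import scala.collection.mutable
import org.apache.spark.sql.catalyst.InternalRow

// Wraps the base-file iterator produced by Spark's parquet reader and overlays
// log records keyed by record key (overwrite-with-latest semantics).
class MergeIterator(baseRows: Iterator[InternalRow],
                    logRecords: mutable.Map[String, InternalRow],
                    recordKey: InternalRow => String) extends Iterator[InternalRow] {

  override def hasNext: Boolean = baseRows.hasNext || logRecords.nonEmpty

  override def next(): InternalRow = {
    if (baseRows.hasNext) {
      val row = baseRows.next()
      // If a newer version of this record exists in the logs, emit that instead.
      logRecords.remove(recordKey(row)).getOrElse(row)
    } else {
      // Remaining log-only records (inserts that never reached the base file).
      val key = logRecords.keysIterator.next()
      logRecords.remove(key).get
    }
  }
}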

Member

one thing to ensure is that COW/Snapshot will use Spark's native parquet reader.. I think we push the same path filter above and reuse the Spark source here, so it should work.. But good to test once with a few queries and ensure there is no change in perf

Member Author

Yea we can do this once we have the benchmarking framework ready.

Member

is this code re-used from some place?

@garyli1019
Member Author

@vinothchandar Thanks for the feedback. Your approach makes sense to me. If we can do it that way, we can reduce some maintenance overhead and be more flexible for future Spark upgrades. I will try to see if I can do it that way.

@garyli1019 garyli1019 force-pushed the HUDI-69 branch 2 times, most recently from 444c71a to d90d4f3 on June 11, 2020 01:13
@garyli1019
Member Author

Successfully got rid of those RecordReaders! @vinothchandar Thanks for the hint!

@codecov-commenter

codecov-commenter commented Jun 12, 2020

Codecov Report

Merging #1722 into master will decrease coverage by 3.03%.
The diff coverage is 1.44%.

Impacted file tree graph

@@             Coverage Diff              @@
##             master    #1722      +/-   ##
============================================
- Coverage     62.82%   59.79%   -3.04%     
- Complexity     3437     3610     +173     
============================================
  Files           401      439      +38     
  Lines         17091    19183    +2092     
  Branches       1698     1946     +248     
============================================
+ Hits          10737    11470     +733     
- Misses         5623     6924    +1301     
- Partials        731      789      +58     
Flag Coverage Δ Complexity Δ
#hudicli 68.45% <ø> (?) 1430.00 <ø> (?)
#hudiclient 79.24% <ø> (ø) 1258.00 <ø> (ø)
#hudicommon 54.29% <ø> (+0.04%) 1486.00 <ø> (+1.00)
#hudihadoopmr 39.36% <ø> (ø) 163.00 <ø> (ø)
#hudihivesync 72.25% <ø> (ø) 121.00 <ø> (ø)
#hudispark 36.39% <1.44%> (-7.83%) 76.00 <0.00> (ø)
#huditimelineservice 63.47% <ø> (ø) 47.00 <ø> (ø)
#hudiutilities 73.75% <ø> (ø) 287.00 <ø> (ø)
Impacted Files Coverage Δ Complexity Δ
.../main/scala/org/apache/hudi/SnapshotRelation.scala 0.00% <0.00%> (ø) 0.00 <0.00> (?)
...urces/parquet/HoodieMergedParquetRowIterator.scala 0.00% <0.00%> (ø) 0.00 <0.00> (?)
...rces/parquet/HoodieParquetRealtimeFileFormat.scala 0.00% <0.00%> (ø) 0.00 <0.00> (?)
...src/main/scala/org/apache/hudi/DefaultSource.scala 65.71% <33.33%> (-4.88%) 7.00 <0.00> (ø)
...main/scala/org/apache/hudi/DataSourceOptions.scala 93.54% <100.00%> (ø) 0.00 <0.00> (ø)
.../main/scala/org/apache/hudi/cli/SparkHelpers.scala 0.00% <0.00%> (ø) 0.00% <0.00%> (?%)
...src/main/java/org/apache/hudi/cli/TableHeader.java 77.77% <0.00%> (ø) 5.00% <0.00%> (?%)
...org/apache/hudi/cli/commands/RollbacksCommand.java 94.00% <0.00%> (ø) 8.00% <0.00%> (?%)
...ava/org/apache/hudi/cli/commands/UtilsCommand.java 100.00% <0.00%> (ø) 3.00% <0.00%> (?%)
... and 34 more

Comment on lines 54 to 64
Contributor

This will likely have performance implications and be slower than Spark's listing mechanism, which uses the Spark context to parallelize listing across the cluster along with other optimizations. Also, HoodieROTablePathFilter is already known to be really expensive, especially on S3, and it is still a bottleneck for read performance in read-optimized queries, where it is used with the parquet relation and applied sequentially at the driver to all the files.

It might be worth checking out my implementation as well, where I re-use Spark's InMemoryFileIndex implementation to perform the listing, so that all of Spark's listing optimizations are retained. Also, instead of passing in this expensive filter, which gets applied sequentially to each and every file, I use fsView.getLatestBaseFiles to get the latest view of the files.

https://github.com/apache/hudi/pull/1702/files#diff-f14ac7b3cff88313d650b01a56a2b8f8R155

Also does this handle glob paths like s3://bucket/huditable/*/* ?
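For reference, a rough sketch of the suggested listing path, assuming the fsView API named above (constructor details vary across Hudi versions, and fileStatuses stands in for a listing obtained via Spark's InMemoryFileIndex):

import scala.collection.JavaConverters._
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.common.table.view.HoodieTableFileSystemView

// Build a file-system view over completed commits, then take the latest base file
// per file group -- no per-file HoodieROTablePathFilter required.
val metaClient = new HoodieTableMetaClient(hadoopConf, basePath, true)
val fsView = new HoodieTableFileSystemView(
  metaClient,
  metaClient.getActiveTimeline.getCommitsTimeline.filterCompletedInstants,
  fileStatuses)
val latestBaseFilePaths = fsView.getLatestBaseFiles.iterator().asScala.map(_.getPath).toList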

Member Author

This is great to know! We can definitely reuse your implementation there. At this point, I don't know of any other way to list all the file statuses without using HoodieROTablePathFilter. RFC-15 will make this more efficient as well.
No, it doesn't support glob paths right now. I think we could put all the path handling into your HudiSparkUtils.

Comment on lines 46 to 49
Contributor

Does this not support predicate pushdown or column pruning? It seems not, because we would need to implement PrunedFilteredScan instead of TableScan to support these, and pass them down to the parquet file format for reading.

Member Author

I think it does. I checked the physical plan of a filter query and the filter was pushed down.
IIUC, TableScan can automatically push down the filter and column pruning, like what we did in the incremental relation. PrunedFilteredScan is usually used by Spark connectors for other databases, to push Spark filters down into the database's SQL query.
I found an example in spark-cassandra-connector:
https://github.com/garyli1019/spark-cassandra-connector/blob/master/connector/src/main/scala/org/apache/spark/sql/cassandra/CassandraSourceRelation.scala#L367

Member Author

I was wrong here. We need PrunedFilteredScan to push down the filter and the projection.
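For reference, this is roughly the shape such a relation takes (a skeleton only; HoodieMergeOnReadRelation is a hypothetical name, not this PR's implementation):

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.sources.{BaseRelation, Filter, PrunedFilteredScan}
import org.apache.spark.sql.types.StructType

// With PrunedFilteredScan, Spark hands the relation the required columns and the
// pushable filters instead of scanning every column of every row.
class HoodieMergeOnReadRelation(val sqlContext: SQLContext,
                                override val schema: StructType)
  extends BaseRelation with PrunedFilteredScan {

  override def buildScan(requiredColumns: Array[String],
                         filters: Array[Filter]): RDD[Row] = {
    // Prune the read schema to requiredColumns and pass filters to the underlying
    // parquet reader before merging with the log files.
    ???
  }
}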

Contributor

I am really not sure how this will play out with huge amounts of data containing thousands of files. We would be serializing and passing this huge map to each and every task, even though each Spark task is supposed to work on only one or a few file splits. We need to test how this plays out with an extremely large number of files.

Member Author

@garyli1019 garyli1019 Jun 13, 2020

I agree here. Other possible implementations could be:

  • Add it to the Hadoop config, but it could cause the same issue as using the option hashmap.
  • Concatenate the logPath to the parquetPath, then split the path in the executor before doing the file split. (EDIT: Doesn't work; in the FileFormat, the file is already a PartitionedFile.)
  • Search for the logs based on the parquetPath on each executor. This could put pressure on the NameNode as well. (EDIT: This probably doesn't work either. Getting the meta client and file system view is very expensive.)

I personally prefer the second way if we can efficiently handle all the file listing on the driver side. I will see if it's possible to implement it.

Member

We should refrain from listing on the executors. @umehrot2 do you have any suggestions here?
FWIW the amount of data would be proportional to the number of partitions globbed, which should not be as bad?

Contributor

Just something to think about: Spark forms file splits by default when you use a FileFormat-type datasource. It can potentially split a parquet file (with multiple row groups) in between and send the pieces to tasks for reading. Does this play out well with merging splits with the corresponding log files? I do think it should be okay in this case, but wanted to point it out in case you guys see any potential issues.

Member Author

This is similar to the Hive file split we have. If we split one parquet file into two splits, we will do the log scanning for the same log file twice.
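If row-group-level splitting is not supported yet, one way to avoid the double scan would be to disable splitting through Spark's standard FileFormat hook (a sketch, not what this PR does):

import org.apache.hadoop.fs.Path
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat

class HoodieParquetRealtimeFileFormat extends ParquetFileFormat {
  // Spark consults isSplitable() when planning file partitions; returning false
  // forces one split per parquet file, so each log file is scanned exactly once,
  // at the cost of losing row-group parallelism.
  override def isSplitable(sparkSession: SparkSession,
                           options: Map[String, String],
                           path: Path): Boolean = false
}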

Member

@garyli1019 true.. but if the logs are small w.r.t. the parquet base, this might actually be okay.. For workloads with high write amplification, people tend to have much smaller file sizes anyway.. We should support row-group-level parallel splitting for sure, otherwise performance would be an issue.. btw Hive achieves this today.. we wrap each file split into a realtime file split..

@umehrot2
Contributor

umehrot2 commented Jun 13, 2020

Like @vinothchandar, I do agree with the high-level approach here, and thanks for putting out this PR 👍 However, I would highly recommend that both of you check out #1702, which is along similar lines and solves some of the issues I see in this PR:

  • Here we are instantiating another datasource/relation, i.e. HoodieRealtimeFileFormat and the Spark parquet relation, within the Snapshot relation. This has overheads associated with it, like Spark having to build an index again by listing the paths passed to HoodieRealtimeFileFormat and the Spark parquet relation in order to instantiate them.

  • We re-use the ParquetFileFormat reader and all of its functionality (vectorized reading, predicate pushdown, column pruning) without having to copy the code over and maintain it internally.

  • We do not have to pass the expensive map from parquet to log files to each task. Instead it gives complete control over what goes into each task partition, and we send only the file and its corresponding mapping (in my case the external data file, in this case the log file) over to the task. That is the whole point of using the RDD interface: having that kind of control over the datasource we are building.

Happy to have more in-depth discussion on this and help get this to completion.

@vinothchandar
Member

Thanks for the detailed comments, @umehrot2 ! Will process and also look at your other pr.

So, on some of the perf issues you pointed out: please raise 0.6.0 JIRAs with blocker priority, so we can get them fixed in the next major release

@garyli1019 garyli1019 left a comment

@umehrot2 Thank you for your insightful comments! I will definitely take a closer look at your PR.

  • I think HoodieRealtimeFileFormat is necessary here, because we need a certain level of customization (e.g. HoodieParquetRecordReaderIterator, HoodieRealtimeFileSplit) that is not exposed to the user. But I do agree that it will give us some maintenance overhead. On the performance side, if most of the data files are stand-alone parquet files, the performance difference wouldn't be too large.
  • For HoodieRealtimeFileFormat, we can support predicate pushdown, but not the vectorized reader. Not sure if column pruning will work when the vectorized reader is not available. (EDIT: column pruning still works without the vectorized reader.) Because we merge on rows, we need to extract values from at least certain columns to do the comparison. We need the correct schema for the log file, a way to convert those fake UnsafeRows to the HoodieRecord type to merge with the log, etc. I think this could be quite complicated but still possible to do in the future.
  • Yes, I will try to find a better way to send the paths to the executors without broadcasting the whole map.

@garyli1019
Member Author

A few major concerns here:

  • Listing files is too expensive.
    Solution: switch to the bootstrap file-listing methods once Udit's PR is merged. Move to RFC-15 once it is ready.
  • Broadcasting paths in the option hashmap could cause performance issues.
    I am not sure there is a better way to do this until RFC-15 is ready. Searching for log files from the executors could be more expensive, since it requires the TableView, the metaClient, etc. Even if we have thousands of log files, the hashmap might only be a few MB, so I guess it could be OK?

Major follow-ups after this PR:

  • Incremental view on MOR table.
  • Vectorized reader.
  • More efficient type conversion.
  • Support for custom payloads.

Those four can be done in parallel, but all depend on this PR. We can establish a baseline in this PR and iterate on the different topics in parallel. Also, we can ask the community for help testing this in production on very large datasets. That should be quite easy if they are using MOR tables.

@garyli1019 garyli1019 requested a review from vinothchandar June 17, 2020 05:34
@vinothchandar
Member

Just finished reviewing the bootstrap PR, which blocks Udit's PR :). Will read up and review both your and Udit's PRs shortly

@vinothchandar vinothchandar left a comment

We still need more work IMO.. let's keep hashing this out.. I will spend time on the Spark datasource APIs more deeply this week and can come back with a concrete plan for this as well. Meantime, some things to think about.

Member

For COW, snapshot is the default anyway.
Why would globbing like that error out?

Member

Don’t think this change is necessary, right? The RO view does map to the snapshot query for COW. We may need two maps, one for COW and one for MOR

Member Author

I get confused by the naming sometimes...
So for a COW table, snapshot view = read-optimized view;
for MOR, the snapshot view and the read-optimized view are different things.
With bootstrap, we will have one more view.
Can we define read-optimized view -> parquet only (including bootstrap), and snapshot view -> parquet (with bootstrap) merged with logs, regardless of table type?

Member Author

Let's keep this mapping because we should be able to do RO view on MOR.

Member

No.. there are no more views.. we did a renaming exercise to clear things up as "query types".. with that there should be no confusion.. our docs are consistent with this as well.. On COW there is in fact no RO view.. so this change has to be done differently if you need it for MOR..

Member Author

Sorry, my previous comments were confusing; let me rephrase.
What I am trying to do here is to not change the query behavior. Since we didn't support snapshot queries for MOR before, the RO and snapshot query types behaved the same regardless of whether the table is COW or MOR.
If we don't change this mapping, users will see different behavior after upgrading to the next release. If they are using VIEW_TYPE_READ_OPTIMIZED_OPT_VAL (deprecated) on MOR in their code, after upgrading the code will run a snapshot query instead of an RO query. This could surprise users even though the key was deprecated.
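To make the concern concrete, here is a sketch of the deprecated-key translation being debated, using the 0.5.x view-type values; it is illustrative only, not the shipped mapping (the actual constants live in DataSourceOptions.scala):

// A table-type-agnostic map must pick one meaning for "read_optimized", which is
// exactly the upgrade-behavior question raised above.
def translateViewType(viewType: String, tableType: String): String = viewType match {
  case "realtime"    => "snapshot"
  case "incremental" => "incremental"
  case "read_optimized" =>
    // Pre-0.6.0 there was no MOR snapshot query, so "read_optimized" effectively
    // behaved like snapshot everywhere.
    if (tableType == "COPY_ON_WRITE") "snapshot" else "read_optimized"
  case other => other
}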

Member

If they are using VIEW_TYPE_READ_OPTIMIZED_OPT_VAL (deprecated) on MOR in their code, after upgrading the code will run a snapshot query instead of an RO query.

We have been logging a warning on the use of the deprecated configs for some time, so I think it's fair to do the right thing here moving forward and call this out in the release notes. Let me push some changes..

Member

Actually, this is what we use for both COW and MOR.. see the comment above. Would be best to keep the behavior for COW the same

Member Author

This is not as efficient as the default DataSource.apply().resolveRelation() yet, since filter pushdown and column pruning are not supported yet. But we will get there soon...

Member

Nit: indentation

This changes behavior for COW/snapshot. I feel we should have two separate methods: createCopyOnWriteRelation (no change in behavior) and createMergeOnReadRelation (the changes for this PR)

Member Author

This is hard without a path handler. Right now we tell the user to add a glob pattern, load(basePath + "/*/*/*/*"), for COW tables, so the glob ends up in the basePath. Spark's default DataSource.apply().resolveRelation() is able to handle the glob, but our custom relation is not. This is why our incremental relation accepts .load(basePath) only. Udit's PR has this path handler, so we will have a unified place to handle all the paths.
I think we can change the default option to READ_OPTIMIZED, so the user side sees no impact. Currently the Spark datasource only supports snapshot on COW anyway, which is the same as READ_OPTIMIZED. We can switch back later with the path handler: https://github.com/apache/hudi/pull/1702/files#diff-683cf2c70477ed6cc0a484a2ae494999R72
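For illustration, the two load styles being contrasted (standard datasource usage; the query-type option key is assumed from the configs discussed above):

// Glob-style load -- what COW users do today; the glob ends up inside the path:
val cowDf = spark.read.format("hudi")
  .load(basePath + "/*/*/*/*")

// Base-path-style load -- what the MOR snapshot relation in this PR expects:
val morDf = spark.read.format("hudi")
  .option("hoodie.datasource.query.type", "snapshot") // assumed option key
  .load(basePath)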

Member

Can we avoid all this copied code from ParquetFileFormat by wrapping or overriding? This should be doable?

I am concerned about the amount of borrowed code in this file..

Member

Also, we have re-implemented the merge by hand again in this class.. I was trying to explore whether we can reuse the existing HoodieRecordReader classes by templatizing them for Row instead of ArrayWritable.. That's at least the longer-term option to pursue..

Member

this is a single parquet file?

Member Author

It could be only a portion of a file. From the Spark description: "A part (i.e. 'block') of a single file that should be read, along with partition column values that need to be prepended to each row."
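That description corresponds to Spark's PartitionedFile, roughly as defined in Spark 2.4:

import org.apache.spark.sql.catalyst.InternalRow

// start/length delimit the byte range, so one parquet file can yield several
// PartitionedFiles, one per planned split.
case class PartitionedFile(
    partitionValues: InternalRow,
    filePath: String,
    start: Long,
    length: Long,
    locations: Array[String] = Array.empty)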

@garyli1019 garyli1019 left a comment

Thanks for reviewing. I will address the comments soon.

Member Author

This is like the incremental relation. We need the base path to get the Hudi table view. We need a smarter path handler to handle glob paths, and Udit's PR has this.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good point.

Member Author

This goes back to the MapReduce v1 vs v2 API. Hive uses v1 and Spark uses v2. Spark also wraps the v2 reader (from parquet) in its own record reader. I couldn't find a way to make Hive and Spark share one record reader. The vectorized reader would be another story as well.
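For reference, the two incompatible Hadoop interfaces behind this comment, with signatures abridged:

// v1 (used by Hive):
//   org.apache.hadoop.mapred.RecordReader[K, V]
//     def next(key: K, value: V): Boolean     // caller supplies reusable key/value
//     def createKey(): K
//     def createValue(): V
//
// v2 (used by Spark):
//   org.apache.hadoop.mapreduce.RecordReader[K, V]
//     def initialize(split: InputSplit, context: TaskAttemptContext): Unit
//     def nextKeyValue(): Boolean             // reader owns the current key/value
//     def getCurrentKey: K
//     def getCurrentValue: V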

@garyli1019 garyli1019 force-pushed the HUDI-69 branch 2 times, most recently from 610e727 to c80e26f on June 25, 2020 05:40
@garyli1019
Member Author

@vinothchandar Thanks for reviewing! I created tickets for the follow-up work. All the file listing and globbing can be improved after @umehrot2's PR is merged.

@garyli1019 garyli1019 force-pushed the HUDI-69 branch 2 times, most recently from a5ee2fb to d2daeb2 on June 28, 2020 23:19

// TODO: Smarter path handling
val metaClient = try {
  val conf = sqlContext.sparkContext.hadoopConfiguration
  Option(new HoodieTableMetaClient(conf, path.get, true))
Member

Wouldn't it be problematic if the path is a glob and not the actual basePath of the table? A COW/snapshot query can do this, for example, and I think we should handle the same for MOR as well.

Member Author

At this point we have:

  • RO and snapshot queries for COW: support glob and basePath
  • Snapshot for MOR: only supports basePath
  • Incremental: only supports basePath

What I am trying to do here (see the sketch after this list) is:

  • If the path contains a glob, fall back to RO. This is the current behavior. Creating the metaClient will throw an exception, but it is handled below.
  • If the path is the basePath, we create the metaClient. If it's a COW table, go to the RO relation; if MOR, go to the snapshot relation.
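A minimal sketch of that dispatch, with hypothetical helper names (the real logic lives in DefaultSource):

import scala.util.Try
import org.apache.hudi.common.model.HoodieTableType
import org.apache.hudi.common.table.HoodieTableMetaClient

// A glob path makes the metaClient constructor throw, which we treat as "fall
// back to the read-optimized relation"; a real basePath lets us branch on table type.
val metaClientOpt = Try(new HoodieTableMetaClient(conf, path.get, true)).toOption

val relation = metaClientOpt match {
  case Some(client) if client.getTableType == HoodieTableType.MERGE_ON_READ =>
    createMergeOnReadSnapshotRelation(sqlContext, client) // hypothetical name
  case _ =>
    createReadOptimizedRelation(sqlContext, path.get)     // hypothetical name
}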

Member

Snapshot for MOR: only supports basePath

Let me think about this more. We need to support some form of globbing for the MOR snapshot query.

Member Author

Udit's PR has this path handling. Should we merge part of his PR first? https://github.com/apache/hudi/pull/1702/files#diff-9a21766ebf794414f94b302bcb968f41R31
With this, we can let the user call .load(basePath) or .load(basePath + "/*/*") for COW, MOR, and incremental.

  val conf = sqlContext.sparkContext.hadoopConfiguration
  Option(new HoodieTableMetaClient(conf, path.get, true))
} catch {
  case e: HoodieException => Option.empty
Member

Can we just error out here?

Member Author

I used this as a flag that the path is not the basePath. This is a temporary solution to avoid changing the query behavior.
This will be handled better with: https://github.com/apache/hudi/pull/1702/files#diff-9a21766ebf794414f94b302bcb968f41R31


override def shortName(): String = "hudi"

private def getReadOptimizedView(sqlContext: SQLContext,
Member

We can rename it to something like getFilteredBaseFileRelation(). Again, I don't want to bring the view nomenclature back into the code.

Member Author

sure, will do

* This class is an extension of ParquetFileFormat from Spark SQL.
* The file split, record reader, record reader iterator are customized to read Hudi MOR table.
*/
class HoodieParquetRealtimeFileFormat extends ParquetFileFormat {
Member

Still mulling over whether this file can be simpler.. working on it..

Heads up: if we have to re-use code, we need to attribute it in NOTICE/LICENSE as well.

Member Author

If we use the FileFormat approach, we probably can't avoid copying some Spark code. For datasource V2 we would need to copy even more, since Spark 3 uses case classes for all the FileFormats.
I will try to use Udit's RDD approach: https://github.com/apache/hudi/pull/1702/files#diff-809772c649e85ffb321055d9871e37e0R39
I think that's doable. With that approach we can get rid of this, but I need to try it after his PR is merged. It will need to reuse a lot of code from that PR.

Member

@garyli1019 it should be possible to call super.buildReader..(), get the Iterator[InternalRow] for the parquet base file alone, and then use the class above to merge, right?
It would avoid a ton of code in this file..
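A sketch of that suggestion, assuming Spark 2.4's FileFormat signature; mergeWithLogs is a hypothetical stand-in for the merge logic:

import org.apache.hadoop.conf.Configuration
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.datasources.PartitionedFile
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
import org.apache.spark.sql.sources.Filter
import org.apache.spark.sql.types.StructType

class HoodieParquetRealtimeFileFormat extends ParquetFileFormat {
  override def buildReaderWithPartitionValues(
      sparkSession: SparkSession,
      dataSchema: StructType,
      partitionSchema: StructType,
      requiredSchema: StructType,
      filters: Seq[Filter],
      options: Map[String, String],
      hadoopConf: Configuration): PartitionedFile => Iterator[InternalRow] = {
    // Let ParquetFileFormat build the base-file reader, then overlay the log records.
    val baseReader = super.buildReaderWithPartitionValues(
      sparkSession, dataSchema, partitionSchema, requiredSchema, filters, options, hadoopConf)
    (file: PartitionedFile) => mergeWithLogs(file, baseReader(file))
  }

  // Hypothetical: look up this file's log files and merge them into the iterator.
  private def mergeWithLogs(file: PartitionedFile,
                            baseRows: Iterator[InternalRow]): Iterator[InternalRow] = ???
}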

Member Author

I think this could come with a schema issue. super.buildReader..() could use a vectorized reader that only reads a few columns, and right now I haven't implemented merging Avro with InternalRow across different schemas. Right now we can only merge row by row with the same schema.
We will get rid of this FileFormat approach; the RDD approach is cleaner: https://github.com/apache/hudi/pull/1702/files#diff-809772c649e85ffb321055d9871e37e0R75

* Custom payload is not supported yet. This combining logic is matching with [OverwriteWithLatestAvroPayload]
* @param rowReader ParquetRecordReader
*/
class HoodieMergedParquetRowIterator(private[this] var rowReader: ParquetRecordReader[UnsafeRow],

@vinothchandar
Member

I am fine with doing that.. not sure if that's more work for @umehrot2.. wdyt?

@bvaradar in general, can we get more of the bootstrap work landed and do the follow-ups, vs. having these large PRs pending out there this long.. We kind of have a traffic gridlock here.

@bvaradar
Contributor

@garyli1019 @vinothchandar: Yes, I am planning to address the bootstrap PR comments and also give review comments on @umehrot2's changes by this weekend. @umehrot2: I know this is not ideal; apologies for not being able to review quickly.

@vinothchandar
Member

@garyli1019 actually @umehrot2's approach there wraps the FileFormat as opposed to extending it, which is great as well.. If you can wait, we can land the bootstrap changes and rework this on top. But I feel what you will be doing here is a very different merge than @umehrot2's PR, and the code to wrap ParquetFileFormat is not that large.. So you could also just proceed in parallel? Or do you just want the two util classes landed, SparkUtils/TablePathUtils?

Your call. lmk what you think.

@garyli1019
Member Author

@vinothchandar I agree we should use @umehrot2 RDD approach.

So you can also in parallel just proceed?

Yes, I will change this PR in parallel.

Do you just want the two util classes landed? SparkUtils/TablePathUtils?

Yes, @umehrot2 do you mind making a separate PR for these two classes?

@umehrot2
Contributor

@vinothchandar I agree we should use @umehrot2 RDD approach.

So you can also in parallel just proceed?

Yes, I will change this PR in parallel.

Do you just want the two util classes landed? SparkUtils/TablePathUtils?

Yes, @umehrot2 do you mind making a separate PR for these two classes?

@vinothchandar @garyli1019 thank you guys for reviewing the PR. I will look at the comments and start addressing them. As for HudiSparkUtils/TablePathUtils, let me create a separate PR so that you can re-use those pieces for your PR and are unblocked. Meanwhile you can copy the code over to unblock coding on your side while I work on the PR. Thanks!

@vinothchandar
Member

@garyli1019 should we close this in favor of #1848 ?

@garyli1019 garyli1019 closed this Jul 21, 2020