[HUDI-69] Support Spark Datasource for MOR table - RDD approach #1848
Conversation
Codecov Report
@@ Coverage Diff @@
## master #1848 +/- ##
============================================
+ Coverage 62.32% 63.33% +1.01%
- Complexity 3508 3522 +14
============================================
Files 420 410 -10
Lines 17817 17400 -417
Branches 1754 1729 -25
============================================
- Hits 11105 11021 -84
+ Misses 5959 5630 -329
+ Partials 753 749 -4
Flags with carried forward coverage won't be shown.
PrunedFilteredScan changes the behavior of ParquetRecordReader inside ParquetFileFormat even when we are not using the vectorized reader. Still trying to figure out why... I will follow up on PrunedFilteredScan in a separate PR.
Can we file a JIRA for this? And is it possible to target this for 0.6.0 as well?
@vinothchandar @umehrot2 Ready for review. Thanks!
vinothchandar
left a comment
Made a first pass. Looks reasonable, but we need some changes before we can land this.
Thanks for the persistent efforts, @garyli1019 !
hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/config/HadoopSerializableConfiguration.java
hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/AbstractRealtimeRecordReader.java
hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java
hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeRecordReaderUtils.java
Should we just make a new datasource-level config for this and translate it? Mixing raw InputFormat-level configs in here feels a bit problematic in terms of long-term maintenance.
Agree. Maybe something like SNAPSHOT_READ_STRATEGY? So we can control the logic for merge, unmerge, mergeWithBootstrap, etc.
Added MERGE_ON_READ_PAYLOAD_KEY and MERGE_ON_READ_ORDERING_KEY. Then we use the payload to do all the merging.
The payload is actually stored inside hoodie.properties. Let me take a stab at how to handle this.
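For context, a rough sketch of how the payload class could be resolved from hoodie.properties via the table config instead of being wired through a raw InputFormat config; the constructor and method names are my assumption of the Hudi APIs at the time and may differ:

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hudi.common.table.HoodieTableMetaClient

// Sketch only: hoodie.properties under <basePath>/.hoodie backs the table config,
// which already records the payload class used to write the table.
def resolvePayloadClass(conf: Configuration, basePath: String): String = {
  val metaClient = new HoodieTableMetaClient(conf, basePath)
  metaClient.getTableConfig.getPayloadClass
}
```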
This seems to indicate that we will keep scanning the remaining entries in the log and hand them out if dataFileIterator runs out. We need to be careful about how it interplays with split generation; specifically, it only works if each base file has only one split.
HudiMergeOnReadFileSplit(partitionedFile, logPaths, latestCommit,
metaClient.getBasePath, maxCompactionMemoryInBytes, skipMerge)
Here partitionedFile has to be a single file and not an input split; otherwise we will face an issue where a log entry belongs to a different split and the final query will have duplicates.
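For illustration, a minimal sketch of the split structure under discussion, with one complete base file plus its log paths per split; the field names are assumptions modeled on the snippet above, not the exact class in this PR:

```scala
// Sketch: one split = one complete base file + the log files of the same file group.
// If a base file were ever broken into multiple splits, its log records could be
// merged into more than one split and the query would return duplicates.
case class MergeOnReadFileSplitSketch(
    baseFilePath: String,              // a single, complete base parquet file
    logPaths: List[String],            // log files belonging to the same file group
    latestCommit: String,
    tableBasePath: String,
    maxCompactionMemoryInBytes: Long,
    skipMerge: Boolean)
```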
Yes, this HudiMergeOnReadFileSplit is based on us packing one baseFile into one partitionedFile during buildFileIndex. I believe Spark won't split this secretly somewhere...
FileFormat could break up a file into splits I think. We can wait for @umehrot2 also to chime in here.
One partitioned file will go into one split if I understand this correctly: https://github.com/apache/spark/blob/branch-2.4/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala#L355
It would be difficult to handle if Spark split one parquet file into two splits somewhere else.
@vinothchandar @garyli1019 Spark's FileFormat can split the parquet files up if there are multiple row groups in the file. In Spark's implementation one PartitionedFile is not a complete file; that is why you can see the start position and length in it.
But in both this implementation and in the bootstrap case we are packaging the complete file as a split, to avoid the complexity of dealing with partial splits.
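As a concrete illustration of packaging a complete file as a split, a PartitionedFile can be built to span the whole file (start = 0, length = file length). The PartitionedFile signature used here is my reading of the Spark 2.4 API and should be treated as an assumption:

```scala
import org.apache.hadoop.fs.FileStatus
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.datasources.PartitionedFile

// Sketch: wrap an entire base file into a single PartitionedFile so it is never
// broken up at row-group boundaries.
def asWholeFileSplit(status: FileStatus): PartitionedFile =
  PartitionedFile(
    partitionValues = InternalRow.empty,
    filePath = status.getPath.toString,
    start = 0,
    length = status.getLen)
```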
hudi-spark/src/main/scala/org/apache/hudi/HudiMergeOnReadRDD.scala
Is the remove needed? This map is often spillable... we should just make sure the remove does not incur additional I/O or something.
This is tricky. We need something like a skippable iterator.
- The `hudiLogRecords` are immutable, so I am making a copy of the `keySet()` to handle the remove, and I iterate through it to read back from `hudiLogRecords`.
- It's possible that the parquet file has duplicate keys and we need to merge against a single `logRecord` multiple times. This is how compaction handles duplicate keys.
Same comment here about double-checking whether this is actually OK.
This implicitly assumes OverwriteWithLatestAvroPayload? Can we just convert the parquet row to Avro as well and then perform the merge by actually calling the right API, HoodieRecordPayload#combineAndGetUpdateValue()? This is a correctness issue we need to resolve in this PR.
Ideally, adding a test case to go along with this would be good as well.
Will do.
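To make the suggestion above concrete, a hedged sketch of merging through the payload API rather than assuming overwrite-with-latest semantics; only HoodieRecordPayload#combineAndGetUpdateValue is the API referenced in the comment, and the surrounding method and conversion step are illustrative:

```scala
import org.apache.avro.Schema
import org.apache.avro.generic.{GenericRecord, IndexedRecord}
import org.apache.hudi.common.model.HoodieRecordPayload

// Sketch: convert the parquet row to Avro first (not shown), then let the payload
// decide the merge result instead of always taking the log-record value.
def mergeWithPayload(baseAvro: GenericRecord,
                     logPayload: HoodieRecordPayload[_],
                     schema: Schema): Option[IndexedRecord] = {
  // An empty result from combineAndGetUpdateValue means the record was deleted.
  val merged = logPayload.combineAndGetUpdateValue(baseAvro, schema)
  if (merged.isPresent) Some(merged.get) else None
}
```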
@umehrot2 can you also please make a quick second pass.
Force-pushed a4c7069 to 41d1d05.
garyli1019
left a comment
Addressed the comments and added support for custom payload. Added tests to demonstrate delete operation with OverwriteWithLatestAvroPayload
hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/config/HadoopSerializableConfiguration.java
hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java
Force-pushed f38c76c to 5b051e5.
@vinothchandar @garyli1019 Sorry for the late response. Plan to review it later today.
umehrot2
left a comment
Took a look and have some high level comments. Will take another more thorough line by line pass of the code and unit tests once we have addressed the main points.
Shouldn't we call it something like MergeOnReadSnapshotRelation?
Instead of doing this, you can just do:
val tableSchema = schemaUtil.getTableAvroSchema
good to know. thanks
Instead of using the prefix Hoodie for all the newly defined classes, shouldn't we be using Hudi? Isn't that where the community is headed?
This problem is tricky. Both sides make sense to me. It seems impossible to completely switch from hoodie to hudi everywhere. Should we define a standard for the naming convention? Like class name -> hoodie, package name -> hudi. @vinothchandar
Possibly something like HoodieLogFileFormat might make sense in the future, as it would clearly extract out the log file reading logic.
We should try to support filter push-down. Again, for the more straightforward scenarios which I mentioned in my other comment, it should be possible to just pass the user filters down to the reader.
However, for the log file merging scenario we may have to take care of the following (see the sketch after this list):
- Reading the base file filtered out, say, `Row X` because of filter push-down, but `Row X` had been updated in the log file and has an entry there.
- While merging we need some way to tell that `Row X` should be filtered out from the log file as well; otherwise we may end up still returning it in the result, because based on the merging logic I see that any remaining rows in the log file which did not have a corresponding key in the base file are just appended and returned in the result.
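A small sketch of the guard described in the last bullet: whichever filters were pushed into the base-file reader would also be applied to log-only records before they are appended. The Filter-to-predicate translation is left abstract and the names are assumptions:

```scala
import org.apache.spark.sql.Row
import org.apache.spark.sql.sources.Filter

// Sketch: re-apply the pushed-down filters to records that exist only in the log,
// so a row filtered out of the base file cannot reappear via the log side.
def filterLogOnlyRecords(logOnlyRows: Iterator[Row],
                         pushedFilters: Seq[Filter],
                         evaluate: (Filter, Row) => Boolean): Iterator[Row] =
  logOnlyRows.filter(row => pushedFilters.forall(f => evaluate(f, row)))
```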
Just for my understanding, what is the use-case where we want to return un-merged rows? Do we do something similar for MOR queries through the input format where we want to return un-merged rows?
Yes, we have this option for Hive: https://github.com/apache/hudi/blob/master/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeUnmergedRecordReader.java
The performance will be better without merging; we can at least avoid the type conversion Row -> Avro -> merge -> Avro -> Row.
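For illustration, the unmerged path described here amounts to chaining the two iterators with no key lookup or Avro round-trip for the base rows (a conceptual sketch, not the PR's actual iterator classes):

```scala
import org.apache.spark.sql.Row

// Sketch: "skip merge" hands out every base-file row as-is, followed by every
// log record, without pairing them by record key.
def unmergedIterator(baseRows: Iterator[Row], logRows: Iterator[Row]): Iterator[Row] =
  baseRows ++ logRows
```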
Does this need explicit casting?
I followed the HadoopRDD implementation here https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala#L120
@vinothchandar This seems like something we can consider using at other places in Hudi code like AvroConversionHelper to convert Avro Records to Rows, instead of maintaining the conversion code in-house.
agree here. Our AvroConversionHelper is handling Row, which is not an extension of InternalRow. If we don't need Row specifically, I think we can adapt to the Spark Internal serializer.
- Instead of looping over `baseFileIterator` and checking whether each key exists in `logRecords`, would it be more efficient to do it the other way round: loop over `logRecords` and perform the merge, then append all the remaining base file rows at the end?
- Also, is it good practice to perform the actual fetching in the `hasNext` function instead of `next`?
Actually, on further thought, the first point may not be possible, since if we iterate over log records it will be difficult to find the corresponding base parquet record using the record key (for merging).
- The log scanner scans all the log files in one batch and handles the merging internally. The output is a hashmap we can use directly. This `logRecordsIterator` just loops through the hashmap and doesn't load rows one by one like the `baseFileIterator`.
- This is a little bit tricky. If `hasNext` returns true but `next()` doesn't return a value, Spark will throw an exception. In our logic, we don't know whether `hasNext` will be true or false until we find a qualifying record to read. For example, with 100 records in the base file and 100 delete records in the log file, we will read 0 rows and `hasNext` should return false on the first call, but we have iterated through the whole base file already. There is a test case for this example (a sketch of this pattern is given below).
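A minimal sketch of the pattern being described, assuming hypothetical names: hasNext keeps scanning until it either buffers a record that survives the merge (i.e. was not deleted) or exhausts the input, and next only hands out the buffered value:

```scala
import org.apache.spark.sql.Row

// Sketch: pre-fetch in hasNext so that false is only reported once we are sure
// no more qualifying (non-deleted) records exist.
class PrefetchingMergeIterator(baseRows: Iterator[Row],
                               mergeOrSkip: Row => Option[Row]) extends Iterator[Row] {
  private var buffered: Option[Row] = None

  override def hasNext: Boolean = {
    // mergeOrSkip returns None for rows deleted by the log, Some(merged) otherwise.
    while (buffered.isEmpty && baseRows.hasNext) {
      buffered = mergeOrSkip(baseRows.next())
    }
    buffered.isDefined
  }

  override def next(): Row = {
    if (!hasNext) throw new NoSuchElementException("no more merged records")
    val result = buffered.get
    buffered = None
    result
  }
}
```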
garyli1019
left a comment
@umehrot2 Thanks for reviewing! I clarified some questions in the comments.
When I push down nothing and pass the full schema as the user-requested schema, simply changing from TableScan to PrunedFilteredScan changes the behavior of the parquet reader so it no longer reads the correct schema. I need to dig deeper here.
Let's focus on making the basic functionality work in this PR. I will figure this out in a follow-up PR.
Missing the filter for the log file is probably OK, because Spark will apply the filter again on top of the pushed-down filters. Description: https://github.com/apache/spark/blob/branch-2.4/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala#L268
Added support for
Force-pushed d8ef9d3 to 4b05f0f.
@garyli1019: I took this patch and ran it in EMR (Spark 2.4.5-amzn-0). I got the following exceptions when loading an S3 dataset. I am using hudi-spark-bundle in the Spark session.
Setting default log level to "WARN".
Using Scala version 2.11.12 (OpenJDK 64-Bit Server VM, Java 1.8.0_252)
scala> val dfh = spark.read.format("hudi").load("s3a://hudi.streaming.perf/orders_stream_hudi_mor_4//")
scala>
Have you seen this issue before?
@bvaradar Thanks for trying this out.
@bvaradar I tested on a Spark 2.4.0 CDH release with a small dataset, and found a broadcast configuration issue. Pushed a new commit with the fix. Now this works fine on my cluster. I will test a larger dataset tomorrow.
Tested on a 100GB MOR table. A few partitions have 100% duplicate upsert log files; the others have parquet files only.
@garyli1019 thanks for testing this! Will review again. Can we resolve the addressed comments in this PR, and also do a small summary of what the follow-up work here is?
@vinothchandar For the 0.6.0 release, the only item left is incremental pulling. I am currently working on it and will probably get it done by tomorrow. Maybe we can wait until tomorrow to review the whole thing.
Yes, will let the
@garyli1019 mind rebasing again please? I think you had some conflicts with #1752
Force-pushed a053967 to fc5425d.
@bvaradar @garyli1019 I laid out the data from the test output here: https://gist.github.com/vinothchandar/28cee665c36f806b562efc88821c9061 If you notice, the compaction here succeeded; I can see marker files (but why, though? Shouldn't they be deleted if the commit is successful? I guess that's a question for me). But there are no new base files. Seems like a legit issue to me; I cannot imagine how it can happen though.
rerun tests |
Force-pushed fc5425d to dcd2663.
@vinothchandar Looks like some unrelated change was added during the rebase. Maybe this is related to the issue.
Never mind, this was intended based on previous comments, to keep consistency with the others.
This was something that I should have caught in the earlier PR. I renamed it here, thinking we would be landing this quickly. Oh well, let's just get the test to pass.
@bvaradar Locally, I can see all 12 files generated as expected. I cannot imagine why this can happen in CI alone.
Force-pushed a9533a0 to f70258d.
@garyli1019 I am afraid this has something to do with the changes we made. Maybe the path filter is getting set but not being removed, or something? I can now see all 12 files present, as expected, but spark.read.parquet() picks up just 6, the latest parquet files.
Force-pushed f70258d to 52a4532.
@garyli1019 this is failing now. See my last fix. Just unsetting the path filter after resolving the relation seems to help get over the issue. So the root issue there was the path filter still kicking in for
@vinothchandar Agree. I can see the InMemoryFileIndex warning on every test following the SparkStreaming test, but I don't see it if I run a COW test alone. I will try to change the test setup to clean the Spark context properly.
- This PR implements Spark Datasource for MOR table in the RDD approach.
- Implemented SnapshotRelation.
- Implemented HudiMergeOnReadRDD.
- Implemented separate iterators to handle the merged and unmerged record readers.
- Added TestMORDataSource to verify this feature.
  options = optParams)
  .resolveRelation()

sqlContext.sparkContext.hadoopConfiguration.unset("mapreduce.input.pathFilter.class")
@bvaradar Keeping this code, actually. Given that we are now adding support for the MOR snapshot query, if we don't unset this, then doing a read-optimized query followed by a snapshot query would filter out everything except the latest base files, which will be problematic.
cc @garyli1019 does this make sense? Let me see if I can add a test case for this.
Yes, that makes sense. We unset this before the incremental query. When we only had two datasource query types, it was fine to unset it before running the incremental query, but right now we have to unset it here. Still wondering why the local build is able to pass though...
- We can now revert the change made in DefaultSource.
@garyli1019 I wrote a test for this. Seems like this is actually not a problem, so I reverted the unset for now. Please check my last commit. Now, if CI passes this time, we can land (fingers crossed).
@garyli1019 Thanks for being so patient, amazing, and persevering! This is now landed! Congrats!
I am seeing this exception with the latest Hudi; how do we get it resolved?
I see Hudi uses spark-sql_2.11-2.4.4.jar and EMR uses /usr/lib/spark/jars/spark-sql_2.11-2.4.5-amzn-0.jar
@luffyd Thanks for reporting. I created a ticket to track this: https://issues.apache.org/jira/browse/HUDI-1270
What is the purpose of the pull request
This PR implements Spark Datasource for MOR table in the RDD approach.
ParquetFileFormat approach PR: #1722
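For reference, a minimal usage sketch of the new read path, matching the snapshot read shown earlier in this thread; the table path is a placeholder:

```scala
import org.apache.spark.sql.SparkSession

// Sketch: reading a MOR table snapshot through the Hudi datasource.
// "/path/to/mor_table" is a placeholder base path.
val spark = SparkSession.builder().appName("hudi-mor-read").getOrCreate()
val df = spark.read.format("hudi").load("/path/to/mor_table")
df.show(10)
```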
Brief change log
Verify this pull request
This change added tests and can be verified as follows:
Committer checklist
Has a corresponding JIRA in PR title & commit
Commit message is descriptive of the change
CI is green
Necessary doc changes done or have another open PR
For large changes, please consider breaking it into sub-tasks under an umbrella JIRA.