[HUDI-426] Bootstrap datasource integration #1702
Conversation
hudi-spark/src/main/scala/org/apache/hudi/HudiBootstrapRelation.scala (outdated review thread, resolved)
garyli1019 left a comment:
Hi @umehrot2, very clean work 👍! I walked through this PR and found some common pieces we can share:
- Path filtering.
- User input paths handling and glob pattern.
- Schema provider.
I have a few questions.
How should we define the user interface?
Soon we will have the bootstrap view, read optimized view, snapshot (realtime) view, and incremental view. I am wondering whether we should unify the query interface and handle all the file formats internally. How about this (a rough usage sketch follows the list):
- Snapshot view: Bootstrap files + non-hudi files + hudi files + hudi log
- Read optimized: Bootstrap files + non-hudi files + hudi files
- Incremental: incremental view on top of snapshot
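To make the proposal concrete, here is a rough, hypothetical sketch of how a unified query interface could look from the Spark datasource side. The option key and values are illustrative placeholders rather than the exact Hudi options, and it assumes an existing SparkSession `spark`, a table `basePath`, and a starting `commitTime`; the file-format handling would stay internal to the relation.

// Hypothetical usage sketch; option names are placeholders, not the actual Hudi keys.
val snapshotDF = spark.read
  .format("hudi")
  .option("hoodie.datasource.query.type", "snapshot")       // bootstrap + non-hudi + hudi files + hudi log
  .load(basePath)

val readOptimizedDF = spark.read
  .format("hudi")
  .option("hoodie.datasource.query.type", "read_optimized") // bootstrap + non-hudi + hudi files, no log merging
  .load(basePath)

val incrementalDF = spark.read
  .format("hudi")
  .option("hoodie.datasource.query.type", "incremental")    // incremental view on top of snapshot
  .option("hoodie.datasource.read.begin.instanttime", commitTime) // placeholder option name
  .load(basePath)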
How should we split the filegroups?
Right now we already have 4 different filegroups. Once we add ORC support, there will be more. One of the cleanest ways I could find is to read each filegroup into an RDD independently and then union them together. In the current version of this PR, we handle regular parquet in HudiBootstrapRDD. Two disadvantages I can see:
- After we add ORC support, the complexity of this RDD would increase if we handle the ORC reading here too.
- IIUC, we didn't take full advantage of the vectorized reader by using ColumnarBatch directly. Merging probably requires reading row by row, but for regular parquet files we can use the default parquet reader.
If we can find a way to efficiently list files in the driver, I think we can separate the bootstrap files from regular parquet and only use the BootstrapRDD to handle the files that need to be merged (see the sketch below). Happy to discuss more here.
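To illustrate the split-and-union idea (not part of this PR), here is a minimal sketch assuming the driver has already listed and classified the file groups and that both sides resolve to the same schema. The bootstrap branch is shown as a plain parquet read purely as a placeholder for the custom bootstrap RDD/relation.

import org.apache.spark.sql.{DataFrame, SparkSession}

// Minimal sketch: split file groups into those that need skeleton + source
// merging and those that are plain parquet, read them separately, and union.
def unionedScan(spark: SparkSession,
                bootstrapFilePaths: Seq[String],   // file groups that need merging
                regularParquetPaths: Seq[String])  // plain parquet file groups
  : DataFrame = {
  // Placeholder for the custom bootstrap RDD/relation that merges skeleton and source files.
  val merged = spark.read.parquet(bootstrapFilePaths: _*)
  // Regular parquet file groups can go straight through Spark's default (vectorized) reader.
  val plain = spark.read.parquet(regularParquetPaths: _*)
  merged.union(plain)
}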
val rows = fileIterator.flatMap(_ match {
  case r: InternalRow => Seq(r)
  case b: ColumnarBatch => b.rowIterator().asScala
})
If we use the vectorized reader this way, does it still have a huge performance boost?
From my understanding, the regular reader iterator will read the whole row as an UnsafeRow and then do the column pruning before loading it into memory. The vectorized reader will do the column pruning and the data loading in one step. So theoretically the vectorized reader would still be faster even if we read it as InternalRow.
The description I found in the Spark code:
This class can either return InternalRows or ColumnarBatches. With whole stage codegen enabled, this class returns ColumnarBatches which offers significant performance gains.
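For reference (standard Spark configuration, not something introduced by this PR), the vectorized parquet path is controlled by the following flags, assuming an existing SparkSession `spark`:

// Spark's vectorized parquet reader (on by default); combined with whole-stage
// codegen, the scan can emit ColumnarBatch instead of one InternalRow at a time.
spark.conf.set("spark.sql.parquet.enableVectorizedReader", "true")
spark.conf.set("spark.sql.codegen.wholeStage", "true")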
As per my understanding, column pruning is independent of the vectorized reader. The vectorized reader basically reads a batch of rows into a columnar batch, and that is what happens here as well. The only difference is that we are not passing the columnar batch all the way down as a batch. However, even with the regular parquet reader, at some point the columnar batch must be converted to rows. Right now I am not fully sure whether this method gets 100% of the benefits of vectorized reading, but at least it reads the data as a batch.
Will do some more research on this.
We probably have to use rowIterator since we will need to merge at the row level anyway, and the same goes for MOR tables. Agreed that Spark will convert the ColumnarBatch to rows at some point, and it is very difficult to locate where.
For MOR tables, I have some ideas to speed things up by pre-reading the delete/rollback blocks and simply skipping rows, as long as OverwriteWithLatestPayload is used. If the user does specify a merge function, then it's hard to get away from the merge cost. We can take this discussion to a separate forum.
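A rough sketch of that skip idea (hypothetical, not part of this PR): pre-read the delete/rollback blocks into a set of record keys and filter the base-file iterator instead of running a per-record payload merge; `keyOf` is a placeholder for extracting the record key from a row.

import org.apache.spark.sql.catalyst.InternalRow

// Only valid when overwrite-with-latest semantics apply (no custom merge function):
// a record whose key appears in a delete/rollback block can simply be skipped.
def skipDeleted(baseRows: Iterator[InternalRow],
                deletedKeys: Set[String],
                keyOf: InternalRow => String): Iterator[InternalRow] =
  baseRows.filterNot(row => deletedKeys.contains(keyOf(row)))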
hudi-spark/src/main/scala/org/apache/hudi/IncrementalRelation.scala (outdated review thread, resolved)
hudi-spark/src/main/scala/org/apache/hudi/IncrementalRelation.scala (outdated review thread, resolved)
hudi-spark/src/main/scala/org/apache/hudi/HudiBootstrapRDD.scala (outdated review thread, resolved)
hudi-spark/src/main/scala/org/apache/hudi/HudiBootstrapRDD.scala (outdated review thread, resolved)
@vinothchandar yes it does. I had put some stuff in just for ease of reviewing because this utilizes some of the core changes that @bvaradar has done. If that is creating confusion I can get rid of it.
@garyli1019 thank you for your inputs. Sorry, I had been busy with on-call and other projects. Let me try to catch up and process your comments.
Thanks @garyli1019 for your review and for bringing up some interesting points. Yes, I think the pieces you mentioned can be used by you later for the MOR datasource work. Regarding the user interface for the query, your proposal makes sense to me in general. We can maybe have it fleshed out in more detail once our PRs are merged, and I am happy to collaborate on that. Regarding your suggestion about using
hudi-spark/src/main/scala/org/apache/hudi/HudiBootstrapRDD.scala (outdated review thread, resolved)
  className = "parquet",
  options = parameters)
  .resolveRelation()
val readPathsStr = parameters.get(DataSourceReadOptions.READ_PATHS_OPT_KEY)
Are these additional paths on top of the path? Any example of the use cases?
These additional paths are being used in the incremental query code to make it work for bootstrapped tables. I need to pass a list of bootstrapped files to read, and that is why I had to add support for reading from multiple paths. spark.read.parquet already has that kind of support and is already being used in IncrementalRelation to read a list of files.
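For illustration, spark.read.parquet accepts a variable number of paths in a single call, assuming an existing SparkSession `spark`; the paths below are placeholders.

// Read an explicit list of files as one relation; this is the same mechanism
// the incremental query uses to read only the changed/bootstrapped files.
val files = Seq(
  "s3://bucket/table/partition=2020-06-01/part-0001.parquet",
  "s3://bucket/table/partition=2020-06-02/part-0002.parquet"
)
val df = spark.read.parquet(files: _*)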
The bootstrap.base.path is now in hoodie.properties. Can we make this transparent for the user?
Well, right now I added it only for our internal logic to support incremental queries on bootstrapped tables.
Would you want customers to use this otherwise as well, to be able to provide multiple read paths for querying? Is that the ask here?
vinothchandar left a comment:
@umehrot2 Some of these are very good optimizations in the general sense as well.
hudi-spark/src/main/scala/org/apache/hudi/HudiBootstrapRelation.scala (outdated review thread, resolved)
hudi-spark/src/main/scala/org/apache/hudi/HudiBootstrapRDD.scala (outdated review thread, resolved)
def mergeInternalRow(skeletonRow: InternalRow, dataRow: InternalRow): InternalRow = {
On avoiding the merge cost, my understanding is that it's hard for this case, where you need to actually merge these two values, re-order fields, etc.
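For context, a minimal sketch (using Spark's Catalyst internals, and not the exact implementation in this PR) of merging a skeleton row with an external data row by concatenating the two and projecting into a single UnsafeRow; re-ordering fields would be expressed through the projection.

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{JoinedRow, UnsafeProjection}
import org.apache.spark.sql.types.StructType

// Build a reusable merge function for a given pair of schemas.
def rowMerger(skeletonSchema: StructType,
              dataSchema: StructType): (InternalRow, InternalRow) => InternalRow = {
  val joined = new JoinedRow()
  // Copies the concatenated fields into a single UnsafeRow; a different field
  // order would be achieved by passing re-ordered expressions here instead.
  val projection = UnsafeProjection.create((skeletonSchema.fields ++ dataSchema.fields).map(_.dataType))
  (skeletonRow: InternalRow, dataRow: InternalRow) => projection(joined(skeletonRow, dataRow))
}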
Force-pushed 2af6913 to c8295f2
I guess one of us will have to rebase. While most of the work seems isolated between the two PRs, some files are common and common code areas have been touched. I am fine with doing a further rebase if that PR gets in first.
Force-pushed c8295f2 to 9d21da8
@vinothchandar the tests are passing, so it's ready for review from my side.
Force-pushed 313385d to 08e8481
vinothchandar left a comment:
A few high-level comments. Took a pass at the code; LGTM at a high level. Doing an in-depth review while we hash out the high-level comments.
hudi-spark/src/main/scala/org/apache/hudi/HudiBootstrapRDD.scala (outdated review thread, resolved)
hudi-spark/src/main/scala/org/apache/hudi/IncrementalRelation.scala (outdated review thread, resolved)
Force-pushed 08e8481 to 4fcd7fe
Force-pushed 4fcd7fe to 923a678
@umehrot2 I rebased this after landing @garyli1019's PR. Please take a look at
@umehrot2 some tests are failing. Looking at them later today. Before we head into the weekend, is this PR ready from your perspective? If so, I will take care of making the final changes and land it.
@vinothchandar the rebase has some issues. With the introduction of Spark datasource support for real-time queries, we need to handle the bootstrap case there. For bootstrapped tables, real-time queries are still not supported; only read optimized queries will work for the MOR case with bootstrapped tables for now. I will fix this, and hopefully that should fix at least the unit test failures.
Force-pushed 7fe1bfa to 3f7ecde
@vinothchandar I fixed the rebase issue, and resolved the
Hi @umehrot2, the datasource test will initialize the Spark context before each run. If the previous run didn't close Spark properly, this error will come out. See 4f74a84#diff-b9deb8bdc09b0440cafdf6354fe9068dR104
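For illustration, a minimal sketch (assuming ScalaTest; the actual test harness in this PR may differ) of creating a fresh SparkSession per test and always stopping it afterwards, so a failed run cannot leak a running SparkContext into the next test.

import org.apache.spark.sql.SparkSession
import org.scalatest.{BeforeAndAfterEach, FunSuite}

class ExampleBootstrapTest extends FunSuite with BeforeAndAfterEach {
  private var spark: SparkSession = _

  override def beforeEach(): Unit = {
    // Fresh local session for every test case.
    spark = SparkSession.builder()
      .appName("bootstrap-datasource-test")
      .master("local[2]")
      .getOrCreate()
  }

  override def afterEach(): Unit = {
    // Ensure the context is closed even if the test body failed.
    if (spark != null) {
      spark.stop()
      spark = null
    }
  }
}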
hudi-spark/src/test/scala/org/apache/hudi/functional/TestDataSourceForBootstrap.scala (outdated review thread, resolved)
Force-pushed 3f7ecde to caa597a
Force-pushed caa597a to 952a499
@vinothchandar the unit test issues are resolved now, but the integration tests are behaving erratically. They passed last time, and failed now even though I didn't make any code change. They are getting stuck for some reason. I think you mentioned this issue to me.
The integration tests fail sometimes for no reason. I have seen this a few times. Maybe a rerun will fix it if we're lucky.
@umehrot2: Thanks for the update. Yeah, the integration test flakiness is a known issue and the logs show the same pattern. Let me do one pass of it along with the other bootstrap PRs from @zhedoubushishi and land them. If there are any minor review comments, I will update the PRs myself to speed up landing.
@umehrot2: Can you confirm that all review comments are resolved and the PR is otherwise ready?
bvaradar left a comment:
Awesome work @umehrot2. Looks good overall. I have addressed the conflicts. Will land this tomorrow after the tests finish.
  Boolean.parseBoolean(SQLConf.PARQUET_INT96_AS_TIMESTAMP().defaultValueString()));
StructType sparkSchema = converter.convert(parquetSchema);
String tableName = writeConfig.getTableName();
String structName = tableName + "_record";
@umehrot2: ITTestBootstrapCommand is failing with the exception below. Adding a sanitization API to remove illegal characters from Avro field names.
Exception in thread "main" org.apache.avro.SchemaParseException: Illegal character in: test-table_record
at org.apache.avro.Schema.validateName(Schema.java:1151)
at org.apache.avro.Schema.access$200(Schema.java:81)
at org.apache.avro.Schema$Name.<init>(Schema.java:489)
at org.apache.avro.Schema.createRecord(Schema.java:161)
at org.apache.avro.SchemaBuilder$RecordBuilder.fields(SchemaBuilder.java:1732)
at org.apache.spark.sql.avro.SchemaConverters$.toAvroType(SchemaConverters.scala:173)
at org.apache.spark.sql.avro.SchemaConverters.toAvroType(SchemaConverters.scala)
at org.apache.hudi.client.bootstrap.BootstrapSchemaProvider.getBootstrapSourceSchema(BootstrapSchemaProvider.java:97)
at org.apache.hudi.client.bootstrap.BootstrapSchemaProvider.getBootstrapSchema(BootstrapSchemaProvider.java:66)
at org.apache.hudi.table.action.bootstrap.BootstrapCommitActionExecutor.listAndProcessSourcePartitions(BootstrapCommitActionExecutor.java:288)
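A rough sketch (not necessarily the exact fix that was committed) of such sanitization: Avro record and field names must match [A-Za-z_][A-Za-z0-9_]*, so characters like the '-' in "test-table" have to be replaced before building the record schema.

// Replace characters that are illegal in Avro record/field names and make sure
// the result does not start with a digit.
def sanitizeAvroName(name: String): String = {
  val cleaned = name.replaceAll("[^A-Za-z0-9_]", "_")
  if (cleaned.nonEmpty && cleaned.head.isDigit) "_" + cleaned else cleaned
}

// e.g. sanitizeAvroName("test-table_record") returns "test_table_record"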
What is the purpose of the pull request
This PR consolidates changes related to Hudi data source and hive sync integration.
Brief change log
Verify this pull request
(Please pick either of the following options)
This pull request is a trivial rework / code cleanup without any test coverage.
(or)
This pull request is already covered by existing tests, such as (please describe tests).
(or)
This change added tests and can be verified as follows:
Committer checklist
Has a corresponding JIRA in PR title & commit
Commit message is descriptive of the change
CI is green
Necessary doc changes done or have another open PR
For large changes, please consider breaking it into sub-tasks under an umbrella JIRA.