[HUDI-5443] Fixing exception trying to read MOR table after NestedSchemaPruning rule has been applied
#7528
Conversation
.../hudi-spark-client/src/main/java/org/apache/hudi/client/common/HoodieSparkEngineContext.java (resolved review thread)
...lient/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java (resolved review thread)
```scala
case class BaseFileOnlyRelation(override val sqlContext: SQLContext,
                                ...
                                userSchema: Option[StructType],
                                globPaths: Seq[Path])
  extends HoodieBaseRelation(sqlContext, metaClient, optParams, userSchema) with SparkAdapterSupport {
```
The primary change here is converting the class to a case class, which in turn means that all of the constructor parameters become field values requiring the corresponding annotation (e.g. `override val`).

The reason this class is converted to a case class is to avoid any in-place mutations and instead have `updatePrunedDataSchema` produce a new instance (a minimal sketch of that pattern follows below).
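A minimal sketch of the copy-based update described above; the class and field names are illustrative stand-ins, not the actual Hudi relation API:

```scala
import org.apache.spark.sql.types.StructType

// Illustrative stand-in for a relation converted to a case class (names are assumptions).
case class ExampleRelation(tableSchema: StructType,
                           prunedDataSchema: Option[StructType] = None) {

  // Instead of mutating a field in place, return a new immutable instance that the
  // planner can substitute for the original (possibly cached) relation.
  def updatePrunedDataSchema(prunedSchema: StructType): ExampleRelation =
    this.copy(prunedDataSchema = Some(prunedSchema))
}
```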
```scala
/**
 * Get all PartitionDirectories based on globPaths if specified, otherwise use the table path.
 * Will perform pruning if necessary.
 */
private def listPartitionDirectories(globPaths: Seq[Path], partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Seq[PartitionDirectory] = {
```
Combined these two methods into one (a rough sketch of the combined listing follows below).
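A rough sketch of what such a combined listing could look like, assuming Spark's `FileIndex`/`InMemoryFileIndex` APIs; the `tableFileIndex` parameter and the overall shape are assumptions for illustration, not the actual Hudi implementation:

```scala
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.execution.datasources.{FileIndex, InMemoryFileIndex, PartitionDirectory}

// Hypothetical combined listing: glob paths (if any) and the table's own file index
// both go through the same pruning code path.
def listPartitionDirectories(spark: SparkSession,
                             tableFileIndex: FileIndex, // the relation's file index (assumed)
                             globPaths: Seq[Path],
                             partitionFilters: Seq[Expression],
                             dataFilters: Seq[Expression]): Seq[PartitionDirectory] = {
  val fileIndex =
    if (globPaths.isEmpty) tableFileIndex
    else new InMemoryFileIndex(spark, globPaths, Map.empty[String, String], None)
  fileIndex.listFiles(partitionFilters, dataFilters)
}
```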
```scala
protected def getColName(f: StructField): String = {
```
Dead code
```scala
case class MergeOnReadIncrementalRelation(override val sqlContext: SQLContext,
                                          ...)
  extends MergeOnReadSnapshotRelation(sqlContext, optParams, userSchema, Seq(), metaClient) with HoodieIncrementalRelationTrait {

  override type FileSplit = HoodieMergeOnReadFileSplit
```
Same changes as other relations
```scala
case class MergeOnReadSnapshotRelation(override val sqlContext: SQLContext,
                                       ...
                                       globPaths: Seq[Path],
                                       metaClient: HoodieTableMetaClient)
  extends HoodieBaseRelation(sqlContext, metaClient, optParams, userSchema) {
```
Same changes as other relations
...park-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala (outdated, resolved review thread)
Force-pushed from 5994fda to 9339277
Force-pushed from 9339277 to 636b300
@hudi-bot run azure
@hudi-bot run azure
Force-pushed from 636b300 to 8012230
yihua left a comment:
@alexeykudinkin I left a few comments. Besides, is there any test that failed before and is now fixed? If not, could you add such a test to verify the fix?
...nt/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieCatalystExpressionUtils.scala (outdated, resolved review thread)
```scala
        sourceExpr

      case (sourceType: StructType, targetType: StructType) =>
        val fieldValueExprs = targetType.fields.map { tf =>
```
Looks like a subset of nested fields may be taken during the projection, e.g., if the source has a {a.b, a.c, a.d} and the target has a.b, we only keep a.b instead of the whole StructType a. Does this happen or the caller of this function always makes sure the targetStructType is properly constructed to preserve the root-level field instead of a subset of nested fields? Is this a problem for projection, where the parquet and log reader can read files with a schema containing a subset of nested fields?
> Looks like a subset of nested fields may be taken during the projection, e.g., if the source has a {a.b, a.c, a.d} and the target has a.b, we only keep a.b instead of the whole StructType a. Does this happen or the caller of this function always makes sure the targetStructType is properly constructed to preserve the root-level field instead of a subset of nested fields?
It does happen. It's actually the reason for this PR -- previously it was only handling nested field projections, but NestedSchemaPruning could produce schemas w/ nested fields being pruned as well. Therefore, we need to make sure we handle this appropriately when reading log-files (by projecting records into the new schema)
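To make the scenario above concrete, here is an illustrative sketch (not code from the PR) of a source schema with nested fields `a.b`, `a.c`, `a.d`, a pruned target schema keeping only `a.b`, and a naive row-level projection between the two:

```scala
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._

// Source: struct `a` with nested fields b, c, d
val sourceSchema = StructType(Seq(
  StructField("a", StructType(Seq(
    StructField("b", StringType),
    StructField("c", IntegerType),
    StructField("d", LongType))))))

// Pruned target: only a.b survives (the kind of schema NestedSchemaPruning may produce)
val prunedSchema = StructType(Seq(
  StructField("a", StructType(Seq(
    StructField("b", StringType))))))

// Hypothetical helper: recursively keep only the fields present in the target schema.
def project(row: Row, source: StructType, target: StructType): Row =
  Row.fromSeq(target.fields.map { tf =>
    val idx = source.fieldIndex(tf.name)
    (source(idx).dataType, tf.dataType) match {
      case (s: StructType, t: StructType) => project(row.getStruct(idx), s, t)
      case _                              => row.get(idx)
    }
  })
```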
Got it
Realized that this is actually not the right approach and the problem is elsewhere:
- The problem was that we're simply not reading projected records from the Parquet files: when a non-whitelisted RecordPayload is used we fall back to reading the full record, but we were still allowing `NestedSchemaPruning` to be applied nevertheless (a sketch of such a guard follows below)
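An illustrative sketch of that kind of guard; the whitelist contents and method name are assumptions for illustration only, not the actual rule implementation:

```scala
// Only let NestedSchemaPruning rewrite the relation's read schema when the record
// payload class is known to support merging against a projected (pruned) schema.
// The set contents below are an assumption, not Hudi's actual whitelist.
val projectionCompatiblePayloads: Set[String] = Set(
  "org.apache.hudi.common.model.OverwriteWithLatestAvroPayload")

def canApplyNestedSchemaPruning(payloadClassName: String): Boolean =
  projectionCompatiblePayloads.contains(payloadClassName)
```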
...spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyRelation.scala (resolved review thread)
```scala
private def getHadoopConf: Configuration = {
  val conf = hadoopConfBroadcast.value.value
  new Configuration(conf)
}
```
still need the lock for concurrency?
I don't actually think we need the lock
Given that this might introduce side effects and there's little time before the code freeze of the release to verify the removal of the lock, could you keep this part the same as before? It is not essential to the PR.
I don't think there's any real explanation for why that lock might be needed:
- Underlying broadcast implementation is thread-safe (takes its own lock)
- Hadoop configuration doesn't need a lock
But even more importantly -- there's no real concurrent access to this method
This lock is from the beginning of the implementation of HoodieMergeOnReadRDD and it is likely due to the broadcast configuration issue in Spark 2.4 (#1848 (comment)). @garyli1019 could you confirm if that's the case? I'd rather keep it and keep the code safe from potentially breaking the MOR queries.
How would it break MOR queries though?
See, whichever way we turn this around, this lock doesn't really make sense:
- There's no concurrency accessing this method, each task gets its own copy of the object
- The only concurrency is w/in accessing the Broadcast shared cache, which is guarded by its own lock
Synced up offline. @alexeykudinkin and I are aligned in that we need to clean up the legacy code and remove any unnecessary code path. For this particular case, there could be a problem regarding the modification of the hadoop conf returned by this function (check: HoodieMergeOnReadRDD#compute -> LogFileIterator.logRecords -> scanLog -> FSUtils.getFs -> prepareHadoopConf -> conf.set). That's likely why the lock is put in from the beginning. So the change is going to be reverted in this PR and we'll revisit this in a separate PR to be merged after 0.13.0.
@yihua Hi Ethan, the hadoop configuration issue that I can recall was related to serialization during the broadcast. The hadoop configuration was not serializable. The initial implementation of HoodieMergeOnReadRDD somehow created a new Configuration object or used a SerializableConfiguration to solve this issue.
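For context, a minimal sketch of the pattern being discussed: Hadoop's `Configuration` is not Java-serializable on its own, so it is typically wrapped before being broadcast (Spark ships a similar `SerializableConfiguration` utility), and each task takes a fresh defensive copy before mutating it. The wrapper below is a hand-rolled stand-in, not the actual Hudi/Spark code:

```scala
import java.io.{ObjectInputStream, ObjectOutputStream}

import org.apache.hadoop.conf.Configuration

// Stand-in for Spark's SerializableConfiguration: serialize the conf through its
// Writable interface so it can be shipped to executors inside a broadcast.
class SerializableConf(@transient var value: Configuration) extends Serializable {
  private def writeObject(out: ObjectOutputStream): Unit = {
    out.defaultWriteObject()
    value.write(out)
  }
  private def readObject(in: ObjectInputStream): Unit = {
    in.defaultReadObject()
    value = new Configuration(false)
    value.readFields(in)
  }
}

// Driver side: val hadoopConfBroadcast = sc.broadcast(new SerializableConf(conf))
// Task side:   def getHadoopConf: Configuration = new Configuration(hadoopConfBroadcast.value.value)
// The defensive copy means any later conf.set(...) in a task does not mutate shared state.
```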
...spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyRelation.scala (resolved review thread)
...udi-spark/src/test/scala/org/apache/spark/sql/hudi/TestNestedSchemaPruningOptimization.scala (resolved review thread)
Force-pushed from 8012230 to 9267c53
Replaced the pruned data-schema updating sequence to create a new relation instead of mutating the existing one (which might be cached)
…om it for both Snapshot and Incremental relations; Fixed signature of `updatePrunedDataSchema` to rely on overridden type decl
…dSchemaPruning` rule could be applied to MOR table; Tidying up
Tidying up
…mplement lazy semantic for every projection
Force-pushed from 74f9bc4 to 8f78916
yihua left a comment:
LGTM
…hemaPruning` rule has been applied (apache#7528) Addresses issue w/ MOR tables where after applying `NestedSchemaPruning` optimization, it would fail to read in case delta-log merging will need to happen
Change Logs
Currently, MOR tables w/ the `NestedSchemaPruning` (NSP) rule successfully applied (ie being able to prune nested schema) would fail to read in case any log-file merging would occur (an illustrative repro is sketched after this list).
- `HoodieClientTestHarness`: properly init `SparkSession` so that `HoodieSparkSessionExtensions` are injected properly
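An illustrative repro sketch of the failing scenario, assuming a Spark session (`spark`) with Hudi's Spark SQL extensions enabled; the table name, schema and values are made up for illustration:

```scala
// Create an MOR table with a nested struct column, produce a delta log file via an
// update, then query only a nested field so NestedSchemaPruning kicks in and the
// pruned schema has to be merged against the log file.
spark.sql(
  """
    |create table hudi_mor_nested (
    |  id int,
    |  item struct<name: string, price: double>,
    |  ts long
    |) using hudi
    |tblproperties (type = 'mor', primaryKey = 'id', preCombineField = 'ts')
    |""".stripMargin)

spark.sql("insert into hudi_mor_nested select 1, named_struct('name', 'apple', 'price', 1.0D), 100")
// For an MOR table the update is written to a delta log file, which is the merge path that used to fail
spark.sql("update hudi_mor_nested set ts = 200 where id = 1")

// Selecting only item.name lets NestedSchemaPruning prune item.price from the read schema
spark.sql("select id, item.name from hudi_mor_nested").show()
```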
Impact
Addresses issue w/ MOR tables where after applying the NSP optimization, it would fail to read in case delta-log merging will need to happen
Risk level (write none, low medium or high below)
Low
Documentation Update
N/A
Contributor's checklist