[HUDI-6055] Fix input format for bootstrap tables #8397
Conversation
Push down instantiation of table schema resolver
yihua left a comment
This is a good catch.
```java
// No need trigger schema evolution for count(*)/count(1) operation
boolean disableSchemaEvolution = requiredColumns.isEmpty() || (requiredColumns.size() == 1 && requiredColumns.get(0).isEmpty());
if (!disableSchemaEvolution) {
  if (!internalSchemaOption.isPresent()) {
```
Should this condition be internalSchemaOption == null since it may not be initialized?
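The distinction matters because calling `isPresent()` on a field that was never assigned throws a `NullPointerException` rather than returning false. Below is a minimal sketch of the null-safe check this comment proposes, using a hypothetical `internalSchemaOption` field and `schemaPresent()` helper (illustrative names, not Hudi's actual code):

```java
import java.util.Optional;

public class LazyOptionDemo {
    // Hypothetical stand-in for internalSchemaOption: if the field is only
    // assigned lazily, it starts out as null, and calling isPresent() on it
    // directly would throw a NullPointerException.
    static Optional<String> internalSchemaOption; // not yet initialized

    // Null-safe check, along the lines the review comment suggests.
    static boolean schemaPresent() {
        return internalSchemaOption != null && internalSchemaOption.isPresent();
    }

    public static void main(String[] args) {
        System.out.println(schemaPresent()); // field still null: no NPE
        internalSchemaOption = Optional.empty();
        System.out.println(schemaPresent()); // initialized, but empty
        internalSchemaOption = Optional.of("schema");
        System.out.println(schemaPresent()); // initialized and present
    }
}
```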
```java
if (internalSchemaOption.isPresent()) {
  List<String> requiredColumns = getRequireColumn(job);
  // No need trigger schema evolution for count(*)/count(1) operation
  boolean disableSchemaEvolution = requiredColumns.isEmpty() || (requiredColumns.size() == 1 && requiredColumns.get(0).isEmpty());
```
To clarify, does requiredColumns contain the columns from the predicate(s), e.g., count(*) where col1 is not null?
I see. This is existing logic. Still wondering the same question.
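For reference, the predicate quoted in the diff treats both an empty column list and a list containing a single empty string as "no projected columns", which is what a count(*)/count(1) query produces. A small sketch of that check in isolation (the class and method names are illustrative):

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class RequiredColumnsDemo {
    // Mirrors the predicate from the diff: schema evolution is skipped when
    // the required-column list is empty or holds a single empty string,
    // i.e., the shapes produced by count(*)/count(1) queries.
    static boolean disableSchemaEvolution(List<String> requiredColumns) {
        return requiredColumns.isEmpty()
            || (requiredColumns.size() == 1 && requiredColumns.get(0).isEmpty());
    }

    public static void main(String[] args) {
        System.out.println(disableSchemaEvolution(Collections.emptyList())); // count(*)
        System.out.println(disableSchemaEvolution(Arrays.asList("")));       // count(1)
        System.out.println(disableSchemaEvolution(Arrays.asList("col1")));   // projected column
    }
}
```

Whether predicate-only columns (as in the reviewer's example) end up in this list depends on what `getRequireColumn(job)` returns, which the sketch does not model.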
```diff
  // reading hoodie schema evolution table
  job.setBoolean(HIVE_EVOLUTION_ENABLE, true);
- Path finalPath = ((FileSplit)split).getPath();
+ Path finalPath = ((FileSplit) split).getPath();
  InternalSchema prunedSchema;
  List<String> requiredColumns = getRequireColumn(job);
  // No need trigger schema evolution for count(*)/count(1) operation
- prunedSchema = InternalSchemaUtils.pruneInternalSchema(internalSchemaOption.get(), requiredColumns);
- InternalSchema querySchema = prunedSchema;
- Long commitTime = Long.valueOf(FSUtils.getCommitTime(finalPath.getName()));
- InternalSchema fileSchema = InternalSchemaCache.searchSchemaAndCache(commitTime, metaClient, false);
- InternalSchema mergedInternalSchema = new InternalSchemaMerger(fileSchema, querySchema, true, true).mergeSchema();
- List<Types.Field> fields = mergedInternalSchema.columns();
- setColumnNameList(job, fields);
- setColumnTypeList(job, fields);
- pushDownFilter(job, querySchema, fileSchema);
+ boolean disableSchemaEvolution =
+     requiredColumns.isEmpty() || (requiredColumns.size() == 1 && requiredColumns.get(0).isEmpty());
+ if (!disableSchemaEvolution) {
+   prunedSchema = InternalSchemaUtils.pruneInternalSchema(internalSchemaOption.get(), requiredColumns);
+   InternalSchema querySchema = prunedSchema;
+   Long commitTime = Long.valueOf(FSUtils.getCommitTime(finalPath.getName()));
+   InternalSchema fileSchema = InternalSchemaCache.searchSchemaAndCache(commitTime, metaClient, false);
+   InternalSchema mergedInternalSchema = new InternalSchemaMerger(fileSchema, querySchema, true,
+       true).mergeSchema();
+   List<Types.Field> fields = mergedInternalSchema.columns();
+   setColumnNameList(job, fields);
+   setColumnTypeList(job, fields);
+   pushDownFilter(job, querySchema, fileSchema);
+ }
```
and should this part be guarded by !internalSchemaOption.isPresent()?
```diff
- throw new HoodieException("Failed to parse partition column values from the partition-path:"
-     + " likely non-encoded slashes being used in partition column's values. You can try to"
-     + " work this around by switching listing mode to eager");
+ LOG.warn(">>> PartitionColumns: " + partitionColumns + " PartitionValues: " + partitionColumnValues);
```
So I assume we still need to fail here instead of printing the warning and letting it return?
```java
boolean disableSchemaEvolution = requiredColumns.isEmpty() || (requiredColumns.size() == 1 && requiredColumns.get(0).isEmpty());
if (!disableSchemaEvolution) {
  if (!internalSchemaOption.isPresent()) {
    internalSchemaOption = new TableSchemaResolver(metaClient).getTableInternalSchemaFromCommitMetadata();
```
Should we still do try ... catch here in case the internal schema cannot be read?
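One way to read this suggestion: wrap the resolver call so that a failure to read commit metadata degrades to an empty `Optional` instead of failing the whole read. Below is a hedged sketch with a hypothetical `SchemaLookup` interface standing in for `TableSchemaResolver#getTableInternalSchemaFromCommitMetadata` (not Hudi's actual API):

```java
import java.util.Optional;

public class SafeSchemaLookup {
    // Hypothetical lookup that may fail, standing in for
    // TableSchemaResolver#getTableInternalSchemaFromCommitMetadata.
    interface SchemaLookup {
        Optional<String> lookup() throws Exception;
    }

    // Wrap the lookup in try/catch as the review suggests: if the internal
    // schema cannot be read, fall back to Optional.empty() so the caller
    // simply skips schema evolution instead of failing the query.
    static Optional<String> lookupOrEmpty(SchemaLookup lookup) {
        try {
            return lookup.lookup();
        } catch (Exception e) {
            return Optional.empty();
        }
    }

    public static void main(String[] args) {
        System.out.println(lookupOrEmpty(() -> Optional.of("schema")).isPresent());
        System.out.println(lookupOrEmpty(() -> {
            throw new Exception("corrupt commit metadata"); // simulated failure
        }).isPresent());
    }
}
```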
```diff
  true,
  new NoopCache(),
- false);
+ true);
```
Now I remember we need to fix the lazy listing for Hive File Index. Should this be in a separate PR?
Yes, this should not have been part of this PR. Actually, it doesn't really matter for the Hudi connector as it doesn't go through the COW input format code. And for the Hive connector, we already saw that the partition loader instantiates this on every call. However, the actual perf issue for the Hive connector was fixed by #7527 (comment)
```java
public SchemaEvolutionContext(InputSplit split, JobConf job, Option<HoodieTableMetaClient> metaClientOption) {
  this.split = split;
  this.job = job;
  this.metaClient = metaClientOption.isPresent() ? metaClientOption.get() : setUpHoodieTableMetaClient();
```
now the init of internalSchemaOption has been removed.
hudi/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/AbstractRealtimeRecordReader.java (line 91 in 83d4fe1):

```java
if (schemaEvolutionContext.internalSchemaOption.isPresent()) {
```
Oh ok, then there is no need for this PR.
@yihua @xiarixiaoyao thanks for reviewing. As mentioned in the comments, the unnecessary instantiation is already removed and we don't need lazy listing in the Hive file index impl for now. So, I am going to close the PR.
Change Logs
Push down instantiation of table schema resolver to the point when it's actually needed. This helps in avoiding some cycles reading commit metadata unless needed.
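The "push down instantiation" idea can be sketched as a memoizing supplier: construction is deferred until first use, so code paths that never need the resolver never pay for reading commit metadata. This is an illustrative pattern only, not the PR's actual implementation; the lambda below merely simulates the costly `new TableSchemaResolver(metaClient)` construction:

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

public class LazyResolver {
    // Memoizing wrapper: defers the expensive construction until the first
    // get(), then caches the result for subsequent calls.
    static <T> Supplier<T> lazy(Supplier<T> delegate) {
        return new Supplier<T>() {
            private T value;
            @Override
            public T get() {
                if (value == null) {
                    value = delegate.get();
                }
                return value;
            }
        };
    }

    public static void main(String[] args) {
        AtomicInteger constructions = new AtomicInteger();
        Supplier<String> resolver = lazy(() -> {
            constructions.incrementAndGet(); // simulate the costly metadata read
            return "resolver";
        });
        System.out.println(constructions.get()); // nothing built yet
        resolver.get();
        resolver.get();
        System.out.println(constructions.get()); // built exactly once
    }
}
```

Note this sketch is not thread-safe; a production version would need synchronization or an atomic reference.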
Impact
None.
Risk level (write none, low, medium, or high below)
low
Documentation Update
Describe any necessary documentation update if there is any new feature, config, or user-facing change: attach the ticket number here and follow the instruction to make changes to the website.
Contributor's checklist