
Conversation

@nbalajee (Contributor) commented Dec 8, 2020

…f a nested record field.

What is the purpose of the pull request

If the schema contains nested records, the HoodieAvroUtils rewrite() function copies nested record fields as-is from the oldRecord to the newRecord. If fields of the nested record have evolved, this results in a SchemaCompatibilityException or ArrayIndexOutOfBoundsException.

Brief change log

Modify HoodieAvroUtils rewrite() to rewrite the evolved fields, with new/evolved fields initialized to null.
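
For illustration, a minimal sketch of the approach, assuming a recursive helper shaped like the rewriteEvolvedFields(Object datum, Schema newSchema) discussed in the review below; this is a sketch, not the PR's exact code:

```java
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;

public class RewriteSketch {
  // Recurse into nested RECORD fields instead of copying them as-is; fields that
  // exist only in the new schema are left null.
  public static Object rewriteEvolvedFields(Object datum, Schema newSchema) {
    if (newSchema.getType() == Schema.Type.RECORD) {
      GenericRecord oldRecord = (GenericRecord) datum;
      GenericRecord newRecord = new GenericData.Record(newSchema);
      for (Schema.Field field : newSchema.getFields()) {
        // Copy only fields present in the old record; new/evolved fields stay null.
        if (oldRecord.getSchema().getField(field.name()) != null) {
          newRecord.put(field.name(),
              rewriteEvolvedFields(oldRecord.get(field.name()), field.schema()));
        }
      }
      return newRecord;
    }
    return datum; // primitives (and, in the real patch, UNION/ARRAY/MAP) handled elsewhere
  }
}
```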

Verify this pull request

This pull request is already covered by existing tests, such as TestHoodieAvroUtils.
Added testRewriteToEvolvedNestedRecord() and testRewriteToShorterRecord(); a minimal standalone reproduction of the scenario is sketched below.
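
Not the PR's exact tests, but a minimal reproduction of the evolved-nested-record case (the schemas and class name here are made up):

```java
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertNull;

public class TestNestedRewriteSketch {
  private static final String OLD = "{\"type\":\"record\",\"name\":\"Outer\",\"fields\":["
      + "{\"name\":\"inner\",\"type\":{\"type\":\"record\",\"name\":\"Inner\",\"fields\":["
      + "{\"name\":\"a\",\"type\":\"string\"}]}}]}";
  private static final String NEW = "{\"type\":\"record\",\"name\":\"Outer\",\"fields\":["
      + "{\"name\":\"inner\",\"type\":{\"type\":\"record\",\"name\":\"Inner\",\"fields\":["
      + "{\"name\":\"a\",\"type\":\"string\"},"
      + "{\"name\":\"b\",\"type\":[\"null\",\"long\"],\"default\":null}]}}]}";

  @Test
  public void rewritesEvolvedNestedField() {
    Schema oldSchema = new Schema.Parser().parse(OLD);
    Schema newSchema = new Schema.Parser().parse(NEW);
    GenericRecord inner = new GenericData.Record(oldSchema.getField("inner").schema());
    inner.put("a", "value");
    GenericRecord outer = new GenericData.Record(oldSchema);
    outer.put("inner", inner);
    // Before this patch, rewriting a record whose nested record gained a field
    // could throw; with the fix, the new nested field defaults to null.
    GenericRecord rewritten = HoodieAvroUtils.rewriteRecord(outer, newSchema);
    assertNull(((GenericRecord) rewritten.get("inner")).get("b"));
  }
}
```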

Committer checklist

  • [x] Has a corresponding JIRA in PR title & commit

  • [x] Commit message is descriptive of the change

  • [x] CI is green

  • Necessary doc changes done or have another open PR

  • For large changes, please consider breaking it into sub-tasks under an umbrella JIRA.

@codecov-io commented Dec 8, 2020

Codecov Report

Merging #2309 (9abc305) into master (0c821fe) will decrease coverage by 0.21%.
The diff coverage is 22.83%.

Impacted file tree graph

```diff
@@             Coverage Diff              @@
##             master    #2309      +/-   ##
============================================
- Coverage     52.43%   52.21%   -0.22%     
- Complexity     2653     2669      +16     
============================================
  Files           332      335       +3     
  Lines         14892    15014     +122     
  Branches       1496     1512      +16     
============================================
+ Hits           7808     7839      +31     
- Misses         6458     6548      +90     
- Partials        626      627       +1     
```
| Flag | Coverage Δ | Complexity Δ |
|------|------------|--------------|
| hudicli | 38.83% <ø> (ø) | 0.00 <ø> (ø) |
| hudiclient | 100.00% <ø> (ø) | 0.00 <ø> (ø) |
| hudicommon | 54.72% <22.22%> (-0.45%) | 0.00 <13.00> (ø) |
| hudihadoopmr | 33.52% <100.00%> (+0.23%) | 0.00 <0.00> (ø) |
| huditimelineservice | 65.30% <ø> (ø) | 0.00 <ø> (ø) |
| hudiutilities | 69.65% <ø> (ø) | 0.00 <ø> (ø) |

Flags with carried forward coverage won't be shown.

| Impacted Files | Coverage Δ | Complexity Δ |
|----------------|------------|--------------|
| .../apache/hudi/common/model/ClusteringOperation.java | 0.00% <0.00%> (ø) | 0.00 <0.00> (?) |
| ...e/hudi/common/table/log/HoodieFileSliceReader.java | 0.00% <0.00%> (ø) | 0.00 <0.00> (?) |
| ...di/common/table/timeline/HoodieActiveTimeline.java | 69.62% <0.00%> (-4.51%) | 41.00 <0.00> (ø) |
| ...che/hudi/common/table/timeline/HoodieTimeline.java | 89.13% <0.00%> (-4.06%) | 43.00 <0.00> (ø) |
| ...a/org/apache/hudi/common/util/ClusteringUtils.java | 89.06% <0.00%> (-1.42%) | 18.00 <0.00> (ø) |
| .../java/org/apache/hudi/common/util/StringUtils.java | 66.66% <ø> (ø) | 14.00 <0.00> (ø) |
| ...hudi/utilities/schema/FilebasedSchemaProvider.java | 82.35% <ø> (ø) | 5.00 <0.00> (ø) |
| ...g/apache/hudi/common/model/WriteOperationType.java | 53.12% <33.33%> (-2.05%) | 2.00 <0.00> (ø) |
| ...ain/java/org/apache/hudi/avro/HoodieAvroUtils.java | 52.34% <45.71%> (-1.59%) | 45.00 <7.00> (+7.00) ⬇️ |
| .../apache/hudi/common/config/SerializableSchema.java | 57.89% <57.89%> (ø) | 6.00 <6.00> (?) |

... and 6 more

@n3nash (Contributor) commented Dec 9, 2020

@nbalajee Can you please explain why we need this? If the latest schema is passed (which is the case for Hudi now), is this still a problem?
@bvaradar can you please take a look at this one?

@danny0405 (Contributor) left a comment

Thanks for the contribution @nbalajee, I have left some comments.

Contributor

Based on the Avro documentation, I believe this is the right direction for the fix: http://avro.apache.org/docs/1.7.7/spec.html#Schema+Resolution
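
For illustration: per those resolution rules, a field added to a nested record needs a nullable type and a null default so that records written without it still resolve (the field names below are hypothetical):

```java
import org.apache.avro.Schema;

public class SchemaResolutionExample {
  public static void main(String[] args) {
    // Evolved nested record: "added" is new, optional, and defaults to null,
    // which is what Avro schema resolution requires for reader-added fields.
    String evolved = "{\"type\":\"record\",\"name\":\"Inner\",\"fields\":["
        + "{\"name\":\"existing\",\"type\":\"string\"},"
        + "{\"name\":\"added\",\"type\":[\"null\",\"long\"],\"default\":null}]}";
    Schema schema = new Schema.Parser().parse(evolved);
    System.out.println(schema.getField("added").schema()); // ["null","long"]
  }
}
```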

Contributor

Should we check the schema equivalence first, because the old schema may also be a UNION?

Contributor

Can we also add test cases there, for two cases:

  • the nested record has fewer fields than expected
  • the nested record has more fields than expected

Contributor Author

Added two test cases.

UNION is predominantly used for the optional-record pattern, [null, {record}]. In the next step of the recursion, the record performs the schema equivalence check, so I thought we wouldn't need the equivalence check here. Please let me know if I missed something.
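
A sketch of that recursion step, assuming the optional-field [null, X] pattern described above and the rewriteEvolvedFields() sketch from the PR description; a union with several non-null branches would need extra branch selection, which is the concern raised later in this thread:

```java
import org.apache.avro.Schema;

// Assumes the [null, X] optional pattern: pick the non-null branch and let the
// RECORD case of rewriteEvolvedFields() do the schema equivalence check.
public static Object rewriteUnion(Object datum, Schema unionSchema) {
  if (datum == null) {
    return null;
  }
  for (Schema branch : unionSchema.getTypes()) {
    if (branch.getType() != Schema.Type.NULL) {
      return RewriteSketch.rewriteEvolvedFields(datum, branch);
    }
  }
  return datum;
}
```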

Contributor

Yeah, for UNION I think the check in the nested recursion is valid.

Contributor

@nbalajee: let me know if this is feasible. Is NULL mandatory in any UNION schema? I mean, can there be a UNION schema without NULL in it? If yes, this would fail, in my understanding.

@vinothchandar (Member)

@danny0405 awesome! Thanks for jumping in :)

@nbalajee (Contributor Author)

> @nbalajee Can you please explain why we need this? If the latest schema is passed (which is the case for Hudi now), is this still a problem?
> @bvaradar can you please take a look at this one?

@n3nash - Correct. When reading parquet files, Hudi uses the writer schema (the evolved schema with added fields) so that optional fields are automatically populated with null (native schema evolution). For rewrite(), Hudi use cases always pass the writerSchema, so we don't run into this issue.

An added advantage of fixing this the correct way is that Hudi will be able to support "external schema evolution": read parquet using the reader schema, then rewrite the records using the evolved schema.
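
A hedged sketch of that flow, reusing the rewriteRecord() API mentioned later in this review (the helper name is hypothetical):

```java
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.hudi.avro.HoodieAvroUtils;

// Hypothetical helper illustrating "external schema evolution": records read from
// parquet with the (older) reader schema are rewritten into the evolved schema.
// With this fix, newly added nested fields come out null instead of triggering
// SchemaCompatibilityException or ArrayIndexOutOfBoundsException.
public static GenericRecord evolveExternally(GenericRecord readWithReaderSchema, Schema evolvedSchema) {
  return HoodieAvroUtils.rewriteRecord(readWithReaderSchema, evolvedSchema);
}
```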

@vinothchandar vinothchandar added the area:schema Schema evolution and data types label Dec 15, 2020

Contributor

The forced type cast (List) datum has no protection logic, so the method rewriteEvolvedFields(Object datum, Schema newSchema) works assuming that the datum's schema is compatible with newSchema. This implicit contract must be kept by the invoker; I would suggest adding a note to the Javadoc to make it clear. Same with the Map type.

Contributor Author

If the datum contained inside the Array/Map is of a primitive type, then no additional schema compatibility check is required. If the datum is a Record, then we are already checking whether newSchema matches the record schema, by comparing the hash values.

In other words, similar to the UNION case, the nested recursion takes care of the datum contained in the ARRAY or MAP as well.
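
A sketch of that container handling, matching the shape of the diff context quoted later in the thread; the unchecked casts rely on the caller-side compatibility contract noted in the previous comment, and it again assumes the rewriteEvolvedFields() helper sketched in the description:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.avro.Schema;

@SuppressWarnings("unchecked")
public static Object rewriteContainer(Object datum, Schema newSchema) {
  switch (newSchema.getType()) {
    case ARRAY:
      List<Object> listCopy = new ArrayList<>();
      for (Object element : (List<Object>) datum) {
        // Recurse per element: primitives pass through, records get rewritten.
        listCopy.add(RewriteSketch.rewriteEvolvedFields(element, newSchema.getElementType()));
      }
      return listCopy;
    case MAP:
      Map<Object, Object> mapCopy = new HashMap<>();
      for (Map.Entry<Object, Object> entry : ((Map<Object, Object>) datum).entrySet()) {
        mapCopy.put(entry.getKey(), RewriteSketch.rewriteEvolvedFields(entry.getValue(), newSchema.getValueType()));
      }
      return mapCopy;
    default:
      return datum;
  }
}
```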

Contributor

The test is good; although it misses the ARRAY and MAP type check, I think it is okay.

Contributor Author

+1. Since the differences come from the RECORD, and ARRAY/MAP are just containers, my thinking as well is that testing against the RECORD is sufficient.

@danny0405 (Contributor)

Thanks for the update @nbalajee, I have left some comments.

@nsivabalan (Contributor)

@nbalajee: do you think we can get this landed by the upcoming release?
@danny0405: Can you please take care of seeing this through once Balaji addresses your comments? We are looking to get this in by the upcoming release (we will be cutting a release in a week's time).

Contributor

The first operand of assertEquals should be the expected value; to avoid confusion, I would suggest using assertThat(variable, is(expected)) instead.
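
A small illustration of the suggestion (the values here are placeholders):

```java
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;

import org.junit.jupiter.api.Test;

public class AssertStyleExample {
  @Test
  public void comparesRecords() {
    String expected = "value";
    String actual = "value";
    // assertEquals silently accepts swapped operands and then produces a
    // confusing failure message; assertThat makes the roles explicit.
    assertEquals(expected, actual);   // expected must come first
    assertThat(actual, is(expected)); // actual first, then the matcher
  }
}
```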

Contributor Author

Swapped the parameters to fix assertEquals.

@danny0405 (Contributor)

> @nbalajee: do you think we can get this landed by the upcoming release?
> @danny0405: Can you please take care of seeing this through once Balaji addresses your comments? We are looking to get this in by the upcoming release (we will be cutting a release in a week's time).

I can; waiting for the update from @nbalajee ~

Satish Kotha and others added 11 commits December 28, 2020 07:24
Co-authored-by: Wenning Ding <[email protected]>

- Added support for bulk insert v2 with datasource v2 api in Spark 3.0.0.
…ons that have no incremental changes (apache#2371)

* Incremental Query should work even when there are partitions that have no incremental changes

Co-authored-by: Sivabalan Narayanan <[email protected]>
…ning tests in test suite framework (apache#2168)

* trigger rebuild

* [HUDI-1156] Remove unused dependencies from HoodieDeltaStreamerWrapper Class (apache#1927)

* Adding support for validating records and long running tests in test suite framework

* Adding partial validate node

* Fixing spark session initiation in Validate nodes

* Fixing validation

* Adding hive table validation to ValidateDatasetNode

* Rebasing with latest commits from master

* Addressing feedback

* Addressing comments

Co-authored-by: lamber-ken <[email protected]>
Co-authored-by: linshan-ma <[email protected]>
…pache#2275)

* [HUDI-1354] Block updates and replace on file groups in clustering

* [HUDI-1354]  Block updates and replace on file groups in clustering
@danny0405 (Contributor)

@nbalajee You may need to rebase your branch first in order to avoid unnecessary commits.

lw309637554 and others added 3 commits December 29, 2020 11:32
* [HUDI-1350] Support Partition level delete API in HUDI

* [HUDI-1350] Support Partition level delete API in HUDI base InsertOverwriteCommitAction

* [HUDI-1350] Support Partition level delete API in HUDI base InsertOverwriteCommitAction
@nsivabalan (Contributor)

FYI, we have another patch that fixes schema evolution in Hudi. Not sure if there are any overlaps, though, as I haven't looked into this patch.
#2334

@nbalajee (Contributor Author) commented Jan 4, 2021

> FYI, we have another patch that fixes schema evolution in Hudi. Not sure if there are any overlaps, though, as I haven't looked into this patch.
> #2334

@nsivabalan - #2334 is using the HoodieAvroUtils.rewrite() functionality to rewrite the generic records. There is no overlap between the issues being corrected.

@vinothchandar vinothchandar self-assigned this Jan 29, 2021
@vinothchandar (Member)

@nbalajee @nsivabalan can you please summarize the status of this PR? Is it ready to go after rebasing, or should we spend more time on the review?

@vinothchandar vinothchandar added the priority:critical Production degraded; pipelines stalled label Feb 11, 2021
@vinothchandar vinothchandar assigned n3nash and unassigned bvaradar Mar 14, 2021
@vinothchandar (Member)

@n3nash @nbalajee @prashantwason @nsivabalan this PR sounds important, but can someone please summarize its state? Also, this needs a rebase with only the necessary changes.

@danny0405 (Contributor)

> @n3nash @nbalajee @prashantwason @nsivabalan this PR sounds important, but can someone please summarize its state? Also, this needs a rebase with only the necessary changes.

The changes overall look good from my side, but this PR needs a rebase because it pulls in many conflicting commits from the master branch.

@vinothchandar vinothchandar removed the priority:critical Production degraded; pipelines stalled label Mar 16, 2021
@nsivabalan (Contributor)

@nbalajee: can you rebase and update the PR?

@nsivabalan (Contributor) left a comment

Left some comments for clarification.
Also, a general question on Avro schema evolution:

  • I know we can evolve a field from int to long. But can we evolve a field of array[int] to array[long]?
  • Can a primitive field be evolved to a union with null in it?

In general, depending on the answers to these, does this patch handle every compatible evolution?


```java
mapCopy.put(entry.getKey(), rewriteEvolvedFields(entry.getValue(), newSchema.getValueType()));
}
return mapCopy;
default:
```
@nsivabalan (Contributor) commented May 23, 2021

I am slowly gaining knowledge in schema evolution, so this might be a noob question. Apart from the RECORD datatype, how else could other datatypes evolve? For example, a field of array datatype in the old schema has to be an array in the new schema, right? It can never evolve to anything else (in a compatible manner). In the case of RECORD, I understand there could be more fields, and hence we need a deep copy. What I am trying to ask is: for the union, array, and map data types, can we just fetch the old value and add it to the new record rather than doing a deep copy? Can you help clarify?

```java
}

@Test
public void testRewriteToShorterRecord() throws Exception {
```
Contributor

I thought from the Javadocs of HoodieAvroUtils.rewriteRecord(GenericRecord oldRecord, Schema newSchema) that the rewrite can happen from the old schema to the new schema and not the other way round. Can you help me understand why we allow a backwards-incompatible rewrite here?

@nsivabalan (Contributor)

@nbalajee: I see that you have a lot of extra commits in this patch. Can you fix it and rebase? In the interest of testing it out, I pulled in your changes locally and have put up a draft PR #2982 with some minor fixes in addition to your patch. In case you want to create a clean PR, it might be useful to you.

@nsivabalan (Contributor)

I verified some of the unknowns (a quick way to check these is sketched below):

  • a primitive can be evolved to a union with a null default
  • array[int] can be evolved to array[long]
  • map[int] can be evolved to map[long]
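
One way to verify such promotions (an assumed harness, not part of this PR) is Avro's built-in SchemaCompatibility checker:

```java
import org.apache.avro.Schema;
import org.apache.avro.SchemaCompatibility;

public class PromotionCheck {
  public static void main(String[] args) {
    Schema writer = new Schema.Parser().parse("{\"type\":\"array\",\"items\":\"int\"}");
    Schema reader = new Schema.Parser().parse("{\"type\":\"array\",\"items\":\"long\"}");
    // int -> long is a legal promotion, so reading array[int] data with an
    // array[long] reader schema is compatible.
    System.out.println(
        SchemaCompatibility.checkReaderWriterCompatibility(reader, writer).getType());
  }
}
```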

@nsivabalan (Contributor) left a comment

Made some minor optimizations in the new PR, #2982; do check it out.

@hudi-bot (Collaborator) commented Nov 5, 2021

CI report:

Bot commands: @hudi-bot supports the following commands:
  • @hudi-bot run azure: re-run the last Azure build

@nsivabalan nsivabalan closed this Jan 20, 2022

Labels

area:schema Schema evolution and data types
