-
Notifications
You must be signed in to change notification settings - Fork 2.9k
Core: reassign the partition field IDs and reuse any existing IDs #2284
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
| // get a fresh spec to ensure the spec ID is set to the new default | ||
| builder.add(freshSpec(newDefaultSpecId, schema, newPartitionSpec)); | ||
| builder.add(freshSpec(newDefaultSpecId, schema, newPartitionSpec, formatVersion, | ||
| specs, new AtomicInteger(lastAssignedPartitionId))); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The problem from my comment that needs to be fixed does not affect this method. The logic in BaseUpdatePartitionSpec already covers spec evolution. The problem I was referring to is that when a table replacement is created, all of the metadata is new and can possibly conflict with the existing table's metadata.
For example, if I have a table partitioned by 1000: ts_day=day(ts) and I replace it with one partitioned by 1000: id_bucket=bucket(16, id) then the two partition IDs collide. The buildReplacement method needs to handle reassigning IDs for partition specs just like it does for schemas.
The specs that are passed into updatePartitionSpec are already based on the current spec and so the IDs are assigned correctly and are consistent with all previous table specs. There is no "bug" to fix in this method. However, there is an improvement that we can make that is similar. Currently, if you add a partition field that was previously part of the table but is not any longer, it is assigned a completely new ID.
For example, if I have a table partitioned by 1000: ts_day=day(ts) and then update it by dropping the ts_day partition, it would have one spec with ts_day and one that is unpartitioned. Then if I go back and add day(ts) again, it would create 1001: ts_day=day(ts). That is perfectly fine and consistent, but it would be better to go back and reuse ID 1000 for the new field.
We can detect that fields should be un-deleted rather than added by going back to through old partition specs for the table. That will recover the old ID, but there is an additional issue to solve: when un-deleting a partition field in v1, we need to make sure that the un-deleted field replaces the void field that was added when the field was deleted. For v2, it would also be nice to use the original partition field order but that's not required.
I think that this improvement is a good thing to do, but should not be part of this change because this PR is needed to fix the blocker bug with buildReplacement. Also, I think that the improvement for spec updates should be built into BaseUpdatePartitionSpec rather than done in table metadata.
Could you separate this into another PR, @jun-he?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
By the way, I think the examples above provide good test cases to include. I'd also want to include one where the deleted field is not at the end of the spec.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the details. It makes sense and I will update the PR accordingly.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@rdblue I updated the PR accordingly.
Seems the logic of reassignPartitionIds can only work for V2 table.
For V1 table, if a table has a schema with a single field ts and is partitioned by 1000: ts_day=day(ts) and then is replaced with a new schema with a single field id and is partitioned by 1000: id_bucket=bucket(16, id), it cannot reassign spec including 1000: ts_day=null and 1001: id_bucket=bucket(16, id) because the new schema does not contain ts any more.
Therefore, I kept reassignPartitionIds to be no op in the PR, which is the current behavior for V1 table.
How do you feel about this?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You can't drop a column that is referenced by a partition spec, so I don't think that we would get in that situation.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@rdblue ah, I see. Thanks for the comment. If it cannot happen when dropping a column, it won't be a problem.
|
|
||
| private static PartitionSpec freshSpec(int specId, Schema schema, PartitionSpec partitionSpec) { | ||
| private static PartitionSpec freshSpec(int specId, Schema schema, PartitionSpec partitionSpec, int formatVersion, | ||
| List<PartitionSpec> specs, AtomicInteger lastPartitionId) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I noted above that I don't think that we should change the behavior of freshSpec for updatePartitionSpec. Instead, what about creating a reassignPartitionIds method that gets called after freshSpec in buildReplacement? That would be similar to how schemas are handled, where the existing schema is passed to seed the IDs for the new table schema.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the comments. Updated it accordingly.
| PartitionField::fieldId, | ||
| (n1, n2) -> n2)); | ||
|
|
||
| for (PartitionField field : partitionSpec.fields()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This logic looks mostly correct for a reassignPartitionIds method, but I would separate the concern of updating the source field ID references (what freshSpec originally did) from reassigning partition IDs to be consistent with other specs in the table. We should use freshSpec to reassign the source IDs to the ones used in the current table schema, and then we should call reassignPartitionIds to make them match existing partition specs.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👌 updated it.
| TableMetadata updated = metadata.updatePartitionSpec(spec); | ||
| Assert.assertEquals(spec, updated.spec()); | ||
|
|
||
| spec = PartitionSpec.builderFor(schema).withSpecId(1) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This violates v1 requirements because the partition field ID for bucket(8, x) is reassigned. It is 1000 in the first spec and 1001 in the second spec.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👌 , updated the test accordingly.
e53e5ee to
7f45dfc
Compare
| return specBuilder.build(); | ||
| } else { | ||
| // noop for v1 table | ||
| return partitionSpec; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think the algorithm for v1 tables should be to find out what matches previous specs and re-use those. Then if there is a new field it is moved to the end, and if there is an old field that is unmatched it gets replaced with a void transform. Otherwise, this will create partition field ID conflicts in v1 tables.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As mentioned in #2284 (comment), the issue to replace old one using void transform is that the table schema might be changed and cannot create void transform if the old source field is removed.
@rdblue do you think we should use a dummy source field in this case?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@jun-he, you can't drop a column if it is referenced by the partition spec. I don't think we could get into the case you mentioned because of that limitation. When this method runs, the column was referenced by the previous version of the field. So we know that the column exists. After this method runs, the void partition should prevent the column from being removed. Am I missing a case where the column can be dropped but still referenced by a void transform?
|
|
||
| for (PartitionField field : partitionSpec.fields()) { | ||
| // reassign the partition field ids | ||
| Integer fieldId = transformToFieldId.computeIfAbsent( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why is this Integer instead of int?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Updated.
| snapshots, newSnapshotLog, addPreviousFile(file, lastUpdatedMillis)); | ||
| } | ||
|
|
||
| private PartitionSpec reassignPartitionIds(PartitionSpec partitionSpec, AtomicInteger lastPartitionId) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Minor: this is private, but I think it would be a bit better to use TypeUtil.NextID just because that removes confusion about whether incrementAndGet should be used or getAndIncrement.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Updated.
| } | ||
|
|
||
| @Test | ||
| public void testBuildReplacementForV2Table() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good.
| PartitionSpec expected = PartitionSpec.builderFor(updated.schema()).withSpecId(1) | ||
| .add(3, 1000, "z_bucket", "bucket[8]") | ||
| .add(1, 1001, "x", "identity") | ||
| .build(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think that this needs to be this:
PartitionSpec expected = PartitionSpec.builderFor(updated.schema()).withSpecId(1)
.add(1, 1000, "x", "identity")
.add(2, 1001, "y", "void")
.add(3, 1002, "z_bucket", "bucket[8]")
.build();We should be able to create that by updating the existing spec.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This test is for v2 table, which does not need void transform partition field for removed column.
This .add(2, 1001, "y", "void") seems not working with the error fo Cannot find source column: 2 as no field y in the updated schema.
This also means v1 table cannot use void to fill the field id gap in this case.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The test name is testBuildReplacementForV1Table, which is why I thought that the changes should be only v1 changes.
…alent fields when refreshing spec
7f45dfc to
ad88d6c
Compare
|
I am going to continue this task and should be shipped together with the release 0.12. |
|
I'm going to close this because I made the updates in #2906. |
Reassign the partition field IDs and reuse any existing IDs for equivalent fields when refreshing spec.