Add persistent IDs to partition fields #845
Conversation
@rdblue can you please take a look? Thanks.

Hi @rdblue, can you please take a look? Thanks.
chenjunjiedada left a comment
Thanks @jun-he! This addresses my open issue for adding partition evolution.
Just left some comments for you.
  private transient volatile ListMultimap<Integer, PartitionField> fieldsBySourceId = null;
  private transient volatile Class<?>[] lazyJavaClasses = null;
  private transient volatile List<PartitionField> fieldList = null;
  private final int lastAssignedFieldId;
Do we want to use TypeUtil::NextID?
This is tracking something different. Here, this is the highest ID assigned to any partition field, so that the next ID assigned will be unique.
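As a rough illustration of that tracking (a sketch, not the PR's actual code; it assumes the fieldId() accessor this PR adds to PartitionField and the conventional starting ID of 1000):

import java.util.List;

// Sketch: track the highest ID assigned to any partition field so the
// next assigned ID is guaranteed to be unique.
static final int PARTITION_DATA_ID_START = 1000;

static int lastAssignedFieldId(List<PartitionField> fields) {
  int lastAssigned = PARTITION_DATA_ID_START - 1; // nothing assigned yet
  for (PartitionField field : fields) {
    lastAssigned = Math.max(lastAssigned, field.fieldId());
  }
  return lastAssigned; // the next unique ID is lastAssigned + 1
}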
  PartitionField(int sourceId, String name, Transform<?, ?> transform) {
  PartitionField(int sourceId, int fieldId, String name, Transform<?, ?> transform) {
    this.sourceId = sourceId;
    this.fieldId = fieldId;
Do we need to check that fieldId is larger than 1000?
I don't think so. That is a convention that we use, but not strictly required by the spec.
  builder.add(sourceId, name, transform);
  // partition field ids are missing in old PartitionSpec; they always auto-increment from PARTITION_DATA_ID_START
  if (!hasFieldId) {
What about forward compatibility? Is it possible for an old reader to read the new spec? Would it still parse the new spec with field IDs starting from 1000?
For forward-compatibility, I think that this should detect breaking changes to specs and throw an exception.
If IDs are removed by an older writer, then the IDs will be reassigned. That means that IDs must be assigned starting at 1000 and should have no gaps. If there are IDs, this should validate that assumption by checking that the field actually has the ID that is expected.
We should make a similar change on the write path: for each field, check that its field ID is what would be assigned if it were removed by an older writer. That will prevent newer writers from creating specs that will break.
Instead of making these changes here, I think that this should be verified in TableMetadata. That would accomplish the same thing, but make it easier to check the table version.
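A hypothetical sketch of that validation, assuming v1 field IDs must be exactly sequential from 1000 with no gaps; Preconditions is Guava's, which Iceberg already depends on:

import java.util.List;
import com.google.common.base.Preconditions;

// Sketch: every field ID must equal the ID an older writer would re-derive
// (sequential from 1000). Otherwise, dropping and reassigning the IDs
// would silently change the spec.
static void checkV1Compatible(List<PartitionField> fields) {
  int expectedId = PARTITION_DATA_ID_START; // 1000 by convention
  for (PartitionField field : fields) {
    Preconditions.checkArgument(field.fieldId() == expectedId,
        "Field %s has ID %s, but reassignment would produce %s",
        field.name(), field.fieldId(), expectedId);
    expectedId += 1;
  }
}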
  if (elements.hasNext() && elements.next().has(FIELD_ID)) {
    hasFieldId = true;
  }
  elements = json.elements();
I think it would be easier to follow and would result in a better error message if we put this logic inside the elements loop.
How about adding a counter for field IDs that are present and after the loop throwing an exception if the counter is not equal to the number of fields? Then each field would be handled independently (using has(FIELD_ID)).
I like the idea of a check here that states there were missing field IDs.
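A sketch of that loop shape (the SOURCE_ID/FIELD_ID/NAME/TRANSFORM constants and the four-argument builder.add that accepts a field ID are assumed from this PR's parser; Jackson's JsonNode API is used as in the surrounding code):

import java.util.Iterator;
import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.base.Preconditions;

// Handle each field independently with has(FIELD_ID), count explicit IDs,
// and fail afterwards if only some of the fields carried one.
int fieldCount = 0;
int fieldsWithId = 0;
Iterator<JsonNode> elements = json.elements();
while (elements.hasNext()) {
  JsonNode element = elements.next();
  fieldCount += 1;
  if (element.has(FIELD_ID)) {
    fieldsWithId += 1;
    builder.add(element.get(SOURCE_ID).asInt(), element.get(FIELD_ID).asInt(),
        element.get(NAME).asText(), element.get(TRANSFORM).asText());
  } else {
    // old metadata: the builder auto-assigns from PARTITION_DATA_ID_START
    builder.add(element.get(SOURCE_ID).asInt(),
        element.get(NAME).asText(), element.get(TRANSFORM).asText());
  }
}
Preconditions.checkArgument(fieldsWithId == 0 || fieldsWithId == fieldCount,
    "Cannot parse spec: %s of %s fields are missing field IDs",
    fieldCount - fieldsWithId, fieldCount);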
👌
I'm only about half-way done reviewing this, but I wanted to capture some thoughts about forward-compatibility that were raised by @chenjunjiedada.

If there are already multiple partition specs, then the IDs may be reused and can even conflict. This isn't something we can change because manifest files embed the field IDs in their schemas. That means assignment when there are no IDs must be from 1000 and should be independent across different partition specs.

If an older version writes to the table, then it may remove any assigned partition IDs. That means that for any format v1 table, we must remain compatible with the current assignment strategy. That way, IDs can be removed by an old writer and will be the same when they are reassigned.

This also means that evolution is limited in v1 tables. To ensure that IDs can be reassigned correctly if they are removed, partition fields cannot be dropped or reordered in any way. Otherwise, reassignment would be incorrect. That means no removing partition fields, no reordering partition fields, and no adding partition fields unless they are added at the end of the spec.

We will be able to make more evolution changes when we can guarantee that all partition fields have IDs that won't be removed. We'll make the IDs a requirement in v2 tables.
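To make the constraint concrete, here is a small illustration; reassignIds is a hypothetical stand-in for the positional reassignment an old writer effectively performs when it drops the IDs:

import java.util.ArrayList;
import java.util.List;

// Old writers re-derive IDs purely by position, sequentially from 1000,
// so removing or reordering fields shifts the IDs of surviving fields.
static List<Integer> reassignIds(int numFields) {
  List<Integer> ids = new ArrayList<>();
  for (int i = 0; i < numFields; i += 1) {
    ids.add(PARTITION_DATA_ID_START + i); // 1000, 1001, 1002, ...
  }
  return ids;
}

// Example: a spec [a, b, c] is assigned [1000, 1001, 1002]. If b is removed
// and an old writer drops the IDs, [a, c] is reassigned [1000, 1001], so c
// silently moves from 1002 to 1001 and breaks manifests that embedded 1002.
// Appending at the end is the only change that leaves reassignment stable.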
 * When committing, these changes will be applied to the current table metadata. Commit conflicts
 * will not be resolved and will result in a {@link CommitFailedException}.
 */
public interface UpdatePartitionSpec extends PendingUpdate<PartitionSpec> {
Can we remove the new interface and methods from this PR?
I don't think this is needed for this PR, and I'd like to minimize the number of changes. In addition, I don't think we want to move to a model where users create a new spec and apply it to a table. I think we instead want to evolve the partition spec of a table. So this API will probably be different when we release that feature.
I agree that it is better to move the update to a new PR, which addresses #281.
The main reason I put it here is to have additional unit tests to make sure it works as expected.
Additionally, I think we may support table partition spec evolution in two ways:
table.updatePartitionSpec()
    .update(spec)
    .commit();

table.updatePartitionSpec().newSpec(schema)
    .identity(...)
    .bucket(...)
    ...
    .commit();
The first approach may be used if clients want to define a spec and manage it in their own code, e.g. to use the defined spec object in multiple places.
For tests, you can use TableMetadata and commit directly:

TableOperations ops = table.operations();
TableMetadata base = ops.current();
TableMetadata updated = base.updatePartitionSpec(newSpec);
ops.commit(base, updated);
👌
Thanks @jun-he, this is great progress on partition field IDs! I think I understand how compatibility will work with field IDs, and I tried to add that context to my comments in this PR. If it isn't clear, please let me know what doesn't make sense.
Force-pushed from 6d804af to cd75cbd
I will move the partition spec evolution changes to a separate PR.
@chenjunjiedada FYI, I refactored the code, and the partition spec evolution changes and related tests are now in #922. Thanks.
        .appendManifest(manifestWithDeletedFiles)
        .commit());
  }
Nit: unnecessary newline.
Thanks. Will remove it in #922.
Awesome work, @jun-he! I'm merging this.
For issue #280.
I took a different approach from the previous WIP PR (#499).
Instead of adding additional state and logic into TableMetadata, I created PartitionSpecUpdate to handle that. Also, it is unnecessary to keep lastAssignedPartitionFieldId in TableMetadata, as it can be lazily derived from all the specs (previous specs are still there) stored in TableMetadata.
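A sketch of that lazy derivation (a specs() accessor on TableMetadata returning all historical specs is assumed here for illustration):

import java.util.List;

// Derive the last assigned partition field ID from all stored specs
// instead of persisting it as separate state: take the maximum field ID
// seen in any spec, past or current.
static int lastAssignedPartitionFieldId(List<PartitionSpec> specs) {
  int lastAssigned = PARTITION_DATA_ID_START - 1;
  for (PartitionSpec spec : specs) {
    for (PartitionField field : spec.fields()) {
      lastAssigned = Math.max(lastAssigned, field.fieldId());
    }
  }
  return lastAssigned;
}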