-
Notifications
You must be signed in to change notification settings - Fork 3k
Python: Support for adding columns #8174
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
|
I forgot, but could you also add docs under |
78842c7 to
1d50871
Compare
9ad74c1 to
b383de6
Compare
04e4dcf to
139039b
Compare
6d376c4 to
6d90d6f
Compare
6d90d6f to
449f23c
Compare
449f23c to
32db8b1
Compare
|
Thanks @hililiwei for working on this 👍🏻 |
|
|
||
| Add new columns through the `Transaction` or `UpdateSchema` API: | ||
|
|
||
| Use the Transaction API: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@Fokko, I don't think that we should encourage people to use the transaction API for single operations like this. It's much simpler to avoid using the transaction right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Alright, we can swap them around.
| @property | ||
| def highest_field_id(self) -> int: | ||
| return visit(self.as_struct(), _FindLastFieldId()) | ||
| return max(self._lazy_id_to_name.keys(), default=0) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This seems unrelated, though fine.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should have been a separate PR indeed, but it is a very welcome change
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The old way didn't get the highest field id correctly, so I changed it.
| # specific language governing permissions and limitations | ||
| # under the License. | ||
| # pylint: disable=W0511 | ||
| from __future__ import annotations |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@Fokko, what is our policy on using this vs using strings? How do we choose?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I can add that to the Type annotations section. I prefer the from __future__ import annotations since it introduces less visual noise.
| self._short_field_names: List[str] = [] | ||
|
|
||
| def before_map_key(self, key: NestedField) -> None: | ||
| self.before_field(key) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Isn't this the default implementation?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, looks like it. Let me clean that up
| self.counter = itertools.count(start) | ||
| def __init__(self, next_id_func: Optional[Callable[[], int]] = None) -> None: | ||
| self.reserved_ids = {} | ||
| counter = itertools.count(1) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why does this remove start? Isn't that an unnecessary incompatible change?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The start argument wasn't actually exposed:
def assign_fresh_schema_ids(schema: Schema) -> Schema:
"""Traverses the schema, and sets new IDs."""
return pre_order_visit(schema, _SetFreshIDs())The _SetFreshIDs() is private to the module, so we're safe here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see, sounds good!
| """ | ||
| for requirement in new_requirements: | ||
| type_new_requirement = type(requirement) | ||
| if any(type(update) == type_new_requirement for update in self._updates): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should this be _updates or _requirements?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
auch, copy-paste. Thanks!
| return self | ||
|
|
||
| def add_column( | ||
| self, name: str, type_var: IcebergType, doc: Optional[str] = None, parent: Optional[str] = None, required: bool = False |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we support Tuple[str] for name? That way we don't need parent.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A Tuple[str, ...] would work. I like that
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would suggest Union[str, Tuple[str, ...]] so you can still pass in a string for adding a field to the root
| self._internal_add_column(parent, name, not required, type_var, doc) | ||
| return self | ||
|
|
||
| def allow_incompatible_changes(self) -> UpdateSchema: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What about adding these boolean options to the update_schema(...) method?
table.update_schema(allow_incompatible_changes=True, case_sensitive=False).add_column(...).commit()There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That's a nice touch, added
| CommitTableRequest(identifier=self._table.identifier[1:], updates=updates, requirements=requirements) | ||
| ) | ||
| self._table.metadata = table_update_response.metadata | ||
| self._table.metadata_location = table_update_response.metadata_location |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think these should be managed externally. We don't want code blocks that keep metadata and metadata_location in sync on the table all over the place. Can _table keep track of these and run the commit instead?
| parent_field = parent_type.element_field | ||
|
|
||
| if not parent_field.field_type.is_struct: | ||
| raise ValueError(f"Cannot add column to non-struct type: {parent}") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this should also include the parent field name.
| if exist_field: | ||
| raise ValueError(f"Cannot add column, name already exists: {parent}.{name}") | ||
|
|
||
| full_name = parent_field.name + "." + name |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This isn't the full name. The parent field's name is the local name, not its full name.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this has been fixed in the refactored version where you need to supply the full path. Let me add some additional tests to make sure that it works as expected.
| new_type = assign_fresh_schema_ids(type_var, self.assign_new_column_id) | ||
| field = NestedField(new_id, name, new_type, not is_optional, doc) | ||
|
|
||
| self._adds.setdefault(parent_id, []).append(field) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@Fokko, looks like this doesn't update _id_to_parent but the Java implementation does. Is that because this is only adding columns right now? I think we should probably keep everything in sync with the reference implementation so we don't have bugs later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, it is currently only appending columns. To keep things smaller, @hililiwei decided to break it up into smaller chunks, which I think is a good idea.
|
|
||
| def schema(self, schema: Schema, struct_result: IcebergType) -> IcebergType: | ||
| fields = _ApplyChanges.add_fields(schema.as_struct().fields, self.adds.get(TABLE_ROOT_ID)) | ||
| if len(fields) > 0: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is not the same as the check in Java. In Java, this checks whether there were any changes to apply. It isn't clear what an empty list is here, but I definitely prefer using None to signal that no changes were necessary rather than an empty list.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree, this has already been updated in: #8374
|
|
||
| is_value_optional = not map_type.value_required | ||
|
|
||
| if is_value_optional != value_field.required and map_type.value_type == value_type: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I find this a bit confusing since it negates value_required above and then checks != here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, this is only adding columns currently, so this is a bit redundant. It will not be useful until there is an update field. Sorry , it's not clean enough here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In #8374 I've removed the temporary variable. I that might introduce confusion.
| if value_type is None: | ||
| raise ValueError(f"Cannot delete value type from map: {value_field}") | ||
|
|
||
| is_value_optional = not map_type.value_required |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This doesn't come from the right place. In Java, this comes from updates where the value may have been made optional.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, similar to the previous comment.
| if is_element_optional == element_field.required and list_type.element_type == element_type: | ||
| return list_type | ||
|
|
||
| return ListType(list_type.element_id, element_type, is_element_optional) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@Fokko: This is is_element_required so this accidentally flips the boolean. I think we should work with is_required throughout this code to avoid needing to negate and accidentally changing whether types are optional.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, this is correct, this was already fixed in #8374 and I'll make sure to add some more checks
| if element_type is None: | ||
| raise ValueError(f"Cannot delete element type from list: {element_field}") | ||
|
|
||
| is_element_optional = not list_type.element_required |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this is unnecessary without updates. It can remain as a noop check and placeholder, I guess? In any case, the logic should be fixed below.
|
|
||
| is_element_optional = not list_type.element_required | ||
|
|
||
| if is_element_optional == element_field.required and list_type.element_type == element_type: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This looks wrong: is_optional == is_required.
| new_fields.extend(fields) | ||
| if adds: | ||
| new_fields.extend(adds) | ||
| return new_fields |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What about return tuple(*fields, *adds)?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Like it! Updated it to:
@staticmethod
def add_fields(fields: Tuple[NestedField, ...], adds: List[NestedField]) -> Optional[List[NestedField]]:
return None if len(adds) == 0 else tuple(*fields, *adds)| return primitive | ||
|
|
||
| @staticmethod | ||
| def add_fields(fields: Tuple[NestedField, ...], adds: Optional[List[NestedField]]) -> List[NestedField]: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If adds is None, then I think this should return None.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
See comment above :)
| new_fields = self.adds[field_id] | ||
| if len(new_fields) > 0: | ||
| fields = _ApplyChanges.add_fields(field_result.fields, new_fields) | ||
| if len(fields) > 0: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this should be simpler:
new_fields = _ApplyChanges.add_fields(field_result.fields, self.adds.get(field_id))
if new_fields is None:
return field_result
return StructType(*new_fields)There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| new_fields: List[NestedField] = [] | ||
| for i in range(len(field_results)): | ||
| type_: Optional[IcebergType] = field_results[i] | ||
| if type_ is None: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is implementing drop column. I'm not sure it makes sense to have part of the code, but not implement it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, this also confused me a bit
| continue | ||
|
|
||
| field: NestedField = struct.fields[i] | ||
| new_fields.append(field) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Shouldn't this use the type from field_results?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes :)
| has_change = True | ||
| continue | ||
|
|
||
| field: NestedField = struct.fields[i] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@Fokko, we also need to be careful in places like this. This lookup is incorrect because it is matching the field results with the original fields by position. That must be done by field ID.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
hi @rdblue, I don't understand something. The field_results are obtained from the struct. And when we get None, the loop will continue. It seems that getting from the position does not cause confusion?
|
|
||
| def field(self, field: NestedField, field_result: IcebergType) -> IcebergType: | ||
| field_id: int = field.field_id | ||
| if field_id in self.adds: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should this also check that the field is a struct?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That's a good check, but I think it went okay since _internal_add_column only create mappings where the parent is a struct.

Under the umbrella of #7875
What is the purpose of the change
Added the ability to add columns to PyIceberg.