-
Notifications
You must be signed in to change notification settings - Fork 3k
Python: Support for adding columns #8174
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -15,6 +15,8 @@ | |
| # specific language governing permissions and limitations | ||
| # under the License. | ||
| # pylint: disable=W0511 | ||
| from __future__ import annotations | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @Fokko, what is our policy on using this vs using strings? How do we choose?
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I can add that to the |
||
|
|
||
| import itertools | ||
| from abc import ABC, abstractmethod | ||
| from dataclasses import dataclass | ||
|
|
@@ -145,7 +147,7 @@ def _lazy_id_to_name(self) -> Dict[int, str]: | |
| return index_name_by_id(self) | ||
|
|
||
| @cached_property | ||
| def _lazy_id_to_accessor(self) -> Dict[int, "Accessor"]: | ||
| def _lazy_id_to_accessor(self) -> Dict[int, Accessor]: | ||
| """Returns an index of field ID to accessor. | ||
|
|
||
| This is calculated once when called for the first time. Subsequent calls to this method will use a cached index. | ||
|
|
@@ -201,7 +203,7 @@ def find_type(self, name_or_id: Union[str, int], case_sensitive: bool = True) -> | |
|
|
||
| @property | ||
| def highest_field_id(self) -> int: | ||
| return visit(self.as_struct(), _FindLastFieldId()) | ||
| return max(self._lazy_id_to_name.keys(), default=0) | ||
Fokko marked this conversation as resolved.
Show resolved
Hide resolved
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This seems unrelated, though fine.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should have been a separate PR indeed, but it is a very welcome change
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The old way didn't get the highest field id correctly, so I changed it. |
||
|
|
||
| def find_column_name(self, column_id: int) -> Optional[str]: | ||
| """Find a column name given a column ID. | ||
|
|
@@ -226,7 +228,7 @@ def column_names(self) -> List[str]: | |
| """ | ||
| return list(self._lazy_id_to_name.values()) | ||
|
|
||
| def accessor_for_field(self, field_id: int) -> "Accessor": | ||
| def accessor_for_field(self, field_id: int) -> Accessor: | ||
| """Find a schema position accessor given a field ID. | ||
|
|
||
| Args: | ||
|
|
@@ -243,7 +245,7 @@ def accessor_for_field(self, field_id: int) -> "Accessor": | |
|
|
||
| return self._lazy_id_to_accessor[field_id] | ||
|
|
||
| def select(self, *names: str, case_sensitive: bool = True) -> "Schema": | ||
| def select(self, *names: str, case_sensitive: bool = True) -> Schema: | ||
| """Return a new schema instance pruned to a subset of columns. | ||
|
|
||
| Args: | ||
|
|
@@ -682,7 +684,7 @@ class Accessor: | |
| """An accessor for a specific position in a container that implements the StructProtocol.""" | ||
|
|
||
| position: int | ||
| inner: Optional["Accessor"] = None | ||
| inner: Optional[Accessor] = None | ||
|
|
||
| def __str__(self) -> str: | ||
| """Returns the string representation of the Accessor class.""" | ||
|
|
@@ -766,7 +768,7 @@ def _(obj: MapType, visitor: SchemaVisitor[T]) -> T: | |
|
|
||
| visitor.before_map_value(obj.value_field) | ||
| value_result = visit(obj.value_type, visitor) | ||
| visitor.after_list_element(obj.value_field) | ||
| visitor.after_map_value(obj.value_field) | ||
Fokko marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
|
||
| return visitor.map(obj, key_result, value_result) | ||
|
|
||
|
|
@@ -890,6 +892,22 @@ def __init__(self) -> None: | |
| self._field_names: List[str] = [] | ||
| self._short_field_names: List[str] = [] | ||
|
|
||
| def before_map_key(self, key: NestedField) -> None: | ||
| self.before_field(key) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Isn't this the default implementation?
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, looks like it. Let me clean that up |
||
|
|
||
| def after_map_key(self, key: NestedField) -> None: | ||
| self.after_field(key) | ||
|
|
||
| def before_map_value(self, value: NestedField) -> None: | ||
| if not isinstance(value.field_type, StructType): | ||
| self._short_field_names.append(value.name) | ||
| self._field_names.append(value.name) | ||
|
|
||
| def after_map_value(self, value: NestedField) -> None: | ||
| if not isinstance(value.field_type, StructType): | ||
| self._short_field_names.pop() | ||
| self._field_names.pop() | ||
|
|
||
| def before_list_element(self, element: NestedField) -> None: | ||
| """Short field names omit element when the element is a StructType.""" | ||
| if not isinstance(element.field_type, StructType): | ||
|
|
@@ -1082,45 +1100,23 @@ def build_position_accessors(schema_or_type: Union[Schema, IcebergType]) -> Dict | |
| return visit(schema_or_type, _BuildPositionAccessors()) | ||
|
|
||
|
|
||
| class _FindLastFieldId(SchemaVisitor[int]): | ||
| """Traverses the schema to get the highest field-id.""" | ||
|
|
||
| def schema(self, schema: Schema, struct_result: int) -> int: | ||
| return struct_result | ||
|
|
||
| def struct(self, struct: StructType, field_results: List[int]) -> int: | ||
| return max(field_results) | ||
|
|
||
| def field(self, field: NestedField, field_result: int) -> int: | ||
| return max(field.field_id, field_result) | ||
|
|
||
| def list(self, list_type: ListType, element_result: int) -> int: | ||
| return element_result | ||
|
|
||
| def map(self, map_type: MapType, key_result: int, value_result: int) -> int: | ||
| return max(key_result, value_result) | ||
|
|
||
| def primitive(self, primitive: PrimitiveType) -> int: | ||
| return 0 | ||
|
|
||
|
|
||
| def assign_fresh_schema_ids(schema: Schema) -> Schema: | ||
| def assign_fresh_schema_ids(schema_or_type: Union[Schema, IcebergType], next_id: Optional[Callable[[], int]] = None) -> Schema: | ||
| """Traverses the schema, and sets new IDs.""" | ||
| return pre_order_visit(schema, _SetFreshIDs()) | ||
| return pre_order_visit(schema_or_type, _SetFreshIDs(next_id_func=next_id)) | ||
|
|
||
|
|
||
| class _SetFreshIDs(PreOrderSchemaVisitor[IcebergType]): | ||
| """Traverses the schema and assigns monotonically increasing ids.""" | ||
|
|
||
| counter: itertools.count # type: ignore | ||
| reserved_ids: Dict[int, int] | ||
|
|
||
| def __init__(self, start: int = 1) -> None: | ||
| self.counter = itertools.count(start) | ||
| def __init__(self, next_id_func: Optional[Callable[[], int]] = None) -> None: | ||
| self.reserved_ids = {} | ||
| counter = itertools.count(1) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why does this remove
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The def assign_fresh_schema_ids(schema: Schema) -> Schema:
"""Traverses the schema, and sets new IDs."""
return pre_order_visit(schema, _SetFreshIDs())The
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I see, sounds good! |
||
| self.next_id_func = next_id_func if next_id_func is not None else lambda: next(counter) | ||
|
|
||
| def _get_and_increment(self) -> int: | ||
| return next(self.counter) | ||
| return self.next_id_func() | ||
|
|
||
| def schema(self, schema: Schema, struct_result: Callable[[], StructType]) -> Schema: | ||
| # First we keep the original identifier_field_ids here, we remap afterwards | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@Fokko, I don't think that we should encourage people to use the transaction API for single operations like this. It's much simpler to avoid using the transaction right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Alright, we can swap them around.