diff --git a/python/pyiceberg/table/__init__.py b/python/pyiceberg/table/__init__.py index 377b314e4423..b1e7a73dfa0b 100644 --- a/python/pyiceberg/table/__init__.py +++ b/python/pyiceberg/table/__init__.py @@ -251,6 +251,8 @@ class UpgradeFormatVersionUpdate(TableUpdate): class AddSchemaUpdate(TableUpdate): action: TableUpdateAction = TableUpdateAction.add_schema schema_: Schema = Field(alias="schema") + # This field is required: https://github.com/apache/iceberg/pull/7445 + last_column_id: int = Field(alias="last-column-id") class SetCurrentSchemaUpdate(TableUpdate): @@ -331,13 +333,13 @@ class TableRequirement(IcebergBaseModel): class AssertCreate(TableRequirement): """The table must not already exist; used for create transactions.""" - type: Literal["assert-create"] + type: Literal["assert-create"] = Field(default="assert-create") class AssertTableUUID(TableRequirement): """The table UUID must match the requirement's `uuid`.""" - type: Literal["assert-table-uuid"] + type: Literal["assert-table-uuid"] = Field(default="assert-table-uuid") uuid: str @@ -347,7 +349,7 @@ class AssertRefSnapshotId(TableRequirement): if `snapshot-id` is `null` or missing, the ref must not already exist. """ - type: Literal["assert-ref-snapshot-id"] + type: Literal["assert-ref-snapshot-id"] = Field(default="assert-ref-snapshot-id") ref: str snapshot_id: int = Field(..., alias="snapshot-id") @@ -355,35 +357,35 @@ class AssertRefSnapshotId(TableRequirement): class AssertLastAssignedFieldId(TableRequirement): """The table's last assigned column id must match the requirement's `last-assigned-field-id`.""" - type: Literal["assert-last-assigned-field-id"] + type: Literal["assert-last-assigned-field-id"] = Field(default="assert-last-assigned-field-id") last_assigned_field_id: int = Field(..., alias="last-assigned-field-id") class AssertCurrentSchemaId(TableRequirement): """The table's current schema id must match the requirement's `current-schema-id`.""" - type: Literal["assert-current-schema-id"] + type: Literal["assert-current-schema-id"] = Field(default="assert-current-schema-id") current_schema_id: int = Field(..., alias="current-schema-id") class AssertLastAssignedPartitionId(TableRequirement): """The table's last assigned partition id must match the requirement's `last-assigned-partition-id`.""" - type: Literal["assert-last-assigned-partition-id"] + type: Literal["assert-last-assigned-partition-id"] = Field(default="assert-last-assigned-partition-id") last_assigned_partition_id: int = Field(..., alias="last-assigned-partition-id") class AssertDefaultSpecId(TableRequirement): """The table's default spec id must match the requirement's `default-spec-id`.""" - type: Literal["assert-default-spec-id"] + type: Literal["assert-default-spec-id"] = Field(default="assert-default-spec-id") default_spec_id: int = Field(..., alias="default-spec-id") class AssertDefaultSortOrderId(TableRequirement): """The table's default sort order id must match the requirement's `default-sort-order-id`.""" - type: Literal["assert-default-sort-order-id"] + type: Literal["assert-default-sort-order-id"] = Field(default="assert-default-sort-order-id") default_sort_order_id: int = Field(..., alias="default-sort-order-id") @@ -943,10 +945,14 @@ def commit(self) -> Table: if self._table is None: raise ValueError("Cannot commit schema update, table is not set") # Strip the catalog name + new_schema = self._apply() table_update_response = self._table.catalog._commit_table( # pylint: disable=W0212 CommitTableRequest( identifier=self._table.identifier[1:], - updates=[AddSchemaUpdate(schema=self._apply())], + updates=[ + AddSchemaUpdate(schema=new_schema, last_column_id=new_schema.highest_field_id), + SetCurrentSchemaUpdate(schema_id=-1), + ], ) ) diff --git a/python/tests/test_integration.py b/python/tests/test_integration.py index a63436bdaead..7966e2a32973 100644 --- a/python/tests/test_integration.py +++ b/python/tests/test_integration.py @@ -40,10 +40,12 @@ from pyiceberg.table import Table from pyiceberg.types import ( BooleanType, + FixedType, IntegerType, NestedField, StringType, TimestampType, + UUIDType, ) @@ -352,3 +354,41 @@ def test_unpartitioned_fixed_table(catalog: Catalog) -> None: b"12345678901234567ass12345", b"qweeqwwqq1231231231231111", ] + + +@pytest.mark.integration +def test_schema_evolution(catalog: Catalog) -> None: + try: + catalog.drop_table("default.test_schema_evolution") + except NoSuchTableError: + pass + + schema = Schema( + NestedField(field_id=1, name="col_uuid", field_type=UUIDType(), required=False), + NestedField(field_id=2, name="col_fixed", field_type=FixedType(25), required=False), + ) + + t = catalog.create_table(identifier="default.test_schema_evolution", schema=schema) + + assert t.schema() == schema + + with t.update_schema() as tx: + tx.add_column("col_string", StringType()) + + assert t.schema() == Schema( + NestedField(field_id=1, name="col_uuid", field_type=UUIDType(), required=False), + NestedField(field_id=2, name="col_fixed", field_type=FixedType(25), required=False), + NestedField(field_id=3, name="col_string", field_type=StringType(), required=False), + schema_id=1, + ) + + with t.transaction() as tx: + tx.update_schema().add_column("col_int", IntegerType()).commit() + + assert t.schema() == Schema( + NestedField(field_id=1, name="col_uuid", field_type=UUIDType(), required=False), + NestedField(field_id=2, name="col_fixed", field_type=FixedType(25), required=False), + NestedField(field_id=3, name="col_string", field_type=StringType(), required=False), + NestedField(field_id=4, name="col_int", field_type=IntegerType(), required=False), + schema_id=2, + )