Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 15 additions & 9 deletions python/pyiceberg/table/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,8 @@ class UpgradeFormatVersionUpdate(TableUpdate):
class AddSchemaUpdate(TableUpdate):
action: TableUpdateAction = TableUpdateAction.add_schema
schema_: Schema = Field(alias="schema")
# This field is required: https://github.com/apache/iceberg/pull/7445
last_column_id: int = Field(alias="last-column-id")


class SetCurrentSchemaUpdate(TableUpdate):
Expand Down Expand Up @@ -331,13 +333,13 @@ class TableRequirement(IcebergBaseModel):
class AssertCreate(TableRequirement):
"""The table must not already exist; used for create transactions."""

type: Literal["assert-create"]
type: Literal["assert-create"] = Field(default="assert-create")


class AssertTableUUID(TableRequirement):
"""The table UUID must match the requirement's `uuid`."""

type: Literal["assert-table-uuid"]
type: Literal["assert-table-uuid"] = Field(default="assert-table-uuid")
uuid: str


Expand All @@ -347,43 +349,43 @@ class AssertRefSnapshotId(TableRequirement):
if `snapshot-id` is `null` or missing, the ref must not already exist.
"""

type: Literal["assert-ref-snapshot-id"]
type: Literal["assert-ref-snapshot-id"] = Field(default="assert-ref-snapshot-id")
ref: str
snapshot_id: int = Field(..., alias="snapshot-id")


class AssertLastAssignedFieldId(TableRequirement):
"""The table's last assigned column id must match the requirement's `last-assigned-field-id`."""

type: Literal["assert-last-assigned-field-id"]
type: Literal["assert-last-assigned-field-id"] = Field(default="assert-last-assigned-field-id")
last_assigned_field_id: int = Field(..., alias="last-assigned-field-id")


class AssertCurrentSchemaId(TableRequirement):
"""The table's current schema id must match the requirement's `current-schema-id`."""

type: Literal["assert-current-schema-id"]
type: Literal["assert-current-schema-id"] = Field(default="assert-current-schema-id")
current_schema_id: int = Field(..., alias="current-schema-id")


class AssertLastAssignedPartitionId(TableRequirement):
"""The table's last assigned partition id must match the requirement's `last-assigned-partition-id`."""

type: Literal["assert-last-assigned-partition-id"]
type: Literal["assert-last-assigned-partition-id"] = Field(default="assert-last-assigned-partition-id")
last_assigned_partition_id: int = Field(..., alias="last-assigned-partition-id")


class AssertDefaultSpecId(TableRequirement):
"""The table's default spec id must match the requirement's `default-spec-id`."""

type: Literal["assert-default-spec-id"]
type: Literal["assert-default-spec-id"] = Field(default="assert-default-spec-id")
default_spec_id: int = Field(..., alias="default-spec-id")


class AssertDefaultSortOrderId(TableRequirement):
"""The table's default sort order id must match the requirement's `default-sort-order-id`."""

type: Literal["assert-default-sort-order-id"]
type: Literal["assert-default-sort-order-id"] = Field(default="assert-default-sort-order-id")
default_sort_order_id: int = Field(..., alias="default-sort-order-id")


Expand Down Expand Up @@ -943,10 +945,14 @@ def commit(self) -> Table:
if self._table is None:
raise ValueError("Cannot commit schema update, table is not set")
# Strip the catalog name
new_schema = self._apply()
table_update_response = self._table.catalog._commit_table( # pylint: disable=W0212
CommitTableRequest(
identifier=self._table.identifier[1:],
updates=[AddSchemaUpdate(schema=self._apply())],
updates=[
AddSchemaUpdate(schema=new_schema, last_column_id=new_schema.highest_field_id),
SetCurrentSchemaUpdate(schema_id=-1),
],
)
)

Expand Down
40 changes: 40 additions & 0 deletions python/tests/test_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,12 @@
from pyiceberg.table import Table
from pyiceberg.types import (
BooleanType,
FixedType,
IntegerType,
NestedField,
StringType,
TimestampType,
UUIDType,
)


Expand Down Expand Up @@ -352,3 +354,41 @@ def test_unpartitioned_fixed_table(catalog: Catalog) -> None:
b"12345678901234567ass12345",
b"qweeqwwqq1231231231231111",
]


@pytest.mark.integration
def test_schema_evolution(catalog: Catalog) -> None:
try:
catalog.drop_table("default.test_schema_evolution")
except NoSuchTableError:
pass

schema = Schema(
NestedField(field_id=1, name="col_uuid", field_type=UUIDType(), required=False),
NestedField(field_id=2, name="col_fixed", field_type=FixedType(25), required=False),
)

t = catalog.create_table(identifier="default.test_schema_evolution", schema=schema)

assert t.schema() == schema

with t.update_schema() as tx:
tx.add_column("col_string", StringType())

assert t.schema() == Schema(
NestedField(field_id=1, name="col_uuid", field_type=UUIDType(), required=False),
NestedField(field_id=2, name="col_fixed", field_type=FixedType(25), required=False),
NestedField(field_id=3, name="col_string", field_type=StringType(), required=False),
schema_id=1,
)

with t.transaction() as tx:
tx.update_schema().add_column("col_int", IntegerType()).commit()

assert t.schema() == Schema(
NestedField(field_id=1, name="col_uuid", field_type=UUIDType(), required=False),
NestedField(field_id=2, name="col_fixed", field_type=FixedType(25), required=False),
NestedField(field_id=3, name="col_string", field_type=StringType(), required=False),
NestedField(field_id=4, name="col_int", field_type=IntegerType(), required=False),
schema_id=2,
)