Merged
Commits (30)
- 68c9a03 Alter table (Fokko, Apr 26, 2023)
- 4e575ba Merge branch 'master' of github.com:apache/iceberg into fd-alter-table (Fokko, May 1, 2023)
- e7a4251 Merge branch 'master' of github.com:apache/iceberg into fd-alter-table (Fokko, May 11, 2023)
- aa767e3 Make the CI happy (Fokko, May 11, 2023)
- ebc8fda Merge branch 'master' of github.com:apache/iceberg into fd-alter-table (Fokko, May 15, 2023)
- 1adff76 Comments (Fokko, May 25, 2023)
- d958bd6 Merge branch 'master' of github.com:apache/iceberg into fd-alter-table (Fokko, May 25, 2023)
- 5e2ed48 Thanks Ryan! (Fokko, May 25, 2023)
- 1c107c9 Python: Bump dependencies to the latest version (Fokko, Jun 1, 2023)
- 800583c Remove from docs (Fokko, Jun 4, 2023)
- efa4aaa WIP (Fokko, Jun 4, 2023)
- f690c7f Comments (Fokko, Jun 4, 2023)
- 8be23df Make CI happy (Fokko, Jun 4, 2023)
- b952513 Update docstrings (Fokko, Jun 4, 2023)
- a0c12fb Do some renaming (Fokko, Jun 5, 2023)
- b93e5c6 Add a context manager (Fokko, Jun 6, 2023)
- 09cf76d Rename commit to commit_transaction() (Fokko, Jun 6, 2023)
- 593b2e6 Update docs (Fokko, Jun 6, 2023)
- cc6d75f Refresh in place (Fokko, Jun 6, 2023)
- 822d440 Remove redudant call (Fokko, Jun 6, 2023)
- 6ec83b2 Merge branch 'master' of github.com:apache/iceberg into fd-alter-table (Fokko, Jun 10, 2023)
- fe67e57 Load a fresh copy instead (Fokko, Jun 12, 2023)
- 4ca07e8 Fix the docstrings (Fokko, Jun 12, 2023)
- bd36077 Merge branch 'master' of github.com:apache/iceberg into fd-alter-table (Fokko, Jun 12, 2023)
- 828a3fd Restore CommitTableResponse (Fokko, Jun 13, 2023)
- ef14375 Merge branch 'master' of github.com:Fokko/incubator-iceberg into fd-a… (Fokko, Jun 13, 2023)
- 7272c86 Merge branch 'master' of github.com:apache/iceberg into fd-alter-table (Fokko, Jun 13, 2023)
- 1ceaa96 Merge branch 'master' of github.com:apache/iceberg into fd-alter-table (Fokko, Jun 14, 2023)
- 0332c73 Conflicts (Fokko, Jun 14, 2023)
- 88b9564 Merge branch 'master' of github.com:apache/iceberg into fd-alter-table (Fokko, Jun 21, 2023)
168 changes: 29 additions & 139 deletions python/mkdocs/docs/api.md
@@ -78,110 +78,15 @@ catalog.load_table(("nyc", "taxis"))
# The tuple syntax can be used if the namespace or table contains a dot.
```

This returns a `Table` that represents an Iceberg table:

```python
Table(
identifier=('nyc', 'taxis'),
metadata_location='s3a://warehouse/wh/nyc.db/taxis/metadata/00002-6ea51ce3-62aa-4197-9cf8-43d07c3440ca.metadata.json',
metadata=TableMetadataV2(
location='s3a://warehouse/wh/nyc.db/taxis',
table_uuid=UUID('ebd5d172-2162-453d-b586-1cdce52c1116'),
last_updated_ms=1662633437826,
last_column_id=19,
schemas=[Schema(
NestedField(field_id=1, name='VendorID', field_type=LongType(), required=False),
NestedField(field_id=2, name='tpep_pickup_datetime', field_type=TimestamptzType(), required=False),
NestedField(field_id=3, name='tpep_dropoff_datetime', field_type=TimestamptzType(), required=False),
NestedField(field_id=4, name='passenger_count', field_type=DoubleType(), required=False),
NestedField(field_id=5, name='trip_distance', field_type=DoubleType(), required=False),
NestedField(field_id=6, name='RatecodeID', field_type=DoubleType(), required=False),
NestedField(field_id=7, name='store_and_fwd_flag', field_type=StringType(), required=False),
NestedField(field_id=8, name='PULocationID', field_type=LongType(), required=False),
NestedField(field_id=9, name='DOLocationID', field_type=LongType(), required=False),
NestedField(field_id=10, name='payment_type', field_type=LongType(), required=False),
NestedField(field_id=11, name='fare_amount', field_type=DoubleType(), required=False),
NestedField(field_id=12, name='extra', field_type=DoubleType(), required=False),
NestedField(field_id=13, name='mta_tax', field_type=DoubleType(), required=False),
NestedField(field_id=14, name='tip_amount', field_type=DoubleType(), required=False),
NestedField(field_id=15, name='tolls_amount', field_type=DoubleType(), required=False),
NestedField(field_id=16, name='improvement_surcharge', field_type=DoubleType(), required=False),
NestedField(field_id=17, name='total_amount', field_type=DoubleType(), required=False),
NestedField(field_id=18, name='congestion_surcharge', field_type=DoubleType(), required=False),
NestedField(field_id=19, name='airport_fee', field_type=DoubleType(), required=False)
),
schema_id=0,
identifier_field_ids=[]
)],
current_schema_id=0,
partition_specs=[PartitionSpec(spec_id=0)],
default_spec_id=0,
last_partition_id=999,
properties={
'owner': 'root',
'write.format.default': 'parquet'
},
current_snapshot_id=8334458494559715805,
snapshots=[
Snapshot(
snapshot_id=7910949481055846233,
parent_snapshot_id=None,
sequence_number=None,
timestamp_ms=1662489306555,
manifest_list='s3a://warehouse/wh/nyc.db/taxis/metadata/snap-7910949481055846233-1-3eb7a2e1-5b7a-4e76-a29a-3e29c176eea4.avro',
summary=Summary(
Operation.APPEND,
**{
'spark.app.id': 'local-1662489289173',
'added-data-files': '1',
'added-records': '2979431',
'added-files-size': '46600777',
'changed-partition-count': '1',
'total-records': '2979431',
'total-files-size': '46600777',
'total-data-files': '1',
'total-delete-files': '0',
'total-position-deletes': '0',
'total-equality-deletes': '0'
}
),
schema_id=0
),
],
snapshot_log=[
SnapshotLogEntry(
snapshot_id='7910949481055846233',
timestamp_ms=1662489306555
)
],
metadata_log=[
MetadataLogEntry(
metadata_file='s3a://warehouse/wh/nyc.db/taxis/metadata/00000-b58341ba-6a63-4eea-9b2f-e85e47c7d09f.metadata.json',
timestamp_ms=1662489306555
)
],
sort_orders=[SortOrder(order_id=0)],
default_sort_order_id=0,
refs={
'main': SnapshotRef(
snapshot_id=8334458494559715805,
snapshot_ref_type=SnapshotRefType.BRANCH,
min_snapshots_to_keep=None,
max_snapshot_age_ms=None,
max_ref_age_ms=None
)
},
format_version=2,
last_sequence_number=1
)
)
```
This returns a `Table` that represents an Iceberg table that can be queried and altered.

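For example, you can inspect the loaded table directly. This is a minimal sketch; it assumes the result of `load_table` above is bound to a variable named `table`:

```python
# Minimal sketch: inspect the Table returned by catalog.load_table
# (assumes it was assigned to `table`).
print(table.schema())           # the current Iceberg schema
print(table.spec())             # the current partition spec
print(table.properties)         # e.g. {'owner': 'root', 'write.format.default': 'parquet'}
print(table.metadata_location)  # location of the current metadata JSON file
```
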
### Directly from a metadata file

To load a table directly from a metadata file (i.e., **without** using a catalog), you can use a `StaticTable` as follows:

```python
from pyiceberg.table import StaticTable

table = StaticTable.from_metadata(
"s3a://warehouse/wh/nyc.db/taxis/metadata/00002-6ea51ce3-62aa-4197-9cf8-43d07c3440ca.metadata.json"
)
@@ -241,52 +146,37 @@ catalog.create_table(
)
```

Which returns a newly created table:
### Update table properties

Set and remove properties through the `Transaction` API:

```python
Table(
identifier=('default', 'bids'),
metadata_location='/Users/fokkodriesprong/Desktop/docker-spark-iceberg/wh/bids//metadata/00000-c8cd93ab-f784-474d-a167-b1a86b05195f.metadata.json',
metadata=TableMetadataV2(
location='/Users/fokkodriesprong/Desktop/docker-spark-iceberg/wh/bids/',
table_uuid=UUID('38d4cb39-4945-4bf2-b374-984b5c4984d2'),
last_updated_ms=1661847562069,
last_column_id=4,
schemas=[
Schema(
NestedField(field_id=1, name='datetime', field_type=TimestampType(), required=False),
NestedField(field_id=2, name='bid', field_type=DoubleType(), required=False),
NestedField(field_id=3, name='ask', field_type=DoubleType(), required=False),
NestedField(field_id=4, name='symbol', field_type=StringType(), required=False)),
schema_id=1,
identifier_field_ids=[])
],
current_schema_id=1,
partition_specs=[
PartitionSpec(
PartitionField(source_id=1, field_id=1000, transform=DayTransform(), name='datetime_day'),))
],
default_spec_id=0,
last_partition_id=1000,
properties={},
current_snapshot_id=None,
snapshots=[],
snapshot_log=[],
metadata_log=[],
sort_orders=[
SortOrder(order_id=1, fields=[SortField(source_id=4, transform=IdentityTransform(), direction=SortDirection.ASC, null_order=NullOrder.NULLS_FIRST)])
],
default_sort_order_id=1,
refs={},
format_version=2,
last_sequence_number=0
)
)
with table.transaction() as transaction:
transaction.set_properties(abc="def")

assert table.properties == {"abc": "def"}

with table.transaction() as transaction:
transaction.remove_properties("abc")

assert table.properties == {}
```

Or, without a context manager:

```python
table = table.transaction().set_properties(abc="def").commit_transaction()

assert table.properties == {"abc": "def"}

table = table.transaction().remove_properties("abc").commit_transaction()

assert table.properties == {}
```

## Query a table
## Query the data

To query a table, a table scan is needed. A table scan accepts a filter, columns and optionally a limit and a snapshot ID:
To query a table, a table scan is needed. A table scan accepts a filter, columns, and optionally a limit and a snapshot ID:

```python
from pyiceberg.catalog import load_catalog
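# The rest of this example is folded in the diff hunk above. What follows is a
# minimal, hypothetical continuation: the catalog name, table identifier, and
# filter values are assumptions for illustration, based on the nyc.taxis schema
# shown earlier.
from pyiceberg.expressions import GreaterThanOrEqual

catalog = load_catalog("default")
table = catalog.load_table("nyc.taxis")

scan = table.scan(
    row_filter=GreaterThanOrEqual("trip_distance", 10.0),
    selected_fields=("VendorID", "tpep_pickup_datetime", "trip_distance"),
    limit=100,
)
print(scan.to_arrow())  # materialize the matching rows as a PyArrow table
```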
57 changes: 26 additions & 31 deletions python/poetry.lock


23 changes: 21 additions & 2 deletions python/pyiceberg/catalog/__init__.py
@@ -40,7 +40,12 @@
from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec
from pyiceberg.schema import Schema
from pyiceberg.serializers import ToOutputFile
from pyiceberg.table import Table, TableMetadata
from pyiceberg.table import (
CommitTableRequest,
CommitTableResponse,
Table,
TableMetadata,
)
from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder
from pyiceberg.typedef import (
EMPTY_DICT,
@@ -322,6 +327,20 @@ def rename_table(self, from_identifier: Union[str, Identifier], to_identifier: U
NoSuchTableError: If a table with the name does not exist.
"""

@abstractmethod
def _commit_table(self, table_request: CommitTableRequest) -> CommitTableResponse:
"""Updates one or more tables.

Args:
table_request (CommitTableRequest): The table request to be carried out.

Returns:
CommitTableResponse: The updated metadata.

Raises:
NoSuchTableError: If a table with the given identifier does not exist.
"""

@abstractmethod
def create_namespace(self, namespace: Union[str, Identifier], properties: Properties = EMPTY_DICT) -> None:
"""Create a namespace in the catalog.
@@ -392,7 +411,7 @@ def load_namespace_properties(self, namespace: Union[str, Identifier]) -> Proper

@abstractmethod
def update_namespace_properties(
self, namespace: Union[str, Identifier], removals: set[str] | None = None, updates: Properties = EMPTY_DICT
self, namespace: Union[str, Identifier], removals: Optional[Set[str]] = None, updates: Properties = EMPTY_DICT
) -> PropertiesUpdateSummary:
"""Removes provided property keys and updates properties for a namespace.

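For context, a minimal, hypothetical sketch of how this new hook is exercised: committing a `Transaction` (as shown in the updated `api.md` above) is expected to build a `CommitTableRequest`, hand it to the table's catalog via `_commit_table`, and apply the metadata from the returned `CommitTableResponse`. The catalog name and table identifier below are illustrative assumptions.

```python
from pyiceberg.catalog import load_catalog

catalog = load_catalog("default")        # assumed catalog name
table = catalog.load_table("nyc.taxis")  # assumed table identifier

# Staging a property change and committing the transaction presumably routes
# through Catalog._commit_table(CommitTableRequest) and rebuilds the table
# from the CommitTableResponse metadata.
table = table.transaction().set_properties(abc="def").commit_transaction()

assert table.properties["abc"] == "def"
```
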
17 changes: 16 additions & 1 deletion python/pyiceberg/catalog/dynamodb.py
@@ -52,7 +52,7 @@
from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec
from pyiceberg.schema import Schema
from pyiceberg.serializers import FromInputFile
from pyiceberg.table import Table
from pyiceberg.table import CommitTableRequest, CommitTableResponse, Table
from pyiceberg.table.metadata import new_table_metadata
from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder
from pyiceberg.typedef import EMPTY_DICT
@@ -168,6 +168,20 @@ def create_table(

return self.load_table(identifier=identifier)

def _commit_table(self, table_request: CommitTableRequest) -> CommitTableResponse:
"""Updates the table.

Args:
table_request (CommitTableRequest): The table request to be carried out.

Returns:
CommitTableResponse: The updated metadata.

Raises:
NoSuchTableError: If a table with the given identifier does not exist.
"""
raise NotImplementedError

def load_table(self, identifier: Union[str, Identifier]) -> Table:
"""
Loads the table's metadata and returns the table instance.
@@ -577,6 +591,7 @@ def _convert_dynamo_table_item_to_iceberg_table(self, dynamo_table_item: Dict[st
metadata=metadata,
metadata_location=metadata_location,
io=self._load_file_io(metadata.properties, metadata_location),
catalog=self,
)

