# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations
from abc import ABC, abstractmethod
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass
from enum import Enum
from functools import cached_property
from itertools import chain
from typing import (
TYPE_CHECKING,
Any,
Callable,
Dict,
Iterable,
List,
Literal,
Optional,
Set,
Tuple,
TypeVar,
Union,
)
from pydantic import Field
from sortedcontainers import SortedList
from pyiceberg.expressions import (
AlwaysTrue,
And,
BooleanExpression,
EqualTo,
parser,
visitors,
)
from pyiceberg.expressions.visitors import _InclusiveMetricsEvaluator, inclusive_projection
from pyiceberg.io import FileIO, load_file_io
from pyiceberg.manifest import (
POSITIONAL_DELETE_SCHEMA,
DataFile,
DataFileContent,
ManifestContent,
ManifestEntry,
ManifestFile,
)
from pyiceberg.partitioning import PartitionSpec
from pyiceberg.schema import Schema
from pyiceberg.table.metadata import INITIAL_SEQUENCE_NUMBER, TableMetadata
from pyiceberg.table.snapshots import Snapshot, SnapshotLogEntry
from pyiceberg.table.sorting import SortOrder
from pyiceberg.typedef import (
EMPTY_DICT,
IcebergBaseModel,
Identifier,
KeyDefaultDict,
Properties,
)
if TYPE_CHECKING:
import pandas as pd
import pyarrow as pa
import ray
from duckdb import DuckDBPyConnection
from pyiceberg.catalog import Catalog
ALWAYS_TRUE = AlwaysTrue()
class Transaction:
_table: Table
_updates: Tuple[TableUpdate, ...]
_requirements: Tuple[TableRequirement, ...]
def __init__(
self,
table: Table,
actions: Optional[Tuple[TableUpdate, ...]] = None,
requirements: Optional[Tuple[TableRequirement, ...]] = None,
):
self._table = table
self._updates = actions or ()
self._requirements = requirements or ()
def __enter__(self) -> Transaction:
"""Starts a transaction to update the table."""
return self
def __exit__(self, _: Any, value: Any, traceback: Any) -> None:
"""Closes and commits the transaction."""
fresh_table = self.commit_transaction()
# Update the new data in place
self._table.metadata = fresh_table.metadata
self._table.metadata_location = fresh_table.metadata_location
def _append_updates(self, *new_updates: TableUpdate) -> Transaction:
"""Appends updates to the set of staged updates.
Args:
*new_updates: Any new updates.
Raises:
ValueError: When the type of update is not unique.
Returns:
A new AlterTable object with the new updates appended.
"""
for new_update in new_updates:
type_new_update = type(new_update)
if any(type(update) == type_new_update for update in self._updates):
raise ValueError(f"Updates in a single commit need to be unique, duplicate: {type_new_update}")
self._updates = self._updates + new_updates
return self
def set_table_version(self, format_version: Literal[1, 2]) -> Transaction:
"""Sets the table to a certain version.
Args:
format_version: The newly set version.
Returns:
The alter table builder.
"""
raise NotImplementedError("Not yet implemented")
def set_properties(self, **updates: str) -> Transaction:
"""Set properties.
When a property is already set, it will be overwritten.
Args:
updates: The properties set on the table.
Returns:
The alter table builder.
"""
return self._append_updates(SetPropertiesUpdate(updates=updates))
def remove_properties(self, *removals: str) -> Transaction:
"""Removes properties.
Args:
removals: Properties to be removed.
Returns:
The alter table builder.
"""
return self._append_updates(RemovePropertiesUpdate(removals=removals))
def update_location(self, location: str) -> Transaction:
"""Sets the new table location.
Args:
location: The new location of the table.
Returns:
The alter table builder.
"""
raise NotImplementedError("Not yet implemented")
def commit_transaction(self) -> Table:
"""Commits the changes to the catalog.
Returns:
The table with the updates applied.
"""
        if len(self._updates) > 0:
            response = self._table.catalog._commit_table(  # pylint: disable=W0212
                CommitTableRequest(
                    # Strip the catalog name from the identifier
                    identifier=self._table.identifier[1:],
requirements=self._requirements,
updates=self._updates,
)
)
# Update the metadata with the new one
self._table.metadata = response.metadata
self._table.metadata_location = response.metadata_location
return self._table
else:
return self._table
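# A minimal usage sketch of Transaction (illustrative only; assumes a `table` already
# loaded from a catalog, and the property keys are made up). Updates are staged and
# committed together when the context manager exits:
#
#     with table.transaction() as txn:
#         txn.set_properties(owner="alice")
#         txn.remove_properties("deprecated-key")
#     # __exit__ calls commit_transaction() and refreshes table.metadata in place.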
class TableUpdateAction(Enum):
upgrade_format_version = "upgrade-format-version"
add_schema = "add-schema"
set_current_schema = "set-current-schema"
add_spec = "add-spec"
set_default_spec = "set-default-spec"
add_sort_order = "add-sort-order"
set_default_sort_order = "set-default-sort-order"
add_snapshot = "add-snapshot"
set_snapshot_ref = "set-snapshot-ref"
remove_snapshots = "remove-snapshots"
remove_snapshot_ref = "remove-snapshot-ref"
set_location = "set-location"
set_properties = "set-properties"
remove_properties = "remove-properties"
class TableUpdate(IcebergBaseModel):
action: TableUpdateAction
class UpgradeFormatVersionUpdate(TableUpdate):
action: TableUpdateAction = TableUpdateAction.upgrade_format_version
format_version: int = Field(alias="format-version")
class AddSchemaUpdate(TableUpdate):
action: TableUpdateAction = TableUpdateAction.add_schema
schema_: Schema = Field(alias="schema")
class SetCurrentSchemaUpdate(TableUpdate):
action: TableUpdateAction = TableUpdateAction.set_current_schema
schema_id: int = Field(
alias="schema-id", description="Schema ID to set as current, or -1 to set last added schema", default=-1
)
class AddPartitionSpecUpdate(TableUpdate):
action: TableUpdateAction = TableUpdateAction.add_spec
spec: PartitionSpec
class SetDefaultSpecUpdate(TableUpdate):
action: TableUpdateAction = TableUpdateAction.set_default_spec
spec_id: int = Field(
alias="spec-id", description="Partition spec ID to set as the default, or -1 to set last added spec", default=-1
)
class AddSortOrderUpdate(TableUpdate):
action: TableUpdateAction = TableUpdateAction.add_sort_order
sort_order: SortOrder = Field(alias="sort-order")
class SetDefaultSortOrderUpdate(TableUpdate):
action: TableUpdateAction = TableUpdateAction.set_default_sort_order
sort_order_id: int = Field(
alias="sort-order-id", description="Sort order ID to set as the default, or -1 to set last added sort order", default=-1
)
class AddSnapshotUpdate(TableUpdate):
action: TableUpdateAction = TableUpdateAction.add_snapshot
snapshot: Snapshot
class SetSnapshotRefUpdate(TableUpdate):
action: TableUpdateAction = TableUpdateAction.set_snapshot_ref
ref_name: str = Field(alias="ref-name")
type: Literal["tag", "branch"]
snapshot_id: int = Field(alias="snapshot-id")
    max_ref_age_ms: int = Field(alias="max-ref-age-ms")
max_snapshot_age_ms: int = Field(alias="max-snapshot-age-ms")
min_snapshots_to_keep: int = Field(alias="min-snapshots-to-keep")
class RemoveSnapshotsUpdate(TableUpdate):
action: TableUpdateAction = TableUpdateAction.remove_snapshots
snapshot_ids: List[int] = Field(alias="snapshot-ids")
class RemoveSnapshotRefUpdate(TableUpdate):
action: TableUpdateAction = TableUpdateAction.remove_snapshot_ref
ref_name: str = Field(alias="ref-name")
class SetLocationUpdate(TableUpdate):
action: TableUpdateAction = TableUpdateAction.set_location
location: str
class SetPropertiesUpdate(TableUpdate):
action: TableUpdateAction = TableUpdateAction.set_properties
updates: Dict[str, str]
class RemovePropertiesUpdate(TableUpdate):
action: TableUpdateAction = TableUpdateAction.remove_properties
removals: List[str]
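# The update classes above are pydantic models (via IcebergBaseModel), so they
# serialize to the hyphenated field names used by the REST catalog. A sketch, with a
# made-up property; the exact JSON shape follows IcebergBaseModel's configuration:
#
#     update = SetPropertiesUpdate(updates={"owner": "alice"})
#     update.json()  # roughly: {"action": "set-properties", "updates": {"owner": "alice"}}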
class TableRequirement(IcebergBaseModel):
type: str
class AssertCreate(TableRequirement):
"""The table must not already exist; used for create transactions."""
type: Literal["assert-create"]
class AssertTableUUID(TableRequirement):
"""The table UUID must match the requirement's `uuid`."""
type: Literal["assert-table-uuid"]
uuid: str
class AssertRefSnapshotId(TableRequirement):
"""The table branch or tag identified by the requirement's `ref` must reference the requirement's `snapshot-id`.
if `snapshot-id` is `null` or missing, the ref must not already exist.
"""
type: Literal["assert-ref-snapshot-id"]
ref: str
snapshot_id: int = Field(..., alias="snapshot-id")
class AssertLastAssignedFieldId(TableRequirement):
"""The table's last assigned column id must match the requirement's `last-assigned-field-id`."""
type: Literal["assert-last-assigned-field-id"]
last_assigned_field_id: int = Field(..., alias="last-assigned-field-id")
class AssertCurrentSchemaId(TableRequirement):
"""The table's current schema id must match the requirement's `current-schema-id`."""
type: Literal["assert-current-schema-id"]
current_schema_id: int = Field(..., alias="current-schema-id")
class AssertLastAssignedPartitionId(TableRequirement):
"""The table's last assigned partition id must match the requirement's `last-assigned-partition-id`."""
type: Literal["assert-last-assigned-partition-id"]
last_assigned_partition_id: int = Field(..., alias="last-assigned-partition-id")
class AssertDefaultSpecId(TableRequirement):
"""The table's default spec id must match the requirement's `default-spec-id`."""
type: Literal["assert-default-spec-id"]
default_spec_id: int = Field(..., alias="default-spec-id")
class AssertDefaultSortOrderId(TableRequirement):
"""The table's default sort order id must match the requirement's `default-sort-order-id`."""
type: Literal["assert-default-sort-order-id"]
default_sort_order_id: int = Field(..., alias="default-sort-order-id")
class CommitTableRequest(IcebergBaseModel):
identifier: Identifier = Field()
requirements: List[TableRequirement] = Field(default_factory=list)
updates: List[TableUpdate] = Field(default_factory=list)
class CommitTableResponse(IcebergBaseModel):
metadata: TableMetadata = Field()
metadata_location: str = Field(alias="metadata-location")
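# A sketch of how a catalog implementation might assemble a commit from the pieces
# above (illustrative; the identifier and UUID are made up):
#
#     request = CommitTableRequest(
#         identifier=("db", "events"),
#         requirements=[AssertTableUUID(type="assert-table-uuid", uuid="9c12d441-03fe-4693-9a96-a0705ddf69c1")],
#         updates=[SetPropertiesUpdate(updates={"owner": "alice"})],
#     )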
class Table:
identifier: Identifier = Field()
metadata: TableMetadata = Field()
metadata_location: str = Field()
io: FileIO
catalog: Catalog
def __init__(
self, identifier: Identifier, metadata: TableMetadata, metadata_location: str, io: FileIO, catalog: Catalog
) -> None:
self.identifier = identifier
self.metadata = metadata
self.metadata_location = metadata_location
self.io = io
self.catalog = catalog
def transaction(self) -> Transaction:
return Transaction(self)
def refresh(self) -> Table:
"""Refresh the current table metadata."""
fresh = self.catalog.load_table(self.identifier[1:])
self.metadata = fresh.metadata
self.io = fresh.io
self.metadata_location = fresh.metadata_location
return self
def name(self) -> Identifier:
"""Return the identifier of this table."""
return self.identifier
def scan(
self,
row_filter: Union[str, BooleanExpression] = ALWAYS_TRUE,
selected_fields: Tuple[str, ...] = ("*",),
case_sensitive: bool = True,
snapshot_id: Optional[int] = None,
options: Properties = EMPTY_DICT,
limit: Optional[int] = None,
) -> DataScan:
return DataScan(
table=self,
row_filter=row_filter,
selected_fields=selected_fields,
case_sensitive=case_sensitive,
snapshot_id=snapshot_id,
options=options,
limit=limit,
)
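    # A typical scan chain (sketch; the field names and filter are made up):
    #
    #     df = table.scan(
    #         row_filter="user_id > 100",
    #         selected_fields=("user_id", "event_ts"),
    #         limit=100,
    #     ).to_pandas()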
def schema(self) -> Schema:
"""Return the schema for this table."""
return next(schema for schema in self.metadata.schemas if schema.schema_id == self.metadata.current_schema_id)
def schemas(self) -> Dict[int, Schema]:
"""Return a dict of the schema of this table."""
return {schema.schema_id: schema for schema in self.metadata.schemas}
def spec(self) -> PartitionSpec:
"""Return the partition spec of this table."""
return next(spec for spec in self.metadata.partition_specs if spec.spec_id == self.metadata.default_spec_id)
def specs(self) -> Dict[int, PartitionSpec]:
"""Return a dict the partition specs this table."""
return {spec.spec_id: spec for spec in self.metadata.partition_specs}
def sort_order(self) -> SortOrder:
"""Return the sort order of this table."""
return next(
sort_order for sort_order in self.metadata.sort_orders if sort_order.order_id == self.metadata.default_sort_order_id
)
def sort_orders(self) -> Dict[int, SortOrder]:
"""Return a dict of the sort orders of this table."""
return {sort_order.order_id: sort_order for sort_order in self.metadata.sort_orders}
@property
def properties(self) -> Dict[str, str]:
"""Properties of the table."""
return self.metadata.properties
def location(self) -> str:
"""Return the table's base location."""
return self.metadata.location
def current_snapshot(self) -> Optional[Snapshot]:
"""Get the current snapshot for this table, or None if there is no current snapshot."""
if snapshot_id := self.metadata.current_snapshot_id:
return self.snapshot_by_id(snapshot_id)
return None
def snapshot_by_id(self, snapshot_id: int) -> Optional[Snapshot]:
"""Get the snapshot of this table with the given id, or None if there is no matching snapshot."""
try:
return next(snapshot for snapshot in self.metadata.snapshots if snapshot.snapshot_id == snapshot_id)
except StopIteration:
return None
def snapshot_by_name(self, name: str) -> Optional[Snapshot]:
"""Returns the snapshot referenced by the given name or null if no such reference exists."""
if ref := self.metadata.refs.get(name):
return self.snapshot_by_id(ref.snapshot_id)
return None
def history(self) -> List[SnapshotLogEntry]:
"""Get the snapshot history of this table."""
return self.metadata.snapshot_log
def __eq__(self, other: Any) -> bool:
"""Returns the equality of two instances of the Table class."""
return (
self.identifier == other.identifier
and self.metadata == other.metadata
and self.metadata_location == other.metadata_location
if isinstance(other, Table)
else False
)
class StaticTable(Table):
"""Load a table directly from a metadata file (i.e., without using a catalog)."""
def refresh(self) -> Table:
"""Refresh the current table metadata."""
raise NotImplementedError("To be implemented")
@classmethod
def from_metadata(cls, metadata_location: str, properties: Properties = EMPTY_DICT) -> StaticTable:
io = load_file_io(properties=properties, location=metadata_location)
file = io.new_input(metadata_location)
from pyiceberg.serializers import FromInputFile
metadata = FromInputFile.table_metadata(file)
from pyiceberg.catalog.noop import NoopCatalog
return cls(
identifier=("static-table", metadata_location),
metadata_location=metadata_location,
metadata=metadata,
io=load_file_io({**properties, **metadata.properties}),
catalog=NoopCatalog("static-table"),
)
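# StaticTable allows reading a table from just its metadata file, at the cost of not
# being able to refresh or commit. A sketch with a made-up location:
#
#     static_table = StaticTable.from_metadata(
#         "s3://warehouse/db/events/metadata/00002-1a2b3c.metadata.json"
#     )
#     static_table.schema()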
def _parse_row_filter(expr: Union[str, BooleanExpression]) -> BooleanExpression:
"""Accepts an expression in the form of a BooleanExpression or a string.
In the case of a string, it will be converted into a unbound BooleanExpression.
Args:
expr: Expression as a BooleanExpression or a string.
Returns: An unbound BooleanExpression.
"""
return parser.parse(expr) if isinstance(expr, str) else expr
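# Both forms below yield an unbound expression (sketch; the string form is parsed
# into roughly the same tree as the explicit one):
#
#     _parse_row_filter("id = 5")
#     _parse_row_filter(EqualTo("id", 5))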
S = TypeVar("S", bound="TableScan", covariant=True)
class TableScan(ABC):
table: Table
row_filter: BooleanExpression
selected_fields: Tuple[str, ...]
case_sensitive: bool
snapshot_id: Optional[int]
options: Properties
limit: Optional[int]
def __init__(
self,
table: Table,
row_filter: Union[str, BooleanExpression] = ALWAYS_TRUE,
selected_fields: Tuple[str, ...] = ("*",),
case_sensitive: bool = True,
snapshot_id: Optional[int] = None,
options: Properties = EMPTY_DICT,
limit: Optional[int] = None,
):
self.table = table
self.row_filter = _parse_row_filter(row_filter)
self.selected_fields = selected_fields
self.case_sensitive = case_sensitive
self.snapshot_id = snapshot_id
self.options = options
self.limit = limit
def snapshot(self) -> Optional[Snapshot]:
if self.snapshot_id:
return self.table.snapshot_by_id(self.snapshot_id)
return self.table.current_snapshot()
def projection(self) -> Schema:
snapshot_schema = self.table.schema()
if snapshot := self.snapshot():
if snapshot_schema_id := snapshot.schema_id:
snapshot_schema = self.table.schemas()[snapshot_schema_id]
if "*" in self.selected_fields:
return snapshot_schema
return snapshot_schema.select(*self.selected_fields, case_sensitive=self.case_sensitive)
@abstractmethod
def plan_files(self) -> Iterable[ScanTask]:
...
@abstractmethod
def to_arrow(self) -> pa.Table:
...
@abstractmethod
def to_pandas(self, **kwargs: Any) -> pd.DataFrame:
...
def update(self: S, **overrides: Any) -> S:
"""Creates a copy of this table scan with updated fields."""
return type(self)(**{**self.__dict__, **overrides})
def use_ref(self: S, name: str) -> S:
if self.snapshot_id:
raise ValueError(f"Cannot override ref, already set snapshot id={self.snapshot_id}")
if snapshot := self.table.snapshot_by_name(name):
return self.update(snapshot_id=snapshot.snapshot_id)
raise ValueError(f"Cannot scan unknown ref={name}")
def select(self: S, *field_names: str) -> S:
if "*" in self.selected_fields:
return self.update(selected_fields=field_names)
return self.update(selected_fields=tuple(set(self.selected_fields).intersection(set(field_names))))
def filter(self: S, expr: Union[str, BooleanExpression]) -> S:
return self.update(row_filter=And(self.row_filter, _parse_row_filter(expr)))
def with_case_sensitive(self: S, case_sensitive: bool = True) -> S:
return self.update(case_sensitive=case_sensitive)
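# Because every fluent method goes through `update`, each call returns a fresh scan
# and the original is left untouched (sketch; the field names are made up):
#
#     base = table.scan()
#     narrowed = base.select("id", "name").filter("id > 100").with_case_sensitive(False)
#     # `base` still selects all fields; `narrowed` carries the combined filter and projection.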
class ScanTask(ABC):
pass
@dataclass(init=False)
class FileScanTask(ScanTask):
file: DataFile
delete_files: Set[DataFile]
start: int
length: int
def __init__(
self,
data_file: DataFile,
delete_files: Optional[Set[DataFile]] = None,
start: Optional[int] = None,
length: Optional[int] = None,
) -> None:
self.file = data_file
self.delete_files = delete_files or set()
self.start = start or 0
self.length = length or data_file.file_size_in_bytes
def _open_manifest(
io: FileIO,
manifest: ManifestFile,
partition_filter: Callable[[DataFile], bool],
metrics_evaluator: Callable[[DataFile], bool],
) -> List[ManifestEntry]:
return [
manifest_entry
for manifest_entry in manifest.fetch_manifest_entry(io, discard_deleted=True)
if partition_filter(manifest_entry.data_file) and metrics_evaluator(manifest_entry.data_file)
]
def _min_data_file_sequence_number(manifests: List[ManifestFile]) -> int:
try:
return min(
manifest.min_sequence_number or INITIAL_SEQUENCE_NUMBER
for manifest in manifests
if manifest.content == ManifestContent.DATA
)
except ValueError:
# In case of an empty iterator
return INITIAL_SEQUENCE_NUMBER
def _match_deletes_to_datafile(data_entry: ManifestEntry, positional_delete_entries: SortedList[ManifestEntry]) -> Set[DataFile]:
"""This method will check if the delete file is relevant for the data file.
Using the column metrics to see if the filename is in the lower and upper bound.
Args:
data_entry (ManifestEntry): The manifest entry path of the datafile.
positional_delete_entries (List[ManifestEntry]): All the candidate positional deletes manifest entries.
Returns:
A set of files that are relevant for the data file.
"""
relevant_entries = positional_delete_entries[positional_delete_entries.bisect_right(data_entry) :]
if len(relevant_entries) > 0:
evaluator = _InclusiveMetricsEvaluator(POSITIONAL_DELETE_SCHEMA, EqualTo("file_path", data_entry.data_file.file_path))
return {
positional_delete_entry.data_file
for positional_delete_entry in relevant_entries
if evaluator.eval(positional_delete_entry.data_file)
}
else:
return set()
class DataScan(TableScan):
def __init__(
self,
table: Table,
row_filter: Union[str, BooleanExpression] = ALWAYS_TRUE,
selected_fields: Tuple[str, ...] = ("*",),
case_sensitive: bool = True,
snapshot_id: Optional[int] = None,
options: Properties = EMPTY_DICT,
limit: Optional[int] = None,
):
super().__init__(table, row_filter, selected_fields, case_sensitive, snapshot_id, options, limit)
def _build_partition_projection(self, spec_id: int) -> BooleanExpression:
project = inclusive_projection(self.table.schema(), self.table.specs()[spec_id])
return project(self.row_filter)
@cached_property
def partition_filters(self) -> KeyDefaultDict[int, BooleanExpression]:
return KeyDefaultDict(self._build_partition_projection)
def _build_manifest_evaluator(self, spec_id: int) -> Callable[[ManifestFile], bool]:
spec = self.table.specs()[spec_id]
return visitors.manifest_evaluator(spec, self.table.schema(), self.partition_filters[spec_id], self.case_sensitive)
def _build_partition_evaluator(self, spec_id: int) -> Callable[[DataFile], bool]:
spec = self.table.specs()[spec_id]
partition_type = spec.partition_type(self.table.schema())
partition_schema = Schema(*partition_type.fields)
partition_expr = self.partition_filters[spec_id]
evaluator = visitors.expression_evaluator(partition_schema, partition_expr, self.case_sensitive)
return lambda data_file: evaluator(data_file.partition)
def _check_sequence_number(self, min_data_sequence_number: int, manifest: ManifestFile) -> bool:
"""A helper function to make sure that no manifests are loaded that contain deletes that are older than the data.
Args:
min_data_sequence_number (int): The minimal sequence number.
manifest (ManifestFile): A ManifestFile that can be either data or deletes.
Returns:
Boolean indicating if it is either a data file, or a relevant delete file.
"""
return manifest.content == ManifestContent.DATA or (
# Not interested in deletes that are older than the data
manifest.content == ManifestContent.DELETES
and (manifest.sequence_number or INITIAL_SEQUENCE_NUMBER) >= min_data_sequence_number
)
def plan_files(self) -> Iterable[FileScanTask]:
"""Plans the relevant files by filtering on the PartitionSpecs.
Returns:
List of FileScanTasks that contain both data and delete files.
"""
snapshot = self.snapshot()
if not snapshot:
return iter([])
io = self.table.io
# step 1: filter manifests using partition summaries
# the filter depends on the partition spec used to write the manifest file, so create a cache of filters for each spec id
manifest_evaluators: Dict[int, Callable[[ManifestFile], bool]] = KeyDefaultDict(self._build_manifest_evaluator)
manifests = [
manifest_file
for manifest_file in snapshot.manifests(io)
if manifest_evaluators[manifest_file.partition_spec_id](manifest_file)
]
# step 2: filter the data files in each manifest
# this filter depends on the partition spec used to write the manifest file
partition_evaluators: Dict[int, Callable[[DataFile], bool]] = KeyDefaultDict(self._build_partition_evaluator)
metrics_evaluator = _InclusiveMetricsEvaluator(self.table.schema(), self.row_filter, self.case_sensitive).eval
min_data_sequence_number = _min_data_file_sequence_number(manifests)
data_entries: List[ManifestEntry] = []
positional_delete_entries = SortedList(key=lambda entry: entry.data_sequence_number or INITIAL_SEQUENCE_NUMBER)
with ThreadPoolExecutor() as executor:
for manifest_entry in chain(
*executor.map(
lambda args: _open_manifest(*args),
[
(
io,
manifest,
partition_evaluators[manifest.partition_spec_id],
metrics_evaluator,
)
for manifest in manifests
if self._check_sequence_number(min_data_sequence_number, manifest)
],
)
):
data_file = manifest_entry.data_file
if data_file.content == DataFileContent.DATA:
data_entries.append(manifest_entry)
elif data_file.content == DataFileContent.POSITION_DELETES:
positional_delete_entries.add(manifest_entry)
elif data_file.content == DataFileContent.EQUALITY_DELETES:
raise ValueError(
"PyIceberg does not yet support equality deletes: https://github.com/apache/iceberg/issues/6568"
)
else:
raise ValueError(f"Unknown DataFileContent ({data_file.content}): {manifest_entry}")
return [
FileScanTask(
data_entry.data_file,
delete_files=_match_deletes_to_datafile(
data_entry,
positional_delete_entries,
),
)
for data_entry in data_entries
]
def to_arrow(self) -> pa.Table:
from pyiceberg.io.pyarrow import project_table
return project_table(
self.plan_files(),
self.table,
self.row_filter,
self.projection(),
case_sensitive=self.case_sensitive,
limit=self.limit,
)
def to_pandas(self, **kwargs: Any) -> pd.DataFrame:
return self.to_arrow().to_pandas(**kwargs)
def to_duckdb(self, table_name: str, connection: Optional[DuckDBPyConnection] = None) -> DuckDBPyConnection:
import duckdb
con = connection or duckdb.connect(database=":memory:")
con.register(table_name, self.to_arrow())
return con
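    # Sketch (the registered name is up to the caller):
    #
    #     con = table.scan().to_duckdb("events")
    #     con.execute("SELECT COUNT(*) FROM events").fetchall()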
def to_ray(self) -> ray.data.dataset.Dataset:
import ray
return ray.data.from_arrow(self.to_arrow())
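    # Sketch: the scan is materialized through Arrow into a Ray Dataset:
    #
    #     ds = table.scan().to_ray()
    #     ds.show(5)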