Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Python: Add positional deletes #6775

Merged
merged 56 commits into from
Jun 20, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
Show all changes
56 commits
Select commit Hold shift + click to select a range
8b293cb
Python: Add positional deletes
Fokko Feb 8, 2023
18a3204
Merge branch 'master' of github.com:apache/iceberg into fd-positional…
Fokko Feb 17, 2023
b013b5a
Revert unrelated changes
Fokko Feb 17, 2023
3898879
Merge branch 'master' of github.com:apache/iceberg into fd-positional…
Fokko Feb 17, 2023
e35ecb3
Update based on comments
Fokko Feb 20, 2023
a0e5f0b
Fix false annotation
Fokko Feb 20, 2023
aeea029
WIP
Fokko Feb 22, 2023
a792764
WIP
Fokko Feb 24, 2023
3fd53c9
Cleanup
Fokko Feb 27, 2023
fcead79
Cleanup
Fokko Feb 27, 2023
97d84bf
Cleanup
Fokko Feb 27, 2023
0a2537f
Cleanup
Fokko Feb 27, 2023
9281caa
Cleanup
Fokko Feb 27, 2023
8f3b777
Cleanup
Fokko Feb 27, 2023
bfb4837
Cleanup
Fokko Feb 27, 2023
82011d5
Merge branch 'master' of github.com:apache/iceberg into fd-positional…
Fokko Feb 28, 2023
01fa072
WIP
Fokko Feb 28, 2023
8212b3d
Merge branch 'master' of github.com:apache/iceberg into fd-positional…
Fokko Feb 28, 2023
5ddb394
Cleanup
Fokko Feb 28, 2023
4769001
Core: Allow dropping columns referenced in old sort orders
Fokko Mar 2, 2023
6af113b
WIP
Fokko Mar 2, 2023
a98e0a7
WIP
Fokko Mar 2, 2023
735d689
Revert "Core: Allow dropping columns referenced in old sort orders"
Fokko Mar 2, 2023
5cbb866
Working again
Fokko Mar 3, 2023
d56ceb6
Merge branch 'master' of github.com:apache/iceberg into fd-positional…
Fokko Mar 8, 2023
eb72088
Optimize using an iterator that takes multiple arrays
Fokko Mar 8, 2023
b6cb3a6
Merge branch 'master' of github.com:apache/iceberg into fd-positional…
Fokko Mar 12, 2023
b9ac9ed
Merge branch 'master' into fd-positional-deletes
Fokko Apr 10, 2023
1e27f70
Make the linters happy
Fokko Apr 10, 2023
7514a22
Merge branch 'master' of github.com:apache/iceberg into fd-positional…
Fokko Apr 11, 2023
5ca6405
Merge branch 'master' of github.com:apache/iceberg into fd-positional…
Fokko Apr 24, 2023
9e826ff
Fix the PR in combination with the limit
Fokko Apr 24, 2023
bee614f
Merge branch 'master' of github.com:apache/iceberg into fd-positional…
Fokko Apr 25, 2023
7c1a30c
Merge branch 'master' of github.com:apache/iceberg into fd-positional…
Fokko May 1, 2023
169aad3
Merge branch 'master' of github.com:apache/iceberg into fd-positional…
Fokko May 2, 2023
ac24f88
Use `take()` instead
Fokko May 2, 2023
e9ce645
Replace `_OrderedChunkedArrayConsumer` with `heapq.merge`
Fokko May 3, 2023
31c4309
Oops, luckily we have tests
Fokko May 3, 2023
3ea8099
Moar optimizations
Fokko May 10, 2023
43aa76d
Update python/pyiceberg/io/pyarrow.py
Fokko May 12, 2023
2a65342
Thanks Ryan!
Fokko May 22, 2023
0fc0021
Merge branch 'fd-positional-deletes' of github.com:Fokko/incubator-ic…
Fokko May 22, 2023
f87bb0f
Merge branch 'master' of github.com:apache/iceberg into fd-positional…
Fokko May 22, 2023
16c251e
WIP
Fokko May 22, 2023
db5c336
Merge branch 'master' of github.com:apache/iceberg into fd-positional…
Fokko May 22, 2023
ea09514
Add moar tests
Fokko May 22, 2023
5f1538a
WIP
Fokko May 22, 2023
bc770ec
Merge branch 'master' of github.com:apache/iceberg into fd-positional…
Fokko May 22, 2023
ce8dc22
WIP
Fokko May 24, 2023
8b4890d
Cleanup
Fokko May 24, 2023
d088480
Merge branch 'master' of github.com:apache/iceberg into fd-positional…
Fokko Jun 5, 2023
2e47f24
WIP
Fokko Jun 5, 2023
2ecb81b
Merge branch 'master' of github.com:apache/iceberg into fd-positional…
Fokko Jun 6, 2023
606e18c
Merge branch 'master' of github.com:apache/iceberg into fd-positional…
Fokko Jun 14, 2023
60c83b6
Optimizations
Fokko Jun 14, 2023
f215353
Merge branch 'master' of github.com:apache/iceberg into fd-positional…
Fokko Jun 20, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 59 additions & 3 deletions python/pyiceberg/io/pyarrow.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
TYPE_CHECKING,
Any,
Callable,
Dict,
Iterable,
List,
Optional,
Expand Down Expand Up @@ -72,6 +73,7 @@
OutputFile,
OutputStream,
)
from pyiceberg.manifest import DataFileContent
from pyiceberg.schema import (
PartnerAccessor,
Schema,
Expand Down Expand Up @@ -391,7 +393,7 @@ def visit_timestamp(self, _: TimestampType) -> pa.DataType:
return pa.timestamp(unit="us")

def visit_timestampz(self, _: TimestamptzType) -> pa.DataType:
return pa.timestamp(unit="us", tz="+00:00")
return pa.timestamp(unit="us", tz="UTC")

def visit_string(self, _: StringType) -> pa.DataType:
return pa.string()
Expand Down Expand Up @@ -470,13 +472,25 @@ def expression_to_pyarrow(expr: BooleanExpression) -> pc.Expression:
return boolean_expression_visit(expr, _ConvertToArrowExpression())


def _read_deletes(fs: FileSystem, file_path: str) -> Dict[str, pa.ChunkedArray]:
_, path = PyArrowFileIO.parse_location(file_path)
table = pq.read_table(
source=path, pre_buffer=True, buffer_size=8 * ONE_MEGABYTE, read_dictionary=["file_path"], filesystem=fs
)
table.unify_dictionaries()
Fokko marked this conversation as resolved.
Show resolved Hide resolved
return {
file.as_py(): table.filter(pc.field("file_path") == file).column("pos") for file in table.columns[0].chunks[0].dictionary
Fokko marked this conversation as resolved.
Show resolved Hide resolved
}


def _file_to_table(
fs: FileSystem,
task: FileScanTask,
bound_row_filter: BooleanExpression,
projected_schema: Schema,
projected_field_ids: Set[int],
case_sensitive: bool,
positional_deletes: List[pa.ChunkedArray],
) -> pa.Table:
_, path = PyArrowFileIO.parse_location(task.file.file_path)

Expand Down Expand Up @@ -515,6 +529,14 @@ def _file_to_table(
if pyarrow_filter is not None:
arrow_table = arrow_table.filter(pyarrow_filter)

if len(positional_deletes) > 0:
# When there are positional deletes, create a filter mask
mask = [True] * len(arrow_table)
for buffer in positional_deletes:
for pos in buffer:
mask[pos.as_py()] = False
arrow_table = arrow_table.filter(mask)
Fokko marked this conversation as resolved.
Show resolved Hide resolved

return to_requested_schema(projected_schema, file_project_schema, arrow_table)


Expand Down Expand Up @@ -546,11 +568,45 @@ def project_table(
id for id in projected_schema.field_ids if not isinstance(projected_schema.find_type(id), (MapType, ListType))
}.union(extract_field_ids(bound_row_filter))

tasks_data_files: List[FileScanTask] = []
tasks_positional_deletes: List[FileScanTask] = []
for task in tasks:
Fokko marked this conversation as resolved.
Show resolved Hide resolved
if task.file.content == DataFileContent.DATA:
tasks_data_files.append(task)
elif task.file.content == DataFileContent.POSITION_DELETES:
tasks_positional_deletes.append(task)
elif task.file.content == DataFileContent.EQUALITY_DELETES:
raise ValueError("PyIceberg does not yet support equality deletes: https://github.com/apache/iceberg/issues/6568")
else:
raise ValueError(f"Unknown file content: {task.file.content}")

with ThreadPool() as pool:
positional_deletes_per_file: Dict[str, List[pa.ChunkedArray]] = {}
if tasks_positional_deletes:
# If there are any positional deletes, get those first
for delete_files in pool.starmap(
func=_read_deletes,
iterable=[(fs, task.file.file_path) for task in tasks_positional_deletes],
):
for file, buffer in delete_files.items():
positional_deletes_per_file[file] = positional_deletes_per_file.get(file, []) + [buffer]
Fokko marked this conversation as resolved.
Show resolved Hide resolved

tables = pool.starmap(
func=_file_to_table,
iterable=[(fs, task, bound_row_filter, projected_schema, projected_field_ids, case_sensitive) for task in tasks],
chunksize=None, # we could use this to control how to materialize the generator of tasks (we should also make the expression above lazy)
iterable=[
(
fs,
task,
bound_row_filter,
projected_schema,
projected_field_ids,
case_sensitive,
positional_deletes_per_file.get(task.file.file_path, []),
)
for task in tasks_data_files
],
chunksize=None,
# we could use this to control how to materialize the generator of tasks (we should also make the expression above lazy)
)

if len(tables) > 1:
Expand Down
21 changes: 2 additions & 19 deletions python/pyiceberg/table/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,7 @@
)
from pyiceberg.expressions.visitors import inclusive_projection
from pyiceberg.io import FileIO, load_file_io
from pyiceberg.manifest import (
DataFile,
ManifestContent,
ManifestFile,
files,
)
from pyiceberg.manifest import DataFile, ManifestFile, files
from pyiceberg.partitioning import PartitionSpec
from pyiceberg.schema import Schema
from pyiceberg.table.metadata import TableMetadata
Expand Down Expand Up @@ -304,21 +299,10 @@ def __init__(self, data_file: DataFile, start: Optional[int] = None, length: Opt
self.length = length or data_file.file_size_in_bytes


def _check_content(file: DataFile) -> DataFile:
try:
if file.content == ManifestContent.DELETES:
raise ValueError("PyIceberg does not support deletes: https://github.com/apache/iceberg/issues/6568")
return file
except AttributeError:
# If the attribute is not there, it is a V1 record
return file


def _open_manifest(io: FileIO, manifest: ManifestFile, partition_filter: Callable[[DataFile], bool]) -> List[FileScanTask]:
all_files = files(io.new_input(manifest.manifest_path))
matching_partition_files = filter(partition_filter, all_files)
matching_partition_data_files = map(_check_content, matching_partition_files)
return [FileScanTask(file) for file in matching_partition_data_files]
return [FileScanTask(file) for file in matching_partition_files]


class DataScan(TableScan):
Expand Down Expand Up @@ -374,7 +358,6 @@ def plan_files(self) -> Iterator[FileScanTask]:

# step 2: filter the data files in each manifest
# this filter depends on the partition spec used to write the manifest file

Fokko marked this conversation as resolved.
Show resolved Hide resolved
partition_evaluators: Dict[int, Callable[[DataFile], bool]] = KeyDefaultDict(self._build_partition_evaluator)

with ThreadPool() as pool:
Expand Down
32 changes: 29 additions & 3 deletions python/tests/io/test_pyarrow.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,11 +55,12 @@
PyArrowFile,
PyArrowFileIO,
_ConvertToArrowSchema,
_read_deletes,
expression_to_pyarrow,
project_table,
schema_to_pyarrow,
)
from pyiceberg.manifest import DataFile, FileFormat
from pyiceberg.manifest import DataFile, DataFileContent, FileFormat
from pyiceberg.partitioning import PartitionSpec
from pyiceberg.schema import Schema, visit
from pyiceberg.table import FileScanTask, Table
Expand Down Expand Up @@ -377,7 +378,7 @@ def test_timestamp_type_to_pyarrow() -> None:

def test_timestamptz_type_to_pyarrow() -> None:
iceberg_type = TimestamptzType()
assert visit(iceberg_type, _ConvertToArrowSchema()) == pa.timestamp(unit="us", tz="+00:00")
assert visit(iceberg_type, _ConvertToArrowSchema()) == pa.timestamp(unit="us", tz="UTC")
rdblue marked this conversation as resolved.
Show resolved Hide resolved


def test_string_type_to_pyarrow() -> None:
Expand Down Expand Up @@ -749,7 +750,14 @@ def project(
return project_table(
[
FileScanTask(
DataFile(file_path=file, file_format=FileFormat.PARQUET, partition={}, record_count=3, file_size_in_bytes=3)
DataFile(
content=DataFileContent.DATA,
file_path=file,
file_format=FileFormat.PARQUET,
partition={},
record_count=3,
file_size_in_bytes=3,
)
)
for file in files
],
Expand Down Expand Up @@ -1114,3 +1122,21 @@ def test_projection_filter_on_unknown_field(schema_int_str: Schema, file_int_str
_ = project(schema, [file_int_str], GreaterThan("unknown_field", "1"), schema_int_str)

assert "Could not find field with name unknown_field, case_sensitive=True" in str(exc_info.value)


@pytest.fixture
def deletes_file(tmp_path: str) -> str:
path = "s3://bucket/default.db/table/data.parquet"
table = pa.table({"file_path": [path, path, path], "pos": [19, 22, 25]})

deletes_file_path = f"{tmp_path}/deletes.parquet"
pq.write_table(table, deletes_file_path)

return deletes_file_path


def test_read_deletes(deletes_file: str) -> None:
# None filesystem will default to a local filesystem
deletes = _read_deletes(None, deletes_file)
assert set(deletes.keys()) == {"s3://bucket/default.db/table/data.parquet"}
assert list(deletes.values())[0] == pa.chunked_array([[19, 22, 25]])
24 changes: 1 addition & 23 deletions python/tests/table/test_init.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,9 @@
In,
)
from pyiceberg.io import PY_IO_IMPL, load_file_io
from pyiceberg.manifest import DataFile, ManifestContent
from pyiceberg.partitioning import PartitionField, PartitionSpec
from pyiceberg.schema import Schema
from pyiceberg.table import StaticTable, Table, _check_content
from pyiceberg.table import StaticTable, Table
from pyiceberg.table.metadata import TableMetadataV2
from pyiceberg.table.snapshots import (
Operation,
Expand All @@ -44,7 +43,6 @@
SortOrder,
)
from pyiceberg.transforms import BucketTransform, IdentityTransform
from pyiceberg.typedef import Record
from pyiceberg.types import LongType, NestedField


Expand Down Expand Up @@ -252,26 +250,6 @@ def test_table_scan_projection_unknown_column(table: Table) -> None:
assert "Could not find column: 'a'" in str(exc_info.value)


def test_check_content_deletes() -> None:
with pytest.raises(ValueError) as exc_info:
_check_content(
DataFile(
content=ManifestContent.DELETES,
)
)
assert "PyIceberg does not support deletes: https://github.com/apache/iceberg/issues/6568" in str(exc_info.value)


def test_check_content_data() -> None:
manifest_file = DataFile(content=ManifestContent.DATA)
assert _check_content(manifest_file) == manifest_file


def test_check_content_missing_attr() -> None:
r = Record(*([None] * 15))
assert _check_content(r) == r # type: ignore


def test_static_table_same_as_table(table: Table, static_table: StaticTable) -> None:
assert isinstance(static_table, Table)
assert static_table.metadata == table.metadata
Expand Down