Python: Add positional deletes
Fokko committed Feb 8, 2023
1 parent 3a0686d commit 65f8419
Showing 6 changed files with 1,482 additions and 1,356 deletions.
2,692 changes: 1,384 additions & 1,308 deletions python/poetry.lock

Large diffs are not rendered by default.

60 changes: 57 additions & 3 deletions python/pyiceberg/io/pyarrow.py
@@ -31,6 +31,7 @@
TYPE_CHECKING,
Any,
Callable,
Dict,
Iterable,
List,
Optional,
@@ -72,6 +73,7 @@
OutputFile,
OutputStream,
)
from pyiceberg.manifest import DataFileContent
from pyiceberg.schema import (
PartnerAccessor,
Schema,
@@ -391,7 +393,7 @@ def visit_timestamp(self, _: TimestampType) -> pa.DataType:
return pa.timestamp(unit="us")

def visit_timestampz(self, _: TimestamptzType) -> pa.DataType:
return pa.timestamp(unit="us", tz="+00:00")
return pa.timestamp(unit="us", tz="UTC")

def visit_string(self, _: StringType) -> pa.DataType:
return pa.string()
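
For illustration only (not part of the commit): "UTC" and "+00:00" describe the same offset, but Arrow compares the tz string literally, so the two spellings yield unequal types; that is why the corresponding test assertion changes further down.

import pyarrow as pa

# Arrow keeps the tz string as-is, so these two timestamp types compare unequal.
print(pa.timestamp("us", tz="UTC") == pa.timestamp("us", tz="+00:00"))  # False
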
@@ -470,13 +472,23 @@ def expression_to_pyarrow(expr: BooleanExpression) -> pc.Expression:
return boolean_expression_visit(expr, _ConvertToArrowExpression())


def _read_deletes(fs: FileSystem, file_path: str) -> Dict[str, pa.ChunkedArray]:
_, path = PyArrowFileIO.parse_location(file_path)
table = pq.read_table(
source=path, pre_buffer=True, buffer_size=8 * ONE_MEGABYTE, read_dictionary=["file_path"], filesystem=fs
)
    table = table.unify_dictionaries()
return {file.as_py(): table.filter(pc.field("file_path") == file).column(1) for file in table.columns[0].chunks[0].dictionary}


def _file_to_table(
fs: FileSystem,
task: FileScanTask,
bound_row_filter: BooleanExpression,
projected_schema: Schema,
projected_field_ids: Set[int],
case_sensitive: bool,
positional_deletes: List[pa.ChunkedArray],
) -> pa.Table:
_, path = PyArrowFileIO.parse_location(task.file.file_path)

@@ -515,6 +527,14 @@ def _file_to_table(
if pyarrow_filter is not None:
arrow_table = arrow_table.filter(pyarrow_filter)

if len(positional_deletes) > 0:
# When there are positional deletes, create a filter mask
mask = [True] * len(arrow_table)
for buffer in positional_deletes:
for pos in buffer:
mask[pos.as_py()] = False
arrow_table = arrow_table.filter(mask)

return to_requested_schema(projected_schema, file_project_schema, arrow_table)


@@ -546,11 +566,45 @@ def project_table(
id for id in projected_schema.field_ids if not isinstance(projected_schema.find_type(id), (MapType, ListType))
}.union(extract_field_ids(bound_row_filter))

tasks_data_files: List[FileScanTask] = []
tasks_positional_deletes: List[FileScanTask] = []
for task in tasks:
if task.file.content == DataFileContent.DATA:
tasks_data_files.append(task)
elif task.file.content == DataFileContent.POSITION_DELETES:
tasks_positional_deletes.append(task)
elif task.file.content == DataFileContent.EQUALITY_DELETES:
raise ValueError("PyIceberg does not yet support equality deletes: https://github.com/apache/iceberg/issues/6568")
else:
raise ValueError(f"Unknown file content: {task.file.content}")

with ThreadPool() as pool:
positional_deletes_per_file: Dict[str, List[pa.ChunkedArray]] = {}
if tasks_positional_deletes:
# If there are any positional deletes, get those first
for delete_files in pool.starmap(
func=_read_deletes,
iterable=[(fs, task.file.file_path) for task in tasks_positional_deletes],
):
for file, buffer in delete_files.items():
positional_deletes_per_file[file] = positional_deletes_per_file.get(file, []) + [buffer]

tables = pool.starmap(
func=_file_to_table,
iterable=[(fs, task, bound_row_filter, projected_schema, projected_field_ids, case_sensitive) for task in tasks],
chunksize=None, # we could use this to control how to materialize the generator of tasks (we should also make the expression above lazy)
iterable=[
(
fs,
task,
bound_row_filter,
projected_schema,
projected_field_ids,
case_sensitive,
positional_deletes_per_file.get(task.file.file_path, []),
)
for task in tasks_data_files
],
chunksize=None,
# we could use this to control how to materialize the generator of tasks (we should also make the expression above lazy)
)

if len(tables) > 1:
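
For illustration only (not part of the commit): a minimal, self-contained sketch of how the pieces above fit together, using hypothetical in-memory delete buffers in place of real Parquet delete files. It groups delete positions per data file the way project_table does, then applies the same boolean-mask filter as _file_to_table.

import pyarrow as pa

# Hypothetical data file with five rows; two delete files both reference it.
data_path = "s3://bucket/db/table/data.parquet"
arrow_table = pa.table({"col": [10, 20, 30, 40, 50]})
delete_file_1 = {data_path: pa.chunked_array([[1, 3]])}  # shape of a _read_deletes result
delete_file_2 = {data_path: pa.chunked_array([[4]])}

# Group the buffers per data file, as project_table does.
positional_deletes_per_file = {}
for delete_files in (delete_file_1, delete_file_2):
    for file, buffer in delete_files.items():
        positional_deletes_per_file[file] = positional_deletes_per_file.get(file, []) + [buffer]

# Build and apply the mask, as _file_to_table does: start with every row
# kept, then flip the deleted positions to False.
mask = [True] * len(arrow_table)
for buffer in positional_deletes_per_file.get(data_path, []):
    for pos in buffer:
        mask[pos.as_py()] = False

print(arrow_table.filter(mask).column("col").to_pylist())  # [10, 30]
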
21 changes: 2 additions & 19 deletions python/pyiceberg/table/__init__.py
@@ -47,12 +47,7 @@
from pyiceberg.expressions.visitors import inclusive_projection
from pyiceberg.io import FileIO
from pyiceberg.io.pyarrow import project_table
from pyiceberg.manifest import (
DataFile,
ManifestContent,
ManifestFile,
files,
)
from pyiceberg.manifest import DataFile, ManifestFile, files
from pyiceberg.partitioning import PartitionSpec
from pyiceberg.schema import Schema
from pyiceberg.table.metadata import TableMetadata
@@ -282,21 +277,10 @@ def __init__(self, data_file: DataFile, start: Optional[int] = None, length: Opt
self.length = length or data_file.file_size_in_bytes


def _check_content(file: DataFile) -> DataFile:
try:
if file.content == ManifestContent.DELETES:
raise ValueError("PyIceberg does not support deletes: https://github.com/apache/iceberg/issues/6568")
return file
except AttributeError:
# If the attribute is not there, it is a V1 record
return file


def _open_manifest(io: FileIO, manifest: ManifestFile, partition_filter: Callable[[DataFile], bool]) -> List[FileScanTask]:
all_files = files(io.new_input(manifest.manifest_path))
matching_partition_files = filter(partition_filter, all_files)
matching_partition_data_files = map(_check_content, matching_partition_files)
return [FileScanTask(file) for file in matching_partition_data_files]
return [FileScanTask(file) for file in matching_partition_files]


class DataScan(TableScan["DataScan"]):
@@ -352,7 +336,6 @@ def plan_files(self) -> Iterator[FileScanTask]:

# step 2: filter the data files in each manifest
# this filter depends on the partition spec used to write the manifest file

partition_evaluators: Dict[int, Callable[[DataFile], bool]] = KeyDefaultDict(self._build_partition_evaluator)

with ThreadPool() as pool:
5 changes: 5 additions & 0 deletions python/pyproject.toml
@@ -87,6 +87,7 @@ coverage = { version = "^7.1.0", extras = ["toml"] }
requests-mock = "1.10.0"
moto = "^4.1.2"
typing-extensions = '4.4.0'
fastparquet = '2023.1.0'

[tool.poetry.scripts]
pyiceberg = "pyiceberg.cli.console:run"
@@ -239,5 +240,9 @@ ignore_missing_imports = true
module = "pyparsing.*"
ignore_missing_imports = true

[[tool.mypy.overrides]]
module = "fastparquet.*"
ignore_missing_imports = true

[tool.coverage.run]
source = ['pyiceberg/']
36 changes: 33 additions & 3 deletions python/tests/io/test_pyarrow.py
@@ -55,11 +55,12 @@
PyArrowFile,
PyArrowFileIO,
_ConvertToArrowSchema,
_read_deletes,
expression_to_pyarrow,
project_table,
schema_to_pyarrow,
)
from pyiceberg.manifest import DataFile, FileFormat
from pyiceberg.manifest import DataFile, DataFileContent, FileFormat
from pyiceberg.partitioning import PartitionSpec
from pyiceberg.schema import Schema, visit
from pyiceberg.table import FileScanTask, Table
@@ -377,7 +378,7 @@ def test_timestamp_type_to_pyarrow() -> None:

def test_timestamptz_type_to_pyarrow() -> None:
iceberg_type = TimestamptzType()
assert visit(iceberg_type, _ConvertToArrowSchema()) == pa.timestamp(unit="us", tz="+00:00")
assert visit(iceberg_type, _ConvertToArrowSchema()) == pa.timestamp(unit="us", tz="UTC")


def test_string_type_to_pyarrow() -> None:
@@ -749,7 +750,14 @@ def project(
return project_table(
[
FileScanTask(
DataFile(file_path=file, file_format=FileFormat.PARQUET, partition={}, record_count=3, file_size_in_bytes=3)
DataFile(
content=DataFileContent.DATA,
file_path=file,
file_format=FileFormat.PARQUET,
partition={},
record_count=3,
file_size_in_bytes=3,
)
)
for file in files
],
@@ -1114,3 +1122,25 @@ def test_projection_filter_on_unknown_field(schema_int_str: Schema, file_int_str
_ = project(schema, [file_int_str], GreaterThan("unknown_field", "1"), schema_int_str)

assert "Could not find field with name unknown_field, case_sensitive=True" in str(exc_info.value)


@pytest.fixture
def deletes_file(tmp_path: str) -> str:
import pandas as pd
from fastparquet import write

path = "s3://bucket/default.db/table/data.parquet"
d = {"file_path": [path, path, path], "col": [19, 22, 25]}
df = pd.DataFrame(data=d)

deletes_file_path = f"{tmp_path}/deletes.parq"
write(deletes_file_path, df)

return deletes_file_path


def test_read_deletes(deletes_file: str) -> None:
    # Passing None as the filesystem defaults to the local filesystem
deletes = _read_deletes(None, deletes_file)
assert set(deletes.keys()) == {"s3://bucket/default.db/table/data.parquet"}
assert list(deletes.values())[0] == pa.chunked_array([[19, 22, 25]])
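
For illustration only (not part of the commit): the same round trip sketched with pyarrow instead of fastparquet, assuming a temporary local path and a hypothetical positions column named "pos" (the reader only looks at the second column, so the name is not significant).

import tempfile

import pyarrow as pa
import pyarrow.parquet as pq

from pyiceberg.io.pyarrow import _read_deletes

# Hypothetical positional-delete file: one row per deleted position,
# keyed by the path of the data file the position refers to.
data_path = "s3://bucket/default.db/table/data.parquet"
deletes_path = f"{tempfile.mkdtemp()}/deletes.parq"
pq.write_table(pa.table({"file_path": [data_path] * 3, "pos": [19, 22, 25]}), deletes_path)

# As in the test above, a None filesystem falls back to the local filesystem.
deletes = _read_deletes(None, deletes_path)
print(deletes[data_path].to_pylist())  # [19, 22, 25]
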
24 changes: 1 addition & 23 deletions python/tests/table/test_init.py
@@ -26,10 +26,9 @@
In,
)
from pyiceberg.io import load_file_io
from pyiceberg.manifest import DataFile, ManifestContent
from pyiceberg.partitioning import PartitionField, PartitionSpec
from pyiceberg.schema import Schema
from pyiceberg.table import Table, _check_content
from pyiceberg.table import Table
from pyiceberg.table.metadata import TableMetadataV2
from pyiceberg.table.snapshots import (
Operation,
@@ -44,7 +43,6 @@
SortOrder,
)
from pyiceberg.transforms import BucketTransform, IdentityTransform
from pyiceberg.typedef import Record
from pyiceberg.types import LongType, NestedField


@@ -245,23 +243,3 @@ def test_table_scan_projection_unknown_column(table: Table) -> None:
_ = scan.select("a").projection()

assert "Could not find column: 'a'" in str(exc_info.value)


def test_check_content_deletes() -> None:
with pytest.raises(ValueError) as exc_info:
_check_content(
DataFile(
content=ManifestContent.DELETES,
)
)
assert "PyIceberg does not support deletes: https://github.com/apache/iceberg/issues/6568" in str(exc_info.value)


def test_check_content_data() -> None:
manifest_file = DataFile(content=ManifestContent.DATA)
assert _check_content(manifest_file) == manifest_file


def test_check_content_missing_attr() -> None:
r = Record(*([None] * 15))
assert _check_content(r) == r # type: ignore
