Commit 4cc9ccc: Optimizations

Fokko committed Jun 14, 2023
1 parent 606e18c

Showing 3 changed files with 24 additions and 39 deletions.
36 changes: 9 additions & 27 deletions python/pyiceberg/io/pyarrow.py
@@ -28,7 +28,6 @@
import os
from abc import ABC, abstractmethod
from functools import lru_cache, singledispatch
-from heapq import merge
from itertools import chain
from multiprocessing.pool import ThreadPool
from multiprocessing.sharedctypes import Synchronized
@@ -37,7 +36,6 @@
    Any,
    Callable,
    Dict,
-    Generator,
    Generic,
    Iterable,
    List,
@@ -50,6 +48,7 @@
)
from urllib.parse import urlparse

+import numpy as np
import pyarrow as pa
import pyarrow.compute as pc
import pyarrow.dataset as ds
@@ -529,26 +528,12 @@ def _read_deletes(fs: FileSystem, data_file: DataFile) -> Dict[str, pa.ChunkedArray]:
}


-def _create_positional_deletes_indices(positional_deletes: List[pa.ChunkedArray], fn_rows: Callable[[], int]) -> pa.Array:
-    # This is not ideal, looking for a native PyArrow implementation :)
-    # Ideally with uniqueness as well
-    # https://github.com/apache/arrow/issues/35748
-    sorted_deleted = merge(*positional_deletes, key=lambda e: e.as_py())
-
-    def generator() -> Generator[int, None, None]:
-        deleted_pos = next(sorted_deleted).as_py()  # type: ignore
-        for pos in range(fn_rows()):
-            if deleted_pos == pos:
-                while deleted_pos == pos:
-                    try:
-                        deleted_pos = next(sorted_deleted).as_py()  # type: ignore
-                    except StopIteration:
-                        deleted_pos = -1
-            else:
-                yield pos
-
-    # Filter on the positions
-    return pa.array(generator(), type=pa.int64())
+def _combine_positional_deletes(positional_deletes: List[pa.ChunkedArray], rows: int) -> pa.Array:
+    if len(positional_deletes) == 1:
+        all_chunks = positional_deletes[0]
+    else:
+        all_chunks = pa.chunked_array(chain(*[arr.chunks for arr in positional_deletes]))
+    return np.setdiff1d(np.arange(rows), all_chunks, assume_unique=False)


def pyarrow_to_schema(schema: pa.Schema) -> Schema:
@@ -782,10 +767,8 @@ def _task_to_table(
        )

        if positional_deletes:
-            # In the case of a mask, it is a bit awkward because we first
-            # need to go to a table to apply the bitwise mask, and then
-            # the table is warped into a dataset to apply the expression
-            indices = _create_positional_deletes_indices(positional_deletes, fragment.count_rows)
+            # Create the mask of indices that we're interested in
+            indices = _combine_positional_deletes(positional_deletes, fragment.count_rows())

            if limit:
                if pyarrow_filter is not None:
@@ -802,7 +785,6 @@ def _task_to_table(
                # Apply the user filter
                if pyarrow_filter is not None:
                    arrow_table = arrow_table.filter(pyarrow_filter)
-
        else:
            # If there are no deletes, we can just take the head
            # and the user-filter is already applied
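
For context, here is a small self-contained sketch (not part of the commit) of what the new _combine_positional_deletes computes: the positional-delete files carry row indices to drop, and np.setdiff1d yields the sorted, de-duplicated indices that remain, replacing the per-row Python generator removed above. The sample data and the final Table.take call are illustrative assumptions, not the library's code path.

from itertools import chain

import numpy as np
import pyarrow as pa

# Two positional-delete files as chunked arrays of row positions (overlapping on purpose).
positional_deletes = [pa.chunked_array([[1, 3]]), pa.chunked_array([[3, 7]])]
rows = 10

# Combine all chunks into a single chunked array, mirroring the new helper above.
all_chunks = pa.chunked_array(chain(*[arr.chunks for arr in positional_deletes]))

# Indices 0..rows-1 minus the deleted positions; duplicates across delete files are handled.
indices = np.setdiff1d(np.arange(rows), all_chunks, assume_unique=False)
print(indices)  # [0 2 4 5 6 8 9]

# One way such a mask can be applied to an Arrow table:
table = pa.table({"n": list(range(rows))})
print(table.take(indices)["n"].to_pylist())  # [0, 2, 4, 5, 6, 8, 9]
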
23 changes: 11 additions & 12 deletions python/pyiceberg/table/__init__.py
@@ -348,12 +348,13 @@ def _min_data_file_sequence_number(manifests: List[ManifestFile]) -> int:


def _match_deletes_to_datafile(data_entry: ManifestEntry, positional_delete_entries: SortedList[ManifestEntry]) -> Set[DataFile]:
"""This method will check if the delete file is relevant for the data file
by using the column metrics to see if the filename is in the lower and upper bound
"""This method will check if the delete file is relevant for the data file.
Using the column metrics to see if the filename is in the lower and upper bound.
Args:
data_entry (ManifestEntry): The manifest entry path of the datafile
positional_delete_entries (List[ManifestEntry]): All the candidate positional deletes manifest entries
data_entry (ManifestEntry): The manifest entry path of the datafile.
positional_delete_entries (List[ManifestEntry]): All the candidate positional deletes manifest entries.
Returns:
A set of files that are relevant for the data file.
@@ -406,15 +407,14 @@ def _build_partition_evaluator(self, spec_id: int) -> Callable[[DataFile], bool]:
        return lambda data_file: evaluator(data_file.partition)

    def _check_sequence_number(self, min_data_sequence_number: int, manifest: ManifestFile) -> bool:
-        """A helper function to make sure that no manifests are loaded that contain deletes
-        that are older than the data
+        """A helper function to make sure that no manifests are loaded that contain deletes that are older than the data.

        Args:
-            min_data_sequence_number (int): The minimal
-            manifest (ManifestFile): A ManifestFile that can be either data or deletes
+            min_data_sequence_number (int): The minimal sequence number.
+            manifest (ManifestFile): A ManifestFile that can be either data or deletes.

        Returns:
-            Boolean indicating if it is either a data file, or a relevant delete file
+            Boolean indicating if it is either a data file, or a relevant delete file.
        """
        return manifest.content == ManifestContent.DATA or (
            # Not interested in deletes that are older than the data
@@ -423,12 +423,11 @@ def _check_sequence_number(self, min_data_sequence_number: int, manifest: ManifestFile) -> bool:
        )

    def plan_files(self) -> Iterable[FileScanTask]:
-        """Plans the relevant files by filtering on the PartitionSpecs
+        """Plans the relevant files by filtering on the PartitionSpecs.

        Returns:
-            List of FileScanTasks that contain both data and delete files
+            List of FileScanTasks that contain both data and delete files.
        """
-
        snapshot = self.snapshot()
        if not snapshot:
            return iter([])
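
As an aside, the _match_deletes_to_datafile docstring above describes pruning delete files by the lower and upper bounds recorded for their file_path column; a toy sketch of that check (hypothetical names, not the library API) is:

def delete_may_apply(data_file_path: str, file_path_lower_bound: str, file_path_upper_bound: str) -> bool:
    # A positional-delete file records lower/upper bounds for its file_path column;
    # it can only reference rows of a data file whose path falls inside those bounds.
    return file_path_lower_bound <= data_file_path <= file_path_upper_bound
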
4 changes: 4 additions & 0 deletions python/pyproject.toml
@@ -243,5 +243,9 @@ ignore_missing_imports = true
module = "sortedcontainers.*"
ignore_missing_imports = true

+[[tool.mypy.overrides]]
+module = "numpy.*"
+ignore_missing_imports = true
+
[tool.coverage.run]
source = ['pyiceberg/']
