Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 12 additions & 9 deletions dask_sql/physical/rel/logical/limit.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
from dask_sql.datacontainer import DataContainer
from dask_sql.physical.rel.base import BaseRelPlugin
from dask_sql.physical.rex import RexConverter
from dask_sql.physical.utils.map import map_on_partition_index

if TYPE_CHECKING:
import dask_sql
Expand Down Expand Up @@ -80,17 +79,17 @@ def _apply_offset(self, df: dd.DataFrame, offset: int, end: int) -> dd.DataFrame
# its pandas representation at this point and we can calculate the cumsum
# (which is not possible on the dask object). Recalculating it should not cost
# us much, as we assume the number of partitions is rather small.
def select_from_to(df, partition_index, partition_borders):
partition_borders = partition_borders.cumsum().to_dict()
def select_from_to(index, partition, partition_borders):
partition_borders = partition_borders.compute().cumsum().to_dict()
Copy link
Collaborator Author

@charlesbluca charlesbluca May 10, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One difference between from_map and from_delayed: with from_delayed, Dask objects passed as arguments are already computed by the time the function is called, whereas with from_map we need to compute them manually - either before calling from_map or inside the function that from_map calls (which is what I opted for here).

Is this an intended difference for from_map?

this_partition_border_left = (
partition_borders[partition_index - 1] if partition_index > 0 else 0
partition_borders[index - 1] if index > 0 else 0
)
this_partition_border_right = partition_borders[partition_index]
this_partition_border_right = partition_borders[index]

if (end and end < this_partition_border_left) or (
offset and offset >= this_partition_border_right
):
return df.iloc[0:0]
return partition.compute().iloc[0:0]

from_index = max(offset - this_partition_border_left, 0) if offset else 0
to_index = (
Expand All @@ -99,10 +98,14 @@ def select_from_to(df, partition_index, partition_borders):
else this_partition_border_right
) - this_partition_border_left

return df.iloc[from_index:to_index]
return partition.compute().iloc[from_index:to_index]

# (b) Now we just need to apply the function on every partition
# We do this via the delayed interface, which seems the easiest one.
return map_on_partition_index(
df, select_from_to, partition_borders, meta=df._meta
return dd.from_map(
select_from_to,
list(range(df.npartitions)),
list(df.partitions),
args=(partition_borders,),
meta=df._meta,
)
17 changes: 0 additions & 17 deletions dask_sql/physical/utils/map.py

This file was deleted.