Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 22 additions & 8 deletions python/ray/data/_internal/pandas_block.py
Original file line number Diff line number Diff line change
Expand Up @@ -429,10 +429,17 @@ def iter_groups() -> Iterator[Tuple[KeyType, Block]]:
if next_row is None:
next_row = next(iter)
next_key = next_row[key]
if type(next_row[key]) is _pandas._libs.missing.NAType:
next_row = next(iter)
continue

while np.all(next_row[key] == next_key):
end += 1
try:
next_row = next(iter)
if type(next_row[key]) is _pandas._libs.missing.NAType:
next_row = next(iter)
continue
except StopIteration:
next_row = None
break
Expand Down Expand Up @@ -520,37 +527,44 @@ def aggregate_combined_blocks(

stats = BlockExecStats.builder()
keys = key if isinstance(key, list) else [key]

key_fn = (
(lambda r: tuple(r[r._row.columns[: len(keys)]]))
if key is not None
else (lambda r: (0,))
)

iter = heapq.merge(
*[
PandasBlockAccessor(block).iter_rows(public_row_format=False)
for block in blocks
],
iterables = [
filter(
lambda r: tuple(r[r._row.columns[: len(keys)]])[0] is not None,
PandasBlockAccessor(block).iter_rows(public_row_format=False),
)
for block in blocks
]

iters = heapq.merge(
*iterables,
key=key_fn,
)
next_row = None
builder = PandasBlockBuilder()
while True:
try:
if next_row is None:
next_row = next(iter)
next_row = next(iters)

next_keys = key_fn(next_row)
next_key_names = (
next_row._row.columns[: len(keys)] if key is not None else None
)

def gen():
nonlocal iter
nonlocal iters
nonlocal next_row
while key_fn(next_row) == next_keys:
yield next_row
try:
next_row = next(iter)
next_row = next(iters)
except StopIteration:
next_row = None
break
Expand Down
13 changes: 13 additions & 0 deletions python/ray/data/_internal/planner/exchange/sort_task_spec.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from typing import List, Tuple, TypeVar, Union

import numpy as np
import pandas as pd

from ray.data._internal.delegating_block_builder import DelegatingBlockBuilder
from ray.data._internal.planner.exchange.interfaces import ExchangeTaskSpec
Expand Down Expand Up @@ -105,6 +106,18 @@ def sample_boundaries(
samples = builder.build()

sample_dict = BlockAccessor.for_block(samples).to_numpy(columns=columns)

for k, v in sample_dict.items():
if v.dtype == object:
sample_dict[k] = np.array(
[
i
for i in v
if i is not None and type(i) is not pd._libs.missing.NAType
],
dtype=object,
)

# Compute sorted indices of the samples. In np.lexsort last key is the
# primary key hence have to reverse the order.
indices = np.lexsort(list(reversed(list(sample_dict.values()))))
Expand Down
6 changes: 3 additions & 3 deletions python/ray/data/_internal/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,9 @@
_LOCAL_SCHEME = "local"
_EXAMPLE_SCHEME = "example"


LazyModule = Union[None, bool, ModuleType]
_pyarrow_dataset: LazyModule = None


cached_cluster_resources = {}
cluster_resources_last_fetch_time = 0
CLUSTER_RESOURCES_FETCH_INTERVAL_SECONDS = 10
Expand Down Expand Up @@ -735,6 +733,7 @@ def find_partition_index(
return right
col_name = columns[i]
col_vals = table[col_name].to_numpy()[left:right]
col_vals = [x for x in col_vals if x is not None]
desired_val = desired[i]

prevleft = left
Expand Down Expand Up @@ -990,7 +989,8 @@ def call_with_retry(
# Retry with binary exponential backoff with random jitter.
backoff = min((2 ** (i + 1)) * random.random(), max_backoff_s)
logger.debug(
f"Retrying {i+1} attempts to {description} after {backoff} seconds."
f"Retrying {i + 1} attempts to {description} after "
f"{backoff} seconds."
)
time.sleep(backoff)
else:
Expand Down
19 changes: 19 additions & 0 deletions python/ray/data/tests/test_pandas.py
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,25 @@ def test_read_pandas_data_array_column(ray_start_regular_shared):
assert all(row["array"] == [1, 1, 1])


def test_ray_ds_unique(ray_start_regular_shared):
    """Dataset.unique() deduplicates the values of a column."""
    source = [1, 2, 3, 2, 3]
    dataset = ray.data.from_items(source)
    assert dataset.unique("item") == [1, 2, 3]


def test_ray_ds_unique_with_none(ray_start_regular_shared):
    """Dataset.unique() drops None entries while deduplicating."""
    # Mix duplicates and several None values across the sequence.
    source = [1, 2, 3, 2, 3, None, 4, 4, 5, 6, 7, 8, 9, 10, 11, 12, None, None]
    dataset = ray.data.from_items(source)
    assert dataset.unique("item") == list(range(1, 13))


def test_ray_ds_unique_with_none_from_pandas(ray_start_regular_shared):
    """Dataset.unique() drops the NA entry of a nullable Int64 pandas column.

    The nullable "Int64" dtype stores the None as ``pd.NA`` (``NAType``)
    rather than ``NaN``, exercising the NAType-filtering path added for
    sort/unique in this change.
    """
    df = pd.DataFrame({"col": [1, 2, 3, None]}, dtype="Int64")
    ds = ray.data.from_pandas(df)
    assert ds.unique("col") == [1, 2, 3]


if __name__ == "__main__":
import sys

Expand Down