diff --git a/python/ray/data/_internal/pandas_block.py b/python/ray/data/_internal/pandas_block.py index 5bbaff67bb75..03c26781dd1c 100644 --- a/python/ray/data/_internal/pandas_block.py +++ b/python/ray/data/_internal/pandas_block.py @@ -429,10 +429,17 @@ def iter_groups() -> Iterator[Tuple[KeyType, Block]]: if next_row is None: next_row = next(iter) next_key = next_row[key] + if type(next_row[key]) is _pandas._libs.missing.NAType: + next_row = next(iter) + continue + while np.all(next_row[key] == next_key): end += 1 try: next_row = next(iter) + if type(next_row[key]) is _pandas._libs.missing.NAType: + next_row = next(iter) + continue except StopIteration: next_row = None break @@ -520,17 +527,23 @@ def aggregate_combined_blocks( stats = BlockExecStats.builder() keys = key if isinstance(key, list) else [key] + key_fn = ( (lambda r: tuple(r[r._row.columns[: len(keys)]])) if key is not None else (lambda r: (0,)) ) - iter = heapq.merge( - *[ - PandasBlockAccessor(block).iter_rows(public_row_format=False) - for block in blocks - ], + iterables = [ + filter( + lambda r: tuple(r[r._row.columns[: len(keys)]])[0] is not None, + PandasBlockAccessor(block).iter_rows(public_row_format=False), + ) + for block in blocks + ] + + iters = heapq.merge( + *iterables, key=key_fn, ) next_row = None @@ -538,19 +551,20 @@ def aggregate_combined_blocks( while True: try: if next_row is None: - next_row = next(iter) + next_row = next(iters) + next_keys = key_fn(next_row) next_key_names = ( next_row._row.columns[: len(keys)] if key is not None else None ) def gen(): - nonlocal iter + nonlocal iters nonlocal next_row while key_fn(next_row) == next_keys: yield next_row try: - next_row = next(iter) + next_row = next(iters) except StopIteration: next_row = None break diff --git a/python/ray/data/_internal/planner/exchange/sort_task_spec.py b/python/ray/data/_internal/planner/exchange/sort_task_spec.py index 0a2910f2fa34..5124870cc490 100644 --- a/python/ray/data/_internal/planner/exchange/sort_task_spec.py +++ b/python/ray/data/_internal/planner/exchange/sort_task_spec.py @@ -1,6 +1,7 @@ from typing import List, Tuple, TypeVar, Union import numpy as np +import pandas as pd from ray.data._internal.delegating_block_builder import DelegatingBlockBuilder from ray.data._internal.planner.exchange.interfaces import ExchangeTaskSpec @@ -105,6 +106,18 @@ def sample_boundaries( samples = builder.build() sample_dict = BlockAccessor.for_block(samples).to_numpy(columns=columns) + + for k, v in sample_dict.items(): + if v.dtype == object: + sample_dict[k] = np.array( + [ + i + for i in v + if i is not None and type(i) is not pd._libs.missing.NAType + ], + dtype=object, + ) + # Compute sorted indices of the samples. In np.lexsort last key is the # primary key hence have to reverse the order. indices = np.lexsort(list(reversed(list(sample_dict.values())))) diff --git a/python/ray/data/_internal/util.py b/python/ray/data/_internal/util.py index e07cbc238ec2..e0b4384a904f 100644 --- a/python/ray/data/_internal/util.py +++ b/python/ray/data/_internal/util.py @@ -51,11 +51,9 @@ _LOCAL_SCHEME = "local" _EXAMPLE_SCHEME = "example" - LazyModule = Union[None, bool, ModuleType] _pyarrow_dataset: LazyModule = None - cached_cluster_resources = {} cluster_resources_last_fetch_time = 0 CLUSTER_RESOURCES_FETCH_INTERVAL_SECONDS = 10 @@ -735,6 +733,7 @@ def find_partition_index( return right col_name = columns[i] col_vals = table[col_name].to_numpy()[left:right] + col_vals = [x for x in col_vals if x is not None] desired_val = desired[i] prevleft = left @@ -990,7 +989,8 @@ def call_with_retry( # Retry with binary expoential backoff with random jitter. backoff = min((2 ** (i + 1)) * random.random(), max_backoff_s) logger.debug( - f"Retrying {i+1} attempts to {description} after {backoff} seconds." + f"Retrying {i + 1} attempts to {description} after " + f"{backoff} seconds." ) time.sleep(backoff) else: diff --git a/python/ray/data/tests/test_pandas.py b/python/ray/data/tests/test_pandas.py index ec27c296a296..8e1205fb88d4 100644 --- a/python/ray/data/tests/test_pandas.py +++ b/python/ray/data/tests/test_pandas.py @@ -180,6 +180,25 @@ def test_read_pandas_data_array_column(ray_start_regular_shared): assert all(row["array"] == [1, 1, 1]) +def test_ray_ds_unique(ray_start_regular_shared): + items = [1, 2, 3, 2, 3] + ds = ray.data.from_items(items) + assert ds.unique("item") == [1, 2, 3] + + +def test_ray_ds_unique_with_none(ray_start_regular_shared): + items = [1, 2, 3, 2, 3, None, 4, 4, 5, 6, 7, 8, 9, 10, 11, 12, None, None] + ds = ray.data.from_items(items) + assert ds.unique("item") == [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12] + + +def test_ray_ds_unique_with_none_from_pandas(ray_start_regular_shared): + df = pd.DataFrame({"col": [1, 2, 3, None]}, dtype="Int64") + ds = ray.data.from_pandas(df) + # ds.unique("col") + assert ds.unique("col") == [1, 2, 3] + + if __name__ == "__main__": import sys