[Data] Sorting on grouped data not working with pandas format #46748

@tanzeyy

What happened + What you expected to happen

I am handling grouped data and sorting on a particular column, and I want to use pandas operations to process each group. However, sort() does not seem to work after map_groups(..., batch_format='pandas').

Reproduction process:

import ray

data = [
    {"tag": "kitty", "number": 12},
    {"tag": "karen", "number": 1},
    {"tag": "karen", "number": 3},
    {"tag": "times", "number": 2},
]

ds = ray.data.from_items(data)

results = (
    ds.groupby("tag")
    .map_groups(lambda g: g, batch_format="pandas")
    .sort("number", descending=True)
    .to_pandas()
)
print(results)

Running the script above raises AttributeError: 'DataFrame' object has no attribute 'num_rows':

...
ray.exceptions.RayTaskError(AttributeError): ray::reduce() (pid=1919807, ip=10.208.49.99)
  File "/.../venvs/dev/lib/python3.10/site-packages/ray/data/_internal/planner/exchange/sort_task_spec.py", line 143, in reduce
    return BlockAccessor.for_block(mapper_outputs[0]).merge_sorted_blocks(
  File "/.../venvs/dev/lib/python3.10/site-packages/ray/data/_internal/arrow_block.py", line 554, in merge_sorted_blocks
    blocks = [b for b in blocks if b.num_rows > 0]
  File "/.../venvs/dev/lib/python3.10/site-packages/ray/data/_internal/arrow_block.py", line 554, in <listcomp>
    blocks = [b for b in blocks if b.num_rows > 0]
  File "/.../venvs/dev/lib/python3.10/site-packages/pandas/core/generic.py", line 6299, in __getattr__
    return object.__getattribute__(self, name)
AttributeError: 'DataFrame' object has no attribute 'num_rows'

After changing batch_format to 'pyarrow', the code above works properly.

Moreover, removing the sort also makes it work.
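
Because the pipeline succeeds without sort, another stopgap for results that fit in memory is to call to_pandas() first and sort the collected DataFrame with pandas. The sketch below shows only the pandas step. sort_collected is a hypothetical helper, and the collected frame is hand-written sample data standing in for the unsorted to_pandas() output.

```python
import pandas as pd


def sort_collected(df: pd.DataFrame, column: str, descending: bool = True) -> pd.DataFrame:
    """Sort an already-collected result in pandas (hypothetical helper)."""
    return (
        df.sort_values(column, ascending=not descending, kind="stable")
        .reset_index(drop=True)
    )


# Stand-in for the output of:
#   ds.groupby("tag").map_groups(lambda g: g, batch_format="pandas").to_pandas()
collected = pd.DataFrame(
    {
        "tag": ["karen", "karen", "kitty", "times"],
        "number": [1, 3, 12, 2],
    }
)

sorted_df = sort_collected(collected, "number", descending=True)
print(sorted_df)
```

This moves the sort out of Ray entirely, so it does not scale to datasets larger than driver memory.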

Versions / Dependencies

  • Python 3.10.12
  • ray 2.24.0

Reproduction script

import ray

data = [
    {"tag": "kitty", "number": 12},
    {"tag": "karen", "number": 1},
    {"tag": "karen", "number": 3},
    {"tag": "times", "number": 2},
]

ds = ray.data.from_items(data)

results = (
    ds.groupby("tag")
    .map_groups(lambda g: g, batch_format="pandas")
    .sort("number", descending=True)
    .to_pandas()
)
print(results)

Issue Severity

Medium: It is a significant difficulty but I can work around it.

Labels

  • P1: Issue that should be fixed within a few weeks
  • bug: Something that is supposed to be working; but isn't
  • data: Ray Data-related issues
  • good-first-issue: Great starter issue for someone just starting to contribute to Ray
