[Data] map-groupby-aggregate seems to hang indefinitely on scheduler #57979

@HenryL27

Description

What happened + What you expected to happen

I was writing some clustering code with Ray (pared-down repro script below), and Ray hangs on it.

When I run this on Ray 2.50 (single-node laptop cluster), it seems to hang forever with messages like:

(autoscaler +25s) Warning: The following resource request cannot be scheduled right now: {'CPU': 0.25, 'memory': 1073741824.0}. This is likely due to all cluster resources being claimed by actors. Consider creating fewer actors or adding more nodes to this Ray cluster

That seems like an awful lot of memory to request. I also get this plan log output:

- HashAggregate(key_columns=('cluster',), num_partitions=200): Tasks: 15 [backpressured:tasks]; Actors: 0; Queued blocks: 67; Resources: 10.0 CPU, 0.0B object store: : 0.00 row [01:05, ? row/s]
  *- Shuffle: 100%|██████████| 118/118 [01:05<00:00, 1.80 row/s]
  *- Aggregation: : 0.00 row [01:05, ? row/s]

It seems like 200 is the wrong number of partitions to default to. If each partition's HashAggregate task is requesting 1 GiB of memory (as in the warning above), 200 partitions add up to roughly 200 GiB, which would explain why nothing can be scheduled on a 16 GB laptop!

The result is that the job hangs forever, asking me to add more nodes to the cluster (but I'm just running on my laptop).

Notes: this doesn't seem to be an issue in 2.47. It's also fine if I remove the map and instead set the cluster value when constructing the rows, or if I set num_partitions in the groupby to something reasonable myself (see the sketch after the reproduction script below).

Versions / Dependencies

ray: 2.50.0
python: 3.12.0 (CPython)
os: macOS 26.0.1 (M2 Pro, 16 GB RAM)

Reproduction script

import numpy as np
from ray.data import from_items
from ray.data.aggregate import AggregateFn
import random


# Assign each row to a random cluster (stand-in for the real assignment step).
def pc(row):
    row["cluster"] = random.randint(0, 10)
    return row


# Per-cluster aggregation: sum the vectors and count the rows.
update_centroids = AggregateFn(
    init=lambda v: {"v": [0] * 10, "c": 0},
    accumulate_row=lambda a, row: {"v": [x + y for x, y in zip(a["v"], row["vector"])], "c": a["c"] + 1},
    merge=lambda a1, a2: {"v": [x + y for x, y in zip(a1["v"], a2["v"])], "c": a1["c"] + a2["c"]},
    name="centroids",
)


# 200 rows of random 10-dimensional vectors with an initial cluster assignment.
items = [{"vector": np.random.uniform(0, 1, (10)), "cluster": random.randint(0, 10)} for _ in range(200)]
ds = from_items(items)

initial_centroids = [i["vector"] for i in items[:10]]  # leftover from the full clustering code; unused in this repro

# Hangs here on 2.50: the HashAggregate defaults to num_partitions=200.
ds.map(pc).groupby("cluster").aggregate(update_centroids).take()
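For reference, a minimal sketch of the two workarounds mentioned in the notes above, reusing the ds, pc, and update_centroids defined in the repro script (the num_partitions value of 16 is an arbitrary choice, not a recommendation):

# Workaround 1: pass an explicit num_partitions to the groupby instead of letting it default to 200.
ds.map(pc).groupby("cluster", num_partitions=16).aggregate(update_centroids).take()

# Workaround 2: drop the map and rely on the "cluster" value already set when the rows were constructed.
ds.groupby("cluster").aggregate(update_centroids).take()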

Issue Severity

High: It blocks me from completing my task.
