
Conversation

@owenowenisme
Member

@owenowenisme owenowenisme commented Nov 19, 2025

Description

Fix aggregator actor OOMs for SF‑100 map_groups on an autoscaling cluster.

Main problems:

  • Even with “scheduling_strategy”: “SPREAD”, autoscaling causes greedy placement on the same nodes (node fills → autoscale → schedule on new node → autoscale), so actors cluster on the first few nodes.
  • Available memory calculations didn’t exclude the Object Store, a known issue ([Core] Actor/Task Memory resource allocation should be separate from Object Store allocation  #49344).
  • Aggregator resource miscalculation: we observed ≈1.7 GiB spikes, but only allocated 1.13 GiB per actor. Previously, aggregator memory was computed as dataset_size/num_aggregators + dataset_size/num_partitions (input + output).
    This is incorrect because partition size is determined by num_partitions, not num_aggregators, and some aggregators receive multiple partitions via partition_id % num_aggregators, leading to under‑allocation. For SF‑100, the dataset size is ≈82.1 GiB.
    The old formula gives (82.1/128) + (82.1/200) ≈ 1.13 GiB. For an aggregator receiving 2 partitions, the max memory should be (82.1/200 per‑partition) × 2 (input + output) × 2 (ceil(num_partitions/num_aggregators)) ≈ 1.76 GiB, matching the observed ≈1.7 GiB peak.
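
The difference between the two estimates can be sketched as follows. This is a toy illustration with round numbers (a 100 GiB dataset), not the actual Ray Data implementation or the SF-100 values; all variable names are illustrative:

```python
import math

GiB = 1024 ** 3

# Toy inputs: 100 GiB dataset, 200 partitions, 128 aggregator actors.
dataset_size = 100 * GiB
num_partitions = 200
num_aggregators = 128

# Old (under-allocating) estimate: per-aggregator input share plus a single
# output partition, i.e. dataset_size/num_aggregators + dataset_size/num_partitions.
old_estimate = dataset_size / num_aggregators + dataset_size / num_partitions

# New estimate: an aggregator can receive up to
# ceil(num_partitions / num_aggregators) partitions, each held twice
# (input + output).
partition_size = dataset_size / num_partitions
max_partitions = math.ceil(num_partitions / num_aggregators)
new_estimate = partition_size * 2 * max_partitions

print(f"old ≈ {old_estimate / GiB:.2f} GiB, new ≈ {new_estimate / GiB:.2f} GiB")
# old ≈ 1.28 GiB, new ≈ 2.00 GiB
```

With these toy numbers the old formula undershoots by roughly a third, mirroring the 1.13 GiB vs 1.76 GiB gap described above.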

This fails only on autoscaling clusters because the resource budget is tighter than on fixed‑size clusters (≈18 nodes vs. 32 nodes), and the aggregators aren't well spread.

Job URL: https://console.anyscale-staging.com/o/anyscale-internal/jobs/prodjob_z2pll84knil1iexjhwfcmislan

Fixes

  • Correct the aggregator memory calculation from dataset_size/num_aggregators + dataset_size/num_partitions to per‑partition size × 2 (input and output) × ceil(num_partitions/num_aggregators); for SF‑100 this is (82.1/200) × 2 × 2 ≈ 1.76 GiB.
    - Compute CPU from the real available memory, excluding the Object Store proportion. (This may not be the best way, but it's what we can do right now.)

  • Manually exclude Object Store Memory proportion when calculating target_num_cpus for aggregators

        worker_heap_memory_proportion = 1 - DEFAULT_OBJECT_STORE_MEMORY_PROPORTION
        target_num_cpus = min(
            cap,
            estimated_aggregator_memory_required
            / (4 * GiB * worker_heap_memory_proportion),
        )
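
A runnable sketch of this CPU sizing follows. It assumes DEFAULT_OBJECT_STORE_MEMORY_PROPORTION is Ray's default Object Store share of 0.3 and takes the 4 GiB-per-CPU heap budget from the snippet above; the cap value here is made up for illustration:

```python
GiB = 1024 ** 3
# Assumption: Ray reserves 30% of node memory for the Object Store by default.
DEFAULT_OBJECT_STORE_MEMORY_PROPORTION = 0.3

def target_num_cpus(estimated_aggregator_memory_required: float, cap: float) -> float:
    # Only the non-Object-Store share of the 4 GiB-per-CPU budget is usable heap.
    worker_heap_memory_proportion = 1 - DEFAULT_OBJECT_STORE_MEMORY_PROPORTION
    return min(
        cap,
        estimated_aggregator_memory_required
        / (4 * GiB * worker_heap_memory_proportion),
    )

# With the corrected ~1.76 GiB estimate: 1.76 / (4 * 0.7) ~= 0.63 CPUs.
print(round(target_num_cpus(1.76 * GiB, cap=1.0), 2))
```

Without the Object Store exclusion the same 1.76 GiB estimate would map to only 1.76 / 4 ≈ 0.44 CPUs, i.e. a smaller per-actor heap reservation.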

Related issues

Closes #58734

bveeramani and others added 7 commits November 17, 2025 13:25
@owenowenisme owenowenisme force-pushed the data/fix-sf-100-release-test branch from b30adf5 to d9ba322 Compare November 19, 2025 09:14
Signed-off-by: You-Cheng Lin <[email protected]>
Signed-off-by: You-Cheng Lin <[email protected]>
Signed-off-by: You-Cheng Lin <[email protected]>
Signed-off-by: You-Cheng Lin <[email protected]>
Signed-off-by: You-Cheng Lin <[email protected]>
@owenowenisme owenowenisme added go add ONLY when ready to merge, run all tests data Ray Data-related issues release-test release test labels Nov 22, 2025
Signed-off-by: You-Cheng Lin <[email protected]>
@owenowenisme owenowenisme marked this pull request as ready for review November 23, 2025 16:31
@owenowenisme owenowenisme requested a review from a team as a code owner November 23, 2025 16:31
Comment on lines 1267 to 1277
f"shuffle={aggregator_shuffle_object_store_memory_required / MiB:.1f}MiB, "
f"output={output_object_store_memory_required / MiB:.1f}MiB, "
Contributor

Let's keep these line-items separate as they help estimate individual components

Member Author

Not sure what this means, but I restored the log line to show the shuffle and output memory components.

# Output (object store)
output_object_store_memory_required = (
    max_partitions_for_aggregator * partition_byte_size_estimate * 2
)
Contributor

Let's preserve the structure of estimating individual components (shuffle, output, working memory)

Comment on lines 1246 to 1269
# Estimate of object store memory required to accommodate all partitions
# handled by a single aggregator
aggregator_shuffle_object_store_memory_required: int = math.ceil(
    estimated_dataset_bytes / num_aggregators
)
# Estimate of memory required to accommodate a single partition as an output
# (inside the Object Store)
output_object_store_memory_required: int = partition_byte_size_estimate

aggregator_total_memory_required: int = (
    # Inputs (object store)
    aggregator_shuffle_object_store_memory_required
    +
    # Output (object store)
    output_object_store_memory_required
)
Contributor

What I meant is the following:

  • We still want to estimate total memory required based on corresponding components (shuffle, output, etc)
  • Core change you're making is moving from dataset_size / num_aggregators to partition_byte_size * max_partitions, which is just a rounded version

Let's keep the overall structure and just make the actual change you want to make by updating shuffle component estimate.

BTW, we can add a skew factor (say, 30% extra as padding against skew)
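
Combined with the later suggestion to make the padding a patchable class variable, the reviewer's idea could look roughly like this. This is a hypothetical helper class, not the actual HashShuffleOperator code; the names are illustrative:

```python
import math

class AggregatorMemoryEstimator:
    # Class var (rather than a local constant) so tests can patch the padding.
    SHUFFLE_AGGREGATOR_MEMORY_ESTIMATE_SKEW_FACTOR = 1.3  # 30% padding for skew

    @classmethod
    def estimate(cls, partition_byte_size: int, num_partitions: int,
                 num_aggregators: int) -> int:
        # An aggregator handles up to ceil(num_partitions / num_aggregators)
        # partitions, each counted twice (input + output).
        max_partitions = math.ceil(num_partitions / num_aggregators)
        base = partition_byte_size * max_partitions * 2
        return math.ceil(base * cls.SHUFFLE_AGGREGATOR_MEMORY_ESTIMATE_SKEW_FACTOR)
```

Patching the padding in a test then becomes a one-liner, e.g. setting `AggregatorMemoryEstimator.SHUFFLE_AGGREGATOR_MEMORY_ESTIMATE_SKEW_FACTOR = 1.0` via monkeypatch.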

Signed-off-by: You-Cheng Lin <[email protected]>
Signed-off-by: You-Cheng Lin <[email protected]>
@owenowenisme owenowenisme force-pushed the data/fix-sf-100-release-test branch from b082697 to 11f563e Compare December 3, 2025 06:28
Comment on lines 1261 to 1262
# Add 30% buffer to account for data skew
SKEW_FACTOR = 1.3
Contributor

Previous comment got lost -- make this a class var so that we can patch it if needed

) # Estimated byte size of a single partition

# Add 30% buffer to account for data skew
SKEW_FACTOR = 1.3
Contributor

Suggested change
SKEW_FACTOR = 1.3
SHUFFLE_AGGREGATOR_MEMORY_ESTIMATE_SKEW_FACTOR = 1.3

Signed-off-by: You-Cheng Lin <[email protected]>
Signed-off-by: You-Cheng Lin <[email protected]>
@cursor cursor bot left a comment

Bug: Test expected values not updated for new memory formula

The test_hash_shuffle_operator_remote_args test cases have expected memory values that match the old memory calculation formula, but HashShuffleOperator._estimate_aggregator_memory_allocation was changed to use a new formula. The new formula multiplies partition_byte_size * max_partitions_for_aggregator for both input and output (plus a 1.3 skew factor), resulting in significantly larger memory values. For example, Case 1 expects 671088640 but the new formula produces approximately 1,395,864,372. These tests will fail until the expected values are updated.

python/ray/data/tests/test_hash_shuffle.py#L392-L393

"num_cpus": 0.16,
"memory": 671088640,

python/ray/data/tests/test_hash_shuffle.py#L440-L441

"num_cpus": 0.16, # ~0.6Gb / 4Gb = ~0.16
"memory": 687865856,
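
As a rough sanity check of the bot's arithmetic: the 512 MiB partition size and single partition per aggregator below are guesses about the test fixture, not values from the source, but they do reproduce the figure the bot reports:

```python
import math

MiB = 1024 ** 2
SKEW_FACTOR = 1.3

# Guessed fixture: 512 MiB per partition, one partition per aggregator.
partition_byte_size = 512 * MiB
max_partitions_for_aggregator = 1

# New formula: partition size x partitions x 2 (input + output) x skew padding.
new_memory = math.ceil(
    partition_byte_size * max_partitions_for_aggregator * 2 * SKEW_FACTOR
)
print(new_memory)  # 1395864372, the value the bot says the new formula produces
```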



Signed-off-by: You-Cheng Lin <[email protected]>
Signed-off-by: You-Cheng Lin <[email protected]>
@alexeykudinkin alexeykudinkin merged commit 8437df4 into ray-project:master Dec 4, 2025
6 checks passed
bveeramani added a commit that referenced this pull request Dec 8, 2025
… lint (#59247)

Our CI uses Bazel to run Python tests (not pytest). If we don't call
`pytest.main` in a test module, the tests aren't actually run.

#58816 fixed this issue for
`test_hash_shuffle`. As a follow up, this PR removes the
`test_hash_shuffle` from the list of files excluded from the
`pytest.main` lint.

Signed-off-by: Balaji Veeramani <[email protected]>

Labels

data Ray Data-related issues go add ONLY when ready to merge, run all tests release-test release test

Development

Successfully merging this pull request may close these issues.

[Data] map_groups release test fails with scale-factor 100 on autoscaling cluster

3 participants