Skip to content
Merged
Show file tree
Hide file tree
Changes from 25 commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
0abbd50
Initial commit
bveeramani Nov 17, 2025
0340d58
Update scale
bveeramani Nov 17, 2025
28ebd4b
Remove comment
bveeramani Nov 17, 2025
9b935b4
Merge branch 'master' into fixed-size-map-groups
bveeramani Nov 17, 2025
3d79ecc
Increase scale
bveeramani Nov 17, 2025
f430e5e
Update file
bveeramani Nov 18, 2025
d9ba322
Merge remote-tracking branch 'upstream/master' into fixed-size-map-gr…
owenowenisme Nov 19, 2025
bc4ce3d
remove adjustment
owenowenisme Nov 19, 2025
7f223e8
update matrix
owenowenisme Nov 19, 2025
4af7aea
Merge remote-tracking branch 'upstream/master' into data/fix-sf-100-r…
owenowenisme Nov 21, 2025
bbb0586
debug log
owenowenisme Nov 21, 2025
b4c0472
quick fix
owenowenisme Nov 21, 2025
b23c251
update
owenowenisme Nov 22, 2025
f995052
update
owenowenisme Nov 23, 2025
5bc7889
restore logging info
owenowenisme Nov 26, 2025
74c60a3
Merge branch 'master' into data/fix-sf-100-release-test
owenowenisme Nov 26, 2025
8b1ecc7
update
owenowenisme Nov 27, 2025
35bcf3e
Merge branch 'master' into data/fix-sf-100-release-test
owenowenisme Nov 30, 2025
398d038
remove worker proportion calculation
owenowenisme Dec 3, 2025
514baa2
add ceil
owenowenisme Dec 3, 2025
4c531c6
Merge branch 'master' into data/fix-sf-100-release-test
owenowenisme Dec 3, 2025
11f563e
update release test yaml
owenowenisme Dec 3, 2025
d45fc6d
Merge branch 'master' into data/fix-sf-100-release-test
owenowenisme Dec 3, 2025
d74dad9
Merge branch 'master' into data/fix-sf-100-release-test
owenowenisme Dec 3, 2025
d2aa269
address comments
owenowenisme Dec 4, 2025
3f4a317
trigger test
owenowenisme Dec 4, 2025
ceeb3a7
update test
owenowenisme Dec 4, 2025
6951c51
make test size to medium
owenowenisme Dec 4, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 29 additions & 13 deletions python/ray/data/_internal/execution/operators/hash_shuffle.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@

import ray
from ray import ObjectRef
from ray._private.ray_constants import env_integer
from ray._private.ray_constants import (
env_integer,
)
from ray.actor import ActorHandle
from ray.data._internal.arrow_block import ArrowBlockBuilder
from ray.data._internal.arrow_ops.transform_pyarrow import (
Expand Down Expand Up @@ -1181,8 +1183,10 @@ def _get_aggregator_num_cpus(
# - No more than 4 CPUs per aggregator
#
cap = min(4.0, total_available_cluster_resources.cpu * 0.25 / num_aggregators)

target_num_cpus = min(cap, estimated_aggregator_memory_required / (4 * GiB))
target_num_cpus = min(
cap,
estimated_aggregator_memory_required / (4 * GiB),
)

# Round resource to 2 decimal places (for readability)
return round(target_num_cpus, 2)
Expand All @@ -1203,6 +1207,9 @@ def _gen_op_name(cls, num_partitions: int) -> str:


class HashShuffleOperator(HashShufflingOperatorBase):
# Add 30% buffer to account for data skew
SHUFFLE_AGGREGATOR_MEMORY_ESTIMATE_SKEW_FACTOR = 1.3

def __init__(
self,
input_op: PhysicalOperator,
Expand Down Expand Up @@ -1247,18 +1254,22 @@ def _estimate_aggregator_memory_allocation(
num_partitions: int,
estimated_dataset_bytes: int,
) -> int:
max_partitions_for_aggregator = math.ceil(
num_partitions / num_aggregators
) # Max number of partitions that a single aggregator might handle
partition_byte_size_estimate = math.ceil(
estimated_dataset_bytes / num_partitions
) # Estimated byte size of a single partition

# Inputs (object store) - memory for receiving shuffled partitions
aggregator_shuffle_object_store_memory_required = math.ceil(
partition_byte_size_estimate * max_partitions_for_aggregator
)

# Estimate of object store memory required to accommodate all partitions
# handled by a single aggregator
aggregator_shuffle_object_store_memory_required: int = math.ceil(
estimated_dataset_bytes / num_aggregators
# Output (object store) - memory for output partitions
output_object_store_memory_required = math.ceil(
partition_byte_size_estimate * max_partitions_for_aggregator
)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's preserve the structure of estimating individual components (shuffle, output, working memory)

# Estimate of memory required to accommodate single partition as an output
# (inside Object Store)
output_object_store_memory_required: int = partition_byte_size_estimate

aggregator_total_memory_required: int = (
# Inputs (object store)
Expand All @@ -1267,18 +1278,23 @@ def _estimate_aggregator_memory_allocation(
# Output (object store)
output_object_store_memory_required
)

total_with_skew = math.ceil(
aggregator_total_memory_required
* cls.SHUFFLE_AGGREGATOR_MEMORY_ESTIMATE_SKEW_FACTOR
)
logger.info(
f"Estimated memory requirement for shuffling aggregator "
f"(partitions={num_partitions}, "
f"aggregators={num_aggregators}, "
f"dataset (estimate)={estimated_dataset_bytes / GiB:.1f}GiB): "
f"shuffle={aggregator_shuffle_object_store_memory_required / MiB:.1f}MiB, "
f"output={output_object_store_memory_required / MiB:.1f}MiB, "
f"total={aggregator_total_memory_required / MiB:.1f}MiB, "
f"total_base={aggregator_total_memory_required / MiB:.1f}MiB, "
f"skew_factor={cls.SHUFFLE_AGGREGATOR_MEMORY_ESTIMATE_SKEW_FACTOR}, "
f"total_with_skew={total_with_skew / MiB:.1f}MiB"
)

return aggregator_total_memory_required
return total_with_skew


@dataclass
Expand Down
15 changes: 1 addition & 14 deletions release/release_data_tests.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -178,25 +178,12 @@

matrix:
setup:
scaling: [fixed_size]
scaling: [fixed_size, autoscaling]
shuffle_strategy: [sort_shuffle_pull_based, hash_shuffle]
columns:
- "column08 column13 column14" # 84 groups
- "column02 column14" # 7M groups

adjustments:
# Ray Data OOMs with hash shuffle on autoscaling clusters. So, only run
# autoscaling variants with sort shuffle. For more information, see
# https://github.com/ray-project/ray/issues/58734.
- with:
scaling: autoscaling
shuffle_strategy: sort_shuffle_pull_based
columns: "column08 column13 column14"
- with:
scaling: autoscaling
shuffle_strategy: sort_shuffle_pull_based
columns: "column02 column14"

cluster:
cluster_compute: "{{scaling}}_all_to_all_compute.yaml"

Expand Down