Merged
28 commits
0abbd50
Initial commit
bveeramani Nov 17, 2025
0340d58
Update scale
bveeramani Nov 17, 2025
28ebd4b
Remove comment
bveeramani Nov 17, 2025
9b935b4
Merge branch 'master' into fixed-size-map-groups
bveeramani Nov 17, 2025
3d79ecc
Increase scale
bveeramani Nov 17, 2025
f430e5e
Update file'
bveeramani Nov 18, 2025
d9ba322
Merge remote-tracking branch 'upstream/master' into fixed-size-map-gr…
owenowenisme Nov 19, 2025
bc4ce3d
remove adjustment
owenowenisme Nov 19, 2025
7f223e8
update matrix
owenowenisme Nov 19, 2025
4af7aea
Merge remote-tracking branch 'upstream/master' into data/fix-sf-100-r…
owenowenisme Nov 21, 2025
bbb0586
debug log
owenowenisme Nov 21, 2025
b4c0472
quick fix
owenowenisme Nov 21, 2025
b23c251
update
owenowenisme Nov 22, 2025
f995052
update
owenowenisme Nov 23, 2025
5bc7889
restore logging info
owenowenisme Nov 26, 2025
74c60a3
Merge branch 'master' into data/fix-sf-100-release-test
owenowenisme Nov 26, 2025
8b1ecc7
update
owenowenisme Nov 27, 2025
35bcf3e
Merge branch 'master' into data/fix-sf-100-release-test
owenowenisme Nov 30, 2025
398d038
remove worker proportion calculation
owenowenisme Dec 3, 2025
514baa2
add ceil
owenowenisme Dec 3, 2025
4c531c6
Merge branch 'master' into data/fix-sf-100-release-test
owenowenisme Dec 3, 2025
11f563e
update release test yaml
owenowenisme Dec 3, 2025
d45fc6d
Merge branch 'master' into data/fix-sf-100-release-test
owenowenisme Dec 3, 2025
d74dad9
Merge branch 'master' into data/fix-sf-100-release-test
owenowenisme Dec 3, 2025
d2aa269
address comments
owenowenisme Dec 4, 2025
3f4a317
trigger test
owenowenisme Dec 4, 2025
ceeb3a7
update test
owenowenisme Dec 4, 2025
6951c51
make test size to medium
owenowenisme Dec 4, 2025
2 changes: 1 addition & 1 deletion python/ray/data/BUILD.bazel
@@ -624,7 +624,7 @@ py_test(

py_test(
name = "test_hash_shuffle",
size = "small",
size = "medium",
srcs = ["tests/test_hash_shuffle.py"],
tags = [
"exclusive",
42 changes: 29 additions & 13 deletions python/ray/data/_internal/execution/operators/hash_shuffle.py
@@ -27,7 +27,9 @@

import ray
from ray import ObjectRef
from ray._private.ray_constants import env_integer
from ray._private.ray_constants import (
env_integer,
)
from ray.actor import ActorHandle
from ray.data._internal.arrow_block import ArrowBlockBuilder
from ray.data._internal.arrow_ops.transform_pyarrow import (
@@ -1181,8 +1183,10 @@ def _get_aggregator_num_cpus(
# - No more than 4 CPUs per aggregator
#
cap = min(4.0, total_available_cluster_resources.cpu * 0.25 / num_aggregators)

target_num_cpus = min(cap, estimated_aggregator_memory_required / (4 * GiB))
target_num_cpus = min(
cap,
estimated_aggregator_memory_required / (4 * GiB),
)

# Round resource to 2 decimal places (for readability)
return round(target_num_cpus, 2)
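For a concrete sense of the request this produces, here is a minimal standalone sketch of the same cap-and-target rule; the helper name and the sample figures are illustrative, not part of the operator's API:

```python
import math

GiB = 1024 ** 3


def estimate_aggregator_num_cpus(total_cluster_cpus: float,
                                 num_aggregators: int,
                                 estimated_aggregator_memory_required: int) -> float:
    # Hypothetical standalone version of the calculation in the diff:
    # never request more than 4 CPUs, nor more than the aggregator's share
    # of 25% of the cluster's CPUs.
    cap = min(4.0, total_cluster_cpus * 0.25 / num_aggregators)
    # Scale the request with memory: roughly 1 CPU per 4 GiB of estimated memory.
    target_num_cpus = min(cap, estimated_aggregator_memory_required / (4 * GiB))
    # Round to 2 decimal places for readability.
    return round(target_num_cpus, 2)


# Example: 4 cluster CPUs, 4 aggregators, ~1.3 GiB estimated per aggregator.
print(estimate_aggregator_num_cpus(4.0, 4, 1395864372))  # -> 0.25
```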
@@ -1203,6 +1207,9 @@ def _gen_op_name(cls, num_partitions: int) -> str:


class HashShuffleOperator(HashShufflingOperatorBase):
# Add 30% buffer to account for data skew
SHUFFLE_AGGREGATOR_MEMORY_ESTIMATE_SKEW_FACTOR = 1.3

def __init__(
self,
input_op: PhysicalOperator,
@@ -1247,18 +1254,22 @@ def _estimate_aggregator_memory_allocation(
num_partitions: int,
estimated_dataset_bytes: int,
) -> int:
max_partitions_for_aggregator = math.ceil(
num_partitions / num_aggregators
) # Max number of partitions that a single aggregator might handle
partition_byte_size_estimate = math.ceil(
estimated_dataset_bytes / num_partitions
) # Estimated byte size of a single partition

# Inputs (object store) - memory for receiving shuffled partitions
aggregator_shuffle_object_store_memory_required = math.ceil(
partition_byte_size_estimate * max_partitions_for_aggregator
)

# Estimate of object store memory required to accommodate all partitions
# handled by a single aggregator
aggregator_shuffle_object_store_memory_required: int = math.ceil(
estimated_dataset_bytes / num_aggregators
# Output (object store) - memory for output partitions
output_object_store_memory_required = math.ceil(
partition_byte_size_estimate * max_partitions_for_aggregator
)
# Estimate of memory required to accommodate single partition as an output
# (inside Object Store)
output_object_store_memory_required: int = partition_byte_size_estimate

aggregator_total_memory_required: int = (
# Inputs (object store)
@@ -1267,18 +1278,23 @@
# Output (object store)
output_object_store_memory_required
)

total_with_skew = math.ceil(
aggregator_total_memory_required
* cls.SHUFFLE_AGGREGATOR_MEMORY_ESTIMATE_SKEW_FACTOR
)
logger.info(
f"Estimated memory requirement for shuffling aggregator "
f"(partitions={num_partitions}, "
f"aggregators={num_aggregators}, "
f"dataset (estimate)={estimated_dataset_bytes / GiB:.1f}GiB): "
f"shuffle={aggregator_shuffle_object_store_memory_required / MiB:.1f}MiB, "
f"output={output_object_store_memory_required / MiB:.1f}MiB, "
f"total={aggregator_total_memory_required / MiB:.1f}MiB, "
f"total_base={aggregator_total_memory_required / MiB:.1f}MiB, "
f"shuffle_aggregator_memory_estimate_skew_factor={cls.SHUFFLE_AGGREGATOR_MEMORY_ESTIMATE_SKEW_FACTOR}, "
f"total_with_skew={total_with_skew / MiB:.1f}MiB"
)

return aggregator_total_memory_required
return total_with_skew


@dataclass
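A self-contained restatement of the revised estimate, useful for checking the expected values in the tests below; the function is an illustrative sketch that mirrors the diff, not the operator's actual classmethod:

```python
import math

GiB = 1024 ** 3
SKEW_FACTOR = 1.3  # mirrors SHUFFLE_AGGREGATOR_MEMORY_ESTIMATE_SKEW_FACTOR


def estimate_aggregator_memory(num_aggregators: int,
                               num_partitions: int,
                               estimated_dataset_bytes: int) -> int:
    # Max number of partitions that a single aggregator might handle.
    max_partitions_for_aggregator = math.ceil(num_partitions / num_aggregators)
    # Estimated byte size of a single partition.
    partition_byte_size_estimate = math.ceil(estimated_dataset_bytes / num_partitions)
    # Inputs (object store): memory for receiving shuffled partitions.
    shuffle_memory = math.ceil(partition_byte_size_estimate * max_partitions_for_aggregator)
    # Output (object store): the same amount again for output partitions.
    output_memory = math.ceil(partition_byte_size_estimate * max_partitions_for_aggregator)
    # Add a 30% buffer to account for data skew.
    return math.ceil((shuffle_memory + output_memory) * SKEW_FACTOR)


# Example: 2 GiB dataset, 16 partitions, 4 aggregators.
print(estimate_aggregator_memory(4, 16, 2 * GiB))  # -> 1395864372
```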
88 changes: 73 additions & 15 deletions python/ray/data/tests/test_hash_shuffle.py
@@ -149,6 +149,13 @@ class JoinTestCase:
},
),
# Case 7: No dataset size estimates available (fallback to default memory request)
# Memory calculation (fallback):
# max_mem_per_agg = 32 GiB / 32 = 1 GiB
# modest_mem = 1 GiB / 2 = 512 MiB
# memory = min(512 MiB, DEFAULT_1GiB) = 512 MiB = 536870912
# CPU calculation:
# cap = min(4.0, 32.0 * 0.25 / 32) = 0.25
# target = min(0.25, 536870912 / 4 GiB) = 0.12
JoinTestCase(
left_size_bytes=None,
right_size_bytes=None,
@@ -160,9 +167,8 @@ class JoinTestCase:
expected_num_aggregators=32, # min(200, min(1000, 128 (default max))
expected_ray_remote_args={
"max_concurrency": 7, # ceil(200 / 32)
"num_cpus": 0.25, # 32 * 25% / 32
# Default fallback of 2Gb
"memory": 1073741824,
"num_cpus": 0.12,
"memory": 536870912,
"scheduling_strategy": "SPREAD",
"allow_out_of_order_execution": True,
},
@@ -313,6 +319,13 @@ class HashOperatorTestCase:
},
),
# Case 6: No dataset size estimate inferred (fallback to default memory request)
# Memory calculation (fallback):
# max_mem_per_agg = 32 GiB / 32 = 1 GiB
# modest_mem = 1 GiB / 2 = 512 MiB
# memory = min(512 MiB, DEFAULT_1GiB) = 512 MiB = 536870912
# CPU calculation:
# cap = min(4.0, 32.0 * 0.25 / 32) = 0.25
# target = min(0.25, 536870912 / 4 GiB) = 0.12
HashOperatorTestCase(
input_size_bytes=None,
input_num_blocks=None,
@@ -322,8 +335,8 @@ class HashOperatorTestCase:
expected_num_aggregators=32,
expected_ray_remote_args={
"max_concurrency": 7,
"num_cpus": 0.25,
"memory": 1073741824,
"num_cpus": 0.12,
"memory": 536870912,
"scheduling_strategy": "SPREAD",
"allow_out_of_order_execution": True,
},
@@ -380,6 +393,14 @@ def test_hash_aggregate_operator_remote_args(
"tc",
[
# Case 1: Auto-derived partitions with limited CPUs
# Memory calculation:
# max_partitions_per_agg = ceil(16 / 4) = 4
# partition_size = ceil(2 GiB / 16) = 128 MiB
# shuffle + output = 2 * (128 MiB * 4) = 1024 MiB
# with 1.3x skew factor: ceil(1024 MiB * 1.3) = 1395864372
# CPU calculation:
# cap = min(4.0, 4.0 * 0.25 / 4) = 0.25
# target = min(0.25, 1395864372 / 4 GiB) = 0.25
HashOperatorTestCase(
input_size_bytes=2 * GiB,
input_num_blocks=16,
@@ -389,13 +410,21 @@
expected_num_aggregators=4,
expected_ray_remote_args={
"max_concurrency": 4,
"num_cpus": 0.16,
"memory": 671088640,
"num_cpus": 0.25,
"memory": 1395864372,
"scheduling_strategy": "SPREAD",
"allow_out_of_order_execution": True,
},
),
# Case 2: Single partition produced
# Memory calculation:
# max_partitions_per_agg = ceil(1 / 1) = 1
# partition_size = ceil(512 MiB / 1) = 512 MiB
# shuffle + output = 2 * (512 MiB * 1) = 1024 MiB
# with 1.3x skew factor: ceil(1024 MiB * 1.3) = 1395864372
# CPU calculation:
# cap = min(4.0, 8.0 * 0.25 / 1) = 2.0
# target = min(2.0, 1395864372 / 4 GiB) = 0.33
HashOperatorTestCase(
input_size_bytes=512 * MiB,
input_num_blocks=8,
@@ -405,13 +434,21 @@
expected_num_aggregators=1,
expected_ray_remote_args={
"max_concurrency": 1,
"num_cpus": 0.25,
"memory": 1073741824,
"num_cpus": 0.33,
"memory": 1395864372,
"scheduling_strategy": "SPREAD",
"allow_out_of_order_execution": True,
},
),
# Case 3: Many CPUs
# Memory calculation:
# max_partitions_per_agg = ceil(32 / 32) = 1
# partition_size = ceil(16 GiB / 32) = 512 MiB
# shuffle + output = 2 * (512 MiB * 1) = 1024 MiB
# with 1.3x skew factor: ceil(1024 MiB * 1.3) = 1395864372
# CPU calculation:
# cap = min(4.0, 256.0 * 0.25 / 32) = 2.0
# target = min(2.0, 1395864372 / 4 GiB) = 0.33
HashOperatorTestCase(
input_size_bytes=16 * GiB,
input_num_blocks=128,
@@ -421,13 +458,21 @@
expected_num_aggregators=32,
expected_ray_remote_args={
"max_concurrency": 1,
"num_cpus": 0.25,
"memory": 1073741824,
"num_cpus": 0.33,
"memory": 1395864372,
"scheduling_strategy": "SPREAD",
"allow_out_of_order_execution": True,
},
),
# Case 4: Testing num_cpus derived from memory allocation
# Memory calculation:
# max_partitions_per_agg = ceil(200 / 128) = 2
# partition_size = ceil(50 GiB / 200) = 256 MiB
# shuffle + output = 2 * (256 MiB * 2) = 1024 MiB
# with 1.3x skew factor: ceil(1024 MiB * 1.3) = 1395864372
# CPU calculation:
# cap = min(4.0, 1024 * 0.25 / 128) = 2.0
# target = min(2.0, 1395864372 / 4 GiB) = 0.33
HashOperatorTestCase(
input_size_bytes=50 * GiB,
input_num_blocks=200,
@@ -437,13 +482,20 @@
expected_num_aggregators=128, # min(200, min(1000, 128 (default max))
expected_ray_remote_args={
"max_concurrency": 2, # ceil(200 / 128)
"num_cpus": 0.16, # ~0.6Gb / 4Gb = ~0.16
"memory": 687865856,
"num_cpus": 0.33,
"memory": 1395864372,
"scheduling_strategy": "SPREAD",
"allow_out_of_order_execution": True,
},
),
# Case 5: No dataset size estimate inferred (fallback to default memory request)
# Memory calculation (fallback):
# max_mem_per_agg = 32 GiB / 32 = 1 GiB
# modest_mem = 1 GiB / 2 = 512 MiB
# memory = min(512 MiB, DEFAULT_1GiB) = 512 MiB = 536870912
# CPU calculation:
# cap = min(4.0, 32.0 * 0.25 / 32) = 0.25
# target = min(0.25, 536870912 / 4 GiB) = 0.12
HashOperatorTestCase(
input_size_bytes=None,
input_num_blocks=None,
@@ -453,8 +505,8 @@
expected_num_aggregators=32,
expected_ray_remote_args={
"max_concurrency": 7,
"num_cpus": 0.25,
"memory": 1073741824,
"num_cpus": 0.12,
"memory": 536870912,
"scheduling_strategy": "SPREAD",
"allow_out_of_order_execution": True,
},
@@ -566,3 +618,9 @@ def test_aggregator_ray_remote_args_partial_override(ray_start_regular):

# Verify that memory is still present
assert "memory" in op._aggregator_pool._aggregator_ray_remote_args


if __name__ == "__main__":
import sys

sys.exit(pytest.main(["-v", __file__]))
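The fallback expectations above (memory 536870912, num_cpus 0.12) follow directly from the arithmetic spelled out in the test comments. A quick sketch under the assumed 32-CPU / 32 GiB cluster shape; the 1 GiB default is taken from the comments' DEFAULT_1GiB and is an assumption here, not a verified constant:

```python
GiB = 1024 ** 3

# Assumed cluster shape from the fallback test cases: 32 CPUs, 32 GiB, 32 aggregators.
total_cluster_memory = 32 * GiB
total_cluster_cpus = 32.0
num_aggregators = 32
default_memory = 1 * GiB  # assumed fallback default (DEFAULT_1GiB in the comments)

# Memory: take a "modest" half of each aggregator's even share, capped at the default.
max_mem_per_aggregator = total_cluster_memory // num_aggregators  # 1 GiB
memory = min(max_mem_per_aggregator // 2, default_memory)          # 512 MiB
assert memory == 536870912

# CPUs: same cap-and-target rule as the sized path.
cap = min(4.0, total_cluster_cpus * 0.25 / num_aggregators)        # 0.25
num_cpus = round(min(cap, memory / (4 * GiB)), 2)                  # 0.12
assert num_cpus == 0.12
```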
15 changes: 1 addition & 14 deletions release/release_data_tests.yaml
@@ -178,25 +178,12 @@

matrix:
setup:
scaling: [fixed_size]
scaling: [fixed_size, autoscaling]
shuffle_strategy: [sort_shuffle_pull_based, hash_shuffle]
columns:
- "column08 column13 column14" # 84 groups
- "column02 column14" # 7M groups

adjustments:
# Ray Data OOMs with hash shuffle on autoscaling clusters. So, only run
# autoscaling variants with sort shuffle. For more information, see
# https://github.com/ray-project/ray/issues/58734.
- with:
scaling: autoscaling
shuffle_strategy: sort_shuffle_pull_based
columns: "column08 column13 column14"
- with:
scaling: autoscaling
shuffle_strategy: sort_shuffle_pull_based
columns: "column02 column14"

cluster:
cluster_compute: "{{scaling}}_all_to_all_compute.yaml"

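With the adjustments removed, the matrix expands to all eight scaling × shuffle_strategy × columns combinations, including autoscaling with hash shuffle, which was previously excluded. An illustrative enumeration, not part of the release tooling:

```python
from itertools import product

scaling = ["fixed_size", "autoscaling"]
shuffle_strategy = ["sort_shuffle_pull_based", "hash_shuffle"]
columns = ["column08 column13 column14", "column02 column14"]

# Every combination now runs, including autoscaling + hash_shuffle,
# which the removed adjustments previously left out.
for variant in product(scaling, shuffle_strategy, columns):
    print(variant)  # 8 variants in total
```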