Skip to content
Merged
Show file tree
Hide file tree
Changes from 18 commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
0abbd50
Initial commit
bveeramani Nov 17, 2025
0340d58
Update scale
bveeramani Nov 17, 2025
28ebd4b
Remove comment
bveeramani Nov 17, 2025
9b935b4
Merge branch 'master' into fixed-size-map-groups
bveeramani Nov 17, 2025
3d79ecc
Increase scale
bveeramani Nov 17, 2025
f430e5e
Update file'
bveeramani Nov 18, 2025
d9ba322
Merge remote-tracking branch 'upstream/master' into fixed-size-map-gr…
owenowenisme Nov 19, 2025
bc4ce3d
remove adjustment
owenowenisme Nov 19, 2025
7f223e8
update matrix
owenowenisme Nov 19, 2025
4af7aea
Merge remote-tracking branch 'upstream/master' into data/fix-sf-100-r…
owenowenisme Nov 21, 2025
bbb0586
debug log
owenowenisme Nov 21, 2025
b4c0472
quick fix
owenowenisme Nov 21, 2025
b23c251
update
owenowenisme Nov 22, 2025
f995052
update
owenowenisme Nov 23, 2025
5bc7889
restore logging info
owenowenisme Nov 26, 2025
74c60a3
Merge branch 'master' into data/fix-sf-100-release-test
owenowenisme Nov 26, 2025
8b1ecc7
update
owenowenisme Nov 27, 2025
35bcf3e
Merge branch 'master' into data/fix-sf-100-release-test
owenowenisme Nov 30, 2025
398d038
remove worker proportion calculation
owenowenisme Dec 3, 2025
514baa2
add ceil
owenowenisme Dec 3, 2025
4c531c6
Merge branch 'master' into data/fix-sf-100-release-test
owenowenisme Dec 3, 2025
11f563e
update release test yaml
owenowenisme Dec 3, 2025
d45fc6d
Merge branch 'master' into data/fix-sf-100-release-test
owenowenisme Dec 3, 2025
d74dad9
Merge branch 'master' into data/fix-sf-100-release-test
owenowenisme Dec 3, 2025
d2aa269
address comments
owenowenisme Dec 4, 2025
3f4a317
trigger test
owenowenisme Dec 4, 2025
ceeb3a7
update test
owenowenisme Dec 4, 2025
6951c51
make test size to medium
owenowenisme Dec 4, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 29 additions & 12 deletions python/ray/data/_internal/execution/operators/hash_shuffle.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,10 @@

import ray
from ray import ObjectRef
from ray._private.ray_constants import env_integer
from ray._private.ray_constants import (
DEFAULT_OBJECT_STORE_MEMORY_PROPORTION,
env_integer,
)
from ray.actor import ActorHandle
from ray.data._internal.arrow_block import ArrowBlockBuilder
from ray.data._internal.arrow_ops.transform_pyarrow import (
Expand Down Expand Up @@ -1179,7 +1182,12 @@ def _get_aggregator_num_cpus(
#
cap = min(4.0, total_available_cluster_resources.cpu * 0.25 / num_aggregators)

target_num_cpus = min(cap, estimated_aggregator_memory_required / (4 * GiB))
worker_heap_memory_proportion = 1 - DEFAULT_OBJECT_STORE_MEMORY_PROPORTION
target_num_cpus = min(
cap,
estimated_aggregator_memory_required
/ (4 * GiB * worker_heap_memory_proportion),
)

# Round resource to 2d decimal point (for readability)
return round(target_num_cpus, 2)
Expand Down Expand Up @@ -1241,18 +1249,25 @@ def _estimate_aggregator_memory_allocation(
num_partitions: int,
estimated_dataset_bytes: int,
) -> int:
max_partitions_for_aggregator = math.ceil(
num_partitions / num_aggregators
) # Max number of partitions that a single aggregator might handle
partition_byte_size_estimate = math.ceil(
estimated_dataset_bytes / num_partitions
) # Estimated byte size of a single partition

# Add 30% buffer to account for data skew
SKEW_FACTOR = 1.3
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Previous comment got lost -- make this a class var so that we can patch it if needed

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
SKEW_FACTOR = 1.3
SHUFFLE_AGGREGATOR_MEMORY_ESTIMATE_SKEW_FACTOR = 1.3


# Inputs (object store) - memory for receiving shuffled partitions
aggregator_shuffle_object_store_memory_required = math.ceil(
partition_byte_size_estimate * max_partitions_for_aggregator
)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's preserve the structure of estimating individual components (shuffle, output, working memory)


# Estimate of object store memory required to accommodate all partitions
# handled by a single aggregator
aggregator_shuffle_object_store_memory_required: int = math.ceil(
estimated_dataset_bytes / num_aggregators
# Output (object store) - memory for output partitions
output_object_store_memory_required = math.ceil(
partition_byte_size_estimate * max_partitions_for_aggregator
)
# Estimate of memory required to accommodate single partition as an output
# (inside Object Store)
output_object_store_memory_required: int = partition_byte_size_estimate

aggregator_total_memory_required: int = (
# Inputs (object store)
Expand All @@ -1261,18 +1276,20 @@ def _estimate_aggregator_memory_allocation(
# Output (object store)
output_object_store_memory_required
)

total_with_skew = aggregator_total_memory_required * SKEW_FACTOR
logger.info(
f"Estimated memory requirement for shuffling aggregator "
f"(partitions={num_partitions}, "
f"aggregators={num_aggregators}, "
f"dataset (estimate)={estimated_dataset_bytes / GiB:.1f}GiB): "
f"shuffle={aggregator_shuffle_object_store_memory_required / MiB:.1f}MiB, "
f"output={output_object_store_memory_required / MiB:.1f}MiB, "
f"total={aggregator_total_memory_required / MiB:.1f}MiB, "
f"total_base={aggregator_total_memory_required / MiB:.1f}MiB, "
f"skew_factor={SKEW_FACTOR}, "
f"total_with_skew={total_with_skew / MiB:.1f}MiB"
)

return aggregator_total_memory_required
return total_with_skew


@dataclass
Expand Down
2 changes: 1 addition & 1 deletion release/release_data_tests.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@

matrix:
setup:
scaling: [fixed_size]
scaling: [fixed_size, autoscaling]
shuffle_strategy: [sort_shuffle_pull_based, hash_shuffle]
columns:
- "column08 column13 column14" # 84 groups
Expand Down