Skip to content

Conversation

@srinathk10
Copy link
Contributor

@srinathk10 srinathk10 commented Nov 20, 2025

Thank you for contributing to Ray! 🚀
Please review the Ray Contribution Guide before opening a pull request.

⚠️ Remove these instructions before submitting your PR.

💡 Tip: Mark as draft if you want early feedback, or ready for review when it's complete.

Description

Briefly describe what this PR accomplishes and why it's needed.

[Data] Fix obj_store_mem_max_pending_output_per_task reporting

Fix obj_store_mem_max_pending_output_per_task when sample is unavailable to factor in,

  • bytes_per_output = MAX_SAFE_BLOCK_SIZE_FACTOR * target_max_block_size.

Related issues

Link related issues: "Fixes #1234", "Closes #1234", or "Related to #1234".

Additional information

Optional: Add implementation details, API changes, usage examples, screenshots, etc.

@srinathk10 srinathk10 requested a review from a team as a code owner November 20, 2025 23:59
Copy link
Contributor

@gemini-code-assist gemini-code-assist bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request correctly adjusts the memory estimation for pending task outputs by incorporating MAX_SAFE_BLOCK_SIZE_FACTOR when no block samples are available. The accompanying tests are thorough and validate the new logic across various scenarios. I have one suggestion to refactor the implementation for better code clarity and to reduce duplication.

Comment on lines 685 to 697
if context.target_max_block_size is None:
return None
bytes_per_output = context.target_max_block_size
bytes_per_output = (
DEFAULT_TARGET_MAX_BLOCK_SIZE * MAX_SAFE_BLOCK_SIZE_FACTOR
)
else:
bytes_per_output = (
context.target_max_block_size * MAX_SAFE_BLOCK_SIZE_FACTOR
)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The logic for determining bytes_per_output when a sample is not available can be simplified to reduce code duplication. Both branches of the if/else statement perform the same multiplication with MAX_SAFE_BLOCK_SIZE_FACTOR. You can determine the target_block_size first and then perform the multiplication once.

            target_block_size = context.target_max_block_size
            if target_block_size is None:
                target_block_size = DEFAULT_TARGET_MAX_BLOCK_SIZE
            bytes_per_output = target_block_size * MAX_SAFE_BLOCK_SIZE_FACTOR

@srinathk10 srinathk10 added the go add ONLY when ready to merge, run all tests label Nov 21, 2025
@ray-gardener ray-gardener bot added the data Ray Data-related issues label Nov 21, 2025
Signed-off-by: Srinath Krishnamachari <[email protected]>
Signed-off-by: Srinath Krishnamachari <[email protected]>
Copy link

@cursor cursor bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bug: Tests not updated for MAX_SAFE_BLOCK_SIZE_FACTOR change

The code change multiplies bytes_per_output by MAX_SAFE_BLOCK_SIZE_FACTOR when estimating pending task outputs, but three existing test assertions in test_task_pool_resource_reporting, test_task_pool_resource_reporting_with_bundling, and test_actor_pool_resource_reporting still expect the old calculation without the factor. These tests will fail because the actual memory usage will be 1.5x larger than their expected values.

python/ray/data/tests/test_executor_resource_management.py#L222-L228

assert op.metrics.obj_store_mem_pending_task_inputs == pytest.approx(1600, rel=0.5)
assert op.metrics.obj_store_mem_pending_task_outputs == pytest.approx(
2 # Number of active tasks
* ctx._max_num_blocks_in_streaming_gen_buffer
* ctx.target_max_block_size,
rel=0.5,
)

python/ray/data/tests/test_executor_resource_management.py#L282-L288

assert op.metrics.obj_store_mem_pending_task_inputs == pytest.approx(2400, rel=0.5)
assert op.metrics.obj_store_mem_pending_task_outputs == pytest.approx(
1 # Number of active tasks
* ctx._max_num_blocks_in_streaming_gen_buffer
* ctx.target_max_block_size,
rel=0.5,
)

python/ray/data/tests/test_executor_resource_management.py#L355-L361

assert op.metrics.obj_store_mem_pending_task_inputs == pytest.approx(3200, rel=0.5)
assert op.metrics.obj_store_mem_pending_task_outputs == pytest.approx(
2 # We launched 4 tasks across 2 actor, but only 2 tasks run at a time
* ctx._max_num_blocks_in_streaming_gen_buffer
* ctx.target_max_block_size,
rel=0.5,
)

Fix in Cursor Fix in Web


@raulchen raulchen merged commit bceeaa4 into master Nov 24, 2025
6 checks passed
@raulchen raulchen deleted the srinathk10/pending_output_per_task branch November 24, 2025 23:57
ykdojo pushed a commit to ykdojo/ray that referenced this pull request Nov 27, 2025
…roject#58864)

> Thank you for contributing to Ray! 🚀
> Please review the [Ray Contribution
Guide](https://docs.ray.io/en/master/ray-contribute/getting-involved.html)
before opening a pull request.

> ⚠️ Remove these instructions before submitting your PR.

> 💡 Tip: Mark as draft if you want early feedback, or ready for review
when it's complete.

## Description
> Briefly describe what this PR accomplishes and why it's needed.

### [Data] Fix obj_store_mem_max_pending_output_per_task reporting

Fix `obj_store_mem_max_pending_output_per_task` when sample is
unavailable to factor in,

- `bytes_per_output` = `MAX_SAFE_BLOCK_SIZE_FACTOR` *
`target_max_block_size`.

## Related issues
> Link related issues: "Fixes ray-project#1234", "Closes ray-project#1234", or "Related to
ray-project#1234".

## Additional information
> Optional: Add implementation details, API changes, usage examples,
screenshots, etc.

---------

Signed-off-by: Srinath Krishnamachari <[email protected]>
Signed-off-by: YK <[email protected]>
SheldonTsen pushed a commit to SheldonTsen/ray that referenced this pull request Dec 1, 2025
…roject#58864)

> Thank you for contributing to Ray! 🚀
> Please review the [Ray Contribution
Guide](https://docs.ray.io/en/master/ray-contribute/getting-involved.html)
before opening a pull request.

> ⚠️ Remove these instructions before submitting your PR.

> 💡 Tip: Mark as draft if you want early feedback, or ready for review
when it's complete.

## Description
> Briefly describe what this PR accomplishes and why it's needed.

### [Data] Fix obj_store_mem_max_pending_output_per_task reporting

Fix `obj_store_mem_max_pending_output_per_task` when sample is
unavailable to factor in,

- `bytes_per_output` = `MAX_SAFE_BLOCK_SIZE_FACTOR` *
`target_max_block_size`.

## Related issues
> Link related issues: "Fixes ray-project#1234", "Closes ray-project#1234", or "Related to
ray-project#1234".

## Additional information
> Optional: Add implementation details, API changes, usage examples,
screenshots, etc.

---------

Signed-off-by: Srinath Krishnamachari <[email protected]>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

data Ray Data-related issues go add ONLY when ready to merge, run all tests

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Ray fails to serialize self-reference objects

3 participants