Skip to content

Commit 2a83248

Browse files
authored
[Data] Add iter_prefetched_bytes stats (#58900)
> Thank you for contributing to Ray! 🚀 > Please review the [Ray Contribution Guide](https://docs.ray.io/en/master/ray-contribute/getting-involved.html) before opening a pull request. > ⚠️ Remove these instructions before submitting your PR. > 💡 Tip: Mark as draft if you want early feedback, or ready for review when it's complete. ## Description > Briefly describe what this PR accomplishes and why it's needed. ### [Data] Add iter_prefetched_bytes stats Report the current bytes of prefetched blocks per iterator as a stat (`iter_prefetched_bytes`), exported as the `data_iter_prefetched_bytes` gauge and shown in a new "Iteration Prefetched Bytes" dashboard panel. ## Related issues > Link related issues: "Fixes #1234", "Closes #1234", or "Related to #1234". ## Additional information > Optional: Add implementation details, API changes, usage examples, screenshots, etc. --------- Signed-off-by: Srinath Krishnamachari <[email protected]> Signed-off-by: Srinath Krishnamachari <[email protected]>
1 parent a4e647a commit 2a83248

File tree

4 files changed

+44
-0
lines changed

4 files changed

+44
-0
lines changed

python/ray/dashboard/modules/metrics/dashboards/data_dashboard_panels.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1050,6 +1050,21 @@
10501050
stack=False,
10511051
)
10521052

1053+
ITERATION_PREFETCHED_BYTES_PANEL = Panel(
1054+
id=90,
1055+
title="Iteration Prefetched Bytes",
1056+
description="Current bytes of prefetched blocks in the iterator",
1057+
unit="bytes",
1058+
targets=[
1059+
Target(
1060+
expr="sum(ray_data_iter_prefetched_bytes{{{global_filters}}}) by (dataset)",
1061+
legend="Prefetched Bytes: {{dataset}}",
1062+
)
1063+
],
1064+
fill=0,
1065+
stack=False,
1066+
)
1067+
10531068
ITERATION_TIME_TO_FIRST_BATCH_PANEL = Panel(
10541069
id=120,
10551070
title="Iteration Time to First Batch",
@@ -1420,6 +1435,7 @@
14201435
ITERATION_BLOCKS_LOCAL_PANEL,
14211436
ITERATION_BLOCKS_REMOTE_PANEL,
14221437
ITERATION_BLOCKS_UNKNOWN_LOCATION_PANEL,
1438+
ITERATION_PREFETCHED_BYTES_PANEL,
14231439
ITERATION_TIME_TO_FIRST_BATCH_PANEL,
14241440
ITERATION_GET_REF_BUNDLES_PANEL,
14251441
],

python/ray/data/_internal/block_batching/iter_batches.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -373,6 +373,8 @@ def get_next_ref_bundle() -> RefBundle:
373373
current_window_size = 0
374374

375375
if num_batches_to_prefetch <= 0:
376+
if stats:
377+
stats.iter_prefetched_bytes = 0
376378
for ref_bundle in ref_bundles:
377379
for block_ref in ref_bundle.block_refs:
378380
yield block_ref
@@ -398,6 +400,10 @@ def get_next_ref_bundle() -> RefBundle:
398400
break
399401

400402
prefetcher.prefetch_blocks([block_ref for block_ref, _ in list(sliding_window)])
403+
if stats:
404+
stats.iter_prefetched_bytes = sum(
405+
metadata.size_bytes or 0 for _, metadata in sliding_window
406+
)
401407

402408
while sliding_window:
403409
block_ref, metadata = sliding_window.popleft()
@@ -413,6 +419,10 @@ def get_next_ref_bundle() -> RefBundle:
413419
)
414420
except StopIteration:
415421
pass
422+
if stats:
423+
stats.iter_prefetched_bytes = sum(
424+
metadata.size_bytes or 0 for _, metadata in sliding_window
425+
)
416426
yield block_ref
417427
trace_deallocation(block_ref, loc="iter_batches", free=eager_free)
418428
prefetcher.stop()

python/ray/data/_internal/stats.py

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -381,6 +381,11 @@ def __init__(self, max_stats=1000):
381381
description="Number of blocks that have unknown locations",
382382
tag_keys=iter_tag_keys,
383383
)
384+
self.iter_prefetched_bytes = Gauge(
385+
"data_iter_prefetched_bytes",
386+
description="Current bytes of prefetched blocks in the iterator",
387+
tag_keys=iter_tag_keys,
388+
)
384389

385390
# === Dataset and Operator Metadata Metrics ===
386391
dataset_tags = ("dataset", "job_id", "start_time")
@@ -568,6 +573,7 @@ def update_iteration_metrics(
568573
self.iter_blocks_local.set(stats.iter_blocks_local, tags)
569574
self.iter_blocks_remote.set(stats.iter_blocks_remote, tags)
570575
self.iter_unknown_location.set(stats.iter_unknown_location, tags)
576+
self.iter_prefetched_bytes.set(stats.iter_prefetched_bytes, tags)
571577

572578
self.iter_block_fetching_s.set(stats.iter_get_s.get(), tags)
573579
self.iter_batch_shaping_s.set(stats.iter_next_batch_s.get(), tags)
@@ -976,6 +982,7 @@ def __init__(
976982
self.iter_blocks_local: int = 0
977983
self.iter_blocks_remote: int = 0
978984
self.iter_unknown_location: int = 0
985+
self.iter_prefetched_bytes: int = 0
979986

980987
# Memory usage stats
981988
self.global_bytes_spilled: int = 0
@@ -1018,6 +1025,7 @@ def to_summary(self) -> "DatasetStatsSummary":
10181025
self.iter_blocks_local,
10191026
self.iter_blocks_remote,
10201027
self.iter_unknown_location,
1028+
self.iter_prefetched_bytes,
10211029
)
10221030

10231031
stats_summary_parents = []
@@ -1730,6 +1738,8 @@ class IterStatsSummary:
17301738
iter_blocks_remote: int
17311739
# Num of blocks with unknown locations
17321740
iter_unknown_location: int
1741+
# Current bytes of prefetched blocks in the iterator
1742+
iter_prefetched_bytes: int
17331743

17341744
def __str__(self) -> str:
17351745
return self.to_string()
@@ -1830,6 +1840,8 @@ def to_string(self) -> str:
18301840
out += " * Num blocks unknown location: {}\n".format(
18311841
self.iter_unknown_location
18321842
)
1843+
if self.iter_prefetched_bytes:
1844+
out += " * Prefetched bytes: {}\n".format(self.iter_prefetched_bytes)
18331845
if self.streaming_split_coord_time.get() != 0:
18341846
out += "Streaming split coordinator overhead time: "
18351847
out += f"{fmt(self.streaming_split_coord_time.get())}\n"
@@ -1846,6 +1858,7 @@ def __repr__(self, level=0) -> str:
18461858
f"{indent} iter_blocks_local={self.iter_blocks_local or None},\n"
18471859
f"{indent} iter_blocks_remote={self.iter_blocks_remote or None},\n"
18481860
f"{indent} iter_unknown_location={self.iter_unknown_location or None},\n"
1861+
f"{indent} iter_prefetched_bytes={self.iter_prefetched_bytes or None},\n"
18491862
f"{indent} next_time={fmt(self.next_time.get()) or None},\n"
18501863
f"{indent} format_time={fmt(self.format_time.get()) or None},\n"
18511864
f"{indent} user_time={fmt(self.user_time.get()) or None},\n"

python/ray/data/tests/test_stats.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -791,6 +791,7 @@ def test_dataset__repr__(ray_start_regular_shared, restore_data_context):
791791
" iter_blocks_local=None,\n"
792792
" iter_blocks_remote=None,\n"
793793
" iter_unknown_location=None,\n"
794+
" iter_prefetched_bytes=None,\n"
794795
" next_time=T,\n"
795796
" format_time=T,\n"
796797
" user_time=T,\n"
@@ -813,6 +814,7 @@ def test_dataset__repr__(ray_start_regular_shared, restore_data_context):
813814
" iter_blocks_local=None,\n"
814815
" iter_blocks_remote=None,\n"
815816
" iter_unknown_location=None,\n"
817+
" iter_prefetched_bytes=None,\n"
816818
" next_time=T,\n"
817819
" format_time=T,\n"
818820
" user_time=T,\n"
@@ -933,6 +935,7 @@ def check_stats():
933935
" iter_blocks_local=None,\n"
934936
" iter_blocks_remote=None,\n"
935937
" iter_unknown_location=N,\n"
938+
" iter_prefetched_bytes=None,\n"
936939
" next_time=T,\n"
937940
" format_time=T,\n"
938941
" user_time=T,\n"
@@ -1029,6 +1032,7 @@ def check_stats():
10291032
" iter_blocks_local=None,\n"
10301033
" iter_blocks_remote=None,\n"
10311034
" iter_unknown_location=None,\n"
1035+
" iter_prefetched_bytes=None,\n"
10321036
" next_time=T,\n"
10331037
" format_time=T,\n"
10341038
" user_time=T,\n"
@@ -1051,6 +1055,7 @@ def check_stats():
10511055
" iter_blocks_local=None,\n"
10521056
" iter_blocks_remote=None,\n"
10531057
" iter_unknown_location=None,\n"
1058+
" iter_prefetched_bytes=None,\n"
10541059
" next_time=T,\n"
10551060
" format_time=T,\n"
10561061
" user_time=T,\n"

0 commit comments

Comments
 (0)