44from typing import TYPE_CHECKING , Any , Dict , List , Optional
55
66import ray
7- from ray .data ._internal .execution .bundle_queue import create_bundle_queue
87from ray .data ._internal .execution .interfaces .ref_bundle import RefBundle
98from ray .data ._internal .memory_tracing import trace_allocation
109
@@ -268,11 +267,31 @@ class OpRuntimeMetrics(metaclass=OpRuntimesMetricsMeta):
268267 description = "Number of blocks in operator's internal input queue." ,
269268 metrics_group = MetricsGroup .OBJECT_STORE_MEMORY ,
270269 )
270+ obj_store_mem_internal_inqueue : int = metric_field (
271+ default = 0 ,
272+ description = (
273+ "Byte size of input blocks in the operator's internal input queue."
274+ ),
275+ metrics_group = MetricsGroup .OBJECT_STORE_MEMORY ,
276+ )
271277 obj_store_mem_internal_outqueue_blocks : int = metric_field (
272278 default = 0 ,
273279 description = "Number of blocks in the operator's internal output queue." ,
274280 metrics_group = MetricsGroup .OBJECT_STORE_MEMORY ,
275281 )
282+ obj_store_mem_internal_outqueue : int = metric_field (
283+ default = 0 ,
284+ description = (
285+ "Byte size of output blocks in the operator's internal output queue."
286+ ),
287+ metrics_group = MetricsGroup .OBJECT_STORE_MEMORY ,
288+ )
289+ obj_store_mem_pending_task_inputs : int = metric_field (
290+ default = 0 ,
291+ description = "Byte size of input blocks used by pending tasks." ,
292+ metrics_group = MetricsGroup .OBJECT_STORE_MEMORY ,
293+ map_only = True ,
294+ )
276295 obj_store_mem_freed : int = metric_field (
277296 default = 0 ,
278297 description = "Byte size of freed memory in object store." ,
@@ -304,10 +323,6 @@ def __init__(self, op: "PhysicalOperator"):
304323 # Start time of current pause due to task submission backpressure
305324 self ._task_submission_backpressure_start_time = - 1
306325
307- self ._internal_inqueue = create_bundle_queue ()
308- self ._internal_outqueue = create_bundle_queue ()
309- self ._pending_task_inputs = create_bundle_queue ()
310-
311326 @property
312327 def extra_metrics (self ) -> Dict [str , Any ]:
313328 """Return a dict of extra metrics."""
@@ -362,30 +377,6 @@ def average_bytes_per_output(self) -> Optional[float]:
362377 else :
363378 return self .bytes_task_outputs_generated / self .num_task_outputs_generated
364379
365- @metric_property (
366- description = "Byte size of input blocks in the operator's internal input queue." ,
367- metrics_group = MetricsGroup .OBJECT_STORE_MEMORY ,
368- )
369- def obj_store_mem_internal_inqueue (self ) -> int :
370- return self ._internal_inqueue .estimate_size_bytes ()
371-
372- @metric_property (
373- description = (
374- "Byte size of output blocks in the operator's internal output queue."
375- ),
376- metrics_group = MetricsGroup .OBJECT_STORE_MEMORY ,
377- )
378- def obj_store_mem_internal_outqueue (self ) -> int :
379- return self ._internal_outqueue .estimate_size_bytes ()
380-
381- @metric_property (
382- description = "Byte size of input blocks used by pending tasks." ,
383- metrics_group = MetricsGroup .OBJECT_STORE_MEMORY ,
384- map_only = True ,
385- )
386- def obj_store_mem_pending_task_inputs (self ) -> int :
387- return self ._pending_task_inputs .estimate_size_bytes ()
388-
389380 @property
390381 def obj_store_mem_pending_task_outputs (self ) -> Optional [float ]:
391382 """Estimated size in bytes of output blocks in Ray generator buffers.
@@ -463,13 +454,13 @@ def on_input_received(self, input: RefBundle):
463454 def on_input_queued (self , input : RefBundle ):
464455 """Callback when the operator queues an input."""
465456 self .obj_store_mem_internal_inqueue_blocks += len (input .blocks )
466- self ._internal_inqueue . add ( input )
457+ self .obj_store_mem_internal_inqueue += input . size_bytes ( )
467458
468459 def on_input_dequeued (self , input : RefBundle ):
469460 """Callback when the operator dequeues an input."""
470461 self .obj_store_mem_internal_inqueue_blocks -= len (input .blocks )
471462 input_size = input .size_bytes ()
472- self ._internal_inqueue . remove ( input )
463+ self .obj_store_mem_internal_inqueue -= input_size
473464 assert self .obj_store_mem_internal_inqueue >= 0 , (
474465 self ._op ,
475466 self .obj_store_mem_internal_inqueue ,
@@ -479,13 +470,13 @@ def on_input_dequeued(self, input: RefBundle):
479470 def on_output_queued (self , output : RefBundle ):
480471 """Callback when an output is queued by the operator."""
481472 self .obj_store_mem_internal_outqueue_blocks += len (output .blocks )
482- self ._internal_outqueue . add ( output )
473+ self .obj_store_mem_internal_outqueue += output . size_bytes ( )
483474
484475 def on_output_dequeued (self , output : RefBundle ):
485476 """Callback when an output is dequeued by the operator."""
486477 self .obj_store_mem_internal_outqueue_blocks -= len (output .blocks )
487478 output_size = output .size_bytes ()
488- self ._internal_outqueue . remove ( output )
479+ self .obj_store_mem_internal_outqueue -= output_size
489480 assert self .obj_store_mem_internal_outqueue >= 0 , (
490481 self ._op ,
491482 self .obj_store_mem_internal_outqueue ,
@@ -513,7 +504,7 @@ def on_task_submitted(self, task_index: int, inputs: RefBundle):
513504 self .num_tasks_submitted += 1
514505 self .num_tasks_running += 1
515506 self .bytes_inputs_of_submitted_tasks += inputs .size_bytes ()
516- self ._pending_task_inputs . add ( inputs )
507+ self .obj_store_mem_pending_task_inputs += inputs . size_bytes ( )
517508 self ._running_tasks [task_index ] = RunningTaskInfo (inputs , 0 , 0 )
518509
519510 def on_task_output_generated (self , task_index : int , output : RefBundle ):
@@ -553,7 +544,7 @@ def on_task_finished(self, task_index: int, exception: Optional[Exception]):
553544 total_input_size = inputs .size_bytes ()
554545 self .bytes_task_inputs_processed += total_input_size
555546 input_size = inputs .size_bytes ()
556- self ._pending_task_inputs . remove ( inputs )
547+ self .obj_store_mem_pending_task_inputs -= input_size
557548 assert self .obj_store_mem_pending_task_inputs >= 0 , (
558549 self ._op ,
559550 self .obj_store_mem_pending_task_inputs ,
0 commit comments