8 changes: 0 additions & 8 deletions python/ray/data/BUILD
@@ -587,14 +587,6 @@ py_test(
deps = ["//:ray_lib", ":conftest"],
)

-py_test(
-    name = "test_bundle_queue",
-    size = "small",
-    srcs = ["tests/test_bundle_queue.py"],
-    tags = ["team:data", "exclusive"],
-    deps = ["//:ray_lib", ":conftest"],
-)

py_test(
name = "test_autoscaler",
size = "small",

This file was deleted.

This file was deleted.

This file was deleted.

@@ -4,7 +4,6 @@
from typing import TYPE_CHECKING, Any, Dict, List, Optional

import ray
-from ray.data._internal.execution.bundle_queue import create_bundle_queue
from ray.data._internal.execution.interfaces.ref_bundle import RefBundle
from ray.data._internal.memory_tracing import trace_allocation

@@ -268,11 +267,31 @@ class OpRuntimeMetrics(metaclass=OpRuntimesMetricsMeta):
description="Number of blocks in operator's internal input queue.",
metrics_group=MetricsGroup.OBJECT_STORE_MEMORY,
)
+    obj_store_mem_internal_inqueue: int = metric_field(
+        default=0,
+        description=(
+            "Byte size of input blocks in the operator's internal input queue."
+        ),
+        metrics_group=MetricsGroup.OBJECT_STORE_MEMORY,
+    )
obj_store_mem_internal_outqueue_blocks: int = metric_field(
default=0,
description="Number of blocks in the operator's internal output queue.",
metrics_group=MetricsGroup.OBJECT_STORE_MEMORY,
)
+    obj_store_mem_internal_outqueue: int = metric_field(
+        default=0,
+        description=(
+            "Byte size of output blocks in the operator's internal output queue."
+        ),
+        metrics_group=MetricsGroup.OBJECT_STORE_MEMORY,
+    )
+    obj_store_mem_pending_task_inputs: int = metric_field(
+        default=0,
+        description="Byte size of input blocks used by pending tasks.",
+        metrics_group=MetricsGroup.OBJECT_STORE_MEMORY,
+        map_only=True,
+    )
obj_store_mem_freed: int = metric_field(
default=0,
description="Byte size of freed memory in object store.",
@@ -304,10 +323,6 @@ def __init__(self, op: "PhysicalOperator"):
# Start time of current pause due to task submission backpressure
self._task_submission_backpressure_start_time = -1

-        self._internal_inqueue = create_bundle_queue()
-        self._internal_outqueue = create_bundle_queue()
-        self._pending_task_inputs = create_bundle_queue()

@property
def extra_metrics(self) -> Dict[str, Any]:
"""Return a dict of extra metrics."""
@@ -362,30 +377,6 @@ def average_bytes_per_output(self) -> Optional[float]:
else:
return self.bytes_task_outputs_generated / self.num_task_outputs_generated

-    @metric_property(
-        description="Byte size of input blocks in the operator's internal input queue.",
-        metrics_group=MetricsGroup.OBJECT_STORE_MEMORY,
-    )
-    def obj_store_mem_internal_inqueue(self) -> int:
-        return self._internal_inqueue.estimate_size_bytes()

-    @metric_property(
-        description=(
-            "Byte size of output blocks in the operator's internal output queue."
-        ),
-        metrics_group=MetricsGroup.OBJECT_STORE_MEMORY,
-    )
-    def obj_store_mem_internal_outqueue(self) -> int:
-        return self._internal_outqueue.estimate_size_bytes()

-    @metric_property(
-        description="Byte size of input blocks used by pending tasks.",
-        metrics_group=MetricsGroup.OBJECT_STORE_MEMORY,
-        map_only=True,
-    )
-    def obj_store_mem_pending_task_inputs(self) -> int:
-        return self._pending_task_inputs.estimate_size_bytes()

@property
def obj_store_mem_pending_task_outputs(self) -> Optional[float]:
"""Estimated size in bytes of output blocks in Ray generator buffers.
@@ -463,13 +454,13 @@ def on_input_received(self, input: RefBundle):
def on_input_queued(self, input: RefBundle):
"""Callback when the operator queues an input."""
self.obj_store_mem_internal_inqueue_blocks += len(input.blocks)
-        self._internal_inqueue.add(input)
+        self.obj_store_mem_internal_inqueue += input.size_bytes()

def on_input_dequeued(self, input: RefBundle):
"""Callback when the operator dequeues an input."""
self.obj_store_mem_internal_inqueue_blocks -= len(input.blocks)
input_size = input.size_bytes()
-        self._internal_inqueue.remove(input)
+        self.obj_store_mem_internal_inqueue -= input_size
assert self.obj_store_mem_internal_inqueue >= 0, (
self._op,
self.obj_store_mem_internal_inqueue,
@@ -479,13 +470,13 @@ def on_input_dequeued(self, input: RefBundle):
def on_output_queued(self, output: RefBundle):
"""Callback when an output is queued by the operator."""
self.obj_store_mem_internal_outqueue_blocks += len(output.blocks)
-        self._internal_outqueue.add(output)
+        self.obj_store_mem_internal_outqueue += output.size_bytes()

def on_output_dequeued(self, output: RefBundle):
"""Callback when an output is dequeued by the operator."""
self.obj_store_mem_internal_outqueue_blocks -= len(output.blocks)
output_size = output.size_bytes()
-        self._internal_outqueue.remove(output)
+        self.obj_store_mem_internal_outqueue -= output_size
assert self.obj_store_mem_internal_outqueue >= 0, (
self._op,
self.obj_store_mem_internal_outqueue,
@@ -513,7 +504,7 @@ def on_task_submitted(self, task_index: int, inputs: RefBundle):
self.num_tasks_submitted += 1
self.num_tasks_running += 1
self.bytes_inputs_of_submitted_tasks += inputs.size_bytes()
-        self._pending_task_inputs.add(inputs)
+        self.obj_store_mem_pending_task_inputs += inputs.size_bytes()
self._running_tasks[task_index] = RunningTaskInfo(inputs, 0, 0)

def on_task_output_generated(self, task_index: int, output: RefBundle):
@@ -553,7 +544,7 @@ def on_task_finished(self, task_index: int, exception: Optional[Exception]):
total_input_size = inputs.size_bytes()
self.bytes_task_inputs_processed += total_input_size
input_size = inputs.size_bytes()
-        self._pending_task_inputs.remove(inputs)
+        self.obj_store_mem_pending_task_inputs -= input_size
assert self.obj_store_mem_pending_task_inputs >= 0, (
self._op,
self.obj_store_mem_pending_task_inputs,
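For reference, a minimal sketch of the accounting pattern this file's diff adopts, assuming a simplified stand-in class (QueueByteCounter is hypothetical, not part of Ray): byte sizes are tracked with plain integer counters updated in the queued/dequeued callbacks, rather than by sizing a parallel bundle queue.

class QueueByteCounter:
    """Simplified stand-in: track queued bytes with a plain integer counter."""

    def __init__(self):
        self.inqueue_bytes = 0  # analogous to obj_store_mem_internal_inqueue

    def on_input_queued(self, size_bytes: int) -> None:
        # Add the bundle's byte size when it enters the internal input queue.
        self.inqueue_bytes += size_bytes

    def on_input_dequeued(self, size_bytes: int) -> None:
        # Subtract on dequeue; the counter should never go negative.
        self.inqueue_bytes -= size_bytes
        assert self.inqueue_bytes >= 0, self.inqueue_bytes

counter = QueueByteCounter()
counter.on_input_queued(1024)
counter.on_input_dequeued(1024)
assert counter.inqueue_bytes == 0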
@@ -1,3 +1,4 @@
+import collections
import logging
from dataclasses import dataclass
from typing import Any, Callable, Dict, Iterator, List, Optional, Tuple, Union
@@ -7,7 +8,6 @@
from ray.core.generated import gcs_pb2
from ray.data._internal.compute import ActorPoolStrategy
from ray.data._internal.execution.autoscaler import AutoscalingActorPool
-from ray.data._internal.execution.bundle_queue import create_bundle_queue
from ray.data._internal.execution.interfaces import (
ExecutionOptions,
ExecutionResources,
@@ -109,7 +109,7 @@ def __init__(

self._actor_pool = _ActorPool(compute_strategy, self._start_actor)
# A queue of bundles awaiting dispatch to actors.
-        self._bundle_queue = create_bundle_queue()
+        self._bundle_queue = collections.deque()
# Cached actor class.
self._cls = None
# Whether no more submittable bundles will be added.
@@ -175,7 +175,7 @@ def _task_done_callback(res_ref):
return actor, res_ref

def _add_bundled_input(self, bundle: RefBundle):
-        self._bundle_queue.add(bundle)
+        self._bundle_queue.append(bundle)
self._metrics.on_input_queued(bundle)
# Try to dispatch all bundles in the queue, including this new bundle.
self._dispatch_tasks()
@@ -191,14 +191,14 @@ def _dispatch_tasks(self):
while self._bundle_queue:
# Pick an actor from the pool.
if self._actor_locality_enabled:
-                actor = self._actor_pool.pick_actor(self._bundle_queue.peek())
+                actor = self._actor_pool.pick_actor(self._bundle_queue[0])
else:
actor = self._actor_pool.pick_actor()
if actor is None:
# No actors available for executing the next task.
break
# Submit the map task.
-            bundle = self._bundle_queue.pop()
+            bundle = self._bundle_queue.popleft()
self._metrics.on_input_dequeued(bundle)
input_blocks = [block for block, _ in bundle.blocks]
ctx = TaskContext(
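For reference, a minimal sketch of the deque-based dispatch loop adopted above, assuming pick_actor and submit are hypothetical stand-ins for the actor-pool and task-submission calls: peek at the head with queue[0] for locality-aware actor picking, then popleft() once an actor is available.

import collections

def dispatch_all(queue: collections.deque, pick_actor, submit) -> None:
    # Drain the queue while actors are available, mirroring _dispatch_tasks above.
    while queue:
        actor = pick_actor(queue[0])  # peek at the head without removing it
        if actor is None:
            break  # no free actor; leave remaining bundles queued
        bundle = queue.popleft()  # O(1) removal from the head
        submit(actor, bundle)

q = collections.deque(["bundle_a", "bundle_b"])
dispatch_all(q, pick_actor=lambda bundle: "actor", submit=lambda actor, bundle: None)
assert not q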