Skip to content

Commit 65b223a

Browse files
bveeramani authored and mohitjain2504 committed
[Data] Add BundleQueue abstraction (ray-project#48503)
Two reasons for adding this abstraction:
1. It allows us to customize how we pop bundles from OpState and internal queues. Currently, we pop the first-in bundle, but you might want a less naive strategy.
2. It allows us to keep track of bundles in OpRuntimeMetrics so that we can refresh stale bundle sizes.

---------

Signed-off-by: Balaji Veeramani <[email protected]>
Signed-off-by: mohitjain2504 <[email protected]>
1 parent 31be818 commit 65b223a

File tree

11 files changed

+440
-71
lines changed

11 files changed

+440
-71
lines changed

python/ray/data/BUILD

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -587,6 +587,14 @@ py_test(
587587
deps = ["//:ray_lib", ":conftest"],
588588
)
589589

590+
py_test(
591+
name = "test_bundle_queue",
592+
size = "small",
593+
srcs = ["tests/test_bundle_queue.py"],
594+
tags = ["team:data", "exclusive"],
595+
deps = ["//:ray_lib", ":conftest"],
596+
)
597+
590598
py_test(
591599
name = "test_autoscaler",
592600
size = "small",
python/ray/data/_internal/execution/bundle_queue/__init__.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
from .bundle_queue import BundleQueue
2+
from .fifo_bundle_queue import FIFOBundleQueue
3+
4+
5+
def create_bundle_queue() -> BundleQueue:
    """Build the bundle queue implementation used by default.

    Returns:
        A newly constructed ``FIFOBundleQueue``.
    """
    queue = FIFOBundleQueue()
    return queue
7+
8+
9+
__all__ = ["BundleQueue", "create_bundle_queue"]
python/ray/data/_internal/execution/bundle_queue/bundle_queue.py

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
import abc
2+
from typing import TYPE_CHECKING, Optional
3+
4+
if TYPE_CHECKING:
5+
from ray.data._internal.execution.interfaces import RefBundle
6+
7+
8+
class BundleQueue(abc.ABC):
    """Abstract interface for a queue of ``RefBundle`` objects.

    The interface does not fix the order in which bundles are popped; each
    concrete subclass decides which bundle counts as the "head".
    """

    @abc.abstractmethod
    def __len__(self) -> int:
        """Count the bundles currently held by the queue."""

    @abc.abstractmethod
    def __contains__(self, bundle: "RefBundle") -> bool:
        """Tell whether ``bundle`` is currently held by the queue."""

    @abc.abstractmethod
    def add(self, bundle: "RefBundle") -> None:
        """Insert ``bundle`` into the queue."""

    @abc.abstractmethod
    def pop(self) -> "RefBundle":
        """Take the head bundle out of the queue and hand it back.

        Raises:
            IndexError: If the queue is empty.
        """

    @abc.abstractmethod
    def peek(self) -> Optional["RefBundle"]:
        """Look at the head bundle while leaving it in the queue.

        Returns:
            The head bundle, or ``None`` when the queue is empty.
        """

    @abc.abstractmethod
    def remove(self, bundle: "RefBundle"):
        """Take ``bundle`` out of the queue."""

    @abc.abstractmethod
    def clear(self):
        """Drop every bundle from the queue."""

    @abc.abstractmethod
    def estimate_size_bytes(self) -> int:
        """Estimate the combined size, in bytes, of the objects in the queue."""

    @abc.abstractmethod
    def is_empty(self):
        """Tell whether the queue and all of its internal data structures are empty.

        This method is used for testing.
        """
python/ray/data/_internal/execution/bundle_queue/fifo_bundle_queue.py

Lines changed: 129 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,129 @@
1+
from collections import defaultdict, deque
2+
from dataclasses import dataclass
3+
from typing import TYPE_CHECKING, Dict, List, Optional
4+
5+
from .bundle_queue import BundleQueue
6+
7+
if TYPE_CHECKING:
8+
from ray.data._internal.execution.interfaces import RefBundle
9+
10+
11+
@dataclass
12+
class _Node:
13+
value: "RefBundle"
14+
next: Optional["_Node"] = None
15+
prev: Optional["_Node"] = None
16+
17+
18+
class FIFOBundleQueue(BundleQueue):
    """A bundle queue that follows a first-in-first-out policy.

    Bundles are kept in a hand-rolled doubly linked list, and a side index maps
    each bundle to its node(s), so removing an arbitrary bundle does not require
    scanning the queue.
    """

    def __init__(self):
        # We manually implement a linked list because we need to remove elements
        # efficiently, and Python's built-in data structures have O(n) removal time.
        self._head: Optional[_Node] = None
        self._tail: Optional[_Node] = None
        # We use a dictionary to keep track of the nodes corresponding to each bundle.
        # This allows us to remove a bundle from the queue in O(1) time. Each value is
        # a deque because a bundle can be added to the queue multiple times; nodes in
        # each deque are insertion-ordered, and `remove` pops from the left.
        self._bundle_to_nodes: Dict["RefBundle", "deque[_Node]"] = defaultdict(deque)

        # Running totals, kept in sync by `add`, `remove` and `clear`.
        self._nbytes = 0
        self._num_bundles = 0

    def __len__(self) -> int:
        """Return the number of bundles in the queue, counting duplicates."""
        return self._num_bundles

    def __contains__(self, bundle: "RefBundle") -> bool:
        """Return whether at least one instance of the bundle is in the queue."""
        # `in` on a defaultdict does not create a missing entry.
        return bundle in self._bundle_to_nodes

    def add(self, bundle: "RefBundle") -> None:
        """Add a bundle to the end (right) of the queue."""
        new_node = _Node(value=bundle, next=None, prev=self._tail)
        # Case 1: The queue is empty.
        if self._head is None:
            assert self._tail is None
            self._head = new_node
            self._tail = new_node
        # Case 2: The queue has at least one element.
        else:
            self._tail.next = new_node
            self._tail = new_node

        self._bundle_to_nodes[bundle].append(new_node)

        self._nbytes += bundle.size_bytes()
        self._num_bundles += 1

    def pop(self) -> "RefBundle":
        """Remove and return the first (left) bundle in the queue.

        Raises:
            IndexError: If the queue is empty.
        """
        if self._head is None:
            raise IndexError("You can't pop from an empty queue")

        bundle = self._head.value
        # `remove` takes out the earliest-added node for `bundle`, which is the
        # head: every other node was added after it.
        self.remove(bundle)

        return bundle

    def peek(self) -> Optional["RefBundle"]:
        """Return the first (left) bundle in the queue without removing it.

        Returns:
            The head bundle, or `None` if the queue is empty.
        """
        if self._head is None:
            return None

        return self._head.value

    def remove(self, bundle: "RefBundle"):
        """Remove a bundle from the queue.

        If there are multiple instances of the bundle in the queue, this method only
        removes the first one.

        Returns:
            The bundle stored in the removed node.

        Raises:
            ValueError: If the bundle is not in the queue.
        """
        # `.get` avoids creating an empty entry in the defaultdict for a bundle
        # that was never added.
        nodes = self._bundle_to_nodes.get(bundle)
        if not nodes:
            raise ValueError(f"The bundle {bundle} is not in the queue.")

        node = nodes.popleft()
        if not nodes:
            # Drop the key so that `__contains__` and `is_empty` stay accurate.
            del self._bundle_to_nodes[bundle]

        # Unlink the node. A missing neighbour means the node was the head and/or
        # the tail, in which case the corresponding end of the queue moves.
        if node.prev is None:
            self._head = node.next
        else:
            node.prev.next = node.next
        if node.next is None:
            self._tail = node.prev
        else:
            node.next.prev = node.prev
        # Detach the removed node so it no longer points into the live list.
        node.next = None
        node.prev = None

        self._nbytes -= bundle.size_bytes()
        assert self._nbytes >= 0, (
            "Expected the total size of objects in the queue to be non-negative, but "
            f"got {self._nbytes} bytes instead."
        )

        self._num_bundles -= 1

        return node.value

    def clear(self):
        """Remove all bundles from the queue and reset the running totals."""
        self._head = None
        self._tail = None
        self._bundle_to_nodes.clear()
        self._nbytes = 0
        self._num_bundles = 0

    def estimate_size_bytes(self) -> int:
        """Return the running total of `size_bytes()` over the queued bundles."""
        return self._nbytes

    def is_empty(self):
        """Return whether the node index is empty and the list has no head or tail.

        This method is used for testing.
        """
        return not self._bundle_to_nodes and self._head is None and self._tail is None

python/ray/data/_internal/execution/interfaces/op_runtime_metrics.py

Lines changed: 35 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
from typing import TYPE_CHECKING, Any, Dict, List, Optional
55

66
import ray
7+
from ray.data._internal.execution.bundle_queue import create_bundle_queue
78
from ray.data._internal.execution.interfaces.ref_bundle import RefBundle
89
from ray.data._internal.memory_tracing import trace_allocation
910

@@ -267,31 +268,11 @@ class OpRuntimeMetrics(metaclass=OpRuntimesMetricsMeta):
267268
description="Number of blocks in operator's internal input queue.",
268269
metrics_group=MetricsGroup.OBJECT_STORE_MEMORY,
269270
)
270-
obj_store_mem_internal_inqueue: int = metric_field(
271-
default=0,
272-
description=(
273-
"Byte size of input blocks in the operator's internal input queue."
274-
),
275-
metrics_group=MetricsGroup.OBJECT_STORE_MEMORY,
276-
)
277271
obj_store_mem_internal_outqueue_blocks: int = metric_field(
278272
default=0,
279273
description="Number of blocks in the operator's internal output queue.",
280274
metrics_group=MetricsGroup.OBJECT_STORE_MEMORY,
281275
)
282-
obj_store_mem_internal_outqueue: int = metric_field(
283-
default=0,
284-
description=(
285-
"Byte size of output blocks in the operator's internal output queue."
286-
),
287-
metrics_group=MetricsGroup.OBJECT_STORE_MEMORY,
288-
)
289-
obj_store_mem_pending_task_inputs: int = metric_field(
290-
default=0,
291-
description="Byte size of input blocks used by pending tasks.",
292-
metrics_group=MetricsGroup.OBJECT_STORE_MEMORY,
293-
map_only=True,
294-
)
295276
obj_store_mem_freed: int = metric_field(
296277
default=0,
297278
description="Byte size of freed memory in object store.",
@@ -323,6 +304,10 @@ def __init__(self, op: "PhysicalOperator"):
323304
# Start time of current pause due to task submission backpressure
324305
self._task_submission_backpressure_start_time = -1
325306

307+
self._internal_inqueue = create_bundle_queue()
308+
self._internal_outqueue = create_bundle_queue()
309+
self._pending_task_inputs = create_bundle_queue()
310+
326311
@property
327312
def extra_metrics(self) -> Dict[str, Any]:
328313
"""Return a dict of extra metrics."""
@@ -377,6 +362,30 @@ def average_bytes_per_output(self) -> Optional[float]:
377362
else:
378363
return self.bytes_task_outputs_generated / self.num_task_outputs_generated
379364

365+
@metric_property(
366+
description="Byte size of input blocks in the operator's internal input queue.",
367+
metrics_group=MetricsGroup.OBJECT_STORE_MEMORY,
368+
)
369+
def obj_store_mem_internal_inqueue(self) -> int:
370+
return self._internal_inqueue.estimate_size_bytes()
371+
372+
@metric_property(
373+
description=(
374+
"Byte size of output blocks in the operator's internal output queue."
375+
),
376+
metrics_group=MetricsGroup.OBJECT_STORE_MEMORY,
377+
)
378+
def obj_store_mem_internal_outqueue(self) -> int:
379+
return self._internal_outqueue.estimate_size_bytes()
380+
381+
@metric_property(
382+
description="Byte size of input blocks used by pending tasks.",
383+
metrics_group=MetricsGroup.OBJECT_STORE_MEMORY,
384+
map_only=True,
385+
)
386+
def obj_store_mem_pending_task_inputs(self) -> int:
387+
return self._pending_task_inputs.estimate_size_bytes()
388+
380389
@property
381390
def obj_store_mem_pending_task_outputs(self) -> Optional[float]:
382391
"""Estimated size in bytes of output blocks in Ray generator buffers.
@@ -454,13 +463,13 @@ def on_input_received(self, input: RefBundle):
454463
def on_input_queued(self, input: RefBundle):
455464
"""Callback when the operator queues an input."""
456465
self.obj_store_mem_internal_inqueue_blocks += len(input.blocks)
457-
self.obj_store_mem_internal_inqueue += input.size_bytes()
466+
self._internal_inqueue.add(input)
458467

459468
def on_input_dequeued(self, input: RefBundle):
460469
"""Callback when the operator dequeues an input."""
461470
self.obj_store_mem_internal_inqueue_blocks -= len(input.blocks)
462471
input_size = input.size_bytes()
463-
self.obj_store_mem_internal_inqueue -= input_size
472+
self._internal_inqueue.remove(input)
464473
assert self.obj_store_mem_internal_inqueue >= 0, (
465474
self._op,
466475
self.obj_store_mem_internal_inqueue,
@@ -470,13 +479,13 @@ def on_input_dequeued(self, input: RefBundle):
470479
def on_output_queued(self, output: RefBundle):
471480
"""Callback when an output is queued by the operator."""
472481
self.obj_store_mem_internal_outqueue_blocks += len(output.blocks)
473-
self.obj_store_mem_internal_outqueue += output.size_bytes()
482+
self._internal_outqueue.add(output)
474483

475484
def on_output_dequeued(self, output: RefBundle):
476485
"""Callback when an output is dequeued by the operator."""
477486
self.obj_store_mem_internal_outqueue_blocks -= len(output.blocks)
478487
output_size = output.size_bytes()
479-
self.obj_store_mem_internal_outqueue -= output_size
488+
self._internal_outqueue.remove(output)
480489
assert self.obj_store_mem_internal_outqueue >= 0, (
481490
self._op,
482491
self.obj_store_mem_internal_outqueue,
@@ -504,7 +513,7 @@ def on_task_submitted(self, task_index: int, inputs: RefBundle):
504513
self.num_tasks_submitted += 1
505514
self.num_tasks_running += 1
506515
self.bytes_inputs_of_submitted_tasks += inputs.size_bytes()
507-
self.obj_store_mem_pending_task_inputs += inputs.size_bytes()
516+
self._pending_task_inputs.add(inputs)
508517
self._running_tasks[task_index] = RunningTaskInfo(inputs, 0, 0)
509518

510519
def on_task_output_generated(self, task_index: int, output: RefBundle):
@@ -544,7 +553,7 @@ def on_task_finished(self, task_index: int, exception: Optional[Exception]):
544553
total_input_size = inputs.size_bytes()
545554
self.bytes_task_inputs_processed += total_input_size
546555
input_size = inputs.size_bytes()
547-
self.obj_store_mem_pending_task_inputs -= input_size
556+
self._pending_task_inputs.remove(inputs)
548557
assert self.obj_store_mem_pending_task_inputs >= 0, (
549558
self._op,
550559
self.obj_store_mem_pending_task_inputs,

python/ray/data/_internal/execution/operators/actor_pool_map_operator.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
import collections
21
import logging
32
from dataclasses import dataclass
43
from typing import Any, Callable, Dict, Iterator, List, Optional, Tuple, Union
@@ -8,6 +7,7 @@
87
from ray.core.generated import gcs_pb2
98
from ray.data._internal.compute import ActorPoolStrategy
109
from ray.data._internal.execution.autoscaler import AutoscalingActorPool
10+
from ray.data._internal.execution.bundle_queue import create_bundle_queue
1111
from ray.data._internal.execution.interfaces import (
1212
ExecutionOptions,
1313
ExecutionResources,
@@ -109,7 +109,7 @@ def __init__(
109109

110110
self._actor_pool = _ActorPool(compute_strategy, self._start_actor)
111111
# A queue of bundles awaiting dispatch to actors.
112-
self._bundle_queue = collections.deque()
112+
self._bundle_queue = create_bundle_queue()
113113
# Cached actor class.
114114
self._cls = None
115115
# Whether no more submittable bundles will be added.
@@ -175,7 +175,7 @@ def _task_done_callback(res_ref):
175175
return actor, res_ref
176176

177177
def _add_bundled_input(self, bundle: RefBundle):
178-
self._bundle_queue.append(bundle)
178+
self._bundle_queue.add(bundle)
179179
self._metrics.on_input_queued(bundle)
180180
# Try to dispatch all bundles in the queue, including this new bundle.
181181
self._dispatch_tasks()
@@ -191,14 +191,14 @@ def _dispatch_tasks(self):
191191
while self._bundle_queue:
192192
# Pick an actor from the pool.
193193
if self._actor_locality_enabled:
194-
actor = self._actor_pool.pick_actor(self._bundle_queue[0])
194+
actor = self._actor_pool.pick_actor(self._bundle_queue.peek())
195195
else:
196196
actor = self._actor_pool.pick_actor()
197197
if actor is None:
198198
# No actors available for executing the next task.
199199
break
200200
# Submit the map task.
201-
bundle = self._bundle_queue.popleft()
201+
bundle = self._bundle_queue.pop()
202202
self._metrics.on_input_dequeued(bundle)
203203
input_blocks = [block for block, _ in bundle.blocks]
204204
ctx = TaskContext(

0 commit comments

Comments
 (0)