Commit 1e4d129

fix: Account for unschedulable udf actors (#4987)
## Changes Made

Raises an error when the actors for actor UDFs (i.e. concurrency UDFs) cannot be scheduled due to resource constraints. This is implemented via a timeout (default 60s) on the readiness check for the actors.

Additionally, each swordfish task that needs to run actor UDFs now receives the full list of actor handles and checks which actors are ready and local. This accounts for the fact that UDF actors can be spawned incrementally and, when killed, can respawn on different nodes.

## Related Issues

Closes #3934

## Checklist

- [ ] Documented in API Docs (if applicable)
- [ ] Documented in User Guide (if applicable)
- [ ] If adding a new documentation page, doc is added to `docs/mkdocs.yml` navigation
- [ ] Documentation builds and is formatted properly (tag @/ccmao1130 for docs review)
1 parent 10ccc4a commit 1e4d129

18 files changed: +202 -106 lines changed

daft/context.py

Lines changed: 3 additions & 0 deletions

@@ -204,6 +204,7 @@ def set_execution_config(
     native_parquet_writer: bool | None = None,
     use_legacy_ray_runner: bool | None = None,
     min_cpu_per_task: float | None = None,
+    actor_udf_ready_timeout: int | None = None,
 ) -> DaftContext:
     """Globally sets various configuration parameters which control various aspects of Daft execution.

@@ -253,6 +254,7 @@ def set_execution_config(
         native_parquet_writer: Whether to use the native parquet writer vs the pyarrow parquet writer. Defaults to `True`.
         use_legacy_ray_runner: Whether to use the legacy ray runner. Defaults to `False`.
         min_cpu_per_task: Minimum CPU per task in the Ray runner. Defaults to 0.5.
+        actor_udf_ready_timeout: Timeout for UDF actors to be ready. Defaults to 60 seconds.
     """
     # Replace values in the DaftExecutionConfig with user-specified overrides
     ctx = get_context()
@@ -289,6 +291,7 @@ def set_execution_config(
         native_parquet_writer=native_parquet_writer,
         use_legacy_ray_runner=use_legacy_ray_runner,
         min_cpu_per_task=min_cpu_per_task,
+        actor_udf_ready_timeout=actor_udf_ready_timeout,
     )

     ctx._ctx._daft_execution_config = new_daft_execution_config
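
Below is a minimal usage sketch (not part of this commit) of the new `actor_udf_ready_timeout` setting that this diff exposes. The data and the `add_one` UDF are illustrative placeholders; it assumes the Ray runner is active and that actor ("concurrency") UDFs are declared via the `concurrency` argument to `@daft.udf`.

```python
import daft

# Assumption: running against a Ray cluster, where UDF actors are spawned.
daft.context.set_runner_ray()

# Give slow-to-schedule UDF actors more time before Daft raises an error
# (the default added by this commit is 60 seconds).
daft.set_execution_config(actor_udf_ready_timeout=120)

# A concurrency (actor) UDF: Daft will try to spawn 2 UDF actors for it.
@daft.udf(return_dtype=daft.DataType.int64(), concurrency=2)
def add_one(col: daft.Series):
    return [x + 1 for x in col.to_pylist()]

df = daft.from_pydict({"x": [1, 2, 3]}).with_column("y", add_one(daft.col("x")))
df.collect()
```

If the requested actors cannot be scheduled within the timeout, the readiness check raises the `RuntimeError` shown in the `daft/execution/ray_actor_pool_udf.py` diff below.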

daft/daft/__init__.pyi

Lines changed: 3 additions & 0 deletions

@@ -1960,6 +1960,7 @@ class PyDaftExecutionConfig:
         native_parquet_writer: bool | None = None,
         use_legacy_ray_runner: bool | None = None,
         min_cpu_per_task: float | None = None,
+        actor_udf_ready_timeout: int | None = None,
     ) -> PyDaftExecutionConfig: ...
     @property
     def scan_tasks_min_size_bytes(self) -> int: ...
@@ -2012,6 +2013,8 @@ class PyDaftExecutionConfig:
     @property
     def min_cpu_per_task(self) -> float: ...
     @property
+    def actor_udf_ready_timeout(self) -> int: ...
+    @property
     def scantask_max_parallel(self) -> int: ...

 class PyDaftPlanningConfig:

daft/execution/ray_actor_pool_udf.py

Lines changed: 45 additions & 9 deletions

@@ -7,6 +7,8 @@
 from daft.recordbatch.micropartition import MicroPartition

 if TYPE_CHECKING:
+    from ray.actor import ActorHandle as RayActorHandle
+
     from daft.daft import PyExpr, PyMicroPartition

 try:
@@ -36,30 +38,50 @@ def eval_input(self, input: PyMicroPartition) -> PyMicroPartition:


 class UDFActorHandle:
-    def __init__(self, node_id: str, actor_ref: ray.ObjectRef) -> None:
-        self.node_id = node_id
+    def __init__(self, actor_ref: RayActorHandle) -> None:
         self.actor = actor_ref

+    def actor_id(self) -> str:
+        return self.actor._actor_id.hex()
+
     async def eval_input(self, input: PyMicroPartition) -> PyMicroPartition:
         return await self.actor.eval_input.remote(input)

-    def is_on_current_node(self) -> bool:
-        return self.node_id == ray.get_runtime_context().get_node_id()
-
     def teardown(self) -> None:
         ray.kill(self.actor)


+def get_ready_actors_by_location(
+    actor_handles: list[UDFActorHandle],
+) -> tuple[list[UDFActorHandle], list[UDFActorHandle]]:
+    from ray._private.state import actors
+
+    current_node_id = ray.get_runtime_context().get_node_id()
+
+    local_actors = []
+    remote_actors = []
+    for actor_handle in actor_handles:
+        actor_id = actor_handle.actor_id()
+        actor_state = actors(actor_id)
+        if actor_state["Address"]["NodeID"] == current_node_id:
+            local_actors.append(actor_handle)
+        else:
+            remote_actors.append(actor_handle)
+
+    return local_actors, remote_actors
+
+
 async def start_udf_actors(
     projection: list[PyExpr],
     num_actors: int,
     num_gpus_per_actor: float,
     num_cpus_per_actor: float,
     memory_per_actor: float,
+    timeout: int,
 ) -> list[UDFActorHandle]:
     expr_projection = ExpressionsProjection([Expression._from_pyexpr(expr) for expr in projection])

-    actors = [
+    actors: list[RayActorHandle] = [
         UDFActor.options(  # type: ignore
             scheduling_strategy="SPREAD",
             num_gpus=num_gpus_per_actor,
@@ -68,6 +90,20 @@ async def start_udf_actors(
         ).remote(expr_projection)
         for _ in range(num_actors)
     ]
-    node_ids = await asyncio.gather(*[actor.get_node_id.remote() for actor in actors])
-    handles = [UDFActorHandle(node_id, actor) for actor, node_id in zip(actors, node_ids)]
-    return handles
+
+    # Wait for actors to be ready
+    ready_futures = [asyncio.wrap_future(actor.__ray_ready__.remote().future()) for actor in actors]
+    ready_refs, _ = await asyncio.wait(ready_futures, return_when=asyncio.ALL_COMPLETED, timeout=timeout)
+
+    # Verify that the __ray_ready__ calls were successful
+    await asyncio.gather(*ready_refs)
+
+    if not ready_refs:
+        raise RuntimeError(
+            f"UDF actors failed to start within {timeout} seconds, please increase the actor_udf_ready_timeout config via daft.set_execution_config(actor_udf_ready_timeout=timeout)"
+        )
+
+    # Return the ready actors
+    ready_indices = [ready_futures.index(ref) for ref in ready_refs]
+    ready_actors = [UDFActorHandle(actors[i]) for i in ready_indices]
+    return ready_actors
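
As a rough sketch of how a task-side consumer could use the new `get_ready_actors_by_location` helper to realize the "ready + local" check described in the PR body: the `pick_actors` wrapper below is purely illustrative and not part of this commit, and it assumes the names defined above in `daft/execution/ray_actor_pool_udf.py` are in scope.

```python
def pick_actors(actor_handles: list[UDFActorHandle]) -> list[UDFActorHandle]:
    # Split the full list of handles by whether each actor currently lives
    # on this task's node.
    local_actors, remote_actors = get_ready_actors_by_location(actor_handles)
    # Prefer local actors to avoid shipping micropartitions across nodes;
    # fall back to remote actors if every actor respawned elsewhere.
    return local_actors if local_actors else remote_actors
```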

src/common/daft-config/src/lib.rs

Lines changed: 2 additions & 0 deletions

@@ -74,6 +74,7 @@ pub struct DaftExecutionConfig {
     pub native_parquet_writer: bool,
     pub use_legacy_ray_runner: bool,
     pub min_cpu_per_task: f64,
+    pub actor_udf_ready_timeout: usize,
 }

 impl Default for DaftExecutionConfig {
@@ -110,6 +111,7 @@ impl Default for DaftExecutionConfig {
             native_parquet_writer: true,
             use_legacy_ray_runner: false,
             min_cpu_per_task: 0.5,
+            actor_udf_ready_timeout: 60,
         }
     }
 }

src/common/daft-config/src/python.rs

Lines changed: 11 additions & 0 deletions

@@ -118,6 +118,7 @@ impl PyDaftExecutionConfig {
         native_parquet_writer=None,
         use_legacy_ray_runner=None,
         min_cpu_per_task=None,
+        actor_udf_ready_timeout=None,
     ))]
     fn with_config_values(
         &self,
@@ -150,6 +151,7 @@ impl PyDaftExecutionConfig {
         native_parquet_writer: Option<bool>,
         use_legacy_ray_runner: Option<bool>,
         min_cpu_per_task: Option<f64>,
+        actor_udf_ready_timeout: Option<usize>,
     ) -> PyResult<Self> {
         let mut config = self.config.as_ref().clone();

@@ -266,6 +268,10 @@ impl PyDaftExecutionConfig {
             config.min_cpu_per_task = min_cpu_per_task;
         }

+        if let Some(actor_udf_ready_timeout) = actor_udf_ready_timeout {
+            config.actor_udf_ready_timeout = actor_udf_ready_timeout;
+        }
+
         Ok(Self {
             config: Arc::new(config),
         })
@@ -396,6 +402,11 @@ impl PyDaftExecutionConfig {
     fn min_cpu_per_task(&self) -> PyResult<f64> {
         Ok(self.config.min_cpu_per_task)
     }
+
+    #[getter]
+    fn actor_udf_ready_timeout(&self) -> PyResult<usize> {
+        Ok(self.config.actor_udf_ready_timeout)
+    }
 }

 impl_bincode_py_state_serialization!(PyDaftExecutionConfig);

src/daft-distributed/src/pipeline_node/actor_udf.rs

Lines changed: 30 additions & 37 deletions

@@ -14,10 +14,8 @@ use super::{
     PipelineNodeContext, SubmittableTaskStream, TreeDisplay,
 };
 use crate::{
-    scheduling::{
-        scheduler::SubmittableTask,
-        task::{SwordfishTask, Task},
-    },
+    pipeline_node::append_plan_to_existing_task,
+    scheduling::{scheduler::SubmittableTask, task::SwordfishTask},
     stage::{StageConfig, StageExecutionContext},
     utils::{
         channel::{Sender, create_channel},
@@ -37,6 +35,7 @@ impl UDFActors {
     async fn initialize_actors(
         projection: &[BoundExpr],
         udf_properties: &UDFProperties,
+        actor_ready_timeout: usize,
     ) -> DaftResult<Vec<PyObjectWrapper>> {
         let (task_locals, py_exprs) = Python::with_gil(|py| {
             let task_locals = crate::utils::runtime::PYO3_ASYNC_RUNTIME_LOCALS
@@ -77,6 +76,7 @@ impl UDFActors {
                     gpu_request,
                     cpu_request,
                     memory_request,
+                    actor_ready_timeout,
                 ),
             )?;
             pyo3_async_runtimes::tokio::into_future(coroutine)
@@ -98,10 +98,12 @@ impl UDFActors {
         Ok(actors)
     }

-    async fn get_actors(&mut self) -> DaftResult<Vec<PyObjectWrapper>> {
+    async fn get_actors(&mut self, actor_ready_timeout: usize) -> DaftResult<Vec<PyObjectWrapper>> {
         match self {
             Self::Uninitialized(projection, udf_properties) => {
-                let actors = Self::initialize_actors(projection, udf_properties).await?;
+                let actors =
+                    Self::initialize_actors(projection, udf_properties, actor_ready_timeout)
+                        .await?;
                 *self = Self::Initialized {
                     actors: actors.clone(),
                 };
@@ -130,6 +132,7 @@ pub(crate) struct ActorUDF {
     child: Arc<dyn DistributedPipelineNode>,
     projection: Vec<BoundExpr>,
     udf_properties: UDFProperties,
+    actor_ready_timeout: usize,
 }

 impl ActorUDF {
@@ -164,6 +167,7 @@ impl ActorUDF {
             child,
             projection,
             udf_properties,
+            actor_ready_timeout: stage_config.config.actor_udf_ready_timeout,
         })
     }

@@ -181,9 +185,9 @@ impl ActorUDF {

         let mut running_tasks = JoinSet::new();
         while let Some(task) = input_task_stream.next().await {
-            let actors = udf_actors.get_actors().await?;
+            let actors = udf_actors.get_actors(self.actor_ready_timeout).await?;

-            let modified_task = self.append_actor_udf_to_task(task, actors)?;
+            let modified_task = self.append_actor_udf_to_task(task, actors);
             let (submittable_task, notify_token) = modified_task.add_notify_token();
             running_tasks.spawn(notify_token);
             if result_tx.send(submittable_task).await.is_err() {
@@ -202,10 +206,10 @@ impl ActorUDF {
     }

     fn append_actor_udf_to_task(
-        &self,
+        self: &Arc<Self>,
         submittable_task: SubmittableTask<SwordfishTask>,
         actors: Vec<PyObjectWrapper>,
-    ) -> DaftResult<SubmittableTask<SwordfishTask>> {
+    ) -> SubmittableTask<SwordfishTask> {
         let memory_request = self
             .udf_properties
             .resource_request
@@ -214,33 +218,22 @@ impl ActorUDF {
             .map(|m| m as u64)
             .unwrap_or(0);

-        let mut task_context = submittable_task.task().task_context();
-        if let Some(logical_node_id) = self.context.logical_node_id {
-            task_context.add_logical_node_id(logical_node_id);
-        }
-        let task_plan = submittable_task.task().plan();
-        let actor_pool_project_plan = LocalPhysicalPlan::distributed_actor_pool_project(
-            task_plan,
-            actors,
-            self.udf_properties.batch_size,
-            memory_request,
-            self.config.schema.clone(),
-            StatsState::NotMaterialized,
-        );
-
-        // Set scheduling strategy based on whether we have a valid worker ID
-        let scheduling_strategy = submittable_task.task().strategy().clone();
-        let psets = submittable_task.task().psets().clone();
-
-        let task = submittable_task.with_new_task(SwordfishTask::new(
-            task_context,
-            actor_pool_project_plan,
-            self.config.execution_config.clone(),
-            psets,
-            scheduling_strategy,
-            self.context.to_hashmap(),
-        ));
-        Ok(task)
+        let batch_size = self.udf_properties.batch_size;
+        let schema = self.config.schema.clone();
+        append_plan_to_existing_task(
+            submittable_task,
+            &(self.clone() as Arc<dyn DistributedPipelineNode>),
+            &move |input| {
+                LocalPhysicalPlan::distributed_actor_pool_project(
+                    input,
+                    actors.clone(),
+                    batch_size,
+                    memory_request,
+                    schema.clone(),
+                    StatsState::NotMaterialized,
+                )
+            },
+        )
     }

     fn multiline_display(&self) -> Vec<String> {

src/daft-local-execution/src/intermediate_ops/cross_join.rs

Lines changed: 1 addition & 1 deletion

@@ -142,7 +142,7 @@ impl IntermediateOperator for CrossJoinOperator {
         ]
     }

-    fn make_state(&self) -> DaftResult<Self::State> {
+    async fn make_state(&self) -> DaftResult<Self::State> {
         Ok(CrossJoinState::new(self.state_bridge.clone()))
     }
 }
