Skip to content

Commit

Permalink
[BUG] Perform cleanup of tasks and results when iterator is deleted (#…
Browse files Browse the repository at this point in the history
…2812)

Addresses #2810 

Needs some testing to make sure that it fixes all of it

---------

Co-authored-by: Jay Chia <[email protected]@users.noreply.github.com>
  • Loading branch information
jaychia and Jay Chia authored Sep 8, 2024
1 parent 1b79bbb commit 4578e36
Showing 1 changed file with 41 additions and 13 deletions.
54 changes: 41 additions & 13 deletions daft/runners/pyrunner.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import contextlib
import logging
import threading
import uuid
from concurrent import futures
from dataclasses import dataclass
from typing import Iterator
Expand Down Expand Up @@ -34,6 +35,13 @@
logger = logging.getLogger(__name__)


# Unique UUID for each execution
ExecutionID = str

# Unique ID for each task
TaskID = str


class LocalPartitionSet(PartitionSet[MicroPartition]):
_partitions: dict[PartID, MaterializedResult[MicroPartition]]

Expand Down Expand Up @@ -223,7 +231,7 @@ def __init__(self, use_thread_pool: bool | None) -> None:
self._actor_pools: dict[str, PyActorPool] = {}

# Global accounting of tasks and resources
self._inflight_futures: dict[str, futures.Future] = {}
self._inflight_futures: dict[tuple[ExecutionID, TaskID], futures.Future] = {}

system_info = SystemInfo()
num_cpus = system_info.cpu_count()
Expand Down Expand Up @@ -263,6 +271,7 @@ def run_iter(
) -> Iterator[PyMaterializedResult]:
# NOTE: Freeze and use this same execution config for the entire execution
daft_execution_config = get_context().daft_execution_config
execution_id = str(uuid.uuid4())

# Optimize the logical plan.
builder = builder.optimize()
Expand All @@ -277,7 +286,7 @@ def run_iter(
results_buffer_size,
)
del plan_scheduler
results_gen = self._physical_plan_to_partitions(tasks)
results_gen = self._physical_plan_to_partitions(execution_id, tasks)
# if source_id is none that means this is the final stage
if source_id is None:
yield from results_gen
Expand Down Expand Up @@ -310,7 +319,7 @@ def run_iter(
tasks = plan_scheduler.to_partition_tasks(psets, results_buffer_size)
del psets
with profiler("profile_PyRunner.run_{datetime.now().isoformat()}.json"):
results_gen = self._physical_plan_to_partitions(tasks)
results_gen = self._physical_plan_to_partitions(execution_id, tasks)
yield from results_gen

def run_iter_tables(
Expand Down Expand Up @@ -346,7 +355,9 @@ def actor_pool_context(
del self._actor_pools[actor_pool_id]

def _physical_plan_to_partitions(
self, plan: physical_plan.MaterializedPhysicalPlan[MicroPartition]
self,
execution_id: str,
plan: physical_plan.MaterializedPhysicalPlan[MicroPartition],
) -> Iterator[PyMaterializedResult]:
local_futures_to_task: dict[futures.Future, PartitionTask] = {}
pbar = ProgressBar(use_ray_tqdm=False)
Expand All @@ -360,14 +371,16 @@ def _physical_plan_to_partitions(
while True:
if next_step is None:
# Blocked on already dispatched tasks; await some tasks.
logger.debug("Skipping to wait on dispatched tasks: plan waiting on work")
logger.debug(
"execution[%s] Skipping to wait on dispatched tasks: plan waiting on work", execution_id
)
break

elif isinstance(next_step, MaterializedResult):
assert isinstance(next_step, PyMaterializedResult)

# A final result.
logger.debug("Yielding completed step")
logger.debug("execution[%s] Yielding completed step", execution_id)
yield next_step
next_step = next(plan)
continue
Expand All @@ -380,7 +393,10 @@ def _physical_plan_to_partitions(

if not task_admitted:
# Insufficient resources; await some tasks.
logger.debug("Skipping to wait on dispatched tasks: insufficient resources")
logger.debug(
"execution[%s] Skipping to wait on dispatched tasks: insufficient resources",
execution_id,
)
break

# Run the task in the main thread, instead of the thread pool, in certain conditions:
Expand All @@ -397,7 +413,8 @@ def _physical_plan_to_partitions(
)
):
logger.debug(
"Running task synchronously in main thread: %s",
"execution[%s] Running task synchronously in main thread: %s",
execution_id,
next_step,
)
materialized_results = self.build_partitions(
Expand All @@ -412,7 +429,7 @@ def _physical_plan_to_partitions(

else:
# Submit the task for execution.
logger.debug("Submitting task for execution: %s", next_step)
logger.debug("execution[%s] Submitting task for execution: %s", execution_id, next_step)

# update progress bar
pbar.mark_task_start(next_step)
Expand Down Expand Up @@ -443,7 +460,7 @@ def _physical_plan_to_partitions(
assert (
next_step.id() not in local_futures_to_task
), "Step IDs should be unique - this indicates an internal error, please file an issue!"
self._inflight_futures[next_step.id()] = future
self._inflight_futures[(execution_id, next_step.id())] = future
local_futures_to_task[future] = next_step

next_step = next(plan)
Expand All @@ -463,10 +480,11 @@ def _physical_plan_to_partitions(
materialized_results = done_future.result()

pbar.mark_task_done(done_task)
del self._inflight_futures[done_task.id()]
del self._inflight_futures[(execution_id, done_task.id())]

logger.debug(
"Task completed: %s -> <%s partitions>",
"execution[%s] Task completed: %s -> <%s partitions>",
execution_id,
done_task.id(),
len(materialized_results),
)
Expand All @@ -476,9 +494,19 @@ def _physical_plan_to_partitions(
if next_step is None:
next_step = next(plan)

# StopIteration is raised when the plan is exhausted, and all materialized results have been yielded.
except StopIteration:
logger.debug("execution[%s] Exhausted all materialized results", execution_id)

# Perform any cleanups when the generator is closed (StopIteration is raised, generator is deleted with `__del__` on GC, etc)
finally:
# Close the progress bar
pbar.close()
return

# Cleanup any remaining inflight futures/results from this local execution
for (exec_id, task_id), _ in list(self._inflight_futures.items()):
if exec_id == execution_id:
del self._inflight_futures[(exec_id, task_id)]

def _check_resource_requests(self, resource_request: ResourceRequest) -> None:
"""Validates that the requested ResourceRequest is possible to run locally"""
Expand Down

0 comments on commit 4578e36

Please sign in to comment.