[BUG] Use shared thread pool for multiple running instances of df on pyrunner #2502

Merged · 3 commits · Jul 18, 2024

Changes from 2 commits
198 changes: 100 additions & 98 deletions daft/runners/pyrunner.py
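The gist of the change, as a minimal sketch with hypothetical RunnerBefore/RunnerAfter classes (not Daft's actual API): previously each execution of _physical_plan_to_partitions created its own ThreadPoolExecutor, so several dataframes running concurrently on one PyRunner could each spin up a full-size pool; after this PR a single executor lives on the runner instance and is shared by all executions.

from concurrent import futures

class RunnerBefore:
    def run(self, tasks):
        # Per-execution pool: a fresh executor for every run() call.
        with futures.ThreadPoolExecutor() as pool:
            return [f.result() for f in [pool.submit(t) for t in tasks]]

class RunnerAfter:
    def __init__(self):
        # One pool for the lifetime of the runner, shared by all runs.
        self._thread_pool = futures.ThreadPoolExecutor()

    def run(self, tasks):
        fs = [self._thread_pool.submit(t) for t in tasks]
        return [f.result() for f in fs]

runner = RunnerAfter()
print(runner.run([lambda: 1 + 1, lambda: 2 * 2]))  # [2, 4]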
@@ -3,7 +3,7 @@
 import logging
 from concurrent import futures
 from dataclasses import dataclass
-from typing import Iterable, Iterator
+from typing import Iterator
 
 from daft.context import get_context
 from daft.daft import FileFormatConfig, FileInfos, IOConfig, ResourceRequest, SystemInfo

@@ -116,7 +116,14 @@ def glob_paths_details(
 class PyRunner(Runner[MicroPartition]):
     def __init__(self, use_thread_pool: bool | None) -> None:
         super().__init__()
+
         self._use_thread_pool: bool = use_thread_pool if use_thread_pool is not None else True
+        self._thread_pool = futures.ThreadPoolExecutor()
+
+        # Global accounting of tasks and resources
+        self._inflight_tasks_resources: dict[str, ResourceRequest] = dict()
+        self._inflight_tasks: dict[str, PartitionTask] = dict()
+
         system_info = SystemInfo()
         num_cpus = system_info.cpu_count()
         if num_cpus is None:
@@ -210,113 +217,108 @@ def run_iter_tables(
     def _physical_plan_to_partitions(
         self, plan: physical_plan.MaterializedPhysicalPlan[MicroPartition]
     ) -> Iterator[PyMaterializedResult]:
-        inflight_tasks: dict[str, PartitionTask] = dict()
-        inflight_tasks_resources: dict[str, ResourceRequest] = dict()
         future_to_task: dict[futures.Future, str] = dict()
 
         pbar = ProgressBar(use_ray_tqdm=False)
-        with futures.ThreadPoolExecutor() as thread_pool:
-            try:
-                next_step = next(plan)
-
-                # Dispatch->Await loop.
-                while True:
-                    # Dispatch loop.
-                    while True:
-                        if next_step is None:
-                            # Blocked on already dispatched tasks; await some tasks.
-                            break
-
-                        elif isinstance(next_step, MaterializedResult):
-                            assert isinstance(next_step, PyMaterializedResult)
-
-                            # A final result.
-                            yield next_step
-                            next_step = next(plan)
-                            continue
-
-                        elif not self._can_admit_task(
-                            next_step.resource_request,
-                            inflight_tasks_resources.values(),
-                        ):
-                            # Insufficient resources; await some tasks.
-                            break
-
-                        else:
-                            # next_task is a task to run.
-
-                            # Run the task in the main thread, instead of the thread pool, in certain conditions:
-                            # - Threading is disabled in runner config.
-                            # - Task is a no-op.
-                            # - Task requires GPU.
-                            # TODO(charles): Queue these up until the physical plan is blocked to avoid starving cluster.
-                            if (
-                                not self._use_thread_pool
-                                or len(next_step.instructions) == 0
-                                or (
-                                    next_step.resource_request.num_gpus is not None
-                                    and next_step.resource_request.num_gpus > 0
-                                )
-                            ):
-                                logger.debug(
-                                    "Running task synchronously in main thread: %s",
-                                    next_step,
-                                )
-                                materialized_results = self.build_partitions(
-                                    next_step.instructions,
-                                    next_step.inputs,
-                                    next_step.partial_metadatas,
-                                )
-                                next_step.set_result(materialized_results)
-
-                            else:
-                                # Submit the task for execution.
-                                logger.debug("Submitting task for execution: %s", next_step)
-
-                                # update progress bar
-                                pbar.mark_task_start(next_step)
-
-                                future = thread_pool.submit(
-                                    self.build_partitions,
-                                    next_step.instructions,
-                                    next_step.inputs,
-                                    next_step.partial_metadatas,
-                                )
-                                # Register the inflight task and resources used.
-                                future_to_task[future] = next_step.id()
-
-                                inflight_tasks[next_step.id()] = next_step
-                                inflight_tasks_resources[next_step.id()] = next_step.resource_request
-
-                            next_step = next(plan)
-
-                    # Await at least one task and process the results.
-                    assert (
-                        len(future_to_task) > 0
-                    ), "Scheduler deadlocked! This should never happen. Please file an issue."
-                    done_set, _ = futures.wait(list(future_to_task.keys()), return_when=futures.FIRST_COMPLETED)
-                    for done_future in done_set:
-                        done_id = future_to_task.pop(done_future)
-                        del inflight_tasks_resources[done_id]
-                        done_task = inflight_tasks.pop(done_id)
-                        materialized_results = done_future.result()
-
-                        pbar.mark_task_done(done_task)
-
-                        logger.debug(
-                            "Task completed: %s -> <%s partitions>",
-                            done_id,
-                            len(materialized_results),
-                        )
-
-                        done_task.set_result(materialized_results)
-
-                    if next_step is None:
-                        next_step = next(plan)
-
-            except StopIteration:
-                pbar.close()
-                return
+        try:
+            next_step = next(plan)
+
+            # Dispatch->Await loop.
+            while True:
+                # Dispatch loop.
+                while True:
+                    if next_step is None:
+                        # Blocked on already dispatched tasks; await some tasks.
+                        break
+
+                    elif isinstance(next_step, MaterializedResult):
+                        assert isinstance(next_step, PyMaterializedResult)
+
+                        # A final result.
+                        yield next_step
+                        next_step = next(plan)
+                        continue
+
+                    elif not self._can_admit_task(
+                        next_step.resource_request,
+                    ):
+                        # Insufficient resources; await some tasks.
+                        break
+
+                    else:
+                        # next_task is a task to run.
+
+                        # Run the task in the main thread, instead of the thread pool, in certain conditions:
+                        # - Threading is disabled in runner config.
+                        # - Task is a no-op.
+                        # - Task requires GPU.
+                        # TODO(charles): Queue these up until the physical plan is blocked to avoid starving cluster.
+                        if (
+                            not self._use_thread_pool
+                            or len(next_step.instructions) == 0
+                            or (
+                                next_step.resource_request.num_gpus is not None
+                                and next_step.resource_request.num_gpus > 0
+                            )
+                        ):
+                            logger.debug(
+                                "Running task synchronously in main thread: %s",
+                                next_step,
+                            )
+                            materialized_results = self.build_partitions(
+                                next_step.instructions,
+                                next_step.inputs,
+                                next_step.partial_metadatas,
+                            )
+                            next_step.set_result(materialized_results)
+
+                        else:
+                            # Submit the task for execution.
+                            logger.debug("Submitting task for execution: %s", next_step)
+
+                            # update progress bar
+                            pbar.mark_task_start(next_step)
+
+                            future = self._thread_pool.submit(
+                                self.build_partitions,
+                                next_step.instructions,
+                                next_step.inputs,
+                                next_step.partial_metadatas,
+                            )
+                            # Register the inflight task and resources used.
+                            future_to_task[future] = next_step.id()
+
+                            self._inflight_tasks[next_step.id()] = next_step
Review thread on the line above:

Member:
I don't know if the id is guaranteed to be unique across dataframe executions? Let's add an assert that:

    next_step.id() not in self._inflight_tasks_resources
Contributor (Author):
Just checked: I think it should be.

We have a singleton ID_GEN = itertools.count() that is used to generate the IDs, and it is supposedly threadsafe thanks to the GIL.
Contributor (Author):
(Also added the assert)
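To illustrate the claim in this thread: a minimal sketch, assuming CPython, where ID_GEN is a stand-in for the singleton counter mentioned above. next() on an itertools.count resolves in a single C-level call, so concurrent callers should not observe duplicate values while the GIL is held; the final assert mirrors the reviewer's suggested guard.

import itertools
from concurrent.futures import ThreadPoolExecutor

ID_GEN = itertools.count()  # stand-in for the singleton ID generator

def make_task_id() -> str:
    return f"task-{next(ID_GEN)}"

with ThreadPoolExecutor(max_workers=8) as pool:
    ids = list(pool.map(lambda _: make_task_id(), range(10_000)))

# Mirrors the suggested assert: an ID must never repeat across threads.
assert len(ids) == len(set(ids)), "IDs collided across threads"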

+                            self._inflight_tasks_resources[next_step.id()] = next_step.resource_request
+
+                        next_step = next(plan)
+
+                # Await at least one task and process the results.
+                assert len(future_to_task) > 0, "Scheduler deadlocked! This should never happen. Please file an issue."
+                done_set, _ = futures.wait(list(future_to_task.keys()), return_when=futures.FIRST_COMPLETED)
+                for done_future in done_set:
+                    done_id = future_to_task.pop(done_future)
+                    del self._inflight_tasks_resources[done_id]
+                    done_task = self._inflight_tasks.pop(done_id)
+                    materialized_results = done_future.result()
+
+                    pbar.mark_task_done(done_task)
+
+                    logger.debug(
+                        "Task completed: %s -> <%s partitions>",
+                        done_id,
+                        len(materialized_results),
+                    )
+
+                    done_task.set_result(materialized_results)
+
+                if next_step is None:
+                    next_step = next(plan)
+
+        except StopIteration:
+            pbar.close()
+            return
 
     def _check_resource_requests(self, resource_request: ResourceRequest) -> None:
         """Validates that the requested ResourceRequest is possible to run locally"""
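For readers unfamiliar with the Dispatch->Await structure above, a stripped-down sketch of the same pattern with a hypothetical work function (the real loop additionally handles resource admission, main-thread fallbacks, and the progress bar):

import time
from concurrent import futures

def work(n: int) -> int:
    time.sleep(0.01)
    return n * n

pool = futures.ThreadPoolExecutor(max_workers=4)
pending: dict[futures.Future, int] = {}

# Dispatch: submit tasks and record them as inflight.
for task_id in range(8):
    pending[pool.submit(work, task_id)] = task_id

# Await: block until at least one task finishes, then reap completions.
while pending:
    done, _ = futures.wait(list(pending), return_when=futures.FIRST_COMPLETED)
    for fut in done:
        print(f"task {pending.pop(fut)} -> {fut.result()}")

pool.shutdown()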

@@ -333,10 +335,10 @@ def _check_resource_requests(self, resource_request: ResourceRequest) -> None:
     def _can_admit_task(
         self,
         resource_request: ResourceRequest,
-        inflight_resources: Iterable[ResourceRequest],
     ) -> bool:
         self._check_resource_requests(resource_request)
 
+        inflight_resources = self._inflight_tasks_resources.values()
         total_inflight_resources: ResourceRequest = sum(inflight_resources, ResourceRequest())
         cpus_okay = (total_inflight_resources.num_cpus or 0) + (resource_request.num_cpus or 0) <= self.num_cpus
         gpus_okay = (total_inflight_resources.num_gpus or 0) + (resource_request.num_gpus or 0) <= self.num_gpus
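The admission check is plain accounting: sum the resource requests of every inflight task and admit a new task only if adding its request stays within machine capacity. A sketch with a hypothetical Request dataclass standing in for ResourceRequest (whose support for sum() with a zero-valued start the code above relies on):

from dataclasses import dataclass

@dataclass
class Request:  # hypothetical stand-in for ResourceRequest
    num_cpus: float | None = None
    num_gpus: float | None = None

    def __add__(self, other: "Request") -> "Request":
        return Request(
            (self.num_cpus or 0) + (other.num_cpus or 0),
            (self.num_gpus or 0) + (other.num_gpus or 0),
        )

def can_admit(req: Request, inflight: list[Request], cpus: int, gpus: int) -> bool:
    total = sum(inflight, Request())  # same pattern as sum(..., ResourceRequest())
    cpus_okay = (total.num_cpus or 0) + (req.num_cpus or 0) <= cpus
    gpus_okay = (total.num_gpus or 0) + (req.num_gpus or 0) <= gpus
    return cpus_okay and gpus_okay

print(can_admit(Request(num_cpus=2), [Request(num_cpus=6)], cpus=8, gpus=1))  # True
print(can_admit(Request(num_cpus=4), [Request(num_cpus=6)], cpus=8, gpus=1))  # False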