Skip to content

Commit

Permalink
[CHORE] Remove daft-execution (#2553)
Browse files Browse the repository at this point in the history
Co-authored-by: Colin Ho <[email protected]>
  • Loading branch information
colin-ho and Colin Ho authored Jul 26, 2024
1 parent 75a5d52 commit 12f92ff
Show file tree
Hide file tree
Showing 55 changed files with 9 additions and 5,493 deletions.
29 changes: 0 additions & 29 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 0 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ daft-compression = {path = "src/daft-compression", default-features = false}
daft-core = {path = "src/daft-core", default-features = false}
daft-csv = {path = "src/daft-csv", default-features = false}
daft-dsl = {path = "src/daft-dsl", default-features = false}
daft-execution = {path = "src/daft-execution", default-features = false}
daft-functions = {path = "src/daft-functions", default-features = false}
daft-io = {path = "src/daft-io", default-features = false}
daft-json = {path = "src/daft-json", default-features = false}
Expand Down Expand Up @@ -35,7 +34,6 @@ python = [
"daft-core/python",
"daft-csv/python",
"daft-dsl/python",
"daft-execution/python",
"daft-local-execution/python",
"daft-io/python",
"daft-json/python",
Expand Down Expand Up @@ -106,7 +104,6 @@ members = [
"src/common/daft-config",
"src/common/system-info",
"src/daft-core",
"src/daft-execution",
"src/daft-local-execution",
"src/daft-io",
"src/daft-parquet",
Expand Down
11 changes: 0 additions & 11 deletions daft/plan_scheduler/physical_plan_scheduler.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
from __future__ import annotations

from typing import Iterator

from daft.daft import (
AdaptivePhysicalPlanScheduler as _AdaptivePhysicalPlanScheduler,
)
Expand All @@ -12,12 +10,9 @@
from daft.execution import physical_plan
from daft.logical.builder import LogicalPlanBuilder
from daft.runners.partitioning import (
MaterializedResult,
PartitionCacheEntry,
PartitionT,
)
from daft.runners.pyrunner import PyMaterializedResult
from daft.table.micropartition import MicroPartition


class PhysicalPlanScheduler:
Expand Down Expand Up @@ -53,12 +48,6 @@ def __repr__(self) -> str:
def to_partition_tasks(self, psets: dict[str, list[PartitionT]]) -> physical_plan.MaterializedPhysicalPlan:
return physical_plan.materialize(self._scheduler.to_partition_tasks(psets))

def run(self, psets: dict[str, list[MaterializedResult[PartitionT]]]) -> Iterator[PyMaterializedResult]:
psets_mp = {part_id: [part.vpartition()._micropartition for part in parts] for part_id, parts in psets.items()}
return (
PyMaterializedResult(MicroPartition._from_pymicropartition(part)) for part in self._scheduler.run(psets_mp)
)


class AdaptivePhysicalPlanScheduler:
def __init__(self, scheduler: _AdaptivePhysicalPlanScheduler) -> None:
Expand Down
18 changes: 6 additions & 12 deletions daft/runners/pyrunner.py
Original file line number Diff line number Diff line change
Expand Up @@ -165,18 +165,12 @@ def run_iter(
adaptive_planner = builder.to_adaptive_physical_plan_scheduler(daft_execution_config)
while not adaptive_planner.is_done():
source_id, plan_scheduler = adaptive_planner.next()
if daft_execution_config.enable_native_executor:
logger.info("Using new executor")
results_gen = plan_scheduler.run(
{k: v.values() for k, v in self._part_set_cache.get_all_partition_sets().items()}
)
else:
# don't store partition sets in variable to avoid reference
tasks = plan_scheduler.to_partition_tasks(
{k: v.values() for k, v in self._part_set_cache.get_all_partition_sets().items()}
)
del plan_scheduler
results_gen = self._physical_plan_to_partitions(tasks)
# don't store partition sets in variable to avoid reference
tasks = plan_scheduler.to_partition_tasks(
{k: v.values() for k, v in self._part_set_cache.get_all_partition_sets().items()}
)
del plan_scheduler
results_gen = self._physical_plan_to_partitions(tasks)
# if source_id is none that means this is the final stage
if source_id is None:
yield from results_gen
Expand Down
31 changes: 0 additions & 31 deletions src/daft-execution/Cargo.toml

This file was deleted.

119 changes: 0 additions & 119 deletions src/daft-execution/src/executor/local/local_executor.rs

This file was deleted.

31 changes: 0 additions & 31 deletions src/daft-execution/src/executor/local/local_partition_ref.rs

This file was deleted.

2 changes: 0 additions & 2 deletions src/daft-execution/src/executor/local/mod.rs

This file was deleted.

19 changes: 0 additions & 19 deletions src/daft-execution/src/executor/mod.rs

This file was deleted.

2 changes: 0 additions & 2 deletions src/daft-execution/src/executor/ray/mod.rs

This file was deleted.

55 changes: 0 additions & 55 deletions src/daft-execution/src/executor/ray/ray_executor.rs

This file was deleted.

Loading

0 comments on commit 12f92ff

Please sign in to comment.