[FEAT] [New Executor] [1/N] Move physical plan scheduler to new crate, misc. refactorings + drive-bys for new executor (#2339)

This PR moves the physical plan scheduler to a new `daft-scheduler`
crate in order to avoid dependency cycles for the new executor, which
depends on `daft-plan`. The new dependency tree will be:
```
daft-plan   <--  daft-execution
       ^            ^
        \          /
       daft-scheduler
```

This PR also contains miscellaneous refactorings and drive-by fixes for
follow-up new-executor PRs; it will sit at the bottom of a stack of PRs.
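
For orientation, here is a hedged usage sketch of the relocated scheduler API, based on the `daft.pyi` stubs in this diff. The `builder` and `cfg` arguments are assumed to be a `LogicalPlanBuilder` and a `PyDaftExecutionConfig` constructed elsewhere, so treat this as illustrative rather than as part of the PR:

```
# Illustrative sketch only; `builder` and `cfg` are assumed to come from elsewhere.
from daft.daft import PhysicalPlanScheduler

def plan_to_tasks(builder, cfg):
    # from_logical_plan_builder is the new static constructor added in this PR.
    scheduler = PhysicalPlanScheduler.from_logical_plan_builder(builder, cfg)
    print(scheduler.repr_ascii(simple=True))  # ASCII rendering of the physical plan
    # Returns an in-progress physical plan, i.e. an iterator of partition tasks.
    return scheduler.to_partition_tasks(psets={})
```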
clarkzinzow authored Jun 5, 2024
1 parent 7d613a6 commit 8a789be
Showing 20 changed files with 1,068 additions and 777 deletions.
20 changes: 20 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

11 changes: 7 additions & 4 deletions Cargo.toml
@@ -11,6 +11,7 @@ daft-micropartition = {path = "src/daft-micropartition", default-features = false}
 daft-parquet = {path = "src/daft-parquet", default-features = false}
 daft-plan = {path = "src/daft-plan", default-features = false}
 daft-scan = {path = "src/daft-scan", default-features = false}
+daft-scheduler = {path = "src/daft-scheduler", default-features = false}
 daft-stats = {path = "src/daft-stats", default-features = false}
 daft-table = {path = "src/daft-table", default-features = false}
 lazy_static = {workspace = true}
@@ -24,16 +25,17 @@ python = [
   "dep:pyo3",
   "dep:pyo3-log",
   "daft-core/python",
-  "daft-table/python",
+  "daft-csv/python",
   "daft-dsl/python",
   "daft-io/python",
-  "daft-plan/python",
-  "daft-parquet/python",
-  "daft-csv/python",
   "daft-json/python",
   "daft-micropartition/python",
+  "daft-parquet/python",
+  "daft-plan/python",
   "daft-scan/python",
+  "daft-scheduler/python",
   "daft-stats/python",
+  "daft-table/python",
   "common-daft-config/python",
   "common-system-info/python"
 ]
@@ -91,6 +93,7 @@ members = [
   "src/daft-plan",
   "src/daft-micropartition",
   "src/daft-scan",
+  "src/daft-scheduler",
   "src/daft-sketch"
 ]

111 changes: 97 additions & 14 deletions daft/daft.pyi
@@ -156,12 +156,16 @@ class ResourceRequest:
     memory_bytes: int | None
 
     def __init__(
-        self, num_cpus: float | None = None, num_gpus: float | None = None, memory_bytes: int | None = None
+        self,
+        num_cpus: float | None = None,
+        num_gpus: float | None = None,
+        memory_bytes: int | None = None,
     ): ...
     @staticmethod
     def max_resources(resource_requests: list[ResourceRequest]):
         """Take a field-wise max of the list of resource requests."""
         ...
+
     def __add__(self, other: ResourceRequest) -> ResourceRequest: ...
     def __repr__(self) -> str: ...
     def __eq__(self, other: ResourceRequest) -> bool: ... # type: ignore[override]
@@ -253,29 +257,34 @@ class FileFormatConfig:
         Create a Parquet file format config.
         """
         ...
+
     @staticmethod
     def from_csv_config(config: CsvSourceConfig) -> FileFormatConfig:
         """
         Create a CSV file format config.
         """
         ...
+
     @staticmethod
     def from_json_config(config: JsonSourceConfig) -> FileFormatConfig:
         """
         Create a JSON file format config.
         """
         ...
+
     @staticmethod
     def from_database_config(config: DatabaseSourceConfig) -> FileFormatConfig:
         """
         Create a database file format config.
         """
         ...
+
     def file_format(self) -> FileFormat:
         """
         Get the file format for this config.
         """
         ...
+
     def __eq__(self, other: FileFormatConfig) -> bool: ... # type: ignore[override]
     def __ne__(self, other: FileFormatConfig) -> bool: ... # type: ignore[override]
 
@@ -396,16 +405,19 @@ class FileInfos:
         Create from a Daft table with "path", "size", and "num_rows" columns.
         """
         ...
+
     def extend(self, new_infos: FileInfos) -> FileInfos:
         """
         Concatenate two FileInfos together.
         """
         ...
+
     def __getitem__(self, idx: int) -> FileInfo: ...
     def to_table(self) -> PyTable:
         """
         Convert to a Daft table with "path", "size", and "num_rows" columns.
         """
+
     def __len__(self) -> int: ...
 
 class S3Config:
@@ -560,7 +572,12 @@ class IOConfig:
     azure: AzureConfig
     gcs: GCSConfig
 
-    def __init__(self, s3: S3Config | None = None, azure: AzureConfig | None = None, gcs: GCSConfig | None = None): ...
+    def __init__(
+        self,
+        s3: S3Config | None = None,
+        azure: AzureConfig | None = None,
+        gcs: GCSConfig | None = None,
+    ): ...
     @staticmethod
     def from_json(input: str) -> IOConfig:
         """
@@ -569,7 +586,10 @@ class IOConfig:
         ...
 
     def replace(
-        self, s3: S3Config | None = None, azure: AzureConfig | None = None, gcs: GCSConfig | None = None
+        self,
+        s3: S3Config | None = None,
+        azure: AzureConfig | None = None,
+        gcs: GCSConfig | None = None,
     ) -> IOConfig:
         """Replaces values if provided, returning a new IOConfig"""
         ...
@@ -606,12 +626,14 @@ class StorageConfig:
         Create from a native storage config.
         """
         ...
+
     @staticmethod
     def python(config: PythonStorageConfig) -> StorageConfig:
         """
         Create from a Python storage config.
         """
         ...
+
     @property
     def config(self) -> NativeStorageConfig | PythonStorageConfig: ...
 
@@ -625,16 +647,19 @@ class ScanTask:
         Get number of rows that will be scanned by this ScanTask.
         """
         ...
+
     def size_bytes(self) -> int:
         """
         Get number of bytes that will be scanned by this ScanTask.
         """
         ...
+
     def estimate_in_memory_size_bytes(self, cfg: PyDaftExecutionConfig) -> int:
         """
         Estimate the In Memory Size of this ScanTask.
         """
         ...
+
     @staticmethod
     def catalog_scan_task(
         file: str,
@@ -651,6 +676,7 @@
         Create a Catalog Scan Task
         """
         ...
+
     @staticmethod
     def sql_scan_task(
         url: str,
@@ -666,6 +692,7 @@
         Create a SQL Scan Task
         """
         ...
+
     @staticmethod
     def python_factory_func_scan_task(
         module: str,
@@ -712,7 +739,10 @@ class PartitionField:
     field: PyField
 
     def __init__(
-        self, field: PyField, source_field: PyField | None = None, transform: PartitionTransform | None = None
+        self,
+        field: PyField,
+        source_field: PyField | None = None,
+        transform: PartitionTransform | None = None,
     ) -> None: ...
 
 class PartitionTransform:
@@ -897,7 +927,11 @@ class PyDataType:
     @staticmethod
     def embedding(data_type: PyDataType, size: int) -> PyDataType: ...
     @staticmethod
-    def image(mode: ImageMode | None = None, height: int | None = None, width: int | None = None) -> PyDataType: ...
+    def image(
+        mode: ImageMode | None = None,
+        height: int | None = None,
+        width: int | None = None,
+    ) -> PyDataType: ...
     @staticmethod
     def tensor(dtype: PyDataType, shape: tuple[int, ...] | None = None) -> PyDataType: ...
     @staticmethod
@@ -1051,7 +1085,11 @@ class PyExpr:
     def struct_get(self, name: str) -> PyExpr: ...
     def map_get(self, key: PyExpr) -> PyExpr: ...
     def url_download(
-        self, max_connections: int, raise_error_on_failure: bool, multi_thread: bool, config: IOConfig
+        self,
+        max_connections: int,
+        raise_error_on_failure: bool,
+        multi_thread: bool,
+        config: IOConfig,
     ) -> PyExpr: ...
     def partitioning_days(self) -> PyExpr: ...
     def partitioning_hours(self) -> PyExpr: ...
@@ -1206,11 +1244,25 @@ class PyTable:
     def argsort(self, sort_keys: list[PyExpr], descending: list[bool]) -> PySeries: ...
     def agg(self, to_agg: list[PyExpr], group_by: list[PyExpr]) -> PyTable: ...
     def pivot(
-        self, group_by: list[PyExpr], pivot_column: PyExpr, values_column: PyExpr, names: list[str]
+        self,
+        group_by: list[PyExpr],
+        pivot_column: PyExpr,
+        values_column: PyExpr,
+        names: list[str],
     ) -> PyTable: ...
-    def hash_join(self, right: PyTable, left_on: list[PyExpr], right_on: list[PyExpr], how: JoinType) -> PyTable: ...
+    def hash_join(
+        self,
+        right: PyTable,
+        left_on: list[PyExpr],
+        right_on: list[PyExpr],
+        how: JoinType,
+    ) -> PyTable: ...
     def sort_merge_join(
-        self, right: PyTable, left_on: list[PyExpr], right_on: list[PyExpr], is_sorted: bool
+        self,
+        right: PyTable,
+        left_on: list[PyExpr],
+        right_on: list[PyExpr],
+        is_sorted: bool,
     ) -> PyTable: ...
     def explode(self, to_explode: list[PyExpr]) -> PyTable: ...
     def head(self, num: int) -> PyTable: ...
@@ -1268,17 +1320,33 @@ class PyMicroPartition:
     def argsort(self, sort_keys: list[PyExpr], descending: list[bool]) -> PySeries: ...
     def agg(self, to_agg: list[PyExpr], group_by: list[PyExpr]) -> PyMicroPartition: ...
     def hash_join(
-        self, right: PyMicroPartition, left_on: list[PyExpr], right_on: list[PyExpr], how: JoinType
+        self,
+        right: PyMicroPartition,
+        left_on: list[PyExpr],
+        right_on: list[PyExpr],
+        how: JoinType,
     ) -> PyMicroPartition: ...
     def pivot(
-        self, group_by: list[PyExpr], pivot_column: PyExpr, values_column: PyExpr, names: list[str]
+        self,
+        group_by: list[PyExpr],
+        pivot_column: PyExpr,
+        values_column: PyExpr,
+        names: list[str],
     ) -> PyMicroPartition: ...
     def sort_merge_join(
-        self, right: PyMicroPartition, left_on: list[PyExpr], right_on: list[PyExpr], is_sorted: bool
+        self,
+        right: PyMicroPartition,
+        left_on: list[PyExpr],
+        right_on: list[PyExpr],
+        is_sorted: bool,
     ) -> PyMicroPartition: ...
     def explode(self, to_explode: list[PyExpr]) -> PyMicroPartition: ...
     def unpivot(
-        self, ids: list[PyExpr], values: list[PyExpr], variable_name: str, value_name: str
+        self,
+        ids: list[PyExpr],
+        values: list[PyExpr],
+        variable_name: str,
+        value_name: str,
     ) -> PyMicroPartition: ...
     def head(self, num: int) -> PyMicroPartition: ...
     def sample_by_fraction(self, fraction: float, with_replacement: bool, seed: int | None) -> PyMicroPartition: ...
@@ -1346,6 +1414,11 @@ class PhysicalPlanScheduler:
     A work scheduler for physical query plans.
     """
 
+    @staticmethod
+    def from_logical_plan_builder(
+        logical_plan_builder: LogicalPlanBuilder,
+        cfg: PyDaftExecutionConfig,
+    ) -> PhysicalPlanScheduler: ...
     def num_partitions(self) -> int: ...
     def repr_ascii(self, simple: bool) -> str: ...
     def to_partition_tasks(self, psets: dict[str, list[PartitionT]]) -> physical_plan.InProgressPhysicalPlan: ...
@@ -1354,6 +1427,12 @@ class AdaptivePhysicalPlanScheduler:
     """
     An adaptive Physical Plan Scheduler.
    """
+
+    @staticmethod
+    def from_logical_plan_builder(
+        logical_plan_builder: LogicalPlanBuilder,
+        cfg: PyDaftExecutionConfig,
+    ) -> AdaptivePhysicalPlanScheduler: ...
     def next(self) -> tuple[int | None, PhysicalPlanScheduler]: ...
     def is_done(self) -> bool: ...
     # Todo use in memory info here instead
@@ -1394,7 +1473,11 @@ class LogicalPlanBuilder:
     def limit(self, limit: int, eager: bool) -> LogicalPlanBuilder: ...
     def explode(self, to_explode: list[PyExpr]) -> LogicalPlanBuilder: ...
     def unpivot(
-        self, ids: list[PyExpr], values: list[PyExpr], variable_name: str, value_name: str
+        self,
+        ids: list[PyExpr],
+        values: list[PyExpr],
+        variable_name: str,
+        value_name: str,
     ) -> LogicalPlanBuilder: ...
     def sort(self, sort_by: list[PyExpr], descending: list[bool]) -> LogicalPlanBuilder: ...
     def hash_repartition(
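
The stubs above also give `AdaptivePhysicalPlanScheduler` a `from_logical_plan_builder` constructor. A minimal driver-loop sketch implied by those stubs follows; how stage results are fed back to the scheduler is not shown in this diff, so that step is elided, and `builder` and `cfg` are assumptions as in the earlier sketch:

```
# Hedged sketch; `builder`, `cfg`, and the task-execution step are assumed.
from daft.daft import AdaptivePhysicalPlanScheduler

def run_adaptive(builder, cfg):
    adaptive = AdaptivePhysicalPlanScheduler.from_logical_plan_builder(builder, cfg)
    while not adaptive.is_done():
        # next() yields an optional source id and a scheduler for the next stage.
        source_id, stage = adaptive.next()
        tasks = stage.to_partition_tasks(psets={})
        ...  # execute `tasks`, then register results for subsequent stages
```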
