[FEAT] [New Executor] [1/N] Move physical plan scheduler to new crate, misc. refactorings + drive-bys for new executor #2339

Merged · 2 commits · Jun 5, 2024
20 changes: 20 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

11 changes: 7 additions & 4 deletions Cargo.toml
@@ -11,6 +11,7 @@ daft-micropartition = {path = "src/daft-micropartition", default-features = false}
daft-parquet = {path = "src/daft-parquet", default-features = false}
daft-plan = {path = "src/daft-plan", default-features = false}
daft-scan = {path = "src/daft-scan", default-features = false}
daft-scheduler = {path = "src/daft-scheduler", default-features = false}
daft-stats = {path = "src/daft-stats", default-features = false}
daft-table = {path = "src/daft-table", default-features = false}
lazy_static = {workspace = true}
@@ -24,16 +25,17 @@ python = [
"dep:pyo3",
"dep:pyo3-log",
"daft-core/python",
"daft-table/python",
"daft-csv/python",
"daft-dsl/python",
"daft-io/python",
"daft-plan/python",
"daft-parquet/python",
"daft-csv/python",
"daft-json/python",
"daft-micropartition/python",
"daft-parquet/python",
"daft-plan/python",
"daft-scan/python",
"daft-scheduler/python",
"daft-stats/python",
"daft-table/python",
"common-daft-config/python",
"common-system-info/python"
]
@@ -91,6 +93,7 @@ members = [
"src/daft-plan",
"src/daft-micropartition",
"src/daft-scan",
"src/daft-scheduler",
"src/daft-sketch"
]

111 changes: 97 additions & 14 deletions daft/daft.pyi
@@ -156,12 +156,16 @@ class ResourceRequest:
memory_bytes: int | None

def __init__(
self, num_cpus: float | None = None, num_gpus: float | None = None, memory_bytes: int | None = None
self,
num_cpus: float | None = None,
num_gpus: float | None = None,
memory_bytes: int | None = None,
): ...
@staticmethod
def max_resources(resource_requests: list[ResourceRequest]):
"""Take a field-wise max of the list of resource requests."""
...

def __add__(self, other: ResourceRequest) -> ResourceRequest: ...
def __repr__(self) -> str: ...
def __eq__(self, other: ResourceRequest) -> bool: ... # type: ignore[override]
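
For context on the reformatted `ResourceRequest` stub above: a minimal sketch of the field-wise max, assuming Daft is built so that `daft.daft` is importable (the concrete numbers are made up for illustration):

```python
from daft.daft import ResourceRequest

# Two hypothetical per-task resource requests.
a = ResourceRequest(num_cpus=1.0, memory_bytes=512 * 1024 * 1024)
b = ResourceRequest(num_cpus=2.0, num_gpus=1.0)

# Field-wise max across the list: per the docstring, the result takes
# the largest value of each field, i.e. num_cpus=2.0, num_gpus=1.0,
# memory_bytes=512 MiB here.
merged = ResourceRequest.max_resources([a, b])

# Requests can also be combined additively via __add__.
combined = a + b
```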
@@ -253,29 +257,34 @@ class FileFormatConfig:
Create a Parquet file format config.
"""
...

@staticmethod
def from_csv_config(config: CsvSourceConfig) -> FileFormatConfig:
"""
Create a CSV file format config.
"""
...

@staticmethod
def from_json_config(config: JsonSourceConfig) -> FileFormatConfig:
"""
Create a JSON file format config.
"""
...

@staticmethod
def from_database_config(config: DatabaseSourceConfig) -> FileFormatConfig:
"""
Create a database file format config.
"""
...

def file_format(self) -> FileFormat:
"""
Get the file format for this config.
"""
...

def __eq__(self, other: FileFormatConfig) -> bool: ... # type: ignore[override]
def __ne__(self, other: FileFormatConfig) -> bool: ... # type: ignore[override]

@@ -396,16 +405,19 @@ class FileInfos:
Create from a Daft table with "path", "size", and "num_rows" columns.
"""
...

def extend(self, new_infos: FileInfos) -> FileInfos:
"""
Concatenate two FileInfos together.
"""
...

def __getitem__(self, idx: int) -> FileInfo: ...
def to_table(self) -> PyTable:
"""
Convert to a Daft table with "path", "size", and "num_rows" columns.
"""

def __len__(self) -> int: ...

class S3Config:
@@ -560,7 +572,12 @@ class IOConfig:
azure: AzureConfig
gcs: GCSConfig

def __init__(self, s3: S3Config | None = None, azure: AzureConfig | None = None, gcs: GCSConfig | None = None): ...
def __init__(
self,
s3: S3Config | None = None,
azure: AzureConfig | None = None,
gcs: GCSConfig | None = None,
): ...
@staticmethod
def from_json(input: str) -> IOConfig:
"""
@@ -569,7 +586,10 @@ class IOConfig:
...

def replace(
self, s3: S3Config | None = None, azure: AzureConfig | None = None, gcs: GCSConfig | None = None
self,
s3: S3Config | None = None,
azure: AzureConfig | None = None,
gcs: GCSConfig | None = None,
) -> IOConfig:
"""Replaces values if provided, returning a new IOConfig"""
...
@@ -606,12 +626,14 @@ class StorageConfig:
Create from a native storage config.
"""
...

@staticmethod
def python(config: PythonStorageConfig) -> StorageConfig:
"""
Create from a Python storage config.
"""
...

@property
def config(self) -> NativeStorageConfig | PythonStorageConfig: ...

@@ -625,16 +647,19 @@ class ScanTask:
Get number of rows that will be scanned by this ScanTask.
"""
...

def size_bytes(self) -> int:
"""
Get number of bytes that will be scanned by this ScanTask.
"""
...

def estimate_in_memory_size_bytes(self, cfg: PyDaftExecutionConfig) -> int:
"""
Estimate the In Memory Size of this ScanTask.
"""
...

@staticmethod
def catalog_scan_task(
file: str,
@@ -651,6 +676,7 @@ class ScanTask:
Create a Catalog Scan Task
"""
...

@staticmethod
def sql_scan_task(
url: str,
@@ -666,6 +692,7 @@ class ScanTask:
Create a SQL Scan Task
"""
...

@staticmethod
def python_factory_func_scan_task(
module: str,
@@ -712,7 +739,10 @@ class PartitionField:
field: PyField

def __init__(
self, field: PyField, source_field: PyField | None = None, transform: PartitionTransform | None = None
self,
field: PyField,
source_field: PyField | None = None,
transform: PartitionTransform | None = None,
) -> None: ...

class PartitionTransform:
@@ -897,7 +927,11 @@ class PyDataType:
@staticmethod
def embedding(data_type: PyDataType, size: int) -> PyDataType: ...
@staticmethod
def image(mode: ImageMode | None = None, height: int | None = None, width: int | None = None) -> PyDataType: ...
def image(
mode: ImageMode | None = None,
height: int | None = None,
width: int | None = None,
) -> PyDataType: ...
@staticmethod
def tensor(dtype: PyDataType, shape: tuple[int, ...] | None = None) -> PyDataType: ...
@staticmethod
@@ -1051,7 +1085,11 @@ class PyExpr:
def struct_get(self, name: str) -> PyExpr: ...
def map_get(self, key: PyExpr) -> PyExpr: ...
def url_download(
self, max_connections: int, raise_error_on_failure: bool, multi_thread: bool, config: IOConfig
self,
max_connections: int,
raise_error_on_failure: bool,
multi_thread: bool,
config: IOConfig,
) -> PyExpr: ...
def partitioning_days(self) -> PyExpr: ...
def partitioning_hours(self) -> PyExpr: ...
@@ -1206,11 +1244,25 @@ class PyTable:
def argsort(self, sort_keys: list[PyExpr], descending: list[bool]) -> PySeries: ...
def agg(self, to_agg: list[PyExpr], group_by: list[PyExpr]) -> PyTable: ...
def pivot(
self, group_by: list[PyExpr], pivot_column: PyExpr, values_column: PyExpr, names: list[str]
self,
group_by: list[PyExpr],
pivot_column: PyExpr,
values_column: PyExpr,
names: list[str],
) -> PyTable: ...
def hash_join(
self,
right: PyTable,
left_on: list[PyExpr],
right_on: list[PyExpr],
how: JoinType,
) -> PyTable: ...
def hash_join(self, right: PyTable, left_on: list[PyExpr], right_on: list[PyExpr], how: JoinType) -> PyTable: ...
def sort_merge_join(
self, right: PyTable, left_on: list[PyExpr], right_on: list[PyExpr], is_sorted: bool
self,
right: PyTable,
left_on: list[PyExpr],
right_on: list[PyExpr],
is_sorted: bool,
) -> PyTable: ...
def explode(self, to_explode: list[PyExpr]) -> PyTable: ...
def head(self, num: int) -> PyTable: ...
@@ -1268,17 +1320,33 @@ class PyMicroPartition:
def argsort(self, sort_keys: list[PyExpr], descending: list[bool]) -> PySeries: ...
def agg(self, to_agg: list[PyExpr], group_by: list[PyExpr]) -> PyMicroPartition: ...
def hash_join(
self, right: PyMicroPartition, left_on: list[PyExpr], right_on: list[PyExpr], how: JoinType
self,
right: PyMicroPartition,
left_on: list[PyExpr],
right_on: list[PyExpr],
how: JoinType,
) -> PyMicroPartition: ...
def pivot(
self, group_by: list[PyExpr], pivot_column: PyExpr, values_column: PyExpr, names: list[str]
self,
group_by: list[PyExpr],
pivot_column: PyExpr,
values_column: PyExpr,
names: list[str],
) -> PyMicroPartition: ...
def sort_merge_join(
self, right: PyMicroPartition, left_on: list[PyExpr], right_on: list[PyExpr], is_sorted: bool
self,
right: PyMicroPartition,
left_on: list[PyExpr],
right_on: list[PyExpr],
is_sorted: bool,
) -> PyMicroPartition: ...
def explode(self, to_explode: list[PyExpr]) -> PyMicroPartition: ...
def unpivot(
self, ids: list[PyExpr], values: list[PyExpr], variable_name: str, value_name: str
self,
ids: list[PyExpr],
values: list[PyExpr],
variable_name: str,
value_name: str,
) -> PyMicroPartition: ...
def head(self, num: int) -> PyMicroPartition: ...
def sample_by_fraction(self, fraction: float, with_replacement: bool, seed: int | None) -> PyMicroPartition: ...
@@ -1346,6 +1414,11 @@ class PhysicalPlanScheduler:
A work scheduler for physical query plans.
"""

@staticmethod
def from_logical_plan_builder(
logical_plan_builder: LogicalPlanBuilder,
cfg: PyDaftExecutionConfig,
) -> PhysicalPlanScheduler: ...
def num_partitions(self) -> int: ...
def repr_ascii(self, simple: bool) -> str: ...
def to_partition_tasks(self, psets: dict[str, list[PartitionT]]) -> physical_plan.InProgressPhysicalPlan: ...
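
Since this PR moves the scheduler into its own `daft-scheduler` crate and exposes `from_logical_plan_builder` as a static constructor, driving it from Python plumbing would look roughly like the sketch below. This is a hedged illustration: `builder` and `cfg` stand in for a `LogicalPlanBuilder` and a `PyDaftExecutionConfig` produced elsewhere in Daft's planning layer.

```python
from daft.daft import PhysicalPlanScheduler

# Hypothetical inputs: `builder` is a LogicalPlanBuilder and `cfg` is a
# PyDaftExecutionConfig, both obtained from Daft's internals.
scheduler = PhysicalPlanScheduler.from_logical_plan_builder(builder, cfg)

print(scheduler.num_partitions())  # partitions the plan will produce
print(scheduler.repr_ascii(True))  # simplified ASCII rendering of the plan

# Lower to partition tasks; the argument maps cached partition-set ids
# to their in-memory partitions (empty here for illustration).
tasks = scheduler.to_partition_tasks({})
```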
@@ -1354,6 +1427,12 @@ class AdaptivePhysicalPlanScheduler:
"""
An adaptive Physical Plan Scheduler.
"""

@staticmethod
def from_logical_plan_builder(
logical_plan_builder: LogicalPlanBuilder,
cfg: PyDaftExecutionConfig,
) -> AdaptivePhysicalPlanScheduler: ...
def next(self) -> tuple[int | None, PhysicalPlanScheduler]: ...
def is_done(self) -> bool: ...
# Todo use in memory info here instead
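
The `next()` / `is_done()` pair added here implies a driver loop shaped roughly like this sketch (hedged: `adaptive` is a hypothetical, already-constructed `AdaptivePhysicalPlanScheduler`, and the executor that consumes the tasks is elided):

```python
# Drive adaptive planning stage by stage. Each next() call yields an
# optional int (an id for the plan piece being materialized; its exact
# semantics are not shown in the stub) plus a PhysicalPlanScheduler
# for that stage.
while not adaptive.is_done():
    stage_id, stage_scheduler = adaptive.next()
    tasks = stage_scheduler.to_partition_tasks({})
    # ... run `tasks`, then report materialized results back so the
    # next stage can be planned (elided; hypothetical runner).
```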
@@ -1394,7 +1473,11 @@ class LogicalPlanBuilder:
def limit(self, limit: int, eager: bool) -> LogicalPlanBuilder: ...
def explode(self, to_explode: list[PyExpr]) -> LogicalPlanBuilder: ...
def unpivot(
self, ids: list[PyExpr], values: list[PyExpr], variable_name: str, value_name: str
self,
ids: list[PyExpr],
values: list[PyExpr],
variable_name: str,
value_name: str,
) -> LogicalPlanBuilder: ...
def sort(self, sort_by: list[PyExpr], descending: list[bool]) -> LogicalPlanBuilder: ...
def hash_repartition(