Merged
4 changes: 0 additions & 4 deletions .github/workflows/daft-profiling.yml
@@ -70,14 +70,10 @@ jobs:
run: python benchmarking/tpch/data_generation.py --scale_factor=${{ env.TPCH_SCALE_FACTOR }} --num_parts=${{ env.TPCH_NUM_PARTS }} --generate_parquet

- name: Run Profiling on TPCH Benchmark
env:
DAFT_DEVELOPER_USE_THREAD_POOL: '0'
Contributor Author
Used by pyrunner only

run: |
py-spy record --native --function -o tpch-${{github.run_id}}.txt -f speedscope -- python benchmarking/tpch/__main__.py --scale_factor=${{ env.TPCH_SCALE_FACTOR }} --num_parts=${{ env.TPCH_NUM_PARTS }} --skip_warmup --skip_questions=11,12,13,14,15,16,17,18,19,20,21,22 || true

- name: Run GIL Profiling on TPCH Benchmark
env:
DAFT_DEVELOPER_USE_THREAD_POOL: '0'
run: |
py-spy record --native --function --gil -o tpch-gil-${{github.run_id}}.txt -f speedscope -- python benchmarking/tpch/__main__.py --scale_factor=${{ env.TPCH_SCALE_FACTOR }} --num_parts=${{ env.TPCH_NUM_PARTS }} --skip_warmup --skip_questions=11,12,13,14,15,16,17,18,19,20,21,22 || true

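The "Used by pyrunner only" note above refers to DAFT_DEVELOPER_USE_THREAD_POOL, which was read only by the now-removed Python runner, so the profiling steps can drop their env blocks entirely. A minimal sketch of how a local run is assumed to select a runner after this change (the DataFrame contents are illustrative):

```python
import daft

# With the PyRunner gone, local execution goes through the native runner;
# DAFT_DEVELOPER_USE_THREAD_POOL no longer has any effect.
daft.context.set_runner_native()  # roughly equivalent to exporting DAFT_RUNNER=native

df = daft.from_pydict({"x": [1, 2, 3]})  # illustrative data
print(df.count_rows())
```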
2 changes: 1 addition & 1 deletion .github/workflows/property-based-tests.yml
@@ -16,7 +16,7 @@ jobs:
fail-fast: false
matrix:
python-version: ['3.9']
daft_runner: [py]
daft_runner: [ray, native]
steps:
- uses: actions/checkout@v4
- uses: moonrepo/setup-rust@v1
1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion benchmarking/tpcds/__main__.py
@@ -324,7 +324,7 @@ def run_benchmarks(

if runner == "ray":
return run_query_on_ray(run_args)
elif runner == "py" or runner == "native":
elif runner == "native":
return run_query_on_local(run_args)
else:
typing.assert_never(runner)
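A side note on the dispatch above: with "py" removed from the runner Literal, typing.assert_never() is what lets a type checker verify that the if/elif chain still covers every remaining variant. A standalone sketch of the pattern (hypothetical function, not from the PR; assert_never needs Python 3.11+ or typing_extensions):

```python
import typing
from typing import Literal

def describe_runner(runner: Literal["ray", "native"]) -> str:
    # Every member of the Literal must be handled before the assert_never()
    # branch, otherwise the type checker reports that call as reachable.
    if runner == "ray":
        return "distributed execution on a Ray cluster"
    elif runner == "native":
        return "local execution on the native runner"
    else:
        typing.assert_never(runner)
```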
4 changes: 2 additions & 2 deletions benchmarking/tpch/__main__.py
@@ -192,13 +192,13 @@ def get_daft_version() -> str:
return daft.get_version()


def get_daft_benchmark_runner_name() -> Literal["ray"] | Literal["py"] | Literal["native"]:
def get_daft_benchmark_runner_name() -> Literal["ray"] | Literal["native"]:
"""Test utility that checks the environment variable for the runner that is being used for the benchmarking."""
name = os.getenv("DAFT_RUNNER")
assert name is not None, "Tests must be run with $DAFT_RUNNER env var"
name = name.lower()

assert name in {"ray", "py", "native"}, f"Runner name not recognized: {name}"
assert name in {"ray", "native"}, f"Runner name not recognized: {name}"
return name


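For completeness, a small usage sketch of the helper above; the import path is assumed from the file location, and "py" is no longer an accepted value for $DAFT_RUNNER:

```python
import os

os.environ["DAFT_RUNNER"] = "native"  # "ray" is the other accepted value

# Hypothetical import path, for illustration only.
from benchmarking.tpch.__main__ import get_daft_benchmark_runner_name

assert get_daft_benchmark_runner_name() == "native"
```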
29 changes: 0 additions & 29 deletions daft/context.py
@@ -8,7 +8,6 @@
from daft.daft import IOConfig, PyDaftContext, PyDaftExecutionConfig, PyDaftPlanningConfig
from daft.daft import get_context as _get_context
from daft.daft import set_runner_native as _set_runner_native
from daft.daft import set_runner_py as _set_runner_py
from daft.daft import set_runner_ray as _set_runner_ray

if TYPE_CHECKING:
@@ -94,31 +93,6 @@ def set_runner_ray(
return DaftContext._from_native(py_ctx)


def set_runner_py(use_thread_pool: bool | None = None, num_threads: int | None = None) -> DaftContext:
Contributor Author
Instead of removing, would it be better to have it return an error?

Member
Might be helpful, but we've also had that deprecation warning in place for a while now, so it's ok to remove as well. I'll leave it to your discretion.

"""Configure Daft to execute dataframes in the local Python interpreter.

Args:
use_thread_pool: If True, uses a thread pool for parallel execution.
If False, runs single-threaded. If None, uses system default.

Returns:
DaftContext: Updated Daft execution context configured for local Python.

Note:
Can also be configured via environment variable: DAFT_RUNNER=py

Deprecated:
This execution mode is deprecated. Use set_runner_native() instead for
improved local performance with native multi-threading.
"""
py_ctx = _set_runner_py(
use_thread_pool=use_thread_pool,
num_threads=num_threads,
)

return DaftContext._from_native(py_ctx)


def set_runner_native(num_threads: int | None = None) -> DaftContext:
"""Configure Daft to execute dataframes using native multi-threaded processing.

@@ -204,7 +178,6 @@ def set_execution_config(
high_cardinality_aggregation_threshold: float | None = None,
read_sql_partition_size_bytes: int | None = None,
enable_aqe: bool | None = None,
enable_native_executor: bool | None = None,
default_morsel_size: int | None = None,
shuffle_algorithm: str | None = None,
pre_shuffle_merge_threshold: int | None = None,
@@ -251,7 +224,6 @@
high_cardinality_aggregation_threshold: Threshold selectivity for performing high cardinality aggregations on the Native Runner. Defaults to 0.8.
read_sql_partition_size_bytes: Target size of partition when reading from SQL databases. Defaults to 512MB
enable_aqe: Enables Adaptive Query Execution, Defaults to False
enable_native_executor: Enables the native executor, Defaults to False
default_morsel_size: Default size of morsels used for the new local executor. Defaults to 131072 rows.
shuffle_algorithm: The shuffle algorithm to use. Defaults to "auto", which will let Daft determine the algorithm. Options are "map_reduce" and "pre_shuffle_merge".
pre_shuffle_merge_threshold: Memory threshold in bytes for pre-shuffle merge. Defaults to 1GB
@@ -284,7 +256,6 @@
high_cardinality_aggregation_threshold=high_cardinality_aggregation_threshold,
read_sql_partition_size_bytes=read_sql_partition_size_bytes,
enable_aqe=enable_aqe,
enable_native_executor=enable_native_executor,
default_morsel_size=default_morsel_size,
shuffle_algorithm=shuffle_algorithm,
flight_shuffle_dirs=flight_shuffle_dirs,
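On the review thread above about returning an error instead of deleting set_runner_py outright: a sketch of that alternative could look like the following. This is the reviewer's suggestion rather than what the PR does, and the message text is assumed.

```python
from __future__ import annotations

def set_runner_py(use_thread_pool: bool | None = None, num_threads: int | None = None) -> "DaftContext":
    """Removed: the legacy Python runner is no longer available."""
    raise RuntimeError(
        "The Python runner ('py') has been removed. Use daft.context.set_runner_native() "
        "or set DAFT_RUNNER=native instead."
    )
```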
4 changes: 0 additions & 4 deletions daft/daft/__init__.pyi
@@ -1831,7 +1831,6 @@ class PyDaftExecutionConfig:
high_cardinality_aggregation_threshold: float | None = None,
read_sql_partition_size_bytes: int | None = None,
enable_aqe: bool | None = None,
enable_native_executor: bool | None = None,
default_morsel_size: int | None = None,
enable_ray_tracing: bool | None = None,
shuffle_algorithm: str | None = None,
@@ -1877,8 +1876,6 @@
@property
def enable_aqe(self) -> bool: ...
@property
def enable_native_executor(self) -> bool: ...
@property
def default_morsel_size(self) -> int: ...
@property
def shuffle_algorithm(self) -> str: ...
@@ -1919,7 +1916,6 @@ def set_runner_ray(
force_client_mode: bool = False,
) -> PyDaftContext: ...
def set_runner_native(num_threads: int | None = None) -> PyDaftContext: ...
def set_runner_py(use_thread_pool: bool | None = None, num_threads: int | None = None) -> PyDaftContext: ...
def get_context() -> PyDaftContext: ...
def build_type() -> str: ...
def version() -> str: ...
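Since enable_native_executor disappears from both the stub and daft.context.set_execution_config, callers that passed it simply drop the argument; nothing replaces it, because the native executor is the only local execution path after this PR. A minimal migration sketch (flag values are illustrative):

```python
import daft

# Before this PR: set_execution_config(..., enable_native_executor=True, ...)
# After: the flag is gone; other settings are passed as before.
daft.context.set_execution_config(
    enable_aqe=True,             # illustrative value
    default_morsel_size=131072,  # documented default
)
```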