Skip to content
Merged
Show file tree
Hide file tree
Changes from 45 commits
Commits
Show all changes
48 commits
Select commit Hold shift + click to select a range
3473ec3
Test on CI
srilman Jun 3, 2025
62b118c
Fix for testing
srilman Jun 5, 2025
42dc29e
Merge branch 'main' into slade/parallel-stateless-udfs
srilman Jun 19, 2025
86fa79c
Figure out proto
srilman Jun 20, 2025
951f3c9
Compiling
srilman Jun 23, 2025
835d7d1
More issues
srilman Jun 23, 2025
6b36b46
Merge branch 'main' into slade/parallel-stateless-udfs
srilman Jun 23, 2025
7578bac
Fix merge conflicts
srilman Jun 23, 2025
7a59ad1
Undo rename
srilman Jun 23, 2025
fc33728
Override v1 proto instead
srilman Jun 23, 2025
a633ee0
Should be failing doctests
srilman Jun 23, 2025
efcf890
Fix rust tests
srilman Jun 23, 2025
1405f9f
Merge with main
srilman Jun 23, 2025
a8074aa
Merge branch 'main' into slade/parallel-stateless-udfs
srilman Jun 24, 2025
3d82a52
Merge branch 'main' into slade/parallel-stateless-udfs
srilman Jun 27, 2025
2291acf
Passing tests
srilman Jun 27, 2025
b9326dd
Try on CI again
srilman Jul 1, 2025
c6d2dba
Found issue
srilman Jul 1, 2025
479f62a
Forgot to undo old change
srilman Jul 2, 2025
81480a3
Fixed the passing issue
srilman Jul 2, 2025
e2b7e7a
Merge branch 'main' into slade/parallel-stateless-udfs
srilman Jul 2, 2025
930ed82
First
srilman Jul 11, 2025
eb55002
Merge branch 'main' into slade/parallel-stateless-udfs
srilman Jul 11, 2025
4c865a4
Addressed comments and CI
srilman Jul 18, 2025
5d6d4b8
Addressed comments and CI
srilman Jul 18, 2025
a27063b
Fix tests
srilman Jul 18, 2025
9f2f308
Fix the last issue
srilman Jul 18, 2025
1abece9
Come on
srilman Jul 18, 2025
bcaf7ec
Fix last
srilman Jul 18, 2025
497841f
One more change
srilman Jul 21, 2025
abb3a99
Another Cleanup
srilman Jul 21, 2025
053c3d2
Ok
srilman Jul 21, 2025
2d19f45
Sink Names
srilman Jul 21, 2025
eb9d506
Save temp
srilman Jul 24, 2025
8b38561
Merge branch 'main' into slade/pbar-logging
srilman Jul 28, 2025
6307c2f
Fix CI
srilman Jul 28, 2025
3683aaa
Testing
srilman Jul 28, 2025
b61b550
Fix the flotilla issue
srilman Jul 28, 2025
f87bf03
Fix thread logging but it sucks
srilman Aug 4, 2025
e414d5f
Merge branch 'main' into slade/pbar-logging
srilman Aug 4, 2025
7324155
Merge branch 'main' into slade/pbar-logging
srilman Aug 7, 2025
032d583
Addressed comments
srilman Aug 7, 2025
c26453a
Merge branch 'main' into slade/pbar-logging
srilman Aug 12, 2025
a4e2f40
Fix the segfault
srilman Aug 12, 2025
040920a
Yeahhhhh
srilman Aug 13, 2025
1ef2414
Not sure why but OK
srilman Aug 14, 2025
729f8c2
Removed the threaded aspects
srilman Aug 15, 2025
79774d3
Address comments
srilman Aug 20, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ common-daft-config = {path = "src/common/daft-config", default-features = false}
common-display = {path = "src/common/display", default-features = false}
common-file-formats = {path = "src/common/file-formats", default-features = false}
common-hashable-float-wrapper = {path = "src/common/hashable-float-wrapper", default-features = false}
common-logging = {path = "src/common/logging", default-features = false}
common-metrics = {path = "src/common/metrics", default-features = false}
common-partitioning = {path = "src/common/partitioning", default-features = false}
common-resource-request = {path = "src/common/resource-request", default-features = false}
Expand Down Expand Up @@ -217,6 +218,7 @@ members = [

[workspace.dependencies]
approx = "0.5.1"
arc-swap = "1.7.1"
async-compat = "0.2.3"
async-compression = {version = "0.4.12", features = [
"tokio",
Expand Down
6 changes: 4 additions & 2 deletions daft/daft/__init__.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -1837,14 +1837,16 @@ class NativeExecutor:
psets: dict[str, list[PyMicroPartition]],
daft_execution_config: PyDaftExecutionConfig,
results_buffer_size: int | None,
flotilla: bool = False,
) -> Iterator[PyMicroPartition]: ...
def run_async(
self,
plan: LocalPhysicalPlan,
psets: dict[str, list[PyMicroPartition]],
daft_execution_config: PyDaftExecutionConfig,
results_buffer_size: int | None,
context: dict[str, str] | None,
results_buffer_size: int | None = None,
flotilla: bool = False,
context: dict[str, str] | None = None,
) -> AsyncIterator[PyMicroPartition]: ...
@staticmethod
def repr_ascii(builder: LogicalPlanBuilder, daft_execution_config: PyDaftExecutionConfig, simple: bool) -> str: ...
Expand Down
31 changes: 26 additions & 5 deletions daft/execution/udf.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
import tempfile
from multiprocessing import resource_tracker, shared_memory
from multiprocessing.connection import Listener
from typing import TYPE_CHECKING
from typing import IO, TYPE_CHECKING, cast

from daft.errors import UDFException
from daft.expressions import Expression, ExpressionsProjection
Expand All @@ -21,9 +21,11 @@
logger = logging.getLogger(__name__)

_ENTER = "__ENTER__"
_READY = "ready"
_SUCCESS = "success"
_UDF_ERROR = "udf_error"
_ERROR = "error"
_OUTPUT_DIVIDER = b"_DAFT_OUTPUT_DIVIDER_\n"
_SENTINEL = ("__EXIT__", 0)


Expand Down Expand Up @@ -66,7 +68,11 @@ def __init__(self, project_expr: PyExpr, passthrough_exprs: list[PyExpr]) -> Non
"daft.execution.udf_worker",
self.socket_path,
secret.hex(),
]
],
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
# Python auto-buffers stdout by default, so disable
env={"PYTHONUNBUFFERED": "1"},
)

# Initialize communication
Expand All @@ -79,8 +85,22 @@ def __init__(self, project_expr: PyExpr, passthrough_exprs: list[PyExpr]) -> Non
)
expr_projection_bytes = pickle.dumps(expr_projection)
self.handle_conn.send((_ENTER, expr_projection_bytes))

def eval_input(self, input: PyMicroPartition) -> PyMicroPartition:
response = self.handle_conn.recv()
if response != _READY:
raise RuntimeError(f"Expected '{_READY}' but got {response}")

def trace_output(self) -> list[str]:
lines = []
while True:
line = cast("IO[bytes]", self.process.stdout).readline()
# UDF process is expected to return the divider
# after initialization and every iteration
if line == b"" or line == _OUTPUT_DIVIDER or self.process.poll() is not None:
break
lines.append(line.decode().rstrip())
return lines

def eval_input(self, input: PyMicroPartition) -> tuple[PyMicroPartition, list[str]]:
if self.process.poll() is not None:
raise RuntimeError("UDF process has terminated")

Expand All @@ -89,6 +109,7 @@ def eval_input(self, input: PyMicroPartition) -> PyMicroPartition:
self.handle_conn.send((shm_name, shm_size))

response = self.handle_conn.recv()
stdout = self.trace_output()
if response[0] == _UDF_ERROR:
base_exc: Exception = pickle.loads(response[3])
if sys.version_info >= (3, 11):
Expand All @@ -100,7 +121,7 @@ def eval_input(self, input: PyMicroPartition) -> PyMicroPartition:
out_name, out_size = response[1], response[2]
output_bytes = self.transport.read_and_release(out_name, out_size)
deserialized = MicroPartition.from_ipc_stream(output_bytes)
return deserialized._micropartition
return (deserialized._micropartition, stdout)
else:
raise RuntimeError(f"Unknown response from actor: {response}")

Expand Down
22 changes: 20 additions & 2 deletions daft/execution/udf_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,16 @@
from traceback import TracebackException

from daft.errors import UDFException
from daft.execution.udf import _ENTER, _ERROR, _SENTINEL, _SUCCESS, _UDF_ERROR, SharedMemoryTransport
from daft.execution.udf import (
_ENTER,
_ERROR,
_OUTPUT_DIVIDER,
_READY,
_SENTINEL,
_SUCCESS,
_UDF_ERROR,
SharedMemoryTransport,
)
from daft.expressions.expressions import ExpressionsProjection
from daft.recordbatch.micropartition import MicroPartition

Expand All @@ -28,6 +37,11 @@ def udf_event_loop(
try:
initialized_projection = ExpressionsProjection([e._initialize_udfs() for e in uninitialized_projection])

print(_OUTPUT_DIVIDER.decode(), end="", file=sys.stderr, flush=True)
sys.stdout.flush()
sys.stderr.flush()
conn.send(_READY)

while True:
name, size = conn.recv()
if (name, size) == _SENTINEL:
Expand All @@ -36,9 +50,13 @@ def udf_event_loop(
input_bytes = transport.read_and_release(name, size)
input = MicroPartition.from_ipc_stream(input_bytes)
evaluated = input.eval_expression_list(initialized_projection)
output_bytes = evaluated.to_ipc_stream()

output_bytes = evaluated.to_ipc_stream()
out_name, out_size = transport.write_and_close(output_bytes)

print(_OUTPUT_DIVIDER.decode(), end="", file=sys.stderr, flush=True)
sys.stdout.flush()
sys.stderr.flush()
conn.send((_SUCCESS, out_name, out_size))
except UDFException as e:
exc = e.__cause__
Expand Down
2 changes: 1 addition & 1 deletion daft/runners/flotilla.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ async def run_plan(

metas = []
native_executor = NativeExecutor()
async for partition in native_executor.run_async(plan, psets_mp, config, None, context):
async for partition in native_executor.run_async(plan, psets_mp, config, None, True, context):
if partition is None:
break
mp = MicroPartition._from_pymicropartition(partition)
Expand Down
11 changes: 11 additions & 0 deletions src/common/logging/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
[dependencies]
arc-swap = {workspace = true}
log = {workspace = true}

[lints]
workspace = true

[package]
edition = {workspace = true}
name = "common-logging"
version = {workspace = true}
83 changes: 83 additions & 0 deletions src/common/logging/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
use std::sync::{Arc, LazyLock};

use arc_swap::ArcSwap;
use log::Log;

/// A logger that can be internally modified at runtime.
/// Usually, loggers can only be initialized once, but this container can
/// swap out the internal logger at runtime atomically.
pub struct SwappableLogger {
base: ArcSwap<Box<dyn Log + Send + Sync + 'static>>,
temp: ArcSwap<Option<Box<dyn Log + Send + Sync + 'static>>>,
}

impl SwappableLogger {
pub fn new(logger: Box<dyn Log + Send + Sync + 'static>) -> Self {
Self {
base: ArcSwap::new(Arc::new(logger)),
temp: ArcSwap::new(Arc::new(None)),
}
}

pub fn set_base_logger(&self, logger: Box<dyn Log + Send + Sync + 'static>) {
self.base.store(Arc::new(logger));
}

pub fn get_base_logger(&self) -> Arc<Box<dyn Log + Send + Sync + 'static>> {
self.base.load().to_owned()
}

pub fn set_temp_logger(&self, logger: Box<dyn Log + Send + Sync + 'static>) {
self.temp.store(Arc::new(Some(logger)));
}

pub fn reset_temp_logger(&self) {
self.temp.store(Arc::new(None));
}
}

impl Log for SwappableLogger {
fn enabled(&self, metadata: &log::Metadata) -> bool {
if let Some(temp) = self.temp.load().as_ref() {
temp.enabled(metadata)
} else {
self.base.load().enabled(metadata)
}
}

fn log(&self, record: &log::Record) {
if let Some(temp) = self.temp.load().as_ref() {
temp.log(record);
} else {
self.base.load().log(record);
}
}

fn flush(&self) {
if let Some(temp) = self.temp.load().as_ref() {
temp.flush();
} else {
self.base.load().flush();
}
}
}

/// A Noop logger that does nothing.
/// Used for initialization purposes only, should never actually be used.
struct NoopLogger;

impl Log for NoopLogger {
fn enabled(&self, _metadata: &log::Metadata) -> bool {
false
}

fn log(&self, _record: &log::Record) {}

fn flush(&self) {}
}

/// The global logger that can be swapped out at runtime.
/// This is initialized to a NoopLogger to avoid any logging during initialization.
/// It can be swapped out with a real logger using `set_inner_logger`.
pub static GLOBAL_LOGGER: LazyLock<Arc<SwappableLogger>> =
LazyLock::new(|| Arc::new(SwappableLogger::new(Box::new(NoopLogger))));
1 change: 0 additions & 1 deletion src/daft-dsl/src/functions/python/udf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,6 @@ impl LegacyPythonUDF {
MaybeInitializedUDF::Initialized(func) => func.clone().unwrap().clone_ref(py),
MaybeInitializedUDF::Uninitialized { inner, init_args } => {
// TODO(Kevin): warn user if initialization is taking too long and ask them to use actor pool UDFs

py_udf_initialize(py, inner.clone().unwrap(), init_args.clone().unwrap())?
}
};
Expand Down
2 changes: 1 addition & 1 deletion src/daft-io/src/stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ impl Drop for IOStatsContext {
let num_puts = self.load_put_requests();
let mean_get_size = (bytes_read as f64) / (num_gets as f64);
let mean_put_size = (bytes_uploaded as f64) / (num_puts as f64);
log::info!(
log::debug!(
"IOStatsContext: {}, Gets: {}, Heads: {}, Lists: {}, BytesRead: {}, AvgGetSize: {}, BytesUploaded: {}, AvgPutSize: {}",
self.name,
num_gets,
Expand Down
3 changes: 3 additions & 0 deletions src/daft-local-execution/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,17 +1,20 @@
[dependencies]
arc-swap = {workspace = true}
async-trait = {workspace = true}
capitalize = "*"
common-daft-config = {path = "../common/daft-config", default-features = false}
common-display = {path = "../common/display", default-features = false}
common-error = {path = "../common/error", default-features = false}
common-file-formats = {path = "../common/file-formats", default-features = false}
common-logging = {path = "../common/logging", default-features = false}
common-metrics = {path = "../common/metrics", default-features = false}
common-py-serde = {path = "../common/py-serde", default-features = false}
common-resource-request = {path = "../common/resource-request", default-features = false}
common-runtime = {path = "../common/runtime", default-features = false}
common-scan-info = {path = "../common/scan-info", default-features = false}
common-system-info = {path = "../common/system-info", default-features = false}
common-tracing = {path = "../common/tracing", default-features = false}
console = "*"
daft-core = {path = "../daft-core", default-features = false}
daft-csv = {path = "../daft-csv", default-features = false}
daft-dsl = {path = "../daft-dsl", default-features = false}
Expand Down
Loading
Loading