283 changes: 178 additions & 105 deletions Cargo.lock

Large diffs are not rendered by default.

5 changes: 3 additions & 2 deletions Cargo.toml
@@ -90,7 +90,7 @@ version = "51.0.0"
ahash = { version = "0.8", default-features = false, features = [
"runtime-rng",
] }
apache-avro = { version = "0.20", default-features = false }
apache-avro = { version = "0.21", default-features = false }
arrow = { version = "57.0.0", features = [
"prettyprint",
"chrono-tz",
@@ -105,7 +105,7 @@ arrow-ipc = { version = "57.0.0", default-features = false, features = [
arrow-ord = { version = "57.0.0", default-features = false }
arrow-schema = { version = "57.0.0", default-features = false }
async-trait = "0.1.89"
bigdecimal = "0.4.8"
bigdecimal = "0.4.9"
bytes = "1.10"
chrono = { version = "0.4.42", default-features = false }
criterion = "0.7"
@@ -158,6 +158,7 @@ hex = { version = "0.4.3" }
indexmap = "2.12.0"
insta = { version = "1.43.2", features = ["glob", "filters"] }
itertools = "0.14"
liblzma = { version = "0.4.4", features = ["static"] }
log = "^0.4"
num-traits = { version = "0.2" }
object_store = { version = "0.12.4", default-features = false }
2 changes: 1 addition & 1 deletion datafusion/common/Cargo.toml
@@ -56,7 +56,7 @@ sql = ["sqlparser"]

[dependencies]
ahash = { workspace = true }
apache-avro = { version = "0.20", default-features = false, features = [
apache-avro = { workspace = true, features = [
"bzip",
"snappy",
"xz",
4 changes: 2 additions & 2 deletions datafusion/core/Cargo.toml
@@ -46,7 +46,7 @@ array_expressions = ["nested_expressions"]
avro = ["datafusion-common/avro", "datafusion-datasource-avro"]
backtrace = ["datafusion-common/backtrace"]
compression = [
"xz2",
"liblzma",
"bzip2",
"flate2",
"zstd",
@@ -146,6 +146,7 @@ datafusion-sql = { workspace = true, optional = true }
flate2 = { version = "1.1.4", optional = true }
futures = { workspace = true }
itertools = { workspace = true }
liblzma = { workspace = true, optional = true }
log = { workspace = true }
object_store = { workspace = true }
parking_lot = { workspace = true }
@@ -159,7 +160,6 @@ tempfile = { workspace = true }
tokio = { workspace = true }
url = { workspace = true }
uuid = { version = "1.18", features = ["v4", "js"] }
xz2 = { version = "0.1", optional = true, features = ["static"] }
zstd = { version = "0.13", optional = true, default-features = false }

[dev-dependencies]
4 changes: 2 additions & 2 deletions datafusion/core/src/test/mod.rs
@@ -53,9 +53,9 @@ use datafusion_datasource_csv::partitioned_csv_config;
use flate2::write::GzEncoder;
#[cfg(feature = "compression")]
use flate2::Compression as GzCompression;
use object_store::local_unpartitioned_file;
#[cfg(feature = "compression")]
use xz2::write::XzEncoder;
use liblzma::write::XzEncoder;
use object_store::local_unpartitioned_file;
#[cfg(feature = "compression")]
use zstd::Encoder as ZstdEncoder;

6 changes: 3 additions & 3 deletions datafusion/datasource/Cargo.toml
@@ -31,12 +31,12 @@ version.workspace = true
all-features = true

[features]
compression = ["async-compression", "xz2", "bzip2", "flate2", "zstd", "tokio-util"]
compression = ["async-compression", "liblzma", "bzip2", "flate2", "zstd", "tokio-util"]
default = ["compression"]

[dependencies]
arrow = { workspace = true }
async-compression = { version = "0.4.19", features = [
async-compression = { version = "0.4.30", features = [
"bzip2",
"gzip",
"xz",
@@ -60,14 +60,14 @@ flate2 = { version = "1.1.4", optional = true }
futures = { workspace = true }
glob = "0.3.0"
itertools = { workspace = true }
liblzma = { workspace = true, optional = true }
log = { workspace = true }
object_store = { workspace = true }
rand = { workspace = true }
tempfile = { workspace = true, optional = true }
tokio = { workspace = true }
tokio-util = { version = "0.7.16", features = ["io"], optional = true }
url = { workspace = true }
xz2 = { version = "0.1", optional = true, features = ["static"] }
zstd = { version = "0.13", optional = true, default-features = false }

[dev-dependencies]
4 changes: 2 additions & 2 deletions datafusion/datasource/src/file_compression_type.rs
@@ -43,13 +43,13 @@ use futures::stream::BoxStream;
use futures::StreamExt;
#[cfg(feature = "compression")]
use futures::TryStreamExt;
#[cfg(feature = "compression")]
use liblzma::read::XzDecoder;
use object_store::buffered::BufWriter;
use tokio::io::AsyncWrite;
#[cfg(feature = "compression")]
use tokio_util::io::{ReaderStream, StreamReader};
#[cfg(feature = "compression")]
use xz2::read::XzDecoder;
#[cfg(feature = "compression")]
use zstd::Decoder as ZstdDecoder;

/// Readable file compression type
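The xz2 → liblzma migration in the two files above is essentially import-level: both crates expose `read::XzDecoder` and `write::XzEncoder` with matching constructors. Below is a minimal standalone round-trip sketch of the new crate's API; it is not part of this PR, and the preset level `9` and variable names are illustrative, assuming liblzma keeps xz2's signatures (as the changed imports suggest).

```rust
// Standalone sketch; not part of the diff. Assumes liblzma's xz2-compatible API.
use std::io::{Read, Write};

use liblzma::read::XzDecoder;
use liblzma::write::XzEncoder;

fn main() -> std::io::Result<()> {
    let input = b"hello from liblzma";

    // Compress: `XzEncoder` wraps any `Write`; the second argument is the preset level (0-9, illustrative).
    let mut encoder = XzEncoder::new(Vec::new(), 9);
    encoder.write_all(input)?;
    let compressed = encoder.finish()?;

    // Decompress: `XzDecoder` wraps any `Read`.
    let mut decoder = XzDecoder::new(compressed.as_slice());
    let mut decompressed = Vec::new();
    decoder.read_to_end(&mut decompressed)?;

    assert_eq!(decompressed.as_slice(), input.as_slice());
    Ok(())
}
```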
2 changes: 1 addition & 1 deletion datafusion/ffi/Cargo.toml
@@ -53,7 +53,7 @@ datafusion-proto = { workspace = true }
datafusion-proto-common = { workspace = true }
futures = { workspace = true }
log = { workspace = true }
prost = { workspace = true }
prost = { version = "0.13" }
semver = "1.0.27"
tokio = { workspace = true }

2 changes: 1 addition & 1 deletion datafusion/functions/Cargo.toml
@@ -84,7 +84,7 @@ md-5 = { version = "^0.10.0", optional = true }
num-traits = { workspace = true }
rand = { workspace = true }
regex = { workspace = true, optional = true }
sha2 = { version = "^0.10.9", optional = true }
sha2 = { version = "^0.10.8", optional = true }
unicode-segmentation = { version = "^1.7.1", optional = true }
uuid = { version = "1.18", features = ["v4"], optional = true }

4 changes: 4 additions & 0 deletions datafusion/physical-expr/Cargo.toml
@@ -55,6 +55,7 @@ itertools = { workspace = true, features = ["use_std"] }
parking_lot = { workspace = true }
paste = "^1.0"
petgraph = "0.8.3"
tokio = { workspace = true }

[dev-dependencies]
arrow = { workspace = true, features = ["test_utils"] }
@@ -79,3 +80,6 @@ name = "is_null"
[[bench]]
harness = false
name = "binary_op"

[package.metadata.cargo-machete]
ignored = ["half"]
102 changes: 100 additions & 2 deletions datafusion/physical-expr/src/expressions/dynamic_filters.rs
@@ -17,6 +17,7 @@

use parking_lot::RwLock;
use std::{any::Any, fmt::Display, hash::Hash, sync::Arc};
use tokio::sync::watch;

use crate::PhysicalExpr;
use arrow::datatypes::{DataType, Schema};
@@ -27,6 +28,24 @@ use datafusion_common::{
use datafusion_expr::ColumnarValue;
use datafusion_physical_expr_common::physical_expr::{DynEq, DynHash};

/// State of a dynamic filter, tracking both updates and completion.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum FilterState {
/// Filter is in progress and may receive more updates.
InProgress { generation: u64 },
/// Filter is complete and will not receive further updates.
Complete { generation: u64 },
}

impl FilterState {
fn generation(&self) -> u64 {
match self {
FilterState::InProgress { generation }
| FilterState::Complete { generation } => *generation,
}
}
}

/// A dynamic [`PhysicalExpr`] that can be updated by anyone with a reference to it.
///
/// Any `ExecutionPlan` that uses this expression and holds a reference to it internally should probably also
@@ -44,6 +63,8 @@ pub struct DynamicFilterPhysicalExpr {
remapped_children: Option<Vec<Arc<dyn PhysicalExpr>>>,
/// The source of dynamic filters.
inner: Arc<RwLock<Inner>>,
/// Broadcasts filter state (updates and completion) to all waiters.
state_watch: watch::Sender<FilterState>,
/// For testing purposes track the data type and nullability to make sure they don't change.
/// If they do, there's a bug in the implementation.
/// But this can have overhead in production, so it's only included in our tests.
@@ -57,6 +78,10 @@ struct Inner {
/// This is used for [`PhysicalExpr::snapshot_generation`] to have a cheap check for changes.
generation: u64,
expr: Arc<dyn PhysicalExpr>,
/// Flag for quick synchronous check if filter is complete.
/// This is redundant with the watch channel state, but allows us to return immediately
/// from `wait_complete()` without subscribing if already complete.
is_complete: bool,
}

impl Inner {
@@ -66,6 +91,7 @@ impl Inner {
// This is not currently used anywhere but it seems useful to have this simple distinction.
generation: 1,
expr,
is_complete: false,
}
}

@@ -134,10 +160,12 @@ impl DynamicFilterPhysicalExpr {
children: Vec<Arc<dyn PhysicalExpr>>,
inner: Arc<dyn PhysicalExpr>,
) -> Self {
let (state_watch, _) = watch::channel(FilterState::InProgress { generation: 1 });
Self {
children,
remapped_children: None, // Initially no remapped children
inner: Arc::new(RwLock::new(Inner::new(inner))),
state_watch,
data_type: Arc::new(RwLock::new(None)),
nullable: Arc::new(RwLock::new(None)),
}
@@ -185,7 +213,7 @@
Self::remap_children(&self.children, self.remapped_children.as_ref(), expr)
}

/// Update the current expression.
/// Update the current expression and notify all waiters.
/// Any children of this expression must be a subset of the original children
/// passed to the constructor.
/// This should be called e.g.:
@@ -204,13 +232,68 @@

// Load the current inner, increment generation, and store the new one
let mut current = self.inner.write();
let new_generation = current.generation + 1;
*current = Inner {
generation: current.generation + 1,
generation: new_generation,
expr: new_expr,
is_complete: current.is_complete,
};
drop(current); // Release the lock before broadcasting

// Broadcast the new state to all waiters
let _ = self.state_watch.send(FilterState::InProgress {
generation: new_generation,
});
Ok(())
}

/// Mark this dynamic filter as complete and broadcast to all waiters.
///
/// This signals that all expected updates have been received.
/// Waiters using [`Self::wait_complete`] will be notified.
pub fn mark_complete(&self) {
let mut current = self.inner.write();
let current_generation = current.generation;
current.is_complete = true;
drop(current);

// Broadcast completion to all waiters
let _ = self.state_watch.send(FilterState::Complete {
generation: current_generation,
});
}

/// Wait asynchronously for any update to this filter.
///
/// This method will return when [`Self::update`] is called and the generation increases.
/// It does not guarantee that the filter is complete.
pub async fn wait_update(&self) {
let mut rx = self.state_watch.subscribe();
// Get the current generation
let current_gen = rx.borrow_and_update().generation();

// Wait until generation increases
let _ = rx.wait_for(|state| state.generation() > current_gen).await;
}

/// Wait asynchronously until this dynamic filter is marked as complete.
///
/// This method returns immediately if the filter is already complete.
/// Otherwise, it waits until [`Self::mark_complete`] is called.
///
/// Unlike [`Self::wait_update`], this method guarantees that when it returns,
/// the filter is fully complete with no more updates expected.
pub async fn wait_complete(&self) {
if self.inner.read().is_complete {
return;
}

let mut rx = self.state_watch.subscribe();
let _ = rx
.wait_for(|state| matches!(state, FilterState::Complete { .. }))
.await;
}

fn render(
&self,
f: &mut std::fmt::Formatter<'_>,
@@ -253,6 +336,7 @@ impl PhysicalExpr for DynamicFilterPhysicalExpr {
children: self.children.clone(),
remapped_children: Some(children),
inner: Arc::clone(&self.inner),
state_watch: self.state_watch.clone(),
data_type: Arc::clone(&self.data_type),
nullable: Arc::clone(&self.nullable),
}))
@@ -509,4 +593,18 @@ mod test {
"Expected err when evaluate is called after changing the expression."
);
}

#[tokio::test]
async fn test_wait_complete_already_complete() {
let dynamic_filter = Arc::new(DynamicFilterPhysicalExpr::new(
vec![],
lit(42) as Arc<dyn PhysicalExpr>,
));

// Mark as complete immediately
dynamic_filter.mark_complete();

// wait_complete should return immediately
dynamic_filter.wait_complete().await;
}
}
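For orientation, here is a sketch of how a producer/consumer pair might drive the completion API added in this file. Only `new`, `update`, `mark_complete`, `wait_update`, and `wait_complete` come from the diff above; the import paths, the `#[tokio::main]` wrapper, and the TopK/scan framing are assumptions for illustration, not code from this PR.

```rust
// Hypothetical usage sketch; not part of the diff.
use std::sync::Arc;

use datafusion_common::{DataFusionError, Result};
use datafusion_physical_expr::expressions::{lit, DynamicFilterPhysicalExpr};

#[tokio::main]
async fn main() -> Result<()> {
    // Start with a trivially-true placeholder expression and no column children.
    let filter = Arc::new(DynamicFilterPhysicalExpr::new(vec![], lit(true)));

    // Producer side (e.g. a TopK operator) pushes a tighter expression,
    // then signals that no further updates will arrive.
    let producer = Arc::clone(&filter);
    let producer_task = tokio::spawn(async move {
        producer.update(lit(false))?; // illustrative replacement expression
        producer.mark_complete();
        Ok::<(), DataFusionError>(())
    });

    // Consumer side (e.g. a file scan) can react to each update via
    // `filter.wait_update().await`, or block until the filter is final
    // before doing expensive pruning work:
    filter.wait_complete().await;

    producer_task.await.expect("producer task panicked")?;
    Ok(())
}
```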