diff --git a/Cargo.lock b/Cargo.lock index d712eecfcc72e..4a093b4848e7d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -184,15 +184,16 @@ checksum = "a23eb6b1614318a8071c9b2521f36b424b2c83db5eb3a0fead4a6c0809af6e61" [[package]] name = "apache-avro" -version = "0.20.0" +version = "0.21.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3a033b4ced7c585199fb78ef50fca7fe2f444369ec48080c5fd072efa1a03cc7" +checksum = "36fa98bc79671c7981272d91a8753a928ff6a1cd8e4f20a44c45bd5d313840bf" dependencies = [ "bigdecimal", "bon", - "bzip2 0.6.1", + "bzip2", "crc32fast", "digest", + "liblzma", "log", "miniz_oxide", "num-bigint", @@ -207,7 +208,6 @@ dependencies = [ "strum_macros 0.27.2", "thiserror", "uuid", - "xz2", "zstd", ] @@ -363,8 +363,8 @@ dependencies = [ "futures", "once_cell", "paste", - "prost", - "prost-types", + "prost 0.14.1", + "prost-types 0.14.1", "tonic", "tonic-prost", ] @@ -520,19 +520,15 @@ dependencies = [ [[package]] name = "async-compression" -version = "0.4.19" +version = "0.4.33" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "06575e6a9673580f52661c92107baabffbf41e2141373441cbcdc47cb733003c" +checksum = "93c1f86859c1af3d514fa19e8323147ff10ea98684e6c7b307912509f50e67b2" dependencies = [ - "bzip2 0.5.2", - "flate2", + "compression-codecs", + "compression-core", "futures-core", - "memchr", "pin-project-lite", "tokio", - "xz2", - "zstd", - "zstd-safe", ] [[package]] @@ -1026,9 +1022,9 @@ dependencies = [ [[package]] name = "bigdecimal" -version = "0.4.8" +version = "0.4.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1a22f228ab7a1b23027ccc6c350b72868017af7ea8356fbdf19f8d991c690013" +checksum = "560f42649de9fa436b73517378a147ec21f6c997a546581df4b4b31677828934" dependencies = [ "autocfg", "libm", @@ -1166,8 +1162,8 @@ version = "0.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "85a885520bf6249ab931a764ffdb87b0ceef48e6e7d807cfdb21b751e086e1ad" dependencies = [ - "prost", - "prost-types", + "prost 0.14.1", + "prost-types 0.14.1", "tonic", "tonic-prost", "ureq", @@ -1183,7 +1179,7 @@ dependencies = [ "bollard-buildkit-proto", "bytes", "chrono", - "prost", + "prost 0.14.1", "serde", "serde_json", "serde_repr", @@ -1192,9 +1188,9 @@ dependencies = [ [[package]] name = "bon" -version = "3.7.2" +version = "3.8.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c2529c31017402be841eb45892278a6c21a000c0a17643af326c73a73f83f0fb" +checksum = "ebeb9aaf9329dff6ceb65c689ca3db33dbf15f324909c60e4e5eef5701ce31b1" dependencies = [ "bon-macros", "rustversion", @@ -1202,9 +1198,9 @@ dependencies = [ [[package]] name = "bon-macros" -version = "3.7.2" +version = "3.8.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d82020dadcb845a345591863adb65d74fa8dc5c18a0b6d408470e13b7adc7005" +checksum = "77e9d642a7e3a318e37c2c9427b5a6a48aa1ad55dcd986f3034ab2239045a645" dependencies = [ "darling", "ident_case", @@ -1319,15 +1315,6 @@ dependencies = [ "either", ] -[[package]] -name = "bzip2" -version = "0.5.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "49ecfb22d906f800d4fe833b6282cf4dc1c298f5057ca0b5445e5c209735ca47" -dependencies = [ - "bzip2-sys", -] - [[package]] name = "bzip2" version = "0.6.1" @@ -1337,16 +1324,6 @@ dependencies = [ "libbz2-rs-sys", ] -[[package]] -name = "bzip2-sys" -version = "0.1.13+1.0.8" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = 
"225bff33b2141874fe80d71e07d6eec4f85c5c216453dd96388240f96e1acc14" -dependencies = [ - "cc", - "pkg-config", -] - [[package]] name = "cast" version = "0.3.0" @@ -1534,6 +1511,27 @@ dependencies = [ "unicode-width 0.2.1", ] +[[package]] +name = "compression-codecs" +version = "0.4.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "680dc087785c5230f8e8843e2e57ac7c1c90488b6a91b88caa265410568f441b" +dependencies = [ + "bzip2", + "compression-core", + "flate2", + "liblzma", + "memchr", + "zstd", + "zstd-safe", +] + +[[package]] +name = "compression-core" +version = "0.4.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3a9b614a5787ef0c8802a55766480563cb3a93b435898c422ed2a359cf811582" + [[package]] name = "console" version = "0.15.11" @@ -1838,7 +1836,7 @@ dependencies = [ "arrow-schema", "async-trait", "bytes", - "bzip2 0.6.1", + "bzip2", "chrono", "criterion", "ctor", @@ -1879,6 +1877,7 @@ dependencies = [ "glob", "insta", "itertools 0.14.0", + "liblzma", "log", "nix", "object_store", @@ -1898,7 +1897,6 @@ dependencies = [ "tokio", "url", "uuid", - "xz2", "zstd", ] @@ -2049,7 +2047,7 @@ dependencies = [ "async-compression", "async-trait", "bytes", - "bzip2 0.6.1", + "bzip2", "chrono", "criterion", "datafusion-common", @@ -2065,6 +2063,7 @@ dependencies = [ "futures", "glob", "itertools 0.14.0", + "liblzma", "log", "object_store", "rand 0.9.2", @@ -2072,7 +2071,6 @@ dependencies = [ "tokio", "tokio-util", "url", - "xz2", "zstd", ] @@ -2213,7 +2211,7 @@ dependencies = [ "mimalloc", "nix", "object_store", - "prost", + "prost 0.14.1", "rand 0.9.2", "serde_json", "tempfile", @@ -2299,7 +2297,7 @@ dependencies = [ "doc-comment", "futures", "log", - "prost", + "prost 0.13.5", "semver", "tokio", ] @@ -2490,6 +2488,7 @@ dependencies = [ "petgraph 0.8.3", "rand 0.9.2", "rstest", + "tokio", ] [[package]] @@ -2601,9 +2600,9 @@ dependencies = [ "datafusion-proto-common", "doc-comment", "object_store", - "pbjson", + "pbjson 0.8.0", "pretty_assertions", - "prost", + "prost 0.13.5", "serde", "serde_json", "tokio", @@ -2616,8 +2615,8 @@ dependencies = [ "arrow", "datafusion-common", "doc-comment", - "pbjson", - "prost", + "pbjson 0.8.0", + "prost 0.13.5", "serde", ] @@ -2745,7 +2744,7 @@ dependencies = [ "itertools 0.14.0", "object_store", "pbjson-types", - "prost", + "prost 0.13.5", "serde_json", "substrait", "tokio", @@ -3228,16 +3227,16 @@ dependencies = [ name = "gen" version = "0.1.0" dependencies = [ - "pbjson-build", - "prost-build", + "pbjson-build 0.8.0", + "prost-build 0.14.1", ] [[package]] name = "gen-common" version = "0.1.0" dependencies = [ - "pbjson-build", - "prost-build", + "pbjson-build 0.8.0", + "prost-build 0.14.1", ] [[package]] @@ -3988,6 +3987,26 @@ dependencies = [ "windows-link 0.2.0", ] +[[package]] +name = "liblzma" +version = "0.4.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "73c36d08cad03a3fbe2c4e7bb3a9e84c57e4ee4135ed0b065cade3d98480c648" +dependencies = [ + "liblzma-sys", +] + +[[package]] +name = "liblzma-sys" +version = "0.4.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "01b9596486f6d60c3bbe644c0e1be1aa6ccc472ad630fe8927b456973d7cb736" +dependencies = [ + "cc", + "libc", + "pkg-config", +] + [[package]] name = "libm" version = "0.2.15" @@ -4080,17 +4099,6 @@ dependencies = [ "twox-hash", ] -[[package]] -name = "lzma-sys" -version = "0.1.20" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = 
"5fda04ab3764e6cde78b9974eec4f779acaba7c4e84b36eca3cf77c581b85d27" -dependencies = [ - "cc", - "libc", - "pkg-config", -] - [[package]] name = "matchit" version = "0.8.4" @@ -4509,6 +4517,16 @@ version = "1.0.15" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "57c0d7b74b563b49d38dae00a0c37d4d6de9b432382b2892f0574ddcae73fd0a" +[[package]] +name = "pbjson" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c7e6349fa080353f4a597daffd05cb81572a9c031a6d4fff7e504947496fcc68" +dependencies = [ + "base64 0.21.7", + "serde", +] + [[package]] name = "pbjson" version = "0.8.0" @@ -4519,6 +4537,18 @@ dependencies = [ "serde", ] +[[package]] +name = "pbjson-build" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6eea3058763d6e656105d1403cb04e0a41b7bbac6362d413e7c33be0c32279c9" +dependencies = [ + "heck 0.5.0", + "itertools 0.13.0", + "prost 0.13.5", + "prost-types 0.13.5", +] + [[package]] name = "pbjson-build" version = "0.8.0" @@ -4527,22 +4557,22 @@ checksum = "af22d08a625a2213a78dbb0ffa253318c5c79ce3133d32d296655a7bdfb02095" dependencies = [ "heck 0.5.0", "itertools 0.14.0", - "prost", - "prost-types", + "prost 0.14.1", + "prost-types 0.14.1", ] [[package]] name = "pbjson-types" -version = "0.8.0" +version = "0.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8e748e28374f10a330ee3bb9f29b828c0ac79831a32bab65015ad9b661ead526" +checksum = "e54e5e7bfb1652f95bc361d76f3c780d8e526b134b85417e774166ee941f0887" dependencies = [ "bytes", "chrono", - "pbjson", - "pbjson-build", - "prost", - "prost-build", + "pbjson 0.7.0", + "pbjson-build 0.7.0", + "prost 0.13.5", + "prost-build 0.13.5", "serde", ] @@ -4821,6 +4851,16 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "prost" +version = "0.13.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2796faa41db3ec313a31f7624d9286acf277b52de526150b7e69f3debf891ee5" +dependencies = [ + "bytes", + "prost-derive 0.13.5", +] + [[package]] name = "prost" version = "0.14.1" @@ -4828,7 +4868,27 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7231bd9b3d3d33c86b58adbac74b5ec0ad9f496b19d22801d773636feaa95f3d" dependencies = [ "bytes", - "prost-derive", + "prost-derive 0.14.1", +] + +[[package]] +name = "prost-build" +version = "0.13.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "be769465445e8c1474e9c5dac2018218498557af32d9ed057325ec9a41ae81bf" +dependencies = [ + "heck 0.5.0", + "itertools 0.14.0", + "log", + "multimap", + "once_cell", + "petgraph 0.7.1", + "prettyplease", + "prost 0.13.5", + "prost-types 0.13.5", + "regex", + "syn 2.0.108", + "tempfile", ] [[package]] @@ -4844,13 +4904,26 @@ dependencies = [ "once_cell", "petgraph 0.7.1", "prettyplease", - "prost", - "prost-types", + "prost 0.14.1", + "prost-types 0.14.1", "regex", "syn 2.0.108", "tempfile", ] +[[package]] +name = "prost-derive" +version = "0.13.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8a56d757972c98b346a9b766e3f02746cde6dd1cd1d1d563472929fdd74bec4d" +dependencies = [ + "anyhow", + "itertools 0.14.0", + "proc-macro2", + "quote", + "syn 2.0.108", +] + [[package]] name = "prost-derive" version = "0.14.1" @@ -4864,13 +4937,22 @@ dependencies = [ "syn 2.0.108", ] +[[package]] +name = "prost-types" +version = "0.13.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"52c2c1bf36ddb1a1c396b3601a3cec27c2462e45f07c386894ec3ccf5332bd16" +dependencies = [ + "prost 0.13.5", +] + [[package]] name = "prost-types" version = "0.14.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b9b4db3d6da204ed77bb26ba83b6122a73aeb2e87e25fbf7ad2e84c4ccbf8f72" dependencies = [ - "prost", + "prost 0.14.1", ] [[package]] @@ -5248,9 +5330,9 @@ dependencies = [ [[package]] name = "regex-lite" -version = "0.1.7" +version = "0.1.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "943f41321c63ef1c92fd763bfe054d2668f7f225a5c29f0105903dc2fc04ba30" +checksum = "8d942b98df5e658f56f20d592c7f868833fe38115e65c33003d8cd224b0155da" [[package]] name = "regex-syntax" @@ -6111,18 +6193,18 @@ dependencies = [ [[package]] name = "substrait" -version = "0.62.0" +version = "0.58.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "21f1cb6d0bcd097a39fc25f7236236be29881fe122e282e4173d6d007a929927" +checksum = "de6d24c270c6c672a86c183c3a8439ba46c1936f93cf7296aa692de3b0ff0228" dependencies = [ "heck 0.5.0", - "pbjson", - "pbjson-build", + "pbjson 0.7.0", + "pbjson-build 0.7.0", "pbjson-types", "prettyplease", - "prost", - "prost-build", - "prost-types", + "prost 0.13.5", + "prost-build 0.13.5", + "prost-types 0.13.5", "protobuf-src", "regress", "schemars 0.8.22", @@ -6549,7 +6631,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "66bd50ad6ce1252d87ef024b3d64fe4c3cf54a86fb9ef4c631fdd0ded7aeaa67" dependencies = [ "bytes", - "prost", + "prost 0.14.1", "tonic", ] @@ -6706,9 +6788,9 @@ checksum = "f8c1ae7cc0fdb8b842d65d127cb981574b0d2b249b74d1c7a2986863dc134f71" [[package]] name = "typify" -version = "0.5.0" +version = "0.4.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e6d5bcc6f62eb1fa8aa4098f39b29f93dcb914e17158b76c50360911257aa629" +checksum = "7144144e97e987c94758a3017c920a027feac0799df325d6df4fc8f08d02068e" dependencies = [ "typify-impl", "typify-macro", @@ -6716,9 +6798,9 @@ dependencies = [ [[package]] name = "typify-impl" -version = "0.5.0" +version = "0.4.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a1eb359f7ffa4f9ebe947fa11a1b2da054564502968db5f317b7e37693cb2240" +checksum = "062879d46aa4c9dfe0d33b035bbaf512da192131645d05deacb7033ec8581a09" dependencies = [ "heck 0.5.0", "log", @@ -6736,9 +6818,9 @@ dependencies = [ [[package]] name = "typify-macro" -version = "0.5.0" +version = "0.4.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "911c32f3c8514b048c1b228361bebb5e6d73aeec01696e8cc0e82e2ffef8ab7a" +checksum = "9708a3ceb6660ba3f8d2b8f0567e7d4b8b198e2b94d093b8a6077a751425de9e" dependencies = [ "proc-macro2", "quote", @@ -7490,15 +7572,6 @@ version = "0.13.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "66fee0b777b0f5ac1c69bb06d361268faafa61cd4682ae064a171c16c433e9e4" -[[package]] -name = "xz2" -version = "0.1.7" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "388c44dc09d76f1536602ead6d325eb532f5c122f17782bd57fb47baeeb767e2" -dependencies = [ - "lzma-sys", -] - [[package]] name = "yansi" version = "1.0.1" diff --git a/Cargo.toml b/Cargo.toml index 36198430e40b1..5bf54e674c899 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -90,7 +90,7 @@ version = "51.0.0" ahash = { version = "0.8", default-features = false, features = [ "runtime-rng", ] } -apache-avro = { version = "0.20", default-features = false } +apache-avro = { version = 
"0.21", default-features = false } arrow = { version = "57.0.0", features = [ "prettyprint", "chrono-tz", @@ -105,7 +105,7 @@ arrow-ipc = { version = "57.0.0", default-features = false, features = [ arrow-ord = { version = "57.0.0", default-features = false } arrow-schema = { version = "57.0.0", default-features = false } async-trait = "0.1.89" -bigdecimal = "0.4.8" +bigdecimal = "0.4.9" bytes = "1.10" chrono = { version = "0.4.42", default-features = false } criterion = "0.7" @@ -158,6 +158,7 @@ hex = { version = "0.4.3" } indexmap = "2.12.0" insta = { version = "1.43.2", features = ["glob", "filters"] } itertools = "0.14" +liblzma = { version = "0.4.4", features = ["static"] } log = "^0.4" num-traits = { version = "0.2" } object_store = { version = "0.12.4", default-features = false } diff --git a/datafusion/common/Cargo.toml b/datafusion/common/Cargo.toml index b222ae12b92f5..030c53101cb80 100644 --- a/datafusion/common/Cargo.toml +++ b/datafusion/common/Cargo.toml @@ -56,7 +56,7 @@ sql = ["sqlparser"] [dependencies] ahash = { workspace = true } -apache-avro = { version = "0.20", default-features = false, features = [ +apache-avro = { workspace = true, features = [ "bzip", "snappy", "xz", diff --git a/datafusion/core/Cargo.toml b/datafusion/core/Cargo.toml index 67a73ac6f6693..494baa187ca7c 100644 --- a/datafusion/core/Cargo.toml +++ b/datafusion/core/Cargo.toml @@ -46,7 +46,7 @@ array_expressions = ["nested_expressions"] avro = ["datafusion-common/avro", "datafusion-datasource-avro"] backtrace = ["datafusion-common/backtrace"] compression = [ - "xz2", + "liblzma", "bzip2", "flate2", "zstd", @@ -146,6 +146,7 @@ datafusion-sql = { workspace = true, optional = true } flate2 = { version = "1.1.4", optional = true } futures = { workspace = true } itertools = { workspace = true } +liblzma = { workspace = true, optional = true } log = { workspace = true } object_store = { workspace = true } parking_lot = { workspace = true } @@ -159,7 +160,6 @@ tempfile = { workspace = true } tokio = { workspace = true } url = { workspace = true } uuid = { version = "1.18", features = ["v4", "js"] } -xz2 = { version = "0.1", optional = true, features = ["static"] } zstd = { version = "0.13", optional = true, default-features = false } [dev-dependencies] diff --git a/datafusion/core/src/test/mod.rs b/datafusion/core/src/test/mod.rs index 68f83e7f1f115..2fe9d03c3ddba 100644 --- a/datafusion/core/src/test/mod.rs +++ b/datafusion/core/src/test/mod.rs @@ -53,9 +53,9 @@ use datafusion_datasource_csv::partitioned_csv_config; use flate2::write::GzEncoder; #[cfg(feature = "compression")] use flate2::Compression as GzCompression; -use object_store::local_unpartitioned_file; #[cfg(feature = "compression")] -use xz2::write::XzEncoder; +use liblzma::write::XzEncoder; +use object_store::local_unpartitioned_file; #[cfg(feature = "compression")] use zstd::Encoder as ZstdEncoder; diff --git a/datafusion/datasource/Cargo.toml b/datafusion/datasource/Cargo.toml index 19b247829dbd2..e37752aa54ff5 100644 --- a/datafusion/datasource/Cargo.toml +++ b/datafusion/datasource/Cargo.toml @@ -31,12 +31,12 @@ version.workspace = true all-features = true [features] -compression = ["async-compression", "xz2", "bzip2", "flate2", "zstd", "tokio-util"] +compression = ["async-compression", "liblzma", "bzip2", "flate2", "zstd", "tokio-util"] default = ["compression"] [dependencies] arrow = { workspace = true } -async-compression = { version = "0.4.19", features = [ +async-compression = { version = "0.4.30", features = [ "bzip2", "gzip", "xz", @@ 
-60,6 +60,7 @@ flate2 = { version = "1.1.4", optional = true } futures = { workspace = true } glob = "0.3.0" itertools = { workspace = true } +liblzma = { workspace = true, optional = true } log = { workspace = true } object_store = { workspace = true } rand = { workspace = true } @@ -67,7 +68,6 @@ tempfile = { workspace = true, optional = true } tokio = { workspace = true } tokio-util = { version = "0.7.16", features = ["io"], optional = true } url = { workspace = true } -xz2 = { version = "0.1", optional = true, features = ["static"] } zstd = { version = "0.13", optional = true, default-features = false } [dev-dependencies] diff --git a/datafusion/datasource/src/file_compression_type.rs b/datafusion/datasource/src/file_compression_type.rs index 7cc3142564e9b..9ca5d8763b74a 100644 --- a/datafusion/datasource/src/file_compression_type.rs +++ b/datafusion/datasource/src/file_compression_type.rs @@ -43,13 +43,13 @@ use futures::stream::BoxStream; use futures::StreamExt; #[cfg(feature = "compression")] use futures::TryStreamExt; +#[cfg(feature = "compression")] +use liblzma::read::XzDecoder; use object_store::buffered::BufWriter; use tokio::io::AsyncWrite; #[cfg(feature = "compression")] use tokio_util::io::{ReaderStream, StreamReader}; #[cfg(feature = "compression")] -use xz2::read::XzDecoder; -#[cfg(feature = "compression")] use zstd::Decoder as ZstdDecoder; /// Readable file compression type diff --git a/datafusion/ffi/Cargo.toml b/datafusion/ffi/Cargo.toml index 3ac08180fb68c..baa6d1299c0b8 100644 --- a/datafusion/ffi/Cargo.toml +++ b/datafusion/ffi/Cargo.toml @@ -53,7 +53,7 @@ datafusion-proto = { workspace = true } datafusion-proto-common = { workspace = true } futures = { workspace = true } log = { workspace = true } -prost = { workspace = true } +prost = { version = "0.13" } semver = "1.0.27" tokio = { workspace = true } diff --git a/datafusion/functions/Cargo.toml b/datafusion/functions/Cargo.toml index ad52a551a7c17..f14e7f06253d9 100644 --- a/datafusion/functions/Cargo.toml +++ b/datafusion/functions/Cargo.toml @@ -84,7 +84,7 @@ md-5 = { version = "^0.10.0", optional = true } num-traits = { workspace = true } rand = { workspace = true } regex = { workspace = true, optional = true } -sha2 = { version = "^0.10.9", optional = true } +sha2 = { version = "^0.10.8", optional = true } unicode-segmentation = { version = "^1.7.1", optional = true } uuid = { version = "1.18", features = ["v4"], optional = true } diff --git a/datafusion/physical-expr/Cargo.toml b/datafusion/physical-expr/Cargo.toml index 953a46929c394..69670941ad4f7 100644 --- a/datafusion/physical-expr/Cargo.toml +++ b/datafusion/physical-expr/Cargo.toml @@ -55,6 +55,7 @@ itertools = { workspace = true, features = ["use_std"] } parking_lot = { workspace = true } paste = "^1.0" petgraph = "0.8.3" +tokio = { workspace = true } [dev-dependencies] arrow = { workspace = true, features = ["test_utils"] } @@ -79,3 +80,6 @@ name = "is_null" [[bench]] harness = false name = "binary_op" + +[package.metadata.cargo-machete] +ignored = ["half"] diff --git a/datafusion/physical-expr/src/expressions/dynamic_filters.rs b/datafusion/physical-expr/src/expressions/dynamic_filters.rs index 964a193db833a..43a242472beca 100644 --- a/datafusion/physical-expr/src/expressions/dynamic_filters.rs +++ b/datafusion/physical-expr/src/expressions/dynamic_filters.rs @@ -17,6 +17,7 @@ use parking_lot::RwLock; use std::{any::Any, fmt::Display, hash::Hash, sync::Arc}; +use tokio::sync::watch; use crate::PhysicalExpr; use arrow::datatypes::{DataType, Schema}; 
@@ -27,6 +28,24 @@ use datafusion_common::{
 use datafusion_expr::ColumnarValue;
 use datafusion_physical_expr_common::physical_expr::{DynEq, DynHash};
 
+/// State of a dynamic filter, tracking both updates and completion.
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+enum FilterState {
+    /// Filter is in progress and may receive more updates.
+    InProgress { generation: u64 },
+    /// Filter is complete and will not receive further updates.
+    Complete { generation: u64 },
+}
+
+impl FilterState {
+    fn generation(&self) -> u64 {
+        match self {
+            FilterState::InProgress { generation }
+            | FilterState::Complete { generation } => *generation,
+        }
+    }
+}
+
 /// A dynamic [`PhysicalExpr`] that can be updated by anyone with a reference to it.
 ///
 /// Any `ExecutionPlan` that uses this expression and holds a reference to it internally should probably also
@@ -44,6 +63,8 @@ pub struct DynamicFilterPhysicalExpr {
     remapped_children: Option<Vec<Arc<dyn PhysicalExpr>>>,
     /// The source of dynamic filters.
     inner: Arc<RwLock<Inner>>,
+    /// Broadcasts filter state (updates and completion) to all waiters.
+    state_watch: watch::Sender<FilterState>,
     /// For testing purposes track the data type and nullability to make sure they don't change.
     /// If they do, there's a bug in the implementation.
     /// But this can have overhead in production, so it's only included in our tests.
@@ -57,6 +78,10 @@ struct Inner {
     /// This is used for [`PhysicalExpr::snapshot_generation`] to have a cheap check for changes.
     generation: u64,
     expr: Arc<dyn PhysicalExpr>,
+    /// Flag for quick synchronous check if filter is complete.
+    /// This is redundant with the watch channel state, but allows us to return immediately
+    /// from `wait_complete()` without subscribing if already complete.
+    is_complete: bool,
 }
 
 impl Inner {
@@ -66,6 +91,7 @@ impl Inner {
             // This is not currently used anywhere but it seems useful to have this simple distinction.
             generation: 1,
             expr,
+            is_complete: false,
         }
     }
 
@@ -134,10 +160,12 @@ impl DynamicFilterPhysicalExpr {
         children: Vec<Arc<dyn PhysicalExpr>>,
         inner: Arc<dyn PhysicalExpr>,
     ) -> Self {
+        let (state_watch, _) = watch::channel(FilterState::InProgress { generation: 1 });
         Self {
             children,
             remapped_children: None, // Initially no remapped children
             inner: Arc::new(RwLock::new(Inner::new(inner))),
+            state_watch,
             data_type: Arc::new(RwLock::new(None)),
             nullable: Arc::new(RwLock::new(None)),
         }
@@ -185,7 +213,7 @@ impl DynamicFilterPhysicalExpr {
         Self::remap_children(&self.children, self.remapped_children.as_ref(), expr)
     }
 
-    /// Update the current expression.
+    /// Update the current expression and notify all waiters.
     /// Any children of this expression must be a subset of the original children
     /// passed to the constructor.
     /// This should be called e.g.:
@@ -204,13 +232,68 @@ impl DynamicFilterPhysicalExpr {
         // Load the current inner, increment generation, and store the new one
         let mut current = self.inner.write();
+        let new_generation = current.generation + 1;
         *current = Inner {
-            generation: current.generation + 1,
+            generation: new_generation,
             expr: new_expr,
+            is_complete: current.is_complete,
         };
+        drop(current); // Release the lock before broadcasting
+
+        // Broadcast the new state to all waiters
+        let _ = self.state_watch.send(FilterState::InProgress {
+            generation: new_generation,
+        });
         Ok(())
     }
 
+    /// Mark this dynamic filter as complete and broadcast to all waiters.
+    ///
+    /// This signals that all expected updates have been received.
+    /// Waiters using [`Self::wait_complete`] will be notified.
+    pub fn mark_complete(&self) {
+        let mut current = self.inner.write();
+        let current_generation = current.generation;
+        current.is_complete = true;
+        drop(current);
+
+        // Broadcast completion to all waiters
+        let _ = self.state_watch.send(FilterState::Complete {
+            generation: current_generation,
+        });
+    }
+
+    /// Wait asynchronously for any update to this filter.
+    ///
+    /// This method will return when [`Self::update`] is called and the generation increases.
+    /// It does not guarantee that the filter is complete.
+    pub async fn wait_update(&self) {
+        let mut rx = self.state_watch.subscribe();
+        // Get the current generation
+        let current_gen = rx.borrow_and_update().generation();
+
+        // Wait until generation increases
+        let _ = rx.wait_for(|state| state.generation() > current_gen).await;
+    }
+
+    /// Wait asynchronously until this dynamic filter is marked as complete.
+    ///
+    /// This method returns immediately if the filter is already complete.
+    /// Otherwise, it waits until [`Self::mark_complete`] is called.
+    ///
+    /// Unlike [`Self::wait_update`], this method guarantees that when it returns,
+    /// the filter is fully complete with no more updates expected.
+    pub async fn wait_complete(&self) {
+        if self.inner.read().is_complete {
+            return;
+        }
+
+        let mut rx = self.state_watch.subscribe();
+        let _ = rx
+            .wait_for(|state| matches!(state, FilterState::Complete { .. }))
+            .await;
+    }
+
     fn render(
         &self,
         f: &mut std::fmt::Formatter<'_>,
@@ -253,6 +336,7 @@ impl PhysicalExpr for DynamicFilterPhysicalExpr {
             children: self.children.clone(),
             remapped_children: Some(children),
             inner: Arc::clone(&self.inner),
+            state_watch: self.state_watch.clone(),
             data_type: Arc::clone(&self.data_type),
             nullable: Arc::clone(&self.nullable),
         }))
@@ -509,4 +593,18 @@ mod test {
             "Expected err when evaluate is called after changing the expression."
         );
     }
+
+    #[tokio::test]
+    async fn test_wait_complete_already_complete() {
+        let dynamic_filter = Arc::new(DynamicFilterPhysicalExpr::new(
+            vec![],
+            lit(42) as Arc<dyn PhysicalExpr>,
+        ));
+
+        // Mark as complete immediately
+        dynamic_filter.mark_complete();
+
+        // wait_complete should return immediately
+        dynamic_filter.wait_complete().await;
+    }
 }
diff --git a/datafusion/physical-plan/src/joins/hash_join/exec.rs b/datafusion/physical-plan/src/joins/hash_join/exec.rs
index c552e6954c8f9..a250ce542e6b1 100644
--- a/datafusion/physical-plan/src/joins/hash_join/exec.rs
+++ b/datafusion/physical-plan/src/joins/hash_join/exec.rs
@@ -4500,4 +4500,103 @@ mod tests {
     fn columns(schema: &Schema) -> Vec<String> {
         schema.fields().iter().map(|f| f.name().clone()).collect()
     }
+
+    /// This test verifies that the dynamic filter is marked as complete after HashJoinExec finishes building the hash table.
+    #[tokio::test]
+    async fn test_hash_join_marks_filter_complete() -> Result<()> {
+        let task_ctx = Arc::new(TaskContext::default());
+        let left = build_table(
+            ("a1", &vec![1, 2, 3]),
+            ("b1", &vec![4, 5, 6]),
+            ("c1", &vec![7, 8, 9]),
+        );
+        let right = build_table(
+            ("a2", &vec![10, 20, 30]),
+            ("b1", &vec![4, 5, 6]),
+            ("c2", &vec![70, 80, 90]),
+        );
+
+        let on = vec![(
+            Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
+            Arc::new(Column::new_with_schema("b1", &right.schema())?)
as _, + )]; + + // Create a dynamic filter manually + let dynamic_filter = HashJoinExec::create_dynamic_filter(&on); + let dynamic_filter_clone = Arc::clone(&dynamic_filter); + + // Create HashJoinExec with the dynamic filter + let mut join = HashJoinExec::try_new( + left, + right, + on, + None, + &JoinType::Inner, + None, + PartitionMode::CollectLeft, + NullEquality::NullEqualsNothing, + )?; + join.dynamic_filter = Some(HashJoinExecDynamicFilter { + filter: dynamic_filter, + bounds_accumulator: OnceLock::new(), + }); + + // Execute the join + let stream = join.execute(0, task_ctx)?; + let _batches = common::collect(stream).await?; + + // After the join completes, the dynamic filter should be marked as complete + // wait_complete() should return immediately + dynamic_filter_clone.wait_complete().await; + + Ok(()) + } + + /// This test verifies that the dynamic filter is marked as complete even when the build side is empty. + #[tokio::test] + async fn test_hash_join_marks_filter_complete_empty_build_side() -> Result<()> { + let task_ctx = Arc::new(TaskContext::default()); + // Empty left side (build side) + let left = build_table(("a1", &vec![]), ("b1", &vec![]), ("c1", &vec![])); + let right = build_table( + ("a2", &vec![10, 20, 30]), + ("b1", &vec![4, 5, 6]), + ("c2", &vec![70, 80, 90]), + ); + + let on = vec![( + Arc::new(Column::new_with_schema("b1", &left.schema())?) as _, + Arc::new(Column::new_with_schema("b1", &right.schema())?) as _, + )]; + + // Create a dynamic filter manually + let dynamic_filter = HashJoinExec::create_dynamic_filter(&on); + let dynamic_filter_clone = Arc::clone(&dynamic_filter); + + // Create HashJoinExec with the dynamic filter + let mut join = HashJoinExec::try_new( + left, + right, + on, + None, + &JoinType::Inner, + None, + PartitionMode::CollectLeft, + NullEquality::NullEqualsNothing, + )?; + join.dynamic_filter = Some(HashJoinExecDynamicFilter { + filter: dynamic_filter, + bounds_accumulator: OnceLock::new(), + }); + + // Execute the join + let stream = join.execute(0, task_ctx)?; + let _batches = common::collect(stream).await?; + + // Even with empty build side, the dynamic filter should be marked as complete + // wait_complete() should return immediately + dynamic_filter_clone.wait_complete().await; + + Ok(()) + } } diff --git a/datafusion/physical-plan/src/joins/hash_join/shared_bounds.rs b/datafusion/physical-plan/src/joins/hash_join/shared_bounds.rs index 25f7a0de31acd..e83f89e4e5d78 100644 --- a/datafusion/physical-plan/src/joins/hash_join/shared_bounds.rs +++ b/datafusion/physical-plan/src/joins/hash_join/shared_bounds.rs @@ -300,6 +300,7 @@ impl SharedBoundsAccumulator { self.create_filter_from_partition_bounds(&inner.bounds)?; self.dynamic_filter.update(filter_expr)?; } + self.dynamic_filter.mark_complete(); } Ok(()) diff --git a/datafusion/physical-plan/src/topk/mod.rs b/datafusion/physical-plan/src/topk/mod.rs index 9435de1cc4488..6be6243513b0f 100644 --- a/datafusion/physical-plan/src/topk/mod.rs +++ b/datafusion/physical-plan/src/topk/mod.rs @@ -589,10 +589,13 @@ impl TopK { common_sort_prefix_converter: _, common_sort_prefix: _, finished: _, - filter: _, + filter, } = self; let _timer = metrics.baseline.elapsed_compute().timer(); // time updated on drop + // Mark the dynamic filter as complete now that TopK processing is finished. + filter.read().expr().mark_complete(); + // break into record batches as needed let mut batches = vec![]; if let Some(mut batch) = heap.emit()? 
{ @@ -1196,4 +1199,52 @@ mod tests { Ok(()) } + + /// This test verifies that the dynamic filter is marked as complete after TopK processing finishes. + #[tokio::test] + async fn test_topk_marks_filter_complete() -> Result<()> { + let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)])); + + let sort_expr = PhysicalSortExpr { + expr: col("a", schema.as_ref())?, + options: SortOptions::default(), + }; + + let full_expr = LexOrdering::from([sort_expr.clone()]); + let prefix = vec![sort_expr]; + + // Create a dummy runtime environment and metrics + let runtime = Arc::new(RuntimeEnv::default()); + let metrics = ExecutionPlanMetricsSet::new(); + + // Create a dynamic filter that we'll check for completion + let dynamic_filter = Arc::new(DynamicFilterPhysicalExpr::new(vec![], lit(true))); + let dynamic_filter_clone = Arc::clone(&dynamic_filter); + + // Create a TopK instance + let mut topk = TopK::try_new( + 0, + Arc::clone(&schema), + prefix, + full_expr, + 2, + 10, + runtime, + &metrics, + Arc::new(RwLock::new(TopKDynamicFilters::new(dynamic_filter))), + )?; + + let array: ArrayRef = Arc::new(Int32Array::from(vec![Some(3), Some(1), Some(2)])); + let batch = RecordBatch::try_new(Arc::clone(&schema), vec![array])?; + topk.insert_batch(batch)?; + + // Call emit to finish TopK processing + let _results: Vec<_> = topk.emit()?.try_collect().await?; + + // After emit is called, the dynamic filter should be marked as complete + // wait_complete() should return immediately + dynamic_filter_clone.wait_complete().await; + + Ok(()) + } } diff --git a/datafusion/proto-common/Cargo.toml b/datafusion/proto-common/Cargo.toml index c67c8892a3ded..a5618e1627384 100644 --- a/datafusion/proto-common/Cargo.toml +++ b/datafusion/proto-common/Cargo.toml @@ -45,7 +45,7 @@ json = ["serde", "pbjson"] arrow = { workspace = true } datafusion-common = { workspace = true } pbjson = { workspace = true, optional = true } -prost = { workspace = true } +prost = { version = "0.13" } serde = { version = "1.0", optional = true } [dev-dependencies] diff --git a/datafusion/proto/Cargo.toml b/datafusion/proto/Cargo.toml index 920e277b8ccc0..99e4c78fe580d 100644 --- a/datafusion/proto/Cargo.toml +++ b/datafusion/proto/Cargo.toml @@ -68,7 +68,7 @@ datafusion-physical-plan = { workspace = true } datafusion-proto-common = { workspace = true } object_store = { workspace = true } pbjson = { workspace = true, optional = true } -prost = { workspace = true } +prost = { version = "0.13" } serde = { version = "1.0", optional = true } serde_json = { workspace = true, optional = true } diff --git a/datafusion/substrait/Cargo.toml b/datafusion/substrait/Cargo.toml index 0d7e34881c9cb..f5d81d41551f9 100644 --- a/datafusion/substrait/Cargo.toml +++ b/datafusion/substrait/Cargo.toml @@ -41,9 +41,9 @@ datafusion = { workspace = true, features = ["sql"] } half = { workspace = true } itertools = { workspace = true } object_store = { workspace = true } -pbjson-types = { workspace = true } -prost = { workspace = true } -substrait = { version = "0.62", features = ["serde"] } +pbjson-types = { version = "0.7" } +prost = { version = "0.13" } +substrait = { version = "0.58", features = ["serde"] } url = { workspace = true } tokio = { workspace = true, features = ["fs"] } uuid = { version = "1.17.0", features = ["v4"] } diff --git a/datafusion/substrait/src/extensions.rs b/datafusion/substrait/src/extensions.rs index 0792928982268..b66da95480d94 100644 --- a/datafusion/substrait/src/extensions.rs +++ 
b/datafusion/substrait/src/extensions.rs @@ -121,7 +121,6 @@ impl From for Vec { for (f_anchor, f_name) in val.functions { let function_extension = ExtensionFunction { extension_uri_reference: u32::MAX, - extension_urn_reference: u32::MAX, function_anchor: f_anchor, name: f_name, }; @@ -134,7 +133,6 @@ impl From for Vec { for (t_anchor, t_name) in val.types { let type_extension = ExtensionType { extension_uri_reference: u32::MAX, // https://github.com/apache/datafusion/issues/11545 - extension_urn_reference: u32::MAX, // https://github.com/apache/datafusion/issues/11545 type_anchor: t_anchor, name: t_name, }; @@ -147,7 +145,6 @@ impl From for Vec { for (tv_anchor, tv_name) in val.type_variations { let type_variation_extension = ExtensionTypeVariation { extension_uri_reference: u32::MAX, // We don't register proper extension URIs yet - extension_urn_reference: u32::MAX, // We don't register proper extension URIs yet type_variation_anchor: tv_anchor, name: tv_name, }; diff --git a/datafusion/substrait/src/logical_plan/producer/expr/mod.rs b/datafusion/substrait/src/logical_plan/producer/expr/mod.rs index f4e43fd586773..9fbef97d4158f 100644 --- a/datafusion/substrait/src/logical_plan/producer/expr/mod.rs +++ b/datafusion/substrait/src/logical_plan/producer/expr/mod.rs @@ -88,7 +88,6 @@ pub fn to_substrait_extended_expr( advanced_extensions: None, expected_type_urls: vec![], extension_uris: vec![], - extension_urns: vec![], extensions: extensions.into(), version: Some(version::version_with_producer("datafusion")), referred_expr: substrait_exprs, diff --git a/datafusion/substrait/src/logical_plan/producer/plan.rs b/datafusion/substrait/src/logical_plan/producer/plan.rs index ad8f45ec3606f..070e063f670d8 100644 --- a/datafusion/substrait/src/logical_plan/producer/plan.rs +++ b/datafusion/substrait/src/logical_plan/producer/plan.rs @@ -48,13 +48,11 @@ pub fn to_substrait_plan( Ok(Box::new(Plan { version: Some(version::version_with_producer("datafusion")), extension_uris: vec![], - extension_urns: vec![], extensions: extensions.into(), relations: plan_rels, advanced_extensions: None, expected_type_urls: vec![], parameter_bindings: vec![], - type_aliases: vec![], })) }
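Note on the new completion API: the `FilterState` watch channel added to `DynamicFilterPhysicalExpr` above is the heart of this change (producers call `update`/`mark_complete`, consumers await `wait_complete`). Below is a minimal, self-contained sketch of that broadcast pattern using `tokio::sync::watch`; the `ToyFilter` type and the `main` driver are illustrative stand-ins, not DataFusion APIs, and it assumes a tokio dependency with the "sync", "macros", and "rt-multi-thread" features enabled.

use std::sync::Arc;
use tokio::sync::watch;

/// Minimal stand-in for the state enum added in this diff.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum FilterState {
    InProgress { generation: u64 },
    Complete { generation: u64 },
}

/// Illustrative toy type; the real type in this diff is `DynamicFilterPhysicalExpr`.
struct ToyFilter {
    state: watch::Sender<FilterState>,
}

impl ToyFilter {
    fn new() -> Self {
        let (state, _rx) = watch::channel(FilterState::InProgress { generation: 1 });
        Self { state }
    }

    /// Producer side (e.g. the hash join build phase) pushes updates...
    fn update(&self, generation: u64) {
        // send_replace stores the new value even if nobody has subscribed yet.
        self.state.send_replace(FilterState::InProgress { generation });
    }

    /// ...and signals once no further updates will arrive.
    fn mark_complete(&self, generation: u64) {
        self.state.send_replace(FilterState::Complete { generation });
    }

    /// Consumer side (e.g. a scan deciding when it may safely prune) awaits completion.
    async fn wait_complete(&self) {
        let mut rx = self.state.subscribe();
        // wait_for checks the current value first, so this returns immediately
        // if the filter is already complete.
        let _ = rx
            .wait_for(|s| matches!(s, FilterState::Complete { .. }))
            .await;
    }
}

#[tokio::main]
async fn main() {
    let filter = Arc::new(ToyFilter::new());
    let waiter = {
        let f = Arc::clone(&filter);
        tokio::spawn(async move { f.wait_complete().await })
    };
    filter.update(2);
    filter.mark_complete(2);
    waiter.await.unwrap();
    println!("dynamic filter complete");
}

In the sketch, `send_replace` keeps the stored state current even when no receiver is subscribed; the diff's real implementation instead pairs `send` with the `is_complete` flag in `Inner`, which is what lets `wait_complete` return without subscribing when the filter is already finished.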