diff --git a/Cargo.lock b/Cargo.lock index 78121abd..9e907d7a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -81,6 +81,12 @@ dependencies = [ "libc", ] +[[package]] +name = "anes" +version = "0.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4b46cbb362ab8752921c97e041f5e366ee6297bd428a31275b9fcf1e380f7299" + [[package]] name = "ansi_term" version = "0.12.1" @@ -207,9 +213,9 @@ checksum = "7c02d123df017efcdfbd739ef81735b36c5ba83ec3c59c80a9d7ecc718f92e50" [[package]] name = "arrow" -version = "57.1.0" +version = "57.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cb372a7cbcac02a35d3fb7b3fc1f969ec078e871f9bb899bf00a2e1809bec8a3" +checksum = "2a2b10dcb159faf30d3f81f6d56c1211a5bea2ca424eabe477648a44b993320e" dependencies = [ "arrow-arith", "arrow-array", @@ -228,9 +234,9 @@ dependencies = [ [[package]] name = "arrow-arith" -version = "57.1.0" +version = "57.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0f377dcd19e440174596d83deb49cd724886d91060c07fec4f67014ef9d54049" +checksum = "288015089e7931843c80ed4032c5274f02b37bcb720c4a42096d50b390e70372" dependencies = [ "arrow-array", "arrow-buffer", @@ -242,9 +248,9 @@ dependencies = [ [[package]] name = "arrow-array" -version = "57.1.0" +version = "57.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a23eaff85a44e9fa914660fb0d0bb00b79c4a3d888b5334adb3ea4330c84f002" +checksum = "65ca404ea6191e06bf30956394173337fa9c35f445bd447fe6c21ab944e1a23c" dependencies = [ "ahash", "arrow-buffer", @@ -261,9 +267,9 @@ dependencies = [ [[package]] name = "arrow-buffer" -version = "57.1.0" +version = "57.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a2819d893750cb3380ab31ebdc8c68874dd4429f90fd09180f3c93538bd21626" +checksum = "36356383099be0151dacc4245309895f16ba7917d79bdb71a7148659c9206c56" dependencies = [ "bytes", "half", @@ -273,9 +279,9 @@ dependencies = [ [[package]] name = "arrow-cast" -version = "57.1.0" +version = "57.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e3d131abb183f80c450d4591dc784f8d7750c50c6e2bc3fcaad148afc8361271" +checksum = "9c8e372ed52bd4ee88cc1e6c3859aa7ecea204158ac640b10e187936e7e87074" dependencies = [ "arrow-array", "arrow-buffer", @@ -295,9 +301,9 @@ dependencies = [ [[package]] name = "arrow-csv" -version = "57.1.0" +version = "57.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2275877a0e5e7e7c76954669366c2aa1a829e340ab1f612e647507860906fb6b" +checksum = "8e4100b729fe656f2e4fb32bc5884f14acf9118d4ad532b7b33c1132e4dce896" dependencies = [ "arrow-array", "arrow-cast", @@ -310,9 +316,9 @@ dependencies = [ [[package]] name = "arrow-data" -version = "57.1.0" +version = "57.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "05738f3d42cb922b9096f7786f606fcb8669260c2640df8490533bb2fa38c9d3" +checksum = "bf87f4ff5fc13290aa47e499a8b669a82c5977c6a1fedce22c7f542c1fd5a597" dependencies = [ "arrow-buffer", "arrow-schema", @@ -323,9 +329,9 @@ dependencies = [ [[package]] name = "arrow-flight" -version = "57.1.0" +version = "57.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8b5f57c3d39d1b1b7c1376a772ea86a131e7da310aed54ebea9363124bb885e3" +checksum = "f63654f21676be802d446c6c4bc54f6a47e18d55f9ae6f7195a6f6faf2ecdbeb" dependencies = [ "arrow-array", "arrow-buffer", @@ -343,9 +349,9 @@ dependencies = [ [[package]] name = "arrow-ipc" -version = "57.1.0" +version = "57.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3d09446e8076c4b3f235603d9ea7c5494e73d441b01cd61fb33d7254c11964b3" +checksum = "eb3ca63edd2073fcb42ba112f8ae165df1de935627ead6e203d07c99445f2081" dependencies = [ "arrow-array", "arrow-buffer", @@ -359,9 +365,9 @@ dependencies = [ [[package]] name = "arrow-json" -version = "57.1.0" +version = "57.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "371ffd66fa77f71d7628c63f209c9ca5341081051aa32f9c8020feb0def787c0" +checksum = "a36b2332559d3310ebe3e173f75b29989b4412df4029a26a30cc3f7da0869297" dependencies = [ "arrow-array", "arrow-buffer", @@ -383,9 +389,9 @@ dependencies = [ [[package]] name = "arrow-ord" -version = "57.1.0" +version = "57.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cbc94fc7adec5d1ba9e8cd1b1e8d6f72423b33fe978bf1f46d970fafab787521" +checksum = "13c4e0530272ca755d6814218dffd04425c5b7854b87fa741d5ff848bf50aa39" dependencies = [ "arrow-array", "arrow-buffer", @@ -396,9 +402,9 @@ dependencies = [ [[package]] name = "arrow-row" -version = "57.1.0" +version = "57.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "169676f317157dc079cc5def6354d16db63d8861d61046d2f3883268ced6f99f" +checksum = "b07f52788744cc71c4628567ad834cadbaeb9f09026ff1d7a4120f69edf7abd3" dependencies = [ "arrow-array", "arrow-buffer", @@ -409,9 +415,9 @@ dependencies = [ [[package]] name = "arrow-schema" -version = "57.1.0" +version = "57.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d27609cd7dd45f006abae27995c2729ef6f4b9361cde1ddd019dc31a5aa017e0" +checksum = "6bb63203e8e0e54b288d0d8043ca8fa1013820822a27692ef1b78a977d879f2c" dependencies = [ "serde_core", "serde_json", @@ -419,9 +425,9 @@ dependencies = [ [[package]] name = "arrow-select" -version = "57.1.0" +version = "57.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ae980d021879ea119dd6e2a13912d81e64abed372d53163e804dfe84639d8010" +checksum = "c96d8a1c180b44ecf2e66c9a2f2bbcb8b1b6f14e165ce46ac8bde211a363411b" dependencies = [ "ahash", "arrow-array", @@ -433,9 +439,9 @@ dependencies = [ [[package]] name = "arrow-string" -version = "57.1.0" +version = "57.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cf35e8ef49dcf0c5f6d175edee6b8af7b45611805333129c541a8b89a0fc0534" +checksum = "a8ad6a81add9d3ea30bf8374ee8329992c7fd246ffd8b7e2f48a3cea5aa0cc9a" dependencies = [ "arrow-array", "arrow-buffer", @@ -512,7 +518,7 @@ version = "0.2.14" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d9b39be18770d11421cdb1b9947a45dd3f37e93092cbf377614828a319d5fee8" dependencies = [ - "hermit-abi", + "hermit-abi 0.1.19", "libc", "winapi", ] @@ -535,7 +541,7 @@ dependencies = [ "aws-sdk-ssooidc", "aws-sdk-sts", "aws-smithy-async", - "aws-smithy-http", + "aws-smithy-http 0.62.6", "aws-smithy-json", "aws-smithy-runtime", "aws-smithy-runtime-api", @@ -596,7 +602,7 @@ dependencies = [ "aws-credential-types", "aws-sigv4", "aws-smithy-async", - "aws-smithy-http", + "aws-smithy-http 0.62.6", "aws-smithy-runtime", "aws-smithy-runtime-api", "aws-smithy-types", @@ -613,14 +619,14 @@ dependencies = [ [[package]] name = "aws-sdk-ec2" -version = "1.205.0" +version = "1.206.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ac63b89f6395b1acd82bcf5b39173ff7b2528310b9fc020004e2830845a48b32" +checksum = "45900e23181a7783ee0a247ccf255070b07d112968d2dc0dfc6b1facd6871c11" dependencies = [ "aws-credential-types", "aws-runtime", "aws-smithy-async", - "aws-smithy-http", + "aws-smithy-http 0.62.6", "aws-smithy-json", "aws-smithy-observability", "aws-smithy-query", @@ -644,7 +650,7 @@ dependencies = [ "aws-credential-types", "aws-runtime", "aws-smithy-async", - "aws-smithy-http", + "aws-smithy-http 0.62.6", "aws-smithy-json", "aws-smithy-observability", "aws-smithy-runtime", @@ -667,7 +673,7 @@ dependencies = [ "aws-credential-types", "aws-runtime", "aws-smithy-async", - "aws-smithy-http", + "aws-smithy-http 0.62.6", "aws-smithy-json", "aws-smithy-observability", "aws-smithy-runtime", @@ -690,7 +696,7 @@ dependencies = [ "aws-credential-types", "aws-runtime", "aws-smithy-async", - "aws-smithy-http", + "aws-smithy-http 0.62.6", "aws-smithy-json", "aws-smithy-observability", "aws-smithy-query", @@ -712,7 +718,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "69e523e1c4e8e7e8ff219d732988e22bfeae8a1cafdbe6d9eca1546fa080be7c" dependencies = [ "aws-credential-types", - "aws-smithy-http", + "aws-smithy-http 0.62.6", "aws-smithy-runtime-api", "aws-smithy-types", "bytes", @@ -729,9 +735,9 @@ dependencies = [ [[package]] name = "aws-smithy-async" -version = "1.2.8" +version = "1.2.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9330762ee48c6cecfad2cb37b1506c16c8e858c90638eda2b1a7272b56f88bd5" +checksum = "52eec3db979d18cb807fc1070961cc51d87d069abe9ab57917769687368a8c6c" dependencies = [ "futures-util", "pin-project-lite", @@ -759,11 +765,32 @@ dependencies = [ "tracing", ] +[[package]] +name = "aws-smithy-http" +version = "0.63.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "630e67f2a31094ffa51b210ae030855cb8f3b7ee1329bdd8d085aaf61e8b97fc" +dependencies = [ + "aws-smithy-runtime-api", + "aws-smithy-types", + "bytes", + "bytes-utils", + "futures-core", + "futures-util", + "http 1.4.0", + "http-body 1.0.1", + "http-body-util", + "percent-encoding", + "pin-project-lite", + "pin-utils", + "tracing", +] + [[package]] name = "aws-smithy-http-client" -version = "1.1.6" +version = "1.1.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ec918f18147cec121cb142a91b0038f66d99bbe903e585dccf871920e90b22ab" +checksum = "12fb0abf49ff0cab20fd31ac1215ed7ce0ea92286ba09e2854b42ba5cabe7525" dependencies = [ "aws-smithy-async", "aws-smithy-runtime-api", @@ -800,18 +827,18 @@ dependencies = [ [[package]] name = "aws-smithy-observability" -version = "0.2.1" +version = "0.2.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a700a7702874cd78b85fecdc9f64f3f72eb22fb713791cb445bcfd2a15bc1ecf" +checksum = "c0a46543fbc94621080b3cf553eb4cbbdc41dd9780a30c4756400f0139440a1d" dependencies = [ "aws-smithy-runtime-api", ] [[package]] name = "aws-smithy-query" -version = "0.60.10" +version = "0.60.13" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "adc4a6cdc289a37be7fddb7f4365448187d62c603a40e6d46d13c68e5e81900f" +checksum = "0cebbddb6f3a5bd81553643e9c7daf3cc3dc5b0b5f398ac668630e8a84e6fff0" dependencies = [ "aws-smithy-types", "urlencoding", @@ -819,12 +846,12 @@ dependencies = [ [[package]] name = "aws-smithy-runtime" -version = "1.9.8" +version = "1.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bb5b6167fcdf47399024e81ac08e795180c576a20e4d4ce67949f9a88ae37dc1" +checksum = "f3df87c14f0127a0d77eb261c3bc45d5b4833e2a1f63583ebfb728e4852134ee" dependencies = [ "aws-smithy-async", - "aws-smithy-http", + "aws-smithy-http 0.63.3", "aws-smithy-http-client", "aws-smithy-observability", "aws-smithy-runtime-api", @@ -835,6 +862,7 @@ dependencies = [ "http 1.4.0", "http-body 0.4.6", "http-body 1.0.1", + "http-body-util", "pin-project-lite", "pin-utils", "tokio", @@ -843,9 +871,9 @@ dependencies = [ [[package]] name = "aws-smithy-runtime-api" -version = "1.11.0" +version = "1.11.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5c47b1e62accf759b01aba295e40479d1ba8fb77c2a54f0fed861c809ca49761" +checksum = "49952c52f7eebb72ce2a754d3866cc0f87b97d2a46146b79f80f3a93fb2b3716" dependencies = [ "aws-smithy-async", "aws-smithy-types", @@ -860,9 +888,9 @@ dependencies = [ [[package]] name = "aws-smithy-types" -version = "1.4.0" +version = "1.4.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c2d447863bdec38c899e5753a48c0abcf590f3ec629e257ad5a9ef8806ad7714" +checksum = "3b3a26048eeab0ddeba4b4f9d51654c79af8c3b32357dc5f336cee85ab331c33" dependencies = [ "base64-simd", "bytes", @@ -1173,9 +1201,9 @@ checksum = "5dd9dc738b7a8311c7ade152424974d8115f2cdad61e8dab8dac9f2362298510" [[package]] name = "bytemuck" -version = "1.24.0" +version = "1.25.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1fbdf580320f38b612e485521afda1ee26d10cc9884efaaa750d383e13e3c5f4" +checksum = "c8efb64bd706a16a1bdde310ae86b351e4d21550d98d056f22f8a7f7a2183fec" [[package]] name = "byteorder" @@ -1208,6 +1236,12 @@ dependencies = [ "libbz2-rs-sys", ] +[[package]] +name = "cast" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "37b2a672a2cb129a2e41c10b1224bb368f9f37a2b16b612598138befd7b37eb5" + [[package]] name = "castaway" version = "0.2.4" @@ -1219,9 +1253,9 @@ dependencies = [ [[package]] name = "cc" -version = "1.2.54" +version = "1.2.55" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6354c81bbfd62d9cfa9cb3c773c2b7b2a3a482d569de977fd0e961f6e7c00583" +checksum = "47b26a0954ae34af09b50f0de26458fa95369a0d478d8236d3f93082b219bd29" dependencies = [ "find-msvc-tools", "jobserver", @@ -1265,6 +1299,33 @@ dependencies = [ "phf 0.12.1", ] +[[package]] +name = "ciborium" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "42e69ffd6f0917f5c029256a24d0161db17cea3997d185db0d35926308770f0e" +dependencies = [ + "ciborium-io", + "ciborium-ll", + "serde", +] + +[[package]] +name = "ciborium-io" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "05afea1e0a06c9be33d539b876f1ce3692f4afea2cb41f740e7743225ed1c757" + +[[package]] +name = "ciborium-ll" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "57663b653d948a338bfb3eeba9bb2fd5fcfaecb9e199e87e1eda4d9e8b240fd9" +dependencies = [ + "ciborium-io", + "half", +] + [[package]] name = "cipher" version = "0.4.4" @@ -1547,6 +1608,42 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "criterion" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f2b12d017a929603d80db1831cd3a24082f8137ce19c69e6447f54f5fc8d692f" +dependencies = [ + "anes", + "cast", + "ciborium", + "clap 4.5.56", + "criterion-plot", + "is-terminal", + "itertools 0.10.5", + "num-traits", + "once_cell", + "oorandom", + "plotters", + "rayon", + "regex", + "serde", + "serde_derive", + "serde_json", + "tinytemplate", + "walkdir", +] + +[[package]] +name = "criterion-plot" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6b50826342786a51a89e2da3a28f1c32b06e387201bc2d19791f622c673706b1" +dependencies = [ + "cast", + "itertools 0.10.5", +] + [[package]] name = "crossbeam-channel" version = "0.5.15" @@ -1556,6 +1653,16 @@ dependencies = [ "crossbeam-utils", ] +[[package]] +name = "crossbeam-deque" +version = "0.8.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9dd111b7b7f7d55b72c0a6ae361660ee5853c9af73f70c3c2ef6858b950e2e51" +dependencies = [ + "crossbeam-epoch", + "crossbeam-utils", +] + [[package]] name = "crossbeam-epoch" version = "0.9.18" @@ -1565,6 +1672,15 @@ dependencies = [ "crossbeam-utils", ] +[[package]] +name = "crossbeam-queue" +version = "0.3.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0f58bbc28f91df819d0aa2a2c00cd19754769c2fad90579b3592b1c9ba7a3115" +dependencies = [ + "crossbeam-utils", +] + [[package]] name = "crossbeam-utils" version = "0.8.21" @@ -1733,7 +1849,7 @@ dependencies = [ "datafusion-sql", "flate2", "futures", - "itertools", + "itertools 0.14.0", "liblzma", "log", "object_store", @@ -1767,7 +1883,7 @@ dependencies = [ "datafusion-physical-plan", "datafusion-session", "futures", - "itertools", + "itertools 0.14.0", "log", "object_store", "parking_lot", @@ -1792,7 +1908,7 @@ dependencies = [ "datafusion-physical-expr-common", "datafusion-physical-plan", "futures", - "itertools", + "itertools 0.14.0", "log", "object_store", ] @@ -1886,7 +2002,7 @@ dependencies = [ "flate2", "futures", "glob", - "itertools", + "itertools 0.14.0", "liblzma", "log", "object_store", @@ -1916,7 +2032,7 @@ dependencies = [ "datafusion-physical-plan", "datafusion-session", "futures", - "itertools", + "itertools 0.14.0", "object_store", "tokio", ] @@ -2008,7 +2124,7 @@ dependencies = [ "datafusion-pruning", "datafusion-session", "futures", - "itertools", + "itertools 0.14.0", "log", "object_store", "parking_lot", @@ -2027,6 +2143,7 @@ dependencies = [ "async-trait", "bytes", "chrono", + "crossbeam-queue", "dashmap", "datafusion", "datafusion-proto", @@ -2035,7 +2152,7 @@ dependencies = [ "http 1.4.0", "hyper-util", "insta", - "itertools", + "itertools 0.14.0", "moka", "object_store", "parquet", @@ -2072,6 +2189,7 @@ dependencies = [ "built", "chrono", "clap 4.5.56", + "criterion", "dashmap", "datafusion", "datafusion-distributed", @@ -2087,6 +2205,7 @@ dependencies = [ "serde", "serde_json", "structopt", + "sysinfo", "tokio", "tonic", "url", @@ -2170,7 +2289,7 @@ dependencies = [ "datafusion-functions-window-common", "datafusion-physical-expr-common", "indexmap", - "itertools", + "itertools 0.14.0", "paste", "recursive", "serde_json", @@ -2186,7 +2305,7 @@ dependencies = [ "arrow", "datafusion-common", "indexmap", - "itertools", + "itertools 0.14.0", "paste", ] @@ -2210,7 +2329,7 @@ dependencies = [ "datafusion-expr-common", "datafusion-macros", "hex", - "itertools", + "itertools 0.14.0", "log", "md-5", "num-traits", @@ -2273,7 +2392,7 @@ dependencies = [ "datafusion-functions-aggregate-common", "datafusion-macros", "datafusion-physical-expr-common", - "itertools", + "itertools 0.14.0", "log", "paste", ] @@ -2346,7 +2465,7 @@ dependencies = [ "datafusion-expr-common", "datafusion-physical-expr", "indexmap", - "itertools", + "itertools 0.14.0", "log", "recursive", "regex", @@ -2369,7 +2488,7 @@ dependencies = [ "half", "hashbrown 0.16.1", "indexmap", - "itertools", + "itertools 0.14.0", "parking_lot", "paste", "petgraph", @@ -2389,7 +2508,7 @@ dependencies = [ "datafusion-functions", "datafusion-physical-expr", "datafusion-physical-expr-common", - "itertools", + "itertools 0.14.0", ] [[package]] @@ -2405,7 +2524,7 @@ dependencies = [ "datafusion-expr-common", "hashbrown 0.16.1", "indexmap", - "itertools", + "itertools 0.14.0", "parking_lot", ] @@ -2424,7 +2543,7 @@ dependencies = [ "datafusion-physical-expr-common", "datafusion-physical-plan", "datafusion-pruning", - "itertools", + "itertools 0.14.0", "recursive", ] @@ -2452,7 +2571,7 @@ dependencies = [ "half", "hashbrown 0.16.1", "indexmap", - "itertools", + "itertools 0.14.0", "log", "parking_lot", "pin-project-lite", @@ -2510,7 +2629,7 @@ dependencies = [ "datafusion-physical-expr", "datafusion-physical-expr-common", "datafusion-physical-plan", - "itertools", + "itertools 0.14.0", "log", ] @@ -2827,9 +2946,9 @@ dependencies = [ [[package]] name = "find-msvc-tools" -version = "0.1.8" +version = "0.1.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8591b0bcc8a98a64310a2fae1bb3e9b8564dd10e381e6e28010fde8e8e8568db" +checksum = "5baebc0774151f905a1a2cc41989300b1e6fbb29aff0ceffa1064fdd3088d582" [[package]] name = "finl_unicode" @@ -3052,9 +3171,9 @@ checksum = "e629b9b98ef3dd8afe6ca2bd0f89306cec16d43d907889945bc5d6687f2f13c7" [[package]] name = "git2" -version = "0.20.3" +version = "0.20.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3e2b37e2f62729cdada11f0e6b3b6fe383c69c29fc619e391223e12856af308c" +checksum = "7b88256088d75a56f8ecfa070513a775dd9107f6530ef14919dac831af9cfe2b" dependencies = [ "bitflags 2.10.0", "libc", @@ -3169,6 +3288,12 @@ dependencies = [ "libc", ] +[[package]] +name = "hermit-abi" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fc0fef456e4baa96da950455cd02c081ca953b141298e41db3fc7e36b1da849c" + [[package]] name = "hex" version = "0.4.3" @@ -3376,14 +3501,13 @@ dependencies = [ [[package]] name = "hyper-util" -version = "0.1.19" +version = "0.1.20" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "727805d60e7938b76b826a6ef209eb70eaa1812794f9424d4a4e2d740662df5f" +checksum = "96547c2556ec9d12fb1578c4eaf448b04993e7fb79cbaad930a656880a6bdfa0" dependencies = [ "base64", "bytes", "futures-channel", - "futures-core", "futures-util", "http 1.4.0", "http-body 1.0.1", @@ -3412,7 +3536,7 @@ dependencies = [ "js-sys", "log", "wasm-bindgen", - "windows-core", + "windows-core 0.62.2", ] [[package]] @@ -3568,9 +3692,9 @@ dependencies = [ [[package]] name = "insta" -version = "1.46.1" +version = "1.46.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "248b42847813a1550dafd15296fd9748c651d0c32194559dbc05d804d54b21e8" +checksum = "e82db8c87c7f1ccecb34ce0c24399b8a73081427f3c7c50a5d597925356115e4" dependencies = [ "console", "once_cell", @@ -3614,12 +3738,32 @@ dependencies = [ "serde", ] +[[package]] +name = "is-terminal" +version = "0.4.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3640c1c38b8e4e43584d8df18be5fc6b0aa314ce6ebf51b53313d4306cca8e46" +dependencies = [ + "hermit-abi 0.5.2", + "libc", + "windows-sys 0.61.2", +] + [[package]] name = "is_terminal_polyfill" version = "1.70.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a6cb138bb79a146c1bd460005623e142ef0181e3d0219cb493e02f7d08a35695" +[[package]] +name = "itertools" +version = "0.10.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b0fd2260e829bddf4cb6ea802289de2f86d6a7a690192fbe91b3f46e0f2c8473" +dependencies = [ + "either", +] + [[package]] name = "itertools" version = "0.14.0" @@ -4094,6 +4238,15 @@ dependencies = [ "minimal-lexical", ] +[[package]] +name = "ntapi" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c70f219e21142367c70c0b30c6a9e3a14d55b4d12a204d897fbec83a0363f081" +dependencies = [ + "winapi", +] + [[package]] name = "num-bigint" version = "0.4.6" @@ -4184,7 +4337,7 @@ dependencies = [ "http-body-util", "humantime", "hyper 1.8.1", - "itertools", + "itertools 0.14.0", "md-5", "parking_lot", "percent-encoding", @@ -4217,6 +4370,12 @@ version = "1.70.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "384b8ab6d37215f3c5301a95a4accb5d64aa607f1fcb26a11b5303878451b4fe" +[[package]] +name = "oorandom" +version = "11.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d6790f58c7ff633d8771f42965289203411a5e5c68388703c06e14f24770b41e" + [[package]] name = "openssl" version = "0.10.75" @@ -4344,9 +4503,9 @@ dependencies = [ [[package]] name = "parquet" -version = "57.1.0" +version = "57.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "be3e4f6d320dd92bfa7d612e265d7d08bba0a240bab86af3425e1d255a511d89" +checksum = "5f6a2926a30477c0b95fea6c28c3072712b139337a242c2cc64817bdc20a8854" dependencies = [ "ahash", "arrow-array", @@ -4565,17 +4724,45 @@ version = "0.3.32" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7edddbd0b52d732b21ad9a5fab5c704c14cd949e5e9a1ec5929a24fded1b904c" +[[package]] +name = "plotters" +version = "0.3.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5aeb6f403d7a4911efb1e33402027fc44f29b5bf6def3effcc22d7bb75f2b747" +dependencies = [ + "num-traits", + "plotters-backend", + "plotters-svg", + "wasm-bindgen", + "web-sys", +] + +[[package]] +name = "plotters-backend" +version = "0.3.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "df42e13c12958a16b3f7f4386b9ab1f3e7933914ecea48da7139435263a4172a" + +[[package]] +name = "plotters-svg" +version = "0.3.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "51bae2ac328883f7acdfea3d66a7c35751187f870bc81f94563733a154d7a670" +dependencies = [ + "plotters-backend", +] + [[package]] name = "portable-atomic" -version = "1.13.0" +version = "1.13.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f89776e4d69bb58bc6993e99ffa1d11f228b839984854c7daeb5d37f87cbe950" +checksum = "c33a9471896f1c69cecef8d20cbe2f7accd12527ce60845ff44c153bb2a21b49" [[package]] name = "portable-atomic-util" -version = "0.2.4" +version = "0.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d8a2f0d8d040d7848a709caf78912debcc3f33ee4b3cac47d73d1e1069e83507" +checksum = "7a9db96d7fa8782dd8c15ce32ffe8680bbd1e978a43bf51a34d39483540495f5" dependencies = [ "portable-atomic", ] @@ -4680,7 +4867,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "27c6023962132f4b30eb4c172c91ce92d933da334c59c23cddee82358ddafb0b" dependencies = [ "anyhow", - "itertools", + "itertools 0.14.0", "proc-macro2", "quote", "syn 2.0.114", @@ -4869,7 +5056,7 @@ dependencies = [ "compact_str", "hashbrown 0.16.1", "indoc", - "itertools", + "itertools 0.14.0", "kasuari", "lru", "strum", @@ -4921,7 +5108,7 @@ dependencies = [ "hashbrown 0.16.1", "indoc", "instability", - "itertools", + "itertools 0.14.0", "line-clipping", "ratatui-core", "strum", @@ -4930,6 +5117,26 @@ dependencies = [ "unicode-width 0.2.2", ] +[[package]] +name = "rayon" +version = "1.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "368f01d005bf8fd9b1206fb6fa653e6c4a81ceb1466406b81792d87c5677a58f" +dependencies = [ + "either", + "rayon-core", +] + +[[package]] +name = "rayon-core" +version = "1.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "22e18b0f0062d30d4230b2e85ff77fdfe4326feb054b9783a3460d8435c8ab91" +dependencies = [ + "crossbeam-deque", + "crossbeam-utils", +] + [[package]] name = "recursive" version = "0.1.1" @@ -5472,9 +5679,9 @@ checksum = "c1e9a774a6c28142ac54bb25d25562e6bcf957493a184f15ad4eebccb23e410a" [[package]] name = "slab" -version = "0.4.11" +version = "0.4.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7a2ae44ef20feb57a68b23d846850f861394c2e02dc425a50098ae8c90267589" +checksum = "0c790de23124f9ab44544d7ac05d60440adc586479ce501c1d6d7da3cd8c9cf5" [[package]] name = "smallvec" @@ -5660,11 +5867,26 @@ dependencies = [ "syn 2.0.114", ] +[[package]] +name = "sysinfo" +version = "0.30.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0a5b4ddaee55fb2bea2bf0e5000747e5f5c0de765e5a5ff87f4cd106439f4bb3" +dependencies = [ + "cfg-if", + "core-foundation-sys", + "libc", + "ntapi", + "once_cell", + "rayon", + "windows", +] + [[package]] name = "system-configuration" -version = "0.6.1" +version = "0.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3c879d448e9d986b661742763247d3693ed13609438cf3d006f51f5368a5ba6b" +checksum = "a13f3d0daba03132c0aa9767f98351b3488edc2c100cda2d2ec2b04f3d8d3c8b" dependencies = [ "bitflags 2.10.0", "core-foundation 0.9.4", @@ -5916,6 +6138,16 @@ dependencies = [ "zerovec", ] +[[package]] +name = "tinytemplate" +version = "1.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "be4d6b5f19ff7664e8c98d03e2139cb510db9b0a60b55f8e8709b689d939b6bc" +dependencies = [ + "serde", + "serde_json", +] + [[package]] name = "tinyvec" version = "1.10.0" @@ -5998,6 +6230,7 @@ dependencies = [ "futures-core", "pin-project-lite", "tokio", + "tokio-util", ] [[package]] @@ -6212,7 +6445,7 @@ version = "2.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "16b380a1238663e5f8a691f9039c73e1cdae598a30e9855f541d29b08b53e9a5" dependencies = [ - "itertools", + "itertools 0.14.0", "unicode-segmentation", "unicode-width 0.2.2", ] @@ -6546,6 +6779,25 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" +[[package]] +name = "windows" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e48a53791691ab099e5e2ad123536d0fff50652600abaf43bbf952894110d0be" +dependencies = [ + "windows-core 0.52.0", + "windows-targets 0.52.6", +] + +[[package]] +name = "windows-core" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "33ab640c8d7e35bf8ba19b884ba838ceb4fba93a4e8c65a9059d08afcfc683d9" +dependencies = [ + "windows-targets 0.52.6", +] + [[package]] name = "windows-core" version = "0.62.2" @@ -6830,18 +7082,18 @@ dependencies = [ [[package]] name = "zerocopy" -version = "0.8.35" +version = "0.8.37" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fdea86ddd5568519879b8187e1cf04e24fce28f7fe046ceecbce472ff19a2572" +checksum = "7456cf00f0685ad319c5b1693f291a650eaf345e941d082fc4e03df8a03996ac" dependencies = [ "zerocopy-derive", ] [[package]] name = "zerocopy-derive" -version = "0.8.35" +version = "0.8.37" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0c15e1b46eff7c6c91195752e0eeed8ef040e391cdece7c25376957d5f15df22" +checksum = "1328722bbf2115db7e19d69ebcc15e795719e2d66b60827c6a69a117365e37a0" dependencies = [ "proc-macro2", "quote", @@ -6957,9 +7209,9 @@ checksum = "40990edd51aae2c2b6907af74ffb635029d5788228222c4bb811e9351c0caad3" [[package]] name = "zmij" -version = "1.0.17" +version = "1.0.19" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "02aae0f83f69aafc94776e879363e9771d7ecbffe2c7fbb6c14c5e00dfe88439" +checksum = "3ff05f8caa9038894637571ae6b9e29466c1f4f829d26c9b28f869a29cbe3445" [[package]] name = "zopfli" diff --git a/Cargo.toml b/Cargo.toml index 159823d0..c2ece5b1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -38,9 +38,10 @@ rand = "0.9" object_store = "0.12.4" bytes = "1.11" pin-project = "1.1.10" -tokio-stream = "0.1.17" +tokio-stream = { version = "0.1.17", features = ["sync"] } tokio-util = "0.7" moka = { version = "0.12", features = ["sync", "future"] } +crossbeam-queue = "0.3" sketches-ddsketch = "0.3.0" # integration_tests deps @@ -80,7 +81,7 @@ tpchgen = { git = "https://github.com/clflushopt/tpchgen-rs", rev = "e83365a5a91 tpchgen-arrow = { git = "https://github.com/clflushopt/tpchgen-rs", rev = "e83365a5a9101906eb9f78c5607b83bc59849acf" } parquet = "57.1.0" arrow = "57.1.0" -tokio-stream = "0.1.17" +tokio-stream = { version = "0.1.17", features = ["sync"] } hyper-util = "0.1.16" pretty_assertions = "1.4" reqwest = "0.12" diff --git a/benchmarks/Cargo.toml b/benchmarks/Cargo.toml index 5fed50e4..3127cb4e 100644 --- a/benchmarks/Cargo.toml +++ b/benchmarks/Cargo.toml @@ -31,6 +31,10 @@ openssl = { version = "0.10", features = ["vendored"] } clap = "4.5" mimalloc = "0.1" +[dev-dependencies] +criterion = "0.5" +sysinfo = "0.30" + [build-dependencies] built = { version = "0.8" , features = ["git2", "chrono"]} @@ -41,3 +45,7 @@ path = "src/main.rs" [[bin]] name = "worker" path = "cdk/bin/worker.rs" + +[[bench]] +name = "broadcast_cache_scenarios" +harness = false diff --git a/benchmarks/benches/broadcast_cache_scenarios.rs b/benchmarks/benches/broadcast_cache_scenarios.rs new file mode 100644 index 00000000..24324c29 --- /dev/null +++ b/benchmarks/benches/broadcast_cache_scenarios.rs @@ -0,0 +1,436 @@ +use criterion::{BenchmarkId, Criterion, criterion_group, criterion_main}; +use datafusion::arrow::array::UInt8Array; +use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaRef}; +use datafusion::arrow::record_batch::RecordBatch; +use datafusion::common::Statistics; +use datafusion::error::Result; +use datafusion::execution::{SendableRecordBatchStream, TaskContext}; +use datafusion::physical_expr::EquivalenceProperties; +use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType}; +use datafusion::physical_plan::stream::RecordBatchStreamAdapter; +use datafusion::physical_plan::{ + DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties, +}; +use datafusion::prelude::SessionContext; +use datafusion_distributed::BroadcastExec; +use futures::{StreamExt, stream}; +use std::any::Any; +use std::sync::{ + Arc, + atomic::{AtomicBool, AtomicU64, Ordering}, +}; +use std::thread; +use std::time::{Duration, Instant}; +use sysinfo::{System, get_current_pid}; +use tokio::runtime::Builder as RuntimeBuilder; + +#[derive(Clone, Copy)] +enum ConsumerBehavior { + Fast, + Slow(Duration), + CancelAfter(usize), +} + +#[derive(Clone)] +struct ConsumerSpec { + partition: usize, + behavior: ConsumerBehavior, +} + +#[derive(Clone)] +struct Scenario { + name: &'static str, + input_partitions: usize, + consumer_tasks: usize, + rows_per_batch: usize, + num_batches: usize, + consumers: Vec, +} + +struct PeakRssSampler { + stop: Arc, + peak_kb: Arc, + handle: Option>, +} + +impl PeakRssSampler { + fn start(interval: Duration) -> Self { + let stop = Arc::new(AtomicBool::new(false)); + let peak_kb = Arc::new(AtomicU64::new(0)); + let stop_clone = Arc::clone(&stop); + let peak_clone = Arc::clone(&peak_kb); + let handle = thread::spawn(move || { + let mut sys = System::new(); + let pid = get_current_pid().expect("pid"); + while !stop_clone.load(Ordering::Relaxed) { + sys.refresh_process(pid); + if let Some(proc) = sys.process(pid) { + let mem_kb = proc.memory(); + peak_clone.fetch_max(mem_kb, Ordering::Relaxed); + } + thread::sleep(interval); + } + }); + Self { + stop, + peak_kb, + handle: Some(handle), + } + } + + fn stop(mut self) -> u64 { + self.stop.store(true, Ordering::Relaxed); + if let Some(handle) = self.handle.take() { + let _ = handle.join(); + } + self.peak_kb.load(Ordering::Relaxed) + } +} + +#[derive(Debug)] +struct SyntheticExec { + schema: SchemaRef, + partitions: usize, + batches: Arc>>, + properties: PlanProperties, +} + +impl SyntheticExec { + fn new(schema: SchemaRef, partitions: usize, batches: Arc>>) -> Self { + let properties = PlanProperties::new( + EquivalenceProperties::new(Arc::clone(&schema)), + Partitioning::UnknownPartitioning(partitions), + EmissionType::Incremental, + Boundedness::Bounded, + ); + Self { + schema, + partitions, + batches, + properties, + } + } +} + +impl DisplayAs for SyntheticExec { + fn fmt_as(&self, t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result { + match t { + DisplayFormatType::Default | DisplayFormatType::Verbose => write!(f, "SyntheticExec"), + DisplayFormatType::TreeRender => write!(f, ""), + } + } +} + +impl ExecutionPlan for SyntheticExec { + fn name(&self) -> &str { + "SyntheticExec" + } + + fn as_any(&self) -> &dyn Any { + self + } + + fn properties(&self) -> &PlanProperties { + &self.properties + } + + fn children(&self) -> Vec<&Arc> { + vec![] + } + + fn with_new_children( + self: Arc, + _children: Vec>, + ) -> Result> { + unimplemented!() + } + + fn execute( + &self, + partition: usize, + _context: Arc, + ) -> Result { + assert!(partition < self.partitions); + let schema = Arc::clone(&self.schema); + let batches = Arc::clone(&self.batches); + let len = batches.len(); + + let stream = stream::iter((0..len).map(move |idx| { + let batch = &batches[idx]; + Ok(batch.as_ref().clone()) + })); + + Ok(Box::pin(RecordBatchStreamAdapter::new(schema, stream))) + } + + fn statistics(&self) -> Result { + Ok(Statistics::new_unknown(&self.schema)) + } + + fn partition_statistics(&self, _partition: Option) -> Result { + Ok(Statistics::new_unknown(&self.schema)) + } +} + +async fn consume_partition( + plan: Arc, + partition: usize, + task_ctx: Arc, + behavior: ConsumerBehavior, +) -> Result<()> { + let mut stream = plan.execute(partition, task_ctx)?; + let mut seen = 0usize; + while let Some(batch) = stream.next().await { + let _ = batch?; + seen += 1; + match behavior { + ConsumerBehavior::Fast => {} + ConsumerBehavior::Slow(delay) => tokio::time::sleep(delay).await, + ConsumerBehavior::CancelAfter(limit) => { + if seen >= limit { + break; + } + } + } + } + Ok(()) +} + +async fn run_scenario( + scenario: &Scenario, + schema: Arc, + batches: Arc>>, + task_ctx: Arc, + sample_rss: bool, +) -> Result<(Duration, u64)> { + let input: Arc = Arc::new(SyntheticExec::new( + Arc::clone(&schema), + scenario.input_partitions, + batches, + )); + + let broadcast = Arc::new(BroadcastExec::new( + Arc::clone(&input), + scenario.consumer_tasks, + )); + + let sampler = sample_rss.then(|| PeakRssSampler::start(Duration::from_millis(25))); + let start = Instant::now(); + + let mut join_set = tokio::task::JoinSet::new(); + for consumer in &scenario.consumers { + let plan = Arc::clone(&broadcast) as Arc; + let task_ctx = Arc::clone(&task_ctx); + let behavior = consumer.behavior; + let partition = consumer.partition; + join_set.spawn(async move { consume_partition(plan, partition, task_ctx, behavior).await }); + } + + while let Some(res) = join_set.join_next().await { + let res = + res.map_err(|err| datafusion::error::DataFusionError::Execution(err.to_string()))?; + res?; + } + + let elapsed = start.elapsed(); + let peak_kb = sampler.map(|s| s.stop()).unwrap_or(0); + Ok((elapsed, peak_kb)) +} + +fn all_fast_consumers(output_partitions: usize) -> Vec { + (0..output_partitions) + .map(|partition| ConsumerSpec { + partition, + behavior: ConsumerBehavior::Fast, + }) + .collect() +} + +fn scenario_matrix() -> Vec { + let input_partitions = 16; + let consumer_tasks = 4; + let output_partitions = input_partitions * consumer_tasks; + + let mut scenarios = vec![ + // All consumers read at full speed. Baseline overhead without cancellations or lag. + // Example: Broadcast of a small dimension table, everything goes smoothly. + Scenario { + name: "all_fast", + input_partitions, + consumer_tasks, + rows_per_batch: 500_000, + num_batches: 100, + consumers: all_fast_consumers(output_partitions), + }, + // One consumer is slow, creating backpressure or queue buildup. + // Example: Broadcast build side for a join where one probe task is a straggler + // due to skewed partition. + Scenario { + name: "one_slow", + input_partitions, + consumer_tasks, + rows_per_batch: 500_000, + num_batches: 100, + consumers: { + let mut consumers = all_fast_consumers(output_partitions); + if let Some(first) = consumers.first_mut() { + first.behavior = ConsumerBehavior::Slow(Duration::from_millis(2)); + } + consumers + }, + }, + // One consumer cancels early (TopK/LIMIT finishes upstream, task killed). + // Example: SELECT ... FROM store_sales JOIN store ON ... ORDER BY ... LIMIT 100. + // The broadcast build side (store) is still fully materialized, but the LIMIT + // above the join can cancel probe-side consumers once enough rows are produced. + Scenario { + name: "one_cancel", + input_partitions, + consumer_tasks, + rows_per_batch: 500_000, + num_batches: 100, + consumers: { + let mut consumers = all_fast_consumers(output_partitions); + if let Some(first) = consumers.first_mut() { + first.behavior = ConsumerBehavior::CancelAfter(5); + } + consumers + }, + }, + // One virtual partition is never executed (no consumer spawned). + // Example: Fault tolerance, a consumer task fails. + Scenario { + name: "unused_partition", + input_partitions, + consumer_tasks, + rows_per_batch: 500_000, + num_batches: 100, + consumers: { + let mut consumers = all_fast_consumers(output_partitions); + consumers.pop(); + consumers + }, + }, + ]; + + let many_consumers = 16usize; + let many_output = input_partitions * many_consumers; + scenarios.push(Scenario { + name: "many_consumers", + input_partitions, + consumer_tasks: many_consumers, + rows_per_batch: 8192, + num_batches: 100, + consumers: all_fast_consumers(many_output), + }); + + scenarios +} + +fn verbose_enabled() -> bool { + match std::env::var("BROADCAST_BENCH_VERBOSE") { + Ok(val) => { + let val = val.to_ascii_lowercase(); + val == "1" || val == "true" || val == "yes" + } + Err(_) => false, + } +} + +fn rss_enabled() -> bool { + match std::env::var("BROADCAST_BENCH_RSS") { + Ok(val) => { + let val = val.to_ascii_lowercase(); + val == "1" || val == "true" || val == "yes" + } + Err(_) => false, + } +} + +fn runtime_threads() -> Option { + std::env::var("BROADCAST_BENCH_THREADS") + .ok() + .and_then(|val| val.parse::().ok()) + .filter(|threads| *threads > 0) +} + +fn bench_broadcast_cache(c: &mut Criterion) { + let mut rt_builder = RuntimeBuilder::new_multi_thread(); + if let Some(threads) = runtime_threads() { + rt_builder.worker_threads(threads); + } + let rt = rt_builder.enable_all().build().expect("tokio runtime"); + + let mut group = c.benchmark_group("broadcast_cache_scenarios"); + group.sample_size(10); + let verbose = verbose_enabled(); + let sample_rss = rss_enabled(); + let task_ctx = SessionContext::new().task_ctx(); + + for scenario in scenario_matrix() { + let schema = Arc::new(Schema::new(vec![Field::new( + "bytes", + DataType::UInt8, + false, + )])); + let batches = (0..scenario.num_batches) + .map(|_| { + let data = vec![0u8; scenario.rows_per_batch]; + let array = UInt8Array::from(data); + let batch = RecordBatch::try_new(Arc::clone(&schema), vec![Arc::new(array)]) + .expect("batch"); + Arc::new(batch) + }) + .collect::>(); + let batches = Arc::new(batches); + + group.bench_function(BenchmarkId::new("scenario", scenario.name), |b| { + b.iter_custom(|iters| { + let mut total = Duration::ZERO; + let mut peaks = Vec::with_capacity(iters as usize); + for i in 0..iters { + let (elapsed, peak_kb) = rt + .block_on(run_scenario( + &scenario, + Arc::clone(&schema), + Arc::clone(&batches), + Arc::clone(&task_ctx), + sample_rss, + )) + .expect("scenario"); + if verbose || sample_rss { + eprintln!( + "scenario={} iter={} peak_rss_kb={} elapsed_ms={}", + scenario.name, + i, + peak_kb, + elapsed.as_millis() + ); + } + peaks.push(peak_kb); + total += elapsed; + } + if sample_rss && !peaks.is_empty() { + peaks.sort_unstable(); + let min = peaks[0]; + let max = peaks[peaks.len() - 1]; + let median = peaks[peaks.len() / 2]; + eprintln!( + "scenario={} peak_rss_kb[min/median/max]={}/{}/{} runs={}", + scenario.name, + min, + median, + max, + peaks.len() + ); + } + total + }); + }); + } + + group.finish(); +} + +criterion_group!(benches, bench_broadcast_cache); +criterion_main!(benches); diff --git a/src/execution_plans/broadcast.rs b/src/execution_plans/broadcast.rs index 22433ece..915a5e70 100644 --- a/src/execution_plans/broadcast.rs +++ b/src/execution_plans/broadcast.rs @@ -1,17 +1,21 @@ use crate::common::require_one_child; -use datafusion::arrow::array::RecordBatch; +use crossbeam_queue::SegQueue; use datafusion::arrow::datatypes::SchemaRef; -use datafusion::error::Result; +use datafusion::common::runtime::SpawnedTask; +use datafusion::error::{DataFusionError, Result}; +use datafusion::execution::memory_pool::MemoryConsumer; use datafusion::execution::{SendableRecordBatchStream, TaskContext}; use datafusion::physical_plan::stream::RecordBatchStreamAdapter; use datafusion::physical_plan::{ - DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties, + DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties, internal_err, }; -use futures::stream; +use futures::{Stream, StreamExt}; use std::any::Any; use std::fmt::Formatter; -use std::sync::Arc; -use tokio::sync::OnceCell; +use std::pin::Pin; +use std::sync::{Arc, Mutex, OnceLock}; +use std::task::{Context, Poll}; +use tokio_stream::wrappers::WatchStream; /// [ExecutionPlan] that scales up partitions for network broadcasting. /// @@ -69,9 +73,11 @@ pub struct BroadcastExec { input: Arc, consumer_task_count: usize, properties: PlanProperties, - cached_batches: Vec>>>>, + queues: Vec>>>, } +type StreamAndTask = (SegQueue, Arc>); + impl BroadcastExec { pub fn new(input: Arc, consumer_task_count: usize) -> Self { let input_partition_count = input.properties().partitioning.partition_count(); @@ -82,15 +88,15 @@ impl BroadcastExec { .clone() .with_partitioning(Partitioning::UnknownPartitioning(output_partition_count)); - let cached_batches = (0..input_partition_count) - .map(|_| Arc::new(OnceCell::new())) + let queues = (0..input_partition_count) + .map(|_| OnceLock::new()) .collect(); Self { input, consumer_task_count, properties, - cached_batches, + queues, } } @@ -149,35 +155,56 @@ impl ExecutionPlan for BroadcastExec { context: Arc, ) -> Result { let real_partition = partition % self.input_partition_count(); - let cache = Arc::clone(&self.cached_batches[real_partition]); + let input = Arc::clone(&self.input); - let schema = self.schema(); - - // TODO: Stream batches as they're produced instead of collect-then-emit. Currently we - // wait for all batches before consumers receive any. Streaming would allow overlapping - // production with network transfer. - // - // Challenges: late subscribers must replay from buffer since tokio::sync::broadcast drops old messages, - // need proper error propagation to all consumers and backpressure handling. - let stream = futures::stream::once(async move { - let batches = cache - .get_or_try_init(|| async { - let stream = input.execute(real_partition, context)?; - let batches: Vec = - futures::TryStreamExt::try_collect(stream).await?; - Ok::<_, datafusion::error::DataFusionError>(Arc::new(batches)) - }) - .await?; - let batches = Arc::clone(batches); - let batches_vec: Vec = batches.iter().cloned().collect(); - Ok::<_, datafusion::error::DataFusionError>(stream::iter( - batches_vec.into_iter().map(Ok), - )) - }); + let queue_or_err = self.queues[real_partition].get_or_init(|| { + let queue = BroadcastQueue::new(); + let consumers = SegQueue::new(); + for _ in 0..self.consumer_task_count { + consumers.push(Box::pin(RecordBatchStreamAdapter::new( + self.schema(), + queue.new_consumer().map(|msg| match msg { + Ok((batch, _reservation)) => Ok(batch), + Err(e) => Err(DataFusionError::Shared(e)), + }), + )) as SendableRecordBatchStream); + } + + let pool = Arc::clone(context.memory_pool()); + let mut stream = input.execute(real_partition, context).map_err(Arc::new)?; + let task = SpawnedTask::spawn(async move { + let mem_consumer = MemoryConsumer::new(format!("BroadcastExec[{real_partition}]")); + + while let Some(msg) = stream.next().await { + match msg { + Ok(record_batch) => { + let mut reservation = mem_consumer.clone_with_new_id().register(&pool); + reservation.grow(record_batch.get_array_memory_size()); + queue.push(Ok((record_batch, Arc::new(reservation)))); + } + Err(err) => { + queue.push(Err(Arc::new(err))); + break; + } + } + } + }); + + Ok::<_, Arc>((consumers, Arc::new(task))) + }); + let (consumer, task) = match queue_or_err { + Ok((consumers, task)) => (consumers.pop(), Arc::clone(task)), + Err(err) => return Err(DataFusionError::Shared(Arc::clone(err))), + }; + let Some(consumer) = consumer else { + return internal_err!("Too many consumers for real partition {real_partition}"); + }; Ok(Box::pin(RecordBatchStreamAdapter::new( - schema, - futures::TryStreamExt::try_flatten(stream), + self.schema(), + consumer.inspect(move |_| { + let _ = &task; + }), ))) } @@ -185,3 +212,348 @@ impl ExecutionPlan for BroadcastExec { self.input.schema() } } + +#[derive(Debug, Clone, Copy)] +struct BroadcastState { + len: usize, + closed: bool, +} + +#[derive(Debug)] +struct BroadcastQueue { + entries: Arc>>, + notify: tokio::sync::watch::Sender, +} + +impl BroadcastQueue { + fn new() -> Self { + let (notify, _rx) = tokio::sync::watch::channel(BroadcastState { + len: 0, + closed: false, + }); + Self { + entries: Arc::new(Mutex::new(vec![])), + notify, + } + } + + fn new_consumer(&self) -> BroadcastConsumer { + let rx = self.notify.subscribe(); + let state = *rx.borrow(); + BroadcastConsumer { + index: 0, + entries: Arc::clone(&self.entries), + notify: WatchStream::new(rx), + state, + } + } + + fn push(&self, entry: T) { + let len = { + let mut entries = self.entries.lock().unwrap(); + entries.push(entry); + entries.len() + }; + let mut state = *self.notify.borrow(); + state.len = len; + let _ = self.notify.send(state); + } +} + +impl Drop for BroadcastQueue { + fn drop(&mut self) { + let mut state = *self.notify.borrow(); + state.closed = true; + let _ = self.notify.send(state); + } +} + +/// A consumer stream that reads from the broadcast queue. +struct BroadcastConsumer { + index: usize, + entries: Arc>>, + notify: WatchStream, + state: BroadcastState, +} + +impl Stream for BroadcastConsumer { + type Item = T; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + loop { + if self.index < self.state.len { + let entry = self.entries.lock().unwrap().get(self.index).cloned(); + if let Some(v) = entry { + self.index += 1; + return Poll::Ready(Some(v)); + } + } + + if self.state.closed { + return Poll::Ready(None); + } + + match Pin::new(&mut self.notify).poll_next(cx) { + Poll::Ready(Some(state)) => { + self.state = state; + } + Poll::Ready(None) => { + self.state.closed = true; + } + Poll::Pending => return Poll::Pending, + } + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::test_utils::mock_exec::MockExec; + use datafusion::arrow::array::Int32Array; + use datafusion::arrow::datatypes::{DataType, Field, Schema}; + use datafusion::arrow::record_batch::RecordBatch; + use datafusion::prelude::SessionContext; + use futures::StreamExt; + use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; + use tokio::sync::Notify; + use tokio::time::{Duration, sleep}; + + fn assert_int32_batch_values(batch: &RecordBatch, expected: &[i32]) { + let values = batch + .column(0) + .as_any() + .downcast_ref::() + .expect("int32 column"); + assert_eq!(values.len(), expected.len()); + for (idx, expected_value) in expected.iter().enumerate() { + assert_eq!(values.value(idx), *expected_value); + } + } + + #[tokio::test] + async fn broadcast_exec_reuses_queue_for_virtual_partitions() -> Result<()> { + let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)])); + let counts = Arc::new(vec![AtomicUsize::new(0)]); + let batch = RecordBatch::try_new( + Arc::clone(&schema), + vec![Arc::new(Int32Array::from(vec![0]))], + )?; + let input = Arc::new( + MockExec::new_partitioned(vec![vec![Ok(batch)]], Arc::clone(&schema)) + .with_execute_counts(Arc::clone(&counts)), + ); + let broadcast = Arc::new(BroadcastExec::new(input, 2)); + + let ctx = SessionContext::new(); + let task_ctx = ctx.task_ctx(); + + let batches0 = + datafusion::physical_plan::common::collect(broadcast.execute(0, task_ctx.clone())?) + .await?; + let batches1 = + datafusion::physical_plan::common::collect(broadcast.execute(1, task_ctx)?).await?; + + // Only executes the partition once, second batch is read from the queue + assert_eq!(counts[0].load(Ordering::SeqCst), 1); + assert_eq!(batches0.len(), 1); + assert_eq!(batches1.len(), 1); + assert_eq!(batches0[0].num_rows(), 1); + assert_eq!(batches1[0].num_rows(), 1); + assert_int32_batch_values(&batches0[0], &[0]); + assert_int32_batch_values(&batches1[0], &[0]); + + Ok(()) + } + + #[tokio::test] + async fn broadcast_exec_maps_partitions_by_modulo() -> Result<()> { + let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)])); + let counts = Arc::new(vec![AtomicUsize::new(0), AtomicUsize::new(0)]); + let batch0 = RecordBatch::try_new( + Arc::clone(&schema), + vec![Arc::new(Int32Array::from(vec![0]))], + )?; + let batch1 = RecordBatch::try_new( + Arc::clone(&schema), + vec![Arc::new(Int32Array::from(vec![1]))], + )?; + let input = Arc::new( + MockExec::new_partitioned( + vec![vec![Ok(batch0)], vec![Ok(batch1)]], + Arc::clone(&schema), + ) + .with_execute_counts(Arc::clone(&counts)), + ); + let broadcast = Arc::new(BroadcastExec::new(input, 2)); + + let ctx = SessionContext::new(); + let task_ctx = ctx.task_ctx(); + + // Should map to real partition 0 + let batches0 = + datafusion::physical_plan::common::collect(broadcast.execute(0, task_ctx.clone())?) + .await?; + // Should map to real partition 1 + let batches1 = + datafusion::physical_plan::common::collect(broadcast.execute(1, task_ctx.clone())?) + .await?; + // Should map to real partition 0 + let batches2 = + datafusion::physical_plan::common::collect(broadcast.execute(2, task_ctx.clone())?) + .await?; + // Should map to real partition 1 + let batches3 = + datafusion::physical_plan::common::collect(broadcast.execute(3, task_ctx)?).await?; + + assert_eq!(counts[0].load(Ordering::SeqCst), 1); + assert_eq!(counts[1].load(Ordering::SeqCst), 1); + + assert_eq!(batches0.len(), 1); + assert_eq!(batches1.len(), 1); + assert_eq!(batches2.len(), 1); + assert_eq!(batches3.len(), 1); + assert_int32_batch_values(&batches0[0], &[0]); + assert_int32_batch_values(&batches1[0], &[1]); + assert_int32_batch_values(&batches2[0], &[0]); + assert_int32_batch_values(&batches3[0], &[1]); + + Ok(()) + } + + #[tokio::test] + async fn broadcast_exec_queue_survives_cancellation() -> Result<()> { + let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)])); + let execute_counts = Arc::new(vec![AtomicUsize::new(0)]); + let permit_open = Arc::new(AtomicBool::new(false)); + let permit_notify = Arc::new(Notify::new()); + + let batch = RecordBatch::try_new( + Arc::clone(&schema), + vec![Arc::new(Int32Array::from(vec![1, 2, 3]))], + )?; + let input = Arc::new( + MockExec::new_partitioned(vec![vec![Ok(batch)]], Arc::clone(&schema)) + .with_execute_counts(Arc::clone(&execute_counts)) + .with_gate(Arc::clone(&permit_open), Arc::clone(&permit_notify)), + ); + + // Has two consumers that will execute the same real partition + let broadcast = Arc::new(BroadcastExec::new(input, 2)); + + let ctx = SessionContext::new(); + let task_ctx = ctx.task_ctx(); + + // Execute is called synchronously, so execute_counts should increment immediately + let mut stream1 = broadcast.execute(0, task_ctx.clone())?; + assert_eq!(execute_counts[0].load(Ordering::SeqCst), 1); + + let handle = tokio::spawn(async move { stream1.next().await }); + + // Cancel this consumer (simulates a cancellation like a TopK) + handle.abort(); + let _ = handle.await; + + // Execute with a different virtual partition but maps to same real partition and allow + // full execution + let stream2 = broadcast.execute(1, task_ctx)?; + permit_open.store(true, Ordering::SeqCst); + permit_notify.notify_waiters(); + + let batches: Vec = datafusion::physical_plan::common::collect(stream2).await?; + assert_eq!(batches.len(), 1); + assert_int32_batch_values(&batches[0], &[1, 2, 3]); + + // Partition should only be executed a single time, second stream should've pulled from + // queue + assert_eq!(execute_counts[0].load(Ordering::SeqCst), 1); + + Ok(()) + } + + #[tokio::test] + async fn broadcast_exec_continues_after_consumer_cancel() -> Result<()> { + let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)])); + let batches = vec![ + Ok(RecordBatch::try_new( + Arc::clone(&schema), + vec![Arc::new(Int32Array::from(vec![0]))], + )?), + Ok(RecordBatch::try_new( + Arc::clone(&schema), + vec![Arc::new(Int32Array::from(vec![1]))], + )?), + Ok(RecordBatch::try_new( + Arc::clone(&schema), + vec![Arc::new(Int32Array::from(vec![2]))], + )?), + ]; + let input = Arc::new( + MockExec::new_partitioned(vec![batches], Arc::clone(&schema)) + .with_delay_between_batches(Duration::from_millis(10)), + ); + let broadcast = Arc::new(BroadcastExec::new(input, 2)); + + let ctx = SessionContext::new(); + let task_ctx = ctx.task_ctx(); + + let mut stream1 = broadcast.execute(0, task_ctx.clone())?; + let stream2 = broadcast.execute(1, task_ctx)?; + + let first = stream1.next().await.transpose()?.expect("first batch"); + assert_int32_batch_values(&first, &[0]); + drop(stream1); + + let batches: Vec = datafusion::physical_plan::common::collect(stream2).await?; + assert_eq!(batches.len(), 3); + assert_int32_batch_values(&batches[0], &[0]); + assert_int32_batch_values(&batches[1], &[1]); + assert_int32_batch_values(&batches[2], &[2]); + + Ok(()) + } + + #[tokio::test] + async fn broadcast_exec_replay_for_late_consumer() -> Result<()> { + let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)])); + let batches = vec![ + Ok(RecordBatch::try_new( + Arc::clone(&schema), + vec![Arc::new(Int32Array::from(vec![0]))], + )?), + Ok(RecordBatch::try_new( + Arc::clone(&schema), + vec![Arc::new(Int32Array::from(vec![1]))], + )?), + Ok(RecordBatch::try_new( + Arc::clone(&schema), + vec![Arc::new(Int32Array::from(vec![2]))], + )?), + ]; + let input = Arc::new( + MockExec::new_partitioned(vec![batches], Arc::clone(&schema)) + .with_delay_between_batches(Duration::from_millis(10)), + ); + let broadcast = Arc::new(BroadcastExec::new(input, 2)); + + let ctx = SessionContext::new(); + let task_ctx = ctx.task_ctx(); + + let mut stream0 = broadcast.execute(0, task_ctx.clone())?; + let batch0 = stream0.next().await.transpose()?.expect("batch 0"); + assert_int32_batch_values(&batch0, &[0]); + let batch1 = stream0.next().await.transpose()?.expect("batch 1"); + assert_int32_batch_values(&batch1, &[1]); + + // Late consumer joins after producer has already emitted some batches. + sleep(Duration::from_millis(5)).await; + let stream1 = broadcast.execute(1, task_ctx)?; + let batches: Vec = datafusion::physical_plan::common::collect(stream1).await?; + assert_eq!(batches.len(), 3); + assert_int32_batch_values(&batches[0], &[0]); + assert_int32_batch_values(&batches[1], &[1]); + assert_int32_batch_values(&batches[2], &[2]); + + Ok(()) + } +} diff --git a/src/test_utils/mock_exec.rs b/src/test_utils/mock_exec.rs index a8d41266..52678be9 100644 --- a/src/test_utils/mock_exec.rs +++ b/src/test_utils/mock_exec.rs @@ -7,8 +7,14 @@ use datafusion::physical_plan::common::compute_record_batch_statistics; use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType}; use datafusion::physical_plan::stream::{RecordBatchReceiverStream, RecordBatchStreamAdapter}; use datafusion::physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties}; +use futures::{Stream, stream}; use std::any::Any; +use std::pin::Pin; use std::sync::Arc; +use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; +use std::time::Duration; +use tokio::sync::Notify; +use tokio::time::sleep; // Copied from https://github.com/apache/datafusion/blob/4b9a468cc1949062cf3cd8685ba8ced377fd212e/datafusion/physical-plan/src/test/exec.rs#L121 /// A Mock ExecutionPlan that can be used for writing tests of other @@ -16,11 +22,17 @@ use std::sync::Arc; #[derive(Debug)] pub struct MockExec { /// the results to send back - data: Vec>, + data: Vec>>, schema: SchemaRef, + partitions: usize, /// if true (the default), sends data using a separate task to ensure the /// batches are not available without this stream yielding first use_task: bool, + delay: Option, + start_notify: Option>, + permit_open: Option>, + permit_notify: Option>, + execute_counts: Option>>, cache: PlanProperties, } @@ -33,11 +45,38 @@ impl MockExec { /// ensure any poll loops are correct. This behavior can be /// changed with `with_use_task` pub fn new(data: Vec>, schema: SchemaRef) -> Self { - let cache = Self::compute_properties(Arc::clone(&schema)); + let cache = Self::compute_properties(Arc::clone(&schema), 1); + Self { + data: vec![data], + schema, + partitions: 1, + use_task: true, + delay: None, + start_notify: None, + permit_open: None, + permit_notify: None, + execute_counts: None, + cache, + } + } + + /// Create a new `MockExec` with per-partition data. + pub fn new_partitioned( + data: Vec>>, + schema: SchemaRef, + ) -> Self { + let partitions = data.len().max(1); + let cache = Self::compute_properties(Arc::clone(&schema), partitions); Self { data, schema, + partitions, use_task: true, + delay: None, + start_notify: None, + permit_open: None, + permit_notify: None, + execute_counts: None, cache, } } @@ -50,11 +89,36 @@ impl MockExec { self } + /// Adds a delay between emitted batches (simulates a slow producer). + pub fn with_delay_between_batches(mut self, delay: Duration) -> Self { + self.delay = Some(delay); + self + } + + /// Notify when execute is called (before emitting any batches). + pub fn with_start_notify(mut self, start_notify: Arc) -> Self { + self.start_notify = Some(start_notify); + self + } + + /// Block emission until `permit_open` is true (use with `permit_notify`). + pub fn with_gate(mut self, permit_open: Arc, permit_notify: Arc) -> Self { + self.permit_open = Some(permit_open); + self.permit_notify = Some(permit_notify); + self + } + + /// Track execute calls per partition (for replay/once-only assertions). + pub fn with_execute_counts(mut self, execute_counts: Arc>) -> Self { + self.execute_counts = Some(execute_counts); + self + } + /// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc. - fn compute_properties(schema: SchemaRef) -> PlanProperties { + fn compute_properties(schema: SchemaRef, partitions: usize) -> PlanProperties { PlanProperties::new( EquivalenceProperties::new(schema), - Partitioning::UnknownPartitioning(1), + Partitioning::UnknownPartitioning(partitions), EmissionType::Incremental, Boundedness::Bounded, ) @@ -105,11 +169,21 @@ impl ExecutionPlan for MockExec { partition: usize, _context: Arc, ) -> datafusion::common::Result { - assert_eq!(partition, 0); + assert!(partition < self.partitions); + + if let Some(counts) = &self.execute_counts { + counts[partition].fetch_add(1, Ordering::SeqCst); + } + + if let Some(start_notify) = &self.start_notify { + start_notify.notify_waiters(); + } // Result doesn't implement clone, so do it ourself let data: Vec<_> = self .data + .get(partition) + .expect("partition data") .iter() .map(|r| match r { Ok(batch) => Ok(batch.clone()), @@ -123,8 +197,22 @@ impl ExecutionPlan for MockExec { // the batches are not available without the stream // yielding). let tx = builder.tx(); + let delay = self.delay; + let permit_open = self.permit_open.clone(); + let permit_notify = self.permit_notify.clone(); builder.spawn(async move { + if let Some(open) = permit_open { + let notify = permit_notify.expect("permit_notify"); + while !open.load(Ordering::SeqCst) { + notify.notified().await; + } + } for batch in data { + if let Some(delay) = delay + && delay > Duration::ZERO + { + sleep(delay).await; + } // println!("Sending batch via delayed stream"); if let Err(e) = tx.send(batch).await { println!("ERROR batch via delayed stream: {e}"); @@ -136,8 +224,40 @@ impl ExecutionPlan for MockExec { // returned stream simply reads off the rx stream Ok(builder.build()) } else { - // make an input that will error - let stream = futures::stream::iter(data); + let delay = self.delay; + let permit_open = self.permit_open.clone(); + let permit_notify = self.permit_notify.clone(); + let stream: Pin< + Box> + Send>, + > = if delay.is_some() || permit_open.is_some() { + Box::pin(stream::unfold( + (data.into_iter(), false), + move |(mut iter, mut gate_done)| { + let permit_open = permit_open.clone(); + let permit_notify = permit_notify.clone(); + async move { + if !gate_done { + if let Some(open) = permit_open { + let notify = permit_notify.expect("permit_notify"); + while !open.load(Ordering::SeqCst) { + notify.notified().await; + } + } + gate_done = true; + } + let batch = iter.next()?; + if let Some(delay) = delay + && delay > Duration::ZERO + { + sleep(delay).await; + } + Some((batch, (iter, gate_done))) + } + }, + )) + } else { + Box::pin(stream::iter(data)) + }; Ok(Box::pin(RecordBatchStreamAdapter::new( self.schema(), stream, @@ -157,18 +277,23 @@ impl ExecutionPlan for MockExec { if partition.is_some() { return Ok(Statistics::new_unknown(&self.schema)); } - let data: datafusion::common::Result> = self + let data: datafusion::common::Result>> = self .data .iter() - .map(|r| match r { - Ok(batch) => Ok(batch.clone()), - Err(e) => Err(clone_error(e)), + .map(|partition_data| { + partition_data + .iter() + .map(|r| match r { + Ok(batch) => Ok(batch.clone()), + Err(e) => Err(clone_error(e)), + }) + .collect() }) .collect(); let data = data?; - Ok(compute_record_batch_statistics(&[data], &self.schema, None)) + Ok(compute_record_batch_statistics(&data, &self.schema, None)) } }