diff --git a/Cargo.lock b/Cargo.lock index 112a7b8869..f05eee598a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -119,9 +119,9 @@ dependencies = [ [[package]] name = "anstream" -version = "0.6.19" +version = "0.6.20" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "301af1932e46185686725e0fad2f8f2aa7da69dd70bf6ecc44d6b703844a3933" +checksum = "3ae563653d1938f79b1ab1b5e668c87c76a9930414574a6583a7b7e11a8e6192" dependencies = [ "anstyle", "anstyle-parse", @@ -149,22 +149,22 @@ dependencies = [ [[package]] name = "anstyle-query" -version = "1.1.3" +version = "1.1.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6c8bdeb6047d8983be085bab0ba1472e6dc604e7041dbf6fcd5e71523014fae9" +checksum = "9e231f6134f61b71076a3eab506c379d4f36122f2af15a9ff04415ea4c3339e2" dependencies = [ - "windows-sys 0.59.0", + "windows-sys 0.60.2", ] [[package]] name = "anstyle-wincon" -version = "3.0.9" +version = "3.0.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "403f75924867bb1033c59fbf0797484329750cfbe3c4325cd33127941fabc882" +checksum = "3e0633414522a32ffaac8ac6cc8f748e090c5717661fddeea04219e2344f5f2a" dependencies = [ "anstyle", "once_cell_polyfill", - "windows-sys 0.59.0", + "windows-sys 0.60.2", ] [[package]] @@ -458,7 +458,7 @@ dependencies = [ "memchr", "num", "regex", - "regex-syntax", + "regex-syntax 0.8.5", ] [[package]] @@ -554,9 +554,9 @@ checksum = "c08606f8c3cbf4ce6ec8e28fb0014a2c086708fe954eaa885384a6165172e7e8" [[package]] name = "aws-config" -version = "1.8.3" +version = "1.8.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c0baa720ebadea158c5bda642ac444a2af0cdf7bb66b46d1e4533de5d1f449d0" +checksum = "483020b893cdef3d89637e428d588650c71cfae7ea2e6ecbaee4de4ff99fb2dd" dependencies = [ "aws-credential-types", "aws-runtime", @@ -584,9 +584,9 @@ dependencies = [ [[package]] name = "aws-credential-types" -version = "1.2.4" +version = "1.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b68c2194a190e1efc999612792e25b1ab3abfefe4306494efaaabc25933c0cbe" +checksum = "1541072f81945fa1251f8795ef6c92c4282d74d59f88498ae7d4bf00f0ebdad9" dependencies = [ "aws-smithy-async", "aws-smithy-runtime-api", @@ -619,9 +619,9 @@ dependencies = [ [[package]] name = "aws-runtime" -version = "1.5.9" +version = "1.5.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b2090e664216c78e766b6bac10fe74d2f451c02441d43484cd76ac9a295075f7" +checksum = "c034a1bc1d70e16e7f4e4caf7e9f7693e4c9c24cd91cf17c2a0b21abaebc7c8b" dependencies = [ "aws-credential-types", "aws-sigv4", @@ -643,9 +643,9 @@ dependencies = [ [[package]] name = "aws-sdk-sso" -version = "1.78.0" +version = "1.79.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dbd7bc4bd34303733bded362c4c997a39130eac4310257c79aae8484b1c4b724" +checksum = "0a847168f15b46329fa32c7aca4e4f1a2e072f9b422f0adb19756f2e1457f111" dependencies = [ "aws-credential-types", "aws-runtime", @@ -665,9 +665,9 @@ dependencies = [ [[package]] name = "aws-sdk-ssooidc" -version = "1.79.0" +version = "1.80.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "77358d25f781bb106c1a69531231d4fd12c6be904edb0c47198c604df5a2dbca" +checksum = "b654dd24d65568738593e8239aef279a86a15374ec926ae8714e2d7245f34149" dependencies = [ "aws-credential-types", "aws-runtime", @@ -687,9 +687,9 @@ dependencies = [ [[package]] name = "aws-sdk-sts" -version = "1.80.0" +version = "1.81.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "06e3ed2a9b828ae7763ddaed41d51724d2661a50c45f845b08967e52f4939cfc" +checksum = "c92ea8a7602321c83615c82b408820ad54280fb026e92de0eeea937342fafa24" dependencies = [ "aws-credential-types", "aws-runtime", @@ -710,9 +710,9 @@ dependencies = [ [[package]] name = "aws-sigv4" -version = "1.3.3" +version = "1.3.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ddfb9021f581b71870a17eac25b52335b82211cdc092e02b6876b2bcefa61666" +checksum = "084c34162187d39e3740cb635acd73c4e3a551a36146ad6fe8883c929c9f876c" dependencies = [ "aws-credential-types", "aws-smithy-http", @@ -743,9 +743,9 @@ dependencies = [ [[package]] name = "aws-smithy-http" -version = "0.62.2" +version = "0.62.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "43c82ba4cab184ea61f6edaafc1072aad3c2a17dcf4c0fce19ac5694b90d8b5f" +checksum = "7c4dacf2d38996cf729f55e7a762b30918229917eca115de45dfa8dfb97796c9" dependencies = [ "aws-smithy-runtime-api", "aws-smithy-types", @@ -814,9 +814,9 @@ dependencies = [ [[package]] name = "aws-smithy-runtime" -version = "1.8.5" +version = "1.8.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "660f70d9d8af6876b4c9aa8dcb0dbaf0f89b04ee9a4455bea1b4ba03b15f26f6" +checksum = "9e107ce0783019dbff59b3a244aa0c114e4a8c9d93498af9162608cd5474e796" dependencies = [ "aws-smithy-async", "aws-smithy-http", @@ -838,9 +838,9 @@ dependencies = [ [[package]] name = "aws-smithy-runtime-api" -version = "1.8.5" +version = "1.8.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "937a49ecf061895fca4a6dd8e864208ed9be7546c0527d04bc07d502ec5fba1c" +checksum = "75d52251ed4b9776a3e8487b2a01ac915f73b2da3af8fc1e77e0fce697a550d4" dependencies = [ "aws-smithy-async", "aws-smithy-types", @@ -1196,9 +1196,9 @@ dependencies = [ [[package]] name = "cc" -version = "1.2.30" +version = "1.2.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "deec109607ca693028562ed836a5f1c4b8bd77755c4e132fc5ce11b0b6211ae7" +checksum = "c3a42d84bb6b69d3a8b3eaacf0d88f179e1929695e1ad012b6cf64d9caaa5fd2" dependencies = [ "jobserver", "libc", @@ -1465,6 +1465,24 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "crossbeam-channel" +version = "0.5.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "82b8f8f868b36967f9606790d1903570de9ceaf870a7bf9fbbd3016d636a2cb2" +dependencies = [ + "crossbeam-utils", +] + +[[package]] +name = "crossbeam-epoch" +version = "0.9.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5b82ac4a3c2ca9c3460964f020e1402edd5753411d7737aa39c3714ad1b5420e" +dependencies = [ + "crossbeam-utils", +] + [[package]] name = "crossbeam-utils" version = "0.8.21" @@ -2111,7 +2129,7 @@ dependencies = [ "log", "recursive", "regex", - "regex-syntax", + "regex-syntax 0.8.5", ] [[package]] @@ -2498,14 +2516,14 @@ dependencies = [ [[package]] name = "dns-lookup" -version = "2.0.4" +version = "2.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e5766087c2235fec47fafa4cfecc81e494ee679d0fd4a59887ea0919bfb0e4fc" +checksum = "91adf1f5ae09290d87cca8f4f0a8e49bcc30672993eb8aa11a5c9d8872d16a98" dependencies = [ "cfg-if", "libc", - "socket2 0.5.10", - "windows-sys 0.48.0", + "socket2 0.6.0", + "windows-sys 0.60.2", ] [[package]] @@ -2836,6 +2854,20 @@ version = "1.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "312d2295c7302019c395cfb90dacd00a82a2eabd700429bba9c7a3f38dbbe11b" +[[package]] +name = "generator" +version = "0.8.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d18470a76cb7f8ff746cf1f7470914f900252ec36bbc40b569d74b1258446827" +dependencies = [ + "cc", + "cfg-if", + "libc", + "log", + "rustversion", + "windows", +] + [[package]] name = "generic-array" version = "0.14.7" @@ -3790,9 +3822,9 @@ dependencies = [ [[package]] name = "libbz2-rs-sys" -version = "0.2.1" +version = "0.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "775bf80d5878ab7c2b1080b5351a48b2f737d9f6f8b383574eebcc22be0dfccb" +checksum = "2c4a545a15244c7d945065b5d392b2d2d7f21526fba56ce51467b06ed445e8f7" [[package]] name = "libc" @@ -3858,7 +3890,6 @@ checksum = "391290121bad3d37fbddad76d8f5d1c1c314cfc646d143d7e07a3086ddff0ce3" dependencies = [ "bitflags", "libc", - "redox_syscall", ] [[package]] @@ -3904,6 +3935,19 @@ version = "0.4.27" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "13dc2df351e3202783a1fe0d44375f7295ffb4049267b0f3018346dc122a1d94" +[[package]] +name = "loom" +version = "0.7.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "419e0dc8046cb947daa77eb95ae174acfbddb7673b4151f56d1eed8e93fbfaca" +dependencies = [ + "cfg-if", + "generator", + "scoped-tls", + "tracing", + "tracing-subscriber", +] + [[package]] name = "lru-slab" version = "0.1.2" @@ -3951,6 +3995,15 @@ dependencies = [ "serde", ] +[[package]] +name = "matchers" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8263075bb86c5a1b1427b5ae862e8889656f126e9f77c484496e8b47cf5c5558" +dependencies = [ + "regex-automata 0.1.10", +] + [[package]] name = "matchit" version = "0.7.3" @@ -4023,6 +4076,25 @@ dependencies = [ "windows-sys 0.59.0", ] +[[package]] +name = "moka" +version = "0.12.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a9321642ca94a4282428e6ea4af8cc2ca4eac48ac7a6a4ea8f33f76d0ce70926" +dependencies = [ + "crossbeam-channel", + "crossbeam-epoch", + "crossbeam-utils", + "loom", + "parking_lot", + "portable-atomic", + "rustc_version", + "smallvec", + "tagptr", + "thiserror 1.0.69", + "uuid", +] + [[package]] name = "monostate" version = "0.1.14" @@ -4077,6 +4149,16 @@ dependencies = [ "minimal-lexical", ] +[[package]] +name = "nu-ansi-term" +version = "0.46.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "77a8165726e8236064dbb45459242600304b42a5ea24ee2948e18e023bf7ba84" +dependencies = [ + "overload", + "winapi", +] + [[package]] name = "num" version = "0.4.3" @@ -4400,6 +4482,12 @@ version = "0.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1a80800c0488c3a21695ea981a54918fbb37abf04f4d0720c453632255e2ff0e" +[[package]] +name = "overload" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39" + [[package]] name = "parking_lot" version = "0.12.4" @@ -4943,9 +5031,9 @@ checksum = "5a651516ddc9168ebd67b24afd085a718be02f8858fe406591b013d101ce2f40" [[package]] name = "quick-xml" -version = "0.38.0" +version = "0.38.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8927b0664f5c5a98265138b7e3f90aa19a6b21353182469ace36d4ac527b7b1b" +checksum = "9845d9dccf565065824e69f9f235fafba1587031eda353c1f1561cd6a6be78f4" dependencies = [ "memchr", "serde", @@ -5127,9 +5215,9 @@ dependencies = [ [[package]] name = "redox_users" -version = "0.5.1" +version = "0.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "78eaea1f52c56d57821be178b2d47e09ff26481a6042e8e042fcb0ced068b470" +checksum = "a4e608c6638b9c18977b00b475ac1f28d14e84b27d8d42f70e0bf1e3dec127ac" dependencies = [ "getrandom 0.2.16", "libredox", @@ -5144,8 +5232,17 @@ checksum = "b544ef1b4eac5dc2db33ea63606ae9ffcfac26c1416a2806ae0bf5f56b201191" dependencies = [ "aho-corasick", "memchr", - "regex-automata", - "regex-syntax", + "regex-automata 0.4.9", + "regex-syntax 0.8.5", +] + +[[package]] +name = "regex-automata" +version = "0.1.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6c230d73fb8d8c1b9c0b3135c5142a8acee3a0558fb8db5cf1cb65f8d7862132" +dependencies = [ + "regex-syntax 0.6.29", ] [[package]] @@ -5156,7 +5253,7 @@ checksum = "809e8dc61f6de73b46c85f4c96486310fe304c434cfa43669d7b40f711150908" dependencies = [ "aho-corasick", "memchr", - "regex-syntax", + "regex-syntax 0.8.5", ] [[package]] @@ -5165,6 +5262,12 @@ version = "0.1.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "53a49587ad06b26609c52e423de037e7f57f20d53535d66e08c695f347df952a" +[[package]] +name = "regex-syntax" +version = "0.6.29" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f162c6dd7b008981e4d40210aca20b4bd0f9b60ca9271061b07f78537722f2e1" + [[package]] name = "regex-syntax" version = "0.8.5" @@ -5398,6 +5501,19 @@ version = "1.0.20" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "28d3b2b1366ec20994f1fd18c3c594f05c5dd4bc44d8bb0c1c632c8d6829481f" +[[package]] +name = "sail-cache" +version = "0.3.1" +dependencies = [ + "chrono", + "datafusion", + "log", + "moka", + "object_store", + "sail-common", + "thiserror 2.0.12", +] + [[package]] name = "sail-catalog" version = "0.3.1" @@ -5732,6 +5848,7 @@ version = "0.3.1" dependencies = [ "async-stream", "datafusion", + "futures", "lazy_static", "log", "monostate", @@ -5744,6 +5861,7 @@ dependencies = [ "prost-build", "quote", "regex", + "sail-cache", "sail-common", "sail-common-datafusion", "sail-execution", @@ -5835,6 +5953,12 @@ dependencies = [ "windows-sys 0.59.0", ] +[[package]] +name = "scoped-tls" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e1cf6437eb19a8f4a6cc0f7dca544973b0b78843adbfeb3683d1a94a0024a294" + [[package]] name = "scopeguard" version = "1.2.0" @@ -6020,6 +6144,15 @@ dependencies = [ "digest", ] +[[package]] +name = "sharded-slab" +version = "0.1.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f40ca3c46823713e0d4209592e8d6e826aa57e928f09752619fc696c499637f6" +dependencies = [ + "lazy_static", +] + [[package]] name = "shlex" version = "1.3.0" @@ -6028,9 +6161,9 @@ checksum = "0fda2ff0d084019ba4d7c6f371c95d8fd75ce3524c3cb8fb653a3023f6323e64" [[package]] name = "signal-hook-registry" -version = "1.4.5" +version = "1.4.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9203b8055f63a2a00e2f593bb0510367fe707d7ff1e5c872de2f537b339e5410" +checksum = "b2a4719bff48cee6b39d12c020eeb490953ad2443b7055bd0b21fca26bd8c28b" dependencies = [ "libc", ] @@ -6252,6 +6385,12 @@ dependencies = [ "libc", ] +[[package]] +name = "tagptr" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7b2093cf4c8eb1e67749a6762251bc9cd836b6fc171623bd0a9d324d37af2417" + [[package]] name = "tap" version = "1.0.1" @@ -6317,6 +6456,15 @@ dependencies = [ "syn", ] +[[package]] +name = "thread_local" +version = "1.1.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f60246a4944f24f6e018aa17cdeffb7818b76356965d03b07d6a9886e8962185" +dependencies = [ + "cfg-if", +] + [[package]] name = "thrift" version = "0.17.0" @@ -6456,9 +6604,9 @@ dependencies = [ [[package]] name = "tokio-util" -version = "0.7.15" +version = "0.7.16" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "66a539a9ad6d5d281510d5bd368c973d636c02dbf8a67300bfb6b950696ad7df" +checksum = "14307c986784f72ef81c89db7d9e28d6ac26d16213b109ea501696195e6e3ce5" dependencies = [ "bytes", "futures-core", @@ -6725,6 +6873,36 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b9d12581f227e93f094d3af2ae690a574abb8a2b9b7a96e7cfe9647b2b617678" dependencies = [ "once_cell", + "valuable", +] + +[[package]] +name = "tracing-log" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ee855f1f400bd0e5c02d150ae5de3840039a3f54b025156404e34c23c03f47c3" +dependencies = [ + "log", + "once_cell", + "tracing-core", +] + +[[package]] +name = "tracing-subscriber" +version = "0.3.19" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e8189decb5ac0fa7bc8b96b7cb9b2701d60d48805aca84a238004d665fcc4008" +dependencies = [ + "matchers", + "nu-ansi-term", + "once_cell", + "regex", + "sharded-slab", + "smallvec", + "thread_local", + "tracing", + "tracing-core", + "tracing-log", ] [[package]] @@ -6908,6 +7086,12 @@ dependencies = [ "syn", ] +[[package]] +name = "valuable" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ba73ea9cf16a25df0c8caa16c51acb937d5712a8429db78a3ee29d5dcacd3a65" + [[package]] name = "vcpkg" version = "0.2.15" @@ -7124,6 +7308,28 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" +[[package]] +name = "windows" +version = "0.61.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9babd3a767a4c1aef6900409f85f5d53ce2544ccdfaa86dad48c91782c6d6893" +dependencies = [ + "windows-collections", + "windows-core", + "windows-future", + "windows-link", + "windows-numerics", +] + +[[package]] +name = "windows-collections" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3beeceb5e5cfd9eb1d76b381630e82c4241ccd0d27f1a39ed41b2760b255c5e8" +dependencies = [ + "windows-core", +] + [[package]] name = "windows-core" version = "0.61.2" @@ -7137,6 +7343,17 @@ dependencies = [ "windows-strings", ] +[[package]] +name = "windows-future" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fc6a41e98427b19fe4b73c550f060b59fa592d7d686537eebf9385621bfbad8e" +dependencies = [ + "windows-core", + "windows-link", + "windows-threading", +] + [[package]] name = "windows-implement" version = "0.60.0" @@ -7165,6 +7382,16 @@ version = "0.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5e6ad25900d524eaabdbbb96d20b4311e1e7ae1699af4fb28c17ae66c80d798a" +[[package]] +name = "windows-numerics" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9150af68066c4c5c07ddc0ce30421554771e528bde427614c61038bc2c92c2b1" +dependencies = [ + "windows-core", + "windows-link", +] + [[package]] name = "windows-registry" version = "0.5.3" @@ -7194,15 +7421,6 @@ dependencies = [ "windows-link", ] -[[package]] -name = "windows-sys" -version = "0.48.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "677d2418bec65e3338edb076e806bc1ec15693c5d0104683f2efe857f61056a9" -dependencies = [ - "windows-targets 0.48.5", -] - [[package]] name = "windows-sys" version = "0.52.0" @@ -7230,21 +7448,6 @@ dependencies = [ "windows-targets 0.53.3", ] -[[package]] -name = "windows-targets" -version = "0.48.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9a2fa6e2155d7247be68c096456083145c183cbbbc2764150dda45a87197940c" -dependencies = [ - "windows_aarch64_gnullvm 0.48.5", - "windows_aarch64_msvc 0.48.5", - "windows_i686_gnu 0.48.5", - "windows_i686_msvc 0.48.5", - "windows_x86_64_gnu 0.48.5", - "windows_x86_64_gnullvm 0.48.5", - "windows_x86_64_msvc 0.48.5", -] - [[package]] name = "windows-targets" version = "0.52.6" @@ -7279,10 +7482,13 @@ dependencies = [ ] [[package]] -name = "windows_aarch64_gnullvm" -version = "0.48.5" +name = "windows-threading" +version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2b38e32f0abccf9987a4e3079dfb67dcd799fb61361e53e2882c3cbaf0d905d8" +checksum = "b66463ad2e0ea3bbf808b7f1d371311c80e115c0b71d60efc142cafbcfb057a6" +dependencies = [ + "windows-link", +] [[package]] name = "windows_aarch64_gnullvm" @@ -7296,12 +7502,6 @@ version = "0.53.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "86b8d5f90ddd19cb4a147a5fa63ca848db3df085e25fee3cc10b39b6eebae764" -[[package]] -name = "windows_aarch64_msvc" -version = "0.48.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dc35310971f3b2dbbf3f0690a219f40e2d9afcf64f9ab7cc1be722937c26b4bc" - [[package]] name = "windows_aarch64_msvc" version = "0.52.6" @@ -7314,12 +7514,6 @@ version = "0.53.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c7651a1f62a11b8cbd5e0d42526e55f2c99886c77e007179efff86c2b137e66c" -[[package]] -name = "windows_i686_gnu" -version = "0.48.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a75915e7def60c94dcef72200b9a8e58e5091744960da64ec734a6c6e9b3743e" - [[package]] name = "windows_i686_gnu" version = "0.52.6" @@ -7344,12 +7538,6 @@ version = "0.53.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9ce6ccbdedbf6d6354471319e781c0dfef054c81fbc7cf83f338a4296c0cae11" -[[package]] -name = "windows_i686_msvc" -version = "0.48.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8f55c233f70c4b27f66c523580f78f1004e8b5a8b659e05a4eb49d4166cca406" - [[package]] name = "windows_i686_msvc" version = "0.52.6" @@ -7362,12 +7550,6 @@ version = "0.53.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "581fee95406bb13382d2f65cd4a908ca7b1e4c2f1917f143ba16efe98a589b5d" -[[package]] -name = "windows_x86_64_gnu" -version = "0.48.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "53d40abd2583d23e4718fddf1ebec84dbff8381c07cae67ff7768bbf19c6718e" - [[package]] name = "windows_x86_64_gnu" version = "0.52.6" @@ -7380,12 +7562,6 @@ version = "0.53.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2e55b5ac9ea33f2fc1716d1742db15574fd6fc8dadc51caab1c16a3d3b4190ba" -[[package]] -name = "windows_x86_64_gnullvm" -version = "0.48.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0b7b52767868a23d5bab768e390dc5f5c55825b6d30b86c844ff2dc7414044cc" - [[package]] name = "windows_x86_64_gnullvm" version = "0.52.6" @@ -7398,12 +7574,6 @@ version = "0.53.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0a6e035dd0599267ce1ee132e51c27dd29437f63325753051e71dd9e42406c57" -[[package]] -name = "windows_x86_64_msvc" -version = "0.48.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ed94fce61571a4006852b7389a063ab983c02eb1bb37b47f8272ce92d06d9538" - [[package]] name = "windows_x86_64_msvc" version = "0.52.6" @@ -7560,9 +7730,9 @@ dependencies = [ [[package]] name = "zerovec" -version = "0.11.2" +version = "0.11.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4a05eb080e015ba39cc9e23bbe5e7fb04d5fb040350f99f34e338d5fdd294428" +checksum = "bdbb9122ea75b11bf96e7492afb723e8a7fbe12c67417aa95e7e3d18144d37cd" dependencies = [ "yoke", "zerofrom", diff --git a/Cargo.toml b/Cargo.toml index fa4d3b424e..ff42f9b2ac 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -97,6 +97,7 @@ percent-encoding = "2.3.1" rustls = "0.23.29" dashmap = "6.1.0" itertools = "0.14.0" +moka = { version = "0.12.10", features = ["sync"] } ###### # The versions of the following dependencies are managed manually. @@ -134,6 +135,7 @@ arrow-buffer = { version = "55.2.0" } arrow-schema = { version = "55.2.0", features = ["serde"] } arrow-flight = { version = "55.2.0" } arrow-pyarrow = { version = "55.2.0" } +parquet = { version = "55.2.0" } serde_arrow = { version = "0.13.5", features = ["arrow-55"] } # The `object_store` version must match the one used in DataFusion. object_store = { version = "0.12.3", features = ["aws", "gcp", "azure", "http"] } @@ -142,7 +144,6 @@ hdfs-native-object-store = "0.14.2" # Lakehouse deltalake = { git = "https://github.com/delta-io/delta-rs.git", rev = "9a2c8ea", default-features = false, features = ["rustls"] } delta_kernel = { version = "0.13.0", features = ["arrow-55", "internal-api", "default-engine"] } -parquet = { version = "55.2.0" } bytes = "1.9.0" indexmap = "2.10.0" diff --git a/crates/sail-cache/Cargo.toml b/crates/sail-cache/Cargo.toml new file mode 100644 index 0000000000..81a4648a27 --- /dev/null +++ b/crates/sail-cache/Cargo.toml @@ -0,0 +1,23 @@ +[package] +name = "sail-cache" +version.workspace = true +authors.workspace = true +edition.workspace = true +homepage.workspace = true +license.workspace = true +readme.workspace = true +repository.workspace = true +rust-version.workspace = true + +[lints] +workspace = true + +[dependencies] +sail-common = { path = "../sail-common" } + +thiserror = { workspace = true } +object_store = { workspace = true } +datafusion = { workspace = true } +chrono = { workspace = true } +log = { workspace = true } +moka = { workspace = true } \ No newline at end of file diff --git a/crates/sail-cache/src/error.rs b/crates/sail-cache/src/error.rs new file mode 100644 index 0000000000..ef346a4606 --- /dev/null +++ b/crates/sail-cache/src/error.rs @@ -0,0 +1,44 @@ +use sail_common::error::CommonError; +use thiserror::Error; +pub type CacheResult = Result; + +#[derive(Debug, Error)] +pub enum CacheError { + #[error("missing argument: {0}")] + MissingArgument(String), + #[error("invalid argument: {0}")] + InvalidArgument(String), + #[error("not supported: {0}")] + NotSupported(String), + #[error("internal error: {0}")] + InternalError(String), +} + +impl CacheError { + pub fn missing(message: impl Into) -> Self { + CacheError::MissingArgument(message.into()) + } + + pub fn invalid(message: impl Into) -> Self { + CacheError::InvalidArgument(message.into()) + } + + pub fn unsupported(message: impl Into) -> Self { + CacheError::NotSupported(message.into()) + } + + pub fn internal(message: impl Into) -> Self { + CacheError::InternalError(message.into()) + } +} + +impl From for CacheError { + fn from(error: CommonError) -> Self { + match error { + CommonError::MissingArgument(message) => CacheError::MissingArgument(message), + CommonError::InvalidArgument(message) => CacheError::InvalidArgument(message), + CommonError::NotSupported(message) => CacheError::NotSupported(message), + CommonError::InternalError(message) => CacheError::InternalError(message), + } + } +} diff --git a/crates/sail-cache/src/lib.rs b/crates/sail-cache/src/lib.rs new file mode 100644 index 0000000000..af454db666 --- /dev/null +++ b/crates/sail-cache/src/lib.rs @@ -0,0 +1,42 @@ +use log::error; + +pub mod error; +pub mod list_file_cache; +pub mod table_files_statistics_cache; + +#[allow(dead_code)] +pub(crate) fn try_parse_memory_limit(limit: &str) -> Option { + let (number, unit) = limit.split_at(limit.len() - 1); + let number: f64 = match number.parse() { + Ok(n) => n, + Err(_) => { + error!("Memory limit not set! Failed to parse number from '{limit}'"); + return None; + } + }; + match unit { + "K" => Some((number * 1024.0) as usize), + "M" => Some((number * 1024.0 * 1024.0) as usize), + "G" => Some((number * 1024.0 * 1024.0 * 1024.0) as usize), + _ => { + error!("Memory limit not set! Unsupported unit '{unit}' in memory limit '{limit}'."); + None + } + } +} + +pub(crate) fn try_parse_non_zero_u64(number: &str) -> Option { + match number.parse::() { + Ok(n) => { + if n == 0 { + None + } else { + Some(n) + } + } + Err(_) => { + error!("Failed to parse '{number}' as u64"); + None + } + } +} diff --git a/crates/sail-cache/src/list_file_cache.rs b/crates/sail-cache/src/list_file_cache.rs new file mode 100644 index 0000000000..6bf1465143 --- /dev/null +++ b/crates/sail-cache/src/list_file_cache.rs @@ -0,0 +1,130 @@ +use std::sync::{Arc, LazyLock}; +use std::time::Duration; + +use datafusion::execution::cache::CacheAccessor; +use log::debug; +use moka::sync::Cache; +use object_store::path::Path; +use object_store::ObjectMeta; + +use crate::try_parse_non_zero_u64; + +pub static GLOBAL_LIST_FILES_CACHE: LazyLock> = LazyLock::new(|| { + let ttl = std::env::var("SAIL_RUNTIME__LIST_FILES_CACHE_TTL").ok(); + let max_entries = std::env::var("SAIL_RUNTIME__LIST_FILES_CACHE_MAX_ENTRIES").ok(); + Arc::new(MokaListFilesCache::new(ttl, max_entries)) +}); + +pub struct MokaListFilesCache { + statistics: Cache>>, +} + +impl MokaListFilesCache { + pub fn new(ttl: Option, max_entries: Option) -> Self { + let mut builder = Cache::builder(); + + if let Some(ttl) = ttl { + if let Some(ttl) = try_parse_non_zero_u64(&ttl) { + debug!("Setting TTL for MokaListFilesCache to {ttl}"); + builder = builder.time_to_live(Duration::from_secs(ttl)); + } else { + debug!("Disabled or invalid TTL for MokaListFilesCache: {ttl}"); + } + } else { + debug!("No TTL set for MokaListFilesCache"); + } + + if let Some(max_entries) = max_entries { + if let Some(max_entries) = try_parse_non_zero_u64(&max_entries) { + debug!("Setting max entries for MokaListFilesCache to {max_entries}"); + builder = builder.max_capacity(max_entries); + } else { + debug!("Disabled or invalid max entries for MokaListFilesCache: {max_entries}"); + } + } else { + debug!("No max entries set for MokaListFilesCache"); + } + + Self { + statistics: builder.build(), + } + } +} + +impl CacheAccessor>> for MokaListFilesCache { + type Extra = ObjectMeta; + + fn get(&self, k: &Path) -> Option>> { + self.statistics.get(k) + } + + fn get_with_extra(&self, k: &Path, _e: &Self::Extra) -> Option>> { + self.get(k) + } + + fn put(&self, key: &Path, value: Arc>) -> Option>> { + self.statistics.insert(key.clone(), value); + None + } + + fn put_with_extra( + &self, + key: &Path, + value: Arc>, + _e: &Self::Extra, + ) -> Option>> { + self.put(key, value) + } + + fn remove(&mut self, k: &Path) -> Option>> { + self.statistics.remove(k) + } + + fn contains_key(&self, k: &Path) -> bool { + self.statistics.contains_key(k) + } + + fn len(&self) -> usize { + self.statistics.entry_count() as usize + } + + fn clear(&self) { + self.statistics.invalidate_all() + } + + fn name(&self) -> String { + "MokaListFilesCache".to_string() + } +} + +#[allow(clippy::unwrap_used)] +#[cfg(test)] +mod tests { + use chrono::DateTime; + use object_store::path::Path; + use object_store::ObjectMeta; + + use super::*; + + #[test] + fn test_list_file_cache() { + let meta = ObjectMeta { + location: Path::from("test"), + last_modified: DateTime::parse_from_rfc3339("2022-09-27T22:36:00+02:00") + .unwrap() + .into(), + size: 1024, + e_tag: None, + version: None, + }; + + let cache = MokaListFilesCache::new(None, None); + assert!(cache.get(&meta.location).is_none()); + + cache.put(&meta.location, vec![meta.clone()].into()); + assert_eq!( + cache.get(&meta.location).unwrap().first().unwrap().clone(), + meta.clone() + ); + } +} diff --git a/crates/sail-cache/src/table_files_statistics_cache.rs b/crates/sail-cache/src/table_files_statistics_cache.rs new file mode 100644 index 0000000000..a5012d339b --- /dev/null +++ b/crates/sail-cache/src/table_files_statistics_cache.rs @@ -0,0 +1,165 @@ +use std::sync::{Arc, LazyLock}; +use std::time::Duration; + +use datafusion::common::Statistics; +use datafusion::execution::cache::CacheAccessor; +use log::{debug, error}; +use moka::sync::Cache; +use object_store::path::Path; +use object_store::ObjectMeta; + +use crate::try_parse_non_zero_u64; + +pub static GLOBAL_FILE_STATISTICS_CACHE: LazyLock> = + LazyLock::new(|| { + let ttl = std::env::var("SAIL_PARQUET__TABLE_FILES_STATISTICS_CACHE_TTL").ok(); + let max_entries = + std::env::var("SAIL_PARQUET__TABLE_FILES_STATISTICS_CACHE_MAX_ENTRIES").ok(); + Arc::new(MokaFileStatisticsCache::new(ttl, max_entries)) + }); + +pub struct MokaFileStatisticsCache { + statistics: Cache)>, +} + +impl MokaFileStatisticsCache { + pub fn new(ttl: Option, max_entries: Option) -> Self { + let mut builder = Cache::builder(); + + if let Some(ttl) = ttl { + if let Some(ttl) = try_parse_non_zero_u64(&ttl) { + debug!("Setting TTL for MokaFileStatisticsCache to {ttl}"); + builder = builder.time_to_live(Duration::from_secs(ttl)); + } else { + debug!("Disabled or invalid TTL for MokaFileStatisticsCache: {ttl}"); + } + } else { + debug!("No TTL set for MokaFileStatisticsCache"); + } + + if let Some(max_entries) = max_entries { + if let Some(max_entries) = try_parse_non_zero_u64(&max_entries) { + debug!("Setting max entries for MokaFileStatisticsCache to {max_entries}"); + builder = builder.max_capacity(max_entries); + } else { + debug!( + "Disabled or invalid max entries for MokaFileStatisticsCache: {max_entries}" + ); + } + } else { + debug!("No max entries set for MokaFileStatisticsCache"); + } + Self { + statistics: builder.build(), + } + } +} + +impl CacheAccessor> for MokaFileStatisticsCache { + type Extra = ObjectMeta; + + fn get(&self, k: &Path) -> Option> { + self.statistics + .get(k) + .map(|(_saved_meta, statistics)| statistics) + } + + fn get_with_extra(&self, k: &Path, e: &Self::Extra) -> Option> { + self.statistics.get(k).and_then(|(saved_meta, statistics)| { + if saved_meta.size == e.size && saved_meta.last_modified == e.last_modified { + Some(Arc::clone(&statistics)) + } else { + None + } + }) + } + + fn put(&self, _key: &Path, _value: Arc) -> Option> { + error!("Put cache in MokaFileStatisticsCache without Extra not supported."); + None + } + + fn put_with_extra( + &self, + key: &Path, + value: Arc, + e: &Self::Extra, + ) -> Option> { + self.statistics.insert(key.clone(), (e.clone(), value)); + None + } + + fn remove(&mut self, k: &Path) -> Option> { + self.statistics.remove(k).map(|(_, statistics)| statistics) + } + + fn contains_key(&self, k: &Path) -> bool { + self.statistics.contains_key(k) + } + + fn len(&self) -> usize { + self.statistics.entry_count() as usize + } + + fn clear(&self) { + self.statistics.invalidate_all(); + } + fn name(&self) -> String { + "MokaFileStatisticsCache".to_string() + } +} + +#[allow(clippy::unwrap_used)] +#[cfg(test)] +mod tests { + use chrono::DateTime; + use datafusion::arrow::datatypes::{DataType, Field, Schema, TimeUnit}; + use object_store::path::Path; + use object_store::ObjectMeta; + + use super::*; + + #[test] + fn test_statistics_cache() { + let meta = ObjectMeta { + location: Path::from("test"), + last_modified: DateTime::parse_from_rfc3339("2022-09-27T22:36:00+02:00") + .unwrap() + .into(), + size: 1024, + e_tag: None, + version: None, + }; + let cache = MokaFileStatisticsCache::new(None, None); + assert!(cache.get_with_extra(&meta.location, &meta).is_none()); + + cache.put_with_extra( + &meta.location, + Statistics::new_unknown(&Schema::new(vec![Field::new( + "test_column", + DataType::Timestamp(TimeUnit::Second, None), + false, + )])) + .into(), + &meta, + ); + assert!(cache.get_with_extra(&meta.location, &meta).is_some()); + + // file size changed + let mut meta2 = meta.clone(); + meta2.size = 2048; + assert!(cache.get_with_extra(&meta2.location, &meta2).is_none()); + + // file last_modified changed + let mut meta2 = meta.clone(); + meta2.last_modified = DateTime::parse_from_rfc3339("2022-09-27T22:40:00+02:00") + .unwrap() + .into(); + assert!(cache.get_with_extra(&meta2.location, &meta2).is_none()); + + // different file + let mut meta2 = meta; + meta2.location = Path::from("test2"); + assert!(cache.get_with_extra(&meta2.location, &meta2).is_none()); + } +} diff --git a/crates/sail-common/src/config/application.rs b/crates/sail-common/src/config/application.rs index fc8e428bb0..4b5c49f500 100644 --- a/crates/sail-common/src/config/application.rs +++ b/crates/sail-common/src/config/application.rs @@ -78,6 +78,11 @@ pub enum ExecutionMode { pub struct RuntimeConfig { pub stack_size: usize, pub enable_secondary: bool, + pub list_files_cache: bool, + #[serde(deserialize_with = "deserialize_non_zero")] + pub list_files_cache_ttl: Option, + #[serde(deserialize_with = "deserialize_non_zero")] + pub list_files_cache_max_entries: Option, } #[derive(Debug, Clone, Deserialize)] @@ -213,6 +218,11 @@ pub struct ParquetConfig { pub allow_single_file_parallelism: bool, pub maximum_parallel_row_group_writers: usize, pub maximum_buffered_record_batches_per_stream: usize, + pub table_files_statistics_cache: bool, + #[serde(deserialize_with = "deserialize_non_zero")] + pub table_files_statistics_cache_ttl: Option, + #[serde(deserialize_with = "deserialize_non_zero")] + pub table_files_statistics_cache_max_entries: Option, } #[derive(Debug, Clone, Deserialize)] diff --git a/crates/sail-common/src/config/application.yaml b/crates/sail-common/src/config/application.yaml index d428f064eb..6c4da89f93 100644 --- a/crates/sail-common/src/config/application.yaml +++ b/crates/sail-common/src/config/application.yaml @@ -21,6 +21,41 @@ description: Whether to enable a secondary Tokio runtime for separating I/O and compute tasks. experimental: true +- key: runtime.list_files_cache + type: boolean + default: "false" + description: | + Enable cache of file metadata when listing files. + This setting avoids repeatedly listing file metadata across all sessions, + which may be expensive in certain situations (e.g., remote object storage). + When enabled, updates to the underlying location may not be visible until + the cache entry expires (controlled by `runtime.list_files_cache_ttl`). + Default is disabled. + +- key: runtime.list_files_cache_ttl + type: number + default: "1800" + description: | + The time-to-live (TTL) in seconds for cached file listings. + Entries expire after this duration from when they were cached, + ensuring eventual consistency with the file system. + This setting is only effective when `runtime.list_files_cache` is enabled. + Setting this value to `0` disables the TTL. + Note: This setting can only be configured at startup and cannot be changed at runtime. + Default is 1800 seconds (30 minutes). + experimental: true + +- key: runtime.list_files_cache_max_entries + type: number + default: "10000" + description: | + Maximum number of directory listings to cache. + This setting is only effective when `runtime.list_files_cache` is enabled. + Setting this value to `0` disables the limit. + Note: This setting can only be configured at startup and cannot be changed at runtime. + Default is 10000 entries. + experimental: true + - key: cluster.enable_tls type: boolean default: "false" @@ -447,6 +482,41 @@ at the expense of higher memory usage. experimental: true +- key: parquet.table_files_statistics_cache + type: boolean + default: "true" + description: | + (Reading) Enable cache of files statistics when listing Parquet files. + This setting avoids repeatedly computing statistics across all sessions, + which may be expensive in certain situations (e.g., remote object storage). + The cache is automatically invalidated when the underlying file is modified. + Default is enabled. + +- key: parquet.table_files_statistics_cache_ttl + type: number + default: "1800" + description: | + The time-to-live (TTL) in seconds for cached Parquet files statistics. + Entries expire after this duration from when they were cached, + ensuring eventual consistency with the file system. + This setting is only effective when `parquet.table_files_statistics_cache` is enabled. + Setting this value to `0` disables the TTL. + Note: This setting can only be configured at startup and cannot be changed at runtime. + Default is 1800 seconds (30 minutes). + experimental: true + +- key: parquet.table_files_statistics_cache_max_entries + type: number + default: "10000" + description: | + Maximum number of Parquet files statistics to cache. + This setting is only effective when `parquet.table_files_statistics_cache` is enabled. + When the limit is reached, least recently used entries are evicted. + Setting this value to `0` disables the limit. + Note: This setting can only be configured at startup and cannot be changed at runtime. + Default is 10000 entries. + experimental: true + - key: kubernetes.image type: string default: "sail:latest" diff --git a/crates/sail-spark-connect/Cargo.toml b/crates/sail-spark-connect/Cargo.toml index beb30c3a04..02883d149d 100644 --- a/crates/sail-spark-connect/Cargo.toml +++ b/crates/sail-spark-connect/Cargo.toml @@ -7,6 +7,7 @@ edition = { workspace = true } workspace = true [dependencies] +sail-cache = { path = "../sail-cache" } sail-common = { path = "../sail-common" } sail-common-datafusion = { path = "../sail-common-datafusion" } sail-server = { path = "../sail-server" } @@ -34,6 +35,7 @@ monostate = { workspace = true } regex = { workspace = true } phf = { workspace = true } log = { workspace = true } +futures = { workspace = true } [build-dependencies] prost-build = { workspace = true } diff --git a/crates/sail-spark-connect/src/error.rs b/crates/sail-spark-connect/src/error.rs index 9a209a23fb..84376959bc 100644 --- a/crates/sail-spark-connect/src/error.rs +++ b/crates/sail-spark-connect/src/error.rs @@ -7,6 +7,7 @@ use datafusion::arrow::error::ArrowError; use datafusion::common::DataFusionError; use log::error; use prost::{DecodeError, UnknownEnumValue}; +use sail_cache::error::CacheError; use sail_common::error::CommonError; use sail_common_datafusion::error::CommonErrorCause; use sail_execution::error::ExecutionError; @@ -125,6 +126,17 @@ impl From for SparkError { } } +impl From for SparkError { + fn from(error: CacheError) -> Self { + match error { + CacheError::MissingArgument(message) => SparkError::MissingArgument(message), + CacheError::InvalidArgument(message) => SparkError::InvalidArgument(message), + CacheError::NotSupported(message) => SparkError::NotSupported(message), + CacheError::InternalError(message) => SparkError::InternalError(message), + } + } +} + impl From> for SparkError { fn from(error: PoisonError) -> Self { SparkError::InternalError(error.to_string()) diff --git a/crates/sail-spark-connect/src/executor.rs b/crates/sail-spark-connect/src/executor.rs index 9779dea270..339ee0cf9b 100644 --- a/crates/sail-spark-connect/src/executor.rs +++ b/crates/sail-spark-connect/src/executor.rs @@ -7,10 +7,10 @@ use std::sync::{Arc, Mutex}; use datafusion::arrow::array::RecordBatch; use datafusion::arrow::ipc::writer::StreamWriter; use datafusion::execution::SendableRecordBatchStream; +use futures::stream::{StreamExt, TryStreamExt}; use tokio::sync::{mpsc, oneshot}; use tokio::task::JoinHandle; use tonic::codegen::tokio_stream::wrappers::ReceiverStream; -use tonic::codegen::tokio_stream::StreamExt; use uuid::Uuid; use crate::error::{SparkError, SparkResult}; @@ -279,14 +279,9 @@ impl Executor { } pub(crate) async fn read_stream( - mut stream: SendableRecordBatchStream, + stream: SendableRecordBatchStream, ) -> SparkResult> { - let mut output = vec![]; - while let Some(batch) = stream.next().await { - let batch = batch?; - output.push(batch); - } - Ok(output) + stream.err_into().try_collect::>().await } pub(crate) fn to_arrow_batch(batch: &RecordBatch) -> SparkResult { diff --git a/crates/sail-spark-connect/src/session_manager.rs b/crates/sail-spark-connect/src/session_manager.rs index 7bd8fe7f5f..e53d19acea 100644 --- a/crates/sail-spark-connect/src/session_manager.rs +++ b/crates/sail-spark-connect/src/session_manager.rs @@ -5,10 +5,13 @@ use std::ops::DerefMut; use std::sync::{Arc, Mutex}; use std::time::Duration; +use datafusion::execution::cache::cache_manager::CacheManagerConfig; use datafusion::execution::runtime_env::RuntimeEnvBuilder; use datafusion::execution::SessionStateBuilder; use datafusion::prelude::{SessionConfig, SessionContext}; -use log::info; +use log::{debug, info}; +use sail_cache::list_file_cache::GLOBAL_LIST_FILES_CACHE; +use sail_cache::table_files_statistics_cache::GLOBAL_FILE_STATISTICS_CACHE; use sail_common::config::{AppConfig, ExecutionMode}; use sail_common::runtime::RuntimeHandle; use sail_common_datafusion::extension::SessionExtensionAccessor; @@ -160,10 +163,29 @@ impl SessionManager { let runtime = { let registry = DynamicObjectStoreRegistry::new(options.runtime.clone()); - let builder = - RuntimeEnvBuilder::default().with_object_store_registry(Arc::new(registry)); + + let cache_config = CacheManagerConfig::default(); + let cache_config = if options.config.parquet.table_files_statistics_cache { + debug!("[table_files_statistics_cache] Using table files statistics cache"); + cache_config.with_files_statistics_cache(Some(GLOBAL_FILE_STATISTICS_CACHE.clone())) + } else { + debug!("[table_files_statistics_cache] Not using table files statistics cache"); + cache_config + }; + let cache_config = if options.config.runtime.list_files_cache { + debug!("[list_files_cache] Using list files cache"); + cache_config.with_list_files_cache(Some(GLOBAL_LIST_FILES_CACHE.clone())) + } else { + debug!("[list_files_cache] Not using list files cache"); + cache_config + }; + + let builder = RuntimeEnvBuilder::default() + .with_object_store_registry(Arc::new(registry)) + .with_cache_manager(cache_config); Arc::new(builder.build()?) }; + let state = SessionStateBuilder::new() .with_config(session_config) .with_runtime_env(runtime) diff --git a/python/pysail/examples/spark/tpch.py b/python/pysail/examples/spark/tpch.py index c7ffc7688d..fa0bbb34d0 100644 --- a/python/pysail/examples/spark/tpch.py +++ b/python/pysail/examples/spark/tpch.py @@ -62,16 +62,21 @@ def _run_query(self, spark: SparkSession, query: int, explain: bool): # noqa: F print(f"The query returned {len(rows)} rows and took {query_time} seconds.") return total_time - def run(self, query: int | None = None, explain: bool = False): # noqa: FBT001, FBT002 + def run(self, query: int | None = None, explain: bool = False, num_runs: int = 1): # noqa: FBT001, FBT002 with self.spark_session() as spark: if query is not None: self._run_query(spark, query, explain) else: - total_time = 0 - for query in range(1, 23): - total_time += self._run_query(spark, query, explain) + min_total_time = 0 + for run in range(num_runs): + total_time = 0 + for query in range(1, 23): + total_time += self._run_query(spark, query, explain) + min_total_time = total_time if run == 0 else min(min_total_time, total_time) + if not explain: + print(f"\n\nRun {run+1} Total time for all queries: {total_time} seconds.") if not explain: - print(f"\n\nTotal time for all queries: {total_time} seconds.") + print(f"\n\nMin total time across {num_runs}: {min_total_time} seconds.") def main(): @@ -79,6 +84,7 @@ def main(): parser.add_argument("--url", type=str, default="sc://localhost:50051") parser.add_argument("--data-path", type=str, required=True) parser.add_argument("--query-path", type=str, required=True) + parser.add_argument("--num-runs", type=int, default=1) group = parser.add_mutually_exclusive_group(required=True) group.add_argument("--console", action="store_true") group.add_argument("--query", type=int, choices=range(1, 23)) @@ -101,11 +107,11 @@ def main(): banner="Spark TPC-H Data Explorer\nThe Spark session is available as `spark`.", ) elif args.query: - benchmark.run(args.query) + benchmark.run(args.query, explain=False, num_runs=args.num_runs) elif args.explain: - benchmark.run(args.explain, explain=True) + benchmark.run(args.explain, explain=True, num_runs=args.num_runs) elif args.query_all: - benchmark.run() + benchmark.run(query=None, explain=False, num_runs=args.num_runs) if __name__ == "__main__":