diff --git a/Cargo.lock b/Cargo.lock index 2e79e913..1a858eb9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -197,6 +197,39 @@ version = "0.7.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "96d30a06541fbafbc7f82ed10c06164cfbd2c401138f6addd8404629c4b16711" +[[package]] +name = "arrow" +version = "53.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4caf25cdc4a985f91df42ed9e9308e1adbcd341a31a72605c697033fcef163e3" +dependencies = [ + "arrow-arith", + "arrow-array 53.2.0", + "arrow-buffer 53.2.0", + "arrow-cast 53.2.0", + "arrow-data 53.2.0", + "arrow-ord", + "arrow-row", + "arrow-schema 53.2.0", + "arrow-select 53.2.0", + "arrow-string", +] + +[[package]] +name = "arrow-arith" +version = "53.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "91f2dfd1a7ec0aca967dfaa616096aec49779adc8eccec005e2f5e4111b1192a" +dependencies = [ + "arrow-array 53.2.0", + "arrow-buffer 53.2.0", + "arrow-data 53.2.0", + "arrow-schema 53.2.0", + "chrono", + "half 2.4.1", + "num", +] + [[package]] name = "arrow-array" version = "52.2.0" @@ -204,9 +237,25 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "16f4a9468c882dc66862cef4e1fd8423d47e67972377d85d80e022786427768c" dependencies = [ "ahash 0.8.11", - "arrow-buffer", - "arrow-data", - "arrow-schema", + "arrow-buffer 52.2.0", + "arrow-data 52.2.0", + "arrow-schema 52.2.0", + "chrono", + "half 2.4.1", + "hashbrown 0.14.5", + "num", +] + +[[package]] +name = "arrow-array" +version = "53.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d39387ca628be747394890a6e47f138ceac1aa912eab64f02519fed24b637af8" +dependencies = [ + "ahash 0.8.11", + "arrow-buffer 53.2.0", + "arrow-data 53.2.0", + "arrow-schema 53.2.0", "chrono", "half 2.4.1", "hashbrown 0.14.5", @@ -224,17 +273,28 @@ dependencies = [ "num", ] +[[package]] +name = "arrow-buffer" +version = "53.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9e51e05228852ffe3eb391ce7178a0f97d2cf80cc6ef91d3c4a6b3cb688049ec" +dependencies = [ + "bytes", + "half 2.4.1", + "num", +] + [[package]] name = "arrow-cast" version = "52.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "da26719e76b81d8bc3faad1d4dbdc1bcc10d14704e63dc17fc9f3e7e1e567c8e" dependencies = [ - "arrow-array", - "arrow-buffer", - "arrow-data", - "arrow-schema", - "arrow-select", + "arrow-array 52.2.0", + "arrow-buffer 52.2.0", + "arrow-data 52.2.0", + "arrow-schema 52.2.0", + "arrow-select 52.2.0", "atoi", "base64 0.22.1", "chrono", @@ -244,14 +304,47 @@ dependencies = [ "ryu", ] +[[package]] +name = "arrow-cast" +version = "53.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d09aea56ec9fa267f3f3f6cdab67d8a9974cbba90b3aa38c8fe9d0bb071bd8c1" +dependencies = [ + "arrow-array 53.2.0", + "arrow-buffer 53.2.0", + "arrow-data 53.2.0", + "arrow-schema 53.2.0", + "arrow-select 53.2.0", + "atoi", + "base64 0.22.1", + "chrono", + "comfy-table", + "half 2.4.1", + "lexical-core 1.0.2", + "num", + "ryu", +] + [[package]] name = "arrow-data" version = "52.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dd9d6f18c65ef7a2573ab498c374d8ae364b4a4edf67105357491c031f716ca5" dependencies = [ - "arrow-buffer", - "arrow-schema", + "arrow-buffer 52.2.0", + "arrow-schema 52.2.0", + "half 2.4.1", + "num", +] + +[[package]] +name = "arrow-data" +version = "53.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b98ae0af50890b494cebd7d6b04b35e896205c1d1df7b29a6272c5d0d0249ef5" +dependencies = [ + "arrow-buffer 53.2.0", + "arrow-schema 53.2.0", "half 2.4.1", "num", ] @@ -262,20 +355,58 @@ version = "52.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e786e1cdd952205d9a8afc69397b317cfbb6e0095e445c69cda7e8da5c1eeb0f" dependencies = [ - "arrow-array", - "arrow-buffer", - "arrow-cast", - "arrow-data", - "arrow-schema", + "arrow-array 52.2.0", + "arrow-buffer 52.2.0", + "arrow-cast 52.2.0", + "arrow-data 52.2.0", + "arrow-schema 52.2.0", "flatbuffers", ] +[[package]] +name = "arrow-ord" +version = "53.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2883d7035e0b600fb4c30ce1e50e66e53d8656aa729f2bfa4b51d359cf3ded52" +dependencies = [ + "arrow-array 53.2.0", + "arrow-buffer 53.2.0", + "arrow-data 53.2.0", + "arrow-schema 53.2.0", + "arrow-select 53.2.0", + "half 2.4.1", + "num", +] + +[[package]] +name = "arrow-row" +version = "53.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "552907e8e587a6fde4f8843fd7a27a576a260f65dab6c065741ea79f633fc5be" +dependencies = [ + "ahash 0.8.11", + "arrow-array 53.2.0", + "arrow-buffer 53.2.0", + "arrow-data 53.2.0", + "arrow-schema 53.2.0", + "half 2.4.1", +] + [[package]] name = "arrow-schema" version = "52.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9e972cd1ff4a4ccd22f86d3e53e835c2ed92e0eea6a3e8eadb72b4f1ac802cf8" +[[package]] +name = "arrow-schema" +version = "53.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "539ada65246b949bd99ffa0881a9a15a4a529448af1a07a9838dd78617dafab1" +dependencies = [ + "bitflags 2.5.0", +] + [[package]] name = "arrow-select" version = "52.2.0" @@ -283,13 +414,44 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "600bae05d43483d216fb3494f8c32fdbefd8aa4e1de237e790dbb3d9f44690a3" dependencies = [ "ahash 0.8.11", - "arrow-array", - "arrow-buffer", - "arrow-data", - "arrow-schema", + "arrow-array 52.2.0", + "arrow-buffer 52.2.0", + "arrow-data 52.2.0", + "arrow-schema 52.2.0", + "num", +] + +[[package]] +name = "arrow-select" +version = "53.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6259e566b752da6dceab91766ed8b2e67bf6270eb9ad8a6e07a33c1bede2b125" +dependencies = [ + "ahash 0.8.11", + "arrow-array 53.2.0", + "arrow-buffer 53.2.0", + "arrow-data 53.2.0", + "arrow-schema 53.2.0", "num", ] +[[package]] +name = "arrow-string" +version = "53.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f3179ccbd18ebf04277a095ba7321b93fd1f774f18816bd5f6b3ce2f594edb6c" +dependencies = [ + "arrow-array 53.2.0", + "arrow-buffer 53.2.0", + "arrow-data 53.2.0", + "arrow-schema 53.2.0", + "arrow-select 53.2.0", + "memchr", + "num", + "regex", + "regex-syntax 0.8.3", +] + [[package]] name = "assert-json-diff" version = "2.0.2" @@ -1342,6 +1504,12 @@ dependencies = [ "toml 0.8.13", ] +[[package]] +name = "cast" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "37b2a672a2cb129a2e41c10b1224bb368f9f37a2b16b612598138befd7b37eb5" + [[package]] name = "cc" version = "1.0.98" @@ -1585,6 +1753,17 @@ dependencies = [ "memchr", ] +[[package]] +name = "comfy-table" +version = "7.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b34115915337defe99b2aff5c2ce6771e5fbc4079f4b506301f5cf394c8452f7" +dependencies = [ + "strum 0.26.3", + "strum_macros 0.26.4", + "unicode-width", +] + [[package]] name = "concurrent-queue" version = "2.5.0" @@ -2104,6 +2283,25 @@ dependencies = [ "windows-sys 0.48.0", ] +[[package]] +name = "duckdb" +version = "1.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "86844939330ba6ce345c4b5333d3be45c4f0c092779bf9617bba92efb8b841f5" +dependencies = [ + "arrow", + "cast", + "fallible-iterator 0.3.0", + "fallible-streaming-iterator", + "hashlink", + "libduckdb-sys", + "memchr", + "num-integer", + "rust_decimal", + "smallvec", + "strum 0.25.0", +] + [[package]] name = "dyn-clone" version = "1.0.17" @@ -2416,6 +2614,12 @@ version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2acce4a10f12dc2fb14a218589d4f1f62ef011b2d0cc4b3cb1bba8e94da14649" +[[package]] +name = "fallible-streaming-iterator" +version = "0.1.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7360491ce676a36bf9bb3c56c1aa791658183a54d2744120f27285738d90465a" + [[package]] name = "fastrand" version = "1.9.0" @@ -2451,6 +2655,18 @@ dependencies = [ "subtle", ] +[[package]] +name = "filetime" +version = "0.2.25" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "35c0522e981e68cbfa8c3f978441a5f34b30b96e146b33cd3359176b50fe8586" +dependencies = [ + "cfg-if", + "libc", + "libredox", + "windows-sys 0.59.0", +] + [[package]] name = "finl_unicode" version = "1.2.0" @@ -2844,6 +3060,15 @@ dependencies = [ "foldhash", ] +[[package]] +name = "hashlink" +version = "0.9.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6ba4ff7128dee98c7dc9794b6a411377e1404dba1c97deb8d1a55297bd25d8af" +dependencies = [ + "hashbrown 0.14.5", +] + [[package]] name = "heapless" version = "0.8.0" @@ -3440,11 +3665,24 @@ version = "0.8.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2cde5de06e8d4c2faabc400238f9ae1c74d5412d03a7bd067645ccbc47070e46" dependencies = [ - "lexical-parse-float", - "lexical-parse-integer", - "lexical-util", - "lexical-write-float", - "lexical-write-integer", + "lexical-parse-float 0.8.5", + "lexical-parse-integer 0.8.6", + "lexical-util 0.8.5", + "lexical-write-float 0.8.5", + "lexical-write-integer 0.8.5", +] + +[[package]] +name = "lexical-core" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0431c65b318a590c1de6b8fd6e72798c92291d27762d94c9e6c37ed7a73d8458" +dependencies = [ + "lexical-parse-float 1.0.2", + "lexical-parse-integer 1.0.2", + "lexical-util 1.0.3", + "lexical-write-float 1.0.2", + "lexical-write-integer 1.0.2", ] [[package]] @@ -3453,8 +3691,19 @@ version = "0.8.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "683b3a5ebd0130b8fb52ba0bdc718cc56815b6a097e28ae5a6997d0ad17dc05f" dependencies = [ - "lexical-parse-integer", - "lexical-util", + "lexical-parse-integer 0.8.6", + "lexical-util 0.8.5", + "static_assertions", +] + +[[package]] +name = "lexical-parse-float" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eb17a4bdb9b418051aa59d41d65b1c9be5affab314a872e5ad7f06231fb3b4e0" +dependencies = [ + "lexical-parse-integer 1.0.2", + "lexical-util 1.0.3", "static_assertions", ] @@ -3464,7 +3713,17 @@ version = "0.8.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6d0994485ed0c312f6d965766754ea177d07f9c00c9b82a5ee62ed5b47945ee9" dependencies = [ - "lexical-util", + "lexical-util 0.8.5", + "static_assertions", +] + +[[package]] +name = "lexical-parse-integer" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5df98f4a4ab53bf8b175b363a34c7af608fe31f93cc1fb1bf07130622ca4ef61" +dependencies = [ + "lexical-util 1.0.3", "static_assertions", ] @@ -3477,14 +3736,34 @@ dependencies = [ "static_assertions", ] +[[package]] +name = "lexical-util" +version = "1.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "85314db53332e5c192b6bca611fb10c114a80d1b831ddac0af1e9be1b9232ca0" +dependencies = [ + "static_assertions", +] + [[package]] name = "lexical-write-float" version = "0.8.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "accabaa1c4581f05a3923d1b4cfd124c329352288b7b9da09e766b0668116862" dependencies = [ - "lexical-util", - "lexical-write-integer", + "lexical-util 0.8.5", + "lexical-write-integer 0.8.5", + "static_assertions", +] + +[[package]] +name = "lexical-write-float" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6e7c3ad4e37db81c1cbe7cf34610340adc09c322871972f74877a712abc6c809" +dependencies = [ + "lexical-util 1.0.3", + "lexical-write-integer 1.0.2", "static_assertions", ] @@ -3494,7 +3773,17 @@ version = "0.8.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e1b6f3d1f4422866b68192d62f77bc5c700bee84f3069f2469d7bc8c77852446" dependencies = [ - "lexical-util", + "lexical-util 0.8.5", + "static_assertions", +] + +[[package]] +name = "lexical-write-integer" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eb89e9f6958b83258afa3deed90b5de9ef68eef090ad5086c791cd2345610162" +dependencies = [ + "lexical-util 1.0.3", "static_assertions", ] @@ -3504,6 +3793,22 @@ version = "0.2.155" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "97b3888a4aecf77e811145cadf6eef5901f4782c53886191b2f693f24761847c" +[[package]] +name = "libduckdb-sys" +version = "1.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eac2de5219db852597558df5dcd617ffccd5cbd7b9f5402ccbf899aca6cb6047" +dependencies = [ + "autocfg", + "cc", + "flate2", + "pkg-config", + "serde 1.0.202", + "serde_json", + "tar", + "vcpkg", +] + [[package]] name = "libloading" version = "0.8.3" @@ -3511,7 +3816,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0c2a198fb6b0eada2a8df47933734e6d35d350665a33a3593d7164fa52c75c19" dependencies = [ "cfg-if", - "windows-targets 0.48.5", + "windows-targets 0.52.6", ] [[package]] @@ -3528,6 +3833,7 @@ checksum = "c0ff37bd590ca25063e35af745c343cb7a0271906fb7b37e4813e8f79f00268d" dependencies = [ "bitflags 2.5.0", "libc", + "redox_syscall 0.5.1", ] [[package]] @@ -3724,9 +4030,9 @@ checksum = "7e6bcd6433cff03a4bfc3d9834d504467db1f1cf6d0ea765d37d330249ed629d" [[package]] name = "memchr" -version = "2.7.2" +version = "2.7.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6c8640c5d730cb13ebd907d8d04b52f55ac9a2eec55b440c8892f40d56c76c1d" +checksum = "78ca9ab1a0babb1e7d5695e3530886289c18cf2f87ec19a575a0abdce112e3a3" [[package]] name = "memfd" @@ -4234,13 +4540,13 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e977b9066b4d3b03555c22bdc442f3fadebd96a39111249113087d0edb2691cd" dependencies = [ "ahash 0.8.11", - "arrow-array", - "arrow-buffer", - "arrow-cast", - "arrow-data", + "arrow-array 52.2.0", + "arrow-buffer 52.2.0", + "arrow-cast 52.2.0", + "arrow-data 52.2.0", "arrow-ipc", - "arrow-schema", - "arrow-select", + "arrow-schema 52.2.0", + "arrow-select 52.2.0", "base64 0.22.1", "brotli", "bytes", @@ -5602,6 +5908,12 @@ dependencies = [ "untrusted", ] +[[package]] +name = "rustversion" +version = "1.0.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0e819f2bc632f285be6d7cd36e25940d45b2391dd6d9b939e79de557f7014248" + [[package]] name = "rusty-fork" version = "0.3.0" @@ -6148,6 +6460,47 @@ version = "0.11.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7da8b5736845d9f2fcb837ea5d9e2628564b3b043a70948a3f0b778838c5fb4f" +[[package]] +name = "strum" +version = "0.25.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "290d54ea6f91c969195bdbcd7442c8c2a2ba87da8bf60a7ee86a235d4bc1e125" +dependencies = [ + "strum_macros 0.25.3", +] + +[[package]] +name = "strum" +version = "0.26.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8fec0f0aef304996cf250b31b5a10dee7980c85da9d759361292b8bca5a18f06" + +[[package]] +name = "strum_macros" +version = "0.25.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "23dc1fa9ac9c169a78ba62f0b841814b7abae11bdd047b9c58f893439e309ea0" +dependencies = [ + "heck 0.4.1", + "proc-macro2", + "quote", + "rustversion", + "syn 2.0.65", +] + +[[package]] +name = "strum_macros" +version = "0.26.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4c6bee85a5a24955dc440386795aa378cd9cf82acd5f764469152d2270e581be" +dependencies = [ + "heck 0.5.0", + "proc-macro2", + "quote", + "rustversion", + "syn 2.0.65", +] + [[package]] name = "subtle" version = "2.5.0" @@ -6289,6 +6642,17 @@ version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "55937e1799185b12863d447f42597ed69d9928686b8d88a1df17376a097d8369" +[[package]] +name = "tar" +version = "0.4.42" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4ff6c40d3aedb5e06b57c6f669ad17ab063dd1e63d977c6a88e7f4dfa4f04020" +dependencies = [ + "filetime", + "libc", + "xattr", +] + [[package]] name = "target-lexicon" version = "0.12.16" @@ -7792,7 +8156,7 @@ name = "wrappers" version = "0.4.3" dependencies = [ "anyhow", - "arrow-array", + "arrow-array 52.2.0", "async-compression 0.3.15", "aws-config", "aws-sdk-cognitoidentityprovider", @@ -7802,6 +8166,7 @@ dependencies = [ "clickhouse-rs", "csv", "dirs", + "duckdb", "either", "futures", "gcp-bigquery-client", @@ -7843,6 +8208,17 @@ dependencies = [ "tap", ] +[[package]] +name = "xattr" +version = "1.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8da84f1a25939b27f6820d92aed108f83ff920fdf11a7b19366c27c4cda81d4f" +dependencies = [ + "libc", + "linux-raw-sys 0.4.14", + "rustix 0.38.34", +] + [[package]] name = "xdg-home" version = "1.1.0" diff --git a/wrappers/Cargo.toml b/wrappers/Cargo.toml index 3f68eb6a..0b8a85df 100644 --- a/wrappers/Cargo.toml +++ b/wrappers/Cargo.toml @@ -140,6 +140,10 @@ wasm_fdw = [ "serde_json", "jwt-simple", ] +duckdb_fdw = [ + "duckdb", + "thiserror", +] # Does not include helloworld_fdw because of its general uselessness native_fdws = [ "airtable_fdw", @@ -242,6 +246,9 @@ dirs = { version = "5.0.1", optional = true } sha2 = { version = "0.10.8", optional = true } hex = { version = "0.4.3", optional = true } +# for duckdb_fdw +duckdb = { version = "1.1.1", features = ["bundled"], optional = true } + thiserror = { version = "1.0.48", optional = true } anyhow = { version = "1.0.81", optional = true } diff --git a/wrappers/src/fdw/duckdb_fdw/README.md b/wrappers/src/fdw/duckdb_fdw/README.md new file mode 100644 index 00000000..c641c0cd --- /dev/null +++ b/wrappers/src/fdw/duckdb_fdw/README.md @@ -0,0 +1,14 @@ +# DuckDB Foreign Data Wrapper + +This is a foreign data wrapper for [DuckDB](https://duckdb.org/). It is developed using [Wrappers](https://github.com/supabase/wrappers) and only supports data scan at this moment. + +## Documentation + +[https://fdw.dev/catalog/duckdb/](https://fdw.dev/catalog/duckdb/) + +## Changelog + +| Version | Date | Notes | +| ------- | ---------- | ---------------------------------------------------- | +| 0.1.0 | 2024-10-31 | Initial version | + diff --git a/wrappers/src/fdw/duckdb_fdw/duckdb_fdw.rs b/wrappers/src/fdw/duckdb_fdw/duckdb_fdw.rs new file mode 100644 index 00000000..bb8a08b9 --- /dev/null +++ b/wrappers/src/fdw/duckdb_fdw/duckdb_fdw.rs @@ -0,0 +1,271 @@ +use crate::stats; +use duckdb::{self, types::Type as DuckdbType, Connection}; +use pgrx::{pg_sys, prelude::to_timestamp, PgBuiltInOids, PgOid}; +use std::collections::HashMap; + +use supabase_wrappers::prelude::*; + +use super::{DuckdbFdwError, DuckdbFdwResult}; + +// convert a DuckDB field to a wrappers cell +fn field_to_cell( + src_row: &duckdb::Row<'_>, + col_idx: usize, + tgt_col: &Column, +) -> Result, duckdb::Error> { + match PgOid::from(tgt_col.type_oid) { + PgOid::BuiltIn(PgBuiltInOids::BOOLOID) => src_row + .get::<_, Option>(col_idx) + .map(|v| v.map(Cell::Bool)), + PgOid::BuiltIn(PgBuiltInOids::CHAROID) => src_row + .get::<_, Option>(col_idx) + .map(|v| v.map(Cell::I8)), + PgOid::BuiltIn(PgBuiltInOids::INT2OID) => src_row + .get::<_, Option>(col_idx) + .map(|v| v.map(Cell::I16)), + PgOid::BuiltIn(PgBuiltInOids::FLOAT4OID) => src_row + .get::<_, Option>(col_idx) + .map(|v| v.map(Cell::F32)), + PgOid::BuiltIn(PgBuiltInOids::INT4OID) => src_row + .get::<_, Option>(col_idx) + .map(|v| v.map(Cell::I32)), + PgOid::BuiltIn(PgBuiltInOids::FLOAT8OID) => src_row + .get::<_, Option>(col_idx) + .map(|v| v.map(Cell::F64)), + PgOid::BuiltIn(PgBuiltInOids::INT8OID) => src_row + .get::<_, Option>(col_idx) + .map(|v| v.map(Cell::I64)), + PgOid::BuiltIn(PgBuiltInOids::NUMERICOID) => src_row + .get::<_, Option>(col_idx) + .map(|v| v.map(pgrx::AnyNumeric::from).map(Cell::Numeric)), + PgOid::BuiltIn(PgBuiltInOids::TEXTOID) => src_row + .get::<_, Option>(col_idx) + .map(|v| v.map(Cell::String)), + PgOid::BuiltIn(PgBuiltInOids::DATEOID) => src_row.get::<_, Option>(col_idx).map(|v| { + v.map(|v| { + let ts = to_timestamp((v * 86_400) as f64); + Cell::Date(pgrx::prelude::Date::from(ts)) + }) + }), + PgOid::BuiltIn(PgBuiltInOids::TIMESTAMPOID) => { + src_row.get::<_, Option>(col_idx).map(|v| { + v.map(|v| { + let ts = to_timestamp((v / 1_000_000) as _); + Cell::Timestamp(ts.to_utc()) + }) + }) + } + PgOid::BuiltIn(PgBuiltInOids::TIMESTAMPTZOID) => { + src_row.get::<_, Option>(col_idx).map(|v| { + v.map(|v| { + let ts = to_timestamp((v / 1_000_000) as _); + Cell::Timestamptz(ts) + }) + }) + } + _ => Err(duckdb::Error::InvalidColumnType( + col_idx, + tgt_col.name.clone(), + DuckdbType::Any, + )), + } +} + +#[wrappers_fdw( + version = "0.1.0", + author = "Supabase", + website = "https://github.com/supabase/wrappers/tree/main/wrappers/src/fdw/duckdb_fdw", + error_type = "DuckdbFdwError" +)] +pub(crate) struct DuckdbFdw { + conn: Connection, + table: String, + scan_result: Vec, + iter_idx: usize, +} + +impl DuckdbFdw { + const FDW_NAME: &'static str = "DuckdbFdw"; + + fn deparse( + &self, + quals: &[Qual], + columns: &[Column], + sorts: &[Sort], + limit: &Option, + ) -> DuckdbFdwResult { + let tgts = if columns.is_empty() { + "*".to_string() + } else { + columns + .iter() + .map(|c| c.name.clone()) + .collect::>() + .join(", ") + }; + + let mut sql = format!("select {} from {} as _wrappers_tbl", tgts, &self.table); + + if !quals.is_empty() { + let cond = quals + .iter() + .map(|q| q.deparse()) + .collect::>() + .join(" and "); + + if !cond.is_empty() { + sql.push_str(&format!(" where {}", cond)); + } + } + + // push down sorts + if !sorts.is_empty() { + let order_by = sorts + .iter() + .map(|sort| { + let mut clause = sort.field.to_string(); + if sort.reversed { + clause.push_str(" desc"); + } else { + clause.push_str(" asc"); + } + clause + }) + .collect::>() + .join(", "); + sql.push_str(&format!(" order by {}", order_by)); + } + + // push down limits + // Note: Postgres will take limit and offset locally after reading rows + // from remote, so we calculate the real limit and only use it without + // pushing down offset. + if let Some(limit) = limit { + let real_limit = limit.offset + limit.count; + sql.push_str(&format!(" limit {}", real_limit)); + } + + Ok(sql) + } +} + +impl ForeignDataWrapper for DuckdbFdw { + fn new(server: ForeignServer) -> DuckdbFdwResult { + let conn = Connection::open_in_memory()?; + + // create secret if key is specified in options + if let Some(key_type) = server.options.get("key_type") { + let mut params = vec![format!("type {key_type}")]; + + if let Some(key_id) = server.options.get("key_id") { + params.push(format!("key_id '{key_id}'")); + } else if let Some(vault_key_id) = server.options.get("vault_key_id") { + params.push(format!( + "key_id '{}'", + get_vault_secret(vault_key_id).unwrap_or_default() + )); + } + + if let Some(key_secret) = server.options.get("key_secret") { + params.push(format!("secret '{key_secret}'")); + } else if let Some(vault_key_secret) = server.options.get("vault_key_secret") { + params.push(format!( + "secret '{}'", + get_vault_secret(vault_key_secret).unwrap_or_default() + )); + } + + if let Some(key_region) = server.options.get("key_region") { + params.push(format!("region '{key_region}'")); + } + + let sql = format!("create secret ({});", params.join(",")); + //report_info(&format!("secret sql=={}", sql)); + conn.execute_batch(&sql)?; + } + + stats::inc_stats(Self::FDW_NAME, stats::Metric::CreateTimes, 1); + + Ok(DuckdbFdw { + conn, + table: String::default(), + scan_result: Vec::new(), + iter_idx: 0, + }) + } + + fn begin_scan( + &mut self, + quals: &[Qual], + columns: &[Column], + sorts: &[Sort], + limit: &Option, + options: &HashMap, + ) -> DuckdbFdwResult<()> { + self.table = require_option("table", options)?.to_string(); + + // compile sql query to run on DuckDB + let sql = self.deparse(quals, columns, sorts, limit)?; + //report_info(&format!("sql=={}", sql)); + + let mut stmt = self.conn.prepare(&sql)?; + let tgt_rows: Result, _> = stmt + .query_map([], |src_row| { + let mut tgt_row = Row::new(); + for (col_idx, tgt_col) in columns.iter().enumerate() { + let cell = field_to_cell(src_row, col_idx, tgt_col)?; + tgt_row.push(&tgt_col.name, cell); + } + Ok(tgt_row) + })? + .collect(); + self.scan_result = tgt_rows?; + + stats::inc_stats( + Self::FDW_NAME, + stats::Metric::RowsIn, + self.scan_result.len() as _, + ); + stats::inc_stats( + Self::FDW_NAME, + stats::Metric::RowsOut, + self.scan_result.len() as _, + ); + + Ok(()) + } + + fn iter_scan(&mut self, row: &mut Row) -> DuckdbFdwResult> { + if self.iter_idx >= self.scan_result.len() { + return Ok(None); + } + + row.replace_with(self.scan_result[self.iter_idx].clone()); + self.iter_idx += 1; + + Ok(Some(())) + } + + fn re_scan(&mut self) -> DuckdbFdwResult<()> { + self.iter_idx = 0; + Ok(()) + } + + fn end_scan(&mut self) -> DuckdbFdwResult<()> { + self.scan_result.clear(); + Ok(()) + } + + fn validator( + options: Vec>, + catalog: Option, + ) -> DuckdbFdwResult<()> { + if let Some(oid) = catalog { + if oid == FOREIGN_TABLE_RELATION_ID { + check_options_contain(&options, "table")?; + } + } + + Ok(()) + } +} diff --git a/wrappers/src/fdw/duckdb_fdw/mod.rs b/wrappers/src/fdw/duckdb_fdw/mod.rs new file mode 100644 index 00000000..f09f8a4a --- /dev/null +++ b/wrappers/src/fdw/duckdb_fdw/mod.rs @@ -0,0 +1,28 @@ +#![allow(clippy::module_inception)] +mod duckdb_fdw; + +use pgrx::pg_sys::panic::ErrorReport; +use pgrx::prelude::PgSqlErrorCode; +use thiserror::Error; + +use supabase_wrappers::prelude::{CreateRuntimeError, OptionsError}; + +#[derive(Error, Debug)] +enum DuckdbFdwError { + #[error("{0}")] + Duckdb(#[from] duckdb::Error), + + #[error("{0}")] + Options(#[from] OptionsError), + + #[error("{0}")] + CreateRuntime(#[from] CreateRuntimeError), +} + +impl From for ErrorReport { + fn from(value: DuckdbFdwError) -> Self { + ErrorReport::new(PgSqlErrorCode::ERRCODE_FDW_ERROR, format!("{value}"), "") + } +} + +type DuckdbFdwResult = Result; diff --git a/wrappers/src/fdw/mod.rs b/wrappers/src/fdw/mod.rs index d10de110..7ad6190d 100644 --- a/wrappers/src/fdw/mod.rs +++ b/wrappers/src/fdw/mod.rs @@ -36,3 +36,6 @@ mod cognito_fdw; #[cfg(feature = "wasm_fdw")] mod wasm_fdw; + +#[cfg(feature = "duckdb_fdw")] +mod duckdb_fdw;