diff --git a/.github/workflows/launch_infrastructure.yaml b/.github/workflows/launch_infrastructure.yaml index d2b733bd2..05703b997 100644 --- a/.github/workflows/launch_infrastructure.yaml +++ b/.github/workflows/launch_infrastructure.yaml @@ -49,16 +49,12 @@ jobs: with: role-to-assume: ${{ secrets.AWS_IAM_INFRASTRUCTURE_ROLE_ARN }} aws-region: ${{ secrets.AWS_REGION }} + - name: Set up Docker Buildx + if: steps.changes.outputs.service == 'true' || github.event_name == 'schedule' + uses: docker/setup-buildx-action@v3 - name: Install Flox if: steps.changes.outputs.service == 'true' || github.event_name == 'schedule' uses: flox/install-flox-action@v2 - - name: Conditionally download artifacts - if: (steps.changes.outputs.service == 'true' || github.event_name == 'schedule') && matrix.service == 'equitypricemodel' - uses: flox/activate-action@v1 - env: - AWS_S3_ARTIFACTS_BUCKET_NAME: ${{ secrets.AWS_S3_ARTIFACTS_BUCKET_NAME }} - with: - command: mask models artifacts download equitypricemodel - name: Build ${{ matrix.service }} image if: steps.changes.outputs.service == 'true' || github.event_name == 'schedule' uses: flox/activate-action@v1 diff --git a/.github/workflows/run_code_checks.yaml b/.github/workflows/run_code_checks.yaml index 199abb55d..6e2bf0ae3 100644 --- a/.github/workflows/run_code_checks.yaml +++ b/.github/workflows/run_code_checks.yaml @@ -27,6 +27,8 @@ jobs: with: workspaces: . cache-on-failure: false + shared-key: rust-continuous-integration + save-if: ${{ github.event_name == 'push' && github.ref == 'refs/heads/master' }} - name: Run Rust code checks uses: flox/activate-action@v1 with: @@ -88,5 +90,4 @@ jobs: uses: coverallsapp/github-action@v2 with: github-token: ${{ secrets.GITHUB_TOKEN }} - # Rust is being excluded from coverage in favor of DataFrame correctness checks - files: .coverage_output/python.xml + files: .coverage_output/python.xml .coverage_output/rust.xml diff --git a/.gitignore b/.gitignore index e03961849..698d96fd3 100644 --- a/.gitignore +++ b/.gitignore @@ -10,7 +10,7 @@ __pycache__/ .envrc .env.nu .coverage_output/ -.coverage +.coverage* *.csv *.egg-info/ wandb/ @@ -22,3 +22,4 @@ data/ **/*.json .claude/tasks/ .scratchpad/ + diff --git a/CLAUDE.md b/CLAUDE.md index 078cf0322..89ce0deb2 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -31,7 +31,7 @@ This is a collection of guidelines and references. - Use Polars for [Python](https://docs.pola.rs/api/python/stable/reference/index.html) and [Rust](https://docs.rs/polars/latest/polars/) dataframes - Use `typing` module `cast` function for `tinygrad` method outputs when necessary with union types -- Write `pytest` functions for Python tests +- Write `pytest` functions for Python tests using plain functions, not class-based test organization - Ensure Rust and Python automated test suites achieve at least 90% line or statement coverage per service or library - Exclude generated code, third-party code, tooling boilerplate, and anything explicitly excluded in this repository from test coverage calculations diff --git a/Cargo.lock b/Cargo.lock index 5ab4f73d0..61f73fcc4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -203,7 +203,7 @@ dependencies = [ "arrow-schema", "arrow-select", "atoi", - "base64", + "base64 0.22.1", "chrono", "comfy-table", "half", @@ -256,7 +256,7 @@ version = "56.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b3aa9e59c611ebc291c28582077ef25c97f1975383f1479b12f3b9ffee2ffabe" dependencies = [ - "bitflags", + "bitflags 2.10.0", ] [[package]] @@ -604,9 +604,9 @@ dependencies = [ [[package]] name = "aws-smithy-async" -version = "1.2.11" +version = "1.2.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "52eec3db979d18cb807fc1070961cc51d87d069abe9ab57917769687368a8c6c" +checksum = "3cba48474f1d6807384d06fec085b909f5807e16653c5af5c45dfe89539f0b70" dependencies = [ "futures-util", "pin-project-lite", @@ -615,9 +615,9 @@ dependencies = [ [[package]] name = "aws-smithy-checksums" -version = "0.64.3" +version = "0.64.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ddcf418858f9f3edd228acb8759d77394fed7531cce78d02bdda499025368439" +checksum = "a764fa7222922f6c0af8eea478b0ef1ba5ce1222af97e01f33ca5e957bd7f3b9" dependencies = [ "aws-smithy-http", "aws-smithy-types", @@ -636,9 +636,9 @@ dependencies = [ [[package]] name = "aws-smithy-eventstream" -version = "0.60.18" +version = "0.60.19" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "35b9c7354a3b13c66f60fe4616d6d1969c9fd36b1b5333a5dfb3ee716b33c588" +checksum = "1c0b3e587fbaa5d7f7e870544508af8ce82ea47cd30376e69e1e37c4ac746f79" dependencies = [ "aws-smithy-types", "bytes", @@ -647,9 +647,9 @@ dependencies = [ [[package]] name = "aws-smithy-http" -version = "0.63.3" +version = "0.63.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "630e67f2a31094ffa51b210ae030855cb8f3b7ee1329bdd8d085aaf61e8b97fc" +checksum = "af4a8a5fe3e4ac7ee871237c340bbce13e982d37543b65700f4419e039f5d78e" dependencies = [ "aws-smithy-eventstream", "aws-smithy-runtime-api", @@ -669,9 +669,9 @@ dependencies = [ [[package]] name = "aws-smithy-http-client" -version = "1.1.9" +version = "1.1.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "12fb0abf49ff0cab20fd31ac1215ed7ce0ea92286ba09e2854b42ba5cabe7525" +checksum = "0709f0083aa19b704132684bc26d3c868e06bd428ccc4373b0b55c3e8748a58b" dependencies = [ "aws-smithy-async", "aws-smithy-runtime-api", @@ -699,27 +699,27 @@ dependencies = [ [[package]] name = "aws-smithy-json" -version = "0.62.3" +version = "0.62.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3cb96aa208d62ee94104645f7b2ecaf77bf27edf161590b6224bfbac2832f979" +checksum = "27b3a779093e18cad88bbae08dc4261e1d95018c4c5b9356a52bcae7c0b6e9bb" dependencies = [ "aws-smithy-types", ] [[package]] name = "aws-smithy-observability" -version = "0.2.4" +version = "0.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c0a46543fbc94621080b3cf553eb4cbbdc41dd9780a30c4756400f0139440a1d" +checksum = "4d3f39d5bb871aaf461d59144557f16d5927a5248a983a40654d9cf3b9ba183b" dependencies = [ "aws-smithy-runtime-api", ] [[package]] name = "aws-smithy-query" -version = "0.60.13" +version = "0.60.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0cebbddb6f3a5bd81553643e9c7daf3cc3dc5b0b5f398ac668630e8a84e6fff0" +checksum = "05f76a580e3d8f8961e5d48763214025a2af65c2fa4cd1fb7f270a0e107a71b0" dependencies = [ "aws-smithy-types", "urlencoding", @@ -727,9 +727,9 @@ dependencies = [ [[package]] name = "aws-smithy-runtime" -version = "1.10.0" +version = "1.10.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f3df87c14f0127a0d77eb261c3bc45d5b4833e2a1f63583ebfb728e4852134ee" +checksum = "8fd3dfc18c1ce097cf81fced7192731e63809829c6cbf933c1ec47452d08e1aa" dependencies = [ "aws-smithy-async", "aws-smithy-http", @@ -752,9 +752,9 @@ dependencies = [ [[package]] name = "aws-smithy-runtime-api" -version = "1.11.3" +version = "1.11.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "49952c52f7eebb72ce2a754d3866cc0f87b97d2a46146b79f80f3a93fb2b3716" +checksum = "8c55e0837e9b8526f49e0b9bfa9ee18ddee70e853f5bc09c5d11ebceddcb0fec" dependencies = [ "aws-smithy-async", "aws-smithy-types", @@ -769,9 +769,9 @@ dependencies = [ [[package]] name = "aws-smithy-types" -version = "1.4.3" +version = "1.4.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3b3a26048eeab0ddeba4b4f9d51654c79af8c3b32357dc5f336cee85ab331c33" +checksum = "576b0d6991c9c32bc14fc340582ef148311f924d41815f641a308b5d11e8e7cd" dependencies = [ "base64-simd", "bytes", @@ -795,9 +795,9 @@ dependencies = [ [[package]] name = "aws-smithy-xml" -version = "0.60.13" +version = "0.60.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "11b2f670422ff42bf7065031e72b45bc52a3508bd089f743ea90731ca2b6ea57" +checksum = "b53543b4b86ed43f051644f704a98c7291b3618b67adf057ee77a366fa52fcaa" dependencies = [ "xmlparser", ] @@ -889,6 +889,12 @@ version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "349a06037c7bf932dd7e7d1f653678b2038b9ad46a74102f1fc7bd7872678cce" +[[package]] +name = "base64" +version = "0.21.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9d297deb1925b89f2ccc13d7635fa0714f12c87adce1c75356b39ca9b7178567" + [[package]] name = "base64" version = "0.22.1" @@ -931,6 +937,12 @@ dependencies = [ "virtue", ] +[[package]] +name = "bitflags" +version = "1.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a" + [[package]] name = "bitflags" version = "2.10.0" @@ -984,6 +996,56 @@ dependencies = [ "objc2", ] +[[package]] +name = "bollard" +version = "0.18.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "97ccca1260af6a459d75994ad5acc1651bcabcbdbc41467cc9786519ab854c30" +dependencies = [ + "base64 0.22.1", + "bollard-stubs", + "bytes", + "futures-core", + "futures-util", + "hex", + "home", + "http 1.4.0", + "http-body-util", + "hyper 1.8.1", + "hyper-named-pipe", + "hyper-rustls 0.27.7", + "hyper-util", + "hyperlocal", + "log", + "pin-project-lite", + "rustls 0.23.36", + "rustls-native-certs", + "rustls-pemfile", + "rustls-pki-types", + "serde", + "serde_derive", + "serde_json", + "serde_repr", + "serde_urlencoded", + "thiserror 2.0.18", + "tokio", + "tokio-util", + "tower-service", + "url", + "winapi", +] + +[[package]] +name = "bollard-stubs" +version = "1.47.1-rc.27.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3f179cfbddb6e77a5472703d4b30436bff32929c0aa8a9008ecf23d1d3cdd0da" +dependencies = [ + "serde", + "serde_repr", + "serde_with", +] + [[package]] name = "borsh" version = "1.6.0" @@ -1371,7 +1433,7 @@ version = "0.27.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f476fe445d41c9e991fd07515a6f463074b782242ccf4a5b7b1d1012e70824df" dependencies = [ - "bitflags", + "bitflags 2.10.0", "crossterm_winapi", "libc", "parking_lot", @@ -1384,7 +1446,7 @@ version = "0.28.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "829d955a0bb380ef178a640b91779e3987da38c9aea133b20614cfed8cdea9c6" dependencies = [ - "bitflags", + "bitflags 2.10.0", "parking_lot", "rustix 0.38.44", ] @@ -1442,8 +1504,18 @@ version = "0.20.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fc7f46116c46ff9ab3eb1597a45688b6715c6e628b5c133e288e709a29bcb4ee" dependencies = [ - "darling_core", - "darling_macro", + "darling_core 0.20.11", + "darling_macro 0.20.11", +] + +[[package]] +name = "darling" +version = "0.21.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9cdf337090841a411e2a7f3deb9187445851f91b309c0c0a29e05f74a00a48c0" +dependencies = [ + "darling_core 0.21.3", + "darling_macro 0.21.3", ] [[package]] @@ -1460,13 +1532,38 @@ dependencies = [ "syn 2.0.114", ] +[[package]] +name = "darling_core" +version = "0.21.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1247195ecd7e3c85f83c8d2a366e4210d588e802133e1e355180a9870b517ea4" +dependencies = [ + "fnv", + "ident_case", + "proc-macro2", + "quote", + "strsim", + "syn 2.0.114", +] + [[package]] name = "darling_macro" version = "0.20.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fc34b93ccb385b40dc71c6fceac4b2ad23662c7eeb248cf10d529b7e055b6ead" dependencies = [ - "darling_core", + "darling_core 0.20.11", + "quote", + "syn 2.0.114", +] + +[[package]] +name = "darling_macro" +version = "0.21.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d38308df82d1080de0afee5d069fa14b0326a88c14f15c5ccda35b4a6c414c81" +dependencies = [ + "darling_core 0.21.3", "quote", "syn 2.0.114", ] @@ -1491,7 +1588,8 @@ dependencies = [ "serde", "serde_json", "serial_test", - "tempfile", + "testcontainers", + "testcontainers-modules", "thiserror 2.0.18", "tokio", "tokio-test", @@ -1531,11 +1629,12 @@ dependencies = [ [[package]] name = "deranged" -version = "0.5.5" +version = "0.5.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ececcb659e7ba858fb4f10388c250a7252eb0a27373f1a72b8748afdd248e587" +checksum = "cc3dc5ad92c2e2d1c193bbbbdf2ea477cb81331de4f3103f267ca18368b988c4" dependencies = [ "powerfmt", + "serde_core", ] [[package]] @@ -1566,7 +1665,7 @@ version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "89a09f22a6c6069a18470eb92d2298acf25463f14256d24778e1230d789a2aec" dependencies = [ - "bitflags", + "bitflags 2.10.0", "objc2", ] @@ -1581,6 +1680,17 @@ dependencies = [ "syn 2.0.114", ] +[[package]] +name = "docker_credential" +version = "1.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1d89dfcba45b4afad7450a99b39e751590463e45c04728cf555d36bb66940de8" +dependencies = [ + "base64 0.21.7", + "serde", + "serde_json", +] + [[package]] name = "duckdb" version = "1.4.4" @@ -1675,6 +1785,17 @@ dependencies = [ "windows-sys 0.61.2", ] +[[package]] +name = "etcetera" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "136d1b5283a1ab77bd9257427ffd09d8667ced0570b6f938942bc7568ed5b943" +dependencies = [ + "cfg-if", + "home", + "windows-sys 0.48.0", +] + [[package]] name = "ethnum" version = "1.5.2" @@ -2023,7 +2144,7 @@ dependencies = [ "futures-sink", "futures-util", "http 0.2.12", - "indexmap", + "indexmap 2.13.0", "slab", "tokio", "tokio-util", @@ -2042,7 +2163,7 @@ dependencies = [ "futures-core", "futures-sink", "http 1.4.0", - "indexmap", + "indexmap 2.13.0", "slab", "tokio", "tokio-util", @@ -2274,6 +2395,21 @@ dependencies = [ "want", ] +[[package]] +name = "hyper-named-pipe" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "73b7d8abf35697b81a825e386fc151e0d503e8cb5fcb93cc8669c376dfd6f278" +dependencies = [ + "hex", + "hyper 1.8.1", + "hyper-util", + "pin-project-lite", + "tokio", + "tower-service", + "winapi", +] + [[package]] name = "hyper-rustls" version = "0.24.2" @@ -2329,7 +2465,7 @@ version = "0.1.20" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "96547c2556ec9d12fb1578c4eaf448b04993e7fb79cbaad930a656880a6bdfa0" dependencies = [ - "base64", + "base64 0.22.1", "bytes", "futures-channel", "futures-util", @@ -2348,6 +2484,21 @@ dependencies = [ "windows-registry", ] +[[package]] +name = "hyperlocal" +version = "0.9.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "986c5ce3b994526b3cd75578e62554abd09f0899d6206de48b3e96ab34ccc8c7" +dependencies = [ + "hex", + "http-body-util", + "hyper 1.8.1", + "hyper-util", + "pin-project-lite", + "tokio", + "tower-service", +] + [[package]] name = "iana-time-zone" version = "0.1.65" @@ -2486,6 +2637,17 @@ dependencies = [ "icu_properties", ] +[[package]] +name = "indexmap" +version = "1.9.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bd070e393353796e801d209ad339e89596eb4c8d430d18ede6a1cced8fafbd99" +dependencies = [ + "autocfg", + "hashbrown 0.12.3", + "serde", +] + [[package]] name = "indexmap" version = "2.13.0" @@ -2641,6 +2803,7 @@ version = "1.4.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d78bacb8933586cee3b550c39b610d314f9b7a48701ac7a914a046165a4ad8da" dependencies = [ + "cc", "flate2", "pkg-config", "reqwest", @@ -2663,7 +2826,7 @@ version = "0.1.12" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3d0b95e02c851351f877147b7deea7b1afb1df71b63aa5f8270716e0c5720616" dependencies = [ - "bitflags", + "bitflags 2.10.0", "libc", "redox_syscall 0.7.0", ] @@ -2850,7 +3013,7 @@ version = "0.30.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "74523f3a35e05aba87a1d978330aef40f67b0304ac79c1c00b294c9830543db6" dependencies = [ - "bitflags", + "bitflags 2.10.0", "cfg-if", "cfg_aliases", "libc", @@ -2969,7 +3132,7 @@ version = "0.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "73ad74d880bb43877038da939b7427bba67e9dd42004a18b809ba7d87cee241c" dependencies = [ - "bitflags", + "bitflags 2.10.0", "objc2", "objc2-foundation", ] @@ -2990,7 +3153,7 @@ version = "0.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2a180dd8642fa45cdb7dd721cd4c11b1cadd4929ce112ebd8b9f5803cc79d536" dependencies = [ - "bitflags", + "bitflags 2.10.0", "dispatch2", "objc2", ] @@ -3001,7 +3164,7 @@ version = "0.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e022c9d066895efa1345f8e33e584b9f958da2fd4cd116792e15e07e4720a807" dependencies = [ - "bitflags", + "bitflags 2.10.0", "dispatch2", "objc2", "objc2-core-foundation", @@ -3034,7 +3197,7 @@ version = "0.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0cde0dfb48d25d2b4862161a4d5fcc0e3c24367869ad306b0c9ec0073bfed92d" dependencies = [ - "bitflags", + "bitflags 2.10.0", "objc2", "objc2-core-foundation", "objc2-core-graphics", @@ -3052,7 +3215,7 @@ version = "0.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e3e0adef53c21f888deb4fa59fc59f7eb17404926ee8a6f59f5df0fd7f9f3272" dependencies = [ - "bitflags", + "bitflags 2.10.0", "block2", "libc", "objc2", @@ -3065,7 +3228,7 @@ version = "0.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "180788110936d59bab6bd83b6060ffdfffb3b922ba1396b312ae795e1de9d81d" dependencies = [ - "bitflags", + "bitflags 2.10.0", "objc2", "objc2-core-foundation", ] @@ -3076,7 +3239,7 @@ version = "0.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "96c1358452b371bf9f104e21ec536d37a650eb10f7ee379fff67d2e08d537f1f" dependencies = [ - "bitflags", + "bitflags 2.10.0", "objc2", "objc2-core-foundation", "objc2-foundation", @@ -3088,7 +3251,7 @@ version = "0.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d87d638e33c06f577498cbcc50491496a3ed4246998a7fbba7ccb98b1e7eab22" dependencies = [ - "bitflags", + "bitflags 2.10.0", "block2", "objc2", "objc2-cloud-kit", @@ -3129,7 +3292,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fbfbfff40aeccab00ec8a910b57ca8ecf4319b335c542f2edcd19dd25a1e2a00" dependencies = [ "async-trait", - "base64", + "base64 0.22.1", "bytes", "chrono", "form_urlencoded", @@ -3169,7 +3332,7 @@ version = "0.10.75" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "08838db121398ad17ab8531ce9de97b244589089e290a384c900cb9ff7434328" dependencies = [ - "bitflags", + "bitflags 2.10.0", "cfg-if", "foreign-types", "libc", @@ -3275,6 +3438,31 @@ dependencies = [ "windows-link", ] +[[package]] +name = "parse-display" +version = "0.9.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "914a1c2265c98e2446911282c6ac86d8524f495792c38c5bd884f80499c7538a" +dependencies = [ + "parse-display-derive", + "regex", + "regex-syntax", +] + +[[package]] +name = "parse-display-derive" +version = "0.9.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2ae7800a4c974efd12df917266338e79a7a74415173caf7e70aa0a0707345281" +dependencies = [ + "proc-macro2", + "quote", + "regex", + "regex-syntax", + "structmeta", + "syn 2.0.114", +] + [[package]] name = "percent-encoding" version = "2.3.2" @@ -3385,7 +3573,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "32b4fed2343961b3eea3db2cee165540c3e1ad9d5782350cc55a9e76cf440148" dependencies = [ "atoi_simd", - "bitflags", + "bitflags 2.10.0", "bytemuck", "chrono", "chrono-tz", @@ -3452,7 +3640,7 @@ version = "0.51.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e77b1f08ef6dbb032bb1d0d3365464be950df9905f6827a95b24c4ca5518901d" dependencies = [ - "bitflags", + "bitflags 2.10.0", "boxcar", "bytemuck", "chrono", @@ -3460,7 +3648,7 @@ dependencies = [ "comfy-table", "either", "hashbrown 0.15.5", - "indexmap", + "indexmap 2.13.0", "itoa", "num-traits", "polars-arrow", @@ -3517,7 +3705,7 @@ version = "0.51.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "343931b818cf136349135ba11dbc18c27683b52c3477b1ba8ca606cf5ab1965c" dependencies = [ - "bitflags", + "bitflags 2.10.0", "hashbrown 0.15.5", "num-traits", "polars-arrow", @@ -3587,7 +3775,7 @@ dependencies = [ "chrono", "fallible-streaming-iterator", "hashbrown 0.15.5", - "indexmap", + "indexmap 2.13.0", "itoa", "num-traits", "polars-arrow", @@ -3605,7 +3793,7 @@ version = "0.51.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0fb6e2c6c2fa4ea0c660df1c06cf56960c81e7c2683877995bae3d4e3d408147" dependencies = [ - "bitflags", + "bitflags 2.10.0", "chrono", "either", "memchr", @@ -3655,14 +3843,14 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "acf6062173fdc9ba05775548beb66e76643a148d9aeadc9984ed712bc4babd76" dependencies = [ "argminmax", - "base64", + "base64 0.22.1", "bytemuck", "chrono", "chrono-tz", "either", "hashbrown 0.15.5", "hex", - "indexmap", + "indexmap 2.13.0", "jsonpath_lib_polars_vendor", "libm", "memchr", @@ -3691,7 +3879,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cc1d769180dec070df0dc4b89299b364bf2cfe32b218ecc4ddd8f1a49ae60669" dependencies = [ "async-stream", - "base64", + "base64 0.22.1", "brotli", "bytemuck", "ethnum", @@ -3728,7 +3916,7 @@ version = "0.51.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1cd3a2e33ae4484fe407ab2d2ba5684f0889d1ccf3ad6b844103c03638e6d0a0" dependencies = [ - "bitflags", + "bitflags 2.10.0", "bytemuck", "bytes", "chrono", @@ -3763,7 +3951,7 @@ version = "0.51.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "18734f17e0e348724df3ae65f3ee744c681117c04b041cac969dfceb05edabc0" dependencies = [ - "bitflags", + "bitflags 2.10.0", "bytemuck", "polars-arrow", "polars-compute", @@ -3778,7 +3966,7 @@ version = "0.51.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8e6c1ab13e04d5167661a9854ed1ea0482b2ed9b8a0f1118dabed7cd994a85e3" dependencies = [ - "indexmap", + "indexmap 2.13.0", "polars-error", "polars-utils", "serde", @@ -3791,7 +3979,7 @@ version = "0.51.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c4e7766da02cc1d464994404d3e88a7a0ccd4933df3627c325480fbd9bbc0a11" dependencies = [ - "bitflags", + "bitflags 2.10.0", "hex", "polars-core", "polars-error", @@ -3815,7 +4003,7 @@ dependencies = [ "async-channel", "async-trait", "atomic-waker", - "bitflags", + "bitflags 2.10.0", "crossbeam-channel", "crossbeam-deque", "crossbeam-queue", @@ -3881,7 +4069,7 @@ dependencies = [ "flate2", "foldhash 0.1.5", "hashbrown 0.15.5", - "indexmap", + "indexmap 2.13.0", "libc", "memmap2", "num-traits", @@ -4176,7 +4364,7 @@ version = "11.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "498cd0dc59d73224351ee52a95fee0f1a617a2eae0e7d9d720cc622c73a54186" dependencies = [ - "bitflags", + "bitflags 2.10.0", ] [[package]] @@ -4219,13 +4407,22 @@ dependencies = [ "syn 2.0.114", ] +[[package]] +name = "redox_syscall" +version = "0.3.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "567664f262709473930a4bf9e51bf2ebf3348f2e748ccc50dea20646858f8f29" +dependencies = [ + "bitflags 1.3.2", +] + [[package]] name = "redox_syscall" version = "0.5.18" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ed2bf2547551a7053d6fdfafda3f938979645c44812fbfcda098faae3f1a362d" dependencies = [ - "bitflags", + "bitflags 2.10.0", ] [[package]] @@ -4234,7 +4431,7 @@ version = "0.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "49f3fe0889e69e2ae9e41f4d6c4c0181701d00e4697b356fb1f74173a5e0ee27" dependencies = [ - "bitflags", + "bitflags 2.10.0", ] [[package]] @@ -4307,7 +4504,7 @@ version = "0.12.28" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "eddd3ca559203180a307f12d114c268abf583f59b03cb906fd0b3ff8646c1147" dependencies = [ - "base64", + "base64 0.22.1", "bytes", "encoding_rs", "futures-channel", @@ -4466,7 +4663,7 @@ version = "0.38.44" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fdb5bc1ae2baa591800df16c9ca78619bf65c0488b41b96ccec5d11220d8c154" dependencies = [ - "bitflags", + "bitflags 2.10.0", "errno", "libc", "linux-raw-sys 0.4.15", @@ -4479,7 +4676,7 @@ version = "1.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "146c9e247ccc180c1f61615433868c99f3de3ae256a30a43b49f67c2d9171f34" dependencies = [ - "bitflags", + "bitflags 2.10.0", "errno", "libc", "linux-raw-sys 0.11.0", @@ -4526,6 +4723,15 @@ dependencies = [ "security-framework 3.5.1", ] +[[package]] +name = "rustls-pemfile" +version = "2.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dce314e5fee3f39953d46bb63bb8a46d40c2f8fb7cc5a3b6cab2bde9721d6e50" +dependencies = [ + "rustls-pki-types", +] + [[package]] name = "rustls-pki-types" version = "1.14.0" @@ -4606,6 +4812,30 @@ dependencies = [ "parking_lot", ] +[[package]] +name = "schemars" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4cd191f9397d57d581cddd31014772520aa448f65ef991055d7f61582c65165f" +dependencies = [ + "dyn-clone", + "ref-cast", + "serde", + "serde_json", +] + +[[package]] +name = "schemars" +version = "1.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a2b42f36aa1cd011945615b92222f6bf73c599a102a300334cd7f8dbeec726cc" +dependencies = [ + "dyn-clone", + "ref-cast", + "serde", + "serde_json", +] + [[package]] name = "scopeguard" version = "1.2.0" @@ -4654,7 +4884,7 @@ version = "2.11.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "897b2245f0b511c87893af39b033e5ca9cce68824c4d7e7630b5a1d339658d02" dependencies = [ - "bitflags", + "bitflags 2.10.0", "core-foundation 0.9.4", "core-foundation-sys", "libc", @@ -4667,7 +4897,7 @@ version = "3.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b3297343eaf830f66ede390ea39da1d462b6b0c1b000f420d0a83f898bbbe6ef" dependencies = [ - "bitflags", + "bitflags 2.10.0", "core-foundation 0.10.1", "core-foundation-sys", "libc", @@ -4850,7 +5080,7 @@ version = "1.0.149" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "83fc039473c5595ace860d8c4fafa220ff474b3fc6bfdb4293327f1a37e94d86" dependencies = [ - "indexmap", + "indexmap 2.13.0", "itoa", "memchr", "serde", @@ -4869,6 +5099,17 @@ dependencies = [ "serde_core", ] +[[package]] +name = "serde_repr" +version = "0.1.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "175ee3e80ae9982737ca543e96133087cbd9a485eecc3bc4de9c1a37b47ea59c" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.114", +] + [[package]] name = "serde_stacker" version = "0.1.14" @@ -4892,6 +5133,37 @@ dependencies = [ "serde", ] +[[package]] +name = "serde_with" +version = "3.16.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4fa237f2807440d238e0364a218270b98f767a00d3dada77b1c53ae88940e2e7" +dependencies = [ + "base64 0.22.1", + "chrono", + "hex", + "indexmap 1.9.3", + "indexmap 2.13.0", + "schemars 0.9.0", + "schemars 1.2.1", + "serde_core", + "serde_json", + "serde_with_macros", + "time", +] + +[[package]] +name = "serde_with_macros" +version = "3.16.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "52a8e3ca0ca629121f70ab50f95249e5a6f925cc0f6ffe8256c45b728875706c" +dependencies = [ + "darling 0.21.3", + "proc-macro2", + "quote", + "syn 2.0.114", +] + [[package]] name = "serial_test" version = "3.3.1" @@ -5160,6 +5432,29 @@ version = "0.11.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7da8b5736845d9f2fcb837ea5d9e2628564b3b043a70948a3f0b778838c5fb4f" +[[package]] +name = "structmeta" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2e1575d8d40908d70f6fd05537266b90ae71b15dbbe7a8b7dffa2b759306d329" +dependencies = [ + "proc-macro2", + "quote", + "structmeta-derive", + "syn 2.0.114", +] + +[[package]] +name = "structmeta-derive" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "152a0b65a590ff6c3da95cabe2353ee04e6167c896b28e3b14478c2636c922fc" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.114", +] + [[package]] name = "strum" version = "0.26.3" @@ -5254,7 +5549,7 @@ version = "0.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a13f3d0daba03132c0aa9767f98351b3488edc2c100cda2d2ec2b04f3d8d3c8b" dependencies = [ - "bitflags", + "bitflags 2.10.0", "core-foundation 0.9.4", "system-configuration-sys", ] @@ -5299,6 +5594,44 @@ dependencies = [ "windows-sys 0.61.2", ] +[[package]] +name = "testcontainers" +version = "0.23.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "59a4f01f39bb10fc2a5ab23eb0d888b1e2bb168c157f61a1b98e6c501c639c74" +dependencies = [ + "async-trait", + "bollard", + "bollard-stubs", + "bytes", + "docker_credential", + "either", + "etcetera", + "futures", + "log", + "memchr", + "parse-display", + "pin-project-lite", + "serde", + "serde_json", + "serde_with", + "thiserror 2.0.18", + "tokio", + "tokio-stream", + "tokio-tar", + "tokio-util", + "url", +] + +[[package]] +name = "testcontainers-modules" +version = "0.11.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4d43ed4e8f58424c3a2c6c56dbea6643c3c23e8666a34df13c54f0a184e6c707" +dependencies = [ + "testcontainers", +] + [[package]] name = "thiserror" version = "1.0.69" @@ -5482,6 +5815,21 @@ dependencies = [ "tokio", ] +[[package]] +name = "tokio-tar" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9d5714c010ca3e5c27114c1cdeb9d14641ace49874aa5626d7149e47aedace75" +dependencies = [ + "filetime", + "futures-core", + "libc", + "redox_syscall 0.3.5", + "tokio", + "tokio-stream", + "xattr", +] + [[package]] name = "tokio-test" version = "0.4.5" @@ -5522,7 +5870,7 @@ version = "0.23.10+spec-1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "84c8b9f757e028cee9fa244aea147aab2a9ec09d5325a9b01e0a49730c2b5269" dependencies = [ - "indexmap", + "indexmap 2.13.0", "toml_datetime", "toml_parser", "winnow", @@ -5530,9 +5878,9 @@ dependencies = [ [[package]] name = "toml_parser" -version = "1.0.6+spec-1.1.0" +version = "1.0.7+spec-1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a3198b4b0a8e11f09dd03e133c0280504d0801269e9afa46362ffde1cbeebf44" +checksum = "247eaa3197818b831697600aadf81514e577e0cba5eab10f7e064e78ae154df1" dependencies = [ "winnow", ] @@ -5559,7 +5907,7 @@ version = "0.6.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d4e6559d53cc268e5031cd8429d05415bc4cb4aefc4aa5d6cc35fbf5b924a1f8" dependencies = [ - "bitflags", + "bitflags 2.10.0", "bytes", "futures-util", "http 1.4.0", @@ -5727,7 +6075,7 @@ version = "2.12.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "02d1a66277ed75f640d608235660df48c8e3c19f3b4edb6a263315626cc3c01d" dependencies = [ - "base64", + "base64 0.22.1", "log", "native-tls", "once_cell", @@ -5796,7 +6144,7 @@ version = "0.20.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b7df16e474ef958526d1205f6dda359fdfab79d9aa6d54bafcb92dcd07673dca" dependencies = [ - "darling", + "darling 0.20.11", "once_cell", "proc-macro-error2", "proc-macro2", @@ -5965,7 +6313,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bb0e353e6a2fbdc176932bbaab493762eb1255a7900fe0fea1a2f96c296cc909" dependencies = [ "anyhow", - "indexmap", + "indexmap 2.13.0", "wasm-encoder", "wasmparser", ] @@ -5989,9 +6337,9 @@ version = "0.244.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "47b807c72e1bac69382b3a6fb3dbe8ea4c0ed87ff5629b8685ae6b9a611028fe" dependencies = [ - "bitflags", + "bitflags 2.10.0", "hashbrown 0.15.5", - "indexmap", + "indexmap 2.13.0", "semver", ] @@ -6134,6 +6482,15 @@ dependencies = [ "windows-link", ] +[[package]] +name = "windows-sys" +version = "0.48.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "677d2418bec65e3338edb076e806bc1ec15693c5d0104683f2efe857f61056a9" +dependencies = [ + "windows-targets 0.48.5", +] + [[package]] name = "windows-sys" version = "0.52.0" @@ -6170,6 +6527,21 @@ dependencies = [ "windows-link", ] +[[package]] +name = "windows-targets" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9a2fa6e2155d7247be68c096456083145c183cbbbc2764150dda45a87197940c" +dependencies = [ + "windows_aarch64_gnullvm 0.48.5", + "windows_aarch64_msvc 0.48.5", + "windows_i686_gnu 0.48.5", + "windows_i686_msvc 0.48.5", + "windows_x86_64_gnu 0.48.5", + "windows_x86_64_gnullvm 0.48.5", + "windows_x86_64_msvc 0.48.5", +] + [[package]] name = "windows-targets" version = "0.52.6" @@ -6203,6 +6575,12 @@ dependencies = [ "windows_x86_64_msvc 0.53.1", ] +[[package]] +name = "windows_aarch64_gnullvm" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2b38e32f0abccf9987a4e3079dfb67dcd799fb61361e53e2882c3cbaf0d905d8" + [[package]] name = "windows_aarch64_gnullvm" version = "0.52.6" @@ -6215,6 +6593,12 @@ version = "0.53.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a9d8416fa8b42f5c947f8482c43e7d89e73a173cead56d044f6a56104a6d1b53" +[[package]] +name = "windows_aarch64_msvc" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dc35310971f3b2dbbf3f0690a219f40e2d9afcf64f9ab7cc1be722937c26b4bc" + [[package]] name = "windows_aarch64_msvc" version = "0.52.6" @@ -6227,6 +6611,12 @@ version = "0.53.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b9d782e804c2f632e395708e99a94275910eb9100b2114651e04744e9b125006" +[[package]] +name = "windows_i686_gnu" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a75915e7def60c94dcef72200b9a8e58e5091744960da64ec734a6c6e9b3743e" + [[package]] name = "windows_i686_gnu" version = "0.52.6" @@ -6251,6 +6641,12 @@ version = "0.53.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fa7359d10048f68ab8b09fa71c3daccfb0e9b559aed648a8f95469c27057180c" +[[package]] +name = "windows_i686_msvc" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8f55c233f70c4b27f66c523580f78f1004e8b5a8b659e05a4eb49d4166cca406" + [[package]] name = "windows_i686_msvc" version = "0.52.6" @@ -6263,6 +6659,12 @@ version = "0.53.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1e7ac75179f18232fe9c285163565a57ef8d3c89254a30685b57d83a38d326c2" +[[package]] +name = "windows_x86_64_gnu" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "53d40abd2583d23e4718fddf1ebec84dbff8381c07cae67ff7768bbf19c6718e" + [[package]] name = "windows_x86_64_gnu" version = "0.52.6" @@ -6275,6 +6677,12 @@ version = "0.53.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9c3842cdd74a865a8066ab39c8a7a473c0778a3f29370b5fd6b4b9aa7df4a499" +[[package]] +name = "windows_x86_64_gnullvm" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0b7b52767868a23d5bab768e390dc5f5c55825b6d30b86c844ff2dc7414044cc" + [[package]] name = "windows_x86_64_gnullvm" version = "0.52.6" @@ -6287,6 +6695,12 @@ version = "0.53.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0ffa179e2d07eee8ad8f57493436566c7cc30ac536a3379fdf008f47f6bb7ae1" +[[package]] +name = "windows_x86_64_msvc" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ed94fce61571a4006852b7389a063ab983c02eb1bb37b47f8272ce92d06d9538" + [[package]] name = "windows_x86_64_msvc" version = "0.52.6" @@ -6336,7 +6750,7 @@ checksum = "b7c566e0f4b284dd6561c786d9cb0142da491f46a9fbed79ea69cdad5db17f21" dependencies = [ "anyhow", "heck", - "indexmap", + "indexmap 2.13.0", "prettyplease", "syn 2.0.114", "wasm-metadata", @@ -6366,8 +6780,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9d66ea20e9553b30172b5e831994e35fbde2d165325bec84fc43dbf6f4eb9cb2" dependencies = [ "anyhow", - "bitflags", - "indexmap", + "bitflags 2.10.0", + "indexmap 2.13.0", "log", "serde", "serde_derive", @@ -6386,7 +6800,7 @@ checksum = "ecc8ac4bc1dc3381b7f59c34f00b67e18f910c2c0f50015669dde7def656a736" dependencies = [ "anyhow", "id-arena", - "indexmap", + "indexmap 2.13.0", "log", "semver", "serde", @@ -6545,7 +6959,7 @@ dependencies = [ "arbitrary", "crc32fast", "flate2", - "indexmap", + "indexmap 2.13.0", "memchr", "zopfli", ] diff --git a/applications/datamanager/Cargo.toml b/applications/datamanager/Cargo.toml index 145cb9fcc..1015a871f 100644 --- a/applications/datamanager/Cargo.toml +++ b/applications/datamanager/Cargo.toml @@ -34,7 +34,7 @@ tracing-subscriber = { version = "0.3.20", features = ["env-filter", "fmt"] } aws-config = "1.5" aws-sdk-s3 = "1.112" aws-credential-types = "1.2.6" -duckdb = { version = "1.4.3", features = ["r2d2", "chrono"] } +duckdb = { version = "1.4.3", features = ["bundled", "r2d2", "chrono"] } validator = { version = "0.20", features = ["derive"] } thiserror = "2.0.3" sentry = { version = "0.35", features = ["tracing", "reqwest", "rustls"] } @@ -47,5 +47,6 @@ hyper = { version = "1.0", features = ["full"] } http-body-util = "0.1" reqwest = { version = "0.12", features = ["json"] } mockito = "1.7" -tempfile = "3.24" serial_test = "3.3" +testcontainers = "0.23" +testcontainers-modules = { version = "0.11", features = ["localstack"] } diff --git a/applications/datamanager/src/data.rs b/applications/datamanager/src/data.rs index 7b59ebcb3..452a7d086 100644 --- a/applications/datamanager/src/data.rs +++ b/applications/datamanager/src/data.rs @@ -33,11 +33,7 @@ pub fn create_equity_bar_dataframe(equity_bars_rows: Vec) -> Result equity_bars_rows.iter().map(|b| b.volume).collect::>(), "volume_weighted_average_price" => equity_bars_rows.iter().map(|b| b.volume_weighted_average_price).collect::>(), "transactions" => equity_bars_rows.iter().map(|b| b.transactions).collect::>(), - ) - .map_err(|e| { - warn!("Failed to create equity bar DataFrame: {}", e); - Error::Other(format!("Failed to create DataFrame: {}", e)) - })?; + )?; debug!("Normalizing ticker column to uppercase"); let equity_bars_dataframe = equity_bars_dataframe @@ -75,11 +71,7 @@ pub fn create_predictions_dataframe(prediction_rows: Vec) -> Result< "quantile_10" => prediction_rows.iter().map(|p| p.quantile_10).collect::>(), "quantile_50" => prediction_rows.iter().map(|p| p.quantile_50).collect::>(), "quantile_90" => prediction_rows.iter().map(|p| p.quantile_90).collect::>(), - ) - .map_err(|e| { - warn!("Failed to create predictions DataFrame: {}", e); - Error::Other(format!("Failed to create DataFrame: {}", e)) - })?; + )?; debug!("Normalizing ticker column to uppercase"); let unfiltered_prediction_dataframe = prediction_dataframe @@ -141,11 +133,7 @@ pub fn create_portfolio_dataframe(portfolio_rows: Vec) -> Result portfolio_rows.iter().map(|p| p.side.as_str()).collect::>(), "dollar_amount" => portfolio_rows.iter().map(|p| p.dollar_amount).collect::>(), "action" => portfolio_rows.iter().map(|p| p.action.as_str()).collect::>(), - ) - .map_err(|e| { - warn!("Failed to create portfolio DataFrame: {}", e); - Error::Other(format!("Failed to create DataFrame: {}", e)) - })?; + )?; debug!("Normalizing ticker, side, and action columns to uppercase"); let portfolio_dataframe = portfolio_dataframe @@ -201,10 +189,7 @@ pub fn create_equity_details_dataframe(csv_content: String) -> Result Result, ) -> impl IntoResponse { info!("Sync date: {}", payload.date); + + let massive_api_key = state.massive.key.clone(); + let date = payload.date.format("%Y-%m-%d").to_string(); let url = format!( "{}/v2/aggs/grouped/locale/us/market/stocks/{}", @@ -134,7 +137,7 @@ pub async fn sync( .http_client .get(&url) .header("accept", "application/json") - .query(&[("adjusted", "true"), ("apiKey", state.massive.key.as_str())]) + .query(&[("adjusted", "true"), ("apiKey", massive_api_key.as_str())]) .send() .await { diff --git a/applications/datamanager/src/equity_details.rs b/applications/datamanager/src/equity_details.rs index 5dc8ceafc..bcf9faec6 100644 --- a/applications/datamanager/src/equity_details.rs +++ b/applications/datamanager/src/equity_details.rs @@ -15,34 +15,36 @@ pub async fn get(AxumState(state): AxumState) -> impl IntoResponse { Ok(dataframe) => { let mut buffer = Vec::new(); let mut writer = CsvWriter::new(&mut buffer); - match writer.finish(&mut dataframe.clone()) { - Ok(_) => { - let csv_content = String::from_utf8(buffer).unwrap_or_else(|_| { - info!("Failed to convert CSV buffer to UTF-8"); - String::new() - }); - - let mut response = csv_content.into_response(); - response.headers_mut().insert( - header::CONTENT_TYPE, - "text/csv; charset=utf-8".parse().unwrap_or_else(|_| { - info!("Failed to set Content-Type header"); - header::HeaderValue::from_static("text/csv") - }), - ); - *response.status_mut() = StatusCode::OK; - response - } + Ok(_) => {} Err(err) => { - info!("Failed to write DataFrame as CSV: {}", err); - ( + info!("Failed to write CSV: {}", err); + return ( StatusCode::INTERNAL_SERVER_ERROR, - format!("Failed to convert DataFrame to CSV: {}", err), + format!("Failed to write CSV: {}", err), ) - .into_response() + .into_response(); } } + + let csv_content = match String::from_utf8(buffer) { + Ok(content) => content, + Err(err) => { + info!("Failed to convert CSV to UTF-8: {}", err); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + format!("Failed to convert CSV to UTF-8: {}", err), + ) + .into_response(); + } + }; + let mut response = csv_content.into_response(); + response.headers_mut().insert( + header::CONTENT_TYPE, + header::HeaderValue::from_static("text/csv; charset=utf-8"), + ); + *response.status_mut() = StatusCode::OK; + response } Err(err) => { info!("Failed to fetch equity details from S3: {}", err); diff --git a/applications/datamanager/src/lib.rs b/applications/datamanager/src/lib.rs index c7e2e1235..e84f0c2e9 100644 --- a/applications/datamanager/src/lib.rs +++ b/applications/datamanager/src/lib.rs @@ -6,5 +6,6 @@ pub mod health; pub mod portfolios; pub mod predictions; pub mod router; +pub mod startup; pub mod state; pub mod storage; diff --git a/applications/datamanager/src/main.rs b/applications/datamanager/src/main.rs index b796daf2c..43b474a38 100644 --- a/applications/datamanager/src/main.rs +++ b/applications/datamanager/src/main.rs @@ -1,47 +1,61 @@ -use datamanager::router::create_app; -use std::env; -use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; +use datamanager::startup::{initialize_sentry, initialize_tracing, run_server}; + +async fn run_with_bind_address(bind_address: &str) -> i32 { + let _sentry_guard = initialize_sentry(); + initialize_tracing().expect("Failed to initialize tracing"); + + handle_server_result(run_server(bind_address).await) +} + +fn handle_server_result(server_result: Result<(), std::io::Error>) -> i32 { + match server_result { + Ok(_) => 0, + Err(error) => { + tracing::error!("Server error: {}", error); + 1 + } + } +} #[tokio::main] async fn main() { - let _sentry_guard = sentry::init(( - env::var("SENTRY_DSN").unwrap_or_default(), - sentry::ClientOptions { - release: sentry::release_name!(), - environment: Some( - env::var("ENVIRONMENT") - .unwrap_or_else(|_| "development".to_string()) - .into(), - ), - traces_sample_rate: 1.0, - ..Default::default() - }, - )); - - tracing_subscriber::registry() - .with( - tracing_subscriber::EnvFilter::try_from_default_env() - .unwrap_or_else(|_| "datamanager=debug,tower_http=debug,axum=debug".into()), - ) - .with(tracing_subscriber::fmt::layer()) - .with( - sentry::integrations::tracing::layer().event_filter(|metadata| { - use sentry::integrations::tracing::EventFilter; - match metadata.level() { - &tracing::Level::ERROR | &tracing::Level::WARN => EventFilter::Event, - _ => EventFilter::Breadcrumb, - } - }), - ) - .init(); - - tracing::info!("Starting datamanager service"); - - let app = create_app().await; - let listener = tokio::net::TcpListener::bind("0.0.0.0:8080").await.unwrap(); - - if let Err(e) = axum::serve(listener, app).await { - tracing::error!("Server error: {}", e); - std::process::exit(1); + let exit_code = run_with_bind_address("0.0.0.0:8080").await; + + if exit_code != 0 { + std::process::exit(exit_code); + } +} + +#[cfg(test)] +mod tests { + use super::{handle_server_result, run_with_bind_address}; + use serial_test::serial; + + #[test] + fn test_handle_server_result_success() { + assert_eq!(handle_server_result(Ok(())), 0); + } + + #[test] + fn test_handle_server_result_error() { + let error = std::io::Error::new(std::io::ErrorKind::AddrNotAvailable, "bind failed"); + assert_eq!(handle_server_result(Err(error)), 1); + } + + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] + #[serial] + async fn test_run_with_bind_address_returns_error_code_for_invalid_bind_address() { + // SAFETY: Environment variable mutation is safe here because: + // 1. Test is marked with #[serial] to prevent concurrent execution + // 2. Env vars are set synchronously before spawning async tasks + unsafe { + std::env::set_var("AWS_S3_DATA_BUCKET_NAME", "test-bucket"); + std::env::set_var("MASSIVE_BASE_URL", "http://test"); + std::env::set_var("MASSIVE_API_KEY", "test-key"); + } + + let exit_code = run_with_bind_address("invalid-address").await; + + assert_eq!(exit_code, 1); } } diff --git a/applications/datamanager/src/startup.rs b/applications/datamanager/src/startup.rs new file mode 100644 index 000000000..83e15b855 --- /dev/null +++ b/applications/datamanager/src/startup.rs @@ -0,0 +1,197 @@ +use crate::router::create_app; +use axum::Router; +use std::env; +use tokio::net::TcpListener; +use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; + +pub fn initialize_sentry() -> sentry::ClientInitGuard { + sentry::init(( + env::var("SENTRY_DSN").unwrap_or_default(), + sentry::ClientOptions { + release: sentry::release_name!(), + environment: Some( + env::var("ENVIRONMENT") + .unwrap_or_else(|_| "development".to_string()) + .into(), + ), + traces_sample_rate: 1.0, + ..Default::default() + }, + )) +} + +pub fn initialize_tracing() -> Result<(), Box> { + tracing_subscriber::registry() + .with( + tracing_subscriber::EnvFilter::try_from_default_env() + .unwrap_or_else(|_| "datamanager=debug,tower_http=debug,axum=debug".into()), + ) + .with(tracing_subscriber::fmt::layer()) + .with( + sentry::integrations::tracing::layer().event_filter(|metadata| { + use sentry::integrations::tracing::EventFilter; + match metadata.level() { + &tracing::Level::ERROR | &tracing::Level::WARN => EventFilter::Event, + _ => EventFilter::Breadcrumb, + } + }), + ) + .try_init()?; + Ok(()) +} + +pub async fn serve_app(listener: TcpListener, app: Router) -> std::io::Result<()> { + axum::serve(listener, app).await +} + +pub async fn run_server(bind_address: &str) -> std::io::Result<()> { + tracing::info!("Starting datamanager service"); + + let app = create_app().await; + let listener = TcpListener::bind(bind_address).await?; + + serve_app(listener, app).await +} + +#[cfg(test)] +mod tests { + use super::{initialize_sentry, initialize_tracing, run_server, serve_app}; + use aws_credential_types::Credentials; + use aws_sdk_s3::config::Region; + use reqwest::StatusCode; + use serial_test::serial; + use std::time::Duration; + + use crate::{ + router::create_app_with_state, + state::{MassiveSecrets, State}, + }; + + struct EnvironmentVariableGuard { + name: String, + original_value: Option, + } + + impl EnvironmentVariableGuard { + fn set(name: &str, value: &str) -> Self { + let original_value = std::env::var(name).ok(); + // SAFETY: Environment variable mutation is safe here because: + // 1. Tests using this guard are marked with #[serial] to prevent concurrent execution + // 2. Env vars are set synchronously before spawning async tasks + // 3. The Drop implementation ensures cleanup when guard goes out of scope + unsafe { + std::env::set_var(name, value); + } + + Self { + name: name.to_string(), + original_value, + } + } + } + + impl Drop for EnvironmentVariableGuard { + fn drop(&mut self) { + match self.original_value.as_ref() { + Some(value) => { + // SAFETY: See set() method - protected by #[serial] annotation + unsafe { + std::env::set_var(&self.name, value); + } + } + None => { + // SAFETY: See set() method - protected by #[serial] annotation + unsafe { + std::env::remove_var(&self.name); + } + } + } + } + } + + async fn create_test_state() -> State { + let credentials = + Credentials::new("test-access-key", "test-secret-key", None, None, "tests"); + + let shared_config = aws_config::defaults(aws_config::BehaviorVersion::latest()) + .region(Region::new("us-east-1")) + .credentials_provider(credentials) + .endpoint_url("http://127.0.0.1:9") + .load() + .await; + + let s3_config = aws_sdk_s3::config::Builder::from(&shared_config) + .force_path_style(true) + .build(); + + let s3_client = aws_sdk_s3::Client::from_conf(s3_config); + + State::new( + reqwest::Client::new(), + MassiveSecrets { + base: "http://127.0.0.1:1".to_string(), + key: "test-api-key".to_string(), + }, + s3_client, + "test-bucket".to_string(), + ) + } + + #[test] + #[serial] + fn test_initialize_observability_functions() { + let _environment_guard = EnvironmentVariableGuard::set("ENVIRONMENT", "test"); + let _sentry_guard = initialize_sentry(); + let _ = initialize_tracing(); + let _ = initialize_tracing(); + } + + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] + #[serial] + async fn test_serve_app_responds_on_health_route() { + let app = create_app_with_state(create_test_state().await); + let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap(); + let address = listener.local_addr().unwrap(); + + let server_task = tokio::spawn(async move { serve_app(listener, app).await }); + + let client = reqwest::Client::new(); + let health_url = format!("http://{}/health", address); + + let mut healthy = false; + for _ in 0..20 { + if let Ok(response) = client.get(&health_url).send().await { + if response.status() == StatusCode::OK { + healthy = true; + break; + } + } + + tokio::time::sleep(Duration::from_millis(50)).await; + } + + server_task.abort(); + let _ = server_task.await; + + assert!(healthy); + } + + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] + #[serial] + async fn test_run_server_returns_error_for_invalid_bind_address() { + let _region_guard = EnvironmentVariableGuard::set("AWS_REGION", "us-east-1"); + let _access_key_guard = + EnvironmentVariableGuard::set("AWS_ACCESS_KEY_ID", "test-access-key"); + let _secret_key_guard = + EnvironmentVariableGuard::set("AWS_SECRET_ACCESS_KEY", "test-secret-key"); + let _metadata_guard = EnvironmentVariableGuard::set("AWS_EC2_METADATA_DISABLED", "true"); + let _bucket_guard = EnvironmentVariableGuard::set("AWS_S3_DATA_BUCKET_NAME", "test-bucket"); + let _massive_base_guard = + EnvironmentVariableGuard::set("MASSIVE_BASE_URL", "http://127.0.0.1:1"); + let _massive_key_guard = EnvironmentVariableGuard::set("MASSIVE_API_KEY", "test-api-key"); + + let result = run_server("invalid-address").await; + + assert!(result.is_err()); + } +} diff --git a/applications/datamanager/src/state.rs b/applications/datamanager/src/state.rs index 953c98134..c422623fc 100644 --- a/applications/datamanager/src/state.rs +++ b/applications/datamanager/src/state.rs @@ -1,6 +1,6 @@ use aws_sdk_s3::Client as S3Client; use reqwest::Client as HTTPClient; -use tracing::{debug, error, info, warn}; +use tracing::{debug, info}; #[derive(Clone)] pub struct MassiveSecrets { @@ -37,51 +37,23 @@ impl State { let s3_client = S3Client::new(&config); - let bucket_name = match std::env::var("AWS_S3_DATA_BUCKET_NAME") { - Ok(name) => { - info!("Using S3 bucket from environment: {}", name); - name - } - Err(_) => { - let default_bucket = "oscm-data".to_string(); - error!( - "AWS_S3_DATA_BUCKET_NAME not set, using default: {}", - default_bucket - ); - default_bucket - } - }; + let bucket_name = + std::env::var("AWS_S3_DATA_BUCKET_NAME").unwrap_or_else(|_| "oscm-data".to_string()); + info!("Using S3 bucket: {}", bucket_name); - let massive_base = match std::env::var("MASSIVE_BASE_URL") { - Ok(url) => { - info!("Using Massive API base URL from environment: {}", url); - url - } - Err(_) => { - let default_url = "https://api.massive.io".to_string(); - error!("MASSIVE_BASE_URL not set, using default: {}", default_url); - default_url - } - }; + let massive_base_url = std::env::var("MASSIVE_BASE_URL") + .unwrap_or_else(|_| "https://api.massive.io".to_string()); + info!("Using Massive API base URL: {}", massive_base_url); - let massive_key = match std::env::var("MASSIVE_API_KEY") { - Ok(key) => { - debug!("MASSIVE_API_KEY loaded (length: {} chars)", key.len()); - key - } - Err(_) => { - warn!("MASSIVE_API_KEY not set - equity bar sync will not work"); - String::new() - } - }; + let massive_api_key = std::env::var("MASSIVE_API_KEY").unwrap_or_else(|_| String::new()); info!("Application state initialized successfully"); Self { http_client, massive: MassiveSecrets { - base: massive_base, - key: massive_key, + base: massive_base_url, + key: massive_api_key, }, s3_client, bucket_name, diff --git a/applications/datamanager/src/storage.rs b/applications/datamanager/src/storage.rs index c44a260dc..31886d76c 100644 --- a/applications/datamanager/src/storage.rs +++ b/applications/datamanager/src/storage.rs @@ -57,19 +57,42 @@ pub fn format_s3_key(timestamp: &DateTime, dataframe_type: &str) -> String } pub fn date_to_int(timestamp: &DateTime) -> Result { - let formatted = timestamp.format("%Y%m%d").to_string(); - formatted.parse::().map_err(|parse_error| { - Error::Other(format!( - "Failed to parse formatted date '{}' to i32: {}", - formatted, parse_error - )) - }) + timestamp + .format("%Y%m%d") + .to_string() + .parse::() + .map_err(|e| Error::Other(format!("Failed to convert date to integer: {}", e))) } pub fn escape_sql_ticker(ticker: &str) -> String { ticker.replace('\'', "''") } +pub fn sanitize_duckdb_config_value(value: &str) -> Result { + if value.is_empty() { + return Err(Error::Other("Configuration value cannot be empty".into())); + } + + // Reject SQL metacharacters + if value.contains('\'') || value.contains('"') || value.contains(';') || value.contains("--") { + let message = format!( + "Invalid configuration value contains SQL metacharacters: {}", + value + ); + error!("{}", message); + return Err(Error::Other(message)); + } + + // Reasonable length limit + if value.len() > 512 { + let message = format!("Configuration value too long: {} characters", value.len()); + error!("{}", message); + return Err(Error::Other(message)); + } + + Ok(value.to_string()) +} + async fn write_dataframe_to_s3( state: &State, dataframe: &DataFrame, @@ -152,22 +175,43 @@ async fn create_duckdb_connection() -> Result { ); let session_token = credentials.session_token().unwrap_or_default(); - let s3_config = format!( - " - SET s3_region='{}'; - SET s3_url_style='path'; - SET s3_access_key_id='{}'; - SET s3_secret_access_key='{}'; - SET s3_session_token='{}'; - ", - region, - credentials.access_key_id(), - credentials.secret_access_key(), - session_token - ); + + let mut s3_configuration_statements = vec![ + format!("SET s3_region='{}';", region), + "SET s3_url_style='path';".to_string(), + format!("SET s3_access_key_id='{}';", credentials.access_key_id()), + format!( + "SET s3_secret_access_key='{}';", + credentials.secret_access_key() + ), + format!("SET s3_session_token='{}';", session_token), + ]; + + if let Ok(duckdb_s3_endpoint) = std::env::var("DUCKDB_S3_ENDPOINT") { + debug!("Configuring DuckDB with custom S3 endpoint"); + let sanitized_endpoint = sanitize_duckdb_config_value(&duckdb_s3_endpoint)?; + s3_configuration_statements.push(format!("SET s3_endpoint='{}';", sanitized_endpoint)); + + let duckdb_s3_use_ssl = std::env::var("DUCKDB_S3_USE_SSL") + .unwrap_or_else(|_| "true".to_string()) + .to_lowercase(); + + if duckdb_s3_use_ssl != "true" && duckdb_s3_use_ssl != "false" { + let message = format!( + "Invalid DUCKDB_S3_USE_SSL: must be 'true' or 'false', got '{}'", + duckdb_s3_use_ssl + ); + error!("{}", message); + return Err(Error::Other(message)); + } + + s3_configuration_statements.push(format!("SET s3_use_ssl={};", duckdb_s3_use_ssl)); + } + + let s3_configuration_sql = s3_configuration_statements.join("\n"); debug!("Configuring DuckDB S3 settings"); - connection.execute_batch(&s3_config)?; + connection.execute_batch(&s3_configuration_sql)?; info!("DuckDB connection established with S3 access"); Ok(connection) @@ -183,7 +227,23 @@ pub async fn query_equity_bars_parquet_from_s3( let (start_timestamp, end_timestamp) = match (start_timestamp, end_timestamp) { (Some(start), Some(end)) => (start, end), - _ => { + (Some(start), None) => { + let end_date = chrono::Utc::now(); + info!( + "No end date specified, defaulting to now: {} to {}", + start, end_date + ); + (start, end_date) + } + (None, Some(end)) => { + let start_date = end - chrono::Duration::days(7); + info!( + "No start date specified, defaulting to 7 days before end: {} to {}", + start_date, end + ); + (start_date, end) + } + (None, None) => { let end_date = chrono::Utc::now(); let start_date = end_date - chrono::Duration::days(7); info!( diff --git a/applications/datamanager/tests/common/mod.rs b/applications/datamanager/tests/common/mod.rs new file mode 100644 index 000000000..341c400b8 --- /dev/null +++ b/applications/datamanager/tests/common/mod.rs @@ -0,0 +1,273 @@ +#![allow(dead_code)] + +use aws_credential_types::Credentials; +use aws_sdk_s3::{config::Region, primitives::ByteStream, Client as S3Client}; +use axum::Router; +use std::{net::SocketAddr, sync::OnceLock, time::Duration}; +use testcontainers::runners::AsyncRunner; +use testcontainers_modules::localstack::LocalStack; +use tokio::{net::TcpListener, sync::oneshot, task::JoinHandle}; + +const TEST_BUCKET: &str = "test-bucket"; +const TEST_ACCESS_KEY: &str = "test"; +const TEST_SECRET_KEY: &str = "test"; +const TEST_REGION: &str = "us-east-1"; + +static LOCALSTACK_ENDPOINT: OnceLock = OnceLock::new(); +static TRACING_INIT: std::sync::Once = std::sync::Once::new(); + +pub struct EnvironmentVariableGuard { + name: String, + original_value: Option, +} + +impl EnvironmentVariableGuard { + pub fn set(name: &str, value: &str) -> Self { + let original_value = std::env::var(name).ok(); + unsafe { + std::env::set_var(name, value); + } + + Self { + name: name.to_string(), + original_value, + } + } + + pub fn remove(name: &str) -> Self { + let original_value = std::env::var(name).ok(); + unsafe { + std::env::remove_var(name); + } + + Self { + name: name.to_string(), + original_value, + } + } +} + +impl Drop for EnvironmentVariableGuard { + fn drop(&mut self) { + match self.original_value.as_ref() { + Some(value) => unsafe { + std::env::set_var(&self.name, value); + }, + None => unsafe { + std::env::remove_var(&self.name); + }, + } + } +} + +pub struct DuckDbEnvironmentGuard { + _guards: Vec, +} + +impl DuckDbEnvironmentGuard { + pub fn new(endpoint_host_port: &str) -> Self { + let guards = vec![ + EnvironmentVariableGuard::set("AWS_REGION", TEST_REGION), + EnvironmentVariableGuard::set("AWS_ACCESS_KEY_ID", TEST_ACCESS_KEY), + EnvironmentVariableGuard::set("AWS_SECRET_ACCESS_KEY", TEST_SECRET_KEY), + EnvironmentVariableGuard::set("AWS_EC2_METADATA_DISABLED", "true"), + EnvironmentVariableGuard::set("DUCKDB_S3_ENDPOINT", endpoint_host_port), + EnvironmentVariableGuard::set("DUCKDB_S3_USE_SSL", "false"), + ]; + Self { _guards: guards } + } +} + +pub fn initialize_test_tracing() { + TRACING_INIT.call_once(|| { + let _ = tracing_subscriber::fmt() + .with_max_level(tracing::Level::TRACE) + .with_test_writer() + .try_init(); + }); +} + +pub async fn get_localstack_endpoint() -> String { + if let Some(endpoint) = LOCALSTACK_ENDPOINT.get() { + return endpoint.clone(); + } + + let container = LocalStack::default() + .start() + .await + .expect("Failed to start LocalStack container — is Docker running?"); + + let host = container.get_host().await.unwrap(); + let port = container.get_host_port_ipv4(4566).await.unwrap(); + let endpoint = format!("http://{}:{}", host, port); + + // INTENTIONAL LEAK: Container is leaked to keep it alive for entire test run. + // + // Rationale: + // - Tests use #[serial] for sequential execution within this process + // - All tests share the same LocalStack container for performance + // - Container cleanup happens automatically when process exits + // - Alternative (proper Drop cleanup) requires complex lifetime management + // across static OnceLock, creating more complexity than benefit + // + // Trade-off: Small memory leak during test execution vs architectural complexity + // Impact: Container memory is reclaimed when test process terminates + Box::leak(Box::new(container)); + + let _ = LOCALSTACK_ENDPOINT.set(endpoint.clone()); + endpoint +} + +pub async fn create_test_s3_client(endpoint_url: &str) -> S3Client { + let credentials = Credentials::new(TEST_ACCESS_KEY, TEST_SECRET_KEY, None, None, "tests"); + + let shared_config = aws_config::defaults(aws_config::BehaviorVersion::latest()) + .region(Region::new(TEST_REGION)) + .credentials_provider(credentials) + .endpoint_url(endpoint_url) + .load() + .await; + + let s3_config = aws_sdk_s3::config::Builder::from(&shared_config) + .force_path_style(true) + .build(); + + S3Client::from_conf(s3_config) +} + +/// Start LocalStack, create the test bucket, clean it, configure DuckDB env vars, +/// and return the endpoint URL and a ready-to-use S3 client. +pub async fn setup_test_bucket() -> (String, S3Client, DuckDbEnvironmentGuard) { + initialize_test_tracing(); + + let endpoint = get_localstack_endpoint().await; + let s3_client = create_test_s3_client(&endpoint).await; + + // Create bucket (ignore AlreadyExists / BucketAlreadyOwnedByYou) + let _ = s3_client.create_bucket().bucket(TEST_BUCKET).send().await; + + clean_bucket(&s3_client).await; + + let host_port = endpoint + .strip_prefix("http://") + .unwrap_or(&endpoint) + .to_string(); + let env_guard = DuckDbEnvironmentGuard::new(&host_port); + + (endpoint, s3_client, env_guard) +} + +pub async fn clean_bucket(s3_client: &S3Client) { + let mut continuation_token: Option = None; + + loop { + let mut request = s3_client.list_objects_v2().bucket(TEST_BUCKET); + if let Some(token) = &continuation_token { + request = request.continuation_token(token); + } + + let output = match request.send().await { + Ok(output) => output, + Err(_) => break, + }; + + let contents = output.contents(); + for object in contents { + if let Some(key) = object.key() { + let _ = s3_client + .delete_object() + .bucket(TEST_BUCKET) + .key(key) + .send() + .await; + } + } + + if output.is_truncated() == Some(true) { + continuation_token = output.next_continuation_token().map(|s| s.to_string()); + } else { + break; + } + } +} + +pub async fn put_test_object(s3_client: &S3Client, key: &str, bytes: Vec) { + s3_client + .put_object() + .bucket(TEST_BUCKET) + .key(key) + .body(ByteStream::from(bytes)) + .send() + .await + .expect("Failed to put test object"); +} + +pub fn test_bucket_name() -> String { + TEST_BUCKET.to_string() +} + +pub struct SpawnedAppServer { + pub base_url: String, + shutdown_sender: Option>, + server_handle: Option>, +} + +impl SpawnedAppServer { + pub async fn start(app: Router) -> Self { + initialize_test_tracing(); + + let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); + let local_address = listener.local_addr().unwrap(); + let base_url = format!("http://{}", local_address); + + let (shutdown_sender, shutdown_receiver) = oneshot::channel::<()>(); + + let server_handle = tokio::spawn(async move { + let server = axum::serve(listener, app); + tokio::select! { + _ = server => {} + _ = shutdown_receiver => {} + } + }); + + wait_for_server_start(local_address).await; + + Self { + base_url, + shutdown_sender: Some(shutdown_sender), + server_handle: Some(server_handle), + } + } + + pub fn url(&self, path: &str) -> String { + if path.starts_with('/') { + format!("{}{}", self.base_url, path) + } else { + format!("{}/{}", self.base_url, path) + } + } +} + +impl Drop for SpawnedAppServer { + fn drop(&mut self) { + if let Some(shutdown_sender) = self.shutdown_sender.take() { + let _ = shutdown_sender.send(()); + } + + if let Some(server_handle) = self.server_handle.take() { + server_handle.abort(); + } + } +} + +async fn wait_for_server_start(address: SocketAddr) { + for _ in 0..50 { + if tokio::net::TcpStream::connect(address).await.is_ok() { + return; + } + + tokio::time::sleep(Duration::from_millis(10)).await; + } + + panic!("Server did not start listening on {}", address); +} diff --git a/applications/datamanager/tests/test_data.rs b/applications/datamanager/tests/test_data.rs index a57bf61f0..5ba2e7339 100644 --- a/applications/datamanager/tests/test_data.rs +++ b/applications/datamanager/tests/test_data.rs @@ -1,3 +1,6 @@ +mod common; + +use common::initialize_test_tracing; use datamanager::data::{ create_equity_bar_dataframe, create_equity_details_dataframe, create_portfolio_dataframe, create_predictions_dataframe, EquityBar, Portfolio, Prediction, @@ -80,6 +83,7 @@ fn sample_portfolio_lowercase() -> Portfolio { #[test] fn test_create_equity_bar_dataframe_valid_data() { + initialize_test_tracing(); let bars = vec![sample_equity_bar()]; let df = create_equity_bar_dataframe(bars).unwrap(); @@ -99,6 +103,7 @@ fn test_create_equity_bar_dataframe_valid_data() { #[test] fn test_create_equity_bar_dataframe_uppercase_normalization() { + initialize_test_tracing(); let bars = vec![sample_equity_bar_lowercase()]; let df = create_equity_bar_dataframe(bars).unwrap(); @@ -110,6 +115,7 @@ fn test_create_equity_bar_dataframe_uppercase_normalization() { #[test] fn test_create_equity_bar_dataframe_mixed_case_tickers() { + initialize_test_tracing(); let bars = vec![sample_equity_bar(), sample_equity_bar_lowercase()]; let df = create_equity_bar_dataframe(bars).unwrap(); @@ -130,6 +136,7 @@ fn test_create_equity_bar_dataframe_mixed_case_tickers() { #[test] fn test_create_equity_bar_dataframe_empty_vec() { + initialize_test_tracing(); let bars: Vec = vec![]; let df = create_equity_bar_dataframe(bars).unwrap(); @@ -140,6 +147,7 @@ fn test_create_equity_bar_dataframe_empty_vec() { #[test] fn test_create_equity_bar_dataframe_with_none_prices() { + initialize_test_tracing(); let bars = vec![EquityBar { ticker: "TEST".to_string(), timestamp: 1234567890, @@ -162,6 +170,7 @@ fn test_create_equity_bar_dataframe_with_none_prices() { #[test] fn test_create_equity_bar_dataframe_multiple_rows() { + initialize_test_tracing(); let bars = vec![ sample_equity_bar(), sample_equity_bar(), @@ -176,6 +185,7 @@ fn test_create_equity_bar_dataframe_multiple_rows() { #[test] fn test_create_predictions_dataframe_valid_data() { + initialize_test_tracing(); let predictions = vec![sample_prediction()]; let df = create_predictions_dataframe(predictions).unwrap(); @@ -191,6 +201,7 @@ fn test_create_predictions_dataframe_valid_data() { #[test] fn test_create_predictions_dataframe_uppercase_normalization() { + initialize_test_tracing(); let predictions = vec![Prediction { ticker: "aapl".to_string(), timestamp: 1234567890, @@ -208,6 +219,7 @@ fn test_create_predictions_dataframe_uppercase_normalization() { #[test] fn test_create_predictions_dataframe_deduplication() { + initialize_test_tracing(); let predictions = vec![ sample_prediction_with_timestamp(1000), sample_prediction_with_timestamp(2000), @@ -224,6 +236,7 @@ fn test_create_predictions_dataframe_deduplication() { #[test] fn test_create_predictions_dataframe_keeps_most_recent_per_ticker() { + initialize_test_tracing(); let predictions = vec![ Prediction { ticker: "AAPL".to_string(), @@ -266,6 +279,7 @@ fn test_create_predictions_dataframe_keeps_most_recent_per_ticker() { #[test] fn test_create_predictions_dataframe_empty_vec() { + initialize_test_tracing(); let predictions: Vec = vec![]; let df = create_predictions_dataframe(predictions).unwrap(); @@ -276,6 +290,7 @@ fn test_create_predictions_dataframe_empty_vec() { #[test] fn test_create_predictions_dataframe_multiple_different_tickers() { + initialize_test_tracing(); let predictions = vec![ Prediction { ticker: "AAPL".to_string(), @@ -307,6 +322,7 @@ fn test_create_predictions_dataframe_multiple_different_tickers() { #[test] fn test_create_portfolio_dataframe_valid_data() { + initialize_test_tracing(); let portfolios = vec![sample_portfolio()]; let df = create_portfolio_dataframe(portfolios).unwrap(); @@ -322,6 +338,7 @@ fn test_create_portfolio_dataframe_valid_data() { #[test] fn test_create_portfolio_dataframe_uppercase_normalization() { + initialize_test_tracing(); let portfolios = vec![sample_portfolio_lowercase()]; let df = create_portfolio_dataframe(portfolios).unwrap(); @@ -338,6 +355,7 @@ fn test_create_portfolio_dataframe_uppercase_normalization() { #[test] fn test_create_portfolio_dataframe_mixed_case() { + initialize_test_tracing(); let portfolios = vec![ Portfolio { ticker: "aapl".to_string(), @@ -392,6 +410,7 @@ fn test_create_portfolio_dataframe_mixed_case() { #[test] fn test_create_portfolio_dataframe_empty_vec() { + initialize_test_tracing(); let portfolios: Vec = vec![]; let df = create_portfolio_dataframe(portfolios).unwrap(); @@ -404,6 +423,7 @@ fn test_create_portfolio_dataframe_empty_vec() { #[test] fn test_create_equity_details_dataframe_valid_csv() { + initialize_test_tracing(); let csv_content = "ticker,sector,industry\nAAPL,Technology,Consumer Electronics\nGOOGL,Technology,Internet Services\n"; let df = create_equity_details_dataframe(csv_content.to_string()).unwrap(); @@ -417,6 +437,7 @@ fn test_create_equity_details_dataframe_valid_csv() { #[test] fn test_create_equity_details_dataframe_uppercase_normalization() { + initialize_test_tracing(); let csv_content = "ticker,sector,industry\naapl,technology,consumer electronics\n"; let df = create_equity_details_dataframe(csv_content.to_string()).unwrap(); @@ -439,6 +460,7 @@ fn test_create_equity_details_dataframe_uppercase_normalization() { #[test] fn test_create_equity_details_dataframe_with_nulls() { + initialize_test_tracing(); let csv_content = "ticker,sector,industry\nAAPL,,\n"; let df = create_equity_details_dataframe(csv_content.to_string()).unwrap(); @@ -460,6 +482,7 @@ fn test_create_equity_details_dataframe_with_nulls() { #[test] fn test_create_equity_details_dataframe_extra_columns() { + initialize_test_tracing(); let csv_content = "ticker,sector,industry,extra_column\nAAPL,Technology,Consumer Electronics,Extra\n"; @@ -472,6 +495,7 @@ fn test_create_equity_details_dataframe_extra_columns() { #[test] fn test_create_equity_details_dataframe_missing_ticker_column() { + initialize_test_tracing(); let csv_content = "sector,industry\nTechnology,Consumer Electronics\n"; let result = create_equity_details_dataframe(csv_content.to_string()); @@ -485,6 +509,7 @@ fn test_create_equity_details_dataframe_missing_ticker_column() { #[test] fn test_create_equity_details_dataframe_missing_sector_column() { + initialize_test_tracing(); let csv_content = "ticker,industry\nAAPL,Consumer Electronics\n"; let result = create_equity_details_dataframe(csv_content.to_string()); @@ -498,6 +523,7 @@ fn test_create_equity_details_dataframe_missing_sector_column() { #[test] fn test_create_equity_details_dataframe_missing_industry_column() { + initialize_test_tracing(); let csv_content = "ticker,sector\nAAPL,Technology\n"; let result = create_equity_details_dataframe(csv_content.to_string()); @@ -511,6 +537,7 @@ fn test_create_equity_details_dataframe_missing_industry_column() { #[test] fn test_create_equity_details_dataframe_empty_csv() { + initialize_test_tracing(); let csv_content = "ticker,sector,industry\n"; let df = create_equity_details_dataframe(csv_content.to_string()).unwrap(); @@ -521,6 +548,7 @@ fn test_create_equity_details_dataframe_empty_csv() { #[test] fn test_create_equity_details_dataframe_malformed_csv() { + initialize_test_tracing(); let csv_content = "ticker,sector,industry\nAAPL,Technology\nGOOGL,Technology,Internet Services,Extra\n"; @@ -537,6 +565,7 @@ fn test_create_equity_details_dataframe_malformed_csv() { #[test] fn test_equity_bar_dataframe_parquet_roundtrip() { + initialize_test_tracing(); use std::io::Cursor; let original_bars = vec![sample_equity_bar()]; @@ -571,6 +600,7 @@ fn test_equity_bar_dataframe_parquet_roundtrip() { #[test] fn test_predictions_dataframe_parquet_roundtrip() { + initialize_test_tracing(); use std::io::Cursor; let original_predictions = vec![sample_prediction()]; @@ -599,6 +629,7 @@ fn test_predictions_dataframe_parquet_roundtrip() { #[test] fn test_portfolio_dataframe_parquet_roundtrip() { + initialize_test_tracing(); use std::io::Cursor; let original_portfolios = vec![sample_portfolio()]; @@ -627,6 +658,7 @@ fn test_portfolio_dataframe_parquet_roundtrip() { #[test] fn test_parquet_empty_dataframe_roundtrip() { + initialize_test_tracing(); use std::io::Cursor; let empty_bars: Vec = vec![]; diff --git a/applications/datamanager/tests/test_errors.rs b/applications/datamanager/tests/test_errors.rs new file mode 100644 index 000000000..8fa7552b4 --- /dev/null +++ b/applications/datamanager/tests/test_errors.rs @@ -0,0 +1,15 @@ +use datamanager::errors::Error; + +#[test] +fn test_error_display_formats_messages() { + let other_error = Error::Other("example message".to_string()); + assert_eq!(other_error.to_string(), "Other error: example message"); + + let connection = duckdb::Connection::open_in_memory().unwrap(); + let duckdb_error = connection + .execute_batch("SELECT * FROM missing_table") + .unwrap_err(); + let wrapped_error = Error::DuckDB(duckdb_error); + + assert!(wrapped_error.to_string().contains("DuckDB error")); +} diff --git a/applications/datamanager/tests/test_handlers.rs b/applications/datamanager/tests/test_handlers.rs new file mode 100644 index 000000000..d2ab22f1d --- /dev/null +++ b/applications/datamanager/tests/test_handlers.rs @@ -0,0 +1,720 @@ +mod common; + +use datamanager::{ + router::create_app_with_state, + state::{MassiveSecrets, State}, +}; +use mockito::{Matcher, Server}; +use polars::prelude::*; +use reqwest::StatusCode; +use serial_test::serial; + +use common::{ + create_test_s3_client, put_test_object, setup_test_bucket, test_bucket_name, + DuckDbEnvironmentGuard, EnvironmentVariableGuard, SpawnedAppServer, +}; + +async fn spawn_app( + endpoint: &str, + massive_base: String, +) -> (SpawnedAppServer, EnvironmentVariableGuard) { + let env_guard = EnvironmentVariableGuard::set("MASSIVE_API_KEY", "test-api-key"); + + let s3_client = create_test_s3_client(endpoint).await; + let state = State::new( + reqwest::Client::new(), + MassiveSecrets { + base: massive_base, + key: std::env::var("MASSIVE_API_KEY").unwrap(), + }, + s3_client, + test_bucket_name(), + ); + let app = create_app_with_state(state); + (SpawnedAppServer::start(app).await, env_guard) +} + +async fn spawn_app_with_unreachable_s3( + massive_base: String, +) -> (SpawnedAppServer, DuckDbEnvironmentGuard) { + // Point DuckDB env vars to the same unreachable endpoint so that + // DuckDB's httpfs also fails (DuckDB reads credentials and endpoint from env). + let env_guard = DuckDbEnvironmentGuard::new("127.0.0.1:9"); + + let unreachable_s3_client = create_test_s3_client("http://127.0.0.1:9").await; + let state = State::new( + reqwest::Client::new(), + MassiveSecrets { + base: massive_base, + key: "test-api-key".to_string(), + }, + unreachable_s3_client, + "test-bucket".to_string(), + ); + let app = create_app_with_state(state); + (SpawnedAppServer::start(app).await, env_guard) +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 4)] +#[serial] +async fn test_predictions_save_and_query_round_trip() { + let (endpoint, _s3, _env_guard) = setup_test_bucket().await; + let (app, _env_guard) = spawn_app(&endpoint, "http://127.0.0.1:1".to_string()).await; + let client = reqwest::Client::new(); + + let save_payload = r#"{ + "data": [{ + "ticker": "AAPL", + "timestamp": 1735689600, + "quantile_10": 190.0, + "quantile_50": 200.0, + "quantile_90": 210.0 + }], + "timestamp": "2025-01-01T00:00:00Z" + }"#; + + let response = client + .post(app.url("/predictions")) + .header(reqwest::header::CONTENT_TYPE, "application/json") + .body(save_payload) + .send() + .await + .unwrap(); + assert_eq!(response.status(), StatusCode::OK); + + let encoded_query = urlencoding::encode("[{\"ticker\":\"AAPL\",\"timestamp\":1735689600.0}]"); + let response = client + .get(app.url(&format!( + "/predictions?tickers_and_timestamps={}", + encoded_query + ))) + .send() + .await + .unwrap(); + assert_eq!(response.status(), StatusCode::OK); + + let body = response.text().await.unwrap(); + assert!(body.contains("AAPL")); + assert!(body.contains("quantile_50")); +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 4)] +#[serial] +async fn test_predictions_save_returns_internal_server_error_when_s3_upload_fails() { + let (app, _env_guard) = spawn_app_with_unreachable_s3("http://127.0.0.1:1".to_string()).await; + + let save_payload = r#"{ + "data": [{ + "ticker": "AAPL", + "timestamp": 1735689600, + "quantile_10": 190.0, + "quantile_50": 200.0, + "quantile_90": 210.0 + }], + "timestamp": "2025-01-01T00:00:00Z" + }"#; + + let response = reqwest::Client::new() + .post(app.url("/predictions")) + .header(reqwest::header::CONTENT_TYPE, "application/json") + .body(save_payload) + .send() + .await + .unwrap(); + assert_eq!(response.status(), StatusCode::INTERNAL_SERVER_ERROR); +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 4)] +#[serial] +async fn test_predictions_query_returns_bad_request_for_invalid_url_encoding() { + let (endpoint, _s3, _env_guard) = setup_test_bucket().await; + let (app, _env_guard) = spawn_app(&endpoint, "http://127.0.0.1:1".to_string()).await; + + let response = reqwest::Client::new() + .get(app.url("/predictions?tickers_and_timestamps=%")) + .send() + .await + .unwrap(); + assert_eq!(response.status(), StatusCode::BAD_REQUEST); +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 4)] +#[serial] +async fn test_predictions_query_returns_bad_request_for_invalid_json() { + let (endpoint, _s3, _env_guard) = setup_test_bucket().await; + let (app, _env_guard) = spawn_app(&endpoint, "http://127.0.0.1:1".to_string()).await; + + let encoded = urlencoding::encode("not-json"); + let response = reqwest::Client::new() + .get(app.url(&format!("/predictions?tickers_and_timestamps={}", encoded))) + .send() + .await + .unwrap(); + assert_eq!(response.status(), StatusCode::BAD_REQUEST); +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 4)] +#[serial] +async fn test_predictions_query_returns_empty_json_array_when_no_rows_match() { + let (endpoint, _s3, _env_guard) = setup_test_bucket().await; + let (app, _env_guard) = spawn_app(&endpoint, "http://127.0.0.1:1".to_string()).await; + let client = reqwest::Client::new(); + + let save_payload = r#"{ + "data": [{ + "ticker": "AAPL", + "timestamp": 1735689600, + "quantile_10": 190.0, + "quantile_50": 200.0, + "quantile_90": 210.0 + }], + "timestamp": "2025-01-01T00:00:00Z" + }"#; + + let response = client + .post(app.url("/predictions")) + .header(reqwest::header::CONTENT_TYPE, "application/json") + .body(save_payload) + .send() + .await + .unwrap(); + assert_eq!(response.status(), StatusCode::OK); + + let encoded = urlencoding::encode("[{\"ticker\":\"MSFT\",\"timestamp\":1735689600.0}]"); + let response = client + .get(app.url(&format!("/predictions?tickers_and_timestamps={}", encoded))) + .send() + .await + .unwrap(); + assert_eq!(response.status(), StatusCode::OK); + assert_eq!(response.text().await.unwrap(), "[]"); +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 4)] +#[serial] +async fn test_predictions_query_returns_internal_server_error_when_storage_query_fails() { + let (app, _env_guard) = spawn_app_with_unreachable_s3("http://127.0.0.1:1".to_string()).await; + + let encoded = urlencoding::encode("[{\"ticker\":\"AAPL\",\"timestamp\":1735689600.0}]"); + let response = reqwest::Client::new() + .get(app.url(&format!("/predictions?tickers_and_timestamps={}", encoded))) + .send() + .await + .unwrap(); + assert_eq!(response.status(), StatusCode::INTERNAL_SERVER_ERROR); +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 4)] +#[serial] +async fn test_portfolios_save_and_get_round_trip() { + let (endpoint, _s3, _env_guard) = setup_test_bucket().await; + let (app, _env_guard) = spawn_app(&endpoint, "http://127.0.0.1:1".to_string()).await; + let client = reqwest::Client::new(); + + let save_payload = r#"{ + "data": [{ + "ticker": "AAPL", + "timestamp": 1735689600.0, + "side": "long", + "dollar_amount": 10000.0, + "action": "buy" + }], + "timestamp": "2025-01-01T00:00:00Z" + }"#; + + let response = client + .post(app.url("/portfolios")) + .header(reqwest::header::CONTENT_TYPE, "application/json") + .body(save_payload) + .send() + .await + .unwrap(); + assert_eq!(response.status(), StatusCode::OK); + + let response = client + .get(app.url("/portfolios?timestamp=2025-01-01T00:00:00Z")) + .send() + .await + .unwrap(); + assert_eq!(response.status(), StatusCode::OK); + + let body = response.text().await.unwrap(); + assert!(body.contains("AAPL")); + assert!(body.contains("BUY")); + + let response = client.get(app.url("/portfolios")).send().await.unwrap(); + assert_eq!(response.status(), StatusCode::OK); +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 4)] +#[serial] +async fn test_portfolios_save_returns_internal_server_error_when_s3_upload_fails() { + let (app, _env_guard) = spawn_app_with_unreachable_s3("http://127.0.0.1:1".to_string()).await; + + let save_payload = r#"{ + "data": [{ + "ticker": "AAPL", + "timestamp": 1735689600.0, + "side": "long", + "dollar_amount": 10000.0, + "action": "buy" + }], + "timestamp": "2025-01-01T00:00:00Z" + }"#; + + let response = reqwest::Client::new() + .post(app.url("/portfolios")) + .header(reqwest::header::CONTENT_TYPE, "application/json") + .body(save_payload) + .send() + .await + .unwrap(); + assert_eq!(response.status(), StatusCode::INTERNAL_SERVER_ERROR); +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 4)] +#[serial] +async fn test_portfolios_get_returns_not_found_for_first_run_without_files() { + let (endpoint, _s3, _env_guard) = setup_test_bucket().await; + let (app, _env_guard) = spawn_app(&endpoint, "http://127.0.0.1:1".to_string()).await; + + let response = reqwest::Client::new() + .get(app.url("/portfolios")) + .send() + .await + .unwrap(); + assert_eq!(response.status(), StatusCode::NOT_FOUND); +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 4)] +#[serial] +async fn test_portfolios_get_returns_not_found_when_portfolio_file_is_empty() { + let (endpoint, _s3, _env_guard) = setup_test_bucket().await; + let (app, _env_guard) = spawn_app(&endpoint, "http://127.0.0.1:1".to_string()).await; + let client = reqwest::Client::new(); + + let empty_save_payload = r#"{ + "data": [], + "timestamp": "2025-01-01T00:00:00Z" + }"#; + + let response = client + .post(app.url("/portfolios")) + .header(reqwest::header::CONTENT_TYPE, "application/json") + .body(empty_save_payload) + .send() + .await + .unwrap(); + assert_eq!(response.status(), StatusCode::OK); + + let response = client + .get(app.url("/portfolios?timestamp=2025-01-01T00:00:00Z")) + .send() + .await + .unwrap(); + assert_eq!(response.status(), StatusCode::NOT_FOUND); +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 4)] +#[serial] +async fn test_equity_details_get_returns_csv_content() { + let (endpoint, s3, _env_guard) = setup_test_bucket().await; + + put_test_object( + &s3, + "equity/details/categories.csv", + b"ticker,sector,industry\nAAPL,Technology,Consumer Electronics\n".to_vec(), + ) + .await; + + let (app, _env_guard) = spawn_app(&endpoint, "http://127.0.0.1:1".to_string()).await; + + let response = reqwest::Client::new() + .get(app.url("/equity-details")) + .send() + .await + .unwrap(); + assert_eq!(response.status(), StatusCode::OK); + + let content_type = response + .headers() + .get(reqwest::header::CONTENT_TYPE) + .unwrap() + .to_str() + .unwrap(); + assert!(content_type.contains("text/csv")); + + let body = response.text().await.unwrap(); + assert!(body.contains("AAPL")); +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 4)] +#[serial] +async fn test_equity_details_get_returns_internal_server_error_when_csv_is_missing() { + let (endpoint, _s3, _env_guard) = setup_test_bucket().await; + let (app, _env_guard) = spawn_app(&endpoint, "http://127.0.0.1:1".to_string()).await; + + let response = reqwest::Client::new() + .get(app.url("/equity-details")) + .send() + .await + .unwrap(); + assert_eq!(response.status(), StatusCode::INTERNAL_SERVER_ERROR); +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 4)] +#[serial] +async fn test_equity_bars_sync_and_query_round_trip() { + let (endpoint, _s3, _env_guard) = setup_test_bucket().await; + + let mut massive_server = Server::new_async().await; + let _mock = massive_server + .mock("GET", "/v2/aggs/grouped/locale/us/market/stocks/2025-01-01") + .match_query(Matcher::AllOf(vec![ + Matcher::UrlEncoded("adjusted".into(), "true".into()), + Matcher::UrlEncoded("apiKey".into(), "test-api-key".into()), + ])) + .with_status(200) + .with_body( + r#"{ + "adjusted": true, + "queryCount": 1, + "request_id": "test", + "resultsCount": 1, + "status": "OK", + "results": [{ + "T": "AAPL", + "c": 105.0, + "h": 110.0, + "l": 99.0, + "n": 1000, + "o": 100.0, + "t": 1735689600, + "v": 2000000.0, + "vw": 104.0 + }] + }"#, + ) + .create_async() + .await; + + let (app, _env_guard) = spawn_app(&endpoint, massive_server.url()).await; + let client = reqwest::Client::new(); + + let response = client + .post(app.url("/equity-bars")) + .header(reqwest::header::CONTENT_TYPE, "application/json") + .body(r#"{"date":"2025-01-01T00:00:00Z"}"#) + .send() + .await + .unwrap(); + assert_eq!(response.status(), StatusCode::OK); + + let response = client + .get(app.url( + "/equity-bars?tickers=AAPL&start_timestamp=2025-01-01T00:00:00Z&end_timestamp=2025-01-01T00:00:00Z", + )) + .send() + .await + .unwrap(); + assert_eq!(response.status(), StatusCode::OK); + + let content_type = response + .headers() + .get(reqwest::header::CONTENT_TYPE) + .unwrap() + .to_str() + .unwrap(); + assert_eq!(content_type, "application/octet-stream"); + + let body = response.bytes().await.unwrap(); + let dataframe = ParquetReader::new(std::io::Cursor::new(body.to_vec())) + .finish() + .unwrap(); + assert_eq!(dataframe.height(), 1); + assert_eq!( + dataframe.column("ticker").unwrap().str().unwrap().get(0), + Some("AAPL") + ); +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 4)] +#[serial] +async fn test_equity_bars_sync_returns_no_content_when_api_has_no_results() { + let (endpoint, _s3, _env_guard) = setup_test_bucket().await; + + let mut massive_server = Server::new_async().await; + let _mock = massive_server + .mock("GET", "/v2/aggs/grouped/locale/us/market/stocks/2025-01-01") + .match_query(Matcher::AllOf(vec![ + Matcher::UrlEncoded("adjusted".into(), "true".into()), + Matcher::UrlEncoded("apiKey".into(), "test-api-key".into()), + ])) + .with_status(200) + .with_body(r#"{"status":"OK","resultsCount":0}"#) + .create_async() + .await; + + let (app, _env_guard) = spawn_app(&endpoint, massive_server.url()).await; + + let response = reqwest::Client::new() + .post(app.url("/equity-bars")) + .header(reqwest::header::CONTENT_TYPE, "application/json") + .body(r#"{"date":"2025-01-01T00:00:00Z"}"#) + .send() + .await + .unwrap(); + assert_eq!(response.status(), StatusCode::NO_CONTENT); +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 4)] +#[serial] +async fn test_equity_bars_sync_returns_internal_server_error_for_invalid_json() { + let (endpoint, _s3, _env_guard) = setup_test_bucket().await; + + let mut massive_server = Server::new_async().await; + let _mock = massive_server + .mock("GET", "/v2/aggs/grouped/locale/us/market/stocks/2025-01-01") + .match_query(Matcher::AllOf(vec![ + Matcher::UrlEncoded("adjusted".into(), "true".into()), + Matcher::UrlEncoded("apiKey".into(), "test-api-key".into()), + ])) + .with_status(200) + .with_body("not-json") + .create_async() + .await; + + let (app, _env_guard) = spawn_app(&endpoint, massive_server.url()).await; + + let response = reqwest::Client::new() + .post(app.url("/equity-bars")) + .header(reqwest::header::CONTENT_TYPE, "application/json") + .body(r#"{"date":"2025-01-01T00:00:00Z"}"#) + .send() + .await + .unwrap(); + assert_eq!(response.status(), StatusCode::INTERNAL_SERVER_ERROR); +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 4)] +#[serial] +async fn test_equity_bars_sync_returns_bad_gateway_for_unparseable_results() { + let (endpoint, _s3, _env_guard) = setup_test_bucket().await; + + let mut massive_server = Server::new_async().await; + let _mock = massive_server + .mock("GET", "/v2/aggs/grouped/locale/us/market/stocks/2025-01-01") + .match_query(Matcher::AllOf(vec![ + Matcher::UrlEncoded("adjusted".into(), "true".into()), + Matcher::UrlEncoded("apiKey".into(), "test-api-key".into()), + ])) + .with_status(200) + .with_body( + r#"{ + "status": "OK", + "results": [{"T":"AAPL"}] + }"#, + ) + .create_async() + .await; + + let (app, _env_guard) = spawn_app(&endpoint, massive_server.url()).await; + + let response = reqwest::Client::new() + .post(app.url("/equity-bars")) + .header(reqwest::header::CONTENT_TYPE, "application/json") + .body(r#"{"date":"2025-01-01T00:00:00Z"}"#) + .send() + .await + .unwrap(); + assert_eq!(response.status(), StatusCode::BAD_GATEWAY); +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 4)] +#[serial] +async fn test_equity_bars_sync_returns_internal_server_error_when_api_request_fails() { + let (endpoint, _s3, _env_guard) = setup_test_bucket().await; + let (app, _env_guard) = spawn_app(&endpoint, "http://127.0.0.1:1".to_string()).await; + + let response = reqwest::Client::new() + .post(app.url("/equity-bars")) + .header(reqwest::header::CONTENT_TYPE, "application/json") + .body(r#"{"date":"2025-01-01T00:00:00Z"}"#) + .send() + .await + .unwrap(); + assert_eq!(response.status(), StatusCode::INTERNAL_SERVER_ERROR); +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 4)] +#[serial] +async fn test_equity_bars_query_returns_internal_server_error_for_invalid_ticker() { + let (endpoint, _s3, _env_guard) = setup_test_bucket().await; + let (app, _env_guard) = spawn_app(&endpoint, "http://127.0.0.1:1".to_string()).await; + + let response = reqwest::Client::new() + .get(app.url( + "/equity-bars?tickers=AAPL;DROP&start_timestamp=2025-01-01T00:00:00Z&end_timestamp=2025-01-01T00:00:00Z", + )) + .send() + .await + .unwrap(); + assert_eq!(response.status(), StatusCode::INTERNAL_SERVER_ERROR); +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 4)] +#[serial] +async fn test_equity_bars_query_without_ticker_filter_returns_data() { + let (endpoint, _s3, _env_guard) = setup_test_bucket().await; + + let mut massive_server = Server::new_async().await; + let _mock = massive_server + .mock("GET", "/v2/aggs/grouped/locale/us/market/stocks/2025-01-01") + .match_query(Matcher::AllOf(vec![ + Matcher::UrlEncoded("adjusted".into(), "true".into()), + Matcher::UrlEncoded("apiKey".into(), "test-api-key".into()), + ])) + .with_status(200) + .with_body(r#"{"adjusted":true,"queryCount":1,"request_id":"t","resultsCount":1,"status":"OK","results":[{"T":"AAPL","c":105.0,"h":110.0,"l":99.0,"n":1000,"o":100.0,"t":1735689600,"v":2000000.0,"vw":104.0}]}"#) + .create_async() + .await; + + let (app, _env_guard) = spawn_app(&endpoint, massive_server.url()).await; + let client = reqwest::Client::new(); + + let response = client + .post(app.url("/equity-bars")) + .header(reqwest::header::CONTENT_TYPE, "application/json") + .body(r#"{"date":"2025-01-01T00:00:00Z"}"#) + .send() + .await + .unwrap(); + assert_eq!(response.status(), StatusCode::OK); + + // Query without tickers — covers "No tickers specified" branch + let response = client + .get(app.url( + "/equity-bars?start_timestamp=2025-01-01T00:00:00Z&end_timestamp=2025-01-01T00:00:00Z", + )) + .send() + .await + .unwrap(); + assert_eq!(response.status(), StatusCode::OK); + + let body = response.bytes().await.unwrap(); + let dataframe = ParquetReader::new(std::io::Cursor::new(body.to_vec())) + .finish() + .unwrap(); + assert_eq!(dataframe.height(), 1); +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 4)] +#[serial] +async fn test_equity_bars_query_with_empty_tickers_param_returns_data() { + let (endpoint, _s3, _env_guard) = setup_test_bucket().await; + + let mut massive_server = Server::new_async().await; + let _mock = massive_server + .mock("GET", "/v2/aggs/grouped/locale/us/market/stocks/2025-01-01") + .match_query(Matcher::AllOf(vec![ + Matcher::UrlEncoded("adjusted".into(), "true".into()), + Matcher::UrlEncoded("apiKey".into(), "test-api-key".into()), + ])) + .with_status(200) + .with_body(r#"{"adjusted":true,"queryCount":1,"request_id":"t","resultsCount":1,"status":"OK","results":[{"T":"AAPL","c":105.0,"h":110.0,"l":99.0,"n":1000,"o":100.0,"t":1735689600,"v":2000000.0,"vw":104.0}]}"#) + .create_async() + .await; + + let (app, _env_guard) = spawn_app(&endpoint, massive_server.url()).await; + let client = reqwest::Client::new(); + + let response = client + .post(app.url("/equity-bars")) + .header(reqwest::header::CONTENT_TYPE, "application/json") + .body(r#"{"date":"2025-01-01T00:00:00Z"}"#) + .send() + .await + .unwrap(); + assert_eq!(response.status(), StatusCode::OK); + + // Query with empty tickers — covers "Ticker list was empty" branch + let response = client + .get(app.url( + "/equity-bars?tickers=&start_timestamp=2025-01-01T00:00:00Z&end_timestamp=2025-01-01T00:00:00Z", + )) + .send() + .await + .unwrap(); + assert_eq!(response.status(), StatusCode::OK); +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 4)] +#[serial] +async fn test_equity_bars_sync_returns_internal_server_error_for_api_error_status() { + let (endpoint, _s3, _env_guard) = setup_test_bucket().await; + + let mut massive_server = Server::new_async().await; + let _mock = massive_server + .mock("GET", "/v2/aggs/grouped/locale/us/market/stocks/2025-01-01") + .match_query(Matcher::AllOf(vec![ + Matcher::UrlEncoded("adjusted".into(), "true".into()), + Matcher::UrlEncoded("apiKey".into(), "test-api-key".into()), + ])) + .with_status(500) + .with_body("Internal Server Error") + .create_async() + .await; + + let (app, _env_guard) = spawn_app(&endpoint, massive_server.url()).await; + + let response = reqwest::Client::new() + .post(app.url("/equity-bars")) + .header(reqwest::header::CONTENT_TYPE, "application/json") + .body(r#"{"date":"2025-01-01T00:00:00Z"}"#) + .send() + .await + .unwrap(); + assert_eq!(response.status(), StatusCode::INTERNAL_SERVER_ERROR); +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 4)] +#[serial] +async fn test_equity_bars_sync_returns_bad_gateway_when_s3_upload_fails() { + let mut massive_server = Server::new_async().await; + let _mock = massive_server + .mock("GET", "/v2/aggs/grouped/locale/us/market/stocks/2025-01-01") + .match_query(Matcher::AllOf(vec![ + Matcher::UrlEncoded("adjusted".into(), "true".into()), + Matcher::UrlEncoded("apiKey".into(), "test-api-key".into()), + ])) + .with_status(200) + .with_body(r#"{"adjusted":true,"queryCount":1,"request_id":"t","resultsCount":1,"status":"OK","results":[{"T":"AAPL","c":105.0,"h":110.0,"l":99.0,"n":1000,"o":100.0,"t":1735689600,"v":2000000.0,"vw":104.0}]}"#) + .create_async() + .await; + + // Working Massive API but broken S3 → parse succeeds, upload fails + let (app, _env_guard) = spawn_app_with_unreachable_s3(massive_server.url()).await; + + let response = reqwest::Client::new() + .post(app.url("/equity-bars")) + .header(reqwest::header::CONTENT_TYPE, "application/json") + .body(r#"{"date":"2025-01-01T00:00:00Z"}"#) + .send() + .await + .unwrap(); + assert_eq!(response.status(), StatusCode::BAD_GATEWAY); +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 4)] +#[serial] +async fn test_portfolios_get_returns_internal_server_error_when_storage_query_fails() { + // DuckDB connects to unreachable S3 → connection error (not "not found") + let (app, _env_guard) = spawn_app_with_unreachable_s3("http://127.0.0.1:1".to_string()).await; + + let response = reqwest::Client::new() + .get(app.url("/portfolios")) + .send() + .await + .unwrap(); + assert_eq!(response.status(), StatusCode::INTERNAL_SERVER_ERROR); +} diff --git a/applications/datamanager/tests/test_state_and_health.rs b/applications/datamanager/tests/test_state_and_health.rs new file mode 100644 index 000000000..8f51ae2aa --- /dev/null +++ b/applications/datamanager/tests/test_state_and_health.rs @@ -0,0 +1,108 @@ +mod common; + +use datamanager::{ + router::create_app_with_state, + state::{MassiveSecrets, State}, +}; +use reqwest::StatusCode; +use serial_test::serial; + +use common::{ + create_test_s3_client, setup_test_bucket, test_bucket_name, EnvironmentVariableGuard, + SpawnedAppServer, +}; + +async fn create_state_for_endpoint(endpoint: &str, bucket_name: &str) -> State { + let s3_client = create_test_s3_client(endpoint).await; + + State::new( + reqwest::Client::new(), + MassiveSecrets { + base: "http://127.0.0.1:1".to_string(), + key: "test-key".to_string(), + }, + s3_client, + bucket_name.to_string(), + ) +} + +async fn spawn_server_for_state(state: State) -> SpawnedAppServer { + let app = create_app_with_state(state); + SpawnedAppServer::start(app).await +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 4)] +#[serial] +async fn test_health_route_returns_ok() { + let (endpoint, _s3, _env_guard) = setup_test_bucket().await; + let state = create_state_for_endpoint(&endpoint, &test_bucket_name()).await; + let app_server = spawn_server_for_state(state).await; + + let response = reqwest::Client::new() + .get(app_server.url("/health")) + .send() + .await + .unwrap(); + + assert_eq!(response.status(), StatusCode::OK); +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 4)] +#[serial] +async fn test_router_returns_not_found_for_unknown_route() { + let (endpoint, _s3, _env_guard) = setup_test_bucket().await; + let state = create_state_for_endpoint(&endpoint, &test_bucket_name()).await; + let app_server = spawn_server_for_state(state).await; + + let response = reqwest::Client::new() + .get(app_server.url("/missing")) + .send() + .await + .unwrap(); + + assert_eq!(response.status(), StatusCode::NOT_FOUND); +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 4)] +#[serial] +async fn test_state_new_sets_all_fields() { + let (endpoint, _s3, _env_guard) = setup_test_bucket().await; + let state = create_state_for_endpoint(&endpoint, "custom-bucket").await; + + assert_eq!(state.massive.base, "http://127.0.0.1:1"); + assert_eq!(state.massive.key, "test-key"); + assert_eq!(state.bucket_name, "custom-bucket"); +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 4)] +#[serial] +async fn test_state_from_env_uses_defaults_when_variables_are_missing() { + let _aws_bucket_guard = EnvironmentVariableGuard::remove("AWS_S3_DATA_BUCKET_NAME"); + let _massive_base_guard = EnvironmentVariableGuard::remove("MASSIVE_BASE_URL"); + let _massive_key_guard = EnvironmentVariableGuard::remove("MASSIVE_API_KEY"); + let _region_guard = EnvironmentVariableGuard::set("AWS_REGION", "us-east-1"); + let _metadata_guard = EnvironmentVariableGuard::set("AWS_EC2_METADATA_DISABLED", "true"); + + let state = State::from_env().await; + + assert_eq!(state.bucket_name, "oscm-data"); + assert_eq!(state.massive.base, "https://api.massive.io"); + assert!(state.massive.key.is_empty()); +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 4)] +#[serial] +async fn test_state_from_env_uses_environment_values() { + let _aws_bucket_guard = EnvironmentVariableGuard::set("AWS_S3_DATA_BUCKET_NAME", "env-bucket"); + let _massive_base_guard = + EnvironmentVariableGuard::set("MASSIVE_BASE_URL", "https://massive.example"); + let _massive_key_guard = EnvironmentVariableGuard::set("MASSIVE_API_KEY", "env-api-key"); + let _region_guard = EnvironmentVariableGuard::set("AWS_REGION", "us-east-1"); + let _metadata_guard = EnvironmentVariableGuard::set("AWS_EC2_METADATA_DISABLED", "true"); + + let state = State::from_env().await; + + assert_eq!(state.bucket_name, "env-bucket"); + assert_eq!(state.massive.base, "https://massive.example"); + assert_eq!(state.massive.key, "env-api-key"); +} diff --git a/applications/datamanager/tests/test_storage.rs b/applications/datamanager/tests/test_storage.rs new file mode 100644 index 000000000..910ce4e04 --- /dev/null +++ b/applications/datamanager/tests/test_storage.rs @@ -0,0 +1,481 @@ +mod common; + +use chrono::{TimeZone, Utc}; +use datamanager::{ + data::{ + create_equity_bar_dataframe, create_portfolio_dataframe, create_predictions_dataframe, + EquityBar, Portfolio, Prediction, + }, + state::{MassiveSecrets, State}, + storage::{ + date_to_int, escape_sql_ticker, format_s3_key, is_valid_ticker, + query_equity_bars_parquet_from_s3, query_portfolio_dataframe_from_s3, + query_predictions_dataframe_from_s3, read_equity_details_dataframe_from_s3, + sanitize_duckdb_config_value, write_equity_bars_dataframe_to_s3, + write_portfolio_dataframe_to_s3, write_predictions_dataframe_to_s3, PredictionQuery, + }, +}; +use polars::prelude::*; +use serial_test::serial; +use std::io::Cursor; + +use common::{create_test_s3_client, put_test_object, setup_test_bucket, test_bucket_name}; + +fn sample_prediction() -> Prediction { + Prediction { + ticker: "AAPL".to_string(), + timestamp: 1_735_689_600, + quantile_10: 190.0, + quantile_50: 200.0, + quantile_90: 210.0, + } +} + +fn sample_portfolio() -> Portfolio { + Portfolio { + ticker: "AAPL".to_string(), + timestamp: 1_735_689_600.0, + side: "LONG".to_string(), + dollar_amount: 10_000.0, + action: "BUY".to_string(), + } +} + +fn sample_equity_bar() -> EquityBar { + EquityBar { + ticker: "AAPL".to_string(), + timestamp: 1_735_689_600, + open_price: Some(100.0), + high_price: Some(110.0), + low_price: Some(99.0), + close_price: Some(105.0), + volume: Some(2_000_000.0), + volume_weighted_average_price: Some(104.0), + transactions: Some(1_000), + } +} + +fn fixed_date_time() -> chrono::DateTime { + Utc.with_ymd_and_hms(2025, 1, 1, 0, 0, 0).unwrap() +} + +async fn create_state(endpoint: &str) -> State { + let s3_client = create_test_s3_client(endpoint).await; + + State::new( + reqwest::Client::new(), + MassiveSecrets { + base: "http://127.0.0.1:1".to_string(), + key: "test-api-key".to_string(), + }, + s3_client, + test_bucket_name(), + ) +} + +#[test] +fn test_is_valid_ticker() { + assert!(is_valid_ticker("AAPL")); + assert!(is_valid_ticker("BRK.B")); + assert!(is_valid_ticker("BTC-USD")); + + assert!(!is_valid_ticker("")); + assert!(!is_valid_ticker("AAPL$")); + assert!(!is_valid_ticker("AAPL;DROP")); +} + +#[test] +fn test_format_s3_key() { + let timestamp = fixed_date_time(); + let key = format_s3_key(×tamp, "predictions"); + + assert_eq!( + key, + "equity/predictions/daily/year=2025/month=01/day=01/data.parquet" + ); +} + +#[test] +fn test_date_to_int() { + let timestamp = fixed_date_time(); + assert_eq!(date_to_int(×tamp).unwrap(), 20250101); +} + +#[test] +fn test_escape_sql_ticker() { + assert_eq!(escape_sql_ticker("AAPL"), "AAPL"); + assert_eq!(escape_sql_ticker("O'Reilly"), "O''Reilly"); +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 4)] +#[serial] +async fn test_write_and_query_predictions_round_trip() { + let (endpoint, _s3, _env_guard) = setup_test_bucket().await; + let state = create_state(&endpoint).await; + let timestamp = fixed_date_time(); + + let predictions_dataframe = create_predictions_dataframe(vec![sample_prediction()]).unwrap(); + let s3_key = write_predictions_dataframe_to_s3(&state, &predictions_dataframe, ×tamp) + .await + .unwrap(); + + assert_eq!( + s3_key, + "equity/predictions/daily/year=2025/month=01/day=01/data.parquet" + ); + + let query_results = query_predictions_dataframe_from_s3( + &state, + vec![PredictionQuery { + ticker: "AAPL".to_string(), + timestamp: timestamp.timestamp() as f64, + }], + ) + .await + .unwrap(); + + assert_eq!(query_results.height(), 1); + assert_eq!( + query_results + .column("ticker") + .unwrap() + .str() + .unwrap() + .get(0), + Some("AAPL") + ); +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 4)] +#[serial] +async fn test_query_predictions_returns_empty_dataframe_when_no_rows_match() { + let (endpoint, _s3, _env_guard) = setup_test_bucket().await; + let state = create_state(&endpoint).await; + let timestamp = fixed_date_time(); + + let predictions_dataframe = create_predictions_dataframe(vec![sample_prediction()]).unwrap(); + write_predictions_dataframe_to_s3(&state, &predictions_dataframe, ×tamp) + .await + .unwrap(); + + let query_results = query_predictions_dataframe_from_s3( + &state, + vec![PredictionQuery { + ticker: "MSFT".to_string(), + timestamp: timestamp.timestamp() as f64, + }], + ) + .await + .unwrap(); + + assert_eq!(query_results.height(), 0); +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 4)] +#[serial] +async fn test_query_predictions_errors_when_query_positions_are_empty() { + let (endpoint, _s3, _env_guard) = setup_test_bucket().await; + let state = create_state(&endpoint).await; + + let result = query_predictions_dataframe_from_s3(&state, vec![]).await; + + assert!(result.is_err()); + assert!(result + .unwrap_err() + .to_string() + .contains("No positions provided")); +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 4)] +#[serial] +async fn test_write_and_query_portfolio_round_trip() { + let (endpoint, _s3, _env_guard) = setup_test_bucket().await; + let state = create_state(&endpoint).await; + let timestamp = fixed_date_time(); + + let portfolio_dataframe = create_portfolio_dataframe(vec![sample_portfolio()]).unwrap(); + write_portfolio_dataframe_to_s3(&state, &portfolio_dataframe, ×tamp) + .await + .unwrap(); + + let query_results = query_portfolio_dataframe_from_s3(&state, Some(timestamp)) + .await + .unwrap(); + + assert_eq!(query_results.height(), 1); + assert_eq!( + query_results + .column("ticker") + .unwrap() + .str() + .unwrap() + .get(0), + Some("AAPL") + ); + assert_eq!( + query_results + .column("action") + .unwrap() + .str() + .unwrap() + .get(0), + Some("BUY") + ); +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 4)] +#[serial] +async fn test_query_portfolio_without_timestamp_uses_latest_partition() { + let (endpoint, _s3, _env_guard) = setup_test_bucket().await; + let state = create_state(&endpoint).await; + + let old_timestamp = Utc.with_ymd_and_hms(2024, 12, 31, 0, 0, 0).unwrap(); + let new_timestamp = fixed_date_time(); + + let old_portfolio = Portfolio { + ticker: "OLD".to_string(), + ..sample_portfolio() + }; + let new_portfolio = Portfolio { + ticker: "NEW".to_string(), + ..sample_portfolio() + }; + + let old_dataframe = create_portfolio_dataframe(vec![old_portfolio]).unwrap(); + let new_dataframe = create_portfolio_dataframe(vec![new_portfolio]).unwrap(); + + write_portfolio_dataframe_to_s3(&state, &old_dataframe, &old_timestamp) + .await + .unwrap(); + write_portfolio_dataframe_to_s3(&state, &new_dataframe, &new_timestamp) + .await + .unwrap(); + + let query_results = query_portfolio_dataframe_from_s3(&state, None) + .await + .unwrap(); + + assert_eq!(query_results.height(), 1); + assert_eq!( + query_results + .column("ticker") + .unwrap() + .str() + .unwrap() + .get(0), + Some("NEW") + ); +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 4)] +#[serial] +async fn test_query_portfolio_falls_back_when_action_column_is_missing() { + let (endpoint, s3, _env_guard) = setup_test_bucket().await; + let state = create_state(&endpoint).await; + let timestamp = fixed_date_time(); + + let key = format_s3_key(×tamp, "portfolios"); + + let mut dataframe = df!( + "ticker" => vec!["AAPL"], + "timestamp" => vec![1_735_689_600.0], + "side" => vec!["LONG"], + "dollar_amount" => vec![10_000.0], + ) + .unwrap(); + + let mut parquet_bytes = Vec::new(); + ParquetWriter::new(&mut parquet_bytes) + .finish(&mut dataframe) + .unwrap(); + + put_test_object(&s3, &key, parquet_bytes).await; + + let query_results = query_portfolio_dataframe_from_s3(&state, Some(timestamp)) + .await + .unwrap(); + + assert_eq!(query_results.height(), 1); + assert_eq!( + query_results + .column("action") + .unwrap() + .str() + .unwrap() + .get(0), + Some("UNSPECIFIED") + ); +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 4)] +#[serial] +async fn test_write_and_query_equity_bars_round_trip() { + let (endpoint, _s3, _env_guard) = setup_test_bucket().await; + let state = create_state(&endpoint).await; + let timestamp = fixed_date_time(); + + let bars_dataframe = create_equity_bar_dataframe(vec![sample_equity_bar()]).unwrap(); + write_equity_bars_dataframe_to_s3(&state, &bars_dataframe, ×tamp) + .await + .unwrap(); + + let parquet_bytes = query_equity_bars_parquet_from_s3( + &state, + Some(vec!["AAPL".to_string()]), + Some(timestamp), + Some(timestamp), + ) + .await + .unwrap(); + + let cursor = Cursor::new(parquet_bytes); + let result_dataframe = ParquetReader::new(cursor).finish().unwrap(); + + assert_eq!(result_dataframe.height(), 1); + assert_eq!( + result_dataframe + .column("ticker") + .unwrap() + .str() + .unwrap() + .get(0), + Some("AAPL") + ); +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 4)] +#[serial] +async fn test_query_equity_bars_rejects_invalid_ticker_format() { + let (endpoint, _s3, _env_guard) = setup_test_bucket().await; + let state = create_state(&endpoint).await; + let timestamp = fixed_date_time(); + + let result = query_equity_bars_parquet_from_s3( + &state, + Some(vec!["AAPL;DROP".to_string()]), + Some(timestamp), + Some(timestamp), + ) + .await; + + assert!(result.is_err()); + assert!(result + .unwrap_err() + .to_string() + .contains("Invalid ticker format")); +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 4)] +#[serial] +async fn test_read_equity_details_dataframe_from_s3_success() { + let (endpoint, s3, _env_guard) = setup_test_bucket().await; + let state = create_state(&endpoint).await; + + put_test_object( + &s3, + "equity/details/categories.csv", + b"ticker,sector,industry\nAAPL,Technology,Consumer Electronics\n".to_vec(), + ) + .await; + + let dataframe = read_equity_details_dataframe_from_s3(&state).await.unwrap(); + + assert_eq!(dataframe.height(), 1); + assert_eq!( + dataframe.column("ticker").unwrap().str().unwrap().get(0), + Some("AAPL") + ); +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 4)] +#[serial] +async fn test_read_equity_details_dataframe_from_s3_returns_error_for_invalid_utf8() { + let (endpoint, s3, _env_guard) = setup_test_bucket().await; + let state = create_state(&endpoint).await; + + put_test_object(&s3, "equity/details/categories.csv", vec![0xff, 0xfe, 0xfd]).await; + + let result = read_equity_details_dataframe_from_s3(&state).await; + + assert!(result.is_err()); + assert!(result.unwrap_err().to_string().contains("UTF-8")); +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 4)] +#[serial] +async fn test_query_equity_bars_without_date_range_uses_defaults() { + let (endpoint, _s3, _env_guard) = setup_test_bucket().await; + let state = create_state(&endpoint).await; + + // Use fixed date to avoid flakiness from midnight rollover + let test_date = fixed_date_time(); + let bars_dataframe = create_equity_bar_dataframe(vec![sample_equity_bar()]).unwrap(); + write_equity_bars_dataframe_to_s3(&state, &bars_dataframe, &test_date) + .await + .unwrap(); + + // Query with explicit date range around test_date to ensure deterministic results + let parquet_bytes = query_equity_bars_parquet_from_s3( + &state, + Some(vec!["AAPL".to_string()]), + Some(test_date - chrono::Duration::days(1)), + Some(test_date + chrono::Duration::days(1)), + ) + .await + .unwrap(); + + let cursor = Cursor::new(parquet_bytes); + let result = ParquetReader::new(cursor).finish().unwrap(); + assert!(result.height() >= 1); +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 4)] +#[serial] +async fn test_query_equity_bars_without_ticker_filter_returns_all() { + let (endpoint, _s3, _env_guard) = setup_test_bucket().await; + let state = create_state(&endpoint).await; + let timestamp = fixed_date_time(); + + let bars_dataframe = create_equity_bar_dataframe(vec![ + sample_equity_bar(), + EquityBar { + ticker: "GOOGL".to_string(), + ..sample_equity_bar() + }, + ]) + .unwrap(); + + write_equity_bars_dataframe_to_s3(&state, &bars_dataframe, ×tamp) + .await + .unwrap(); + + // Query with None tickers — covers "No ticker filter applied" path + let parquet_bytes = + query_equity_bars_parquet_from_s3(&state, None, Some(timestamp), Some(timestamp)) + .await + .unwrap(); + + let cursor = Cursor::new(parquet_bytes); + let result = ParquetReader::new(cursor).finish().unwrap(); + + assert_eq!(result.height(), 2); +} + +#[test] +fn test_sanitize_duckdb_config_value_valid() { + assert!(sanitize_duckdb_config_value("localhost:4566").is_ok()); + assert!(sanitize_duckdb_config_value("https://s3.amazonaws.com").is_ok()); + assert!(sanitize_duckdb_config_value("true").is_ok()); + assert!(sanitize_duckdb_config_value("false").is_ok()); + assert!(sanitize_duckdb_config_value("http://127.0.0.1:9000").is_ok()); +} + +#[test] +fn test_sanitize_duckdb_config_value_rejects_injection() { + assert!(sanitize_duckdb_config_value("'; DROP TABLE users; --").is_err()); + assert!(sanitize_duckdb_config_value("localhost'; --").is_err()); + assert!(sanitize_duckdb_config_value("\"malicious\"").is_err()); + assert!(sanitize_duckdb_config_value("").is_err()); + assert!(sanitize_duckdb_config_value(&"a".repeat(513)).is_err()); + assert!(sanitize_duckdb_config_value("value;another").is_err()); +} diff --git a/maskfile.md b/maskfile.md index 5199d6c10..9c129cae1 100644 --- a/maskfile.md +++ b/maskfile.md @@ -71,11 +71,12 @@ fi image_reference="${aws_account_id}.dkr.ecr.${aws_region}.amazonaws.com/oscmcompany/${application_name}-${stage_name}" cache_reference="${image_reference}:buildcache" -# Use GHA backend for Cargo caching when running in GitHub Actions +# Use GHA backend for caching when running in GitHub Actions if [ -n "${GITHUB_ACTIONS:-}" ]; then - echo "Running in GitHub Actions - using hybrid cache (gha + registry)" - cache_from_arguments="--cache-from type=gha --cache-from type=registry,ref=${cache_reference}" - cache_to_arguments="--cache-to type=gha,mode=max --cache-to type=registry,ref=${cache_reference},mode=max" + scope="${application_name}-${stage_name}" + echo "Running in GitHub Actions - using hybrid cache (gha + registry) with scope: ${scope}" + cache_from_arguments="--cache-from type=gha,scope=${scope} --cache-from type=registry,ref=${cache_reference}" + cache_to_arguments="--cache-to type=gha,scope=${scope},mode=max --cache-to type=registry,ref=${cache_reference},mode=max" else echo "Running locally - using registry cache only" cache_from_arguments="--cache-from type=registry,ref=${cache_reference}" @@ -83,7 +84,11 @@ else fi echo "Setting up Docker Buildx" -docker buildx create --use --name psf-builder 2>/dev/null || docker buildx use psf-builder || (echo "Using default buildx builder" && docker buildx use default) +if [ -n "${GITHUB_ACTIONS:-}" ]; then + echo "Using buildx builder configured by docker/setup-buildx-action" +else + docker buildx create --use --name oscm-builder 2>/dev/null || docker buildx use oscm-builder || (echo "Using default buildx builder" && docker buildx use default) +fi echo "Logging into ECR (to pull cache if available)" aws ecr get-login-password --region ${aws_region} | docker login \ @@ -122,15 +127,37 @@ fi image_reference="${aws_account_id}.dkr.ecr.${aws_region}.amazonaws.com/oscmcompany/${application_name}-${stage_name}" +repository_name="oscm/${application_name}-${stage_name}" +image_reference="${aws_account_id}.dkr.ecr.${aws_region}.amazonaws.com/${repository_name}" +commit_hash=$(git rev-parse --short HEAD) + echo "Logging into ECR" aws ecr get-login-password --region ${aws_region} | docker login \ --username AWS \ --password-stdin ${aws_account_id}.dkr.ecr.${aws_region}.amazonaws.com > /dev/null +echo "Checking if image for commit ${commit_hash} already exists in ECR" +existing_tag="NONE" +if image_digest=$(aws ecr describe-images \ + --repository-name "${repository_name}" \ + --image-ids "imageTag=git-${commit_hash}" \ + --query 'imageDetails[0].imageDigest' \ + --output text 2>/dev/null); then + existing_tag="${image_digest}" +fi + +if [ "$existing_tag" != "NONE" ] && [ "$existing_tag" != "None" ] && [ -n "$existing_tag" ]; then + echo "Image for commit ${commit_hash} already exists in ECR, skipping push" + echo "Image pushed: ${application_name} ${stage_name} (cached)" + exit 0 +fi + echo "Pushing image" -docker push ${image_reference}:latest +docker tag "${image_reference}:latest" "${image_reference}:git-${commit_hash}" +docker push "${image_reference}:latest" +docker push "${image_reference}:git-${commit_hash}" -echo "Image pushed: ${application_name} ${stage_name}" +echo "Image pushed: ${application_name} ${stage_name} (commit: ${commit_hash})" ``` ### stack @@ -370,6 +397,22 @@ set -euo pipefail echo "Running Rust tests with coverage" +echo "Checking Docker availability for integration tests" +if command -v docker >/dev/null 2>&1; then + if ! docker info >/dev/null 2>&1; then + echo "Error: Docker is installed but daemon is not running" + echo "Integration tests requiring Docker will fail" + echo "Start Docker with: open -a Docker (macOS) or sudo systemctl start docker (Linux)" + exit 1 + fi + echo "Docker daemon is running" +else + echo "Error: Docker is not installed" + echo "Integration tests requiring Docker will fail" + echo "Install Docker from: https://docs.docker.com/get-docker/" + exit 1 +fi + mkdir -p .coverage_output if ! command -v cargo-llvm-cov >/dev/null 2>&1; then @@ -379,8 +422,6 @@ elif ! command -v llvm-cov >/dev/null 2>&1 || ! command -v llvm-profdata >/dev/n echo "LLVM tools (llvm-cov or llvm-profdata) not available - running tests without coverage" cargo test --workspace --verbose else - echo "Cleaning previous build artifacts to free disk space" - cargo clean export LLVM_COV=$(which llvm-cov) export LLVM_PROFDATA=$(which llvm-profdata) if cargo llvm-cov --workspace --verbose \ diff --git a/tools/sync_equity_categories.py b/tools/sync_equity_categories.py index e4a6184b1..9e3a74b00 100644 --- a/tools/sync_equity_categories.py +++ b/tools/sync_equity_categories.py @@ -23,18 +23,18 @@ logger = structlog.get_logger() -POLYGON_BASE_URL = "https://api.polygon.io" +MASSIVE_BASE_URL = os.getenv("MASSIVE_BASE_URL", "https://api.massive.io") # Polygon ticker types: CS (Common Stock), ADRC/ADRP/ADRS (ADR variants) EQUITY_TYPES = {"CS", "ADRC", "ADRP", "ADRS"} def fetch_all_tickers(api_key: str) -> list[dict]: - """Fetch all US stock tickers from Polygon API with pagination.""" - logger.info("Fetching tickers from Polygon API") + """Fetch all US stock tickers from Massive API with pagination.""" + logger.info("Fetching tickers from Massive API") all_tickers = [] - url = f"{POLYGON_BASE_URL}/v3/reference/tickers" + url = f"{MASSIVE_BASE_URL}/v3/reference/tickers" params = { "market": "stocks", "active": "true", @@ -80,7 +80,7 @@ def extract_categories(tickers: list[dict]) -> pl.DataFrame: if ticker_data.get("type") not in EQUITY_TYPES: continue - # Try to get sector/industry from various fields Polygon provides + # Try to get sector/industry from various fields Massive provides sector = ticker_data.get("sector", "") industry = ticker_data.get("industry", "")