Conversation
Note
Other AI code review bot(s) detected: CodeRabbit has detected other AI code review bot(s) in this pull request and will avoid duplicating their findings in the review comments. This may lead to a less comprehensive review.

Walkthrough
Replaces the Python FastAPI datamanager with a Rust Axum service: adds Cargo/Rust project layout, AppState with reqwest and AWS S3 clients populated from env, new /health and /equity routes (S3+DuckDB Parquet read path and existing sync/upload), integration tests, and updates the local AI command allowlist.

Changes
Sequence Diagram(s)

```mermaid
sequenceDiagram
    autonumber
    participant C as Client
    participant Ax as Axum Router
    participant EQ as Equity Handler
    participant ST as AppState
    participant S3 as AWS S3
    participant DD as DuckDB (HTTPFS)
    rect rgb(245,248,255)
        note over Ax,ST: App initialization
        Ax->>ST: create_app() -> AppState::from_env()
        ST->>ST: load env, build reqwest & S3 clients
        ST-->>Ax: Router with shared AppState
    end
    C->>Ax: GET /equity?start_date&end_date
    Ax->>EQ: handler(AppState, query)
    EQ->>DD: init DuckDB + httpfs, configure AWS creds
    EQ->>S3: request per-day parquet paths
    S3-->>DD: parquet content
    DD-->>EQ: combined parquet file -> read bytes
    EQ-->>C: 200 application/octet-stream (attachment) or 500 error
```

```mermaid
sequenceDiagram
    autonumber
    participant C as Client
    participant Ax as Axum Router
    participant HL as Health Handler
    C->>Ax: GET /health
    Ax->>HL: handler(AppState)
    HL-->>C: 200 OK (empty body)
```
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~60 minutes

Possibly related PRs
Pre-merge checks and finishing touches
❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
This stack of pull requests is managed by Graphite.
Graphite Automations: "Assign author to pull request" took an action on this PR (09/04/25). 1 assignee was added to this PR based on John Forstmeier's automation.
forstmeier
left a comment
Broadly things look fine on this PR (and it adds in the fixes asked for on the preceding PR) but I'll hold off approving just to make sure things get fixed upstream (e.g. Python tests) so merges don't get funky.
Marked stale due to inactivity. Remove stale label or comment or this will be closed.
@chrisaddy I took the "stale" label off the pull request. |
Pull Request Overview
This PR implements S3 data querying functionality for the equity data endpoint, transitioning from a simple placeholder to a fully functional system that can fetch, query, and return equity data stored in S3 as Parquet files.
Key changes:
- Added DuckDB-based S3 querying with httpfs extension for direct Parquet file access
- Implemented comprehensive integration tests covering various endpoint scenarios
- Restructured the application architecture by extracting main.rs logic into a reusable lib.rs module
Reviewed Changes
Copilot reviewed 7 out of 9 changed files in this pull request and generated 4 comments.
Show a summary per file
| File | Description |
|---|---|
| applications/datamanager/tests/integration_tests.rs | Added comprehensive integration tests covering health, equity fetch/sync endpoints with various scenarios |
| applications/datamanager/src/routes/equity.rs | Implemented S3 data querying with DuckDB, AWS credential handling, and date range filtering |
| applications/datamanager/src/main.rs | Simplified main function to use extracted create_app function |
| applications/datamanager/src/lib.rs | Extracted application setup logic into reusable library module |
| applications/datamanager/src/datamanager/main.py | Removed Python FastAPI implementation |
| applications/datamanager/Cargo.toml | Added dependencies and dev-dependencies for S3 querying and testing |
| applications/datamanager/.claude/settings.local.json | Added additional bash command permissions |
Actionable comments posted: 6
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
applications/datamanager/Cargo.toml (1)
14-30: Add tempfile for secure temp files used in equity fetch.

Fetching currently writes to /tmp with a predictable name. Add tempfile and use NamedTempFile in the handler.

```diff
 [dependencies]
 axum = "0.8.4"
 chrono = { version = "0.4.41", features = ["serde"] }
 polars = { version = "0.50.0", features = ["json", "lazy", "parquet", "temporal"] }
 reqwest = "0.12.23"
 serde = { version = "1.0.219", features = ["derive"] }
 serde_json = "1.0.143"
 tokio = { version = "1.47.1", features = ["full"] }
 tower = "0.5.2"
 tower-http = { version = "0.6.6", features = ["trace"] }
 tracing = "0.1.41"
 tracing-subscriber = { version = "0.3.20", features = ["env-filter", "fmt"] }
 aws-config = "1.5.8"
 aws-sdk-s3 = "1.48.0"
 aws-credential-types = "1.2.6"
 duckdb = { version = "1.0", features = ["r2d2", "chrono"] }
+tempfile = "3.12"
```
♻️ Duplicate comments (1)
applications/datamanager/tests/integration_tests.rs (1)
24-46: Remove explanatory comments per prior feedback. Carry forward the earlier note to strip comments and keep tests terse.
🧹 Nitpick comments (18)
applications/datamanager/.claude/settings.local.json (1)
15-17: Tighten allowed Bash commands; current patterns are overly broad.
- `"Bash(brew install:*)"` permits arbitrary package installs.
- `"Bash(export:*)"` permits exporting arbitrary env vars (risk of leaking/overwriting secrets).

Constrain these to specific, required commands (e.g., only needed formulae and specific AWS-related exports), or gate behind a prompt. Example:
- `"Bash(brew install duckdb)"`
- `"Bash(export AWS_REGION=*)"`, `"Bash(export AWS_ACCESS_KEY_ID=*)"`, `"Bash(export AWS_SECRET_ACCESS_KEY=*)"`
applications/datamanager/Cargo.toml (1)
31-37: Unify duplicate versions across deps/dev-deps to avoid dual resolution.
- serde_json appears as "1.0.143" in deps and "1.0" in dev-deps.
- tower is "0.5.2" in deps and "0.5" in dev-deps.
- reqwest is "0.12.23" vs "0.12".
Align them to exact versions:
```diff
 [dev-dependencies]
 tokio-test = "0.4"
-serde_json = "1.0"
-tower = { version = "0.5", features = ["util"] }
-hyper = { version = "1.0", features = ["full"] }
-http-body-util = "0.1"
-reqwest = { version = "0.12", features = ["json"] }
+serde_json = "1.0.143"
+tower = { version = "0.5.2", features = ["util"] }
+hyper = { version = "1.0", features = ["full"] }
+http-body-util = "0.1"
+reqwest = { version = "0.12.23", features = ["json"] }
```

applications/datamanager/src/main.rs (2)
8-9: Fix default EnvFilter target."example=debug" looks leftover. Prefer your crate/module name.
```diff
-        .unwrap_or_else(|_| "example=debug,tower_http=debug,axum=debug".into()),
+        .unwrap_or_else(|_| "datamanager=info,tower_http=info,axum=info".into()),
```
15-17: Read bind address/port from env and support graceful shutdown.
- Make port configurable (e.g., PORT or DATAMANAGER_PORT).
- Add graceful shutdown for SIGINT/SIGTERM (k8s/containers).
```diff
-    let listener = tokio::net::TcpListener::bind("0.0.0.0:9000").await.unwrap();
-
-    axum::serve(listener, app).await.unwrap();
+    let addr = std::env::var("DATAMANAGER_ADDR").unwrap_or_else(|_| "0.0.0.0:9000".into());
+    let listener = tokio::net::TcpListener::bind(&addr).await.unwrap();
+    axum::serve(listener, app)
+        .with_graceful_shutdown(shutdown_signal())
+        .await
+        .unwrap();
```

Add this helper in the same file:

```rust
async fn shutdown_signal() {
    use tokio::signal;
    let ctrl_c = async {
        let _ = signal::ctrl_c().await;
    };
    #[cfg(unix)]
    let terminate = async {
        let mut sigterm =
            signal::unix::signal(signal::unix::SignalKind::terminate()).unwrap();
        sigterm.recv().await;
    };
    #[cfg(not(unix))]
    let terminate = std::future::pending::<()>();
    tokio::select! {
        _ = ctrl_c => {},
        _ = terminate => {},
    }
}
```

applications/datamanager/tests/integration_tests.rs (3)
52-63: GET with JSON body is unconventional; prefer query params. Either switch tests to the query string or update the handler to accept Query (and keep JSON optional for backwards-compat).
Example test change:
```diff
-    let request = Request::builder()
-        .method("GET")
-        .uri("/equity")
-        .header("content-type", "application/json")
-        .body(Body::from(request_body.to_string()))
+    let request = Request::builder()
+        .method("GET")
+        .uri("/equity?start_date=2025-08-29&end_date=2025-08-29")
+        .body(Body::empty())
         .unwrap();
```
214-216: Invalid date format may be 400, not 422, for the Json extractor. Confirm the actual status and align tests (or add a custom rejection to map deserialization errors to 422).

```diff
-    assert_eq!(response.status(), StatusCode::UNPROCESSABLE_ENTITY);
+    assert!(response.status().is_client_error());
```
274-276: Avoid fixed sleep; poll /health with backoff. Fixed delays are flaky. Poll until ready or timeout.

```diff
-    tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
+    let client = reqwest::Client::new();
+    let start = std::time::Instant::now();
+    loop {
+        if start.elapsed() > std::time::Duration::from_secs(5) { break; }
+        if let Ok(resp) = client.get(&format!("http://127.0.0.1:{}/health", port)).send().await {
+            if resp.status().is_success() { break; }
+        }
+        tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
+    }
```

applications/datamanager/src/routes/equity.rs (6)
113-141: Support query params for GET; keep JSON as an optional fallback. Accept both Option<Query> and Option<Json> so clients don't need a GET body.

```diff
-use axum::{
-    body::Body,
-    extract::{Json, State},
-    http::{header, StatusCode},
-    response::{IntoResponse, Response},
-    routing::{get, post},
-    Router,
-};
+use axum::{
+    body::Body,
+    extract::{Json, Query, State},
+    http::{header, StatusCode},
+    response::{IntoResponse, Response},
+    routing::{get, post},
+    Router,
+};

-async fn fetch(State(state): State<AppState>, query: Option<Json<DateRangeQuery>>) -> Response {
+async fn fetch(
+    State(state): State<AppState>,
+    query: Option<Query<DateRangeQuery>>,
+    body: Option<Json<DateRangeQuery>>,
+) -> Response {
     info!("Fetching equity data from S3 partitioned files");
-
-    match query_s3_parquet_data(&state, query).await {
+    let effective = match (query, body) {
+        (Some(q), _) => Some(q.0),
+        (None, Some(j)) => Some(j.0),
+        _ => None,
+    };
+    match query_s3_parquet_data(&state, effective).await {
```
186-205: Handle missing-day files more robustly. The current approach builds explicit per-day paths and UNION ALL; any missing file will error the whole query. Prefer one of:
- List existing keys via S3 ListObjectsV2 and only include those.
- Or upgrade to DuckDB ≥1.1 and use parquet_scan/read_parquet with globbing and allow_missing_files=true, plus predicate pushdown on date/timestamp.
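A minimal sketch of the list-then-query idea, with hypothetical names (`build_read_parquet_sql` and the key layout are assumptions, not the PR's actual code): after listing which per-day keys actually exist, build a single `read_parquet([...])` statement only from those keys, and return `None` when nothing exists instead of issuing a query that would fail on a missing file.

```rust
// Hypothetical helper: turn the S3 keys that were confirmed to exist
// (e.g. via ListObjectsV2) into one DuckDB read_parquet query, rather
// than a per-day UNION ALL that errors on the first missing file.
fn build_read_parquet_sql(existing_keys: &[String]) -> Option<String> {
    if existing_keys.is_empty() {
        return None; // nothing to query; caller can return an empty result
    }
    let paths: Vec<String> = existing_keys
        .iter()
        .map(|k| format!("'s3://{}'", k))
        .collect();
    Some(format!(
        "SELECT * FROM read_parquet([{}], union_by_name = true)",
        paths.join(", ")
    ))
}

fn main() {
    let keys = vec!["bucket/equity/2025-08-29.parquet".to_string()];
    // Prints: SELECT * FROM read_parquet(['s3://bucket/equity/2025-08-29.parquet'], union_by_name = true)
    println!("{}", build_read_parquet_sql(&keys).unwrap());
    assert!(build_read_parquet_sql(&[]).is_none());
}
```

This keeps the "missing day" decision in application code, where it can be logged or mapped to an empty 200 response, instead of surfacing as an opaque DuckDB IO error.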
73-88: Avoid cloning DataFrame in the Parquet writer call. Cloning and then taking &mut of a temporary is wasteful. Create a local mutable clone once.

```diff
     {
         let cursor = Cursor::new(&mut buffer);
         let writer = ParquetWriter::new(cursor);
-        match writer.finish(&mut df.clone()) {
+        let mut df_tmp = df.clone();
+        match writer.finish(&mut df_tmp) {
```
31-45: Use integer types for count-like fields to avoid float→int casts. Polygon's "v" (volume) and "n" (transactions) are counts. Parsing them as f64 then casting risks truncation/precision issues.

```diff
 struct BarResult {
     #[serde(rename = "T")]
     ticker: String,
     // TODO: money types
     c: Option<f64>,
     h: Option<f64>,
     l: Option<f64>,
-    n: Option<f64>,
+    n: Option<u64>,
     o: Option<f64>,
     // otc: bool,
     t: i64,
-    v: Option<f64>,
+    v: Option<u64>,
     vw: Option<f64>,
 }
```

And update the DataFrame builders accordingly (see below).
337-356: Align DataFrame column builders with integer types for volume/transactions.

```diff
-    let volumes: Vec<Option<u64>> = bars.iter().map(|b| b.v.map(|v| v as u64)).collect();
+    let volumes: Vec<Option<u64>> = bars.iter().map(|b| b.v).collect();
@@
-    let num_transactions: Vec<Option<u64>> = bars.iter().map(|b| b.n.map(|n| n as u64)).collect();
+    let num_transactions: Vec<Option<u64>> = bars.iter().map(|b| b.n).collect();
```
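A quick standalone demonstration of why the f64 round-trip matters for counts: f64 has a 53-bit significand, so integers above 2^53 are not all representable, and a large count parsed as f64 then cast back to u64 can silently change value.

```rust
// Demonstrates the precision loss behind the "use u64 for counts" advice:
// 2^53 survives an f64 round-trip, but 2^53 + 1 does not (it rounds to 2^53).
fn main() {
    let exact: u64 = 1 << 53;       // representable in f64
    let above: u64 = (1 << 53) + 1; // not representable in f64

    assert_eq!((exact as f64) as u64, exact);
    assert_ne!((above as f64) as u64, above);
    println!("f64 round-trip loses precision above 2^53");
}
```

Real equity volumes rarely reach 2^53, but deserializing counts as integers removes the failure mode entirely rather than relying on magnitudes staying small.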
116-131: Consider streaming the parquet response to reduce memory spikes. Instead of collecting all bytes, stream the temp file via ReaderStream.
Minimal change (after switching to NamedTempFile, return a stream from the file):
```rust
// After writing the parquet to temp_file:
let file = tokio::fs::File::open(&temp_file).await.map_err(|e| format!("open: {}", e))?;
let stream = tokio_util::io::ReaderStream::new(file);
let body = Body::from_stream(stream);
// build Response with `body`
```

This avoids holding large parquet files in RAM.
applications/datamanager/src/lib.rs (5)
24-31: Limit AppState field exposure. Keep internals crate-private to avoid unintended external mutation and to make future refactors easier.

```diff
 #[derive(Clone)]
 pub struct AppState {
-    pub client: Client,
-    pub polygon: PolygonSecrets,
-    pub alpaca: AlpacaSecrets,
-    pub s3_client: S3Client,
-    pub bucket_name: String,
+    pub(crate) client: Client,
+    pub(crate) polygon: PolygonSecrets,
+    pub(crate) alpaca: AlpacaSecrets,
+    pub(crate) s3_client: S3Client,
+    pub(crate) bucket_name: String,
 }
```
35-38: Avoid panic on client build. Prefer an `expect` with context (or bubble up a `Result`) instead of `unwrap()` to aid troubleshooting and reduce panic risk.

```diff
-        .build()
-        .unwrap();
+        .build()
+        .expect("failed to build reqwest client")
```
42-44: Minor: avoid unnecessary allocation in env fallback. Use `unwrap_or_else` to avoid allocating the default `String` when the env var is set.

```diff
-    let bucket_name =
-        std::env::var("S3_BUCKET_NAME").unwrap_or("pocketsizefund-ml-data".to_string());
+    let bucket_name =
+        std::env::var("S3_BUCKET_NAME").unwrap_or_else(|_| "pocketsizefund-ml-data".to_owned());
```
4-4: Remove unused tracing_subscriber import. It is not used here; keeping it in the lib triggers warnings. Initialize tracing in main instead.

```diff
-use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
```
40-41: Prefer `aws_config::defaults(BehaviorVersion::latest()).load().await` (optional). It is the recommended pattern; `aws_config::load_defaults(...)` remains a thin async wrapper, so switching is optional.

File: applications/datamanager/src/lib.rs, lines 40-41

```diff
-    let config = aws_config::load_defaults(aws_config::BehaviorVersion::latest()).await;
+    let config = aws_config::defaults(aws_config::BehaviorVersion::latest()).load().await;
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
⛔ Files ignored due to path filters (2)
- `Cargo.lock` is excluded by `!**/*.lock`
- `applications/datamanager/Cargo.lock` is excluded by `!**/*.lock`
📒 Files selected for processing (7)
- `applications/datamanager/.claude/settings.local.json` (1 hunks)
- `applications/datamanager/Cargo.toml` (2 hunks)
- `applications/datamanager/src/datamanager/main.py` (0 hunks)
- `applications/datamanager/src/lib.rs` (1 hunks)
- `applications/datamanager/src/main.rs` (2 hunks)
- `applications/datamanager/src/routes/equity.rs` (4 hunks)
- `applications/datamanager/tests/integration_tests.rs` (1 hunks)
💤 Files with no reviewable changes (1)
- applications/datamanager/src/datamanager/main.py
🧰 Additional context used
🧬 Code graph analysis (3)
applications/datamanager/tests/integration_tests.rs (2)
- applications/datamanager/src/lib.rs (1): `create_app` (67-75)
- applications/datamanager/src/routes/mod.rs (2): `equity` (1-1), `health` (2-2)

applications/datamanager/src/lib.rs (2)
- applications/datamanager/src/routes/health.rs (1): `check` (6-8)
- applications/datamanager/src/routes/equity.rs (1): `router` (399-403)

applications/datamanager/src/main.rs (1)
- applications/datamanager/src/lib.rs (1): `create_app` (67-75)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
- GitHub Check: Run Python tests
🔇 Additional comments (2)
applications/datamanager/Cargo.toml (1)
29-29: Confirm httpfs availability with DuckDB 1.0. `INSTALL httpfs; LOAD httpfs;` downloads at runtime. This can fail in restricted CI/prod. Consider vendoring extensions or upgrading to a build that embeds httpfs. Please verify CI/prod images have network access and extension dir write permissions.
applications/datamanager/tests/integration_tests.rs (1)
192-195: Invalid JSON vs invalid shape: assert the right status codes. Axum returns 400 for malformed JSON. Keep this as-is, but ensure it's consistent with your rejection handling.
Actionable comments posted: 1
♻️ Duplicate comments (2)
applications/datamanager/src/routes/equity.rs (2)
153-171: S3 config: derive region dynamically, include session token, and escape creds.
Hardcoding the region and omitting the session token breaks STS/SSO/EKS flows; embedding raw creds without escaping risks injection via stray quotes. Use the loaded AWS config's region, include `session_token` when present, and escape single quotes. This aligns with the PR objective "Implements AWS credential handling."

Apply this diff:

```diff
     let config = aws_config::load_defaults(aws_config::BehaviorVersion::latest()).await;
     let provider = config
         .credentials_provider()
         .ok_or_else(|| "No AWS credentials provider available".to_string())?;
     let credentials = provider
         .provide_credentials()
         .await
         .map_err(|e| format!("Failed to get AWS credentials: {}", e))?;
-    let s3_config = format!(
+    let region = config
+        .region()
+        .map(|r| r.to_string())
+        .or_else(|| std::env::var("AWS_REGION").ok())
+        .unwrap_or_else(|| "us-east-1".to_string());
+    let esc = |s: &str| s.replace('\'', "''");
+    let mut s3_config = format!(
         "
-        SET s3_region='us-east-1';
+        SET s3_region='{}';
         SET s3_url_style='path';
         SET s3_access_key_id='{}';
         SET s3_secret_access_key='{}';
         ",
-        credentials.access_key_id(),
-        credentials.secret_access_key()
+        region,
+        esc(credentials.access_key_id()),
+        esc(credentials.secret_access_key())
     );
+    if let Some(token) = credentials.session_token() {
+        s3_config.push_str(&format!("SET s3_session_token='{}';\n", esc(token)));
+    }
     conn.execute_batch(&s3_config)
         .map_err(|e| format!("Failed to configure S3 settings: {}", e))?;
```

Also applies to: 162-175
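The escaping step above relies on standard SQL string-literal rules: inside a single-quoted literal, a quote is escaped by doubling it. A small standalone sketch (function name is illustrative, not from the PR):

```rust
// SQL single-quote escaping: a quote inside a single-quoted literal is
// doubled, so a stray quote in a secret cannot terminate the literal early.
fn escape_sql_literal(s: &str) -> String {
    s.replace('\'', "''")
}

fn main() {
    let secret = "abc'def";
    let stmt = format!("SET s3_secret_access_key='{}';", escape_sql_literal(secret));
    // Prints: SET s3_secret_access_key='abc''def';
    println!("{}", stmt);
}
```

This only guards against quote characters in credential strings; it is not a substitute for parameterized statements where the driver supports them.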
216-221: Secure temp file handling: use tempfile::NamedTempFile (avoid predictable /tmp path).
The current `/tmp/query_result_*.parquet` is predictable (TOCTOU/symlink risk) and requires manual cleanup. Prefer `tempfile`.

Apply these diffs:

```diff
 use polars::prelude::*;
 use std::io::Cursor;
+use tempfile::NamedTempFile;
 use tracing::{debug, info};
```

```diff
-    // Create a temporary parquet file path
-    let temp_file = format!(
-        "/tmp/query_result_{}.parquet",
-        Utc::now().timestamp_micros()
-    );
+    // Create a secure temporary parquet file
+    let tmp = NamedTempFile::new()
+        .map_err(|e| format!("Failed to create temp file: {}", e))?;
+    let temp_file = tmp.path().to_string_lossy().to_string();
```

```diff
     let parquet_data =
         std::fs::read(&temp_file).map_err(|e| format!("Failed to read parquet file: {}", e))?;
-
-    if let Err(e) = std::fs::remove_file(&temp_file) {
-        info!("Failed to clean up temp file {}: {}", temp_file, e);
-    }
+    // tmp is deleted automatically when dropped
```

Also applies to: 247-253, 17-18
🧹 Nitpick comments (5)
applications/datamanager/src/routes/equity.rs (5)
184-186: Validate the date range and return a clear error early.
Guard against `start_date > end_date` to avoid a confusing "No files to query" and unnecessary work.

Apply this diff:

```diff
+    if start_date > end_date {
+        return Err("Invalid date range: start_date must be <= end_date".to_string());
+    }
     info!("Querying data from {} to {}", start_date, end_date);
```
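If the dates arrive as ISO-8601 (YYYY-MM-DD) strings rather than parsed date types, the same guard still works, because lexicographic order on zero-padded ISO dates matches chronological order. A hedged sketch (the `validate_range` helper is hypothetical, not the handler's actual shape):

```rust
// Early range validation, assuming ISO-8601 (YYYY-MM-DD) string inputs:
// for zero-padded ISO dates, plain string comparison is chronological.
fn validate_range(start_date: &str, end_date: &str) -> Result<(), String> {
    if start_date > end_date {
        return Err("Invalid date range: start_date must be <= end_date".to_string());
    }
    Ok(())
}

fn main() {
    assert!(validate_range("2025-08-01", "2025-08-29").is_ok());
    assert!(validate_range("2025-09-01", "2025-08-29").is_err());
    println!("range checks passed");
}
```

Running this check before any S3 or DuckDB work keeps the failure cheap and lets the handler map it to a 4xx rather than a 500.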
119-123: Set a more specific Content-Type for Parquet.
Prefer `application/vnd.apache.parquet` over generic octet-stream for better client detection.

```diff
-    response.headers_mut().insert(
-        header::CONTENT_TYPE,
-        "application/octet-stream".parse().unwrap(),
-    );
+    response.headers_mut().insert(
+        header::CONTENT_TYPE,
+        "application/vnd.apache.parquet".parse().unwrap(),
+    );
```
135-139: Map client errors to 400 when applicable
Consider returning 400 for validation errors like an invalid date range (rather than 500). Easiest: validate and early-return in `fetch` before calling the query function.
150-151: Blocking DuckDB work on async runtime
`INSTALL/LOAD httpfs`, `execute_batch`, and the S3 parquet export are CPU/IO-heavy blocking calls. Run them via `tokio::task::spawn_blocking` to avoid starving the async reactor under high load. Also applies to: 244-246
210-215: Replace N-way UNION with read_parquet([...]) and handle missing files explicitly
- Use `read_parquet([...], union_by_name = TRUE, hive_partitioning = TRUE)` instead of building a long UNION ALL; DuckDB will handle schema union and pushdowns more efficiently.
- DuckDB does NOT support an "ignore missing files" option: pre-list S3 paths and call read_parquet only when the list is non-empty, or catch the IO error and return an empty table.
File: applications/datamanager/src/routes/equity.rs (210–215; also applies to 222–241)
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
- `applications/datamanager/src/lib.rs` (1 hunks)
- `applications/datamanager/src/routes/equity.rs` (4 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
- applications/datamanager/src/lib.rs

Overview
Implement S3 data querying for equity data
Changes
Comments
This PR enables the fetch endpoint to query equity data stored in S3 partitioned Parquet files. It uses DuckDB with the httpfs extension to efficiently query the data directly from S3 without downloading entire files. The implementation supports date range filtering and handles AWS credentials properly.
Summary by CodeRabbit
New Features
Refactor
Tests
Chores