
Datamanager predictions handlers #626

Merged
forstmeier merged 3 commits into master from datamanager-predictions-handlers on Sep 17, 2025

Conversation

Collaborator

@forstmeier forstmeier commented Sep 16, 2025

Overview

Changes

  • cleanup missed in master
  • added "save" method for "predictions" on datamanager
  • added "query" method for "predictions" on datamanager

Comments

These bits of logic are necessary for correctly evaluating portfolio positions in the risk management logic in #625.

Summary by CodeRabbit

  • New Features

    • Added Predictions API with endpoints to upload prediction data and query results.
    • Cloud-backed storage and retrieval of prediction datasets (S3) with on-demand querying.
  • Bug Fixes / Changes

    • Updated equity data handling to integer-based per-bar storage with runtime conversion for downstream use.
  • Chores

    • Added a new data manager service component (library + binary) and reorganized routing modules for future endpoints.

@forstmeier forstmeier added this to the Refactor milestone Sep 16, 2025
@forstmeier forstmeier self-assigned this Sep 16, 2025
Copilot AI review requested due to automatic review settings September 16, 2025 13:39
Contributor

coderabbitai Bot commented Sep 16, 2025

Note

Other AI code review bot(s) detected

CodeRabbit has detected other AI code review bot(s) in this pull request and will avoid duplicating their findings in the review comments. This may lead to a less comprehensive review.

Walkthrough

Adds a new Rust crate "datamanager" with lib and bin targets, declares route submodules, implements a predictions API (POST -> upload Parquet to S3; GET -> query Parquet via in-memory DuckDB/httpfs), changes equity bar field types to unsigned integers, and minor formatting/JSON newline fix.
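The walkthrough mentions per-day Parquet objects in S3. A dependency-free sketch of the daily key layout the equity route builds with `year=`/`month=`/`day=` partitions (the bucket prefix is supplied by the caller; the function name is illustrative, not from the crate):

```rust
// Builds the per-day partitioned object key used for daily equity bars.
// The s3://<bucket>/ prefix is prepended by the caller from app state.
fn daily_bars_key(year: u32, month: u32, day: u32) -> String {
    format!(
        "equity/bars/daily/year={:04}/month={:02}/day={:02}/data.parquet",
        year, month, day
    )
}

fn main() {
    let key = daily_bars_key(2025, 9, 16);
    // Placeholder bucket name for illustration.
    println!("s3://my-bucket/{}", key);
}
```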

Changes

Cohort / File(s) Summary
Crate manifest
applications/datamanager/Cargo.toml
New crate manifest (v0.1.0, edition 2021) with lib and bin targets; adds dependencies for axum, tokio, polars, duckdb, aws-sdk-s3, serde, tracing, etc., and dev-deps.
Routing declarations
applications/datamanager/src/routes/mod.rs
Adds pub mod equity;, pub mod health;, and pub mod prediction;.
Predictions API
applications/datamanager/src/routes/prediction.rs
New Axum router exposing POST /predictions (serialize DataFrame to Parquet and upload to S3) and GET /predictions (build DuckDB query over S3 parquet via httpfs and return results). Adds payload types, Prediction struct, error mapping, S3 upload and DuckDB query helpers, and logging.
Equity route data model
applications/datamanager/src/routes/equity.rs
Changes BarResult numeric fields from floating-point/signed timestamp to unsigned integers (e.g., c,h,l,n,o,v,vw: Option<u64>, t: u64) and adjusts DataFrame construction conversions.
Library entry
applications/datamanager/src/lib.rs
Merges prediction::router() into main App router; removes unused import and trailing blank line (formatting only).
Local settings formatting
applications/datamanager/.claude/settings.local.json
End-of-file formatting adjusted (newline/blank-line); no semantic changes.
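The equity change above stores per-bar numbers as unsigned integers with runtime conversion for downstream use. A minimal std-only sketch of that idea; the 10_000 price scale and the helper name are assumptions for illustration, not the crate's actual constants:

```rust
// Sketch of the BarResult change: fields held as scaled unsigned integers,
// converted to f64 only when downstream code needs floating-point values.
struct BarResult {
    c: Option<u64>, // close price, scaled integer (assumed 4 implied decimals)
    t: u64,         // timestamp, milliseconds since epoch
}

const PRICE_SCALE: f64 = 10_000.0; // illustrative scaling assumption

fn to_price(raw: Option<u64>) -> Option<f64> {
    raw.map(|v| v as f64 / PRICE_SCALE)
}

fn main() {
    let bar = BarResult { c: Some(1_234_500), t: 1_757_980_800_000 };
    println!("close = {:?} at t = {}", to_price(bar.c), bar.t);
}
```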

Sequence Diagram(s)

sequenceDiagram
  autonumber
  participant C as Client
  participant R as Axum Router (datamanager)
  participant H1 as save_prediction
  participant H2 as query_prediction
  participant S3 as AWS S3
  participant DDB as DuckDB (in-memory)

  rect rgba(200,230,255,0.35)
  Note over C,R: POST /predictions (upload)
  C->>R: POST /predictions { DataFrame, timestamp }
  R->>H1: route -> handler
  H1->>S3: upload_dataframe_to_s3(parquet bytes)
  S3-->>H1: put_object response (key)
  H1-->>C: 200 OK (rows, key)
  end

  rect rgba(220,255,200,0.35)
  Note over C,R: GET /predictions (query)
  C->>R: GET /predictions { positions[], timestamp }
  R->>H2: route -> handler
  H2->>DDB: init DuckDB, install httpfs, set AWS creds
  H2->>DDB: execute SELECT ... FROM parquet(S3_paths) UNION ALL ...
  DDB-->>H2: rows (ticker, ts, q10, q50, q90)
  H2-->>C: 200 OK (JSON/DataFrame)
  end

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes

Poem

A rabbit taps keys with a caffeinated cheer,
New routes hop in: predictions appear!
Parquet bites packed, S3 burrows deep,
DuckDB wakes from its in-memory sleep.
Logs like little carrots mark the trail—🥕

Pre-merge checks and finishing touches

❌ Failed checks (1 warning)
  • Docstring Coverage ⚠️ Warning: Docstring coverage is 0.00%, below the required threshold of 80.00%. Run @coderabbitai generate docstrings to improve docstring coverage.
✅ Passed checks (2 passed)
  • Description Check ✅ Passed: Check skipped because CodeRabbit’s high-level summary is enabled.
  • Title Check ✅ Passed: The title "Datamanager predictions handlers" is concise and directly describes the main change (adding prediction-related handlers to the datamanager), so it clearly communicates the PR's primary purpose to reviewers.

Contributor

Copilot AI left a comment


Pull Request Overview

This PR refactors the datamanager application from Python to Rust and adds new prediction handling capabilities. The changes introduce REST endpoints for saving and querying prediction data through S3 storage, which are necessary for portfolio position evaluation in risk management logic.

Key changes:

  • Complete rewrite from Python/FastAPI to Rust/Axum architecture
  • Addition of prediction data management endpoints (save and query operations)
  • Integration with S3 for data persistence using DuckDB for querying

Reviewed Changes

Copilot reviewed 18 out of 23 changed files in this pull request and generated 4 comments.

Show a summary per file
  • applications/datamanager/src/routes/prediction.rs: New prediction handler with save/query endpoints for risk management
  • applications/datamanager/src/routes/equity.rs: Equity data handling with S3 storage and Polygon API integration
  • applications/datamanager/src/lib.rs: Main application setup with routing and state management
  • applications/datamanager/tests/integration_tests.rs: Comprehensive integration tests for HTTP endpoints
  • applications/datamanager/Cargo.toml: Rust dependencies and project configuration


Contributor

@coderabbitai coderabbitai Bot left a comment


Actionable comments posted: 15

🧹 Nitpick comments (20)
applications/datamanager/src/routes/health.rs (1)

6-8: Nit: Return impl IntoResponse instead of a concrete Response.

Slightly cleaner and avoids an unnecessary concrete type.

-pub async fn check() -> Response {
-    (StatusCode::OK).into_response()
-}
+pub async fn check() -> impl IntoResponse {
+    StatusCode::OK
+}
.flox/env/manifest.toml (1)

38-40: Remove empty on-activate hook or use it.

Empty hook is noise. Remove it or initialize env (e.g., RUSTFLAGS, sccache).

-[hook]
-on-activate = '''
-'''
+#[hook]
+# on-activate = '''
+# export RUSTFLAGS="-C debuginfo=1"
+# '''
applications/datamanager/.claude/settings.local.json (1)

3-18: Allowlist may be overly permissive for a repo config.

Commands like brew install, export, kill/pkill, and blanket http POST can be risky if used outside isolated dev contexts. Consider narrowing to essentials or keeping this file untracked locally.

Do you intend this to be committed for all contributors, or should it live in a local, git-ignored profile?

applications/datamanager/Cargo.toml (2)

1-4: Optional: add metadata (rust-version/license/repository).

Improves tooling and CI gating; set MSRV via rust-version and include license/repository.


25-25: Unify reqwest features; drop duplicate dev deps.

Integration tests call Client::json() (applications/datamanager/tests/integration_tests.rs) while production code does not; serde_json is declared in both [dependencies] and [dev-dependencies]. Enable reqwest's "json" feature on the main dependency and remove the duplicate reqwest and serde_json entries from dev-dependencies.

-reqwest = "0.12.23"
+reqwest = { version = "0.12.23", features = ["json"] }
@@
-[dev-dependencies]
+[dev-dependencies]
 tokio-test = "0.4"
-serde_json = "1.0"
-tower = { version = "0.5", features = ["util"] }
-hyper = { version = "1.0", features = ["full"] }
-http-body-util = "0.1"
-reqwest = { version = "0.12", features = ["json"] }
+tower = { version = "0.5", features = ["util"] }
+hyper = { version = "1.0", features = ["full"] }
+http-body-util = "0.1"
applications/datamanager/bacon.toml (1)

20-23: Add a fmt job to keep formatting green locally.

Handy in Bacon; pair with CI.

 [jobs.test]
 command = ["cargo", "test"]
 need_stdout = true
+
+[jobs.fmt]
+command = ["cargo", "fmt", "--all", "--", "--check"]
+need_stdout = false
applications/datamanager/src/routes/prediction.rs (2)

111-115: Use tracing, not println!, for structured logs.

-                println!(
-                    "DataFrame successfully converted to parquet, size: {} bytes",
-                    buffer.len()
-                );
+                info!(
+                    "DataFrame successfully converted to parquet, size: {} bytes",
+                    buffer.len()
+                );

283-287: Non-standard GET with JSON body; consider POST /predictions/query.

GET bodies are often dropped by clients/proxies. Consider adding a POST route variant to improve interoperability.

Would you like me to wire a POST /predictions/query route and add tests?

applications/datamanager/src/lib.rs (1)

1-6: Remove unused tracing_subscriber imports from lib.

They’re needed in main.rs, not lib.rs.

-use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
applications/datamanager/src/main.rs (1)

8-11: Default log filter references “example”; set to crate targets or rely on env.

Minor cleanup to avoid noisy logs and irrelevant target.

-            tracing_subscriber::EnvFilter::try_from_default_env()
-                .unwrap_or_else(|_| "example=debug,tower_http=debug,axum=debug".into()),
+            tracing_subscriber::EnvFilter::try_from_default_env()
+                .unwrap_or_else(|_| "datamanager=info,tower_http=info,axum=info".into()),
applications/datamanager/src/mod.rs (1)

1-1: Likely dead file; remove or integrate via lib.rs.

This standalone src/mod.rs with a single use has no effect unless lib.rs mod-includes it.

applications/datamanager/tests/integration_tests.rs (1)

1-315: Add integration tests for /predictions save & query.

Coverage is missing for the new handlers.

Here’s a starter you can append to this file:

+#[tokio::test]
+async fn test_predictions_save_and_query_roundtrip() {
+    let app = datamanager::create_app().await;
+
+    // Minimal DataFrame payload represented as JSON rows for now (adjust to your accepted schema)
+    // Consider switching save endpoint to accept Parquet bytes to avoid large JSON bodies.
+    let save_body = serde_json::json!({
+        "timestamp": "2025-08-29T00:00:00Z",
+        "data": {
+            "columns": ["ticker","timestamp","quantile_10","quantile_50","quantile_90"],
+            "data": [
+                ["AAPL", 1693267200000i64, 0.1, 0.5, 0.9]
+            ]
+        }
+    });
+
+    let save_req = Request::builder()
+        .method("POST")
+        .uri("/predictions")
+        .header("content-type", "application/json")
+        .body(Body::from(save_body.to_string()))
+        .unwrap();
+    let save_resp = app.clone().oneshot(save_req).await.unwrap();
+    assert_eq!(save_resp.status(), StatusCode::OK);
+
+    // Query for the same day/ticker
+    let query_body = serde_json::json!({
+        "positions": [{
+            "ticker": "AAPL",
+            "timestamp": "2025-08-29T00:00:00Z"
+        }],
+        "timestamp": "2025-08-29T00:00:00Z"
+    });
+    let query_req = Request::builder()
+        .method("GET")
+        .uri("/predictions")
+        .header("content-type", "application/json")
+        .body(Body::from(query_body.to_string()))
+        .unwrap();
+    let query_resp = app.oneshot(query_req).await.unwrap();
+    assert!(
+        query_resp.status() == StatusCode::OK ||
+        query_resp.status() == StatusCode::INTERNAL_SERVER_ERROR
+    );
+}
applications/datamanager/src/routes/equity.rs (8)

250-257: Avoid blocking file I/O on async path

Use tokio::fs to prevent blocking the runtime thread.

-    // Read the parquet file into memory
-    let parquet_data =
-        std::fs::read(&temp_file).map_err(|e| format!("Failed to read parquet file: {}", e))?;
+    // Read the parquet file into memory (async)
+    let parquet_data = tokio::fs::read(&temp_file)
+        .await
+        .map_err(|e| format!("Failed to read parquet file: {}", e))?;
@@
-    if let Err(e) = std::fs::remove_file(&temp_file) {
+    if let Err(e) = tokio::fs::remove_file(&temp_file).await {

247-249: The DuckDB export blocks the reactor

The export is CPU/IO heavy; consider offloading to a blocking task (new Connection inside spawn_blocking since Connection may not be Send).

I can sketch a spawn_blocking refactor if you confirm the crate features allow opening a new DuckDB Connection inside the task.


63-67: Reduce cloning and enable Parquet compression

Avoid cloning the DataFrame just to satisfy writer.finish and use compression to cut S3 storage and egress.

-async fn upload_dataframe_to_s3(
-    df: &DataFrame,
+async fn upload_dataframe_to_s3(
+    df: &mut DataFrame,
     state: &AppState,
     date: &NaiveDate,
 ) -> Result<String, String> {
@@
-        let cursor = Cursor::new(&mut buffer);
-        let writer = ParquetWriter::new(cursor);
-        match writer.finish(&mut df.clone()) {
+        use polars::prelude::ParquetCompression;
+        let cursor = Cursor::new(&mut buffer);
+        let mut writer = ParquetWriter::new(cursor).with_compression(ParquetCompression::Zstd);
+        match writer.finish(df) {
@@
-            match upload_dataframe_to_s3(&df, &state, &payload.date).await {
+            match upload_dataframe_to_s3(&mut df, &state, &payload.date).await {

Note: if changing the function signature ripples too much, keep &DataFrame but clone once: let mut df = df.clone(); writer.finish(&mut df).

Also applies to: 77-92, 361-393


122-132: Avoid unwrap() on header parsing

Use HeaderValue::from_static to avoid panics.

-            response.headers_mut().insert(
-                header::CONTENT_TYPE,
-                "application/octet-stream".parse().unwrap(),
-            );
+            response.headers_mut().insert(
+                header::CONTENT_TYPE,
+                header::HeaderValue::from_static("application/octet-stream"),
+            );
@@
-                "attachment; filename=\"equity_data.parquet\""
-                    .parse()
-                    .unwrap(),
+                header::HeaderValue::from_static("attachment; filename=\"equity_data.parquet\""),

96-114: Prefer a Parquet content-type

Minor: application/x-parquet is more specific than octet-stream.

-        .content_type("application/octet-stream")
+        .content_type("application/x-parquet")

267-274: Nit: clarify log message

"name: {}" is misleading; use a clearer key.

-    info!("name: {}", payload.date);
+    info!("sync_date: {}", payload.date);

382-389: Don’t return full DataFrame text in responses

This can explode response sizes. Log a small head() and return a concise error message.

-                    let json_output = df.to_string();
-                    (
-                        StatusCode::BAD_GATEWAY,
-                        format!("DataFrame created but S3 upload failed: {}\n\n{}", err, json_output),
-                    )
+                    debug!("DataFrame head on S3 upload failure:\n{}", df.head(Some(5)));
+                    (
+                        StatusCode::BAD_GATEWAY,
+                        format!("DataFrame created ({} rows) but S3 upload failed: {}", df.height(), err),
+                    )

225-243: Optional: drop ORDER BY unless callers require sorted parquet

Sorting materially increases latency and cost on large ranges.

-            ORDER BY timestamp, ticker
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 89695a9 and 0a7d293.

⛔ Files ignored due to path filters (4)
  • .flox/env/manifest.lock is excluded by !**/*.lock
  • Cargo.lock is excluded by !**/*.lock
  • applications/datamanager/Cargo.lock is excluded by !**/*.lock
  • uv.lock is excluded by !**/*.lock
📒 Files selected for processing (19)
  • .flox/env/manifest.toml (2 hunks)
  • .gitignore (1 hunks)
  • Cargo.toml (1 hunks)
  • applications/datamanager/.claude/settings.local.json (1 hunks)
  • applications/datamanager/Cargo.toml (1 hunks)
  • applications/datamanager/bacon.toml (1 hunks)
  • applications/datamanager/src/datamanager/alpaca_client.py (0 hunks)
  • applications/datamanager/src/datamanager/main.py (0 hunks)
  • applications/datamanager/src/datamanager/s3_client.py (0 hunks)
  • applications/datamanager/src/lib.rs (1 hunks)
  • applications/datamanager/src/main.rs (1 hunks)
  • applications/datamanager/src/mod.rs (1 hunks)
  • applications/datamanager/src/routes/equity.rs (1 hunks)
  • applications/datamanager/src/routes/health.rs (1 hunks)
  • applications/datamanager/src/routes/mod.rs (1 hunks)
  • applications/datamanager/src/routes/prediction.rs (1 hunks)
  • applications/datamanager/tests/integration_tests.rs (1 hunks)
  • applications/datamanager/tests/test_alpaca_client.py (0 hunks)
  • applications/datamanager/tests/test_s3_client.py (0 hunks)
💤 Files with no reviewable changes (5)
  • applications/datamanager/src/datamanager/s3_client.py
  • applications/datamanager/tests/test_s3_client.py
  • applications/datamanager/src/datamanager/alpaca_client.py
  • applications/datamanager/tests/test_alpaca_client.py
  • applications/datamanager/src/datamanager/main.py
🧰 Additional context used
🧬 Code graph analysis (5)
applications/datamanager/src/main.rs (1)
applications/datamanager/src/lib.rs (1)
  • create_app (67-75)
applications/datamanager/src/routes/equity.rs (1)
applications/datamanager/src/routes/prediction.rs (4)
  • upload_dataframe_to_s3 (89-149)
  • query_s3_parquet_data (178-281)
  • s3_paths (226-229)
  • tickers (232-235)
applications/datamanager/tests/integration_tests.rs (1)
applications/datamanager/src/lib.rs (1)
  • create_app (67-75)
applications/datamanager/src/lib.rs (2)
applications/datamanager/src/routes/health.rs (1)
  • check (6-8)
applications/datamanager/src/routes/equity.rs (1)
  • router (401-405)
applications/datamanager/src/routes/prediction.rs (1)
applications/datamanager/src/routes/equity.rs (4)
  • upload_dataframe_to_s3 (63-115)
  • query_s3_parquet_data (147-264)
  • s3_paths (213-216)
  • router (401-405)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
  • GitHub Check: Run Python tests
🔇 Additional comments (4)
.gitignore (1)

22-22: Workspace-friendly ignore looks good.

Ignoring all nested target/ dirs prevents noisy artifacts across the workspace.

Cargo.toml (1)

1-3: Root workspace + resolver=2: LGTM.

This will simplify cross-crate builds and feature unification.

applications/datamanager/Cargo.toml (1)

36-36: duckdb features (r2d2, chrono) present in 1.x — no action required.
Crates.io/docs show both features in current releases (latest 1.3.2).

applications/datamanager/src/routes/mod.rs (1)

1-3: Prediction routes present — add integration tests for save & query

Router registers POST /predictions -> save_prediction and GET /predictions -> query_prediction in applications/datamanager/src/routes/prediction.rs (router() — ~lines 283–286). I did not find integration tests covering predictions; add integration tests for both POST and GET (happy-path + validation/error cases) under applications/datamanager/tests or tests/integration.

Contributor

@coderabbitai coderabbitai Bot left a comment


Actionable comments posted: 2

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
applications/datamanager/src/routes/equity.rs (1)

151-173: Honor AWS region/session token; avoid hard-coded us-east-1

Use region from aws_config and include session token. Prevents auth failures with STS and non-us-east-1 buckets.

-  let s3_config = format!(
-      "
-      SET s3_region='us-east-1';
-      SET s3_url_style='path';
-      SET s3_access_key_id='{}';
-      SET s3_secret_access_key='{}';
-  ",
-      credentials.access_key_id(),
-      credentials.secret_access_key()
-  );
+  let region = config
+      .region()
+      .map(|r| r.to_string())
+      .unwrap_or_else(|| "us-east-1".to_string());
+  let session_token = credentials.session_token().unwrap_or_default();
+  let s3_config = format!(
+      "
+      SET s3_region='{}';
+      SET s3_url_style='path';
+      SET s3_access_key_id='{}';
+      SET s3_secret_access_key='{}';
+      SET s3_session_token='{}';
+  ",
+      region,
+      credentials.access_key_id(),
+      credentials.secret_access_key(),
+      session_token
+  );
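The region/session-token fix above can be exercised standalone. A std-only sketch that assembles the same SET statements, with the same `us-east-1` fallback and empty-token default as the suggested diff (function name and values are placeholders, not real credentials):

```rust
// Assembles the DuckDB httpfs S3 configuration statements, honoring an
// optional region and session token instead of hard-coding us-east-1.
fn duckdb_s3_config(
    region: Option<&str>,
    access_key_id: &str,
    secret_access_key: &str,
    session_token: Option<&str>,
) -> String {
    let region = region.unwrap_or("us-east-1");
    let token = session_token.unwrap_or("");
    format!(
        "SET s3_region='{}';\nSET s3_url_style='path';\nSET s3_access_key_id='{}';\nSET s3_secret_access_key='{}';\nSET s3_session_token='{}';",
        region, access_key_id, secret_access_key, token
    )
}

fn main() {
    // Placeholder values for illustration only.
    let cfg = duckdb_s3_config(Some("eu-west-1"), "AKIA_PLACEHOLDER", "secret", None);
    println!("{}", cfg);
}
```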
♻️ Duplicate comments (6)
applications/datamanager/src/routes/equity.rs (1)

111-114: GET body via Json is non‑standard (acknowledging defer preference)

Not blocking given your stated preference to defer Query extractor; consider documenting that clients must send GET with a body.

applications/datamanager/src/routes/prediction.rs (5)

288-292: GET with JSON body

Not standard; acknowledging your preference to defer. Consider documenting client behavior.


76-86: Return 5xx on S3 upload failure

Don’t mask failures with 200.

-      (
-          StatusCode::OK,
-          format!("S3 upload failed: {}\n\n{}", err, json_output),
-      )
+      (
+          StatusCode::INTERNAL_SERVER_ERROR,
+          format!("S3 upload failed: {}\n\n{}", err, json_output),
+      )

179-186: Return bytes from query function

Aligns with handler change.

-async fn query_s3_parquet_data(
+async fn query_s3_parquet_data(
     state: &AppState,
     positions: Vec<QueryPredictionsPositionPayload>,
-) -> Result<DataFrame, Error> {
+) -> Result<Vec<u8>, Error> {

152-167: Response claims JSON but sends display string; return Parquet bytes

Return bytes and correct headers (match equity).

-  match query_s3_parquet_data(&state, payload.positions).await {
-      Ok(dataframe) => {
-          let json_string = dataframe.to_string();
-          let mut response = Response::new(Body::from(json_string));
-          response
-              .headers_mut()
-              .insert(header::CONTENT_TYPE, "application/json".parse().unwrap());
-          *response.status_mut() = StatusCode::OK;
-          response
-      }
+  match query_s3_parquet_data(&state, payload.positions).await {
+      Ok(parquet_bytes) => {
+          let mut response = Response::new(Body::from(parquet_bytes));
+          response.headers_mut().insert(
+              header::CONTENT_TYPE,
+              "application/octet-stream".parse().unwrap(),
+          );
+          response.headers_mut().insert(
+              "Content-Disposition",
+              "attachment; filename=\"predictions.parquet\"".parse().unwrap(),
+          );
+          *response.status_mut() = StatusCode::OK;
+          response
+      }

213-230: Sanitize tickers and deduplicate S3 paths/tickers

Prevents SQL injection and redundant reads.

-  let mut s3_paths = Vec::new();
-  let mut tickers = Vec::new();
+  use std::collections::HashSet;
+  let mut s3_paths = Vec::new();
+  let mut s3_paths_set = HashSet::new();
+  let mut tickers = Vec::new();
+  let mut tickers_set = HashSet::new();
@@
-  s3_paths.push(s3_path);
-  tickers.push(position.ticker);
+  if s3_paths_set.insert(s3_path.clone()) {
+      s3_paths.push(s3_path);
+  }
+  let ticker = position.ticker.trim().to_uppercase();
+  if !ticker.chars().all(|c| c.is_ascii_alphanumeric() || c == '.' || c == '_' || c == '-') {
+      return Err(Error::OtherError(format!("Invalid ticker: {}", ticker)));
+  }
+  if tickers_set.insert(ticker.clone()) {
+      tickers.push(ticker);
+  }
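The sanitization suggested above can be tried in isolation. A std-only sketch that normalizes, validates the allowed character set, and deduplicates tickers while preserving order (the helper name is illustrative):

```rust
use std::collections::HashSet;

// Normalizes tickers (trim + uppercase), rejects characters outside the
// allowed set, and drops duplicates while keeping first-seen order.
fn sanitize_tickers(raw: &[&str]) -> Result<Vec<String>, String> {
    let mut seen = HashSet::new();
    let mut out = Vec::new();
    for t in raw {
        let ticker = t.trim().to_uppercase();
        let valid = !ticker.is_empty()
            && ticker
                .chars()
                .all(|c| c.is_ascii_alphanumeric() || matches!(c, '.' | '_' | '-'));
        if !valid {
            return Err(format!("Invalid ticker: {}", ticker));
        }
        if seen.insert(ticker.clone()) {
            out.push(ticker);
        }
    }
    Ok(out)
}

fn main() {
    let tickers = sanitize_tickers(&["aapl", " MSFT ", "AAPL"]).unwrap();
    println!("{:?}", tickers); // ["AAPL", "MSFT"]
}
```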
🧹 Nitpick comments (5)
applications/datamanager/src/routes/equity.rs (2)

185-213: Replace per‑day UNION with hive-partitioned glob

More resilient and faster; missing days won’t break queries.

-  let mut s3_paths = Vec::new();
-  let mut current_date = start_date;
-  while current_date <= end_date {
-      let year = current_date.format("%Y");
-      let month = current_date.format("%m");
-      let day = current_date.format("%d");
-      let s3_path = format!(
-          "s3://{}/equity/bars/daily/year={}/month={}/day={}/data.parquet",
-          state.bucket_name, year, month, day
-      );
-      s3_paths.push(s3_path);
-      current_date = current_date + chrono::Duration::days(1);
-  }
-  if s3_paths.is_empty() {
-      return Err("No files to query for the given date range".to_string());
-  }
-  info!("Querying {} S3 files", s3_paths.len());
-  let s3_paths_str = s3_paths
-      .iter()
-      .map(|path| format!("SELECT * FROM '{}'", path))
-      .collect::<Vec<_>>()
-      .join(" UNION ALL ");
+  let parquet_glob = format!(
+      "s3://{}/equity/bars/daily/year=*/month=*/day=*/data.parquet",
+      state.bucket_name
+  );
+  info!("Querying S3 parquet via glob with HIVE partitioning");
@@
-  let export_sql = format!(
-      "
-      COPY (
-          SELECT 
-              ticker,
-              timestamp,
-              open_price,
-              high_price,
-              low_price,
-              close_price,
-              volume,
-              volume_weighted_average_price,
-              transactions
-          FROM ({})
-          ORDER BY timestamp, ticker
-      ) TO '{}' (FORMAT PARQUET)
-      ",
-      s3_paths_str, temp_file
-  );
+  let export_sql = format!(
+      "
+      COPY (
+          SELECT 
+              ticker,
+              timestamp,
+              open_price,
+              high_price,
+              low_price,
+              close_price,
+              volume,
+              volume_weighted_average_price,
+              transactions
+          FROM read_parquet('{parquet_glob}', HIVE_PARTITIONING=1, UNION_BY_NAME=1)
+          WHERE MAKE_DATE(CAST(year AS INTEGER), CAST(month AS INTEGER), CAST(day AS INTEGER))
+                BETWEEN DATE '{start_date}' AND DATE '{end_date}'
+          ORDER BY timestamp, ticker
+      ) TO '{temp_file}' (FORMAT PARQUET)
+      "
+  );

Also applies to: 220-238


214-218: Prefer tempfile over manual /tmp path

Avoid collisions and path assumptions.

-  let temp_file = format!(
-      "/tmp/query_result_{}.parquet",
-      Utc::now().timestamp_micros()
-  );
+  let tmp = tempfile::NamedTempFile::new().map_err(|e| format!("tmpfile: {}", e))?;
+  let temp_file = tmp.path().to_string_lossy().to_string();
applications/datamanager/src/routes/prediction.rs (3)

112-116: Use tracing, not println!

Aligns with structured logging.

-        println!(
+        info!(
             "DataFrame successfully converted to parquet, size: {} bytes",
             buffer.len()
         );

239-258: Build ticker CTE and export to Parquet (avoid string‑built IN and row mapping)

Safer and consistent with equity path.

-  let tickers_query = tickers
-      .iter()
-      .map(|ticker| format!("'{}'", ticker))
-      .collect::<Vec<_>>()
-      .join(", ");
-
-  let query = format!(
-      "
-      SELECT
-          ticker,
-          timestamp,
-          quantile_10,
-          quantile_50,
-          quantile_90
-      FROM ({})
-      WHERE ticker IN ({})
-      ORDER BY timestamp, ticker
-      ",
-      s3_paths_query, tickers_query,
-  );
-
-  debug!("Executing export SQL: {}", query);
-
-  let mut statement = connection.prepare(&query)?;
-  let predictions_iterator = statement.query_map([], |row| {
-      Ok(Prediction { /* ... */ })
-  })?;
-  let predictions: Vec<Prediction> = predictions_iterator
-      .collect::<Result<Vec<_>, _>>()
-      .map_err(|e| Error::OtherError(format!("Failed to collect predictions: {}", e)))?;
-  df!( /* build DataFrame */ )
-      .map_err(|e| Error::OtherError(format!("Failed to create DataFrame: {}", e)))
+  let values = tickers
+      .iter()
+      .map(|t| format!("('{}')", t.replace('\'', "''")))
+      .collect::<Vec<_>>()
+      .join(", ");
+  let tickers_cte = format!("WITH tickers(ticker) AS (VALUES {})", values);
+
+  let temp_file = format!("/tmp/predictions_query_{}.parquet", Utc::now().timestamp_micros());
+  let export_sql = format!(
+      "
+      {tickers_cte}
+      COPY (
+          SELECT
+              s.ticker,
+              CAST(s.timestamp AS BIGINT) AS timestamp,
+              s.quantile_10,
+              s.quantile_50,
+              s.quantile_90
+          FROM ({s3_paths_query}) AS s
+          JOIN tickers USING (ticker)
+          ORDER BY timestamp, ticker
+      ) TO '{temp_file}' (FORMAT PARQUET)
+      "
+  );
+  debug!("Executing export SQL: {}", export_sql);
+  connection
+      .execute(&export_sql, [])
+      .map_err(|e| Error::OtherError(format!("Failed to execute parquet export: {}", e)))?;
+  let parquet_data = std::fs::read(&temp_file)
+      .map_err(|e| Error::OtherError(format!("Failed to read parquet file: {}", e)))?;
+  if let Err(e) = std::fs::remove_file(&temp_file) {
+      info!("Failed to clean up temp file {}: {}", temp_file, e);
+  }
+  Ok(parquet_data)

Also applies to: 260-286
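The quoting in the VALUES CTE above relies on the standard SQL escape of doubling single quotes. A std-only sketch of that quoting and of building the CTE (helper names are illustrative):

```rust
// Quotes a string as a SQL literal by doubling embedded single quotes.
fn sql_string_literal(s: &str) -> String {
    format!("'{}'", s.replace('\'', "''"))
}

// Builds a tickers(ticker) CTE from a list of (already validated) tickers.
fn values_cte(tickers: &[&str]) -> String {
    let rows = tickers
        .iter()
        .map(|t| format!("({})", sql_string_literal(t)))
        .collect::<Vec<_>>()
        .join(", ");
    format!("WITH tickers(ticker) AS (VALUES {})", rows)
}

fn main() {
    println!("{}", values_cte(&["AAPL", "BRK'B"]));
}
```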


231-238: Optional: switch to hive‑partitioned glob instead of per‑day UNION

Same benefits as equity route; filter by year/month/day derived from positions.

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between fd3eb64 and 9a21c69.

📒 Files selected for processing (4)
  • applications/datamanager/.claude/settings.local.json (1 hunks)
  • applications/datamanager/src/lib.rs (2 hunks)
  • applications/datamanager/src/routes/equity.rs (2 hunks)
  • applications/datamanager/src/routes/prediction.rs (1 hunks)
✅ Files skipped from review due to trivial changes (1)
  • applications/datamanager/.claude/settings.local.json
🚧 Files skipped from review as they are similar to previous changes (1)
  • applications/datamanager/src/lib.rs
🧰 Additional context used
🧠 Learnings (1)
📚 Learning: 2025-09-16T20:20:16.281Z
Learnt from: forstmeier
PR: pocketsizefund/pocketsizefund#624
File: applications/datamanager/src/routes/equity.rs:6-6
Timestamp: 2025-09-16T20:20:16.281Z
Learning: User forstmeier prefers to defer switching from `Option<Json<DateRangeQuery>>` to `Option<Query<DateRangeQuery>>` for GET endpoints in the pocketsizefund datamanager Rust application, even when it's technically more correct for RESTful APIs.

Applied to files:

  • applications/datamanager/src/routes/equity.rs
🧬 Code graph analysis (1)
applications/datamanager/src/routes/prediction.rs (1)
applications/datamanager/src/routes/equity.rs (4)
  • upload_dataframe_to_s3 (57-109)
  • query_s3_parquet_data (141-257)
  • s3_paths (208-211)
  • router (398-402)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (2)
  • GitHub Check: Run Python quality checks
  • GitHub Check: Run Python tests

@forstmeier forstmeier merged commit e6507e1 into master Sep 17, 2025
3 checks passed
@forstmeier forstmeier deleted the datamanager-predictions-handlers branch September 17, 2025 01:40
@coderabbitai coderabbitai Bot mentioned this pull request Jan 20, 2026
