
Refactor datamanager module#635

Merged
forstmeier merged 4 commits into master from refactor-data-manager
Oct 12, 2025

Conversation

Collaborator

@forstmeier forstmeier commented Sep 23, 2025

Overview

Changes

  • move all files to flat hierarchy
  • remove unused mod.rs and lib.rs files
  • refactor various helper functions

Comments

This was to clean things up a bit and to consolidate logic. A few notes:

  1. I did not implement hive querying in DuckDB, but it might be something we can add later (especially for smaller data chunks like portfolios).
  2. I left off pandera-style validation on the DataFrames because it would have been extra work for the time being.
  3. The "querying" logic could be refactored further; I just don't want to do it right now.
  4. I left the "create connection" step for DuckDB in the individual handlers because there was an issue with the Clone trait being required for Axum's State; that could be a further tweak.
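
For context on item 4: axum's `State<T>` extractor requires `T: Clone`, and a raw database connection type typically is not Clone. A minimal std-only sketch of the usual workaround (type and field names here are illustrative, not the actual structs in this PR) wraps the non-Clone resource in `Arc` so that clones share one handle:

```rust
use std::sync::Arc;

// Stand-in for a non-Clone connection type (e.g. a DuckDB connection).
struct Connection;

// Deriving Clone works because every field is Clone; Arc<T> is Clone
// even when T itself is not, and cloning it only bumps a refcount.
#[derive(Clone)]
struct AppState {
    bucket_name: String,
    connection: Arc<Connection>,
}
```

Cloning the state then shares the single wrapped connection rather than duplicating it, which is why this pattern satisfies axum's Clone bound.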

Summary by CodeRabbit

  • New Features

    • Added HTTP endpoints: /health, /predictions (save/query), /portfolios (save/get), /equity-bars (sync/fetch).
    • S3-backed Parquet storage with query/read support and CSV/DataFrame payload handling.
    • New data contracts for equity bars, predictions, and portfolios.
  • Refactor

    • Consolidated routing into a single app router with shared state and simplified HTTP extractors.
    • Centralized error type for unified error messages.
  • Chores

    • Upgraded data-processing dependency and added URL-encoding support.

@forstmeier forstmeier added this to the Refactor milestone Sep 23, 2025
@forstmeier forstmeier self-assigned this Sep 23, 2025
Copilot AI review requested due to automatic review settings September 23, 2025 17:43
Contributor

coderabbitai Bot commented Sep 23, 2025

Note

Other AI code review bot(s) detected

CodeRabbit has detected other AI code review bot(s) in this pull request and will avoid duplicating their findings in the review comments. This may lead to a less comprehensive review.

Walkthrough

Refactors datamanager into modular Axum handlers and a storage layer: adds Polars-backed data models and DataFrame builders, centralizes errors, implements S3/Parquet writes and DuckDB/httpfs queries, introduces router and state changes, and adds endpoints for health, predictions, portfolios, and equity-bars; updates dependencies.

Changes

Cohort / File(s) Summary
Dependencies
applications/datamanager/Cargo.toml
Bump polars 0.50.0→0.51.0 (adds the strings feature, retains existing features), add urlencoding = "2.1.3", remove the [lib] target.
Data models & DataFrame builders
applications/datamanager/src/data.rs
New EquityBar, Prediction, Portfolio structs (Deserialize) and functions to build Polars DataFrames (uppercase ticker, prediction latest-timestamp-per-ticker filter).
Unified errors
applications/datamanager/src/errors.rs
New public Error enum (ThisError) with variants: DuckDB(DuckError), Credentials(CredentialsError), Polars(PolarsError), Other(String).
Storage layer (S3/Parquet/DuckDB)
applications/datamanager/src/storage.rs
New S3 write helpers (equity/portfolio/predictions), shared Parquet serialization, DuckDB+httpfs connection helper, and query functions returning Parquet bytes or Polars DataFrame; adds PredictionQuery.
HTTP handlers: predictions
applications/datamanager/src/predictions.rs
New endpoints: POST /predictions (save) and GET /predictions (query via URL-encoded JSON); uses storage and urlencoding.
HTTP handlers: portfolios
applications/datamanager/src/portfolios.rs
New endpoints: POST /portfolios (save) and GET /portfolios (get latest); accepts DataFrame payloads and delegates to storage.
HTTP handlers: equity bars
applications/datamanager/src/equity_bars.rs
Handlers converted to Axum extractors (AxumState, Query, Json), public DailySync and DateRangeParameters, and S3 I/O delegated to storage; local router removed.
Health endpoint
applications/datamanager/src/health.rs
New get_health handler returning HTTP 200 OK.
Router composition & main
applications/datamanager/src/router.rs, applications/datamanager/src/main.rs
New create_app() builds routes and attaches State; main uses router::create_app and logs serve errors.
State refactor
applications/datamanager/src/state.rs
AppState → State; client: Client → http_client: HTTPClient (alias for reqwest::Client); removed old routes wiring/router creation.
Route removals / cleanup
applications/datamanager/src/routes/*, applications/datamanager/src/mod.rs
Removed legacy routes modules and their router() functions (health, portfolio, prediction, equity) and adjusted imports.
CI tweak
.github/workflows/run_python_code_checks.yaml
Adjust coverage input path to tests/coverage_output/.python_coverage.xml.

Sequence Diagram(s)

sequenceDiagram
  autonumber
  actor Client
  participant Router as Axum Router
  participant SavePred as predictions::save
  participant Storage as storage::write_predictions_dataframe_to_s3
  participant S3 as S3

  Client->>Router: POST /predictions {dataframe, timestamp}
  Router->>SavePred: Json<SavePredictionsPayload>
  SavePred->>Storage: write_predictions_dataframe_to_s3(state, df, ts)
  Storage->>S3: PUT s3://.../predictions/YYYY/MM/DD/*.parquet
  S3-->>Storage: 200 OK (key)
  Storage-->>SavePred: Ok(key)
  SavePred-->>Client: 200 OK (rows, key) / 500 on error
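
The object key in the diagram above follows a date-partitioned layout. A small sketch of how such a key could be built (the prefix and zero-padding are assumptions based on the diagram's s3://.../predictions/YYYY/MM/DD/*.parquet path, not the PR's exact code):

```rust
// Build a date-partitioned S3 object key like "predictions/2025/09/23/batch.parquet".
// Zero-padding keeps keys lexicographically sortable by date.
fn predictions_key(year: u32, month: u32, day: u32, file_name: &str) -> String {
    format!("predictions/{:04}/{:02}/{:02}/{}", year, month, day, file_name)
}
```
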
sequenceDiagram
  autonumber
  actor Client
  participant Router as Axum Router
  participant FetchBars as equity_bars::fetch
  participant Storage as storage::query_equity_bars_parquet_from_s3
  participant DuckDB as DuckDB+httpfs
  participant S3 as S3

  Client->>Router: GET /equity-bars?start_date&end_date
  Router->>FetchBars: Query<DateRangeParameters>
  FetchBars->>Storage: query_equity_bars_parquet_from_s3(state, start, end)
  Storage->>DuckDB: init & configure httpfs/S3 creds
  DuckDB->>S3: read parquet(s)
  S3-->>DuckDB: parquet bytes
  DuckDB-->>Storage: parquet bytes
  Storage-->>FetchBars: Vec<u8>
  FetchBars-->>Client: 200 OK (parquet bytes) / 500 on error

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes

Suggested reviewers

  • chrisaddy

Poem

Thump-thump, I hop through routes anew,
Parquet crumbs and S3 dew.
Polars stacks and DuckDB bites,
Endpoints hum through restful nights.
Hooray — my data hops just right! 🥕

Pre-merge checks and finishing touches

❌ Failed checks (1 warning)
Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 33.33% which is insufficient. The required threshold is 80.00%. You can run @coderabbitai generate docstrings to improve docstring coverage.
✅ Passed checks (2 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title Check ✅ Passed The title "Refactor datamanager module" accurately and concisely summarizes the primary intent of the changeset — reorganizing and consolidating the datamanager code (flattening module hierarchy, removing old route modules, adding router/storage modules and related refactors) — and is clear enough for a teammate scanning PR history to understand the main change.


Contributor

Copilot AI left a comment


Pull Request Overview

This PR refactors the datamanager module by flattening the file structure and consolidating logic into dedicated modules. The refactoring moves away from a nested routes-based organization to a flat module hierarchy where each domain (equity_bars, predictions, portfolios) has its own module alongside shared utilities (storage, data, errors).

Key changes include:

  • Flattened file hierarchy by removing nested route modules
  • Consolidated storage operations into a centralized storage module
  • Extracted data structures and DataFrame creation logic into a data module
  • Updated dependencies including Polars version bump

Reviewed Changes

Copilot reviewed 16 out of 17 changed files in this pull request and generated 2 comments.

Show a summary per file
File Description
applications/datamanager/src/storage.rs New centralized storage module handling S3 operations and DuckDB queries
applications/datamanager/src/state.rs Simplified state structure with renamed fields and removed router creation
applications/datamanager/src/predictions.rs New predictions handler module replacing the old route-based structure
applications/datamanager/src/portfolios.rs New portfolios handler module replacing the old route-based structure
applications/datamanager/src/equity_bars.rs Refactored equity bars handler with consolidated storage operations
applications/datamanager/src/data.rs New data module containing struct definitions and DataFrame creation functions
applications/datamanager/src/router.rs New router module consolidating application routing
applications/datamanager/src/health.rs Simplified health check handler
applications/datamanager/src/errors.rs New centralized error handling module


Comment thread applications/datamanager/Cargo.toml Outdated
Comment thread applications/datamanager/src/storage.rs Outdated
Contributor

@coderabbitai coderabbitai Bot left a comment


Actionable comments posted: 13

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
applications/datamanager/src/equity_bars.rs (1)

152-155: 204 No Content should not include a body.

Return an empty body for 204 to comply with HTTP semantics.

Apply this diff:

-            return (
-                StatusCode::NO_CONTENT,
-                "No market data available for this date",
-            )
-                .into_response();
+            return StatusCode::NO_CONTENT.into_response();
🧹 Nitpick comments (14)
applications/datamanager/Cargo.toml (1)

11-12: Remove stale commented dependency config.

The commented axum line is noise; drop it to avoid confusion about features in use.

Apply:

-# axum = { version = "0.8.4", features = ["query"] }
 axum = "0.8.4"
applications/datamanager/src/portfolios.rs (1)

51-65: Fetching “today’s” portfolio only may be surprising.

get uses Utc::now(); callers can’t specify a date. Consider a query parameter for date or returning 204 when today’s file doesn’t exist.

Would you like a minimal change to accept an optional ?date=YYYY-MM-DD and return 204 if not found?

applications/datamanager/src/data.rs (3)

59-79: Avoid double collect; keep the pipeline lazy and collect once.

You collect into a DataFrame, then go lazy again to window/filter, and collect again. This does two materializations.

Apply this diff to keep it lazy and collect once:

-    let unfiltered_prediction_dataframe = prediction_dataframe
-        .lazy()
-        .with_columns([col("ticker").str().to_uppercase().alias("ticker")])
-        .collect()?;
-
-    // filtering necessary due to potentially overlapping tickers in predictions parquet files
-    let filtered_prediction_dataframe = unfiltered_prediction_dataframe
-        .lazy()
+    // filtering necessary due to potentially overlapping tickers in predictions parquet files
+    let filtered_prediction_dataframe = prediction_dataframe
+        .lazy()
+        .with_columns([col("ticker").str().to_uppercase().alias("ticker")])
         .with_columns([col("timestamp")
             .max()
             .over([col("ticker")])
             .alias("max_timestamp")])
         .filter(col("timestamp").eq(col("max_timestamp")))
         .select([
             col("ticker"),
             col("timestamp"),
             col("quantile_10"),
             col("quantile_50"),
             col("quantile_90"),
         ])
-        .collect()?;
+        .collect()
+        .map_err(|e| Error::Other(format!("Failed to filter predictions: {}", e)))?;
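
For readers unfamiliar with the window-then-filter step in the pipeline above, here is a std-only sketch of the same "keep only each ticker's latest timestamp" logic (rows simplified to tuples; the real code operates on a Polars DataFrame with window expressions):

```rust
use std::collections::HashMap;

// First pass records each ticker's maximum timestamp; second pass keeps
// only the rows carrying that maximum, preserving input order.
fn latest_per_ticker(rows: &[(&str, i64)]) -> Vec<(String, i64)> {
    let mut max_ts: HashMap<&str, i64> = HashMap::new();
    for &(ticker, ts) in rows {
        let entry = max_ts.entry(ticker).or_insert(ts);
        if ts > *entry {
            *entry = ts;
        }
    }
    rows.iter()
        .filter(|&&(ticker, ts)| max_ts[ticker] == ts)
        .map(|&(ticker, ts)| (ticker.to_string(), ts))
        .collect()
}
```
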

101-104: Same explicit error mapping for collect.

Apply this diff:

-        .collect()?;
+        .collect()
+        .map_err(|e| Error::Other(format!("Failed to normalize ticker: {}", e)))?;

49-57: Consider borrowing to avoid intermediate Vec allocations.

iter().map(...).collect::<Vec<_>>() is fine, but for large inputs you can use Series::new from slices/iterators to reduce allocations. Non-blocking.

applications/datamanager/src/equity_bars.rs (2)

63-71: Parquet response: set a precise content type.

Use application/x-parquet instead of generic octet-stream.

Apply this diff:

-                header::CONTENT_TYPE,
-                "application/octet-stream".parse().unwrap(),
+                header::CONTENT_TYPE,
+                "application/x-parquet".parse().unwrap(),

198-219: On S3 upload failure, avoid returning the full DataFrame in the response.

Large payloads may bloat responses and logs. Consider truncating or removing the DataFrame dump.

applications/datamanager/src/storage.rs (7)

82-83: Set accurate content type for parquet upload.

Apply this diff:

-        .content_type("application/octet-stream")
+        .content_type("application/x-parquet")

63-67: Use tracing instead of println!.

Apply this diff:

-                println!(
-                    "DataFrame successfully converted to parquet, size: {} bytes",
-                    buffer.len()
-                );
+                debug!(
+                    "DataFrame converted to parquet, size: {} bytes",
+                    buffer.len()
+                );

220-227: Avoid cloning inside finish; clone explicitly for clarity.

&mut dataframe.clone() is odd. Clone once and pass a named mutable.

Apply this diff:

-        writer
-            .finish(&mut equity_bars_dataframe?.clone())
+        let mut df = equity_bars_dataframe?;
+        writer
+            .finish(&mut df)

131-171: Guard against reversed date ranges.

If start_date > end_date, the loop won’t run and you’ll return “No files…” with an ambiguous message. Consider validating and returning a clearer error.
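
A minimal validation sketch, assuming ISO-8601 YYYY-MM-DD strings (the function name is illustrative; zero-padded dates in this format compare correctly as plain strings):

```rust
// Reject reversed ranges up front so the caller gets a clear error
// instead of an empty file list and an ambiguous "No files" message.
fn validate_range(start_date: &str, end_date: &str) -> Result<(), String> {
    if start_date > end_date {
        return Err(format!(
            "start_date {} is after end_date {}",
            start_date, end_date
        ));
    }
    Ok(())
}
```
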


40-95: Compression for parquet uploads (optional).

Consider enabling ZSTD or SNAPPY to reduce S3 storage and egress. Non-blocking.


97-129: DuckDB AWS config: guard against missing/invalid credentials.

You already error on missing provider; also consider logging the configured region and avoiding empty session token var if not needed. Non-blocking.


1-15: Nit: module imports.

If serde::Deserialize is only used for PredictionQuery here, consider keeping data model derives in their own modules to reduce surface. Non-blocking.

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 7d0be02 and d169b3f.

⛔ Files ignored due to path filters (1)
  • Cargo.lock is excluded by !**/*.lock
📒 Files selected for processing (16)
  • applications/datamanager/Cargo.toml (1 hunks)
  • applications/datamanager/src/data.rs (1 hunks)
  • applications/datamanager/src/equity_bars.rs (6 hunks)
  • applications/datamanager/src/errors.rs (1 hunks)
  • applications/datamanager/src/health.rs (1 hunks)
  • applications/datamanager/src/main.rs (1 hunks)
  • applications/datamanager/src/mod.rs (0 hunks)
  • applications/datamanager/src/portfolios.rs (1 hunks)
  • applications/datamanager/src/predictions.rs (1 hunks)
  • applications/datamanager/src/router.rs (1 hunks)
  • applications/datamanager/src/routes/health.rs (0 hunks)
  • applications/datamanager/src/routes/mod.rs (0 hunks)
  • applications/datamanager/src/routes/portfolio.rs (0 hunks)
  • applications/datamanager/src/routes/prediction.rs (0 hunks)
  • applications/datamanager/src/state.rs (3 hunks)
  • applications/datamanager/src/storage.rs (1 hunks)
💤 Files with no reviewable changes (5)
  • applications/datamanager/src/routes/prediction.rs
  • applications/datamanager/src/routes/mod.rs
  • applications/datamanager/src/routes/health.rs
  • applications/datamanager/src/routes/portfolio.rs
  • applications/datamanager/src/mod.rs
🧰 Additional context used
🧬 Code graph analysis (10)
applications/datamanager/src/router.rs (6)
applications/datamanager/src/portfolios.rs (2)
  • get (51-65)
  • save (19-49)
applications/datamanager/src/state.rs (1)
  • from_env (27-57)
applications/datamanager/src/health.rs (1)
  • get_health (3-5)
applications/datamanager/src/predictions.rs (2)
  • save (28-58)
  • query (60-99)
applications/datamanager/src/equity_bars.rs (2)
  • sync (87-227)
  • fetch (52-85)
applications/datamanager/src/lib.rs (1)
  • create_app (68-78)
applications/datamanager/src/health.rs (3)
applications/datamanager/src/routes/health.rs (1)
  • check (6-8)
applications/datamanager/src/routes/mod.rs (1)
  • health (2-2)
applications/datamanager/src/lib.rs (1)
  • create_app (68-78)
applications/datamanager/src/errors.rs (2)
applications/datamanager/src/routes/portfolio.rs (3)
  • Error (21-28)
  • Error (160-160)
  • e (230-230)
applications/datamanager/src/routes/prediction.rs (4)
  • Error (22-29)
  • Error (184-184)
  • e (279-279)
  • e (270-270)
applications/datamanager/src/state.rs (3)
applications/datamanager/src/lib.rs (4)
  • AppState (26-32)
  • routes (6-6)
  • create_app (68-78)
  • AppState (34-66)
applications/datamanager/src/routes/portfolio.rs (1)
  • router (233-237)
applications/datamanager/src/routes/prediction.rs (1)
  • router (282-286)
applications/datamanager/src/portfolios.rs (1)
applications/datamanager/src/storage.rs (2)
  • query_portfolio_dataframe_from_s3 (315-362)
  • write_portfolio_dataframe_to_s3 (24-30)
applications/datamanager/src/predictions.rs (1)
applications/datamanager/src/storage.rs (2)
  • query_predictions_dataframe_from_s3 (240-313)
  • write_predictions_dataframe_to_s3 (32-38)
applications/datamanager/src/data.rs (2)
applications/datamanager/src/routes/portfolio.rs (3)
  • Portfolio (37-42)
  • query_s3_parquet_data (149-231)
  • SavePortfolioPayload (31-34)
applications/datamanager/src/routes/prediction.rs (3)
  • query_s3_parquet_data (173-280)
  • Prediction (51-57)
  • SavePredictionsPayload (32-35)
applications/datamanager/src/main.rs (2)
applications/datamanager/src/router.rs (1)
  • create_app (12-25)
applications/datamanager/src/lib.rs (2)
  • create_app (68-78)
  • AppState (26-32)
applications/datamanager/src/equity_bars.rs (2)
applications/datamanager/src/storage.rs (2)
  • query_equity_bars_parquet_from_s3 (131-232)
  • write_equity_bars_dataframe_to_s3 (16-22)
applications/datamanager/src/lib.rs (1)
  • AppState (26-32)
applications/datamanager/src/storage.rs (2)
applications/datamanager/src/data.rs (3)
  • create_equity_bar_dataframe (18-38)
  • create_portfolio_dataframe (92-107)
  • create_predictions_dataframe (49-82)
applications/datamanager/src/predictions.rs (1)
  • query (60-99)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (2)
  • GitHub Check: Run Rust code checks
  • GitHub Check: Run Rust code checks
🔇 Additional comments (7)
applications/datamanager/src/health.rs (1)

3-5: LGTM: simple, correct health handler.

Returns 200 OK and composes cleanly with axum’s IntoResponse.

applications/datamanager/src/main.rs (1)

1-11: Module reorg and router extraction look good.

Using router::create_app keeps main lean and aligns with the new module layout.

applications/datamanager/src/state.rs (2)

17-24: State rename and HTTP client field are coherent.

http_client via alias and Clone across nested types aligns with axum state requirements.


26-39: LGTM: environment construction.

Sane defaults, strict env for secrets, and 10s HTTP timeout are reasonable.

applications/datamanager/src/errors.rs (1)

6-16: Unified error type looks solid.

Covers DuckDB, AWS credential, and Polars errors with #[from] for ergonomic propagation.

applications/datamanager/Cargo.toml (1)

26-26: Approve: urlencoding addition is OK — no duplicate decoder found.
rg shows a single call to decode(&parameters.data) at applications/datamanager/src/predictions.rs:66; no other decode(...) usages in Rust sources.
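
That single decode call unwraps a percent-encoded JSON query string before parsing. As a std-only illustration of what the decoding step does (a simplified sketch, not the urlencoding crate's implementation: it handles ASCII %XX escapes only and skips '+'-as-space handling):

```rust
// Decode %XX escape sequences; returns None on malformed escapes
// or if the decoded bytes are not valid UTF-8.
fn percent_decode(input: &str) -> Option<String> {
    let bytes = input.as_bytes();
    let mut out = Vec::with_capacity(bytes.len());
    let mut i = 0;
    while i < bytes.len() {
        if bytes[i] == b'%' {
            let hex = input.get(i + 1..i + 3)?;
            out.push(u8::from_str_radix(hex, 16).ok()?);
            i += 3;
        } else {
            out.push(bytes[i]);
            i += 1;
        }
    }
    String::from_utf8(out).ok()
}
```

For example, the URL-encoded form of {"tickers":["AAPL"]} decodes back to that JSON string, which the handler can then deserialize.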

applications/datamanager/src/router.rs (1)

16-22: axum 0.8 merges route methods — current code is safe; optional: consolidate for clarity

Router::route adds methods to an existing MethodRouter when called multiple times for the same path (docs.rs), so the routes in applications/datamanager/src/router.rs (lines 16–22) will not drop any methods; consolidating to .route(path, get(...).post(...)) is recommended purely for readability.

Likely an incorrect or invalid review comment.

Comment thread applications/datamanager/Cargo.toml
Comment thread applications/datamanager/src/data.rs
Comment thread applications/datamanager/src/data.rs
Comment thread applications/datamanager/src/equity_bars.rs Outdated
Comment thread applications/datamanager/src/portfolios.rs
Comment thread applications/datamanager/src/storage.rs Outdated
Comment thread applications/datamanager/src/storage.rs
Comment thread applications/datamanager/src/storage.rs
Comment thread applications/datamanager/src/storage.rs
Comment thread applications/datamanager/src/storage.rs Outdated
Contributor

@coderabbitai coderabbitai Bot left a comment


Actionable comments posted: 2

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (3)
applications/datamanager/src/equity_bars.rs (3)

25-37: Polygon result numeric types should be f64 (not u64).

Polygon’s o/h/l/c/vw are floats; deserializing as u64 will fail on decimals or truncate values.

Apply:

 #[derive(Deserialize, Debug)]
 struct BarResult {
     #[serde(rename = "T")]
     ticker: String,
-    c: Option<u64>,
-    h: Option<u64>,
-    l: Option<u64>,
+    c: Option<f64>,
+    h: Option<f64>,
+    l: Option<f64>,
     n: Option<u64>,
-    o: Option<u64>,
+    o: Option<f64>,
     t: u64,
     v: Option<u64>,
-    vw: Option<u64>,
+    vw: Option<f64>,
 }

And update the below mappings accordingly (see next comment).
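
A quick std-only demonstration of why the field types matter: a decimal string fails to parse as u64 but parses fine as f64, which is the same mismatch serde hits when deserializing Polygon's float-valued fields into Option<u64> (helper names here are illustrative):

```rust
// A price like "412.63" cannot be represented as an unsigned integer.
fn parse_as_u64(raw: &str) -> Option<u64> {
    raw.parse::<u64>().ok()
}

// The same value parses cleanly as a float.
fn parse_as_f64(raw: &str) -> Option<f64> {
    raw.parse::<f64>().ok()
}
```
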


167-174: Stop casting ints to f64; use the corrected f64 fields directly.

Apply:

-    let volumes: Vec<Option<u64>> = bars.iter().map(|b| b.v).collect();
-    let vw_prices: Vec<Option<f64>> = bars.iter().map(|b| b.vw.map(|vw| vw as f64)).collect();
-    let open_prices: Vec<Option<f64>> = bars.iter().map(|b| b.o.map(|o| o as f64)).collect();
-    let close_prices: Vec<Option<f64>> = bars.iter().map(|b| b.c.map(|c| c as f64)).collect();
-    let high_prices: Vec<Option<f64>> = bars.iter().map(|b| b.h.map(|h| h as f64)).collect();
-    let low_prices: Vec<Option<f64>> = bars.iter().map(|b| b.l.map(|l| l as f64)).collect();
+    let volumes: Vec<Option<u64>> = bars.iter().map(|b| b.v).collect();
+    let vw_prices: Vec<Option<f64>> = bars.iter().map(|b| b.vw).collect();
+    let open_prices: Vec<Option<f64>> = bars.iter().map(|b| b.o).collect();
+    let close_prices: Vec<Option<f64>> = bars.iter().map(|b| b.c).collect();
+    let high_prices: Vec<Option<f64>> = bars.iter().map(|b| b.h).collect();
+    let low_prices: Vec<Option<f64>> = bars.iter().map(|b| b.l).collect();

39-50: Remove unused PolygonResponse struct.

It’s defined but not used; delete to reduce noise.

🧹 Nitpick comments (4)
applications/datamanager/src/main.rs (1)

18-21: Tighten default log filter (avoid “example=debug”).

The fallback filter targets “example”, likely a leftover. Prefer crate-specific or generic levels.

Example:

-                .unwrap_or_else(|_| "example=debug,tower_http=debug,axum=debug".into()),
+                .unwrap_or_else(|_| "datamanager=info,tower_http=info,axum=info".into()),
applications/datamanager/src/equity_bars.rs (3)

63-71: Use a Parquet-specific content type.

application/octet-stream works, but application/x-parquet is clearer for clients.

-                header::CONTENT_TYPE,
-                "application/octet-stream".parse().unwrap(),
+                header::CONTENT_TYPE,
+                "application/x-parquet".parse().unwrap(),

77-83: Return 204 when no files exist for the date range.

Map “no files” to NO_CONTENT instead of 500 to better reflect the condition.

Example:

-        Err(err) => {
-            info!("Failed to query S3 data: {}", err);
-            (
-                StatusCode::INTERNAL_SERVER_ERROR,
-                format!("Query failed: {}", err),
-            )
-                .into_response()
-        }
+        Err(err) => {
+            let msg = err.to_string();
+            if msg.contains("No files to query for the given date range") {
+                (StatusCode::NO_CONTENT, "No data").into_response()
+            } else {
+                info!("Failed to query S3 data: {}", msg);
+                (StatusCode::INTERNAL_SERVER_ERROR, format!("Query failed: {}", msg)).into_response()
+            }
+        }

Prefer defining a dedicated Error::NoData and matching on it when feasible.


209-216: Avoid echoing the entire DataFrame in error responses.

Prevents oversized responses and potential data leakage.

-                    let json_output = df.to_string();
-                    (
-                        StatusCode::BAD_GATEWAY,
-                        format!(
-                            "DataFrame created but S3 upload failed: {}\n\n{}",
-                            err, json_output
-                        ),
-                    )
+                    (
+                        StatusCode::BAD_GATEWAY,
+                        format!("DataFrame created but S3 upload failed: {}", err),
+                    )
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between d169b3f and 726b7fd.

⛔ Files ignored due to path filters (1)
  • Cargo.lock is excluded by !**/*.lock
📒 Files selected for processing (8)
  • applications/datamanager/Cargo.toml (1 hunks)
  • applications/datamanager/src/data.rs (1 hunks)
  • applications/datamanager/src/equity_bars.rs (5 hunks)
  • applications/datamanager/src/main.rs (2 hunks)
  • applications/datamanager/src/portfolios.rs (1 hunks)
  • applications/datamanager/src/predictions.rs (1 hunks)
  • applications/datamanager/src/router.rs (1 hunks)
  • applications/datamanager/src/storage.rs (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (4)
  • applications/datamanager/src/router.rs
  • applications/datamanager/src/portfolios.rs
  • applications/datamanager/src/storage.rs
  • applications/datamanager/Cargo.toml
🧰 Additional context used
🧠 Learnings (1)
📚 Learning: 2025-09-24T00:54:42.437Z
Learnt from: forstmeier
PR: pocketsizefund/pocketsizefund#635
File: applications/datamanager/src/predictions.rs:60-99
Timestamp: 2025-09-24T00:54:42.437Z
Learning: In applications/datamanager/src/predictions.rs, there is a known SQL injection vulnerability where user-supplied tickers from predictions_query flow to storage::query_predictions_dataframe_from_s3 without validation, and the storage function builds SQL queries via string concatenation. The user forstmeier acknowledged this issue but chose to skip fixing it in PR #635, deferring it for later.

Applied to files:

  • applications/datamanager/src/predictions.rs
🧬 Code graph analysis (4)
applications/datamanager/src/data.rs (3)
applications/datamanager/src/routes/portfolio.rs (2)
  • Portfolio (37-42)
  • query_s3_parquet_data (149-231)
applications/datamanager/src/routes/prediction.rs (2)
  • Prediction (51-57)
  • query_s3_parquet_data (173-280)
applications/datamanager/src/routes/equity.rs (1)
  • BarResult (32-43)
applications/datamanager/src/equity_bars.rs (1)
applications/datamanager/src/storage.rs (2)
  • query_equity_bars_parquet_from_s3 (131-232)
  • write_equity_bars_dataframe_to_s3 (16-22)
applications/datamanager/src/main.rs (2)
applications/datamanager/src/router.rs (1)
  • create_app (12-25)
applications/datamanager/src/lib.rs (2)
  • create_app (68-78)
  • AppState (26-32)
applications/datamanager/src/predictions.rs (1)
applications/datamanager/src/storage.rs (2)
  • query_predictions_dataframe_from_s3 (240-317)
  • write_predictions_dataframe_to_s3 (32-38)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (4)
  • GitHub Check: Run Rust code checks
  • GitHub Check: Run Python code checks
  • GitHub Check: Run Python code checks
  • GitHub Check: Run Rust code checks
🔇 Additional comments (3)
applications/datamanager/src/predictions.rs (1)

86-96: SQL injection risk via unvalidated tickers (known, deferred).

Tickers flow to a string‑concatenated IN (…) in storage; validate tickers and parameterize the query when you pick this back up.

If helpful later, I can draft a patch that adds ticker validation here and placeholder binding (params_from_iter) in storage.
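
A sketch of what such ticker validation could look like (the allowlist rules and length cap are assumptions; real symbol sets may need other characters, and parameter binding remains the proper fix):

```rust
// Allowlist check: uppercase ASCII letters plus '.' (e.g. "BRK.B"),
// non-empty, bounded length. Rejects SQL metacharacters outright.
fn is_valid_ticker(ticker: &str) -> bool {
    !ticker.is_empty()
        && ticker.len() <= 10
        && ticker.chars().all(|c| c.is_ascii_uppercase() || c == '.')
}
```

Validation like this narrows the attack surface but does not replace binding the tickers as query parameters.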

applications/datamanager/src/data.rs (2)

9-16: Fix schema to match persisted Parquet (floats and nullables).

Prices and VWAP should be Option<f64>, timestamp i64; volume/transactions Option<u64>. Current u64s will break on NULLs/floats.

Apply:

 pub struct EquityBar {
     pub ticker: String,
     pub timestamp: i64,
-    pub open_price: Option<u64>,
-    pub high_price: Option<u64>,
-    pub low_price: Option<u64>,
-    pub close_price: Option<u64>,
+    pub open_price: Option<f64>,
+    pub high_price: Option<f64>,
+    pub low_price: Option<f64>,
+    pub close_price: Option<f64>,
     pub volume: Option<u64>,
-    pub volume_weighted_average_price: Option<u64>,
+    pub volume_weighted_average_price: Option<f64>,
     pub transactions: Option<u64>,
 }

32-35: Map Polars .collect() errors into crate Error explicitly.

Avoid relying on implicit From; improve error messages and ensure compilation.

Apply:

-        .collect()?;
+        .collect()
+        .map_err(|e| Error::Other(format!("Failed to uppercase equity tickers: {}", e)))?;
-        .collect()?;
+        .collect()
+        .map_err(|e| Error::Other(format!("Failed to uppercase prediction tickers: {}", e)))?;
-        .collect()?;
+        .collect()
+        .map_err(|e| Error::Other(format!("Failed to filter predictions by latest timestamp: {}", e)))?;
-        .collect()?;
+        .collect()
+        .map_err(|e| Error::Other(format!("Failed to uppercase portfolio tickers: {}", e)))?;

Also applies to: 59-63, 65-79, 101-104

Comment thread applications/datamanager/src/equity_bars.rs
Comment thread applications/datamanager/src/main.rs Outdated
Copilot AI review requested due to automatic review settings September 24, 2025 02:15
Contributor

Copilot AI left a comment


Pull Request Overview

Copilot reviewed 17 out of 18 changed files in this pull request and generated 2 comments.




@coderabbitai coderabbitai Bot left a comment


Actionable comments posted: 0

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (3)
applications/datamanager/src/equity_bars.rs (3)

25-37: Fix Polygon JSON types: use f64 for prices/vw and i64 for timestamp

The current `Option<u64>` for o/h/l/c/vw will fail on decimal JSON and silently truncate; `t` should be `i64`. Update the `BarResult` types.

-#[derive(Deserialize, Debug)]
-struct BarResult {
-    #[serde(rename = "T")]
-    ticker: String,
-    c: Option<u64>,
-    h: Option<u64>,
-    l: Option<u64>,
-    n: Option<u64>,
-    o: Option<u64>,
-    t: u64,
-    v: Option<u64>,
-    vw: Option<u64>,
-}
+#[derive(Deserialize, Debug)]
+struct BarResult {
+    #[serde(rename = "T")]
+    ticker: String,
+    c: Option<f64>,
+    h: Option<f64>,
+    l: Option<f64>,
+    n: Option<u64>,
+    o: Option<f64>,
+    t: i64,
+    v: Option<u64>,
+    vw: Option<f64>,
+}

Also update the downstream mappings (see next comment).


168-176: Remove lossy casts; map floats directly

After fixing BarResult types, drop u64→f64 casts and use values directly. Also drop the as i64 cast for timestamps.

-    let vw_prices: Vec<Option<f64>> = bars.iter().map(|b| b.vw.map(|vw| vw as f64)).collect();
-    let open_prices: Vec<Option<f64>> = bars.iter().map(|b| b.o.map(|o| o as f64)).collect();
-    let close_prices: Vec<Option<f64>> = bars.iter().map(|b| b.c.map(|c| c as f64)).collect();
-    let high_prices: Vec<Option<f64>> = bars.iter().map(|b| b.h.map(|h| h as f64)).collect();
-    let low_prices: Vec<Option<f64>> = bars.iter().map(|b| b.l.map(|l| l as f64)).collect();
-    let timestamps: Vec<i64> = bars.iter().map(|b| b.t as i64).collect();
+    let vw_prices: Vec<Option<f64>> = bars.iter().map(|b| b.vw).collect();
+    let open_prices: Vec<Option<f64>> = bars.iter().map(|b| b.o).collect();
+    let close_prices: Vec<Option<f64>> = bars.iter().map(|b| b.c).collect();
+    let high_prices: Vec<Option<f64>> = bars.iter().map(|b| b.h).collect();
+    let low_prices: Vec<Option<f64>> = bars.iter().map(|b| b.l).collect();
+    let timestamps: Vec<i64> = bars.iter().map(|b| b.t).collect();

98-116: Add a connect timeout and retries/backoff to the reqwest client used for Polygon calls

  • state.rs currently builds the client with .timeout(Duration::from_secs(10)) (overall request timeout) — add .connect_timeout(Duration::from_secs(2–5)) and consider pool settings as needed.
  • Add retry with exponential backoff for Polygon requests (use reqwest_middleware + a retry policy or implement retries around the .send() in equity_bars.rs).

Location: applications/datamanager/src/state.rs:28–31 (client builder); callsite: applications/datamanager/src/equity_bars.rs:98–103.
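A minimal stdlib-only sketch of the suggested retry-with-exponential-backoff loop (the `send_request` function is a stand-in for the real reqwest call, not the project's API; a production version would use reqwest_middleware or similar):

```rust
use std::thread::sleep;
use std::time::Duration;

// Stand-in for the actual HTTP request; fails on the first two attempts.
fn send_request(attempt: u32) -> Result<&'static str, &'static str> {
    if attempt < 3 { Err("transient network error") } else { Ok("200 OK") }
}

fn fetch_with_retries(max_retries: u32) -> Result<&'static str, &'static str> {
    let mut backoff = Duration::from_millis(10); // kept short for the example
    for attempt in 1..=max_retries {
        match send_request(attempt) {
            Ok(body) => return Ok(body),
            Err(_) if attempt < max_retries => {
                sleep(backoff);
                backoff *= 2; // exponential backoff between attempts
            }
            Err(e) => return Err(e),
        }
    }
    unreachable!("loop always returns when max_retries >= 1")
}

fn main() {
    let body = fetch_with_retries(5).expect("should succeed on the 3rd attempt");
    assert_eq!(body, "200 OK");
    println!("{body}");
}
```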

🧹 Nitpick comments (3)
applications/datamanager/src/equity_bars.rs (3)

52-85: Use error! for failures and set a more specific content type

  • Log failures with error! to aid alerting.
  • Consider application/x-parquet (or application/vnd.apache.parquet) for clarity.
-        Err(err) => {
-            info!("Failed to query S3 data: {}", err);
+        Err(err) => {
+            error!("Failed to query S3 data: {}", err);
             (
                 StatusCode::INTERNAL_SERVER_ERROR,
                 format!("Query failed: {}", err),
             )
                 .into_response()
         }
-            response.headers_mut().insert(
-                header::CONTENT_TYPE,
-                "application/octet-stream".parse().unwrap(),
-            );
+            response.headers_mut().insert(
+                header::CONTENT_TYPE,
+                "application/x-parquet".parse().unwrap(),
+            );

118-134: Promote error logs to error! for failure paths

Upgrade these info! logs to error! to reflect failures and improve signal.

-            info!("Failed to read response text: {}", err);
+            error!("Failed to read response text: {}", err);
-            info!("API request failed: {}", err);
+            error!("API request failed: {}", err);
-            info!("Failed to parse JSON response: {}", err);
+            error!("Failed to parse JSON response: {}", err);
-            info!("No results field found in response");
+            warn!("No results field found in response");
-            info!("Failed to parse results into BarResult structs: {}", err);
+            error!("Failed to parse results into BarResult structs: {}", err);

Also applies to: 139-139, 151-151, 163-163


210-219: Avoid returning full DataFrame text on errors

This can be huge. Return a small preview to cap payload size.

-                    let json_output = df.to_string();
+                    let preview_rows = std::cmp::min(df.height(), 50);
+                    let json_output = df.head(Some(preview_rows)).to_string();
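The row-capping idea can be approximated with stdlib string handling as a hedged sketch (the real fix above uses Polars `df.head`; `preview` here is an illustrative helper, not project code):

```rust
// Cap an error payload to the first `max_lines` lines so responses stay small.
fn preview(text: &str, max_lines: usize) -> String {
    text.lines().take(max_lines).collect::<Vec<_>>().join("\n")
}

fn main() {
    let big_output = (0..1000)
        .map(|i| format!("row {i}"))
        .collect::<Vec<_>>()
        .join("\n");
    let small = preview(&big_output, 50);
    assert_eq!(small.lines().count(), 50);
    assert!(small.starts_with("row 0"));
    println!("last preview line: {}", small.lines().last().unwrap());
}
```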
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 726b7fd and 3a514ba.

📒 Files selected for processing (3)
  • .github/workflows/run_python_code_checks.yaml (1 hunks)
  • applications/datamanager/src/equity_bars.rs (5 hunks)
  • applications/datamanager/src/main.rs (2 hunks)
✅ Files skipped from review due to trivial changes (1)
  • .github/workflows/run_python_code_checks.yaml
🧰 Additional context used
🧬 Code graph analysis (2)
applications/datamanager/src/main.rs (2)
applications/datamanager/src/router.rs (1)
  • create_app (12-25)
applications/datamanager/src/lib.rs (2)
  • create_app (68-78)
  • AppState (26-32)
applications/datamanager/src/equity_bars.rs (1)
applications/datamanager/src/storage.rs (2)
  • query_equity_bars_parquet_from_s3 (131-232)
  • write_equity_bars_dataframe_to_s3 (16-22)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (4)
  • GitHub Check: Run Rust code checks
  • GitHub Check: Run Python code checks
  • GitHub Check: Run Rust code checks
  • GitHub Check: Run Python code checks
🔇 Additional comments (5)
applications/datamanager/src/main.rs (3)

1-9: Module consolidation LGTM

Flat module layout and explicit mods look good.


11-11: Router factory adoption LGTM

Using router::create_app centralizes route wiring appropriately.


27-30: Good: don’t swallow server errors; exit non‑zero

This addresses the prior review’s concern.

applications/datamanager/src/equity_bars.rs (2)

87-96: Date formatting and label fix LGTM

Correct YYYY‑MM‑DD formatting and clearer log message.


19-23: Minor: summary says fields are public; code has private fields

Code is fine as-is (serde can set private fields), but note the discrepancy.

If external crates need to construct DateRangeParameters, consider making fields pub; otherwise keep them private to maintain control.

@forstmeier
Collaborator Author

@chrisaddy this is ready for review.

@github-actions

github-actions Bot commented Oct 2, 2025

Marked stale due to inactivity. Remove stale label or comment or this will be closed.

@github-actions github-actions Bot added the stale Old issue or pull request label Oct 2, 2025
@github-actions

github-actions Bot commented Oct 9, 2025

Closed due to inactivity.

@github-actions github-actions Bot closed this Oct 9, 2025
@forstmeier forstmeier removed the stale Old issue or pull request label Oct 9, 2025
@forstmeier forstmeier reopened this Oct 9, 2025
@forstmeier forstmeier merged commit ce58169 into master Oct 12, 2025
9 checks passed
@forstmeier forstmeier deleted the refactor-data-manager branch October 12, 2025 00:58
@coderabbitai coderabbitai Bot mentioned this pull request Dec 7, 2025
This was referenced Jan 14, 2026
