Conversation
|
Note Other AI code review bot(s) detectedCodeRabbit has detected other AI code review bot(s) in this pull request and will avoid duplicating their findings in the review comments. This may lead to a less comprehensive review. WalkthroughAdds a new Rust datamanager service (Axum) with health and equity routes that ingest Polygon data, build Parquet via Polars, and upload to S3; introduces Rust/Cargo tooling and Bacon jobs, updates .gitignore and flox manifest, expands local Claude permissions, and removes a Python FastAPI module alias. Changes
Sequence Diagram(s)sequenceDiagram
autonumber
actor Client
participant Axum as Axum Router
participant Equity as Equity Handler
participant Polygon as Polygon API
participant Polars as Polars
participant S3 as AWS S3
Client->>Axum: POST /equity { date }
Axum->>Equity: sync(date, state)
Equity->>Polygon: GET /v2/aggs/grouped/.../{date}?adjusted=true&apiKey=...
Polygon-->>Equity: 200 JSON { results: [...] } / error
alt results present
Equity->>Polars: Build DataFrame from results
Parallels-->>Equity: DataFrame
Equity->>Equity: Serialize DataFrame -> Parquet (in-memory)
Equity->>S3: PutObject(bucket, key=equity/bars/daily/year=.../data.parquet, bytes)
S3-->>Equity: PutObject OK
Equity-->>Axum: 200 OK { rows, s3_key }
else API/parse/df/s3 error
Equity-->>Axum: 4xx/5xx/502 with error detail
end
Axum-->>Client: HTTP response
Estimated code review effort🎯 4 (Complex) | ⏱️ ~60 minutes Possibly related PRs
Poem
Pre-merge checks and finishing touches❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
✨ Finishing touches
🧪 Generate unit tests
Tip 👮 Agentic pre-merge checks are now available in preview!Pro plan users can now enable pre-merge checks in their settings to enforce checklists before merging PRs.
Please see the documentation for more information. Example: reviews:
pre_merge_checks:
custom_checks:
- name: "Undocumented Breaking Changes"
mode: "warning"
instructions: |
Pass/fail criteria: All breaking changes to public APIs, CLI flags, environment variables, configuration keys, database schemas, or HTTP/GraphQL endpoints must be documented in the "Breaking Change" section of the PR description and in CHANGELOG.md. Exclude purely internal or private changes (e.g., code not exported from package entry points or explicitly marked as internal).Please share your feedback with us on this Discord post. Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
This stack of pull requests is managed by Graphite. Learn more about stacking. |
c267986 to
a00c7c5
Compare
0e4eb21 to
328f960
Compare
There was a problem hiding this comment.
Pull Request Overview
This PR ports the data manager application from Python to Rust, creating a new Axum-based web service for handling equity data operations.
- Implementation of Rust-based data manager with equity data fetching and health check endpoints
- Integration with Polygon API for equity data retrieval and AWS S3 for data storage
- Replacement of Python FastAPI service with equivalent Rust functionality
Reviewed Changes
Copilot reviewed 10 out of 14 changed files in this pull request and generated 2 comments.
Show a summary per file
| File | Description |
|---|---|
| applications/datamanager/src/main.rs | Main application entry point with server setup and configuration |
| applications/datamanager/src/routes/equity.rs | Equity data processing endpoint with Polygon API integration and S3 upload |
| applications/datamanager/src/routes/health.rs | Simple health check endpoint implementation |
| applications/datamanager/src/routes/mod.rs | Module declarations for routing |
| applications/datamanager/Cargo.toml | Rust project dependencies and configuration |
| applications/datamanager/bacon.toml | Development tool configuration for Bacon |
| applications/datamanager/src/mod.rs | Module structure setup |
| applications/datamanager/src/datamanager/main.py | Removal of old Python application code |
| applications/datamanager/.claude/settings.local.json | Updated development permissions |
| .flox/env/manifest.toml | Updated development environment with Rust tools |
Tip: Customize your code reviews with copilot-instructions.md. Create the file or learn how to get started.
Graphite Automations"Assign author to pull request" took an action on this PR • (09/04/25)1 assignee was added to this PR based on John Forstmeier's automation. |
There was a problem hiding this comment.
Actionable comments posted: 2
🧹 Nitpick comments (16)
applications/datamanager/src/routes/health.rs (1)
6-8: Simplify signature and optionally add no-store.Returning StatusCode directly (impl IntoResponse) avoids the explicit Response type, and adding Cache-Control: no-store prevents proxy caching of health checks.
-pub async fn check() -> Response { - (StatusCode::OK).into_response() -} +pub async fn check() -> impl IntoResponse { + // Prevent intermediaries from caching liveness results + (StatusCode::OK, [("Cache-Control", "no-store")]) +}.flox/env/manifest.toml (1)
16-19: Approve: bacon.toml discoverable and nextest configured
bacon.toml found at applications/datamanager/bacon.toml with a jobs.nextest block; bacon, cargo-watch, and cargo entries look correct. Consider adding cargo-llvm-cov and cargo-audit to .flox/env/manifest.toml later for coverage reporting and security scanning.applications/datamanager/.claude/settings.local.json (1)
5-15: Scope down broad permissions and fix quoting in the POST pattern.The wide patterns ("Bash(http POST:)", "Bash(curl:)") grant very broad network/exec access. Also the POST example’s JSON is over-escaped and date is hard-coded. Prefer a narrower allow rule with a wildcard date.
- "Bash(http POST:*)", - "Bash(curl:*)", - "Bash(RUST_LOG=debug curl -X POST -H \"Content-Type: application/json\" -d '{\"\"date\"\": \"\"2025-08-25\"\"}' http://localhost:8089/equity)", + // Narrow allow-list: only local equity POSTs with any date + "Bash(RUST_LOG=debug curl -X POST -H \"Content-Type: application/json\" -d '{\"date\":\"*\"}' http://localhost:8089/equity)",Additionally, consider not committing local settings. Add this to the repo’s .gitignore (root) so devs keep their local overrides untracked:
+applications/datamanager/.claude/settings.local.jsonapplications/datamanager/Cargo.toml (3)
1-5: Confirm edition support in CI and declare MSRV.Edition "2024" may require a sufficiently new toolchain. Ensure CI uses a compiler that supports it and declare rust-version to pin MSRV; otherwise, fall back to 2021.
Option A (fallback now):
- edition = "2024" + edition = "2021"Option B (preferred): keep 2024 but add a rust-version and rust-toolchain file pinned to a stable version that supports 2024.
10-10: Prefer rustls to avoid OpenSSL headaches.Unless you require native-tls, reduce default features and enable rustls and json explicitly.
-reqwest = "0.12.23" +reqwest = { version = "0.12.23", default-features = false, features = ["json", "rustls-tls"] }
9-20: Trim heavy features you don’t use.Polars with lazy/parquet and duckdb with r2d2 are great if used. If not, consider dropping unused features to speed builds.
applications/datamanager/bacon.toml (2)
41-51: Ensure run job kills previous process on change.If you see orphaned child processes, enable the kill hook.
[jobs.run] @@ on_change_strategy = "kill_then_restart" env.RUST_LOG = "debug" -# kill = ["pkill", "-TERM", "-P"] +kill = ["pkill", "-TERM", "-P"]
20-23: Add a fmt job and shortcut.Keeps formatting quick and consistent.
[jobs.test] command = ["cargo", "test"] need_stdout = true + +[jobs.fmt] +command = ["cargo", "fmt", "--all"] +need_stdout = falseAnd bind a key:
[keybindings] # alt-m = "job:my-job" c = "job:clippy-all" # comment this to have 'c' run clippy on only the default target +f = "job:fmt"applications/datamanager/src/mod.rs (1)
1-1: Remove stray crate root mod file (unused).This top-level
mod.rsisn’t referenced by the binary crate and can confuse module resolution. Delete it or convert tolib.rsif you intend a library.-use routes::health;applications/datamanager/src/main.rs (3)
69-75: Fix default tracing filter target.
example=debugis likely a leftover and won’t match this crate. Default to sane levels.- .with( - tracing_subscriber::EnvFilter::try_from_default_env() - .unwrap_or_else(|_| "example=debug,tower_http=debug,axum=debug".into()), - ) + .with( + tracing_subscriber::EnvFilter::try_from_default_env() + .unwrap_or_else(|_| "info,axum=info,tower_http=info".into()), + )
33-64: Prefer returning Result from from_env over panicking.Avoid
expectin service startup; propagate config errors and exit gracefully.-impl AppState { - async fn from_env() -> Self { +impl AppState { + async fn from_env() -> anyhow::Result<Self> { let client = Client::builder() .timeout(std::time::Duration::from_secs(10)) .build() - .unwrap(); + .context("building reqwest client")?; @@ - let config = aws_config::load_defaults(aws_config::BehaviorVersion::latest()).await; + let config = aws_config::load_defaults(aws_config::BehaviorVersion::latest()).await; let s3_client = S3Client::new(&config); let bucket_name = std::env::var("S3_BUCKET_NAME") .unwrap_or("pocketsizefund-ml-data".to_string()); @@ - Self { + Ok(Self { client, polygon: PolygonSecrets { - base: std::env::var("POLYGON_BASE_URL") + base: std::env::var("POLYGON_BASE_URL") .unwrap_or("https://api.polygon.io".to_string()), - key: std::env::var("POLYGON_API_KEY") - .expect("POLYGON_API_KEY must be set in environment"), + key: std::env::var("POLYGON_API_KEY") + .context("POLYGON_API_KEY must be set in environment")?, }, alpaca: AlpacaSecrets { - base: std::env::var("ALPACA_BASE_URL") + base: std::env::var("ALPACA_BASE_URL") .unwrap_or("https://data.alpaca.markets".to_string()), - key: std::env::var("ALPACA_API_KEY") - .expect("ALPACA_API_KEY must be set in environment"), - secret: std::env::var("ALPACA_API_SECRET") - .expect("ALPACA_API_SECRET must be set in environment"), + key: std::env::var("ALPACA_API_KEY") + .context("ALPACA_API_KEY must be set in environment")?, + secret: std::env::var("ALPACA_API_SECRET") + .context("ALPACA_API_SECRET must be set in environment")?, }, s3_client, bucket_name, - } + }) } } @@ - let state = AppState::from_env().await; + let state = AppState::from_env().await.expect("invalid configuration");Also applies to: 77-77
85-88: Add graceful shutdown (SIGINT/SIGTERM).Prevents abrupt drops during deploys.
- axum::serve(listener, app).await.unwrap(); + axum::serve(listener, app) + .with_graceful_shutdown(shutdown_signal()) + .await + .unwrap();Additional (place anywhere in this file):
async fn shutdown_signal() { use tokio::signal; let ctrl_c = async { signal::ctrl_c().await.ok(); }; #[cfg(unix)] let terminate = async { use signal::unix::{signal, SignalKind}; if let Ok(mut sigterm) = signal(SignalKind::terminate()) { sigterm.recv().await; } }; #[cfg(not(unix))] let terminate = std::future::pending::<()>(); tokio::select! { _ = ctrl_c => {}, _ = terminate => {}, } }applications/datamanager/src/routes/equity.rs (4)
7-7: Remove unused imports/types to keep build clean.
duckdbimport,DateRangeQuery, andPolygonResponseare unused.-use duckdb::{Connection, Result as DuckResult}; @@ -#[derive(serde::Deserialize)] -struct DateRangeQuery { - start_date: NaiveDate, - end_date: NaiveDate, -} @@ -#[derive(serde::Deserialize, Debug)] -struct PolygonResponse { - adjusted: bool, - queryCount: u64, - request_id: String, - resultsCount: u64, - status: String, - results: Option<Vec<BarResult>>, -}Also applies to: 26-31, 48-56
69-81: Streamline Parquet write; add compression and correct content type.Avoid the
Cursor, drop the extra clone duringfinish, and use a Parquet MIME type.- let mut buffer = Vec::new(); - { - let cursor = Cursor::new(&mut buffer); - let writer = ParquetWriter::new(cursor); - match writer.finish(&mut df.clone()) { - Ok(_) => { - info!("DataFrame successfully converted to parquet, size: {} bytes", buffer.len()); - }, - Err(err) => { - return Err(format!("Failed to write parquet: {}", err)); - } - } - } + let mut buffer = Vec::new(); + let mut df_to_write = df.clone(); + ParquetWriter::new(&mut buffer) + .with_compression(ParquetCompression::Snappy) + .finish(&mut df_to_write) + .map_err(|err| format!("Failed to write parquet: {}", err))?; + info!("DataFrame -> parquet bytes: {}", buffer.len()); @@ - .content_type("application/octet-stream") + .content_type("application/x-parquet")Also applies to: 90-90
110-110: Fix log label.Use
dateinstead ofnameto avoid confusion.- info!("name: {}", payload.date); + info!("date: {}", payload.date);
33-46: Consider integer types for counts/volumes.If Polygon guarantees integers for
v(volume) andn(transactions), model them asOption<u64>to avoid float casts/precision loss. Otherwise keep as-is.Also applies to: 182-191
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
💡 Knowledge Base configuration:
- MCP integration is disabled by default for public repositories
- Jira integration is disabled by default for public repositories
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
⛔ Files ignored due to path filters (3)
.flox/env/manifest.lockis excluded by!**/*.lockapplications/datamanager/Cargo.lockis excluded by!**/*.lockuv.lockis excluded by!**/*.lock
📒 Files selected for processing (11)
.flox/env/manifest.toml(1 hunks).gitignore(1 hunks)applications/datamanager/.claude/settings.local.json(1 hunks)applications/datamanager/Cargo.toml(1 hunks)applications/datamanager/bacon.toml(1 hunks)applications/datamanager/src/datamanager/main.py(0 hunks)applications/datamanager/src/main.rs(1 hunks)applications/datamanager/src/mod.rs(1 hunks)applications/datamanager/src/routes/equity.rs(1 hunks)applications/datamanager/src/routes/health.rs(1 hunks)applications/datamanager/src/routes/mod.rs(1 hunks)
💤 Files with no reviewable changes (1)
- applications/datamanager/src/datamanager/main.py
🧰 Additional context used
🧬 Code graph analysis (2)
applications/datamanager/src/routes/health.rs (2)
applications/datamanager/src/datamanager/main.py (1)
health_check(10-11)applications/portfoliomanager/src/portfoliomanager/main.py (1)
health_check(14-15)
applications/datamanager/src/main.rs (3)
applications/datamanager/src/datamanager/s3_client.py (1)
S3Client(11-107)applications/datamanager/src/routes/health.rs (1)
check(6-8)applications/datamanager/src/routes/equity.rs (1)
router(233-237)
🔇 Additional comments (1)
applications/datamanager/src/routes/mod.rs (1)
1-2: LGTM — public route modules exposed clearly.
forstmeier
left a comment
There was a problem hiding this comment.
Couple of tweaks and the bots caught a handful of things as well. The Python tests and checks need to be updated since they're failing on this PR as well - even if they're failing just because Python code has been removed, those should be fixed prior to merging.
|
Marked stale due to inactivity. Remove stale label or comment or this will be closed. |
|
@chrisaddy I took the "stale" label off the pull request. |
There was a problem hiding this comment.
Actionable comments posted: 1
♻️ Duplicate comments (4)
.gitignore (1)
22-23: LGTM: correct ignore for Cargo targets.Covers both repo-root and nested crates. Matches prior guidance.
applications/datamanager/src/routes/equity.rs (3)
186-191: Avoid echoing upstream payloads on parse errors.Can be large and noisy; return a concise 502 message instead.
- return (StatusCode::BAD_GATEWAY, raw_text).into_response(); + return ( + StatusCode::BAD_GATEWAY, + "Upstream response not in expected format", + ).into_response();
236-245: Don’t serialize and return the entire DataFrame on S3 failure.Large responses can blow up memory and bandwidth.
- let json_output = df.to_string(); - ( - StatusCode::BAD_GATEWAY, - format!( - "DataFrame created but S3 upload failed: {}\n\n{}", - err, json_output - ), - ) + (StatusCode::BAD_GATEWAY, "DataFrame created but S3 upload failed") .into_response()
174-182: Don’t send a body with 204 No Content.204 responses must not include a message body.
- return ( - StatusCode::NO_CONTENT, - "No market data available for this date", - ) - .into_response(); + return (StatusCode::NO_CONTENT, ()).into_response();
🧹 Nitpick comments (12)
libraries/python/src/internal/mhsa_network.py (1)
39-39: Remove unused noqa (RUF100) or rephrase the comment.Ruff flags this as an unused directive since ERA001 isn’t enabled. Drop the noqa (or enable
eradicateif you truly need it).- # shape: (batch, heads_count, sequence_length, head_dimension) # noqa: ERA001 + # shape: (batch, heads_count, sequence_length, head_dimension)applications/datamanager/pyproject.toml (2)
5-5: Pin Python minor, not patch, to allow security patch updates.Exact patch pin
==3.12.10is brittle for deploys and CI.Apply:
-requires-python = "==3.12.10" +requires-python = ">=3.12,<3.13"
10-10: Keep duckdb dependency; pin its major version (<2.0).applications/datamanager/src/datamanager/s3_client.py (line 15) calls duckdb.connect and tests patch duckdb.connect (applications/datamanager/tests/test_s3_client.py) — do not remove; instead pin the dependency in pyproject.
dependencies = [ - "duckdb>=1.4.0", + "duckdb>=1.4.0,<2.0", ]applications/datamanager/src/routes/equity.rs (9)
5-5: Remove unused DuckDB import.Not referenced; leave until actually used to avoid warnings.
-use duckdb::{Connection, Result as DuckResult};
6-7: Avoid wildcard prelude; import only what you use.Reduces namespace clutter and compile times.
-use polars::prelude::ParquetWriter; -use polars::prelude::*; +use polars::prelude::{DataFrame, ParquetWriter}; +use polars::df; // if you need the macro in scope; or call as `polars::df!`
26-30: Drop unused DateRangeQuery or gate behind feature.Dead code adds maintenance surface.
-#[derive(serde::Deserialize)] -struct DateRangeQuery { - start_date: NaiveDate, - end_date: NaiveDate, -}
72-86: Avoid cloning the whole DataFrame just to get &mut.Clone once into a local mutable var.
- let mut buffer = Vec::new(); - { - let cursor = Cursor::new(&mut buffer); - let writer = ParquetWriter::new(cursor); - match writer.finish(&mut df.clone()) { + let mut buffer = Vec::new(); + { + let cursor = Cursor::new(&mut buffer); + let writer = ParquetWriter::new(cursor); + let mut df = df.clone(); + match writer.finish(&mut df) { Ok(_) => {
97-98: Set a Parquet-specific content type.Helps downstream consumers infer format.
- .content_type("application/octet-stream") + .content_type("application/x-parquet")
112-115: Make GET /equity respond with a useful body or use 204.Current “hello” log + empty body is vestigial.
-async fn fetch(State(_state): State<AppState>) -> Response { - info!("hello"); - (StatusCode::OK).into_response() -} +async fn fetch(State(_state): State<AppState>) -> Response { + (StatusCode::NO_CONTENT).into_response() +}
118-118: Minor log nit: label mismatch.
info!("name: {}", payload.date);→ label asdatefor clarity.- info!("name: {}", payload.date); + info!("date: {}", payload.date);
249-253: Avoid returning raw upstream text on internal errors.Keep error bodies small and generic.
- (StatusCode::INTERNAL_SERVER_ERROR, raw_text).into_response() + ( + StatusCode::INTERNAL_SERVER_ERROR, + "Failed to construct DataFrame", + ).into_response()
162-172: Parse into PolygonResponse directly to cut a full JSON pass and clone.You already defined the type; use it to simplify and remove a
Valueround-trip.- let json_value: serde_json::Value = match serde_json::from_str(&raw_text) { - Ok(value) => value, + let polygon: PolygonResponse = match serde_json::from_str(&raw_text) { + Ok(value) => value, Err(err) => { info!("Failed to parse JSON response: {}", err); return ( StatusCode::INTERNAL_SERVER_ERROR, "Invalid JSON response from API", ) .into_response(); } }; - - let results_array = match json_value.get("results") { - Some(results) => results, + let results_array = match polygon.results { + Some(results) => serde_json::to_value(results).unwrap_or_default(), None => { info!("No results field found in response"); return ( StatusCode::NO_CONTENT, - "No market data available for this date", + (), ) .into_response(); } };Also applies to: 48-56
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
⛔ Files ignored due to path filters (2)
.flox/env/manifest.lockis excluded by!**/*.lockuv.lockis excluded by!**/*.lock
📒 Files selected for processing (6)
.gitignore(1 hunks)applications/datamanager/pyproject.toml(1 hunks)applications/datamanager/src/routes/equity.rs(1 hunks)applications/portfoliomanager/pyproject.toml(1 hunks)libraries/python/src/internal/mhsa_network.py(1 hunks)libraries/python/src/internal/tft_model.py(0 hunks)
💤 Files with no reviewable changes (1)
- libraries/python/src/internal/tft_model.py
✅ Files skipped from review due to trivial changes (1)
- applications/portfoliomanager/pyproject.toml
🧰 Additional context used
🧬 Code graph analysis (1)
libraries/python/src/internal/mhsa_network.py (1)
libraries/python/tests/test_mhsa_network.py (4)
test_multi_head_attention_forward(17-28)test_multi_head_attention_single_sequence(45-51)test_multi_head_attention_longer_sequences(54-61)test_multi_head_attention_different_heads(31-42)
🪛 Ruff (0.12.2)
libraries/python/src/internal/mhsa_network.py
39-39: Unused noqa directive (non-enabled: ERA001)
Remove unused noqa directive
(RUF100)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (2)
- GitHub Check: Run Python tests
- GitHub Check: Run Python quality checks
🔇 Additional comments (2)
applications/datamanager/src/routes/equity.rs (1)
124-132: Resolved — reqwest Client timeout already set.
.state.client is built with .timeout(std::time::Duration::from_secs(10)) in applications/datamanager/src/main.rs:36; no action required.applications/datamanager/pyproject.toml (1)
13-15: Confirm uv packaging excludes Rust/Cargo artifactsSandbox couldn't run
uv(uv: command not found); I couldn't inspect dist outputs. Run locally and paste the outputs:uv build unzip -l dist/*.whl | sed -n '1,200p' tar -tzf dist/*.tar.gz | sed -n '1,200p' # quick grep for Rust/Cargo files unzip -l dist/*.whl | grep -E 'Cargo(\.toml|\.lock)?|target/|\.rs$|build\.rs' || true tar -tzf dist/*.tar.gz | grep -E 'Cargo(\.toml|\.lock)?|target/|\.rs$|build\.rs' || trueCheck for Cargo.toml, Cargo.lock, target/, .rs, build.rs. If any appear, add excludes to [tool.uv] in applications/datamanager/pyproject.toml (e.g., exclude = ["/Cargo.toml","/Cargo.lock","/target/","**/.rs","**/build.rs"]).

Overview
Changes
Comments
Summary by CodeRabbit
New Features
Chores
Refactor