Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 33 additions & 22 deletions .claude/settings.local.json
Original file line number Diff line number Diff line change
@@ -1,24 +1,35 @@
{
"permissions": {
"allow": [
"Bash(cat:*)",
"Bash(curl:*)",
"Bash(docker:*)",
"Bash(flox activate:*)",
"Bash(gh issue:*)",
"Bash(mask:*)",
"Bash(mkdir:*)",
"Bash(pre-commit:*)",
"Bash(pulumi:*)",
"Bash(rm:*)",
"Bash(timeout 5 curl -I http://localhost:*)",
"Bash(uv:*)",
"WebFetch(domain:docs.github.com)",
"WebSearch"
],
"deny": [],
"defaultMode": "acceptEdits"
},
"enableAllProjectMcpServers": false
"permissions": {
"allow": [
"Bash(cat:*)",
"Bash(curl:*)",
"Bash(docker:*)",
"Bash(flox activate:*)",
"Bash(gh issue:*)",
"Bash(mask:*)",
"Bash(mkdir:*)",
"Bash(pre-commit:*)",
"Bash(pulumi:*)",
"Bash(rm:*)",
"Bash(timeout 5 curl -I http://localhost:*)",
"Bash(uv:*)",
"WebFetch(domain:docs.github.com)",
"WebSearch",
"Bash(aws ecr describe-repositories:*)",
"Bash(aws ecs describe-services:*)",
"Bash(aws logs tail:*)",
"Bash(aws logs:*)",
"Bash(xargs:*)",
"Bash(aws ecs describe-task-definition:*)",
"Bash(aws s3 cp:*)",
"Bash(python3:*)",
"Bash(aws s3 ls:*)",
"Bash(find:*)",
"Bash(gh api:*)",
"Bash(gh pr view:*)"
Comment on lines +18 to +29
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Restrict broad execution permissions (python3/xargs) to least privilege.

Lines 22 and 25 add Bash(xargs:*) and Bash(python3:*), which effectively permit arbitrary code execution and command chaining. If this file is shared in-repo, it weakens the permission boundary. Consider removing these entries or narrowing them to vetted scripts/flags.

🔧 Suggested tightening (remove the broad entries unless strictly required)
-      "Bash(xargs:*)",
-      "Bash(python3:*)",
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
"Bash(aws ecr describe-repositories:*)",
"Bash(aws ecs describe-services:*)",
"Bash(aws logs tail:*)",
"Bash(aws logs:*)",
"Bash(xargs:*)",
"Bash(aws ecs describe-task-definition:*)",
"Bash(aws s3 cp:*)",
"Bash(python3:*)",
"Bash(aws s3 ls:*)",
"Bash(find:*)",
"Bash(gh api:*)",
"Bash(gh pr view:*)"
"Bash(aws ecr describe-repositories:*)",
"Bash(aws ecs describe-services:*)",
"Bash(aws logs tail:*)",
"Bash(aws logs:*)",
"Bash(aws ecs describe-task-definition:*)",
"Bash(aws s3 cp:*)",
"Bash(aws s3 ls:*)",
"Bash(find:*)",
"Bash(gh api:*)",
"Bash(gh pr view:*)"
🤖 Prompt for AI Agents
In @.claude/settings.local.json around lines 18-29, the entries
"Bash(xargs:*)" and "Bash(python3:*)" grant overly broad execution rights;
remove these two lines from the .claude/settings.local.json file or replace them
with narrowed, explicit command patterns (e.g., specific scripts or vetted
flags) so only required, least-privilege invocations are allowed; update any
documentation/tests that expect those broad permissions and run a quick
permission audit to confirm no other wildcard Bash(...) entries remain
unnecessarily permissive.

],
"deny": [],
"defaultMode": "acceptEdits"
},
"enableAllProjectMcpServers": false
}

2 changes: 1 addition & 1 deletion .flox/env/manifest.lock
Original file line number Diff line number Diff line change
Expand Up @@ -3269,4 +3269,4 @@
"priority": 5
}
]
}
}
3 changes: 3 additions & 0 deletions applications/datamanager/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,9 @@ COPY applications/datamanager/Cargo.toml ./applications/datamanager/Cargo.toml

COPY applications/datamanager/src/ applications/datamanager/src/

ENV DUCKDB_LIB_DIR=/usr/local/lib
ENV DUCKDB_INCLUDE_DIR=/usr/local/include

RUN --mount=type=cache,target=/usr/local/cargo/registry \
--mount=type=cache,target=/app/target \
cargo build --release --bin datamanager && \
Expand Down
100 changes: 92 additions & 8 deletions applications/datamanager/src/data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use crate::errors::Error;
use polars::prelude::*;
use serde::Deserialize;
use std::io::Cursor;
use tracing::{debug, info, warn};

#[derive(Debug, Deserialize)]
pub struct EquityBar {
Expand All @@ -17,6 +18,11 @@ pub struct EquityBar {
}

pub fn create_equity_bar_dataframe(equity_bars_rows: Vec<EquityBar>) -> Result<DataFrame, Error> {
debug!(
"Creating equity bar DataFrame from {} rows",
equity_bars_rows.len()
);

let equity_bars_dataframe = df!(
"ticker" => equity_bars_rows.iter().map(|b| b.ticker.as_str()).collect::<Vec<_>>(),
"timestamp" => equity_bars_rows.iter().map(|b| b.timestamp).collect::<Vec<_>>(),
Expand All @@ -28,13 +34,23 @@ pub fn create_equity_bar_dataframe(equity_bars_rows: Vec<EquityBar>) -> Result<D
"volume_weighted_average_price" => equity_bars_rows.iter().map(|b| b.volume_weighted_average_price).collect::<Vec<_>>(),
"transactions" => equity_bars_rows.iter().map(|b| b.transactions).collect::<Vec<_>>(),
)
.map_err(|e| Error::Other(format!("Failed to create DataFrame: {}", e)))?;
.map_err(|e| {
warn!("Failed to create equity bar DataFrame: {}", e);
Error::Other(format!("Failed to create DataFrame: {}", e))
})?;

debug!("Normalizing ticker column to uppercase");
let equity_bars_dataframe = equity_bars_dataframe
.lazy()
.with_columns([col("ticker").str().to_uppercase().alias("ticker")])
.collect()?;

info!(
"Created equity bar DataFrame: {} rows x {} columns",
equity_bars_dataframe.height(),
equity_bars_dataframe.width()
);

Ok(equity_bars_dataframe)
}

Expand All @@ -48,21 +64,36 @@ pub struct Prediction {
}

pub fn create_predictions_dataframe(prediction_rows: Vec<Prediction>) -> Result<DataFrame, Error> {
debug!(
"Creating predictions DataFrame from {} rows",
prediction_rows.len()
);

let prediction_dataframe = df!(
"ticker" => prediction_rows.iter().map(|p| p.ticker.as_str()).collect::<Vec<_>>(),
"timestamp" => prediction_rows.iter().map(|p| p.timestamp).collect::<Vec<_>>(),
"quantile_10" => prediction_rows.iter().map(|p| p.quantile_10).collect::<Vec<_>>(),
"quantile_50" => prediction_rows.iter().map(|p| p.quantile_50).collect::<Vec<_>>(),
"quantile_90" => prediction_rows.iter().map(|p| p.quantile_90).collect::<Vec<_>>(),
)
.map_err(|e| Error::Other(format!("Failed to create DataFrame: {}", e)))?;
.map_err(|e| {
warn!("Failed to create predictions DataFrame: {}", e);
Error::Other(format!("Failed to create DataFrame: {}", e))
})?;

debug!("Normalizing ticker column to uppercase");
let unfiltered_prediction_dataframe = prediction_dataframe
.lazy()
.with_columns([col("ticker").str().to_uppercase().alias("ticker")])
.collect()?;

debug!(
"Unfiltered predictions DataFrame has {} rows",
unfiltered_prediction_dataframe.height()
);

// filtering necessary due to potentially overlapping tickers in predictions parquet files
debug!("Filtering to keep only most recent prediction per ticker");
let filtered_prediction_dataframe = unfiltered_prediction_dataframe
.lazy()
.with_columns([col("timestamp")
Expand All @@ -79,6 +110,13 @@ pub fn create_predictions_dataframe(prediction_rows: Vec<Prediction>) -> Result<
])
.collect()?;

info!(
"Created predictions DataFrame: {} rows x {} columns (filtered from {} input rows)",
filtered_prediction_dataframe.height(),
filtered_prediction_dataframe.width(),
prediction_rows.len()
);

Ok(filtered_prediction_dataframe)
}

Expand All @@ -92,46 +130,83 @@ pub struct Portfolio {
}

pub fn create_portfolio_dataframe(portfolio_rows: Vec<Portfolio>) -> Result<DataFrame, Error> {
debug!(
"Creating portfolio DataFrame from {} rows",
portfolio_rows.len()
);

let portfolio_dataframe = df!(
"ticker" => portfolio_rows.iter().map(|p| p.ticker.as_str()).collect::<Vec<&str>>(),
"timestamp" => portfolio_rows.iter().map(|p| p.timestamp).collect::<Vec<i64>>(),
"side" => portfolio_rows.iter().map(|p| p.side.as_str()).collect::<Vec<&str>>(),
"dollar_amount" => portfolio_rows.iter().map(|p| p.dollar_amount).collect::<Vec<f64>>(),
"action" => portfolio_rows.iter().map(|p| p.action.as_str()).collect::<Vec<&str>>(),
)
.map_err(|e| Error::Other(format!("Failed to create DataFrame: {}", e)))?;
.map_err(|e| {
warn!("Failed to create portfolio DataFrame: {}", e);
Error::Other(format!("Failed to create DataFrame: {}", e))
})?;

debug!("Normalizing ticker, side, and action columns to uppercase");
let portfolio_dataframe = portfolio_dataframe
.lazy()
.with_columns([col("ticker").str().to_uppercase().alias("ticker")])
.with_columns([col("side").str().to_uppercase().alias("side")])
.with_columns([col("action").str().to_uppercase().alias("action")])
.collect()?;

info!(
"Created portfolio DataFrame: {} rows x {} columns",
portfolio_dataframe.height(),
portfolio_dataframe.width()
);

Ok(portfolio_dataframe)
}

pub fn create_equity_details_dataframe(csv_content: String) -> Result<DataFrame, Error> {
debug!(
"Creating equity details DataFrame from CSV ({} bytes)",
csv_content.len()
);

let cursor = Cursor::new(csv_content.as_bytes());
let mut dataframe = CsvReadOptions::default()
.with_has_header(true)
.into_reader_with_file_handle(cursor)
.finish()
.map_err(|e| Error::Other(format!("Failed to parse CSV: {}", e)))?;
.map_err(|e| {
warn!("Failed to parse CSV: {}", e);
Error::Other(format!("Failed to parse CSV: {}", e))
})?;

debug!(
"Parsed CSV into DataFrame: {} rows x {} columns",
dataframe.height(),
dataframe.width()
);

let required_columns = vec!["sector", "industry"];
let column_names = dataframe.get_column_names();

debug!("Available columns: {:?}", column_names);
debug!("Required columns: {:?}", required_columns);

for column in &required_columns {
if !column_names.iter().any(|c| c.as_str() == *column) {
let message = format!("CSV missing required column: {}", column);
warn!("{}", message);
return Err(Error::Other(message));
}
}

dataframe = dataframe
.select(required_columns)
.map_err(|e| Error::Other(format!("Failed to select columns: {}", e)))?;
debug!("All required columns present, selecting subset");
dataframe = dataframe.select(required_columns).map_err(|e| {
warn!("Failed to select columns: {}", e);
Error::Other(format!("Failed to select columns: {}", e))
})?;

debug!("Normalizing sector and industry columns to uppercase and filling nulls");
let equity_details_dataframe = dataframe
.lazy()
.with_columns([
Expand All @@ -147,7 +222,16 @@ pub fn create_equity_details_dataframe(csv_content: String) -> Result<DataFrame,
.alias("industry"),
])
.collect()
.map_err(|e| Error::Other(format!("Failed to transform columns: {}", e)))?;
.map_err(|e| {
warn!("Failed to transform columns: {}", e);
Error::Other(format!("Failed to transform columns: {}", e))
})?;

info!(
"Created equity details DataFrame: {} rows x {} columns",
equity_details_dataframe.height(),
equity_details_dataframe.width()
);

Ok(equity_details_dataframe)
}
Loading
Loading