Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
5cba691
Build initial risk/data management functionality for statistical arbi…
forstmeier Mar 5, 2026
2200d7b
Address PR #785 review feedback: zero-price guard and pairs schema va…
forstmeier Mar 5, 2026
3d1f569
Expand portfolio manager rebalancing logic
forstmeier Mar 6, 2026
43d095f
Address PR #786 feedback: stop-loss boundary, test determinism, and e…
forstmeier Mar 7, 2026
1d8462e
Fix CI test collection failure and warning in portfoliomanager tests
forstmeier Mar 7, 2026
dde3654
Restore fail-fast credential check via FastAPI lifespan
forstmeier Mar 7, 2026
879a7be
Address PR #786 follow-up feedback: schema enforcement and boundary test
forstmeier Mar 7, 2026
bb22373
Catch PolarsError in get_prior_portfolio after schema enforcement
forstmeier Mar 7, 2026
61d15fb
Specify aggregate_function in pivot to handle duplicate timestamps
forstmeier Mar 7, 2026
8d6a751
Raise _MINIMUM_PAIR_PRICE_ROWS to 30 for statistically stable z-scores
forstmeier Mar 7, 2026
0e8a317
Reorder test values
forstmeier Mar 7, 2026
924b0d8
Build initial beta/regime risk management features
forstmeier Mar 8, 2026
3e1dcb9
Fix off-by-one guard and dead code in classify_regime; annotate compu…
forstmeier Mar 8, 2026
f889301
Address PR #785 review feedback: constants, RNG modernization, zero-c…
forstmeier Mar 8, 2026
6f9a27d
Address PR #785 review feedback: defensive fixes for stat arb data pi…
forstmeier Mar 8, 2026
68a8a31
Address PR #785 review feedback: pair_id propagation and group_by ord…
forstmeier Mar 8, 2026
f710475
Address PR #787 review feedback: NaN guard, zero-weight guard, confid…
forstmeier Mar 8, 2026
4968dde
Address PR #787 review feedback: beta guard off-by-one, float equalit…
forstmeier Mar 9, 2026
ee568c3
Guard SPY prices against non-positive values before np.log in beta an…
forstmeier Mar 9, 2026
3b38a1f
Update floating-point equality operation
forstmeier Mar 9, 2026
c63ab2e
Merge pull request #787 from oscmcompany/statistical-arbitrage-phase-…
forstmeier Mar 9, 2026
8a86b1c
Resolve merge conflicts with phase one branch
forstmeier Mar 9, 2026
6418c5b
Merge pull request #786 from oscmcompany/statistical-arbitrage-phase-two
forstmeier Mar 9, 2026
a93674f
Add pull request feedback/add future reporter logic from manual testi…
forstmeier Mar 10, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .flox/env/manifest.lock
Original file line number Diff line number Diff line change
Expand Up @@ -4074,4 +4074,4 @@
"priority": 5
}
]
}
}
2 changes: 2 additions & 0 deletions applications/datamanager/src/data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ pub struct Portfolio {
pub side: String,
pub dollar_amount: f64,
pub action: String,
pub pair_id: String,
}

pub fn create_portfolio_dataframe(portfolio_rows: Vec<Portfolio>) -> Result<DataFrame, Error> {
Expand All @@ -138,6 +139,7 @@ pub fn create_portfolio_dataframe(portfolio_rows: Vec<Portfolio>) -> Result<Data
"side" => portfolio_rows.iter().map(|p| p.side.as_str()).collect::<Vec<&str>>(),
"dollar_amount" => portfolio_rows.iter().map(|p| p.dollar_amount).collect::<Vec<f64>>(),
"action" => portfolio_rows.iter().map(|p| p.action.as_str()).collect::<Vec<&str>>(),
"pair_id" => portfolio_rows.iter().map(|p| p.pair_id.as_str()).collect::<Vec<&str>>(),
)?;

debug!("Normalizing ticker, side, and action columns to uppercase");
Expand Down
21 changes: 13 additions & 8 deletions applications/datamanager/src/portfolios.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,13 @@ pub async fn get(
match query_portfolio_dataframe_from_s3(&state, timestamp).await {
Ok(dataframe) => {
if dataframe.height() == 0 {
warn!("No portfolio data found - this is expected on first run");
return (StatusCode::NOT_FOUND, "No portfolio data found").into_response();
info!("No portfolio data found, returning empty array");
return (
StatusCode::OK,
[(axum::http::header::CONTENT_TYPE, "application/json")],
"[]".to_string(),
)
.into_response();
}

// Convert DataFrame to JSON array
Expand Down Expand Up @@ -110,17 +115,17 @@ pub async fn get(
}
Err(err) => {
let err_str = err.to_string();
// Check if error indicates no files found (expected on first run)
if err_str.contains("No files found")
|| err_str.contains("Could not find")
|| err_str.contains("does not exist")
|| err_str.contains("Invalid Input")
{
warn!(
"No portfolio files in S3 - this is expected on first run: {}",
err
);
return (StatusCode::NOT_FOUND, "No portfolio data found - first run")
info!("No portfolio files in S3, returning empty array");
return (
StatusCode::OK,
[(axum::http::header::CONTENT_TYPE, "application/json")],
"[]".to_string(),
)
.into_response();
}
warn!("Failed to fetch portfolio from S3: {}", err);
Expand Down
114 changes: 14 additions & 100 deletions applications/datamanager/src/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -554,7 +554,7 @@ pub async fn query_portfolio_dataframe_from_s3(
);
let connection = create_duckdb_connection().await?;

let (query_with_action, query_without_action) = match timestamp {
let query = match timestamp {
Some(ts) => {
let year = ts.format("%Y");
let month = ts.format("%m");
Expand All @@ -568,34 +568,20 @@ pub async fn query_portfolio_dataframe_from_s3(
year, month, day
);

let with_action = format!(
format!(
"
SELECT
ticker,
timestamp,
side,
dollar_amount,
action
action,
pair_id
FROM '{}'
ORDER BY timestamp, ticker
",
s3_path
);

let without_action = format!(
"
SELECT
ticker,
timestamp,
side,
dollar_amount
FROM '{}'
ORDER BY timestamp, ticker
",
s3_path
);

(with_action, without_action)
)
}
None => {
let s3_wildcard = format!(
Expand All @@ -607,7 +593,7 @@ pub async fn query_portfolio_dataframe_from_s3(
s3_wildcard
);

let with_action = format!(
format!(
"
WITH partitioned_data AS (
SELECT
Expand All @@ -616,6 +602,7 @@ pub async fn query_portfolio_dataframe_from_s3(
side,
dollar_amount,
action,
pair_id,
year,
month,
day
Expand All @@ -630,62 +617,18 @@ pub async fn query_portfolio_dataframe_from_s3(
timestamp,
side,
dollar_amount,
action
FROM partitioned_data
WHERE (year::int * 10000 + month::int * 100 + day::int) = (SELECT date_int FROM max_date)
ORDER BY timestamp, ticker
",
s3_wildcard
);

let without_action = format!(
"
WITH partitioned_data AS (
SELECT
ticker,
timestamp,
side,
dollar_amount,
year,
month,
day
FROM read_parquet('{}', hive_partitioning = true)
),
max_date AS (
SELECT MAX(year::int * 10000 + month::int * 100 + day::int) as date_int
FROM partitioned_data
)
SELECT
ticker,
timestamp,
side,
dollar_amount
action,
pair_id
FROM partitioned_data
WHERE (year::int * 10000 + month::int * 100 + day::int) = (SELECT date_int FROM max_date)
ORDER BY timestamp, ticker
",
s3_wildcard
);

(with_action, without_action)
)
}
};

// Try query with action column first, fall back to query without if column doesn't exist
let portfolios = match execute_portfolio_query_with_action(&connection, &query_with_action) {
Ok(portfolios) => portfolios,
Err(e) => {
let err_str = e.to_string();
if err_str.contains("action") && err_str.contains("not found") {
info!(
"Action column not found in parquet, using fallback query with default action"
);
execute_portfolio_query_without_action(&connection, &query_without_action)?
} else {
return Err(e);
}
}
};
let portfolios = execute_portfolio_query(&connection, &query)?;

info!("Query returned {} portfolio records", portfolios.len());

Expand All @@ -700,11 +643,8 @@ pub async fn query_portfolio_dataframe_from_s3(
Ok(portfolio_dataframe)
}

fn execute_portfolio_query_with_action(
connection: &Connection,
query: &str,
) -> Result<Vec<Portfolio>, Error> {
debug!("Executing query with action column: {}", query);
fn execute_portfolio_query(connection: &Connection, query: &str) -> Result<Vec<Portfolio>, Error> {
debug!("Executing portfolio query: {}", query);

let mut statement = connection.prepare(query)?;

Expand All @@ -716,33 +656,7 @@ fn execute_portfolio_query_with_action(
side: row.get::<_, String>(2)?,
dollar_amount: row.get::<_, f64>(3)?,
action: row.get::<_, String>(4)?,
})
})?
.collect::<Result<Vec<_>, _>>()
.map_err(|e| {
warn!("Failed to map portfolio query results: {}", e);
Error::Other(format!("Failed to map query results: {}", e))
})?;

Ok(portfolios)
}

fn execute_portfolio_query_without_action(
connection: &Connection,
query: &str,
) -> Result<Vec<Portfolio>, Error> {
debug!("Executing query without action column: {}", query);

let mut statement = connection.prepare(query)?;

let portfolios: Vec<Portfolio> = statement
.query_map([], |row| {
Ok(Portfolio {
ticker: row.get::<_, String>(0)?,
timestamp: row.get::<_, f64>(1)?,
side: row.get::<_, String>(2)?,
dollar_amount: row.get::<_, f64>(3)?,
action: "UNSPECIFIED".to_string(),
pair_id: row.get::<_, String>(5)?,
})
})?
.collect::<Result<Vec<_>, _>>()
Expand Down
15 changes: 12 additions & 3 deletions applications/datamanager/tests/test_data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ fn sample_portfolio() -> Portfolio {
side: "long".to_string(),
dollar_amount: 10000.0,
action: "hold".to_string(),
pair_id: "AAPL-GOOGL".to_string(),
}
}

Expand All @@ -78,6 +79,7 @@ fn sample_portfolio_lowercase() -> Portfolio {
side: "short".to_string(),
dollar_amount: 5000.0,
action: "sell".to_string(),
pair_id: "aapl-googl".to_string(),
}
}

Expand Down Expand Up @@ -328,12 +330,13 @@ fn test_create_portfolio_dataframe_valid_data() {
let df = create_portfolio_dataframe(portfolios).unwrap();

assert_eq!(df.height(), 1);
assert_eq!(df.width(), 5);
assert_eq!(df.width(), 6);
assert!(df.column("ticker").is_ok());
assert!(df.column("timestamp").is_ok());
assert!(df.column("side").is_ok());
assert!(df.column("dollar_amount").is_ok());
assert!(df.column("action").is_ok());
assert!(df.column("pair_id").is_ok());
}

#[test]
Expand Down Expand Up @@ -363,13 +366,15 @@ fn test_create_portfolio_dataframe_mixed_case() {
side: "long".to_string(),
dollar_amount: 10000.0,
action: "buy".to_string(),
pair_id: "AAPL-GOOGL".to_string(),
},
Portfolio {
ticker: "GOOGL".to_string(),
timestamp: 1234567890.0,
side: "SHORT".to_string(),
dollar_amount: 5000.0,
action: "Sell".to_string(),
pair_id: "AAPL-GOOGL".to_string(),
},
];

Expand Down Expand Up @@ -416,7 +421,7 @@ fn test_create_portfolio_dataframe_empty_vec() {
let df = create_portfolio_dataframe(portfolios).unwrap();

assert_eq!(df.height(), 0);
assert_eq!(df.width(), 5);
assert_eq!(df.width(), 6);
}

// Tests for create_equity_details_dataframe
Expand Down Expand Up @@ -643,17 +648,21 @@ fn test_portfolio_dataframe_parquet_roundtrip() {
let cursor = Cursor::new(buffer);
let deserialized_df = ParquetReader::new(cursor).finish().unwrap();

assert_eq!(deserialized_df.width(), 5);
assert_eq!(deserialized_df.width(), 6);
assert_eq!(deserialized_df.height(), 1);

assert!(deserialized_df.column("ticker").is_ok());
assert!(deserialized_df.column("timestamp").is_ok());
assert!(deserialized_df.column("side").is_ok());
assert!(deserialized_df.column("dollar_amount").is_ok());
assert!(deserialized_df.column("action").is_ok());
assert!(deserialized_df.column("pair_id").is_ok());

let ticker_series = deserialized_df.column("ticker").unwrap();
assert_eq!(ticker_series.str().unwrap().get(0).unwrap(), "AAPL");

let pair_id_series = deserialized_df.column("pair_id").unwrap();
assert_eq!(pair_id_series.str().unwrap().get(0).unwrap(), "AAPL-GOOGL");
}

#[test]
Expand Down
16 changes: 10 additions & 6 deletions applications/datamanager/tests/test_handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,8 @@ async fn test_portfolios_save_and_get_round_trip() {
"timestamp": 1735689600.0,
"side": "long",
"dollar_amount": 10000.0,
"action": "buy"
"action": "buy",
"pair_id": "AAPL-MSFT"
}],
"timestamp": "2025-01-01T00:00:00Z"
}"#;
Expand Down Expand Up @@ -257,7 +258,8 @@ async fn test_portfolios_save_returns_internal_server_error_when_s3_upload_fails
"timestamp": 1735689600.0,
"side": "long",
"dollar_amount": 10000.0,
"action": "buy"
"action": "buy",
"pair_id": "AAPL-MSFT"
}],
"timestamp": "2025-01-01T00:00:00Z"
}"#;
Expand All @@ -274,7 +276,7 @@ async fn test_portfolios_save_returns_internal_server_error_when_s3_upload_fails

#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
#[serial]
async fn test_portfolios_get_returns_not_found_for_first_run_without_files() {
async fn test_portfolios_get_returns_empty_array_for_first_run_without_files() {
let (endpoint, _s3, _env_guard) = setup_test_bucket().await;
let (app, _env_guard) = spawn_app(&endpoint, "http://127.0.0.1:1".to_string()).await;

Expand All @@ -283,12 +285,13 @@ async fn test_portfolios_get_returns_not_found_for_first_run_without_files() {
.send()
.await
.unwrap();
assert_eq!(response.status(), StatusCode::NOT_FOUND);
assert_eq!(response.status(), StatusCode::OK);
assert_eq!(response.text().await.unwrap(), "[]");
}

#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
#[serial]
async fn test_portfolios_get_returns_not_found_when_portfolio_file_is_empty() {
async fn test_portfolios_get_returns_empty_array_when_portfolio_file_is_empty() {
let (endpoint, _s3, _env_guard) = setup_test_bucket().await;
let (app, _env_guard) = spawn_app(&endpoint, "http://127.0.0.1:1".to_string()).await;
let client = reqwest::Client::new();
Expand All @@ -312,7 +315,8 @@ async fn test_portfolios_get_returns_not_found_when_portfolio_file_is_empty() {
.send()
.await
.unwrap();
assert_eq!(response.status(), StatusCode::NOT_FOUND);
assert_eq!(response.status(), StatusCode::OK);
assert_eq!(response.text().await.unwrap(), "[]");
}

#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
Expand Down
Loading
Loading