diff --git a/applications/datamanager/src/data.rs b/applications/datamanager/src/data.rs index d54ea9bf..60c357d6 100644 --- a/applications/datamanager/src/data.rs +++ b/applications/datamanager/src/data.rs @@ -124,6 +124,7 @@ pub struct Portfolio { pub side: String, pub dollar_amount: f64, pub action: String, + pub pair_id: String, } pub fn create_portfolio_dataframe(portfolio_rows: Vec) -> Result { @@ -138,6 +139,7 @@ pub fn create_portfolio_dataframe(portfolio_rows: Vec) -> Result portfolio_rows.iter().map(|p| p.side.as_str()).collect::>(), "dollar_amount" => portfolio_rows.iter().map(|p| p.dollar_amount).collect::>(), "action" => portfolio_rows.iter().map(|p| p.action.as_str()).collect::>(), + "pair_id" => portfolio_rows.iter().map(|p| p.pair_id.as_str()).collect::>(), )?; debug!("Normalizing ticker, side, and action columns to uppercase"); diff --git a/applications/datamanager/src/portfolios.rs b/applications/datamanager/src/portfolios.rs index 8d63743f..4bcaaf80 100644 --- a/applications/datamanager/src/portfolios.rs +++ b/applications/datamanager/src/portfolios.rs @@ -74,8 +74,13 @@ pub async fn get( match query_portfolio_dataframe_from_s3(&state, timestamp).await { Ok(dataframe) => { if dataframe.height() == 0 { - warn!("No portfolio data found - this is expected on first run"); - return (StatusCode::NOT_FOUND, "No portfolio data found").into_response(); + info!("No portfolio data found, returning empty array"); + return ( + StatusCode::OK, + [(axum::http::header::CONTENT_TYPE, "application/json")], + "[]".to_string(), + ) + .into_response(); } // Convert DataFrame to JSON array @@ -110,17 +115,17 @@ pub async fn get( } Err(err) => { let err_str = err.to_string(); - // Check if error indicates no files found (expected on first run) if err_str.contains("No files found") || err_str.contains("Could not find") || err_str.contains("does not exist") || err_str.contains("Invalid Input") { - warn!( - "No portfolio files in S3 - this is expected on first run: {}", - err - ); - return (StatusCode::NOT_FOUND, "No portfolio data found - first run") + info!("No portfolio files in S3, returning empty array"); + return ( + StatusCode::OK, + [(axum::http::header::CONTENT_TYPE, "application/json")], + "[]".to_string(), + ) .into_response(); } warn!("Failed to fetch portfolio from S3: {}", err); diff --git a/applications/datamanager/src/storage.rs b/applications/datamanager/src/storage.rs index 40f0ebd5..9274379d 100644 --- a/applications/datamanager/src/storage.rs +++ b/applications/datamanager/src/storage.rs @@ -554,7 +554,7 @@ pub async fn query_portfolio_dataframe_from_s3( ); let connection = create_duckdb_connection().await?; - let (query_with_action, query_without_action) = match timestamp { + let query = match timestamp { Some(ts) => { let year = ts.format("%Y"); let month = ts.format("%m"); @@ -568,34 +568,20 @@ pub async fn query_portfolio_dataframe_from_s3( year, month, day ); - let with_action = format!( + format!( " SELECT ticker, timestamp, side, dollar_amount, - action + action, + pair_id FROM '{}' ORDER BY timestamp, ticker ", s3_path - ); - - let without_action = format!( - " - SELECT - ticker, - timestamp, - side, - dollar_amount - FROM '{}' - ORDER BY timestamp, ticker - ", - s3_path - ); - - (with_action, without_action) + ) } None => { let s3_wildcard = format!( @@ -607,7 +593,7 @@ pub async fn query_portfolio_dataframe_from_s3( s3_wildcard ); - let with_action = format!( + format!( " WITH partitioned_data AS ( SELECT @@ -616,6 +602,7 @@ pub async fn query_portfolio_dataframe_from_s3( side, dollar_amount, action, + pair_id, year, month, day @@ -630,62 +617,18 @@ pub async fn query_portfolio_dataframe_from_s3( timestamp, side, dollar_amount, - action - FROM partitioned_data - WHERE (year::int * 10000 + month::int * 100 + day::int) = (SELECT date_int FROM max_date) - ORDER BY timestamp, ticker - ", - s3_wildcard - ); - - let without_action = format!( - " - WITH partitioned_data AS ( - SELECT - ticker, - timestamp, - side, - dollar_amount, - year, - month, - day - FROM read_parquet('{}', hive_partitioning = true) - ), - max_date AS ( - SELECT MAX(year::int * 10000 + month::int * 100 + day::int) as date_int - FROM partitioned_data - ) - SELECT - ticker, - timestamp, - side, - dollar_amount + action, + pair_id FROM partitioned_data WHERE (year::int * 10000 + month::int * 100 + day::int) = (SELECT date_int FROM max_date) ORDER BY timestamp, ticker ", s3_wildcard - ); - - (with_action, without_action) + ) } }; - // Try query with action column first, fall back to query without if column doesn't exist - let portfolios = match execute_portfolio_query_with_action(&connection, &query_with_action) { - Ok(portfolios) => portfolios, - Err(e) => { - let err_str = e.to_string(); - if err_str.contains("action") && err_str.contains("not found") { - info!( - "Action column not found in parquet, using fallback query with default action" - ); - execute_portfolio_query_without_action(&connection, &query_without_action)? - } else { - return Err(e); - } - } - }; + let portfolios = execute_portfolio_query(&connection, &query)?; info!("Query returned {} portfolio records", portfolios.len()); @@ -700,11 +643,8 @@ pub async fn query_portfolio_dataframe_from_s3( Ok(portfolio_dataframe) } -fn execute_portfolio_query_with_action( - connection: &Connection, - query: &str, -) -> Result, Error> { - debug!("Executing query with action column: {}", query); +fn execute_portfolio_query(connection: &Connection, query: &str) -> Result, Error> { + debug!("Executing portfolio query: {}", query); let mut statement = connection.prepare(query)?; @@ -716,33 +656,7 @@ fn execute_portfolio_query_with_action( side: row.get::<_, String>(2)?, dollar_amount: row.get::<_, f64>(3)?, action: row.get::<_, String>(4)?, - }) - })? - .collect::, _>>() - .map_err(|e| { - warn!("Failed to map portfolio query results: {}", e); - Error::Other(format!("Failed to map query results: {}", e)) - })?; - - Ok(portfolios) -} - -fn execute_portfolio_query_without_action( - connection: &Connection, - query: &str, -) -> Result, Error> { - debug!("Executing query without action column: {}", query); - - let mut statement = connection.prepare(query)?; - - let portfolios: Vec = statement - .query_map([], |row| { - Ok(Portfolio { - ticker: row.get::<_, String>(0)?, - timestamp: row.get::<_, f64>(1)?, - side: row.get::<_, String>(2)?, - dollar_amount: row.get::<_, f64>(3)?, - action: "UNSPECIFIED".to_string(), + pair_id: row.get::<_, String>(5)?, }) })? .collect::, _>>() diff --git a/applications/datamanager/tests/test_data.rs b/applications/datamanager/tests/test_data.rs index 5ba2e733..e30c788e 100644 --- a/applications/datamanager/tests/test_data.rs +++ b/applications/datamanager/tests/test_data.rs @@ -67,6 +67,7 @@ fn sample_portfolio() -> Portfolio { side: "long".to_string(), dollar_amount: 10000.0, action: "hold".to_string(), + pair_id: "AAPL-GOOGL".to_string(), } } @@ -78,6 +79,7 @@ fn sample_portfolio_lowercase() -> Portfolio { side: "short".to_string(), dollar_amount: 5000.0, action: "sell".to_string(), + pair_id: "aapl-googl".to_string(), } } @@ -328,12 +330,13 @@ fn test_create_portfolio_dataframe_valid_data() { let df = create_portfolio_dataframe(portfolios).unwrap(); assert_eq!(df.height(), 1); - assert_eq!(df.width(), 5); + assert_eq!(df.width(), 6); assert!(df.column("ticker").is_ok()); assert!(df.column("timestamp").is_ok()); assert!(df.column("side").is_ok()); assert!(df.column("dollar_amount").is_ok()); assert!(df.column("action").is_ok()); + assert!(df.column("pair_id").is_ok()); } #[test] @@ -363,6 +366,7 @@ fn test_create_portfolio_dataframe_mixed_case() { side: "long".to_string(), dollar_amount: 10000.0, action: "buy".to_string(), + pair_id: "AAPL-GOOGL".to_string(), }, Portfolio { ticker: "GOOGL".to_string(), @@ -370,6 +374,7 @@ fn test_create_portfolio_dataframe_mixed_case() { side: "SHORT".to_string(), dollar_amount: 5000.0, action: "Sell".to_string(), + pair_id: "AAPL-GOOGL".to_string(), }, ]; @@ -416,7 +421,7 @@ fn test_create_portfolio_dataframe_empty_vec() { let df = create_portfolio_dataframe(portfolios).unwrap(); assert_eq!(df.height(), 0); - assert_eq!(df.width(), 5); + assert_eq!(df.width(), 6); } // Tests for create_equity_details_dataframe @@ -643,7 +648,7 @@ fn test_portfolio_dataframe_parquet_roundtrip() { let cursor = Cursor::new(buffer); let deserialized_df = ParquetReader::new(cursor).finish().unwrap(); - assert_eq!(deserialized_df.width(), 5); + assert_eq!(deserialized_df.width(), 6); assert_eq!(deserialized_df.height(), 1); assert!(deserialized_df.column("ticker").is_ok()); @@ -651,9 +656,13 @@ fn test_portfolio_dataframe_parquet_roundtrip() { assert!(deserialized_df.column("side").is_ok()); assert!(deserialized_df.column("dollar_amount").is_ok()); assert!(deserialized_df.column("action").is_ok()); + assert!(deserialized_df.column("pair_id").is_ok()); let ticker_series = deserialized_df.column("ticker").unwrap(); assert_eq!(ticker_series.str().unwrap().get(0).unwrap(), "AAPL"); + + let pair_id_series = deserialized_df.column("pair_id").unwrap(); + assert_eq!(pair_id_series.str().unwrap().get(0).unwrap(), "AAPL-GOOGL"); } #[test] diff --git a/applications/datamanager/tests/test_handlers.rs b/applications/datamanager/tests/test_handlers.rs index 8a374754..9a8cefd1 100644 --- a/applications/datamanager/tests/test_handlers.rs +++ b/applications/datamanager/tests/test_handlers.rs @@ -217,7 +217,8 @@ async fn test_portfolios_save_and_get_round_trip() { "timestamp": 1735689600.0, "side": "long", "dollar_amount": 10000.0, - "action": "buy" + "action": "buy", + "pair_id": "AAPL-MSFT" }], "timestamp": "2025-01-01T00:00:00Z" }"#; @@ -257,7 +258,8 @@ async fn test_portfolios_save_returns_internal_server_error_when_s3_upload_fails "timestamp": 1735689600.0, "side": "long", "dollar_amount": 10000.0, - "action": "buy" + "action": "buy", + "pair_id": "AAPL-MSFT" }], "timestamp": "2025-01-01T00:00:00Z" }"#; @@ -274,7 +276,7 @@ async fn test_portfolios_save_returns_internal_server_error_when_s3_upload_fails #[tokio::test(flavor = "multi_thread", worker_threads = 4)] #[serial] -async fn test_portfolios_get_returns_not_found_for_first_run_without_files() { +async fn test_portfolios_get_returns_empty_array_for_first_run_without_files() { let (endpoint, _s3, _env_guard) = setup_test_bucket().await; let (app, _env_guard) = spawn_app(&endpoint, "http://127.0.0.1:1".to_string()).await; @@ -283,12 +285,13 @@ async fn test_portfolios_get_returns_not_found_for_first_run_without_files() { .send() .await .unwrap(); - assert_eq!(response.status(), StatusCode::NOT_FOUND); + assert_eq!(response.status(), StatusCode::OK); + assert_eq!(response.text().await.unwrap(), "[]"); } #[tokio::test(flavor = "multi_thread", worker_threads = 4)] #[serial] -async fn test_portfolios_get_returns_not_found_when_portfolio_file_is_empty() { +async fn test_portfolios_get_returns_empty_array_when_portfolio_file_is_empty() { let (endpoint, _s3, _env_guard) = setup_test_bucket().await; let (app, _env_guard) = spawn_app(&endpoint, "http://127.0.0.1:1".to_string()).await; let client = reqwest::Client::new(); @@ -312,7 +315,8 @@ async fn test_portfolios_get_returns_not_found_when_portfolio_file_is_empty() { .send() .await .unwrap(); - assert_eq!(response.status(), StatusCode::NOT_FOUND); + assert_eq!(response.status(), StatusCode::OK); + assert_eq!(response.text().await.unwrap(), "[]"); } #[tokio::test(flavor = "multi_thread", worker_threads = 4)] diff --git a/applications/datamanager/tests/test_storage.rs b/applications/datamanager/tests/test_storage.rs index de50a732..c6388e54 100644 --- a/applications/datamanager/tests/test_storage.rs +++ b/applications/datamanager/tests/test_storage.rs @@ -39,6 +39,7 @@ fn sample_portfolio() -> Portfolio { side: "LONG".to_string(), dollar_amount: 10_000.0, action: "BUY".to_string(), + pair_id: "AAPL-MSFT".to_string(), } } @@ -268,46 +269,6 @@ async fn test_query_portfolio_without_timestamp_uses_latest_partition() { ); } -#[tokio::test(flavor = "multi_thread", worker_threads = 4)] -#[serial] -async fn test_query_portfolio_falls_back_when_action_column_is_missing() { - let (endpoint, s3, _env_guard) = setup_test_bucket().await; - let state = create_state(&endpoint).await; - let timestamp = fixed_date_time(); - - let key = format_s3_key(×tamp, "portfolios"); - - let mut dataframe = df!( - "ticker" => vec!["AAPL"], - "timestamp" => vec![1_735_689_600.0], - "side" => vec!["LONG"], - "dollar_amount" => vec![10_000.0], - ) - .unwrap(); - - let mut parquet_bytes = Vec::new(); - ParquetWriter::new(&mut parquet_bytes) - .finish(&mut dataframe) - .unwrap(); - - put_test_object(&s3, &key, parquet_bytes).await; - - let query_results = query_portfolio_dataframe_from_s3(&state, Some(timestamp)) - .await - .unwrap(); - - assert_eq!(query_results.height(), 1); - assert_eq!( - query_results - .column("action") - .unwrap() - .str() - .unwrap() - .get(0), - Some("UNSPECIFIED") - ); -} - #[tokio::test(flavor = "multi_thread", worker_threads = 4)] #[serial] async fn test_write_and_query_equity_bars_round_trip() { diff --git a/applications/portfoliomanager/src/portfoliomanager/beta.py b/applications/portfoliomanager/src/portfoliomanager/beta.py new file mode 100644 index 00000000..1374cb94 --- /dev/null +++ b/applications/portfoliomanager/src/portfoliomanager/beta.py @@ -0,0 +1,76 @@ +import numpy as np +import polars as pl +import scipy.stats + +BETA_WINDOW_DAYS = 60 + +_TRADING_DAYS_PER_YEAR = 252 +_MINIMUM_RETURN_COUNT = 2 + + +def compute_market_betas( + historical_prices: pl.DataFrame, + spy_prices: pl.DataFrame, + window_days: int = BETA_WINDOW_DAYS, +) -> pl.DataFrame: + spy_close = ( + spy_prices.sort("timestamp").tail(window_days + 1)["close_price"].to_numpy() + ) + + if len(spy_close) < _MINIMUM_RETURN_COUNT + 1 or np.any(spy_close <= 0): + return pl.DataFrame(schema={"ticker": pl.String, "market_beta": pl.Float64}) + + spy_returns = np.diff(np.log(spy_close)) + tickers = historical_prices["ticker"].unique().to_list() + results = [] + + for ticker in tickers: + ticker_close = ( + historical_prices.filter(pl.col("ticker") == ticker) + .sort("timestamp") + .tail(window_days + 1)["close_price"] + .to_numpy() + ) + if len(ticker_close) < _MINIMUM_RETURN_COUNT or np.any(ticker_close <= 0): + continue + + ticker_returns = np.diff(np.log(ticker_close)) + count = min(len(spy_returns), len(ticker_returns)) + if count < _MINIMUM_RETURN_COUNT: + continue + + slope, _, _, _, _ = scipy.stats.linregress( + spy_returns[-count:], ticker_returns[-count:] + ) + results.append({"ticker": ticker, "market_beta": float(slope)}) + + if not results: + return pl.DataFrame(schema={"ticker": pl.String, "market_beta": pl.Float64}) + + return pl.DataFrame(results) + + +# Validates beta neutralization in tests; retained for future beta reporting. +def compute_portfolio_beta( + portfolio: pl.DataFrame, + market_betas: pl.DataFrame, +) -> float: + beta_lookup = dict( + zip( + market_betas["ticker"].to_list(), + market_betas["market_beta"].to_list(), + strict=False, + ) + ) + + total_gross = portfolio["dollar_amount"].sum() + if np.isclose(total_gross, 0.0): + return 0.0 + + net_beta = 0.0 + for row in portfolio.iter_rows(named=True): + beta = beta_lookup.get(row["ticker"], 0.0) + sign = 1.0 if row["side"] == "LONG" else -1.0 + net_beta += sign * (row["dollar_amount"] / total_gross) * beta + + return net_beta diff --git a/applications/portfoliomanager/src/portfoliomanager/data_client.py b/applications/portfoliomanager/src/portfoliomanager/data_client.py index e0fd4d21..e90505b4 100644 --- a/applications/portfoliomanager/src/portfoliomanager/data_client.py +++ b/applications/portfoliomanager/src/portfoliomanager/data_client.py @@ -53,3 +53,34 @@ def fetch_equity_details(datamanager_base_url: str) -> pl.DataFrame: dataframe = pl.read_csv(io.BytesIO(response.content)) return dataframe.select(["ticker", "sector"]) + + +def fetch_spy_prices( + datamanager_base_url: str, + reference_date: datetime, + lookback_days: int = 90, +) -> pl.DataFrame: + start_timestamp = reference_date - timedelta(days=lookback_days) + + try: + response = requests.get( + url=f"{datamanager_base_url}/equity-bars", + params={ + "tickers": "SPY", + "start_timestamp": start_timestamp.isoformat(), + "end_timestamp": reference_date.isoformat(), + }, + timeout=120, + ) + response.raise_for_status() + except requests.HTTPError as error: + message = f"Failed to fetch SPY prices from data manager: {error}" + raise PriceDataUnavailableError(message) from error + except requests.RequestException as error: + message = f"Network error fetching SPY prices from data manager: {error}" + raise PriceDataUnavailableError(message) from error + + dataframe = pl.read_parquet(io.BytesIO(response.content)) + return dataframe.select(["ticker", "timestamp", "close_price"]).drop_nulls( + subset=["close_price"] + ) diff --git a/applications/portfoliomanager/src/portfoliomanager/portfolio_schema.py b/applications/portfoliomanager/src/portfoliomanager/portfolio_schema.py index c8ead343..74378e38 100644 --- a/applications/portfoliomanager/src/portfoliomanager/portfolio_schema.py +++ b/applications/portfoliomanager/src/portfoliomanager/portfolio_schema.py @@ -168,7 +168,7 @@ def check_pair_tickers_different(data: PolarsData) -> bool: ], required=False, ), - "pair_id": pa.Column(dtype=str, nullable=True, required=False), + "pair_id": pa.Column(dtype=str), }, unique=["ticker"], coerce=True, diff --git a/applications/portfoliomanager/src/portfoliomanager/regime.py b/applications/portfoliomanager/src/portfoliomanager/regime.py new file mode 100644 index 00000000..423d5802 --- /dev/null +++ b/applications/portfoliomanager/src/portfoliomanager/regime.py @@ -0,0 +1,67 @@ +from typing import TypedDict + +import numpy as np +import polars as pl + + +class RegimeResult(TypedDict): + state: str + confidence: float + + +REGIME_WINDOW_DAYS = 60 +REGIME_VOLATILITY_THRESHOLD = 0.20 +REGIME_AUTOCORRELATION_THRESHOLD = 0.0 + +_TRADING_DAYS_PER_YEAR = 252 +_MINIMUM_RETURN_COUNT = 2 + + +def classify_regime( + spy_prices: pl.DataFrame, + window_days: int = REGIME_WINDOW_DAYS, +) -> RegimeResult: + spy_close = ( + spy_prices.sort("timestamp").tail(window_days + 1)["close_price"].to_numpy() + ) + + if np.any(spy_close <= 0): + return {"state": "trending", "confidence": 0.0} + + returns = np.diff(np.log(spy_close)) + + # Sparse data defaults to trending/0.0, halving exposure in the caller. + if len(returns) < _MINIMUM_RETURN_COUNT + 1: + return {"state": "trending", "confidence": 0.0} + + realized_volatility = float( + np.std(returns, ddof=1) * np.sqrt(_TRADING_DAYS_PER_YEAR) + ) + + autocorrelation = float(np.corrcoef(returns[:-1], returns[1:])[0, 1]) + + low_volatility = realized_volatility < REGIME_VOLATILITY_THRESHOLD + mean_reverting_signal = autocorrelation < REGIME_AUTOCORRELATION_THRESHOLD + + if low_volatility and mean_reverting_signal: + volatility_margin = ( + REGIME_VOLATILITY_THRESHOLD - realized_volatility + ) / REGIME_VOLATILITY_THRESHOLD + autocorrelation_margin = min(1.0, -autocorrelation) + confidence = float( + np.clip((volatility_margin + autocorrelation_margin) / 2.0, 0.0, 1.0) + ) + return {"state": "mean_reversion", "confidence": confidence} + + excess_volatility = max( + 0.0, + (realized_volatility - REGIME_VOLATILITY_THRESHOLD) + / REGIME_VOLATILITY_THRESHOLD, + ) + excess_autocorrelation = max( + 0.0, autocorrelation - REGIME_AUTOCORRELATION_THRESHOLD + ) + confidence = float( + np.clip((excess_volatility + excess_autocorrelation) / 2.0, 0.0, 1.0) + ) + return {"state": "trending", "confidence": confidence} diff --git a/applications/portfoliomanager/src/portfoliomanager/risk_management.py b/applications/portfoliomanager/src/portfoliomanager/risk_management.py index 8e7b6595..b26052d2 100644 --- a/applications/portfoliomanager/src/portfoliomanager/risk_management.py +++ b/applications/portfoliomanager/src/portfoliomanager/risk_management.py @@ -1,6 +1,8 @@ from datetime import datetime +import numpy as np import polars as pl +import scipy.optimize import structlog from .enums import PositionAction, PositionSide @@ -9,12 +11,62 @@ logger = structlog.get_logger() REQUIRED_PAIRS = 10 +Z_SCORE_HOLD_THRESHOLD = 0.5 +Z_SCORE_STOP_LOSS = 4.0 +BETA_WEIGHT_LOWER_BOUND = 0.5 +BETA_WEIGHT_UPPER_BOUND = 2.0 + + +def _apply_beta_neutral_weights( + pairs: pl.DataFrame, + market_betas: pl.DataFrame, + volatility_parity_weights: np.ndarray, +) -> np.ndarray: + beta_lookup = { + row["ticker"]: row["market_beta"] for row in market_betas.iter_rows(named=True) + } + + pair_net_betas = np.array( + [ + beta_lookup.get(row["long_ticker"], 0.0) + - beta_lookup.get(row["short_ticker"], 0.0) + for row in pairs.iter_rows(named=True) + ] + ) + + def objective(weights: np.ndarray) -> float: + total = float(np.sum(weights)) + net_beta = float(np.dot(weights, pair_net_betas) / total) + return net_beta**2 + + bounds = [ + (BETA_WEIGHT_LOWER_BOUND * weight, BETA_WEIGHT_UPPER_BOUND * weight) + for weight in volatility_parity_weights + ] + target_total = float(np.sum(volatility_parity_weights)) + constraints = [{"type": "eq", "fun": lambda w: float(np.sum(w)) - target_total}] + + result = scipy.optimize.minimize( + objective, + x0=volatility_parity_weights.copy(), + method="SLSQP", + bounds=bounds, + constraints=constraints, + ) + + if result.success: + return np.array(result.x) + + logger.warning("Beta-neutral optimizer did not converge, using vol-parity weights") + return volatility_parity_weights def size_pairs_with_volatility_parity( candidate_pairs: pl.DataFrame, maximum_capital: float, current_timestamp: datetime, + market_betas: pl.DataFrame, + exposure_scale: float = 1.0, ) -> pl.DataFrame: if candidate_pairs.height < REQUIRED_PAIRS: message = ( @@ -42,17 +94,26 @@ def size_pairs_with_volatility_parity( ) total_weight = pairs["inverse_volatility_weight"].sum() - pairs = pairs.with_columns( - ( - (pl.col("inverse_volatility_weight") / total_weight) - * (maximum_capital / 2.0) - ).alias("dollar_amount") + volatility_parity_weights = ( + pairs["inverse_volatility_weight"] / total_weight + ).to_numpy() + + adjusted_weights = _apply_beta_neutral_weights( + pairs, market_betas, volatility_parity_weights ) + if np.isclose(adjusted_weights.sum(), 0.0): + adjusted_weights = volatility_parity_weights / volatility_parity_weights.sum() + else: + adjusted_weights = adjusted_weights / adjusted_weights.sum() + + dollar_amounts = adjusted_weights * (maximum_capital / 2.0) * exposure_scale + pairs = pairs.with_columns(pl.Series("dollar_amount", dollar_amounts)) logger.info( "Sized pairs with volatility parity", pair_count=pairs.height, total_capital=maximum_capital, + exposure_scale=exposure_scale, ) timestamp_val = float(current_timestamp.timestamp()) diff --git a/applications/portfoliomanager/src/portfoliomanager/server.py b/applications/portfoliomanager/src/portfoliomanager/server.py index 3e5d6dc1..e0379836 100644 --- a/applications/portfoliomanager/src/portfoliomanager/server.py +++ b/applications/portfoliomanager/src/portfoliomanager/server.py @@ -1,8 +1,11 @@ import logging import os +from collections.abc import AsyncGenerator +from contextlib import asynccontextmanager from datetime import UTC, datetime import httpx +import numpy as np import polars as pl import requests import sentry_sdk @@ -39,8 +42,13 @@ cache_logger_on_first_use=True, ) +from .beta import compute_market_betas # noqa: E402 from .consolidation import consolidate_predictions # noqa: E402 -from .data_client import fetch_equity_details, fetch_historical_prices # noqa: E402 +from .data_client import ( # noqa: E402 + fetch_equity_details, + fetch_historical_prices, + fetch_spy_prices, +) from .enums import PositionSide, TradeSide # noqa: E402 from .exceptions import ( # noqa: E402 AssetNotShortableError, @@ -48,16 +56,23 @@ InsufficientPairsError, ) from .portfolio_schema import pairs_schema, portfolio_schema # noqa: E402 -from .risk_management import size_pairs_with_volatility_parity # noqa: E402 -from .statistical_arbitrage import select_pairs # noqa: E402 +from .regime import classify_regime # noqa: E402 +from .risk_management import ( # noqa: E402 + Z_SCORE_HOLD_THRESHOLD, + Z_SCORE_STOP_LOSS, + size_pairs_with_volatility_parity, +) +from .statistical_arbitrage import ( # noqa: E402 + CORRELATION_WINDOW_DAYS, + compute_spread_zscore, + select_pairs, +) logger = structlog.get_logger() -application: FastAPI = FastAPI() - DATAMANAGER_BASE_URL = os.getenv("FUND_DATAMANAGER_BASE_URL", "http://datamanager:8080") -HTTP_NOT_FOUND = 404 HTTP_BAD_REQUEST = 400 +_MINIMUM_PAIR_PRICE_ROWS = 30 EQUITYPRICEMODEL_BASE_URL = os.getenv( "FUND_EQUITYPRICEMODEL_BASE_URL", "http://equitypricemodel:8080", @@ -66,24 +81,40 @@ ALPACA_API_KEY_ID = os.getenv("ALPACA_API_KEY_ID", "") ALPACA_API_SECRET = os.getenv("ALPACA_API_SECRET", "") -if not ALPACA_API_KEY_ID or not ALPACA_API_SECRET: - logger.error( - "Missing Alpaca credentials", - api_key_id_set=bool(ALPACA_API_KEY_ID), - api_secret_set=bool(ALPACA_API_SECRET), + +@asynccontextmanager +async def _lifespan(_app: FastAPI) -> AsyncGenerator[None, None]: + if not ALPACA_API_KEY_ID or not ALPACA_API_SECRET: + logger.error( + "Missing Alpaca credentials", + api_key_id_set=bool(ALPACA_API_KEY_ID), + api_secret_set=bool(ALPACA_API_SECRET), + ) + message = ( + "ALPACA_API_KEY_ID and ALPACA_API_SECRET environment variables are required" + ) + raise ValueError(message) + _app.state.alpaca_client = AlpacaClient( + api_key=ALPACA_API_KEY_ID, + api_secret=ALPACA_API_SECRET, + is_paper=os.getenv("ALPACA_IS_PAPER", "true").lower() == "true", ) - message = ( - "ALPACA_API_KEY_ID and ALPACA_API_SECRET environment variables are required" + logger.info( + "Portfolio manager initialized", is_paper=_app.state.alpaca_client.is_paper ) - raise ValueError(message) + yield -alpaca_client = AlpacaClient( - api_key=ALPACA_API_KEY_ID, - api_secret=ALPACA_API_SECRET, - is_paper=os.getenv("ALPACA_IS_PAPER", "true").lower() == "true", -) -logger.info("Portfolio manager initialized", is_paper=alpaca_client.is_paper) +application: FastAPI = FastAPI(lifespan=_lifespan) + +_PRIOR_PORTFOLIO_SCHEMA: dict[str, type] = { + "ticker": pl.String, + "timestamp": pl.Float64, + "side": pl.String, + "dollar_amount": pl.Float64, + "action": pl.String, + "pair_id": pl.String, +} @application.get("/health") @@ -93,6 +124,7 @@ def health_check() -> Response: @application.post("/portfolio") async def create_portfolio() -> Response: # noqa: PLR0911, PLR0912, PLR0915, C901 + alpaca_client: AlpacaClient = application.state.alpaca_client current_timestamp = datetime.now(tz=UTC) logger.info("Starting portfolio rebalance", timestamp=current_timestamp.isoformat()) @@ -112,10 +144,12 @@ async def create_portfolio() -> Response: # noqa: PLR0911, PLR0912, PLR0915, C9 DATAMANAGER_BASE_URL, current_timestamp ) equity_details = fetch_equity_details(DATAMANAGER_BASE_URL) + spy_prices = fetch_spy_prices(DATAMANAGER_BASE_URL, current_timestamp) logger.info( "Retrieved historical data", prices_count=historical_prices.height, equity_details_count=equity_details.height, + spy_prices_count=spy_prices.height, ) except Exception as e: logger.exception("Failed to retrieve historical data", error=str(e)) @@ -140,12 +174,22 @@ async def create_portfolio() -> Response: # noqa: PLR0911, PLR0912, PLR0915, C9 return Response(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR) try: - prior_portfolio_tickers = get_prior_portfolio_tickers() + prior_portfolio = get_prior_portfolio() + prior_portfolio_tickers = prior_portfolio["ticker"].unique().to_list() + logger.info("Retrieved prior portfolio", count=len(prior_portfolio_tickers)) + except Exception as e: + logger.exception("Failed to retrieve prior portfolio", error=str(e)) + return Response(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR) + + try: + held_tickers = evaluate_prior_pairs(prior_portfolio, historical_prices) logger.info( - "Retrieved prior portfolio tickers", count=len(prior_portfolio_tickers) + "Evaluated prior pairs", + held_count=len(held_tickers), + closing_count=len(prior_portfolio_tickers) - len(held_tickers), ) except Exception as e: - logger.exception("Failed to retrieve prior portfolio tickers", error=str(e)) + logger.exception("Failed to evaluate prior pairs", error=str(e)) return Response(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR) consolidated_signals = consolidated_signals.filter( @@ -181,14 +225,28 @@ async def create_portfolio() -> Response: # noqa: PLR0911, PLR0912, PLR0915, C9 logger.exception("Candidate pairs failed schema validation", error=str(e)) return Response(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR) - regime = get_regime_state() - logger.info("Current market regime", state=regime) + try: + market_betas = compute_market_betas(historical_prices, spy_prices) + regime = classify_regime(spy_prices) + # Binary scale is intentional; confidence reserved for future graduated scaling. + exposure_scale = 1.0 if regime["state"] == "mean_reversion" else 0.5 + logger.info( + "Computed market betas and regime", + regime_state=regime["state"], + regime_confidence=regime["confidence"], + exposure_scale=exposure_scale, + ) + except Exception as e: + logger.exception("Failed to compute market betas or regime", error=str(e)) + return Response(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR) try: optimal_portfolio = get_optimal_portfolio( candidate_pairs=candidate_pairs, maximum_capital=float(account.cash_amount), current_timestamp=current_timestamp, + market_betas=market_betas, + exposure_scale=exposure_scale, ) logger.info("Created optimal portfolio", count=len(optimal_portfolio)) except InsufficientPairsError as e: @@ -198,8 +256,9 @@ async def create_portfolio() -> Response: # noqa: PLR0911, PLR0912, PLR0915, C9 candidate_pairs_count=candidate_pairs.height, ) return Response( - status_code=status.HTTP_204_NO_CONTENT, - headers={"X-Portfolio-Status": "insufficient-pairs"}, + status_code=status.HTTP_200_OK, + content="Insufficient pairs to create portfolio, no trades will be made", + media_type="text/plain", ) except Exception as e: logger.exception("Failed to create optimal portfolio", error=str(e)) @@ -208,6 +267,7 @@ async def create_portfolio() -> Response: # noqa: PLR0911, PLR0912, PLR0915, C9 try: open_positions, close_positions = get_positions( prior_portfolio_tickers=prior_portfolio_tickers, + held_tickers=held_tickers, optimal_portfolio=optimal_portfolio, ) logger.info( @@ -250,7 +310,7 @@ async def create_portfolio() -> Response: # noqa: PLR0911, PLR0912, PLR0915, C9 "reason": "position_not_found", } ) - except Exception as e: # noqa: PERF203 + except Exception as e: logger.exception( "Failed to close position", ticker=close_position["ticker"], @@ -413,63 +473,170 @@ async def get_raw_predictions() -> pl.DataFrame: return pl.DataFrame(response.json()["data"]) -def get_regime_state() -> str: - """TODO: replace with regime.classify_regime in Phase 4.""" - return "mean_reversion" - - -def get_prior_portfolio_tickers() -> list[str]: # noqa: PLR0911 +def get_prior_portfolio() -> pl.DataFrame: + empty = pl.DataFrame(schema=_PRIOR_PORTFOLIO_SCHEMA) try: - prior_portfolio_response = requests.get( + response = requests.get( url=f"{DATAMANAGER_BASE_URL}/portfolios", timeout=60, ) - # If no prior portfolio, return empty list - if prior_portfolio_response.status_code == HTTP_NOT_FOUND: - logger.info("No prior portfolio found, starting fresh") - return [] - - if prior_portfolio_response.status_code >= HTTP_BAD_REQUEST: + if response.status_code >= HTTP_BAD_REQUEST: logger.warning( "Failed to fetch prior portfolio from data manager", - status_code=prior_portfolio_response.status_code, + status_code=response.status_code, ) - return [] + return empty - response_text = prior_portfolio_response.text.strip() + response_text = response.text.strip() if not response_text or response_text == "[]": logger.info("Prior portfolio is empty") - return [] + return empty - prior_portfolio_data = prior_portfolio_response.json() + prior_portfolio_data = response.json() if not prior_portfolio_data: - return [] + return empty - prior_portfolio = pl.DataFrame(prior_portfolio_data) + prior_portfolio = pl.DataFrame( + prior_portfolio_data, schema=_PRIOR_PORTFOLIO_SCHEMA + ) if prior_portfolio.is_empty(): - return [] + return empty - tickers = prior_portfolio["ticker"].unique().to_list() - logger.info("Retrieved prior portfolio tickers", count=len(tickers)) - return tickers # noqa: TRY300 + logger.info("Retrieved prior portfolio", count=prior_portfolio.height) + return prior_portfolio # noqa: TRY300 - except (ValueError, requests.exceptions.JSONDecodeError) as e: + except ( + ValueError, + requests.exceptions.JSONDecodeError, + pl.exceptions.PolarsError, + ) as e: logger.exception("Failed to parse prior portfolio JSON", error=str(e)) - return [] + return empty + + +def evaluate_prior_pairs( + prior_portfolio: pl.DataFrame, + historical_prices: pl.DataFrame, +) -> set[str]: + held_tickers: set[str] = set() + + if prior_portfolio.is_empty(): + return held_tickers + + pair_ids = prior_portfolio["pair_id"].unique(maintain_order=False).sort().to_list() + + for pair_id_value in pair_ids: + pair_rows = prior_portfolio.filter(pl.col("pair_id") == pair_id_value) + + long_rows = pair_rows.filter(pl.col("side") == PositionSide.LONG.value) + short_rows = pair_rows.filter(pl.col("side") == PositionSide.SHORT.value) + + if long_rows.is_empty() or short_rows.is_empty(): + logger.warning( + "Malformed prior pair, closing normally", pair_id=pair_id_value + ) + continue + + long_ticker = long_rows["ticker"][0] + short_ticker = short_rows["ticker"][0] + + pair_price_matrix = ( + historical_prices.filter( + pl.col("ticker").is_in([long_ticker, short_ticker]) + ) + .pivot( + on="ticker", + index="timestamp", + values="close_price", + aggregate_function="last", + ) + .sort("timestamp") + .drop_nulls() + ) + + if ( + long_ticker not in pair_price_matrix.columns + or short_ticker not in pair_price_matrix.columns + ): + logger.warning( + "Missing price data for prior pair, closing normally", + pair_id=pair_id_value, + ) + continue + + pair_price_matrix = pair_price_matrix.tail(CORRELATION_WINDOW_DAYS) + + if pair_price_matrix.height < _MINIMUM_PAIR_PRICE_ROWS: + logger.warning( + "Insufficient price history for prior pair, closing normally", + pair_id=pair_id_value, + ) + continue + + long_prices = pair_price_matrix[long_ticker].to_numpy() + short_prices = pair_price_matrix[short_ticker].to_numpy() + + if np.any(long_prices <= 0) or np.any(short_prices <= 0): + logger.warning( + "Non-positive prices in prior pair, closing normally", + pair_id=pair_id_value, + ) + continue + + log_prices_long = np.log(long_prices) + log_prices_short = np.log(short_prices) + + current_z, _ = compute_spread_zscore(log_prices_long, log_prices_short) + + if np.isnan(current_z): + logger.warning( + "NaN z-score for prior pair, closing normally", + pair_id=pair_id_value, + ) + continue + + abs_z = abs(current_z) + + if Z_SCORE_HOLD_THRESHOLD <= abs_z < Z_SCORE_STOP_LOSS: + held_tickers.add(long_ticker) + held_tickers.add(short_ticker) + logger.info( + "Holding prior pair, spread still mean-reverting", + pair_id=pair_id_value, + z_score=current_z, + ) + elif abs_z < Z_SCORE_HOLD_THRESHOLD: + logger.info( + "Closing prior pair to realize profit, spread converged", + pair_id=pair_id_value, + z_score=current_z, + ) + else: + logger.info( + "Closing prior pair on stop-loss, spread diverged", + pair_id=pair_id_value, + z_score=current_z, + ) + + return held_tickers def get_optimal_portfolio( candidate_pairs: pl.DataFrame, maximum_capital: float, current_timestamp: datetime, + market_betas: pl.DataFrame, + exposure_scale: float, ) -> pl.DataFrame: optimal_portfolio = size_pairs_with_volatility_parity( candidate_pairs=candidate_pairs, maximum_capital=maximum_capital, current_timestamp=current_timestamp, + market_betas=market_betas, + exposure_scale=exposure_scale, ) optimal_portfolio = portfolio_schema.validate(optimal_portfolio) @@ -490,9 +657,14 @@ def get_optimal_portfolio( def get_positions( prior_portfolio_tickers: list[str], + held_tickers: set[str], optimal_portfolio: pl.DataFrame, ) -> tuple[list[dict], list[dict]]: - close_positions = [{"ticker": ticker} for ticker in prior_portfolio_tickers] + close_positions = [ + {"ticker": ticker} + for ticker in prior_portfolio_tickers + if ticker not in held_tickers + ] open_positions = [ { diff --git a/applications/portfoliomanager/tests/test_beta.py b/applications/portfoliomanager/tests/test_beta.py new file mode 100644 index 00000000..ff9d3a56 --- /dev/null +++ b/applications/portfoliomanager/tests/test_beta.py @@ -0,0 +1,163 @@ +import numpy as np +import polars as pl +import pytest +from portfoliomanager.beta import compute_market_betas, compute_portfolio_beta + + +def _make_spy_prices( + log_returns: list[float], base_price: float = 100.0 +) -> pl.DataFrame: + prices = [base_price] + for log_return in log_returns: + prices.append(prices[-1] * np.exp(log_return)) + return pl.DataFrame( + { + "ticker": ["SPY"] * len(prices), + "timestamp": list(range(len(prices))), + "close_price": prices, + } + ) + + +def _make_historical_prices( + ticker_log_returns: dict[str, list[float]], + base_price: float = 100.0, +) -> pl.DataFrame: + rows = [] + for ticker, log_returns in ticker_log_returns.items(): + prices = [base_price] + for log_return in log_returns: + prices.append(prices[-1] * np.exp(log_return)) + for i, price in enumerate(prices): + rows.append({"ticker": ticker, "timestamp": i, "close_price": price}) + return pl.DataFrame(rows) + + +def test_compute_market_betas_returns_expected_columns() -> None: + spy_log_returns = [0.01 * ((-1) ** i) for i in range(61)] + spy_prices = _make_spy_prices(spy_log_returns) + historical_prices = _make_historical_prices({"AAPL": spy_log_returns}) + + result = compute_market_betas(historical_prices, spy_prices) + + assert result.columns == ["ticker", "market_beta"] + assert result.height == 1 + + +def test_compute_market_betas_returns_correct_beta_for_known_data() -> None: + # Deterministic log returns with clear relationships + spy_log_returns = [0.01 * np.sin(2 * np.pi * i / 10) for i in range(61)] + ticker_a_log_returns = spy_log_returns # beta ≈ 1.0 + ticker_b_log_returns = [2.0 * r for r in spy_log_returns] # beta ≈ 2.0 + + spy_prices = _make_spy_prices(spy_log_returns) + historical_prices = _make_historical_prices( + {"AAPL": ticker_a_log_returns, "MSFT": ticker_b_log_returns} + ) + + result = compute_market_betas(historical_prices, spy_prices) + betas = dict( + zip(result["ticker"].to_list(), result["market_beta"].to_list(), strict=False) + ) + + assert betas["AAPL"] == pytest.approx(1.0, abs=0.01) + assert betas["MSFT"] == pytest.approx(2.0, abs=0.01) + + +def test_compute_market_betas_drops_tickers_with_insufficient_data() -> None: + spy_log_returns = [0.01 * ((-1) ** i) for i in range(61)] + spy_prices = _make_spy_prices(spy_log_returns) + + # AAPL has enough data; MSFT has only 1 price (0 returns) + historical_prices = pl.DataFrame( + [ + *[ + {"ticker": "AAPL", "timestamp": i, "close_price": 100.0 + i} + for i in range(62) + ], + {"ticker": "MSFT", "timestamp": 0, "close_price": 200.0}, + ] + ) + + result = compute_market_betas(historical_prices, spy_prices) + tickers = result["ticker"].to_list() + + assert "AAPL" in tickers + assert "MSFT" not in tickers + + +def test_compute_market_betas_returns_empty_dataframe_for_insufficient_spy_data() -> ( + None +): + spy_prices = pl.DataFrame( + {"ticker": ["SPY"], "timestamp": [0], "close_price": [100.0]} + ) + historical_prices = _make_historical_prices({"AAPL": [0.01, 0.02, -0.01, 0.03]}) + + result = compute_market_betas(historical_prices, spy_prices) + + assert result.is_empty() + assert result.columns == ["ticker", "market_beta"] + + +def test_compute_portfolio_beta_returns_zero_for_balanced_portfolio() -> None: + portfolio = pl.DataFrame( + { + "ticker": ["AAPL", "MSFT"], + "side": ["LONG", "SHORT"], + "dollar_amount": [1000.0, 1000.0], + } + ) + market_betas = pl.DataFrame({"ticker": ["AAPL", "MSFT"], "market_beta": [1.5, 1.5]}) + + result = compute_portfolio_beta(portfolio, market_betas) + + assert result == pytest.approx(0.0, abs=1e-10) + + +def test_compute_portfolio_beta_returns_positive_for_long_high_beta() -> None: + portfolio = pl.DataFrame( + { + "ticker": ["AAPL", "MSFT"], + "side": ["LONG", "SHORT"], + "dollar_amount": [1000.0, 1000.0], + } + ) + market_betas = pl.DataFrame({"ticker": ["AAPL", "MSFT"], "market_beta": [2.0, 0.5]}) + + result = compute_portfolio_beta(portfolio, market_betas) + + # net = +1*(1000/2000)*2.0 + -1*(1000/2000)*0.5 = 0.5*(2.0-0.5) = 0.75 + assert result == pytest.approx(0.75, abs=1e-10) + + +def test_compute_portfolio_beta_returns_zero_for_zero_gross_exposure() -> None: + portfolio = pl.DataFrame( + { + "ticker": ["AAPL", "MSFT"], + "side": ["LONG", "SHORT"], + "dollar_amount": [0.0, 0.0], + } + ) + market_betas = pl.DataFrame({"ticker": ["AAPL", "MSFT"], "market_beta": [1.5, 1.5]}) + + result = compute_portfolio_beta(portfolio, market_betas) + + assert result == 0.0 + + +def test_compute_portfolio_beta_uses_zero_for_missing_tickers() -> None: + portfolio = pl.DataFrame( + { + "ticker": ["AAPL", "MSFT"], + "side": ["LONG", "SHORT"], + "dollar_amount": [1000.0, 1000.0], + } + ) + market_betas = pl.DataFrame({"ticker": ["AAPL"], "market_beta": [2.0]}) + + result = compute_portfolio_beta(portfolio, market_betas) + + # MSFT has no beta → treated as 0.0 + # net = +1*(1000/2000)*2.0 + -1*(1000/2000)*0.0 = 1.0 + assert result == pytest.approx(1.0, abs=1e-10) diff --git a/applications/portfoliomanager/tests/test_data_client.py b/applications/portfoliomanager/tests/test_data_client.py index c1def6ec..19ce6430 100644 --- a/applications/portfoliomanager/tests/test_data_client.py +++ b/applications/portfoliomanager/tests/test_data_client.py @@ -5,7 +5,11 @@ import polars as pl import pytest import requests -from portfoliomanager.data_client import fetch_equity_details, fetch_historical_prices +from portfoliomanager.data_client import ( + fetch_equity_details, + fetch_historical_prices, + fetch_spy_prices, +) from portfoliomanager.exceptions import PriceDataUnavailableError @@ -148,3 +152,87 @@ def test_fetch_equity_details_raises_on_network_error() -> None: pytest.raises(PriceDataUnavailableError), ): fetch_equity_details("http://localhost") + + +def test_fetch_spy_prices_returns_expected_columns() -> None: + raw = pl.DataFrame( + { + "ticker": ["SPY", "SPY"], + "timestamp": ["2024-01-01", "2024-01-02"], + "close_price": [450.0, 452.0], + "extra_column": [1, 2], + } + ) + mock_response = MagicMock() + mock_response.content = _make_parquet_bytes(raw) + mock_response.raise_for_status.return_value = None + + with patch("portfoliomanager.data_client.requests.get", return_value=mock_response): + result = fetch_spy_prices("http://localhost", datetime(2024, 1, 3, tzinfo=UTC)) + + assert result.columns == ["ticker", "timestamp", "close_price"] + assert result.height == raw.height + + +def test_fetch_spy_prices_drops_null_close_prices() -> None: + raw = pl.DataFrame( + { + "ticker": ["SPY", "SPY"], + "timestamp": ["2024-01-01", "2024-01-02"], + "close_price": [450.0, None], + } + ) + mock_response = MagicMock() + mock_response.content = _make_parquet_bytes(raw) + mock_response.raise_for_status.return_value = None + + with patch("portfoliomanager.data_client.requests.get", return_value=mock_response): + result = fetch_spy_prices("http://localhost", datetime(2024, 1, 3, tzinfo=UTC)) + + assert result.height == 1 + + +def test_fetch_spy_prices_sends_correct_query_params() -> None: + reference_date = datetime(2024, 4, 1, tzinfo=UTC) + raw = pl.DataFrame({"ticker": [], "timestamp": [], "close_price": []}) + mock_response = MagicMock() + mock_response.content = _make_parquet_bytes(raw) + mock_response.raise_for_status.return_value = None + + with patch( + "portfoliomanager.data_client.requests.get", return_value=mock_response + ) as mock_get: + fetch_spy_prices("http://datamanager:8080", reference_date, lookback_days=90) + + expected_start = (reference_date - timedelta(days=90)).isoformat() + mock_get.assert_called_once_with( + url="http://datamanager:8080/equity-bars", + params={ + "tickers": "SPY", + "start_timestamp": expected_start, + "end_timestamp": reference_date.isoformat(), + }, + timeout=120, + ) + + +def test_fetch_spy_prices_raises_on_http_error() -> None: + mock_response = MagicMock() + mock_response.raise_for_status.side_effect = requests.HTTPError("500 Server Error") + + with ( + patch("portfoliomanager.data_client.requests.get", return_value=mock_response), + pytest.raises(PriceDataUnavailableError), + ): + fetch_spy_prices("http://localhost", datetime(2024, 1, 1, tzinfo=UTC)) + + +def test_fetch_spy_prices_raises_on_network_error() -> None: + with ( + patch( + "portfoliomanager.data_client.requests.get", + side_effect=requests.RequestException("Connection refused"), + ), + pytest.raises(PriceDataUnavailableError), + ): + fetch_spy_prices("http://localhost", datetime(2024, 1, 1, tzinfo=UTC)) diff --git a/applications/portfoliomanager/tests/test_portfolio_schema.py b/applications/portfoliomanager/tests/test_portfolio_schema.py index 7051b327..e70f4e0a 100644 --- a/applications/portfoliomanager/tests/test_portfolio_schema.py +++ b/applications/portfoliomanager/tests/test_portfolio_schema.py @@ -24,40 +24,69 @@ def _as_polars_data(df: pl.DataFrame) -> PolarsData: return cast("PolarsData", _MockPolarsData(df)) +_TICKERS = [ + "AAPL", + "GOOGL", + "MSFT", + "AMZN", + "TSLA", + "NVDA", + "META", + "NFLX", + "BABA", + "CRM", + "AMD", + "INTC", + "ORCL", + "ADBE", + "PYPL", + "SHOP", + "SPOT", + "ROKU", + "ZM", + "DOCU", +] + +_PAIR_IDS = [ + # LONG legs (indices 0-9) + "AAPL-AMD", + "GOOGL-INTC", + "MSFT-ORCL", + "AMZN-ADBE", + "TSLA-PYPL", + "NVDA-SHOP", + "META-SPOT", + "NFLX-ROKU", + "BABA-ZM", + "CRM-DOCU", + # SHORT legs (indices 10-19) + "AAPL-AMD", + "GOOGL-INTC", + "MSFT-ORCL", + "AMZN-ADBE", + "TSLA-PYPL", + "NVDA-SHOP", + "META-SPOT", + "NFLX-ROKU", + "BABA-ZM", + "CRM-DOCU", +] + + def test_portfolio_schema_valid_data() -> None: valid_data = pl.DataFrame( { - "ticker": [ - "AAPL", - "GOOGL", - "MSFT", - "AMZN", - "TSLA", - "NVDA", - "META", - "NFLX", - "BABA", - "CRM", - "AMD", - "INTC", - "ORCL", - "ADBE", - "PYPL", - "SHOP", - "SPOT", - "ROKU", - "ZM", - "DOCU", - ], + "ticker": _TICKERS, "timestamp": [datetime(2025, 1, 1, 0, 0, 0, 0, tzinfo=UTC).timestamp()] * 20, "side": (["LONG"] * 10) + (["SHORT"] * 10), - "dollar_amount": [1000.0] * 20, # Equal amounts for balanced portfolio + "dollar_amount": [1000.0] * 20, + "pair_id": _PAIR_IDS, } ) validated_df = portfolio_schema.validate(valid_data) - assert validated_df.shape == (20, 4) + assert validated_df.shape == (20, 5) def test_portfolio_schema_ticker_lowercase_fails() -> None: @@ -67,6 +96,7 @@ def test_portfolio_schema_ticker_lowercase_fails() -> None: "timestamp": [datetime(2025, 1, 1, 0, 0, 0, 0, tzinfo=UTC).timestamp()], "side": ["LONG"], "dollar_amount": [1000.0], + "pair_id": ["AAPL-MSFT"], } ) @@ -81,6 +111,7 @@ def test_portfolio_schema_invalid_side_fails() -> None: "timestamp": [datetime(2025, 1, 1, 0, 0, 0, 0, tzinfo=UTC).timestamp()], "side": ["BUY"], # Invalid side value "dollar_amount": [1000.0], + "pair_id": ["AAPL-MSFT"], } ) @@ -95,6 +126,7 @@ def test_portfolio_schema_negative_dollar_amount_fails() -> None: "timestamp": [datetime(2025, 1, 1, 0, 0, 0, 0, tzinfo=UTC).timestamp()], "side": ["LONG"], "dollar_amount": [-1000.0], # Negative amount should fail + "pair_id": ["AAPL-MSFT"], } ) @@ -105,32 +137,12 @@ def test_portfolio_schema_negative_dollar_amount_fails() -> None: def test_portfolio_schema_unbalanced_sides_fails() -> None: data = pl.DataFrame( { - "ticker": [ - "AAPL", - "GOOGL", - "MSFT", - "AMZN", - "TSLA", - "NVDA", - "META", - "NFLX", - "BABA", - "CRM", - "AMD", - "INTC", - "ORCL", - "ADBE", - "PYPL", - "SHOP", - "SPOT", - "ROKU", - "ZM", - "DOCU", - ], + "ticker": _TICKERS, "timestamp": [datetime(2025, 1, 1, 0, 0, 0, 0, tzinfo=UTC).timestamp()] * 20, "side": ["LONG"] * 15 + ["SHORT"] * 5, # Unbalanced: 15 LONG, 5 SHORT "dollar_amount": [1000.0] * 20, + "pair_id": _PAIR_IDS, } ) @@ -141,33 +153,13 @@ def test_portfolio_schema_unbalanced_sides_fails() -> None: def test_portfolio_schema_imbalanced_dollar_amounts_fails() -> None: data = pl.DataFrame( { - "ticker": [ - "AAPL", - "GOOGL", - "MSFT", - "AMZN", - "TSLA", - "NVDA", - "META", - "NFLX", - "BABA", - "CRM", - "AMD", - "INTC", - "ORCL", - "ADBE", - "PYPL", - "SHOP", - "SPOT", - "ROKU", - "ZM", - "DOCU", - ], + "ticker": _TICKERS, "timestamp": [datetime(2025, 1, 1, 0, 0, 0, 0, tzinfo=UTC).timestamp()] * 20, "side": (["LONG"] * 10) + (["SHORT"] * 10), "dollar_amount": ([2000.0] * 10) + ([500.0] * 10), # Very imbalanced amounts + "pair_id": _PAIR_IDS, } ) @@ -182,6 +174,7 @@ def test_portfolio_schema_duplicate_tickers_fails() -> None: "timestamp": [datetime(2025, 1, 1, 0, 0, 0, 0, tzinfo=UTC).timestamp()] * 2, "side": ["LONG", "SHORT"], "dollar_amount": [1000.0, 1000.0], + "pair_id": ["AAPL-MSFT", "AAPL-MSFT"], } ) @@ -196,6 +189,7 @@ def test_portfolio_schema_zero_timestamp_fails() -> None: "timestamp": [0.0], # Zero timestamp should fail "side": ["LONG"], "dollar_amount": [1000.0], + "pair_id": ["AAPL-MSFT"], } ) @@ -257,6 +251,20 @@ def test_pairs_schema_validates_valid_pairs() -> None: assert validated.shape[0] == len(data) +def test_portfolio_schema_missing_pair_id_fails() -> None: + data = pl.DataFrame( + { + "ticker": ["AAPL"], + "timestamp": [datetime(2025, 1, 1, 0, 0, 0, 0, tzinfo=UTC).timestamp()], + "side": ["LONG"], + "dollar_amount": [1000.0], + } + ) + + with pytest.raises((SchemaError, pl.exceptions.ColumnNotFoundError)): + portfolio_schema.validate(data) + + def test_check_pair_tickers_different_same_ticker_raises() -> None: data = pl.DataFrame( { diff --git a/applications/portfoliomanager/tests/test_portfolio_server.py b/applications/portfoliomanager/tests/test_portfolio_server.py new file mode 100644 index 00000000..d93abdb2 --- /dev/null +++ b/applications/portfoliomanager/tests/test_portfolio_server.py @@ -0,0 +1,367 @@ +import json +from unittest.mock import MagicMock, patch + +import polars as pl +import pytest +from portfoliomanager.server import ( + _PRIOR_PORTFOLIO_SCHEMA, + evaluate_prior_pairs, + get_positions, + get_prior_portfolio, +) + + +def _make_prior_portfolio(pairs: list[dict]) -> pl.DataFrame: + rows = [] + for pair in pairs: + rows.append( + { + "ticker": pair["long_ticker"], + "timestamp": 1735689600.0, + "side": "LONG", + "dollar_amount": 1000.0, + "action": "OPEN_POSITION", + "pair_id": pair["pair_id"], + } + ) + rows.append( + { + "ticker": pair["short_ticker"], + "timestamp": 1735689600.0, + "side": "SHORT", + "dollar_amount": 1000.0, + "action": "OPEN_POSITION", + "pair_id": pair["pair_id"], + } + ) + return pl.DataFrame(rows, schema=_PRIOR_PORTFOLIO_SCHEMA) + + +def _make_historical_prices(tickers: list[str], n_rows: int = 65) -> pl.DataFrame: + rows = [] + for ticker in tickers: + rows.extend( + { + "ticker": ticker, + "timestamp": float(i), + "close_price": 100.0 + (i * 0.1), + } + for i in range(n_rows) + ) + return pl.DataFrame(rows) + + +def _make_optimal_portfolio() -> pl.DataFrame: + return pl.DataFrame( + { + "ticker": ["NVDA", "AMD"], + "timestamp": [1735689600.0, 1735689600.0], + "side": ["LONG", "SHORT"], + "dollar_amount": [1000.0, 1000.0], + "action": ["OPEN_POSITION", "OPEN_POSITION"], + "pair_id": ["NVDA-AMD", "NVDA-AMD"], + } + ) + + +# --- evaluate_prior_pairs --- + + +def test_evaluate_prior_pairs_returns_empty_set_for_empty_portfolio() -> None: + empty_portfolio = pl.DataFrame(schema=_PRIOR_PORTFOLIO_SCHEMA) + historical_prices = _make_historical_prices(["AAPL", "MSFT"]) + result = evaluate_prior_pairs(empty_portfolio, historical_prices) + assert result == set() + + +def test_evaluate_prior_pairs_holds_pair_in_intermediate_zone() -> None: + prior = _make_prior_portfolio( + [{"pair_id": "AAPL-MSFT", "long_ticker": "AAPL", "short_ticker": "MSFT"}] + ) + historical_prices = _make_historical_prices(["AAPL", "MSFT"]) + with patch( + "portfoliomanager.server.compute_spread_zscore", return_value=(2.0, 1.0) + ): + result = evaluate_prior_pairs(prior, historical_prices) + assert "AAPL" in result + assert "MSFT" in result + + +def test_evaluate_prior_pairs_holds_pair_at_lower_bound_of_hold_zone() -> None: + prior = _make_prior_portfolio( + [{"pair_id": "AAPL-MSFT", "long_ticker": "AAPL", "short_ticker": "MSFT"}] + ) + historical_prices = _make_historical_prices(["AAPL", "MSFT"]) + with patch( + "portfoliomanager.server.compute_spread_zscore", return_value=(0.5, 1.0) + ): + result = evaluate_prior_pairs(prior, historical_prices) + assert "AAPL" in result + assert "MSFT" in result + + +def test_evaluate_prior_pairs_does_not_hold_converged_pair() -> None: + prior = _make_prior_portfolio( + [{"pair_id": "AAPL-MSFT", "long_ticker": "AAPL", "short_ticker": "MSFT"}] + ) + historical_prices = _make_historical_prices(["AAPL", "MSFT"]) + with patch( + "portfoliomanager.server.compute_spread_zscore", return_value=(0.2, 1.0) + ): + result = evaluate_prior_pairs(prior, historical_prices) + assert "AAPL" not in result + assert "MSFT" not in result + + +def test_evaluate_prior_pairs_does_not_hold_stop_loss_pair() -> None: + prior = _make_prior_portfolio( + [{"pair_id": "AAPL-MSFT", "long_ticker": "AAPL", "short_ticker": "MSFT"}] + ) + historical_prices = _make_historical_prices(["AAPL", "MSFT"]) + with patch( + "portfoliomanager.server.compute_spread_zscore", return_value=(5.0, 1.0) + ): + result = evaluate_prior_pairs(prior, historical_prices) + assert "AAPL" not in result + assert "MSFT" not in result + + +def test_evaluate_prior_pairs_does_not_hold_pair_at_stop_loss_boundary() -> None: + prior = _make_prior_portfolio( + [{"pair_id": "AAPL-MSFT", "long_ticker": "AAPL", "short_ticker": "MSFT"}] + ) + historical_prices = _make_historical_prices(["AAPL", "MSFT"]) + with patch( + "portfoliomanager.server.compute_spread_zscore", return_value=(4.0, 1.0) + ): + result = evaluate_prior_pairs(prior, historical_prices) + assert "AAPL" not in result + assert "MSFT" not in result + + +def test_evaluate_prior_pairs_handles_negative_z_score_in_hold_zone() -> None: + prior = _make_prior_portfolio( + [{"pair_id": "AAPL-MSFT", "long_ticker": "AAPL", "short_ticker": "MSFT"}] + ) + historical_prices = _make_historical_prices(["AAPL", "MSFT"]) + with patch( + "portfoliomanager.server.compute_spread_zscore", return_value=(-2.0, 1.0) + ): + result = evaluate_prior_pairs(prior, historical_prices) + assert "AAPL" in result + assert "MSFT" in result + + +def test_evaluate_prior_pairs_skips_malformed_pair_missing_long_leg() -> None: + prior = pl.DataFrame( + { + "ticker": ["MSFT"], + "timestamp": [1735689600.0], + "side": ["SHORT"], + "dollar_amount": [1000.0], + "action": ["OPEN_POSITION"], + "pair_id": ["AAPL-MSFT"], + }, + schema=_PRIOR_PORTFOLIO_SCHEMA, + ) + historical_prices = _make_historical_prices(["AAPL", "MSFT"]) + result = evaluate_prior_pairs(prior, historical_prices) + assert result == set() + + +def test_evaluate_prior_pairs_skips_pair_with_insufficient_price_history() -> None: + prior = _make_prior_portfolio( + [{"pair_id": "AAPL-MSFT", "long_ticker": "AAPL", "short_ticker": "MSFT"}] + ) + historical_prices = pl.DataFrame( + { + "ticker": ["AAPL", "MSFT"], + "timestamp": [1.0, 1.0], + "close_price": [100.0, 100.0], + } + ) + result = evaluate_prior_pairs(prior, historical_prices) + assert result == set() + + +def test_evaluate_prior_pairs_skips_pair_missing_from_price_data() -> None: + prior = _make_prior_portfolio( + [{"pair_id": "AAPL-MSFT", "long_ticker": "AAPL", "short_ticker": "MSFT"}] + ) + historical_prices = _make_historical_prices(["AAPL"]) # MSFT missing + result = evaluate_prior_pairs(prior, historical_prices) + assert result == set() + + +def test_evaluate_prior_pairs_skips_pair_with_non_positive_prices() -> None: + prior = _make_prior_portfolio( + [{"pair_id": "AAPL-MSFT", "long_ticker": "AAPL", "short_ticker": "MSFT"}] + ) + n_rows = 65 + last_row = n_rows - 1 + rows = [] + for i in range(n_rows): + rows.append( + { + "ticker": "AAPL", + "timestamp": float(i), + "close_price": 0.0 if i == last_row else 100.0, + } + ) + rows.append({"ticker": "MSFT", "timestamp": float(i), "close_price": 100.0}) + historical_prices = pl.DataFrame(rows) + result = evaluate_prior_pairs(prior, historical_prices) + assert result == set() + + +def test_evaluate_prior_pairs_skips_pair_with_nan_z_score() -> None: + prior = _make_prior_portfolio( + [{"pair_id": "AAPL-MSFT", "long_ticker": "AAPL", "short_ticker": "MSFT"}] + ) + historical_prices = _make_historical_prices(["AAPL", "MSFT"]) + with patch( + "portfoliomanager.server.compute_spread_zscore", + return_value=(float("nan"), 1.0), + ): + result = evaluate_prior_pairs(prior, historical_prices) + assert result == set() + + +def test_evaluate_prior_pairs_holds_multiple_pairs_independently() -> None: + prior = _make_prior_portfolio( + [ + {"pair_id": "AAPL-MSFT", "long_ticker": "AAPL", "short_ticker": "MSFT"}, + {"pair_id": "GOOGL-AMZN", "long_ticker": "GOOGL", "short_ticker": "AMZN"}, + ] + ) + historical_prices = _make_historical_prices(["AAPL", "MSFT", "GOOGL", "AMZN"]) + + # pair_ids are sorted: "AAPL-MSFT" < "GOOGL-AMZN" + # first call → AAPL-MSFT (z=2.0, held), second → GOOGL-AMZN (z=0.2, closed) + with patch( + "portfoliomanager.server.compute_spread_zscore", + side_effect=[(2.0, 1.0), (0.2, 1.0)], + ): + result = evaluate_prior_pairs(prior, historical_prices) + + assert result == {"AAPL", "MSFT"} + + +# --- get_prior_portfolio --- + + +def test_get_prior_portfolio_returns_empty_dataframe_on_empty_array_response() -> None: + mock_response = MagicMock() + mock_response.status_code = 200 + mock_response.text = "[]" + with patch("portfoliomanager.server.requests.get", return_value=mock_response): + result = get_prior_portfolio() + assert result.is_empty() + assert "pair_id" in result.columns + + +def test_get_prior_portfolio_returns_dataframe_with_pair_id_on_success() -> None: + data = [ + { + "ticker": "AAPL", + "timestamp": 1735689600.0, + "side": "LONG", + "dollar_amount": 1000.0, + "action": "OPEN_POSITION", + "pair_id": "AAPL-MSFT", + } + ] + mock_response = MagicMock() + mock_response.status_code = 200 + mock_response.text = json.dumps(data) + mock_response.json.return_value = data + with patch("portfoliomanager.server.requests.get", return_value=mock_response): + result = get_prior_portfolio() + assert result.height == 1 + assert "pair_id" in result.columns + assert result["pair_id"][0] == "AAPL-MSFT" + + +def test_get_prior_portfolio_returns_empty_dataframe_on_error_response() -> None: + mock_response = MagicMock() + mock_response.status_code = 500 + with patch("portfoliomanager.server.requests.get", return_value=mock_response): + result = get_prior_portfolio() + assert result.is_empty() + assert "pair_id" in result.columns + + +def test_get_prior_portfolio_returns_empty_dataframe_on_whitespace_response() -> None: + mock_response = MagicMock() + mock_response.status_code = 200 + mock_response.text = " " + with patch("portfoliomanager.server.requests.get", return_value=mock_response): + result = get_prior_portfolio() + assert result.is_empty() + + +# --- get_positions --- + + +def test_get_positions_excludes_held_tickers_from_close_list() -> None: + prior_tickers = ["AAPL", "MSFT", "GOOGL"] + held_tickers = {"AAPL", "MSFT"} + optimal = _make_optimal_portfolio() + _, close_positions = get_positions(prior_tickers, held_tickers, optimal) + close_ticker_list = [p["ticker"] for p in close_positions] + assert "AAPL" not in close_ticker_list + assert "MSFT" not in close_ticker_list + assert "GOOGL" in close_ticker_list + + +def test_get_positions_includes_all_non_held_prior_tickers_in_close_list() -> None: + prior_tickers = ["AAPL", "MSFT", "GOOGL", "AMZN"] + held_tickers = {"AAPL"} + optimal = _make_optimal_portfolio() + _, close_positions = get_positions(prior_tickers, held_tickers, optimal) + close_ticker_list = [p["ticker"] for p in close_positions] + assert "MSFT" in close_ticker_list + assert "GOOGL" in close_ticker_list + assert "AMZN" in close_ticker_list + assert len(close_positions) == 3 # noqa: PLR2004 + + +def test_get_positions_closes_all_prior_tickers_when_held_set_is_empty() -> None: + prior_tickers = ["AAPL", "MSFT"] + held_tickers: set[str] = set() + optimal = _make_optimal_portfolio() + _, close_positions = get_positions(prior_tickers, held_tickers, optimal) + close_ticker_list = [p["ticker"] for p in close_positions] + assert "AAPL" in close_ticker_list + assert "MSFT" in close_ticker_list + assert len(close_positions) == 2 # noqa: PLR2004 + + +def test_get_positions_returns_correct_open_positions() -> None: + prior_tickers: list[str] = [] + held_tickers: set[str] = set() + optimal = _make_optimal_portfolio() + open_positions, _ = get_positions(prior_tickers, held_tickers, optimal) + assert len(open_positions) == 2 # noqa: PLR2004 + tickers = [p["ticker"] for p in open_positions] + assert "NVDA" in tickers + assert "AMD" in tickers + + +@pytest.mark.parametrize( + ("held_tickers", "expected_close_count"), + [ + (set(), 3), + ({"AAPL"}, 2), + ({"AAPL", "MSFT"}, 1), + ({"AAPL", "MSFT", "GOOGL"}, 0), + ], +) +def test_get_positions_close_count_matches_non_held_prior_tickers( + held_tickers: set[str], + expected_close_count: int, +) -> None: + prior_tickers = ["AAPL", "MSFT", "GOOGL"] + optimal = _make_optimal_portfolio() + _, close_positions = get_positions(prior_tickers, held_tickers, optimal) + assert len(close_positions) == expected_close_count diff --git a/applications/portfoliomanager/tests/test_regime.py b/applications/portfoliomanager/tests/test_regime.py new file mode 100644 index 00000000..5f178558 --- /dev/null +++ b/applications/portfoliomanager/tests/test_regime.py @@ -0,0 +1,122 @@ +import numpy as np +import polars as pl +from portfoliomanager.regime import ( + REGIME_AUTOCORRELATION_THRESHOLD, + REGIME_VOLATILITY_THRESHOLD, + REGIME_WINDOW_DAYS, + classify_regime, +) + + +def _make_spy_prices_from_returns( + returns: list[float], base_price: float = 100.0 +) -> pl.DataFrame: + prices = [base_price] + for ret in returns: + prices.append(prices[-1] * (1.0 + ret)) + return pl.DataFrame( + { + "ticker": ["SPY"] * len(prices), + "timestamp": list(range(len(prices))), + "close_price": prices, + } + ) + + +def _make_low_vol_negative_autocorr_spy_prices() -> pl.DataFrame: + # Alternating small returns → annualized vol ≈ 0.008 * sqrt(252) ≈ 0.13 < 0.20 + # Lag-1 autocorr ≈ -1.0 (strongly negative) + n_returns = REGIME_WINDOW_DAYS + 1 + returns = [0.008 if i % 2 == 0 else -0.008 for i in range(n_returns)] + return _make_spy_prices_from_returns(returns) + + +def _make_high_vol_negative_autocorr_spy_prices() -> pl.DataFrame: + # Alternating large returns → annualized vol ≈ 0.025 * sqrt(252) ≈ 0.40 > 0.20 + # Lag-1 autocorr ≈ -1.0 (strongly negative) + n_returns = REGIME_WINDOW_DAYS + 1 + returns = [0.025 if i % 2 == 0 else -0.025 for i in range(n_returns)] + return _make_spy_prices_from_returns(returns) + + +def _make_low_vol_positive_autocorr_spy_prices() -> pl.DataFrame: + # Sine wave with period 20 → lag-1 autocorr ≈ cos(2π/20) ≈ 0.95 > 0 + # Amplitude 0.003 → annualized vol ≈ 0.003/sqrt(2)*sqrt(252) ≈ 0.034 < 0.20 + n_returns = REGIME_WINDOW_DAYS + 1 + returns = [0.003 * np.sin(2 * np.pi * i / 20) for i in range(n_returns)] + return _make_spy_prices_from_returns(returns) + + +def test_classify_regime_returns_mean_reversion_for_low_vol_negative_autocorr() -> None: + spy_prices = _make_low_vol_negative_autocorr_spy_prices() + result = classify_regime(spy_prices) + assert result["state"] == "mean_reversion" + + +def test_classify_regime_returns_trending_for_high_vol_negative_autocorr() -> None: + spy_prices = _make_high_vol_negative_autocorr_spy_prices() + result = classify_regime(spy_prices) + assert result["state"] == "trending" + + +def test_classify_regime_returns_trending_for_low_vol_positive_autocorr() -> None: + spy_prices = _make_low_vol_positive_autocorr_spy_prices() + result = classify_regime(spy_prices) + assert result["state"] == "trending" + + +def test_classify_regime_confidence_is_in_valid_range() -> None: + for spy_prices in [ + _make_low_vol_negative_autocorr_spy_prices(), + _make_high_vol_negative_autocorr_spy_prices(), + _make_low_vol_positive_autocorr_spy_prices(), + ]: + result = classify_regime(spy_prices) + assert 0.0 <= result["confidence"] <= 1.0 + + +def test_classify_regime_returns_trending_for_insufficient_data() -> None: + spy_prices = pl.DataFrame( + {"ticker": ["SPY"], "timestamp": [0], "close_price": [100.0]} + ) + result = classify_regime(spy_prices) + assert result["state"] == "trending" + assert result["confidence"] == 0.0 + + +def test_classify_regime_returns_trending_for_exactly_one_return() -> None: + # Exactly 2 prices yields 1 return, which is below the minimum return count + spy_prices = pl.DataFrame( + {"ticker": ["SPY", "SPY"], "timestamp": [0, 1], "close_price": [100.0, 101.0]} + ) + result = classify_regime(spy_prices) + assert result["state"] == "trending" + assert result["confidence"] == 0.0 + + +def test_classify_regime_returns_trending_for_exactly_two_returns() -> None: + # 3 prices yields 2 returns; np.corrcoef on 1-element arrays produces NaN, + # so the guard must catch len(returns) == 2 before the autocorrelation step. + spy_prices = pl.DataFrame( + { + "ticker": ["SPY", "SPY", "SPY"], + "timestamp": [0, 1, 2], + "close_price": [100.0, 101.0, 102.0], + } + ) + result = classify_regime(spy_prices) + assert result["state"] == "trending" + assert result["confidence"] == 0.0 + + +def test_classify_regime_mean_reversion_confidence_is_positive() -> None: + spy_prices = _make_low_vol_negative_autocorr_spy_prices() + result = classify_regime(spy_prices) + assert result["confidence"] > 0.0 + + +def test_classify_regime_uses_module_constants() -> None: + # Verify constants have the values discussed in design + assert REGIME_VOLATILITY_THRESHOLD == 0.20 # noqa: PLR2004 + assert REGIME_AUTOCORRELATION_THRESHOLD == 0.0 + assert REGIME_WINDOW_DAYS == 60 # noqa: PLR2004 diff --git a/applications/portfoliomanager/tests/test_risk_management.py b/applications/portfoliomanager/tests/test_risk_management.py index f348b22c..248bb3d7 100644 --- a/applications/portfoliomanager/tests/test_risk_management.py +++ b/applications/portfoliomanager/tests/test_risk_management.py @@ -2,6 +2,7 @@ import polars as pl import pytest +from portfoliomanager.beta import compute_portfolio_beta from portfoliomanager.exceptions import InsufficientPairsError from portfoliomanager.portfolio_schema import portfolio_schema from portfoliomanager.risk_management import ( @@ -35,10 +36,35 @@ def _make_candidate_pairs( ) +def _make_neutral_market_betas(count: int = 10) -> pl.DataFrame: + tickers = [] + for i in range(count): + tickers.extend([f"TICK{i:02d}A", f"TICK{i:02d}B"]) + return pl.DataFrame({"ticker": tickers, "market_beta": [1.0] * (count * 2)}) + + +def _make_asymmetric_market_betas() -> pl.DataFrame: + """Pairs 0-7: long_beta=2.0, short_beta=1.0. + Pairs 8-9: long_beta=1.0, short_beta=2.0. + """ + tickers = [] + betas = [] + for i in range(8): + tickers.extend([f"TICK{i:02d}A", f"TICK{i:02d}B"]) + betas.extend([2.0, 1.0]) + for i in range(8, 10): + tickers.extend([f"TICK{i:02d}A", f"TICK{i:02d}B"]) + betas.extend([1.0, 2.0]) + return pl.DataFrame({"ticker": tickers, "market_beta": betas}) + + def test_size_pairs_with_volatility_parity_long_equals_short_dollar_totals() -> None: pairs = _make_candidate_pairs() result = size_pairs_with_volatility_parity( - pairs, maximum_capital=10000.0, current_timestamp=_CURRENT_TIMESTAMP + pairs, + maximum_capital=10000.0, + current_timestamp=_CURRENT_TIMESTAMP, + market_betas=_make_neutral_market_betas(), ) long_sum = result.filter(pl.col("side") == "LONG")["dollar_amount"].sum() short_sum = result.filter(pl.col("side") == "SHORT")["dollar_amount"].sum() @@ -52,7 +78,10 @@ def test_size_pairs_with_volatility_parity_lower_volatility_receives_more_capita short_vols = [0.01] + [0.04] * 9 pairs = _make_candidate_pairs(long_vols=long_vols, short_vols=short_vols) result = size_pairs_with_volatility_parity( - pairs, maximum_capital=10000.0, current_timestamp=_CURRENT_TIMESTAMP + pairs, + maximum_capital=10000.0, + current_timestamp=_CURRENT_TIMESTAMP, + market_betas=_make_neutral_market_betas(), ) long_df = result.filter(pl.col("side") == "LONG") low_vol_amount = long_df.filter(pl.col("ticker") == "TICK00A")[ @@ -68,7 +97,10 @@ def test_size_pairs_with_volatility_parity_raises_insufficient_pairs_error() -> pairs = _make_candidate_pairs(count=REQUIRED_PAIRS - 1) with pytest.raises(InsufficientPairsError): size_pairs_with_volatility_parity( - pairs, maximum_capital=10000.0, current_timestamp=_CURRENT_TIMESTAMP + pairs, + maximum_capital=10000.0, + current_timestamp=_CURRENT_TIMESTAMP, + market_betas=_make_neutral_market_betas(count=REQUIRED_PAIRS - 1), ) @@ -77,7 +109,69 @@ def test_size_pairs_with_volatility_parity_output_passes_portfolio_schema_valida ): pairs = _make_candidate_pairs() result = size_pairs_with_volatility_parity( - pairs, maximum_capital=10000.0, current_timestamp=_CURRENT_TIMESTAMP + pairs, + maximum_capital=10000.0, + current_timestamp=_CURRENT_TIMESTAMP, + market_betas=_make_neutral_market_betas(), ) validated = portfolio_schema.validate(result) assert validated.height == REQUIRED_PAIRS * 2 + + +def test_size_pairs_with_volatility_parity_exposure_scale_halves_dollar_amounts() -> ( + None +): + pairs = _make_candidate_pairs() + market_betas = _make_neutral_market_betas() + + full_result = size_pairs_with_volatility_parity( + pairs, + maximum_capital=10000.0, + current_timestamp=_CURRENT_TIMESTAMP, + market_betas=market_betas, + exposure_scale=1.0, + ) + half_result = size_pairs_with_volatility_parity( + pairs, + maximum_capital=10000.0, + current_timestamp=_CURRENT_TIMESTAMP, + market_betas=market_betas, + exposure_scale=0.5, + ) + + full_amounts = full_result.sort(["ticker", "side"])["dollar_amount"].to_list() + half_amounts = half_result.sort(["ticker", "side"])["dollar_amount"].to_list() + + for full, half in zip(full_amounts, half_amounts, strict=False): + assert half == pytest.approx(full * 0.5) + + +def test_size_pairs_with_volatility_parity_beta_neutral_reduces_portfolio_beta() -> ( + None +): + # Pairs 0-7: long_beta=2.0, short_beta=1.0 (net positive contribution) + # Pairs 8-9: long_beta=1.0, short_beta=2.0 (net negative contribution) + # Equal vol-parity weights produce portfolio beta ≠ 0; optimizer drives it toward 0 + pairs = _make_candidate_pairs() + asymmetric_betas = _make_asymmetric_market_betas() + neutral_betas = _make_neutral_market_betas() + + beta_neutral_result = size_pairs_with_volatility_parity( + pairs, + maximum_capital=10000.0, + current_timestamp=_CURRENT_TIMESTAMP, + market_betas=asymmetric_betas, + ) + vol_parity_result = size_pairs_with_volatility_parity( + pairs, + maximum_capital=10000.0, + current_timestamp=_CURRENT_TIMESTAMP, + market_betas=neutral_betas, + ) + + beta_neutral_beta = abs( + compute_portfolio_beta(beta_neutral_result, asymmetric_betas) + ) + vol_parity_beta = abs(compute_portfolio_beta(vol_parity_result, asymmetric_betas)) + + assert beta_neutral_beta < vol_parity_beta diff --git a/pyproject.toml b/pyproject.toml index cefaa149..ad11c94d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -38,6 +38,9 @@ skip_covered = true [tool.coverage.xml] output = ".coverage_output/python.xml" +[tool.ruff] +target-version = "py312" + [tool.ruff.lint] select = [ "A", # flake8 builtins