diff --git a/applications/portfoliomanager/src/portfoliomanager/alpaca_client.py b/applications/portfoliomanager/src/portfoliomanager/alpaca_client.py index 1007b6daa..05651a72a 100644 --- a/applications/portfoliomanager/src/portfoliomanager/alpaca_client.py +++ b/applications/portfoliomanager/src/portfoliomanager/alpaca_client.py @@ -4,7 +4,6 @@ import structlog from alpaca.common.exceptions import APIError from alpaca.data import StockHistoricalDataClient -from alpaca.data.requests import StockLatestQuoteRequest from alpaca.trading import ( ClosePositionRequest, OrderRequest, @@ -61,74 +60,18 @@ def get_account(self) -> AlpacaAccount: buying_power=float(cast("str", account.buying_power)), ) - def _get_current_price(self, ticker: str, side: TradeSide) -> float: - """Get current price for a ticker based on trade side. - - Uses ask price for buys, bid price for sells. - Falls back to the opposite price if the primary price is unavailable. - """ - request = StockLatestQuoteRequest(symbol_or_symbols=ticker.upper()) - quotes = self.data_client.get_stock_latest_quote(request) - quote = quotes.get(ticker.upper()) - - if quote is None: - message = f"No quote returned for {ticker}" - raise ValueError(message) - - ask_price = ( - float(quote.ask_price) - if quote.ask_price is not None and quote.ask_price > 0 - else 0.0 - ) - bid_price = ( - float(quote.bid_price) - if quote.bid_price is not None and quote.bid_price > 0 - else 0.0 - ) - - if side == TradeSide.BUY: - if ask_price > 0: - return ask_price - if bid_price > 0: - logger.warning( - "Ask price unavailable, using bid price as fallback", - ticker=ticker, - side=side.value, - bid_price=bid_price, - ) - return bid_price - message = f"No valid price for {ticker}: ask and bid are 0" - raise ValueError(message) - - if bid_price > 0: - return bid_price - if ask_price > 0: - logger.warning( - "Bid price unavailable, using ask price as fallback", - ticker=ticker, - side=side.value, - ask_price=ask_price, - ) - return ask_price - message = f"No valid price for {ticker}: bid and ask are 0" - raise ValueError(message) - def open_position( self, ticker: str, side: TradeSide, dollar_amount: float, ) -> None: - # Calculate quantity from dollar amount and current price - # Allow fractional shares where supported by the brokerage - current_price = self._get_current_price(ticker, side) - qty = dollar_amount / current_price - - if qty <= 0: + # Use notional (dollar amount) for order submission + # This allows Alpaca to handle fractional shares automatically + if dollar_amount <= 0: message = ( f"Cannot open position for {ticker}: " - f"non-positive quantity calculated from dollar_amount {dollar_amount} " - f"and price {current_price}" + f"non-positive dollar_amount {dollar_amount}" ) raise ValueError(message) @@ -136,7 +79,7 @@ def open_position( self.trading_client.submit_order( order_data=OrderRequest( symbol=ticker.upper(), - qty=qty, + notional=dollar_amount, side=OrderSide(side.value.lower()), type=OrderType.MARKET, time_in_force=TimeInForce.DAY, diff --git a/applications/portfoliomanager/src/portfoliomanager/enums.py b/applications/portfoliomanager/src/portfoliomanager/enums.py index 946b55f4b..e805c017e 100644 --- a/applications/portfoliomanager/src/portfoliomanager/enums.py +++ b/applications/portfoliomanager/src/portfoliomanager/enums.py @@ -2,10 +2,8 @@ class PositionAction(Enum): - PDT_LOCKED = "PDT_LOCKED" - CLOSE_POSITION = "CLOSE_POSITION" - MAINTAIN_POSITION = "MAINTAIN_POSITION" OPEN_POSITION = "OPEN_POSITION" + CLOSE_POSITION = "CLOSE_POSITION" UNSPECIFIED = "UNSPECIFIED" diff --git a/applications/portfoliomanager/src/portfoliomanager/portfolio_schema.py b/applications/portfoliomanager/src/portfoliomanager/portfolio_schema.py index f21d62353..3a8d254c4 100644 --- a/applications/portfoliomanager/src/portfoliomanager/portfolio_schema.py +++ b/applications/portfoliomanager/src/portfoliomanager/portfolio_schema.py @@ -116,10 +116,8 @@ def check_position_side_sums( checks=[ pa.Check.isin( [ - PositionAction.PDT_LOCKED.value, - PositionAction.CLOSE_POSITION.value, PositionAction.OPEN_POSITION.value, - PositionAction.MAINTAIN_POSITION.value, + PositionAction.CLOSE_POSITION.value, PositionAction.UNSPECIFIED.value, ] ), diff --git a/applications/portfoliomanager/src/portfoliomanager/risk_management.py b/applications/portfoliomanager/src/portfoliomanager/risk_management.py index 1bb6f32b9..a68626228 100644 --- a/applications/portfoliomanager/src/portfoliomanager/risk_management.py +++ b/applications/portfoliomanager/src/portfoliomanager/risk_management.py @@ -1,6 +1,5 @@ -import math import os -from datetime import UTC, datetime +from datetime import datetime import polars as pl import structlog @@ -11,189 +10,8 @@ logger = structlog.get_logger() UNCERTAINTY_THRESHOLD = float(os.getenv("OSCM_UNCERTAINTY_THRESHOLD", "0.1")) - - -def add_portfolio_action_column( - prior_portfolio: pl.DataFrame, - current_timestamp: datetime, -) -> pl.DataFrame: - prior_portfolio = prior_portfolio.clone() - - return prior_portfolio.with_columns( - pl.when( - pl.col("timestamp") - .cast(pl.Float64) - .map_elements( - lambda ts: datetime.fromtimestamp(ts, tz=UTC).date(), - return_dtype=pl.Date, - ) - == current_timestamp.date() - ) - .then(pl.lit(PositionAction.PDT_LOCKED.value)) - .otherwise(pl.lit(PositionAction.UNSPECIFIED.value)) - .alias("action") - ) - - -def add_equity_bars_returns_and_realized_volatility_columns( - prior_equity_bars: pl.DataFrame, -) -> pl.DataFrame: - prior_equity_bars = prior_equity_bars.clone() - - minimum_bars_per_ticker_required = 30 - - ticker_counts = prior_equity_bars.group_by("ticker").agg(pl.len().alias("count")) - insufficient_tickers = ticker_counts.filter( - pl.col("count") < minimum_bars_per_ticker_required - ) - - if insufficient_tickers.height > 0: - insufficient_list = insufficient_tickers.select("ticker").to_series().to_list() - message = f"Tickers with insufficient data (< {minimum_bars_per_ticker_required} rows): {insufficient_list}" # noqa: E501 - raise ValueError(message) - - prior_equity_bars = prior_equity_bars.sort(["ticker", "timestamp"]) - daily_returns = pl.col("close_price").pct_change().over("ticker") - return prior_equity_bars.with_columns( - pl.when(pl.col("close_price").is_not_null()) - .then(daily_returns) - .otherwise(None) - .alias("daily_returns"), - pl.when(pl.col("close_price").is_not_null()) - .then( - pl.when((daily_returns + 1) > 0) - .then((daily_returns + 1).log()) - .otherwise(None) - ) - .otherwise(None) - .alias("log_daily_returns"), - daily_returns.rolling_std(window_size=minimum_bars_per_ticker_required).alias( - "realized_volatility" - ), - ) - - -def add_portfolio_performance_columns( - prior_portfolio: pl.DataFrame, - prior_predictions: pl.DataFrame, # per original ticker and timestamp - prior_equity_bars: pl.DataFrame, # per original ticker and timestamp - current_timestamp: datetime, -) -> pl.DataFrame: - prior_portfolio = prior_portfolio.clone() - prior_predictions = prior_predictions.clone() - prior_equity_bars = prior_equity_bars.clone() - - # Ensure timestamp columns have matching types for joins and comparisons. - # Timestamps may arrive as i64 (from JSON integer serialization) or f64 (from - # Python float conversion). Unconditional casting to Float64 is simpler and - # more robust than checking dtypes, and the performance cost is negligible. - prior_portfolio = prior_portfolio.with_columns(pl.col("timestamp").cast(pl.Float64)) - prior_predictions = prior_predictions.with_columns( - pl.col("timestamp").cast(pl.Float64) - ) - prior_equity_bars = prior_equity_bars.with_columns( - pl.col("timestamp").cast(pl.Float64) - ) - - prior_portfolio_predictions = prior_portfolio.join( - other=prior_predictions, - on=["ticker", "timestamp"], - how="left", - ).select( - pl.col("ticker"), - pl.col("timestamp"), - pl.col("side"), - pl.col("dollar_amount"), - pl.col("action"), - pl.col("quantile_10").alias("original_lower_threshold"), - pl.col("quantile_90").alias("original_upper_threshold"), - ) - - prior_equity_bars_with_returns = prior_equity_bars.sort(["ticker", "timestamp"]) - - position_returns = [] - - for row in prior_portfolio_predictions.iter_rows(named=True): - ticker = row["ticker"] - position_timestamp = row["timestamp"] - - ticker_bars = prior_equity_bars_with_returns.filter( - (pl.col("ticker") == ticker) - & (pl.col("timestamp") >= position_timestamp) - & (pl.col("timestamp") <= current_timestamp.timestamp()) - ) - - cumulative_log_return = ( - ticker_bars.select(pl.col("log_daily_returns").sum()).item() or 0 - ) - - cumulative_simple_return = math.exp(cumulative_log_return) - 1 - - position_returns.append( - { - "ticker": ticker, - "timestamp": position_timestamp, - "cumulative_simple_return": cumulative_simple_return, - } - ) - - returns = pl.DataFrame(position_returns) - - prior_portfolio_with_data = prior_portfolio_predictions.join( - other=returns, - on=["ticker", "timestamp"], - how="left", - ) - - return prior_portfolio_with_data.with_columns( - pl.when(pl.col("action") == PositionAction.PDT_LOCKED.value) - .then(pl.lit(PositionAction.PDT_LOCKED.value)) - .when( - (pl.col("action") != PositionAction.PDT_LOCKED.value) - & ( - ( - (pl.col("side") == PositionSide.LONG.value) - & ( - pl.col("cumulative_simple_return") - <= pl.col("original_lower_threshold") - ) - ) - | ( - (pl.col("side") == PositionSide.SHORT.value) - & ( - pl.col("cumulative_simple_return") - >= pl.col("original_upper_threshold") - ) - ) - ) - ) - .then(pl.lit(PositionAction.CLOSE_POSITION.value)) - .when( - ( - (pl.col("side") == PositionSide.LONG.value) - & ( - pl.col("cumulative_simple_return") - >= pl.col("original_upper_threshold") - ) - ) - | ( - (pl.col("side") == PositionSide.SHORT.value) - & ( - pl.col("cumulative_simple_return") - <= pl.col("original_lower_threshold") - ) - ) - ) - .then(pl.lit(PositionAction.MAINTAIN_POSITION.value)) - .otherwise(pl.lit(PositionAction.UNSPECIFIED.value)) - .alias("action") - ).drop( - [ - "original_lower_threshold", - "original_upper_threshold", - "cumulative_simple_return", - ] - ) +REQUIRED_PORTFOLIO_SIZE = 20 # 10 long + 10 short +SIDE_SIZE = REQUIRED_PORTFOLIO_SIZE // 2 def add_predictions_zscore_ranked_columns( @@ -224,12 +42,11 @@ def add_predictions_zscore_ranked_columns( def create_optimal_portfolio( current_predictions: pl.DataFrame, - prior_portfolio: pl.DataFrame, + prior_portfolio_tickers: list[str], maximum_capital: float, current_timestamp: datetime, ) -> pl.DataFrame: current_predictions = current_predictions.clone() - prior_portfolio = prior_portfolio.clone() high_uncertainty_tickers = ( current_predictions.filter( @@ -240,123 +57,58 @@ def create_optimal_portfolio( .to_list() ) - closed_positions, maintained_positions = _filter_positions(prior_portfolio) - - closed_position_tickers = closed_positions.select("ticker").to_series().to_list() - maintained_position_tickers = ( - maintained_positions.select("ticker").to_series().to_list() - ) - - excluded_tickers = ( - high_uncertainty_tickers + closed_position_tickers + maintained_position_tickers - ) - - prediction_summary = current_predictions.select( - "ticker", - "quantile_10", - "quantile_50", - "quantile_90", - "inter_quartile_range", - "composite_score", - ).to_dicts() - - logger.info( - "Current predictions received", - predictions=prediction_summary, - ) + # Excluding prior portfolio tickers to avoid pattern day trader restrictions. + excluded_tickers = list(set(high_uncertainty_tickers + prior_portfolio_tickers)) logger.info( "Portfolio filtering breakdown", total_predictions=current_predictions.height, high_uncertainty_excluded=len(high_uncertainty_tickers), high_uncertainty_threshold=UNCERTAINTY_THRESHOLD, - closed_positions_excluded=len(closed_position_tickers), - maintained_positions_excluded=len(maintained_position_tickers), + prior_portfolio_excluded=len(prior_portfolio_tickers), total_excluded=len(excluded_tickers), - high_uncertainty_tickers=high_uncertainty_tickers[:10] - if high_uncertainty_tickers - else [], ) available_predictions = current_predictions.filter( ~pl.col("ticker").is_in(excluded_tickers) - ) + ).sort(["composite_score", "inter_quartile_range"], descending=[True, False]) logger.info( "Available predictions after filtering", available_count=available_predictions.height, - required_for_full_portfolio=20, - ) - - maintained_long_capital = _filter_side_capital_amount( - maintained_positions, PositionSide.LONG.value - ) - maintained_short_capital = _filter_side_capital_amount( - maintained_positions, PositionSide.SHORT.value - ) - closed_long_capital = _filter_side_capital_amount( - closed_positions, PositionSide.LONG.value - ) - closed_short_capital = _filter_side_capital_amount( - closed_positions, PositionSide.SHORT.value - ) - - target_side_capital = maximum_capital / 2 - available_long_capital = max( - 0.0, - target_side_capital - maintained_long_capital + closed_long_capital, - ) - available_short_capital = max( - 0.0, - target_side_capital - maintained_short_capital + closed_short_capital, + required_for_full_portfolio=REQUIRED_PORTFOLIO_SIZE, ) - maintained_long_count = maintained_positions.filter( - pl.col("side") == PositionSide.LONG.value - ).height - maintained_short_count = maintained_positions.filter( - pl.col("side") == PositionSide.SHORT.value - ).height + if available_predictions.height < REQUIRED_PORTFOLIO_SIZE: + message = ( + f"Only {available_predictions.height} predictions available " + f"after filtering, need {REQUIRED_PORTFOLIO_SIZE} (10 long + 10 short). " + f"Excluded: {len(high_uncertainty_tickers)} high uncertainty, " + f"{len(prior_portfolio_tickers)} prior portfolio tickers." + ) + raise InsufficientPredictionsError(message) - new_long_positions_needed = max(0, 10 - maintained_long_count) - new_short_positions_needed = max(0, 10 - maintained_short_count) + long_candidates = available_predictions.head(SIDE_SIZE) + short_candidates = available_predictions.tail(SIDE_SIZE) - total_available = available_predictions.height - maximum_long_candidates = min(new_long_positions_needed, total_available // 2) - maximum_short_candidates = min( - new_short_positions_needed, total_available - maximum_long_candidates - ) + target_side_capital = maximum_capital / 2 + dollar_amount_per_position = target_side_capital / SIDE_SIZE logger.info( - "Position allocation calculation", - total_available_predictions=total_available, - new_long_positions_needed=new_long_positions_needed, - new_short_positions_needed=new_short_positions_needed, - maximum_long_candidates=maximum_long_candidates, - maximum_short_candidates=maximum_short_candidates, - maintained_long_count=maintained_long_count, - maintained_short_count=maintained_short_count, - ) - - long_candidates = available_predictions.head(maximum_long_candidates) - short_candidates = available_predictions.tail(maximum_short_candidates) - - dollar_amount_per_long = ( - available_long_capital / maximum_long_candidates - if maximum_long_candidates > 0 - else 0 - ) - dollar_amount_per_short = ( - available_short_capital / maximum_short_candidates - if maximum_short_candidates > 0 - else 0 + "Portfolio allocation", + total_capital=maximum_capital, + long_capital=target_side_capital, + short_capital=target_side_capital, + dollar_per_position=dollar_amount_per_position, + long_count=SIDE_SIZE, + short_count=SIDE_SIZE, ) long_positions = long_candidates.select( pl.col("ticker"), pl.lit(current_timestamp.timestamp()).cast(pl.Float64).alias("timestamp"), pl.lit(PositionSide.LONG.value).alias("side"), - pl.lit(dollar_amount_per_long).alias("dollar_amount"), + pl.lit(dollar_amount_per_position).alias("dollar_amount"), pl.lit(PositionAction.OPEN_POSITION.value).alias("action"), ) @@ -364,110 +116,8 @@ def create_optimal_portfolio( pl.col("ticker"), pl.lit(current_timestamp.timestamp()).cast(pl.Float64).alias("timestamp"), pl.lit(PositionSide.SHORT.value).alias("side"), - pl.lit(dollar_amount_per_short).alias("dollar_amount"), + pl.lit(dollar_amount_per_position).alias("dollar_amount"), pl.lit(PositionAction.OPEN_POSITION.value).alias("action"), ) - return _collect_portfolio_positions( - long_positions, - short_positions, - maintained_positions, - ) - - -def _filter_positions(positions: pl.DataFrame) -> tuple[pl.DataFrame, pl.DataFrame]: - positions = positions.clone() - - if positions.height == 0: - return ( - pl.DataFrame( - { - "ticker": [], - "timestamp": [], - "side": [], - "dollar_amount": [], - "action": [], - } - ), - pl.DataFrame( - { - "ticker": [], - "timestamp": [], - "side": [], - "dollar_amount": [], - "action": [], - } - ), - ) - - closed_positions = positions.filter( - pl.col("action") == PositionAction.CLOSE_POSITION.value - ) - maintained_positions = positions.filter( - pl.col("action") == PositionAction.MAINTAIN_POSITION.value - ) - - return closed_positions, maintained_positions - - -def _filter_side_capital_amount(positions: pl.DataFrame, side: str) -> float: - positions = positions.clone() - - filtered_positions = positions.filter(pl.col("side") == side.upper()) - - if filtered_positions.height == 0: - return 0.0 - - try: - side_capital_amount = filtered_positions.select(pl.sum("dollar_amount")).item() - return float(side_capital_amount or 0) - - except Exception: # noqa: BLE001 - return 0.0 - - -def _collect_portfolio_positions( - long_positions: pl.DataFrame, - short_positions: pl.DataFrame, - maintained_positions: pl.DataFrame, -) -> pl.DataFrame: - long_positions = long_positions.clone() - short_positions = short_positions.clone() - maintained_positions = maintained_positions.clone() - - portfolio_components = [] - - if long_positions.height > 0: - portfolio_components.append(long_positions) - if short_positions.height > 0: - portfolio_components.append(short_positions) - if maintained_positions.height > 0: - portfolio_components.append( - maintained_positions.with_columns(pl.col("timestamp").cast(pl.Float64)) - ) - - if len(portfolio_components) == 0: - logger.warning( - "No portfolio components available", - long_positions_count=long_positions.height, - short_positions_count=short_positions.height, - maintained_positions_count=maintained_positions.height, - ) - message = ( - "No portfolio components to create an optimal portfolio. " - f"Long positions: {long_positions.height}, " - f"Short positions: {short_positions.height}, " - f"Maintained positions: {maintained_positions.height}. " - "This may indicate insufficient predictions after filtering." - ) - raise InsufficientPredictionsError(message) - - optimal_portfolio = pl.concat(portfolio_components) - - return optimal_portfolio.select( - "ticker", - pl.col("timestamp").cast(pl.Float64), - "side", - "dollar_amount", - "action", - ).sort(["ticker", "side"]) + return pl.concat([long_positions, short_positions]).sort(["ticker", "side"]) diff --git a/applications/portfoliomanager/src/portfoliomanager/server.py b/applications/portfoliomanager/src/portfoliomanager/server.py index 3c45d4a37..7d8fe0c70 100644 --- a/applications/portfoliomanager/src/portfoliomanager/server.py +++ b/applications/portfoliomanager/src/portfoliomanager/server.py @@ -1,9 +1,6 @@ -import io -import json import logging import os from datetime import UTC, datetime -from typing import cast import httpx import polars as pl @@ -11,7 +8,6 @@ import sentry_sdk import structlog from fastapi import FastAPI, Response, status -from internal.equity_bars_schema import equity_bars_schema from sentry_sdk.integrations.logging import LoggingIntegration from .alpaca_client import AlpacaClient @@ -43,7 +39,7 @@ cache_logger_on_first_use=True, ) -from .enums import PositionAction, PositionSide, TradeSide # noqa: E402 +from .enums import PositionSide, TradeSide # noqa: E402 from .exceptions import ( # noqa: E402 AssetNotShortableError, InsufficientBuyingPowerError, @@ -52,9 +48,6 @@ from .portfolio_schema import portfolio_schema # noqa: E402 from .risk_management import ( # noqa: E402 UNCERTAINTY_THRESHOLD, - add_equity_bars_returns_and_realized_volatility_columns, - add_portfolio_action_column, - add_portfolio_performance_columns, add_predictions_zscore_ranked_columns, create_optimal_portfolio, ) @@ -123,16 +116,18 @@ async def create_portfolio() -> Response: # noqa: PLR0911, PLR0912, PLR0915, C9 return Response(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR) try: - prior_portfolio = get_prior_portfolio(current_timestamp=current_timestamp) - logger.info("Retrieved prior portfolio", count=len(prior_portfolio)) + prior_portfolio_tickers = get_prior_portfolio_tickers() + logger.info( + "Retrieved prior portfolio tickers", count=len(prior_portfolio_tickers) + ) except Exception as e: - logger.exception("Failed to retrieve prior portfolio", error=str(e)) + logger.exception("Failed to retrieve prior portfolio tickers", error=str(e)) return Response(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR) try: optimal_portfolio = get_optimal_portfolio( current_predictions=current_predictions, - prior_portfolio=prior_portfolio, + prior_portfolio_tickers=prior_portfolio_tickers, maximum_capital=float(account.cash_amount), current_timestamp=current_timestamp, ) @@ -143,7 +138,7 @@ async def create_portfolio() -> Response: # noqa: PLR0911, PLR0912, PLR0915, C9 error=str(e), uncertainty_threshold=UNCERTAINTY_THRESHOLD, predictions_count=len(current_predictions), - prior_portfolio_count=len(prior_portfolio), + prior_portfolio_tickers_count=len(prior_portfolio_tickers), ) return Response( status_code=status.HTTP_204_NO_CONTENT, @@ -155,7 +150,7 @@ async def create_portfolio() -> Response: # noqa: PLR0911, PLR0912, PLR0915, C9 try: open_positions, close_positions = get_positions( - prior_portfolio=prior_portfolio, + prior_portfolio_tickers=prior_portfolio_tickers, optimal_portfolio=optimal_portfolio, ) logger.info( @@ -223,7 +218,6 @@ async def create_portfolio() -> Response: # noqa: PLR0911, PLR0912, PLR0915, C9 side = open_position["side"] dollar_amount = open_position["dollar_amount"] - # Check if we have enough buying power before attempting the order if dollar_amount > remaining_buying_power: logger.warning( "Skipping position due to insufficient buying power", @@ -368,159 +362,60 @@ async def get_current_predictions() -> pl.DataFrame: ) -def get_prior_portfolio(current_timestamp: datetime) -> pl.DataFrame: # noqa: PLR0911 - empty_portfolio = pl.DataFrame( - { - "ticker": [], - "timestamp": [], - "side": [], - "dollar_amount": [], - "action": [], - } - ) - - prior_portfolio_response = requests.get( - url=f"{DATAMANAGER_BASE_URL}/portfolios", - timeout=60, - ) - - if prior_portfolio_response.status_code == HTTP_NOT_FOUND: - logger.info( - "No prior portfolio found - this is expected on first run", - status_code=prior_portfolio_response.status_code, - ) - return empty_portfolio - - if prior_portfolio_response.status_code >= HTTP_BAD_REQUEST: - logger.warning( - "Failed to fetch prior portfolio from datamanager", - status_code=prior_portfolio_response.status_code, - ) - return empty_portfolio - - # Handle empty or invalid JSON response - response_text = prior_portfolio_response.text.strip() - if not response_text or response_text == "[]": - logger.info("Prior portfolio response is empty") - return empty_portfolio - +def get_prior_portfolio_tickers() -> list[str]: # noqa: PLR0911 + """Fetch tickers from prior portfolio to exclude them (PDT avoidance).""" try: - prior_portfolio_data = prior_portfolio_response.json() - except (ValueError, requests.exceptions.JSONDecodeError) as e: - logger.warning( - "Failed to parse prior portfolio JSON", - error=str(e), - response_text=response_text[:200] if response_text else "empty", + prior_portfolio_response = requests.get( + url=f"{DATAMANAGER_BASE_URL}/portfolios", + timeout=60, ) - return empty_portfolio - - if not prior_portfolio_data: - logger.info("Prior portfolio data is empty") - return empty_portfolio - - prior_portfolio = pl.DataFrame(prior_portfolio_data) - - if prior_portfolio.is_empty(): - return empty_portfolio - - tickers = prior_portfolio["ticker"].unique().to_list() - timestamps = prior_portfolio["timestamp"].cast(pl.Float64) - start_timestamp = datetime.fromtimestamp(cast("float", timestamps.min()), tz=UTC) - - prior_equity_bars_response = requests.get( - url=f"{DATAMANAGER_BASE_URL}/equity-bars", - params={ - "tickers": ",".join(tickers), - "start_timestamp": start_timestamp.isoformat(), - "end_timestamp": current_timestamp.isoformat(), - }, - timeout=60, - ) - - prior_equity_bars_response.raise_for_status() - - tickers_and_timestamps = [ - {"ticker": row[0], "timestamp": row[1]} - for row in prior_portfolio[["ticker", "timestamp"]].rows() - ] - prior_predictions_response = requests.get( - url=f"{DATAMANAGER_BASE_URL}/predictions", - params={"tickers_and_timestamps": json.dumps(tickers_and_timestamps)}, - timeout=60, - ) + # If no prior portfolio, return empty list + if prior_portfolio_response.status_code == HTTP_NOT_FOUND: + logger.info("No prior portfolio found, starting fresh") + return [] - prior_predictions_response.raise_for_status() + if prior_portfolio_response.status_code >= HTTP_BAD_REQUEST: + logger.warning( + "Failed to fetch prior portfolio from datamanager", + status_code=prior_portfolio_response.status_code, + ) + return [] - prior_portfolio = add_portfolio_action_column( - prior_portfolio=prior_portfolio, - current_timestamp=current_timestamp, - ) + response_text = prior_portfolio_response.text.strip() + if not response_text or response_text == "[]": + logger.info("Prior portfolio is empty") + return [] - prior_equity_bars = pl.read_parquet(io.BytesIO(prior_equity_bars_response.content)) + prior_portfolio_data = prior_portfolio_response.json() - prior_equity_bars_validated = equity_bars_schema.validate(prior_equity_bars) - prior_equity_bars = cast( - "pl.DataFrame", - prior_equity_bars_validated.collect() - if isinstance(prior_equity_bars_validated, pl.LazyFrame) - else prior_equity_bars_validated, - ) + if not prior_portfolio_data: + return [] - prior_equity_bars = add_equity_bars_returns_and_realized_volatility_columns( - prior_equity_bars=prior_equity_bars - ) + prior_portfolio = pl.DataFrame(prior_portfolio_data) - predictions_response_text = prior_predictions_response.text.strip() - if not predictions_response_text or predictions_response_text == "[]": - logger.warning( - "Prior predictions empty, returning portfolio without performance" - ) - return add_portfolio_action_column( - prior_portfolio=prior_portfolio, - current_timestamp=current_timestamp, - ) + if prior_portfolio.is_empty(): + return [] - try: - prior_predictions_data = prior_predictions_response.json() - except (ValueError, requests.exceptions.JSONDecodeError) as error: - logger.exception( - "Failed to retrieve prior portfolio", - error=str(error), - ) - return add_portfolio_action_column( - prior_portfolio=prior_portfolio, - current_timestamp=current_timestamp, - ) - - if not prior_predictions_data: - logger.warning("Prior predictions data is empty") - return add_portfolio_action_column( - prior_portfolio=prior_portfolio, - current_timestamp=current_timestamp, - ) - - prior_predictions = pl.DataFrame(prior_predictions_data).with_columns( - pl.col("timestamp").cast(pl.Float64) - ) + tickers = prior_portfolio["ticker"].unique().to_list() + logger.info("Retrieved prior portfolio tickers", count=len(tickers)) + return tickers # noqa: TRY300 - return add_portfolio_performance_columns( - prior_portfolio=prior_portfolio, - prior_equity_bars=prior_equity_bars, - prior_predictions=prior_predictions, - current_timestamp=current_timestamp, - ) + except (ValueError, requests.exceptions.JSONDecodeError) as e: + logger.exception("Failed to parse prior portfolio JSON", error=str(e)) + return [] def get_optimal_portfolio( current_predictions: pl.DataFrame, - prior_portfolio: pl.DataFrame, + prior_portfolio_tickers: list[str], maximum_capital: float, current_timestamp: datetime, ) -> pl.DataFrame: + """Create optimal portfolio with prediction ranking and ticker exclusion.""" optimal_portfolio = create_optimal_portfolio( current_predictions=current_predictions, - prior_portfolio=prior_portfolio, + prior_portfolio_tickers=prior_portfolio_tickers, maximum_capital=maximum_capital, current_timestamp=current_timestamp, ) @@ -542,33 +437,11 @@ def get_optimal_portfolio( def get_positions( - prior_portfolio: pl.DataFrame, + prior_portfolio_tickers: list[str], optimal_portfolio: pl.DataFrame, ) -> tuple[list[dict], list[dict]]: - prior_portfolio = prior_portfolio.clone() - optimal_portfolio = optimal_portfolio.clone() - - close_positions = [] - if not prior_portfolio.is_empty(): - prior_tickers = set(prior_portfolio["ticker"].to_list()) - optimal_tickers = set(optimal_portfolio["ticker"].to_list()) - closing_tickers = prior_tickers - optimal_tickers - - if closing_tickers: - close_positions = [ - { - "ticker": row["ticker"], - "side": ( - TradeSide.SELL - if row["side"] == PositionSide.LONG.value - else TradeSide.BUY - ), - "dollar_amount": row["dollar_amount"], - } - for row in prior_portfolio.filter( - pl.col("ticker").is_in(list(closing_tickers)) - ).iter_rows(named=True) - ] + """Get positions to close and open.""" + close_positions = [{"ticker": ticker} for ticker in prior_portfolio_tickers] open_positions = [ { @@ -580,9 +453,7 @@ def get_positions( ), "dollar_amount": row["dollar_amount"], } - for row in optimal_portfolio.filter( - pl.col("action") == PositionAction.OPEN_POSITION.value - ).iter_rows(named=True) + for row in optimal_portfolio.iter_rows(named=True) ] return open_positions, close_positions diff --git a/applications/portfoliomanager/tests/test_risk_management.py b/applications/portfoliomanager/tests/test_risk_management.py index 2f09dc902..8c4701c4e 100644 --- a/applications/portfoliomanager/tests/test_risk_management.py +++ b/applications/portfoliomanager/tests/test_risk_management.py @@ -2,446 +2,13 @@ import polars as pl import pytest +from portfoliomanager.exceptions import InsufficientPredictionsError from portfoliomanager.risk_management import ( - add_equity_bars_returns_and_realized_volatility_columns, - add_portfolio_action_column, - add_portfolio_performance_columns, add_predictions_zscore_ranked_columns, create_optimal_portfolio, ) -def test_add_portfolio_action_column_same_day_positions_locked() -> None: - current_datetime = datetime(2024, 1, 15, 0, 0, 0, 0, tzinfo=UTC) - positions = pl.DataFrame( - { - "ticker": ["AAPL", "GOOGL"], - "timestamp": [ - datetime(2024, 1, 15, 9, 30, tzinfo=UTC).timestamp(), - datetime(2024, 1, 15, 14, 0, tzinfo=UTC).timestamp(), - ], - "side": ["LONG", "SHORT"], - "dollar_amount": [1000.0, 1000.0], - } - ) - - result = add_portfolio_action_column(positions, current_datetime) - - assert all(action == "PDT_LOCKED" for action in result["action"].to_list()) - assert len(result) == 2 # noqa: PLR2004 - - -def test_add_portfolio_action_column_previous_day_positions_unlocked() -> None: - current_datetime = datetime(2024, 1, 15, 0, 0, 0, 0, tzinfo=UTC) - positions = pl.DataFrame( - { - "ticker": ["AAPL", "GOOGL"], - "timestamp": [ - datetime(2024, 1, 14, 9, 30, tzinfo=UTC).timestamp(), - datetime(2024, 1, 13, 14, 0, tzinfo=UTC).timestamp(), - ], - "side": ["LONG", "SHORT"], - "dollar_amount": [1000.0, 1000.0], - } - ) - - result = add_portfolio_action_column(positions, current_datetime) - - assert all(action == "UNSPECIFIED" for action in result["action"].to_list()) - assert len(result) == 2 # noqa: PLR2004 - - -def test_add_portfolio_action_column_mixed_dates() -> None: - current_datetime = datetime(2024, 1, 15, 0, 0, 0, 0, tzinfo=UTC) - positions = pl.DataFrame( - { - "ticker": ["AAPL", "GOOGL", "TSLA"], - "timestamp": [ - datetime(2024, 1, 15, 9, 30, tzinfo=UTC).timestamp(), # same day - datetime(2024, 1, 14, 14, 0, tzinfo=UTC).timestamp(), # previous day - datetime(2024, 1, 15, 16, 0, tzinfo=UTC).timestamp(), # same day - ], - "side": ["LONG", "SHORT", "LONG"], - "dollar_amount": [1000.0, 1000.0, 1000.0], - } - ) - - result = add_portfolio_action_column(positions, current_datetime) - - expected_actions = ["PDT_LOCKED", "UNSPECIFIED", "PDT_LOCKED"] - assert result["action"].to_list() == expected_actions - - -def test_add_portfolio_action_column_empty_dataframe() -> None: - current_datetime = datetime(2024, 1, 15, 0, 0, 0, 0, tzinfo=UTC) - positions = pl.DataFrame( - {"ticker": [], "timestamp": [], "side": [], "dollar_amount": []} - ) - - result = add_portfolio_action_column(positions, current_datetime) - - assert len(result) == 0 - assert "action" in result.columns - - -def test_add_equity_bars_returns_and_realized_volatility_columns_sufficient_data_success() -> ( # noqa: E501 - None -): - equity_bars = pl.DataFrame( - { - "ticker": ["AAPL"] * 35, - "timestamp": [ - datetime(2024, 1, i + 1, tzinfo=UTC).timestamp() for i in range(31) - ] - + [datetime(2024, 2, i + 1, tzinfo=UTC).timestamp() for i in range(4)], - "close_price": list(range(100, 135)), # increasing prices - } - ) - - result = add_equity_bars_returns_and_realized_volatility_columns(equity_bars) - - assert "daily_returns" in result.columns - assert "log_daily_returns" in result.columns - assert "realized_volatility" in result.columns - assert len(result) == 35 # noqa: PLR2004 - - -def test_add_equity_bars_returns_and_realized_volatility_columns_insufficient_data_raises_error() -> ( # noqa: E501 - None -): - equity_bars = pl.DataFrame( - { - "ticker": ["AAPL"] * 25, # only 25 bars, need 30 - "timestamp": [ - datetime(2024, 1, i + 1, tzinfo=UTC).timestamp() for i in range(25) - ], - "close_price": list(range(100, 125)), - } - ) - - with pytest.raises(ValueError, match="Tickers with insufficient data"): - add_equity_bars_returns_and_realized_volatility_columns(equity_bars) - - -def test_add_equity_bars_returns_and_realized_volatility_columns_multiple_tickers_mixed_data() -> ( # noqa: E501 - None -): - equity_bars = pl.DataFrame( - { - "ticker": ["AAPL"] * 35 + ["GOOGL"] * 25, # AAPL has enough, GOOGL does not - "timestamp": [ - datetime(2024, 1, i + 1, tzinfo=UTC).timestamp() for i in range(31) - ] - + [datetime(2024, 2, i + 1, tzinfo=UTC).timestamp() for i in range(4)] - + [datetime(2024, 2, i + 1, tzinfo=UTC).timestamp() for i in range(25)], - "close_price": list(range(100, 135)) + list(range(200, 225)), - } - ) - - with pytest.raises(ValueError, match="GOOGL"): - add_equity_bars_returns_and_realized_volatility_columns(equity_bars) - - -def test_add_equity_bars_returns_grouped_per_ticker() -> None: - base_timestamp = datetime(2024, 1, 1, tzinfo=UTC).timestamp() - - aapl_data = [] - googl_data = [] - - for i in range(30): - timestamp = base_timestamp + (i * 86400) - aapl_price = 100.0 + i # AAPL prices increase - googl_price = 200.0 - i * 0.5 # GOOGL prices decrease slightly - - aapl_data.append( - {"ticker": "AAPL", "timestamp": timestamp, "close_price": aapl_price} - ) - googl_data.append( - {"ticker": "GOOGL", "timestamp": timestamp, "close_price": googl_price} - ) - - all_data = [] - for i in range(30): - all_data.append(aapl_data[i]) - all_data.append(googl_data[i]) - - equity_bars = pl.DataFrame(all_data) - - out = add_equity_bars_returns_and_realized_volatility_columns(equity_bars) - aapl = out.filter(pl.col("ticker") == "AAPL").sort("timestamp") - googl = out.filter(pl.col("ticker") == "GOOGL").sort("timestamp") - - aapl_returns = aapl["daily_returns"].to_list() - googl_returns = googl["daily_returns"].to_list() - - assert aapl_returns[0] is None - assert aapl_returns[1] == pytest.approx(0.01, abs=1e-9) - assert googl_returns[0] is None - assert googl_returns[1] == pytest.approx(-0.0025, abs=1e-9) - - aapl_log_returns = aapl["log_daily_returns"].to_list() - googl_log_returns = googl["log_daily_returns"].to_list() - - assert aapl_log_returns[0] is None - assert aapl_log_returns[1] == pytest.approx(0.00995, abs=1e-4) - assert googl_log_returns[0] is None - assert googl_log_returns[1] == pytest.approx(-0.00251, abs=1e-4) - - -def test_add_equity_bars_returns_and_realized_volatility_columns_null_prices_handled() -> ( # noqa: E501 - None -): - equity_bars = pl.DataFrame( - { - "ticker": ["AAPL"] * 35, - "timestamp": [ - datetime(2024, 1, i + 1, tzinfo=UTC).timestamp() for i in range(31) - ] - + [datetime(2024, 2, i + 1, tzinfo=UTC).timestamp() for i in range(4)], - "close_price": [ - 100.0, - None, - 102.0, - *list(range(103, 135)), - ], - } - ) - - result = add_equity_bars_returns_and_realized_volatility_columns(equity_bars) - - daily_returns = result["daily_returns"].to_list() - assert daily_returns[1] is None # second row should be null - - -def test_add_portfolio_performance_columns_long_position_outperforming() -> None: - base_timestamp = datetime(2024, 1, 10, tzinfo=UTC).timestamp() - - positions = pl.DataFrame( - { - "ticker": ["AAPL"], - "timestamp": [base_timestamp], - "side": ["LONG"], - "dollar_amount": [1000.0], - "action": ["UNSPECIFIED"], - } - ) - - original_predictions = pl.DataFrame( - { - "ticker": ["AAPL"], - "timestamp": [base_timestamp], - "quantile_10": [-0.05], # -5% lower threshold - "quantile_90": [0.15], # +15% upper threshold - } - ) - - raw_equity_bars = pl.DataFrame( - { - "ticker": ["AAPL"] * 30, - "timestamp": [base_timestamp + (i * 86400) for i in range(30)], - "close_price": [100.0 + (20.0 * i / 29) for i in range(30)], - } - ) - - original_equity_bars = add_equity_bars_returns_and_realized_volatility_columns( - raw_equity_bars - ) - current_timestamp = datetime.fromtimestamp(base_timestamp + (29 * 86400), tz=UTC) - - result = add_portfolio_performance_columns( - positions, original_predictions, original_equity_bars, current_timestamp - ) - - assert result["action"][0] == "MAINTAIN_POSITION" # 20% > 15% threshold - - -def test_add_portfolio_performance_columns_long_position_underperforming() -> None: - base_timestamp = datetime(2024, 1, 10, tzinfo=UTC).timestamp() - - positions = pl.DataFrame( - { - "ticker": ["AAPL"], - "timestamp": [base_timestamp], - "side": ["LONG"], - "dollar_amount": [1000.0], - "action": ["UNSPECIFIED"], - } - ) - - original_predictions = pl.DataFrame( - { - "ticker": ["AAPL"], - "timestamp": [base_timestamp], - "quantile_10": [-0.05], # -5% lower threshold - "quantile_90": [0.15], # +15% upper threshold - } - ) - - raw_equity_bars = pl.DataFrame( - { - "ticker": ["AAPL"] * 30, - "timestamp": [base_timestamp + (i * 86400) for i in range(30)], - "close_price": [100.0 - (10.0 * i / 29) for i in range(30)], - } - ) - - original_equity_bars = add_equity_bars_returns_and_realized_volatility_columns( - raw_equity_bars - ) - current_timestamp = datetime.fromtimestamp(base_timestamp + (29 * 86400), tz=UTC) - - result = add_portfolio_performance_columns( - positions, original_predictions, original_equity_bars, current_timestamp - ) - - assert result["action"][0] == "CLOSE_POSITION" # -10% < -5% threshold - - -def test_add_portfolio_performance_columns_short_position_outperforming() -> None: - base_timestamp = datetime(2024, 1, 10, tzinfo=UTC).timestamp() - - positions = pl.DataFrame( - { - "ticker": ["AAPL"], - "timestamp": [base_timestamp], - "side": ["SHORT"], - "dollar_amount": [1000.0], - "action": ["UNSPECIFIED"], - } - ) - - original_predictions = pl.DataFrame( - { - "ticker": ["AAPL"], - "timestamp": [base_timestamp], - "quantile_10": [-0.05], # -5% lower threshold - "quantile_90": [0.15], # +15% upper threshold - } - ) - - raw_equity_bars = pl.DataFrame( - { - "ticker": ["AAPL"] * 30, - "timestamp": [base_timestamp + (i * 86400) for i in range(30)], - "close_price": [100.0 - (10.0 * i / 29) for i in range(30)], - } - ) - - original_equity_bars = add_equity_bars_returns_and_realized_volatility_columns( - raw_equity_bars - ) - current_timestamp = datetime.fromtimestamp(base_timestamp + (29 * 86400), tz=UTC) - - result = add_portfolio_performance_columns( - positions, original_predictions, original_equity_bars, current_timestamp - ) - - assert ( - result["action"][0] == "MAINTAIN_POSITION" - ) # -10% <= -5% threshold (good for short) - - -def test_add_portfolio_performance_columns_pdt_locked_position_maintained() -> None: - base_timestamp = datetime(2024, 1, 10, tzinfo=UTC).timestamp() - - positions = pl.DataFrame( - { - "ticker": ["AAPL"], - "timestamp": [base_timestamp], - "side": ["LONG"], - "dollar_amount": [1000.0], - "action": ["PDT_LOCKED"], # pdt locked - } - ) - - original_predictions = pl.DataFrame( - { - "ticker": ["AAPL"], - "timestamp": [base_timestamp], - "quantile_10": [-0.05], - "quantile_90": [0.15], - } - ) - - raw_equity_bars = pl.DataFrame( - { - "ticker": ["AAPL"] * 30, - "timestamp": [base_timestamp + (i * 86400) for i in range(30)], - "close_price": [100.0 - (20.0 * i / 29) for i in range(30)], - } - ) - - original_equity_bars = add_equity_bars_returns_and_realized_volatility_columns( - raw_equity_bars - ) - current_timestamp = datetime.fromtimestamp(base_timestamp + (29 * 86400), tz=UTC) - - result = add_portfolio_performance_columns( - positions, original_predictions, original_equity_bars, current_timestamp - ) - - assert result["action"][0] == "PDT_LOCKED" # pdt locked overrides performance - - -def test_add_portfolio_performance_columns_multiple_tickers_independent() -> None: - current_timestamp = datetime(2024, 1, 10, tzinfo=UTC).timestamp() - - positions = pl.DataFrame( - { - "ticker": ["AAPL", "GOOGL"], - "timestamp": [current_timestamp, current_timestamp], - "side": ["LONG", "LONG"], - "dollar_amount": [1000.0, 1000.0], - "action": ["UNSPECIFIED", "UNSPECIFIED"], - } - ) - - predictions = pl.DataFrame( - { - "ticker": ["AAPL", "GOOGL"], - "timestamp": [current_timestamp, current_timestamp], - "quantile_10": [-0.05, -0.05], - "quantile_90": [0.15, 0.15], - } - ) - - aapl_data = [] - googl_data = [] - - for i in range(30): - timestamp = current_timestamp + (i * 86400) - aapl_price = 100.0 + (20.0 * i / 29) - googl_price = 200.0 - (20.0 * i / 29) - - aapl_data.append( - {"ticker": "AAPL", "timestamp": timestamp, "close_price": aapl_price} - ) - googl_data.append( - {"ticker": "GOOGL", "timestamp": timestamp, "close_price": googl_price} - ) - - all_data = [] - for i in range(30): - all_data.append(aapl_data[i]) - all_data.append(googl_data[i]) - - raw_equity_bars = pl.DataFrame(all_data) - - equity_bars = add_equity_bars_returns_and_realized_volatility_columns( - raw_equity_bars - ) - - out = add_portfolio_performance_columns( - positions, - predictions, - equity_bars, - datetime.fromtimestamp(current_timestamp + (29 * 86400), tz=UTC), # 30th day - ) - - assert out.filter(pl.col("ticker") == "AAPL")["action"][0] == "MAINTAIN_POSITION" - assert out.filter(pl.col("ticker") == "GOOGL")["action"][0] == "CLOSE_POSITION" - - def test_add_predictions_zscore_ranked_columns_zscore_calculation() -> None: predictions = pl.DataFrame( { @@ -501,251 +68,293 @@ def test_add_predictions_zscore_ranked_columns_single_prediction() -> None: assert result["z_score_return"][0] == 0.0 # single value has z-score of 0 -def test_create_optimal_portfolio_fresh_start_no_existing_positions() -> None: +def test_create_optimal_portfolio_fresh_start_no_prior_tickers() -> None: + """Test portfolio creation with no prior portfolio (fresh start).""" + current_timestamp = datetime(2024, 1, 15, 9, 30, tzinfo=UTC) + + # Create 30 predictions with varying scores predictions = pl.DataFrame( { - "ticker": [f"STOCK{i}" for i in range(25)], - "quantile_10": [0.0] * 25, - "quantile_50": [0.1] * 25, - "quantile_90": [0.2] * 25, - "composite_score": list(range(25, 0, -1)), # descending scores - "inter_quartile_range": [0.1] * 25, # low uncertainty + "ticker": [f"TICK{i:02d}" for i in range(30)], + "quantile_10": [0.0] * 30, + "quantile_50": [i * 0.01 for i in range(30)], # 0%, 1%, 2%, ..., 29% + "quantile_90": [0.05] * 30, # Low uncertainty (IQR = 0.05 < 0.1 threshold) } ) - positions = pl.DataFrame( - { - "ticker": [], - "timestamp": [], - "side": [], - "dollar_amount": [], - "action": [], - } - ) + # Rank and sort predictions + ranked_predictions = add_predictions_zscore_ranked_columns(predictions) result = create_optimal_portfolio( - predictions, positions, 20000.0, datetime.now(tz=UTC) + current_predictions=ranked_predictions, + prior_portfolio_tickers=[], # No prior portfolio + maximum_capital=10000.0, + current_timestamp=current_timestamp, ) - assert len(result) == 20 # 10 long + 10 short # noqa: PLR2004 + # Should create 20 positions (10 long, 10 short) + assert len(result) == 20 # noqa: PLR2004 assert result.filter(pl.col("side") == "LONG").height == 10 # noqa: PLR2004 assert result.filter(pl.col("side") == "SHORT").height == 10 # noqa: PLR2004 - long_total = result.filter(pl.col("side") == "LONG")["dollar_amount"].sum() - short_total = result.filter(pl.col("side") == "SHORT")["dollar_amount"].sum() - assert abs(long_total - short_total) < 0.01 # noqa: PLR2004 + # All positions should have action=OPEN_POSITION + assert all(action == "OPEN_POSITION" for action in result["action"].to_list()) + # Equal dollar allocation: 50% to longs, 50% to shorts + long_capital = result.filter(pl.col("side") == "LONG")["dollar_amount"].sum() + short_capital = result.filter(pl.col("side") == "SHORT")["dollar_amount"].sum() + assert long_capital == pytest.approx(5000.0) + assert short_capital == pytest.approx(5000.0) -def test_create_optimal_portfolio_some_maintained_positions() -> None: - predictions = pl.DataFrame( - { - "ticker": [f"STOCK{i}" for i in range(25)], - "quantile_10": [0.0] * 25, - "quantile_50": [0.1] * 25, - "quantile_90": [0.2] * 25, - "composite_score": list(range(25, 0, -1)), - "inter_quartile_range": [0.1] * 25, - } + # Each position should get (capital / 2) / 10 + expected_amount = 500.0 + assert all( + amount == pytest.approx(expected_amount) + for amount in result["dollar_amount"].to_list() ) - positions = pl.DataFrame( + # Top 10 should be long (highest composite scores) + long_tickers = result.filter(pl.col("side") == "LONG")["ticker"].to_list() + expected_long = [f"TICK{i:02d}" for i in range(29, 19, -1)] # TICK29 to TICK20 + assert set(long_tickers) == set(expected_long) + + # Bottom 10 should be short (lowest composite scores) + short_tickers = result.filter(pl.col("side") == "SHORT")["ticker"].to_list() + expected_short = [f"TICK{i:02d}" for i in range(10)] # TICK00 to TICK09 + assert set(short_tickers) == set(expected_short) + + +def test_create_optimal_portfolio_with_prior_ticker_exclusion() -> None: + """Test that prior portfolio tickers are excluded to avoid PDT violations.""" + current_timestamp = datetime(2024, 1, 15, 9, 30, tzinfo=UTC) + + # Create 30 predictions + predictions = pl.DataFrame( { - "ticker": ["STOCK1", "STOCK2", "STOCK24"], - "timestamp": [ - datetime(2024, 1, 10, tzinfo=UTC).timestamp(), - datetime(2024, 1, 11, tzinfo=UTC).timestamp(), - datetime(2024, 1, 12, tzinfo=UTC).timestamp(), - ], - "side": ["LONG", "LONG", "SHORT"], - "dollar_amount": [1000.0, 1000.0, 1000.0], - "action": ["MAINTAIN_POSITION", "MAINTAIN_POSITION", "MAINTAIN_POSITION"], + "ticker": [f"TICK{i:02d}" for i in range(30)], + "quantile_10": [0.0] * 30, + "quantile_50": [i * 0.01 for i in range(30)], + "quantile_90": [0.05] * 30, # Low uncertainty (IQR = 0.05 < 0.1 threshold) } ) + # Rank and sort predictions + ranked_predictions = add_predictions_zscore_ranked_columns(predictions) + + # Exclude the top 5 tickers from prior portfolio + prior_tickers = ["TICK29", "TICK28", "TICK27", "TICK26", "TICK25"] + result = create_optimal_portfolio( - predictions, positions, 20000.0, datetime.now(tz=UTC) + current_predictions=ranked_predictions, + prior_portfolio_tickers=prior_tickers, + maximum_capital=10000.0, + current_timestamp=current_timestamp, ) + # Should still create 20 positions assert len(result) == 20 # noqa: PLR2004 - assert "STOCK1" in result["ticker"].to_list() - assert "STOCK2" in result["ticker"].to_list() - assert "STOCK24" in result["ticker"].to_list() + # None of the prior tickers should appear in the new portfolio + result_tickers = result["ticker"].to_list() + for ticker in prior_tickers: + assert ticker not in result_tickers -def test_create_optimal_portfolio_high_uncertainty_exclusions() -> None: - predictions = pl.DataFrame( - { - "ticker": ["HIGH_UNCERT", "LOW_UNCERT1", "LOW_UNCERT2"], - "quantile_10": [0.0, 0.0, 0.0], - "quantile_50": [0.1, 0.1, 0.1], - "quantile_90": [0.2, 0.2, 0.2], - "composite_score": [10.0, 5.0, 1.0], - "inter_quartile_range": [0.8, 0.1, 0.1], # first one too uncertain - } - ) + # Since top 5 are excluded, next 10 should be long (TICK24 to TICK15) + long_tickers = result.filter(pl.col("side") == "LONG")["ticker"].to_list() + expected_long = [f"TICK{i:02d}" for i in range(24, 14, -1)] + assert set(long_tickers) == set(expected_long) - positions = pl.DataFrame( + +def test_create_optimal_portfolio_unsorted_input() -> None: + """Test that create_optimal_portfolio handles unsorted input correctly.""" + current_timestamp = datetime(2024, 1, 15, 9, 30, tzinfo=UTC) + + # Create predictions intentionally out of order + predictions = pl.DataFrame( { - "ticker": [], - "timestamp": [], - "side": [], - "dollar_amount": [], - "action": [], + "ticker": ["TICK10", "TICK05", "TICK20", "TICK15", "TICK25"] + + [f"TICK{i:02d}" for i in range(30) if i not in [5, 10, 15, 20, 25]], + "quantile_10": [0.0] * 30, + "quantile_50": [0.10, 0.05, 0.20, 0.15, 0.25] + + [i * 0.01 for i in range(30) if i not in [5, 10, 15, 20, 25]], + "quantile_90": [0.05] * 30, # Low uncertainty + "z_score_return": [10.0, 5.0, 20.0, 15.0, 25.0] + + [float(i) for i in range(30) if i not in [5, 10, 15, 20, 25]], + "inter_quartile_range": [0.05] * 30, + "composite_score": [10.0, 5.0, 20.0, 15.0, 25.0] + + [float(i) for i in range(30) if i not in [5, 10, 15, 20, 25]], + "action": ["UNSPECIFIED"] * 30, } ) result = create_optimal_portfolio( - predictions, positions, 20000.0, datetime.now(tz=UTC) + current_predictions=predictions, + prior_portfolio_tickers=[], + maximum_capital=10000.0, + current_timestamp=current_timestamp, ) - assert "HIGH_UNCERT" not in result["ticker"].to_list() - assert len(result) == 2 # only 2 available predictions # noqa: PLR2004 + # Should create 20 positions despite unsorted input + assert len(result) == 20 # noqa: PLR2004 + assert result.filter(pl.col("side") == "LONG").height == 10 # noqa: PLR2004 + assert result.filter(pl.col("side") == "SHORT").height == 10 # noqa: PLR2004 + + # Top 10 should be long (highest composite scores) + long_tickers = result.filter(pl.col("side") == "LONG")["ticker"].to_list() + assert "TICK25" in long_tickers # Highest score should be long + assert "TICK20" in long_tickers + + # Bottom 10 should be short (lowest composite scores) + short_tickers = result.filter(pl.col("side") == "SHORT")["ticker"].to_list() + assert "TICK00" in short_tickers # Lowest scores should be short + + +def test_create_optimal_portfolio_high_uncertainty_exclusions() -> None: + """Test that high uncertainty predictions are excluded. + Note: This test intentionally uses pre-computed ranking columns rather than calling + add_predictions_zscore_ranked_columns to verify portfolio logic in isolation. + """ + current_timestamp = datetime(2024, 1, 15, 9, 30, tzinfo=UTC) -def test_create_optimal_portfolio_all_positions_maintained_no_new_needed() -> None: + # Create 25 predictions: 20 low uncertainty, 5 high uncertainty + high_uncertainty_count = 5 + tickers = [f"TICK{i:02d}" for i in range(25)] predictions = pl.DataFrame( { - "ticker": [f"STOCK{i}" for i in range(25)], + "ticker": tickers, "quantile_10": [0.0] * 25, - "quantile_50": [0.1] * 25, - "quantile_90": [0.2] * 25, - "composite_score": list(range(25, 0, -1)), - "inter_quartile_range": [0.1] * 25, - } - ) - - positions = pl.DataFrame( - { - "ticker": [f"MAINTAINED{i}" for i in range(20)], - "timestamp": [datetime(2024, 1, 10, tzinfo=UTC).timestamp()] * 20, - "side": ["LONG"] * 10 + ["SHORT"] * 10, - "dollar_amount": [500.0] * 20, - "action": ["MAINTAIN_POSITION"] * 20, + "quantile_50": [i * 0.01 for i in range(25)], + # First 5 have high uncertainty (IQR > 0.1), rest have low uncertainty + "quantile_90": [0.50] * high_uncertainty_count + [0.08] * 20, + "z_score_return": [float(i) for i in range(25)], + "inter_quartile_range": [0.50] * high_uncertainty_count + [0.08] * 20, + "composite_score": [ + float(i) / 1.5 if i < high_uncertainty_count else float(i) / 1.08 + for i in range(25) + ], + "action": ["UNSPECIFIED"] * 25, } ) result = create_optimal_portfolio( - predictions, positions, 20000.0, datetime.now(tz=UTC) + current_predictions=predictions, + prior_portfolio_tickers=[], + maximum_capital=10000.0, + current_timestamp=current_timestamp, ) - assert len(result) == 20 # all maintained positions # noqa: PLR2004 - expected_timestamp = datetime(2024, 1, 10, tzinfo=UTC).timestamp() - assert all(ts == expected_timestamp for ts in result["timestamp"].to_list()) + # Should create 20 positions from the 20 low-uncertainty tickers + assert len(result) == 20 # noqa: PLR2004 + + # None of the high uncertainty tickers should appear + result_tickers = result["ticker"].to_list() + for i in range(high_uncertainty_count): + assert f"TICK{i:02d}" not in result_tickers + +def test_create_optimal_portfolio_insufficient_after_exclusions() -> None: + """Test that InsufficientPredictionsError is raised when fewer than 20 available.""" + current_timestamp = datetime(2024, 1, 15, 9, 30, tzinfo=UTC) -def test_create_optimal_portfolio_capital_rebalancing_with_closed_positions() -> None: + # Create 25 predictions: 15 high uncertainty, 5 prior portfolio, only 5 available predictions = pl.DataFrame( { - "ticker": [f"NEW{i}" for i in range(15)], - "quantile_10": [0.0] * 15, - "quantile_50": [0.1] * 15, - "quantile_90": [0.2] * 15, - "composite_score": list(range(15, 0, -1)), - "inter_quartile_range": [0.1] * 15, - } - ) + "ticker": [f"TICK{i:02d}" for i in range(25)], + "quantile_10": [0.0] * 25, + "quantile_50": [i * 0.01 for i in range(25)], + # First 15 have high uncertainty (IQR > 0.1) + "quantile_90": [0.50] * 15 + [0.08] * 10, + "z_score_return": [float(i) for i in range(25)], + "inter_quartile_range": [0.50] * 15 + [0.08] * 10, + "composite_score": [float(i) / 1.5 for i in range(25)], + "action": ["UNSPECIFIED"] * 25, + } + ) + + # Exclude 5 more tickers as prior portfolio (from the low-uncertainty ones) + prior_tickers = [f"TICK{i:02d}" for i in range(15, 20)] + + # Should raise InsufficientPredictionsError (only 5 available, need 20) + with pytest.raises(InsufficientPredictionsError) as exc_info: + create_optimal_portfolio( + current_predictions=predictions, + prior_portfolio_tickers=prior_tickers, + maximum_capital=10000.0, + current_timestamp=current_timestamp, + ) - positions = pl.DataFrame( + assert "Only 5 predictions available" in str(exc_info.value) + assert "need 20" in str(exc_info.value) + + +@pytest.mark.parametrize("capital", [10000.0, 25000.0, 50000.0]) +def test_create_optimal_portfolio_equal_capital_allocation(capital: float) -> None: + """Test that capital is allocated equally across positions.""" + current_timestamp = datetime(2024, 1, 15, 9, 30, tzinfo=UTC) + + predictions = pl.DataFrame( { - "ticker": ["MAINTAINED1", "MAINTAINED2", "CLOSED1", "CLOSED2"], - "timestamp": [ - datetime(2024, 1, 10, tzinfo=UTC).timestamp(), - datetime(2024, 1, 11, tzinfo=UTC).timestamp(), - datetime(2024, 1, 12, tzinfo=UTC).timestamp(), - datetime(2024, 1, 13, tzinfo=UTC).timestamp(), - ], - "side": ["LONG", "SHORT", "LONG", "SHORT"], - "dollar_amount": [800.0, 1200.0, 500.0, 500.0], # uneven amounts - "action": [ - "MAINTAIN_POSITION", - "MAINTAIN_POSITION", - "CLOSE_POSITION", - "CLOSE_POSITION", - ], + "ticker": [f"TICK{i:02d}" for i in range(30)], + "quantile_10": [0.0] * 30, + "quantile_50": [i * 0.01 for i in range(30)], + "quantile_90": [0.05] * 30, # Low uncertainty (IQR = 0.05 < 0.1 threshold) } ) + # Rank and sort predictions + ranked_predictions = add_predictions_zscore_ranked_columns(predictions) + result = create_optimal_portfolio( - predictions, positions, 20000.0, datetime.now(tz=UTC) + current_predictions=ranked_predictions, + prior_portfolio_tickers=[], + maximum_capital=capital, + current_timestamp=current_timestamp, ) - # 2 maintained + 15 new (limited by available predictions) - # even though this isn't a realistic scenario - assert len(result) == 17 # noqa: PLR2004 + expected_per_position = capital / 20 + for amount in result["dollar_amount"].to_list(): + assert amount == pytest.approx(expected_per_position) - maintained = result.filter( - pl.col("timestamp").is_in( - [ - datetime(2024, 1, 10, tzinfo=UTC).timestamp(), - datetime(2024, 1, 11, tzinfo=UTC).timestamp(), - ] - ) - ) - assert len(maintained) == 2 # noqa: PLR2004 + # Long and short should be equal + long_sum = result.filter(pl.col("side") == "LONG")["dollar_amount"].sum() + short_sum = result.filter(pl.col("side") == "SHORT")["dollar_amount"].sum() + assert long_sum == pytest.approx(short_sum) + assert long_sum == pytest.approx(capital / 2) - long_total = result.filter(pl.col("side") == "LONG")["dollar_amount"].sum() - short_total = result.filter(pl.col("side") == "SHORT")["dollar_amount"].sum() - assert abs(long_total - short_total) < 0.01 # noqa: PLR2004 +def test_create_optimal_portfolio_head_tail_selection() -> None: + """Test that top 10 are long, bottom 10 are short based on composite scores.""" + current_timestamp = datetime(2024, 1, 15, 9, 30, tzinfo=UTC) + + # Create predictions with known composite scores (via quantile_50 values) + tickers = [f"TICK{i:02d}" for i in range(30)] -def test_create_optimal_portfolio_mixed_closed_and_maintained_positions() -> None: predictions = pl.DataFrame( { - "ticker": [f"STOCK{i:02d}" for i in range(30)], + "ticker": tickers, "quantile_10": [0.0] * 30, - "quantile_50": [0.05] * 30, - "quantile_90": [0.1] * 30, - "composite_score": list(range(30, 0, -1)), - "inter_quartile_range": [0.05] - * 30, # all acceptable uncertainty (below 0.1 threshold) + "quantile_50": [i * 0.01 for i in range(30)], # 0, 0.01, 0.02, ..., 0.29 + "quantile_90": [0.05] * 30, # Low uncertainty (IQR = 0.05 < 0.1 threshold) } ) - positions = pl.DataFrame( - { - "ticker": ["OLD1", "OLD2", "OLD3", "OLD4", "OLD5"], - "timestamp": [ - datetime(2024, 1, 10, tzinfo=UTC).timestamp(), - datetime(2024, 1, 11, tzinfo=UTC).timestamp(), - datetime(2024, 1, 12, tzinfo=UTC).timestamp(), - datetime(2024, 1, 13, tzinfo=UTC).timestamp(), - datetime(2024, 1, 14, tzinfo=UTC).timestamp(), - ], - "side": ["LONG", "LONG", "SHORT", "SHORT", "LONG"], - "dollar_amount": [1000.0, 1000.0, 1000.0, 1000.0, 1000.0], - "action": [ - "CLOSE_POSITION", - "MAINTAIN_POSITION", - "MAINTAIN_POSITION", - "CLOSE_POSITION", - "MAINTAIN_POSITION", - ], - } - ) + # Rank and sort predictions (will sort by composite score descending) + ranked_predictions = add_predictions_zscore_ranked_columns(predictions) result = create_optimal_portfolio( - predictions, - positions, - 20000.0, - datetime.now(tz=UTC), + current_predictions=ranked_predictions, + prior_portfolio_tickers=[], + maximum_capital=10000.0, + current_timestamp=current_timestamp, ) - assert len(result) == 20 # noqa: PLR2004 - - maintained_tickers = ["OLD2", "OLD3", "OLD5"] - for ticker in maintained_tickers: - assert ticker in result["ticker"].to_list() - - closed_tickers = ["OLD1", "OLD4"] - for ticker in closed_tickers: - assert ticker not in result["ticker"].to_list() - - assert "ticker" in result.columns - assert "timestamp" in result.columns - assert "side" in result.columns - assert "dollar_amount" in result.columns - assert "action" in result.columns - assert len(result.columns) == 5 # noqa: PLR2004 + # Top 10 (highest composite scores: 29, 28, ..., 20) should be LONG + long_tickers = result.filter(pl.col("side") == "LONG")["ticker"].to_list() + expected_long = [f"TICK{i:02d}" for i in range(29, 19, -1)] + assert set(long_tickers) == set(expected_long) - sorted_result = result.sort(["ticker", "side"]) - assert sorted_result.equals(result) + # Bottom 10 (lowest composite scores: 0, 1, ..., 9) should be SHORT + short_tickers = result.filter(pl.col("side") == "SHORT")["ticker"].to_list() + expected_short = [f"TICK{i:02d}" for i in range(10)] + assert set(short_tickers) == set(expected_short) diff --git a/uv.lock b/uv.lock index 21685e6df..933caa4db 100644 --- a/uv.lock +++ b/uv.lock @@ -10,9 +10,9 @@ resolution-markers = [ [manifest] members = [ "equitypricemodel", + "fund", "infrastructure", "internal", - "oscm", "portfoliomanager", "tools", ] @@ -729,6 +729,48 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/01/c9/97cc5aae1648dcb851958a3ddf73ccd7dbe5650d95203ecb4d7720b4cdbf/fsspec-2026.1.0-py3-none-any.whl", hash = "sha256:cb76aa913c2285a3b49bdd5fc55b1d7c708d7208126b60f2eb8194fe1b4cbdcc", size = 201838, upload-time = "2026-01-09T15:21:34.041Z" }, ] +[[package]] +name = "fund" +version = "20250602.4" +source = { virtual = "." } +dependencies = [ + { name = "fastapi" }, + { name = "internal" }, + { name = "numpy" }, + { name = "requests" }, + { name = "sagemaker", version = "3.0.1", source = { registry = "https://pypi.org/simple" }, marker = "sys_platform == 'win32'" }, + { name = "sagemaker", version = "3.4.0", source = { registry = "https://pypi.org/simple" }, marker = "sys_platform != 'win32'" }, + { name = "structlog" }, + { name = "tinygrad" }, + { name = "uvicorn" }, +] + +[package.dev-dependencies] +dev = [ + { name = "behave" }, + { name = "coverage" }, + { name = "pytest" }, +] + +[package.metadata] +requires-dist = [ + { name = "fastapi", specifier = ">=0.121.0" }, + { name = "internal", editable = "libraries/python" }, + { name = "numpy", specifier = ">=1.26.4" }, + { name = "requests", specifier = ">=2.32.5" }, + { name = "sagemaker", specifier = ">=2.256.0" }, + { name = "structlog", specifier = ">=25.5.0" }, + { name = "tinygrad", specifier = ">=0.10.3" }, + { name = "uvicorn", specifier = ">=0.35.0" }, +] + +[package.metadata.requires-dev] +dev = [ + { name = "behave", specifier = ">=1.2.6" }, + { name = "coverage", specifier = ">=7.8.0" }, + { name = "pytest", specifier = ">=8.3.5" }, +] + [[package]] name = "gevent" version = "25.9.1" @@ -1721,48 +1763,6 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/12/27/fb8d7338b4d551900fa3e580acbe7a0cf655d940e164cb5c00ec31961094/orderly_set-5.5.0-py3-none-any.whl", hash = "sha256:46f0b801948e98f427b412fcabb831677194c05c3b699b80de260374baa0b1e7", size = 13068, upload-time = "2025-07-10T20:10:54.377Z" }, ] -[[package]] -name = "oscm" -version = "20250602.4" -source = { virtual = "." } -dependencies = [ - { name = "fastapi" }, - { name = "internal" }, - { name = "numpy" }, - { name = "requests" }, - { name = "sagemaker", version = "3.0.1", source = { registry = "https://pypi.org/simple" }, marker = "sys_platform == 'win32'" }, - { name = "sagemaker", version = "3.4.0", source = { registry = "https://pypi.org/simple" }, marker = "sys_platform != 'win32'" }, - { name = "structlog" }, - { name = "tinygrad" }, - { name = "uvicorn" }, -] - -[package.dev-dependencies] -dev = [ - { name = "behave" }, - { name = "coverage" }, - { name = "pytest" }, -] - -[package.metadata] -requires-dist = [ - { name = "fastapi", specifier = ">=0.121.0" }, - { name = "internal", editable = "libraries/python" }, - { name = "numpy", specifier = ">=1.26.4" }, - { name = "requests", specifier = ">=2.32.5" }, - { name = "sagemaker", specifier = ">=2.256.0" }, - { name = "structlog", specifier = ">=25.5.0" }, - { name = "tinygrad", specifier = ">=0.10.3" }, - { name = "uvicorn", specifier = ">=0.35.0" }, -] - -[package.metadata.requires-dev] -dev = [ - { name = "behave", specifier = ">=1.2.6" }, - { name = "coverage", specifier = ">=7.8.0" }, - { name = "pytest", specifier = ">=8.3.5" }, -] - [[package]] name = "packaging" version = "25.0"