From e892c509a5662d44998314b8b03ca14d0e4b0c76 Mon Sep 17 00:00:00 2001 From: John Forstmeier Date: Thu, 5 Feb 2026 21:22:02 -0500 Subject: [PATCH 1/5] Add portfolio rebalancing logic and update Alpaca orders to use notional values MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This commit implements automatic portfolio rebalancing when stop-loss triggers create position count imbalances, and updates the Alpaca client to use dollar amounts (notional) instead of share quantities for orders. ## Changes ### Portfolio Rebalancing (risk_management.py) - Added automatic rebalancing logic in add_portfolio_performance_columns() - When longs hit stop-loss, the system closes an equal number of best-performing shorts - When shorts hit stop-loss, the system closes an equal number of best-performing longs - Rebalancing respects PDT-locked positions and existing closure decisions - Best performers selected by cumulative return to lock in profits - Enables existing create_optimal_portfolio() to maintain 10/10 position balance ### Alpaca Client Updates (alpaca_client.py) - Changed open_position() to use 'notional' parameter instead of 'qty' - Removed _get_current_price() method and associated quote fetching - Removed now-unused StockLatestQuoteRequest import - Simplifies order submission and leverages Alpaca's fractional share handling - Reduces API calls (no longer needs price quotes before orders) ### Testing - Added 4 comprehensive rebalancing tests in test_risk_management.py - Test coverage: 96% for risk_management.py - All 83 tests passing ## Root Cause & Fix Previously, when stop-loss triggers caused asymmetric position closures (e.g., 3 longs closed but 0 shorts), the portfolio would fail schema validation (which requires exactly 10 long + 10 short positions). The system would struggle to find enough quality predictions to fill all 10 positions on the depleted side. 
Solution: When position closures are imbalanced, automatically close an equal number of positions from the opposite side. This: 1. Locks in profits from best performers 2. Frees capital symmetrically 3. Enables create_optimal_portfolio() to open 3 new longs + 3 new shorts 4. Maintains 10/10 balance and ±5% dollar balance requirements The Alpaca notional change simplifies order execution and ensures exact dollar allocations without rounding errors from quantity calculations. Co-Authored-By: Claude Sonnet 4.5 --- .../src/portfoliomanager/alpaca_client.py | 67 +---- .../src/portfoliomanager/risk_management.py | 84 +++++- .../tests/test_risk_management.py | 269 ++++++++++++++++++ uv.lock | 86 +++--- 4 files changed, 399 insertions(+), 107 deletions(-) diff --git a/applications/portfoliomanager/src/portfoliomanager/alpaca_client.py b/applications/portfoliomanager/src/portfoliomanager/alpaca_client.py index 1007b6daa..05651a72a 100644 --- a/applications/portfoliomanager/src/portfoliomanager/alpaca_client.py +++ b/applications/portfoliomanager/src/portfoliomanager/alpaca_client.py @@ -4,7 +4,6 @@ import structlog from alpaca.common.exceptions import APIError from alpaca.data import StockHistoricalDataClient -from alpaca.data.requests import StockLatestQuoteRequest from alpaca.trading import ( ClosePositionRequest, OrderRequest, @@ -61,74 +60,18 @@ def get_account(self) -> AlpacaAccount: buying_power=float(cast("str", account.buying_power)), ) - def _get_current_price(self, ticker: str, side: TradeSide) -> float: - """Get current price for a ticker based on trade side. - - Uses ask price for buys, bid price for sells. - Falls back to the opposite price if the primary price is unavailable. 
- """ - request = StockLatestQuoteRequest(symbol_or_symbols=ticker.upper()) - quotes = self.data_client.get_stock_latest_quote(request) - quote = quotes.get(ticker.upper()) - - if quote is None: - message = f"No quote returned for {ticker}" - raise ValueError(message) - - ask_price = ( - float(quote.ask_price) - if quote.ask_price is not None and quote.ask_price > 0 - else 0.0 - ) - bid_price = ( - float(quote.bid_price) - if quote.bid_price is not None and quote.bid_price > 0 - else 0.0 - ) - - if side == TradeSide.BUY: - if ask_price > 0: - return ask_price - if bid_price > 0: - logger.warning( - "Ask price unavailable, using bid price as fallback", - ticker=ticker, - side=side.value, - bid_price=bid_price, - ) - return bid_price - message = f"No valid price for {ticker}: ask and bid are 0" - raise ValueError(message) - - if bid_price > 0: - return bid_price - if ask_price > 0: - logger.warning( - "Bid price unavailable, using ask price as fallback", - ticker=ticker, - side=side.value, - ask_price=ask_price, - ) - return ask_price - message = f"No valid price for {ticker}: bid and ask are 0" - raise ValueError(message) - def open_position( self, ticker: str, side: TradeSide, dollar_amount: float, ) -> None: - # Calculate quantity from dollar amount and current price - # Allow fractional shares where supported by the brokerage - current_price = self._get_current_price(ticker, side) - qty = dollar_amount / current_price - - if qty <= 0: + # Use notional (dollar amount) for order submission + # This allows Alpaca to handle fractional shares automatically + if dollar_amount <= 0: message = ( f"Cannot open position for {ticker}: " - f"non-positive quantity calculated from dollar_amount {dollar_amount} " - f"and price {current_price}" + f"non-positive dollar_amount {dollar_amount}" ) raise ValueError(message) @@ -136,7 +79,7 @@ def open_position( self.trading_client.submit_order( order_data=OrderRequest( symbol=ticker.upper(), - qty=qty, + notional=dollar_amount, 
side=OrderSide(side.value.lower()), type=OrderType.MARKET, time_in_force=TimeInForce.DAY, diff --git a/applications/portfoliomanager/src/portfoliomanager/risk_management.py b/applications/portfoliomanager/src/portfoliomanager/risk_management.py index 1bb6f32b9..6917b7243 100644 --- a/applications/portfoliomanager/src/portfoliomanager/risk_management.py +++ b/applications/portfoliomanager/src/portfoliomanager/risk_management.py @@ -145,7 +145,7 @@ def add_portfolio_performance_columns( how="left", ) - return prior_portfolio_with_data.with_columns( + portfolio_with_actions = prior_portfolio_with_data.with_columns( pl.when(pl.col("action") == PositionAction.PDT_LOCKED.value) .then(pl.lit(PositionAction.PDT_LOCKED.value)) .when( @@ -187,7 +187,87 @@ def add_portfolio_performance_columns( .then(pl.lit(PositionAction.MAINTAIN_POSITION.value)) .otherwise(pl.lit(PositionAction.UNSPECIFIED.value)) .alias("action") - ).drop( + ) + + # Rebalancing logic: if one side has more closures than the other, + # close equal number of best performers from the opposite side + closed_long_count = portfolio_with_actions.filter( + (pl.col("side") == PositionSide.LONG.value) + & (pl.col("action") == PositionAction.CLOSE_POSITION.value) + ).height + + closed_short_count = portfolio_with_actions.filter( + (pl.col("side") == PositionSide.SHORT.value) + & (pl.col("action") == PositionAction.CLOSE_POSITION.value) + ).height + + # If more longs are being closed, close equal number of best-performing shorts + if closed_long_count > closed_short_count: + shorts_to_rebalance = closed_long_count - closed_short_count + + # Select best-performing shorts (most negative cumulative return = best gain) + # Consider positions that are not already being closed and not PDT locked + best_shorts = ( + portfolio_with_actions.filter( + (pl.col("side") == PositionSide.SHORT.value) + & (pl.col("action") != PositionAction.CLOSE_POSITION.value) + & (pl.col("action") != PositionAction.PDT_LOCKED.value) + ) + 
.sort("cumulative_simple_return", descending=False) + .head(shorts_to_rebalance) + .select("ticker") + ) + + if best_shorts.height > 0: + logger.info( + "Rebalancing portfolio by closing shorts", + closed_longs=closed_long_count, + closed_shorts=closed_short_count, + additional_shorts_to_close=shorts_to_rebalance, + shorts_being_closed=best_shorts.to_series().to_list(), + ) + + portfolio_with_actions = portfolio_with_actions.with_columns( + pl.when(pl.col("ticker").is_in(best_shorts["ticker"])) + .then(pl.lit(PositionAction.CLOSE_POSITION.value)) + .otherwise(pl.col("action")) + .alias("action") + ) + + # If more shorts are being closed, close equal number of best-performing longs + elif closed_short_count > closed_long_count: + longs_to_rebalance = closed_short_count - closed_long_count + + # Select best-performing longs (most positive cumulative return = best gain) + # Consider positions that are not already being closed and not PDT locked + best_longs = ( + portfolio_with_actions.filter( + (pl.col("side") == PositionSide.LONG.value) + & (pl.col("action") != PositionAction.CLOSE_POSITION.value) + & (pl.col("action") != PositionAction.PDT_LOCKED.value) + ) + .sort("cumulative_simple_return", descending=True) + .head(longs_to_rebalance) + .select("ticker") + ) + + if best_longs.height > 0: + logger.info( + "Rebalancing portfolio by closing longs", + closed_longs=closed_long_count, + closed_shorts=closed_short_count, + additional_longs_to_close=longs_to_rebalance, + longs_being_closed=best_longs.to_series().to_list(), + ) + + portfolio_with_actions = portfolio_with_actions.with_columns( + pl.when(pl.col("ticker").is_in(best_longs["ticker"])) + .then(pl.lit(PositionAction.CLOSE_POSITION.value)) + .otherwise(pl.col("action")) + .alias("action") + ) + + return portfolio_with_actions.drop( [ "original_lower_threshold", "original_upper_threshold", diff --git a/applications/portfoliomanager/tests/test_risk_management.py 
b/applications/portfoliomanager/tests/test_risk_management.py index 2f09dc902..0ba76dd3c 100644 --- a/applications/portfoliomanager/tests/test_risk_management.py +++ b/applications/portfoliomanager/tests/test_risk_management.py @@ -749,3 +749,272 @@ def test_create_optimal_portfolio_mixed_closed_and_maintained_positions() -> Non sorted_result = result.sort(["ticker", "side"]) assert sorted_result.equals(result) + + +def test_add_portfolio_performance_columns_rebalancing_closes_shorts_when_longs_stopped() -> ( # noqa: E501 + None +): + """Test that when longs hit stop-loss, best-performing shorts are closed for rebalancing.""" # noqa: E501 + base_timestamp = datetime(2024, 1, 10, tzinfo=UTC).timestamp() + + positions = pl.DataFrame( + { + "ticker": ["LONG1", "LONG2", "LONG3", "SHORT1", "SHORT2", "SHORT3"], + "timestamp": [base_timestamp] * 6, + "side": ["LONG", "LONG", "LONG", "SHORT", "SHORT", "SHORT"], + "dollar_amount": [1000.0] * 6, + "action": ["UNSPECIFIED"] * 6, + } + ) + + predictions = pl.DataFrame( + { + "ticker": ["LONG1", "LONG2", "LONG3", "SHORT1", "SHORT2", "SHORT3"], + "timestamp": [base_timestamp] * 6, + "quantile_10": [-0.05] * 6, + "quantile_90": [0.15] * 6, + } + ) + + raw_equity_bars = [] + for ticker in ["LONG1", "LONG2", "LONG3", "SHORT1", "SHORT2", "SHORT3"]: + for i in range(30): + timestamp = base_timestamp + (i * 86400) + if ticker in ["LONG1", "LONG2", "LONG3"]: + price = 100.0 - (10.0 * i / 29) # longs losing -10% + elif ticker == "SHORT1": + price = 100.0 + (20.0 * i / 29) # short1 gaining +20% (worst for short) + elif ticker == "SHORT2": + price = 100.0 + (25.0 * i / 29) # short2 gaining +25% (best performer) + else: # SHORT3 + price = 100.0 + (15.0 * i / 29) # short3 gaining +15% + + raw_equity_bars.append( + {"ticker": ticker, "timestamp": timestamp, "close_price": price} + ) + + equity_bars = add_equity_bars_returns_and_realized_volatility_columns( + pl.DataFrame(raw_equity_bars) + ) + current_timestamp = 
datetime.fromtimestamp(base_timestamp + (29 * 86400), tz=UTC) + + result = add_portfolio_performance_columns( + positions, predictions, equity_bars, current_timestamp + ) + + # 3 longs should be closed (hit stop-loss at -10% < -5%) + closed_longs = result.filter( + (pl.col("side") == "LONG") & (pl.col("action") == "CLOSE_POSITION") + ) + assert closed_longs.height == 3 # noqa: PLR2004 + + # 3 shorts should also be closed for rebalancing (best performers: SHORT2, SHORT1, SHORT3) # noqa: E501 + closed_shorts = result.filter( + (pl.col("side") == "SHORT") & (pl.col("action") == "CLOSE_POSITION") + ) + assert closed_shorts.height == 3 # noqa: PLR2004 + + # Verify that best-performing shorts (most negative cumulative return) were closed + # SHORT2 has worst performance for short (+25% = bad), so most negative return + closed_short_tickers = closed_shorts["ticker"].to_list() + assert "SHORT2" in closed_short_tickers + assert "SHORT1" in closed_short_tickers + assert "SHORT3" in closed_short_tickers + + +def test_add_portfolio_performance_columns_rebalancing_closes_longs_when_shorts_stopped() -> ( # noqa: E501 + None +): + """Test that when shorts hit stop-loss, best-performing longs are closed for rebalancing.""" # noqa: E501 + base_timestamp = datetime(2024, 1, 10, tzinfo=UTC).timestamp() + + positions = pl.DataFrame( + { + "ticker": ["LONG1", "LONG2", "LONG3", "SHORT1", "SHORT2"], + "timestamp": [base_timestamp] * 5, + "side": ["LONG", "LONG", "LONG", "SHORT", "SHORT"], + "dollar_amount": [1000.0] * 5, + "action": ["UNSPECIFIED"] * 5, + } + ) + + predictions = pl.DataFrame( + { + "ticker": ["LONG1", "LONG2", "LONG3", "SHORT1", "SHORT2"], + "timestamp": [base_timestamp] * 5, + "quantile_10": [-0.05] * 5, + "quantile_90": [0.15] * 5, + } + ) + + raw_equity_bars = [] + for ticker in ["LONG1", "LONG2", "LONG3", "SHORT1", "SHORT2"]: + for i in range(30): + timestamp = base_timestamp + (i * 86400) + if ticker == "LONG1": + price = 100.0 + (10.0 * i / 29) # long1 gaining +10% 
+ elif ticker == "LONG2": + price = 100.0 + (20.0 * i / 29) # long2 gaining +20% (best performer) + elif ticker == "LONG3": + price = 100.0 + (5.0 * i / 29) # long3 gaining +5% + else: # SHORT1, SHORT2 + price = 100.0 + (20.0 * i / 29) # shorts gaining +20% (hit stop-loss) + + raw_equity_bars.append( + {"ticker": ticker, "timestamp": timestamp, "close_price": price} + ) + + equity_bars = add_equity_bars_returns_and_realized_volatility_columns( + pl.DataFrame(raw_equity_bars) + ) + current_timestamp = datetime.fromtimestamp(base_timestamp + (29 * 86400), tz=UTC) + + result = add_portfolio_performance_columns( + positions, predictions, equity_bars, current_timestamp + ) + + # 2 shorts should be closed (hit stop-loss at +20% > +15%) + closed_shorts = result.filter( + (pl.col("side") == "SHORT") & (pl.col("action") == "CLOSE_POSITION") + ) + assert closed_shorts.height == 2 # noqa: PLR2004 + + # 2 longs should also be closed for rebalancing (best performers) + closed_longs = result.filter( + (pl.col("side") == "LONG") & (pl.col("action") == "CLOSE_POSITION") + ) + assert closed_longs.height == 2 # noqa: PLR2004 + + # Verify that best-performing longs were closed + closed_long_tickers = closed_longs["ticker"].to_list() + assert "LONG2" in closed_long_tickers # best performer (+20%) + assert "LONG1" in closed_long_tickers # second best (+10%) + + +def test_add_portfolio_performance_columns_no_rebalancing_when_equal_closures() -> None: + """Test that no additional rebalancing occurs when equal positions are closed.""" + base_timestamp = datetime(2024, 1, 10, tzinfo=UTC).timestamp() + + positions = pl.DataFrame( + { + "ticker": ["LONG1", "LONG2", "SHORT1", "SHORT2"], + "timestamp": [base_timestamp] * 4, + "side": ["LONG", "LONG", "SHORT", "SHORT"], + "dollar_amount": [1000.0] * 4, + "action": ["UNSPECIFIED"] * 4, + } + ) + + predictions = pl.DataFrame( + { + "ticker": ["LONG1", "LONG2", "SHORT1", "SHORT2"], + "timestamp": [base_timestamp] * 4, + "quantile_10": [-0.05] * 4, 
+ "quantile_90": [0.15] * 4, + } + ) + + raw_equity_bars = [] + for ticker in ["LONG1", "LONG2", "SHORT1", "SHORT2"]: + for i in range(30): + timestamp = base_timestamp + (i * 86400) + if ticker in ["LONG1", "LONG2"]: + price = 100.0 - (10.0 * i / 29) # longs losing -10% + else: # SHORT1, SHORT2 + price = 100.0 + (20.0 * i / 29) # shorts gaining +20% + + raw_equity_bars.append( + {"ticker": ticker, "timestamp": timestamp, "close_price": price} + ) + + equity_bars = add_equity_bars_returns_and_realized_volatility_columns( + pl.DataFrame(raw_equity_bars) + ) + current_timestamp = datetime.fromtimestamp(base_timestamp + (29 * 86400), tz=UTC) + + result = add_portfolio_performance_columns( + positions, predictions, equity_bars, current_timestamp + ) + + # 2 longs and 2 shorts should be closed (no rebalancing needed) + closed_longs = result.filter( + (pl.col("side") == "LONG") & (pl.col("action") == "CLOSE_POSITION") + ).height + closed_shorts = result.filter( + (pl.col("side") == "SHORT") & (pl.col("action") == "CLOSE_POSITION") + ).height + + assert closed_longs == 2 # noqa: PLR2004 + assert closed_shorts == 2 # noqa: PLR2004 + + +def test_add_portfolio_performance_columns_rebalancing_respects_pdt_locked_positions() -> ( # noqa: E501 + None +): + """Test that rebalancing does not close PDT-locked positions.""" + base_timestamp = datetime(2024, 1, 10, tzinfo=UTC).timestamp() + + positions = pl.DataFrame( + { + "ticker": ["LONG1", "SHORT1", "SHORT2", "SHORT3"], + "timestamp": [base_timestamp] * 4, + "side": ["LONG", "SHORT", "SHORT", "SHORT"], + "dollar_amount": [1000.0] * 4, + "action": ["UNSPECIFIED", "PDT_LOCKED", "UNSPECIFIED", "UNSPECIFIED"], + } + ) + + predictions = pl.DataFrame( + { + "ticker": ["LONG1", "SHORT1", "SHORT2", "SHORT3"], + "timestamp": [base_timestamp] * 4, + "quantile_10": [-0.05] * 4, + "quantile_90": [0.15] * 4, + } + ) + + raw_equity_bars = [] + for ticker in ["LONG1", "SHORT1", "SHORT2", "SHORT3"]: + for i in range(30): + timestamp = 
base_timestamp + (i * 86400) + if ticker == "LONG1": + price = 100.0 - (10.0 * i / 29) # long losing -10% + elif ticker == "SHORT1": + price = 100.0 + (30.0 * i / 29) # short1 +30% (best, but PDT locked) + elif ticker == "SHORT2": + price = 100.0 + (20.0 * i / 29) # short2 +20% + else: # SHORT3 + price = 100.0 + (10.0 * i / 29) # short3 +10% + + raw_equity_bars.append( + {"ticker": ticker, "timestamp": timestamp, "close_price": price} + ) + + equity_bars = add_equity_bars_returns_and_realized_volatility_columns( + pl.DataFrame(raw_equity_bars) + ) + current_timestamp = datetime.fromtimestamp(base_timestamp + (29 * 86400), tz=UTC) + + result = add_portfolio_performance_columns( + positions, predictions, equity_bars, current_timestamp + ) + + # 1 long should be closed + closed_longs = result.filter( + (pl.col("side") == "LONG") & (pl.col("action") == "CLOSE_POSITION") + ) + assert closed_longs.height == 1 + + # 1 short should be closed for rebalancing (should not be SHORT1 since it's PDT locked) # noqa: E501 + closed_shorts = result.filter( + (pl.col("side") == "SHORT") & (pl.col("action") == "CLOSE_POSITION") + ) + assert closed_shorts.height == 1 + + # SHORT1 should remain PDT_LOCKED, not be closed + short1_action = result.filter(pl.col("ticker") == "SHORT1")["action"][0] + assert short1_action == "PDT_LOCKED" + + # Either SHORT2 or SHORT3 should be closed (SHORT2 has better performance) + closed_short_ticker = closed_shorts["ticker"][0] + assert closed_short_ticker == "SHORT2" diff --git a/uv.lock b/uv.lock index 21685e6df..933caa4db 100644 --- a/uv.lock +++ b/uv.lock @@ -10,9 +10,9 @@ resolution-markers = [ [manifest] members = [ "equitypricemodel", + "fund", "infrastructure", "internal", - "oscm", "portfoliomanager", "tools", ] @@ -729,6 +729,48 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/01/c9/97cc5aae1648dcb851958a3ddf73ccd7dbe5650d95203ecb4d7720b4cdbf/fsspec-2026.1.0-py3-none-any.whl", hash = 
"sha256:cb76aa913c2285a3b49bdd5fc55b1d7c708d7208126b60f2eb8194fe1b4cbdcc", size = 201838, upload-time = "2026-01-09T15:21:34.041Z" }, ] +[[package]] +name = "fund" +version = "20250602.4" +source = { virtual = "." } +dependencies = [ + { name = "fastapi" }, + { name = "internal" }, + { name = "numpy" }, + { name = "requests" }, + { name = "sagemaker", version = "3.0.1", source = { registry = "https://pypi.org/simple" }, marker = "sys_platform == 'win32'" }, + { name = "sagemaker", version = "3.4.0", source = { registry = "https://pypi.org/simple" }, marker = "sys_platform != 'win32'" }, + { name = "structlog" }, + { name = "tinygrad" }, + { name = "uvicorn" }, +] + +[package.dev-dependencies] +dev = [ + { name = "behave" }, + { name = "coverage" }, + { name = "pytest" }, +] + +[package.metadata] +requires-dist = [ + { name = "fastapi", specifier = ">=0.121.0" }, + { name = "internal", editable = "libraries/python" }, + { name = "numpy", specifier = ">=1.26.4" }, + { name = "requests", specifier = ">=2.32.5" }, + { name = "sagemaker", specifier = ">=2.256.0" }, + { name = "structlog", specifier = ">=25.5.0" }, + { name = "tinygrad", specifier = ">=0.10.3" }, + { name = "uvicorn", specifier = ">=0.35.0" }, +] + +[package.metadata.requires-dev] +dev = [ + { name = "behave", specifier = ">=1.2.6" }, + { name = "coverage", specifier = ">=7.8.0" }, + { name = "pytest", specifier = ">=8.3.5" }, +] + [[package]] name = "gevent" version = "25.9.1" @@ -1721,48 +1763,6 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/12/27/fb8d7338b4d551900fa3e580acbe7a0cf655d940e164cb5c00ec31961094/orderly_set-5.5.0-py3-none-any.whl", hash = "sha256:46f0b801948e98f427b412fcabb831677194c05c3b699b80de260374baa0b1e7", size = 13068, upload-time = "2025-07-10T20:10:54.377Z" }, ] -[[package]] -name = "oscm" -version = "20250602.4" -source = { virtual = "." 
} -dependencies = [ - { name = "fastapi" }, - { name = "internal" }, - { name = "numpy" }, - { name = "requests" }, - { name = "sagemaker", version = "3.0.1", source = { registry = "https://pypi.org/simple" }, marker = "sys_platform == 'win32'" }, - { name = "sagemaker", version = "3.4.0", source = { registry = "https://pypi.org/simple" }, marker = "sys_platform != 'win32'" }, - { name = "structlog" }, - { name = "tinygrad" }, - { name = "uvicorn" }, -] - -[package.dev-dependencies] -dev = [ - { name = "behave" }, - { name = "coverage" }, - { name = "pytest" }, -] - -[package.metadata] -requires-dist = [ - { name = "fastapi", specifier = ">=0.121.0" }, - { name = "internal", editable = "libraries/python" }, - { name = "numpy", specifier = ">=1.26.4" }, - { name = "requests", specifier = ">=2.32.5" }, - { name = "sagemaker", specifier = ">=2.256.0" }, - { name = "structlog", specifier = ">=25.5.0" }, - { name = "tinygrad", specifier = ">=0.10.3" }, - { name = "uvicorn", specifier = ">=0.35.0" }, -] - -[package.metadata.requires-dev] -dev = [ - { name = "behave", specifier = ">=1.2.6" }, - { name = "coverage", specifier = ">=7.8.0" }, - { name = "pytest", specifier = ">=8.3.5" }, -] - [[package]] name = "packaging" version = "25.0" From bef64eafe55c85ac20e3acdc52980e0cb8a1b915 Mon Sep 17 00:00:00 2001 From: John Forstmeier Date: Thu, 5 Feb 2026 21:47:47 -0500 Subject: [PATCH 2/5] Add bot pull request feedback --- .../src/portfoliomanager/risk_management.py | 23 ++++++- .../tests/test_risk_management.py | 68 +++++++++++-------- 2 files changed, 59 insertions(+), 32 deletions(-) diff --git a/applications/portfoliomanager/src/portfoliomanager/risk_management.py b/applications/portfoliomanager/src/portfoliomanager/risk_management.py index 6917b7243..8bfce9c04 100644 --- a/applications/portfoliomanager/src/portfoliomanager/risk_management.py +++ b/applications/portfoliomanager/src/portfoliomanager/risk_management.py @@ -137,7 +137,16 @@ def 
add_portfolio_performance_columns( } ) - returns = pl.DataFrame(position_returns) + if position_returns: + returns = pl.DataFrame(position_returns) + else: + returns = pl.DataFrame( + schema={ + "ticker": pl.Utf8, + "timestamp": pl.Float64, + "cumulative_simple_return": pl.Float64, + } + ) prior_portfolio_with_data = prior_portfolio_predictions.join( other=returns, @@ -224,11 +233,15 @@ def add_portfolio_performance_columns( closed_longs=closed_long_count, closed_shorts=closed_short_count, additional_shorts_to_close=shorts_to_rebalance, + actual_shorts_closed=best_shorts.height, shorts_being_closed=best_shorts.to_series().to_list(), ) portfolio_with_actions = portfolio_with_actions.with_columns( - pl.when(pl.col("ticker").is_in(best_shorts["ticker"])) + pl.when( + pl.col("ticker").is_in(best_shorts["ticker"]) + & (pl.col("side") == PositionSide.SHORT.value) + ) .then(pl.lit(PositionAction.CLOSE_POSITION.value)) .otherwise(pl.col("action")) .alias("action") @@ -257,11 +270,15 @@ def add_portfolio_performance_columns( closed_longs=closed_long_count, closed_shorts=closed_short_count, additional_longs_to_close=longs_to_rebalance, + actual_longs_closed=best_longs.height, longs_being_closed=best_longs.to_series().to_list(), ) portfolio_with_actions = portfolio_with_actions.with_columns( - pl.when(pl.col("ticker").is_in(best_longs["ticker"])) + pl.when( + pl.col("ticker").is_in(best_longs["ticker"]) + & (pl.col("side") == PositionSide.LONG.value) + ) .then(pl.lit(PositionAction.CLOSE_POSITION.value)) .otherwise(pl.col("action")) .alias("action") diff --git a/applications/portfoliomanager/tests/test_risk_management.py b/applications/portfoliomanager/tests/test_risk_management.py index 0ba76dd3c..6ae5d9f1c 100644 --- a/applications/portfoliomanager/tests/test_risk_management.py +++ b/applications/portfoliomanager/tests/test_risk_management.py @@ -783,11 +783,11 @@ def test_add_portfolio_performance_columns_rebalancing_closes_shorts_when_longs_ if ticker in ["LONG1", 
"LONG2", "LONG3"]: price = 100.0 - (10.0 * i / 29) # longs losing -10% elif ticker == "SHORT1": - price = 100.0 + (20.0 * i / 29) # short1 gaining +20% (worst for short) + price = 100.0 - (5.0 * i / 29) # short1 falling -5% (best for short) elif ticker == "SHORT2": - price = 100.0 + (25.0 * i / 29) # short2 gaining +25% (best performer) + price = 100.0 - (2.0 * i / 29) # short2 falling -2% (second best) else: # SHORT3 - price = 100.0 + (15.0 * i / 29) # short3 gaining +15% + price = 100.0 - (1.0 * i / 29) # short3 falling -1% (worst) raw_equity_bars.append( {"ticker": ticker, "timestamp": timestamp, "close_price": price} @@ -808,17 +808,19 @@ def test_add_portfolio_performance_columns_rebalancing_closes_shorts_when_longs_ ) assert closed_longs.height == 3 # noqa: PLR2004 - # 3 shorts should also be closed for rebalancing (best performers: SHORT2, SHORT1, SHORT3) # noqa: E501 + # 3 shorts should also be closed by rebalancing + # None of the shorts hit stop-loss (all have negative returns, not >= +15%) + # Rebalancing closes 3 shorts to match the 3 closed longs closed_shorts = result.filter( (pl.col("side") == "SHORT") & (pl.col("action") == "CLOSE_POSITION") ) assert closed_shorts.height == 3 # noqa: PLR2004 - # Verify that best-performing shorts (most negative cumulative return) were closed - # SHORT2 has worst performance for short (+25% = bad), so most negative return + # Verify best-performing shorts were closed (most negative cumulative return) + # SHORT1 (-5%), SHORT2 (-2%), SHORT3 (-1%) all closed by rebalancing closed_short_tickers = closed_shorts["ticker"].to_list() - assert "SHORT2" in closed_short_tickers assert "SHORT1" in closed_short_tickers + assert "SHORT2" in closed_short_tickers assert "SHORT3" in closed_short_tickers @@ -956,35 +958,41 @@ def test_add_portfolio_performance_columns_rebalancing_respects_pdt_locked_posit positions = pl.DataFrame( { - "ticker": ["LONG1", "SHORT1", "SHORT2", "SHORT3"], - "timestamp": [base_timestamp] * 4, - "side": 
["LONG", "SHORT", "SHORT", "SHORT"], - "dollar_amount": [1000.0] * 4, - "action": ["UNSPECIFIED", "PDT_LOCKED", "UNSPECIFIED", "UNSPECIFIED"], + "ticker": ["LONG1", "LONG2", "SHORT1", "SHORT2", "SHORT3"], + "timestamp": [base_timestamp] * 5, + "side": ["LONG", "LONG", "SHORT", "SHORT", "SHORT"], + "dollar_amount": [1000.0] * 5, + "action": [ + "UNSPECIFIED", + "UNSPECIFIED", + "PDT_LOCKED", + "UNSPECIFIED", + "UNSPECIFIED", + ], } ) predictions = pl.DataFrame( { - "ticker": ["LONG1", "SHORT1", "SHORT2", "SHORT3"], - "timestamp": [base_timestamp] * 4, - "quantile_10": [-0.05] * 4, - "quantile_90": [0.15] * 4, + "ticker": ["LONG1", "LONG2", "SHORT1", "SHORT2", "SHORT3"], + "timestamp": [base_timestamp] * 5, + "quantile_10": [-0.05] * 5, + "quantile_90": [0.15] * 5, } ) raw_equity_bars = [] - for ticker in ["LONG1", "SHORT1", "SHORT2", "SHORT3"]: + for ticker in ["LONG1", "LONG2", "SHORT1", "SHORT2", "SHORT3"]: for i in range(30): timestamp = base_timestamp + (i * 86400) - if ticker == "LONG1": - price = 100.0 - (10.0 * i / 29) # long losing -10% + if ticker in ["LONG1", "LONG2"]: + price = 100.0 - (10.0 * i / 29) # longs losing -10% elif ticker == "SHORT1": - price = 100.0 + (30.0 * i / 29) # short1 +30% (best, but PDT locked) + price = 100.0 - (10.0 * i / 29) # short1 -10% (best, but PDT locked) elif ticker == "SHORT2": - price = 100.0 + (20.0 * i / 29) # short2 +20% + price = 100.0 - (5.0 * i / 29) # short2 -5% (second best) else: # SHORT3 - price = 100.0 + (10.0 * i / 29) # short3 +10% + price = 100.0 - (2.0 * i / 29) # short3 -2% (worst) raw_equity_bars.append( {"ticker": ticker, "timestamp": timestamp, "close_price": price} @@ -999,22 +1007,24 @@ def test_add_portfolio_performance_columns_rebalancing_respects_pdt_locked_posit positions, predictions, equity_bars, current_timestamp ) - # 1 long should be closed + # 2 longs should be closed (both hit stop-loss at -10% < -5%) closed_longs = result.filter( (pl.col("side") == "LONG") & (pl.col("action") == 
"CLOSE_POSITION") ) - assert closed_longs.height == 1 + assert closed_longs.height == 2 # noqa: PLR2004 - # 1 short should be closed for rebalancing (should not be SHORT1 since it's PDT locked) # noqa: E501 + # 2 shorts should be closed for rebalancing (not SHORT1 since it's PDT locked) + # Rebalancing closes SHORT2 and SHORT3 to match the 2 closed longs closed_shorts = result.filter( (pl.col("side") == "SHORT") & (pl.col("action") == "CLOSE_POSITION") ) - assert closed_shorts.height == 1 + assert closed_shorts.height == 2 # noqa: PLR2004 # SHORT1 should remain PDT_LOCKED, not be closed short1_action = result.filter(pl.col("ticker") == "SHORT1")["action"][0] assert short1_action == "PDT_LOCKED" - # Either SHORT2 or SHORT3 should be closed (SHORT2 has better performance) - closed_short_ticker = closed_shorts["ticker"][0] - assert closed_short_ticker == "SHORT2" + # SHORT2 and SHORT3 should be closed (SHORT1 skipped due to PDT_LOCKED) + closed_short_tickers = closed_shorts["ticker"].to_list() + assert "SHORT2" in closed_short_tickers + assert "SHORT3" in closed_short_tickers From 1a6fa800121e5a13215a0252b4afea0c3f39abc5 Mon Sep 17 00:00:00 2001 From: John Forstmeier Date: Thu, 5 Feb 2026 22:53:58 -0500 Subject: [PATCH 3/5] Address PR #749 feedback: fix rebalancing to respect MAINTAIN_POSITION and improve test coverage Fixed critical bug where rebalancing logic was overriding positions marked MAINTAIN_POSITION (take-profit targets). The rebalancing filters at risk_management.py:224 and :262 now exclude MAINTAIN_POSITION positions from selection, ensuring only UNSPECIFIED positions are eligible for rebalancing closures. Root cause: Rebalancing filtered only CLOSE_POSITION and PDT_LOCKED actions, allowing MAINTAIN_POSITION positions to be incorrectly flipped to CLOSE_POSITION when balancing long/short closure counts. This contradicted the PR description stating the logic "respects existing closure decisions." 
Fix: Added filter condition to exclude MAINTAIN_POSITION from both best_shorts and best_longs selection logic. Also improved test coverage by updating test_add_portfolio_performance_columns_rebalancing_closes_shorts_when_longs_stopped to add a 4th short ticker and adjust long closures to properly exercise sort-based selection priority. Previous test had exact 3:3 match between shorts needed and available, preventing validation of selection logic. Test data was adjusted across multiple rebalancing tests to ensure no positions hit take-profit thresholds, which would trigger MAINTAIN_POSITION and be correctly excluded by the fix. All Python checks pass with 94% test coverage maintained. Co-Authored-By: Claude Sonnet 4.5 --- .../src/portfoliomanager/risk_management.py | 10 ++- .../tests/test_risk_management.py | 88 +++++++++++++------ 2 files changed, 68 insertions(+), 30 deletions(-) diff --git a/applications/portfoliomanager/src/portfoliomanager/risk_management.py b/applications/portfoliomanager/src/portfoliomanager/risk_management.py index 8bfce9c04..e95cd25b4 100644 --- a/applications/portfoliomanager/src/portfoliomanager/risk_management.py +++ b/applications/portfoliomanager/src/portfoliomanager/risk_management.py @@ -214,13 +214,14 @@ def add_portfolio_performance_columns( if closed_long_count > closed_short_count: shorts_to_rebalance = closed_long_count - closed_short_count - # Select best-performing shorts (most negative cumulative return = best gain) - # Consider positions that are not already being closed and not PDT locked + # Select best-performing shorts (most negative cumulative return) + # Exclude CLOSE_POSITION, PDT_LOCKED, MAINTAIN_POSITION actions best_shorts = ( portfolio_with_actions.filter( (pl.col("side") == PositionSide.SHORT.value) & (pl.col("action") != PositionAction.CLOSE_POSITION.value) & (pl.col("action") != PositionAction.PDT_LOCKED.value) + & (pl.col("action") != PositionAction.MAINTAIN_POSITION.value) ) .sort("cumulative_simple_return", 
descending=False) .head(shorts_to_rebalance) @@ -251,13 +252,14 @@ def add_portfolio_performance_columns( elif closed_short_count > closed_long_count: longs_to_rebalance = closed_short_count - closed_long_count - # Select best-performing longs (most positive cumulative return = best gain) - # Consider positions that are not already being closed and not PDT locked + # Select best-performing longs (most positive cumulative return) + # Exclude CLOSE_POSITION, PDT_LOCKED, MAINTAIN_POSITION actions best_longs = ( portfolio_with_actions.filter( (pl.col("side") == PositionSide.LONG.value) & (pl.col("action") != PositionAction.CLOSE_POSITION.value) & (pl.col("action") != PositionAction.PDT_LOCKED.value) + & (pl.col("action") != PositionAction.MAINTAIN_POSITION.value) ) .sort("cumulative_simple_return", descending=True) .head(longs_to_rebalance) diff --git a/applications/portfoliomanager/tests/test_risk_management.py b/applications/portfoliomanager/tests/test_risk_management.py index 6ae5d9f1c..c99876d66 100644 --- a/applications/portfoliomanager/tests/test_risk_management.py +++ b/applications/portfoliomanager/tests/test_risk_management.py @@ -754,40 +754,64 @@ def test_create_optimal_portfolio_mixed_closed_and_maintained_positions() -> Non def test_add_portfolio_performance_columns_rebalancing_closes_shorts_when_longs_stopped() -> ( # noqa: E501 None ): - """Test that when longs hit stop-loss, best-performing shorts are closed for rebalancing.""" # noqa: E501 + """Test that rebalancing closes best shorts when longs hit stop-loss.""" base_timestamp = datetime(2024, 1, 10, tzinfo=UTC).timestamp() positions = pl.DataFrame( { - "ticker": ["LONG1", "LONG2", "LONG3", "SHORT1", "SHORT2", "SHORT3"], - "timestamp": [base_timestamp] * 6, - "side": ["LONG", "LONG", "LONG", "SHORT", "SHORT", "SHORT"], - "dollar_amount": [1000.0] * 6, - "action": ["UNSPECIFIED"] * 6, + "ticker": [ + "LONG1", + "LONG2", + "LONG3", + "SHORT1", + "SHORT2", + "SHORT3", + "SHORT4", + ], + "timestamp": 
[base_timestamp] * 7, + "side": ["LONG", "LONG", "LONG", "SHORT", "SHORT", "SHORT", "SHORT"], + "dollar_amount": [1000.0] * 7, + "action": ["UNSPECIFIED"] * 7, } ) predictions = pl.DataFrame( { - "ticker": ["LONG1", "LONG2", "LONG3", "SHORT1", "SHORT2", "SHORT3"], - "timestamp": [base_timestamp] * 6, - "quantile_10": [-0.05] * 6, - "quantile_90": [0.15] * 6, + "ticker": [ + "LONG1", + "LONG2", + "LONG3", + "SHORT1", + "SHORT2", + "SHORT3", + "SHORT4", + ], + "timestamp": [base_timestamp] * 7, + "quantile_10": [-0.05] * 7, + "quantile_90": [0.15] * 7, } ) raw_equity_bars = [] - for ticker in ["LONG1", "LONG2", "LONG3", "SHORT1", "SHORT2", "SHORT3"]: + for ticker in ["LONG1", "LONG2", "LONG3", "SHORT1", "SHORT2", "SHORT3", "SHORT4"]: for i in range(30): timestamp = base_timestamp + (i * 86400) - if ticker in ["LONG1", "LONG2", "LONG3"]: - price = 100.0 - (10.0 * i / 29) # longs losing -10% + if ticker == "LONG1": + price = 100.0 - (10.0 * i / 29) # long1 losing -10% + elif ticker == "LONG2": + price = 100.0 - (10.0 * i / 29) # long2 losing -10% + elif ticker == "LONG3": + price = 100.0 - ( + 2.0 * i / 29 + ) # long3 losing -2% (not enough for stop-loss) elif ticker == "SHORT1": - price = 100.0 - (5.0 * i / 29) # short1 falling -5% (best for short) + price = 100.0 - (4.5 * i / 29) # short1 falling -4.5% (best) elif ticker == "SHORT2": - price = 100.0 - (2.0 * i / 29) # short2 falling -2% (second best) - else: # SHORT3 + price = 100.0 - (4.0 * i / 29) # short2 falling -4.0% (second) + elif ticker == "SHORT3": price = 100.0 - (1.0 * i / 29) # short3 falling -1% (worst) + else: # SHORT4 + price = 100.0 - (3.0 * i / 29) # short4 falling -3% (third) raw_equity_bars.append( {"ticker": ticker, "timestamp": timestamp, "close_price": price} @@ -802,26 +826,38 @@ def test_add_portfolio_performance_columns_rebalancing_closes_shorts_when_longs_ positions, predictions, equity_bars, current_timestamp ) - # 3 longs should be closed (hit stop-loss at -10% < -5%) + 
expected_closed_long_count = 2 + expected_closed_short_count = 2 + expected_open_short_count = 2 + + # 2 longs should be closed (LONG1 and LONG2 hit stop-loss at -10% < -5%) closed_longs = result.filter( (pl.col("side") == "LONG") & (pl.col("action") == "CLOSE_POSITION") ) - assert closed_longs.height == 3 # noqa: PLR2004 + assert closed_longs.height == expected_closed_long_count - # 3 shorts should also be closed by rebalancing + # 2 shorts should also be closed by rebalancing (to match 2 closed longs) # None of the shorts hit stop-loss (all have negative returns, not >= +15%) - # Rebalancing closes 3 shorts to match the 3 closed longs + # Rebalancing closes top 2 performing shorts: SHORT1 (-5%) and SHORT4 (-3%) closed_shorts = result.filter( (pl.col("side") == "SHORT") & (pl.col("action") == "CLOSE_POSITION") ) - assert closed_shorts.height == 3 # noqa: PLR2004 + assert closed_shorts.height == expected_closed_short_count # Verify best-performing shorts were closed (most negative cumulative return) - # SHORT1 (-5%), SHORT2 (-2%), SHORT3 (-1%) all closed by rebalancing + # SHORT1 (-4.5%) and SHORT2 (-4.0%) should be closed by rebalancing closed_short_tickers = closed_shorts["ticker"].to_list() assert "SHORT1" in closed_short_tickers assert "SHORT2" in closed_short_tickers - assert "SHORT3" in closed_short_tickers + + # Verify other shorts remain open + open_shorts = result.filter( + (pl.col("side") == "SHORT") & (pl.col("action") != "CLOSE_POSITION") + ) + assert open_shorts.height == expected_open_short_count + open_short_tickers = open_shorts["ticker"].to_list() + assert "SHORT3" in open_short_tickers + assert "SHORT4" in open_short_tickers def test_add_portfolio_performance_columns_rebalancing_closes_longs_when_shorts_stopped() -> ( # noqa: E501 @@ -856,7 +892,7 @@ def test_add_portfolio_performance_columns_rebalancing_closes_longs_when_shorts_ if ticker == "LONG1": price = 100.0 + (10.0 * i / 29) # long1 gaining +10% elif ticker == "LONG2": - price = 100.0 + 
(20.0 * i / 29) # long2 gaining +20% (best performer) + price = 100.0 + (12.0 * i / 29) # long2 gaining +12% (best performer) elif ticker == "LONG3": price = 100.0 + (5.0 * i / 29) # long3 gaining +5% else: # SHORT1, SHORT2 @@ -889,7 +925,7 @@ def test_add_portfolio_performance_columns_rebalancing_closes_longs_when_shorts_ # Verify that best-performing longs were closed closed_long_tickers = closed_longs["ticker"].to_list() - assert "LONG2" in closed_long_tickers # best performer (+20%) + assert "LONG2" in closed_long_tickers # best performer (+12%) assert "LONG1" in closed_long_tickers # second best (+10%) @@ -990,7 +1026,7 @@ def test_add_portfolio_performance_columns_rebalancing_respects_pdt_locked_posit elif ticker == "SHORT1": price = 100.0 - (10.0 * i / 29) # short1 -10% (best, but PDT locked) elif ticker == "SHORT2": - price = 100.0 - (5.0 * i / 29) # short2 -5% (second best) + price = 100.0 - (3.0 * i / 29) # short2 -3% (second best) else: # SHORT3 price = 100.0 - (2.0 * i / 29) # short3 -2% (worst) From e119243d4f3870fe08a42fe0bba2daef4cd0d91c Mon Sep 17 00:00:00 2001 From: John Forstmeier Date: Fri, 6 Feb 2026 21:11:45 -0500 Subject: [PATCH 4/5] Simplify all portfolio rebalancing logic into "delete-and-replace" --- .../src/portfoliomanager/enums.py | 4 +- .../src/portfoliomanager/portfolio_schema.py | 4 +- .../src/portfoliomanager/risk_management.py | 506 +------- .../src/portfoliomanager/server.py | 226 +--- .../tests/test_risk_management.py | 1089 +++-------------- 5 files changed, 248 insertions(+), 1581 deletions(-) diff --git a/applications/portfoliomanager/src/portfoliomanager/enums.py b/applications/portfoliomanager/src/portfoliomanager/enums.py index 946b55f4b..e805c017e 100644 --- a/applications/portfoliomanager/src/portfoliomanager/enums.py +++ b/applications/portfoliomanager/src/portfoliomanager/enums.py @@ -2,10 +2,8 @@ class PositionAction(Enum): - PDT_LOCKED = "PDT_LOCKED" - CLOSE_POSITION = "CLOSE_POSITION" - MAINTAIN_POSITION = 
"MAINTAIN_POSITION" OPEN_POSITION = "OPEN_POSITION" + CLOSE_POSITION = "CLOSE_POSITION" UNSPECIFIED = "UNSPECIFIED" diff --git a/applications/portfoliomanager/src/portfoliomanager/portfolio_schema.py b/applications/portfoliomanager/src/portfoliomanager/portfolio_schema.py index f21d62353..3a8d254c4 100644 --- a/applications/portfoliomanager/src/portfoliomanager/portfolio_schema.py +++ b/applications/portfoliomanager/src/portfoliomanager/portfolio_schema.py @@ -116,10 +116,8 @@ def check_position_side_sums( checks=[ pa.Check.isin( [ - PositionAction.PDT_LOCKED.value, - PositionAction.CLOSE_POSITION.value, PositionAction.OPEN_POSITION.value, - PositionAction.MAINTAIN_POSITION.value, + PositionAction.CLOSE_POSITION.value, PositionAction.UNSPECIFIED.value, ] ), diff --git a/applications/portfoliomanager/src/portfoliomanager/risk_management.py b/applications/portfoliomanager/src/portfoliomanager/risk_management.py index e95cd25b4..5cc96b2e0 100644 --- a/applications/portfoliomanager/src/portfoliomanager/risk_management.py +++ b/applications/portfoliomanager/src/portfoliomanager/risk_management.py @@ -1,6 +1,5 @@ -import math import os -from datetime import UTC, datetime +from datetime import datetime import polars as pl import structlog @@ -11,288 +10,7 @@ logger = structlog.get_logger() UNCERTAINTY_THRESHOLD = float(os.getenv("OSCM_UNCERTAINTY_THRESHOLD", "0.1")) - - -def add_portfolio_action_column( - prior_portfolio: pl.DataFrame, - current_timestamp: datetime, -) -> pl.DataFrame: - prior_portfolio = prior_portfolio.clone() - - return prior_portfolio.with_columns( - pl.when( - pl.col("timestamp") - .cast(pl.Float64) - .map_elements( - lambda ts: datetime.fromtimestamp(ts, tz=UTC).date(), - return_dtype=pl.Date, - ) - == current_timestamp.date() - ) - .then(pl.lit(PositionAction.PDT_LOCKED.value)) - .otherwise(pl.lit(PositionAction.UNSPECIFIED.value)) - .alias("action") - ) - - -def add_equity_bars_returns_and_realized_volatility_columns( - prior_equity_bars: 
pl.DataFrame, -) -> pl.DataFrame: - prior_equity_bars = prior_equity_bars.clone() - - minimum_bars_per_ticker_required = 30 - - ticker_counts = prior_equity_bars.group_by("ticker").agg(pl.len().alias("count")) - insufficient_tickers = ticker_counts.filter( - pl.col("count") < minimum_bars_per_ticker_required - ) - - if insufficient_tickers.height > 0: - insufficient_list = insufficient_tickers.select("ticker").to_series().to_list() - message = f"Tickers with insufficient data (< {minimum_bars_per_ticker_required} rows): {insufficient_list}" # noqa: E501 - raise ValueError(message) - - prior_equity_bars = prior_equity_bars.sort(["ticker", "timestamp"]) - daily_returns = pl.col("close_price").pct_change().over("ticker") - return prior_equity_bars.with_columns( - pl.when(pl.col("close_price").is_not_null()) - .then(daily_returns) - .otherwise(None) - .alias("daily_returns"), - pl.when(pl.col("close_price").is_not_null()) - .then( - pl.when((daily_returns + 1) > 0) - .then((daily_returns + 1).log()) - .otherwise(None) - ) - .otherwise(None) - .alias("log_daily_returns"), - daily_returns.rolling_std(window_size=minimum_bars_per_ticker_required).alias( - "realized_volatility" - ), - ) - - -def add_portfolio_performance_columns( - prior_portfolio: pl.DataFrame, - prior_predictions: pl.DataFrame, # per original ticker and timestamp - prior_equity_bars: pl.DataFrame, # per original ticker and timestamp - current_timestamp: datetime, -) -> pl.DataFrame: - prior_portfolio = prior_portfolio.clone() - prior_predictions = prior_predictions.clone() - prior_equity_bars = prior_equity_bars.clone() - - # Ensure timestamp columns have matching types for joins and comparisons. - # Timestamps may arrive as i64 (from JSON integer serialization) or f64 (from - # Python float conversion). Unconditional casting to Float64 is simpler and - # more robust than checking dtypes, and the performance cost is negligible. 
- prior_portfolio = prior_portfolio.with_columns(pl.col("timestamp").cast(pl.Float64)) - prior_predictions = prior_predictions.with_columns( - pl.col("timestamp").cast(pl.Float64) - ) - prior_equity_bars = prior_equity_bars.with_columns( - pl.col("timestamp").cast(pl.Float64) - ) - - prior_portfolio_predictions = prior_portfolio.join( - other=prior_predictions, - on=["ticker", "timestamp"], - how="left", - ).select( - pl.col("ticker"), - pl.col("timestamp"), - pl.col("side"), - pl.col("dollar_amount"), - pl.col("action"), - pl.col("quantile_10").alias("original_lower_threshold"), - pl.col("quantile_90").alias("original_upper_threshold"), - ) - - prior_equity_bars_with_returns = prior_equity_bars.sort(["ticker", "timestamp"]) - - position_returns = [] - - for row in prior_portfolio_predictions.iter_rows(named=True): - ticker = row["ticker"] - position_timestamp = row["timestamp"] - - ticker_bars = prior_equity_bars_with_returns.filter( - (pl.col("ticker") == ticker) - & (pl.col("timestamp") >= position_timestamp) - & (pl.col("timestamp") <= current_timestamp.timestamp()) - ) - - cumulative_log_return = ( - ticker_bars.select(pl.col("log_daily_returns").sum()).item() or 0 - ) - - cumulative_simple_return = math.exp(cumulative_log_return) - 1 - - position_returns.append( - { - "ticker": ticker, - "timestamp": position_timestamp, - "cumulative_simple_return": cumulative_simple_return, - } - ) - - if position_returns: - returns = pl.DataFrame(position_returns) - else: - returns = pl.DataFrame( - schema={ - "ticker": pl.Utf8, - "timestamp": pl.Float64, - "cumulative_simple_return": pl.Float64, - } - ) - - prior_portfolio_with_data = prior_portfolio_predictions.join( - other=returns, - on=["ticker", "timestamp"], - how="left", - ) - - portfolio_with_actions = prior_portfolio_with_data.with_columns( - pl.when(pl.col("action") == PositionAction.PDT_LOCKED.value) - .then(pl.lit(PositionAction.PDT_LOCKED.value)) - .when( - (pl.col("action") != PositionAction.PDT_LOCKED.value) 
- & ( - ( - (pl.col("side") == PositionSide.LONG.value) - & ( - pl.col("cumulative_simple_return") - <= pl.col("original_lower_threshold") - ) - ) - | ( - (pl.col("side") == PositionSide.SHORT.value) - & ( - pl.col("cumulative_simple_return") - >= pl.col("original_upper_threshold") - ) - ) - ) - ) - .then(pl.lit(PositionAction.CLOSE_POSITION.value)) - .when( - ( - (pl.col("side") == PositionSide.LONG.value) - & ( - pl.col("cumulative_simple_return") - >= pl.col("original_upper_threshold") - ) - ) - | ( - (pl.col("side") == PositionSide.SHORT.value) - & ( - pl.col("cumulative_simple_return") - <= pl.col("original_lower_threshold") - ) - ) - ) - .then(pl.lit(PositionAction.MAINTAIN_POSITION.value)) - .otherwise(pl.lit(PositionAction.UNSPECIFIED.value)) - .alias("action") - ) - - # Rebalancing logic: if one side has more closures than the other, - # close equal number of best performers from the opposite side - closed_long_count = portfolio_with_actions.filter( - (pl.col("side") == PositionSide.LONG.value) - & (pl.col("action") == PositionAction.CLOSE_POSITION.value) - ).height - - closed_short_count = portfolio_with_actions.filter( - (pl.col("side") == PositionSide.SHORT.value) - & (pl.col("action") == PositionAction.CLOSE_POSITION.value) - ).height - - # If more longs are being closed, close equal number of best-performing shorts - if closed_long_count > closed_short_count: - shorts_to_rebalance = closed_long_count - closed_short_count - - # Select best-performing shorts (most negative cumulative return) - # Exclude CLOSE_POSITION, PDT_LOCKED, MAINTAIN_POSITION actions - best_shorts = ( - portfolio_with_actions.filter( - (pl.col("side") == PositionSide.SHORT.value) - & (pl.col("action") != PositionAction.CLOSE_POSITION.value) - & (pl.col("action") != PositionAction.PDT_LOCKED.value) - & (pl.col("action") != PositionAction.MAINTAIN_POSITION.value) - ) - .sort("cumulative_simple_return", descending=False) - .head(shorts_to_rebalance) - .select("ticker") - ) - - if 
best_shorts.height > 0: - logger.info( - "Rebalancing portfolio by closing shorts", - closed_longs=closed_long_count, - closed_shorts=closed_short_count, - additional_shorts_to_close=shorts_to_rebalance, - actual_shorts_closed=best_shorts.height, - shorts_being_closed=best_shorts.to_series().to_list(), - ) - - portfolio_with_actions = portfolio_with_actions.with_columns( - pl.when( - pl.col("ticker").is_in(best_shorts["ticker"]) - & (pl.col("side") == PositionSide.SHORT.value) - ) - .then(pl.lit(PositionAction.CLOSE_POSITION.value)) - .otherwise(pl.col("action")) - .alias("action") - ) - - # If more shorts are being closed, close equal number of best-performing longs - elif closed_short_count > closed_long_count: - longs_to_rebalance = closed_short_count - closed_long_count - - # Select best-performing longs (most positive cumulative return) - # Exclude CLOSE_POSITION, PDT_LOCKED, MAINTAIN_POSITION actions - best_longs = ( - portfolio_with_actions.filter( - (pl.col("side") == PositionSide.LONG.value) - & (pl.col("action") != PositionAction.CLOSE_POSITION.value) - & (pl.col("action") != PositionAction.PDT_LOCKED.value) - & (pl.col("action") != PositionAction.MAINTAIN_POSITION.value) - ) - .sort("cumulative_simple_return", descending=True) - .head(longs_to_rebalance) - .select("ticker") - ) - - if best_longs.height > 0: - logger.info( - "Rebalancing portfolio by closing longs", - closed_longs=closed_long_count, - closed_shorts=closed_short_count, - additional_longs_to_close=longs_to_rebalance, - actual_longs_closed=best_longs.height, - longs_being_closed=best_longs.to_series().to_list(), - ) - - portfolio_with_actions = portfolio_with_actions.with_columns( - pl.when( - pl.col("ticker").is_in(best_longs["ticker"]) - & (pl.col("side") == PositionSide.LONG.value) - ) - .then(pl.lit(PositionAction.CLOSE_POSITION.value)) - .otherwise(pl.col("action")) - .alias("action") - ) - - return portfolio_with_actions.drop( - [ - "original_lower_threshold", - 
"original_upper_threshold", - "cumulative_simple_return", - ] - ) +REQUIRED_PORTFOLIO_SIZE = 20 # 10 long + 10 short def add_predictions_zscore_ranked_columns( @@ -323,12 +41,11 @@ def add_predictions_zscore_ranked_columns( def create_optimal_portfolio( current_predictions: pl.DataFrame, - prior_portfolio: pl.DataFrame, + prior_portfolio_tickers: list[str], maximum_capital: float, current_timestamp: datetime, ) -> pl.DataFrame: current_predictions = current_predictions.clone() - prior_portfolio = prior_portfolio.clone() high_uncertainty_tickers = ( current_predictions.filter( @@ -339,42 +56,16 @@ def create_optimal_portfolio( .to_list() ) - closed_positions, maintained_positions = _filter_positions(prior_portfolio) - - closed_position_tickers = closed_positions.select("ticker").to_series().to_list() - maintained_position_tickers = ( - maintained_positions.select("ticker").to_series().to_list() - ) - - excluded_tickers = ( - high_uncertainty_tickers + closed_position_tickers + maintained_position_tickers - ) - - prediction_summary = current_predictions.select( - "ticker", - "quantile_10", - "quantile_50", - "quantile_90", - "inter_quartile_range", - "composite_score", - ).to_dicts() - - logger.info( - "Current predictions received", - predictions=prediction_summary, - ) + # Excluding prior portfolio tickers to avoid pattern day trader restrictions. 
+ excluded_tickers = high_uncertainty_tickers + prior_portfolio_tickers logger.info( "Portfolio filtering breakdown", total_predictions=current_predictions.height, high_uncertainty_excluded=len(high_uncertainty_tickers), high_uncertainty_threshold=UNCERTAINTY_THRESHOLD, - closed_positions_excluded=len(closed_position_tickers), - maintained_positions_excluded=len(maintained_position_tickers), + prior_portfolio_excluded=len(prior_portfolio_tickers), total_excluded=len(excluded_tickers), - high_uncertainty_tickers=high_uncertainty_tickers[:10] - if high_uncertainty_tickers - else [], ) available_predictions = current_predictions.filter( @@ -387,75 +78,36 @@ def create_optimal_portfolio( required_for_full_portfolio=20, ) - maintained_long_capital = _filter_side_capital_amount( - maintained_positions, PositionSide.LONG.value - ) - maintained_short_capital = _filter_side_capital_amount( - maintained_positions, PositionSide.SHORT.value - ) - closed_long_capital = _filter_side_capital_amount( - closed_positions, PositionSide.LONG.value - ) - closed_short_capital = _filter_side_capital_amount( - closed_positions, PositionSide.SHORT.value - ) - - target_side_capital = maximum_capital / 2 - available_long_capital = max( - 0.0, - target_side_capital - maintained_long_capital + closed_long_capital, - ) - available_short_capital = max( - 0.0, - target_side_capital - maintained_short_capital + closed_short_capital, - ) - - maintained_long_count = maintained_positions.filter( - pl.col("side") == PositionSide.LONG.value - ).height - maintained_short_count = maintained_positions.filter( - pl.col("side") == PositionSide.SHORT.value - ).height + if available_predictions.height < REQUIRED_PORTFOLIO_SIZE: + message = ( + f"Only {available_predictions.height} predictions available " + f"after filtering, need {REQUIRED_PORTFOLIO_SIZE} (10 long + 10 short). " + f"Excluded: {len(high_uncertainty_tickers)} high uncertainty, " + f"{len(prior_portfolio_tickers)} prior portfolio tickers." 
+ ) + raise InsufficientPredictionsError(message) - new_long_positions_needed = max(0, 10 - maintained_long_count) - new_short_positions_needed = max(0, 10 - maintained_short_count) + long_candidates = available_predictions.head(10) + short_candidates = available_predictions.tail(10) - total_available = available_predictions.height - maximum_long_candidates = min(new_long_positions_needed, total_available // 2) - maximum_short_candidates = min( - new_short_positions_needed, total_available - maximum_long_candidates - ) + target_side_capital = maximum_capital / 2 + dollar_amount_per_position = target_side_capital / 10 logger.info( - "Position allocation calculation", - total_available_predictions=total_available, - new_long_positions_needed=new_long_positions_needed, - new_short_positions_needed=new_short_positions_needed, - maximum_long_candidates=maximum_long_candidates, - maximum_short_candidates=maximum_short_candidates, - maintained_long_count=maintained_long_count, - maintained_short_count=maintained_short_count, - ) - - long_candidates = available_predictions.head(maximum_long_candidates) - short_candidates = available_predictions.tail(maximum_short_candidates) - - dollar_amount_per_long = ( - available_long_capital / maximum_long_candidates - if maximum_long_candidates > 0 - else 0 - ) - dollar_amount_per_short = ( - available_short_capital / maximum_short_candidates - if maximum_short_candidates > 0 - else 0 + "Portfolio allocation", + total_capital=maximum_capital, + long_capital=target_side_capital, + short_capital=target_side_capital, + dollar_per_position=dollar_amount_per_position, + long_count=10, + short_count=10, ) long_positions = long_candidates.select( pl.col("ticker"), pl.lit(current_timestamp.timestamp()).cast(pl.Float64).alias("timestamp"), pl.lit(PositionSide.LONG.value).alias("side"), - pl.lit(dollar_amount_per_long).alias("dollar_amount"), + pl.lit(dollar_amount_per_position).alias("dollar_amount"), 
pl.lit(PositionAction.OPEN_POSITION.value).alias("action"), ) @@ -463,110 +115,8 @@ def create_optimal_portfolio( pl.col("ticker"), pl.lit(current_timestamp.timestamp()).cast(pl.Float64).alias("timestamp"), pl.lit(PositionSide.SHORT.value).alias("side"), - pl.lit(dollar_amount_per_short).alias("dollar_amount"), + pl.lit(dollar_amount_per_position).alias("dollar_amount"), pl.lit(PositionAction.OPEN_POSITION.value).alias("action"), ) - return _collect_portfolio_positions( - long_positions, - short_positions, - maintained_positions, - ) - - -def _filter_positions(positions: pl.DataFrame) -> tuple[pl.DataFrame, pl.DataFrame]: - positions = positions.clone() - - if positions.height == 0: - return ( - pl.DataFrame( - { - "ticker": [], - "timestamp": [], - "side": [], - "dollar_amount": [], - "action": [], - } - ), - pl.DataFrame( - { - "ticker": [], - "timestamp": [], - "side": [], - "dollar_amount": [], - "action": [], - } - ), - ) - - closed_positions = positions.filter( - pl.col("action") == PositionAction.CLOSE_POSITION.value - ) - maintained_positions = positions.filter( - pl.col("action") == PositionAction.MAINTAIN_POSITION.value - ) - - return closed_positions, maintained_positions - - -def _filter_side_capital_amount(positions: pl.DataFrame, side: str) -> float: - positions = positions.clone() - - filtered_positions = positions.filter(pl.col("side") == side.upper()) - - if filtered_positions.height == 0: - return 0.0 - - try: - side_capital_amount = filtered_positions.select(pl.sum("dollar_amount")).item() - return float(side_capital_amount or 0) - - except Exception: # noqa: BLE001 - return 0.0 - - -def _collect_portfolio_positions( - long_positions: pl.DataFrame, - short_positions: pl.DataFrame, - maintained_positions: pl.DataFrame, -) -> pl.DataFrame: - long_positions = long_positions.clone() - short_positions = short_positions.clone() - maintained_positions = maintained_positions.clone() - - portfolio_components = [] - - if long_positions.height > 0: - 
portfolio_components.append(long_positions) - if short_positions.height > 0: - portfolio_components.append(short_positions) - if maintained_positions.height > 0: - portfolio_components.append( - maintained_positions.with_columns(pl.col("timestamp").cast(pl.Float64)) - ) - - if len(portfolio_components) == 0: - logger.warning( - "No portfolio components available", - long_positions_count=long_positions.height, - short_positions_count=short_positions.height, - maintained_positions_count=maintained_positions.height, - ) - message = ( - "No portfolio components to create an optimal portfolio. " - f"Long positions: {long_positions.height}, " - f"Short positions: {short_positions.height}, " - f"Maintained positions: {maintained_positions.height}. " - "This may indicate insufficient predictions after filtering." - ) - raise InsufficientPredictionsError(message) - - optimal_portfolio = pl.concat(portfolio_components) - - return optimal_portfolio.select( - "ticker", - pl.col("timestamp").cast(pl.Float64), - "side", - "dollar_amount", - "action", - ).sort(["ticker", "side"]) + return pl.concat([long_positions, short_positions]).sort(["ticker", "side"]) diff --git a/applications/portfoliomanager/src/portfoliomanager/server.py b/applications/portfoliomanager/src/portfoliomanager/server.py index 3c45d4a37..032902bc5 100644 --- a/applications/portfoliomanager/src/portfoliomanager/server.py +++ b/applications/portfoliomanager/src/portfoliomanager/server.py @@ -1,9 +1,6 @@ -import io -import json import logging import os from datetime import UTC, datetime -from typing import cast import httpx import polars as pl @@ -11,7 +8,6 @@ import sentry_sdk import structlog from fastapi import FastAPI, Response, status -from internal.equity_bars_schema import equity_bars_schema from sentry_sdk.integrations.logging import LoggingIntegration from .alpaca_client import AlpacaClient @@ -43,7 +39,7 @@ cache_logger_on_first_use=True, ) -from .enums import PositionAction, PositionSide, TradeSide # 
noqa: E402 +from .enums import PositionSide, TradeSide # noqa: E402 from .exceptions import ( # noqa: E402 AssetNotShortableError, InsufficientBuyingPowerError, @@ -52,9 +48,6 @@ from .portfolio_schema import portfolio_schema # noqa: E402 from .risk_management import ( # noqa: E402 UNCERTAINTY_THRESHOLD, - add_equity_bars_returns_and_realized_volatility_columns, - add_portfolio_action_column, - add_portfolio_performance_columns, add_predictions_zscore_ranked_columns, create_optimal_portfolio, ) @@ -123,16 +116,18 @@ async def create_portfolio() -> Response: # noqa: PLR0911, PLR0912, PLR0915, C9 return Response(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR) try: - prior_portfolio = get_prior_portfolio(current_timestamp=current_timestamp) - logger.info("Retrieved prior portfolio", count=len(prior_portfolio)) + prior_portfolio_tickers = get_prior_portfolio_tickers() + logger.info( + "Retrieved prior portfolio tickers", count=len(prior_portfolio_tickers) + ) except Exception as e: - logger.exception("Failed to retrieve prior portfolio", error=str(e)) + logger.exception("Failed to retrieve prior portfolio tickers", error=str(e)) return Response(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR) try: optimal_portfolio = get_optimal_portfolio( current_predictions=current_predictions, - prior_portfolio=prior_portfolio, + prior_portfolio_tickers=prior_portfolio_tickers, maximum_capital=float(account.cash_amount), current_timestamp=current_timestamp, ) @@ -143,7 +138,7 @@ async def create_portfolio() -> Response: # noqa: PLR0911, PLR0912, PLR0915, C9 error=str(e), uncertainty_threshold=UNCERTAINTY_THRESHOLD, predictions_count=len(current_predictions), - prior_portfolio_count=len(prior_portfolio), + prior_portfolio_tickers_count=len(prior_portfolio_tickers), ) return Response( status_code=status.HTTP_204_NO_CONTENT, @@ -155,7 +150,7 @@ async def create_portfolio() -> Response: # noqa: PLR0911, PLR0912, PLR0915, C9 try: open_positions, close_positions = get_positions( - 
prior_portfolio=prior_portfolio, + prior_portfolio_tickers=prior_portfolio_tickers, optimal_portfolio=optimal_portfolio, ) logger.info( @@ -223,7 +218,6 @@ async def create_portfolio() -> Response: # noqa: PLR0911, PLR0912, PLR0915, C9 side = open_position["side"] dollar_amount = open_position["dollar_amount"] - # Check if we have enough buying power before attempting the order if dollar_amount > remaining_buying_power: logger.warning( "Skipping position due to insufficient buying power", @@ -368,159 +362,63 @@ async def get_current_predictions() -> pl.DataFrame: ) -def get_prior_portfolio(current_timestamp: datetime) -> pl.DataFrame: # noqa: PLR0911 - empty_portfolio = pl.DataFrame( - { - "ticker": [], - "timestamp": [], - "side": [], - "dollar_amount": [], - "action": [], - } - ) - - prior_portfolio_response = requests.get( - url=f"{DATAMANAGER_BASE_URL}/portfolios", - timeout=60, - ) - - if prior_portfolio_response.status_code == HTTP_NOT_FOUND: - logger.info( - "No prior portfolio found - this is expected on first run", - status_code=prior_portfolio_response.status_code, - ) - return empty_portfolio - - if prior_portfolio_response.status_code >= HTTP_BAD_REQUEST: - logger.warning( - "Failed to fetch prior portfolio from datamanager", - status_code=prior_portfolio_response.status_code, - ) - return empty_portfolio - - # Handle empty or invalid JSON response - response_text = prior_portfolio_response.text.strip() - if not response_text or response_text == "[]": - logger.info("Prior portfolio response is empty") - return empty_portfolio - +def get_prior_portfolio_tickers() -> list[str]: # noqa: PLR0911 + """Fetch tickers from prior portfolio to exclude them (PDT avoidance).""" try: - prior_portfolio_data = prior_portfolio_response.json() - except (ValueError, requests.exceptions.JSONDecodeError) as e: - logger.warning( - "Failed to parse prior portfolio JSON", - error=str(e), - response_text=response_text[:200] if response_text else "empty", + 
prior_portfolio_response = requests.get( + url=f"{DATAMANAGER_BASE_URL}/portfolios", + timeout=60, ) - return empty_portfolio - - if not prior_portfolio_data: - logger.info("Prior portfolio data is empty") - return empty_portfolio - - prior_portfolio = pl.DataFrame(prior_portfolio_data) - - if prior_portfolio.is_empty(): - return empty_portfolio - - tickers = prior_portfolio["ticker"].unique().to_list() - timestamps = prior_portfolio["timestamp"].cast(pl.Float64) - start_timestamp = datetime.fromtimestamp(cast("float", timestamps.min()), tz=UTC) - - prior_equity_bars_response = requests.get( - url=f"{DATAMANAGER_BASE_URL}/equity-bars", - params={ - "tickers": ",".join(tickers), - "start_timestamp": start_timestamp.isoformat(), - "end_timestamp": current_timestamp.isoformat(), - }, - timeout=60, - ) - - prior_equity_bars_response.raise_for_status() - - tickers_and_timestamps = [ - {"ticker": row[0], "timestamp": row[1]} - for row in prior_portfolio[["ticker", "timestamp"]].rows() - ] - prior_predictions_response = requests.get( - url=f"{DATAMANAGER_BASE_URL}/predictions", - params={"tickers_and_timestamps": json.dumps(tickers_and_timestamps)}, - timeout=60, - ) + # If no prior portfolio, return empty list + if prior_portfolio_response.status_code == HTTP_NOT_FOUND: + logger.info("No prior portfolio found, starting fresh") + return [] - prior_predictions_response.raise_for_status() + if prior_portfolio_response.status_code >= HTTP_BAD_REQUEST: + logger.warning( + "Failed to fetch prior portfolio from datamanager", + status_code=prior_portfolio_response.status_code, + ) + return [] - prior_portfolio = add_portfolio_action_column( - prior_portfolio=prior_portfolio, - current_timestamp=current_timestamp, - ) + response_text = prior_portfolio_response.text.strip() + if not response_text or response_text == "[]": + logger.info("Prior portfolio is empty") + return [] - prior_equity_bars = pl.read_parquet(io.BytesIO(prior_equity_bars_response.content)) + 
prior_portfolio_data = prior_portfolio_response.json() - prior_equity_bars_validated = equity_bars_schema.validate(prior_equity_bars) - prior_equity_bars = cast( - "pl.DataFrame", - prior_equity_bars_validated.collect() - if isinstance(prior_equity_bars_validated, pl.LazyFrame) - else prior_equity_bars_validated, - ) + if not prior_portfolio_data: + return [] - prior_equity_bars = add_equity_bars_returns_and_realized_volatility_columns( - prior_equity_bars=prior_equity_bars - ) + prior_portfolio = pl.DataFrame(prior_portfolio_data) - predictions_response_text = prior_predictions_response.text.strip() - if not predictions_response_text or predictions_response_text == "[]": - logger.warning( - "Prior predictions empty, returning portfolio without performance" - ) - return add_portfolio_action_column( - prior_portfolio=prior_portfolio, - current_timestamp=current_timestamp, - ) + if prior_portfolio.is_empty(): + return [] - try: - prior_predictions_data = prior_predictions_response.json() - except (ValueError, requests.exceptions.JSONDecodeError) as error: - logger.exception( - "Failed to retrieve prior portfolio", - error=str(error), - ) - return add_portfolio_action_column( - prior_portfolio=prior_portfolio, - current_timestamp=current_timestamp, - ) - - if not prior_predictions_data: - logger.warning("Prior predictions data is empty") - return add_portfolio_action_column( - prior_portfolio=prior_portfolio, - current_timestamp=current_timestamp, - ) - - prior_predictions = pl.DataFrame(prior_predictions_data).with_columns( - pl.col("timestamp").cast(pl.Float64) - ) + tickers = prior_portfolio["ticker"].unique().to_list() + logger.info("Retrieved prior portfolio tickers", count=len(tickers)) + return tickers # noqa: TRY300 - return add_portfolio_performance_columns( - prior_portfolio=prior_portfolio, - prior_equity_bars=prior_equity_bars, - prior_predictions=prior_predictions, - current_timestamp=current_timestamp, - ) + except (ValueError, 
requests.exceptions.JSONDecodeError) as e: + logger.warning("Failed to parse prior portfolio JSON", error=str(e)) + return [] + except Exception as e: # noqa: BLE001 + logger.warning("Unexpected error fetching prior portfolio", error=str(e)) + return [] def get_optimal_portfolio( current_predictions: pl.DataFrame, - prior_portfolio: pl.DataFrame, + prior_portfolio_tickers: list[str], maximum_capital: float, current_timestamp: datetime, ) -> pl.DataFrame: + """Create optimal portfolio with prediction ranking and ticker exclusion.""" optimal_portfolio = create_optimal_portfolio( current_predictions=current_predictions, - prior_portfolio=prior_portfolio, + prior_portfolio_tickers=prior_portfolio_tickers, maximum_capital=maximum_capital, current_timestamp=current_timestamp, ) @@ -542,33 +440,11 @@ def get_optimal_portfolio( def get_positions( - prior_portfolio: pl.DataFrame, + prior_portfolio_tickers: list[str], optimal_portfolio: pl.DataFrame, ) -> tuple[list[dict], list[dict]]: - prior_portfolio = prior_portfolio.clone() - optimal_portfolio = optimal_portfolio.clone() - - close_positions = [] - if not prior_portfolio.is_empty(): - prior_tickers = set(prior_portfolio["ticker"].to_list()) - optimal_tickers = set(optimal_portfolio["ticker"].to_list()) - closing_tickers = prior_tickers - optimal_tickers - - if closing_tickers: - close_positions = [ - { - "ticker": row["ticker"], - "side": ( - TradeSide.SELL - if row["side"] == PositionSide.LONG.value - else TradeSide.BUY - ), - "dollar_amount": row["dollar_amount"], - } - for row in prior_portfolio.filter( - pl.col("ticker").is_in(list(closing_tickers)) - ).iter_rows(named=True) - ] + """Get positions to close and open.""" + close_positions = [{"ticker": ticker} for ticker in prior_portfolio_tickers] open_positions = [ { @@ -580,9 +456,7 @@ def get_positions( ), "dollar_amount": row["dollar_amount"], } - for row in optimal_portfolio.filter( - pl.col("action") == PositionAction.OPEN_POSITION.value - 
).iter_rows(named=True) + for row in optimal_portfolio.iter_rows(named=True) ] return open_positions, close_positions diff --git a/applications/portfoliomanager/tests/test_risk_management.py b/applications/portfoliomanager/tests/test_risk_management.py index c99876d66..f08cc7eb3 100644 --- a/applications/portfoliomanager/tests/test_risk_management.py +++ b/applications/portfoliomanager/tests/test_risk_management.py @@ -2,446 +2,13 @@ import polars as pl import pytest +from portfoliomanager.exceptions import InsufficientPredictionsError from portfoliomanager.risk_management import ( - add_equity_bars_returns_and_realized_volatility_columns, - add_portfolio_action_column, - add_portfolio_performance_columns, add_predictions_zscore_ranked_columns, create_optimal_portfolio, ) -def test_add_portfolio_action_column_same_day_positions_locked() -> None: - current_datetime = datetime(2024, 1, 15, 0, 0, 0, 0, tzinfo=UTC) - positions = pl.DataFrame( - { - "ticker": ["AAPL", "GOOGL"], - "timestamp": [ - datetime(2024, 1, 15, 9, 30, tzinfo=UTC).timestamp(), - datetime(2024, 1, 15, 14, 0, tzinfo=UTC).timestamp(), - ], - "side": ["LONG", "SHORT"], - "dollar_amount": [1000.0, 1000.0], - } - ) - - result = add_portfolio_action_column(positions, current_datetime) - - assert all(action == "PDT_LOCKED" for action in result["action"].to_list()) - assert len(result) == 2 # noqa: PLR2004 - - -def test_add_portfolio_action_column_previous_day_positions_unlocked() -> None: - current_datetime = datetime(2024, 1, 15, 0, 0, 0, 0, tzinfo=UTC) - positions = pl.DataFrame( - { - "ticker": ["AAPL", "GOOGL"], - "timestamp": [ - datetime(2024, 1, 14, 9, 30, tzinfo=UTC).timestamp(), - datetime(2024, 1, 13, 14, 0, tzinfo=UTC).timestamp(), - ], - "side": ["LONG", "SHORT"], - "dollar_amount": [1000.0, 1000.0], - } - ) - - result = add_portfolio_action_column(positions, current_datetime) - - assert all(action == "UNSPECIFIED" for action in result["action"].to_list()) - assert len(result) == 2 # noqa: 
PLR2004 - - -def test_add_portfolio_action_column_mixed_dates() -> None: - current_datetime = datetime(2024, 1, 15, 0, 0, 0, 0, tzinfo=UTC) - positions = pl.DataFrame( - { - "ticker": ["AAPL", "GOOGL", "TSLA"], - "timestamp": [ - datetime(2024, 1, 15, 9, 30, tzinfo=UTC).timestamp(), # same day - datetime(2024, 1, 14, 14, 0, tzinfo=UTC).timestamp(), # previous day - datetime(2024, 1, 15, 16, 0, tzinfo=UTC).timestamp(), # same day - ], - "side": ["LONG", "SHORT", "LONG"], - "dollar_amount": [1000.0, 1000.0, 1000.0], - } - ) - - result = add_portfolio_action_column(positions, current_datetime) - - expected_actions = ["PDT_LOCKED", "UNSPECIFIED", "PDT_LOCKED"] - assert result["action"].to_list() == expected_actions - - -def test_add_portfolio_action_column_empty_dataframe() -> None: - current_datetime = datetime(2024, 1, 15, 0, 0, 0, 0, tzinfo=UTC) - positions = pl.DataFrame( - {"ticker": [], "timestamp": [], "side": [], "dollar_amount": []} - ) - - result = add_portfolio_action_column(positions, current_datetime) - - assert len(result) == 0 - assert "action" in result.columns - - -def test_add_equity_bars_returns_and_realized_volatility_columns_sufficient_data_success() -> ( # noqa: E501 - None -): - equity_bars = pl.DataFrame( - { - "ticker": ["AAPL"] * 35, - "timestamp": [ - datetime(2024, 1, i + 1, tzinfo=UTC).timestamp() for i in range(31) - ] - + [datetime(2024, 2, i + 1, tzinfo=UTC).timestamp() for i in range(4)], - "close_price": list(range(100, 135)), # increasing prices - } - ) - - result = add_equity_bars_returns_and_realized_volatility_columns(equity_bars) - - assert "daily_returns" in result.columns - assert "log_daily_returns" in result.columns - assert "realized_volatility" in result.columns - assert len(result) == 35 # noqa: PLR2004 - - -def test_add_equity_bars_returns_and_realized_volatility_columns_insufficient_data_raises_error() -> ( # noqa: E501 - None -): - equity_bars = pl.DataFrame( - { - "ticker": ["AAPL"] * 25, # only 25 bars, need 30 - 
"timestamp": [ - datetime(2024, 1, i + 1, tzinfo=UTC).timestamp() for i in range(25) - ], - "close_price": list(range(100, 125)), - } - ) - - with pytest.raises(ValueError, match="Tickers with insufficient data"): - add_equity_bars_returns_and_realized_volatility_columns(equity_bars) - - -def test_add_equity_bars_returns_and_realized_volatility_columns_multiple_tickers_mixed_data() -> ( # noqa: E501 - None -): - equity_bars = pl.DataFrame( - { - "ticker": ["AAPL"] * 35 + ["GOOGL"] * 25, # AAPL has enough, GOOGL does not - "timestamp": [ - datetime(2024, 1, i + 1, tzinfo=UTC).timestamp() for i in range(31) - ] - + [datetime(2024, 2, i + 1, tzinfo=UTC).timestamp() for i in range(4)] - + [datetime(2024, 2, i + 1, tzinfo=UTC).timestamp() for i in range(25)], - "close_price": list(range(100, 135)) + list(range(200, 225)), - } - ) - - with pytest.raises(ValueError, match="GOOGL"): - add_equity_bars_returns_and_realized_volatility_columns(equity_bars) - - -def test_add_equity_bars_returns_grouped_per_ticker() -> None: - base_timestamp = datetime(2024, 1, 1, tzinfo=UTC).timestamp() - - aapl_data = [] - googl_data = [] - - for i in range(30): - timestamp = base_timestamp + (i * 86400) - aapl_price = 100.0 + i # AAPL prices increase - googl_price = 200.0 - i * 0.5 # GOOGL prices decrease slightly - - aapl_data.append( - {"ticker": "AAPL", "timestamp": timestamp, "close_price": aapl_price} - ) - googl_data.append( - {"ticker": "GOOGL", "timestamp": timestamp, "close_price": googl_price} - ) - - all_data = [] - for i in range(30): - all_data.append(aapl_data[i]) - all_data.append(googl_data[i]) - - equity_bars = pl.DataFrame(all_data) - - out = add_equity_bars_returns_and_realized_volatility_columns(equity_bars) - aapl = out.filter(pl.col("ticker") == "AAPL").sort("timestamp") - googl = out.filter(pl.col("ticker") == "GOOGL").sort("timestamp") - - aapl_returns = aapl["daily_returns"].to_list() - googl_returns = googl["daily_returns"].to_list() - - assert aapl_returns[0] is 
None - assert aapl_returns[1] == pytest.approx(0.01, abs=1e-9) - assert googl_returns[0] is None - assert googl_returns[1] == pytest.approx(-0.0025, abs=1e-9) - - aapl_log_returns = aapl["log_daily_returns"].to_list() - googl_log_returns = googl["log_daily_returns"].to_list() - - assert aapl_log_returns[0] is None - assert aapl_log_returns[1] == pytest.approx(0.00995, abs=1e-4) - assert googl_log_returns[0] is None - assert googl_log_returns[1] == pytest.approx(-0.00251, abs=1e-4) - - -def test_add_equity_bars_returns_and_realized_volatility_columns_null_prices_handled() -> ( # noqa: E501 - None -): - equity_bars = pl.DataFrame( - { - "ticker": ["AAPL"] * 35, - "timestamp": [ - datetime(2024, 1, i + 1, tzinfo=UTC).timestamp() for i in range(31) - ] - + [datetime(2024, 2, i + 1, tzinfo=UTC).timestamp() for i in range(4)], - "close_price": [ - 100.0, - None, - 102.0, - *list(range(103, 135)), - ], - } - ) - - result = add_equity_bars_returns_and_realized_volatility_columns(equity_bars) - - daily_returns = result["daily_returns"].to_list() - assert daily_returns[1] is None # second row should be null - - -def test_add_portfolio_performance_columns_long_position_outperforming() -> None: - base_timestamp = datetime(2024, 1, 10, tzinfo=UTC).timestamp() - - positions = pl.DataFrame( - { - "ticker": ["AAPL"], - "timestamp": [base_timestamp], - "side": ["LONG"], - "dollar_amount": [1000.0], - "action": ["UNSPECIFIED"], - } - ) - - original_predictions = pl.DataFrame( - { - "ticker": ["AAPL"], - "timestamp": [base_timestamp], - "quantile_10": [-0.05], # -5% lower threshold - "quantile_90": [0.15], # +15% upper threshold - } - ) - - raw_equity_bars = pl.DataFrame( - { - "ticker": ["AAPL"] * 30, - "timestamp": [base_timestamp + (i * 86400) for i in range(30)], - "close_price": [100.0 + (20.0 * i / 29) for i in range(30)], - } - ) - - original_equity_bars = add_equity_bars_returns_and_realized_volatility_columns( - raw_equity_bars - ) - current_timestamp = 
datetime.fromtimestamp(base_timestamp + (29 * 86400), tz=UTC) - - result = add_portfolio_performance_columns( - positions, original_predictions, original_equity_bars, current_timestamp - ) - - assert result["action"][0] == "MAINTAIN_POSITION" # 20% > 15% threshold - - -def test_add_portfolio_performance_columns_long_position_underperforming() -> None: - base_timestamp = datetime(2024, 1, 10, tzinfo=UTC).timestamp() - - positions = pl.DataFrame( - { - "ticker": ["AAPL"], - "timestamp": [base_timestamp], - "side": ["LONG"], - "dollar_amount": [1000.0], - "action": ["UNSPECIFIED"], - } - ) - - original_predictions = pl.DataFrame( - { - "ticker": ["AAPL"], - "timestamp": [base_timestamp], - "quantile_10": [-0.05], # -5% lower threshold - "quantile_90": [0.15], # +15% upper threshold - } - ) - - raw_equity_bars = pl.DataFrame( - { - "ticker": ["AAPL"] * 30, - "timestamp": [base_timestamp + (i * 86400) for i in range(30)], - "close_price": [100.0 - (10.0 * i / 29) for i in range(30)], - } - ) - - original_equity_bars = add_equity_bars_returns_and_realized_volatility_columns( - raw_equity_bars - ) - current_timestamp = datetime.fromtimestamp(base_timestamp + (29 * 86400), tz=UTC) - - result = add_portfolio_performance_columns( - positions, original_predictions, original_equity_bars, current_timestamp - ) - - assert result["action"][0] == "CLOSE_POSITION" # -10% < -5% threshold - - -def test_add_portfolio_performance_columns_short_position_outperforming() -> None: - base_timestamp = datetime(2024, 1, 10, tzinfo=UTC).timestamp() - - positions = pl.DataFrame( - { - "ticker": ["AAPL"], - "timestamp": [base_timestamp], - "side": ["SHORT"], - "dollar_amount": [1000.0], - "action": ["UNSPECIFIED"], - } - ) - - original_predictions = pl.DataFrame( - { - "ticker": ["AAPL"], - "timestamp": [base_timestamp], - "quantile_10": [-0.05], # -5% lower threshold - "quantile_90": [0.15], # +15% upper threshold - } - ) - - raw_equity_bars = pl.DataFrame( - { - "ticker": ["AAPL"] * 30, - 
"timestamp": [base_timestamp + (i * 86400) for i in range(30)], - "close_price": [100.0 - (10.0 * i / 29) for i in range(30)], - } - ) - - original_equity_bars = add_equity_bars_returns_and_realized_volatility_columns( - raw_equity_bars - ) - current_timestamp = datetime.fromtimestamp(base_timestamp + (29 * 86400), tz=UTC) - - result = add_portfolio_performance_columns( - positions, original_predictions, original_equity_bars, current_timestamp - ) - - assert ( - result["action"][0] == "MAINTAIN_POSITION" - ) # -10% <= -5% threshold (good for short) - - -def test_add_portfolio_performance_columns_pdt_locked_position_maintained() -> None: - base_timestamp = datetime(2024, 1, 10, tzinfo=UTC).timestamp() - - positions = pl.DataFrame( - { - "ticker": ["AAPL"], - "timestamp": [base_timestamp], - "side": ["LONG"], - "dollar_amount": [1000.0], - "action": ["PDT_LOCKED"], # pdt locked - } - ) - - original_predictions = pl.DataFrame( - { - "ticker": ["AAPL"], - "timestamp": [base_timestamp], - "quantile_10": [-0.05], - "quantile_90": [0.15], - } - ) - - raw_equity_bars = pl.DataFrame( - { - "ticker": ["AAPL"] * 30, - "timestamp": [base_timestamp + (i * 86400) for i in range(30)], - "close_price": [100.0 - (20.0 * i / 29) for i in range(30)], - } - ) - - original_equity_bars = add_equity_bars_returns_and_realized_volatility_columns( - raw_equity_bars - ) - current_timestamp = datetime.fromtimestamp(base_timestamp + (29 * 86400), tz=UTC) - - result = add_portfolio_performance_columns( - positions, original_predictions, original_equity_bars, current_timestamp - ) - - assert result["action"][0] == "PDT_LOCKED" # pdt locked overrides performance - - -def test_add_portfolio_performance_columns_multiple_tickers_independent() -> None: - current_timestamp = datetime(2024, 1, 10, tzinfo=UTC).timestamp() - - positions = pl.DataFrame( - { - "ticker": ["AAPL", "GOOGL"], - "timestamp": [current_timestamp, current_timestamp], - "side": ["LONG", "LONG"], - "dollar_amount": [1000.0, 1000.0], 
- "action": ["UNSPECIFIED", "UNSPECIFIED"], - } - ) - - predictions = pl.DataFrame( - { - "ticker": ["AAPL", "GOOGL"], - "timestamp": [current_timestamp, current_timestamp], - "quantile_10": [-0.05, -0.05], - "quantile_90": [0.15, 0.15], - } - ) - - aapl_data = [] - googl_data = [] - - for i in range(30): - timestamp = current_timestamp + (i * 86400) - aapl_price = 100.0 + (20.0 * i / 29) - googl_price = 200.0 - (20.0 * i / 29) - - aapl_data.append( - {"ticker": "AAPL", "timestamp": timestamp, "close_price": aapl_price} - ) - googl_data.append( - {"ticker": "GOOGL", "timestamp": timestamp, "close_price": googl_price} - ) - - all_data = [] - for i in range(30): - all_data.append(aapl_data[i]) - all_data.append(googl_data[i]) - - raw_equity_bars = pl.DataFrame(all_data) - - equity_bars = add_equity_bars_returns_and_realized_volatility_columns( - raw_equity_bars - ) - - out = add_portfolio_performance_columns( - positions, - predictions, - equity_bars, - datetime.fromtimestamp(current_timestamp + (29 * 86400), tz=UTC), # 30th day - ) - - assert out.filter(pl.col("ticker") == "AAPL")["action"][0] == "MAINTAIN_POSITION" - assert out.filter(pl.col("ticker") == "GOOGL")["action"][0] == "CLOSE_POSITION" - - def test_add_predictions_zscore_ranked_columns_zscore_calculation() -> None: predictions = pl.DataFrame( { @@ -501,566 +68,246 @@ def test_add_predictions_zscore_ranked_columns_single_prediction() -> None: assert result["z_score_return"][0] == 0.0 # single value has z-score of 0 -def test_create_optimal_portfolio_fresh_start_no_existing_positions() -> None: +def test_create_optimal_portfolio_fresh_start_no_prior_tickers() -> None: + """Test portfolio creation with no prior portfolio (fresh start).""" + current_timestamp = datetime(2024, 1, 15, 9, 30, tzinfo=UTC) + + # Create 30 predictions with varying scores predictions = pl.DataFrame( { - "ticker": [f"STOCK{i}" for i in range(25)], - "quantile_10": [0.0] * 25, - "quantile_50": [0.1] * 25, - "quantile_90": [0.2] * 25, 
- "composite_score": list(range(25, 0, -1)), # descending scores - "inter_quartile_range": [0.1] * 25, # low uncertainty + "ticker": [f"TICK{i:02d}" for i in range(30)], + "quantile_10": [0.0] * 30, + "quantile_50": [i * 0.01 for i in range(30)], # 0%, 1%, 2%, ..., 29% + "quantile_90": [0.05] * 30, # Low uncertainty (IQR = 0.05 < 0.1 threshold) } ) - positions = pl.DataFrame( - { - "ticker": [], - "timestamp": [], - "side": [], - "dollar_amount": [], - "action": [], - } - ) + # Rank and sort predictions + ranked_predictions = add_predictions_zscore_ranked_columns(predictions) result = create_optimal_portfolio( - predictions, positions, 20000.0, datetime.now(tz=UTC) + current_predictions=ranked_predictions, + prior_portfolio_tickers=[], # No prior portfolio + maximum_capital=10000.0, + current_timestamp=current_timestamp, ) - assert len(result) == 20 # 10 long + 10 short # noqa: PLR2004 + # Should create 20 positions (10 long, 10 short) + assert len(result) == 20 # noqa: PLR2004 assert result.filter(pl.col("side") == "LONG").height == 10 # noqa: PLR2004 assert result.filter(pl.col("side") == "SHORT").height == 10 # noqa: PLR2004 - long_total = result.filter(pl.col("side") == "LONG")["dollar_amount"].sum() - short_total = result.filter(pl.col("side") == "SHORT")["dollar_amount"].sum() - assert abs(long_total - short_total) < 0.01 # noqa: PLR2004 + # All positions should have action=OPEN_POSITION + assert all(action == "OPEN_POSITION" for action in result["action"].to_list()) + # Equal dollar allocation: 50% to longs, 50% to shorts + long_capital = result.filter(pl.col("side") == "LONG")["dollar_amount"].sum() + short_capital = result.filter(pl.col("side") == "SHORT")["dollar_amount"].sum() + assert long_capital == pytest.approx(5000.0) + assert short_capital == pytest.approx(5000.0) -def test_create_optimal_portfolio_some_maintained_positions() -> None: - predictions = pl.DataFrame( - { - "ticker": [f"STOCK{i}" for i in range(25)], - "quantile_10": [0.0] * 25, - 
"quantile_50": [0.1] * 25, - "quantile_90": [0.2] * 25, - "composite_score": list(range(25, 0, -1)), - "inter_quartile_range": [0.1] * 25, - } + # Each position should get (capital / 2) / 10 + expected_amount = 500.0 + assert all( + amount == pytest.approx(expected_amount) + for amount in result["dollar_amount"].to_list() ) - positions = pl.DataFrame( - { - "ticker": ["STOCK1", "STOCK2", "STOCK24"], - "timestamp": [ - datetime(2024, 1, 10, tzinfo=UTC).timestamp(), - datetime(2024, 1, 11, tzinfo=UTC).timestamp(), - datetime(2024, 1, 12, tzinfo=UTC).timestamp(), - ], - "side": ["LONG", "LONG", "SHORT"], - "dollar_amount": [1000.0, 1000.0, 1000.0], - "action": ["MAINTAIN_POSITION", "MAINTAIN_POSITION", "MAINTAIN_POSITION"], - } - ) + # Top 10 should be long (highest composite scores) + long_tickers = result.filter(pl.col("side") == "LONG")["ticker"].to_list() + expected_long = [f"TICK{i:02d}" for i in range(29, 19, -1)] # TICK29 to TICK20 + assert set(long_tickers) == set(expected_long) - result = create_optimal_portfolio( - predictions, positions, 20000.0, datetime.now(tz=UTC) - ) + # Bottom 10 should be short (lowest composite scores) + short_tickers = result.filter(pl.col("side") == "SHORT")["ticker"].to_list() + expected_short = [f"TICK{i:02d}" for i in range(10)] # TICK00 to TICK09 + assert set(short_tickers) == set(expected_short) - assert len(result) == 20 # noqa: PLR2004 - assert "STOCK1" in result["ticker"].to_list() - assert "STOCK2" in result["ticker"].to_list() - assert "STOCK24" in result["ticker"].to_list() +def test_create_optimal_portfolio_with_prior_ticker_exclusion() -> None: + """Test that prior portfolio tickers are excluded to avoid PDT violations.""" + current_timestamp = datetime(2024, 1, 15, 9, 30, tzinfo=UTC) -def test_create_optimal_portfolio_high_uncertainty_exclusions() -> None: + # Create 30 predictions predictions = pl.DataFrame( { - "ticker": ["HIGH_UNCERT", "LOW_UNCERT1", "LOW_UNCERT2"], - "quantile_10": [0.0, 0.0, 0.0], - 
"quantile_50": [0.1, 0.1, 0.1], - "quantile_90": [0.2, 0.2, 0.2], - "composite_score": [10.0, 5.0, 1.0], - "inter_quartile_range": [0.8, 0.1, 0.1], # first one too uncertain - } - ) - - positions = pl.DataFrame( - { - "ticker": [], - "timestamp": [], - "side": [], - "dollar_amount": [], - "action": [], + "ticker": [f"TICK{i:02d}" for i in range(30)], + "quantile_10": [0.0] * 30, + "quantile_50": [i * 0.01 for i in range(30)], + "quantile_90": [0.05] * 30, # Low uncertainty (IQR = 0.05 < 0.1 threshold) } ) - result = create_optimal_portfolio( - predictions, positions, 20000.0, datetime.now(tz=UTC) - ) + # Rank and sort predictions + ranked_predictions = add_predictions_zscore_ranked_columns(predictions) - assert "HIGH_UNCERT" not in result["ticker"].to_list() - assert len(result) == 2 # only 2 available predictions # noqa: PLR2004 - - -def test_create_optimal_portfolio_all_positions_maintained_no_new_needed() -> None: - predictions = pl.DataFrame( - { - "ticker": [f"STOCK{i}" for i in range(25)], - "quantile_10": [0.0] * 25, - "quantile_50": [0.1] * 25, - "quantile_90": [0.2] * 25, - "composite_score": list(range(25, 0, -1)), - "inter_quartile_range": [0.1] * 25, - } - ) - - positions = pl.DataFrame( - { - "ticker": [f"MAINTAINED{i}" for i in range(20)], - "timestamp": [datetime(2024, 1, 10, tzinfo=UTC).timestamp()] * 20, - "side": ["LONG"] * 10 + ["SHORT"] * 10, - "dollar_amount": [500.0] * 20, - "action": ["MAINTAIN_POSITION"] * 20, - } - ) + # Exclude the top 5 tickers from prior portfolio + prior_tickers = ["TICK29", "TICK28", "TICK27", "TICK26", "TICK25"] result = create_optimal_portfolio( - predictions, positions, 20000.0, datetime.now(tz=UTC) + current_predictions=ranked_predictions, + prior_portfolio_tickers=prior_tickers, + maximum_capital=10000.0, + current_timestamp=current_timestamp, ) - assert len(result) == 20 # all maintained positions # noqa: PLR2004 - expected_timestamp = datetime(2024, 1, 10, tzinfo=UTC).timestamp() - assert all(ts == 
expected_timestamp for ts in result["timestamp"].to_list()) - - -def test_create_optimal_portfolio_capital_rebalancing_with_closed_positions() -> None: - predictions = pl.DataFrame( - { - "ticker": [f"NEW{i}" for i in range(15)], - "quantile_10": [0.0] * 15, - "quantile_50": [0.1] * 15, - "quantile_90": [0.2] * 15, - "composite_score": list(range(15, 0, -1)), - "inter_quartile_range": [0.1] * 15, - } - ) - - positions = pl.DataFrame( - { - "ticker": ["MAINTAINED1", "MAINTAINED2", "CLOSED1", "CLOSED2"], - "timestamp": [ - datetime(2024, 1, 10, tzinfo=UTC).timestamp(), - datetime(2024, 1, 11, tzinfo=UTC).timestamp(), - datetime(2024, 1, 12, tzinfo=UTC).timestamp(), - datetime(2024, 1, 13, tzinfo=UTC).timestamp(), - ], - "side": ["LONG", "SHORT", "LONG", "SHORT"], - "dollar_amount": [800.0, 1200.0, 500.0, 500.0], # uneven amounts - "action": [ - "MAINTAIN_POSITION", - "MAINTAIN_POSITION", - "CLOSE_POSITION", - "CLOSE_POSITION", - ], - } - ) - - result = create_optimal_portfolio( - predictions, positions, 20000.0, datetime.now(tz=UTC) - ) + # Should still create 20 positions + assert len(result) == 20 # noqa: PLR2004 - # 2 maintained + 15 new (limited by available predictions) - # even though this isn't a realistic scenario - assert len(result) == 17 # noqa: PLR2004 + # None of the prior tickers should appear in the new portfolio + result_tickers = result["ticker"].to_list() + for ticker in prior_tickers: + assert ticker not in result_tickers - maintained = result.filter( - pl.col("timestamp").is_in( - [ - datetime(2024, 1, 10, tzinfo=UTC).timestamp(), - datetime(2024, 1, 11, tzinfo=UTC).timestamp(), - ] - ) - ) - assert len(maintained) == 2 # noqa: PLR2004 + # Since top 5 are excluded, next 10 should be long (TICK24 to TICK15) + long_tickers = result.filter(pl.col("side") == "LONG")["ticker"].to_list() + expected_long = [f"TICK{i:02d}" for i in range(24, 14, -1)] + assert set(long_tickers) == set(expected_long) - long_total = result.filter(pl.col("side") == 
"LONG")["dollar_amount"].sum() - short_total = result.filter(pl.col("side") == "SHORT")["dollar_amount"].sum() - assert abs(long_total - short_total) < 0.01 # noqa: PLR2004 +def test_create_optimal_portfolio_high_uncertainty_exclusions() -> None: + """Test that high uncertainty predictions are excluded.""" + current_timestamp = datetime(2024, 1, 15, 9, 30, tzinfo=UTC) -def test_create_optimal_portfolio_mixed_closed_and_maintained_positions() -> None: + # Create 25 predictions: 20 low uncertainty, 5 high uncertainty + high_uncertainty_count = 5 + tickers = [f"TICK{i:02d}" for i in range(25)] predictions = pl.DataFrame( { - "ticker": [f"STOCK{i:02d}" for i in range(30)], - "quantile_10": [0.0] * 30, - "quantile_50": [0.05] * 30, - "quantile_90": [0.1] * 30, - "composite_score": list(range(30, 0, -1)), - "inter_quartile_range": [0.05] - * 30, # all acceptable uncertainty (below 0.1 threshold) - } - ) - - positions = pl.DataFrame( - { - "ticker": ["OLD1", "OLD2", "OLD3", "OLD4", "OLD5"], - "timestamp": [ - datetime(2024, 1, 10, tzinfo=UTC).timestamp(), - datetime(2024, 1, 11, tzinfo=UTC).timestamp(), - datetime(2024, 1, 12, tzinfo=UTC).timestamp(), - datetime(2024, 1, 13, tzinfo=UTC).timestamp(), - datetime(2024, 1, 14, tzinfo=UTC).timestamp(), - ], - "side": ["LONG", "LONG", "SHORT", "SHORT", "LONG"], - "dollar_amount": [1000.0, 1000.0, 1000.0, 1000.0, 1000.0], - "action": [ - "CLOSE_POSITION", - "MAINTAIN_POSITION", - "MAINTAIN_POSITION", - "CLOSE_POSITION", - "MAINTAIN_POSITION", + "ticker": tickers, + "quantile_10": [0.0] * 25, + "quantile_50": [i * 0.01 for i in range(25)], + # First 5 have high uncertainty (IQR > 0.1), rest have low uncertainty + "quantile_90": [0.50] * high_uncertainty_count + [0.08] * 20, + "z_score_return": [float(i) for i in range(25)], + "inter_quartile_range": [0.50] * high_uncertainty_count + [0.08] * 20, + "composite_score": [ + float(i) / 1.5 if i < high_uncertainty_count else float(i) / 1.08 + for i in range(25) ], + "action": 
["UNSPECIFIED"] * 25, } ) result = create_optimal_portfolio( - predictions, - positions, - 20000.0, - datetime.now(tz=UTC), + current_predictions=predictions, + prior_portfolio_tickers=[], + maximum_capital=10000.0, + current_timestamp=current_timestamp, ) + # Should create 20 positions from the 20 low-uncertainty tickers assert len(result) == 20 # noqa: PLR2004 - maintained_tickers = ["OLD2", "OLD3", "OLD5"] - for ticker in maintained_tickers: - assert ticker in result["ticker"].to_list() - - closed_tickers = ["OLD1", "OLD4"] - for ticker in closed_tickers: - assert ticker not in result["ticker"].to_list() + # None of the high uncertainty tickers should appear + result_tickers = result["ticker"].to_list() + for i in range(high_uncertainty_count): + assert f"TICK{i:02d}" not in result_tickers - assert "ticker" in result.columns - assert "timestamp" in result.columns - assert "side" in result.columns - assert "dollar_amount" in result.columns - assert "action" in result.columns - assert len(result.columns) == 5 # noqa: PLR2004 - - sorted_result = result.sort(["ticker", "side"]) - assert sorted_result.equals(result) - - -def test_add_portfolio_performance_columns_rebalancing_closes_shorts_when_longs_stopped() -> ( # noqa: E501 - None -): - """Test that rebalancing closes best shorts when longs hit stop-loss.""" - base_timestamp = datetime(2024, 1, 10, tzinfo=UTC).timestamp() - positions = pl.DataFrame( - { - "ticker": [ - "LONG1", - "LONG2", - "LONG3", - "SHORT1", - "SHORT2", - "SHORT3", - "SHORT4", - ], - "timestamp": [base_timestamp] * 7, - "side": ["LONG", "LONG", "LONG", "SHORT", "SHORT", "SHORT", "SHORT"], - "dollar_amount": [1000.0] * 7, - "action": ["UNSPECIFIED"] * 7, - } - ) - - predictions = pl.DataFrame( - { - "ticker": [ - "LONG1", - "LONG2", - "LONG3", - "SHORT1", - "SHORT2", - "SHORT3", - "SHORT4", - ], - "timestamp": [base_timestamp] * 7, - "quantile_10": [-0.05] * 7, - "quantile_90": [0.15] * 7, - } - ) - - raw_equity_bars = [] - for ticker in 
["LONG1", "LONG2", "LONG3", "SHORT1", "SHORT2", "SHORT3", "SHORT4"]: - for i in range(30): - timestamp = base_timestamp + (i * 86400) - if ticker == "LONG1": - price = 100.0 - (10.0 * i / 29) # long1 losing -10% - elif ticker == "LONG2": - price = 100.0 - (10.0 * i / 29) # long2 losing -10% - elif ticker == "LONG3": - price = 100.0 - ( - 2.0 * i / 29 - ) # long3 losing -2% (not enough for stop-loss) - elif ticker == "SHORT1": - price = 100.0 - (4.5 * i / 29) # short1 falling -4.5% (best) - elif ticker == "SHORT2": - price = 100.0 - (4.0 * i / 29) # short2 falling -4.0% (second) - elif ticker == "SHORT3": - price = 100.0 - (1.0 * i / 29) # short3 falling -1% (worst) - else: # SHORT4 - price = 100.0 - (3.0 * i / 29) # short4 falling -3% (third) - - raw_equity_bars.append( - {"ticker": ticker, "timestamp": timestamp, "close_price": price} - ) - - equity_bars = add_equity_bars_returns_and_realized_volatility_columns( - pl.DataFrame(raw_equity_bars) - ) - current_timestamp = datetime.fromtimestamp(base_timestamp + (29 * 86400), tz=UTC) - - result = add_portfolio_performance_columns( - positions, predictions, equity_bars, current_timestamp - ) - - expected_closed_long_count = 2 - expected_closed_short_count = 2 - expected_open_short_count = 2 - - # 2 longs should be closed (LONG1 and LONG2 hit stop-loss at -10% < -5%) - closed_longs = result.filter( - (pl.col("side") == "LONG") & (pl.col("action") == "CLOSE_POSITION") - ) - assert closed_longs.height == expected_closed_long_count - - # 2 shorts should also be closed by rebalancing (to match 2 closed longs) - # None of the shorts hit stop-loss (all have negative returns, not >= +15%) - # Rebalancing closes top 2 performing shorts: SHORT1 (-5%) and SHORT4 (-3%) - closed_shorts = result.filter( - (pl.col("side") == "SHORT") & (pl.col("action") == "CLOSE_POSITION") - ) - assert closed_shorts.height == expected_closed_short_count - - # Verify best-performing shorts were closed (most negative cumulative return) - # SHORT1 
(-4.5%) and SHORT2 (-4.0%) should be closed by rebalancing - closed_short_tickers = closed_shorts["ticker"].to_list() - assert "SHORT1" in closed_short_tickers - assert "SHORT2" in closed_short_tickers - - # Verify other shorts remain open - open_shorts = result.filter( - (pl.col("side") == "SHORT") & (pl.col("action") != "CLOSE_POSITION") - ) - assert open_shorts.height == expected_open_short_count - open_short_tickers = open_shorts["ticker"].to_list() - assert "SHORT3" in open_short_tickers - assert "SHORT4" in open_short_tickers - - -def test_add_portfolio_performance_columns_rebalancing_closes_longs_when_shorts_stopped() -> ( # noqa: E501 - None -): - """Test that when shorts hit stop-loss, best-performing longs are closed for rebalancing.""" # noqa: E501 - base_timestamp = datetime(2024, 1, 10, tzinfo=UTC).timestamp() - - positions = pl.DataFrame( - { - "ticker": ["LONG1", "LONG2", "LONG3", "SHORT1", "SHORT2"], - "timestamp": [base_timestamp] * 5, - "side": ["LONG", "LONG", "LONG", "SHORT", "SHORT"], - "dollar_amount": [1000.0] * 5, - "action": ["UNSPECIFIED"] * 5, - } - ) +def test_create_optimal_portfolio_insufficient_after_exclusions() -> None: + """Test that InsufficientPredictionsError is raised when fewer than 20 available.""" + current_timestamp = datetime(2024, 1, 15, 9, 30, tzinfo=UTC) + # Create 25 predictions: 15 high uncertainty, 5 prior portfolio, only 5 available predictions = pl.DataFrame( { - "ticker": ["LONG1", "LONG2", "LONG3", "SHORT1", "SHORT2"], - "timestamp": [base_timestamp] * 5, - "quantile_10": [-0.05] * 5, - "quantile_90": [0.15] * 5, - } - ) - - raw_equity_bars = [] - for ticker in ["LONG1", "LONG2", "LONG3", "SHORT1", "SHORT2"]: - for i in range(30): - timestamp = base_timestamp + (i * 86400) - if ticker == "LONG1": - price = 100.0 + (10.0 * i / 29) # long1 gaining +10% - elif ticker == "LONG2": - price = 100.0 + (12.0 * i / 29) # long2 gaining +12% (best performer) - elif ticker == "LONG3": - price = 100.0 + (5.0 * i / 29) # long3 
gaining +5% - else: # SHORT1, SHORT2 - price = 100.0 + (20.0 * i / 29) # shorts gaining +20% (hit stop-loss) - - raw_equity_bars.append( - {"ticker": ticker, "timestamp": timestamp, "close_price": price} - ) - - equity_bars = add_equity_bars_returns_and_realized_volatility_columns( - pl.DataFrame(raw_equity_bars) - ) - current_timestamp = datetime.fromtimestamp(base_timestamp + (29 * 86400), tz=UTC) - - result = add_portfolio_performance_columns( - positions, predictions, equity_bars, current_timestamp - ) - - # 2 shorts should be closed (hit stop-loss at +20% > +15%) - closed_shorts = result.filter( - (pl.col("side") == "SHORT") & (pl.col("action") == "CLOSE_POSITION") - ) - assert closed_shorts.height == 2 # noqa: PLR2004 - - # 2 longs should also be closed for rebalancing (best performers) - closed_longs = result.filter( - (pl.col("side") == "LONG") & (pl.col("action") == "CLOSE_POSITION") - ) - assert closed_longs.height == 2 # noqa: PLR2004 - - # Verify that best-performing longs were closed - closed_long_tickers = closed_longs["ticker"].to_list() - assert "LONG2" in closed_long_tickers # best performer (+12%) - assert "LONG1" in closed_long_tickers # second best (+10%) + "ticker": [f"TICK{i:02d}" for i in range(25)], + "quantile_10": [0.0] * 25, + "quantile_50": [i * 0.01 for i in range(25)], + # First 15 have high uncertainty (IQR > 0.1) + "quantile_90": [0.50] * 15 + [0.08] * 10, + "z_score_return": [float(i) for i in range(25)], + "inter_quartile_range": [0.50] * 15 + [0.08] * 10, + "composite_score": [float(i) / 1.5 for i in range(25)], + "action": ["UNSPECIFIED"] * 25, + } + ) + + # Exclude 5 more tickers as prior portfolio (from the low-uncertainty ones) + prior_tickers = [f"TICK{i:02d}" for i in range(15, 20)] + + # Should raise InsufficientPredictionsError (only 5 available, need 20) + with pytest.raises(InsufficientPredictionsError) as exc_info: + create_optimal_portfolio( + current_predictions=predictions, + prior_portfolio_tickers=prior_tickers, + 
maximum_capital=10000.0, + current_timestamp=current_timestamp, + ) + assert "Only 5 predictions available" in str(exc_info.value) + assert "need 20" in str(exc_info.value) -def test_add_portfolio_performance_columns_no_rebalancing_when_equal_closures() -> None: - """Test that no additional rebalancing occurs when equal positions are closed.""" - base_timestamp = datetime(2024, 1, 10, tzinfo=UTC).timestamp() - positions = pl.DataFrame( - { - "ticker": ["LONG1", "LONG2", "SHORT1", "SHORT2"], - "timestamp": [base_timestamp] * 4, - "side": ["LONG", "LONG", "SHORT", "SHORT"], - "dollar_amount": [1000.0] * 4, - "action": ["UNSPECIFIED"] * 4, - } - ) +def test_create_optimal_portfolio_equal_capital_allocation() -> None: + """Test that capital is allocated equally across positions.""" + current_timestamp = datetime(2024, 1, 15, 9, 30, tzinfo=UTC) predictions = pl.DataFrame( { - "ticker": ["LONG1", "LONG2", "SHORT1", "SHORT2"], - "timestamp": [base_timestamp] * 4, - "quantile_10": [-0.05] * 4, - "quantile_90": [0.15] * 4, + "ticker": [f"TICK{i:02d}" for i in range(30)], + "quantile_10": [0.0] * 30, + "quantile_50": [i * 0.01 for i in range(30)], + "quantile_90": [0.05] * 30, # Low uncertainty (IQR = 0.05 < 0.1 threshold) } ) - raw_equity_bars = [] - for ticker in ["LONG1", "LONG2", "SHORT1", "SHORT2"]: - for i in range(30): - timestamp = base_timestamp + (i * 86400) - if ticker in ["LONG1", "LONG2"]: - price = 100.0 - (10.0 * i / 29) # longs losing -10% - else: # SHORT1, SHORT2 - price = 100.0 + (20.0 * i / 29) # shorts gaining +20% + # Rank and sort predictions + ranked_predictions = add_predictions_zscore_ranked_columns(predictions) - raw_equity_bars.append( - {"ticker": ticker, "timestamp": timestamp, "close_price": price} - ) - - equity_bars = add_equity_bars_returns_and_realized_volatility_columns( - pl.DataFrame(raw_equity_bars) - ) - current_timestamp = datetime.fromtimestamp(base_timestamp + (29 * 86400), tz=UTC) - - result = add_portfolio_performance_columns( - 
positions, predictions, equity_bars, current_timestamp - ) + # Test with different capital amounts + for capital in [10000.0, 25000.0, 50000.0]: + result = create_optimal_portfolio( + current_predictions=ranked_predictions, + prior_portfolio_tickers=[], + maximum_capital=capital, + current_timestamp=current_timestamp, + ) - # 2 longs and 2 shorts should be closed (no rebalancing needed) - closed_longs = result.filter( - (pl.col("side") == "LONG") & (pl.col("action") == "CLOSE_POSITION") - ).height - closed_shorts = result.filter( - (pl.col("side") == "SHORT") & (pl.col("action") == "CLOSE_POSITION") - ).height + expected_per_position = capital / 20 + for amount in result["dollar_amount"].to_list(): + assert amount == pytest.approx(expected_per_position) - assert closed_longs == 2 # noqa: PLR2004 - assert closed_shorts == 2 # noqa: PLR2004 + # Long and short should be equal + long_sum = result.filter(pl.col("side") == "LONG")["dollar_amount"].sum() + short_sum = result.filter(pl.col("side") == "SHORT")["dollar_amount"].sum() + assert long_sum == pytest.approx(short_sum) + assert long_sum == pytest.approx(capital / 2) -def test_add_portfolio_performance_columns_rebalancing_respects_pdt_locked_positions() -> ( # noqa: E501 - None -): - """Test that rebalancing does not close PDT-locked positions.""" - base_timestamp = datetime(2024, 1, 10, tzinfo=UTC).timestamp() +def test_create_optimal_portfolio_head_tail_selection() -> None: + """Test that top 10 are long, bottom 10 are short based on composite scores.""" + current_timestamp = datetime(2024, 1, 15, 9, 30, tzinfo=UTC) - positions = pl.DataFrame( - { - "ticker": ["LONG1", "LONG2", "SHORT1", "SHORT2", "SHORT3"], - "timestamp": [base_timestamp] * 5, - "side": ["LONG", "LONG", "SHORT", "SHORT", "SHORT"], - "dollar_amount": [1000.0] * 5, - "action": [ - "UNSPECIFIED", - "UNSPECIFIED", - "PDT_LOCKED", - "UNSPECIFIED", - "UNSPECIFIED", - ], - } - ) + # Create predictions with known composite scores (via quantile_50 values) 
+ tickers = [f"TICK{i:02d}" for i in range(30)] predictions = pl.DataFrame( { - "ticker": ["LONG1", "LONG2", "SHORT1", "SHORT2", "SHORT3"], - "timestamp": [base_timestamp] * 5, - "quantile_10": [-0.05] * 5, - "quantile_90": [0.15] * 5, + "ticker": tickers, + "quantile_10": [0.0] * 30, + "quantile_50": [i * 0.01 for i in range(30)], # 0, 0.01, 0.02, ..., 0.29 + "quantile_90": [0.05] * 30, # Low uncertainty (IQR = 0.05 < 0.1 threshold) } ) - raw_equity_bars = [] - for ticker in ["LONG1", "LONG2", "SHORT1", "SHORT2", "SHORT3"]: - for i in range(30): - timestamp = base_timestamp + (i * 86400) - if ticker in ["LONG1", "LONG2"]: - price = 100.0 - (10.0 * i / 29) # longs losing -10% - elif ticker == "SHORT1": - price = 100.0 - (10.0 * i / 29) # short1 -10% (best, but PDT locked) - elif ticker == "SHORT2": - price = 100.0 - (3.0 * i / 29) # short2 -3% (second best) - else: # SHORT3 - price = 100.0 - (2.0 * i / 29) # short3 -2% (worst) - - raw_equity_bars.append( - {"ticker": ticker, "timestamp": timestamp, "close_price": price} - ) - - equity_bars = add_equity_bars_returns_and_realized_volatility_columns( - pl.DataFrame(raw_equity_bars) - ) - current_timestamp = datetime.fromtimestamp(base_timestamp + (29 * 86400), tz=UTC) - - result = add_portfolio_performance_columns( - positions, predictions, equity_bars, current_timestamp - ) + # Rank and sort predictions (will sort by composite score descending) + ranked_predictions = add_predictions_zscore_ranked_columns(predictions) - # 2 longs should be closed (both hit stop-loss at -10% < -5%) - closed_longs = result.filter( - (pl.col("side") == "LONG") & (pl.col("action") == "CLOSE_POSITION") - ) - assert closed_longs.height == 2 # noqa: PLR2004 - - # 2 shorts should be closed for rebalancing (not SHORT1 since it's PDT locked) - # Rebalancing closes SHORT2 and SHORT3 to match the 2 closed longs - closed_shorts = result.filter( - (pl.col("side") == "SHORT") & (pl.col("action") == "CLOSE_POSITION") + result = 
create_optimal_portfolio( + current_predictions=ranked_predictions, + prior_portfolio_tickers=[], + maximum_capital=10000.0, + current_timestamp=current_timestamp, ) - assert closed_shorts.height == 2 # noqa: PLR2004 - # SHORT1 should remain PDT_LOCKED, not be closed - short1_action = result.filter(pl.col("ticker") == "SHORT1")["action"][0] - assert short1_action == "PDT_LOCKED" + # Top 10 (highest composite scores: 29, 28, ..., 20) should be LONG + long_tickers = result.filter(pl.col("side") == "LONG")["ticker"].to_list() + expected_long = [f"TICK{i:02d}" for i in range(29, 19, -1)] + assert set(long_tickers) == set(expected_long) - # SHORT2 and SHORT3 should be closed (SHORT1 skipped due to PDT_LOCKED) - closed_short_tickers = closed_shorts["ticker"].to_list() - assert "SHORT2" in closed_short_tickers - assert "SHORT3" in closed_short_tickers + # Bottom 10 (lowest composite scores: 0, 1, ..., 9) should be SHORT + short_tickers = result.filter(pl.col("side") == "SHORT")["ticker"].to_list() + expected_short = [f"TICK{i:02d}" for i in range(10)] + assert set(short_tickers) == set(expected_short) From 24576bb115fcc3470228b9c1e9f6df949a9d9569 Mon Sep 17 00:00:00 2001 From: John Forstmeier Date: Fri, 6 Feb 2026 21:26:56 -0500 Subject: [PATCH 5/5] Address PR #749 review feedback: constants, sorting, error handling, and tests MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Root cause: PR #749 had 14 unresolved review threads from copilot and coderabbitai covering: - Hard-coded magic numbers (10, 20) scattered throughout risk_management.py - Fragile sorting assumption in create_optimal_portfolio (relied on pre-sorted input) - Potential duplicate counting in excluded_tickers logging - Poor error handling (logger.warning instead of logger.exception, broad exception swallowing) - Missing test coverage for unsorted input - Non-parameterized capital loop test Fix: 1. 
Constants (Group 1): - Added SIDE_SIZE = REQUIRED_PORTFOLIO_SIZE // 2 constant - Replaced all 5 hard-coded "10" occurrences with SIDE_SIZE - Replaced hard-coded "20" in log message with REQUIRED_PORTFOLIO_SIZE 2. Sorting logic (Group 2): - Added internal sorting in create_optimal_portfolio by composite_score (desc) and inter_quartile_range (asc) before head/tail selection - Ensures correct long/short selection even if input is unsorted 3. Duplicate exclusion tracking (Group 6): - Convert excluded_tickers to set to remove duplicates before logging - Ensures total_excluded count accurately reflects unique tickers 4. Error handling (Group 4): - Changed logger.warning to logger.exception in JSON parse exception handler - Removed broad "except Exception" block that was swallowing unexpected errors - Kept E402 noqa comments (imports must follow structlog configuration) - Kept PLR0911 noqa (get_prior_portfolio_tickers has 7 returns by design) 5. Test improvements (Group 5): - Added test_create_optimal_portfolio_unsorted_input to verify sorting works - Parameterized test_create_optimal_portfolio_equal_capital_allocation with @pytest.mark.parametrize for cleaner testing across capital values - Added comment explaining intentional lack of ranking in high_uncertainty test Rejected feedback: - Thread 1 (Alpaca notional for shorts): Alpaca API docs show notional parameter usage with SELL side. Current implementation follows documented patterns. - Thread 8 (PDT lock logic): Delete-and-replace strategy is intentional per commit e119243. System now closes all positions and opens fresh portfolio. 
Verification: All Python checks pass (mask development python all): - Formatting: ✓ - Linting: ✓ - Type checking: ✓ - Dead code analysis: ✓ - Tests: 68 passed, 94% coverage ✓ Co-Authored-By: Claude Sonnet 4.5 --- .../src/portfoliomanager/risk_management.py | 17 ++-- .../src/portfoliomanager/server.py | 5 +- .../tests/test_risk_management.py | 83 +++++++++++++++---- 3 files changed, 75 insertions(+), 30 deletions(-) diff --git a/applications/portfoliomanager/src/portfoliomanager/risk_management.py b/applications/portfoliomanager/src/portfoliomanager/risk_management.py index 5cc96b2e0..a68626228 100644 --- a/applications/portfoliomanager/src/portfoliomanager/risk_management.py +++ b/applications/portfoliomanager/src/portfoliomanager/risk_management.py @@ -11,6 +11,7 @@ UNCERTAINTY_THRESHOLD = float(os.getenv("OSCM_UNCERTAINTY_THRESHOLD", "0.1")) REQUIRED_PORTFOLIO_SIZE = 20 # 10 long + 10 short +SIDE_SIZE = REQUIRED_PORTFOLIO_SIZE // 2 def add_predictions_zscore_ranked_columns( @@ -57,7 +58,7 @@ def create_optimal_portfolio( ) # Excluding prior portfolio tickers to avoid pattern day trader restrictions. 
- excluded_tickers = high_uncertainty_tickers + prior_portfolio_tickers + excluded_tickers = list(set(high_uncertainty_tickers + prior_portfolio_tickers)) logger.info( "Portfolio filtering breakdown", @@ -70,12 +71,12 @@ def create_optimal_portfolio( available_predictions = current_predictions.filter( ~pl.col("ticker").is_in(excluded_tickers) - ) + ).sort(["composite_score", "inter_quartile_range"], descending=[True, False]) logger.info( "Available predictions after filtering", available_count=available_predictions.height, - required_for_full_portfolio=20, + required_for_full_portfolio=REQUIRED_PORTFOLIO_SIZE, ) if available_predictions.height < REQUIRED_PORTFOLIO_SIZE: @@ -87,11 +88,11 @@ def create_optimal_portfolio( ) raise InsufficientPredictionsError(message) - long_candidates = available_predictions.head(10) - short_candidates = available_predictions.tail(10) + long_candidates = available_predictions.head(SIDE_SIZE) + short_candidates = available_predictions.tail(SIDE_SIZE) target_side_capital = maximum_capital / 2 - dollar_amount_per_position = target_side_capital / 10 + dollar_amount_per_position = target_side_capital / SIDE_SIZE logger.info( "Portfolio allocation", @@ -99,8 +100,8 @@ def create_optimal_portfolio( long_capital=target_side_capital, short_capital=target_side_capital, dollar_per_position=dollar_amount_per_position, - long_count=10, - short_count=10, + long_count=SIDE_SIZE, + short_count=SIDE_SIZE, ) long_positions = long_candidates.select( diff --git a/applications/portfoliomanager/src/portfoliomanager/server.py b/applications/portfoliomanager/src/portfoliomanager/server.py index 032902bc5..7d8fe0c70 100644 --- a/applications/portfoliomanager/src/portfoliomanager/server.py +++ b/applications/portfoliomanager/src/portfoliomanager/server.py @@ -402,10 +402,7 @@ def get_prior_portfolio_tickers() -> list[str]: # noqa: PLR0911 return tickers # noqa: TRY300 except (ValueError, requests.exceptions.JSONDecodeError) as e: - logger.warning("Failed to 
parse prior portfolio JSON", error=str(e)) - return [] - except Exception as e: # noqa: BLE001 - logger.warning("Unexpected error fetching prior portfolio", error=str(e)) + logger.exception("Failed to parse prior portfolio JSON", error=str(e)) return [] diff --git a/applications/portfoliomanager/tests/test_risk_management.py b/applications/portfoliomanager/tests/test_risk_management.py index f08cc7eb3..8c4701c4e 100644 --- a/applications/portfoliomanager/tests/test_risk_management.py +++ b/applications/portfoliomanager/tests/test_risk_management.py @@ -165,8 +165,56 @@ def test_create_optimal_portfolio_with_prior_ticker_exclusion() -> None: assert set(long_tickers) == set(expected_long) +def test_create_optimal_portfolio_unsorted_input() -> None: + """Test that create_optimal_portfolio handles unsorted input correctly.""" + current_timestamp = datetime(2024, 1, 15, 9, 30, tzinfo=UTC) + + # Create predictions intentionally out of order + predictions = pl.DataFrame( + { + "ticker": ["TICK10", "TICK05", "TICK20", "TICK15", "TICK25"] + + [f"TICK{i:02d}" for i in range(30) if i not in [5, 10, 15, 20, 25]], + "quantile_10": [0.0] * 30, + "quantile_50": [0.10, 0.05, 0.20, 0.15, 0.25] + + [i * 0.01 for i in range(30) if i not in [5, 10, 15, 20, 25]], + "quantile_90": [0.05] * 30, # Low uncertainty + "z_score_return": [10.0, 5.0, 20.0, 15.0, 25.0] + + [float(i) for i in range(30) if i not in [5, 10, 15, 20, 25]], + "inter_quartile_range": [0.05] * 30, + "composite_score": [10.0, 5.0, 20.0, 15.0, 25.0] + + [float(i) for i in range(30) if i not in [5, 10, 15, 20, 25]], + "action": ["UNSPECIFIED"] * 30, + } + ) + + result = create_optimal_portfolio( + current_predictions=predictions, + prior_portfolio_tickers=[], + maximum_capital=10000.0, + current_timestamp=current_timestamp, + ) + + # Should create 20 positions despite unsorted input + assert len(result) == 20 # noqa: PLR2004 + assert result.filter(pl.col("side") == "LONG").height == 10 # noqa: PLR2004 + assert 
result.filter(pl.col("side") == "SHORT").height == 10 # noqa: PLR2004 + + # Top 10 should be long (highest composite scores) + long_tickers = result.filter(pl.col("side") == "LONG")["ticker"].to_list() + assert "TICK25" in long_tickers # Highest score should be long + assert "TICK20" in long_tickers + + # Bottom 10 should be short (lowest composite scores) + short_tickers = result.filter(pl.col("side") == "SHORT")["ticker"].to_list() + assert "TICK00" in short_tickers # Lowest scores should be short + + def test_create_optimal_portfolio_high_uncertainty_exclusions() -> None: - """Test that high uncertainty predictions are excluded.""" + """Test that high uncertainty predictions are excluded. + + Note: This test intentionally uses pre-computed ranking columns rather than calling + add_predictions_zscore_ranked_columns to verify portfolio logic in isolation. + """ current_timestamp = datetime(2024, 1, 15, 9, 30, tzinfo=UTC) # Create 25 predictions: 20 low uncertainty, 5 high uncertainty @@ -240,7 +288,8 @@ def test_create_optimal_portfolio_insufficient_after_exclusions() -> None: assert "need 20" in str(exc_info.value) -def test_create_optimal_portfolio_equal_capital_allocation() -> None: +@pytest.mark.parametrize("capital", [10000.0, 25000.0, 50000.0]) +def test_create_optimal_portfolio_equal_capital_allocation(capital: float) -> None: """Test that capital is allocated equally across positions.""" current_timestamp = datetime(2024, 1, 15, 9, 30, tzinfo=UTC) @@ -256,24 +305,22 @@ def test_create_optimal_portfolio_equal_capital_allocation() -> None: # Rank and sort predictions ranked_predictions = add_predictions_zscore_ranked_columns(predictions) - # Test with different capital amounts - for capital in [10000.0, 25000.0, 50000.0]: - result = create_optimal_portfolio( - current_predictions=ranked_predictions, - prior_portfolio_tickers=[], - maximum_capital=capital, - current_timestamp=current_timestamp, - ) + result = create_optimal_portfolio( + 
current_predictions=ranked_predictions, + prior_portfolio_tickers=[], + maximum_capital=capital, + current_timestamp=current_timestamp, + ) - expected_per_position = capital / 20 - for amount in result["dollar_amount"].to_list(): - assert amount == pytest.approx(expected_per_position) + expected_per_position = capital / 20 + for amount in result["dollar_amount"].to_list(): + assert amount == pytest.approx(expected_per_position) - # Long and short should be equal - long_sum = result.filter(pl.col("side") == "LONG")["dollar_amount"].sum() - short_sum = result.filter(pl.col("side") == "SHORT")["dollar_amount"].sum() - assert long_sum == pytest.approx(short_sum) - assert long_sum == pytest.approx(capital / 2) + # Long and short should be equal + long_sum = result.filter(pl.col("side") == "LONG")["dollar_amount"].sum() + short_sum = result.filter(pl.col("side") == "SHORT")["dollar_amount"].sum() + assert long_sum == pytest.approx(short_sum) + assert long_sum == pytest.approx(capital / 2) def test_create_optimal_portfolio_head_tail_selection() -> None: