Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 76 additions & 0 deletions applications/portfoliomanager/src/portfoliomanager/beta.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
import numpy as np
import polars as pl
import scipy.stats

# Trailing window (in daily closes) used for the beta regression.
BETA_WINDOW_DAYS = 60

# NOTE(review): unused in this module's visible code — confirm before removing;
# regime.py defines the same annualization factor and does use it.
_TRADING_DAYS_PER_YEAR = 252
# Fewest log returns required on each side of the regression.
_MINIMUM_RETURN_COUNT = 2
Comment thread
forstmeier marked this conversation as resolved.


def compute_market_betas(
    historical_prices: pl.DataFrame,
    spy_prices: pl.DataFrame,
    window_days: int = BETA_WINDOW_DAYS,
) -> pl.DataFrame:
    """Estimate each ticker's market beta via OLS of ticker log returns on SPY.

    Args:
        historical_prices: Long-format frame with "ticker", "timestamp", and
            "close_price" columns covering all candidate tickers.
        spy_prices: Frame with "timestamp" and "close_price" columns for SPY.
        window_days: Number of daily returns in the regression window
            (window_days + 1 closes are required).

    Returns:
        DataFrame with "ticker" (String) and "market_beta" (Float64) columns.
        Empty (schema only) when SPY data is insufficient or contains
        non-positive prices; tickers with insufficient or invalid data are
        silently skipped.
    """
    empty_result = pl.DataFrame(
        schema={"ticker": pl.String, "market_beta": pl.Float64}
    )

    spy_close = (
        spy_prices.sort("timestamp").tail(window_days + 1)["close_price"].to_numpy()
    )

    # Need _MINIMUM_RETURN_COUNT returns (one more close than that) and
    # strictly positive prices for the log transform.
    if len(spy_close) < _MINIMUM_RETURN_COUNT + 1 or np.any(spy_close <= 0):
        return empty_result

    spy_returns = np.diff(np.log(spy_close))
    tickers = historical_prices["ticker"].unique().to_list()
    results = []

    for ticker in tickers:
        ticker_close = (
            historical_prices.filter(pl.col("ticker") == ticker)
            .sort("timestamp")
            .tail(window_days + 1)["close_price"]
            .to_numpy()
        )
        # Consistent with the SPY guard above: _MINIMUM_RETURN_COUNT returns
        # require _MINIMUM_RETURN_COUNT + 1 closes. (The previous
        # `< _MINIMUM_RETURN_COUNT` check let 2-close series through, only to
        # be discarded by the `count` check below after wasted work.)
        if len(ticker_close) < _MINIMUM_RETURN_COUNT + 1 or np.any(
            ticker_close <= 0
        ):
            continue

        ticker_returns = np.diff(np.log(ticker_close))
        count = min(len(spy_returns), len(ticker_returns))
        if count < _MINIMUM_RETURN_COUNT:
            continue

        spy_window = spy_returns[-count:]
        # linregress raises ValueError when all x values are identical
        # (zero-variance SPY window); skip the ticker rather than crash.
        if np.ptp(spy_window) == 0:
            continue

        slope, _, _, _, _ = scipy.stats.linregress(
            spy_window, ticker_returns[-count:]
        )
        results.append({"ticker": ticker, "market_beta": float(slope)})

    if not results:
        return empty_result

    return pl.DataFrame(results)


# Validates beta neutralization in tests; retained for future beta reporting.
def compute_portfolio_beta(
    portfolio: pl.DataFrame,
    market_betas: pl.DataFrame,
) -> float:
    """Return the gross-exposure-weighted net market beta of *portfolio*.

    Long positions contribute positively and shorts negatively; each position
    is weighted by its share of total gross dollar exposure. Tickers absent
    from *market_betas* contribute a beta of 0.0. Returns 0.0 when gross
    exposure is (approximately) zero.
    """
    betas_by_ticker = {
        ticker: beta
        for ticker, beta in zip(
            market_betas["ticker"].to_list(),
            market_betas["market_beta"].to_list(),
            strict=False,
        )
    }

    gross_exposure = portfolio["dollar_amount"].sum()
    if np.isclose(gross_exposure, 0.0):
        return 0.0

    net_beta = 0.0
    for position in portfolio.iter_rows(named=True):
        direction = 1.0 if position["side"] == "LONG" else -1.0
        weight = position["dollar_amount"] / gross_exposure
        net_beta += direction * weight * betas_by_ticker.get(position["ticker"], 0.0)

    return net_beta
31 changes: 31 additions & 0 deletions applications/portfoliomanager/src/portfoliomanager/data_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,3 +53,34 @@ def fetch_equity_details(datamanager_base_url: str) -> pl.DataFrame:

dataframe = pl.read_csv(io.BytesIO(response.content))
return dataframe.select(["ticker", "sector"])


def fetch_spy_prices(
    datamanager_base_url: str,
    reference_date: datetime,
    lookback_days: int = 90,
) -> pl.DataFrame:
    """Fetch SPY close prices from the data manager as a polars frame.

    Requests parquet equity bars for the *lookback_days* window ending at
    *reference_date* and returns the "ticker", "timestamp", and "close_price"
    columns with null closes dropped.

    Raises:
        PriceDataUnavailableError: on an HTTP error status or any network
            failure while contacting the data manager.
    """
    window_start = reference_date - timedelta(days=lookback_days)
    query_params = {
        "tickers": "SPY",
        "start_timestamp": window_start.isoformat(),
        "end_timestamp": reference_date.isoformat(),
    }

    try:
        response = requests.get(
            url=f"{datamanager_base_url}/equity-bars",
            params=query_params,
            timeout=120,
        )
        response.raise_for_status()
    except requests.HTTPError as error:
        # Checked first: HTTPError subclasses RequestException.
        raise PriceDataUnavailableError(
            f"Failed to fetch SPY prices from data manager: {error}"
        ) from error
    except requests.RequestException as error:
        raise PriceDataUnavailableError(
            f"Network error fetching SPY prices from data manager: {error}"
        ) from error

    bars = pl.read_parquet(io.BytesIO(response.content))
    return bars.select(["ticker", "timestamp", "close_price"]).drop_nulls(
        subset=["close_price"]
    )
67 changes: 67 additions & 0 deletions applications/portfoliomanager/src/portfoliomanager/regime.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
from typing import TypedDict

import numpy as np
import polars as pl


class RegimeResult(TypedDict):
    """Market-regime classification: a regime label plus a confidence score."""

    # Either "mean_reversion" or "trending" (the only values classify_regime emits).
    state: str
    # Heuristic confidence in the classification, clipped to [0.0, 1.0].
    confidence: float


# Trailing window (in daily closes) used for regime classification.
REGIME_WINDOW_DAYS = 60
# Annualized realized volatility below this counts as "low volatility".
REGIME_VOLATILITY_THRESHOLD = 0.20
# Lag-1 return autocorrelation below this signals mean reversion.
REGIME_AUTOCORRELATION_THRESHOLD = 0.0

# Annualization factor applied to daily return volatility.
_TRADING_DAYS_PER_YEAR = 252
# Fewest log returns for which the regime statistics are computed.
_MINIMUM_RETURN_COUNT = 2


def classify_regime(
    spy_prices: pl.DataFrame,
    window_days: int = REGIME_WINDOW_DAYS,
) -> RegimeResult:
    """Classify the current market regime from recent SPY closes.

    Two signals are computed over the trailing *window_days* daily log
    returns: annualized realized volatility and lag-1 autocorrelation.
    Low volatility combined with negative autocorrelation is labeled
    "mean_reversion"; anything else is "trending". Confidence is the mean
    of the two signals' margins past their thresholds, clipped to [0, 1].

    Args:
        spy_prices: Frame with "timestamp" and "close_price" columns.
        window_days: Number of daily returns in the classification window.

    Returns:
        RegimeResult with "state" and "confidence"; defaults to
        trending/0.0 on sparse or invalid data.
    """
    spy_close = (
        spy_prices.sort("timestamp").tail(window_days + 1)["close_price"].to_numpy()
    )

    # Non-positive prices would make the log transform invalid.
    if np.any(spy_close <= 0):
        return {"state": "trending", "confidence": 0.0}

    returns = np.diff(np.log(spy_close))

    # Sparse data defaults to trending/0.0, halving exposure in the caller.
    if len(returns) < _MINIMUM_RETURN_COUNT + 1:
        return {"state": "trending", "confidence": 0.0}

    realized_volatility = float(
        np.std(returns, ddof=1) * np.sqrt(_TRADING_DAYS_PER_YEAR)
    )

    autocorrelation = float(np.corrcoef(returns[:-1], returns[1:])[0, 1])
    # np.corrcoef yields NaN for a zero-variance window. The previous code
    # survived that only by accident (NaN comparisons are False, and
    # max(0.0, nan) happens to return 0.0 because of argument order);
    # make the neutral fallback explicit. Output is unchanged.
    if not np.isfinite(autocorrelation):
        autocorrelation = REGIME_AUTOCORRELATION_THRESHOLD

    low_volatility = realized_volatility < REGIME_VOLATILITY_THRESHOLD
    mean_reverting_signal = autocorrelation < REGIME_AUTOCORRELATION_THRESHOLD

    if low_volatility and mean_reverting_signal:
        # Margin below the volatility threshold, as a fraction of it.
        volatility_margin = (
            REGIME_VOLATILITY_THRESHOLD - realized_volatility
        ) / REGIME_VOLATILITY_THRESHOLD
        autocorrelation_margin = min(1.0, -autocorrelation)
        confidence = float(
            np.clip((volatility_margin + autocorrelation_margin) / 2.0, 0.0, 1.0)
        )
        return {"state": "mean_reversion", "confidence": confidence}

    # Trending confidence grows with how far each signal exceeds its threshold.
    excess_volatility = max(
        0.0,
        (realized_volatility - REGIME_VOLATILITY_THRESHOLD)
        / REGIME_VOLATILITY_THRESHOLD,
    )
    excess_autocorrelation = max(
        0.0, autocorrelation - REGIME_AUTOCORRELATION_THRESHOLD
    )
    confidence = float(
        np.clip((excess_volatility + excess_autocorrelation) / 2.0, 0.0, 1.0)
    )
    return {"state": "trending", "confidence": confidence}
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
from datetime import datetime

import numpy as np
import polars as pl
import scipy.optimize
import structlog

from .enums import PositionAction, PositionSide
Expand All @@ -12,12 +14,60 @@
# Minimum candidate pairs needed before a portfolio can be sized.
MINIMUM_PAIRS_REQUIRED = REQUIRED_PAIRS
# Z-score thresholds imported by server.py; presumably the hold band and
# stop-loss level for open pairs — confirm against the hidden exit logic.
Z_SCORE_HOLD_THRESHOLD = 0.5
Z_SCORE_STOP_LOSS = 4.0
# Each beta-optimized weight is bounded to [0.5x, 2.0x] of its
# volatility-parity weight in _apply_beta_neutral_weights.
BETA_WEIGHT_LOWER_BOUND = 0.5
BETA_WEIGHT_UPPER_BOUND = 2.0


def _apply_beta_neutral_weights(
    pairs: pl.DataFrame,
    market_betas: pl.DataFrame,
    volatility_parity_weights: np.ndarray,
) -> np.ndarray:
    """Tilt volatility-parity weights toward zero net market beta.

    Minimizes the squared weight-averaged net beta of the pair book via
    SLSQP, subject to each weight staying within
    [BETA_WEIGHT_LOWER_BOUND, BETA_WEIGHT_UPPER_BOUND] times its
    volatility-parity weight and the total weight being preserved. Falls
    back to the unmodified volatility-parity weights if the optimizer does
    not converge.

    Args:
        pairs: Frame with "long_ticker" and "short_ticker" columns.
        market_betas: Frame with "ticker" and "market_beta" columns;
            missing tickers default to beta 0.0.
        volatility_parity_weights: Baseline weight per pair row.
    """
    beta_lookup = {
        row["ticker"]: row["market_beta"] for row in market_betas.iter_rows(named=True)
    }

    # Net market beta of each long/short pair (long beta minus short beta).
    pair_net_betas = np.array(
        [
            beta_lookup.get(row["long_ticker"], 0.0)
            - beta_lookup.get(row["short_ticker"], 0.0)
            for row in pairs.iter_rows(named=True)
        ]
    )

    def objective(weights: np.ndarray) -> float:
        total = float(np.sum(weights))
        # Guard the degenerate all-zero iterate (possible when a
        # volatility-parity weight is 0, making its bounds (0, 0)): plain
        # division would emit a RuntimeWarning and can produce a 0/0 NaN
        # objective, which SLSQP handles poorly.
        if total == 0.0:
            return np.inf
        net_beta = float(np.dot(weights, pair_net_betas)) / total
        return net_beta**2

    bounds = [
        (BETA_WEIGHT_LOWER_BOUND * weight, BETA_WEIGHT_UPPER_BOUND * weight)
        for weight in volatility_parity_weights
    ]
    # Equality constraint: total weight must match the baseline total.
    target_total = float(np.sum(volatility_parity_weights))
    constraints = [{"type": "eq", "fun": lambda w: float(np.sum(w)) - target_total}]

    result = scipy.optimize.minimize(
        objective,
        x0=volatility_parity_weights.copy(),
        method="SLSQP",
        bounds=bounds,
        constraints=constraints,
    )

    if result.success:
        return np.array(result.x)

    logger.warning("Beta-neutral optimizer did not converge, using vol-parity weights")
    return volatility_parity_weights


def size_pairs_with_volatility_parity(
candidate_pairs: pl.DataFrame,
maximum_capital: float,
current_timestamp: datetime,
market_betas: pl.DataFrame,
exposure_scale: float = 1.0,
) -> pl.DataFrame:
if candidate_pairs.height < MINIMUM_PAIRS_REQUIRED:
message = (
Expand Down Expand Up @@ -45,17 +95,26 @@ def size_pairs_with_volatility_parity(
)

total_weight = pairs["inverse_volatility_weight"].sum()
pairs = pairs.with_columns(
(
(pl.col("inverse_volatility_weight") / total_weight)
* (maximum_capital / 2.0)
).alias("dollar_amount")
volatility_parity_weights = (
pairs["inverse_volatility_weight"] / total_weight
).to_numpy()

adjusted_weights = _apply_beta_neutral_weights(
pairs, market_betas, volatility_parity_weights
)
if np.isclose(adjusted_weights.sum(), 0.0):
adjusted_weights = volatility_parity_weights / volatility_parity_weights.sum()
else:
adjusted_weights = adjusted_weights / adjusted_weights.sum()

dollar_amounts = adjusted_weights * (maximum_capital / 2.0) * exposure_scale
pairs = pairs.with_columns(pl.Series("dollar_amount", dollar_amounts))

logger.info(
"Sized pairs with volatility parity",
pair_count=pairs.height,
total_capital=maximum_capital,
exposure_scale=exposure_scale,
)

timestamp_val = float(current_timestamp.timestamp())
Expand Down
39 changes: 30 additions & 9 deletions applications/portfoliomanager/src/portfoliomanager/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,15 +42,21 @@
cache_logger_on_first_use=True,
)

from .beta import compute_market_betas # noqa: E402
from .consolidation import consolidate_predictions # noqa: E402
from .data_client import fetch_equity_details, fetch_historical_prices # noqa: E402
from .data_client import ( # noqa: E402
fetch_equity_details,
fetch_historical_prices,
fetch_spy_prices,
)
from .enums import PositionSide, TradeSide # noqa: E402
from .exceptions import ( # noqa: E402
AssetNotShortableError,
InsufficientBuyingPowerError,
InsufficientPairsError,
)
from .portfolio_schema import pairs_schema, portfolio_schema # noqa: E402
from .regime import classify_regime # noqa: E402
from .risk_management import ( # noqa: E402
Z_SCORE_HOLD_THRESHOLD,
Z_SCORE_STOP_LOSS,
Expand Down Expand Up @@ -138,10 +144,12 @@ async def create_portfolio() -> Response: # noqa: PLR0911, PLR0912, PLR0915, C9
DATAMANAGER_BASE_URL, current_timestamp
)
equity_details = fetch_equity_details(DATAMANAGER_BASE_URL)
spy_prices = fetch_spy_prices(DATAMANAGER_BASE_URL, current_timestamp)
logger.info(
"Retrieved historical data",
prices_count=historical_prices.height,
equity_details_count=equity_details.height,
spy_prices_count=spy_prices.height,
)
except Exception as e:
logger.exception("Failed to retrieve historical data", error=str(e))
Expand Down Expand Up @@ -217,14 +225,28 @@ async def create_portfolio() -> Response: # noqa: PLR0911, PLR0912, PLR0915, C9
logger.exception("Candidate pairs failed schema validation", error=str(e))
return Response(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR)

regime = get_regime_state()
logger.info("Current market regime", state=regime)
try:
market_betas = compute_market_betas(historical_prices, spy_prices)
regime = classify_regime(spy_prices)
# Binary scale is intentional; confidence reserved for future graduated scaling.
exposure_scale = 1.0 if regime["state"] == "mean_reversion" else 0.5
Comment thread
forstmeier marked this conversation as resolved.
logger.info(
"Computed market betas and regime",
regime_state=regime["state"],
regime_confidence=regime["confidence"],
exposure_scale=exposure_scale,
)
except Exception as e:
logger.exception("Failed to compute market betas or regime", error=str(e))
return Response(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR)

try:
optimal_portfolio = get_optimal_portfolio(
candidate_pairs=candidate_pairs,
maximum_capital=float(account.cash_amount),
current_timestamp=current_timestamp,
market_betas=market_betas,
exposure_scale=exposure_scale,
)
logger.info("Created optimal portfolio", count=len(optimal_portfolio))
except InsufficientPairsError as e:
Expand Down Expand Up @@ -288,7 +310,7 @@ async def create_portfolio() -> Response: # noqa: PLR0911, PLR0912, PLR0915, C9
"reason": "position_not_found",
}
)
except Exception as e: # noqa: PERF203
except Exception as e:
Comment thread
forstmeier marked this conversation as resolved.
Comment thread
forstmeier marked this conversation as resolved.
logger.exception(
"Failed to close position",
ticker=close_position["ticker"],
Expand Down Expand Up @@ -451,11 +473,6 @@ async def get_raw_predictions() -> pl.DataFrame:
return pl.DataFrame(response.json()["data"])


def get_regime_state() -> str:
"""TODO: replace with regime.classify_regime in Phase 4."""
return "mean_reversion"


def get_prior_portfolio() -> pl.DataFrame:
empty = pl.DataFrame(schema=_PRIOR_PORTFOLIO_SCHEMA)
try:
Expand Down Expand Up @@ -611,11 +628,15 @@ def get_optimal_portfolio(
candidate_pairs: pl.DataFrame,
maximum_capital: float,
current_timestamp: datetime,
market_betas: pl.DataFrame,
exposure_scale: float,
) -> pl.DataFrame:
optimal_portfolio = size_pairs_with_volatility_parity(
candidate_pairs=candidate_pairs,
maximum_capital=maximum_capital,
current_timestamp=current_timestamp,
market_betas=market_betas,
exposure_scale=exposure_scale,
)

optimal_portfolio = portfolio_schema.validate(optimal_portfolio)
Expand Down
Loading
Loading