From 837309289aceb894e5a5b8a99208f72ae41d4ff5 Mon Sep 17 00:00:00 2001 From: John Forstmeier Date: Mon, 2 Jun 2025 21:26:45 -0400 Subject: [PATCH 01/17] Fix inter-service communication patterns --- .claude/settings.local.json | 5 +- .flox/env/manifest.lock | 123 ------------------ .../src/positionmanager/clients.py | 27 ++-- 3 files changed, 19 insertions(+), 136 deletions(-) diff --git a/.claude/settings.local.json b/.claude/settings.local.json index 664a98426..e86f2e7f8 100644 --- a/.claude/settings.local.json +++ b/.claude/settings.local.json @@ -18,5 +18,6 @@ "Bash(gh issue close:*)" ], "deny": [] - } -} + }, + "enableAllProjectMcpServers": false +} \ No newline at end of file diff --git a/.flox/env/manifest.lock b/.flox/env/manifest.lock index 964e64525..ff15369b9 100644 --- a/.flox/env/manifest.lock +++ b/.flox/env/manifest.lock @@ -12,9 +12,6 @@ "pulumi-python": { "pkg-path": "pulumiPackages.pulumi-python" }, - "pulumictl": { - "pkg-path": "pulumictl" - }, "ruff": { "pkg-path": "ruff" }, @@ -398,126 +395,6 @@ "group": "toplevel", "priority": 5 }, - { - "attr_path": "pulumictl", - "broken": false, - "derivation": "/nix/store/kx43jzcfslw28byvs6h5ngsgl432pvvv-pulumictl-0.0.49.drv", - "description": "Swiss Army Knife for Pulumi Development", - "install_id": "pulumictl", - "license": "Apache-2.0", - "locked_url": "https://github.com/flox/nixpkgs?rev=979daf34c8cacebcd917d540070b52a3c2b9b16e", - "name": "pulumictl-0.0.49", - "pname": "pulumictl", - "rev": "979daf34c8cacebcd917d540070b52a3c2b9b16e", - "rev_count": 793735, - "rev_date": "2025-05-04T03:14:55Z", - "scrape_date": "2025-05-05T04:19:37.687142Z", - "stabilities": [ - "staging", - "unstable" - ], - "unfree": false, - "version": "0.0.49", - "outputs_to_install": [ - "out" - ], - "outputs": { - "out": "/nix/store/ny69c9bfkf4w179240ch45injfb2ajqr-pulumictl-0.0.49" - }, - "system": "aarch64-darwin", - "group": "toplevel", - "priority": 5 - }, - { - "attr_path": "pulumictl", - "broken": false, - "derivation": "/nix/store/www9nfncvv7l339n8dks22x5vs5lz1mk-pulumictl-0.0.49.drv", - "description": "Swiss Army Knife for Pulumi Development", - "install_id": "pulumictl", - "license": "Apache-2.0", - "locked_url": "https://github.com/flox/nixpkgs?rev=979daf34c8cacebcd917d540070b52a3c2b9b16e", - "name": "pulumictl-0.0.49", - "pname": "pulumictl", - "rev": "979daf34c8cacebcd917d540070b52a3c2b9b16e", - "rev_count": 793735, - "rev_date": "2025-05-04T03:14:55Z", - "scrape_date": "2025-05-05T04:37:42.118866Z", - "stabilities": [ - "staging", - "unstable" - ], - "unfree": false, - "version": "0.0.49", - "outputs_to_install": [ - "out" - ], - "outputs": { - "out": "/nix/store/xpdh5dijdki4cngh7k7n4rg84i6c28zs-pulumictl-0.0.49" - }, - "system": "aarch64-linux", - "group": "toplevel", - "priority": 5 - }, - { - "attr_path": "pulumictl", - "broken": false, - "derivation": "/nix/store/17wf5x1kk3v5ch5npwhamnix629y07wg-pulumictl-0.0.49.drv", - "description": "Swiss Army Knife for Pulumi Development", - "install_id": "pulumictl", - "license": "Apache-2.0", - "locked_url": "https://github.com/flox/nixpkgs?rev=979daf34c8cacebcd917d540070b52a3c2b9b16e", - "name": "pulumictl-0.0.49", - "pname": "pulumictl", - "rev": "979daf34c8cacebcd917d540070b52a3c2b9b16e", - "rev_count": 793735, - "rev_date": "2025-05-04T03:14:55Z", - "scrape_date": "2025-05-05T04:54:38.447587Z", - "stabilities": [ - "staging", - "unstable" - ], - "unfree": false, - "version": "0.0.49", - "outputs_to_install": [ - "out" - ], - "outputs": { - "out": 
"/nix/store/6wmig1w7f3vmfrlyg2qzv21bvacj3as8-pulumictl-0.0.49" - }, - "system": "x86_64-darwin", - "group": "toplevel", - "priority": 5 - }, - { - "attr_path": "pulumictl", - "broken": false, - "derivation": "/nix/store/ib7hqxg7xdf5kyh78jqggzdcs97q1224-pulumictl-0.0.49.drv", - "description": "Swiss Army Knife for Pulumi Development", - "install_id": "pulumictl", - "license": "Apache-2.0", - "locked_url": "https://github.com/flox/nixpkgs?rev=979daf34c8cacebcd917d540070b52a3c2b9b16e", - "name": "pulumictl-0.0.49", - "pname": "pulumictl", - "rev": "979daf34c8cacebcd917d540070b52a3c2b9b16e", - "rev_count": 793735, - "rev_date": "2025-05-04T03:14:55Z", - "scrape_date": "2025-05-05T05:16:19.858098Z", - "stabilities": [ - "staging", - "unstable" - ], - "unfree": false, - "version": "0.0.49", - "outputs_to_install": [ - "out" - ], - "outputs": { - "out": "/nix/store/rmh9mjkxijxcc7cvjhsqc9657fbw0yyg-pulumictl-0.0.49" - }, - "system": "x86_64-linux", - "group": "toplevel", - "priority": 5 - }, { "attr_path": "ruff", "broken": false, diff --git a/application/positionmanager/src/positionmanager/clients.py b/application/positionmanager/src/positionmanager/clients.py index cf4ff79f8..68e07c7ce 100644 --- a/application/positionmanager/src/positionmanager/clients.py +++ b/application/positionmanager/src/positionmanager/clients.py @@ -1,6 +1,7 @@ import requests import polars as pl from typing import Dict, Any +import pyarrow as pa from alpaca.trading.client import TradingClient from alpaca.trading.requests import MarketOrderRequest @@ -71,30 +72,34 @@ def get_data( endpoint = f"{self.datamanager_base_url}/equity-bars" + params = { + "start_date": date_range.start.date().isoformat(), + "end_date": date_range.end.date().isoformat(), + } + try: - response = requests.post(endpoint, json=date_range.to_payload(), timeout=10) + response = requests.get(endpoint, params=params, timeout=30) except requests.RequestException as err: raise RuntimeError(f"Data manager service call error: {err}") from err - if response.status_code != 200: + if response.status_code == 404: + return pl.DataFrame() + elif response.status_code != 200: raise Exception( f"Data service error: {response.text}, status code: {response.status_code}", ) - response_data = response.json() + buffer = pa.py_buffer(response.content) + reader = pa.ipc.RecordBatchStreamReader(buffer) + table = reader.read_all() - data = pl.DataFrame(response_data["data"]) + data = pl.DataFrame(pl.from_arrow(table)) - data = data.with_columns( - pl.col("timestamp") - .str.slice(0, 10) - .str.strptime(pl.Date, "%Y-%m-%d") - .alias("date") - ) + data = data.with_columns(pl.col("t").cast(pl.Datetime).dt.date().alias("date")) data = ( data.sort("date") - .pivot(on="ticker", index="date", values="close_price") + .pivot(on="T", index="date", values="c") .with_columns(pl.all().exclude("date").cast(pl.Float64)) ) From 22a9c60eb8ee47a71867880058415279313d7c37 Mon Sep 17 00:00:00 2001 From: Chris Addy Date: Mon, 2 Jun 2025 21:46:44 -0400 Subject: [PATCH 02/17] Update application/positionmanager/src/positionmanager/clients.py Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com> --- application/positionmanager/src/positionmanager/clients.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/application/positionmanager/src/positionmanager/clients.py b/application/positionmanager/src/positionmanager/clients.py index 68e07c7ce..4d888460e 100644 --- a/application/positionmanager/src/positionmanager/clients.py +++ 
b/application/positionmanager/src/positionmanager/clients.py @@ -84,8 +84,8 @@ def get_data( if response.status_code == 404: return pl.DataFrame() - elif response.status_code != 200: - raise Exception( + if response.status_code != 200: + raise requests.HTTPError( f"Data service error: {response.text}, status code: {response.status_code}", ) From 00ffd1d889c112d08b80522222623ebf8fdfd471 Mon Sep 17 00:00:00 2001 From: John Forstmeier Date: Wed, 4 Jun 2025 20:36:37 -0400 Subject: [PATCH 03/17] Integrate predictionengine into the workflows --- .../datamanager/src/datamanager/config.py | 3 +- .../datamanager/src/datamanager/main.py | 6 +- .../src/positionmanager/clients.py | 30 ++-- .../src/positionmanager/main.py | 11 +- .../src/predictionengine/dataset.py | 32 ++-- .../gated_residual_network.py | 16 +- .../long_short_term_memory.py | 19 +-- .../src/predictionengine/loss_function.py | 17 ++- .../src/predictionengine/main.py | 45 +++--- .../miniature_temporal_fusion_transformer.py | 45 +++--- .../src/predictionengine/models.py | 3 +- .../multi_head_self_attention.py | 24 +-- .../src/predictionengine/post_processor.py | 17 ++- .../predictionengine/tests/test_dataset.py | 9 +- .../tests/test_gated_residual_network.py | 15 +- .../tests/test_long_short_term_memory.py | 32 ++-- .../tests/test_loss_function.py | 16 +- .../tests/test_multi_head_self_attention.py | 32 ++-- .../tests/test_post_processor.py | 11 +- .../tests/test_ticker_embedding.py | 1 + workflows/backfill_datamanager.py | 2 +- workflows/prediction_model.py | 49 ------- workflows/pyproject.toml | 1 + workflows/train_predctionengine.py | 138 ++++++++++++++++++ 24 files changed, 351 insertions(+), 223 deletions(-) delete mode 100644 workflows/prediction_model.py create mode 100644 workflows/train_predctionengine.py diff --git a/application/datamanager/src/datamanager/config.py b/application/datamanager/src/datamanager/config.py index fa234e05a..089c640bc 100644 --- a/application/datamanager/src/datamanager/config.py +++ b/application/datamanager/src/datamanager/config.py @@ -19,7 +19,8 @@ class Bucket(BaseModel): @computed_field def daily_bars_path(self) -> str: if self.name is None: - raise ValueError("DATA_BUCKET environment variable is required") + msg = "DATA_BUCKET environment variable is required" + raise ValueError(msg) return f"gs://{self.name}/equity/bars/" diff --git a/application/datamanager/src/datamanager/main.py b/application/datamanager/src/datamanager/main.py index 6f1019059..6ae8643b8 100644 --- a/application/datamanager/src/datamanager/main.py +++ b/application/datamanager/src/datamanager/main.py @@ -137,8 +137,8 @@ async def fetch_equity_bars(request: Request, summary_date: SummaryDate) -> Bars polygon = request.app.state.settings.polygon bucket = request.app.state.settings.gcp.bucket - summary_date: str = summary_date.date.strftime("%Y-%m-%d") - url = f"{polygon.base_url}{polygon.daily_bars}{summary_date}" + request_summary_date: str = summary_date.date.strftime("%Y-%m-%d") + url = f"{polygon.base_url}{polygon.daily_bars}{request_summary_date}" logger.info(f"polygon_api_endpoint={url}") params = {"adjusted": "true", "apiKey": polygon.api_key} @@ -178,7 +178,7 @@ async def fetch_equity_bars(request: Request, summary_date: SummaryDate) -> Bars status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Failed to write data", ) from e - return BarsSummary(date=summary_date, count=count) + return BarsSummary(date=request_summary_date, count=count) @application.delete("/equity-bars") diff --git 
a/application/positionmanager/src/positionmanager/clients.py b/application/positionmanager/src/positionmanager/clients.py index d92b0a61d..13f1aa818 100644 --- a/application/positionmanager/src/positionmanager/clients.py +++ b/application/positionmanager/src/positionmanager/clients.py @@ -1,9 +1,7 @@ -import requests -import polars as pl -from typing import Dict, Any -import pyarrow as pa +from typing import Any import polars as pl +import pyarrow as pa import requests from alpaca.trading.client import TradingClient from alpaca.trading.enums import OrderSide, TimeInForce @@ -18,13 +16,11 @@ def __init__( *, api_key: str | None = "", api_secret: str | None = "", - api_key: str | None = None, - api_secret: str | None = None, paper: bool = True, ) -> None: if not api_key or not api_secret: - msg = "Alpaca API key and secret are required" - raise ValueError(msg) + message = "Alpaca API key and secret are required" + raise ValueError(message) self.trading_client = TradingClient(api_key, api_secret, paper=paper) @@ -33,7 +29,8 @@ def get_cash_balance(self) -> Money: cash_balance = getattr(account, "cash", None) if cash_balance is None: - raise ValueError("Cash balance is not available") + message = "Cash balance is not available" + raise ValueError(message) return Money.from_float(float(cash_balance)) @@ -74,8 +71,8 @@ def get_data( date_range: DateRange, ) -> pl.DataFrame: if not self.datamanager_base_url: - msg = "Data manager URL is not configured" - raise ValueError(msg) + message = "Data manager URL is not configured" + raise ValueError(message) endpoint = f"{self.datamanager_base_url}/equity-bars" @@ -87,14 +84,15 @@ def get_data( try: response = requests.get(endpoint, params=params, timeout=30) except requests.RequestException as err: - msg = f"Data manager service call error: {err}" - raise RuntimeError(msg) from err + message = f"Data manager service call error: {err}" + raise RuntimeError(message) from err - if response.status_code == 404: + if response.status_code == requests.codes.not_found: return pl.DataFrame() - if response.status_code != 200: + if response.status_code != requests.codes.ok: + message = f"Data service error: {response.text}, status code: {response.status_code}" # noqa: E501 raise requests.HTTPError( - f"Data service error: {response.text}, status code: {response.status_code}", + message, ) buffer = pa.py_buffer(response.content) diff --git a/application/positionmanager/src/positionmanager/main.py b/application/positionmanager/src/positionmanager/main.py index a64764b4b..bc7c683a7 100644 --- a/application/positionmanager/src/positionmanager/main.py +++ b/application/positionmanager/src/positionmanager/main.py @@ -1,15 +1,10 @@ import os -from datetime import datetime, timedelta, timezone -import polars as pl -from typing import Dict, Any -from .models import Money, DateRange, PredictionPayload -from .clients import AlpacaClient, DataClient -from .portfolio import PortfolioOptimizer -from prometheus_fastapi_instrumentator import Instrumentator +from datetime import UTC, datetime, timedelta +from typing import Any import polars as pl import requests -from alpaca.common.rest import APIError +from alpaca.common.exceptions import APIError from fastapi import FastAPI, HTTPException from prometheus_fastapi_instrumentator import Instrumentator from pydantic import ValidationError diff --git a/application/predictionengine/src/predictionengine/dataset.py b/application/predictionengine/src/predictionengine/dataset.py index a77761e09..45352d9d8 100644 --- 
a/application/predictionengine/src/predictionengine/dataset.py +++ b/application/predictionengine/src/predictionengine/dataset.py @@ -1,8 +1,9 @@ -from typing import Dict, List, Any, Tuple, Generator -from tinygrad.tensor import Tensor +from collections.abc import Generator +from typing import Any + import polars as pl from category_encoders import OrdinalEncoder - +from tinygrad.tensor import Tensor continuous_variable_columns = [ "open_price", @@ -20,13 +21,15 @@ def __init__( batch_size: int, sequence_length: int, sample_count: int, - scalers: Dict[str, Dict[str, Tensor]] = {}, + scalers: dict[str, dict[str, Tensor]] | None = None, ) -> None: + if scalers is None: + scalers = {} self.batch_size = batch_size self.sequence_length = sequence_length self.sample_count = sample_count self.scalers = scalers if scalers is not None else {} - self.preprocessors: Dict[str, Any] = {} + self.preprocessors: dict[str, Any] = {} def __len__(self) -> int: return (self.sample_count + self.batch_size - 1) // self.batch_size @@ -106,7 +109,7 @@ def _encode_tickers(self, data: pl.DataFrame) -> pl.DataFrame: def _compute_scalers(self, data: pl.DataFrame) -> None: if len(self.scalers) == 0: - self.scalers: Dict[str, Dict[str, Tensor]] = {} + self.scalers: dict[str, dict[str, Tensor]] = {} for ticker_key, group in data.group_by("ticker"): ticker = ticker_key[0] means = group[continuous_variable_columns].mean() @@ -118,7 +121,7 @@ def _compute_scalers(self, data: pl.DataFrame) -> None: } def _scale_data(self, data: pl.DataFrame) -> Tensor: - groups: List[Tensor] = [] + groups: list[Tensor] = [] for ticker_key, group in data.group_by("ticker"): ticker = ticker_key[0] means = self.scalers[str(ticker)]["means"] @@ -133,7 +136,8 @@ def _scale_data(self, data: pl.DataFrame) -> Tensor: groups.append(combined_group) if not groups: - raise ValueError("No data available after preprocessing") + message = "No data available after preprocessing" + raise ValueError(message) output_data = Tensor.empty(groups[0].shape) return output_data.cat(*groups, dim=0) @@ -150,9 +154,10 @@ def load_data(self, data: pl.DataFrame) -> None: self._compute_scalers(data) self.data = self._scale_data(data) - def get_preprocessors(self) -> Dict[str, Any]: + def get_preprocessors(self) -> dict[str, Any]: if not self.preprocessors: - raise ValueError("Preprocessors have not been initialized.") + message = "Preprocessors have not been initialized." 
+ raise ValueError(message) means_by_ticker = { ticker: values["means"] for ticker, values in self.scalers.items() @@ -169,7 +174,7 @@ def get_preprocessors(self) -> Dict[str, Any]: "indices": self.preprocessors["indices"], } - def batches(self) -> Generator[Tuple[Tensor, Tensor, Tensor], None, None]: + def batches(self) -> Generator[tuple[Tensor, Tensor, Tensor], None, None]: close_price_idx = self.preprocessors["indices"]["close_price"] for i in range(0, self.sample_count, self.batch_size): @@ -193,9 +198,8 @@ def batches(self) -> Generator[Tuple[Tensor, Tensor, Tensor], None, None]: ] if not batch_tensors: - raise ValueError( - "Cannot stack empty batch tensors (batch_size must be ≥ 1)" - ) + message = "Cannot stack empty batch tensors (batch_size must be ≥ 1)" + raise ValueError(message) if len(batch_tensors) == 1: historical_features = batch_tensors[0].unsqueeze(0) else: diff --git a/application/predictionengine/src/predictionengine/gated_residual_network.py b/application/predictionengine/src/predictionengine/gated_residual_network.py index 13ea9e2cb..b03c58300 100644 --- a/application/predictionengine/src/predictionengine/gated_residual_network.py +++ b/application/predictionengine/src/predictionengine/gated_residual_network.py @@ -1,7 +1,7 @@ from typing import cast + +from tinygrad.nn import LayerNorm, Linear from tinygrad.tensor import Tensor -from tinygrad.nn import Linear, LayerNorm -from typing import Optional class GatedResidualNetwork: @@ -9,7 +9,7 @@ def __init__( self, input_size: int, hidden_size: int, - output_size: Optional[int] = None, + output_size: int | None = None, ) -> None: output_size = output_size if output_size is not None else input_size @@ -30,18 +30,18 @@ def __init__( def forward( self, - input: Tensor, + features: Tensor, ) -> Tensor: - hidden_state = self.dense_input(input).relu() + hidden_state = self.dense_input(features).relu() output_state = self.dense_output(hidden_state) gate_state = self.gate(hidden_state).sigmoid() if self.residual_projection is not None: - residual = self.residual_projection(input) + residual = self.residual_projection(features) else: - residual = input + residual = features - gated_output = cast(Tensor, gate_state * output_state + residual) + gated_output = cast("Tensor", gate_state * output_state + residual) return self.layer_normalizer(gated_output) diff --git a/application/predictionengine/src/predictionengine/long_short_term_memory.py b/application/predictionengine/src/predictionengine/long_short_term_memory.py index f3e4420cd..627d94c6e 100644 --- a/application/predictionengine/src/predictionengine/long_short_term_memory.py +++ b/application/predictionengine/src/predictionengine/long_short_term_memory.py @@ -1,6 +1,5 @@ -from typing import List, Tuple -from tinygrad.tensor import Tensor from tinygrad.nn import LSTMCell +from tinygrad.tensor import Tensor class LongShortTermMemory: @@ -15,16 +14,16 @@ def __init__( self.layer_count = layer_count self.dropout_rate = dropout_rate - self.layers: List[LSTMCell] = [] + self.layers: list[LSTMCell] = [] for index in range(layer_count): input_size = input_size if index == 0 else self.hidden_size self.layers.append(LSTMCell(input_size, self.hidden_size)) def forward( self, - input: Tensor, - ) -> Tuple[Tensor, Tuple[Tensor, Tensor]]: - batch_size, sequence_length, _ = input.shape + features: Tensor, + ) -> tuple[Tensor, tuple[Tensor, Tensor]]: + batch_size, sequence_length, _ = features.shape hidden_state = Tensor.zeros( self.layer_count, batch_size, self.hidden_size @@ -36,7 +35,7 
@@ def forward( outputs = [] for t in range(int(sequence_length)): - layer_input = input[:, t] + layer_input = features[:, t] for index, layer in enumerate(self.layers): layer_hidden_state, layer_cell_state = layer( @@ -59,8 +58,10 @@ def forward( outputs.append(hidden_state[-1]) if not outputs: - raise ValueError("Cannot stack empty outputs list") - elif len(outputs) == 1: + message = "Cannot stack empty outputs list" + raise ValueError(message) + + if len(outputs) == 1: output_tensor = outputs[0].unsqueeze(1) else: output_tensor = Tensor.stack(outputs[0], *outputs[1:], dim=1) diff --git a/application/predictionengine/src/predictionengine/loss_function.py b/application/predictionengine/src/predictionengine/loss_function.py index 66479e77a..fcbf49546 100644 --- a/application/predictionengine/src/predictionengine/loss_function.py +++ b/application/predictionengine/src/predictionengine/loss_function.py @@ -1,6 +1,7 @@ -from tinygrad.tensor import Tensor from typing import cast +from tinygrad.tensor import Tensor + Quantiles = tuple[float, float, float] | tuple[float, float, float, float, float] @@ -11,18 +12,18 @@ def quantile_loss( quantiles = (0.25, 0.5, 0.75) if y_pred.shape != y_true.shape: - raise ValueError( - f"Shape mismatch: y_pred {y_pred.shape} vs y_true {y_true.shape}" - ) + message = f"Shape mismatch: y_pred {y_pred.shape} vs y_true {y_true.shape}" + raise ValueError(message) if not all(0 <= q <= 1 for q in quantiles): - raise ValueError("All quantiles must be between 0 and 1") + message = "All quantiles must be between 0 and 1" + raise ValueError(message) loss: Tensor = Tensor.zeros(1) - error = cast(Tensor, y_true - y_pred) + error = cast("Tensor", y_true - y_pred) for quantile in quantiles: - quantile_error = cast(Tensor, quantile * error) - quantile_minus_one_error = cast(Tensor, (quantile - 1) * error) + quantile_error = cast("Tensor", quantile * error) + quantile_minus_one_error = cast("Tensor", (quantile - 1) * error) loss += Tensor.maximum(quantile_error, quantile_minus_one_error).mean() return loss diff --git a/application/predictionengine/src/predictionengine/main.py b/application/predictionengine/src/predictionengine/main.py index 20075aee5..e9c9736a8 100644 --- a/application/predictionengine/src/predictionengine/main.py +++ b/application/predictionengine/src/predictionengine/main.py @@ -1,17 +1,22 @@ import os import traceback -from typing import AsyncGenerator -from datetime import date, datetime, timedelta +from collections.abc import AsyncGenerator from contextlib import asynccontextmanager -import requests +from datetime import UTC, date, datetime, timedelta +from pathlib import Path + import polars as pl -from fastapi import FastAPI, Request, Response, status, HTTPException -from prometheus_fastapi_instrumentator import Instrumentator +import requests +from fastapi import FastAPI, HTTPException, Request, Response, status from loguru import logger -from .miniature_temporal_fusion_transformer import MiniatureTemporalFusionTransformer +from prometheus_fastapi_instrumentator import Instrumentator + from .dataset import DataSet +from .miniature_temporal_fusion_transformer import MiniatureTemporalFusionTransformer from .models import PredictionResponse +SEQUENCE_LENGTH = 30 + @asynccontextmanager async def lifespan(app: FastAPI) -> AsyncGenerator[None]: @@ -40,7 +45,7 @@ def fetch_historical_data( "end_date": end_date.isoformat(), } - response = requests.get(url, params=parameters, timeout=30) + response = requests.get(url, params=parameters, timeout=SEQUENCE_LENGTH) 
response.raise_for_status()

     import pyarrow as pa

@@ -55,7 +60,7 @@ def fetch_historical_data(
 def load_or_initialize_model(data: pl.DataFrame) -> MiniatureTemporalFusionTransformer:
     dataset = DataSet(
         batch_size=32,
-        sequence_length=30,
+        sequence_length=SEQUENCE_LENGTH,
         sample_count=len(data),
     )
     dataset.load_data(data)
@@ -74,25 +79,25 @@ def load_or_initialize_model(data: pl.DataFrame) -> MiniatureTemporalFusionTrans
         ticker_encoder=preprocessors["ticker_encoder"],
         dropout_rate=0.0,
     )
     model_path = "miniature_temporal_fusion_transformer.safetensor"
-    if os.path.exists(model_path):
+    if Path(model_path).exists():
         try:
             model.load(model_path)
             logger.info("Loaded existing model weights")
-        except Exception as e:
+        except Exception as e:  # noqa: BLE001
             logger.warning(f"Failed to load model weights: {e}")

     return model


-@application.post("/create-predictions", response_model=PredictionResponse)
+@application.post("/create-predictions")
 async def create_predictions(
     request: Request,
 ) -> PredictionResponse:
     try:
-        end_date = datetime.now().date()
-        start_date = end_date - timedelta(days=30)
+        end_date = datetime.now(tz=UTC).date()
+        start_date = end_date - timedelta(days=SEQUENCE_LENGTH)

         logger.info(f"Fetching data from {start_date} to {end_date}")
         data = fetch_historical_data(
@@ -100,7 +105,7 @@ async def create_predictions(
         )

         if data.is_empty():
-            raise HTTPException(
+            raise HTTPException(  # noqa: TRY301
                 status_code=404, detail="No data available for prediction"
             )

@@ -115,15 +120,15 @@ async def create_predictions(
         for ticker in unique_tickers:
             ticker_data = data.filter(pl.col("ticker") == ticker)

-            if len(ticker_data) < 30:
+            if len(ticker_data) < SEQUENCE_LENGTH:
                 logger.warning(f"Insufficient data for ticker {ticker}")
                 continue

-            recent_data = ticker_data.tail(30)
+            recent_data = ticker_data.tail(SEQUENCE_LENGTH)

             dataset = DataSet(
                 batch_size=1,
-                sequence_length=30,
+                sequence_length=SEQUENCE_LENGTH,
                 sample_count=1,
             )
             dataset.load_data(recent_data)
@@ -145,7 +150,7 @@ async def create_predictions(
             }

         if not predictions:
-            raise HTTPException(
+            raise HTTPException(  # noqa: TRY301
                 status_code=404, detail="No predictions could be generated"
             )

@@ -157,5 +162,5 @@ async def create_predictions(
         logger.error(f"Error creating predictions: {e}")
         logger.error(traceback.format_exc())
         raise HTTPException(
-            status_code=500, detail=f"Internal server error: {str(e)}"
+            status_code=500, detail=f"Internal server error: {e!s}"
         ) from e
diff --git a/application/predictionengine/src/predictionengine/miniature_temporal_fusion_transformer.py b/application/predictionengine/src/predictionengine/miniature_temporal_fusion_transformer.py
index 97f0a5037..a193039d0 100644
--- a/application/predictionengine/src/predictionengine/miniature_temporal_fusion_transformer.py
+++ b/application/predictionengine/src/predictionengine/miniature_temporal_fusion_transformer.py
@@ -1,27 +1,26 @@
-from typing import Dict
+import numpy as np
 from category_encoders import OrdinalEncoder
-from .ticker_embedding import TickerEmbedding
-from .long_short_term_memory import LongShortTermMemory
-from .gated_residual_network import GatedResidualNetwork
-from .multi_head_self_attention import MultiHeadSelfAttention
-from .post_processor import PostProcessor
-from tinygrad.tensor import Tensor
 from tinygrad.nn.optim import Adam
 from tinygrad.nn.state import (
     get_parameters,
     get_state_dict,
-    safe_save,
-    safe_load,
     load_state_dict,
+    safe_load,
+    safe_save,
 )
-from typing import Tuple, 
List -import numpy as np +from tinygrad.tensor import Tensor + from .dataset import DataSet +from .gated_residual_network import GatedResidualNetwork +from .long_short_term_memory import LongShortTermMemory from .loss_function import quantile_loss +from .multi_head_self_attention import MultiHeadSelfAttention +from .post_processor import PostProcessor +from .ticker_embedding import TickerEmbedding class MiniatureTemporalFusionTransformer: - def __init__( + def __init__( # noqa: PLR0913 self, input_size: int, hidden_size: int, @@ -30,8 +29,8 @@ def __init__( ticker_count: int, embedding_size: int, attention_head_count: int, - means_by_ticker: Dict[str, Tensor], - standard_deviations_by_ticker: Dict[str, Tensor], + means_by_ticker: dict[str, Tensor], + standard_deviations_by_ticker: dict[str, Tensor], ticker_encoder: OrdinalEncoder, dropout_rate: float, # non-zero indicates training ) -> None: @@ -72,11 +71,15 @@ def __init__( self.parameters = get_parameters(self) + def get_parameters(self) -> list[Tensor]: + """Return all trainable parameters of the model.""" + return self.parameters + def forward( self, tickers: Tensor, features: Tensor, - ) -> Tuple[Tensor, Tensor, Tuple[np.ndarray, np.ndarray, np.ndarray]]: + ) -> tuple[Tensor, Tensor, tuple[np.ndarray, np.ndarray, np.ndarray]]: ticker_embeddings = self.ticker_embedding.forward( tickers ) # (batch_size, embedding_dim) @@ -109,11 +112,11 @@ def train( dataset: DataSet, epoch_count: int, learning_rate: float = 1e-3, - ) -> List[float]: + ) -> list[float]: optimizer = Adam(params=self.parameters, lr=learning_rate) quantiles = (0.25, 0.50, 0.75) - losses: List[float] = [] + losses: list[float] = [] for _ in range(epoch_count): epoch_loss = 0.0 @@ -152,7 +155,7 @@ def validate( average_loss = total_loss / batch_count - return average_loss + return average_loss # noqa: RET504 def save( self, @@ -171,9 +174,9 @@ def load( def predict( self, tickers: Tensor, - input: Tensor, - ) -> Tuple[np.ndarray, np.ndarray, np.ndarray]: - predictions, _, _ = self.forward(tickers, input) + features: Tensor, + ) -> tuple[np.ndarray, np.ndarray, np.ndarray]: + predictions, _, _ = self.forward(tickers, features) percentile_25, percentile_50, percentile_75 = ( self.post_processor.post_process_predictions( diff --git a/application/predictionengine/src/predictionengine/models.py b/application/predictionengine/src/predictionengine/models.py index 37ba3e0d9..123d30a33 100644 --- a/application/predictionengine/src/predictionengine/models.py +++ b/application/predictionengine/src/predictionengine/models.py @@ -1,6 +1,5 @@ from pydantic import BaseModel -from typing import Dict class PredictionResponse(BaseModel): - predictions: Dict[str, Dict[str, float]] + predictions: dict[str, dict[str, float]] diff --git a/application/predictionengine/src/predictionengine/multi_head_self_attention.py b/application/predictionengine/src/predictionengine/multi_head_self_attention.py index 5ba0f83b9..86f787797 100644 --- a/application/predictionengine/src/predictionengine/multi_head_self_attention.py +++ b/application/predictionengine/src/predictionengine/multi_head_self_attention.py @@ -1,7 +1,8 @@ -from typing import Tuple, cast -from tinygrad.tensor import Tensor -from tinygrad.nn import Linear +from typing import cast + from tinygrad.dtype import dtypes +from tinygrad.nn import Linear +from tinygrad.tensor import Tensor class MultiHeadSelfAttention: @@ -11,7 +12,8 @@ def __init__( embedding_size: int, ) -> None: if embedding_size % heads_count != 0: - raise ValueError("Embedding 
dimension must be divisible by heads count") + message = "Embedding dimension must be divisible by heads count" + raise ValueError(message) self.heads_count = heads_count self.embedding_size = embedding_size @@ -27,13 +29,13 @@ def __init__( def forward( self, - input: Tensor, - ) -> Tuple[Tensor, Tensor]: - batch_size, sequence_length, _ = input.shape + features: Tensor, + ) -> tuple[Tensor, Tensor]: + batch_size, sequence_length, _ = features.shape - query_weights = self.query_weight(input) - key_weights = self.key_weight(input) - value_weights = self.value_weight(input) + query_weights = self.query_weight(features) + key_weights = self.key_weight(features) + value_weights = self.value_weight(features) query_weights = query_weights.view( (batch_size, sequence_length, self.heads_count, self.heads_dimension), @@ -49,7 +51,7 @@ def forward( query_weights.matmul(key_weights.transpose(-2, -1)) / self.scale ) - attention_weights: Tensor = cast(Tensor, attention_scores).softmax(axis=-1) + attention_weights: Tensor = cast("Tensor", attention_scores).softmax(axis=-1) attention_output = attention_weights.matmul(value_weights) diff --git a/application/predictionengine/src/predictionengine/post_processor.py b/application/predictionengine/src/predictionengine/post_processor.py index d0b615010..dbdb43523 100644 --- a/application/predictionengine/src/predictionengine/post_processor.py +++ b/application/predictionengine/src/predictionengine/post_processor.py @@ -1,15 +1,16 @@ -from typing import Dict, Tuple, Any -from tinygrad.tensor import Tensor -from category_encoders import OrdinalEncoder +from typing import Any + import numpy as np import polars as pl +from category_encoders import OrdinalEncoder +from tinygrad.tensor import Tensor class PostProcessor: def __init__( self, - means_by_ticker: Dict[str, Tensor], - standard_deviations_by_ticker: Dict[str, Tensor], + means_by_ticker: dict[str, Tensor], + standard_deviations_by_ticker: dict[str, Tensor], ticker_encoder: OrdinalEncoder, ) -> None: self.means_by_ticker = means_by_ticker @@ -20,7 +21,7 @@ def post_process_predictions( self, encoded_tickers: np.ndarray, predictions: np.ndarray, - ) -> Tuple[ + ) -> tuple[ np.ndarray[Any, np.dtype[np.float64]], np.ndarray[Any, np.dtype[np.float64]], np.ndarray[Any, np.dtype[np.float64]], @@ -40,7 +41,9 @@ def post_process_predictions( ticker not in self.means_by_ticker or ticker not in self.standard_deviations_by_ticker ): - raise ValueError(f"Statistics not found for ticker: {ticker}") + message = f"Statistics not found for ticker: {ticker}" + raise ValueError(message) + mean = self.means_by_ticker[ticker].numpy() standard_deviation = self.standard_deviations_by_ticker[ticker].numpy() rescaled_predictions[i, :] = predictions[i, :] * standard_deviation + mean diff --git a/application/predictionengine/tests/test_dataset.py b/application/predictionengine/tests/test_dataset.py index c43cb19a6..41d549df4 100644 --- a/application/predictionengine/tests/test_dataset.py +++ b/application/predictionengine/tests/test_dataset.py @@ -1,5 +1,6 @@ import polars as pl import pytest + from application.predictionengine.src.predictionengine.dataset import DataSet @@ -10,10 +11,10 @@ def test_dataset_initialization() -> None: sample_count=3, ) - assert dataset.batch_size == 2 - assert dataset.sequence_length == 3 - assert dataset.sample_count == 3 - assert len(dataset) == 2 + assert dataset.batch_size == 2 # noqa: PLR2004 + assert dataset.sequence_length == 3 # noqa: PLR2004 + assert dataset.sample_count == 3 # noqa: PLR2004 + 
assert len(dataset) == 2 # noqa: PLR2004 def test_dataset_load_data() -> None: diff --git a/application/predictionengine/tests/test_gated_residual_network.py b/application/predictionengine/tests/test_gated_residual_network.py index 617676302..3ec41dc22 100644 --- a/application/predictionengine/tests/test_gated_residual_network.py +++ b/application/predictionengine/tests/test_gated_residual_network.py @@ -1,5 +1,6 @@ -from tinygrad.tensor import Tensor import numpy as np +from tinygrad.tensor import Tensor + from application.predictionengine.src.predictionengine.gated_residual_network import ( GatedResidualNetwork, ) @@ -26,7 +27,8 @@ def test_gated_residual_network_initialization() -> None: def test_gated_residual_network_forward() -> None: grn = GatedResidualNetwork(input_size=32, hidden_size=64, output_size=32) - input_tensor = Tensor(np.random.randn(8, 32)) + default_range = np.random.default_rng() + input_tensor = Tensor(default_range.standard_normal((8, 32))) output = grn.forward(input_tensor) assert output.shape == (8, 32) @@ -35,7 +37,8 @@ def test_gated_residual_network_forward() -> None: def test_gated_residual_network_different_sizes() -> None: grn = GatedResidualNetwork(input_size=16, hidden_size=32, output_size=8) - input_tensor = Tensor(np.random.randn(4, 16)) + default_range = np.random.default_rng() + input_tensor = Tensor(default_range.standard_normal((4, 16))) output = grn.forward(input_tensor) assert output.shape == (4, 8) @@ -44,7 +47,8 @@ def test_gated_residual_network_different_sizes() -> None: def test_gated_residual_network_single_sample() -> None: grn = GatedResidualNetwork(input_size=10, hidden_size=20, output_size=10) - input_tensor = Tensor(np.random.randn(1, 10)) + default_range = np.random.default_rng() + input_tensor = Tensor(default_range.standard_normal((1, 10))) output = grn.forward(input_tensor) assert output.shape == (1, 10) @@ -53,7 +57,8 @@ def test_gated_residual_network_single_sample() -> None: def test_gated_residual_network_consistency() -> None: grn = GatedResidualNetwork(input_size=16, hidden_size=32, output_size=16) - input_tensor = Tensor(np.random.randn(2, 16)) + default_range = np.random.default_rng() + input_tensor = Tensor(default_range.standard_normal((2, 16))) output1 = grn.forward(input_tensor) output2 = grn.forward(input_tensor) diff --git a/application/predictionengine/tests/test_long_short_term_memory.py b/application/predictionengine/tests/test_long_short_term_memory.py index dd631c1bf..c3b513b06 100644 --- a/application/predictionengine/tests/test_long_short_term_memory.py +++ b/application/predictionengine/tests/test_long_short_term_memory.py @@ -1,5 +1,6 @@ -from tinygrad.tensor import Tensor import numpy as np +from tinygrad.tensor import Tensor + from application.predictionengine.src.predictionengine.long_short_term_memory import ( LongShortTermMemory, ) @@ -10,9 +11,9 @@ def test_lstm_initialization() -> None: input_size=32, hidden_size=64, layer_count=2, dropout_rate=0.1 ) - assert lstm.hidden_size == 64 - assert lstm.layer_count == 2 - assert lstm.dropout_rate == 0.1 + assert lstm.hidden_size == 64 # noqa: PLR2004 + assert lstm.layer_count == 2 # noqa: PLR2004 + assert lstm.dropout_rate == 0.1 # noqa: PLR2004 def test_lstm_forward() -> None: @@ -20,12 +21,13 @@ def test_lstm_forward() -> None: input_size=16, hidden_size=32, layer_count=1, dropout_rate=0.0 ) - input_tensor = Tensor(np.random.randn(4, 10, 16)) + default_range = np.random.default_rng() + input_tensor = Tensor(default_range.standard_normal((4, 10, 16))) output, 
hidden_state = lstm.forward(input_tensor) assert output.shape == (4, 10, 32) assert isinstance(hidden_state, tuple) - assert len(hidden_state) == 2 + assert len(hidden_state) == 2 # noqa: PLR2004 def test_lstm_different_sequence_lengths() -> None: @@ -33,11 +35,12 @@ def test_lstm_different_sequence_lengths() -> None: input_size=8, hidden_size=16, layer_count=1, dropout_rate=0.0 ) - for seq_len in [5, 10, 20]: - input_tensor = Tensor(np.random.randn(2, seq_len, 8)) - output, hidden_state = lstm.forward(input_tensor) + for sequence_length in [5, 10, 20]: + default_range = np.random.default_rng() + input_tensor = Tensor(default_range.standard_normal((2, sequence_length, 8))) + output, _ = lstm.forward(input_tensor) - assert output.shape == (2, seq_len, 16) + assert output.shape == (2, sequence_length, 16) def test_lstm_multiple_layers() -> None: @@ -45,7 +48,8 @@ def test_lstm_multiple_layers() -> None: input_size=10, hidden_size=20, layer_count=3, dropout_rate=0.0 ) - input_tensor = Tensor(np.random.randn(2, 5, 10)) + default_range = np.random.default_rng() + input_tensor = Tensor(default_range.standard_normal((2, 5, 10))) output, hidden_state = lstm.forward(input_tensor) assert output.shape == (2, 5, 20) @@ -57,7 +61,8 @@ def test_lstm_single_timestep() -> None: input_size=12, hidden_size=24, layer_count=1, dropout_rate=0.0 ) - input_tensor = Tensor(np.random.randn(3, 1, 12)) + default_range = np.random.default_rng() + input_tensor = Tensor(default_range.standard_normal((3, 1, 12))) output, _ = lstm.forward(input_tensor) assert output.shape == (3, 1, 24) @@ -68,7 +73,8 @@ def test_lstm_consistency() -> None: input_size=6, hidden_size=12, layer_count=1, dropout_rate=0.0 ) - input_tensor = Tensor(np.random.randn(1, 3, 6)) + default_range = np.random.default_rng() + input_tensor = Tensor(default_range.standard_normal((1, 3, 6))) first_output, _ = lstm.forward(input_tensor) second_output, _ = lstm.forward(input_tensor) diff --git a/application/predictionengine/tests/test_loss_function.py b/application/predictionengine/tests/test_loss_function.py index 6bebf839f..dee69c386 100644 --- a/application/predictionengine/tests/test_loss_function.py +++ b/application/predictionengine/tests/test_loss_function.py @@ -1,6 +1,7 @@ -from tinygrad.tensor import Tensor import numpy as np import pytest +from tinygrad.tensor import Tensor + from application.predictionengine.src.predictionengine.loss_function import ( quantile_loss, ) @@ -14,7 +15,7 @@ def test_quantile_loss_basic() -> None: loss = quantile_loss(predictions, targets, quantiles) assert isinstance(loss, Tensor) - assert loss.shape == () or loss.shape == (1,) + assert loss.shape == () or loss.shape == (1,) # noqa: PLR1714 def test_quantile_loss_multiple_samples() -> None: @@ -25,7 +26,7 @@ def test_quantile_loss_multiple_samples() -> None: loss = quantile_loss(predictions, targets, quantiles) assert isinstance(loss, Tensor) - assert loss.shape == () or loss.shape == (1,) + assert loss.shape == () or loss.shape == (1,) # noqa: PLR1714 def test_quantile_loss_perfect_prediction() -> None: @@ -51,8 +52,13 @@ def test_quantile_loss_different_quantiles() -> None: def test_quantile_loss_shapes() -> None: for batch_size in [1, 2, 4, 8]: - predictions = Tensor(np.random.randn(batch_size, 1).astype(np.float32)) - targets = Tensor(np.random.randn(batch_size, 1).astype(np.float32)) + default_range = np.random.default_rng() + predictions = Tensor( + default_range.standard_normal((batch_size, 1)).astype(np.float32) + ) + targets = Tensor( + 
default_range.standard_normal((batch_size, 1)).astype(np.float32) + ) quantiles = (0.25, 0.5, 0.75) loss = quantile_loss(predictions, targets, quantiles) diff --git a/application/predictionengine/tests/test_multi_head_self_attention.py b/application/predictionengine/tests/test_multi_head_self_attention.py index d818eea00..ca3c9f6e1 100644 --- a/application/predictionengine/tests/test_multi_head_self_attention.py +++ b/application/predictionengine/tests/test_multi_head_self_attention.py @@ -1,6 +1,7 @@ -from tinygrad.tensor import Tensor import numpy as np -from application.predictionengine.src.predictionengine.multi_head_self_attention import ( +from tinygrad.tensor import Tensor + +from application.predictionengine.src.predictionengine.multi_head_self_attention import ( # noqa: E501 MultiHeadSelfAttention, ) @@ -8,19 +9,20 @@ def test_multi_head_attention_initialization() -> None: attention = MultiHeadSelfAttention(heads_count=8, embedding_size=64) - assert attention.heads_count == 8 - assert attention.embedding_size == 64 + assert attention.heads_count == 8 # noqa: PLR2004 + assert attention.embedding_size == 64 # noqa: PLR2004 def test_multi_head_attention_forward() -> None: attention = MultiHeadSelfAttention(heads_count=4, embedding_size=32) - input_tensor = Tensor(np.random.randn(2, 10, 32)) + default_range = np.random.default_rng() + input_tensor = Tensor(default_range.standard_normal((2, 10, 32))) output, attention_weights = attention.forward(input_tensor) assert output.shape == (2, 10, 32) - assert attention_weights.shape[0] == 2 # batch size - assert attention_weights.shape[1] == 4 # heads count + assert attention_weights.shape[0] == 2 # noqa: PLR2004 batch size + assert attention_weights.shape[1] == 4 # noqa: PLR2004 heads count def test_multi_head_attention_different_heads() -> None: @@ -30,7 +32,8 @@ def test_multi_head_attention_different_heads() -> None: heads_count=heads_count, embedding_size=embedding_size ) - input_tensor = Tensor(np.random.randn(1, 5, embedding_size)) + default_range = np.random.default_rng() + input_tensor = Tensor(default_range.standard_normal((1, 5, embedding_size))) output, attention_weights = attention.forward(input_tensor) assert output.shape == (1, 5, embedding_size) @@ -40,7 +43,8 @@ def test_multi_head_attention_different_heads() -> None: def test_multi_head_attention_single_sequence() -> None: attention = MultiHeadSelfAttention(heads_count=2, embedding_size=16) - input_tensor = Tensor(np.random.randn(1, 1, 16)) + default_range = np.random.default_rng() + input_tensor = Tensor(default_range.standard_normal((1, 1, 16))) output, _ = attention.forward(input_tensor) assert output.shape == (1, 1, 16) @@ -49,18 +53,20 @@ def test_multi_head_attention_single_sequence() -> None: def test_multi_head_attention_longer_sequences() -> None: attention = MultiHeadSelfAttention(heads_count=4, embedding_size=64) - for seq_len in [10, 20, 50]: - input_tensor = Tensor(np.random.randn(1, seq_len, 64)) + for sequence_length in [10, 20, 50]: + default_range = np.random.default_rng() + input_tensor = Tensor(default_range.standard_normal((1, sequence_length, 64))) output, _ = attention.forward(input_tensor) - assert output.shape == (1, seq_len, 64) + assert output.shape == (1, sequence_length, 64) def test_multi_head_attention_batch_processing() -> None: attention = MultiHeadSelfAttention(heads_count=2, embedding_size=32) for batch_size in [1, 2, 4, 8]: - input_tensor = Tensor(np.random.randn(batch_size, 5, 32)) + default_range = np.random.default_rng() + input_tensor = 
Tensor(default_range.standard_normal((batch_size, 5, 32))) output, attention_weights = attention.forward(input_tensor) assert output.shape == (batch_size, 5, 32) diff --git a/application/predictionengine/tests/test_post_processor.py b/application/predictionengine/tests/test_post_processor.py index 0caf8c0cb..cdf278a2e 100644 --- a/application/predictionengine/tests/test_post_processor.py +++ b/application/predictionengine/tests/test_post_processor.py @@ -1,7 +1,8 @@ -from category_encoders import OrdinalEncoder +import numpy as np import polars as pl +from category_encoders import OrdinalEncoder from tinygrad.tensor import Tensor -import numpy as np + from application.predictionengine.src.predictionengine.post_processor import ( PostProcessor, ) @@ -66,9 +67,9 @@ def test_post_processor_predictions() -> None: assert isinstance(percentile_25, np.ndarray) assert isinstance(percentile_50, np.ndarray) assert isinstance(percentile_75, np.ndarray) - assert len(percentile_25) == 2 - assert len(percentile_50) == 2 - assert len(percentile_75) == 2 + assert len(percentile_25) == 2 # noqa: PLR2004 + assert len(percentile_50) == 2 # noqa: PLR2004 + assert len(percentile_75) == 2 # noqa: PLR2004 assert np.all(percentile_25 <= percentile_50) assert np.all(percentile_50 <= percentile_75) diff --git a/application/predictionengine/tests/test_ticker_embedding.py b/application/predictionengine/tests/test_ticker_embedding.py index abb70d349..5928832d6 100644 --- a/application/predictionengine/tests/test_ticker_embedding.py +++ b/application/predictionengine/tests/test_ticker_embedding.py @@ -1,4 +1,5 @@ from tinygrad.tensor import Tensor + from application.predictionengine.src.predictionengine.ticker_embedding import ( TickerEmbedding, ) diff --git a/workflows/backfill_datamanager.py b/workflows/backfill_datamanager.py index d1bcd1724..4e7a8a3b8 100644 --- a/workflows/backfill_datamanager.py +++ b/workflows/backfill_datamanager.py @@ -13,7 +13,7 @@ def backfill_single_date(base_url: str, day: date) -> int: @workflow def backfill_equity_bars(base_url: str, start_date: date, end_date: date) -> list[int]: - results: list[int] = [] + results = [] current = start_date while current <= end_date: results.append(backfill_single_date(base_url=base_url, day=current)) diff --git a/workflows/prediction_model.py b/workflows/prediction_model.py deleted file mode 100644 index 986821143..000000000 --- a/workflows/prediction_model.py +++ /dev/null @@ -1,49 +0,0 @@ -import os -import pickle -import statistics -import uuid -from datetime import datetime -from pathlib import Path -from typing import Any, Dict, List - -import requests -from flytekit import task, workflow - - -@task -def fetch_data(start_date: datetime, end_date: datetime) -> List[Dict[str, Any]]: - base_url = os.getenv("DATAMANAGER_BASE_URL", "http://localhost:8080") - response = requests.get( - f"{base_url}/equity-bars", - params={"start_date": start_date.isoformat(), "end_date": end_date.isoformat()}, - timeout=10, - ) - response.raise_for_status() - return response.json().get("data", []) - - -@task -def train_dummy_model(data: List[Dict[str, Any]]) -> bytes: - """Train a trivial model that stores the average close price.""" - close_prices = [row.get("close_price", 0.0) for row in data] - mean_close = statistics.mean(close_prices) if close_prices else 0.0 - model = {"average_close_price": mean_close} - return pickle.dumps(model) - - -@task -def store_model(model_bytes: bytes) -> str: - """Store the serialized model in blob storage.""" - bucket_path = 
os.getenv("MODEL_BUCKET", "/tmp") - filename = f"model-{uuid.uuid4().hex}.pkl" - path = Path(bucket_path) / filename - path.write_bytes(model_bytes) - return str(path) - - -@workflow -def training_workflow(start_date: datetime, end_date: datetime) -> None: - data = fetch_data(start_date=start_date, end_date=end_date) - model_bytes = train_dummy_model(data=data) - artifact_path = store_model(model_bytes=model_bytes) - return diff --git a/workflows/pyproject.toml b/workflows/pyproject.toml index 4302983ef..ff5006991 100644 --- a/workflows/pyproject.toml +++ b/workflows/pyproject.toml @@ -6,6 +6,7 @@ requires-python = ">=3.13" dependencies = [ "flytekit>=1.10.0", "httpx>=0.28.1", + "loguru>=0.7.3", ] [tool.hatch.build.targets.wheel] diff --git a/workflows/train_predctionengine.py b/workflows/train_predctionengine.py new file mode 100644 index 000000000..ec6e62856 --- /dev/null +++ b/workflows/train_predctionengine.py @@ -0,0 +1,138 @@ +import os +import tempfile +import uuid +from datetime import datetime +from pathlib import Path +from typing import Any, cast + +import polars as pl +import pyarrow as pa +import requests +from flytekit import task, workflow +from loguru import logger +from tinygrad.nn.state import get_state_dict, safe_save + +from application.predictionengine.src.predictionengine.dataset import DataSet +from application.predictionengine.src.predictionengine.miniature_temporal_fusion_transformer import ( # noqa: E501 + MiniatureTemporalFusionTransformer, +) + + +@task +def fetch_data( + start_date: datetime, + end_date: datetime, +) -> list[dict[str, Any]]: + base_url = os.getenv("DATAMANAGER_BASE_URL", "http://localhost:8080") + response = requests.get( + f"{base_url}/equity-bars", + params={ + "start_date": start_date.date().isoformat(), + "end_date": end_date.date().isoformat(), + }, + timeout=30, + ) + response.raise_for_status() + + buffer = pa.py_buffer(response.content) + reader = pa.ipc.RecordBatchStreamReader(buffer) + table = reader.read_all() + + data = pl.DataFrame(pl.from_arrow(table)) + + data = data.with_columns( + [ + pl.col("t").cast(pl.Datetime).alias("timestamp"), + pl.col("o").alias("open_price"), + pl.col("h").alias("high_price"), + pl.col("l").alias("low_price"), + pl.col("c").alias("close_price"), + pl.col("v").alias("volume"), + pl.col("vw").alias("volume_weighted_average_price"), + pl.col("T").alias("ticker"), + ] + ).select( + [ + "timestamp", + "open_price", + "high_price", + "low_price", + "close_price", + "volume", + "volume_weighted_average_price", + "ticker", + ] + ) + + return data.to_dicts() + + +@task +def train_model( + data: list[dict[str, Any]], + epochs: int = 100, +) -> bytes: + if not data: + msg = "No data provided for training" + raise ValueError(msg) + + training_data = pl.DataFrame(data) + + dataset = DataSet( + batch_size=32, + sequence_length=30, + sample_count=len(training_data), + ) + dataset.load_data(training_data) + preprocessors = dataset.get_preprocessors() + + model = MiniatureTemporalFusionTransformer( + input_size=6, + hidden_size=128, + output_size=3, + layer_count=2, + ticker_count=len(training_data["ticker"].unique()), + embedding_size=32, + attention_head_count=4, + means_by_ticker=preprocessors["means_by_ticker"], + standard_deviations_by_ticker=preprocessors["standard_deviations_by_ticker"], + ticker_encoder=preprocessors["ticker_encoder"], + dropout_rate=0.1, + ) + + losses = model.train(dataset, epochs, learning_rate=0.001) + + for epoch, loss in enumerate(losses): + if epoch % 10 == 0: + logger.info(f"Epoch 
{epoch}, Loss: {loss}") + + with tempfile.NamedTemporaryFile( + suffix=".safetensor", + delete=False, + ) as temporary_file: + safe_save(get_state_dict(model), temporary_file.name) + temporary_file.seek(0) + model_bytes = temporary_file.read() + + return model_bytes # noqa: RET504 + + +@task +def store_model(model_bytes: bytes) -> str: + bucket_path = os.getenv("MODEL_BUCKET", "/tmp") # noqa: S108 + filename = f"miniature_temporal_fusion_transformer-{uuid.uuid4().hex}.safetensor" + path = Path(bucket_path) / filename + path.write_bytes(model_bytes) + return str(path) + + +@workflow +def training_workflow( + start_date: datetime, + end_date: datetime, + epochs: int = 100, +) -> str: + data = fetch_data(start_date=start_date, end_date=end_date) + model_bytes = train_model(data=cast("list[dict[str, Any]]", data), epochs=epochs) + artifact_path = store_model(model_bytes=cast("bytes", model_bytes)) + return cast("str", artifact_path) From b870adfd67ee0e5976ef2b7f9fa0cc2a5cddda92 Mon Sep 17 00:00:00 2001 From: chrisaddy Date: Wed, 4 Jun 2025 21:03:44 -0400 Subject: [PATCH 04/17] rebasing master --- application/positionmanager/src/positionmanager/clients.py | 4 +--- .../predictionengine/tests/test_long_short_term_memory.py | 2 +- .../predictionengine/tests/test_multi_head_self_attention.py | 2 +- 3 files changed, 3 insertions(+), 5 deletions(-) diff --git a/application/positionmanager/src/positionmanager/clients.py b/application/positionmanager/src/positionmanager/clients.py index 423c85754..9616f564c 100644 --- a/application/positionmanager/src/positionmanager/clients.py +++ b/application/positionmanager/src/positionmanager/clients.py @@ -89,9 +89,7 @@ def get_data( message = f"Data manager service call error: {err}" raise RuntimeError(message) from err - if response.status_code == requests.codes.not_found: - return pl.DataFrame() - if response.status_code != requests.codes.ok: + if not response.ok: message = f"Data service error: {response.text}, status code: {response.status_code}" # noqa: E501 raise requests.HTTPError( message, diff --git a/application/predictionengine/tests/test_long_short_term_memory.py b/application/predictionengine/tests/test_long_short_term_memory.py index f0d59d4a9..2e08fbba9 100644 --- a/application/predictionengine/tests/test_long_short_term_memory.py +++ b/application/predictionengine/tests/test_long_short_term_memory.py @@ -52,7 +52,7 @@ def test_lstm_different_sequence_lengths() -> None: input_tensor = Tensor(rng.standard_normal((2, seq_len, 8))) output, hidden_state = lstm.forward(input_tensor) - assert output.shape == (2, sequence_length, 16) + assert output.shape == (2, seq_len, 16) def test_lstm_multiple_layers() -> None: diff --git a/application/predictionengine/tests/test_multi_head_self_attention.py b/application/predictionengine/tests/test_multi_head_self_attention.py index 2872c9a38..692f99b2c 100644 --- a/application/predictionengine/tests/test_multi_head_self_attention.py +++ b/application/predictionengine/tests/test_multi_head_self_attention.py @@ -61,7 +61,7 @@ def test_multi_head_attention_longer_sequences() -> None: input_tensor = Tensor(rng.standard_normal((1, seq_len, 64))) output, _ = attention.forward(input_tensor) - assert output.shape == (1, sequence_length, 64) + assert output.shape == (1, seq_len, 64) def test_multi_head_attention_batch_processing() -> None: From 3b05fa47be92e0faf41bc78313d1bd0f51d40a7d Mon Sep 17 00:00:00 2001 From: John Forstmeier Date: Wed, 4 Jun 2025 21:16:03 -0400 Subject: [PATCH 05/17] Various fixes --- 
application/positionmanager/pyproject.toml | 1 + application/positionmanager/src/positionmanager/clients.py | 4 +--- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/application/positionmanager/pyproject.toml b/application/positionmanager/pyproject.toml index 0dac627de..303749482 100644 --- a/application/positionmanager/pyproject.toml +++ b/application/positionmanager/pyproject.toml @@ -14,6 +14,7 @@ dependencies = [ "pyportfolioopt>=1.5.6", "ecos>=2.0.14", "prometheus-fastapi-instrumentator>=7.1.0", + "pyarrow>=20.0.0", ] [tool.hatch.build.targets.wheel] diff --git a/application/positionmanager/src/positionmanager/clients.py b/application/positionmanager/src/positionmanager/clients.py index 9616f564c..ac5c5a91f 100644 --- a/application/positionmanager/src/positionmanager/clients.py +++ b/application/positionmanager/src/positionmanager/clients.py @@ -91,9 +91,7 @@ def get_data( if not response.ok: message = f"Data service error: {response.text}, status code: {response.status_code}" # noqa: E501 - raise requests.HTTPError( - message, - ) + raise requests.HTTPError(message) buffer = pa.py_buffer(response.content) reader = pa.ipc.RecordBatchStreamReader(buffer) From c329c4ba07a598b014bbc9bfe450e2db977237a5 Mon Sep 17 00:00:00 2001 From: John Forstmeier Date: Wed, 4 Jun 2025 22:40:59 -0400 Subject: [PATCH 06/17] Add basic tests for datamanager --- .../tests/test_datamanager_main.py | 93 +++++++++++++ .../tests/test_datamanager_models.py | 131 ++++++++++++++++++ 2 files changed, 224 insertions(+) create mode 100644 application/datamanager/tests/test_datamanager_main.py create mode 100644 application/datamanager/tests/test_datamanager_models.py diff --git a/application/datamanager/tests/test_datamanager_main.py b/application/datamanager/tests/test_datamanager_main.py new file mode 100644 index 000000000..268d2e3b3 --- /dev/null +++ b/application/datamanager/tests/test_datamanager_main.py @@ -0,0 +1,93 @@ +import unittest +from datetime import date +from unittest.mock import MagicMock, patch + +from fastapi import status +from fastapi.testclient import TestClient + +from application.datamanager.src.datamanager.main import application +from application.datamanager.src.datamanager.models import BarsSummary, SummaryDate + +client = TestClient(application) + + +def test_health_check() -> None: + response = client.get("/health") + assert response.status_code == status.HTTP_200_OK + + +class TestDataManagerModels(unittest.TestCase): + def test_summary_date_default(self) -> None: + summary_date = SummaryDate() + assert isinstance(summary_date.date, date) + + def test_summary_date_with_date(self) -> None: + test_date = date(2023, 1, 1) + summary_date = SummaryDate(date=test_date) + assert summary_date.date == test_date + + def test_summary_date_string_parsing(self) -> None: + summary_date = SummaryDate(date="2023-01-01") # type: ignore + assert summary_date.date == date(2023, 1, 1) + + def test_bars_summary_creation(self) -> None: + bars_summary = BarsSummary(date="2023-01-01", count=100) + assert bars_summary.date == "2023-01-01" + assert bars_summary.count == 100 # noqa: PLR2004 + + +class TestEquityBarsEndpoints(unittest.TestCase): + def test_get_equity_bars_missing_parameters(self) -> None: + response = client.get("/equity-bars") + assert response.status_code == status.HTTP_422_UNPROCESSABLE_ENTITY + + def test_get_equity_bars_invalid_date_format(self) -> None: + response = client.get( + "/equity-bars", + params={"start_date": "invalid-date", "end_date": "2023-01-02"}, + ) + assert 
response.status_code == status.HTTP_422_UNPROCESSABLE_ENTITY + + def test_post_equity_bars_missing_body(self) -> None: + response = client.post("/equity-bars") + assert response.status_code == status.HTTP_422_UNPROCESSABLE_ENTITY + + def test_post_equity_bars_invalid_date(self) -> None: + response = client.post("/equity-bars", json={"date": "invalid-date"}) + assert response.status_code == status.HTTP_422_UNPROCESSABLE_ENTITY + + def test_delete_equity_bars_missing_body(self) -> None: + response = client.request("DELETE", "/equity-bars") + assert response.status_code == status.HTTP_422_UNPROCESSABLE_ENTITY + + def test_delete_equity_bars_invalid_date(self) -> None: + response = client.request( + "DELETE", "/equity-bars", json={"date": "invalid-date"} + ) + assert response.status_code == status.HTTP_422_UNPROCESSABLE_ENTITY + + @patch("application.datamanager.src.datamanager.main.duckdb") + def test_get_equity_bars_database_error(self, mock_duckdb: MagicMock) -> None: + from duckdb import IOException + + mock_connection = MagicMock() + mock_connection.execute.side_effect = IOException("Database error") + mock_duckdb.connect.return_value = mock_connection + + mock_settings = MagicMock() + mock_settings.gcp.bucket.name = "test-bucket" + + with patch.object(application, "state") as mock_app_state: + mock_app_state.connection = mock_connection + mock_app_state.settings = mock_settings + + response = client.get( + "/equity-bars", + params={"start_date": "2023-01-01", "end_date": "2023-01-02"}, + ) + + assert response.status_code == status.HTTP_500_INTERNAL_SERVER_ERROR + + +if __name__ == "__main__": + unittest.main() diff --git a/application/datamanager/tests/test_datamanager_models.py b/application/datamanager/tests/test_datamanager_models.py new file mode 100644 index 000000000..7ccbbf64e --- /dev/null +++ b/application/datamanager/tests/test_datamanager_models.py @@ -0,0 +1,131 @@ +import unittest +from datetime import date + +import pytest +from pydantic import ValidationError + +from application.datamanager.src.datamanager.models import ( + BarsSummary, + DateRange, + SummaryDate, +) + + +class TestSummaryDate(unittest.TestCase): + def test_summary_date_initialization_default(self) -> None: + summary_date = SummaryDate() + assert isinstance(summary_date.date, date) + + def test_summary_date_initialization_with_date(self) -> None: + test_date = date(2023, 5, 15) + summary_date = SummaryDate(date=test_date) + assert summary_date.date == test_date + + def test_summary_date_string_parsing_iso_format(self) -> None: + summary_date = SummaryDate(date="2023-5-15") # type: ignore + assert summary_date.date == date(2023, 5, 15) + + def test_summary_date_string_parsing_slash_format(self) -> None: + summary_date = SummaryDate(date="2023/05/15") # type: ignore + assert summary_date.date == date(2023, 5, 15) + + def test_summary_date_invalid_format(self) -> None: + with pytest.raises(ValidationError, match="Invalid date format"): + SummaryDate(date="invalid-date") # type: ignore + + def test_summary_date_invalid_date_values(self) -> None: + with pytest.raises(ValidationError): + SummaryDate(date="2023-13-01") # type: ignore + + def test_summary_date_json_encoder(self) -> None: + test_date = date(2023, 5, 15) + summary_date = SummaryDate(date=test_date) + json_data = summary_date.model_dump(mode="json") + assert json_data["date"] == "2023/05/15" + + +class TestDateRange(unittest.TestCase): + def test_date_range_valid(self) -> None: + start_date = date(2023, 1, 1) + end_date = date(2023, 12, 31) + 
date_range = DateRange(start=start_date, end=end_date) + + assert date_range.start == start_date + assert date_range.end == end_date + + def test_date_range_same_dates(self) -> None: + same_date = date(2023, 5, 15) + with pytest.raises(ValidationError, match="End date must be after start date"): + DateRange(start=same_date, end=same_date) + + def test_date_range_end_before_start(self) -> None: + start_date = date(2023, 12, 31) + end_date = date(2023, 1, 1) + with pytest.raises(ValidationError, match="End date must be after start date"): + DateRange(start=start_date, end=end_date) + + def test_date_range_valid_one_day_apart(self) -> None: + start_date = date(2023, 5, 15) + end_date = date(2023, 5, 16) + date_range = DateRange(start=start_date, end=end_date) + + assert date_range.start == start_date + assert date_range.end == end_date + + +class TestBarsSummary(unittest.TestCase): + def test_bars_summary_initialization(self) -> None: + bars_summary = BarsSummary(date="2023-05-15", count=1500) + + assert bars_summary.date == "2023-05-15" + assert bars_summary.count == 1500 # noqa: PLR2004 + + def test_bars_summary_zero_count(self) -> None: + bars_summary = BarsSummary(date="2023-05-15", count=0) + + assert bars_summary.date == "2023-05-15" + assert bars_summary.count == 0 + + def test_bars_summary_negative_count(self) -> None: + bars_summary = BarsSummary(date="2023-05-15", count=-1) + + assert bars_summary.date == "2023-05-15" + assert bars_summary.count == -1 + + def test_bars_summary_json_serialization(self) -> None: + bars_summary = BarsSummary(date="2023-05-15", count=1500) + json_data = bars_summary.model_dump() + + assert json_data == {"date": "2023-05-15", "count": 1500} + + def test_bars_summary_from_dict(self) -> None: + data = {"date": "2023-05-15", "count": 1500} + bars_summary = BarsSummary.model_validate(data) + + assert bars_summary.date == "2023-05-15" + assert bars_summary.count == 1500 # noqa: PLR2004 + + +class TestModelIntegration(unittest.TestCase): + def test_summary_date_to_bars_summary(self) -> None: + summary_date = SummaryDate(date="2023-05-15") # type: ignore + bars_summary = BarsSummary( + date=summary_date.date.strftime("%Y-%m-%d"), count=100 + ) + + assert bars_summary.date == "2023-05-15" + assert bars_summary.count == 100 # noqa: PLR2004 + + def test_multiple_model_validation(self) -> None: + summary_date = SummaryDate(date="2023-05-15") # type: ignore + date_range = DateRange(start=date(2023, 1, 1), end=date(2023, 12, 31)) + bars_summary = BarsSummary(date="2023-05-15", count=1000) + + assert summary_date.date == date(2023, 5, 15) + assert date_range.start == date(2023, 1, 1) + assert date_range.end == date(2023, 12, 31) + assert bars_summary.count == 1000 # noqa: PLR2004 + + +if __name__ == "__main__": + unittest.main() From f872aae4b09820dacbd82ca385604334b00f0ff8 Mon Sep 17 00:00:00 2001 From: John Forstmeier Date: Fri, 6 Jun 2025 10:05:00 -0400 Subject: [PATCH 07/17] Remove Flox environment variables to fix breaking tests --- .flox/env/manifest.toml | 8 -------- 1 file changed, 8 deletions(-) diff --git a/.flox/env/manifest.toml b/.flox/env/manifest.toml index 993d21e53..82807107a 100644 --- a/.flox/env/manifest.toml +++ b/.flox/env/manifest.toml @@ -12,14 +12,6 @@ nushell.pkg-path = "nushell" fd.pkg-path = "fd" fselect.pkg-path = "fselect" -[vars] -ALPACA_API_KEY = "${ALPACA_API_KEY}" -ALPACA_API_SECRET = "${ALPACA_API_SECRET}" -POLYGON_API_KEY = "${POLYGON_API_KEY}" -DATA_BUCKET = "${DATA_BUCKET}" -DUCKDB_ACCESS_KEY = "${DUCKDB_ACCESS_KEY}" 
-DUCKDB_SECRET = "${DUCKDB_SECRET}" - [options] systems = [ "aarch64-darwin", From 14a698431f29c576ff08193ad74ad65572a58d0a Mon Sep 17 00:00:00 2001 From: chrisaddy Date: Fri, 6 Jun 2025 10:26:11 -0400 Subject: [PATCH 08/17] fix some linting issues fix linting From 43af2daab0fbb1771419c9690debc5feaa243c90 Mon Sep 17 00:00:00 2001 From: John Forstmeier Date: Mon, 2 Jun 2025 21:26:45 -0400 Subject: [PATCH 09/17] Fix inter-service communication patterns --- .../positionmanager/src/positionmanager/clients.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/application/positionmanager/src/positionmanager/clients.py b/application/positionmanager/src/positionmanager/clients.py index ac5c5a91f..a8a1af54f 100644 --- a/application/positionmanager/src/positionmanager/clients.py +++ b/application/positionmanager/src/positionmanager/clients.py @@ -89,9 +89,12 @@ def get_data( message = f"Data manager service call error: {err}" raise RuntimeError(message) from err - if not response.ok: - message = f"Data service error: {response.text}, status code: {response.status_code}" # noqa: E501 - raise requests.HTTPError(message) + if response.status_code == 404: + return pl.DataFrame() + elif response.status_code != 200: + raise Exception( + f"Data service error: {response.text}, status code: {response.status_code}", + ) buffer = pa.py_buffer(response.content) reader = pa.ipc.RecordBatchStreamReader(buffer) From 0be3104aec043051b6fa1c2e8cff5e41c6416914 Mon Sep 17 00:00:00 2001 From: John Forstmeier Date: Wed, 4 Jun 2025 20:36:37 -0400 Subject: [PATCH 10/17] Integrate predictionengine into the workflows --- .../src/positionmanager/clients.py | 9 ++-- .../src/predictionengine/main.py | 5 +- .../src/predictionengine/post_processor.py | 6 ++- .../tests/test_long_short_term_memory.py | 2 +- .../tests/test_multi_head_self_attention.py | 2 +- workflows/prediction_model.py | 48 ------------------- 6 files changed, 14 insertions(+), 58 deletions(-) delete mode 100644 workflows/prediction_model.py diff --git a/application/positionmanager/src/positionmanager/clients.py b/application/positionmanager/src/positionmanager/clients.py index a8a1af54f..423c85754 100644 --- a/application/positionmanager/src/positionmanager/clients.py +++ b/application/positionmanager/src/positionmanager/clients.py @@ -89,11 +89,12 @@ def get_data( message = f"Data manager service call error: {err}" raise RuntimeError(message) from err - if response.status_code == 404: + if response.status_code == requests.codes.not_found: return pl.DataFrame() - elif response.status_code != 200: - raise Exception( - f"Data service error: {response.text}, status code: {response.status_code}", + if response.status_code != requests.codes.ok: + message = f"Data service error: {response.text}, status code: {response.status_code}" # noqa: E501 + raise requests.HTTPError( + message, ) buffer = pa.py_buffer(response.content) diff --git a/application/predictionengine/src/predictionengine/main.py b/application/predictionengine/src/predictionengine/main.py index bf0861288..9da95c7f8 100644 --- a/application/predictionengine/src/predictionengine/main.py +++ b/application/predictionengine/src/predictionengine/main.py @@ -88,8 +88,9 @@ def load_or_initialize_model(data: pl.DataFrame) -> MiniatureTemporalFusionTrans try: model.load(model_path) logger.info("Loaded existing model weights") - except LoadError as e: - logger.error(f"Failed to load model weights: {e}") + except Exception as e: # noqa: BLE001 + logger.warning(f"Failed to load model weights: 
{e}") + logger.warning(f"Failed to load model weights: {e}") return model diff --git a/application/predictionengine/src/predictionengine/post_processor.py b/application/predictionengine/src/predictionengine/post_processor.py index 7f07c2409..f82ab737c 100644 --- a/application/predictionengine/src/predictionengine/post_processor.py +++ b/application/predictionengine/src/predictionengine/post_processor.py @@ -1,3 +1,5 @@ +from typing import Any + import numpy as np import numpy.typing as npt import polars as pl @@ -44,8 +46,8 @@ def post_process_predictions( ticker not in self.means_by_ticker or ticker not in self.standard_deviations_by_ticker ): - msg = f"Statistics not found for ticker: {ticker}" - raise ValueError(msg) + message = f"Statistics not found for ticker: {ticker}" + raise ValueError(message) mean = self.means_by_ticker[ticker].numpy() standard_deviation = self.standard_deviations_by_ticker[ticker].numpy() diff --git a/application/predictionengine/tests/test_long_short_term_memory.py b/application/predictionengine/tests/test_long_short_term_memory.py index 2e08fbba9..f0d59d4a9 100644 --- a/application/predictionengine/tests/test_long_short_term_memory.py +++ b/application/predictionengine/tests/test_long_short_term_memory.py @@ -52,7 +52,7 @@ def test_lstm_different_sequence_lengths() -> None: input_tensor = Tensor(rng.standard_normal((2, seq_len, 8))) output, hidden_state = lstm.forward(input_tensor) - assert output.shape == (2, seq_len, 16) + assert output.shape == (2, sequence_length, 16) def test_lstm_multiple_layers() -> None: diff --git a/application/predictionengine/tests/test_multi_head_self_attention.py b/application/predictionengine/tests/test_multi_head_self_attention.py index 692f99b2c..2872c9a38 100644 --- a/application/predictionengine/tests/test_multi_head_self_attention.py +++ b/application/predictionengine/tests/test_multi_head_self_attention.py @@ -61,7 +61,7 @@ def test_multi_head_attention_longer_sequences() -> None: input_tensor = Tensor(rng.standard_normal((1, seq_len, 64))) output, _ = attention.forward(input_tensor) - assert output.shape == (1, seq_len, 64) + assert output.shape == (1, sequence_length, 64) def test_multi_head_attention_batch_processing() -> None: diff --git a/workflows/prediction_model.py b/workflows/prediction_model.py deleted file mode 100644 index c381e0c05..000000000 --- a/workflows/prediction_model.py +++ /dev/null @@ -1,48 +0,0 @@ -import os -import pickle -import statistics -import uuid -from datetime import datetime -from pathlib import Path -from typing import Any - -import requests -from flytekit import task, workflow - - -@task -def fetch_data(start_date: datetime, end_date: datetime) -> list[dict[str, Any]]: - base_url = os.getenv("DATAMANAGER_BASE_URL", "http://localhost:8080") - response = requests.get( - f"{base_url}/equity-bars", - params={"start_date": start_date.isoformat(), "end_date": end_date.isoformat()}, - timeout=10, - ) - response.raise_for_status() - return response.json().get("data", []) - - -@task -def train_dummy_model(data: list[dict[str, Any]]) -> bytes: - """Train a trivial model that stores the average close price.""" - close_prices = [row.get("close_price", 0.0) for row in data] - mean_close = statistics.mean(close_prices) if close_prices else 0.0 - model = {"average_close_price": mean_close} - return pickle.dumps(model) - - -@task -def store_model(model_bytes: bytes) -> str: - """Store the serialized model in blob storage.""" - bucket_path = os.getenv("MODEL_BUCKET") - filename = 
f"model-{uuid.uuid4().hex}.pkl" - path = Path(bucket_path) / filename - path.write_bytes(model_bytes) - return str(path) - - -@workflow -def training_workflow(start_date: datetime, end_date: datetime) -> None: - data = fetch_data(start_date=start_date, end_date=end_date) - model_bytes = train_dummy_model(data=data) - store_model(model_bytes=model_bytes) From 7a2cb67f119e5045e48094678f714d92a58fe7c6 Mon Sep 17 00:00:00 2001 From: chrisaddy Date: Fri, 6 Jun 2025 10:26:11 -0400 Subject: [PATCH 11/17] fix some linting issues fix linting From 3a43f9867a0c2e11c927dd9f7e20f88d982fd9f8 Mon Sep 17 00:00:00 2001 From: chrisaddy Date: Fri, 6 Jun 2025 10:42:06 -0400 Subject: [PATCH 12/17] fix linting --- application/positionmanager/pyproject.toml | 2 +- .../predictionengine/src/predictionengine/post_processor.py | 2 -- .../predictionengine/tests/test_long_short_term_memory.py | 2 +- .../predictionengine/tests/test_multi_head_self_attention.py | 2 +- infrastructure/pyproject.toml | 2 +- pyproject.toml | 2 +- workflows/pyproject.toml | 2 +- 7 files changed, 6 insertions(+), 8 deletions(-) diff --git a/application/positionmanager/pyproject.toml b/application/positionmanager/pyproject.toml index 2d1bf4840..f4ba3d169 100644 --- a/application/positionmanager/pyproject.toml +++ b/application/positionmanager/pyproject.toml @@ -2,7 +2,7 @@ name = "positionmanager" version = "0.1.0" description = "Position management service" -requires-python = "==3.13" +requires-python = "==3.12.10" dependencies = [ "fastapi>=0.115.12", "uvicorn>=0.34.2", diff --git a/application/predictionengine/src/predictionengine/post_processor.py b/application/predictionengine/src/predictionengine/post_processor.py index f82ab737c..3d832159d 100644 --- a/application/predictionengine/src/predictionengine/post_processor.py +++ b/application/predictionengine/src/predictionengine/post_processor.py @@ -1,5 +1,3 @@ -from typing import Any - import numpy as np import numpy.typing as npt import polars as pl diff --git a/application/predictionengine/tests/test_long_short_term_memory.py b/application/predictionengine/tests/test_long_short_term_memory.py index f0d59d4a9..2e08fbba9 100644 --- a/application/predictionengine/tests/test_long_short_term_memory.py +++ b/application/predictionengine/tests/test_long_short_term_memory.py @@ -52,7 +52,7 @@ def test_lstm_different_sequence_lengths() -> None: input_tensor = Tensor(rng.standard_normal((2, seq_len, 8))) output, hidden_state = lstm.forward(input_tensor) - assert output.shape == (2, sequence_length, 16) + assert output.shape == (2, seq_len, 16) def test_lstm_multiple_layers() -> None: diff --git a/application/predictionengine/tests/test_multi_head_self_attention.py b/application/predictionengine/tests/test_multi_head_self_attention.py index 2872c9a38..692f99b2c 100644 --- a/application/predictionengine/tests/test_multi_head_self_attention.py +++ b/application/predictionengine/tests/test_multi_head_self_attention.py @@ -61,7 +61,7 @@ def test_multi_head_attention_longer_sequences() -> None: input_tensor = Tensor(rng.standard_normal((1, seq_len, 64))) output, _ = attention.forward(input_tensor) - assert output.shape == (1, sequence_length, 64) + assert output.shape == (1, seq_len, 64) def test_multi_head_attention_batch_processing() -> None: diff --git a/infrastructure/pyproject.toml b/infrastructure/pyproject.toml index 3f854f5ca..675653569 100644 --- a/infrastructure/pyproject.toml +++ b/infrastructure/pyproject.toml @@ -1,7 +1,7 @@ [project] name = "infrastructure" version = "20250602.4" 
-requires-python = ">=3.13" +requires-python = "==3.12.10" dependencies = [ "pulumi>=3.169.0", "pulumi-gcp>=8.30.1", diff --git a/pyproject.toml b/pyproject.toml index 0ee81df90..89530f844 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -2,7 +2,7 @@ name = "pocketsizefund" version = "20250602.4" description = "Open source quantitative hedge fund 🍊" -requires-python = ">=3.12" +requires-python = "==3.12.10" dependencies = [ "flytekit>=1.15.4", "httpx>=0.28.1", diff --git a/workflows/pyproject.toml b/workflows/pyproject.toml index ff5006991..2b0e6010f 100644 --- a/workflows/pyproject.toml +++ b/workflows/pyproject.toml @@ -2,7 +2,7 @@ name = "workflows" description = "Data and model workflows" version = "0.1.0" -requires-python = ">=3.13" +requires-python = "==3.12.10" dependencies = [ "flytekit>=1.10.0", "httpx>=0.28.1", From a75adf06bcb75fbc8e1fe74b493f4e3d2a29239f Mon Sep 17 00:00:00 2001 From: John Forstmeier Date: Mon, 2 Jun 2025 21:26:45 -0400 Subject: [PATCH 13/17] Fix inter-service communication patterns --- .../positionmanager/src/positionmanager/clients.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/application/positionmanager/src/positionmanager/clients.py b/application/positionmanager/src/positionmanager/clients.py index ac5c5a91f..a8a1af54f 100644 --- a/application/positionmanager/src/positionmanager/clients.py +++ b/application/positionmanager/src/positionmanager/clients.py @@ -89,9 +89,12 @@ def get_data( message = f"Data manager service call error: {err}" raise RuntimeError(message) from err - if not response.ok: - message = f"Data service error: {response.text}, status code: {response.status_code}" # noqa: E501 - raise requests.HTTPError(message) + if response.status_code == 404: + return pl.DataFrame() + elif response.status_code != 200: + raise Exception( + f"Data service error: {response.text}, status code: {response.status_code}", + ) buffer = pa.py_buffer(response.content) reader = pa.ipc.RecordBatchStreamReader(buffer) From 494b0a506abebef8a6519abd67aaa987199463c1 Mon Sep 17 00:00:00 2001 From: John Forstmeier Date: Wed, 4 Jun 2025 20:36:37 -0400 Subject: [PATCH 14/17] Integrate predictionengine into the workflows --- .../src/positionmanager/clients.py | 9 ++-- .../src/predictionengine/main.py | 5 +- .../src/predictionengine/post_processor.py | 6 ++- .../predictionengine/tests/test_dataset.py | 16 ++----- .../tests/test_long_short_term_memory.py | 2 +- .../tests/test_multi_head_self_attention.py | 2 +- workflows/prediction_model.py | 48 ------------------- 7 files changed, 18 insertions(+), 70 deletions(-) delete mode 100644 workflows/prediction_model.py diff --git a/application/positionmanager/src/positionmanager/clients.py b/application/positionmanager/src/positionmanager/clients.py index a8a1af54f..423c85754 100644 --- a/application/positionmanager/src/positionmanager/clients.py +++ b/application/positionmanager/src/positionmanager/clients.py @@ -89,11 +89,12 @@ def get_data( message = f"Data manager service call error: {err}" raise RuntimeError(message) from err - if response.status_code == 404: + if response.status_code == requests.codes.not_found: return pl.DataFrame() - elif response.status_code != 200: - raise Exception( - f"Data service error: {response.text}, status code: {response.status_code}", + if response.status_code != requests.codes.ok: + message = f"Data service error: {response.text}, status code: {response.status_code}" # noqa: E501 + raise requests.HTTPError( + message, ) buffer = pa.py_buffer(response.content) 
diff --git a/application/predictionengine/src/predictionengine/main.py b/application/predictionengine/src/predictionengine/main.py
index bf0861288..9da95c7f8 100644
--- a/application/predictionengine/src/predictionengine/main.py
+++ b/application/predictionengine/src/predictionengine/main.py
@@ -88,8 +88,8 @@ def load_or_initialize_model(data: pl.DataFrame) -> MiniatureTemporalFusionTrans
     try:
         model.load(model_path)
         logger.info("Loaded existing model weights")
-    except LoadError as e:
-        logger.error(f"Failed to load model weights: {e}")
+    except Exception as e:  # noqa: BLE001
+        logger.warning(f"Failed to load model weights: {e}")
 
     return model
 
diff --git a/application/predictionengine/src/predictionengine/post_processor.py b/application/predictionengine/src/predictionengine/post_processor.py
index 7f07c2409..f82ab737c 100644
--- a/application/predictionengine/src/predictionengine/post_processor.py
+++ b/application/predictionengine/src/predictionengine/post_processor.py
@@ -1,3 +1,5 @@
+from typing import Any
+
 import numpy as np
 import numpy.typing as npt
 import polars as pl
@@ -44,8 +46,8 @@ def post_process_predictions(
             ticker not in self.means_by_ticker
             or ticker not in self.standard_deviations_by_ticker
         ):
-            msg = f"Statistics not found for ticker: {ticker}"
-            raise ValueError(msg)
+            message = f"Statistics not found for ticker: {ticker}"
+            raise ValueError(message)
 
         mean = self.means_by_ticker[ticker].numpy()
         standard_deviation = self.standard_deviations_by_ticker[ticker].numpy()
diff --git a/application/predictionengine/tests/test_dataset.py b/application/predictionengine/tests/test_dataset.py
index 9d3b804c0..64f8cf20a 100644
--- a/application/predictionengine/tests/test_dataset.py
+++ b/application/predictionengine/tests/test_dataset.py
@@ -13,18 +13,10 @@ def test_dataset_initialization() -> None:
         sample_count=3,
     )
 
-    class Expected(NamedTuple):
-        batch_size: int = 2
-        sequence_length: int = 3
-        sample_count: int = 3
-        observations: int = 2
-
-    expected = Expected()
-
-    assert dataset.batch_size == expected.batch_size
-    assert dataset.sequence_length == expected.sequence_length
-    assert dataset.sample_count == expected.sample_count
-    assert len(dataset) == expected.observations
+    assert dataset.batch_size == 2  # noqa: PLR2004
+    assert dataset.sequence_length == 3  # noqa: PLR2004
+    assert dataset.sample_count == 3  # noqa: PLR2004
+    assert len(dataset) == 2  # noqa: PLR2004
 
 
 def test_dataset_load_data() -> None:
diff --git a/application/predictionengine/tests/test_long_short_term_memory.py b/application/predictionengine/tests/test_long_short_term_memory.py
index 2e08fbba9..f0d59d4a9 100644
--- a/application/predictionengine/tests/test_long_short_term_memory.py
+++ b/application/predictionengine/tests/test_long_short_term_memory.py
@@ -52,7 +52,7 @@ def test_lstm_different_sequence_lengths() -> None:
         input_tensor = Tensor(rng.standard_normal((2, seq_len, 8)))
         output, hidden_state = lstm.forward(input_tensor)
 
-        assert output.shape == (2, seq_len, 16)
+        assert output.shape == (2, sequence_length, 16)
 
 
 def test_lstm_multiple_layers() -> None:
diff --git a/application/predictionengine/tests/test_multi_head_self_attention.py b/application/predictionengine/tests/test_multi_head_self_attention.py
index 692f99b2c..2872c9a38 100644
--- a/application/predictionengine/tests/test_multi_head_self_attention.py
+++ b/application/predictionengine/tests/test_multi_head_self_attention.py
@@ -61,7 +61,7 @@ def test_multi_head_attention_longer_sequences()
-> None: input_tensor = Tensor(rng.standard_normal((1, seq_len, 64))) output, _ = attention.forward(input_tensor) - assert output.shape == (1, seq_len, 64) + assert output.shape == (1, sequence_length, 64) def test_multi_head_attention_batch_processing() -> None: diff --git a/workflows/prediction_model.py b/workflows/prediction_model.py deleted file mode 100644 index c381e0c05..000000000 --- a/workflows/prediction_model.py +++ /dev/null @@ -1,48 +0,0 @@ -import os -import pickle -import statistics -import uuid -from datetime import datetime -from pathlib import Path -from typing import Any - -import requests -from flytekit import task, workflow - - -@task -def fetch_data(start_date: datetime, end_date: datetime) -> list[dict[str, Any]]: - base_url = os.getenv("DATAMANAGER_BASE_URL", "http://localhost:8080") - response = requests.get( - f"{base_url}/equity-bars", - params={"start_date": start_date.isoformat(), "end_date": end_date.isoformat()}, - timeout=10, - ) - response.raise_for_status() - return response.json().get("data", []) - - -@task -def train_dummy_model(data: list[dict[str, Any]]) -> bytes: - """Train a trivial model that stores the average close price.""" - close_prices = [row.get("close_price", 0.0) for row in data] - mean_close = statistics.mean(close_prices) if close_prices else 0.0 - model = {"average_close_price": mean_close} - return pickle.dumps(model) - - -@task -def store_model(model_bytes: bytes) -> str: - """Store the serialized model in blob storage.""" - bucket_path = os.getenv("MODEL_BUCKET") - filename = f"model-{uuid.uuid4().hex}.pkl" - path = Path(bucket_path) / filename - path.write_bytes(model_bytes) - return str(path) - - -@workflow -def training_workflow(start_date: datetime, end_date: datetime) -> None: - data = fetch_data(start_date=start_date, end_date=end_date) - model_bytes = train_dummy_model(data=data) - store_model(model_bytes=model_bytes) From 973ad6b2614f62dcfde3d9fec39391c03fedc4f3 Mon Sep 17 00:00:00 2001 From: chrisaddy Date: Fri, 6 Jun 2025 10:47:13 -0400 Subject: [PATCH 15/17] remove vars --- .flox/env/manifest.toml | 8 -------- 1 file changed, 8 deletions(-) diff --git a/.flox/env/manifest.toml b/.flox/env/manifest.toml index 993d21e53..82807107a 100644 --- a/.flox/env/manifest.toml +++ b/.flox/env/manifest.toml @@ -12,14 +12,6 @@ nushell.pkg-path = "nushell" fd.pkg-path = "fd" fselect.pkg-path = "fselect" -[vars] -ALPACA_API_KEY = "${ALPACA_API_KEY}" -ALPACA_API_SECRET = "${ALPACA_API_SECRET}" -POLYGON_API_KEY = "${POLYGON_API_KEY}" -DATA_BUCKET = "${DATA_BUCKET}" -DUCKDB_ACCESS_KEY = "${DUCKDB_ACCESS_KEY}" -DUCKDB_SECRET = "${DUCKDB_SECRET}" - [options] systems = [ "aarch64-darwin", From 70739d69deebcd4c69d0b5b751cf50a0b59d7d18 Mon Sep 17 00:00:00 2001 From: John Forstmeier Date: Fri, 6 Jun 2025 10:49:34 -0400 Subject: [PATCH 16/17] Add linting fixes --- .../predictionengine/src/predictionengine/post_processor.py | 2 -- .../predictionengine/tests/test_long_short_term_memory.py | 4 ++-- .../predictionengine/tests/test_multi_head_self_attention.py | 4 ++-- 3 files changed, 4 insertions(+), 6 deletions(-) diff --git a/application/predictionengine/src/predictionengine/post_processor.py b/application/predictionengine/src/predictionengine/post_processor.py index f82ab737c..3d832159d 100644 --- a/application/predictionengine/src/predictionengine/post_processor.py +++ b/application/predictionengine/src/predictionengine/post_processor.py @@ -1,5 +1,3 @@ -from typing import Any - import numpy as np import numpy.typing as npt import polars as pl diff 
--git a/application/predictionengine/tests/test_long_short_term_memory.py b/application/predictionengine/tests/test_long_short_term_memory.py index f0d59d4a9..bebabf759 100644 --- a/application/predictionengine/tests/test_long_short_term_memory.py +++ b/application/predictionengine/tests/test_long_short_term_memory.py @@ -48,8 +48,8 @@ def test_lstm_different_sequence_lengths() -> None: input_size=8, hidden_size=16, layer_count=1, dropout_rate=0.0 ) - for seq_len in [5, 10, 20]: - input_tensor = Tensor(rng.standard_normal((2, seq_len, 8))) + for sequence_length in [5, 10, 20]: + input_tensor = Tensor(rng.standard_normal((2, sequence_length, 8))) output, hidden_state = lstm.forward(input_tensor) assert output.shape == (2, sequence_length, 16) diff --git a/application/predictionengine/tests/test_multi_head_self_attention.py b/application/predictionengine/tests/test_multi_head_self_attention.py index 2872c9a38..11a257a22 100644 --- a/application/predictionengine/tests/test_multi_head_self_attention.py +++ b/application/predictionengine/tests/test_multi_head_self_attention.py @@ -57,8 +57,8 @@ def test_multi_head_attention_single_sequence() -> None: def test_multi_head_attention_longer_sequences() -> None: attention = MultiHeadSelfAttention(heads_count=4, embedding_size=64) - for seq_len in [10, 20, 50]: - input_tensor = Tensor(rng.standard_normal((1, seq_len, 64))) + for sequence_length in [10, 20, 50]: + input_tensor = Tensor(rng.standard_normal((1, sequence_length, 64))) output, _ = attention.forward(input_tensor) assert output.shape == (1, sequence_length, 64) From e0bbcb64251662024e8d2e32202fb2f43a716785 Mon Sep 17 00:00:00 2001 From: John Forstmeier Date: Fri, 6 Jun 2025 11:07:30 -0400 Subject: [PATCH 17/17] Fix CodeRabbit feedback --- .../positionmanager/src/positionmanager/clients.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/application/positionmanager/src/positionmanager/clients.py b/application/positionmanager/src/positionmanager/clients.py index 423c85754..20094faf4 100644 --- a/application/positionmanager/src/positionmanager/clients.py +++ b/application/positionmanager/src/positionmanager/clients.py @@ -89,12 +89,13 @@ def get_data( message = f"Data manager service call error: {err}" raise RuntimeError(message) from err - if response.status_code == requests.codes.not_found: + if response.status_code == requests.codes["no_content"]: return pl.DataFrame() - if response.status_code != requests.codes.ok: + if response.status_code != requests.codes["ok"]: message = f"Data service error: {response.text}, status code: {response.status_code}" # noqa: E501 raise requests.HTTPError( message, + response=response, ) buffer = pa.py_buffer(response.content) @@ -103,7 +104,9 @@ def get_data( data = pl.DataFrame(pl.from_arrow(table)) - data = data.with_columns(pl.col("t").cast(pl.Datetime).dt.date().alias("date")) + data = data.with_columns( + pl.col("datetime").cast(pl.Datetime).dt.date().alias("date") + ) return ( data.sort("date")