diff --git a/.flox/env/manifest.lock b/.flox/env/manifest.lock
index 964e64525..ff15369b9 100644
--- a/.flox/env/manifest.lock
+++ b/.flox/env/manifest.lock
@@ -12,9 +12,6 @@
       "pulumi-python": {
         "pkg-path": "pulumiPackages.pulumi-python"
       },
-      "pulumictl": {
-        "pkg-path": "pulumictl"
-      },
       "ruff": {
         "pkg-path": "ruff"
       },
@@ -398,126 +395,6 @@
       "group": "toplevel",
       "priority": 5
     },
-    {
-      "attr_path": "pulumictl",
-      "broken": false,
-      "derivation": "/nix/store/kx43jzcfslw28byvs6h5ngsgl432pvvv-pulumictl-0.0.49.drv",
-      "description": "Swiss Army Knife for Pulumi Development",
-      "install_id": "pulumictl",
-      "license": "Apache-2.0",
-      "locked_url": "https://github.com/flox/nixpkgs?rev=979daf34c8cacebcd917d540070b52a3c2b9b16e",
-      "name": "pulumictl-0.0.49",
-      "pname": "pulumictl",
-      "rev": "979daf34c8cacebcd917d540070b52a3c2b9b16e",
-      "rev_count": 793735,
-      "rev_date": "2025-05-04T03:14:55Z",
-      "scrape_date": "2025-05-05T04:19:37.687142Z",
-      "stabilities": [
-        "staging",
-        "unstable"
-      ],
-      "unfree": false,
-      "version": "0.0.49",
-      "outputs_to_install": [
-        "out"
-      ],
-      "outputs": {
-        "out": "/nix/store/ny69c9bfkf4w179240ch45injfb2ajqr-pulumictl-0.0.49"
-      },
-      "system": "aarch64-darwin",
-      "group": "toplevel",
-      "priority": 5
-    },
-    {
-      "attr_path": "pulumictl",
-      "broken": false,
-      "derivation": "/nix/store/www9nfncvv7l339n8dks22x5vs5lz1mk-pulumictl-0.0.49.drv",
-      "description": "Swiss Army Knife for Pulumi Development",
-      "install_id": "pulumictl",
-      "license": "Apache-2.0",
-      "locked_url": "https://github.com/flox/nixpkgs?rev=979daf34c8cacebcd917d540070b52a3c2b9b16e",
-      "name": "pulumictl-0.0.49",
-      "pname": "pulumictl",
-      "rev": "979daf34c8cacebcd917d540070b52a3c2b9b16e",
-      "rev_count": 793735,
-      "rev_date": "2025-05-04T03:14:55Z",
-      "scrape_date": "2025-05-05T04:37:42.118866Z",
-      "stabilities": [
-        "staging",
-        "unstable"
-      ],
-      "unfree": false,
-      "version": "0.0.49",
-      "outputs_to_install": [
-        "out"
-      ],
-      "outputs": {
-        "out": "/nix/store/xpdh5dijdki4cngh7k7n4rg84i6c28zs-pulumictl-0.0.49"
-      },
-      "system": "aarch64-linux",
-      "group": "toplevel",
-      "priority": 5
-    },
-    {
-      "attr_path": "pulumictl",
-      "broken": false,
-      "derivation": "/nix/store/17wf5x1kk3v5ch5npwhamnix629y07wg-pulumictl-0.0.49.drv",
-      "description": "Swiss Army Knife for Pulumi Development",
-      "install_id": "pulumictl",
-      "license": "Apache-2.0",
-      "locked_url": "https://github.com/flox/nixpkgs?rev=979daf34c8cacebcd917d540070b52a3c2b9b16e",
-      "name": "pulumictl-0.0.49",
-      "pname": "pulumictl",
-      "rev": "979daf34c8cacebcd917d540070b52a3c2b9b16e",
-      "rev_count": 793735,
-      "rev_date": "2025-05-04T03:14:55Z",
-      "scrape_date": "2025-05-05T04:54:38.447587Z",
-      "stabilities": [
-        "staging",
-        "unstable"
-      ],
-      "unfree": false,
-      "version": "0.0.49",
-      "outputs_to_install": [
-        "out"
-      ],
-      "outputs": {
-        "out": "/nix/store/6wmig1w7f3vmfrlyg2qzv21bvacj3as8-pulumictl-0.0.49"
-      },
-      "system": "x86_64-darwin",
-      "group": "toplevel",
-      "priority": 5
-    },
-    {
-      "attr_path": "pulumictl",
-      "broken": false,
-      "derivation": "/nix/store/ib7hqxg7xdf5kyh78jqggzdcs97q1224-pulumictl-0.0.49.drv",
-      "description": "Swiss Army Knife for Pulumi Development",
-      "install_id": "pulumictl",
-      "license": "Apache-2.0",
-      "locked_url": "https://github.com/flox/nixpkgs?rev=979daf34c8cacebcd917d540070b52a3c2b9b16e",
-      "name": "pulumictl-0.0.49",
-      "pname": "pulumictl",
-      "rev": "979daf34c8cacebcd917d540070b52a3c2b9b16e",
-      "rev_count": 793735,
-      "rev_date": "2025-05-04T03:14:55Z",
-      "scrape_date": "2025-05-05T05:16:19.858098Z",
-      "stabilities": [
-        "staging",
-        "unstable"
-      ],
-      "unfree": false,
-      "version": "0.0.49",
-      "outputs_to_install": [
-        "out"
-      ],
-      "outputs": {
-        "out": "/nix/store/rmh9mjkxijxcc7cvjhsqc9657fbw0yyg-pulumictl-0.0.49"
-      },
-      "system": "x86_64-linux",
-      "group": "toplevel",
-      "priority": 5
-    },
     {
       "attr_path": "ruff",
       "broken": false,
diff --git a/application/datamanager/features/environment.py b/application/datamanager/features/environment.py
index 8385cc9d8..796fe865c 100644
--- a/application/datamanager/features/environment.py
+++ b/application/datamanager/features/environment.py
@@ -4,5 +4,4 @@
 
 
 def before_all(context: Context) -> None:
-    """Set up test environment."""
     context.base_url = os.environ.get("BASE_URL", "http://datamanager:8080")
diff --git a/application/datamanager/pyproject.toml b/application/datamanager/pyproject.toml
index a18dd7b95..64ecf54dd 100644
--- a/application/datamanager/pyproject.toml
+++ b/application/datamanager/pyproject.toml
@@ -22,3 +22,8 @@ packages = ["datamanager"]
 [build-system]
 requires = ["hatchling"]
 build-backend = "hatchling.build"
+
+[dependency-groups]
+dev = [
+    "behave>=1.2.6",
+]
diff --git a/application/datamanager/src/datamanager/config.py b/application/datamanager/src/datamanager/config.py
index fa234e05a..089c640bc 100644
--- a/application/datamanager/src/datamanager/config.py
+++ b/application/datamanager/src/datamanager/config.py
@@ -19,7 +19,8 @@ class Bucket(BaseModel):
     @computed_field
     def daily_bars_path(self) -> str:
         if self.name is None:
-            raise ValueError("DATA_BUCKET environment variable is required")
+            msg = "DATA_BUCKET environment variable is required"
+            raise ValueError(msg)
         return f"gs://{self.name}/equity/bars/"
 
diff --git a/application/datamanager/src/datamanager/main.py b/application/datamanager/src/datamanager/main.py
index 6f1019059..9de041380 100644
--- a/application/datamanager/src/datamanager/main.py
+++ b/application/datamanager/src/datamanager/main.py
@@ -47,7 +47,7 @@ def bars_query(*, bucket: str, start_date: date, end_date: date) -> str:
 
 
 @asynccontextmanager
-async def lifespan(app: FastAPI) -> AsyncGenerator[None]:
+async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]:
     app.state.settings = Settings()
     app.state.bucket = storage.Client(os.getenv("GCP_PROJECT")).bucket(
         app.state.settings.gcp.bucket.name,
diff --git a/application/positionmanager/pyproject.toml b/application/positionmanager/pyproject.toml
index c2d2de6f1..0dac627de 100644
--- a/application/positionmanager/pyproject.toml
+++ b/application/positionmanager/pyproject.toml
@@ -13,6 +13,7 @@ dependencies = [
     "pandas>=2.1.0",
     "pyportfolioopt>=1.5.6",
     "ecos>=2.0.14",
+    "prometheus-fastapi-instrumentator>=7.1.0",
 ]
 
 [tool.hatch.build.targets.wheel]
diff --git a/application/positionmanager/src/positionmanager/clients.py b/application/positionmanager/src/positionmanager/clients.py
index fb5daddad..8850d8390 100644
--- a/application/positionmanager/src/positionmanager/clients.py
+++ b/application/positionmanager/src/positionmanager/clients.py
@@ -13,8 +13,6 @@ class AlpacaClient:
     def __init__(
         self,
         *,
-        api_key: str | None = "",
-        api_secret: str | None = "",
         api_key: str | None = None,
         api_secret: str | None = None,
         paper: bool = True,
@@ -23,14 +21,17 @@
             msg = "Alpaca API key and secret are required"
             raise ValueError(msg)
 
-        self.trading_client = TradingClient(api_key, api_secret, paper=paper)
+        self.trading_client: TradingClient = TradingClient(
+            api_key, api_secret, paper=paper
+        )
 
     def get_cash_balance(self) -> Money:
         account = self.trading_client.get_account()
         cash_balance = getattr(account, "cash", None)
 
         if cash_balance is None:
-            raise ValueError("Cash balance is not available")
+            msg = "Cash balance is not available"
+            raise ValueError(msg)
 
         return Money.from_float(float(cash_balance))
diff --git a/application/positionmanager/src/positionmanager/main.py b/application/positionmanager/src/positionmanager/main.py
index a64764b4b..1a7670c08 100644
--- a/application/positionmanager/src/positionmanager/main.py
+++ b/application/positionmanager/src/positionmanager/main.py
@@ -1,11 +1,6 @@
 import os
-from datetime import datetime, timedelta, timezone
-import polars as pl
-from typing import Dict, Any
-from .models import Money, DateRange, PredictionPayload
-from .clients import AlpacaClient, DataClient
-from .portfolio import PortfolioOptimizer
-from prometheus_fastapi_instrumentator import Instrumentator
+from datetime import UTC, datetime, timedelta
+from typing import Any
 
 import polars as pl
 import requests
diff --git a/application/predictionengine/compose.yaml b/application/predictionengine/compose.yaml
new file mode 100644
index 000000000..b05558070
--- /dev/null
+++ b/application/predictionengine/compose.yaml
@@ -0,0 +1,20 @@
+name: predictionengine-integration-tests
+
+services:
+  predictionengine:
+    build:
+      context: .
+      dockerfile: Dockerfile
+    ports:
+      - 8080:8080
+    environment:
+      - DATAMANAGER_BASE_URL=${DATAMANAGER_BASE_URL}
+    volumes:
+      - ./:/app/predictionengine
+      - ~/.config/gcloud/application_default_credentials.json:/root/.config/gcloud/application_default_credentials.json:ro
+    healthcheck:
+      test: ["CMD", "curl", "-f", "http://0.0.0.0:8080/health"]
+      interval: 10s
+      timeout: 5s
+      retries: 3
+      start_period: 1s
diff --git a/application/predictionengine/miniature_temporal_fusion_transformer.safetensor b/application/predictionengine/miniature_temporal_fusion_transformer.safetensor
new file mode 100644
index 000000000..e69de29bb
diff --git a/application/predictionengine/pyproject.toml b/application/predictionengine/pyproject.toml
index 930c98b58..ed10cef5d 100644
--- a/application/predictionengine/pyproject.toml
+++ b/application/predictionengine/pyproject.toml
@@ -10,6 +10,8 @@ dependencies = [
     "polars>=1.29.0",
     "category-encoders>=2.8.1",
     "requests>=2.31.0",
+    "prometheus-fastapi-instrumentator>=7.1.0",
+    "loguru>=0.7.3",
 ]
 
 [tool.hatch.build.targets.wheel]
diff --git a/application/predictionengine/src/predictionengine/dataset.py b/application/predictionengine/src/predictionengine/dataset.py
index a77761e09..a87fcf4aa 100644
--- a/application/predictionengine/src/predictionengine/dataset.py
+++ b/application/predictionengine/src/predictionengine/dataset.py
@@ -1,8 +1,9 @@
-from typing import Dict, List, Any, Tuple, Generator
-from tinygrad.tensor import Tensor
+from collections.abc import Generator
+from typing import Any
+
 import polars as pl
 from category_encoders import OrdinalEncoder
-
+from tinygrad.tensor import Tensor
 
 continuous_variable_columns = [
     "open_price",
@@ -20,13 +21,13 @@ def __init__(
         batch_size: int,
         sequence_length: int,
         sample_count: int,
-        scalers: Dict[str, Dict[str, Tensor]] = {},
+        scalers: dict[str, dict[str, Tensor]] | None = None,
     ) -> None:
-        self.batch_size = batch_size
-        self.sequence_length = sequence_length
-        self.sample_count = sample_count
-        self.scalers = scalers if scalers is not None else {}
-        self.preprocessors: Dict[str, Any] = {}
+        self.batch_size: int = batch_size
+        self.sequence_length: int = sequence_length
+        self.sample_count: int = sample_count
+        self.scalers: dict[str, dict[str, Tensor]] = scalers or {}
+        self.preprocessors: dict[str, Any] = {}
 
     def __len__(self) -> int:
         return (self.sample_count + self.batch_size - 1) // self.batch_size
@@ -106,7 +107,7 @@ def _encode_tickers(self, data: pl.DataFrame) -> pl.DataFrame:
 
     def _compute_scalers(self, data: pl.DataFrame) -> None:
         if len(self.scalers) == 0:
-            self.scalers: Dict[str, Dict[str, Tensor]] = {}
+            self.scalers: dict[str, dict[str, Tensor]] = {}
             for ticker_key, group in data.group_by("ticker"):
                 ticker = ticker_key[0]
                 means = group[continuous_variable_columns].mean()
@@ -118,7 +119,7 @@
                 }
 
     def _scale_data(self, data: pl.DataFrame) -> Tensor:
-        groups: List[Tensor] = []
+        groups: list[Tensor] = []
         for ticker_key, group in data.group_by("ticker"):
             ticker = ticker_key[0]
             means = self.scalers[str(ticker)]["means"]
@@ -133,7 +134,8 @@
             groups.append(combined_group)
 
         if not groups:
-            raise ValueError("No data available after preprocessing")
+            msg = "No data available after preprocessing"
+            raise ValueError(msg)
 
         output_data = Tensor.empty(groups[0].shape)
         return output_data.cat(*groups, dim=0)
@@ -150,9 +152,10 @@
             self._compute_scalers(data)
         self.data = self._scale_data(data)
 
-    def get_preprocessors(self) -> Dict[str, Any]:
+    def get_preprocessors(self) -> dict[str, Any]:
         if not self.preprocessors:
-            raise ValueError("Preprocessors have not been initialized.")
+            msg = "Preprocessors have not been initialized."
+            raise ValueError(msg)
 
         means_by_ticker = {
             ticker: values["means"] for ticker, values in self.scalers.items()
@@ -169,7 +172,7 @@ def get_preprocessors(self) -> dict[str, Any]:
             "indices": self.preprocessors["indices"],
         }
 
-    def batches(self) -> Generator[Tuple[Tensor, Tensor, Tensor], None, None]:
+    def batches(self) -> Generator[tuple[Tensor, Tensor, Tensor], None, None]:
         close_price_idx = self.preprocessors["indices"]["close_price"]
 
         for i in range(0, self.sample_count, self.batch_size):
@@ -193,9 +196,8 @@
             ]
 
             if not batch_tensors:
-                raise ValueError(
-                    "Cannot stack empty batch tensors (batch_size must be ≥ 1)"
-                )
+                msg = "Cannot stack empty batch tensors (batch_size must be ≥ 1)"
+                raise ValueError(msg)
             if len(batch_tensors) == 1:
                 historical_features = batch_tensors[0].unsqueeze(0)
             else:
diff --git a/application/predictionengine/src/predictionengine/gated_residual_network.py b/application/predictionengine/src/predictionengine/gated_residual_network.py
index 13ea9e2cb..617399692 100644
--- a/application/predictionengine/src/predictionengine/gated_residual_network.py
+++ b/application/predictionengine/src/predictionengine/gated_residual_network.py
@@ -1,7 +1,7 @@
 from typing import cast
+
+from tinygrad.nn import LayerNorm, Linear
 from tinygrad.tensor import Tensor
-from tinygrad.nn import Linear, LayerNorm
-from typing import Optional
 
 
 class GatedResidualNetwork:
@@ -9,7 +9,7 @@ def __init__(
         self,
         input_size: int,
         hidden_size: int,
-        output_size: Optional[int] = None,
+        output_size: int | None = None,
     ) -> None:
         output_size = output_size if output_size is not None else input_size
 
@@ -30,18 +30,18 @@ def __init__(
 
     def forward(
         self,
-        input: Tensor,
+        input_: Tensor,
     ) -> Tensor:
-        hidden_state = self.dense_input(input).relu()
+        hidden_state = self.dense_input(input_).relu()
 
         output_state = self.dense_output(hidden_state)
 
         gate_state = self.gate(hidden_state).sigmoid()
 
         if self.residual_projection is not None:
-            residual = self.residual_projection(input)
+            residual = self.residual_projection(input_)
         else:
-            residual = input
+            residual = input_
 
-        gated_output = cast(Tensor, gate_state * output_state + residual)
+        gated_output = cast("Tensor", gate_state * output_state + residual)
 
         return self.layer_normalizer(gated_output)
diff --git a/application/predictionengine/src/predictionengine/long_short_term_memory.py b/application/predictionengine/src/predictionengine/long_short_term_memory.py
index f3e4420cd..671d78e7f 100644
--- a/application/predictionengine/src/predictionengine/long_short_term_memory.py
+++ b/application/predictionengine/src/predictionengine/long_short_term_memory.py
@@ -1,6 +1,5 @@
-from typing import List, Tuple
-from tinygrad.tensor import Tensor
 from tinygrad.nn import LSTMCell
+from tinygrad.tensor import Tensor
 
 
 class LongShortTermMemory:
@@ -15,16 +14,16 @@ def __init__(
         self.layer_count = layer_count
         self.dropout_rate = dropout_rate
 
-        self.layers: List[LSTMCell] = []
+        self.layers: list[LSTMCell] = []
         for index in range(layer_count):
             input_size = input_size if index == 0 else self.hidden_size
             self.layers.append(LSTMCell(input_size, self.hidden_size))
 
     def forward(
         self,
-        input: Tensor,
-    ) -> Tuple[Tensor, Tuple[Tensor, Tensor]]:
-        batch_size, sequence_length, _ = input.shape
+        input_: Tensor,
+    ) -> tuple[Tensor, tuple[Tensor, Tensor]]:
+        batch_size, sequence_length, _ = input_.shape
 
         hidden_state = Tensor.zeros(
             self.layer_count, batch_size, self.hidden_size
@@ -36,7 +35,7 @@ def forward(
         outputs = []
 
         for t in range(int(sequence_length)):
-            layer_input = input[:, t]
+            layer_input = input_[:, t]
 
             for index, layer in enumerate(self.layers):
                 layer_hidden_state, layer_cell_state = layer(
@@ -59,8 +58,10 @@
             outputs.append(hidden_state[-1])
 
         if not outputs:
-            raise ValueError("Cannot stack empty outputs list")
-        elif len(outputs) == 1:
+            msg = "Cannot stack empty outputs list"
+            raise ValueError(msg)
+
+        if len(outputs) == 1:
             output_tensor = outputs[0].unsqueeze(1)
         else:
             output_tensor = Tensor.stack(outputs[0], *outputs[1:], dim=1)
diff --git a/application/predictionengine/src/predictionengine/loss_function.py b/application/predictionengine/src/predictionengine/loss_function.py
index 66479e77a..0d3603305 100644
--- a/application/predictionengine/src/predictionengine/loss_function.py
+++ b/application/predictionengine/src/predictionengine/loss_function.py
@@ -1,6 +1,7 @@
-from tinygrad.tensor import Tensor
 from typing import cast
 
+from tinygrad.tensor import Tensor
+
 Quantiles = tuple[float, float, float] | tuple[float, float, float, float, float]
 
@@ -11,18 +12,18 @@ def quantile_loss(
         quantiles = (0.25, 0.5, 0.75)
 
     if y_pred.shape != y_true.shape:
-        raise ValueError(
-            f"Shape mismatch: y_pred {y_pred.shape} vs y_true {y_true.shape}"
-        )
+        msg = f"Shape mismatch: y_pred {y_pred.shape} vs y_true {y_true.shape}"
+        raise ValueError(msg)
 
     if not all(0 <= q <= 1 for q in quantiles):
-        raise ValueError("All quantiles must be between 0 and 1")
+        msg = "All quantiles must be between 0 and 1"
+        raise ValueError(msg)
 
     loss: Tensor = Tensor.zeros(1)
-    error = cast(Tensor, y_true - y_pred)
+    error = cast("Tensor", y_true - y_pred)
 
     for quantile in quantiles:
-        quantile_error = cast(Tensor, quantile * error)
-        quantile_minus_one_error = cast(Tensor, (quantile - 1) * error)
+        quantile_error = cast("Tensor", quantile * error)
+        quantile_minus_one_error = cast("Tensor", (quantile - 1) * error)
         loss += Tensor.maximum(quantile_error, quantile_minus_one_error).mean()
 
     return loss
diff --git a/application/predictionengine/src/predictionengine/main.py b/application/predictionengine/src/predictionengine/main.py
index 20075aee5..06c5a7cd3 100644
--- a/application/predictionengine/src/predictionengine/main.py
+++ b/application/predictionengine/src/predictionengine/main.py
@@ -1,20 +1,29 @@
 import os
 import traceback
-from typing import AsyncGenerator
-from datetime import date, datetime, timedelta
+from collections.abc import AsyncGenerator
 from contextlib import asynccontextmanager
-import requests
+from datetime import UTC, date, datetime, timedelta
+from pathlib import Path
+
 import polars as pl
-from fastapi import FastAPI, Request, Response, status, HTTPException
-from prometheus_fastapi_instrumentator import Instrumentator
+import requests
+from fastapi import FastAPI, HTTPException, Request, Response, status
 from loguru import logger
-from .miniature_temporal_fusion_transformer import MiniatureTemporalFusionTransformer
+from prometheus_fastapi_instrumentator import Instrumentator
+
 from .dataset import DataSet
+from .miniature_temporal_fusion_transformer import MiniatureTemporalFusionTransformer
 from .models import PredictionResponse
 
+LOOKBACK_DAYS = 30
+
+
+class LoadError(Exception):
+    """Raised when loading a file fails due to format or content issues."""
+
 
 @asynccontextmanager
-async def lifespan(app: FastAPI) -> AsyncGenerator[None]:
+async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]:
     datamanager_base_url = os.getenv("DATAMANAGER_BASE_URL", "")
 
     app.state.datamanager_base_url = datamanager_base_url
@@ -55,7 +64,7 @@ def fetch_historical_data(
 def load_or_initialize_model(data: pl.DataFrame) -> MiniatureTemporalFusionTransformer:
     dataset = DataSet(
         batch_size=32,
-        sequence_length=30,
+        sequence_length=LOOKBACK_DAYS,
         sample_count=len(data),
     )
     dataset.load_data(data)
@@ -76,22 +85,22 @@ def load_or_initialize_model(data: pl.DataFrame) -> MiniatureTemporalFusionTrans
     )
 
     model_path = "miniature_temporal_fusion_transformer.safetensor"
-    if os.path.exists(model_path):
+    if Path(model_path).exists():
         try:
             model.load(model_path)
             logger.info("Loaded existing model weights")
-        except Exception as e:
-            logger.warning(f"Failed to load model weights: {e}")
+        except LoadError as e:
+            logger.error(f"Failed to load model weights: {e}")
 
     return model
 
 
-@application.post("/create-predictions", response_model=PredictionResponse)
+@application.post("/create-predictions")
 async def create_predictions(
     request: Request,
 ) -> PredictionResponse:
     try:
-        end_date = datetime.now().date()
+        end_date = datetime.now(tz=UTC).date()
         start_date = end_date - timedelta(days=30)
 
         logger.info(f"Fetching data from {start_date} to {end_date}")
@@ -100,7 +109,7 @@ async def create_predictions(
         )
 
         if data.is_empty():
-            raise HTTPException(
+            raise HTTPException(  # noqa: TRY301
                 status_code=404, detail="No data available for prediction"
             )
@@ -115,15 +124,15 @@
         for ticker in unique_tickers:
             ticker_data = data.filter(pl.col("ticker") == ticker)
 
-            if len(ticker_data) < 30:
+            if len(ticker_data) < LOOKBACK_DAYS:
                 logger.warning(f"Insufficient data for ticker {ticker}")
                 continue
 
-            recent_data = ticker_data.tail(30)
+            recent_data = ticker_data.tail(LOOKBACK_DAYS)
 
             dataset = DataSet(
                 batch_size=1,
-                sequence_length=30,
+                sequence_length=LOOKBACK_DAYS,
                 sample_count=1,
             )
             dataset.load_data(recent_data)
@@ -145,7 +154,7 @@
             }
 
         if not predictions:
-            raise HTTPException(
+            raise HTTPException(  # noqa: TRY301
                 status_code=404, detail="No predictions could be generated"
             )
@@ -157,5 +166,5 @@ async def create_predictions(
         logger.error(f"Error creating predictions: {e}")
         logger.error(traceback.format_exc())
         raise HTTPException(
-            status_code=500, detail=f"Internal server error: {str(e)}"
+            status_code=500, detail=f"Internal server error: {e!s}"
         ) from e
diff --git a/application/predictionengine/src/predictionengine/miniature_temporal_fusion_transformer.py b/application/predictionengine/src/predictionengine/miniature_temporal_fusion_transformer.py
index 97f0a5037..36a3dc58d 100644
--- a/application/predictionengine/src/predictionengine/miniature_temporal_fusion_transformer.py
+++ b/application/predictionengine/src/predictionengine/miniature_temporal_fusion_transformer.py
@@ -1,27 +1,27 @@
-from typing import Dict
+import numpy as np
+import numpy.typing as npt
 from category_encoders import OrdinalEncoder
-from .ticker_embedding import TickerEmbedding
-from .long_short_term_memory import LongShortTermMemory
-from .gated_residual_network import GatedResidualNetwork
-from .multi_head_self_attention import MultiHeadSelfAttention
-from .post_processor import PostProcessor
-from tinygrad.tensor import Tensor
 from tinygrad.nn.optim import Adam
 from tinygrad.nn.state import (
     get_parameters,
     get_state_dict,
-    safe_save,
-    safe_load,
     load_state_dict,
+    safe_load,
+    safe_save,
 )
-from typing import Tuple, List
-import numpy as np
+from tinygrad.tensor import Tensor
+
 from .dataset import DataSet
+from .gated_residual_network import GatedResidualNetwork
+from .long_short_term_memory import LongShortTermMemory
 from .loss_function import quantile_loss
+from .multi_head_self_attention import MultiHeadSelfAttention
+from .post_processor import PostProcessor
+from .ticker_embedding import TickerEmbedding
 
 
 class MiniatureTemporalFusionTransformer:
-    def __init__(
+    def __init__(  # noqa: PLR0913
         self,
         input_size: int,
         hidden_size: int,
@@ -30,41 +30,41 @@ def __init__(
         ticker_count: int,
         embedding_size: int,
         attention_head_count: int,
-        means_by_ticker: Dict[str, Tensor],
-        standard_deviations_by_ticker: Dict[str, Tensor],
+        means_by_ticker: dict[str, Tensor],
+        standard_deviations_by_ticker: dict[str, Tensor],
         ticker_encoder: OrdinalEncoder,
         dropout_rate: float,  # non-zero indicates training
     ) -> None:
-        self.ticker_embedding = TickerEmbedding(
+        self.ticker_embedding: TickerEmbedding = TickerEmbedding(
             ticker_count=ticker_count,
             embedding_size=embedding_size,
         )
 
-        self.lstm_encoder = LongShortTermMemory(
+        self.lstm_encoder: LongShortTermMemory = LongShortTermMemory(
            input_size=input_size + embedding_size,
            hidden_size=hidden_size,
            layer_count=layer_count,
            dropout_rate=dropout_rate,
         )
 
-        self.feature_processor = GatedResidualNetwork(
+        self.feature_processor: GatedResidualNetwork = GatedResidualNetwork(
             input_size=hidden_size,
             hidden_size=hidden_size,
             output_size=hidden_size,
         )
 
-        self.self_attention = MultiHeadSelfAttention(
+        self.self_attention: MultiHeadSelfAttention = MultiHeadSelfAttention(
             heads_count=attention_head_count,
             embedding_size=hidden_size,
         )
 
-        self.output_layer = GatedResidualNetwork(
+        self.output_layer: GatedResidualNetwork = GatedResidualNetwork(
             input_size=hidden_size,
             hidden_size=hidden_size,
             output_size=output_size,
         )
 
-        self.post_processor = PostProcessor(
+        self.post_processor: PostProcessor = PostProcessor(
             means_by_ticker=means_by_ticker,
             standard_deviations_by_ticker=standard_deviations_by_ticker,
             ticker_encoder=ticker_encoder,
@@ -76,7 +76,13 @@ def forward(
         self,
         tickers: Tensor,
         features: Tensor,
-    ) -> Tuple[Tensor, Tensor, Tuple[np.ndarray, np.ndarray, np.ndarray]]:
+    ) -> tuple[
+        Tensor,
+        Tensor,
+        tuple[
+            npt.NDArray[np.float64], npt.NDArray[np.float64], npt.NDArray[np.float64]
+        ],
+    ]:
         ticker_embeddings = self.ticker_embedding.forward(
             tickers
         )  # (batch_size, embedding_dim)
@@ -109,14 +115,14 @@ def train(
         dataset: DataSet,
         epoch_count: int,
         learning_rate: float = 1e-3,
-    ) -> List[float]:
+    ) -> list[float]:
         optimizer = Adam(params=self.parameters, lr=learning_rate)
 
-        quantiles = (0.25, 0.50, 0.75)
-        losses: List[float] = []
+        quantiles: tuple[float, float, float] = (0.25, 0.50, 0.75)
+        losses: list[float] = []
 
         for _ in range(epoch_count):
-            epoch_loss = 0.0
+            epoch_loss: float = 0.0
 
             for tickers, historical_features, targets in dataset.batches():
                 predictions, _, _ = self.forward(
@@ -124,15 +130,15 @@
                     historical_features,
                 )
 
-                loss = quantile_loss(predictions, targets, quantiles)
+                loss: Tensor = quantile_loss(predictions, targets, quantiles)
 
                 optimizer.zero_grad()
-                loss.backward()
+                _ = loss.backward()
                 optimizer.step()
 
                 epoch_loss += loss.numpy().item()
 
-            avgerage_epoch_loss = epoch_loss / len(dataset)
-            losses.append(avgerage_epoch_loss)
+            average_epoch_loss: float = epoch_loss / len(dataset)
+            losses.append(average_epoch_loss)
 
         return losses
@@ -150,9 +156,7 @@
                 total_loss += loss.item()
                 batch_count += 1
 
-        average_loss = total_loss / batch_count
-
-        return average_loss
+        return total_loss / batch_count
 
     def save(
         self,
@@ -166,14 +170,16 @@ def load(
         path_and_file: str = "miniature_temporal_fusion_transformer.safetensor",
     ) -> None:
         states = safe_load(path_and_file)
-        load_state_dict(self, states)
+        _ = load_state_dict(self, states)
 
     def predict(
         self,
         tickers: Tensor,
-        input: Tensor,
-    ) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
-        predictions, _, _ = self.forward(tickers, input)
+        input_: Tensor,
+    ) -> tuple[
+        npt.NDArray[np.float64], npt.NDArray[np.float64], npt.NDArray[np.float64]
+    ]:
+        predictions, _, _ = self.forward(tickers, input_)
 
         percentile_25, percentile_50, percentile_75 = (
             self.post_processor.post_process_predictions(
diff --git a/application/predictionengine/src/predictionengine/models.py b/application/predictionengine/src/predictionengine/models.py
index 37ba3e0d9..123d30a33 100644
--- a/application/predictionengine/src/predictionengine/models.py
+++ b/application/predictionengine/src/predictionengine/models.py
@@ -1,6 +1,5 @@
 from pydantic import BaseModel
-from typing import Dict
 
 
 class PredictionResponse(BaseModel):
-    predictions: Dict[str, Dict[str, float]]
+    predictions: dict[str, dict[str, float]]
diff --git a/application/predictionengine/src/predictionengine/multi_head_self_attention.py b/application/predictionengine/src/predictionengine/multi_head_self_attention.py
index 5ba0f83b9..c947def60 100644
--- a/application/predictionengine/src/predictionengine/multi_head_self_attention.py
+++ b/application/predictionengine/src/predictionengine/multi_head_self_attention.py
@@ -1,7 +1,8 @@
-from typing import Tuple, cast
-from tinygrad.tensor import Tensor
-from tinygrad.nn import Linear
+from typing import cast
+
 from tinygrad.dtype import dtypes
+from tinygrad.nn import Linear
+from tinygrad.tensor import Tensor
 
 
 class MultiHeadSelfAttention:
@@ -11,29 +12,32 @@ def __init__(
         embedding_size: int,
     ) -> None:
         if embedding_size % heads_count != 0:
-            raise ValueError("Embedding dimension must be divisible by heads count")
+            msg = "Embedding dimension must be divisible by heads count"
+            raise ValueError(msg)
 
-        self.heads_count = heads_count
-        self.embedding_size = embedding_size
-        self.heads_dimension = embedding_size // heads_count
+        self.heads_count: int = heads_count
+        self.embedding_size: int = embedding_size
+        self.heads_dimension: int = embedding_size // heads_count
 
-        self.query_weight = Linear(self.embedding_size, self.embedding_size)
-        self.key_weight = Linear(self.embedding_size, self.embedding_size)
-        self.value_weight = Linear(self.embedding_size, self.embedding_size)
+        self.query_weight: Linear = Linear(self.embedding_size, self.embedding_size)
+        self.key_weight: Linear = Linear(self.embedding_size, self.embedding_size)
+        self.value_weight: Linear = Linear(self.embedding_size, self.embedding_size)
 
-        self.fully_connected_out = Linear(self.embedding_size, self.embedding_size)
+        self.fully_connected_out: Linear = Linear(
+            self.embedding_size, self.embedding_size
+        )
 
-        self.scale = Tensor(self.heads_dimension**0.5, dtype=dtypes.float32)
+        self.scale: Tensor = Tensor(self.heads_dimension**0.5, dtype=dtypes.float32)
 
     def forward(
         self,
-        input: Tensor,
-    ) -> Tuple[Tensor, Tensor]:
-        batch_size, sequence_length, _ = input.shape
+        input_: Tensor,
+    ) -> tuple[Tensor, Tensor]:
+        batch_size, sequence_length, _ = input_.shape
 
-        query_weights = self.query_weight(input)
-        key_weights = self.key_weight(input)
-        value_weights = self.value_weight(input)
+        query_weights = self.query_weight(input_)
+        key_weights = self.key_weight(input_)
+        value_weights = self.value_weight(input_)
 
         query_weights = query_weights.view(
             (batch_size, sequence_length, self.heads_count, self.heads_dimension),
@@ -49,7 +53,7 @@ def forward(
             query_weights.matmul(key_weights.transpose(-2, -1)) / self.scale
         )
 
-        attention_weights: Tensor = cast(Tensor, attention_scores).softmax(axis=-1)
+        attention_weights: Tensor = cast("Tensor", attention_scores).softmax(axis=-1)
 
         attention_output = attention_weights.matmul(value_weights)
 
diff --git a/application/predictionengine/src/predictionengine/post_processor.py b/application/predictionengine/src/predictionengine/post_processor.py
index d0b615010..7f07c2409 100644
--- a/application/predictionengine/src/predictionengine/post_processor.py
+++ b/application/predictionengine/src/predictionengine/post_processor.py
@@ -1,29 +1,33 @@
-from typing import Dict, Tuple, Any
-from tinygrad.tensor import Tensor
-from category_encoders import OrdinalEncoder
 import numpy as np
+import numpy.typing as npt
 import polars as pl
+from category_encoders import OrdinalEncoder
+from tinygrad.tensor import Tensor
+
+TensorMapping = dict[str, Tensor]
 
 
 class PostProcessor:
     def __init__(
         self,
-        means_by_ticker: Dict[str, Tensor],
-        standard_deviations_by_ticker: Dict[str, Tensor],
+        means_by_ticker: TensorMapping,
+        standard_deviations_by_ticker: TensorMapping,
         ticker_encoder: OrdinalEncoder,
     ) -> None:
-        self.means_by_ticker = means_by_ticker
-        self.standard_deviations_by_ticker = standard_deviations_by_ticker
-        self.ticker_encoder = ticker_encoder
+        self.means_by_ticker: TensorMapping = means_by_ticker
+        self.standard_deviations_by_ticker: TensorMapping = (
+            standard_deviations_by_ticker
+        )
+        self.ticker_encoder: OrdinalEncoder = ticker_encoder
 
     def post_process_predictions(
         self,
-        encoded_tickers: np.ndarray,
-        predictions: np.ndarray,
-    ) -> Tuple[
-        np.ndarray[Any, np.dtype[np.float64]],
-        np.ndarray[Any, np.dtype[np.float64]],
-        np.ndarray[Any, np.dtype[np.float64]],
+        encoded_tickers: npt.NDArray[np.float64],
+        predictions: npt.NDArray[np.float64],
+    ) -> tuple[
+        npt.NDArray[np.float64],
+        npt.NDArray[np.float64],
+        npt.NDArray[np.float64],
     ]:
         decoded_tickers = self.ticker_encoder.inverse_transform(
             pl.DataFrame(
@@ -40,7 +44,9 @@ def post_process_predictions(
                 ticker not in self.means_by_ticker
                 or ticker not in self.standard_deviations_by_ticker
             ):
-                raise ValueError(f"Statistics not found for ticker: {ticker}")
+                msg = f"Statistics not found for ticker: {ticker}"
+                raise ValueError(msg)
+
             mean = self.means_by_ticker[ticker].numpy()
             standard_deviation = self.standard_deviations_by_ticker[ticker].numpy()
             rescaled_predictions[i, :] = predictions[i, :] * standard_deviation + mean
diff --git a/application/predictionengine/tests/test_dataset.py b/application/predictionengine/tests/test_dataset.py
index c43cb19a6..9d3b804c0 100644
--- a/application/predictionengine/tests/test_dataset.py
+++ b/application/predictionengine/tests/test_dataset.py
@@ -1,5 +1,8 @@
+from typing import NamedTuple
+
 import polars as pl
 import pytest
+
 from application.predictionengine.src.predictionengine.dataset import DataSet
 
 
@@ -10,10 +13,18 @@ def test_dataset_initialization() -> None:
         sample_count=3,
     )
 
-    assert dataset.batch_size == 2
-    assert dataset.sequence_length == 3
-    assert dataset.sample_count == 3
-    assert len(dataset) == 2
+    class Expected(NamedTuple):
+        batch_size: int = 2
+        sequence_length: int = 3
+        sample_count: int = 3
+        observations: int = 2
+
+    expected = Expected()
+
+    assert dataset.batch_size == expected.batch_size
+    assert dataset.sequence_length == expected.sequence_length
+    assert dataset.sample_count == expected.sample_count
+    assert len(dataset) == expected.observations
 
 
 def test_dataset_load_data() -> None:
@@ -102,12 +113,26 @@ def test_dataset_batches() -> None:
 
     dataset.load_data(data)
 
+    class Expected(NamedTuple):
+        batch_size: int = 1
+        sequence_length: int = 2
+        sample_count: int = 3
+        observations: int = 2
+        features: int = 6
+        target: int = 1
+
+    expected = Expected()
+
     batch_count = 0
     for tickers, features, targets in dataset.batches():
         batch_count += 1
-        assert tickers.shape[0] == 1  # batch_size
-        assert features.shape == (1, 2, 6)  # batch_size, sequence_length, features
-        assert targets.shape == (1, 1)  # batch_size, 1
+        assert tickers.shape[0] == expected.batch_size
+        assert features.shape == (
+            expected.batch_size,
+            expected.sequence_length,
+            expected.features,
+        )
+        assert targets.shape == (expected.batch_size, expected.target)
 
     assert batch_count > 0
@@ -120,4 +145,4 @@ def test_dataset_preprocessors_validation() -> None:
     )
 
     with pytest.raises(ValueError, match="Preprocessors have not been initialized"):
-        dataset.get_preprocessors()
+        _ = dataset.get_preprocessors()
diff --git a/application/predictionengine/tests/test_gated_residual_network.py b/application/predictionengine/tests/test_gated_residual_network.py
index 617676302..dd6a6d432 100644
--- a/application/predictionengine/tests/test_gated_residual_network.py
+++ b/application/predictionengine/tests/test_gated_residual_network.py
@@ -1,9 +1,13 @@
-from tinygrad.tensor import Tensor
 import numpy as np
+from numpy.random import PCG64, Generator
+from tinygrad.tensor import Tensor
+
 from application.predictionengine.src.predictionengine.gated_residual_network import (
     GatedResidualNetwork,
 )
 
+rng = Generator(PCG64())
+
 
 def test_gated_residual_network_initialization() -> None:
     input_size = 64
@@ -26,7 +30,7 @@ def test_gated_residual_network_initialization() -> None:
 def test_gated_residual_network_forward() -> None:
     grn = GatedResidualNetwork(input_size=32, hidden_size=64, output_size=32)
 
-    input_tensor = Tensor(np.random.randn(8, 32))
+    input_tensor = Tensor(rng.standard_normal((8, 32)))
     output = grn.forward(input_tensor)
 
     assert output.shape == (8, 32)
@@ -35,7 +39,7 @@
 def test_gated_residual_network_different_sizes() -> None:
     grn = GatedResidualNetwork(input_size=16, hidden_size=32, output_size=8)
 
-    input_tensor = Tensor(np.random.randn(4, 16))
+    input_tensor = Tensor(rng.standard_normal((4, 16)))
     output = grn.forward(input_tensor)
 
     assert output.shape == (4, 8)
@@ -44,7 +48,7 @@
 def test_gated_residual_network_single_sample() -> None:
     grn = GatedResidualNetwork(input_size=10, hidden_size=20, output_size=10)
 
-    input_tensor = Tensor(np.random.randn(1, 10))
+    input_tensor = Tensor(rng.standard_normal((1, 10)))
     output = grn.forward(input_tensor)
 
     assert output.shape == (1, 10)
@@ -53,7 +57,7 @@
 def test_gated_residual_network_consistency() -> None:
     grn = GatedResidualNetwork(input_size=16, hidden_size=32, output_size=16)
 
-    input_tensor = Tensor(np.random.randn(2, 16))
+    input_tensor = Tensor(rng.standard_normal((2, 16)))
     output1 = grn.forward(input_tensor)
     output2 = grn.forward(input_tensor)
diff --git a/application/predictionengine/tests/test_long_short_term_memory.py b/application/predictionengine/tests/test_long_short_term_memory.py
index dd631c1bf..2e08fbba9 100644
--- a/application/predictionengine/tests/test_long_short_term_memory.py
+++ b/application/predictionengine/tests/test_long_short_term_memory.py
@@ -1,18 +1,31 @@
-from tinygrad.tensor import Tensor
+from typing import NamedTuple
+
 import numpy as np
+from numpy.random import PCG64, Generator
+from tinygrad.tensor import Tensor
+
 from application.predictionengine.src.predictionengine.long_short_term_memory import (
     LongShortTermMemory,
 )
 
+rng = Generator(PCG64())
+
 
 def test_lstm_initialization() -> None:
     lstm = LongShortTermMemory(
         input_size=32, hidden_size=64, layer_count=2, dropout_rate=0.1
     )
 
-    assert lstm.hidden_size == 64
-    assert lstm.layer_count == 2
-    assert lstm.dropout_rate == 0.1
+    class Expected(NamedTuple):
+        hidden_state: int = 64
+        layer_count: int = 2
+        dropout_rate: float = 0.1
+
+    expected = Expected(hidden_state=64, layer_count=2, dropout_rate=0.1)
+
+    assert lstm.hidden_size == expected.hidden_state
+    assert lstm.layer_count == expected.layer_count
+    assert lstm.dropout_rate == expected.dropout_rate
 
 
 def test_lstm_forward() -> None:
@@ -20,12 +33,14 @@ def test_lstm_forward() -> None:
         input_size=16, hidden_size=32, layer_count=1, dropout_rate=0.0
     )
 
-    input_tensor = Tensor(np.random.randn(4, 10, 16))
+    input_tensor = Tensor(rng.standard_normal((4, 10, 16)))
     output, hidden_state = lstm.forward(input_tensor)
 
+    expected_hidden_state = 2
+
     assert output.shape == (4, 10, 32)
     assert isinstance(hidden_state, tuple)
-    assert len(hidden_state) == 2
+    assert len(hidden_state) == expected_hidden_state
 
 
 def test_lstm_different_sequence_lengths() -> None:
@@ -34,7 +49,7 @@ def test_lstm_different_sequence_lengths() -> None:
     )
 
     for seq_len in [5, 10, 20]:
-        input_tensor = Tensor(np.random.randn(2, seq_len, 8))
+        input_tensor = Tensor(rng.standard_normal((2, seq_len, 8)))
         output, hidden_state = lstm.forward(input_tensor)
 
         assert output.shape == (2, seq_len, 16)
@@ -45,7 +60,7 @@ def test_lstm_multiple_layers() -> None:
         input_size=10, hidden_size=20, layer_count=3, dropout_rate=0.0
     )
 
-    input_tensor = Tensor(np.random.randn(2, 5, 10))
+    input_tensor = Tensor(rng.standard_normal((2, 5, 10)))
     output, hidden_state = lstm.forward(input_tensor)
 
     assert output.shape == (2, 5, 20)
@@ -57,7 +72,7 @@ def test_lstm_single_timestep() -> None:
         input_size=12, hidden_size=24, layer_count=1, dropout_rate=0.0
     )
 
-    input_tensor = Tensor(np.random.randn(3, 1, 12))
+    input_tensor = Tensor(rng.standard_normal((3, 1, 12)))
     output, _ = lstm.forward(input_tensor)
 
     assert output.shape == (3, 1, 24)
@@ -68,7 +83,7 @@ def test_lstm_consistency() -> None:
         input_size=6, hidden_size=12, layer_count=1, dropout_rate=0.0
     )
 
-    input_tensor = Tensor(np.random.randn(1, 3, 6))
+    input_tensor = Tensor(rng.standard_normal((1, 3, 6)))
     first_output, _ = lstm.forward(input_tensor)
     second_output, _ = lstm.forward(input_tensor)
diff --git a/application/predictionengine/tests/test_loss_function.py b/application/predictionengine/tests/test_loss_function.py
index 6bebf839f..d9e5bd43f 100644
--- a/application/predictionengine/tests/test_loss_function.py
+++ b/application/predictionengine/tests/test_loss_function.py
@@ -1,10 +1,14 @@
-from tinygrad.tensor import Tensor
 import numpy as np
 import pytest
+from numpy.random import PCG64, Generator
+from tinygrad.tensor import Tensor
+
 from application.predictionengine.src.predictionengine.loss_function import (
     quantile_loss,
 )
 
+rng = Generator(PCG64())
+
 
 def test_quantile_loss_basic() -> None:
     predictions = Tensor([[1.0], [2.0], [3.0]])
@@ -14,7 +18,7 @@ def test_quantile_loss_basic() -> None:
     loss = quantile_loss(predictions, targets, quantiles)
 
     assert isinstance(loss, Tensor)
-    assert loss.shape == () or loss.shape == (1,)
+    assert len(loss.shape) == 0 or loss.shape in [(), (0,), (1,)]
 
 
 def test_quantile_loss_multiple_samples() -> None:
@@ -22,10 +26,10 @@ def test_quantile_loss_multiple_samples() -> None:
     targets = Tensor([[2.5], [5.5]])
     quantiles = (0.25, 0.5, 0.75)
 
-    loss = quantile_loss(predictions, targets, quantiles)
+    loss: Tensor = quantile_loss(predictions, targets, quantiles)
 
     assert isinstance(loss, Tensor)
-    assert loss.shape == () or loss.shape == (1,)
+    assert len(loss.shape) == 0 or loss.shape in [(), (0,), (1,)]
 
 
 def test_quantile_loss_perfect_prediction() -> None:
@@ -51,8 +55,8 @@ def test_quantile_loss_different_quantiles() -> None:
 
 def test_quantile_loss_shapes() -> None:
     for batch_size in [1, 2, 4, 8]:
-        predictions = Tensor(np.random.randn(batch_size, 1).astype(np.float32))
-        targets = Tensor(np.random.randn(batch_size, 1).astype(np.float32))
+        predictions = Tensor(rng.standard_normal((batch_size, 1)).astype(np.float32))
+        targets = Tensor(rng.standard_normal((batch_size, 1)).astype(np.float32))
         quantiles = (0.25, 0.5, 0.75)
 
         loss = quantile_loss(predictions, targets, quantiles)
diff --git a/application/predictionengine/tests/test_multi_head_self_attention.py b/application/predictionengine/tests/test_multi_head_self_attention.py
index d818eea00..692f99b2c 100644
--- a/application/predictionengine/tests/test_multi_head_self_attention.py
+++ b/application/predictionengine/tests/test_multi_head_self_attention.py
@@ -1,26 +1,34 @@
+from numpy.random import PCG64, Generator
 from tinygrad.tensor import Tensor
-import numpy as np
-from application.predictionengine.src.predictionengine.multi_head_self_attention import (
+
+from application.predictionengine.src.predictionengine.multi_head_self_attention import (  # noqa: E501
     MultiHeadSelfAttention,
 )
 
+rng = Generator(PCG64())
+
 
 def test_multi_head_attention_initialization() -> None:
+    heads_count = 8
+    embedding_size = 64
     attention = MultiHeadSelfAttention(heads_count=8, embedding_size=64)
 
-    assert attention.heads_count == 8
-    assert attention.embedding_size == 64
+    assert attention.heads_count == heads_count
+    assert attention.embedding_size == embedding_size
 
 
 def test_multi_head_attention_forward() -> None:
     attention = MultiHeadSelfAttention(heads_count=4, embedding_size=32)
 
-    input_tensor = Tensor(np.random.randn(2, 10, 32))
+    input_tensor = Tensor(rng.standard_normal((2, 10, 32)))
     output, attention_weights = attention.forward(input_tensor)
 
-    assert output.shape == (2, 10, 32)
-    assert attention_weights.shape[0] == 2  # batch size
-    assert attention_weights.shape[1] == 4  # heads count
+    batch_size = 2
+    heads_count = 4
+
+    assert output.shape == (batch_size, 10, 32)
+    assert attention_weights.shape[0] == batch_size
+    assert attention_weights.shape[1] == heads_count
 
 
 def test_multi_head_attention_different_heads() -> None:
@@ -30,7 +38,7 @@ def test_multi_head_attention_different_heads() -> None:
             heads_count=heads_count, embedding_size=embedding_size
         )
 
-        input_tensor = Tensor(np.random.randn(1, 5, embedding_size))
+        input_tensor = Tensor(rng.standard_normal((1, 5, embedding_size)))
         output, attention_weights = attention.forward(input_tensor)
 
         assert output.shape == (1, 5, embedding_size)
@@ -40,7 +48,7 @@
 def test_multi_head_attention_single_sequence() -> None:
     attention = MultiHeadSelfAttention(heads_count=2, embedding_size=16)
 
-    input_tensor = Tensor(np.random.randn(1, 1, 16))
+    input_tensor = Tensor(rng.standard_normal((1, 1, 16)))
     output, _ = attention.forward(input_tensor)
 
     assert output.shape == (1, 1, 16)
@@ -50,7 +58,7 @@
 def test_multi_head_attention_longer_sequences() -> None:
     attention = MultiHeadSelfAttention(heads_count=4, embedding_size=64)
 
     for seq_len in [10, 20, 50]:
-        input_tensor = Tensor(np.random.randn(1, seq_len, 64))
+        input_tensor = Tensor(rng.standard_normal((1, seq_len, 64)))
         output, _ = attention.forward(input_tensor)
 
         assert output.shape == (1, seq_len, 64)
@@ -60,7 +68,7 @@
 def test_multi_head_attention_batch_processing() -> None:
     attention = MultiHeadSelfAttention(heads_count=2, embedding_size=32)
 
     for batch_size in [1, 2, 4, 8]:
-        input_tensor = Tensor(np.random.randn(batch_size, 5, 32))
+        input_tensor = Tensor(rng.standard_normal((batch_size, 5, 32)))
         output, attention_weights = attention.forward(input_tensor)
 
         assert output.shape == (batch_size, 5, 32)
diff --git a/application/predictionengine/tests/test_post_processor.py b/application/predictionengine/tests/test_post_processor.py
index 0caf8c0cb..dd08caaa4 100644
--- a/application/predictionengine/tests/test_post_processor.py
+++ b/application/predictionengine/tests/test_post_processor.py
@@ -1,7 +1,8 @@
-from category_encoders import OrdinalEncoder
+import numpy as np
 import polars as pl
+from category_encoders import OrdinalEncoder
 from tinygrad.tensor import Tensor
-import numpy as np
+
 from application.predictionengine.src.predictionengine.post_processor import (
     PostProcessor,
 )
@@ -66,9 +67,11 @@ def test_post_processor_predictions() -> None:
     assert isinstance(percentile_25, np.ndarray)
     assert isinstance(percentile_50, np.ndarray)
     assert isinstance(percentile_75, np.ndarray)
-    assert len(percentile_25) == 2
-    assert len(percentile_50) == 2
-    assert len(percentile_75) == 2
+
+    percentile_size = 2
+    assert len(percentile_25) == percentile_size
+    assert len(percentile_50) == percentile_size
+    assert len(percentile_75) == percentile_size
 
     assert np.all(percentile_25 <= percentile_50)
     assert np.all(percentile_50 <= percentile_75)
diff --git a/application/predictionengine/tests/test_ticker_embedding.py b/application/predictionengine/tests/test_ticker_embedding.py
index abb70d349..5928832d6 100644
--- a/application/predictionengine/tests/test_ticker_embedding.py
+++ b/application/predictionengine/tests/test_ticker_embedding.py
@@ -1,4 +1,5 @@
 from tinygrad.tensor import Tensor
+
 from application.predictionengine.src.predictionengine.ticker_embedding import (
     TickerEmbedding,
 )
diff --git a/infrastructure/__main__.py b/infrastructure/__main__.py
index fd7bc99ca..a309cac1b 100644
--- a/infrastructure/__main__.py
+++ b/infrastructure/__main__.py
@@ -1,5 +1,70 @@
+import base64
+
 import buckets  # noqa: F401
-import cloud_run  # noqa: F401
-import images  # noqa: F401
-import monitoring  # noqa: F401
-import topics  # noqa: F401
+import topics
+from environment_variables import (
+    ALPACA_API_KEY_ID,
+    ALPACA_API_SECRET_KEY,
+    DATA_BUCKET,
+    DUCKDB_ACCESS_KEY,
+    DUCKDB_SECRET,
+    GCP_PROJECT,
+    POLYGON_API_KEY,
+    create_environment_variable,
+)
+from project import platform_service_account
+from pulumi_gcp import cloudscheduler, pubsub
+from services import create_service
+
+datamanager_service = create_service(
+    name="datamanager",
+    envs=[
+        ALPACA_API_KEY_ID,
+        ALPACA_API_SECRET_KEY,
+        GCP_PROJECT,
+        DATA_BUCKET,
+        DUCKDB_ACCESS_KEY,
+        DUCKDB_SECRET,
+        POLYGON_API_KEY,
+    ],
+)
+
+DATAMANAGER_BASE_URL = create_environment_variable(
+    "DATAMANAGER_BASE_URL", datamanager_service.statuses[0].url
+)
+
+predictionengine_service = create_service(
+    "predictionengine", envs=[DATAMANAGER_BASE_URL]
+)
+
+
+positionmanager_service = create_service(
+    "positionmanager",
+    envs=[
+        ALPACA_API_KEY_ID,
+        ALPACA_API_SECRET_KEY,
+        DATAMANAGER_BASE_URL,
+    ],
+)
+
+
+datamanager_subscription = pubsub.Subscription(
+    "datamanager-subscription",
+    topic=topics.datamanager_ping.id,
+    push_config=pubsub.SubscriptionPushConfigArgs(
+        push_endpoint=datamanager_service.statuses[0].url,
+        oidc_token=pubsub.SubscriptionPushConfigOidcTokenArgs(
+            service_account_email=platform_service_account.email
+        ),
+    ),
+)
+
+datamanager_job = cloudscheduler.Job(
+    "datamanager-job",
+    schedule="0 0 * * *",
+    time_zone="UTC",
+    pubsub_target=cloudscheduler.JobPubsubTargetArgs(
+        topic_name=topics.datamanager_ping.id,
+        data=base64.b64encode(b"{}").decode("utf-8"),
+    ),
+)
diff --git a/infrastructure/buckets.py b/infrastructure/buckets.py
index 84ca8e2f0..379e55991 100644
--- a/infrastructure/buckets.py
+++ b/infrastructure/buckets.py
@@ -10,3 +10,12 @@
     location=project.REGION,
     uniform_bucket_level_access=True,
 )
+
+storage.BucketIAMMember(
+    "platform-write-access",
+    bucket=production_data_bucket.name,
+    role="roles/storage.objectCreator",
+    member=project.platform_service_account.email.apply(
+        lambda e: f"serviceAccount:{e}"
+    ),
+)
diff --git a/infrastructure/cloud_run.py b/infrastructure/cloud_run.py
deleted file mode 100644
index 7177d0107..000000000
--- a/infrastructure/cloud_run.py
+++ /dev/null
@@ -1,150 +0,0 @@
-import base64
-
-import buckets
-import project
-import topics
-from pulumi import Config
-from pulumi_gcp import cloudrun, cloudscheduler, pubsub
-
-config = Config()
-
-alpaca_api_key = config.require_secret("ALPACA_API_KEY_ID")
-alpaca_api_secret = config.require_secret("ALPACA_API_SECRET_KEY")
-duckdb_access_key = config.require_secret("DUCKDB_ACCESS_KEY")
-duckdb_secret = config.require_secret("DUCKDB_SECRET")
-
-
-datamanager_service = cloudrun.Service(
-    "datamanager",
-    location=project.REGION,
-    template=cloudrun.ServiceTemplateArgs(
-        spec=cloudrun.ServiceTemplateSpecArgs(
-            service_account_name=project.platform_service_account.email,
-            containers=[
-                cloudrun.ServiceTemplateSpecContainerArgs(
-                    image="pocketsizefund/datamanager:latest",
-                    envs=[
-                        cloudrun.ServiceTemplateSpecContainerEnvArgs(
-                            name="ALPACA_API_KEY_ID",
-                            value=alpaca_api_key,
-                        ),
-                        cloudrun.ServiceTemplateSpecContainerEnvArgs(
-                            name="ALPACA_API_SECRET_KEY",
-                            value=alpaca_api_secret,
-                        ),
-                        cloudrun.ServiceTemplateSpecContainerEnvArgs(
-                            name="GCP_PROJECT",
-                            value=project.PROJECT,
-                        ),
-                        cloudrun.ServiceTemplateSpecContainerEnvArgs(
-                            name="DATA_BUCKET",
-                            value=buckets.production_data_bucket.name,
-                        ),
-                        cloudrun.ServiceTemplateSpecContainerEnvArgs(
-                            name="DUCKDB_ACCESS_KEY",
-                            value=duckdb_access_key,
-                        ),
-                        cloudrun.ServiceTemplateSpecContainerEnvArgs(
-                            name="DUCKDB_SECRET",
-                            value=duckdb_secret,
-                        ),
-                        cloudrun.ServiceTemplateSpecContainerEnvArgs(
-                            name="DATA_BUCKET",
-                            value=buckets.production_data_bucket.name,
-                        ),
-                        cloudrun.ServiceTemplateSpecContainerEnvArgs(
-                            name="DUCKDB_ACCESS_KEY", value=duckdb_access_key
-                        ),
-                        cloudrun.ServiceTemplateSpecContainerEnvArgs(
-                            name="DUCKDB_SECRET",
-                            value=duckdb_secret,
-                        ),
-                    ],
-                ),
-            ],
-        ),
-    ),
-)
-
-subscription = pubsub.Subscription(
-    "datamanager-subscription",
-    topic=topics.datamanager_ping.id,
-    push_config=pubsub.SubscriptionPushConfigArgs(
-        push_endpoint=datamanager_service.statuses[0].url,
-        oidc_token=pubsub.SubscriptionPushConfigOidcTokenArgs(
-            service_account_email=project.platform_service_account.email,
-        ),
-    ),
-)
-
-job = cloudscheduler.Job(
-    "datamanager-job",
-    schedule="0 0 * * *",
-    time_zone="UTC",
-    pubsub_target=cloudscheduler.JobPubsubTargetArgs(
-        topic_name=topics.datamanager_ping.id,
-        data=base64.b64encode(b"{}").decode("utf-8"),
-    ),
-)
-
-positionmanager_service = cloudrun.Service(
-    "positionmanager",
-    location=project.REGION,
-    template=cloudrun.ServiceTemplateArgs(
-        spec=cloudrun.ServiceTemplateSpecArgs(
-            service_account_name=project.platform_service_account.email,
-            containers=[
-                cloudrun.ServiceTemplateSpecContainerArgs(
-                    image="pocketsizefund/positionmanager:latest",
-                    envs=[
-                        cloudrun.ServiceTemplateSpecContainerEnvArgs(
-                            name="ALPACA_API_KEY",
-                            value=config.require_secret("ALPACA_API_KEY"),
-                        ),
-                        cloudrun.ServiceTemplateSpecContainerEnvArgs(
-                            name="ALPACA_API_SECRET",
-                            value=config.require_secret("ALPACA_API_SECRET"),
-                        ),
-                        cloudrun.ServiceTemplateSpecContainerEnvArgs(
-                            name="ALPACA_PAPER",
-                            value=config.get("ALPACA_PAPER") or "true",
-                        ),
-                        cloudrun.ServiceTemplateSpecContainerEnvArgs(
-                            name="DATAMANAGER_BASE_URL",
-                            value=datamanager_service.statuses[0].url,
-                        ),
-                        cloudrun.ServiceTemplateSpecContainerEnvArgs(
-                            name="MINIMUM_PORTFOLIO_TICKERS",
-                            value=config.get("MINIMUM_PORTFOLIO_TICKERS") or "5",
-                        ),
-                        cloudrun.ServiceTemplateSpecContainerEnvArgs(
-                            name="MAXIMUM_PORTFOLIO_TICKERS",
-                            value=config.get("MAXIMUM_PORTFOLIO_TICKERS") or "20",
-                        ),
-                    ],
-                )
-            ],
-        )
-    ),
-)
-
-predictionengine_service = cloudrun.Service(
-    "predictionengine",
-    location=project.REGION,
-    template=cloudrun.ServiceTemplateArgs(
-        spec=cloudrun.ServiceTemplateSpecArgs(
-            service_account_name=project.platform_service_account.email,
-            containers=[
-                cloudrun.ServiceTemplateSpecContainerArgs(
-                    image="pocketsizefund/predictionengine:latest",
-                    envs=[
-                        cloudrun.ServiceTemplateSpecContainerEnvArgs(
-                            name="DATAMANAGER_BASE_URL",
-                            value=datamanager_service.statuses[0].url,
-                        )
-                    ],
-                )
-            ],
-        )
-    ),
-)
diff --git a/infrastructure/environment_variables.py b/infrastructure/environment_variables.py
new file mode 100644
index 000000000..22f6f4353
--- /dev/null
+++ b/infrastructure/environment_variables.py
@@ -0,0 +1,38 @@
+from pulumi import Output
+from pulumi.config import Config
+from pulumi_gcp.cloudrun import ServiceTemplateSpecContainerEnvArgs
+
+config = Config()
+
+ENVIRONMENT_VARIABLE = ServiceTemplateSpecContainerEnvArgs
+
+
+def create_environment_variable(
+    name: str, value: str | Output[str]
+) -> ENVIRONMENT_VARIABLE:
+    return ServiceTemplateSpecContainerEnvArgs(name=name, value=value)
+
+
+GCP_PROJECT = create_environment_variable(
+    name="GCP_PROJECT", value=config.require_secret("GCP_PROJECT")
+)
+
+ALPACA_API_KEY_ID = create_environment_variable(
+    name="ALPACA_API_KEY_ID", value=config.require_secret("ALPACA_API_KEY_ID")
+)
+ALPACA_API_SECRET_KEY = create_environment_variable(
+    name="ALPACA_API_SECRET_KEY", value=config.require_secret("ALPACA_API_SECRET_KEY")
+)
+
+DATA_BUCKET = create_environment_variable(
+    name="DATA_BUCKET", value=config.require_secret("DATA_BUCKET")
+)
+DUCKDB_ACCESS_KEY = create_environment_variable(
+    name="DUCKDB_ACCESS_KEY", value=config.require_secret("DUCKDB_ACCESS_KEY")
+)
+DUCKDB_SECRET = create_environment_variable(
+    name="DUCKDB_SECRET", value=config.require_secret("DUCKDB_SECRET")
+)
+POLYGON_API_KEY = create_environment_variable(
+    name="POLYGON_API_KEY", value=config.require_secret("POLYGON_API_KEY")
+)
diff --git a/infrastructure/images.py b/infrastructure/images.py
deleted file mode 100644
index bb2408e9e..000000000
--- a/infrastructure/images.py
+++ /dev/null
@@ -1,51 +0,0 @@
-from datetime import UTC, datetime
-from pathlib import Path
-
-import pulumi
-import pulumi_docker_build as docker_build
-from loguru import logger
-from pulumi import Config
-
-config = Config()
-dockerhub_username = config.require_secret("dockerhub_username")
-dockerhub_password = config.require_secret("dockerhub_password")
-
-application_path = Path("../application/").resolve()
-dockerfile_paths = [
-    app.relative_to(application_path) for app in application_path.glob("*/Dockerfile")
-]
-
-tags = [
-    "latest",
-    datetime.now(tz=UTC).strftime("%Y%m%d"),
-]
-
-images = {}
-for dockerfile in dockerfile_paths:
-    service_dir = dockerfile.parent
-    service_name = dockerfile.name
-    logger.info(f"Creating image for service: {service_name}")
-
-    images[service_name] = docker_build.Image(
-        f"{service_name}-image",
-        tags=[f"pocketsizefund/{service_name}:{tag}" for tag in tags],
-        context=docker_build.BuildContextArgs(
-            location=service_dir,
-        ),
-        platforms=[
-            docker_build.Platform.LINUX_AMD64,
-            docker_build.Platform.LINUX_ARM64,
-        ],
-        push=True,
-        registries=[
-            docker_build.RegistryArgs(
-                address="docker.io",
-                username=dockerhub_username,
-                password=dockerhub_password,
-            ),
-        ],
-    )
-
-    pulumi.export(f"{service_name}-ref", images[service_name].ref)
-
-logger.info(f"Available image services: {list(images.keys())}")
diff --git a/infrastructure/ping.nu b/infrastructure/ping.nu
new file mode 100644
index 000000000..0f03ebe88
--- /dev/null
+++ b/infrastructure/ping.nu
@@ -0,0 +1,14 @@
+let headers = [Authorization $"Bearer (gcloud auth print-identity-token)"]
+let services = gcloud run services list --format=json
+| from json
+| get status.address.url
+| each {|url|
+    {
+        service: ($url | split row "https://" | get 1 | split row "-" | get 0)
+        url: $url
+    }
+}
+
+let datamanager_url = ($services | where service == "datamanager" | get url.0)
+
+http get --full --headers $headers $"($datamanager_url)/health"
diff --git a/infrastructure/pyproject.toml b/infrastructure/pyproject.toml
index 4b23da475..3f854f5ca 100644
--- a/infrastructure/pyproject.toml
+++ b/infrastructure/pyproject.toml
@@ -1,6 +1,6 @@
 [project]
 name = "infrastructure"
-version = "0.1.0"
+version = "20250602.4"
 requires-python = ">=3.13"
 dependencies = [
     "pulumi>=3.169.0",
diff --git a/infrastructure/services.py b/infrastructure/services.py
new file mode 100644
index 000000000..5ad1c8e29
--- /dev/null
+++ b/infrastructure/services.py
@@ -0,0 +1,83 @@
+import tomllib
+from pathlib import Path
+
+import project
+import pulumi_docker_build as docker_build
+from environment_variables import ENVIRONMENT_VARIABLE
+from pulumi import ResourceOptions
+from pulumi.config import Config
+from pulumi_gcp.cloudrun import (
+    Service,
+    ServiceTemplateArgs,
+    ServiceTemplateSpecArgs,
+    ServiceTemplateSpecContainerArgs,
+    ServiceTemplateSpecContainerStartupProbeArgs,
+    ServiceTemplateSpecContainerStartupProbeHttpGetArgs,
+)
+
+config = Config()
+
+
+def create_service(
+    name: str, envs: list[ENVIRONMENT_VARIABLE] | None = None
+) -> Service:
+    if envs is None:
+        envs = []
+
+    try:
+        with Path("pyproject.toml").open("rb") as f:
+            project_data = tomllib.load(f)
+            version = project_data.get("project", {}).get("version")
+
+    except (FileNotFoundError, tomllib.TOMLDecodeError, ValueError) as e:
+        msg = f"Failed to read version from pyproject.toml: {e}"
+        raise RuntimeError(msg) from e
+
+    service_dir = Path("../application") / name
+    if not service_dir.exists():
+        msg = f"Service directory not found: {service_dir}"
+        raise FileNotFoundError(msg)
+
+    image = docker_build.Image(
+        f"{name}-image",
+        tags=[f"pocketsizefund/{name}:{version}"],
+        context=docker_build.BuildContextArgs(location=str(service_dir)),
+        platforms=[
+            docker_build.Platform.LINUX_AMD64,
+            docker_build.Platform.LINUX_ARM64,
+        ],
+        push=True,
+        registries=[
+            docker_build.RegistryArgs(
+                address="docker.io",
+                username=config.require_secret("dockerhub_username"),
+                password=config.require_secret("dockerhub_password"),
+            )
+        ],
+    )
+
+    return Service(
+        name,
+        opts=ResourceOptions(depends_on=[image]),
+        location=project.REGION,
+        template=ServiceTemplateArgs(
+            spec=ServiceTemplateSpecArgs(
+                service_account_name=project.platform_service_account.email,
+                containers=[
+                    ServiceTemplateSpecContainerArgs(
+                        image=f"pocketsizefund/{name}:{version}",
+                        envs=envs,
+                        startup_probe=ServiceTemplateSpecContainerStartupProbeArgs(
+                            initial_delay_seconds=60,
+                            period_seconds=60,
+                            failure_threshold=50,
+                            http_get=ServiceTemplateSpecContainerStartupProbeHttpGetArgs(
+                                path="/health",
+                                port=8080,
+                            ),
+                        ),
+                    )
+                ],
+            ),
+        ),
+    )
diff --git a/pyproject.toml b/pyproject.toml
index feef765f7..0ee81df90 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -1,8 +1,15 @@
 [project]
 name = "pocketsizefund"
-version = "0.1.0"
+version = "20250602.4"
 description = "Open source quantitative hedge fund 🍊"
 requires-python = ">=3.12"
+dependencies = [
+    "flytekit>=1.15.4",
+    "httpx>=0.28.1",
+    "pulumi-docker-build>=0.0.12",
+    "pulumi-gcp>=8.32.0",
+    "requests>=2.32.3",
+]
 
 [tool.uv.workspace]
 members = [
@@ -131,3 +138,6 @@ unresolved-import = "ignore"
 invalid-return-type = "error"
 invalid-argument-type = "error"
 unresolved-reference = "error"
+
+[tool.pyright]
+reportMissingImports = "none"
diff --git a/uv.lock b/uv.lock
index 9a26a5cbb..12bafa67e 100644
--- a/uv.lock
+++ b/uv.lock
@@ -511,6 +511,11 @@ dependencies = [
     { name = "uvicorn" },
 ]
 
+[package.dev-dependencies]
+dev = [
+    { name = "behave" },
+]
+
diff --git a/uv.lock b/uv.lock
index 9a26a5cbb..12bafa67e 100644
--- a/uv.lock
+++ b/uv.lock
@@ -511,6 +511,11 @@ dependencies = [
     { name = "uvicorn" },
 ]
 
+[package.dev-dependencies]
+dev = [
+    { name = "behave" },
+]
+
 [package.metadata]
 requires-dist = [
     { name = "duckdb", specifier = ">=1.2.2" },
@@ -524,6 +529,9 @@ requires-dist = [
     { name = "uvicorn", specifier = ">=0.34.2" },
 ]
 
+[package.metadata.requires-dev]
+dev = [{ name = "behave", specifier = ">=1.2.6" }]
+
 [[package]]
 name = "debugpy"
 version = "1.8.14"
@@ -960,7 +968,7 @@ wheels = [
 
 [[package]]
 name = "infrastructure"
-version = "0.1.0"
+version = "20250602.4"
 source = { virtual = "infrastructure" }
 dependencies = [
     { name = "loguru" },
@@ -1495,8 +1503,15 @@ wheels = [
 
 [[package]]
 name = "pocketsizefund"
-version = "0.1.0"
+version = "20250602.4"
 source = { editable = "." }
+dependencies = [
+    { name = "flytekit" },
+    { name = "httpx" },
+    { name = "pulumi-docker-build" },
+    { name = "pulumi-gcp" },
+    { name = "requests" },
+]
 
 [package.dev-dependencies]
 dev = [
@@ -1506,6 +1521,13 @@ dev = [
 ]
 
 [package.metadata]
+requires-dist = [
+    { name = "flytekit", specifier = ">=1.15.4" },
+    { name = "httpx", specifier = ">=0.28.1" },
+    { name = "pulumi-docker-build", specifier = ">=0.0.12" },
+    { name = "pulumi-gcp", specifier = ">=8.32.0" },
+    { name = "requests", specifier = ">=2.32.3" },
+]
 
 [package.metadata.requires-dev]
 dev = [
@@ -1538,6 +1560,7 @@ dependencies = [
     { name = "fastapi" },
     { name = "pandas" },
     { name = "polars" },
+    { name = "prometheus-fastapi-instrumentator" },
     { name = "pydantic" },
     { name = "pyportfolioopt" },
     { name = "requests" },
@@ -1551,6 +1574,7 @@ requires-dist = [
     { name = "fastapi", specifier = ">=0.115.12" },
     { name = "pandas", specifier = ">=2.1.0" },
     { name = "polars", specifier = ">=1.29.0" },
+    { name = "prometheus-fastapi-instrumentator", specifier = ">=7.1.0" },
     { name = "pydantic", specifier = ">=2.8.2" },
     { name = "pyportfolioopt", specifier = ">=1.5.6" },
     { name = "requests", specifier = ">=2.31.0" },
@@ -1564,7 +1588,9 @@ source = { editable = "application/predictionengine" }
 dependencies = [
     { name = "category-encoders" },
     { name = "fastapi" },
+    { name = "loguru" },
     { name = "polars" },
+    { name = "prometheus-fastapi-instrumentator" },
     { name = "requests" },
     { name = "tinygrad" },
     { name = "uvicorn" },
@@ -1574,7 +1600,9 @@ dependencies = [
 requires-dist = [
     { name = "category-encoders", specifier = ">=2.8.1" },
     { name = "fastapi", specifier = ">=0.115.12" },
+    { name = "loguru", specifier = ">=0.7.3" },
     { name = "polars", specifier = ">=1.29.0" },
+    { name = "prometheus-fastapi-instrumentator", specifier = ">=7.1.0" },
     { name = "requests", specifier = ">=2.31.0" },
     { name = "tinygrad", specifier = ">=0.10.3" },
     { name = "uvicorn", specifier = ">=0.34.2" },
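The `prometheus-fastapi-instrumentator` entries added to uv.lock above imply the portfolio and prediction-engine services now expose Prometheus metrics. A minimal sketch of the library's typical wiring, assuming its documented defaults (the actual service code is not shown in this diff):

    # Sketch: instrument a FastAPI app and expose metrics at /metrics (library default).
    from fastapi import FastAPI
    from prometheus_fastapi_instrumentator import Instrumentator

    app = FastAPI()
    Instrumentator().instrument(app).expose(app)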
diff --git a/workflows/prediction_model.py b/workflows/prediction_model.py
index 986821143..c381e0c05 100644
--- a/workflows/prediction_model.py
+++ b/workflows/prediction_model.py
@@ -4,14 +4,14 @@ import uuid
 
 from datetime import datetime
 from pathlib import Path
-from typing import Any, Dict, List
+from typing import Any
 
 import requests
 from flytekit import task, workflow
 
 
 @task
-def fetch_data(start_date: datetime, end_date: datetime) -> List[Dict[str, Any]]:
+def fetch_data(start_date: datetime, end_date: datetime) -> list[dict[str, Any]]:
     base_url = os.getenv("DATAMANAGER_BASE_URL", "http://localhost:8080")
     response = requests.get(
         f"{base_url}/equity-bars",
@@ -23,7 +23,7 @@ def fetch_data(start_date: datetime, end_date: datetime) -> List[Dict[str, Any]
 
 
 @task
-def train_dummy_model(data: List[Dict[str, Any]]) -> bytes:
+def train_dummy_model(data: list[dict[str, Any]]) -> bytes:
     """Train a trivial model that stores the average close price."""
     close_prices = [row.get("close_price", 0.0) for row in data]
     mean_close = statistics.mean(close_prices) if close_prices else 0.0
@@ -34,7 +34,7 @@ def train_dummy_model(data: list[dict[str, Any]]) -> bytes:
 @task
 def store_model(model_bytes: bytes) -> str:
     """Store the serialized model in blob storage."""
-    bucket_path = os.getenv("MODEL_BUCKET", "/tmp")
+    bucket_path = os.getenv("MODEL_BUCKET")
     filename = f"model-{uuid.uuid4().hex}.pkl"
     path = Path(bucket_path) / filename
     path.write_bytes(model_bytes)
@@ -45,5 +45,4 @@
 def training_workflow(start_date: datetime, end_date: datetime) -> None:
     data = fetch_data(start_date=start_date, end_date=end_date)
     model_bytes = train_dummy_model(data=data)
-    artifact_path = store_model(model_bytes=model_bytes)
-    return
+    store_model(model_bytes=model_bytes)
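One behavioral consequence of the store_model change above: with the "/tmp" fallback removed, `os.getenv("MODEL_BUCKET")` returns None when the variable is unset, so `Path(bucket_path)` raises a TypeError rather than a clear configuration error. If failing fast is the intent, an explicit guard gives a better message (a sketch, not part of this diff; the message wording is illustrative):

    # Hypothetical guard for the missing-variable case.
    import os

    bucket_path = os.getenv("MODEL_BUCKET")
    if bucket_path is None:
        msg = "MODEL_BUCKET environment variable must be set"
        raise RuntimeError(msg)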