diff --git a/.github/workflows/lifecycle.yaml b/.github/workflows/lifecycle.yaml
index d08d5c599..d4dae7f80 100644
--- a/.github/workflows/lifecycle.yaml
+++ b/.github/workflows/lifecycle.yaml
@@ -19,10 +19,7 @@ jobs:
         env:
           PULUMI_ACCESS_TOKEN: ${{ secrets.PULUMI_ACCESS_TOKEN }}
         with:
-          command: |
-            cd infrastructure
-            uv run pulumi up --yes
-            nu ping.nu
+          command: mise tasks run infrastructure:up
   teardown:
     if: github.event.schedule == '0 23 * * 1,2,3,4,5'
     runs-on: ubuntu-latest
@@ -37,6 +34,4 @@ jobs:
         env:
           PULUMI_ACCESS_TOKEN: ${{ secrets.PULUMI_ACCESS_TOKEN }}
         with:
-          command: |
-            cd infrastructure
-            uv run pulumi down --yes
+          command: mise tasks run infrastructure:down
diff --git a/.gitignore b/.gitignore
index ccaeaf4f3..eaa811f0c 100644
--- a/.gitignore
+++ b/.gitignore
@@ -6,9 +6,8 @@
 .python_coverage.*
 __pycache__/
 infrastructure/Pulumi.infrastructure.yaml
+infrastructure/Pulumi.production.yaml
 .envrc
 .coverage*
 .coverage/
 coverage.xml
-
-todos.md
diff --git a/.mise.toml b/.mise.toml
index a73abb053..ac553b5f8 100644
--- a/.mise.toml
+++ b/.mise.toml
@@ -90,6 +90,14 @@ description = "Launch cloud infrastructure"
 run = """
 set -e
 cd infrastructure
-uv run pulumi up --yes
+uv run pulumi up --yes --stack pocketsizefund/pocketsizefund/production
 nu ping.nu
 """
+
+[tasks."infrastructure:down"]
+description = "Teardown cloud infrastructure"
+run = """
+set -e
+cd infrastructure
+uv run pulumi down --yes --stack pocketsizefund/pocketsizefund/production
+"""
diff --git a/application/datamanager/compose.yaml b/application/datamanager/compose.yaml
index f362f47b5..bc5fecc11 100644
--- a/application/datamanager/compose.yaml
+++ b/application/datamanager/compose.yaml
@@ -9,7 +9,7 @@ services:
       - 8080:8080
     environment:
       - POLYGON_API_KEY=${POLYGON_API_KEY}
-      - DATA_BUCKET=${DATA_BUCKET}
+      - DATA_BUCKET_NAME=${DATA_BUCKET_NAME}
      - GOOGLE_APPLICATION_CREDENTIALS=/root/.config/gcloud/application_default_credentials.json
       - DUCKDB_ACCESS_KEY=${DUCKDB_ACCESS_KEY}
       - DUCKDB_SECRET=${DUCKDB_SECRET}
diff --git a/application/datamanager/src/datamanager/config.py b/application/datamanager/src/datamanager/config.py
index 089c640bc..287b818e7 100644
--- a/application/datamanager/src/datamanager/config.py
+++ b/application/datamanager/src/datamanager/config.py
@@ -13,13 +13,13 @@ class Polygon(BaseModel):
 
 
 class Bucket(BaseModel):
-    name: str = Field(default=os.getenv("DATA_BUCKET", ""))
+    name: str = Field(default=os.getenv("DATA_BUCKET_NAME", ""))
     project: str = Field(default=os.getenv("GCP_PROJECT", ""))
 
     @computed_field
     def daily_bars_path(self) -> str:
         if self.name is None:
-            msg = "DATA_BUCKET environment variable is required"
+            msg = "DATA_BUCKET_NAME environment variable is required"
             raise ValueError(msg)
         return f"gs://{self.name}/equity/bars/"
diff --git a/application/eventtrigger/Dockerfile b/application/eventtrigger/Dockerfile
new file mode 100644
index 000000000..f550d6a9b
--- /dev/null
+++ b/application/eventtrigger/Dockerfile
@@ -0,0 +1,17 @@
+FROM python:3.12.10
+
+COPY --from=ghcr.io/astral-sh/uv:latest /uv /bin/uv
+
+ENV PYTHONPATH=/app/src
+
+WORKDIR /app
+
+COPY pyproject.toml ./
+
+RUN uv sync --no-dev
+
+COPY ./src ./src
+
+EXPOSE 8080
+
+ENTRYPOINT ["uv", "run", "uvicorn", "eventtrigger.main:application", "--host", "0.0.0.0", "--port", "8080", "--app-dir", "src"]
diff --git a/application/eventtrigger/pyproject.toml b/application/eventtrigger/pyproject.toml
new file mode 100644
index 000000000..2f60488b9
--- /dev/null
+++ b/application/eventtrigger/pyproject.toml
@@ -0,0 +1,20 @@
+[project]
+name = "eventtrigger" +version = "0.1.0" +description = "Event trigger service for scheduled HTTP requests" +requires-python = "==3.12.10" +dependencies = [ + "fastapi>=0.115.12", + "uvicorn>=0.34.2", + "prometheus-fastapi-instrumentator>=7.1.0", + "loguru>=0.7.3", + "pydantic>=2.5.0", + "requests>=2.31.0", +] + +[tool.hatch.build.targets.wheel] +packages = ["eventtrigger"] + +[build-system] +requires = ["hatchling"] +build-backend = "hatchling.build" diff --git a/application/eventtrigger/src/eventtrigger/__init__.py b/application/eventtrigger/src/eventtrigger/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/application/eventtrigger/src/eventtrigger/main.py b/application/eventtrigger/src/eventtrigger/main.py new file mode 100644 index 000000000..355bf3a95 --- /dev/null +++ b/application/eventtrigger/src/eventtrigger/main.py @@ -0,0 +1,98 @@ +import os +from datetime import datetime, timedelta +from zoneinfo import ZoneInfo + +import requests +from fastapi import FastAPI, Response, status +from loguru import logger +from prometheus_fastapi_instrumentator import Instrumentator + +from .models import TriggerEventRequest + +application = FastAPI() +Instrumentator().instrument(application).expose(application) + + +@application.get("/health") +def health_check() -> Response: + return Response(status_code=status.HTTP_200_OK) + + +@application.post("/trigger") +def trigger_event(request: TriggerEventRequest) -> Response: + event_type = request.event + try: + url = "" + data = {} + method = "" + + if event_type == "fetch_data": + eastern_timezone = ZoneInfo("America/New_York") + + current_time = datetime.now(eastern_timezone) + + rounding_minute_threshold = 30 # threshold for rounding to the next hour + + rounded_time = None + if current_time.minute >= rounding_minute_threshold: + rounded_time = current_time.replace( + minute=0, second=0, microsecond=0 + ) + timedelta(hours=1) + else: + rounded_time = current_time.replace(minute=0, second=0, microsecond=0) + + datamanager_base_url = os.getenv("DATAMANAGER_BASE_URL") + + url = f"{datamanager_base_url}/equity-bars" + + data = {"date": rounded_time.strftime("%Y-%m-%d")} + + method = "POST" + + elif event_type == "create_positions": + predictionengine_base_url = os.getenv("PREDICTIONENGINE_BASE_URL") + + url = f"{predictionengine_base_url}/create-positions" + + method = "POST" + + elif event_type == "close_positions": + positionmanager_base_url = os.getenv("POSITIONMANAGER_BASE_URL") + + url = f"{positionmanager_base_url}/positions" + + method = "DELETE" + + else: + logger.warning(f"Unknown event type: {event_type}") + return Response(status_code=status.HTTP_400_BAD_REQUEST) + + if method == "POST": + response = requests.post( + url, + json=data, + timeout=10, + ) + response.raise_for_status() + logger.info(f"Event {event_type} triggered successfully.") + + elif method == "DELETE": + response = requests.delete( + url, + timeout=10, + ) + response.raise_for_status() + logger.info(f"Event {event_type} triggered successfully.") + + else: + logger.error(f"Unsupported HTTP method: {method}") + return Response(status_code=status.HTTP_400_BAD_REQUEST) + + return Response(status_code=status.HTTP_200_OK) + + except requests.RequestException as e: + logger.error(f"Error triggering event {event_type}: {e}") + return Response( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + content=f"Error triggering event: {e}", + ) diff --git a/application/eventtrigger/src/eventtrigger/models.py 
diff --git a/application/eventtrigger/src/eventtrigger/models.py b/application/eventtrigger/src/eventtrigger/models.py
new file mode 100644
index 000000000..d2882e354
--- /dev/null
+++ b/application/eventtrigger/src/eventtrigger/models.py
@@ -0,0 +1,5 @@
+from pydantic import BaseModel
+
+
+class TriggerEventRequest(BaseModel):
+    event: str
diff --git a/application/eventtrigger/tests/__init__.py b/application/eventtrigger/tests/__init__.py
new file mode 100644
index 000000000..e69de29bb
diff --git a/application/eventtrigger/tests/test_eventtrigger_main.py b/application/eventtrigger/tests/test_eventtrigger_main.py
new file mode 100644
index 000000000..e2d1d88ce
--- /dev/null
+++ b/application/eventtrigger/tests/test_eventtrigger_main.py
@@ -0,0 +1,114 @@
+import unittest
+from unittest.mock import MagicMock, patch
+
+import requests
+from fastapi import status
+from fastapi.testclient import TestClient
+
+from application.eventtrigger.src.eventtrigger.main import application
+
+client = TestClient(application)
+
+
+def test_health_check() -> None:
+    response = client.get("/health")
+    assert response.status_code == status.HTTP_200_OK
+
+
+class TestTriggerEventEndpoint(unittest.TestCase):
+    def test_trigger_event_missing_body(self) -> None:
+        response = client.post("/trigger")
+        assert response.status_code == status.HTTP_422_UNPROCESSABLE_ENTITY
+
+    def test_trigger_event_invalid_event_type(self) -> None:
+        response = client.post("/trigger", json={"event": "invalid_event"})
+        assert response.status_code == status.HTTP_400_BAD_REQUEST
+
+    @patch("application.eventtrigger.src.eventtrigger.main.os.getenv")
+    @patch("application.eventtrigger.src.eventtrigger.main.requests.post")
+    def test_trigger_event_fetch_data_success(
+        self, mock_post: MagicMock, mock_getenv: MagicMock
+    ) -> None:
+        mock_getenv.return_value = "https://datamanager.example.com"
+        mock_response = MagicMock()
+        mock_response.raise_for_status.return_value = None
+        mock_post.return_value = mock_response
+
+        response = client.post("/trigger", json={"event": "fetch_data"})
+
+        assert response.status_code == status.HTTP_200_OK
+        mock_post.assert_called_once()
+        call_args = mock_post.call_args
+        assert "https://datamanager.example.com/equity-bars" in str(call_args[0][0])
+        assert "json" in call_args[1]
+        assert "date" in call_args[1]["json"]
+
+    @patch("application.eventtrigger.src.eventtrigger.main.os.getenv")
+    @patch("application.eventtrigger.src.eventtrigger.main.requests.post")
+    def test_trigger_event_create_positions_success(
+        self, mock_post: MagicMock, mock_getenv: MagicMock
+    ) -> None:
+        mock_getenv.return_value = "https://predictionengine.example.com"
+        mock_response = MagicMock()
+        mock_response.raise_for_status.return_value = None
+        mock_post.return_value = mock_response
+
+        response = client.post("/trigger", json={"event": "create_positions"})
+
+        assert response.status_code == status.HTTP_200_OK
+        mock_post.assert_called_once()
+        call_args = mock_post.call_args
+        assert "https://predictionengine.example.com/create-positions" in str(
+            call_args[0][0]
+        )
+
+    @patch("application.eventtrigger.src.eventtrigger.main.os.getenv")
+    @patch("application.eventtrigger.src.eventtrigger.main.requests.delete")
+    def test_trigger_event_close_positions_success(
+        self, mock_delete: MagicMock, mock_getenv: MagicMock
+    ) -> None:
+        mock_getenv.return_value = "https://positionmanager.example.com"
+        mock_response = MagicMock()
+        mock_response.raise_for_status.return_value = None
+        mock_delete.return_value = mock_response
+
+        response = client.post("/trigger", json={"event": "close_positions"})
+
+        assert response.status_code == status.HTTP_200_OK
+        mock_delete.assert_called_once()
+        call_args = mock_delete.call_args
+        assert "https://positionmanager.example.com/positions" in str(call_args[0][0])
+
+    @patch("application.eventtrigger.src.eventtrigger.main.os.getenv")
+    @patch("application.eventtrigger.src.eventtrigger.main.requests.post")
+    def test_trigger_event_request_exception(
+        self, mock_post: MagicMock, mock_getenv: MagicMock
+    ) -> None:
+        mock_getenv.return_value = "https://datamanager.example.com"
+        mock_post.side_effect = requests.exceptions.ConnectionError("Connection failed")
+
+        response = client.post("/trigger", json={"event": "fetch_data"})
+
+        assert response.status_code == status.HTTP_500_INTERNAL_SERVER_ERROR
+        assert "Error triggering event" in response.text
+
+    @patch("application.eventtrigger.src.eventtrigger.main.os.getenv")
+    @patch("application.eventtrigger.src.eventtrigger.main.requests.post")
+    def test_trigger_event_http_error(
+        self, mock_post: MagicMock, mock_getenv: MagicMock
+    ) -> None:
+        mock_getenv.return_value = "https://datamanager.example.com"
+        mock_response = MagicMock()
+        mock_response.raise_for_status.side_effect = requests.exceptions.HTTPError(
+            "HTTP Error"
+        )
+        mock_post.return_value = mock_response
+
+        response = client.post("/trigger", json={"event": "fetch_data"})
+
+        assert response.status_code == status.HTTP_500_INTERNAL_SERVER_ERROR
+        assert "Error triggering event" in response.text
+
+
+if __name__ == "__main__":
+    unittest.main()
diff --git a/application/positionmanager/src/positionmanager/main.py b/application/positionmanager/src/positionmanager/main.py
index 445f6f4b4..2cba07628 100644
--- a/application/positionmanager/src/positionmanager/main.py
+++ b/application/positionmanager/src/positionmanager/main.py
@@ -148,7 +148,7 @@ def create_position(payload: PredictionPayload) -> dict[str, Any]:
 
     try:
         optimized_portfolio = portfolio_optimizer.get_optimized_portfolio(
-            data=historical_data,
+            historical_data=historical_data,
             portfolio_value=cash_balance,
             predictions=payload.predictions,
         )
diff --git a/application/positionmanager/src/positionmanager/portfolio.py b/application/positionmanager/src/positionmanager/portfolio.py
index 0e9a10a27..dfed971db 100644
--- a/application/positionmanager/src/positionmanager/portfolio.py
+++ b/application/positionmanager/src/positionmanager/portfolio.py
@@ -17,12 +17,12 @@ def __init__(
 
     def get_optimized_portfolio(
         self,
-        data: pl.DataFrame,
+        historical_data: pl.DataFrame,
         portfolio_value: Money,
         predictions: dict[str, float],
         prediction_weight: float = 0.3,
     ) -> dict[str, int]:
-        converted_data = data.to_pandas()
+        converted_data = historical_data.to_pandas()
 
         if "date" in converted_data.columns:
             converted_data = converted_data.set_index("date")
diff --git a/application/predictionengine/compose.yaml b/application/predictionengine/compose.yaml
index b05558070..ab9695828 100644
--- a/application/predictionengine/compose.yaml
+++ b/application/predictionengine/compose.yaml
@@ -1,4 +1,4 @@
-name: predictionengine integration tests
+name: predictionengine-integration-tests
 
 services:
   predictionengine:
@@ -9,6 +9,7 @@ services:
       - 8080:8080
     environment:
       - DATAMANAGER_BASE_URL=${DATAMANAGER_BASE_URL}
+      - POSITIONMANAGER_BASE_URL=${POSITIONMANAGER_BASE_URL}
     volumes:
       - ./:/app/predictionengine
      - ~/.config/gcloud/application_default_credentials.json:/root/.config/gcloud/application_default_credentials.json:ro
diff --git a/application/predictionengine/pyproject.toml b/application/predictionengine/pyproject.toml
index fe6b95551..fe84e9337 100644
--- a/application/predictionengine/pyproject.toml
+++ b/application/predictionengine/pyproject.toml
@@ -8,10 +8,11 @@ dependencies = [
     "uvicorn>=0.34.2",
     "tinygrad>=0.10.3",
     "polars>=1.29.0",
-    "category-encoders>=2.8.1",
     "requests>=2.31.0",
     "prometheus-fastapi-instrumentator>=7.1.0",
     "loguru>=0.7.3",
+    "numpy>=2.2.6",
+    "pyarrow>=20.0.0",
 ]
 
 [tool.hatch.build.targets.wheel]
diff --git a/application/predictionengine/src/predictionengine/dataset.py b/application/predictionengine/src/predictionengine/dataset.py
index 9de197069..44c272c2b 100644
--- a/application/predictionengine/src/predictionengine/dataset.py
+++ b/application/predictionengine/src/predictionengine/dataset.py
@@ -2,9 +2,112 @@
 from typing import Any
 
 import polars as pl
-from category_encoders import OrdinalEncoder
 from tinygrad.tensor import Tensor
 
+
+class OrdinalEncoder:  # in-house replacement for category-encoders due to dependency issues
+    def __init__(
+        self,
+        columns: list[str] | None = None,
+        handle_unknown: str = "use_encoded_value",
+        handle_missing: str = "use_encoded_value",
+    ) -> None:
+        self.columns: list[str] = columns or []
+        self.mapping_: dict[str, dict[str, int]] = {}
+        self.handle_unknown: str = handle_unknown
+        self.handle_missing: str = handle_missing
+
+    def fit_transform(self, transformation_input: pl.DataFrame) -> pl.DataFrame:
+        result = transformation_input.clone()
+
+        for column in self.columns:
+            if column not in transformation_input.columns:
+                continue
+
+            unique_values = (
+                transformation_input.select(column)
+                .drop_nulls()
+                .unique()
+                .to_series()
+                .to_list()
+            )
+
+            self.mapping_[column] = {
+                str(val): idx + 1 for idx, val in enumerate(unique_values)
+            }
+
+            if self.handle_unknown == "use_encoded_value":
+                self.mapping_[column]["__unknown__"] = 0
+            if self.handle_missing == "use_encoded_value":
+                self.mapping_[column]["__missing__"] = 0
+
+            result = result.with_columns(
+                pl.col(column)
+                .fill_null("__missing__")
+                .cast(pl.Utf8)
+                .map_elements(
+                    lambda x, col=column: self.mapping_[col].get(
+                        str(x), self.mapping_[col].get("__unknown__", 0)
+                    ),
+                    return_dtype=pl.Int32,
+                )
+                .alias(column)
+            )
+
+        return result
+
+    def transform(self, transformation_input: pl.DataFrame) -> pl.DataFrame:
+        result = transformation_input.clone()
+
+        for column in self.columns:
+            if (
+                column not in transformation_input.columns
+                or column not in self.mapping_
+            ):
+                continue
+
+            result = result.with_columns(
+                pl.col(column)
+                .fill_null("__missing__")
+                .cast(pl.Utf8)
+                .map_elements(
+                    lambda x, col=column: self.mapping_[col].get(
+                        str(x), self.mapping_[col].get("__unknown__", 0)
+                    ),
+                    return_dtype=pl.Int32,
+                )
+                .alias(column)
+            )
+
+        return result
+
+    def inverse_transform(self, transformation_input: pl.DataFrame) -> pl.DataFrame:
+        result = transformation_input.clone()
+
+        for column in self.columns:
+            if (
+                column not in transformation_input.columns
+                or column not in self.mapping_
+            ):
+                continue
+
+            reverse_mapping = {v: k for k, v in self.mapping_[column].items()}
+
+            result = result.with_columns(
+                pl.col(column)
+                .cast(pl.Int32)
+                .map_elements(
+                    lambda x, reverse_mapping=reverse_mapping: reverse_mapping.get(
+                        int(x), "__unknown__"
+                    ),
+                    return_dtype=pl.Utf8,
+                )
+                .alias(column)
+            )
+
+        return result
+
+
 continuous_variable_columns = [
     "open_price",
     "high_price",
@@ -94,16 +197,16 @@ def _generate_complete_time_series(self, data: pl.DataFrame) -> pl.DataFrame:
 
     def _encode_tickers(self, data: pl.DataFrame) -> pl.DataFrame:
         ticker_encoder = OrdinalEncoder(
-            cols=["ticker"],
+            columns=["ticker"],
             handle_unknown="use_encoded_value",
             handle_missing="use_encoded_value",
         )
 
         self.preprocessors["ticker_encoder"] = ticker_encoder
 
-        ticker_df = data.select("ticker").to_pandas()
-        encoded_tickers = ticker_encoder.fit_transform(ticker_df)
+        ticker_df = data.select("ticker")
+        encoded_data = ticker_encoder.fit_transform(ticker_df)
 
-        return data.with_columns(pl.Series("ticker", encoded_tickers["ticker"]))
+        return data.with_columns(encoded_data.select("ticker"))
 
     def _compute_scalers(self, data: pl.DataFrame) -> None:
         if len(self.scalers) == 0:
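The in-house OrdinalEncoder above reserves code 0 for unknown and missing values and assigns 1..n to the observed categories. A small round-trip sketch of that behavior (the tickers are illustrative, and the category-to-code assignment order is not guaranteed):

import polars as pl

from application.predictionengine.src.predictionengine.dataset import OrdinalEncoder

encoder = OrdinalEncoder(columns=["ticker"])
frame = pl.DataFrame({"ticker": ["AAPL", "GOOGL", "AAPL", None]})

encoded = encoder.fit_transform(frame)  # e.g. codes [1, 2, 1, 0]; the null maps to 0
decoded = encoder.inverse_transform(encoded)  # 0 decodes to a sentinel, not a ticker

print(encoded["ticker"].to_list())
print(decoded["ticker"].to_list())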
diff --git a/application/predictionengine/src/predictionengine/main.py b/application/predictionengine/src/predictionengine/main.py
index a86bcbf3d..b24eb7ef6 100644
--- a/application/predictionengine/src/predictionengine/main.py
+++ b/application/predictionengine/src/predictionengine/main.py
@@ -1,4 +1,5 @@
 import os
+import random
 import traceback
 from collections.abc import AsyncGenerator
 from contextlib import asynccontextmanager
@@ -7,6 +8,7 @@
 from zoneinfo import ZoneInfo
 
 import polars as pl
+import pyarrow as pa
 import requests
 from fastapi import FastAPI, HTTPException, Request, Response, status
 from loguru import logger
@@ -28,6 +30,9 @@ async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]:
     datamanager_base_url = os.getenv("DATAMANAGER_BASE_URL", "")
     app.state.datamanager_base_url = datamanager_base_url
 
+    positionmanager_base_url = os.getenv("POSITIONMANAGER_BASE_URL", "")
+    app.state.positionmanager_base_url = positionmanager_base_url
+
     app.state.model = None
 
     yield
@@ -42,7 +47,9 @@ async def health_check() -> Response:
 
 
 def fetch_historical_data(
-    datamanager_url: str, start_date: date, end_date: date
+    datamanager_url: str,
+    start_date: date,
+    end_date: date,
 ) -> pl.DataFrame:
     url = f"{datamanager_url}/equity-bars"
     parameters = {
@@ -50,11 +57,9 @@
         "end_date": end_date.isoformat(),
     }
 
-    response = requests.get(url, params=parameters, timeout=SEQUENCE_LENGTH)
+    response = requests.get(url, params=parameters, timeout=120)
     response.raise_for_status()
 
-    import pyarrow as pa
-
     buffer = pa.py_buffer(response.content)
     reader = pa.ipc.RecordBatchStreamReader(buffer)
     table = reader.read_all()
@@ -96,10 +101,32 @@ def load_or_initialize_model(data: pl.DataFrame) -> MiniatureTemporalFusionTransformer:
     return model
 
 
+# temporary inclusion while Knative is unavailable
+def send_predictions(
+    positionmanager_base_url: str,
+    predictions: dict[str, float],
+) -> None:
+    url = f"{positionmanager_base_url}/predictions"
+    headers = {"Content-Type": "application/json"}
+    response = requests.post(
+        url=url,
+        json={"predictions": predictions},
+        headers=headers,
+        timeout=120,
+    )
+
+    if response.status_code != status.HTTP_200_OK:
+        logger.error(
+            f"Failed to send predictions to position manager: {response.status_code} - {response.text}"  # noqa: E501
+        )
+        raise HTTPException(
+            status_code=response.status_code,
+            detail=f"Failed to send predictions: {response.text}",
+        )
+
+
 @application.post("/create-predictions")
-async def create_predictions(
-    request: Request,
-) -> PredictionResponse:
+async def create_predictions(request: Request) -> PredictionResponse:
     try:
         end_date = datetime.now(tz=ZoneInfo("America/New_York")).date()
         start_date = end_date - timedelta(days=SEQUENCE_LENGTH)
@@ -114,52 +141,71 @@
                 status_code=404, detail="No data available for prediction"
             )
 
-        if request.app.state.model is None:
-            logger.info("Initializing model")
-            request.app.state.model = load_or_initialize_model(data)
-
-        model = request.app.state.model
-
         unique_tickers = data["ticker"].unique().to_list()
-        predictions = {}
+        selected_tickers = random.sample(unique_tickers, min(10, len(unique_tickers)))
 
-        for ticker in unique_tickers:
+        average_prices = {}
+        for ticker in selected_tickers:
             ticker_data = data.filter(pl.col("ticker") == ticker)
-            if len(ticker_data) < SEQUENCE_LENGTH:
-                logger.warning(f"Insufficient data for ticker {ticker}")
-                continue
+            if not ticker_data.is_empty():
+                avg_price = ticker_data.select(
+                    ((pl.col("open_price") + pl.col("close_price")) / 2).mean()
+                ).item()
+                average_prices[ticker] = avg_price
 
-            recent_data = ticker_data.tail(SEQUENCE_LENGTH)
+        send_predictions(request.app.state.positionmanager_base_url, average_prices)
 
-            dataset = DataSet(
-                batch_size=1,
-                sequence_length=SEQUENCE_LENGTH,
-                sample_count=1,
-            )
-            dataset.load_data(recent_data)
+        return PredictionResponse(predictions=average_prices)  # this is temporary
 
-            try:
-                tickers_batch, features_batch, _ = next(iter(dataset.batches()))
-            except StopIteration:
-                logger.warning(f"No batches available for ticker {ticker}")
-                continue
+        # if request.app.state.model is None:
+        #     logger.info("Initializing model")
+        #     request.app.state.model = load_or_initialize_model(data)
 
-            percentile_25, percentile_50, percentile_75 = model.predict(
-                tickers_batch, features_batch
-            )
+        # model = request.app.state.model
 
-            predictions[ticker] = {
-                "percentile_25": float(percentile_25[0]),
-                "percentile_50": float(percentile_50[0]),
-                "percentile_75": float(percentile_75[0]),
-            }
+        # unique_tickers = data["ticker"].unique().to_list()
+        # predictions = {}
 
-        if not predictions:
-            raise HTTPException(  # noqa: TRY301
-                status_code=404, detail="No predictions could be generated"
-            )
+        # for ticker in unique_tickers:
+        #     ticker_data = data.filter(pl.col("ticker") == ticker)
+        #     if len(ticker_data) < SEQUENCE_LENGTH:
+        #         logger.warning(f"Insufficient data for ticker {ticker}")
+        #         continue
+
+        #     recent_data = ticker_data.tail(SEQUENCE_LENGTH)
+
+        #     dataset = DataSet(
+        #         batch_size=1,
+        #         sequence_length=SEQUENCE_LENGTH,
+        #         sample_count=1,
+        #     )
+        #     dataset.load_data(recent_data)
+
+        #     try:
+        #         tickers_batch, features_batch, _ = next(iter(dataset.batches()))
+        #     except StopIteration:
+        #         logger.warning(f"No batches available for ticker {ticker}")
+        #         continue
+
+        #     percentile_25, percentile_50, percentile_75 = model.predict(
+        #         tickers_batch,
+        #         features_batch,
+        #     )
+
+        #     predictions[ticker] = {
+        #         "percentile_25": float(percentile_25[0]),
+        #         "percentile_50": float(percentile_50[0]),
+        #         "percentile_75": float(percentile_75[0]),
+        #     }
+
+        #     if not predictions:
+        #         raise HTTPException(
+        #             status_code=404, detail="No predictions could be generated"
+        #         )
+
+        #     send_predictions(request.app.state.positionmanager_base_url, predictions)
 
-        return PredictionResponse(predictions=predictions)
+        # return PredictionResponse(predictions=predictions)
 
     except HTTPException:
         raise
diff --git a/application/predictionengine/src/predictionengine/miniature_temporal_fusion_transformer.py b/application/predictionengine/src/predictionengine/miniature_temporal_fusion_transformer.py
index b04d40b58..bbf9f8e8c 100644
--- a/application/predictionengine/src/predictionengine/miniature_temporal_fusion_transformer.py
+++ b/application/predictionengine/src/predictionengine/miniature_temporal_fusion_transformer.py
@@ -1,6 +1,5 @@
 import numpy as np
 import numpy.typing as npt
-from category_encoders import OrdinalEncoder
 from tinygrad.nn.optim import Adam
 from tinygrad.nn.state import (
     get_parameters,
@@ -11,7 +10,7 @@
 )
 from tinygrad.tensor import Tensor
 
-from .dataset import DataSet
+from .dataset import DataSet, OrdinalEncoder
 from .gated_residual_network import GatedResidualNetwork
 from .long_short_term_memory import LongShortTermMemory
 from .loss_function import quantile_loss
@@ -73,7 +72,6 @@ def __init__(  # noqa: PLR0913
         self.parameters = get_parameters(self)
 
     def get_parameters(self) -> list[Tensor]:
-        """Return all trainable parameters of the model."""
         return self.parameters
 
     def forward(
@@ -181,7 +179,9 @@ def predict(
         tickers: Tensor,
         input_: Tensor,
     ) -> tuple[
-        npt.NDArray[np.float64], npt.NDArray[np.float64], npt.NDArray[np.float64]
+        npt.NDArray[np.float64],
+        npt.NDArray[np.float64],
+        npt.NDArray[np.float64],
     ]:
         predictions, _, _ = self.forward(tickers, input_)
diff --git a/application/predictionengine/src/predictionengine/post_processor.py b/application/predictionengine/src/predictionengine/post_processor.py
index 3d832159d..ed9b1aece 100644
--- a/application/predictionengine/src/predictionengine/post_processor.py
+++ b/application/predictionengine/src/predictionengine/post_processor.py
@@ -1,9 +1,10 @@
 import numpy as np
 import numpy.typing as npt
 import polars as pl
-from category_encoders import OrdinalEncoder
 from tinygrad.tensor import Tensor
 
+from .dataset import OrdinalEncoder
+
 TensorMapping = dict[str, Tensor]
 
 
@@ -29,13 +30,18 @@ def post_process_predictions(
         npt.NDArray[np.float64],
         npt.NDArray[np.float64],
     ]:
-        decoded_tickers = self.ticker_encoder.inverse_transform(
-            pl.DataFrame(
-                {
-                    "ticker": encoded_tickers,
-                }
-            ).to_pandas()
-        )["ticker"]
+        decoded_tickers = (
+            self.ticker_encoder.inverse_transform(
+                transformation_input=pl.DataFrame(
+                    {
+                        "ticker": encoded_tickers,
+                    }
+                )
+            )
+            .select("ticker")
+            .to_series()
+            .to_list()
+        )
 
         rescaled_predictions = np.empty_like(predictions)
diff --git a/application/predictionengine/tests/test_post_processor.py b/application/predictionengine/tests/test_post_processor.py
index dd08caaa4..380b85d74 100644
--- a/application/predictionengine/tests/test_post_processor.py
+++ b/application/predictionengine/tests/test_post_processor.py
@@ -1,15 +1,15 @@
 import numpy as np
 import polars as pl
-from category_encoders import OrdinalEncoder
 from tinygrad.tensor import Tensor
 
+from application.predictionengine.src.predictionengine.dataset import OrdinalEncoder
 from application.predictionengine.src.predictionengine.post_processor import (
     PostProcessor,
 )
 
 
 def test_post_processor_initialization() -> None:
-    ticker_encoder = OrdinalEncoder(cols=["ticker"])
+    ticker_encoder = OrdinalEncoder(columns=["ticker"])
     means_by_ticker = {"AAPL": Tensor([150.0])}
     standard_deviations_by_ticker = {"AAPL": Tensor([5.0])}
 
@@ -27,8 +27,8 @@ def test_post_processor_initialization() -> None:
 def test_post_processor_predictions() -> None:
     tickers = ["AAPL", "GOOGL"]
 
-    ticker_encoder = OrdinalEncoder(cols=["ticker"])
-    ticker_encoder.fit(pl.DataFrame({"ticker": tickers}).to_pandas())
+    ticker_encoder = OrdinalEncoder(columns=["ticker"])
+    ticker_encoder.fit_transform(transformation_input=pl.DataFrame({"ticker": tickers}))
 
     means_by_ticker = {
         "AAPL": Tensor([150.0]),
@@ -41,7 +41,7 @@ def test_post_processor_predictions() -> None:
     }
 
     encoded_tickers = ticker_encoder.transform(
-        pl.DataFrame({"ticker": tickers}).to_pandas()
+        transformation_input=pl.DataFrame({"ticker": tickers})
     )["ticker"].to_numpy()
 
     predictions = np.array(
@@ -78,14 +78,16 @@ def test_post_processor_predictions() -> None:
 
 
 def test_post_processor_single_ticker() -> None:
-    ticker_encoder = OrdinalEncoder(cols=["ticker"])
-    ticker_encoder.fit(pl.DataFrame({"ticker": ["AAPL"]}).to_pandas())
+    ticker_encoder = OrdinalEncoder(columns=["ticker"])
+    ticker_encoder.fit_transform(
+        transformation_input=pl.DataFrame({"ticker": ["AAPL"]})
+    )
 
     means_by_ticker = {"AAPL": Tensor([100.0])}
     standard_deviations_by_ticker = {"AAPL": Tensor([10.0])}
 
     encoded_tickers = ticker_encoder.transform(
-        pl.DataFrame({"ticker": ["AAPL"]}).to_pandas()
+        transformation_input=pl.DataFrame({"ticker": ["AAPL"]})
     )["ticker"].to_numpy()
 
     predictions = np.array([[0.5, 1.0, 1.5]])  # single prediction
diff --git a/infrastructure/Pulumi.yaml b/infrastructure/Pulumi.yaml
index a139b0c60..d5baba95c 100644
--- a/infrastructure/Pulumi.yaml
+++ b/infrastructure/Pulumi.yaml
@@ -1,4 +1,4 @@
 ---
-name: pocketsizefund-infrastructure
+name: pocketsizefund
 runtime: python
-description: Pocket Size Fund Infrastructure
+description: Pocket Size Fund infrastructure
diff --git a/infrastructure/__main__.py b/infrastructure/__main__.py
index 25f662235..1d5538dd1 100644
--- a/infrastructure/__main__.py
+++ b/infrastructure/__main__.py
@@ -1,11 +1,11 @@
-import base64
-
 import buckets  # noqa: F401
-import topics
+import monitoring  # noqa: F401
+import project
+import pulumi_std as std
 from environment_variables import (
-    ALPACA_API_KEY_ID,
-    ALPACA_API_SECRET_KEY,
-    DATA_BUCKET,
+    ALPACA_API_KEY,
+    ALPACA_API_SECRET,
+    DATA_BUCKET_NAME,
     DUCKDB_ACCESS_KEY,
     DUCKDB_SECRET,
     GCP_PROJECT,
@@ -13,17 +13,17 @@
     create_environment_variable,
 )
 from project import platform_service_account
-from pulumi import export
-from pulumi_gcp import cloudscheduler, pubsub
+from pulumi import ResourceOptions, export
+from pulumi_gcp import cloudscheduler
 from services import create_service
 
 datamanager_service = create_service(
     name="datamanager",
-    envs=[
-        ALPACA_API_KEY_ID,
-        ALPACA_API_SECRET_KEY,
+    environment_variables=[
+        ALPACA_API_KEY,
+        ALPACA_API_SECRET,
         GCP_PROJECT,
-        DATA_BUCKET,
+        DATA_BUCKET_NAME,
         DUCKDB_ACCESS_KEY,
         DUCKDB_SECRET,
         POLYGON_API_KEY,
@@ -31,54 +31,111 @@
 )
 
 DATAMANAGER_BASE_URL = create_environment_variable(
-    "DATAMANAGER_BASE_URL", datamanager_service.statuses[0].url
+    name="DATAMANAGER_BASE_URL",
+    value=datamanager_service.statuses[0].url,
+)
+
+positionmanager_service = create_service(
+    name="positionmanager",
+    environment_variables=[
+        ALPACA_API_KEY,
+        ALPACA_API_SECRET,
+        DATAMANAGER_BASE_URL,
+    ],
+)
+
+POSITIONMANAGER_BASE_URL = create_environment_variable(
+    name="POSITIONMANAGER_BASE_URL",
+    value=positionmanager_service.statuses[0].url,
 )
 
 predictionengine_service = create_service(
-    "predictionengine", envs=[DATAMANAGER_BASE_URL]
+    name="predictionengine",
+    environment_variables=[
+        DATAMANAGER_BASE_URL,
+        POSITIONMANAGER_BASE_URL,
+    ],
 )
 
+PREDICTIONENGINE_BASE_URL = create_environment_variable(
+    name="PREDICTIONENGINE_BASE_URL",
+    value=predictionengine_service.statuses[0].url,
+)
 
-positionmanager_service = create_service(
-    "positionmanager",
-    envs=[
-        ALPACA_API_KEY_ID,
-        ALPACA_API_SECRET_KEY,
+eventtrigger_service = create_service(
+    name="eventtrigger",
+    environment_variables=[
         DATAMANAGER_BASE_URL,
+        POSITIONMANAGER_BASE_URL,
+        PREDICTIONENGINE_BASE_URL,
     ],
 )
 
+datamanager_data_fetch = cloudscheduler.Job(
+    resource_name="datamanager-data-fetch",
+    description="Fetch prior day data for storage",
+    schedule="0 0 * * 1-5",
+    time_zone="America/New_York",
+    http_target=cloudscheduler.JobHttpTargetArgs(
+        uri=eventtrigger_service.statuses[0].url.apply(lambda url: f"{url}/trigger"),
+        http_method="POST",
+        body=std.base64encode(input='{"event": "fetch_data"}').result,
+        oidc_token=cloudscheduler.JobHttpTargetOidcTokenArgs(
+            service_account_email=platform_service_account.email
+        ),
+    ),
+    opts=ResourceOptions(depends_on=[project.cloudscheduler_api]),
+)
 
-datamanager_subscription = pubsub.Subscription(
-    "datamanager-subscription",
-    topic=topics.datamanager_ping.id,
-    push_config=pubsub.SubscriptionPushConfigArgs(
-        push_endpoint=datamanager_service.statuses[0].url,
-        oidc_token=pubsub.SubscriptionPushConfigOidcTokenArgs(
+predictionengine_create_positions = cloudscheduler.Job(
+    resource_name="predictionengine-create-positions",
+    description="Generate predictions and create positions",
+    schedule="0 10 * * 1",
+    time_zone="America/New_York",
+    http_target=cloudscheduler.JobHttpTargetArgs(
+        uri=eventtrigger_service.statuses[0].url.apply(lambda url: f"{url}/trigger"),
+        http_method="POST",
+        body=std.base64encode(input='{"event": "create_positions"}').result,
+        oidc_token=cloudscheduler.JobHttpTargetOidcTokenArgs(
             service_account_email=platform_service_account.email
         ),
     ),
+    opts=ResourceOptions(depends_on=[project.cloudscheduler_api]),
 )
 
-datamanager_job = cloudscheduler.Job(
-    "datamanager-job",
-    schedule="0 0 * * *",
-    time_zone="UTC",
-    pubsub_target=cloudscheduler.JobPubsubTargetArgs(
-        topic_name=topics.datamanager_ping.id,
-        data=base64.b64encode(b"{}").decode("utf-8"),
+positionmanager_close_positions = cloudscheduler.Job(
+    resource_name="positionmanager-close-positions",
+    description="Close open positions",
+    schedule="0 15 * * 5",
+    time_zone="America/New_York",
+    http_target=cloudscheduler.JobHttpTargetArgs(
+        uri=eventtrigger_service.statuses[0].url.apply(lambda url: f"{url}/trigger"),
+        http_method="POST",
+        body=std.base64encode(input='{"event": "close_positions"}').result,
+        oidc_token=cloudscheduler.JobHttpTargetOidcTokenArgs(
+            service_account_email=platform_service_account.email
+        ),
     ),
+    opts=ResourceOptions(depends_on=[project.cloudscheduler_api]),
 )
 
+export(
+    name="DATAMANAGER_BASE_URL",
+    value=datamanager_service.statuses[0].url,
+)
 
-export("DATAMANAGER_BASE_URL", datamanager_service.statuses[0].url)
+export(
+    name="DATAMANAGER_METRICS_URL",
+    value=datamanager_service.statuses[0].url.apply(lambda url: f"{url}/metrics"),
+)
 
 export(
-    "DATAMANAGER_METRICS_URL",
-    datamanager_service.statuses[0].url.apply(lambda url: f"{url}/metrics"),
+    name="POSITIONMANAGER_METRICS_URL",
+    value=positionmanager_service.statuses[0].url.apply(lambda url: f"{url}/metrics"),
 )
 
 export(
-    "POSITIONMANAGER_METRICS_URL",
-    positionmanager_service.statuses[0].url.apply(lambda url: f"{url}/metrics"),
+    name="EVENTTRIGGER_BASE_URL",
+    value=eventtrigger_service.statuses[0].url,
 )
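Each scheduler job delivers a base64-encoded JSON body to the eventtrigger service's POST /trigger endpoint. A quick sanity-check sketch of those payloads using only the standard library (this should mirror what std.base64encode produces; it is not part of the stack):

import base64
import json

for event in ("fetch_data", "create_positions", "close_positions"):
    payload = json.dumps({"event": event})
    encoded = base64.b64encode(payload.encode("utf-8")).decode("utf-8")
    print(event, "->", encoded)  # Cloud Scheduler sends the decoded JSON on each run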
diff --git a/infrastructure/buckets.py b/infrastructure/buckets.py
index 38aba1d0f..1856532ab 100644
--- a/infrastructure/buckets.py
+++ b/infrastructure/buckets.py
@@ -4,17 +4,11 @@
 
 config = Config()
 
-production_data_bucket = storage.Bucket(
-    "production-data-bucket",
-    name=config.require_secret("production_data_bucket_name"),
-    location=project.REGION,
-    uniform_bucket_level_access=True,
-)
-
+data_bucket_name = config.require_secret("DATA_BUCKET_NAME")
 
 storage.BucketIAMMember(
     "platform-object-administrator-access",
-    bucket=production_data_bucket.name,
+    bucket=data_bucket_name,
     role="roles/storage.objectAdmin",
     member=project.platform_service_account.email.apply(
         lambda e: f"serviceAccount:{e}"
@@ -23,7 +17,7 @@
 
 grafana_dashboards_bucket = storage.Bucket(
     "grafana-dashboards-bucket",
-    name=config.require_secret("grafana_dashboards_bucket_name"),
+    name=config.require_secret("GRAFANA_DASHBOARDS_BUCKET_NAME"),
     location=project.REGION,
     uniform_bucket_level_access=True,
 )
diff --git a/infrastructure/environment_variables.py b/infrastructure/environment_variables.py
index 1fcc7cb5c..62d161f95 100644
--- a/infrastructure/environment_variables.py
+++ b/infrastructure/environment_variables.py
@@ -19,15 +19,15 @@ def create_environment_variable(
     name="GCP_PROJECT", value=config.require_secret("GCP_PROJECT")
 )
 
-ALPACA_API_KEY_ID = create_environment_variable(
-    name="ALPACA_API_KEY_ID", value=config.require_secret("ALPACA_API_KEY_ID")
+ALPACA_API_KEY = create_environment_variable(
+    name="ALPACA_API_KEY", value=config.require_secret("ALPACA_API_KEY")
 )
-ALPACA_API_SECRET_KEY = create_environment_variable(
-    name="ALPACA_API_SECRET_KEY", value=config.require_secret("ALPACA_API_SECRET_KEY")
+ALPACA_API_SECRET = create_environment_variable(
+    name="ALPACA_API_SECRET", value=config.require_secret("ALPACA_API_SECRET")
 )
-DATA_BUCKET = create_environment_variable(
-    name="DATA_BUCKET", value=config.require_secret("DATA_BUCKET")
+DATA_BUCKET_NAME = create_environment_variable(
+    name="DATA_BUCKET_NAME", value=config.require_secret("DATA_BUCKET_NAME")
 )
 POLYGON_API_KEY = create_environment_variable(
     name="POLYGON_API_KEY", value=config.require_secret("POLYGON_API_KEY")
@@ -44,5 +44,5 @@
 )
 DUCKDB_SECRET = create_environment_variable(name="DUCKDB_SECRET", value=hmac_key.secret)
 
-export("duckdb_access_key", hmac_key.access_id)
-export("duckdb_secret", hmac_key.secret)
+export("DUCKDB_ACCESS_KEY", hmac_key.access_id)
+export("DUCKDB_SECRET", hmac_key.secret)
diff --git a/infrastructure/monitoring.py b/infrastructure/monitoring.py
index 706ea62ca..1b810c899 100644
--- a/infrastructure/monitoring.py
+++ b/infrastructure/monitoring.py
@@ -1,11 +1,13 @@
-import buckets
 import project
+from pulumi import ResourceOptions
 from pulumi.config import Config
-from pulumi_gcp import cloudrun, secretmanager, storage
+from pulumi_gcp import monitoring, secretmanager
 
 configuration = Config()
 
-grafana_administrator_password = configuration.require_secret("GRAFANA_ADMIN_PASSWORD")
+grafana_administrator_password = configuration.require_secret(
+    "GRAFANA_ADMINISTRATOR_PASSWORD"
+)
 
 grafana_administrator_password_secret = secretmanager.Secret(
     "grafana-administrator-password",
@@ -21,122 +23,37 @@
             ]
         }
     },
+    opts=ResourceOptions(depends_on=[project.secretmanager]),
 )
+
 grafana_administrator_password_version = secretmanager.SecretVersion(
     "grafana-administrator-password-version",
     secret=grafana_administrator_password_secret.id,
     secret_data=grafana_administrator_password,
 )
 
-prometheus_configuration = """
-global:
-  scrape_interval: 30s
+prometheus_alert_emails = configuration.require_secret("PROMETHEUS_ALERT_EMAILS")
 
-scrape_configs:
-  - job_name: 'cloud-run-services'
-    metrics_path: /metrics
-    static_configs:
-      - targets:
-          - datamanager
-          - positionmanager
-"""
 
-prometheus_config_object = storage.BucketObject(
-    "prometheus-configuration",
-    bucket=buckets.grafana_dashboards_bucket.name,
-    content=prometheus_configuration,
-    content_type="text/yaml",
-    name="prometheus.yaml",
-)
 
+def create_notification_channels(
+    email_string: str,
+) -> list[monitoring.NotificationChannel]:
+    emails = email_string.split(",")
email_string.split(",") + channels = [] + for i, email in enumerate(emails): + channel = monitoring.NotificationChannel( + f"email-notifications-{i}", + display_name=f"Email Notifications - {email.strip()}", + type="email", + labels={"email_address": email.strip()}, + enabled=True, + opts=ResourceOptions(depends_on=[project.monitoring_api]), + ) + channels.append(channel) + + return channels -prometheus_service = cloudrun.Service( - "prometheus", - location=project.REGION, - template=cloudrun.ServiceTemplateArgs( - spec=cloudrun.ServiceTemplateSpecArgs( - service_account_name=project.platform_service_account.email, - containers=[ - cloudrun.ServiceTemplateSpecContainerArgs( - image="prom/prometheus:v2.51.2", - args=[ - "--config.file=/etc/prometheus/prometheus.yaml", - "--storage.tsdb.path=/prometheus", - ], - resources=cloudrun.ServiceTemplateSpecContainerResourcesArgs( - limits={"cpu": "500m", "memory": "512Mi"} - ), - volume_mounts=[ - cloudrun.ServiceTemplateSpecContainerVolumeMountArgs( - name="prometheus-configuration-volume", - mount_path="/etc/prometheus", - ), - ], - ports=[ - cloudrun.ServiceTemplateSpecContainerPortArgs( - container_port=9090, - ), - ], - ), - ], - volumes=[ - cloudrun.ServiceTemplateSpecVolumeArgs( - name="prometheus-configuration-volume", - csi=cloudrun.ServiceTemplateSpecVolumeCsiArgs( - driver="gcsfuse.run.app", - read_only=True, - volume_attributes={ - "bucketName": buckets.grafana_dashboards_bucket.name, - "mountOptions": "implicit-dirs", - }, - ), - ), - cloudrun.ServiceTemplateSpecVolumeArgs( - name="prometheus-data", - empty_dir=cloudrun.ServiceTemplateSpecVolumeEmptyDirArgs(), - ), - ], - ), - ), -) -grafana_service = cloudrun.Service( - "grafana", - location=project.REGION, - template=cloudrun.ServiceTemplateArgs( - spec=cloudrun.ServiceTemplateSpecArgs( - service_account_name=project.platform_service_account.email, - containers=[ - cloudrun.ServiceTemplateSpecContainerArgs( - image="grafana/grafana:10.4.1", - envs=[ - cloudrun.ServiceTemplateSpecContainerEnvArgs( - name="GF_SECURITY_ADMIN_PASSWORD", - value_from=cloudrun.ServiceTemplateSpecContainerEnvValueFromArgs( - secret_key_ref=cloudrun.ServiceTemplateSpecContainerEnvValueFromSecretKeyRefArgs( - name=grafana_administrator_password_secret.name, - key=grafana_administrator_password_version.version, - ) - ), - ), - cloudrun.ServiceTemplateSpecContainerEnvArgs( - name="GF_INSTALL_PLUGINS", - value="grafana-piechart-panel", - ), - cloudrun.ServiceTemplateSpecContainerEnvArgs( - name="GRAFANA_DASHBOARD_BUCKET", - value=buckets.grafana_dashboards_bucket.name, - ), - ], - ports=[ - cloudrun.ServiceTemplateSpecContainerPortArgs( - container_port=3000, - ), - ], - resources=cloudrun.ServiceTemplateSpecContainerResourcesArgs( - limits={"cpu": "1", "memory": "1Gi"} - ), - ) - ], - ), - ), +email_notification_channels = prometheus_alert_emails.apply( + create_notification_channels ) diff --git a/infrastructure/project.py b/infrastructure/project.py index 18a99876f..23b949194 100644 --- a/infrastructure/project.py +++ b/infrastructure/project.py @@ -6,21 +6,23 @@ REGION = pulumi.Config("gcp").require("region") cloudrun = Service( - "enable-run", + resource_name="enable-run", project=PROJECT, service="run.googleapis.com", disable_dependent_services=True, disable_on_destroy=True, ) + eventarc = Service( - "enable-eventarc", + resource_name="enable-eventarc", project=PROJECT, service="eventarc.googleapis.com", disable_dependent_services=True, disable_on_destroy=True, ) + secretmanager = Service( - 
"enable-secretmanager", + resource_name="enable-secretmanager", project=PROJECT, service="secretmanager.googleapis.com", disable_dependent_services=True, @@ -28,7 +30,7 @@ ) pubsub_api = Service( - "enable-pubsub", + resource_name="enable-pubsub", project=PROJECT, service="pubsub.googleapis.com", disable_dependent_services=True, @@ -36,23 +38,47 @@ ) container_registry = Service( - "enable-containerregistry", + resource_name="enable-containerregistry", project=PROJECT, service="containerregistry.googleapis.com", disable_dependent_services=True, disable_on_destroy=True, ) +cloudbuild = Service( + resource_name="enable-cloudbuild", + project=PROJECT, + service="cloudbuild.googleapis.com", + disable_dependent_services=True, + disable_on_destroy=True, +) + +monitoring_api = Service( + resource_name="enable-monitoring", + project=PROJECT, + service="monitoring.googleapis.com", + disable_dependent_services=True, + disable_on_destroy=True, +) + +cloudscheduler_api = Service( + resource_name="enable-cloudscheduler", + project=PROJECT, + service="cloudscheduler.googleapis.com", + disable_dependent_services=True, + disable_on_destroy=True, +) + platform_service_account = Account( - "platform-service-account", + resource_name="platform-service-account", account_id="platform", display_name="Platform-Wide Cloud Run Service Account", ) IAMMember( - "pubsub-token-access", + resource_name="pubsub-token-access", project=PROJECT, role="roles/pubsub.subscriber", member=platform_service_account.email.apply( @@ -61,7 +87,7 @@ ) IAMMember( - "platform-service-account-owner", + resource_name="platform-service-account-owner", project=PROJECT, role="roles/owner", member=platform_service_account.email.apply(lambda e: f"serviceAccount:{e}"), # type: ignore diff --git a/infrastructure/prometheus.yaml b/infrastructure/prometheus.yaml deleted file mode 100644 index c114e14f8..000000000 --- a/infrastructure/prometheus.yaml +++ /dev/null @@ -1,11 +0,0 @@ -global: - scrape_interval: 30s - -scrape_configs: - - job_name: 'cloud-run-services' - metrics_path: /metrics - static_configs: - - targets: - - datamanager - - positionmanager - - predictionengine diff --git a/infrastructure/pyproject.toml b/infrastructure/pyproject.toml index dd4fcc400..d606b3e83 100644 --- a/infrastructure/pyproject.toml +++ b/infrastructure/pyproject.toml @@ -1,10 +1,11 @@ [project] name = "infrastructure" -version = "20250606.4" +version = "20250629.1" requires-python = "==3.12.10" dependencies = [ "pulumi>=3.169.0", "pulumi-gcp>=8.30.1", + "pulumi-std>=2.2.0", "pulumi-docker>=3.0.0", "pulumi-docker-build>=0.0.12", "loguru>=0.7.3", diff --git a/infrastructure/services.py b/infrastructure/services.py index 5ad1c8e29..993f4c378 100644 --- a/infrastructure/services.py +++ b/infrastructure/services.py @@ -9,6 +9,7 @@ from pulumi_gcp.cloudrun import ( Service, ServiceTemplateArgs, + ServiceTemplateMetadataArgs, ServiceTemplateSpecArgs, ServiceTemplateSpecContainerArgs, ServiceTemplateSpecContainerStartupProbeArgs, @@ -19,10 +20,12 @@ def create_service( - name: str, envs: list[ENVIRONMENT_VARIABLE] | None = None + name: str, + environment_variables: list[ENVIRONMENT_VARIABLE] | None = None, + enable_prometheus: bool = True, # noqa: FBT001, FBT002 ) -> Service: - if envs is None: - envs = [] + if environment_variables is None: + environment_variables = [] try: with Path("pyproject.toml").open("rb") as f: @@ -30,16 +33,16 @@ def create_service( version = project_data.get("project", {}).get("version") except (FileNotFoundError, tomllib.TOMLDecodeError, 
     except (FileNotFoundError, tomllib.TOMLDecodeError, ValueError) as e:
-        msg = f"Failed to read version from pyproject.toml: {e}"
-        raise RuntimeError(msg) from e
+        message = f"Failed to read version from pyproject.toml: {e}"
+        raise RuntimeError(message) from e
 
     service_dir = Path("../application") / name
     if not service_dir.exists():
-        msg = f"Service directory not found: {service_dir}"
-        raise FileNotFoundError(msg)
+        message = f"Service directory not found: {service_dir}"
+        raise FileNotFoundError(message)
 
     image = docker_build.Image(
-        f"{name}-image",
+        resource_name=f"{name}-image",
         tags=[f"pocketsizefund/{name}:{version}"],
         context=docker_build.BuildContextArgs(location=str(service_dir)),
         platforms=[
@@ -50,23 +53,37 @@
         registries=[
             docker_build.RegistryArgs(
                 address="docker.io",
-                username=config.require_secret("dockerhub_username"),
-                password=config.require_secret("dockerhub_password"),
+                username=config.require_secret("DOCKERHUB_USERNAME"),
+                password=config.require_secret("DOCKERHUB_PASSWORD"),
             )
         ],
     )
 
+    # annotations for Managed Service for Prometheus
+    annotations = {}
+    if enable_prometheus:
+        annotations.update(
+            {
+                "run.googleapis.com/cpu-throttling": "false",
+                "prometheus.googleapis.com/scrape": "true",
+                "prometheus.googleapis.com/port": "8080",
+                "prometheus.googleapis.com/path": "/metrics",
+                "prometheus.googleapis.com/scrape_interval": "60s",
+            }
+        )
+
     return Service(
-        name,
-        opts=ResourceOptions(depends_on=[image]),
+        resource_name=name,
+        opts=ResourceOptions(depends_on=[image, project.cloudrun]),
         location=project.REGION,
         template=ServiceTemplateArgs(
+            metadata=ServiceTemplateMetadataArgs(annotations=annotations),
             spec=ServiceTemplateSpecArgs(
                 service_account_name=project.platform_service_account.email,
                 containers=[
                     ServiceTemplateSpecContainerArgs(
                         image=f"pocketsizefund/{name}:{version}",
-                        envs=envs,
+                        envs=environment_variables,
                         startup_probe=ServiceTemplateSpecContainerStartupProbeArgs(
                             initial_delay_seconds=60,
                             period_seconds=60,
diff --git a/infrastructure/topics.py b/infrastructure/topics.py
deleted file mode 100644
index 0c5d197e4..000000000
--- a/infrastructure/topics.py
+++ /dev/null
@@ -1,3 +0,0 @@
-from pulumi_gcp.pubsub import Topic
-
-datamanager_ping = Topic("datamanager-ping")
diff --git a/pyproject.toml b/pyproject.toml
index 89530f844..095464683 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -17,6 +17,7 @@ members = [
     "application/datamanager",
     "application/positionmanager",
     "application/predictionengine",
+    "application/eventtrigger",
     "workflows",
 ]
 
@@ -24,6 +25,7 @@ members = [
 datamanager = { workspace = true }
 positionmanager = { workspace = true }
 predictionengine = { workspace = true }
+eventtrigger = { workspace = true }
 workflows = { workspace = true }
 
 [tool.hatch.build.targets.wheel]
@@ -45,6 +47,7 @@
 testpaths = [
     "application/datamanager/tests",
     "application/positionmanager/tests",
     "application/predictionengine/tests",
+    "application/eventtrigger/tests",
 ]
 python_files = ["test_*.py"]
 python_classes = ["Test*"]
@@ -132,6 +135,7 @@ ignore = [
 
 [tool.ruff.lint.per-file-ignores]
 "**/tests/**/*.py" = ["S101"]
 "**/features/steps/**/*.py" = ["S101"]
+"application/predictionengine/src/predictionengine/main.py" = ["ERA001"]  # temporary
 
 [tool.ty.rules]
 unresolved-import = "ignore"
diff --git a/uv.lock b/uv.lock
index fce5db03c..0e835ef3c 100644
--- a/uv.lock
+++ b/uv.lock
@@ -5,6 +5,7 @@ requires-python = "==3.12.10"
 
 [manifest]
 members = [
     "datamanager",
+    "eventtrigger",
     "infrastructure",
     "pocketsizefund",
     "positionmanager",
@@ -274,23 +275,6 @@
 wheels = [
"https://files.pythonhosted.org/packages/72/76/20fa66124dbe6be5cafeb312ece67de6b61dd91a0247d1ea13db4ebb33c2/cachetools-5.5.2-py3-none-any.whl", hash = "sha256:d26a22bcc62eb95c3beabd9f1ee5e820d3d2704fe2967cbe350e20c8ffcd3f0a", size = 10080, upload-time = "2025-02-20T21:01:16.647Z" }, ] -[[package]] -name = "category-encoders" -version = "2.8.1" -source = { registry = "https://pypi.org/simple" } -dependencies = [ - { name = "numpy" }, - { name = "pandas" }, - { name = "patsy" }, - { name = "scikit-learn" }, - { name = "scipy" }, - { name = "statsmodels" }, -] -sdist = { url = "https://files.pythonhosted.org/packages/89/32/f5dff088ebae54d5464124298a9262bdd07c55558ba7e9b961be7ecdb0e6/category_encoders-2.8.1.tar.gz", hash = "sha256:57af8a23bde3cf622ee7e17c11547011795e4d337839d40cbd16c36b67291b33", size = 57856, upload-time = "2025-03-15T16:17:23.176Z" } -wheels = [ - { url = "https://files.pythonhosted.org/packages/92/fb/908cb215a30b117bb079a767176038599a5447f2506e21aa2e90d0aabfff/category_encoders-2.8.1-py3-none-any.whl", hash = "sha256:ba77bde0a0afe13732b04997635b1ae82569e42c9696bc361c68674197988303", size = 85710, upload-time = "2025-03-15T16:17:21.839Z" }, -] - [[package]] name = "certifi" version = "2025.4.26" @@ -623,6 +607,29 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/56/9a/ca30572f3e3ff3cef6a0ea8aa6cdc12c36f9fefe559f65c7d6265713196a/ecos-2.0.14-cp312-cp312-win_amd64.whl", hash = "sha256:718eb62afb8e45426bcc365ebaf3ca9f610afcbb754de6073ef5f104da8fca1f", size = 72248, upload-time = "2024-06-18T03:48:51.504Z" }, ] +[[package]] +name = "eventtrigger" +version = "0.1.0" +source = { editable = "application/eventtrigger" } +dependencies = [ + { name = "fastapi" }, + { name = "loguru" }, + { name = "prometheus-fastapi-instrumentator" }, + { name = "pydantic" }, + { name = "requests" }, + { name = "uvicorn" }, +] + +[package.metadata] +requires-dist = [ + { name = "fastapi", specifier = ">=0.115.12" }, + { name = "loguru", specifier = ">=0.7.3" }, + { name = "prometheus-fastapi-instrumentator", specifier = ">=7.1.0" }, + { name = "pydantic", specifier = ">=2.5.0" }, + { name = "requests", specifier = ">=2.31.0" }, + { name = "uvicorn", specifier = ">=0.34.2" }, +] + [[package]] name = "fastapi" version = "0.115.12" @@ -993,7 +1000,7 @@ wheels = [ [[package]] name = "infrastructure" -version = "20250606.4" +version = "20250629.1" source = { virtual = "infrastructure" } dependencies = [ { name = "loguru" }, @@ -1001,6 +1008,7 @@ dependencies = [ { name = "pulumi-docker" }, { name = "pulumi-docker-build" }, { name = "pulumi-gcp" }, + { name = "pulumi-std" }, ] [package.metadata] @@ -1010,6 +1018,7 @@ requires-dist = [ { name = "pulumi-docker", specifier = ">=3.0.0" }, { name = "pulumi-docker-build", specifier = ">=0.0.12" }, { name = "pulumi-gcp", specifier = ">=8.30.1" }, + { name = "pulumi-std", specifier = ">=2.2.0" }, ] [[package]] @@ -1440,18 +1449,6 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/0f/4c/f98024021bef4d44dce3613feebd702c7ad8883f777ff8488384c59e9774/parver-0.5-py3-none-any.whl", hash = "sha256:2281b187276c8e8e3c15634f62287b2fb6fe0efe3010f739a6bd1e45fa2bf2b2", size = 15172, upload-time = "2023-10-03T21:06:52.796Z" }, ] -[[package]] -name = "patsy" -version = "1.0.1" -source = { registry = "https://pypi.org/simple" } -dependencies = [ - { name = "numpy" }, -] -sdist = { url = "https://files.pythonhosted.org/packages/d1/81/74f6a65b848ffd16c18f920620ce999fe45fe27f01ab3911260ce4ed85e4/patsy-1.0.1.tar.gz", hash = 
"sha256:e786a9391eec818c054e359b737bbce692f051aee4c661f4141cc88fb459c0c4", size = 396010, upload-time = "2024-11-12T14:10:54.642Z" } -wheels = [ - { url = "https://files.pythonhosted.org/packages/87/2b/b50d3d08ea0fc419c183a84210571eba005328efa62b6b98bc28e9ead32a/patsy-1.0.1-py2.py3-none-any.whl", hash = "sha256:751fb38f9e97e62312e921a1954b81e1bb2bcda4f5eeabaf94db251ee791509c", size = 232923, upload-time = "2024-11-12T14:10:52.85Z" }, -] - [[package]] name = "pip" version = "25.1.1" @@ -1570,11 +1567,12 @@ name = "predictionengine" version = "0.1.0" source = { editable = "application/predictionengine" } dependencies = [ - { name = "category-encoders" }, { name = "fastapi" }, { name = "loguru" }, + { name = "numpy" }, { name = "polars" }, { name = "prometheus-fastapi-instrumentator" }, + { name = "pyarrow" }, { name = "requests" }, { name = "tinygrad" }, { name = "uvicorn" }, @@ -1582,11 +1580,12 @@ dependencies = [ [package.metadata] requires-dist = [ - { name = "category-encoders", specifier = ">=2.8.1" }, { name = "fastapi", specifier = ">=0.115.12" }, { name = "loguru", specifier = ">=0.7.3" }, + { name = "numpy", specifier = ">=2.2.6" }, { name = "polars", specifier = ">=1.29.0" }, { name = "prometheus-fastapi-instrumentator", specifier = ">=7.1.0" }, + { name = "pyarrow", specifier = ">=20.0.0" }, { name = "requests", specifier = ">=2.31.0" }, { name = "tinygrad", specifier = ">=0.10.3" }, { name = "uvicorn", specifier = ">=0.34.2" }, @@ -1737,6 +1736,17 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/52/8c/b0b6257d9e10c3f37760e0c86331aed0a490bd3005266865ae221135cbdf/pulumi_gcp-8.32.1-py3-none-any.whl", hash = "sha256:9a25311ba06338a1f4227f5158913246f5d05bca0cdb8f58089d3d51ad859ea4", size = 9741113, upload-time = "2025-05-22T16:58:43.932Z" }, ] +[[package]] +name = "pulumi-std" +version = "2.2.0" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "parver" }, + { name = "pulumi" }, + { name = "semver" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/95/f1/1623662490d001ead46064cd25475d9732a1ada974d9c5c92af11616e4c2/pulumi_std-2.2.0.tar.gz", hash = "sha256:e46759862b5068f26ff4c2252a9c9e9c720e6ea55ae58ac7b83d0bf7a9ac25bb", size = 25811, upload-time = "2025-02-13T17:30:16.232Z" } + [[package]] name = "pyarrow" version = "20.0.0" @@ -2035,25 +2045,6 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/d5/c0/f5cc95ec88694429fcb841a37456be0a27463bc39d43edbd36e3164120ed/s3fs-2025.5.1-py3-none-any.whl", hash = "sha256:7475e7c40a3a112f17144907ffae50782ab6c03487fe0b45a9c3942bb7a5c606", size = 30476, upload-time = "2025-05-24T12:14:10.056Z" }, ] -[[package]] -name = "scikit-learn" -version = "1.6.1" -source = { registry = "https://pypi.org/simple" } -dependencies = [ - { name = "joblib" }, - { name = "numpy" }, - { name = "scipy" }, - { name = "threadpoolctl" }, -] -sdist = { url = "https://files.pythonhosted.org/packages/9e/a5/4ae3b3a0755f7b35a280ac90b28817d1f380318973cff14075ab41ef50d9/scikit_learn-1.6.1.tar.gz", hash = "sha256:b4fc2525eca2c69a59260f583c56a7557c6ccdf8deafdba6e060f94c1c59738e", size = 7068312, upload-time = "2025-01-10T08:07:55.348Z" } -wheels = [ - { url = "https://files.pythonhosted.org/packages/0a/18/c797c9b8c10380d05616db3bfb48e2a3358c767affd0857d56c2eb501caa/scikit_learn-1.6.1-cp312-cp312-macosx_10_13_x86_64.whl", hash = "sha256:926f207c804104677af4857b2c609940b743d04c4c35ce0ddc8ff4f053cddc1b", size = 12104516, upload-time = "2025-01-10T08:06:40.009Z" }, - { url = 
"https://files.pythonhosted.org/packages/c4/b7/2e35f8e289ab70108f8cbb2e7a2208f0575dc704749721286519dcf35f6f/scikit_learn-1.6.1-cp312-cp312-macosx_12_0_arm64.whl", hash = "sha256:2c2cae262064e6a9b77eee1c8e768fc46aa0b8338c6a8297b9b6759720ec0ff2", size = 11167837, upload-time = "2025-01-10T08:06:43.305Z" }, - { url = "https://files.pythonhosted.org/packages/a4/f6/ff7beaeb644bcad72bcfd5a03ff36d32ee4e53a8b29a639f11bcb65d06cd/scikit_learn-1.6.1-cp312-cp312-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:1061b7c028a8663fb9a1a1baf9317b64a257fcb036dae5c8752b2abef31d136f", size = 12253728, upload-time = "2025-01-10T08:06:47.618Z" }, - { url = "https://files.pythonhosted.org/packages/29/7a/8bce8968883e9465de20be15542f4c7e221952441727c4dad24d534c6d99/scikit_learn-1.6.1-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:2e69fab4ebfc9c9b580a7a80111b43d214ab06250f8a7ef590a4edf72464dd86", size = 13147700, upload-time = "2025-01-10T08:06:50.888Z" }, - { url = "https://files.pythonhosted.org/packages/62/27/585859e72e117fe861c2079bcba35591a84f801e21bc1ab85bce6ce60305/scikit_learn-1.6.1-cp312-cp312-win_amd64.whl", hash = "sha256:70b1d7e85b1c96383f872a519b3375f92f14731e279a7b4c6cfd650cf5dffc52", size = 11110613, upload-time = "2025-01-10T08:06:54.115Z" }, -] - [[package]] name = "scipy" version = "1.15.3" @@ -2184,27 +2175,6 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/f4/d0/c9543b52c067a390ae6ae632d7fd1b97a35cdc8d69d40c0b7d334b326410/statsd-4.0.1-py2.py3-none-any.whl", hash = "sha256:c2676519927f7afade3723aca9ca8ea986ef5b059556a980a867721ca69df093", size = 13118, upload-time = "2022-11-06T14:17:34.258Z" }, ] -[[package]] -name = "statsmodels" -version = "0.14.4" -source = { registry = "https://pypi.org/simple" } -dependencies = [ - { name = "numpy" }, - { name = "packaging" }, - { name = "pandas" }, - { name = "patsy" }, - { name = "scipy" }, -] -sdist = { url = "https://files.pythonhosted.org/packages/1f/3b/963a015dd8ea17e10c7b0e2f14d7c4daec903baf60a017e756b57953a4bf/statsmodels-0.14.4.tar.gz", hash = "sha256:5d69e0f39060dc72c067f9bb6e8033b6dccdb0bae101d76a7ef0bcc94e898b67", size = 20354802, upload-time = "2024-10-03T16:15:36.273Z" } -wheels = [ - { url = "https://files.pythonhosted.org/packages/f5/99/654fd41a9024643ee70b239e5ebc987bf98ce9fc2693bd550bee58136564/statsmodels-0.14.4-cp312-cp312-macosx_10_13_x86_64.whl", hash = "sha256:5221dba7424cf4f2561b22e9081de85f5bb871228581124a0d1b572708545199", size = 10220508, upload-time = "2024-10-03T17:10:31.183Z" }, - { url = "https://files.pythonhosted.org/packages/67/d8/ac30cf4cf97adaa48548be57e7cf02e894f31b45fd55bf9213358d9781c9/statsmodels-0.14.4-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:17672b30c6b98afe2b095591e32d1d66d4372f2651428e433f16a3667f19eabb", size = 9912317, upload-time = "2024-10-03T16:22:29.504Z" }, - { url = "https://files.pythonhosted.org/packages/e0/77/2440d551eaf27f9c1d3650e13b3821a35ad5b21d3a19f62fb302af9203e8/statsmodels-0.14.4-cp312-cp312-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:ab5e6312213b8cfb9dca93dd46a0f4dccb856541f91d3306227c3d92f7659245", size = 10301662, upload-time = "2024-10-03T17:13:04.537Z" }, - { url = "https://files.pythonhosted.org/packages/fa/e1/60a652f18996a40a7410aeb7eb476c18da8a39792c7effe67f06883e9852/statsmodels-0.14.4-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:4bbb150620b53133d6cd1c5d14c28a4f85701e6c781d9b689b53681effaa655f", size = 10741763, upload-time = "2024-10-03T17:13:17.594Z" }, - { url = 
"https://files.pythonhosted.org/packages/81/0c/2453eec3ac25e300847d9ed97f41156de145e507391ecb5ac989e111e525/statsmodels-0.14.4-cp312-cp312-musllinux_1_2_x86_64.whl", hash = "sha256:bb695c2025d122a101c2aca66d2b78813c321b60d3a7c86bb8ec4467bb53b0f9", size = 10879534, upload-time = "2024-10-03T17:13:31.19Z" }, - { url = "https://files.pythonhosted.org/packages/59/9a/e466a1b887a1441141e52dbcc98152f013d85076576da6eed2357f2016ae/statsmodels-0.14.4-cp312-cp312-win_amd64.whl", hash = "sha256:7f7917a51766b4e074da283c507a25048ad29a18e527207883d73535e0dc6184", size = 9823866, upload-time = "2024-10-03T16:14:23.828Z" }, -] - [[package]] name = "tenacity" version = "9.1.2" @@ -2214,15 +2184,6 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/e5/30/643397144bfbfec6f6ef821f36f33e57d35946c44a2352d3c9f0ae847619/tenacity-9.1.2-py3-none-any.whl", hash = "sha256:f77bf36710d8b73a50b2dd155c97b870017ad21afe6ab300326b0371b3b05138", size = 28248, upload-time = "2025-04-02T08:25:07.678Z" }, ] -[[package]] -name = "threadpoolctl" -version = "3.6.0" -source = { registry = "https://pypi.org/simple" } -sdist = { url = "https://files.pythonhosted.org/packages/b7/4d/08c89e34946fce2aec4fbb45c9016efd5f4d7f24af8e5d93296e935631d8/threadpoolctl-3.6.0.tar.gz", hash = "sha256:8ab8b4aa3491d812b623328249fab5302a68d2d71745c8a4c719a2fcaba9f44e", size = 21274, upload-time = "2025-03-13T13:49:23.031Z" } -wheels = [ - { url = "https://files.pythonhosted.org/packages/32/d5/f9a850d79b0851d1d4ef6456097579a9005b31fea68726a4ae5f2d82ddd9/threadpoolctl-3.6.0-py3-none-any.whl", hash = "sha256:43a0b8fd5a2928500110039e43a5eed8480b918967083ea48dc3ab9f13c4a7fb", size = 18638, upload-time = "2025-03-13T13:49:21.846Z" }, -] - [[package]] name = "tinygrad" version = "0.10.3" diff --git a/workflows/fetch_data.py b/workflows/fetch_data.py index 8d18a6bdd..9b8829a5d 100644 --- a/workflows/fetch_data.py +++ b/workflows/fetch_data.py @@ -23,7 +23,7 @@ def get_identity_token() -> str: @task def fetch_dates(start_date: str, end_date: str, token: str) -> pl.DataFrame | pl.Series: client = run_v2.ServicesClient() - project = os.getenv("GCP_PROJECT", "fund-alpha") + project = os.getenv("GCP_PROJECT", "pocketsizefund") region = os.getenv("GCP_REGION", "us-east1") parent: str = f"projects/{project}/locations/{region}"