Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 2 additions & 7 deletions .github/workflows/lifecycle.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,7 @@ jobs:
env:
PULUMI_ACCESS_TOKEN: ${{ secrets.PULUMI_ACCESS_TOKEN }}
with:
command: |
cd infrastructure
uv run pulumi up --yes
nu ping.nu
command: mise tasks run infrastructure:up
teardown:
if: github.event.schedule == '0 23 * * 1,2,3,4,5'
runs-on: ubuntu-latest
Expand All @@ -37,6 +34,4 @@ jobs:
env:
PULUMI_ACCESS_TOKEN: ${{ secrets.PULUMI_ACCESS_TOKEN }}
with:
command: |
cd infrastructure
uv run pulumi down --yes
command: mise tasks run infrastructure:down
3 changes: 1 addition & 2 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,8 @@
.python_coverage.*
__pycache__/
infrastructure/Pulumi.infrastructure.yaml
infrastructure/Pulumi.production.yaml
.envrc
.coverage*
.coverage/
coverage.xml

todos.md
10 changes: 9 additions & 1 deletion .mise.toml
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,14 @@ description = "Launch cloud infrastructure"
run = """
set -e
cd infrastructure
uv run pulumi up --yes
uv run pulumi up --yes --stack pocketsizefund/pocketsizefund/production
nu ping.nu
"""

[tasks."infrastructure:down"]
description = "Teardown cloud infrastructure"
run = """
set -e
cd infrastructure
uv run pulumi down --yes --stack pocketsizefund/pocketsizefund/production
"""
2 changes: 1 addition & 1 deletion application/datamanager/compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ services:
- 8080:8080
environment:
- POLYGON_API_KEY=${POLYGON_API_KEY}
- DATA_BUCKET=${DATA_BUCKET}
- DATA_BUCKET_NAME=${DATA_BUCKET_NAME}
- GOOGLE_APPLICATION_CREDENTIALS=/root/.config/gcloud/application_default_credentials.json
- DUCKDB_ACCESS_KEY=${DUCKDB_ACCESS_KEY}
- DUCKDB_SECRET=${DUCKDB_SECRET}
Expand Down
4 changes: 2 additions & 2 deletions application/datamanager/src/datamanager/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,13 @@ class Polygon(BaseModel):


class Bucket(BaseModel):
name: str = Field(default=os.getenv("DATA_BUCKET", ""))
name: str = Field(default=os.getenv("DATA_BUCKET_NAME", ""))
project: str = Field(default=os.getenv("GCP_PROJECT", ""))

@computed_field
def daily_bars_path(self) -> str:
if self.name is None:
msg = "DATA_BUCKET environment variable is required"
msg = "DATA_BUCKET_NAME environment variable is required"
raise ValueError(msg)
return f"gs://{self.name}/equity/bars/"

Expand Down
17 changes: 17 additions & 0 deletions application/eventtrigger/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
FROM python:3.12.10

COPY --from=ghcr.io/astral-sh/uv:latest /uv /bin/uv

ENV PYTHONPATH=/app/src

WORKDIR /app

COPY pyproject.toml ./

RUN uv sync --no-dev

COPY ./src ./src

EXPOSE 8080

ENTRYPOINT ["uv", "run", "uvicorn", "eventtrigger.main:application", "--host", "0.0.0.0", "--port", "8080", "--app-dir", "src"]
20 changes: 20 additions & 0 deletions application/eventtrigger/pyproject.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
[project]
name = "eventtrigger"
version = "0.1.0"
description = "Event trigger service for scheduled HTTP requests"
requires-python = "==3.12.10"
dependencies = [
"fastapi>=0.115.12",
"uvicorn>=0.34.2",
"prometheus-fastapi-instrumentator>=7.1.0",
"loguru>=0.7.3",
"pydantic>=2.5.0",
"requests>=2.31.0",
]

[tool.hatch.build.targets.wheel]
packages = ["eventtrigger"]

[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"
Empty file.
98 changes: 98 additions & 0 deletions application/eventtrigger/src/eventtrigger/main.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
import os
from datetime import datetime, timedelta
from zoneinfo import ZoneInfo

import requests
from fastapi import FastAPI, Response, status
from loguru import logger
from prometheus_fastapi_instrumentator import Instrumentator

from .models import TriggerEventRequest

application = FastAPI()
Instrumentator().instrument(application).expose(application)


@application.get("/health")
def health_check() -> Response:
return Response(status_code=status.HTTP_200_OK)


@application.post("/trigger")
def trigger_event(request: TriggerEventRequest) -> Response:
event_type = request.event
try:
url = ""
data = {}
method = ""

if event_type == "fetch_data":
eastern_timezone = ZoneInfo("America/New_York")

current_time = datetime.now(eastern_timezone)

rounding_minute_threshold = 30 # threshold for rounding to the next hour

rounded_time = None
if current_time.minute >= rounding_minute_threshold:
rounded_time = current_time.replace(
minute=0, second=0, microsecond=0
) + timedelta(hours=1)
else:
rounded_time = current_time.replace(minute=0, second=0, microsecond=0)

datamanager_base_url = os.getenv("DATAMANAGER_BASE_URL")

url = f"{datamanager_base_url}/equity-bars"

data = {"date": rounded_time.strftime("%Y-%m-%d")}

method = "POST"

elif event_type == "create_positions":
predictionengine_base_url = os.getenv("PREDICTIONENGINE_BASE_URL")

url = f"{predictionengine_base_url}/create-positions"

method = "POST"

elif event_type == "close_positions":
positionmanager_base_url = os.getenv("POSITIONMANAGER_BASE_URL")

url = f"{positionmanager_base_url}/positions"

method = "DELETE"

else:
logger.warning(f"Unknown event type: {event_type}")
return Response(status_code=status.HTTP_400_BAD_REQUEST)

if method == "POST":
response = requests.post(
url,
json=data,
timeout=10,
)
response.raise_for_status()
logger.info(f"Event {event_type} triggered successfully.")

elif method == "DELETE":
response = requests.delete(
url,
timeout=10,
)
response.raise_for_status()
logger.info(f"Event {event_type} triggered successfully.")

else:
logger.error(f"Unsupported HTTP method: {method}")
return Response(status_code=status.HTTP_400_BAD_REQUEST)

return Response(status_code=status.HTTP_200_OK)

except requests.RequestException as e:
logger.error(f"Error triggering event {event_type}: {e}")
return Response(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
content=f"Error triggering event: {e}",
)
5 changes: 5 additions & 0 deletions application/eventtrigger/src/eventtrigger/models.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
from pydantic import BaseModel


class TriggerEventRequest(BaseModel):
event: str
Empty file.
114 changes: 114 additions & 0 deletions application/eventtrigger/tests/test_eventtrigger_main.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
import unittest
from unittest.mock import MagicMock, patch

import requests
from fastapi import status
from fastapi.testclient import TestClient

from application.eventtrigger.src.eventtrigger.main import application

client = TestClient(application)


def test_health_check() -> None:
response = client.get("/health")
assert response.status_code == status.HTTP_200_OK


class TestTriggerEventEndpoint(unittest.TestCase):
def test_trigger_event_missing_body(self) -> None:
response = client.post("/trigger")
assert response.status_code == status.HTTP_422_UNPROCESSABLE_ENTITY

def test_trigger_event_invalid_event_type(self) -> None:
response = client.post("/trigger", json={"event": "invalid_event"})
assert response.status_code == status.HTTP_400_BAD_REQUEST

@patch("application.eventtrigger.src.eventtrigger.main.os.getenv")
@patch("application.eventtrigger.src.eventtrigger.main.requests.post")
def test_trigger_event_fetch_data_success(
self, mock_post: MagicMock, mock_getenv: MagicMock
) -> None:
mock_getenv.return_value = "https://datamanager.example.com"
mock_response = MagicMock()
mock_response.raise_for_status.return_value = None
mock_post.return_value = mock_response

response = client.post("/trigger", json={"event": "fetch_data"})

assert response.status_code == status.HTTP_200_OK
mock_post.assert_called_once()
call_args = mock_post.call_args
assert "https://datamanager.example.com/equity-bars" in str(call_args[0][0])
assert "json" in call_args[1]
assert "date" in call_args[1]["json"]

@patch("application.eventtrigger.src.eventtrigger.main.os.getenv")
@patch("application.eventtrigger.src.eventtrigger.main.requests.post")
def test_trigger_event_create_positions_success(
self, mock_post: MagicMock, mock_getenv: MagicMock
) -> None:
mock_getenv.return_value = "https://predictionengine.example.com"
mock_response = MagicMock()
mock_response.raise_for_status.return_value = None
mock_post.return_value = mock_response

response = client.post("/trigger", json={"event": "create_positions"})

assert response.status_code == status.HTTP_200_OK
mock_post.assert_called_once()
call_args = mock_post.call_args
assert "https://predictionengine.example.com/create-positions" in str(
call_args[0][0]
)

@patch("application.eventtrigger.src.eventtrigger.main.os.getenv")
@patch("application.eventtrigger.src.eventtrigger.main.requests.delete")
def test_trigger_event_close_positions_success(
self, mock_delete: MagicMock, mock_getenv: MagicMock
) -> None:
mock_getenv.return_value = "https://positionmanager.example.com"
mock_response = MagicMock()
mock_response.raise_for_status.return_value = None
mock_delete.return_value = mock_response

response = client.post("/trigger", json={"event": "close_positions"})

assert response.status_code == status.HTTP_200_OK
mock_delete.assert_called_once()
call_args = mock_delete.call_args
assert "https://positionmanager.example.com/positions" in str(call_args[0][0])

@patch("application.eventtrigger.src.eventtrigger.main.os.getenv")
@patch("application.eventtrigger.src.eventtrigger.main.requests.post")
def test_trigger_event_request_exception(
self, mock_post: MagicMock, mock_getenv: MagicMock
) -> None:
mock_getenv.return_value = "https://datamanager.example.com"
mock_post.side_effect = requests.exceptions.ConnectionError("Connection failed")

response = client.post("/trigger", json={"event": "fetch_data"})

assert response.status_code == status.HTTP_500_INTERNAL_SERVER_ERROR
assert "Error triggering event" in response.text

@patch("application.eventtrigger.src.eventtrigger.main.os.getenv")
@patch("application.eventtrigger.src.eventtrigger.main.requests.post")
def test_trigger_event_http_error(
self, mock_post: MagicMock, mock_getenv: MagicMock
) -> None:
mock_getenv.return_value = "https://datamanager.example.com"
mock_response = MagicMock()
mock_response.raise_for_status.side_effect = requests.exceptions.HTTPError(
"HTTP Error"
)
mock_post.return_value = mock_response

response = client.post("/trigger", json={"event": "fetch_data"})

assert response.status_code == status.HTTP_500_INTERNAL_SERVER_ERROR
assert "Error triggering event" in response.text


if __name__ == "__main__":
unittest.main()
2 changes: 1 addition & 1 deletion application/positionmanager/src/positionmanager/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ def create_position(payload: PredictionPayload) -> dict[str, Any]:

try:
optimized_portfolio = portfolio_optimizer.get_optimized_portfolio(
data=historical_data,
historical_data=historical_data,
portfolio_value=cash_balance,
predictions=payload.predictions,
)
Expand Down
4 changes: 2 additions & 2 deletions application/positionmanager/src/positionmanager/portfolio.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,12 @@ def __init__(

def get_optimized_portfolio(
self,
data: pl.DataFrame,
historical_data: pl.DataFrame,
portfolio_value: Money,
predictions: dict[str, float],
prediction_weight: float = 0.3,
) -> dict[str, int]:
converted_data = data.to_pandas()
converted_data = historical_data.to_pandas()

if "date" in converted_data.columns:
converted_data = converted_data.set_index("date")
Expand Down
3 changes: 2 additions & 1 deletion application/predictionengine/compose.yaml
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
name: predictionengine integration tests
name: Prediction engine integration tests

services:
predictionengine:
Expand All @@ -9,6 +9,7 @@ services:
- 8080:8080
environment:
- DATAMANAGER_BASE_URL=${DATAMANAGER_BASE_URL}
- POSITIONMANAGER_BASE_URL=${POSITIONMANAGER_BASE_URL}
volumes:
- ./:/app/predictionengine
- ~/.config/gcloud/application_default_credentials.json:/root/.config/gcloud/application_default_credentials.json:ro
Expand Down
3 changes: 2 additions & 1 deletion application/predictionengine/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,11 @@ dependencies = [
"uvicorn>=0.34.2",
"tinygrad>=0.10.3",
"polars>=1.29.0",
"category-encoders>=2.8.1",
"requests>=2.31.0",
"prometheus-fastapi-instrumentator>=7.1.0",
"loguru>=0.7.3",
"numpy>=2.2.6",
"pyarrow>=20.0.0",
]

[tool.hatch.build.targets.wheel]
Expand Down
Loading