Conversation
Walkthrough

This update migrates the infrastructure from Google Cloud Platform (GCP) and Cloud Run to AWS EKS with Knative for event-driven architecture. It removes the eventtrigger service, refactors service endpoints to use CloudEvents, introduces new AWS and Kubernetes provisioning modules, updates dependencies, and restructures environment variable management and monitoring.

Changes
Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant User
    participant KnativeSchedule
    participant KnativeBroker
    participant predictionengine
    participant positionmanager
    participant datamanager
    participant S3
    participant PolygonAPI
    %% Scheduled fetch event
    KnativeSchedule->>datamanager: HTTP POST /equity-bars/fetch (CloudEvent)
    datamanager->>PolygonAPI: Fetch daily equity bars
    PolygonAPI-->>datamanager: Equity bars data
    datamanager->>S3: Store data as Parquet
    datamanager-->>KnativeSchedule: CloudEvent (success/error)
    %% Prediction trigger
    KnativeSchedule->>predictionengine: HTTP POST /predictions/create (CloudEvent)
    predictionengine->>datamanager: Fetch historical data
    datamanager-->>predictionengine: Data
    predictionengine->>predictionengine: Generate predictions
    predictionengine-->>KnativeBroker: CloudEvent (predictions)
    %% Position open/close via event
    KnativeBroker->>positionmanager: CloudEvent (predictions/close)
    positionmanager->>positionmanager: Open/close positions
    positionmanager-->>KnativeBroker: CloudEvent (result)
```
Pull Request Overview
This PR overhauls the deployment infrastructure by migrating from GCP Cloud Run to AWS EKS using Pulumi and refactors application services to use Knative and CloudEvent-driven interfaces.
- Remove GCP Cloud Run setup and introduce AWS EKS cluster provisioning, IAM roles, and Pulumi AWS/Kubernetes modules.
- Add Knative Serving & Eventing core resources, container image build pipeline, environment variable management, and AWS-managed Prometheus scraper.
- Refactor `predictionengine`, `positionmanager`, and `datamanager` services to emit CloudEvents, update endpoints, and adjust tests accordingly.
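For context on the CloudEvent responses mentioned above: the services use the `cloudevents` library, but the envelope they produce follows the CloudEvents 1.0 spec and can be illustrated as a plain dict. This is a hedged sketch, not the project's actual implementation — the event type and payload shown are assumptions:

```python
import json
import uuid
from datetime import datetime, timezone


def make_cloudevent(event_type: str, source: str, data: dict) -> dict:
    """Build a CloudEvents 1.0 structured-mode envelope as a plain dict.

    The required attributes (specversion, id, source, type) follow the
    CloudEvents spec; the optional ones shown here are illustrative.
    """
    return {
        "specversion": "1.0",
        "id": str(uuid.uuid4()),
        "source": source,
        "type": event_type,
        "time": datetime.now(timezone.utc).isoformat(),
        "datacontenttype": "application/json",
        "data": data,
    }


# Hypothetical event type and payload, mirroring the prediction flow
event = make_cloudevent(
    event_type="fund.predictions.created",
    source="predictionengine",
    data={"AAPL": {"percentile_50": [101.2]}},
)
body = json.dumps(event)
```

Structured mode serializes the whole envelope as the HTTP body; binary mode would instead map the attributes to `ce-*` headers.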
Reviewed Changes
Copilot reviewed 36 out of 40 changed files in this pull request and generated 3 comments.
Show a summary per file
| File | Description |
|---|---|
| workflows/pyproject.toml | Bump flytekit version in workflows project |
| pyproject.toml | Drop GCP and eventtrigger dependencies, simplify workspace config and dev group |
| infrastructure/services.py | Remove legacy GCP Cloud Run service creation code |
| infrastructure/roles.py | Add AWS IAM roles for EKS cluster and worker nodes |
| infrastructure/pyproject.toml | Switch infra dependencies from GCP to AWS/EKS and Kubernetes |
| infrastructure/publishers_subscribers.py | Add Knative Serving/Eventing core and service resource definitions |
| infrastructure/project.py | Remove GCP project and service enablement setup |
| infrastructure/ping.nu | Update ping script to use EKS cluster endpoint and AWS auth |
| infrastructure/monitors.py | Add AWS AMP scraper configuration for EKS cluster |
| infrastructure/monitoring.py | Remove GCP monitoring and Grafana secret setup |
| infrastructure/images.py | Introduce Pulumi-based container image build logic |
| infrastructure/environment_variables.py | Replace GCP Cloud Run env helpers with Pulumi Config outputs |
| infrastructure/cluster.py | Add VPC, subnets, NAT, and EKS cluster provisioning with Pulumi |
| infrastructure/buckets.py | Remove GCP storage bucket IAM configuration |
| infrastructure/main.py | Rewire Pulumi entrypoint to orchestrate AWS/EKS and Knative resources |
| application/predictionengine/tests/test_predictionengine_main.py | Update tests for CloudEvent-based predictions endpoint |
| application/predictionengine/models.py | Remove old PredictionResponse model |
| application/predictionengine/main.py | Refactor prediction endpoint to return CloudEvents and modularize get_predictions |
| application/predictionengine/pyproject.toml | Add cloudevents dependency |
| application/positionmanager/tests/test_positionmanager_main.py | Adapt tests for new open/close position CloudEvent endpoints |
| application/positionmanager/models.py | Remove unused PredictionPayload and adjust imports |
| application/positionmanager/main.py | Refactor position endpoints to CloudEvents and rename routes |
| application/positionmanager/pyproject.toml | Add cloudevents dependency |
| application/eventtrigger/tests/test_eventtrigger_main.py | Remove obsolete eventtrigger tests |
| application/eventtrigger/models.py | Remove TriggerEventRequest model |
| application/eventtrigger/main.py | Remove legacy eventtrigger service implementation |
| application/eventtrigger/pyproject.toml | Remove eventtrigger project configuration |
| application/eventtrigger/Dockerfile | Remove eventtrigger Dockerfile |
| application/datamanager/tests/test_datamanager_main.py | Update datamanager tests for metrics, equity-bars, and CloudEvents |
| application/datamanager/src/datamanager/main.py | Refactor data workflows to CloudEvent responses and S3/DuckDB clients |
| application/datamanager/src/datamanager/config.py | Remove legacy settings defined by Pydantic |
| application/datamanager/src/datamanager/clients.py | Introduce PolygonClient and S3Client for data access |
| application/datamanager/pyproject.toml | Update dependencies to include boto3, polygon-api-client, cloudevents |
| application/datamanager/features/steps/health_steps.py | Add # type: ignore for behave import |
| application/datamanager/features/steps/equity_bars_steps.py | Cleanup sys.path hack and add # type: ignore |
| .mise.toml | Enhance install task with --all-groups, clean up infra up task comments |
Comments suppressed due to low confidence (3)
infrastructure/publishers_subscribers.py:42
- [nitpick] This function creates both the Serving core ConfigGroup and a separate network ConfigMap but only returns the ConfigGroup. Consider returning the ConfigMap resource as well so callers can manage dependencies explicitly.
k8s.core.v1.ConfigMap(
infrastructure/cluster.py:1
- The new `cluster.py` module contains complex provisioning logic but lacks unit or integration tests. Consider adding tests or mocks to validate VPC, subnet, NAT and EKS cluster creation.
import json
infrastructure/roles.py:6
- [nitpick] You are specifying both `resource_name` and AWS `name` with the same literal. To avoid duplication and potential drift, consider deriving the AWS role `name` from the Pulumi `resource_name` or documenting the difference.
resource_name="pocketsizefund-cluster-role",
Actionable comments posted: 23
🔭 Outside diff range comments (2)
application/datamanager/src/datamanager/main.py (2)
29-30: Critical: Still using Google Cloud Storage paths after AWS migration.

The `bars_query` function and the metrics endpoint use the "gs://" prefix, which is for Google Cloud Storage, but according to the PR objectives the infrastructure has been migrated to AWS S3. This will cause runtime failures.
```diff
 def bars_query(*, bucket: str, start_date: date, end_date: date) -> str:
-    path_pattern = f"gs://{bucket}/equity/bars/*/*/*/*"
+    path_pattern = f"s3://{bucket}/equity/bars/*/*/*/*"
```

And update line 101:

```diff
-    'gs://{request.app.state.s3_client.data_bucket_name}/equity/bars/*/*/*/*',
+    's3://{request.app.state.s3_client.data_bucket_name}/equity/bars/*/*/*/*',
```

Also applies to: 101-101, 150-150
63-73: DuckDB configuration still uses the GCS secret type instead of S3.

The secret type should be changed to S3 to match the AWS migration.

```diff
 CREATE SECRET (
-    TYPE GCS,
+    TYPE S3,
     KEY_ID '{DUCKDB_ACCESS_KEY}',
     SECRET '{DUCKDB_SECRET}'
 );
```
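To make the S3 secret fix above concrete, here is a minimal sketch that renders the corrected DuckDB statement as a string. The credential placeholders and the optional `REGION` parameter are assumptions for illustration, not values from the PR:

```python
def build_s3_secret_sql(key_id: str, secret: str, region: str = "us-east-1") -> str:
    """Render a DuckDB CREATE SECRET statement for S3 access.

    Sketch only: in the real service the key id and secret would come from
    environment configuration, never from literals in the code.
    """
    return (
        "CREATE SECRET (\n"
        "    TYPE S3,\n"
        f"    KEY_ID '{key_id}',\n"
        f"    SECRET '{secret}',\n"
        f"    REGION '{region}'\n"
        ");"
    )


sql = build_s3_secret_sql("AKIAEXAMPLE", "example-secret")
```

The rendered statement would then be passed to a DuckDB connection's `execute()` before querying `s3://` paths.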
🧹 Nitpick comments (19)
infrastructure/roles.py (2)
9-22: Consider externalizing the inline JSON policy.

The inline JSON assume role policy works but could be improved for maintainability. Consider using `pulumi_aws.iam.get_policy_document()` or external policy files.

```diff
+assume_role_policy_doc = aws.iam.get_policy_document(
+    statements=[
+        aws.iam.GetPolicyDocumentStatementArgs(
+            effect="Allow",
+            principals=[
+                aws.iam.GetPolicyDocumentStatementPrincipalArgs(
+                    type="Service",
+                    identifiers=["eks.amazonaws.com"],
+                ),
+            ],
+            actions=["sts:AssumeRole"],
+        ),
+    ],
+)
+
 cluster_role = aws.iam.Role(
     resource_name="pocketsizefund-cluster-role",
     description="Role for EKS cluster to manage resources",
     name="pocketsizefund-cluster-role",
-    assume_role_policy="""{
-        "Version": "2012-10-17",
-        "Statement": [
-            {
-                "Effect": "Allow",
-                "Principal": {
-                    "Service": [
-                        "eks.amazonaws.com"
-                    ]
-                },
-                "Action": "sts:AssumeRole"
-            }
-        ]
-    }""",
+    assume_role_policy=assume_role_policy_doc.json,
 )
```
39-50: Consider externalizing the inline JSON policy.

Similar to the cluster role, the node role's assume role policy could benefit from using `pulumi_aws.iam.get_policy_document()` for better maintainability.

```diff
+assume_role_policy_doc = aws.iam.get_policy_document(
+    statements=[
+        aws.iam.GetPolicyDocumentStatementArgs(
+            effect="Allow",
+            principals=[
+                aws.iam.GetPolicyDocumentStatementPrincipalArgs(
+                    type="Service",
+                    identifiers=["ec2.amazonaws.com"],
+                ),
+            ],
+            actions=["sts:AssumeRole"],
+        ),
+    ],
+)
+
 node_role = aws.iam.Role(
     resource_name="pocketsizefund-node-role",
     description="Role for EKS worker nodes to manage resources",
     name="pocketsizefund-node-role",
-    assume_role_policy="""{
-        "Version": "2012-10-17",
-        "Statement": [
-            {
-                "Effect": "Allow",
-                "Principal": {
-                    "Service": "ec2.amazonaws.com"
-                },
-                "Action": "sts:AssumeRole"
-            }
-        ]
-    }""",
+    assume_role_policy=assume_role_policy_doc.json,
 )
```

application/predictionengine/pyproject.toml (1)
16-16: Add an upper bound to avoid the next major breaking release.

`cloudevents` follows semantic versioning; v2 is currently in RC and contains breaking API changes. Pinning only a lower bound (>=1.12.0) risks an unexpected break when v2 lands.

```diff
-    "cloudevents>=1.12.0",
+    "cloudevents>=1.12.0,<2.0",
```

application/datamanager/features/steps/equity_bars_steps.py (1)
5-5: Prefer typed stubs over a blanket `# type: ignore`.

`behave` has community stubs on PyPI (`types-behave`). Importing them keeps type-checking strict without suppressing all errors:

```diff
-from behave import given, then, when  # type: ignore
+from behave import given, then, when
```

and add to `dev` deps: `types-behave>=1.2.6`

infrastructure/images.py (1)
14-17: Consider using absolute path resolution.

The relative path `../application` assumes the function is always called from the `infrastructure` directory. This could break if the working directory changes.

```diff
-    service_directory = Path("../application") / service_name
+    service_directory = Path(__file__).parent.parent / "application" / service_name
```
17-70: Comprehensive mocking strategy with room for improvement.

The test effectively mocks all external dependencies, but the test could be more robust:
- The mock data structure is hardcoded and doesn't test edge cases
- Missing assertions for the actual prediction data content
- Could benefit from testing error scenarios
Consider adding tests for:
- Empty prediction results
- External service failures
- Invalid model states
- Data fetching errors
Example additional test:

```python
def test_create_predictions_no_data(self, mock_requests_get):
    # Test behavior when no historical data is available
    mock_df = MagicMock()
    mock_df.is_empty.return_value = True
    # ... rest of test setup
```

application/datamanager/src/datamanager/clients.py (1)
21-44: S3Client has good error handling but could be more robust.

The implementation correctly handles the case where no objects are found, but there are some areas for improvement:

- No error handling for AWS service exceptions
- The `delete_objects` method doesn't handle partial failures
- Missing pagination for large object lists

Consider adding error handling:

```python
def list_objects(self, prefix: str = "") -> list[str]:
    try:
        response = self.s3_client.list_objects_v2(
            Bucket=self.data_bucket_name, Prefix=prefix
        )
        if "Contents" in response:
            return [obj["Key"] for obj in response["Contents"]]
        return []
    except ClientError as e:
        # Log error and re-raise or handle appropriately
        raise
```

infrastructure/ping.nu (1)
5-5: Parameterize hard-coded values for flexibility.

The cluster name and service port are hard-coded, which reduces flexibility and reusability.

Consider making these configurable:

```nu
# At the top of the script
let cluster_name = $env.CLUSTER_NAME? | default "pocketsizefund-cluster"
let service_port = $env.SERVICE_PORT? | default "8080"

# Then use them
let token = aws eks get-token --cluster-name $cluster_name | from json | get status.token

# In service URLs
url: $"($cluster_endpoint)/api/v1/namespaces/default/services/datamanager:($service_port)/proxy"
```

Also applies to: 9-22
application/predictionengine/src/predictionengine/main.py (1)
109-111: Log warnings consistently for insufficient data.

The warning is logged but the issue is silently skipped. Consider collecting these warnings to include in the response.

Track skipped tickers to inform the user:

```diff
 def get_predictions(
     tickers: list[str],
     data: pl.DataFrame,
     model: MiniatureTemporalFusionTransformer,
 ) -> dict[str, dict[str, float]]:
     predictions = {}
+    skipped_tickers = []
     for ticker in tickers:
         ticker_data = data.filter(pl.col("ticker") == ticker)
         if len(ticker_data) < SEQUENCE_LENGTH:
             logger.warning(f"Insufficient data for ticker: {ticker}")
+            skipped_tickers.append(ticker)
             continue
```

Then include `skipped_tickers` in the CloudEvent data.

application/positionmanager/src/positionmanager/main.py (1)
97-107: Unusual JSON structure with single values in lists.

The response wraps single values in lists (e.g., `[account_information["portfolio_value"]]`), which creates an unusual API response format.

Consider using a more conventional structure:

```diff
-    {
-        "portfolio_value": [account_information["portfolio_value"]],
-        "cash_balance": [account_information["cash"]],
-        "positions_count": [len(positions)],
-        "positions": position_metrics,
-    }
+    {
+        "portfolio_value": account_information["portfolio_value"],
+        "cash_balance": account_information["cash"],
+        "positions_count": len(positions),
+        "positions": position_metrics,
+    }
```

infrastructure/cluster.py (2)
88-88: Replace magic number with named constant.

The magic number `10` for the CIDR offset lacks explanation and makes the code less maintainable.

Use a named constant:

```diff
+# At the top of the function
+PRIVATE_SUBNET_CIDR_OFFSET = 10  # Offset to separate private from public subnet ranges
+
 # In the loop
-    cidr_block=f"10.0.{i + 10}.0/24",
+    cidr_block=f"10.0.{i + PRIVATE_SUBNET_CIDR_OFFSET}.0/24",
```
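As a side note, the same subnet arithmetic can be checked with the standard-library `ipaddress` module. The 10.0.0.0/16 VPC range and the offset of 10 mirror the values discussed above; the helper function itself is hypothetical:

```python
import ipaddress

VPC_CIDR = ipaddress.ip_network("10.0.0.0/16")  # assumed VPC range
PRIVATE_SUBNET_CIDR_OFFSET = 10  # keeps private /24s clear of the public ones


def subnet_cidr(index: int, *, private: bool) -> str:
    """Compute the /24 CIDR for the index-th public or private subnet."""
    offset = PRIVATE_SUBNET_CIDR_OFFSET if private else 0
    # subnets(new_prefix=24) yields 10.0.0.0/24, 10.0.1.0/24, ... in order
    subnet = list(VPC_CIDR.subnets(new_prefix=24))[index + offset]
    return str(subnet)


public = [subnet_cidr(i, private=False) for i in range(2)]
private = [subnet_cidr(i, private=True) for i in range(2)]
# public  → ["10.0.0.0/24", "10.0.1.0/24"]
# private → ["10.0.10.0/24", "10.0.11.0/24"]
```

Deriving the CIDRs from the parent network also raises an error if the offset ever pushes a subnet outside the VPC range, which a raw f-string would not catch.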
99-109: Parameterize cluster configuration for different environments.

The cluster configuration uses fixed values which may not be suitable for all environments (dev/staging/prod).

Consider making these configurable through Pulumi config:

```python
# Get from Pulumi config
config = pulumi.Config()
cluster_config = {
    "desired_capacity": config.get_int("cluster_desired_capacity") or 2,
    "min_size": config.get_int("cluster_min_size") or 1,
    "max_size": config.get_int("cluster_max_size") or 3,
    "instance_type": config.get("cluster_instance_type") or "t3.medium",
}

cluster = eks.Cluster(
    resource_name="pocketsizefund-cluster",
    **cluster_config,
    instance_role=node_role,
    # ... rest of the configuration
)
```

infrastructure/publishers_subscribers.py (7)
119-119: Consider parameterizing the namespace instead of hardcoding "default".

This would improve flexibility and allow services to be deployed to different namespaces.

```diff
 def create_knative_service(
     kubernetes_provider: k8s.Provider,
     service_name: str,
     image_reference: pulumi.Output[str],
     environment_variables: pulumi.Output[dict[str, str]] | None = None,
+    namespace: str = "default",
     depends_on: list[pulumi.Resource] | None = None,
 ) -> k8s.yaml.v2.ConfigGroup:
```

And update line 119:

```diff
-        "metadata": {"name": service_name, "namespace": "default"},
+        "metadata": {"name": service_name, "namespace": namespace},
```
152-172: Consider parameterizing the broker name and namespace.

Hardcoding "default" for both the broker name and namespace limits flexibility for multi-tenant or multi-environment deployments.

```diff
 def create_knative_broker(
     kubernetes_provider: k8s.Provider,
     knative_eventing_core: k8s.yaml.v2.ConfigGroup,
+    broker_name: str = "default",
+    namespace: str = "default",
 ) -> k8s.yaml.v2.ConfigGroup:
     content = {
         "apiVersion": "eventing.knative.dev/v1",
         "kind": "Broker",
         "metadata": {
-            "name": "default",
-            "namespace": "default",
+            "name": broker_name,
+            "namespace": namespace,
         },
     }
     return k8s.yaml.v2.ConfigGroup(
-        resource_name="pocketsizefund-default-broker",
+        resource_name=f"pocketsizefund-{broker_name}-broker",
         objs=[content],
```
255-255: Fix typo in variable name.

```diff
-    formated_cron_schedule = cron_schedule.replace(" ", "-")
+    formatted_cron_schedule = cron_schedule.replace(" ", "-")
```

And update line 258:

```diff
-        resource_name=f"{target_service_name}-{formated_cron_schedule}-schedule",
+        resource_name=f"{target_service_name}-{formatted_cron_schedule}-schedule",
```
103-103: Improve type annotation for better type safety.

The union type annotation uses Python 3.10+ syntax; for Python < 3.10 it can be made explicit:

```diff
-    environment_variables: pulumi.Output[dict[str, str]] | None = None,
+    environment_variables: Optional[pulumi.Output[dict[str, str]]] = None,
```

And add the import:

```python
from typing import Optional
```
268-268: Missing newline at end of file.

The file is missing a newline at the end, which violates common Python conventions.
Add a newline at the end of the file.
1-268: Consider adding comprehensive documentation.

The module lacks docstrings and inline comments explaining the purpose of each function and the overall architecture. This would improve maintainability for future developers.

Add module-level and function-level docstrings:

```python
"""
Knative Serving and Eventing infrastructure provisioning for AWS EKS.

This module provides functions to deploy Knative components on a Kubernetes
cluster, including Serving for containerized applications and Eventing for
event-driven workflows.
"""

def create_knative_serving_core(
    kubernetes_provider: k8s.Provider,
) -> k8s.yaml.v2.ConfigGroup:
    """
    Deploy Knative Serving core components to the cluster.

    Args:
        kubernetes_provider: The Kubernetes provider for the target cluster

    Returns:
        ConfigGroup containing the Knative Serving core components
    """
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
⛔ Files ignored due to path filters (1)
`uv.lock` is excluded by `!**/*.lock`
📒 Files selected for processing (37)
- .gitignore (1 hunks)
- .mise.toml (2 hunks)
- application/datamanager/features/steps/equity_bars_steps.py (1 hunks)
- application/datamanager/features/steps/health_steps.py (1 hunks)
- application/datamanager/pyproject.toml (1 hunks)
- application/datamanager/src/datamanager/clients.py (1 hunks)
- application/datamanager/src/datamanager/config.py (0 hunks)
- application/datamanager/src/datamanager/main.py (8 hunks)
- application/datamanager/tests/test_datamanager_main.py (3 hunks)
- application/eventtrigger/Dockerfile (0 hunks)
- application/eventtrigger/pyproject.toml (0 hunks)
- application/eventtrigger/src/eventtrigger/main.py (0 hunks)
- application/eventtrigger/src/eventtrigger/models.py (0 hunks)
- application/eventtrigger/tests/test_eventtrigger_main.py (0 hunks)
- application/positionmanager/pyproject.toml (1 hunks)
- application/positionmanager/src/positionmanager/main.py (8 hunks)
- application/positionmanager/src/positionmanager/models.py (0 hunks)
- application/positionmanager/tests/test_positionmanager_main.py (4 hunks)
- application/predictionengine/pyproject.toml (1 hunks)
- application/predictionengine/src/predictionengine/main.py (3 hunks)
- application/predictionengine/src/predictionengine/models.py (0 hunks)
- application/predictionengine/tests/test_predictionengine_main.py (1 hunks)
- infrastructure/__main__.py (1 hunks)
- infrastructure/buckets.py (0 hunks)
- infrastructure/cluster.py (1 hunks)
- infrastructure/environment_variables.py (1 hunks)
- infrastructure/images.py (1 hunks)
- infrastructure/monitoring.py (0 hunks)
- infrastructure/monitors.py (1 hunks)
- infrastructure/ping.nu (1 hunks)
- infrastructure/project.py (0 hunks)
- infrastructure/publishers_subscribers.py (1 hunks)
- infrastructure/pyproject.toml (1 hunks)
- infrastructure/roles.py (1 hunks)
- pyproject.toml (3 hunks)
- workflows/pyproject.toml (1 hunks)
💤 Files with no reviewable changes (12)
- application/eventtrigger/src/eventtrigger/models.py
- application/eventtrigger/pyproject.toml
- application/positionmanager/src/positionmanager/models.py
- application/eventtrigger/Dockerfile
- infrastructure/buckets.py
- application/predictionengine/src/predictionengine/models.py
- infrastructure/monitoring.py
- infrastructure/services.py
- infrastructure/project.py
- application/eventtrigger/tests/test_eventtrigger_main.py
- application/eventtrigger/src/eventtrigger/main.py
- application/datamanager/src/datamanager/config.py
🧰 Additional context used
🧠 Learnings (1)
.mise.toml (1)
Learnt from: CR
PR: pocketsizefund/pocketsizefund#0
File: CLAUDE.md:0-0
Timestamp: 2025-06-25T03:05:52.781Z
Learning: Always use mise tasks for running tests, linting, and formatting. If a required command is missing in .mise.toml, recommend adding it.
🧬 Code Graph Analysis (2)
application/datamanager/src/datamanager/main.py (4)
- application/datamanager/src/datamanager/models.py (1)
  - SummaryDate (8-31)
- application/positionmanager/src/positionmanager/main.py (1)
  - get_health (54-55)
- application/predictionengine/src/predictionengine/main.py (1)
  - get_health (39-40)
- application/datamanager/src/datamanager/clients.py (3)
  - get_all_equity_bars (12-18)
  - list_objects (27-34)
  - delete_objects (36-44)

application/predictionengine/src/predictionengine/main.py (4)
- application/predictionengine/src/predictionengine/dataset.py (3)
  - DataSet (121-318)
  - load_data (246-256)
  - batches (278-318)
- application/predictionengine/src/predictionengine/miniature_temporal_fusion_transformer.py (2)
  - MiniatureTemporalFusionTransformer (22-195)
  - predict (177-195)
- application/datamanager/src/datamanager/main.py (2)
  - lifespan (52-78)
  - get_health (91-92)
- application/positionmanager/src/positionmanager/main.py (1)
  - get_health (54-55)
🔇 Additional comments (20)
application/positionmanager/pyproject.toml (1)
18-18: LGTM! CloudEvents dependency aligns with event-driven architecture.

The addition of `cloudevents>=1.12.0` is appropriate for the migration to event-driven architecture using the CloudEvents pattern, supporting the new EKS-based infrastructure.

workflows/pyproject.toml (1)
14-14: Verify compatibility of workflows with Flytekit 1.16.1

The upgrade from `flytekit>=1.10.0` to `>=1.16.1` may introduce breaking changes in decorator signatures or workflow behavior. Please ensure existing workflows run as expected:

• Files importing Flytekit decorators:
- workflows/fetch_data.py
- workflows/train_predictionengine.py
• Actions:
- Run your full workflow test suite against Flytekit 1.16.1.
- Review the Flytekit 1.11.0–1.16.1 changelog for any API or decorator changes (e.g. task/workflow decorator params, type handling).
- Update any affected task or workflow definitions accordingly.
infrastructure/roles.py (2)
4-31: LGTM! Proper IAM role setup for EKS cluster.

The cluster role creation follows AWS best practices with appropriate trust policy and managed policy attachment for EKS cluster operations.

34-71: LGTM! Proper IAM role setup for EKS worker nodes.

The node role creation follows AWS best practices with appropriate trust policy and all necessary managed policies for EKS worker node operations (worker node policy, ECR access, and CNI policy).
pyproject.toml (3)
8-14: LGTM! Workspace configuration updated to remove eventtrigger service.

The removal of `application/eventtrigger` from workspace members aligns with the architectural migration to Knative event-driven architecture, replacing the previous eventtrigger service.

34-35: LGTM! Test paths updated to reflect eventtrigger removal.

The removal of `application/eventtrigger/tests` from pytest testpaths is consistent with the service removal and maintains clean test configuration.
65-117: LGTM! Improved formatting of linting configuration.

The reformatting of the ruff linting rules improves readability while maintaining the same functionality.
application/datamanager/features/steps/health_steps.py (1)
2-2: LGTM! Type ignore comment improves type checking.

The addition of the `# type: ignore` comment for the behave import is appropriate for suppressing type checking warnings from third-party libraries with incomplete type annotations.

.gitignore (1)
14-14: Good call ignoring kubeconfig – prevents accidental credential leaks

application/datamanager/pyproject.toml (1)
13-17: All specified package versions exist on PyPI
Both `boto3>=1.38.23` (latest 1.39.4) and `polygon-api-client>=1.14.6` (latest 1.15.1) are available, so no version adjustments are needed.

infrastructure/images.py (2)
19-35: Excellent multi-platform Docker image configuration.

The implementation correctly handles:
- Multi-platform builds (AMD64 and ARM64)
- Secure credential management using Pulumi secrets
- Proper resource naming with project prefix
- Docker Hub registry configuration
37-37: Verify Pulumi export key naming convention

Ensure the uppercase `SERVICE_NAME_IMAGE` key matches Pulumi's output naming conventions and the format expected by any consuming code.

- Location: infrastructure/images.py:37
- `pulumi.export(f"{service_name.upper()}_IMAGE", image.ref)`

.mise.toml (1)
2-2: Environment variable formatting looks correct.

The spacing around the equals sign is consistent with TOML formatting standards.
application/predictionengine/tests/test_predictionengine_main.py (1)
12-14: Simple and effective health check test.

The test correctly verifies the health endpoint returns HTTP 200 OK.
application/datamanager/src/datamanager/clients.py (1)
8-18: PolygonClient implementation looks solid.

The client properly wraps the Polygon REST API with type hints and appropriate casting. The use of `cast()` is necessary due to the library's dynamic typing.

infrastructure/monitors.py (1)
6-9: Function signature and return type are correct.

The function properly accepts Pulumi outputs and returns the expected AWS resource type.
application/positionmanager/tests/test_positionmanager_main.py (1)
1-136: Test updates correctly align with the CloudEvent-based API changes.

The test refactoring properly covers the new endpoint structure and CloudEvent responses.
application/datamanager/tests/test_datamanager_main.py (1)
1-289: Excellent test coverage for the refactored datamanager service.

The tests comprehensively cover success and error scenarios for all endpoints with proper mocking of external dependencies.
infrastructure/publishers_subscribers.py (2)
41-57: Review network configuration settings.

The network configuration disables auto-TLS and cluster domain claims. Verify these settings align with your security requirements, especially the `"auto-tls": "false"` setting, which disables automatic TLS certificate provisioning.
206-206: Verify correct service name field in ConfigGroup

The variable `target_service_name` on line 206 may not correspond to the actual Knative Service name defined in your `ConfigGroup` objects. Please confirm which attribute in `ConfigGroup` holds the service's name (e.g. `.metadata.name`, `.spec.service_name`, etc.) and update this reference if needed.
- File: infrastructure/publishers_subscribers.py
- Line: 206
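Since the Knative Service manifests in this module are plain dicts handed to `ConfigGroup(objs=[...])`, the declared name lives at `metadata.name`. A small sketch of reading it back from such a dict — the manifest content here is illustrative, not taken from the PR:

```python
def knative_service_name(manifest: dict) -> str:
    """Return the name a Knative Service manifest declares under metadata."""
    return manifest["metadata"]["name"]


# Illustrative manifest, shaped like the dicts built in publishers_subscribers.py
manifest = {
    "apiVersion": "serving.knative.dev/v1",
    "kind": "Service",
    "metadata": {"name": "datamanager", "namespace": "default"},
    "spec": {},
}

name = knative_service_name(manifest)  # "datamanager"
```

Comparing this extracted name against `target_service_name` at provisioning time would catch a mismatch before the trigger is created.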
Force-pushed: 0455dd2 to 2a952f1
Actionable comments posted: 4
♻️ Duplicate comments (4)
application/predictionengine/src/predictionengine/main.py (2)
101-139: DataSet creation inefficiency persists from previous review.

The `get_predictions` function still creates a new DataSet instance for each ticker iteration, which redundantly recomputes preprocessors. This inefficiency was flagged in previous reviews but remains unaddressed.

Consider processing all tickers in a single batch to avoid redundant preprocessing:
```diff
 def get_predictions(
     tickers: list[str],
     data: pl.DataFrame,
     model: MiniatureTemporalFusionTransformer,
 ) -> dict[str, dict[str, list[float]]]:
     predictions: dict[str, dict[str, list[float]]] = {}
     filtered_data = data.filter(pl.col("ticker").is_in(tickers))
     dataset = DataSet(
         batch_size=len(tickers),
         sequence_length=SEQUENCE_LENGTH,
         sample_count=len(tickers),
     )
     dataset.load_data(filtered_data)
-    for ticker in tickers:
-        ticker_data = data.filter(pl.col("ticker") == ticker)
-        if len(ticker_data) < SEQUENCE_LENGTH:
-            logger.warning(f"Insufficient data for ticker: {ticker}")
-            continue
-
-        try:
-            tickers_batch, features_batch, _ = next(iter(dataset.batches()))
-        except StopIteration:
-            logger.warning(f"No batches available for ticker: {ticker}")
-            continue
-
-        percentile_25, percentile_50, percentile_75 = model.predict(
-            tickers_batch,
-            features_batch,
-        )
-
-        predictions[ticker] = {
-            "percentile_25": percentile_25.tolist(),
-            "percentile_50": percentile_50.tolist(),
-            "percentile_75": percentile_75.tolist(),
-        }
+    try:
+        tickers_batch, features_batch, _ = next(iter(dataset.batches()))
+        percentile_25, percentile_50, percentile_75 = model.predict(
+            tickers_batch,
+            features_batch,
+        )
+
+        for i, ticker in enumerate(tickers):
+            predictions[ticker] = {
+                "percentile_25": [percentile_25[i]],
+                "percentile_50": [percentile_50[i]],
+                "percentile_75": [percentile_75[i]],
+            }
+    except StopIteration:
+        logger.warning("No batches available for prediction")
     return predictions
```
202-202: Exception handling remains too broad.

The code still catches the base `Exception` class, which was flagged in previous reviews as being too broad and potentially masking specific error types.

Be more specific with exception handling:

```diff
-    except Exception as e:  # noqa: BLE001
+    except (requests.RequestException, ValueError, KeyError, FileNotFoundError) as e:
```

infrastructure/publishers_subscribers.py (2)
17-18: Update Knative Serving version to address security and compatibility concerns.

The hardcoded version v1.12.0 is outdated. Based on previous review comments, this should be updated to v1.18.1 for security and compatibility.

```diff
-    "https://github.com/knative/serving/releases/download/knative-v1.12.0/serving-crds.yaml"
+    "https://github.com/knative/serving/releases/download/knative-v1.18.1/serving-crds.yaml"
```
28-29: Update Knative Serving core components version.

Similar to the CRDs, the core components should use the updated version for consistency and security.

```diff
-    "https://github.com/knative/serving/releases/download/knative-v1.12.0/serving-core.yaml"
+    "https://github.com/knative/serving/releases/download/knative-v1.18.1/serving-core.yaml"
```
🧹 Nitpick comments (6)
.github/workflows/teardown_application.yaml (4)
9-9: Redundant / brittle `if:` guard

The job is already scoped by `on.schedule`. Keeping `if: github.event.schedule == '0 23 * * 1,2,3,4,5'` adds no protection and blocks manual `workflow_dispatch` or reruns.

Consider removing it or rewriting as `${{ github.event_name == 'schedule' && github.event.schedule == '0 23 * * 1,2,3,4,5' }}` if double-checking is required.
5-5: DST-sensitive comment

The comment `# teardown at 6:00 PM EST` is wrong during daylight-saving time. Prefer "6 PM US Eastern" or rely solely on the explicit `23:00 UTC`.
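The mismatch is easy to verify with the standard library (a quick illustration, not project code): a fixed 23:00 UTC trigger lands at 6 PM Eastern only while EST is in effect.

```python
from datetime import datetime, timezone
from zoneinfo import ZoneInfo

eastern = ZoneInfo("America/New_York")

# The workflow fires at 23:00 UTC year-round; convert two sample dates.
winter = datetime(2025, 1, 15, 23, 0, tzinfo=timezone.utc).astimezone(eastern)
summer = datetime(2025, 7, 15, 23, 0, tzinfo=timezone.utc).astimezone(eastern)

print(winter.hour, winter.tzname())  # 18 EST -> 6:00 PM during standard time
print(summer.hour, summer.tzname())  # 19 EDT -> 7:00 PM during daylight time
```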
10-12: Lock down default token

Add an explicit `permissions` block (e.g. `contents: read`) to enforce least privilege and avoid silent privilege creep:

```yaml
permissions:
  contents: read
```
14-14: Shallow clone is sufficient

Speed up checkout by limiting history depth:

```diff
- uses: actions/checkout@v4
+ uses: actions/checkout@v4
+   with:
+     fetch-depth: 1
```

application/datamanager/tests/test_datamanager_main.py (1)
57-57: Add constant for magic number.

The value `1000` should be defined as a named constant to improve readability and maintainability.

```diff
+ EXPECTED_TOTAL_ROWS = 1000
- assert response_data["total_rows"] == 1000  # noqa: PLR2004
+ assert response_data["total_rows"] == EXPECTED_TOTAL_ROWS
```

infrastructure/publishers_subscribers.py (1)
246-246: Fix typo in variable name.

There's a typo in the variable name `formated_cron_schedule` - it should be `formatted_cron_schedule`.

```diff
- formated_cron_schedule = cron_schedule.replace(" ", "-")
+ formatted_cron_schedule = cron_schedule.replace(" ", "-")
```

And update the usage:

```diff
- resource_name=f"{target_service_name}-{formated_cron_schedule}-schedule",
+ resource_name=f"{target_service_name}-{formatted_cron_schedule}-schedule",
```
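For reference, the naming scheme in that resource is simple enough to sketch in isolation (only the suffix format comes from the diff above; the standalone function is invented for illustration):

```python
def schedule_resource_name(target_service_name: str, cron_schedule: str) -> str:
    """Mirror the resource-name construction shown in the review diff:
    spaces in the cron expression become hyphens."""
    formatted_cron_schedule = cron_schedule.replace(" ", "-")
    return f"{target_service_name}-{formatted_cron_schedule}-schedule"

print(schedule_resource_name("datamanager", "0 0 * * *"))
# datamanager-0-0-*-*-*-schedule
```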
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
⛔ Files ignored due to path filters (1)
`uv.lock` is excluded by `!**/*.lock`
📒 Files selected for processing (46)
- `.github/workflows/check_code_quality.yaml` (1 hunks)
- `.github/workflows/close_stale_issues_and_pull_requests.yaml` (1 hunks)
- `.github/workflows/deploy.yaml` (0 hunks)
- `.github/workflows/launch_application.yaml` (1 hunks)
- `.github/workflows/lifecycle.yaml` (0 hunks)
- `.github/workflows/run_tests.yaml` (1 hunks)
- `.github/workflows/teardown_application.yaml` (1 hunks)
- `.gitignore` (1 hunks)
- `.mise.toml` (2 hunks)
- `application/datamanager/features/steps/equity_bars_steps.py` (1 hunks)
- `application/datamanager/features/steps/health_steps.py` (1 hunks)
- `application/datamanager/pyproject.toml` (1 hunks)
- `application/datamanager/src/datamanager/clients.py` (1 hunks)
- `application/datamanager/src/datamanager/config.py` (0 hunks)
- `application/datamanager/src/datamanager/main.py` (8 hunks)
- `application/datamanager/tests/test_datamanager_main.py` (3 hunks)
- `application/eventtrigger/Dockerfile` (0 hunks)
- `application/eventtrigger/pyproject.toml` (0 hunks)
- `application/eventtrigger/src/eventtrigger/main.py` (0 hunks)
- `application/eventtrigger/src/eventtrigger/models.py` (0 hunks)
- `application/eventtrigger/tests/test_eventtrigger_main.py` (0 hunks)
- `application/positionmanager/pyproject.toml` (1 hunks)
- `application/positionmanager/src/positionmanager/main.py` (9 hunks)
- `application/positionmanager/src/positionmanager/models.py` (0 hunks)
- `application/positionmanager/src/positionmanager/portfolio.py` (1 hunks)
- `application/positionmanager/tests/test_positionmanager_main.py` (4 hunks)
- `application/predictionengine/pyproject.toml` (1 hunks)
- `application/predictionengine/src/predictionengine/main.py` (3 hunks)
- `application/predictionengine/src/predictionengine/models.py` (0 hunks)
- `application/predictionengine/tests/test_predictionengine_main.py` (1 hunks)
- `infrastructure/__main__.py` (1 hunks)
- `infrastructure/buckets.py` (0 hunks)
- `infrastructure/cluster.py` (1 hunks)
- `infrastructure/environment_variables.py` (1 hunks)
- `infrastructure/images.py` (1 hunks)
- `infrastructure/monitoring.py` (0 hunks)
- `infrastructure/monitors.py` (1 hunks)
- `infrastructure/ping.nu` (1 hunks)
- `infrastructure/project.py` (0 hunks)
- `infrastructure/publishers_subscribers.py` (1 hunks)
- `infrastructure/pyproject.toml` (1 hunks)
- `infrastructure/roles.py` (1 hunks)
- `infrastructure/services.py` (0 hunks)
- `infrastructure/tags.py` (1 hunks)
- `pyproject.toml` (3 hunks)
- `workflows/pyproject.toml` (1 hunks)
💤 Files with no reviewable changes (14)
- application/positionmanager/src/positionmanager/models.py
- application/predictionengine/src/predictionengine/models.py
- infrastructure/buckets.py
- application/eventtrigger/src/eventtrigger/models.py
- application/eventtrigger/pyproject.toml
- infrastructure/monitoring.py
- infrastructure/services.py
- application/eventtrigger/Dockerfile
- .github/workflows/lifecycle.yaml
- application/eventtrigger/tests/test_eventtrigger_main.py
- infrastructure/project.py
- application/eventtrigger/src/eventtrigger/main.py
- .github/workflows/deploy.yaml
- application/datamanager/src/datamanager/config.py
✅ Files skipped from review due to trivial changes (6)
- .github/workflows/run_tests.yaml
- .github/workflows/close_stale_issues_and_pull_requests.yaml
- .github/workflows/check_code_quality.yaml
- infrastructure/tags.py
- .github/workflows/launch_application.yaml
- application/datamanager/src/datamanager/clients.py
🚧 Files skipped from review as they are similar to previous changes (19)
- application/positionmanager/pyproject.toml
- application/datamanager/features/steps/health_steps.py
- application/predictionengine/pyproject.toml
- .gitignore
- workflows/pyproject.toml
- application/datamanager/features/steps/equity_bars_steps.py
- application/datamanager/pyproject.toml
- infrastructure/pyproject.toml
- infrastructure/images.py
- .mise.toml
- infrastructure/ping.nu
- infrastructure/roles.py
- infrastructure/monitors.py
- infrastructure/environment_variables.py
- pyproject.toml
- application/predictionengine/tests/test_predictionengine_main.py
- application/positionmanager/tests/test_positionmanager_main.py
- infrastructure/cluster.py
- application/datamanager/src/datamanager/main.py
🧰 Additional context used
🧬 Code Graph Analysis (2)
application/positionmanager/src/positionmanager/main.py (6)
application/positionmanager/src/positionmanager/clients.py (3)
- `AlpacaClient` (16-103)
- `DataClient` (106-154)
- `get_cash_balance` (35-43)

application/positionmanager/src/positionmanager/models.py (3)
- `DateRange` (38-60)
- `Money` (8-35)
- `to_payload` (56-60)

application/datamanager/src/datamanager/models.py (1)
- `DateRange` (34-49)

application/datamanager/src/datamanager/main.py (1)
- `get_health` (91-92)

application/predictionengine/src/predictionengine/main.py (1)
- `get_health` (39-40)

application/positionmanager/src/positionmanager/portfolio.py (1)
- `get_optimized_portfolio` (18-62)
infrastructure/__main__.py (5)
infrastructure/cluster.py (3)
- `create_kubernetes_cluster` (10-130)
- `create_kubernetes_provider` (133-145)
- `update_kubernetes_cluster_access` (148-200)

infrastructure/images.py (1)
- `build_image` (10-39)

infrastructure/monitors.py (1)
- `create_prometheus_scraper` (7-88)

infrastructure/publishers_subscribers.py (6)
- `create_knative_broker` (152-171)
- `create_knative_eventing_core` (62-96)
- `create_knative_schedule` (219-255)
- `create_knative_service` (99-149)
- `create_knative_serving_core` (5-59)
- `create_knative_trigger` (174-216)

infrastructure/roles.py (2)
- `create_cluster_role` (5-33)
- `create_node_role` (36-74)
🪛 YAMLlint (1.37.1)
.github/workflows/teardown_application.yaml
[error] 23-23: no new line character at the end of file
(new-line-at-end-of-file)
[error] 23-23: trailing spaces
(trailing-spaces)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
- GitHub Check: Run Python tests
🔇 Additional comments (24)
.github/workflows/teardown_application.yaml (1)
18-22: Confirm Flox command path & failure handling

Ensure `mise tasks run infrastructure:down` exists in the project and returns non-zero on error so the workflow fails loudly.

application/datamanager/tests/test_datamanager_main.py (6)
9-9: LGTM: Import cleanup aligns with architecture changes.

The removal of the `BarsSummary` import is consistent with the migration to CloudEvents-based responses as mentioned in the AI summary.
14-14: LGTM: Function rename reflects endpoint changes.

The rename from `test_health_check` to `test_get_health` aligns with the synchronous endpoint implementation mentioned in the summary.
105-129: LGTM: Comprehensive database error testing.

The test properly mocks the DuckDB connection and verifies error handling behavior. The use of `IOException` and proper assertion of the 500 status code is appropriate.
130-156: LGTM: Good coverage for no data scenario.

The test effectively simulates the case where no data is found and verifies the appropriate 404 response. The mocking of `arrow_result.num_rows = 0` properly triggers the no-data condition.
171-209: LGTM: S3Client abstraction properly tested.

The DELETE endpoint tests properly mock the new S3Client abstraction and test both success and not-found scenarios. The assertions verify both status codes and response data structure.
211-279: Verify CloudEvent error handling design.

The fetch endpoint returns HTTP 200 even when write errors occur (line 275), which may be confusing for API consumers. This pattern suggests CloudEvents are used for both success and error scenarios. Please confirm this is the intended behavior where errors are communicated through CloudEvent payloads rather than HTTP status codes. This design choice should be documented for clarity.
Additionally, the test properly validates:
- PolygonClient integration with the `get_all_equity_bars` method
- Polars DataFrame operations and S3 path handling
- CloudEvent structure with correct event types
application/positionmanager/src/positionmanager/portfolio.py (2)
22-22: Type annotation change is consistent with the new prediction format.

The update from `dict[str, float]` to `dict[str, list[float]]` correctly reflects the new percentile-based prediction data structure.
32-36: Predictions passed are floats, not lists: Series creation is safe

The `predictions_percentile_50` dict you pass into `get_optimized_portfolio` maps each ticker to a single `float` (the 50th percentile), so `prediction_series = pd.Series(predictions).reindex(mu.index).dropna()` creates a numeric `float` Series as intended. You can ignore the concern about list values here.

- If you'd like to keep type hints in sync, consider updating the signature in `application/positionmanager/src/positionmanager/portfolio.py` from `predictions: dict[str, list[float]]` to `predictions: dict[str, float]`.

Likely an incorrect or invalid review comment.
application/positionmanager/src/positionmanager/main.py (7)
1-1: Import addition for JSON handling is appropriate.

Adding the `json` import is necessary for the new metrics endpoint response formatting.
9-9: CloudEvent import supports the event-driven architecture migration.

The addition of the CloudEvent import is consistent with the infrastructure refactoring to use Knative eventing.
54-55: Health endpoint simplification follows the pattern from other services.

The change to return a simple Response without JSON content is consistent with the health endpoints in the datamanager and predictionengine services.
151-169: Prediction extraction correctly handles the new CloudEvent format.

The code properly extracts predictions from the CloudEvent data and uses the 50th percentile for portfolio optimization, addressing the previous concern about empty predictions.
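As a rough illustration of that extraction step (the nested payload shape follows the prediction events described in this PR; the helper name and sample tickers are invented):

```python
def extract_percentile_50(event_data: dict) -> dict[str, float]:
    """Pull the 50th-percentile point estimate per ticker out of a
    predictions CloudEvent payload, for handing to the portfolio optimizer."""
    predictions = event_data.get("predictions", {})
    extracted: dict[str, float] = {}
    for ticker, percentiles in predictions.items():
        values = percentiles.get("percentile_50", [])
        if values:  # skip tickers with no usable prediction
            extracted[ticker] = values[0]
    return extracted

event_data = {
    "predictions": {
        "AAPL": {"percentile_50": [1.23]},
        "MSFT": {"percentile_50": []},  # no usable value: dropped
    }
}
print(extract_percentile_50(event_data))  # {'AAPL': 1.23}
```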
223-236: CloudEvent response provides comprehensive position opening details.

The response includes all necessary information: initial and final cash balances, optimized portfolio, executed trades, and time period data.
269-279: Error handling helper function promotes consistency.

The `create_cloud_event_error` function ensures uniform error response format across all endpoints.
96-107: Double-check JSON metrics endpoint consumers

We verified that wrapping the scalar values in single-element lists only affects the JSON response in `application/positionmanager/src/positionmanager/main.py` (around lines 96-107). A repo-wide search found:
- No Python, YAML, or JSON files parsing the old scalar JSON fields.
- Grafana dashboards under `infrastructure/grafana_dashboard.json` use PromQL metrics (`portfolio_cash_balance`, `portfolio_positions_count`), which are sourced from the Prometheus gauges and remain unchanged.

However, any external clients or monitoring tools consuming this JSON endpoint will see `"portfolio_value": [value]` instead of a bare number. Please confirm there are no downstream API consumers expecting the previous format and update documentation or bump the API version if needed.

application/predictionengine/src/predictionengine/main.py (3)
12-12: CloudEvent import enables event-driven response format.

Adding the CloudEvent import is consistent with the migration to event-driven architecture.
39-40: Health endpoint synchronization aligns with service patterns.

Changing from async to sync and renaming from `health_check` to `get_health` creates consistency across services.
191-200: CloudEvent success response provides comprehensive prediction data.

The successful prediction response includes the date and detailed percentile predictions for all tickers, supporting the new event-driven workflow.
infrastructure/publishers_subscribers.py (1)
42-57: Network configuration appropriately disables unnecessary features.

The ConfigMap settings disable auto-TLS and cluster domain claims, which is appropriate for a managed cluster setup where these features may not be needed.
infrastructure/__main__.py (4)
53-61: Version extraction from pyproject.toml is robust.

The error handling covers common file and parsing errors, providing clear error messages for debugging deployment issues.
102-108: Explicit service names address previous review concerns.

The trigger creation now uses explicit `source_service_name` and `target_service_name` parameters, which addresses the fragility concerns from previous reviews about extracting names from Pulumi resource names.
110-132: Cron schedules implement the intended trading workflow.

The schedules correctly implement:
- Daily data fetch at midnight (0 0 * * *)
- Monday morning position opening (0 10 * * 1)
- Friday evening position closing (0 13 * * 5)
This replaces the previous GCP Cloud Scheduler jobs with Knative-native scheduling.
134-137: Prometheus monitoring integration completes the observability setup.

The cluster monitoring scraper ensures metrics collection from the EKS cluster, maintaining observability in the new infrastructure.
2a952f1 to
e80daa1
Actionable comments posted: 1
♻️ Duplicate comments (3)
application/datamanager/tests/test_datamanager_main.py (1)
35-75: This test structure issue was already flagged in previous reviews

The TestMetricsEndpoint class continues to create a separate FastAPI app instance and TestClient, which is inconsistent with the rest of the test suite that uses the shared `client`. This approach reduces maintainability and was specifically mentioned in previous review comments.

infrastructure/ping.nu (1)
3-8: Add error handling for external commands.

The kubectl and AWS CLI commands could fail without proper error handling, causing the script to fail unexpectedly with confusing error messages later.
Add error checking as suggested in the previous review:

```diff
-let cluster_endpoint = kubectl config view --minify --output jsonpath='{.clusters[0].cluster.server}'
+let cluster_endpoint = try {
+    kubectl config view --minify --output jsonpath='{.clusters[0].cluster.server}'
+} catch {
+    print -e "Failed to get cluster endpoint"
+    exit 1
+}

-let token = aws eks get-token --cluster-name pocketsizefund-cluster | from json | get status.token
+let token = try {
+    aws eks get-token --cluster-name pocketsizefund-cluster | from json | get status.token
+} catch {
+    print -e "Failed to get EKS token"
+    exit 1
+}
```

application/predictionengine/src/predictionengine/main.py (1)
202-202: Improve exception handling specificity.

This is the same issue flagged in previous reviews. Catching the broad `Exception` is still too generic and should be more specific to the types of errors that can occur during prediction creation. Consider catching more specific exceptions:

```diff
- except Exception as e:  # noqa: BLE001
+ except (requests.RequestException, ValueError, KeyError) as e:
```
🧹 Nitpick comments (1)
application/datamanager/tests/test_datamanager_main.py (1)
56-56: Consider explaining the magic number

The assertion uses a magic number `1000` with a noqa comment. While the test is correct, adding a comment explaining why this specific value is expected would improve readability.

```diff
- assert response_data["total_rows"] == 1000  # noqa: PLR2004
+ assert response_data["total_rows"] == 1000  # Expected value from mock fetchone() return
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
⛔ Files ignored due to path filters (1)
`uv.lock` is excluded by `!**/*.lock`
📒 Files selected for processing (46)
- `.github/workflows/check_code_quality.yaml` (1 hunks)
- `.github/workflows/close_stale_issues_and_pull_requests.yaml` (1 hunks)
- `.github/workflows/deploy.yaml` (0 hunks)
- `.github/workflows/launch_application.yaml` (1 hunks)
- `.github/workflows/lifecycle.yaml` (0 hunks)
- `.github/workflows/run_tests.yaml` (1 hunks)
- `.github/workflows/teardown_application.yaml` (1 hunks)
- `.gitignore` (1 hunks)
- `.mise.toml` (2 hunks)
- `application/datamanager/features/steps/equity_bars_steps.py` (1 hunks)
- `application/datamanager/features/steps/health_steps.py` (1 hunks)
- `application/datamanager/pyproject.toml` (1 hunks)
- `application/datamanager/src/datamanager/clients.py` (1 hunks)
- `application/datamanager/src/datamanager/config.py` (0 hunks)
- `application/datamanager/src/datamanager/main.py` (8 hunks)
- `application/datamanager/tests/test_datamanager_main.py` (3 hunks)
- `application/eventtrigger/Dockerfile` (0 hunks)
- `application/eventtrigger/pyproject.toml` (0 hunks)
- `application/eventtrigger/src/eventtrigger/main.py` (0 hunks)
- `application/eventtrigger/src/eventtrigger/models.py` (0 hunks)
- `application/eventtrigger/tests/test_eventtrigger_main.py` (0 hunks)
- `application/positionmanager/pyproject.toml` (1 hunks)
- `application/positionmanager/src/positionmanager/main.py` (9 hunks)
- `application/positionmanager/src/positionmanager/models.py` (0 hunks)
- `application/positionmanager/src/positionmanager/portfolio.py` (1 hunks)
- `application/positionmanager/tests/test_positionmanager_main.py` (4 hunks)
- `application/predictionengine/pyproject.toml` (1 hunks)
- `application/predictionengine/src/predictionengine/main.py` (3 hunks)
- `application/predictionengine/src/predictionengine/models.py` (0 hunks)
- `application/predictionengine/tests/test_predictionengine_main.py` (1 hunks)
- `infrastructure/__main__.py` (1 hunks)
- `infrastructure/buckets.py` (0 hunks)
- `infrastructure/cluster.py` (1 hunks)
- `infrastructure/environment_variables.py` (1 hunks)
- `infrastructure/images.py` (1 hunks)
- `infrastructure/monitoring.py` (0 hunks)
- `infrastructure/monitors.py` (1 hunks)
- `infrastructure/ping.nu` (1 hunks)
- `infrastructure/project.py` (0 hunks)
- `infrastructure/publishers_subscribers.py` (1 hunks)
- `infrastructure/pyproject.toml` (1 hunks)
- `infrastructure/roles.py` (1 hunks)
- `infrastructure/services.py` (0 hunks)
- `infrastructure/tags.py` (1 hunks)
- `pyproject.toml` (3 hunks)
- `workflows/pyproject.toml` (1 hunks)
💤 Files with no reviewable changes (14)
- application/positionmanager/src/positionmanager/models.py
- infrastructure/buckets.py
- application/predictionengine/src/predictionengine/models.py
- application/eventtrigger/src/eventtrigger/models.py
- infrastructure/monitoring.py
- application/eventtrigger/pyproject.toml
- application/eventtrigger/Dockerfile
- application/eventtrigger/tests/test_eventtrigger_main.py
- infrastructure/services.py
- infrastructure/project.py
- .github/workflows/lifecycle.yaml
- application/eventtrigger/src/eventtrigger/main.py
- .github/workflows/deploy.yaml
- application/datamanager/src/datamanager/config.py
✅ Files skipped from review due to trivial changes (2)
- workflows/pyproject.toml
- application/datamanager/features/steps/equity_bars_steps.py
🚧 Files skipped from review as they are similar to previous changes (26)
- .github/workflows/run_tests.yaml
- application/datamanager/features/steps/health_steps.py
- application/predictionengine/pyproject.toml
- application/positionmanager/pyproject.toml
- .gitignore
- .github/workflows/close_stale_issues_and_pull_requests.yaml
- .github/workflows/check_code_quality.yaml
- application/datamanager/pyproject.toml
- infrastructure/tags.py
- .github/workflows/launch_application.yaml
- .github/workflows/teardown_application.yaml
- application/predictionengine/tests/test_predictionengine_main.py
- infrastructure/images.py
- application/positionmanager/src/positionmanager/portfolio.py
- infrastructure/pyproject.toml
- infrastructure/roles.py
- .mise.toml
- infrastructure/environment_variables.py
- infrastructure/monitors.py
- application/datamanager/src/datamanager/clients.py
- pyproject.toml
- application/positionmanager/src/positionmanager/main.py
- application/positionmanager/tests/test_positionmanager_main.py
- infrastructure/cluster.py
- infrastructure/publishers_subscribers.py
- application/datamanager/src/datamanager/main.py
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (2)
- GitHub Check: Run Python tests
- GitHub Check: Run Python quality checks
🔇 Additional comments (24)
application/datamanager/tests/test_datamanager_main.py (9)
5-6: LGTM: Good addition of specific exception imports

The addition of `IOException` from duckdb and explicit FastAPI imports improves test clarity and specificity.
9-10: LGTM: Import updates align with application refactoring

The imports correctly reflect the refactored application structure and new components.
15-15: LGTM: Function rename matches endpoint refactoring

The rename from `test_health_check` to `test_get_health` correctly aligns with the endpoint naming convention.
92-114: LGTM: Comprehensive error handling test

The database error test properly mocks the DuckDB connection and verifies the error response format. The use of `IOException` and proper status code assertion is correct.
115-141: LGTM: Good coverage of no-data scenario

The test correctly simulates an empty result set and verifies the 404 response. The arrow result mocking is appropriate for this scenario.
156-173: LGTM: Well-structured delete success test

The test properly mocks the S3Client class and verifies both the response status and that the delete operation was called. The use of `HTTP_204_NO_CONTENT` for successful deletion is correct.
174-194: LGTM: Appropriate not-found scenario testing

The test correctly handles the case where no objects are found for deletion, returning a 404 status with an error message in the response body.
196-227: LGTM: Comprehensive CloudEvents integration test

The test properly validates the new CloudEvents response format, including the correct event type `application.datamanager.equity.bars.created`. The mocking strategy for PolygonClient and DataFrame is appropriate.
232-261: LGTM: Good error scenario coverage

The test correctly simulates a write error and verifies that the CloudEvent type changes to `application.datamanager.equity.bars.errored` with error details in the data field. This validates proper error handling in the fetch endpoint.

infrastructure/ping.nu (3)
9-22: Service definitions correctly updated for Kubernetes.

The service URL definitions correctly use the Kubernetes API proxy pattern to access services within the cluster. The URL structure follows the standard format for accessing services through the Kubernetes API server.
30-34: Good improvements to datamanager interaction.

The use of the `first` method improves safety over direct indexing, and the API change from POST to GET with query parameters appears to be a sensible refactoring of the datamanager service interface.
36-46: Query construction correctly adapted for Kubernetes.

The URL construction properly adapts to the Kubernetes service proxy format, and the string manipulation to extract the host from the cluster endpoint is appropriate.
infrastructure/__main__.py (7)
1-26: Architecture migration imports look comprehensive.

The imports represent a well-structured migration from GCP to AWS EKS with Knative. The use of `tomllib` for reading project configuration and the modular approach to infrastructure components is appropriate.
28-42: Proper AWS EKS cluster setup with security best practices.

The cluster setup correctly creates separate IAM roles for cluster and nodes, uses Pulumi secrets for sensitive ARN values, and establishes the proper dependency chain for cluster provisioning.
44-51: Knative components properly configured.

The Knative setup follows the correct dependency order with serving and eventing cores established first, followed by the broker that depends on eventing core.
53-61: Excellent error handling for configuration reading.

The version reading implementation properly handles all relevant exception types and provides clear error messages. Using pyproject.toml as the single source of truth for version information is a good practice.
62-100: Consistent and well-structured service deployment pattern.

The service deployment follows a clean pattern of image building followed by Knative service creation. The use of shared environment variables and proper dependency declarations on Knative serving core is appropriate.
102-132: Event-driven architecture properly implemented.

The Knative triggers and schedules correctly implement the event-driven workflows. The past review feedback about explicit service names has been properly addressed, and the dependency declarations ensure correct ordering.
134-137: Monitoring integration properly configured.

The Prometheus scraper setup correctly integrates with AWS Managed Prometheus using Pulumi secrets for sensitive configuration and the proper cluster reference.
application/predictionengine/src/predictionengine/main.py (5)
12-12: LGTM: CloudEvent import supports the architecture migration.

The addition of the CloudEvent import aligns with the infrastructure migration to event-driven architecture using Knative.
38-40: LGTM: Health endpoint changes are appropriate.

The renaming from `health_check` to `get_health` and making it synchronous are good improvements for consistency and performance.
54-58: LGTM: Request formatting improvement.

The reformatted `requests.get` call improves readability without changing functionality.
142-217: LGTM: CloudEvent migration with improved error handling approach.

The migration to CloudEvents is well-implemented with proper error cases handled. The lazy model initialization is also a good optimization.
25-32: Environment variable usage verified: no missing dependencies

- The only environment variable in `application/predictionengine/src/predictionengine/main.py` is `DATAMANAGER_BASE_URL`.
- This variable is still referenced by `workflows/train_predictionengine.py` and `application/positionmanager/src/positionmanager/main.py`.
- No other environment variables were removed from this file, and no references to them exist elsewhere in the codebase.
e80daa1 to
bcaa359
Actionable comments posted: 0
♻️ Duplicate comments (4)
application/positionmanager/src/positionmanager/main.py (1)
151-169: Excellent fix for the empty predictions issue!

This resolves the previous concern about passing empty predictions to the portfolio optimizer. The code now correctly extracts predictions from the CloudEvent data and processes the 50th percentile values for portfolio optimization.
The implementation properly:
- Validates event data format
- Extracts nested predictions structure
- Converts to the expected format for the optimizer
application/predictionengine/src/predictionengine/main.py (2)
101-145: Excellent fix for the dataset batching issue!

This new implementation correctly addresses the critical logic error identified in past reviews. The function now:
- Validates data sufficiency for each ticker upfront
- Processes all valid tickers together in a single batch
- Maps predictions back to individual tickers correctly
This resolves the previous issue where each ticker was getting identical batch data.
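The fan-out step at the heart of that fix reduces to a few lines (toy data below; the real model returns arrays from `model.predict`, plain lists stand in here):

```python
# One batch prediction per ticker, produced together, then fanned back out.
tickers = ["AAPL", "MSFT", "NVDA"]
percentile_25 = [0.9, 1.1, 2.0]   # stand-ins for model.predict(...) outputs
percentile_50 = [1.0, 1.2, 2.2]
percentile_75 = [1.1, 1.3, 2.4]

predictions: dict[str, dict[str, list[float]]] = {}
for i, ticker in enumerate(tickers):
    predictions[ticker] = {
        "percentile_25": [percentile_25[i]],
        "percentile_50": [percentile_50[i]],
        "percentile_75": [percentile_75[i]],
    }

print(predictions["MSFT"]["percentile_50"])  # [1.2]
```

Because indexing is positional, each ticker now receives its own row of the batch output instead of every ticker sharing identical data.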
208-208: Exception handling is still too broad.

The broad `Exception` catch remains unchanged from previous reviews. Consider being more specific about the types of exceptions expected. Replace with more specific exception types:

```diff
- except Exception as e:  # noqa: BLE001
+ except (requests.RequestException, ValueError, KeyError) as e:
```

infrastructure/ping.nu (1)
3-8: Add error handling for external commands.

The kubectl and AWS CLI commands could fail without proper error handling, causing the script to fail unexpectedly. The previous review comment about error handling is still applicable. Add error checking:

```diff
-let cluster_endpoint = kubectl config view --minify --output jsonpath='{.clusters[0].cluster.server}'
+let cluster_endpoint = try {
+    kubectl config view --minify --output jsonpath='{.clusters[0].cluster.server}'
+} catch {
+    print -e "Failed to get cluster endpoint"
+    exit 1
+}

-let token = aws eks get-token --cluster-name pocketsizefund-cluster | from json | get status.token
+let token = try {
+    aws eks get-token --cluster-name pocketsizefund-cluster | from json | get status.token
+} catch {
+    print -e "Failed to get EKS token"
+    exit 1
+}
```
🧹 Nitpick comments (1)
application/positionmanager/src/positionmanager/main.py (1)
59-59: Metrics endpoint formatting needs clarification.

The response format wraps single values in arrays (e.g., `"portfolio_value": [account_information["portfolio_value"]]`), which seems inconsistent with the `positions` array that contains objects. Consider whether single values should be arrays or scalars:

```diff
- "portfolio_value": [account_information["portfolio_value"]],
- "cash_balance": [account_information["cash"]],
- "positions_count": [len(positions)],
+ "portfolio_value": account_information["portfolio_value"],
+ "cash_balance": account_information["cash"],
+ "positions_count": len(positions),
```

Also applies to: 96-107
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
⛔ Files ignored due to path filters (1)
`uv.lock` is excluded by `!**/*.lock`
📒 Files selected for processing (46)
- `.github/workflows/check_code_quality.yaml` (1 hunks)
- `.github/workflows/close_stale_issues_and_pull_requests.yaml` (1 hunks)
- `.github/workflows/deploy.yaml` (0 hunks)
- `.github/workflows/launch_application.yaml` (1 hunks)
- `.github/workflows/lifecycle.yaml` (0 hunks)
- `.github/workflows/run_tests.yaml` (1 hunks)
- `.github/workflows/teardown_application.yaml` (1 hunks)
- `.gitignore` (1 hunks)
- `.mise.toml` (2 hunks)
- `application/datamanager/features/steps/equity_bars_steps.py` (1 hunks)
- `application/datamanager/features/steps/health_steps.py` (1 hunks)
- `application/datamanager/pyproject.toml` (1 hunks)
- `application/datamanager/src/datamanager/clients.py` (1 hunks)
- `application/datamanager/src/datamanager/config.py` (0 hunks)
- `application/datamanager/src/datamanager/main.py` (8 hunks)
- `application/datamanager/tests/test_datamanager_main.py` (3 hunks)
- `application/eventtrigger/Dockerfile` (0 hunks)
- `application/eventtrigger/pyproject.toml` (0 hunks)
- `application/eventtrigger/src/eventtrigger/main.py` (0 hunks)
- `application/eventtrigger/src/eventtrigger/models.py` (0 hunks)
- `application/eventtrigger/tests/test_eventtrigger_main.py` (0 hunks)
- `application/positionmanager/pyproject.toml` (1 hunks)
- `application/positionmanager/src/positionmanager/main.py` (9 hunks)
- `application/positionmanager/src/positionmanager/models.py` (0 hunks)
- `application/positionmanager/src/positionmanager/portfolio.py` (1 hunks)
- `application/positionmanager/tests/test_positionmanager_main.py` (4 hunks)
- `application/predictionengine/pyproject.toml` (1 hunks)
- `application/predictionengine/src/predictionengine/main.py` (3 hunks)
- `application/predictionengine/src/predictionengine/models.py` (0 hunks)
- `application/predictionengine/tests/test_predictionengine_main.py` (1 hunks)
- `infrastructure/__main__.py` (1 hunks)
- `infrastructure/buckets.py` (0 hunks)
- `infrastructure/cluster.py` (1 hunks)
- `infrastructure/environment_variables.py` (1 hunks)
- `infrastructure/images.py` (1 hunks)
- `infrastructure/monitoring.py` (0 hunks)
- `infrastructure/monitors.py` (1 hunks)
- `infrastructure/ping.nu` (1 hunks)
- `infrastructure/project.py` (0 hunks)
- `infrastructure/publishers_subscribers.py` (1 hunks)
- `infrastructure/pyproject.toml` (1 hunks)
- `infrastructure/roles.py` (1 hunks)
- `infrastructure/services.py` (0 hunks)
- `infrastructure/tags.py` (1 hunks)
- `pyproject.toml` (3 hunks)
- `workflows/pyproject.toml` (1 hunks)
💤 Files with no reviewable changes (14)
- application/predictionengine/src/predictionengine/models.py
- application/positionmanager/src/positionmanager/models.py
- infrastructure/buckets.py
- application/eventtrigger/src/eventtrigger/models.py
- application/eventtrigger/Dockerfile
- application/eventtrigger/pyproject.toml
- application/eventtrigger/src/eventtrigger/main.py
- application/eventtrigger/tests/test_eventtrigger_main.py
- infrastructure/monitoring.py
- infrastructure/project.py
- .github/workflows/lifecycle.yaml
- infrastructure/services.py
- .github/workflows/deploy.yaml
- application/datamanager/src/datamanager/config.py
✅ Files skipped from review due to trivial changes (1)
- application/datamanager/src/datamanager/clients.py
🚧 Files skipped from review as they are similar to previous changes (27)
- application/positionmanager/pyproject.toml
- .github/workflows/run_tests.yaml
- application/predictionengine/pyproject.toml
- application/datamanager/features/steps/health_steps.py
- .gitignore
- application/datamanager/features/steps/equity_bars_steps.py
- .github/workflows/close_stale_issues_and_pull_requests.yaml
- .github/workflows/check_code_quality.yaml
- infrastructure/tags.py
- application/datamanager/pyproject.toml
- .github/workflows/launch_application.yaml
- application/predictionengine/tests/test_predictionengine_main.py
- infrastructure/environment_variables.py
- .github/workflows/teardown_application.yaml
- .mise.toml
- infrastructure/images.py
- application/positionmanager/src/positionmanager/portfolio.py
- infrastructure/monitors.py
- infrastructure/roles.py
- infrastructure/pyproject.toml
- pyproject.toml
- workflows/pyproject.toml
- application/datamanager/tests/test_datamanager_main.py
- application/positionmanager/tests/test_positionmanager_main.py
- infrastructure/cluster.py
- infrastructure/publishers_subscribers.py
- application/datamanager/src/datamanager/main.py
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
- GitHub Check: Run Python tests
🔇 Additional comments (25)
application/positionmanager/src/positionmanager/main.py (7)
1-1: Import changes look good for CloudEvents integration. The addition of the `CloudEvent` import and the removal of the `PredictionPayload` model align with the migration to an event-driven architecture.
Also applies to: 9-10, 16-16
54-55: Health endpoint simplified correctly. The endpoint now returns a simple 200 OK response, which is appropriate for health checks in containerized environments.
116-117: Endpoint rename and CloudEvent integration implemented correctly. The rename from `/positions` to `/positions/open` and the adoption of a CloudEvent parameter align with the event-driven architecture migration.
135-135: CloudEvent error handling is consistent. Good use of the helper function `create_cloud_event_error` for consistent error responses across the service.

Also applies to: 148-148, 172-172
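The helper itself isn't shown in the diff, but a minimal sketch of what such a `create_cloud_event_error` helper might look like, built as a plain CloudEvents v1.0 structured-mode dict using only the standard library (the `source` and `type` values below are placeholders, not the service's actual ones):

```python
import uuid
from datetime import datetime, timezone


def create_cloud_event_error(source: str, error_type: str, message: str) -> dict:
    """Build a CloudEvents v1.0 error payload as a plain dict."""
    return {
        "specversion": "1.0",
        "id": str(uuid.uuid4()),  # unique per event, as the spec requires
        "source": source,
        "type": error_type,
        "time": datetime.now(timezone.utc).isoformat(),
        "datacontenttype": "application/json",
        "data": {"error": message},
    }


event = create_cloud_event_error(
    "positionmanager",             # placeholder source
    "com.example.position.error",  # placeholder event type
    "no open positions found",
)
```

Centralizing this in one function is what keeps the `source`/`type` conventions uniform across all endpoints.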
223-236: CloudEvent response structure is well-designed. The response includes appropriate metadata with source, type, and a comprehensive data payload including trade execution details.
239-240: Method change from DELETE to POST is appropriate. Changing from DELETE to POST aligns with CloudEvent patterns, where all operations are typically POST requests carrying event payloads.
269-279: Helper function promotes consistent error handling. The `create_cloud_event_error` function ensures a consistent error response structure across all endpoints.

application/predictionengine/src/predictionengine/main.py (8)
12-13: CloudEvent import added correctly. The import changes support the migration to CloudEvent-based responses.
25-26: Lifespan management simplified appropriately. Removing unused environment variables and adding explicit model cleanup on shutdown improve resource management.
Also applies to: 30-32
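The simplified lifespan can be sketched generically with `contextlib.asynccontextmanager`; here a `SimpleNamespace` stands in for the FastAPI app, and the `model` attribute name is an assumption:

```python
import asyncio
from contextlib import asynccontextmanager
from types import SimpleNamespace


@asynccontextmanager
async def lifespan(app):
    # Startup: nothing is loaded eagerly; the model slot starts empty.
    app.state.model = None
    try:
        yield
    finally:
        # Shutdown: drop the reference so the model can be garbage-collected.
        app.state.model = None


async def main():
    app = SimpleNamespace(state=SimpleNamespace())
    async with lifespan(app):
        app.state.model = "loaded-model"  # stands in for real model loading
        in_scope = app.state.model
    return in_scope, app.state.model


loaded, after_shutdown = asyncio.run(main())
```

In FastAPI the same generator would be passed as `FastAPI(lifespan=lifespan)`, so the `finally` block runs exactly once at shutdown.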
39-40: Health endpoint conversion from async to sync is correct. Since the endpoint doesn't perform any async operations, making it synchronous is more appropriate.
54-58: Good addition of request timeout. Adding a 120-second timeout prevents hanging requests when fetching historical data.
148-149: Endpoint path and signature changes align with CloudEvents. The response model removal and CloudEvent return type support the event-driven architecture.
160-169: CloudEvent error handling is comprehensive. Good coverage of error scenarios, with appropriate CloudEvent responses for:
- No data available
- No predictions generated
- General exceptions
Also applies to: 186-195, 212-223
171-175: Deferred model initialization is a good optimization. Loading the model only when needed and caching it in app state improves startup time and resource usage.
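The deferred-initialization pattern can be sketched in isolation; the loader and cached attribute below are stand-ins, not the engine's actual model code:

```python
class ModelCache:
    """Load an expensive model on first use, then reuse the cached instance."""

    def __init__(self, loader):
        self._loader = loader
        self._model = None
        self.load_count = 0  # for illustration: how many real loads happened

    def get(self):
        if self._model is None:  # deferred: nothing is loaded at startup
            self._model = self._loader()
            self.load_count += 1
        return self._model


cache = ModelCache(lambda: {"weights": [0.1, 0.2]})
first = cache.get()   # triggers the one real load
second = cache.get()  # served from the cache
```

In the service, `app.state` plays the role of the cache holder, so the first prediction request pays the load cost and every later request reuses the instance.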
197-206: CloudEvent success response is well-structured. The response includes appropriate metadata and comprehensive prediction data with proper date attribution.
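A sketch of such a success event in CloudEvents structured content mode, where the whole event is serialized as a single JSON document (the `source`, `type`, and data fields are illustrative, not the service's actual values):

```python
import json
import uuid
from datetime import date


def create_prediction_event(predictions: dict) -> str:
    """Serialize a success event in CloudEvents structured content mode."""
    event = {
        "specversion": "1.0",
        "id": str(uuid.uuid4()),
        "source": "predictionengine",               # placeholder source
        "type": "com.example.predictions.created",  # placeholder type
        "datacontenttype": "application/json",
        "data": {
            "prediction_date": date.today().isoformat(),
            "predictions": predictions,
        },
    }
    # Structured mode: the whole event travels as one JSON document,
    # typically sent with Content-Type: application/cloudevents+json.
    return json.dumps(event)


body = create_prediction_event({"AAPL": 0.73, "MSFT": 0.41})
```

Keeping the date inside `data` rather than only in the transport metadata is what gives downstream consumers the "date attribution" the review calls out.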
infrastructure/ping.nu (3)
9-22: Service URL construction looks correct. The migration to Kubernetes proxy URLs follows the correct pattern for accessing services through the API server. The static service definitions align well with the new EKS architecture.
32-34: API method change from POST to GET is appropriate. The change to use GET with query parameters follows REST conventions better and aligns with the new service architecture.
36-46: Robust URL construction approach. Using `url join` with proper scheme, host, path, and params is a good practice that makes the URL construction more maintainable and less error-prone.

infrastructure/__main__.py (7)
1-25: Well-organized imports and module structure. The imports are logically grouped, and the separation of concerns into different modules (cluster, roles, publishers_subscribers, etc.) follows good architectural practices for infrastructure code.
26-42: Proper infrastructure setup with secure configuration. The infrastructure components are created in the correct dependency order, and using `configuration.require_secret()` for sensitive IAM ARNs is a security best practice.
44-51: Clean Knative infrastructure setup. The Knative serving and eventing components are set up in the correct order with proper dependency management.
53-60: Excellent error handling for version reading. The comprehensive exception handling covers all likely failure scenarios, and the error message provides clear context about what went wrong.
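A sketch of version-file reading with that style of explicit error handling (the file path, helper name, and error wording are assumptions, not the module's actual code):

```python
from pathlib import Path


def read_version(path: str = "VERSION") -> str:
    """Read the release version from a file, with explicit failure context."""
    try:
        version = Path(path).read_text(encoding="utf-8").strip()
    except OSError as exc:  # covers FileNotFoundError, PermissionError, etc.
        raise RuntimeError(f"could not read version from {path!r}: {exc}") from exc
    if not version:
        raise RuntimeError(f"version file {path!r} is empty")
    return version
```

Wrapping the low-level `OSError` in a `RuntimeError` with the path keeps the Pulumi failure message actionable instead of a bare traceback.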
62-100: Consistent and clean service creation pattern. The uniform approach to building images and creating Knative services with shared environment variables promotes maintainability and reduces configuration drift.
102-132: Past review comment has been properly addressed. The function calls now correctly use explicit service name parameters (`source_service_name`, `target_service_name`), as suggested in the previous review. The event-driven architecture setup with proper dependencies looks solid.
134-137: Proper monitoring integration. The Prometheus scraper setup follows the same secure configuration patterns and integrates well with the AWS Managed Prometheus service.

This is a major overhaul because I couldn't get GCP Cloud Run to work nicely. Maybe it can work and we can revert at some point, but I'm leaning towards AWS because of the managed Prometheus and Grafana offerings, plus everything remains in a single cluster.