
equity data integration #668

Closed: chrisaddy wants to merge 1 commit into datamanager-fixes from equity-data-integration

Conversation

@chrisaddy
Collaborator

  • fix issues with datamanager and added to infrastructure
  • prep data for training
  • integration fixes

Copilot AI review requested due to automatic review settings January 16, 2026 02:12
@coderabbitai
Contributor

coderabbitai Bot commented Jan 16, 2026

Caution

Review failed

Failed to post review comments

Walkthrough

This pull request adds comprehensive observability via Sentry and structured logging across the datamanager, equitypricemodel, and portfoliomanager services. It introduces new training infrastructure by adding SageMaker-based and local model training workflows, new data preparation tooling, and support for S3-based model artifact management. Infrastructure changes create dedicated ECR repositories for each service, S3 buckets for data and model artifacts, and configure corresponding ECS services with environment variables and secrets. Configuration files are expanded with additional permissions, dependencies, and Docker build environment variables for DuckDB integration. Collectively, these changes establish a complete observability, training, and deployment pipeline for the equity price prediction system.

Sequence Diagram

```mermaid
sequenceDiagram
    participant Server as equitypricemodel Server
    participant S3 as AWS S3
    participant Filesystem as Temp Directory
    participant Model as TiDE Model
    participant Client as HTTP Client

    Server->>Server: App startup (lifespan context)
    Server->>S3: Find latest artifact key
    S3-->>Server: Artifact key path
    Server->>S3: Download artifact (.tar.gz)
    S3-->>Server: Artifact file
    Server->>Filesystem: Extract to temp directory
    Filesystem-->>Server: Model files ready
    Server->>Model: Load model from directory
    Model-->>Server: Model instance loaded
    Server->>Server: Store model in app.state
    Server-->>Client: Application ready

    Client->>Server: POST /model/predictions
    Server->>Server: Retrieve model from app.state
    Server->>Model: predict(data)
    Model-->>Server: Predictions
    Server-->>Client: JSON response

    Server->>Server: App shutdown (lifespan cleanup)
    Server->>Filesystem: Clean up temp directory
```

Possibly related PRs

  • got 2 services talking to each other #622: Overlapping code-level changes across datamanager, portfoliomanager, infrastructure configuration, and .claude/settings.local.json permission expansion.
  • Refactor datamanager module #635: Modifies identical datamanager source files (data.rs, storage.rs, state.rs, router.rs, predictions.rs) with related logging and error handling enhancements.
  • Model rebuild #637: Overlapping code-level changes across datamanager, equitypricemodel, infrastructure provisioning, and training tooling (run_training_job.py, model loading workflows).

Suggested labels

rust, python, applications, infrastructure, feature, tools

🚥 Pre-merge checks | ✅ 1 | ❌ 2

❌ Failed checks (1 warning, 1 inconclusive)

| Check name | Status | Explanation | Resolution |
| --- | --- | --- | --- |
| Docstring Coverage | ⚠️ Warning | Docstring coverage is 51.22%, below the required 80.00% threshold. | Write docstrings for the functions missing them to satisfy the coverage threshold. |
| Title check | ❓ Inconclusive | The title 'equity data integration' is vague and overly broad, describing a general concept rather than the specific primary changes in the changeset. | Use a more specific title that highlights the main change, such as 'Add Sentry observability and data pipeline enhancements to datamanager and equitypricemodel' or 'Integrate Sentry monitoring and refactor data preparation workflows'. |

✅ Passed checks (1 passed)

| Check name | Status | Explanation |
| --- | --- | --- |
| Description check | ✅ Passed | The description mentions fixing datamanager issues, adding to infrastructure, prepping data for training, and integration fixes, all of which are present in the changeset. |


@pulumi

pulumi Bot commented Jan 16, 2026

🚀 The Update (preview) for forstmeier/pocketsizefund/production (at 99b3ea8) was successful.

✨ Neo Explanation

Initial production deployment creating complete AWS infrastructure for a containerized financial application with three microservices (datamanager, equitypricemodel, portfoliomanager) running on ECS with load balancing, private networking, and ML training capabilities via SageMaker.

Root Cause Analysis

This deployment was triggered by a code change in the repository. The infrastructure is being created from scratch - this appears to be an initial deployment of the "pocketsizefund" application to the production environment. The developer has defined a complete AWS infrastructure stack for a microservices-based financial application with three main services: datamanager, equitypricemodel, and portfoliomanager.

Dependency Chain

The deployment follows a logical infrastructure creation order:

  1. Foundation Layer: VPC, subnets (public/private across 2 availability zones), internet gateway, and NAT gateway establish the network foundation
  2. Security Layer: Security groups and their rules control traffic between the ALB, ECS tasks, and VPC endpoints
  3. Storage & Registry: S3 buckets (with versioning) for data and model artifacts, plus ECR repositories for container images
  4. Compute Layer: ECS cluster, task definitions, and services deploy the three microservices
  5. Routing Layer: Application Load Balancer with listener rules routes traffic to the appropriate target groups based on path
  6. Service Discovery: Private DNS namespace enables internal service-to-service communication
  7. Permissions: IAM roles grant ECS tasks and SageMaker the necessary AWS service access

Risk Analysis

LOW RISK - This is a greenfield deployment with no existing resources to impact. All resources are being created fresh, so there's no risk of data loss or service disruption.

Resource Changes

```text
    Name                                 Type                                                 Operation
+   equitypricemodel_trainer_repository  aws:ecr/repository:Repository                        create
+   s3_gateway_endpoint                  aws:ec2/vpcEndpoint:VpcEndpoint                      create
+   nat_route                            aws:ec2/route:Route                                  create
+   task_role                            aws:iam/role:Role                                    create
+   model_artifacts_bucket               aws:s3/bucketV2:BucketV2                             create
+   sagemaker_s3_policy                  aws:iam/rolePolicy:RolePolicy                        create
+   private_subnet_2_rta                 aws:ec2/routeTableAssociation:RouteTableAssociation  create
+   equitypricemodel_repository          aws:ecr/repository:Repository                        create
+   vpc                                  aws:ec2/vpc:Vpc                                      create
+   igw                                  aws:ec2/internetGateway:InternetGateway              create
+   public_subnet_1                      aws:ec2/subnet:Subnet                                create
+   public_route_table                   aws:ec2/routeTable:RouteTable                        create
+   portfoliomanager_sd                  aws:servicediscovery/service:Service                 create
+   ecr_dkr_endpoint                     aws:ec2/vpcEndpoint:VpcEndpoint                      create
+   datamanager_service                  aws:ecs/service:Service                              create
+   datamanager_logs                     aws:cloudwatch/logGroup:LogGroup                     create
+   ecs_sg                               aws:ec2/securityGroup:SecurityGroup                  create
+   vpc_endpoints_sg                     aws:ec2/securityGroup:SecurityGroup                  create
+   private_route_table                  aws:ec2/routeTable:RouteTable                        create
+   alb_sg                               aws:ec2/securityGroup:SecurityGroup                  create
+   ecs_from_alb                         aws:ec2/securityGroupRule:SecurityGroupRule          create
+   equitypricemodel_task                aws:ecs/taskDefinition:TaskDefinition                create
+   datamanager_rule                     aws:lb/listenerRule:ListenerRule                     create
+   equitypricemodel_tg                  aws:lb/targetGroup:TargetGroup                       create
+   public_subnet_1_rta                  aws:ec2/routeTableAssociation:RouteTableAssociation  create
+   equitypricemodel_rule                aws:lb/listenerRule:ListenerRule                     create
+   datamanager_tg                       aws:lb/targetGroup:TargetGroup                       create
+   private_subnet_2                     aws:ec2/subnet:Subnet                                create
+   task_role_s3_policy                  aws:iam/rolePolicy:RolePolicy                        create
+   nat_gateway                          aws:ec2/natGateway:NatGateway                        create
+   http_listener                        aws:lb/listener:Listener                             create
+   portfoliomanager_rule                aws:lb/listenerRule:ListenerRule                     create
+   equitypricemodel_service             aws:ecs/service:Service                              create
+   equitypricemodel_logs                aws:cloudwatch/logGroup:LogGroup                     create
+   sagemaker_cloudwatch_policy          aws:iam/rolePolicy:RolePolicy                        create
+   sagemaker_ecr_policy                 aws:iam/rolePolicy:RolePolicy                        create
+   vpc_endpoints_ingress                aws:ec2/securityGroupRule:SecurityGroupRule          create
+   portfoliomanager_task                aws:ecs/taskDefinition:TaskDefinition                create
+   portfoliomanager_service             aws:ecs/service:Service                              create
+   execution_role                       aws:iam/role:Role                                    create
+   public_subnet_2                      aws:ec2/subnet:Subnet                                create
... and 26 other changes
```

@chrisaddy chrisaddy changed the base branch from master to data-prep January 16, 2026 02:14
@greptile-apps
Contributor

greptile-apps Bot commented Jan 16, 2026

Confidence Score: 4/5

  • This PR is mostly safe to merge with one critical syntax fix needed
  • Score reflects well-structured implementation with comprehensive logging and error handling, but contains one breaking syntax error in infrastructure code (.region should be .name) that will cause deployment failure. The data pipeline integration is sound with proper schema validation, normalization, and S3 storage patterns.
  • Pay close attention to infrastructure/__main__.py - contains a syntax error that will break deployment

Contributor

@greptile-apps Bot left a comment


Additional Comments (1)

  1. infrastructure/__main__.py, line 10 (link)

    syntax: Changed from .name to .region, but aws.get_region() returns an AwaitableGetRegionResult with a name field, not region

30 files reviewed, 2 comments



```python
logger = structlog.get_logger()

logger.info(f"Using device: {Device.DEFAULT}")
```
Contributor

style: Don't use f-strings with structlog, pass values as kwargs for structured logging

Suggested change:

```diff
-logger.info(f"Using device: {Device.DEFAULT}")
+logger.info("Using device", device=Device.DEFAULT)
```

Note: If this suggestion doesn't match your team's coding style, reply to this and let me know. I'll remember it for next time!

(Path: tools/run_training_local.py, line 17)
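The reason behind the suggestion above is that structured fields stay machine-parseable when passed as kwargs, whereas an f-string bakes them into the message text. A minimal stdlib-only stand-in (no structlog dependency; real structlog's `logger.info` takes the same `(event, **kwargs)` shape) demonstrates the difference:

```python
# Render a log event with separate key-value fields, structlog-style.
# With f-strings the device value would be unrecoverable from the message;
# here it stays an addressable field that processors can filter or index.
def render_event(event: str, **fields) -> str:
    parts = [event] + [f"{k}={v!r}" for k, v in sorted(fields.items())]
    return " ".join(parts)

print(render_event("Using device", device="GPU"))  # → Using device device='GPU'
```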

Contributor

Copilot AI left a comment


Pull request overview

This pull request implements equity data integration for model training with significant infrastructure and application improvements. It adds data preparation pipelines, enhances the training workflow, integrates Sentry for error monitoring, and improves the datamanager's S3 query capabilities.

Changes:

  • Added new data management tools for syncing equity categories and preparing training data
  • Integrated Sentry SDK for error monitoring across all services
  • Enhanced infrastructure with S3 buckets, ECR repositories, and SageMaker IAM roles
  • Improved training workflow with local training support, GPU support, and enhanced logging
  • Fixed datamanager S3 query patterns and added comprehensive logging

Reviewed changes

Copilot reviewed 22 out of 25 changed files in this pull request and generated 10 comments.

Show a summary per file

| File | Description |
| --- | --- |
| uv.lock | Added sentry-sdk, boto3, numpy, structlog, and tinygrad dependencies for equitypricemodel |
| tools/sync_equity_categories.py | New script to fetch and sync equity sector/industry data from the Polygon API to S3 |
| tools/sync_equity_bars_data.py | Reduced API rate-limit sleep time from 15s to 1s |
| tools/run_training_local.py | New script for local model training with GPU support |
| tools/run_training_job.py | Added configurable SageMaker instance types with presets |
| tools/prepare_training_data.py | New script to consolidate equity bars with categories for training |
| maskfile.md | Added data management commands and model training workflows |
| infrastructure/__main__.py | Created S3 buckets, ECR repositories, SageMaker roles, and improved resource management |
| infrastructure/Pulumi.production.yaml | Changed AWS region from encrypted to plain text |
| applications/portfoliomanager | Added Sentry integration and structured logging |
| applications/equitypricemodel/trainer.py | Added comprehensive logging with structlog and device detection |
| applications/equitypricemodel/tide_model.py | Added data validation, early stopping, and improved training metrics |
| applications/equitypricemodel/tide_data.py | Optimized batch creation with numpy and added NaN/Inf validation |
| applications/equitypricemodel/server.py | Added S3 artifact loading, Sentry integration, and improved data handling |
| applications/equitypricemodel/preprocess.py | Optimized filtering using a semi-join instead of filtering after aggregation |
| applications/equitypricemodel/predictions_schema.py | Fixed timestamp dtype and predictions-count validation |
| applications/equitypricemodel/Dockerfile | Changed trainer base image to CUDA-enabled for GPU support |
| applications/datamanager | Added Sentry integration, improved S3 glob patterns, enhanced logging throughout |
| Cargo.lock | Added sentry and sentry-tower dependencies |
| .flox/env/manifest.toml | Added direnv package |
| .claude/settings.local.json | Added AWS CLI and Sentry MCP permissions |


Comment on lines +194 to +195:

```python
    days=70
)  # need 42 trading days (35 input + 7 output), ~60 calendar days + buffer
```

Copilot AI Jan 16, 2026


The magic number 70 is hardcoded with an explanatory comment. Consider extracting this to a named constant like LOOKBACK_DAYS = 70 with the comment explaining the calculation (35 input days + 7 output days + buffer for non-trading days). This makes the code more maintainable and the intent clearer.

Copilot uses AI. Check for mistakes.
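The suggested extraction can be sketched as follows. The split of the buffer into its own constant is illustrative; only the total of 70 and the 35 + 7 trading-day breakdown come from the diff comment:

```python
# Named constants replacing the hardcoded 70, mirroring the comment in the
# diff: 35 input + 7 output trading days, padded for weekends/holidays.
# The exact padding value is an assumption chosen to preserve the total.
INPUT_TRADING_DAYS = 35
OUTPUT_TRADING_DAYS = 7
CALENDAR_PADDING_DAYS = 28  # non-trading-day buffer (illustrative)

LOOKBACK_DAYS = INPUT_TRADING_DAYS + OUTPUT_TRADING_DAYS + CALENDAR_PADDING_DAYS

print(LOOKBACK_DAYS)  # → 70
```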
Comment on lines +376 to +378:

```python
cat_array = ticker_df[self.categorical_columns].to_numpy().astype(np.int32)
cont_array = ticker_df[self.continuous_columns].to_numpy().astype(np.float32)
static_array = ticker_df[self.static_categorical_columns].head(1).to_numpy().astype(np.int32)
```

Copilot AI Jan 16, 2026


The dtype conversion from float64 to float32 is mentioned in a comment about Metal GPU compatibility. Consider adding a configuration parameter or constant for the target dtype instead of hardcoding float32, especially since this could affect model precision. Document the trade-off between precision and GPU compatibility.

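To make the precision trade-off concrete, here is a stdlib-only sketch of centralizing the dtype decision (the `struct` round-trip stands in for numpy's `astype(np.float32)`; the constant name is hypothetical):

```python
import struct

# Centralize the GPU-friendly dtype choice instead of hardcoding float32 at
# each call site. The float32 round-trip below shows the precision cost:
# float32 carries ~7 decimal digits, so most float64 values get rounded.
TARGET_FLOAT_BYTES = 4  # 4 = float32 (Metal/GPU friendly), 8 = float64

def to_target_precision(x: float) -> float:
    if TARGET_FLOAT_BYTES == 4:
        return struct.unpack("f", struct.pack("f", x))[0]
    return x

print(to_target_precision(0.1) == 0.1)  # → False (float32 rounding)
```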
```diff
 pa.Check(
     check_fn=lambda data: check_dates_count_per_ticker(
-        data=data, dates_count=7
+        data=data, dates_count=1
```

Copilot AI Jan 16, 2026


The predictions schema check changed from expecting 7 dates per ticker to expecting 1 date per ticker. This seems inconsistent with the model's output_length of 7 days. If the model predicts 7 days ahead, shouldn't each ticker have 7 prediction rows? This change may indicate a logic error in how predictions are being structured or validated.

Suggested change:

```diff
-data=data, dates_count=1
+data=data
```

Collaborator


The expected output should be seven dates - is there a reason it was changed to 1?
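The invariant being debated can be stated as a simplified stand-alone check. This is not the pandera check itself; the `(ticker, date)` row shape is illustrative, and only the "exactly N dates per ticker" rule comes from the diff:

```python
from collections import Counter

# Simplified version of check_dates_count_per_ticker: every ticker must
# appear with exactly `dates_count` prediction dates. Rows are illustrative
# (ticker, date) tuples, not the real dataframe schema.
def check_dates_count_per_ticker(rows, dates_count: int) -> bool:
    per_ticker = Counter(ticker for ticker, _ in rows)
    return all(n == dates_count for n in per_ticker.values())

rows = [("AAPL", f"2026-01-{d:02d}") for d in range(1, 8)]  # 7 dates
print(check_dates_count_per_ticker(rows, dates_count=7))  # → True
print(check_dates_count_per_ticker(rows, dates_count=1))  # → False
```

With a 7-day `output_length`, the 7-date version of the check passes and the 1-date version fails, which is the mismatch the reviewers are flagging.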

```diff
 RUN uv sync --no-dev

-FROM python:3.12.10-slim AS trainer
+FROM nvidia/cuda:12.4.1-runtime-ubuntu22.04 AS trainer
```

Copilot AI Jan 16, 2026


The base image changed from python:3.12.10-slim to nvidia/cuda:12.4.1-runtime-ubuntu22.04 for GPU support, but the comment on line 31 sets ENV CUDA=1. Consider documenting whether this is for tinygrad GPU detection or if additional CUDA environment variables may be needed. Also verify that CUDA 12.4.1 is compatible with the target SageMaker GPU instances.

```diff
 #[derive(Deserialize)]
 pub struct SavePayload {
-    pub data: DataFrame,
+    pub data: Vec<Prediction>,
```

Copilot AI Jan 16, 2026


The SavePayload now expects Vec<Prediction> instead of DataFrame. This is a breaking API change that requires the client to send JSON-serialized predictions instead of binary Parquet. While this may be intentional for better error handling and validation, ensure all clients are updated to use the new format. Document this change in API documentation and consider versioning the endpoint if backward compatibility is needed.

Collaborator


Seconded. ☝️
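From a client's perspective, the breaking change means sending JSON records rather than binary Parquet. A sketch of the new wire shape, with illustrative field names (the real `Prediction` fields live in the Rust code and may differ):

```python
import json
from dataclasses import dataclass, asdict

# Hypothetical client-side shape for the new SavePayload: a JSON object
# whose "data" key holds a list of prediction records, replacing the old
# binary Parquet DataFrame body. Field names are assumptions.
@dataclass
class Prediction:
    ticker: str
    date: str
    predicted_close: float

payload = {"data": [asdict(Prediction("AAPL", "2026-01-17", 187.42))]}
body = json.dumps(payload)

print(json.loads(body)["data"][0]["ticker"])  # → AAPL
```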

```diff
 );

-let required_columns = vec!["sector", "industry"];
+let required_columns = vec!["ticker", "sector", "industry"];
```

Copilot AI Jan 16, 2026


The required columns for equity_details_dataframe now include 'ticker', but the equity_details_schema in the Python code doesn't include 'ticker' as a required field (only 'sector' and 'industry'). This mismatch will cause validation failures when the dataframe is sent to equitypricemodel. Either add 'ticker' to the Python schema or remove it from the required columns in Rust.

Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed. ☝️
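One way to keep the Rust validator and the Python schema from drifting is a single shared constant that both sides are generated from or tested against. A sketch of the Python half; the idea of one shared source of truth is a suggestion, not existing repo structure:

```python
# Required-column contract for equity details, kept in one place so the
# Rust `required_columns` vec and the Python schema can be checked against
# the same list. Names mirror the diff under review.
REQUIRED_EQUITY_DETAIL_COLUMNS = ("ticker", "sector", "industry")

def missing_columns(columns) -> list[str]:
    present = set(columns)
    return [c for c in REQUIRED_EQUITY_DETAIL_COLUMNS if c not in present]

print(missing_columns(["sector", "industry"]))  # → ['ticker']
print(missing_columns(["ticker", "sector", "industry"]))  # → []
```

The first call reproduces exactly the mismatch described above: the Python schema without `ticker` fails the Rust-side requirement.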

Collaborator

@forstmeier forstmeier left a comment


Some comments/questions; also, there are Ruff errors in a chunk of the Python files that need to be fixed.

```diff
 );

-let required_columns = vec!["sector", "industry"];
+let required_columns = vec!["ticker", "sector", "industry"];
```
Collaborator


Agreed. ☝️

```diff
 #[derive(Deserialize)]
 pub struct SavePayload {
-    pub data: DataFrame,
+    pub data: Vec<Prediction>,
```
Collaborator


Seconded. ☝️

```diff
 pa.Check(
     check_fn=lambda data: check_dates_count_per_ticker(
-        data=data, dates_count=7
+        data=data, dates_count=1
```
Collaborator


The expected output should be seven dates - is there a reason it was changed to 1?

Collaborator


Remove unused comments.

Collaborator


Also I like extracting the model from the image. In the future, I'd like to explore having things like equitypricemodel be more of an aggregator and normalizer with just a pass-through call to an external service (e.g. SageMaker inference, HuggingFace inference, etc).

You should be able to remove some of the logic around downloading artifacts from the launch_infrastructure.yaml GitHub workflow (I say we still keep the Mask command/helper script for now). Another cleanup (not necessary for this pull request) would likely be extracting the common logic from that tools/ helper script and here into the shared library.

```python
nan_count = data.filter(pl.col(col).is_nan()).height
null_count = data.filter(pl.col(col).is_null()).height
inf_count = data.filter(~pl.col(col).is_finite()).height
if nan_count > 0 or null_count > 0 or inf_count > 0:
```
Collaborator


Should any of these checks exit the method or is it safe to continue even if there are issues?
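One answer to the question above is to fail fast instead of only logging. A stdlib-only stand-in for the polars column scan (the function name and raise-on-bad-data policy are suggestions, not what the diff currently does):

```python
import math

# Raise instead of merely counting: if any value is None, NaN, or infinite,
# stop before the bad data reaches training. `is None` is checked first so
# math.isnan/isinf only ever see real floats.
def assert_finite(col: str, values) -> None:
    bad = [v for v in values if v is None or math.isnan(v) or math.isinf(v)]
    if bad:
        raise ValueError(f"column {col!r} has {len(bad)} non-finite values")

assert_finite("close", [1.0, 2.5, 3.0])  # passes silently
```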

Comment on lines +350 to +371:

```python
if early_stopping_patience is not None:
    if epoch_loss < best_loss - early_stopping_min_delta:
        best_loss = epoch_loss
        epochs_without_improvement = 0
        logger.info(
            "New best loss",
            best_loss=f"{best_loss:.4f}",
        )
    else:
        epochs_without_improvement += 1
        logger.info(
            "No improvement",
            epochs_without_improvement=epochs_without_improvement,
            patience=early_stopping_patience,
        )

        if epochs_without_improvement >= early_stopping_patience:
            logger.info(
                "Early stopping triggered",
                epoch=epoch + 1,
                best_loss=f"{best_loss:.4f}",
                epochs_without_improvement=epochs_without_improvement,
```
Collaborator


This is really nice. Trick of the trade I didn't know.

```python
    tags=tags,
)

equitypricemodel_tg = aws.lb.TargetGroup(
```
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You said this is the one that's temporary, right? If so, just toss in a comment indicating it is.

```python
input_dir.mkdir(parents=True, exist_ok=True)
output_dir.mkdir(parents=True, exist_ok=True)

training_data_path = input_dir / "filtered_tft_training_data.parquet"
```
Collaborator


These files and S3 references should be updated from tft to tide.

Collaborator


In-line comments can be pulled.

Comment thread on maskfile.md:

````markdown
uv run python tools/run_training_job.py
```

### train-local (application_name)
````
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nitpick but maybe have one of the options for "instance type" in the above be local and that does this logic here? A bit of a concept overload but I love a clean Mask command list.

Base automatically changed from data-prep to datamanager-fixes January 16, 2026 18:56
@chrisaddy chrisaddy closed this Jan 20, 2026
@chrisaddy chrisaddy deleted the equity-data-integration branch January 20, 2026 20:11