Skip to content

data prep#666

Closed
chrisaddy wants to merge 2 commits intomasterfrom
data-prep
Closed

data prep#666
chrisaddy wants to merge 2 commits intomasterfrom
data-prep

Conversation

@chrisaddy
Copy link
Copy Markdown
Collaborator

  • fix issues with datamanager and added to infrastructure
  • prep data for training

Overview

Changes

Comments

Copilot AI review requested due to automatic review settings January 14, 2026 05:18
@coderabbitai
Copy link
Copy Markdown
Contributor

coderabbitai Bot commented Jan 14, 2026

Caution

Review failed

The pull request is closed.

Walkthrough

This PR establishes AWS infrastructure for data and model management, adds comprehensive observability to the datamanager Rust application, and introduces Python utilities for data preparation pipelines. Changes include: new S3 buckets for data and model artifacts with versioning enabled, ECR repositories for containerized services, IAM roles and policies for SageMaker integration, CloudWatch log groups for application observability, DuckDB environment variable configuration in the datamanager Dockerfile, extensive distributed tracing instrumentation across datamanager modules (data.rs, storage.rs, equity_bars.rs, state.rs, health.rs, main.rs), new Python scripts for syncing equity categories from Polygon API and consolidating training data from S3, development workflow automation via maskfile commands, and adjusted rate-limiting in existing data sync tooling.

Sequence Diagrams

sequenceDiagram
    participant User
    participant Mask as Mask Command
    participant S3 as AWS S3
    participant PolygonAPI as Polygon API
    participant DuckDB as DuckDB/Local
    participant SageMaker as SageMaker

    User->>Mask: data sync-categories
    Mask->>PolygonAPI: Fetch all US stock tickers
    PolygonAPI-->>Mask: Ticker list with sectors/industries
    Mask->>S3: Upload categories CSV
    S3-->>Mask: Stored

    User->>Mask: models prepare (app_name)
    Mask->>S3: Read equity bars parquet (date range)
    Mask->>S3: Read categories CSV
    S3-->>Mask: Data retrieved
    Mask->>DuckDB: Filter & join data
    DuckDB-->>Mask: Consolidated training dataset
    Mask->>S3: Write training data parquet
    S3-->>Mask: Stored

    User->>Mask: models train (app_name)
    Mask->>SageMaker: Submit training job
    SageMaker->>S3: Read training data
    SageMaker->>S3: Write model artifacts
    SageMaker-->>Mask: Training complete
Loading
sequenceDiagram
    participant Client as Client/API
    participant Datamanager as Datamanager
    participant DuckDB as DuckDB
    participant S3 as AWS S3

    Client->>Datamanager: Request equity bars
    Datamanager->>Datamanager: Log request (tracing)
    Datamanager->>S3: Query parquet with glob (daily/**/*.parquet)
    S3-->>DuckDB: S3 credential setup + read_parquet
    DuckDB->>DuckDB: Apply date range filter (YYYYMMDD)
    DuckDB->>DuckDB: Hive partitioned scan
    DuckDB-->>Datamanager: DataFrame results
    Datamanager->>Datamanager: Log row count & dimensions
    Datamanager-->>Client: Return data

    Client->>Datamanager: Request portfolio snapshot
    Datamanager->>Datamanager: Log timestamp filter
    Datamanager->>S3: Glob most recent partition
    S3-->>DuckDB: read_parquet with Hive partitioning
    DuckDB-->>Datamanager: Latest portfolio state
    Datamanager->>Datamanager: Log final row count
    Datamanager-->>Client: Return portfolio
Loading

Possibly related PRs

  • Refactor datamanager module #635: Modifies same datamanager source files (data.rs, storage.rs, equity_bars.rs, state.rs, health.rs) with refactoring that this PR builds logging on top of
  • Reimplement infrastructure with EKS #605: Infrastructure migration to AWS resources (ECR repositories, S3 buckets, IAM roles) directly related to the Pulumi infrastructure changes
  • Model rebuild #637: Overlapping code-level changes across datamanager modules, tools for data preparation, and infrastructure modifications

Suggested labels

python, rust, tools, applications, infrastructure, feature

✨ Finishing touches
  • 📝 Generate docstrings


📜 Recent review details

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 3e30ab3 and 43cf7b9.

⛔ Files ignored due to path filters (1)
  • .flox/env/manifest.lock is excluded by !**/*.lock
📒 Files selected for processing (14)
  • .claude/settings.local.json
  • applications/datamanager/Dockerfile
  • applications/datamanager/src/data.rs
  • applications/datamanager/src/equity_bars.rs
  • applications/datamanager/src/health.rs
  • applications/datamanager/src/main.rs
  • applications/datamanager/src/state.rs
  • applications/datamanager/src/storage.rs
  • infrastructure/Pulumi.production.yaml
  • infrastructure/__main__.py
  • maskfile.md
  • tools/prepare_training_data.py
  • tools/sync_equity_bars_data.py
  • tools/sync_equity_categories.py

✏️ Tip: You can disable this entire section by setting review_details to false in your review settings.


Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

@pulumi
Copy link
Copy Markdown

pulumi Bot commented Jan 14, 2026

🚀 The Update (preview) for forstmeier/pocketsizefund/production (at 43cf7b9) was successful.

✨ Neo Explanation

Initial production deployment of a containerized financial portfolio management platform on AWS, creating a complete microservices architecture with ECS, load balancing, service discovery, and ML training capabilities.

Root Cause Analysis

This is a complete initial deployment of the PocketSizeFund production infrastructure. The developer has written new Pulumi infrastructure-as-code that defines a microservices architecture for a financial portfolio management application running on AWS ECS (containerized services).

Dependency Chain

The infrastructure builds in layers:

  1. Foundation Layer: A VPC with public/private subnets across two availability zones, internet gateway, and NAT gateway for secure networking
  2. Security Layer: Security groups and VPC endpoints (for ECR and S3) to enable private container image pulling and data access
  3. Storage Layer: S3 buckets for application data and ML model artifacts, with versioning enabled; ECR repositories for container images
  4. Compute Layer: ECS cluster with three microservices (datamanager, equitypricemodel, portfoliomanager) running as Fargate tasks
  5. Access Layer: Application Load Balancer with routing rules to expose specific services externally
  6. Service Discovery: Private DNS namespace for inter-service communication within the VPC
  7. Permissions: IAM roles for ECS task execution, container runtime, and SageMaker ML training jobs

Risk Analysis

MODERATE RISK - This is a greenfield deployment creating all new infrastructure. The primary risks are:

  • S3 buckets will be created without encryption or access logging configured by default
  • ALB is configured for HTTP only (no HTTPS/TLS), exposing traffic in plaintext
  • Four deprecation warnings for S3 resource types that should be addressed before deployment

Resource Changes

    Name                                 Type                                                          Operation
+   public_subnet_2                      aws:ec2/subnet:Subnet                                         create
+   public_subnet_1_rta                  aws:ec2/routeTableAssociation:RouteTableAssociation           create
+   data_bucket_versioning               aws:s3/bucketVersioningV2:BucketVersioningV2                  create
+   portfoliomanager_tg                  aws:lb/targetGroup:TargetGroup                                create
+   nat_gateway                          aws:ec2/natGateway:NatGateway                                 create
+   s3_gateway_endpoint                  aws:ec2/vpcEndpoint:VpcEndpoint                               create
+   datamanager_repository               aws:ecr/repository:Repository                                 create
+   ecs_cluster                          aws:ecs/cluster:Cluster                                       create
+   sagemaker_execution_role             aws:iam/role:Role                                             create
+   equitypricemodel_logs                aws:cloudwatch/logGroup:LogGroup                              create
+   vpc_endpoints_sg                     aws:ec2/securityGroup:SecurityGroup                           create
+   service_discovery                    aws:servicediscovery/privateDnsNamespace:PrivateDnsNamespace  create
+   sagemaker_cloudwatch_policy          aws:iam/rolePolicy:RolePolicy                                 create
+   public_internet_route                aws:ec2/route:Route                                           create
+   private_subnet_2                     aws:ec2/subnet:Subnet                                         create
+   ecr_dkr_endpoint                     aws:ec2/vpcEndpoint:VpcEndpoint                               create
+   equitypricemodel_sd                  aws:servicediscovery/service:Service                          create
+   portfoliomanager_rule                aws:lb/listenerRule:ListenerRule                              create
+   equitypricemodel_service             aws:ecs/service:Service                                       create
+   equitypricemodel_trainer_repository  aws:ecr/repository:Repository                                 create
+   public_subnet_2_rta                  aws:ec2/routeTableAssociation:RouteTableAssociation           create
+   model_artifacts_bucket_versioning    aws:s3/bucketVersioningV2:BucketVersioningV2                  create
+   igw                                  aws:ec2/internetGateway:InternetGateway                       create
+   alb_sg                               aws:ec2/securityGroup:SecurityGroup                           create
+   alb                                  aws:lb/loadBalancer:LoadBalancer                              create
+   portfoliomanager_service             aws:ecs/service:Service                                       create
+   datamanager_tg                       aws:lb/targetGroup:TargetGroup                                create
+   datamanager_task                     aws:ecs/taskDefinition:TaskDefinition                         create
+   private_subnet_1_rta                 aws:ec2/routeTableAssociation:RouteTableAssociation           create
+   vpc                                  aws:ec2/vpc:Vpc                                               create
+   execution_role_secrets_policy        aws:iam/rolePolicy:RolePolicy                                 create
+   execution_role_policy                aws:iam/rolePolicyAttachment:RolePolicyAttachment             create
+   ecs_egress                           aws:ec2/securityGroupRule:SecurityGroupRule                   create
+   public_subnet_1                      aws:ec2/subnet:Subnet                                         create
+   ecs_sg                               aws:ec2/securityGroup:SecurityGroup                           create
+   ecs_from_alb                         aws:ec2/securityGroupRule:SecurityGroupRule                   create
+   nat_route                            aws:ec2/route:Route                                           create
+   datamanager_service                  aws:ecs/service:Service                                       create
+   task_role_s3_policy                  aws:iam/rolePolicy:RolePolicy                                 create
+   sagemaker_s3_policy                  aws:iam/rolePolicy:RolePolicy                                 create
+   datamanager_sd                       aws:servicediscovery/service:Service                          create
... and 24 other changes

@greptile-apps
Copy link
Copy Markdown
Contributor

greptile-apps Bot commented Jan 14, 2026

Confidence Score: 4/5

  • This PR is safe to merge with minimal risk - the changes are primarily additive infrastructure and tooling with one critical bug fix
  • Score reflects solid implementation of data preparation infrastructure with proper type corrections in datamanager. The Rust type changes from u64 to f64 fix a real bug where decimal prices couldn't be handled. New Python tools follow project conventions with proper type hints and error handling. Infrastructure expansion is comprehensive with appropriate IAM permissions. Only minor issues found: one log message mismatch and one attribute access change that should be verified but is likely correct for the SDK version being used.
  • Verify that infrastructure/__main__.py line 10 uses the correct attribute for the Pulumi AWS provider version

Copy link
Copy Markdown
Contributor

@greptile-apps greptile-apps Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

14 files reviewed, 2 comments

Edit Code Review Agent Settings | Greptile

Comment on lines 121 to +122
logger.info("Waiting 15 seconds before next request")
time.sleep(15) # Massive API rate limit
time.sleep(1) # Massive API rate limit
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

syntax: Log message says "Waiting 15 seconds" but code sleeps for 1 second

Suggested change
logger.info("Waiting 15 seconds before next request")
time.sleep(15) # Massive API rate limit
time.sleep(1) # Massive API rate limit
logger.info("Waiting 1 second before next request")
time.sleep(1) # Massive API rate limit
Prompt To Fix With AI
This is a comment left during a code review.
Path: tools/sync_equity_bars_data.py
Line: 121:122

Comment:
**syntax:** Log message says "Waiting 15 seconds" but code sleeps for 1 second

```suggestion
            logger.info("Waiting 1 second before next request")
            time.sleep(1)  # Massive API rate limit
```

How can I resolve this? If you propose a fix, please make it concise.

account_id = current_identity.account_id

region = aws.get_region().name
region = aws.get_region().region
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

style: Changed from aws.get_region().name to .region - check that this is the correct attribute for the AWS provider version being used

Prompt To Fix With AI
This is a comment left during a code review.
Path: infrastructure/__main__.py
Line: 10:10

Comment:
**style:** Changed from `aws.get_region().name` to `.region` - check that this is the correct attribute for the AWS provider version being used

How can I resolve this? If you propose a fix, please make it concise.

Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This pull request prepares data infrastructure for model training by adding new data synchronization tools, fixing datamanager issues, and expanding infrastructure to support SageMaker training jobs.

Changes:

  • Added two new Python scripts for syncing equity categories from Polygon API and preparing consolidated training data
  • Updated infrastructure to create S3 buckets, ECR repositories, and SageMaker execution roles instead of looking up existing resources
  • Enhanced datamanager with improved logging, better S3 query patterns using glob and hive partitioning, and fixed data type handling

Reviewed changes

Copilot reviewed 14 out of 15 changed files in this pull request and generated 7 comments.

Show a summary per file
File Description
tools/sync_equity_categories.py New script to fetch ticker categories (sector/industry) from Polygon API and upload to S3
tools/prepare_training_data.py New script to consolidate equity bars with categories and filter by price/volume thresholds
tools/sync_equity_bars_data.py Reduced API rate limit wait time from 15s to 1s
infrastructure/main.py Major refactor: creates S3 buckets, ECR repositories, SageMaker role; fixes region attribute; updates task definitions
infrastructure/Pulumi.production.yaml Changed region from secure/encrypted value to plain text
applications/datamanager/src/storage.rs Improved logging, switched to glob patterns with hive partitioning for S3 queries, better error handling
applications/datamanager/src/state.rs Enhanced initialization logging and improved default value handling
applications/datamanager/src/main.rs Updated logging filter and added startup log
applications/datamanager/src/health.rs Added debug logging to health check
applications/datamanager/src/equity_bars.rs Fixed data types (u64→f64 for prices/volumes), improved logging, enhanced error messages
applications/datamanager/src/data.rs Added comprehensive logging throughout data transformation functions
applications/datamanager/Dockerfile Added DuckDB environment variables for build
maskfile.md Added new commands for syncing categories and preparing training data
.flox/env/manifest.lock Minor formatting fix (trailing newline)
.claude/settings.local.json Reformatted JSON and added AWS CLI permissions
Comments suppressed due to low confidence (1)

applications/datamanager/src/storage.rs:1

  • Logging raw API response content could expose sensitive data if the API returns any credentials, tokens, or PII in error responses. Consider sanitizing or limiting what gets logged, or use debug level instead of warn.
use crate::data::{

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

if next_url:
url = next_url
params = {"apiKey": api_key}
time.sleep(0.25)
Copy link

Copilot AI Jan 14, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The sleep duration of 0.25 seconds may be too aggressive for API rate limiting. Consider documenting the specific rate limit from Polygon API or making this configurable via environment variable.

Copilot uses AI. Check for mistakes.
output_key: str,
) -> str:
"""Write consolidated training data to S3 as parquet."""
import io
Copy link

Copilot AI Jan 14, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Import statement should be moved to the top of the file with other imports (after line 17) following PEP 8 style guidelines.

Copilot uses AI. Check for mistakes.
if current_date <= end_date:
logger.info("Waiting 15 seconds before next request")
time.sleep(15) # Massive API rate limit
time.sleep(1) # Massive API rate limit
Copy link

Copilot AI Jan 14, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reduced sleep time from 15 seconds to 1 second represents a 15x increase in request rate. Verify this doesn't violate the Massive API rate limits, as comment still references 'Massive API rate limit'. If rate limit has changed, update the comment to reflect the actual limit.

Copilot uses AI. Check for mistakes.
account_id = current_identity.account_id

region = aws.get_region().name
region = aws.get_region().region
Copy link

Copilot AI Jan 14, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This changes the attribute from .name to .region. Verify that .region is the correct attribute for aws.get_region() in the Pulumi AWS SDK version being used. The old code used .name which is a common pattern.

Suggested change
region = aws.get_region().region
region = aws.get_region().name

Copilot uses AI. Check for mistakes.
config:
aws:region:
secure: AAABALPeEekY8m4V3bzTX5idnUTmjjjRWfJQit7uhk+w4mxTWyDK5r0=
aws:region: us-east-1
Copy link

Copilot AI Jan 14, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The AWS region was changed from an encrypted value to plain text. While region information is generally not sensitive, if the previous configuration used encryption for consistency or organizational policy, this change breaks that pattern. Consider whether this was intentional or if it should remain encrypted.

Copilot uses AI. Check for mistakes.
t: u64,
v: Option<u64>,
vw: Option<u64>,
v: Option<f64>,
Copy link

Copilot AI Jan 14, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed price and volume fields from u64 to f64. While this fixes the type mismatch issue (prices should be floating point), verify this doesn't introduce precision issues for volume which was originally u64. Volume typically represents share counts (whole numbers) and may have precision requirements that exceed f64's 53-bit mantissa for high-volume stocks.

Suggested change
v: Option<f64>,
v: Option<u64>,

Copilot uses AI. Check for mistakes.
Comment on lines +185 to 187
let start_date_int = start_timestamp.format("%Y%m%d").to_string().parse::<i32>().unwrap_or(0);
let end_date_int = end_timestamp.format("%Y%m%d").to_string().parse::<i32>().unwrap_or(99999999);

Copy link

Copilot AI Jan 14, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using unwrap_or with fallback values (0 and 99999999) silently hides parse errors. If the timestamp formatting or parsing fails, the query will execute with incorrect date bounds without warning. Consider logging a warning or returning an error instead of using fallback values.

Suggested change
let start_date_int = start_timestamp.format("%Y%m%d").to_string().parse::<i32>().unwrap_or(0);
let end_date_int = end_timestamp.format("%Y%m%d").to_string().parse::<i32>().unwrap_or(99999999);
let start_date_str = start_timestamp.format("%Y%m%d").to_string();
let end_date_str = end_timestamp.format("%Y%m%d").to_string();
let start_date_int = start_date_str.parse::<i32>().map_err(|e| {
warn!(
"Failed to parse start date '{}' as integer for date filter: {}",
start_date_str, e
);
Error::Other(format!(
"Failed to parse start date '{}' as integer",
start_date_str
))
})?;
let end_date_int = end_date_str.parse::<i32>().map_err(|e| {
warn!(
"Failed to parse end date '{}' as integer for date filter: {}",
end_date_str, e
);
Error::Other(format!(
"Failed to parse end date '{}' as integer",
end_date_str
))
})?;

Copilot uses AI. Check for mistakes.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants