Replace SageMaker with Prefect for training pipeline#775
Conversation
Migrate the TiDE model training pipeline from SageMaker to Prefect v3, adding self-hosted Prefect server infrastructure on ECS and refactoring the preprocessing pipeline into testable stages. Preprocessing pipeline (tide_data.py): - Refactor monolithic preprocess_and_set_data into 6 named stages (ValidateColumns, ExpandDateRange, FillNulls, EngineerFeatures, CleanData, ScaleAndEncode) with Pipeline class - Add snapshot/load capability for intermediate pipeline state - Public API unchanged - Data class delegates to Pipeline internally Model training (trainer.py, tide_model.py): - Refactor trainer.py from module-level script into train_model() function returning (Model, Data) tuple - Add empty batch guard in tide_model.py to skip zero-size batches - Configurable paths via env vars instead of hardcoded SageMaker paths Prefect training flow (tools/flows/training_flow.py): - 4 Prefect tasks: sync_equity_bars, sync_equity_details, prepare_data, train_tide_model - Artifacts packaged as model.tar.gz matching existing server expectations for find_latest_artifact_key() - Flow reports to self-hosted Prefect server dashboard Infrastructure (infrastructure/__main__.py): - Add RDS PostgreSQL for Prefect database - Add ElastiCache Redis for Prefect messaging - Add Prefect server and worker ECS services - Add ECR repository for Prefect server image - Add ALB listener on port 4200 with IP-restricted access - Remove SageMaker IAM role and execution policy Tests: - 20 tests for preprocessing pipeline stages and Data class - 14 tests for TiDE model including empty batch handling - 3 tests for Prefect-based run_training_job Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
|
Note Reviews pausedIt looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the Use the following commands to manage reviews:
Use the checkboxes below for quick actions:
📝 WalkthroughWalkthroughAdds a staged time-series preprocessing pipeline and Data coordinator; extracts training into a reusable train_model with checkpointing and environment-configurable paths; replaces SageMaker orchestration with Prefect flows/deployments, tooling, and infra; updates Dockerfile entrypoint and adds extensive unit tests across data, model, trainer, flows, and notifications. Changes
Sequence DiagramsequenceDiagram
participant User
participant CLI
participant RunTrainingJob
participant TrainingFlow
participant DataManager
participant S3
participant ModelTrainer
User->>CLI: invoke training command
CLI->>RunTrainingJob: call(base_url, data_bucket, artifacts_bucket, lookback_days)
RunTrainingJob->>TrainingFlow: invoke training_pipeline(...)
TrainingFlow->>DataManager: sync_equity_bars(base_url, lookback_days)
TrainingFlow->>DataManager: sync_equity_details(base_url)
TrainingFlow->>S3: prepare_data -> upload filtered training data
S3-->>TrainingFlow: return training_data_key
TrainingFlow->>ModelTrainer: train_tide_model(training_data_key)
ModelTrainer->>ModelTrainer: Data.preprocess (Pipeline stages: ValidateColumns -> ... -> ScaleAndEncode)
ModelTrainer->>ModelTrainer: build batches
alt empty batches
ModelTrainer-->>ModelTrainer: skip empty batch
end
ModelTrainer->>ModelTrainer: train epochs (checkpointing)
ModelTrainer->>S3: upload model artifacts
S3-->>TrainingFlow: artifact_path
TrainingFlow-->>RunTrainingJob: artifact_path
RunTrainingJob-->>User: return artifact_path
Estimated code review effort🎯 4 (Complex) | ⏱️ ~50 minutes Possibly related issues
Possibly related PRs
🚥 Pre-merge checks | ✅ 2 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Actionable comments posted: 8
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (2)
applications/equitypricemodel/src/equitypricemodel/tide_data.py (2)
487-492: 🧹 Nitpick | 🔵 TrivialLog messages should use sentence case.
Multiple log messages in
get_batchesuse underscore-separated identifiers instead of sentence case format.Suggested fixes
- logger.info("partitioning_data_by_ticker") + logger.info("Partitioning data by ticker") ticker_groups = self.batch_data.sort("time_idx").partition_by( "ticker", as_dict=True ) total_tickers = len(ticker_groups) - logger.info("batch_creation_started", total_tickers=total_tickers) + logger.info("Batch creation started", total_tickers=total_tickers)Similar changes needed for lines 497, 548, 551, 578.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@applications/equitypricemodel/src/equitypricemodel/tide_data.py` around lines 487 - 492, The log messages in get_batches use underscore identifiers; update the logger.info calls (e.g., the call that logs "partitioning_data_by_ticker" and other calls near the ticker_groups/total_tickers logic) to use sentence case strings like "Partitioning data by ticker" and similar sentence-case phrases for the other messages referenced (lines near 497, 548, 551, 578). Locate logger.info invocations inside the get_batches method and replace underscore-separated messages with human-readable sentence-case messages while preserving any structured fields (e.g., total_tickers) passed as keyword args.
234-240: 🧹 Nitpick | 🔵 TrivialLog message should use sentence case.
The log message uses technical identifiers but lacks sentence case format. Per coding guidelines, structured log messages should be short sentences with sentence case.
Suggested fix
logger.warning( - "Invalid values in continuous column before scaling", + "Found invalid values in continuous column before scaling", column=col, nan_count=nan_count, null_count=null_count, inf_count=inf_count, )🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@applications/equitypricemodel/src/equitypricemodel/tide_data.py` around lines 234 - 240, Update the structured log call so the message uses sentence case (short sentence) instead of all-lower/technical style; modify the logger.warning invocation in tide_data.py (the call using logger with keys column=col, nan_count=nan_count, null_count=null_count, inf_count=inf_count) to supply a sentence-case message like "Invalid values in continuous column before scaling." while keeping the structured fields unchanged.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@applications/equitypricemodel/src/equitypricemodel/tide_data.py`:
- Around line 328-340: DEFAULT_STAGES currently holds singleton stage instances
(e.g., ScaleAndEncode) so multiple Pipeline()s share mutable state; update
Pipeline.__init__ to instantiate fresh stage objects when stages is None (e.g.,
replace using list(DEFAULT_STAGES) with creating new instances for each default
stage or change DEFAULT_STAGES to store stage classes/factories and call them to
produce new Stage instances) so ScaleAndEncode.scaler and .mappings are not
shared across pipelines.
- Around line 316-325: The return type annotation of
_create_mapping_and_encoding is too generic; change the second element of the
tuple from dict to a typed dict like dict[Any, int] (or dict[str, int] if you
know the column values are strings) and add the necessary import (from typing
import Any) at the top; update the function signature to -> tuple[pl.DataFrame,
dict[Any, int]] and ensure any uses of mapping are compatible with the new type.
In `@applications/equitypricemodel/tests/test_tide_data.py`:
- Around line 199-206: The test function test_pipeline_snapshot_roundtrip uses
the pytest tmp_path fixture but types it as str; change the tmp_path parameter
type to pathlib.Path (or from pathlib import Path and use Path) and build the
snapshot path using Path-style operations (e.g., tmp_path / "snapshot.parquet")
so the variable snapshot_path is a Path; when calling Pipeline.snapshot or
Pipeline.load_snapshot, convert to str only if those methods require a string
path—update references to tmp_path, snapshot_path, Pipeline.snapshot, and
Pipeline.load_snapshot accordingly.
In `@infrastructure/__main__.py`:
- Around line 1615-1626: The inline bash+Python one-liner that sets
PREFECT_API_DATABASE_CONNECTION_URL in infrastructure.__main__.py (the
multi-line "command" that uses python3 -c to URL-encode PREFECT_DB_PASSWORD and
prints postgresql+asyncpg://prefect:{p}@{args[1]}/prefect) is hard to maintain;
replace it with a small entrypoint script or add a clear comment/docstring
explaining the runtime URL-encoding requirement and why it's done here.
Concretely, move the logic into an executable entrypoint (e.g., entrypoint.sh or
entrypoint.py) that reads PREFECT_DB_PASSWORD, URL-encodes it, exports
PREFECT_API_DATABASE_CONNECTION_URL and then runs "prefect server start --host
0.0.0.0", or alternatively add an inline comment above the "command" explaining
that PREFECT_DB_PASSWORD must be URL-encoded and referencing the exact one-liner
currently used so future maintainers can understand/replicate it.
- Around line 2170-2173: The exported Prefect UI URL currently uses the
unencrypted protocol variable and may expose the dashboard over HTTP; update the
export and ALB listener setup so that when acm_certificate_arn is set the
protocol is "https://" and the ALB has an HTTPS listener with that certificate,
and if acm_certificate_arn is not set avoid advertising plain "http://" —
instead either (a) provision or require a TLS certificate before exposing the
UI, or (b) export a placeholder/flag indicating TLS is missing; change the
export call referencing protocol and alb.dns_name accordingly and ensure checks
around acm_certificate_arn and ALB listener creation (symbols: protocol, alb,
acm_certificate_arn, and the prefetched pulumi.export("prefect_ui_url", ...))
are updated to reflect HTTPS-only exposure.
- Around line 1494-1508: The RDS instance creation uses skip_final_snapshot=True
which prevents a final backup on deletion; update the aws.rds.Instance call for
prefect_database to set skip_final_snapshot=False and add a
final_snapshot_identifier (or make it configurable) so a snapshot is retained on
delete; consider also enabling deletion_protection or exposing
skip_final_snapshot/deletion_protection as a configuration flag to allow
non-production overrides while ensuring production keeps snapshots.
In `@tools/pyproject.toml`:
- Line 8: Update the Prefect dependency constraint to add an upper bound:
replace the unbounded "prefect>=3.0.0" entry with a semantically-pinned
constraint "prefect>=3.0.0,<4.0.0" so upgrades to Prefect 4.x won't be pulled in
unexpectedly; adjust the same dependency line in tools/pyproject.toml (the entry
currently shown as "prefect>=3.0.0") accordingly and consider applying the same
pinning pattern to other >= dependencies if you want consistency.
In `@tools/src/tools/flows/training_flow.py`:
- Around line 131-144: The pipeline ignores the return value of prepare_data
(the S3 key of prepared data) and calls train_tide_model without it; change
training_pipeline to capture prepare_data(...)->prepared_key and pass that
prepared_key into train_tide_model so the trainer uses the explicit S3 key
(update call sites and ensure train_tide_model signature accepts the key name if
it doesn't already — reference functions prepare_data and train_tide_model).
---
Outside diff comments:
In `@applications/equitypricemodel/src/equitypricemodel/tide_data.py`:
- Around line 487-492: The log messages in get_batches use underscore
identifiers; update the logger.info calls (e.g., the call that logs
"partitioning_data_by_ticker" and other calls near the
ticker_groups/total_tickers logic) to use sentence case strings like
"Partitioning data by ticker" and similar sentence-case phrases for the other
messages referenced (lines near 497, 548, 551, 578). Locate logger.info
invocations inside the get_batches method and replace underscore-separated
messages with human-readable sentence-case messages while preserving any
structured fields (e.g., total_tickers) passed as keyword args.
- Around line 234-240: Update the structured log call so the message uses
sentence case (short sentence) instead of all-lower/technical style; modify the
logger.warning invocation in tide_data.py (the call using logger with keys
column=col, nan_count=nan_count, null_count=null_count, inf_count=inf_count) to
supply a sentence-case message like "Invalid values in continuous column before
scaling." while keeping the structured fields unchanged.
ℹ️ Review info
Configuration used: Organization UI
Review profile: ASSERTIVE
Plan: Pro
⛔ Files ignored due to path filters (1)
uv.lockis excluded by!**/*.lock
📒 Files selected for processing (14)
applications/equitypricemodel/Dockerfileapplications/equitypricemodel/src/equitypricemodel/tide_data.pyapplications/equitypricemodel/src/equitypricemodel/tide_model.pyapplications/equitypricemodel/src/equitypricemodel/trainer.pyapplications/equitypricemodel/tests/test_tide_data.pyapplications/equitypricemodel/tests/test_tide_model.pyinfrastructure/Pulumi.production.yamlinfrastructure/__main__.pymaskfile.mdtools/pyproject.tomltools/src/tools/flows/__init__.pytools/src/tools/flows/training_flow.pytools/src/tools/run_training_job.pytools/tests/test_run_training_job.py
Greptile SummaryThis PR is a well-scoped migration from SageMaker to Prefect v3 for the TiDE model training pipeline, paired with a clean six-stage refactor of Critical Issue: The inference path in
This causes silent correctness failures in predictions: wrong absolute return values and potential ticker misidentification. The fix requires using a separate preprocessing path at inference time that applies the loaded scaler and mappings rather than re-fitting them. Previous issues (HTTP-only dashboard, missing RDS snapshots, discarded Confidence Score: 1/5
Last reviewed commit: e7909dd |
Additional Comments (1)
These four CIDR entries appear to be individual developer home IPs (IPv4 and IPv6). Committing them directly to the stack config means every time a team member's IP changes (DHCP reassignment, ISP change, travel), the stack must be updated and re-deployed to regain dashboard access. Consider managing allowed CIDRs via a Pulumi secret or an external configuration mechanism, or documenting an update runbook so this doesn't silently lock out team members. Prompt To Fix With AIThis is a comment left during a code review.
Path: infrastructure/Pulumi.production.yaml
Line: 1338-1342
Comment:
**Hardcoded personal IP addresses committed to source control**
These four CIDR entries appear to be individual developer home IPs (IPv4 and IPv6). Committing them directly to the stack config means every time a team member's IP changes (DHCP reassignment, ISP change, travel), the stack must be updated and re-deployed to regain dashboard access. Consider managing allowed CIDRs via a Pulumi secret or an external configuration mechanism, or documenting an update runbook so this doesn't silently lock out team members.
How can I resolve this? If you propose a fix, please make it concise.Note: If this suggestion doesn't match your team's coding style, reply to this and let me know. I'll remember it for next time! |
There was a problem hiding this comment.
Pull request overview
This PR migrates the TiDE model training pipeline from SageMaker to Prefect v3, refactors training/data-prep code into callable and testable components, and adds AWS infrastructure for a self-hosted Prefect server/worker on ECS.
Changes:
- Replace the SageMaker-based training job runner with a Prefect flow (
training_pipeline) and update tools/tests/docs accordingly. - Refactor
tide_data.pyinto a staged preprocessing pipeline and maketrainer.pyexpose a callabletrain_model(). - Add Prefect server/worker infrastructure on ECS (RDS Postgres, ElastiCache Redis, ECR repo, ALB listener on 4200, remove SageMaker IAM role).
Reviewed changes
Copilot reviewed 13 out of 15 changed files in this pull request and generated 6 comments.
Show a summary per file
| File | Description |
|---|---|
uv.lock |
Adds Prefect (and transitive deps) and removes SageMaker from the resolved lock. |
tools/pyproject.toml |
Swaps sagemaker dependency for prefect. |
tools/src/tools/run_training_job.py |
Replaces SageMaker Estimator invocation with Prefect training_pipeline() call + env var changes. |
tools/src/tools/flows/training_flow.py |
Introduces Prefect flow/tasks to sync data, prepare parquet, train, and upload artifacts. |
tools/tests/test_run_training_job.py |
Updates tests to mock Prefect pipeline instead of SageMaker modules. |
maskfile.md |
Updates training command to use Prefect + removes SageMaker-specific setup. |
infrastructure/__main__.py |
Adds ECS-hosted Prefect server/worker + backing RDS/Redis + ALB listener + ECR repo; removes SageMaker role references. |
infrastructure/Pulumi.production.yaml |
Adds prefectAllowedCidrs configuration for dashboard access restriction. |
applications/equitypricemodel/src/equitypricemodel/trainer.py |
Refactors into callable train_model() and keeps CLI behavior under __main__. |
applications/equitypricemodel/src/equitypricemodel/tide_model.py |
Adds guard to skip zero-sized batches during training. |
applications/equitypricemodel/src/equitypricemodel/tide_data.py |
Refactors preprocessing into named stages and a Pipeline runner used by Data. |
applications/equitypricemodel/tests/test_tide_data.py |
Adds tests for each preprocessing stage, pipeline, and Data behaviors. |
applications/equitypricemodel/tests/test_tide_model.py |
Adds model/quantile-loss/training behavior tests (incl. empty batch guard). |
applications/equitypricemodel/Dockerfile |
Runs trainer via module entrypoint and sets training/model path env vars. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
forstmeier
left a comment
There was a problem hiding this comment.
Love it. Some changes requested and the bots caught a bunch of stuff.
I was doing cleanup work on the same code but you beat me to it and yours is a much better/more comprehensive pull request so if you can just pull in a few changes from mine here.
- I renamed the variables in the model to be more TiDE-specific (not a hard requirement but it feels clearer than the legacy TFT conventions)
- I deprecated references to
pyrightsince it was causing issues with my editor and it wasn't really being used - I think you've got some data snapshotting but I also added in "best loss" snapshotting on mine that might be worth copying over
- I added in a Parameter Store "version selection" for dynamic model selection while the application is live - I think that'd be useful to include
…ations - Add custom Prefect worker Dockerfile with project code and dependencies - Make training flow dates dynamic (rolling window from lookback_days) - Add SES email notification hooks on flow completion and failure - Add flow deployment script with module-path entrypoint for process workers - Add ECR repo, SES identity, and SES IAM policy to infrastructure - Add worker env vars for datamanager URL and notification emails - Encrypt Prefect allowed CIDRs as Pulumi secrets - Add maskfile commands for building worker image and deploying flows Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
There was a problem hiding this comment.
Actionable comments posted: 7
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@infrastructure/__main__.py`:
- Around line 1456-1480: The IAM policy currently allows SES send actions across
all identities; narrow it to the specific sender by replacing the wildcard
Resource with the SES identity ARN for training_notification_email_identity:
build the ARN using Pulumi Outputs (e.g., pulumi.Output.all(region, account_id,
training_notification_email_identity.id or .arn) and format/apply to produce
"arn:aws:ses:{region}:{account_id}:identity/{identityName}") and use that Output
as the policy's Resource when creating aws.iam.RolePolicy for task_role.id so
the policy grants ses:SendEmail and ses:SendRawEmail only for the
training_notification_email_identity.
In `@tools/Dockerfile`:
- Around line 19-21: The runtime stage in the Dockerfile is installing compilers
(build-essential and clang) which increases image size and attack surface;
remove those packages from the RUN that updates apt and installs runtime deps
and instead install them only in the builder stage used for native builds (or
keep them behind a conditional build ARG if runtime compilation is truly
required). Update the multi-stage build so the builder stage installs
build-essential and clang, compiles whatever native modules there, and then COPY
only the built artifacts into the runtime/worker stage; if any runtime packages
were installed only to satisfy build-time compilation, replace them with the
compiled outputs in the runtime stage and remove build-essential and clang from
the runtime RUN line.
- Around line 17-31: Add a HEALTHCHECK to the worker Dockerfile and wire it into
ECS: create a lightweight health-check script (e.g.,
tools/worker_healthcheck.sh) that verifies the Prefect worker process started by
ENTRYPOINT ("uv run --package tools prefect worker start ...") is alive and
responsive (for example check process pid, Prefect API liveness, or use
`prefect` CLI to list worker status) and COPY it into the image, then add a
HEALTHCHECK instruction to the Dockerfile that runs that script with sensible
intervals/retries; also update the ECS task definition generator in
infrastructure/__main__.py to add a healthCheck block for the Prefect worker
container name so ECS uses the container HEALTHCHECK result to determine
container health.
- Around line 17-31: Add a non-root runtime user and ensure files are owned by
it so the long-running worker doesn't run as root: in the Dockerfile create a
dedicated user/group (e.g., addgroup --system worker && adduser --system
--ingroup worker --disabled-login worker), chown the application directory and
the copied /bin/uv to that user (chown -R worker:worker /app && chown
worker:worker /bin/uv) and then switch to that user with USER worker before the
ENTRYPOINT so the existing ENTRYPOINT ["uv", "run", "--package", "tools",
"prefect", "worker", "start", "--pool", "training-pool", "--type", "process"]
runs unprivileged; also set a safe HOME for the user if needed and keep build
steps that require root before the USER instruction.
- Around line 1-3: The Dockerfile uses mutable image tags (python:3.12.10-slim
and ghcr.io/astral-sh/uv:latest); replace each image reference with its
immutable SHA256 digest form (e.g., python@sha256:... and
ghcr.io/astral-sh/uv@sha256:...) so builds are reproducible and secure. Locate
every occurrence of "python:3.12.10-slim" and "ghcr.io/astral-sh/uv:latest" in
the Dockerfile and update them to their corresponding `@sha256`:<digest> values
(use docker pull or the registry UI/API to retrieve the exact digest), then
verify the Docker build still succeeds.
In `@tools/src/tools/run_training_job.py`:
- Around line 42-43: The LOOKBACK_DAYS parsing currently does
int(os.getenv("LOOKBACK_DAYS", "365")) outside the try/except so a malformed
value raises ValueError before your error handling; move the int() conversion of
os.getenv("LOOKBACK_DAYS") into the existing try block (where the rest of job
setup runs), validate that the parsed lookback_days is a positive integer (>0),
and catch ValueError/TypeError to handle bad input (using the existing error
handling/logging flow) while keeping the default of 365 when the env var is
missing or empty.
In `@tools/tests/test_notifications.py`:
- Around line 117-136: The test currently only ensures
send_training_notification doesn't raise when SES fails; patch the
logger.exception used by send_training_notification (e.g., patch
"tools.flows.notifications.logger") and add an assertion that logger.exception
was called (mock_logger.exception.assert_called()) after calling
send_training_notification; optionally also assert the mocked SES client's
send_email was invoked to confirm the error originated from send_email.
ℹ️ Review info
Configuration used: Organization UI
Review profile: ASSERTIVE
Plan: Pro
📒 Files selected for processing (10)
infrastructure/Pulumi.production.yamlinfrastructure/__main__.pymaskfile.mdtools/Dockerfiletools/src/tools/deploy_training_flow.pytools/src/tools/flows/notifications.pytools/src/tools/flows/training_flow.pytools/src/tools/run_training_job.pytools/tests/test_notifications.pytools/tests/test_run_training_job.py
#775 review - Replace shared mutable DEFAULT_STAGES list with default_stages() factory function to prevent ScaleAndEncode state leaking across Pipeline instances - Wrap Model.validate() in try/finally to restore Tensor.training state, matching the existing train() pattern - Wire prepare_data return value through training_pipeline by adding output_key parameter and threading training_data_key explicitly - Raise ValueError on NaN scaler values in ScaleAndEncode instead of silently logging errors - Add stage name validation to Pipeline.run_to() for unknown stage names - Remove dead Data._create_mapping_and_encoding method - Scope SES task role policy to specific email identity ARN instead of wildcard - Harden RDS configuration with final snapshot, encryption, and deletion protection - Add test suites for training_flow.py, deploy_training_flow.py, and trainer.py - Add tests for NaN validation, run_to validation, and validate() state restore Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
There was a problem hiding this comment.
Actionable comments posted: 5
♻️ Duplicate comments (3)
infrastructure/__main__.py (2)
816-829: 🧹 Nitpick | 🔵 TrivialPrefect dashboard exposed over HTTP.
The Prefect listener uses HTTP on port 4200. While access is IP-restricted via security group rules, consider prioritizing TLS certificate setup to encrypt dashboard traffic, especially if accessed over untrusted networks.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@infrastructure/__main__.py` around lines 816 - 829, The Prefect listener "prefect_listener" is created with protocol "HTTP" on port 4200; change it to use TLS by creating or referencing an ACM certificate and updating the listener to use protocol "HTTPS" (or port 443) with certificate_arn set and an appropriate ssl_policy, while keeping the default forward action to prefect_tg.arn; ensure any ALB security group and redirect rules are updated to allow/redirect HTTPS traffic and remove plaintext exposure from alb.arn on port 4200.
1681-1692: 🧹 Nitpick | 🔵 TrivialComplex inline command for database URL construction.
The inline bash/Python command to URL-encode the password and construct the connection string works but is difficult to maintain. Consider adding a comment explaining why runtime URL-encoding is needed, or extracting this to an entrypoint script.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@infrastructure/__main__.py` around lines 1681 - 1692, The inline bash/Python one-liner in the Docker command construction (the "command" array building in infrastructure/__main__.py) is hard to maintain; extract the password URL-encoding and PREFECT_API_DATABASE_CONNECTION_URL assembly into a separate entrypoint script (e.g., create an entrypoint.sh that uses Python or urllib.parse to quote os.environ['PREFECT_DB_PASSWORD'] and exports the connection string, then runs "prefect server start --host 0.0.0.0"), update the "command" to call that script, and/or add a concise comment above the current inlined block explaining why runtime URL-encoding is required and referencing the new entrypoint filename if added.tools/src/tools/flows/training_flow.py (1)
134-151: 🧹 Nitpick | 🔵 TrivialConsider capturing
prepare_datareturn value for robustness.While the current implementation passes the same
training_data_keyto bothprepare_dataandtrain_tide_model, capturing the actual return value fromprepare_datawould provide explicit coupling and catch any future path changes:♻️ Suggested improvement
def training_pipeline( base_url: str, data_bucket: str, artifacts_bucket: str, lookback_days: int = 365, ) -> str: """End-to-end training pipeline.""" training_data_key = "training/filtered_tide_training_data.parquet" sync_equity_bars(base_url, lookback_days) sync_equity_details(base_url) - prepare_data(data_bucket, artifacts_bucket, lookback_days, training_data_key) - return train_tide_model(artifacts_bucket, training_data_key) + prepared_key = prepare_data(data_bucket, artifacts_bucket, lookback_days, training_data_key) + return train_tide_model(artifacts_bucket, prepared_key)🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@tools/src/tools/flows/training_flow.py` around lines 134 - 151, The pipeline currently ignores prepare_data's return and always passes the hardcoded training_data_key into train_tide_model; modify training_pipeline so it captures the return value from prepare_data (e.g., prepared_key = prepare_data(...)) and then pass that prepared_key into train_tide_model(artifacts_bucket, prepared_key) to ensure the actual prepared artifact/path is used (refer to functions prepare_data and train_tide_model).
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@applications/equitypricemodel/tests/test_trainer.py`:
- Around line 11-47: The _make_raw_data helper is duplicated across tests;
extract it into a shared test utility (e.g., a top-level conftest.py or
tests/utils.py) and update both test_trainer.py and test_tide_data.py to use the
shared function or a pytest fixture instead of their local copies. Move the
function definition (keeping the same signature, defaults, type hints, and
behavior), export it (or wrap it with `@pytest.fixture` if you prefer fixture
usage), update imports in tests to reference the shared symbol _make_raw_data,
and remove the duplicate local definitions so both files call the single shared
implementation.
- Around line 59-65: Rename the unused unpacked variable in
test_train_model_uses_custom_configuration: when calling
train_model(training_data, configuration=custom_config) change the second return
binding from data to _data (or _ ) to follow Python convention for intentionally
unused variables; keep the rest of the test (custom_config,
DEFAULT_CONFIGURATION, assertion on model.hidden_size) unchanged.
- Around line 68-71: Update the
test_test_train_model_raises_on_insufficient_data to assert the specific error
message by using pytest.raises(..., match=...). In the test function
test_train_model_raises_on_insufficient_data wrap the call to
train_model(short_data) with pytest.raises(ValueError, match="...") where the
match string exactly matches the ValueError text raised by train_model for
insufficient data (use the actual message emitted by train_model to ensure the
regex matches). This will make the test validate both the exception type and its
message.
In `@tools/tests/test_deploy_training_flow.py`:
- Around line 40-41: The test currently imports EntrypointType inside the test
body—move the "from prefect.flows import EntrypointType" import to the
module-level imports at the top of the file and remove the inline import on the
test line; ensure any references to EntrypointType (and related variables like
mock_deploy and call_kwargs) continue to work after relocating the import.
In `@tools/tests/test_training_flow.py`:
- Around line 84-87: Remove the no-op nested patch around
train_tide_model.__wrapped__ and only keep the meaningful patch on train_model;
specifically, delete the outer with
patch("tools.flows.training_flow.train_tide_model.__wrapped__",
side_effect=None) block and its indentation so the test uses the existing patch
of train_model (and still calls train_tide_model.fn()) directly, leaving
train_tide_model and train_model references intact.
---
Duplicate comments:
In `@infrastructure/__main__.py`:
- Around line 816-829: The Prefect listener "prefect_listener" is created with
protocol "HTTP" on port 4200; change it to use TLS by creating or referencing an
ACM certificate and updating the listener to use protocol "HTTPS" (or port 443)
with certificate_arn set and an appropriate ssl_policy, while keeping the
default forward action to prefect_tg.arn; ensure any ALB security group and
redirect rules are updated to allow/redirect HTTPS traffic and remove plaintext
exposure from alb.arn on port 4200.
- Around line 1681-1692: The inline bash/Python one-liner in the Docker command
construction (the "command" array building in infrastructure/__main__.py) is
hard to maintain; extract the password URL-encoding and
PREFECT_API_DATABASE_CONNECTION_URL assembly into a separate entrypoint script
(e.g., create an entrypoint.sh that uses Python or urllib.parse to quote
os.environ['PREFECT_DB_PASSWORD'] and exports the connection string, then runs
"prefect server start --host 0.0.0.0"), update the "command" to call that
script, and/or add a concise comment above the current inlined block explaining
why runtime URL-encoding is required and referencing the new entrypoint filename
if added.
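The runtime URL-encoding that makes the inline one-liner necessary can be sketched with `urllib.parse.quote`; the DSN shape, env var name, and function name here are assumptions for illustration:

```python
# Sketch of what a dedicated entrypoint script would do: URL-encode the
# database password before assembling PREFECT_API_DATABASE_CONNECTION_URL.
# Special characters (@, /, :) in passwords would otherwise corrupt the DSN.
import os
from urllib.parse import quote


def build_prefect_db_url(user: str, host: str, db: str) -> str:
    password = quote(os.environ["PREFECT_DB_PASSWORD"], safe="")
    return f"postgresql+asyncpg://{user}:{password}@{host}:5432/{db}"
```

The entrypoint would export this value and then exec `prefect server start --host 0.0.0.0`.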
In `@tools/src/tools/flows/training_flow.py`:
- Around line 134-151: The pipeline currently ignores prepare_data's return and
always passes the hardcoded training_data_key into train_tide_model; modify
training_pipeline so it captures the return value from prepare_data (e.g.,
prepared_key = prepare_data(...)) and then pass that prepared_key into
train_tide_model(artifacts_bucket, prepared_key) to ensure the actual prepared
artifact/path is used (refer to functions prepare_data and train_tide_model).
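A plain-function sketch of the threading fix (Prefect `@task`/`@flow` decorators and real S3 I/O omitted; bucket and key names are illustrative):

```python
# Sketch: training_pipeline captures prepare_data's return value instead of
# passing a hardcoded training_data_key into train_tide_model.
def prepare_data(data_bucket: str, artifacts_bucket: str, lookback_days: int) -> str:
    # ...writes the prepared dataset to S3 and returns its object key...
    return f"training/prepared-{lookback_days}d.parquet"


def train_tide_model(artifacts_bucket: str, training_data_key: str) -> str:
    # ...trains against the given key and returns the artifact location...
    return f"s3://{artifacts_bucket}/{training_data_key} -> model.tar.gz"


def training_pipeline(data_bucket: str, artifacts_bucket: str, lookback_days: int) -> str:
    # Capture the actual prepared key rather than ignoring the return value.
    prepared_key = prepare_data(data_bucket, artifacts_bucket, lookback_days)
    return train_tide_model(artifacts_bucket, prepared_key)
```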
ℹ️ Review info
Configuration used: Organization UI
Review profile: ASSERTIVE
Plan: Pro
📒 Files selected for processing (9)
- applications/equitypricemodel/src/equitypricemodel/tide_data.py
- applications/equitypricemodel/src/equitypricemodel/tide_model.py
- applications/equitypricemodel/tests/test_tide_data.py
- applications/equitypricemodel/tests/test_tide_model.py
- applications/equitypricemodel/tests/test_trainer.py
- infrastructure/__main__.py
- tools/src/tools/flows/training_flow.py
- tools/tests/test_deploy_training_flow.py
- tools/tests/test_training_flow.py
Pull request overview
Copilot reviewed 20 out of 22 changed files in this pull request and generated 6 comments.
Actionable comments posted: 7
♻️ Duplicate comments (3)
infrastructure/__main__.py (1)
831-834: ⚠️ Potential issue | 🟠 Major — Enforce TLS for Prefect and derive all exported/runtime Prefect URLs from the same scheme.

`acm_certificate_arn` is hardcoded to `None`, which keeps Prefect dashboard exposure on HTTP, and `PREFECT_UI_API_URL` is also hardcoded to `http://...`. This leaves dashboard traffic unencrypted and will also cause mixed-content failures once HTTPS is enabled elsewhere.

Suggested fix:

```diff
-# TODO: Enable HTTPS for the Prefect dashboard when an ACM certificate is provisioned.
-# Set acm_certificate_arn to the certificate ARN to activate the HTTPS listener below.
-acm_certificate_arn = None
+acm_certificate_arn = stack_config.require("acmCertificateArn")
 ...
-    "value": f"http://{args[4]}:4200/api",
+    "value": f"https://{args[4]}:4200/api",
 ...
-protocol = "https://" if acm_certificate_arn else "http://"
+protocol = "https://"
```

Also applies to: 1703-1706, 2204-2259
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@infrastructure/__main__.py` around lines 831 - 834, acm_certificate_arn is hardcoded to None causing the Prefect dashboard and PREFECT_UI_API_URL to always use HTTP; change the logic so when a valid acm_certificate_arn is provided you enable the HTTPS listener and set the scheme variable to "https", otherwise leave it "http", then derive all exported/runtime Prefect URLs (e.g. PREFECT_UI_API_URL) from that single scheme variable so they consistently use the same protocol and avoid mixed-content issues; update the code around acm_certificate_arn and any places that build PREFECT_UI_API_URL to use this computed scheme and enable the HTTPS listener when acm_certificate_arn is present (refer to acm_certificate_arn and PREFECT_UI_API_URL).

applications/equitypricemodel/src/equitypricemodel/tide_data.py (1)
332-333: ⚠️ Potential issue | 🟡 Minor — Respect explicit empty stage lists in `Pipeline.__init__`.

`stages or default_stages()` makes `Pipeline(stages=[])` impossible because an empty list is treated as falsy and replaced with defaults.

Suggested fix:

```diff
-        self.stages = stages or default_stages()
+        self.stages = stages if stages is not None else default_stages()
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@applications/equitypricemodel/src/equitypricemodel/tide_data.py` around lines 332 - 333, Pipeline.__init__ treats an empty list as falsy by using stages or default_stages(), preventing callers from passing Pipeline(stages=[]); change the assignment to only use defaults when stages is None (e.g., check if stages is None and call default_stages(), otherwise assign the given stages) so explicit empty lists are preserved; update the line in Pipeline.__init__ that assigns self.stages to reference this new conditional behavior and keep the default_stages() call for the None case.

tools/src/tools/run_training_job.py (1)
55-61: ⚠️ Potential issue | 🟡 Minor — Validate `LOOKBACK_DAYS` is strictly positive before running the pipeline.

Line 55 parses the value but allows 0/negative values through, which can produce invalid date windows and deferred runtime failures.

♻️ Suggested fix:

```diff
 try:
     lookback_days = int(os.getenv("LOOKBACK_DAYS", "365"))
+    if lookback_days <= 0:
+        logger.error(
+            "LOOKBACK_DAYS must be a positive integer",
+            lookback_days=lookback_days,
+        )
+        sys.exit(1)
     run_training_job(
         base_url=base_url,
         data_bucket=data_bucket,
         artifacts_bucket=artifacts_bucket,
         lookback_days=lookback_days,
     )
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@tools/src/tools/run_training_job.py` around lines 55 - 61, Validate that the parsed LOOKBACK_DAYS environment variable is strictly positive before calling run_training_job: after computing lookback_days = int(os.getenv("LOOKBACK_DAYS", "365")), check lookback_days > 0 and if not, raise a clear ValueError or log an error and exit; ensure this validation is done immediately before the call to run_training_job(base_url=..., data_bucket=..., artifacts_bucket=..., lookback_days=lookback_days) so invalid 0/negative values are rejected early.
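A sketch of the guard in isolation, with the logger-and-`sys.exit(1)` behavior replaced by a `ValueError` so it can be exercised outside the real entrypoint:

```python
# Sketch of guarded LOOKBACK_DAYS parsing; the real entrypoint logs and calls
# sys.exit(1) instead of raising.
import os


def parse_lookback_days(default: str = "365") -> int:
    lookback_days = int(os.getenv("LOOKBACK_DAYS", default))
    if lookback_days <= 0:
        raise ValueError(
            f"LOOKBACK_DAYS must be a positive integer, got {lookback_days}"
        )
    return lookback_days
```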
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@applications/equitypricemodel/src/equitypricemodel/server.py`:
- Around line 170-172: The except ClientError handler that currently calls
logger.info("SSM parameter not available, using default artifact path") should
instead call logger.exception(...) so the stack trace is captured; update the
exception block in server.py (the except ClientError branch that sets
model_version = "latest") to use logger.exception with a clear message (e.g.,
"SSM parameter not available, using default artifact path") and keep setting
model_version = "latest" afterwards.
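A dependency-free sketch of the suggested change, with a generic `Exception` standing in for botocore's `ClientError`:

```python
# Sketch: logger.exception logs at ERROR level *and* attaches the traceback,
# while the fallback to "latest" is preserved. fetch_parameter stands in for
# the real SSM lookup.
import logging

logger = logging.getLogger("server")


def resolve_model_version(fetch_parameter) -> str:
    try:
        return fetch_parameter()
    except Exception:  # the real handler catches botocore's ClientError
        logger.exception("SSM parameter not available, using default artifact path")
        return "latest"
```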
In `@applications/equitypricemodel/src/equitypricemodel/tide_model.py`:
- Around line 322-326: The code unconditionally restores whenever
os.path.exists(checkpoint_path) even if no checkpoint was saved in this run;
change the restore logic around the os.path.exists(checkpoint_path) check to
ensure the checkpoint was created by the current run (e.g., create a
run-specific subdirectory or write a sentinel file when saving, or compare
checkpoint_path mtime against process start time) and only restore if that
sentinel exists or mtime is newer than the current process start; update the
restore branch that references checkpoint_directory/checkpoint_path to use this
safer check so stale checkpoints from previous runs are not loaded.
In `@applications/equitypricemodel/src/equitypricemodel/trainer.py`:
- Around line 33-54: The code uses the incoming configuration directly which
causes KeyError for partial overrides; before any access, merge
DEFAULT_CONFIGURATION with the incoming configuration (e.g., create
merged_configuration = dict(DEFAULT_CONFIGURATION) and update it with
configuration or {}), then use merged_configuration for logger.info and when
reading keys for get_batches and validation/input/output/batch sizes; update all
references to configuration in trainer.py (including the
logger.info("Configuration loaded") call and the arguments passed to
tide_data.get_batches) to use the merged configuration.
In `@infrastructure/__main__.py`:
- Around line 1789-1825: The TRAINING_NOTIFICATION_SENDER_EMAIL and
TRAINING_NOTIFICATION_RECIPIENT_EMAILS values are being placed in the ECS task's
environment (exposed as plain text); change them to use the task definition's
secrets field instead (like PREFECT_DB_PASSWORD in the prefect-server task). In
the lambda that builds the task JSON (the apply lambda producing
"prefect-worker"), remove those two entries from the "environment" array and add
corresponding entries in a "secrets" array with objects using "name":
"TRAINING_NOTIFICATION_SENDER_EMAIL" and "name":
"TRAINING_NOTIFICATION_RECIPIENT_EMAILS" and "valueFrom": args[5] / args[6]
respectively so the task pulls values from Secrets Manager/SSM. Ensure the
secret names/ARNs passed in args[5] and args[6] are used verbatim in valueFrom
and maintain the existing environment entries unchanged.
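A sketch of the resulting container-definition fragment (names, values, and ARNs are placeholders):

```python
# Sketch: the two notification emails move from "environment" (plain text in
# the task definition) into "secrets", which ECS resolves from SSM/Secrets
# Manager ARNs at task start.
def build_worker_container(sender_email_arn: str, recipients_arn: str) -> dict:
    return {
        "name": "prefect-worker",
        "environment": [
            # Non-sensitive settings stay as plain environment variables.
            {"name": "PREFECT_API_URL", "value": "http://prefect-server:4200/api"},
        ],
        "secrets": [
            {"name": "TRAINING_NOTIFICATION_SENDER_EMAIL", "valueFrom": sender_email_arn},
            {"name": "TRAINING_NOTIFICATION_RECIPIENT_EMAILS", "valueFrom": recipients_arn},
        ],
    }
```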
In `@maskfile.md`:
- Line 816: Replace the derived PREFECT_API_URL assignment that appends "/api"
to the UI URL (the export PREFECT_API_URL="$(pulumi stack output
prefect_ui_url)/api") with a direct Pulumi output for the API endpoint; call
pulumi stack output prefect_api_url and export that value to PREFECT_API_URL (do
the same for the other occurrence), so the environment variable uses the
dedicated prefect_api_url output instead of assuming UI routing.
- Line 268: The rolling redeploy for-loop currently lists services
"fund-datamanager fund-portfoliomanager fund-equitypricemodel
fund-prefect-worker" but omits the Prefect server; update the for loop (the line
starting with for service in ...) to include "fund-prefect-server" alongside the
other service names so the Prefect server is also forced to redeploy during
`stack up`, ensuring it isn't left on a stale image/config.
In `@tools/Dockerfile`:
- Around line 35-36: The current HEALTHCHECK entry is a no-op (CMD ["python",
"-c", "import sys; sys.exit(0)"]) and must be replaced with a real liveness
probe that verifies the Prefect worker is actually alive; update the HEALTHCHECK
to run a command that either (a) checks the worker process (e.g., pgrep for the
Prefect worker binary/name such as "prefect" or "prefect-worker") or (b) queries
a local health/readiness endpoint (e.g., curl against the Prefect API/agent on
localhost) and exits non‑zero on failure so ECS can mark the task unhealthy.
Locate and replace the HEALTHCHECK line in the Dockerfile accordingly and ensure
the chosen probe returns exit code 0 on success and non‑zero on failure.
---
Duplicate comments:
In `@applications/equitypricemodel/src/equitypricemodel/tide_data.py`:
- Around line 332-333: Pipeline.__init__ treats an empty list as falsy by using
stages or default_stages(), preventing callers from passing Pipeline(stages=[]);
change the assignment to only use defaults when stages is None (e.g., check if
stages is None and call default_stages(), otherwise assign the given stages) so
explicit empty lists are preserved; update the line in Pipeline.__init__ that
assigns self.stages to reference this new conditional behavior and keep the
default_stages() call for the None case.
In `@infrastructure/__main__.py`:
- Around line 831-834: acm_certificate_arn is hardcoded to None causing the
Prefect dashboard and PREFECT_UI_API_URL to always use HTTP; change the logic so
when a valid acm_certificate_arn is provided you enable the HTTPS listener and
set the scheme variable to "https", otherwise leave it "http", then derive all
exported/runtime Prefect URLs (e.g. PREFECT_UI_API_URL) from that single scheme
variable so they consistently use the same protocol and avoid mixed-content
issues; update the code around acm_certificate_arn and any places that build
PREFECT_UI_API_URL to use this computed scheme and enable the HTTPS listener
when acm_certificate_arn is present (refer to acm_certificate_arn and
PREFECT_UI_API_URL).
In `@tools/src/tools/run_training_job.py`:
- Around line 55-61: Validate that the parsed LOOKBACK_DAYS environment variable
is strictly positive before calling run_training_job: after computing
lookback_days = int(os.getenv("LOOKBACK_DAYS", "365")), check lookback_days > 0
and if not, raise a clear ValueError or log an error and exit; ensure this
validation is done immediately before the call to run_training_job(base_url=...,
data_bucket=..., artifacts_bucket=..., lookback_days=lookback_days) so invalid
0/negative values are rejected early.
⛔ Files ignored due to path filters (3)
- .flox/env/manifest.lock is excluded by !**/*.lock
- Cargo.lock is excluded by !**/*.lock
- uv.lock is excluded by !**/*.lock
📒 Files selected for processing (21)
- applications/equitypricemodel/pyproject.toml
- applications/equitypricemodel/src/equitypricemodel/server.py
- applications/equitypricemodel/src/equitypricemodel/tide_data.py
- applications/equitypricemodel/src/equitypricemodel/tide_model.py
- applications/equitypricemodel/src/equitypricemodel/trainer.py
- applications/equitypricemodel/tests/conftest.py
- applications/equitypricemodel/tests/test_server.py
- applications/equitypricemodel/tests/test_tide_data.py
- applications/equitypricemodel/tests/test_tide_model.py
- applications/equitypricemodel/tests/test_trainer.py
- infrastructure/__main__.py
- infrastructure/parameters.py
- maskfile.md
- pyproject.toml
- tools/Dockerfile
- tools/pyproject.toml
- tools/src/tools/flows/training_flow.py
- tools/src/tools/run_training_job.py
- tools/tests/test_deploy_training_flow.py
- tools/tests/test_notifications.py
- tools/tests/test_training_flow.py
💤 Files with no reviewable changes (1)
- pyproject.toml
Pull request overview
Copilot reviewed 26 out of 29 changed files in this pull request and generated 2 comments.
Addressed open review feedback across Prefect migration changes and fixed the failing Python code check.

Root causes fixed:
- Training flow passed an S3 URI where an object key was expected and recomputed date ranges in multiple steps.
- Trainer config handling did not safely merge partial overrides.
- Model checkpoint restore logic could load stale checkpoints from previous runs.
- Prefect worker healthcheck was a no-op and deploy/run scripts had unguarded LOOKBACK_DAYS parsing.
- Notification handling was brittle around whitespace recipients and zero-duration formatting.
- Infrastructure exposed notification emails as env vars instead of ECS secrets and had several least-privilege/snapshot hardening gaps.
- Remaining lint/type issues in touched and adjacent files blocked the Python check.

Fixes included:
- Deterministic encoding in tide_data and partial-config merge in trainer.
- Safe checkpoint restore guard + checkpointing in training flow task.
- Date-range threading through flow tasks + robust S3 URI/key handling.
- LOOKBACK_DAYS validation in deploy/run entrypoints.
- Real worker process liveness healthcheck in tools Docker image.
- Notification parsing/logging improvements and server exception logging update.
- ECS secrets wiring for training notification emails via SecureString SSM parameters.
- Prefect deployment/CLI mask updates, RDS snapshot hardening, IAM scope tightening.
- Supporting test updates plus minor lint cleanups needed for green checks.

Validation:
- mask development python lint
- mask development python type-check
- mask development python test
Completed a full pass through all open review feedback on this branch and pushed commit 78581aa.

Highlights of what was addressed:
- Resolved all open review threads (including outdated ones) with per-thread replies.
- Fixed remaining Python check failures and validated with lint/type-check/tests.
- Hardened Prefect training flow (date range threading, key/URI handling, checkpointing + stale-restore guard, LOOKBACK validation).
- Tightened infra/security items (ECS secrets wiring for notification emails, SES IAM scope reduction, snapshot retention hardening, Prefect TLS handling improvements, mask updates).
- Replaced no-op worker healthcheck with a real process liveness probe.

All GitHub checks are now passing. Please re-review when you get a chance.
Root cause: model artifact version lookup from SSM used get_parameter without WithDecryption. That works for String parameters today, but would return ciphertext if the parameter were ever migrated to SecureString, producing invalid artifact keys and startup failures.

Fix: pass WithDecryption=True when reading MODEL_VERSION_SSM_PARAMETER so both String and SecureString values are safely handled.

Validation:
- mask development python lint
- mask development python type-check
- uv run pytest applications/equitypricemodel/tests/test_server.py
Additional Comments (1)
The PR description states 88% coverage. CLAUDE.md requires at least 90% line or statement coverage per service (line 35). The remaining gaps should be covered before merging.

Prompt To Fix With AI
This is a comment left during a code review.
Path: applications/equitypricemodel/src/equitypricemodel/trainer.py
Line: 1
Comment:
Test coverage is below the required 90% threshold.
The PR description states 88% coverage. CLAUDE.md requires at least 90% line or statement coverage per service (line 35). The remaining gaps should be covered before merging.
How can I resolve this? If you propose a fix, please make it concise.
Pull request overview
Copilot reviewed 28 out of 31 changed files in this pull request and generated 6 comments.
Root causes:
- Prefect UI API URL inside the server task definition was always built with http, which would cause mixed-content failures when TLS is enabled.
- Pipeline(stages=[]) unintentionally fell back to default stages due to truthy/falsy evaluation.
- Training flow failed when /equity-details returned 501 Not Implemented.
- Test setup used an uninitialized ClientError instance, diverging from runtime behavior.
- Category mapping annotations were too narrow for boolean category keys.

Fixes:
- Make PREFECT_UI_API_URL protocol conditional on ACM/TLS configuration.
- Honor explicit empty stage lists with an explicit None check in Pipeline.__init__.
- Treat 501 from equity-details sync as a non-fatal skip in training flow, while re-raising other errors.
- Use a real constructed ClientError in server tests.
- Widen mapping types and add a regression test for explicit empty stage lists.

Validation:
- mask development python lint
- mask development python type-check
- uv run pytest applications/equitypricemodel/tests/test_server.py applications/equitypricemodel/tests/test_tide_data.py tools/tests/test_training_flow.py
Additional Comments (1)
```python
# In preprocess_and_set_data():
self.scaler = cast("Scaler", scale_and_encode.scaler)  # ← inference scaler, not training scaler
self.mappings = cast("FeatureMappings", scale_and_encode.mappings)  # ← freshly fitted from inference data
```

This creates two silent correctness problems:
The Fix: Use a separate preprocessing path at inference time that applies the loaded scaler and mappings rather than re-fitting them. For example, add an |
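The reuse-instead-of-refit idea behind that fix can be sketched with hypothetical stage and method names (none of these are the project's real API):

```python
# Hypothetical sketch: fit the scaler once on training data, then only
# *apply* the previously fitted/loaded scaler at inference time instead of
# re-fitting it on inference data.
class ScaleAndEncode:
    def __init__(self) -> None:
        self.scaler: dict | None = None  # stand-in for the real Scaler

    def fit(self, values: list[float]) -> None:
        self.scaler = {"min": min(values), "max": max(values)}

    def transform(self, values: list[float]) -> list[float]:
        assert self.scaler is not None, "scaler must be fitted or loaded first"
        lo, hi = self.scaler["min"], self.scaler["max"]
        return [(v - lo) / ((hi - lo) or 1.0) for v in values]


def preprocess_for_training(stage: ScaleAndEncode, values: list[float]) -> list[float]:
    stage.fit(values)  # fitting happens only on training data
    return stage.transform(values)


def preprocess_for_inference(stage: ScaleAndEncode, values: list[float]) -> list[float]:
    # Apply the loaded scaler; do NOT re-fit on inference data.
    return stage.transform(values)
```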
Quick follow-up pass complete.
There are currently no unresolved review threads on the PR.
Overview
Changes
- tide_data.py refactored from 230-line monolith into 6 named, testable pipeline stages (ValidateColumns, ExpandDateRange, FillNulls, EngineerFeatures, CleanData, ScaleAndEncode)
- trainer.py refactored from module-level script into callable train_model() function
- tide_model.py updated to skip zero-size batches during training
- sagemaker dependency replaced with prefect in tools
- maskfile.md training commands updated to use Prefect

Context
SageMaker added unnecessary complexity and cost for training a model that runs fine on CPU. Prefect v3 provides pipeline orchestration with a dashboard for monitoring, task-level retries, and flow visibility without the overhead of SageMaker infrastructure.
The training pipeline has been verified end-to-end: data preparation from S3 (938k rows), model training with early stopping (4 epochs), artifact packaging and upload to S3. All 129 tests pass with 88% coverage.
Prefect dashboard:
http://fund-alb-344913377.us-east-1.elb.amazonaws.com:4200

Summary by CodeRabbit
New Features
Bug Fixes
Refactor
Tests