Add internal scheduler to portfolio manager #803
Conversation
…flow

The create_or_update_portfolio GitHub Actions workflow triggered the portfolio rebalance externally on a cron schedule. This replaces that mechanism with a self-contained async scheduler inside the service.

Changes:
- Add `scheduler.py`: DST-aware scheduler targeting 10:00 AM Eastern using `zoneinfo` (handles EST/EDT automatically). Fires on weekdays; skips weekends as a basic guard, with the Alpaca market clock check handling holidays. Includes an idempotency guard that queries the prior portfolio from data_manager to skip rebalances already completed today. Captures Sentry exceptions on failure. Uses a background tasks set (RUF006 compliant) to hold a strong reference to the asyncio task.
- Add `rebalance.py`: Extracts the full orchestration logic from `server.py` into `run_rebalance(alpaca_client)` for use by both the `/portfolio` endpoint and the scheduler. All helper functions (`get_prior_portfolio`, `evaluate_prior_pairs`, `get_optimal_portfolio`, `get_positions`, `get_raw_predictions`) move here.
- Slim `server.py` to wiring only: lifespan, app setup, `/health`, and `/portfolio` (a one-liner calling `run_rebalance`). Spawns the scheduler from lifespan after initializing `AlpacaClient`.
- Add `AlpacaClient.is_market_open()`: calls Alpaca `get_clock()` to check whether the market is open before executing the scheduled rebalance.
- Delete `create_or_update_portfolio.yaml`: superseded by the scheduler.
- Rename `test_portfolio_server.py` to `test_rebalance.py` and update all import and patch paths from `portfolio_manager.server` to `portfolio_manager.rebalance`.
- Add `test_scheduler.py`: covers `_seconds_until_next_rebalance` (weekday targeting, post-window rollover, Saturday/Sunday skipping) and `_already_rebalanced_today` (today match, yesterday, empty, error status, network exception, missing timestamp field).
- Add tests for `AlpacaClient.is_market_open` (open and closed cases).
- Rename `FUND_DATAMANAGER_BASE_URL` to `FUND_DATA_MANAGER_BASE_URL` across all services (portfolio_manager, ensemble_manager, tide model), infrastructure (`compute.py`, `docker-compose.yaml`), and `maskfile.md`. Rename the Python variable from `DATAMANAGER_BASE_URL` to `DATA_MANAGER_BASE_URL` in portfolio_manager and ensemble_manager.

The `/portfolio` POST endpoint and its mask command are retained for manual ad-hoc rebalancing.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Note: Reviews paused. It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior in the review settings.
📝 Walkthrough

This PR refactors the portfolio rebalancing system by extracting complex orchestration logic from the server into dedicated modules, introduces an async scheduled rebalancing workflow with market-open checks and idempotency guards, and standardizes naming conventions (renaming `FUND_DATAMANAGER_BASE_URL` to `FUND_DATA_MANAGER_BASE_URL`).
Sequence Diagram

```mermaid
sequenceDiagram
    participant Scheduler as Scheduler
    participant AlpacaAPI as Alpaca API
    participant DataManager as Data Manager
    participant EnsembleManager as Ensemble Manager
    participant Orchestrator as Rebalance Orchestrator
    loop Scheduler Loop
        Scheduler->>Scheduler: Calculate next 10:00 AM ET weekday
        Scheduler->>Scheduler: Sleep until scheduled time
        Scheduler->>AlpacaAPI: get_clock()
        alt Market Closed
            AlpacaAPI-->>Scheduler: is_open = false
            Scheduler->>Scheduler: Skip execution, loop
        else Market Open
            AlpacaAPI-->>Scheduler: is_open = true
            Scheduler->>DataManager: GET /portfolios (idempotency check)
            alt Already Rebalanced Today
                DataManager-->>Scheduler: timestamp matches today
                Scheduler->>Scheduler: Skip execution, loop
            else No Prior Rebalance
                DataManager-->>Scheduler: no matching timestamp
                Scheduler->>Orchestrator: run_rebalance()
                Orchestrator->>DataManager: GET equity bars, details, prices
                DataManager-->>Orchestrator: historical data
                Orchestrator->>EnsembleManager: POST /predictions
                EnsembleManager-->>Orchestrator: ensemble predictions
                Orchestrator->>Orchestrator: Evaluate pairs, compute portfolio
                Orchestrator->>Orchestrator: Generate trade orders
                Orchestrator->>AlpacaAPI: Execute close/open orders
                AlpacaAPI-->>Orchestrator: order confirmations
                Orchestrator->>DataManager: POST /portfolios (persist state)
                DataManager-->>Orchestrator: confirmation
                Orchestrator-->>Scheduler: completion status
            end
        end
    end
```
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~75 minutes
🚥 Pre-merge checks: ✅ 2 passed | ❌ 1 failed (1 warning)
Pull request overview
Adds an internal, always-on rebalance scheduler to the portfolio manager service (shifting responsibility away from a GitHub Actions cron), and standardizes the Data Manager base URL environment variable naming across services and tooling.
Changes:
- Renamed `FUND_DATAMANAGER_BASE_URL` to `FUND_DATA_MANAGER_BASE_URL` across apps, infra, and CLI tooling.
- Split the portfolio manager rebalance logic out of `server.py` into a dedicated `rebalance.py` module.
- Added an internal async scheduler loop to trigger rebalances on a weekday 10:00 AM ET cadence and removed the GitHub Actions scheduled workflow.
Reviewed changes
Copilot reviewed 15 out of 15 changed files in this pull request and generated 5 comments.
Summary per file:
| File | Description |
|---|---|
| models/tide/src/tide/workflow.py | Updates env var name used when running the TiDE workflow directly. |
| models/tide/src/tide/run.py | Updates env var name used by the TiDE runner entrypoint. |
| models/tide/src/tide/deploy.py | Updates env var name used by the TiDE deploy entrypoint. |
| maskfile.md | Updates local/dev commands to export the renamed Data Manager base URL env var. |
| infrastructure/docker-compose.yaml | Updates compose env var wiring to the renamed Data Manager base URL. |
| infrastructure/compute.py | Updates ECS task env var names to the renamed Data Manager base URL. |
| applications/portfolio_manager/tests/test_scheduler.py | Adds unit tests for scheduler timing and idempotency guard behavior. |
| applications/portfolio_manager/tests/test_rebalance.py | Updates imports/mocks to reflect extraction of rebalance logic into rebalance.py. |
| applications/portfolio_manager/tests/test_alpaca_client.py | Adds tests for the new AlpacaClient.is_market_open() method. |
| applications/portfolio_manager/src/portfolio_manager/server.py | Starts the internal scheduler on app lifespan and delegates /portfolio to run_rebalance. |
| applications/portfolio_manager/src/portfolio_manager/scheduler.py | New internal scheduler loop for weekday 10:00am ET rebalance triggering. |
| applications/portfolio_manager/src/portfolio_manager/rebalance.py | New module containing the rebalance implementation previously embedded in server.py. |
| applications/portfolio_manager/src/portfolio_manager/alpaca_client.py | Adds is_market_open() to support scheduler gating. |
| applications/ensemble_manager/src/ensemble_manager/server.py | Renames base URL env var and updates references accordingly. |
| .github/workflows/create_or_update_portfolio.yaml | Removes the GitHub Actions scheduled rebalance workflow (now replaced by internal scheduler). |
Greptile Summary

This PR migrates the portfolio manager's rebalance trigger from a GitHub Actions cron workflow to an in-process asyncio scheduler, and splits the rebalance orchestration out of `server.py`.

Confidence Score: 3/5. The scheduled rebalance path is sound, but the manual POST /portfolio recovery endpoint is permanently broken; merge should wait for that one-line fix. All prior review concerns are resolved and the scheduler/rebalance split is well-implemented with good test coverage. The single remaining issue is a concrete, reproducible bug that disables the manual trigger endpoint entirely, keeping the score at 3: the lock acquisition on line 101 of applications/portfolio_manager/src/portfolio_manager/server.py, detailed in the comment below.
| Filename | Overview |
|---|---|
| applications/portfolio_manager/src/portfolio_manager/server.py | Slim refactored server; asyncio.wait_for(lock.acquire(), timeout=0) in create_portfolio always raises TimeoutError, permanently breaking the manual rebalance endpoint. |
| applications/portfolio_manager/src/portfolio_manager/scheduler.py | New scheduler with weekend-skip, catch-up detection, market-open check, idempotency guard, response-code logging, and self-healing outer exception handler; all prior concerns resolved. |
| applications/portfolio_manager/src/portfolio_manager/rebalance.py | Core rebalance logic extracted from old server.py; save_portfolio return value now captured and propagated as HTTP 207; async httpx replaces sync requests. |
| applications/portfolio_manager/src/portfolio_manager/alpaca_client.py | Adds is_market_open() helper wrapping get_clock() with rate-limit sleep; minimal and correctly tested. |
| applications/portfolio_manager/tests/test_scheduler.py | New test file covering _seconds_until_next_rebalance and _already_rebalanced_today with weekday/weekend/DST-boundary and error-path cases. |
| applications/portfolio_manager/tests/test_rebalance.py | Renamed from test_portfolio_server.py; patch targets updated to new module; async helpers added; error-response test now correctly expects raise rather than empty return. |
| infrastructure/compute.py | Env var renamed from FUND_DATAMANAGER_BASE_URL to FUND_DATA_MANAGER_BASE_URL for both portfolio_manager and ensemble_manager task definitions. |
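The `is_market_open()` gating listed in the alpaca_client.py row above can be sketched as a thin wrapper over the broker clock. This is an illustrative shape with an injected client, not the project's actual `AlpacaClient`; it assumes an alpaca-py-style trading client whose `get_clock()` result carries an `is_open` flag:

```python
import time

def is_market_open(trading_client, rate_limit_sleep: float = 0.0) -> bool:
    """Ask the broker clock whether the market is currently open."""
    if rate_limit_sleep:
        time.sleep(rate_limit_sleep)  # crude spacing to respect rate limits
    clock = trading_client.get_clock()
    return bool(clock.is_open)
```

The scheduler can then skip the run when this returns False, leaving holiday handling to the broker's clock rather than a hand-maintained calendar.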
Comments Outside Diff (1)
- applications/portfolio_manager/src/portfolio_manager/server.py, lines 101-103: `asyncio.wait_for(lock.acquire(), timeout=0)` always raises `TimeoutError`. In CPython's asyncio, `wait_for` with `timeout <= 0` calls `ensure_future(coro)` to wrap the coroutine in a Task, then immediately checks `fut.done()`. A freshly created Task has not yet had a chance to run (its first step is only scheduled via `call_soon`), so `fut.done()` is always `False`, even when the lock is completely free. `wait_for` then cancels the task and raises `TimeoutError`. The consequence is that `POST /portfolio` always returns HTTP 409 Conflict, making manual rebalancing permanently broken. The idiomatic asyncio way to attempt a non-blocking acquire is to check `.locked()`, which is safe without a race because asyncio is single-threaded and there is no `await` between the check and the acquire.
Suggested fix:
```suggestion
if _rebalance_lock.locked():
return Response(status_code=status.HTTP_409_CONFLICT)
await _rebalance_lock.acquire()
```
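A standalone repro of the behavior described above (pure stdlib, not the service's code):

```python
import asyncio

async def acquire_via_wait_for(lock: asyncio.Lock) -> bool:
    """Attempt wait_for(lock.acquire(), timeout=0); report whether it worked."""
    try:
        await asyncio.wait_for(lock.acquire(), timeout=0)
        return True
    except asyncio.TimeoutError:
        return False

async def acquire_via_locked(lock: asyncio.Lock) -> bool:
    """Idiomatic non-blocking attempt: check .locked(), then acquire."""
    if lock.locked():
        return False
    # Lock.acquire() returns immediately when the lock is free, so no
    # other task can interleave between the check and the acquire.
    await lock.acquire()
    return True

async def demo() -> tuple[bool, bool]:
    lock = asyncio.Lock()
    via_wait_for = await acquire_via_wait_for(lock)  # fails despite free lock
    via_locked = await acquire_via_locked(lock)      # succeeds
    return via_wait_for, via_locked

via_wait_for, via_locked = asyncio.run(demo())
```

Running this shows the `wait_for` path failing on a completely free lock while the `.locked()` path acquires it.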
Reviews (8). Last reviewed commit: "Address pull request #803 feedback: metr…"
Warning
CodeRabbit couldn't request changes on this pull request because it doesn't have sufficient GitHub permissions.
Please grant CodeRabbit Pull requests: Read and write permission and re-run the review.
Actionable comments posted: 6
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@applications/portfolio_manager/src/portfolio_manager/rebalance.py`:
- Around line 215-257: The current flow persists the target portfolio from
get_optimal_portfolio() before executing close/open trades, which can leave
/portfolios ahead of broker state if closes fail; change rebalance.py so that
the code in the close_positions loop (where close_results is built and
exceptions are caught) does not allow persisting the pre-trade target — instead
collate actual executed outcomes (use close_results and any successful opens)
and only write/update the data-manager /portfolios with the executed state (or a
merged prior-portfolio adjusted by close_results) after all trade attempts
complete; also ensure the caller that runs replacement opens checks
close_results for failures and skips or rolls back opens when closes failed so
you never double-expose positions.
- Around line 115-127: The code currently removes all prior tickers from
consolidated_signals after calling evaluate_prior_pairs(), which drops even the
ones evaluate_prior_pairs() returned as held; change the filter to only remove
tickers that should be closed by computing closing_tickers =
set(prior_portfolio_tickers) - set(held_tickers) (or the equivalent in Polars)
and then filter out only those closing_tickers from consolidated_signals; update
the same pattern where you currently filter using prior_portfolio_tickers
(locations around the other referenced blocks) so held_tickers remain in the
saved portfolio and are managed on subsequent rebalances.
- Around line 173-192: The except block for InsufficientPairsError currently
returns immediately and skips the existing get_positions() + close loop, leaving
stale broker positions open; update the except handler for
InsufficientPairsError thrown by get_optimal_portfolio to still call
get_positions() and run the same close-loop logic that normally executes (the
code that iterates current positions and issues closes) before returning;
reference get_optimal_portfolio, InsufficientPairsError, get_positions and the
close-loop / close_position logic so you locate and reuse the exact closing code
path rather than duplicating different behavior.
- Around line 57-93: The run_rebalance coroutine currently calls blocking
network functions (e.g., alpaca_client.get_account,
alpaca_client.get_shortable_tickers, alpaca_client.close_position,
fetch_historical_prices, fetch_equity_details, fetch_spy_prices,
get_prior_portfolio which uses requests.get, and requests.post) directly on the
event loop; wrap these synchronous calls so they run off the loop (use
asyncio.to_thread or loop.run_in_executor) or replace them with true async
clients (e.g., alpaca async client, httpx.AsyncClient) and await their async
variants; update calls in run_rebalance to await the offloaded tasks (e.g.,
account = await asyncio.to_thread(alpaca_client.get_account) or account = await
alpaca_async.get_account()) and do the same for fetch_historical_prices,
fetch_equity_details, fetch_spy_prices, get_prior_portfolio/requests.post and
any alpaca_client methods to avoid blocking the FastAPI event loop.
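The offloading pattern suggested here, shown with a hypothetical blocking helper (the real helpers use requests and the Alpaca SDK):

```python
import asyncio

def get_prior_portfolio_sync() -> dict:
    """Stand-in for a blocking requests-based call to the data manager."""
    return {"positions": []}

async def run_rebalance_step() -> dict:
    # asyncio.to_thread runs the blocking call on a worker thread, so the
    # event loop (and endpoints like /health) stays responsive meanwhile.
    return await asyncio.to_thread(get_prior_portfolio_sync)

result = asyncio.run(run_rebalance_step())
```

Each blocking call in the coroutine gets the same treatment, or is replaced with a genuinely async client such as `httpx.AsyncClient`.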
In `@applications/portfolio_manager/src/portfolio_manager/scheduler.py`:
- Around line 39-61: Change _already_rebalanced_today to compute dates in
Eastern time and to "fail closed" on any inability to verify prior state:
convert today using the US/Eastern timezone (use zoneinfo/pendulum/pytz as
available) and when iterating rows convert row timestamps with the same Eastern
TZ before comparing; treat non-200 responses, empty payloads, JSON errors, or
any exception as "already rebalanced" (return True) so the scheduler will skip
the run on verification failure; update logging in logger.exception to record
the error but ensure the function returns True on errors or timeouts; keep the
datamanager_base_url and requests.get usage but flip the false/true returns
accordingly.
In `@applications/portfolio_manager/tests/test_scheduler.py`:
- Around line 87-157: Tests for _already_rebalanced_today use the real wall
clock causing flakiness around UTC midnight and miss a Monday-evening ET ->
Tuesday UTC boundary; freeze the scheduler clock in the idempotency tests (e.g.,
patch datetime.now or use freezegun.freeze_time) so both the fixture timestamp
and the call to _already_rebalanced_today use the same fixed "now", add an
explicit test that sets "now" to Tuesday 00:30 UTC while creating a portfolio
timestamp for Monday 20:30 ET (which is Tuesday 00:30 UTC) to ensure the
function treats that entry as today, and apply this freezing pattern to the
existing tests like
test_already_rebalanced_today_returns_true_when_todays_portfolio_exists and the
other idempotency tests to prevent flakiness.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: ASSERTIVE
Plan: Pro
Run ID: a86541fc-51a2-491f-af16-dbad21dafbb3
📒 Files selected for processing (15)
- .github/workflows/create_or_update_portfolio.yaml
- applications/ensemble_manager/src/ensemble_manager/server.py
- applications/portfolio_manager/src/portfolio_manager/alpaca_client.py
- applications/portfolio_manager/src/portfolio_manager/rebalance.py
- applications/portfolio_manager/src/portfolio_manager/scheduler.py
- applications/portfolio_manager/src/portfolio_manager/server.py
- applications/portfolio_manager/tests/test_alpaca_client.py
- applications/portfolio_manager/tests/test_rebalance.py
- applications/portfolio_manager/tests/test_scheduler.py
- infrastructure/compute.py
- infrastructure/docker-compose.yaml
- maskfile.md
- models/tide/src/tide/deploy.py
- models/tide/src/tide/run.py
- models/tide/src/tide/workflow.py
💤 Files with no reviewable changes (1)
- .github/workflows/create_or_update_portfolio.yaml
…balance

Group A, simple fixes:
- Fix `DATA_MANAGER_BASE_URL` default from `http://datamanager:8080` to `http://data-manager:8080` to match the docker-compose service name.
- Rename `datamanager_base_url` to `data_manager_base_url` in `_already_rebalanced_today`, `spawn_rebalance_scheduler`, `_rebalance_loop`, and callers in `server.py`.

Group B, scheduler robustness:
- Wrap `is_market_open()` in try/except in `_rebalance_loop`; log the exception and capture it to Sentry, then continue so transient Alpaca errors do not kill the loop.
- Wrap the entire `_rebalance_loop` body in `try/except asyncio.CancelledError` to exit cleanly on task cancellation without a noisy traceback.
- Return the `asyncio.Task` from `spawn_rebalance_scheduler`; lifespan in `server.py` stores it in `_app.state.scheduler_task` and cancels/awaits it after yield for clean shutdown.
- Change `_already_rebalanced_today` to use the `_EASTERN` timezone for both today and row timestamp comparisons; fail closed (return True) on HTTP errors >= 400 and on any exception so transient data_manager failures skip rather than trigger a duplicate rebalance.
- Freeze `portfolio_manager.scheduler.datetime` in idempotency tests so both fixture timestamps and the function under test see the same clock; add `test_already_rebalanced_today_handles_eastern_utc_day_boundary` to cover Monday 20:30 ET stored as Tuesday UTC; update error/exception test assertions to True (fail closed).

Group C, rebalance.py fixes:
- Move `requests.post` out of `get_optimal_portfolio` into a new `save_portfolio` sync helper; `get_optimal_portfolio` is now a pure computation function.
- Call `save_portfolio` from `run_rebalance` after all close and open loops complete, concatenating `optimal_portfolio` with `held_rows` so held pairs are preserved in saved portfolio state for subsequent rebalances.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
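The Group B loop shape, survive transient errors, exit quietly on cancellation, might look like this. An illustrative sketch; the injected callables stand in for `_seconds_until_next_rebalance` and the rebalance step:

```python
import asyncio
import logging

logger = logging.getLogger(__name__)

async def rebalance_loop(seconds_until_next, do_rebalance) -> None:
    """Outer guard: survive transient errors, exit quietly on cancellation."""
    try:
        while True:
            try:
                await asyncio.sleep(seconds_until_next())
                await do_rebalance()
            except asyncio.CancelledError:
                raise  # let the outer handler log the shutdown
            except Exception:
                # Transient failures (e.g. a flaky broker clock) must not
                # kill the background task permanently.
                logger.exception("rebalance iteration failed; continuing")
    except asyncio.CancelledError:
        logger.info("scheduler task cancelled; shutting down cleanly")
```

Lifespan shutdown then just cancels the task and awaits it, without a noisy traceback.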
Actionable comments posted: 1
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@applications/portfolio_manager/src/portfolio_manager/rebalance.py`:
- Around line 382-398: The code currently discards the boolean return from
save_portfolio(final_portfolio, current_timestamp); update the rebalance flow to
capture its return (e.g., saved = save_portfolio(...)), log an error if saved is
False (use logger.error with context including current_timestamp and any
failed_trades), and return an appropriate non-200 status (e.g.,
Response(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR) or another error
code consistent with the API) when saving fails so callers/scheduler/idempotency
logic (get_prior_portfolio() and _already_rebalanced_today()) do not see stale
state.
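Capturing the save result as suggested might look like the following hypothetical wrapper, with status handling simplified to bare integers:

```python
import logging

logger = logging.getLogger(__name__)

def finish_rebalance(save_portfolio, final_portfolio, current_timestamp,
                     failed_trades) -> int:
    """Return HTTP 207 when any trade failed or the save failed, else 200."""
    saved = save_portfolio(final_portfolio, current_timestamp)
    if not saved:
        logger.error(
            "portfolio save failed at %s (failed_trades=%s)",
            current_timestamp, failed_trades,
        )
    return 200 if saved and not failed_trades else 207
```

Surfacing the failed save keeps the scheduler's idempotency check from later reading stale state as if it were current.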
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: ASSERTIVE
Plan: Pro
Run ID: b0475937-fe1b-4d7b-9645-3f71e3b0e3c5
📒 Files selected for processing (4)
- applications/portfolio_manager/src/portfolio_manager/rebalance.py
- applications/portfolio_manager/src/portfolio_manager/scheduler.py
- applications/portfolio_manager/src/portfolio_manager/server.py
- applications/portfolio_manager/tests/test_scheduler.py
…sponse

Both greptile (#2972557623) and CodeRabbit (#2972564944) flagged that the return value of `save_portfolio()` was being silently discarded. If the POST to the data manager fails after trades have already executed at Alpaca, the portfolio state is not persisted, causing `_already_rebalanced_today()` to return False on the next run and potentially re-executing trades against stale prior-portfolio data.

Fix: capture the bool return of `save_portfolio()`, log it in the completion message, and include it in the failure condition so HTTP 207 is returned when either trades failed or the save failed.

To stay within the cyclomatic complexity budget (rank D ≤ 30), the redundant `if candidate_pairs.height > 0:` guard around schema validation was removed. Pandera validates empty DataFrames correctly (`unique`/`check_pair_tickers_different` pass trivially on zero rows), so the guard was unnecessary.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Pull request overview
Copilot reviewed 15 out of 15 changed files in this pull request and generated 3 comments.
Actionable comments posted: 1
♻️ Duplicate comments (3)
applications/portfolio_manager/src/portfolio_manager/rebalance.py (3)
Lines 172-191: ⚠️ Potential issue | 🔴 Critical. Keep the close path running when there are no replacement pairs.
`InsufficientPairsError` returns before `get_positions()` and the close loop. If today's signals say an existing pair should be exited but there are not enough new pairs to replace it, the stale broker positions stay open. Reuse the existing close path here and only skip the open side.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@applications/portfolio_manager/src/portfolio_manager/rebalance.py` around lines 172 - 191, The InsufficientPairsError handler currently returns early and skips the closing path; instead catch InsufficientPairsError from get_optimal_portfolio (referencing get_optimal_portfolio and InsufficientPairsError), log the warning, set a flag (e.g., no_replacements=True) or set optimal_portfolio to an empty/open-only result, then continue execution to call get_positions() and run the existing close-loop logic while ensuring the open-loop that creates new trades is skipped when the no_replacements flag is set; update uses of optimal_portfolio and any open-trade logic to respect that flag so only closes execute.
Lines 62-77: ⚠️ Potential issue | 🟠 Major. Offload the synchronous I/O out of the async rebalance path.
`run_rebalance()` is now shared by the FastAPI route and the in-process scheduler, but it still performs blocking Alpaca/data-manager calls directly. One slow upstream call will stall the event loop and can hold `/health` and every other request until the rebalance finishes. Use `asyncio.to_thread()` or async clients consistently in this path.
This verification should show `run_rebalance()` being awaited on the request/background path and the synchronous helpers still invoked inside the coroutine:

```shell
#!/bin/bash
set -euo pipefail
rg -n --type=py '\bawait\s+run_rebalance\s*\(' applications/portfolio_manager/src
rg -n --type=py \
  '\b(get_account|get_shortable_tickers|close_position|open_position|fetch_historical_prices|fetch_equity_details|fetch_spy_prices|get_prior_portfolio|save_portfolio)\b' \
  applications/portfolio_manager/src/portfolio_manager/rebalance.py
```

Also applies to: lines 107-139, 217-303, 383.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@applications/portfolio_manager/src/portfolio_manager/rebalance.py` around lines 62 - 77, run_rebalance currently performs blocking I/O (e.g., alpaca_client.get_account, get_shortable_tickers, close_position, open_position, fetch_historical_prices, fetch_equity_details, fetch_spy_prices, get_prior_portfolio, save_portfolio) directly inside the coroutine, which stalls the event loop; update run_rebalance to keep its async signature but offload all blocking helpers by calling them with asyncio.to_thread(...) (or replace them with async clients) and ensure all call sites await run_rebalance; search for those function names in rebalance.py and wrap each synchronous call in asyncio.to_thread (or swap to an async alternative) so the FastAPI route and scheduler can await run_rebalance without blocking other requests.
Lines 214-383: ⚠️ Potential issue | 🔴 Critical. Do not advance saved state from the target portfolio after partial execution.
`close_results` and `open_results` are recorded, but they never drive control flow or persistence: opens still proceed after close failures, and `final_portfolio` always uses the full `optimal_portfolio`. That can leave old positions live, save rows that never opened, and poison the next rebalance/idempotency check. If partial execution cannot be represented under `portfolio_schema`, fail the rebalance and do not persist target state.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@applications/portfolio_manager/src/portfolio_manager/rebalance.py` around lines 214 - 383, The code currently always computes final_portfolio and calls save_portfolio even when some close/open actions were skipped or failed; add a post-execution validation using close_results and open_results to detect any non-"success" entries and, if any exist, abort persisting the target state: log the summary (use skipped_insufficient_buying_power and skipped_not_shortable as well) and raise/return an error instead of calling save_portfolio(final_portfolio, current_timestamp). Locate the check just before computing held_rows/final_portfolio (referencing close_results, open_results, skipped_insufficient_buying_power, skipped_not_shortable, final_portfolio, optimal_portfolio, prior_portfolio, held_tickers, and save_portfolio) and implement the early-fail behavior so partial execution does not advance saved state.
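Deriving the persisted book from execution outcomes, as this comment suggests, could look like the following, simplified to ticker sets (the real code works on Polars rows):

```python
def executed_book(prior: list[str], close_results: dict[str, str],
                  open_results: dict[str, str]) -> list[str]:
    """Apply only the trades that actually executed at the broker.

    close_results/open_results map ticker -> "success" or "failed".
    """
    book = set(prior)
    for ticker, outcome in close_results.items():
        if outcome == "success":
            book.discard(ticker)  # position really closed
        # failed closes stay in the book: the position is still live
    for ticker, outcome in open_results.items():
        if outcome == "success":
            book.add(ticker)      # only persist opens that reached the broker
    return sorted(book)
```

Persisting this book instead of the target portfolio keeps the saved state, and the next day's idempotency check, consistent with what Alpaca actually holds.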
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@applications/portfolio_manager/src/portfolio_manager/rebalance.py`:
- Around line 410-451: get_prior_portfolio currently swallows data-manager HTTP
errors and parsing exceptions by returning an empty DataFrame, which lets
rebalance proceed incorrectly; change it so only an explicit empty payload
(response.text empty or "[]") maps to empty and all other failures bubble to the
caller: use response.raise_for_status() (or raise a requests.HTTPError) instead
of returning empty on non-2xx, do not return empty inside the except block for
ValueError/JSONDecodeError/pl.exceptions.PolarsError in get_prior_portfolio —
log the error if desired and re-raise it (or let it propagate), and only
construct/return an empty pl.DataFrame when response_text indicates "no
portfolio yet" or when prior_portfolio.is_empty() after successful parsing.
Ensure references to get_prior_portfolio, response.raise_for_status(),
prior_portfolio, and the same exception classes guide where to change behavior.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: ASSERTIVE
Plan: Pro
Run ID: 85749210-479b-4b80-a917-5cbd7b7c1453
📒 Files selected for processing (1)
applications/portfolio_manager/src/portfolio_manager/rebalance.py
…rior portfolio error propagation

- Scheduler catch-up on restart: before the rebalance loop starts, check whether the pod restarted after the 10 AM ET window on a weekday without having run a rebalance; if so, skip the initial sleep and run immediately.
- Scheduler response logging: capture the return value of `run_rebalance` and emit a warning log when the status code is not 200 (e.g. 207 for partial failures), rather than silently discarding it.
- Scheduler outer exception guard: add a catch-all `except Exception` clause around the outer loop body so that unexpected errors (e.g. from `_seconds_until_next_rebalance`) are logged and sent to Sentry instead of crashing the background task permanently.
- Prior portfolio error propagation: replace the silent-return-empty behaviour in `get_prior_portfolio` with `raise_for_status()` for HTTP errors and re-raise for parse errors (`ValueError`, `JSONDecodeError`, `PolarsError`). The caller in `run_rebalance` already wraps this in a try/except that returns HTTP 500, so the rebalance is now correctly aborted rather than proceeding with an empty prior portfolio and potentially duplicating exposure. Remove the now-unused `HTTP_BAD_REQUEST` constant.
- Tests: rename `test_get_prior_portfolio_returns_empty_dataframe_on_error_response` to `test_get_prior_portfolio_raises_on_error_response` and update it to expect `HTTPError`; add `test_get_prior_portfolio_raises_on_parse_error`.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
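The catch-up-on-restart condition described above reduces to a small predicate (hypothetical name; the real implementation also consults the idempotency guard):

```python
from datetime import datetime, time
from zoneinfo import ZoneInfo

EASTERN = ZoneInfo("America/New_York")

def should_catch_up(now: datetime, already_ran_today: bool) -> bool:
    """Run immediately on restart if a weekday 10:00 ET window was missed."""
    local = now.astimezone(EASTERN)
    is_weekday = local.weekday() < 5
    past_window = local.time() >= time(10, 0)
    return is_weekday and past_window and not already_ran_today
```

When this returns True, the loop skips its initial sleep and rebalances at once.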
♻️ Duplicate comments (3)
applications/portfolio_manager/src/portfolio_manager/rebalance.py (3)
171-190: ⚠️ Potential issue | 🔴 Critical: Still run the close path when no replacement pairs are available.
If `get_optimal_portfolio()` raises `InsufficientPairsError`, this returns before `get_positions()` and the close loop. Any prior pair that `evaluate_prior_pairs()` marked for exit stays live at Alpaca even though the response says "no trades will be made"; keep an empty `optimal_portfolio` and continue through the close path instead.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@applications/portfolio_manager/src/portfolio_manager/rebalance.py` around lines 171 - 190, When get_optimal_portfolio(...) raises InsufficientPairsError, the code currently returns early and skips get_positions()/the close loop; instead catch InsufficientPairsError, log the warning (including error and candidate_pairs.height), set optimal_portfolio = [] (empty list) and allow execution to continue to the existing get_positions()/close loop so evaluate_prior_pairs() exits are processed; keep the Response for the "no trades" outcome only after completing the close path if that is the intended end state. Ensure references to get_optimal_portfolio, InsufficientPairsError, optimal_portfolio, get_positions, and evaluate_prior_pairs are updated accordingly.
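A minimal sketch of the control flow this comment asks for, under assumed shapes (the helper names follow the review text, but the parameters and return values here are hypothetical): catch the error, keep an empty target book, and still run the close path.

```python
class InsufficientPairsError(Exception):
    """Raised when too few candidate pairs survive filtering."""

def plan_rebalance(candidate_pairs, pairs_to_close,
                   get_optimal_portfolio, close_position):
    try:
        optimal_portfolio = get_optimal_portfolio(candidate_pairs)
    except InsufficientPairsError:
        # No new opens, but do NOT return early: exits must still execute.
        optimal_portfolio = []
    # The close path always runs, so positions marked for exit are unwound.
    closed = [close_position(pair) for pair in pairs_to_close]
    return optimal_portfolio, closed
```

With this shape, a "no trades" response can only be emitted after the close loop has completed.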
241-254: ⚠️ Potential issue | 🔴 Critical: Persist the executed book, not the target book.
`final_portfolio` is always built from `optimal_portfolio + held_rows`, and line 394 only treats `"failed"` as non-success. A skipped open can therefore be saved and returned as HTTP 200 even though it never reached the broker, while a close exception drops a still-live position from saved state; derive the saved portfolio from successful opens/closes (and failed closes that remain open) before calling `save_portfolio()`.
Also applies to: 266-285, 319-355, 380-395
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@applications/portfolio_manager/src/portfolio_manager/rebalance.py` around lines 241 - 254, The code currently builds final_portfolio from optimal_portfolio + held_rows and calls save_portfolio using that target book; instead derive the persisted portfolio from the actual executed/failed actions: update final_portfolio by applying only successful opens from open_results, successful closes from close_results (and keep positions for closes that failed), and also include any skipped/ignored opens as not added; specifically adjust the logic around save_portfolio, final_portfolio, close_results, open_results, close_position and open_position so that save_portfolio is called with the executed book (include held_rows + successful opens - successful closes + any closes that failed and therefore remain live) rather than the original optimal_portfolio.
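One way to derive the persisted book from execution results, as the comment suggests. The `(position, status)` tuple shape and the function name are assumptions for illustration; the real result records in `rebalance.py` differ.

```python
def executed_book(held_rows, open_results, close_results):
    """Build the portfolio to persist from what actually happened at the broker.

    held_rows: positions that were neither opened nor closed this run.
    open_results / close_results: lists of (position, status) tuples where
    status is one of "success", "failed", "skipped" (assumed shape).
    """
    book = list(held_rows)
    # Only opens that reached the broker enter the saved state.
    book += [pos for pos, status in open_results if status == "success"]
    # A failed close means the position is still live, so it stays saved;
    # successful closes are simply never re-added.
    book += [pos for pos, status in close_results if status == "failed"]
    return book
```

Saving this instead of `optimal_portfolio + held_rows` keeps the stored state aligned with live broker positions even on partial failures.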
417-426: ⚠️ Potential issue | 🟠 Major: Don't collapse falsy non-list payloads to an empty prior portfolio.
`if not prior_portfolio_data:` still treats `{}`, `null`, or `false` as "no portfolio yet." Only blank/`[]` should map to `empty`; any other JSON shape should raise so the rebalance fails closed instead of opening fresh positions against holdings it could not parse.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@applications/portfolio_manager/src/portfolio_manager/rebalance.py` around lines 417 - 426, The code currently treats any falsy JSON (e.g., {}, null, false) as an empty prior portfolio because of "if not prior_portfolio_data:"; change this to only accept an actual empty list as the empty-case. After parsing response.json() into prior_portfolio_data, explicitly check "if isinstance(prior_portfolio_data, list) and len(prior_portfolio_data) == 0: return empty"; if prior_portfolio_data is not a list raise an exception (or logger.error + raise) so rebalance fails closed instead of proceeding with unexpected shapes; keep the variables response_text, prior_portfolio_data, and empty unchanged to locate the fix.
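The fail-closed check the comment describes reduces to an explicit type test before the emptiness test. A hypothetical standalone version (the real code works on `response.json()` inside `get_prior_portfolio`):

```python
def parse_prior_portfolio(prior_portfolio_data):
    # Only a genuine list (including []) is acceptable; {}, null, false,
    # or any other JSON shape raises so the rebalance fails closed rather
    # than proceeding against holdings it could not parse.
    if isinstance(prior_portfolio_data, list):
        return prior_portfolio_data
    raise ValueError(
        f"unexpected prior portfolio payload: {prior_portfolio_data!r}"
    )
```

The caller's existing try/except then converts the raised error into an aborted rebalance instead of a fresh-open against unknown holdings.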
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: ASSERTIVE
Plan: Pro
Run ID: 7749f7c9-269f-415c-83fa-e7194c62ae70
📒 Files selected for processing (3)
applications/portfolio_manager/src/portfolio_manager/rebalance.py
applications/portfolio_manager/src/portfolio_manager/scheduler.py
applications/portfolio_manager/tests/test_rebalance.py
Pull request overview
Copilot reviewed 14 out of 14 changed files in this pull request and generated 3 comments.
Warning
CodeRabbit couldn't request changes on this pull request because it doesn't have sufficient GitHub permissions.
Please grant CodeRabbit Pull requests: Read and write permission and re-run the review.
Actionable comments posted: 3
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@applications/portfolio_manager/src/portfolio_manager/server.py`:
- Around line 95-96: create_portfolio currently calls run_rebalance directly,
which allows concurrent manual POSTs to collide with the scheduled rebalance;
introduce a shared asyncio.Lock (e.g., rebalance_lock) and use it to serialize
access to run_rebalance from both places: acquire the same rebalance_lock in
create_portfolio (async with rebalance_lock: await run_rebalance(...)) and
modify the scheduler wiring in
applications/portfolio_manager/src/portfolio_manager/scheduler.py to accept and
pass that same rebalance_lock into its task and to wrap its run_rebalance
invocation with the same async with rebalance_lock; ensure
_already_rebalanced_today() checks remain but do not replace the lock—both call
sites must use the shared lock to prevent overlapping executions.
- Around line 71-78: The scheduler blocks the event loop because _rebalance_loop
calls synchronous functions (_already_rebalanced_today) and run_rebalance
invokes blocking I/O (alpaca_client.get_account and fetch_historical_prices /
fetch_equity_details / fetch_spy_prices); fix by offloading those blocking calls
to a threadpool (use asyncio.to_thread(...) or loop.run_in_executor(...)) or
replace them with async equivalents (e.g., httpx or an async Alpaca client),
e.g., wrap calls to _already_rebalanced_today, alpaca_client.get_account,
fetch_historical_prices, fetch_equity_details, and fetch_spy_prices inside
asyncio.to_thread or equivalent before awaiting so spawn_rebalance_scheduler and
_rebalance_loop no longer block shutdown/health/metrics.
In `@maskfile.md`:
- Around line 819-823: The deploy block exports unused environment
variables—FUND_DATA_MANAGER_BASE_URL, AWS_S3_DATA_BUCKET_NAME, and
AWS_S3_MODEL_ARTIFACTS_BUCKET_NAME—that are consumed by tide.run but not by the
model deploy flow (uv run python -m tide.deploy); remove these three exports
from the deploy section (and the analogous exports at lines 827-830) so only the
actual deploy consumer variable FUND_LOOKBACK_DAYS is exported, ensuring the
model deploy block only sets environment vars consumed by tide.deploy and
avoiding the silent mismatch between tide.run and tide.deploy.
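The shared-lock serialization requested for `server.py` can be sketched as below. This is a simplified illustration, not the PR's final implementation (a later commit switches the manual path to `asyncio.wait_for` with a zero timeout); the function names here are hypothetical, and the fail-fast path uses a non-blocking `locked()` check for brevity.

```python
import asyncio

rebalance_lock = asyncio.Lock()  # one lock shared by the endpoint and the scheduler

async def manual_rebalance(run_rebalance):
    # Ad-hoc POST: return 409 immediately if a rebalance is already in flight.
    if rebalance_lock.locked():
        return 409
    async with rebalance_lock:
        return await run_rebalance()

async def scheduled_rebalance(run_rebalance):
    # The scheduler waits its turn rather than failing fast.
    async with rebalance_lock:
        return await run_rebalance()
```

Both call sites go through the same `rebalance_lock`, so a manual POST can never interleave with the scheduled run; the `_already_rebalanced_today()` idempotency check remains a separate, complementary guard.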
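The threadpool offloading the second `server.py` comment asks for follows the standard `asyncio.to_thread` pattern. The fetch function below is a stand-in for the blocking Alpaca/HTTP calls named in the comment, with a hypothetical signature:

```python
import asyncio
import time

def fetch_historical_prices(ticker: str) -> dict:
    # Stand-in for a blocking SDK/HTTP call (hypothetical; the real client differs).
    time.sleep(0.05)
    return {"ticker": ticker, "bars": []}

async def rebalance_step() -> dict:
    # asyncio.to_thread runs the blocking fetch in the default executor,
    # so the event loop stays free to serve /health and handle shutdown.
    return await asyncio.to_thread(fetch_historical_prices, "SPY")
```

The same wrapping applies to `_already_rebalanced_today`, `alpaca_client.get_account`, and the other fetchers; alternatively, an async HTTP client such as httpx removes the need to offload at all.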
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: ASSERTIVE
Plan: Pro
Run ID: e37e27ad-fa51-4961-950b-243abe625ed0
📒 Files selected for processing (5)
applications/ensemble_manager/src/ensemble_manager/server.py
applications/portfolio_manager/src/portfolio_manager/server.py
infrastructure/compute.py
maskfile.md
models/tide/src/tide/deploy.py
Pull request overview
Copilot reviewed 14 out of 14 changed files in this pull request and generated 6 comments.
…est flakiness, maskfile fix

- rebalance.py: Call metrics.observe_duration(start) on all return paths, not just the success path. Added the call before each early return so error durations are visible in latency dashboards.
- rebalance.py: Track positions_opened_count and positions_closed_count from actual trade results rather than planned positions. Counters are incremented in the existing success branches of the close/open loops, so gauges reflect executed trades, not pre-trade intent.
- rebalance.py: Use side.value instead of str(side) for trade_dollar_amount_total metric labels. TradeSide.BUY.value is "BUY" whereas str(TradeSide.BUY) yields "TradeSide.BUY", which made metric labels inconsistent.
- server.py: Replace the locked() pre-check + async with pattern with asyncio.wait_for(_rebalance_lock.acquire(), timeout=0) so contention deterministically returns 409 rather than racing between the check and the acquire.
- test_scheduler.py: Freeze datetime in test_seconds_until_next_rebalance_returns_positive_value so it is not wall-clock-dependent and cannot be flaky at scheduling boundaries.
- maskfile.md: Replace the non-existent pulumi stack output training_api_url with fund_base_url, which is the correct exported output name.

Co-Authored-By: Claude <noreply@anthropic.com>
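The `side.value` versus `str(side)` distinction from the metrics fix is easy to reproduce. The `TradeSide` enum below mirrors the name used in the commit; its members and the label helper are illustrative assumptions:

```python
from enum import Enum

class TradeSide(Enum):
    BUY = "BUY"
    SELL = "SELL"

def side_label(side: TradeSide) -> str:
    # .value yields the bare string; str() on a plain Enum member
    # includes the class name and pollutes metric label values.
    return side.value
```

With a plain `Enum` (unlike `StrEnum`), `str(TradeSide.BUY)` renders as `TradeSide.BUY`, so label cardinality silently splits unless `.value` is used consistently.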
Overview
Changes
- Split server.py into separate files
Context
The stuff that you've added in #796 for data manager made sense so I did the same for portfolio manager, @chrisaddy. This should go in after that pull request since yours is significantly bigger.