Conversation

@ward-taoshi
Contributor

Taoshi Pull Request

Description

[Provide a brief description of the changes introduced by this pull request.]

Related Issues (JIRA)

[Reference any related issues or tasks that this pull request addresses or closes.]

Checklist

  • I have tested my changes on testnet.
  • I have updated any necessary documentation.
  • I have added unit tests for my changes (if applicable).
  • If there are breaking changes for validators, I have (or will) notify the community in Discord of the release.

Reviewer Instructions

[Provide any specific instructions or areas you would like the reviewer to focus on.]

Definition of Done

  • Code has been reviewed.
  • All checks and tests pass.
  • Documentation is up to date.
  • Approved by at least one reviewer.

Checklist (for the reviewer)

  • Code follows project conventions.
  • Code is well-documented.
  • Changes are necessary and align with the project's goals.
  • No breaking changes introduced.

Optional: Deploy Notes

[Any instructions or notes related to deployment, if applicable.]

/cc @mention_reviewer

github-actions bot commented Oct 29, 2025

🤖 Claude AI Code Review

Last reviewed on: 14:23:47

Summary

This PR introduces a client-server RPC architecture for the LivePriceFetcher service, moves several manager components to daemon processes, implements a sync coordination mechanism using shared memory flags, adds message cooldown to Slack notifications, and enhances health monitoring. The changes aim to improve process isolation, prevent data corruption during sync operations, and reduce notification spam.

✅ Strengths

  1. Process Isolation: Moving LivePriceFetcher to an RPC server isolates potential crashes and makes the service more resilient
  2. Sync Coordination: The sync_in_progress flag and sync_epoch counter provide a solid foundation for preventing race conditions during position syncing
  3. Health Monitoring: The automatic health checking and restart mechanism for LivePriceFetcher is well-designed
  4. Error Handling: Comprehensive error handling with Slack notifications throughout daemon processes
  5. Notification Rate Limiting: The Slack cooldown mechanism (5 minutes) prevents spam effectively
  6. Process Naming: Using setproctitle makes daemon processes easily identifiable

⚠️ Concerns

CRITICAL ISSUES

  1. Generator Eagerly Materialized into a List in polygon_data_service.py (Lines 812, 817)

    return list(_fetch_raw_polygon_aggs())
    • Converting generator to list materializes all data in memory
    • Could cause OOM issues for large datasets
    • Suggestion: Profile memory usage or implement pagination/streaming (see the chunking sketch after this list)
  2. Missing newline at EOF in slack_notifier.py (Line 618)

    self.message_cooldown_lock = threading.Lock()
    • Missing newline at end of file (minor but breaks conventions)
  3. Race Condition in Health Check (live_price_fetcher.py, Lines 265-279)

    if not self._server_process.is_alive():
        raise RuntimeError("Health checker process failed to start")
    • Comment states checking is_alive() doesn't work cross-process, but code still uses it
    • The health check logic contradicts its own comments about cross-process limitations
    • Suggestion: Remove is_alive() checks or clarify the cross-process behavior
  4. Epoch Validation Logic Inconsistency (multiple files)

    • Some managers check epoch before save, others don't validate consistently
    • Example in challengeperiod_manager.py (Line 757): checks hasattr for _current_iteration_epoch
    • This attribute is set locally in refresh() but may not exist if called directly
    • Risk: Silent failures or inconsistent data protection
  5. Lock Recreation After Unpickling (slack_notifier.py, Lines 613-619)

    def __setstate__(self, state):
        self.__dict__.update(state)
        self.daily_summary_lock = threading.Lock()
        self.message_cooldown_lock = threading.Lock()
    • Recreating locks after unpickling is correct but dangerous if any thread was holding the lock
    • No documentation warns users about serialization implications
  6. Potential Deadlock in Auto Sync (auto_sync.py, Lines 82-115)

    with self.signal_sync_lock:
        while self.n_orders_being_processed[0] > 0:
            self.signal_sync_condition.wait()
    • If an exception occurs between setting sync_in_progress and the finally block, other processes wait forever
    • The finally block does clear the flag, but complex control flow increases risk
    • Suggestion: Add timeout to the condition wait
  7. No Validation of RPC Connection State

    • LivePriceFetcherClient.__getattr__ blindly proxies calls to self._client
    • If connection dies after initial setup, calls will fail with unclear errors
    • Suggestion: Add connection state validation or retry logic
  8. Hardcoded Sleep Durations Throughout

    • time.sleep(0.1), time.sleep(1), time.sleep(60) scattered everywhere
    • Makes testing slow and behavior unpredictable
    • Suggestion: Extract as constants or configuration

DESIGN CONCERNS

  1. Position Locks Not Using Manager in All Cases (position_lock.py, Lines 10-16)

    def get_new_lock(self):
        if self.is_backtesting:
            return Lock()
        elif self.ipc_manager:
            return self.ipc_manager.Lock()
        else:
            return MPLock()
    • Fallback to MPLock() when no manager is provided is risky
    • A bare multiprocessing Lock can only be shared by inheritance at process start; unlike a Manager proxy, it cannot be handed to already-running processes
    • Risk: Locks may not actually synchronize across processes
  2. LivePriceFetcher Initialization Complexity

    • Client creates server process, waits 2s, then connects
    • No graceful handling if server takes longer than 2s to start
    • Server crash during startup results in unclear errors
    • Suggestion: Use proper synchronization (e.g., a multiprocessing Event) to signal when the server is ready (see the sketch after this list)
  3. Test Changes Insufficient (test_auto_sync.py, Line 1595)

    mock_secrets.return_value = {'ms': 'mothership_secret', 'polygon_apikey': "", 'tiingo_apikey': ""}
    • Added empty API keys to fix test, but doesn't test the new RPC architecture
    • No tests for the health checker, sync epoch logic, or daemon processes
    • Risk: Major refactoring with minimal test coverage increases regression risk
  4. Multiple Sources of Truth for "is market open"

    • Before: live_price_fetcher.polygon_data_service.is_market_open(tp)
    • After: live_price_fetcher.is_market_open(tp) (proxies to polygon)
    • Inconsistent usage patterns remain in codebase
    • Suggestion: Audit and standardize all market open checks
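
For design concern 2, a minimal sketch of Event-based startup synchronization, assuming the server entry point (run_server below, an illustrative name) accepts the event and sets it once it is listening:

    import multiprocessing as mp

    def _start_server(self, timeout_s=30):
        ready = mp.Event()
        self._server_process = mp.Process(
            target=run_server, args=(ready,), daemon=True)
        self._server_process.start()
        # Block until the server signals readiness instead of sleeping a fixed 2s
        if not ready.wait(timeout=timeout_s):
            self._server_process.terminate()
            raise RuntimeError(f"LivePriceFetcher server not ready after {timeout_s}s")
        self.connect()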

💡 Suggestions

HIGH PRIORITY

  1. Add Connection Pooling/Retry Logic for RPC

    def __getattr__(self, name):
        def _call(*args, **kwargs):
            max_retries = 3
            for attempt in range(max_retries):
                try:
                    # Retry the call itself; bare attribute access on the proxy rarely fails
                    return getattr(self._client, name)(*args, **kwargs)
                except ConnectionError:
                    if attempt < max_retries - 1:
                        self.connect()  # Reconnect, then retry
                    else:
                        raise
        return _call
  2. Add Timeout to Condition Waits

    # In auto_sync.py
    if not self.signal_sync_condition.wait(timeout=300):  # 5 min timeout
        bt.logging.error("Timeout waiting for orders to complete")
        raise TimeoutError("Sync timeout")
  3. Validate Epoch Logic More Strictly

    # Add a required_epoch parameter to all save methods
    def _write_challengeperiod_from_memory_to_disk(self, required_epoch=None):
        if required_epoch is not None and self.sync_epoch.value != required_epoch:
            bt.logging.warning(f"Epoch mismatch ({self.sync_epoch.value} != {required_epoch}), aborting save")
            return
  4. Add Health Metrics to RPC Server

    def health_check(self) -> dict:
        return {
            "status": "ok",
            "timestamp_ms": current_time_ms,
            "is_backtesting": self.is_backtesting,
            "websocket_connected": self.polygon_data_service.ws_connected,  # Add diagnostics
            "last_price_update_ms": self.last_price_fetch_time_ms
        }

MEDIUM PRIORITY

  1. Extract Magic Numbers to Configuration

    # Create a config class
    class DaemonConfig:
        HEALTH_CHECK_INTERVAL_MS = 60 * 1000
        HEALTH_CHECK_FAILURE_THRESHOLD = 3
        SERVER_START_TIMEOUT_S = 2
        RETRY_SLEEP_S = 10
  2. Add Metrics/Telemetry

    • Track RPC call latency
    • Monitor health check success rate
    • Count sync epoch conflicts
    • Log process restart frequency
  3. Improve Error Messages

    # Instead of generic errors:
    raise RuntimeError("Health checker process failed to start")
    # Provide actionable information:
    raise RuntimeError(
        f"Health checker process failed to start within {timeout}s. "
        f"Check logs for startup errors. PID was {self.health_checker_process.pid}"
    )
  4. Document RPC Architecture

    • Add architecture diagram showing client-server relationship
    • Document failure modes and recovery mechanisms
    • Explain sync epoch coordination protocol
    • Add troubleshooting guide for common issues

🔒 Security Notes

  1. Authkey Generation (live_price_fetcher.py, Line 92)

    self._authkey = str(uuid.uuid4()).encode()
    • Good: Uses UUID for authkey, preventing unauthorized RPC access
    • ⚠️ Concern: No validation that authkey hasn't been compromised
    • Suggestion: Consider rotating authkey periodically or on health check failures
  2. Slack Webhook Exposure

    • Slack webhook URLs passed through multiple processes
    • Consider if sensitive notifications could leak information
    • ✅ Cooldown mechanism prevents spam/DoS of Slack webhooks
  3. Process Isolation

    • Good: Daemon processes can't directly corrupt validator state
    • ⚠️ Concern: Shared memory through Manager could still be corrupted
    • Suggestion: Add checksums or validation when reading shared state (a sketch follows this list)
  4. Error Message Information Disclosure

    • Tracebacks sent to Slack may contain sensitive information
    • Review ErrorUtils.format_error_for_slack() to ensure no leakage
    • Consider sanitizing paths, API keys, or internal details
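
For security note 3, checksum validation of shared state could look roughly like the sketch below (helper names are assumptions; in real use the read and write would also need to share a lock so the two keys stay consistent):

    import hashlib
    import json

    def write_shared_state(shared_dict, payload: dict):
        blob = json.dumps(payload, sort_keys=True)
        shared_dict["state"] = blob
        shared_dict["checksum"] = hashlib.sha256(blob.encode()).hexdigest()

    def read_shared_state(shared_dict) -> dict:
        blob = shared_dict["state"]
        if hashlib.sha256(blob.encode()).hexdigest() != shared_dict["checksum"]:
            raise ValueError("Shared state checksum mismatch; refusing to read")
        return json.loads(blob)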

🧪 Testing Recommendations

  1. Add Integration Tests for RPC

    • Test server crash and automatic restart
    • Test client reconnection after server restart
    • Test concurrent RPC calls from multiple processes
  2. Test Sync Epoch Coordination (see the pytest sketch after this list)

    • Test that stale writes are properly rejected
    • Test race condition where sync starts during iteration
    • Test epoch counter overflow (unlikely but possible)
  3. Test Lock Behavior Across Processes

    • Verify Manager locks actually synchronize across processes
    • Test lock behavior after fork()
    • Test lock cleanup when process crashes
  4. Test Daemon Process Lifecycle

    • Test graceful shutdown of all daemon processes
    • Test restart behavior under various failure modes
    • Test memory leaks in long-running daemons
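
For recommendation 2, a pytest-style sketch of the stale-write case, assuming the required_epoch parameter proposed under High Priority suggestion 3 (the factory and the wrote_to_disk flag are hypothetical test hooks):

    import multiprocessing as mp

    def test_stale_write_rejected():
        sync_epoch = mp.Value('i', 0)
        manager = make_challengeperiod_manager(sync_epoch=sync_epoch)  # hypothetical factory
        captured = sync_epoch.value    # manager captures epoch at iteration start
        sync_epoch.value += 1          # simulate auto sync running mid-iteration
        manager._write_challengeperiod_from_memory_to_disk(required_epoch=captured)
        assert not manager.wrote_to_disk  # stale save must be aborted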

📝 Documentation Improvements Needed

  1. Add RPC Architecture Documentation

    • Why was this architecture chosen?
    • How does the health check work?
    • What happens during server restart?
  2. Document Sync Coordination Protocol

    • Explain sync_in_progress flag usage
    • Explain sync_epoch counter purpose
    • Document ordering requirements (flag before epoch increment)
  3. Update Deployment Guide

    • Document new daemon processes
    • Explain monitoring requirements
    • Add troubleshooting section for RPC issues
  4. Add Inline Comments for Complex Logic

    • The epoch validation logic needs more explanation
    • The lock acquisition order in auto_sync needs documentation
    • The health check retry logic needs clarification

🎯 Overall Assessment

This is a significant architectural improvement that addresses real concurrency and reliability issues. However, the PR introduces considerable complexity with multiple daemon processes, RPC communication, and shared memory coordination.

Before merging:

  • ✅ The epoch-based validation is a clever solution to data corruption
  • ⚠️ Test coverage is insufficient for the complexity introduced
  • ⚠️ Several edge cases (timeouts, connection failures, process crashes) need hardening
  • ⚠️ Documentation must be added to prevent future maintainability issues

Recommendation: Request additional testing and documentation before approval. Consider a phased rollout with careful monitoring of daemon process health and RPC performance.

ward-taoshi force-pushed the refactor/price-fetcher branch from 6aef91b to 27cfc21 on November 3, 2025 at 17:30
Collaborator

jbonilla-tao left a comment

One open question I have is whether we should allow consumers that only make REST requests (perf ledger manager, emissions ledger manager, backtest manager) to keep using the live price fetcher directly as before, while consumers like validator.py (order fill), the MDD checker, and the limit orders manager use RPC.

ward-taoshi force-pushed the refactor/price-fetcher branch from 3678600 to 3d27793 on November 5, 2025 at 19:14
self.contract_manager = contract_manager
self.cached_miner_account_sizes = {} # Deepcopy of contract_manager.miner_account_sizes
self.cache_last_refreshed_date = None # 'YYYY-MM-DD' format, refresh daily
self.pds = live_price_fetcher.polygon_data_service if live_price_fetcher else None # Load it later once the process starts so ipc works.
Collaborator

@ward-taoshi why do we need this change?

Contributor Author

The RPC client can't directly reference nested objects, so the methods previously reached through polygon_data_service had to be wrapped as pass-through methods.
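
The pass-through pattern being described looks roughly like the following; is_market_open matches the example cited in the review above, though the PR's actual set of wrapped methods may differ:

    class LivePriceFetcher:
        def is_market_open(self, trade_pair):
            # RPC proxies cannot traverse nested attributes such as
            # live_price_fetcher.polygon_data_service, so forward the call.
            return self.polygon_data_service.is_market_open(trade_pair)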

jbonilla-tao force-pushed the refactor/price-fetcher branch from 161e8ef to 76a6ff0 on November 8, 2025 at 00:31
ward-taoshi and others added 18 commits November 8, 2025 02:28
- Update PositionLocks to use ipc_manager for cross-process synchronization
- Add run_update_loop to MDDChecker for daemon process execution
- Pass position_locks to MDDChecker via constructor
- Start MDD checker as separate process in validator initialization
- Remove mdd_check call from main validator loop
- Add MDD checker process to graceful shutdown sequence

This refactor prevents the MDD checker from blocking the main validator loop
while maintaining proper lock synchronization across processes.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <[email protected]>
- Update EliminationManager.__init__ to accept position_locks parameter
- Add run_update_loop to EliminationManager for daemon process execution
- Set position_locks on elimination_manager after creation in validator
- Start elimination manager as separate process in validator initialization
- Remove process_eliminations call from main validator loop
- Add elimination manager process to graceful shutdown sequence
- Renumber initialization steps from 11 to 12

This refactor prevents the elimination manager from blocking the main validator
loop while maintaining proper lock synchronization across processes.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <[email protected]>
Similar to MDDChecker and EliminationManager, move ChallengePeriodManager
to run in its own process to prevent blocking the main validator loop.

Changes:
- Added shutdown_dict parameter to ChallengePeriodManager.__init__
- Updated refresh() to accept optional current_time parameter with
  default using TimeUtil.now_in_millis()
- Added run_update_loop() method with setproctitle, 1-second sleep,
  and error handling
- Started challengeperiod_manager_process as daemon Process in validator
- Removed challengeperiod_manager.refresh() call from main loop
- Added challengeperiod_manager_process.join() to shutdown sequence
- Renumbered initialization steps from 12 to 13

This improves validator performance by preventing blocking operations
during challenge period evaluations.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <[email protected]>
Implement comprehensive synchronization mechanism for daemon processes:

1. sync_in_progress flag (IPC Value):
   - Blocks new iterations from starting when auto sync runs
   - Set BEFORE epoch increment to prevent race condition

2. sync_epoch counter (IPC Value):
   - Incremented each time auto sync runs
   - Managers capture at START of iteration
   - Validated BEFORE save to detect stale data
   - If epoch changed, abort save to prevent corruption

3. Deadlock prevention:
   - Wrapped all operations in try/finally
   - Guarantees sync_in_progress flag is always cleared

Updated components:
- PositionSyncer: Set flag then increment epoch, with try/finally
- MDDChecker: Check flag and validate epoch before save
- EliminationManager: Check flag and validate epoch before save
- ChallengePeriodManager: Check flag and validate epoch before save
- All managers: Added Slack error notifications

This replaces threading-based signal_sync_lock/condition which don't work
across processes. Two-phase protection (flag + epoch) ensures data integrity
by preventing both new iterations from starting AND invalidating in-flight
iterations that could write stale data.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <[email protected]>
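
Seen from a manager's update loop, the two-phase protection described above amounts to roughly this sketch (IPC names follow the commit message; _write_to_disk stands in for each manager's save method):

    def run_update_loop(self):
        while not self.shutdown_dict:
            if self.sync_in_progress.value:         # phase 1: don't start during sync
                time.sleep(1)
                continue
            epoch_at_start = self.sync_epoch.value  # capture at iteration start
            self.refresh()
            if self.sync_epoch.value == epoch_at_start:
                self._write_to_disk()               # phase 2: save only if no sync ran
            time.sleep(1)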
Refactored daemon classes to be nested within their respective parent classes:

Changes to live_price_fetcher.py:
- Added LivePriceFetcherClient.HealthChecker as nested class
- Daemon monitors LivePriceFetcher server health every 60 seconds
- Includes Slack error logging and graceful shutdown handling

Changes to price_slippage_model.py:
- Added PriceSlippageModel.FeatureRefresher as nested class
- Daemon refreshes daily slippage features every 10 minutes
- Added `import time` for sleep() calls
- Includes Slack error logging and graceful shutdown handling

Changes to validator.py:
- Updated to use LivePriceFetcherClient.HealthChecker instead of standalone class
- Updated to use PriceSlippageModel.FeatureRefresher instead of standalone class
- Removed import of deleted health_checker module

Benefits:
- Better encapsulation - daemons are co-located with their parent classes
- Clearer code organization and ownership
- No standalone health_checker.py module needed

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <[email protected]>
Remove is_alive() check from health_check() method to fix
"can only test a child process" error. The RPC health check
is sufficient to detect server failures.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <[email protected]>
- Add message_cooldown_seconds (default 5 minutes)
- Track last send time per message type (based on first line)
- Suppress duplicate messages within cooldown period
- Add bypass_cooldown parameter for critical messages
- Log suppressed messages at debug level

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <[email protected]>
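
The cooldown behavior described above amounts to roughly this sketch (attribute and helper names such as last_send_times and _post_to_webhook are assumptions, not the file's actual API):

    def send_message(self, message: str, bypass_cooldown: bool = False):
        key = message.splitlines()[0]  # message type keyed on the first line
        now = time.time()
        with self.message_cooldown_lock:
            last_sent = self.last_send_times.get(key, 0)
            if not bypass_cooldown and now - last_sent < self.message_cooldown_seconds:
                bt.logging.debug(f"Suppressed duplicate Slack message: {key}")
                return
            self.last_send_times[key] = now
        self._post_to_webhook(message)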
Issue #2: Remove is_alive() checks from LivePriceFetcher._start_server()
- Removed all is_alive() checks that would crash when called from HealthChecker daemon
- Use terminate() and kill() directly (safe to call on dead processes)
- Rely on connect() to verify server is working instead of process checks
- Simplified startup logic with 2s wait + connection attempt

Issue #4: Move timestamp update inside lock in auto_sync.py
- Moved last_signal_sync_time_ms update into finally block inside lock
- Prevents race condition where second sync starts before timestamp updates
- Ensures timestamp is always updated even if sync fails

Issue #5: Remove unused shutdown_dict logic from daemons
- Removed shutdown_dict parameter from HealthChecker.__init__()
- Removed shutdown_dict parameter from FeatureRefresher.__init__()
- Changed while loops from "while not self.shutdown_dict" to "while True"
- Daemons run indefinitely and terminate when main process exits (daemon=True)
- Updated validator.py to not pass shutdown_dict to daemon initializations

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <[email protected]>
jbonilla-tao force-pushed the refactor/price-fetcher branch from c1011bd to 9377d82 on November 8, 2025 at 07:28