Skip to content

fix(rpc): minimise RPC usage with comprehensive caching and streaming…#269

Merged
ca333 merged 4 commits intomainfrom
dev
Oct 30, 2025
Merged

fix(rpc): minimise RPC usage with comprehensive caching and streaming…#269
ca333 merged 4 commits intomainfrom
dev

Conversation

@ca333
Copy link
Copy Markdown
Contributor

@ca333 ca333 commented Oct 29, 2025

… support (#262)

  • feat(pubkeys): persist AssetPubkeys across sessions using Hive TypeAdapters; hydrate on cold start\n\n- Add Hive adapters for stored pubkeys\n- Persist on fetch, hydrate before first RPC\n- Align balance polling to 60s and integrate with tx watcher\n\nBREAKING CHANGE: none

  • chore(format): run dart format on pubkey persistence and balance manager files

  • perf(sdk): dedupe pubkeys/address fetch, cache-first hydrate; throttle health checks; cache wallet names (#3238)

  • test(local-auth): add ensureKdfHealthy to FakeAuthService for Trezor tests

  • Refactor: Wallet-aware pubkey persistence and retrieval

This change ensures that pubkey data is correctly associated with the active wallet, preventing cross-wallet contamination. It also improves the accuracy of KDF health checks by bypassing cached user data.

  • Refactor: Improve pubkey and balance fetching logic

  • fix: market data resource improvements

  • perf(assets): cache activated assets and coalesce activation checks

  • Wire SDK ActivatedAssetsCache into activation/coins flows: updates across CoinsBloc, AssetOverviewBloc, custom token import, and sdk_auth_activation_extension to reuse activation state instead of re-querying.
  • Debounce/polish polling in portfolio_growth_bloc and profit_loss_bloc to prevent overlapping requests.
  • Remove duplicate activation/balance checks in maker/taker validators and forms.
  • Consolidate repeated calls in mm2_api/mm2_api_nft/rpc_native; prefer cached values.
  • Reduce startup RPCs in app_bootstrapper; stop background timers in window_close_handler on app close to avoid trailing calls.
  • Add shared intervals in shared/constants; introduce lib/shared/utils/activated_assets_cache.dart for app-specific helpers.
  • No UI changes; measurable reduction in RPC volume and improved responsiveness.

Refs #3238

  • feat(streaming): add typed stream RPCs and web SharedWorker integration; expose streaming API in framework and rpc methods

  • feat(web): package event_streaming_worker.js in framework assets and load via package path

  • fix(web): correct SharedWorker path to package asset under assets/web/

  • refactor(streaming): improve type safety with sealed event classes

  • Replace string-based event types with sealed class hierarchy
  • Create typed event classes for all stream types (Balance, Orderbook, Network, Heartbeat, SwapStatus, OrderStatus, TxHistory, ShutdownSignal)
  • Use private enum for internal string mapping while exposing typed API
  • Make StreamEnableResponse generic to link responses to event types
  • Update event_streaming_service with type-safe filtering methods
  • Organize events into separate files using part directives
  • Enable exhaustive pattern matching with switch expressions

Benefits:

  • Compile-time type safety eliminates string-based checks
  • Better IDE support with autocomplete and type hints
  • Reduced runtime errors from type mismatches
  • Clearer public API with explicit event types
  • feat(sdk): add internal event streaming manager with lifecycle management
  • Create EventStreamingManager in komodo_defi_sdk package
  • Implement automatic stream lifecycle handling (enable/disable)
  • Add reference counting for shared stream subscriptions
  • Support all event types: balance, orderbook, tx history, swap status, order status, network, heartbeat, and shutdown signals
  • Reduce boilerplate with generic _subscribeToStream method using template method pattern
  • Register manager in DI container for internal use by domain managers
  • Manager is not publicly exposed, intended for use by domain-specific managers to provide real-time updates
  • perf: eliminate RPC polling by using event streaming

Replace periodic polling with real-time event streaming in BalanceManager and TransactionHistoryManager to reduce RPC spam and improve efficiency.

Changes:

  • BalanceManager: Replace 1-minute polling interval with balance event streaming
  • TransactionHistoryManager: Replace balance-driven polling with TX history event streaming
  • Bootstrap: Inject EventStreamingManager into both managers
  • Remove polling configuration (intervals, retry counters, debug flags)
  • Fix shouldEnableTransactionHistory to always return true for streaming support

Benefits:

  • Eliminates periodic RPC calls every 60 seconds
  • Real-time updates instead of up to 1-minute delays
  • Better resource utilization (updates only when data changes)
  • Automatic reconnection and error handling via EventStreamingManager

Refs: #3238

  • fix(cache): address PR review issues - error handling and race conditions
  • Health check: log transient RPC failures instead of triggering false sign-outs
  • ActivatedAssetsCache: fix race condition using generation counter and Completer pattern
  • NFT activation: aggregate and report all errors instead of only the last one
  • Auth service: document 5-minute user cache staleness trade-off

Refs: #262

  • chore: add event streaming logging

  • fix(rpc): address critical review feedback on caching implementation

  • Fix incorrect unawaited() usage in pubkey_manager by properly extracting Future
  • Add eagerError: false to event_streaming_manager dispose for robust cleanup
  • Replace unsafe String cast with whereType() in pubkeys_storage
  • Add race condition check in transaction_history_manager _startStreaming
  • Capture timestamp at fetch start in activated_assets_cache for accurate TTL
  • Add error handling to sparkline_repository dispose to ensure all cleanup
  • perf(auth): use shutdown event streaming to minimize RPC polling

Subscribe to KDF shutdown signals to immediately detect when KDF shuts down, eliminating the need for frequent polling. This provides near-instant shutdown detection (< 1 second) compared to periodic health checks.

  • Add shutdown signal subscription in KdfAuthService
  • Subscribe to shutdown events and immediately update auth state
  • Enable shutdown signal stream via RPC on initialization
  • Clean up subscription on dispose
  • Health checks now serve as backup for edge cases

Benefits:

  • Reduces getWalletNames RPC calls significantly
  • Provides instant user sign-out on KDF shutdown
  • Maintains graceful degradation if streaming unavailable
  • feat(rpc): optimize initial balance/history for newly created wallets
  • Assume zero balance for first-time asset enablement in new wallets
  • Assume empty transaction history for first-time asset enablement in new wallets
  • Detect new wallets by absence of any asset activation history
  • Avoids unnecessary RPC spam when activating first assets in new wallets
  • Does NOT apply to imported wallets (they have activation history)
  • Uses AssetHistoryStorage to track which assets have been enabled per wallet
  • Wire up shared AssetHistoryStorage instance in SDK bootstrap
  • fix(auth): track imported vs created wallets to prevent incorrect optimizations
  • Add 'isImported' metadata to KdfUser during registration
  • Pass mnemonic presence to _registerNewUser to determine if imported
  • Update balance/history managers to check isImported flag
  • Prevents incorrectly assuming zero balance for imported wallets
  • Optimization now only applies to genuinely new wallets (not imported)

BREAKING: Imported wallets will now correctly fetch real balances/history on first use instead of incorrectly showing zero

  • fix: remove errors from KDF merge

  • chore: roll coins

  • fix: misc streaming fixes

  • feat(sdk): add event streaming support for task status updates

  • Add event streaming service and configuration
  • Implement task event handling and unknown event fallback
  • Add RPC task shepherd method for task status monitoring
  • Update balance manager to support event-driven updates
  • Add platform-specific event streaming implementations
  • Enhance sync status with event streaming capabilities

This reduces RPC polling by leveraging KDF event streaming for task status updates.

  • fix(activation): force cache refresh when verifying asset availability

The _waitForCoinAvailability method was failing to verify asset activation because isAssetActive() was using cached data instead of fetching fresh data from the backend. This caused transaction history to fail with a connection error even though assets were successfully activated.

Changes:

  • Add forceRefresh parameter to ActivationManager.isAssetActive()
  • Update SharedActivationCoordinator._waitForCoinAvailability() to force refresh on each availability check
  • This ensures we bypass the 2-second cache TTL and get real-time data

Fixes issue where transaction history shows 'Connection to Komodo servers are failing' error after asset activation completes successfully.

  • fix(streaming): use asset config ID instead of display name for event subscriptions

The balance and transaction history event subscriptions were using asset.id.name (the human-friendly display name like 'Bitcoin') instead of asset.id.id (the config ID like 'BTC-segwit'). This caused the RPC enable streaming calls to fail because the backend expects the config ID.

Changes:

  • BalanceManager: Use assetId.id instead of assetId.name for subscribeToBalance
  • TransactionHistoryManager: Use asset.id.id instead of asset.id.name for subscribeToTxHistory
  • Update event filtering to match using config ID as well

This fixes the 'Failed to start balance watcher' errors and resolves the transaction history connection error.

  • perf: reduce RPC spam in activation strategies and managers

  • fix(auth-service): update cache alongside storage for metadata updates

  • chore: roll KDF to release preview

Roll KDF to the dev branch version which will be used for KW release.

  • fix(stream): normalize KDF stream _type parsing for suffixed IDs; format modified files

  • fix: use correct streaming worker js path

  • fix: tx history streaming

  • fix: improve robustness of event parsing

  • fix: backwards compatibility for KDF API status

  • Refactor: Improve transaction comparison logic (Investigate transaction storage stack overflow error #268)

This change refactors the transaction comparison logic in InMemoryTransactionStorage to ensure that all transactions are available during comparison. This prevents potential exceptions and improves the stability of the transaction history storage.


Summary by CodeRabbit

  • New Features

    • Real-time event streaming: SSE/web-worker support, typed streaming APIs, streaming manager and client-side service; subscribe to balance, orderbook, tx history, swap/order status, network, heartbeat, shutdown signals.
    • NFT activation service and activation helpers.
    • Activated-assets cache and pubkey persistence for faster startup and fewer network calls.
  • Chores

    • Added web worker asset and build/config metadata updates.

… support (#262)

* feat(pubkeys): persist AssetPubkeys across sessions using Hive TypeAdapters; hydrate on cold start\n\n- Add Hive adapters for stored pubkeys\n- Persist on fetch, hydrate before first RPC\n- Align balance polling to 60s and integrate with tx watcher\n\nBREAKING CHANGE: none

* chore(format): run dart format on pubkey persistence and balance manager files

* perf(sdk): dedupe pubkeys/address fetch, cache-first hydrate; throttle health checks; cache wallet names (#3238)

* test(local-auth): add ensureKdfHealthy to FakeAuthService for Trezor tests

* Refactor: Wallet-aware pubkey persistence and retrieval

This change ensures that pubkey data is correctly associated with the active wallet, preventing cross-wallet contamination. It also improves the accuracy of KDF health checks by bypassing cached user data.

Co-authored-by: charl <charl@vanstaden.info>

* Refactor: Improve pubkey and balance fetching logic

Co-authored-by: charl <charl@vanstaden.info>

* fix: market data resource improvements

* perf(assets): cache activated assets and coalesce activation checks

- Wire SDK `ActivatedAssetsCache` into activation/coins flows:
  updates across `CoinsBloc`, `AssetOverviewBloc`, custom token import, and
  `sdk_auth_activation_extension` to reuse activation state instead of re-querying.
- Debounce/polish polling in `portfolio_growth_bloc` and `profit_loss_bloc`
  to prevent overlapping requests.
- Remove duplicate activation/balance checks in maker/taker validators and forms.
- Consolidate repeated calls in `mm2_api`/`mm2_api_nft`/`rpc_native`; prefer cached values.
- Reduce startup RPCs in `app_bootstrapper`; stop background timers in
  `window_close_handler` on app close to avoid trailing calls.
- Add shared intervals in `shared/constants`; introduce
  `lib/shared/utils/activated_assets_cache.dart` for app-specific helpers.
- No UI changes; measurable reduction in RPC volume and improved responsiveness.

Refs #3238

* feat(streaming): add typed stream RPCs and web SharedWorker integration; expose streaming API in framework and rpc methods

* feat(web): package event_streaming_worker.js in framework assets and load via package path

* fix(web): correct SharedWorker path to package asset under assets/web/

* refactor(streaming): improve type safety with sealed event classes

- Replace string-based event types with sealed class hierarchy
- Create typed event classes for all stream types (Balance, Orderbook, Network, Heartbeat, SwapStatus, OrderStatus, TxHistory, ShutdownSignal)
- Use private enum for internal string mapping while exposing typed API
- Make StreamEnableResponse generic to link responses to event types
- Update event_streaming_service with type-safe filtering methods
- Organize events into separate files using part directives
- Enable exhaustive pattern matching with switch expressions

Benefits:
- Compile-time type safety eliminates string-based checks
- Better IDE support with autocomplete and type hints
- Reduced runtime errors from type mismatches
- Clearer public API with explicit event types

* feat(sdk): add internal event streaming manager with lifecycle management

- Create EventStreamingManager in komodo_defi_sdk package
- Implement automatic stream lifecycle handling (enable/disable)
- Add reference counting for shared stream subscriptions
- Support all event types: balance, orderbook, tx history, swap status, order status, network, heartbeat, and shutdown signals
- Reduce boilerplate with generic _subscribeToStream method using template method pattern
- Register manager in DI container for internal use by domain managers
- Manager is not publicly exposed, intended for use by domain-specific managers to provide real-time updates

* perf: eliminate RPC polling by using event streaming

Replace periodic polling with real-time event streaming in BalanceManager
and TransactionHistoryManager to reduce RPC spam and improve efficiency.

Changes:
- BalanceManager: Replace 1-minute polling interval with balance event streaming
- TransactionHistoryManager: Replace balance-driven polling with TX history event streaming
- Bootstrap: Inject EventStreamingManager into both managers
- Remove polling configuration (intervals, retry counters, debug flags)
- Fix shouldEnableTransactionHistory to always return true for streaming support

Benefits:
- Eliminates periodic RPC calls every 60 seconds
- Real-time updates instead of up to 1-minute delays
- Better resource utilization (updates only when data changes)
- Automatic reconnection and error handling via EventStreamingManager

Refs: #3238

* fix(cache): address PR review issues - error handling and race conditions

- Health check: log transient RPC failures instead of triggering false sign-outs
- ActivatedAssetsCache: fix race condition using generation counter and Completer pattern
- NFT activation: aggregate and report all errors instead of only the last one
- Auth service: document 5-minute user cache staleness trade-off

Refs: #262

* chore: add event streaming logging

* fix(rpc): address critical review feedback on caching implementation

- Fix incorrect unawaited() usage in pubkey_manager by properly extracting Future
- Add eagerError: false to event_streaming_manager dispose for robust cleanup
- Replace unsafe String cast with whereType<String>() in pubkeys_storage
- Add race condition check in transaction_history_manager _startStreaming
- Capture timestamp at fetch start in activated_assets_cache for accurate TTL
- Add error handling to sparkline_repository dispose to ensure all cleanup

* perf(auth): use shutdown event streaming to minimize RPC polling

Subscribe to KDF shutdown signals to immediately detect when KDF shuts down,
eliminating the need for frequent polling. This provides near-instant shutdown
detection (< 1 second) compared to periodic health checks.

- Add shutdown signal subscription in KdfAuthService
- Subscribe to shutdown events and immediately update auth state
- Enable shutdown signal stream via RPC on initialization
- Clean up subscription on dispose
- Health checks now serve as backup for edge cases

Benefits:
- Reduces getWalletNames RPC calls significantly
- Provides instant user sign-out on KDF shutdown
- Maintains graceful degradation if streaming unavailable

* feat(rpc): optimize initial balance/history for newly created wallets

- Assume zero balance for first-time asset enablement in new wallets
- Assume empty transaction history for first-time asset enablement in new wallets
- Detect new wallets by absence of any asset activation history
- Avoids unnecessary RPC spam when activating first assets in new wallets
- Does NOT apply to imported wallets (they have activation history)
- Uses AssetHistoryStorage to track which assets have been enabled per wallet
- Wire up shared AssetHistoryStorage instance in SDK bootstrap

* fix(auth): track imported vs created wallets to prevent incorrect optimizations

- Add 'isImported' metadata to KdfUser during registration
- Pass mnemonic presence to _registerNewUser to determine if imported
- Update balance/history managers to check isImported flag
- Prevents incorrectly assuming zero balance for imported wallets
- Optimization now only applies to genuinely new wallets (not imported)

BREAKING: Imported wallets will now correctly fetch real balances/history
on first use instead of incorrectly showing zero

* fix: remove errors from KDF merge

* chore: roll `coins`

* fix: misc streaming fixes

* feat(sdk): add event streaming support for task status updates

- Add event streaming service and configuration
- Implement task event handling and unknown event fallback
- Add RPC task shepherd method for task status monitoring
- Update balance manager to support event-driven updates
- Add platform-specific event streaming implementations
- Enhance sync status with event streaming capabilities

This reduces RPC polling by leveraging KDF event streaming for task status updates.

* fix(activation): force cache refresh when verifying asset availability

The _waitForCoinAvailability method was failing to verify asset activation
because isAssetActive() was using cached data instead of fetching fresh data
from the backend. This caused transaction history to fail with a connection
error even though assets were successfully activated.

Changes:
- Add forceRefresh parameter to ActivationManager.isAssetActive()
- Update SharedActivationCoordinator._waitForCoinAvailability() to force
  refresh on each availability check
- This ensures we bypass the 2-second cache TTL and get real-time data

Fixes issue where transaction history shows 'Connection to Komodo servers
are failing' error after asset activation completes successfully.

* fix(streaming): use asset config ID instead of display name for event subscriptions

The balance and transaction history event subscriptions were using
asset.id.name (the human-friendly display name like 'Bitcoin') instead of
asset.id.id (the config ID like 'BTC-segwit'). This caused the RPC enable
streaming calls to fail because the backend expects the config ID.

Changes:
- BalanceManager: Use assetId.id instead of assetId.name for subscribeToBalance
- TransactionHistoryManager: Use asset.id.id instead of asset.id.name for subscribeToTxHistory
- Update event filtering to match using config ID as well

This fixes the 'Failed to start balance watcher' errors and resolves the
transaction history connection error.

* perf: reduce RPC spam in activation strategies and managers

* fix(auth-service): update cache alongside storage for metadata updates

* chore: roll KDF to release preview

Roll KDF to the `dev` branch version which will be used for KW release.

* fix(stream): normalize KDF stream _type parsing for suffixed IDs; format modified files

* fix: use correct streaming worker js path

* fix: tx history streaming

* fix: improve robustness of event parsing

* fix: backwards compatibility for KDF API status

* Refactor: Improve transaction comparison logic (#268)

This change refactors the transaction comparison logic in `InMemoryTransactionStorage` to ensure that all transactions are available during comparison. This prevents potential exceptions and improves the stability of the transaction history storage.

Co-authored-by: Cursor Agent <cursoragent@cursor.com>

---------

Co-authored-by: Cursor Agent <cursoragent@cursor.com>
Co-authored-by: Francois <takenagain@users.noreply.github.com>
Copilot AI review requested due to automatic review settings October 29, 2025 14:15
@coderabbitai
Copy link
Copy Markdown
Contributor

coderabbitai bot commented Oct 29, 2025

Note

Other AI code review bot(s) detected

CodeRabbit has detected other AI code review bot(s) in this pull request and will avoid duplicating their findings in the review comments. This may lead to a less comprehensive review.

Walkthrough

Adds a new event-streaming subsystem (platform IO/web/stub, service, typed events, RPC surface, manager) and integrates streaming into SDK managers; introduces ActivatedAssetsCache, pubkeys Hive persistence, NFT activation, disposal/lifecycle hooks, auth caching/shutdown handling, and various bootstrap/config updates.

Changes

Cohort / File(s) Summary
Event streaming core & platform
packages/komodo_defi_framework/lib/src/streaming/event_streaming_platform_io.dart, packages/komodo_defi_framework/lib/src/streaming/event_streaming_platform_web.dart, packages/komodo_defi_framework/lib/src/streaming/event_streaming_platform_stub.dart, packages/komodo_defi_framework/lib/src/streaming/event_streaming_service.dart, packages/komodo_defi_framework/assets/web/event_streaming_worker.js, packages/komodo_defi_framework/pubspec.yaml
Add platform IO/web/stub implementations, SharedWorker script, asset registration, and KdfEventStreamingService facade (connect, parse, lifecycle, dispose).
Event types & parsing
packages/komodo_defi_framework/lib/src/streaming/events/*
Introduce sealed KdfEvent system and concrete event classes (Balance, Heartbeat, Network, Orderbook, OrderStatus, SwapStatus, Task, TxHistory, ShutdownSignal, Unknown) with fromJson dispatch and normalization.
Streaming RPC surface
packages/komodo_defi_rpc_methods/lib/src/rpc_methods/streaming/*.dart, .../streaming_rpc_namespace.dart, .../rpc_methods_library.dart
Add streaming RPC request/response types, StreamConfig, topic-specific enable/disable requests, and expose StreamingMethodsNamespace on the RPC client.
Framework config & integration
packages/komodo_defi_framework/lib/src/config/event_streaming_config.dart, packages/komodo_defi_framework/lib/src/config/kdf_startup_config.dart, packages/komodo_defi_framework/lib/komodo_defi_framework.dart
Add EventStreamingConfiguration, wire it into KdfStartupConfig, and expose streaming getter on framework public API.
EventStreamingManager & SDK wiring
packages/komodo_defi_sdk/lib/src/streaming/event_streaming_manager.dart, packages/komodo_defi_sdk/lib/src/bootstrap.dart, packages/komodo_defi_sdk/lib/komodo_defi_sdk.dart
New EventStreamingManager for ref-counted subscriptions; DI/bootstrap updated to register/dispose manager; SDK exposes streaming and activatedAssetsCache/nftActivation getters and threshold helpers.
Activated assets cache & integration
packages/komodo_defi_sdk/lib/src/assets/activated_assets_cache.dart, packages/komodo_defi_sdk/lib/src/assets/asset_manager.dart, packages/komodo_defi_sdk/lib/src/sdk/komodo_defi_sdk_config.dart, packages/komodo_defi_sdk/lib/src/assets/_assets_index.dart
Add ActivatedAssetsCache with TTL, generation invalidation and dedupe; integrate into AssetManager, ActivationManager, and SDK config wiring.
Balance & transaction history streaming
packages/komodo_defi_sdk/lib/src/balances/balance_manager.dart, packages/komodo_defi_sdk/lib/src/transaction_history/transaction_history_manager.dart, packages/komodo_defi_sdk/lib/src/transaction_history/transaction_history_strategies.dart, packages/komodo_defi_sdk/lib/src/transaction_history/strategies/*.dart, packages/komodo_defi_sdk/lib/src/transaction_history/transaction_storage.dart
Wire BalanceManager and TransactionHistoryManager to EventStreamingManager with streaming-first paths and polling fallbacks; add timers, confirmation refresh, activation-aware flows, and more robust transaction storage ordering.
Pubkeys persistence & manager
packages/komodo_defi_sdk/lib/src/pubkeys/hive_pubkeys_adapters.dart, packages/komodo_defi_sdk/lib/src/pubkeys/pubkeys_storage.dart, packages/komodo_defi_sdk/lib/src/pubkeys/pubkey_manager.dart
Add Hive adapters and HivePubkeysStorage, PubkeysStorage interface, storage-backed hydration/persistence, and in-flight deduplication in PubkeyManager.
NFT activation & activation exceptions
packages/komodo_defi_sdk/lib/src/activation/nft_activation_service.dart, packages/komodo_defi_sdk/lib/src/activation/activation_exceptions.dart, packages/komodo_defi_sdk/lib/src/activation/activation_manager.dart, packages/komodo_defi_sdk/lib/src/activation/_activation_index.dart
Add NftActivationService (retry/backoff), ActivatedAssetsCache dependency, and a hierarchy of ActivationFailedException types; ActivationManager updated to accept cache and new isAssetActive(forceRefresh) option.
Repository disposal & tests
packages/komodo_cex_market_data/lib/..., packages/komodo_cex_market_data/test/..., packages/komodo_cex_market_data/lib/src/bootstrap/market_data_bootstrap.dart, packages/komodo_cex_market_data/lib/src/cex_repository.dart
Add CexRepository.dispose hook, implement dispose in repositories/providers (no-op or closing owned clients), add ownership flags, and register dispose callbacks in bootstrap; tests updated with no-op dispose implementations.
Auth enhancements & shutdown handling
packages/komodo_defi_local_auth/lib/src/auth/auth_service.dart, packages/komodo_defi_local_auth/lib/src/auth/auth_service_auth_extension.dart, packages/komodo_defi_local_auth/lib/src/auth/auth_service_operations_extension.dart
Add a 5-minute users cache with invalidation, isImported metadata on registration, shutdown-signal subscription handling immediate sign-out, adjusted health-check cadence/handling, and caching/invalidation helpers.
Pubspec, build, CLI, native cleanup, misc
packages/komodo_defi_framework/app_build/build_config.json, packages/komodo_coin_updates/lib/src/coins_config/_coins_config_index.dart, packages/komodo_defi_framework/lib/src/operations/kdf_operations_native.dart, packages/komodo_wallet_cli/bin/update_api_config.dart, packages/komodo_defi_framework/pubspec.yaml
Update build metadata and checksums, name a library, close native HTTP client in dispose, improve CLI checksum update logging/behavior, add web asset path and add flutter_client_sse dependency.

Sequence Diagram(s)

sequenceDiagram
    participant App
    participant SDK
    participant EventMgr as EventStreamingManager
    participant Service as KdfEventStreamingService
    participant Platform as Platform (IO/Web/Worker)
    participant KDF

    App->>SDK: initialize()
    SDK->>EventMgr: create(EventService)
    EventMgr->>Service: initialize/connectIfNeeded()
    Service->>Platform: connectEventStream(onMessage)
    Platform->>KDF: open SSE / SharedWorker registers
    KDF-->>Platform: send event (SSE message)
    Platform-->>Service: onMessage(raw)
    Service->>Service: parse -> KdfEvent
    Service-->>EventMgr: push typed event
    EventMgr-->>App: StreamSubscription emits (BalanceEvent, etc.)
Loading
sequenceDiagram
    participant BalanceMgr as BalanceManager
    participant Cache as ActivatedAssetsCache
    participant EventMgr as EventStreamingManager
    participant Service as KdfEventStreamingService

    BalanceMgr->>Cache: getActivatedAssetIds(forceRefresh?)
    Cache-->>BalanceMgr: returns set
    alt supportsBalanceStreaming
        BalanceMgr->>EventMgr: subscribeToBalance(coin)
        EventMgr->>Service: enable stream via RPC
        Service-->>EventMgr: BalanceEvent stream
        EventMgr-->>BalanceMgr: StreamSubscription<BalanceEvent>
        BalanceMgr-->>App: emit real-time balance updates
    else
        BalanceMgr->>BalanceMgr: start polling timer
        BalanceMgr-->>App: emit polled balance updates
    end
Loading

Estimated code review effort

🎯 5 (Critical) | ⏱️ ~120 minutes

  • Areas needing focused review:
    • event_streaming_service, platform IO/web interop, and worker script (SSE parsing, first-byte handling, reconnection/backoff)
    • EventStreamingManager: ref-counting, in-flight enable dedupe, RPC enable/disable and error/UnknownClient paths
    • ActivatedAssetsCache: concurrency, generation invalidation, pending completer correctness and disposal
    • Pubkeys Hive adapters/storage: adapter typeIds, migration/back-compat, and stored data shape
    • BalanceManager & TransactionHistoryManager: streaming vs polling fallbacks, timers, subscription lifecycles, activation error propagation
    • Bootstrap/DI: constructor signature changes, ordering and dispose registration across singletons

Possibly related issues

Possibly related PRs

Suggested labels

bug, enhancement

Suggested reviewers

  • takenagain

"I hop through threads and parse each streaming line,
I close the sockets gently, tidy caches, all divine.
Workers hum like wind and events fall like rain,
I guard the state and tidy logs — a rabbit's small refrain.
Hooray for streams, disposes, and a clean deploy again!" 🐇✨

Pre-merge checks and finishing touches

❌ Failed checks (1 warning)
Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 0.00% which is insufficient. The required threshold is 80.00%. You can run @coderabbitai generate docstrings to improve docstring coverage.
✅ Passed checks (2 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title Check ✅ Passed The PR title "fix(rpc): minimise RPC usage with comprehensive caching and streaming…" directly corresponds to the primary objectives and changes in this changeset. The changeset implements multiple caching mechanisms (ActivatedAssetsCache, Hive-based PubkeyStorage, TTL-aware caching) and extensive event streaming infrastructure (KdfEventStreamingService, EventStreamingManager with typed subscriptions) specifically to reduce RPC volume, which aligns perfectly with the stated title focus. The title is concise, uses specific technical terminology (caching and streaming), avoids vague language, and clearly communicates the main intent to a developer scanning commit history.
✨ Finishing touches
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Post copyable unit tests in a comment
  • Commit unit tests in branch dev

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull Request Overview

This PR introduces real-time event streaming capabilities to the SDK, replacing polling-based approaches with Server-Sent Events (SSE) for balance updates, transaction history, and other KDF events. The implementation includes fallback mechanisms to polling when streaming is unavailable, optimizations to reduce RPC spam, and improved activation logic.

  • Implements event streaming infrastructure with SSE support (native) and SharedWorker support (web)
  • Adds streaming-based balance and transaction history updates with automatic fallback to polling
  • Introduces activated assets cache with TTL to reduce repeated RPC calls
  • Optimizes new wallet behavior to reduce unnecessary RPC requests

Reviewed Changes

Copilot reviewed 80 out of 81 changed files in this pull request and generated 4 comments.

Show a summary per file
File Description
packages/komodo_defi_sdk/lib/src/streaming/event_streaming_manager.dart New internal manager for handling event stream lifecycle and subscriptions
packages/komodo_defi_sdk/lib/src/transaction_history/transaction_history_manager.dart Refactored to use streaming for transaction updates with polling fallback
packages/komodo_defi_sdk/lib/src/balances/balance_manager.dart Updated to use streaming for balance updates with stale-guard timers
packages/komodo_defi_sdk/lib/src/assets/activated_assets_cache.dart New cache to reduce repeated get_enabled_coins RPC calls
packages/komodo_defi_framework/lib/src/streaming/event_streaming_service.dart Core streaming service with platform-specific implementations
packages/komodo_defi_rpc_methods/lib/src/rpc_methods/streaming/*.dart New RPC method wrappers for streaming endpoints
packages/komodo_defi_types/lib/src/generic/sync_status.dart Simplified parsing using enum values
packages/komodo_defi_local_auth/lib/src/auth/auth_service.dart Added shutdown signal streaming and reduced health check frequency

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment thread packages/komodo_defi_types/lib/src/generic/sync_status.dart
// maintaining a safety net for detecting stale KDF instances.
_healthCheckTimer = Timer.periodic(
const Duration(seconds: 5),
const Duration(minutes: 5),
Copy link

Copilot AI Oct 29, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The comment on line 17 states 'Reduced from 5 minutes to 30 minutes' but the code shows Duration(minutes: 5). This inconsistency suggests either the comment or the code is incorrect. Based on the rationale to 'minimize RPC spam', the duration should likely be 30 minutes as stated in the comment.

Copilot uses AI. Check for mistakes.
Comment on lines +73 to +76
throw TransactionStorageException(
'Transaction not found in comparison: '
'${txA == null ? a : b} missing from transactions map',
);
Copy link

Copilot AI Oct 29, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The error message construction '${txA == null ? a : b}' will only show one missing transaction ID. If both txA and txB are null, only a will be reported. Consider improving the message to handle both cases: 'Transaction(s) not found in comparison: ${txA == null ? a : ''} ${txB == null ? b : ''}'.trim().

Copilot uses AI. Check for mistakes.
Comment on lines +150 to +152
eventStreamingConfiguration:
eventStreamingConfiguration ??
EventStreamingConfiguration.defaultConfig(),
Copy link

Copilot AI Oct 29, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[nitpick] Multi-line null-coalescing assignment is split across three lines unnecessarily. This can be simplified to a single line or two lines maximum for better readability: eventStreamingConfiguration: eventStreamingConfiguration ?? EventStreamingConfiguration.defaultConfig(),

Suggested change
eventStreamingConfiguration:
eventStreamingConfiguration ??
EventStreamingConfiguration.defaultConfig(),
eventStreamingConfiguration: eventStreamingConfiguration ?? EventStreamingConfiguration.defaultConfig(),

Copilot uses AI. Check for mistakes.
@github-actions
Copy link
Copy Markdown
Contributor

github-actions bot commented Oct 29, 2025

Visit the preview URL for this PR (updated for commit c2188c1):

https://komodo-playground--pr269-dev-10ruth21.web.app

(expires Thu, 06 Nov 2025 21:10:40 GMT)

🔥 via Firebase Hosting GitHub Action 🌎

Sign: 2bfedd77fdea45b25ba7c784416e81f177aa5c47

@github-actions
Copy link
Copy Markdown
Contributor

github-actions bot commented Oct 29, 2025

Visit the preview URL for this PR (updated for commit c2188c1):

https://kdf-sdk--pr269-dev-gnu1u1sj.web.app

(expires Thu, 06 Nov 2025 21:11:04 GMT)

🔥 via Firebase Hosting GitHub Action 🌎

Sign: 9c1b6e6c010cf0b965c455ba7a69c4aedafa8a1d

Copy link
Copy Markdown

@chatgpt-codex-connector chatgpt-codex-connector bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Codex Review

Here are some automated review suggestions for this pull request.

ℹ️ About Codex in GitHub

Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you

  • Open a pull request for review
  • Mark a draft as ready
  • Comment "@codex review".

If Codex has suggestions, it will comment; otherwise it will react with 👍.

Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".

Copy link
Copy Markdown
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 22

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (6)
packages/komodo_defi_sdk/lib/src/transaction_history/strategies/etherscan_transaction_history_strategy.dart (1)

118-119: Guard against division by zero in totalPages when pageSize can be 0

Transaction-based pagination sets pageSize=itemCount; if itemCount==0, this divides by zero.

Apply:

-        totalPages: (allTransactions.length / paginatedResults.pageSize).ceil(),
+        totalPages: paginatedResults.pageSize > 0
+            ? (allTransactions.length / paginatedResults.pageSize).ceil()
+            : 0,

Optionally validate itemsPerPage/itemCount > 0 in validatePagination or here and throw UnsupportedError for zero.

Also applies to: 196-213

packages/komodo_defi_sdk/lib/src/activation/activation_manager.dart (1)

63-66: Critical: _groupByPrimary can set a child as primary, breaking dedupe and history writes.

When a child asset is encountered before its parent, the group is created with primary = child. Because primary is final, it cannot be corrected later when the parent appears. This causes:

  • Activation dedupe keyed on a child’s AssetId instead of the parent.
  • History writes and log messages referencing the child rather than the parent.
  • Cleanup/removal keyed to the wrong id.

Fix: ensure the parent is always used as primary by looking it up at grouping time.

@@
-    final groups = _AssetGroup._groupByPrimary(assets);
+    final groups = _AssetGroup._groupByPrimary(assets, _assetLookup);
@@
-class _AssetGroup {
+class _AssetGroup {
   _AssetGroup({required this.primary, this.children})
@@
   static List<_AssetGroup> _groupByPrimary(List<Asset> assets) {
-    final groups = <AssetId, _AssetGroup>{};
-
-    for (final asset in assets) {
-      if (asset.id.parentId != null) {
-        // Child asset
-        final group = groups.putIfAbsent(
-          asset.id.parentId!,
-          () => _AssetGroup(primary: asset, children: {}),
-        );
-        group.children?.add(asset);
-      } else {
-        // Primary asset
-        groups.putIfAbsent(asset.id, () => _AssetGroup(primary: asset));
-      }
-    }
-
-    return groups.values.toList();
-  }
+    return _groupByPrimary(assets, _assetLookup /* placeholder to fail compile if not updated */);
+  }
+
+  static List<_AssetGroup> _groupByPrimary(
+    List<Asset> assets,
+    IAssetLookup assetLookup,
+  ) {
+    final groups = <AssetId, _AssetGroup>{};
+    for (final asset in assets) {
+      final parentId = asset.id.parentId;
+      if (parentId != null) {
+        // Ensure the group's primary is the parent asset.
+        final group = groups.putIfAbsent(
+          parentId,
+          () {
+            final parent = assetLookup.fromId(parentId) ??
+                (throw StateError('Parent asset $parentId not found'));
+            return _AssetGroup(primary: parent, children: <Asset>{});
+          },
+        );
+        group.children?.add(asset);
+      } else {
+        // Primary asset
+        groups.putIfAbsent(asset.id, () => _AssetGroup(primary: asset));
+      }
+    }
+    return groups.values.toList();
+  }

Note: The temporary call in the old no-arg _groupByPrimary provides a compile-time nudge to update call sites if any remain. If you prefer not to keep the delegating overload, remove it and update all call sites explicitly.

Also applies to: 312-329

packages/komodo_defi_sdk/lib/src/balances/balance_manager.dart (2)

145-194: Stale-guard timers leak across wallet changes; cancel them in _resetState.

Timers continue running after reset, wasting resources and emitting on stale controllers.

Apply this diff:

   Future<void> _resetState() async {
     _logger.fine('Resetting state');
     final stopwatch = Stopwatch()..start();

     final List<Future<void>> cleanupFutures = <Future<void>>[];
+    // Cancel stale-guard timers first
+    for (final timer in _staleBalanceTimers.values) {
+      timer.cancel();
+    }
+    _staleBalanceTimers.clear();

     final List<StreamSubscription<dynamic>> watcherSubs = _activeWatchers.values
         .toList();
     _activeWatchers.clear();

218-228: Don’t blanket-wrap all errors into StateError; preserve documented exceptions.

Docs say this may throw AuthException/TimeoutException. Preserve known types and only wrap unexpected ones.

Apply this diff:

-    } catch (e) {
-      // Rethrow with more context
-      throw StateError('Failed to get balance for ${assetId.name}: $e');
+    } catch (e) {
+      if (e is AuthException || e is TimeoutException || e is ActivationFailedException) {
+        rethrow;
+      }
+      throw StateError('Failed to get balance for ${assetId.name}: $e');
     }
packages/komodo_defi_sdk/lib/src/transaction_history/transaction_history_manager.dart (2)

803-833: Dispose leaks: balance fallback subscriptions not canceled; stale balance cache not cleared.

dispose() cancels tx-history subs and timers, but not _balanceFallbackSubscriptions; _lastBalanceForPolling also persists.

Apply:

     await _authSubscription?.cancel();
 
     for (final sub in _txHistorySubscriptions.values) {
       await sub.cancel();
     }
     _txHistorySubscriptions.clear();
 
+    // Cancel balance fallback subscriptions
+    for (final sub in _balanceFallbackSubscriptions.values) {
+      await sub.cancel();
+    }
+    _balanceFallbackSubscriptions.clear();
+
     final timers = _pollingTimers.values.toList();
     _pollingTimers.clear();
     for (final timer in timers) {
       timer.cancel();
     }
 
     final controllers = _streamControllers.values.toList();
     _streamControllers.clear();
     for (final controller in controllers) {
       await controller.close();
     }
 
     _syncInProgress.clear();
 
     // Cancel confirmations refresh timers
     for (final timer in _confirmationsTimers.values) {
       timer.cancel();
     }
     _confirmationsTimers.clear();
+
+    // Drop last-known balances
+    _lastBalanceForPolling.clear();

345-361: onCancel can NPE if controller removed concurrently.

_streamControllers[asset.id]!.hasListener force-unwrap can fail if the controller was closed/removed (e.g., logout). Add a null-safe read.

Apply:

-        onCancel: () async {
-          if (!_streamControllers[asset.id]!.hasListener) {
-            _stopStreaming(asset.id);
-            await _streamControllers[asset.id]?.close();
-            _streamControllers.remove(asset.id);
-          }
-        },
+        onCancel: () async {
+          final controller = _streamControllers[asset.id];
+          if (controller == null) return;
+          if (!controller.hasListener) {
+            _stopStreaming(asset.id);
+            await controller.close();
+            _streamControllers.remove(asset.id);
+          }
+        },
🧹 Nitpick comments (46)
packages/komodo_defi_types/lib/src/generic/sync_status.dart (1)

15-17: Remove unnecessary enum prefix removal in sanitization.

The .replaceAll('SyncStatusEnum.', '') line is defensive but not needed. All data sources parsed by tryParse() (API responses, database fields) use plain enum value names like 'Ok' or 'InProgress', never with the 'SyncStatusEnum.' prefix. The case-insensitive matching is appropriate; simplify by removing line 16:

final sanitizedValue = value.toLowerCase();
packages/komodo_defi_rpc_methods/lib/src/rpc_methods/utility/rpc_task_shepherd.dart (1)

25-29: Good addition: Clear guidance on the streaming alternative.

The documentation appropriately calls out the polling approach's limitation and directs developers toward the event-based alternative. This aligns well with the PR's goal of reducing RPC calls through streaming.

Consider adding a brief usage example in a future update to help developers transition more smoothly from polling to streaming (e.g., showing side-by-side comparison of the polling vs. streaming approaches).

packages/komodo_wallet_cli/bin/update_api_config.dart (1)

429-453: Consider defensive checksum initialization for enhanced robustness, but not critical.

After verification, the edge case you flagged is not currently present—all platforms in the config have checksums defined. The code logic is sound: the first platform update creates the checksums list, and subsequent updates with the same commit hash safely assume it exists.

However, the edge case is theoretically possible if the config is manually edited or modified by external code. The defensive check you suggested (adding as List<dynamic>? with null coalescing) improves robustness without performance cost and prevents unexpected crashes if the config becomes inconsistent.

The suggested fix is reasonable as optional defensive programming. If this script is the sole manager of the config file, the risk is minimal; if other tools may modify it externally, the defensive check becomes more valuable.

packages/komodo_defi_local_auth/test/src/trezor/trezor_auth_service_test.dart (3)

270-273: Redundant cascade; keep for clarity or remove

users already defaults to an empty list in _FakeAuthService. You can drop the cascade for brevity, or keep it for explicitness. If this pattern repeats, consider a helper like _FakeAuthService.empty().


451-466: Fixture looks correct; add negative side‑effect checks and teardown

To make the “no stored password” path more robust, assert that no repo/monitor side effects happened and ensure cleanup even on early failure.

Apply:

@@
-      await expectLater(
+      await expectLater(
         service.signIn(
@@
       );
+
+      // No device init side effects expected on early throw
+      expect(monitor.started, isFalse);
+      expect(repo.providedPassphrases, isEmpty);

Optionally add teardown right after repo creation to guarantee close on failure:

@@
-      final repo = _FakeTrezorRepository();
+      final repo = _FakeTrezorRepository();
+      addTearDown(repo.close);

525-526: Also assert monitor flags after signOut

You already check stopCalls. Add flag assertions to verify state reset.

Apply:

@@
         await service.signOut();
         expect(monitor.stopCalls, 1);
         expect(auth.signOutCalled, isTrue);
+        expect(monitor.started, isFalse);
+        expect(monitor.stopped, isTrue);
packages/komodo_defi_sdk/lib/src/activation/protocol_strategies/zhtlc_activation_strategy.dart (2)

71-73: Clamp user-provided polling interval; define 0/negative semantics.

Unbounded small values can spike RPC; 0/negative is ambiguous. Clamp to a sane range and document 0 meaning “use default” (or “disable”) explicitly.

Proposed minimal change:

-      final effectivePollingInterval =
-          userConfig.taskStatusPollingIntervalMs != null &&
-              userConfig.taskStatusPollingIntervalMs! > 0
-          ? Duration(milliseconds: userConfig.taskStatusPollingIntervalMs!)
-          : pollingInterval;
+      final ms = userConfig.taskStatusPollingIntervalMs;
+      // 0/neg => fallback to default; clamp to [200, 60000] ms to avoid RPC spam or stalls.
+      final effectivePollingInterval = Duration(
+        milliseconds: (ms != null && ms > 0)
+            ? (ms < 200 ? 200 : (ms > 60000 ? 60000 : ms))
+            : pollingInterval.inMilliseconds,
+      );

Please confirm whether “0” is intended to disable polling; if yes, handle as a distinct branch.


95-98: Avoid reconstructing ActivationMode; prefer copyWith to preserve fields.

Directly instantiating ActivationMode risks dropping future/unknown fields. Use copyWith on both params and mode if available.

-          params = params.copyWith(
-            mode: ActivationMode(
-              rpc: params.mode!.rpc,
-              rpcData: updatedRpcData,
-            ),
-          );
+          params = params.copyWith(
+            mode: params.mode!.copyWith(rpcData: updatedRpcData),
+          );

If ActivationMode lacks copyWith, consider adding it or ensure all relevant fields are passed through here.

packages/komodo_defi_types/lib/src/coin_classes/protocol_class.dart (1)

144-172: Consider extracting shared unsupported protocol checks.

The checks for slp and tendermintToken are duplicated between the two methods, and the qrc20 child asset check appears in both. While minor, extracting shared logic could improve maintainability.

Example refactor:

bool _isUnsupportedForStreaming() {
  return subClass == CoinSubClass.slp ||
         subClass == CoinSubClass.tendermintToken;
}

bool _isQrc20ChildAsset(bool isChildAsset) {
  return subClass == CoinSubClass.qrc20 && isChildAsset;
}

bool supportsBalanceStreaming({required bool isChildAsset}) {
  if (_isUnsupportedForStreaming() || 
      subClass == CoinSubClass.sia ||
      _isQrc20ChildAsset(isChildAsset)) {
    return false;
  }
  return true;
}

bool supportsTxHistoryStreaming({required bool isChildAsset}) {
  if (evmCoinSubClasses.contains(subClass) ||
      _isUnsupportedForStreaming() ||
      _isQrc20ChildAsset(isChildAsset)) {
    return false;
  }
  return true;
}
packages/komodo_defi_types/lib/src/assets/asset.dart (1)

88-95: Consider gating streaming by wallet-only and tighten docs

  • If isWalletOnly is true, streaming likely should be false to avoid accidental enables.
  • Wording says “KDF supports…”, but implementation delegates to protocol; clarify to avoid confusion.

Suggested tweak:

-  bool get supportsBalanceStreaming =>
-      protocol.supportsBalanceStreaming(isChildAsset: id.parentId != null);
+  bool get supportsBalanceStreaming =>
+      !isWalletOnly &&
+      protocol.supportsBalanceStreaming(isChildAsset: id.parentId != null);

-  bool get supportsTxHistoryStreaming =>
-      protocol.supportsTxHistoryStreaming(isChildAsset: id.parentId != null);
+  bool get supportsTxHistoryStreaming =>
+      !isWalletOnly &&
+      protocol.supportsTxHistoryStreaming(isChildAsset: id.parentId != null);
packages/komodo_defi_sdk/lib/src/transaction_history/strategies/etherscan_transaction_history_strategy.dart (1)

136-153: Add request timeout to prevent hangs and improve failure semantics

http.Client.get has no timeout; transient stalls will block history fetching and UI updates.

Inject a timeout:

-      final response = await _client.get(uri);
+      // Consider making this configurable.
+      final response = await _client.get(uri).timeout(const Duration(seconds: 15));

Also catch TimeoutException to map to an HttpException with a clear message.

packages/komodo_defi_rpc_methods/lib/src/rpc_methods/streaming/streaming_balance_enable.dart (1)

7-37: RPC request shape looks correct; consider small ergonomics

  • Add a convenience factory taking Asset to reduce call-site string coupling.
  • Optionally assert coin.isNotEmpty to catch early misuse.

Example:

+  factory StreamBalanceEnableRequest.forAsset({
+    required String rpcPass,
+    required Asset asset,
+    int? clientId,
+    StreamConfig? config,
+  }) =>
+      StreamBalanceEnableRequest(
+        rpcPass: rpcPass,
+        coin: asset.id.id,
+        clientId: clientId,
+        config: config,
+      );
packages/komodo_defi_rpc_methods/lib/src/rpc_methods/streaming/streaming_tx_history_enable.dart (1)

7-35: Consistent with streaming API; add convenience factory for Asset

Good addition. For DX and type-safety, mirror the balance enable convenience factory with an Asset-based constructor.

+  factory StreamTxHistoryEnableRequest.forAsset({
+    required String rpcPass,
+    required Asset asset,
+    int? clientId,
+  }) =>
+      StreamTxHistoryEnableRequest(
+        rpcPass: rpcPass,
+        coin: asset.id.id,
+        clientId: clientId,
+      );
packages/komodo_defi_sdk/lib/src/pubkeys/pubkeys_storage.dart (1)

27-39: Add optional encryption and lifecycle helpers; current JSON shape LGTM

  • Consider supporting an optional HiveAesCipher to encrypt address data at rest; addresses are sensitive metadata.
  • Provide dispose() and clearForWallet(walletId) to clean up on logout/account removal.

Sketch:

-class HivePubkeysStorage implements PubkeysStorage {
+class HivePubkeysStorage implements PubkeysStorage {
+  HiveCipher? _cipher;
+  HivePubkeysStorage({HiveCipher? cipher}) : _cipher = cipher;
@@
-    _box = await Hive.openBox<HiveAssetPubkeysRecord>(_boxName);
+    _box = await Hive.openBox<HiveAssetPubkeysRecord>(
+      _boxName,
+      encryptionCipher: _cipher,
+    );
@@
+  Future<void> dispose() async {
+    if (_box?.isOpen == true) await _box!.close();
+    _box = null;
+  }
+
+  Future<void> clearForWallet(WalletId walletId) async {
+    final box = await _openBox();
+    final prefix = '${walletId.compoundId}|';
+    final toDelete = box.keys.whereType<String>().where((k) => k.startsWith(prefix)).toList();
+    await box.deleteAll(toDelete);
+  }

Also applies to: 41-73

packages/komodo_cex_market_data/lib/src/sparkline_repository.dart (1)

180-200: Consider clearing in-flight requests on disposal.

The disposal logic properly cleans up repositories and the Hive box. However, the _inFlightRequests map is not cleared. If disposal occurs while requests are pending, this could lead to memory leaks or unexpected behavior.

Consider adding cleanup for in-flight requests:

  Future<void> dispose() async {
+   // Cancel and clear any in-flight requests
+   _inFlightRequests.clear();
+
    for (final repository in _repositories) {
      try {
        repository.dispose();
packages/komodo_defi_framework/lib/src/streaming/events/network_event.dart (1)

4-28: Make constructor const; confirm payload keys.

Implementation looks solid. Two small nits:

  • Mark the constructor const for cheaper allocations in tests and logs.
  • Please verify the stream payload really uses keys 'netid' and 'peers' across all supported KDF builds.
-class NetworkEvent extends KdfEvent {
-  NetworkEvent({
+class NetworkEvent extends KdfEvent {
+  const NetworkEvent({
     required this.netid,
     required this.peers,
   });
packages/komodo_defi_sdk/lib/src/sdk/komodo_defi_sdk_config.dart (1)

12-13: Guard TTL against negatives and confirm wiring into ActivatedAssetsCache.

Add an assert to prevent accidental negative durations. Also confirm the TTL is passed to ActivatedAssetsCache wherever it’s constructed.

   const KomodoDefiSdkConfig({
@@
-    this.activatedAssetsCacheTtl = const Duration(seconds: 2),
+    this.activatedAssetsCacheTtl = const Duration(seconds: 2),
@@
   final Duration activatedAssetsCacheTtl;
@@
   KomodoDefiSdkConfig copyWith({
@@
   }) {
-    return KomodoDefiSdkConfig(
+    return KomodoDefiSdkConfig(
+      // Disallow negative TTLs at construction time.
+      // ignore: prefer_asserts_in_initializer_lists
+      // (asserts in body for const-compat classes)
+      // Note: If you prefer ctor-time asserts, move this to the ctor instead.
+      // Keep one approach consistently across config types.
+      // This protects against accidental negative durations that would break cache semantics.
+      // (If your lints disallow body asserts, place it in the constructor initializer list.)
+      assert((activatedAssetsCacheTtl ?? this.activatedAssetsCacheTtl) >= Duration.zero),
       defaultAssets: defaultAssets ?? this.defaultAssets,

Also applies to: 34-37, 62-64

packages/komodo_defi_framework/lib/src/streaming/event_streaming_platform_stub.dart (1)

5-11: Add a debug-time warning to catch accidental use of the stub.

The no-op is fine for unsupported platforms, but it can hide misconfiguration. Emit a debug-only notice and underscore param names to quiet lints.

-EventStreamUnsubscribe connectEventStream({
-  IKdfHostConfig? hostConfig,
-  required void Function(Object? data) onMessage,
-}) {
-  // No-op default implementation; actual logic provided by IO/Web variants
-  return () {};
-}
+EventStreamUnsubscribe connectEventStream({
+  IKdfHostConfig? _hostConfig,
+  required void Function(Object? data) _onMessage,
+}) {
+  // No-op default implementation; actual logic provided by IO/Web variants.
+  // Debug hint to surface accidental stub usage.
+  assert(() {
+    // Keep this extremely lightweight to avoid side-effects.
+    // ignore: avoid_print
+    print('[EventStreaming] Using platform stub (no stream connected). '
+          'Ensure IO/Web platform implementation is picked via conditional imports.');
+    return true;
+  }());
+  return () {};
+}
packages/komodo_defi_framework/lib/src/streaming/events/balance_event.dart (1)

4-28: Const ctor; confirm event key aligns with asset-config-ID transition.

  • Mark constructor const for minor perf wins.
  • Given the PR’s shift to asset config IDs for subscriptions, confirm the event payload still uses 'coin' (ticker). If payloads move to config IDs (e.g., 'asset_id'), consider adding a parallel field or migration path.
-class BalanceEvent extends KdfEvent {
-  BalanceEvent({
+class BalanceEvent extends KdfEvent {
+  const BalanceEvent({
     required this.coin,
     required this.balance,
   });
packages/komodo_defi_sdk/lib/src/activation/activation_manager.dart (1)

241-252: Error fallback returns empty set; consider last-known-good to avoid flapping.

On failure, returning {} will mark everything inactive transiently, potentially triggering redundant work. If the cache can expose a snapshot, consider falling back to it before {}.

packages/komodo_defi_framework/lib/src/streaming/events/unknown_event.dart (1)

14-16: Avoid throwing from typeEnum; return a sentinel or ensure it’s never read

Throwing from a simple getter risks unexpected crashes if generic code accesses typeEnum for logging/metrics. Prefer a sentinel (e.g., EventTypeString.unknown) or make the contract explicit and guard all call sites. If a sentinel exists, switch to it; otherwise document and harden callers.

If EventTypeString.unknown exists, apply:

-  @override
-  EventTypeString get typeEnum =>
-      throw UnsupportedError('UnknownEvent does not have a type enum mapping');
+  @override
+  EventTypeString get typeEnum => EventTypeString.unknown;
packages/komodo_defi_framework/lib/src/streaming/events/task_event.dart (1)

8-10: Make taskData immutable to prevent accidental mutation

Wrap the parsed map to avoid downstream writes mutating shared state.

Apply:

-  factory TaskEvent.fromJson(JsonMap json, int taskId) {
-    return TaskEvent(taskId: taskId, taskData: json);
-  }
+  factory TaskEvent.fromJson(JsonMap json, int taskId) {
+    return TaskEvent(taskId: taskId, taskData: Map.unmodifiable(json));
+  }

Overall event wiring looks good.

Also applies to: 18-19

packages/komodo_defi_framework/lib/src/streaming/event_streaming_platform_web.dart (1)

28-41: Use SharedWorker feature‑detection, pass hostConfig to worker, and add error handlers

  • hostConfig is currently ignored; if the worker needs RPC origin/credentials, post an initial config message.
  • Prefer explicit feature detection over exception flow for environments without SharedWorker.
  • Add onerror/onmessageerror handlers and log the catch path for observability.

Apply minimal guards and error handlers:

@@
-import 'package:js/js_util.dart' as jsu;
+import 'package:js/js_util.dart' as jsu;
@@
-  try {
-    final Object sharedWorkerCtor = _getGlobalProperty('SharedWorker');
+  try {
+    // Bail early if SharedWorker is not available
+    if (!jsu.hasProperty(jsu.globalThis, 'SharedWorker')) {
+      if (kDebugMode) print('EventStream: SharedWorker not supported');
+      return () {};
+    }
+    final Object sharedWorkerCtor = _getGlobalProperty('SharedWorker');
@@
-    _callMethod<void>(port, 'start', const <Object>[]);
+    _callMethod<void>(port, 'start', const <Object>[]);
+    // Optionally pass host configuration to the worker on connect
+    if (hostConfig != null) {
+      _callMethod<void>(port, 'postMessage', <Object?>[hostConfig]);
+    }
@@
-    _setProperty(port, 'onmessage', jsu.allowInterop(handler));
+    _setProperty(port, 'onmessage', jsu.allowInterop(handler));
+    _setProperty(port, 'onerror', jsu.allowInterop((Object e) {
+      if (kDebugMode) print('EventStream: port error: $e');
+    }));
+    _setProperty(port, 'onmessageerror', jsu.allowInterop((Object e) {
+      if (kDebugMode) print('EventStream: message error: $e');
+    }));
@@
   } catch (_) {
-    return () {};
+    if (kDebugMode) print('EventStream: failed to initialize SharedWorker');
+    return () {};
   }

Note: replace hostConfig payload shape with what your worker expects (e.g., map of baseUrl/clientId).

Also applies to: 51-58, 59-61

packages/komodo_defi_rpc_methods/lib/src/rpc_methods/streaming/streaming_orderbook_enable.dart (1)

21-33: Confirm base/rel are asset config IDs, not display names

PR states streams now use asset config IDs (asset.id.id). Ensure callers pass IDs here; otherwise orderbook subscriptions may not match new back‑end expectations.

Optionally assert non‑empty IDs in debug:

   StreamOrderbookEnableRequest({
     required String rpcPass,
     required this.base,
     required this.rel,
     this.clientId,
   }) : super(
          method: 'stream::orderbook::enable',
          rpcPass: rpcPass,
          mmrpc: RpcVersion.v2_0,
        );
+  // Debug-time sanity
+  assert(base.isNotEmpty && rel.isNotEmpty, 'base/rel must be non-empty asset IDs');
packages/komodo_defi_local_auth/lib/src/auth/auth_service_operations_extension.dart (1)

60-63: Align log with actual platform guard

You return early for Web and Windows but log only “Web”. Clarify for Windows too.

Apply:

-      _logger.info('Shutdown signal stream not supported on Web');
+      _logger.info('Shutdown signal stream not supported on Web/Windows');
packages/komodo_defi_framework/lib/komodo_defi_framework.dart (2)

110-116: Double-check initialize() contract

..initialize() is not awaited. If initialize() is async (returns Future), consider either returning a Future<KdfEventStreamingService> or explicitly ignoring via unawaited(streaming.initialize()) and documenting eventual consistency.


63-66: Default debug RPC logging to false in production builds

enableDebugLogging = true can be noisy and may leak operational details. Recommend default false and enabling via config/feature flag.

-  static bool enableDebugLogging = true;
+  static bool enableDebugLogging = false;
packages/komodo_defi_framework/lib/src/streaming/events/shutdown_signal_event.dart (1)

13-17: Harden JSON parsing; align with doc fallback

Parsing only message can throw if payload differs. Add tolerant parsing and fallback to UNKNOWN(<id>) to match the doc.

-  factory ShutdownSignalEvent.fromJson(JsonMap json) {
-    return ShutdownSignalEvent(
-      signalName: json.value<String>('message'),
-    );
-  }
+  factory ShutdownSignalEvent.fromJson(JsonMap json) {
+    final msg = json.tryValue<String>('message');
+    final name = msg ??
+        json.tryValue<String>('signal') ??
+        (json.tryValue<int>('signal_id') != null
+            ? 'UNKNOWN(${json.tryValue<int>('signal_id')})'
+            : 'UNKNOWN');
+    return ShutdownSignalEvent(signalName: name);
+  }

Also applies to: 19-21

packages/komodo_defi_framework/lib/src/streaming/events/heartbeat_event.dart (1)

10-14: Parse timestamp defensively

Some emitters send numbers as num or numeric strings. Be lenient and coerce to int.

-  factory HeartbeatEvent.fromJson(JsonMap json) {
-    return HeartbeatEvent(
-      timestamp: json.value<int>('timestamp'),
-    );
-  }
+  factory HeartbeatEvent.fromJson(JsonMap json) {
+    final ts = json.tryValue<int>('timestamp') ??
+        (json.tryValue<num>('timestamp')?.toInt()) ??
+        int.parse(json.value<String>('timestamp'));
+    return HeartbeatEvent(timestamp: ts);
+  }
packages/komodo_defi_framework/lib/src/streaming/events/order_status_event.dart (1)

13-18: Avoid throwing on malformed order payloads

If order is missing/invalid, fromJson will throw and may collapse the stream. Prefer a guarded parse with a clear error path (e.g., fallback to UnknownEvent upstream or return a minimal struct).

Do we have an UnknownEvent or a safe MyOrderInfo.tryFromJson? If yes, use it here to keep the stream resilient.

packages/komodo_defi_framework/lib/src/streaming/event_streaming_platform_io.dart (1)

10-10: Consider async unsubscribe typedef

Unsubscribe performs async cancel(). Make the API explicit:

-typedef EventStreamUnsubscribe = void Function();
+typedef EventStreamUnsubscribe = Future<void> Function();

Call sites can still ignore the returned future if desired.

packages/komodo_defi_local_auth/lib/src/auth/auth_service.dart (1)

275-307: Consider protecting user cache against concurrent initialization.

If multiple concurrent calls to getUsers() occur before the cache is populated, all will miss the cache and issue RPC calls. Consider using a mutex or single-flight pattern around cache initialization.

Example pattern:

Future<List<KdfUser>>? _ongoingUsersFetch;

return _runReadOperation(() async {
  // Return cached value if fresh
  if (_usersCache != null && _usersCacheTimestamp != null &&
      DateTime.now().difference(_usersCacheTimestamp!) < _usersCacheTtl) {
    return _usersCache!;
  }

  // Single-flight: if fetch in progress, await it
  if (_ongoingUsersFetch != null) {
    return _ongoingUsersFetch!;
  }

  // Start fetch and cache the future
  _ongoingUsersFetch = _fetchAndCacheUsers();
  try {
    return await _ongoingUsersFetch!;
  } finally {
    _ongoingUsersFetch = null;
  }
});
packages/komodo_defi_sdk/lib/src/komodo_defi_sdk.dart (1)

2-2: Unify logging and streaming manager import usage.

Looks good. Minor: consider routing init logs via the SDK’s logger callback for consistency instead of dart:developer directly, or wrap log(...) behind a small logger util to swap implementations easily.

Also applies to: 15-15

packages/komodo_defi_sdk/lib/src/pubkeys/hive_pubkeys_adapters.dart (2)

32-43: Compute total to avoid null where callers may expect a value.

If BalanceInfo.total is often used, set it to spendable + unspendable to reduce null-checks and surprises.

-  PubkeyInfo toDomain(String coinTicker) => PubkeyInfo(
+  PubkeyInfo toDomain(String coinTicker) => PubkeyInfo(
     address: address,
     derivationPath: derivationPath,
     chain: chain,
     balance: BalanceInfo(
-      total: null,
-      spendable: Decimal.parse(spendable),
-      unspendable: Decimal.parse(unspendable),
+      total: Decimal.parse(spendable) + Decimal.parse(unspendable),
+      spendable: Decimal.parse(spendable),
+      unspendable: Decimal.parse(unspendable),
     ),
     coinTicker: coinTicker,
   );

45-79: Future‑proof Hive layout with a schema/version byte.

Current read/write relies on strict field order. Consider prefixing a small version byte to enable additive fields later without migrations.

-  HiveStoredPubkey read(BinaryReader reader) {
+  HiveStoredPubkey read(BinaryReader reader) {
+    final version = reader.readByte(); // start using at v1
     final address = reader.readString();
@@
-  void write(BinaryWriter writer, HiveStoredPubkey obj) {
-    writer
+  void write(BinaryWriter writer, HiveStoredPubkey obj) {
+    writer
+      ..writeByte(1) // schema v1
       ..writeString(obj.address)

Apply similarly to HiveAssetPubkeysRecordAdapter.

packages/komodo_defi_rpc_methods/lib/src/rpc_methods/streaming/streaming_common.dart (2)

5-28: Generic type parameter T is unused.

Either remove <T extends KdfEvent> or use it to carry the event type metadata. Keeping unused generics increases complexity.

-class StreamEnableResponse<T extends KdfEvent> extends BaseResponse {
+class StreamEnableResponse extends BaseResponse {
@@
-  factory StreamEnableResponse.parse(JsonMap json) {
+  factory StreamEnableResponse.parse(JsonMap json) {
@@
-    return StreamEnableResponse<T>(
+    return StreamEnableResponse(

51-62: StreamConfig looks good; consider validating positive intervals.

Optional: guard against non‑positive values before serializing.

 class StreamConfig implements RpcRequestParams {
   const StreamConfig({this.streamIntervalSeconds});
@@
   @override
   JsonMap toRpcParams() => {
-    if (streamIntervalSeconds != null)
+    if (streamIntervalSeconds != null && streamIntervalSeconds! > 0)
       'stream_interval_seconds': streamIntervalSeconds,
   };
 }
packages/komodo_defi_framework/lib/src/config/kdf_startup_config.dart (1)

40-41: Streaming config semantics: make intent explicit (non-nullable vs. disable).

Right now factories always inject EventStreamingConfiguration.defaultConfig() when null is passed, making it impossible to intentionally disable streaming through public APIs. Either:

  • Make eventStreamingConfiguration non-nullable everywhere and always include it in start params; or
  • Treat null as "disable streaming" and propagate null to mm2 by not sending the key.

Pick one and document it to avoid ambiguity. If you keep it always-on, you can simplify by making the field non-nullable and dropping the null-check in encode.

Would you like me to generate a small follow-up to make the field non-nullable and always serialized?

Also applies to: 98-99, 150-153, 177-178, 206-209

packages/komodo_defi_sdk/lib/src/assets/asset_manager.dart (1)

223-224: Disposal invalidation has global side-effects.

Invalidating a shared cache during AssetManager.dispose() may flush cache for other consumers. If this manager doesn't own the cache's lifecycle, consider removing this call and rely on the cache's own auth-listener invalidation.

packages/komodo_defi_sdk/lib/src/assets/activated_assets_cache.dart (1)

45-84: Solid TTL + in-flight dedupe; return unmodifiable to protect cache.

Expose unmodifiable views to prevent external mutation of internal cache contents.

Apply this diff:

-      if (_generation == generation) {
-        _cache = assets;
-        _lastFetchAt = fetchStart; // Use start time for accurate TTL
-      }
+      if (_generation == generation) {
+        _cache = List<Asset>.unmodifiable(assets);
+        _lastFetchAt = fetchStart; // Use start time for accurate TTL
+      }

Optionally also:

-    return assets.map((asset) => asset.id).toSet();
+    return Set<AssetId>.unmodifiable(assets.map((a) => a.id));
packages/komodo_defi_sdk/lib/src/balances/balance_manager.dart (1)

540-612: Polling fallback path: clear and informative; consider reducing default log level.

enableDebugLogging defaults to true and logs at INFO every 30s per asset. Consider defaulting to false or lowering to FINE to avoid noisy logs in production.

-  static bool enableDebugLogging = true;
+  static bool enableDebugLogging = false;
packages/komodo_defi_sdk/lib/src/streaming/event_streaming_manager.dart (1)

35-36: Make clientId configurable (optional).

Hard‑coding _defaultClientId = 0 can clash if multiple clients exist. Consider passing clientId via constructor or deriving from app context.

Confirm whether KDF supports multiple concurrent client ids in your app context.

packages/komodo_defi_sdk/lib/src/transaction_history/transaction_history_manager.dart (4)

275-305: Reduce duplicates in getTransactionsStreamed: start API paging from latest stored txId.

Currently yields cached page then re-fetches page 1, causing likely duplicates. Initialize fromId from storage.

Apply:

-    String? fromId;
+    String? fromId = await _storage.getLatestTransactionId(
+      asset.id,
+      await _getCurrentWalletId(),
+    );
     var hasMore = true;

Optionally, if fromId != null, consider skipping the initial cached yield or documenting that duplicates may occur and should be de-duped by consumers.


745-801: Confirmations refresh re-emits full first page every 30s; consider emitting only changed txs.

This can spam listeners with duplicates and increase CPU/I/O, especially for active assets. Prefer diffing by txId + confirmation count before emitting.

Minimal approach:

  • After _batchStoreTransactions, fetch only transactions whose confirmations changed since last emit and emit those.
  • Alternatively, add a storeTransactions(..., returnUpsertsOnly: true) API to get upserted/changed entries to emit.

610-629: Balance-first fallback: force path may thrash on activation failures.

If activation fails, force: true path falls back to _pollNewTransactions, which calls _ensureAssetActivated again and retries with backoff elsewhere. Consider short-circuiting on known ActivationFailedException for a cooldown interval.


418-426: Clearing history closes the asset stream; confirm desired public behavior.

clearTransactionHistory stops streaming and closes/removes the controller, permanently ending watchTransactions for existing subscribers. If callers expect the stream to continue after clearing, keep the controller open and restart streaming/polling instead.

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 10b2e25 and c963681.

📒 Files selected for processing (81)
  • packages/komodo_cex_market_data/lib/src/binance/data/binance_repository.dart (1 hunks)
  • packages/komodo_cex_market_data/lib/src/bootstrap/market_data_bootstrap.dart (2 hunks)
  • packages/komodo_cex_market_data/lib/src/cex_repository.dart (1 hunks)
  • packages/komodo_cex_market_data/lib/src/coingecko/data/coingecko_repository.dart (1 hunks)
  • packages/komodo_cex_market_data/lib/src/coinpaprika/data/coinpaprika_cex_provider.dart (4 hunks)
  • packages/komodo_cex_market_data/lib/src/coinpaprika/data/coinpaprika_repository.dart (2 hunks)
  • packages/komodo_cex_market_data/lib/src/sparkline_repository.dart (2 hunks)
  • packages/komodo_cex_market_data/test/repository_priority_manager_test.dart (1 hunks)
  • packages/komodo_cex_market_data/test/repository_selection_strategy_test.dart (4 hunks)
  • packages/komodo_coin_updates/lib/src/coins_config/_coins_config_index.dart (1 hunks)
  • packages/komodo_defi_framework/app_build/build_config.json (4 hunks)
  • packages/komodo_defi_framework/assets/web/event_streaming_worker.js (1 hunks)
  • packages/komodo_defi_framework/lib/komodo_defi_framework.dart (9 hunks)
  • packages/komodo_defi_framework/lib/src/config/event_streaming_config.dart (1 hunks)
  • packages/komodo_defi_framework/lib/src/config/kdf_startup_config.dart (9 hunks)
  • packages/komodo_defi_framework/lib/src/operations/kdf_operations_native.dart (1 hunks)
  • packages/komodo_defi_framework/lib/src/streaming/event_streaming_platform_io.dart (1 hunks)
  • packages/komodo_defi_framework/lib/src/streaming/event_streaming_platform_stub.dart (1 hunks)
  • packages/komodo_defi_framework/lib/src/streaming/event_streaming_platform_web.dart (1 hunks)
  • packages/komodo_defi_framework/lib/src/streaming/event_streaming_service.dart (1 hunks)
  • packages/komodo_defi_framework/lib/src/streaming/events/balance_event.dart (1 hunks)
  • packages/komodo_defi_framework/lib/src/streaming/events/heartbeat_event.dart (1 hunks)
  • packages/komodo_defi_framework/lib/src/streaming/events/kdf_event.dart (1 hunks)
  • packages/komodo_defi_framework/lib/src/streaming/events/network_event.dart (1 hunks)
  • packages/komodo_defi_framework/lib/src/streaming/events/order_status_event.dart (1 hunks)
  • packages/komodo_defi_framework/lib/src/streaming/events/orderbook_event.dart (1 hunks)
  • packages/komodo_defi_framework/lib/src/streaming/events/shutdown_signal_event.dart (1 hunks)
  • packages/komodo_defi_framework/lib/src/streaming/events/swap_status_event.dart (1 hunks)
  • packages/komodo_defi_framework/lib/src/streaming/events/task_event.dart (1 hunks)
  • packages/komodo_defi_framework/lib/src/streaming/events/tx_history_event.dart (1 hunks)
  • packages/komodo_defi_framework/lib/src/streaming/events/unknown_event.dart (1 hunks)
  • packages/komodo_defi_framework/pubspec.yaml (2 hunks)
  • packages/komodo_defi_local_auth/lib/src/auth/auth_service.dart (12 hunks)
  • packages/komodo_defi_local_auth/lib/src/auth/auth_service_auth_extension.dart (2 hunks)
  • packages/komodo_defi_local_auth/lib/src/auth/auth_service_operations_extension.dart (2 hunks)
  • packages/komodo_defi_local_auth/test/src/trezor/trezor_auth_service_test.dart (3 hunks)
  • packages/komodo_defi_rpc_methods/lib/src/rpc_methods/rpc_methods.dart (1 hunks)
  • packages/komodo_defi_rpc_methods/lib/src/rpc_methods/streaming/streaming_balance_enable.dart (1 hunks)
  • packages/komodo_defi_rpc_methods/lib/src/rpc_methods/streaming/streaming_common.dart (1 hunks)
  • packages/komodo_defi_rpc_methods/lib/src/rpc_methods/streaming/streaming_disable.dart (1 hunks)
  • packages/komodo_defi_rpc_methods/lib/src/rpc_methods/streaming/streaming_heartbeat_enable.dart (1 hunks)
  • packages/komodo_defi_rpc_methods/lib/src/rpc_methods/streaming/streaming_network_enable.dart (1 hunks)
  • packages/komodo_defi_rpc_methods/lib/src/rpc_methods/streaming/streaming_order_status_enable.dart (1 hunks)
  • packages/komodo_defi_rpc_methods/lib/src/rpc_methods/streaming/streaming_orderbook_enable.dart (1 hunks)
  • packages/komodo_defi_rpc_methods/lib/src/rpc_methods/streaming/streaming_rpc_namespace.dart (1 hunks)
  • packages/komodo_defi_rpc_methods/lib/src/rpc_methods/streaming/streaming_shutdown_signal_enable.dart (1 hunks)
  • packages/komodo_defi_rpc_methods/lib/src/rpc_methods/streaming/streaming_swap_status_enable.dart (1 hunks)
  • packages/komodo_defi_rpc_methods/lib/src/rpc_methods/streaming/streaming_tx_history_enable.dart (1 hunks)
  • packages/komodo_defi_rpc_methods/lib/src/rpc_methods/utility/rpc_task_shepherd.dart (1 hunks)
  • packages/komodo_defi_rpc_methods/lib/src/rpc_methods_library.dart (2 hunks)
  • packages/komodo_defi_sdk/lib/komodo_defi_sdk.dart (1 hunks)
  • packages/komodo_defi_sdk/lib/src/activation/_activation_index.dart (1 hunks)
  • packages/komodo_defi_sdk/lib/src/activation/activation_exceptions.dart (1 hunks)
  • packages/komodo_defi_sdk/lib/src/activation/activation_manager.dart (4 hunks)
  • packages/komodo_defi_sdk/lib/src/activation/nft_activation_service.dart (1 hunks)
  • packages/komodo_defi_sdk/lib/src/activation/protocol_strategies/eth_task_activation_strategy.dart (1 hunks)
  • packages/komodo_defi_sdk/lib/src/activation/protocol_strategies/eth_with_tokens_activation_strategy.dart (1 hunks)
  • packages/komodo_defi_sdk/lib/src/activation/protocol_strategies/utxo_activation_strategy.dart (3 hunks)
  • packages/komodo_defi_sdk/lib/src/activation/protocol_strategies/zhtlc_activation_strategy.dart (3 hunks)
  • packages/komodo_defi_sdk/lib/src/activation/shared_activation_coordinator.dart (1 hunks)
  • packages/komodo_defi_sdk/lib/src/assets/_assets_index.dart (1 hunks)
  • packages/komodo_defi_sdk/lib/src/assets/activated_assets_cache.dart (1 hunks)
  • packages/komodo_defi_sdk/lib/src/assets/asset_manager.dart (4 hunks)
  • packages/komodo_defi_sdk/lib/src/balances/balance_manager.dart (8 hunks)
  • packages/komodo_defi_sdk/lib/src/bootstrap.dart (9 hunks)
  • packages/komodo_defi_sdk/lib/src/komodo_defi_sdk.dart (5 hunks)
  • packages/komodo_defi_sdk/lib/src/pubkeys/hive_pubkeys_adapters.dart (1 hunks)
  • packages/komodo_defi_sdk/lib/src/pubkeys/pubkey_manager.dart (11 hunks)
  • packages/komodo_defi_sdk/lib/src/pubkeys/pubkeys_storage.dart (1 hunks)
  • packages/komodo_defi_sdk/lib/src/sdk/komodo_defi_sdk_config.dart (4 hunks)
  • packages/komodo_defi_sdk/lib/src/streaming/event_streaming_manager.dart (1 hunks)
  • packages/komodo_defi_sdk/lib/src/transaction_history/strategies/etherscan_transaction_history_strategy.dart (2 hunks)
  • packages/komodo_defi_sdk/lib/src/transaction_history/strategies/zhtlc_transaction_strategy.dart (1 hunks)
  • packages/komodo_defi_sdk/lib/src/transaction_history/transaction_history_manager.dart (15 hunks)
  • packages/komodo_defi_sdk/lib/src/transaction_history/transaction_history_strategies.dart (2 hunks)
  • packages/komodo_defi_sdk/lib/src/transaction_history/transaction_storage.dart (4 hunks)
  • packages/komodo_defi_types/lib/src/assets/asset.dart (1 hunks)
  • packages/komodo_defi_types/lib/src/coin_classes/protocol_class.dart (1 hunks)
  • packages/komodo_defi_types/lib/src/generic/sync_status.dart (2 hunks)
  • packages/komodo_defi_types/lib/src/transactions/transaction_history_strategy.dart (1 hunks)
  • packages/komodo_wallet_cli/bin/update_api_config.dart (1 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
packages/komodo_defi_rpc_methods/lib/src/rpc_methods/rpc_methods.dart (1)
packages/komodo_defi_rpc_methods/lib/src/util/methods_index_generator.py (1)
  • generate_dart_code_refined (38-95)
🪛 Biome (2.1.2)
packages/komodo_defi_framework/assets/web/event_streaming_worker.js

[error] 5-6: A global variable should not be reassigned.

Assigning to a global variable can override essential functionality.

(lint/suspicious/noGlobalAssign)

🪛 Gitleaks (8.28.0)
packages/komodo_defi_framework/app_build/build_config.json

[high] 3-3: Detected a Generic API Key, potentially exposing access to various services and sensitive operations.

(generic-api-key)

⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (4)
  • GitHub Check: build_and_preview_playground_preview
  • GitHub Check: build_and_preview_sdk_example_preview
  • GitHub Check: Flutter tests (all packages)
  • GitHub Check: build_and_deploy_playground

Comment thread packages/komodo_defi_framework/assets/web/event_streaming_worker.js
Comment thread packages/komodo_defi_framework/assets/web/event_streaming_worker.js
Comment thread packages/komodo_defi_framework/lib/komodo_defi_framework.dart
Comment on lines 384 to 386
// Log full activation params for detailed debugging
_logger.fine('[ACTIVATION] Full params: $activationParams');
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Guard full activation params logging; risk of exposing sensitive endpoints

_logger.fine('[ACTIVATION] Full params: $activationParams') runs unconditionally. Activation params can include full RPC URLs (potential API keys) and node lists.

-        _logger.fine('[ACTIVATION] Full params: $activationParams');
+        if (KdfLoggingConfig.verboseLogging) {
+          _logger.fine('[ACTIVATION] Full params: $activationParams');
+        }

Optionally mask obvious secrets (e.g., redact query strings in rpc_urls). I can add a small _sanitizeActivationParams helper if desired.

🤖 Prompt for AI Agents
In packages/komodo_defi_framework/lib/komodo_defi_framework.dart around lines
384 to 386, the unconditional `_logger.fine('[ACTIVATION] Full params:
$activationParams')` may leak sensitive data (RPC URLs, API keys); update the
code to only log activation params when appropriate and to sanitize them first:
add or call a helper like `_sanitizeActivationParams` that redacts query strings
and known secret keys (e.g., remove or replace values for rpc_urls' query
params, api_key, token, password), and wrap the log behind the logger check
(e.g., only log if fine/debug level enabled) or a runtime config flag. Ensure
the logged output contains no raw secrets and retains only safe, redacted detail
for debugging.

Comment on lines +115 to +118
hdAccountId == null,
'HD Account ID is not supported yet in the SDK. '
'Use at your own risk.',
);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Assert-only HD account guard will be stripped in release; enforce at runtime.

Asserts don't run in release. If hdAccountId truly isn't supported, enforce it with a runtime check to prevent accidental prod usage.

Apply this diff:

-    assert(
-      hdAccountId == null,
-      'HD Account ID is not supported yet in the SDK. '
-      'Use at your own risk.',
-    );
+    if (hdAccountId != null) {
+      throw UnsupportedError(
+        'HD Account ID is not supported yet in the SDK.',
+      );
+    }
🤖 Prompt for AI Agents
In packages/komodo_defi_framework/lib/src/config/kdf_startup_config.dart around
lines 115-118, the current guard uses an assert which is stripped in release
builds; replace it with a runtime check that throws a descriptive exception when
hdAccountId is non-null (for example throw ArgumentError or StateError with the
existing message) so the unsupported HD account is prevented in all environments
and includes the same explanatory text.

Comment on lines +261 to +278
final addresses =
(raw['addresses'] as List<dynamic>?)?.cast<Map<String, dynamic>>() ??
const <Map<String, dynamic>>[];
final keys = <PubkeyInfo>[];
for (final addr in addresses) {
final bal = BalanceInfo.fromJson(
(addr['balance'] as Map).cast<String, dynamic>(),
);
keys.add(
PubkeyInfo(
address: addr['address'] as String,
derivationPath: addr['derivation_path'] as String?,
chain: addr['chain'] as String?,
balance: bal,
coinTicker: asset.id.id,
),
);
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion | 🟠 Major

Hydration parsing is brittle; guard per-address and tolerate missing fields.

A single malformed entry drops the entire hydration. Parse addresses defensively and skip bad entries instead of failing the whole asset.

Apply this diff:

-      final addresses =
-          (raw['addresses'] as List<dynamic>?)?.cast<Map<String, dynamic>>() ??
-          const <Map<String, dynamic>>[];
-      final keys = <PubkeyInfo>[];
-      for (final addr in addresses) {
-        final bal = BalanceInfo.fromJson(
-          (addr['balance'] as Map).cast<String, dynamic>(),
-        );
-        keys.add(
-          PubkeyInfo(
-            address: addr['address'] as String,
-            derivationPath: addr['derivation_path'] as String?,
-            chain: addr['chain'] as String?,
-            balance: bal,
-            coinTicker: asset.id.id,
-          ),
-        );
-      }
+      final rawAddresses = raw['addresses'];
+      final addrList = rawAddresses is List ? rawAddresses : const <dynamic>[];
+      final keys = <PubkeyInfo>[];
+      for (final item in addrList) {
+        try {
+          final addr = (item as Map).cast<String, dynamic>();
+          final balMap = addr['balance'];
+          final bal = balMap is Map<String, dynamic>
+              ? BalanceInfo.fromJson(balMap)
+              : BalanceInfo(total: Decimal.zero, spendable: Decimal.zero, unspendable: Decimal.zero);
+          final address = addr['address'] as String?;
+          if (address == null || address.isEmpty) continue;
+          keys.add(
+            PubkeyInfo(
+              address: address,
+              derivationPath: addr['derivation_path'] as String?,
+              chain: addr['chain'] as String?,
+              balance: bal,
+              coinTicker: asset.id.id,
+            ),
+          );
+        } catch (_) {
+          // Skip malformed entry
+        }
+      }

Note: add import 'package:decimal/decimal.dart'; if BalanceInfo requires Decimal zeros.

Also applies to: 280-294

🤖 Prompt for AI Agents
In packages/komodo_defi_sdk/lib/src/pubkeys/pubkey_manager.dart around lines
261-278 (and similarly at 280-294), the current hydration assumes every address
map and its fields are present and correctly typed; a single malformed entry
will throw and abort hydration. Change the loop to defensively validate and
parse each address: check types (Map, String) for address, derivation_path,
chain and balance before using, wrap per-address parsing in a try/catch or
conditional to skip invalid entries, supply safe defaults for missing optional
fields (e.g., null or empty) and construct BalanceInfo from a validated Map (or
create a zero BalanceInfo when missing). Also add import
'package:decimal/decimal.dart'; if BalanceInfo needs Decimal.zero for default
balances.

Comment on lines +74 to +86
Future<StreamSubscription<BalanceEvent>> subscribeToBalance({
required String coin,
StreamConfig? config,
}) => _subscribeToStream<BalanceEvent>(
key: 'balance:$coin',
enableStream: () => _rpcMethods.streaming.enableBalance(
coin: coin,
clientId: _defaultClientId,
config: config,
),
eventStream: _eventService.balanceEvents.where((e) => e.coin == coin),
);

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Stream keys ignore config/alwaysSend; can merge incompatible subscribers.

Balance, Network, and Heartbeat use keys that omit config/alwaysSend. First subscriber “wins” the remote config; later ones silently reuse it.

  • Include a stable hash of config/flags in the key, or
  • Validate new requests match the active config and throw if not.

Example:

-  key: 'balance:$coin',
+  key: 'balance:$coin:${config?.hashKey ?? ''}',

Similarly:

-  key: 'network',
+  key: 'network:${config?.hashKey ?? ''}:${alwaysSend == true}',
-  key: 'heartbeat',
+  key: 'heartbeat:${config?.hashKey ?? ''}:${alwaysSend == true}',

Where hashKey is a small, deterministic string derived from StreamConfig.

Also applies to: 149-160, 166-177

🤖 Prompt for AI Agents
In packages/komodo_defi_sdk/lib/src/streaming/event_streaming_manager.dart
around lines 74 to 86 (and similarly at 149-160 and 166-177), the stream keys
omit StreamConfig/alwaysSend so the first subscriber's remote config is reused
by later subscribers; update key generation to include a stable, deterministic
short hash derived from StreamConfig and any relevant flags (e.g., alwaysSend)
OR, alternatively, add runtime validation that a new subscribe call's
config/flags exactly match the active config for that key and throw an error if
they differ; ensure the hash function is stable, small (e.g., base64/hex of a
few bytes) and excludes non-deterministic fields, and apply the same change to
all affected subscribe methods so subscribers cannot silently merge incompatible
configs.

Comment on lines +193 to +217
/// Create a typed subscription that handles reference counting and cleanup.
StreamSubscription<T> _createTypedSubscription<T extends KdfEvent>(
String key,
Stream<T> stream,
) {
// Create a broadcast stream controller to wrap the original stream
// This allows us to properly handle cleanup
final controller = StreamController<T>.broadcast();

final innerSubscription = stream.listen(
controller.add,
onError: controller.addError,
onDone: controller.close,
);

// Wrap the subscription to handle cleanup on cancel
return _ManagedStreamSubscription<T>(
controller.stream.listen(null),
onCancel: () async {
await innerSubscription.cancel();
await controller.close();
await _handleStreamCancelled(key);
},
);
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Pause/resume doesn’t back‑pressure upstream; potential buffering/memory growth. Also no ref decrement on upstream onDone.

The returned subscription pauses only the proxy, not innerSubscription. Upstream continues to push into the controller buffer. On upstream completion, _handleStreamCancelled isn’t called.

Apply the following adjustments:

   StreamSubscription<T> _createTypedSubscription<T extends KdfEvent>(
     String key,
     Stream<T> stream,
   ) {
-    final controller = StreamController<T>.broadcast();
-
-    final innerSubscription = stream.listen(
-      controller.add,
-      onError: controller.addError,
-      onDone: controller.close,
-    );
-
-    return _ManagedStreamSubscription<T>(
-      controller.stream.listen(null),
-      onCancel: () async {
-        await innerSubscription.cancel();
-        await controller.close();
-        await _handleStreamCancelled(key);
-      },
-    );
+    final controller = StreamController<T>.broadcast();
+    // Upstream subscription (to eventService)
+    final upstream = stream.listen(
+      controller.add,
+      onError: controller.addError,
+      onDone: () async {
+        await controller.close();
+        await _handleStreamCancelled(key); // decrement refs on natural completion
+      },
+    );
+    // Downstream subscription (returned to caller)
+    final downstream = controller.stream.listen(null);
+    return _ManagedStreamSubscription<T>(
+      downstream,
+      upstream: upstream,
+      onCancel: () async {
+        await downstream.cancel();
+        await upstream.cancel();
+        await controller.close();
+        await _handleStreamCancelled(key);
+      },
+    );
   }

And enhance _ManagedStreamSubscription to propagate pause/resume upstream:

-class _ManagedStreamSubscription<T> implements StreamSubscription<T> {
-  _ManagedStreamSubscription(this._inner, {required this.onCancel});
+class _ManagedStreamSubscription<T> implements StreamSubscription<T> {
+  _ManagedStreamSubscription(this._inner, {required this.onCancel, required this.upstream});
 
   final StreamSubscription<T> _inner;
   final Future<void> Function() onCancel;
+  final StreamSubscription<T> upstream;
@@
   @override
   Future<void> cancel() async {
     await _inner.cancel();
-    await onCancel();
+    await onCancel(); // will also cancel upstream
   }
@@
   @override
   bool get isPaused => _inner.isPaused;
@@
   @override
   void pause([Future<void>? resumeSignal]) {
     _inner.pause(resumeSignal);
+    upstream.pause(resumeSignal);
   }
@@
   @override
   void resume() {
     _inner.resume();
+    upstream.resume();
   }

Also applies to: 298-343

🤖 Prompt for AI Agents
packages/komodo_defi_sdk/lib/src/streaming/event_streaming_manager.dart around
lines 193-217 (and similarly 298-343): the current _createTypedSubscription
wraps the upstream stream with a broadcast controller but does not propagate
pause/resume to the innerSubscription and does not call _handleStreamCancelled
on upstream onDone, which can cause unbounded buffering and missed ref
decrements; modify the function to create the innerSubscription first and pass
it into _ManagedStreamSubscription (or otherwise capture it) so that pause()
calls call innerSubscription.pause(), resume() calls innerSubscription.resume(),
and cancel() still cancels innerSubscription and closes the controller, and
ensure onDone of innerSubscription triggers controller.close() and calls
_handleStreamCancelled(key) (or decrements refs) so upstream completion is
handled; update _ManagedStreamSubscription to accept an optional upstream
Subscription and forward pause/resume/whenPaused/whenResumed to it and ensure
cleanup is idempotent to avoid double-calls.

Comment on lines +96 to +121
void _stopAllStreaming() {
if (_isDisposed) return;

// Cancel all polling timers
// Cancel all transaction history subscriptions
for (final sub in _txHistorySubscriptions.values) {
sub.cancel().ignore();
}
_txHistorySubscriptions.clear();

// Cancel polling timers
for (final timer in _pollingTimers.values) {
timer.cancel();
}
_pollingTimers.clear();

// Cancel confirmations refresh timers
for (final timer in _confirmationsTimers.values) {
timer.cancel();
}
_confirmationsTimers.clear();

for (final sub in _balanceFallbackSubscriptions.values) {
sub.cancel().ignore();
}
_balanceFallbackSubscriptions.clear();

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Logout cleanup misses _lastBalanceForPolling reset.

_stopAllStreaming() cancels subs/timers but keeps _lastBalanceForPolling, risking stale balance suppression after account switch.

Apply:

     _balanceFallbackSubscriptions.clear();
 
+    // Clear last-known balances to avoid stale suppression post-logout
+    _lastBalanceForPolling.clear();
+
     // Close controllers in a separate iteration to avoid modification during iteration

Committable suggestion skipped: line range outside the PR's diff.

🤖 Prompt for AI Agents
In
packages/komodo_defi_sdk/lib/src/transaction_history/transaction_history_manager.dart
around lines 96-121, _stopAllStreaming cancels subscriptions/timers but doesn't
reset _lastBalanceForPolling, which can cause stale balance suppression after an
account switch; after clearing _balanceFallbackSubscriptions (or at the end of
_stopAllStreaming), reset _lastBalanceForPolling to its initial state (set to
null if the field is nullable, or to the defined default/zero value if
non-nullable) so polling logic won't use stale balance after logout/account
change.

Comment on lines +158 to +161
// Only optimize for genuinely new wallets, not imported ones
final isImported = user.metadata['isImported'] == true;
final isNewWallet = previouslyEnabledAssets.isEmpty && !isImported;

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

Guard against null metadata to avoid NPE on first-run users.

user.metadata['isImported'] can throw if metadata is null. Use a safe lookup.

Apply:

-        final isImported = user.metadata['isImported'] == true;
+        final isImported =
+            (user.metadata?['isImported'] as bool?) == true;
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
// Only optimize for genuinely new wallets, not imported ones
final isImported = user.metadata['isImported'] == true;
final isNewWallet = previouslyEnabledAssets.isEmpty && !isImported;
// Only optimize for genuinely new wallets, not imported ones
final isImported =
(user.metadata?['isImported'] as bool?) == true;
final isNewWallet = previouslyEnabledAssets.isEmpty && !isImported;
🤖 Prompt for AI Agents
In
packages/komodo_defi_sdk/lib/src/transaction_history/transaction_history_manager.dart
around lines 158 to 161, the code accesses user.metadata['isImported'] directly
which can throw an NPE when metadata is null; change the check to use a
null-safe lookup and a default (e.g. treat missing metadata as not imported)
such as using the null-aware operator or null-coalescing
(user.metadata?['isImported'] == true or (user.metadata?['isImported'] as bool?
?? false)) so isImported is reliably false when metadata is null, preserving the
existing isNewWallet logic.

Copy link
Copy Markdown
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

🧹 Nitpick comments (1)
packages/komodo_defi_sdk/lib/src/balances/balance_manager.dart (1)

89-91: Consider making debug logging configurable rather than static.

The static mutable enableDebugLogging flag is convenient but could be improved by accepting it as a constructor parameter or reading from configuration. This would make testing and environment-specific logging easier to manage.

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 2bfb026 and 420cb6e.

📒 Files selected for processing (1)
  • packages/komodo_defi_sdk/lib/src/balances/balance_manager.dart (9 hunks)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (5)
  • GitHub Check: build_and_preview_sdk_example_preview
  • GitHub Check: build_and_preview_playground_preview
  • GitHub Check: build_and_deploy_playground
  • GitHub Check: build_and_deploy_sdk_example
  • GitHub Check: Flutter tests (all packages)
🔇 Additional comments (8)
packages/komodo_defi_sdk/lib/src/balances/balance_manager.dart (8)

3-11: New dependencies properly support streaming and optimization features.

The added imports enable precise balance arithmetic (decimal), activation error handling (activation_exceptions), first-time wallet optimization (AssetHistoryStorage), and event streaming (EventStreamingManager). All align with the PR objectives.


67-74: Constructor properly extended for streaming integration.

The required EventStreamingManager and optional AssetHistoryStorage parameters are correctly integrated. Field initialization follows best practices.

Also applies to: 85-86


343-372: First-time wallet optimization correctly implemented; relies on isImported metadata fix.

The logic properly emits an immediate zero balance for newly created (non-imported) wallets on first asset enablement, then fetches the real balance post-activation. This optimization depends on the isImported metadata being consistently set across all wallet creation paths, which is flagged in past review comments for auth_service.dart. Once that fix is applied, this code will work correctly.


405-534: Excellent streaming implementation with comprehensive fallback and error handling.

The balance streaming logic is well-architected:

  • Capability-based routing (streaming vs. polling)
  • Robust fallback mechanism with double-fallback protection
  • Proper resource tracking and lifecycle management
  • Cache updates and downstream notifications (pubkey refresh)
  • Multi-level error handling (try-catch, onError, onDone)

The integration with EventStreamingManager follows best practices, and the fallback to polling ensures reliability when streaming is unavailable.


634-692: Stale-balance guard provides reliability/freshness tradeoff for streaming.

The periodic guard (30s interval) supplements streaming by catching missed events and ensuring balance freshness. This design balances the PR objective of reducing RPC calls with the need for reliability:

  • Eliminated 60s polling replaced by streaming
  • 30s stale-guard runs as backup safety mechanism
  • Only emits on actual balance changes
  • Immediate one-shot refresh on watcher start (line 670)

The tradeoff is acceptable for production reliability. Minor: the unawaited(() async { ... }()) syntax on line 670 is unconventional but functional; consider Future(() async { ... }).ignore() for clarity.


536-619: Polling fallback method is well-structured and handles errors gracefully.

The extracted _startBalancePolling method provides a clean fallback path when streaming is unavailable. Error handling, lifecycle checks, and graceful degradation (returning lastKnown on failure) are all properly implemented.


162-167: Proper timer cleanup prevents resource leaks.

The stale-balance timers are correctly cancelled in both _resetState (on wallet change) and dispose() (on manager cleanup). This prevents timer leaks across wallet sessions and during teardown.

Also applies to: 758-763


378-391: Explicit activation failure handling improves error visibility.

The addition of ActivationFailedException when activation is requested but fails provides clear, typed error feedback to stream consumers. This is a good improvement over silent failures or generic errors.

url: url.toString(),
method: sset.SSERequestType.GET,
header: {
'userpass': cfg.rpcPassword,
Copy link
Copy Markdown

@mariocynicys mariocynicys Oct 30, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the SSE endpoint is unauthenticated. there is no userpass required. (though ofc u will need the userpass to perform enable requests via normal RPC to enable various streamers)

Copy link
Copy Markdown
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

♻️ Duplicate comments (10)
packages/komodo_defi_sdk/lib/src/komodo_defi_sdk.dart (3)

209-218: NftActivationService disposal is still missing.

As previously flagged, NftActivationService is exposed as a public getter but is not included in the disposal sequence (lines 507-523). This will leak resources if the service maintains any state or subscriptions.

Add disposal for NftActivationService:

     await Future.wait([
       _disposeIfRegistered<EventStreamingManager>((m) => m.dispose()),
       _disposeIfRegistered<KomodoDefiLocalAuth>((m) => m.dispose()),
       _disposeIfRegistered<AssetManager>((m) => m.dispose()),
       _disposeIfRegistered<ActivatedAssetsCache>((m) => m.dispose()),
+      _disposeIfRegistered<NftActivationService>((m) => m.dispose()),
       _disposeIfRegistered<ActivationManager>((m) => m.dispose()),

316-371: Wait loop should check disposal state and add jitter.

As previously noted, the polling loop (line 350) does not check if the SDK has been disposed, which can cause the loop to continue unnecessarily after disposal. Additionally, synchronized polling intervals across multiple callers can create request spikes.

Consider these improvements:

   while (true) {
+    _assertNotDisposed(); // Exit fast if disposed
     final enabled = await activatedAssetsCache.getActivatedAssetIds(
       forceRefresh: forceRefresh,
     );
     forceRefresh = false;

     final matched = enabled.intersection(targets).length;
     final coverage = matched / targets.length;
     if (coverage >= threshold) {
       return true;
     }

     if (stopwatch.elapsed >= timeout) {
       return false;
     }

     final remaining = timeout - stopwatch.elapsed;
-    await Future<void>.delayed(
-      remaining < pollInterval ? remaining : pollInterval,
-    );
+    final delay = remaining < pollInterval ? remaining : pollInterval;
+    // Add jitter (0-200ms) to reduce synchronized polling spikes
+    final jitter = delay.inMilliseconds > 200
+        ? Duration(milliseconds: DateTime.now().microsecondsSinceEpoch % 200)
+        : Duration.zero;
+    await Future<void>.delayed(delay + jitter);
   }

373-391: Parameter name "tickers" is misleading—should be "configIds".

As previously identified, the documentation explicitly states "Matches assets by config ID" (line 374), and the implementation uses findAssetsByConfigId (line 383), yet the parameter is named tickers. This creates confusion about what values should be passed.

Rename the parameter to match its actual purpose:

- /// Convenience helper that accepts asset tickers instead of [AssetId]s.
+ /// Convenience helper that accepts asset config IDs instead of [AssetId]s.
  /// Matches assets by config ID (`asset.id.id`) before delegating to
  /// [waitForEnabledAssetsToPassThreshold].
  Future<bool> waitForEnabledTickersToPassThreshold(
-   Iterable<String> tickers, {
+   Iterable<String> configIds, {
    double threshold = 0.5,
    Duration timeout = const Duration(seconds: 30),
    Duration pollInterval = const Duration(seconds: 2),
  }) {
-   final ids = tickers
-       .expand((ticker) => assets.findAssetsByConfigId(ticker))
+   final ids = configIds
+       .expand((configId) => assets.findAssetsByConfigId(configId))
        .map((asset) => asset.id);
    return waitForEnabledAssetsToPassThreshold(
      ids,
      threshold: threshold,
      timeout: timeout,
      pollInterval: pollInterval,
    );
  }

Alternatively, deprecate this method and introduce a correctly-named version to avoid breaking existing callers.

packages/komodo_defi_framework/lib/komodo_defi_framework.dart (2)

110-118: Streaming service still not disposed (duplicate concern)

As flagged in previous reviews, _streamingService is initialized but never cleaned up in dispose() (lines 459-472). This leaves SSE/SharedWorker connections open.

The fix remains the same—add cleanup to dispose():

   Future<void> dispose() async {
     // Cancel subscription first before closing the stream
     await _loggerSub?.cancel();
     _loggerSub = null;

     // Close the log stream
     if (!_logStream.isClosed) {
       await _logStream.close();
     }

+    // Dispose streaming service (SSE/SharedWorker) if initialized
+    final svc = _streamingService;
+    if (svc != null) {
+      await svc.dispose();
+      _streamingService = null;
+    }
+
     // Dispose of KDF operations to free native resources
     final operations = _kdfOperations;
     operations.dispose();
   }

386-387: Full activation params still logged unconditionally (duplicate concern)

Line 387 logs the complete activationParams at fine level, which can expose RPC URLs with embedded API keys, node lists, and other sensitive config. While fine is less visible than info, it's still logged when that level is enabled.

The previous review suggested wrapping behind a config check:

       _logger.info('[ACTIVATION] Parameters: $paramsSummary');

       // Log full activation params for detailed debugging
-      _logger.fine('[ACTIVATION] Full params: $activationParams');
+      if (KdfLoggingConfig.verboseLogging) {
+        _logger.fine('[ACTIVATION] Full params: $activationParams');
+      }

Better yet, sanitize URLs in activationParams before logging (redact query strings and known secret keys).

packages/komodo_defi_framework/lib/src/streaming/event_streaming_service.dart (1)

200-203: Dispose still prevents re-initialization (duplicate concern)

After dispose(), the service cannot be reused because _unsubscribe isn't nulled and _events is closed. The past review suggested either:

  1. Making it reusable, or
  2. Documenting that it's one-shot

To make it reusable:

   Future<void> dispose() async {
     _unsubscribe?.call();
+    _unsubscribe = null;
     await _events.close();
+    // If reuse is desired, recreate the stream controller:
+    // _events = StreamController<KdfEvent>.broadcast();
+    // _connectionState = SseConnectionState.disconnected;
+    // _firstByteCompleter = Completer<void>();
   }

Alternatively, document in the class comment that disposal is final and a new instance must be created.

packages/komodo_defi_sdk/lib/src/streaming/event_streaming_manager.dart (2)

227-240: Stream keys still omit config/alwaysSend, causing shared-config conflicts (duplicate concern)

As flagged previously, keys like 'balance:$coin' (line 231), 'network' (line 314), and 'heartbeat' (line 332) don't include StreamConfig or alwaysSend parameters. When multiple subscribers request the same stream with different configs, the first one's config is used for all.

The past review suggested including a config hash in the key:

-  key: 'balance:$coin',
+  key: 'balance:$coin:${_configHash(config)}',
-  key: 'network',
+  key: 'network:${_configHash(config)}:${alwaysSend == true}',

Where _configHash creates a stable, deterministic string from StreamConfig (or just serialize it). Alternatively, validate that new subscription requests match the active config and throw if they differ.

Also applies to: 310-340


357-381: Stream completion and pause/resume issues remain (duplicate concerns)

Two issues from past reviews are still present:

  1. Line 369: onDone: controller.close doesn't call _handleStreamCancelled, so reference count isn't decremented when upstream completes naturally.

  2. Pause/resume not propagated: _ManagedStreamSubscription doesn't forward pause/resume to innerSubscription, causing unbounded buffering when downstream is paused.

Apply the fix from the previous review:

   StreamSubscription<T> _createTypedSubscription<T extends KdfEvent>(
     String key,
     Stream<T> stream,
   ) {
     final controller = StreamController<T>.broadcast();

     final innerSubscription = stream.listen(
       controller.add,
       onError: controller.addError,
-      onDone: controller.close,
+      onDone: () async {
+        await controller.close();
+        await _handleStreamCancelled(key);
+      },
     );

     return _ManagedStreamSubscription<T>(
       controller.stream.listen(null),
+      upstream: innerSubscription,
       onCancel: () async {
         await innerSubscription.cancel();
         await controller.close();
         await _handleStreamCancelled(key);
       },
     );
   }

And update _ManagedStreamSubscription to propagate pause/resume (see lines 463-507).

packages/komodo_defi_framework/lib/src/streaming/event_streaming_platform_io.dart (2)

100-107: Nullable parameter force-unwrapped will crash

Line 106 force-unwraps hostConfig with !, but the parameter is nullable. This will throw if null is passed.

Fix by making it required:

 EventStreamUnsubscribe connectEventStream({
   required void Function(Object? data) onMessage,
   required void Function() onFirstByte,
-  IKdfHostConfig? hostConfig,
+  required IKdfHostConfig hostConfig,
   int clientId = _kDefaultClientId,
 }) {
-  final IKdfHostConfig cfg = hostConfig!;
+  final IKdfHostConfig cfg = hostConfig;

245-256: Async cleanup in sync return signature

The returned unsubscribe function is typed as void Function() (line 8), but line 250 performs await streamSubscription?.cancel(). Callers can't await cleanup, risking race conditions during disposal.

Either:

  1. Change typedef to typedef EventStreamUnsubscribe = Future<void> Function(); (breaking change), or
  2. Remove await and accept fire-and-forget cleanup:
   return () async {
     if (isClosed) return;
     isClosed = true;
     _log('SSE Disconnect: Closing connection (client_id=$clientId)');
     try {
-      await streamSubscription?.cancel();
+      streamSubscription?.cancel();
       httpClient?.close(force: true);
     } catch (e) {
       _log('SSE Disconnect: Error during cleanup - $e');
     }
   };

Note: The second option may leave subscriptions active briefly but avoids the type mismatch.

🧹 Nitpick comments (2)
packages/komodo_defi_framework/lib/src/streaming/event_streaming_platform_web.dart (1)

29-29: Unused hostConfig parameter

The hostConfig parameter is declared but never used. If web configuration isn't needed, consider removing it for API consistency. If it's planned for future use, document the intent.

packages/komodo_defi_framework/lib/src/streaming/event_streaming_platform_io.dart (1)

127-134: Recursive retry without await could exhaust stack

The retry logic uses unawaited(start()) which schedules the retry but doesn't wait for it. While this prevents blocking, deeply nested retries could theoretically exhaust resources.

Consider a retry loop instead of recursion:

for (int attempt = 0; attempt < maxRetries; attempt++) {
  final preflightOk = await _preflightCheck(cfg);
  if (preflightOk) break;
  if (attempt < maxRetries - 1) {
    await Future.delayed(retryDelay);
  }
}
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 420cb6e and c2188c1.

📒 Files selected for processing (8)
  • packages/komodo_defi_framework/lib/komodo_defi_framework.dart (9 hunks)
  • packages/komodo_defi_framework/lib/src/streaming/event_streaming_platform_io.dart (1 hunks)
  • packages/komodo_defi_framework/lib/src/streaming/event_streaming_platform_stub.dart (1 hunks)
  • packages/komodo_defi_framework/lib/src/streaming/event_streaming_platform_web.dart (1 hunks)
  • packages/komodo_defi_framework/lib/src/streaming/event_streaming_service.dart (1 hunks)
  • packages/komodo_defi_framework/pubspec.yaml (1 hunks)
  • packages/komodo_defi_sdk/lib/src/komodo_defi_sdk.dart (5 hunks)
  • packages/komodo_defi_sdk/lib/src/streaming/event_streaming_manager.dart (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
  • packages/komodo_defi_framework/lib/src/streaming/event_streaming_platform_stub.dart
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (4)
  • GitHub Check: build_and_preview_playground_preview
  • GitHub Check: build_and_preview_sdk_example_preview
  • GitHub Check: Flutter tests (all packages)
  • GitHub Check: setup
🔇 Additional comments (16)
packages/komodo_defi_framework/pubspec.yaml (1)

69-69: Asset path declaration looks good.

Adding assets/web/ enables the event streaming worker script and related web assets to be bundled with the framework package. The placement and format align with the existing asset declarations.

packages/komodo_defi_sdk/lib/src/komodo_defi_sdk.dart (2)

2-2: LGTM: Import additions support new streaming and logging functionality.

The dart:developer import enables logging throughout the SDK, and the event_streaming_manager import provides access to the internal streaming infrastructure needed for disposal at line 508.

Also applies to: 15-15


296-308: LGTM: Streaming service getter provides clean access to event streaming.

The getter follows SDK patterns consistently, includes clear documentation about SSE lifecycle management, and properly enforces initialization checks.

packages/komodo_defi_framework/lib/src/streaming/event_streaming_platform_web.dart (2)

38-40: Silent failure on missing port

Returning a no-op unsubscribe when port is null silently masks a critical failure. The SharedWorker API should always provide a port.

Consider logging or throwing to detect misconfiguration:

   final Object? portMaybe = _getProperty(worker, 'port');
-  if (portMaybe == null) return () {};
+  if (portMaybe == null) {
+    if (kDebugMode) {
+      print('EventStream: SharedWorker port is unexpectedly null');
+    }
+    return () {};
+  }
   final Object port = portMaybe;

43-58: LGTM! Clean first-byte tracking and message forwarding

The implementation correctly tracks the first message to signal connection readiness and forwards all messages to the callback. Good use of debug-only logging.

packages/komodo_defi_framework/lib/src/streaming/event_streaming_platform_io.dart (2)

38-81: Solid preflight check implementation

The preflight verification properly handles timeouts, cleanup, and error cases. Good use of finally block for client disposal.


84-98: LGTM! Proper SSE handshake validation

Correctly validates HTTP status and content-type before proceeding with stream consumption.

packages/komodo_defi_framework/lib/komodo_defi_framework.dart (2)

282-313: Excellent transport error detection and recovery

The errno-based detection of fatal socket errors (EPIPE, ECONNRESET, ETIMEDOUT, ECONNREFUSED) with automatic HTTP client reset is a robust solution for handling KDF crashes or network issues.


48-61: LGTM! Clean initialization pattern

The constructor properly initializes KDF operations and conditionally sets up log streaming. Good separation of concerns.

packages/komodo_defi_framework/lib/src/streaming/event_streaming_service.dart (3)

81-95: Disconnect properly resets state for reconnection

Good improvement! Lines 88 and 92-94 properly reset _unsubscribe and _firstByteCompleter, allowing connectIfNeeded() to establish a new connection after disconnect.


111-151: Robust event parsing with fallback strategies

The two-pass JSON parsing (direct parse, then decode-and-parse) handles both direct JSON objects and string-wrapped payloads. Good defensive error handling.


154-197: LGTM! Clean typed stream API

The generic whereEventType<T> with specific getters provides a type-safe, convenient API for consumers. The task ID filtering is a nice touch.

packages/komodo_defi_sdk/lib/src/streaming/event_streaming_manager.dart (4)

64-104: Solid SSE readiness workflow with timeout and fallback

The first-byte wait with timeout (lines 76-89) plus grace period (lines 92-100) ensures robust connection establishment. Good fallback to longer delay when first byte isn't received.


108-149: Excellent deduplication and in-flight tracking

The in-flight enable tracking (lines 124-148) prevents duplicate enable calls and the finally block ensures cleanup. Good use of reference counting for shared streams.


187-213: Robust UnknownClient error recovery

The forced SSE reconnection (lines 199-210) when KDF forgets the client registration is a strong resilience pattern. Good state cleanup (lines 200-201).


400-423: LGTM! Defensive cleanup on disable errors

Lines 419-421 ensure that even when the disable RPC fails, the local stream state is cleaned up. This prevents leaked state.

@ca333 ca333 merged commit 42ec148 into main Oct 30, 2025
10 of 12 checks passed
@coderabbitai coderabbitai bot mentioned this pull request Oct 31, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants