diff --git a/packages/komodo_defi_sdk/lib/komodo_defi_sdk.dart b/packages/komodo_defi_sdk/lib/komodo_defi_sdk.dart index 22ea5671b..b612cae13 100644 --- a/packages/komodo_defi_sdk/lib/komodo_defi_sdk.dart +++ b/packages/komodo_defi_sdk/lib/komodo_defi_sdk.dart @@ -59,6 +59,8 @@ export 'src/assets/asset_extensions.dart' export 'src/assets/asset_pubkey_extensions.dart'; export 'src/assets/legacy_asset_extensions.dart'; export 'src/komodo_defi_sdk.dart' show KomodoDefiSdk; +export 'src/transaction_history/transaction_merge_utils.dart' + show TransactionListReconciler, TransactionMergeUtils; export 'src/widgets/asset_balance_text.dart'; export 'src/zcash_params/models/download_progress.dart'; export 'src/zcash_params/models/download_result.dart'; diff --git a/packages/komodo_defi_sdk/lib/src/activation/activation_exceptions.dart b/packages/komodo_defi_sdk/lib/src/activation/activation_exceptions.dart index fb7420111..6a0237d0e 100644 --- a/packages/komodo_defi_sdk/lib/src/activation/activation_exceptions.dart +++ b/packages/komodo_defi_sdk/lib/src/activation/activation_exceptions.dart @@ -70,3 +70,18 @@ class ActivationNetworkException extends ActivationFailedException { return 'ActivationNetworkException: Asset ${assetId.name} activation failed due to network issues: $message'; } } + +/// Exception thrown when an activation is cancelled by caller or lifecycle. +class ActivationCancelledException extends ActivationFailedException { + const ActivationCancelledException({ + required super.assetId, + required super.message, + super.errorCode = 'ACTIVATION_CANCELLED', + super.originalError, + }); + + @override + String toString() { + return 'ActivationCancelledException: Asset ${assetId.name} activation cancelled: $message'; + } +} diff --git a/packages/komodo_defi_sdk/lib/src/activation/activation_manager.dart b/packages/komodo_defi_sdk/lib/src/activation/activation_manager.dart index 09091da0b..fd061e571 100644 --- a/packages/komodo_defi_sdk/lib/src/activation/activation_manager.dart +++ b/packages/komodo_defi_sdk/lib/src/activation/activation_manager.dart @@ -39,6 +39,7 @@ class ActivationManager { static const SdkErrorMapper _errorMapper = SdkErrorMapper(); final Map> _activationCompleters = {}; + final Map _cancelledActivations = {}; bool _isDisposed = false; /// Helper for mutex-protected operations with timeout @@ -56,6 +57,35 @@ class ActivationManager { Stream activateAsset(Asset asset) => activateAssets([asset]); + /// Request cancellation of an in-flight activation for [assetId]. + /// + /// Cancellation is best-effort. The current activation stream is terminated + /// at the next progress boundary and emits an error completion state. + void cancelActivation( + AssetId assetId, { + String reason = 'Activation cancelled by caller', + }) { + if (_isDisposed) return; + // Only record cancellation for activations that are currently in-flight. + // This avoids stale cancellation markers cancelling future fresh attempts. + if (!_activationCompleters.containsKey(assetId)) { + _cancelledActivations.remove(assetId); + return; + } + _cancelledActivations[assetId] = reason; + } + + /// Request cancellation for all in-flight activations. + void cancelAllActivations({ + String reason = 'Activation cancelled by caller', + }) { + if (_isDisposed) return; + final pendingIds = _activationCompleters.keys.toList(); + for (final assetId in pendingIds) { + _cancelledActivations[assetId] = reason; + } + } + /// Activate multiple assets Stream activateAssets(List assets) async* { if (_isDisposed) { @@ -65,6 +95,18 @@ class ActivationManager { final groups = _AssetGroup._groupByPrimary(assets); for (final group in groups) { + if (_cancelledActivations.containsKey(group.primary.id)) { + final reason = + _cancelledActivations[group.primary.id] ?? + 'Activation cancelled by caller'; + yield ActivationProgress.error( + message: reason, + errorCode: 'ACTIVATION_CANCELLED', + ); + _cancelledActivations.remove(group.primary.id); + continue; + } + // Check activation status atomically final activationStatus = await _checkActivationStatus(group); if (activationStatus.isComplete) { @@ -117,6 +159,24 @@ class ActivationManager { parentAsset ?? group.primary, group.children?.toList(), )) { + if (_cancelledActivations.containsKey(group.primary.id)) { + final reason = + _cancelledActivations[group.primary.id] ?? + 'Activation cancelled by caller'; + final cancellationError = ActivationCancelledException( + assetId: group.primary.id, + message: reason, + ); + if (!primaryCompleter.isCompleted) { + primaryCompleter.completeError(cancellationError); + } + yield ActivationProgress.error( + message: reason, + errorCode: 'ACTIVATION_CANCELLED', + ); + break; + } + yield _attachSdkError(progress, group.primary.id); if (progress.isComplete) { @@ -149,10 +209,7 @@ class ActivationManager { } final errorMessage = progress.errorMessage ?? 'Activation failed'; - final sdkError = _mapError( - errorMessage, - assetId, - ); + final sdkError = _mapError(errorMessage, assetId); return progress.copyWith( errorMessage: sdkError.fallbackMessage, @@ -163,10 +220,7 @@ class ActivationManager { SdkError _mapError(Object error, AssetId assetId) { return _errorMapper.map( error, - context: SdkErrorContext( - operation: 'activation', - assetId: assetId.id, - ), + context: SdkErrorContext(operation: 'activation', assetId: assetId.id), ); } @@ -174,7 +228,8 @@ class ActivationManager { Future _checkActivationStatus(_AssetGroup group) async { try { // Use cache instead of direct RPC call to avoid excessive requests - final enabledAssetIds = await _activatedAssetsCache.getActivatedAssetIds(); + final enabledAssetIds = await _activatedAssetsCache + .getActivatedAssetIds(); final isActive = enabledAssetIds.contains(group.primary.id); final childrenActive = @@ -264,6 +319,7 @@ class ActivationManager { Future _cleanupActivation(AssetId assetId) async { await _protectedOperation(() async { _activationCompleters.remove(assetId); + _cancelledActivations.remove(assetId); }); } @@ -319,6 +375,7 @@ class ActivationManager { } _activationCompleters.clear(); + _cancelledActivations.clear(); }); } } diff --git a/packages/komodo_defi_sdk/lib/src/assets/asset_manager.dart b/packages/komodo_defi_sdk/lib/src/assets/asset_manager.dart index ea19febfc..0015fd552 100644 --- a/packages/komodo_defi_sdk/lib/src/assets/asset_manager.dart +++ b/packages/komodo_defi_sdk/lib/src/assets/asset_manager.dart @@ -214,6 +214,24 @@ class AssetManager implements IAssetProvider { Stream activateAssets(List assets) => _activationManager().activateAssets(assets); + /// Requests cancellation of an in-flight activation for [assetId]. + /// + /// Cancellation is best-effort and will be observed on the next activation + /// progress boundary. + void cancelActivation( + AssetId assetId, { + String reason = 'Activation cancelled by caller', + }) { + _activationManager().cancelActivation(assetId, reason: reason); + } + + /// Requests cancellation for all in-flight activation tasks. + void cancelAllActivations({ + String reason = 'Activation cancelled by caller', + }) { + _activationManager().cancelAllActivations(reason: reason); + } + /// Disposes of the asset manager, cleaning up resources. /// /// This is called automatically by the SDK when disposing. diff --git a/packages/komodo_defi_sdk/lib/src/balances/balance_manager.dart b/packages/komodo_defi_sdk/lib/src/balances/balance_manager.dart index db4738878..fe2d2b04b 100644 --- a/packages/komodo_defi_sdk/lib/src/balances/balance_manager.dart +++ b/packages/komodo_defi_sdk/lib/src/balances/balance_manager.dart @@ -37,6 +37,15 @@ abstract class IBalanceManager { bool activateIfNeeded = true, }); + /// Returns whether [assetId] currently has an active watcher subscription. + bool hasActiveWatcher(AssetId assetId); + + /// Counts how many [assetIds] do not currently have active watcher coverage. + int countMissingWatchersForAssets(Iterable assetIds); + + /// Returns true when any asset in [assetIds] lacks active watcher coverage. + bool hasMissingWatchersForAssets(Iterable assetIds); + /// Gets the last known balance for an asset without triggering a refresh. /// Returns null if no balance has been fetched yet. BalanceInfo? lastKnown(AssetId assetId); @@ -117,6 +126,26 @@ class BalanceManager implements IBalanceManager { /// Getter for pubkeyManager to make it accessible PubkeyManager? get pubkeyManager => _pubkeyManager; + @override + bool hasActiveWatcher(AssetId assetId) { + if (_isDisposed) return false; + return _activeWatchers.containsKey(assetId); + } + + @override + int countMissingWatchersForAssets(Iterable assetIds) { + if (_isDisposed) return assetIds.toSet().length; + final uniqueIds = assetIds.toSet(); + return uniqueIds + .where((assetId) => !_activeWatchers.containsKey(assetId)) + .length; + } + + @override + bool hasMissingWatchersForAssets(Iterable assetIds) { + return countMissingWatchersForAssets(assetIds) > 0; + } + /// Setter for activationCoordinator to resolve circular dependencies void setActivationCoordinator(SharedActivationCoordinator coordinator) { _activationCoordinator = coordinator; @@ -837,9 +866,7 @@ class BalanceManager implements IBalanceManager { // Snapshot controllers and close all concurrently; swallow errors final List> pubkeyHintSubs = - List>.from( - _pubkeyHintWatchers.values, - ); + List>.from(_pubkeyHintWatchers.values); _pubkeyHintWatchers.clear(); for (final StreamSubscription sub in pubkeyHintSubs) { cancelFutures.add( diff --git a/packages/komodo_defi_sdk/lib/src/transaction_history/_transaction_history_index.dart b/packages/komodo_defi_sdk/lib/src/transaction_history/_transaction_history_index.dart index e0210c00d..21d427799 100644 --- a/packages/komodo_defi_sdk/lib/src/transaction_history/_transaction_history_index.dart +++ b/packages/komodo_defi_sdk/lib/src/transaction_history/_transaction_history_index.dart @@ -6,5 +6,6 @@ library _transaction_history; export 'strategies/etherscan_transaction_history_strategy.dart'; export 'strategies/zhtlc_transaction_strategy.dart'; export 'transaction_history_manager.dart'; +export 'transaction_merge_utils.dart'; export 'transaction_history_strategies.dart'; export 'transaction_storage.dart'; diff --git a/packages/komodo_defi_sdk/lib/src/transaction_history/transaction_history_manager.dart b/packages/komodo_defi_sdk/lib/src/transaction_history/transaction_history_manager.dart index 8383ce631..a3f09b494 100644 --- a/packages/komodo_defi_sdk/lib/src/transaction_history/transaction_history_manager.dart +++ b/packages/komodo_defi_sdk/lib/src/transaction_history/transaction_history_manager.dart @@ -32,6 +32,15 @@ abstract interface class _TransactionHistoryManager { /// with the initial batch from storage and the latest transactions from /// the API. The stream will close when the latest transaction is reached. Stream> getTransactionsStreamed(Asset asset); + + /// High-level merged history stream for UI list consumption. + /// + /// This stream handles update reconciliation, pending->confirmed bridging, + /// and stable ordering internally. + Stream> watchTransactionHistoryMerged( + Asset asset, { + Transaction Function(Transaction transaction)? transform, + }); } class TransactionHistoryManager implements _TransactionHistoryManager { @@ -336,6 +345,35 @@ class TransactionHistoryManager implements _TransactionHistoryManager { } } + @override + Stream> watchTransactionHistoryMerged( + Asset asset, { + Transaction Function(Transaction transaction)? transform, + }) async* { + final reconciler = TransactionListReconciler(); + var merged = []; + var emittedInitial = false; + + await for (final batch in getTransactionsStreamed(asset)) { + final incoming = transform == null + ? batch + : batch.map(transform).toList(growable: false); + merged = reconciler.merge(existing: merged, incoming: incoming); + emittedInitial = true; + yield List.unmodifiable(merged); + } + + if (!emittedInitial) { + yield const []; + } + + await for (final transaction in watchTransactions(asset)) { + final normalized = transform?.call(transaction) ?? transaction; + merged = reconciler.merge(existing: merged, incoming: [normalized]); + yield List.unmodifiable(merged); + } + } + @override Stream watchTransactions(Asset asset) { if (_isDisposed) { diff --git a/packages/komodo_defi_sdk/lib/src/transaction_history/transaction_merge_utils.dart b/packages/komodo_defi_sdk/lib/src/transaction_history/transaction_merge_utils.dart new file mode 100644 index 000000000..693c7a99b --- /dev/null +++ b/packages/komodo_defi_sdk/lib/src/transaction_history/transaction_merge_utils.dart @@ -0,0 +1,191 @@ +import 'package:collection/collection.dart'; +import 'package:komodo_defi_types/komodo_defi_types.dart'; + +/// Shared helpers for reconciling transaction lifecycle updates in clients. +/// +/// This utility keeps identity stable by internal transaction ID while still +/// supporting a Tendermint-specific pending->confirmed bridge when a matching +/// event is re-emitted with a different internal ID. +class TransactionMergeUtils { + TransactionMergeUtils._(); + + static const ListEquality _listEquality = ListEquality(); + + /// Canonical lifecycle key for transaction list reconciliation. + static String transactionKey(Transaction transaction) { + return transaction.internalId; + } + + /// Merge commonly-updated fields from [incoming] into [existing]. + static Transaction mergeTransactionFields( + Transaction existing, + Transaction incoming, + ) { + return existing.copyWith( + confirmations: incoming.confirmations, + blockHeight: incoming.blockHeight, + fee: incoming.fee ?? existing.fee, + memo: incoming.memo ?? existing.memo, + timestamp: incoming.timestamp, + ); + } + + static bool isTendermintAsset(AssetId assetId) { + return assetId.subClass == CoinSubClass.tendermint || + assetId.subClass == CoinSubClass.tendermintToken; + } + + static bool isConfirmed(Transaction transaction) { + return transaction.confirmations > 0 || transaction.blockHeight > 0; + } + + static bool isPending(Transaction transaction) { + return transaction.confirmations <= 0 && transaction.blockHeight == 0; + } + + static bool matchesTransferFingerprint( + Transaction first, + Transaction second, + ) { + return _listEquality.equals(first.from, second.from) && + _listEquality.equals(first.to, second.to) && + first.balanceChanges.netChange == second.balanceChanges.netChange && + first.balanceChanges.totalAmount == second.balanceChanges.totalAmount; + } + + /// Finds a pending transaction key that should be replaced by [incoming]. + /// + /// This bridge is intentionally limited to Tendermint/TendermintToken assets + /// with exactly one fingerprint match to avoid collapsing distinct transfers + /// that share the same hash. + static String? findPendingReplacementKey({ + required Map byKey, + required Transaction incoming, + }) { + if (!isTendermintAsset(incoming.assetId) || !isConfirmed(incoming)) { + return null; + } + + final txHash = incoming.txHash; + if (txHash == null || txHash.isEmpty) { + return null; + } + + final matchingEntries = byKey.entries.where((entry) { + final existing = entry.value; + return existing.assetId.isSameAsset(incoming.assetId) && + isPending(existing) && + existing.txHash == txHash && + matchesTransferFingerprint(existing, incoming); + }).toList(); + + if (matchingEntries.length != 1) { + return null; + } + + return matchingEntries.first.key; + } +} + +/// Stateful reconciler for merging transaction update batches into a list view. +class TransactionListReconciler { + final Map _firstSeenAtByInternalId = {}; + + /// Clears internal ordering state. + void reset() => _firstSeenAtByInternalId.clear(); + + /// Merges [incoming] updates into [existing] and returns sorted results. + List merge({ + required List existing, + required Iterable incoming, + }) { + final byKey = { + for (final tx in existing) TransactionMergeUtils.transactionKey(tx): tx, + }; + + for (final tx in incoming) { + _mergeInPlace(byKey, tx); + _firstSeenAtByInternalId.putIfAbsent( + tx.internalId, + () => tx.timestamp.millisecondsSinceEpoch != 0 + ? tx.timestamp + : DateTime.now(), + ); + } + + final merged = byKey.values.toList()..sort(_compareTransactions); + return merged; + } + + void _mergeInPlace(Map byKey, Transaction incoming) { + final incomingKey = TransactionMergeUtils.transactionKey(incoming); + final existing = byKey[incomingKey]; + + if (existing != null) { + byKey[incomingKey] = TransactionMergeUtils.mergeTransactionFields( + existing, + incoming, + ); + return; + } + + final pendingReplacementKey = + TransactionMergeUtils.findPendingReplacementKey( + byKey: byKey, + incoming: incoming, + ); + + if (pendingReplacementKey != null) { + final pending = byKey.remove(pendingReplacementKey); + if (pending != null) { + final mergedPending = TransactionMergeUtils.mergeTransactionFields( + pending, + incoming, + ).copyWith(internalId: incoming.internalId); + + final pendingFirstSeen = _firstSeenAtByInternalId.remove( + pendingReplacementKey, + ); + if (pendingFirstSeen != null) { + _firstSeenAtByInternalId.putIfAbsent( + incoming.internalId, + () => pendingFirstSeen, + ); + } + + byKey[incomingKey] = mergedPending; + return; + } + } + + byKey[incomingKey] = incoming; + } + + int _compareTransactions(Transaction left, Transaction right) { + final unconfirmedTimestamp = DateTime.fromMillisecondsSinceEpoch(0); + final leftIsUnconfirmed = left.timestamp == unconfirmedTimestamp; + final rightIsUnconfirmed = right.timestamp == unconfirmedTimestamp; + + if (leftIsUnconfirmed && rightIsUnconfirmed) { + final leftFirstSeen = + _firstSeenAtByInternalId[left.internalId] ?? unconfirmedTimestamp; + final rightFirstSeen = + _firstSeenAtByInternalId[right.internalId] ?? unconfirmedTimestamp; + final compareByFirstSeen = rightFirstSeen.compareTo(leftFirstSeen); + if (compareByFirstSeen != 0) { + return compareByFirstSeen; + } + return right.internalId.compareTo(left.internalId); + } + + if (leftIsUnconfirmed) { + return -1; + } + + if (rightIsUnconfirmed) { + return 1; + } + + return right.timestamp.compareTo(left.timestamp); + } +}