Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions packages/komodo_defi_sdk/lib/komodo_defi_sdk.dart
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ export 'src/assets/asset_extensions.dart'
export 'src/assets/asset_pubkey_extensions.dart';
export 'src/assets/legacy_asset_extensions.dart';
export 'src/komodo_defi_sdk.dart' show KomodoDefiSdk;
export 'src/transaction_history/transaction_merge_utils.dart'
show TransactionListReconciler, TransactionMergeUtils;
export 'src/widgets/asset_balance_text.dart';
export 'src/zcash_params/models/download_progress.dart';
export 'src/zcash_params/models/download_result.dart';
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,3 +70,18 @@ class ActivationNetworkException extends ActivationFailedException {
return 'ActivationNetworkException: Asset ${assetId.name} activation failed due to network issues: $message';
}
}

/// Exception thrown when an activation is cancelled by caller or lifecycle.
class ActivationCancelledException extends ActivationFailedException {
const ActivationCancelledException({
required super.assetId,
required super.message,
super.errorCode = 'ACTIVATION_CANCELLED',
super.originalError,
});

@override
String toString() {
return 'ActivationCancelledException: Asset ${assetId.name} activation cancelled: $message';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ class ActivationManager {
static const SdkErrorMapper _errorMapper = SdkErrorMapper();

final Map<AssetId, Completer<void>> _activationCompleters = {};
final Map<AssetId, String> _cancelledActivations = <AssetId, String>{};
bool _isDisposed = false;

/// Helper for mutex-protected operations with timeout
Expand All @@ -56,6 +57,35 @@ class ActivationManager {
Stream<ActivationProgress> activateAsset(Asset asset) =>
activateAssets([asset]);

/// Request cancellation of an in-flight activation for [assetId].
///
/// Cancellation is best-effort. The current activation stream is terminated
/// at the next progress boundary and emits an error completion state.
void cancelActivation(
AssetId assetId, {
String reason = 'Activation cancelled by caller',
}) {
if (_isDisposed) return;
// Only record cancellation for activations that are currently in-flight.
// This avoids stale cancellation markers cancelling future fresh attempts.
if (!_activationCompleters.containsKey(assetId)) {
_cancelledActivations.remove(assetId);
return;
}
_cancelledActivations[assetId] = reason;
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Ignore stale cancel requests when no activation is running

cancelActivation stores a cancellation token unconditionally, even if the asset has no in-flight activation. If a caller invokes cancel slightly after completion (or before a future retry), the next activateAssets call will hit the pre-check and emit ACTIVATION_CANCELLED immediately, cancelling a brand-new activation unexpectedly. This contradicts the in-flight-only contract and creates hard-to-reproduce false failures on later retries.

Useful? React with 👍 / 👎.

}

/// Request cancellation for all in-flight activations.
void cancelAllActivations({
String reason = 'Activation cancelled by caller',
}) {
if (_isDisposed) return;
final pendingIds = _activationCompleters.keys.toList();
for (final assetId in pendingIds) {
_cancelledActivations[assetId] = reason;
}
}

/// Activate multiple assets
Stream<ActivationProgress> activateAssets(List<Asset> assets) async* {
if (_isDisposed) {
Expand All @@ -65,6 +95,18 @@ class ActivationManager {
final groups = _AssetGroup._groupByPrimary(assets);

for (final group in groups) {
if (_cancelledActivations.containsKey(group.primary.id)) {
final reason =
_cancelledActivations[group.primary.id] ??
'Activation cancelled by caller';
yield ActivationProgress.error(
message: reason,
errorCode: 'ACTIVATION_CANCELLED',
);
_cancelledActivations.remove(group.primary.id);
continue;
}

// Check activation status atomically
final activationStatus = await _checkActivationStatus(group);
if (activationStatus.isComplete) {
Expand Down Expand Up @@ -117,6 +159,24 @@ class ActivationManager {
parentAsset ?? group.primary,
group.children?.toList(),
)) {
if (_cancelledActivations.containsKey(group.primary.id)) {
final reason =
_cancelledActivations[group.primary.id] ??
'Activation cancelled by caller';
final cancellationError = ActivationCancelledException(
assetId: group.primary.id,
message: reason,
);
if (!primaryCompleter.isCompleted) {
primaryCompleter.completeError(cancellationError);
}
yield ActivationProgress.error(
message: reason,
errorCode: 'ACTIVATION_CANCELLED',
);
break;
}

yield _attachSdkError(progress, group.primary.id);

if (progress.isComplete) {
Expand Down Expand Up @@ -149,10 +209,7 @@ class ActivationManager {
}

final errorMessage = progress.errorMessage ?? 'Activation failed';
final sdkError = _mapError(
errorMessage,
assetId,
);
final sdkError = _mapError(errorMessage, assetId);

return progress.copyWith(
errorMessage: sdkError.fallbackMessage,
Expand All @@ -163,18 +220,16 @@ class ActivationManager {
SdkError _mapError(Object error, AssetId assetId) {
return _errorMapper.map(
error,
context: SdkErrorContext(
operation: 'activation',
assetId: assetId.id,
),
context: SdkErrorContext(operation: 'activation', assetId: assetId.id),
);
}

/// Check if asset and its children are already activated
Future<ActivationProgress> _checkActivationStatus(_AssetGroup group) async {
try {
// Use cache instead of direct RPC call to avoid excessive requests
final enabledAssetIds = await _activatedAssetsCache.getActivatedAssetIds();
final enabledAssetIds = await _activatedAssetsCache
.getActivatedAssetIds();

final isActive = enabledAssetIds.contains(group.primary.id);
final childrenActive =
Expand Down Expand Up @@ -264,6 +319,7 @@ class ActivationManager {
Future<void> _cleanupActivation(AssetId assetId) async {
await _protectedOperation(() async {
_activationCompleters.remove(assetId);
_cancelledActivations.remove(assetId);
});
}

Expand Down Expand Up @@ -319,6 +375,7 @@ class ActivationManager {
}

_activationCompleters.clear();
_cancelledActivations.clear();
});
}
}
Expand Down
18 changes: 18 additions & 0 deletions packages/komodo_defi_sdk/lib/src/assets/asset_manager.dart
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,24 @@ class AssetManager implements IAssetProvider {
Stream<ActivationProgress> activateAssets(List<Asset> assets) =>
_activationManager().activateAssets(assets);

/// Requests cancellation of an in-flight activation for [assetId].
///
/// Cancellation is best-effort and will be observed on the next activation
/// progress boundary.
void cancelActivation(
AssetId assetId, {
String reason = 'Activation cancelled by caller',
}) {
_activationManager().cancelActivation(assetId, reason: reason);
}

/// Requests cancellation for all in-flight activation tasks.
void cancelAllActivations({
String reason = 'Activation cancelled by caller',
}) {
_activationManager().cancelAllActivations(reason: reason);
}

/// Disposes of the asset manager, cleaning up resources.
///
/// This is called automatically by the SDK when disposing.
Expand Down
33 changes: 30 additions & 3 deletions packages/komodo_defi_sdk/lib/src/balances/balance_manager.dart
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,15 @@ abstract class IBalanceManager {
bool activateIfNeeded = true,
});

/// Returns whether [assetId] currently has an active watcher subscription.
bool hasActiveWatcher(AssetId assetId);

/// Counts how many [assetIds] do not currently have active watcher coverage.
int countMissingWatchersForAssets(Iterable<AssetId> assetIds);

/// Returns true when any asset in [assetIds] lacks active watcher coverage.
bool hasMissingWatchersForAssets(Iterable<AssetId> assetIds);

/// Gets the last known balance for an asset without triggering a refresh.
/// Returns null if no balance has been fetched yet.
BalanceInfo? lastKnown(AssetId assetId);
Expand Down Expand Up @@ -117,6 +126,26 @@ class BalanceManager implements IBalanceManager {
/// Getter for pubkeyManager to make it accessible
PubkeyManager? get pubkeyManager => _pubkeyManager;

@override
bool hasActiveWatcher(AssetId assetId) {
if (_isDisposed) return false;
return _activeWatchers.containsKey(assetId);
}

@override
int countMissingWatchersForAssets(Iterable<AssetId> assetIds) {
if (_isDisposed) return assetIds.toSet().length;
final uniqueIds = assetIds.toSet();
return uniqueIds
.where((assetId) => !_activeWatchers.containsKey(assetId))
.length;
}

@override
bool hasMissingWatchersForAssets(Iterable<AssetId> assetIds) {
return countMissingWatchersForAssets(assetIds) > 0;
}

/// Setter for activationCoordinator to resolve circular dependencies
void setActivationCoordinator(SharedActivationCoordinator coordinator) {
_activationCoordinator = coordinator;
Expand Down Expand Up @@ -837,9 +866,7 @@ class BalanceManager implements IBalanceManager {

// Snapshot controllers and close all concurrently; swallow errors
final List<StreamSubscription<AssetPubkeys>> pubkeyHintSubs =
List<StreamSubscription<AssetPubkeys>>.from(
_pubkeyHintWatchers.values,
);
List<StreamSubscription<AssetPubkeys>>.from(_pubkeyHintWatchers.values);
_pubkeyHintWatchers.clear();
for (final StreamSubscription<AssetPubkeys> sub in pubkeyHintSubs) {
cancelFutures.add(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,6 @@ library _transaction_history;
export 'strategies/etherscan_transaction_history_strategy.dart';
export 'strategies/zhtlc_transaction_strategy.dart';
export 'transaction_history_manager.dart';
export 'transaction_merge_utils.dart';
export 'transaction_history_strategies.dart';
export 'transaction_storage.dart';
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,15 @@ abstract interface class _TransactionHistoryManager {
/// with the initial batch from storage and the latest transactions from
/// the API. The stream will close when the latest transaction is reached.
Stream<List<Transaction>> getTransactionsStreamed(Asset asset);

/// High-level merged history stream for UI list consumption.
///
/// This stream handles update reconciliation, pending->confirmed bridging,
/// and stable ordering internally.
Stream<List<Transaction>> watchTransactionHistoryMerged(
Asset asset, {
Transaction Function(Transaction transaction)? transform,
});
}

class TransactionHistoryManager implements _TransactionHistoryManager {
Expand Down Expand Up @@ -336,6 +345,35 @@ class TransactionHistoryManager implements _TransactionHistoryManager {
}
}

@override
Stream<List<Transaction>> watchTransactionHistoryMerged(
Asset asset, {
Transaction Function(Transaction transaction)? transform,
}) async* {
final reconciler = TransactionListReconciler();
var merged = <Transaction>[];
var emittedInitial = false;

await for (final batch in getTransactionsStreamed(asset)) {
final incoming = transform == null
? batch
: batch.map(transform).toList(growable: false);
merged = reconciler.merge(existing: merged, incoming: incoming);
emittedInitial = true;
yield List<Transaction>.unmodifiable(merged);
}

if (!emittedInitial) {
yield const <Transaction>[];
}

await for (final transaction in watchTransactions(asset)) {
final normalized = transform?.call(transaction) ?? transaction;
merged = reconciler.merge(existing: merged, incoming: [normalized]);
yield List<Transaction>.unmodifiable(merged);
}
}

@override
Stream<Transaction> watchTransactions(Asset asset) {
if (_isDisposed) {
Expand Down
Loading
Loading