diff --git a/sdk/storage/Azure.Storage.DataMovement.Blobs/perf/Azure.Storage.DataMovement.Blobs.Perf/Infrastructure/DirectoryTransferTest.cs b/sdk/storage/Azure.Storage.DataMovement.Blobs/perf/Azure.Storage.DataMovement.Blobs.Perf/Infrastructure/DirectoryTransferTest.cs index e4171485e7a8..61d95c751c25 100644 --- a/sdk/storage/Azure.Storage.DataMovement.Blobs/perf/Azure.Storage.DataMovement.Blobs.Perf/Infrastructure/DirectoryTransferTest.cs +++ b/sdk/storage/Azure.Storage.DataMovement.Blobs/perf/Azure.Storage.DataMovement.Blobs.Perf/Infrastructure/DirectoryTransferTest.cs @@ -34,6 +34,12 @@ public DirectoryTransferTest(TOptions options) : base(options) _transferManager = new TransferManager(managerOptions); } + public override async Task CleanupAsync() + { + await ((IAsyncDisposable)_transferManager).DisposeAsync(); + await base.CleanupAsync(); + } + protected string CreateLocalDirectory(bool populate = false) { string directory = Path.Combine(Path.GetTempPath(), Path.GetRandomFileName()); diff --git a/sdk/storage/Azure.Storage.DataMovement/src/ChannelProcessing.cs b/sdk/storage/Azure.Storage.DataMovement/src/ChannelProcessing.cs index 172fa7e6b847..6a0b165aa8e7 100644 --- a/sdk/storage/Azure.Storage.DataMovement/src/ChannelProcessing.cs +++ b/sdk/storage/Azure.Storage.DataMovement/src/ChannelProcessing.cs @@ -91,6 +91,9 @@ public async Task CleanUpAsync() { _channel.Writer.TryComplete(); await _processorTaskCompletionSource.Task.ConfigureAwait(false); + + // Null out the Process delegate to release references + Interlocked.Exchange(ref _process, null); } protected abstract ValueTask NotifyOfPendingItemProcessing(); diff --git a/sdk/storage/Azure.Storage.DataMovement/src/CommitChunkHandler.cs b/sdk/storage/Azure.Storage.DataMovement/src/CommitChunkHandler.cs index fc5ec066de68..34a4d2dd560d 100644 --- a/sdk/storage/Azure.Storage.DataMovement/src/CommitChunkHandler.cs +++ b/sdk/storage/Azure.Storage.DataMovement/src/CommitChunkHandler.cs @@ -83,10 +83,10 @@ public CommitChunkHandler( _isChunkHandlerRunning = true; } - public Task CleanUpAsync() + public async Task CleanUpAsync() { _isChunkHandlerRunning = false; - return _stageChunkProcessor.CleanUpAsync(); + await _stageChunkProcessor.CleanUpAsync().ConfigureAwait(false); } public async ValueTask QueueChunkAsync(QueueStageChunkArgs args, CancellationToken cancellationToken = default) @@ -137,7 +137,21 @@ await _queuePutBlockTask( if (_isChunkHandlerRunning) { // This will trigger the job part to call Dispose on this object - _ = Task.Run(() => _invokeFailedEventHandler(ex)); + _ = Task.Run(async () => + { + try + { + await _invokeFailedEventHandler(ex).ConfigureAwait(false); + } + catch + { + // Log and swallow any exceptions to prevent crashing the process + DataMovementEventSource.Singleton + .UnexpectedTransferFailed( + nameof(CommitChunkHandler), + ex.ToString()); + } + }); } } } diff --git a/sdk/storage/Azure.Storage.DataMovement/src/DataMovementEventSource.cs b/sdk/storage/Azure.Storage.DataMovement/src/DataMovementEventSource.cs index 6c65c77b230b..6acdcc982be3 100644 --- a/sdk/storage/Azure.Storage.DataMovement/src/DataMovementEventSource.cs +++ b/sdk/storage/Azure.Storage.DataMovement/src/DataMovementEventSource.cs @@ -16,6 +16,7 @@ internal class DataMovementEventSource : AzureEventSource private const int EnumerationCompleteEvent = 4; private const int ResumeTransferEvent = 5; private const int ResumeEnumerationCompleteEvent = 6; + private const int UnexpectedTransferFailedEvent = 7; private DataMovementEventSource() : base(EventSourceName) { } @@ -92,5 +93,11 @@ public void ResumeEnumerationComplete(string transferId, int jobPartCount) { WriteEvent(ResumeEnumerationCompleteEvent, transferId, jobPartCount); } + + [Event(UnexpectedTransferFailedEvent, Level = EventLevel.Error, Message = "Transfer [{0}] Transfer failed: {1}")] + public void UnexpectedTransferFailed(string transferId, string errorMessage) + { + WriteEvent(UnexpectedTransferFailedEvent, transferId, errorMessage); + } } } diff --git a/sdk/storage/Azure.Storage.DataMovement/src/DownloadChunkHandler.cs b/sdk/storage/Azure.Storage.DataMovement/src/DownloadChunkHandler.cs index 06c8e225f008..21445576e790 100644 --- a/sdk/storage/Azure.Storage.DataMovement/src/DownloadChunkHandler.cs +++ b/sdk/storage/Azure.Storage.DataMovement/src/DownloadChunkHandler.cs @@ -136,7 +136,21 @@ await _copyToDestinationFile( if (_isChunkHandlerRunning) { // This will trigger the job part to call Dispose on this object - _ = Task.Run(() => _invokeFailedEventHandler(ex)); + _ = Task.Run(async () => + { + try + { + await _invokeFailedEventHandler(ex).ConfigureAwait(false); + } + catch + { + // Log and swallow any exceptions to prevent crashing the process + DataMovementEventSource.Singleton + .UnexpectedTransferFailed( + nameof(CommitChunkHandler), + ex.ToString()); + } + }); } } } diff --git a/sdk/storage/Azure.Storage.DataMovement/src/TransferInternalState.cs b/sdk/storage/Azure.Storage.DataMovement/src/TransferInternalState.cs index dcbfed107715..c2d898b32faa 100644 --- a/sdk/storage/Azure.Storage.DataMovement/src/TransferInternalState.cs +++ b/sdk/storage/Azure.Storage.DataMovement/src/TransferInternalState.cs @@ -107,6 +107,9 @@ public bool SetTransferState(TransferState state) // Tell the transfer manager to clean up the completed/paused job. TransferManager.TryRemoveTransfer(_id); + // Remove Transfer Manager reference after no longer needed. + TransferManager = null; + // Once we reach a Completed/Paused, Dispose the CancellationTokenSource to release resources (since it is no longer needed). DisposeCancellationTokenSource(); }