Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,12 @@ public DirectoryTransferTest(TOptions options) : base(options)
_transferManager = new TransferManager(managerOptions);
}

public override async Task CleanupAsync()
{
await ((IAsyncDisposable)_transferManager).DisposeAsync();
await base.CleanupAsync();
}

protected string CreateLocalDirectory(bool populate = false)
{
string directory = Path.Combine(Path.GetTempPath(), Path.GetRandomFileName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,9 @@ public async Task CleanUpAsync()
{
_channel.Writer.TryComplete();
await _processorTaskCompletionSource.Task.ConfigureAwait(false);

// Null out the Process delegate to release references
Interlocked.Exchange(ref _process, null);
}

protected abstract ValueTask NotifyOfPendingItemProcessing();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,10 +83,10 @@ public CommitChunkHandler(
_isChunkHandlerRunning = true;
}

public Task CleanUpAsync()
public async Task CleanUpAsync()
{
_isChunkHandlerRunning = false;
return _stageChunkProcessor.CleanUpAsync();
await _stageChunkProcessor.CleanUpAsync().ConfigureAwait(false);
}

public async ValueTask QueueChunkAsync(QueueStageChunkArgs args, CancellationToken cancellationToken = default)
Expand Down Expand Up @@ -137,7 +137,21 @@ await _queuePutBlockTask(
if (_isChunkHandlerRunning)
{
// This will trigger the job part to call Dispose on this object
_ = Task.Run(() => _invokeFailedEventHandler(ex));
_ = Task.Run(async () =>
{
try
{
await _invokeFailedEventHandler(ex).ConfigureAwait(false);
}
catch
{
// Log and swallow any exceptions to prevent crashing the process
DataMovementEventSource.Singleton
.UnexpectedTransferFailed(
nameof(CommitChunkHandler),
ex.ToString());
}
});
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ internal class DataMovementEventSource : AzureEventSource
private const int EnumerationCompleteEvent = 4;
private const int ResumeTransferEvent = 5;
private const int ResumeEnumerationCompleteEvent = 6;
private const int UnexpectedTransferFailedEvent = 7;

private DataMovementEventSource() : base(EventSourceName) { }

Expand Down Expand Up @@ -92,5 +93,11 @@ public void ResumeEnumerationComplete(string transferId, int jobPartCount)
{
WriteEvent(ResumeEnumerationCompleteEvent, transferId, jobPartCount);
}

[Event(UnexpectedTransferFailedEvent, Level = EventLevel.Error, Message = "Transfer [{0}] Transfer failed: {1}")]
public void UnexpectedTransferFailed(string transferId, string errorMessage)
{
WriteEvent(UnexpectedTransferFailedEvent, transferId, errorMessage);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,21 @@ await _copyToDestinationFile(
if (_isChunkHandlerRunning)
{
// This will trigger the job part to call Dispose on this object
_ = Task.Run(() => _invokeFailedEventHandler(ex));
_ = Task.Run(async () =>
{
try
{
await _invokeFailedEventHandler(ex).ConfigureAwait(false);
}
catch
{
// Log and swallow any exceptions to prevent crashing the process
DataMovementEventSource.Singleton
.UnexpectedTransferFailed(
nameof(CommitChunkHandler),
ex.ToString());
}
});
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,9 @@ public bool SetTransferState(TransferState state)
// Tell the transfer manager to clean up the completed/paused job.
TransferManager.TryRemoveTransfer(_id);

// Remove Transfer Manager reference after no longer needed.
TransferManager = null;

// Once we reach a Completed/Paused, Dispose the CancellationTokenSource to release resources (since it is no longer needed).
DisposeCancellationTokenSource();
}
Expand Down
Loading