Skip to content

Commit c7ab0dd

Browse files
authored
[Storage] [DataMovement] Fixed bug where adding multiple transfers in parallel could cause a Dictionary Collision in the transfers stored (#46919)
* WIP * WIP -test * Fixed bug where adding multiple transfers in parallel could cause a collision * Merge main test continued * Fix tests
1 parent 26cc2f8 commit c7ab0dd

File tree

7 files changed

+62
-14
lines changed

7 files changed

+62
-14
lines changed

sdk/storage/Azure.Storage.DataMovement.Blobs/tests/PauseResumeTransferTests.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1255,7 +1255,7 @@ public async Task PauseAllTriggersCorrectPauses()
12551255
{
12561256
unpausable.Add(transfer);
12571257
}
1258-
manager._dataTransfers.Add(Guid.NewGuid().ToString(), transfer.Object);
1258+
manager._dataTransfers.TryAdd(Guid.NewGuid().ToString(), transfer.Object);
12591259
}
12601260

12611261
await manager.PauseAllRunningTransfersAsync(_mockingToken);

sdk/storage/Azure.Storage.DataMovement/CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
### Breaking Changes
88

99
### Bugs Fixed
10+
- Fixed bug where adding multiple transfers in parallel could cause a collision (`InvalidOperationException`) in the data transfers stored within the `TransferManager`.
1011

1112
### Other Changes
1213

sdk/storage/Azure.Storage.DataMovement/src/Shared/Errors.DataMovement.cs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -131,5 +131,8 @@ public static ArgumentException UnexpectedPropertyType(string propertyName, para
131131

132132
public static InvalidOperationException CheckpointerDisabled(string method)
133133
=> new InvalidOperationException($"Unable to perform {method}. The transfer checkpointer is disabled.");
134+
135+
public static InvalidOperationException CollisionTransferId(string id)
136+
=> new InvalidOperationException($"Transfer Id Collision: The transfer id, {id}, already exists in the transfer manager.");
134137
}
135138
}

sdk/storage/Azure.Storage.DataMovement/src/TransferManager.cs

Lines changed: 18 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33

44
using System;
55
using System.Buffers;
6+
using System.Collections.Concurrent;
67
using System.Collections.Generic;
78
using System.Linq;
89
using System.Runtime.CompilerServices;
@@ -28,7 +29,7 @@ public class TransferManager : IAsyncDisposable
2829
/// <summary>
2930
/// Ongoing transfers indexed at the transfer id.
3031
/// </summary>
31-
internal readonly Dictionary<string, DataTransfer> _dataTransfers = new();
32+
internal readonly ConcurrentDictionary<string, DataTransfer> _dataTransfers = new();
3233

3334
/// <summary>
3435
/// Designated checkpointer for the respective transfer manager.
@@ -290,12 +291,11 @@ bool TryGetStorageResourceProvider(DataTransferProperties properties, bool getSo
290291

291292
transferOptions ??= new DataTransferOptions();
292293

293-
if (_dataTransfers.ContainsKey(dataTransferProperties.TransferId))
294-
{
295-
// Remove the stale DataTransfer so we can pass a new DataTransfer object
296-
// to the user and also track the transfer from the DataTransfer object
297-
_dataTransfers.Remove(dataTransferProperties.TransferId);
298-
}
294+
// Remove the stale DataTransfer so we can pass a new DataTransfer object
295+
// to the user and also track the transfer from the DataTransfer object.
296+
// No need to check if we were able to remove the transfer or not.
297+
// If there's no stale DataTransfer to remove, move on.
298+
_dataTransfers.TryRemove(dataTransferProperties.TransferId, out DataTransfer transfer);
299299

300300
if (!TryGetStorageResourceProvider(dataTransferProperties, getSource: true, out StorageResourceProvider sourceProvider))
301301
{
@@ -409,7 +409,10 @@ private async Task<DataTransfer> BuildAndAddTransferJobAsync(
409409
.ConfigureAwait(false);
410410

411411
transfer.TransferManager = this;
412-
_dataTransfers.Add(transfer.Id, transfer);
412+
if (!_dataTransfers.TryAdd(transfer.Id, transfer))
413+
{
414+
throw Errors.CollisionTransferId(transfer.Id);
415+
}
413416
await _jobsProcessor.QueueAsync(transferJobInternal, cancellationToken).ConfigureAwait(false);
414417

415418
return transfer;
@@ -424,12 +427,17 @@ private async Task SetDataTransfers(CancellationToken cancellationToken = defaul
424427
foreach (string transferId in storedTransfers)
425428
{
426429
DataTransferStatus jobStatus = await _checkpointer.GetJobStatusAsync(transferId, cancellationToken).ConfigureAwait(false);
427-
_dataTransfers.Add(transferId, new DataTransfer(
430+
// If TryAdd fails here, a transfer with this id is already tracked; check that
431+
// every other place that adds a transfer guarantees a unique transferId.
432+
if (!_dataTransfers.TryAdd(transferId, new DataTransfer(
428433
id: transferId,
429434
status: jobStatus)
430435
{
431436
TransferManager = this,
432-
});
437+
}))
438+
{
439+
throw Errors.CollisionTransferId(transferId);
440+
}
433441
}
434442
}
435443

sdk/storage/Azure.Storage.DataMovement/tests/GetTransfersTests.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -222,9 +222,10 @@ public async Task GetTransfers_LocalCheckpointer()
222222

223223
// Act
224224
IList<DataTransfer> result = await manager.GetTransfersAsync().ToListAsync();
225+
List<string> resultIds = result.Select(t => t.Id).ToList();
225226

226227
// Assert
227-
Assert.AreEqual(checkpointerTransfers, result.Select(d => d.Id).ToList());
228+
Assert.IsTrue(Enumerable.SequenceEqual(checkpointerTransfers.OrderBy(id => id), result.Select(t => t.Id).OrderBy(id => id)));
228229
}
229230

230231
[Test]

sdk/storage/Azure.Storage.DataMovement/tests/Models/StepProcessor.cs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
// Copyright (c) Microsoft Corporation. All rights reserved.
22
// Licensed under the MIT License.
33

4+
using System.Collections.Concurrent;
45
using System.Collections.Generic;
56
using System.Threading;
67
using System.Threading.Tasks;
@@ -14,7 +15,7 @@ namespace Azure.Storage.DataMovement.Tests
1415
/// <typeparam name="T"></typeparam>
1516
internal class StepProcessor<T> : IProcessor<T>
1617
{
17-
private readonly Queue<T> _queue = new();
18+
private readonly ConcurrentQueue<T> _queue = new();
1819

1920
public int ItemsInQueue => _queue.Count;
2021

@@ -39,7 +40,8 @@ public async ValueTask<bool> TryStepAsync(CancellationToken cancellationToken =
3940
{
4041
if (_queue.Count > 0)
4142
{
42-
await Process?.Invoke(_queue.Dequeue(), cancellationToken);
43+
_queue.TryDequeue(out T result);
44+
await Process?.Invoke(result, cancellationToken);
4345
return true;
4446
}
4547
else

sdk/storage/Azure.Storage.DataMovement/tests/TransferManagerTests.cs

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33

44
using System;
55
using System.Buffers;
6+
using System.Collections.Concurrent;
67
using System.Collections.Generic;
78
using System.IO;
89
using System.Linq;
@@ -430,6 +431,38 @@ public async Task TransferFailAtPartProcess(
430431
// TODO determine checkpointer status of job chunks
431432
// need checkpointer API refactor for this
432433
}
434+
435+
[Test]
436+
[TestCase(5)]
437+
[TestCase(10)]
438+
[TestCase(12345)]
439+
public async Task MultipleTransfersAddedCheckpointer(int numJobs)
440+
{
441+
Uri srcUri = new("file:///foo/bar");
442+
Uri dstUri = new("https://example.com/fizz/buzz");
443+
444+
(var jobsProcessor, var partsProcessor, var chunksProcessor) = StepProcessors();
445+
JobBuilder jobBuilder = new(ArrayPool<byte>.Shared, default, new(ClientOptions.Default));
446+
Mock<ITransferCheckpointer> checkpointer = new(MockBehavior.Loose);
447+
448+
(StorageResource srcResource, StorageResource dstResource, Func<IDisposable> srcThrowScope, Func<IDisposable> dstThrowScope)
449+
= GetBasicSetupResources(false, srcUri, dstUri);
450+
451+
await using TransferManager transferManager = new(
452+
jobsProcessor,
453+
partsProcessor,
454+
chunksProcessor,
455+
jobBuilder,
456+
checkpointer.Object,
457+
default);
458+
459+
// Add jobs on separate Tasks
460+
var loopResult = Parallel.For(0, numJobs, i =>
461+
{
462+
Task<DataTransfer> task = transferManager.StartTransferAsync(srcResource, dstResource);
463+
});
464+
Assert.That(jobsProcessor.ItemsInQueue, Is.EqualTo(numJobs), "Error during initial Job queueing.");
465+
}
433466
}
434467

435468
internal static partial class MockExtensions

0 commit comments

Comments
 (0)