From 0dacad424966410767541f5d2dc116999199db16 Mon Sep 17 00:00:00 2001 From: nickliu-msft Date: Thu, 5 Dec 2024 15:47:12 -0500 Subject: [PATCH 1/5] initial commit --- .../src/JobBuilder.cs | 5 +- .../tests/MockStorageResourceProvider.cs | 58 +++ .../tests/PauseResumeTransferMockedTests.cs | 418 ++++++++++++++++-- .../Shared/MemoryTransferCheckpointer.cs | 4 +- .../tests/TransferManagerTests.cs | 24 +- 5 files changed, 477 insertions(+), 32 deletions(-) create mode 100644 sdk/storage/Azure.Storage.DataMovement/tests/MockStorageResourceProvider.cs diff --git a/sdk/storage/Azure.Storage.DataMovement/src/JobBuilder.cs b/sdk/storage/Azure.Storage.DataMovement/src/JobBuilder.cs index c2a8adea5f65..bca3c12151d3 100644 --- a/sdk/storage/Azure.Storage.DataMovement/src/JobBuilder.cs +++ b/sdk/storage/Azure.Storage.DataMovement/src/JobBuilder.cs @@ -135,7 +135,10 @@ private async Task BuildSingleTransferJob( arrayPool: _arrayPool, clientDiagnostics: ClientDiagnostics); - if (resumeJob) + int jobPartCount = await checkpointer.GetCurrentJobPartCountAsync( + transferId: dataTransfer.Id, + cancellationToken: cancellationToken).ConfigureAwait(false); + if (resumeJob && jobPartCount > 0) { JobPartPlanHeader part = await checkpointer.GetJobPartAsync(dataTransfer.Id, partNumber: 0).ConfigureAwait(false); job.AppendJobPart( diff --git a/sdk/storage/Azure.Storage.DataMovement/tests/MockStorageResourceProvider.cs b/sdk/storage/Azure.Storage.DataMovement/tests/MockStorageResourceProvider.cs new file mode 100644 index 000000000000..01eec8d975ab --- /dev/null +++ b/sdk/storage/Azure.Storage.DataMovement/tests/MockStorageResourceProvider.cs @@ -0,0 +1,58 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +using System; +using System.Threading; +using System.Threading.Tasks; +using Azure.Storage.Common; + +namespace Azure.Storage.DataMovement.Tests +{ + /// + /// Provider for a configured for mocking. + /// + public class MockStorageResourceProvider : StorageResourceProvider + { + /// + protected internal override string ProviderId => "mock"; + + internal MemoryTransferCheckpointer checkpointer; + + /// + /// Default constructor. 
+ /// + internal MockStorageResourceProvider( + MemoryTransferCheckpointer checkpter) + { + checkpointer = checkpter; + } + + /// + protected internal override Task FromSourceAsync(DataTransferProperties properties, CancellationToken cancellationToken) + => Task.FromResult(FromTransferProperties(properties, getSource: true)); + + /// + protected internal override Task FromDestinationAsync(DataTransferProperties properties, CancellationToken cancellationToken) + => Task.FromResult(FromTransferProperties(properties, getSource: false)); + + private StorageResource FromTransferProperties(DataTransferProperties properties, bool getSource) + { + Argument.AssertNotNull(properties, nameof(properties)); + + string transferId = properties.TransferId; + if (!checkpointer.Jobs.TryGetValue(transferId, out var job)) + { + throw new Exception("Job does not exist."); + } + + if (getSource) + { + return job.Source; + } + else + { + return job.Destination; + } + } + } +} diff --git a/sdk/storage/Azure.Storage.DataMovement/tests/PauseResumeTransferMockedTests.cs b/sdk/storage/Azure.Storage.DataMovement/tests/PauseResumeTransferMockedTests.cs index ddbf3d50233c..513d507c8f38 100644 --- a/sdk/storage/Azure.Storage.DataMovement/tests/PauseResumeTransferMockedTests.cs +++ b/sdk/storage/Azure.Storage.DataMovement/tests/PauseResumeTransferMockedTests.cs @@ -30,7 +30,7 @@ private Dictionary GetJobsStateCount(List // populate jobsStateCount foreach (DataTransfer transfer in transfers) { - var job = checkpointer.Jobs[transfer.Id]; + Job job = checkpointer.Jobs[transfer.Id]; ++jobsStateCount[job.Status.State]; } return jobsStateCount; @@ -47,7 +47,7 @@ private Dictionary GetJobPartsStateCount(List GetJobPartsStateCount(List transfers, MemoryTransferCheckpointer checkpointer) + { + return transfers.Count(transfer => checkpointer.Jobs[transfer.Id].EnumerationComplete); + } + [Test] [Combinatorial] - public async Task PauseDuringJobProcessing_ItemTransfer( + public async Task PauseResumeDuringJobProcessing_ItemTransfer( [Values(2, 6)] int items, [Values(333, 500, 1024)] int itemSize, [Values(333, 1024)] int chunkSize, @@ -69,7 +74,7 @@ public async Task PauseDuringJobProcessing_ItemTransfer( int chunksPerPart = (int)Math.Ceiling((float)itemSize / chunkSize); // TODO: below should be only `items * chunksPerPart` but can't in some cases due to // a bug in how work items are processed on multipart uploads. 
- int expectedChunksInQueue = Math.Max(chunksPerPart - 1, 1) * items; + int numChunks = Math.Max(chunksPerPart - 1, 1) * items; Uri srcUri = new("file:///foo/bar"); Uri dstUri = new("https://example.com/fizz/buzz"); @@ -88,13 +93,15 @@ public async Task PauseDuringJobProcessing_ItemTransfer( return (Source: srcResource, Destination: dstResource); }).ToList(); + List resumeProviders = new() { new MockStorageResourceProvider(checkpointer) }; + await using TransferManager transferManager = new( jobsProcessor, partsProcessor, chunksProcessor, jobBuilder, checkpointer, - default); + resumeProviders); List transfers = new(); @@ -145,6 +152,7 @@ public async Task PauseDuringJobProcessing_ItemTransfer( int pausedJobsCount = GetJobsStateCount(transfers, checkpointer)[DataTransferState.Paused]; int queuedPartsCount = GetJobPartsStateCount(transfers, checkpointer)[DataTransferState.Queued]; int jobPartsCreatedCount = transfers.Sum(transfer => checkpointer.Jobs[transfer.Id].Parts.Count); + int enumerationCompleteCount = GetEnumerationCompleteCount(transfers, checkpointer); // Assert that we properly paused for PauseProcessHalfway & PauseProcessStart if (pauseLocation == PauseLocation.PauseProcessHalfway) @@ -153,6 +161,7 @@ public async Task PauseDuringJobProcessing_ItemTransfer( Assert.That(queuedPartsCount, Is.EqualTo(items / 2), "Error in Pausing half"); Assert.That(partsProcessor.ItemsInQueue, Is.EqualTo(items / 2), "Error in Pausing half"); Assert.That(jobPartsCreatedCount, Is.EqualTo(items / 2), "Error in Pausing half"); + Assert.That(enumerationCompleteCount, Is.EqualTo(items / 2), "Error: half of the jobs should have finished enumerating"); } else { @@ -160,7 +169,7 @@ public async Task PauseDuringJobProcessing_ItemTransfer( Assert.That(queuedPartsCount, Is.EqualTo(0), "Error in Pausing all"); Assert.That(partsProcessor.ItemsInQueue, Is.EqualTo(0), "Error in Pausing all"); Assert.That(jobPartsCreatedCount, Is.EqualTo(0), "Error in Pausing all"); - Assert.Pass(); + Assert.That(enumerationCompleteCount, Is.EqualTo(0), "Error: none of the jobs should have finished enumerating"); } // At this point, we are continuing with the leftovers from PauseProcessHalfway @@ -168,17 +177,65 @@ public async Task PauseDuringJobProcessing_ItemTransfer( await partsProcessor.StepAll(); await Task.Delay(50); - int pausedJobsCount2 = GetJobsStateCount(transfers, checkpointer)[DataTransferState.Paused]; - int pausedPartsCount2 = GetJobPartsStateCount(transfers, checkpointer)[DataTransferState.Paused]; - Assert.That(pausedJobsCount2, Is.EqualTo(items), "Error in transitioning all jobs to Paused state"); - // only half of the job part checkpointers are created in PauseProcessHalfway so only half will be in Paused state - Assert.That(pausedPartsCount2, Is.EqualTo(items / 2), "Error in transitioning all created job parts to Paused state"); - Assert.That(chunksProcessor.ItemsInQueue, Is.EqualTo(0), "Error: no items should proceed to chunking"); + if (pauseLocation == PauseLocation.PauseProcessHalfway) + { + int pausedJobsCount2 = GetJobsStateCount(transfers, checkpointer)[DataTransferState.Paused]; + int pausedPartsCount2 = GetJobPartsStateCount(transfers, checkpointer)[DataTransferState.Paused]; + Assert.That(pausedJobsCount2, Is.EqualTo(items), "Error in transitioning all jobs to Paused state"); + // only half of the job part checkpointers are created in PauseProcessHalfway so only half will be in Paused state + Assert.That(pausedPartsCount2, Is.EqualTo(items / 2), "Error in transitioning all created job parts to Paused 
state"); + Assert.That(chunksProcessor.ItemsInQueue, Is.EqualTo(0), "Error: no items should proceed to chunking"); + } + + // START RESUME TRANSFERS + List resumedTransfers = await transferManager.ResumeAllTransfersAsync(new() + { + InitialTransferSize = chunkSize, + MaximumTransferChunkSize = chunkSize, + }); + + await Task.Delay(50); + int pausedJobsCount_resume = GetJobsStateCount(resumedTransfers, checkpointer)[DataTransferState.Paused]; + Assert.That(pausedJobsCount_resume, Is.EqualTo(items)); + + // process jobs on resume + Assert.That(await jobsProcessor.StepAll(), Is.EqualTo(items), "Error job processing on resume"); + + await Task.Delay(50); + int inProgressJobsCount = GetJobsStateCount(resumedTransfers, checkpointer)[DataTransferState.InProgress]; + int enumerationCompleteCount2 = GetEnumerationCompleteCount(transfers, checkpointer); + Assert.That(enumerationCompleteCount2, Is.EqualTo(items), "Error: all jobs should have finished enumerating"); + Assert.That(inProgressJobsCount, Is.EqualTo(items), "Error: all jobs should be in InProgress state after Job Processing on resume"); + + // process job parts on resume + Assert.That(await partsProcessor.StepAll(), Is.EqualTo(items), "Error job part processing on resume"); + + await Task.Delay(50); + int inProgressJobPartsCount = GetJobPartsStateCount(resumedTransfers, checkpointer)[DataTransferState.InProgress]; + Assert.That(inProgressJobPartsCount, Is.EqualTo(items), "Error: all job parts should be in InProgress state after Job Processing on resume"); + + // process chunks on resume + Assert.That(await chunksProcessor.StepAll(), Is.EqualTo(numChunks), "Error chunk processing on resume"); + + // Assert that all jobs and job parts are completed successfully + await Task.Delay(50); + int completedJobsCount = GetJobsStateCount(resumedTransfers, checkpointer)[DataTransferState.Completed]; + int completedPartsCount = GetJobPartsStateCount(resumedTransfers, checkpointer)[DataTransferState.Completed]; + Assert.That(completedJobsCount, Is.EqualTo(items), "Error in transitioning all jobs to Completed state"); + Assert.That(completedPartsCount, Is.EqualTo(items), "Error in transitioning all job parts to Completed state"); + foreach (DataTransfer resumedTransfer in resumedTransfers) + { + Assert.That(resumedTransfer.HasCompleted); + Job job = checkpointer.Jobs[resumedTransfer.Id]; + Assert.That(job.Status.State, Is.EqualTo(DataTransferState.Completed), "Error in transitioning all jobs to Completed state"); + JobPart jobPart = job.Parts.First().Value; + Assert.That(jobPart.Status.State, Is.EqualTo(DataTransferState.Completed), "Error in transitioning all job parts to Completed state"); + } } [Test] [Combinatorial] - public async Task PauseDuringPartProcessing_ItemTransfer( + public async Task PauseResumeDuringPartProcessing_ItemTransfer( [Values(2, 6)] int items, [Values(333, 500, 1024)] int itemSize, [Values(333, 1024)] int chunkSize, @@ -207,13 +264,15 @@ public async Task PauseDuringPartProcessing_ItemTransfer( return (Source: srcResource, Destination: dstResource); }).ToList(); + List resumeProviders = new() { new MockStorageResourceProvider(checkpointer) }; + await using TransferManager transferManager = new( jobsProcessor, partsProcessor, chunksProcessor, jobBuilder, checkpointer, - default); + resumeProviders); List transfers = new(); @@ -250,6 +309,9 @@ public async Task PauseDuringPartProcessing_ItemTransfer( Assert.That(partsDict.First().Value.Status.State, Is.EqualTo(DataTransferState.Queued), "Error during Job part file creation."); } + 
int enumerationCompleteCount = GetEnumerationCompleteCount(transfers, checkpointer); + Assert.That(enumerationCompleteCount, Is.EqualTo(items), "Error: all jobs should have finished enumerating"); + // Setup PauseProcessHalfway & PauseProcessStart before issuing pause if (pauseLocation == PauseLocation.PauseProcessHalfway) { @@ -300,11 +362,54 @@ public async Task PauseDuringPartProcessing_ItemTransfer( int pausedPartsCount2 = GetJobPartsStateCount(transfers, checkpointer)[DataTransferState.Paused]; Assert.That(pausedJobsCount2, Is.EqualTo(items), "Error in transitioning all jobs to Paused state"); Assert.That(pausedPartsCount2, Is.EqualTo(items), "Error in transitioning all job parts to Paused state"); + + // START RESUME TRANSFERS + List resumedTransfers = await transferManager.ResumeAllTransfersAsync(); + + await Task.Delay(50); + int pausedJobsCount_resume = GetJobsStateCount(resumedTransfers, checkpointer)[DataTransferState.Paused]; + Assert.That(pausedJobsCount_resume, Is.EqualTo(items)); + + // process jobs on resume + Assert.That(await jobsProcessor.StepAll(), Is.EqualTo(items), "Error job processing on resume"); + + await Task.Delay(50); + int inProgressJobsCount = GetJobsStateCount(resumedTransfers, checkpointer)[DataTransferState.InProgress]; + int pausedJobPartsCount = GetJobPartsStateCount(resumedTransfers, checkpointer)[DataTransferState.Paused]; + int enumerationCompleteCount2 = GetEnumerationCompleteCount(transfers, checkpointer); + Assert.That(enumerationCompleteCount2, Is.EqualTo(items), "Error: all jobs should have finished enumerating"); + Assert.That(inProgressJobsCount, Is.EqualTo(items), "Error: all jobs should be in InProgress state after Job Processing on resume"); + Assert.That(pausedJobPartsCount, Is.EqualTo(items)); + + // process job parts on resume + Assert.That(await partsProcessor.StepAll(), Is.EqualTo(items), "Error job part processing on resume"); + + await Task.Delay(50); + int inProgressJobPartsCount = GetJobPartsStateCount(resumedTransfers, checkpointer)[DataTransferState.InProgress]; + Assert.That(inProgressJobPartsCount, Is.EqualTo(items), "Error: all job parts should be in InProgress state after Part Processing on resume"); + + // process chunks on resume + Assert.That(await chunksProcessor.StepAll(), Is.EqualTo(numChunks), "Error chunk processing on resume"); + + // Assert that all jobs and job parts are completed successfully + await Task.Delay(50); + int completedJobsCount = GetJobsStateCount(resumedTransfers, checkpointer)[DataTransferState.Completed]; + int completedPartsCount = GetJobPartsStateCount(resumedTransfers, checkpointer)[DataTransferState.Completed]; + Assert.That(completedJobsCount, Is.EqualTo(items), "Error in transitioning all jobs to Completed state"); + Assert.That(completedPartsCount, Is.EqualTo(items), "Error in transitioning all job parts to Completed state"); + foreach (DataTransfer resumedTransfer in resumedTransfers) + { + Assert.That(resumedTransfer.HasCompleted); + Job job = checkpointer.Jobs[resumedTransfer.Id]; + Assert.That(job.Status.State, Is.EqualTo(DataTransferState.Completed), "Error in transitioning all jobs to Completed state"); + JobPart jobPart = job.Parts.First().Value; + Assert.That(jobPart.Status.State, Is.EqualTo(DataTransferState.Completed), "Error in transitioning all job parts to Completed state"); + } } [Test] [Combinatorial] - public async Task PauseDuringChunkProcessing_ItemTransfer( + public async Task PauseResumeDuringChunkProcessing_ItemTransfer( [Values(2, 6)] int items, [Values(1024)] int 
itemSize, [Values(1024)] int chunkSize, @@ -333,13 +438,15 @@ public async Task PauseDuringChunkProcessing_ItemTransfer( return (Source: srcResource, Destination: dstResource); }).ToList(); + List resumeProviders = new() { new MockStorageResourceProvider(checkpointer) }; + await using TransferManager transferManager = new( jobsProcessor, partsProcessor, chunksProcessor, jobBuilder, checkpointer, - default); + resumeProviders); List transfers = new(); @@ -376,9 +483,18 @@ public async Task PauseDuringChunkProcessing_ItemTransfer( Assert.That(partsDict.First().Value.Status.State, Is.EqualTo(DataTransferState.Queued), "Error during Job part file creation."); } + int enumerationCompleteCount = GetEnumerationCompleteCount(transfers, checkpointer); + Assert.That(enumerationCompleteCount, Is.EqualTo(items), "Error: all jobs should have finished enumerating"); + // Process parts Assert.That(await partsProcessor.StepAll(), Is.EqualTo(items), "Error in part processing"); + foreach (DataTransfer transfer in transfers) + { + var partsDict = checkpointer.Jobs[transfer.Id].Parts; + Assert.That(partsDict.First().Value.Status.State, Is.EqualTo(DataTransferState.InProgress), "Error transitioning Job Part to InProgress"); + } + // Setup PauseProcessHalfway & PauseProcessStart before issuing pause if (pauseLocation == PauseLocation.PauseProcessHalfway) { @@ -435,11 +551,84 @@ public async Task PauseDuringChunkProcessing_ItemTransfer( Assert.That(pausedPartsCount, Is.EqualTo(items), "Error in Pausing all"); Assert.That(completedPartsCount, Is.EqualTo(0), "Error in Pausing all"); } + + // START RESUME TRANSFERS + List resumedTransfers = await transferManager.ResumeAllTransfersAsync(); + + await Task.Delay(50); + int pausedJobsCount_resume = GetJobsStateCount(resumedTransfers, checkpointer)[DataTransferState.Paused]; + if (pauseLocation == PauseLocation.PauseProcessHalfway) + { + Assert.That(pausedJobsCount_resume, Is.EqualTo(items / 2)); + Assert.That(jobsProcessor.ItemsInQueue, Is.EqualTo(items / 2), "Error in job processor on resume"); + } + else + { + Assert.That(pausedJobsCount_resume, Is.EqualTo(items)); + Assert.That(jobsProcessor.ItemsInQueue, Is.EqualTo(items), "Error in job processor on resume"); + } + + // process jobs on resume + await jobsProcessor.StepAll(); + + await Task.Delay(50); + int inProgressJobsCount = GetJobsStateCount(resumedTransfers, checkpointer)[DataTransferState.InProgress]; + int pausedJobPartsCount = GetJobPartsStateCount(resumedTransfers, checkpointer)[DataTransferState.Paused]; + int enumerationCompleteCount2 = GetEnumerationCompleteCount(transfers, checkpointer); + Assert.That(enumerationCompleteCount2, Is.EqualTo(items), "Error: all jobs should have finished enumerating"); + if (pauseLocation == PauseLocation.PauseProcessHalfway) + { + // the count for all jobs for PauseProcessHalfway is (items / 2) since half have already completed transfer + Assert.That(inProgressJobsCount, Is.EqualTo(items / 2), "Error: all jobs should be in InProgress state after Job Processing on resume"); + Assert.That(pausedJobPartsCount, Is.EqualTo(items / 2)); + Assert.That(partsProcessor.ItemsInQueue, Is.EqualTo(items / 2), "Error in job part processor on resume"); + } + else + { + Assert.That(inProgressJobsCount, Is.EqualTo(items), "Error: all jobs should be in InProgress state after Job Processing on resume"); + Assert.That(pausedJobPartsCount, Is.EqualTo(items)); + Assert.That(partsProcessor.ItemsInQueue, Is.EqualTo(items), "Error in job part processor on resume"); + } + + // process job parts on 
resume + await partsProcessor.StepAll(); + + await Task.Delay(50); + int inProgressJobPartsCount = GetJobPartsStateCount(resumedTransfers, checkpointer)[DataTransferState.InProgress]; + if (pauseLocation == PauseLocation.PauseProcessHalfway) + { + // the count for all job parts for PauseProcessHalfway is (items / 2) since half have already completed transfer + Assert.That(inProgressJobPartsCount, Is.EqualTo(items / 2), "Error: all job parts should be in InProgress state after Part Processing on resume"); + Assert.That(chunksProcessor.ItemsInQueue, Is.EqualTo(numChunks / 2), "Error in chunk processor on resume"); + } + else + { + Assert.That(inProgressJobPartsCount, Is.EqualTo(items), "Error: all job parts should be in InProgress state after Part Processing on resume"); + Assert.That(chunksProcessor.ItemsInQueue, Is.EqualTo(numChunks), "Error in chunk processor on resume"); + } + + // process chunks on resume + await chunksProcessor.StepAll(); + + // Assert that all jobs and job parts are completed successfully + await Task.Delay(50); + int completedJobsCount2 = GetJobsStateCount(transfers, checkpointer)[DataTransferState.Completed]; + int completedPartsCount2 = GetJobPartsStateCount(transfers, checkpointer)[DataTransferState.Completed]; + Assert.That(completedJobsCount2, Is.EqualTo(items), "Error in transitioning all jobs to Completed state"); + Assert.That(completedPartsCount2, Is.EqualTo(items), "Error in transitioning all job parts to Completed state"); + foreach (DataTransfer resumedTransfer in resumedTransfers) + { + Assert.That(resumedTransfer.HasCompleted); + Job job = checkpointer.Jobs[resumedTransfer.Id]; + Assert.That(job.Status.State, Is.EqualTo(DataTransferState.Completed), "Error in transitioning all jobs to Completed state"); + JobPart jobPart = job.Parts.First().Value; + Assert.That(jobPart.Status.State, Is.EqualTo(DataTransferState.Completed), "Error in transitioning all job parts to Completed state"); + } } [Test] [Combinatorial] - public async Task PauseDuringJobProcessing_ContainerTransfer( + public async Task PauseResumeDuringJobProcessing_ContainerTransfer( [Values(2, 6)] int numJobs, [Values(333, 500, 1024)] int itemSize, [Values(333, 1024)] int chunkSize, @@ -469,13 +658,15 @@ public async Task PauseDuringJobProcessing_ContainerTransfer( return (Source: srcResource, Destination: dstResource); }).ToList(); + List resumeProviders = new() { new MockStorageResourceProvider(checkpointer) }; + await using TransferManager transferManager = new( jobsProcessor, partsProcessor, chunksProcessor, jobBuilder, checkpointer, - default); + resumeProviders); List transfers = new(); @@ -527,6 +718,7 @@ public async Task PauseDuringJobProcessing_ContainerTransfer( int queuedPartsCount = GetJobPartsStateCount(transfers, checkpointer)[DataTransferState.Queued]; int expectedPartsCreatedCount = Enumerable.Range(1, numJobs/2).Select(GetItemCountFromContainerIndex).Sum(); int jobPartsCreatedCount = transfers.Sum(transfer => checkpointer.Jobs[transfer.Id].Parts.Count); + int enumerationCompleteCount = GetEnumerationCompleteCount(transfers, checkpointer); // Assert that we properly paused for PauseProcessHalfway & PauseProcessStart if (pauseLocation == PauseLocation.PauseProcessHalfway) @@ -535,6 +727,7 @@ public async Task PauseDuringJobProcessing_ContainerTransfer( Assert.That(queuedPartsCount, Is.EqualTo(expectedPartsCreatedCount), "Error in Pausing half"); Assert.That(partsProcessor.ItemsInQueue, Is.EqualTo(expectedPartsCreatedCount), "Error in Pausing half"); 
Assert.That(jobPartsCreatedCount, Is.EqualTo(expectedPartsCreatedCount), "Error in Pausing half"); + Assert.That(enumerationCompleteCount, Is.EqualTo(numJobs / 2), "Error: half of the jobs should have finished enumerating"); } else { @@ -542,7 +735,7 @@ public async Task PauseDuringJobProcessing_ContainerTransfer( Assert.That(queuedPartsCount, Is.EqualTo(0), "Error in Pausing all"); Assert.That(partsProcessor.ItemsInQueue, Is.EqualTo(0), "Error in Pausing all"); Assert.That(jobPartsCreatedCount, Is.EqualTo(0), "Error in Pausing all"); - Assert.Pass(); + Assert.That(enumerationCompleteCount, Is.EqualTo(0), "Error: none of the jobs should have finished enumerating"); } // At this point, we are continuing with the leftovers from PauseProcessStart @@ -550,16 +743,66 @@ public async Task PauseDuringJobProcessing_ContainerTransfer( await partsProcessor.StepAll(); await Task.Delay(50); - int pausedJobsCount2 = GetJobsStateCount(transfers, checkpointer)[DataTransferState.Paused]; - int pausedPartsCount2 = GetJobPartsStateCount(transfers, checkpointer)[DataTransferState.Paused]; - Assert.That(pausedJobsCount2, Is.EqualTo(numJobs), "Error in transitioning all jobs to Paused state"); - Assert.That(pausedPartsCount2, Is.EqualTo(expectedPartsCreatedCount), "Error in transitioning all created job parts to Paused state"); - Assert.That(chunksProcessor.ItemsInQueue, Is.EqualTo(0), "Error: no items should proceed to chunking"); + if (pauseLocation == PauseLocation.PauseProcessHalfway) + { + int pausedJobsCount2 = GetJobsStateCount(transfers, checkpointer)[DataTransferState.Paused]; + int pausedPartsCount2 = GetJobPartsStateCount(transfers, checkpointer)[DataTransferState.Paused]; + Assert.That(pausedJobsCount2, Is.EqualTo(numJobs), "Error in transitioning all jobs to Paused state"); + Assert.That(pausedPartsCount2, Is.EqualTo(expectedPartsCreatedCount), "Error in transitioning all created job parts to Paused state"); + Assert.That(chunksProcessor.ItemsInQueue, Is.EqualTo(0), "Error: no items should proceed to chunking"); + } + + // START RESUME TRANSFERS + List resumedTransfers = await transferManager.ResumeAllTransfersAsync(new() + { + InitialTransferSize = chunkSize, + MaximumTransferChunkSize = chunkSize, + }); + + await Task.Delay(50); + int pausedJobsCount_resume = GetJobsStateCount(resumedTransfers, checkpointer)[DataTransferState.Paused]; + Assert.That(pausedJobsCount_resume, Is.EqualTo(numJobs)); + + // process jobs on resume + Assert.That(await jobsProcessor.StepAll(), Is.EqualTo(numJobs), "Error job processing on resume"); + + await Task.Delay(50); + int inProgressJobsCount = GetJobsStateCount(resumedTransfers, checkpointer)[DataTransferState.InProgress]; + int enumerationCompleteCount2 = GetEnumerationCompleteCount(transfers, checkpointer); + Assert.That(enumerationCompleteCount2, Is.EqualTo(numJobs), "Error: all jobs should have finished enumerating"); + Assert.That(inProgressJobsCount, Is.EqualTo(numJobs), "Error: all jobs should be in InProgress state after Job Processing on resume"); + + // process job parts on resume + Assert.That(await partsProcessor.StepAll(), Is.EqualTo(numJobParts), "Error job part processing on resume"); + + await Task.Delay(50); + int inProgressJobPartsCount = GetJobPartsStateCount(resumedTransfers, checkpointer)[DataTransferState.InProgress]; + Assert.That(inProgressJobPartsCount, Is.EqualTo(numJobParts), "Error: all job parts should be in InProgress state after Part Processing on resume"); + + // process chunks on resume + Assert.That(await 
chunksProcessor.StepAll(), Is.EqualTo(numChunks), "Error chunk processing on resume"); + + // Assert that all jobs and job parts are completed successfully + await Task.Delay(50); + int completedJobsCount = GetJobsStateCount(resumedTransfers, checkpointer)[DataTransferState.Completed]; + int completedPartsCount = GetJobPartsStateCount(resumedTransfers, checkpointer)[DataTransferState.Completed]; + Assert.That(completedJobsCount, Is.EqualTo(numJobs), "Error in transitioning all jobs to Completed state"); + Assert.That(completedPartsCount, Is.EqualTo(numJobParts), "Error in transitioning all job parts to Completed state"); + foreach (DataTransfer resumedTransfer in resumedTransfers) + { + Assert.That(resumedTransfer.HasCompleted); + Job job = checkpointer.Jobs[resumedTransfer.Id]; + Assert.That(job.Status.State, Is.EqualTo(DataTransferState.Completed), "Error in transitioning all jobs to Completed state"); + foreach (var jobPart in job.Parts) + { + Assert.That(jobPart.Value.Status.State, Is.EqualTo(DataTransferState.Completed), "Error in transitioning all job parts to Completed state"); + } + } } [Test] [Combinatorial] - public async Task PauseDuringPartProcessing_ContainerTransfer( + public async Task PauseResumeDuringPartProcessing_ContainerTransfer( [Values(2, 6)] int numJobs, [Values(333, 500, 1024)] int itemSize, [Values(333, 1024)] int chunkSize, @@ -589,13 +832,15 @@ public async Task PauseDuringPartProcessing_ContainerTransfer( return (Source: srcResource, Destination: dstResource); }).ToList(); + List resumeProviders = new() { new MockStorageResourceProvider(checkpointer) }; + await using TransferManager transferManager = new( jobsProcessor, partsProcessor, chunksProcessor, jobBuilder, checkpointer, - default); + resumeProviders); List transfers = new(); @@ -635,6 +880,9 @@ public async Task PauseDuringPartProcessing_ContainerTransfer( Is.True, "Error during Job part file creation: Not all parts are in the Queued state."); } + int enumerationCompleteCount = GetEnumerationCompleteCount(transfers, checkpointer); + Assert.That(enumerationCompleteCount, Is.EqualTo(numJobs), "Error: all jobs should have finished enumerating"); + // Setup PauseProcessHalfway & PauseProcessStart before issuing pause if (pauseLocation == PauseLocation.PauseProcessHalfway) { @@ -681,11 +929,56 @@ public async Task PauseDuringPartProcessing_ContainerTransfer( int pausedPartsCount2 = GetJobPartsStateCount(transfers, checkpointer)[DataTransferState.Paused]; Assert.That(pausedJobsCount2, Is.EqualTo(numJobs), "Error in transitioning all jobs to Paused state"); Assert.That(pausedPartsCount2, Is.EqualTo(numJobParts), "Error in transitioning all parts to Paused state"); + + // START RESUME TRANSFERS + List resumedTransfers = await transferManager.ResumeAllTransfersAsync(); + + await Task.Delay(50); + int pausedJobsCount_resume = GetJobsStateCount(resumedTransfers, checkpointer)[DataTransferState.Paused]; + Assert.That(pausedJobsCount_resume, Is.EqualTo(numJobs)); + + // process jobs on resume + Assert.That(await jobsProcessor.StepAll(), Is.EqualTo(numJobs), "Error job processing on resume"); + + await Task.Delay(50); + int inProgressJobsCount = GetJobsStateCount(resumedTransfers, checkpointer)[DataTransferState.InProgress]; + int pausedJobPartsCount = GetJobPartsStateCount(resumedTransfers, checkpointer)[DataTransferState.Paused]; + int enumerationCompleteCount2 = GetEnumerationCompleteCount(transfers, checkpointer); + Assert.That(enumerationCompleteCount2, Is.EqualTo(numJobs), "Error: all jobs should have finished 
enumerating"); + Assert.That(inProgressJobsCount, Is.EqualTo(numJobs), "Error: all jobs should be in InProgress state after Job Processing on resume"); + Assert.That(pausedJobPartsCount, Is.EqualTo(numJobParts)); + + // process job parts on resume + Assert.That(await partsProcessor.StepAll(), Is.EqualTo(numJobParts), "Error job part processing on resume"); + + await Task.Delay(50); + int inProgressJobPartsCount = GetJobPartsStateCount(resumedTransfers, checkpointer)[DataTransferState.InProgress]; + Assert.That(inProgressJobPartsCount, Is.EqualTo(numJobParts), "Error: all job parts should be in InProgress state after Part Processing on resume"); + + // process chunks on resume + Assert.That(await chunksProcessor.StepAll(), Is.EqualTo(numChunks), "Error chunk processing on resume"); + + // Assert that all jobs and job parts are completed successfully + await Task.Delay(50); + int completedJobsCount = GetJobsStateCount(resumedTransfers, checkpointer)[DataTransferState.Completed]; + int completedPartsCount = GetJobPartsStateCount(resumedTransfers, checkpointer)[DataTransferState.Completed]; + Assert.That(completedJobsCount, Is.EqualTo(numJobs), "Error in transitioning all jobs to Completed state"); + Assert.That(completedPartsCount, Is.EqualTo(numJobParts), "Error in transitioning all job parts to Completed state"); + foreach (DataTransfer resumedTransfer in resumedTransfers) + { + Assert.That(resumedTransfer.HasCompleted); + Job job = checkpointer.Jobs[resumedTransfer.Id]; + Assert.That(job.Status.State, Is.EqualTo(DataTransferState.Completed), "Error in transitioning all jobs to Completed state"); + foreach (var jobPart in job.Parts) + { + Assert.That(jobPart.Value.Status.State, Is.EqualTo(DataTransferState.Completed), "Error in transitioning all job parts to Completed state"); + } + } } [Test] [Combinatorial] - public async Task PauseDuringChunkProcessing_ContainerTransfer( + public async Task PauseResumeDuringChunkProcessing_ContainerTransfer( [Values(2, 6)] int numJobs, [Values(1024)] int itemSize, [Values(1024)] int chunkSize, @@ -715,13 +1008,15 @@ public async Task PauseDuringChunkProcessing_ContainerTransfer( return (Source: srcResource, Destination: dstResource); }).ToList(); + List resumeProviders = new() { new MockStorageResourceProvider(checkpointer) }; + await using TransferManager transferManager = new( jobsProcessor, partsProcessor, chunksProcessor, jobBuilder, checkpointer, - default); + resumeProviders); List transfers = new(); @@ -761,9 +1056,19 @@ public async Task PauseDuringChunkProcessing_ContainerTransfer( Is.True, "Error during Job part file creation: Not all parts are in the Queued state."); } + int enumerationCompleteCount = GetEnumerationCompleteCount(transfers, checkpointer); + Assert.That(enumerationCompleteCount, Is.EqualTo(numJobs), "Error: all jobs should have finished enumerating"); + // Process parts Assert.That(await partsProcessor.StepAll(), Is.EqualTo(numJobParts), "Error processing job parts"); + foreach (DataTransfer transfer in transfers) + { + var partsDict = checkpointer.Jobs[transfer.Id].Parts; + Assert.That(partsDict.Values.All(part => part.Status.State == DataTransferState.InProgress), + Is.True, "Error transitioning each Job Part to InProgress"); + } + // Setup PauseProcessHalfway & PauseProcessStart before issuing pause if (pauseLocation == PauseLocation.PauseProcessHalfway) { @@ -816,6 +1121,63 @@ public async Task PauseDuringChunkProcessing_ContainerTransfer( Assert.That(pausedPartsCount, Is.EqualTo(numJobParts), "Error in Pausing all"); 
Assert.That(completedPartsCount, Is.EqualTo(0), "Error in Pausing all"); } + + // START RESUME TRANSFERS + List resumedTransfers = await transferManager.ResumeAllTransfersAsync(); + + await Task.Delay(50); + int pausedJobsCount_resume = GetJobsStateCount(resumedTransfers, checkpointer)[DataTransferState.Paused]; + if (pauseLocation == PauseLocation.PauseProcessStart) + { + Assert.That(pausedJobsCount_resume, Is.EqualTo(numJobs)); + Assert.That(jobsProcessor.ItemsInQueue, Is.EqualTo(numJobs), "Error in job processor on resume"); + } + + // process jobs on resume + await jobsProcessor.StepAll(); + + await Task.Delay(50); + int inProgressJobsCount = GetJobsStateCount(resumedTransfers, checkpointer)[DataTransferState.InProgress]; + int pausedJobPartsCount = GetJobPartsStateCount(resumedTransfers, checkpointer)[DataTransferState.Paused]; + int enumerationCompleteCount2 = GetEnumerationCompleteCount(transfers, checkpointer); + Assert.That(enumerationCompleteCount2, Is.EqualTo(numJobs), "Error: all jobs should have finished enumerating"); + if (pauseLocation == PauseLocation.PauseProcessStart) + { + Assert.That(inProgressJobsCount, Is.EqualTo(numJobs), "Error: all jobs should be in InProgress state after Job Processing on resume"); + Assert.That(pausedJobPartsCount, Is.EqualTo(numJobParts)); + Assert.That(partsProcessor.ItemsInQueue, Is.EqualTo(numJobParts), "Error in parts processor on resume"); + } + + // process job parts on resume + await partsProcessor.StepAll(); + + await Task.Delay(50); + int inProgressJobPartsCount = GetJobPartsStateCount(resumedTransfers, checkpointer)[DataTransferState.InProgress]; + if (pauseLocation == PauseLocation.PauseProcessStart) + { + Assert.That(inProgressJobPartsCount, Is.EqualTo(numJobParts), "Error: all job parts should be in InProgress state after Part Processing on resume"); + Assert.That(chunksProcessor.ItemsInQueue, Is.EqualTo(numChunks), "Error in chunks processor on resume"); + } + + // process chunks on resume + await chunksProcessor.StepAll(); + + // Assert that all jobs and job parts are completed successfully + await Task.Delay(50); + int completedJobsCount2 = GetJobsStateCount(transfers, checkpointer)[DataTransferState.Completed]; + int completedPartsCount2 = GetJobPartsStateCount(transfers, checkpointer)[DataTransferState.Completed]; + Assert.That(completedJobsCount2, Is.EqualTo(numJobs), "Error in transitioning all jobs to Completed state"); + Assert.That(completedPartsCount2, Is.EqualTo(numJobParts), "Error in transitioning all job parts to Completed state"); + foreach (DataTransfer resumedTransfer in resumedTransfers) + { + Assert.That(resumedTransfer.HasCompleted); + Job job = checkpointer.Jobs[resumedTransfer.Id]; + Assert.That(job.Status.State, Is.EqualTo(DataTransferState.Completed), "Error in transitioning all jobs to Completed state"); + foreach (var jobPart in job.Parts) + { + Assert.That(jobPart.Value.Status.State, Is.EqualTo(DataTransferState.Completed), "Error in transitioning all job parts to Completed state"); + } + } } public enum PauseLocation diff --git a/sdk/storage/Azure.Storage.DataMovement/tests/Shared/MemoryTransferCheckpointer.cs b/sdk/storage/Azure.Storage.DataMovement/tests/Shared/MemoryTransferCheckpointer.cs index 36da63a5221a..7acde9eca916 100644 --- a/sdk/storage/Azure.Storage.DataMovement/tests/Shared/MemoryTransferCheckpointer.cs +++ b/sdk/storage/Azure.Storage.DataMovement/tests/Shared/MemoryTransferCheckpointer.cs @@ -126,9 +126,9 @@ public virtual Task GetJobPartAsync(string transferId, int pa { throw new 
Exception("Job does not exist."); } - if (job.Parts.TryGetValue(partNumber, out JobPart part)) + if (!job.Parts.TryGetValue(partNumber, out JobPart part)) { - throw new Exception($"Job part {partNumber} already exists for job {job.TransferId}."); + throw new Exception($"Job part {partNumber} does not exists for job {job.TransferId}."); } return Task.FromResult(part.Plan); } diff --git a/sdk/storage/Azure.Storage.DataMovement/tests/TransferManagerTests.cs b/sdk/storage/Azure.Storage.DataMovement/tests/TransferManagerTests.cs index d25df4411545..eede06b413fd 100644 --- a/sdk/storage/Azure.Storage.DataMovement/tests/TransferManagerTests.cs +++ b/sdk/storage/Azure.Storage.DataMovement/tests/TransferManagerTests.cs @@ -605,6 +605,10 @@ public static void BasicSetup( new(itemSize, default, default, new()))); }); + items.Source.Setup(r => r.IsContainer).Returns(false); + + items.Source.Setup(r => r.ProviderId).Returns("mock"); + items.Destination.Setup(r => r.CopyFromStreamAsync( It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny())) @@ -623,6 +627,10 @@ public static void BasicSetup( CancellationHelper.ThrowIfCancellationRequested(cancellationToken); return Task.CompletedTask; }); + + items.Destination.Setup(r => r.IsContainer).Returns(false); + + items.Destination.Setup(r => r.ProviderId).Returns("mock"); } public static void BasicSetup( @@ -660,11 +668,25 @@ async IAsyncEnumerable SubResourcesAsAsyncEnumerable( containers.Source.Setup(r => r.GetStorageResourcesAsync(It.IsAny(), It.IsAny())) .Returns(SubResourcesAsAsyncEnumerable); + containers.Source.Setup(r => r.GetStorageResourceReference(It.IsAny(), It.IsAny())) + .Returns((path, resId) => subResources + .Where(pair => pair.Source.Object.Uri.AbsolutePath.Contains(path)) + .FirstOrDefault().Source?.Object + ); + containers.Destination.Setup(r => r.GetStorageResourceReference(It.IsAny(), It.IsAny())) .Returns((path, resId) => subResources .Where(pair => pair.Source.Object.Uri.AbsolutePath.Contains(path)) .FirstOrDefault().Destination?.Object - ); + ); + + containers.Source.Setup(r => r.IsContainer).Returns(true); + + containers.Source.Setup(r => r.ProviderId).Returns("mock"); + + containers.Destination.Setup(r => r.IsContainer).Returns(true); + + containers.Destination.Setup(r => r.ProviderId).Returns("mock"); } public static void VerifyTransferManagerCtorInvocations(this Mock> processor) From 7c8d3c281ee14f87a909e2108605c5af7e3c788b Mon Sep 17 00:00:00 2001 From: nickliu-msft Date: Thu, 5 Dec 2024 19:07:20 -0500 Subject: [PATCH 2/5] change to internal --- .../tests/MockStorageResourceProvider.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/storage/Azure.Storage.DataMovement/tests/MockStorageResourceProvider.cs b/sdk/storage/Azure.Storage.DataMovement/tests/MockStorageResourceProvider.cs index 01eec8d975ab..10b8be2cf0ed 100644 --- a/sdk/storage/Azure.Storage.DataMovement/tests/MockStorageResourceProvider.cs +++ b/sdk/storage/Azure.Storage.DataMovement/tests/MockStorageResourceProvider.cs @@ -11,7 +11,7 @@ namespace Azure.Storage.DataMovement.Tests /// /// Provider for a configured for mocking. 
/// - public class MockStorageResourceProvider : StorageResourceProvider + internal class MockStorageResourceProvider : StorageResourceProvider { /// protected internal override string ProviderId => "mock"; From 7b41ef516bbdcc0830f84febde478efff9e8fc57 Mon Sep 17 00:00:00 2001 From: nickliu-msft Date: Fri, 6 Dec 2024 14:55:12 -0500 Subject: [PATCH 3/5] Added checkpointer verify --- .../Azure.Storage.DataMovement/tests/TransferManagerTests.cs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/sdk/storage/Azure.Storage.DataMovement/tests/TransferManagerTests.cs b/sdk/storage/Azure.Storage.DataMovement/tests/TransferManagerTests.cs index eede06b413fd..2ac16421c8b9 100644 --- a/sdk/storage/Azure.Storage.DataMovement/tests/TransferManagerTests.cs +++ b/sdk/storage/Azure.Storage.DataMovement/tests/TransferManagerTests.cs @@ -443,6 +443,10 @@ public async Task TransferFailAtJobProcess( It.IsAny())); checkpointer.Verify(c => c.SetJobStatusAsync(transferId, It.IsAny(), It.IsAny()), Times.Exactly(3)); + if (!isContainer) + { + checkpointer.Verify(c => c.GetCurrentJobPartCountAsync(It.IsAny(), It.IsAny()), Times.Once); + } Assert.That(capturedTransferStatuses[0].State, Is.EqualTo(DataTransferState.InProgress)); Assert.That(capturedTransferStatuses[1].State, Is.EqualTo(DataTransferState.Stopping)); Assert.That(capturedTransferStatuses[2].IsCompletedWithFailedItems); From 3823cb5fd948aa3de7fe67275a83adcae90ab1da Mon Sep 17 00:00:00 2001 From: nickliu-msft Date: Sat, 7 Dec 2024 01:34:30 -0500 Subject: [PATCH 4/5] Refactored out jobs and parts completion asserts --- .../tests/PauseResumeTransferMockedTests.cs | 107 ++++-------------- 1 file changed, 23 insertions(+), 84 deletions(-) diff --git a/sdk/storage/Azure.Storage.DataMovement/tests/PauseResumeTransferMockedTests.cs b/sdk/storage/Azure.Storage.DataMovement/tests/PauseResumeTransferMockedTests.cs index 513d507c8f38..3318d94a992c 100644 --- a/sdk/storage/Azure.Storage.DataMovement/tests/PauseResumeTransferMockedTests.cs +++ b/sdk/storage/Azure.Storage.DataMovement/tests/PauseResumeTransferMockedTests.cs @@ -62,6 +62,23 @@ private int GetEnumerationCompleteCount(List transfers, MemoryTran return transfers.Count(transfer => checkpointer.Jobs[transfer.Id].EnumerationComplete); } + private void AssertAllJobsAndPartsCompleted(int numJobs, int numJobParts, List transfers, MemoryTransferCheckpointer checkpointer) + { + int completedJobsCount = GetJobsStateCount(transfers, checkpointer)[DataTransferState.Completed]; + int completedPartsCount = GetJobPartsStateCount(transfers, checkpointer)[DataTransferState.Completed]; + Assert.That(completedJobsCount, Is.EqualTo(numJobs), "Error in transitioning all jobs to Completed state"); + Assert.That(completedPartsCount, Is.EqualTo(numJobParts), "Error in transitioning all job parts to Completed state"); + foreach (DataTransfer transfer in transfers) + { + Job job = checkpointer.Jobs[transfer.Id]; + Assert.That(job.Status.State, Is.EqualTo(DataTransferState.Completed), "Error in transitioning all jobs to Completed state"); + foreach (var jobPart in job.Parts) + { + Assert.That(jobPart.Value.Status.State, Is.EqualTo(DataTransferState.Completed), "Error in transitioning all job parts to Completed state"); + } + } + } + [Test] [Combinatorial] public async Task PauseResumeDuringJobProcessing_ItemTransfer( @@ -217,20 +234,8 @@ public async Task PauseResumeDuringJobProcessing_ItemTransfer( // process chunks on resume Assert.That(await chunksProcessor.StepAll(), Is.EqualTo(numChunks), "Error chunk processing on
resume"); - // Assert that all jobs and job parts are completed successfully await Task.Delay(50); - int completedJobsCount = GetJobsStateCount(resumedTransfers, checkpointer)[DataTransferState.Completed]; - int completedPartsCount = GetJobPartsStateCount(resumedTransfers, checkpointer)[DataTransferState.Completed]; - Assert.That(completedJobsCount, Is.EqualTo(items), "Error in transitioning all jobs to Completed state"); - Assert.That(completedPartsCount, Is.EqualTo(items), "Error in transitioning all job parts to Completed state"); - foreach (DataTransfer resumedTransfer in resumedTransfers) - { - Assert.That(resumedTransfer.HasCompleted); - Job job = checkpointer.Jobs[resumedTransfer.Id]; - Assert.That(job.Status.State, Is.EqualTo(DataTransferState.Completed), "Error in transitioning all jobs to Completed state"); - JobPart jobPart = job.Parts.First().Value; - Assert.That(jobPart.Status.State, Is.EqualTo(DataTransferState.Completed), "Error in transitioning all job parts to Completed state"); - } + AssertAllJobsAndPartsCompleted(items, items, transfers, checkpointer); } [Test] @@ -391,20 +396,8 @@ public async Task PauseResumeDuringPartProcessing_ItemTransfer( // process chunks on resume Assert.That(await chunksProcessor.StepAll(), Is.EqualTo(numChunks), "Error chunk processing on resume"); - // Assert that all jobs and job parts are completed successfully await Task.Delay(50); - int completedJobsCount = GetJobsStateCount(resumedTransfers, checkpointer)[DataTransferState.Completed]; - int completedPartsCount = GetJobPartsStateCount(resumedTransfers, checkpointer)[DataTransferState.Completed]; - Assert.That(completedJobsCount, Is.EqualTo(items), "Error in transitioning all jobs to Completed state"); - Assert.That(completedPartsCount, Is.EqualTo(items), "Error in transitioning all job parts to Completed state"); - foreach (DataTransfer resumedTransfer in resumedTransfers) - { - Assert.That(resumedTransfer.HasCompleted); - Job job = checkpointer.Jobs[resumedTransfer.Id]; - Assert.That(job.Status.State, Is.EqualTo(DataTransferState.Completed), "Error in transitioning all jobs to Completed state"); - JobPart jobPart = job.Parts.First().Value; - Assert.That(jobPart.Status.State, Is.EqualTo(DataTransferState.Completed), "Error in transitioning all job parts to Completed state"); - } + AssertAllJobsAndPartsCompleted(items, items, transfers, checkpointer); } [Test] @@ -610,20 +603,8 @@ public async Task PauseResumeDuringChunkProcessing_ItemTransfer( // process chunks on resume await chunksProcessor.StepAll(); - // Assert that all jobs and job parts are completed successfully await Task.Delay(50); - int completedJobsCount2 = GetJobsStateCount(transfers, checkpointer)[DataTransferState.Completed]; - int completedPartsCount2 = GetJobPartsStateCount(transfers, checkpointer)[DataTransferState.Completed]; - Assert.That(completedJobsCount2, Is.EqualTo(items), "Error in transitioning all jobs to Completed state"); - Assert.That(completedPartsCount2, Is.EqualTo(items), "Error in transitioning all job parts to Completed state"); - foreach (DataTransfer resumedTransfer in resumedTransfers) - { - Assert.That(resumedTransfer.HasCompleted); - Job job = checkpointer.Jobs[resumedTransfer.Id]; - Assert.That(job.Status.State, Is.EqualTo(DataTransferState.Completed), "Error in transitioning all jobs to Completed state"); - JobPart jobPart = job.Parts.First().Value; - Assert.That(jobPart.Status.State, Is.EqualTo(DataTransferState.Completed), "Error in transitioning all job parts to Completed state"); - } + 
AssertAllJobsAndPartsCompleted(items, items, transfers, checkpointer); } [Test] @@ -782,22 +763,8 @@ public async Task PauseResumeDuringJobProcessing_ContainerTransfer( // process chunks on resume Assert.That(await chunksProcessor.StepAll(), Is.EqualTo(numChunks), "Error chunk processing on resume"); - // Assert that all jobs and job parts are completed successfully await Task.Delay(50); - int completedJobsCount = GetJobsStateCount(resumedTransfers, checkpointer)[DataTransferState.Completed]; - int completedPartsCount = GetJobPartsStateCount(resumedTransfers, checkpointer)[DataTransferState.Completed]; - Assert.That(completedJobsCount, Is.EqualTo(numJobs), "Error in transitioning all jobs to Completed state"); - Assert.That(completedPartsCount, Is.EqualTo(numJobParts), "Error in transitioning all job parts to Completed state"); - foreach (DataTransfer resumedTransfer in resumedTransfers) - { - Assert.That(resumedTransfer.HasCompleted); - Job job = checkpointer.Jobs[resumedTransfer.Id]; - Assert.That(job.Status.State, Is.EqualTo(DataTransferState.Completed), "Error in transitioning all jobs to Completed state"); - foreach (var jobPart in job.Parts) - { - Assert.That(jobPart.Value.Status.State, Is.EqualTo(DataTransferState.Completed), "Error in transitioning all job parts to Completed state"); - } - } + AssertAllJobsAndPartsCompleted(numJobs, numJobParts, transfers, checkpointer); } [Test] @@ -958,22 +925,8 @@ public async Task PauseResumeDuringPartProcessing_ContainerTransfer( // process chunks on resume Assert.That(await chunksProcessor.StepAll(), Is.EqualTo(numChunks), "Error chunk processing on resume"); - // Assert that all jobs and job parts are completed successfully await Task.Delay(50); - int completedJobsCount = GetJobsStateCount(resumedTransfers, checkpointer)[DataTransferState.Completed]; - int completedPartsCount = GetJobPartsStateCount(resumedTransfers, checkpointer)[DataTransferState.Completed]; - Assert.That(completedJobsCount, Is.EqualTo(numJobs), "Error in transitioning all jobs to Completed state"); - Assert.That(completedPartsCount, Is.EqualTo(numJobParts), "Error in transitioning all job parts to Completed state"); - foreach (DataTransfer resumedTransfer in resumedTransfers) - { - Assert.That(resumedTransfer.HasCompleted); - Job job = checkpointer.Jobs[resumedTransfer.Id]; - Assert.That(job.Status.State, Is.EqualTo(DataTransferState.Completed), "Error in transitioning all jobs to Completed state"); - foreach (var jobPart in job.Parts) - { - Assert.That(jobPart.Value.Status.State, Is.EqualTo(DataTransferState.Completed), "Error in transitioning all job parts to Completed state"); - } - } + AssertAllJobsAndPartsCompleted(numJobs, numJobParts, transfers, checkpointer); } [Test] @@ -1162,22 +1115,8 @@ public async Task PauseResumeDuringChunkProcessing_ContainerTransfer( // process chunks on resume await chunksProcessor.StepAll(); - // Assert that all jobs and job parts are completed successfully await Task.Delay(50); - int completedJobsCount2 = GetJobsStateCount(transfers, checkpointer)[DataTransferState.Completed]; - int completedPartsCount2 = GetJobPartsStateCount(transfers, checkpointer)[DataTransferState.Completed]; - Assert.That(completedJobsCount2, Is.EqualTo(numJobs), "Error in transitioning all jobs to Completed state"); - Assert.That(completedPartsCount2, Is.EqualTo(numJobParts), "Error in transitioning all job parts to Completed state"); - foreach (DataTransfer resumedTransfer in resumedTransfers) - { - Assert.That(resumedTransfer.HasCompleted); - Job job = 
checkpointer.Jobs[resumedTransfer.Id]; - Assert.That(job.Status.State, Is.EqualTo(DataTransferState.Completed), "Error in transitioning all jobs to Completed state"); - foreach (var jobPart in job.Parts) - { - Assert.That(jobPart.Value.Status.State, Is.EqualTo(DataTransferState.Completed), "Error in transitioning all job parts to Completed state"); - } - } + AssertAllJobsAndPartsCompleted(numJobs, numJobParts, transfers, checkpointer); } public enum PauseLocation From 177df0fb55a4731a4da94abd28772c081c48e30d Mon Sep 17 00:00:00 2001 From: nickliu-msft Date: Sun, 8 Dec 2024 19:23:23 -0500 Subject: [PATCH 5/5] Refactored and added more assertions for pause/resume during chunk processor - container --- .../tests/PauseResumeTransferMockedTests.cs | 40 +++++++++++++++---- 1 file changed, 33 insertions(+), 7 deletions(-) diff --git a/sdk/storage/Azure.Storage.DataMovement/tests/PauseResumeTransferMockedTests.cs b/sdk/storage/Azure.Storage.DataMovement/tests/PauseResumeTransferMockedTests.cs index 3318d94a992c..99345209e50f 100644 --- a/sdk/storage/Azure.Storage.DataMovement/tests/PauseResumeTransferMockedTests.cs +++ b/sdk/storage/Azure.Storage.DataMovement/tests/PauseResumeTransferMockedTests.cs @@ -1061,9 +1061,22 @@ public async Task PauseResumeDuringChunkProcessing_ContainerTransfer( int pausedPartsCount = partsStateCount[DataTransferState.Paused]; int completedPartsCount = partsStateCount[DataTransferState.Completed]; + int expectedAlreadyCompletedJobsCount_half = 0; + for (int i = 1, numChunksCompleted = numChunks / 2; i <= numJobParts / 2 && numChunksCompleted > 0; ++i) + { + numChunksCompleted -= i * 2; + if (numChunksCompleted >= 0) + { + ++expectedAlreadyCompletedJobsCount_half; + } + } + if (pauseLocation == PauseLocation.PauseProcessHalfway) { + int expectedPausedJobsCount = numJobs - expectedAlreadyCompletedJobsCount_half; // For this test, job parts is 1:1 with job chunks + Assert.That(pausedJobsCount, Is.EqualTo(expectedPausedJobsCount), "Error in Pausing half"); + Assert.That(completedJobsCount, Is.EqualTo(expectedAlreadyCompletedJobsCount_half), "Error in Pausing half"); Assert.That(pausedPartsCount, Is.EqualTo(numJobParts / 2), "Error in Pausing half"); Assert.That(completedPartsCount, Is.EqualTo(numJobParts / 2), "Error in Pausing half"); } @@ -1079,10 +1092,13 @@ public async Task PauseResumeDuringChunkProcessing_ContainerTransfer( List resumedTransfers = await transferManager.ResumeAllTransfersAsync(); await Task.Delay(50); - int pausedJobsCount_resume = GetJobsStateCount(resumedTransfers, checkpointer)[DataTransferState.Paused]; - if (pauseLocation == PauseLocation.PauseProcessStart) + int expectedJobsCount_half = numJobs - expectedAlreadyCompletedJobsCount_half; + if (pauseLocation == PauseLocation.PauseProcessHalfway) + { + Assert.That(jobsProcessor.ItemsInQueue, Is.EqualTo(expectedJobsCount_half), "Error in job processor on resume"); + } + else { - Assert.That(pausedJobsCount_resume, Is.EqualTo(numJobs)); Assert.That(jobsProcessor.ItemsInQueue, Is.EqualTo(numJobs), "Error in job processor on resume"); } @@ -1091,13 +1107,18 @@ public async Task PauseResumeDuringChunkProcessing_ContainerTransfer( await Task.Delay(50); int inProgressJobsCount = GetJobsStateCount(resumedTransfers, checkpointer)[DataTransferState.InProgress]; - int pausedJobPartsCount = GetJobPartsStateCount(resumedTransfers, checkpointer)[DataTransferState.Paused]; int enumerationCompleteCount2 = GetEnumerationCompleteCount(transfers, checkpointer); Assert.That(enumerationCompleteCount2, 
Is.EqualTo(numJobs), "Error: all jobs should have finished enumerating"); - if (pauseLocation == PauseLocation.PauseProcessStart) + int expectedPartsCount_half = Enumerable.Range(numJobs + 1 - expectedJobsCount_half, expectedJobsCount_half) + .Sum(i => i * 2); + if (pauseLocation == PauseLocation.PauseProcessHalfway) + { + Assert.That(inProgressJobsCount, Is.EqualTo(expectedJobsCount_half), "Error: all remaining jobs should be in InProgress state after Job Processing on resume"); + Assert.That(partsProcessor.ItemsInQueue, Is.EqualTo(expectedPartsCount_half), "Error in parts processor on resume"); + } + else { Assert.That(inProgressJobsCount, Is.EqualTo(numJobs), "Error: all jobs should be in InProgress state after Job Processing on resume"); - Assert.That(pausedJobPartsCount, Is.EqualTo(numJobParts)); Assert.That(partsProcessor.ItemsInQueue, Is.EqualTo(numJobParts), "Error in parts processor on resume"); } @@ -1106,7 +1127,12 @@ public async Task PauseResumeDuringChunkProcessing_ContainerTransfer( await Task.Delay(50); int inProgressJobPartsCount = GetJobPartsStateCount(resumedTransfers, checkpointer)[DataTransferState.InProgress]; - if (pauseLocation == PauseLocation.PauseProcessStart) + if (pauseLocation == PauseLocation.PauseProcessHalfway) + { + Assert.That(inProgressJobPartsCount, Is.EqualTo(expectedPartsCount_half), "Error: all remaining job parts should be in InProgress state after Part Processing on resume"); + Assert.That(chunksProcessor.ItemsInQueue, Is.EqualTo(expectedPartsCount_half), "Error in chunks processor on resume"); // For this test, job parts is 1:1 with job chunks + } + else { Assert.That(inProgressJobPartsCount, Is.EqualTo(numJobParts), "Error: all job parts should be in InProgress state after Part Processing on resume"); Assert.That(chunksProcessor.ItemsInQueue, Is.EqualTo(numChunks), "Error in chunks processor on resume");