diff --git a/sdk/storage/Azure.Storage.DataMovement/tests/PauseResumeTransferMockedTests.cs b/sdk/storage/Azure.Storage.DataMovement/tests/PauseResumeTransferMockedTests.cs index 99345209e50f..d7cb44165f71 100644 --- a/sdk/storage/Azure.Storage.DataMovement/tests/PauseResumeTransferMockedTests.cs +++ b/sdk/storage/Azure.Storage.DataMovement/tests/PauseResumeTransferMockedTests.cs @@ -79,6 +79,42 @@ private void AssertAllJobsAndPartsCompleted(int numJobs, int numJobParts, List resumedTransfers, + MemoryTransferCheckpointer checkpointer, + StepProcessor jobsProcessor, + StepProcessor partsProcessor, + StepProcessor> chunksProcessor) + { + await Task.Delay(50); + int pausedJobsCount = GetJobsStateCount(resumedTransfers, checkpointer)[DataTransferState.Paused]; + Assert.That(pausedJobsCount, Is.EqualTo(numJobs)); + + // process jobs on resume + Assert.That(await jobsProcessor.StepAll(), Is.EqualTo(numJobs), "Error job processing on resume"); + + await Task.Delay(50); + int inProgressJobsCount = GetJobsStateCount(resumedTransfers, checkpointer)[DataTransferState.InProgress]; + int enumerationCompleteCount = GetEnumerationCompleteCount(resumedTransfers, checkpointer); + Assert.That(enumerationCompleteCount, Is.EqualTo(numJobs), "Error: all jobs should have finished enumerating"); + Assert.That(inProgressJobsCount, Is.EqualTo(numJobs), "Error: all jobs should be in InProgress state after Job Processing on resume"); + + // process job parts on resume + Assert.That(await partsProcessor.StepAll(), Is.EqualTo(numJobParts), "Error job part processing on resume"); + + await Task.Delay(50); + int inProgressJobPartsCount = GetJobPartsStateCount(resumedTransfers, checkpointer)[DataTransferState.InProgress]; + Assert.That(inProgressJobPartsCount, Is.EqualTo(numJobParts), "Error: all job parts should be in InProgress state after Part Processing on resume"); + + // process chunks on resume + Assert.That(await chunksProcessor.StepAll(), Is.EqualTo(numChunks), "Error chunk processing on resume"); + await Task.Delay(50); + AssertAllJobsAndPartsCompleted(numJobs, numJobParts, resumedTransfers, checkpointer); + } + [Test] [Combinatorial] public async Task PauseResumeDuringJobProcessing_ItemTransfer( @@ -211,31 +247,7 @@ public async Task PauseResumeDuringJobProcessing_ItemTransfer( MaximumTransferChunkSize = chunkSize, }); - await Task.Delay(50); - int pausedJobsCount_resume = GetJobsStateCount(resumedTransfers, checkpointer)[DataTransferState.Paused]; - Assert.That(pausedJobsCount_resume, Is.EqualTo(items)); - - // process jobs on resume - Assert.That(await jobsProcessor.StepAll(), Is.EqualTo(items), "Error job processing on resume"); - - await Task.Delay(50); - int inProgressJobsCount = GetJobsStateCount(resumedTransfers, checkpointer)[DataTransferState.InProgress]; - int enumerationCompleteCount2 = GetEnumerationCompleteCount(transfers, checkpointer); - Assert.That(enumerationCompleteCount2, Is.EqualTo(items), "Error: all jobs should have finished enumerating"); - Assert.That(inProgressJobsCount, Is.EqualTo(items), "Error: all jobs should be in InProgress state after Job Processing on resume"); - - // process job parts on resume - Assert.That(await partsProcessor.StepAll(), Is.EqualTo(items), "Error job part processing on resume"); - - await Task.Delay(50); - int inProgressJobPartsCount = GetJobPartsStateCount(resumedTransfers, checkpointer)[DataTransferState.InProgress]; - Assert.That(inProgressJobPartsCount, Is.EqualTo(items), "Error: all job parts should be in InProgress state after Job Processing on resume"); - - // process chunks on resume - Assert.That(await chunksProcessor.StepAll(), Is.EqualTo(numChunks), "Error chunk processing on resume"); - - await Task.Delay(50); - AssertAllJobsAndPartsCompleted(items, items, transfers, checkpointer); + await AssertResumeTransfer(items, items, numChunks, resumedTransfers, checkpointer, jobsProcessor, partsProcessor, chunksProcessor); } [Test] @@ -371,33 +383,7 @@ public async Task PauseResumeDuringPartProcessing_ItemTransfer( // START RESUME TRANSFERS List resumedTransfers = await transferManager.ResumeAllTransfersAsync(); - await Task.Delay(50); - int pausedJobsCount_resume = GetJobsStateCount(resumedTransfers, checkpointer)[DataTransferState.Paused]; - Assert.That(pausedJobsCount_resume, Is.EqualTo(items)); - - // process jobs on resume - Assert.That(await jobsProcessor.StepAll(), Is.EqualTo(items), "Error job processing on resume"); - - await Task.Delay(50); - int inProgressJobsCount = GetJobsStateCount(resumedTransfers, checkpointer)[DataTransferState.InProgress]; - int pausedJobPartsCount = GetJobPartsStateCount(resumedTransfers, checkpointer)[DataTransferState.Paused]; - int enumerationCompleteCount2 = GetEnumerationCompleteCount(transfers, checkpointer); - Assert.That(enumerationCompleteCount2, Is.EqualTo(items), "Error: all jobs should have finished enumerating"); - Assert.That(inProgressJobsCount, Is.EqualTo(items), "Error: all jobs should be in InProgress state after Job Processing on resume"); - Assert.That(pausedJobPartsCount, Is.EqualTo(items)); - - // process job parts on resume - Assert.That(await partsProcessor.StepAll(), Is.EqualTo(items), "Error job part processing on resume"); - - await Task.Delay(50); - int inProgressJobPartsCount = GetJobPartsStateCount(resumedTransfers, checkpointer)[DataTransferState.InProgress]; - Assert.That(inProgressJobPartsCount, Is.EqualTo(items), "Error: all job parts should be in InProgress state after Part Processing on resume"); - - // process chunks on resume - Assert.That(await chunksProcessor.StepAll(), Is.EqualTo(numChunks), "Error chunk processing on resume"); - - await Task.Delay(50); - AssertAllJobsAndPartsCompleted(items, items, transfers, checkpointer); + await AssertResumeTransfer(items, items, numChunks, resumedTransfers, checkpointer, jobsProcessor, partsProcessor, chunksProcessor); } [Test] @@ -548,63 +534,14 @@ public async Task PauseResumeDuringChunkProcessing_ItemTransfer( // START RESUME TRANSFERS List resumedTransfers = await transferManager.ResumeAllTransfersAsync(); - await Task.Delay(50); - int pausedJobsCount_resume = GetJobsStateCount(resumedTransfers, checkpointer)[DataTransferState.Paused]; if (pauseLocation == PauseLocation.PauseProcessHalfway) { - Assert.That(pausedJobsCount_resume, Is.EqualTo(items / 2)); - Assert.That(jobsProcessor.ItemsInQueue, Is.EqualTo(items / 2), "Error in job processor on resume"); + await AssertResumeTransfer(items / 2, items / 2, numChunks / 2, resumedTransfers, checkpointer, jobsProcessor, partsProcessor, chunksProcessor); } else { - Assert.That(pausedJobsCount_resume, Is.EqualTo(items)); - Assert.That(jobsProcessor.ItemsInQueue, Is.EqualTo(items), "Error in job processor on resume"); + await AssertResumeTransfer(items, items, numChunks, resumedTransfers, checkpointer, jobsProcessor, partsProcessor, chunksProcessor); } - - // process jobs on resume - await jobsProcessor.StepAll(); - - await Task.Delay(50); - int inProgressJobsCount = GetJobsStateCount(resumedTransfers, checkpointer)[DataTransferState.InProgress]; - int pausedJobPartsCount = GetJobPartsStateCount(resumedTransfers, checkpointer)[DataTransferState.Paused]; - int enumerationCompleteCount2 = GetEnumerationCompleteCount(transfers, checkpointer); - Assert.That(enumerationCompleteCount2, Is.EqualTo(items), "Error: all jobs should have finished enumerating"); - if (pauseLocation == PauseLocation.PauseProcessHalfway) - { - // the count for all jobs for PauseProcessHalfway is (items / 2) since half have already completed transfer - Assert.That(inProgressJobsCount, Is.EqualTo(items / 2), "Error: all jobs should be in InProgress state after Job Processing on resume"); - Assert.That(pausedJobPartsCount, Is.EqualTo(items / 2)); - Assert.That(partsProcessor.ItemsInQueue, Is.EqualTo(items / 2), "Error in job part processor on resume"); - } - else - { - Assert.That(inProgressJobsCount, Is.EqualTo(items), "Error: all jobs should be in InProgress state after Job Processing on resume"); - Assert.That(pausedJobPartsCount, Is.EqualTo(items)); - Assert.That(partsProcessor.ItemsInQueue, Is.EqualTo(items), "Error in job part processor on resume"); - } - - // process job parts on resume - await partsProcessor.StepAll(); - - await Task.Delay(50); - int inProgressJobPartsCount = GetJobPartsStateCount(resumedTransfers, checkpointer)[DataTransferState.InProgress]; - if (pauseLocation == PauseLocation.PauseProcessHalfway) - { - // the count for all job parts for PauseProcessHalfway is (items / 2) since half have already completed transfer - Assert.That(inProgressJobPartsCount, Is.EqualTo(items / 2), "Error: all job parts should be in InProgress state after Part Processing on resume"); - Assert.That(chunksProcessor.ItemsInQueue, Is.EqualTo(numChunks / 2), "Error in chunk processor on resume"); - } - else - { - Assert.That(inProgressJobPartsCount, Is.EqualTo(items), "Error: all job parts should be in InProgress state after Part Processing on resume"); - Assert.That(chunksProcessor.ItemsInQueue, Is.EqualTo(numChunks), "Error in chunk processor on resume"); - } - - // process chunks on resume - await chunksProcessor.StepAll(); - - await Task.Delay(50); - AssertAllJobsAndPartsCompleted(items, items, transfers, checkpointer); } [Test] @@ -740,31 +677,7 @@ public async Task PauseResumeDuringJobProcessing_ContainerTransfer( MaximumTransferChunkSize = chunkSize, }); - await Task.Delay(50); - int pausedJobsCount_resume = GetJobsStateCount(resumedTransfers, checkpointer)[DataTransferState.Paused]; - Assert.That(pausedJobsCount_resume, Is.EqualTo(numJobs)); - - // process jobs on resume - Assert.That(await jobsProcessor.StepAll(), Is.EqualTo(numJobs), "Error job processing on resume"); - - await Task.Delay(50); - int inProgressJobsCount = GetJobsStateCount(resumedTransfers, checkpointer)[DataTransferState.InProgress]; - int enumerationCompleteCount2 = GetEnumerationCompleteCount(transfers, checkpointer); - Assert.That(enumerationCompleteCount2, Is.EqualTo(numJobs), "Error: all jobs should have finished enumerating"); - Assert.That(inProgressJobsCount, Is.EqualTo(numJobs), "Error: all jobs should be in InProgress state after Job Processing on resume"); - - // process job parts on resume - Assert.That(await partsProcessor.StepAll(), Is.EqualTo(numJobParts), "Error job part processing on resume"); - - await Task.Delay(50); - int inProgressJobPartsCount = GetJobPartsStateCount(resumedTransfers, checkpointer)[DataTransferState.InProgress]; - Assert.That(inProgressJobPartsCount, Is.EqualTo(numJobParts), "Error: all job parts should be in InProgress state after Part Processing on resume"); - - // process chunks on resume - Assert.That(await chunksProcessor.StepAll(), Is.EqualTo(numChunks), "Error chunk processing on resume"); - - await Task.Delay(50); - AssertAllJobsAndPartsCompleted(numJobs, numJobParts, transfers, checkpointer); + await AssertResumeTransfer(numJobs, numJobParts, numChunks, resumedTransfers, checkpointer, jobsProcessor, partsProcessor, chunksProcessor); } [Test] @@ -900,33 +813,7 @@ public async Task PauseResumeDuringPartProcessing_ContainerTransfer( // START RESUME TRANSFERS List resumedTransfers = await transferManager.ResumeAllTransfersAsync(); - await Task.Delay(50); - int pausedJobsCount_resume = GetJobsStateCount(resumedTransfers, checkpointer)[DataTransferState.Paused]; - Assert.That(pausedJobsCount_resume, Is.EqualTo(numJobs)); - - // process jobs on resume - Assert.That(await jobsProcessor.StepAll(), Is.EqualTo(numJobs), "Error job processing on resume"); - - await Task.Delay(50); - int inProgressJobsCount = GetJobsStateCount(resumedTransfers, checkpointer)[DataTransferState.InProgress]; - int pausedJobPartsCount = GetJobPartsStateCount(resumedTransfers, checkpointer)[DataTransferState.Paused]; - int enumerationCompleteCount2 = GetEnumerationCompleteCount(transfers, checkpointer); - Assert.That(enumerationCompleteCount2, Is.EqualTo(numJobs), "Error: all jobs should have finished enumerating"); - Assert.That(inProgressJobsCount, Is.EqualTo(numJobs), "Error: all jobs should be in InProgress state after Job Processing on resume"); - Assert.That(pausedJobPartsCount, Is.EqualTo(numJobParts)); - - // process job parts on resume - Assert.That(await partsProcessor.StepAll(), Is.EqualTo(numJobParts), "Error job part processing on resume"); - - await Task.Delay(50); - int inProgressJobPartsCount = GetJobPartsStateCount(resumedTransfers, checkpointer)[DataTransferState.InProgress]; - Assert.That(inProgressJobPartsCount, Is.EqualTo(numJobParts), "Error: all job parts should be in InProgress state after Part Processing on resume"); - - // process chunks on resume - Assert.That(await chunksProcessor.StepAll(), Is.EqualTo(numChunks), "Error chunk processing on resume"); - - await Task.Delay(50); - AssertAllJobsAndPartsCompleted(numJobs, numJobParts, transfers, checkpointer); + await AssertResumeTransfer(numJobs, numJobParts, numChunks, resumedTransfers, checkpointer, jobsProcessor, partsProcessor, chunksProcessor); } [Test] @@ -1091,58 +978,17 @@ public async Task PauseResumeDuringChunkProcessing_ContainerTransfer( // START RESUME TRANSFERS List resumedTransfers = await transferManager.ResumeAllTransfersAsync(); - await Task.Delay(50); - int expectedJobsCount_half = numJobs - expectedAlreadyCompletedJobsCount_half; - if (pauseLocation == PauseLocation.PauseProcessHalfway) - { - Assert.That(jobsProcessor.ItemsInQueue, Is.EqualTo(expectedJobsCount_half), "Error in job processor on resume"); - } - else - { - Assert.That(jobsProcessor.ItemsInQueue, Is.EqualTo(numJobs), "Error in job processor on resume"); - } - - // process jobs on resume - await jobsProcessor.StepAll(); - - await Task.Delay(50); - int inProgressJobsCount = GetJobsStateCount(resumedTransfers, checkpointer)[DataTransferState.InProgress]; - int enumerationCompleteCount2 = GetEnumerationCompleteCount(transfers, checkpointer); - Assert.That(enumerationCompleteCount2, Is.EqualTo(numJobs), "Error: all jobs should have finished enumerating"); - int expectedPartsCount_half = Enumerable.Range(numJobs + 1 - expectedJobsCount_half, expectedJobsCount_half) - .Sum(i => i * 2); if (pauseLocation == PauseLocation.PauseProcessHalfway) { - Assert.That(inProgressJobsCount, Is.EqualTo(expectedJobsCount_half), "Error: all remaining jobs should be in InProgress state after Job Processing on resume"); - Assert.That(partsProcessor.ItemsInQueue, Is.EqualTo(expectedPartsCount_half), "Error in parts processor on resume"); + int expectedJobsCount = numJobs - expectedAlreadyCompletedJobsCount_half; + int expectedPartsCount = Enumerable.Range(numJobs + 1 - expectedJobsCount, expectedJobsCount) + .Sum(i => i * 2); + await AssertResumeTransfer(expectedJobsCount, expectedPartsCount, expectedPartsCount, resumedTransfers, checkpointer, jobsProcessor, partsProcessor, chunksProcessor); } else { - Assert.That(inProgressJobsCount, Is.EqualTo(numJobs), "Error: all jobs should be in InProgress state after Job Processing on resume"); - Assert.That(partsProcessor.ItemsInQueue, Is.EqualTo(numJobParts), "Error in parts processor on resume"); + await AssertResumeTransfer(numJobs, numJobParts, numChunks, resumedTransfers, checkpointer, jobsProcessor, partsProcessor, chunksProcessor); } - - // process job parts on resume - await partsProcessor.StepAll(); - - await Task.Delay(50); - int inProgressJobPartsCount = GetJobPartsStateCount(resumedTransfers, checkpointer)[DataTransferState.InProgress]; - if (pauseLocation == PauseLocation.PauseProcessHalfway) - { - Assert.That(inProgressJobPartsCount, Is.EqualTo(expectedPartsCount_half), "Error: all remaining job parts should be in InProgress state after Part Processing on resume"); - Assert.That(chunksProcessor.ItemsInQueue, Is.EqualTo(expectedPartsCount_half), "Error in chunks processor on resume"); // For this test, job parts is 1:1 with job chunks - } - else - { - Assert.That(inProgressJobPartsCount, Is.EqualTo(numJobParts), "Error: all job parts should be in InProgress state after Part Processing on resume"); - Assert.That(chunksProcessor.ItemsInQueue, Is.EqualTo(numChunks), "Error in chunks processor on resume"); - } - - // process chunks on resume - await chunksProcessor.StepAll(); - - await Task.Delay(50); - AssertAllJobsAndPartsCompleted(numJobs, numJobParts, transfers, checkpointer); } public enum PauseLocation