Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,42 @@ private void AssertAllJobsAndPartsCompleted(int numJobs, int numJobParts, List<D
}
}

private async Task AssertResumeTransfer(
int numJobs,
int numJobParts,
int numChunks,
List<DataTransfer> resumedTransfers,
MemoryTransferCheckpointer checkpointer,
StepProcessor<TransferJobInternal> jobsProcessor,
StepProcessor<JobPartInternal> partsProcessor,
StepProcessor<Func<Task>> chunksProcessor)
{
await Task.Delay(50);
int pausedJobsCount = GetJobsStateCount(resumedTransfers, checkpointer)[DataTransferState.Paused];
Assert.That(pausedJobsCount, Is.EqualTo(numJobs));

// process jobs on resume
Assert.That(await jobsProcessor.StepAll(), Is.EqualTo(numJobs), "Error job processing on resume");

await Task.Delay(50);
int inProgressJobsCount = GetJobsStateCount(resumedTransfers, checkpointer)[DataTransferState.InProgress];
int enumerationCompleteCount = GetEnumerationCompleteCount(resumedTransfers, checkpointer);
Assert.That(enumerationCompleteCount, Is.EqualTo(numJobs), "Error: all jobs should have finished enumerating");
Assert.That(inProgressJobsCount, Is.EqualTo(numJobs), "Error: all jobs should be in InProgress state after Job Processing on resume");

// process job parts on resume
Assert.That(await partsProcessor.StepAll(), Is.EqualTo(numJobParts), "Error job part processing on resume");

await Task.Delay(50);
int inProgressJobPartsCount = GetJobPartsStateCount(resumedTransfers, checkpointer)[DataTransferState.InProgress];
Assert.That(inProgressJobPartsCount, Is.EqualTo(numJobParts), "Error: all job parts should be in InProgress state after Part Processing on resume");

// process chunks on resume
Assert.That(await chunksProcessor.StepAll(), Is.EqualTo(numChunks), "Error chunk processing on resume");
await Task.Delay(50);
AssertAllJobsAndPartsCompleted(numJobs, numJobParts, resumedTransfers, checkpointer);
}

[Test]
[Combinatorial]
public async Task PauseResumeDuringJobProcessing_ItemTransfer(
Expand Down Expand Up @@ -211,31 +247,7 @@ public async Task PauseResumeDuringJobProcessing_ItemTransfer(
MaximumTransferChunkSize = chunkSize,
});

await Task.Delay(50);
int pausedJobsCount_resume = GetJobsStateCount(resumedTransfers, checkpointer)[DataTransferState.Paused];
Assert.That(pausedJobsCount_resume, Is.EqualTo(items));

// process jobs on resume
Assert.That(await jobsProcessor.StepAll(), Is.EqualTo(items), "Error job processing on resume");

await Task.Delay(50);
int inProgressJobsCount = GetJobsStateCount(resumedTransfers, checkpointer)[DataTransferState.InProgress];
int enumerationCompleteCount2 = GetEnumerationCompleteCount(transfers, checkpointer);
Assert.That(enumerationCompleteCount2, Is.EqualTo(items), "Error: all jobs should have finished enumerating");
Assert.That(inProgressJobsCount, Is.EqualTo(items), "Error: all jobs should be in InProgress state after Job Processing on resume");

// process job parts on resume
Assert.That(await partsProcessor.StepAll(), Is.EqualTo(items), "Error job part processing on resume");

await Task.Delay(50);
int inProgressJobPartsCount = GetJobPartsStateCount(resumedTransfers, checkpointer)[DataTransferState.InProgress];
Assert.That(inProgressJobPartsCount, Is.EqualTo(items), "Error: all job parts should be in InProgress state after Job Processing on resume");

// process chunks on resume
Assert.That(await chunksProcessor.StepAll(), Is.EqualTo(numChunks), "Error chunk processing on resume");

await Task.Delay(50);
AssertAllJobsAndPartsCompleted(items, items, transfers, checkpointer);
await AssertResumeTransfer(items, items, numChunks, resumedTransfers, checkpointer, jobsProcessor, partsProcessor, chunksProcessor);
}

[Test]
Expand Down Expand Up @@ -371,33 +383,7 @@ public async Task PauseResumeDuringPartProcessing_ItemTransfer(
// START RESUME TRANSFERS
List<DataTransfer> resumedTransfers = await transferManager.ResumeAllTransfersAsync();

await Task.Delay(50);
int pausedJobsCount_resume = GetJobsStateCount(resumedTransfers, checkpointer)[DataTransferState.Paused];
Assert.That(pausedJobsCount_resume, Is.EqualTo(items));

// process jobs on resume
Assert.That(await jobsProcessor.StepAll(), Is.EqualTo(items), "Error job processing on resume");

await Task.Delay(50);
int inProgressJobsCount = GetJobsStateCount(resumedTransfers, checkpointer)[DataTransferState.InProgress];
int pausedJobPartsCount = GetJobPartsStateCount(resumedTransfers, checkpointer)[DataTransferState.Paused];
int enumerationCompleteCount2 = GetEnumerationCompleteCount(transfers, checkpointer);
Assert.That(enumerationCompleteCount2, Is.EqualTo(items), "Error: all jobs should have finished enumerating");
Assert.That(inProgressJobsCount, Is.EqualTo(items), "Error: all jobs should be in InProgress state after Job Processing on resume");
Assert.That(pausedJobPartsCount, Is.EqualTo(items));

// process job parts on resume
Assert.That(await partsProcessor.StepAll(), Is.EqualTo(items), "Error job part processing on resume");

await Task.Delay(50);
int inProgressJobPartsCount = GetJobPartsStateCount(resumedTransfers, checkpointer)[DataTransferState.InProgress];
Assert.That(inProgressJobPartsCount, Is.EqualTo(items), "Error: all job parts should be in InProgress state after Part Processing on resume");

// process chunks on resume
Assert.That(await chunksProcessor.StepAll(), Is.EqualTo(numChunks), "Error chunk processing on resume");

await Task.Delay(50);
AssertAllJobsAndPartsCompleted(items, items, transfers, checkpointer);
await AssertResumeTransfer(items, items, numChunks, resumedTransfers, checkpointer, jobsProcessor, partsProcessor, chunksProcessor);
}

[Test]
Expand Down Expand Up @@ -548,63 +534,14 @@ public async Task PauseResumeDuringChunkProcessing_ItemTransfer(
// START RESUME TRANSFERS
List<DataTransfer> resumedTransfers = await transferManager.ResumeAllTransfersAsync();

await Task.Delay(50);
int pausedJobsCount_resume = GetJobsStateCount(resumedTransfers, checkpointer)[DataTransferState.Paused];
if (pauseLocation == PauseLocation.PauseProcessHalfway)
{
Assert.That(pausedJobsCount_resume, Is.EqualTo(items / 2));
Assert.That(jobsProcessor.ItemsInQueue, Is.EqualTo(items / 2), "Error in job processor on resume");
await AssertResumeTransfer(items / 2, items / 2, numChunks / 2, resumedTransfers, checkpointer, jobsProcessor, partsProcessor, chunksProcessor);
}
else
{
Assert.That(pausedJobsCount_resume, Is.EqualTo(items));
Assert.That(jobsProcessor.ItemsInQueue, Is.EqualTo(items), "Error in job processor on resume");
await AssertResumeTransfer(items, items, numChunks, resumedTransfers, checkpointer, jobsProcessor, partsProcessor, chunksProcessor);
}

// process jobs on resume
await jobsProcessor.StepAll();

await Task.Delay(50);
int inProgressJobsCount = GetJobsStateCount(resumedTransfers, checkpointer)[DataTransferState.InProgress];
int pausedJobPartsCount = GetJobPartsStateCount(resumedTransfers, checkpointer)[DataTransferState.Paused];
int enumerationCompleteCount2 = GetEnumerationCompleteCount(transfers, checkpointer);
Assert.That(enumerationCompleteCount2, Is.EqualTo(items), "Error: all jobs should have finished enumerating");
if (pauseLocation == PauseLocation.PauseProcessHalfway)
{
// the count for all jobs for PauseProcessHalfway is (items / 2) since half have already completed transfer
Assert.That(inProgressJobsCount, Is.EqualTo(items / 2), "Error: all jobs should be in InProgress state after Job Processing on resume");
Assert.That(pausedJobPartsCount, Is.EqualTo(items / 2));
Assert.That(partsProcessor.ItemsInQueue, Is.EqualTo(items / 2), "Error in job part processor on resume");
}
else
{
Assert.That(inProgressJobsCount, Is.EqualTo(items), "Error: all jobs should be in InProgress state after Job Processing on resume");
Assert.That(pausedJobPartsCount, Is.EqualTo(items));
Assert.That(partsProcessor.ItemsInQueue, Is.EqualTo(items), "Error in job part processor on resume");
}

// process job parts on resume
await partsProcessor.StepAll();

await Task.Delay(50);
int inProgressJobPartsCount = GetJobPartsStateCount(resumedTransfers, checkpointer)[DataTransferState.InProgress];
if (pauseLocation == PauseLocation.PauseProcessHalfway)
{
// the count for all job parts for PauseProcessHalfway is (items / 2) since half have already completed transfer
Assert.That(inProgressJobPartsCount, Is.EqualTo(items / 2), "Error: all job parts should be in InProgress state after Part Processing on resume");
Assert.That(chunksProcessor.ItemsInQueue, Is.EqualTo(numChunks / 2), "Error in chunk processor on resume");
}
else
{
Assert.That(inProgressJobPartsCount, Is.EqualTo(items), "Error: all job parts should be in InProgress state after Part Processing on resume");
Assert.That(chunksProcessor.ItemsInQueue, Is.EqualTo(numChunks), "Error in chunk processor on resume");
}

// process chunks on resume
await chunksProcessor.StepAll();

await Task.Delay(50);
AssertAllJobsAndPartsCompleted(items, items, transfers, checkpointer);
}

[Test]
Expand Down Expand Up @@ -740,31 +677,7 @@ public async Task PauseResumeDuringJobProcessing_ContainerTransfer(
MaximumTransferChunkSize = chunkSize,
});

await Task.Delay(50);
int pausedJobsCount_resume = GetJobsStateCount(resumedTransfers, checkpointer)[DataTransferState.Paused];
Assert.That(pausedJobsCount_resume, Is.EqualTo(numJobs));

// process jobs on resume
Assert.That(await jobsProcessor.StepAll(), Is.EqualTo(numJobs), "Error job processing on resume");

await Task.Delay(50);
int inProgressJobsCount = GetJobsStateCount(resumedTransfers, checkpointer)[DataTransferState.InProgress];
int enumerationCompleteCount2 = GetEnumerationCompleteCount(transfers, checkpointer);
Assert.That(enumerationCompleteCount2, Is.EqualTo(numJobs), "Error: all jobs should have finished enumerating");
Assert.That(inProgressJobsCount, Is.EqualTo(numJobs), "Error: all jobs should be in InProgress state after Job Processing on resume");

// process job parts on resume
Assert.That(await partsProcessor.StepAll(), Is.EqualTo(numJobParts), "Error job part processing on resume");

await Task.Delay(50);
int inProgressJobPartsCount = GetJobPartsStateCount(resumedTransfers, checkpointer)[DataTransferState.InProgress];
Assert.That(inProgressJobPartsCount, Is.EqualTo(numJobParts), "Error: all job parts should be in InProgress state after Part Processing on resume");

// process chunks on resume
Assert.That(await chunksProcessor.StepAll(), Is.EqualTo(numChunks), "Error chunk processing on resume");

await Task.Delay(50);
AssertAllJobsAndPartsCompleted(numJobs, numJobParts, transfers, checkpointer);
await AssertResumeTransfer(numJobs, numJobParts, numChunks, resumedTransfers, checkpointer, jobsProcessor, partsProcessor, chunksProcessor);
}

[Test]
Expand Down Expand Up @@ -900,33 +813,7 @@ public async Task PauseResumeDuringPartProcessing_ContainerTransfer(
// START RESUME TRANSFERS
List<DataTransfer> resumedTransfers = await transferManager.ResumeAllTransfersAsync();

await Task.Delay(50);
int pausedJobsCount_resume = GetJobsStateCount(resumedTransfers, checkpointer)[DataTransferState.Paused];
Assert.That(pausedJobsCount_resume, Is.EqualTo(numJobs));

// process jobs on resume
Assert.That(await jobsProcessor.StepAll(), Is.EqualTo(numJobs), "Error job processing on resume");

await Task.Delay(50);
int inProgressJobsCount = GetJobsStateCount(resumedTransfers, checkpointer)[DataTransferState.InProgress];
int pausedJobPartsCount = GetJobPartsStateCount(resumedTransfers, checkpointer)[DataTransferState.Paused];
int enumerationCompleteCount2 = GetEnumerationCompleteCount(transfers, checkpointer);
Assert.That(enumerationCompleteCount2, Is.EqualTo(numJobs), "Error: all jobs should have finished enumerating");
Assert.That(inProgressJobsCount, Is.EqualTo(numJobs), "Error: all jobs should be in InProgress state after Job Processing on resume");
Assert.That(pausedJobPartsCount, Is.EqualTo(numJobParts));

// process job parts on resume
Assert.That(await partsProcessor.StepAll(), Is.EqualTo(numJobParts), "Error job part processing on resume");

await Task.Delay(50);
int inProgressJobPartsCount = GetJobPartsStateCount(resumedTransfers, checkpointer)[DataTransferState.InProgress];
Assert.That(inProgressJobPartsCount, Is.EqualTo(numJobParts), "Error: all job parts should be in InProgress state after Part Processing on resume");

// process chunks on resume
Assert.That(await chunksProcessor.StepAll(), Is.EqualTo(numChunks), "Error chunk processing on resume");

await Task.Delay(50);
AssertAllJobsAndPartsCompleted(numJobs, numJobParts, transfers, checkpointer);
await AssertResumeTransfer(numJobs, numJobParts, numChunks, resumedTransfers, checkpointer, jobsProcessor, partsProcessor, chunksProcessor);
}

[Test]
Expand Down Expand Up @@ -1091,58 +978,17 @@ public async Task PauseResumeDuringChunkProcessing_ContainerTransfer(
// START RESUME TRANSFERS
List<DataTransfer> resumedTransfers = await transferManager.ResumeAllTransfersAsync();

await Task.Delay(50);
int expectedJobsCount_half = numJobs - expectedAlreadyCompletedJobsCount_half;
if (pauseLocation == PauseLocation.PauseProcessHalfway)
{
Assert.That(jobsProcessor.ItemsInQueue, Is.EqualTo(expectedJobsCount_half), "Error in job processor on resume");
}
else
{
Assert.That(jobsProcessor.ItemsInQueue, Is.EqualTo(numJobs), "Error in job processor on resume");
}

// process jobs on resume
await jobsProcessor.StepAll();

await Task.Delay(50);
int inProgressJobsCount = GetJobsStateCount(resumedTransfers, checkpointer)[DataTransferState.InProgress];
int enumerationCompleteCount2 = GetEnumerationCompleteCount(transfers, checkpointer);
Assert.That(enumerationCompleteCount2, Is.EqualTo(numJobs), "Error: all jobs should have finished enumerating");
int expectedPartsCount_half = Enumerable.Range(numJobs + 1 - expectedJobsCount_half, expectedJobsCount_half)
.Sum(i => i * 2);
if (pauseLocation == PauseLocation.PauseProcessHalfway)
{
Assert.That(inProgressJobsCount, Is.EqualTo(expectedJobsCount_half), "Error: all remaining jobs should be in InProgress state after Job Processing on resume");
Assert.That(partsProcessor.ItemsInQueue, Is.EqualTo(expectedPartsCount_half), "Error in parts processor on resume");
int expectedJobsCount = numJobs - expectedAlreadyCompletedJobsCount_half;
int expectedPartsCount = Enumerable.Range(numJobs + 1 - expectedJobsCount, expectedJobsCount)
.Sum(i => i * 2);
await AssertResumeTransfer(expectedJobsCount, expectedPartsCount, expectedPartsCount, resumedTransfers, checkpointer, jobsProcessor, partsProcessor, chunksProcessor);
}
else
{
Assert.That(inProgressJobsCount, Is.EqualTo(numJobs), "Error: all jobs should be in InProgress state after Job Processing on resume");
Assert.That(partsProcessor.ItemsInQueue, Is.EqualTo(numJobParts), "Error in parts processor on resume");
await AssertResumeTransfer(numJobs, numJobParts, numChunks, resumedTransfers, checkpointer, jobsProcessor, partsProcessor, chunksProcessor);
}

// process job parts on resume
await partsProcessor.StepAll();

await Task.Delay(50);
int inProgressJobPartsCount = GetJobPartsStateCount(resumedTransfers, checkpointer)[DataTransferState.InProgress];
if (pauseLocation == PauseLocation.PauseProcessHalfway)
{
Assert.That(inProgressJobPartsCount, Is.EqualTo(expectedPartsCount_half), "Error: all remaining job parts should be in InProgress state after Part Processing on resume");
Assert.That(chunksProcessor.ItemsInQueue, Is.EqualTo(expectedPartsCount_half), "Error in chunks processor on resume"); // For this test, job parts is 1:1 with job chunks
}
else
{
Assert.That(inProgressJobPartsCount, Is.EqualTo(numJobParts), "Error: all job parts should be in InProgress state after Part Processing on resume");
Assert.That(chunksProcessor.ItemsInQueue, Is.EqualTo(numChunks), "Error in chunks processor on resume");
}

// process chunks on resume
await chunksProcessor.StepAll();

await Task.Delay(50);
AssertAllJobsAndPartsCompleted(numJobs, numJobParts, transfers, checkpointer);
}

public enum PauseLocation
Expand Down
Loading