diff --git a/sdk/storage/Azure.Storage.DataMovement.Blobs/src/Azure.Storage.DataMovement.Blobs.csproj b/sdk/storage/Azure.Storage.DataMovement.Blobs/src/Azure.Storage.DataMovement.Blobs.csproj index b4fdfe379725..e15a0dce44aa 100644 --- a/sdk/storage/Azure.Storage.DataMovement.Blobs/src/Azure.Storage.DataMovement.Blobs.csproj +++ b/sdk/storage/Azure.Storage.DataMovement.Blobs/src/Azure.Storage.DataMovement.Blobs.csproj @@ -39,6 +39,7 @@ + diff --git a/sdk/storage/Azure.Storage.DataMovement.Blobs/src/BlobDestinationCheckpointData.cs b/sdk/storage/Azure.Storage.DataMovement.Blobs/src/BlobDestinationCheckpointData.cs index a9f61ca8ee18..ac906a2a0499 100644 --- a/sdk/storage/Azure.Storage.DataMovement.Blobs/src/BlobDestinationCheckpointData.cs +++ b/sdk/storage/Azure.Storage.DataMovement.Blobs/src/BlobDestinationCheckpointData.cs @@ -5,6 +5,7 @@ using System.Text; using Azure.Core; using Azure.Storage.Blobs.Models; +using static Azure.Storage.DataMovement.JobPlanExtensions; using Metadata = System.Collections.Generic.IDictionary; using Tags = System.Collections.Generic.IDictionary; @@ -278,23 +279,6 @@ private int CalculateLength() return length; } - private void WriteVariableLengthFieldInfo(BinaryWriter writer, byte[] bytes, ref int currentVariableLengthIndex) - { - // Write the offset, -1 if size is 0 - if (bytes.Length > 0) - { - writer.Write(currentVariableLengthIndex); - currentVariableLengthIndex += bytes.Length; - } - else - { - writer.Write(-1); - } - - // Write the length - writer.Write(bytes.Length); - } - private static void CheckSchemaVersion(int version) { if (version != DataMovementBlobConstants.DestinationJobPartHeader.SchemaVersion) diff --git a/sdk/storage/Azure.Storage.DataMovement/api/Azure.Storage.DataMovement.net6.0.cs b/sdk/storage/Azure.Storage.DataMovement/api/Azure.Storage.DataMovement.net6.0.cs index ee62763d45fa..87fdf1441a0d 100644 --- a/sdk/storage/Azure.Storage.DataMovement/api/Azure.Storage.DataMovement.net6.0.cs +++ b/sdk/storage/Azure.Storage.DataMovement/api/Azure.Storage.DataMovement.net6.0.cs @@ -89,6 +89,10 @@ protected internal DataTransferStatus(Azure.Storage.DataMovement.DataTransferSta public bool HasSkippedItems { get { throw null; } } public Azure.Storage.DataMovement.DataTransferState State { get { throw null; } } public bool Equals(Azure.Storage.DataMovement.DataTransferStatus other) { throw null; } + public override bool Equals(object obj) { throw null; } + public override int GetHashCode() { throw null; } + public static bool operator ==(Azure.Storage.DataMovement.DataTransferStatus left, Azure.Storage.DataMovement.DataTransferStatus right) { throw null; } + public static bool operator !=(Azure.Storage.DataMovement.DataTransferStatus left, Azure.Storage.DataMovement.DataTransferStatus right) { throw null; } } public partial class LocalFilesStorageResourceProvider : Azure.Storage.DataMovement.StorageResourceProvider { diff --git a/sdk/storage/Azure.Storage.DataMovement/api/Azure.Storage.DataMovement.netstandard2.0.cs b/sdk/storage/Azure.Storage.DataMovement/api/Azure.Storage.DataMovement.netstandard2.0.cs index ee62763d45fa..87fdf1441a0d 100644 --- a/sdk/storage/Azure.Storage.DataMovement/api/Azure.Storage.DataMovement.netstandard2.0.cs +++ b/sdk/storage/Azure.Storage.DataMovement/api/Azure.Storage.DataMovement.netstandard2.0.cs @@ -89,6 +89,10 @@ protected internal DataTransferStatus(Azure.Storage.DataMovement.DataTransferSta public bool HasSkippedItems { get { throw null; } } public Azure.Storage.DataMovement.DataTransferState State { get { throw null; } } public bool Equals(Azure.Storage.DataMovement.DataTransferStatus other) { throw null; } + public override bool Equals(object obj) { throw null; } + public override int GetHashCode() { throw null; } + public static bool operator ==(Azure.Storage.DataMovement.DataTransferStatus left, Azure.Storage.DataMovement.DataTransferStatus right) { throw null; } + public static bool operator !=(Azure.Storage.DataMovement.DataTransferStatus left, Azure.Storage.DataMovement.DataTransferStatus right) { throw null; } } public partial class LocalFilesStorageResourceProvider : Azure.Storage.DataMovement.StorageResourceProvider { diff --git a/sdk/storage/Azure.Storage.DataMovement/src/Azure.Storage.DataMovement.csproj b/sdk/storage/Azure.Storage.DataMovement/src/Azure.Storage.DataMovement.csproj index 08a31d0b233e..e4e4c3f67753 100644 --- a/sdk/storage/Azure.Storage.DataMovement/src/Azure.Storage.DataMovement.csproj +++ b/sdk/storage/Azure.Storage.DataMovement/src/Azure.Storage.DataMovement.csproj @@ -38,5 +38,6 @@ + \ No newline at end of file diff --git a/sdk/storage/Azure.Storage.DataMovement/src/CheckpointerExtensions.cs b/sdk/storage/Azure.Storage.DataMovement/src/CheckpointerExtensions.cs index 75ebf0a1c658..28200dbf1466 100644 --- a/sdk/storage/Azure.Storage.DataMovement/src/CheckpointerExtensions.cs +++ b/sdk/storage/Azure.Storage.DataMovement/src/CheckpointerExtensions.cs @@ -1,38 +1,41 @@ // Copyright (c) Microsoft Corporation. All rights reserved. // Licensed under the MIT License. -using System; +using System.IO; using System.Threading; using System.Threading.Tasks; +using Azure.Storage.DataMovement.JobPlan; namespace Azure.Storage.DataMovement { internal partial class CheckpointerExtensions { - internal static async Task IsResumableAsync( + internal static async Task GetJobStatusAsync( this TransferCheckpointer checkpointer, string transferId, - CancellationToken cancellationToken) + CancellationToken cancellationToken = default) { - DataTransferState transferState = (DataTransferState) await checkpointer.GetByteValue( - transferId, - DataMovementConstants.JobPartPlanFile.AtomicJobStatusStateIndex, - cancellationToken).ConfigureAwait(false); - - byte hasFailedItemsByte = await checkpointer.GetByteValue( + using (Stream stream = await checkpointer.ReadJobPlanFileAsync( transferId, - DataMovementConstants.JobPartPlanFile.AtomicJobStatusHasFailedIndex, - cancellationToken).ConfigureAwait(false); - bool hasFailedItems = Convert.ToBoolean(hasFailedItemsByte); + DataMovementConstants.JobPlanFile.JobStatusIndex, + DataMovementConstants.IntSizeInBytes, + cancellationToken).ConfigureAwait(false)) + { + BinaryReader reader = new BinaryReader(stream); + JobPlanStatus jobPlanStatus = (JobPlanStatus)reader.ReadInt32(); + return jobPlanStatus.ToDataTransferStatus(); + } + } - byte hasSkippedItemsByte = await checkpointer.GetByteValue( - transferId, - DataMovementConstants.JobPartPlanFile.AtomicJobStatusHasSkippedIndex, - cancellationToken).ConfigureAwait(false); - bool hasSkippedItems = Convert.ToBoolean(hasSkippedItemsByte); + internal static async Task IsResumableAsync( + this TransferCheckpointer checkpointer, + string transferId, + CancellationToken cancellationToken) + { + DataTransferStatus jobStatus = await checkpointer.GetJobStatusAsync(transferId, cancellationToken).ConfigureAwait(false); // Transfers marked as fully completed are not resumable - return transferState != DataTransferState.Completed || hasFailedItems || hasSkippedItems; + return jobStatus.State != DataTransferState.Completed || jobStatus.HasFailedItems || jobStatus.HasSkippedItems; } internal static async Task GetDataTransferPropertiesAsync( diff --git a/sdk/storage/Azure.Storage.DataMovement/src/DataTransferStatus.cs b/sdk/storage/Azure.Storage.DataMovement/src/DataTransferStatus.cs index fde3d6e26603..9b7486acc968 100644 --- a/sdk/storage/Azure.Storage.DataMovement/src/DataTransferStatus.cs +++ b/sdk/storage/Azure.Storage.DataMovement/src/DataTransferStatus.cs @@ -2,7 +2,6 @@ // Licensed under the MIT License. using System; -using System.Runtime.CompilerServices; using System.Threading; namespace Azure.Storage.DataMovement @@ -105,15 +104,54 @@ internal bool TrySetTransferStateChange(DataTransferState state) return Interlocked.Exchange(ref _stateValue, (int)state) != (int)state; } + /// + public bool Equals(DataTransferStatus other) + { + if (other == null) + { + return false; + } + + return State.Equals(other.State) && + HasFailedItems.Equals(other.HasFailedItems) && + HasSkippedItems.Equals(other.HasSkippedItems); + } + /// - /// Indicates whether the current object is equal to another object of the same type. + /// Equality operator. /// - /// An object to compare with this object. - /// Returns true if the current object is equal to the other parameter; otherwise, false. - public bool Equals(DataTransferStatus other) - => State.Equals(other.State) && - HasFailedItems.Equals(other.HasFailedItems) && - HasSkippedItems.Equals(other.HasSkippedItems); + /// The left hand side. + /// The right hand side. + /// True, if the two values are equal; otherwise false. + public static bool operator ==(DataTransferStatus left, DataTransferStatus right) + { + if (left is null != right is null) + { + return false; + } + return left?.Equals(right) ?? true; + } + + /// + /// Inequality operator. + /// + /// The left hand side. + /// The right hand side. + /// True, if the two values are not equal; otherwise false. + public static bool operator !=(DataTransferStatus left, DataTransferStatus right) => !(left == right); + + /// + public override bool Equals(object obj) => Equals(obj as DataTransferStatus); + + /// + public override int GetHashCode() + { + int hashCode = 1225395075; + hashCode = hashCode * -1521134295 + State.GetHashCode(); + hashCode = hashCode * -1521134295 + HasFailedItems.GetHashCode(); + hashCode = hashCode * -1521134295 + HasSkippedItems.GetHashCode(); + return hashCode; + } /// /// Performs a Deep Copy of the . diff --git a/sdk/storage/Azure.Storage.DataMovement/src/Shared/DataMovementConstants.cs b/sdk/storage/Azure.Storage.DataMovement/src/Shared/DataMovementConstants.cs index 8ee2f90917d6..45a72ffaefa0 100644 --- a/sdk/storage/Azure.Storage.DataMovement/src/Shared/DataMovementConstants.cs +++ b/sdk/storage/Azure.Storage.DataMovement/src/Shared/DataMovementConstants.cs @@ -16,6 +16,7 @@ internal class DataMovementConstants internal const int MaxJobPartReaders = 64; internal const int MaxJobChunkTasks = 3000; internal const int StatusCheckInSec = 10; + internal const int DefaultArrayPoolArraySize = 4 * 1024; internal static class ConcurrencyTuner { diff --git a/sdk/storage/Azure.Storage.DataMovement/src/Shared/DataTransferStatusInternal.cs b/sdk/storage/Azure.Storage.DataMovement/src/Shared/DataTransferStatusInternal.cs index ab00ece7466e..5ebae0b04a3e 100644 --- a/sdk/storage/Azure.Storage.DataMovement/src/Shared/DataTransferStatusInternal.cs +++ b/sdk/storage/Azure.Storage.DataMovement/src/Shared/DataTransferStatusInternal.cs @@ -5,6 +5,9 @@ namespace Azure.Storage.DataMovement { internal class DataTransferStatusInternal : DataTransferStatus { + public DataTransferStatusInternal() : base() + { } + public DataTransferStatusInternal( DataTransferState state, bool hasFailedItems, diff --git a/sdk/storage/Azure.Storage.DataMovement/src/Shared/JobPlan/JobPlanHeader.cs b/sdk/storage/Azure.Storage.DataMovement/src/Shared/JobPlan/JobPlanHeader.cs index 9d42aec4fe5d..b8e4ffebecd2 100644 --- a/sdk/storage/Azure.Storage.DataMovement/src/Shared/JobPlan/JobPlanHeader.cs +++ b/sdk/storage/Azure.Storage.DataMovement/src/Shared/JobPlan/JobPlanHeader.cs @@ -38,7 +38,7 @@ internal class JobPlanHeader /// /// The current status of the transfer job. /// - public JobPlanStatus JobStatus; + public DataTransferStatus JobStatus; /// /// The parent path for the source of the transfer. @@ -56,7 +56,7 @@ public JobPlanHeader( DateTimeOffset createTime, JobPlanOperation operationType, bool enumerationComplete, - JobPlanStatus jobStatus, + DataTransferStatus jobStatus, string parentSourcePath, string parentDestinationPath) { @@ -100,23 +100,15 @@ public void Serialize(Stream stream) writer.Write(Convert.ToByte(EnumerationComplete)); // JobStatus - writer.Write((int)JobStatus); + writer.Write((int)JobStatus.ToJobPlanStatus()); - // ParentSourcePath offset + // ParentSourcePath offset/length byte[] parentSourcePathBytes = Encoding.UTF8.GetBytes(ParentSourcePath); - writer.Write(currentVariableLengthIndex); - currentVariableLengthIndex += parentSourcePathBytes.Length; - - // ParentSourcePath length - writer.Write(parentSourcePathBytes.Length); + JobPlanExtensions.WriteVariableLengthFieldInfo(writer, parentSourcePathBytes, ref currentVariableLengthIndex); - // ParentDestinationPath offset + // ParentDestinationPath offset/length byte[] parentDestinationPathBytes = Encoding.UTF8.GetBytes(ParentDestinationPath); - writer.Write(currentVariableLengthIndex); - currentVariableLengthIndex += parentDestinationPathBytes.Length; - - // ParentDestinationPath length - writer.Write(parentDestinationPathBytes.Length); + JobPlanExtensions.WriteVariableLengthFieldInfo(writer, parentDestinationPathBytes, ref currentVariableLengthIndex); // ParentSourcePath writer.Write(parentSourcePathBytes); @@ -156,7 +148,7 @@ public static JobPlanHeader Deserialize(Stream stream) bool enumerationComplete = Convert.ToBoolean(enumerationCompleteByte); // JobStatus - JobPlanStatus jobStatus = (JobPlanStatus)reader.ReadInt32(); + JobPlanStatus jobPlanStatus = (JobPlanStatus)reader.ReadInt32(); // ParentSourcePath offset int parentSourcePathOffset = reader.ReadInt32(); @@ -194,7 +186,7 @@ public static JobPlanHeader Deserialize(Stream stream) createTime, operationType, enumerationComplete, - jobStatus, + jobPlanStatus.ToDataTransferStatus(), parentSourcePath, parentDestinationPath); } diff --git a/sdk/storage/Azure.Storage.DataMovement/src/Shared/JobPlanExtensions.cs b/sdk/storage/Azure.Storage.DataMovement/src/Shared/JobPlanExtensions.cs index 31dc4a73e443..59101bfbf938 100644 --- a/sdk/storage/Azure.Storage.DataMovement/src/Shared/JobPlanExtensions.cs +++ b/sdk/storage/Azure.Storage.DataMovement/src/Shared/JobPlanExtensions.cs @@ -51,86 +51,44 @@ internal static JobPartPlanHeader GetJobPartPlanHeader(this JobPartPlanFileName string transferId, CancellationToken cancellationToken) { - int startIndex = DataMovementConstants.JobPartPlanFile.SourcePathLengthIndex; - int readLength = DataMovementConstants.JobPartPlanFile.DestinationExtraQueryLengthIndex - startIndex; + int startIndex = DataMovementConstants.JobPlanFile.ParentSourcePathOffsetIndex; - int partCount = await checkpointer.CurrentJobPartCountAsync(transferId).ConfigureAwait(false); - string storedSourcePath = default; - string storedDestPath = default; - for (int i = 0; i < partCount; i++) + string parentSourcePath = default; + string parentDestinationPath = default; + using (Stream stream = await checkpointer.ReadJobPlanFileAsync( + transferId: transferId, + offset: startIndex, + length: 0, // Read to the end + cancellationToken: cancellationToken).ConfigureAwait(false)) { - using (Stream stream = await checkpointer.ReadableStreamAsync( - transferId: transferId, - partNumber: i, - offset: startIndex, - readSize: readLength, - cancellationToken: cancellationToken).ConfigureAwait(false)) - { - BinaryReader reader = new BinaryReader(stream); - - // Read Source Path Length - byte[] pathLengthBuffer = reader.ReadBytes(DataMovementConstants.UShortSizeInBytes); - ushort pathLength = pathLengthBuffer.ToUShort(); - - // Read Source Path - byte[] pathBuffer = reader.ReadBytes(DataMovementConstants.JobPartPlanFile.PathStrNumBytes); - string sourcePath = pathBuffer.ToString(pathLength); - - // Set the stream position to the start of the destination path - reader.BaseStream.Position = DataMovementConstants.JobPartPlanFile.DestinationPathLengthIndex - startIndex; + BinaryReader reader = new BinaryReader(stream); - // Read Destination Path Length - pathLengthBuffer = reader.ReadBytes(DataMovementConstants.UShortSizeInBytes); - pathLength = pathLengthBuffer.ToUShort(); + // ParentSourcePath offset/length + int parentSourcePathOffset = reader.ReadInt32() - startIndex; + int parentSourcePathLength = reader.ReadInt32(); - // Read Destination Path - pathBuffer = reader.ReadBytes(DataMovementConstants.JobPartPlanFile.PathStrNumBytes); - string destPath = pathBuffer.ToString(pathLength); + // ParentDestinationPath offset/length + int parentDestinationPathOffset = reader.ReadInt32() - startIndex; + int parentDestinationPathLength = reader.ReadInt32(); - if (string.IsNullOrEmpty(storedSourcePath)) - { - // If we currently don't have a path - storedSourcePath = sourcePath; - storedDestPath = destPath; - } - else - { - // If there's already an existing path, let's compare the two paths - // and find the common parent path. - storedSourcePath = GetLongestCommonString(storedSourcePath, sourcePath); - storedDestPath = GetLongestCommonString(storedDestPath, destPath); - } + // ParentSourcePath + if (parentSourcePathOffset > 0) + { + reader.BaseStream.Position = parentSourcePathOffset; + byte[] parentSourcePathBytes = reader.ReadBytes(parentSourcePathLength); + parentSourcePath = parentSourcePathBytes.ToString(parentSourcePathLength); } - } - if (partCount == 1) - { - return (storedSourcePath, storedDestPath); - } - else - { - // The resulting stored paths are just longest common string, trim to last / to get a path - return (TrimStringToPath(storedSourcePath), TrimStringToPath(storedDestPath)); - } - } - - private static string GetLongestCommonString(string path1, string path2) - { - int length = Math.Min(path1.Length, path2.Length); - int index = 0; - - while (index < length && path1[index] == path2[index]) - { - index++; + // ParentDestinationPath + if (parentDestinationPathOffset > 0) + { + reader.BaseStream.Position = parentDestinationPathOffset; + byte[] parentDestinationPathBytes = reader.ReadBytes(parentDestinationPathLength); + parentDestinationPath = parentDestinationPathBytes.ToString(parentDestinationPathLength); + } } - return path1.Substring(0, index); - } - - private static string TrimStringToPath(string path) - { - int lastSlash = path.Replace('\\', '/').LastIndexOf('/'); - return path.Substring(0, lastSlash); + return (parentSourcePath, parentDestinationPath); } internal static async Task<(string Source, string Destination)> GetResourceIdsAsync( @@ -143,11 +101,11 @@ private static string TrimStringToPath(string path) string sourceResourceId; string destinationResourceId; - using (Stream stream = await checkpointer.ReadableStreamAsync( + using (Stream stream = await checkpointer.ReadJobPartPlanFileAsync( transferId: transferId, partNumber: 0, offset: startIndex, - readSize: readLength, + length: readLength, cancellationToken: cancellationToken).ConfigureAwait(false)) { BinaryReader reader = new BinaryReader(stream); @@ -214,11 +172,11 @@ internal static async Task GetHeaderUShortValue( CancellationToken cancellationToken) { string value; - using (Stream stream = await checkpointer.ReadableStreamAsync( + using (Stream stream = await checkpointer.ReadJobPartPlanFileAsync( transferId: transferId, partNumber: 0, offset: startIndex, - readSize: streamReadLength, + length: streamReadLength, cancellationToken: cancellationToken).ConfigureAwait(false)) { BinaryReader reader = new BinaryReader(stream); @@ -243,11 +201,11 @@ internal static async Task GetHeaderLongValue( CancellationToken cancellationToken) { string value; - using (Stream stream = await checkpointer.ReadableStreamAsync( + using (Stream stream = await checkpointer.ReadJobPartPlanFileAsync( transferId: transferId, partNumber: 0, offset: startIndex, - readSize: streamReadLength, + length: streamReadLength, cancellationToken: cancellationToken).ConfigureAwait(false)) { BinaryReader reader = new BinaryReader(stream); @@ -270,11 +228,11 @@ internal static async Task GetByteValue( CancellationToken cancellationToken) { byte value; - using (Stream stream = await checkpointer.ReadableStreamAsync( + using (Stream stream = await checkpointer.ReadJobPartPlanFileAsync( transferId: transferId, partNumber: 0, offset: startIndex, - readSize: DataMovementConstants.OneByte, + length: DataMovementConstants.OneByte, cancellationToken: cancellationToken).ConfigureAwait(false)) { BinaryReader reader = new BinaryReader(stream); @@ -284,5 +242,93 @@ internal static async Task GetByteValue( } return value; } + + internal static JobPlanStatus ToJobPlanStatus(this DataTransferStatus transferStatus) + { + if (transferStatus == default) + { + return JobPlanStatus.None; + } + + JobPlanStatus jobPlanStatus = (JobPlanStatus)Enum.Parse(typeof(JobPlanStatus), transferStatus.State.ToString()); + if (transferStatus.HasFailedItems) + { + jobPlanStatus |= JobPlanStatus.HasFailed; + } + if (transferStatus.HasSkippedItems) + { + jobPlanStatus |= JobPlanStatus.HasSkipped; + } + + return jobPlanStatus; + } + + internal static DataTransferStatus ToDataTransferStatus(this JobPlanStatus jobPlanStatus) + { + DataTransferState state; + if (jobPlanStatus.HasFlag(JobPlanStatus.Queued)) + { + state = DataTransferState.Queued; + } + else if (jobPlanStatus.HasFlag(JobPlanStatus.InProgress)) + { + state = DataTransferState.InProgress; + } + else if (jobPlanStatus.HasFlag(JobPlanStatus.Pausing)) + { + state = DataTransferState.Pausing; + } + else if (jobPlanStatus.HasFlag(JobPlanStatus.Stopping)) + { + state = DataTransferState.Stopping; + } + else if (jobPlanStatus.HasFlag(JobPlanStatus.Paused)) + { + state = DataTransferState.Paused; + } + else if (jobPlanStatus.HasFlag(JobPlanStatus.Completed)) + { + state = DataTransferState.Completed; + } + else + { + state = DataTransferState.None; + } + + bool hasFailed = jobPlanStatus.HasFlag(JobPlanStatus.HasFailed); + bool hasSkipped = jobPlanStatus.HasFlag(JobPlanStatus.HasSkipped); + + return new DataTransferStatusInternal(state, hasFailed, hasSkipped); + } + + /// + /// Writes the length and offset field for the given byte array + /// and increments currentVariableLengthIndex accordingly. + /// + /// The writer to write to. + /// The data to write info about. + /// + /// A reference to the current index of the variable length fields + /// that will be used to set the offset and then incremented. + /// + internal static void WriteVariableLengthFieldInfo( + BinaryWriter writer, + byte[] bytes, + ref int currentVariableLengthIndex) + { + // Write the offset, -1 if size is 0 + if (bytes.Length > 0) + { + writer.Write(currentVariableLengthIndex); + currentVariableLengthIndex += bytes.Length; + } + else + { + writer.Write(-1); + } + + // Write the length + writer.Write(bytes.Length); + } } } diff --git a/sdk/storage/Azure.Storage.DataMovement/src/Shared/LocalTransferCheckpointer.cs b/sdk/storage/Azure.Storage.DataMovement/src/Shared/LocalTransferCheckpointer.cs index d29c956049be..8de5ea3ab7bf 100644 --- a/sdk/storage/Azure.Storage.DataMovement/src/Shared/LocalTransferCheckpointer.cs +++ b/sdk/storage/Azure.Storage.DataMovement/src/Shared/LocalTransferCheckpointer.cs @@ -2,6 +2,7 @@ // Licensed under the MIT License. using System; +using System.Buffers; using System.Collections.Generic; using System.IO; using System.IO.MemoryMappedFiles; @@ -10,6 +11,7 @@ using System.Threading.Tasks; using Azure.Core; using Azure.Storage.DataMovement.JobPlan; +using Azure.Storage.Shared; namespace Azure.Storage.DataMovement { @@ -71,13 +73,14 @@ public override async Task AddNewJobAsync( DateTimeOffset.UtcNow, GetOperationType(source, destination), false, /* enumerationComplete */ - JobPlanStatus.Queued, + new DataTransferStatusInternal(), source.Uri.AbsoluteUri, destination.Uri.AbsoluteUri); using (Stream headerStream = new MemoryStream()) { header.Serialize(headerStream); + headerStream.Position = 0; JobPlanFile jobPlanFile = await JobPlanFile.CreateJobPlanFileAsync( _pathToCheckpointer, transferId, @@ -125,41 +128,65 @@ public override Task CurrentJobPartCountAsync( CancellationToken cancellationToken = default) { CancellationHelper.ThrowIfCancellationRequested(cancellationToken); - if (_transferStates.TryGetValue(transferId, out var result)) + if (_transferStates.TryGetValue(transferId, out JobPlanFile result)) { - return Task.FromResult(result.JobParts.Count); + return Task.FromResult(result.JobParts.Count); } throw Errors.MissingTransferIdCheckpointer(transferId); } /// - public override async Task ReadableStreamAsync( + public override async Task ReadJobPlanFileAsync( + string transferId, + int offset, + int length, + CancellationToken cancellationToken = default) + { + int maxArraySize = length > 0 ? length : DataMovementConstants.DefaultArrayPoolArraySize; + Stream copiedStream = new PooledMemoryStream(ArrayPool.Shared, maxArraySize); + + CancellationHelper.ThrowIfCancellationRequested(cancellationToken); + if (_transferStates.TryGetValue(transferId, out JobPlanFile jobPlanFile)) + { + using (MemoryMappedFile mmf = MemoryMappedFile.CreateFromFile(jobPlanFile.FilePath)) + using (MemoryMappedViewStream mmfStream = mmf.CreateViewStream(offset, length, MemoryMappedFileAccess.Read)) + { + await mmfStream.CopyToAsync(copiedStream).ConfigureAwait(false); + copiedStream.Position = 0; + return copiedStream; + } + } + else + { + throw Errors.MissingTransferIdCheckpointer(transferId); + } + } + + /// + public override async Task ReadJobPartPlanFileAsync( string transferId, int partNumber, - long offset, - long readSize, + int offset, + int length, CancellationToken cancellationToken = default) { if (_transferStates.TryGetValue(transferId, out JobPlanFile jobPlanFile)) { if (jobPlanFile.JobParts.TryGetValue(partNumber, out JobPartPlanFile jobPartPlanFile)) { - Stream copiedStream = new MemoryStream(DataMovementConstants.JobPartPlanFile.JobPartHeaderSizeInBytes); + int maxArraySize = length > 0 ? length : DataMovementConstants.DefaultArrayPoolArraySize; + Stream copiedStream = new PooledMemoryStream(ArrayPool.Shared, maxArraySize); + // MMF lock await jobPartPlanFile.WriteLock.WaitAsync(cancellationToken).ConfigureAwait(false); // Open up MemoryMappedFile - using (MemoryMappedFile mmf = MemoryMappedFile.CreateFromFile( - path: jobPartPlanFile.FilePath, - mode: FileMode.Open, - mapName: null, - capacity: DataMovementConstants.JobPartPlanFile.JobPartHeaderSizeInBytes)) + using (MemoryMappedFile mmf = MemoryMappedFile.CreateFromFile(jobPartPlanFile.FilePath)) + using (MemoryMappedViewStream mmfStream = mmf.CreateViewStream(offset, length, MemoryMappedFileAccess.Read)) { - using (MemoryMappedViewStream mmfStream = mmf.CreateViewStream(offset, readSize, MemoryMappedFileAccess.Read)) - { - await mmfStream.CopyToAsync(copiedStream).ConfigureAwait(false); - } + await mmfStream.CopyToAsync(copiedStream).ConfigureAwait(false); } + // MMF release jobPartPlanFile.WriteLock.Release(); copiedStream.Position = 0; @@ -177,48 +204,29 @@ public override async Task ReadableStreamAsync( } /// - public override async Task WriteToCheckpointAsync( + public override async Task WriteToJobPlanFileAsync( string transferId, - int partNumber, - long chunkIndex, + int fileOffset, byte[] buffer, + int bufferOffset, + int length, CancellationToken cancellationToken = default) { - Argument.AssertNotNullOrEmpty(transferId, nameof(transferId)); - Argument.AssertNotDefault(ref partNumber, nameof(partNumber)); - if (buffer?.Length == 0) - { - throw new ArgumentException("Buffer cannot be empty"); - } - + CancellationHelper.ThrowIfCancellationRequested(cancellationToken); if (_transferStates.TryGetValue(transferId, out JobPlanFile jobPlanFile)) { - if (jobPlanFile.JobParts.TryGetValue(partNumber, out JobPartPlanFile jobPartPlanFile)) - { - // Lock MMF - await jobPartPlanFile.WriteLock.WaitAsync(cancellationToken).ConfigureAwait(false); - - using (MemoryMappedFile mmf = MemoryMappedFile.CreateFromFile( - path: jobPartPlanFile.FilePath, - mode: FileMode.Open, - mapName: null, - capacity: DataMovementConstants.JobPartPlanFile.JobPartHeaderSizeInBytes)) - { - using (MemoryMappedViewAccessor accessor = mmf.CreateViewAccessor(chunkIndex, buffer.Length, MemoryMappedFileAccess.Write)) - { - accessor.WriteArray(0, buffer, 0, buffer.Length); - // to flush to the underlying file that supports the mmf - accessor.Flush(); - } - } + // Lock MMF + await jobPlanFile.WriteLock.WaitAsync().ConfigureAwait(false); - // Release MMF - jobPartPlanFile.WriteLock.Release(); - } - else + using (MemoryMappedFile mmf = MemoryMappedFile.CreateFromFile(jobPlanFile.FilePath, FileMode.Open)) + using (MemoryMappedViewAccessor accessor = mmf.CreateViewAccessor(fileOffset, length, MemoryMappedFileAccess.Write)) { - throw Errors.MissingPartNumberCheckpointer(transferId, partNumber); + accessor.WriteArray(0, buffer, bufferOffset, length); + accessor.Flush(); } + + // Release MMF + jobPlanFile.WriteLock.Release(); } else { @@ -285,42 +293,25 @@ public override async Task SetJobTransferStatusAsync( DataTransferStatus status, CancellationToken cancellationToken = default) { - long length = DataMovementConstants.OneByte * 3; - int offset = DataMovementConstants.JobPartPlanFile.AtomicJobStatusStateIndex; + long length = DataMovementConstants.IntSizeInBytes; + int offset = DataMovementConstants.JobPlanFile.JobStatusIndex; CancellationHelper.ThrowIfCancellationRequested(cancellationToken); if (_transferStates.TryGetValue(transferId, out JobPlanFile jobPlanFile)) { - foreach (KeyValuePair jobPartPair in jobPlanFile.JobParts) + // Lock MMF + await jobPlanFile.WriteLock.WaitAsync(cancellationToken).ConfigureAwait(false); + + using (MemoryMappedFile mmf = MemoryMappedFile.CreateFromFile(jobPlanFile.FilePath, FileMode.Open)) + using (MemoryMappedViewAccessor accessor = mmf.CreateViewAccessor(offset, length)) { - CancellationHelper.ThrowIfCancellationRequested(cancellationToken); - // Lock MMF - await jobPartPair.Value.WriteLock.WaitAsync(cancellationToken).ConfigureAwait(false); - using (MemoryMappedFile mmf = MemoryMappedFile.CreateFromFile( - path: jobPartPair.Value.FilePath, - mode: FileMode.Open, - mapName: null, - capacity: DataMovementConstants.JobPartPlanFile.JobPartHeaderSizeInBytes)) - { - using (MemoryMappedViewAccessor accessor = mmf.CreateViewAccessor(offset, length)) - { - accessor.Write( - position: 0, - value: (byte)status.State); - accessor.Write( - position: 1, - value: status.HasFailedItems); - accessor.Write( - position: 2, - value: status.HasSkippedItems); - // to flush to the underlying file that supports the mmf - accessor.Flush(); - } - } - // Release MMF - jobPartPair.Value.WriteLock.Release(); + accessor.Write(0, (int)status.ToJobPlanStatus()); + accessor.Flush(); } + + // Release MMF + jobPlanFile.WriteLock.Release(); } else { diff --git a/sdk/storage/Azure.Storage.DataMovement/src/Shared/TransferCheckpointer.cs b/sdk/storage/Azure.Storage.DataMovement/src/Shared/TransferCheckpointer.cs index 0d8adc3a5e1e..aaeeed368b9e 100644 --- a/sdk/storage/Azure.Storage.DataMovement/src/Shared/TransferCheckpointer.cs +++ b/sdk/storage/Azure.Storage.DataMovement/src/Shared/TransferCheckpointer.cs @@ -72,12 +72,32 @@ public abstract Task CurrentJobPartCountAsync( CancellationToken cancellationToken = default); /// - /// Creates a stream to the stored memory stored checkpointing information. + /// Reads the specified part of the job plan file and returns it in a Stream. + /// + /// The transfer ID. + /// The offset to start reading the job plan file at. + /// + /// The maximum number of bytes to read. + /// Specify 0 (zero) to create a stream over teh whole file. + /// + /// + /// Optional to propagate + /// notifications that the operation should be canceled. + /// + /// A Stream of the requested part of the Job Plan File. + public abstract Task ReadJobPlanFileAsync( + string transferId, + int offset, + int length, + CancellationToken cancellationToken = default); + + /// + /// Reads the specified part of the job part plan file and returns it in a Stream. /// /// The transfer ID. /// The job part number. - /// The offset of the current transfer. - /// + /// The offset to start reading the job part plan file at. + /// /// The size of how many bytes to read. /// Specify 0 (zero) to create a stream that ends approximately at the end of the file. /// @@ -86,32 +106,31 @@ public abstract Task CurrentJobPartCountAsync( /// notifications that the operation should be canceled. /// /// The Stream to the checkpoint of the respective job ID and part number. - public abstract Task ReadableStreamAsync( + public abstract Task ReadJobPartPlanFileAsync( string transferId, int partNumber, - long offset, - long readSize, + int offset, + int length, CancellationToken cancellationToken = default); /// - /// Writes to the memory mapped file to store the checkpointing information. - /// - /// Creates the file for the respective ID if it does not currently exist. + /// Writes to the job plan file at the given offset. /// /// The transfer ID. - /// The job part number. - /// The offset of the current transfer. - /// The buffer to write data from to the checkpoint. + /// The offset into the job plan file to start writing at. + /// The data to write. + /// The offset into the given buffer to start reading from. + /// The length of data to read from the buffer and write to the job plan file. /// /// Optional to propagate /// notifications that the operation should be canceled. /// - /// - public abstract Task WriteToCheckpointAsync( + public abstract Task WriteToJobPlanFileAsync( string transferId, - int partNumber, - long offset, + int fileOffset, byte[] buffer, + int bufferOffset, + int length, CancellationToken cancellationToken = default); /// diff --git a/sdk/storage/Azure.Storage.DataMovement/src/Shared/TransferManager.cs b/sdk/storage/Azure.Storage.DataMovement/src/Shared/TransferManager.cs index f104d47865e5..b5e6dc45a952 100644 --- a/sdk/storage/Azure.Storage.DataMovement/src/Shared/TransferManager.cs +++ b/sdk/storage/Azure.Storage.DataMovement/src/Shared/TransferManager.cs @@ -550,11 +550,11 @@ private async Task BuildSingleTransferJob( if (resumeJob) { - using (Stream stream = await _checkpointer.ReadableStreamAsync( + using (Stream stream = await _checkpointer.ReadJobPartPlanFileAsync( transferId: dataTransfer.Id, partNumber: 0, offset: 0, - readSize: 0, + length: 0, cancellationToken: _cancellationToken).ConfigureAwait(false)) { streamToUriJob.AppendJobPart( @@ -590,11 +590,11 @@ await streamToUriJob.ToJobPartAsync( if (resumeJob) { - using (Stream stream = await _checkpointer.ReadableStreamAsync( + using (Stream stream = await _checkpointer.ReadJobPartPlanFileAsync( transferId: dataTransfer.Id, partNumber: 0, offset: 0, - readSize: 0, + length: 0, cancellationToken: _cancellationToken).ConfigureAwait(false)) { serviceToServiceJob.AppendJobPart( @@ -623,11 +623,11 @@ await serviceToServiceJob.ToJobPartAsync( if (resumeJob) { - using (Stream stream = await _checkpointer.ReadableStreamAsync( + using (Stream stream = await _checkpointer.ReadJobPartPlanFileAsync( transferId: dataTransfer.Id, partNumber: 0, offset: 0, - readSize: 0, + length: 0, cancellationToken: _cancellationToken).ConfigureAwait(false)) { uriToStreamJob.AppendJobPart( @@ -675,11 +675,11 @@ private async Task BuildContainerTransferJob( cancellationToken: _cancellationToken).ConfigureAwait(false); for (var currentJobPart = 0; currentJobPart < jobPartCount; currentJobPart++) { - using (Stream stream = await _checkpointer.ReadableStreamAsync( + using (Stream stream = await _checkpointer.ReadJobPartPlanFileAsync( transferId: dataTransfer.Id, partNumber: currentJobPart, offset: 0, - readSize: 0, + length: 0, cancellationToken: _cancellationToken).ConfigureAwait(false)) { streamToUriJob.AppendJobPart( @@ -722,11 +722,11 @@ await streamToUriJob.ToJobPartAsync( cancellationToken: _cancellationToken).ConfigureAwait(false); for (var currentJobPart = 0; currentJobPart < jobPartCount; currentJobPart++) { - using (Stream stream = await _checkpointer.ReadableStreamAsync( + using (Stream stream = await _checkpointer.ReadJobPartPlanFileAsync( transferId: dataTransfer.Id, partNumber: currentJobPart, offset: 0, - readSize: 0, + length: 0, cancellationToken: _cancellationToken).ConfigureAwait(false)) { serviceToServiceJob.AppendJobPart( @@ -762,11 +762,11 @@ await serviceToServiceJob.ToJobPartAsync( cancellationToken: _cancellationToken).ConfigureAwait(false); for (var currentJobPart = 0; currentJobPart < jobPartCount; currentJobPart++) { - using (Stream stream = await _checkpointer.ReadableStreamAsync( + using (Stream stream = await _checkpointer.ReadJobPartPlanFileAsync( transferId: dataTransfer.Id, partNumber: currentJobPart, offset: 0, - readSize: 0, + length: 0, cancellationToken: _cancellationToken).ConfigureAwait(false)) { uriToStreamJob.AppendJobPart( @@ -802,36 +802,15 @@ private static LocalTransferCheckpointer CreateDefaultCheckpointer() private async Task SetDataTransfers() { _dataTransfers.Clear(); + List storedTransfers = await _checkpointer.GetStoredTransfersAsync().ConfigureAwait(false); foreach (string transferId in storedTransfers) { - int jobPartCount = await _checkpointer.CurrentJobPartCountAsync( - transferId: transferId, - cancellationToken: _cancellationToken).ConfigureAwait(false); - - JobPartPlanHeader header; - using (Stream stream = await _checkpointer.ReadableStreamAsync( - transferId: transferId, - partNumber: 0, - offset: 0, - readSize: 0, - cancellationToken: _cancellationToken).ConfigureAwait(false)) - { - // Convert stream to job plan header - header = JobPartPlanHeader.Deserialize(stream); - } - - // Verify the contents of the header - // Check transfer id - if (!header.TransferId.Equals(transferId)) - { - throw Errors.MismatchTransferId(transferId, header.TransferId); - } - + DataTransferStatus jobStatus = await _checkpointer.GetJobStatusAsync(transferId).ConfigureAwait(false); _dataTransfers.Add(transferId, new DataTransfer( id: transferId, transferManager: this, - status: header.AtomicJobStatus)); + status: jobStatus)); } } diff --git a/sdk/storage/Azure.Storage.DataMovement/tests/GetTransfersTests.cs b/sdk/storage/Azure.Storage.DataMovement/tests/GetTransfersTests.cs index 33f7a8793360..27b7430b2b9d 100644 --- a/sdk/storage/Azure.Storage.DataMovement/tests/GetTransfersTests.cs +++ b/sdk/storage/Azure.Storage.DataMovement/tests/GetTransfersTests.cs @@ -5,6 +5,7 @@ using System.Collections.Generic; using System.Linq; using System.Threading.Tasks; +using Azure.Storage.DataMovement.JobPlan; using NUnit.Framework; namespace Azure.Storage.DataMovement.Tests @@ -128,7 +129,7 @@ public async Task GetTransfers_Filtered( IList result = await manager.GetTransfersAsync(status).ToListAsync(); // Assert - AssertListTransfersEquals(storedTransfers.Where( d => d.TransferStatus == status).ToList(), result); + AssertListTransfersEquals(storedTransfers.Where(d => d.TransferStatus == status).ToList(), result); } [Test] @@ -281,20 +282,18 @@ public async Task GetResumableTransfers_IgnoresCompleted() LocalTransferCheckpointerFactory factory = new LocalTransferCheckpointerFactory(test.DirectoryPath); string transferId1 = Guid.NewGuid().ToString(); - factory.CreateStubJobPlanFile(test.DirectoryPath, transferId1); + factory.CreateStubJobPlanFile(test.DirectoryPath, transferId1, status: SuccessfulCompletedStatus); factory.CreateStubJobPartPlanFilesAsync( test.DirectoryPath, transferId1, - 3 /* jobPartCount */, - SuccessfulCompletedStatus); + 3 /* jobPartCount */); string transferId2 = Guid.NewGuid().ToString(); - factory.CreateStubJobPlanFile(test.DirectoryPath, transferId2); + factory.CreateStubJobPlanFile(test.DirectoryPath, transferId2, status: QueuedStatus); factory.CreateStubJobPartPlanFilesAsync( test.DirectoryPath, transferId2, - 3 /* jobPartCount */, - QueuedStatus); + 3 /* jobPartCount */); // Build TransferManager with the stored transfers TransferManagerOptions options = new TransferManagerOptions() @@ -317,7 +316,11 @@ private void AddTransferFromDataTransferProperties( DataTransferProperties properties) { // First add the job plan file for the transfer - factory.CreateStubJobPlanFile(checkpointerPath, properties.TransferId); + factory.CreateStubJobPlanFile( + checkpointerPath, + properties.TransferId, + parentSourcePath: properties.SourcePath, + parentDestinationPath: properties.DestinationPath); if (properties.IsContainer) { diff --git a/sdk/storage/Azure.Storage.DataMovement/tests/JobPlanHeaderTests.cs b/sdk/storage/Azure.Storage.DataMovement/tests/JobPlanHeaderTests.cs index 85b44b89972e..147476562e1f 100644 --- a/sdk/storage/Azure.Storage.DataMovement/tests/JobPlanHeaderTests.cs +++ b/sdk/storage/Azure.Storage.DataMovement/tests/JobPlanHeaderTests.cs @@ -24,7 +24,7 @@ public void Ctor() Assert.AreEqual(DefaultCreateTime, header.CreateTime); Assert.AreEqual(DefaultJobPlanOperation, header.OperationType); Assert.AreEqual(false, header.EnumerationComplete); - Assert.AreEqual(DefaultJobPlanStatus, header.JobStatus); + Assert.AreEqual(DefaultJobStatus, header.JobStatus); Assert.AreEqual(DefaultSourcePath, header.ParentSourcePath); Assert.AreEqual(DefaultDestinationPath, header.ParentDestinationPath); } @@ -78,7 +78,7 @@ private void DeserializeAndVerify(Stream stream, string version) Assert.AreEqual(DefaultCreateTime, deserialized.CreateTime); Assert.AreEqual(DefaultJobPlanOperation, deserialized.OperationType); Assert.AreEqual(false, deserialized.EnumerationComplete); - Assert.AreEqual(DefaultJobPlanStatus, deserialized.JobStatus); + Assert.AreEqual(DefaultJobStatus, deserialized.JobStatus); Assert.AreEqual(DefaultSourcePath, deserialized.ParentSourcePath); Assert.AreEqual(DefaultDestinationPath, deserialized.ParentDestinationPath); } diff --git a/sdk/storage/Azure.Storage.DataMovement/tests/LocalTransferCheckpointerFactory.cs b/sdk/storage/Azure.Storage.DataMovement/tests/LocalTransferCheckpointerFactory.cs index 6df3147b9924..e5956e9804c5 100644 --- a/sdk/storage/Azure.Storage.DataMovement/tests/LocalTransferCheckpointerFactory.cs +++ b/sdk/storage/Azure.Storage.DataMovement/tests/LocalTransferCheckpointerFactory.cs @@ -74,12 +74,12 @@ public LocalTransferCheckpointer BuildCheckpointer(List dataTransf { foreach (DataTransfer dataTransfer in dataTransfers) { - CreateStubJobPlanFile(_checkpointerPath, dataTransfer.Id); + CreateStubJobPlanFile(_checkpointerPath, dataTransfer.Id, status: dataTransfer.TransferStatus); CreateStubJobPartPlanFilesAsync( checkpointerPath: _checkpointerPath, transferId: dataTransfer.Id, jobPartCount: _partCountDefault, - status: dataTransfer.TransferStatus); + status: new DataTransferStatus(DataTransferState.Paused, false, false)); } return new LocalTransferCheckpointer(_checkpointerPath); } @@ -143,17 +143,23 @@ internal void CreateStubJobPartPlanFilesAsync( } } - internal void CreateStubJobPlanFile(string checkpointPath, string transferId) + internal void CreateStubJobPlanFile( + string checkpointPath, + string transferId, + string parentSourcePath = _testSourcePath, + string parentDestinationPath = _testDestinationPath, + DataTransferStatus status = default) { + status ??= new DataTransferStatus(); JobPlanHeader header = new JobPlanHeader( DataMovementConstants.JobPlanFile.SchemaVersion, transferId, DateTimeOffset.UtcNow, JobPlanOperation.ServiceToService, false, /* enumerationComplete */ - JobPlanStatus.Queued, - _testSourcePath, - _testDestinationPath); + status, + parentSourcePath, + parentDestinationPath); string filePath = Path.Combine(checkpointPath, $"{transferId}.{DataMovementConstants.JobPlanFile.FileExtension}"); using (FileStream stream = File.Create(filePath)) diff --git a/sdk/storage/Azure.Storage.DataMovement/tests/LocalTransferCheckpointerTests.cs b/sdk/storage/Azure.Storage.DataMovement/tests/LocalTransferCheckpointerTests.cs index ea65650e98eb..ca431b3208b8 100644 --- a/sdk/storage/Azure.Storage.DataMovement/tests/LocalTransferCheckpointerTests.cs +++ b/sdk/storage/Azure.Storage.DataMovement/tests/LocalTransferCheckpointerTests.cs @@ -226,11 +226,11 @@ await transferCheckpointer.AddNewJobPartAsync( int partCount = await transferCheckpointer.CurrentJobPartCountAsync(transferId); Assert.AreEqual(1, partCount); - using (Stream stream = await transferCheckpointer.ReadableStreamAsync( + using (Stream stream = await transferCheckpointer.ReadJobPartPlanFileAsync( transferId: transferId, partNumber: partNumber, offset: 0, - readSize: DataMovementConstants.JobPartPlanFile.JobPartHeaderSizeInBytes)) + length: DataMovementConstants.JobPartPlanFile.JobPartHeaderSizeInBytes)) { // Assert await CheckpointerTesting.AssertJobPlanHeaderAsync(header, stream); @@ -352,41 +352,41 @@ await transferCheckpointer.AddNewJobPartAsync( Assert.AreEqual(1, transferIds.Count); Assert.IsTrue(transferIds.Contains(transferId)); - using (Stream stream = await transferCheckpointer.ReadableStreamAsync( + using (Stream stream = await transferCheckpointer.ReadJobPartPlanFileAsync( transferId: transferId, partNumber: 0, offset: 0, - readSize: DataMovementConstants.JobPartPlanFile.JobPartHeaderSizeInBytes)) + length: DataMovementConstants.JobPartPlanFile.JobPartHeaderSizeInBytes)) { // Assert await CheckpointerTesting.AssertJobPlanHeaderAsync(header1, stream); } - using (Stream stream = await transferCheckpointer.ReadableStreamAsync( + using (Stream stream = await transferCheckpointer.ReadJobPartPlanFileAsync( transferId: transferId, partNumber: 1, offset: 0, - readSize: DataMovementConstants.JobPartPlanFile.JobPartHeaderSizeInBytes)) + length: DataMovementConstants.JobPartPlanFile.JobPartHeaderSizeInBytes)) { // Assert await CheckpointerTesting.AssertJobPlanHeaderAsync(header2, stream); } - using (Stream stream = await transferCheckpointer.ReadableStreamAsync( + using (Stream stream = await transferCheckpointer.ReadJobPartPlanFileAsync( transferId: transferId, partNumber: 2, offset: 0, - readSize: DataMovementConstants.JobPartPlanFile.JobPartHeaderSizeInBytes)) + length: DataMovementConstants.JobPartPlanFile.JobPartHeaderSizeInBytes)) { // Assert await CheckpointerTesting.AssertJobPlanHeaderAsync(header3, stream); } - using (Stream stream = await transferCheckpointer.ReadableStreamAsync( + using (Stream stream = await transferCheckpointer.ReadJobPartPlanFileAsync( transferId: transferId, partNumber: 3, offset: 0, - readSize: DataMovementConstants.JobPartPlanFile.JobPartHeaderSizeInBytes)) + length: DataMovementConstants.JobPartPlanFile.JobPartHeaderSizeInBytes)) { // Assert await CheckpointerTesting.AssertJobPlanHeaderAsync(header4, stream); @@ -434,11 +434,11 @@ await transferCheckpointer.AddNewJobPartAsync( Assert.AreEqual(1, transferIds.Count); Assert.IsTrue(transferIds.Contains(transferId)); - using (Stream stream = await transferCheckpointer.ReadableStreamAsync( + using (Stream stream = await transferCheckpointer.ReadJobPartPlanFileAsync( transferId: transferId, partNumber: 1, offset: 0, - readSize: DataMovementConstants.JobPartPlanFile.JobPartHeaderSizeInBytes)) + length: DataMovementConstants.JobPartPlanFile.JobPartHeaderSizeInBytes)) { // Assert await CheckpointerTesting.AssertJobPlanHeaderAsync(header, stream); @@ -731,7 +731,62 @@ public void CurrentJobPartCountAsync_Error() } [Test] - public async Task ReadableStreamAsync() + public async Task ReadJobPlanFileAsync() + { + // Arrange + using DisposingLocalDirectory test = DisposingLocalDirectory.GetTestDirectory(); + TransferCheckpointer transferCheckpointer = new LocalTransferCheckpointer(test.DirectoryPath); + + string transferId = GetNewTransferId(); + await AddJobToCheckpointer(transferCheckpointer, transferId); + + // Act + JobPlanHeader header; + using (Stream stream = await transferCheckpointer.ReadJobPlanFileAsync(transferId, offset: 0, length: 0)) + { + header = JobPlanHeader.Deserialize(stream); + } + + // Assert + Assert.IsNotNull(header); + Assert.AreEqual(DataMovementConstants.JobPlanFile.SchemaVersion, header.Version); + Assert.AreEqual(transferId, header.TransferId); + Assert.AreEqual(CheckpointerTesting.DefaultWebSourcePath, header.ParentSourcePath); + Assert.AreEqual(CheckpointerTesting.DefaultWebDestinationPath, header.ParentDestinationPath); + } + + [Test] + public async Task ReadJobPlanFileAsync_Partial() + { + // Arrange + using DisposingLocalDirectory test = DisposingLocalDirectory.GetTestDirectory(); + TransferCheckpointer transferCheckpointer = new LocalTransferCheckpointer(test.DirectoryPath); + + string transferId = GetNewTransferId(); + await AddJobToCheckpointer(transferCheckpointer, transferId); + + // Act + string actualTransferId; + int length = DataMovementConstants.GuidSizeInBytes; + using (Stream stream = await transferCheckpointer.ReadJobPlanFileAsync( + transferId, + DataMovementConstants.JobPlanFile.TransferIdIndex, + length)) + { + BinaryReader reader = new BinaryReader(stream); + byte[] transferIdBytes = reader.ReadBytes(length); + actualTransferId = new Guid(transferIdBytes).ToString(); + } + + DataTransferStatus actualJobStatus = await transferCheckpointer.GetJobStatusAsync(transferId); + + // Assert + Assert.AreEqual(transferId, actualTransferId); + Assert.AreEqual(actualJobStatus, new DataTransferStatus()); + } + + [Test] + public async Task ReadJobPartPlanFileAsync() { using DisposingLocalDirectory test = DisposingLocalDirectory.GetTestDirectory(); @@ -758,11 +813,11 @@ await transferCheckpointer.AddNewJobPartAsync( } // Act - using (Stream stream = await transferCheckpointer.ReadableStreamAsync( + using (Stream stream = await transferCheckpointer.ReadJobPartPlanFileAsync( transferId: transferId, partNumber: partNumber, offset: 0, - readSize: DataMovementConstants.JobPartPlanFile.JobPartHeaderSizeInBytes)) + length: DataMovementConstants.JobPartPlanFile.JobPartHeaderSizeInBytes)) { // Assert await CheckpointerTesting.AssertJobPlanHeaderAsync(header, stream); @@ -770,7 +825,7 @@ await transferCheckpointer.AddNewJobPartAsync( } [Test] - public void ReadableStreamAsync_Error() + public void ReadJobPartPlanFileAsync_Error() { using DisposingLocalDirectory test = DisposingLocalDirectory.GetTestDirectory(); @@ -782,13 +837,79 @@ public void ReadableStreamAsync_Error() // Act Assert.CatchAsync( - async () => await transferCheckpointer.ReadableStreamAsync( + async () => await transferCheckpointer.ReadJobPartPlanFileAsync( transferId: transferId, partNumber: partNumber, offset: 0, - readSize: DataMovementConstants.JobPartPlanFile.JobPartHeaderSizeInBytes)); + length: DataMovementConstants.JobPartPlanFile.JobPartHeaderSizeInBytes)); + } + + [Test] + public async Task WriteToJobPlanFileAsync() + { + // Arrange + using DisposingLocalDirectory test = DisposingLocalDirectory.GetTestDirectory(); + TransferCheckpointer transferCheckpointer = new LocalTransferCheckpointer(test.DirectoryPath); + + string transferId = GetNewTransferId(); + await AddJobToCheckpointer(transferCheckpointer, transferId); + + // Act + // Change enumerationComplete (test with extra byte) + byte[] enumerationCompleteBytes = { 0x00, Convert.ToByte(true) }; + await transferCheckpointer.WriteToJobPlanFileAsync( + transferId, + DataMovementConstants.JobPlanFile.EnumerationCompleteIndex, + enumerationCompleteBytes, + bufferOffset: 1, + length: DataMovementConstants.OneByte); + + // Change Job Status + JobPlanStatus jobPlanStatus = JobPlanStatus.Completed | JobPlanStatus.HasSkipped; + byte[] jobPlanStatusBytes = BitConverter.GetBytes((int)jobPlanStatus); + await transferCheckpointer.WriteToJobPlanFileAsync( + transferId, + DataMovementConstants.JobPlanFile.JobStatusIndex, + jobPlanStatusBytes, + bufferOffset: 0, + length: DataMovementConstants.IntSizeInBytes); + + // Assert + int start = DataMovementConstants.JobPlanFile.EnumerationCompleteIndex; + int readLength = DataMovementConstants.JobPlanFile.ParentSourcePathOffsetIndex - start; + using (Stream stream = await transferCheckpointer.ReadJobPlanFileAsync( + transferId, + offset: start, + length: readLength)) + { + BinaryReader reader = new BinaryReader(stream); + bool enumerationComplete = Convert.ToBoolean(reader.ReadByte()); + Assert.IsTrue(enumerationComplete); + + JobPlanStatus actualJobPlanStatus = (JobPlanStatus)reader.ReadInt32(); + Assert.AreEqual(jobPlanStatus, actualJobPlanStatus); + } } + [Test] + public void WriteToJobPlanFileAsync_Error() + { + // Arrange + using DisposingLocalDirectory test = DisposingLocalDirectory.GetTestDirectory(); + string transferId = GetNewTransferId(); + TransferCheckpointer transferCheckpointer = new LocalTransferCheckpointer(test.DirectoryPath); + + // Act / Assert + byte[] bytes = { 0x00 }; + Assert.CatchAsync( + async () => await transferCheckpointer.WriteToJobPlanFileAsync( + transferId: Guid.NewGuid().ToString(), + fileOffset: 0, + buffer: bytes, + bufferOffset: 0, + length: 1)); + } + [Test] public async Task SetJobTransferStatusAsync() { @@ -822,11 +943,11 @@ await transferCheckpointer.AddNewJobPartAsync( // Assert header.AtomicJobStatus = newStatus; - using (Stream stream = await transferCheckpointer.ReadableStreamAsync( + using (Stream stream = await transferCheckpointer.ReadJobPartPlanFileAsync( transferId: transferId, partNumber: partNumber, offset: 0, - readSize: DataMovementConstants.JobPartPlanFile.JobPartHeaderSizeInBytes)) + length: DataMovementConstants.JobPartPlanFile.JobPartHeaderSizeInBytes)) { await CheckpointerTesting.AssertJobPlanHeaderAsync(header, stream); } @@ -904,38 +1025,38 @@ await transferCheckpointer.AddNewJobPartAsync( // Assert header1.AtomicJobStatus = newStatus; - using (Stream stream = await transferCheckpointer.ReadableStreamAsync( + using (Stream stream = await transferCheckpointer.ReadJobPartPlanFileAsync( transferId: transferId, partNumber: 0, offset: 0, - readSize: DataMovementConstants.JobPartPlanFile.JobPartHeaderSizeInBytes)) + length: DataMovementConstants.JobPartPlanFile.JobPartHeaderSizeInBytes)) { await CheckpointerTesting.AssertJobPlanHeaderAsync(header1, stream); } header2.AtomicJobStatus = newStatus; - using (Stream stream = await transferCheckpointer.ReadableStreamAsync( + using (Stream stream = await transferCheckpointer.ReadJobPartPlanFileAsync( transferId: transferId, partNumber: 1, offset: 0, - readSize: DataMovementConstants.JobPartPlanFile.JobPartHeaderSizeInBytes)) + length: DataMovementConstants.JobPartPlanFile.JobPartHeaderSizeInBytes)) { await CheckpointerTesting.AssertJobPlanHeaderAsync(header2, stream); } header3.AtomicJobStatus = newStatus; - using (Stream stream = await transferCheckpointer.ReadableStreamAsync( + using (Stream stream = await transferCheckpointer.ReadJobPartPlanFileAsync( transferId: transferId, partNumber: 2, offset: 0, - readSize: DataMovementConstants.JobPartPlanFile.JobPartHeaderSizeInBytes)) + length: DataMovementConstants.JobPartPlanFile.JobPartHeaderSizeInBytes)) { await CheckpointerTesting.AssertJobPlanHeaderAsync(header3, stream); } header4.AtomicJobStatus = newStatus; - using (Stream stream = await transferCheckpointer.ReadableStreamAsync( + using (Stream stream = await transferCheckpointer.ReadJobPartPlanFileAsync( transferId: transferId, partNumber: 3, offset: 0, - readSize: DataMovementConstants.JobPartPlanFile.JobPartHeaderSizeInBytes)) + length: DataMovementConstants.JobPartPlanFile.JobPartHeaderSizeInBytes)) { await CheckpointerTesting.AssertJobPlanHeaderAsync(header4, stream); } @@ -995,11 +1116,11 @@ await transferCheckpointer.AddNewJobPartAsync( // Assert header.AtomicPartStatus = newStatus; - using (Stream stream = await transferCheckpointer.ReadableStreamAsync( + using (Stream stream = await transferCheckpointer.ReadJobPartPlanFileAsync( transferId: transferId, partNumber: partNumber, offset: 0, - readSize: DataMovementConstants.JobPartPlanFile.JobPartHeaderSizeInBytes)) + length: DataMovementConstants.JobPartPlanFile.JobPartHeaderSizeInBytes)) { await CheckpointerTesting.AssertJobPlanHeaderAsync(header, stream); } diff --git a/sdk/storage/Azure.Storage.DataMovement/tests/Shared/CheckpointerTesting.cs b/sdk/storage/Azure.Storage.DataMovement/tests/Shared/CheckpointerTesting.cs index 95037e1dacc8..f9b335ea9fe4 100644 --- a/sdk/storage/Azure.Storage.DataMovement/tests/Shared/CheckpointerTesting.cs +++ b/sdk/storage/Azure.Storage.DataMovement/tests/Shared/CheckpointerTesting.cs @@ -57,7 +57,6 @@ internal static readonly DateTimeOffset DefaultStartTime internal static readonly DataTransferStatus DefaultJobStatus = new DataTransferStatusInternal(DataTransferState.Queued, false, false); internal static readonly DataTransferStatus DefaultPartStatus = new DataTransferStatusInternal(DataTransferState.Queued, false, false); internal static readonly DateTimeOffset DefaultCreateTime = new DateTimeOffset(2023, 08, 28, 17, 26, 0, default); - internal const JobPlanStatus DefaultJobPlanStatus = JobPlanStatus.Queued; internal static JobPartPlanHeader CreateDefaultJobPartHeader( string version = DataMovementConstants.JobPartPlanFile.SchemaVersion, @@ -183,7 +182,7 @@ internal static JobPlanHeader CreateDefaultJobHeader( DateTimeOffset createTime = default, JobPlanOperation operationType = DefaultJobPlanOperation, bool enumerationComplete = false, - JobPlanStatus jobStatus = DefaultJobPlanStatus, + DataTransferStatus jobStatus = default, string parentSourcePath = DefaultSourcePath, string parentDestinationPath = DefaultDestinationPath) { @@ -191,6 +190,7 @@ internal static JobPlanHeader CreateDefaultJobHeader( { createTime = DefaultCreateTime; } + jobStatus ??= DefaultJobStatus; return new JobPlanHeader( version,