Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
<Compile Include="$(AzureStorageSharedSources)Errors.Clients.cs" LinkBase="Shared\Storage" />
<Compile Include="$(AzureStorageSharedSources)StorageExtensions.cs" LinkBase="Shared\Storage" />
<Compile Include="$(AzureStorageSharedSources)UriExtensions.cs" LinkBase="Shared\Storage" />
<Compile Include="$(AzureStorageSharedSources)PooledMemoryStream.cs" LinkBase="Shared\Storage" />
<Compile Include="$(AzureStorageSharedSources)StreamExtensions.cs" LinkBase="Shared" />
</ItemGroup>
<ItemGroup>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
using System.Text;
using Azure.Core;
using Azure.Storage.Blobs.Models;
using static Azure.Storage.DataMovement.JobPlanExtensions;
Comment thread
jalauzon-msft marked this conversation as resolved.
using Metadata = System.Collections.Generic.IDictionary<string, string>;
using Tags = System.Collections.Generic.IDictionary<string, string>;

Expand Down Expand Up @@ -278,23 +279,6 @@ private int CalculateLength()
return length;
}

private void WriteVariableLengthFieldInfo(BinaryWriter writer, byte[] bytes, ref int currentVariableLengthIndex)
{
// Write the offset, -1 if size is 0
if (bytes.Length > 0)
{
writer.Write(currentVariableLengthIndex);
currentVariableLengthIndex += bytes.Length;
}
else
{
writer.Write(-1);
}

// Write the length
writer.Write(bytes.Length);
}

private static void CheckSchemaVersion(int version)
{
if (version != DataMovementBlobConstants.DestinationJobPartHeader.SchemaVersion)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,10 @@ protected internal DataTransferStatus(Azure.Storage.DataMovement.DataTransferSta
public bool HasSkippedItems { get { throw null; } }
public Azure.Storage.DataMovement.DataTransferState State { get { throw null; } }
public bool Equals(Azure.Storage.DataMovement.DataTransferStatus other) { throw null; }
public override bool Equals(object obj) { throw null; }
public override int GetHashCode() { throw null; }
public static bool operator ==(Azure.Storage.DataMovement.DataTransferStatus left, Azure.Storage.DataMovement.DataTransferStatus right) { throw null; }
public static bool operator !=(Azure.Storage.DataMovement.DataTransferStatus left, Azure.Storage.DataMovement.DataTransferStatus right) { throw null; }
}
public partial class LocalFilesStorageResourceProvider : Azure.Storage.DataMovement.StorageResourceProvider
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,10 @@ protected internal DataTransferStatus(Azure.Storage.DataMovement.DataTransferSta
public bool HasSkippedItems { get { throw null; } }
public Azure.Storage.DataMovement.DataTransferState State { get { throw null; } }
public bool Equals(Azure.Storage.DataMovement.DataTransferStatus other) { throw null; }
public override bool Equals(object obj) { throw null; }
public override int GetHashCode() { throw null; }
public static bool operator ==(Azure.Storage.DataMovement.DataTransferStatus left, Azure.Storage.DataMovement.DataTransferStatus right) { throw null; }
public static bool operator !=(Azure.Storage.DataMovement.DataTransferStatus left, Azure.Storage.DataMovement.DataTransferStatus right) { throw null; }
}
public partial class LocalFilesStorageResourceProvider : Azure.Storage.DataMovement.StorageResourceProvider
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,5 +38,6 @@
<Compile Include="$(AzureStorageSharedSources)PartitionedStream.cs" LinkBase="Shared\Storage" />
<Compile Include="$(AzureStorageSharedSources)ProgressIncrementingStream.cs" LinkBase="Shared\Storage" />
<Compile Include="$(AzureStorageSharedSources)UriExtensions.cs" LinkBase="Shared\Storage" />
<Compile Include="$(AzureStorageSharedSources)PooledMemoryStream.cs" LinkBase="Shared\Storage" />
</ItemGroup>
</Project>
Original file line number Diff line number Diff line change
@@ -1,38 +1,41 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

using System;
using System.IO;
using System.Threading;
using System.Threading.Tasks;
using Azure.Storage.DataMovement.JobPlan;

namespace Azure.Storage.DataMovement
{
internal partial class CheckpointerExtensions
{
internal static async Task<bool> IsResumableAsync(
internal static async Task<DataTransferStatus> GetJobStatusAsync(
this TransferCheckpointer checkpointer,
string transferId,
CancellationToken cancellationToken)
CancellationToken cancellationToken = default)
{
DataTransferState transferState = (DataTransferState) await checkpointer.GetByteValue(
transferId,
DataMovementConstants.JobPartPlanFile.AtomicJobStatusStateIndex,
cancellationToken).ConfigureAwait(false);

byte hasFailedItemsByte = await checkpointer.GetByteValue(
using (Stream stream = await checkpointer.ReadJobPlanFileAsync(
transferId,
DataMovementConstants.JobPartPlanFile.AtomicJobStatusHasFailedIndex,
cancellationToken).ConfigureAwait(false);
bool hasFailedItems = Convert.ToBoolean(hasFailedItemsByte);
DataMovementConstants.JobPlanFile.JobStatusIndex,
DataMovementConstants.IntSizeInBytes,
cancellationToken).ConfigureAwait(false))
{
BinaryReader reader = new BinaryReader(stream);
JobPlanStatus jobPlanStatus = (JobPlanStatus)reader.ReadInt32();
return jobPlanStatus.ToDataTransferStatus();
}
}

byte hasSkippedItemsByte = await checkpointer.GetByteValue(
transferId,
DataMovementConstants.JobPartPlanFile.AtomicJobStatusHasSkippedIndex,
cancellationToken).ConfigureAwait(false);
bool hasSkippedItems = Convert.ToBoolean(hasSkippedItemsByte);
internal static async Task<bool> IsResumableAsync(
this TransferCheckpointer checkpointer,
string transferId,
CancellationToken cancellationToken)
{
DataTransferStatus jobStatus = await checkpointer.GetJobStatusAsync(transferId, cancellationToken).ConfigureAwait(false);

// Transfers marked as fully completed are not resumable
return transferState != DataTransferState.Completed || hasFailedItems || hasSkippedItems;
return jobStatus.State != DataTransferState.Completed || jobStatus.HasFailedItems || jobStatus.HasSkippedItems;
}

internal static async Task<DataTransferProperties> GetDataTransferPropertiesAsync(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
// Licensed under the MIT License.

using System;
using System.Runtime.CompilerServices;
using System.Threading;

namespace Azure.Storage.DataMovement
Expand Down Expand Up @@ -105,15 +104,54 @@ internal bool TrySetTransferStateChange(DataTransferState state)
return Interlocked.Exchange(ref _stateValue, (int)state) != (int)state;
}

/// <inheritdoc/>
public bool Equals(DataTransferStatus other)
{
if (other == null)
{
return false;
}

return State.Equals(other.State) &&
HasFailedItems.Equals(other.HasFailedItems) &&
HasSkippedItems.Equals(other.HasSkippedItems);
}

/// <summary>
/// Indicates whether the current object is equal to another object of the same type.
/// Equality operator.
/// </summary>
/// <param name="other">An object to compare with this object.</param>
/// <returns>Returns true if the current object is equal to the other parameter; otherwise, false.</returns>
public bool Equals(DataTransferStatus other)
=> State.Equals(other.State) &&
HasFailedItems.Equals(other.HasFailedItems) &&
HasSkippedItems.Equals(other.HasSkippedItems);
/// <param name="left">The left hand side.</param>
/// <param name="right">The right hand side.</param>
/// <returns>True, if the two values are equal; otherwise false.</returns>
public static bool operator ==(DataTransferStatus left, DataTransferStatus right)
{
if (left is null != right is null)
{
return false;
}
return left?.Equals(right) ?? true;
}

/// <summary>
/// Inequality operator.
/// </summary>
/// <param name="left">The left hand side.</param>
/// <param name="right">The right hand side.</param>
/// <returns>True, if the two values are not equal; otherwise false.</returns>
public static bool operator !=(DataTransferStatus left, DataTransferStatus right) => !(left == right);

/// <inheritdoc/>
public override bool Equals(object obj) => Equals(obj as DataTransferStatus);

/// <inheritdoc/>
public override int GetHashCode()
{
int hashCode = 1225395075;
hashCode = hashCode * -1521134295 + State.GetHashCode();
hashCode = hashCode * -1521134295 + HasFailedItems.GetHashCode();
hashCode = hashCode * -1521134295 + HasSkippedItems.GetHashCode();
return hashCode;
}

/// <summary>
/// Performs a Deep Copy of the <see cref="DataTransferStatus"/>.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ internal class DataMovementConstants
internal const int MaxJobPartReaders = 64;
internal const int MaxJobChunkTasks = 3000;
internal const int StatusCheckInSec = 10;
internal const int DefaultArrayPoolArraySize = 4 * 1024;

internal static class ConcurrencyTuner
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@ namespace Azure.Storage.DataMovement
{
internal class DataTransferStatusInternal : DataTransferStatus
{
public DataTransferStatusInternal() : base()
{ }

public DataTransferStatusInternal(
DataTransferState state,
bool hasFailedItems,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ internal class JobPlanHeader
/// <summary>
/// The current status of the transfer job.
/// </summary>
public JobPlanStatus JobStatus;
public DataTransferStatus JobStatus;

/// <summary>
/// The parent path for the source of the transfer.
Expand All @@ -56,7 +56,7 @@ public JobPlanHeader(
DateTimeOffset createTime,
JobPlanOperation operationType,
bool enumerationComplete,
JobPlanStatus jobStatus,
DataTransferStatus jobStatus,
string parentSourcePath,
string parentDestinationPath)
{
Expand Down Expand Up @@ -100,23 +100,15 @@ public void Serialize(Stream stream)
writer.Write(Convert.ToByte(EnumerationComplete));

// JobStatus
writer.Write((int)JobStatus);
writer.Write((int)JobStatus.ToJobPlanStatus());

// ParentSourcePath offset
// ParentSourcePath offset/length
byte[] parentSourcePathBytes = Encoding.UTF8.GetBytes(ParentSourcePath);
writer.Write(currentVariableLengthIndex);
currentVariableLengthIndex += parentSourcePathBytes.Length;

// ParentSourcePath length
writer.Write(parentSourcePathBytes.Length);
JobPlanExtensions.WriteVariableLengthFieldInfo(writer, parentSourcePathBytes, ref currentVariableLengthIndex);

// ParentDestinationPath offset
// ParentDestinationPath offset/length
byte[] parentDestinationPathBytes = Encoding.UTF8.GetBytes(ParentDestinationPath);
writer.Write(currentVariableLengthIndex);
currentVariableLengthIndex += parentDestinationPathBytes.Length;

// ParentDestinationPath length
writer.Write(parentDestinationPathBytes.Length);
JobPlanExtensions.WriteVariableLengthFieldInfo(writer, parentDestinationPathBytes, ref currentVariableLengthIndex);

// ParentSourcePath
writer.Write(parentSourcePathBytes);
Expand Down Expand Up @@ -156,7 +148,7 @@ public static JobPlanHeader Deserialize(Stream stream)
bool enumerationComplete = Convert.ToBoolean(enumerationCompleteByte);

// JobStatus
JobPlanStatus jobStatus = (JobPlanStatus)reader.ReadInt32();
JobPlanStatus jobPlanStatus = (JobPlanStatus)reader.ReadInt32();

// ParentSourcePath offset
int parentSourcePathOffset = reader.ReadInt32();
Expand Down Expand Up @@ -194,7 +186,7 @@ public static JobPlanHeader Deserialize(Stream stream)
createTime,
operationType,
enumerationComplete,
jobStatus,
jobPlanStatus.ToDataTransferStatus(),
parentSourcePath,
parentDestinationPath);
}
Expand Down
Loading