Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Microsoft.Azure.Cosmos/src/CosmosClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1173,7 +1173,7 @@ public virtual FeedIterator GetDatabaseQueryStreamIterator(
#endif
virtual DistributedWriteTransaction CreateDistributedWriteTransaction()
{
return new DistributedWriteTransactionCore();
return new DistributedWriteTransactionCore(this.ClientContext);
}

/// <summary>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
// ------------------------------------------------------------
// Copyright (c) Microsoft Corporation. All rights reserved.
// ------------------------------------------------------------

namespace Microsoft.Azure.Cosmos
{
using System;
using System.Collections.Generic;
using System.IO;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos.Core.Trace;
using Microsoft.Azure.Cosmos.Tracing;
using Microsoft.Azure.Documents;

internal class DistributedTransactionCommitter
{
// TODO: Move to HttpConstants.HttpHeaders once DTC headers are added centrally
private const string IdempotencyTokenHeader = "x-ms-dtc-operation-id";

private readonly IReadOnlyList<DistributedTransactionOperation> operations;
private readonly CosmosClientContext clientContext;

public DistributedTransactionCommitter(
IReadOnlyList<DistributedTransactionOperation> operations,
CosmosClientContext clientContext)
{
this.operations = operations ?? throw new ArgumentNullException(nameof(operations));
this.clientContext = clientContext ?? throw new ArgumentNullException(nameof(clientContext));
}

public async Task<DistributedTransactionResponse> CommitTransactionAsync(CancellationToken cancellationToken)
{
try
{
cancellationToken.ThrowIfCancellationRequested();
await DistributedTransactionCommitterUtils.ResolveCollectionRidsAsync(
this.operations,
this.clientContext,
cancellationToken);

DistributedTransactionServerRequest serverRequest = await DistributedTransactionServerRequest.CreateAsync(
this.operations,
this.clientContext.SerializerCore,
cancellationToken);

return await this.ExecuteCommitAsync(serverRequest, cancellationToken);
}
catch (Exception ex)
{
DefaultTrace.TraceError($"Distributed transaction failed: {ex.Message}");
// await this.AbortTransactionAsync(cancellationToken);
throw;
}
}

private async Task<DistributedTransactionResponse> ExecuteCommitAsync(
DistributedTransactionServerRequest serverRequest,
CancellationToken cancellationToken)
{
cancellationToken.ThrowIfCancellationRequested();
using (ITrace trace = Trace.GetRootTrace("Execute Distributed Transaction Commit", TraceComponent.Batch, TraceLevel.Info))
{
using (MemoryStream bodyStream = serverRequest.TransferBodyStream())
{
ResponseMessage responseMessage = await this.clientContext.ProcessResourceOperationStreamAsync(
resourceUri: "/dtc/ops",
resourceType: ResourceType.Document, // TODO: Update to a new ResourceType specific to DTC
operationType: OperationType.Batch, // TODO: Update to a new OperationType specific to DTC
requestOptions: null,
cosmosContainerCore: null,
Comment thread
kirankumarkolli marked this conversation as resolved.
partitionKey: null,
itemId: null,
streamPayload: bodyStream,
requestEnricher: requestMessage => this.EnrichRequestMessage(requestMessage, serverRequest),
trace: trace,
cancellationToken: cancellationToken);

cancellationToken.ThrowIfCancellationRequested();

return await DistributedTransactionResponse.FromResponseMessageAsync(
responseMessage,
serverRequest,
this.clientContext.SerializerCore,
serverRequest.IdempotencyToken,
trace,
cancellationToken);
}
}
}

private void EnrichRequestMessage(RequestMessage requestMessage, DistributedTransactionServerRequest serverRequest)
{
// Set DTC-specific headers
requestMessage.Headers.Add(IdempotencyTokenHeader, serverRequest.IdempotencyToken.ToString());
requestMessage.UseGatewayMode = true;
}

private Task AbortTransactionAsync(CancellationToken cancellationToken)
{
throw new NotImplementedException();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
// ------------------------------------------------------------
// Copyright (c) Microsoft Corporation. All rights reserved.
// ------------------------------------------------------------

namespace Microsoft.Azure.Cosmos
{
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos.Tracing;
using Microsoft.Azure.Documents;

internal class DistributedTransactionCommitterUtils
{
public static async Task ResolveCollectionRidsAsync(
IReadOnlyList<DistributedTransactionOperation> operations,
CosmosClientContext clientContext,
CancellationToken cancellationToken)
{
IEnumerable<IGrouping<string, DistributedTransactionOperation>> groupedOperations = operations
.GroupBy(op => $"/dbs/{op.Database}/colls/{op.Container}");

foreach (IGrouping<string, DistributedTransactionOperation> group in groupedOperations)
{
cancellationToken.ThrowIfCancellationRequested();

string collectionPath = group.Key;
ContainerProperties containerProperties = await clientContext.GetCachedContainerPropertiesAsync(
collectionPath,
NoOpTrace.Singleton,
cancellationToken);

string containerResourceId = containerProperties.ResourceId;
ResourceId resourceId = ResourceId.Parse(containerResourceId);
string databaseResourceId = resourceId.DatabaseId.ToString();

foreach (DistributedTransactionOperation operation in group)
{
operation.CollectionResourceId = containerResourceId;
operation.DatabaseResourceId = databaseResourceId;
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,19 @@ namespace Microsoft.Azure.Cosmos
{
using System;
using System.IO;

//using Microsoft.Azure.Documents;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.Documents;

/// <summary>
/// Represents an operation on a document whichwill be executed as a part of a distributed transaction.
/// Represents an operation on a document which will be executed as a part of a distributed transaction.
/// </summary>
internal class DistributedTransactionOperation
{
protected Memory<byte> body;

public DistributedTransactionOperation(
Documents.OperationType operationType,
OperationType operationType,
int operationIndex,
string database,
string container,
Expand All @@ -36,11 +39,37 @@ public DistributedTransactionOperation(

public string Container { get; internal set; }

public Documents.OperationType OperationType { get; internal set; }
public OperationType OperationType { get; internal set; }

public int OperationIndex { get; internal set; }
public int OperationIndex { get; internal set; }

public string Id { get; internal set; }

public string CollectionResourceId { get; internal set; }

public string DatabaseResourceId { get; internal set; }

internal string PartitionKeyJson { get; set; }

internal string SessionToken { get; set; }

internal string ETag { get; set; }

internal Stream ResourceStream { get; set; }

internal Memory<byte> ResourceBody
{
get => this.body;
set => this.body = value;
}

internal virtual async Task MaterializeResourceAsync(CosmosSerializerCore serializerCore, CancellationToken cancellationToken)
{
if (this.body.IsEmpty && this.ResourceStream != null)
{
this.body = await BatchExecUtils.StreamToMemoryAsync(this.ResourceStream, cancellationToken);
}
}
}

internal class DistributedTransactionOperation<T> : DistributedTransactionOperation
Expand Down Expand Up @@ -69,6 +98,18 @@ public DistributedTransactionOperation(
{
this.Resource = resource;
}

public T Resource { get; internal set; }

internal override Task MaterializeResourceAsync(CosmosSerializerCore serializerCore, CancellationToken cancellationToken)
{
if (this.body.IsEmpty && this.Resource != null)
{
this.ResourceStream = serializerCore.ToStream(this.Resource);
return base.MaterializeResourceAsync(serializerCore, cancellationToken);
}

return Task.CompletedTask;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ namespace Microsoft.Azure.Cosmos
using System;
using System.IO;
using System.Net;
using System.Text.Json;
using System.Text.Json.Serialization;
using Microsoft.Azure.Cosmos.Tracing;
using Microsoft.Azure.Documents;

Expand All @@ -27,51 +29,112 @@ internal DistributedTransactionOperationResult(HttpStatusCode statusCode)

internal DistributedTransactionOperationResult(DistributedTransactionOperationResult other)
{
this.Index = other.Index;
this.StatusCode = other.StatusCode;
this.SubStatusCode = other.SubStatusCode;
this.ETag = other.ETag;
this.ResourceStream = other.ResourceStream;
this.SessionToken = other.SessionToken;
this.RequestCharge = other.RequestCharge;
this.ActivityId = other.ActivityId;
this.Trace = other.Trace;
}

/// <summary>
/// Initializes a new instance of the <see cref="DistributedTransactionOperationResult"/> class.
/// This protected constructor is intended for use by derived classes.
/// </summary>
[JsonConstructor]
protected DistributedTransactionOperationResult()
{
}

/// <summary>
/// Gets the index of this operation within the distributed transaction.
/// </summary>
[JsonInclude]
[JsonPropertyName("index")]
public virtual int Index { get; internal set; }

/// <summary>
/// Gets the HTTP status code returned by the operation.
/// </summary>
[JsonInclude]
[JsonPropertyName("statuscode")]
public virtual HttpStatusCode StatusCode { get; internal set; }

/// <summary>
/// Gets a value indicating whether the HTTP status code returned by the operation indicates success.
/// </summary>
public virtual bool IsSuccessStatusCode => ((int)this.StatusCode >= 200) && ((int)this.StatusCode <= 299);
[JsonIgnore]
public virtual bool IsSuccessStatusCode => (int)this.StatusCode >= 200 && (int)this.StatusCode <= 299;

/// <summary>
/// Gets the entity tag (ETag) associated with the operation result.
/// The ETag is used for concurrency control and represents the version of the resource.
/// </summary>
[JsonInclude]
[JsonPropertyName("etag")]
public virtual string ETag { get; internal set; }

/// <summary>
/// Gets the session token associated with the operation result.
/// </summary>
[JsonInclude]
[JsonPropertyName("sessionToken")]
public virtual string SessionToken { get; internal set; }

/// <summary>
/// Gets the resource stream associated with the operation result.
/// The stream contains the raw response payload returned by the operation.
/// </summary>
[JsonIgnore]
public virtual Stream ResourceStream { get; internal set; }

/// <summary>
/// Used for JSON deserialization of the base64-encoded resource body.
/// </summary>
[JsonInclude]
[JsonPropertyName("resourcebody")]
internal string ResourceBodyBase64
{
get => null; // Write-only for deserialization
set
{
if (!string.IsNullOrEmpty(value))
{
byte[] resourceBody = Convert.FromBase64String(value);
this.ResourceStream = new MemoryStream(resourceBody, 0, resourceBody.Length, writable: false, publiclyVisible: true);
}
}
}

/// <summary>
/// Request charge in request units for the operation.
/// </summary>
[JsonPropertyName("requestCharge")]
internal virtual double RequestCharge { get; set; }

[JsonPropertyName("substatuscode")]
internal virtual SubStatusCodes SubStatusCode { get; set; }

/// <summary>
/// ActivityId related to the operation.
/// </summary>
[JsonIgnore]
internal virtual string ActivityId { get; set; }

[JsonIgnore]
internal ITrace Trace { get; set; }

/// <summary>
/// Creates a <see cref="DistributedTransactionOperationResult"/> from a JSON element.
/// </summary>
/// <param name="json">The JSON element containing the operation result.</param>
/// <returns>The deserialized operation result.</returns>
internal static DistributedTransactionOperationResult FromJson(JsonElement json)
{
return JsonSerializer.Deserialize<DistributedTransactionOperationResult>(json);
}
}
}
Loading
Loading