Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,13 @@ namespace Microsoft.Azure.Cosmos
using System;
using System.Collections.Generic;
using System.IO;
using System.Net;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos.Core.Trace;
using Microsoft.Azure.Cosmos.Tracing;
using Microsoft.Azure.Documents;
using Microsoft.Azure.Documents.Collections;

internal class DistributedTransactionCommitter
{
Expand Down Expand Up @@ -75,13 +77,20 @@ private async Task<DistributedTransactionResponse> ExecuteCommitAsync(

cancellationToken.ThrowIfCancellationRequested();

return await DistributedTransactionResponse.FromResponseMessageAsync(
DistributedTransactionResponse response = await DistributedTransactionResponse.FromResponseMessageAsync(
responseMessage,
serverRequest,
this.clientContext.SerializerCore,
serverRequest.IdempotencyToken,
trace,
cancellationToken);

DistributedTransactionCommitter.MergeSessionTokens(
response,
serverRequest,
this.clientContext.DocumentClient.sessionContainer);

return response;
}
}
}
Expand All @@ -100,8 +109,59 @@ private static void EnrichRequestMessage(RequestMessage requestMessage, Distribu
requestMessage.UseGatewayMode = true;
}

internal static void MergeSessionTokens(
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Optimization: Follow-up limit session merge for only SESSION, MM

DistributedTransactionResponse response,
DistributedTransactionServerRequest serverRequest,
ISessionContainer sessionContainer)
{
// Mirror the pattern used by GatewayStoreModel.CaptureSessionTokenAndHandleSplitAsync.
// after a response is received, store each operation's session token in the SessionContainer
// so that subsequent Session-consistency reads on the affected collections can use the latest token
// without getting ReadSessionNotAvailable.
//
// DTC spans multiple collections so the server embeds per-operation session
// tokens in the JSON body; those are already parsed into DistributedTransactionOperationResult.SessionToken,
// but we must explicitly push them into the SessionContainer.

if (response == null || response.Count == 0 || serverRequest == null || sessionContainer == null)
{
return;
}

RequestNameValueCollection headers = new RequestNameValueCollection();

for (int i = 0; i < response.Count; i++)
{
DistributedTransactionOperationResult result = response[i];
Comment thread
Meghana-Palaparthi marked this conversation as resolved.
DistributedTransactionOperation operation = serverRequest.Operations[result.Index];

if (string.IsNullOrEmpty(result.SessionToken) || string.IsNullOrEmpty(operation.CollectionResourceId))
{
continue;
}

if (result.StatusCode == HttpStatusCode.NotFound
&& result.SubStatusCode == SubStatusCodes.ReadSessionNotAvailable)
{
continue;
}

// Note: each SetSessionToken call acquires a write lock on the SessionContainer.
// For a future optimization, consider a batch-update API on ISessionContainer to
// reduce lock acquisitions when multiple operations target the same collection.
headers.Clear();
headers[HttpConstants.HttpHeaders.SessionToken] = result.SessionToken;

sessionContainer.SetSessionToken(
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Optimization: Each operation inside does a locking, wondering if a new API for batch updates for a collection will help.

/cc: @FabianMeiswinkel

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you please create a follow-up GitHub issue to track it.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Created a GitHub issue: #5729

operation.CollectionResourceId,
DistributedTransactionConstants.GetCollectionFullName(operation.Database, operation.Container),
headers);
}
}

private Task AbortTransactionAsync(CancellationToken cancellationToken)
{
// TODO: Implement abort for the two-phase commit path.
throw new NotImplementedException();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ public static async Task ResolveCollectionRidsAsync(
CancellationToken cancellationToken)
{
IEnumerable<IGrouping<string, DistributedTransactionOperation>> groupedOperations = operations
.GroupBy(op => $"/dbs/{op.Database}/colls/{op.Container}");
.GroupBy(op => DistributedTransactionConstants.GetCollectionFullName(op.Database, op.Container));

foreach (IGrouping<string, DistributedTransactionOperation> group in groupedOperations)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,5 +13,10 @@ public static bool IsDistributedTransactionRequest(OperationType operationType,
return operationType == OperationType.CommitDistributedTransaction
&& resourceType == ResourceType.DistributedTransactionBatch;
}

internal static string GetCollectionFullName(string database, string container)
{
return $"dbs/{database}/colls/{container}";
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ namespace Microsoft.Azure.Cosmos
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos.Core.Trace;
using Microsoft.Azure.Cosmos.Tracing;
using Microsoft.Azure.Documents;

Expand Down Expand Up @@ -218,6 +219,10 @@ internal static async Task<DistributedTransactionResponse> FromResponseMessageAs
// Validate results count matches operations count
if (response.results == null || response.results.Count != serverRequest.Operations.Count)
{
DefaultTrace.TraceWarning(
$"DTC response: result count ({response.results?.Count ?? 0}) differs from " +
$"operation count ({serverRequest.Operations.Count}).");

if (responseMessage.IsSuccessStatusCode)
{
// Server should guarantee results count equals operations count on success
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -729,6 +729,61 @@ public async Task UpsertItemStream_ValidDocument_SerializedAsUpsertOperation()
response.Dispose();
}

// Session token handling

[TestMethod]
[Description("Session tokens returned in DTC operation responses are merged into the client's session container, preventing ReadSessionNotAvailable errors on subsequent reads.")]
public async Task ValidateSessionTokenMergedIntoDtcClient()
{
ToDoActivity seedDoc = ToDoActivity.CreateRandomToDoActivity();
ItemResponse<ToDoActivity> seedResponse = await this.container.CreateItemAsync(seedDoc, new PartitionKey(seedDoc.pk), cancellationToken: this.cancellationToken);

string validSessionToken = seedResponse.Headers.Session;
Assert.IsFalse(string.IsNullOrEmpty(validSessionToken), "A valid session token must be obtained from the emulator for this test to be meaningful.");

string dtcMockResponse = $@"{{""operationResponses"":[{{""index"":0,""statusCode"":201,""sessionToken"":""{validSessionToken}""}}]}}";

DistributedTransactionMockHandler handler = new DistributedTransactionMockHandler(
request => Task.FromResult(this.BuildMockResponse(HttpStatusCode.OK, dtcMockResponse)));

using CosmosClient dtcClient = TestCommon.CreateCosmosClient(
clientOptions: new CosmosClientOptions
{
CustomHandlers = { handler },
ConnectionMode = ConnectionMode.Gateway,
ConsistencyLevel = Cosmos.ConsistencyLevel.Session,
});

ToDoActivity newDoc = ToDoActivity.CreateRandomToDoActivity();
DistributedTransactionResponse dtcResponse = await dtcClient
.CreateDistributedWriteTransaction()
.CreateItem(this.database.Id, this.container.Id, new PartitionKey(newDoc.pk), newDoc)
.CommitTransactionAsync(this.cancellationToken);

Assert.IsTrue(dtcResponse.IsSuccessStatusCode, "The simulated DTC commit should appear successful to the client.");

Container dtcContainer = dtcClient.GetContainer(this.database.Id, this.container.Id);
try
{
ItemResponse<ToDoActivity> readResponse = await dtcContainer.ReadItemAsync<ToDoActivity>(
seedDoc.id,
new PartitionKey(seedDoc.pk),
new ItemRequestOptions { ConsistencyLevel = Cosmos.ConsistencyLevel.Session },
cancellationToken: this.cancellationToken);

Assert.AreEqual(HttpStatusCode.OK, readResponse.StatusCode, "A Session-consistency read after a DTC commit should return 200 OK.");
}
catch (CosmosException ex) when (ex.StatusCode == HttpStatusCode.NotFound)
{
Assert.AreNotEqual(
(int)SubStatusCodes.ReadSessionNotAvailable,
ex.SubStatusCode,
"A Session-consistency read after a DTC commit must not fail with " +
"ReadSessionNotAvailable (404/1002). This indicates that session token " +
"merging in DistributedTransactionCommitter is broken.");
}
}

// Helpers

private void ValidateValueKind(JsonElement operation, string property, JsonValueKind expectedValueKind, int operationIndex, bool isRequired)
Expand Down
Loading
Loading