Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -181,10 +181,8 @@ internal static void MergeSessionTokens(
// so that subsequent Session-consistency reads on the affected collections can use the latest token
// without getting ReadSessionNotAvailable.
//
// DTC spans multiple collections so the server embeds per-operation session
// tokens in the JSON body; those are already parsed into DistributedTransactionOperationResult.SessionToken,
// but we must explicitly push them into the SessionContainer.

// DTC spans multiple collections so the server embeds per-operation session tokens in the JSON body.
// DistributedTransactionOperationResult.FromJson assembles each token into canonical {pkRangeId}:{lsn} form.
if (response == null || response.Count == 0 || serverRequest == null || sessionContainer == null)
{
return;
Expand All @@ -195,29 +193,40 @@ internal static void MergeSessionTokens(
for (int i = 0; i < response.Count; i++)
{
DistributedTransactionOperationResult result = response[i];
DistributedTransactionOperation operation = serverRequest.Operations[result.Index];

if (string.IsNullOrEmpty(result.SessionToken) || string.IsNullOrEmpty(operation.CollectionResourceId))
DistributedTransactionOperation operation = null;
try
{
continue;
}
operation = serverRequest.Operations[result.Index];

if (string.IsNullOrEmpty(result.SessionToken) || string.IsNullOrEmpty(operation.CollectionResourceId))
{
continue;
}

if (result.StatusCode == HttpStatusCode.NotFound
&& result.SubStatusCode == SubStatusCodes.ReadSessionNotAvailable)
// SessionToken is already in canonical {pkRangeId}:{lsn} format, assembled by FromJson.
// Note: each SetSessionToken call acquires a write lock on the SessionContainer.
// For a future optimization, consider a batch-update API on ISessionContainer to
// reduce lock acquisitions when multiple operations target the same collection.
headers.Clear();
headers[HttpConstants.HttpHeaders.SessionToken] = result.SessionToken;

sessionContainer.SetSessionToken(
operation.CollectionResourceId,
DistributedTransactionConstants.GetCollectionFullName(operation.Database, operation.Container),
headers);
Comment thread
Meghana-Palaparthi marked this conversation as resolved.
}
catch (Exception ex) when (ex is not OperationCanceledException)
{
continue;
// Session-token bookkeeping must never fail a transaction the server already committed.
// Log and continue so the remaining operations' tokens are still attempted.
DefaultTrace.TraceWarning(
"DTC session token merge failed for operation index {0} (collection {1}): [{2}] {3}",
result.Index,
Comment thread
Meghana-Palaparthi marked this conversation as resolved.
operation?.CollectionResourceId ?? "<unknown>",
ex.GetType().Name,
ex.Message);
}

// Note: each SetSessionToken call acquires a write lock on the SessionContainer.
// For a future optimization, consider a batch-update API on ISessionContainer to
// reduce lock acquisitions when multiple operations target the same collection.
headers.Clear();
headers[HttpConstants.HttpHeaders.SessionToken] = result.SessionToken;

sessionContainer.SetSessionToken(
operation.CollectionResourceId,
DistributedTransactionConstants.GetCollectionFullName(operation.Database, operation.Container),
headers);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ namespace Microsoft.Azure.Cosmos
using System.Net;
using System.Text.Json;
using System.Text.Json.Serialization;
using Microsoft.Azure.Cosmos.Core.Trace;
using Microsoft.Azure.Cosmos.Tracing;
using Microsoft.Azure.Documents;

Expand All @@ -35,6 +36,7 @@ internal DistributedTransactionOperationResult(DistributedTransactionOperationRe
this.ETag = other.ETag;
this.ResourceStream = other.ResourceStream;
this.SessionToken = other.SessionToken;
this.PartitionKeyRangeId = other.PartitionKeyRangeId;
this.RequestCharge = other.RequestCharge;
this.ActivityId = other.ActivityId;
this.Trace = other.Trace;
Expand Down Expand Up @@ -89,6 +91,13 @@ public DistributedTransactionOperationResult()
[JsonPropertyName("sessionToken")]
public virtual string SessionToken { get; internal set; }

/// <summary>
/// Gets the raw partition key range ID emitted by the server for this operation result.
/// </summary>
/// <remarks>
/// Deserialized from the <c>partitionKeyRangeId</c> JSON property. <c>FromJson</c> prefixes this value
/// (followed by <c>:</c>) to a <see cref="SessionToken"/> that is not already in
/// <c>{pkRangeId}:{lsn}</c> form; when this value is null or whitespace, such a session token is
/// reset to <c>null</c> instead.
/// </remarks>
[JsonInclude]
[JsonPropertyName("partitionKeyRangeId")]
public virtual string PartitionKeyRangeId { get; internal set; }

/// <summary>
/// Gets the resource stream associated with the operation result.
/// The stream contains the raw response payload returned by the operation.
Expand Down Expand Up @@ -135,10 +144,11 @@ public virtual uint SubStatusCodeValue
/// Creates a <see cref="DistributedTransactionOperationResult"/> from a JSON element.
/// </summary>
/// <param name="json">The JSON element containing the operation result.</param>
/// <returns>The deserialized operation result.</returns>
/// <returns>The deserialized operation result with a canonical session token.</returns>
internal static DistributedTransactionOperationResult FromJson(JsonElement json)
{
DistributedTransactionOperationResult result = JsonSerializer.Deserialize<DistributedTransactionOperationResult>(json, DistributedTransactionOperationResult.CaseInsensitiveOptions);
DistributedTransactionOperationResult result = JsonSerializer.Deserialize<DistributedTransactionOperationResult>(json, DistributedTransactionOperationResult.CaseInsensitiveOptions)
?? throw new JsonException($"Failed to deserialize DTC operation result: Deserialize returned null. JSON element kind: '{json.ValueKind}'.");

if (json.TryGetProperty(DistributedTransactionSerializer.ResourceBody, out JsonElement resourceBody)
&& resourceBody.ValueKind != JsonValueKind.Undefined
Expand All @@ -154,6 +164,32 @@ internal static DistributedTransactionOperationResult FromJson(JsonElement json)
result.ResourceStream = new MemoryStream(bytes, 0, bytes.Length, writable: false, publiclyVisible: true);
}

if (!string.IsNullOrWhiteSpace(result.SessionToken))
{
int colonIndex = result.SessionToken.IndexOf(':');
if (colonIndex > 0 && colonIndex < result.SessionToken.Length - 1)
{
// Already in canonical {pkRangeId}:{lsn} form — leave as-is.
}
else if (!string.IsNullOrWhiteSpace(result.PartitionKeyRangeId))
{
result.SessionToken = result.PartitionKeyRangeId + ":" + result.SessionToken;
Comment thread
Meghana-Palaparthi marked this conversation as resolved.
}
else
{
DefaultTrace.TraceWarning(
"DTC operation index {0} returned session token without a valid partitionKeyRangeId (value: '{1}'); session token will not be merged into the session container.",
result.Index,
result.PartitionKeyRangeId ?? "<absent>");
result.SessionToken = null;
}
}
else if (result.SessionToken != null)
{
// Normalize whitespace-only to null so downstream guards don't need to recheck.
result.SessionToken = null;
}

return result;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ internal static class DistributedTransactionSerializer
internal const string Index = "index";
internal const string ResourceBody = "resourceBody";
internal const string SessionToken = "sessionToken";
internal const string PartitionKeyRangeId = "partitionKeyRangeId";
internal const string ETag = "ifMatch";
internal const string OperationType = "operationType";
internal const string ResourceType = "resourceType";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -732,16 +732,26 @@ public async Task UpsertItemStream_ValidDocument_SerializedAsUpsertOperation()
// Session token handling

[TestMethod]
[Description("Session tokens returned in DTC operation responses are merged into the client's session container, preventing ReadSessionNotAvailable errors on subsequent reads.")]
[Description("When DTC response carries a session token in the new wire format (LSN-only sessionToken + " +
"separate partitionKeyRangeId), the SDK assembles the canonical {pkRangeId}:{lsn} token and merges it " +
"into the session container so that subsequent Session-consistency reads succeed.")]
public async Task ValidateSessionTokenMergedIntoDtcClient()
{
ToDoActivity seedDoc = ToDoActivity.CreateRandomToDoActivity();
ItemResponse<ToDoActivity> seedResponse = await this.container.CreateItemAsync(seedDoc, new PartitionKey(seedDoc.pk), cancellationToken: this.cancellationToken);

string validSessionToken = seedResponse.Headers.Session;
Assert.IsFalse(string.IsNullOrEmpty(validSessionToken), "A valid session token must be obtained from the emulator for this test to be meaningful.");
string canonicalToken = seedResponse.Headers.Session;
Assert.IsFalse(string.IsNullOrEmpty(canonicalToken), "A valid session token must be obtained from the emulator for this test to be meaningful.");

string dtcMockResponse = $@"{{""operationResponses"":[{{""index"":0,""statusCode"":201,""sessionToken"":""{validSessionToken}""}}]}}";
// Split the canonical {pkRangeId}:{lsn} token into the two fields the DTC endpoint sends.
int colonIndex = canonicalToken.IndexOf(':');
Assert.IsTrue(colonIndex > 0, $"Emulator session token '{canonicalToken}' must be in {{pkRangeId}}:{{lsn}} format.");
string pkRangeId = canonicalToken.Substring(0, colonIndex);
string lsnOnly = canonicalToken.Substring(colonIndex + 1);

// Build a DTC mock response using the new wire contract: LSN-only in sessionToken,
// pkRangeId in a separate partitionKeyRangeId field.
string dtcMockResponse = $@"{{""operationResponses"":[{{""index"":0,""statusCode"":201,""sessionToken"":""{lsnOnly}"",""partitionKeyRangeId"":""{pkRangeId}""}}]}}";

DistributedTransactionMockHandler handler = new DistributedTransactionMockHandler(
request => Task.FromResult(this.BuildMockResponse(HttpStatusCode.OK, dtcMockResponse)));
Expand All @@ -754,13 +764,17 @@ public async Task ValidateSessionTokenMergedIntoDtcClient()
ConsistencyLevel = Cosmos.ConsistencyLevel.Session,
});

ToDoActivity newDoc = ToDoActivity.CreateRandomToDoActivity();
// Use the same partition key as seedDoc so the DTC operation targets the same physical
// partition whose session token is carried in the mock response.
ToDoActivity newDoc = ToDoActivity.CreateRandomToDoActivity(pk: seedDoc.pk);
DistributedTransactionResponse dtcResponse = await dtcClient
.CreateDistributedWriteTransaction()
.CreateItem(this.database.Id, this.container.Id, new PartitionKey(newDoc.pk), newDoc.id, newDoc)
.CommitTransactionAsync(this.cancellationToken);

Assert.IsTrue(dtcResponse.IsSuccessStatusCode, "The simulated DTC commit should appear successful to the client.");
Assert.AreEqual(canonicalToken, dtcResponse[0].SessionToken,
"SessionToken must be assembled as {pkRangeId}:{lsn} from the two separate wire fields.");

Container dtcContainer = dtcClient.GetContainer(this.database.Id, this.container.Id);
try
Expand All @@ -784,6 +798,53 @@ public async Task ValidateSessionTokenMergedIntoDtcClient()
}
}

[TestMethod]
[Description("When DTC response carries only an LSN-only sessionToken with no partitionKeyRangeId " +
    "(current server behavior before coordinator update), the commit must succeed without throwing " +
    "and the SDK silently skips merging the session token rather than crashing.")]
// TODO(issue#5857): Remove this test once the coordinator is updated to emit partitionKeyRangeId and the SDK no longer needs to handle its absence.
public async Task ValidateSessionTokenSkipped_WhenPartitionKeyRangeIdAbsent()
{
    // Arrange: write a real item so the emulator hands back a genuine session token to derive the LSN from.
    ToDoActivity seededItem = ToDoActivity.CreateRandomToDoActivity();
    PartitionKey seededPartitionKey = new PartitionKey(seededItem.pk);
    ItemResponse<ToDoActivity> seedWrite = await this.container.CreateItemAsync(
        seededItem,
        seededPartitionKey,
        cancellationToken: this.cancellationToken);

    string emulatorToken = seedWrite.Headers.Session;
    Assert.IsFalse(string.IsNullOrEmpty(emulatorToken), "A valid session token must be obtained from the emulator.");

    int separatorAt = emulatorToken.IndexOf(':');
    Assert.IsTrue(separatorAt > 0, $"Emulator session token '{emulatorToken}' must be in {{pkRangeId}}:{{lsn}} format.");

    // Keep only the part after the colon; the pkRangeId prefix is intentionally discarded.
    string lsnPart = emulatorToken.Substring(separatorAt + 1);

    // Mimic the current server payload: an LSN-only sessionToken and no partitionKeyRangeId field at all.
    string dtcMockResponse = $@"{{""operationResponses"":[{{""index"":0,""statusCode"":201,""sessionToken"":""{lsnPart}""}}]}}";

    DistributedTransactionMockHandler mockHandler = new DistributedTransactionMockHandler(
        _ => Task.FromResult(this.BuildMockResponse(HttpStatusCode.OK, dtcMockResponse)));

    CosmosClientOptions sessionGatewayOptions = new CosmosClientOptions
    {
        CustomHandlers = { mockHandler },
        ConnectionMode = ConnectionMode.Gateway,
        ConsistencyLevel = Cosmos.ConsistencyLevel.Session,
    };

    using CosmosClient dtcClient = TestCommon.CreateCosmosClient(clientOptions: sessionGatewayOptions);

    // The transactional write reuses the seeded item's partition key value for consistency with the seed.
    ToDoActivity transactionalItem = ToDoActivity.CreateRandomToDoActivity(pk: seededItem.pk);

    // Act: commit a single-operation distributed write transaction through the mocked pipeline.
    DistributedTransactionResponse commitResult = await dtcClient
        .CreateDistributedWriteTransaction()
        .CreateItem(
            this.database.Id,
            this.container.Id,
            new PartitionKey(transactionalItem.pk),
            transactionalItem.id,
            transactionalItem)
        .CommitTransactionAsync(this.cancellationToken);

    // Assert: the commit itself must report success. Before the fix this is where the client crashed
    // (IndexOutOfRangeException in SessionContainer.SetSessionToken reading tokenParts[1] of an LSN-only token).
    Assert.IsTrue(commitResult.IsSuccessStatusCode, "Commit must succeed even when partitionKeyRangeId is absent.");

    // Assert: with no pkRangeId available, FromJson clears the token, so MergeSessionTokens skips this
    // operation instead of handing a malformed token to SetSessionToken.
    Assert.IsNull(
        commitResult[0].SessionToken,
        "SessionToken must be null when partitionKeyRangeId is absent; the SDK silently skips merging.");
}

// Read Transaction Tests

[TestMethod]
Expand Down
Loading
Loading