diff --git a/Microsoft.Azure.Cosmos/src/DistributedTransaction/DistributedTransactionCommitter.cs b/Microsoft.Azure.Cosmos/src/DistributedTransaction/DistributedTransactionCommitter.cs index 5ffcdea0c8..9dd95c7e84 100644 --- a/Microsoft.Azure.Cosmos/src/DistributedTransaction/DistributedTransactionCommitter.cs +++ b/Microsoft.Azure.Cosmos/src/DistributedTransaction/DistributedTransactionCommitter.cs @@ -181,10 +181,8 @@ internal static void MergeSessionTokens( // so that subsequent Session-consistency reads on the affected collections can use the latest token // without getting ReadSessionNotAvailable. // - // DTC spans multiple collections so the server embeds per-operation session - // tokens in the JSON body; those are already parsed into DistributedTransactionOperationResult.SessionToken, - // but we must explicitly push them into the SessionContainer. - + // DTC spans multiple collections so the server embeds per-operation session tokens in the JSON body. + // DistributedTransactionOperationResult.FromJson assembles each token into canonical {pkRangeId}:{lsn} form. if (response == null || response.Count == 0 || serverRequest == null || sessionContainer == null) { return; @@ -195,29 +193,40 @@ internal static void MergeSessionTokens( for (int i = 0; i < response.Count; i++) { DistributedTransactionOperationResult result = response[i]; - DistributedTransactionOperation operation = serverRequest.Operations[result.Index]; - if (string.IsNullOrEmpty(result.SessionToken) || string.IsNullOrEmpty(operation.CollectionResourceId)) + DistributedTransactionOperation operation = null; + try { - continue; - } + operation = serverRequest.Operations[result.Index]; + + if (string.IsNullOrEmpty(result.SessionToken) || string.IsNullOrEmpty(operation.CollectionResourceId)) + { + continue; + } - if (result.StatusCode == HttpStatusCode.NotFound - && result.SubStatusCode == SubStatusCodes.ReadSessionNotAvailable) + // SessionToken is already in canonical {pkRangeId}:{lsn} format, assembled by FromJson. + // Note: each SetSessionToken call acquires a write lock on the SessionContainer. + // For a future optimization, consider a batch-update API on ISessionContainer to + // reduce lock acquisitions when multiple operations target the same collection. + headers.Clear(); + headers[HttpConstants.HttpHeaders.SessionToken] = result.SessionToken; + + sessionContainer.SetSessionToken( + operation.CollectionResourceId, + DistributedTransactionConstants.GetCollectionFullName(operation.Database, operation.Container), + headers); + } + catch (Exception ex) when (ex is not OperationCanceledException) { - continue; + // Session-token bookkeeping must never fail a transaction the server already committed. + // Log and continue so the remaining operations' tokens are still attempted. + DefaultTrace.TraceWarning( + "DTC session token merge failed for operation index {0} (collection {1}): [{2}] {3}", + result.Index, + operation?.CollectionResourceId ?? "", + ex.GetType().Name, + ex.Message); } - - // Note: each SetSessionToken call acquires a write lock on the SessionContainer. - // For a future optimization, consider a batch-update API on ISessionContainer to - // reduce lock acquisitions when multiple operations target the same collection. - headers.Clear(); - headers[HttpConstants.HttpHeaders.SessionToken] = result.SessionToken; - - sessionContainer.SetSessionToken( - operation.CollectionResourceId, - DistributedTransactionConstants.GetCollectionFullName(operation.Database, operation.Container), - headers); } } } diff --git a/Microsoft.Azure.Cosmos/src/DistributedTransaction/DistributedTransactionOperationResult.cs b/Microsoft.Azure.Cosmos/src/DistributedTransaction/DistributedTransactionOperationResult.cs index 257c29b93a..0f1efa364f 100644 --- a/Microsoft.Azure.Cosmos/src/DistributedTransaction/DistributedTransactionOperationResult.cs +++ b/Microsoft.Azure.Cosmos/src/DistributedTransaction/DistributedTransactionOperationResult.cs @@ -9,6 +9,7 @@ namespace Microsoft.Azure.Cosmos using System.Net; using System.Text.Json; using System.Text.Json.Serialization; + using Microsoft.Azure.Cosmos.Core.Trace; using Microsoft.Azure.Cosmos.Tracing; using Microsoft.Azure.Documents; @@ -35,6 +36,7 @@ internal DistributedTransactionOperationResult(DistributedTransactionOperationRe this.ETag = other.ETag; this.ResourceStream = other.ResourceStream; this.SessionToken = other.SessionToken; + this.PartitionKeyRangeId = other.PartitionKeyRangeId; this.RequestCharge = other.RequestCharge; this.ActivityId = other.ActivityId; this.Trace = other.Trace; @@ -89,6 +91,13 @@ public DistributedTransactionOperationResult() [JsonPropertyName("sessionToken")] public virtual string SessionToken { get; internal set; } + /// + /// Gets the raw partition key range ID emitted by the server. + /// + [JsonInclude] + [JsonPropertyName("partitionKeyRangeId")] + public virtual string PartitionKeyRangeId { get; internal set; } + /// /// Gets the resource stream associated with the operation result. /// The stream contains the raw response payload returned by the operation. @@ -135,10 +144,11 @@ public virtual uint SubStatusCodeValue /// Creates a from a JSON element. /// /// The JSON element containing the operation result. - /// The deserialized operation result. + /// The deserialized operation result with a canonical session token. internal static DistributedTransactionOperationResult FromJson(JsonElement json) { - DistributedTransactionOperationResult result = JsonSerializer.Deserialize(json, DistributedTransactionOperationResult.CaseInsensitiveOptions); + DistributedTransactionOperationResult result = JsonSerializer.Deserialize(json, DistributedTransactionOperationResult.CaseInsensitiveOptions) + ?? throw new JsonException($"Failed to deserialize DTC operation result: Deserialize returned null. JSON element kind: '{json.ValueKind}'."); if (json.TryGetProperty(DistributedTransactionSerializer.ResourceBody, out JsonElement resourceBody) && resourceBody.ValueKind != JsonValueKind.Undefined @@ -154,6 +164,32 @@ internal static DistributedTransactionOperationResult FromJson(JsonElement json) result.ResourceStream = new MemoryStream(bytes, 0, bytes.Length, writable: false, publiclyVisible: true); } + if (!string.IsNullOrWhiteSpace(result.SessionToken)) + { + int colonIndex = result.SessionToken.IndexOf(':'); + if (colonIndex > 0 && colonIndex < result.SessionToken.Length - 1) + { + // Already in canonical {pkRangeId}:{lsn} form — leave as-is. + } + else if (!string.IsNullOrWhiteSpace(result.PartitionKeyRangeId)) + { + result.SessionToken = result.PartitionKeyRangeId + ":" + result.SessionToken; + } + else + { + DefaultTrace.TraceWarning( + "DTC operation index {0} returned session token without a valid partitionKeyRangeId (value: '{1}'); session token will not be merged into the session container.", + result.Index, + result.PartitionKeyRangeId ?? ""); + result.SessionToken = null; + } + } + else if (result.SessionToken != null) + { + // Normalize whitespace-only to null so downstream guards don't need to recheck. + result.SessionToken = null; + } + return result; } } diff --git a/Microsoft.Azure.Cosmos/src/DistributedTransaction/DistributedTransactionSerializer.cs b/Microsoft.Azure.Cosmos/src/DistributedTransaction/DistributedTransactionSerializer.cs index d421c1fb1d..c35f49d64c 100644 --- a/Microsoft.Azure.Cosmos/src/DistributedTransaction/DistributedTransactionSerializer.cs +++ b/Microsoft.Azure.Cosmos/src/DistributedTransaction/DistributedTransactionSerializer.cs @@ -26,6 +26,7 @@ internal static class DistributedTransactionSerializer internal const string Index = "index"; internal const string ResourceBody = "resourceBody"; internal const string SessionToken = "sessionToken"; + internal const string PartitionKeyRangeId = "partitionKeyRangeId"; internal const string ETag = "ifMatch"; internal const string OperationType = "operationType"; internal const string ResourceType = "resourceType"; diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/DistributedTransaction/DistributedTransactionTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/DistributedTransaction/DistributedTransactionTests.cs index ae6ea56d56..68a5e6e87d 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/DistributedTransaction/DistributedTransactionTests.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/DistributedTransaction/DistributedTransactionTests.cs @@ -732,16 +732,26 @@ public async Task UpsertItemStream_ValidDocument_SerializedAsUpsertOperation() // Session token handling [TestMethod] - [Description("Session tokens returned in DTC operation responses are merged into the client's session container, preventing ReadSessionNotAvailable errors on subsequent reads.")] + [Description("When DTC response carries a session token in the new wire format (LSN-only sessionToken + " + + "separate partitionKeyRangeId), the SDK assembles the canonical {pkRangeId}:{lsn} token and merges it " + + "into the session container so that subsequent Session-consistency reads succeed.")] public async Task ValidateSessionTokenMergedIntoDtcClient() { ToDoActivity seedDoc = ToDoActivity.CreateRandomToDoActivity(); ItemResponse seedResponse = await this.container.CreateItemAsync(seedDoc, new PartitionKey(seedDoc.pk), cancellationToken: this.cancellationToken); - string validSessionToken = seedResponse.Headers.Session; - Assert.IsFalse(string.IsNullOrEmpty(validSessionToken), "A valid session token must be obtained from the emulator for this test to be meaningful."); + string canonicalToken = seedResponse.Headers.Session; + Assert.IsFalse(string.IsNullOrEmpty(canonicalToken), "A valid session token must be obtained from the emulator for this test to be meaningful."); - string dtcMockResponse = $@"{{""operationResponses"":[{{""index"":0,""statusCode"":201,""sessionToken"":""{validSessionToken}""}}]}}"; + // Split the canonical {pkRangeId}:{lsn} token into the two fields the DTC endpoint sends. + int colonIndex = canonicalToken.IndexOf(':'); + Assert.IsTrue(colonIndex > 0, $"Emulator session token '{canonicalToken}' must be in {{pkRangeId}}:{{lsn}} format."); + string pkRangeId = canonicalToken.Substring(0, colonIndex); + string lsnOnly = canonicalToken.Substring(colonIndex + 1); + + // Build a DTC mock response using the new wire contract: LSN-only in sessionToken, + // pkRangeId in a separate partitionKeyRangeId field. + string dtcMockResponse = $@"{{""operationResponses"":[{{""index"":0,""statusCode"":201,""sessionToken"":""{lsnOnly}"",""partitionKeyRangeId"":""{pkRangeId}""}}]}}"; DistributedTransactionMockHandler handler = new DistributedTransactionMockHandler( request => Task.FromResult(this.BuildMockResponse(HttpStatusCode.OK, dtcMockResponse))); @@ -754,13 +764,17 @@ public async Task ValidateSessionTokenMergedIntoDtcClient() ConsistencyLevel = Cosmos.ConsistencyLevel.Session, }); - ToDoActivity newDoc = ToDoActivity.CreateRandomToDoActivity(); + // Use the same partition key as seedDoc so the DTC operation targets the same physical + // partition whose session token is carried in the mock response. + ToDoActivity newDoc = ToDoActivity.CreateRandomToDoActivity(pk: seedDoc.pk); DistributedTransactionResponse dtcResponse = await dtcClient .CreateDistributedWriteTransaction() .CreateItem(this.database.Id, this.container.Id, new PartitionKey(newDoc.pk), newDoc.id, newDoc) .CommitTransactionAsync(this.cancellationToken); Assert.IsTrue(dtcResponse.IsSuccessStatusCode, "The simulated DTC commit should appear successful to the client."); + Assert.AreEqual(canonicalToken, dtcResponse[0].SessionToken, + "SessionToken must be assembled as {pkRangeId}:{lsn} from the two separate wire fields."); Container dtcContainer = dtcClient.GetContainer(this.database.Id, this.container.Id); try @@ -784,6 +798,53 @@ public async Task ValidateSessionTokenMergedIntoDtcClient() } } + [TestMethod] + [Description("When DTC response carries only an LSN-only sessionToken with no partitionKeyRangeId " + + "(current server behavior before coordinator update), the commit must succeed without throwing " + + "and the SDK silently skips merging the session token rather than crashing.")] + // TODO(issue#5857): Remove this test once the coordinator is updated to emit partitionKeyRangeId and the SDK no longer needs to handle its absence. + public async Task ValidateSessionTokenSkipped_WhenPartitionKeyRangeIdAbsent() + { + ToDoActivity seedDoc = ToDoActivity.CreateRandomToDoActivity(); + ItemResponse seedResponse = await this.container.CreateItemAsync(seedDoc, new PartitionKey(seedDoc.pk), cancellationToken: this.cancellationToken); + + string canonicalToken = seedResponse.Headers.Session; + Assert.IsFalse(string.IsNullOrEmpty(canonicalToken), "A valid session token must be obtained from the emulator."); + int colonIndex = canonicalToken.IndexOf(':'); + Assert.IsTrue(colonIndex > 0, $"Emulator session token '{canonicalToken}' must be in {{pkRangeId}}:{{lsn}} format."); + string lsnOnly = canonicalToken.Substring(colonIndex + 1); + + // Current server behavior: LSN-only token, no partitionKeyRangeId field. + string dtcMockResponse = $@"{{""operationResponses"":[{{""index"":0,""statusCode"":201,""sessionToken"":""{lsnOnly}""}}]}}"; + + DistributedTransactionMockHandler handler = new DistributedTransactionMockHandler( + request => Task.FromResult(this.BuildMockResponse(HttpStatusCode.OK, dtcMockResponse))); + + using CosmosClient dtcClient = TestCommon.CreateCosmosClient( + clientOptions: new CosmosClientOptions + { + CustomHandlers = { handler }, + ConnectionMode = ConnectionMode.Gateway, + ConsistencyLevel = Cosmos.ConsistencyLevel.Session, + }); + + // Use the same partition key as seedDoc for consistency. + ToDoActivity newDoc = ToDoActivity.CreateRandomToDoActivity(pk: seedDoc.pk); + DistributedTransactionResponse dtcResponse = await dtcClient + .CreateDistributedWriteTransaction() + .CreateItem(this.database.Id, this.container.Id, new PartitionKey(newDoc.pk), newDoc.id, newDoc) + .CommitTransactionAsync(this.cancellationToken); + + // Commit must succeed — this was the crash point before the fix (IndexOutOfRangeException + // in SessionContainer.SetSessionToken when it tried tokenParts[1] on an LSN-only token). + Assert.IsTrue(dtcResponse.IsSuccessStatusCode, "Commit must succeed even when partitionKeyRangeId is absent."); + + // Session token must be null — FromJson nulls it out when pkRangeId is absent so that + // MergeSessionTokens skips the operation rather than passing a bad token to SetSessionToken. + Assert.IsNull(dtcResponse[0].SessionToken, + "SessionToken must be null when partitionKeyRangeId is absent; the SDK silently skips merging."); + } + // Read Transaction Tests [TestMethod] diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/DistributedTransaction/DistributedTransactionCommitterTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/DistributedTransaction/DistributedTransactionCommitterTests.cs index 9f18336e9e..52bd77ad7d 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/DistributedTransaction/DistributedTransactionCommitterTests.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/DistributedTransaction/DistributedTransactionCommitterTests.cs @@ -6,6 +6,7 @@ namespace Microsoft.Azure.Cosmos.Tests.DistributedTransaction { using System; using System.Collections.Generic; + using System.Globalization; using System.IO; using System.Linq; using System.Net; @@ -13,6 +14,7 @@ namespace Microsoft.Azure.Cosmos.Tests.DistributedTransaction using System.Threading; using System.Threading.Tasks; using Microsoft.Azure.Cosmos.Common; + using Microsoft.Azure.Cosmos.Core.Trace; using Microsoft.Azure.Cosmos.Tests; using Microsoft.Azure.Cosmos.Tracing; using Microsoft.Azure.Documents; @@ -37,12 +39,14 @@ public class DistributedTransactionCommitterTests [Description("Verifies that when the DTC response carries a session token, the token is merged into the SessionContainer")] public async Task CommitTransactionAsync_MergesSessionTokensIntoSessionContainer() { - const string sessionToken = "0:1#9#4=8#5=7"; + const string lsnOnly = "1#9#4=8#5=7"; + const string pkRangeId = "0"; + const string expectedToken = "0:1#9#4=8#5=7"; SessionContainer sessionContainer = new SessionContainer("testhost"); string responseJson = BuildDtcResponseJson( - new[] { (statusCode: 201, sessionToken) }); + new[] { (statusCode: 201, subStatusCode: (int?)null, sessionToken: lsnOnly, partitionKeyRangeId: pkRangeId) }); Mock mockContext = this.CreateMockContext( sessionContainer, @@ -66,8 +70,8 @@ public async Task CommitTransactionAsync_MergesSessionTokensIntoSessionContainer await committer.CommitTransactionAsync(CancellationToken.None); string storedToken = sessionContainer.GetSessionToken(DistributedTransactionConstants.GetCollectionFullName(DatabaseName, ContainerName)); - Assert.AreEqual(sessionToken, storedToken, - "Session token should be merged into SessionContainer after a successful DTC commit."); + Assert.AreEqual(expectedToken, storedToken, + "Session token should be assembled as {pkRangeId}:{lsn} and merged into SessionContainer after a successful DTC commit."); } [TestMethod] @@ -108,7 +112,9 @@ public async Task CommitTransactionAsync_SkipsMerge_WhenSessionTokenIsNull() [Description("Verifies that the correct collectionRid and collectionFullname are passed to SetSessionToken for each operation")] public async Task CommitTransactionAsync_PassesCorrectCollectionToSetSessionToken() { - const string sessionToken = "0:1#5#4=3"; + const string lsnOnly = "1#5#4=3"; + const string pkRangeId = "0"; + const string assembledToken = "0:1#5#4=3"; const string container2 = "testcontainer2"; string collectionRid1 = ResourceId.NewDocumentCollectionId(42, 129).DocumentCollectionId.ToString(); @@ -146,8 +152,8 @@ public async Task CommitTransactionAsync_PassesCorrectCollectionToSetSessionToke Encoding.UTF8.GetBytes(BuildDtcResponseJson( new[] { - (statusCode: 200, sessionToken), - (statusCode: 200, sessionToken), + (statusCode: 200, subStatusCode: (int?)null, sessionToken: lsnOnly, partitionKeyRangeId: pkRangeId), + (statusCode: 200, subStatusCode: (int?)null, sessionToken: lsnOnly, partitionKeyRangeId: pkRangeId), }))) }; mockContext.Setup(c => c.ProcessResourceOperationStreamAsync( @@ -192,7 +198,7 @@ public async Task CommitTransactionAsync_PassesCorrectCollectionToSetSessionToke s => s.SetSessionToken( collectionRid1, DistributedTransactionConstants.GetCollectionFullName(DatabaseName, ContainerName), - It.Is(h => h[HttpConstants.HttpHeaders.SessionToken] == sessionToken)), + It.Is(h => h[HttpConstants.HttpHeaders.SessionToken] == assembledToken)), Times.Once, "SetSessionToken should be called for the first operation with its collection RID and fullname."); @@ -200,24 +206,65 @@ public async Task CommitTransactionAsync_PassesCorrectCollectionToSetSessionToke s => s.SetSessionToken( collectionRid2, DistributedTransactionConstants.GetCollectionFullName(DatabaseName, container2), - It.Is(h => h[HttpConstants.HttpHeaders.SessionToken] == sessionToken)), + It.Is(h => h[HttpConstants.HttpHeaders.SessionToken] == assembledToken)), Times.Once, "SetSessionToken should be called for the second operation with its collection RID and fullname."); } [TestMethod] - [Description("Verifies that 404/1002 (ReadSessionNotAvailable) operation results are excluded from session token merging")] - public async Task CommitTransactionAsync_SkipsMerge_When404ReadSessionNotAvailable() + [Description("Verifies that session tokens are still merged into the SessionContainer even when the DTC response indicates a failure")] + public async Task CommitTransactionAsync_MergesSessionTokens_OnFailureResponse() { - const string sessionToken = "0:1#9#4=8#5=7"; - const int readSessionNotAvailableSubStatus = 1002; + // Deliberately distinct from the success-path token so a copy-paste regression would be caught. + const string lsnOnly = "1#3#4=2#5=1"; + const string pkRangeId = "0"; + const string expectedToken = "0:1#3#4=2#5=1"; SessionContainer sessionContainer = new SessionContainer("testhost"); Mock mockContext = this.CreateMockContext( sessionContainer, - responseContent: BuildDtcResponseJson(new[] { (statusCode: 404, subStatusCode: (int?)readSessionNotAvailableSubStatus, sessionToken) }), - statusCode: HttpStatusCode.NotFound); + responseContent: BuildDtcResponseJson(new[] { (statusCode: 409, subStatusCode: (int?)null, sessionToken: lsnOnly, partitionKeyRangeId: pkRangeId) }), + statusCode: HttpStatusCode.Conflict); + + List operations = new List + { + new DistributedTransactionOperation( + OperationType.Create, + operationIndex: 0, + DatabaseName, + ContainerName, + new PartitionKey("pk1"), + id: "doc1") + }; + + DistributedTransactionCommitter committer = new DistributedTransactionCommitter( + operations, mockContext.Object); + + DistributedTransactionResponse response = await committer.CommitTransactionAsync(CancellationToken.None); + + string storedToken = sessionContainer.GetSessionToken(DistributedTransactionConstants.GetCollectionFullName(DatabaseName, ContainerName)); + Assert.AreEqual(expectedToken, storedToken, + "Session token should still be merged even when the DTC response indicates a failure."); + } + + [TestMethod] + [Description("When session token is LSN-only and partitionKeyRangeId is present, the token is assembled as {pkRangeId}:{lsn}")] + public async Task CommitTransactionAsync_AssemblesSessionToken_WhenPartitionKeyRangeIdIsPresent() + { + const string lsnOnly = "1#9#4=8#5=7"; + const string pkRangeId = "0"; + const string expectedToken = "0:1#9#4=8#5=7"; + + SessionContainer sessionContainer = new SessionContainer("testhost"); + + string responseJson = BuildDtcResponseJson( + new[] { (statusCode: 201, subStatusCode: (int?)null, sessionToken: lsnOnly, partitionKeyRangeId: pkRangeId) }); + + Mock mockContext = this.CreateMockContext( + sessionContainer, + responseContent: responseJson, + statusCode: HttpStatusCode.OK); List operations = new List { @@ -236,23 +283,26 @@ public async Task CommitTransactionAsync_SkipsMerge_When404ReadSessionNotAvailab await committer.CommitTransactionAsync(CancellationToken.None); string storedToken = sessionContainer.GetSessionToken(DistributedTransactionConstants.GetCollectionFullName(DatabaseName, ContainerName)); - Assert.IsTrue(string.IsNullOrEmpty(storedToken), - "Session token should NOT be merged for 404/ReadSessionNotAvailable operation results."); + Assert.AreEqual(expectedToken, storedToken, + "Session token should be assembled as {pkRangeId}:{lsn} when partitionKeyRangeId is present."); } [TestMethod] - [Description("Verifies that session tokens are still merged into the SessionContainer even when the DTC response indicates a failure")] - public async Task CommitTransactionAsync_MergesSessionTokens_OnFailureResponse() + [Description("When partitionKeyRangeId is absent, merge is silently skipped")] + public async Task CommitTransactionAsync_SkipsMerge_WhenLsnOnlyAndPartitionKeyRangeIdIsAbsent() { - // Deliberately distinct from the success-path token so a copy-paste regression would be caught. - const string sessionToken = "0:1#3#4=2#5=1"; + const string lsnOnly = "1#9#4=8#5=7"; SessionContainer sessionContainer = new SessionContainer("testhost"); + // No partitionKeyRangeId; session token is LSN-only (as always returned by the endpoint) + string responseJson = BuildDtcResponseJson( + new[] { (statusCode: 201, subStatusCode: (int?)null, sessionToken: lsnOnly, partitionKeyRangeId: (string)null) }); + Mock mockContext = this.CreateMockContext( sessionContainer, - responseContent: BuildDtcResponseJson(new[] { (statusCode: 409, sessionToken) }), - statusCode: HttpStatusCode.Conflict); + responseContent: responseJson, + statusCode: HttpStatusCode.OK); List operations = new List { @@ -268,15 +318,325 @@ public async Task CommitTransactionAsync_MergesSessionTokens_OnFailureResponse() DistributedTransactionCommitter committer = new DistributedTransactionCommitter( operations, mockContext.Object); - DistributedTransactionResponse response = await committer.CommitTransactionAsync(CancellationToken.None); + await committer.CommitTransactionAsync(CancellationToken.None); string storedToken = sessionContainer.GetSessionToken(DistributedTransactionConstants.GetCollectionFullName(DatabaseName, ContainerName)); - Assert.AreEqual(sessionToken, storedToken, - "Session token should still be merged even when the DTC response indicates a failure."); + Assert.IsTrue(string.IsNullOrEmpty(storedToken), + "SessionContainer should not be updated when partitionKeyRangeId is absent."); + } + + + [DataTestMethod] + [DataRow("", DisplayName = "Empty string partitionKeyRangeId")] + [DataRow(" ", DisplayName = "Whitespace-only partitionKeyRangeId")] + [DataRow(" ", DisplayName = "Multiple whitespace partitionKeyRangeId")] + [Description("When partitionKeyRangeId is present but empty or whitespace, merge is silently skipped. " + + "The server has no validation on this field; throwing would risk failing a committed transaction.")] + public async Task CommitTransactionAsync_SkipsMerge_WhenPartitionKeyRangeIdIsEmptyOrWhitespace(string pkRangeId) + { + const string lsnOnly = "1#9#4=8#5=7"; + + SessionContainer sessionContainer = new SessionContainer("testhost"); + + string responseJson = BuildDtcResponseJson( + new[] { (statusCode: 201, subStatusCode: (int?)null, sessionToken: lsnOnly, partitionKeyRangeId: pkRangeId) }); + + Mock mockContext = this.CreateMockContext( + sessionContainer, + responseContent: responseJson, + statusCode: HttpStatusCode.OK); + + List operations = new List + { + new DistributedTransactionOperation( + OperationType.Create, + operationIndex: 0, + DatabaseName, + ContainerName, + new PartitionKey("pk1"), + id: "doc1") + }; + + DistributedTransactionCommitter committer = new DistributedTransactionCommitter( + operations, mockContext.Object); + + await committer.CommitTransactionAsync(CancellationToken.None); + + string storedToken = sessionContainer.GetSessionToken(DistributedTransactionConstants.GetCollectionFullName(DatabaseName, ContainerName)); + Assert.IsTrue(string.IsNullOrEmpty(storedToken), + $"SessionContainer should not be updated when partitionKeyRangeId is '{pkRangeId}' (empty/whitespace)."); } // ─── Retry / Spec-Compliance Tests ───────────────────────────────────── + [TestMethod] + [Description("m8: In a multi-operation response, an op whose pkRangeId is absent is skipped while " + + "subsequent ops with pkRangeId still have their session tokens merged correctly.")] + public async Task CommitTransactionAsync_MultiOp_SkipsOpWithMissingPkRangeId_MergesRemainingOps() + { + const string lsnOnly = "1#9#4=8#5=7"; + const string pkRangeId = "0"; + const string assembledToken = "0:1#9#4=8#5=7"; + const string container2 = "testcontainer2"; + + string collectionRid1 = ResourceId.NewDocumentCollectionId(42, 129).DocumentCollectionId.ToString(); + string collectionRid2 = ResourceId.NewDocumentCollectionId(42, 200).DocumentCollectionId.ToString(); + + Mock mockSessionContainer = new Mock(); + + MockDocumentClient documentClient = new MockDocumentClient + { + sessionContainer = mockSessionContainer.Object + }; + + ContainerProperties containerProperties1 = ContainerProperties.CreateWithResourceId(collectionRid1); + containerProperties1.PartitionKeyPath = "/pk"; + ContainerProperties containerProperties2 = ContainerProperties.CreateWithResourceId(collectionRid2); + containerProperties2.PartitionKeyPath = "/pk"; + + Mock mockContext = new Mock(); + mockContext.Setup(c => c.DocumentClient).Returns(documentClient); + mockContext.Setup(c => c.SerializerCore).Returns(MockCosmosUtil.Serializer); + mockContext.Setup(c => c.GetCachedContainerPropertiesAsync( + DistributedTransactionConstants.GetCollectionFullName(DatabaseName, ContainerName), + It.IsAny(), It.IsAny())) + .ReturnsAsync(containerProperties1); + mockContext.Setup(c => c.GetCachedContainerPropertiesAsync( + DistributedTransactionConstants.GetCollectionFullName(DatabaseName, container2), + It.IsAny(), It.IsAny())) + .ReturnsAsync(containerProperties2); + + // op 0: missing pkRangeId — should be skipped (SessionToken nulled in FromJson) + // op 1: has pkRangeId — should be merged + string responseJson = BuildDtcResponseJson(new[] + { + (statusCode: 201, subStatusCode: (int?)null, sessionToken: lsnOnly, partitionKeyRangeId: (string)null), + (statusCode: 201, subStatusCode: (int?)null, sessionToken: lsnOnly, partitionKeyRangeId: pkRangeId), + }); + + ResponseMessage responseMessage = new ResponseMessage(HttpStatusCode.OK) + { + Content = new MemoryStream(Encoding.UTF8.GetBytes(responseJson)) + }; + + mockContext.Setup(c => c.ProcessResourceOperationStreamAsync( + It.IsAny(), + ResourceType.DistributedTransactionBatch, + OperationType.CommitDistributedTransaction, + It.IsAny(), + It.IsAny(), + It.IsAny(), + It.IsAny(), + It.IsAny(), + It.IsAny>(), + It.IsAny(), + It.IsAny())) + .ReturnsAsync(responseMessage); + + List operations = new List + { + new DistributedTransactionOperation( + OperationType.Create, operationIndex: 0, + DatabaseName, ContainerName, new PartitionKey("pk1"), id: "doc1"), + new DistributedTransactionOperation( + OperationType.Create, operationIndex: 1, + DatabaseName, container2, new PartitionKey("pk2"), id: "doc2"), + }; + + DistributedTransactionCommitter committer = new DistributedTransactionCommitter( + operations, mockContext.Object); + + await committer.CommitTransactionAsync(CancellationToken.None); + + // op 0 (missing pkRangeId) must NOT have been merged. + mockSessionContainer.Verify( + s => s.SetSessionToken( + collectionRid1, + DistributedTransactionConstants.GetCollectionFullName(DatabaseName, ContainerName), + It.IsAny()), + Times.Never, + "SetSessionToken must not be called for an operation whose pkRangeId is absent."); + + // op 1 (has pkRangeId) must have been merged with the assembled token. + mockSessionContainer.Verify( + s => s.SetSessionToken( + collectionRid2, + DistributedTransactionConstants.GetCollectionFullName(DatabaseName, container2), + It.Is(h => h[HttpConstants.HttpHeaders.SessionToken] == assembledToken)), + Times.Once, + "SetSessionToken must be called for the operation that has pkRangeId, with the assembled token."); + } + + [TestMethod] + [Description("m9: When an operation result has no partitionKeyRangeId, FromJson emits a TraceWarning " + + "so the skip is observable in diagnostic traces.")] + public async Task CommitTransactionAsync_EmitsTraceWarning_WhenPartitionKeyRangeIdIsAbsent() + { + const string lsnOnly = "1#9#4=8#5=7"; + + SessionContainer sessionContainer = new SessionContainer("testhost"); + + string responseJson = BuildDtcResponseJson( + new[] { (statusCode: 201, subStatusCode: (int?)null, sessionToken: lsnOnly, partitionKeyRangeId: (string)null) }); + + Mock mockContext = this.CreateMockContext( + sessionContainer, + responseContent: responseJson, + statusCode: HttpStatusCode.OK); + + List operations = new List + { + new DistributedTransactionOperation( + OperationType.Create, operationIndex: 0, + DatabaseName, ContainerName, new PartitionKey("pk1"), id: "doc1") + }; + + DistributedTransactionCommitter committer = new DistributedTransactionCommitter( + operations, mockContext.Object); + + List capturedWarnings = new List(); + System.Diagnostics.TraceListener listener = new DelegatingTraceListener( + (eventType, message) => + { + if (eventType == System.Diagnostics.TraceEventType.Warning) + { + capturedWarnings.Add(message); + } + }); + + System.Diagnostics.SourceLevels previousLevel = DefaultTrace.TraceSource.Switch.Level; + DefaultTrace.TraceSource.Switch.Level = System.Diagnostics.SourceLevels.All; + DefaultTrace.TraceSource.Listeners.Add(listener); + try + { + await committer.CommitTransactionAsync(CancellationToken.None); + } + finally + { + DefaultTrace.TraceSource.Listeners.Remove(listener); + DefaultTrace.TraceSource.Switch.Level = previousLevel; + } + + Assert.IsTrue( + capturedWarnings.Any(m => m.Contains("partitionKeyRangeId")), + "A TraceWarning mentioning 'partitionKeyRangeId' should be emitted when pkRangeId is absent."); + } + + + [TestMethod] + [Description("When SetSessionToken throws, the exception is swallowed and CommitTransactionAsync still returns the response rather than rethrowing")] + public async Task CommitTransactionAsync_SwallowsSetSessionTokenException() + { + const string lsnOnly = "1#9#4=8#5=7"; + const string pkRangeId = "0"; + + Mock mockSessionContainer = new Mock(); + mockSessionContainer + .Setup(s => s.SetSessionToken( + It.IsAny(), + It.IsAny(), + It.IsAny())) + .Throws(new InvalidOperationException("simulated SetSessionToken failure")); + + MockDocumentClient documentClient = new MockDocumentClient + { + sessionContainer = mockSessionContainer.Object + }; + + ContainerProperties containerProperties = ContainerProperties.CreateWithResourceId(CollectionResourceId); + containerProperties.Id = "TestContainerId"; + containerProperties.PartitionKeyPath = "/pk"; + + Mock mockContext = new Mock(); + mockContext.Setup(c => c.DocumentClient).Returns(documentClient); + mockContext.Setup(c => c.SerializerCore).Returns(MockCosmosUtil.Serializer); + mockContext.Setup(c => c.GetCachedContainerPropertiesAsync( + It.IsAny(), It.IsAny(), It.IsAny())) + .ReturnsAsync(containerProperties); + + string responseJson = BuildDtcResponseJson( + new[] { (statusCode: 201, subStatusCode: (int?)null, sessionToken: lsnOnly, partitionKeyRangeId: pkRangeId) }); + + ResponseMessage responseMessage = new ResponseMessage(HttpStatusCode.OK) + { + Content = new MemoryStream(Encoding.UTF8.GetBytes(responseJson)) + }; + + mockContext.Setup(c => c.ProcessResourceOperationStreamAsync( + It.IsAny(), + ResourceType.DistributedTransactionBatch, + OperationType.CommitDistributedTransaction, + It.IsAny(), + It.IsAny(), + It.IsAny(), + It.IsAny(), + It.IsAny(), + It.IsAny>(), + It.IsAny(), + It.IsAny())) + .ReturnsAsync(responseMessage); + + List operations = new List + { + new DistributedTransactionOperation( + OperationType.Create, + operationIndex: 0, + DatabaseName, + ContainerName, + new PartitionKey("pk1"), + id: "doc1") + }; + + DistributedTransactionCommitter committer = new DistributedTransactionCommitter( + operations, mockContext.Object); + + // Must not throw even though SetSessionToken throws internally. + DistributedTransactionResponse response = await committer.CommitTransactionAsync(CancellationToken.None); + Assert.IsNotNull(response, "CommitTransactionAsync should return a response even when SetSessionToken throws."); + Assert.AreEqual(HttpStatusCode.OK, response.StatusCode); + } + + [TestMethod] + [Description("When SetSessionToken throws OperationCanceledException, the exception must propagate — it must not be swallowed by the MergeSessionTokens catch block.")] + public async Task CommitTransactionAsync_PropagatesOperationCanceledException_FromSetSessionToken() + { + const string lsnOnly = "1#9#4=8#5=7"; + const string pkRangeId = "0"; + + Mock mockSessionContainer = new Mock(); + mockSessionContainer + .Setup(s => s.SetSessionToken( + It.IsAny(), + It.IsAny(), + It.IsAny())) + .Throws(new OperationCanceledException("simulated cancellation in SetSessionToken")); + + Mock mockContext = this.CreateMockContext( + mockSessionContainer.Object, + responseContent: BuildDtcResponseJson( + new[] { (statusCode: 201, subStatusCode: (int?)null, sessionToken: lsnOnly, partitionKeyRangeId: pkRangeId) }), + statusCode: HttpStatusCode.OK); + + List operations = new List + { + new DistributedTransactionOperation( + OperationType.Create, + operationIndex: 0, + DatabaseName, + ContainerName, + new PartitionKey("pk1"), + id: "doc1") + }; + + DistributedTransactionCommitter committer = new DistributedTransactionCommitter( + operations, mockContext.Object); + + await Assert.ThrowsExceptionAsync( + () => committer.CommitTransactionAsync(CancellationToken.None), + "OperationCanceledException from SetSessionToken must propagate, not be swallowed."); + } + + [TestMethod] [Description("Verifies that a commit succeeds without retrying when the server returns a success response on the first attempt.")] public async Task CommitTransaction_SucceedsOnFirstAttempt() @@ -768,11 +1128,18 @@ private static string BuildDtcResponseJson( (int statusCode, string sessionToken)[] operations) { return BuildDtcResponseJson( - operations.Select(o => (o.statusCode, subStatusCode: (int?)null, o.sessionToken)).ToArray()); + operations.Select(o => (o.statusCode, subStatusCode: (int?)null, o.sessionToken, partitionKeyRangeId: (string)null)).ToArray()); } private static string BuildDtcResponseJson( (int statusCode, int? subStatusCode, string sessionToken)[] operations) + { + return BuildDtcResponseJson( + operations.Select(o => (o.statusCode, o.subStatusCode, o.sessionToken, partitionKeyRangeId: (string)null)).ToArray()); + } + + private static string BuildDtcResponseJson( + (int statusCode, int? subStatusCode, string sessionToken, string partitionKeyRangeId)[] operations) { StringBuilder sb = new StringBuilder(); sb.Append(@"{""operationResponses"":["); @@ -794,6 +1161,11 @@ private static string BuildDtcResponseJson( sb.Append($@",""{DistributedTransactionSerializer.SessionToken}"":""{operations[i].sessionToken}"""); } + if (operations[i].partitionKeyRangeId != null) + { + sb.Append($@",""{DistributedTransactionSerializer.PartitionKeyRangeId}"":""{operations[i].partitionKeyRangeId}"""); + } + sb.Append('}'); } @@ -979,5 +1351,35 @@ private static ResponseMessage CreateEmptyResponseMessage(HttpStatusCode statusC message.Headers.SubStatusCodeLiteral = subStatusCode.ToString(); return message; } + + /// + /// A that forwards each event to a delegate, + /// used in tests to assert that specific trace messages are emitted. + /// + private sealed class DelegatingTraceListener : System.Diagnostics.TraceListener + { + private readonly Action onEvent; + + public DelegatingTraceListener(Action onEvent) + => this.onEvent = onEvent; + + public override void Write(string message) { } + + public override void WriteLine(string message) { } + + public override void TraceEvent( + System.Diagnostics.TraceEventCache eventCache, + string source, + System.Diagnostics.TraceEventType eventType, + int id, + string format, + params object[] args) + { + string message = args != null && args.Length > 0 + ? string.Format(System.Globalization.CultureInfo.InvariantCulture, format, args) + : format; + this.onEvent(eventType, message); + } + } } } diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/DistributedTransaction/DistributedTransactionResponseTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/DistributedTransaction/DistributedTransactionResponseTests.cs index fb93a14e1e..cf839d3f8f 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/DistributedTransaction/DistributedTransactionResponseTests.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/DistributedTransaction/DistributedTransactionResponseTests.cs @@ -557,13 +557,15 @@ public async Task FromResponseMessage_OperationResult_ETag_DeserializesCorrectly } [TestMethod] - [Description("SessionToken deserializes from the 'sessionToken' JSON property.")] + [Description("SessionToken is assembled as {pkRangeId}:{lsn} from the separate 'sessionToken' (LSN-only) and 'partitionKeyRangeId' JSON fields.")] public async Task FromResponseMessage_OperationResult_SessionToken_DeserializesCorrectly() { + const string lsnOnly = "12345"; + const string pkRangeId = "0"; const string expectedSessionToken = "0:12345"; DistributedTransactionServerRequest serverRequest = await BuildServerRequestAsync(operationCount: 1); - string json = $@"{{""operationResponses"":[{{""index"":0,""statusCode"":201,""sessionToken"":""{expectedSessionToken}""}}]}}"; + string json = $@"{{""operationResponses"":[{{""index"":0,""statusCode"":201,""sessionToken"":""{lsnOnly}"",""partitionKeyRangeId"":""{pkRangeId}""}}]}}"; ResponseMessage responseMessage = BuildResponseMessage(HttpStatusCode.OK, json); DistributedTransactionResponse response = await DistributedTransactionResponse.FromResponseMessageAsync( @@ -574,7 +576,144 @@ public async Task FromResponseMessage_OperationResult_SessionToken_DeserializesC CancellationToken.None); Assert.AreEqual(expectedSessionToken, response[0].SessionToken, - "SessionToken must equal the value from the JSON 'sessionToken' field."); + "SessionToken must be assembled as {pkRangeId}:{lsn} from the two separate JSON fields."); + } + + [TestMethod] + [Description("When partitionKeyRangeId is absent, FromJson sets SessionToken to null so MergeSessionTokens skips the operation.")] + // TODO(issue#5857): Remove once the coordinator starts emitting partitionKeyRangeId for all operations. + public async Task FromResponseMessage_OperationResult_SessionToken_NullWhenPartitionKeyRangeIdAbsent() + { + const string lsnOnly = "12345"; + DistributedTransactionServerRequest serverRequest = await BuildServerRequestAsync(operationCount: 1); + + // partitionKeyRangeId field is omitted — current server behavior + string json = $@"{{""operationResponses"":[{{""index"":0,""statusCode"":201,""sessionToken"":""{lsnOnly}""}}]}}"; + ResponseMessage responseMessage = BuildResponseMessage(HttpStatusCode.OK, json); + + DistributedTransactionResponse response = await DistributedTransactionResponse.FromResponseMessageAsync( + responseMessage, + serverRequest, + MockCosmosUtil.Serializer, + NoOpTrace.Singleton, + CancellationToken.None); + + Assert.IsNull(response[0].SessionToken, + "SessionToken must be null when partitionKeyRangeId is absent so the merge is skipped."); + } + + [DataTestMethod] + [DataRow("", DisplayName = "Empty string partitionKeyRangeId")] + [DataRow(" ", DisplayName = "Whitespace-only partitionKeyRangeId")] + [DataRow(" ", DisplayName = "Multiple spaces partitionKeyRangeId")] + [Description("When partitionKeyRangeId is present but empty or whitespace, FromJson sets SessionToken to null " + + "so MergeSessionTokens skips the operation. The server has no validation on this field and can " + + "send blank values; failing the commit would be worse than skipping the merge.")] + public async Task FromResponseMessage_OperationResult_SessionToken_NullWhenPartitionKeyRangeIdIsBlank(string pkRangeId) + { + const string lsnOnly = "12345"; + DistributedTransactionServerRequest serverRequest = await BuildServerRequestAsync(operationCount: 1); + + string json = $@"{{""operationResponses"":[{{""index"":0,""statusCode"":201,""sessionToken"":""{lsnOnly}"",""partitionKeyRangeId"":""{pkRangeId}""}}]}}"; + ResponseMessage responseMessage = BuildResponseMessage(HttpStatusCode.OK, json); + + DistributedTransactionResponse response = await DistributedTransactionResponse.FromResponseMessageAsync( + responseMessage, + serverRequest, + MockCosmosUtil.Serializer, + NoOpTrace.Singleton, + CancellationToken.None); + + Assert.IsNull(response[0].SessionToken, + $"SessionToken must be null when partitionKeyRangeId is '{pkRangeId}' (empty/whitespace) so the merge is safely skipped."); + } + + [DataTestMethod] + [DataRow(" ", DisplayName = "Single space sessionToken")] + [DataRow(" ", DisplayName = "Multiple spaces sessionToken")] + [Description("When sessionToken is whitespace-only, FromJson treats it the same as absent — SessionToken remains null.")] + public async Task FromResponseMessage_OperationResult_SessionToken_NullWhenSessionTokenIsWhitespace(string whitespaceToken) + { + DistributedTransactionServerRequest serverRequest = await BuildServerRequestAsync(operationCount: 1); + + string json = $@"{{""operationResponses"":[{{""index"":0,""statusCode"":201,""sessionToken"":""{whitespaceToken}"",""partitionKeyRangeId"":""0""}}]}}"; + ResponseMessage responseMessage = BuildResponseMessage(HttpStatusCode.OK, json); + + DistributedTransactionResponse response = await DistributedTransactionResponse.FromResponseMessageAsync( + responseMessage, + serverRequest, + MockCosmosUtil.Serializer, + NoOpTrace.Singleton, + CancellationToken.None); + + Assert.IsNull(response[0].SessionToken, + $"SessionToken must be null when the sessionToken value is whitespace ('{whitespaceToken}')."); + } + + [DataTestMethod] + [DataRow("0:-1#425344#1=12345", "0:-1#425344#1=12345", DisplayName = "Well-formed canonical token preserved")] + [DataRow("3:500", "3:500", DisplayName = "Simple pkRangeId:lsn preserved")] + [Description("When sessionToken is already in canonical {pkRangeId}:{lsn} form (colon at position > 0 with content on both sides), FromJson leaves it as-is even without partitionKeyRangeId.")] + public async Task FromResponseMessage_OperationResult_SessionToken_PreservedWhenAlreadyCanonical(string token, string expected) + { + DistributedTransactionServerRequest serverRequest = await BuildServerRequestAsync(operationCount: 1); + + // sessionToken is already canonical; partitionKeyRangeId is absent + string json = $@"{{""operationResponses"":[{{""index"":0,""statusCode"":201,""sessionToken"":""{token}""}}]}}"; + ResponseMessage responseMessage = BuildResponseMessage(HttpStatusCode.OK, json); + + DistributedTransactionResponse response = await DistributedTransactionResponse.FromResponseMessageAsync( + responseMessage, + serverRequest, + MockCosmosUtil.Serializer, + NoOpTrace.Singleton, + CancellationToken.None); + + Assert.AreEqual(expected, response[0].SessionToken, + $"A well-formed canonical token '{token}' must be left as-is."); + } + + [DataTestMethod] + [DataRow(":-1#425344", "3", DisplayName = "Leading colon (no pkRangeId) — not canonical, assembles with pkRangeId")] + [DataRow("3:", "5", DisplayName = "Trailing colon only (no LSN) — not canonical, assembles with pkRangeId")] + [Description("Session tokens with a colon at position 0 or at the last character are not valid canonical tokens — they get assembled with the provided partitionKeyRangeId.")] + public async Task FromResponseMessage_OperationResult_SessionToken_AssembledWhenColonIsAtEdge(string token, string pkRangeId) + { + DistributedTransactionServerRequest serverRequest = await BuildServerRequestAsync(operationCount: 1); + + string json = $@"{{""operationResponses"":[{{""index"":0,""statusCode"":201,""sessionToken"":""{token}"",""partitionKeyRangeId"":""{pkRangeId}""}}]}}"; + ResponseMessage responseMessage = BuildResponseMessage(HttpStatusCode.OK, json); + + DistributedTransactionResponse response = await DistributedTransactionResponse.FromResponseMessageAsync( + responseMessage, + serverRequest, + MockCosmosUtil.Serializer, + NoOpTrace.Singleton, + CancellationToken.None); + + Assert.AreEqual(pkRangeId + ":" + token, response[0].SessionToken, + $"A token with edge colon '{token}' must be assembled with pkRangeId '{pkRangeId}'."); + } + + [TestMethod] + [Description("When sessionToken is absent entirely, SessionToken remains null regardless of partitionKeyRangeId.")] + public async Task FromResponseMessage_OperationResult_SessionToken_NullWhenSessionTokenFieldAbsent() + { + DistributedTransactionServerRequest serverRequest = await BuildServerRequestAsync(operationCount: 1); + + // Neither sessionToken nor partitionKeyRangeId present + string json = @"{""operationResponses"":[{""index"":0,""statusCode"":201}]}"; + ResponseMessage responseMessage = BuildResponseMessage(HttpStatusCode.OK, json); + + DistributedTransactionResponse response = await DistributedTransactionResponse.FromResponseMessageAsync( + responseMessage, + serverRequest, + MockCosmosUtil.Serializer, + NoOpTrace.Singleton, + CancellationToken.None); + + Assert.IsNull(response[0].SessionToken, + "SessionToken must be null when the sessionToken JSON field is absent."); } // IsRetriable parsing