Skip to content

Commit

Permalink
Code changes to update STJ Serializer
Browse files Browse the repository at this point in the history
  • Loading branch information
kundadebdatta committed Sep 18, 2024
1 parent b7e8a06 commit 094faa0
Show file tree
Hide file tree
Showing 4 changed files with 109 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ public async Task<ResponseMessage> CreateItemStreamAsync(
requestOptions: requestOptions,
trace: trace,
cancellationToken: cancellationToken,
targetRequestSerializationFormat: ConfigurationManager.IsBinaryEncodingEnabled() ? JsonSerializationFormat.Binary : JsonSerializationFormat.Text,
targetRequestSerializationFormat: ContainerCore.GetTargetRequestSerializationFormat(),
targetResponseSerializationFormat: JsonSerializationFormat.Text);
}

Expand All @@ -81,7 +81,8 @@ public async Task<ItemResponse<T>> CreateItemAsync<T>(
itemId: null,
item: item,
operationType: OperationType.Create,
requestOptions: requestOptions,
requestOptions: requestOptions,
targetRequestSerializationFormat: ContainerCore.GetTargetRequestSerializationFormat(),
trace: trace,
cancellationToken: cancellationToken);

Expand All @@ -103,7 +104,7 @@ public async Task<ResponseMessage> ReadItemStreamAsync(
requestOptions: requestOptions,
trace: trace,
cancellationToken: cancellationToken,
targetRequestSerializationFormat: ConfigurationManager.IsBinaryEncodingEnabled() ? JsonSerializationFormat.Binary : JsonSerializationFormat.Text,
targetRequestSerializationFormat: ContainerCore.GetTargetRequestSerializationFormat(),
targetResponseSerializationFormat: JsonSerializationFormat.Text);
}

Expand All @@ -119,7 +120,8 @@ public async Task<ItemResponse<T>> ReadItemAsync<T>(
itemId: id,
streamPayload: null,
operationType: OperationType.Read,
requestOptions: requestOptions,
requestOptions: requestOptions,
targetRequestSerializationFormat: ContainerCore.GetTargetRequestSerializationFormat(),
trace: trace,
cancellationToken: cancellationToken);

Expand All @@ -141,7 +143,7 @@ public async Task<ResponseMessage> UpsertItemStreamAsync(
requestOptions: requestOptions,
trace: trace,
cancellationToken: cancellationToken,
targetRequestSerializationFormat: ConfigurationManager.IsBinaryEncodingEnabled() ? JsonSerializationFormat.Binary : JsonSerializationFormat.Text,
targetRequestSerializationFormat: ContainerCore.GetTargetRequestSerializationFormat(),
targetResponseSerializationFormat: JsonSerializationFormat.Text);
}

Expand All @@ -162,7 +164,8 @@ public async Task<ItemResponse<T>> UpsertItemAsync<T>(
itemId: null,
item: item,
operationType: OperationType.Upsert,
requestOptions: requestOptions,
requestOptions: requestOptions,
targetRequestSerializationFormat: ContainerCore.GetTargetRequestSerializationFormat(),
trace: trace,
cancellationToken: cancellationToken);

Expand All @@ -185,7 +188,7 @@ public async Task<ResponseMessage> ReplaceItemStreamAsync(
requestOptions: requestOptions,
trace: trace,
cancellationToken: cancellationToken,
targetRequestSerializationFormat: ConfigurationManager.IsBinaryEncodingEnabled() ? JsonSerializationFormat.Binary : JsonSerializationFormat.Text,
targetRequestSerializationFormat: ContainerCore.GetTargetRequestSerializationFormat(),
targetResponseSerializationFormat: JsonSerializationFormat.Text);
}

Expand All @@ -212,7 +215,8 @@ public async Task<ItemResponse<T>> ReplaceItemAsync<T>(
itemId: id,
item: item,
operationType: OperationType.Replace,
requestOptions: requestOptions,
requestOptions: requestOptions,
targetRequestSerializationFormat: ContainerCore.GetTargetRequestSerializationFormat(),
trace: trace,
cancellationToken: cancellationToken);

Expand All @@ -234,7 +238,7 @@ public async Task<ResponseMessage> DeleteItemStreamAsync(
requestOptions: requestOptions,
trace: trace,
cancellationToken: cancellationToken,
targetRequestSerializationFormat: ConfigurationManager.IsBinaryEncodingEnabled() ? JsonSerializationFormat.Binary : JsonSerializationFormat.Text,
targetRequestSerializationFormat: ContainerCore.GetTargetRequestSerializationFormat(),
targetResponseSerializationFormat: JsonSerializationFormat.Text);
}

Expand All @@ -250,7 +254,8 @@ public async Task<ItemResponse<T>> DeleteItemAsync<T>(
itemId: id,
streamPayload: null,
operationType: OperationType.Delete,
requestOptions: requestOptions,
requestOptions: requestOptions,
targetRequestSerializationFormat: ContainerCore.GetTargetRequestSerializationFormat(),
trace: trace,
cancellationToken: cancellationToken);

Expand Down Expand Up @@ -838,7 +843,8 @@ private async Task<ResponseMessage> ExtractPartitionKeyAndProcessItemStreamAsync
string itemId,
T item,
OperationType operationType,
ItemRequestOptions requestOptions,
ItemRequestOptions requestOptions,
JsonSerializationFormat? targetRequestSerializationFormat,
ITrace trace,
CancellationToken cancellationToken)
{
Expand All @@ -863,7 +869,8 @@ private async Task<ResponseMessage> ExtractPartitionKeyAndProcessItemStreamAsync
operationType,
requestOptions,
trace: trace,
cancellationToken: cancellationToken);
cancellationToken: cancellationToken,
targetRequestSerializationFormat: targetRequestSerializationFormat);
}

PartitionKeyMismatchRetryPolicy requestRetryPolicy = null;
Expand All @@ -876,7 +883,8 @@ private async Task<ResponseMessage> ExtractPartitionKeyAndProcessItemStreamAsync
itemId,
itemStream,
operationType,
requestOptions,
requestOptions,
targetRequestSerializationFormat: ContainerCore.GetTargetRequestSerializationFormat(),
trace: trace,
cancellationToken: cancellationToken);

Expand Down Expand Up @@ -908,7 +916,7 @@ private async Task<ResponseMessage> ProcessItemStreamAsync(
ItemRequestOptions requestOptions,
ITrace trace,
CancellationToken cancellationToken,
JsonSerializationFormat? targetRequestSerializationFormat = default,
JsonSerializationFormat? targetRequestSerializationFormat,
JsonSerializationFormat? targetResponseSerializationFormat = default)
{
if (trace == null)
Expand Down Expand Up @@ -1264,7 +1272,7 @@ public Task<ResponseMessage> PatchItemStreamAsync(
requestOptions: requestOptions,
trace: trace,
cancellationToken: cancellationToken,
targetRequestSerializationFormat: ConfigurationManager.IsBinaryEncodingEnabled() ? JsonSerializationFormat.Binary : JsonSerializationFormat.Text,
targetRequestSerializationFormat: ContainerCore.GetTargetRequestSerializationFormat(),
targetResponseSerializationFormat: JsonSerializationFormat.Text);
}

Expand Down Expand Up @@ -1300,6 +1308,13 @@ private ChangeFeedProcessorBuilder GetChangeFeedProcessorBuilderPrivate(
container: this,
changeFeedProcessor: changeFeedProcessor,
applyBuilderConfiguration: changeFeedProcessor.ApplyBuildConfiguration).WithChangeFeedMode(mode);
}

private static JsonSerializationFormat GetTargetRequestSerializationFormat()
{
return ConfigurationManager.IsBinaryEncodingEnabled()
? JsonSerializationFormat.Binary
: JsonSerializationFormat.Text;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ internal static Stream ConvertToStreamUsingJsonSerializationFormat(

byte[] formattedBytes = writer.GetResult().ToArray();

return new MemoryStream(formattedBytes, index: 0, count: formattedBytes.Length, writable: false, publiclyVisible: true);
return new MemoryStream(formattedBytes, index: 0, count: formattedBytes.Length, writable: true, publiclyVisible: true);
}

/// <summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ namespace Microsoft.Azure.Cosmos
using System.Reflection;
using System.Text.Json;
using System.Text.Json.Serialization;
using Microsoft.Azure.Cosmos.CosmosElements;
using Microsoft.Azure.Cosmos.Json;

/// <summary>
/// This class provides a default implementation of System.Text.Json Cosmos Linq Serializer.
Expand Down Expand Up @@ -49,8 +51,23 @@ public override T FromStream<T>(Stream stream)

using (stream)
{
using StreamReader reader = new (stream);
return JsonSerializer.Deserialize<T>(reader.ReadToEnd(), this.jsonSerializerOptions);
if (CosmosSerializationUtil.CheckFirstBufferByte(
stream,
JsonSerializationFormat.Binary,
out byte[] content))
{
if (CosmosObject.TryCreateFromBuffer(content, out CosmosObject cosmosObject))
{
return System.Text.Json.JsonSerializer.Deserialize<T>(cosmosObject.ToString(), this.jsonSerializerOptions);
}
else
{
using Stream textStream = CosmosSerializationUtil.ConvertToStreamUsingJsonSerializationFormat(content, JsonSerializationFormat.Text);
return this.DeserializeStream<T>(textStream);
}
}

return this.DeserializeStream<T>(stream);
}
}

Expand All @@ -60,7 +77,7 @@ public override Stream ToStream<T>(T input)
MemoryStream streamPayload = new ();
using Utf8JsonWriter writer = new (streamPayload);

JsonSerializer.Serialize(writer, input, this.jsonSerializerOptions);
System.Text.Json.JsonSerializer.Serialize(writer, input, this.jsonSerializerOptions);

streamPayload.Position = 0;
return streamPayload;
Expand Down Expand Up @@ -100,5 +117,18 @@ public override string SerializeMemberName(MemberInfo memberInfo)

return memberInfo.Name;
}

/// <summary>
/// Deserializes the stream into the specified type using STJ Serializer.
/// </summary>
/// <typeparam name="T">The desired type, the input stream to be deserialize into</typeparam>
/// <param name="stream">An instance of <see cref="Stream"/> containing th raw input stream.</param>
/// <returns>The deserialized output of type <typeparamref name="T"/>.</returns>
private T DeserializeStream<T>(
Stream stream)
{
using StreamReader reader = new (stream);
return System.Text.Json.JsonSerializer.Deserialize<T>(reader.ReadToEnd(), this.jsonSerializerOptions);
}
}
}
Loading

0 comments on commit 094faa0

Please sign in to comment.