Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Internal] Binary Encoding: Adds Binary Encoding Support for Point Operations #4652

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
2cdf16b
Code changes to support binary encoding for point operations.
kundadebdatta Aug 23, 2024
387005b
Merge branch 'master' into users/dkunda/4644_binary_encoding_for_poin…
kundadebdatta Sep 18, 2024
c0e5952
Code changes to introduce cosmos buffered stream wrapper.
kundadebdatta Sep 25, 2024
07eff74
Code changes to remove unnecessary using statement.
kundadebdatta Sep 25, 2024
265a7d3
Code changes to remove the pooling logic.
kundadebdatta Sep 25, 2024
bcdc75e
Code changes to port fixes into newtonsoft reader and writer to addre…
kundadebdatta Sep 25, 2024
4e35d0f
Code changes to set inner stream on disposal of buffered stream.
kundadebdatta Sep 26, 2024
4ac933f
Minor cosmetic code changes.
kundadebdatta Sep 26, 2024
8033075
Code changes to use clonable stream instead of buffered stream.
kundadebdatta Sep 27, 2024
4359dc8
Revert "Code changes to use clonable stream instead of buffered stream."
kundadebdatta Oct 1, 2024
88d5431
Code changes to use clonable stream from request handler when the out…
kundadebdatta Oct 2, 2024
abbcb11
Merge branch 'master' into users/dkunda/4644_binary_encoding_for_poin…
kundadebdatta Oct 2, 2024
16583c3
Code changes to address review comments.
kundadebdatta Oct 2, 2024
b3da27d
Merge branch 'master' into users/dkunda/4644_binary_encoding_for_poin…
kundadebdatta Oct 2, 2024
7fc7407
Merge branch 'master' into users/dkunda/4644_binary_encoding_for_poin…
kundadebdatta Oct 3, 2024
c275cf1
Code changes to fix test failures.
kundadebdatta Oct 3, 2024
da1f135
Merge branch 'master' into users/dkunda/4644_binary_encoding_for_poin…
kundadebdatta Oct 7, 2024
fbf9ffc
Code changes to address review comments.
kundadebdatta Oct 7, 2024
96c8581
Adding more stream tests.
kundadebdatta Oct 7, 2024
2baa15c
Code changes to fix batch item emulator tests.
kundadebdatta Oct 7, 2024
a97101a
Code changes to use cloneable stream in buffered stream.
kundadebdatta Oct 9, 2024
6981e01
Merge branch 'master' into users/dkunda/4644_binary_encoding_for_poin…
kundadebdatta Oct 9, 2024
9ed19b6
Code changes to fix build failures.
kundadebdatta Oct 9, 2024
dbc9596
Code changes to match ContainerCore.Items to master
kundadebdatta Oct 9, 2024
3929404
Code changes to fix build failures.
kundadebdatta Oct 9, 2024
b8151e0
Merge branch 'master' into users/dkunda/4644_binary_encoding_for_poin…
kundadebdatta Oct 9, 2024
cc67f67
Code changes to address review comments.
kundadebdatta Oct 10, 2024
de8aab6
Merge branch 'master' into users/dkunda/4644_binary_encoding_for_poin…
kundadebdatta Oct 10, 2024
d72dbe7
Merge branch 'master' into users/dkunda/4644_binary_encoding_for_poin…
kundadebdatta Oct 10, 2024
c932a1f
Merge branch 'master' into users/dkunda/4644_binary_encoding_for_poin…
kundadebdatta Oct 10, 2024
8855ebd
Code changes to fix container creation using binary encoding.
kundadebdatta Oct 16, 2024
7009570
Merge branch 'master' into users/dkunda/4644_binary_encoding_for_poin…
kundadebdatta Oct 18, 2024
f67ae93
Code changes to add internal types in a common place.
kundadebdatta Oct 18, 2024
b2b6af7
Merge branch 'master' into users/dkunda/4644_binary_encoding_for_poin…
kundadebdatta Oct 22, 2024
aea8a3a
Merge branch 'master' into users/dkunda/4644_binary_encoding_for_poin…
kundadebdatta Oct 23, 2024
3245664
Code changes to remove ContentSerializationFormat. Addressing minor r…
kundadebdatta Oct 23, 2024
cce11dd
Merge branch 'master' into users/dkunda/4644_binary_encoding_for_poin…
kundadebdatta Oct 23, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions Microsoft.Azure.Cosmos/src/Handler/RequestInvokerHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ namespace Microsoft.Azure.Cosmos.Handlers
internal class RequestInvokerHandler : RequestHandler
{
private static readonly HttpMethod httpPatchMethod = new HttpMethod(HttpConstants.HttpMethods.Patch);
private static readonly string BinarySerializationFormat = SupportedSerializationFormats.CosmosBinary.ToString();
kundadebdatta marked this conversation as resolved.
Show resolved Hide resolved
private static (bool, ResponseMessage) clientIsValid = (false, null);

private readonly CosmosClient client;
Expand Down Expand Up @@ -67,6 +68,12 @@ public override async Task<ResponseMessage> SendAsync(
request.Headers.Add(HttpConstants.HttpHeaders.Prefer, HttpConstants.HttpHeaderValues.PreferReturnMinimal);
}

if (ConfigurationManager.IsBinaryEncodingEnabled()
kirankumarkolli marked this conversation as resolved.
Show resolved Hide resolved
&& RequestInvokerHandler.IsPointOperationSupportedForBinaryEncoding(request))
{
request.Headers.Add(HttpConstants.HttpHeaders.SupportedSerializationFormats, RequestInvokerHandler.BinarySerializationFormat);
}

await this.ValidateAndSetConsistencyLevelAsync(request);
this.SetPriorityLevel(request);

Expand Down Expand Up @@ -94,6 +101,14 @@ public override async Task<ResponseMessage> SendAsync(
((CosmosTraceDiagnostics)response.Diagnostics).Value.AddOrUpdateDatum("ExcludedRegions", request.RequestOptions.ExcludeRegions);
}

if (ConfigurationManager.IsBinaryEncodingEnabled()
&& RequestInvokerHandler.IsPointOperationSupportedForBinaryEncoding(request)
&& response.Content != null
&& response.Content is not CloneableStream)
{
response.Content = await StreamExtension.AsClonableStreamAsync(response.Content, default);
}

return response;
}

Expand Down Expand Up @@ -547,6 +562,16 @@ private static bool IsItemNoRepsonseSet(bool enableContentResponseOnWrite, Opera
operationType == OperationType.Patch);
}

private static bool IsPointOperationSupportedForBinaryEncoding(RequestMessage request)
{
return request.ResourceType == ResourceType.Document
&& (request.OperationType == OperationType.Create
kirankumarkolli marked this conversation as resolved.
Show resolved Hide resolved
|| request.OperationType == OperationType.Replace
|| request.OperationType == OperationType.Delete
|| request.OperationType == OperationType.Read
|| request.OperationType == OperationType.Upsert);
}

private static bool IsClientNoResponseSet(CosmosClientOptions clientOptions, OperationType operationType)
{
return clientOptions != null
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ public override byte[] ReadAsBytes()
public override DateTime? ReadAsDateTime()
{
this.Read();
if (this.jsonReader.CurrentTokenType == JsonTokenType.EndArray)
if (this.jsonReader.CurrentTokenType == JsonTokenType.Null || this.jsonReader.CurrentTokenType == JsonTokenType.EndArray)
kirankumarkolli marked this conversation as resolved.
Show resolved Hide resolved
{
return null;
}
Expand All @@ -211,7 +211,7 @@ public override byte[] ReadAsBytes()
public override DateTimeOffset? ReadAsDateTimeOffset()
{
this.Read();
if (this.jsonReader.CurrentTokenType == JsonTokenType.EndArray)
if (this.jsonReader.CurrentTokenType == JsonTokenType.Null || this.jsonReader.CurrentTokenType == JsonTokenType.EndArray)
{
return null;
}
Expand Down Expand Up @@ -260,7 +260,7 @@ public override byte[] ReadAsBytes()
public override string ReadAsString()
{
this.Read();
if (this.jsonReader.CurrentTokenType == JsonTokenType.EndArray)
if (this.jsonReader.CurrentTokenType == JsonTokenType.Null || this.jsonReader.CurrentTokenType == JsonTokenType.EndArray)
{
return null;
}
Expand All @@ -278,7 +278,7 @@ public override string ReadAsString()
private double? ReadNumberValue()
{
this.Read();
if (this.jsonReader.CurrentTokenType == JsonTokenType.EndArray)
if (this.jsonReader.CurrentTokenType == JsonTokenType.Null || this.jsonReader.CurrentTokenType == JsonTokenType.EndArray)
{
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,8 @@ public override void WriteValue(bool value)
/// <param name="value">The <see cref="short"/> value to write.</param>
public override void WriteValue(short value)
{
base.WriteValue((long)value);
base.WriteValue((Int16)value);
this.jsonWriter.WriteInt16Value(value);
kirankumarkolli marked this conversation as resolved.
Show resolved Hide resolved
}

/// <summary>
Expand Down
19 changes: 19 additions & 0 deletions Microsoft.Azure.Cosmos/src/RequestOptions/ItemRequestOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,25 @@ public ConsistencyLevel? ConsistencyLevel
/// </remarks>
public DedicatedGatewayRequestOptions DedicatedGatewayRequestOptions { get; set; }

/// <summary>
/// Gets or sets the boolean to enable binary response for point operations like Create, Upsert, Read, Patch, and Replace.
/// Setting this option to true will cause the response to be in binary format. This request option will remain internal only
/// since the consumer of thie flag will be the internal components of the cosmos db ecosystem.
/// </summary>
/// <example>
/// <code language="c#">
/// <![CDATA[
/// ItemRequestOptions requestOptions = new ItemRequestOptions() { EnableBinaryResponseOnPointOperations = true };
/// ResponseMessage responseMessage = await container.CreateItemStreamAsync(createStream, new Cosmos.PartitionKey(comment.pk), requestOptions);
/// Assert.AreEqual(HttpStatusCode.Created, responseMessage.StatusCode);
/// ]]>
/// </code>
/// </example>
/// <remarks>
/// This is optimal for workloads where the returned resource can be processed in binary format.
/// </remarks>
internal bool EnableBinaryResponseOnPointOperations { get; set; }
kundadebdatta marked this conversation as resolved.
Show resolved Hide resolved

/// <summary>
/// Fill the CosmosRequestMessage headers with the set properties
/// </summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,8 @@ public async Task<ResponseMessage> CreateItemStreamAsync(
streamPayload: streamPayload,
operationType: OperationType.Create,
requestOptions: requestOptions,
trace: trace,
trace: trace,
targetResponseSerializationFormat: JsonSerializationFormat.Text,
cancellationToken: cancellationToken);
}

Expand Down Expand Up @@ -100,7 +101,8 @@ public async Task<ResponseMessage> ReadItemStreamAsync(
streamPayload: null,
operationType: OperationType.Read,
requestOptions: requestOptions,
trace: trace,
trace: trace,
targetResponseSerializationFormat: JsonSerializationFormat.Text,
cancellationToken: cancellationToken);
}

Expand All @@ -117,7 +119,8 @@ public async Task<ItemResponse<T>> ReadItemAsync<T>(
streamPayload: null,
operationType: OperationType.Read,
requestOptions: requestOptions,
trace: trace,
trace: trace,
targetResponseSerializationFormat: default,
cancellationToken: cancellationToken);

return this.ClientContext.ResponseFactory.CreateItemResponse<T>(response);
Expand All @@ -136,7 +139,8 @@ public async Task<ResponseMessage> UpsertItemStreamAsync(
streamPayload: streamPayload,
operationType: OperationType.Upsert,
requestOptions: requestOptions,
trace: trace,
trace: trace,
targetResponseSerializationFormat: JsonSerializationFormat.Text,
cancellationToken: cancellationToken);
}

Expand Down Expand Up @@ -178,7 +182,8 @@ public async Task<ResponseMessage> ReplaceItemStreamAsync(
streamPayload: streamPayload,
operationType: OperationType.Replace,
requestOptions: requestOptions,
trace: trace,
trace: trace,
targetResponseSerializationFormat: JsonSerializationFormat.Text,
cancellationToken: cancellationToken);
}

Expand Down Expand Up @@ -225,7 +230,8 @@ public async Task<ResponseMessage> DeleteItemStreamAsync(
streamPayload: null,
operationType: OperationType.Delete,
requestOptions: requestOptions,
trace: trace,
trace: trace,
targetResponseSerializationFormat: JsonSerializationFormat.Text,
cancellationToken: cancellationToken);
}

Expand All @@ -242,7 +248,8 @@ public async Task<ItemResponse<T>> DeleteItemAsync<T>(
streamPayload: null,
operationType: OperationType.Delete,
requestOptions: requestOptions,
trace: trace,
trace: trace,
targetResponseSerializationFormat: default,
cancellationToken: cancellationToken);

return this.ClientContext.ResponseFactory.CreateItemResponse<T>(response);
Expand Down Expand Up @@ -853,7 +860,8 @@ private async Task<ResponseMessage> ExtractPartitionKeyAndProcessItemStreamAsync
itemStream,
operationType,
requestOptions,
trace: trace,
trace: trace,
targetResponseSerializationFormat: default,
cancellationToken: cancellationToken);
}

Expand All @@ -868,7 +876,8 @@ private async Task<ResponseMessage> ExtractPartitionKeyAndProcessItemStreamAsync
itemStream,
operationType,
requestOptions,
trace: trace,
trace: trace,
targetResponseSerializationFormat: default,
cancellationToken: cancellationToken);

if (responseMessage.IsSuccessStatusCode)
Expand Down Expand Up @@ -897,7 +906,8 @@ private async Task<ResponseMessage> ProcessItemStreamAsync(
Stream streamPayload,
OperationType operationType,
ItemRequestOptions requestOptions,
ITrace trace,
ITrace trace,
JsonSerializationFormat? targetResponseSerializationFormat,
CancellationToken cancellationToken)
kirankumarkolli marked this conversation as resolved.
Show resolved Hide resolved
{
if (trace == null)
Expand All @@ -912,6 +922,11 @@ private async Task<ResponseMessage> ProcessItemStreamAsync(

ContainerInternal.ValidatePartitionKey(partitionKey, requestOptions);
string resourceUri = this.GetResourceUri(requestOptions, operationType, itemId);

// Convert Text to Binary Stream.
kundadebdatta marked this conversation as resolved.
Show resolved Hide resolved
streamPayload = CosmosSerializationUtil.TrySerializeStreamToTargetFormat(
targetSerializationFormat: ContainerCore.GetTargetRequestSerializationFormat(),
inputStream: streamPayload == null ? null : await StreamExtension.AsClonableStreamAsync(streamPayload));

ResponseMessage responseMessage = await this.ClientContext.ProcessResourceOperationStreamAsync(
resourceUri: resourceUri,
Expand All @@ -925,6 +940,16 @@ private async Task<ResponseMessage> ProcessItemStreamAsync(
requestEnricher: null,
trace: trace,
cancellationToken: cancellationToken);

// Convert Binary Stream to Text.
if (targetResponseSerializationFormat.HasValue
&& (requestOptions == null || !requestOptions.EnableBinaryResponseOnPointOperations)
&& responseMessage?.Content is CloneableStream outputCloneableStream)
{
responseMessage.Content = CosmosSerializationUtil.TrySerializeStreamToTargetFormat(
targetSerializationFormat: targetResponseSerializationFormat.Value,
inputStream: outputCloneableStream);
}

return responseMessage;
}
Expand Down Expand Up @@ -1217,7 +1242,8 @@ public Task<ResponseMessage> PatchItemStreamAsync(
streamPayload: streamPayload,
operationType: OperationType.Patch,
requestOptions: requestOptions,
trace: trace,
trace: trace,
targetResponseSerializationFormat: JsonSerializationFormat.Text,
cancellationToken: cancellationToken);
}

Expand Down Expand Up @@ -1255,6 +1281,13 @@ private ChangeFeedProcessorBuilder GetChangeFeedProcessorBuilderPrivate(
applyBuilderConfiguration: changeFeedProcessor.ApplyBuildConfiguration).WithChangeFeedMode(mode);
}
kundadebdatta marked this conversation as resolved.
Show resolved Hide resolved

private static JsonSerializationFormat GetTargetRequestSerializationFormat()
{
return ConfigurationManager.IsBinaryEncodingEnabled()
? JsonSerializationFormat.Binary
: JsonSerializationFormat.Text;
}

/// <summary>
/// This method is useful for determining if a smaller, more granular feed range (y) is fully contained within a broader feed range (x), which is a common operation in distributed systems to manage partitioned data.
///
Expand Down
Loading