Skip to content

Commit

Permalink
[Internal] Binary Encoding: Adds Binary Encoding Support for Point Op…
Browse files Browse the repository at this point in the history
…erations (#4652)

# Pull Request Template

# Description

This PR introduces binary encoding support on request and responses for
different Point operations.

## What is Binary Encoding?

As the name suggests, binary encoding is a encoding mechanism through
which the request payload will be encoded to binary first and sent to
backend for processing. Decoding to Text will happen on the response
path. The biggest benefit of binary encoding is to reduce cost on
backend storage which helps to reduce the overall COGS.

## Scope

The point operations that are currently in and out of scope for binary
encoding are given below in tabular format:

| Operations Currently in Scope| Operations Currently Out of Scope|
Reason for Out of Scope|
| --- | --- | --- |
| `CreateItemAsync()` | `PatchItemAsync()` | Operation Currently not
Supported in BE
| `CreateItemStreamAsync()` | `PatchItemStreamAsync()` | Operation
Currently not Supported in BE
| `ReadItemAsync()` | `TransactionalBatches` | Operation Currently not
Supported in BE
| `ReadItemStreamAsync()` | `Bulk APIs` | Operation Currently not
Supported in BE
| `UpsertItemAsync()` |  | |
| `UpsertItemStreamAsync()` |  | |
| `RepalceItemAsync()` |  | |
| `ReplaceItemStreamAsync()` |  | |
| `DeleteItemAsync()` |  | |
| `DeleteItemStreamAsync()` |  | |

## How to Enable Binary Encoding?

This PR introduces a new environment variable
`AZURE_COSMOS_BINARY_ENCODING_ENABLED` to opt-in or opt-out the binary
encoding feature on demand. Setting this environment variable to `True`
will enable Binary encoding support.

## How Binary Encoding has been Achieved?

The binary encoding in the .NET SDK has been divided into two parts
which are applicable differently for `ItemAsync()` and
`ItemStreamAsync()` apis. The details are given below:

- **`ItemAsync()` APIs:** Currently the `CosmosJsonDotNetSerializer` has
been refactored to read and write the binary bits directly into the
stream. This reduces any conversion of the text stream to binary and
vice versa and makes the serialization and de-serialization process even
faster.

- **`ItemStreamAsync()` APIs:** For these APIs, there are literally no
serializes involved and the stream is returned directly to the caller.
Therefore, this flow converts a Text stream into Binary and does the
opposite on the response path. Conversion is a little bit costlier
operation, in comparison with directly writing the binary stream using
the serializer. Note that, irrespective of the binary encoding feature
enabled or disabled, the output stream will always be in Text format,
unless otherwise requested explicitly.

## Are There Any Way to Request Binary Bits on Response?

The answer is yes. We introduced a new internal request option:
`EnableBinaryResponseOnPointOperations` in the `ItemRequestOptions`, and
setting this flag to `True` will not do any Text conversation, and will
return the raw binary bits to the caller. However, please note that this
option is applicable only for the `ItemStreamAsync()` APIs and will be
helpful for some of the internal teams.

## Flow Diagrams ##

To understand the changes better, please take a look at the flow
diagrams below for both `ItemAsync()` and `ItemStreamAsync()` APIs.

**Flow Diagram for `ItemAsync()` APIs that are in Scope per the Above
Table:**

```mermaid
flowchart TD
    A[All 'ItemAsync' APIs in Scope] -->|SerializerCore.ToStream| B{Select <br> Serializer}
    B -->|One| C[CosmosJsonDotNetSerializer]
    B -->|Two| D[CosmosSystemTextJsonSerializer]
    B -->|Three| E[Any Custom <br> Serializer]
    C -->|Serialize to <br> Binary Stream| F[ContainerCore<br>.ProcessItemStreamAsync]
    D -->|Serialize to <br> Text Stream| F[ContainerCore<br>.ProcessItemStreamAsync]
    E -->|Stream may or <br> may not be <br> Serialized to Binary| F[ContainerCore<br>.ProcessItemStreamAsync]
    F --> G{Is Input <br> Stream in <br> Binary ?}
    G -->|True| I[ProcessResourceOperationStreamAsync]
    G -->|False| H[Convert Input Text <br> Stream to <br> Binary Stream]
    H --> I 
    I --> |SendAsync| J[RequestInvokerHandler]
    J --> |Sets following headers to request response in binary format: 
    x-ms-cosmos-supported-serialization-formats = CosmosBinary
    x-ms-documentdb-content-serialization-format = CosmosBinary| K[TransportHandler]
    K --> |Binary Response <br> Stream|L[ContainerCore<br>.ProcessItemStreamAsync]
    L --> |Note: No explicit conversion to binary stream happens because we let the serializer directly de-serialize the binary stream into text. SerializerCore.FromStream| M{Select <br> Serializer}
    M -->|One| N[CosmosJsonDotNetSerializer]
    M -->|Two| O[CosmosSystemTextJsonSerializer]
    M -->|Three| P[Any Custom <br> Serializer]    
    N -->|De-Serialize to <br> Text Stream| Q[Container<br>.ItemAsync Response]
    O -->|De-Serialize to <br> Text Stream| Q[Container<br>.ItemAsync Response]
    P -->|Stream may or <br> may not be <br> De-Serialized to Text| Q[Container<br>.ItemAsync Response <br> in Text]
```

**Flow Diagram for `ItemStreamAsync()` APIs that are in Scope per the
Above Table:**

```mermaid
flowchart TD
    A[All 'ItemStreamAsync' APIs in Scope]
    A -->|Stream may or <br> may not be <br> Serialized to Binary| F[ContainerCore<br>.ProcessItemStreamAsync]
    F --> G{Is Input <br> Stream in <br> Binary ?}
    G -->|True| I[ProcessResourceOperationStreamAsync]
    G -->|False| H[Convert Input Text <br> Stream to <br> Binary Stream]
    H --> I 
    I --> |SendAsync| J[RequestInvokerHandler]
    J --> |Sets following headers to get binary response: 
    x-ms-cosmos-supported-serialization-formats = CosmosBinary
    x-ms-documentdb-content-serialization-format = CosmosBinary| K[TransportHandler]
    K --> |Binary Response <br> Stream|L[ContainerCore<br>.ProcessItemStreamAsync]
    L --> M{Is Response <br> Stream in <br> Binary ?}
    M -->|Yes| N[CosmosSerializationUtil]
    M -->|No| Q
    N -->|Convert Binary Stream to <br> Text Stream| Q[Container<br>.ItemAsync Response]
```

## Performance Testing

Below are the comparison results for the perf testing done on the master
branch and the current feature branch with binary encoding disabled:

``` ini

BenchmarkDotNet=v0.13.5, OS=ubuntu 20.04
Intel Xeon Platinum 8272CL CPU 2.60GHz, 1 CPU, 16 logical and 8 physical cores
.NET SDK=6.0.427
  [Host]  : .NET 6.0.35 (6.0.3524.45918), X64 RyuJIT AVX2
  LongRun : .NET 6.0.35 (6.0.3524.45918), X64 RyuJIT AVX2

Job=LongRun  IterationCount=100  LaunchCount=3  
RunStrategy=Throughput  WarmupCount=15

```

**Benchmark Results with No Binary Encoding on master branch:**


![image](https://github.com/user-attachments/assets/3fa3407d-1fbd-45f9-9be0-e0257fc055da)


**Benchmark Results with Binary Encoding Disabled on feature branch:**


![image](https://github.com/user-attachments/assets/dbac9e9a-ba1f-48e7-b231-1fe318967ab5)


Benchmark results comparison in terms of percentage between `master` and
`feature` branch:


![image](https://github.com/user-attachments/assets/6112d018-0801-4384-bf11-4c093da0ef23)


## Type of change

Please delete options that are not relevant.

- [x] New feature (non-breaking change which adds functionality)

## Closing issues

To automatically close an issue: closes #4644
  • Loading branch information
kundadebdatta authored Oct 23, 2024
1 parent f1e5c2f commit f6ae4c4
Show file tree
Hide file tree
Showing 17 changed files with 2,821 additions and 1,066 deletions.
25 changes: 25 additions & 0 deletions Microsoft.Azure.Cosmos/src/Handler/RequestInvokerHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ namespace Microsoft.Azure.Cosmos.Handlers
internal class RequestInvokerHandler : RequestHandler
{
private static readonly HttpMethod httpPatchMethod = new HttpMethod(HttpConstants.HttpMethods.Patch);
private static readonly string BinarySerializationFormat = SupportedSerializationFormats.CosmosBinary.ToString();
private static (bool, ResponseMessage) clientIsValid = (false, null);

private readonly CosmosClient client;
Expand Down Expand Up @@ -67,6 +68,12 @@ public override async Task<ResponseMessage> SendAsync(
request.Headers.Add(HttpConstants.HttpHeaders.Prefer, HttpConstants.HttpHeaderValues.PreferReturnMinimal);
}

if (ConfigurationManager.IsBinaryEncodingEnabled()
&& RequestInvokerHandler.IsPointOperationSupportedForBinaryEncoding(request))
{
request.Headers.Add(HttpConstants.HttpHeaders.SupportedSerializationFormats, RequestInvokerHandler.BinarySerializationFormat);
}

await this.ValidateAndSetConsistencyLevelAsync(request);
this.SetPriorityLevel(request);

Expand Down Expand Up @@ -94,6 +101,14 @@ public override async Task<ResponseMessage> SendAsync(
((CosmosTraceDiagnostics)response.Diagnostics).Value.AddOrUpdateDatum("ExcludedRegions", request.RequestOptions.ExcludeRegions);
}

if (ConfigurationManager.IsBinaryEncodingEnabled()
&& RequestInvokerHandler.IsPointOperationSupportedForBinaryEncoding(request)
&& response.Content != null
&& response.Content is not CloneableStream)
{
response.Content = await StreamExtension.AsClonableStreamAsync(response.Content, default);
}

return response;
}

Expand Down Expand Up @@ -547,6 +562,16 @@ private static bool IsItemNoRepsonseSet(bool enableContentResponseOnWrite, Opera
operationType == OperationType.Patch);
}

private static bool IsPointOperationSupportedForBinaryEncoding(RequestMessage request)
{
return request.ResourceType == ResourceType.Document
&& (request.OperationType == OperationType.Create
|| request.OperationType == OperationType.Replace
|| request.OperationType == OperationType.Delete
|| request.OperationType == OperationType.Read
|| request.OperationType == OperationType.Upsert);
}

private static bool IsClientNoResponseSet(CosmosClientOptions clientOptions, OperationType operationType)
{
return clientOptions != null
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ public override byte[] ReadAsBytes()
public override DateTime? ReadAsDateTime()
{
this.Read();
if (this.jsonReader.CurrentTokenType == JsonTokenType.EndArray)
if (this.jsonReader.CurrentTokenType == JsonTokenType.Null || this.jsonReader.CurrentTokenType == JsonTokenType.EndArray)
{
return null;
}
Expand All @@ -211,7 +211,7 @@ public override byte[] ReadAsBytes()
public override DateTimeOffset? ReadAsDateTimeOffset()
{
this.Read();
if (this.jsonReader.CurrentTokenType == JsonTokenType.EndArray)
if (this.jsonReader.CurrentTokenType == JsonTokenType.Null || this.jsonReader.CurrentTokenType == JsonTokenType.EndArray)
{
return null;
}
Expand Down Expand Up @@ -260,7 +260,7 @@ public override byte[] ReadAsBytes()
public override string ReadAsString()
{
this.Read();
if (this.jsonReader.CurrentTokenType == JsonTokenType.EndArray)
if (this.jsonReader.CurrentTokenType == JsonTokenType.Null || this.jsonReader.CurrentTokenType == JsonTokenType.EndArray)
{
return null;
}
Expand All @@ -278,7 +278,7 @@ public override string ReadAsString()
private double? ReadNumberValue()
{
this.Read();
if (this.jsonReader.CurrentTokenType == JsonTokenType.EndArray)
if (this.jsonReader.CurrentTokenType == JsonTokenType.Null || this.jsonReader.CurrentTokenType == JsonTokenType.EndArray)
{
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,8 @@ public override void WriteValue(bool value)
/// <param name="value">The <see cref="short"/> value to write.</param>
public override void WriteValue(short value)
{
base.WriteValue((long)value);
base.WriteValue((Int16)value);
this.jsonWriter.WriteInt16Value(value);
}

/// <summary>
Expand Down
19 changes: 19 additions & 0 deletions Microsoft.Azure.Cosmos/src/RequestOptions/ItemRequestOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,25 @@ public ConsistencyLevel? ConsistencyLevel
/// </remarks>
public DedicatedGatewayRequestOptions DedicatedGatewayRequestOptions { get; set; }

/// <summary>
/// Gets or sets the boolean to enable binary response for point operations like Create, Upsert, Read, Patch, and Replace.
/// Setting this option to true will cause the response to be in binary format. This request option will remain internal only
/// since the consumer of thie flag will be the internal components of the cosmos db ecosystem.
/// </summary>
/// <example>
/// <code language="c#">
/// <![CDATA[
/// ItemRequestOptions requestOptions = new ItemRequestOptions() { EnableBinaryResponseOnPointOperations = true };
/// ResponseMessage responseMessage = await container.CreateItemStreamAsync(createStream, new Cosmos.PartitionKey(comment.pk), requestOptions);
/// Assert.AreEqual(HttpStatusCode.Created, responseMessage.StatusCode);
/// ]]>
/// </code>
/// </example>
/// <remarks>
/// This is optimal for workloads where the returned resource can be processed in binary format.
/// </remarks>
internal bool EnableBinaryResponseOnPointOperations { get; set; }

/// <summary>
/// Fill the CosmosRequestMessage headers with the set properties
/// </summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,8 @@ public async Task<ResponseMessage> CreateItemStreamAsync(
streamPayload: streamPayload,
operationType: OperationType.Create,
requestOptions: requestOptions,
trace: trace,
trace: trace,
targetResponseSerializationFormat: JsonSerializationFormat.Text,
cancellationToken: cancellationToken);
}

Expand Down Expand Up @@ -100,7 +101,8 @@ public async Task<ResponseMessage> ReadItemStreamAsync(
streamPayload: null,
operationType: OperationType.Read,
requestOptions: requestOptions,
trace: trace,
trace: trace,
targetResponseSerializationFormat: JsonSerializationFormat.Text,
cancellationToken: cancellationToken);
}

Expand All @@ -117,7 +119,8 @@ public async Task<ItemResponse<T>> ReadItemAsync<T>(
streamPayload: null,
operationType: OperationType.Read,
requestOptions: requestOptions,
trace: trace,
trace: trace,
targetResponseSerializationFormat: default,
cancellationToken: cancellationToken);

return this.ClientContext.ResponseFactory.CreateItemResponse<T>(response);
Expand All @@ -136,7 +139,8 @@ public async Task<ResponseMessage> UpsertItemStreamAsync(
streamPayload: streamPayload,
operationType: OperationType.Upsert,
requestOptions: requestOptions,
trace: trace,
trace: trace,
targetResponseSerializationFormat: JsonSerializationFormat.Text,
cancellationToken: cancellationToken);
}

Expand Down Expand Up @@ -178,7 +182,8 @@ public async Task<ResponseMessage> ReplaceItemStreamAsync(
streamPayload: streamPayload,
operationType: OperationType.Replace,
requestOptions: requestOptions,
trace: trace,
trace: trace,
targetResponseSerializationFormat: JsonSerializationFormat.Text,
cancellationToken: cancellationToken);
}

Expand Down Expand Up @@ -225,7 +230,8 @@ public async Task<ResponseMessage> DeleteItemStreamAsync(
streamPayload: null,
operationType: OperationType.Delete,
requestOptions: requestOptions,
trace: trace,
trace: trace,
targetResponseSerializationFormat: JsonSerializationFormat.Text,
cancellationToken: cancellationToken);
}

Expand All @@ -242,7 +248,8 @@ public async Task<ItemResponse<T>> DeleteItemAsync<T>(
streamPayload: null,
operationType: OperationType.Delete,
requestOptions: requestOptions,
trace: trace,
trace: trace,
targetResponseSerializationFormat: default,
cancellationToken: cancellationToken);

return this.ClientContext.ResponseFactory.CreateItemResponse<T>(response);
Expand Down Expand Up @@ -853,7 +860,8 @@ private async Task<ResponseMessage> ExtractPartitionKeyAndProcessItemStreamAsync
itemStream,
operationType,
requestOptions,
trace: trace,
trace: trace,
targetResponseSerializationFormat: default,
cancellationToken: cancellationToken);
}

Expand All @@ -868,7 +876,8 @@ private async Task<ResponseMessage> ExtractPartitionKeyAndProcessItemStreamAsync
itemStream,
operationType,
requestOptions,
trace: trace,
trace: trace,
targetResponseSerializationFormat: default,
cancellationToken: cancellationToken);

if (responseMessage.IsSuccessStatusCode)
Expand Down Expand Up @@ -897,7 +906,8 @@ private async Task<ResponseMessage> ProcessItemStreamAsync(
Stream streamPayload,
OperationType operationType,
ItemRequestOptions requestOptions,
ITrace trace,
ITrace trace,
JsonSerializationFormat? targetResponseSerializationFormat,
CancellationToken cancellationToken)
{
if (trace == null)
Expand All @@ -912,6 +922,11 @@ private async Task<ResponseMessage> ProcessItemStreamAsync(

ContainerInternal.ValidatePartitionKey(partitionKey, requestOptions);
string resourceUri = this.GetResourceUri(requestOptions, operationType, itemId);

// Convert Text to Binary Stream.
streamPayload = CosmosSerializationUtil.TrySerializeStreamToTargetFormat(
targetSerializationFormat: ContainerCore.GetTargetRequestSerializationFormat(),
inputStream: streamPayload == null ? null : await StreamExtension.AsClonableStreamAsync(streamPayload));

ResponseMessage responseMessage = await this.ClientContext.ProcessResourceOperationStreamAsync(
resourceUri: resourceUri,
Expand All @@ -925,6 +940,16 @@ private async Task<ResponseMessage> ProcessItemStreamAsync(
requestEnricher: null,
trace: trace,
cancellationToken: cancellationToken);

// Convert Binary Stream to Text.
if (targetResponseSerializationFormat.HasValue
&& (requestOptions == null || !requestOptions.EnableBinaryResponseOnPointOperations)
&& responseMessage?.Content is CloneableStream outputCloneableStream)
{
responseMessage.Content = CosmosSerializationUtil.TrySerializeStreamToTargetFormat(
targetSerializationFormat: targetResponseSerializationFormat.Value,
inputStream: outputCloneableStream);
}

return responseMessage;
}
Expand Down Expand Up @@ -1217,7 +1242,8 @@ public Task<ResponseMessage> PatchItemStreamAsync(
streamPayload: streamPayload,
operationType: OperationType.Patch,
requestOptions: requestOptions,
trace: trace,
trace: trace,
targetResponseSerializationFormat: JsonSerializationFormat.Text,
cancellationToken: cancellationToken);
}

Expand Down Expand Up @@ -1255,6 +1281,13 @@ private ChangeFeedProcessorBuilder GetChangeFeedProcessorBuilderPrivate(
applyBuilderConfiguration: changeFeedProcessor.ApplyBuildConfiguration).WithChangeFeedMode(mode);
}

private static JsonSerializationFormat GetTargetRequestSerializationFormat()
{
return ConfigurationManager.IsBinaryEncodingEnabled()
? JsonSerializationFormat.Binary
: JsonSerializationFormat.Text;
}

/// <summary>
/// This method is useful for determining if a smaller, more granular feed range (y) is fully contained within a broader feed range (x), which is a common operation in distributed systems to manage partitioned data.
///
Expand Down
Loading

0 comments on commit f6ae4c4

Please sign in to comment.