Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -7,22 +7,16 @@ namespace Microsoft.Azure.Cosmos.Query.Core.Pipeline.Aggregate
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos.CosmosElements;
using Microsoft.Azure.Cosmos.Query.Core.Monads;
using Microsoft.Azure.Cosmos.Query.Core.Pipeline.Aggregate.Aggregators;
using Microsoft.Azure.Cosmos.Query.Core.Pipeline.Pagination;
using Microsoft.Azure.Cosmos.Tracing;
using static IndexUtilizationHelper;

/// <summary>
/// Stage that is able to aggregate local aggregates from multiple continuations and partitions.
/// At a high level aggregates queries only return a "partial" aggregate.
/// "partial" means that the result is only valid for that one continuation (and one partition).
/// For example suppose you have the query "SELECT COUNT(1) FROM c" and you have a single partition collection,
/// then you will get one count for each continuation of the query.
/// If you wanted the true result for this query, then you will have to take the sum of all continuations.
/// The reason why we have multiple continuations is because for a long running query we have to break up the results into multiple continuations.
/// Fortunately all the aggregates can be aggregated across continuations and partitions.
/// </summary>
internal abstract partial class AggregateQueryPipelineStage : QueryPipelineStageBase
{
internal class AggregateQueryPipelineStage : QueryPipelineStageBase
{
/// <summary>
/// This class does most of the work, since a query like:
///
Expand All @@ -47,14 +41,86 @@ internal abstract partial class AggregateQueryPipelineStage : QueryPipelineStage
/// <param name="singleGroupAggregator">The single group aggregator that we will feed results into.</param>
/// <param name="isValueQuery">Whether or not the query has the 'VALUE' keyword.</param>
/// <remarks>This constructor is private since there is some async initialization that needs to happen in CreateAsync().</remarks>
public AggregateQueryPipelineStage(
private AggregateQueryPipelineStage(
IQueryPipelineStage source,
SingleGroupAggregator singleGroupAggregator,
bool isValueQuery)
: base(source)
{
this.singleGroupAggregator = singleGroupAggregator ?? throw new ArgumentNullException(nameof(singleGroupAggregator));
this.isValueQuery = isValueQuery;
}

public override async ValueTask<bool> MoveNextAsync(ITrace trace, CancellationToken cancellationToken)
{
cancellationToken.ThrowIfCancellationRequested();

if (trace == null)
{
throw new ArgumentNullException(nameof(trace));
}

if (this.returnedFinalPage)
{
return false;
}

// Note-2016-10-25-felixfan: Given what we support now, we should expect to return only 1 document.
// Note-2019-07-11-brchon: We can return empty pages until all the documents are drained,
// but then we will have to design a continuation token.

double requestCharge = 0;
IReadOnlyDictionary<string, string> cumulativeAdditionalHeaders = default;

while (await this.inputStage.MoveNextAsync(trace, cancellationToken))
{
TryCatch<QueryPage> tryGetPageFromSource = this.inputStage.Current;
if (tryGetPageFromSource.Failed)
{
this.Current = tryGetPageFromSource;
return true;
}

QueryPage sourcePage = tryGetPageFromSource.Result;

requestCharge += sourcePage.RequestCharge;

cumulativeAdditionalHeaders = AccumulateIndexUtilization(
cumulativeHeaders: cumulativeAdditionalHeaders,
currentHeaders: sourcePage.AdditionalHeaders);

foreach (CosmosElement element in sourcePage.Documents)
{
cancellationToken.ThrowIfCancellationRequested();

RewrittenAggregateProjections rewrittenAggregateProjections = new RewrittenAggregateProjections(
this.isValueQuery,
element);
this.singleGroupAggregator.AddValues(rewrittenAggregateProjections.Payload);
}
}

List<CosmosElement> finalResult = new List<CosmosElement>();
CosmosElement aggregationResult = this.singleGroupAggregator.GetResult();
if (aggregationResult != null)
{
finalResult.Add(aggregationResult);
}

QueryPage queryPage = new QueryPage(
documents: finalResult,
requestCharge: requestCharge,
activityId: default,
cosmosQueryExecutionInfo: default,
distributionPlanSpec: default,
disallowContinuationTokenMessage: default,
additionalHeaders: cumulativeAdditionalHeaders,
state: default,
streaming: default);

this.Current = TryCatch<QueryPage>.FromResult(queryPage);
this.returnedFinalPage = true;
return true;
}

public static TryCatch<IQueryPipelineStage> MonadicCreate(
Expand All @@ -64,14 +130,35 @@ public static TryCatch<IQueryPipelineStage> MonadicCreate(
bool hasSelectValue,
CosmosElement continuationToken,
MonadicCreatePipelineStage monadicCreatePipelineStage)
{
return ClientAggregateQueryPipelineStage.MonadicCreate(
aggregates,
aliasToAggregateType,
orderedAliases,
hasSelectValue,
continuationToken,
monadicCreatePipelineStage);
{
if (monadicCreatePipelineStage == null)
{
throw new ArgumentNullException(nameof(monadicCreatePipelineStage));
}

TryCatch<SingleGroupAggregator> tryCreateSingleGroupAggregator = SingleGroupAggregator.TryCreate(
aggregates,
aliasToAggregateType,
orderedAliases,
hasSelectValue,
continuationToken: null);
if (tryCreateSingleGroupAggregator.Failed)
{
return TryCatch<IQueryPipelineStage>.FromException(tryCreateSingleGroupAggregator.Exception);
}

TryCatch<IQueryPipelineStage> tryCreateSource = monadicCreatePipelineStage(continuationToken);
if (tryCreateSource.Failed)
{
return tryCreateSource;
}

AggregateQueryPipelineStage stage = new AggregateQueryPipelineStage(
tryCreateSource.Result,
tryCreateSingleGroupAggregator.Result,
hasSelectValue);

return TryCatch<IQueryPipelineStage>.FromResult(stage);
}

/// <summary>
Expand Down Expand Up @@ -113,6 +200,6 @@ public RewrittenAggregateProjections(bool isValueAggregateQuery, CosmosElement r
}

public CosmosElement Payload { get; }
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -80,11 +80,6 @@ public static TryCatch<IAggregator> TryCreate(CosmosElement continuationToken)
return TryCatch<IAggregator>.FromResult(new AverageAggregator(averageInfo));
}

public CosmosElement GetCosmosElementContinuationToken()
{
return AverageInfo.ToCosmosElement(this.globalAverage);
}

/// <summary>
/// Struct that stores a weighted average as a sum and count so they that average across different partitions with different numbers of documents can be taken.
/// </summary>
Expand All @@ -104,19 +99,6 @@ public AverageInfo(double? sum, long count)
this.Count = count;
}

public static CosmosElement ToCosmosElement(AverageInfo averageInfo)
{
Dictionary<string, CosmosElement> dictionary = new Dictionary<string, CosmosElement>();
if (averageInfo.Sum.HasValue)
{
dictionary.Add(AverageInfo.SumName, CosmosNumber64.Create(averageInfo.Sum.Value));
}

dictionary.Add(AverageInfo.CountName, CosmosNumber64.Create(averageInfo.Count));

return CosmosObject.Create(dictionary);
}

/// <summary>
/// Initializes a new instance of the AverageInfo class.
/// </summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,11 +56,6 @@ public CosmosElement GetResult()
return CosmosNumber64.Create(this.globalCount);
}

public string GetContinuationToken()
{
return this.globalCount.ToString(CultureInfo.InvariantCulture);
}

public static TryCatch<IAggregator> TryCreate(CosmosElement continuationToken)
{
long partialCount;
Expand All @@ -82,10 +77,5 @@ public static TryCatch<IAggregator> TryCreate(CosmosElement continuationToken)
return TryCatch<IAggregator>.FromResult(
new CountAggregator(initialCount: partialCount));
}

public CosmosElement GetCosmosElementContinuationToken()
{
return CosmosNumber64.Create(this.globalCount);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,5 @@ internal interface IAggregator
/// </summary>
/// <returns>The result of the aggregation.</returns>
CosmosElement GetResult();

CosmosElement GetCosmosElementContinuationToken();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -61,10 +61,5 @@ public static TryCatch<IAggregator> TryCreate(CosmosElement continuationToken)

return TryCatch<IAggregator>.FromResult(new MakeListAggregator(initialList: partialList));
}

public CosmosElement GetCosmosElementContinuationToken()
{
return this.GetResult();
}
}
}
Loading