Skip to content

Commit

Permalink
incorporate code review feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
neildsh committed May 17, 2024
1 parent 01ae3d6 commit 9faa445
Showing 1 changed file with 19 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,8 @@ public InitializationParameters(
private enum ExecutionState
{
Uninitialized,
Initialized
Initialized,
Done
}

public static TryCatch<IQueryPipelineStage> MonadicCreate(
Expand Down Expand Up @@ -1673,15 +1674,12 @@ private sealed class NonStreamingOrderByPipelineStage : IQueryPipelineStage

private BufferedOrderByResults bufferedResults;

private bool firstPage;

public TryCatch<QueryPage> Current { get; private set; }

private NonStreamingOrderByPipelineStage(InitializationParameters parameters, int pageSize)
{
this.parameters = parameters ?? throw new ArgumentNullException(nameof(parameters));
this.pageSize = pageSize;
this.firstPage = true;
this.executionState = ExecutionState.Uninitialized;
}

Expand All @@ -1693,11 +1691,19 @@ public ValueTask DisposeAsync()

public async ValueTask<bool> MoveNextAsync(ITrace trace, CancellationToken cancellationToken)
{
if (this.executionState == ExecutionState.Done)
{
return false;
}

cancellationToken.ThrowIfCancellationRequested();

bool firstPage = false;
if (this.executionState == ExecutionState.Uninitialized)
{
await this.MoveNextAsync_InitializeAsync(trace, cancellationToken);
firstPage = true;
this.bufferedResults = await this.MoveNextAsync_InitializeAsync(trace, cancellationToken);
this.executionState = ExecutionState.Initialized;
}

List<CosmosElement> documents = new List<CosmosElement>(this.pageSize);
Expand All @@ -1706,9 +1712,9 @@ public async ValueTask<bool> MoveNextAsync(ITrace trace, CancellationToken cance
documents.Add(this.bufferedResults.Enumerator.Current.Payload);
}

if (this.firstPage || documents.Count > 0)
if (firstPage || documents.Count > 0)
{
double requestCharge = this.firstPage ? this.bufferedResults.TotalRequestCharge : 0;
double requestCharge = firstPage ? this.bufferedResults.TotalRequestCharge : 0;
QueryPage queryPage = new QueryPage(
documents: documents,
requestCharge: requestCharge,
Expand All @@ -1720,17 +1726,17 @@ public async ValueTask<bool> MoveNextAsync(ITrace trace, CancellationToken cance
state: documents.Count > 0 ? NonStreamingOrderByInProgress : null,
streaming: false);

this.firstPage = false;
this.Current = TryCatch<QueryPage>.FromResult(queryPage);
return true;
}
else
{
this.executionState = ExecutionState.Done;
return false;
}
}

private async Task MoveNextAsync_InitializeAsync(ITrace trace, CancellationToken cancellationToken)
private async Task<BufferedOrderByResults> MoveNextAsync_InitializeAsync(ITrace trace, CancellationToken cancellationToken)
{
ITracingAsyncEnumerator<TryCatch<OrderByQueryPage>> enumerator = await OrderByCrossPartitionRangePageEnumerator.CreateAsync(
this.parameters.DocumentContainer,
Expand All @@ -1752,8 +1758,7 @@ private async Task MoveNextAsync_InitializeAsync(ITrace trace, CancellationToken
trace,
cancellationToken);

this.bufferedResults = bufferedResults;
this.executionState = ExecutionState.Initialized;
return bufferedResults;
}

public static IQueryPipelineStage Create(
Expand Down Expand Up @@ -1817,7 +1822,7 @@ public static async Task<ITracingAsyncEnumerator<TryCatch<OrderByQueryPage>>> Cr
OrderByQueryPartitionRangePageAsyncEnumerator enumerator = OrderByQueryPartitionRangePageAsyncEnumerator.Create(
documentContainer,
sqlQuerySpec,
new FeedRangeState<QueryState>(range, null),
new FeedRangeState<QueryState>(range, state: null),
partitionKey,
queryPaginationOptions,
filter: null,
Expand Down Expand Up @@ -1853,6 +1858,7 @@ public async ValueTask<bool> MoveNextAsync(ITrace trace, CancellationToken cance
{
while (this.enumeratorsAndTokens.Count > 0)
{
cancellationToken.ThrowIfCancellationRequested();
(OrderByQueryPartitionRangePageAsyncEnumerator enumerator, OrderByContinuationToken token) = this.enumeratorsAndTokens.Dequeue();
if (await enumerator.MoveNextAsync(trace, cancellationToken))
{
Expand All @@ -1861,7 +1867,7 @@ public async ValueTask<bool> MoveNextAsync(ITrace trace, CancellationToken cance
OrderByContinuationToken continuationToken;
if (enumerator.Current.Result.Page.Documents.Count > 0)
{
// Use the token for the next page, since we fully drained the enumerator.
// Use the token for the next page, since we fully drained the page.
continuationToken = enumerator.FeedRangeState.State?.Value != null ?
CreateOrderByContinuationToken(
new ParallelContinuationToken(
Expand Down

0 comments on commit 9faa445

Please sign in to comment.