diff --git a/sdk/core/azure-core/src/main/java/com/azure/core/util/paging/ContinuablePagedFluxCore.java b/sdk/core/azure-core/src/main/java/com/azure/core/util/paging/ContinuablePagedFluxCore.java index f9836c46df3b..bdef0052902d 100644 --- a/sdk/core/azure-core/src/main/java/com/azure/core/util/paging/ContinuablePagedFluxCore.java +++ b/sdk/core/azure-core/src/main/java/com/azure/core/util/paging/ContinuablePagedFluxCore.java @@ -135,17 +135,13 @@ public void subscribe(CoreSubscriber coreSubscriber) { * @param provider the provider that when called returns Page Retriever Function * @param continuationToken the token to identify the pages to be retrieved * @param pageSize the preferred page size - * @param the type of Continuation token - * @param The type of items in a {@link ContinuablePage} - * @param

The {@link ContinuablePage} holding items of type {@code T} * @return a Flux of {@link ContinuablePage} identified by the given continuation token */ - private static > Flux

byPage(Supplier> provider, - C continuationToken, Integer pageSize) { + private Flux

byPage(Supplier> provider, C continuationToken, Integer pageSize) { return Flux.defer(() -> { final PageRetriever pageRetriever = provider.get(); final ContinuationState state = new ContinuationState<>(continuationToken); - return concatFluxOfPage(state, pageRetriever, pageSize); + return retrievePages(state, pageRetriever, pageSize); }); } @@ -156,13 +152,23 @@ private static > Flux

byPage(Supplier

the type of Continuation token - * @param The type of items in a {@link ContinuablePage} - * @param

The {@link ContinuablePage} holding items of type {@code T} * @return a Flux of {@link ContinuablePage} */ - private static > Flux

concatFluxOfPage(ContinuationState state, - PageRetriever pageRetriever, Integer pageSize) { + private Flux

retrievePages(ContinuationState state, PageRetriever pageRetriever, Integer pageSize) { + /* + * The second argument for 'expand' is an initial capacity hint to the expand subscriber to indicate what size + * buffer it should instantiate. 4 is used as PageRetriever's 'get' returns a Flux so an implementation may + * return multiple pages, but in the case only one page is retrieved the buffer won't need to be resized or + * request additional pages from the service. + */ + return retrievePage(state, pageRetriever, pageSize) + .expand(page -> { + state.setLastContinuationToken(page.getContinuationToken()); + return Flux.defer(() -> retrievePage(state, pageRetriever, pageSize)); + }, 4); + } + + private Flux

retrievePage(ContinuationState state, PageRetriever pageRetriever, Integer pageSize) { if (state.isDone()) { return Flux.empty(); } else { @@ -170,9 +176,7 @@ private static > Flux

concatFluxOfPage( .switchIfEmpty(Flux.defer(() -> { state.setLastContinuationToken(null); return Mono.empty(); - })) - .doOnNext(page -> state.setLastContinuationToken(page.getContinuationToken())) - .concatWith(Flux.defer(() -> concatFluxOfPage(state, pageRetriever, pageSize))); + })); } } }