From c40873eeeea13f1fb2c6dbe641d087b6a1e612a1 Mon Sep 17 00:00:00 2001 From: Dongie Agnir Date: Mon, 5 Jan 2026 15:17:24 -0800 Subject: [PATCH 1/4] Fix stackoverflow with large pages in paginator Previously, signalig onNext() to the subscriber was done via recursion, pulling elements from an iterator over the current page returned by the service. However, this can quickly lead to a stackoverflow error since the stack will grow linearly with the size of the page. This commit fixes this issue by using SdkPublisher's builtin flatMapIterable(), which uses a loop to signal onNext(), and also ensures that it does not call itself recursively. fixes #6411 --- .../bugfix-AWSSDKforJavav2-03a897a.json | 6 ++ .../paginators/AsyncResponseClassSpec.java | 27 +++-- ...nWithResultKeyAndMoreResultsPublisher.java | 14 +-- ...inatedOperationWithResultKeyPublisher.java | 13 +-- .../SameTokenPaginationApiPublisher.java | 17 +-- .../async/PaginatedItemsPublisher.java | 99 ----------------- .../async/PaginatedItemsPublisherTckTest.java | 91 ---------------- .../test/java/utils/SdkSubscriberTest.java | 102 ++++++++---------- .../paginators/paginators-1.json | 10 ++ .../paginators/service-2.json | 49 +++++++++ .../services/paginators/PaginatorsTest.java | 90 ++++++++++++++++ 11 files changed, 217 insertions(+), 301 deletions(-) create mode 100644 .changes/next-release/bugfix-AWSSDKforJavav2-03a897a.json delete mode 100644 core/sdk-core/src/main/java/software/amazon/awssdk/core/pagination/async/PaginatedItemsPublisher.java delete mode 100644 core/sdk-core/src/test/java/software/amazon/awssdk/core/pagination/async/PaginatedItemsPublisherTckTest.java create mode 100644 test/codegen-generated-classes-test/src/main/resources/codegen-resources/paginators/paginators-1.json create mode 100644 test/codegen-generated-classes-test/src/main/resources/codegen-resources/paginators/service-2.json create mode 100644 test/codegen-generated-classes-test/src/test/java/software/amazon/awssdk/services/paginators/PaginatorsTest.java diff --git a/.changes/next-release/bugfix-AWSSDKforJavav2-03a897a.json b/.changes/next-release/bugfix-AWSSDKforJavav2-03a897a.json new file mode 100644 index 000000000000..f98e0facec32 --- /dev/null +++ b/.changes/next-release/bugfix-AWSSDKforJavav2-03a897a.json @@ -0,0 +1,6 @@ +{ + "type": "bugfix", + "category": "AWS SDK for Java v2", + "contributor": "", + "description": "Fix an issue where `StackOverflowError` can occur when iterating over large pages from an async paginator. This can manifest as the publisher hanging/never reaching the end of the stream. Fixes [#6411](https://github.com/aws/aws-sdk-java-v2/issues/6411)." +} diff --git a/codegen/src/main/java/software/amazon/awssdk/codegen/poet/paginators/AsyncResponseClassSpec.java b/codegen/src/main/java/software/amazon/awssdk/codegen/poet/paginators/AsyncResponseClassSpec.java index 9ef71caa5597..0fc6f9bfa5a5 100644 --- a/codegen/src/main/java/software/amazon/awssdk/codegen/poet/paginators/AsyncResponseClassSpec.java +++ b/codegen/src/main/java/software/amazon/awssdk/codegen/poet/paginators/AsyncResponseClassSpec.java @@ -24,11 +24,9 @@ import com.squareup.javapoet.TypeSpec; import com.squareup.javapoet.WildcardTypeName; import java.util.Collections; -import java.util.Iterator; import java.util.Objects; import java.util.concurrent.CompletableFuture; import java.util.function.Consumer; -import java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.Stream; import javax.lang.model.element.Modifier; @@ -40,7 +38,6 @@ import software.amazon.awssdk.codegen.poet.PoetUtils; import software.amazon.awssdk.core.async.SdkPublisher; import software.amazon.awssdk.core.pagination.async.AsyncPageFetcher; -import software.amazon.awssdk.core.pagination.async.PaginatedItemsPublisher; import software.amazon.awssdk.core.pagination.async.ResponsesSubscription; /** @@ -200,21 +197,23 @@ private MethodSpec getMethodsSpecForSingleResultKey(String resultKey) { return null; } + String fluentGetter = fluentGetterMethodForResponseMember(resultKey); + + CodeBlock.Builder iterableFnBuilder = CodeBlock.builder() + .add("$1N -> $1N.$2L", RESPONSE_LITERAL, fluentGetter); + + String fluentGetterMethodName = resultKeyModel.getFluentGetterMethodName(); + + if (resultKeyModel.isMap()) { + iterableFnBuilder.add(".entrySet()"); + } + TypeName resultKeyType = getTypeForResultKey(resultKey); - return MethodSpec.methodBuilder(resultKeyModel.getFluentGetterMethodName()) + return MethodSpec.methodBuilder(fluentGetterMethodName) .addModifiers(Modifier.PUBLIC, Modifier.FINAL) .returns(ParameterizedTypeName.get(ClassName.get(SdkPublisher.class), resultKeyType)) - .addCode("$T getIterator = ", - ParameterizedTypeName.get(ClassName.get(Function.class), - responseType(), - ParameterizedTypeName.get(ClassName.get(Iterator.class), - resultKeyType))) - .addCode(getIteratorLambdaBlock(resultKey, resultKeyModel)) - .addCode("\n") - .addStatement("return $1T.builder().$2L(new $3L()).iteratorFunction(getIterator).$4L($4L).build()", - PaginatedItemsPublisher.class, NEXT_PAGE_FETCHER_MEMBER, nextPageFetcherClassName(), - LAST_PAGE_FIELD) + .addStatement("return this.flatMapIterable($L)", iterableFnBuilder.build()) .addJavadoc(CodeBlock.builder() .add("Returns a publisher that can be used to get a stream of data. You need to " + "subscribe to the publisher to request the stream of data. The publisher " diff --git a/codegen/src/test/resources/software/amazon/awssdk/codegen/poet/paginators/PaginatedOperationWithResultKeyAndMoreResultsPublisher.java b/codegen/src/test/resources/software/amazon/awssdk/codegen/poet/paginators/PaginatedOperationWithResultKeyAndMoreResultsPublisher.java index 19d36c210d7f..384ae8b0342b 100644 --- a/codegen/src/test/resources/software/amazon/awssdk/codegen/poet/paginators/PaginatedOperationWithResultKeyAndMoreResultsPublisher.java +++ b/codegen/src/test/resources/software/amazon/awssdk/codegen/poet/paginators/PaginatedOperationWithResultKeyAndMoreResultsPublisher.java @@ -1,14 +1,10 @@ package software.amazon.awssdk.services.jsonprotocoltests.paginators; -import java.util.Collections; -import java.util.Iterator; import java.util.concurrent.CompletableFuture; -import java.util.function.Function; import org.reactivestreams.Subscriber; import software.amazon.awssdk.annotations.Generated; import software.amazon.awssdk.core.async.SdkPublisher; import software.amazon.awssdk.core.pagination.async.AsyncPageFetcher; -import software.amazon.awssdk.core.pagination.async.PaginatedItemsPublisher; import software.amazon.awssdk.core.pagination.async.ResponsesSubscription; import software.amazon.awssdk.services.jsonprotocoltests.JsonProtocolTestsAsyncClient; import software.amazon.awssdk.services.jsonprotocoltests.internal.UserAgentUtils; @@ -107,15 +103,7 @@ public void subscribe(Subscriber items() { - Function> getIterator = response -> { - if (response != null && response.items() != null) { - return response.items().iterator(); - } - return Collections.emptyIterator(); - }; - return PaginatedItemsPublisher.builder() - .nextPageFetcher(new PaginatedOperationWithResultKeyAndMoreResultsResponseFetcher()) - .iteratorFunction(getIterator).isLastPage(isLastPage).build(); + return this.flatMapIterable(response -> response.items()); } private class PaginatedOperationWithResultKeyAndMoreResultsResponseFetcher implements diff --git a/codegen/src/test/resources/software/amazon/awssdk/codegen/poet/paginators/PaginatedOperationWithResultKeyPublisher.java b/codegen/src/test/resources/software/amazon/awssdk/codegen/poet/paginators/PaginatedOperationWithResultKeyPublisher.java index 8a86197f3e86..717a80fbf88c 100644 --- a/codegen/src/test/resources/software/amazon/awssdk/codegen/poet/paginators/PaginatedOperationWithResultKeyPublisher.java +++ b/codegen/src/test/resources/software/amazon/awssdk/codegen/poet/paginators/PaginatedOperationWithResultKeyPublisher.java @@ -1,14 +1,10 @@ package software.amazon.awssdk.services.jsonprotocoltests.paginators; -import java.util.Collections; -import java.util.Iterator; import java.util.concurrent.CompletableFuture; -import java.util.function.Function; import org.reactivestreams.Subscriber; import software.amazon.awssdk.annotations.Generated; import software.amazon.awssdk.core.async.SdkPublisher; import software.amazon.awssdk.core.pagination.async.AsyncPageFetcher; -import software.amazon.awssdk.core.pagination.async.PaginatedItemsPublisher; import software.amazon.awssdk.core.pagination.async.ResponsesSubscription; import software.amazon.awssdk.core.util.PaginatorUtils; import software.amazon.awssdk.services.jsonprotocoltests.JsonProtocolTestsAsyncClient; @@ -107,14 +103,7 @@ public void subscribe(Subscriber items() { - Function> getIterator = response -> { - if (response != null && response.items() != null) { - return response.items().iterator(); - } - return Collections.emptyIterator(); - }; - return PaginatedItemsPublisher.builder().nextPageFetcher(new PaginatedOperationWithResultKeyResponseFetcher()) - .iteratorFunction(getIterator).isLastPage(isLastPage).build(); + return this.flatMapIterable(response -> response.items()); } private class PaginatedOperationWithResultKeyResponseFetcher implements diff --git a/codegen/src/test/resources/software/amazon/awssdk/codegen/poet/paginators/customizations/SameTokenPaginationApiPublisher.java b/codegen/src/test/resources/software/amazon/awssdk/codegen/poet/paginators/customizations/SameTokenPaginationApiPublisher.java index 921ba3cfdd96..7e17cb44ee05 100644 --- a/codegen/src/test/resources/software/amazon/awssdk/codegen/poet/paginators/customizations/SameTokenPaginationApiPublisher.java +++ b/codegen/src/test/resources/software/amazon/awssdk/codegen/poet/paginators/customizations/SameTokenPaginationApiPublisher.java @@ -1,14 +1,10 @@ package software.amazon.awssdk.services.jsonprotocoltests.paginators; -import java.util.Collections; -import java.util.Iterator; import java.util.concurrent.CompletableFuture; -import java.util.function.Function; import org.reactivestreams.Subscriber; import software.amazon.awssdk.annotations.Generated; import software.amazon.awssdk.core.async.SdkPublisher; import software.amazon.awssdk.core.pagination.async.AsyncPageFetcher; -import software.amazon.awssdk.core.pagination.async.PaginatedItemsPublisher; import software.amazon.awssdk.core.pagination.async.ResponsesSubscription; import software.amazon.awssdk.core.util.PaginatorUtils; import software.amazon.awssdk.services.jsonprotocoltests.JsonProtocolTestsAsyncClient; @@ -85,7 +81,7 @@ public SameTokenPaginationApiPublisher(JsonProtocolTestsAsyncClient client, Same } private SameTokenPaginationApiPublisher(JsonProtocolTestsAsyncClient client, SameTokenPaginationApiRequest firstRequest, - boolean isLastPage) { + boolean isLastPage) { this.client = client; this.firstRequest = firstRequest; this.isLastPage = isLastPage; @@ -94,7 +90,7 @@ private SameTokenPaginationApiPublisher(JsonProtocolTestsAsyncClient client, Sam @Override public void subscribe(Subscriber subscriber) { subscriber.onSubscribe(ResponsesSubscription.builder().subscriber(subscriber) - .nextPageFetcher(new SameTokenPaginationApiResponseFetcher()).build()); + .nextPageFetcher(new SameTokenPaginationApiResponseFetcher()).build()); } /** @@ -103,14 +99,7 @@ public void subscribe(Subscriber subscri * and then applies that consumer to each response returned by the service. */ public final SdkPublisher items() { - Function> getIterator = response -> { - if (response != null && response.items() != null) { - return response.items().iterator(); - } - return Collections.emptyIterator(); - }; - return PaginatedItemsPublisher.builder().nextPageFetcher(new SameTokenPaginationApiResponseFetcher()) - .iteratorFunction(getIterator).isLastPage(isLastPage).build(); + return this.flatMapIterable(response -> response.items()); } private class SameTokenPaginationApiResponseFetcher implements AsyncPageFetcher { diff --git a/core/sdk-core/src/main/java/software/amazon/awssdk/core/pagination/async/PaginatedItemsPublisher.java b/core/sdk-core/src/main/java/software/amazon/awssdk/core/pagination/async/PaginatedItemsPublisher.java deleted file mode 100644 index dea160c65b96..000000000000 --- a/core/sdk-core/src/main/java/software/amazon/awssdk/core/pagination/async/PaginatedItemsPublisher.java +++ /dev/null @@ -1,99 +0,0 @@ -/* - * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"). - * You may not use this file except in compliance with the License. - * A copy of the License is located at - * - * http://aws.amazon.com/apache2.0 - * - * or in the "license" file accompanying this file. This file is distributed - * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either - * express or implied. See the License for the specific language governing - * permissions and limitations under the License. - */ - -package software.amazon.awssdk.core.pagination.async; - -import java.util.Iterator; -import java.util.function.Function; -import org.reactivestreams.Subscriber; -import software.amazon.awssdk.annotations.SdkProtectedApi; -import software.amazon.awssdk.core.async.SdkPublisher; -import software.amazon.awssdk.core.internal.pagination.async.ItemsSubscription; - -/** - * A publisher to request for a stream of paginated items. The class can be used to request data for paginated items - * across multiple pages. - * - * @param The type of a single response page - * @param The type of paginated member in a response page - */ -@SdkProtectedApi -public final class PaginatedItemsPublisher implements SdkPublisher { - - private final AsyncPageFetcher nextPageFetcher; - - private final Function> getIteratorFunction; - - private final boolean isLastPage; - - private PaginatedItemsPublisher(BuilderImpl builder) { - this.nextPageFetcher = builder.nextPageFetcher; - this.getIteratorFunction = builder.iteratorFunction; - this.isLastPage = builder.isLastPage; - } - - public static Builder builder() { - return new BuilderImpl(); - } - - @Override - public void subscribe(Subscriber subscriber) { - subscriber.onSubscribe(isLastPage ? new EmptySubscription(subscriber) - : ItemsSubscription.builder() - .subscriber(subscriber) - .nextPageFetcher(nextPageFetcher) - .iteratorFunction(getIteratorFunction) - .build()); - } - - public interface Builder { - Builder nextPageFetcher(AsyncPageFetcher nextPageFetcher); - - Builder iteratorFunction(Function iteratorFunction); - - Builder isLastPage(boolean isLastPage); - - PaginatedItemsPublisher build(); - } - - private static final class BuilderImpl implements Builder { - private AsyncPageFetcher nextPageFetcher; - private Function iteratorFunction; - private boolean isLastPage; - - @Override - public Builder nextPageFetcher(AsyncPageFetcher nextPageFetcher) { - this.nextPageFetcher = nextPageFetcher; - return this; - } - - @Override - public Builder iteratorFunction(Function iteratorFunction) { - this.iteratorFunction = iteratorFunction; - return this; - } - - @Override - public Builder isLastPage(boolean isLastPage) { - this.isLastPage = isLastPage; - return this; - } - - @Override - public PaginatedItemsPublisher build() { - return new PaginatedItemsPublisher(this); - } - } -} diff --git a/core/sdk-core/src/test/java/software/amazon/awssdk/core/pagination/async/PaginatedItemsPublisherTckTest.java b/core/sdk-core/src/test/java/software/amazon/awssdk/core/pagination/async/PaginatedItemsPublisherTckTest.java deleted file mode 100644 index 80df7e5ff018..000000000000 --- a/core/sdk-core/src/test/java/software/amazon/awssdk/core/pagination/async/PaginatedItemsPublisherTckTest.java +++ /dev/null @@ -1,91 +0,0 @@ -/* - * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"). - * You may not use this file except in compliance with the License. - * A copy of the License is located at - * - * http://aws.amazon.com/apache2.0 - * - * or in the "license" file accompanying this file. This file is distributed - * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either - * express or implied. See the License for the specific language governing - * permissions and limitations under the License. - */ - -package software.amazon.awssdk.core.pagination.async; - -import java.util.Collections; -import java.util.Iterator; -import java.util.List; -import java.util.concurrent.CompletableFuture; -import java.util.function.Function; -import java.util.stream.Collectors; -import java.util.stream.LongStream; -import org.reactivestreams.Publisher; -import org.reactivestreams.tck.PublisherVerification; -import org.reactivestreams.tck.TestEnvironment; - -/** - * TCK verification test for {@link PaginatedItemsPublisher}. - */ -public class PaginatedItemsPublisherTckTest extends PublisherVerification { - - public PaginatedItemsPublisherTckTest() { - super(new TestEnvironment()); - } - - @Override - public Publisher createPublisher(long l) { - Function, Iterator> getIterator = response -> response != null ? response.iterator() - : Collections.emptyIterator(); - - return PaginatedItemsPublisher.builder() - .nextPageFetcher(new PageFetcher(l, 5)) - .iteratorFunction(getIterator) - .isLastPage(false) - .build(); - } - - @Override - public Publisher createFailedPublisher() { - // It's not possible to initialize PaginatedItemsPublisher to a failed - // state since we can only reach a failed state if we fail to fulfill a - // request, e.g. because the service returned an error response. - - // return null to skip related tests - return null; - } - - /** - * Simple {@link AsyncPageFetcher} that returns lists of longs as pages. - */ - private static class PageFetcher implements AsyncPageFetcher> { - private final long maxVal; - private final long step; - - private PageFetcher(long maxVal, long step) { - this.maxVal = maxVal; - this.step = step; - } - - @Override - public boolean hasNextPage(List oldPage) { - return (lastElement(oldPage)) < maxVal - 1; - } - - @Override - public CompletableFuture> nextPage(List oldPage) { - long i = lastElement(oldPage) + 1; - long j = Math.min(i + step, maxVal); - List stream = LongStream.range(i, j).boxed().collect(Collectors.toList()); - return CompletableFuture.completedFuture(stream); - } - - private long lastElement(List s) { - // first page is always null - if (s == null) return -1; - return s.get(s.size() - 1); - } - } -} diff --git a/core/sdk-core/src/test/java/utils/SdkSubscriberTest.java b/core/sdk-core/src/test/java/utils/SdkSubscriberTest.java index 0f46f4fdfa19..f4dbe3ef3014 100644 --- a/core/sdk-core/src/test/java/utils/SdkSubscriberTest.java +++ b/core/sdk-core/src/test/java/utils/SdkSubscriberTest.java @@ -15,6 +15,7 @@ package utils; +import io.reactivex.Flowable; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; @@ -22,8 +23,7 @@ import org.mockito.Mockito; import org.mockito.junit.MockitoJUnitRunner; import org.reactivestreams.Subscriber; -import software.amazon.awssdk.core.pagination.async.AsyncPageFetcher; -import software.amazon.awssdk.core.pagination.async.PaginatedItemsPublisher; +import software.amazon.awssdk.core.async.SdkPublisher; import software.amazon.awssdk.utils.async.LimitingSubscriber; import software.amazon.awssdk.utils.internal.async.EmptySubscription; @@ -31,7 +31,6 @@ import java.util.Arrays; import java.util.Iterator; import java.util.List; -import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -48,96 +47,83 @@ public class SdkSubscriberTest { public static final Function> SAMPLE_ITERATOR = response -> Arrays.asList(1, 2, 3, 4, 5, 6).listIterator(); public static final Function> EMPTY_ITERATOR = response -> new ArrayList().listIterator(); - @Mock - AsyncPageFetcher asyncPageFetcher; - PaginatedItemsPublisher itemsPublisher; @Mock Subscriber mockSubscriber; + private SdkPublisher sdkPublisher; + @Before public void setUp() { - doReturn(CompletableFuture.completedFuture(1)) - .when(asyncPageFetcher).nextPage(null); - doReturn(false) - .when(asyncPageFetcher).hasNextPage(any()); + sdkPublisher = SdkPublisher.adapt(Flowable.just(1, 2, 3, 4, 5, 6)); } @Test public void limitingSubscriber_with_different_limits() throws InterruptedException, ExecutionException, TimeoutException { - itemsPublisher = PaginatedItemsPublisher.builder().nextPageFetcher(asyncPageFetcher) - .iteratorFunction(SAMPLE_ITERATOR).isLastPage(false).build(); + List belowLimit = new ArrayList<>(); + sdkPublisher.limit(3).subscribe(belowLimit::add).get(5, TimeUnit.SECONDS); + assertThat(belowLimit).containsExactly(1, 2, 3); - final List belowLimit = new ArrayList<>(); - itemsPublisher.limit(3).subscribe(e -> belowLimit.add(e)).get(5, TimeUnit.SECONDS); - assertThat(belowLimit).isEqualTo(Arrays.asList(1, 2, 3)); + List beyondLimit = new ArrayList<>(); + sdkPublisher.limit(33).subscribe(beyondLimit::add).get(5, TimeUnit.SECONDS); + assertThat(beyondLimit).containsExactly(1, 2, 3, 4, 5, 6); - final List beyondLimit = new ArrayList<>(); - itemsPublisher.limit(33).subscribe(e -> beyondLimit.add(e)).get(5, TimeUnit.SECONDS); - assertThat(beyondLimit).isEqualTo(Arrays.asList(1, 2, 3, 4, 5, 6)); - - final List zeroLimit = new ArrayList<>(); - itemsPublisher.limit(0).subscribe(e -> zeroLimit.add(e)).get(5, TimeUnit.SECONDS); - assertThat(zeroLimit).isEqualTo(Arrays.asList()); + List zeroLimit = new ArrayList<>(); + sdkPublisher.limit(0).subscribe(zeroLimit::add).get(5, TimeUnit.SECONDS); + assertThat(zeroLimit).isEmpty(); } @Test public void filteringSubscriber_with_different_filters() throws InterruptedException, ExecutionException, TimeoutException { - itemsPublisher = PaginatedItemsPublisher.builder().nextPageFetcher(asyncPageFetcher) - .iteratorFunction(SAMPLE_ITERATOR).isLastPage(false).build(); - - final List filteredSomeList = new ArrayList<>(); - itemsPublisher.filter(i -> i % 2 == 0).subscribe(e -> filteredSomeList.add(e)).get(5, TimeUnit.SECONDS); - assertThat(filteredSomeList).isEqualTo(Arrays.asList(2, 4, 6)); - final List filteredAllList = new ArrayList<>(); - itemsPublisher.filter(i -> i % 10 == 0).subscribe(e -> filteredAllList.add(e)).get(5, TimeUnit.SECONDS); - assertThat(filteredAllList).isEqualTo(Arrays.asList()); + List filteredSomeList = new ArrayList<>(); + sdkPublisher.filter(i -> i % 2 == 0).subscribe(filteredSomeList::add).get(5, TimeUnit.SECONDS); + assertThat(filteredSomeList).containsExactly(2, 4, 6); - final List filteredNone = new ArrayList<>(); - itemsPublisher.filter(i -> i % 1 == 0).subscribe(e -> filteredNone.add(e)).get(5, TimeUnit.SECONDS); - assertThat(filteredNone).isEqualTo(Arrays.asList(1, 2, 3, 4, 5, 6)); + List filteredAllList = new ArrayList<>(); + sdkPublisher.filter(i -> i % 10 == 0).subscribe(filteredAllList::add).get(5, TimeUnit.SECONDS); + assertThat(filteredAllList).isEmpty(); + List filteredNone = new ArrayList<>(); + sdkPublisher.filter(i -> i % 1 == 0).subscribe(filteredNone::add).get(5, TimeUnit.SECONDS); + assertThat(filteredNone).containsExactly(1, 2, 3, 4, 5, 6); } @Test public void limit_and_filter_subscriber_chained_with_different_conditions() throws InterruptedException, ExecutionException, TimeoutException { - itemsPublisher = PaginatedItemsPublisher.builder().nextPageFetcher(asyncPageFetcher) - .iteratorFunction(SAMPLE_ITERATOR).isLastPage(false).build(); - final List belowLimitWithFiltering = new ArrayList<>(); - itemsPublisher.limit(4).filter(i -> i % 2 == 0).subscribe(e -> belowLimitWithFiltering.add(e)).get(5, TimeUnit.SECONDS); - assertThat(belowLimitWithFiltering).isEqualTo(Arrays.asList(2, 4)); + List belowLimitWithFiltering = new ArrayList<>(); + sdkPublisher.limit(4).filter(i -> i % 2 == 0).subscribe(belowLimitWithFiltering::add).get(5, TimeUnit.SECONDS); + assertThat(belowLimitWithFiltering).containsExactly(2, 4); - final List beyondLimitWithAllFiltering = new ArrayList<>(); - itemsPublisher.limit(33).filter(i -> i % 10 == 0).subscribe(e -> beyondLimitWithAllFiltering.add(e)).get(5, TimeUnit.SECONDS); - assertThat(beyondLimitWithAllFiltering).isEqualTo(Arrays.asList()); + List beyondLimitWithAllFiltering = new ArrayList<>(); + sdkPublisher.limit(33).filter(i -> i % 10 == 0).subscribe(beyondLimitWithAllFiltering::add).get(5, TimeUnit.SECONDS); + assertThat(beyondLimitWithAllFiltering).isEmpty(); - final List zeroLimitAndNoFilter = new ArrayList<>(); - itemsPublisher.limit(0).filter(i -> i % 1 == 0).subscribe(e -> zeroLimitAndNoFilter.add(e)).get(5, TimeUnit.SECONDS); - assertThat(zeroLimitAndNoFilter).isEqualTo(Arrays.asList()); + List zeroLimitAndNoFilter = new ArrayList<>(); + sdkPublisher.limit(0).filter(i -> i % 1 == 0).subscribe(zeroLimitAndNoFilter::add).get(5, TimeUnit.SECONDS); + assertThat(zeroLimitAndNoFilter).isEmpty(); - final List filteringbelowLimitWith = new ArrayList<>(); - itemsPublisher.filter(i -> i % 2 == 0).limit(2).subscribe(e -> filteringbelowLimitWith.add(e)).get(5, TimeUnit.SECONDS); - assertThat(filteringbelowLimitWith).isEqualTo(Arrays.asList(2, 4)); + List filteringbelowLimitWith = new ArrayList<>(); + sdkPublisher.filter(i -> i % 2 == 0).limit(2).subscribe(filteringbelowLimitWith::add).get(5, TimeUnit.SECONDS); + assertThat(filteringbelowLimitWith).containsExactly(2, 4); - final List filteringAndOutsideLimit = new ArrayList<>(); - itemsPublisher.filter(i -> i % 10 == 0).limit(33).subscribe(e -> filteringAndOutsideLimit.add(e)).get(5, TimeUnit.SECONDS); - assertThat(filteringAndOutsideLimit).isEqualTo(Arrays.asList()); + List filteringAndOutsideLimit = new ArrayList<>(); + sdkPublisher.filter(i -> i % 10 == 0).limit(33).subscribe(filteringAndOutsideLimit::add).get(5, TimeUnit.SECONDS); + assertThat(filteringAndOutsideLimit).isEmpty(); } @Test public void limit__subscriber_with_empty_input_and_zero_limit() throws Exception { - itemsPublisher = PaginatedItemsPublisher.builder().nextPageFetcher(asyncPageFetcher) - .iteratorFunction(EMPTY_ITERATOR).isLastPage(false).build(); + sdkPublisher = SdkPublisher.adapt(Flowable.empty()); - final List zeroLimit = new ArrayList<>(); - itemsPublisher.limit(0).subscribe(e -> zeroLimit.add(e)).get(5, TimeUnit.SECONDS); - assertThat(zeroLimit).isEqualTo(Arrays.asList()); + List zeroLimit = new ArrayList<>(); + sdkPublisher.limit(0).subscribe(zeroLimit::add).get(5, TimeUnit.SECONDS); + assertThat(zeroLimit).isEmpty(); List nonZeroLimit = new ArrayList<>(); - itemsPublisher.limit(10).subscribe(e -> nonZeroLimit.add(e)).get(5, TimeUnit.SECONDS); - assertThat(zeroLimit).isEqualTo(Arrays.asList()); + sdkPublisher.limit(10).subscribe(nonZeroLimit::add).get(5, TimeUnit.SECONDS); + assertThat(zeroLimit).isEmpty(); } diff --git a/test/codegen-generated-classes-test/src/main/resources/codegen-resources/paginators/paginators-1.json b/test/codegen-generated-classes-test/src/main/resources/codegen-resources/paginators/paginators-1.json new file mode 100644 index 000000000000..e0d22dba7dd6 --- /dev/null +++ b/test/codegen-generated-classes-test/src/main/resources/codegen-resources/paginators/paginators-1.json @@ -0,0 +1,10 @@ +{ + "pagination": { + "ListStrings": { + "input_token": "NextToken", + "limit_key": "MaxResults", + "output_token": "NextToken", + "result_key": "Strings" + } + } +} \ No newline at end of file diff --git a/test/codegen-generated-classes-test/src/main/resources/codegen-resources/paginators/service-2.json b/test/codegen-generated-classes-test/src/main/resources/codegen-resources/paginators/service-2.json new file mode 100644 index 000000000000..11247132b112 --- /dev/null +++ b/test/codegen-generated-classes-test/src/main/resources/codegen-resources/paginators/service-2.json @@ -0,0 +1,49 @@ +{ + "version":"2.0", + "metadata":{ + "apiVersion":"2016-03-11", + "endpointPrefix":"paginators", + "jsonVersion":"1.1", + "protocol":"rest-json", + "serviceAbbreviation":"Paginators", + "serviceFullName":"Paginators", + "serviceId":"Paginators", + "signatureVersion":"v4", + "targetPrefix":"Paginators", + "timestampFormat":"unixTimestamp", + "uid":"paginators-2016-03-11" + }, + "operations":{ + "ListStrings":{ + "name":"ListStrings", + "http":{ + "method":"POST", + "requestUri":"/2016-03-11/listStrings" + }, + "input":{"shape":"ListStringsRequest"}, + "output":{"shape":"ListStringsResponse"} + } + }, + "shapes":{ + "ListStringsRequest":{ + "type":"structure", + "members": { + "NextToken": {"shape":"String"}, + "MaxResults": {"shape":"Integer"} + } + }, + "ListStringsResponse":{ + "type": "structure", + "members": { + "Strings": {"shape": "ListOfStrings"}, + "NextToken": {"shape": "String"} + } + }, + "Integer":{"type":"integer"}, + "ListOfStrings":{ + "type":"list", + "member":{"shape":"String"} + }, + "String":{"type":"string"} + } +} \ No newline at end of file diff --git a/test/codegen-generated-classes-test/src/test/java/software/amazon/awssdk/services/paginators/PaginatorsTest.java b/test/codegen-generated-classes-test/src/test/java/software/amazon/awssdk/services/paginators/PaginatorsTest.java new file mode 100644 index 000000000000..ccef25a956d9 --- /dev/null +++ b/test/codegen-generated-classes-test/src/test/java/software/amazon/awssdk/services/paginators/PaginatorsTest.java @@ -0,0 +1,90 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package software.amazon.awssdk.services.paginators; + +import static com.github.tomakehurst.wiremock.client.WireMock.aResponse; +import static com.github.tomakehurst.wiremock.client.WireMock.post; +import static com.github.tomakehurst.wiremock.client.WireMock.urlEqualTo; +import static org.assertj.core.api.Assertions.assertThat; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ArrayNode; +import com.fasterxml.jackson.databind.node.ObjectNode; +import com.github.tomakehurst.wiremock.WireMockServer; +import com.github.tomakehurst.wiremock.core.WireMockConfiguration; +import io.reactivex.Flowable; +import java.net.URI; +import java.util.concurrent.TimeUnit; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; +import software.amazon.awssdk.auth.credentials.AnonymousCredentialsProvider; +import software.amazon.awssdk.regions.Region; +import software.amazon.awssdk.services.paginators.model.ListStringsRequest; +import software.amazon.awssdk.services.paginators.paginators.ListStringsPublisher; + +public class PaginatorsTest { + private static final WireMockServer wireMock = new WireMockServer(WireMockConfiguration.wireMockConfig().dynamicPort()); + private static final ObjectMapper mapper = new ObjectMapper(); + + private static PaginatorsAsyncClient client; + + @BeforeAll + static void setup() { + wireMock.start(); + + client = PaginatorsAsyncClient.builder() + .region(Region.US_WEST_2) + .endpointOverride(URI.create("http://localhost:" + wireMock.port())) + .credentialsProvider(AnonymousCredentialsProvider.create()) + .build(); + } + + @AfterAll + static void teardown() { + client.close(); + wireMock.stop(); + } + + @Test + @Timeout(value = 15, unit = TimeUnit.SECONDS) + void listStrings_largePage_succeeds() { + int nItems = 10_000; + wireMock.stubFor(post(urlEqualTo("/2016-03-11/listStrings")) + .willReturn(aResponse().withStatus(200) + .withJsonBody(createScanResponse(nItems)))); + + ListStringsPublisher publisher = client.listStringsPaginator(ListStringsRequest.builder().build()); + + Long itemsSeen = Flowable.fromPublisher(publisher.strings()).count().blockingGet(); + assertThat(itemsSeen).isEqualTo(nItems); + } + + private static JsonNode createScanResponse(int nItems) { + ObjectNode resp = mapper.createObjectNode(); + + ArrayNode strings = mapper.createArrayNode(); + for (int i = 0; i < nItems; i++) { + strings.add(mapper.valueToTree(Integer.toString(i))); + } + + resp.set("Strings", strings); + + return resp; + } +} From d6c80d22f7763598bd922eecd2c5e546670a8a4f Mon Sep 17 00:00:00 2001 From: Dongie Agnir Date: Mon, 12 Jan 2026 10:53:42 -0800 Subject: [PATCH 2/4] Reintroduce the protected API for backcompat --- .../async/PaginatedItemsPublisher.java | 99 +++++++++++++++++++ .../async/PaginatedItemsPublisherTckTest.java | 91 +++++++++++++++++ 2 files changed, 190 insertions(+) create mode 100644 core/sdk-core/src/main/java/software/amazon/awssdk/core/pagination/async/PaginatedItemsPublisher.java create mode 100644 core/sdk-core/src/test/java/software/amazon/awssdk/core/pagination/async/PaginatedItemsPublisherTckTest.java diff --git a/core/sdk-core/src/main/java/software/amazon/awssdk/core/pagination/async/PaginatedItemsPublisher.java b/core/sdk-core/src/main/java/software/amazon/awssdk/core/pagination/async/PaginatedItemsPublisher.java new file mode 100644 index 000000000000..dea160c65b96 --- /dev/null +++ b/core/sdk-core/src/main/java/software/amazon/awssdk/core/pagination/async/PaginatedItemsPublisher.java @@ -0,0 +1,99 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package software.amazon.awssdk.core.pagination.async; + +import java.util.Iterator; +import java.util.function.Function; +import org.reactivestreams.Subscriber; +import software.amazon.awssdk.annotations.SdkProtectedApi; +import software.amazon.awssdk.core.async.SdkPublisher; +import software.amazon.awssdk.core.internal.pagination.async.ItemsSubscription; + +/** + * A publisher to request for a stream of paginated items. The class can be used to request data for paginated items + * across multiple pages. + * + * @param The type of a single response page + * @param The type of paginated member in a response page + */ +@SdkProtectedApi +public final class PaginatedItemsPublisher implements SdkPublisher { + + private final AsyncPageFetcher nextPageFetcher; + + private final Function> getIteratorFunction; + + private final boolean isLastPage; + + private PaginatedItemsPublisher(BuilderImpl builder) { + this.nextPageFetcher = builder.nextPageFetcher; + this.getIteratorFunction = builder.iteratorFunction; + this.isLastPage = builder.isLastPage; + } + + public static Builder builder() { + return new BuilderImpl(); + } + + @Override + public void subscribe(Subscriber subscriber) { + subscriber.onSubscribe(isLastPage ? new EmptySubscription(subscriber) + : ItemsSubscription.builder() + .subscriber(subscriber) + .nextPageFetcher(nextPageFetcher) + .iteratorFunction(getIteratorFunction) + .build()); + } + + public interface Builder { + Builder nextPageFetcher(AsyncPageFetcher nextPageFetcher); + + Builder iteratorFunction(Function iteratorFunction); + + Builder isLastPage(boolean isLastPage); + + PaginatedItemsPublisher build(); + } + + private static final class BuilderImpl implements Builder { + private AsyncPageFetcher nextPageFetcher; + private Function iteratorFunction; + private boolean isLastPage; + + @Override + public Builder nextPageFetcher(AsyncPageFetcher nextPageFetcher) { + this.nextPageFetcher = nextPageFetcher; + return this; + } + + @Override + public Builder iteratorFunction(Function iteratorFunction) { + this.iteratorFunction = iteratorFunction; + return this; + } + + @Override + public Builder isLastPage(boolean isLastPage) { + this.isLastPage = isLastPage; + return this; + } + + @Override + public PaginatedItemsPublisher build() { + return new PaginatedItemsPublisher(this); + } + } +} diff --git a/core/sdk-core/src/test/java/software/amazon/awssdk/core/pagination/async/PaginatedItemsPublisherTckTest.java b/core/sdk-core/src/test/java/software/amazon/awssdk/core/pagination/async/PaginatedItemsPublisherTckTest.java new file mode 100644 index 000000000000..80df7e5ff018 --- /dev/null +++ b/core/sdk-core/src/test/java/software/amazon/awssdk/core/pagination/async/PaginatedItemsPublisherTckTest.java @@ -0,0 +1,91 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package software.amazon.awssdk.core.pagination.async; + +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.function.Function; +import java.util.stream.Collectors; +import java.util.stream.LongStream; +import org.reactivestreams.Publisher; +import org.reactivestreams.tck.PublisherVerification; +import org.reactivestreams.tck.TestEnvironment; + +/** + * TCK verification test for {@link PaginatedItemsPublisher}. + */ +public class PaginatedItemsPublisherTckTest extends PublisherVerification { + + public PaginatedItemsPublisherTckTest() { + super(new TestEnvironment()); + } + + @Override + public Publisher createPublisher(long l) { + Function, Iterator> getIterator = response -> response != null ? response.iterator() + : Collections.emptyIterator(); + + return PaginatedItemsPublisher.builder() + .nextPageFetcher(new PageFetcher(l, 5)) + .iteratorFunction(getIterator) + .isLastPage(false) + .build(); + } + + @Override + public Publisher createFailedPublisher() { + // It's not possible to initialize PaginatedItemsPublisher to a failed + // state since we can only reach a failed state if we fail to fulfill a + // request, e.g. because the service returned an error response. + + // return null to skip related tests + return null; + } + + /** + * Simple {@link AsyncPageFetcher} that returns lists of longs as pages. + */ + private static class PageFetcher implements AsyncPageFetcher> { + private final long maxVal; + private final long step; + + private PageFetcher(long maxVal, long step) { + this.maxVal = maxVal; + this.step = step; + } + + @Override + public boolean hasNextPage(List oldPage) { + return (lastElement(oldPage)) < maxVal - 1; + } + + @Override + public CompletableFuture> nextPage(List oldPage) { + long i = lastElement(oldPage) + 1; + long j = Math.min(i + step, maxVal); + List stream = LongStream.range(i, j).boxed().collect(Collectors.toList()); + return CompletableFuture.completedFuture(stream); + } + + private long lastElement(List s) { + // first page is always null + if (s == null) return -1; + return s.get(s.size() - 1); + } + } +} From 4705f7f2fed1629a60950a281d4bdece04781135 Mon Sep 17 00:00:00 2001 From: Dongie Agnir Date: Mon, 12 Jan 2026 12:18:34 -0800 Subject: [PATCH 3/4] Review comments --- .../amazon/awssdk/services/paginators/PaginatorsTest.java | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/test/codegen-generated-classes-test/src/test/java/software/amazon/awssdk/services/paginators/PaginatorsTest.java b/test/codegen-generated-classes-test/src/test/java/software/amazon/awssdk/services/paginators/PaginatorsTest.java index ccef25a956d9..9c7a6f4a8b52 100644 --- a/test/codegen-generated-classes-test/src/test/java/software/amazon/awssdk/services/paginators/PaginatorsTest.java +++ b/test/codegen-generated-classes-test/src/test/java/software/amazon/awssdk/services/paginators/PaginatorsTest.java @@ -65,9 +65,8 @@ static void teardown() { @Timeout(value = 15, unit = TimeUnit.SECONDS) void listStrings_largePage_succeeds() { int nItems = 10_000; - wireMock.stubFor(post(urlEqualTo("/2016-03-11/listStrings")) - .willReturn(aResponse().withStatus(200) - .withJsonBody(createScanResponse(nItems)))); + wireMock.stubFor(post(urlEqualTo("/2016-03-11/listStrings")).willReturn( + aResponse().withStatus(200).withJsonBody(createResponse(nItems)))); ListStringsPublisher publisher = client.listStringsPaginator(ListStringsRequest.builder().build()); @@ -75,7 +74,7 @@ void listStrings_largePage_succeeds() { assertThat(itemsSeen).isEqualTo(nItems); } - private static JsonNode createScanResponse(int nItems) { + private static JsonNode createResponse(int nItems) { ObjectNode resp = mapper.createObjectNode(); ArrayNode strings = mapper.createArrayNode(); From 7366a7f5f38deaac59b03523b1ef2089935df7e5 Mon Sep 17 00:00:00 2001 From: Dongie Agnir Date: Mon, 12 Jan 2026 13:20:37 -0800 Subject: [PATCH 4/4] Deprecated item publisher --- .../awssdk/core/pagination/async/PaginatedItemsPublisher.java | 1 + 1 file changed, 1 insertion(+) diff --git a/core/sdk-core/src/main/java/software/amazon/awssdk/core/pagination/async/PaginatedItemsPublisher.java b/core/sdk-core/src/main/java/software/amazon/awssdk/core/pagination/async/PaginatedItemsPublisher.java index dea160c65b96..ee6d4c289ca1 100644 --- a/core/sdk-core/src/main/java/software/amazon/awssdk/core/pagination/async/PaginatedItemsPublisher.java +++ b/core/sdk-core/src/main/java/software/amazon/awssdk/core/pagination/async/PaginatedItemsPublisher.java @@ -30,6 +30,7 @@ * @param The type of paginated member in a response page */ @SdkProtectedApi +@Deprecated public final class PaginatedItemsPublisher implements SdkPublisher { private final AsyncPageFetcher nextPageFetcher;