Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changes/next-release/bugfix-AWSSDKforJavav2-03a897a.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{
"type": "bugfix",
"category": "AWS SDK for Java v2",
"contributor": "",
"description": "Fix an issue where `StackOverflowError` can occur when iterating over large pages from an async paginator. This can manifest as the publisher hanging/never reaching the end of the stream. Fixes [#6411](https://github.com/aws/aws-sdk-java-v2/issues/6411)."
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,9 @@
import com.squareup.javapoet.TypeSpec;
import com.squareup.javapoet.WildcardTypeName;
import java.util.Collections;
import java.util.Iterator;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.lang.model.element.Modifier;
Expand All @@ -40,7 +38,6 @@
import software.amazon.awssdk.codegen.poet.PoetUtils;
import software.amazon.awssdk.core.async.SdkPublisher;
import software.amazon.awssdk.core.pagination.async.AsyncPageFetcher;
import software.amazon.awssdk.core.pagination.async.PaginatedItemsPublisher;
import software.amazon.awssdk.core.pagination.async.ResponsesSubscription;

/**
Expand Down Expand Up @@ -200,21 +197,23 @@ private MethodSpec getMethodsSpecForSingleResultKey(String resultKey) {
return null;
}

String fluentGetter = fluentGetterMethodForResponseMember(resultKey);

CodeBlock.Builder iterableFnBuilder = CodeBlock.builder()
.add("$1N -> $1N.$2L", RESPONSE_LITERAL, fluentGetter);

String fluentGetterMethodName = resultKeyModel.getFluentGetterMethodName();

if (resultKeyModel.isMap()) {
iterableFnBuilder.add(".entrySet()");
}

TypeName resultKeyType = getTypeForResultKey(resultKey);

return MethodSpec.methodBuilder(resultKeyModel.getFluentGetterMethodName())
return MethodSpec.methodBuilder(fluentGetterMethodName)
.addModifiers(Modifier.PUBLIC, Modifier.FINAL)
.returns(ParameterizedTypeName.get(ClassName.get(SdkPublisher.class), resultKeyType))
.addCode("$T getIterator = ",
ParameterizedTypeName.get(ClassName.get(Function.class),
responseType(),
ParameterizedTypeName.get(ClassName.get(Iterator.class),
resultKeyType)))
.addCode(getIteratorLambdaBlock(resultKey, resultKeyModel))
.addCode("\n")
.addStatement("return $1T.builder().$2L(new $3L()).iteratorFunction(getIterator).$4L($4L).build()",
PaginatedItemsPublisher.class, NEXT_PAGE_FETCHER_MEMBER, nextPageFetcherClassName(),
LAST_PAGE_FIELD)
.addStatement("return this.flatMapIterable($L)", iterableFnBuilder.build())
.addJavadoc(CodeBlock.builder()
.add("Returns a publisher that can be used to get a stream of data. You need to "
+ "subscribe to the publisher to request the stream of data. The publisher "
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,10 @@
package software.amazon.awssdk.services.jsonprotocoltests.paginators;

import java.util.Collections;
import java.util.Iterator;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import org.reactivestreams.Subscriber;
import software.amazon.awssdk.annotations.Generated;
import software.amazon.awssdk.core.async.SdkPublisher;
import software.amazon.awssdk.core.pagination.async.AsyncPageFetcher;
import software.amazon.awssdk.core.pagination.async.PaginatedItemsPublisher;
import software.amazon.awssdk.core.pagination.async.ResponsesSubscription;
import software.amazon.awssdk.services.jsonprotocoltests.JsonProtocolTestsAsyncClient;
import software.amazon.awssdk.services.jsonprotocoltests.internal.UserAgentUtils;
Expand Down Expand Up @@ -107,15 +103,7 @@ public void subscribe(Subscriber<? super PaginatedOperationWithResultKeyAndMoreR
* and then applies that consumer to each response returned by the service.
*/
public final SdkPublisher<SimpleStruct> items() {
Function<PaginatedOperationWithResultKeyAndMoreResultsResponse, Iterator<SimpleStruct>> getIterator = response -> {
if (response != null && response.items() != null) {
return response.items().iterator();
}
return Collections.emptyIterator();
};
return PaginatedItemsPublisher.builder()
.nextPageFetcher(new PaginatedOperationWithResultKeyAndMoreResultsResponseFetcher())
.iteratorFunction(getIterator).isLastPage(isLastPage).build();
return this.flatMapIterable(response -> response.items());
}

private class PaginatedOperationWithResultKeyAndMoreResultsResponseFetcher implements
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,10 @@
package software.amazon.awssdk.services.jsonprotocoltests.paginators;

import java.util.Collections;
import java.util.Iterator;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import org.reactivestreams.Subscriber;
import software.amazon.awssdk.annotations.Generated;
import software.amazon.awssdk.core.async.SdkPublisher;
import software.amazon.awssdk.core.pagination.async.AsyncPageFetcher;
import software.amazon.awssdk.core.pagination.async.PaginatedItemsPublisher;
import software.amazon.awssdk.core.pagination.async.ResponsesSubscription;
import software.amazon.awssdk.core.util.PaginatorUtils;
import software.amazon.awssdk.services.jsonprotocoltests.JsonProtocolTestsAsyncClient;
Expand Down Expand Up @@ -107,14 +103,7 @@ public void subscribe(Subscriber<? super PaginatedOperationWithResultKeyResponse
* and then applies that consumer to each response returned by the service.
*/
public final SdkPublisher<SimpleStruct> items() {
Function<PaginatedOperationWithResultKeyResponse, Iterator<SimpleStruct>> getIterator = response -> {
if (response != null && response.items() != null) {
return response.items().iterator();
}
return Collections.emptyIterator();
};
return PaginatedItemsPublisher.builder().nextPageFetcher(new PaginatedOperationWithResultKeyResponseFetcher())
.iteratorFunction(getIterator).isLastPage(isLastPage).build();
return this.flatMapIterable(response -> response.items());
}

private class PaginatedOperationWithResultKeyResponseFetcher implements
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,10 @@
package software.amazon.awssdk.services.jsonprotocoltests.paginators;

import java.util.Collections;
import java.util.Iterator;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import org.reactivestreams.Subscriber;
import software.amazon.awssdk.annotations.Generated;
import software.amazon.awssdk.core.async.SdkPublisher;
import software.amazon.awssdk.core.pagination.async.AsyncPageFetcher;
import software.amazon.awssdk.core.pagination.async.PaginatedItemsPublisher;
import software.amazon.awssdk.core.pagination.async.ResponsesSubscription;
import software.amazon.awssdk.core.util.PaginatorUtils;
import software.amazon.awssdk.services.jsonprotocoltests.JsonProtocolTestsAsyncClient;
Expand Down Expand Up @@ -85,7 +81,7 @@ public SameTokenPaginationApiPublisher(JsonProtocolTestsAsyncClient client, Same
}

private SameTokenPaginationApiPublisher(JsonProtocolTestsAsyncClient client, SameTokenPaginationApiRequest firstRequest,
boolean isLastPage) {
boolean isLastPage) {
this.client = client;
this.firstRequest = firstRequest;
this.isLastPage = isLastPage;
Expand All @@ -94,7 +90,7 @@ private SameTokenPaginationApiPublisher(JsonProtocolTestsAsyncClient client, Sam
@Override
public void subscribe(Subscriber<? super SameTokenPaginationApiResponse> subscriber) {
subscriber.onSubscribe(ResponsesSubscription.builder().subscriber(subscriber)
.nextPageFetcher(new SameTokenPaginationApiResponseFetcher()).build());
.nextPageFetcher(new SameTokenPaginationApiResponseFetcher()).build());
}

/**
Expand All @@ -103,14 +99,7 @@ public void subscribe(Subscriber<? super SameTokenPaginationApiResponse> subscri
* and then applies that consumer to each response returned by the service.
*/
public final SdkPublisher<SimpleStruct> items() {
Function<SameTokenPaginationApiResponse, Iterator<SimpleStruct>> getIterator = response -> {
if (response != null && response.items() != null) {
return response.items().iterator();
}
return Collections.emptyIterator();
};
return PaginatedItemsPublisher.builder().nextPageFetcher(new SameTokenPaginationApiResponseFetcher())
.iteratorFunction(getIterator).isLastPage(isLastPage).build();
return this.flatMapIterable(response -> response.items());
}

private class SameTokenPaginationApiResponseFetcher implements AsyncPageFetcher<SameTokenPaginationApiResponse> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
* @param <ItemT> The type of paginated member in a response page
*/
@SdkProtectedApi
@Deprecated
public final class PaginatedItemsPublisher<ResponseT, ItemT> implements SdkPublisher<ItemT> {

private final AsyncPageFetcher<ResponseT> nextPageFetcher;
Expand Down
102 changes: 44 additions & 58 deletions core/sdk-core/src/test/java/utils/SdkSubscriberTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,23 +15,22 @@

package utils;

import io.reactivex.Flowable;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;
import org.reactivestreams.Subscriber;
import software.amazon.awssdk.core.pagination.async.AsyncPageFetcher;
import software.amazon.awssdk.core.pagination.async.PaginatedItemsPublisher;
import software.amazon.awssdk.core.async.SdkPublisher;
import software.amazon.awssdk.utils.async.LimitingSubscriber;
import software.amazon.awssdk.utils.internal.async.EmptySubscription;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
Expand All @@ -48,96 +47,83 @@ public class SdkSubscriberTest {

public static final Function<Integer, Iterator<Integer>> SAMPLE_ITERATOR = response -> Arrays.asList(1, 2, 3, 4, 5, 6).listIterator();
public static final Function<Integer, Iterator<Integer>> EMPTY_ITERATOR = response -> new ArrayList<Integer>().listIterator();
@Mock
AsyncPageFetcher asyncPageFetcher;
PaginatedItemsPublisher<Integer, Integer> itemsPublisher;
Copy link
Contributor Author

@dagnir dagnir Jan 12, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note: this test is just testing features of SdkPublisher such as limit(), not PaginatedItemsPublisher itself, so just removed this class entirely


@Mock
Subscriber<Integer> mockSubscriber;

private SdkPublisher<Integer> sdkPublisher;

@Before
public void setUp() {
doReturn(CompletableFuture.completedFuture(1))
.when(asyncPageFetcher).nextPage(null);
doReturn(false)
.when(asyncPageFetcher).hasNextPage(any());
sdkPublisher = SdkPublisher.adapt(Flowable.just(1, 2, 3, 4, 5, 6));
}

@Test
public void limitingSubscriber_with_different_limits() throws InterruptedException, ExecutionException, TimeoutException {
itemsPublisher = PaginatedItemsPublisher.builder().nextPageFetcher(asyncPageFetcher)
.iteratorFunction(SAMPLE_ITERATOR).isLastPage(false).build();
List<Integer> belowLimit = new ArrayList<>();
sdkPublisher.limit(3).subscribe(belowLimit::add).get(5, TimeUnit.SECONDS);
assertThat(belowLimit).containsExactly(1, 2, 3);

final List<Integer> belowLimit = new ArrayList<>();
itemsPublisher.limit(3).subscribe(e -> belowLimit.add(e)).get(5, TimeUnit.SECONDS);
assertThat(belowLimit).isEqualTo(Arrays.asList(1, 2, 3));
List<Integer> beyondLimit = new ArrayList<>();
sdkPublisher.limit(33).subscribe(beyondLimit::add).get(5, TimeUnit.SECONDS);
assertThat(beyondLimit).containsExactly(1, 2, 3, 4, 5, 6);

final List<Integer> beyondLimit = new ArrayList<>();
itemsPublisher.limit(33).subscribe(e -> beyondLimit.add(e)).get(5, TimeUnit.SECONDS);
assertThat(beyondLimit).isEqualTo(Arrays.asList(1, 2, 3, 4, 5, 6));

final List<Integer> zeroLimit = new ArrayList<>();
itemsPublisher.limit(0).subscribe(e -> zeroLimit.add(e)).get(5, TimeUnit.SECONDS);
assertThat(zeroLimit).isEqualTo(Arrays.asList());
List<Integer> zeroLimit = new ArrayList<>();
sdkPublisher.limit(0).subscribe(zeroLimit::add).get(5, TimeUnit.SECONDS);
assertThat(zeroLimit).isEmpty();
}

@Test
public void filteringSubscriber_with_different_filters() throws InterruptedException, ExecutionException, TimeoutException {
itemsPublisher = PaginatedItemsPublisher.builder().nextPageFetcher(asyncPageFetcher)
.iteratorFunction(SAMPLE_ITERATOR).isLastPage(false).build();

final List<Integer> filteredSomeList = new ArrayList<>();
itemsPublisher.filter(i -> i % 2 == 0).subscribe(e -> filteredSomeList.add(e)).get(5, TimeUnit.SECONDS);
assertThat(filteredSomeList).isEqualTo(Arrays.asList(2, 4, 6));

final List<Integer> filteredAllList = new ArrayList<>();
itemsPublisher.filter(i -> i % 10 == 0).subscribe(e -> filteredAllList.add(e)).get(5, TimeUnit.SECONDS);
assertThat(filteredAllList).isEqualTo(Arrays.asList());
List<Integer> filteredSomeList = new ArrayList<>();
sdkPublisher.filter(i -> i % 2 == 0).subscribe(filteredSomeList::add).get(5, TimeUnit.SECONDS);
assertThat(filteredSomeList).containsExactly(2, 4, 6);

final List<Integer> filteredNone = new ArrayList<>();
itemsPublisher.filter(i -> i % 1 == 0).subscribe(e -> filteredNone.add(e)).get(5, TimeUnit.SECONDS);
assertThat(filteredNone).isEqualTo(Arrays.asList(1, 2, 3, 4, 5, 6));
List<Integer> filteredAllList = new ArrayList<>();
sdkPublisher.filter(i -> i % 10 == 0).subscribe(filteredAllList::add).get(5, TimeUnit.SECONDS);
assertThat(filteredAllList).isEmpty();

List<Integer> filteredNone = new ArrayList<>();
sdkPublisher.filter(i -> i % 1 == 0).subscribe(filteredNone::add).get(5, TimeUnit.SECONDS);
assertThat(filteredNone).containsExactly(1, 2, 3, 4, 5, 6);
}

@Test
public void limit_and_filter_subscriber_chained_with_different_conditions() throws InterruptedException, ExecutionException, TimeoutException {
itemsPublisher = PaginatedItemsPublisher.builder().nextPageFetcher(asyncPageFetcher)
.iteratorFunction(SAMPLE_ITERATOR).isLastPage(false).build();

final List<Integer> belowLimitWithFiltering = new ArrayList<>();
itemsPublisher.limit(4).filter(i -> i % 2 == 0).subscribe(e -> belowLimitWithFiltering.add(e)).get(5, TimeUnit.SECONDS);
assertThat(belowLimitWithFiltering).isEqualTo(Arrays.asList(2, 4));
List<Integer> belowLimitWithFiltering = new ArrayList<>();
sdkPublisher.limit(4).filter(i -> i % 2 == 0).subscribe(belowLimitWithFiltering::add).get(5, TimeUnit.SECONDS);
assertThat(belowLimitWithFiltering).containsExactly(2, 4);

final List<Integer> beyondLimitWithAllFiltering = new ArrayList<>();
itemsPublisher.limit(33).filter(i -> i % 10 == 0).subscribe(e -> beyondLimitWithAllFiltering.add(e)).get(5, TimeUnit.SECONDS);
assertThat(beyondLimitWithAllFiltering).isEqualTo(Arrays.asList());
List<Integer> beyondLimitWithAllFiltering = new ArrayList<>();
sdkPublisher.limit(33).filter(i -> i % 10 == 0).subscribe(beyondLimitWithAllFiltering::add).get(5, TimeUnit.SECONDS);
assertThat(beyondLimitWithAllFiltering).isEmpty();

final List<Integer> zeroLimitAndNoFilter = new ArrayList<>();
itemsPublisher.limit(0).filter(i -> i % 1 == 0).subscribe(e -> zeroLimitAndNoFilter.add(e)).get(5, TimeUnit.SECONDS);
assertThat(zeroLimitAndNoFilter).isEqualTo(Arrays.asList());
List<Integer> zeroLimitAndNoFilter = new ArrayList<>();
sdkPublisher.limit(0).filter(i -> i % 1 == 0).subscribe(zeroLimitAndNoFilter::add).get(5, TimeUnit.SECONDS);
assertThat(zeroLimitAndNoFilter).isEmpty();

final List<Integer> filteringbelowLimitWith = new ArrayList<>();
itemsPublisher.filter(i -> i % 2 == 0).limit(2).subscribe(e -> filteringbelowLimitWith.add(e)).get(5, TimeUnit.SECONDS);
assertThat(filteringbelowLimitWith).isEqualTo(Arrays.asList(2, 4));
List<Integer> filteringbelowLimitWith = new ArrayList<>();
sdkPublisher.filter(i -> i % 2 == 0).limit(2).subscribe(filteringbelowLimitWith::add).get(5, TimeUnit.SECONDS);
assertThat(filteringbelowLimitWith).containsExactly(2, 4);

final List<Integer> filteringAndOutsideLimit = new ArrayList<>();
itemsPublisher.filter(i -> i % 10 == 0).limit(33).subscribe(e -> filteringAndOutsideLimit.add(e)).get(5, TimeUnit.SECONDS);
assertThat(filteringAndOutsideLimit).isEqualTo(Arrays.asList());
List<Integer> filteringAndOutsideLimit = new ArrayList<>();
sdkPublisher.filter(i -> i % 10 == 0).limit(33).subscribe(filteringAndOutsideLimit::add).get(5, TimeUnit.SECONDS);
assertThat(filteringAndOutsideLimit).isEmpty();
}

@Test
public void limit__subscriber_with_empty_input_and_zero_limit() throws Exception {
itemsPublisher = PaginatedItemsPublisher.builder().nextPageFetcher(asyncPageFetcher)
.iteratorFunction(EMPTY_ITERATOR).isLastPage(false).build();
sdkPublisher = SdkPublisher.adapt(Flowable.empty());

final List<Integer> zeroLimit = new ArrayList<>();
itemsPublisher.limit(0).subscribe(e -> zeroLimit.add(e)).get(5, TimeUnit.SECONDS);
assertThat(zeroLimit).isEqualTo(Arrays.asList());
List<Integer> zeroLimit = new ArrayList<>();
sdkPublisher.limit(0).subscribe(zeroLimit::add).get(5, TimeUnit.SECONDS);
assertThat(zeroLimit).isEmpty();

List<Integer> nonZeroLimit = new ArrayList<>();
itemsPublisher.limit(10).subscribe(e -> nonZeroLimit.add(e)).get(5, TimeUnit.SECONDS);
assertThat(zeroLimit).isEqualTo(Arrays.asList());
sdkPublisher.limit(10).subscribe(nonZeroLimit::add).get(5, TimeUnit.SECONDS);
assertThat(zeroLimit).isEmpty();
}


Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
{
"pagination": {
"ListStrings": {
"input_token": "NextToken",
"limit_key": "MaxResults",
"output_token": "NextToken",
"result_key": "Strings"
}
}
}
Loading
Loading