Regression in Flux.cache in latest Californium snapshots #2053

Closed
wilkinsona opened this issue Feb 19, 2020 · 4 comments

@wilkinsona
Contributor

Expected Behavior

Flux.cache should cache the emitted signals and replay them to later subscribers.

Actual Behavior

Signals are not cached

Steps to Reproduce

@Test
public void fluxCaching() {
    AtomicInteger invocations = new AtomicInteger();
    Flux<String> flux = Flux.fromIterable(() -> {
        invocations.incrementAndGet();
        return Arrays.asList("spring", "boot").iterator();
    }).cache(Duration.ofHours(1));
    assertThat(flux.blockLast()).isEqualTo("boot");
    assertThat(flux.blockLast()).isEqualTo("boot");
    assertThat(invocations).hasValue(1);
}

The above fails with Californium-BUILD-SNAPSHOT because the value of invocations is 2. It passes with SR-15.

Spring Boot's test suite also reproduces the problem, which is how I discovered it. See spring-projects/spring-boot#20196.

Your Environment

  • Californium-BUILD-SNAPSHOT
  • Java 1.8.0_202
  • macOS
@simonbasle
Contributor

This is a side effect of the fromIterable operator, which may now invoke the Iterable#iterator() method twice; see #2014 and the fix in #2021.

The goal is to check whether the Iterable can safely be categorized as finite, in order to avoid infinite loops when attempting to discard the remainder of the working Iterator later on if iteration is cancelled.
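
To illustrate why the counter in the original test can reach 2 even with a single subscription, here is a minimal plain-Java sketch. It does not use Reactor: `lastWithProbe` is a hypothetical stand-in for any consumer that calls iterator() once to inspect the Iterable and a second time to actually iterate it. It is an assumption made for illustration, not the operator's real implementation.

```java
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicInteger;

public class IteratorCallCounting {

    // Hypothetical consumer (NOT Reactor code): it probes the Iterable first,
    // then iterates it, so iterator() is called twice per consumption.
    public static <T> T lastWithProbe(Iterable<T> source) {
        source.iterator().hasNext(); // probing call, result discarded
        T last = null;
        for (T item : source) {      // second iterator() call does the real work
            last = item;
        }
        return last;
    }

    // Consumes the counting Iterable exactly once and returns how many times
    // iterator() was invoked along the way.
    public static int iteratorCallsForOneConsumption() {
        AtomicInteger invocations = new AtomicInteger();
        Iterable<String> source = () -> {
            invocations.incrementAndGet();
            return Arrays.asList("spring", "boot").iterator();
        };
        lastWithProbe(source);
        return invocations.get();
    }

    public static void main(String[] args) {
        // One logical consumption, two iterator() calls.
        System.out.println(iteratorCallsForOneConsumption()); // prints 2
    }
}
```

A counter placed inside iterator() therefore measures how often the consumer asks for an Iterator, which is not necessarily how often the source is consumed.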

The extra call to iterator() is avoided if the Iterable is of type Collection, so one way of rewriting the test would be the following:

	@Test
	public void fluxCaching() {
		AtomicInteger invocations = new AtomicInteger();
		List<String> data = new ArrayList<String>(Arrays.asList("spring", "boot")) {
			@NotNull
			@Override
			public Iterator<String> iterator() {
				invocations.incrementAndGet();
				return super.iterator();
			}
		};
		Flux<String> flux = Flux.fromIterable(data).cache(Duration.ofHours(1));
		assertThat(flux.blockLast()).isEqualTo("boot");
		assertThat(flux.blockLast()).isEqualTo("boot");
		assertThat(invocations).hasValue(1);
	}

Does that work for you, @wilkinsona?

@snicoll
Contributor

snicoll commented Feb 20, 2020

@simonbasle the test is an extract of what Spring Boot does to cache actuator responses. Don't you consider this to be a breaking change? I think it would be ok to rewrite the code when upgrading to a new feature release but it's a bit odd to have to do that in a maintenance release. Or was the usage above wrong in the first place?

Here is the test that's actually failing: https://github.com/spring-projects/spring-boot/blob/29bc5d848e1351f4f7e98cd55d6186650fb7ee35/spring-boot-project/spring-boot-actuator/src/test/java/org/springframework/boot/actuate/endpoint/invoker/cache/CachingOperationInvokerTests.java#L86-L96

And the use of the cache operator is here: https://github.com/spring-projects/spring-boot/blob/29bc5d848e1351f4f7e98cd55d6186650fb7ee35/spring-boot-project/spring-boot-actuator/src/main/java/org/springframework/boot/actuate/endpoint/invoker/cache/CachingOperationInvoker.java#L146-L160

@simonbasle
Contributor

So, the test itself is built around an implicit assumption that fromIterable somehow guarantees the iterator() method to only be called once. Or rather, it assumes that iterator() called once == subscribe() called once.

This shifts the focus from what cache() does to what fromIterable() does, and relies on what should be an implementation detail.

A better way to test that cache() prevents further subscriptions to a source is to decorate the source with doFirst or doOnSubscribe:

	@Test
	public void fluxCaching() {
		AtomicInteger invocations = new AtomicInteger();
		Flux<String> source = Flux.just("spring", "boot")
				.doFirst(invocations::incrementAndGet);
		// or .doOnSubscribe(sub -> invocations.incrementAndGet());

		Flux<String> cached = source.cache(Duration.ofHours(1));

		assertThat(cached.blockLast()).isEqualTo("boot");
		assertThat(cached.blockLast()).isEqualTo("boot");
		assertThat(invocations).hasValue(1);
	}

@snicoll
Contributor

snicoll commented Feb 20, 2020

With the help of @simonbasle we fixed the unit test and I've upgraded our build to snapshots.

Thanks for the support Simon, I think this issue can be closed.
