Skip to content

Commit

Permalink
Rewrite completion-order streaming tests (#947)
Browse files Browse the repository at this point in the history
  • Loading branch information
pivovarit authored Sep 16, 2024
1 parent b0ca86e commit ec1f1ae
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 93 deletions.
93 changes: 0 additions & 93 deletions src/test/java/com/pivovarit/collectors/FunctionalTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,40 +2,27 @@

import com.pivovarit.collectors.ParallelCollectors.Batching;
import org.junit.jupiter.api.DynamicTest;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestFactory;

import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.stream.Collector;
import java.util.stream.Stream;

import static com.pivovarit.collectors.ParallelCollectors.parallel;
import static com.pivovarit.collectors.ParallelCollectors.parallelToStream;
import static com.pivovarit.collectors.TestUtils.returnWithDelay;
import static com.pivovarit.collectors.TestUtils.withExecutor;
import static java.lang.String.format;
import static java.time.Duration.ofMillis;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;
import static java.util.stream.Collectors.collectingAndThen;
import static java.util.stream.Collectors.toCollection;
import static java.util.stream.Collectors.toList;
import static java.util.stream.Collectors.toSet;
import static java.util.stream.Stream.of;
import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;
import static org.junit.jupiter.api.DynamicTest.dynamicTest;

/**
Expand Down Expand Up @@ -68,59 +55,6 @@ Stream<DynamicTest> streaming_batching_collectors() {
).flatMap(i -> i);
}

@Test
void shouldCollectInCompletionOrder() {
// given
try (var executor = threadPoolExecutor(4)) {
List<Integer> result = of(350, 200, 0, 400)
.collect(parallelToStream(i -> returnWithDelay(i, ofMillis(i)), executor, 4))
.limit(2)
.toList();

assertThat(result).isSorted();
}
}

@Test
void shouldCollectEagerlyInCompletionOrder() {
// given
var executor = threadPoolExecutor(4);
AtomicBoolean result = new AtomicBoolean(false);
CompletableFuture.runAsync(() -> {
of(1, 10000, 1, 0)
.collect(parallelToStream(i -> returnWithDelay(i, ofMillis(i)), executor, 2))
.forEach(i -> {
if (i == 1) {
result.set(true);
}
});
});

await()
.atMost(1, SECONDS)
.until(result::get);
}

@Test
void shouldExecuteEagerlyOnProvidedThreadPool() {
try (var executor = Executors.newFixedThreadPool(2)) {
var countingExecutor = new CountingExecutor(executor);
var executions = new AtomicInteger();
var list = List.of("A", "B");

list.stream()
.collect(parallel(s -> {
executions.incrementAndGet();
return s;
}, countingExecutor, 1))
.join()
.forEach(__ -> {});

assertThat(countingExecutor.getInvocations()).isEqualTo(1);
assertThat(executions.get()).isEqualTo(2);
}
}

private static <R extends Collection<Integer>> Stream<DynamicTest> batchTests(CollectorSupplier<Function<Integer, Integer>, Executor, Integer, Collector<Integer, ?, CompletableFuture<R>>> collector, String name) {
return of(shouldProcessOnNThreadsETParallelism(collector, name));
}
Expand Down Expand Up @@ -158,35 +92,8 @@ private static <R extends Collection<Integer>> DynamicTest shouldProcessOnNThrea
.supplyAsync(stream::toList, Executors.newSingleThreadExecutor()));
}

private static ThreadPoolExecutor threadPoolExecutor(int unitsOfWork) {
return new ThreadPoolExecutor(unitsOfWork, unitsOfWork,
0L, MILLISECONDS,
new LinkedBlockingQueue<>());
}

@FunctionalInterface
interface CollectorSupplier<T1, T2, T3, R> {
R apply(T1 t1, T2 t2, T3 t3);
}

private static class CountingExecutor implements Executor {

private final AtomicInteger counter = new AtomicInteger();

private final Executor delegate;

public CountingExecutor(Executor delegate) {
this.delegate = delegate;
}

@Override
public void execute(Runnable command) {
counter.incrementAndGet();
delegate.execute(command);
}

public Integer getInvocations() {
return counter.get();
}
}
}
45 changes: 45 additions & 0 deletions src/test/java/com/pivovarit/collectors/test/StreamingTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import org.junit.jupiter.api.TestFactory;

import java.time.Duration;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
Expand All @@ -15,7 +16,10 @@

import static com.pivovarit.collectors.TestUtils.returnWithDelay;
import static com.pivovarit.collectors.test.StreamingTest.CollectorDefinition.collector;
import static java.time.Duration.ofMillis;
import static java.time.Duration.ofSeconds;
import static java.util.stream.Stream.of;
import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;

class StreamingTest {
Expand All @@ -31,6 +35,22 @@ private static Stream<CollectorDefinition<Integer, Integer>> allStreaming() {
);
}

private static Stream<CollectorDefinition<Integer, Integer>> allCompletionOrderStreaming() {
return Stream.of(
collector("parallelToStream()", (f) -> ParallelCollectors.parallelToStream(f)),
collector("parallelToStream(e)", (f) -> ParallelCollectors.parallelToStream(f, e())),
collector("parallelToStream(e, p)", (f) -> ParallelCollectors.parallelToStream(f, e(), p()))
);
}

private static Stream<CollectorDefinition<Integer, Integer>> allOrderedStreaming() {
return Stream.of(
collector("parallelToOrderedStream()", (f) -> ParallelCollectors.parallelToOrderedStream(f)),
collector("parallelToOrderedStream(e)", (f) -> ParallelCollectors.parallelToOrderedStream(f, e())),
collector("parallelToOrderedStream(e, p)", (f) -> ParallelCollectors.parallelToOrderedStream(f, e(), p()))
);
}

@TestFactory
Stream<DynamicTest> shouldPushElementsAsSoonAsTheyAreReady() {
return allStreaming()
Expand All @@ -49,6 +69,31 @@ Stream<DynamicTest> shouldPushElementsAsSoonAsTheyAreReady() {
}));
}

@TestFactory
Stream<DynamicTest> shouldCollectInCompletionOrder() {
return allCompletionOrderStreaming()
.map(c -> DynamicTest.dynamicTest(c.name(), () -> {
var result = of(300, 200, 0, 400)
.collect(c.factory().collector(i -> returnWithDelay(i, ofMillis(i))))
.toList();

assertThat(result).isSorted();
}));
}

@TestFactory
Stream<DynamicTest> shouldCollectInOriginalOrder() {
return allOrderedStreaming()
.map(c -> DynamicTest.dynamicTest(c.name(), () -> {
var source = List.of(300, 200, 0, 400);
var result = source.stream()
.collect(c.factory().collector(i -> returnWithDelay(i, ofMillis(i))))
.toList();

assertThat(result).containsExactlyElementsOf(source);
}));
}

protected record CollectorDefinition<T, R>(String name, CollectorFactory<T, R> factory) {
static <T, R> CollectorDefinition<T, R> collector(String name, CollectorFactory<T, R> collector) {
return new CollectorDefinition<>(name, collector);
Expand Down

0 comments on commit ec1f1ae

Please sign in to comment.