From 6731d8b06d99f727b264b5c48bcb237b9d2a5220 Mon Sep 17 00:00:00 2001 From: Lucas Saldanha Date: Thu, 17 Apr 2025 13:22:14 +1200 Subject: [PATCH] New infra stream module --- infrastructure/async/build.gradle | 1 + .../AbstractDelegatingStreamHandler.java | 33 ++++ .../async/stream/AsyncIterator.java | 29 +++ .../async/stream/AsyncIteratorCollector.java | 50 +++++ .../async/stream/AsyncQueue.java | 28 +++ .../async/stream/AsyncStream.java | 143 ++++++++++++++ .../async/stream/AsyncStreamBase.java | 21 +++ .../async/stream/AsyncStreamHandler.java | 45 +++++ .../async/stream/AsyncStreamPublisher.java | 16 ++ .../async/stream/AsyncStreamSlicer.java | 73 ++++++++ .../async/stream/AsyncStreamTransformer.java | 19 ++ .../async/stream/AsyncStreamVisitor.java | 23 +++ .../stream/BufferingStreamPublisher.java | 109 +++++++++++ .../async/stream/CircularBuf.java | 45 +++++ .../async/stream/FilteringStreamHandler.java | 37 ++++ .../async/stream/FlattenStreamHandler.java | 55 ++++++ .../async/stream/FutureAsyncIteratorImpl.java | 33 ++++ .../async/stream/LimitedAsyncQueue.java | 68 +++++++ .../async/stream/MapStreamHandler.java | 37 ++++ .../async/stream/SliceStreamHandler.java | 44 +++++ .../async/stream/SyncToAsyncIteratorImpl.java | 71 +++++++ .../async/stream/TransformAsyncIterator.java | 31 +++ .../infrastructure/async/stream/Util.java | 24 +++ .../async/stream/VisitorHandler.java | 43 +++++ .../stream/AsyncStreamPublisherTest.java | 169 +++++++++++++++++ .../async/stream/AsyncStreamTest.java | 176 ++++++++++++++++++ 26 files changed, 1423 insertions(+) create mode 100644 infrastructure/async/src/main/java/tech/pegasys/teku/infrastructure/async/stream/AbstractDelegatingStreamHandler.java create mode 100644 infrastructure/async/src/main/java/tech/pegasys/teku/infrastructure/async/stream/AsyncIterator.java create mode 100644 infrastructure/async/src/main/java/tech/pegasys/teku/infrastructure/async/stream/AsyncIteratorCollector.java create mode 100644 infrastructure/async/src/main/java/tech/pegasys/teku/infrastructure/async/stream/AsyncQueue.java create mode 100644 infrastructure/async/src/main/java/tech/pegasys/teku/infrastructure/async/stream/AsyncStream.java create mode 100644 infrastructure/async/src/main/java/tech/pegasys/teku/infrastructure/async/stream/AsyncStreamBase.java create mode 100644 infrastructure/async/src/main/java/tech/pegasys/teku/infrastructure/async/stream/AsyncStreamHandler.java create mode 100644 infrastructure/async/src/main/java/tech/pegasys/teku/infrastructure/async/stream/AsyncStreamPublisher.java create mode 100644 infrastructure/async/src/main/java/tech/pegasys/teku/infrastructure/async/stream/AsyncStreamSlicer.java create mode 100644 infrastructure/async/src/main/java/tech/pegasys/teku/infrastructure/async/stream/AsyncStreamTransformer.java create mode 100644 infrastructure/async/src/main/java/tech/pegasys/teku/infrastructure/async/stream/AsyncStreamVisitor.java create mode 100644 infrastructure/async/src/main/java/tech/pegasys/teku/infrastructure/async/stream/BufferingStreamPublisher.java create mode 100644 infrastructure/async/src/main/java/tech/pegasys/teku/infrastructure/async/stream/CircularBuf.java create mode 100644 infrastructure/async/src/main/java/tech/pegasys/teku/infrastructure/async/stream/FilteringStreamHandler.java create mode 100644 infrastructure/async/src/main/java/tech/pegasys/teku/infrastructure/async/stream/FlattenStreamHandler.java create mode 100644 infrastructure/async/src/main/java/tech/pegasys/teku/infrastructure/async/stream/FutureAsyncIteratorImpl.java create mode 100644 infrastructure/async/src/main/java/tech/pegasys/teku/infrastructure/async/stream/LimitedAsyncQueue.java create mode 100644 infrastructure/async/src/main/java/tech/pegasys/teku/infrastructure/async/stream/MapStreamHandler.java create mode 100644 infrastructure/async/src/main/java/tech/pegasys/teku/infrastructure/async/stream/SliceStreamHandler.java create mode 100644 infrastructure/async/src/main/java/tech/pegasys/teku/infrastructure/async/stream/SyncToAsyncIteratorImpl.java create mode 100644 infrastructure/async/src/main/java/tech/pegasys/teku/infrastructure/async/stream/TransformAsyncIterator.java create mode 100644 infrastructure/async/src/main/java/tech/pegasys/teku/infrastructure/async/stream/Util.java create mode 100644 infrastructure/async/src/main/java/tech/pegasys/teku/infrastructure/async/stream/VisitorHandler.java create mode 100644 infrastructure/async/src/test/java/tech/pegasys/teku/infrastructure/async/stream/AsyncStreamPublisherTest.java create mode 100644 infrastructure/async/src/test/java/tech/pegasys/teku/infrastructure/async/stream/AsyncStreamTest.java diff --git a/infrastructure/async/build.gradle b/infrastructure/async/build.gradle index fdf3076d418..5788fd1815c 100644 --- a/infrastructure/async/build.gradle +++ b/infrastructure/async/build.gradle @@ -1,6 +1,7 @@ dependencies { implementation project(":infrastructure:metrics") implementation project(":infrastructure:time") + implementation project(":infrastructure:exceptions") implementation 'com.google.guava:guava' testImplementation testFixtures(project(":infrastructure:metrics")) diff --git a/infrastructure/async/src/main/java/tech/pegasys/teku/infrastructure/async/stream/AbstractDelegatingStreamHandler.java b/infrastructure/async/src/main/java/tech/pegasys/teku/infrastructure/async/stream/AbstractDelegatingStreamHandler.java new file mode 100644 index 00000000000..c1593b577e1 --- /dev/null +++ b/infrastructure/async/src/main/java/tech/pegasys/teku/infrastructure/async/stream/AbstractDelegatingStreamHandler.java @@ -0,0 +1,33 @@ +/* + * Copyright Consensys Software Inc., 2024 + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package tech.pegasys.teku.infrastructure.async.stream; + +abstract class AbstractDelegatingStreamHandler implements AsyncStreamHandler { + + protected final AsyncStreamHandler delegate; + + protected AbstractDelegatingStreamHandler(final AsyncStreamHandler delegate) { + this.delegate = delegate; + } + + @Override + public void onComplete() { + delegate.onComplete(); + } + + @Override + public void onError(final Throwable t) { + delegate.onError(t); + } +} diff --git a/infrastructure/async/src/main/java/tech/pegasys/teku/infrastructure/async/stream/AsyncIterator.java b/infrastructure/async/src/main/java/tech/pegasys/teku/infrastructure/async/stream/AsyncIterator.java new file mode 100644 index 00000000000..20a8b6fe8ee --- /dev/null +++ b/infrastructure/async/src/main/java/tech/pegasys/teku/infrastructure/async/stream/AsyncIterator.java @@ -0,0 +1,29 @@ +/* + * Copyright Consensys Software Inc., 2024 + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package tech.pegasys.teku.infrastructure.async.stream; + +abstract class AsyncIterator implements AsyncStream { + + abstract void iterate(AsyncStreamHandler callback); + + @Override + public AsyncIterator transform(final AsyncStreamTransformer transformer) { + return new TransformAsyncIterator<>(this, transformer); + } + + @Override + public void consume(final AsyncStreamHandler consumer) { + iterate(consumer); + } +} diff --git a/infrastructure/async/src/main/java/tech/pegasys/teku/infrastructure/async/stream/AsyncIteratorCollector.java b/infrastructure/async/src/main/java/tech/pegasys/teku/infrastructure/async/stream/AsyncIteratorCollector.java new file mode 100644 index 00000000000..f5615451c9a --- /dev/null +++ b/infrastructure/async/src/main/java/tech/pegasys/teku/infrastructure/async/stream/AsyncIteratorCollector.java @@ -0,0 +1,50 @@ +/* + * Copyright Consensys Software Inc., 2024 + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package tech.pegasys.teku.infrastructure.async.stream; + +import java.util.stream.Collector; +import tech.pegasys.teku.infrastructure.async.SafeFuture; + +class AsyncIteratorCollector implements AsyncStreamHandler { + private final A accumulator; + private final Collector collector; + + private final SafeFuture promise = new SafeFuture<>(); + + public AsyncIteratorCollector(final Collector collector) { + this.collector = collector; + this.accumulator = collector.supplier().get(); + } + + @Override + public SafeFuture onNext(final T t) { + collector.accumulator().accept(accumulator, t); + return TRUE_FUTURE; + } + + @Override + public void onComplete() { + final R result = collector.finisher().apply(accumulator); + promise.complete(result); + } + + @Override + public void onError(final Throwable t) { + promise.completeExceptionally(t); + } + + public SafeFuture getPromise() { + return promise; + } +} diff --git a/infrastructure/async/src/main/java/tech/pegasys/teku/infrastructure/async/stream/AsyncQueue.java b/infrastructure/async/src/main/java/tech/pegasys/teku/infrastructure/async/stream/AsyncQueue.java new file mode 100644 index 00000000000..86ff1892918 --- /dev/null +++ b/infrastructure/async/src/main/java/tech/pegasys/teku/infrastructure/async/stream/AsyncQueue.java @@ -0,0 +1,28 @@ +/* + * Copyright Consensys Software Inc., 2024 + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package tech.pegasys.teku.infrastructure.async.stream; + +import tech.pegasys.teku.infrastructure.async.SafeFuture; + +/** + * FIFO queue analogous to {@link java.util.concurrent.BlockingQueue} which returns {@link + * SafeFuture} element promise from {@link #take()} instead of blocking when no elements available + * in the queue. + */ +interface AsyncQueue { + + void put(T item); + + SafeFuture take(); +} diff --git a/infrastructure/async/src/main/java/tech/pegasys/teku/infrastructure/async/stream/AsyncStream.java b/infrastructure/async/src/main/java/tech/pegasys/teku/infrastructure/async/stream/AsyncStream.java new file mode 100644 index 00000000000..a7881f19956 --- /dev/null +++ b/infrastructure/async/src/main/java/tech/pegasys/teku/infrastructure/async/stream/AsyncStream.java @@ -0,0 +1,143 @@ +/* + * Copyright Consensys Software Inc., 2024 + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package tech.pegasys.teku.infrastructure.async.stream; + +import static tech.pegasys.teku.infrastructure.async.stream.Util.noCallBinaryOperator; + +import java.util.Collection; +import java.util.Iterator; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.CompletionStage; +import java.util.function.Consumer; +import java.util.function.Function; +import java.util.function.Predicate; +import java.util.stream.Collector; +import java.util.stream.Collectors; +import java.util.stream.Stream; +import tech.pegasys.teku.infrastructure.async.SafeFuture; + +/** Similar to {@link Stream} but may perform async operations */ +public interface AsyncStream extends AsyncStreamBase { + + static AsyncStream empty() { + return of(); + } + + static AsyncStream exceptional(final Throwable error) { + final AsyncStreamPublisher ret = createPublisher(1); + ret.onError(error); + return ret; + } + + @SafeVarargs + static AsyncStream of(final T... elements) { + return create(List.of(elements).iterator()); + } + + static AsyncStream create(final Stream stream) { + return create(stream.iterator()); + } + + static AsyncStream create(final Iterator iterator) { + return new SyncToAsyncIteratorImpl<>(iterator); + } + + static AsyncStream create(final CompletionStage future) { + return new FutureAsyncIteratorImpl<>(future); + } + + static AsyncStreamPublisher createPublisher(final int maxBufferSize) { + return new BufferingStreamPublisher<>(maxBufferSize); + } + + // transformation + + default AsyncStream flatMap(final Function> toStreamMapper) { + return map(toStreamMapper).transform(FlattenStreamHandler::new); + } + + default AsyncStream map(final Function mapper) { + return transform(sourceCallback -> new MapStreamHandler<>(sourceCallback, mapper)); + } + + default AsyncStream filter(final Predicate filter) { + return transform(sourceCallback -> new FilteringStreamHandler<>(sourceCallback, filter)); + } + + default AsyncStream peek(final AsyncStreamVisitor visitor) { + return transform(src -> new VisitorHandler<>(src, visitor)); + } + + default AsyncStream mapAsync(final Function> mapper) { + return flatMap(e -> AsyncStream.create(mapper.apply(e))); + } + + // slicing + + default AsyncStream slice(final AsyncStreamSlicer slicer) { + return transform(sourceCallback -> new SliceStreamHandler<>(sourceCallback, slicer)); + } + + default AsyncStream limit(final long count) { + return slice(AsyncStreamSlicer.limit(count)); + } + + default AsyncStream takeWhile(final Predicate whileCondition) { + return slice(AsyncStreamSlicer.takeWhile(whileCondition)); + } + + default AsyncStream takeUntil(final Predicate untilCondition, final boolean includeLast) { + AsyncStreamSlicer whileSlicer = AsyncStreamSlicer.takeWhile(untilCondition.negate()); + AsyncStreamSlicer untilSlicer = + includeLast ? whileSlicer.then(AsyncStreamSlicer.limit(1)) : whileSlicer; + return slice(untilSlicer); + } + + // consuming + + default SafeFuture collect(final Collector collector) { + final AsyncIteratorCollector asyncIteratorCollector = + new AsyncIteratorCollector<>(collector); + consume(asyncIteratorCollector); + return asyncIteratorCollector.getPromise(); + } + + default SafeFuture> findFirst() { + return this.limit(1) + .toList() + .thenApply(l -> l.isEmpty() ? Optional.empty() : Optional.of(l.getFirst())); + } + + default SafeFuture forEach(final Consumer consumer) { + return collect(Collector.of(() -> null, (a, t) -> consumer.accept(t), noCallBinaryOperator())); + } + + default > SafeFuture collect(final C targetCollection) { + return collect(Collectors.toCollection(() -> targetCollection)); + } + + default SafeFuture> toList() { + return collect(Collectors.toUnmodifiableList()); + } + + default SafeFuture> findLast() { + return collectLast(1) + .thenApply(l -> l.isEmpty() ? Optional.empty() : Optional.of(l.getFirst())); + } + + default SafeFuture> collectLast(final int count) { + return collect(CircularBuf.createCollector(count)); + } +} diff --git a/infrastructure/async/src/main/java/tech/pegasys/teku/infrastructure/async/stream/AsyncStreamBase.java b/infrastructure/async/src/main/java/tech/pegasys/teku/infrastructure/async/stream/AsyncStreamBase.java new file mode 100644 index 00000000000..0f2a172aaff --- /dev/null +++ b/infrastructure/async/src/main/java/tech/pegasys/teku/infrastructure/async/stream/AsyncStreamBase.java @@ -0,0 +1,21 @@ +/* + * Copyright Consensys Software Inc., 2024 + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package tech.pegasys.teku.infrastructure.async.stream; + +public interface AsyncStreamBase { + + AsyncStream transform(AsyncStreamTransformer transformer); + + void consume(AsyncStreamHandler consumer); +} diff --git a/infrastructure/async/src/main/java/tech/pegasys/teku/infrastructure/async/stream/AsyncStreamHandler.java b/infrastructure/async/src/main/java/tech/pegasys/teku/infrastructure/async/stream/AsyncStreamHandler.java new file mode 100644 index 00000000000..25a3903673f --- /dev/null +++ b/infrastructure/async/src/main/java/tech/pegasys/teku/infrastructure/async/stream/AsyncStreamHandler.java @@ -0,0 +1,45 @@ +/* + * Copyright Consensys Software Inc., 2024 + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package tech.pegasys.teku.infrastructure.async.stream; + +import tech.pegasys.teku.infrastructure.async.SafeFuture; + +/** + * Internal {@link AsyncStream} implementation interface which is analogous to {@link + * java.util.concurrent.Flow.Subscriber} in RX world + */ +public interface AsyncStreamHandler { + + SafeFuture TRUE_FUTURE = SafeFuture.completedFuture(true); + SafeFuture FALSE_FUTURE = SafeFuture.completedFuture(false); + + /** + * Called when next element is available + * + * @return The promise to the upstream handler if the further elements are expected. When the + * future is completed with false the only next call expected is {@link #onComplete()} + */ + SafeFuture onNext(T t); + + /** + * Called when the stream is complete. No other calls are expected after this call + * + *

An implementation MUST NOT call this method prior to {@link SafeFuture} returned + * from the last {@link #onNext(Object)} call is completed. + */ + void onComplete(); + + /** Called when upstream error happened Basically no other calls are expected after this */ + void onError(Throwable t); +} diff --git a/infrastructure/async/src/main/java/tech/pegasys/teku/infrastructure/async/stream/AsyncStreamPublisher.java b/infrastructure/async/src/main/java/tech/pegasys/teku/infrastructure/async/stream/AsyncStreamPublisher.java new file mode 100644 index 00000000000..1497cf8a2dc --- /dev/null +++ b/infrastructure/async/src/main/java/tech/pegasys/teku/infrastructure/async/stream/AsyncStreamPublisher.java @@ -0,0 +1,16 @@ +/* + * Copyright Consensys Software Inc., 2024 + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package tech.pegasys.teku.infrastructure.async.stream; + +public interface AsyncStreamPublisher extends AsyncStreamHandler, AsyncStream {} diff --git a/infrastructure/async/src/main/java/tech/pegasys/teku/infrastructure/async/stream/AsyncStreamSlicer.java b/infrastructure/async/src/main/java/tech/pegasys/teku/infrastructure/async/stream/AsyncStreamSlicer.java new file mode 100644 index 00000000000..d0165a10e7f --- /dev/null +++ b/infrastructure/async/src/main/java/tech/pegasys/teku/infrastructure/async/stream/AsyncStreamSlicer.java @@ -0,0 +1,73 @@ +/* + * Copyright Consensys Software Inc., 2024 + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package tech.pegasys.teku.infrastructure.async.stream; + +import static tech.pegasys.teku.infrastructure.async.stream.AsyncStreamSlicer.SliceResult.CONTINUE; +import static tech.pegasys.teku.infrastructure.async.stream.AsyncStreamSlicer.SliceResult.INCLUDE_AND_STOP; +import static tech.pegasys.teku.infrastructure.async.stream.AsyncStreamSlicer.SliceResult.SKIP_AND_STOP; + +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Predicate; + +public interface AsyncStreamSlicer { + + enum SliceResult { + CONTINUE, + INCLUDE_AND_STOP, + SKIP_AND_STOP + } + + SliceResult slice(final T element); + + static AsyncStreamSlicer limit(final long count) { + return new AsyncStreamSlicer<>() { + private final AtomicLong remainCount = new AtomicLong(count); + + @Override + public SliceResult slice(final T element) { + return remainCount.decrementAndGet() > 0 ? CONTINUE : INCLUDE_AND_STOP; + } + }; + } + + static AsyncStreamSlicer takeWhile(final Predicate condition) { + return t -> condition.test(t) ? CONTINUE : SKIP_AND_STOP; + } + + default AsyncStreamSlicer then(final AsyncStreamSlicer nextSlicer) { + return new AsyncStreamSlicer<>() { + private boolean thisSlicerCompleted = false; + + @Override + public SliceResult slice(final T element) { + if (thisSlicerCompleted) { + return nextSlicer.slice(element); + } else { + final SliceResult result = AsyncStreamSlicer.this.slice(element); + return switch (result) { + case CONTINUE -> result; + case SKIP_AND_STOP -> { + thisSlicerCompleted = true; + yield nextSlicer.slice(element); + } + case INCLUDE_AND_STOP -> { + thisSlicerCompleted = true; + yield CONTINUE; + } + }; + } + } + }; + } +} diff --git a/infrastructure/async/src/main/java/tech/pegasys/teku/infrastructure/async/stream/AsyncStreamTransformer.java b/infrastructure/async/src/main/java/tech/pegasys/teku/infrastructure/async/stream/AsyncStreamTransformer.java new file mode 100644 index 00000000000..1b0f747e1a6 --- /dev/null +++ b/infrastructure/async/src/main/java/tech/pegasys/teku/infrastructure/async/stream/AsyncStreamTransformer.java @@ -0,0 +1,19 @@ +/* + * Copyright Consensys Software Inc., 2024 + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package tech.pegasys.teku.infrastructure.async.stream; + +public interface AsyncStreamTransformer { + + AsyncStreamHandler process(AsyncStreamHandler downstreamHandler); +} diff --git a/infrastructure/async/src/main/java/tech/pegasys/teku/infrastructure/async/stream/AsyncStreamVisitor.java b/infrastructure/async/src/main/java/tech/pegasys/teku/infrastructure/async/stream/AsyncStreamVisitor.java new file mode 100644 index 00000000000..621a5252249 --- /dev/null +++ b/infrastructure/async/src/main/java/tech/pegasys/teku/infrastructure/async/stream/AsyncStreamVisitor.java @@ -0,0 +1,23 @@ +/* + * Copyright Consensys Software Inc., 2024 + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package tech.pegasys.teku.infrastructure.async.stream; + +public interface AsyncStreamVisitor { + + void onNext(T t); + + void onComplete(); + + void onError(Throwable t); +} diff --git a/infrastructure/async/src/main/java/tech/pegasys/teku/infrastructure/async/stream/BufferingStreamPublisher.java b/infrastructure/async/src/main/java/tech/pegasys/teku/infrastructure/async/stream/BufferingStreamPublisher.java new file mode 100644 index 00000000000..ede7d20600b --- /dev/null +++ b/infrastructure/async/src/main/java/tech/pegasys/teku/infrastructure/async/stream/BufferingStreamPublisher.java @@ -0,0 +1,109 @@ +/* + * Copyright Consensys Software Inc., 2024 + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package tech.pegasys.teku.infrastructure.async.stream; + +import tech.pegasys.teku.infrastructure.async.SafeFuture; + +class BufferingStreamPublisher extends AsyncIterator implements AsyncStreamPublisher { + + private final AsyncQueue> eventQueue; + + private boolean isDone = false; + + BufferingStreamPublisher(final int maxBufferSize) { + this.eventQueue = new LimitedAsyncQueue<>(maxBufferSize); + } + + sealed interface Event { + boolean isTerminal(); + } + + record ItemEvent(T item, SafeFuture nextReturn) implements Event { + @Override + public boolean isTerminal() { + return false; + } + } + + record CompleteEvent() implements Event { + @Override + public boolean isTerminal() { + return true; + } + } + + record ErrorEvent(Throwable error) implements Event { + @Override + public boolean isTerminal() { + return true; + } + } + + private synchronized void putNext(final Event event) { + if (isDone) { + throw new IllegalStateException("Stream has been done already"); + } + isDone = event.isTerminal(); + eventQueue.put(event); + } + + private SafeFuture> takeNext() { + return eventQueue.take(); + } + + @Override + void iterate(final AsyncStreamHandler delegate) { + SafeFuture.asyncDoWhile( + () -> + takeNext() + .thenCompose( + event -> + switch (event) { + case ItemEvent item -> { + delegate.onNext(item.item()).propagateTo(item.nextReturn()); + yield item.nextReturn(); + } + case CompleteEvent ignored -> { + delegate.onComplete(); + yield FALSE_FUTURE; + } + case ErrorEvent errorEvent -> { + delegate.onError(errorEvent.error()); + yield FALSE_FUTURE; + } + })) + .finish(delegate::onError); + } + + @Override + public SafeFuture onNext(final T t) { + SafeFuture ret = new SafeFuture<>(); + try { + putNext(new ItemEvent<>(t, ret)); + } catch (Exception e) { + ret.completeExceptionally(e); + } + return ret; + } + + @Override + public void onComplete() { + putNext(new CompleteEvent<>()); + } + + @Override + public synchronized void onError(final Throwable t) { + putNext(new ErrorEvent<>(t)); + } +} diff --git a/infrastructure/async/src/main/java/tech/pegasys/teku/infrastructure/async/stream/CircularBuf.java b/infrastructure/async/src/main/java/tech/pegasys/teku/infrastructure/async/stream/CircularBuf.java new file mode 100644 index 00000000000..7a36fbe77f8 --- /dev/null +++ b/infrastructure/async/src/main/java/tech/pegasys/teku/infrastructure/async/stream/CircularBuf.java @@ -0,0 +1,45 @@ +/* + * Copyright Consensys Software Inc., 2024 + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package tech.pegasys.teku.infrastructure.async.stream; + +import static tech.pegasys.teku.infrastructure.async.stream.Util.noCallBinaryOperator; + +import java.util.ArrayDeque; +import java.util.List; +import java.util.stream.Collector; + +class CircularBuf { + final ArrayDeque buf; + final int maxSize; + + public CircularBuf(final int maxSize) { + buf = new ArrayDeque<>(maxSize); + this.maxSize = maxSize; + } + + public void add(final C t) { + if (buf.size() == maxSize) { + buf.removeFirst(); + } + buf.add(t); + } + + public static Collector> createCollector(final int count) { + return Collector., List>of( + () -> new CircularBuf(count), + CircularBuf::add, + noCallBinaryOperator(), + buf -> buf.buf.stream().toList()); + } +} diff --git a/infrastructure/async/src/main/java/tech/pegasys/teku/infrastructure/async/stream/FilteringStreamHandler.java b/infrastructure/async/src/main/java/tech/pegasys/teku/infrastructure/async/stream/FilteringStreamHandler.java new file mode 100644 index 00000000000..f667f05cd3d --- /dev/null +++ b/infrastructure/async/src/main/java/tech/pegasys/teku/infrastructure/async/stream/FilteringStreamHandler.java @@ -0,0 +1,37 @@ +/* + * Copyright Consensys Software Inc., 2024 + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package tech.pegasys.teku.infrastructure.async.stream; + +import java.util.function.Predicate; +import tech.pegasys.teku.infrastructure.async.SafeFuture; + +class FilteringStreamHandler extends AbstractDelegatingStreamHandler { + + private final Predicate filter; + + protected FilteringStreamHandler( + final AsyncStreamHandler delegate, final Predicate filter) { + super(delegate); + this.filter = filter; + } + + @Override + public SafeFuture onNext(final T t) { + if (filter.test(t)) { + return delegate.onNext(t); + } else { + return TRUE_FUTURE; + } + } +} diff --git a/infrastructure/async/src/main/java/tech/pegasys/teku/infrastructure/async/stream/FlattenStreamHandler.java b/infrastructure/async/src/main/java/tech/pegasys/teku/infrastructure/async/stream/FlattenStreamHandler.java new file mode 100644 index 00000000000..77c4425645b --- /dev/null +++ b/infrastructure/async/src/main/java/tech/pegasys/teku/infrastructure/async/stream/FlattenStreamHandler.java @@ -0,0 +1,55 @@ +/* + * Copyright Consensys Software Inc., 2024 + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package tech.pegasys.teku.infrastructure.async.stream; + +import tech.pegasys.teku.infrastructure.async.SafeFuture; + +class FlattenStreamHandler, T> + extends AbstractDelegatingStreamHandler { + + protected FlattenStreamHandler(final AsyncStreamHandler delegate) { + super(delegate); + } + + @Override + public SafeFuture onNext(final TCol asyncIterator) { + final SafeFuture ret = new SafeFuture<>(); + asyncIterator.consume( + new AsyncStreamHandler() { + @Override + public SafeFuture onNext(final T t) { + SafeFuture proceedFuture = delegate.onNext(t); + + return proceedFuture.thenPeek( + proceed -> { + if (!proceed) { + ret.complete(false); + } + }); + } + + @Override + public void onComplete() { + ret.complete(true); + } + + @Override + public void onError(final Throwable t) { + ret.complete(false); + delegate.onError(t); + } + }); + return ret; + } +} diff --git a/infrastructure/async/src/main/java/tech/pegasys/teku/infrastructure/async/stream/FutureAsyncIteratorImpl.java b/infrastructure/async/src/main/java/tech/pegasys/teku/infrastructure/async/stream/FutureAsyncIteratorImpl.java new file mode 100644 index 00000000000..cef99249154 --- /dev/null +++ b/infrastructure/async/src/main/java/tech/pegasys/teku/infrastructure/async/stream/FutureAsyncIteratorImpl.java @@ -0,0 +1,33 @@ +/* + * Copyright Consensys Software Inc., 2024 + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package tech.pegasys.teku.infrastructure.async.stream; + +import java.util.concurrent.CompletionStage; +import tech.pegasys.teku.infrastructure.async.SafeFuture; + +class FutureAsyncIteratorImpl extends AsyncIterator { + + private final SafeFuture future; + + FutureAsyncIteratorImpl(final CompletionStage future) { + this.future = SafeFuture.of(future); + } + + @Override + public void iterate(final AsyncStreamHandler callback) { + future.finish( + succ -> callback.onNext(succ).finish(__ -> callback.onComplete(), callback::onError), + callback::onError); + } +} diff --git a/infrastructure/async/src/main/java/tech/pegasys/teku/infrastructure/async/stream/LimitedAsyncQueue.java b/infrastructure/async/src/main/java/tech/pegasys/teku/infrastructure/async/stream/LimitedAsyncQueue.java new file mode 100644 index 00000000000..b3a558bfae3 --- /dev/null +++ b/infrastructure/async/src/main/java/tech/pegasys/teku/infrastructure/async/stream/LimitedAsyncQueue.java @@ -0,0 +1,68 @@ +/* + * Copyright Consensys Software Inc., 2024 + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package tech.pegasys.teku.infrastructure.async.stream; + +import java.util.ArrayDeque; +import java.util.Queue; +import java.util.concurrent.CompletableFuture; +import tech.pegasys.teku.infrastructure.async.SafeFuture; + +class LimitedAsyncQueue implements AsyncQueue { + + private final int maxSize; + + private final Queue items = new ArrayDeque<>(); + private final Queue> takers = new ArrayDeque<>(); + + public LimitedAsyncQueue(final int maxSize) { + this.maxSize = maxSize; + } + + // Adds an item to the queue + @Override + public void put(final T item) { + final CompletableFuture maybeTaker; + synchronized (this) { + if (!takers.isEmpty()) { + // If there are pending takers, complete one with the item + maybeTaker = takers.poll(); + } else { + // Otherwise, add the item to the items queue + if (items.size() >= maxSize) { + throw new IllegalStateException("Buffer size overflow: " + maxSize); + } + items.offer(item); + maybeTaker = null; + } + } + if (maybeTaker != null) { + maybeTaker.complete(item); + } + } + + // Returns a CompletableFuture that will be completed when an item is available + @Override + public synchronized SafeFuture take() { + if (!items.isEmpty()) { + // If items are available, return a completed future + final T item = items.poll(); + return SafeFuture.completedFuture(item); + } else { + // If no items, create a new CompletableFuture and add it to takers + final SafeFuture future = new SafeFuture<>(); + takers.offer(future); + return future; + } + } +} diff --git a/infrastructure/async/src/main/java/tech/pegasys/teku/infrastructure/async/stream/MapStreamHandler.java b/infrastructure/async/src/main/java/tech/pegasys/teku/infrastructure/async/stream/MapStreamHandler.java new file mode 100644 index 00000000000..bc22ec0779d --- /dev/null +++ b/infrastructure/async/src/main/java/tech/pegasys/teku/infrastructure/async/stream/MapStreamHandler.java @@ -0,0 +1,37 @@ +/* + * Copyright Consensys Software Inc., 2024 + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package tech.pegasys.teku.infrastructure.async.stream; + +import java.util.function.Function; +import tech.pegasys.teku.infrastructure.async.SafeFuture; + +class MapStreamHandler extends AbstractDelegatingStreamHandler { + + private final Function mapper; + + protected MapStreamHandler(final AsyncStreamHandler delegate, final Function mapper) { + super(delegate); + this.mapper = mapper; + } + + @Override + public SafeFuture onNext(final S s) { + try { + return delegate.onNext(mapper.apply(s)); + } catch (Exception e) { + delegate.onError(e); + return FALSE_FUTURE; + } + } +} diff --git a/infrastructure/async/src/main/java/tech/pegasys/teku/infrastructure/async/stream/SliceStreamHandler.java b/infrastructure/async/src/main/java/tech/pegasys/teku/infrastructure/async/stream/SliceStreamHandler.java new file mode 100644 index 00000000000..e8f39ca4f03 --- /dev/null +++ b/infrastructure/async/src/main/java/tech/pegasys/teku/infrastructure/async/stream/SliceStreamHandler.java @@ -0,0 +1,44 @@ +/* + * Copyright Consensys Software Inc., 2024 + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package tech.pegasys.teku.infrastructure.async.stream; + +import tech.pegasys.teku.infrastructure.async.SafeFuture; + +class SliceStreamHandler extends AbstractDelegatingStreamHandler { + + private final AsyncStreamSlicer slicer; + + protected SliceStreamHandler( + final AsyncStreamHandler delegate, final AsyncStreamSlicer slicer) { + super(delegate); + this.slicer = slicer; + } + + @Override + public SafeFuture onNext(final T t) { + AsyncStreamSlicer.SliceResult sliceResult = slicer.slice(t); + return switch (sliceResult) { + case CONTINUE -> delegate.onNext(t); + case SKIP_AND_STOP -> { + delegate.onComplete(); + yield FALSE_FUTURE; + } + case INCLUDE_AND_STOP -> { + SafeFuture ret = delegate.onNext(t).thenApply(__ -> false); + delegate.onComplete(); + yield ret; + } + }; + } +} diff --git a/infrastructure/async/src/main/java/tech/pegasys/teku/infrastructure/async/stream/SyncToAsyncIteratorImpl.java b/infrastructure/async/src/main/java/tech/pegasys/teku/infrastructure/async/stream/SyncToAsyncIteratorImpl.java new file mode 100644 index 00000000000..5abdb09cab5 --- /dev/null +++ b/infrastructure/async/src/main/java/tech/pegasys/teku/infrastructure/async/stream/SyncToAsyncIteratorImpl.java @@ -0,0 +1,71 @@ +/* + * Copyright Consensys Software Inc., 2024 + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package tech.pegasys.teku.infrastructure.async.stream; + +import java.util.Iterator; +import tech.pegasys.teku.infrastructure.async.SafeFuture; + +class SyncToAsyncIteratorImpl extends AsyncIterator { + + private final Iterator iterator; + private AsyncStreamHandler callback; + + SyncToAsyncIteratorImpl(final Iterator iterator) { + this.iterator = iterator; + } + + @Override + public void iterate(final AsyncStreamHandler callback) { + synchronized (this) { + if (this.callback != null) { + throw new IllegalStateException("This one-shot iterator has been used already"); + } + this.callback = callback; + } + next(); + } + + private void next() { + try { + while (true) { + if (!iterator.hasNext()) { + callback.onComplete(); + break; + } + T next = iterator.next(); + SafeFuture shouldContinueFut = callback.onNext(next); + if (shouldContinueFut.isCompletedNormally()) { + Boolean shouldContinue = shouldContinueFut.getImmediately(); + if (!shouldContinue) { + callback.onComplete(); + break; + } + } else { + shouldContinueFut.finish(this::onNextComplete, err -> callback.onError(err)); + break; + } + } + } catch (Throwable e) { + callback.onError(e); + } + } + + private void onNextComplete(final boolean shouldContinue) { + if (shouldContinue) { + next(); + } else { + callback.onComplete(); + } + } +} diff --git a/infrastructure/async/src/main/java/tech/pegasys/teku/infrastructure/async/stream/TransformAsyncIterator.java b/infrastructure/async/src/main/java/tech/pegasys/teku/infrastructure/async/stream/TransformAsyncIterator.java new file mode 100644 index 00000000000..68bdcdcdfc0 --- /dev/null +++ b/infrastructure/async/src/main/java/tech/pegasys/teku/infrastructure/async/stream/TransformAsyncIterator.java @@ -0,0 +1,31 @@ +/* + * Copyright Consensys Software Inc., 2024 + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package tech.pegasys.teku.infrastructure.async.stream; + +class TransformAsyncIterator extends AsyncIterator { + private final AsyncIterator delegateIterator; + private final AsyncStreamTransformer streamTransformer; + + public TransformAsyncIterator( + final AsyncIterator delegateIterator, + final AsyncStreamTransformer streamTransformer) { + this.delegateIterator = delegateIterator; + this.streamTransformer = streamTransformer; + } + + @Override + public void iterate(final AsyncStreamHandler callback) { + delegateIterator.iterate(streamTransformer.process(callback)); + } +} diff --git a/infrastructure/async/src/main/java/tech/pegasys/teku/infrastructure/async/stream/Util.java b/infrastructure/async/src/main/java/tech/pegasys/teku/infrastructure/async/stream/Util.java new file mode 100644 index 00000000000..d747448b1ee --- /dev/null +++ b/infrastructure/async/src/main/java/tech/pegasys/teku/infrastructure/async/stream/Util.java @@ -0,0 +1,24 @@ +/* + * Copyright Consensys Software Inc., 2024 + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package tech.pegasys.teku.infrastructure.async.stream; + +import java.util.function.BinaryOperator; + +class Util { + static BinaryOperator noCallBinaryOperator() { + return (c, c2) -> { + throw new UnsupportedOperationException("Shouldn't be called"); + }; + } +} diff --git a/infrastructure/async/src/main/java/tech/pegasys/teku/infrastructure/async/stream/VisitorHandler.java b/infrastructure/async/src/main/java/tech/pegasys/teku/infrastructure/async/stream/VisitorHandler.java new file mode 100644 index 00000000000..6e9e9a89f2b --- /dev/null +++ b/infrastructure/async/src/main/java/tech/pegasys/teku/infrastructure/async/stream/VisitorHandler.java @@ -0,0 +1,43 @@ +/* + * Copyright Consensys Software Inc., 2024 + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package tech.pegasys.teku.infrastructure.async.stream; + +import tech.pegasys.teku.infrastructure.async.SafeFuture; + +class VisitorHandler extends AbstractDelegatingStreamHandler { + private final AsyncStreamVisitor visitor; + + public VisitorHandler(final AsyncStreamHandler delegate, final AsyncStreamVisitor visitor) { + super(delegate); + this.visitor = visitor; + } + + @Override + public void onComplete() { + visitor.onComplete(); + delegate.onComplete(); + } + + @Override + public void onError(final Throwable t) { + visitor.onError(t); + delegate.onError(t); + } + + @Override + public SafeFuture onNext(final T t) { + visitor.onNext(t); + return delegate.onNext(t); + } +} diff --git a/infrastructure/async/src/test/java/tech/pegasys/teku/infrastructure/async/stream/AsyncStreamPublisherTest.java b/infrastructure/async/src/test/java/tech/pegasys/teku/infrastructure/async/stream/AsyncStreamPublisherTest.java new file mode 100644 index 00000000000..d723fb8a8bd --- /dev/null +++ b/infrastructure/async/src/test/java/tech/pegasys/teku/infrastructure/async/stream/AsyncStreamPublisherTest.java @@ -0,0 +1,169 @@ +/* + * Copyright Consensys Software Inc., 2024 + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package tech.pegasys.teku.infrastructure.async.stream; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.stream.IntStream; +import org.junit.jupiter.api.Test; +import tech.pegasys.teku.infrastructure.async.SafeFuture; + +@SuppressWarnings("FutureReturnValueIgnored") +public class AsyncStreamPublisherTest { + + AsyncStreamPublisher publisher = AsyncStream.createPublisher(Integer.MAX_VALUE); + AsyncStream stream = + publisher + .flatMap(i -> AsyncStream.create(IntStream.range(i * 10, i * 10 + 5).boxed().iterator())) + .filter(i -> i % 2 == 0) + .map(i -> i * 10) + .limit(10); + + // List expectedValues = List.of(0, 20, 40, 100, 120, 140, 200, 220, 240, 300); + + @Test + void sanityTest() { + List collector = new ArrayList<>(); + SafeFuture> listPromise = stream.collect(collector); + + assertThat(collector).isEmpty(); + + { + SafeFuture f = publisher.onNext(0); + assertThat(f).isCompletedWithValue(true); + } + assertThat(collector).containsExactly(0, 20, 40); + + { + SafeFuture f = publisher.onNext(1); + assertThat(f).isCompletedWithValue(true); + } + assertThat(collector).containsExactly(0, 20, 40, 100, 120, 140); + + { + SafeFuture f = publisher.onNext(2); + assertThat(f).isCompletedWithValue(true); + } + assertThat(collector).containsExactly(0, 20, 40, 100, 120, 140, 200, 220, 240); + assertThat(listPromise).isNotDone(); + + { + SafeFuture f = publisher.onNext(3); + assertThat(f).isCompletedWithValue(false); + } + // limit(10) kicks in + assertThat(collector).containsExactly(0, 20, 40, 100, 120, 140, 200, 220, 240, 300); + assertThat(listPromise) + .isCompletedWithValue(List.of(0, 20, 40, 100, 120, 140, 200, 220, 240, 300)); + } + + @Test + void completeShouldCompleteStream() { + SafeFuture> listPromise = stream.toList(); + publisher.onNext(0); + publisher.onComplete(); + assertThat(listPromise).isCompletedWithValue(List.of(0, 20, 40)); + } + + @Test + void errorShouldCompleteStream() { + SafeFuture> listPromise = stream.toList(); + publisher.onNext(0); + publisher.onError(new RuntimeException("test")); + assertThat(listPromise).isCompletedExceptionally(); + } + + @Test + void publishingAllPriorToConsumeShouldWork() { + publisher.onNext(0); + publisher.onNext(1); + publisher.onComplete(); + + assertThat(stream.toList()).isCompletedWithValue(List.of(0, 20, 40, 100, 120, 140)); + } + + @Test + void publishingPartiallyPriorToConsumeShouldWork() { + publisher.onNext(0); + SafeFuture> list = stream.toList(); + publisher.onNext(1); + publisher.onComplete(); + + assertThat(list).isCompletedWithValue(List.of(0, 20, 40, 100, 120, 140)); + } + + @Test + void issuingErrorPriorToConsumeShouldWork() { + publisher.onNext(0); + publisher.onError(new RuntimeException("test")); + + assertThat(stream.toList()).isCompletedExceptionally(); + } + + @Test + void shouldIgnoreAnyItemsAfterOnComplete() { + publisher.onNext(0); + publisher.onComplete(); + publisher.onNext(1); + + assertThat(stream.toList()).isCompletedWithValue(List.of(0, 20, 40)); + } + + @Test + void sanityThreadSafetyTest() throws Exception { + AsyncStreamPublisher myPublisher = AsyncStream.createPublisher(Integer.MAX_VALUE); + AsyncStream stream = + myPublisher + .map(i -> i * 10) + .flatMap(i -> AsyncStream.of(i / 10, 777777)) + .filter(i -> i != 777777); + + int threadCount = 16; + CountDownLatch startLatch = new CountDownLatch(threadCount); + CountDownLatch finishLatch = new CountDownLatch(threadCount); + for (int i = 0; i < threadCount; i++) { + int finalI = i; + new Thread( + () -> { + startLatch.countDown(); + try { + startLatch.await(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + for (int j = 0; j < 1000; j++) { + myPublisher.onNext(finalI * 1000 + j); + } + finishLatch.countDown(); + }) + .start(); + } + SafeFuture> listPromise = stream.toList(); + + boolean rc = finishLatch.await(5, TimeUnit.SECONDS); + assertThat(rc).isTrue(); + + myPublisher.onComplete(); + + List list = listPromise.get(5, TimeUnit.SECONDS); + + assertThat(list) + .containsExactlyInAnyOrderElementsOf( + IntStream.range(0, threadCount * 1000).boxed().toList()); + } +} diff --git a/infrastructure/async/src/test/java/tech/pegasys/teku/infrastructure/async/stream/AsyncStreamTest.java b/infrastructure/async/src/test/java/tech/pegasys/teku/infrastructure/async/stream/AsyncStreamTest.java new file mode 100644 index 00000000000..5e9f6606026 --- /dev/null +++ b/infrastructure/async/src/test/java/tech/pegasys/teku/infrastructure/async/stream/AsyncStreamTest.java @@ -0,0 +1,176 @@ +/* + * Copyright Consensys Software Inc., 2024 + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package tech.pegasys.teku.infrastructure.async.stream; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.stream.IntStream; +import java.util.stream.Stream; +import org.junit.jupiter.api.Test; +import tech.pegasys.teku.infrastructure.async.SafeFuture; + +public class AsyncStreamTest { + + @Test + void sanityTest() { + List> futures = + Stream.generate(() -> new SafeFuture()).limit(5).toList(); + + ArrayList collector = new ArrayList<>(); + + SafeFuture> listPromise = + AsyncStream.create(futures.iterator()) + .flatMap(AsyncStream::create) + .flatMap( + i -> AsyncStream.create(IntStream.range(i * 10, i * 10 + 5).boxed().iterator())) + .filter(i -> i % 2 == 0) + .map(i -> i * 10) + .limit(10) + .collect(collector); + + assertThat(collector).isEmpty(); + + futures.get(1).complete(1); + + assertThat(collector).isEmpty(); + + futures.get(0).complete(0); + + assertThat(collector).containsExactly(0, 20, 40, 100, 120, 140); + assertThat(listPromise).isNotDone(); + + // even if future is completed exceptionally it never reaches downstream thus shouldn't affect + // the final result + futures.get(4).completeExceptionally(new RuntimeException("test")); + + assertThat(listPromise).isNotDone(); + + futures.get(2).complete(2); + + assertThat(collector).containsExactly(0, 20, 40, 100, 120, 140, 200, 220, 240); + assertThat(listPromise).isNotDone(); + + futures.get(3).complete(3); + + // limit(10) kicks in + assertThat(collector).containsExactly(0, 20, 40, 100, 120, 140, 200, 220, 240, 300); + assertThat(listPromise) + .isCompletedWithValue(List.of(0, 20, 40, 100, 120, 140, 200, 220, 240, 300)); + } + + @Test + void mapAsyncTest() throws Exception { + int listSize = 100; + SafeFuture launchFuture = new SafeFuture<>(); + List> futures = + Stream.generate(() -> new SafeFuture()).limit(listSize).toList(); + + SafeFuture> listFuture = + AsyncStream.create(launchFuture) + .flatMap( + __ -> { + Stream idxStream = IntStream.range(0, futures.size()).boxed(); + return AsyncStream.create(idxStream).mapAsync(futures::get); + }) + .toList(); + + launchFuture.complete(null); + for (int i = 0; i < futures.size(); i++) { + futures.get(i).complete(i); + } + + assertThat(listFuture.get(1, TimeUnit.SECONDS)) + .containsExactlyElementsOf(IntStream.range(0, listSize).boxed().toList()); + } + + @Test + void longStreamOfCompletedFuturesShouldNotCauseStackOverflow() { + List ints = + AsyncStream.create(IntStream.range(0, 10000).boxed().iterator()) + .mapAsync(SafeFuture::completedFuture) + .toList() + .join(); + + assertThat(ints).hasSize(10000); + } + + @Test + void longStreamOfFlatMapShouldNotCauseStackOverflow() { + List ints = + AsyncStream.create(IntStream.range(0, 10000).boxed().iterator()) + .flatMap(AsyncStream::of) + .toList() + .join(); + + assertThat(ints).hasSize(10000); + } + + @Test + void checkTheOrderIsPreserved() { + SafeFuture fut0 = new SafeFuture<>(); + SafeFuture fut1 = new SafeFuture<>(); + + SafeFuture> listFut = AsyncStream.of(fut0, fut1).mapAsync(f -> f).toList(); + + fut1.complete(1); + fut0.complete(0); + + assertThat(listFut).isCompletedWithValue(List.of(0, 1)); + } + + @Test + void checkUntilAndCollectLast() { + List> futures = + Stream.generate(() -> new SafeFuture()).limit(10).toList(); + + SafeFuture> resFuture = + AsyncStream.create(futures.iterator()) + .mapAsync(fut -> fut) + .takeUntil(i -> i == 4, true) + .collectLast(2); + + for (int i = 0; i < 4; i++) { + futures.get(i).complete(i); + } + + assertThat(resFuture).isNotDone(); + + futures.get(4).complete(4); + + assertThat(resFuture).isCompletedWithValue(List.of(3, 4)); + } + + @Test + void checkUntilEmpty() { + assertThat(AsyncStream.of(0, 1, 2).takeUntil(i -> i == 0, false).findFirst().join()).isEmpty(); + assertThat(AsyncStream.of(0, 1, 2).takeUntil(i -> i == 100, false).collectLast(1).join()) + .containsExactly(2); + assertThat(AsyncStream.of(0, 1, 2).takeUntil(i -> i == 100, true).collectLast(1).join()) + .containsExactly(2); + } + + @Test + void checkUntilFirst() { + assertThat(AsyncStream.of(0, 1, 2).takeUntil(i -> i == 0, true).toList().join()) + .containsExactly(0); + } + + @Test + void checkCollectLastWithLessElements() { + assertThat(AsyncStream.of(0, 1).collectLast(3).join()).containsExactly(0, 1); + } +}