Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions infrastructure/async/build.gradle
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
dependencies {
implementation project(":infrastructure:metrics")
implementation project(":infrastructure:time")
implementation project(":infrastructure:exceptions")
implementation 'com.google.guava:guava'

testImplementation testFixtures(project(":infrastructure:metrics"))
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Copyright Consensys Software Inc., 2024
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package tech.pegasys.teku.infrastructure.async.stream;

abstract class AbstractDelegatingStreamHandler<S, T> implements AsyncStreamHandler<T> {

protected final AsyncStreamHandler<S> delegate;

protected AbstractDelegatingStreamHandler(final AsyncStreamHandler<S> delegate) {
this.delegate = delegate;
}

@Override
public void onComplete() {
delegate.onComplete();
}

@Override
public void onError(final Throwable t) {
delegate.onError(t);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Copyright Consensys Software Inc., 2024
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package tech.pegasys.teku.infrastructure.async.stream;

abstract class AsyncIterator<T> implements AsyncStream<T> {

abstract void iterate(AsyncStreamHandler<T> callback);

@Override
public <R> AsyncIterator<R> transform(final AsyncStreamTransformer<T, R> transformer) {
return new TransformAsyncIterator<>(this, transformer);
}

@Override
public void consume(final AsyncStreamHandler<T> consumer) {
iterate(consumer);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Copyright Consensys Software Inc., 2024
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package tech.pegasys.teku.infrastructure.async.stream;

import java.util.stream.Collector;
import tech.pegasys.teku.infrastructure.async.SafeFuture;

class AsyncIteratorCollector<T, A, R> implements AsyncStreamHandler<T> {
private final A accumulator;
private final Collector<T, A, R> collector;

private final SafeFuture<R> promise = new SafeFuture<>();

public AsyncIteratorCollector(final Collector<T, A, R> collector) {
this.collector = collector;
this.accumulator = collector.supplier().get();
}

@Override
public SafeFuture<Boolean> onNext(final T t) {
collector.accumulator().accept(accumulator, t);
return TRUE_FUTURE;
}

@Override
public void onComplete() {
final R result = collector.finisher().apply(accumulator);
promise.complete(result);
}

@Override
public void onError(final Throwable t) {
promise.completeExceptionally(t);
}

public SafeFuture<R> getPromise() {
return promise;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Copyright Consensys Software Inc., 2024
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package tech.pegasys.teku.infrastructure.async.stream;

import tech.pegasys.teku.infrastructure.async.SafeFuture;

/**
* FIFO queue analogous to {@link java.util.concurrent.BlockingQueue} which returns {@link
* SafeFuture} element promise from {@link #take()} instead of blocking when no elements available
* in the queue.
*/
interface AsyncQueue<T> {

void put(T item);

SafeFuture<T> take();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
/*
* Copyright Consensys Software Inc., 2024
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package tech.pegasys.teku.infrastructure.async.stream;

import static tech.pegasys.teku.infrastructure.async.stream.Util.noCallBinaryOperator;

import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletionStage;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collector;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import tech.pegasys.teku.infrastructure.async.SafeFuture;

/** Similar to {@link Stream} but may perform async operations */
public interface AsyncStream<T> extends AsyncStreamBase<T> {

static <T> AsyncStream<T> empty() {
return of();
}

static <T> AsyncStream<T> exceptional(final Throwable error) {
final AsyncStreamPublisher<T> ret = createPublisher(1);
ret.onError(error);
return ret;
}

@SafeVarargs
static <T> AsyncStream<T> of(final T... elements) {
return create(List.of(elements).iterator());
}

static <T> AsyncStream<T> create(final Stream<T> stream) {
return create(stream.iterator());
}

static <T> AsyncStream<T> create(final Iterator<T> iterator) {
return new SyncToAsyncIteratorImpl<>(iterator);
}

static <T> AsyncStream<T> create(final CompletionStage<T> future) {
return new FutureAsyncIteratorImpl<>(future);
}

static <T> AsyncStreamPublisher<T> createPublisher(final int maxBufferSize) {
return new BufferingStreamPublisher<>(maxBufferSize);
}

// transformation

default <R> AsyncStream<R> flatMap(final Function<T, AsyncStream<R>> toStreamMapper) {
return map(toStreamMapper).transform(FlattenStreamHandler::new);
}

default <R> AsyncStream<R> map(final Function<T, R> mapper) {
return transform(sourceCallback -> new MapStreamHandler<>(sourceCallback, mapper));
}

default AsyncStream<T> filter(final Predicate<T> filter) {
return transform(sourceCallback -> new FilteringStreamHandler<>(sourceCallback, filter));
}

default AsyncStream<T> peek(final AsyncStreamVisitor<T> visitor) {
return transform(src -> new VisitorHandler<>(src, visitor));
}

default <R> AsyncStream<R> mapAsync(final Function<T, SafeFuture<R>> mapper) {
return flatMap(e -> AsyncStream.create(mapper.apply(e)));
}

// slicing

default AsyncStream<T> slice(final AsyncStreamSlicer<T> slicer) {
return transform(sourceCallback -> new SliceStreamHandler<>(sourceCallback, slicer));
}

default AsyncStream<T> limit(final long count) {
return slice(AsyncStreamSlicer.limit(count));
}

default AsyncStream<T> takeWhile(final Predicate<T> whileCondition) {
return slice(AsyncStreamSlicer.takeWhile(whileCondition));
}

default AsyncStream<T> takeUntil(final Predicate<T> untilCondition, final boolean includeLast) {
AsyncStreamSlicer<T> whileSlicer = AsyncStreamSlicer.takeWhile(untilCondition.negate());
AsyncStreamSlicer<T> untilSlicer =
includeLast ? whileSlicer.then(AsyncStreamSlicer.limit(1)) : whileSlicer;
return slice(untilSlicer);
}

// consuming

default <A, R> SafeFuture<R> collect(final Collector<T, A, R> collector) {
final AsyncIteratorCollector<T, A, R> asyncIteratorCollector =
new AsyncIteratorCollector<>(collector);
consume(asyncIteratorCollector);
return asyncIteratorCollector.getPromise();
}

default SafeFuture<Optional<T>> findFirst() {
return this.limit(1)
.toList()
.thenApply(l -> l.isEmpty() ? Optional.empty() : Optional.of(l.getFirst()));
}

default SafeFuture<Void> forEach(final Consumer<T> consumer) {
return collect(Collector.of(() -> null, (a, t) -> consumer.accept(t), noCallBinaryOperator()));
}

default <C extends Collection<T>> SafeFuture<C> collect(final C targetCollection) {
return collect(Collectors.toCollection(() -> targetCollection));
}

default SafeFuture<List<T>> toList() {
return collect(Collectors.toUnmodifiableList());
}

default SafeFuture<Optional<T>> findLast() {
return collectLast(1)
.thenApply(l -> l.isEmpty() ? Optional.empty() : Optional.of(l.getFirst()));
}

default SafeFuture<List<T>> collectLast(final int count) {
return collect(CircularBuf.createCollector(count));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/*
* Copyright Consensys Software Inc., 2024
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package tech.pegasys.teku.infrastructure.async.stream;

public interface AsyncStreamBase<T> {

<R> AsyncStream<R> transform(AsyncStreamTransformer<T, R> transformer);

void consume(AsyncStreamHandler<T> consumer);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Copyright Consensys Software Inc., 2024
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package tech.pegasys.teku.infrastructure.async.stream;

import tech.pegasys.teku.infrastructure.async.SafeFuture;

/**
* Internal {@link AsyncStream} implementation interface which is analogous to {@link
* java.util.concurrent.Flow.Subscriber} in RX world
*/
public interface AsyncStreamHandler<T> {

SafeFuture<Boolean> TRUE_FUTURE = SafeFuture.completedFuture(true);
SafeFuture<Boolean> FALSE_FUTURE = SafeFuture.completedFuture(false);

/**
* Called when next element is available
*
* @return The promise to the upstream handler if the further elements are expected. When the
* future is completed with false the only next call expected is {@link #onComplete()}
*/
SafeFuture<Boolean> onNext(T t);

/**
* Called when the stream is complete. No other calls are expected after this call
*
* <p>An implementation MUST NOT call this method prior to {@link SafeFuture<Boolean>} returned
* from the last {@link #onNext(Object)} call is completed.
*/
void onComplete();

/** Called when upstream error happened Basically no other calls are expected after this */
void onError(Throwable t);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
/*
* Copyright Consensys Software Inc., 2024
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package tech.pegasys.teku.infrastructure.async.stream;

public interface AsyncStreamPublisher<T> extends AsyncStreamHandler<T>, AsyncStream<T> {}
Loading
Loading