Skip to content

Commit

Permalink
add bufferUntil builder
Browse files Browse the repository at this point in the history
  • Loading branch information
davidmoten committed Dec 3, 2024
1 parent 756d795 commit 48a4e42
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,67 +8,77 @@
import org.davidmoten.kool.function.BiPredicate;
import org.davidmoten.kool.function.Function;

public final class BufferWhileBuilder<T> {
public final class BufferBuilder<T> {

private final Stream<T> stream;
private boolean isWhile;

// private Callable<? extends S> factory;
// private BiFunction<? super S, ? super T, ? extends S> accumulator, BiPredicate<? super S, ? super T> condition,
// boolean emitRemainder, Function<? super S, Integer> step, int maxReplay

BufferWhileBuilder(Stream<T> stream) {
BufferBuilder(Stream<T> stream, boolean isWhile) {
this.stream = stream;
this.isWhile = isWhile;
}

public <S> BuilderHasFactory<T, S> factory(Callable<? extends S> factory) {
return new BuilderHasFactory<T, S>(stream, factory);
return new BuilderHasFactory<T, S>(stream, factory, isWhile);
}

public BuilderHasFactoryArrayList<T> arrayList() {
return new BuilderHasFactoryArrayList<T>(stream);
return new BuilderHasFactoryArrayList<T>(stream, isWhile);
}

public static final class BuilderHasFactoryArrayList<T> {

private final Stream<T> stream;
private final boolean isWhile;

BuilderHasFactoryArrayList(Stream<T> stream) {
BuilderHasFactoryArrayList(Stream<T> stream, boolean isWhile) {
this.stream = stream;
this.isWhile = isWhile;
}

public BuilderHasAccumulator<T, List<T>> condition(BiPredicate<? super List<T>, ? super T> condition) {
return new BuilderHasFactory<T, List<T>>(stream, ArrayList::new) //
return new BuilderHasFactory<T, List<T>>(stream, ArrayList::new, isWhile) //
.condition(condition) //
.accumulator(listAccumulator());
}

private BiFunction<List<T>, T, List<T>> listAccumulator() {
return (list, x) -> {
list.add(x);
return list;
};
.accumulator((list, x) -> {
list.add(x);
return list;
});
}
}

public static final class BuilderHasFactory<T, S> {

private final Stream<T> stream;
private final Callable<? extends S> factory;
private final boolean isWhile;
private Function<? super S, Integer> step;
private BiFunction<? super S, ? super T, ? extends S> accumulator;
private BiPredicate<? super S, ? super T> condition;
private boolean emitRemainder = true;
private int maxReplay = 1024;

BuilderHasFactory(Stream<T> stream, Callable<? extends S> factory) {
BuilderHasFactory(Stream<T> stream, Callable<? extends S> factory, boolean isWhile) {
this.stream = stream;
this.factory = factory;
this.isWhile = isWhile;
}

public BuilderHasCondition<T, S> condition(BiPredicate<? super S, ? super T> condition) {
this.condition = condition;
return new BuilderHasCondition<T, S>(this);
}

Stream<S> build() {
if (isWhile) {
return stream.bufferWhile(factory, accumulator, condition, emitRemainder, step, maxReplay);
} else {
return stream.bufferUntil(factory, accumulator, condition, emitRemainder, step, maxReplay);
}
}
}

public static final class BuilderHasCondition<T, S> {
Expand Down Expand Up @@ -123,7 +133,7 @@ public BuilderHasStep<T, S> maxReplay(int maxReplay) {
}

public Stream<S> build() {
return b.stream.bufferWhile(b.factory, b.accumulator, b.condition, b.emitRemainder, b.step, b.maxReplay);
return b.build();
}
}

Expand Down
8 changes: 6 additions & 2 deletions kool/src/main/java/org/davidmoten/kool/Stream.java
Original file line number Diff line number Diff line change
Expand Up @@ -975,8 +975,12 @@ default <S> Stream<S> bufferWhile(Callable<? extends S> factory,
step, maxReplay);
}

default BufferWhileBuilder<T> bufferWhile() {
return new BufferWhileBuilder<T>(this);
default BufferBuilder<T> bufferUntil() {
return new BufferBuilder<T>(this, false);
}

default BufferBuilder<T> bufferWhile() {
return new BufferBuilder<T>(this, true);
}

default Stream<Indexed<T>> mapWithIndex(long startIndex) {
Expand Down
21 changes: 21 additions & 0 deletions kool/src/test/java/org/davidmoten/kool/StreamTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -837,6 +837,27 @@ public void testBufferUntilWithStepSmallerThanBufferedList() {
Lists.newArrayList(6, 7), //
Lists.newArrayList(7));
}

@Test
public void testBufferUntilWithStepSmallerThanBufferedListUseBuilder() {
Stream.of(1, 2, 3, 4, 5, 6, 7) //
.bufferUntil()
.arrayList()
.condition((list, t) -> list.size() == 3)
.step(1)
.emitRemainder(true)
.maxReplay(100)
.build()
.test() //
.assertValuesOnly( //
Lists.newArrayList(1, 2, 3, 4), //
Lists.newArrayList(2, 3, 4, 5), //
Lists.newArrayList(3, 4, 5, 6), //
Lists.newArrayList(4, 5, 6, 7), //
Lists.newArrayList(5, 6, 7), //
Lists.newArrayList(6, 7), //
Lists.newArrayList(7));
}

@Test
public void testBufferWhileWithStepSmallerThanBufferedList() {
Expand Down

0 comments on commit 48a4e42

Please sign in to comment.