Skip to content

Commit

Permalink
Add bufferWhile and bufferUntil builder (#141)
Browse files Browse the repository at this point in the history
  • Loading branch information
davidmoten authored Dec 3, 2024
1 parent bb578b0 commit b75cdfc
Show file tree
Hide file tree
Showing 3 changed files with 237 additions and 1 deletion.
140 changes: 140 additions & 0 deletions kool/src/main/java/org/davidmoten/kool/BufferBuilder.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
package org.davidmoten.kool;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;

import org.davidmoten.kool.function.BiFunction;
import org.davidmoten.kool.function.BiPredicate;
import org.davidmoten.kool.function.Function;

public final class BufferBuilder<T> {

private final Stream<T> stream;
private boolean isWhile;

// private Callable<? extends S> factory;
// private BiFunction<? super S, ? super T, ? extends S> accumulator, BiPredicate<? super S, ? super T> condition,
// boolean emitRemainder, Function<? super S, Integer> step, int maxReplay

BufferBuilder(Stream<T> stream, boolean isWhile) {
this.stream = stream;
this.isWhile = isWhile;
}

public <S> BuilderHasFactory<T, S> factory(Callable<? extends S> factory) {
return new BuilderHasFactory<T, S>(stream, factory, isWhile);
}

public BuilderHasFactoryArrayList<T> arrayList() {
return new BuilderHasFactoryArrayList<T>(stream, isWhile);
}

public static final class BuilderHasFactoryArrayList<T> {

private final Stream<T> stream;
private final boolean isWhile;

BuilderHasFactoryArrayList(Stream<T> stream, boolean isWhile) {
this.stream = stream;
this.isWhile = isWhile;
}

public BuilderHasAccumulator<T, List<T>> condition(BiPredicate<? super List<T>, ? super T> condition) {
return new BuilderHasFactory<T, List<T>>(stream, ArrayList::new, isWhile) //
.condition(condition) //
.accumulator((list, x) -> {
list.add(x);
return list;
});
}
}

public static final class BuilderHasFactory<T, S> {

private final Stream<T> stream;
private final Callable<? extends S> factory;
private final boolean isWhile;
private Function<? super S, Integer> step;
private BiFunction<? super S, ? super T, ? extends S> accumulator;
private BiPredicate<? super S, ? super T> condition;
private boolean emitRemainder = true;
private int maxReplay = 1024;

BuilderHasFactory(Stream<T> stream, Callable<? extends S> factory, boolean isWhile) {
this.stream = stream;
this.factory = factory;
this.isWhile = isWhile;
}

public BuilderHasCondition<T, S> condition(BiPredicate<? super S, ? super T> condition) {
this.condition = condition;
return new BuilderHasCondition<T, S>(this);
}

Stream<S> build() {
if (isWhile) {
return stream.bufferWhile(factory, accumulator, condition, emitRemainder, step, maxReplay);
} else {
return stream.bufferUntil(factory, accumulator, condition, emitRemainder, step, maxReplay);
}
}
}

public static final class BuilderHasCondition<T, S> {

private final BuilderHasFactory<T, S> b;

public BuilderHasCondition(BuilderHasFactory<T, S> b) {
this.b = b;
}

public BuilderHasAccumulator<T, S> accumulator(BiFunction<? super S, ? super T, ? extends S> accumulator) {
b.accumulator = accumulator;
return new BuilderHasAccumulator<T, S>(b);
}
}

public static final class BuilderHasAccumulator<T, S> {

private final BuilderHasFactory<T, S> b;

BuilderHasAccumulator(BuilderHasFactory<T, S> b) {
this.b = b;
}

public BuilderHasStep<T, S> step(int step) {
return step(x -> step);
}

public BuilderHasStep<T, S> step(Function<? super S, Integer> step) {
b.step = step;
return new BuilderHasStep<T, S>(b);
}

}

public static final class BuilderHasStep<T, S> {

private final BuilderHasFactory<T, S> b;

BuilderHasStep(BuilderHasFactory<T, S> b) {
this.b = b;
}

public BuilderHasStep<T, S> emitRemainder(boolean emitRemainder) {
this.b.emitRemainder = emitRemainder;
return this;
}

public BuilderHasStep<T, S> maxReplay(int maxReplay) {
this.b.maxReplay = maxReplay;
return this;
}

public Stream<S> build() {
return b.build();
}
}

}
42 changes: 41 additions & 1 deletion kool/src/main/java/org/davidmoten/kool/Stream.java
Original file line number Diff line number Diff line change
Expand Up @@ -945,20 +945,60 @@ default Stream<List<T>> bufferWhile(BiPredicate<? super List<T>, ? super T> cond
return bufferWhile(ArrayList::new, StreamUtils.listAccumulator(), condition, emitRemainder, step, maxReplay);
}

/**
* Buffers until a condition is true then emits the buffer and starts a new
* buffer for emission.
*
* @param <S> buffer type
* @param factory factory method to create a new buffer
* @param accumulator how to incorporate a stream element into the buffer
* @param condition if true then stream element is incorporated into the
* buffer
* @param emitRemainder what to do with the final buffer
* @param step function that defines overlap (or not) of buffers
* @param maxReplay overlap of buffers is achieved efficiently via a ring
* buffer (the stream is made replayable to handle buffer
* overlaps (e.g step = 1)
* @return buffered stream
*/
default <S> Stream<S> bufferUntil(Callable<? extends S> factory,
BiFunction<? super S, ? super T, ? extends S> accumulator, BiPredicate<? super S, ? super T> condition,
boolean emitRemainder, Function<? super S, Integer> step, int maxReplay) {
return new BufferWithFactoryPredicateAndStep<>(factory, accumulator, condition, emitRemainder, true, this,
step, maxReplay);
}

/**
* Buffers while a condition is true then emits the buffer and starts a new
* buffer for emission.
*
* @param <S> buffer type
* @param factory factory method to create a new buffer
* @param accumulator how to incorporate a stream element into the buffer
* @param condition if true then stream element is incorporated into the
* buffer
* @param emitRemainder what to do with the final buffer
* @param step function that defines overlap (or not) of buffers
* @param maxReplay overlap of buffers is achieved efficiently via a ring
* buffer (the stream is made replayable to handle buffer
* overlaps (e.g step = 1)
* @return buffered stream
*/
default <S> Stream<S> bufferWhile(Callable<? extends S> factory,
BiFunction<? super S, ? super T, ? extends S> accumulator, BiPredicate<? super S, ? super T> condition,
boolean emitRemainder, Function<? super S, Integer> step, int maxReplay) {
return new BufferWithFactoryPredicateAndStep<>(factory, accumulator, condition, emitRemainder, false, this,
step, maxReplay);
}


default BufferBuilder<T> bufferUntil() {
return new BufferBuilder<T>(this, false);
}

default BufferBuilder<T> bufferWhile() {
return new BufferBuilder<T>(this, true);
}

default Stream<Indexed<T>> mapWithIndex(long startIndex) {
return defer(() -> {
long[] index = new long[] { startIndex };
Expand Down
56 changes: 56 additions & 0 deletions kool/src/test/java/org/davidmoten/kool/StreamTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -837,6 +837,27 @@ public void testBufferUntilWithStepSmallerThanBufferedList() {
Lists.newArrayList(6, 7), //
Lists.newArrayList(7));
}

@Test
public void testBufferUntilWithStepSmallerThanBufferedListUseBuilder() {
Stream.of(1, 2, 3, 4, 5, 6, 7) //
.bufferUntil()
.arrayList()
.condition((list, t) -> list.size() == 3)
.step(1)
.emitRemainder(true)
.maxReplay(100)
.build()
.test() //
.assertValuesOnly( //
Lists.newArrayList(1, 2, 3, 4), //
Lists.newArrayList(2, 3, 4, 5), //
Lists.newArrayList(3, 4, 5, 6), //
Lists.newArrayList(4, 5, 6, 7), //
Lists.newArrayList(5, 6, 7), //
Lists.newArrayList(6, 7), //
Lists.newArrayList(7));
}

@Test
public void testBufferWhileWithStepSmallerThanBufferedList() {
Expand Down Expand Up @@ -875,6 +896,41 @@ public void testBufferWhileWithStepSmallerThanBufferedListDontEmitRemainder() {
Lists.newArrayList(2, 3, 4, 5), //
Lists.newArrayList(3, 4, 5, 6));
}

@Test
public void testBufferWhileWithStepSmallerThanBufferedListDontEmitRemainderUseBuilder() {
Stream.of(1, 2, 3, 4, 5, 6, 7) //
.bufferWhile() //
.arrayList() //
.condition((list, t) -> list.size() <= 3) //
.step(1) //
.emitRemainder(false) //
.maxReplay(100)
.build()
.test() //
.assertValuesOnly( //
Lists.newArrayList(1, 2, 3, 4), //
Lists.newArrayList(2, 3, 4, 5), //
Lists.newArrayList(3, 4, 5, 6));
}

@Test
public void testBufferWhileWithStepSmallerThanBufferedListDontEmitRemainderUseBuilderWithFactory() {
Stream.of(1, 2, 3, 4, 5, 6, 7) //
.bufferWhile() //
.factory(ArrayList::new) //
.condition((list, t) -> list.size() <= 3) //
.accumulator((list, t)-> {list.add(t); return list;}) //
.step(1) //
.emitRemainder(false) //
.maxReplay(100)
.build()
.test() //
.assertValuesOnly( //
Lists.newArrayList(1, 2, 3, 4), //
Lists.newArrayList(2, 3, 4, 5), //
Lists.newArrayList(3, 4, 5, 6));
}

@Test(expected = NoSuchElementException.class)
public void testBufferUntilWithStepWhenEmptyAndNextCalled() {
Expand Down

0 comments on commit b75cdfc

Please sign in to comment.