diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java index 475ea85db940a..91e2fac9411d0 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java @@ -20,9 +20,14 @@ import org.apache.kafka.streams.processor.api.Processor; import org.apache.kafka.streams.processor.api.ProcessorContext; import org.apache.kafka.streams.processor.api.Record; +import org.apache.kafka.streams.processor.internals.StoreFactory; +import org.apache.kafka.streams.state.StoreBuilder; import org.apache.kafka.streams.state.ValueAndTimestamp; import org.apache.kafka.streams.state.internals.KeyValueStoreWrapper; +import java.util.Collections; +import java.util.Set; + import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull; import static org.apache.kafka.streams.state.VersionedKeyValueStore.PUT_RETURN_CODE_NOT_PUT; import static org.apache.kafka.streams.state.internals.KeyValueStoreWrapper.PUT_RETURN_CODE_IS_LATEST; @@ -34,17 +39,20 @@ public class KTableFilter implements KTableProcessorSupplier parent, final Predicate predicate, final boolean filterNot, - final String queryableName) { + final String queryableName, + final StoreFactory storeFactory) { this.parent = parent; this.predicate = predicate; this.filterNot = filterNot; this.queryableName = queryableName; // If upstream is already materialized, enable sending old values to avoid sending unnecessary tombstones: this.sendOldValues = parent.enableSendingOldValues(false); + this.storeFactory = storeFactory; } public void setUseVersionedSemantics(final boolean useVersionedSemantics) { @@ -61,6 +69,14 @@ public Processor, KIn, Change> get() { return new KTableFilterProcessor(); } + @Override + public Set> stores() { + if (storeFactory == null) { + return null; + } + return Collections.singleton(new StoreFactory.FactoryWrappingStoreBuilder<>(storeFactory)); + } + @Override public boolean enableSendingOldValues(final boolean forceMaterialization) { if (queryableName != null) { diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java index aa07714167738..40c565c8ceb4b 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java @@ -209,17 +209,13 @@ private KTable doFilter(final Predicate predicate, final String name = new NamedInternal(named).orElseGenerateWithPrefix(builder, FILTER_NAME); final KTableProcessorSupplier processorSupplier = - new KTableFilter<>(this, predicate, filterNot, queryableStoreName); + new KTableFilter<>(this, predicate, filterNot, queryableStoreName, storeFactory); final ProcessorParameters processorParameters = unsafeCastProcessorParametersToCompletelyDifferentType( new ProcessorParameters<>(processorSupplier, name) ); - final GraphNode tableNode = new TableFilterNode<>( - name, - processorParameters, - storeFactory - ); + final GraphNode tableNode = new TableFilterNode<>(name, processorParameters); maybeSetOutputVersioned(tableNode, materializedInternal); builder.addGraphNode(this.graphNode, tableNode); diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableFilterNode.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableFilterNode.java index 38033693ebb62..6fef05604cf68 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableFilterNode.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableFilterNode.java @@ -19,14 +19,12 @@ import org.apache.kafka.streams.kstream.internals.KTableFilter; import org.apache.kafka.streams.processor.api.ProcessorSupplier; -import org.apache.kafka.streams.processor.internals.StoreFactory; public class TableFilterNode extends TableProcessorNode implements VersionedSemanticsGraphNode { public TableFilterNode(final String nodeName, - final ProcessorParameters processorParameters, - final StoreFactory storeFactory) { - super(nodeName, processorParameters, storeFactory, null); + final ProcessorParameters processorParameters) { + super(nodeName, processorParameters, null, null); } @SuppressWarnings("unchecked") diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java index b20bec101bca5..71905e1481c8f 100644 --- a/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java @@ -1642,6 +1642,26 @@ public void shouldWrapProcessorsForMapValuesWithMaterializedStore() { assertThat(counter.numConnectedStateStores(), is(1)); } + @Test + public void shouldWrapProcessorAndStoreForFilterTable() { + final Map props = dummyStreamsConfigMap(); + props.put(PROCESSOR_WRAPPER_CLASS_CONFIG, RecordingProcessorWrapper.class); + final WrapperRecorder counter = new WrapperRecorder(); + props.put(PROCESSOR_WRAPPER_COUNTER_CONFIG, counter); + final StreamsBuilder builder = new StreamsBuilder(new TopologyConfig(new StreamsConfig(props))); + + builder.table("input", Consumed.as("source-table")) + .filter((k, v) -> true, Named.as("filter"), Materialized.as("filter")) + .toStream(Named.as("to-stream")) + .to("output-topic", Produced.as("sink")); + builder.build(); + + assertThat(counter.wrappedProcessorNames(), + Matchers.containsInAnyOrder("source-table", "filter", "to-stream")); + assertThat(counter.numUniqueStateStores(), is(1)); + assertThat(counter.numConnectedStateStores(), is(1)); + } + @Test public void shouldWrapProcessorsForTableAggregate() { final Map props = dummyStreamsConfigMap(); @@ -1744,8 +1764,8 @@ public void shouldWrapProcessorsWhenMultipleTableOperators() { "to-table", "map-values", "map-values-stateful", "filter-table", "filter-table-stateful", "to-stream" )); - assertThat(counter.numUniqueStateStores(), CoreMatchers.is(1)); - assertThat(counter.numConnectedStateStores(), CoreMatchers.is(1)); + assertThat(counter.numUniqueStateStores(), CoreMatchers.is(2)); + assertThat(counter.numConnectedStateStores(), CoreMatchers.is(2)); } @Test