Skip to content

Commit

Permalink
KAFKA-18026: transition KTable#filter impl to use processor wrapper (#…
Browse files Browse the repository at this point in the history
…18205)

This patch transitions the KTable#filter implementation to provide the materialized store via the ProcessorSupplier so that it can be wrapped by the processor wrapper if the wrapper is configured

Reviewers: Anna Sophie Blee-Goldman <[email protected]>
  • Loading branch information
rodesai authored Dec 18, 2024
1 parent 346e5dc commit 501da38
Show file tree
Hide file tree
Showing 4 changed files with 43 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,14 @@
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.processor.internals.StoreFactory;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.apache.kafka.streams.state.internals.KeyValueStoreWrapper;

import java.util.Collections;
import java.util.Set;

import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull;
import static org.apache.kafka.streams.state.VersionedKeyValueStore.PUT_RETURN_CODE_NOT_PUT;
import static org.apache.kafka.streams.state.internals.KeyValueStoreWrapper.PUT_RETURN_CODE_IS_LATEST;
Expand All @@ -34,17 +39,20 @@ public class KTableFilter<KIn, VIn> implements KTableProcessorSupplier<KIn, VIn,
private final String queryableName;
private boolean sendOldValues;
private boolean useVersionedSemantics = false;
private final StoreFactory storeFactory;

KTableFilter(final KTableImpl<KIn, ?, VIn> parent,
final Predicate<? super KIn, ? super VIn> predicate,
final boolean filterNot,
final String queryableName) {
final String queryableName,
final StoreFactory storeFactory) {
this.parent = parent;
this.predicate = predicate;
this.filterNot = filterNot;
this.queryableName = queryableName;
// If upstream is already materialized, enable sending old values to avoid sending unnecessary tombstones:
this.sendOldValues = parent.enableSendingOldValues(false);
this.storeFactory = storeFactory;
}

public void setUseVersionedSemantics(final boolean useVersionedSemantics) {
Expand All @@ -61,6 +69,14 @@ public Processor<KIn, Change<VIn>, KIn, Change<VIn>> get() {
return new KTableFilterProcessor();
}

@Override
public Set<StoreBuilder<?>> stores() {
if (storeFactory == null) {
return null;
}
return Collections.singleton(new StoreFactory.FactoryWrappingStoreBuilder<>(storeFactory));
}

@Override
public boolean enableSendingOldValues(final boolean forceMaterialization) {
if (queryableName != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -209,17 +209,13 @@ private KTable<K, V> doFilter(final Predicate<? super K, ? super V> predicate,
final String name = new NamedInternal(named).orElseGenerateWithPrefix(builder, FILTER_NAME);

final KTableProcessorSupplier<K, V, K, V> processorSupplier =
new KTableFilter<>(this, predicate, filterNot, queryableStoreName);
new KTableFilter<>(this, predicate, filterNot, queryableStoreName, storeFactory);

final ProcessorParameters<K, V, ?, ?> processorParameters = unsafeCastProcessorParametersToCompletelyDifferentType(
new ProcessorParameters<>(processorSupplier, name)
);

final GraphNode tableNode = new TableFilterNode<>(
name,
processorParameters,
storeFactory
);
final GraphNode tableNode = new TableFilterNode<>(name, processorParameters);
maybeSetOutputVersioned(tableNode, materializedInternal);

builder.addGraphNode(this.graphNode, tableNode);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,12 @@

import org.apache.kafka.streams.kstream.internals.KTableFilter;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
import org.apache.kafka.streams.processor.internals.StoreFactory;

public class TableFilterNode<K, V> extends TableProcessorNode<K, V> implements VersionedSemanticsGraphNode {

public TableFilterNode(final String nodeName,
final ProcessorParameters<K, V, ?, ?> processorParameters,
final StoreFactory storeFactory) {
super(nodeName, processorParameters, storeFactory, null);
final ProcessorParameters<K, V, ?, ?> processorParameters) {
super(nodeName, processorParameters, null, null);
}

@SuppressWarnings("unchecked")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1642,6 +1642,26 @@ public void shouldWrapProcessorsForMapValuesWithMaterializedStore() {
assertThat(counter.numConnectedStateStores(), is(1));
}

@Test
public void shouldWrapProcessorAndStoreForFilterTable() {
final Map<Object, Object> props = dummyStreamsConfigMap();
props.put(PROCESSOR_WRAPPER_CLASS_CONFIG, RecordingProcessorWrapper.class);
final WrapperRecorder counter = new WrapperRecorder();
props.put(PROCESSOR_WRAPPER_COUNTER_CONFIG, counter);
final StreamsBuilder builder = new StreamsBuilder(new TopologyConfig(new StreamsConfig(props)));

builder.table("input", Consumed.as("source-table"))
.filter((k, v) -> true, Named.as("filter"), Materialized.as("filter"))
.toStream(Named.as("to-stream"))
.to("output-topic", Produced.as("sink"));
builder.build();

assertThat(counter.wrappedProcessorNames(),
Matchers.containsInAnyOrder("source-table", "filter", "to-stream"));
assertThat(counter.numUniqueStateStores(), is(1));
assertThat(counter.numConnectedStateStores(), is(1));
}

@Test
public void shouldWrapProcessorsForTableAggregate() {
final Map<Object, Object> props = dummyStreamsConfigMap();
Expand Down Expand Up @@ -1744,8 +1764,8 @@ public void shouldWrapProcessorsWhenMultipleTableOperators() {
"to-table", "map-values", "map-values-stateful",
"filter-table", "filter-table-stateful", "to-stream"
));
assertThat(counter.numUniqueStateStores(), CoreMatchers.is(1));
assertThat(counter.numConnectedStateStores(), CoreMatchers.is(1));
assertThat(counter.numUniqueStateStores(), CoreMatchers.is(2));
assertThat(counter.numConnectedStateStores(), CoreMatchers.is(2));
}

@Test
Expand Down

0 comments on commit 501da38

Please sign in to comment.