Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-18026: transition KTable#filter impl to use processor wrapper #18205

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,14 @@
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.processor.internals.StoreFactory;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.apache.kafka.streams.state.internals.KeyValueStoreWrapper;

import java.util.Collections;
import java.util.Set;

import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull;
import static org.apache.kafka.streams.state.VersionedKeyValueStore.PUT_RETURN_CODE_NOT_PUT;
import static org.apache.kafka.streams.state.internals.KeyValueStoreWrapper.PUT_RETURN_CODE_IS_LATEST;
Expand All @@ -34,17 +39,20 @@ public class KTableFilter<KIn, VIn> implements KTableProcessorSupplier<KIn, VIn,
private final String queryableName;
private boolean sendOldValues;
private boolean useVersionedSemantics = false;
private final StoreFactory storeFactory;

KTableFilter(final KTableImpl<KIn, ?, VIn> parent,
final Predicate<? super KIn, ? super VIn> predicate,
final boolean filterNot,
final String queryableName) {
final String queryableName,
final StoreFactory storeFactory) {
this.parent = parent;
this.predicate = predicate;
this.filterNot = filterNot;
this.queryableName = queryableName;
// If upstream is already materialized, enable sending old values to avoid sending unnecessary tombstones:
this.sendOldValues = parent.enableSendingOldValues(false);
this.storeFactory = storeFactory;
}

public void setUseVersionedSemantics(final boolean useVersionedSemantics) {
Expand All @@ -61,6 +69,14 @@ public Processor<KIn, Change<VIn>, KIn, Change<VIn>> get() {
return new KTableFilterProcessor();
}

@Override
public Set<StoreBuilder<?>> stores() {
if (storeFactory == null) {
return null;
}
return Collections.singleton(new StoreFactory.FactoryWrappingStoreBuilder<>(storeFactory));
}

@Override
public boolean enableSendingOldValues(final boolean forceMaterialization) {
if (queryableName != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -209,17 +209,13 @@ private KTable<K, V> doFilter(final Predicate<? super K, ? super V> predicate,
final String name = new NamedInternal(named).orElseGenerateWithPrefix(builder, FILTER_NAME);

final KTableProcessorSupplier<K, V, K, V> processorSupplier =
new KTableFilter<>(this, predicate, filterNot, queryableStoreName);
new KTableFilter<>(this, predicate, filterNot, queryableStoreName, storeFactory);

final ProcessorParameters<K, V, ?, ?> processorParameters = unsafeCastProcessorParametersToCompletelyDifferentType(
new ProcessorParameters<>(processorSupplier, name)
);

final GraphNode tableNode = new TableFilterNode<>(
name,
processorParameters,
storeFactory
);
final GraphNode tableNode = new TableFilterNode<>(name, processorParameters);
maybeSetOutputVersioned(tableNode, materializedInternal);

builder.addGraphNode(this.graphNode, tableNode);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,12 @@

import org.apache.kafka.streams.kstream.internals.KTableFilter;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
import org.apache.kafka.streams.processor.internals.StoreFactory;

public class TableFilterNode<K, V> extends TableProcessorNode<K, V> implements VersionedSemanticsGraphNode {

public TableFilterNode(final String nodeName,
final ProcessorParameters<K, V, ?, ?> processorParameters,
final StoreFactory storeFactory) {
super(nodeName, processorParameters, storeFactory, null);
final ProcessorParameters<K, V, ?, ?> processorParameters) {
super(nodeName, processorParameters, null, null);
}

@SuppressWarnings("unchecked")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1642,6 +1642,26 @@ public void shouldWrapProcessorsForMapValuesWithMaterializedStore() {
assertThat(counter.numConnectedStateStores(), is(1));
}

@Test
public void shouldWrapProcessorAndStoreForFilterTable() {
final Map<Object, Object> props = dummyStreamsConfigMap();
props.put(PROCESSOR_WRAPPER_CLASS_CONFIG, RecordingProcessorWrapper.class);
final WrapperRecorder counter = new WrapperRecorder();
props.put(PROCESSOR_WRAPPER_COUNTER_CONFIG, counter);
final StreamsBuilder builder = new StreamsBuilder(new TopologyConfig(new StreamsConfig(props)));

builder.table("input", Consumed.as("source-table"))
.filter((k, v) -> true, Named.as("filter"), Materialized.as("filter"))
.toStream(Named.as("to-stream"))
.to("output-topic", Produced.as("sink"));
builder.build();

assertThat(counter.wrappedProcessorNames(),
Matchers.containsInAnyOrder("source-table", "filter", "to-stream"));
assertThat(counter.numUniqueStateStores(), is(1));
assertThat(counter.numConnectedStateStores(), is(1));
}

@Test
public void shouldWrapProcessorsForTableAggregate() {
final Map<Object, Object> props = dummyStreamsConfigMap();
Expand Down Expand Up @@ -1744,8 +1764,8 @@ public void shouldWrapProcessorsWhenMultipleTableOperators() {
"to-table", "map-values", "map-values-stateful",
"filter-table", "filter-table-stateful", "to-stream"
));
assertThat(counter.numUniqueStateStores(), CoreMatchers.is(1));
assertThat(counter.numConnectedStateStores(), CoreMatchers.is(1));
assertThat(counter.numUniqueStateStores(), CoreMatchers.is(2));
assertThat(counter.numConnectedStateStores(), CoreMatchers.is(2));
}

@Test
Expand Down
Loading