-
Notifications
You must be signed in to change notification settings - Fork 14k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
KAFKA-18026: KIP-1112 migrate KTableSuppressProcessorSupplier #18150
base: trunk
Are you sure you want to change the base?
Conversation
final ProcessorGraphNode<K, Change<V>> node = new TableSuppressNode<>( | ||
name, | ||
new ProcessorParameters<>(suppressionSupplier, name), | ||
StoreBuilderWrapper.wrapStoreBuilder(storeBuilder) | ||
new String[]{storeName} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this intentional? The callee seems not using the storeName
below: https://github.com/apache/kafka/pull/18150/files#diff-c1133b80fb36ee7e4fbf7a26007f63e41a411e7f36e45aa96373afd9ebdc5559R22
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it's definitely not being used in the PR, but actually we do still need it so it should/will be used (see https://github.com/apache/kafka/pull/18150/files#r1885482579)
streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableSuppressNode.java
Show resolved
Hide resolved
super(nodeName, processorParameters); | ||
|
||
this.storeNames = storeNames; | ||
this.storeNames = null; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think we can change this yet since not all StatefulProcessorNode users have been migrated yet. For example the FKJ. Pretty sure this is what's causing the test failures.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Mm yeah almost definitely the problem:
Caused by: org.apache.kafka.streams.errors.StreamsException: Processor KTABLE-FK-JOIN-SUBSCRIPTION-PROCESSOR-0000000022 has no access to StateStore KTABLE-FK-JOIN-SUBSCRIPTION-STATE-STORE-0000000021 as the store is not connected to the processor. If you add stores manually via '.addStateStore()' make sure to connect the added store to the processor by providing the processor name to '.addStateStore()' or connect them via '.connectProcessorAndStateStores()'. DSL users need to provide the store name to '.process()', '.transform()', or '.transformValues()' to connect the store to the corresponding operator, or they can provide a StoreBuilder by implementing the stores() method on the Supplier itself. If you do not add stores manually, please file a bug report at https://issues.apache.org/jira/projects/KAFKA.
basically setting this to null skips the #connectProcessorAndStateStore call, and not every operator that uses this constructor (including children of StatefulProcessorNode) have been migrated to implementing stores
(which will result in the store being connected during the ProcessorParameters#addProcessorTo
call)
Migrates
KTableSuppressProcessorSupplier
to use the theProcessorSupplier#stores()
method