Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-18026: KIP-1112, ProcessorWrapper API with PAPI and partial DSL implementation #17892

Open
wants to merge 9 commits into
base: trunk
Choose a base branch
from

Conversation

ableegoldman
Copy link
Contributor

@ableegoldman ableegoldman commented Nov 21, 2024

This PR includes the API for KIP-1112 and a partial implementation, which wraps any processors added through the PAPI and the DSL processors that are written to the topology through the ProcessorParameters#addProcessorTo method.

Further PRs will complete the implementation by converting the remaining DSL operators to using the #addProcessorTo method, and future-proof the processor writing mechanism to prevent new DSL operators from being implemented incorrectly/without the wrapper

Unit tests are WIP -- will be added to this PR shortly

@ableegoldman ableegoldman changed the title KAFKA-18026: KIP-1112, ProcessorWrapper API and PAPI implementation KAFKA-18026: KIP-1112, ProcessorWrapper API with PAPI and partial DSL implementation Nov 21, 2024
@@ -714,8 +714,10 @@ public org.apache.kafka.streams.processor.api.Processor<Object, Object, Object,
public synchronized <KIn, VIn, KOut, VOut> Topology addProcessor(final String name,
final ProcessorSupplier<KIn, VIn, KOut, VOut> supplier,
final String... parentNames) {
internalTopologyBuilder.addProcessor(name, supplier, parentNames);
final Set<StoreBuilder<?>> stores = supplier.stores();
final ProcessorSupplier<KIn, VIn, KOut, VOut> wrapped = internalTopologyBuilder.wrapProcessorSupplier(name, supplier);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is the entire implementation for PAPI apps

dslStoreSuppliers = Utils.newInstance(
config.getClass(StreamsConfig.DSL_STORE_SUPPLIERS_CLASS_CONFIG),
DslStoreSuppliers.class);
dslStoreSuppliers = config.getConfiguredInstance(
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

side fix: we weren't configuring this with the app configs

for (final StoreBuilder<?> storeBuilder : processorSupplier.stores()) {
ApiUtils.checkSupplier(processorSupplier);

final ProcessorSupplier<KIn, VIn, KOut, VOut> wrapped =
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is how the DSL implementation will work -- processors are wrapped when building the DSL into a Topology, specifically in this method where they are added to the InternalTopologyBuilder.

Note this PR is only a partial implementation since not all DSL operators go through this class, some just write their processors to the InternalTopologyBuilder directly. Followup PRs will tackle converting these operators and enforcing that any new ones added in the future will have to go through this method and be wrapped

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

1 participant