KAFKA-18023: Enforcing Explicit Naming for Kafka Streams Internal Topics #18233

Open · wants to merge 4 commits into trunk
@@ -383,7 +383,7 @@ public synchronized <K, V> GlobalKTable<K, V> globalTable(final String topic,

final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> materializedInternal =
new MaterializedInternal<>(
Materialized.with(consumedInternal.keySerde(), consumedInternal.valueSerde()),
Materialized.<K, V, KeyValueStore<Bytes, byte[]>>with(consumedInternal.keySerde(), consumedInternal.valueSerde()).withLoggingDisabled(),
internalStreamsBuilder,
topic + "-",
true /* force materializing global tables */);
@@ -456,7 +456,7 @@ public synchronized <K, V> GlobalKTable<K, V> globalTable(final String topic,
Objects.requireNonNull(materialized, "materialized can't be null");
final ConsumedInternal<K, V> consumedInternal = new ConsumedInternal<>(consumed);
// always use the serdes from consumed
materialized.withKeySerde(consumedInternal.keySerde()).withValueSerde(consumedInternal.valueSerde());
materialized.withKeySerde(consumedInternal.keySerde()).withValueSerde(consumedInternal.valueSerde()).withLoggingDisabled();

final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> materializedInternal =
new MaterializedInternal<>(materialized, internalStreamsBuilder, topic + "-");
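Both hunks above change how the store behind a global table is materialized: the replacement lines append withLoggingDisabled() to the Materialized that the builder uses. A global table's store is restored directly from its source topic, so a separate changelog topic is redundant, and presumably that is also why it should not show up as an auto-named internal topic under the stricter naming rules. A minimal sketch of the same effect written out at the DSL level (topic and store names are illustrative, not taken from the PR):

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.GlobalKTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.KeyValueStore;

public class GlobalTableLoggingExample {
    public static void main(String[] args) {
        final StreamsBuilder builder = new StreamsBuilder();

        // Per this change, the store backing a global table is materialized with
        // changelog logging disabled; the source topic itself is what the store
        // is restored from, so no changelog topic is created for it.
        final GlobalKTable<String, String> prices = builder.globalTable(
            "prices",                                              // source topic (illustrative)
            Consumed.with(Serdes.String(), Serdes.String()),
            Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("prices-store")
                .withLoggingDisabled());
    }
}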
@@ -588,6 +588,10 @@ public class StreamsConfig extends AbstractConfig {
public static final String ENABLE_METRICS_PUSH_DOC = "Whether to enable pushing of internal client metrics for (main, restore, and global) consumers, producers, and admin clients." +
" The cluster must have a client metrics subscription which corresponds to a client.";

/** {@code ensure.explicit.internal.resource.naming} */
public static final String ENSURE_EXPLICIT_INTERNAL_RESOURCE_NAMING_CONFIG = "ensure.explicit.internal.resource.naming";
public static final String ENSURE_EXPLICIT_INTERNAL_RESOURCE_NAMING_DOC = "Whether to enforce explicit naming for all internal resources of the topology, including internal topics (e.g., changelog and repartition topics) and their associated state stores." +
" When enabled, the application will fail to start if any internal resource has an auto-generated name.";
/** {@code log.summary.interval.ms} */
public static final String LOG_SUMMARY_INTERVAL_MS_CONFIG = "log.summary.interval.ms";
private static final String LOG_SUMMARY_INTERVAL_MS_DOC = "The output interval in milliseconds for logging summary information.\n" +
@@ -865,6 +869,11 @@ public class StreamsConfig extends AbstractConfig {
Importance.HIGH,
STATE_DIR_DOC,
"${java.io.tmpdir}")
.define(ENSURE_EXPLICIT_INTERNAL_RESOURCE_NAMING_CONFIG,
Type.BOOLEAN,
false,
Importance.HIGH,
ENSURE_EXPLICIT_INTERNAL_RESOURCE_NAMING_DOC)

// MEDIUM

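The new ensure.explicit.internal.resource.naming option is a boolean that defaults to false and, per the doc string above, makes the application fail to start if any internal topic or state store still carries an auto-generated name. A hedged usage sketch, assuming placeholder application, topic, and store names, that enables the flag and names every internal resource through the existing DSL hooks (Grouped.as and Materialized.as here; Named.as and Repartitioned.as serve the same purpose for other operators):

import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.Materialized;

public class ExplicitNamingExample {
    public static void main(String[] args) {
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "orders-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // Opt in to the new check (the constant is defined in the hunk above).
        props.put(StreamsConfig.ENSURE_EXPLICIT_INTERNAL_RESOURCE_NAMING_CONFIG, true);

        final StreamsBuilder builder = new StreamsBuilder();
        builder.stream("orders", Consumed.with(Serdes.String(), Serdes.Long()))
               // Grouped.as(...) names the repartition topic, should one be needed.
               .groupByKey(Grouped.as("orders-by-key"))
               // Materialized.as(...) names the state store and hence its changelog topic.
               .count(Materialized.as("orders-count-store"));

        // With the flag enabled, dropping either name above is expected to make the
        // application fail at startup instead of silently generating one.
    }
}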
@@ -55,6 +55,8 @@
import static org.apache.kafka.streams.StreamsConfig.DSL_STORE_SUPPLIERS_CLASS_CONFIG;
import static org.apache.kafka.streams.StreamsConfig.DSL_STORE_SUPPLIERS_CLASS_DEFAULT;
import static org.apache.kafka.streams.StreamsConfig.DSL_STORE_SUPPLIERS_CLASS_DOC;
import static org.apache.kafka.streams.StreamsConfig.ENSURE_EXPLICIT_INTERNAL_RESOURCE_NAMING_CONFIG;
import static org.apache.kafka.streams.StreamsConfig.ENSURE_EXPLICIT_INTERNAL_RESOURCE_NAMING_DOC;
import static org.apache.kafka.streams.StreamsConfig.IN_MEMORY;
import static org.apache.kafka.streams.StreamsConfig.MAX_TASK_IDLE_MS_CONFIG;
import static org.apache.kafka.streams.StreamsConfig.MAX_TASK_IDLE_MS_DOC;
@@ -142,7 +144,12 @@ public final class TopologyConfig extends AbstractConfig {
Type.CLASS,
DSL_STORE_SUPPLIERS_CLASS_DEFAULT,
Importance.LOW,
DSL_STORE_SUPPLIERS_CLASS_DOC);
DSL_STORE_SUPPLIERS_CLASS_DOC)
.define(ENSURE_EXPLICIT_INTERNAL_RESOURCE_NAMING_CONFIG,
Type.BOOLEAN,
false,
Importance.HIGH,
ENSURE_EXPLICIT_INTERNAL_RESOURCE_NAMING_DOC);
}
private static final Logger log = LoggerFactory.getLogger(TopologyConfig.class);

@@ -164,6 +171,8 @@ public final class TopologyConfig extends AbstractConfig {
public final Supplier<DeserializationExceptionHandler> deserializationExceptionHandlerSupplier;
public final Supplier<ProcessingExceptionHandler> processingExceptionHandlerSupplier;

public final boolean ensureExplicitInternalResourceNaming;

public TopologyConfig(final StreamsConfig configs) {
this(null, configs, mkObjectProperties(configs.originals()));
}
@@ -272,6 +281,8 @@ public TopologyConfig(final String topologyName, final StreamsConfig globalAppCo
} else {
dslStoreSuppliers = globalAppConfigs.getClass(DSL_STORE_SUPPLIERS_CLASS_CONFIG);
}

ensureExplicitInternalResourceNaming = globalAppConfigs.getBoolean(ENSURE_EXPLICIT_INTERNAL_RESOURCE_NAMING_CONFIG);
}

@Deprecated
@@ -58,7 +58,7 @@ <KR> KTable<KR, VOut> build(final Map<KGroupedStreamImpl<K, ?>, Aggregator<? sup
final Serde<VOut> valueSerde,
final String queryableName,
final boolean isOutputVersioned) {
processRepartitions(groupPatterns, storeFactory.storeName());
processRepartitions(groupPatterns, storeFactory.storeName(), queryableName);
final Collection<GraphNode> processors = new ArrayList<>();
final Collection<KStreamAggProcessorSupplier> parentProcessors = new ArrayList<>();
boolean stateCreated = false;
@@ -92,7 +92,7 @@ <KR, W extends Window> KTable<KR, VOut> build(final Map<KGroupedStreamImpl<K, ?>
final Serde<VOut> valueSerde,
final String queryableName,
final Windows<W> windows) {
processRepartitions(groupPatterns, storeFactory.storeName());
processRepartitions(groupPatterns, storeFactory.storeName(), queryableName);

final Collection<GraphNode> processors = new ArrayList<>();
final Collection<KStreamAggProcessorSupplier> parentProcessors = new ArrayList<>();
@@ -132,7 +132,7 @@ <KR> KTable<KR, VOut> build(final Map<KGroupedStreamImpl<K, ?>, Aggregator<? sup
final String queryableName,
final SessionWindows sessionWindows,
final Merger<? super K, VOut> sessionMerger) {
processRepartitions(groupPatterns, storeFactory.storeName());
processRepartitions(groupPatterns, storeFactory.storeName(), queryableName);
final Collection<GraphNode> processors = new ArrayList<>();
final Collection<KStreamAggProcessorSupplier> parentProcessors = new ArrayList<>();
boolean stateCreated = false;
@@ -171,7 +171,7 @@ <KR> KTable<KR, VOut> build(final Map<KGroupedStreamImpl<K, ?>, Aggregator<? sup
final Serde<VOut> valueSerde,
final String queryableName,
final SlidingWindows slidingWindows) {
processRepartitions(groupPatterns, storeFactory.storeName());
processRepartitions(groupPatterns, storeFactory.storeName(), queryableName);
final Collection<KStreamAggProcessorSupplier> parentProcessors = new ArrayList<>();
final Collection<GraphNode> processors = new ArrayList<>();
boolean stateCreated = false;
@@ -202,7 +202,8 @@ <KR> KTable<KR, VOut> build(final Map<KGroupedStreamImpl<K, ?>, Aggregator<? sup
}

private void processRepartitions(final Map<KGroupedStreamImpl<K, ?>, Aggregator<? super K, ? super Object, VOut>> groupPatterns,
final String storeName) {
final String storeName,
final String queryableName) {
for (final KGroupedStreamImpl<K, ?> repartitionReqs : groupPatterns.keySet()) {

if (repartitionReqs.repartitionRequired) {
@@ -212,8 +213,7 @@ private void processRepartitions(final Map<KGroupedStreamImpl<K, ?>, Aggregator<
final String repartitionNamePrefix = repartitionReqs.userProvidedRepartitionTopicName != null ?
repartitionReqs.userProvidedRepartitionTopicName : storeName;

createRepartitionSource(repartitionNamePrefix, repartitionNodeBuilder, repartitionReqs.keySerde, repartitionReqs.valueSerde);

createRepartitionSource(repartitionNamePrefix, repartitionNodeBuilder, repartitionReqs.keySerde, repartitionReqs.valueSerde, repartitionReqs.userProvidedRepartitionTopicName != null || queryableName != null);
if (!parentNodes.containsKey(repartitionReqs)) {
final GraphNode repartitionNode = repartitionNodeBuilder.build();
builder.addGraphNode(repartitionReqs.graphNode, repartitionNode);
@@ -290,14 +290,16 @@ <KR, VIn> KTable<KR, VOut> createTable(final Collection<GraphNode> processors,
private <VIn> void createRepartitionSource(final String repartitionTopicNamePrefix,
final OptimizableRepartitionNodeBuilder<K, ?> optimizableRepartitionNodeBuilder,
final Serde<K> keySerde,
final Serde<?> valueSerde) {
final Serde<?> valueSerde,
final boolean isRepartitionTopicNameProvidedByUser) {

KStreamImpl.createRepartitionedSource(builder,
keySerde,
(Serde<VIn>) valueSerde,
repartitionTopicNamePrefix,
null,
(OptimizableRepartitionNodeBuilder<K, VIn>) optimizableRepartitionNodeBuilder);
(OptimizableRepartitionNodeBuilder<K, VIn>) optimizableRepartitionNodeBuilder,
isRepartitionTopicNameProvidedByUser);

}
}
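processRepartitions now also receives the queryableName, and createRepartitionSource is told whether the repartition topic name came from the user, either directly (a user-supplied repartition topic name) or indirectly (a queryable store name). In DSL terms, naming the grouping or materializing the result under an explicit name is what makes that flag true. A rough sketch of a cogrouped aggregation where both are named (topic, grouping, and store names are made up for illustration):

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Aggregator;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KGroupedStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;

public class CogroupNamingExample {
    public static void main(String[] args) {
        final StreamsBuilder builder = new StreamsBuilder();

        // selectKey changes the key, so each grouped stream needs a repartition topic;
        // Grouped.with(...) supplies that topic's name explicitly.
        final KGroupedStream<String, Long> clicks = builder
            .stream("clicks", Consumed.with(Serdes.String(), Serdes.Long()))
            .selectKey((key, value) -> key.toUpperCase())
            .groupByKey(Grouped.with("clicks-group", Serdes.String(), Serdes.Long()));

        final KGroupedStream<String, Long> views = builder
            .stream("views", Consumed.with(Serdes.String(), Serdes.Long()))
            .selectKey((key, value) -> key.toUpperCase())
            .groupByKey(Grouped.with("views-group", Serdes.String(), Serdes.Long()));

        final Aggregator<String, Long, Long> sum = (key, value, aggregate) -> aggregate + value;

        final KTable<String, Long> totals = clicks
            .cogroup(sum)
            .cogroup(views, sum)
            // the store name below is the queryableName the builder sees
            .aggregate(() -> 0L, Materialized.as("click-view-totals"));
    }
}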
@@ -83,8 +83,7 @@ <KR, VR> KTable<KR, VR> build(final NamedInternal functionName,
if (repartitionRequired) {
final OptimizableRepartitionNodeBuilder<K, V> repartitionNodeBuilder = optimizableRepartitionNodeBuilder();
final String repartitionTopicPrefix = userProvidedRepartitionTopicName != null ? userProvidedRepartitionTopicName : storeFactory.storeName();
sourceName = createRepartitionSource(repartitionTopicPrefix, repartitionNodeBuilder);

sourceName = createRepartitionSource(repartitionTopicPrefix, repartitionNodeBuilder, userProvidedRepartitionTopicName != null || queryableStoreName != null);
// First time through we need to create a repartition node.
// Any subsequent calls to GroupedStreamAggregateBuilder#build we check if
// the user has provided a name for the repartition topic, if so we re-use
@@ -122,14 +121,16 @@ <KR, VR> KTable<KR, VR> build(final NamedInternal functionName,
* @return the new sourceName of the repartitioned source
*/
private String createRepartitionSource(final String repartitionTopicNamePrefix,
final OptimizableRepartitionNodeBuilder<K, V> optimizableRepartitionNodeBuilder) {
final OptimizableRepartitionNodeBuilder<K, V> optimizableRepartitionNodeBuilder,
final boolean isRepartitionTopicNameProvidedByUser) {

return KStreamImpl.createRepartitionedSource(builder,
keySerde,
valueSerde,
repartitionTopicNamePrefix,
null,
optimizableRepartitionNodeBuilder);
optimizableRepartitionNodeBuilder,
isRepartitionTopicNameProvidedByUser);

}
}
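GroupedStreamAggregateBuilder receives the same treatment for plain (non-cogrouped) grouped streams: createRepartitionSource is now told whether the repartition topic name was supplied by the user or implied by a queryable store name. The single-stream counterpart of the sketch above, with a key-changing groupBy so a repartition topic is actually created (it reuses the builder and imports from the earlier sketches; all names are illustrative):

// groupBy changes the key, so Streams must repartition before aggregating;
// Grouped.with(...) names that repartition topic, Materialized.as(...) names the store.
final KTable<String, Long> ordersPerCategory = builder
    .stream("orders", Consumed.with(Serdes.String(), Serdes.String()))
    .groupBy((orderId, category) -> category,
             Grouped.with("orders-by-category", Serdes.String(), Serdes.String()))
    .count(Materialized.as("orders-per-category"));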
@@ -328,6 +328,9 @@ public void buildAndOptimizeTopology(final Properties props) {
}
}
internalTopologyBuilder.validateCopartition();

internalTopologyBuilder.checkUnprovidedNames();

}

/**
@@ -489,7 +492,8 @@ private void mergeRepartitionTopics() {
//passing in the name of the first repartition topic, re-used to create the optimized repartition topic
final GraphNode optimizedSingleRepartition = createRepartitionNode(repartitionTopicName,
groupedInternal.keySerde(),
groupedInternal.valueSerde());
groupedInternal.valueSerde(),
true);

// re-use parent buildPriority to make sure the single repartition graph node is evaluated before downstream nodes
optimizedSingleRepartition.setBuildPriority(keyChangingNode.buildPriority());
@@ -579,7 +583,8 @@ private boolean mergeNodeHasRepartitionChildren(final GraphNode mergeNode,

private <K, V> OptimizableRepartitionNode<K, V> createRepartitionNode(final String repartitionTopicName,
final Serde<K> keySerde,
final Serde<V> valueSerde) {
final Serde<V> valueSerde,
final boolean isRepartitionTopicNameProvidedByUser) {

final OptimizableRepartitionNode.OptimizableRepartitionNodeBuilder<K, V> repartitionNodeBuilder =
OptimizableRepartitionNode.optimizableRepartitionNodeBuilder();
@@ -589,7 +594,8 @@ private <K, V> OptimizableRepartitionNode<K, V> createRepartitionNode(final Stri
valueSerde,
repartitionTopicName,
null,
repartitionNodeBuilder
repartitionNodeBuilder,
isRepartitionTopicNameProvidedByUser
);

// ensures setting the repartition topic to the name of the
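buildAndOptimizeTopology now finishes with internalTopologyBuilder.checkUnprovidedNames(), so the validation runs when the topology is built rather than later during processing. The implementation of that method is not part of this excerpt; based on the config documentation above, the observable effect should be roughly the following (the exact exception type and the precise point of failure are assumptions, not taken from this diff):

import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;

public class UnprovidedNamesExample {
    public static void main(String[] args) {
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "orders-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.ENSURE_EXPLICIT_INTERNAL_RESOURCE_NAMING_CONFIG, true);

        final StreamsBuilder builder = new StreamsBuilder();
        builder.stream("orders", Consumed.with(Serdes.String(), Serdes.Long()))
               .groupByKey()   // key unchanged, so no repartition topic is needed here
               .count();       // no Materialized.as(...): store and changelog are auto-named

        // build(props) runs buildAndOptimizeTopology and therefore checkUnprovidedNames();
        // with the flag enabled this is expected to fail (here or at application startup)
        // rather than silently create auto-named internal resources.
        final Topology topology = builder.build(props);
    }
}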