Skip to content

Commit

Permalink
KAFKA-18023: Enforcing Explicit Naming for Kafka Streams Internal Topics
Browse files Browse the repository at this point in the history
  • Loading branch information
sebastienviale committed Dec 17, 2024
1 parent 337fb8c commit 5aa6107
Show file tree
Hide file tree
Showing 14 changed files with 284 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -383,7 +383,7 @@ public synchronized <K, V> GlobalKTable<K, V> globalTable(final String topic,

final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> materializedInternal =
new MaterializedInternal<>(
Materialized.with(consumedInternal.keySerde(), consumedInternal.valueSerde()),
Materialized.<K, V, KeyValueStore<Bytes, byte[]>>with(consumedInternal.keySerde(), consumedInternal.valueSerde()).withLoggingDisabled(),
internalStreamsBuilder,
topic + "-",
true /* force materializing global tables */);
Expand Down Expand Up @@ -456,7 +456,7 @@ public synchronized <K, V> GlobalKTable<K, V> globalTable(final String topic,
Objects.requireNonNull(materialized, "materialized can't be null");
final ConsumedInternal<K, V> consumedInternal = new ConsumedInternal<>(consumed);
// always use the serdes from consumed
materialized.withKeySerde(consumedInternal.keySerde()).withValueSerde(consumedInternal.valueSerde());
materialized.withKeySerde(consumedInternal.keySerde()).withValueSerde(consumedInternal.valueSerde()).withLoggingDisabled();

final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> materializedInternal =
new MaterializedInternal<>(materialized, internalStreamsBuilder, topic + "-");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -588,6 +588,10 @@ public class StreamsConfig extends AbstractConfig {
public static final String ENABLE_METRICS_PUSH_DOC = "Whether to enable pushing of internal client metrics for (main, restore, and global) consumers, producers, and admin clients." +
" The cluster must have a client metrics subscription which corresponds to a client.";

/** {@code ensure.explicit.internal.resource.naming} */
public static final String ENSURE_EXPLICIT_INTERNAL_RESOURCE_NAMING_CONFIG = "ensure.explicit.internal.resource.naming";
public static final String ENSURE_EXPLICIT_INTERNAL_RESOURCE_NAMING_DOC = "Whether to enforce explicit naming for all internal resources of the topology, including internal topics (e.g., changelog and repartition topics) and their associated state stores." +
" When enabled, the application will fail to start if any internal resource has an auto-generated name.";
/** {@code log.summary.interval.ms} */
public static final String LOG_SUMMARY_INTERVAL_MS_CONFIG = "log.summary.interval.ms";
private static final String LOG_SUMMARY_INTERVAL_MS_DOC = "The output interval in milliseconds for logging summary information.\n" +
Expand Down Expand Up @@ -865,6 +869,11 @@ public class StreamsConfig extends AbstractConfig {
Importance.HIGH,
STATE_DIR_DOC,
"${java.io.tmpdir}")
.define(ENSURE_EXPLICIT_INTERNAL_RESOURCE_NAMING_CONFIG,
Type.BOOLEAN,
false,
Importance.HIGH,
ENSURE_EXPLICIT_INTERNAL_RESOURCE_NAMING_DOC)

// MEDIUM

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@
import static org.apache.kafka.streams.StreamsConfig.DSL_STORE_SUPPLIERS_CLASS_CONFIG;
import static org.apache.kafka.streams.StreamsConfig.DSL_STORE_SUPPLIERS_CLASS_DEFAULT;
import static org.apache.kafka.streams.StreamsConfig.DSL_STORE_SUPPLIERS_CLASS_DOC;
import static org.apache.kafka.streams.StreamsConfig.ENSURE_EXPLICIT_INTERNAL_RESOURCE_NAMING_CONFIG;
import static org.apache.kafka.streams.StreamsConfig.ENSURE_EXPLICIT_INTERNAL_RESOURCE_NAMING_DOC;
import static org.apache.kafka.streams.StreamsConfig.IN_MEMORY;
import static org.apache.kafka.streams.StreamsConfig.MAX_TASK_IDLE_MS_CONFIG;
import static org.apache.kafka.streams.StreamsConfig.MAX_TASK_IDLE_MS_DOC;
Expand Down Expand Up @@ -142,7 +144,12 @@ public final class TopologyConfig extends AbstractConfig {
Type.CLASS,
DSL_STORE_SUPPLIERS_CLASS_DEFAULT,
Importance.LOW,
DSL_STORE_SUPPLIERS_CLASS_DOC);
DSL_STORE_SUPPLIERS_CLASS_DOC)
.define(ENSURE_EXPLICIT_INTERNAL_RESOURCE_NAMING_CONFIG,
Type.BOOLEAN,
false,
Importance.HIGH,
ENSURE_EXPLICIT_INTERNAL_RESOURCE_NAMING_DOC);
}
private static final Logger log = LoggerFactory.getLogger(TopologyConfig.class);

Expand All @@ -164,6 +171,8 @@ public final class TopologyConfig extends AbstractConfig {
public final Supplier<DeserializationExceptionHandler> deserializationExceptionHandlerSupplier;
public final Supplier<ProcessingExceptionHandler> processingExceptionHandlerSupplier;

public final boolean ensureExplicitInternalResourceNaming;

public TopologyConfig(final StreamsConfig configs) {
this(null, configs, mkObjectProperties(configs.originals()));
}
Expand Down Expand Up @@ -272,6 +281,8 @@ public TopologyConfig(final String topologyName, final StreamsConfig globalAppCo
} else {
dslStoreSuppliers = globalAppConfigs.getClass(DSL_STORE_SUPPLIERS_CLASS_CONFIG);
}

ensureExplicitInternalResourceNaming = globalAppConfigs.getBoolean(ENSURE_EXPLICIT_INTERNAL_RESOURCE_NAMING_CONFIG);
}

@Deprecated
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -212,8 +212,7 @@ private void processRepartitions(final Map<KGroupedStreamImpl<K, ?>, Aggregator<
final String repartitionNamePrefix = repartitionReqs.userProvidedRepartitionTopicName != null ?
repartitionReqs.userProvidedRepartitionTopicName : storeName;

createRepartitionSource(repartitionNamePrefix, repartitionNodeBuilder, repartitionReqs.keySerde, repartitionReqs.valueSerde);

createRepartitionSource(repartitionNamePrefix, repartitionNodeBuilder, repartitionReqs.keySerde, repartitionReqs.valueSerde, repartitionReqs.userProvidedRepartitionTopicName != null || storeName != null);
if (!parentNodes.containsKey(repartitionReqs)) {
final GraphNode repartitionNode = repartitionNodeBuilder.build();
builder.addGraphNode(repartitionReqs.graphNode, repartitionNode);
Expand Down Expand Up @@ -290,14 +289,16 @@ <KR, VIn> KTable<KR, VOut> createTable(final Collection<GraphNode> processors,
private <VIn> void createRepartitionSource(final String repartitionTopicNamePrefix,
final OptimizableRepartitionNodeBuilder<K, ?> optimizableRepartitionNodeBuilder,
final Serde<K> keySerde,
final Serde<?> valueSerde) {
final Serde<?> valueSerde,
final boolean isRepartitionTopicNameProvidedByUser) {

KStreamImpl.createRepartitionedSource(builder,
keySerde,
(Serde<VIn>) valueSerde,
repartitionTopicNamePrefix,
null,
(OptimizableRepartitionNodeBuilder<K, VIn>) optimizableRepartitionNodeBuilder);
(OptimizableRepartitionNodeBuilder<K, VIn>) optimizableRepartitionNodeBuilder,
isRepartitionTopicNameProvidedByUser);

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,7 @@ <KR, VR> KTable<KR, VR> build(final NamedInternal functionName,
if (repartitionRequired) {
final OptimizableRepartitionNodeBuilder<K, V> repartitionNodeBuilder = optimizableRepartitionNodeBuilder();
final String repartitionTopicPrefix = userProvidedRepartitionTopicName != null ? userProvidedRepartitionTopicName : storeFactory.storeName();
sourceName = createRepartitionSource(repartitionTopicPrefix, repartitionNodeBuilder);

sourceName = createRepartitionSource(repartitionTopicPrefix, repartitionNodeBuilder, userProvidedRepartitionTopicName != null);
// First time through we need to create a repartition node.
// Any subsequent calls to GroupedStreamAggregateBuilder#build we check if
// the user has provided a name for the repartition topic, is so we re-use
Expand Down Expand Up @@ -122,14 +121,16 @@ <KR, VR> KTable<KR, VR> build(final NamedInternal functionName,
* @return the new sourceName of the repartitioned source
*/
private String createRepartitionSource(final String repartitionTopicNamePrefix,
final OptimizableRepartitionNodeBuilder<K, V> optimizableRepartitionNodeBuilder) {
final OptimizableRepartitionNodeBuilder<K, V> optimizableRepartitionNodeBuilder,
final boolean isRepartitionTopicNameProvidedByUser) {

return KStreamImpl.createRepartitionedSource(builder,
keySerde,
valueSerde,
repartitionTopicNamePrefix,
null,
optimizableRepartitionNodeBuilder);
optimizableRepartitionNodeBuilder,
isRepartitionTopicNameProvidedByUser);

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,9 @@ public void buildAndOptimizeTopology(final Properties props) {
}
}
internalTopologyBuilder.validateCopartition();

internalTopologyBuilder.checkUnprovidedNames();

}

/**
Expand Down Expand Up @@ -489,7 +492,8 @@ private void mergeRepartitionTopics() {
//passing in the name of the first repartition topic, re-used to create the optimized repartition topic
final GraphNode optimizedSingleRepartition = createRepartitionNode(repartitionTopicName,
groupedInternal.keySerde(),
groupedInternal.valueSerde());
groupedInternal.valueSerde(),
true);

// re-use parent buildPriority to make sure the single repartition graph node is evaluated before downstream nodes
optimizedSingleRepartition.setBuildPriority(keyChangingNode.buildPriority());
Expand Down Expand Up @@ -579,7 +583,8 @@ private boolean mergeNodeHasRepartitionChildren(final GraphNode mergeNode,

private <K, V> OptimizableRepartitionNode<K, V> createRepartitionNode(final String repartitionTopicName,
final Serde<K> keySerde,
final Serde<V> valueSerde) {
final Serde<V> valueSerde,
final boolean isRepartitionTopicNameProvidedByUser) {

final OptimizableRepartitionNode.OptimizableRepartitionNodeBuilder<K, V> repartitionNodeBuilder =
OptimizableRepartitionNode.optimizableRepartitionNodeBuilder();
Expand All @@ -589,7 +594,8 @@ private <K, V> OptimizableRepartitionNode<K, V> createRepartitionNode(final Stri
valueSerde,
repartitionTopicName,
null,
repartitionNodeBuilder
repartitionNodeBuilder,
isRepartitionTopicNameProvidedByUser
);

// ensures setting the repartition topic to the name of the
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
import org.apache.kafka.streams.processor.TopicNameExtractor;
import org.apache.kafka.streams.processor.api.FixedKeyProcessorSupplier;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
import org.apache.kafka.streams.processor.internals.InternalResourcesNaming;
import org.apache.kafka.streams.processor.internals.InternalTopicProperties;
import org.apache.kafka.streams.processor.internals.StaticTopicNameExtractor;
import org.apache.kafka.streams.state.KeyValueStore;
Expand Down Expand Up @@ -524,7 +525,8 @@ private KStream<K, V> doRepartition(final Repartitioned<K, V> repartitioned) {
valueSerde,
name,
repartitionedInternal.streamPartitioner(),
unoptimizableRepartitionNodeBuilder.withInternalTopicProperties(internalTopicProperties)
unoptimizableRepartitionNodeBuilder.withInternalTopicProperties(internalTopicProperties),
repartitionedInternal.name() != null
);

final UnoptimizableRepartitionNode<K, V> unoptimizableRepartitionNode = unoptimizableRepartitionNodeBuilder.build();
Expand Down Expand Up @@ -644,7 +646,8 @@ public KTable<K, V> toTable(final Named named,
valueSerdeOverride,
name,
null,
repartitionNodeBuilder
repartitionNodeBuilder,
namedInternal.name() != null
);

tableParentNode = repartitionNodeBuilder.build();
Expand Down Expand Up @@ -850,13 +853,13 @@ private <VO, VR> KStream<K, VR> doJoin(final KStream<K, VO> otherStream,
if (joinThis.repartitionRequired) {
final String joinThisName = joinThis.name;
final String leftJoinRepartitionTopicName = name.suffixWithOrElseGet("-left", joinThisName);
joinThis = joinThis.repartitionForJoin(leftJoinRepartitionTopicName, streamJoinedInternal.keySerde(), streamJoinedInternal.valueSerde());
joinThis = joinThis.repartitionForJoin(leftJoinRepartitionTopicName, streamJoinedInternal.keySerde(), streamJoinedInternal.valueSerde(), name.name() != null || joinThisName != null);
}

if (joinOther.repartitionRequired) {
final String joinOtherName = joinOther.name;
final String rightJoinRepartitionTopicName = name.suffixWithOrElseGet("-right", joinOtherName);
joinOther = joinOther.repartitionForJoin(rightJoinRepartitionTopicName, streamJoinedInternal.keySerde(), streamJoinedInternal.otherValueSerde());
joinOther = joinOther.repartitionForJoin(rightJoinRepartitionTopicName, streamJoinedInternal.keySerde(), streamJoinedInternal.otherValueSerde(), name.name() != null || joinOtherName != null);
}

joinThis.ensureCopartitionWith(Collections.singleton(joinOther));
Expand All @@ -875,7 +878,8 @@ private <VO, VR> KStream<K, VR> doJoin(final KStream<K, VO> otherStream,
*/
private KStreamImpl<K, V> repartitionForJoin(final String repartitionName,
final Serde<K> keySerdeOverride,
final Serde<V> valueSerdeOverride) {
final Serde<V> valueSerdeOverride,
final boolean isRepartitionTopicNameProvidedByUser) {
final Serde<K> repartitionKeySerde = keySerdeOverride != null ? keySerdeOverride : keySerde;
final Serde<V> repartitionValueSerde = valueSerdeOverride != null ? valueSerdeOverride : valueSerde;
final OptimizableRepartitionNodeBuilder<K, V> optimizableRepartitionNodeBuilder =
Expand All @@ -889,7 +893,8 @@ private KStreamImpl<K, V> repartitionForJoin(final String repartitionName,
repartitionValueSerde,
repartitionName,
null,
optimizableRepartitionNodeBuilder);
optimizableRepartitionNodeBuilder,
isRepartitionTopicNameProvidedByUser);

if (repartitionNode == null || !name.equals(repartitionName)) {
repartitionNode = optimizableRepartitionNodeBuilder.build();
Expand All @@ -911,11 +916,15 @@ static <K1, V1, RN extends BaseRepartitionNode<K1, V1>> String createRepartition
final Serde<V1> valueSerde,
final String repartitionTopicNamePrefix,
final StreamPartitioner<K1, V1> streamPartitioner,
final BaseRepartitionNodeBuilder<K1, V1, RN> baseRepartitionNodeBuilder) {
final BaseRepartitionNodeBuilder<K1, V1, RN> baseRepartitionNodeBuilder,
final boolean isRepartitionTopicNameProvidedByUser) {

final String repartitionTopicName = repartitionTopicNamePrefix.endsWith(REPARTITION_TOPIC_SUFFIX) ?
repartitionTopicNamePrefix :
repartitionTopicNamePrefix + REPARTITION_TOPIC_SUFFIX;
if (!isRepartitionTopicNameProvidedByUser) {
builder.internalTopologyBuilder().addUnprovidedInternalTopics(InternalResourcesNaming.build().withRepartitionTopic(repartitionTopicName));
}

// Always need to generate the names to burn index counter for compatibility
final String genSinkName = builder.newProcessorName(SINK_NAME);
Expand Down Expand Up @@ -992,7 +1001,8 @@ public <VO, VR> KStream<K, VR> join(final KTable<K, VO> table,
final KStreamImpl<K, V> thisStreamRepartitioned = repartitionForJoin(
name != null ? name : this.name,
joinedInternal.keySerde(),
joinedInternal.leftValueSerde()
joinedInternal.leftValueSerde(),
name != null
);
return thisStreamRepartitioned.doStreamTableJoin(table, joiner, joinedInternal, false);
} else {
Expand Down Expand Up @@ -1035,7 +1045,8 @@ public <VO, VR> KStream<K, VR> leftJoin(final KTable<K, VO> table,
final KStreamImpl<K, V> thisStreamRepartitioned = repartitionForJoin(
name != null ? name : this.name,
joinedInternal.keySerde(),
joinedInternal.leftValueSerde()
joinedInternal.leftValueSerde(),
name != null
);
return thisStreamRepartitioned.doStreamTableJoin(table, joiner, joinedInternal, true);
} else {
Expand Down Expand Up @@ -1162,6 +1173,10 @@ private <VO, VR> KStream<K, VR> doStreamTableJoin(final KTable<K, VO> table,
throw new IllegalArgumentException("KTable must be versioned to use a grace period in a stream table join.");
}
final String bufferName = name + "-Buffer";
if (joinedInternal.name() == null) {
final InternalResourcesNaming internalResourcesNaming = InternalResourcesNaming.build().withStateStore(bufferName).withChangelogTopic(bufferName + "-changelog");
internalTopologyBuilder().addUnprovidedInternalTopics(internalResourcesNaming);
}
bufferStoreBuilder = Optional.of(new RocksDBTimeOrderedKeyValueBuffer.Builder<>(bufferName, joinedInternal.gracePeriod(), name));
}

Expand Down
Loading

0 comments on commit 5aa6107

Please sign in to comment.