diff --git a/docs/streams/developer-guide/config-streams.html b/docs/streams/developer-guide/config-streams.html index a7d82b97ccd62..7f9aaa60616d5 100644 --- a/docs/streams/developer-guide/config-streams.html +++ b/docs/streams/developer-guide/config-streams.html @@ -79,6 +79,7 @@
  • default.value.serde
  • deserialization.exception.handler
  • enable.metrics.push
  • +
  • ensure.explicit.internal.resource.naming
  • log.summary.interval.ms
  • max.task.idle.ms
  • max.warmup.replicas
  • @@ -348,17 +349,26 @@

    num.standby.replicaslog.summary.interval.ms + ensure.explicit.internal.resource.naming + High + + Whether to enforce explicit naming for all internal resources of the topology, including internal + topics (e.g., changelog and repartition topics) and their associated state stores. + When enabled, the application will refuse to start if any internal resource has an auto-generated name. + + false + + log.summary.interval.ms Low The output interval in milliseconds for logging summary information (disabled if negative). 120000 (2 minutes) - enable.metrics.push + enable.metrics.push Low Whether to enable pushing of client metrics to the cluster, if the cluster has a client metrics subscription which matches this client. true - max.task.idle.ms + max.task.idle.ms Medium

    @@ -377,76 +387,76 @@

    num.standby.replicas0 - max.warmup.replicas + max.warmup.replicas Medium The maximum number of warmup replicas (extra standbys beyond the configured num.standbys) that can be assigned at once. 2 - metric.reporters + metric.reporters Low A list of classes to use as metrics reporters. the empty list - metrics.num.samples + metrics.num.samples Low The number of samples maintained to compute metrics. 2 - metrics.recording.level + metrics.recording.level Low The highest recording level for metrics. INFO - metrics.sample.window.ms + metrics.sample.window.ms Low The window of time in milliseconds a metrics sample is computed over. 30000 (30 seconds) - num.standby.replicas + num.standby.replicas High The number of standby replicas for each task. 0 - num.stream.threads + num.stream.threads Medium The number of threads to execute stream processing. 1 - probing.rebalance.interval.ms + probing.rebalance.interval.ms Low The maximum time in milliseconds to wait before triggering a rebalance to probe for warmup replicas that have sufficiently caught up. 600000 (10 minutes) - processing.exception.handler + processing.exception.handler Medium Exception handling class that implements the ProcessingExceptionHandler interface. LogAndFailProcessingExceptionHandler - processing.guarantee + processing.guarantee Medium The processing mode. Can be either "at_least_once" or "exactly_once_v2" (for EOS version 2, requires broker version 2.5+). See Processing Guarantee.. "at_least_once" - processor.wrapper.class + processor.wrapper.class Medium A class or class name implementing the ProcessorWrapper interface. Must be passed in when creating the topology, and will not be applied unless passed in to the appropriate constructor as a TopologyConfig. You should use the StreamsBuilder#new(TopologyConfig) constructor for DSL applications, and the Topology#new(TopologyConfig) constructor for PAPI applications. - production.exception.handler + production.exception.handler Medium Exception handling class that implements the ProductionExceptionHandler interface. DefaultProductionExceptionHandler - poll.ms + poll.ms Low The amount of time in milliseconds to block waiting for input. 100 - rack.aware.assignment.strategy + rack.aware.assignment.strategy Low The strategy used for rack aware assignment. Acceptable value are "none" (default), @@ -455,7 +465,7 @@

    num.standby.replicasRack Aware Assignment Strategy. "none" - rack.aware.assignment.tags Low List of tag keys used to distribute standby replicas across Kafka Streams clients. When configured, Kafka Streams will make a best-effort to distribute the standby tasks over @@ -463,72 +473,72 @@

    num.standby.replicasRack Aware Assignment Tags. the empty list - rack.aware.assignment.non_overlap_cost + rack.aware.assignment.non_overlap_cost Low Cost associated with moving tasks from existing assignment. See Rack Aware Assignment Non-Overlap-Cost. null - rack.aware.assignment.non_overlap_cost + rack.aware.assignment.non_overlap_cost Low Cost associated with cross rack traffic. See Rack Aware Assignment Traffic-Cost. null - replication.factor + replication.factor Medium The replication factor for changelog topics and repartition topics created by the application. The default of -1 (meaning: use broker default replication factor) requires broker version 2.4 or newer. -1 - retry.backoff.ms + retry.backoff.ms Low The amount of time in milliseconds, before a request is retried. 100 - rocksdb.config.setter + rocksdb.config.setter Medium The RocksDB configuration. null - state.cleanup.delay.ms + state.cleanup.delay.ms Low The amount of time in milliseconds to wait before deleting state when a partition has migrated. 600000 (10 minutes) - state.dir + state.dir High Directory location for state stores. /${java.io.tmpdir}/kafka-streams - task.assignor.class + task.assignor.class Medium A task assignor class or class name implementing the TaskAssignor interface. The high-availability task assignor. - task.timeout.ms + task.timeout.ms Medium The maximum amount of time in milliseconds a task might stall due to internal errors and retries until an error is raised. For a timeout of 0 ms, a task would raise an error for the first internal error. For any timeout larger than 0 ms, a task will retry at least once before an error is raised. 300000 (5 minutes) - topology.optimization + topology.optimization Medium A configuration telling Kafka Streams if it should optimize the topology and what optimizations to apply. Acceptable values are: StreamsConfig.NO_OPTIMIZATION (none), StreamsConfig.OPTIMIZE (all) or a comma separated list of specific optimizations: StreamsConfig.REUSE_KTABLE_SOURCE_TOPICS (reuse.ktable.source.topics), StreamsConfig.MERGE_REPARTITION_TOPICS (merge.repartition.topics), StreamsConfig.SINGLE_STORE_SELF_JOIN (single.store.self.join). "NO_OPTIMIZATION" - upgrade.from + upgrade.from Medium The version you are upgrading from during a rolling upgrade. See Upgrade From null - windowstore.changelog.additional.retention.ms + windowstore.changelog.additional.retention.ms Low Added to a windows maintainMs to ensure data is not deleted from the log prematurely. Allows for clock drift. 86400000 (1 day) - window.size.ms + window.size.ms Low Sets window size for the deserializer in order to calculate window end times. null @@ -753,6 +763,18 @@

    default.value.serdeData types and serialization.

    +
    +

    ensure.explicit.internal.resource.naming

    +
    +
    +

    + Whether to enforce explicit naming for all internal resources of the topology, including internal + topics (e.g., changelog and repartition topics) and their associated state stores. + When enabled, the application will refuse to start if any internal resource has an auto-generated name. +

    +
    +
    +

    rack.aware.assignment.non_overlap_cost

    diff --git a/docs/streams/developer-guide/dsl-topology-naming.html b/docs/streams/developer-guide/dsl-topology-naming.html index ec3bc857c10ef..832806050a593 100644 --- a/docs/streams/developer-guide/dsl-topology-naming.html +++ b/docs/streams/developer-guide/dsl-topology-naming.html @@ -300,6 +300,19 @@

    Conclusion

    Stream/Table non-stateful operationsNamed + + To further enforce best practices, Kafka Streams provides a configuration option, + ensure.explicit.internal.resource.naming: +
    /
    +            Properties props = new Properties();
    +            props.put(StreamsConfig.ENSURE_EXPLICIT_INTERNAL_RESOURCE_NAMING_CONFIG, true);
    +		  
    + This parameter ensures that all internal topics, state stores, and changelog topics have explicitly defined names. When this configuration + is enabled, a Kafka Streams application will not start if any of these components rely on auto-generated names. This guarantees + stability across topology updates, as manually defined names remain unchanged even when new processors or transformations are added. + Enforcing explicit naming is particularly important in production environments, where consistency and backward compatibility are essential + for maintaining reliable stream processing applications. +

    diff --git a/docs/streams/upgrade-guide.html b/docs/streams/upgrade-guide.html index 89fd62fd9d6b2..257187bfa452a 100644 --- a/docs/streams/upgrade-guide.html +++ b/docs/streams/upgrade-guide.html @@ -139,6 +139,16 @@

    < More details about the new config StreamsConfig#TOPOLOGY_OPTIMIZATION_CONFIG can be found in KIP-295.

    +

    Streams API changes in 4.1.0

    + +

    + The introduction of KIP-1111 + enables you to enforce explicit naming for all internal resources of the topology, including internal topics (e.g., changelog and repartition topics) and their associated state stores. + This ensures that every internal resource is named before the Kafka Streams application is deployed, which is essential for upgrading your topology. + You can enable this feature via StreamsConfig using the StreamsConfig#ENSURE_EXPLICIT_INTERNAL_RESOURCE_NAMING_CONFIG parameter. + When set to true, the application will refuse to start if any internal resource has an auto-generated name. +

    +

    Streams API changes in 4.0.0

    diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java b/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java index 7037e8d7fd3a5..e56a4cbfb4e56 100644 --- a/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java +++ b/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java @@ -384,7 +384,7 @@ public synchronized GlobalKTable globalTable(final String topic, final MaterializedInternal> materializedInternal = new MaterializedInternal<>( - Materialized.with(consumedInternal.keySerde(), consumedInternal.valueSerde()), + Materialized.>with(consumedInternal.keySerde(), consumedInternal.valueSerde()).withLoggingDisabled(), internalStreamsBuilder, topic + "-", true /* force materializing global tables */); @@ -457,7 +457,7 @@ public synchronized GlobalKTable globalTable(final String topic, Objects.requireNonNull(materialized, "materialized can't be null"); final ConsumedInternal consumedInternal = new ConsumedInternal<>(consumed); // always use the serdes from consumed - materialized.withKeySerde(consumedInternal.keySerde()).withValueSerde(consumedInternal.valueSerde()); + materialized.withKeySerde(consumedInternal.keySerde()).withValueSerde(consumedInternal.valueSerde()).withLoggingDisabled(); final MaterializedInternal> materializedInternal = new MaterializedInternal<>(materialized, internalStreamsBuilder, topic + "-"); diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java index 6f0e21025c3bd..fa9e1bd48f8b2 100644 --- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java +++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java @@ -592,6 +592,12 @@ public class StreamsConfig extends AbstractConfig { public static final String ENABLE_METRICS_PUSH_DOC = "Whether to enable pushing of internal client metrics for (main, restore, and global) consumers, producers, and admin clients." + " The cluster must have a client metrics subscription which corresponds to a client."; + /** {@code ensure.explicit.internal.resource.naming} */ + public static final String ENSURE_EXPLICIT_INTERNAL_RESOURCE_NAMING_CONFIG = "ensure.explicit.internal.resource.naming"; + static final String ENSURE_EXPLICIT_INTERNAL_RESOURCE_NAMING_DOC = "Whether to enforce explicit naming for all internal resources of the topology, including internal" + + " topics (e.g., changelog and repartition topics) and their associated state stores." + + " When enabled, the application will refuse to start if any internal resource has an auto-generated name."; + /** {@code log.summary.interval.ms} */ public static final String LOG_SUMMARY_INTERVAL_MS_CONFIG = "log.summary.interval.ms"; private static final String LOG_SUMMARY_INTERVAL_MS_DOC = "The output interval in milliseconds for logging summary information.\n" + @@ -869,6 +875,11 @@ public class StreamsConfig extends AbstractConfig { Importance.HIGH, STATE_DIR_DOC, "${java.io.tmpdir}") + .define(ENSURE_EXPLICIT_INTERNAL_RESOURCE_NAMING_CONFIG, + Type.BOOLEAN, + false, + Importance.HIGH, + ENSURE_EXPLICIT_INTERNAL_RESOURCE_NAMING_DOC) // MEDIUM diff --git a/streams/src/main/java/org/apache/kafka/streams/TopologyConfig.java b/streams/src/main/java/org/apache/kafka/streams/TopologyConfig.java index e96d5281d090d..da8c246b26d12 100644 --- a/streams/src/main/java/org/apache/kafka/streams/TopologyConfig.java +++ b/streams/src/main/java/org/apache/kafka/streams/TopologyConfig.java @@ -55,6 +55,8 @@ import static org.apache.kafka.streams.StreamsConfig.DSL_STORE_SUPPLIERS_CLASS_CONFIG; import static org.apache.kafka.streams.StreamsConfig.DSL_STORE_SUPPLIERS_CLASS_DEFAULT; import static org.apache.kafka.streams.StreamsConfig.DSL_STORE_SUPPLIERS_CLASS_DOC; +import static org.apache.kafka.streams.StreamsConfig.ENSURE_EXPLICIT_INTERNAL_RESOURCE_NAMING_CONFIG; +import static org.apache.kafka.streams.StreamsConfig.ENSURE_EXPLICIT_INTERNAL_RESOURCE_NAMING_DOC; import static org.apache.kafka.streams.StreamsConfig.IN_MEMORY; import static org.apache.kafka.streams.StreamsConfig.MAX_TASK_IDLE_MS_CONFIG; import static org.apache.kafka.streams.StreamsConfig.MAX_TASK_IDLE_MS_DOC; @@ -142,7 +144,12 @@ public final class TopologyConfig extends AbstractConfig { Type.CLASS, DSL_STORE_SUPPLIERS_CLASS_DEFAULT, Importance.LOW, - DSL_STORE_SUPPLIERS_CLASS_DOC); + DSL_STORE_SUPPLIERS_CLASS_DOC) + .define(ENSURE_EXPLICIT_INTERNAL_RESOURCE_NAMING_CONFIG, + Type.BOOLEAN, + false, + Importance.HIGH, + ENSURE_EXPLICIT_INTERNAL_RESOURCE_NAMING_DOC); } private static final Logger log = LoggerFactory.getLogger(TopologyConfig.class); @@ -164,6 +171,8 @@ public final class TopologyConfig extends AbstractConfig { public final Supplier deserializationExceptionHandlerSupplier; public final Supplier processingExceptionHandlerSupplier; + public final boolean ensureExplicitInternalResourceNaming; + public TopologyConfig(final StreamsConfig configs) { this(null, configs, mkObjectProperties(configs.originals())); } @@ -272,6 +281,8 @@ public TopologyConfig(final String topologyName, final StreamsConfig globalAppCo } else { dslStoreSuppliers = globalAppConfigs.getClass(DSL_STORE_SUPPLIERS_CLASS_CONFIG); } + + ensureExplicitInternalResourceNaming = globalAppConfigs.getBoolean(ENSURE_EXPLICIT_INTERNAL_RESOURCE_NAMING_CONFIG); } @Deprecated diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/CogroupedStreamAggregateBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/CogroupedStreamAggregateBuilder.java index 2bd81e43d276d..f41f52dac8b1c 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/CogroupedStreamAggregateBuilder.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/CogroupedStreamAggregateBuilder.java @@ -59,7 +59,7 @@ KTable build(final Map, Aggregator valueSerde, final String queryableName, final boolean isOutputVersioned) { - processRepartitions(groupPatterns, storeFactory.storeName()); + processRepartitions(groupPatterns, storeFactory.storeName(), queryableName); final Collection processors = new ArrayList<>(); final Collection parentProcessors = new ArrayList<>(); @@ -94,7 +94,7 @@ KTable build(final Map final Serde valueSerde, final String queryableName, final Windows windows) { - processRepartitions(groupPatterns, storeFactory.storeName()); + processRepartitions(groupPatterns, storeFactory.storeName(), queryableName); final Collection processors = new ArrayList<>(); final Collection parentProcessors = new ArrayList<>(); @@ -135,7 +135,7 @@ KTable build(final Map, Aggregator sessionMerger) { - processRepartitions(groupPatterns, storeFactory.storeName()); + processRepartitions(groupPatterns, storeFactory.storeName(), queryableName); final Collection processors = new ArrayList<>(); final Collection parentProcessors = new ArrayList<>(); int counter = 0; @@ -175,7 +175,7 @@ KTable build(final Map, Aggregator valueSerde, final String queryableName, final SlidingWindows slidingWindows) { - processRepartitions(groupPatterns, storeFactory.storeName()); + processRepartitions(groupPatterns, storeFactory.storeName(), queryableName); final Collection parentProcessors = new ArrayList<>(); final Collection processors = new ArrayList<>(); int counter = 0; @@ -206,7 +206,8 @@ KTable build(final Map, Aggregator, Aggregator> groupPatterns, - final String storeName) { + final String storeName, + final String queryableName) { for (final KGroupedStreamImpl repartitionReqs : groupPatterns.keySet()) { if (repartitionReqs.repartitionRequired) { @@ -216,8 +217,9 @@ private void processRepartitions(final Map, Aggregator< final String repartitionNamePrefix = repartitionReqs.userProvidedRepartitionTopicName != null ? repartitionReqs.userProvidedRepartitionTopicName : storeName; - createRepartitionSource(repartitionNamePrefix, repartitionNodeBuilder, repartitionReqs.keySerde, repartitionReqs.valueSerde); + final boolean isRepartitionTopicNameProvidedByUser = repartitionReqs.userProvidedRepartitionTopicName != null || queryableName != null; + createRepartitionSource(repartitionNamePrefix, repartitionNodeBuilder, repartitionReqs.keySerde, repartitionReqs.valueSerde, isRepartitionTopicNameProvidedByUser); if (!parentNodes.containsKey(repartitionReqs)) { final GraphNode repartitionNode = repartitionNodeBuilder.build(); builder.addGraphNode(repartitionReqs.graphNode, repartitionNode); @@ -270,14 +272,16 @@ KTable createTable(final Collection processors, private void createRepartitionSource(final String repartitionTopicNamePrefix, final OptimizableRepartitionNodeBuilder optimizableRepartitionNodeBuilder, final Serde keySerde, - final Serde valueSerde) { + final Serde valueSerde, + final boolean isRepartitionTopicNameProvidedByUser) { KStreamImpl.createRepartitionedSource(builder, keySerde, (Serde) valueSerde, repartitionTopicNamePrefix, null, - (OptimizableRepartitionNodeBuilder) optimizableRepartitionNodeBuilder); + (OptimizableRepartitionNodeBuilder) optimizableRepartitionNodeBuilder, + isRepartitionTopicNameProvidedByUser); } } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GroupedStreamAggregateBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GroupedStreamAggregateBuilder.java index b99034c5306b5..023513d0704f0 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GroupedStreamAggregateBuilder.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GroupedStreamAggregateBuilder.java @@ -126,8 +126,10 @@ private KTable build(final String aggFunctionName, if (repartitionRequired) { final OptimizableRepartitionNodeBuilder repartitionNodeBuilder = optimizableRepartitionNodeBuilder(); + final String repartitionTopicPrefix = userProvidedRepartitionTopicName != null ? userProvidedRepartitionTopicName : storeName; - sourceName = createRepartitionSource(repartitionTopicPrefix, repartitionNodeBuilder); + + sourceName = createRepartitionSource(repartitionTopicPrefix, repartitionNodeBuilder, userProvidedRepartitionTopicName != null || queryableStoreName != null); // First time through we need to create a repartition node. // Any subsequent calls to GroupedStreamAggregateBuilder#build we check if @@ -157,14 +159,16 @@ private KTable build(final String aggFunctionName, * @return the new sourceName of the repartitioned source */ private String createRepartitionSource(final String repartitionTopicNamePrefix, - final OptimizableRepartitionNodeBuilder optimizableRepartitionNodeBuilder) { + final OptimizableRepartitionNodeBuilder optimizableRepartitionNodeBuilder, + final boolean isRepartitionTopicNameProvidedByUser) { return KStreamImpl.createRepartitionedSource(builder, keySerde, valueSerde, repartitionTopicNamePrefix, null, - optimizableRepartitionNodeBuilder); + optimizableRepartitionNodeBuilder, + isRepartitionTopicNameProvidedByUser); } } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java index c1a5bff2e0599..968276bd501b0 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java @@ -328,6 +328,9 @@ public void buildAndOptimizeTopology(final Properties props) { } } internalTopologyBuilder.validateCopartition(); + + internalTopologyBuilder.checkUnprovidedNames(); + } /** @@ -588,7 +591,8 @@ private OptimizableRepartitionNode createRepartitionNode(final Stri valueSerde, repartitionTopicName, null, - repartitionNodeBuilder + repartitionNodeBuilder, + true ); // ensures setting the repartition topic to the name of the diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java index 8fb1327955e60..1927aed03faed 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java @@ -58,6 +58,7 @@ import org.apache.kafka.streams.processor.TopicNameExtractor; import org.apache.kafka.streams.processor.api.FixedKeyProcessorSupplier; import org.apache.kafka.streams.processor.api.ProcessorSupplier; +import org.apache.kafka.streams.processor.internals.InternalResourcesNaming; import org.apache.kafka.streams.processor.internals.InternalTopicProperties; import org.apache.kafka.streams.processor.internals.StaticTopicNameExtractor; import org.apache.kafka.streams.state.KeyValueStore; @@ -525,7 +526,8 @@ private KStream doRepartition(final Repartitioned repartitioned) { valueSerde, name, repartitionedInternal.streamPartitioner(), - unoptimizableRepartitionNodeBuilder.withInternalTopicProperties(internalTopicProperties) + unoptimizableRepartitionNodeBuilder.withInternalTopicProperties(internalTopicProperties), + repartitionedInternal.name() != null ); final UnoptimizableRepartitionNode unoptimizableRepartitionNode = unoptimizableRepartitionNodeBuilder.build(); @@ -633,7 +635,8 @@ public KTable toTable(final Named named, valueSerdeOverride, name, null, - repartitionNodeBuilder + repartitionNodeBuilder, + namedInternal.name() != null ); tableParentNode = repartitionNodeBuilder.build(); @@ -895,21 +898,23 @@ private KStream doJoin( if (joinThis.repartitionRequired) { final String joinThisName = joinThis.name; final String leftJoinRepartitionTopicName = name.suffixWithOrElseGet("-left", joinThisName); + joinThis = joinThis.repartitionForJoin( leftJoinRepartitionTopicName, streamJoinedInternal.keySerde(), - streamJoinedInternal.valueSerde() - ); + streamJoinedInternal.valueSerde(), + name.name() != null); } if (joinOther.repartitionRequired) { final String joinOtherName = joinOther.name; final String rightJoinRepartitionTopicName = name.suffixWithOrElseGet("-right", joinOtherName); + joinOther = joinOther.repartitionForJoin( rightJoinRepartitionTopicName, streamJoinedInternal.keySerde(), - streamJoinedInternal.otherValueSerde() - ); + streamJoinedInternal.otherValueSerde(), + name.name() != null); } joinThis.ensureCopartitionWith(Collections.singleton(joinOther)); @@ -928,7 +933,8 @@ private KStream doJoin( */ private KStreamImpl repartitionForJoin(final String repartitionName, final Serde keySerdeOverride, - final Serde valueSerdeOverride) { + final Serde valueSerdeOverride, + final boolean isRepartitionTopicNameProvidedByUser) { final Serde repartitionKeySerde = keySerdeOverride != null ? keySerdeOverride : keySerde; final Serde repartitionValueSerde = valueSerdeOverride != null ? valueSerdeOverride : valueSerde; final OptimizableRepartitionNodeBuilder optimizableRepartitionNodeBuilder = @@ -942,7 +948,8 @@ private KStreamImpl repartitionForJoin(final String repartitionName, repartitionValueSerde, repartitionName, null, - optimizableRepartitionNodeBuilder); + optimizableRepartitionNodeBuilder, + isRepartitionTopicNameProvidedByUser); if (repartitionNode == null || !name.equals(repartitionName)) { repartitionNode = optimizableRepartitionNodeBuilder.build(); @@ -965,11 +972,15 @@ static > Str final Serde valueSerde, final String repartitionTopicNamePrefix, final StreamPartitioner streamPartitioner, - final BaseRepartitionNodeBuilder baseRepartitionNodeBuilder - ) { + final BaseRepartitionNodeBuilder baseRepartitionNodeBuilder, + final boolean isRepartitionTopicNameProvidedByUser) { + final String repartitionTopicName = repartitionTopicNamePrefix.endsWith(REPARTITION_TOPIC_SUFFIX) ? repartitionTopicNamePrefix : repartitionTopicNamePrefix + REPARTITION_TOPIC_SUFFIX; + if (!isRepartitionTopicNameProvidedByUser) { + builder.internalTopologyBuilder().addImplicitInternalNames(InternalResourcesNaming.builder().withRepartitionTopic(repartitionTopicName).build()); + } // Always need to generate the names to burn index counter for compatibility final String genSinkName = builder.newProcessorName(SINK_NAME); @@ -1051,7 +1062,8 @@ public KStream join( final KStreamImpl thisStreamRepartitioned = repartitionForJoin( name != null ? name : this.name, joinedInternal.keySerde(), - joinedInternal.leftValueSerde() + joinedInternal.leftValueSerde(), + name != null ); return thisStreamRepartitioned.doStreamTableJoin(table, joiner, joinedInternal, false); } else { @@ -1091,7 +1103,8 @@ public KStream leftJoin(final KTable table, final KStreamImpl thisStreamRepartitioned = repartitionForJoin( name != null ? name : this.name, joinedInternal.keySerde(), - joinedInternal.leftValueSerde() + joinedInternal.leftValueSerde(), + name != null ); return thisStreamRepartitioned.doStreamTableJoin(table, joiner, joinedInternal, true); } else { @@ -1124,6 +1137,11 @@ private KStream doStreamTableJoin(final KTable processorSupplier = new KStreamKTableJoin<>( diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImplJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImplJoin.java index aeece23cf3430..902079863fde2 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImplJoin.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImplJoin.java @@ -30,6 +30,7 @@ import org.apache.kafka.streams.kstream.internals.graph.StreamStreamJoinNode; import org.apache.kafka.streams.kstream.internals.graph.WindowedStreamProcessorNode; import org.apache.kafka.streams.processor.TaskId; +import org.apache.kafka.streams.processor.internals.InternalResourcesNaming; import org.apache.kafka.streams.processor.internals.StoreBuilderWrapper; import org.apache.kafka.streams.processor.internals.StoreFactory; import org.apache.kafka.streams.state.Stores; @@ -168,6 +169,14 @@ public KStream join(final KStream lhs, ); } + if (userProvidedBaseStoreName == null) { + addInternalResourceName(thisWindowStore); + addInternalResourceName(otherWindowStore); + if (outerJoinWindowStore.isPresent()) { + addInternalResourceName(outerJoinWindowStore.get()); + } + } + // Time-shared between joins to keep track of the maximum stream time final TimeTrackerSupplier sharedTimeTrackerSupplier = new TimeTrackerSupplier(); @@ -261,4 +270,12 @@ private static StoreFactory joinWindowStoreBuilderFromSupplier(final Wind valueSerde )); } + + private void addInternalResourceName(final StoreFactory windowStore) { + final InternalResourcesNaming.Builder thisInternalResourcesNaming = InternalResourcesNaming.builder().withStateStore(windowStore.storeName()); + if (windowStore.loggingEnabled()) { + thisInternalResourcesNaming.withChangelogTopic(windowStore.storeName() + "-changelog"); + } + builder.internalTopologyBuilder().addImplicitInternalNames(thisInternalResourcesNaming.build()); + } } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java index 7f37a27149ec9..e9e9035981dd1 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java @@ -66,6 +66,7 @@ import org.apache.kafka.streams.kstream.internals.suppress.SuppressedInternal; import org.apache.kafka.streams.processor.StreamPartitioner; import org.apache.kafka.streams.processor.api.ProcessorSupplier; +import org.apache.kafka.streams.processor.internals.InternalResourcesNaming; import org.apache.kafka.streams.processor.internals.InternalTopicProperties; import org.apache.kafka.streams.processor.internals.StaticTopicNameExtractor; import org.apache.kafka.streams.processor.internals.StoreDelegatingProcessorSupplier; @@ -549,8 +550,15 @@ public KTable suppress(final Suppressed suppressed) { final SuppressedInternal suppressedInternal = buildSuppress(suppressed, name); - final String storeName = - suppressedInternal.name() != null ? suppressedInternal.name() + "-store" : builder.newStoreName(SUPPRESS_NAME); + final String storeName; + if (suppressedInternal.name() != null) { + storeName = suppressedInternal.name() + "-store"; + } else { + storeName = builder.newStoreName(SUPPRESS_NAME); + if (suppressedInternal.bufferConfig().isLoggingEnabled()) { + internalTopologyBuilder().addImplicitInternalNames(InternalResourcesNaming.builder().withChangelogTopic(storeName + "-changelog").build()); + } + } final StoreBuilder>> storeBuilder; diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/MaterializedInternal.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/MaterializedInternal.java index d6cd130ba6db6..7bd727b09be3b 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/MaterializedInternal.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/MaterializedInternal.java @@ -21,6 +21,7 @@ import org.apache.kafka.streams.TopologyConfig; import org.apache.kafka.streams.kstream.Materialized; import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.processor.internals.InternalResourcesNaming; import org.apache.kafka.streams.state.DslStoreSuppliers; import org.apache.kafka.streams.state.StoreSupplier; @@ -53,6 +54,13 @@ public MaterializedInternal(final Materialized materialized, queryable = forceQueryable || storeName() != null; if (storeName() == null && nameProvider != null) { storeName = nameProvider.newStoreName(generatedStorePrefix); + if (nameProvider instanceof InternalStreamsBuilder) { + final InternalResourcesNaming.Builder internalResourcesNaming = InternalResourcesNaming.builder().withStateStore(storeName); + if (loggingEnabled()) { + internalResourcesNaming.withChangelogTopic(storeName + "-changelog"); + } + ((InternalStreamsBuilder) nameProvider).internalTopologyBuilder().addImplicitInternalNames(internalResourcesNaming.build()); + } } // if store type is not configured during creating Materialized, then try to get the topologyConfigs from nameProvider diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalResourcesNaming.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalResourcesNaming.java new file mode 100644 index 0000000000000..f4b46088f3a7a --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalResourcesNaming.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.processor.internals; + +public final class InternalResourcesNaming { + + private final String repartitionTopic; + private final String changelogTopic; + private final String stateStore; + + private InternalResourcesNaming(final Builder builder) { + this.repartitionTopic = builder.repartitionTopic; + this.changelogTopic = builder.changelogTopic; + this.stateStore = builder.stateStore; + } + + public static Builder builder() { + return new Builder(); + } + + public static final class Builder { + private String repartitionTopic; + private String changelogTopic; + private String stateStore; + + private Builder() {} + + public Builder withRepartitionTopic(final String repartitionTopic) { + this.repartitionTopic = repartitionTopic; + return this; + } + + public Builder withChangelogTopic(final String changelogTopic) { + this.changelogTopic = changelogTopic; + return this; + } + + public Builder withStateStore(final String stateStore) { + this.stateStore = stateStore; + return this; + } + + public InternalResourcesNaming build() { + return new InternalResourcesNaming(this); + } + } + + public String repartitionTopic() { + return repartitionTopic; + } + + public String changelogTopic() { + return changelogTopic; + } + + public String stateStore() { + return stateStore; + } +} \ No newline at end of file diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java index 23f4f343fad23..2503a35841963 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java @@ -21,6 +21,7 @@ import org.apache.kafka.common.config.ConfigException; import org.apache.kafka.common.serialization.Deserializer; import org.apache.kafka.common.serialization.Serializer; +import org.apache.kafka.common.utils.Utils; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.TopologyConfig; import org.apache.kafka.streams.errors.TopologyException; @@ -53,6 +54,7 @@ import java.util.HashSet; import java.util.Iterator; import java.util.LinkedHashMap; +import java.util.LinkedHashSet; import java.util.List; import java.util.Map; import java.util.Objects; @@ -64,12 +66,14 @@ import java.util.regex.Pattern; import java.util.stream.Collectors; +import static org.apache.kafka.streams.StreamsConfig.ENSURE_EXPLICIT_INTERNAL_RESOURCE_NAMING_CONFIG; import static org.apache.kafka.streams.StreamsConfig.PROCESSOR_WRAPPER_CLASS_CONFIG; public class InternalTopologyBuilder { public InternalTopologyBuilder() { this.topologyName = null; + this.ensureExplicitInternalResourceNaming = false; this.processorWrapper = new NoOpProcessorWrapper(); } @@ -78,7 +82,7 @@ public InternalTopologyBuilder(final TopologyConfig topologyConfigs) { this.topologyConfigs = topologyConfigs; this.topologyName = topologyConfigs.topologyName; - + this.ensureExplicitInternalResourceNaming = topologyConfigs.ensureExplicitInternalResourceNaming; try { processorWrapper = topologyConfigs.getConfiguredInstance( PROCESSOR_WRAPPER_CLASS_CONFIG, @@ -194,6 +198,10 @@ public InternalTopologyBuilder(final TopologyConfig topologyConfigs) { private boolean hasPersistentStores = false; + private final boolean ensureExplicitInternalResourceNaming; + + private final Set implicitInternalNames = new LinkedHashSet<>(); + public static class ReprocessFactory { private final ProcessorSupplier processorSupplier; @@ -2293,4 +2301,46 @@ public WrappedProcessorSupplier wra processorWrapper.wrapProcessorSupplier(name, processorSupplier) ); } + + public void addImplicitInternalNames(final InternalResourcesNaming internalResourcesNaming) { + implicitInternalNames.add(internalResourcesNaming); + } + + public void checkUnprovidedNames() { + if (!implicitInternalNames.isEmpty()) { + final StringBuilder result = new StringBuilder(); + final List changelogTopics = new ArrayList<>(); + final List stateStores = new ArrayList<>(); + final List repartitionTopics = new ArrayList<>(); + for (final InternalResourcesNaming internalResourcesNaming : implicitInternalNames) { + if (!Utils.isBlank(internalResourcesNaming.changelogTopic())) { + changelogTopics.add(internalResourcesNaming.changelogTopic()); + } + if (!Utils.isBlank(internalResourcesNaming.stateStore())) { + stateStores.add(internalResourcesNaming.stateStore()); + } + if (!Utils.isBlank(internalResourcesNaming.repartitionTopic())) { + repartitionTopics.add(internalResourcesNaming.repartitionTopic()); + } + } + if (!changelogTopics.isEmpty()) { + result.append(String.format("Following changelog topic(s) has not been named: %s%n", String.join(", ", changelogTopics))); + } + if (!stateStores.isEmpty()) { + result.append(String.format("Following state store(s) has not been named: %s%n", String.join(", ", stateStores))); + } + if (!repartitionTopics.isEmpty()) { + result.append(String.format("Following repartition topic(s) has not been named: %s%n", String.join(", ", repartitionTopics))); + } + if (ensureExplicitInternalResourceNaming) { + throw new TopologyException(result.toString()); + } else { + log.warn("Explicit naming for internal resources is currently disabled. If you want to enforce" + + " user-defined names for all internal resources, set " + ENSURE_EXPLICIT_INTERNAL_RESOURCE_NAMING_CONFIG + + " to true. Note: Changing internal resource names may require a full streams application reset for an" + + " already deployed application. Consult the documentation on naming operators for more details. {}", result); + } + } + } + } diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java index 08e413703c13a..a6c7980631980 100644 --- a/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java @@ -22,9 +22,11 @@ import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.Topology.AutoOffsetReset; import org.apache.kafka.streams.errors.TopologyException; +import org.apache.kafka.streams.kstream.Aggregator; import org.apache.kafka.streams.kstream.Branched; import org.apache.kafka.streams.kstream.Consumed; import org.apache.kafka.streams.kstream.ForeachAction; +import org.apache.kafka.streams.kstream.GlobalKTable; import org.apache.kafka.streams.kstream.Grouped; import org.apache.kafka.streams.kstream.JoinWindows; import org.apache.kafka.streams.kstream.Joined; @@ -35,12 +37,14 @@ import org.apache.kafka.streams.kstream.Named; import org.apache.kafka.streams.kstream.Printed; import org.apache.kafka.streams.kstream.Produced; +import org.apache.kafka.streams.kstream.Repartitioned; import org.apache.kafka.streams.kstream.SessionWindows; import org.apache.kafka.streams.kstream.SlidingWindows; import org.apache.kafka.streams.kstream.StreamJoined; import org.apache.kafka.streams.kstream.Suppressed; import org.apache.kafka.streams.kstream.TableJoined; import org.apache.kafka.streams.kstream.TimeWindows; +import org.apache.kafka.streams.kstream.Windowed; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.api.Processor; import org.apache.kafka.streams.processor.api.ProcessorContext; @@ -54,6 +58,7 @@ import org.apache.kafka.streams.state.SessionStore; import org.apache.kafka.streams.state.StoreBuilder; import org.apache.kafka.streams.state.Stores; +import org.apache.kafka.streams.state.VersionedBytesStoreSupplier; import org.apache.kafka.streams.state.WindowStore; import org.apache.kafka.streams.state.internals.InMemoryKeyValueStore; import org.apache.kafka.streams.state.internals.InMemorySessionStore; @@ -90,6 +95,7 @@ import java.util.regex.Pattern; import static java.util.Arrays.asList; +import static org.apache.kafka.streams.StreamsConfig.ENSURE_EXPLICIT_INTERNAL_RESOURCE_NAMING_CONFIG; import static org.apache.kafka.streams.StreamsConfig.PROCESSOR_WRAPPER_CLASS_CONFIG; import static org.apache.kafka.streams.StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG; import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.SUBTOPOLOGY_0; @@ -103,6 +109,7 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.core.Is.is; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.fail; @@ -2354,6 +2361,552 @@ public void shouldNowAllowStreamAndTableFromSameTopic() { assertThrows(TopologyException.class, builder::build); } + @Test + void shouldThrowWhenGroupByAggregationWithRepartitionNameAndLoggingEnabled() { + final StreamsBuilder builder = buildWithGroupByAggregationTopology( + Grouped.with("repartition-name", Serdes.String(), Serdes.String()), + Materialized.with(Serdes.String(), Serdes.Long()) + ); + final TopologyException e = assertThrows(TopologyException.class, builder::build); + assertTrue(e.getMessage().contains("Following changelog topic(s) has not been named: KSTREAM-AGGREGATE-STATE-STORE-0000000003-changelog")); + assertTrue(e.getMessage().contains("Following state store(s) has not been named: KSTREAM-AGGREGATE-STATE-STORE-0000000003")); + assertFalse(e.getMessage().contains("Following repartition topic(s) has not been named")); + } + + @Test + void shouldThrowWhenGroupByAggregationWithRepartitionNameAndLoggingDisabled() { + final StreamsBuilder builder = buildWithGroupByAggregationTopology( + Grouped.with("repartition-name", Serdes.String(), Serdes.String()), + Materialized.>with(Serdes.String(), Serdes.Long()) + .withLoggingDisabled() + ); + final TopologyException e = assertThrows(TopologyException.class, builder::build); + assertTrue(e.getMessage().contains("Following state store(s) has not been named: KSTREAM-AGGREGATE-STATE-STORE-0000000003")); + assertFalse(e.getMessage().contains("Following repartition topic(s) has not been named")); + } + + @Test + void shouldNotThrowWhenGroupByAggregationWithMaterializedName() { + final StreamsBuilder builder = buildWithGroupByAggregationTopology( + Grouped.with(Serdes.String(), Serdes.String()), + Materialized.>as("materialized-name") + .withKeySerde(Serdes.String()).withValueSerde(Serdes.Long()) + ); + assertBuildDoesNotThrow(builder); + } + + @Test + void shouldNotThrowWhenGroupByAggregationWithRepartitionNameAndMaterialized() { + final StreamsBuilder builder = buildWithGroupByAggregationTopology( + Grouped.with("repartition-name", Serdes.String(), Serdes.String()), + Materialized.>as("materialized-name") + .withKeySerde(Serdes.String()).withValueSerde(Serdes.Long()) + ); + assertBuildDoesNotThrow(builder); + } + + @Test + void shouldThrowWhenGroupByAggregationWithoutRepartitionNameAndMaterializedName() { + final StreamsBuilder builder = buildWithGroupByAggregationTopology( + Grouped.with(Serdes.String(), Serdes.String()), + Materialized.with(Serdes.String(), Serdes.Long()) + ); + final TopologyException e = assertThrows(TopologyException.class, builder::build); + assertTrue(e.getMessage().contains("Following changelog topic(s) has not been named: KSTREAM-AGGREGATE-STATE-STORE-0000000003-changelog")); + assertTrue(e.getMessage().contains("Following state store(s) has not been named: KSTREAM-AGGREGATE-STATE-STORE-0000000003")); + assertTrue(e.getMessage().contains("Following repartition topic(s) has not been named: KSTREAM-AGGREGATE-STATE-STORE-0000000003-repartition")); + } + + private StreamsBuilder buildWithGroupByAggregationTopology(final Grouped grouped, + final Materialized> materialized) { + + final Map props = dummyStreamsConfigMap(); + props.put(ENSURE_EXPLICIT_INTERNAL_RESOURCE_NAMING_CONFIG, true); + final StreamsBuilder builder = new StreamsBuilder(new TopologyConfig(new StreamsConfig(props))); + final KStream stream = builder.stream("input1"); + stream + .groupBy((k, v) -> v, grouped) + .count(materialized) + .toStream() + .to("output", Produced.as("sink")); + return builder; + } + + @Test + void shouldThrowWhenGroupByKeyAggregationWithRepartitionNameAndLoggingEnabled() { + final StreamsBuilder builder = buildWithGroupByKeyAggregationTopology( + Grouped.with("repartition-name", Serdes.String(), Serdes.String()), + Materialized.with(Serdes.String(), Serdes.Long()) + ); + final TopologyException e = assertThrows(TopologyException.class, builder::build); + assertTrue(e.getMessage().contains("Following changelog topic(s) has not been named: KSTREAM-AGGREGATE-STATE-STORE-0000000003-changelog")); + assertTrue(e.getMessage().contains("Following state store(s) has not been named: KSTREAM-AGGREGATE-STATE-STORE-0000000003")); + assertFalse(e.getMessage().contains("Following repartition topic(s) has not been named")); + } + + @Test + void shouldThrowWhenGroupByKeyAggregationWithRepartitionNameAndLoggingDisabled() { + final StreamsBuilder builder = buildWithGroupByKeyAggregationTopology( + Grouped.with("repartition-name", Serdes.String(), Serdes.String()), + Materialized.>with(Serdes.String(), Serdes.Long()) + .withLoggingDisabled() + ); + final TopologyException e = assertThrows(TopologyException.class, builder::build); + assertTrue(e.getMessage().contains("Following state store(s) has not been named: KSTREAM-AGGREGATE-STATE-STORE-0000000003")); + assertFalse(e.getMessage().contains("Following repartition topic(s) has not been named")); + } + + @Test + void shouldNotThrowWhenGroupByKeyAggregationWithMaterializedName() { + final StreamsBuilder builder = buildWithGroupByKeyAggregationTopology( + Grouped.with(Serdes.String(), Serdes.String()), + Materialized.>as("materialized-name") + .withKeySerde(Serdes.String()).withValueSerde(Serdes.Long()) + ); + assertBuildDoesNotThrow(builder); + } + + @Test + void shouldNotThrowWhenGroupByKeyAggregationWithRepartitionNameAndMaterializedName() { + final StreamsBuilder builder = buildWithGroupByKeyAggregationTopology( + Grouped.with("repartition-name", Serdes.String(), Serdes.String()), + Materialized.>as("materialized-name") + .withKeySerde(Serdes.String()).withValueSerde(Serdes.Long()) + ); + assertBuildDoesNotThrow(builder); + } + + @Test + void shouldThrowWhenGroupByKeyAggregationWithoutRepartitionNameAndMaterializedName() { + final StreamsBuilder builder = buildWithGroupByKeyAggregationTopology( + Grouped.with(Serdes.String(), Serdes.String()), + Materialized.with(Serdes.String(), Serdes.Long()) + ); + final TopologyException e = assertThrows(TopologyException.class, builder::build); + assertTrue(e.getMessage().contains("Following changelog topic(s) has not been named: KSTREAM-AGGREGATE-STATE-STORE-0000000003-changelog")); + assertTrue(e.getMessage().contains("Following state store(s) has not been named: KSTREAM-AGGREGATE-STATE-STORE-0000000003")); + assertTrue(e.getMessage().contains("Following repartition topic(s) has not been named: KSTREAM-AGGREGATE-STATE-STORE-0000000003-repartition")); + } + + private StreamsBuilder buildWithGroupByKeyAggregationTopology(final Grouped grouped, + final Materialized> materialized) { + final Map props = dummyStreamsConfigMap(); + props.put(ENSURE_EXPLICIT_INTERNAL_RESOURCE_NAMING_CONFIG, true); + final StreamsBuilder builder = new StreamsBuilder(new TopologyConfig(new StreamsConfig(props))); + final KStream stream = builder.stream("input1"); + stream + .selectKey((k, v) -> v) + .groupByKey(grouped) + .count(materialized) + .toStream() + .to("output", Produced.as("sink")); + return builder; + } + + @Test + void shouldNotThrowWhenSuppressWithSuppressName() { + final StreamsBuilder builder = buildAggregationWithSuppressTopology(true, true); + assertBuildDoesNotThrow(builder); + } + + @Test + void shouldThrowWhenSuppressWithoutSuppressName() { + final StreamsBuilder builder = buildAggregationWithSuppressTopology(false, true); + final TopologyException e = assertThrows(TopologyException.class, builder::build); + assertTrue(e.getMessage().contains("Following changelog topic(s) has not been named: KTABLE-SUPPRESS-STATE-STORE-0000000003-changelog")); + assertFalse(e.getMessage().contains("Following state store(s) has not been named")); + assertFalse(e.getMessage().contains("Following repartition topic(s) has not been named")); + } + + @Test + void shouldThrowWhenSuppressWithoutSuppressNameAndLoggingDisabled() { + final StreamsBuilder builder = buildAggregationWithSuppressTopology(false, false); + assertBuildDoesNotThrow(builder); + } + + private StreamsBuilder buildAggregationWithSuppressTopology(final boolean isSuppressNamed, + final boolean isLoggingEnabled) { + + final Map props = dummyStreamsConfigMap(); + props.put(ENSURE_EXPLICIT_INTERNAL_RESOURCE_NAMING_CONFIG, true); + final StreamsBuilder builder = new StreamsBuilder(new TopologyConfig(new StreamsConfig(props))); + + final KStream stream = builder.stream("input1"); + final KTable, Long> table = stream + .groupByKey() + .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofHours(1))) + .count(Materialized.as("materialized-name")); + if (isSuppressNamed) { + table.suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()) + .withName("suppressed-name")) + .toStream() + .to("output", Produced.as("sink")); + } else { + if (isLoggingEnabled) { + table.suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded())) + .toStream() + .to("output", Produced.as("sink")); + } else { + table.suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded().withLoggingDisabled())) + .toStream() + .to("output", Produced.as("sink")); + } + } + return builder; + } + + @Test + void shouldThrowWhenKStreamKStreamJoinWithRepartitionNameAndLoggingEnabled() { + final StreamsBuilder builder = buildKStreamKStreamJoinTopology( + StreamJoined.with(Serdes.String(), Serdes.String(), Serdes.String()) + .withName("repartition-name") + ); + final TopologyException e = assertThrows(TopologyException.class, builder::build); + assertTrue(e.getMessage().contains("Following changelog topic(s) has not been named: KSTREAM-JOINTHIS-0000000012-store-changelog, KSTREAM-OUTEROTHER-0000000013-store-changelog, KSTREAM-OUTERSHARED-0000000012-store-changelog")); + assertTrue(e.getMessage().contains("Following state store(s) has not been named: KSTREAM-JOINTHIS-0000000012-store, KSTREAM-OUTEROTHER-0000000013-store, KSTREAM-OUTERSHARED-0000000012-store")); + assertFalse(e.getMessage().contains("Following repartition topic(s) has not been named")); + } + + @Test + void shouldThrowWhenKStreamKStreamJoinWithRepartitionNameAndLoggingDisabled() { + final StreamsBuilder builder = buildKStreamKStreamJoinTopology( + StreamJoined.with(Serdes.String(), Serdes.String(), Serdes.String()) + .withName("repartition-name").withLoggingDisabled() + ); + final TopologyException e = assertThrows(TopologyException.class, builder::build); + assertTrue(e.getMessage().contains("Following state store(s) has not been named: KSTREAM-JOINTHIS-0000000012-store, KSTREAM-OUTEROTHER-0000000013-store, KSTREAM-OUTERSHARED-0000000012-store")); + assertFalse(e.getMessage().contains("Following repartition topic(s) has not been named")); + } + + @Test + void shouldThrowWhenKStreamKStreamJoinWithMaterializedName() { + final StreamsBuilder builder = buildKStreamKStreamJoinTopology( + StreamJoined.with(Serdes.String(), Serdes.String(), Serdes.String()) + .withStoreName("store-name") + ); + final TopologyException e = assertThrows(TopologyException.class, builder::build); + assertFalse(e.getMessage().contains("Following changelog topic(s) has not been named")); + assertFalse(e.getMessage().contains("Following state store(s) has not been named")); + assertTrue(e.getMessage().contains("Following repartition topic(s) has not been named: KSTREAM-KEY-SELECT-0000000002-repartition, KSTREAM-KEY-SELECT-0000000003-repartition")); + } + + @Test + void shouldNotThrowWhenKStreamKStreamJoinWithRepartitionNameAndMaterializedName() { + final StreamsBuilder builder = buildKStreamKStreamJoinTopology( + StreamJoined.with(Serdes.String(), Serdes.String(), Serdes.String()) + .withName("repartition-name") + .withStoreName("store-name") + ); + assertBuildDoesNotThrow(builder); + } + + @Test + void shouldThrowWhenKStreamKStreamJoinWithoutRepartitionNameAndMaterializedName() { + final StreamsBuilder builder = buildKStreamKStreamJoinTopology( + StreamJoined.with(Serdes.String(), Serdes.String(), Serdes.String()) + ); + final TopologyException e = assertThrows(TopologyException.class, builder::build); + assertTrue(e.getMessage().contains("Following changelog topic(s) has not been named: KSTREAM-JOINTHIS-0000000012-store-changelog, KSTREAM-OUTEROTHER-0000000013-store-changelog, KSTREAM-OUTERSHARED-0000000012-store-changelog")); + assertTrue(e.getMessage().contains("Following state store(s) has not been named: KSTREAM-JOINTHIS-0000000012-store, KSTREAM-OUTEROTHER-0000000013-store, KSTREAM-OUTERSHARED-0000000012-store")); + assertTrue(e.getMessage().contains("Following repartition topic(s) has not been named: KSTREAM-KEY-SELECT-0000000002-repartition, KSTREAM-KEY-SELECT-0000000003-repartition")); + } + + private StreamsBuilder buildKStreamKStreamJoinTopology(final StreamJoined streamJoined) { + + final Map props = dummyStreamsConfigMap(); + props.put(ENSURE_EXPLICIT_INTERNAL_RESOURCE_NAMING_CONFIG, true); + final StreamsBuilder builder = new StreamsBuilder(new TopologyConfig(new StreamsConfig(props))); + final KStream streamOne = builder.stream(STREAM_TOPIC); + final KStream streamTwo = builder.stream(STREAM_TOPIC_TWO); + streamOne + .selectKey((k, v) -> v) + .leftJoin( + streamTwo.selectKey((k, v) -> v), + (value1, value2) -> value1, + JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofHours(1)), + streamJoined + ); + return builder; + } + + @Test + void shouldThrowWhenKStreamKTableJoinWithRepartitionNameAndLoggingEnabled() { + final StreamsBuilder builder = buildKStreamKTableJoinTopology( + Joined.with(Serdes.String(), Serdes.String(), Serdes.String()).withName("repartition-name"), + Materialized.with(Serdes.String(), Serdes.String()) + ); + final TopologyException e = assertThrows(TopologyException.class, builder::build); + assertTrue(e.getMessage().contains("Following changelog topic(s) has not been named: stream-topic-two-STATE-STORE-0000000001-changelog")); + assertTrue(e.getMessage().contains("Following state store(s) has not been named: stream-topic-two-STATE-STORE-0000000001")); + assertFalse(e.getMessage().contains("Following repartition topic(s) has not been named")); + } + + @Test + void shouldThrowWhenKStreamKTableJoinWithRepartitionNameAndLoggingDisabled() { + final StreamsBuilder builder = buildKStreamKTableJoinTopology( + Joined.with(Serdes.String(), Serdes.String(), Serdes.String()).withName("repartition-name"), + Materialized.>with(Serdes.String(), Serdes.String()) + .withLoggingDisabled() + ); + final TopologyException e = assertThrows(TopologyException.class, builder::build); + assertTrue(e.getMessage().contains("Following state store(s) has not been named: stream-topic-two-STATE-STORE-0000000001")); + assertFalse(e.getMessage().contains("Following repartition topic(s) has not been named")); + } + + @Test + void shouldThrowWhenKStreamKTableJoinWithMaterializedName() { + final StreamsBuilder builder = buildKStreamKTableJoinTopology( + Joined.with(Serdes.String(), Serdes.String(), Serdes.String()), + Materialized.>as("materialized-name") + .withKeySerde(Serdes.String()).withValueSerde(Serdes.String()) + ); + final TopologyException e = assertThrows(TopologyException.class, builder::build); + assertFalse(e.getMessage().contains("Following changelog topic(s) has not been named")); + assertFalse(e.getMessage().contains("Following state store(s) has not been named")); + assertTrue(e.getMessage().contains("Following repartition topic(s) has not been named: KSTREAM-KEY-SELECT-0000000003-repartition")); + } + + @Test + void shouldNotThrowWhenKStreamKTableJoinWithRepartitionNameAndMaterializedName() { + final StreamsBuilder builder = buildKStreamKTableJoinTopology( + Joined.with(Serdes.String(), Serdes.String(), Serdes.String()).withName("repartition-name"), + Materialized.>as("materialized-name") + .withKeySerde(Serdes.String()).withValueSerde(Serdes.String()) + ); + assertBuildDoesNotThrow(builder); + } + + @Test + void shouldThrowWhenKStreamKTableJoinWithoutRepartitionNameAndMaterializedName() { + final StreamsBuilder builder = buildKStreamKTableJoinTopology( + Joined.with(Serdes.String(), Serdes.String(), Serdes.String()), + Materialized.with(Serdes.String(), Serdes.String()) + ); + final TopologyException e = assertThrows(TopologyException.class, builder::build); + assertTrue(e.getMessage().contains("Following changelog topic(s) has not been named: stream-topic-two-STATE-STORE-0000000001-changelog")); + assertTrue(e.getMessage().contains("Following state store(s) has not been named: stream-topic-two-STATE-STORE-0000000001")); + assertTrue(e.getMessage().contains("Following repartition topic(s) has not been named: KSTREAM-KEY-SELECT-0000000004-repartition")); + } + + private StreamsBuilder buildKStreamKTableJoinTopology(final Joined joined, + final Materialized> materialized) { + + final Map props = dummyStreamsConfigMap(); + props.put(ENSURE_EXPLICIT_INTERNAL_RESOURCE_NAMING_CONFIG, true); + final StreamsBuilder builder = new StreamsBuilder(new TopologyConfig(new StreamsConfig(props))); + final KStream stream = builder.stream(STREAM_TOPIC); + final KTable table = builder.table(STREAM_TOPIC_TWO, materialized); + stream + .selectKey((k, v) -> v) + .join( + table, + (value1, value2) -> value1, + joined + ); + return builder; + } + + + @Test + void shouldNotThrowWhenKStreamVersionedKTableJoinWithRepartitionName() { + final StreamsBuilder builder = buildKStreamVersionedKTableJoinTopology( + Joined.with(Serdes.String(), Serdes.String(), Serdes.String()).withName("repartition-name") + + ); + assertBuildDoesNotThrow(builder); + } + + @Test + void shouldThrowWhenKStreamVersionedKTableJoinWithoutRepartitionName() { + final StreamsBuilder builder = buildKStreamVersionedKTableJoinTopology( + Joined.with(Serdes.String(), Serdes.String(), Serdes.String()) + ); + final TopologyException e = assertThrows(TopologyException.class, builder::build); + assertTrue(e.getMessage().contains("Following changelog topic(s) has not been named: KSTREAM-JOIN-0000000007-Buffer-changelog")); + assertTrue(e.getMessage().contains("Following changelog topic(s) has not been named: KSTREAM-JOIN-0000000007-Buffer")); + assertTrue(e.getMessage().contains("Following repartition topic(s) has not been named: KSTREAM-KEY-SELECT-0000000003-repartition")); + } + + private StreamsBuilder buildKStreamVersionedKTableJoinTopology(final Joined joined) { + + final Map props = dummyStreamsConfigMap(); + props.put(ENSURE_EXPLICIT_INTERNAL_RESOURCE_NAMING_CONFIG, true); + final StreamsBuilder builder = new StreamsBuilder(new TopologyConfig(new StreamsConfig(props))); + final VersionedBytesStoreSupplier versionedStoreSupplier = + Stores.persistentVersionedKeyValueStore("versioned-ktable-store", + Duration.ofDays(1)); + final Materialized> materialized = + Materialized.as(versionedStoreSupplier) + .withKeySerde(Serdes.String()).withValueSerde(Serdes.String()); + final KStream stream = builder.stream(STREAM_TOPIC); + final KTable table = builder.table(STREAM_TOPIC_TWO, materialized); + stream + .selectKey((k, v) -> v) + .join( + table, + (value1, value2) -> value1, + joined.withGracePeriod(Duration.ofHours(1)) + ) + .to("test-topic"); + return builder; + } + + @Test + void shouldNotThrowWhenKStreamGlobalKTableJoinWithMaterializedName() { + final StreamsBuilder builder = buildKStreamGlobalKTableJoinTopology( + Materialized.>as("materialized-name") + .withKeySerde(Serdes.String()).withValueSerde(Serdes.String()) + ); + assertBuildDoesNotThrow(builder); + } + + @Test + void shouldThrowWhenKStreamGlobalKTableJoinWithoutStoreName() { + final StreamsBuilder builder = buildKStreamGlobalKTableJoinTopology(null); + final TopologyException e = assertThrows(TopologyException.class, builder::build); + assertFalse(e.getMessage().contains("Following changelog topic(s) has not been named")); + assertTrue(e.getMessage().contains("Following state store(s) has not been named: stream-topic-two-STATE-STORE-0000000001")); + assertFalse(e.getMessage().contains("Following repartition topic(s) has not been named")); + + } + + private StreamsBuilder buildKStreamGlobalKTableJoinTopology(final Materialized> materialized) { + + final Map props = dummyStreamsConfigMap(); + props.put(ENSURE_EXPLICIT_INTERNAL_RESOURCE_NAMING_CONFIG, true); + final StreamsBuilder builder = new StreamsBuilder(new TopologyConfig(new StreamsConfig(props))); + final KStream stream = builder.stream(STREAM_TOPIC); + final GlobalKTable globalTable; + if (materialized != null) { + globalTable = builder.globalTable(STREAM_TOPIC_TWO, materialized); + } else { + globalTable = builder.globalTable(STREAM_TOPIC_TWO); + } + stream + .selectKey((k, v) -> v) + .join( + globalTable, + (k, v) -> k, + (value1, value2) -> value1 + ); + return builder; + } + + @Test + void shouldNotThrowWhenRepartitionWithRepartitionName() { + final StreamsBuilder builder = buildRepartitionTopology( + Repartitioned.with(Serdes.String(), Serdes.String()) + .withName("repartition-name") + ); + assertBuildDoesNotThrow(builder); + } + + @Test + void shouldThrowWhenRepartitionWithoutRepartition() { + final StreamsBuilder builder = buildRepartitionTopology( + Repartitioned.with(Serdes.String(), Serdes.String()) + ); + final TopologyException e = assertThrows(TopologyException.class, builder::build); + assertFalse(e.getMessage().contains("Following changelog topic(s) has not been named")); + assertFalse(e.getMessage().contains("Following state store(s) has not been named")); + assertTrue(e.getMessage().contains("Following repartition topic(s) has not been named: KSTREAM-REPARTITION-0000000001-repartition")); + } + + private StreamsBuilder buildRepartitionTopology(final Repartitioned repartitioned) { + + final Map props = dummyStreamsConfigMap(); + props.put(ENSURE_EXPLICIT_INTERNAL_RESOURCE_NAMING_CONFIG, true); + final StreamsBuilder builder = new StreamsBuilder(new TopologyConfig(new StreamsConfig(props))); + + final KStream stream = builder.stream("input1"); + stream + .repartition(repartitioned) + .to("output", Produced.as("sink")); + return builder; + } + + @Test + void shouldThrowWhenCoGroupWithRepartitionNameAndLoggingEnabled() { + final StreamsBuilder builder = buildCoGroupTopology( + Grouped.with("repartition-name", Serdes.String(), Serdes.String()), + Materialized.with(Serdes.String(), Serdes.String()) + ); + final TopologyException e = assertThrows(TopologyException.class, builder::build); + assertTrue(e.getMessage().contains("Following changelog topic(s) has not been named: COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000003-changelog")); + assertTrue(e.getMessage().contains("Following state store(s) has not been named: COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000003")); + assertFalse(e.getMessage().contains("Following repartition topic(s) has not been named")); + } + + @Test + void shouldThrowWhenCoGroupWithRepartitionNameAndLoggingDisabled() { + final StreamsBuilder builder = buildCoGroupTopology( + Grouped.with("repartition-name", Serdes.String(), Serdes.String()), + Materialized.>with(Serdes.String(), Serdes.String()) + .withLoggingDisabled() + ); + final TopologyException e = assertThrows(TopologyException.class, builder::build); + assertTrue(e.getMessage().contains("Following state store(s) has not been named: COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000003")); + assertFalse(e.getMessage().contains("Following repartition topic(s) has not been named")); + } + + @Test + void shouldNotThrowWhenCoGroupWithMaterializedName() { + final StreamsBuilder builder = buildCoGroupTopology( + Grouped.with(Serdes.String(), Serdes.String()), + Materialized.>as("materialized-name") + .withKeySerde(Serdes.String()).withValueSerde(Serdes.String()) + ); + assertBuildDoesNotThrow(builder); + } + + @Test + void shouldNotThrowWhenCoGroupWithRepartitionNameAndMaterializedName() { + final StreamsBuilder builder = buildCoGroupTopology( + Grouped.with("repartition-name", Serdes.String(), Serdes.String()), + Materialized.>as("materialized-name") + .withKeySerde(Serdes.String()).withValueSerde(Serdes.String()) + ); + assertBuildDoesNotThrow(builder); + } + + @Test + void shouldThrowWhenCoGroupWithoutRepartitionNameAndMaterializedName() { + final StreamsBuilder builder = buildCoGroupTopology( + Grouped.with(Serdes.String(), Serdes.String()), + Materialized.with(Serdes.String(), Serdes.String()) + ); + final TopologyException e = assertThrows(TopologyException.class, builder::build); + assertTrue(e.getMessage().contains("Following changelog topic(s) has not been named: COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000003-changelog")); + assertTrue(e.getMessage().contains("Following state store(s) has not been named: COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000003")); + assertTrue(e.getMessage().contains("Following repartition topic(s) has not been named: COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000003-repartition")); + } + + private StreamsBuilder buildCoGroupTopology(final Grouped grouped, + final Materialized> materialized) { + + final Map props = dummyStreamsConfigMap(); + props.put(ENSURE_EXPLICIT_INTERNAL_RESOURCE_NAMING_CONFIG, true); + final StreamsBuilder builder = new StreamsBuilder(new TopologyConfig(new StreamsConfig(props))); + + final KStream streamOne = builder.stream(STREAM_TOPIC); + final KStream streamTwo = builder.stream(STREAM_TOPIC_TWO); + + final KGroupedStream groupedOne = streamOne.groupBy((k, v) -> v, grouped); + final KGroupedStream groupedTwo = streamTwo.groupByKey(); + + final Aggregator agg1 = (key, value, aggregate) -> aggregate + value; + final Aggregator agg2 = (key, value, aggregate) -> aggregate + value; + + final KTable coGroupedStream = groupedOne + .cogroup(agg1) + .cogroup(groupedTwo, agg2) + .aggregate(() -> "", materialized); + + coGroupedStream.toStream().to("output"); + + return builder; + } + private static void assertBuildDoesNotThrow(final StreamsBuilder builder) { try { builder.build(); diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java index ce99358ad6da5..7e9447851e1e8 100644 --- a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java @@ -69,6 +69,7 @@ import static org.apache.kafka.streams.StreamsConfig.AT_LEAST_ONCE; import static org.apache.kafka.streams.StreamsConfig.DSL_STORE_SUPPLIERS_CLASS_CONFIG; import static org.apache.kafka.streams.StreamsConfig.ENABLE_METRICS_PUSH_CONFIG; +import static org.apache.kafka.streams.StreamsConfig.ENSURE_EXPLICIT_INTERNAL_RESOURCE_NAMING_CONFIG; import static org.apache.kafka.streams.StreamsConfig.EXACTLY_ONCE_V2; import static org.apache.kafka.streams.StreamsConfig.MAX_RACK_AWARE_ASSIGNMENT_TAG_KEY_LENGTH; import static org.apache.kafka.streams.StreamsConfig.MAX_RACK_AWARE_ASSIGNMENT_TAG_VALUE_LENGTH; @@ -129,6 +130,7 @@ public void shouldNotLeakInternalDocMembers() { case "DESERIALIZATION_EXCEPTION_HANDLER_CLASS_DOC": case "DSL_STORE_SUPPLIERS_CLASS_DOC": case "PROCESSOR_WRAPPER_CLASS_DOC": + case "ENSURE_EXPLICIT_INTERNAL_RESOURCE_NAMING_DOC": continue; // check for leaking, but already deprecated members @@ -1582,6 +1584,18 @@ public void shouldUseOldProductionExceptionHandlerWhenOnlyOldConfigIsSet() { assertEquals(RecordCollectorTest.ProductionExceptionHandlerMock.class, streamsConfig.productionExceptionHandler().getClass()); } + @Test + public void shouldGetDefaultEnsureExplicitInternalResourceNaming() { + assertFalse(streamsConfig.getBoolean(ENSURE_EXPLICIT_INTERNAL_RESOURCE_NAMING_CONFIG)); + } + + @Test + public void shouldEnsureExplicitInternalResourceNaming() { + props.put(ENSURE_EXPLICIT_INTERNAL_RESOURCE_NAMING_CONFIG, true); + streamsConfig = new StreamsConfig(props); + assertTrue(streamsConfig.getBoolean(ENSURE_EXPLICIT_INTERNAL_RESOURCE_NAMING_CONFIG)); + } + static class MisconfiguredSerde implements Serde { @Override public void configure(final Map configs, final boolean isKey) {