diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java index 241df1cd3ec5..5c8c5ef092d9 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java @@ -48,8 +48,8 @@ * A {@code KStream} can be transformed record by record, joined with another {@code KStream}, {@link KTable}, * {@link GlobalKTable}, or can be aggregated into a {@link KTable}. * Kafka Streams DSL can be mixed-and-matched with Processor API (PAPI) (c.f. {@link Topology}) via - * {@link #process(ProcessorSupplier, String...) process(...)} and {@link #transformValues(ValueTransformerSupplier, - * String...) transformValues(...)}. + * {@link #process(ProcessorSupplier, String...) process(...)} and {@link #processValues(FixedKeyProcessorSupplier, + * String...) processValues(...)}. * * @param Type of keys * @param Type of values @@ -206,8 +206,7 @@ KStream selectKey(final KeyValueMapper KStream map(final KeyValueMapper> mapper); @@ -245,8 +244,7 @@ KStream selectKey(final KeyValueMapper KStream map(final KeyValueMapper> mapper, final Named named); @@ -256,7 +254,7 @@ KStream map(final KeyValueMapper} can be transformed into an output record {@code }. * This is a stateless record-by-record operation (cf. - * {@link #transformValues(ValueTransformerSupplier, String...)} for stateful value transformation). + * {@link #processValues(FixedKeyProcessorSupplier, String...)} for stateful value processing). *

* The example below counts the number of token of the value string. *

{@code
@@ -280,8 +278,7 @@  KStream map(final KeyValueMapper KStream mapValues(final ValueMapper mapper);
 
@@ -290,7 +287,7 @@  KStream map(final KeyValueMapper} can be transformed into an output record {@code }.
      * This is a stateless record-by-record operation (cf.
-     * {@link #transformValues(ValueTransformerSupplier, String...)} for stateful value transformation).
+     * {@link #processValues(FixedKeyProcessorSupplier, String...)} for stateful value processing).
      * 

* The example below counts the number of token of the value string. *

{@code
@@ -315,8 +312,7 @@  KStream map(final KeyValueMapper KStream mapValues(final ValueMapper mapper,
                                   final Named named);
@@ -326,7 +322,7 @@  KStream mapValues(final ValueMapper mapper,
      * The provided {@link ValueMapperWithKey} is applied to each input record value and computes a new value for it.
      * Thus, an input record {@code } can be transformed into an output record {@code }.
      * This is a stateless record-by-record operation (cf.
-     * {@link #transformValues(ValueTransformerWithKeySupplier, String...)} for stateful value transformation).
+     * {@link #processValues(FixedKeyProcessorSupplier, String...)} for stateful value processing).
      * 

* The example below counts the number of tokens of key and value strings. *

{@code
@@ -351,8 +347,7 @@  KStream mapValues(final ValueMapper mapper,
      * @see #flatMapValues(ValueMapper)
      * @see #flatMapValues(ValueMapperWithKey)
      * @see #process(ProcessorSupplier, String...)
-     * @see #transformValues(ValueTransformerSupplier, String...)
-     * @see #transformValues(ValueTransformerWithKeySupplier, String...)
+     * @see #processValues(FixedKeyProcessorSupplier, String...)
      */
      KStream mapValues(final ValueMapperWithKey mapper);
 
@@ -361,7 +356,7 @@  KStream mapValues(final ValueMapper mapper,
      * The provided {@link ValueMapperWithKey} is applied to each input record value and computes a new value for it.
      * Thus, an input record {@code } can be transformed into an output record {@code }.
      * This is a stateless record-by-record operation (cf.
-     * {@link #transformValues(ValueTransformerWithKeySupplier, String...)} for stateful value transformation).
+     * {@link #processValues(FixedKeyProcessorSupplier, String...)} for stateful value processing).
      * 

* The example below counts the number of tokens of key and value strings. *

{@code
@@ -387,8 +382,7 @@  KStream mapValues(final ValueMapper mapper,
      * @see #flatMapValues(ValueMapper)
      * @see #flatMapValues(ValueMapperWithKey)
      * @see #process(ProcessorSupplier, String...)
-     * @see #transformValues(ValueTransformerSupplier, String...)
-     * @see #transformValues(ValueTransformerWithKeySupplier, String...)
+     * @see #processValues(FixedKeyProcessorSupplier, String...)
      */
      KStream mapValues(final ValueMapperWithKey mapper,
                                   final Named named);
@@ -436,8 +430,7 @@  KStream mapValues(final ValueMapperWithKey KStream mapValues(final ValueMapperWithKey KStream flatMap(final KeyValueMapper} can be transformed into output records {@code , , ...}.
-     * This is a stateless record-by-record operation (cf. {@link #transformValues(ValueTransformerSupplier, String...)}
-     * for stateful value transformation).
+     * This is a stateless record-by-record operation (cf. {@link #processValues(FixedKeyProcessorSupplier, String...)}
+     * for stateful value processing).
      * 

* The example below splits input records {@code } containing sentences as values into their words. *

{@code
@@ -530,8 +522,7 @@  KStream flatMap(final KeyValueMapper KStream flatMap(final KeyValueMapper} can be transformed into output records {@code , , ...}.
-     * This is a stateless record-by-record operation (cf. {@link #transformValues(ValueTransformerSupplier, String...)}
-     * for stateful value transformation).
+     * This is a stateless record-by-record operation (cf. {@link #processValues(FixedKeyProcessorSupplier, String...)}
+     * for stateful value processing).
      * 

* The example below splits input records {@code } containing sentences as values into their words. *

{@code
@@ -573,8 +564,7 @@  KStream flatMap(final KeyValueMapper KStream flatMapValues(final ValueMapper} can be transformed into output records {@code , , ...}.
-     * This is a stateless record-by-record operation (cf. {@link #transformValues(ValueTransformerWithKeySupplier, String...)}
-     * for stateful value transformation).
+     * This is a stateless record-by-record operation (cf. {@link #processValues(FixedKeyProcessorSupplier, String...)}
+     * for stateful value processing).
      * 

* The example below splits input records {@code }, with key=1, containing sentences as values * into their words. @@ -621,8 +611,7 @@ KStream flatMapValues(final ValueMapper KStream flatMapValues(final ValueMapper} can be transformed into output records {@code , , ...}. - * This is a stateless record-by-record operation (cf. {@link #transformValues(ValueTransformerWithKeySupplier, String...)} - * for stateful value transformation). + * This is a stateless record-by-record operation (cf. {@link #processValues(FixedKeyProcessorSupplier, String...)} + * for stateful value processing). *

* The example below splits input records {@code }, with key=1, containing sentences as values * into their words. @@ -670,8 +659,7 @@ KStream flatMapValues(final ValueMapper KStream leftJoin(final GlobalKTable globalTable, final KeyValueMapper keySelector, final ValueJoinerWithKey valueJoiner, final Named named); - - /** - * Transform the value of each input record into a new value (with possibly a new type) of the output record. - * A {@link ValueTransformer} (provided by the given {@link ValueTransformerSupplier}) is applied to each input - * record value and computes a new value for it. - * Thus, an input record {@code } can be transformed into an output record {@code }. - * Attaching a state store makes this a stateful record-by-record operation (cf. {@link #mapValues(ValueMapper) mapValues()}). - * If you choose not to attach one, this operation is similar to the stateless {@link #mapValues(ValueMapper) mapValues()} - * but allows access to the {@code ProcessorContext} and record metadata. - * Furthermore, via {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long)} the processing progress - * can be observed and additional periodic actions can be performed. - *

- * In order for the transformer to use state stores, the stores must be added to the topology and connected to the - * transformer using at least one of two strategies (though it's not required to connect global state stores; read-only - * access to global state stores is available by default). - *

- * The first strategy is to manually add the {@link StoreBuilder}s via {@link Topology#addStateStore(StoreBuilder, String...)}, - * and specify the store names via {@code stateStoreNames} so they will be connected to the transformer. - *

{@code
-     * // create store
-     * StoreBuilder> keyValueStoreBuilder =
-     *         Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("myValueTransformState"),
-     *                 Serdes.String(),
-     *                 Serdes.String());
-     * // add store
-     * builder.addStateStore(keyValueStoreBuilder);
-     *
-     * KStream outputStream = inputStream.transformValues(new ValueTransformerSupplier() {
-     *     public ValueTransformer get() {
-     *         return new MyValueTransformer();
-     *     }
-     * }, "myValueTransformState");
-     * }
- * The second strategy is for the given {@link ValueTransformerSupplier} to implement {@link ConnectedStoreProvider#stores()}, - * which provides the {@link StoreBuilder}s to be automatically added to the topology and connected to the transformer. - *
{@code
-     * class MyValueTransformerSupplier implements ValueTransformerSupplier {
-     *     // supply transformer
-     *     ValueTransformer get() {
-     *         return new MyValueTransformer();
-     *     }
-     *
-     *     // provide store(s) that will be added and connected to the associated transformer
-     *     // the store name from the builder ("myValueTransformState") is used to access the store later via the ProcessorContext
-     *     Set stores() {
-     *         StoreBuilder> keyValueStoreBuilder =
-     *                   Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("myValueTransformState"),
-     *                   Serdes.String(),
-     *                   Serdes.String());
-     *         return Collections.singleton(keyValueStoreBuilder);
-     *     }
-     * }
-     *
-     * ...
-     *
-     * KStream outputStream = inputStream.transformValues(new MyValueTransformerSupplier());
-     * }
- *

- * With either strategy, within the {@link ValueTransformer}, the state is obtained via the {@link ProcessorContext}. - * To trigger periodic actions via {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long) punctuate()}, - * a schedule must be registered. - * The {@link ValueTransformer} must return the new value in {@link ValueTransformer#transform(Object) transform()}. - * No additional {@link KeyValue} pairs can be emitted via - * {@link org.apache.kafka.streams.processor.ProcessorContext#forward(Object, Object) ProcessorContext.forward()}. - * A {@link org.apache.kafka.streams.errors.StreamsException} is thrown if the {@link ValueTransformer} tries to - * emit a {@link KeyValue} pair. - *

{@code
-     * class MyValueTransformer implements ValueTransformer {
-     *     private StateStore state;
-     *
-     *     void init(ProcessorContext context) {
-     *         this.state = context.getStateStore("myValueTransformState");
-     *         // punctuate each second, can access this.state
-     *         context.schedule(Duration.ofSeconds(1), PunctuationType.WALL_CLOCK_TIME, new Punctuator(..));
-     *     }
-     *
-     *     NewValueType transform(V value) {
-     *         // can access this.state
-     *         return new NewValueType(); // or null
-     *     }
-     *
-     *     void close() {
-     *         // can access this.state
-     *     }
-     * }
-     * }
- * Even if any upstream operation was key-changing, no auto-repartition is triggered. - * If repartitioning is required, a call to {@link #repartition()} should be performed before - * {@code transformValues()}. - *

- * Setting a new value preserves data co-location with respect to the key. - * Thus, no internal data redistribution is required if a key based operator (like an aggregation or join) - * is applied to the result {@code KStream}. - * - * @param valueTransformerSupplier an instance of {@link ValueTransformerSupplier} that generates a newly constructed {@link ValueTransformer} - * The supplier should always generate a new instance. Creating a single {@link ValueTransformer} object - * and returning the same object reference in {@link ValueTransformer} is a - * violation of the supplier pattern and leads to runtime exceptions. - * @param stateStoreNames the names of the state stores used by the processor; not required if the supplier - * implements {@link ConnectedStoreProvider#stores()} - * @param the value type of the result stream - * @return a {@code KStream} that contains records with unmodified key and new values (possibly of different type) - * @see #mapValues(ValueMapper) - * @see #mapValues(ValueMapperWithKey) - * @deprecated Since 3.3. Use {@link KStream#processValues(FixedKeyProcessorSupplier, String...)} instead. - */ - @Deprecated - KStream transformValues(final ValueTransformerSupplier valueTransformerSupplier, - final String... stateStoreNames); - /** - * Transform the value of each input record into a new value (with possibly a new type) of the output record. - * A {@link ValueTransformer} (provided by the given {@link ValueTransformerSupplier}) is applied to each input - * record value and computes a new value for it. - * Thus, an input record {@code } can be transformed into an output record {@code }. - * Attaching a state store makes this a stateful record-by-record operation (cf. {@link #mapValues(ValueMapper) mapValues()}). - * If you choose not to attach one, this operation is similar to the stateless {@link #mapValues(ValueMapper) mapValues()} - * but allows access to the {@code ProcessorContext} and record metadata. - * This is essentially mixing the Processor API into the DSL, and provides all the functionality of the PAPI. - * Furthermore, via {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long)} the processing progress - * can be observed and additional periodic actions can be performed. - *

- * In order for the transformer to use state stores, the stores must be added to the topology and connected to the - * transformer using at least one of two strategies (though it's not required to connect global state stores; read-only - * access to global state stores is available by default). - *

- * The first strategy is to manually add the {@link StoreBuilder}s via {@link Topology#addStateStore(StoreBuilder, String...)}, - * and specify the store names via {@code stateStoreNames} so they will be connected to the transformer. - *

{@code
-     * // create store
-     * StoreBuilder> keyValueStoreBuilder =
-     *         Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("myValueTransformState"),
-     *                 Serdes.String(),
-     *                 Serdes.String());
-     * // add store
-     * builder.addStateStore(keyValueStoreBuilder);
-     *
-     * KStream outputStream = inputStream.transformValues(new ValueTransformerSupplier() {
-     *     public ValueTransformer get() {
-     *         return new MyValueTransformer();
-     *     }
-     * }, "myValueTransformState");
-     * }
- * The second strategy is for the given {@link ValueTransformerSupplier} to implement {@link ConnectedStoreProvider#stores()}, - * which provides the {@link StoreBuilder}s to be automatically added to the topology and connected to the transformer. - *
{@code
-     * class MyValueTransformerSupplier implements ValueTransformerSupplier {
-     *     // supply transformer
-     *     ValueTransformer get() {
-     *         return new MyValueTransformer();
-     *     }
-     *
-     *     // provide store(s) that will be added and connected to the associated transformer
-     *     // the store name from the builder ("myValueTransformState") is used to access the store later via the ProcessorContext
-     *     Set stores() {
-     *         StoreBuilder> keyValueStoreBuilder =
-     *                   Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("myValueTransformState"),
-     *                   Serdes.String(),
-     *                   Serdes.String());
-     *         return Collections.singleton(keyValueStoreBuilder);
-     *     }
-     * }
-     *
-     * ...
-     *
-     * KStream outputStream = inputStream.transformValues(new MyValueTransformerSupplier());
-     * }
- *

- * With either strategy, within the {@link ValueTransformer}, the state is obtained via the {@link ProcessorContext}. - * To trigger periodic actions via {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long) punctuate()}, - * a schedule must be registered. - * The {@link ValueTransformer} must return the new value in {@link ValueTransformer#transform(Object) transform()}. - * No additional {@link KeyValue} pairs can be emitted via - * pairs can be emitted via - * {@link org.apache.kafka.streams.processor.ProcessorContext#forward(Object, Object) ProcessorContext.forward()}. - * A {@link org.apache.kafka.streams.errors.StreamsException} is thrown if the {@link ValueTransformer} tries to - * emit a {@link KeyValue} pair. - *

{@code
-     * class MyValueTransformer implements ValueTransformer {
-     *     private StateStore state;
-     *
-     *     void init(ProcessorContext context) {
-     *         this.state = context.getStateStore("myValueTransformState");
-     *         // punctuate each second, can access this.state
-     *         context.schedule(Duration.ofSeconds(1), PunctuationType.WALL_CLOCK_TIME, new Punctuator(..));
-     *     }
-     *
-     *     NewValueType transform(V value) {
-     *         // can access this.state
-     *         return new NewValueType(); // or null
-     *     }
-     *
-     *     void close() {
-     *         // can access this.state
-     *     }
-     * }
-     * }
- * Even if any upstream operation was key-changing, no auto-repartition is triggered. - * If repartitioning is required, a call to {@link #repartition()} should be performed before - * {@code transformValues()}. - *

- * Setting a new value preserves data co-location with respect to the key. - * Thus, no internal data redistribution is required if a key based operator (like an aggregation or join) - * is applied to the result {@code KStream}. - * - * @param valueTransformerSupplier an instance of {@link ValueTransformerSupplier} that generates a newly constructed {@link ValueTransformer} - * The supplier should always generate a new instance. Creating a single {@link ValueTransformer} object - * and returning the same object reference in {@link ValueTransformer} is a - * violation of the supplier pattern and leads to runtime exceptions. - * @param named a {@link Named} config used to name the processor in the topology - * @param stateStoreNames the names of the state stores used by the processor; not required if the supplier - * implements {@link ConnectedStoreProvider#stores()} - * @param the value type of the result stream - * @return a {@code KStream} that contains records with unmodified key and new values (possibly of different type) - * @see #mapValues(ValueMapper) - * @see #mapValues(ValueMapperWithKey) - * @deprecated Since 3.3. Use {@link KStream#processValues(FixedKeyProcessorSupplier, Named, String...)} instead. - */ - @Deprecated - KStream transformValues(final ValueTransformerSupplier valueTransformerSupplier, - final Named named, - final String... stateStoreNames); - - /** - * Transform the value of each input record into a new value (with possibly a new type) of the output record. - * A {@link ValueTransformerWithKey} (provided by the given {@link ValueTransformerWithKeySupplier}) is applied to - * each input record value and computes a new value for it. - * Thus, an input record {@code } can be transformed into an output record {@code }. - * Attaching a state store makes this a stateful record-by-record operation (cf. {@link #mapValues(ValueMapperWithKey) mapValues()}). - * If you choose not to attach one, this operation is similar to the stateless {@link #mapValues(ValueMapperWithKey) mapValues()} - * but allows access to the {@code ProcessorContext} and record metadata. - * This is essentially mixing the Processor API into the DSL, and provides all the functionality of the PAPI. - * Furthermore, via {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long)} the processing progress - * can be observed and additional periodic actions can be performed. - *

- * In order for the transformer to use state stores, the stores must be added to the topology and connected to the - * transformer using at least one of two strategies (though it's not required to connect global state stores; read-only - * access to global state stores is available by default). - *

- * The first strategy is to manually add the {@link StoreBuilder}s via {@link Topology#addStateStore(StoreBuilder, String...)}, - * and specify the store names via {@code stateStoreNames} so they will be connected to the transformer. - *

{@code
-     * // create store
-     * StoreBuilder> keyValueStoreBuilder =
-     *         Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("myValueTransformState"),
-     *                 Serdes.String(),
-     *                 Serdes.String());
-     * // add store
-     * builder.addStateStore(keyValueStoreBuilder);
-     *
-     * KStream outputStream = inputStream.transformValues(new ValueTransformerWithKeySupplier() {
-     *     public ValueTransformer get() {
-     *         return new MyValueTransformer();
-     *     }
-     * }, "myValueTransformState");
-     * }
- * The second strategy is for the given {@link ValueTransformerWithKeySupplier} to implement {@link ConnectedStoreProvider#stores()}, - * which provides the {@link StoreBuilder}s to be automatically added to the topology and connected to the transformer. - *
{@code
-     * class MyValueTransformerWithKeySupplier implements ValueTransformerWithKeySupplier {
-     *     // supply transformer
-     *     ValueTransformerWithKey get() {
-     *         return new MyValueTransformerWithKey();
-     *     }
-     *
-     *     // provide store(s) that will be added and connected to the associated transformer
-     *     // the store name from the builder ("myValueTransformState") is used to access the store later via the ProcessorContext
-     *     Set stores() {
-     *         StoreBuilder> keyValueStoreBuilder =
-     *                   Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("myValueTransformState"),
-     *                   Serdes.String(),
-     *                   Serdes.String());
-     *         return Collections.singleton(keyValueStoreBuilder);
-     *     }
-     * }
-     *
-     * ...
-     *
-     * KStream outputStream = inputStream.transformValues(new MyValueTransformerWithKeySupplier());
-     * }
- *

- * With either strategy, within the {@link ValueTransformerWithKey}, the state is obtained via the {@link ProcessorContext}. - * To trigger periodic actions via {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long) punctuate()}, - * a schedule must be registered. - * The {@link ValueTransformerWithKey} must return the new value in - * {@link ValueTransformerWithKey#transform(Object, Object) transform()}. - * No additional {@link KeyValue} pairs can be emitted via - * {@link org.apache.kafka.streams.processor.ProcessorContext#forward(Object, Object) ProcessorContext.forward()}. - * A {@link org.apache.kafka.streams.errors.StreamsException} is thrown if the {@link ValueTransformerWithKey} tries - * to emit a {@link KeyValue} pair. - *

{@code
-     * class MyValueTransformerWithKey implements ValueTransformerWithKey {
-     *     private StateStore state;
-     *
-     *     void init(ProcessorContext context) {
-     *         this.state = context.getStateStore("myValueTransformState");
-     *         // punctuate each second, can access this.state
-     *         context.schedule(Duration.ofSeconds(1), PunctuationType.WALL_CLOCK_TIME, new Punctuator(..));
-     *     }
-     *
-     *     NewValueType transform(K readOnlyKey, V value) {
-     *         // can access this.state and use read-only key
-     *         return new NewValueType(readOnlyKey); // or null
-     *     }
-     *
-     *     void close() {
-     *         // can access this.state
-     *     }
-     * }
-     * }
- * Even if any upstream operation was key-changing, no auto-repartition is triggered. - * If repartitioning is required, a call to {@link #repartition()} should be performed before - * {@code transformValues()}. - *

- * Note that the key is read-only and should not be modified, as this can lead to corrupt partitioning. - * So, setting a new value preserves data co-location with respect to the key. - * Thus, no internal data redistribution is required if a key based operator (like an aggregation or join) - * is applied to the result {@code KStream}. - * - * @param valueTransformerSupplier an instance of {@link ValueTransformerWithKeySupplier} that generates a newly constructed {@link ValueTransformerWithKey} - * The supplier should always generate a new instance. Creating a single {@link ValueTransformerWithKey} object - * and returning the same object reference in {@link ValueTransformerWithKey} is a - * violation of the supplier pattern and leads to runtime exceptions. - * @param stateStoreNames the names of the state stores used by the processor; not required if the supplier - * implements {@link ConnectedStoreProvider#stores()} - * @param the value type of the result stream - * @return a {@code KStream} that contains records with unmodified key and new values (possibly of different type) - * @see #mapValues(ValueMapper) - * @see #mapValues(ValueMapperWithKey) - * @deprecated Since 3.3. Use {@link KStream#processValues(FixedKeyProcessorSupplier, String...)} instead. - */ - @Deprecated - KStream transformValues(final ValueTransformerWithKeySupplier valueTransformerSupplier, - final String... stateStoreNames); - - /** - * Transform the value of each input record into a new value (with possibly a new type) of the output record. - * A {@link ValueTransformerWithKey} (provided by the given {@link ValueTransformerWithKeySupplier}) is applied to - * each input record value and computes a new value for it. - * Thus, an input record {@code } can be transformed into an output record {@code }. - * Attaching a state store makes this a stateful record-by-record operation (cf. {@link #mapValues(ValueMapperWithKey) mapValues()}). - * If you choose not to attach one, this operation is similar to the stateless {@link #mapValues(ValueMapperWithKey) mapValues()} - * but allows access to the {@code ProcessorContext} and record metadata. - * This is essentially mixing the Processor API into the DSL, and provides all the functionality of the PAPI. - * Furthermore, via {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long)} the processing progress - * can be observed and additional periodic actions can be performed. - *

- * In order for the transformer to use state stores, the stores must be added to the topology and connected to the - * transformer using at least one of two strategies (though it's not required to connect global state stores; read-only - * access to global state stores is available by default). - *

- * The first strategy is to manually add the {@link StoreBuilder}s via {@link Topology#addStateStore(StoreBuilder, String...)}, - * and specify the store names via {@code stateStoreNames} so they will be connected to the transformer. - *

{@code
-     * // create store
-     * StoreBuilder> keyValueStoreBuilder =
-     *         Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("myValueTransformState"),
-     *                 Serdes.String(),
-     *                 Serdes.String());
-     * // add store
-     * builder.addStateStore(keyValueStoreBuilder);
-     *
-     * KStream outputStream = inputStream.transformValues(new ValueTransformerWithKeySupplier() {
-     *     public ValueTransformerWithKey get() {
-     *         return new MyValueTransformerWithKey();
-     *     }
-     * }, "myValueTransformState");
-     * }
- * The second strategy is for the given {@link ValueTransformerWithKeySupplier} to implement {@link ConnectedStoreProvider#stores()}, - * which provides the {@link StoreBuilder}s to be automatically added to the topology and connected to the transformer. - *
{@code
-     * class MyValueTransformerWithKeySupplier implements ValueTransformerWithKeySupplier {
-     *     // supply transformer
-     *     ValueTransformerWithKey get() {
-     *         return new MyValueTransformerWithKey();
-     *     }
-     *
-     *     // provide store(s) that will be added and connected to the associated transformer
-     *     // the store name from the builder ("myValueTransformState") is used to access the store later via the ProcessorContext
-     *     Set stores() {
-     *         StoreBuilder> keyValueStoreBuilder =
-     *                   Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("myValueTransformState"),
-     *                   Serdes.String(),
-     *                   Serdes.String());
-     *         return Collections.singleton(keyValueStoreBuilder);
-     *     }
-     * }
-     *
-     * ...
-     *
-     * KStream outputStream = inputStream.transformValues(new MyValueTransformerWithKeySupplier());
-     * }
- *

- * With either strategy, within the {@link ValueTransformerWithKey}, the state is obtained via the {@link ProcessorContext}. - * To trigger periodic actions via {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long) punctuate()}, - * a schedule must be registered. - * The {@link ValueTransformerWithKey} must return the new value in - * {@link ValueTransformerWithKey#transform(Object, Object) transform()}. - * No additional {@link KeyValue} pairs can be emitted via - * {@link org.apache.kafka.streams.processor.ProcessorContext#forward(Object, Object) ProcessorContext.forward()}. - * A {@link org.apache.kafka.streams.errors.StreamsException} is thrown if the {@link ValueTransformerWithKey} tries - * to emit a {@link KeyValue} pair. - *

{@code
-     * class MyValueTransformerWithKey implements ValueTransformerWithKey {
-     *     private StateStore state;
-     *
-     *     void init(ProcessorContext context) {
-     *         this.state = context.getStateStore("myValueTransformState");
-     *         // punctuate each second, can access this.state
-     *         context.schedule(Duration.ofSeconds(1), PunctuationType.WALL_CLOCK_TIME, new Punctuator(..));
-     *     }
-     *
-     *     NewValueType transform(K readOnlyKey, V value) {
-     *         // can access this.state and use read-only key
-     *         return new NewValueType(readOnlyKey); // or null
-     *     }
-     *
-     *     void close() {
-     *         // can access this.state
-     *     }
-     * }
-     * }
- * Even if any upstream operation was key-changing, no auto-repartition is triggered. - * If repartitioning is required, a call to {@link #repartition()} should be performed before - * {@code transformValues()}. - *

- * Note that the key is read-only and should not be modified, as this can lead to corrupt partitioning. - * So, setting a new value preserves data co-location with respect to the key. - * Thus, no internal data redistribution is required if a key based operator (like an aggregation or join) - * is applied to the result {@code KStream}. - * - * @param valueTransformerSupplier an instance of {@link ValueTransformerWithKeySupplier} that generates a newly constructed {@link ValueTransformerWithKey} - * The supplier should always generate a new instance. Creating a single {@link ValueTransformerWithKey} object - * and returning the same object reference in {@link ValueTransformerWithKey} is a - * violation of the supplier pattern and leads to runtime exceptions. - * @param named a {@link Named} config used to name the processor in the topology - * @param stateStoreNames the names of the state stores used by the processor; not required if the supplier - * implements {@link ConnectedStoreProvider#stores()} - * @param the value type of the result stream - * @return a {@code KStream} that contains records with unmodified key and new values (possibly of different type) - * @see #mapValues(ValueMapper) - * @see #mapValues(ValueMapperWithKey) - * @deprecated Since 3.3. Use {@link KStream#processValues(FixedKeyProcessorSupplier, Named, String...)} instead. - */ - @Deprecated - KStream transformValues(final ValueTransformerWithKeySupplier valueTransformerSupplier, - final Named named, - final String... stateStoreNames); /** * Transform the value of each input record into zero or more new values (with possibly a new * type) and emit for each new value a record with the same key of the input record and the value. diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/TransformerSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/TransformerSupplier.java index 228b1d712312..222cdc1bbc27 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/TransformerSupplier.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/TransformerSupplier.java @@ -35,7 +35,6 @@ * @see Transformer * @see ValueTransformer * @see ValueTransformerSupplier - * @see KStream#transformValues(ValueTransformerSupplier, String...) * @deprecated Since 4.0. Use {@link org.apache.kafka.streams.processor.api.ProcessorSupplier api.ProcessorSupplier} instead. */ @Deprecated diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformer.java b/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformer.java index 6d4e4fe8f1cd..ae1d21334ca4 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformer.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformer.java @@ -44,8 +44,7 @@ * @param transformed value type * @see ValueTransformerSupplier * @see ValueTransformerWithKeySupplier - * @see KStream#transformValues(ValueTransformerSupplier, String...) - * @see KStream#transformValues(ValueTransformerWithKeySupplier, String...) + * @see KTable#transformValues(ValueTransformerWithKeySupplier, Materialized, String...) * @see Transformer * @deprecated Since 4.0. Use {@link FixedKeyProcessor} instead. */ @@ -77,7 +76,7 @@ public interface ValueTransformer { /** * Transform the given value to a new value. - * Additionally, any {@link StateStore} that is {@link KStream#transformValues(ValueTransformerSupplier, String...) + * Additionally, any {@link StateStore} that is {@link KTable#transformValues(ValueTransformerWithKeySupplier, String...) * attached} to this operator can be accessed and modified arbitrarily (cf. * {@link ProcessorContext#getStateStore(String)}). *

diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformerSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformerSupplier.java index b0008744eac3..6a4c25b0c1c9 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformerSupplier.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformerSupplier.java @@ -31,8 +31,6 @@ * @see ValueTransformer * @see ValueTransformerWithKey * @see ValueTransformerWithKeySupplier - * @see KStream#transformValues(ValueTransformerSupplier, String...) - * @see KStream#transformValues(ValueTransformerWithKeySupplier, String...) * @see Transformer * @see TransformerSupplier * @deprecated Since 4.0. Use {@link FixedKeyProcessorSupplier} instead. diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformerWithKey.java b/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformerWithKey.java index 9c3552622adc..cc0c38d01ef0 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformerWithKey.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformerWithKey.java @@ -47,8 +47,7 @@ * @param transformed value type * @see ValueTransformer * @see ValueTransformerWithKeySupplier - * @see KStream#transformValues(ValueTransformerSupplier, String...) - * @see KStream#transformValues(ValueTransformerWithKeySupplier, String...) + * @see KTable#transformValues(ValueTransformerWithKeySupplier, String...) * @see Transformer */ @@ -77,7 +76,7 @@ public interface ValueTransformerWithKey { /** * Transform the given [key and] value to a new value. - * Additionally, any {@link StateStore} that is {@link KStream#transformValues(ValueTransformerWithKeySupplier, String...) + * Additionally, any {@link StateStore} that is {@link KTable#transformValues(ValueTransformerWithKeySupplier, Named, String...) * attached} to this operator can be accessed and modified arbitrarily (cf. * {@link ProcessorContext#getStateStore(String)}). *

diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformerWithKeySupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformerWithKeySupplier.java index 1c0feb0015e0..8b1e995f1c37 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformerWithKeySupplier.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformerWithKeySupplier.java @@ -32,8 +32,6 @@ * @param transformed value type * @see ValueTransformer * @see ValueTransformerWithKey - * @see KStream#transformValues(ValueTransformerSupplier, String...) - * @see KStream#transformValues(ValueTransformerWithKeySupplier, String...) * @see Transformer * @see TransformerSupplier */ diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java index a23c5ad4b0be..ec2fd211efb3 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java @@ -121,8 +121,6 @@ public class KStreamImpl extends AbstractStream implements KStream KStream doStreamTableJoin(final KTable table, builder); } - @Override - @Deprecated - public KStream transformValues(final org.apache.kafka.streams.kstream.ValueTransformerSupplier valueTransformerSupplier, - final String... stateStoreNames) { - Objects.requireNonNull(valueTransformerSupplier, "valueTransformerSupplier can't be null"); - return doTransformValues( - toValueTransformerWithKeySupplier(valueTransformerSupplier), - NamedInternal.empty(), - stateStoreNames); - } - - @Override - @Deprecated - public KStream transformValues(final org.apache.kafka.streams.kstream.ValueTransformerSupplier valueTransformerSupplier, - final Named named, - final String... stateStoreNames) { - Objects.requireNonNull(valueTransformerSupplier, "valueTransformerSupplier can't be null"); - Objects.requireNonNull(named, "named can't be null"); - return doTransformValues( - toValueTransformerWithKeySupplier(valueTransformerSupplier), - new NamedInternal(named), - stateStoreNames); - } - - @Override - @Deprecated - public KStream transformValues(final ValueTransformerWithKeySupplier valueTransformerSupplier, - final String... stateStoreNames) { - Objects.requireNonNull(valueTransformerSupplier, "valueTransformerSupplier can't be null"); - return doTransformValues(valueTransformerSupplier, NamedInternal.empty(), stateStoreNames); - } - - @Override - @Deprecated - public KStream transformValues(final ValueTransformerWithKeySupplier valueTransformerSupplier, - final Named named, - final String... stateStoreNames) { - Objects.requireNonNull(valueTransformerSupplier, "valueTransformerSupplier can't be null"); - Objects.requireNonNull(named, "named can't be null"); - return doTransformValues(valueTransformerSupplier, new NamedInternal(named), stateStoreNames); - } - - private KStream doTransformValues(final ValueTransformerWithKeySupplier valueTransformerWithKeySupplier, - final NamedInternal named, - final String... stateStoreNames) { - Objects.requireNonNull(stateStoreNames, "stateStoreNames can't be a null array"); - for (final String stateStoreName : stateStoreNames) { - Objects.requireNonNull(stateStoreName, "stateStoreNames can't contain `null` as store name"); - } - ApiUtils.checkSupplier(valueTransformerWithKeySupplier); - - final String name = named.orElseGenerateWithPrefix(builder, TRANSFORMVALUES_NAME); - final StatefulProcessorNode transformNode = new StatefulProcessorNode<>( - name, - new ProcessorParameters<>(new KStreamTransformValues<>(valueTransformerWithKeySupplier), name), - stateStoreNames); - transformNode.setValueChangingOperation(true); - - builder.addGraphNode(graphNode, transformNode); - - // cannot inherit value serde - return new KStreamImpl<>( - name, - keySerde, - null, - subTopologySourceNodes, - repartitionRequired, - transformNode, - builder); - } - @Override @Deprecated public KStream flatTransformValues(final org.apache.kafka.streams.kstream.ValueTransformerSupplier> valueTransformerSupplier, diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValues.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValues.java deleted file mode 100644 index 1b767ef39690..000000000000 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValues.java +++ /dev/null @@ -1,74 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.kafka.streams.kstream.internals; - -import org.apache.kafka.streams.kstream.ValueTransformerWithKey; -import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier; -import org.apache.kafka.streams.processor.api.ContextualProcessor; -import org.apache.kafka.streams.processor.api.Processor; -import org.apache.kafka.streams.processor.api.ProcessorContext; -import org.apache.kafka.streams.processor.api.ProcessorSupplier; -import org.apache.kafka.streams.processor.api.Record; -import org.apache.kafka.streams.processor.internals.ForwardingDisabledProcessorContext; -import org.apache.kafka.streams.processor.internals.InternalProcessorContext; -import org.apache.kafka.streams.state.StoreBuilder; - -import java.util.Set; - -public class KStreamTransformValues implements ProcessorSupplier { - - private final ValueTransformerWithKeySupplier valueTransformerSupplier; - - KStreamTransformValues(final ValueTransformerWithKeySupplier valueTransformerSupplier) { - this.valueTransformerSupplier = valueTransformerSupplier; - } - - @Override - public Processor get() { - return new KStreamTransformValuesProcessor<>(valueTransformerSupplier.get()); - } - - @Override - public Set> stores() { - return valueTransformerSupplier.stores(); - } - - public static class KStreamTransformValuesProcessor extends ContextualProcessor { - - private final ValueTransformerWithKey valueTransformer; - - KStreamTransformValuesProcessor(final ValueTransformerWithKey valueTransformer) { - this.valueTransformer = valueTransformer; - } - - @Override - public void init(final ProcessorContext context) { - super.init(context); - valueTransformer.init(new ForwardingDisabledProcessorContext((InternalProcessorContext) context)); - } - - @Override - public void process(final Record record) { - context().forward(record.withValue(valueTransformer.transform(record.key(), record.value()))); - } - - @Override - public void close() { - valueTransformer.close(); - } - } -} diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/ConnectedStoreProvider.java b/streams/src/main/java/org/apache/kafka/streams/processor/ConnectedStoreProvider.java index 91824d5a5b81..108a7d7233bf 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/ConnectedStoreProvider.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/ConnectedStoreProvider.java @@ -20,6 +20,7 @@ import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.Named; import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier; +import org.apache.kafka.streams.processor.api.FixedKeyProcessorSupplier; import org.apache.kafka.streams.state.StoreBuilder; import java.util.Set; @@ -91,10 +92,7 @@ * @see Topology#addProcessor(String, org.apache.kafka.streams.processor.api.ProcessorSupplier, String...) * @see KStream#process(org.apache.kafka.streams.processor.api.ProcessorSupplier, String...) * @see KStream#process(org.apache.kafka.streams.processor.api.ProcessorSupplier, Named, String...) - * @see KStream#transformValues(org.apache.kafka.streams.kstream.ValueTransformerSupplier, String...) - * @see KStream#transformValues(org.apache.kafka.streams.kstream.ValueTransformerSupplier, Named, String...) - * @see KStream#transformValues(ValueTransformerWithKeySupplier, String...) - * @see KStream#transformValues(ValueTransformerWithKeySupplier, Named, String...) + * @see KStream#processValues(FixedKeyProcessorSupplier, String...) * @see KStream#flatTransformValues(org.apache.kafka.streams.kstream.ValueTransformerSupplier, String...) * @see KStream#flatTransformValues(org.apache.kafka.streams.kstream.ValueTransformerSupplier, Named, String...) * @see KStream#flatTransformValues(ValueTransformerWithKeySupplier, String...) diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java index aa646e873cf3..6f79aee9dbe2 100644 --- a/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java @@ -845,24 +845,6 @@ public void shouldUseSpecifiedNameForForEachOperation() { assertNamesForOperation(topology, "KSTREAM-SOURCE-0000000000", STREAM_OPERATION_NAME); } - @Test - @SuppressWarnings("deprecation") - public void shouldUseSpecifiedNameForTransformValues() { - builder.stream(STREAM_TOPIC).transformValues(() -> new NoopValueTransformer<>(), Named.as(STREAM_OPERATION_NAME)); - builder.build(); - final ProcessorTopology topology = builder.internalTopologyBuilder.rewriteTopology(new StreamsConfig(props)).buildTopology(); - assertNamesForOperation(topology, "KSTREAM-SOURCE-0000000000", STREAM_OPERATION_NAME); - } - - @Test - @SuppressWarnings("deprecation") - public void shouldUseSpecifiedNameForTransformValuesWithKey() { - builder.stream(STREAM_TOPIC).transformValues(() -> new NoopValueTransformerWithKey<>(), Named.as(STREAM_OPERATION_NAME)); - builder.build(); - final ProcessorTopology topology = builder.internalTopologyBuilder.rewriteTopology(new StreamsConfig(props)).buildTopology(); - assertNamesForOperation(topology, "KSTREAM-SOURCE-0000000000", STREAM_OPERATION_NAME); - } - @Test public void shouldUseSpecifiedNameForSplitOperation() { builder.stream(STREAM_TOPIC) diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java index 226fc357a6a3..b78696f259ad 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java @@ -114,33 +114,6 @@ public class KStreamImplTest { private final MockApiProcessorSupplier processorSupplier = new MockApiProcessorSupplier<>(); private final MockApiFixedKeyProcessorSupplier fixedKeyProcessorSupplier = new MockApiFixedKeyProcessorSupplier<>(); @SuppressWarnings("deprecation") - private final org.apache.kafka.streams.kstream.ValueTransformerSupplier valueTransformerSupplier = - () -> new org.apache.kafka.streams.kstream.ValueTransformer() { - @Override - public void init(final ProcessorContext context) {} - - @Override - public String transform(final String value) { - return value; - } - - @Override - public void close() {} - }; - private final ValueTransformerWithKeySupplier valueTransformerWithKeySupplier = - () -> new ValueTransformerWithKey() { - @Override - public void init(final ProcessorContext context) {} - - @Override - public String transform(final String key, final String value) { - return value; - } - - @Override - public void close() {} - }; - @SuppressWarnings("deprecation") private final org.apache.kafka.streams.kstream.ValueTransformerSupplier> flatValueTransformerSupplier = () -> new org.apache.kafka.streams.kstream.ValueTransformer>() { @Override @@ -1213,9 +1186,6 @@ public void shouldPreserveSerdesForOperators() { assertEquals(((AbstractStream) stream1.flatMapValues(flatMapper)).keySerde(), consumedInternal.keySerde()); assertNull(((AbstractStream) stream1.flatMapValues(flatMapper)).valueSerde()); - assertEquals(((AbstractStream) stream1.transformValues(valueTransformerSupplier)).keySerde(), consumedInternal.keySerde()); - assertNull(((AbstractStream) stream1.transformValues(valueTransformerSupplier)).valueSerde()); - assertNull(((AbstractStream) stream1.merge(stream1)).keySerde()); assertNull(((AbstractStream) stream1.merge(stream1)).valueSerde()); @@ -1589,7 +1559,7 @@ public void shouldNotAllowBadProcessSupplierOnProcessWithNamed() { processorSupplier.get(); final IllegalArgumentException exception = assertThrows( IllegalArgumentException.class, - () -> testStream.process(() -> processor, Named.as("flatTransformer")) + () -> testStream.process(() -> processor, Named.as("processor")) ); assertThat(exception.getMessage(), containsString("#get() must return a new object each time it is called.")); } @@ -1606,272 +1576,49 @@ public void shouldNotAllowBadProcessSupplierOnProcessWithNamedAndStores() { } @Test - @SuppressWarnings("deprecation") - public void shouldNotAllowBadTransformerSupplierOnTransformValues() { - final org.apache.kafka.streams.kstream.ValueTransformer transformer = valueTransformerSupplier.get(); + public void shouldNotAllowBadProcessSupplierOnProcessValues() { + final org.apache.kafka.streams.processor.api.FixedKeyProcessor processor = + fixedKeyProcessorSupplier.get(); final IllegalArgumentException exception = assertThrows( - IllegalArgumentException.class, - () -> testStream.transformValues(() -> transformer) + IllegalArgumentException.class, + () -> testStream.processValues(() -> processor) ); assertThat(exception.getMessage(), containsString("#get() must return a new object each time it is called.")); } @Test - @SuppressWarnings("deprecation") - public void shouldNotAllowBadTransformerSupplierOnTransformValuesWithNamed() { - final org.apache.kafka.streams.kstream.ValueTransformer transformer = valueTransformerSupplier.get(); + public void shouldNotAllowBadProcessSupplierOnProcessValuesWithStores() { + final org.apache.kafka.streams.processor.api.FixedKeyProcessor processor = + fixedKeyProcessorSupplier.get(); final IllegalArgumentException exception = assertThrows( - IllegalArgumentException.class, - () -> testStream.transformValues(() -> transformer, Named.as("transformer")) + IllegalArgumentException.class, + () -> testStream.processValues(() -> processor, "storeName") ); assertThat(exception.getMessage(), containsString("#get() must return a new object each time it is called.")); } @Test - @SuppressWarnings("deprecation") - public void shouldNotAllowNullValueTransformerSupplierOnTransformValues() { - final NullPointerException exception = assertThrows( - NullPointerException.class, - () -> testStream.transformValues((org.apache.kafka.streams.kstream.ValueTransformerSupplier) null)); - assertThat(exception.getMessage(), equalTo("valueTransformerSupplier can't be null")); - } - - @Test - @SuppressWarnings("deprecation") - public void shouldNotAllowBadValueTransformerWithKeySupplierOnTransformValues() { - final ValueTransformerWithKey transformer = valueTransformerWithKeySupplier.get(); + public void shouldNotAllowBadProcessSupplierOnProcessValuesWithNamed() { + final org.apache.kafka.streams.processor.api.FixedKeyProcessor processor = + fixedKeyProcessorSupplier.get(); final IllegalArgumentException exception = assertThrows( - IllegalArgumentException.class, - () -> testStream.transformValues(() -> transformer) + IllegalArgumentException.class, + () -> testStream.processValues(() -> processor, Named.as("processor")) ); assertThat(exception.getMessage(), containsString("#get() must return a new object each time it is called.")); } @Test - @SuppressWarnings("deprecation") - public void shouldNotAllowBadValueTransformerWithKeySupplierOnTransformValuesWithNamed() { - final ValueTransformerWithKey transformer = valueTransformerWithKeySupplier.get(); + public void shouldNotAllowBadProcessSupplierOnProcessValuesWithNamedAndStores() { + final org.apache.kafka.streams.processor.api.FixedKeyProcessor processor = + fixedKeyProcessorSupplier.get(); final IllegalArgumentException exception = assertThrows( - IllegalArgumentException.class, - () -> testStream.transformValues(() -> transformer, Named.as("transformer")) + IllegalArgumentException.class, + () -> testStream.processValues(() -> processor, Named.as("processor"), "storeName") ); assertThat(exception.getMessage(), containsString("#get() must return a new object each time it is called.")); } - @Test - @SuppressWarnings("deprecation") - public void shouldNotAllowNullValueTransformerWithKeySupplierOnTransformValues() { - final NullPointerException exception = assertThrows( - NullPointerException.class, - () -> testStream.transformValues((ValueTransformerWithKeySupplier) null)); - assertThat(exception.getMessage(), equalTo("valueTransformerSupplier can't be null")); - } - - @Test - @SuppressWarnings("deprecation") - public void shouldNotAllowNullValueTransformerSupplierOnTransformValuesWithStores() { - final NullPointerException exception = assertThrows( - NullPointerException.class, - () -> testStream.transformValues( - (org.apache.kafka.streams.kstream.ValueTransformerSupplier) null, - "storeName")); - assertThat(exception.getMessage(), equalTo("valueTransformerSupplier can't be null")); - } - - @Test - @SuppressWarnings("deprecation") - public void shouldNotAllowNullValueTransformerWithKeySupplierOnTransformValuesWithStores() { - final NullPointerException exception = assertThrows( - NullPointerException.class, - () -> testStream.transformValues( - (ValueTransformerWithKeySupplier) null, - "storeName")); - assertThat(exception.getMessage(), equalTo("valueTransformerSupplier can't be null")); - } - - @Test - @SuppressWarnings("deprecation") - public void shouldNotAllowNullValueTransformerSupplierOnTransformValuesWithNamed() { - final NullPointerException exception = assertThrows( - NullPointerException.class, - () -> testStream.transformValues( - (org.apache.kafka.streams.kstream.ValueTransformerSupplier) null, - Named.as("valueTransformer"))); - assertThat(exception.getMessage(), equalTo("valueTransformerSupplier can't be null")); - } - - @Test - @SuppressWarnings("deprecation") - public void shouldNotAllowNullValueTransformerWithKeySupplierOnTransformValuesWithNamed() { - final NullPointerException exception = assertThrows( - NullPointerException.class, - () -> testStream.transformValues( - (ValueTransformerWithKeySupplier) null, - Named.as("valueTransformerWithKey"))); - assertThat(exception.getMessage(), equalTo("valueTransformerSupplier can't be null")); - } - - @Test - @SuppressWarnings("deprecation") - public void shouldNotAllowNullValueTransformerSupplierOnTransformValuesWithNamedAndStores() { - final NullPointerException exception = assertThrows( - NullPointerException.class, - () -> testStream.transformValues( - (org.apache.kafka.streams.kstream.ValueTransformerSupplier) null, - Named.as("valueTransformer"), - "storeName")); - assertThat(exception.getMessage(), equalTo("valueTransformerSupplier can't be null")); - } - - @Test - @SuppressWarnings("deprecation") - public void shouldNotAllowNullValueTransformerWithKeySupplierOnTransformValuesWithNamedAndStores() { - final NullPointerException exception = assertThrows( - NullPointerException.class, - () -> testStream.transformValues( - (ValueTransformerWithKeySupplier) null, - Named.as("valueTransformerWithKey"), - "storeName")); - assertThat(exception.getMessage(), equalTo("valueTransformerSupplier can't be null")); - } - - @Test - @SuppressWarnings("deprecation") - public void shouldNotAllowNullStoreNamesOnTransformValuesWithValueTransformerSupplier() { - final NullPointerException exception = assertThrows( - NullPointerException.class, - () -> testStream.transformValues( - valueTransformerSupplier, - (String[]) null)); - assertThat(exception.getMessage(), equalTo("stateStoreNames can't be a null array")); - } - - @Test - @SuppressWarnings("deprecation") - public void shouldNotAllowNullStoreNamesOnTransformValuesWithValueTransformerWithKeySupplier() { - final NullPointerException exception = assertThrows( - NullPointerException.class, - () -> testStream.transformValues( - valueTransformerWithKeySupplier, - (String[]) null)); - assertThat(exception.getMessage(), equalTo("stateStoreNames can't be a null array")); - } - - @Test - @SuppressWarnings("deprecation") - public void shouldNotAllowNullStoreNameOnTransformValuesWithValueTransformerSupplier() { - final NullPointerException exception = assertThrows( - NullPointerException.class, - () -> testStream.transformValues( - valueTransformerSupplier, (String) null)); - assertThat(exception.getMessage(), equalTo("stateStoreNames can't contain `null` as store name")); - } - - @Test - @SuppressWarnings("deprecation") - public void shouldNotAllowNullStoreNameOnTransformValuesWithValueTransformerWithKeySupplier() { - final NullPointerException exception = assertThrows( - NullPointerException.class, - () -> testStream.transformValues( - valueTransformerWithKeySupplier, - (String) null)); - assertThat(exception.getMessage(), equalTo("stateStoreNames can't contain `null` as store name")); - } - - @Test - @SuppressWarnings("deprecation") - public void shouldNotAllowNullStoreNamesOnTransformValuesWithValueTransformerSupplierWithNamed() { - final NullPointerException exception = assertThrows( - NullPointerException.class, - () -> testStream.transformValues( - valueTransformerSupplier, - Named.as("valueTransformer"), - (String[]) null)); - assertThat(exception.getMessage(), equalTo("stateStoreNames can't be a null array")); - } - - @Test - @SuppressWarnings("deprecation") - public void shouldNotAllowNullStoreNamesOnTransformValuesWithValueTransformerWithKeySupplierWithNamed() { - final NullPointerException exception = assertThrows( - NullPointerException.class, - () -> testStream.transformValues( - valueTransformerWithKeySupplier, - Named.as("valueTransformer"), - (String[]) null)); - assertThat(exception.getMessage(), equalTo("stateStoreNames can't be a null array")); - } - - @Test - @SuppressWarnings("deprecation") - public void shouldNotAllowNullStoreNameOnTransformValuesWithValueTransformerSupplierWithNamed() { - final NullPointerException exception = assertThrows( - NullPointerException.class, - () -> testStream.transformValues( - valueTransformerSupplier, - Named.as("valueTransformer"), - (String) null)); - assertThat(exception.getMessage(), equalTo("stateStoreNames can't contain `null` as store name")); - } - - @Test - @SuppressWarnings("deprecation") - public void shouldNotAllowNullStoreNameOnTransformValuesWithValueTransformerWithKeySupplierWithName() { - final NullPointerException exception = assertThrows( - NullPointerException.class, - () -> testStream.transformValues( - valueTransformerWithKeySupplier, - Named.as("valueTransformerWithKey"), - (String) null)); - assertThat(exception.getMessage(), equalTo("stateStoreNames can't contain `null` as store name")); - } - - @Test - @SuppressWarnings("deprecation") - public void shouldNotAllowNullNamedOnTransformValuesWithValueTransformerSupplier() { - final NullPointerException exception = assertThrows( - NullPointerException.class, - () -> testStream.transformValues( - valueTransformerSupplier, - (Named) null)); - assertThat(exception.getMessage(), equalTo("named can't be null")); - } - - @Test - @SuppressWarnings("deprecation") - public void shouldNotAllowNullNamedOnTransformValuesWithValueTransformerWithKeySupplier() { - final NullPointerException exception = assertThrows( - NullPointerException.class, - () -> testStream.transformValues( - valueTransformerWithKeySupplier, - (Named) null)); - assertThat(exception.getMessage(), equalTo("named can't be null")); - } - - @Test - @SuppressWarnings("deprecation") - public void shouldNotAllowNullNamedOnTransformValuesWithValueTransformerSupplierAndStores() { - final NullPointerException exception = assertThrows( - NullPointerException.class, - () -> testStream.transformValues( - valueTransformerSupplier, - (Named) null, - "storeName")); - assertThat(exception.getMessage(), equalTo("named can't be null")); - } - - @Test - @SuppressWarnings("deprecation") - public void shouldNotAllowNullNamedOnTransformValuesWithValueTransformerWithKeySupplierAndStores() { - final NullPointerException exception = assertThrows( - NullPointerException.class, - () -> testStream.transformValues( - valueTransformerWithKeySupplier, - (Named) null, - "storeName")); - assertThat(exception.getMessage(), equalTo("named can't be null")); - } - @Test @SuppressWarnings("deprecation") public void shouldNotAllowNullValueTransformerSupplierOnFlatTransformValues() { diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValuesTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValuesTest.java deleted file mode 100644 index 5cb51ae7bebb..000000000000 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValuesTest.java +++ /dev/null @@ -1,154 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.kafka.streams.kstream.internals; - -import org.apache.kafka.common.serialization.IntegerSerializer; -import org.apache.kafka.common.serialization.Serdes; -import org.apache.kafka.streams.KeyValueTimestamp; -import org.apache.kafka.streams.StreamsBuilder; -import org.apache.kafka.streams.TestInputTopic; -import org.apache.kafka.streams.TopologyTestDriver; -import org.apache.kafka.streams.kstream.Consumed; -import org.apache.kafka.streams.kstream.KStream; -import org.apache.kafka.streams.kstream.ValueTransformerWithKey; -import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier; -import org.apache.kafka.streams.processor.api.Processor; -import org.apache.kafka.streams.processor.internals.ForwardingDisabledProcessorContext; -import org.apache.kafka.streams.processor.internals.InternalProcessorContext; -import org.apache.kafka.test.MockProcessorSupplier; -import org.apache.kafka.test.NoOpValueTransformerWithKeySupplier; -import org.apache.kafka.test.StreamsTestUtils; - -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.ExtendWith; -import org.mockito.junit.jupiter.MockitoExtension; -import org.mockito.junit.jupiter.MockitoSettings; -import org.mockito.quality.Strictness; - -import java.util.Properties; - -import static org.hamcrest.CoreMatchers.isA; -import static org.hamcrest.MatcherAssert.assertThat; -import static org.junit.jupiter.api.Assertions.assertArrayEquals; -import static org.mockito.Mockito.mock; - -@ExtendWith(MockitoExtension.class) -@MockitoSettings(strictness = Strictness.STRICT_STUBS) -public class KStreamTransformValuesTest { - private final String topicName = "topic"; - private final MockProcessorSupplier supplier = new MockProcessorSupplier<>(); - private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.Integer(), Serdes.Integer()); - private InternalProcessorContext context = mock(InternalProcessorContext.class); - - @SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated. - @Test - public void testTransform() { - final StreamsBuilder builder = new StreamsBuilder(); - - final org.apache.kafka.streams.kstream.ValueTransformerSupplier valueTransformerSupplier = - () -> new org.apache.kafka.streams.kstream.ValueTransformer() { - private int total = 0; - - @Override - public void init(final org.apache.kafka.streams.processor.ProcessorContext context) { } - - @Override - public Integer transform(final Number value) { - total += value.intValue(); - return total; - } - - @Override - public void close() { } - }; - - final int[] expectedKeys = {1, 10, 100, 1000}; - - final KStream stream; - stream = builder.stream(topicName, Consumed.with(Serdes.Integer(), Serdes.Integer())); - stream.transformValues(valueTransformerSupplier).process(supplier); - - try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) { - for (final int expectedKey : expectedKeys) { - final TestInputTopic inputTopic = - driver.createInputTopic(topicName, new IntegerSerializer(), new IntegerSerializer()); - inputTopic.pipeInput(expectedKey, expectedKey * 10, expectedKey / 2L); - } - } - final KeyValueTimestamp[] expected = {new KeyValueTimestamp<>(1, 10, 0), - new KeyValueTimestamp<>(10, 110, 5), - new KeyValueTimestamp<>(100, 1110, 50), - new KeyValueTimestamp<>(1000, 11110, 500)}; - - assertArrayEquals(expected, supplier.theCapturedProcessor().processed().toArray()); - } - - @SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated. - @Test - public void testTransformWithKey() { - final StreamsBuilder builder = new StreamsBuilder(); - - final ValueTransformerWithKeySupplier valueTransformerSupplier = - () -> new ValueTransformerWithKey() { - private int total = 0; - - @Override - public void init(final org.apache.kafka.streams.processor.ProcessorContext context) { } - - @Override - public Integer transform(final Integer readOnlyKey, final Number value) { - total += value.intValue() + readOnlyKey; - return total; - } - - @Override - public void close() { } - }; - - final int[] expectedKeys = {1, 10, 100, 1000}; - - final KStream stream; - stream = builder.stream(topicName, Consumed.with(Serdes.Integer(), Serdes.Integer())); - stream.transformValues(valueTransformerSupplier).process(supplier); - - try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) { - final TestInputTopic inputTopic = - driver.createInputTopic(topicName, new IntegerSerializer(), new IntegerSerializer()); - for (final int expectedKey : expectedKeys) { - inputTopic.pipeInput(expectedKey, expectedKey * 10, expectedKey / 2L); - } - } - final KeyValueTimestamp[] expected = {new KeyValueTimestamp<>(1, 11, 0), - new KeyValueTimestamp<>(10, 121, 5), - new KeyValueTimestamp<>(100, 1221, 50), - new KeyValueTimestamp<>(1000, 12221, 500)}; - - assertArrayEquals(expected, supplier.theCapturedProcessor().processed().toArray()); - } - - @SuppressWarnings("unchecked") - @Test - public void shouldInitializeTransformerWithForwardDisabledProcessorContext() { - final NoOpValueTransformerWithKeySupplier transformer = new NoOpValueTransformerWithKeySupplier<>(); - final KStreamTransformValues transformValues = new KStreamTransformValues<>(transformer); - final Processor processor = transformValues.get(); - - processor.init(context); - - assertThat(transformer.context, isA((Class) ForwardingDisabledProcessorContext.class)); - } -} diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala index 5e6cc4f3f220..80d43fc2315b 100644 --- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala +++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala @@ -584,94 +584,6 @@ class KStream[K, V](val inner: KStreamJ[K, V]) { ): KStream[K, VR] = new KStream(inner.flatTransformValues[VR](valueTransformerSupplier.asJava, named, stateStoreNames: _*)) - /** - * Transform the value of each input record into a new value (with possible new type) of the output record. - * A `ValueTransformer` (provided by the given `ValueTransformerSupplier`) is applied to each input - * record value and computes a new value for it. - * In order to assign a state, the state must be created and added via `addStateStore` before they can be connected - * to the `ValueTransformer`. - * It's not required to connect global state stores that are added via `addGlobalStore`; - * read-only access to global state stores is available by default. - * - * @param valueTransformerSupplier a instance of `ValueTransformerSupplier` that generates a `ValueTransformer` - * @param stateStoreNames the names of the state stores used by the processor - * @return a [[KStream]] that contains records with unmodified key and new values (possibly of different type) - * @see `org.apache.kafka.streams.kstream.KStream#transformValues` - */ - @deprecated(since = "3.3", message = "Use processValues(FixedKeyProcessorSupplier, String*) instead.") - def transformValues[VR]( - valueTransformerSupplier: ValueTransformerSupplier[V, VR], - stateStoreNames: String* - ): KStream[K, VR] = - new KStream(inner.transformValues[VR](valueTransformerSupplier, stateStoreNames: _*)) - - /** - * Transform the value of each input record into a new value (with possible new type) of the output record. - * A `ValueTransformer` (provided by the given `ValueTransformerSupplier`) is applied to each input - * record value and computes a new value for it. - * In order to assign a state, the state must be created and added via `addStateStore` before they can be connected - * to the `ValueTransformer`. - * It's not required to connect global state stores that are added via `addGlobalStore`; - * read-only access to global state stores is available by default. - * - * @param valueTransformerSupplier a instance of `ValueTransformerSupplier` that generates a `ValueTransformer` - * @param named a [[Named]] config used to name the processor in the topology - * @param stateStoreNames the names of the state stores used by the processor - * @return a [[KStream]] that contains records with unmodified key and new values (possibly of different type) - * @see `org.apache.kafka.streams.kstream.KStream#transformValues` - */ - @deprecated(since = "3.3", message = "Use processValues(FixedKeyProcessorSupplier, Named, String*) instead.") - def transformValues[VR]( - valueTransformerSupplier: ValueTransformerSupplier[V, VR], - named: Named, - stateStoreNames: String* - ): KStream[K, VR] = - new KStream(inner.transformValues[VR](valueTransformerSupplier, named, stateStoreNames: _*)) - - /** - * Transform the value of each input record into a new value (with possible new type) of the output record. - * A `ValueTransformer` (provided by the given `ValueTransformerSupplier`) is applied to each input - * record value and computes a new value for it. - * In order to assign a state, the state must be created and added via `addStateStore` before they can be connected - * to the `ValueTransformer`. - * It's not required to connect global state stores that are added via `addGlobalStore`; - * read-only access to global state stores is available by default. - * - * @param valueTransformerSupplier a instance of `ValueTransformerWithKeySupplier` that generates a `ValueTransformerWithKey` - * @param stateStoreNames the names of the state stores used by the processor - * @return a [[KStream]] that contains records with unmodified key and new values (possibly of different type) - * @see `org.apache.kafka.streams.kstream.KStream#transformValues` - */ - @deprecated(since = "3.3", message = "Use processValues(FixedKeyProcessorSupplier, String*) instead.") - def transformValues[VR]( - valueTransformerSupplier: ValueTransformerWithKeySupplier[K, V, VR], - stateStoreNames: String* - ): KStream[K, VR] = - new KStream(inner.transformValues[VR](valueTransformerSupplier, stateStoreNames: _*)) - - /** - * Transform the value of each input record into a new value (with possible new type) of the output record. - * A `ValueTransformer` (provided by the given `ValueTransformerSupplier`) is applied to each input - * record value and computes a new value for it. - * In order to assign a state, the state must be created and added via `addStateStore` before they can be connected - * to the `ValueTransformer`. - * It's not required to connect global state stores that are added via `addGlobalStore`; - * read-only access to global state stores is available by default. - * - * @param valueTransformerSupplier a instance of `ValueTransformerWithKeySupplier` that generates a `ValueTransformerWithKey` - * @param named a [[Named]] config used to name the processor in the topology - * @param stateStoreNames the names of the state stores used by the processor - * @return a [[KStream]] that contains records with unmodified key and new values (possibly of different type) - * @see `org.apache.kafka.streams.kstream.KStream#transformValues` - */ - @deprecated(since = "3.3", message = "Use processValues(FixedKeyProcessorSupplier, Named, String*) instead.") - def transformValues[VR]( - valueTransformerSupplier: ValueTransformerWithKeySupplier[K, V, VR], - named: Named, - stateStoreNames: String* - ): KStream[K, VR] = - new KStream(inner.transformValues[VR](valueTransformerSupplier, named, stateStoreNames: _*)) - /** * Process all records in this stream, one record at a time, by applying a `Processor` (provided by the given * `processorSupplier`).