Skip to content

Commit

Permalink
KAFKA-16339: [4/4 KStream#flatTransformValues] Remove Deprecated "tra…
Browse files Browse the repository at this point in the history
…nsformer" methods and classes (apache#17882)

Reviewer: Matthias J. Sax <[email protected]>
  • Loading branch information
fonsdant authored Dec 9, 2024
1 parent ee42644 commit d5c2029
Show file tree
Hide file tree
Showing 14 changed files with 38 additions and 1,514 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -87,16 +87,4 @@ public static void checkSupplier(final Supplier<?> supplier) {
" %s#get() must return a new object each time it is called.", supplierClass, supplierClass));
}
}

/**
* @throws IllegalArgumentException if the same instance is obtained each time
*/
@SuppressWarnings("deprecation")
public static <VR, V> void checkSupplier(final org.apache.kafka.streams.kstream.ValueTransformerSupplier<V, VR> supplier) {
if (supplier.get() == supplier.get()) {
final String supplierClass = supplier.getClass().getName();
throw new IllegalArgumentException(String.format("%s generates single reference." +
" %s#get() must return a new object each time it is called.", supplierClass, supplierClass));
}
}
}
503 changes: 6 additions & 497 deletions streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,12 @@
package org.apache.kafka.streams.kstream.internals;

import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.streams.internals.ApiUtils;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.kstream.ValueJoinerWithKey;
import org.apache.kafka.streams.kstream.ValueMapper;
import org.apache.kafka.streams.kstream.ValueMapperWithKey;
import org.apache.kafka.streams.kstream.ValueTransformerWithKey;
import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier;
import org.apache.kafka.streams.kstream.internals.graph.GraphNode;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
import org.apache.kafka.streams.state.StoreBuilder;

import java.util.Collection;
import java.util.HashSet;
Expand Down Expand Up @@ -109,40 +104,6 @@ static <K, V, VR> ValueMapperWithKey<K, V, VR> withKey(final ValueMapper<V, VR>
return (readOnlyKey, value) -> valueMapper.apply(value);
}

@SuppressWarnings("deprecation")
static <K, V, VR> ValueTransformerWithKeySupplier<K, V, VR> toValueTransformerWithKeySupplier(
final org.apache.kafka.streams.kstream.ValueTransformerSupplier<V, VR> valueTransformerSupplier) {
Objects.requireNonNull(valueTransformerSupplier, "valueTransformerSupplier can't be null");
ApiUtils.checkSupplier(valueTransformerSupplier);
return new ValueTransformerWithKeySupplier<K, V, VR>() {
@Override
public ValueTransformerWithKey<K, V, VR> get() {
final org.apache.kafka.streams.kstream.ValueTransformer<V, VR> valueTransformer = valueTransformerSupplier.get();
return new ValueTransformerWithKey<K, V, VR>() {
@Override
public void init(final ProcessorContext context) {
valueTransformer.init(context);
}

@Override
public VR transform(final K readOnlyKey, final V value) {
return valueTransformer.transform(value);
}

@Override
public void close() {
valueTransformer.close();
}
};
}

@Override
public Set<StoreBuilder<?>> stores() {
return valueTransformerSupplier.stores();
}
};
}

static <K, V1, V2, VR> ValueJoinerWithKey<K, V1, V2, VR> toValueJoinerWithKey(final ValueJoiner<V1, V2, VR> valueJoiner) {
Objects.requireNonNull(valueJoiner, "joiner can't be null");
return (readOnlyKey, value1, value2) -> valueJoiner.apply(value1, value2);
Expand Down

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@
import org.apache.kafka.streams.kstream.ValueJoinerWithKey;
import org.apache.kafka.streams.kstream.ValueMapper;
import org.apache.kafka.streams.kstream.ValueMapperWithKey;
import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier;
import org.apache.kafka.streams.kstream.internals.graph.BaseRepartitionNode;
import org.apache.kafka.streams.kstream.internals.graph.BaseRepartitionNode.BaseRepartitionNodeBuilder;
import org.apache.kafka.streams.kstream.internals.graph.GraphNode;
Expand Down Expand Up @@ -121,8 +120,6 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K

private static final String KEY_SELECT_NAME = "KSTREAM-KEY-SELECT-";

private static final String TRANSFORMVALUES_NAME = "KSTREAM-TRANSFORMVALUES-";

private static final String FOREACH_NAME = "KSTREAM-FOREACH-";

private static final String TO_KTABLE_NAME = "KSTREAM-TOTABLE-";
Expand Down Expand Up @@ -1202,75 +1199,6 @@ private <VO, VR> KStream<K, VR> doStreamTableJoin(final KTable<K, VO> table,
builder);
}

@Override
@Deprecated
public <VR> KStream<K, VR> flatTransformValues(final org.apache.kafka.streams.kstream.ValueTransformerSupplier<? super V, Iterable<VR>> valueTransformerSupplier,
final String... stateStoreNames) {
Objects.requireNonNull(valueTransformerSupplier, "valueTransformerSupplier can't be null");
return doFlatTransformValues(
toValueTransformerWithKeySupplier(valueTransformerSupplier),
NamedInternal.empty(),
stateStoreNames);
}

@Override
@Deprecated
public <VR> KStream<K, VR> flatTransformValues(final org.apache.kafka.streams.kstream.ValueTransformerSupplier<? super V, Iterable<VR>> valueTransformerSupplier,
final Named named,
final String... stateStoreNames) {
Objects.requireNonNull(valueTransformerSupplier, "valueTransformerSupplier can't be null");
return doFlatTransformValues(
toValueTransformerWithKeySupplier(valueTransformerSupplier),
named,
stateStoreNames);
}

@Override
@Deprecated
public <VR> KStream<K, VR> flatTransformValues(final ValueTransformerWithKeySupplier<? super K, ? super V, Iterable<VR>> valueTransformerSupplier,
final String... stateStoreNames) {
Objects.requireNonNull(valueTransformerSupplier, "valueTransformerSupplier can't be null");
return doFlatTransformValues(valueTransformerSupplier, NamedInternal.empty(), stateStoreNames);
}

@Override
@Deprecated
public <VR> KStream<K, VR> flatTransformValues(final ValueTransformerWithKeySupplier<? super K, ? super V, Iterable<VR>> valueTransformerSupplier,
final Named named,
final String... stateStoreNames) {
Objects.requireNonNull(valueTransformerSupplier, "valueTransformerSupplier can't be null");
return doFlatTransformValues(valueTransformerSupplier, named, stateStoreNames);
}

private <VR> KStream<K, VR> doFlatTransformValues(final ValueTransformerWithKeySupplier<? super K, ? super V, Iterable<VR>> valueTransformerWithKeySupplier,
final Named named,
final String... stateStoreNames) {
Objects.requireNonNull(stateStoreNames, "stateStoreNames can't be a null array");
for (final String stateStoreName : stateStoreNames) {
Objects.requireNonNull(stateStoreName, "stateStoreNames can't contain `null` as store name");
}
ApiUtils.checkSupplier(valueTransformerWithKeySupplier);

final String name = new NamedInternal(named).orElseGenerateWithPrefix(builder, TRANSFORMVALUES_NAME);
final StatefulProcessorNode<? super K, ? super V> transformNode = new StatefulProcessorNode<>(
name,
new ProcessorParameters<>(new KStreamFlatTransformValues<>(valueTransformerWithKeySupplier), name),
stateStoreNames);
transformNode.setValueChangingOperation(true);

builder.addGraphNode(graphNode, transformNode);

// cannot inherit value serde
return new KStreamImpl<>(
name,
keySerde,
null,
subTopologySourceNodes,
repartitionRequired,
transformNode,
builder);
}

@Override
@Deprecated
public void process(final org.apache.kafka.streams.processor.ProcessorSupplier<? super K, ? super V> processorSupplier,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Named;
import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier;
import org.apache.kafka.streams.processor.api.FixedKeyProcessorSupplier;
import org.apache.kafka.streams.state.StoreBuilder;

Expand Down Expand Up @@ -92,11 +91,8 @@
* @see Topology#addProcessor(String, org.apache.kafka.streams.processor.api.ProcessorSupplier, String...)
* @see KStream#process(org.apache.kafka.streams.processor.api.ProcessorSupplier, String...)
* @see KStream#process(org.apache.kafka.streams.processor.api.ProcessorSupplier, Named, String...)
* @see KStream#processValues(FixedKeyProcessorSupplier, String...)
* @see KStream#flatTransformValues(org.apache.kafka.streams.kstream.ValueTransformerSupplier, String...)
* @see KStream#flatTransformValues(org.apache.kafka.streams.kstream.ValueTransformerSupplier, Named, String...)
* @see KStream#flatTransformValues(ValueTransformerWithKeySupplier, String...)
* @see KStream#flatTransformValues(ValueTransformerWithKeySupplier, Named, String...)
* @see KStream#processValues(FixedKeyProcessorSupplier, String...)
* @see KStream#processValues(FixedKeyProcessorSupplier, Named, String...)
*/
public interface ConnectedStoreProvider {

Expand Down
Loading

0 comments on commit d5c2029

Please sign in to comment.