From f084eae437e7da8401f317790c5bf4f5894656a4 Mon Sep 17 00:00:00 2001 From: "Matthias J. Sax" Date: Tue, 17 Dec 2024 16:19:50 -0800 Subject: [PATCH] KAFKA-17131: Refactor TimeDefinitions Refactor TimeDefintiions to not use old ProcessorContext any longer --- .../kafka/streams/kstream/Suppressed.java | 43 ++++++++++--------- .../FinalResultsSuppressionBuilder.java | 2 +- .../KTableSuppressProcessorSupplier.java | 6 +-- .../suppress/SuppressedInternal.java | 10 ++--- .../internals/suppress/TimeDefinitions.java | 40 ++++++----------- 5 files changed, 43 insertions(+), 58 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Suppressed.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Suppressed.java index 57b18b4caf587..5bda71d487b5c 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/Suppressed.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Suppressed.java @@ -72,16 +72,16 @@ static EagerBufferConfig maxBytes(final long byteLimit) { /** * Create a buffer unconstrained by size (either keys or bytes). * - * As a result, the buffer will consume as much memory as it needs, dictated by the time bound. + *

<p>As a result, the buffer will consume as much memory as it needs, dictated by the time bound. * - * If there isn't enough heap available to meet the demand, the application will encounter an + *

<p>If there isn't enough heap available to meet the demand, the application will encounter an * {@link OutOfMemoryError} and shut down (not guaranteed to be a graceful exit). Also, note that * JVM processes under extreme memory pressure may exhibit poor GC behavior. * - * This is a convenient option if you doubt that your buffer will be that large, but also don't + *

<p>This is a convenient option if you doubt that your buffer will be that large, but also don't * wish to pick particular constraints, such as in testing. * - * This buffer is "strict" in the sense that it will enforce the time bound or crash. + * <p>

This buffer is "strict" in the sense that it will enforce the time bound or crash. * It will never emit early. */ static StrictBufferConfig unbounded() { @@ -91,16 +91,16 @@ static StrictBufferConfig unbounded() { /** * Set the buffer to be unconstrained by size (either keys or bytes). * - * As a result, the buffer will consume as much memory as it needs, dictated by the time bound. + *
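As a quick illustration of these two configs, here is a minimal sketch (the class name and concrete limits are illustrative, not part of this patch) showing the unbounded buffer next to a size-bounded one that is lifted back to "no bound":

import org.apache.kafka.streams.kstream.Suppressed.BufferConfig;
import org.apache.kafka.streams.kstream.Suppressed.StrictBufferConfig;

public class UnboundedBufferConfigSketch {
    public static void main(final String[] args) {
        // Strict and unbounded: the time bound is always enforced, nothing is ever
        // emitted early, and memory use is limited only by the available heap.
        final StrictBufferConfig unbounded = BufferConfig.unbounded();

        // Start from a size-bounded config and lift it back to "no bound"; the result
        // is the same strict, unbounded behavior as above.
        final StrictBufferConfig alsoUnbounded =
            BufferConfig.maxRecords(10_000L).withMaxBytes(64L * 1024 * 1024).withNoBound();

        System.out.println(unbounded + " / " + alsoUnbounded);
    }
}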

<p>As a result, the buffer will consume as much memory as it needs, dictated by the time bound. * - * If there isn't enough heap available to meet the demand, the application will encounter an + *

<p>If there isn't enough heap available to meet the demand, the application will encounter an * {@link OutOfMemoryError} and shut down (not guaranteed to be a graceful exit). Also, note that * JVM processes under extreme memory pressure may exhibit poor GC behavior. * - * This is a convenient option if you doubt that your buffer will be that large, but also don't + *

<p>This is a convenient option if you doubt that your buffer will be that large, but also don't * wish to pick particular constraints, such as in testing. * - * This buffer is "strict" in the sense that it will enforce the time bound or crash. + * <p>

This buffer is "strict" in the sense that it will enforce the time bound or crash. * It will never emit early. */ StrictBufferConfig withNoBound(); @@ -108,7 +108,7 @@ static StrictBufferConfig unbounded() { /** * Set the buffer to gracefully shut down the application when any of its constraints are violated * - * This buffer is "strict" in the sense that it will enforce the time bound or shut down. + *

This buffer is "strict" in the sense that it will enforce the time bound or shut down. * It will never emit early. */ StrictBufferConfig shutDownWhenFull(); @@ -116,7 +116,7 @@ static StrictBufferConfig unbounded() { /** * Set the buffer to just emit the oldest records when any of its constraints are violated. * - * This buffer is "not strict" in the sense that it may emit early, so it is suitable for reducing + *

This buffer is "not strict" in the sense that it may emit early, so it is suitable for reducing * duplicate results downstream, but does not promise to eliminate them. */ EagerBufferConfig emitEarlyWhenFull(); @@ -125,6 +125,7 @@ static StrictBufferConfig unbounded() { * Disable the changelog for this suppression's internal buffer. * This will turn off fault-tolerance for the suppression, and will result in data loss in the event of a rebalance. * By default, the changelog is enabled. + * * @return this */ BC withLoggingDisabled(); @@ -144,14 +145,14 @@ static StrictBufferConfig unbounded() { /** * Configure the suppression to emit only the "final results" from the window. * - * By default, all Streams operators emit results whenever new results are available. + *
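To make the strict/eager distinction concrete, a small sketch (record limits and class name are illustrative) contrasting the two overflow policies and the changelog toggle:

import org.apache.kafka.streams.kstream.Suppressed.BufferConfig;
import org.apache.kafka.streams.kstream.Suppressed.EagerBufferConfig;
import org.apache.kafka.streams.kstream.Suppressed.StrictBufferConfig;

public class BufferOverflowPolicySketch {
    public static void main(final String[] args) {
        // Strict: if buffering one more record would violate the 1000-record bound,
        // shut the application down rather than emit early.
        final StrictBufferConfig strict = BufferConfig.maxRecords(1000L).shutDownWhenFull();

        // Eager: emit the oldest buffered results when the bound is hit; this reduces,
        // but does not eliminate, duplicate results downstream.
        final EagerBufferConfig eager = BufferConfig.maxRecords(1000L).emitEarlyWhenFull();

        // Either flavor can also drop its changelog, trading the buffer's
        // fault-tolerance for less broker-side state.
        final EagerBufferConfig eagerWithoutChangelog = eager.withLoggingDisabled();

        System.out.println(strict + " / " + eagerWithoutChangelog);
    }
}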

<p>By default, all Streams operators emit results whenever new results are available. * This includes windowed operations. * - * This configuration will instead emit just one result per key for each window, guaranteeing + *

<p>This configuration will instead emit just one result per key for each window, guaranteeing * to deliver only the final result. This option is suitable for use cases in which the business logic * requires a hard guarantee that only the final result is propagated. For example, sending alerts. * - * To accomplish this, the operator will buffer events from the window until the window close (that is, + * <p>

To accomplish this, the operator will buffer events from the window until the window close (that is, * until the end-time passes, and additionally until the grace period expires). Since windowed operators * are required to reject out-of-order events for a window whose grace period is expired, there is an additional * guarantee that the final results emitted from this suppression will match any queryable state upstream. @@ -161,7 +162,7 @@ static StrictBufferConfig unbounded() { * property to emit early and then issue an update later. * @return a "final results" mode suppression configuration */ - static Suppressed untilWindowCloses(final StrictBufferConfig bufferConfig) { + static Suppressed> untilWindowCloses(final StrictBufferConfig bufferConfig) { return new FinalResultsSuppressionBuilder<>(null, bufferConfig); } @@ -175,20 +176,20 @@ static Suppressed untilWindowCloses(final StrictBufferConfig bufferCon * @param The key type for the KTable to apply this suppression to. * @return a suppression configuration */ - static Suppressed untilTimeLimit(final Duration timeToWaitForMoreEvents, final BufferConfig bufferConfig) { + static Suppressed untilTimeLimit(final Duration timeToWaitForMoreEvents, final BufferConfig bufferConfig) { return new SuppressedInternal<>(null, timeToWaitForMoreEvents, bufferConfig, null, false); } /** * Use the specified name for the suppression node in the topology. - *
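For context, a minimal end-to-end sketch of "final results" mode; the topic names, window size, and grace period are illustrative, not part of this patch:

import java.time.Duration;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Suppressed;
import org.apache.kafka.streams.kstream.TimeWindows;

public class FinalResultsSketch {
    public static void main(final String[] args) {
        final StreamsBuilder builder = new StreamsBuilder();
        builder.stream("events", Consumed.with(Serdes.String(), Serdes.String()))
            .groupByKey()
            .windowedBy(TimeWindows.ofSizeAndGrace(Duration.ofMinutes(5), Duration.ofMinutes(1)))
            .count()
            // One result per key and window, emitted only after the window closes
            // (end time plus grace period has passed).
            .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
            .toStream((windowedKey, count) -> windowedKey.key() + "@" + windowedKey.window().end())
            .to("final-counts");

        // For a non-windowed KTable, untilTimeLimit(...) instead rate-limits updates, e.g.:
        // table.suppress(Suppressed.untilTimeLimit(Duration.ofSeconds(30),
        //                                          Suppressed.BufferConfig.maxRecords(1000L)));
        builder.build();
    }
}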

- * This can be used to insert a suppression without changing the rest of the topology names + * + *

<p>This can be used to insert a suppression without changing the rest of the topology names * (and therefore not requiring an application reset). - * <p>

- * Note however, that once a suppression has buffered some records, removing it from the topology would cause + * + *

<p>Note however, that once a suppression has buffered some records, removing it from the topology would cause * the loss of those records. - * <p>

- * A suppression can be "disabled" with the configuration {@code untilTimeLimit(Duration.ZERO, ...}. + * + *
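A brief sketch of attaching an explicit name, assuming a hypothetical helper method and node name:

import java.time.Duration;

import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Suppressed;

public class NamedSuppressionSketch {
    // Wraps an existing KTable in a named, time-limited suppression, so adding the
    // operator does not shift the generated names of downstream nodes.
    public static <K, V> KTable<K, V> throttled(final KTable<K, V> table) {
        // Passing Duration.ZERO instead would effectively disable the suppression
        // while keeping the node name, and therefore the topology, stable.
        return table.suppress(
            Suppressed.<K>untilTimeLimit(Duration.ofMinutes(1), Suppressed.BufferConfig.maxRecords(1_000L))
                .withName("orders-suppress"));
    }
}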

A suppression can be "disabled" with the configuration {@code untilTimeLimit(Duration.ZERO, ...}. * * @param name The name to be used for the suppression node and changelog topic * @return The same configuration with the addition of the given {@code name}. diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/FinalResultsSuppressionBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/FinalResultsSuppressionBuilder.java index e917556c8736e..9aff6e61b8477 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/FinalResultsSuppressionBuilder.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/FinalResultsSuppressionBuilder.java @@ -22,7 +22,7 @@ import java.time.Duration; import java.util.Objects; -public class FinalResultsSuppressionBuilder implements Suppressed, NamedSuppressed { +public class FinalResultsSuppressionBuilder> implements Suppressed, NamedSuppressed { private final String name; private final StrictBufferConfig bufferConfig; diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorSupplier.java index 0b0c6ca15e9f7..3f98d444bb257 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorSupplier.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorSupplier.java @@ -63,12 +63,12 @@ public Processor, K, Change> get() { @Override public KTableValueGetterSupplier view() { final KTableValueGetterSupplier parentValueGetterSupplier = parentKTable.valueGetterSupplier(); - return new KTableValueGetterSupplier() { + return new KTableValueGetterSupplier<>() { @Override public KTableValueGetter get() { final KTableValueGetter parentGetter = parentValueGetterSupplier.get(); - return new KTableValueGetter() { + return new KTableValueGetter<>() { private TimeOrderedKeyValueBuffer> buffer; @Override @@ -166,7 +166,7 @@ public void process(final Record> record) { } private void buffer(final Record> record) { - final long bufferTime = bufferTimeDefinition.time(internalProcessorContext, record.key()); + final long bufferTime = bufferTimeDefinition.time(internalProcessorContext.recordContext(), record.key()); buffer.put(bufferTime, record, internalProcessorContext.recordContext()); } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/SuppressedInternal.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/SuppressedInternal.java index 51307bba9f5d7..89b07a9808b30 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/SuppressedInternal.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/SuppressedInternal.java @@ -27,7 +27,7 @@ public class SuppressedInternal implements Suppressed, NamedSuppressed private static final StrictBufferConfigImpl DEFAULT_BUFFER_CONFIG = (StrictBufferConfigImpl) BufferConfig.unbounded(); private final String name; - private final BufferConfigInternal bufferConfig; + private final BufferConfigInternal bufferConfig; private final Duration timeToWaitForMoreEvents; private final TimeDefinition timeDefinition; private final boolean safeToDropTombstones; @@ -39,7 +39,7 @@ public class SuppressedInternal implements Suppressed, NamedSuppressed * idempotent 
and correct). We decided that the unnecessary tombstones would not be * desirable in the output stream, though, hence the ability to drop them. * - * A alternative is to remember whether a result has previously been emitted + *

A alternative is to remember whether a result has previously been emitted * for a key and drop tombstones in that case, but it would be a little complicated to * figure out when to forget the fact that we have emitted some result (currently, the * buffer immediately forgets all about a key when we emit, which helps to keep it @@ -47,13 +47,13 @@ public class SuppressedInternal implements Suppressed, NamedSuppressed */ public SuppressedInternal(final String name, final Duration suppressionTime, - final BufferConfig bufferConfig, + final BufferConfig bufferConfig, final TimeDefinition timeDefinition, final boolean safeToDropTombstones) { this.name = name; this.timeToWaitForMoreEvents = suppressionTime == null ? DEFAULT_SUPPRESSION_TIME : suppressionTime; this.timeDefinition = timeDefinition == null ? TimeDefinitions.RecordTimeDefinition.instance() : timeDefinition; - this.bufferConfig = bufferConfig == null ? DEFAULT_BUFFER_CONFIG : (BufferConfigInternal) bufferConfig; + this.bufferConfig = bufferConfig == null ? DEFAULT_BUFFER_CONFIG : (BufferConfigInternal) bufferConfig; this.safeToDropTombstones = safeToDropTombstones; } @@ -69,7 +69,7 @@ public String name() { @SuppressWarnings("unchecked") public > BufferConfigInternal bufferConfig() { - return bufferConfig; + return (BufferConfigInternal) bufferConfig; } TimeDefinition timeDefinition() { diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/TimeDefinitions.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/TimeDefinitions.java index 640965fdd6a21..c4a38e23c97c8 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/TimeDefinitions.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/TimeDefinitions.java @@ -17,63 +17,47 @@ package org.apache.kafka.streams.kstream.internals.suppress; import org.apache.kafka.streams.kstream.Windowed; -import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.RecordContext; final class TimeDefinitions { private TimeDefinitions() {} - enum TimeDefinitionType { - RECORD_TIME, WINDOW_END_TIME - } - /** * This interface should never be instantiated outside of this class. 
*/ interface TimeDefinition { - long time(final ProcessorContext context, final K key); - - TimeDefinitionType type(); + long time(final RecordContext context, final K key); } - public static class RecordTimeDefinition implements TimeDefinition { - private static final RecordTimeDefinition INSTANCE = new RecordTimeDefinition(); + static class RecordTimeDefinition implements TimeDefinition { + private static final RecordTimeDefinition INSTANCE = new RecordTimeDefinition<>(); private RecordTimeDefinition() {} @SuppressWarnings("unchecked") - public static RecordTimeDefinition instance() { - return RecordTimeDefinition.INSTANCE; + static RecordTimeDefinition instance() { + return (RecordTimeDefinition) RecordTimeDefinition.INSTANCE; } @Override - public long time(final ProcessorContext context, final K key) { + public long time(final RecordContext context, final K key) { return context.timestamp(); } - - @Override - public TimeDefinitionType type() { - return TimeDefinitionType.RECORD_TIME; - } } - public static class WindowEndTimeDefinition implements TimeDefinition { - private static final WindowEndTimeDefinition INSTANCE = new WindowEndTimeDefinition(); + static class WindowEndTimeDefinition> implements TimeDefinition { + private static final WindowEndTimeDefinition INSTANCE = new WindowEndTimeDefinition<>(); private WindowEndTimeDefinition() {} @SuppressWarnings("unchecked") - public static WindowEndTimeDefinition instance() { - return WindowEndTimeDefinition.INSTANCE; + static > WindowEndTimeDefinition instance() { + return (WindowEndTimeDefinition) WindowEndTimeDefinition.INSTANCE; } @Override - public long time(final ProcessorContext context, final K key) { + public long time(final RecordContext context, final K key) { return key.window().end(); } - - @Override - public TimeDefinitionType type() { - return TimeDefinitionType.WINDOW_END_TIME; - } } }
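To make the effect of the refactor concrete, here is a hypothetical mirror of the new TimeDefinition shape (the interface and class names below are illustrative, not the actual internals): buffer-time computation now needs only the record metadata exposed by RecordContext plus the key, rather than the old ProcessorContext.

import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.processor.RecordContext;

// Illustrative stand-in for the refactored TimeDefinition: buffer time is a function
// of record metadata (RecordContext) and the key only.
@FunctionalInterface
interface BufferTime<K> {
    long time(RecordContext context, K key);
}

final class BufferTimes {
    private BufferTimes() {}

    // Analogue of RecordTimeDefinition: the record's own timestamp drives eviction.
    static <K> BufferTime<K> recordTime() {
        return (context, key) -> context.timestamp();
    }

    // Analogue of WindowEndTimeDefinition: the end of the windowed key's window drives eviction.
    static <K extends Windowed<?>> BufferTime<K> windowEnd() {
        return (context, key) -> key.window().end();
    }
}

This matches the processor-side change above, where the suppress processor now passes internalProcessorContext.recordContext() to the time definition instead of the ProcessorContext itself.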