Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-17131: Refactor TimeDefinitions #18241

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -72,16 +72,16 @@ static EagerBufferConfig maxBytes(final long byteLimit) {
/**
* Create a buffer unconstrained by size (either keys or bytes).
*
* As a result, the buffer will consume as much memory as it needs, dictated by the time bound.
* <p>As a result, the buffer will consume as much memory as it needs, dictated by the time bound.
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just some side cleanup in related code...

*
* If there isn't enough heap available to meet the demand, the application will encounter an
* <p>If there isn't enough heap available to meet the demand, the application will encounter an
* {@link OutOfMemoryError} and shut down (not guaranteed to be a graceful exit). Also, note that
* JVM processes under extreme memory pressure may exhibit poor GC behavior.
*
* This is a convenient option if you doubt that your buffer will be that large, but also don't
* <p>This is a convenient option if you doubt that your buffer will be that large, but also don't
* wish to pick particular constraints, such as in testing.
*
* This buffer is "strict" in the sense that it will enforce the time bound or crash.
* <p>This buffer is "strict" in the sense that it will enforce the time bound or crash.
* It will never emit early.
*/
static StrictBufferConfig unbounded() {
Expand All @@ -91,32 +91,32 @@ static StrictBufferConfig unbounded() {
/**
* Set the buffer to be unconstrained by size (either keys or bytes).
*
* As a result, the buffer will consume as much memory as it needs, dictated by the time bound.
* <p>As a result, the buffer will consume as much memory as it needs, dictated by the time bound.
*
* If there isn't enough heap available to meet the demand, the application will encounter an
* <p>If there isn't enough heap available to meet the demand, the application will encounter an
* {@link OutOfMemoryError} and shut down (not guaranteed to be a graceful exit). Also, note that
* JVM processes under extreme memory pressure may exhibit poor GC behavior.
*
* This is a convenient option if you doubt that your buffer will be that large, but also don't
* <p>This is a convenient option if you doubt that your buffer will be that large, but also don't
* wish to pick particular constraints, such as in testing.
*
* This buffer is "strict" in the sense that it will enforce the time bound or crash.
* <p>This buffer is "strict" in the sense that it will enforce the time bound or crash.
* It will never emit early.
*/
StrictBufferConfig withNoBound();

/**
* Set the buffer to gracefully shut down the application when any of its constraints are violated
*
* This buffer is "strict" in the sense that it will enforce the time bound or shut down.
* <p>This buffer is "strict" in the sense that it will enforce the time bound or shut down.
* It will never emit early.
*/
StrictBufferConfig shutDownWhenFull();

/**
* Set the buffer to just emit the oldest records when any of its constraints are violated.
*
* This buffer is "not strict" in the sense that it may emit early, so it is suitable for reducing
* <p>This buffer is "not strict" in the sense that it may emit early, so it is suitable for reducing
* duplicate results downstream, but does not promise to eliminate them.
*/
EagerBufferConfig emitEarlyWhenFull();
Expand All @@ -125,6 +125,7 @@ static StrictBufferConfig unbounded() {
* Disable the changelog for this suppression's internal buffer.
* This will turn off fault-tolerance for the suppression, and will result in data loss in the event of a rebalance.
* By default, the changelog is enabled.
*
* @return this
*/
BC withLoggingDisabled();
Expand All @@ -144,14 +145,14 @@ static StrictBufferConfig unbounded() {
/**
* Configure the suppression to emit only the "final results" from the window.
*
* By default, all Streams operators emit results whenever new results are available.
* <p>By default, all Streams operators emit results whenever new results are available.
* This includes windowed operations.
*
* This configuration will instead emit just one result per key for each window, guaranteeing
* <p>This configuration will instead emit just one result per key for each window, guaranteeing
* to deliver only the final result. This option is suitable for use cases in which the business logic
* requires a hard guarantee that only the final result is propagated. For example, sending alerts.
*
* To accomplish this, the operator will buffer events from the window until the window close (that is,
* <p>To accomplish this, the operator will buffer events from the window until the window close (that is,
* until the end-time passes, and additionally until the grace period expires). Since windowed operators
* are required to reject out-of-order events for a window whose grace period is expired, there is an additional
* guarantee that the final results emitted from this suppression will match any queryable state upstream.
Expand All @@ -161,7 +162,7 @@ static StrictBufferConfig unbounded() {
* property to emit early and then issue an update later.
* @return a "final results" mode suppression configuration
*/
static Suppressed<Windowed> untilWindowCloses(final StrictBufferConfig bufferConfig) {
static Suppressed<Windowed<?>> untilWindowCloses(final StrictBufferConfig bufferConfig) {
return new FinalResultsSuppressionBuilder<>(null, bufferConfig);
}

Expand All @@ -175,20 +176,20 @@ static Suppressed<Windowed> untilWindowCloses(final StrictBufferConfig bufferCon
* @param <K> The key type for the KTable to apply this suppression to.
* @return a suppression configuration
*/
static <K> Suppressed<K> untilTimeLimit(final Duration timeToWaitForMoreEvents, final BufferConfig bufferConfig) {
static <K> Suppressed<K> untilTimeLimit(final Duration timeToWaitForMoreEvents, final BufferConfig<?> bufferConfig) {
return new SuppressedInternal<>(null, timeToWaitForMoreEvents, bufferConfig, null, false);
}

/**
* Use the specified name for the suppression node in the topology.
* <p>
* This can be used to insert a suppression without changing the rest of the topology names
*
* <p>This can be used to insert a suppression without changing the rest of the topology names
* (and therefore not requiring an application reset).
* <p>
* Note however, that once a suppression has buffered some records, removing it from the topology would cause
*
* <p>Note however, that once a suppression has buffered some records, removing it from the topology would cause
* the loss of those records.
* <p>
* A suppression can be "disabled" with the configuration {@code untilTimeLimit(Duration.ZERO, ...}.
*
* <p>A suppression can be "disabled" with the configuration {@code untilTimeLimit(Duration.ZERO, ...}.
*
* @param name The name to be used for the suppression node and changelog topic
* @return The same configuration with the addition of the given {@code name}.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import java.time.Duration;
import java.util.Objects;

public class FinalResultsSuppressionBuilder<K extends Windowed> implements Suppressed<K>, NamedSuppressed<K> {
public class FinalResultsSuppressionBuilder<K extends Windowed<?>> implements Suppressed<K>, NamedSuppressed<K> {
private final String name;
private final StrictBufferConfig bufferConfig;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,12 +63,12 @@ public Processor<K, Change<V>, K, Change<V>> get() {
@Override
public KTableValueGetterSupplier<K, V> view() {
final KTableValueGetterSupplier<K, V> parentValueGetterSupplier = parentKTable.valueGetterSupplier();
return new KTableValueGetterSupplier<K, V>() {
return new KTableValueGetterSupplier<>() {

@Override
public KTableValueGetter<K, V> get() {
final KTableValueGetter<K, V> parentGetter = parentValueGetterSupplier.get();
return new KTableValueGetter<K, V>() {
return new KTableValueGetter<>() {
private TimeOrderedKeyValueBuffer<K, V, Change<V>> buffer;

@Override
Expand Down Expand Up @@ -166,7 +166,7 @@ public void process(final Record<K, Change<V>> record) {
}

private void buffer(final Record<K, Change<V>> record) {
final long bufferTime = bufferTimeDefinition.time(internalProcessorContext, record.key());
final long bufferTime = bufferTimeDefinition.time(internalProcessorContext.recordContext(), record.key());

buffer.put(bufferTime, record, internalProcessorContext.recordContext());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ public class SuppressedInternal<K> implements Suppressed<K>, NamedSuppressed<K>
private static final StrictBufferConfigImpl DEFAULT_BUFFER_CONFIG = (StrictBufferConfigImpl) BufferConfig.unbounded();

private final String name;
private final BufferConfigInternal bufferConfig;
private final BufferConfigInternal<?> bufferConfig;
private final Duration timeToWaitForMoreEvents;
private final TimeDefinition<K> timeDefinition;
private final boolean safeToDropTombstones;
Expand All @@ -39,21 +39,21 @@ public class SuppressedInternal<K> implements Suppressed<K>, NamedSuppressed<K>
* idempotent and correct). We decided that the unnecessary tombstones would not be
* desirable in the output stream, though, hence the ability to drop them.
*
* A alternative is to remember whether a result has previously been emitted
* <p>A alternative is to remember whether a result has previously been emitted
* for a key and drop tombstones in that case, but it would be a little complicated to
* figure out when to forget the fact that we have emitted some result (currently, the
* buffer immediately forgets all about a key when we emit, which helps to keep it
* compact).
*/
public SuppressedInternal(final String name,
final Duration suppressionTime,
final BufferConfig bufferConfig,
final BufferConfig<?> bufferConfig,
final TimeDefinition<K> timeDefinition,
final boolean safeToDropTombstones) {
this.name = name;
this.timeToWaitForMoreEvents = suppressionTime == null ? DEFAULT_SUPPRESSION_TIME : suppressionTime;
this.timeDefinition = timeDefinition == null ? TimeDefinitions.RecordTimeDefinition.instance() : timeDefinition;
this.bufferConfig = bufferConfig == null ? DEFAULT_BUFFER_CONFIG : (BufferConfigInternal) bufferConfig;
this.bufferConfig = bufferConfig == null ? DEFAULT_BUFFER_CONFIG : (BufferConfigInternal<?>) bufferConfig;
this.safeToDropTombstones = safeToDropTombstones;
}

Expand All @@ -69,7 +69,7 @@ public String name() {

@SuppressWarnings("unchecked")
public <BC extends Suppressed.BufferConfig<BC>> BufferConfigInternal<BC> bufferConfig() {
return bufferConfig;
return (BufferConfigInternal<BC>) bufferConfig;
}

TimeDefinition<K> timeDefinition() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,63 +17,47 @@
package org.apache.kafka.streams.kstream.internals.suppress;

import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.RecordContext;

final class TimeDefinitions {
private TimeDefinitions() {}

enum TimeDefinitionType {
RECORD_TIME, WINDOW_END_TIME
}

/**
* This interface should never be instantiated outside of this class.
*/
interface TimeDefinition<K> {
long time(final ProcessorContext context, final K key);

TimeDefinitionType type();
long time(final RecordContext context, final K key);
}

public static class RecordTimeDefinition<K> implements TimeDefinition<K> {
private static final RecordTimeDefinition INSTANCE = new RecordTimeDefinition();
static class RecordTimeDefinition<K> implements TimeDefinition<K> {
private static final RecordTimeDefinition<?> INSTANCE = new RecordTimeDefinition<>();

private RecordTimeDefinition() {}

@SuppressWarnings("unchecked")
public static <K> RecordTimeDefinition<K> instance() {
return RecordTimeDefinition.INSTANCE;
static <K> RecordTimeDefinition<K> instance() {
return (RecordTimeDefinition<K>) RecordTimeDefinition.INSTANCE;
}

@Override
public long time(final ProcessorContext context, final K key) {
public long time(final RecordContext context, final K key) {
return context.timestamp();
}

@Override
public TimeDefinitionType type() {
return TimeDefinitionType.RECORD_TIME;
}
}

public static class WindowEndTimeDefinition<K extends Windowed> implements TimeDefinition<K> {
private static final WindowEndTimeDefinition INSTANCE = new WindowEndTimeDefinition();
static class WindowEndTimeDefinition<K extends Windowed<?>> implements TimeDefinition<K> {
private static final WindowEndTimeDefinition<?> INSTANCE = new WindowEndTimeDefinition<>();

private WindowEndTimeDefinition() {}

@SuppressWarnings("unchecked")
public static <K extends Windowed> WindowEndTimeDefinition<K> instance() {
return WindowEndTimeDefinition.INSTANCE;
static <K extends Windowed<?>> WindowEndTimeDefinition<K> instance() {
return (WindowEndTimeDefinition<K>) WindowEndTimeDefinition.INSTANCE;
}

@Override
public long time(final ProcessorContext context, final K key) {
public long time(final RecordContext context, final K key) {
return key.window().end();
}

@Override
public TimeDefinitionType type() {
return TimeDefinitionType.WINDOW_END_TIME;
}
}
}
Loading