diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Suppressed.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Suppressed.java
index 57b18b4caf587..5bda71d487b5c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/Suppressed.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Suppressed.java
@@ -72,16 +72,16 @@ static EagerBufferConfig maxBytes(final long byteLimit) {
         /**
          * Create a buffer unconstrained by size (either keys or bytes).
          *
-         * As a result, the buffer will consume as much memory as it needs, dictated by the time bound.
+         * <p>As a result, the buffer will consume as much memory as it needs, dictated by the time bound.
          *
-         * If there isn't enough heap available to meet the demand, the application will encounter an
+         * <p>If there isn't enough heap available to meet the demand, the application will encounter an
          * {@link OutOfMemoryError} and shut down (not guaranteed to be a graceful exit). Also, note that
          * JVM processes under extreme memory pressure may exhibit poor GC behavior.
          *
-         * This is a convenient option if you doubt that your buffer will be that large, but also don't
+         * <p>This is a convenient option if you doubt that your buffer will be that large, but also don't
          * wish to pick particular constraints, such as in testing.
          *
-         * This buffer is "strict" in the sense that it will enforce the time bound or crash.
+         * <p>This buffer is "strict" in the sense that it will enforce the time bound or crash.
          * It will never emit early.
          */
         static StrictBufferConfig unbounded() {
@@ -91,16 +91,16 @@ static StrictBufferConfig unbounded() {
         /**
          * Set the buffer to be unconstrained by size (either keys or bytes).
          *
-         * As a result, the buffer will consume as much memory as it needs, dictated by the time bound.
+         * <p>As a result, the buffer will consume as much memory as it needs, dictated by the time bound.
          *
-         * If there isn't enough heap available to meet the demand, the application will encounter an
+         * <p>If there isn't enough heap available to meet the demand, the application will encounter an
          * {@link OutOfMemoryError} and shut down (not guaranteed to be a graceful exit). Also, note that
          * JVM processes under extreme memory pressure may exhibit poor GC behavior.
          *
-         * This is a convenient option if you doubt that your buffer will be that large, but also don't
+         * <p>This is a convenient option if you doubt that your buffer will be that large, but also don't
          * wish to pick particular constraints, such as in testing.
          *
-         * This buffer is "strict" in the sense that it will enforce the time bound or crash.
+         * <p>This buffer is "strict" in the sense that it will enforce the time bound or crash.
          * It will never emit early.
          */
         StrictBufferConfig withNoBound();
@@ -108,7 +108,7 @@ static StrictBufferConfig unbounded() {
         /**
          * Set the buffer to gracefully shut down the application when any of its constraints are violated
          *
-         * This buffer is "strict" in the sense that it will enforce the time bound or shut down.
+         * <p>This buffer is "strict" in the sense that it will enforce the time bound or shut down.
          * It will never emit early.
          */
         StrictBufferConfig shutDownWhenFull();
@@ -116,7 +116,7 @@ static StrictBufferConfig unbounded() {
         /**
          * Set the buffer to just emit the oldest records when any of its constraints are violated.
          *
-         * This buffer is "not strict" in the sense that it may emit early, so it is suitable for reducing
+         * <p>This buffer is "not strict" in the sense that it may emit early, so it is suitable for reducing
          * duplicate results downstream, but does not promise to eliminate them.
          */
         EagerBufferConfig emitEarlyWhenFull();
@@ -125,6 +125,7 @@ static StrictBufferConfig unbounded() {
          * Disable the changelog for this suppression's internal buffer.
          * This will turn off fault-tolerance for the suppression, and will result in data loss in the event of a rebalance.
          * By default, the changelog is enabled.
+         *
          * @return this
          */
         BC withLoggingDisabled();
@@ -144,14 +145,14 @@ static StrictBufferConfig unbounded() {
    /**
     * Configure the suppression to emit only the "final results" from the window.
     *
-    * By default, all Streams operators emit results whenever new results are available.
+    * <p>By default, all Streams operators emit results whenever new results are available.
     * This includes windowed operations.
     *
-    * This configuration will instead emit just one result per key for each window, guaranteeing
+    * <p>This configuration will instead emit just one result per key for each window, guaranteeing
     * to deliver only the final result. This option is suitable for use cases in which the business logic
     * requires a hard guarantee that only the final result is propagated. For example, sending alerts.
     *
-    * To accomplish this, the operator will buffer events from the window until the window close (that is,
+    * <p>To accomplish this, the operator will buffer events from the window until the window close (that is,
     * until the end-time passes, and additionally until the grace period expires). Since windowed operators
     * are required to reject out-of-order events for a window whose grace period is expired, there is an additional
     * guarantee that the final results emitted from this suppression will match any queryable state upstream.
@@ -161,7 +162,7 @@ static StrictBufferConfig unbounded() {
* property to emit early and then issue an update later.
* @return a "final results" mode suppression configuration
*/
- static Suppressed
-     * This can be used to insert a suppression without changing the rest of the topology names
+     * <p>
+     * This can be used to insert a suppression without changing the rest of the topology names
      * (and therefore not requiring an application reset).
-     *
-     * Note however, that once a suppression has buffered some records, removing it from the topology would cause
+     * <p>
+     * Note however, that once a suppression has buffered some records, removing it from the topology would cause
      * the loss of those records.
-     *
-     * A suppression can be "disabled" with the configuration {@code untilTimeLimit(Duration.ZERO, ...}.
+     * <p>
+     * A suppression can be "disabled" with the configuration {@code untilTimeLimit(Duration.ZERO, ...}.
      *
      * @param name The name to be used for the suppression node and changelog topic
      * @return The same configuration with the addition of the given {@code name}.
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/FinalResultsSuppressionBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/FinalResultsSuppressionBuilder.java
index e917556c8736e..9aff6e61b8477 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/FinalResultsSuppressionBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/FinalResultsSuppressionBuilder.java
@@ -22,7 +22,7 @@
import java.time.Duration;
import java.util.Objects;
-public class FinalResultsSuppressionBuilder An alternative is to remember whether a result has previously been emitted
* for a key and drop tombstones in that case, but it would be a little complicated to
* figure out when to forget the fact that we have emitted some result (currently, the
* buffer immediately forgets all about a key when we emit, which helps to keep it
@@ -47,13 +47,13 @@ public class SuppressedInternal