diff --git a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java index 46ace65cf04f8..24dd92fa431eb 100644 --- a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java +++ b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java @@ -1200,7 +1200,7 @@ public void process(final Record record) { sum += value; } state.put(key, sum); - state.flush(); + state.commit(Map.of()); } diff --git a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/IQv2IntegrationTest.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/IQv2IntegrationTest.java index 20fc7f4723642..c2b532afc7724 100644 --- a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/IQv2IntegrationTest.java +++ b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/IQv2IntegrationTest.java @@ -22,6 +22,7 @@ import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.serialization.IntegerSerializer; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.utils.Bytes; @@ -383,7 +384,7 @@ public void init(final StateStoreContext stateStoreContext, final StateStore roo } @Override - public void flush() { + public void commit(final Map changelogOffsets) { } diff --git a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/StandbyTaskEOSIntegrationTest.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/StandbyTaskEOSIntegrationTest.java index 67435c3afeb3d..ae5557b1f6bac 100644 --- a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/StandbyTaskEOSIntegrationTest.java +++ b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/StandbyTaskEOSIntegrationTest.java @@ -54,6 +54,7 @@ import java.io.IOException; import java.time.Duration; import java.util.Collections; +import java.util.Map; import java.util.Properties; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -358,7 +359,7 @@ public void process(final Record record) { } store.put(key, value); - store.flush(); + store.commit(Map.of()); if (key == KEY_1) { // after error injection, we need to avoid a consecutive error after rebalancing diff --git a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/VersionedKeyValueStoreIntegrationTest.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/VersionedKeyValueStoreIntegrationTest.java index d3ec4d9838e22..740b5b1af5c31 100644 --- a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/VersionedKeyValueStoreIntegrationTest.java +++ b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/VersionedKeyValueStoreIntegrationTest.java @@ -17,6 +17,7 @@ package org.apache.kafka.streams.integration; import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.config.TopicConfig; import org.apache.kafka.common.serialization.IntegerDeserializer; import org.apache.kafka.common.serialization.IntegerSerializer; @@ -821,7 +822,7 @@ public void init(final StateStoreContext stateStoreContext, final StateStore roo } @Override - public void flush() { + public void commit(final Map changelogOffsets) { // do nothing } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/StateStore.java b/streams/src/main/java/org/apache/kafka/streams/processor/StateStore.java index 38a3e23e28a1e..215155e08f9f7 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/StateStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/StateStore.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.streams.processor; +import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.annotation.InterfaceStability.Evolving; import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.processor.api.ProcessorContext; @@ -26,6 +27,8 @@ import org.apache.kafka.streams.query.QueryConfig; import org.apache.kafka.streams.query.QueryResult; +import java.util.Map; + /** * A storage engine for managing state maintained by a stream processor. *

@@ -73,8 +76,55 @@ public interface StateStore { /** * Flush any cached data + * + * @deprecated Use {@link org.apache.kafka.streams.processor.api.ProcessingContext#commit() ProcessorContext#commit()} + * instead. + */ + @Deprecated + default void flush() { + // no-op + } + + /** + * Commit all written records to this StateStore. + *

+ * This method CANNOT be called by users from {@link org.apache.kafka.streams.processor.api.Processor + * processors}. Doing so will throw an {@link java.lang.UnsupportedOperationException}. + *

+ * Instead, users should call {@link org.apache.kafka.streams.processor.api.ProcessingContext#commit() + * ProcessorContext#commit()} to request a Task commit. + *

+ * If {@link #managesOffsets()} returns {@code true}, the given {@code changelogOffsets} will be guaranteed to be + * persisted to disk along with the written records. + *

+ * {@code changelogOffsets} will usually contain a single partition, in the case of a regular StateStore. However, + * they may contain multiple partitions in the case of a Global StateStore with multiple partitions. All provided + * partitions MUST be persisted to disk. + *

+ * Implementations SHOULD ensure that {@code changelogOffsets} are committed to disk atomically with the + * records they represent, if possible. + * + * @param changelogOffsets The changelog offset(s) corresponding to the most recently written records. + */ + default void commit(final Map changelogOffsets) { + flush(); + } + + /** + * Returns the most recently {@link #commit(Map) committed} offset for the given {@link TopicPartition}. + *

+ * If {@link #managesOffsets()} and {@link #persistent()} both return {@code true}, this method will return the + * offset that corresponds to the changelog record most recently written to this store, for the given {@code + * partition}. + * + * @param partition The partition to get the committed offset for. + * @return The last {@link #commit(Map) committed} offset for the {@code partition}; or {@code null} if no offset + * has been committed for the partition, or if either {@link #persistent()} or {@link #managesOffsets()} + * return {@code false}. */ - void flush(); + default Long committedOffset(final TopicPartition partition) { + return null; + } /** * Close the storage engine. @@ -93,6 +143,32 @@ public interface StateStore { */ boolean persistent(); + /** + * Determines if this StateStore manages its own offsets. + *

+ * If this method returns {@code true}, then offsets provided to {@link #commit(Map)} will be retrievable using + * {@link #committedOffset(TopicPartition)}. + *

+ * If this method returns {@code false}, offsets provided to {@link #commit(Map)} will be ignored, and {@link + * #committedOffset(TopicPartition)} will be expected to always return {@code null}. + *

+ * This method is provided to enable custom StateStores to opt-in to managing their own offsets. This is required, + * to ensure that custom StateStores provide the consistency guarantees that Kafka Streams expects when operating + * under an {@code exactly-once} {@code processing.guarantee}. + *

+ * New implementations are required to implement this method and return {@code true}. Existing implementations + * should upgrade to managing their own offsets as soon as possible, as the legacy offset management is deprecated + * and will be removed in a future version. + * + * @deprecated New implementations should always return {@code true} and manage their own offsets. In the future, + * this method will be removed and it will be assumed to always return {@code true}. + * @return Whether this StateStore manages its own offsets. + */ + @Deprecated + default boolean managesOffsets() { + return false; + } + /** * Is this store open for reading and writing * @return {@code true} if the store is open diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractReadOnlyDecorator.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractReadOnlyDecorator.java index c8b683aa96cba..01fb9d7bf3ce8 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractReadOnlyDecorator.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractReadOnlyDecorator.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.streams.processor.internals; +import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.kstream.Windowed; @@ -34,6 +35,7 @@ import org.apache.kafka.streams.state.internals.WrappedStateStore; import java.util.List; +import java.util.Map; abstract class AbstractReadOnlyDecorator extends WrappedStateStore { @@ -44,6 +46,7 @@ private AbstractReadOnlyDecorator(final T inner) { } @Override + @Deprecated public void flush() { throw new UnsupportedOperationException(ERROR_MESSAGE); } @@ -54,6 +57,11 @@ public void init(final StateStoreContext stateStoreContext, throw new UnsupportedOperationException(ERROR_MESSAGE); } + @Override + public void commit(final Map changelogOffsets) { + throw new UnsupportedOperationException(ERROR_MESSAGE); + } + @Override public void close() { throw new UnsupportedOperationException(ERROR_MESSAGE); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractReadWriteDecorator.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractReadWriteDecorator.java index d9772027cb2d8..569adad58c325 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractReadWriteDecorator.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractReadWriteDecorator.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.streams.processor.internals; +import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.kstream.Windowed; @@ -34,6 +35,7 @@ import org.apache.kafka.streams.state.internals.WrappedStateStore; import java.util.List; +import java.util.Map; abstract class AbstractReadWriteDecorator extends WrappedStateStore { static final String ERROR_MESSAGE = "This method may only be called by Kafka Streams"; @@ -48,6 +50,11 @@ public void init(final StateStoreContext stateStoreContext, throw new UnsupportedOperationException(ERROR_MESSAGE); } + @Override + public void commit(final Map changelogOffsets) { + throw new UnsupportedOperationException(ERROR_MESSAGE); + } + @Override public void close() { throw new UnsupportedOperationException(ERROR_MESSAGE); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java index 2bf65c31d7512..cbb7c4ebbcee2 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java @@ -464,16 +464,16 @@ private long maybeUpdateDeadlineOrThrow(final long currentDeadlineMs) { @Override public void flush() { - log.debug("Flushing all global globalStores registered in the state manager"); + log.debug("Committing all global globalStores registered in the state manager"); for (final Map.Entry> entry : globalStores.entrySet()) { if (entry.getValue().isPresent()) { final StateStore store = entry.getValue().get(); try { - log.trace("Flushing global store={}", store.name()); - store.flush(); + log.trace("Committing global store={}", store.name()); + store.commit(Map.of()); } catch (final RuntimeException e) { throw new ProcessorStateException( - String.format("Failed to flush global state store %s", store.name()), + String.format("Failed to commit global state store %s", store.name()), e ); } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java index 3506845d288af..cea6baf2be471 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java @@ -521,36 +521,36 @@ void restore(final StateStoreMetadata storeMetadata, final List) store).flushCache(); } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStore.java index e05b6328ec8b3..85b85855a36be 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStore.java @@ -17,6 +17,7 @@ package org.apache.kafka.streams.state.internals; import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.KeyValue; @@ -277,8 +278,8 @@ public void init(final StateStoreContext stateStoreContext, final StateStore roo } @Override - public void flush() { - segments.flush(); + public void commit(final Map changelogOffsets) { + segments.commit(changelogOffsets); } @Override diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java index bde8d8319197d..201ca7ff9411a 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java @@ -17,6 +17,7 @@ package org.apache.kafka.streams.state.internals; import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.common.utils.Utils; @@ -326,8 +327,8 @@ public void init(final StateStoreContext stateStoreContext, final StateStore roo } @Override - public void flush() { - segments.flush(); + public void commit(final Map changelogOffsets) { + segments.commit(changelogOffsets); } @Override diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractSegments.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractSegments.java index 4f7ca5e59ae9c..71044792a0b8a 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractSegments.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractSegments.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.streams.state.internals; +import org.apache.kafka.common.TopicPartition; import org.apache.kafka.streams.errors.ProcessorStateException; import org.apache.kafka.streams.processor.StateStoreContext; import org.apache.kafka.streams.query.Position; @@ -160,9 +161,9 @@ public List allSegments(final boolean forward) { } @Override - public void flush() { + public void commit(final Map changelogOffsets) { for (final S segment : segments.values()) { - segment.flush(); + segment.commit(changelogOffsets); } } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java index 83343d04494d6..c630481472726 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.streams.state.internals; +import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.KeyValue; @@ -452,13 +453,13 @@ public long approximateNumEntries() { } @Override - public void flush() { + public void commit(final Map changelogOffsets) { validateStoreOpen(); lock.writeLock().lock(); try { validateStoreOpen(); internalContext.cache().flush(cacheName); - wrapped().flush(); + wrapped().commit(changelogOffsets); } finally { lock.writeLock().unlock(); } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java index ec0c1bd077d6f..20329d393160b 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.streams.state.internals; +import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.kstream.Windowed; @@ -34,6 +35,7 @@ import org.slf4j.LoggerFactory; import java.util.LinkedList; +import java.util.Map; import java.util.NoSuchElementException; import java.util.Objects; @@ -315,9 +317,10 @@ public KeyValueIterator, byte[]> backwardFetch(final Bytes keyFr return backwardFindSessions(keyFrom, keyTo, 0, Long.MAX_VALUE); } - public void flush() { + @Override + public void commit(final Map changelogOffsets) { internalContext.cache().flush(cacheName); - wrapped().flush(); + wrapped().commit(changelogOffsets); } @Override diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java index 0432c1726cb3e..3bdbe24fe1b27 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.streams.state.internals; +import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.KeyValue; @@ -37,6 +38,7 @@ import org.slf4j.LoggerFactory; import java.util.LinkedList; +import java.util.Map; import java.util.NoSuchElementException; import java.util.concurrent.atomic.AtomicLong; @@ -407,9 +409,9 @@ public KeyValueIterator, byte[]> backwardAll() { } @Override - public synchronized void flush() { + public synchronized void commit(final Map changelogOffsets) { internalContext.cache().flush(cacheName); - wrapped().flush(); + wrapped().commit(changelogOffsets); } @Override diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java index e7449ea49d44a..51744ad9cedf5 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java @@ -17,6 +17,7 @@ package org.apache.kafka.streams.state.internals; import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.KeyValue; @@ -38,6 +39,7 @@ import java.util.Iterator; import java.util.List; +import java.util.Map; import java.util.NavigableMap; import java.util.NoSuchElementException; import java.util.Set; @@ -229,7 +231,7 @@ public long approximateNumEntries() { } @Override - public void flush() { + public void commit(final Map changelogOffsets) { // do-nothing since it is in-memory } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemorySessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemorySessionStore.java index ed2bb1868867c..1a5c3197442ea 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemorySessionStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemorySessionStore.java @@ -17,6 +17,7 @@ package org.apache.kafka.streams.state.internals; import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.KeyValue; @@ -362,7 +363,7 @@ public QueryResult query(final Query query, } @Override - public void flush() { + public void commit(final Map changelogOffsets) { // do-nothing since it is in-memory } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueChangeBuffer.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueChangeBuffer.java index b1471591cb651..eff87a86fee0e 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueChangeBuffer.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueChangeBuffer.java @@ -17,6 +17,7 @@ package org.apache.kafka.streams.state.internals; import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.header.Header; import org.apache.kafka.common.header.internals.RecordHeader; import org.apache.kafka.common.header.internals.RecordHeaders; @@ -244,7 +245,7 @@ public void close() { } @Override - public void flush() { + public void commit(final Map changelogOffsets) { if (loggingEnabled) { // counting on this getting called before the record collector's flush for (final Bytes key : dirtyKeys) { diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java index d3d5ba4a20d44..c883576977d96 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java @@ -17,6 +17,7 @@ package org.apache.kafka.streams.state.internals; import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.KeyValue; @@ -385,7 +386,7 @@ public QueryResult query(final Query query, } @Override - public void flush() { + public void commit(final Map changelogOffsets) { // do-nothing since it is in-memory } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueStoreWrapper.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueStoreWrapper.java index dce28444d85ec..4b53eb890c858 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueStoreWrapper.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueStoreWrapper.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.streams.state.internals; +import org.apache.kafka.common.TopicPartition; import org.apache.kafka.streams.errors.InvalidStateStoreException; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.StateStoreContext; @@ -30,6 +31,8 @@ import org.apache.kafka.streams.state.VersionedKeyValueStore; import org.apache.kafka.streams.state.VersionedRecord; +import java.util.Map; + /** * A wrapper class for non-windowed key-value stores used within the DSL. All such stores are * instances of either {@link TimestampedKeyValueStore} or {@link VersionedKeyValueStore}. @@ -127,8 +130,8 @@ public void init(final StateStoreContext stateStoreContext, final StateStore roo } @Override - public void flush() { - store.flush(); + public void commit(final Map changelogOffsets) { + store.commit(changelogOffsets); } @Override diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueToTimestampedKeyValueByteStoreAdapter.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueToTimestampedKeyValueByteStoreAdapter.java index 8e79a86bc2b8a..f62c703d91d5a 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueToTimestampedKeyValueByteStoreAdapter.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueToTimestampedKeyValueByteStoreAdapter.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.streams.state.internals; +import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.KeyValue; @@ -36,6 +37,7 @@ import org.apache.kafka.streams.state.KeyValueStore; import java.util.List; +import java.util.Map; import static org.apache.kafka.streams.state.TimestampedBytesStore.convertToTimestampedFormat; import static org.apache.kafka.streams.state.internals.ValueAndTimestampDeserializer.rawValue; @@ -100,8 +102,8 @@ public void init(final StateStoreContext stateStoreContext, final StateStore roo } @Override - public void flush() { - store.flush(); + public void commit(final Map changelogOffsets) { + store.commit(changelogOffsets); } @Override diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegment.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegment.java index 18b371048c3f1..4d39b94561b8b 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegment.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegment.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.streams.state.internals; +import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.serialization.BytesSerializer; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.KeyValue; @@ -36,6 +37,7 @@ import java.util.Collections; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Objects; import java.util.Optional; import java.util.Set; @@ -142,8 +144,8 @@ public void init(final StateStoreContext stateStoreContext, final StateStore roo } @Override - public void flush() { - throw new UnsupportedOperationException("nothing to flush for logical segment"); + public void commit(final Map changelogOffsets) { + throw new UnsupportedOperationException("nothing to commit for logical segment"); } @Override diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegments.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegments.java index c46a2c2788c4b..fbca4f5cd4e2d 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegments.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegments.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.streams.state.internals; +import org.apache.kafka.common.TopicPartition; import org.apache.kafka.streams.processor.StateStoreContext; import org.apache.kafka.streams.processor.internals.ProcessorContextUtils; import org.apache.kafka.streams.query.Position; @@ -32,7 +33,7 @@ * {@link #segmentForTimestamp(long)}, {@link #getOrCreateSegment(long, StateStoreContext)}, * {@link #getOrCreateSegmentIfLive(long, StateStoreContext, long)}, * {@link #segments(long, long, boolean)}, and {@link #allSegments(boolean)} - * only return regular segments and not reserved segments. The methods {@link #flush()} + * only return regular segments and not reserved segments. The methods {@link #commit(Map)} * and {@link #close()} flush and close both regular and reserved segments, due to * the fact that both types of segments share the same physical RocksDB instance. * To create a reserved segment, use {@link #createReservedSegment(long, String)} instead. @@ -114,8 +115,8 @@ public void cleanupExpiredSegments(final long streamTime) { } @Override - public void flush() { - physicalStore.flush(); + public void commit(final Map changelogOffsets) { + physicalStore.commit(changelogOffsets); } @Override diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java index dc77be5da01dd..9729e63e60a98 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java @@ -17,6 +17,7 @@ package org.apache.kafka.streams.state.internals; import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.KeyValue; @@ -219,7 +220,7 @@ public long approximateNumEntries() { } @Override - public void flush() { + public void commit(final Map changelogOffsets) { // do-nothing since it is in-memory } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java index a89bf0c62c1b0..f41e370348850 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.streams.state.internals; +import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serializer; @@ -383,8 +384,8 @@ public KeyValueIterator reverseAll() { } @Override - public void flush() { - maybeMeasureLatency(super::flush, time, flushSensor); + public void commit(final Map changelogOffsets) { + maybeMeasureLatency(() -> super.commit(changelogOffsets), time, flushSensor); } @Override diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java index 7794a6ebc510a..484d3586ce5e8 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.streams.state.internals; +import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.utils.Bytes; @@ -398,8 +399,8 @@ public KeyValueIterator, V> backwardFindSessions(final K keyFrom, } @Override - public void flush() { - maybeMeasureLatency(super::flush, time, flushSensor); + public void commit(final Map changelogOffsets) { + maybeMeasureLatency(() -> super.commit(changelogOffsets), time, flushSensor); } @Override diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStore.java index 6afb4d1531d44..34f137e3b3fb4 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStore.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.streams.state.internals; +import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.common.utils.Time; @@ -346,8 +347,8 @@ public void init(final StateStoreContext stateStoreContext, final StateStore roo } @Override - public void flush() { - internal.flush(); + public void commit(final Map changelogOffsets) { + internal.commit(changelogOffsets); } @Override diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java index 1ba37da6dab0c..8fc5781d184cb 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.streams.state.internals; +import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.utils.Bytes; @@ -355,8 +356,8 @@ public KeyValueIterator, V> backwardAll() { } @Override - public void flush() { - maybeMeasureLatency(super::flush, time, flushSensor); + public void commit(final Map changelogOffsets) { + maybeMeasureLatency(() -> super.commit(changelogOffsets), time, flushSensor); } @Override diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java index ede618237cf35..d4705ad62d125 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java @@ -17,6 +17,7 @@ package org.apache.kafka.streams.state.internals; import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.metrics.Sensor.RecordingLevel; import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.common.utils.Bytes; @@ -633,14 +634,14 @@ private boolean isOverflowing(final long value) { } @Override - public synchronized void flush() { + public synchronized void commit(final Map changelogOffsets) { if (db == null) { return; } try { - cfAccessor.flush(dbAccessor); + cfAccessor.commit(dbAccessor, changelogOffsets); } catch (final RocksDBException e) { - throw new ProcessorStateException("Error while executing flush from store " + name, e); + throw new ProcessorStateException("Error while executing commit from store " + name, e); } } @@ -835,7 +836,7 @@ ManagedKeyValueIterator range(final DBAccessor accessor, long approximateNumEntries(final DBAccessor accessor) throws RocksDBException; - void flush(final DBAccessor accessor) throws RocksDBException; + void commit(final DBAccessor accessor, final Map changelogOffsets) throws RocksDBException; void addToBatch(final byte[] key, final byte[] value, @@ -951,7 +952,8 @@ public long approximateNumEntries(final DBAccessor accessor) throws RocksDBExcep } @Override - public void flush(final DBAccessor accessor) throws RocksDBException { + public void commit(final DBAccessor accessor, + final Map changelogOffsets) throws RocksDBException { accessor.flush(columnFamily); } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBuffer.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBuffer.java index 26065bf0fe318..a2203e362da40 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBuffer.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBuffer.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.streams.state.internals; +import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.serialization.ByteArraySerializer; import org.apache.kafka.common.serialization.BytesSerializer; import org.apache.kafka.common.serialization.Serde; @@ -195,8 +196,8 @@ public void init(final StateStoreContext stateStoreContext, final StateStore roo } @Override - public void flush() { - store.flush(); + public void commit(final Map changelogOffsets) { + store.commit(changelogOffsets); } @Override diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedWindowStore.java index ba39d2548e8d3..41708ed7b4231 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedWindowStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedWindowStore.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.streams.state.internals; +import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.kstream.Windowed; import org.apache.kafka.streams.processor.StateStore; @@ -30,6 +31,7 @@ import org.apache.kafka.streams.state.WindowStoreIterator; import org.apache.kafka.streams.state.internals.PrefixedWindowKeySchemas.TimeFirstWindowKeySchema; +import java.util.Map; import java.util.Objects; @@ -61,8 +63,8 @@ public void init(final StateStoreContext stateStoreContext, final StateStore roo } @Override - public void flush() { - wrapped().flush(); + public void commit(final Map changelogOffsets) { + wrapped().commit(changelogOffsets); } @Override diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStore.java index 7c44ca7c179cf..36262cb8e0dcf 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStore.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.streams.state.internals; +import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.utils.AbstractIterator; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.KeyValue; @@ -39,6 +40,7 @@ import java.nio.charset.StandardCharsets; import java.util.Comparator; import java.util.List; +import java.util.Map; import java.util.NoSuchElementException; import java.util.Objects; import java.util.Optional; @@ -253,7 +255,8 @@ public long approximateNumEntries(final DBAccessor accessor) throws RocksDBExcep } @Override - public void flush(final DBAccessor accessor) throws RocksDBException { + public void commit(final DBAccessor accessor, + final Map changelogOffsets) throws RocksDBException { accessor.flush(oldColumnFamily, newColumnFamily); } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStore.java index 54580a26a1bde..ceb2c8cb8a86f 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStore.java @@ -17,6 +17,7 @@ package org.apache.kafka.streams.state.internals; import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.KeyValue; @@ -55,6 +56,7 @@ import java.util.Collection; import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.Objects; import static org.apache.kafka.streams.StreamsConfig.InternalConfig.IQ_CONSISTENCY_OFFSET_VECTOR_ENABLED; @@ -303,9 +305,9 @@ public String name() { } @Override - public void flush() { - segmentStores.flush(); - // flushing segments store includes flushing latest value store, since they share the + public void commit(final Map changelogOffsets) { + segmentStores.commit(changelogOffsets); + // committing segments store includes committing latest value store, since they share the // same physical RocksDB instance } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/Segments.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/Segments.java index 18086a5441b65..e9c7d144d84e8 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/Segments.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/Segments.java @@ -16,9 +16,11 @@ */ package org.apache.kafka.streams.state.internals; +import org.apache.kafka.common.TopicPartition; import org.apache.kafka.streams.processor.StateStoreContext; import java.util.List; +import java.util.Map; interface Segments { @@ -38,7 +40,7 @@ interface Segments { List allSegments(final boolean forward); - void flush(); + void commit(final Map changelogOffsets); void close(); } \ No newline at end of file diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimeOrderedCachingWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimeOrderedCachingWindowStore.java index 646cbf2ca3557..b347f383ec73a 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimeOrderedCachingWindowStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimeOrderedCachingWindowStore.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.streams.state.internals; +import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.header.internals.RecordHeaders; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.utils.Bytes; @@ -47,6 +48,7 @@ import java.util.HashSet; import java.util.LinkedList; import java.util.List; +import java.util.Map; import java.util.NoSuchElementException; import java.util.Set; import java.util.concurrent.atomic.AtomicLong; @@ -504,9 +506,9 @@ public KeyValueIterator, byte[]> backwardAll() { } @Override - public synchronized void flush() { + public synchronized void commit(final Map changelogOffsets) { internalContext.cache().flush(cacheName); - wrapped().flush(); + wrapped().commit(changelogOffsets); } @Override diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/VersionedKeyValueToBytesStoreAdapter.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/VersionedKeyValueToBytesStoreAdapter.java index 5daa6ed1815dd..1d16b2a86b783 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/VersionedKeyValueToBytesStoreAdapter.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/VersionedKeyValueToBytesStoreAdapter.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.streams.state.internals; +import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serdes.ByteArraySerde; import org.apache.kafka.common.serialization.Serializer; @@ -38,6 +39,7 @@ import org.apache.kafka.streams.state.VersionedRecord; import java.util.List; +import java.util.Map; import java.util.Objects; /** @@ -93,8 +95,8 @@ public void init(final StateStoreContext stateStoreContext, final StateStore roo } @Override - public void flush() { - inner.flush(); + public void commit(final Map changelogOffsets) { + inner.commit(changelogOffsets); } @Override diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowToTimestampedWindowByteStoreAdapter.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowToTimestampedWindowByteStoreAdapter.java index eec2e2ff1d8de..90cb126d5de13 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowToTimestampedWindowByteStoreAdapter.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowToTimestampedWindowByteStoreAdapter.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.streams.state.internals; +import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.kstream.Windowed; import org.apache.kafka.streams.processor.StateStore; @@ -30,6 +31,7 @@ import org.apache.kafka.streams.state.WindowStoreIterator; import java.time.Instant; +import java.util.Map; import static org.apache.kafka.streams.state.TimestampedBytesStore.convertToTimestampedFormat; import static org.apache.kafka.streams.state.internals.ValueAndTimestampDeserializer.rawValue; @@ -161,8 +163,8 @@ public void init(final StateStoreContext stateStoreContext, final StateStore roo } @Override - public void flush() { - store.flush(); + public void commit(final Map changelogOffsets) { + store.commit(changelogOffsets); } @Override diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappedStateStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappedStateStore.java index adbb7568c87c5..0f0f7ee62ee5b 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappedStateStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappedStateStore.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.streams.state.internals; +import org.apache.kafka.common.TopicPartition; import org.apache.kafka.streams.errors.InvalidStateStoreException; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.StateStoreContext; @@ -27,6 +28,8 @@ import org.apache.kafka.streams.state.TimestampedBytesStore; import org.apache.kafka.streams.state.VersionedBytesStore; +import java.util.Map; + /** * A storage engine wrapper for utilities like logging, caching, and metering. */ @@ -110,8 +113,8 @@ void validateStoreOpen() { } @Override - public void flush() { - wrapped.flush(); + public void commit(final Map changelogOffsets) { + wrapped.commit(changelogOffsets); } @Override diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java index f6658997b7bf9..d6b94c7e7f92c 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java @@ -57,6 +57,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import java.util.Map; import java.util.Properties; import java.util.stream.Collectors; @@ -244,7 +245,7 @@ public void shouldHaveMultipleSessionsForSameIdWhenTimestampApartBySessionGap(fi processor.process(new Record<>(sessionId, "third", now)); processor.process(new Record<>(sessionId, "third", now)); - sessionStore.flush(); + sessionStore.commit(Map.of()); if (emitFinal) { assertEquals( @@ -318,7 +319,7 @@ public void shouldHandleMultipleSessionsAndMerging(final EmitStrategy.StrategyTy processor.process(new Record<>("a", "3", GAP_MS + 1 + GAP_MS / 2)); processor.process(new Record<>("c", "3", GAP_MS + 1 + GAP_MS / 2)); - sessionStore.flush(); + sessionStore.commit(Map.of()); if (emitFinal) { assertEquals(Arrays.asList( diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java index 74705840de983..9d163b4df719d 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java @@ -413,18 +413,18 @@ public void shouldFlushStateStores() { stateManager.registerStore(store2, stateRestoreCallback, null); stateManager.flush(); - assertTrue(store1.flushed); - assertTrue(store2.flushed); + assertTrue(store1.committed); + assertTrue(store2.committed); } @Test - public void shouldThrowProcessorStateStoreExceptionIfStoreFlushFailed() { + public void shouldThrowProcessorStateStoreExceptionIfStoreCommitFailed() { stateManager.initialize(); // register the stores initializeConsumer(1, 0, t1); stateManager.registerStore(new NoOpReadOnlyStore<>(store1.name()) { @Override - public void flush() { + public void commit(final Map changelogOffsets) { throw new RuntimeException("KABOOM!"); } }, stateRestoreCallback, null); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorContextImplTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorContextImplTest.java index 9410ca5a97896..960ca160ef7d8 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorContextImplTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorContextImplTest.java @@ -103,7 +103,6 @@ public class ProcessorContextImplTest { private static final String REGISTERED_STORE_NAME = "registered-store"; private static final TopicPartition CHANGELOG_PARTITION = new TopicPartition("store-changelog", 1); - private boolean flushExecuted = false; private boolean putExecuted = false; private boolean putWithTimestampExecuted; private boolean putIfAbsentExecuted = false; @@ -300,7 +299,6 @@ public void localKeyValueStoreShouldNotAllowInitOrClose() { final KeyValueStore keyValueStoreMock = mock(KeyValueStore.class); when(stateManager.store("LocalKeyValueStore")).thenAnswer(answer -> keyValueStoreMock(keyValueStoreMock)); - mockStateStoreFlush(keyValueStoreMock); mockKeyValueStoreOperation(keyValueStoreMock); context = buildProcessorContextImpl(streamsConfig, stateManager); @@ -314,7 +312,6 @@ public void localKeyValueStoreShouldNotAllowInitOrClose() { verifyStoreCannotBeInitializedOrClosed(store); store.flush(); - assertTrue(flushExecuted); store.put("1", 1L); assertTrue(putExecuted); @@ -344,7 +341,6 @@ public void localTimestampedKeyValueStoreShouldNotAllowInitOrClose() { when(stateManager.store("LocalTimestampedKeyValueStore")) .thenAnswer(answer -> timestampedKeyValueStoreMock(timestampedKeyValueStoreMock)); mockTimestampedKeyValueOperation(timestampedKeyValueStoreMock); - mockStateStoreFlush(timestampedKeyValueStoreMock); context = buildProcessorContextImpl(streamsConfig, stateManager); @@ -357,7 +353,6 @@ public void localTimestampedKeyValueStoreShouldNotAllowInitOrClose() { verifyStoreCannotBeInitializedOrClosed(store); store.flush(); - assertTrue(flushExecuted); store.put("1", ValueAndTimestamp.make(1L, 2L)); assertTrue(putExecuted); @@ -387,7 +382,6 @@ public void localWindowStoreShouldNotAllowInitOrClose() { final WindowStore windowStore = mock(WindowStore.class); when(stateManager.store("LocalWindowStore")).thenAnswer(answer -> windowStoreMock(windowStore)); - mockStateStoreFlush(windowStore); doAnswer(answer -> { putExecuted = true; @@ -405,7 +399,6 @@ public void localWindowStoreShouldNotAllowInitOrClose() { verifyStoreCannotBeInitializedOrClosed(store); store.flush(); - assertTrue(flushExecuted); store.put("1", 1L, 1L); assertTrue(putExecuted); @@ -427,7 +420,6 @@ public void localTimestampedWindowStoreShouldNotAllowInitOrClose() { final TimestampedWindowStore windowStore = mock(TimestampedWindowStore.class); when(stateManager.store("LocalTimestampedWindowStore")).thenAnswer(answer -> timestampedWindowStoreMock(windowStore)); - mockStateStoreFlush(windowStore); doAnswer(answer -> { putExecuted = true; @@ -448,7 +440,6 @@ public void localTimestampedWindowStoreShouldNotAllowInitOrClose() { verifyStoreCannotBeInitializedOrClosed(store); store.flush(); - assertTrue(flushExecuted); store.put("1", ValueAndTimestamp.make(1L, 1L), 1L); assertTrue(putExecuted); @@ -473,7 +464,6 @@ public void localSessionStoreShouldNotAllowInitOrClose() { final SessionStore sessionStore = mock(SessionStore.class); when(stateManager.store("LocalSessionStore")).thenAnswer(answer -> sessionStoreMock(sessionStore)); - mockStateStoreFlush(sessionStore); doAnswer(answer -> { putExecuted = true; @@ -496,7 +486,6 @@ public void localSessionStoreShouldNotAllowInitOrClose() { verifyStoreCannotBeInitializedOrClosed(store); store.flush(); - assertTrue(flushExecuted); store.remove(null); assertTrue(removeExecuted); @@ -907,13 +896,6 @@ private void initStateStoreMock(final StateStore stateStore) { when(stateStore.isOpen()).thenReturn(true); } - private void mockStateStoreFlush(final StateStore stateStore) { - doAnswer(answer -> { - flushExecuted = true; - return null; - }).when(stateStore).flush(); - } - @SuppressWarnings("rawtypes") private void doTest(final String name, final Consumer checker) { final Processor processor = new Processor<>() { diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java index 8c28ae6a33dcb..93bd701a29e7e 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java @@ -568,11 +568,11 @@ public void shouldFlushCheckpointAndClose() throws IOException { } finally { stateMgr.flush(); - assertTrue(persistentStore.flushed); - assertTrue(nonPersistentStore.flushed); + assertTrue(persistentStore.committed); + assertTrue(nonPersistentStore.committed); // make sure that flush is called in the proper order - assertThat(persistentStore.getLastFlushCount(), Matchers.lessThan(nonPersistentStore.getLastFlushCount())); + assertThat(persistentStore.getLastCommitCount(), Matchers.lessThan(nonPersistentStore.getLastCommitCount())); stateMgr.updateChangelogOffsets(ackedOffsets); stateMgr.checkpoint(); @@ -710,12 +710,12 @@ public void shouldThrowIllegalArgumentExceptionOnRegisterWhenStoreHasAlreadyBeen } @Test - public void shouldThrowProcessorStateExceptionOnFlushIfStoreThrowsAnException() { + public void shouldThrowProcessorStateExceptionOnCommitIfStoreThrowsAnException() { final RuntimeException exception = new RuntimeException("KABOOM!"); final ProcessorStateManager stateManager = getStateManager(Task.TaskType.ACTIVE); final MockKeyValueStore stateStore = new MockKeyValueStore(persistentStoreName, true) { @Override - public void flush() { + public void commit(final Map changelogOffsets) { throw exception; } }; @@ -726,12 +726,12 @@ public void flush() { } @Test - public void shouldPreserveStreamsExceptionOnFlushIfStoreThrows() { + public void shouldPreserveStreamsExceptionOnCommitIfStoreThrows() { final StreamsException exception = new StreamsException("KABOOM!"); final ProcessorStateManager stateManager = getStateManager(Task.TaskType.ACTIVE); final MockKeyValueStore stateStore = new MockKeyValueStore(persistentStoreName, true) { @Override - public void flush() { + public void commit(final Map changelogOffsets) { throw exception; } }; @@ -774,12 +774,12 @@ public void close() { } @Test - public void shouldThrowProcessorStateExceptionOnFlushIfStoreThrowsAFailedProcessingException() { + public void shouldThrowProcessorStateExceptionOnCommitIfStoreThrowsAFailedProcessingException() { final RuntimeException exception = new RuntimeException("KABOOM!"); final ProcessorStateManager stateManager = getStateManager(Task.TaskType.ACTIVE); final MockKeyValueStore stateStore = new MockKeyValueStore(persistentStoreName, true) { @Override - public void flush() { + public void commit(final Map changelogOffsets) { throw new FailedProcessingException("processor", exception); } }; @@ -909,19 +909,19 @@ public void shouldThrowIfRestoreCallbackThrows() { } @Test - public void shouldFlushGoodStoresEvenSomeThrowsException() { - final AtomicBoolean flushedStore = new AtomicBoolean(false); + public void shouldCommitGoodStoresEvenSomeThrowsException() { + final AtomicBoolean committedStore = new AtomicBoolean(false); final MockKeyValueStore stateStore1 = new MockKeyValueStore(persistentStoreName, true) { @Override - public void flush() { + public void commit(final Map changelogOffsets) { throw new RuntimeException("KABOOM!"); } }; final MockKeyValueStore stateStore2 = new MockKeyValueStore(persistentStoreTwoName, true) { @Override - public void flush() { - flushedStore.set(true); + public void commit(final Map changelogOffsets) { + committedStore.set(true); } }; final ProcessorStateManager stateManager = getStateManager(Task.TaskType.ACTIVE); @@ -933,7 +933,7 @@ public void flush() { stateManager.flush(); } catch (final ProcessorStateException expected) { /* ignore */ } - assertTrue(flushedStore.get()); + assertTrue(committedStore.get()); } @Test diff --git a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java index 78c6dedcbf45c..51cbe8f940da2 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java @@ -64,7 +64,7 @@ /** * A component that provides a {@link #context() StateStoreContext} that can be supplied to a {@link KeyValueStore} so that - * all entries written to the Kafka topic by the store during {@link KeyValueStore#flush()} are captured for testing purposes. + * all entries written to the Kafka topic by the store during {@link KeyValueStore#commit(Map)} are captured for testing purposes. * This class simplifies testing of various {@link KeyValueStore} instances, especially those that use * {@link MeteredKeyValueStore} to monitor and write its entries to the Kafka topic. * @@ -93,29 +93,29 @@ * assertNull(store.get(3)); * store.delete(5); * - * // Flush the store and verify all current entries were properly flushed ... - * store.flush(); - * assertEquals("zero", driver.flushedEntryStored(0)); - * assertEquals("one", driver.flushedEntryStored(1)); - * assertEquals("two", driver.flushedEntryStored(2)); - * assertEquals("four", driver.flushedEntryStored(4)); - * assertNull(driver.flushedEntryStored(5)); + * // Commit the store and verify all current entries were properly committed ... + * store.commit(Map.of()); + * assertEquals("zero", driver.committedEntryStored(0)); + * assertEquals("one", driver.committedEntryStored(1)); + * assertEquals("two", driver.committedEntryStored(2)); + * assertEquals("four", driver.committedEntryStored(4)); + * assertNull(driver.committedEntryStored(5)); * - * assertEquals(false, driver.flushedEntryRemoved(0)); - * assertEquals(false, driver.flushedEntryRemoved(1)); - * assertEquals(false, driver.flushedEntryRemoved(2)); - * assertEquals(false, driver.flushedEntryRemoved(4)); - * assertEquals(true, driver.flushedEntryRemoved(5)); + * assertEquals(false, driver.committedEntryRemoved(0)); + * assertEquals(false, driver.committedEntryRemoved(1)); + * assertEquals(false, driver.committedEntryRemoved(2)); + * assertEquals(false, driver.committedEntryRemoved(4)); + * assertEquals(true, driver.committedEntryRemoved(5)); * * * *

Restoring a store

* This component can be used to test whether a {@link KeyValueStore} implementation properly * {@link StateStoreContext#register(StateStore, StateRestoreCallback) registers itself} with the {@link StateStoreContext}, so that - * the persisted contents of a store are properly restored from the flushed entries when the store instance is started. + * the persisted contents of a store are properly restored from the committed entries when the store instance is started. *

* To do this, create an instance of this driver component, {@link #addEntryToRestoreLog(Object, Object) add entries} that will be - * passed to the store upon creation (simulating the entries that were previously flushed to the topic), and then create the store + * passed to the store upon creation (simulating the entries that were previously committed to the topic), and then create the store * using this driver's {@link #context() ProcessorContext}: * *

@@ -189,8 +189,8 @@ public static  KeyValueStoreTestDriver create(final Serializer ke
         return new KeyValueStoreTestDriver<>(serdes);
     }
 
-    private final Map flushedEntries = new HashMap<>();
-    private final Set flushedRemovals = new HashSet<>();
+    private final Map committedEntries = new HashMap<>();
+    private final Set committedRemovals = new HashSet<>();
     private final List> restorableEntries = new LinkedList<>();
 
     private final InternalMockProcessorContext context;
@@ -243,7 +243,7 @@ public  void send(final String topic,
                 final K keyTest = serdes.keyFrom(keyBytes);
                 final V valueTest = serdes.valueFrom(valueBytes);
 
-                recordFlushed(keyTest, valueTest);
+                recordCommitted(keyTest, valueTest);
             }
 
             @Override
@@ -286,15 +286,15 @@ public Map appConfigsWithPrefix(final String prefix) {
         };
     }
 
-    private void recordFlushed(final K key, final V value) {
+    private void recordCommitted(final K key, final V value) {
         if (value == null) {
             // This is a removal ...
-            flushedRemovals.add(key);
-            flushedEntries.remove(key);
+            committedRemovals.add(key);
+            committedEntries.remove(key);
         } else {
             // This is a normal add
-            flushedEntries.put(key, value);
-            flushedRemovals.remove(key);
+            committedEntries.put(key, value);
+            committedRemovals.remove(key);
         }
     }
 
@@ -343,8 +343,8 @@ public void addEntryToRestoreLog(final K key, final V value) {
 
     /**
      * Get the context that should be supplied to a {@link KeyValueStore}'s constructor. This context records any messages
-     * written by the store to the Kafka topic, making them available via the {@link #flushedEntryStored(Object)} and
-     * {@link #flushedEntryRemoved(Object)} methods.
+     * written by the store to the Kafka topic, making them available via the {@link #committedEntryStored(Object)} and
+     * {@link #committedEntryRemoved(Object)} methods.
      * 

* If the {@link KeyValueStore}'s are to be restored upon its startup, be sure to {@link #addEntryToRestoreLog(Object, Object) * add the restore entries} before creating the store with the {@link StateStoreContext} returned by this method. @@ -395,47 +395,47 @@ public int sizeOf(final KeyValueStore store) { } /** - * Retrieve the value that the store {@link KeyValueStore#flush() flushed} with the given key. + * Retrieve the value that the store {@link KeyValueStore#commit(Map) committed} with the given key. * * @param key the key - * @return the value that was flushed with the key, or {@code null} if no such key was flushed or if the entry with this - * key was removed upon flush + * @return the value that was committed with the key, or {@code null} if no such key was committed or if the entry with this + * key was removed upon commit */ - public V flushedEntryStored(final K key) { - return flushedEntries.get(key); + public V committedEntryStored(final K key) { + return committedEntries.get(key); } /** - * Determine whether the store {@link KeyValueStore#flush() flushed} the removal of the given key. + * Determine whether the store {@link KeyValueStore#commit(Map) committed} the removal of the given key. * * @param key the key - * @return {@code true} if the entry with the given key was removed when flushed, or {@code false} if the entry was not - * removed when last flushed + * @return {@code true} if the entry with the given key was removed when committed, or {@code false} if the entry was not + * removed when last committed */ - public boolean flushedEntryRemoved(final K key) { - return flushedRemovals.contains(key); + public boolean committedEntryRemoved(final K key) { + return committedRemovals.contains(key); } /** * Return number of removed entry */ - public int numFlushedEntryStored() { - return flushedEntries.size(); + public int numCommittedEntryStored() { + return committedEntries.size(); } /** * Return number of removed entry */ - public int numFlushedEntryRemoved() { - return flushedRemovals.size(); + public int numCommittedEntryRemoved() { + return committedRemovals.size(); } /** - * Remove all {@link #flushedEntryStored(Object) flushed entries}, {@link #flushedEntryRemoved(Object) flushed removals}, + * Remove all {@link #committedEntryStored(Object) committed entries}, {@link #committedEntryRemoved(Object) committed removals}, */ public void clear() { restorableEntries.clear(); - flushedEntries.clear(); - flushedRemovals.clear(); + committedEntries.clear(); + committedRemovals.clear(); } } diff --git a/streams/src/test/java/org/apache/kafka/streams/state/NoOpWindowStore.java b/streams/src/test/java/org/apache/kafka/streams/state/NoOpWindowStore.java index 5de4e65bd5615..c022da839f3f4 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/NoOpWindowStore.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/NoOpWindowStore.java @@ -16,12 +16,14 @@ */ package org.apache.kafka.streams.state; +import org.apache.kafka.common.TopicPartition; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.StateStoreContext; import org.apache.kafka.streams.query.Position; import java.time.Instant; +import java.util.Map; import java.util.NoSuchElementException; public class NoOpWindowStore implements ReadOnlyWindowStore, StateStore { @@ -59,7 +61,7 @@ public String name() { public void init(final StateStoreContext stateStoreContext, final StateStore root) {} @Override - public void flush() { + public void commit(final Map changelogOffsets) { } diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java index 1175add263fec..3727add492d13 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java @@ -160,24 +160,24 @@ public void testPutGetRange() { assertNull(store.get(3)); assertEquals("four", store.get(4)); assertEquals("five", store.get(5)); - // Flush now so that for caching store, we will not skip the deletion following an put - store.flush(); + // Commit now so that for caching store, we will not skip the deletion following an put + store.commit(Map.of()); store.delete(5); assertEquals(4, driver.sizeOf(store)); - // Flush the store and verify all current entries were properly flushed ... - store.flush(); - assertEquals("zero", driver.flushedEntryStored(0)); - assertEquals("one", driver.flushedEntryStored(1)); - assertEquals("two", driver.flushedEntryStored(2)); - assertEquals("four", driver.flushedEntryStored(4)); - assertNull(driver.flushedEntryStored(5)); + // Commit the store and verify all current entries were properly committed ... + store.commit(Map.of()); + assertEquals("zero", driver.committedEntryStored(0)); + assertEquals("one", driver.committedEntryStored(1)); + assertEquals("two", driver.committedEntryStored(2)); + assertEquals("four", driver.committedEntryStored(4)); + assertNull(driver.committedEntryStored(5)); - assertFalse(driver.flushedEntryRemoved(0)); - assertFalse(driver.flushedEntryRemoved(1)); - assertFalse(driver.flushedEntryRemoved(2)); - assertFalse(driver.flushedEntryRemoved(4)); - assertTrue(driver.flushedEntryRemoved(5)); + assertFalse(driver.committedEntryRemoved(0)); + assertFalse(driver.committedEntryRemoved(1)); + assertFalse(driver.committedEntryRemoved(2)); + assertFalse(driver.committedEntryRemoved(4)); + assertTrue(driver.committedEntryRemoved(5)); final HashMap expectedContents = new HashMap<>(); expectedContents.put(2, "two"); @@ -208,24 +208,24 @@ public void testPutGetReverseRange() { assertNull(store.get(3)); assertEquals("four", store.get(4)); assertEquals("five", store.get(5)); - // Flush now so that for caching store, we will not skip the deletion following an put - store.flush(); + // Commit now so that for caching store, we will not skip the deletion following an put + store.commit(Map.of()); store.delete(5); assertEquals(4, driver.sizeOf(store)); - // Flush the store and verify all current entries were properly flushed ... - store.flush(); - assertEquals("zero", driver.flushedEntryStored(0)); - assertEquals("one", driver.flushedEntryStored(1)); - assertEquals("two", driver.flushedEntryStored(2)); - assertEquals("four", driver.flushedEntryStored(4)); - assertNull(driver.flushedEntryStored(5)); + // Commit the store and verify all current entries were properly committed ... + store.commit(Map.of()); + assertEquals("zero", driver.committedEntryStored(0)); + assertEquals("one", driver.committedEntryStored(1)); + assertEquals("two", driver.committedEntryStored(2)); + assertEquals("four", driver.committedEntryStored(4)); + assertNull(driver.committedEntryStored(5)); - assertFalse(driver.flushedEntryRemoved(0)); - assertFalse(driver.flushedEntryRemoved(1)); - assertFalse(driver.flushedEntryRemoved(2)); - assertFalse(driver.flushedEntryRemoved(4)); - assertTrue(driver.flushedEntryRemoved(5)); + assertFalse(driver.committedEntryRemoved(0)); + assertFalse(driver.committedEntryRemoved(1)); + assertFalse(driver.committedEntryRemoved(2)); + assertFalse(driver.committedEntryRemoved(4)); + assertTrue(driver.committedEntryRemoved(5)); final HashMap expectedContents = new HashMap<>(); expectedContents.put(2, "two"); @@ -256,22 +256,22 @@ public void testPutGetWithDefaultSerdes() { assertNull(store.get(3)); assertEquals("four", store.get(4)); assertEquals("five", store.get(5)); - store.flush(); + store.commit(Map.of()); store.delete(5); - // Flush the store and verify all current entries were properly flushed ... - store.flush(); - assertEquals("zero", driver.flushedEntryStored(0)); - assertEquals("one", driver.flushedEntryStored(1)); - assertEquals("two", driver.flushedEntryStored(2)); - assertEquals("four", driver.flushedEntryStored(4)); - assertNull(driver.flushedEntryStored(5)); + // Commit the store and verify all current entries were properly committed ... + store.commit(Map.of()); + assertEquals("zero", driver.committedEntryStored(0)); + assertEquals("one", driver.committedEntryStored(1)); + assertEquals("two", driver.committedEntryStored(2)); + assertEquals("four", driver.committedEntryStored(4)); + assertNull(driver.committedEntryStored(5)); - assertFalse(driver.flushedEntryRemoved(0)); - assertFalse(driver.flushedEntryRemoved(1)); - assertFalse(driver.flushedEntryRemoved(2)); - assertFalse(driver.flushedEntryRemoved(4)); - assertTrue(driver.flushedEntryRemoved(5)); + assertFalse(driver.committedEntryRemoved(0)); + assertFalse(driver.committedEntryRemoved(1)); + assertFalse(driver.committedEntryRemoved(2)); + assertFalse(driver.committedEntryRemoved(4)); + assertTrue(driver.committedEntryRemoved(5)); } @Test @@ -332,17 +332,17 @@ public void testPutIfAbsent() { assertNull(store.get(3)); assertEquals("four", store.get(4)); - // Flush the store and verify all current entries were properly flushed ... - store.flush(); - assertEquals("zero", driver.flushedEntryStored(0)); - assertEquals("one", driver.flushedEntryStored(1)); - assertEquals("two", driver.flushedEntryStored(2)); - assertEquals("four", driver.flushedEntryStored(4)); + // Commit the store and verify all current entries were properly committed ... + store.commit(Map.of()); + assertEquals("zero", driver.committedEntryStored(0)); + assertEquals("one", driver.committedEntryStored(1)); + assertEquals("two", driver.committedEntryStored(2)); + assertEquals("four", driver.committedEntryStored(4)); - assertFalse(driver.flushedEntryRemoved(0)); - assertFalse(driver.flushedEntryRemoved(1)); - assertFalse(driver.flushedEntryRemoved(2)); - assertFalse(driver.flushedEntryRemoved(4)); + assertFalse(driver.committedEntryRemoved(0)); + assertFalse(driver.committedEntryRemoved(1)); + assertFalse(driver.committedEntryRemoved(2)); + assertFalse(driver.committedEntryRemoved(4)); } @Test @@ -486,7 +486,7 @@ public void testSize() { store.put(2, "two"); store.put(4, "four"); store.put(5, "five"); - store.flush(); + store.commit(Map.of()); assertEquals(5, store.approximateNumEntries()); } diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBWindowStoreTest.java index 66a896598f131..96995d047bac8 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBWindowStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBWindowStoreTest.java @@ -34,6 +34,7 @@ import java.io.File; import java.util.ArrayList; import java.util.List; +import java.util.Map; import java.util.Set; import static java.time.Duration.ofMillis; @@ -439,7 +440,7 @@ public void testRolling() { ofEpochMilli(startTime + increment * 8 + WINDOW_SIZE)))); // check segment directories - windowStore.flush(); + windowStore.commit(Map.of()); assertEquals( Set.of( segments.segmentName(4), @@ -596,7 +597,7 @@ public void testRestore() throws Exception { windowStore.put(6, "six", startTime + increment * 6); windowStore.put(7, "seven", startTime + increment * 7); windowStore.put(8, "eight", startTime + increment * 8); - windowStore.flush(); + windowStore.commit(Map.of()); windowStore.close(); @@ -750,7 +751,7 @@ public void testRestore() throws Exception { ofEpochMilli(startTime + increment * 8 + WINDOW_SIZE)))); // check segment directories - windowStore.flush(); + windowStore.commit(Map.of()); assertEquals( Set.of( segments.segmentName(4L), diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractWindowBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractWindowBytesStoreTest.java index 1486cca8c7e0d..32c03005e1cb2 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractWindowBytesStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractWindowBytesStoreTest.java @@ -235,8 +235,8 @@ public void testRangeAndSinglePointFetch() { ofEpochMilli(defaultStartTime + 12L - WINDOW_SIZE), ofEpochMilli(defaultStartTime + 12L + WINDOW_SIZE)))); - // Flush the store and verify all current entries were properly flushed ... - windowStore.flush(); + // Commit the store and verify all current entries were properly committed ... + windowStore.commit(Map.of()); final List> changeLog = new ArrayList<>(); for (final ProducerRecord record : recordCollector.collected()) { @@ -627,8 +627,8 @@ public void testPutAndFetchBefore() { Set.of(), valuesToSetAndCloseIterator(windowStore.fetch(2, ofEpochMilli(defaultStartTime + 13L - WINDOW_SIZE), ofEpochMilli(defaultStartTime + 13L)))); - // Flush the store and verify all current entries were properly flushed ... - windowStore.flush(); + // Commit the store and verify all current entries were properly committed ... + windowStore.commit(Map.of()); final List> changeLog = new ArrayList<>(); for (final ProducerRecord record : recordCollector.collected()) { @@ -737,8 +737,8 @@ public void testPutAndFetchAfter() { valuesToSetAndCloseIterator(windowStore.fetch(2, ofEpochMilli(defaultStartTime + 12L), ofEpochMilli(defaultStartTime + 12L + WINDOW_SIZE)))); - // Flush the store and verify all current entries were properly flushed ... - windowStore.flush(); + // Commit the store and verify all current entries were properly committed ... + windowStore.commit(Map.of()); final List> changeLog = new ArrayList<>(); for (final ProducerRecord record : recordCollector.collected()) { @@ -806,8 +806,8 @@ public void testPutSameKeyTimestamp() { ofEpochMilli(defaultStartTime + 4L - WINDOW_SIZE), ofEpochMilli(defaultStartTime + 4L + WINDOW_SIZE)))); - // Flush the store and verify all current entries were properly flushed ... - windowStore.flush(); + // Commit the store and verify all current entries were properly committed ... + windowStore.commit(Map.of()); final List> changeLog = new ArrayList<>(); for (final ProducerRecord record : recordCollector.collected()) { diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingInMemoryKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingInMemoryKeyValueStoreTest.java index e2e37ad1e4ade..4a0579b8ef03d 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingInMemoryKeyValueStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingInMemoryKeyValueStoreTest.java @@ -48,6 +48,7 @@ import java.util.Arrays; import java.util.Collections; import java.util.List; +import java.util.Map; import static org.apache.kafka.common.utils.Utils.mkEntry; import static org.apache.kafka.common.utils.Utils.mkMap; @@ -124,7 +125,7 @@ public void shouldSetFlushListener() { @Test public void shouldAvoidFlushingDeletionsWithoutDirtyKeys() { final int added = addItemsToCache(); - // all dirty entries should have been flushed + // all dirty entries should have been committed assertEquals(added, underlyingStore.approximateNumEntries()); assertEquals(added, cacheFlushListener.forwarded.size()); @@ -133,7 +134,7 @@ public void shouldAvoidFlushingDeletionsWithoutDirtyKeys() { assertEquals(added, cacheFlushListener.forwarded.size()); store.put(bytesKey("key"), null); - store.flush(); + store.commit(Map.of()); assertEquals(added, underlyingStore.approximateNumEntries()); assertEquals(added, cacheFlushListener.forwarded.size()); } @@ -225,7 +226,7 @@ private void shouldMatchPositionAfterPut() { ); assertEquals(Position.emptyPosition(), underlyingStore.getPosition()); - store.flush(); + store.commit(Map.of()); assertEquals( Position.fromMap(mkMap(mkEntry("", mkMap(mkEntry(0, 2L))))), @@ -263,7 +264,7 @@ public void shouldForwardDirtyItemToListenerWhenEvicted() { @Test public void shouldForwardDirtyItemsWhenFlushCalled() { store.put(bytesKey("1"), bytesValue("a")); - store.flush(); + store.commit(Map.of()); assertEquals("a", cacheFlushListener.forwarded.get("1").newValue); assertNull(cacheFlushListener.forwarded.get("1").oldValue); } @@ -272,23 +273,23 @@ public void shouldForwardDirtyItemsWhenFlushCalled() { public void shouldForwardOldValuesWhenEnabled() { store.setFlushListener(cacheFlushListener, true); store.put(bytesKey("1"), bytesValue("a")); - store.flush(); + store.commit(Map.of()); assertEquals("a", cacheFlushListener.forwarded.get("1").newValue); assertNull(cacheFlushListener.forwarded.get("1").oldValue); store.put(bytesKey("1"), bytesValue("b")); store.put(bytesKey("1"), bytesValue("c")); - store.flush(); + store.commit(Map.of()); assertEquals("c", cacheFlushListener.forwarded.get("1").newValue); assertEquals("a", cacheFlushListener.forwarded.get("1").oldValue); store.put(bytesKey("1"), null); - store.flush(); + store.commit(Map.of()); assertNull(cacheFlushListener.forwarded.get("1").newValue); assertEquals("c", cacheFlushListener.forwarded.get("1").oldValue); cacheFlushListener.forwarded.clear(); store.put(bytesKey("1"), bytesValue("a")); store.put(bytesKey("1"), bytesValue("b")); store.put(bytesKey("1"), null); - store.flush(); + store.commit(Map.of()); assertNull(cacheFlushListener.forwarded.get("1")); cacheFlushListener.forwarded.clear(); } @@ -296,22 +297,22 @@ public void shouldForwardOldValuesWhenEnabled() { @Test public void shouldNotForwardOldValuesWhenDisabled() { store.put(bytesKey("1"), bytesValue("a")); - store.flush(); + store.commit(Map.of()); assertEquals("a", cacheFlushListener.forwarded.get("1").newValue); assertNull(cacheFlushListener.forwarded.get("1").oldValue); store.put(bytesKey("1"), bytesValue("b")); - store.flush(); + store.commit(Map.of()); assertEquals("b", cacheFlushListener.forwarded.get("1").newValue); assertNull(cacheFlushListener.forwarded.get("1").oldValue); store.put(bytesKey("1"), null); - store.flush(); + store.commit(Map.of()); assertNull(cacheFlushListener.forwarded.get("1").newValue); assertNull(cacheFlushListener.forwarded.get("1").oldValue); cacheFlushListener.forwarded.clear(); store.put(bytesKey("1"), bytesValue("a")); store.put(bytesKey("1"), bytesValue("b")); store.put(bytesKey("1"), null); - store.flush(); + store.commit(Map.of()); assertNull(cacheFlushListener.forwarded.get("1")); cacheFlushListener.forwarded.clear(); } @@ -477,7 +478,7 @@ public void shouldDeleteItemsFromCache() { @Test public void shouldNotShowItemsDeletedFromCacheButFlushedToStoreBeforeDelete() { store.put(bytesKey("a"), bytesValue("a")); - store.flush(); + store.commit(Map.of()); store.delete(bytesKey("a")); assertNull(store.get(bytesKey("a"))); try (final KeyValueIterator iterator = store.range(bytesKey("a"), bytesKey("b"))) { diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingInMemorySessionStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingInMemorySessionStoreTest.java index af4dbf3a446c1..b68374f878631 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingInMemorySessionStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingInMemorySessionStoreTest.java @@ -54,6 +54,7 @@ import java.util.Collections; import java.util.LinkedList; import java.util.List; +import java.util.Map; import java.util.Random; import static java.util.Arrays.asList; @@ -166,7 +167,7 @@ private void shouldMatchPositionAfterPut() { assertEquals(Position.emptyPosition(), cachingStore.getPosition()); assertEquals(Position.emptyPosition(), underlyingStore.getPosition()); - cachingStore.flush(); + cachingStore.commit(Map.of()); assertEquals( Position.fromMap(mkMap(mkEntry("", mkMap(mkEntry(0, 2L))))), @@ -472,7 +473,7 @@ public void shouldFetchCorrectlyAcrossSegments() { cachingStore.put(a1, "1".getBytes()); cachingStore.put(a2, "2".getBytes()); cachingStore.put(a3, "3".getBytes()); - cachingStore.flush(); + cachingStore.commit(Map.of()); cachingStore.put(a4, "4".getBytes()); cachingStore.put(a5, "5".getBytes()); cachingStore.put(a6, "6".getBytes()); @@ -499,7 +500,7 @@ public void shouldBackwardFetchCorrectlyAcrossSegments() { cachingStore.put(a1, "1".getBytes()); cachingStore.put(a2, "2".getBytes()); cachingStore.put(a3, "3".getBytes()); - cachingStore.flush(); + cachingStore.commit(Map.of()); cachingStore.put(a4, "4".getBytes()); cachingStore.put(a5, "5".getBytes()); cachingStore.put(a6, "6".getBytes()); @@ -580,7 +581,7 @@ public void shouldForwardChangedValuesDuringFlush() { cachingStore.setFlushListener(flushListener, true); cachingStore.put(b, "1".getBytes()); - cachingStore.flush(); + cachingStore.commit(Map.of()); assertEquals( Collections.singletonList( @@ -593,7 +594,7 @@ public void shouldForwardChangedValuesDuringFlush() { flushListener.forwarded.clear(); cachingStore.put(a, "1".getBytes()); - cachingStore.flush(); + cachingStore.commit(Map.of()); assertEquals( Collections.singletonList( @@ -606,7 +607,7 @@ public void shouldForwardChangedValuesDuringFlush() { flushListener.forwarded.clear(); cachingStore.put(a, "2".getBytes()); - cachingStore.flush(); + cachingStore.commit(Map.of()); assertEquals( Collections.singletonList( @@ -619,7 +620,7 @@ public void shouldForwardChangedValuesDuringFlush() { flushListener.forwarded.clear(); cachingStore.remove(a); - cachingStore.flush(); + cachingStore.commit(Map.of()); assertEquals( Collections.singletonList( @@ -634,7 +635,7 @@ public void shouldForwardChangedValuesDuringFlush() { cachingStore.put(a, "1".getBytes()); cachingStore.put(a, "2".getBytes()); cachingStore.remove(a); - cachingStore.flush(); + cachingStore.commit(Map.of()); assertEquals( Collections.emptyList(), @@ -654,13 +655,13 @@ public void shouldNotForwardChangedValuesDuringFlushWhenSendOldValuesDisabled() cachingStore.setFlushListener(flushListener, false); cachingStore.put(a, "1".getBytes()); - cachingStore.flush(); + cachingStore.commit(Map.of()); cachingStore.put(a, "2".getBytes()); - cachingStore.flush(); + cachingStore.commit(Map.of()); cachingStore.remove(a); - cachingStore.flush(); + cachingStore.commit(Map.of()); assertEquals( asList(new KeyValueTimestamp<>( @@ -682,7 +683,7 @@ public void shouldNotForwardChangedValuesDuringFlushWhenSendOldValuesDisabled() cachingStore.put(a, "1".getBytes()); cachingStore.put(a, "2".getBytes()); cachingStore.remove(a); - cachingStore.flush(); + cachingStore.commit(Map.of()); assertEquals( Collections.emptyList(), diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingPersistentSessionStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingPersistentSessionStoreTest.java index 159503b920a7e..cbbd7ca383e6a 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingPersistentSessionStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingPersistentSessionStoreTest.java @@ -54,6 +54,7 @@ import java.util.Collections; import java.util.LinkedList; import java.util.List; +import java.util.Map; import java.util.Random; import static java.util.Arrays.asList; @@ -165,7 +166,7 @@ private void shouldMatchPositionAfterPut() { assertEquals(Position.emptyPosition(), cachingStore.getPosition()); assertEquals(Position.emptyPosition(), underlyingStore.getPosition()); - cachingStore.flush(); + cachingStore.commit(Map.of()); assertEquals( Position.fromMap(mkMap(mkEntry("", mkMap(mkEntry(0, 2L))))), @@ -484,7 +485,7 @@ public void shouldFetchCorrectlyAcrossSegments() { cachingStore.put(a1, "1".getBytes()); cachingStore.put(a2, "2".getBytes()); cachingStore.put(a3, "3".getBytes()); - cachingStore.flush(); + cachingStore.commit(Map.of()); cachingStore.put(a4, "4".getBytes()); cachingStore.put(a5, "5".getBytes()); cachingStore.put(a6, "6".getBytes()); @@ -511,7 +512,7 @@ public void shouldBackwardFetchCorrectlyAcrossSegments() { cachingStore.put(a1, "1".getBytes()); cachingStore.put(a2, "2".getBytes()); cachingStore.put(a3, "3".getBytes()); - cachingStore.flush(); + cachingStore.commit(Map.of()); cachingStore.put(a4, "4".getBytes()); cachingStore.put(a5, "5".getBytes()); cachingStore.put(a6, "6".getBytes()); @@ -593,7 +594,7 @@ public void shouldForwardChangedValuesDuringFlush() { cachingStore.setFlushListener(flushListener, true); cachingStore.put(b, "1".getBytes()); - cachingStore.flush(); + cachingStore.commit(Map.of()); assertEquals( Collections.singletonList( @@ -608,7 +609,7 @@ public void shouldForwardChangedValuesDuringFlush() { flushListener.forwarded.clear(); cachingStore.put(a, "1".getBytes()); - cachingStore.flush(); + cachingStore.commit(Map.of()); assertEquals( Collections.singletonList( @@ -623,7 +624,7 @@ public void shouldForwardChangedValuesDuringFlush() { flushListener.forwarded.clear(); cachingStore.put(a, "2".getBytes()); - cachingStore.flush(); + cachingStore.commit(Map.of()); assertEquals( Collections.singletonList( @@ -638,7 +639,7 @@ public void shouldForwardChangedValuesDuringFlush() { flushListener.forwarded.clear(); cachingStore.remove(a); - cachingStore.flush(); + cachingStore.commit(Map.of()); assertEquals( Collections.singletonList( @@ -655,7 +656,7 @@ public void shouldForwardChangedValuesDuringFlush() { cachingStore.put(a, "1".getBytes()); cachingStore.put(a, "2".getBytes()); cachingStore.remove(a); - cachingStore.flush(); + cachingStore.commit(Map.of()); assertEquals( Collections.emptyList(), @@ -675,13 +676,13 @@ public void shouldNotForwardChangedValuesDuringFlushWhenSendOldValuesDisabled() cachingStore.setFlushListener(flushListener, false); cachingStore.put(a, "1".getBytes()); - cachingStore.flush(); + cachingStore.commit(Map.of()); cachingStore.put(a, "2".getBytes()); - cachingStore.flush(); + cachingStore.commit(Map.of()); cachingStore.remove(a); - cachingStore.flush(); + cachingStore.commit(Map.of()); assertEquals( asList( @@ -708,7 +709,7 @@ public void shouldNotForwardChangedValuesDuringFlushWhenSendOldValuesDisabled() cachingStore.put(a, "1".getBytes()); cachingStore.put(a, "2".getBytes()); cachingStore.remove(a); - cachingStore.flush(); + cachingStore.commit(Map.of()); assertEquals( Collections.emptyList(), diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingPersistentWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingPersistentWindowStoreTest.java index 4548da5bd1134..29435b12f20b8 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingPersistentWindowStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingPersistentWindowStoreTest.java @@ -64,6 +64,7 @@ import java.time.Instant; import java.util.Arrays; import java.util.List; +import java.util.Map; import java.util.Properties; import java.util.UUID; @@ -286,7 +287,7 @@ private void shouldMatchPositionAfterPut() { assertEquals(Position.emptyPosition(), cachingStore.getPosition()); assertEquals(Position.emptyPosition(), underlyingStore.getPosition()); - cachingStore.flush(); + cachingStore.commit(Map.of()); assertEquals( Position.fromMap(mkMap(mkEntry("", mkMap(mkEntry(0, 2L))))), @@ -630,7 +631,7 @@ public void shouldForwardDirtyItemsWhenFlushCalled() { final Windowed windowedKey = new Windowed<>("1", new TimeWindow(DEFAULT_TIMESTAMP, DEFAULT_TIMESTAMP + WINDOW_SIZE)); cachingStore.put(bytesKey("1"), bytesValue("a"), DEFAULT_TIMESTAMP); - cachingStore.flush(); + cachingStore.commit(Map.of()); assertEquals("a", cacheListener.forwarded.get(windowedKey).newValue); assertNull(cacheListener.forwarded.get(windowedKey).oldValue); } @@ -648,23 +649,23 @@ public void shouldForwardOldValuesWhenEnabled() { new Windowed<>("1", new TimeWindow(DEFAULT_TIMESTAMP, DEFAULT_TIMESTAMP + WINDOW_SIZE)); cachingStore.put(bytesKey("1"), bytesValue("a"), DEFAULT_TIMESTAMP); cachingStore.put(bytesKey("1"), bytesValue("b"), DEFAULT_TIMESTAMP); - cachingStore.flush(); + cachingStore.commit(Map.of()); assertEquals("b", cacheListener.forwarded.get(windowedKey).newValue); assertNull(cacheListener.forwarded.get(windowedKey).oldValue); cacheListener.forwarded.clear(); cachingStore.put(bytesKey("1"), bytesValue("c"), DEFAULT_TIMESTAMP); - cachingStore.flush(); + cachingStore.commit(Map.of()); assertEquals("c", cacheListener.forwarded.get(windowedKey).newValue); assertEquals("b", cacheListener.forwarded.get(windowedKey).oldValue); cachingStore.put(bytesKey("1"), null, DEFAULT_TIMESTAMP); - cachingStore.flush(); + cachingStore.commit(Map.of()); assertNull(cacheListener.forwarded.get(windowedKey).newValue); assertEquals("c", cacheListener.forwarded.get(windowedKey).oldValue); cacheListener.forwarded.clear(); cachingStore.put(bytesKey("1"), bytesValue("a"), DEFAULT_TIMESTAMP); cachingStore.put(bytesKey("1"), bytesValue("b"), DEFAULT_TIMESTAMP); cachingStore.put(bytesKey("1"), null, DEFAULT_TIMESTAMP); - cachingStore.flush(); + cachingStore.commit(Map.of()); assertNull(cacheListener.forwarded.get(windowedKey)); cacheListener.forwarded.clear(); } @@ -675,22 +676,22 @@ public void shouldForwardOldValuesWhenDisabled() { new Windowed<>("1", new TimeWindow(DEFAULT_TIMESTAMP, DEFAULT_TIMESTAMP + WINDOW_SIZE)); cachingStore.put(bytesKey("1"), bytesValue("a"), DEFAULT_TIMESTAMP); cachingStore.put(bytesKey("1"), bytesValue("b"), DEFAULT_TIMESTAMP); - cachingStore.flush(); + cachingStore.commit(Map.of()); assertEquals("b", cacheListener.forwarded.get(windowedKey).newValue); assertNull(cacheListener.forwarded.get(windowedKey).oldValue); cachingStore.put(bytesKey("1"), bytesValue("c"), DEFAULT_TIMESTAMP); - cachingStore.flush(); + cachingStore.commit(Map.of()); assertEquals("c", cacheListener.forwarded.get(windowedKey).newValue); assertNull(cacheListener.forwarded.get(windowedKey).oldValue); cachingStore.put(bytesKey("1"), null, DEFAULT_TIMESTAMP); - cachingStore.flush(); + cachingStore.commit(Map.of()); assertNull(cacheListener.forwarded.get(windowedKey).newValue); assertNull(cacheListener.forwarded.get(windowedKey).oldValue); cacheListener.forwarded.clear(); cachingStore.put(bytesKey("1"), bytesValue("a"), DEFAULT_TIMESTAMP); cachingStore.put(bytesKey("1"), bytesValue("b"), DEFAULT_TIMESTAMP); cachingStore.put(bytesKey("1"), null, DEFAULT_TIMESTAMP); - cachingStore.flush(); + cachingStore.commit(Map.of()); assertNull(cacheListener.forwarded.get(windowedKey)); cacheListener.forwarded.clear(); } @@ -704,7 +705,7 @@ public void shouldForwardDirtyItemToListenerWhenEvicted() { @Test public void shouldTakeValueFromCacheIfSameTimestampFlushedToRocks() { cachingStore.put(bytesKey("1"), bytesValue("a"), DEFAULT_TIMESTAMP); - cachingStore.flush(); + cachingStore.commit(Map.of()); cachingStore.put(bytesKey("1"), bytesValue("b"), DEFAULT_TIMESTAMP); try (final WindowStoreIterator fetch = diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStoreTest.java index 9a23e657600cf..2e7f8519fabe4 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStoreTest.java @@ -35,6 +35,8 @@ import org.mockito.junit.jupiter.MockitoSettings; import org.mockito.quality.Strictness; +import java.util.Map; + import static org.apache.kafka.common.utils.Utils.mkEntry; import static org.apache.kafka.common.utils.Utils.mkMap; import static org.mockito.Mockito.times; @@ -175,10 +177,10 @@ public void shouldDelegateToUnderlyingStoreWhenBackwardFindingSessionRange() { } @Test - public void shouldFlushUnderlyingStore() { - store.flush(); + public void shouldCommitUnderlyingStore() { + store.commit(Map.of()); - verify(inner).flush(); + verify(inner).commit(Map.of()); } @Test diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreTest.java index eb57bee38bf04..e8b0bb31b3811 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreTest.java @@ -39,6 +39,7 @@ import java.util.ArrayList; import java.util.List; +import java.util.Map; import java.util.NoSuchElementException; import java.util.UUID; @@ -141,7 +142,7 @@ public void shouldReturnKeysWithGivenPrefix() { stringSerializer.serialize(null, "f"))); byteStore.putAll(entries); - byteStore.flush(); + byteStore.commit(Map.of()); final List valuesWithPrefix = new ArrayList<>(); int numberOfKeysReturned = 0; @@ -176,7 +177,7 @@ public void shouldReturnKeysWithGivenPrefixExcludingNextKeyLargestKey() { stringSerializer.serialize(null, "f"))); byteStore.putAll(entries); - byteStore.flush(); + byteStore.commit(Map.of()); try (final KeyValueIterator keysWithPrefixAsabcd = byteStore.prefixScan("abcd", stringSerializer)) { int numberOfKeysReturned = 0; @@ -206,7 +207,7 @@ public void shouldReturnUUIDsWithStringPrefix() { stringSerializer.serialize(null, "b"))); byteStore.putAll(entries); - byteStore.flush(); + byteStore.commit(Map.of()); final List valuesWithPrefix = new ArrayList<>(); int numberOfKeysReturned = 0; @@ -236,7 +237,7 @@ public void shouldReturnNoKeys() { new Bytes(stringSerializer.serialize(null, "c")), stringSerializer.serialize(null, "e"))); byteStore.putAll(entries); - byteStore.flush(); + byteStore.commit(Map.of()); int numberOfKeysReturned = 0; diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreTest.java index eb23f80db729e..2e866bde22c65 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreTest.java @@ -27,6 +27,7 @@ import java.util.Arrays; import java.util.List; +import java.util.Map; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.MatcherAssert.assertThat; @@ -102,35 +103,35 @@ public void testEvict() { assertEquals(10, driver.sizeOf(store)); store.put(10, "ten"); - store.flush(); + store.commit(Map.of()); assertEquals(10, driver.sizeOf(store)); - assertTrue(driver.flushedEntryRemoved(0)); - assertEquals(1, driver.numFlushedEntryRemoved()); + assertTrue(driver.committedEntryRemoved(0)); + assertEquals(1, driver.numCommittedEntryRemoved()); store.delete(1); - store.flush(); + store.commit(Map.of()); assertEquals(9, driver.sizeOf(store)); - assertTrue(driver.flushedEntryRemoved(0)); - assertTrue(driver.flushedEntryRemoved(1)); - assertEquals(2, driver.numFlushedEntryRemoved()); + assertTrue(driver.committedEntryRemoved(0)); + assertTrue(driver.committedEntryRemoved(1)); + assertEquals(2, driver.numCommittedEntryRemoved()); store.put(11, "eleven"); - store.flush(); + store.commit(Map.of()); assertEquals(10, driver.sizeOf(store)); - assertEquals(2, driver.numFlushedEntryRemoved()); + assertEquals(2, driver.numCommittedEntryRemoved()); store.put(2, "two-again"); - store.flush(); + store.commit(Map.of()); assertEquals(10, driver.sizeOf(store)); - assertEquals(2, driver.numFlushedEntryRemoved()); + assertEquals(2, driver.numCommittedEntryRemoved()); store.put(12, "twelve"); - store.flush(); + store.commit(Map.of()); assertEquals(10, driver.sizeOf(store)); - assertTrue(driver.flushedEntryRemoved(0)); - assertTrue(driver.flushedEntryRemoved(1)); - assertTrue(driver.flushedEntryRemoved(3)); - assertEquals(3, driver.numFlushedEntryRemoved()); + assertTrue(driver.committedEntryRemoved(0)); + assertTrue(driver.committedEntryRemoved(1)); + assertTrue(driver.committedEntryRemoved(3)); + assertEquals(3, driver.numCommittedEntryRemoved()); } @Test @@ -155,8 +156,8 @@ public void testRestoreEvict() { store = createKeyValueStore(driver.context()); context.restore(store.name(), driver.restoredEntries()); // Verify that the store's changelog does not get more appends ... - assertEquals(0, driver.numFlushedEntryStored()); - assertEquals(0, driver.numFlushedEntryRemoved()); + assertEquals(0, driver.numCommittedEntryStored()); + assertEquals(0, driver.numCommittedEntryRemoved()); // and there are no other entries ... assertEquals(10, driver.sizeOf(store)); diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/KeyValueStoreWrapperTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/KeyValueStoreWrapperTest.java index f7018a7fae348..4d4eb9f33c265 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/KeyValueStoreWrapperTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/KeyValueStoreWrapperTest.java @@ -37,6 +37,8 @@ import org.mockito.junit.jupiter.MockitoSettings; import org.mockito.quality.Strictness; +import java.util.Map; + import static org.apache.kafka.streams.state.internals.KeyValueStoreWrapper.PUT_RETURN_CODE_IS_LATEST; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.nullValue; @@ -208,21 +210,21 @@ public void shouldInitVersionedStore() { } @Test - public void shouldFlushTimestampedStore() { + public void shouldCommitTimestampedStore() { givenWrapperWithTimestampedStore(); - wrapper.flush(); + wrapper.commit(Map.of()); - verify(timestampedStore).flush(); + verify(timestampedStore).commit(Map.of()); } @Test - public void shouldFlushVersionedStore() { + public void shouldCommitVersionedStore() { givenWrapperWithVersionedStore(); - wrapper.flush(); + wrapper.commit(Map.of()); - verify(versionedStore).flush(); + verify(versionedStore).commit(Map.of()); } @Test diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java index 4aba2784971bd..b43fa03b985b3 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java @@ -317,12 +317,12 @@ public void shouldGetAllFromInnerStoreAndRecordAllMetric() { } @Test - public void shouldFlushInnerWhenFlushTimeRecords() { + public void shouldFlushInnerWhenCommitTimeRecords() { setUp(); - doNothing().when(inner).flush(); + doNothing().when(inner).commit(Map.of()); init(); - metered.flush(); + metered.commit(Map.of()); final KafkaMetric metric = metric("flush-rate"); assertTrue((Double) metric.metricValue() > 0); diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreTest.java index 9b5d33db96642..0a0bf8c261df0 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreTest.java @@ -357,12 +357,12 @@ public void shouldGetAllFromInnerStoreAndRecordAllMetric() { } @Test - public void shouldFlushInnerWhenFlushTimeRecords() { + public void shouldCommitInnerWhenCommitTimeRecords() { setUp(); - doNothing().when(inner).flush(); + doNothing().when(inner).commit(Map.of()); init(); - metered.flush(); + metered.commit(Map.of()); final KafkaMetric metric = metric("flush-rate"); assertTrue((Double) metric.metricValue() > 0); diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStoreTest.java index 56fe45630d17d..14638efa95597 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStoreTest.java @@ -224,10 +224,10 @@ public void shouldDelegateAndRecordMetricsOnGetWithTimestamp() { } @Test - public void shouldDelegateAndRecordMetricsOnFlush() { - store.flush(); + public void shouldDelegateAndRecordMetricsOnCommit() { + store.commit(Map.of()); - verify(inner).flush(); + verify(inner).commit(Map.of()); assertThat((Double) getMetric("flush-rate").metricValue(), greaterThan(0.0)); } diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java index 3cf17ff830eda..9c3a92e9eb95a 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java @@ -349,11 +349,11 @@ public void shouldBackwardFetchAllFromInnerStoreAndRecordFetchMetrics() { } @Test - public void shouldRecordFlushLatency() { - doNothing().when(innerStoreMock).flush(); + public void shouldRecordCommitLatency() { + doNothing().when(innerStoreMock).commit(Map.of()); store.init(context, store); - store.flush(); + store.commit(Map.of()); // it suffices to verify one flush metric since all flush metrics are recorded by the same sensor // and the sensor is tested elsewhere diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/ReadOnlyWindowStoreStub.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/ReadOnlyWindowStoreStub.java index 25d7fa3a68b13..fd8ff4eb5db45 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/ReadOnlyWindowStoreStub.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/ReadOnlyWindowStoreStub.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.streams.state.internals; +import org.apache.kafka.common.TopicPartition; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.errors.InvalidStateStoreException; import org.apache.kafka.streams.internals.ApiUtils; @@ -381,7 +382,7 @@ public String name() { public void init(final StateStoreContext stateStoreContext, final StateStore root) {} @Override - public void flush() { + public void commit(final Map changelogOffsets) { } @Override diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java index 530a25b4e662d..d756d635af636 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java @@ -364,7 +364,7 @@ public void shouldNotSetCacheInValueProvidersWhenUserProvidesPlainTableFormatCon public void shouldNotThrowExceptionOnRestoreWhenThereIsPreExistingRocksDbFiles() { rocksDBStore.init(context, rocksDBStore); rocksDBStore.put(new Bytes("existingKey".getBytes(UTF_8)), "existingValue".getBytes(UTF_8)); - rocksDBStore.flush(); + rocksDBStore.commit(Map.of()); final List> restoreBytes = new ArrayList<>(); @@ -427,7 +427,7 @@ public void shouldPutAll() { rocksDBStore.init(context, rocksDBStore); rocksDBStore.putAll(entries); - rocksDBStore.flush(); + rocksDBStore.commit(Map.of()); assertEquals( "a", @@ -486,7 +486,7 @@ public void shouldReturnKeysWithGivenPrefix() { rocksDBStore.init(context, rocksDBStore); rocksDBStore.putAll(entries); - rocksDBStore.flush(); + rocksDBStore.commit(Map.of()); try (final KeyValueIterator keysWithPrefix = rocksDBStore.prefixScan("prefix", stringSerializer)) { final List valuesWithPrefix = new ArrayList<>(); @@ -521,7 +521,7 @@ public void shouldReturnKeysWithGivenPrefixExcludingNextKeyLargestKey() { rocksDBStore.init(context, rocksDBStore); rocksDBStore.putAll(entries); - rocksDBStore.flush(); + rocksDBStore.commit(Map.of()); try (final KeyValueIterator keysWithPrefixAsabcd = rocksDBStore.prefixScan("abcd", stringSerializer)) { int numberOfKeysReturned = 0; @@ -619,7 +619,7 @@ public void shouldReturnUUIDsWithStringPrefix() { rocksDBStore.init(context, rocksDBStore); rocksDBStore.putAll(entries); - rocksDBStore.flush(); + rocksDBStore.commit(Map.of()); try (final KeyValueIterator keysWithPrefix = rocksDBStore.prefixScan(prefix, stringSerializer)) { final List valuesWithPrefix = new ArrayList<>(); @@ -654,7 +654,7 @@ public void shouldReturnNoKeys() { stringSerializer.serialize(null, "e"))); rocksDBStore.init(context, rocksDBStore); rocksDBStore.putAll(entries); - rocksDBStore.flush(); + rocksDBStore.commit(Map.of()); try (final KeyValueIterator keysWithPrefix = rocksDBStore.prefixScan("d", stringSerializer)) { int numberOfKeysReturned = 0; @@ -868,7 +868,7 @@ public void shouldThrowProcessorStateExceptionOnPutDeletedDir() throws IOExcepti rocksDBStore.put( new Bytes(stringSerializer.serialize(null, "anyKey")), stringSerializer.serialize(null, "anyValue")); - assertThrows(ProcessorStateException.class, () -> rocksDBStore.flush()); + assertThrows(ProcessorStateException.class, () -> rocksDBStore.commit(Map.of())); } @Test diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedCachingPersistentWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedCachingPersistentWindowStoreTest.java index ffa509d518871..0253c6fc058c8 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedCachingPersistentWindowStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedCachingPersistentWindowStoreTest.java @@ -63,6 +63,7 @@ import java.time.Instant; import java.util.Arrays; import java.util.List; +import java.util.Map; import java.util.Properties; import java.util.UUID; @@ -309,7 +310,7 @@ private void shouldMatchPositionAfterPut() { assertEquals(Position.emptyPosition(), cachingStore.getPosition()); assertEquals(Position.emptyPosition(), underlyingStore.getPosition()); - cachingStore.flush(); + cachingStore.commit(Map.of()); assertEquals( Position.fromMap(mkMap(mkEntry("", mkMap(mkEntry(0, 2L))))), @@ -675,12 +676,12 @@ public void shouldFlushEvictedItemsIntoUnderlyingStore(final boolean hasIndex) { @ParameterizedTest @ValueSource(booleans = {true, false}) - public void shouldForwardDirtyItemsWhenFlushCalled(final boolean hasIndex) { + public void shouldForwardDirtyItemsWhenCommitCalled(final boolean hasIndex) { setUp(hasIndex); final Windowed windowedKey = new Windowed<>("1", new TimeWindow(DEFAULT_TIMESTAMP, DEFAULT_TIMESTAMP + WINDOW_SIZE)); cachingStore.put(bytesKey("1"), bytesValue("a"), DEFAULT_TIMESTAMP); - cachingStore.flush(); + cachingStore.commit(Map.of()); assertEquals("a", cacheListener.forwarded.get(windowedKey).newValue); assertNull(cacheListener.forwarded.get(windowedKey).oldValue); } @@ -702,23 +703,23 @@ public void shouldForwardOldValuesWhenEnabled(final boolean hasIndex) { new Windowed<>("1", new TimeWindow(DEFAULT_TIMESTAMP, DEFAULT_TIMESTAMP + WINDOW_SIZE)); cachingStore.put(bytesKey("1"), bytesValue("a"), DEFAULT_TIMESTAMP); cachingStore.put(bytesKey("1"), bytesValue("b"), DEFAULT_TIMESTAMP); - cachingStore.flush(); + cachingStore.commit(Map.of()); assertEquals("b", cacheListener.forwarded.get(windowedKey).newValue); assertNull(cacheListener.forwarded.get(windowedKey).oldValue); cacheListener.forwarded.clear(); cachingStore.put(bytesKey("1"), bytesValue("c"), DEFAULT_TIMESTAMP); - cachingStore.flush(); + cachingStore.commit(Map.of()); assertEquals("c", cacheListener.forwarded.get(windowedKey).newValue); assertEquals("b", cacheListener.forwarded.get(windowedKey).oldValue); cachingStore.put(bytesKey("1"), null, DEFAULT_TIMESTAMP); - cachingStore.flush(); + cachingStore.commit(Map.of()); assertNull(cacheListener.forwarded.get(windowedKey).newValue); assertEquals("c", cacheListener.forwarded.get(windowedKey).oldValue); cacheListener.forwarded.clear(); cachingStore.put(bytesKey("1"), bytesValue("a"), DEFAULT_TIMESTAMP); cachingStore.put(bytesKey("1"), bytesValue("b"), DEFAULT_TIMESTAMP); cachingStore.put(bytesKey("1"), null, DEFAULT_TIMESTAMP); - cachingStore.flush(); + cachingStore.commit(Map.of()); assertNull(cacheListener.forwarded.get(windowedKey)); cacheListener.forwarded.clear(); } @@ -731,22 +732,22 @@ public void shouldNotForwardOldValuesWhenDisabled(final boolean hasIndex) { new Windowed<>("1", new TimeWindow(DEFAULT_TIMESTAMP, DEFAULT_TIMESTAMP + WINDOW_SIZE)); cachingStore.put(bytesKey("1"), bytesValue("a"), DEFAULT_TIMESTAMP); cachingStore.put(bytesKey("1"), bytesValue("b"), DEFAULT_TIMESTAMP); - cachingStore.flush(); + cachingStore.commit(Map.of()); assertEquals("b", cacheListener.forwarded.get(windowedKey).newValue); assertNull(cacheListener.forwarded.get(windowedKey).oldValue); cachingStore.put(bytesKey("1"), bytesValue("c"), DEFAULT_TIMESTAMP); - cachingStore.flush(); + cachingStore.commit(Map.of()); assertEquals("c", cacheListener.forwarded.get(windowedKey).newValue); assertNull(cacheListener.forwarded.get(windowedKey).oldValue); cachingStore.put(bytesKey("1"), null, DEFAULT_TIMESTAMP); - cachingStore.flush(); + cachingStore.commit(Map.of()); assertNull(cacheListener.forwarded.get(windowedKey).newValue); assertNull(cacheListener.forwarded.get(windowedKey).oldValue); cacheListener.forwarded.clear(); cachingStore.put(bytesKey("1"), bytesValue("a"), DEFAULT_TIMESTAMP); cachingStore.put(bytesKey("1"), bytesValue("b"), DEFAULT_TIMESTAMP); cachingStore.put(bytesKey("1"), null, DEFAULT_TIMESTAMP); - cachingStore.flush(); + cachingStore.commit(Map.of()); assertNull(cacheListener.forwarded.get(windowedKey)); cacheListener.forwarded.clear(); } @@ -764,7 +765,7 @@ public void shouldForwardDirtyItemToListenerWhenEvicted(final boolean hasIndex) public void shouldTakeValueFromCacheIfSameTimestampFlushedToRocks(final boolean hasIndex) { setUp(hasIndex); cachingStore.put(bytesKey("1"), bytesValue("a"), DEFAULT_TIMESTAMP); - cachingStore.flush(); + cachingStore.commit(Map.of()); cachingStore.put(bytesKey("1"), bytesValue("b"), DEFAULT_TIMESTAMP); try (final WindowStoreIterator fetch = diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedKeyValueBufferTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedKeyValueBufferTest.java index 0a65d47d207f2..a3e21a5428823 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedKeyValueBufferTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedKeyValueBufferTest.java @@ -47,6 +47,7 @@ import java.nio.ByteBuffer; import java.util.LinkedList; import java.util.List; +import java.util.Map; import java.util.Optional; import java.util.Properties; import java.util.Random; @@ -311,7 +312,7 @@ public void shouldReturnPriorValueForBufferedKey(final String testName, final Fu @ParameterizedTest @MethodSource("parameters") - public void shouldFlush(final String testName, final Function bufferSupplier) { + public void shouldCommit(final String testName, final Function bufferSupplier) { setup(testName, bufferSupplier); final TimeOrderedKeyValueBuffer> buffer = bufferSupplier.apply(testName); final MockInternalProcessorContext context = makeContext(); @@ -323,8 +324,8 @@ public void shouldFlush(final String testName, final Function bufferS // replace "deleteme" with a tombstone buffer.evictWhile(() -> buffer.minTimestamp() < 1, kv -> { }); - // flush everything to the changelog - buffer.flush(); + // commit everything to the changelog + buffer.commit(Map.of()); // the buffer should serialize the buffer time and the value as byte[], // which we can't compare for equality using ProducerRecord. diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedWindowStoreTest.java index 9d0db9bae0fbb..8bff63ddb4d3e 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedWindowStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedWindowStoreTest.java @@ -62,6 +62,7 @@ import java.time.Instant; import java.util.Arrays; import java.util.List; +import java.util.Map; import java.util.Properties; import java.util.UUID; @@ -315,7 +316,7 @@ private void shouldMatchPositionAfterPut() { assertEquals(Position.emptyPosition(), cachingStore.getPosition()); assertEquals(Position.emptyPosition(), underlyingStore.getPosition()); - cachingStore.flush(); + cachingStore.commit(Map.of()); assertEquals( Position.fromMap(mkMap(mkEntry("", mkMap(mkEntry(0, 2L))))), @@ -681,12 +682,12 @@ public void shouldFlushEvictedItemsIntoUnderlyingStore(final boolean hasIndex) { @ParameterizedTest @ValueSource(booleans = {true, false}) - public void shouldForwardDirtyItemsWhenFlushCalled(final boolean hasIndex) { + public void shouldForwardDirtyItemsWhenCommitCalled(final boolean hasIndex) { setUp(hasIndex); final Windowed windowedKey = new Windowed<>("1", new TimeWindow(DEFAULT_TIMESTAMP, DEFAULT_TIMESTAMP + WINDOW_SIZE)); cachingStore.put(bytesKey("1"), bytesValue("a"), DEFAULT_TIMESTAMP); - cachingStore.flush(); + cachingStore.commit(Map.of()); assertEquals("a", cacheListener.forwarded.get(windowedKey).newValue); assertNull(cacheListener.forwarded.get(windowedKey).oldValue); } @@ -708,23 +709,23 @@ public void shouldForwardOldValuesWhenEnabled(final boolean hasIndex) { new Windowed<>("1", new TimeWindow(DEFAULT_TIMESTAMP, DEFAULT_TIMESTAMP + WINDOW_SIZE)); cachingStore.put(bytesKey("1"), bytesValue("a"), DEFAULT_TIMESTAMP); cachingStore.put(bytesKey("1"), bytesValue("b"), DEFAULT_TIMESTAMP); - cachingStore.flush(); + cachingStore.commit(Map.of()); assertEquals("b", cacheListener.forwarded.get(windowedKey).newValue); assertNull(cacheListener.forwarded.get(windowedKey).oldValue); cacheListener.forwarded.clear(); cachingStore.put(bytesKey("1"), bytesValue("c"), DEFAULT_TIMESTAMP); - cachingStore.flush(); + cachingStore.commit(Map.of()); assertEquals("c", cacheListener.forwarded.get(windowedKey).newValue); assertEquals("b", cacheListener.forwarded.get(windowedKey).oldValue); cachingStore.put(bytesKey("1"), null, DEFAULT_TIMESTAMP); - cachingStore.flush(); + cachingStore.commit(Map.of()); assertNull(cacheListener.forwarded.get(windowedKey).newValue); assertEquals("c", cacheListener.forwarded.get(windowedKey).oldValue); cacheListener.forwarded.clear(); cachingStore.put(bytesKey("1"), bytesValue("a"), DEFAULT_TIMESTAMP); cachingStore.put(bytesKey("1"), bytesValue("b"), DEFAULT_TIMESTAMP); cachingStore.put(bytesKey("1"), null, DEFAULT_TIMESTAMP); - cachingStore.flush(); + cachingStore.commit(Map.of()); assertNull(cacheListener.forwarded.get(windowedKey)); cacheListener.forwarded.clear(); } @@ -737,22 +738,22 @@ public void shouldForwardOldValuesWhenDisabled(final boolean hasIndex) { new Windowed<>("1", new TimeWindow(DEFAULT_TIMESTAMP, DEFAULT_TIMESTAMP + WINDOW_SIZE)); cachingStore.put(bytesKey("1"), bytesValue("a"), DEFAULT_TIMESTAMP); cachingStore.put(bytesKey("1"), bytesValue("b"), DEFAULT_TIMESTAMP); - cachingStore.flush(); + cachingStore.commit(Map.of()); assertEquals("b", cacheListener.forwarded.get(windowedKey).newValue); assertNull(cacheListener.forwarded.get(windowedKey).oldValue); cachingStore.put(bytesKey("1"), bytesValue("c"), DEFAULT_TIMESTAMP); - cachingStore.flush(); + cachingStore.commit(Map.of()); assertEquals("c", cacheListener.forwarded.get(windowedKey).newValue); assertNull(cacheListener.forwarded.get(windowedKey).oldValue); cachingStore.put(bytesKey("1"), null, DEFAULT_TIMESTAMP); - cachingStore.flush(); + cachingStore.commit(Map.of()); assertNull(cacheListener.forwarded.get(windowedKey).newValue); assertNull(cacheListener.forwarded.get(windowedKey).oldValue); cacheListener.forwarded.clear(); cachingStore.put(bytesKey("1"), bytesValue("a"), DEFAULT_TIMESTAMP); cachingStore.put(bytesKey("1"), bytesValue("b"), DEFAULT_TIMESTAMP); cachingStore.put(bytesKey("1"), null, DEFAULT_TIMESTAMP); - cachingStore.flush(); + cachingStore.commit(Map.of()); assertNull(cacheListener.forwarded.get(windowedKey)); cacheListener.forwarded.clear(); } @@ -770,7 +771,7 @@ public void shouldForwardDirtyItemToListenerWhenEvicted(final boolean hasIndex) public void shouldTakeValueFromCacheIfSameTimestampFlushedToRocks(final boolean hasIndex) { setUp(hasIndex); cachingStore.put(bytesKey("1"), bytesValue("a"), DEFAULT_TIMESTAMP); - cachingStore.flush(); + cachingStore.commit(Map.of()); cachingStore.put(bytesKey("1"), bytesValue("b"), DEFAULT_TIMESTAMP); try (final WindowStoreIterator fetch = diff --git a/streams/src/test/java/org/apache/kafka/test/GenericInMemoryKeyValueStore.java b/streams/src/test/java/org/apache/kafka/test/GenericInMemoryKeyValueStore.java index f13add1dc414e..203aae869bafd 100644 --- a/streams/src/test/java/org/apache/kafka/test/GenericInMemoryKeyValueStore.java +++ b/streams/src/test/java/org/apache/kafka/test/GenericInMemoryKeyValueStore.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.test; +import org.apache.kafka.common.TopicPartition; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.StateStoreContext; @@ -140,7 +141,7 @@ public long approximateNumEntries() { } @Override - public void flush() { + public void commit(final Map changelogOffsets) { // do-nothing since it is in-memory } diff --git a/streams/src/test/java/org/apache/kafka/test/GenericInMemoryTimestampedKeyValueStore.java b/streams/src/test/java/org/apache/kafka/test/GenericInMemoryTimestampedKeyValueStore.java index e5d599032a53a..59880319583ad 100644 --- a/streams/src/test/java/org/apache/kafka/test/GenericInMemoryTimestampedKeyValueStore.java +++ b/streams/src/test/java/org/apache/kafka/test/GenericInMemoryTimestampedKeyValueStore.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.test; +import org.apache.kafka.common.TopicPartition; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.StateStoreContext; @@ -142,7 +143,7 @@ public long approximateNumEntries() { } @Override - public void flush() { + public void commit(final Map changelogOffsets) { // do-nothing since it is in-memory } diff --git a/streams/src/test/java/org/apache/kafka/test/MockKeyValueStore.java b/streams/src/test/java/org/apache/kafka/test/MockKeyValueStore.java index a2a6ac43dfd76..331a7c5a24b85 100644 --- a/streams/src/test/java/org/apache/kafka/test/MockKeyValueStore.java +++ b/streams/src/test/java/org/apache/kafka/test/MockKeyValueStore.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.test; +import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.serialization.Deserializer; import org.apache.kafka.common.serialization.IntegerDeserializer; import org.apache.kafka.common.serialization.Serializer; @@ -29,18 +30,19 @@ import java.util.ArrayList; import java.util.List; +import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; public class MockKeyValueStore implements KeyValueStore { - // keep a global counter of flushes and a local reference to which store had which - // flush, so we can reason about the order in which stores get flushed. - private static final AtomicInteger GLOBAL_FLUSH_COUNTER = new AtomicInteger(0); - private final AtomicInteger instanceLastFlushCount = new AtomicInteger(-1); + // keep a global counter of commits and a local reference to which store had which + // commit, so we can reason about the order in which stores get committed. + private static final AtomicInteger GLOBAL_COMMIT_COUNTER = new AtomicInteger(0); + private final AtomicInteger instanceLastCommitCount = new AtomicInteger(-1); private final String name; private final boolean persistent; public boolean initialized = false; - public boolean flushed = false; + public boolean committed = false; public boolean closed = true; public final ArrayList keys = new ArrayList<>(); public final ArrayList values = new ArrayList<>(); @@ -65,13 +67,13 @@ public void init(final StateStoreContext stateStoreContext, } @Override - public void flush() { - instanceLastFlushCount.set(GLOBAL_FLUSH_COUNTER.getAndIncrement()); - flushed = true; + public void commit(final Map changelogOffsets) { + instanceLastCommitCount.set(GLOBAL_COMMIT_COUNTER.getAndIncrement()); + committed = true; } - public int getLastFlushCount() { - return instanceLastFlushCount.get(); + public int getLastCommitCount() { + return instanceLastCommitCount.get(); } @Override diff --git a/streams/src/test/java/org/apache/kafka/test/NoOpReadOnlyStore.java b/streams/src/test/java/org/apache/kafka/test/NoOpReadOnlyStore.java index aa8f4a5b8bcd6..2eed5faf63825 100644 --- a/streams/src/test/java/org/apache/kafka/test/NoOpReadOnlyStore.java +++ b/streams/src/test/java/org/apache/kafka/test/NoOpReadOnlyStore.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.test; +import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.StateStoreContext; @@ -24,13 +25,14 @@ import org.apache.kafka.streams.state.ReadOnlyKeyValueStore; import java.io.File; +import java.util.Map; public class NoOpReadOnlyStore implements ReadOnlyKeyValueStore, StateStore { private final String name; private final boolean rocksdbStore; private boolean open = true; public boolean initialized; - public boolean flushed; + public boolean committed; public NoOpReadOnlyStore() { this("", false); @@ -89,8 +91,8 @@ public void init(final StateStoreContext stateStoreContext, final StateStore roo } @Override - public void flush() { - flushed = true; + public void commit(final Map changelogOffsets) { + committed = true; } @Override diff --git a/streams/src/test/java/org/apache/kafka/test/ReadOnlySessionStoreStub.java b/streams/src/test/java/org/apache/kafka/test/ReadOnlySessionStoreStub.java index d7c5936b7d5e6..3a8be4e34e171 100644 --- a/streams/src/test/java/org/apache/kafka/test/ReadOnlySessionStoreStub.java +++ b/streams/src/test/java/org/apache/kafka/test/ReadOnlySessionStoreStub.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.test; +import org.apache.kafka.common.TopicPartition; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.errors.InvalidStateStoreException; import org.apache.kafka.streams.kstream.Windowed; @@ -29,6 +30,7 @@ import java.util.Collections; import java.util.Iterator; import java.util.List; +import java.util.Map; import java.util.NavigableMap; import java.util.TreeMap; @@ -187,7 +189,7 @@ public String name() { public void init(StateStoreContext stateStoreContext, StateStore root) {} @Override - public void flush() { + public void commit(final Map changelogOffsets) { } diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java index f0e748f35ea0d..19187188643ea 100644 --- a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java +++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java @@ -1236,8 +1236,8 @@ public V delete(final K key) { } @Override - public void flush() { - inner.flush(); + public void commit(final Map changelogOffsets) { + inner.commit(changelogOffsets); } @Override @@ -1328,8 +1328,8 @@ public KeyValueIterator, V> backwardFetchAll(final long timeFrom, } @Override - public void flush() { - inner.flush(); + public void commit(final Map changelogOffsets) { + inner.commit(changelogOffsets); } @Override diff --git a/streams/test-utils/src/test/java/org/apache/kafka/streams/KeyValueStoreFacadeTest.java b/streams/test-utils/src/test/java/org/apache/kafka/streams/KeyValueStoreFacadeTest.java index 3eadb857d22d4..5942322f103f5 100644 --- a/streams/test-utils/src/test/java/org/apache/kafka/streams/KeyValueStoreFacadeTest.java +++ b/streams/test-utils/src/test/java/org/apache/kafka/streams/KeyValueStoreFacadeTest.java @@ -26,6 +26,8 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import java.util.Map; + import static java.util.Arrays.asList; import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.MatcherAssert.assertThat; @@ -98,9 +100,9 @@ public void shouldDeleteAndReturnPlainValue() { } @Test - public void shouldForwardFlush() { - keyValueStoreFacade.flush(); - verify(mockedKeyValueTimestampStore).flush(); + public void shouldForwardCommit() { + keyValueStoreFacade.commit(Map.of()); + verify(mockedKeyValueTimestampStore).commit(Map.of()); } @Test diff --git a/streams/test-utils/src/test/java/org/apache/kafka/streams/WindowStoreFacadeTest.java b/streams/test-utils/src/test/java/org/apache/kafka/streams/WindowStoreFacadeTest.java index 719acd1bc12a9..7b64dc6e37485 100644 --- a/streams/test-utils/src/test/java/org/apache/kafka/streams/WindowStoreFacadeTest.java +++ b/streams/test-utils/src/test/java/org/apache/kafka/streams/WindowStoreFacadeTest.java @@ -26,6 +26,8 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import java.util.Map; + import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.MatcherAssert.assertThat; import static org.mockito.Mockito.mock; @@ -62,9 +64,9 @@ public void shouldPutWindowStartTimestampWithUnknownTimestamp() { } @Test - public void shouldForwardFlush() { - windowStoreFacade.flush(); - verify(mockedWindowTimestampStore).flush(); + public void shouldForwardCommit() { + windowStoreFacade.commit(Map.of()); + verify(mockedWindowTimestampStore).commit(Map.of()); } @Test