From ee2323a76ffb1cdedb40ef8ccfb76e114ab8560b Mon Sep 17 00:00:00 2001 From: Nick Telford Date: Fri, 21 Nov 2025 16:38:21 +0000 Subject: [PATCH] KAFKA-14711: Add new StateStore interfaces These are the new interfaces detailed in KIP-1035: "StateStore managed changelog offsets". This PR introduces the interfaces changes, but makes otherwise no consequential behavioural changes. Outside of `StateStore.java`, _all_ changes are essentially just a rename of all invocations of `StateStore#flush` to instead call `StateStore#commit`. The `changelogOffsets` being passed in these invocations is currently unused: the behaviour of `StateStore#commit` remains identical to `StateStore#flush` before these changes. A new implementation of `StateStore#commit` that actually uses these offsets, along with changes to the use-site (in `ProcessorStateManager` and `GlobalStateManager`) will come in a later PR. Many strings, including documentation, and some variable names, have also been renamed (from "flush" to "commit"), to maintain consistency with the method they relate to. One exception is the `flush-rate` metric, which has not been renamed, because it will instead be deprecated in favour of a new `commit-rate` metric, which will be introduced in another PR. --- The only change in behaviour is as follows: calling `StateStore#flush` from within a `Processor` is now a guaranteed no-op. In the future, this will throw an `UnsupportedOperationException`, but to ensure no changes to end-user experience, we currently make it a no-op. Previously, any call to `StateStore#flush` from a `Processor` would have made no difference to program semantics, but likely would introduce performance problems for RocksDB. This is because it would force a flush of RocksDB memtables to disk on every invocation, which if naively used could be on _every_ `Record`. Consequently, making this a no-op should not make a difference for end-users, except potentially improving performance if they were incorrectly calling this method. --- .../integration/EosIntegrationTest.java | 2 +- .../integration/IQv2IntegrationTest.java | 3 +- .../StandbyTaskEOSIntegrationTest.java | 3 +- ...VersionedKeyValueStoreIntegrationTest.java | 3 +- .../kafka/streams/processor/StateStore.java | 78 ++++++++++++- .../internals/AbstractReadOnlyDecorator.java | 8 ++ .../internals/AbstractReadWriteDecorator.java | 7 ++ .../internals/GlobalStateManagerImpl.java | 8 +- .../internals/ProcessorStateManager.java | 24 ++-- ...tDualSchemaRocksDBSegmentedBytesStore.java | 5 +- .../AbstractRocksDBSegmentedBytesStore.java | 5 +- .../state/internals/AbstractSegments.java | 5 +- .../state/internals/CachingKeyValueStore.java | 5 +- .../state/internals/CachingSessionStore.java | 7 +- .../state/internals/CachingWindowStore.java | 6 +- .../internals/InMemoryKeyValueStore.java | 4 +- .../state/internals/InMemorySessionStore.java | 3 +- ...MemoryTimeOrderedKeyValueChangeBuffer.java | 3 +- .../state/internals/InMemoryWindowStore.java | 3 +- .../state/internals/KeyValueStoreWrapper.java | 7 +- ...ToTimestampedKeyValueByteStoreAdapter.java | 6 +- .../internals/LogicalKeyValueSegment.java | 6 +- .../internals/LogicalKeyValueSegments.java | 7 +- .../state/internals/MemoryLRUCache.java | 3 +- .../state/internals/MeteredKeyValueStore.java | 5 +- .../state/internals/MeteredSessionStore.java | 5 +- .../MeteredVersionedKeyValueStore.java | 5 +- .../state/internals/MeteredWindowStore.java | 5 +- .../streams/state/internals/RocksDBStore.java | 12 +- .../RocksDBTimeOrderedKeyValueBuffer.java | 5 +- .../RocksDBTimeOrderedWindowStore.java | 6 +- .../internals/RocksDBTimestampedStore.java | 5 +- .../internals/RocksDBVersionedStore.java | 8 +- .../streams/state/internals/Segments.java | 4 +- .../TimeOrderedCachingWindowStore.java | 6 +- .../VersionedKeyValueToBytesStoreAdapter.java | 6 +- ...owToTimestampedWindowByteStoreAdapter.java | 6 +- .../state/internals/WrappedStateStore.java | 7 +- ...amSessionWindowAggregateProcessorTest.java | 5 +- .../internals/GlobalStateManagerImplTest.java | 8 +- .../internals/ProcessorContextImplTest.java | 18 --- .../internals/ProcessorStateManagerTest.java | 30 ++--- .../state/KeyValueStoreTestDriver.java | 84 +++++++------- .../kafka/streams/state/NoOpWindowStore.java | 4 +- .../internals/AbstractKeyValueStoreTest.java | 104 +++++++++--------- .../AbstractRocksDBWindowStoreTest.java | 7 +- .../AbstractWindowBytesStoreTest.java | 16 +-- .../CachingInMemoryKeyValueStoreTest.java | 27 ++--- .../CachingInMemorySessionStoreTest.java | 25 +++-- .../CachingPersistentSessionStoreTest.java | 25 +++-- .../CachingPersistentWindowStoreTest.java | 23 ++-- .../ChangeLoggingSessionBytesStoreTest.java | 8 +- .../internals/InMemoryKeyValueStoreTest.java | 9 +- .../internals/InMemoryLRUCacheStoreTest.java | 37 ++++--- .../internals/KeyValueStoreWrapperTest.java | 14 ++- .../internals/MeteredKeyValueStoreTest.java | 6 +- .../MeteredTimestampedKeyValueStoreTest.java | 6 +- .../MeteredVersionedKeyValueStoreTest.java | 6 +- .../internals/MeteredWindowStoreTest.java | 6 +- .../internals/ReadOnlyWindowStoreStub.java | 3 +- .../state/internals/RocksDBStoreTest.java | 14 +-- ...deredCachingPersistentWindowStoreTest.java | 25 +++-- .../TimeOrderedKeyValueBufferTest.java | 7 +- .../internals/TimeOrderedWindowStoreTest.java | 25 +++-- .../test/GenericInMemoryKeyValueStore.java | 3 +- ...nericInMemoryTimestampedKeyValueStore.java | 3 +- .../apache/kafka/test/MockKeyValueStore.java | 22 ++-- .../apache/kafka/test/NoOpReadOnlyStore.java | 8 +- .../kafka/test/ReadOnlySessionStoreStub.java | 4 +- .../kafka/streams/TopologyTestDriver.java | 8 +- .../streams/KeyValueStoreFacadeTest.java | 8 +- .../kafka/streams/WindowStoreFacadeTest.java | 8 +- 72 files changed, 528 insertions(+), 374 deletions(-) diff --git a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java index 46ace65cf04f8..24dd92fa431eb 100644 --- a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java +++ b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java @@ -1200,7 +1200,7 @@ public void process(final Record record) { sum += value; } state.put(key, sum); - state.flush(); + state.commit(Map.of()); } diff --git a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/IQv2IntegrationTest.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/IQv2IntegrationTest.java index 20fc7f4723642..c2b532afc7724 100644 --- a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/IQv2IntegrationTest.java +++ b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/IQv2IntegrationTest.java @@ -22,6 +22,7 @@ import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.serialization.IntegerSerializer; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.utils.Bytes; @@ -383,7 +384,7 @@ public void init(final StateStoreContext stateStoreContext, final StateStore roo } @Override - public void flush() { + public void commit(final Map changelogOffsets) { } diff --git a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/StandbyTaskEOSIntegrationTest.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/StandbyTaskEOSIntegrationTest.java index 67435c3afeb3d..ae5557b1f6bac 100644 --- a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/StandbyTaskEOSIntegrationTest.java +++ b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/StandbyTaskEOSIntegrationTest.java @@ -54,6 +54,7 @@ import java.io.IOException; import java.time.Duration; import java.util.Collections; +import java.util.Map; import java.util.Properties; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -358,7 +359,7 @@ public void process(final Record record) { } store.put(key, value); - store.flush(); + store.commit(Map.of()); if (key == KEY_1) { // after error injection, we need to avoid a consecutive error after rebalancing diff --git a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/VersionedKeyValueStoreIntegrationTest.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/VersionedKeyValueStoreIntegrationTest.java index d3ec4d9838e22..740b5b1af5c31 100644 --- a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/VersionedKeyValueStoreIntegrationTest.java +++ b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/VersionedKeyValueStoreIntegrationTest.java @@ -17,6 +17,7 @@ package org.apache.kafka.streams.integration; import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.config.TopicConfig; import org.apache.kafka.common.serialization.IntegerDeserializer; import org.apache.kafka.common.serialization.IntegerSerializer; @@ -821,7 +822,7 @@ public void init(final StateStoreContext stateStoreContext, final StateStore roo } @Override - public void flush() { + public void commit(final Map changelogOffsets) { // do nothing } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/StateStore.java b/streams/src/main/java/org/apache/kafka/streams/processor/StateStore.java index 38a3e23e28a1e..215155e08f9f7 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/StateStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/StateStore.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.streams.processor; +import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.annotation.InterfaceStability.Evolving; import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.processor.api.ProcessorContext; @@ -26,6 +27,8 @@ import org.apache.kafka.streams.query.QueryConfig; import org.apache.kafka.streams.query.QueryResult; +import java.util.Map; + /** * A storage engine for managing state maintained by a stream processor. *

@@ -73,8 +76,55 @@ public interface StateStore { /** * Flush any cached data + * + * @deprecated Use {@link org.apache.kafka.streams.processor.api.ProcessingContext#commit() ProcessorContext#commit()} + * instead. + */ + @Deprecated + default void flush() { + // no-op + } + + /** + * Commit all written records to this StateStore. + *

+ * This method CANNOT be called by users from {@link org.apache.kafka.streams.processor.api.Processor + * processors}. Doing so will throw an {@link java.lang.UnsupportedOperationException}. + *

+ * Instead, users should call {@link org.apache.kafka.streams.processor.api.ProcessingContext#commit() + * ProcessorContext#commit()} to request a Task commit. + *

+ * If {@link #managesOffsets()} returns {@code true}, the given {@code changelogOffsets} will be guaranteed to be + * persisted to disk along with the written records. + *

+ * {@code changelogOffsets} will usually contain a single partition, in the case of a regular StateStore. However, + * they may contain multiple partitions in the case of a Global StateStore with multiple partitions. All provided + * partitions MUST be persisted to disk. + *

+ * Implementations SHOULD ensure that {@code changelogOffsets} are committed to disk atomically with the + * records they represent, if possible. + * + * @param changelogOffsets The changelog offset(s) corresponding to the most recently written records. + */ + default void commit(final Map changelogOffsets) { + flush(); + } + + /** + * Returns the most recently {@link #commit(Map) committed} offset for the given {@link TopicPartition}. + *

+ * If {@link #managesOffsets()} and {@link #persistent()} both return {@code true}, this method will return the + * offset that corresponds to the changelog record most recently written to this store, for the given {@code + * partition}. + * + * @param partition The partition to get the committed offset for. + * @return The last {@link #commit(Map) committed} offset for the {@code partition}; or {@code null} if no offset + * has been committed for the partition, or if either {@link #persistent()} or {@link #managesOffsets()} + * return {@code false}. */ - void flush(); + default Long committedOffset(final TopicPartition partition) { + return null; + } /** * Close the storage engine. @@ -93,6 +143,32 @@ public interface StateStore { */ boolean persistent(); + /** + * Determines if this StateStore manages its own offsets. + *

+ * If this method returns {@code true}, then offsets provided to {@link #commit(Map)} will be retrievable using + * {@link #committedOffset(TopicPartition)}. + *

+ * If this method returns {@code false}, offsets provided to {@link #commit(Map)} will be ignored, and {@link + * #committedOffset(TopicPartition)} will be expected to always return {@code null}. + *

+ * This method is provided to enable custom StateStores to opt-in to managing their own offsets. This is required, + * to ensure that custom StateStores provide the consistency guarantees that Kafka Streams expects when operating + * under an {@code exactly-once} {@code processing.guarantee}. + *

+ * New implementations are required to implement this method and return {@code true}. Existing implementations + * should upgrade to managing their own offsets as soon as possible, as the legacy offset management is deprecated + * and will be removed in a future version. + * + * @deprecated New implementations should always return {@code true} and manage their own offsets. In the future, + * this method will be removed and it will be assumed to always return {@code true}. + * @return Whether this StateStore manages its own offsets. + */ + @Deprecated + default boolean managesOffsets() { + return false; + } + /** * Is this store open for reading and writing * @return {@code true} if the store is open diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractReadOnlyDecorator.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractReadOnlyDecorator.java index c8b683aa96cba..01fb9d7bf3ce8 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractReadOnlyDecorator.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractReadOnlyDecorator.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.streams.processor.internals; +import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.kstream.Windowed; @@ -34,6 +35,7 @@ import org.apache.kafka.streams.state.internals.WrappedStateStore; import java.util.List; +import java.util.Map; abstract class AbstractReadOnlyDecorator extends WrappedStateStore { @@ -44,6 +46,7 @@ private AbstractReadOnlyDecorator(final T inner) { } @Override + @Deprecated public void flush() { throw new UnsupportedOperationException(ERROR_MESSAGE); } @@ -54,6 +57,11 @@ public void init(final StateStoreContext stateStoreContext, throw new UnsupportedOperationException(ERROR_MESSAGE); } + @Override + public void commit(final Map changelogOffsets) { + throw new UnsupportedOperationException(ERROR_MESSAGE); + } + @Override public void close() { throw new UnsupportedOperationException(ERROR_MESSAGE); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractReadWriteDecorator.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractReadWriteDecorator.java index d9772027cb2d8..569adad58c325 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractReadWriteDecorator.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractReadWriteDecorator.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.streams.processor.internals; +import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.kstream.Windowed; @@ -34,6 +35,7 @@ import org.apache.kafka.streams.state.internals.WrappedStateStore; import java.util.List; +import java.util.Map; abstract class AbstractReadWriteDecorator extends WrappedStateStore { static final String ERROR_MESSAGE = "This method may only be called by Kafka Streams"; @@ -48,6 +50,11 @@ public void init(final StateStoreContext stateStoreContext, throw new UnsupportedOperationException(ERROR_MESSAGE); } + @Override + public void commit(final Map changelogOffsets) { + throw new UnsupportedOperationException(ERROR_MESSAGE); + } + @Override public void close() { throw new UnsupportedOperationException(ERROR_MESSAGE); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java index 2bf65c31d7512..cbb7c4ebbcee2 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java @@ -464,16 +464,16 @@ private long maybeUpdateDeadlineOrThrow(final long currentDeadlineMs) { @Override public void flush() { - log.debug("Flushing all global globalStores registered in the state manager"); + log.debug("Committing all global globalStores registered in the state manager"); for (final Map.Entry> entry : globalStores.entrySet()) { if (entry.getValue().isPresent()) { final StateStore store = entry.getValue().get(); try { - log.trace("Flushing global store={}", store.name()); - store.flush(); + log.trace("Committing global store={}", store.name()); + store.commit(Map.of()); } catch (final RuntimeException e) { throw new ProcessorStateException( - String.format("Failed to flush global state store %s", store.name()), + String.format("Failed to commit global state store %s", store.name()), e ); } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java index 3506845d288af..cea6baf2be471 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java @@ -521,36 +521,36 @@ void restore(final StateStoreMetadata storeMetadata, final List) store).flushCache(); } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStore.java index e05b6328ec8b3..85b85855a36be 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStore.java @@ -17,6 +17,7 @@ package org.apache.kafka.streams.state.internals; import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.KeyValue; @@ -277,8 +278,8 @@ public void init(final StateStoreContext stateStoreContext, final StateStore roo } @Override - public void flush() { - segments.flush(); + public void commit(final Map changelogOffsets) { + segments.commit(changelogOffsets); } @Override diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java index bde8d8319197d..201ca7ff9411a 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java @@ -17,6 +17,7 @@ package org.apache.kafka.streams.state.internals; import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.common.utils.Utils; @@ -326,8 +327,8 @@ public void init(final StateStoreContext stateStoreContext, final StateStore roo } @Override - public void flush() { - segments.flush(); + public void commit(final Map changelogOffsets) { + segments.commit(changelogOffsets); } @Override diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractSegments.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractSegments.java index 4f7ca5e59ae9c..71044792a0b8a 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractSegments.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractSegments.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.streams.state.internals; +import org.apache.kafka.common.TopicPartition; import org.apache.kafka.streams.errors.ProcessorStateException; import org.apache.kafka.streams.processor.StateStoreContext; import org.apache.kafka.streams.query.Position; @@ -160,9 +161,9 @@ public List allSegments(final boolean forward) { } @Override - public void flush() { + public void commit(final Map changelogOffsets) { for (final S segment : segments.values()) { - segment.flush(); + segment.commit(changelogOffsets); } } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java index 83343d04494d6..c630481472726 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.streams.state.internals; +import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.KeyValue; @@ -452,13 +453,13 @@ public long approximateNumEntries() { } @Override - public void flush() { + public void commit(final Map changelogOffsets) { validateStoreOpen(); lock.writeLock().lock(); try { validateStoreOpen(); internalContext.cache().flush(cacheName); - wrapped().flush(); + wrapped().commit(changelogOffsets); } finally { lock.writeLock().unlock(); } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java index ec0c1bd077d6f..20329d393160b 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.streams.state.internals; +import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.kstream.Windowed; @@ -34,6 +35,7 @@ import org.slf4j.LoggerFactory; import java.util.LinkedList; +import java.util.Map; import java.util.NoSuchElementException; import java.util.Objects; @@ -315,9 +317,10 @@ public KeyValueIterator, byte[]> backwardFetch(final Bytes keyFr return backwardFindSessions(keyFrom, keyTo, 0, Long.MAX_VALUE); } - public void flush() { + @Override + public void commit(final Map changelogOffsets) { internalContext.cache().flush(cacheName); - wrapped().flush(); + wrapped().commit(changelogOffsets); } @Override diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java index 0432c1726cb3e..3bdbe24fe1b27 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.streams.state.internals; +import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.KeyValue; @@ -37,6 +38,7 @@ import org.slf4j.LoggerFactory; import java.util.LinkedList; +import java.util.Map; import java.util.NoSuchElementException; import java.util.concurrent.atomic.AtomicLong; @@ -407,9 +409,9 @@ public KeyValueIterator, byte[]> backwardAll() { } @Override - public synchronized void flush() { + public synchronized void commit(final Map changelogOffsets) { internalContext.cache().flush(cacheName); - wrapped().flush(); + wrapped().commit(changelogOffsets); } @Override diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java index e7449ea49d44a..51744ad9cedf5 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java @@ -17,6 +17,7 @@ package org.apache.kafka.streams.state.internals; import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.KeyValue; @@ -38,6 +39,7 @@ import java.util.Iterator; import java.util.List; +import java.util.Map; import java.util.NavigableMap; import java.util.NoSuchElementException; import java.util.Set; @@ -229,7 +231,7 @@ public long approximateNumEntries() { } @Override - public void flush() { + public void commit(final Map changelogOffsets) { // do-nothing since it is in-memory } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemorySessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemorySessionStore.java index ed2bb1868867c..1a5c3197442ea 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemorySessionStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemorySessionStore.java @@ -17,6 +17,7 @@ package org.apache.kafka.streams.state.internals; import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.KeyValue; @@ -362,7 +363,7 @@ public QueryResult query(final Query query, } @Override - public void flush() { + public void commit(final Map changelogOffsets) { // do-nothing since it is in-memory } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueChangeBuffer.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueChangeBuffer.java index b1471591cb651..eff87a86fee0e 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueChangeBuffer.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueChangeBuffer.java @@ -17,6 +17,7 @@ package org.apache.kafka.streams.state.internals; import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.header.Header; import org.apache.kafka.common.header.internals.RecordHeader; import org.apache.kafka.common.header.internals.RecordHeaders; @@ -244,7 +245,7 @@ public void close() { } @Override - public void flush() { + public void commit(final Map changelogOffsets) { if (loggingEnabled) { // counting on this getting called before the record collector's flush for (final Bytes key : dirtyKeys) { diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java index d3d5ba4a20d44..c883576977d96 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java @@ -17,6 +17,7 @@ package org.apache.kafka.streams.state.internals; import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.KeyValue; @@ -385,7 +386,7 @@ public QueryResult query(final Query query, } @Override - public void flush() { + public void commit(final Map changelogOffsets) { // do-nothing since it is in-memory } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueStoreWrapper.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueStoreWrapper.java index dce28444d85ec..4b53eb890c858 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueStoreWrapper.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueStoreWrapper.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.streams.state.internals; +import org.apache.kafka.common.TopicPartition; import org.apache.kafka.streams.errors.InvalidStateStoreException; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.StateStoreContext; @@ -30,6 +31,8 @@ import org.apache.kafka.streams.state.VersionedKeyValueStore; import org.apache.kafka.streams.state.VersionedRecord; +import java.util.Map; + /** * A wrapper class for non-windowed key-value stores used within the DSL. All such stores are * instances of either {@link TimestampedKeyValueStore} or {@link VersionedKeyValueStore}. @@ -127,8 +130,8 @@ public void init(final StateStoreContext stateStoreContext, final StateStore roo } @Override - public void flush() { - store.flush(); + public void commit(final Map changelogOffsets) { + store.commit(changelogOffsets); } @Override diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueToTimestampedKeyValueByteStoreAdapter.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueToTimestampedKeyValueByteStoreAdapter.java index 8e79a86bc2b8a..f62c703d91d5a 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueToTimestampedKeyValueByteStoreAdapter.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueToTimestampedKeyValueByteStoreAdapter.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.streams.state.internals; +import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.KeyValue; @@ -36,6 +37,7 @@ import org.apache.kafka.streams.state.KeyValueStore; import java.util.List; +import java.util.Map; import static org.apache.kafka.streams.state.TimestampedBytesStore.convertToTimestampedFormat; import static org.apache.kafka.streams.state.internals.ValueAndTimestampDeserializer.rawValue; @@ -100,8 +102,8 @@ public void init(final StateStoreContext stateStoreContext, final StateStore roo } @Override - public void flush() { - store.flush(); + public void commit(final Map changelogOffsets) { + store.commit(changelogOffsets); } @Override diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegment.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegment.java index 18b371048c3f1..4d39b94561b8b 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegment.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegment.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.streams.state.internals; +import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.serialization.BytesSerializer; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.KeyValue; @@ -36,6 +37,7 @@ import java.util.Collections; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Objects; import java.util.Optional; import java.util.Set; @@ -142,8 +144,8 @@ public void init(final StateStoreContext stateStoreContext, final StateStore roo } @Override - public void flush() { - throw new UnsupportedOperationException("nothing to flush for logical segment"); + public void commit(final Map changelogOffsets) { + throw new UnsupportedOperationException("nothing to commit for logical segment"); } @Override diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegments.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegments.java index c46a2c2788c4b..fbca4f5cd4e2d 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegments.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegments.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.streams.state.internals; +import org.apache.kafka.common.TopicPartition; import org.apache.kafka.streams.processor.StateStoreContext; import org.apache.kafka.streams.processor.internals.ProcessorContextUtils; import org.apache.kafka.streams.query.Position; @@ -32,7 +33,7 @@ * {@link #segmentForTimestamp(long)}, {@link #getOrCreateSegment(long, StateStoreContext)}, * {@link #getOrCreateSegmentIfLive(long, StateStoreContext, long)}, * {@link #segments(long, long, boolean)}, and {@link #allSegments(boolean)} - * only return regular segments and not reserved segments. The methods {@link #flush()} + * only return regular segments and not reserved segments. The methods {@link #commit(Map)} * and {@link #close()} flush and close both regular and reserved segments, due to * the fact that both types of segments share the same physical RocksDB instance. * To create a reserved segment, use {@link #createReservedSegment(long, String)} instead. @@ -114,8 +115,8 @@ public void cleanupExpiredSegments(final long streamTime) { } @Override - public void flush() { - physicalStore.flush(); + public void commit(final Map changelogOffsets) { + physicalStore.commit(changelogOffsets); } @Override diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java index dc77be5da01dd..9729e63e60a98 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java @@ -17,6 +17,7 @@ package org.apache.kafka.streams.state.internals; import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.KeyValue; @@ -219,7 +220,7 @@ public long approximateNumEntries() { } @Override - public void flush() { + public void commit(final Map changelogOffsets) { // do-nothing since it is in-memory } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java index a89bf0c62c1b0..f41e370348850 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.streams.state.internals; +import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serializer; @@ -383,8 +384,8 @@ public KeyValueIterator reverseAll() { } @Override - public void flush() { - maybeMeasureLatency(super::flush, time, flushSensor); + public void commit(final Map changelogOffsets) { + maybeMeasureLatency(() -> super.commit(changelogOffsets), time, flushSensor); } @Override diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java index 7794a6ebc510a..484d3586ce5e8 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.streams.state.internals; +import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.utils.Bytes; @@ -398,8 +399,8 @@ public KeyValueIterator, V> backwardFindSessions(final K keyFrom, } @Override - public void flush() { - maybeMeasureLatency(super::flush, time, flushSensor); + public void commit(final Map changelogOffsets) { + maybeMeasureLatency(() -> super.commit(changelogOffsets), time, flushSensor); } @Override diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStore.java index 6afb4d1531d44..34f137e3b3fb4 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStore.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.streams.state.internals; +import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.common.utils.Time; @@ -346,8 +347,8 @@ public void init(final StateStoreContext stateStoreContext, final StateStore roo } @Override - public void flush() { - internal.flush(); + public void commit(final Map changelogOffsets) { + internal.commit(changelogOffsets); } @Override diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java index 1ba37da6dab0c..8fc5781d184cb 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.streams.state.internals; +import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.utils.Bytes; @@ -355,8 +356,8 @@ public KeyValueIterator, V> backwardAll() { } @Override - public void flush() { - maybeMeasureLatency(super::flush, time, flushSensor); + public void commit(final Map changelogOffsets) { + maybeMeasureLatency(() -> super.commit(changelogOffsets), time, flushSensor); } @Override diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java index ede618237cf35..d4705ad62d125 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java @@ -17,6 +17,7 @@ package org.apache.kafka.streams.state.internals; import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.metrics.Sensor.RecordingLevel; import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.common.utils.Bytes; @@ -633,14 +634,14 @@ private boolean isOverflowing(final long value) { } @Override - public synchronized void flush() { + public synchronized void commit(final Map changelogOffsets) { if (db == null) { return; } try { - cfAccessor.flush(dbAccessor); + cfAccessor.commit(dbAccessor, changelogOffsets); } catch (final RocksDBException e) { - throw new ProcessorStateException("Error while executing flush from store " + name, e); + throw new ProcessorStateException("Error while executing commit from store " + name, e); } } @@ -835,7 +836,7 @@ ManagedKeyValueIterator range(final DBAccessor accessor, long approximateNumEntries(final DBAccessor accessor) throws RocksDBException; - void flush(final DBAccessor accessor) throws RocksDBException; + void commit(final DBAccessor accessor, final Map changelogOffsets) throws RocksDBException; void addToBatch(final byte[] key, final byte[] value, @@ -951,7 +952,8 @@ public long approximateNumEntries(final DBAccessor accessor) throws RocksDBExcep } @Override - public void flush(final DBAccessor accessor) throws RocksDBException { + public void commit(final DBAccessor accessor, + final Map changelogOffsets) throws RocksDBException { accessor.flush(columnFamily); } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBuffer.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBuffer.java index 26065bf0fe318..a2203e362da40 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBuffer.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBuffer.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.streams.state.internals; +import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.serialization.ByteArraySerializer; import org.apache.kafka.common.serialization.BytesSerializer; import org.apache.kafka.common.serialization.Serde; @@ -195,8 +196,8 @@ public void init(final StateStoreContext stateStoreContext, final StateStore roo } @Override - public void flush() { - store.flush(); + public void commit(final Map changelogOffsets) { + store.commit(changelogOffsets); } @Override diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedWindowStore.java index ba39d2548e8d3..41708ed7b4231 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedWindowStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedWindowStore.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.streams.state.internals; +import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.kstream.Windowed; import org.apache.kafka.streams.processor.StateStore; @@ -30,6 +31,7 @@ import org.apache.kafka.streams.state.WindowStoreIterator; import org.apache.kafka.streams.state.internals.PrefixedWindowKeySchemas.TimeFirstWindowKeySchema; +import java.util.Map; import java.util.Objects; @@ -61,8 +63,8 @@ public void init(final StateStoreContext stateStoreContext, final StateStore roo } @Override - public void flush() { - wrapped().flush(); + public void commit(final Map changelogOffsets) { + wrapped().commit(changelogOffsets); } @Override diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStore.java index 7c44ca7c179cf..36262cb8e0dcf 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStore.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.streams.state.internals; +import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.utils.AbstractIterator; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.KeyValue; @@ -39,6 +40,7 @@ import java.nio.charset.StandardCharsets; import java.util.Comparator; import java.util.List; +import java.util.Map; import java.util.NoSuchElementException; import java.util.Objects; import java.util.Optional; @@ -253,7 +255,8 @@ public long approximateNumEntries(final DBAccessor accessor) throws RocksDBExcep } @Override - public void flush(final DBAccessor accessor) throws RocksDBException { + public void commit(final DBAccessor accessor, + final Map changelogOffsets) throws RocksDBException { accessor.flush(oldColumnFamily, newColumnFamily); } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStore.java index 54580a26a1bde..ceb2c8cb8a86f 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStore.java @@ -17,6 +17,7 @@ package org.apache.kafka.streams.state.internals; import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.KeyValue; @@ -55,6 +56,7 @@ import java.util.Collection; import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.Objects; import static org.apache.kafka.streams.StreamsConfig.InternalConfig.IQ_CONSISTENCY_OFFSET_VECTOR_ENABLED; @@ -303,9 +305,9 @@ public String name() { } @Override - public void flush() { - segmentStores.flush(); - // flushing segments store includes flushing latest value store, since they share the + public void commit(final Map changelogOffsets) { + segmentStores.commit(changelogOffsets); + // committing segments store includes committing latest value store, since they share the // same physical RocksDB instance } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/Segments.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/Segments.java index 18086a5441b65..e9c7d144d84e8 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/Segments.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/Segments.java @@ -16,9 +16,11 @@ */ package org.apache.kafka.streams.state.internals; +import org.apache.kafka.common.TopicPartition; import org.apache.kafka.streams.processor.StateStoreContext; import java.util.List; +import java.util.Map; interface Segments { @@ -38,7 +40,7 @@ interface Segments { List allSegments(final boolean forward); - void flush(); + void commit(final Map changelogOffsets); void close(); } \ No newline at end of file diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimeOrderedCachingWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimeOrderedCachingWindowStore.java index 646cbf2ca3557..b347f383ec73a 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimeOrderedCachingWindowStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimeOrderedCachingWindowStore.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.streams.state.internals; +import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.header.internals.RecordHeaders; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.utils.Bytes; @@ -47,6 +48,7 @@ import java.util.HashSet; import java.util.LinkedList; import java.util.List; +import java.util.Map; import java.util.NoSuchElementException; import java.util.Set; import java.util.concurrent.atomic.AtomicLong; @@ -504,9 +506,9 @@ public KeyValueIterator, byte[]> backwardAll() { } @Override - public synchronized void flush() { + public synchronized void commit(final Map changelogOffsets) { internalContext.cache().flush(cacheName); - wrapped().flush(); + wrapped().commit(changelogOffsets); } @Override diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/VersionedKeyValueToBytesStoreAdapter.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/VersionedKeyValueToBytesStoreAdapter.java index 5daa6ed1815dd..1d16b2a86b783 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/VersionedKeyValueToBytesStoreAdapter.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/VersionedKeyValueToBytesStoreAdapter.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.streams.state.internals; +import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serdes.ByteArraySerde; import org.apache.kafka.common.serialization.Serializer; @@ -38,6 +39,7 @@ import org.apache.kafka.streams.state.VersionedRecord; import java.util.List; +import java.util.Map; import java.util.Objects; /** @@ -93,8 +95,8 @@ public void init(final StateStoreContext stateStoreContext, final StateStore roo } @Override - public void flush() { - inner.flush(); + public void commit(final Map changelogOffsets) { + inner.commit(changelogOffsets); } @Override diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowToTimestampedWindowByteStoreAdapter.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowToTimestampedWindowByteStoreAdapter.java index eec2e2ff1d8de..90cb126d5de13 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowToTimestampedWindowByteStoreAdapter.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowToTimestampedWindowByteStoreAdapter.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.streams.state.internals; +import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.kstream.Windowed; import org.apache.kafka.streams.processor.StateStore; @@ -30,6 +31,7 @@ import org.apache.kafka.streams.state.WindowStoreIterator; import java.time.Instant; +import java.util.Map; import static org.apache.kafka.streams.state.TimestampedBytesStore.convertToTimestampedFormat; import static org.apache.kafka.streams.state.internals.ValueAndTimestampDeserializer.rawValue; @@ -161,8 +163,8 @@ public void init(final StateStoreContext stateStoreContext, final StateStore roo } @Override - public void flush() { - store.flush(); + public void commit(final Map changelogOffsets) { + store.commit(changelogOffsets); } @Override diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappedStateStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappedStateStore.java index adbb7568c87c5..0f0f7ee62ee5b 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappedStateStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappedStateStore.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.streams.state.internals; +import org.apache.kafka.common.TopicPartition; import org.apache.kafka.streams.errors.InvalidStateStoreException; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.StateStoreContext; @@ -27,6 +28,8 @@ import org.apache.kafka.streams.state.TimestampedBytesStore; import org.apache.kafka.streams.state.VersionedBytesStore; +import java.util.Map; + /** * A storage engine wrapper for utilities like logging, caching, and metering. */ @@ -110,8 +113,8 @@ void validateStoreOpen() { } @Override - public void flush() { - wrapped.flush(); + public void commit(final Map changelogOffsets) { + wrapped.commit(changelogOffsets); } @Override diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java index f6658997b7bf9..d6b94c7e7f92c 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java @@ -57,6 +57,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import java.util.Map; import java.util.Properties; import java.util.stream.Collectors; @@ -244,7 +245,7 @@ public void shouldHaveMultipleSessionsForSameIdWhenTimestampApartBySessionGap(fi processor.process(new Record<>(sessionId, "third", now)); processor.process(new Record<>(sessionId, "third", now)); - sessionStore.flush(); + sessionStore.commit(Map.of()); if (emitFinal) { assertEquals( @@ -318,7 +319,7 @@ public void shouldHandleMultipleSessionsAndMerging(final EmitStrategy.StrategyTy processor.process(new Record<>("a", "3", GAP_MS + 1 + GAP_MS / 2)); processor.process(new Record<>("c", "3", GAP_MS + 1 + GAP_MS / 2)); - sessionStore.flush(); + sessionStore.commit(Map.of()); if (emitFinal) { assertEquals(Arrays.asList( diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java index 74705840de983..9d163b4df719d 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java @@ -413,18 +413,18 @@ public void shouldFlushStateStores() { stateManager.registerStore(store2, stateRestoreCallback, null); stateManager.flush(); - assertTrue(store1.flushed); - assertTrue(store2.flushed); + assertTrue(store1.committed); + assertTrue(store2.committed); } @Test - public void shouldThrowProcessorStateStoreExceptionIfStoreFlushFailed() { + public void shouldThrowProcessorStateStoreExceptionIfStoreCommitFailed() { stateManager.initialize(); // register the stores initializeConsumer(1, 0, t1); stateManager.registerStore(new NoOpReadOnlyStore<>(store1.name()) { @Override - public void flush() { + public void commit(final Map changelogOffsets) { throw new RuntimeException("KABOOM!"); } }, stateRestoreCallback, null); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorContextImplTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorContextImplTest.java index 9410ca5a97896..960ca160ef7d8 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorContextImplTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorContextImplTest.java @@ -103,7 +103,6 @@ public class ProcessorContextImplTest { private static final String REGISTERED_STORE_NAME = "registered-store"; private static final TopicPartition CHANGELOG_PARTITION = new TopicPartition("store-changelog", 1); - private boolean flushExecuted = false; private boolean putExecuted = false; private boolean putWithTimestampExecuted; private boolean putIfAbsentExecuted = false; @@ -300,7 +299,6 @@ public void localKeyValueStoreShouldNotAllowInitOrClose() { final KeyValueStore keyValueStoreMock = mock(KeyValueStore.class); when(stateManager.store("LocalKeyValueStore")).thenAnswer(answer -> keyValueStoreMock(keyValueStoreMock)); - mockStateStoreFlush(keyValueStoreMock); mockKeyValueStoreOperation(keyValueStoreMock); context = buildProcessorContextImpl(streamsConfig, stateManager); @@ -314,7 +312,6 @@ public void localKeyValueStoreShouldNotAllowInitOrClose() { verifyStoreCannotBeInitializedOrClosed(store); store.flush(); - assertTrue(flushExecuted); store.put("1", 1L); assertTrue(putExecuted); @@ -344,7 +341,6 @@ public void localTimestampedKeyValueStoreShouldNotAllowInitOrClose() { when(stateManager.store("LocalTimestampedKeyValueStore")) .thenAnswer(answer -> timestampedKeyValueStoreMock(timestampedKeyValueStoreMock)); mockTimestampedKeyValueOperation(timestampedKeyValueStoreMock); - mockStateStoreFlush(timestampedKeyValueStoreMock); context = buildProcessorContextImpl(streamsConfig, stateManager); @@ -357,7 +353,6 @@ public void localTimestampedKeyValueStoreShouldNotAllowInitOrClose() { verifyStoreCannotBeInitializedOrClosed(store); store.flush(); - assertTrue(flushExecuted); store.put("1", ValueAndTimestamp.make(1L, 2L)); assertTrue(putExecuted); @@ -387,7 +382,6 @@ public void localWindowStoreShouldNotAllowInitOrClose() { final WindowStore windowStore = mock(WindowStore.class); when(stateManager.store("LocalWindowStore")).thenAnswer(answer -> windowStoreMock(windowStore)); - mockStateStoreFlush(windowStore); doAnswer(answer -> { putExecuted = true; @@ -405,7 +399,6 @@ public void localWindowStoreShouldNotAllowInitOrClose() { verifyStoreCannotBeInitializedOrClosed(store); store.flush(); - assertTrue(flushExecuted); store.put("1", 1L, 1L); assertTrue(putExecuted); @@ -427,7 +420,6 @@ public void localTimestampedWindowStoreShouldNotAllowInitOrClose() { final TimestampedWindowStore windowStore = mock(TimestampedWindowStore.class); when(stateManager.store("LocalTimestampedWindowStore")).thenAnswer(answer -> timestampedWindowStoreMock(windowStore)); - mockStateStoreFlush(windowStore); doAnswer(answer -> { putExecuted = true; @@ -448,7 +440,6 @@ public void localTimestampedWindowStoreShouldNotAllowInitOrClose() { verifyStoreCannotBeInitializedOrClosed(store); store.flush(); - assertTrue(flushExecuted); store.put("1", ValueAndTimestamp.make(1L, 1L), 1L); assertTrue(putExecuted); @@ -473,7 +464,6 @@ public void localSessionStoreShouldNotAllowInitOrClose() { final SessionStore sessionStore = mock(SessionStore.class); when(stateManager.store("LocalSessionStore")).thenAnswer(answer -> sessionStoreMock(sessionStore)); - mockStateStoreFlush(sessionStore); doAnswer(answer -> { putExecuted = true; @@ -496,7 +486,6 @@ public void localSessionStoreShouldNotAllowInitOrClose() { verifyStoreCannotBeInitializedOrClosed(store); store.flush(); - assertTrue(flushExecuted); store.remove(null); assertTrue(removeExecuted); @@ -907,13 +896,6 @@ private void initStateStoreMock(final StateStore stateStore) { when(stateStore.isOpen()).thenReturn(true); } - private void mockStateStoreFlush(final StateStore stateStore) { - doAnswer(answer -> { - flushExecuted = true; - return null; - }).when(stateStore).flush(); - } - @SuppressWarnings("rawtypes") private void doTest(final String name, final Consumer checker) { final Processor processor = new Processor<>() { diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java index 8c28ae6a33dcb..93bd701a29e7e 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java @@ -568,11 +568,11 @@ public void shouldFlushCheckpointAndClose() throws IOException { } finally { stateMgr.flush(); - assertTrue(persistentStore.flushed); - assertTrue(nonPersistentStore.flushed); + assertTrue(persistentStore.committed); + assertTrue(nonPersistentStore.committed); // make sure that flush is called in the proper order - assertThat(persistentStore.getLastFlushCount(), Matchers.lessThan(nonPersistentStore.getLastFlushCount())); + assertThat(persistentStore.getLastCommitCount(), Matchers.lessThan(nonPersistentStore.getLastCommitCount())); stateMgr.updateChangelogOffsets(ackedOffsets); stateMgr.checkpoint(); @@ -710,12 +710,12 @@ public void shouldThrowIllegalArgumentExceptionOnRegisterWhenStoreHasAlreadyBeen } @Test - public void shouldThrowProcessorStateExceptionOnFlushIfStoreThrowsAnException() { + public void shouldThrowProcessorStateExceptionOnCommitIfStoreThrowsAnException() { final RuntimeException exception = new RuntimeException("KABOOM!"); final ProcessorStateManager stateManager = getStateManager(Task.TaskType.ACTIVE); final MockKeyValueStore stateStore = new MockKeyValueStore(persistentStoreName, true) { @Override - public void flush() { + public void commit(final Map changelogOffsets) { throw exception; } }; @@ -726,12 +726,12 @@ public void flush() { } @Test - public void shouldPreserveStreamsExceptionOnFlushIfStoreThrows() { + public void shouldPreserveStreamsExceptionOnCommitIfStoreThrows() { final StreamsException exception = new StreamsException("KABOOM!"); final ProcessorStateManager stateManager = getStateManager(Task.TaskType.ACTIVE); final MockKeyValueStore stateStore = new MockKeyValueStore(persistentStoreName, true) { @Override - public void flush() { + public void commit(final Map changelogOffsets) { throw exception; } }; @@ -774,12 +774,12 @@ public void close() { } @Test - public void shouldThrowProcessorStateExceptionOnFlushIfStoreThrowsAFailedProcessingException() { + public void shouldThrowProcessorStateExceptionOnCommitIfStoreThrowsAFailedProcessingException() { final RuntimeException exception = new RuntimeException("KABOOM!"); final ProcessorStateManager stateManager = getStateManager(Task.TaskType.ACTIVE); final MockKeyValueStore stateStore = new MockKeyValueStore(persistentStoreName, true) { @Override - public void flush() { + public void commit(final Map changelogOffsets) { throw new FailedProcessingException("processor", exception); } }; @@ -909,19 +909,19 @@ public void shouldThrowIfRestoreCallbackThrows() { } @Test - public void shouldFlushGoodStoresEvenSomeThrowsException() { - final AtomicBoolean flushedStore = new AtomicBoolean(false); + public void shouldCommitGoodStoresEvenSomeThrowsException() { + final AtomicBoolean committedStore = new AtomicBoolean(false); final MockKeyValueStore stateStore1 = new MockKeyValueStore(persistentStoreName, true) { @Override - public void flush() { + public void commit(final Map changelogOffsets) { throw new RuntimeException("KABOOM!"); } }; final MockKeyValueStore stateStore2 = new MockKeyValueStore(persistentStoreTwoName, true) { @Override - public void flush() { - flushedStore.set(true); + public void commit(final Map changelogOffsets) { + committedStore.set(true); } }; final ProcessorStateManager stateManager = getStateManager(Task.TaskType.ACTIVE); @@ -933,7 +933,7 @@ public void flush() { stateManager.flush(); } catch (final ProcessorStateException expected) { /* ignore */ } - assertTrue(flushedStore.get()); + assertTrue(committedStore.get()); } @Test diff --git a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java index 78c6dedcbf45c..51cbe8f940da2 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java @@ -64,7 +64,7 @@ /** * A component that provides a {@link #context() StateStoreContext} that can be supplied to a {@link KeyValueStore} so that - * all entries written to the Kafka topic by the store during {@link KeyValueStore#flush()} are captured for testing purposes. + * all entries written to the Kafka topic by the store during {@link KeyValueStore#commit(Map)} are captured for testing purposes. * This class simplifies testing of various {@link KeyValueStore} instances, especially those that use * {@link MeteredKeyValueStore} to monitor and write its entries to the Kafka topic. * @@ -93,29 +93,29 @@ * assertNull(store.get(3)); * store.delete(5); * - * // Flush the store and verify all current entries were properly flushed ... - * store.flush(); - * assertEquals("zero", driver.flushedEntryStored(0)); - * assertEquals("one", driver.flushedEntryStored(1)); - * assertEquals("two", driver.flushedEntryStored(2)); - * assertEquals("four", driver.flushedEntryStored(4)); - * assertNull(driver.flushedEntryStored(5)); + * // Commit the store and verify all current entries were properly committed ... + * store.commit(Map.of()); + * assertEquals("zero", driver.committedEntryStored(0)); + * assertEquals("one", driver.committedEntryStored(1)); + * assertEquals("two", driver.committedEntryStored(2)); + * assertEquals("four", driver.committedEntryStored(4)); + * assertNull(driver.committedEntryStored(5)); * - * assertEquals(false, driver.flushedEntryRemoved(0)); - * assertEquals(false, driver.flushedEntryRemoved(1)); - * assertEquals(false, driver.flushedEntryRemoved(2)); - * assertEquals(false, driver.flushedEntryRemoved(4)); - * assertEquals(true, driver.flushedEntryRemoved(5)); + * assertEquals(false, driver.committedEntryRemoved(0)); + * assertEquals(false, driver.committedEntryRemoved(1)); + * assertEquals(false, driver.committedEntryRemoved(2)); + * assertEquals(false, driver.committedEntryRemoved(4)); + * assertEquals(true, driver.committedEntryRemoved(5)); * * * *

Restoring a store

* This component can be used to test whether a {@link KeyValueStore} implementation properly * {@link StateStoreContext#register(StateStore, StateRestoreCallback) registers itself} with the {@link StateStoreContext}, so that - * the persisted contents of a store are properly restored from the flushed entries when the store instance is started. + * the persisted contents of a store are properly restored from the committed entries when the store instance is started. *

* To do this, create an instance of this driver component, {@link #addEntryToRestoreLog(Object, Object) add entries} that will be - * passed to the store upon creation (simulating the entries that were previously flushed to the topic), and then create the store + * passed to the store upon creation (simulating the entries that were previously committed to the topic), and then create the store * using this driver's {@link #context() ProcessorContext}: * *

@@ -189,8 +189,8 @@ public static  KeyValueStoreTestDriver create(final Serializer ke
         return new KeyValueStoreTestDriver<>(serdes);
     }
 
-    private final Map flushedEntries = new HashMap<>();
-    private final Set flushedRemovals = new HashSet<>();
+    private final Map committedEntries = new HashMap<>();
+    private final Set committedRemovals = new HashSet<>();
     private final List> restorableEntries = new LinkedList<>();
 
     private final InternalMockProcessorContext context;
@@ -243,7 +243,7 @@ public  void send(final String topic,
                 final K keyTest = serdes.keyFrom(keyBytes);
                 final V valueTest = serdes.valueFrom(valueBytes);
 
-                recordFlushed(keyTest, valueTest);
+                recordCommitted(keyTest, valueTest);
             }
 
             @Override
@@ -286,15 +286,15 @@ public Map appConfigsWithPrefix(final String prefix) {
         };
     }
 
-    private void recordFlushed(final K key, final V value) {
+    private void recordCommitted(final K key, final V value) {
         if (value == null) {
             // This is a removal ...
-            flushedRemovals.add(key);
-            flushedEntries.remove(key);
+            committedRemovals.add(key);
+            committedEntries.remove(key);
         } else {
             // This is a normal add
-            flushedEntries.put(key, value);
-            flushedRemovals.remove(key);
+            committedEntries.put(key, value);
+            committedRemovals.remove(key);
         }
     }
 
@@ -343,8 +343,8 @@ public void addEntryToRestoreLog(final K key, final V value) {
 
     /**
      * Get the context that should be supplied to a {@link KeyValueStore}'s constructor. This context records any messages
-     * written by the store to the Kafka topic, making them available via the {@link #flushedEntryStored(Object)} and
-     * {@link #flushedEntryRemoved(Object)} methods.
+     * written by the store to the Kafka topic, making them available via the {@link #committedEntryStored(Object)} and
+     * {@link #committedEntryRemoved(Object)} methods.
      * 

* If the {@link KeyValueStore}'s are to be restored upon its startup, be sure to {@link #addEntryToRestoreLog(Object, Object) * add the restore entries} before creating the store with the {@link StateStoreContext} returned by this method. @@ -395,47 +395,47 @@ public int sizeOf(final KeyValueStore store) { } /** - * Retrieve the value that the store {@link KeyValueStore#flush() flushed} with the given key. + * Retrieve the value that the store {@link KeyValueStore#commit(Map) committed} with the given key. * * @param key the key - * @return the value that was flushed with the key, or {@code null} if no such key was flushed or if the entry with this - * key was removed upon flush + * @return the value that was committed with the key, or {@code null} if no such key was committed or if the entry with this + * key was removed upon commit */ - public V flushedEntryStored(final K key) { - return flushedEntries.get(key); + public V committedEntryStored(final K key) { + return committedEntries.get(key); } /** - * Determine whether the store {@link KeyValueStore#flush() flushed} the removal of the given key. + * Determine whether the store {@link KeyValueStore#commit(Map) committed} the removal of the given key. * * @param key the key - * @return {@code true} if the entry with the given key was removed when flushed, or {@code false} if the entry was not - * removed when last flushed + * @return {@code true} if the entry with the given key was removed when committed, or {@code false} if the entry was not + * removed when last committed */ - public boolean flushedEntryRemoved(final K key) { - return flushedRemovals.contains(key); + public boolean committedEntryRemoved(final K key) { + return committedRemovals.contains(key); } /** * Return number of removed entry */ - public int numFlushedEntryStored() { - return flushedEntries.size(); + public int numCommittedEntryStored() { + return committedEntries.size(); } /** * Return number of removed entry */ - public int numFlushedEntryRemoved() { - return flushedRemovals.size(); + public int numCommittedEntryRemoved() { + return committedRemovals.size(); } /** - * Remove all {@link #flushedEntryStored(Object) flushed entries}, {@link #flushedEntryRemoved(Object) flushed removals}, + * Remove all {@link #committedEntryStored(Object) committed entries}, {@link #committedEntryRemoved(Object) committed removals}, */ public void clear() { restorableEntries.clear(); - flushedEntries.clear(); - flushedRemovals.clear(); + committedEntries.clear(); + committedRemovals.clear(); } } diff --git a/streams/src/test/java/org/apache/kafka/streams/state/NoOpWindowStore.java b/streams/src/test/java/org/apache/kafka/streams/state/NoOpWindowStore.java index 5de4e65bd5615..c022da839f3f4 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/NoOpWindowStore.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/NoOpWindowStore.java @@ -16,12 +16,14 @@ */ package org.apache.kafka.streams.state; +import org.apache.kafka.common.TopicPartition; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.StateStoreContext; import org.apache.kafka.streams.query.Position; import java.time.Instant; +import java.util.Map; import java.util.NoSuchElementException; public class NoOpWindowStore implements ReadOnlyWindowStore, StateStore { @@ -59,7 +61,7 @@ public String name() { public void init(final StateStoreContext stateStoreContext, final StateStore root) {} @Override - public void flush() { + public void commit(final Map changelogOffsets) { } diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java index 1175add263fec..3727add492d13 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java @@ -160,24 +160,24 @@ public void testPutGetRange() { assertNull(store.get(3)); assertEquals("four", store.get(4)); assertEquals("five", store.get(5)); - // Flush now so that for caching store, we will not skip the deletion following an put - store.flush(); + // Commit now so that for caching store, we will not skip the deletion following an put + store.commit(Map.of()); store.delete(5); assertEquals(4, driver.sizeOf(store)); - // Flush the store and verify all current entries were properly flushed ... - store.flush(); - assertEquals("zero", driver.flushedEntryStored(0)); - assertEquals("one", driver.flushedEntryStored(1)); - assertEquals("two", driver.flushedEntryStored(2)); - assertEquals("four", driver.flushedEntryStored(4)); - assertNull(driver.flushedEntryStored(5)); + // Commit the store and verify all current entries were properly committed ... + store.commit(Map.of()); + assertEquals("zero", driver.committedEntryStored(0)); + assertEquals("one", driver.committedEntryStored(1)); + assertEquals("two", driver.committedEntryStored(2)); + assertEquals("four", driver.committedEntryStored(4)); + assertNull(driver.committedEntryStored(5)); - assertFalse(driver.flushedEntryRemoved(0)); - assertFalse(driver.flushedEntryRemoved(1)); - assertFalse(driver.flushedEntryRemoved(2)); - assertFalse(driver.flushedEntryRemoved(4)); - assertTrue(driver.flushedEntryRemoved(5)); + assertFalse(driver.committedEntryRemoved(0)); + assertFalse(driver.committedEntryRemoved(1)); + assertFalse(driver.committedEntryRemoved(2)); + assertFalse(driver.committedEntryRemoved(4)); + assertTrue(driver.committedEntryRemoved(5)); final HashMap expectedContents = new HashMap<>(); expectedContents.put(2, "two"); @@ -208,24 +208,24 @@ public void testPutGetReverseRange() { assertNull(store.get(3)); assertEquals("four", store.get(4)); assertEquals("five", store.get(5)); - // Flush now so that for caching store, we will not skip the deletion following an put - store.flush(); + // Commit now so that for caching store, we will not skip the deletion following an put + store.commit(Map.of()); store.delete(5); assertEquals(4, driver.sizeOf(store)); - // Flush the store and verify all current entries were properly flushed ... - store.flush(); - assertEquals("zero", driver.flushedEntryStored(0)); - assertEquals("one", driver.flushedEntryStored(1)); - assertEquals("two", driver.flushedEntryStored(2)); - assertEquals("four", driver.flushedEntryStored(4)); - assertNull(driver.flushedEntryStored(5)); + // Commit the store and verify all current entries were properly committed ... + store.commit(Map.of()); + assertEquals("zero", driver.committedEntryStored(0)); + assertEquals("one", driver.committedEntryStored(1)); + assertEquals("two", driver.committedEntryStored(2)); + assertEquals("four", driver.committedEntryStored(4)); + assertNull(driver.committedEntryStored(5)); - assertFalse(driver.flushedEntryRemoved(0)); - assertFalse(driver.flushedEntryRemoved(1)); - assertFalse(driver.flushedEntryRemoved(2)); - assertFalse(driver.flushedEntryRemoved(4)); - assertTrue(driver.flushedEntryRemoved(5)); + assertFalse(driver.committedEntryRemoved(0)); + assertFalse(driver.committedEntryRemoved(1)); + assertFalse(driver.committedEntryRemoved(2)); + assertFalse(driver.committedEntryRemoved(4)); + assertTrue(driver.committedEntryRemoved(5)); final HashMap expectedContents = new HashMap<>(); expectedContents.put(2, "two"); @@ -256,22 +256,22 @@ public void testPutGetWithDefaultSerdes() { assertNull(store.get(3)); assertEquals("four", store.get(4)); assertEquals("five", store.get(5)); - store.flush(); + store.commit(Map.of()); store.delete(5); - // Flush the store and verify all current entries were properly flushed ... - store.flush(); - assertEquals("zero", driver.flushedEntryStored(0)); - assertEquals("one", driver.flushedEntryStored(1)); - assertEquals("two", driver.flushedEntryStored(2)); - assertEquals("four", driver.flushedEntryStored(4)); - assertNull(driver.flushedEntryStored(5)); + // Commit the store and verify all current entries were properly committed ... + store.commit(Map.of()); + assertEquals("zero", driver.committedEntryStored(0)); + assertEquals("one", driver.committedEntryStored(1)); + assertEquals("two", driver.committedEntryStored(2)); + assertEquals("four", driver.committedEntryStored(4)); + assertNull(driver.committedEntryStored(5)); - assertFalse(driver.flushedEntryRemoved(0)); - assertFalse(driver.flushedEntryRemoved(1)); - assertFalse(driver.flushedEntryRemoved(2)); - assertFalse(driver.flushedEntryRemoved(4)); - assertTrue(driver.flushedEntryRemoved(5)); + assertFalse(driver.committedEntryRemoved(0)); + assertFalse(driver.committedEntryRemoved(1)); + assertFalse(driver.committedEntryRemoved(2)); + assertFalse(driver.committedEntryRemoved(4)); + assertTrue(driver.committedEntryRemoved(5)); } @Test @@ -332,17 +332,17 @@ public void testPutIfAbsent() { assertNull(store.get(3)); assertEquals("four", store.get(4)); - // Flush the store and verify all current entries were properly flushed ... - store.flush(); - assertEquals("zero", driver.flushedEntryStored(0)); - assertEquals("one", driver.flushedEntryStored(1)); - assertEquals("two", driver.flushedEntryStored(2)); - assertEquals("four", driver.flushedEntryStored(4)); + // Commit the store and verify all current entries were properly committed ... + store.commit(Map.of()); + assertEquals("zero", driver.committedEntryStored(0)); + assertEquals("one", driver.committedEntryStored(1)); + assertEquals("two", driver.committedEntryStored(2)); + assertEquals("four", driver.committedEntryStored(4)); - assertFalse(driver.flushedEntryRemoved(0)); - assertFalse(driver.flushedEntryRemoved(1)); - assertFalse(driver.flushedEntryRemoved(2)); - assertFalse(driver.flushedEntryRemoved(4)); + assertFalse(driver.committedEntryRemoved(0)); + assertFalse(driver.committedEntryRemoved(1)); + assertFalse(driver.committedEntryRemoved(2)); + assertFalse(driver.committedEntryRemoved(4)); } @Test @@ -486,7 +486,7 @@ public void testSize() { store.put(2, "two"); store.put(4, "four"); store.put(5, "five"); - store.flush(); + store.commit(Map.of()); assertEquals(5, store.approximateNumEntries()); } diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBWindowStoreTest.java index 66a896598f131..96995d047bac8 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBWindowStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBWindowStoreTest.java @@ -34,6 +34,7 @@ import java.io.File; import java.util.ArrayList; import java.util.List; +import java.util.Map; import java.util.Set; import static java.time.Duration.ofMillis; @@ -439,7 +440,7 @@ public void testRolling() { ofEpochMilli(startTime + increment * 8 + WINDOW_SIZE)))); // check segment directories - windowStore.flush(); + windowStore.commit(Map.of()); assertEquals( Set.of( segments.segmentName(4), @@ -596,7 +597,7 @@ public void testRestore() throws Exception { windowStore.put(6, "six", startTime + increment * 6); windowStore.put(7, "seven", startTime + increment * 7); windowStore.put(8, "eight", startTime + increment * 8); - windowStore.flush(); + windowStore.commit(Map.of()); windowStore.close(); @@ -750,7 +751,7 @@ public void testRestore() throws Exception { ofEpochMilli(startTime + increment * 8 + WINDOW_SIZE)))); // check segment directories - windowStore.flush(); + windowStore.commit(Map.of()); assertEquals( Set.of( segments.segmentName(4L), diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractWindowBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractWindowBytesStoreTest.java index 1486cca8c7e0d..32c03005e1cb2 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractWindowBytesStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractWindowBytesStoreTest.java @@ -235,8 +235,8 @@ public void testRangeAndSinglePointFetch() { ofEpochMilli(defaultStartTime + 12L - WINDOW_SIZE), ofEpochMilli(defaultStartTime + 12L + WINDOW_SIZE)))); - // Flush the store and verify all current entries were properly flushed ... - windowStore.flush(); + // Commit the store and verify all current entries were properly committed ... + windowStore.commit(Map.of()); final List> changeLog = new ArrayList<>(); for (final ProducerRecord record : recordCollector.collected()) { @@ -627,8 +627,8 @@ public void testPutAndFetchBefore() { Set.of(), valuesToSetAndCloseIterator(windowStore.fetch(2, ofEpochMilli(defaultStartTime + 13L - WINDOW_SIZE), ofEpochMilli(defaultStartTime + 13L)))); - // Flush the store and verify all current entries were properly flushed ... - windowStore.flush(); + // Commit the store and verify all current entries were properly committed ... + windowStore.commit(Map.of()); final List> changeLog = new ArrayList<>(); for (final ProducerRecord record : recordCollector.collected()) { @@ -737,8 +737,8 @@ public void testPutAndFetchAfter() { valuesToSetAndCloseIterator(windowStore.fetch(2, ofEpochMilli(defaultStartTime + 12L), ofEpochMilli(defaultStartTime + 12L + WINDOW_SIZE)))); - // Flush the store and verify all current entries were properly flushed ... - windowStore.flush(); + // Commit the store and verify all current entries were properly committed ... + windowStore.commit(Map.of()); final List> changeLog = new ArrayList<>(); for (final ProducerRecord record : recordCollector.collected()) { @@ -806,8 +806,8 @@ public void testPutSameKeyTimestamp() { ofEpochMilli(defaultStartTime + 4L - WINDOW_SIZE), ofEpochMilli(defaultStartTime + 4L + WINDOW_SIZE)))); - // Flush the store and verify all current entries were properly flushed ... - windowStore.flush(); + // Commit the store and verify all current entries were properly committed ... + windowStore.commit(Map.of()); final List> changeLog = new ArrayList<>(); for (final ProducerRecord record : recordCollector.collected()) { diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingInMemoryKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingInMemoryKeyValueStoreTest.java index e2e37ad1e4ade..4a0579b8ef03d 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingInMemoryKeyValueStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingInMemoryKeyValueStoreTest.java @@ -48,6 +48,7 @@ import java.util.Arrays; import java.util.Collections; import java.util.List; +import java.util.Map; import static org.apache.kafka.common.utils.Utils.mkEntry; import static org.apache.kafka.common.utils.Utils.mkMap; @@ -124,7 +125,7 @@ public void shouldSetFlushListener() { @Test public void shouldAvoidFlushingDeletionsWithoutDirtyKeys() { final int added = addItemsToCache(); - // all dirty entries should have been flushed + // all dirty entries should have been committed assertEquals(added, underlyingStore.approximateNumEntries()); assertEquals(added, cacheFlushListener.forwarded.size()); @@ -133,7 +134,7 @@ public void shouldAvoidFlushingDeletionsWithoutDirtyKeys() { assertEquals(added, cacheFlushListener.forwarded.size()); store.put(bytesKey("key"), null); - store.flush(); + store.commit(Map.of()); assertEquals(added, underlyingStore.approximateNumEntries()); assertEquals(added, cacheFlushListener.forwarded.size()); } @@ -225,7 +226,7 @@ private void shouldMatchPositionAfterPut() { ); assertEquals(Position.emptyPosition(), underlyingStore.getPosition()); - store.flush(); + store.commit(Map.of()); assertEquals( Position.fromMap(mkMap(mkEntry("", mkMap(mkEntry(0, 2L))))), @@ -263,7 +264,7 @@ public void shouldForwardDirtyItemToListenerWhenEvicted() { @Test public void shouldForwardDirtyItemsWhenFlushCalled() { store.put(bytesKey("1"), bytesValue("a")); - store.flush(); + store.commit(Map.of()); assertEquals("a", cacheFlushListener.forwarded.get("1").newValue); assertNull(cacheFlushListener.forwarded.get("1").oldValue); } @@ -272,23 +273,23 @@ public void shouldForwardDirtyItemsWhenFlushCalled() { public void shouldForwardOldValuesWhenEnabled() { store.setFlushListener(cacheFlushListener, true); store.put(bytesKey("1"), bytesValue("a")); - store.flush(); + store.commit(Map.of()); assertEquals("a", cacheFlushListener.forwarded.get("1").newValue); assertNull(cacheFlushListener.forwarded.get("1").oldValue); store.put(bytesKey("1"), bytesValue("b")); store.put(bytesKey("1"), bytesValue("c")); - store.flush(); + store.commit(Map.of()); assertEquals("c", cacheFlushListener.forwarded.get("1").newValue); assertEquals("a", cacheFlushListener.forwarded.get("1").oldValue); store.put(bytesKey("1"), null); - store.flush(); + store.commit(Map.of()); assertNull(cacheFlushListener.forwarded.get("1").newValue); assertEquals("c", cacheFlushListener.forwarded.get("1").oldValue); cacheFlushListener.forwarded.clear(); store.put(bytesKey("1"), bytesValue("a")); store.put(bytesKey("1"), bytesValue("b")); store.put(bytesKey("1"), null); - store.flush(); + store.commit(Map.of()); assertNull(cacheFlushListener.forwarded.get("1")); cacheFlushListener.forwarded.clear(); } @@ -296,22 +297,22 @@ public void shouldForwardOldValuesWhenEnabled() { @Test public void shouldNotForwardOldValuesWhenDisabled() { store.put(bytesKey("1"), bytesValue("a")); - store.flush(); + store.commit(Map.of()); assertEquals("a", cacheFlushListener.forwarded.get("1").newValue); assertNull(cacheFlushListener.forwarded.get("1").oldValue); store.put(bytesKey("1"), bytesValue("b")); - store.flush(); + store.commit(Map.of()); assertEquals("b", cacheFlushListener.forwarded.get("1").newValue); assertNull(cacheFlushListener.forwarded.get("1").oldValue); store.put(bytesKey("1"), null); - store.flush(); + store.commit(Map.of()); assertNull(cacheFlushListener.forwarded.get("1").newValue); assertNull(cacheFlushListener.forwarded.get("1").oldValue); cacheFlushListener.forwarded.clear(); store.put(bytesKey("1"), bytesValue("a")); store.put(bytesKey("1"), bytesValue("b")); store.put(bytesKey("1"), null); - store.flush(); + store.commit(Map.of()); assertNull(cacheFlushListener.forwarded.get("1")); cacheFlushListener.forwarded.clear(); } @@ -477,7 +478,7 @@ public void shouldDeleteItemsFromCache() { @Test public void shouldNotShowItemsDeletedFromCacheButFlushedToStoreBeforeDelete() { store.put(bytesKey("a"), bytesValue("a")); - store.flush(); + store.commit(Map.of()); store.delete(bytesKey("a")); assertNull(store.get(bytesKey("a"))); try (final KeyValueIterator iterator = store.range(bytesKey("a"), bytesKey("b"))) { diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingInMemorySessionStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingInMemorySessionStoreTest.java index af4dbf3a446c1..b68374f878631 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingInMemorySessionStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingInMemorySessionStoreTest.java @@ -54,6 +54,7 @@ import java.util.Collections; import java.util.LinkedList; import java.util.List; +import java.util.Map; import java.util.Random; import static java.util.Arrays.asList; @@ -166,7 +167,7 @@ private void shouldMatchPositionAfterPut() { assertEquals(Position.emptyPosition(), cachingStore.getPosition()); assertEquals(Position.emptyPosition(), underlyingStore.getPosition()); - cachingStore.flush(); + cachingStore.commit(Map.of()); assertEquals( Position.fromMap(mkMap(mkEntry("", mkMap(mkEntry(0, 2L))))), @@ -472,7 +473,7 @@ public void shouldFetchCorrectlyAcrossSegments() { cachingStore.put(a1, "1".getBytes()); cachingStore.put(a2, "2".getBytes()); cachingStore.put(a3, "3".getBytes()); - cachingStore.flush(); + cachingStore.commit(Map.of()); cachingStore.put(a4, "4".getBytes()); cachingStore.put(a5, "5".getBytes()); cachingStore.put(a6, "6".getBytes()); @@ -499,7 +500,7 @@ public void shouldBackwardFetchCorrectlyAcrossSegments() { cachingStore.put(a1, "1".getBytes()); cachingStore.put(a2, "2".getBytes()); cachingStore.put(a3, "3".getBytes()); - cachingStore.flush(); + cachingStore.commit(Map.of()); cachingStore.put(a4, "4".getBytes()); cachingStore.put(a5, "5".getBytes()); cachingStore.put(a6, "6".getBytes()); @@ -580,7 +581,7 @@ public void shouldForwardChangedValuesDuringFlush() { cachingStore.setFlushListener(flushListener, true); cachingStore.put(b, "1".getBytes()); - cachingStore.flush(); + cachingStore.commit(Map.of()); assertEquals( Collections.singletonList( @@ -593,7 +594,7 @@ public void shouldForwardChangedValuesDuringFlush() { flushListener.forwarded.clear(); cachingStore.put(a, "1".getBytes()); - cachingStore.flush(); + cachingStore.commit(Map.of()); assertEquals( Collections.singletonList( @@ -606,7 +607,7 @@ public void shouldForwardChangedValuesDuringFlush() { flushListener.forwarded.clear(); cachingStore.put(a, "2".getBytes()); - cachingStore.flush(); + cachingStore.commit(Map.of()); assertEquals( Collections.singletonList( @@ -619,7 +620,7 @@ public void shouldForwardChangedValuesDuringFlush() { flushListener.forwarded.clear(); cachingStore.remove(a); - cachingStore.flush(); + cachingStore.commit(Map.of()); assertEquals( Collections.singletonList( @@ -634,7 +635,7 @@ public void shouldForwardChangedValuesDuringFlush() { cachingStore.put(a, "1".getBytes()); cachingStore.put(a, "2".getBytes()); cachingStore.remove(a); - cachingStore.flush(); + cachingStore.commit(Map.of()); assertEquals( Collections.emptyList(), @@ -654,13 +655,13 @@ public void shouldNotForwardChangedValuesDuringFlushWhenSendOldValuesDisabled() cachingStore.setFlushListener(flushListener, false); cachingStore.put(a, "1".getBytes()); - cachingStore.flush(); + cachingStore.commit(Map.of()); cachingStore.put(a, "2".getBytes()); - cachingStore.flush(); + cachingStore.commit(Map.of()); cachingStore.remove(a); - cachingStore.flush(); + cachingStore.commit(Map.of()); assertEquals( asList(new KeyValueTimestamp<>( @@ -682,7 +683,7 @@ public void shouldNotForwardChangedValuesDuringFlushWhenSendOldValuesDisabled() cachingStore.put(a, "1".getBytes()); cachingStore.put(a, "2".getBytes()); cachingStore.remove(a); - cachingStore.flush(); + cachingStore.commit(Map.of()); assertEquals( Collections.emptyList(), diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingPersistentSessionStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingPersistentSessionStoreTest.java index 159503b920a7e..cbbd7ca383e6a 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingPersistentSessionStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingPersistentSessionStoreTest.java @@ -54,6 +54,7 @@ import java.util.Collections; import java.util.LinkedList; import java.util.List; +import java.util.Map; import java.util.Random; import static java.util.Arrays.asList; @@ -165,7 +166,7 @@ private void shouldMatchPositionAfterPut() { assertEquals(Position.emptyPosition(), cachingStore.getPosition()); assertEquals(Position.emptyPosition(), underlyingStore.getPosition()); - cachingStore.flush(); + cachingStore.commit(Map.of()); assertEquals( Position.fromMap(mkMap(mkEntry("", mkMap(mkEntry(0, 2L))))), @@ -484,7 +485,7 @@ public void shouldFetchCorrectlyAcrossSegments() { cachingStore.put(a1, "1".getBytes()); cachingStore.put(a2, "2".getBytes()); cachingStore.put(a3, "3".getBytes()); - cachingStore.flush(); + cachingStore.commit(Map.of()); cachingStore.put(a4, "4".getBytes()); cachingStore.put(a5, "5".getBytes()); cachingStore.put(a6, "6".getBytes()); @@ -511,7 +512,7 @@ public void shouldBackwardFetchCorrectlyAcrossSegments() { cachingStore.put(a1, "1".getBytes()); cachingStore.put(a2, "2".getBytes()); cachingStore.put(a3, "3".getBytes()); - cachingStore.flush(); + cachingStore.commit(Map.of()); cachingStore.put(a4, "4".getBytes()); cachingStore.put(a5, "5".getBytes()); cachingStore.put(a6, "6".getBytes()); @@ -593,7 +594,7 @@ public void shouldForwardChangedValuesDuringFlush() { cachingStore.setFlushListener(flushListener, true); cachingStore.put(b, "1".getBytes()); - cachingStore.flush(); + cachingStore.commit(Map.of()); assertEquals( Collections.singletonList( @@ -608,7 +609,7 @@ public void shouldForwardChangedValuesDuringFlush() { flushListener.forwarded.clear(); cachingStore.put(a, "1".getBytes()); - cachingStore.flush(); + cachingStore.commit(Map.of()); assertEquals( Collections.singletonList( @@ -623,7 +624,7 @@ public void shouldForwardChangedValuesDuringFlush() { flushListener.forwarded.clear(); cachingStore.put(a, "2".getBytes()); - cachingStore.flush(); + cachingStore.commit(Map.of()); assertEquals( Collections.singletonList( @@ -638,7 +639,7 @@ public void shouldForwardChangedValuesDuringFlush() { flushListener.forwarded.clear(); cachingStore.remove(a); - cachingStore.flush(); + cachingStore.commit(Map.of()); assertEquals( Collections.singletonList( @@ -655,7 +656,7 @@ public void shouldForwardChangedValuesDuringFlush() { cachingStore.put(a, "1".getBytes()); cachingStore.put(a, "2".getBytes()); cachingStore.remove(a); - cachingStore.flush(); + cachingStore.commit(Map.of()); assertEquals( Collections.emptyList(), @@ -675,13 +676,13 @@ public void shouldNotForwardChangedValuesDuringFlushWhenSendOldValuesDisabled() cachingStore.setFlushListener(flushListener, false); cachingStore.put(a, "1".getBytes()); - cachingStore.flush(); + cachingStore.commit(Map.of()); cachingStore.put(a, "2".getBytes()); - cachingStore.flush(); + cachingStore.commit(Map.of()); cachingStore.remove(a); - cachingStore.flush(); + cachingStore.commit(Map.of()); assertEquals( asList( @@ -708,7 +709,7 @@ public void shouldNotForwardChangedValuesDuringFlushWhenSendOldValuesDisabled() cachingStore.put(a, "1".getBytes()); cachingStore.put(a, "2".getBytes()); cachingStore.remove(a); - cachingStore.flush(); + cachingStore.commit(Map.of()); assertEquals( Collections.emptyList(), diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingPersistentWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingPersistentWindowStoreTest.java index 4548da5bd1134..29435b12f20b8 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingPersistentWindowStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingPersistentWindowStoreTest.java @@ -64,6 +64,7 @@ import java.time.Instant; import java.util.Arrays; import java.util.List; +import java.util.Map; import java.util.Properties; import java.util.UUID; @@ -286,7 +287,7 @@ private void shouldMatchPositionAfterPut() { assertEquals(Position.emptyPosition(), cachingStore.getPosition()); assertEquals(Position.emptyPosition(), underlyingStore.getPosition()); - cachingStore.flush(); + cachingStore.commit(Map.of()); assertEquals( Position.fromMap(mkMap(mkEntry("", mkMap(mkEntry(0, 2L))))), @@ -630,7 +631,7 @@ public void shouldForwardDirtyItemsWhenFlushCalled() { final Windowed windowedKey = new Windowed<>("1", new TimeWindow(DEFAULT_TIMESTAMP, DEFAULT_TIMESTAMP + WINDOW_SIZE)); cachingStore.put(bytesKey("1"), bytesValue("a"), DEFAULT_TIMESTAMP); - cachingStore.flush(); + cachingStore.commit(Map.of()); assertEquals("a", cacheListener.forwarded.get(windowedKey).newValue); assertNull(cacheListener.forwarded.get(windowedKey).oldValue); } @@ -648,23 +649,23 @@ public void shouldForwardOldValuesWhenEnabled() { new Windowed<>("1", new TimeWindow(DEFAULT_TIMESTAMP, DEFAULT_TIMESTAMP + WINDOW_SIZE)); cachingStore.put(bytesKey("1"), bytesValue("a"), DEFAULT_TIMESTAMP); cachingStore.put(bytesKey("1"), bytesValue("b"), DEFAULT_TIMESTAMP); - cachingStore.flush(); + cachingStore.commit(Map.of()); assertEquals("b", cacheListener.forwarded.get(windowedKey).newValue); assertNull(cacheListener.forwarded.get(windowedKey).oldValue); cacheListener.forwarded.clear(); cachingStore.put(bytesKey("1"), bytesValue("c"), DEFAULT_TIMESTAMP); - cachingStore.flush(); + cachingStore.commit(Map.of()); assertEquals("c", cacheListener.forwarded.get(windowedKey).newValue); assertEquals("b", cacheListener.forwarded.get(windowedKey).oldValue); cachingStore.put(bytesKey("1"), null, DEFAULT_TIMESTAMP); - cachingStore.flush(); + cachingStore.commit(Map.of()); assertNull(cacheListener.forwarded.get(windowedKey).newValue); assertEquals("c", cacheListener.forwarded.get(windowedKey).oldValue); cacheListener.forwarded.clear(); cachingStore.put(bytesKey("1"), bytesValue("a"), DEFAULT_TIMESTAMP); cachingStore.put(bytesKey("1"), bytesValue("b"), DEFAULT_TIMESTAMP); cachingStore.put(bytesKey("1"), null, DEFAULT_TIMESTAMP); - cachingStore.flush(); + cachingStore.commit(Map.of()); assertNull(cacheListener.forwarded.get(windowedKey)); cacheListener.forwarded.clear(); } @@ -675,22 +676,22 @@ public void shouldForwardOldValuesWhenDisabled() { new Windowed<>("1", new TimeWindow(DEFAULT_TIMESTAMP, DEFAULT_TIMESTAMP + WINDOW_SIZE)); cachingStore.put(bytesKey("1"), bytesValue("a"), DEFAULT_TIMESTAMP); cachingStore.put(bytesKey("1"), bytesValue("b"), DEFAULT_TIMESTAMP); - cachingStore.flush(); + cachingStore.commit(Map.of()); assertEquals("b", cacheListener.forwarded.get(windowedKey).newValue); assertNull(cacheListener.forwarded.get(windowedKey).oldValue); cachingStore.put(bytesKey("1"), bytesValue("c"), DEFAULT_TIMESTAMP); - cachingStore.flush(); + cachingStore.commit(Map.of()); assertEquals("c", cacheListener.forwarded.get(windowedKey).newValue); assertNull(cacheListener.forwarded.get(windowedKey).oldValue); cachingStore.put(bytesKey("1"), null, DEFAULT_TIMESTAMP); - cachingStore.flush(); + cachingStore.commit(Map.of()); assertNull(cacheListener.forwarded.get(windowedKey).newValue); assertNull(cacheListener.forwarded.get(windowedKey).oldValue); cacheListener.forwarded.clear(); cachingStore.put(bytesKey("1"), bytesValue("a"), DEFAULT_TIMESTAMP); cachingStore.put(bytesKey("1"), bytesValue("b"), DEFAULT_TIMESTAMP); cachingStore.put(bytesKey("1"), null, DEFAULT_TIMESTAMP); - cachingStore.flush(); + cachingStore.commit(Map.of()); assertNull(cacheListener.forwarded.get(windowedKey)); cacheListener.forwarded.clear(); } @@ -704,7 +705,7 @@ public void shouldForwardDirtyItemToListenerWhenEvicted() { @Test public void shouldTakeValueFromCacheIfSameTimestampFlushedToRocks() { cachingStore.put(bytesKey("1"), bytesValue("a"), DEFAULT_TIMESTAMP); - cachingStore.flush(); + cachingStore.commit(Map.of()); cachingStore.put(bytesKey("1"), bytesValue("b"), DEFAULT_TIMESTAMP); try (final WindowStoreIterator fetch = diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStoreTest.java index 9a23e657600cf..2e7f8519fabe4 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStoreTest.java @@ -35,6 +35,8 @@ import org.mockito.junit.jupiter.MockitoSettings; import org.mockito.quality.Strictness; +import java.util.Map; + import static org.apache.kafka.common.utils.Utils.mkEntry; import static org.apache.kafka.common.utils.Utils.mkMap; import static org.mockito.Mockito.times; @@ -175,10 +177,10 @@ public void shouldDelegateToUnderlyingStoreWhenBackwardFindingSessionRange() { } @Test - public void shouldFlushUnderlyingStore() { - store.flush(); + public void shouldCommitUnderlyingStore() { + store.commit(Map.of()); - verify(inner).flush(); + verify(inner).commit(Map.of()); } @Test diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreTest.java index eb57bee38bf04..e8b0bb31b3811 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreTest.java @@ -39,6 +39,7 @@ import java.util.ArrayList; import java.util.List; +import java.util.Map; import java.util.NoSuchElementException; import java.util.UUID; @@ -141,7 +142,7 @@ public void shouldReturnKeysWithGivenPrefix() { stringSerializer.serialize(null, "f"))); byteStore.putAll(entries); - byteStore.flush(); + byteStore.commit(Map.of()); final List valuesWithPrefix = new ArrayList<>(); int numberOfKeysReturned = 0; @@ -176,7 +177,7 @@ public void shouldReturnKeysWithGivenPrefixExcludingNextKeyLargestKey() { stringSerializer.serialize(null, "f"))); byteStore.putAll(entries); - byteStore.flush(); + byteStore.commit(Map.of()); try (final KeyValueIterator keysWithPrefixAsabcd = byteStore.prefixScan("abcd", stringSerializer)) { int numberOfKeysReturned = 0; @@ -206,7 +207,7 @@ public void shouldReturnUUIDsWithStringPrefix() { stringSerializer.serialize(null, "b"))); byteStore.putAll(entries); - byteStore.flush(); + byteStore.commit(Map.of()); final List valuesWithPrefix = new ArrayList<>(); int numberOfKeysReturned = 0; @@ -236,7 +237,7 @@ public void shouldReturnNoKeys() { new Bytes(stringSerializer.serialize(null, "c")), stringSerializer.serialize(null, "e"))); byteStore.putAll(entries); - byteStore.flush(); + byteStore.commit(Map.of()); int numberOfKeysReturned = 0; diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreTest.java index eb23f80db729e..2e866bde22c65 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreTest.java @@ -27,6 +27,7 @@ import java.util.Arrays; import java.util.List; +import java.util.Map; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.MatcherAssert.assertThat; @@ -102,35 +103,35 @@ public void testEvict() { assertEquals(10, driver.sizeOf(store)); store.put(10, "ten"); - store.flush(); + store.commit(Map.of()); assertEquals(10, driver.sizeOf(store)); - assertTrue(driver.flushedEntryRemoved(0)); - assertEquals(1, driver.numFlushedEntryRemoved()); + assertTrue(driver.committedEntryRemoved(0)); + assertEquals(1, driver.numCommittedEntryRemoved()); store.delete(1); - store.flush(); + store.commit(Map.of()); assertEquals(9, driver.sizeOf(store)); - assertTrue(driver.flushedEntryRemoved(0)); - assertTrue(driver.flushedEntryRemoved(1)); - assertEquals(2, driver.numFlushedEntryRemoved()); + assertTrue(driver.committedEntryRemoved(0)); + assertTrue(driver.committedEntryRemoved(1)); + assertEquals(2, driver.numCommittedEntryRemoved()); store.put(11, "eleven"); - store.flush(); + store.commit(Map.of()); assertEquals(10, driver.sizeOf(store)); - assertEquals(2, driver.numFlushedEntryRemoved()); + assertEquals(2, driver.numCommittedEntryRemoved()); store.put(2, "two-again"); - store.flush(); + store.commit(Map.of()); assertEquals(10, driver.sizeOf(store)); - assertEquals(2, driver.numFlushedEntryRemoved()); + assertEquals(2, driver.numCommittedEntryRemoved()); store.put(12, "twelve"); - store.flush(); + store.commit(Map.of()); assertEquals(10, driver.sizeOf(store)); - assertTrue(driver.flushedEntryRemoved(0)); - assertTrue(driver.flushedEntryRemoved(1)); - assertTrue(driver.flushedEntryRemoved(3)); - assertEquals(3, driver.numFlushedEntryRemoved()); + assertTrue(driver.committedEntryRemoved(0)); + assertTrue(driver.committedEntryRemoved(1)); + assertTrue(driver.committedEntryRemoved(3)); + assertEquals(3, driver.numCommittedEntryRemoved()); } @Test @@ -155,8 +156,8 @@ public void testRestoreEvict() { store = createKeyValueStore(driver.context()); context.restore(store.name(), driver.restoredEntries()); // Verify that the store's changelog does not get more appends ... - assertEquals(0, driver.numFlushedEntryStored()); - assertEquals(0, driver.numFlushedEntryRemoved()); + assertEquals(0, driver.numCommittedEntryStored()); + assertEquals(0, driver.numCommittedEntryRemoved()); // and there are no other entries ... assertEquals(10, driver.sizeOf(store)); diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/KeyValueStoreWrapperTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/KeyValueStoreWrapperTest.java index f7018a7fae348..4d4eb9f33c265 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/KeyValueStoreWrapperTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/KeyValueStoreWrapperTest.java @@ -37,6 +37,8 @@ import org.mockito.junit.jupiter.MockitoSettings; import org.mockito.quality.Strictness; +import java.util.Map; + import static org.apache.kafka.streams.state.internals.KeyValueStoreWrapper.PUT_RETURN_CODE_IS_LATEST; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.nullValue; @@ -208,21 +210,21 @@ public void shouldInitVersionedStore() { } @Test - public void shouldFlushTimestampedStore() { + public void shouldCommitTimestampedStore() { givenWrapperWithTimestampedStore(); - wrapper.flush(); + wrapper.commit(Map.of()); - verify(timestampedStore).flush(); + verify(timestampedStore).commit(Map.of()); } @Test - public void shouldFlushVersionedStore() { + public void shouldCommitVersionedStore() { givenWrapperWithVersionedStore(); - wrapper.flush(); + wrapper.commit(Map.of()); - verify(versionedStore).flush(); + verify(versionedStore).commit(Map.of()); } @Test diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java index 4aba2784971bd..b43fa03b985b3 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java @@ -317,12 +317,12 @@ public void shouldGetAllFromInnerStoreAndRecordAllMetric() { } @Test - public void shouldFlushInnerWhenFlushTimeRecords() { + public void shouldFlushInnerWhenCommitTimeRecords() { setUp(); - doNothing().when(inner).flush(); + doNothing().when(inner).commit(Map.of()); init(); - metered.flush(); + metered.commit(Map.of()); final KafkaMetric metric = metric("flush-rate"); assertTrue((Double) metric.metricValue() > 0); diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreTest.java index 9b5d33db96642..0a0bf8c261df0 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreTest.java @@ -357,12 +357,12 @@ public void shouldGetAllFromInnerStoreAndRecordAllMetric() { } @Test - public void shouldFlushInnerWhenFlushTimeRecords() { + public void shouldCommitInnerWhenCommitTimeRecords() { setUp(); - doNothing().when(inner).flush(); + doNothing().when(inner).commit(Map.of()); init(); - metered.flush(); + metered.commit(Map.of()); final KafkaMetric metric = metric("flush-rate"); assertTrue((Double) metric.metricValue() > 0); diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStoreTest.java index 56fe45630d17d..14638efa95597 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStoreTest.java @@ -224,10 +224,10 @@ public void shouldDelegateAndRecordMetricsOnGetWithTimestamp() { } @Test - public void shouldDelegateAndRecordMetricsOnFlush() { - store.flush(); + public void shouldDelegateAndRecordMetricsOnCommit() { + store.commit(Map.of()); - verify(inner).flush(); + verify(inner).commit(Map.of()); assertThat((Double) getMetric("flush-rate").metricValue(), greaterThan(0.0)); } diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java index 3cf17ff830eda..9c3a92e9eb95a 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java @@ -349,11 +349,11 @@ public void shouldBackwardFetchAllFromInnerStoreAndRecordFetchMetrics() { } @Test - public void shouldRecordFlushLatency() { - doNothing().when(innerStoreMock).flush(); + public void shouldRecordCommitLatency() { + doNothing().when(innerStoreMock).commit(Map.of()); store.init(context, store); - store.flush(); + store.commit(Map.of()); // it suffices to verify one flush metric since all flush metrics are recorded by the same sensor // and the sensor is tested elsewhere diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/ReadOnlyWindowStoreStub.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/ReadOnlyWindowStoreStub.java index 25d7fa3a68b13..fd8ff4eb5db45 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/ReadOnlyWindowStoreStub.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/ReadOnlyWindowStoreStub.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.streams.state.internals; +import org.apache.kafka.common.TopicPartition; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.errors.InvalidStateStoreException; import org.apache.kafka.streams.internals.ApiUtils; @@ -381,7 +382,7 @@ public String name() { public void init(final StateStoreContext stateStoreContext, final StateStore root) {} @Override - public void flush() { + public void commit(final Map changelogOffsets) { } @Override diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java index 530a25b4e662d..d756d635af636 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java @@ -364,7 +364,7 @@ public void shouldNotSetCacheInValueProvidersWhenUserProvidesPlainTableFormatCon public void shouldNotThrowExceptionOnRestoreWhenThereIsPreExistingRocksDbFiles() { rocksDBStore.init(context, rocksDBStore); rocksDBStore.put(new Bytes("existingKey".getBytes(UTF_8)), "existingValue".getBytes(UTF_8)); - rocksDBStore.flush(); + rocksDBStore.commit(Map.of()); final List> restoreBytes = new ArrayList<>(); @@ -427,7 +427,7 @@ public void shouldPutAll() { rocksDBStore.init(context, rocksDBStore); rocksDBStore.putAll(entries); - rocksDBStore.flush(); + rocksDBStore.commit(Map.of()); assertEquals( "a", @@ -486,7 +486,7 @@ public void shouldReturnKeysWithGivenPrefix() { rocksDBStore.init(context, rocksDBStore); rocksDBStore.putAll(entries); - rocksDBStore.flush(); + rocksDBStore.commit(Map.of()); try (final KeyValueIterator keysWithPrefix = rocksDBStore.prefixScan("prefix", stringSerializer)) { final List valuesWithPrefix = new ArrayList<>(); @@ -521,7 +521,7 @@ public void shouldReturnKeysWithGivenPrefixExcludingNextKeyLargestKey() { rocksDBStore.init(context, rocksDBStore); rocksDBStore.putAll(entries); - rocksDBStore.flush(); + rocksDBStore.commit(Map.of()); try (final KeyValueIterator keysWithPrefixAsabcd = rocksDBStore.prefixScan("abcd", stringSerializer)) { int numberOfKeysReturned = 0; @@ -619,7 +619,7 @@ public void shouldReturnUUIDsWithStringPrefix() { rocksDBStore.init(context, rocksDBStore); rocksDBStore.putAll(entries); - rocksDBStore.flush(); + rocksDBStore.commit(Map.of()); try (final KeyValueIterator keysWithPrefix = rocksDBStore.prefixScan(prefix, stringSerializer)) { final List valuesWithPrefix = new ArrayList<>(); @@ -654,7 +654,7 @@ public void shouldReturnNoKeys() { stringSerializer.serialize(null, "e"))); rocksDBStore.init(context, rocksDBStore); rocksDBStore.putAll(entries); - rocksDBStore.flush(); + rocksDBStore.commit(Map.of()); try (final KeyValueIterator keysWithPrefix = rocksDBStore.prefixScan("d", stringSerializer)) { int numberOfKeysReturned = 0; @@ -868,7 +868,7 @@ public void shouldThrowProcessorStateExceptionOnPutDeletedDir() throws IOExcepti rocksDBStore.put( new Bytes(stringSerializer.serialize(null, "anyKey")), stringSerializer.serialize(null, "anyValue")); - assertThrows(ProcessorStateException.class, () -> rocksDBStore.flush()); + assertThrows(ProcessorStateException.class, () -> rocksDBStore.commit(Map.of())); } @Test diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedCachingPersistentWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedCachingPersistentWindowStoreTest.java index ffa509d518871..0253c6fc058c8 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedCachingPersistentWindowStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedCachingPersistentWindowStoreTest.java @@ -63,6 +63,7 @@ import java.time.Instant; import java.util.Arrays; import java.util.List; +import java.util.Map; import java.util.Properties; import java.util.UUID; @@ -309,7 +310,7 @@ private void shouldMatchPositionAfterPut() { assertEquals(Position.emptyPosition(), cachingStore.getPosition()); assertEquals(Position.emptyPosition(), underlyingStore.getPosition()); - cachingStore.flush(); + cachingStore.commit(Map.of()); assertEquals( Position.fromMap(mkMap(mkEntry("", mkMap(mkEntry(0, 2L))))), @@ -675,12 +676,12 @@ public void shouldFlushEvictedItemsIntoUnderlyingStore(final boolean hasIndex) { @ParameterizedTest @ValueSource(booleans = {true, false}) - public void shouldForwardDirtyItemsWhenFlushCalled(final boolean hasIndex) { + public void shouldForwardDirtyItemsWhenCommitCalled(final boolean hasIndex) { setUp(hasIndex); final Windowed windowedKey = new Windowed<>("1", new TimeWindow(DEFAULT_TIMESTAMP, DEFAULT_TIMESTAMP + WINDOW_SIZE)); cachingStore.put(bytesKey("1"), bytesValue("a"), DEFAULT_TIMESTAMP); - cachingStore.flush(); + cachingStore.commit(Map.of()); assertEquals("a", cacheListener.forwarded.get(windowedKey).newValue); assertNull(cacheListener.forwarded.get(windowedKey).oldValue); } @@ -702,23 +703,23 @@ public void shouldForwardOldValuesWhenEnabled(final boolean hasIndex) { new Windowed<>("1", new TimeWindow(DEFAULT_TIMESTAMP, DEFAULT_TIMESTAMP + WINDOW_SIZE)); cachingStore.put(bytesKey("1"), bytesValue("a"), DEFAULT_TIMESTAMP); cachingStore.put(bytesKey("1"), bytesValue("b"), DEFAULT_TIMESTAMP); - cachingStore.flush(); + cachingStore.commit(Map.of()); assertEquals("b", cacheListener.forwarded.get(windowedKey).newValue); assertNull(cacheListener.forwarded.get(windowedKey).oldValue); cacheListener.forwarded.clear(); cachingStore.put(bytesKey("1"), bytesValue("c"), DEFAULT_TIMESTAMP); - cachingStore.flush(); + cachingStore.commit(Map.of()); assertEquals("c", cacheListener.forwarded.get(windowedKey).newValue); assertEquals("b", cacheListener.forwarded.get(windowedKey).oldValue); cachingStore.put(bytesKey("1"), null, DEFAULT_TIMESTAMP); - cachingStore.flush(); + cachingStore.commit(Map.of()); assertNull(cacheListener.forwarded.get(windowedKey).newValue); assertEquals("c", cacheListener.forwarded.get(windowedKey).oldValue); cacheListener.forwarded.clear(); cachingStore.put(bytesKey("1"), bytesValue("a"), DEFAULT_TIMESTAMP); cachingStore.put(bytesKey("1"), bytesValue("b"), DEFAULT_TIMESTAMP); cachingStore.put(bytesKey("1"), null, DEFAULT_TIMESTAMP); - cachingStore.flush(); + cachingStore.commit(Map.of()); assertNull(cacheListener.forwarded.get(windowedKey)); cacheListener.forwarded.clear(); } @@ -731,22 +732,22 @@ public void shouldNotForwardOldValuesWhenDisabled(final boolean hasIndex) { new Windowed<>("1", new TimeWindow(DEFAULT_TIMESTAMP, DEFAULT_TIMESTAMP + WINDOW_SIZE)); cachingStore.put(bytesKey("1"), bytesValue("a"), DEFAULT_TIMESTAMP); cachingStore.put(bytesKey("1"), bytesValue("b"), DEFAULT_TIMESTAMP); - cachingStore.flush(); + cachingStore.commit(Map.of()); assertEquals("b", cacheListener.forwarded.get(windowedKey).newValue); assertNull(cacheListener.forwarded.get(windowedKey).oldValue); cachingStore.put(bytesKey("1"), bytesValue("c"), DEFAULT_TIMESTAMP); - cachingStore.flush(); + cachingStore.commit(Map.of()); assertEquals("c", cacheListener.forwarded.get(windowedKey).newValue); assertNull(cacheListener.forwarded.get(windowedKey).oldValue); cachingStore.put(bytesKey("1"), null, DEFAULT_TIMESTAMP); - cachingStore.flush(); + cachingStore.commit(Map.of()); assertNull(cacheListener.forwarded.get(windowedKey).newValue); assertNull(cacheListener.forwarded.get(windowedKey).oldValue); cacheListener.forwarded.clear(); cachingStore.put(bytesKey("1"), bytesValue("a"), DEFAULT_TIMESTAMP); cachingStore.put(bytesKey("1"), bytesValue("b"), DEFAULT_TIMESTAMP); cachingStore.put(bytesKey("1"), null, DEFAULT_TIMESTAMP); - cachingStore.flush(); + cachingStore.commit(Map.of()); assertNull(cacheListener.forwarded.get(windowedKey)); cacheListener.forwarded.clear(); } @@ -764,7 +765,7 @@ public void shouldForwardDirtyItemToListenerWhenEvicted(final boolean hasIndex) public void shouldTakeValueFromCacheIfSameTimestampFlushedToRocks(final boolean hasIndex) { setUp(hasIndex); cachingStore.put(bytesKey("1"), bytesValue("a"), DEFAULT_TIMESTAMP); - cachingStore.flush(); + cachingStore.commit(Map.of()); cachingStore.put(bytesKey("1"), bytesValue("b"), DEFAULT_TIMESTAMP); try (final WindowStoreIterator fetch = diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedKeyValueBufferTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedKeyValueBufferTest.java index 0a65d47d207f2..a3e21a5428823 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedKeyValueBufferTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedKeyValueBufferTest.java @@ -47,6 +47,7 @@ import java.nio.ByteBuffer; import java.util.LinkedList; import java.util.List; +import java.util.Map; import java.util.Optional; import java.util.Properties; import java.util.Random; @@ -311,7 +312,7 @@ public void shouldReturnPriorValueForBufferedKey(final String testName, final Fu @ParameterizedTest @MethodSource("parameters") - public void shouldFlush(final String testName, final Function bufferSupplier) { + public void shouldCommit(final String testName, final Function bufferSupplier) { setup(testName, bufferSupplier); final TimeOrderedKeyValueBuffer> buffer = bufferSupplier.apply(testName); final MockInternalProcessorContext context = makeContext(); @@ -323,8 +324,8 @@ public void shouldFlush(final String testName, final Function bufferS // replace "deleteme" with a tombstone buffer.evictWhile(() -> buffer.minTimestamp() < 1, kv -> { }); - // flush everything to the changelog - buffer.flush(); + // commit everything to the changelog + buffer.commit(Map.of()); // the buffer should serialize the buffer time and the value as byte[], // which we can't compare for equality using ProducerRecord. diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedWindowStoreTest.java index 9d0db9bae0fbb..8bff63ddb4d3e 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedWindowStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedWindowStoreTest.java @@ -62,6 +62,7 @@ import java.time.Instant; import java.util.Arrays; import java.util.List; +import java.util.Map; import java.util.Properties; import java.util.UUID; @@ -315,7 +316,7 @@ private void shouldMatchPositionAfterPut() { assertEquals(Position.emptyPosition(), cachingStore.getPosition()); assertEquals(Position.emptyPosition(), underlyingStore.getPosition()); - cachingStore.flush(); + cachingStore.commit(Map.of()); assertEquals( Position.fromMap(mkMap(mkEntry("", mkMap(mkEntry(0, 2L))))), @@ -681,12 +682,12 @@ public void shouldFlushEvictedItemsIntoUnderlyingStore(final boolean hasIndex) { @ParameterizedTest @ValueSource(booleans = {true, false}) - public void shouldForwardDirtyItemsWhenFlushCalled(final boolean hasIndex) { + public void shouldForwardDirtyItemsWhenCommitCalled(final boolean hasIndex) { setUp(hasIndex); final Windowed windowedKey = new Windowed<>("1", new TimeWindow(DEFAULT_TIMESTAMP, DEFAULT_TIMESTAMP + WINDOW_SIZE)); cachingStore.put(bytesKey("1"), bytesValue("a"), DEFAULT_TIMESTAMP); - cachingStore.flush(); + cachingStore.commit(Map.of()); assertEquals("a", cacheListener.forwarded.get(windowedKey).newValue); assertNull(cacheListener.forwarded.get(windowedKey).oldValue); } @@ -708,23 +709,23 @@ public void shouldForwardOldValuesWhenEnabled(final boolean hasIndex) { new Windowed<>("1", new TimeWindow(DEFAULT_TIMESTAMP, DEFAULT_TIMESTAMP + WINDOW_SIZE)); cachingStore.put(bytesKey("1"), bytesValue("a"), DEFAULT_TIMESTAMP); cachingStore.put(bytesKey("1"), bytesValue("b"), DEFAULT_TIMESTAMP); - cachingStore.flush(); + cachingStore.commit(Map.of()); assertEquals("b", cacheListener.forwarded.get(windowedKey).newValue); assertNull(cacheListener.forwarded.get(windowedKey).oldValue); cacheListener.forwarded.clear(); cachingStore.put(bytesKey("1"), bytesValue("c"), DEFAULT_TIMESTAMP); - cachingStore.flush(); + cachingStore.commit(Map.of()); assertEquals("c", cacheListener.forwarded.get(windowedKey).newValue); assertEquals("b", cacheListener.forwarded.get(windowedKey).oldValue); cachingStore.put(bytesKey("1"), null, DEFAULT_TIMESTAMP); - cachingStore.flush(); + cachingStore.commit(Map.of()); assertNull(cacheListener.forwarded.get(windowedKey).newValue); assertEquals("c", cacheListener.forwarded.get(windowedKey).oldValue); cacheListener.forwarded.clear(); cachingStore.put(bytesKey("1"), bytesValue("a"), DEFAULT_TIMESTAMP); cachingStore.put(bytesKey("1"), bytesValue("b"), DEFAULT_TIMESTAMP); cachingStore.put(bytesKey("1"), null, DEFAULT_TIMESTAMP); - cachingStore.flush(); + cachingStore.commit(Map.of()); assertNull(cacheListener.forwarded.get(windowedKey)); cacheListener.forwarded.clear(); } @@ -737,22 +738,22 @@ public void shouldForwardOldValuesWhenDisabled(final boolean hasIndex) { new Windowed<>("1", new TimeWindow(DEFAULT_TIMESTAMP, DEFAULT_TIMESTAMP + WINDOW_SIZE)); cachingStore.put(bytesKey("1"), bytesValue("a"), DEFAULT_TIMESTAMP); cachingStore.put(bytesKey("1"), bytesValue("b"), DEFAULT_TIMESTAMP); - cachingStore.flush(); + cachingStore.commit(Map.of()); assertEquals("b", cacheListener.forwarded.get(windowedKey).newValue); assertNull(cacheListener.forwarded.get(windowedKey).oldValue); cachingStore.put(bytesKey("1"), bytesValue("c"), DEFAULT_TIMESTAMP); - cachingStore.flush(); + cachingStore.commit(Map.of()); assertEquals("c", cacheListener.forwarded.get(windowedKey).newValue); assertNull(cacheListener.forwarded.get(windowedKey).oldValue); cachingStore.put(bytesKey("1"), null, DEFAULT_TIMESTAMP); - cachingStore.flush(); + cachingStore.commit(Map.of()); assertNull(cacheListener.forwarded.get(windowedKey).newValue); assertNull(cacheListener.forwarded.get(windowedKey).oldValue); cacheListener.forwarded.clear(); cachingStore.put(bytesKey("1"), bytesValue("a"), DEFAULT_TIMESTAMP); cachingStore.put(bytesKey("1"), bytesValue("b"), DEFAULT_TIMESTAMP); cachingStore.put(bytesKey("1"), null, DEFAULT_TIMESTAMP); - cachingStore.flush(); + cachingStore.commit(Map.of()); assertNull(cacheListener.forwarded.get(windowedKey)); cacheListener.forwarded.clear(); } @@ -770,7 +771,7 @@ public void shouldForwardDirtyItemToListenerWhenEvicted(final boolean hasIndex) public void shouldTakeValueFromCacheIfSameTimestampFlushedToRocks(final boolean hasIndex) { setUp(hasIndex); cachingStore.put(bytesKey("1"), bytesValue("a"), DEFAULT_TIMESTAMP); - cachingStore.flush(); + cachingStore.commit(Map.of()); cachingStore.put(bytesKey("1"), bytesValue("b"), DEFAULT_TIMESTAMP); try (final WindowStoreIterator fetch = diff --git a/streams/src/test/java/org/apache/kafka/test/GenericInMemoryKeyValueStore.java b/streams/src/test/java/org/apache/kafka/test/GenericInMemoryKeyValueStore.java index f13add1dc414e..203aae869bafd 100644 --- a/streams/src/test/java/org/apache/kafka/test/GenericInMemoryKeyValueStore.java +++ b/streams/src/test/java/org/apache/kafka/test/GenericInMemoryKeyValueStore.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.test; +import org.apache.kafka.common.TopicPartition; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.StateStoreContext; @@ -140,7 +141,7 @@ public long approximateNumEntries() { } @Override - public void flush() { + public void commit(final Map changelogOffsets) { // do-nothing since it is in-memory } diff --git a/streams/src/test/java/org/apache/kafka/test/GenericInMemoryTimestampedKeyValueStore.java b/streams/src/test/java/org/apache/kafka/test/GenericInMemoryTimestampedKeyValueStore.java index e5d599032a53a..59880319583ad 100644 --- a/streams/src/test/java/org/apache/kafka/test/GenericInMemoryTimestampedKeyValueStore.java +++ b/streams/src/test/java/org/apache/kafka/test/GenericInMemoryTimestampedKeyValueStore.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.test; +import org.apache.kafka.common.TopicPartition; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.StateStoreContext; @@ -142,7 +143,7 @@ public long approximateNumEntries() { } @Override - public void flush() { + public void commit(final Map changelogOffsets) { // do-nothing since it is in-memory } diff --git a/streams/src/test/java/org/apache/kafka/test/MockKeyValueStore.java b/streams/src/test/java/org/apache/kafka/test/MockKeyValueStore.java index a2a6ac43dfd76..331a7c5a24b85 100644 --- a/streams/src/test/java/org/apache/kafka/test/MockKeyValueStore.java +++ b/streams/src/test/java/org/apache/kafka/test/MockKeyValueStore.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.test; +import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.serialization.Deserializer; import org.apache.kafka.common.serialization.IntegerDeserializer; import org.apache.kafka.common.serialization.Serializer; @@ -29,18 +30,19 @@ import java.util.ArrayList; import java.util.List; +import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; public class MockKeyValueStore implements KeyValueStore { - // keep a global counter of flushes and a local reference to which store had which - // flush, so we can reason about the order in which stores get flushed. - private static final AtomicInteger GLOBAL_FLUSH_COUNTER = new AtomicInteger(0); - private final AtomicInteger instanceLastFlushCount = new AtomicInteger(-1); + // keep a global counter of commits and a local reference to which store had which + // commit, so we can reason about the order in which stores get committed. + private static final AtomicInteger GLOBAL_COMMIT_COUNTER = new AtomicInteger(0); + private final AtomicInteger instanceLastCommitCount = new AtomicInteger(-1); private final String name; private final boolean persistent; public boolean initialized = false; - public boolean flushed = false; + public boolean committed = false; public boolean closed = true; public final ArrayList keys = new ArrayList<>(); public final ArrayList values = new ArrayList<>(); @@ -65,13 +67,13 @@ public void init(final StateStoreContext stateStoreContext, } @Override - public void flush() { - instanceLastFlushCount.set(GLOBAL_FLUSH_COUNTER.getAndIncrement()); - flushed = true; + public void commit(final Map changelogOffsets) { + instanceLastCommitCount.set(GLOBAL_COMMIT_COUNTER.getAndIncrement()); + committed = true; } - public int getLastFlushCount() { - return instanceLastFlushCount.get(); + public int getLastCommitCount() { + return instanceLastCommitCount.get(); } @Override diff --git a/streams/src/test/java/org/apache/kafka/test/NoOpReadOnlyStore.java b/streams/src/test/java/org/apache/kafka/test/NoOpReadOnlyStore.java index aa8f4a5b8bcd6..2eed5faf63825 100644 --- a/streams/src/test/java/org/apache/kafka/test/NoOpReadOnlyStore.java +++ b/streams/src/test/java/org/apache/kafka/test/NoOpReadOnlyStore.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.test; +import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.StateStoreContext; @@ -24,13 +25,14 @@ import org.apache.kafka.streams.state.ReadOnlyKeyValueStore; import java.io.File; +import java.util.Map; public class NoOpReadOnlyStore implements ReadOnlyKeyValueStore, StateStore { private final String name; private final boolean rocksdbStore; private boolean open = true; public boolean initialized; - public boolean flushed; + public boolean committed; public NoOpReadOnlyStore() { this("", false); @@ -89,8 +91,8 @@ public void init(final StateStoreContext stateStoreContext, final StateStore roo } @Override - public void flush() { - flushed = true; + public void commit(final Map changelogOffsets) { + committed = true; } @Override diff --git a/streams/src/test/java/org/apache/kafka/test/ReadOnlySessionStoreStub.java b/streams/src/test/java/org/apache/kafka/test/ReadOnlySessionStoreStub.java index d7c5936b7d5e6..3a8be4e34e171 100644 --- a/streams/src/test/java/org/apache/kafka/test/ReadOnlySessionStoreStub.java +++ b/streams/src/test/java/org/apache/kafka/test/ReadOnlySessionStoreStub.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.test; +import org.apache.kafka.common.TopicPartition; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.errors.InvalidStateStoreException; import org.apache.kafka.streams.kstream.Windowed; @@ -29,6 +30,7 @@ import java.util.Collections; import java.util.Iterator; import java.util.List; +import java.util.Map; import java.util.NavigableMap; import java.util.TreeMap; @@ -187,7 +189,7 @@ public String name() { public void init(StateStoreContext stateStoreContext, StateStore root) {} @Override - public void flush() { + public void commit(final Map changelogOffsets) { } diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java index f0e748f35ea0d..19187188643ea 100644 --- a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java +++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java @@ -1236,8 +1236,8 @@ public V delete(final K key) { } @Override - public void flush() { - inner.flush(); + public void commit(final Map changelogOffsets) { + inner.commit(changelogOffsets); } @Override @@ -1328,8 +1328,8 @@ public KeyValueIterator, V> backwardFetchAll(final long timeFrom, } @Override - public void flush() { - inner.flush(); + public void commit(final Map changelogOffsets) { + inner.commit(changelogOffsets); } @Override diff --git a/streams/test-utils/src/test/java/org/apache/kafka/streams/KeyValueStoreFacadeTest.java b/streams/test-utils/src/test/java/org/apache/kafka/streams/KeyValueStoreFacadeTest.java index 3eadb857d22d4..5942322f103f5 100644 --- a/streams/test-utils/src/test/java/org/apache/kafka/streams/KeyValueStoreFacadeTest.java +++ b/streams/test-utils/src/test/java/org/apache/kafka/streams/KeyValueStoreFacadeTest.java @@ -26,6 +26,8 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import java.util.Map; + import static java.util.Arrays.asList; import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.MatcherAssert.assertThat; @@ -98,9 +100,9 @@ public void shouldDeleteAndReturnPlainValue() { } @Test - public void shouldForwardFlush() { - keyValueStoreFacade.flush(); - verify(mockedKeyValueTimestampStore).flush(); + public void shouldForwardCommit() { + keyValueStoreFacade.commit(Map.of()); + verify(mockedKeyValueTimestampStore).commit(Map.of()); } @Test diff --git a/streams/test-utils/src/test/java/org/apache/kafka/streams/WindowStoreFacadeTest.java b/streams/test-utils/src/test/java/org/apache/kafka/streams/WindowStoreFacadeTest.java index 719acd1bc12a9..7b64dc6e37485 100644 --- a/streams/test-utils/src/test/java/org/apache/kafka/streams/WindowStoreFacadeTest.java +++ b/streams/test-utils/src/test/java/org/apache/kafka/streams/WindowStoreFacadeTest.java @@ -26,6 +26,8 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import java.util.Map; + import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.MatcherAssert.assertThat; import static org.mockito.Mockito.mock; @@ -62,9 +64,9 @@ public void shouldPutWindowStartTimestampWithUnknownTimestamp() { } @Test - public void shouldForwardFlush() { - windowStoreFacade.flush(); - verify(mockedWindowTimestampStore).flush(); + public void shouldForwardCommit() { + windowStoreFacade.commit(Map.of()); + verify(mockedWindowTimestampStore).commit(Map.of()); } @Test