@@ -1200,7 +1200,7 @@ public void process(final Record<Long, Long> record) {
sum += value;
}
state.put(key, sum);
state.flush();
state.commit(Map.of());
}


@@ -22,6 +22,7 @@
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
@@ -383,7 +384,7 @@ public void init(final StateStoreContext stateStoreContext, final StateStore roo
}

@Override
public void flush() {
public void commit(final Map<TopicPartition, Long> changelogOffsets) {

}

@@ -54,6 +54,7 @@
import java.io.IOException;
import java.time.Duration;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@@ -358,7 +359,7 @@ public void process(final Record<Integer, Integer> record) {
}

store.put(key, value);
store.flush();
store.commit(Map.of());

if (key == KEY_1) {
// after error injection, we need to avoid a consecutive error after rebalancing
@@ -17,6 +17,7 @@
package org.apache.kafka.streams.integration;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.TopicConfig;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
@@ -821,7 +822,7 @@ public void init(final StateStoreContext stateStoreContext, final StateStore roo
}

@Override
public void flush() {
public void commit(final Map<TopicPartition, Long> changelogOffsets) {
// do nothing
}

@@ -16,6 +16,7 @@
*/
package org.apache.kafka.streams.processor;

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.annotation.InterfaceStability.Evolving;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.api.ProcessorContext;
@@ -26,6 +27,8 @@
import org.apache.kafka.streams.query.QueryConfig;
import org.apache.kafka.streams.query.QueryResult;

import java.util.Map;

/**
* A storage engine for managing state maintained by a stream processor.
* <p>
@@ -73,8 +76,55 @@ public interface StateStore {

/**
* Flush any cached data
*
* @deprecated Use {@link org.apache.kafka.streams.processor.api.ProcessingContext#commit() ProcessorContext#commit()}
* instead.
*/
@Deprecated
default void flush() {
// no-op
}

/**
* Commit all written records to this StateStore.
* <p>
 * This method <b>CANNOT</b> be called by users from {@link org.apache.kafka.streams.processor.api.Processor
* processors}. Doing so will throw an {@link java.lang.UnsupportedOperationException}.
* <p>
* Instead, users should call {@link org.apache.kafka.streams.processor.api.ProcessingContext#commit()
* ProcessorContext#commit()} to request a Task commit.
* <p>
* If {@link #managesOffsets()} returns {@code true}, the given {@code changelogOffsets} will be guaranteed to be
* persisted to disk along with the written records.
* <p>
* {@code changelogOffsets} will usually contain a single partition, in the case of a regular StateStore. However,
* they may contain multiple partitions in the case of a Global StateStore with multiple partitions. All provided
* partitions <em>MUST</em> be persisted to disk.
* <p>
* Implementations <em>SHOULD</em> ensure that {@code changelogOffsets} are committed to disk atomically with the
* records they represent, if possible.
*
* @param changelogOffsets The changelog offset(s) corresponding to the most recently written records.
*/
default void commit(final Map<TopicPartition, Long> changelogOffsets) {
flush();
}

/**
* Returns the most recently {@link #commit(Map) committed} offset for the given {@link TopicPartition}.
* <p>
* If {@link #managesOffsets()} and {@link #persistent()} both return {@code true}, this method will return the
* offset that corresponds to the changelog record most recently written to this store, for the given {@code
* partition}.
*
* @param partition The partition to get the committed offset for.
* @return The last {@link #commit(Map) committed} offset for the {@code partition}; or {@code null} if no offset
* has been committed for the partition, or if either {@link #persistent()} or {@link #managesOffsets()}
* return {@code false}.
*/
void flush();
default Long committedOffset(final TopicPartition partition) {
return null;
}

/**
* Close the storage engine.
@@ -93,6 +143,32 @@ public interface StateStore {
*/
boolean persistent();

/**
* Determines if this StateStore manages its own offsets.
* <p>
* If this method returns {@code true}, then offsets provided to {@link #commit(Map)} will be retrievable using
* {@link #committedOffset(TopicPartition)}.
* <p>
* If this method returns {@code false}, offsets provided to {@link #commit(Map)} will be ignored, and {@link
* #committedOffset(TopicPartition)} will be expected to always return {@code null}.
* <p>
 * This method is provided to enable custom StateStores to opt in to managing their own offsets. This is required
* to ensure that custom StateStores provide the consistency guarantees that Kafka Streams expects when operating
* under an {@code exactly-once} {@code processing.guarantee}.
* <p>
* New implementations are required to implement this method and return {@code true}. Existing implementations
* should upgrade to managing their own offsets as soon as possible, as the legacy offset management is deprecated
* and will be removed in a future version.
*
* @deprecated New implementations should always return {@code true} and manage their own offsets. In the future,
* this method will be removed and it will be assumed to always return {@code true}.
* @return Whether this StateStore manages its own offsets.
*/
@Deprecated
default boolean managesOffsets() {
return false;
}

/**
* Is this store open for reading and writing
* @return {@code true} if the store is open
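
To make the new StateStore contract above concrete, here is a minimal, hypothetical sketch (not part of this change) of a custom store that opts in to store-managed offsets: it records the offsets handed to commit(Map) and reports them back from committedOffset(TopicPartition), with managesOffsets() returning true. The class name OffsetTrackingStore and its in-memory maps are assumptions made for illustration; a production store would persist records and offsets to disk atomically, as the Javadoc above recommends.

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.StateStoreContext;

import java.util.HashMap;
import java.util.Map;

// Illustrative sketch only: an in-memory store that tracks committed changelog offsets.
public class OffsetTrackingStore implements StateStore {
    private final String name;
    private final Map<Long, Long> records = new HashMap<>();              // the store's data; unused in this sketch
    private final Map<TopicPartition, Long> committedOffsets = new HashMap<>();
    private boolean open;

    public OffsetTrackingStore(final String name) {
        this.name = name;
    }

    @Override
    public String name() {
        return name;
    }

    @Override
    public void init(final StateStoreContext context, final StateStore root) {
        // A real store would recover its records and offsets from local storage here.
        open = true;
        context.register(root, (key, value) -> { /* restore a single changelog record */ });
    }

    @Override
    public void commit(final Map<TopicPartition, Long> changelogOffsets) {
        // Because managesOffsets() returns true, these offsets must be persisted together
        // with the written records; this sketch simply remembers them in memory.
        committedOffsets.putAll(changelogOffsets);
    }

    @Override
    public Long committedOffset(final TopicPartition partition) {
        return committedOffsets.get(partition);
    }

    @Override
    public boolean managesOffsets() {
        return true; // opt in to store-managed offsets
    }

    @Override
    public void close() {
        open = false;
    }

    @Override
    public boolean persistent() {
        // A disk-backed store would return true; Streams only relies on committedOffset()
        // when persistent() and managesOffsets() both return true.
        return false;
    }

    @Override
    public boolean isOpen() {
        return open;
    }
}
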
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.streams.processor.internals;

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Windowed;
@@ -34,6 +35,7 @@
import org.apache.kafka.streams.state.internals.WrappedStateStore;

import java.util.List;
import java.util.Map;

abstract class AbstractReadOnlyDecorator<T extends StateStore, K, V> extends WrappedStateStore<T, K, V> {

@@ -44,6 +46,7 @@ private AbstractReadOnlyDecorator(final T inner) {
}

@Override
@Deprecated
public void flush() {
throw new UnsupportedOperationException(ERROR_MESSAGE);
}
@@ -54,6 +57,11 @@ public void init(final StateStoreContext stateStoreContext,
throw new UnsupportedOperationException(ERROR_MESSAGE);
}

@Override
public void commit(final Map<TopicPartition, Long> changelogOffsets) {
throw new UnsupportedOperationException(ERROR_MESSAGE);
}

@Override
public void close() {
throw new UnsupportedOperationException(ERROR_MESSAGE);
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.streams.processor.internals;

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Windowed;
@@ -34,6 +35,7 @@
import org.apache.kafka.streams.state.internals.WrappedStateStore;

import java.util.List;
import java.util.Map;

abstract class AbstractReadWriteDecorator<T extends StateStore, K, V> extends WrappedStateStore<T, K, V> {
static final String ERROR_MESSAGE = "This method may only be called by Kafka Streams";
@@ -48,6 +50,11 @@ public void init(final StateStoreContext stateStoreContext,
throw new UnsupportedOperationException(ERROR_MESSAGE);
}

@Override
public void commit(final Map<TopicPartition, Long> changelogOffsets) {
throw new UnsupportedOperationException(ERROR_MESSAGE);
}

@Override
public void close() {
throw new UnsupportedOperationException(ERROR_MESSAGE);
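
The decorators above make commit(), like flush() and init(), unavailable to user code: calling it on a store handle obtained inside a processor throws UnsupportedOperationException. A processor should instead write through the store as usual and request a task commit via ProcessingContext#commit(). Below is a brief, hypothetical sketch (not part of this change) of that pattern; the CountingProcessor class and the store name "counts" are assumptions made for illustration.

import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.KeyValueStore;

// Illustrative sketch only: sums values per key and asks Kafka Streams to commit.
public class CountingProcessor implements Processor<String, Long, String, Long> {
    private ProcessorContext<String, Long> context;
    private KeyValueStore<String, Long> store;

    @Override
    public void init(final ProcessorContext<String, Long> context) {
        this.context = context;
        this.store = context.getStateStore("counts");
    }

    @Override
    public void process(final Record<String, Long> record) {
        final Long previous = store.get(record.key());
        final long sum = (previous == null ? 0L : previous) + record.value();
        store.put(record.key(), sum);

        // Do NOT call store.commit(...) here: the wrapping decorator throws
        // UnsupportedOperationException. Requesting a task commit lets Kafka Streams
        // commit the store, its changelog offsets, and the input offsets together.
        context.commit();

        context.forward(record.withValue(sum));
    }
}
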
@@ -464,16 +464,16 @@ private long maybeUpdateDeadlineOrThrow(final long currentDeadlineMs) {

@Override
public void flush() {
log.debug("Flushing all global globalStores registered in the state manager");
log.debug("Committing all global globalStores registered in the state manager");
for (final Map.Entry<String, Optional<StateStore>> entry : globalStores.entrySet()) {
if (entry.getValue().isPresent()) {
final StateStore store = entry.getValue().get();
try {
log.trace("Flushing global store={}", store.name());
store.flush();
log.trace("Committing global store={}", store.name());
store.commit(Map.of());
} catch (final RuntimeException e) {
throw new ProcessorStateException(
String.format("Failed to flush global state store %s", store.name()),
String.format("Failed to commit global state store %s", store.name()),
e
);
}
@@ -521,36 +521,36 @@ void restore(final StateStoreMetadata storeMetadata, final List<ConsumerRecord<b

/**
* @throws TaskMigratedException recoverable error sending changelog records that would cause the task to be removed
* @throws StreamsException fatal error when flushing the state store, for example sending changelog records failed
* or flushing state store get IO errors; such error should cause the thread to die
 * @throws StreamsException fatal error when committing the state store, for example sending changelog records failed
 * or committing the state store hits IO errors; such errors should cause the thread to die
*/
@Override
public void flush() {
RuntimeException firstException = null;
// attempting to flush the stores
// attempting to commit the stores
if (!stores.isEmpty()) {
log.debug("Flushing all stores registered in the state manager: {}", stores);
log.debug("Committing all stores registered in the state manager: {}", stores);
for (final StateStoreMetadata metadata : stores.values()) {
final StateStore store = metadata.stateStore;
log.trace("Flushing store {}", store.name());
log.trace("Committing store {}", store.name());
try {
store.flush();
store.commit(Map.of());
} catch (final RuntimeException exception) {
if (firstException == null) {
// do NOT wrap the error if it is actually caused by Streams itself
// In case of FailedProcessingException Do not keep the failed processing exception in the stack trace
if (exception instanceof FailedProcessingException)
firstException = new ProcessorStateException(
format("%sFailed to flush state store %s", logPrefix, store.name()),
format("%sFailed to commit state store %s", logPrefix, store.name()),
exception.getCause());
else if (exception instanceof StreamsException)
firstException = exception;
else
firstException = new ProcessorStateException(
format("%sFailed to flush state store %s", logPrefix, store.name()), exception);
log.error("Failed to flush state store {}: ", store.name(), firstException);
format("%sFailed to commit state store %s", logPrefix, store.name()), exception);
log.error("Failed to commit state store {}: ", store.name(), firstException);
} else {
log.error("Failed to flush state store {}: ", store.name(), exception);
log.error("Failed to commit state store {}: ", store.name(), exception);
}
}
}
@@ -570,9 +570,9 @@ public void flushCache() {
final StateStore store = metadata.stateStore;

try {
// buffer should be flushed to send all records to changelog
// buffer should be committed to send all records to changelog
if (store instanceof TimeOrderedKeyValueBuffer) {
store.flush();
store.commit(Map.of());
} else if (store instanceof CachedStateStore) {
((CachedStateStore<?, ?>) store).flushCache();
}
@@ -17,6 +17,7 @@
package org.apache.kafka.streams.state.internals;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
@@ -277,8 +278,8 @@ public void init(final StateStoreContext stateStoreContext, final StateStore roo
}

@Override
public void flush() {
segments.flush();
public void commit(final Map<TopicPartition, Long> changelogOffsets) {
segments.commit(changelogOffsets);
}

@Override
@@ -17,6 +17,7 @@
package org.apache.kafka.streams.state.internals;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.Utils;
@@ -326,8 +327,8 @@ public void init(final StateStoreContext stateStoreContext, final StateStore roo
}

@Override
public void flush() {
segments.flush();
public void commit(final Map<TopicPartition, Long> changelogOffsets) {
segments.commit(changelogOffsets);
}

@Override
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.streams.state.internals;

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.streams.errors.ProcessorStateException;
import org.apache.kafka.streams.processor.StateStoreContext;
import org.apache.kafka.streams.query.Position;
@@ -160,9 +161,9 @@ public List<S> allSegments(final boolean forward) {
}

@Override
public void flush() {
public void commit(final Map<TopicPartition, Long> changelogOffsets) {
for (final S segment : segments.values()) {
segment.flush();
segment.commit(changelogOffsets);
}
}

@@ -16,6 +16,7 @@
*/
package org.apache.kafka.streams.state.internals;

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
@@ -452,13 +453,13 @@ public long approximateNumEntries() {
}

@Override
public void flush() {
public void commit(final Map<TopicPartition, Long> changelogOffsets) {
validateStoreOpen();
lock.writeLock().lock();
try {
validateStoreOpen();
internalContext.cache().flush(cacheName);
wrapped().flush();
wrapped().commit(changelogOffsets);
} finally {
lock.writeLock().unlock();
}