diff --git a/build.gradle b/build.gradle
index 773aa7a776962..3527abbf914b1 100644
--- a/build.gradle
+++ b/build.gradle
@@ -1330,6 +1330,7 @@ project(':clients') {
implementation libs.lz4
implementation libs.snappy
implementation libs.slf4jApi
+ implementation libs.crac
compileOnly libs.jacksonDatabind // for SASL/OAUTHBEARER bearer token parsing
compileOnly libs.jacksonJDK8Datatypes
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index c3318807f206f..05c5059023660 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -25,6 +25,7 @@
+
diff --git a/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java b/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java
index 18a7eefe2022b..9618b2d9373f8 100644
--- a/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java
@@ -213,4 +213,13 @@ ClientRequest newClientRequest(String nodeId,
*/
boolean active();
+ /**
+ * Closes all network connections without closing the client completely.
+ */
+ void suspend();
+
+ /**
+ * Attempts to reconnect to at least one node in the cluster.
+ */
+ void resume();
}
diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
index 433440ce93325..30d442cd8e93b 100644
--- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
@@ -649,6 +649,23 @@ public boolean active() {
return state.get() == State.ACTIVE;
}
+ @Override
+ public void suspend() {
+ selector.closeAllConnections();
+ }
+
+ @Override
+ public void resume() {
+ long now = System.currentTimeMillis();
+ Node node = leastLoadedNode(now);
+ if (node == null) {
+ node = metadataUpdater.fetchNodes().stream().findAny().orElse(null);
+ }
+ if (node != null) {
+ initiateConnect(node, now);
+ }
+ }
+
private void ensureActive() {
if (!active())
throw new DisconnectException("NetworkClient is no longer active, state is " + state);
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
index ac19c6252b7ca..e54d34cf76281 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
@@ -52,6 +52,9 @@
import org.apache.kafka.common.requests.RequestHeader;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
+import org.crac.Context;
+import org.crac.Core;
+import org.crac.Resource;
import org.slf4j.Logger;
import java.io.IOException;
@@ -62,6 +65,7 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.concurrent.Phaser;
import java.util.function.Function;
import java.util.stream.Collectors;
@@ -69,7 +73,7 @@
* The background thread that handles the sending of produce requests to the Kafka cluster. This thread makes metadata
* requests to renew its view of the cluster and then sends produce requests to the appropriate nodes.
*/
-public class Sender implements Runnable {
+public class Sender implements Runnable, Resource {
private final Logger log;
@@ -103,6 +107,8 @@ public class Sender implements Runnable {
/* true when the caller wants to ignore all unsent/inflight messages and force close. */
private volatile boolean forceClose;
+ private volatile Phaser checkpointPhaser;
+
/* metrics */
private final SenderMetrics sensors;
@@ -151,6 +157,7 @@ public Sender(LogContext logContext,
this.apiVersions = apiVersions;
this.transactionManager = transactionManager;
this.inFlightBatches = new HashMap<>();
+ Core.getGlobalContext().register(this);
}
public List inFlightBatches(TopicPartition tp) {
@@ -296,6 +303,14 @@ public void run() {
*
*/
void runOnce() {
+ Phaser phaser = checkpointPhaser;
+ if (phaser != null) {
+ // first time we notify that checkpoint can progress
+ phaser.arriveAndAwaitAdvance();
+ // second time we wait for restore
+ phaser.arriveAndAwaitAdvance();
+ }
+
if (transactionManager != null) {
try {
transactionManager.maybeResolveSequences();
@@ -870,6 +885,21 @@ public static Sensor throttleTimeSensor(SenderMetricsRegistry metrics) {
return produceThrottleTimeSensor;
}
+ @Override
+ public void beforeCheckpoint(Context extends Resource> context) throws Exception {
+ checkpointPhaser = new Phaser(2);
+ checkpointPhaser.arriveAndAwaitAdvance();
+ client.suspend();
+ }
+
+ @Override
+ public void afterRestore(Context extends Resource> context) throws Exception {
+ Phaser phaser = checkpointPhaser;
+ checkpointPhaser = null;
+ client.resume();
+ phaser.arriveAndAwaitAdvance();
+ }
+
/**
* A collection of sensors for the sender
*/
diff --git a/clients/src/main/java/org/apache/kafka/common/network/Selectable.java b/clients/src/main/java/org/apache/kafka/common/network/Selectable.java
index afdd42e4a9c29..8d4d652e74d11 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/Selectable.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/Selectable.java
@@ -120,4 +120,9 @@ public interface Selectable {
* @param id The id for the connection
*/
boolean isChannelReady(String id);
+
+ /**
+ * Drops all connections without closing the selector completely.
+ */
+ void closeAllConnections();
}
diff --git a/clients/src/main/java/org/apache/kafka/common/network/Selector.java b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
index 0a047d5a57e0e..bc338b2fef595 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/Selector.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
@@ -976,6 +976,20 @@ public boolean isChannelReady(String id) {
return channel != null && channel.ready();
}
+ @Override
+ public void closeAllConnections() {
+ List connections = new ArrayList<>(channels.keySet());
+ AtomicReference firstException = new AtomicReference<>();
+ Utils.closeAllQuietly(firstException, "release connections",
+ connections.stream().map(id -> (AutoCloseable) () -> close(id)).toArray(AutoCloseable[]::new));
+ Throwable exception = firstException.get();
+ if (exception instanceof RuntimeException) {
+ throw (RuntimeException) exception;
+ } else if (exception != null) {
+ throw new RuntimeException(exception);
+ }
+ }
+
private KafkaChannel openOrClosingChannelOrFail(String id) {
KafkaChannel channel = this.channels.get(id);
if (channel == null)
diff --git a/clients/src/test/java/org/apache/kafka/clients/MockClient.java b/clients/src/test/java/org/apache/kafka/clients/MockClient.java
index 6d404a906aa69..3e29fa534acc3 100644
--- a/clients/src/test/java/org/apache/kafka/clients/MockClient.java
+++ b/clients/src/test/java/org/apache/kafka/clients/MockClient.java
@@ -571,6 +571,14 @@ public boolean active() {
return active;
}
+ @Override
+ public void suspend() {
+ }
+
+ @Override
+ public void resume() {
+ }
+
@Override
public void close() {
active = false;
diff --git a/clients/src/test/java/org/apache/kafka/test/MockSelector.java b/clients/src/test/java/org/apache/kafka/test/MockSelector.java
index 3560c3535aa73..31c0a89bb5482 100644
--- a/clients/src/test/java/org/apache/kafka/test/MockSelector.java
+++ b/clients/src/test/java/org/apache/kafka/test/MockSelector.java
@@ -235,6 +235,10 @@ public boolean isChannelReady(String id) {
return ready.contains(id);
}
+ @Override
+ public void closeAllConnections() {
+ }
+
public void reset() {
clear();
initiatedSends.clear();
diff --git a/gradle/dependencies.gradle b/gradle/dependencies.gradle
index 23636469f71c4..9af149cbc410f 100644
--- a/gradle/dependencies.gradle
+++ b/gradle/dependencies.gradle
@@ -61,6 +61,7 @@ versions += [
bcpkix: "1.70",
checkstyle: "8.36.2",
commonsCli: "1.4",
+ crac: "0.1.3",
dropwizardMetrics: "4.1.12.1",
gradle: "8.0.2",
grgit: "4.1.1",
@@ -143,6 +144,7 @@ libs += [
argparse4j: "net.sourceforge.argparse4j:argparse4j:$versions.argparse4j",
bcpkix: "org.bouncycastle:bcpkix-jdk15on:$versions.bcpkix",
commonsCli: "commons-cli:commons-cli:$versions.commonsCli",
+ crac: "org.crac:crac:$versions.crac",
easymock: "org.easymock:easymock:$versions.easymock",
jacksonAnnotations: "com.fasterxml.jackson.core:jackson-annotations:$versions.jackson",
jacksonDatabind: "com.fasterxml.jackson.core:jackson-databind:$versions.jacksonDatabind",