Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -1330,6 +1330,7 @@ project(':clients') {
implementation libs.lz4
implementation libs.snappy
implementation libs.slf4jApi
implementation libs.crac

compileOnly libs.jacksonDatabind // for SASL/OAUTHBEARER bearer token parsing
compileOnly libs.jacksonJDK8Datatypes
Expand Down
1 change: 1 addition & 0 deletions checkstyle/import-control.xml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
<!-- common library dependencies -->
<allow pkg="java" />
<allow pkg="javax.management" />
<allow pkg="org.crac" />
<allow pkg="org.slf4j" />
<allow pkg="org.junit" />
<allow pkg="org.opentest4j" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -213,4 +213,13 @@ ClientRequest newClientRequest(String nodeId,
*/
boolean active();

/**
* Closes all network connections without closing the client completely.
*/
void suspend();

/**
* Attempts to reconnect to at least one node in the cluster.
*/
void resume();
}
17 changes: 17 additions & 0 deletions clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -649,6 +649,23 @@ public boolean active() {
return state.get() == State.ACTIVE;
}

@Override
public void suspend() {
selector.closeAllConnections();
}

@Override
public void resume() {
long now = System.currentTimeMillis();
Node node = leastLoadedNode(now);
if (node == null) {
node = metadataUpdater.fetchNodes().stream().findAny().orElse(null);
}
if (node != null) {
initiateConnect(node, now);
}
}

private void ensureActive() {
if (!active())
throw new DisconnectException("NetworkClient is no longer active, state is " + state);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,9 @@
import org.apache.kafka.common.requests.RequestHeader;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.crac.Context;
import org.crac.Core;
import org.crac.Resource;
import org.slf4j.Logger;

import java.io.IOException;
Expand All @@ -62,14 +65,15 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.Phaser;
import java.util.function.Function;
import java.util.stream.Collectors;

/**
* The background thread that handles the sending of produce requests to the Kafka cluster. This thread makes metadata
* requests to renew its view of the cluster and then sends produce requests to the appropriate nodes.
*/
public class Sender implements Runnable {
public class Sender implements Runnable, Resource {

private final Logger log;

Expand Down Expand Up @@ -103,6 +107,8 @@ public class Sender implements Runnable {
/* true when the caller wants to ignore all unsent/inflight messages and force close. */
private volatile boolean forceClose;

private volatile Phaser checkpointPhaser;

/* metrics */
private final SenderMetrics sensors;

Expand Down Expand Up @@ -151,6 +157,7 @@ public Sender(LogContext logContext,
this.apiVersions = apiVersions;
this.transactionManager = transactionManager;
this.inFlightBatches = new HashMap<>();
Core.getGlobalContext().register(this);
}

public List<ProducerBatch> inFlightBatches(TopicPartition tp) {
Expand Down Expand Up @@ -296,6 +303,14 @@ public void run() {
*
*/
void runOnce() {
Phaser phaser = checkpointPhaser;
if (phaser != null) {
// first time we notify that checkpoint can progress
phaser.arriveAndAwaitAdvance();
// second time we wait for restore
phaser.arriveAndAwaitAdvance();
}

if (transactionManager != null) {
try {
transactionManager.maybeResolveSequences();
Expand Down Expand Up @@ -870,6 +885,21 @@ public static Sensor throttleTimeSensor(SenderMetricsRegistry metrics) {
return produceThrottleTimeSensor;
}

@Override
public void beforeCheckpoint(Context<? extends Resource> context) throws Exception {
checkpointPhaser = new Phaser(2);
checkpointPhaser.arriveAndAwaitAdvance();
client.suspend();
}

@Override
public void afterRestore(Context<? extends Resource> context) throws Exception {
Phaser phaser = checkpointPhaser;
checkpointPhaser = null;
client.resume();
phaser.arriveAndAwaitAdvance();
}

/**
* A collection of sensors for the sender
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,4 +120,9 @@ public interface Selectable {
* @param id The id for the connection
*/
boolean isChannelReady(String id);

/**
* Drops all connections without closing the selector completely.
*/
void closeAllConnections();
}
Original file line number Diff line number Diff line change
Expand Up @@ -976,6 +976,20 @@ public boolean isChannelReady(String id) {
return channel != null && channel.ready();
}

@Override
public void closeAllConnections() {
List<String> connections = new ArrayList<>(channels.keySet());
AtomicReference<Throwable> firstException = new AtomicReference<>();
Utils.closeAllQuietly(firstException, "release connections",
connections.stream().map(id -> (AutoCloseable) () -> close(id)).toArray(AutoCloseable[]::new));
Throwable exception = firstException.get();
if (exception instanceof RuntimeException) {
throw (RuntimeException) exception;
} else if (exception != null) {
throw new RuntimeException(exception);
}
}

private KafkaChannel openOrClosingChannelOrFail(String id) {
KafkaChannel channel = this.channels.get(id);
if (channel == null)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -571,6 +571,14 @@ public boolean active() {
return active;
}

@Override
public void suspend() {
}

@Override
public void resume() {
}

@Override
public void close() {
active = false;
Expand Down
4 changes: 4 additions & 0 deletions clients/src/test/java/org/apache/kafka/test/MockSelector.java
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,10 @@ public boolean isChannelReady(String id) {
return ready.contains(id);
}

@Override
public void closeAllConnections() {
}

public void reset() {
clear();
initiatedSends.clear();
Expand Down
2 changes: 2 additions & 0 deletions gradle/dependencies.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ versions += [
bcpkix: "1.70",
checkstyle: "8.36.2",
commonsCli: "1.4",
crac: "0.1.3",
dropwizardMetrics: "4.1.12.1",
gradle: "8.0.2",
grgit: "4.1.1",
Expand Down Expand Up @@ -143,6 +144,7 @@ libs += [
argparse4j: "net.sourceforge.argparse4j:argparse4j:$versions.argparse4j",
bcpkix: "org.bouncycastle:bcpkix-jdk15on:$versions.bcpkix",
commonsCli: "commons-cli:commons-cli:$versions.commonsCli",
crac: "org.crac:crac:$versions.crac",
easymock: "org.easymock:easymock:$versions.easymock",
jacksonAnnotations: "com.fasterxml.jackson.core:jackson-annotations:$versions.jackson",
jacksonDatabind: "com.fasterxml.jackson.core:jackson-databind:$versions.jacksonDatabind",
Expand Down