Skip to content

Commit de69c3e

Browse files
committed
[FLINK-24213][qs] Use single lock in ServerConnection
1 parent fa2ab61 commit de69c3e

File tree

1 file changed

+10
-4
lines changed

1 file changed

+10
-4
lines changed

flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/ServerConnection.java

+10-4
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@
5050
final class ServerConnection<REQ extends MessageBody, RESP extends MessageBody> {
5151
private static final Logger LOG = LoggerFactory.getLogger(ServerConnection.class);
5252

53-
private final Object connectionLock = new Object();
53+
private final Object connectionLock;
5454

5555
@GuardedBy("connectionLock")
5656
private InternalConnection<REQ, RESP> internalConnection;
@@ -60,7 +60,8 @@ final class ServerConnection<REQ extends MessageBody, RESP extends MessageBody>
6060

6161
private final CompletableFuture<Void> closeFuture = new CompletableFuture<>();
6262

63-
private ServerConnection(InternalConnection<REQ, RESP> internalConnection) {
63+
private ServerConnection(Object lock, InternalConnection<REQ, RESP> internalConnection) {
64+
this.connectionLock = lock;
6465
this.internalConnection = internalConnection;
6566
forwardCloseFuture();
6667
}
@@ -119,11 +120,14 @@ ServerConnection<REQ, RESP> createPendingConnection(
119120
final String clientName,
120121
final MessageSerializer<REQ, RESP> serializer,
121122
final KvStateRequestStats stats) {
123+
final Object lock = new Object();
124+
122125
return new ServerConnection<>(
126+
lock,
123127
new PendingConnection<>(
124128
channel ->
125129
new EstablishedConnection<>(
126-
clientName, serializer, channel, stats)));
130+
lock, clientName, serializer, channel, stats)));
127131
}
128132

129133
interface InternalConnection<REQ, RESP> {
@@ -288,7 +292,7 @@ public REQ getRequest() {
288292
private static class EstablishedConnection<REQ extends MessageBody, RESP extends MessageBody>
289293
implements ClientHandlerCallback<RESP>, InternalConnection<REQ, RESP> {
290294

291-
private final Object lock = new Object();
295+
private final Object lock;
292296

293297
/** The actual TCP channel. */
294298
private final Channel channel;
@@ -315,11 +319,13 @@ private static class EstablishedConnection<REQ extends MessageBody, RESP extends
315319
* @param channel The actual TCP channel
316320
*/
317321
EstablishedConnection(
322+
final Object lock,
318323
final String clientName,
319324
final MessageSerializer<REQ, RESP> serializer,
320325
final Channel channel,
321326
final KvStateRequestStats stats) {
322327

328+
this.lock = lock;
323329
this.channel = Preconditions.checkNotNull(channel);
324330

325331
// Add the client handler with the callback

0 commit comments

Comments
 (0)