Skip to content

Commit dd5d165

Browse files
authored
Prevent channel enqueue after selector close (#25478)
This commit adds additional protection to `ESSelector` and its implementations to ensure that channels are not enqueued after the selector is closed. After a channel has been added to the queue, we check that the selector is open. If it is not, then we remove the channel from the queue. If the channel is removed successfully, we throw an `IllegalStateException`.
1 parent 99aa04b commit dd5d165

File tree

3 files changed

+28
-7
lines changed

3 files changed

+28
-7
lines changed

test/framework/src/main/java/org/elasticsearch/transport/nio/AcceptingSelector.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,7 @@ void cleanup() {
7979
*/
8080
public void registerServerChannel(NioServerSocketChannel serverSocketChannel) {
8181
newChannels.add(serverSocketChannel);
82+
ensureSelectorOpenForEnqueuing(newChannels, serverSocketChannel);
8283
wakeup();
8384
}
8485

test/framework/src/main/java/org/elasticsearch/transport/nio/ESSelector.java

Lines changed: 26 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -146,8 +146,8 @@ public void close(boolean shouldInterrupt) throws IOException {
146146
}
147147

148148
public void queueChannelClose(NioChannel channel) {
149-
ensureOpen();
150149
channelsToClose.offer(channel);
150+
ensureSelectorOpenForEnqueuing(channelsToClose, channel);
151151
wakeup();
152152
}
153153

@@ -180,17 +180,36 @@ public PlainActionFuture<Boolean> isRunningFuture() {
180180
return isRunningFuture;
181181
}
182182

183+
/**
184+
* This is a convenience method to be called after some object (normally channels) are enqueued with this
185+
* selector. This method will check if the selector is still open. If it is open, normal operation can
186+
* proceed.
187+
*
188+
* If the selector is closed, then we attempt to remove the object from the queue. If the removal
189+
* succeeds then we throw an {@link IllegalStateException} indicating that normal operation failed. If
190+
* the object cannot be removed from the queue, then the object has already been handled by the selector
191+
* and operation can proceed normally.
192+
*
193+
* If this method is called from the selector thread, we will not throw an exception as the selector
194+
* thread can manipulate its queues internally even if it is no longer open.
195+
*
196+
* @param queue the queue to which the object was added
197+
* @param objectAdded the objected added
198+
* @param <O> the object type
199+
*/
200+
<O> void ensureSelectorOpenForEnqueuing(ConcurrentLinkedQueue<O> queue, O objectAdded) {
201+
if (isClosed.get() && isOnCurrentThread() == false) {
202+
if (queue.remove(objectAdded)) {
203+
throw new IllegalStateException("selector is already closed");
204+
}
205+
}
206+
}
207+
183208
private void closeChannel(NioChannel channel) {
184209
try {
185210
eventHandler.handleClose(channel);
186211
} finally {
187212
registeredChannels.remove(channel);
188213
}
189214
}
190-
191-
private void ensureOpen() {
192-
if (isClosed.get()) {
193-
throw new IllegalStateException("selector is already closed");
194-
}
195-
}
196215
}

test/framework/src/main/java/org/elasticsearch/transport/nio/SocketSelector.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,7 @@ void cleanup() {
8484
*/
8585
public void registerSocketChannel(NioSocketChannel nioSocketChannel) {
8686
newChannels.offer(nioSocketChannel);
87+
ensureSelectorOpenForEnqueuing(newChannels, nioSocketChannel);
8788
wakeup();
8889
}
8990

0 commit comments

Comments
 (0)