Skip to content

Commit 957bf16

Browse files
authored
Execute async cleanup tasks in CoordinatorTests (#91794) (#91807)
Closing the `JoinValidationService` will enqueue a cache-cleaning task which must be executed to release any pages held by the cache. This commit also introduces a deterministic leak detector to replace the one based on garbage collection, because in these tests we know exactly the expected lifecycle of all allocated pages. Closes #91599 Closes #91379 Closes #90576
1 parent 265f208 commit 957bf16

File tree

2 files changed

+70
-4
lines changed

2 files changed

+70
-4
lines changed

test/framework/src/main/java/org/elasticsearch/cluster/coordination/AbstractCoordinatorTestCase.java

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,6 @@
5353
import org.elasticsearch.common.settings.Settings;
5454
import org.elasticsearch.common.transport.TransportAddress;
5555
import org.elasticsearch.common.unit.ByteSizeValue;
56-
import org.elasticsearch.common.util.MockPageCacheRecycler;
5756
import org.elasticsearch.common.util.concurrent.DeterministicTaskQueue;
5857
import org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor;
5958
import org.elasticsearch.common.util.set.Sets;
@@ -269,6 +268,7 @@ public class Cluster implements Releasable {
269268
private final Map<Long, ClusterState> committedStatesByVersion = new HashMap<>();
270269
private final LinearizabilityChecker linearizabilityChecker = new LinearizabilityChecker();
271270
private final History history = new History();
271+
private final CountingPageCacheRecycler countingPageCacheRecycler;
272272
private final Recycler<BytesRef> recycler;
273273
private final NodeHealthService nodeHealthService;
274274

@@ -287,9 +287,8 @@ public Cluster(int initialNodeCount, boolean allNodesMasterEligible, Settings no
287287

288288
Cluster(int initialNodeCount, boolean allNodesMasterEligible, Settings nodeSettings, NodeHealthService nodeHealthService) {
289289
this.nodeHealthService = nodeHealthService;
290-
this.recycler = usually()
291-
? BytesRefRecycler.NON_RECYCLING_INSTANCE
292-
: new BytesRefRecycler(new MockPageCacheRecycler(Settings.EMPTY));
290+
this.countingPageCacheRecycler = new CountingPageCacheRecycler();
291+
this.recycler = new BytesRefRecycler(countingPageCacheRecycler);
293292
deterministicTaskQueue.setExecutionDelayVariabilityMillis(DEFAULT_DELAY_VARIABILITY);
294293

295294
assertThat(initialNodeCount, greaterThan(0));
@@ -874,6 +873,12 @@ public void close() {
874873
}
875874

876875
clusterNodes.forEach(ClusterNode::close);
876+
877+
// Closing nodes may spawn some other background cleanup tasks that must also be run
878+
runFor(DEFAULT_DELAY_VARIABILITY, "accumulate close-time tasks");
879+
deterministicTaskQueue.runAllRunnableTasks();
880+
881+
countingPageCacheRecycler.assertAllPagesReleased();
877882
}
878883

879884
protected List<NamedWriteableRegistry.Entry> extraNamedWriteables() {
Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0 and the Server Side Public License, v 1; you may not use this file except
5+
* in compliance with, at your election, the Elastic License 2.0 or the Server
6+
* Side Public License, v 1.
7+
*/
8+
9+
package org.elasticsearch.cluster.coordination;
10+
11+
import org.elasticsearch.common.recycler.Recycler;
12+
import org.elasticsearch.common.settings.Settings;
13+
import org.elasticsearch.common.util.PageCacheRecycler;
14+
15+
import static org.junit.Assert.assertEquals;
16+
import static org.junit.Assert.assertFalse;
17+
18+
public class CountingPageCacheRecycler extends PageCacheRecycler {
19+
20+
private int openPages = 0;
21+
22+
public CountingPageCacheRecycler() {
23+
super(Settings.EMPTY);
24+
}
25+
26+
@Override
27+
public Recycler.V<byte[]> bytePage(boolean clear) {
28+
final var page = super.bytePage(clear);
29+
openPages += 1;
30+
return new Recycler.V<>() {
31+
boolean closed = false;
32+
33+
@Override
34+
public byte[] v() {
35+
return page.v();
36+
}
37+
38+
@Override
39+
public boolean isRecycled() {
40+
return page.isRecycled();
41+
}
42+
43+
@Override
44+
public void close() {
45+
assertFalse(closed);
46+
closed = true;
47+
openPages -= 1;
48+
page.close();
49+
}
50+
};
51+
}
52+
53+
@Override
54+
public Recycler.V<Object[]> objectPage() {
55+
throw new AssertionError("unexpected call to objectPage()");
56+
}
57+
58+
public void assertAllPagesReleased() {
59+
assertEquals(0, openPages);
60+
}
61+
}

0 commit comments

Comments
 (0)