Skip to content

Commit 39839f9

Browse files
authored
TEST: Access cluster state directly in assertSeqNos (#33277)
Some AbstractDisruptionTestCase tests start failing since we enabled assertSeqNos (in #33130). They fail because the assertSeqNos assertion queries cluster stats while the cluster is disrupted or not formed yet. This commit switches to use the cluster state and shard stats directly from the test cluster. Closes #33251
1 parent 8a2d154 commit 39839f9

File tree

1 file changed

+40
-34
lines changed

1 file changed

+40
-34
lines changed

test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java

Lines changed: 40 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -20,12 +20,15 @@
2020
package org.elasticsearch.test;
2121

2222
import com.carrotsearch.hppc.ObjectLongMap;
23+
import com.carrotsearch.hppc.cursors.IntObjectCursor;
24+
import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
2325
import com.carrotsearch.randomizedtesting.RandomizedContext;
2426
import com.carrotsearch.randomizedtesting.annotations.TestGroup;
2527
import com.carrotsearch.randomizedtesting.generators.RandomNumbers;
2628
import com.carrotsearch.randomizedtesting.generators.RandomPicks;
2729
import org.apache.http.HttpHost;
2830
import org.apache.lucene.search.Sort;
31+
import org.apache.lucene.store.AlreadyClosedException;
2932
import org.apache.lucene.util.LuceneTestCase;
3033
import org.elasticsearch.ElasticsearchException;
3134
import org.elasticsearch.ExceptionsHelper;
@@ -48,10 +51,6 @@
4851
import org.elasticsearch.action.admin.indices.segments.IndexShardSegments;
4952
import org.elasticsearch.action.admin.indices.segments.IndicesSegmentResponse;
5053
import org.elasticsearch.action.admin.indices.segments.ShardSegments;
51-
import org.elasticsearch.action.admin.indices.stats.IndexShardStats;
52-
import org.elasticsearch.action.admin.indices.stats.IndexStats;
53-
import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
54-
import org.elasticsearch.action.admin.indices.stats.ShardStats;
5554
import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateRequestBuilder;
5655
import org.elasticsearch.action.bulk.BulkRequestBuilder;
5756
import org.elasticsearch.action.bulk.BulkResponse;
@@ -187,7 +186,6 @@
187186
import java.util.List;
188187
import java.util.Locale;
189188
import java.util.Map;
190-
import java.util.Optional;
191189
import java.util.Random;
192190
import java.util.Set;
193191
import java.util.concurrent.Callable;
@@ -2328,40 +2326,48 @@ public static Index resolveIndex(String index) {
23282326

23292327
protected void assertSeqNos() throws Exception {
23302328
assertBusy(() -> {
2331-
IndicesStatsResponse stats = client().admin().indices().prepareStats().clear().get();
2332-
for (IndexStats indexStats : stats.getIndices().values()) {
2333-
for (IndexShardStats indexShardStats : indexStats.getIndexShards().values()) {
2334-
Optional<ShardStats> maybePrimary = Stream.of(indexShardStats.getShards())
2335-
.filter(s -> s.getShardRouting().active() && s.getShardRouting().primary())
2336-
.findFirst();
2337-
if (maybePrimary.isPresent() == false) {
2329+
final ClusterState state = clusterService().state();
2330+
for (ObjectObjectCursor<String, IndexRoutingTable> indexRoutingTable : state.routingTable().indicesRouting()) {
2331+
for (IntObjectCursor<IndexShardRoutingTable> indexShardRoutingTable : indexRoutingTable.value.shards()) {
2332+
ShardRouting primaryShardRouting = indexShardRoutingTable.value.primaryShard();
2333+
if (primaryShardRouting == null || primaryShardRouting.assignedToNode() == false) {
23382334
continue;
23392335
}
2340-
ShardStats primary = maybePrimary.get();
2341-
final SeqNoStats primarySeqNoStats = primary.getSeqNoStats();
2342-
final ShardRouting primaryShardRouting = primary.getShardRouting();
2336+
DiscoveryNode primaryNode = state.nodes().get(primaryShardRouting.currentNodeId());
2337+
IndexShard primaryShard = internalCluster().getInstance(IndicesService.class, primaryNode.getName())
2338+
.indexServiceSafe(primaryShardRouting.index()).getShard(primaryShardRouting.id());
2339+
final SeqNoStats primarySeqNoStats;
2340+
final ObjectLongMap<String> syncGlobalCheckpoints;
2341+
try {
2342+
primarySeqNoStats = primaryShard.seqNoStats();
2343+
syncGlobalCheckpoints = primaryShard.getInSyncGlobalCheckpoints();
2344+
} catch (AlreadyClosedException ex) {
2345+
continue; // shard is closed - just ignore
2346+
}
23432347
assertThat(primaryShardRouting + " should have set the global checkpoint",
2344-
primarySeqNoStats.getGlobalCheckpoint(), not(equalTo(SequenceNumbers.UNASSIGNED_SEQ_NO)));
2345-
final DiscoveryNode node = clusterService().state().nodes().get(primaryShardRouting.currentNodeId());
2346-
final IndicesService indicesService =
2347-
internalCluster().getInstance(IndicesService.class, node.getName());
2348-
final IndexShard indexShard = indicesService.getShardOrNull(primaryShardRouting.shardId());
2349-
final ObjectLongMap<String> globalCheckpoints = indexShard.getInSyncGlobalCheckpoints();
2350-
for (ShardStats shardStats : indexShardStats) {
2351-
final SeqNoStats seqNoStats = shardStats.getSeqNoStats();
2352-
if (seqNoStats == null) {
2353-
continue; // this shard was closed
2348+
primarySeqNoStats.getGlobalCheckpoint(), not(equalTo(SequenceNumbers.UNASSIGNED_SEQ_NO)));
2349+
for (ShardRouting replicaShardRouting : indexShardRoutingTable.value.replicaShards()) {
2350+
if (replicaShardRouting.assignedToNode() == false) {
2351+
continue;
2352+
}
2353+
DiscoveryNode replicaNode = state.nodes().get(replicaShardRouting.currentNodeId());
2354+
IndexShard replicaShard = internalCluster().getInstance(IndicesService.class, replicaNode.getName())
2355+
.indexServiceSafe(replicaShardRouting.index()).getShard(replicaShardRouting.id());
2356+
final SeqNoStats seqNoStats;
2357+
try {
2358+
seqNoStats = replicaShard.seqNoStats();
2359+
} catch (AlreadyClosedException e) {
2360+
continue; // shard is closed - just ignore
23542361
}
2355-
assertThat(shardStats.getShardRouting() + " local checkpoint mismatch",
2356-
seqNoStats.getLocalCheckpoint(), equalTo(primarySeqNoStats.getLocalCheckpoint()));
2357-
assertThat(shardStats.getShardRouting() + " global checkpoint mismatch",
2358-
seqNoStats.getGlobalCheckpoint(), equalTo(primarySeqNoStats.getGlobalCheckpoint()));
2359-
assertThat(shardStats.getShardRouting() + " max seq no mismatch",
2360-
seqNoStats.getMaxSeqNo(), equalTo(primarySeqNoStats.getMaxSeqNo()));
2362+
assertThat(replicaShardRouting + " local checkpoint mismatch",
2363+
seqNoStats.getLocalCheckpoint(), equalTo(primarySeqNoStats.getLocalCheckpoint()));
2364+
assertThat(replicaShardRouting + " global checkpoint mismatch",
2365+
seqNoStats.getGlobalCheckpoint(), equalTo(primarySeqNoStats.getGlobalCheckpoint()));
2366+
assertThat(replicaShardRouting + " max seq no mismatch",
2367+
seqNoStats.getMaxSeqNo(), equalTo(primarySeqNoStats.getMaxSeqNo()));
23612368
// the local knowledge on the primary of the global checkpoint equals the global checkpoint on the shard
2362-
assertThat(
2363-
seqNoStats.getGlobalCheckpoint(),
2364-
equalTo(globalCheckpoints.get(shardStats.getShardRouting().allocationId().getId())));
2369+
assertThat(replicaShardRouting + " global checkpoint syncs mismatch", seqNoStats.getGlobalCheckpoint(),
2370+
equalTo(syncGlobalCheckpoints.get(replicaShardRouting.allocationId().getId())));
23652371
}
23662372
}
23672373
}

0 commit comments

Comments
 (0)