Skip to content

Commit 90fff54

Browse files
committed
Tie break on cluster alias when merging shard search failures (#38715)
A recent test failure exposed an edge case in which shard failures can come back with the same shard id, yet from different clusters. This commit adapts the failure comparator to also take the cluster alias into account when merging failures as part of cross-cluster search (CCS) request execution. The corresponding test has also been split in two: one with and one without a search shard target set on the failure. Closes #38672
1 parent 6ae7915 commit 90fff54

File tree

2 files changed

+67
-11
lines changed

2 files changed

+67
-11
lines changed

server/src/main/java/org/elasticsearch/action/search/SearchResponseMerger.java

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -204,7 +204,25 @@ public int compare(ShardSearchFailure o1, ShardSearchFailure o2) {
204204
if (shardId2 == null) {
205205
return 1;
206206
}
207-
return shardId1.compareTo(shardId2);
207+
int shardIdCompare = shardId1.compareTo(shardId2);
208+
//we could assume that the same shard id cannot come back from multiple clusters as even with same index name and shard index,
209+
//the index uuid does not match. But the same cluster can be registered multiple times with different aliases, in which case
210+
//we may get failures from the same index, yet with a different cluster alias in their shard target.
211+
if (shardIdCompare != 0) {
212+
return shardIdCompare;
213+
}
214+
String clusterAlias1 = o1.shard() == null ? null : o1.shard().getClusterAlias();
215+
String clusterAlias2 = o2.shard() == null ? null : o2.shard().getClusterAlias();
216+
if (clusterAlias1 == null && clusterAlias2 == null) {
217+
return 0;
218+
}
219+
if (clusterAlias1 == null) {
220+
return -1;
221+
}
222+
if (clusterAlias2 == null) {
223+
return 1;
224+
}
225+
return clusterAlias1.compareTo(clusterAlias2);
208226
}
209227

210228
private ShardId extractShardId(ShardSearchFailure failure) {

server/src/test/java/org/elasticsearch/action/search/SearchResponseMergerTests.java

Lines changed: 48 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,14 @@ public void testMergeShardFailures() throws InterruptedException {
109109
SearchTimeProvider searchTimeProvider = new SearchTimeProvider(0, 0, () -> 0);
110110
SearchResponseMerger merger = new SearchResponseMerger(0, 0, SearchContext.TRACK_TOTAL_HITS_ACCURATE,
111111
searchTimeProvider, flag -> null);
112-
PriorityQueue<Tuple<ShardId, ShardSearchFailure>> priorityQueue = new PriorityQueue<>(Comparator.comparing(Tuple::v1));
112+
PriorityQueue<Tuple<SearchShardTarget, ShardSearchFailure>> priorityQueue = new PriorityQueue<>(Comparator.comparing(Tuple::v1,
113+
(o1, o2) -> {
114+
int compareTo = o1.getShardId().compareTo(o2.getShardId());
115+
if (compareTo != 0) {
116+
return compareTo;
117+
}
118+
return o1.getClusterAlias().compareTo(o2.getClusterAlias());
119+
}));
113120
int numIndices = numResponses * randomIntBetween(1, 3);
114121
Iterator<Map.Entry<String, Index[]>> indicesPerCluster = randomRealisticIndices(numIndices, numResponses).entrySet().iterator();
115122
for (int i = 0; i < numResponses; i++) {
@@ -120,15 +127,46 @@ public void testMergeShardFailures() throws InterruptedException {
120127
ShardSearchFailure[] shardSearchFailures = new ShardSearchFailure[numFailures];
121128
for (int j = 0; j < numFailures; j++) {
122129
ShardId shardId = new ShardId(randomFrom(indices), j);
123-
ShardSearchFailure failure;
124-
if (randomBoolean()) {
125-
SearchShardTarget searchShardTarget = new SearchShardTarget(randomAlphaOfLength(6), shardId, clusterAlias, null);
126-
failure = new ShardSearchFailure(new IllegalArgumentException(), searchShardTarget);
127-
} else {
128-
ElasticsearchException elasticsearchException = new ElasticsearchException(new IllegalArgumentException());
129-
elasticsearchException.setShard(shardId);
130-
failure = new ShardSearchFailure(elasticsearchException);
131-
}
130+
SearchShardTarget searchShardTarget = new SearchShardTarget(randomAlphaOfLength(6), shardId, clusterAlias, null);
131+
ShardSearchFailure failure = new ShardSearchFailure(new IllegalArgumentException(), searchShardTarget);
132+
shardSearchFailures[j] = failure;
133+
priorityQueue.add(Tuple.tuple(searchShardTarget, failure));
134+
}
135+
SearchResponse searchResponse = new SearchResponse(InternalSearchResponse.empty(), null,
136+
1, 1, 0, 100L, shardSearchFailures, SearchResponse.Clusters.EMPTY);
137+
addResponse(merger, searchResponse);
138+
}
139+
awaitResponsesAdded();
140+
assertEquals(numResponses, merger.numResponses());
141+
SearchResponse.Clusters clusters = SearchResponseTests.randomClusters();
142+
SearchResponse mergedResponse = merger.getMergedResponse(clusters);
143+
assertSame(clusters, mergedResponse.getClusters());
144+
assertEquals(numResponses, mergedResponse.getTotalShards());
145+
assertEquals(numResponses, mergedResponse.getSuccessfulShards());
146+
assertEquals(0, mergedResponse.getSkippedShards());
147+
assertEquals(priorityQueue.size(), mergedResponse.getFailedShards());
148+
ShardSearchFailure[] shardFailures = mergedResponse.getShardFailures();
149+
assertEquals(priorityQueue.size(), shardFailures.length);
150+
for (ShardSearchFailure shardFailure : shardFailures) {
151+
ShardSearchFailure expected = priorityQueue.poll().v2();
152+
assertSame(expected, shardFailure);
153+
}
154+
}
155+
156+
public void testMergeShardFailuresNullShardTarget() throws InterruptedException {
157+
SearchTimeProvider searchTimeProvider = new SearchTimeProvider(0, 0, () -> 0);
158+
SearchResponseMerger merger = new SearchResponseMerger(0, 0, SearchContext.TRACK_TOTAL_HITS_ACCURATE,
159+
searchTimeProvider, flag -> null);
160+
PriorityQueue<Tuple<ShardId, ShardSearchFailure>> priorityQueue = new PriorityQueue<>(Comparator.comparing(Tuple::v1));
161+
for (int i = 0; i < numResponses; i++) {
162+
int numFailures = randomIntBetween(1, 10);
163+
ShardSearchFailure[] shardSearchFailures = new ShardSearchFailure[numFailures];
164+
for (int j = 0; j < numFailures; j++) {
165+
String index = "index-" + i;
166+
ShardId shardId = new ShardId(index, index + "-uuid", j);
167+
ElasticsearchException elasticsearchException = new ElasticsearchException(new IllegalArgumentException());
168+
elasticsearchException.setShard(shardId);
169+
ShardSearchFailure failure = new ShardSearchFailure(elasticsearchException);
132170
shardSearchFailures[j] = failure;
133171
priorityQueue.add(Tuple.tuple(shardId, failure));
134172
}

0 commit comments

Comments
 (0)