Skip to content

Commit 093e3de

Browse files
committed
Fix pre-sorting of shards in the can_match phase (#53397)
This commit fixes a bug on sorted queries with a primary sort field that uses different types in the requested indices. In this scenario the returned min/max values to sort the shards are not comparable so we should avoid the sorting rather than throwing an obscure exception.
1 parent a0ba58a commit 093e3de

File tree

2 files changed

+87
-3
lines changed

2 files changed

+87
-3
lines changed

server/src/main/java/org/elasticsearch/action/search/CanMatchPreFilterSearchPhase.java

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,11 +31,9 @@
3131
import org.elasticsearch.search.sort.SortOrder;
3232
import org.elasticsearch.transport.Transport;
3333

34-
import java.util.Arrays;
3534
import java.util.Comparator;
3635
import java.util.List;
3736
import java.util.Map;
38-
import java.util.Objects;
3937
import java.util.Set;
4038
import java.util.concurrent.Executor;
4139
import java.util.function.BiFunction;
@@ -127,7 +125,18 @@ private static List<SearchShardIterator> sortShards(GroupShardsIterator<SearchSh
127125
}
128126

129127
private static boolean shouldSortShards(MinAndMax<?>[] minAndMaxes) {
130-
return Arrays.stream(minAndMaxes).anyMatch(Objects::nonNull);
128+
Class<?> clazz = null;
129+
for (MinAndMax<?> minAndMax : minAndMaxes) {
130+
if (clazz == null) {
131+
clazz = minAndMax == null ? null : minAndMax.getMin().getClass();
132+
} else if (minAndMax != null && clazz != minAndMax.getMin().getClass()) {
133+
// we don't support sort values that mix different types (e.g.: long/double, numeric/keyword).
134+
// TODO: we could fail the request because there is a high probability
135+
// that the merging of topdocs will fail later for the same reason ?
136+
return false;
137+
}
138+
}
139+
return clazz != null;
131140
}
132141

133142
private static Comparator<Integer> shardComparator(GroupShardsIterator<SearchShardIterator> shardsIts,

server/src/test/java/org/elasticsearch/action/search/CanMatchPreFilterSearchPhaseTests.java

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
*/
1919
package org.elasticsearch.action.search;
2020

21+
import org.apache.lucene.util.BytesRef;
2122
import org.elasticsearch.Version;
2223
import org.elasticsearch.action.ActionListener;
2324
import org.elasticsearch.action.OriginalIndices;
@@ -53,6 +54,8 @@
5354
import java.util.concurrent.atomic.AtomicReference;
5455
import java.util.stream.IntStream;
5556

57+
import static org.hamcrest.Matchers.equalTo;
58+
5659
public class CanMatchPreFilterSearchPhaseTests extends ESTestCase {
5760

5861
public void testFilterShards() throws InterruptedException {
@@ -349,4 +352,76 @@ public void run() {
349352
}
350353
}
351354
}
355+
356+
public void testInvalidSortShards() throws InterruptedException {
357+
final TransportSearchAction.SearchTimeProvider timeProvider =
358+
new TransportSearchAction.SearchTimeProvider(0, System.nanoTime(), System::nanoTime);
359+
360+
Map<String, Transport.Connection> lookup = new ConcurrentHashMap<>();
361+
DiscoveryNode primaryNode = new DiscoveryNode("node_1", buildNewFakeTransportAddress(), Version.CURRENT);
362+
DiscoveryNode replicaNode = new DiscoveryNode("node_2", buildNewFakeTransportAddress(), Version.CURRENT);
363+
lookup.put("node1", new SearchAsyncActionTests.MockConnection(primaryNode));
364+
lookup.put("node2", new SearchAsyncActionTests.MockConnection(replicaNode));
365+
366+
for (SortOrder order : SortOrder.values()) {
367+
int numShards = randomIntBetween(2, 20);
368+
List<ShardId> shardIds = new ArrayList<>();
369+
Set<ShardId> shardToSkip = new HashSet<>();
370+
371+
SearchTransportService searchTransportService = new SearchTransportService(null, null) {
372+
@Override
373+
public void sendCanMatch(Transport.Connection connection, ShardSearchRequest request, SearchTask task,
374+
ActionListener<SearchService.CanMatchResponse> listener) {
375+
final MinAndMax<?> minMax;
376+
if (request.shardId().id() == numShards-1) {
377+
minMax = MinAndMax.newMinMax(new BytesRef("bar"), new BytesRef("baz"));
378+
} else {
379+
Long min = randomLong();
380+
Long max = randomLongBetween(min, Long.MAX_VALUE);
381+
minMax = MinAndMax.newMinMax(min, max);
382+
}
383+
boolean canMatch = frequently();
384+
synchronized (shardIds) {
385+
shardIds.add(request.shardId());
386+
if (canMatch == false) {
387+
shardToSkip.add(request.shardId());
388+
}
389+
}
390+
new Thread(() -> listener.onResponse(new SearchService.CanMatchResponse(canMatch, minMax))).start();
391+
}
392+
};
393+
394+
AtomicReference<GroupShardsIterator<SearchShardIterator>> result = new AtomicReference<>();
395+
CountDownLatch latch = new CountDownLatch(1);
396+
GroupShardsIterator<SearchShardIterator> shardsIter = SearchAsyncActionTests.getShardsIter("logs",
397+
new OriginalIndices(new String[]{"logs"}, SearchRequest.DEFAULT_INDICES_OPTIONS),
398+
numShards, randomBoolean(), primaryNode, replicaNode);
399+
final SearchRequest searchRequest = new SearchRequest();
400+
searchRequest.source(new SearchSourceBuilder().sort(SortBuilders.fieldSort("timestamp").order(order)));
401+
searchRequest.allowPartialSearchResults(true);
402+
403+
CanMatchPreFilterSearchPhase canMatchPhase = new CanMatchPreFilterSearchPhase(logger,
404+
searchTransportService,
405+
(clusterAlias, node) -> lookup.get(node),
406+
Collections.singletonMap("_na_", new AliasFilter(null, Strings.EMPTY_ARRAY)),
407+
Collections.emptyMap(), Collections.emptyMap(), EsExecutors.newDirectExecutorService(),
408+
searchRequest, null, shardsIter, timeProvider, 0, null,
409+
(iter) -> new SearchPhase("test") {
410+
@Override
411+
public void run() {
412+
result.set(iter);
413+
latch.countDown();
414+
}
415+
}, SearchResponse.Clusters.EMPTY);
416+
417+
canMatchPhase.start();
418+
latch.await();
419+
int shardId = 0;
420+
for (SearchShardIterator i : result.get()) {
421+
assertThat(i.shardId().id(), equalTo(shardId++));
422+
assertEquals(shardToSkip.contains(i.shardId()), i.skip());
423+
}
424+
assertThat(result.get().size(), equalTo(numShards));
425+
}
426+
}
352427
}

0 commit comments

Comments
 (0)