Skip to content

Commit 9dfcc07

Browse files
committed
Fix pre-sorting of shards in the can_match phase (#53397)
This commit fixes a bug on sorted queries with a primary sort field that uses different types in the requested indices. In this scenario the returned min/max values to sort the shards are not comparable so we should avoid the sorting rather than throwing an obscure exception.
1 parent f696360 commit 9dfcc07

File tree

3 files changed

+89
-5
lines changed

3 files changed

+89
-5
lines changed

server/src/main/java/org/elasticsearch/action/search/CanMatchPreFilterSearchPhase.java

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -32,11 +32,9 @@
3232
import org.elasticsearch.search.sort.SortOrder;
3333
import org.elasticsearch.transport.Transport;
3434

35-
import java.util.Arrays;
3635
import java.util.Comparator;
3736
import java.util.List;
3837
import java.util.Map;
39-
import java.util.Objects;
4038
import java.util.Set;
4139
import java.util.concurrent.Executor;
4240
import java.util.function.BiFunction;
@@ -128,7 +126,18 @@ private static List<SearchShardIterator> sortShards(GroupShardsIterator<SearchSh
128126
}
129127

130128
private static boolean shouldSortShards(MinAndMax<?>[] minAndMaxes) {
131-
return Arrays.stream(minAndMaxes).anyMatch(Objects::nonNull);
129+
Class<?> clazz = null;
130+
for (MinAndMax<?> minAndMax : minAndMaxes) {
131+
if (clazz == null) {
132+
clazz = minAndMax == null ? null : minAndMax.getMin().getClass();
133+
} else if (minAndMax != null && clazz != minAndMax.getMin().getClass()) {
134+
// we don't support sort values that mix different types (e.g.: long/double, numeric/keyword).
135+
// TODO: we could fail the request because there is a high probability
136+
// that the merging of topdocs will fail later for the same reason ?
137+
return false;
138+
}
139+
}
140+
return clazz != null;
132141
}
133142

134143
private static Comparator<Integer> shardComparator(GroupShardsIterator<SearchShardIterator> shardsIts,

server/src/main/java/org/elasticsearch/search/sort/MinAndMax.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -55,14 +55,14 @@ public void writeTo(StreamOutput out) throws IOException {
5555
/**
5656
* Return the minimum value.
5757
*/
58-
T getMin() {
58+
public T getMin() {
5959
return minValue;
6060
}
6161

6262
/**
6363
* Return the maximum value.
6464
*/
65-
T getMax() {
65+
public T getMax() {
6666
return maxValue;
6767
}
6868

server/src/test/java/org/elasticsearch/action/search/CanMatchPreFilterSearchPhaseTests.java

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
*/
1919
package org.elasticsearch.action.search;
2020

21+
import org.apache.lucene.util.BytesRef;
2122
import org.elasticsearch.Version;
2223
import org.elasticsearch.action.ActionListener;
2324
import org.elasticsearch.action.OriginalIndices;
@@ -54,6 +55,8 @@
5455
import java.util.concurrent.atomic.AtomicReference;
5556
import java.util.stream.IntStream;
5657

58+
import static org.hamcrest.Matchers.equalTo;
59+
5760
public class CanMatchPreFilterSearchPhaseTests extends ESTestCase {
5861

5962
public void testFilterShards() throws InterruptedException {
@@ -350,4 +353,76 @@ public void run() {
350353
}
351354
}
352355
}
356+
357+
public void testInvalidSortShards() throws InterruptedException {
358+
final TransportSearchAction.SearchTimeProvider timeProvider =
359+
new TransportSearchAction.SearchTimeProvider(0, System.nanoTime(), System::nanoTime);
360+
361+
Map<String, Transport.Connection> lookup = new ConcurrentHashMap<>();
362+
DiscoveryNode primaryNode = new DiscoveryNode("node_1", buildNewFakeTransportAddress(), Version.CURRENT);
363+
DiscoveryNode replicaNode = new DiscoveryNode("node_2", buildNewFakeTransportAddress(), Version.CURRENT);
364+
lookup.put("node1", new SearchAsyncActionTests.MockConnection(primaryNode));
365+
lookup.put("node2", new SearchAsyncActionTests.MockConnection(replicaNode));
366+
367+
for (SortOrder order : SortOrder.values()) {
368+
int numShards = randomIntBetween(2, 20);
369+
List<ShardId> shardIds = new ArrayList<>();
370+
Set<ShardId> shardToSkip = new HashSet<>();
371+
372+
SearchTransportService searchTransportService = new SearchTransportService(null, null) {
373+
@Override
374+
public void sendCanMatch(Transport.Connection connection, ShardSearchRequest request, SearchTask task,
375+
ActionListener<SearchService.CanMatchResponse> listener) {
376+
final MinAndMax<?> minMax;
377+
if (request.shardId().id() == numShards-1) {
378+
minMax = new MinAndMax<>(new BytesRef("bar"), new BytesRef("baz"));
379+
} else {
380+
Long min = randomLong();
381+
Long max = randomLongBetween(min, Long.MAX_VALUE);
382+
minMax = new MinAndMax<>(min, max);
383+
}
384+
boolean canMatch = frequently();
385+
synchronized (shardIds) {
386+
shardIds.add(request.shardId());
387+
if (canMatch == false) {
388+
shardToSkip.add(request.shardId());
389+
}
390+
}
391+
new Thread(() -> listener.onResponse(new SearchService.CanMatchResponse(canMatch, minMax))).start();
392+
}
393+
};
394+
395+
AtomicReference<GroupShardsIterator<SearchShardIterator>> result = new AtomicReference<>();
396+
CountDownLatch latch = new CountDownLatch(1);
397+
GroupShardsIterator<SearchShardIterator> shardsIter = SearchAsyncActionTests.getShardsIter("logs",
398+
new OriginalIndices(new String[]{"logs"}, SearchRequest.DEFAULT_INDICES_OPTIONS),
399+
numShards, randomBoolean(), primaryNode, replicaNode);
400+
final SearchRequest searchRequest = new SearchRequest();
401+
searchRequest.source(new SearchSourceBuilder().sort(SortBuilders.fieldSort("timestamp").order(order)));
402+
searchRequest.allowPartialSearchResults(true);
403+
404+
CanMatchPreFilterSearchPhase canMatchPhase = new CanMatchPreFilterSearchPhase(logger,
405+
searchTransportService,
406+
(clusterAlias, node) -> lookup.get(node),
407+
Collections.singletonMap("_na_", new AliasFilter(null, Strings.EMPTY_ARRAY)),
408+
Collections.emptyMap(), Collections.emptyMap(), EsExecutors.newDirectExecutorService(),
409+
searchRequest, null, shardsIter, timeProvider, ClusterState.EMPTY_STATE, null,
410+
(iter) -> new SearchPhase("test") {
411+
@Override
412+
public void run() {
413+
result.set(iter);
414+
latch.countDown();
415+
}
416+
}, SearchResponse.Clusters.EMPTY);
417+
418+
canMatchPhase.start();
419+
latch.await();
420+
int shardId = 0;
421+
for (SearchShardIterator i : result.get()) {
422+
assertThat(i.shardId().id(), equalTo(shardId++));
423+
assertEquals(shardToSkip.contains(i.shardId()), i.skip());
424+
}
425+
assertThat(result.get().size(), equalTo(numShards));
426+
}
427+
}
353428
}

0 commit comments

Comments
 (0)