Skip to content

Commit 2dc728d

Browse files
authored
Add support for multiple rollup searches. (#21755)
1 parent 53595bd commit 2dc728d

File tree

3 files changed

+70
-25
lines changed

3 files changed

+70
-25
lines changed

src/ui/public/courier/search_strategy/default_search_strategy.js

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -33,24 +33,23 @@ function getAllFetchParams(searchRequests, Promise) {
3333
}
3434

3535
async function serializeAllFetchParams(fetchParams, searchRequests, serializeFetchParams) {
36-
const searcRequestsWithFetchParams = [];
36+
const searchRequestsWithFetchParams = [];
3737
const failedSearchRequests = [];
3838

3939
// Gather the fetch param responses from all the successful requests.
4040
fetchParams.forEach((result, index) => {
4141
if (result.resolved) {
42-
searcRequestsWithFetchParams.push(result.resolved);
42+
searchRequestsWithFetchParams.push(result.resolved);
4343
} else {
4444
const searchRequest = searchRequests[index];
4545

46-
// TODO: All strategies will need to implement this.
4746
searchRequest.handleFailure(result.rejected);
4847
failedSearchRequests.push(searchRequest);
4948
}
5049
});
5150

5251
return {
53-
serializedFetchParams: await serializeFetchParams(searcRequestsWithFetchParams),
52+
serializedFetchParams: await serializeFetchParams(searchRequestsWithFetchParams),
5453
failedSearchRequests,
5554
};
5655
}

x-pack/plugins/rollup/public/search/rollup_search_strategy.js

Lines changed: 60 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -7,21 +7,52 @@
77
import { kfetchAbortable } from 'ui/kfetch';
88
import { SearchError } from 'ui/courier';
99

10-
export const rollupSearchStrategy = {
11-
id: 'rollup',
10+
function getAllFetchParams(searchRequests, Promise) {
11+
return Promise.map(searchRequests, (searchRequest) => {
12+
return Promise.try(searchRequest.getFetchParams, void 0, searchRequest)
13+
.then((fetchParams) => {
14+
return (searchRequest.fetchParams = fetchParams);
15+
})
16+
.then(value => ({ resolved: value }))
17+
.catch(error => ({ rejected: error }));
18+
});
19+
}
1220

13-
search: async ({ searchRequests, Promise }) => {
14-
// TODO: Batch together requests to hit a bulk rollup search endpoint.
15-
const searchRequest = searchRequests[0];
16-
const searchParams = await searchRequest.getFetchParams();
17-
const indexPattern = searchParams.index.title || searchParams.index;
21+
async function serializeAllFetchParams(fetchParams, searchRequests) {
22+
const searchRequestsWithFetchParams = [];
23+
const failedSearchRequests = [];
24+
25+
// Gather the fetch param responses from all the successful requests.
26+
fetchParams.forEach((result, index) => {
27+
if (result.resolved) {
28+
searchRequestsWithFetchParams.push(result.resolved);
29+
} else {
30+
const searchRequest = searchRequests[index];
31+
32+
searchRequest.handleFailure(result.rejected);
33+
failedSearchRequests.push(searchRequest);
34+
}
35+
});
36+
37+
const serializedFetchParams = serializeFetchParams(searchRequestsWithFetchParams);
38+
39+
return {
40+
serializedFetchParams,
41+
failedSearchRequests,
42+
};
43+
}
44+
45+
function serializeFetchParams(searchRequestsWithFetchParams) {
46+
return JSON.stringify(searchRequestsWithFetchParams.map(searchRequestWithFetchParams => {
47+
const indexPattern = searchRequestWithFetchParams.index.title || searchRequestWithFetchParams.index;
1848
const {
1949
body: {
2050
size,
2151
aggs,
2252
query: _query,
2353
},
24-
} = searchParams;
54+
index,
55+
} = searchRequestWithFetchParams;
2556

2657
// TODO: Temporarily automatically assign same timezone and interval as what's defined by
2758
// the rollup job. This should be done by the visualization itself.
@@ -30,7 +61,7 @@ export const rollupSearchStrategy = {
3061

3162
Object.keys(subAggs).forEach(subAggName => {
3263
if (subAggName === 'date_histogram') {
33-
const dateHistogramAgg = searchRequest.source.getField('index').typeMeta.aggs.date_histogram;
64+
const dateHistogramAgg = index.typeMeta.aggs.date_histogram;
3465
const subAgg = subAggs[subAggName];
3566
const field = subAgg.field;
3667
subAgg.time_zone = dateHistogramAgg[field].time_zone;
@@ -39,30 +70,43 @@ export const rollupSearchStrategy = {
3970
});
4071
});
4172

42-
const index = indexPattern;
4373
const query = {
4474
'size': size,
4575
'aggregations': aggs,
4676
'query': _query,
4777
};
4878

79+
return { index: indexPattern, query };
80+
}));
81+
}
82+
83+
export const rollupSearchStrategy = {
84+
id: 'rollup',
85+
86+
search: async ({ searchRequests, Promise }) => {
87+
// Flatten the searchSource within each searchRequest to get the fetch params,
88+
// e.g. body, filters, index pattern, query.
89+
const allFetchParams = await getAllFetchParams(searchRequests, Promise);
90+
91+
// Serialize the fetch params into a format suitable for the body of an ES query.
92+
const {
93+
serializedFetchParams,
94+
failedSearchRequests,
95+
} = await serializeAllFetchParams(allFetchParams, searchRequests);
96+
4997
const {
5098
fetching,
5199
abort,
52100
} = kfetchAbortable({
53101
pathname: '../api/rollup/search',
54102
method: 'POST',
55-
body: JSON.stringify({ index, query }),
103+
body: serializedFetchParams,
56104
});
57105

58-
// TODO: Implement this. Search requests which can't be sent.
59-
const failedSearchRequests = [];
60-
61106
return {
62-
// Munge data into shape expected by consumer.
63107
searching: new Promise((resolve, reject) => {
64108
fetching.then(result => {
65-
resolve([ result ]);
109+
resolve(result);
66110
}).catch(error => {
67111
const {
68112
body: { statusText, error: title, message },

x-pack/plugins/rollup/server/routes/api/search.js

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -14,15 +14,17 @@ export function registerSearchRoute(server) {
1414
path: '/api/rollup/search',
1515
method: 'POST',
1616
handler: async (request, reply) => {
17-
const { index, query } = request.payload;
1817
const callWithRequest = callWithRequestFactory(server, request);
1918

2019
try {
21-
const results = await callWithRequest('rollup.search', {
22-
index,
23-
body: query,
24-
});
20+
const requests = request.payload.map(({ index, query }) => (
21+
callWithRequest('rollup.search', {
22+
index,
23+
body: query,
24+
})
25+
));
2526

27+
const results = await Promise.all(requests);
2628
reply(results);
2729
} catch(err) {
2830
if (isEsError(err)) {

0 commit comments

Comments
 (0)