Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -33,24 +33,23 @@ function getAllFetchParams(searchRequests, Promise) {
}

async function serializeAllFetchParams(fetchParams, searchRequests, serializeFetchParams) {
const searcRequestsWithFetchParams = [];
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what reviewer let this typo pass?? 😇

const searchRequestsWithFetchParams = [];
const failedSearchRequests = [];

// Gather the fetch param responses from all the successful requests.
fetchParams.forEach((result, index) => {
if (result.resolved) {
searcRequestsWithFetchParams.push(result.resolved);
searchRequestsWithFetchParams.push(result.resolved);
} else {
const searchRequest = searchRequests[index];

// TODO: All strategies will need to implement this.
searchRequest.handleFailure(result.rejected);
failedSearchRequests.push(searchRequest);
}
});

return {
serializedFetchParams: await serializeFetchParams(searcRequestsWithFetchParams),
serializedFetchParams: await serializeFetchParams(searchRequestsWithFetchParams),
failedSearchRequests,
};
}
Expand Down
76 changes: 60 additions & 16 deletions x-pack/plugins/rollup/public/search/rollup_search_strategy.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,21 +7,52 @@
import { kfetchAbortable } from 'ui/kfetch';
import { SearchError } from 'ui/courier';

export const rollupSearchStrategy = {
id: 'rollup',
function getAllFetchParams(searchRequests, Promise) {
return Promise.map(searchRequests, (searchRequest) => {
return Promise.try(searchRequest.getFetchParams, void 0, searchRequest)
.then((fetchParams) => {
return (searchRequest.fetchParams = fetchParams);
})
.then(value => ({ resolved: value }))
.catch(error => ({ rejected: error }));
});
}

search: async ({ searchRequests, Promise }) => {
// TODO: Batch together requests to hit a bulk rollup search endpoint.
const searchRequest = searchRequests[0];
const searchParams = await searchRequest.getFetchParams();
const indexPattern = searchParams.index.title || searchParams.index;
async function serializeAllFetchParams(fetchParams, searchRequests) {
const searchRequestsWithFetchParams = [];
const failedSearchRequests = [];

// Gather the fetch param responses from all the successful requests.
fetchParams.forEach((result, index) => {
if (result.resolved) {
searchRequestsWithFetchParams.push(result.resolved);
} else {
const searchRequest = searchRequests[index];

searchRequest.handleFailure(result.rejected);
failedSearchRequests.push(searchRequest);
}
});

const serializedFetchParams = serializeFetchParams(searchRequestsWithFetchParams);

return {
serializedFetchParams,
failedSearchRequests,
};
}

function serializeFetchParams(searchRequestsWithFetchParams) {
return JSON.stringify(searchRequestsWithFetchParams.map(searchRequestWithFetchParams => {
const indexPattern = searchRequestWithFetchParams.index.title || searchRequestWithFetchParams.index;
const {
body: {
size,
aggs,
query: _query,
},
} = searchParams;
index,
} = searchRequestWithFetchParams;

// TODO: Temporarily automatically assign same timezone and interval as what's defined by
// the rollup job. This should be done by the visualization itself.
Expand All @@ -30,7 +61,7 @@ export const rollupSearchStrategy = {

Object.keys(subAggs).forEach(subAggName => {
if (subAggName === 'date_histogram') {
const dateHistogramAgg = searchRequest.source.getField('index').typeMeta.aggs.date_histogram;
const dateHistogramAgg = index.typeMeta.aggs.date_histogram;
const subAgg = subAggs[subAggName];
const field = subAgg.field;
subAgg.time_zone = dateHistogramAgg[field].time_zone;
Expand All @@ -39,30 +70,43 @@ export const rollupSearchStrategy = {
});
});

const index = indexPattern;
const query = {
'size': size,
'aggregations': aggs,
'query': _query,
};

return { index: indexPattern, query };
}));
}

export const rollupSearchStrategy = {
id: 'rollup',

search: async ({ searchRequests, Promise }) => {
// Flatten the searchSource within each searchRequest to get the fetch params,
// e.g. body, filters, index pattern, query.
const allFetchParams = await getAllFetchParams(searchRequests, Promise);

// Serialize the fetch params into a format suitable for the body of an ES query.
const {
serializedFetchParams,
failedSearchRequests,
} = await serializeAllFetchParams(allFetchParams, searchRequests);

const {
fetching,
abort,
} = kfetchAbortable({
pathname: '../api/rollup/search',
method: 'POST',
body: JSON.stringify({ index, query }),
body: serializedFetchParams,
});

// TODO: Implement this. Search requests which can't be sent.
const failedSearchRequests = [];

return {
// Munge data into shape expected by consumer.
searching: new Promise((resolve, reject) => {
fetching.then(result => {
resolve([ result ]);
resolve(result);
}).catch(error => {
const {
body: { statusText, error: title, message },
Expand Down
14 changes: 9 additions & 5 deletions x-pack/plugins/rollup/server/routes/api/search.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,18 @@ export function registerSearchRoute(server) {
path: '/api/rollup/search',
method: 'POST',
handler: async (request, reply) => {
const { index, query } = request.payload;
const callWithRequest = callWithRequestFactory(server, request);

try {
const results = await callWithRequest('rollup.search', {
index,
body: query,
});
const results = [];

for (let i = 0; i < request.payload.length; i++) {
const { index, query } = request.payload[i];
results.push(await callWithRequest('rollup.search', {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

using await in a loop blocks the loop iteration until it resolves, right? could we change this a little to use Promise.all instead so that the calls are truly asynchronous?

        const results = [];

        for (let i = 0; i < request.payload.length; i++) {
          const { index, query } = request.payload[i];
          results.push(callWithRequest('rollup.search', {
            index,
            body: query,
          }));
        }

        reply(await Promise.all(results));

https://eslint.org/docs/rules/no-await-in-loop

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So funny, I went on a walk and realized the same thing, and I knew this comment was waiting for me when I came back...

index,
body: query,
}));
}

reply(results);
} catch(err) {
Expand Down