Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -754,7 +754,7 @@ export class IndexUpdateService {
},
{
strategy: 'esql_async',
retrieveResults: true,
retrieveIntermediateResults: true,
}
)
.pipe(
Expand Down
9 changes: 4 additions & 5 deletions src/platform/packages/shared/kbn-search-types/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -94,11 +94,10 @@ export interface ISearchOptions {
isRestore?: boolean;

/**
* By default, when polling, we don't retrieve the results of the search request (until it is complete). (For async
* search, this is the difference between calling _async_search/{id} and _async_search/status/{id}.) setting this to
* `true` will request the search results, regardless of whether or not the search is complete.
* By default, when polling, we don't retrieve the results of the search request (until it is complete).
* setting this to `true` will request the search results, regardless of whether or not the search is complete.
*/
retrieveResults?: boolean;
retrieveIntermediateResults?: boolean;

/**
* Represents a meta-information about a Kibana entity intitating a saerch request.
Expand Down Expand Up @@ -144,7 +143,7 @@ export type ISearchOptionsSerializable = Pick<
| 'isStored'
| 'isSearchStored'
| 'isRestore'
| 'retrieveResults'
| 'retrieveIntermediateResults'
| 'executionContext'
| 'stream'
| 'projectRouting'
Expand Down
28 changes: 4 additions & 24 deletions src/platform/plugins/shared/data/common/search/poll_search.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import {
fromEvent,
switchMap,
takeUntil,
takeWhile,
tap,
throwError,
timer,
Expand All @@ -31,26 +30,7 @@ export const pollSearch = <Response extends IKibanaSearchResponse>(
cancel?: () => Promise<void>,
{ pollInterval, abortSignal }: IAsyncSearchOptions = {}
): Observable<Response> => {
const getPollInterval = (elapsedTime: number): number => {
if (typeof pollInterval === 'number') return pollInterval;
else {
// if static pollInterval is not provided, then use default back-off logic
switch (true) {
case elapsedTime < 1500:
return 300;
case elapsedTime < 5000:
return 1000;
case elapsedTime < 20000:
return 2500;
default:
return 5000;
}
}
};

return defer(() => {
const startTime = Date.now();

if (abortSignal?.aborted) {
throw new AbortError();
}
Expand All @@ -69,16 +49,16 @@ export const pollSearch = <Response extends IKibanaSearchResponse>(
);

return from(search()).pipe(
expand(() => {
const elapsedTime = Date.now() - startTime;
return timer(getPollInterval(elapsedTime)).pipe(switchMap(() => search()));
expand((response) => {
return isRunningResponse(response)
? timer(pollInterval ?? 0).pipe(switchMap(() => search()))
: EMPTY;
}),
tap((response) => {
if (isAbortResponse(response)) {
throw new AbortError();
}
}),
takeWhile<Response>(isRunningResponse, true),
takeUntil<Response>(aborted$)
);
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,11 @@ export const ENHANCED_ES_SEARCH_STRATEGY = 'ese';
export interface IAsyncSearchOptions extends SearchSourceSearchOptions {
/**
* The number of milliseconds to wait between receiving a response and sending another request
* If not provided, then a default 1 second interval with back-off up to 5 seconds interval is used
* If not provided, then it defaults to 0 (no wait time)
*/
pollInterval?: number;
/**
* The length of time to wait for results before initiating a new poll request.
*/
pollLength?: string;
}
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ import type {
import { createEsError, isEsError, renderSearchError } from '@kbn/search-errors';
import { AbortReason, defaultFreeze } from '@kbn/kibana-utils-plugin/common';
import type { ICPSManager } from '@kbn/cps-utils';
import moment from 'moment';
import {
EVENT_TYPE_DATA_SEARCH_TIMEOUT,
EVENT_PROPERTY_SEARCH_TIMEOUT_MS,
Expand Down Expand Up @@ -274,8 +275,8 @@ export class SearchInterceptor {

if (combined.sessionId !== undefined) serializableOptions.sessionId = combined.sessionId;
if (combined.isRestore !== undefined) serializableOptions.isRestore = combined.isRestore;
if (combined.retrieveResults !== undefined)
serializableOptions.retrieveResults = combined.retrieveResults;
if (combined.retrieveIntermediateResults !== undefined)
serializableOptions.retrieveIntermediateResults = combined.retrieveIntermediateResults;
if (combined.legacyHitsTotal !== undefined)
serializableOptions.legacyHitsTotal = combined.legacyHitsTotal;
if (combined.strategy !== undefined) serializableOptions.strategy = combined.strategy;
Expand Down Expand Up @@ -398,7 +399,9 @@ export class SearchInterceptor {
let firstRequestParams: SanitizedConnectionRequestParams;

return pollSearch(search, cancel, {
pollInterval: this.deps.searchConfig.asyncSearch.pollInterval,
pollInterval: moment
.duration(this.deps.searchConfig.asyncSearch.pollInterval)
.asMilliseconds(),
...options,
abortSignal: searchAbortController.getSignal(),
}).pipe(
Expand Down Expand Up @@ -433,7 +436,11 @@ export class SearchInterceptor {
return from(
this.runSearch(
{ id, ...request },
{ ...options, abortSignal: new AbortController().signal, retrieveResults: true }
{
...options,
abortSignal: new AbortController().signal,
retrieveIntermediateResults: true,
}
)
).pipe(
map((response) =>
Expand Down
11 changes: 7 additions & 4 deletions src/platform/plugins/shared/data/server/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,8 @@ export const searchConfigSchema = schema.object({
asyncSearch: schema.object({
/**
* Block and wait until the search is completed up to the timeout (see es async_search's `wait_for_completion_timeout`)
* TODO: we should optimize this as 100ms is likely not optimal (https://github.com/elastic/kibana/issues/143277)
*/
waitForCompletion: schema.duration({ defaultValue: '200ms' }),
waitForCompletion: schema.duration({ defaultValue: '1000ms' }),
/**
* How long the async search needs to be available after each search poll. Ongoing async searches and any saved search results are deleted after this period.
* (see es async_search's `keep_alive`)
Expand All @@ -71,9 +70,13 @@ export const searchConfigSchema = schema.object({
batchedReduceSize: schema.number({ defaultValue: 64 }),
/**
* How long to wait before polling the async_search after the previous poll response.
* If not provided, then default dynamic interval with backoff is used.
* If not provided, defaults to zero.
*/
pollInterval: schema.maybe(schema.number({ min: 200 })),
pollInterval: schema.duration({ defaultValue: '1s' }),
/**
* How long to wait for results before initiating a new poll request. If not provided, defaults to 30s.
*/
pollLength: schema.duration({ defaultValue: '200ms' }),
}),
aggs: schema.object({
shardDelay: schema.object({
Expand Down
1 change: 0 additions & 1 deletion src/platform/plugins/shared/data/server/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@ export type {
ISearchSessionService,
SearchRequestHandlerContext,
DataRequestHandlerContext,
AsyncSearchStatusResponse,
} from './search';
export {
SearchSessionService,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ export function registerSearchRoute(
sessionId: schema.maybe(schema.string()),
isStored: schema.maybe(schema.boolean()),
isRestore: schema.maybe(schema.boolean()),
retrieveResults: schema.maybe(schema.boolean()),
retrieveIntermediateResults: schema.maybe(schema.boolean()),
stream: schema.maybe(schema.boolean()),
requestHash: schema.maybe(schema.string()),
projectRouting: schema.maybe(schema.string()),
Expand All @@ -68,7 +68,7 @@ export function registerSearchRoute(
sessionId,
isStored,
isRestore,
retrieveResults,
retrieveIntermediateResults,
stream,
requestHash,
projectRouting,
Expand Down Expand Up @@ -103,7 +103,7 @@ export function registerSearchRoute(
sessionId,
isStored,
isRestore,
retrieveResults,
retrieveIntermediateResults,
stream,
requestHash,
projectRouting,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ export function getCommonDefaultAsyncGetParams(

return {
// Wait up to the timeout for the response to return
wait_for_completion_timeout: `${config.asyncSearch.waitForCompletion.asMilliseconds()}ms`,
wait_for_completion_timeout: `${config.asyncSearch.pollLength.asMilliseconds()}ms`,
...(useSearchSessions && options.isStored
? // Use session's keep_alive if search belongs to a stored session
options.isSearchStored || options.isRestore // if search was already stored and extended, then no need to extend keepAlive
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,13 @@ import type { IAsyncSearchRequestParams } from '../..';
import { getKbnSearchError, KbnSearchError } from '../../report_search_error';
import type { ISearchStrategy, SearchStrategyDependencies } from '../../types';
import type { IAsyncSearchOptions } from '../../../../common';
import { DataViewType, isRunningResponse, pollSearch } from '../../../../common';
import { DataViewType, pollSearch } from '../../../../common';
import {
getDefaultAsyncGetParams,
getDefaultAsyncSubmitParams,
getIgnoreThrottled,
} from './request_utils';
import { toAsyncKibanaSearchResponse, toAsyncKibanaSearchStatusResponse } from './response_utils';
import { toAsyncKibanaSearchResponse } from './response_utils';
import type { SearchUsage } from '../../collectors/search';
import { searchUsageObserver } from '../../collectors/search';
import { getDefaultSearchParams, getShardTimeout } from '../es_search';
Expand All @@ -44,42 +44,22 @@ export const enhancedEsSearchStrategyProvider = (
return client.asyncSearch.delete({ id });
}

async function asyncSearchStatus(
{ id, ...request }: IEsSearchRequest<IAsyncSearchRequestParams>,
options: IAsyncSearchOptions,
{ esClient }: Pick<SearchStrategyDependencies, 'esClient'>
) {
const client = useInternalUser ? esClient.asInternalUser : esClient.asCurrentUser;
const keepAlive =
request.params?.keep_alive ?? getDefaultAsyncGetParams(searchConfig, options).keep_alive;

const { body, headers } = await client.asyncSearch.status(
{ id: id!, keep_alive: keepAlive },
{ ...options.transport, signal: options.abortSignal, meta: true }
);
return toAsyncKibanaSearchStatusResponse(body, headers?.warning);
}

// Gets the current status of the async search request. If the request is complete, then queries for the results.
// Gets the current status of the async search request.
async function getAsyncSearch(
{ id, ...request }: IEsSearchRequest<IAsyncSearchRequestParams>,
options: IAsyncSearchOptions,
{ esClient }: SearchStrategyDependencies
) {
if (!options.retrieveResults) {
// First, request the status of the async search, and return the status if incomplete
const status = await asyncSearchStatus({ id, ...request }, options, { esClient });
if (isRunningResponse(status)) return status;
}

// Then, if the search is complete, request & return the final results
const client = useInternalUser ? esClient.asInternalUser : esClient.asCurrentUser;
const params = {
...getDefaultAsyncGetParams(searchConfig, options),
...(request.params?.keep_alive ? { keep_alive: request.params.keep_alive } : {}),
...(request.params?.wait_for_completion_timeout
? { wait_for_completion_timeout: request.params.wait_for_completion_timeout }
: {}),
...(options.retrieveIntermediateResults
? { return_intermediate_results: options.retrieveIntermediateResults }
: {}),
};
const { body, headers, meta } = await client.asyncSearch.get(
{ ...params, id: id! },
Expand All @@ -88,6 +68,7 @@ export const enhancedEsSearchStrategyProvider = (
signal: options.abortSignal,
meta: true,
asStream: options.stream,
requestTimeout: 600_000, // 10 minutes, making this huge enough that it should never interfere with the `wait_for_completion_timeout` param, which is what should be controlling the timeout of the search request.
}
);

Expand Down Expand Up @@ -218,7 +199,11 @@ export const enhancedEsSearchStrategyProvider = (
if (request.indexType === DataViewType.ROLLUP && deps.rollupsEnabled) {
return from(rollupSearch(request, options, deps));
} else {
return asyncSearch(request, options, deps);
try {
return asyncSearch(request, options, deps);
} catch (e) {
throw getKbnSearchError(e);
}
}
},
/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,26 +12,9 @@ import type { IKibanaSearchResponse } from '@kbn/search-types';
import type { IncomingHttpHeaders } from 'http';
import type { AsyncSearchResponse } from './types';
import { sanitizeRequestParams } from '../../sanitize_request_params';
import type { AsyncSearchStatusResponse } from './types';
import type { IAsyncSearchOptions } from '../../../../common';
import { shimHitsTotal, getTotalLoaded } from '../../../../common';

/**
* Get the Kibana representation of an async search status response.
*/
export function toAsyncKibanaSearchStatusResponse(
response: AsyncSearchStatusResponse,
warning?: string
): IKibanaSearchResponse {
return {
id: response.id,
rawResponse: {},
isPartial: response.is_partial,
isRunning: response.is_running,
...(warning ? { warning } : {}),
};
}

/**
* Get the Kibana representation of an async search response.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,7 @@
* License v3.0 only", or the "Server Side Public License, v 1".
*/

import type {
AsyncSearchGetRequest,
SearchResponse,
ShardStatistics,
} from '@elastic/elasticsearch/lib/api/types';
import type { AsyncSearchGetRequest, SearchResponse } from '@elastic/elasticsearch/lib/api/types';
import type { ISearchRequestParams } from '@kbn/search-types';

export interface IAsyncSearchRequestParams extends ISearchRequestParams {
Expand All @@ -27,7 +23,3 @@ export interface AsyncSearchResponse<T = unknown> {
is_partial: boolean;
is_running: boolean;
}
export interface AsyncSearchStatusResponse extends Omit<AsyncSearchResponse, 'response'> {
completion_status?: number;
_shards: ShardStatistics;
}
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,9 @@ describe('ES|QL async search strategy', () => {
const id = 'FlBvQU5CS3BKVEdPcWM1V2lkYXNUbXccVmNhQl9wcWFRdG1WYzE4N2tsOFNNdzozNjMzOQ==';
const params = { query: 'from logs* | limit 10' };
const esSearch = await esqlAsyncSearchStrategyProvider(mockSearchConfig, mockLogger);
await esSearch.search({ id, params }, { retrieveResults: true }, mockDeps).toPromise();
await esSearch
.search({ id, params }, { retrieveIntermediateResults: true }, mockDeps)
.toPromise();

expect(mockAsyncQueryStop).toBeCalled();
const request = mockAsyncQueryStop.mock.calls[0][0];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,8 +136,8 @@ export const esqlAsyncSearchStrategyProvider = (
const { abortSignal, ...options } = searchOptions;
const search = async () => {
const response = await (!id
? submitEsqlSearch(request, options, deps)
: options.retrieveResults
? submitEsqlSearch({ id, ...request }, options, deps)
: options.retrieveIntermediateResults
? stopEsqlAsyncSearch(id, options, deps)
: getEsqlAsyncSearch({ id, ...request }, options, deps));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ export const logEntriesSearchStrategyProvider = ({
esRequest,
{
...options,
retrieveResults: true, // the subsequent processing requires the actual search results
retrieveIntermediateResults: true, // the subsequent processing requires the actual search results
},
dependencies
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ export const logEntrySearchStrategyProvider = ({
esRequest,
{
...options,
retrieveResults: true, // without it response will not contain progress information
retrieveIntermediateResults: true, // without it response will not contain progress information
},
dependencies
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -357,7 +357,10 @@ function collectDocuments({
let registerFetchLatency: () => void = () => {};

const subscription = data.search
.search({ params }, { abortSignal: abortController.signal, retrieveResults: true })
.search(
{ params },
{ abortSignal: abortController.signal, retrieveIntermediateResults: true }
)
.pipe(
tap({
subscribe: () => {
Expand Down
Loading
Loading