Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
42 commits
Select commit Hold shift + click to select a range
1d2ea99
bump default submit wait to 1s
drewdaemon Mar 6, 2026
cd165c6
rename retrieveResults
drewdaemon Mar 6, 2026
295b9d8
extend the timeout
drewdaemon Mar 10, 2026
07f1614
remove the final poll request
drewdaemon Mar 10, 2026
79fdfdf
Merge branch 'main' of github.com:elastic/kibana into 229903/continuo…
drewdaemon Mar 12, 2026
3a1a411
add poll length
drewdaemon Mar 12, 2026
ab6d7b3
Automatically apply configuration based on protocol
drewdaemon Mar 18, 2026
78976f3
Fix timeout problem
drewdaemon Mar 18, 2026
80f4390
Merge branch 'main' into 229903/continuous-polling
elasticmachine Mar 24, 2026
8c4b61a
Merge branch 'main' of github.com:elastic/kibana into 229903/continuo…
drewdaemon Mar 24, 2026
ef5e33f
Merge branch 'main' of github.com:elastic/kibana into 229903/continuo…
drewdaemon Mar 27, 2026
25aaafa
respect pollLength yml setting
drewdaemon Mar 27, 2026
0e902f6
return_intermediate_results defaults to false
drewdaemon Mar 27, 2026
f9f7873
remove buffer size change
drewdaemon Mar 27, 2026
b9eebf8
update strategy test
drewdaemon Mar 27, 2026
ae81692
fix strategy tests
drewdaemon Mar 30, 2026
a65ef09
make pollLength a duration
drewdaemon Mar 30, 2026
13a75ee
configure serverless
drewdaemon Mar 30, 2026
1fba93c
better format
drewdaemon Mar 30, 2026
73eca8f
better config
drewdaemon Mar 30, 2026
6df22f3
Merge branch 'main' of github.com:elastic/kibana into 229903/continuo…
drewdaemon Mar 30, 2026
fe33d20
rename
drewdaemon Mar 30, 2026
f428389
formatting
drewdaemon Mar 30, 2026
33ca667
add search interceptor test
drewdaemon Mar 30, 2026
6b0ca9c
update poll length to be more human readable
drewdaemon Mar 30, 2026
27188b8
some name misses
drewdaemon Mar 30, 2026
fd208c6
serverless config fix
drewdaemon Mar 30, 2026
fd32d74
fix config
drewdaemon Mar 31, 2026
068ec68
Merge branch 'main' into 229903/continuous-polling
drewdaemon Mar 31, 2026
2ec331f
Merge branch '229903/continuous-polling' of github.com:drewdaemon/kib…
drewdaemon Mar 31, 2026
6827aa3
revert wait-for-completion setting
drewdaemon Mar 31, 2026
2c6a4b6
revert serverless change
drewdaemon Mar 31, 2026
6a3f881
Merge branch 'main' into 229903/continuous-polling
drewdaemon Apr 7, 2026
1abeb1e
Merge branch 'main' into 229903/continuous-polling
drewdaemon Apr 7, 2026
50a1d18
Merge branch 'main' into 229903/continuous-polling
stratoula Apr 8, 2026
2d5e914
Update serverless.yml
drewdaemon Apr 9, 2026
8e4eb8e
update search interceptor
drewdaemon Apr 9, 2026
701c4c6
Merge branch 'main' of github.com:elastic/kibana into 229903/continuo…
drewdaemon Apr 9, 2026
3a36579
use 0 for wait_for_completion_timeout with session polls
drewdaemon Apr 9, 2026
fa87041
scope the change
drewdaemon Apr 9, 2026
f01cc85
add comment
drewdaemon Apr 9, 2026
3322067
Merge branch 'main' into 229903/continuous-polling
drewdaemon Apr 9, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -754,7 +754,7 @@ export class IndexUpdateService {
},
{
strategy: 'esql_async',
retrieveResults: true,
returnIntermediateResults: true,
}
)
.pipe(
Expand Down
9 changes: 4 additions & 5 deletions src/platform/packages/shared/kbn-search-types/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -94,11 +94,10 @@ export interface ISearchOptions {
isRestore?: boolean;

/**
* By default, when polling, we don't retrieve the results of the search request (until it is complete). (For async
* search, this is the difference between calling _async_search/{id} and _async_search/status/{id}.) setting this to
* `true` will request the search results, regardless of whether or not the search is complete.
* By default, when polling, we don't retrieve the results of the search request (until it is complete).
* setting this to `true` will request the search results, regardless of whether or not the search is complete.
*/
retrieveResults?: boolean;
returnIntermediateResults?: boolean;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

does this mean that we can get incremental results while the search is in progress?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: I know the Elasticsearch naming could be taken either way, but we are really talking about "partial results," not "incremental results."

But to your question, there should be no practical change in the behavior from the old retrieveResults param. retrieveResults has always meant, "retrieve all results that are available whether or not the search is complete."

The change is that instead of switching endpoints from the status to the GET endpoint when returnIntermediateResults is set, we control the behavior with a param to the GET endpoint.


/**
* Represents a meta-information about a Kibana entity intitating a saerch request.
Expand Down Expand Up @@ -144,7 +143,7 @@ export type ISearchOptionsSerializable = Pick<
| 'isStored'
| 'isSearchStored'
| 'isRestore'
| 'retrieveResults'
| 'returnIntermediateResults'
| 'executionContext'
| 'stream'
| 'projectRouting'
Expand Down
12 changes: 6 additions & 6 deletions src/platform/plugins/shared/data/common/search/poll_search.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import {
fromEvent,
switchMap,
takeUntil,
takeWhile,
tap,
throwError,
timer,
Expand Down Expand Up @@ -49,12 +48,12 @@ export const pollSearch = <Response extends IKibanaSearchResponse>(
};

return defer(() => {
const startTime = Date.now();

if (abortSignal?.aborted) {
throw new AbortError();
}

const startTime = Date.now();

const safeCancel = () =>
cancel?.().catch((e) => {
console.error(e); // eslint-disable-line no-console
Expand All @@ -69,16 +68,17 @@ export const pollSearch = <Response extends IKibanaSearchResponse>(
);

return from(search()).pipe(
expand(() => {
expand((response) => {
const elapsedTime = Date.now() - startTime;
return timer(getPollInterval(elapsedTime)).pipe(switchMap(() => search()));
return isRunningResponse(response)
? timer(getPollInterval(elapsedTime)).pipe(switchMap(() => search()))
: EMPTY;
Comment on lines +73 to +75
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Curious as to why we use this vs. takeWhile?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When I made the changes in this PR, the way it was set up before was making an extra request at the end after the results were already available. But I am not an rxjs guru so open to suggestions

}),
tap((response) => {
if (isAbortResponse(response)) {
throw new AbortError();
}
}),
takeWhile<Response>(isRunningResponse, true),
takeUntil<Response>(aborted$)
);
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,11 @@ export const ENHANCED_ES_SEARCH_STRATEGY = 'ese';
export interface IAsyncSearchOptions extends SearchSourceSearchOptions {
/**
* The number of milliseconds to wait between receiving a response and sending another request
* If not provided, then a default 1 second interval with back-off up to 5 seconds interval is used
* If not provided, then it defaults to 0 (no wait time)
*/
pollInterval?: number;
/**
* The length of time to wait for results before initiating a new poll request.
*/
pollLength?: string;
Comment on lines +20 to +23
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tried to understand where this is used but I couldn't make out code that makes use of this. As a far as I understand these are per-request options and I don't see pollLength from this getting applied somewhere. The other places pollLength is updated in this PR is about the kibana.yml setting. Can you clarify its use in the comment or check if I'm missing something 😅 ?

Copy link
Copy Markdown
Contributor Author

@drewdaemon drewdaemon Apr 7, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

pollLength controls the value of wait_for_completion_timeout passed to Elasticsearch during polling. This is the amount of time Elasticsearch holds the polling connection open.

take a look at https://github.com/elastic/kibana/pull/256564/changes#diff-238b1c61769eb4806c1ef55dc1afe0c04f79740d0753d307fdf89c6b9e5c9799R71, and also the diagram in the PR description.

BTW, this is search code so vis team review is not required (vis team owns esaggs) even though the vis team are technically co-owners of the data plugin. I say this not because I don't welcome it if you'd still like to review but just to take the pressure off :)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the context! Looking at the comment in src/platform/plugins/shared/data/public/search/search_interceptor/search_interceptor.ts:

// don't set or override user-configured pollLength, it will be applied server-side
         !this.deps.searchConfig.asyncSearch.pollLength && this.protocolSupportsMultiplexing
              ? DEFAULT_MULTIPLEXING_POLL_LENGTH
              : undefined,

It looks like asyncSearch.pollLength is checked but there is never a case where it would get applied on the client side. It means the value would be the same client+server side and that's why we just consider it server side onle? Maybe I'm just a bit confused a bit by the comment 😅 can you explain the necessary logic client side (default vs undefined) if we're applying it server side in any case anyway?

Copy link
Copy Markdown
Contributor Author

@drewdaemon drewdaemon Apr 7, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The reason we have to have client-side logic around this is that we need to react to the protocol the browser is using for the next hop. You can have a setup where the server is on HTTP/1 and the browser is on HTTP/2 with a proxy in the middle. In that case, we should optimize for multiplexing.

However, if the user has explicitly configured a pollLength in kibana.yml, we need to respect that by not overriding it in the request from the client.

It's all in support of this setting configuration flow:

  flowchart TD
      Start[Start] -->|Polling GET| ClientConfig{search.asyncSearch.pollLength set?}

      ClientConfig -->|Yes - Number| UseClientConfig[Use config value]
      ClientConfig -->|No| Multiplex{HTTP/2 or<br/>HTTP/3? - Needs to be checked client-side}

      Multiplex -->|Yes| Use30s[Use 30000ms]
      Multiplex -->|No| Undefined[Omitted - functionally zero]
Loading

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks! Now I got it, I failed to connect the dots between that flowchart and the code :)

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't blame you! I updated the diagram in the description to hopefully be more clear. LMK if you have any further suggestions

}
3 changes: 3 additions & 0 deletions src/platform/plugins/shared/data/config.mock.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,12 @@ export const getMockSearchConfig = ({
},
asyncSearch: {
waitForCompletion = moment.duration(100, 'ms'),
pollLength = moment.duration(2000, 'ms'),
keepAlive = moment.duration(1, 'm'),
batchedReduceSize = 64,
} = {
waitForCompletion: moment.duration(100, 'ms'),
pollLength: moment.duration(2000, 'ms'),
keepAlive: moment.duration(1, 'm'),
batchedReduceSize: 64,
},
Expand All @@ -33,6 +35,7 @@ export const getMockSearchConfig = ({
waitForCompletion,
keepAlive,
batchedReduceSize,
pollLength,
} as SearchConfigSchema['asyncSearch'],
sessions: {
enabled,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import { SearchTimeoutError, TimeoutErrorMode } from './timeout_error';
import { SearchSessionIncompleteWarning } from './search_session_incomplete_warning';
import { getMockSearchConfig } from '../../../config.mock';
import type { ICPSManager } from '@kbn/cps-utils';
import moment from 'moment';

jest.mock('./create_request_hash', () => {
const originalModule = jest.requireActual('./create_request_hash');
Expand Down Expand Up @@ -2170,7 +2171,7 @@ describe('SearchInterceptor', () => {
"/internal/search/ese/1",
Object {
"asResponse": true,
"body": "{\\"id\\":\\"1\\",\\"params\\":{},\\"retrieveResults\\":true,\\"stream\\":true}",
"body": "{\\"id\\":\\"1\\",\\"params\\":{},\\"returnIntermediateResults\\":true,\\"stream\\":true}",
"context": undefined,
"signal": AbortSignal {},
"version": "1",
Expand Down Expand Up @@ -2205,7 +2206,7 @@ describe('SearchInterceptor', () => {
"/internal/search/ese/1",
Object {
"asResponse": true,
"body": "{\\"id\\":\\"1\\",\\"params\\":{},\\"retrieveResults\\":true,\\"stream\\":true}",
"body": "{\\"id\\":\\"1\\",\\"params\\":{},\\"returnIntermediateResults\\":true,\\"stream\\":true}",
"context": undefined,
"signal": AbortSignal {},
"version": "1",
Expand Down Expand Up @@ -2490,4 +2491,221 @@ describe('SearchInterceptor', () => {
});
});
});

describe('pollLength configuration', () => {
const inspectorServiceMock = {
open: () => {},
} as unknown as InspectorStart;

beforeEach(() => {
mockCoreSetup.http.post.mockReset();
});

test('should use DEFAULT_MULTIPLEXING_POLL_LENGTH when pollLength is not set and protocol supports multiplexing', async () => {
const interceptor = new SearchInterceptor({
toasts: mockCoreSetup.notifications.toasts,
startServices: new Promise((resolve) => {
resolve([
mockCoreStart,
{ inspector: inspectorServiceMock } as unknown as SearchServiceStartDependencies,
{},
]);
}),
uiSettings: mockCoreSetup.uiSettings,
http: mockCoreSetup.http,
executionContext: mockCoreSetup.executionContext,
session: sessionService,
searchConfig: {
asyncSearch: {
waitForCompletion: moment.duration(100, 'ms'),
keepAlive: moment.duration(1, 'm'),
batchedReduceSize: 64,
pollLength: undefined, // Explicitly undefined
},
sessions: {
enabled: true,
defaultExpiration: moment.duration(7, 'd'),
},
} as any,
});
(interceptor as any).protocolSupportsMultiplexing = true;

const responses = [
{
time: 10,
value: getMockSearchResponse({
isPartial: true,
isRunning: true,
id: '1',
rawResponse: {},
}),
},
{
time: 20,
value: getMockSearchResponse({
isPartial: false,
isRunning: false,
id: '1',
rawResponse: {},
}),
},
];

mockCoreSetup.http.post.mockImplementation(getHttpMock(responses));

const response = interceptor.search({ params: {} }, { pollInterval: 0 });
response.subscribe({ next, error, complete });

await timeTravel(10);
await timeTravel(20);

expect(mockCoreSetup.http.post).toHaveBeenCalledTimes(2);

const pollRequest = (
mockCoreSetup.http.post.mock.calls[1] as unknown as [string, HttpFetchOptions]
)[1];
const pollBody = JSON.parse(pollRequest?.body as string);

// Should use DEFAULT_MULTIPLEXING_POLL_LENGTH (30s)
expect(pollBody.params.wait_for_completion_timeout).toBe('30s');
});

test('should not set wait_for_completion_timeout when pollLength is not set and protocol does not support multiplexing', async () => {
const interceptor = new SearchInterceptor({
toasts: mockCoreSetup.notifications.toasts,
startServices: new Promise((resolve) => {
resolve([
mockCoreStart,
{ inspector: inspectorServiceMock } as unknown as SearchServiceStartDependencies,
{},
]);
}),
uiSettings: mockCoreSetup.uiSettings,
http: mockCoreSetup.http,
executionContext: mockCoreSetup.executionContext,
session: sessionService,
searchConfig: {
asyncSearch: {
waitForCompletion: moment.duration(100, 'ms'),
keepAlive: moment.duration(1, 'm'),
batchedReduceSize: 64,
pollLength: undefined, // Explicitly undefined
},
sessions: {
enabled: true,
defaultExpiration: moment.duration(7, 'd'),
},
} as any,
});
(interceptor as any).protocolSupportsMultiplexing = false;

const responses = [
{
time: 10,
value: getMockSearchResponse({
isPartial: true,
isRunning: true,
id: '1',
rawResponse: {},
}),
},
{
time: 20,
value: getMockSearchResponse({
isPartial: false,
isRunning: false,
id: '1',
rawResponse: {},
}),
},
];

mockCoreSetup.http.post.mockImplementation(getHttpMock(responses));

const response = interceptor.search({ params: {} }, { pollInterval: 0 });
response.subscribe({ next, error, complete });

await timeTravel(10);
await timeTravel(20);

expect(mockCoreSetup.http.post).toHaveBeenCalledTimes(2);

const pollRequest = (
mockCoreSetup.http.post.mock.calls[1] as unknown as [string, HttpFetchOptions]
)[1];
const pollBody = JSON.parse(pollRequest?.body as string);

// Should not have wait_for_completion_timeout
expect(pollBody.params.wait_for_completion_timeout).toBeUndefined();
});

test('should not set wait_for_completion_timeout when pollLength is set even if protocol supports multiplexing', async () => {
const interceptor = new SearchInterceptor({
toasts: mockCoreSetup.notifications.toasts,
startServices: new Promise((resolve) => {
resolve([
mockCoreStart,
{ inspector: inspectorServiceMock } as unknown as SearchServiceStartDependencies,
{},
]);
}),
uiSettings: mockCoreSetup.uiSettings,
http: mockCoreSetup.http,
executionContext: mockCoreSetup.executionContext,
session: sessionService,
searchConfig: {
asyncSearch: {
waitForCompletion: moment.duration(100, 'ms'),
keepAlive: moment.duration(1, 'm'),
batchedReduceSize: 64,
pollLength: moment.duration(1, 'm'), // Explicitly set
},
sessions: {
enabled: true,
defaultExpiration: moment.duration(7, 'd'),
},
} as any,
});
(interceptor as any).protocolSupportsMultiplexing = true;

const responses = [
{
time: 10,
value: getMockSearchResponse({
isPartial: true,
isRunning: true,
id: '1',
rawResponse: {},
}),
},
{
time: 20,
value: getMockSearchResponse({
isPartial: false,
isRunning: false,
id: '1',
rawResponse: {},
}),
},
];

mockCoreSetup.http.post.mockImplementation(getHttpMock(responses));

const response = interceptor.search({ params: {} }, { pollInterval: 0 });
response.subscribe({ next, error, complete });

await timeTravel(10);
await timeTravel(20);

expect(mockCoreSetup.http.post).toHaveBeenCalledTimes(2);

const pollRequest = (
mockCoreSetup.http.post.mock.calls[1] as unknown as [string, HttpFetchOptions]
)[1];
const pollBody = JSON.parse(pollRequest?.body as string);

// Should not have wait_for_completion_timeout
expect(pollBody.params.wait_for_completion_timeout).toBeUndefined();
});
});
});
Loading
Loading