Skip to content

Commit 53fd3d5

Browse files
committed
use Observables on server search API
1 parent 4dc6f3b commit 53fd3d5

File tree

25 files changed

+421
-332
lines changed

25 files changed

+421
-332
lines changed

docs/development/plugins/data/server/kibana-plugin-plugins-data-server.isearchstart.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,6 @@ export interface ISearchStart<SearchStrategyRequest extends IKibanaSearchRequest
1616
| --- | --- | --- |
1717
| [aggs](./kibana-plugin-plugins-data-server.isearchstart.aggs.md) | <code>AggsStart</code> | |
1818
| [getSearchStrategy](./kibana-plugin-plugins-data-server.isearchstart.getsearchstrategy.md) | <code>(name: string) =&gt; ISearchStrategy&lt;SearchStrategyRequest, SearchStrategyResponse&gt;</code> | Get other registered search strategies. For example, if a new strategy needs to use the already-registered ES search strategy, it can use this function to accomplish that. |
19-
| [search](./kibana-plugin-plugins-data-server.isearchstart.search.md) | <code>(context: RequestHandlerContext, request: SearchStrategyRequest, options: ISearchOptions) =&gt; Promise&lt;SearchStrategyResponse&gt;</code> | |
19+
| [search](./kibana-plugin-plugins-data-server.isearchstart.search.md) | <code>ISearchStrategy['search']</code> | |
2020
| [searchSource](./kibana-plugin-plugins-data-server.isearchstart.searchsource.md) | <code>{</code><br/><code> asScoped: (request: KibanaRequest) =&gt; Promise&lt;ISearchStartSearchSource&gt;;</code><br/><code> }</code> | |
2121

docs/development/plugins/data/server/kibana-plugin-plugins-data-server.isearchstart.search.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,5 +7,5 @@
77
<b>Signature:</b>
88

99
```typescript
10-
search: (context: RequestHandlerContext, request: SearchStrategyRequest, options: ISearchOptions) => Promise<SearchStrategyResponse>;
10+
search: ISearchStrategy['search'];
1111
```

docs/development/plugins/data/server/kibana-plugin-plugins-data-server.isearchstrategy.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,5 +17,5 @@ export interface ISearchStrategy<SearchStrategyRequest extends IKibanaSearchRequ
1717
| Property | Type | Description |
1818
| --- | --- | --- |
1919
| [cancel](./kibana-plugin-plugins-data-server.isearchstrategy.cancel.md) | <code>(context: RequestHandlerContext, id: string) =&gt; Promise&lt;void&gt;</code> | |
20-
| [search](./kibana-plugin-plugins-data-server.isearchstrategy.search.md) | <code>(context: RequestHandlerContext, request: SearchStrategyRequest, options?: ISearchOptions) =&gt; Promise&lt;SearchStrategyResponse&gt;</code> | |
20+
| [search](./kibana-plugin-plugins-data-server.isearchstrategy.search.md) | <code>(request: SearchStrategyRequest, options: ISearchOptions, context: RequestHandlerContext) =&gt; Observable&lt;SearchStrategyResponse&gt;</code> | |
2121

docs/development/plugins/data/server/kibana-plugin-plugins-data-server.isearchstrategy.search.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,5 +7,5 @@
77
<b>Signature:</b>
88

99
```typescript
10-
search: (context: RequestHandlerContext, request: SearchStrategyRequest, options?: ISearchOptions) => Promise<SearchStrategyResponse>;
10+
search: (request: SearchStrategyRequest, options: ISearchOptions, context: RequestHandlerContext) => Observable<SearchStrategyResponse>;
1111
```

examples/search_examples/server/my_strategy.ts

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
* under the License.
1818
*/
1919

20+
import { map } from 'rxjs/operators';
2021
import { ISearchStrategy, PluginStart } from '../../../src/plugins/data/server';
2122
import { IMyStrategyResponse, IMyStrategyRequest } from '../common';
2223

@@ -25,13 +26,13 @@ export const mySearchStrategyProvider = (
2526
): ISearchStrategy<IMyStrategyRequest, IMyStrategyResponse> => {
2627
const es = data.search.getSearchStrategy('es');
2728
return {
28-
search: async (context, request, options): Promise<IMyStrategyResponse> => {
29-
const esSearchRes = await es.search(context, request, options);
30-
return {
31-
...esSearchRes,
32-
cool: request.get_cool ? 'YES' : 'NOPE',
33-
};
34-
},
29+
search: (request, options, context) =>
30+
es.search(request, options, context).pipe(
31+
map((esSearchRes) => ({
32+
...esSearchRes,
33+
cool: request.get_cool ? 'YES' : 'NOPE',
34+
}))
35+
),
3536
cancel: async (context, id) => {
3637
if (es.cancel) {
3738
es.cancel(context, id);

examples/search_examples/server/routes/server_search_route.ts

Lines changed: 18 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -39,26 +39,28 @@ export function registerServerSearchRoute(router: IRouter, data: DataPluginStart
3939
// Run a synchronous search server side, by enforcing a high keepalive and waiting for completion.
4040
// If you wish to run the search with polling (in basic+), you'd have to poll on the search API.
4141
// Please reach out to the @app-arch-team if you need this to be implemented.
42-
const res = await data.search.search(
43-
context,
44-
{
45-
params: {
46-
index,
47-
body: {
48-
aggs: {
49-
'1': {
50-
avg: {
51-
field,
42+
const res = await data.search
43+
.search(
44+
{
45+
params: {
46+
index,
47+
body: {
48+
aggs: {
49+
'1': {
50+
avg: {
51+
field,
52+
},
5253
},
5354
},
5455
},
56+
waitForCompletionTimeout: '5m',
57+
keepAlive: '5m',
5558
},
56-
waitForCompletionTimeout: '5m',
57-
keepAlive: '5m',
58-
},
59-
} as IEsSearchRequest,
60-
{}
61-
);
59+
} as IEsSearchRequest,
60+
{},
61+
context
62+
)
63+
.toPromise();
6264

6365
return response.ok({
6466
body: {

src/plugins/data/server/search/es_search/es_search_strategy.test.ts

Lines changed: 43 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,8 @@ describe('ES search strategy', () => {
3535
},
3636
},
3737
});
38-
const mockContext = {
38+
39+
const mockContext = ({
3940
core: {
4041
uiSettings: {
4142
client: {
@@ -44,7 +45,8 @@ describe('ES search strategy', () => {
4445
},
4546
elasticsearch: { client: { asCurrentUser: { search: mockApiCaller } } },
4647
},
47-
};
48+
} as unknown) as RequestHandlerContext;
49+
4850
const mockConfig$ = pluginInitializerContextConfigMock<any>({}).legacy.globalConfig$;
4951

5052
beforeEach(() => {
@@ -57,44 +59,51 @@ describe('ES search strategy', () => {
5759
expect(typeof esSearch.search).toBe('function');
5860
});
5961

60-
it('calls the API caller with the params with defaults', async () => {
62+
it('calls the API caller with the params with defaults', async (done) => {
6163
const params = { index: 'logstash-*' };
62-
const esSearch = await esSearchStrategyProvider(mockConfig$, mockLogger);
6364

64-
await esSearch.search((mockContext as unknown) as RequestHandlerContext, { params });
65-
66-
expect(mockApiCaller).toBeCalled();
67-
expect(mockApiCaller.mock.calls[0][0]).toEqual({
68-
...params,
69-
ignore_unavailable: true,
70-
track_total_hits: true,
71-
});
65+
await esSearchStrategyProvider(mockConfig$, mockLogger)
66+
.search({ params }, {}, mockContext)
67+
.subscribe(() => {
68+
expect(mockApiCaller).toBeCalled();
69+
expect(mockApiCaller.mock.calls[0][0]).toEqual({
70+
...params,
71+
ignore_unavailable: true,
72+
track_total_hits: true,
73+
});
74+
done();
75+
});
7276
});
7377

74-
it('calls the API caller with overridden defaults', async () => {
78+
it('calls the API caller with overridden defaults', async (done) => {
7579
const params = { index: 'logstash-*', ignore_unavailable: false, timeout: '1000ms' };
76-
const esSearch = await esSearchStrategyProvider(mockConfig$, mockLogger);
77-
78-
await esSearch.search((mockContext as unknown) as RequestHandlerContext, { params });
7980

80-
expect(mockApiCaller).toBeCalled();
81-
expect(mockApiCaller.mock.calls[0][0]).toEqual({
82-
...params,
83-
track_total_hits: true,
84-
});
81+
await esSearchStrategyProvider(mockConfig$, mockLogger)
82+
.search({ params }, {}, mockContext)
83+
.subscribe(() => {
84+
expect(mockApiCaller).toBeCalled();
85+
expect(mockApiCaller.mock.calls[0][0]).toEqual({
86+
...params,
87+
track_total_hits: true,
88+
});
89+
done();
90+
});
8591
});
8692

87-
it('has all response parameters', async () => {
88-
const params = { index: 'logstash-*' };
89-
const esSearch = await esSearchStrategyProvider(mockConfig$, mockLogger);
90-
91-
const response = await esSearch.search((mockContext as unknown) as RequestHandlerContext, {
92-
params,
93-
});
94-
95-
expect(response.isRunning).toBe(false);
96-
expect(response.isPartial).toBe(false);
97-
expect(response).toHaveProperty('loaded');
98-
expect(response).toHaveProperty('rawResponse');
99-
});
93+
it('has all response parameters', async (done) =>
94+
await esSearchStrategyProvider(mockConfig$, mockLogger)
95+
.search(
96+
{
97+
params: { index: 'logstash-*' },
98+
},
99+
{},
100+
mockContext
101+
)
102+
.subscribe((data) => {
103+
expect(data.isRunning).toBe(false);
104+
expect(data.isPartial).toBe(false);
105+
expect(data).toHaveProperty('loaded');
106+
expect(data).toHaveProperty('rawResponse');
107+
done();
108+
}));
100109
});

src/plugins/data/server/search/es_search/es_search_strategy.ts

Lines changed: 43 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,10 @@
1616
* specific language governing permissions and limitations
1717
* under the License.
1818
*/
19+
import { Observable, from } from 'rxjs';
1920
import { first } from 'rxjs/operators';
2021
import { SharedGlobalConfig, Logger } from 'kibana/server';
2122
import { SearchResponse } from 'elasticsearch';
22-
import { Observable } from 'rxjs';
2323
import { ApiResponse } from '@elastic/elasticsearch';
2424
import { SearchUsage } from '../collectors/usage';
2525
import { toSnakeCase } from './to_snake_case';
@@ -29,6 +29,7 @@ import {
2929
getTotalLoaded,
3030
getShardTimeout,
3131
shimAbortSignal,
32+
IEsSearchResponse,
3233
} from '..';
3334

3435
export const esSearchStrategyProvider = (
@@ -37,47 +38,52 @@ export const esSearchStrategyProvider = (
3738
usage?: SearchUsage
3839
): ISearchStrategy => {
3940
return {
40-
search: async (context, request, options) => {
41-
logger.debug(`search ${request.params?.index}`);
42-
const config = await config$.pipe(first()).toPromise();
43-
const uiSettingsClient = await context.core.uiSettings.client;
41+
search: (request, options, context) =>
42+
from(
43+
new Promise<IEsSearchResponse>(async (resolve, reject) => {
44+
logger.debug(`search ${request.params?.index}`);
45+
const config = await config$.pipe(first()).toPromise();
46+
const uiSettingsClient = await context.core.uiSettings.client;
4447

45-
// Only default index pattern type is supported here.
46-
// See data_enhanced for other type support.
47-
if (!!request.indexType) {
48-
throw new Error(`Unsupported index pattern type ${request.indexType}`);
49-
}
48+
// Only default index pattern type is supported here.
49+
// See data_enhanced for other type support.
50+
if (!!request.indexType) {
51+
throw new Error(`Unsupported index pattern type ${request.indexType}`);
52+
}
5053

51-
// ignoreThrottled is not supported in OSS
52-
const { ignoreThrottled, ...defaultParams } = await getDefaultSearchParams(uiSettingsClient);
54+
// ignoreThrottled is not supported in OSS
55+
const { ignoreThrottled, ...defaultParams } = await getDefaultSearchParams(
56+
uiSettingsClient
57+
);
5358

54-
const params = toSnakeCase({
55-
...defaultParams,
56-
...getShardTimeout(config),
57-
...request.params,
58-
});
59+
const params = toSnakeCase({
60+
...defaultParams,
61+
...getShardTimeout(config),
62+
...request.params,
63+
});
5964

60-
try {
61-
const promise = shimAbortSignal(
62-
context.core.elasticsearch.client.asCurrentUser.search(params),
63-
options?.abortSignal
64-
);
65-
const { body: rawResponse } = (await promise) as ApiResponse<SearchResponse<any>>;
65+
try {
66+
const promise = shimAbortSignal(
67+
context.core.elasticsearch.client.asCurrentUser.search(params),
68+
options?.abortSignal
69+
);
70+
const { body: rawResponse } = (await promise) as ApiResponse<SearchResponse<any>>;
6671

67-
if (usage) usage.trackSuccess(rawResponse.took);
72+
if (usage) usage.trackSuccess(rawResponse.took);
6873

69-
// The above query will either complete or timeout and throw an error.
70-
// There is no progress indication on this api.
71-
return {
72-
isPartial: false,
73-
isRunning: false,
74-
rawResponse,
75-
...getTotalLoaded(rawResponse._shards),
76-
};
77-
} catch (e) {
78-
if (usage) usage.trackError();
79-
throw e;
80-
}
81-
},
74+
// The above query will either complete or timeout and throw an error.
75+
// There is no progress indication on this api.
76+
resolve({
77+
isPartial: false,
78+
isRunning: false,
79+
rawResponse,
80+
...getTotalLoaded(rawResponse._shards),
81+
});
82+
} catch (e) {
83+
if (usage) usage.trackError();
84+
reject(e);
85+
}
86+
})
87+
),
8288
};
8389
};

src/plugins/data/server/search/routes/search.test.ts

Lines changed: 15 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
* under the License.
1818
*/
1919

20-
import { Observable } from 'rxjs';
20+
import { Observable, from } from 'rxjs';
2121

2222
import {
2323
CoreSetup,
@@ -66,7 +66,8 @@ describe('Search service', () => {
6666
},
6767
},
6868
};
69-
mockDataStart.search.search.mockResolvedValue(response);
69+
70+
mockDataStart.search.search.mockReturnValue(from(Promise.resolve(response)));
7071
const mockContext = {};
7172
const mockBody = { id: undefined, params: {} };
7273
const mockParams = { strategy: 'foo' };
@@ -83,20 +84,24 @@ describe('Search service', () => {
8384
await handler((mockContext as unknown) as RequestHandlerContext, mockRequest, mockResponse);
8485

8586
expect(mockDataStart.search.search).toBeCalled();
86-
expect(mockDataStart.search.search.mock.calls[0][1]).toStrictEqual(mockBody);
87+
expect(mockDataStart.search.search.mock.calls[0][0]).toStrictEqual(mockBody);
8788
expect(mockResponse.ok).toBeCalled();
8889
expect(mockResponse.ok.mock.calls[0][0]).toEqual({
8990
body: response,
9091
});
9192
});
9293

9394
it('handler throws an error if the search throws an error', async () => {
94-
mockDataStart.search.search.mockRejectedValue({
95-
message: 'oh no',
96-
body: {
97-
error: 'oops',
98-
},
99-
});
95+
const rejectedValue = from(
96+
Promise.reject({
97+
message: 'oh no',
98+
body: {
99+
error: 'oops',
100+
},
101+
})
102+
);
103+
104+
mockDataStart.search.search.mockReturnValue(rejectedValue);
100105

101106
const mockContext = {};
102107
const mockBody = { id: undefined, params: {} };
@@ -114,7 +119,7 @@ describe('Search service', () => {
114119
await handler((mockContext as unknown) as RequestHandlerContext, mockRequest, mockResponse);
115120

116121
expect(mockDataStart.search.search).toBeCalled();
117-
expect(mockDataStart.search.search.mock.calls[0][1]).toStrictEqual(mockBody);
122+
expect(mockDataStart.search.search.mock.calls[0][0]).toStrictEqual(mockBody);
118123
expect(mockResponse.customError).toBeCalled();
119124
const error: any = mockResponse.customError.mock.calls[0][0];
120125
expect(error.body.message).toBe('oh no');

0 commit comments

Comments
 (0)