55 */
66
77import { Subject } from 'rxjs' ;
8- import { bufferTime , filter , switchMap } from 'rxjs/operators' ;
8+ import { bufferTime , filter as rxFilter , switchMap } from 'rxjs/operators' ;
99import { reject , isUndefined } from 'lodash' ;
10- import { Client } from 'elasticsearch' ;
1110import type { PublicMethodsOf } from '@kbn/utility-types' ;
12- import { Logger , LegacyClusterClient } from 'src/core/server' ;
13- import { ESSearchResponse } from '../../../../typings/elasticsearch' ;
11+ import { Logger , ElasticsearchClient } from 'src/core/server' ;
1412import { EsContext } from '.' ;
1513import { IEvent , IValidatedEvent , SAVED_OBJECT_REL_PRIMARY } from '../types' ;
1614import { FindOptionsType } from '../event_log_client' ;
15+ import { esKuery } from '../../../../../src/plugins/data/server' ;
1716
1817export const EVENT_BUFFER_TIME = 1000 ; // milliseconds
1918export const EVENT_BUFFER_LENGTH = 100 ;
2019
21- export type EsClusterClient = Pick < LegacyClusterClient , 'callAsInternalUser' | 'asScoped' > ;
2220export type IClusterClientAdapter = PublicMethodsOf < ClusterClientAdapter > ;
2321
2422export interface Doc {
@@ -28,7 +26,7 @@ export interface Doc {
2826
2927export interface ConstructorOpts {
3028 logger : Logger ;
31- clusterClientPromise : Promise < EsClusterClient > ;
29+ elasticsearchClientPromise : Promise < ElasticsearchClient > ;
3230 context : EsContext ;
3331}
3432
@@ -41,14 +39,14 @@ export interface QueryEventsBySavedObjectResult {
4139
4240export class ClusterClientAdapter {
4341 private readonly logger : Logger ;
44- private readonly clusterClientPromise : Promise < EsClusterClient > ;
42+ private readonly elasticsearchClientPromise : Promise < ElasticsearchClient > ;
4543 private readonly docBuffer$ : Subject < Doc > ;
4644 private readonly context : EsContext ;
4745 private readonly docsBufferedFlushed : Promise < void > ;
4846
4947 constructor ( opts : ConstructorOpts ) {
5048 this . logger = opts . logger ;
51- this . clusterClientPromise = opts . clusterClientPromise ;
49+ this . elasticsearchClientPromise = opts . elasticsearchClientPromise ;
5250 this . context = opts . context ;
5351 this . docBuffer$ = new Subject < Doc > ( ) ;
5452
@@ -58,7 +56,7 @@ export class ClusterClientAdapter {
5856 this . docsBufferedFlushed = this . docBuffer$
5957 . pipe (
6058 bufferTime ( EVENT_BUFFER_TIME , null , EVENT_BUFFER_LENGTH ) ,
61- filter ( ( docs ) => docs . length > 0 ) ,
59+ rxFilter ( ( docs ) => docs . length > 0 ) ,
6260 switchMap ( async ( docs ) => await this . indexDocuments ( docs ) )
6361 )
6462 . toPromise ( ) ;
@@ -97,7 +95,8 @@ export class ClusterClientAdapter {
9795 }
9896
9997 try {
100- await this . callEs < ReturnType < Client [ 'bulk' ] > > ( 'bulk' , { body : bulkBody } ) ;
98+ const esClient = await this . elasticsearchClientPromise ;
99+ await esClient . bulk ( { body : bulkBody } ) ;
101100 } catch ( err ) {
102101 this . logger . error (
103102 `error writing bulk events: "${ err . message } "; docs: ${ JSON . stringify ( bulkBody ) } `
@@ -111,22 +110,24 @@ export class ClusterClientAdapter {
111110 path : `/_ilm/policy/${ policyName } ` ,
112111 } ;
113112 try {
114- await this . callEs ( 'transport.request' , request ) ;
113+ const esClient = await this . elasticsearchClientPromise ;
114+ await esClient . transport . request ( request ) ;
115115 } catch ( err ) {
116116 if ( err . statusCode === 404 ) return false ;
117117 throw new Error ( `error checking existance of ilm policy: ${ err . message } ` ) ;
118118 }
119119 return true ;
120120 }
121121
122- public async createIlmPolicy ( policyName : string , policy : unknown ) : Promise < void > {
122+ public async createIlmPolicy ( policyName : string , policy : Record < string , unknown > ) : Promise < void > {
123123 const request = {
124124 method : 'PUT' ,
125125 path : `/_ilm/policy/${ policyName } ` ,
126126 body : policy ,
127127 } ;
128128 try {
129- await this . callEs ( 'transport.request' , request ) ;
129+ const esClient = await this . elasticsearchClientPromise ;
130+ await esClient . transport . request ( request ) ;
130131 } catch ( err ) {
131132 throw new Error ( `error creating ilm policy: ${ err . message } ` ) ;
132133 }
@@ -135,27 +136,18 @@ export class ClusterClientAdapter {
135136 public async doesIndexTemplateExist ( name : string ) : Promise < boolean > {
136137 let result ;
137138 try {
138- result = await this . callEs < ReturnType < Client [ 'indices' ] [ 'existsTemplate' ] > > (
139- 'indices.existsTemplate' ,
140- { name }
141- ) ;
139+ const esClient = await this . elasticsearchClientPromise ;
140+ result = ( await esClient . indices . existsTemplate ( { name } ) ) . body ;
142141 } catch ( err ) {
143142 throw new Error ( `error checking existance of index template: ${ err . message } ` ) ;
144143 }
145144 return result as boolean ;
146145 }
147146
148- public async createIndexTemplate ( name : string , template : unknown ) : Promise < void > {
149- const addTemplateParams = {
150- name,
151- create : true ,
152- body : template ,
153- } ;
147+ public async createIndexTemplate ( name : string , template : Record < string , unknown > ) : Promise < void > {
154148 try {
155- await this . callEs < ReturnType < Client [ 'indices' ] [ 'putTemplate' ] > > (
156- 'indices.putTemplate' ,
157- addTemplateParams
158- ) ;
149+ const esClient = await this . elasticsearchClientPromise ;
150+ await esClient . indices . putTemplate ( { name, body : template , create : true } ) ;
159151 } catch ( err ) {
160152 // The error message doesn't have a type attribute we can look to guarantee it's due
161153 // to the template already existing (only long message) so we'll check ourselves to see
@@ -171,19 +163,21 @@ export class ClusterClientAdapter {
171163 public async doesAliasExist ( name : string ) : Promise < boolean > {
172164 let result ;
173165 try {
174- result = await this . callEs < ReturnType < Client [ 'indices' ] [ 'existsAlias' ] > > (
175- 'indices.existsAlias' ,
176- { name }
177- ) ;
166+ const esClient = await this . elasticsearchClientPromise ;
167+ result = ( await esClient . indices . existsAlias ( { name } ) ) . body ;
178168 } catch ( err ) {
179169 throw new Error ( `error checking existance of initial index: ${ err . message } ` ) ;
180170 }
181171 return result as boolean ;
182172 }
183173
184- public async createIndex ( name : string , body : unknown = { } ) : Promise < void > {
174+ public async createIndex (
175+ name : string ,
176+ body : string | Record < string , unknown > = { }
177+ ) : Promise < void > {
185178 try {
186- await this . callEs < ReturnType < Client [ 'indices' ] [ 'create' ] > > ( 'indices.create' , {
179+ const esClient = await this . elasticsearchClientPromise ;
180+ await esClient . indices . create ( {
187181 index : name ,
188182 body,
189183 } ) ;
@@ -200,7 +194,7 @@ export class ClusterClientAdapter {
200194 type : string ,
201195 ids : string [ ] ,
202196 // eslint-disable-next-line @typescript-eslint/naming-convention
203- { page, per_page : perPage , start, end, sort_field, sort_order } : FindOptionsType
197+ { page, per_page : perPage , start, end, sort_field, sort_order, filter } : FindOptionsType
204198 ) : Promise < QueryEventsBySavedObjectResult > {
205199 const defaultNamespaceQuery = {
206200 bool : {
@@ -220,12 +214,26 @@ export class ClusterClientAdapter {
220214 } ;
221215 const namespaceQuery = namespace === undefined ? defaultNamespaceQuery : namedNamespaceQuery ;
222216
217+ const esClient = await this . elasticsearchClientPromise ;
218+ let dslFilterQuery ;
219+ try {
220+ dslFilterQuery = filter
221+ ? esKuery . toElasticsearchQuery ( esKuery . fromKueryExpression ( filter ) )
222+ : [ ] ;
223+ } catch ( err ) {
224+ this . debug ( `Invalid kuery syntax for the filter (${ filter } ) error:` , {
225+ message : err . message ,
226+ statusCode : err . statusCode ,
227+ } ) ;
228+ throw err ;
229+ }
223230 const body = {
224231 size : perPage ,
225232 from : ( page - 1 ) * perPage ,
226233 sort : { [ sort_field ] : { order : sort_order } } ,
227234 query : {
228235 bool : {
236+ filter : dslFilterQuery ,
229237 must : reject (
230238 [
231239 {
@@ -283,8 +291,10 @@ export class ClusterClientAdapter {
283291
284292 try {
285293 const {
286- hits : { hits, total } ,
287- } : ESSearchResponse < unknown , { } > = await this . callEs ( 'search' , {
294+ body : {
295+ hits : { hits, total } ,
296+ } ,
297+ } = await esClient . search ( {
288298 index,
289299 track_total_hits : true ,
290300 body,
@@ -293,7 +303,7 @@ export class ClusterClientAdapter {
293303 page,
294304 per_page : perPage ,
295305 total : total . value ,
296- data : hits . map ( ( hit ) => hit . _source ) as IValidatedEvent [ ] ,
306+ data : hits . map ( ( hit : { _source : unknown } ) => hit . _source ) as IValidatedEvent [ ] ,
297307 } ;
298308 } catch ( err ) {
299309 throw new Error (
@@ -302,24 +312,6 @@ export class ClusterClientAdapter {
302312 }
303313 }
304314
305- // We have a common problem typing ES-DSL Queries
306- // eslint-disable-next-line @typescript-eslint/no-explicit-any
307- private async callEs < ESQueryResult = unknown > ( operation : string , body ?: any ) {
308- try {
309- this . debug ( `callEs(${ operation } ) calls:` , body ) ;
310- const clusterClient = await this . clusterClientPromise ;
311- const result = await clusterClient . callAsInternalUser ( operation , body ) ;
312- this . debug ( `callEs(${ operation } ) result:` , result ) ;
313- return result as ESQueryResult ;
314- } catch ( err ) {
315- this . debug ( `callEs(${ operation } ) error:` , {
316- message : err . message ,
317- statusCode : err . statusCode ,
318- } ) ;
319- throw err ;
320- }
321- }
322-
323315 private debug ( message : string , object ?: unknown ) {
324316 const objectString = object == null ? '' : JSON . stringify ( object ) ;
325317 this . logger . debug ( `esContext: ${ message } ${ objectString } ` ) ;
0 commit comments