Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,11 @@ import type { ESQLSearchParams, ESQLSearchResponse } from '@kbn/es-types';
import type { IKibanaSearchRequest, IKibanaSearchResponse } from '@kbn/search-types';
import { catchError, filter as rxFilter, lastValueFrom, map, throwError } from 'rxjs';
import { inject, injectable } from 'inversify';
import type { RecordBatch } from 'apache-arrow';
import type { ElasticsearchClient } from '@kbn/core/server';
import type { LoggerServiceContract } from '../logger_service/logger_service';
import { LoggerServiceToken } from '../logger_service/logger_service';
import { EsServiceScopedToken } from '../es_service/tokens';

interface ExecuteQueryParams {
query: ESQLSearchParams['query'];
Expand All @@ -21,16 +24,27 @@ interface ExecuteQueryParams {
abortSignal?: AbortSignal;
}

interface ExecuteQueryStreamingParams {
query: string;
dropNullColumns?: boolean;
allowPartialResults?: boolean;
abortSignal?: AbortSignal;
}

export interface QueryServiceContract {
executeQuery(params: ExecuteQueryParams): Promise<ESQLSearchResponse>;
queryResponseToRecords<T extends Record<string, any>>(response: ESQLSearchResponse): T[];
executeQueryStreaming(
params: ExecuteQueryStreamingParams
): AsyncIterable<Record<string, unknown>>;
}

@injectable()
export class QueryService implements QueryServiceContract {
constructor(
private readonly searchClient: IScopedSearchClient,
@inject(LoggerServiceToken) private readonly logger: LoggerServiceContract
@inject(LoggerServiceToken) private readonly logger: LoggerServiceContract,
@inject(EsServiceScopedToken) private readonly esClient: ElasticsearchClient
) {}

async executeQuery({
Expand Down Expand Up @@ -93,6 +107,90 @@ export class QueryService implements QueryServiceContract {
}
}

async *executeQueryStreaming({
query,
dropNullColumns = false,
allowPartialResults = true,
abortSignal,
}: ExecuteQueryStreamingParams): AsyncIterable<Record<string, unknown>> {
try {
this.logger.debug({
message: () =>
`QueryService: Starting streaming query - ${JSON.stringify({
query,
dropNullColumns,
allowPartialResults,
})}`,
});

// Use helpers.esql() for Arrow streaming support
const esqlHelper = this.esClient.helpers.esql({
query,
drop_null_columns: dropNullColumns,
allow_partial_results: allowPartialResults,
...(abortSignal ? { signal: abortSignal } : {}),
});

// Get Arrow reader for streaming record batches
const reader = await esqlHelper.toArrowReader();

let columnNames: string[] = [];

// Process record batches and yield row objects
for await (const recordBatch of reader) {
// Extract column names from the first batch
if (columnNames.length === 0 && recordBatch.schema.fields.length > 0) {
columnNames = recordBatch.schema.fields.map((field) => field.name);
}

// Convert batch to row objects and yield them individually
const rows = this.recordBatchToRecords(recordBatch, columnNames);
for (const row of rows) {
yield row;
}
}

this.logger.debug({
message: 'QueryService: Streaming query completed successfully',
});
} catch (error) {
this.logger.error({
error,
code: 'ESQL_STREAMING_QUERY_ERROR',
type: 'QueryServiceError',
});

throw error;
}
}

private recordBatchToRecords(
recordBatch: RecordBatch,
columnNames: string[]
): Record<string, unknown>[] {
const rows: Record<string, unknown>[] = [];

// Process each row in the batch
for (let rowIndex = 0; rowIndex < recordBatch.numRows; rowIndex++) {
const rowObject: Record<string, unknown> = {};

// Extract values for each column in this row
for (let colIndex = 0; colIndex < recordBatch.numCols; colIndex++) {
const column = recordBatch.getChildAt(colIndex);
if (column) {
const columnName = columnNames[colIndex] || `column_${colIndex}`;
// Get the value at the current row index
const value = column.get(rowIndex);
rowObject[columnName] = value;
}
}

rows.push(rowObject);
}

return rows;
}

public queryResponseToRecords<T extends Record<string, any>>(response: ESQLSearchResponse): T[] {
const objects: T[] = [];

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

import Boom from '@hapi/boom';
import { Response } from '@kbn/core-di-server';
import type { KibanaResponseFactory, RouteSecurity } from '@kbn/core-http-server';
import { inject, injectable } from 'inversify';
import type { Logger as KibanaLogger } from '@kbn/logging';
import { Logger } from '@kbn/core-di';
import { ALERTING_V2_API_PRIVILEGES } from '../lib/security/privileges';
import type { QueryServiceContract } from '../lib/services/query_service/query_service';
import { QueryService } from '../lib/services/query_service/query_service';

@injectable()
export class GetStreamingRoute {
static method = 'get' as const;
static path = `/internal/alerting/v2/streaming`;
static security: RouteSecurity = {
authz: {
requiredPrivileges: [ALERTING_V2_API_PRIVILEGES.rules.read],
},
};
static options = { access: 'internal' } as const;
static validate = {} as const;

constructor(
@inject(Logger) private readonly logger: KibanaLogger,
@inject(Response) private readonly response: KibanaResponseFactory,
@inject(QueryService) private readonly queryService: QueryServiceContract
) {}

async handle() {
try {
let rowCount = 0;

// Process rows as they arrive (true streaming)
for await (const rowObject of this.queryService.executeQueryStreaming({
query: 'FROM .alerts-events*',
dropNullColumns: false,
allowPartialResults: true,
})) {
// Log each row as it becomes available
this.logger.info(`ESQL Row Result: ${JSON.stringify(rowObject)}`);
rowCount++;
}

return this.response.ok({
body: { message: 'Streaming completed', rowCount },
});
} catch (e) {
const boom = Boom.isBoom(e) ? e : Boom.boomify(e);
return this.response.customError({
statusCode: boom.output.statusCode,
body: boom.output.payload,
});
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,13 @@ import { UpdateRuleRoute } from '../routes/update_rule_route';
import { GetRulesRoute } from '../routes/get_rules_route';
import { GetRuleRoute } from '../routes/get_rule_route';
import { DeleteRuleRoute } from '../routes/delete_rule_route';
import { GetStreamingRoute } from '../routes/poc_streaming';

export function bindRoutes({ bind }: ContainerModuleLoadOptions) {
bind(Route).toConstantValue(CreateRuleRoute);
bind(Route).toConstantValue(UpdateRuleRoute);
bind(Route).toConstantValue(GetRulesRoute);
bind(Route).toConstantValue(GetRuleRoute);
bind(Route).toConstantValue(DeleteRuleRoute);
bind(Route).toConstantValue(GetStreamingRoute);
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,9 @@ export function bindServices({ bind }: ContainerModuleLoadOptions) {
const request = get(Request);
const data = get(PluginStart<AlertingServerStartDependencies['data']>('data'));
const loggerService = get(LoggerServiceToken);
const esClient = get(EsServiceScopedToken);
const searchClient = data.search.asScoped(request);
return new QueryService(searchClient, loggerService);
return new QueryService(searchClient, loggerService, esClient);
})
.inRequestScope();

Expand Down