Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,8 @@ export const genericEntityDefinition: EntityDefinitionWithoutId = {
},
indexPatterns: [],
fields: [
// entity.id doesn't need to be mapped because it's the main entity field
// and it's already mapped by default

// We want this to make sure it's also extracted on CCS logs extraction
newestValue({ source: 'entity.id' }),
newestValue({ source: 'entity.name' }),
...getEntityFieldsDescriptions(),

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,7 @@ export class AssetManager {
): Promise<CheckPrivilegesResponse> {
const checkPrivileges = this.security.authz.checkPrivilegesDynamicallyWithRequest(request);

const sourceIndexPatterns = await this.logsExtractionClient.getIndexPatterns(
const sourceIndexPatterns = await this.logsExtractionClient.getLocalIndexPatterns(
additionalIndexPatterns
);

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,236 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

import type { ESQLSearchResponse } from '@kbn/es-types';
import { loggerMock } from '@kbn/logging-mocks';
import type { ElasticsearchClient } from '@kbn/core/server';
import { CcsLogsExtractionClient } from './ccs_logs_extraction_client';
import type { CRUDClient } from './crud_client';
import { getEntityDefinition } from '../../common/domain/definitions/registry';
import { executeEsqlQuery } from '../infra/elasticsearch/esql';
import {
ENGINE_METADATA_PAGINATION_FIRST_SEEN_LOG_FIELD,
HASHED_ID_FIELD,
} from './logs_extraction/logs_extraction_query_builder';

const ENGINE_METADATA_UNTYPED_ID_FIELD = 'entity.EngineMetadata.UntypedId';

jest.mock('../infra/elasticsearch/esql', () => {
const actual = jest.requireActual<typeof import('../infra/elasticsearch/esql')>(
'../infra/elasticsearch/esql'
);
return {
...actual,
executeEsqlQuery: jest.fn(),
};
});

const mockExecuteEsqlQuery = executeEsqlQuery as jest.MockedFunction<typeof executeEsqlQuery>;

function createMockCrudClient(): jest.Mocked<Pick<CRUDClient, 'upsertEntitiesBulk'>> {
return {
upsertEntitiesBulk: jest.fn().mockResolvedValue([]),
};
}

describe('CcsLogsExtractionClient', () => {
let client: CcsLogsExtractionClient;
let mockCrudClient: ReturnType<typeof createMockCrudClient>;
const mockLogger = loggerMock.create();
const mockEsClient = {} as jest.Mocked<ElasticsearchClient>;

beforeEach(() => {
jest.clearAllMocks();
mockCrudClient = createMockCrudClient();
client = new CcsLogsExtractionClient(
mockLogger,
mockEsClient,
mockCrudClient as unknown as CRUDClient
);
});

it('should extract to updates via CRUD client and return count and pages', async () => {
const mockEsqlResponse: ESQLSearchResponse = {
columns: [
{ name: '@timestamp', type: 'date' },
{ name: HASHED_ID_FIELD, type: 'keyword' },
{ name: 'entity.id', type: 'keyword' },
],
values: [
['2024-06-15T12:00:00.000Z', 'hash1', 'host:host-1'],
['2024-06-15T12:00:00.000Z', 'hash2', 'host:host-2'],
],
};

mockExecuteEsqlQuery.mockResolvedValue(mockEsqlResponse);

const result = await client.extractToUpdates({
type: 'host',
remoteIndexPatterns: ['remote_cluster:logs-*'],
fromDateISO: '2024-01-01T00:00:00.000Z',
toDateISO: '2024-06-15T23:59:59.999Z',
docsLimit: 10000,
entityDefinition: getEntityDefinition('host', 'default'),
});

expect(result).toEqual({ count: 2, pages: 1 });
expect(mockExecuteEsqlQuery).toHaveBeenCalledTimes(1);
expect(mockCrudClient.upsertEntitiesBulk).toHaveBeenCalledTimes(1);
expect(mockCrudClient.upsertEntitiesBulk).toHaveBeenCalledWith(
expect.objectContaining({
objects: expect.arrayContaining([
{ type: 'host', doc: expect.objectContaining({ 'entity.id': 'host:host-1' }) },
{ type: 'host', doc: expect.objectContaining({ 'entity.id': 'host:host-2' }) },
]),
force: true,
timestampGenerator: expect.any(Function),
})
);
});

it('should call upsertEntitiesBulk with flat entity doc (dot-notation keys)', async () => {
mockExecuteEsqlQuery.mockResolvedValue({
columns: [
{ name: 'entity.id', type: 'keyword' },
{ name: 'entity.name', type: 'keyword' },
],
values: [['user:u1', 'alice']],
});

await client.extractToUpdates({
type: 'user',
remoteIndexPatterns: ['other:filebeat-*'],
fromDateISO: '2024-01-01T00:00:00.000Z',
toDateISO: '2024-01-01T23:59:59.999Z',
docsLimit: 5000,
entityDefinition: getEntityDefinition('user', 'default'),
});

const call = mockCrudClient.upsertEntitiesBulk.mock.calls[0];
expect(call[0].objects).toEqual([
{ type: 'user', doc: { 'entity.id': 'user:u1', 'entity.name': 'alice' } },
]);
expect(call[0]).toMatchObject({ force: true });
const timestampGenerator = call[0].timestampGenerator;
expect(timestampGenerator).toBeDefined();
const ts = timestampGenerator!();
const tsDate = new Date(ts);
expect(tsDate.getTime()).toBeGreaterThanOrEqual(new Date('2024-01-01T23:59:59.999Z').getTime());
expect(tsDate.getTime()).toBeLessThanOrEqual(
new Date('2024-01-01T23:59:59.999Z').getTime() + 10001
);
});

it('should paginate when ESQL returns full page and run upsert per page', async () => {
const docsLimit = 2;
const firstPage: ESQLSearchResponse = {
columns: [
{ name: '@timestamp', type: 'date' },
{ name: ENGINE_METADATA_PAGINATION_FIRST_SEEN_LOG_FIELD, type: 'date' },
{ name: ENGINE_METADATA_UNTYPED_ID_FIELD, type: 'keyword' },
{ name: HASHED_ID_FIELD, type: 'keyword' },
{ name: 'entity.id', type: 'keyword' },
],
values: [
['2024-06-15T10:00:00.000Z', '2024-06-15T10:00:00.000Z', 'id1', 'hash1', 'host:h1'],
['2024-06-15T10:00:00.000Z', '2024-06-15T10:00:00.000Z', 'id2', 'hash2', 'host:h2'],
],
};
const secondPage: ESQLSearchResponse = {
columns: firstPage.columns,
values: [['2024-06-15T11:00:00.000Z', '2024-06-15T11:00:00.000Z', 'id3', 'hash3', 'host:h3']],
};

mockExecuteEsqlQuery.mockResolvedValueOnce(firstPage).mockResolvedValueOnce(secondPage);

const result = await client.extractToUpdates({
type: 'host',
remoteIndexPatterns: ['remote:logs-*'],
fromDateISO: '2024-01-01T00:00:00.000Z',
toDateISO: '2024-06-15T23:59:59.999Z',
docsLimit,
entityDefinition: getEntityDefinition('host', 'default'),
});

expect(result).toEqual({ count: 3, pages: 2 });
expect(mockExecuteEsqlQuery).toHaveBeenCalledTimes(2);
expect(mockCrudClient.upsertEntitiesBulk).toHaveBeenCalledTimes(2);
expect(mockCrudClient.upsertEntitiesBulk).toHaveBeenNthCalledWith(
1,
expect.objectContaining({
objects: expect.arrayContaining([
{ type: 'host', doc: expect.objectContaining({ 'entity.id': 'host:h1' }) },
{ type: 'host', doc: expect.objectContaining({ 'entity.id': 'host:h2' }) },
]),
force: true,
timestampGenerator: expect.any(Function),
})
);
expect(mockCrudClient.upsertEntitiesBulk).toHaveBeenNthCalledWith(
2,
expect.objectContaining({
objects: [{ type: 'host', doc: expect.objectContaining({ 'entity.id': 'host:h3' }) }],
force: true,
timestampGenerator: expect.any(Function),
})
);
});

it('should return error when second page ESQL call is aborted', async () => {
const docsLimit = 2;
const firstPage: ESQLSearchResponse = {
columns: [
{ name: '@timestamp', type: 'date' },
{ name: ENGINE_METADATA_PAGINATION_FIRST_SEEN_LOG_FIELD, type: 'date' },
{ name: ENGINE_METADATA_UNTYPED_ID_FIELD, type: 'keyword' },
{ name: HASHED_ID_FIELD, type: 'keyword' },
{ name: 'entity.id', type: 'keyword' },
],
values: [
['2024-06-15T10:00:00.000Z', '2024-06-15T10:00:00.000Z', 'id1', 'hash1', 'host:h1'],
['2024-06-15T10:00:00.000Z', '2024-06-15T10:00:00.000Z', 'id2', 'hash2', 'host:h2'],
],
};

const abortError = new DOMException('aborted', 'AbortError');
mockExecuteEsqlQuery.mockResolvedValueOnce(firstPage).mockRejectedValueOnce(abortError);

const result = await client.extractToUpdates({
type: 'host',
remoteIndexPatterns: ['remote:logs-*'],
fromDateISO: '2024-01-01T00:00:00.000Z',
toDateISO: '2024-06-15T23:59:59.999Z',
docsLimit,
entityDefinition: getEntityDefinition('host', 'default'),
abortController: new AbortController(),
});

await expect(result.error).toBeDefined();

expect(mockExecuteEsqlQuery).toHaveBeenCalledTimes(2);
expect(mockCrudClient.upsertEntitiesBulk).toHaveBeenCalledTimes(1);
});

it('should return zero count and pages when ESQL returns no rows', async () => {
mockExecuteEsqlQuery.mockResolvedValue({
columns: [{ name: HASHED_ID_FIELD, type: 'keyword' }],
values: [],
});

const result = await client.extractToUpdates({
type: 'host',
remoteIndexPatterns: ['remote:logs-*'],
fromDateISO: '2024-01-01T00:00:00.000Z',
toDateISO: '2024-01-01T23:59:59.999Z',
docsLimit: 10000,
entityDefinition: getEntityDefinition('host', 'default'),
});

expect(result).toEqual({ count: 0, pages: 0 });
expect(mockCrudClient.upsertEntitiesBulk).not.toHaveBeenCalled();
});
});
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

import type { Logger } from '@kbn/logging';
import type { ElasticsearchClient } from '@kbn/core/server';
import moment from 'moment';
import type {
EntityType,
ManagedEntityDefinition,
} from '../../common/domain/definitions/entity_schema';
import type { PaginationParams } from './logs_extraction/logs_extraction_query_builder';
import {
buildCcsLogsExtractionEsqlQuery,
ENGINE_METADATA_PAGINATION_FIRST_SEEN_LOG_FIELD,
extractPaginationParams,
} from './logs_extraction/logs_extraction_query_builder';
import { executeEsqlQuery, esqlResponseToBulkObjects } from '../infra/elasticsearch/esql';
import type { CRUDClient } from './crud_client';

export interface CcsExtractToUpdatesParams {
type: EntityType;
remoteIndexPatterns: string[];
fromDateISO: string;
toDateISO: string;
docsLimit: number;
entityDefinition: ManagedEntityDefinition;
abortController?: AbortController;
}

export interface CcsExtractToUpdatesResult {
count: number;
pages: number;
error?: Error;
}

export class CcsLogsExtractionClient {
constructor(
private readonly logger: Logger,
private readonly esClient: ElasticsearchClient,
private readonly crudClient: CRUDClient
) {}

public async extractToUpdates(
params: CcsExtractToUpdatesParams
): Promise<CcsExtractToUpdatesResult> {
try {
return await this.doExtractToUpdates(params);
} catch (error) {
const wrappedError = new Error(
`Failed to extract to updates from CCS indices: ${error.message}`
);
this.logger.error(wrappedError);
return { count: 0, pages: 0, error: wrappedError };
}
}

private async doExtractToUpdates({
type,
remoteIndexPatterns,
fromDateISO,
toDateISO,
docsLimit,
entityDefinition,
abortController,
}: CcsExtractToUpdatesParams): Promise<CcsExtractToUpdatesResult> {
let totalCount = 0;
let pages = 0;
let pagination: PaginationParams | undefined;

const onAbort = () => this.logger.debug('Aborting CCS logs extraction');
abortController?.signal.addEventListener('abort', onAbort);

do {
const query = buildCcsLogsExtractionEsqlQuery({
indexPatterns: remoteIndexPatterns,
entityDefinition,
fromDateISO,
toDateISO,
docsLimit,
pagination,
});

this.logger.info(
`Running CCS extraction from ${fromDateISO} to ${toDateISO} ${
pagination
? `with pagination: ${pagination.timestampCursor} | ${pagination.idCursor}`
: ''
}`
);

const esqlResponse = await executeEsqlQuery({
esClient: this.esClient,
query,
abortController,
});

totalCount += esqlResponse.values.length;
pagination = extractPaginationParams(esqlResponse, docsLimit);

if (esqlResponse.values.length > 0) {
Comment thread
romulets marked this conversation as resolved.
pages++;
this.logger.debug(
`CCS extraction ingesting ${esqlResponse.values.length} partial entities`
);
const bulkObjects = esqlResponseToBulkObjects(esqlResponse, type, [
ENGINE_METADATA_PAGINATION_FIRST_SEEN_LOG_FIELD,
]);

const momentToDate = moment.utc(toDateISO);
let inc = 0;
await this.crudClient.upsertEntitiesBulk({
objects: bulkObjects,
force: true,
// It's good to generate a sparse timestamp to avoid too many ts collisions
// in the main extraction
timestampGenerator: () => {
inc++;
Comment thread
romulets marked this conversation as resolved.
return momentToDate.add(inc, 'ms').toISOString();
},
});
}
} while (pagination);

abortController?.signal.removeEventListener('abort', onAbort);

return { count: totalCount, pages };
}
}
Loading
Loading