diff --git a/x-pack/solutions/security/plugins/entity_store/common/domain/definitions/generic.ts b/x-pack/solutions/security/plugins/entity_store/common/domain/definitions/generic.ts index fff3f5371767f..613168a004ad0 100644 --- a/x-pack/solutions/security/plugins/entity_store/common/domain/definitions/generic.ts +++ b/x-pack/solutions/security/plugins/entity_store/common/domain/definitions/generic.ts @@ -18,9 +18,8 @@ export const genericEntityDefinition: EntityDefinitionWithoutId = { }, indexPatterns: [], fields: [ - // entity.id doesn't need to be mapped because it's the main entity field - // and it's already mapped by default - + // Map entity.id explicitly so the field is also extracted during CCS logs extraction + newestValue({ source: 'entity.id' }), newestValue({ source: 'entity.name' }), ...getEntityFieldsDescriptions(), diff --git a/x-pack/solutions/security/plugins/entity_store/server/domain/asset_manager.ts b/x-pack/solutions/security/plugins/entity_store/server/domain/asset_manager.ts index 1b8545e3db712..bb2b53a0c8c3c 100644 --- a/x-pack/solutions/security/plugins/entity_store/server/domain/asset_manager.ts +++ b/x-pack/solutions/security/plugins/entity_store/server/domain/asset_manager.ts @@ -237,7 +237,7 @@ export class AssetManager { ): Promise { const checkPrivileges = this.security.authz.checkPrivilegesDynamicallyWithRequest(request); - const sourceIndexPatterns = await this.logsExtractionClient.getIndexPatterns( + const sourceIndexPatterns = await this.logsExtractionClient.getLocalIndexPatterns( additionalIndexPatterns ); diff --git a/x-pack/solutions/security/plugins/entity_store/server/domain/ccs_logs_extraction_client.test.ts b/x-pack/solutions/security/plugins/entity_store/server/domain/ccs_logs_extraction_client.test.ts new file mode 100644 index 0000000000000..4179f62c8b0fd --- /dev/null +++ b/x-pack/solutions/security/plugins/entity_store/server/domain/ccs_logs_extraction_client.test.ts @@ -0,0 +1,236 @@ +/* + * Copyright Elasticsearch B.V. 
and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import type { ESQLSearchResponse } from '@kbn/es-types'; +import { loggerMock } from '@kbn/logging-mocks'; +import type { ElasticsearchClient } from '@kbn/core/server'; +import { CcsLogsExtractionClient } from './ccs_logs_extraction_client'; +import type { CRUDClient } from './crud_client'; +import { getEntityDefinition } from '../../common/domain/definitions/registry'; +import { executeEsqlQuery } from '../infra/elasticsearch/esql'; +import { + ENGINE_METADATA_PAGINATION_FIRST_SEEN_LOG_FIELD, + HASHED_ID_FIELD, +} from './logs_extraction/logs_extraction_query_builder'; + +const ENGINE_METADATA_UNTYPED_ID_FIELD = 'entity.EngineMetadata.UntypedId'; + +jest.mock('../infra/elasticsearch/esql', () => { + const actual = jest.requireActual( + '../infra/elasticsearch/esql' + ); + return { + ...actual, + executeEsqlQuery: jest.fn(), + }; +}); + +const mockExecuteEsqlQuery = executeEsqlQuery as jest.MockedFunction; + +function createMockCrudClient(): jest.Mocked> { + return { + upsertEntitiesBulk: jest.fn().mockResolvedValue([]), + }; +} + +describe('CcsLogsExtractionClient', () => { + let client: CcsLogsExtractionClient; + let mockCrudClient: ReturnType; + const mockLogger = loggerMock.create(); + const mockEsClient = {} as jest.Mocked; + + beforeEach(() => { + jest.clearAllMocks(); + mockCrudClient = createMockCrudClient(); + client = new CcsLogsExtractionClient( + mockLogger, + mockEsClient, + mockCrudClient as unknown as CRUDClient + ); + }); + + it('should extract to updates via CRUD client and return count and pages', async () => { + const mockEsqlResponse: ESQLSearchResponse = { + columns: [ + { name: '@timestamp', type: 'date' }, + { name: HASHED_ID_FIELD, type: 'keyword' }, + { name: 'entity.id', type: 'keyword' }, + ], + values: [ + 
['2024-06-15T12:00:00.000Z', 'hash1', 'host:host-1'], + ['2024-06-15T12:00:00.000Z', 'hash2', 'host:host-2'], + ], + }; + + mockExecuteEsqlQuery.mockResolvedValue(mockEsqlResponse); + + const result = await client.extractToUpdates({ + type: 'host', + remoteIndexPatterns: ['remote_cluster:logs-*'], + fromDateISO: '2024-01-01T00:00:00.000Z', + toDateISO: '2024-06-15T23:59:59.999Z', + docsLimit: 10000, + entityDefinition: getEntityDefinition('host', 'default'), + }); + + expect(result).toEqual({ count: 2, pages: 1 }); + expect(mockExecuteEsqlQuery).toHaveBeenCalledTimes(1); + expect(mockCrudClient.upsertEntitiesBulk).toHaveBeenCalledTimes(1); + expect(mockCrudClient.upsertEntitiesBulk).toHaveBeenCalledWith( + expect.objectContaining({ + objects: expect.arrayContaining([ + { type: 'host', doc: expect.objectContaining({ 'entity.id': 'host:host-1' }) }, + { type: 'host', doc: expect.objectContaining({ 'entity.id': 'host:host-2' }) }, + ]), + force: true, + timestampGenerator: expect.any(Function), + }) + ); + }); + + it('should call upsertEntitiesBulk with flat entity doc (dot-notation keys)', async () => { + mockExecuteEsqlQuery.mockResolvedValue({ + columns: [ + { name: 'entity.id', type: 'keyword' }, + { name: 'entity.name', type: 'keyword' }, + ], + values: [['user:u1', 'alice']], + }); + + await client.extractToUpdates({ + type: 'user', + remoteIndexPatterns: ['other:filebeat-*'], + fromDateISO: '2024-01-01T00:00:00.000Z', + toDateISO: '2024-01-01T23:59:59.999Z', + docsLimit: 5000, + entityDefinition: getEntityDefinition('user', 'default'), + }); + + const call = mockCrudClient.upsertEntitiesBulk.mock.calls[0]; + expect(call[0].objects).toEqual([ + { type: 'user', doc: { 'entity.id': 'user:u1', 'entity.name': 'alice' } }, + ]); + expect(call[0]).toMatchObject({ force: true }); + const timestampGenerator = call[0].timestampGenerator; + expect(timestampGenerator).toBeDefined(); + const ts = timestampGenerator!(); + const tsDate = new Date(ts); + 
expect(tsDate.getTime()).toBeGreaterThanOrEqual(new Date('2024-01-01T23:59:59.999Z').getTime()); + expect(tsDate.getTime()).toBeLessThanOrEqual( + new Date('2024-01-01T23:59:59.999Z').getTime() + 10001 + ); + }); + + it('should paginate when ESQL returns full page and run upsert per page', async () => { + const docsLimit = 2; + const firstPage: ESQLSearchResponse = { + columns: [ + { name: '@timestamp', type: 'date' }, + { name: ENGINE_METADATA_PAGINATION_FIRST_SEEN_LOG_FIELD, type: 'date' }, + { name: ENGINE_METADATA_UNTYPED_ID_FIELD, type: 'keyword' }, + { name: HASHED_ID_FIELD, type: 'keyword' }, + { name: 'entity.id', type: 'keyword' }, + ], + values: [ + ['2024-06-15T10:00:00.000Z', '2024-06-15T10:00:00.000Z', 'id1', 'hash1', 'host:h1'], + ['2024-06-15T10:00:00.000Z', '2024-06-15T10:00:00.000Z', 'id2', 'hash2', 'host:h2'], + ], + }; + const secondPage: ESQLSearchResponse = { + columns: firstPage.columns, + values: [['2024-06-15T11:00:00.000Z', '2024-06-15T11:00:00.000Z', 'id3', 'hash3', 'host:h3']], + }; + + mockExecuteEsqlQuery.mockResolvedValueOnce(firstPage).mockResolvedValueOnce(secondPage); + + const result = await client.extractToUpdates({ + type: 'host', + remoteIndexPatterns: ['remote:logs-*'], + fromDateISO: '2024-01-01T00:00:00.000Z', + toDateISO: '2024-06-15T23:59:59.999Z', + docsLimit, + entityDefinition: getEntityDefinition('host', 'default'), + }); + + expect(result).toEqual({ count: 3, pages: 2 }); + expect(mockExecuteEsqlQuery).toHaveBeenCalledTimes(2); + expect(mockCrudClient.upsertEntitiesBulk).toHaveBeenCalledTimes(2); + expect(mockCrudClient.upsertEntitiesBulk).toHaveBeenNthCalledWith( + 1, + expect.objectContaining({ + objects: expect.arrayContaining([ + { type: 'host', doc: expect.objectContaining({ 'entity.id': 'host:h1' }) }, + { type: 'host', doc: expect.objectContaining({ 'entity.id': 'host:h2' }) }, + ]), + force: true, + timestampGenerator: expect.any(Function), + }) + ); + 
expect(mockCrudClient.upsertEntitiesBulk).toHaveBeenNthCalledWith( + 2, + expect.objectContaining({ + objects: [{ type: 'host', doc: expect.objectContaining({ 'entity.id': 'host:h3' }) }], + force: true, + timestampGenerator: expect.any(Function), + }) + ); + }); + + it('should return error when second page ESQL call is aborted', async () => { + const docsLimit = 2; + const firstPage: ESQLSearchResponse = { + columns: [ + { name: '@timestamp', type: 'date' }, + { name: ENGINE_METADATA_PAGINATION_FIRST_SEEN_LOG_FIELD, type: 'date' }, + { name: ENGINE_METADATA_UNTYPED_ID_FIELD, type: 'keyword' }, + { name: HASHED_ID_FIELD, type: 'keyword' }, + { name: 'entity.id', type: 'keyword' }, + ], + values: [ + ['2024-06-15T10:00:00.000Z', '2024-06-15T10:00:00.000Z', 'id1', 'hash1', 'host:h1'], + ['2024-06-15T10:00:00.000Z', '2024-06-15T10:00:00.000Z', 'id2', 'hash2', 'host:h2'], + ], + }; + + const abortError = new DOMException('aborted', 'AbortError'); + mockExecuteEsqlQuery.mockResolvedValueOnce(firstPage).mockRejectedValueOnce(abortError); + + const result = await client.extractToUpdates({ + type: 'host', + remoteIndexPatterns: ['remote:logs-*'], + fromDateISO: '2024-01-01T00:00:00.000Z', + toDateISO: '2024-06-15T23:59:59.999Z', + docsLimit, + entityDefinition: getEntityDefinition('host', 'default'), + abortController: new AbortController(), + }); + + await expect(result.error).toBeDefined(); + + expect(mockExecuteEsqlQuery).toHaveBeenCalledTimes(2); + expect(mockCrudClient.upsertEntitiesBulk).toHaveBeenCalledTimes(1); + }); + + it('should return zero count and pages when ESQL returns no rows', async () => { + mockExecuteEsqlQuery.mockResolvedValue({ + columns: [{ name: HASHED_ID_FIELD, type: 'keyword' }], + values: [], + }); + + const result = await client.extractToUpdates({ + type: 'host', + remoteIndexPatterns: ['remote:logs-*'], + fromDateISO: '2024-01-01T00:00:00.000Z', + toDateISO: '2024-01-01T23:59:59.999Z', + docsLimit: 10000, + entityDefinition: 
getEntityDefinition('host', 'default'), + }); + + expect(result).toEqual({ count: 0, pages: 0 }); + expect(mockCrudClient.upsertEntitiesBulk).not.toHaveBeenCalled(); + }); +}); diff --git a/x-pack/solutions/security/plugins/entity_store/server/domain/ccs_logs_extraction_client.ts b/x-pack/solutions/security/plugins/entity_store/server/domain/ccs_logs_extraction_client.ts new file mode 100644 index 0000000000000..f3005c835588e --- /dev/null +++ b/x-pack/solutions/security/plugins/entity_store/server/domain/ccs_logs_extraction_client.ts @@ -0,0 +1,132 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import type { Logger } from '@kbn/logging'; +import type { ElasticsearchClient } from '@kbn/core/server'; +import moment from 'moment'; +import type { + EntityType, + ManagedEntityDefinition, +} from '../../common/domain/definitions/entity_schema'; +import type { PaginationParams } from './logs_extraction/logs_extraction_query_builder'; +import { + buildCcsLogsExtractionEsqlQuery, + ENGINE_METADATA_PAGINATION_FIRST_SEEN_LOG_FIELD, + extractPaginationParams, +} from './logs_extraction/logs_extraction_query_builder'; +import { executeEsqlQuery, esqlResponseToBulkObjects } from '../infra/elasticsearch/esql'; +import type { CRUDClient } from './crud_client'; + +export interface CcsExtractToUpdatesParams { + type: EntityType; + remoteIndexPatterns: string[]; + fromDateISO: string; + toDateISO: string; + docsLimit: number; + entityDefinition: ManagedEntityDefinition; + abortController?: AbortController; +} + +export interface CcsExtractToUpdatesResult { + count: number; + pages: number; + error?: Error; +} + +export class CcsLogsExtractionClient { + constructor( + private readonly logger: Logger, + private readonly esClient: ElasticsearchClient, + private readonly 
crudClient: CRUDClient + ) {} + + public async extractToUpdates( + params: CcsExtractToUpdatesParams + ): Promise { + try { + return await this.doExtractToUpdates(params); + } catch (error) { + const wrappedError = new Error( + `Failed to extract to updates from CCS indices: ${error.message}` + ); + this.logger.error(wrappedError); + return { count: 0, pages: 0, error: wrappedError }; + } + } + + private async doExtractToUpdates({ + type, + remoteIndexPatterns, + fromDateISO, + toDateISO, + docsLimit, + entityDefinition, + abortController, + }: CcsExtractToUpdatesParams): Promise { + let totalCount = 0; + let pages = 0; + let pagination: PaginationParams | undefined; + + const onAbort = () => this.logger.debug('Aborting CCS logs extraction'); + abortController?.signal.addEventListener('abort', onAbort); + + do { + const query = buildCcsLogsExtractionEsqlQuery({ + indexPatterns: remoteIndexPatterns, + entityDefinition, + fromDateISO, + toDateISO, + docsLimit, + pagination, + }); + + this.logger.info( + `Running CCS extraction from ${fromDateISO} to ${toDateISO} ${ + pagination + ? 
`with pagination: ${pagination.timestampCursor} | ${pagination.idCursor}` + : '' + }` + ); + + const esqlResponse = await executeEsqlQuery({ + esClient: this.esClient, + query, + abortController, + }); + + totalCount += esqlResponse.values.length; + pagination = extractPaginationParams(esqlResponse, docsLimit); + + if (esqlResponse.values.length > 0) { + pages++; + this.logger.debug( + `CCS extraction ingesting ${esqlResponse.values.length} partial entities` + ); + const bulkObjects = esqlResponseToBulkObjects(esqlResponse, type, [ + ENGINE_METADATA_PAGINATION_FIRST_SEEN_LOG_FIELD, + ]); + + const momentToDate = moment.utc(toDateISO); + let inc = 0; + await this.crudClient.upsertEntitiesBulk({ + objects: bulkObjects, + force: true, + // It's good to generate a sparse timestamp to avoid too many ts collisions + // in the main extraction + timestampGenerator: () => { + inc++; + return momentToDate.add(inc, 'ms').toISOString(); + }, + }); + } + } while (pagination); + + abortController?.signal.removeEventListener('abort', onAbort); + + return { count: totalCount, pages }; + } +} diff --git a/x-pack/solutions/security/plugins/entity_store/server/domain/crud_client/index.ts b/x-pack/solutions/security/plugins/entity_store/server/domain/crud_client/index.ts index 685abde22cae6..ec7b6373702b7 100644 --- a/x-pack/solutions/security/plugins/entity_store/server/domain/crud_client/index.ts +++ b/x-pack/solutions/security/plugins/entity_store/server/domain/crud_client/index.ts @@ -40,6 +40,12 @@ interface BulkObjectResponse { reason: string; } +interface UpsertEntitiesBulkParams { + objects: BulkObject[]; + force?: boolean; + timestampGenerator?: () => string; +} + export class CRUDClient { private readonly logger: Logger; private readonly esClient: ElasticsearchClient; @@ -94,15 +100,22 @@ export class CRUDClient { // UPDATES index for log extraction task to pick up. This will result in // appropriate Entities being created or updated on next log extraction run. 
// This is considered a bulk asynchronous upsert. - public async upsertEntitiesBulk( - objects: BulkObject[], - force: boolean - ): Promise { + public async upsertEntitiesBulk({ + objects, + force = false, + timestampGenerator, + }: UpsertEntitiesBulkParams): Promise { const operations: (BulkOperationContainer | BulkUpdateAction)[] = []; this.logger.debug(`Preparing ${objects.length} entities for bulk upsert`); for (const { type: entityType, doc } of objects) { - const readyDoc = validateAndTransformDocForUpsert(entityType, this.namespace, doc, force); + const readyDoc = validateAndTransformDocForUpsert( + entityType, + this.namespace, + doc, + force, + timestampGenerator + ); operations.push({ create: {} }, readyDoc); } diff --git a/x-pack/solutions/security/plugins/entity_store/server/domain/crud_client/utils.test.ts b/x-pack/solutions/security/plugins/entity_store/server/domain/crud_client/utils.test.ts index 9ae59c37fb53b..db4da2f3007fb 100644 --- a/x-pack/solutions/security/plugins/entity_store/server/domain/crud_client/utils.test.ts +++ b/x-pack/solutions/security/plugins/entity_store/server/domain/crud_client/utils.test.ts @@ -173,4 +173,24 @@ describe('crud_client utils', () => { expect(() => validateAndTransformDocForUpsert('generic', 'default', doc, true)).not.toThrow(); }); + + it('validateAndTransformDocForUpsert: accepts flat doc (dot-notation keys) when force=true', () => { + mockGetEntityDefinition.mockReturnValue( + createDefinition('user', [createField('entity.id'), createField('entity.name')]) + ); + + const flatDoc = { + 'entity.id': 'user:u1', + 'entity.name': 'alice', + } as unknown as Entity; + + const result = validateAndTransformDocForUpsert('user', 'default', flatDoc, true); + + expect(result).toHaveProperty('@timestamp'); + expect(typeof result['@timestamp']).toBe('string'); + expect(result['entity.id']).toBe('user:u1'); + expect(result['entity.name']).toBe('alice'); + expect(result).toHaveProperty('user'); + expect((result.user as 
Record).entity).toBeUndefined(); + }); }); diff --git a/x-pack/solutions/security/plugins/entity_store/server/domain/crud_client/utils.ts b/x-pack/solutions/security/plugins/entity_store/server/domain/crud_client/utils.ts index b8eb5906200f7..14ddfcfe06883 100644 --- a/x-pack/solutions/security/plugins/entity_store/server/domain/crud_client/utils.ts +++ b/x-pack/solutions/security/plugins/entity_store/server/domain/crud_client/utils.ts @@ -29,7 +29,8 @@ export function validateAndTransformDocForUpsert( entityType: EntityType, namespace: string, doc: Entity, - force: boolean + force: boolean, + timestampGenerator?: () => string ): Record { const definition = getEntityDefinition(entityType, namespace); if (!force) { @@ -37,7 +38,7 @@ export function validateAndTransformDocForUpsert( const fieldDescriptions = getFieldDescriptions(flat, definition); assertOnlyNonForcedAttributesInReq(fieldDescriptions); } - return transformDocForUpsert(entityType, doc); + return transformDocForUpsert(entityType, doc, timestampGenerator); } function getFieldDescriptions( @@ -97,10 +98,14 @@ function assertOnlyNonForcedAttributesInReq(fields: Record) } } -function transformDocForUpsert(type: EntityType, data: Partial): Record { +function transformDocForUpsert( + type: EntityType, + data: Partial, + timestampGenerator?: () => string +): Record { const doc: Record = { - '@timestamp': new Date().toISOString(), ...data, + '@timestamp': timestampGenerator ? 
timestampGenerator() : new Date().toISOString(), }; if (type === GENERIC_TYPE) { diff --git a/x-pack/solutions/security/plugins/entity_store/server/domain/logs_extraction/__snapshots__/logs_extraction_query_builder.test.ts.snap b/x-pack/solutions/security/plugins/entity_store/server/domain/logs_extraction/__snapshots__/logs_extraction_query_builder.test.ts.snap index cd0497e9097a5..74ba3ced1cc2c 100644 --- a/x-pack/solutions/security/plugins/entity_store/server/domain/logs_extraction/__snapshots__/logs_extraction_query_builder.test.ts.snap +++ b/x-pack/solutions/security/plugins/entity_store/server/domain/logs_extraction/__snapshots__/logs_extraction_query_builder.test.ts.snap @@ -1,5 +1,484 @@ // Jest Snapshot v1, https://goo.gl/fbAQLP +exports[`buildCcsLogsExtractionEsqlQuery generates expected query for host entity type 1`] = ` +"SET unmapped_fields=\\"nullify\\"; + FROM remote:metrics-* + METADATA _index + | WHERE ((host.entity.id IS NOT NULL AND host.entity.id != \\"\\") OR (host.id IS NOT NULL AND host.id != \\"\\") OR (host.name IS NOT NULL AND host.name != \\"\\") OR (host.hostname IS NOT NULL AND host.hostname != \\"\\")) + AND @timestamp > TO_DATETIME(\\"2022-01-01T00:00:00.000Z\\") + AND @timestamp <= TO_DATETIME(\\"2022-01-01T23:59:59.999Z\\")| EVAL recent.entity.EngineMetadata.UntypedId = CONCAT(\\"host:\\", CASE((host.entity.id IS NOT NULL AND host.entity.id != \\"\\"), host.entity.id, +(host.id IS NOT NULL AND host.id != \\"\\"), host.id, +(host.name IS NOT NULL AND host.name != \\"\\" AND host.domain IS NOT NULL AND host.domain != \\"\\"), CONCAT(host.name, \\".\\", host.domain), +(host.hostname IS NOT NULL AND host.hostname != \\"\\" AND host.domain IS NOT NULL AND host.domain != \\"\\"), CONCAT(host.hostname, \\".\\", host.domain), +(host.name IS NOT NULL AND host.name != \\"\\"), host.name, +(host.hostname IS NOT NULL AND host.hostname != \\"\\"), host.hostname, NULL)) + | STATS + @timestamp = MAX(@timestamp), + 
recent.entity.EngineMetadata.FirstSeenLogInPage = MIN(@timestamp), + entity.name = LAST(TO_STRING(host.name), @timestamp) WHERE host.name IS NOT NULL, + host.entity.id = FIRST(TO_STRING(host.entity.id), @timestamp) WHERE host.entity.id IS NOT NULL, + host.name = TOP(MV_DEDUPE(TO_STRING(host.name)), 10) WHERE host.name IS NOT NULL, + host.domain = TOP(MV_DEDUPE(TO_STRING(host.domain)), 10) WHERE host.domain IS NOT NULL, + host.hostname = TOP(MV_DEDUPE(TO_STRING(host.hostname)), 10) WHERE host.hostname IS NOT NULL, + host.id = TOP(MV_DEDUPE(TO_STRING(host.id)), 10) WHERE host.id IS NOT NULL, + host.os.name = TOP(MV_DEDUPE(TO_STRING(host.os.name)), 10) WHERE host.os.name IS NOT NULL, + host.os.type = TOP(MV_DEDUPE(TO_STRING(host.os.type)), 10) WHERE host.os.type IS NOT NULL, + host.ip = TOP(MV_DEDUPE(TO_IP(host.ip)), 10) WHERE host.ip IS NOT NULL, + host.mac = TOP(MV_DEDUPE(TO_STRING(host.mac)), 10) WHERE host.mac IS NOT NULL, + host.type = TOP(MV_DEDUPE(TO_STRING(host.type)), 10) WHERE host.type IS NOT NULL, + host.architecture = TOP(MV_DEDUPE(TO_STRING(host.architecture)), 10) WHERE host.architecture IS NOT NULL, + host.boot.id = LAST(TO_STRING(host.boot.id), @timestamp) WHERE host.boot.id IS NOT NULL, + host.cpu.usage = LAST(host.cpu.usage, @timestamp) WHERE host.cpu.usage IS NOT NULL, + host.disk.read.bytes = LAST(TO_LONG(host.disk.read.bytes), @timestamp) WHERE host.disk.read.bytes IS NOT NULL, + host.disk.write.bytes = LAST(TO_LONG(host.disk.write.bytes), @timestamp) WHERE host.disk.write.bytes IS NOT NULL, + host.network.egress.bytes = LAST(TO_LONG(host.network.egress.bytes), @timestamp) WHERE host.network.egress.bytes IS NOT NULL, + host.network.egress.packets = LAST(TO_LONG(host.network.egress.packets), @timestamp) WHERE host.network.egress.packets IS NOT NULL, + host.network.ingress.bytes = LAST(TO_LONG(host.network.ingress.bytes), @timestamp) WHERE host.network.ingress.bytes IS NOT NULL, + host.network.ingress.packets = 
LAST(TO_LONG(host.network.ingress.packets), @timestamp) WHERE host.network.ingress.packets IS NOT NULL, + host.uptime = LAST(TO_LONG(host.uptime), @timestamp) WHERE host.uptime IS NOT NULL, + host.pid_ns_ino = LAST(TO_STRING(host.pid_ns_ino), @timestamp) WHERE host.pid_ns_ino IS NOT NULL, + host.os.family = LAST(TO_STRING(host.os.family), @timestamp) WHERE host.os.family IS NOT NULL, + host.os.full = LAST(TO_STRING(host.os.full), @timestamp) WHERE host.os.full IS NOT NULL, + host.os.kernel = LAST(TO_STRING(host.os.kernel), @timestamp) WHERE host.os.kernel IS NOT NULL, + host.os.platform = LAST(TO_STRING(host.os.platform), @timestamp) WHERE host.os.platform IS NOT NULL, + host.os.version = LAST(TO_STRING(host.os.version), @timestamp) WHERE host.os.version IS NOT NULL, + host.geo.city_name = LAST(TO_STRING(host.geo.city_name), @timestamp) WHERE host.geo.city_name IS NOT NULL, + host.geo.continent_code = LAST(TO_STRING(host.geo.continent_code), @timestamp) WHERE host.geo.continent_code IS NOT NULL, + host.geo.continent_name = LAST(TO_STRING(host.geo.continent_name), @timestamp) WHERE host.geo.continent_name IS NOT NULL, + host.geo.country_iso_code = LAST(TO_STRING(host.geo.country_iso_code), @timestamp) WHERE host.geo.country_iso_code IS NOT NULL, + host.geo.country_name = LAST(TO_STRING(host.geo.country_name), @timestamp) WHERE host.geo.country_name IS NOT NULL, + host.geo.name = TOP(MV_DEDUPE(TO_STRING(host.geo.name)), 10) WHERE host.geo.name IS NOT NULL, + host.geo.postal_code = TOP(MV_DEDUPE(TO_STRING(host.geo.postal_code)), 10) WHERE host.geo.postal_code IS NOT NULL, + host.geo.region_iso_code = TOP(MV_DEDUPE(TO_STRING(host.geo.region_iso_code)), 10) WHERE host.geo.region_iso_code IS NOT NULL, + host.geo.region_name = TOP(MV_DEDUPE(TO_STRING(host.geo.region_name)), 10) WHERE host.geo.region_name IS NOT NULL, + host.geo.timezone = TOP(MV_DEDUPE(TO_STRING(host.geo.timezone)), 10) WHERE host.geo.timezone IS NOT NULL, + entity.source = LAST(TO_STRING(_index), 
@timestamp) WHERE _index IS NOT NULL, + asset.id = LAST(TO_STRING(asset.id), @timestamp) WHERE asset.id IS NOT NULL, + asset.name = LAST(TO_STRING(asset.name), @timestamp) WHERE asset.name IS NOT NULL, + asset.owner = LAST(TO_STRING(asset.owner), @timestamp) WHERE asset.owner IS NOT NULL, + asset.serial_number = LAST(TO_STRING(asset.serial_number), @timestamp) WHERE asset.serial_number IS NOT NULL, + asset.model = LAST(TO_STRING(asset.model), @timestamp) WHERE asset.model IS NOT NULL, + asset.vendor = LAST(TO_STRING(asset.vendor), @timestamp) WHERE asset.vendor IS NOT NULL, + asset.environment = LAST(TO_STRING(asset.environment), @timestamp) WHERE asset.environment IS NOT NULL, + asset.criticality = LAST(TO_STRING(asset.criticality), @timestamp) WHERE asset.criticality IS NOT NULL, + asset.business_unit = LAST(TO_STRING(asset.business_unit), @timestamp) WHERE asset.business_unit IS NOT NULL, + entity.risk.calculated_level = LAST(TO_STRING(host.risk.calculated_level), @timestamp) WHERE host.risk.calculated_level IS NOT NULL, + entity.risk.calculated_score = LAST(TO_DOUBLE(host.risk.calculated_score), @timestamp) WHERE host.risk.calculated_score IS NOT NULL, + entity.risk.calculated_score_norm = LAST(TO_DOUBLE(host.risk.calculated_score_norm), @timestamp) WHERE host.risk.calculated_score_norm IS NOT NULL, + entity.source = LAST(TO_STRING(host.entity.source), @timestamp) WHERE host.entity.source IS NOT NULL, + entity.type = LAST(TO_STRING(host.entity.type), @timestamp) WHERE host.entity.type IS NOT NULL, + entity.sub_type = LAST(TO_STRING(host.entity.sub_type), @timestamp) WHERE host.entity.sub_type IS NOT NULL, + entity.url = LAST(TO_STRING(host.entity.url), @timestamp) WHERE host.entity.url IS NOT NULL, + entity.attributes.watchlists = TOP(MV_DEDUPE(TO_STRING(host.entity.attributes.watchlists)), 10) WHERE host.entity.attributes.watchlists IS NOT NULL, + entity.attributes.asset = LAST(TO_BOOLEAN(host.entity.attributes.asset), @timestamp) WHERE 
host.entity.attributes.asset IS NOT NULL, + entity.attributes.managed = LAST(TO_BOOLEAN(host.entity.attributes.managed), @timestamp) WHERE host.entity.attributes.managed IS NOT NULL, + entity.attributes.mfa_enabled = LAST(TO_BOOLEAN(host.entity.attributes.mfa_enabled), @timestamp) WHERE host.entity.attributes.mfa_enabled IS NOT NULL, + entity.lifecycle.first_seen = LAST(TO_DATETIME(host.entity.lifecycle.first_seen), @timestamp) WHERE host.entity.lifecycle.first_seen IS NOT NULL, + entity.lifecycle.last_activity = LAST(TO_DATETIME(host.entity.lifecycle.last_activity), @timestamp) WHERE host.entity.lifecycle.last_activity IS NOT NULL, + entity.behaviors.rule_names = TOP(MV_DEDUPE(TO_STRING(host.entity.behaviors.rule_names)), 100) WHERE host.entity.behaviors.rule_names IS NOT NULL, + entity.behaviors.anomaly_job_ids = TOP(MV_DEDUPE(TO_STRING(host.entity.behaviors.anomaly_job_ids)), 100) WHERE host.entity.behaviors.anomaly_job_ids IS NOT NULL, + entity.relationships.communicates_with = TOP(MV_DEDUPE(TO_STRING(host.entity.relationships.communicates_with)), 10) WHERE host.entity.relationships.communicates_with IS NOT NULL, + entity.relationships.depends_on = TOP(MV_DEDUPE(TO_STRING(host.entity.relationships.depends_on)), 10) WHERE host.entity.relationships.depends_on IS NOT NULL, + entity.relationships.owns_inferred = TOP(MV_DEDUPE(TO_STRING(host.entity.relationships.owns_inferred)), 10) WHERE host.entity.relationships.owns_inferred IS NOT NULL, + entity.relationships.accesses_infrequently = TOP(MV_DEDUPE(TO_STRING(host.entity.relationships.accesses_infrequently)), 10) WHERE host.entity.relationships.accesses_infrequently IS NOT NULL, + entity.relationships.accesses_frequently = TOP(MV_DEDUPE(TO_STRING(host.entity.relationships.accesses_frequently)), 10) WHERE host.entity.relationships.accesses_frequently IS NOT NULL, + entity.relationships.owns = TOP(MV_DEDUPE(TO_STRING(host.entity.relationships.owns)), 10) WHERE host.entity.relationships.owns IS NOT NULL, + 
entity.relationships.supervises = TOP(MV_DEDUPE(TO_STRING(host.entity.relationships.supervises)), 10) WHERE host.entity.relationships.supervises IS NOT NULL, + entity.relationships.resolution.resolved_to = LAST(TO_STRING(host.entity.relationships.resolution.resolved_to), @timestamp) WHERE host.entity.relationships.resolution.resolved_to IS NOT NULL, + entity.relationships.resolution.risk.calculated_level = LAST(TO_STRING(host.entity.relationships.resolution.risk.calculated_level), @timestamp) WHERE host.entity.relationships.resolution.risk.calculated_level IS NOT NULL, + entity.relationships.resolution.risk.calculated_score = LAST(TO_DOUBLE(host.entity.relationships.resolution.risk.calculated_score), @timestamp) WHERE host.entity.relationships.resolution.risk.calculated_score IS NOT NULL, + entity.relationships.resolution.risk.calculated_score_norm = LAST(TO_DOUBLE(host.entity.relationships.resolution.risk.calculated_score_norm), @timestamp) WHERE host.entity.relationships.resolution.risk.calculated_score_norm IS NOT NULL + BY recent.entity.EngineMetadata.UntypedId + | EVAL + entity.EngineMetadata.FirstSeenLogInPage = recent.entity.EngineMetadata.FirstSeenLogInPage, + entity.EngineMetadata.UntypedId = recent.entity.EngineMetadata.UntypedId + | SORT recent.entity.EngineMetadata.FirstSeenLogInPage ASC, recent.entity.EngineMetadata.UntypedId ASC + + | KEEP entity.name, + host.entity.id, + host.name, + host.domain, + host.hostname, + host.id, + host.os.name, + host.os.type, + host.ip, + host.mac, + host.type, + host.architecture, + host.boot.id, + host.cpu.usage, + host.disk.read.bytes, + host.disk.write.bytes, + host.network.egress.bytes, + host.network.egress.packets, + host.network.ingress.bytes, + host.network.ingress.packets, + host.uptime, + host.pid_ns_ino, + host.os.family, + host.os.full, + host.os.kernel, + host.os.platform, + host.os.version, + host.geo.city_name, + host.geo.continent_code, + host.geo.continent_name, + host.geo.country_iso_code, + 
host.geo.country_name, + host.geo.name, + host.geo.postal_code, + host.geo.region_iso_code, + host.geo.region_name, + host.geo.timezone, + entity.source, + asset.id, + asset.name, + asset.owner, + asset.serial_number, + asset.model, + asset.vendor, + asset.environment, + asset.criticality, + asset.business_unit, + entity.risk.calculated_level, + entity.risk.calculated_score, + entity.risk.calculated_score_norm, + entity.source, + entity.type, + entity.sub_type, + entity.url, + entity.attributes.watchlists, + entity.attributes.asset, + entity.attributes.managed, + entity.attributes.mfa_enabled, + entity.lifecycle.first_seen, + entity.lifecycle.last_activity, + entity.behaviors.rule_names, + entity.behaviors.anomaly_job_ids, + entity.relationships.communicates_with, + entity.relationships.depends_on, + entity.relationships.owns_inferred, + entity.relationships.accesses_infrequently, + entity.relationships.accesses_frequently, + entity.relationships.owns, + entity.relationships.supervises, + entity.relationships.resolution.resolved_to, + entity.relationships.resolution.risk.calculated_level, + entity.relationships.resolution.risk.calculated_score, + entity.relationships.resolution.risk.calculated_score_norm, + @timestamp, + entity.EngineMetadata.FirstSeenLogInPage, + entity.EngineMetadata.UntypedId + | LIMIT 5000" +`; + +exports[`buildCcsLogsExtractionEsqlQuery generates expected query with pagination 1`] = ` +"SET unmapped_fields=\\"nullify\\"; + FROM remote:logs-* + METADATA _index + | WHERE ((user.entity.id IS NOT NULL AND user.entity.id != \\"\\") OR (user.id IS NOT NULL AND user.id != \\"\\") OR (user.name IS NOT NULL AND user.name != \\"\\") OR (user.email IS NOT NULL AND user.email != \\"\\")) + AND @timestamp > TO_DATETIME(\\"2022-01-01T00:00:00.000Z\\") + AND @timestamp <= TO_DATETIME(\\"2022-01-01T23:59:59.999Z\\")| EVAL recent.entity.EngineMetadata.UntypedId = CONCAT(\\"user:\\", CASE((user.entity.id IS NOT NULL AND user.entity.id != \\"\\"), 
user.entity.id, +(user.name IS NOT NULL AND user.name != \\"\\" AND host.entity.id IS NOT NULL AND host.entity.id != \\"\\"), CONCAT(user.name, \\"@\\", host.entity.id), +(user.name IS NOT NULL AND user.name != \\"\\" AND host.id IS NOT NULL AND host.id != \\"\\"), CONCAT(user.name, \\"@\\", host.id), +(user.name IS NOT NULL AND user.name != \\"\\" AND host.name IS NOT NULL AND host.name != \\"\\"), CONCAT(user.name, \\"@\\", host.name), +(user.id IS NOT NULL AND user.id != \\"\\"), user.id, +(user.email IS NOT NULL AND user.email != \\"\\"), user.email, +(user.name IS NOT NULL AND user.name != \\"\\" AND user.domain IS NOT NULL AND user.domain != \\"\\"), CONCAT(user.name, \\"@\\", user.domain), +(user.name IS NOT NULL AND user.name != \\"\\"), user.name, NULL)) + | STATS + @timestamp = MAX(@timestamp), + recent.entity.EngineMetadata.FirstSeenLogInPage = MIN(@timestamp), + entity.name = LAST(TO_STRING(user.name), @timestamp) WHERE user.name IS NOT NULL, + user.entity.id = FIRST(TO_STRING(user.entity.id), @timestamp) WHERE user.entity.id IS NOT NULL, + user.domain = TOP(MV_DEDUPE(TO_STRING(user.domain)), 10) WHERE user.domain IS NOT NULL, + user.email = TOP(MV_DEDUPE(TO_STRING(user.email)), 10) WHERE user.email IS NOT NULL, + user.name = TOP(MV_DEDUPE(TO_STRING(user.name)), 10) WHERE user.name IS NOT NULL, + user.full_name = TOP(MV_DEDUPE(TO_STRING(user.full_name)), 10) WHERE user.full_name IS NOT NULL, + user.hash = TOP(MV_DEDUPE(TO_STRING(user.hash)), 10) WHERE user.hash IS NOT NULL, + user.id = TOP(MV_DEDUPE(TO_STRING(user.id)), 10) WHERE user.id IS NOT NULL, + user.roles = TOP(MV_DEDUPE(TO_STRING(user.roles)), 10) WHERE user.roles IS NOT NULL, + user.group.domain = TOP(MV_DEDUPE(TO_STRING(user.group.domain)), 10) WHERE user.group.domain IS NOT NULL, + user.group.id = TOP(MV_DEDUPE(TO_STRING(user.group.id)), 10) WHERE user.group.id IS NOT NULL, + user.group.name = TOP(MV_DEDUPE(TO_STRING(user.group.name)), 10) WHERE user.group.name IS NOT NULL, + entity.source = 
LAST(TO_STRING(_index), @timestamp) WHERE _index IS NOT NULL, + asset.id = LAST(TO_STRING(asset.id), @timestamp) WHERE asset.id IS NOT NULL, + asset.name = LAST(TO_STRING(asset.name), @timestamp) WHERE asset.name IS NOT NULL, + asset.owner = LAST(TO_STRING(asset.owner), @timestamp) WHERE asset.owner IS NOT NULL, + asset.serial_number = LAST(TO_STRING(asset.serial_number), @timestamp) WHERE asset.serial_number IS NOT NULL, + asset.model = LAST(TO_STRING(asset.model), @timestamp) WHERE asset.model IS NOT NULL, + asset.vendor = LAST(TO_STRING(asset.vendor), @timestamp) WHERE asset.vendor IS NOT NULL, + asset.environment = LAST(TO_STRING(asset.environment), @timestamp) WHERE asset.environment IS NOT NULL, + asset.criticality = LAST(TO_STRING(asset.criticality), @timestamp) WHERE asset.criticality IS NOT NULL, + asset.business_unit = LAST(TO_STRING(asset.business_unit), @timestamp) WHERE asset.business_unit IS NOT NULL, + entity.risk.calculated_level = LAST(TO_STRING(user.risk.calculated_level), @timestamp) WHERE user.risk.calculated_level IS NOT NULL, + entity.risk.calculated_score = LAST(TO_DOUBLE(user.risk.calculated_score), @timestamp) WHERE user.risk.calculated_score IS NOT NULL, + entity.risk.calculated_score_norm = LAST(TO_DOUBLE(user.risk.calculated_score_norm), @timestamp) WHERE user.risk.calculated_score_norm IS NOT NULL, + entity.source = LAST(TO_STRING(user.entity.source), @timestamp) WHERE user.entity.source IS NOT NULL, + entity.type = LAST(TO_STRING(user.entity.type), @timestamp) WHERE user.entity.type IS NOT NULL, + entity.sub_type = LAST(TO_STRING(user.entity.sub_type), @timestamp) WHERE user.entity.sub_type IS NOT NULL, + entity.url = LAST(TO_STRING(user.entity.url), @timestamp) WHERE user.entity.url IS NOT NULL, + entity.attributes.watchlists = TOP(MV_DEDUPE(TO_STRING(user.entity.attributes.watchlists)), 10) WHERE user.entity.attributes.watchlists IS NOT NULL, + entity.attributes.asset = LAST(TO_BOOLEAN(user.entity.attributes.asset), @timestamp) WHERE 
user.entity.attributes.asset IS NOT NULL, + entity.attributes.managed = LAST(TO_BOOLEAN(user.entity.attributes.managed), @timestamp) WHERE user.entity.attributes.managed IS NOT NULL, + entity.attributes.mfa_enabled = LAST(TO_BOOLEAN(user.entity.attributes.mfa_enabled), @timestamp) WHERE user.entity.attributes.mfa_enabled IS NOT NULL, + entity.lifecycle.first_seen = LAST(TO_DATETIME(user.entity.lifecycle.first_seen), @timestamp) WHERE user.entity.lifecycle.first_seen IS NOT NULL, + entity.lifecycle.last_activity = LAST(TO_DATETIME(user.entity.lifecycle.last_activity), @timestamp) WHERE user.entity.lifecycle.last_activity IS NOT NULL, + entity.behaviors.rule_names = TOP(MV_DEDUPE(TO_STRING(user.entity.behaviors.rule_names)), 100) WHERE user.entity.behaviors.rule_names IS NOT NULL, + entity.behaviors.anomaly_job_ids = TOP(MV_DEDUPE(TO_STRING(user.entity.behaviors.anomaly_job_ids)), 100) WHERE user.entity.behaviors.anomaly_job_ids IS NOT NULL, + entity.relationships.communicates_with = TOP(MV_DEDUPE(TO_STRING(user.entity.relationships.communicates_with)), 10) WHERE user.entity.relationships.communicates_with IS NOT NULL, + entity.relationships.depends_on = TOP(MV_DEDUPE(TO_STRING(user.entity.relationships.depends_on)), 10) WHERE user.entity.relationships.depends_on IS NOT NULL, + entity.relationships.owns_inferred = TOP(MV_DEDUPE(TO_STRING(user.entity.relationships.owns_inferred)), 10) WHERE user.entity.relationships.owns_inferred IS NOT NULL, + entity.relationships.accesses_infrequently = TOP(MV_DEDUPE(TO_STRING(user.entity.relationships.accesses_infrequently)), 10) WHERE user.entity.relationships.accesses_infrequently IS NOT NULL, + entity.relationships.accesses_frequently = TOP(MV_DEDUPE(TO_STRING(user.entity.relationships.accesses_frequently)), 10) WHERE user.entity.relationships.accesses_frequently IS NOT NULL, + entity.relationships.owns = TOP(MV_DEDUPE(TO_STRING(user.entity.relationships.owns)), 10) WHERE user.entity.relationships.owns IS NOT NULL, + 
entity.relationships.supervises = TOP(MV_DEDUPE(TO_STRING(user.entity.relationships.supervises)), 10) WHERE user.entity.relationships.supervises IS NOT NULL, + entity.relationships.resolution.resolved_to = LAST(TO_STRING(user.entity.relationships.resolution.resolved_to), @timestamp) WHERE user.entity.relationships.resolution.resolved_to IS NOT NULL, + entity.relationships.resolution.risk.calculated_level = LAST(TO_STRING(user.entity.relationships.resolution.risk.calculated_level), @timestamp) WHERE user.entity.relationships.resolution.risk.calculated_level IS NOT NULL, + entity.relationships.resolution.risk.calculated_score = LAST(TO_DOUBLE(user.entity.relationships.resolution.risk.calculated_score), @timestamp) WHERE user.entity.relationships.resolution.risk.calculated_score IS NOT NULL, + entity.relationships.resolution.risk.calculated_score_norm = LAST(TO_DOUBLE(user.entity.relationships.resolution.risk.calculated_score_norm), @timestamp) WHERE user.entity.relationships.resolution.risk.calculated_score_norm IS NOT NULL, + host.entity.id = TOP(MV_DEDUPE(TO_STRING(host.entity.id)), 10) WHERE host.entity.id IS NOT NULL, + host.id = TOP(MV_DEDUPE(TO_STRING(host.id)), 10) WHERE host.id IS NOT NULL, + host.name = TOP(MV_DEDUPE(TO_STRING(host.name)), 10) WHERE host.name IS NOT NULL + BY recent.entity.EngineMetadata.UntypedId + | EVAL + entity.EngineMetadata.FirstSeenLogInPage = recent.entity.EngineMetadata.FirstSeenLogInPage, + entity.EngineMetadata.UntypedId = recent.entity.EngineMetadata.UntypedId + | SORT recent.entity.EngineMetadata.FirstSeenLogInPage ASC, recent.entity.EngineMetadata.UntypedId ASC + | WHERE entity.EngineMetadata.FirstSeenLogInPage > TO_DATETIME(\\"2022-01-01T12:00:00.000Z\\") + OR (entity.EngineMetadata.FirstSeenLogInPage == TO_DATETIME(\\"2022-01-01T12:00:00.000Z\\") + AND recent.entity.EngineMetadata.UntypedId > \\"cursor-id\\") + | KEEP entity.name, + user.entity.id, + user.domain, + user.email, + user.name, + user.full_name, + user.hash, + 
user.id, + user.roles, + user.group.domain, + user.group.id, + user.group.name, + entity.source, + asset.id, + asset.name, + asset.owner, + asset.serial_number, + asset.model, + asset.vendor, + asset.environment, + asset.criticality, + asset.business_unit, + entity.risk.calculated_level, + entity.risk.calculated_score, + entity.risk.calculated_score_norm, + entity.source, + entity.type, + entity.sub_type, + entity.url, + entity.attributes.watchlists, + entity.attributes.asset, + entity.attributes.managed, + entity.attributes.mfa_enabled, + entity.lifecycle.first_seen, + entity.lifecycle.last_activity, + entity.behaviors.rule_names, + entity.behaviors.anomaly_job_ids, + entity.relationships.communicates_with, + entity.relationships.depends_on, + entity.relationships.owns_inferred, + entity.relationships.accesses_infrequently, + entity.relationships.accesses_frequently, + entity.relationships.owns, + entity.relationships.supervises, + entity.relationships.resolution.resolved_to, + entity.relationships.resolution.risk.calculated_level, + entity.relationships.resolution.risk.calculated_score, + entity.relationships.resolution.risk.calculated_score_norm, + host.entity.id, + host.id, + host.name, + @timestamp, + entity.EngineMetadata.FirstSeenLogInPage, + entity.EngineMetadata.UntypedId + | LIMIT 10000" +`; + +exports[`buildCcsLogsExtractionEsqlQuery generates query without LOOKUP JOIN 1`] = ` +"SET unmapped_fields=\\"nullify\\"; + FROM remote_cluster:logs-* + METADATA _index + | WHERE ((host.entity.id IS NOT NULL AND host.entity.id != \\"\\") OR (host.id IS NOT NULL AND host.id != \\"\\") OR (host.name IS NOT NULL AND host.name != \\"\\") OR (host.hostname IS NOT NULL AND host.hostname != \\"\\")) + AND @timestamp > TO_DATETIME(\\"2022-01-01T00:00:00.000Z\\") + AND @timestamp <= TO_DATETIME(\\"2022-01-01T23:59:59.999Z\\")| EVAL recent.entity.EngineMetadata.UntypedId = CONCAT(\\"host:\\", CASE((host.entity.id IS NOT NULL AND host.entity.id != \\"\\"), host.entity.id, 
+(host.id IS NOT NULL AND host.id != \\"\\"), host.id, +(host.name IS NOT NULL AND host.name != \\"\\" AND host.domain IS NOT NULL AND host.domain != \\"\\"), CONCAT(host.name, \\".\\", host.domain), +(host.hostname IS NOT NULL AND host.hostname != \\"\\" AND host.domain IS NOT NULL AND host.domain != \\"\\"), CONCAT(host.hostname, \\".\\", host.domain), +(host.name IS NOT NULL AND host.name != \\"\\"), host.name, +(host.hostname IS NOT NULL AND host.hostname != \\"\\"), host.hostname, NULL)) + | STATS + @timestamp = MAX(@timestamp), + recent.entity.EngineMetadata.FirstSeenLogInPage = MIN(@timestamp), + entity.name = LAST(TO_STRING(host.name), @timestamp) WHERE host.name IS NOT NULL, + host.entity.id = FIRST(TO_STRING(host.entity.id), @timestamp) WHERE host.entity.id IS NOT NULL, + host.name = TOP(MV_DEDUPE(TO_STRING(host.name)), 10) WHERE host.name IS NOT NULL, + host.domain = TOP(MV_DEDUPE(TO_STRING(host.domain)), 10) WHERE host.domain IS NOT NULL, + host.hostname = TOP(MV_DEDUPE(TO_STRING(host.hostname)), 10) WHERE host.hostname IS NOT NULL, + host.id = TOP(MV_DEDUPE(TO_STRING(host.id)), 10) WHERE host.id IS NOT NULL, + host.os.name = TOP(MV_DEDUPE(TO_STRING(host.os.name)), 10) WHERE host.os.name IS NOT NULL, + host.os.type = TOP(MV_DEDUPE(TO_STRING(host.os.type)), 10) WHERE host.os.type IS NOT NULL, + host.ip = TOP(MV_DEDUPE(TO_IP(host.ip)), 10) WHERE host.ip IS NOT NULL, + host.mac = TOP(MV_DEDUPE(TO_STRING(host.mac)), 10) WHERE host.mac IS NOT NULL, + host.type = TOP(MV_DEDUPE(TO_STRING(host.type)), 10) WHERE host.type IS NOT NULL, + host.architecture = TOP(MV_DEDUPE(TO_STRING(host.architecture)), 10) WHERE host.architecture IS NOT NULL, + host.boot.id = LAST(TO_STRING(host.boot.id), @timestamp) WHERE host.boot.id IS NOT NULL, + host.cpu.usage = LAST(host.cpu.usage, @timestamp) WHERE host.cpu.usage IS NOT NULL, + host.disk.read.bytes = LAST(TO_LONG(host.disk.read.bytes), @timestamp) WHERE host.disk.read.bytes IS NOT NULL, + host.disk.write.bytes = 
LAST(TO_LONG(host.disk.write.bytes), @timestamp) WHERE host.disk.write.bytes IS NOT NULL, + host.network.egress.bytes = LAST(TO_LONG(host.network.egress.bytes), @timestamp) WHERE host.network.egress.bytes IS NOT NULL, + host.network.egress.packets = LAST(TO_LONG(host.network.egress.packets), @timestamp) WHERE host.network.egress.packets IS NOT NULL, + host.network.ingress.bytes = LAST(TO_LONG(host.network.ingress.bytes), @timestamp) WHERE host.network.ingress.bytes IS NOT NULL, + host.network.ingress.packets = LAST(TO_LONG(host.network.ingress.packets), @timestamp) WHERE host.network.ingress.packets IS NOT NULL, + host.uptime = LAST(TO_LONG(host.uptime), @timestamp) WHERE host.uptime IS NOT NULL, + host.pid_ns_ino = LAST(TO_STRING(host.pid_ns_ino), @timestamp) WHERE host.pid_ns_ino IS NOT NULL, + host.os.family = LAST(TO_STRING(host.os.family), @timestamp) WHERE host.os.family IS NOT NULL, + host.os.full = LAST(TO_STRING(host.os.full), @timestamp) WHERE host.os.full IS NOT NULL, + host.os.kernel = LAST(TO_STRING(host.os.kernel), @timestamp) WHERE host.os.kernel IS NOT NULL, + host.os.platform = LAST(TO_STRING(host.os.platform), @timestamp) WHERE host.os.platform IS NOT NULL, + host.os.version = LAST(TO_STRING(host.os.version), @timestamp) WHERE host.os.version IS NOT NULL, + host.geo.city_name = LAST(TO_STRING(host.geo.city_name), @timestamp) WHERE host.geo.city_name IS NOT NULL, + host.geo.continent_code = LAST(TO_STRING(host.geo.continent_code), @timestamp) WHERE host.geo.continent_code IS NOT NULL, + host.geo.continent_name = LAST(TO_STRING(host.geo.continent_name), @timestamp) WHERE host.geo.continent_name IS NOT NULL, + host.geo.country_iso_code = LAST(TO_STRING(host.geo.country_iso_code), @timestamp) WHERE host.geo.country_iso_code IS NOT NULL, + host.geo.country_name = LAST(TO_STRING(host.geo.country_name), @timestamp) WHERE host.geo.country_name IS NOT NULL, + host.geo.name = TOP(MV_DEDUPE(TO_STRING(host.geo.name)), 10) WHERE host.geo.name IS NOT NULL, + 
host.geo.postal_code = TOP(MV_DEDUPE(TO_STRING(host.geo.postal_code)), 10) WHERE host.geo.postal_code IS NOT NULL, + host.geo.region_iso_code = TOP(MV_DEDUPE(TO_STRING(host.geo.region_iso_code)), 10) WHERE host.geo.region_iso_code IS NOT NULL, + host.geo.region_name = TOP(MV_DEDUPE(TO_STRING(host.geo.region_name)), 10) WHERE host.geo.region_name IS NOT NULL, + host.geo.timezone = TOP(MV_DEDUPE(TO_STRING(host.geo.timezone)), 10) WHERE host.geo.timezone IS NOT NULL, + entity.source = LAST(TO_STRING(_index), @timestamp) WHERE _index IS NOT NULL, + asset.id = LAST(TO_STRING(asset.id), @timestamp) WHERE asset.id IS NOT NULL, + asset.name = LAST(TO_STRING(asset.name), @timestamp) WHERE asset.name IS NOT NULL, + asset.owner = LAST(TO_STRING(asset.owner), @timestamp) WHERE asset.owner IS NOT NULL, + asset.serial_number = LAST(TO_STRING(asset.serial_number), @timestamp) WHERE asset.serial_number IS NOT NULL, + asset.model = LAST(TO_STRING(asset.model), @timestamp) WHERE asset.model IS NOT NULL, + asset.vendor = LAST(TO_STRING(asset.vendor), @timestamp) WHERE asset.vendor IS NOT NULL, + asset.environment = LAST(TO_STRING(asset.environment), @timestamp) WHERE asset.environment IS NOT NULL, + asset.criticality = LAST(TO_STRING(asset.criticality), @timestamp) WHERE asset.criticality IS NOT NULL, + asset.business_unit = LAST(TO_STRING(asset.business_unit), @timestamp) WHERE asset.business_unit IS NOT NULL, + entity.risk.calculated_level = LAST(TO_STRING(host.risk.calculated_level), @timestamp) WHERE host.risk.calculated_level IS NOT NULL, + entity.risk.calculated_score = LAST(TO_DOUBLE(host.risk.calculated_score), @timestamp) WHERE host.risk.calculated_score IS NOT NULL, + entity.risk.calculated_score_norm = LAST(TO_DOUBLE(host.risk.calculated_score_norm), @timestamp) WHERE host.risk.calculated_score_norm IS NOT NULL, + entity.source = LAST(TO_STRING(host.entity.source), @timestamp) WHERE host.entity.source IS NOT NULL, + entity.type = LAST(TO_STRING(host.entity.type), 
@timestamp) WHERE host.entity.type IS NOT NULL, + entity.sub_type = LAST(TO_STRING(host.entity.sub_type), @timestamp) WHERE host.entity.sub_type IS NOT NULL, + entity.url = LAST(TO_STRING(host.entity.url), @timestamp) WHERE host.entity.url IS NOT NULL, + entity.attributes.watchlists = TOP(MV_DEDUPE(TO_STRING(host.entity.attributes.watchlists)), 10) WHERE host.entity.attributes.watchlists IS NOT NULL, + entity.attributes.asset = LAST(TO_BOOLEAN(host.entity.attributes.asset), @timestamp) WHERE host.entity.attributes.asset IS NOT NULL, + entity.attributes.managed = LAST(TO_BOOLEAN(host.entity.attributes.managed), @timestamp) WHERE host.entity.attributes.managed IS NOT NULL, + entity.attributes.mfa_enabled = LAST(TO_BOOLEAN(host.entity.attributes.mfa_enabled), @timestamp) WHERE host.entity.attributes.mfa_enabled IS NOT NULL, + entity.lifecycle.first_seen = LAST(TO_DATETIME(host.entity.lifecycle.first_seen), @timestamp) WHERE host.entity.lifecycle.first_seen IS NOT NULL, + entity.lifecycle.last_activity = LAST(TO_DATETIME(host.entity.lifecycle.last_activity), @timestamp) WHERE host.entity.lifecycle.last_activity IS NOT NULL, + entity.behaviors.rule_names = TOP(MV_DEDUPE(TO_STRING(host.entity.behaviors.rule_names)), 100) WHERE host.entity.behaviors.rule_names IS NOT NULL, + entity.behaviors.anomaly_job_ids = TOP(MV_DEDUPE(TO_STRING(host.entity.behaviors.anomaly_job_ids)), 100) WHERE host.entity.behaviors.anomaly_job_ids IS NOT NULL, + entity.relationships.communicates_with = TOP(MV_DEDUPE(TO_STRING(host.entity.relationships.communicates_with)), 10) WHERE host.entity.relationships.communicates_with IS NOT NULL, + entity.relationships.depends_on = TOP(MV_DEDUPE(TO_STRING(host.entity.relationships.depends_on)), 10) WHERE host.entity.relationships.depends_on IS NOT NULL, + entity.relationships.owns_inferred = TOP(MV_DEDUPE(TO_STRING(host.entity.relationships.owns_inferred)), 10) WHERE host.entity.relationships.owns_inferred IS NOT NULL, + 
entity.relationships.accesses_infrequently = TOP(MV_DEDUPE(TO_STRING(host.entity.relationships.accesses_infrequently)), 10) WHERE host.entity.relationships.accesses_infrequently IS NOT NULL, + entity.relationships.accesses_frequently = TOP(MV_DEDUPE(TO_STRING(host.entity.relationships.accesses_frequently)), 10) WHERE host.entity.relationships.accesses_frequently IS NOT NULL, + entity.relationships.owns = TOP(MV_DEDUPE(TO_STRING(host.entity.relationships.owns)), 10) WHERE host.entity.relationships.owns IS NOT NULL, + entity.relationships.supervises = TOP(MV_DEDUPE(TO_STRING(host.entity.relationships.supervises)), 10) WHERE host.entity.relationships.supervises IS NOT NULL, + entity.relationships.resolution.resolved_to = LAST(TO_STRING(host.entity.relationships.resolution.resolved_to), @timestamp) WHERE host.entity.relationships.resolution.resolved_to IS NOT NULL, + entity.relationships.resolution.risk.calculated_level = LAST(TO_STRING(host.entity.relationships.resolution.risk.calculated_level), @timestamp) WHERE host.entity.relationships.resolution.risk.calculated_level IS NOT NULL, + entity.relationships.resolution.risk.calculated_score = LAST(TO_DOUBLE(host.entity.relationships.resolution.risk.calculated_score), @timestamp) WHERE host.entity.relationships.resolution.risk.calculated_score IS NOT NULL, + entity.relationships.resolution.risk.calculated_score_norm = LAST(TO_DOUBLE(host.entity.relationships.resolution.risk.calculated_score_norm), @timestamp) WHERE host.entity.relationships.resolution.risk.calculated_score_norm IS NOT NULL + BY recent.entity.EngineMetadata.UntypedId + | EVAL + entity.EngineMetadata.FirstSeenLogInPage = recent.entity.EngineMetadata.FirstSeenLogInPage, + entity.EngineMetadata.UntypedId = recent.entity.EngineMetadata.UntypedId + | SORT recent.entity.EngineMetadata.FirstSeenLogInPage ASC, recent.entity.EngineMetadata.UntypedId ASC + + | KEEP entity.name, + host.entity.id, + host.name, + host.domain, + host.hostname, + host.id, + 
host.os.name, + host.os.type, + host.ip, + host.mac, + host.type, + host.architecture, + host.boot.id, + host.cpu.usage, + host.disk.read.bytes, + host.disk.write.bytes, + host.network.egress.bytes, + host.network.egress.packets, + host.network.ingress.bytes, + host.network.ingress.packets, + host.uptime, + host.pid_ns_ino, + host.os.family, + host.os.full, + host.os.kernel, + host.os.platform, + host.os.version, + host.geo.city_name, + host.geo.continent_code, + host.geo.continent_name, + host.geo.country_iso_code, + host.geo.country_name, + host.geo.name, + host.geo.postal_code, + host.geo.region_iso_code, + host.geo.region_name, + host.geo.timezone, + entity.source, + asset.id, + asset.name, + asset.owner, + asset.serial_number, + asset.model, + asset.vendor, + asset.environment, + asset.criticality, + asset.business_unit, + entity.risk.calculated_level, + entity.risk.calculated_score, + entity.risk.calculated_score_norm, + entity.source, + entity.type, + entity.sub_type, + entity.url, + entity.attributes.watchlists, + entity.attributes.asset, + entity.attributes.managed, + entity.attributes.mfa_enabled, + entity.lifecycle.first_seen, + entity.lifecycle.last_activity, + entity.behaviors.rule_names, + entity.behaviors.anomaly_job_ids, + entity.relationships.communicates_with, + entity.relationships.depends_on, + entity.relationships.owns_inferred, + entity.relationships.accesses_infrequently, + entity.relationships.accesses_frequently, + entity.relationships.owns, + entity.relationships.supervises, + entity.relationships.resolution.resolved_to, + entity.relationships.resolution.risk.calculated_level, + entity.relationships.resolution.risk.calculated_score, + entity.relationships.resolution.risk.calculated_score_norm, + @timestamp, + entity.EngineMetadata.FirstSeenLogInPage, + entity.EngineMetadata.UntypedId + | LIMIT 10000" +`; + exports[`buildLogsExtractionEsqlQuery generates the expected query for generic entity description 1`] = ` "FROM test-index-* METADATA 
_index @@ -10,7 +489,8 @@ exports[`buildLogsExtractionEsqlQuery generates the expected query for generic e | STATS entity.EngineMetadata.FirstSeenLogInPage = MIN(@timestamp), recent.timestamp = MAX(@timestamp), - recent.entity.name = LAST(TO_STRING(entity.name), @timestamp) WHERE entity.name IS NOT NULL, + recent.entity.id = LAST(TO_STRING(entity.id), @timestamp) WHERE entity.id IS NOT NULL, + recent.entity.name = LAST(TO_STRING(entity.name), @timestamp) WHERE entity.name IS NOT NULL, recent.entity.source = LAST(TO_STRING(entity.source), @timestamp) WHERE entity.source IS NOT NULL, recent.entity.type = LAST(TO_STRING(entity.type), @timestamp) WHERE entity.type IS NOT NULL, recent.entity.sub_type = LAST(TO_STRING(entity.sub_type), @timestamp) WHERE entity.sub_type IS NOT NULL, @@ -150,7 +630,8 @@ exports[`buildLogsExtractionEsqlQuery generates the expected query for generic e | RENAME recent.entity.id AS entity.id, recent.entity.EngineMetadata.UntypedId AS entity.EngineMetadata.UntypedId - | KEEP entity.name, + | KEEP entity.id, + entity.name, entity.source, entity.type, entity.sub_type, diff --git a/x-pack/solutions/security/plugins/entity_store/server/domain/logs_extraction/logs_extraction_query_builder.test.ts b/x-pack/solutions/security/plugins/entity_store/server/domain/logs_extraction/logs_extraction_query_builder.test.ts index 8b75f2ccd4db7..9bcb78d0faa06 100644 --- a/x-pack/solutions/security/plugins/entity_store/server/domain/logs_extraction/logs_extraction_query_builder.test.ts +++ b/x-pack/solutions/security/plugins/entity_store/server/domain/logs_extraction/logs_extraction_query_builder.test.ts @@ -7,6 +7,7 @@ import { buildLogsExtractionEsqlQuery, + buildCcsLogsExtractionEsqlQuery, buildRemainingLogsCountQuery, } from './logs_extraction_query_builder'; import { getEntityDefinition } from '../../../common/domain/definitions/registry'; @@ -61,6 +62,47 @@ describe('buildLogsExtractionEsqlQuery', () => { }); }); +describe('buildCcsLogsExtractionEsqlQuery', 
() => { + it('generates query without LOOKUP JOIN', () => { + const query = buildCcsLogsExtractionEsqlQuery({ + indexPatterns: ['remote_cluster:logs-*'], + entityDefinition: getEntityDefinition('host', 'default'), + fromDateISO: '2022-01-01T00:00:00.000Z', + toDateISO: '2022-01-01T23:59:59.999Z', + docsLimit: 10000, + }); + expect(query).not.toContain('LOOKUP JOIN'); + expect(query).toMatchSnapshot(); + }); + + it('generates expected query for host entity type', () => { + const query = buildCcsLogsExtractionEsqlQuery({ + indexPatterns: ['remote:metrics-*'], + entityDefinition: getEntityDefinition('host', 'default'), + fromDateISO: '2022-01-01T00:00:00.000Z', + toDateISO: '2022-01-01T23:59:59.999Z', + docsLimit: 5000, + }); + expect(query).toMatchSnapshot(); + }); + + it('generates expected query with pagination', () => { + const query = buildCcsLogsExtractionEsqlQuery({ + indexPatterns: ['remote:logs-*'], + entityDefinition: getEntityDefinition('user', 'default'), + fromDateISO: '2022-01-01T00:00:00.000Z', + toDateISO: '2022-01-01T23:59:59.999Z', + docsLimit: 10000, + pagination: { + timestampCursor: '2022-01-01T12:00:00.000Z', + idCursor: 'cursor-id', + }, + }); + expect(query).toContain('FirstSeenLogInPage > TO_DATETIME("2022-01-01T12:00:00.000Z")'); + expect(query).toMatchSnapshot(); + }); +}); + describe('buildRemainingLogsCountQuery', () => { Object.values(EntityType.Values).forEach((type) => { it(`generates the expected query for ${type} entity type`, () => { diff --git a/x-pack/solutions/security/plugins/entity_store/server/domain/logs_extraction/logs_extraction_query_builder.ts b/x-pack/solutions/security/plugins/entity_store/server/domain/logs_extraction/logs_extraction_query_builder.ts index 49c76e6b0cf4e..4de8148d2856c 100644 --- a/x-pack/solutions/security/plugins/entity_store/server/domain/logs_extraction/logs_extraction_query_builder.ts +++ b/x-pack/solutions/security/plugins/entity_store/server/domain/logs_extraction/logs_extraction_query_builder.ts 
@@ -27,9 +27,11 @@ const ENGINE_METADATA_TYPE_FIELD = 'entity.EngineMetadata.Type'; const MAIN_ENTITY_ID_FIELD = 'entity.id'; const ENTITY_NAME_FIELD = 'entity.name'; +const ENTITY_TYPE_FIELD = 'entity.type'; const TIMESTAMP_FIELD = '@timestamp'; const METADATA_FIELDS = ['_index']; + const DEFAULT_FIELDS_TO_KEEP = [ TIMESTAMP_FIELD, MAIN_ENTITY_ID_FIELD, @@ -40,6 +42,13 @@ const DEFAULT_FIELDS_TO_KEEP = [ ENGINE_METADATA_PAGINATION_FIRST_SEEN_LOG_FIELD, ]; +const CCS_FIELDS_TO_KEEP = [ + TIMESTAMP_FIELD, + ENGINE_METADATA_PAGINATION_FIRST_SEEN_LOG_FIELD, + // Keep it for debug visibility purposes + ENGINE_METADATA_UNTYPED_ID_FIELD, +]; + const RECENT_DATA_PREFIX = 'recent'; // Some fields have only src and we need to fallback to it. function recentData(dest: string) { @@ -127,7 +136,7 @@ export function buildLogsExtractionEsqlQuery({ | STATS ${ENGINE_METADATA_PAGINATION_FIRST_SEEN_LOG_FIELD} = MIN(${TIMESTAMP_FIELD}), ${recentData('timestamp')} = MAX(${TIMESTAMP_FIELD}), - ${recentFieldStats(fields)} + ${aggregationStats(fields)} BY ${recentData(ENGINE_METADATA_UNTYPED_ID_FIELD)}` + // early sort, paginate and limit so we perform data retention operations (LOOKUP JOIN) only on documents // that are needed @@ -160,19 +169,68 @@ export function buildLogsExtractionEsqlQuery({ ); } -function recentFieldStats(fields: EntityField[]) { +export interface CcsLogsExtractionQueryParams { + indexPatterns: string[]; + entityDefinition: EntityDefinition; + fromDateISO: string; + toDateISO: string; + docsLimit: number; + recoveryId?: string; + pagination?: PaginationParams; +} + +/** + * Builds ESQL for CCS-only extraction: same aggregation as main but no LOOKUP JOIN. + * Writes partial entities to updates with @timestamp = nowISO so the next run intakes them. 
+ */ +export function buildCcsLogsExtractionEsqlQuery({ + indexPatterns, + entityDefinition: { fields, type }, + fromDateISO, + toDateISO, + docsLimit, + recoveryId, + pagination, +}: CcsLogsExtractionQueryParams): string { + return ( + `SET unmapped_fields="nullify"; + ${buildExtractionSourceClause({ indexPatterns, type, fromDateISO, toDateISO, recoveryId })}` + + // Using the same structure as the main logs extraction re-use fields to perform the aggregation + `| EVAL ${recentData(ENGINE_METADATA_UNTYPED_ID_FIELD)} = ${getEuidEsqlEvaluation(type, { + withTypeId: true, + })} + | STATS + ${TIMESTAMP_FIELD} = MAX(${TIMESTAMP_FIELD}), + ${recentData(ENGINE_METADATA_PAGINATION_FIRST_SEEN_LOG_FIELD)} = MIN(${TIMESTAMP_FIELD}), + ${aggregationStats(fields, false)} + BY ${recentData(ENGINE_METADATA_UNTYPED_ID_FIELD)} + | EVAL + ${ENGINE_METADATA_PAGINATION_FIRST_SEEN_LOG_FIELD} = ${recentData( + ENGINE_METADATA_PAGINATION_FIRST_SEEN_LOG_FIELD + )}, + ${ENGINE_METADATA_UNTYPED_ID_FIELD} = ${recentData(ENGINE_METADATA_UNTYPED_ID_FIELD)} + | SORT ${recentData(ENGINE_METADATA_PAGINATION_FIRST_SEEN_LOG_FIELD)} ASC, ${recentData( + ENGINE_METADATA_UNTYPED_ID_FIELD + )} ASC + ${getPaginationWhereClause(pagination, recoveryId ? { fromDateISO, recoveryId } : undefined)} + | KEEP ${fieldsToKeep(fields, CCS_FIELDS_TO_KEEP)} + | LIMIT ${docsLimit}` + ); +} + +function aggregationStats(fields: EntityField[], renameToRecent: boolean = true) { return fields .map((field) => { const { retention, destination: dest, source } = field; - const recentDest = recentData(dest); + const finalDest = renameToRecent ? 
recentData(dest) : dest; const castedSrc = castSrcType(field); switch (retention.operation) { case 'collect_values': - return `${recentDest} = TOP(MV_DEDUPE(${castedSrc}), ${retention.maxLength}) WHERE ${source} IS NOT NULL`; + return `${finalDest} = TOP(MV_DEDUPE(${castedSrc}), ${retention.maxLength}) WHERE ${source} IS NOT NULL`; case 'prefer_newest_value': - return `${recentDest} = LAST(${castedSrc}, ${TIMESTAMP_FIELD}) WHERE ${source} IS NOT NULL`; + return `${finalDest} = LAST(${castedSrc}, ${TIMESTAMP_FIELD}) WHERE ${source} IS NOT NULL`; case 'prefer_oldest_value': - return `${recentDest} = FIRST(${castedSrc}, ${TIMESTAMP_FIELD}) WHERE ${source} IS NOT NULL`; + return `${finalDest} = FIRST(${castedSrc}, ${TIMESTAMP_FIELD}) WHERE ${source} IS NOT NULL`; default: throw new Error('unknown field operation'); } @@ -206,10 +264,13 @@ function mergedFieldStats(idFieldName: string, fields: EntityField[]) { .join(',\n '); } -function fieldsToKeep(fields: EntityField[]) { +function fieldsToKeep( + fields: EntityField[], + defaultFieldsToKeep: string[] = DEFAULT_FIELDS_TO_KEEP +) { return fields .map(({ destination }) => destination) - .concat(DEFAULT_FIELDS_TO_KEEP) + .concat(defaultFieldsToKeep) .join(',\n '); } @@ -232,7 +293,7 @@ function customFieldEvalLogic(type: EntityType, entityTypeFallback?: string) { if (entityTypeFallback) { // If type doesn't exist, fallback to the entity type fallback - evals.push(`entity.type = COALESCE(entity.type, "${entityTypeFallback}")`); + evals.push(`${ENTITY_TYPE_FIELD} = COALESCE(${ENTITY_TYPE_FIELD}, "${entityTypeFallback}")`); } return evals.join(',\n '); diff --git a/x-pack/solutions/security/plugins/entity_store/server/domain/logs_extraction_client.test.ts b/x-pack/solutions/security/plugins/entity_store/server/domain/logs_extraction_client.test.ts index 536644bf5a3b8..7f28eb2795cd2 100644 --- a/x-pack/solutions/security/plugins/entity_store/server/domain/logs_extraction_client.test.ts +++ 
b/x-pack/solutions/security/plugins/entity_store/server/domain/logs_extraction_client.test.ts @@ -6,6 +6,7 @@ */ import { LogsExtractionClient } from './logs_extraction_client'; +import type { CcsLogsExtractionClient } from './ccs_logs_extraction_client'; import { loggerMock } from '@kbn/logging-mocks'; import type { ElasticsearchClient } from '@kbn/core/server'; import type { DataViewsService } from '@kbn/data-views-plugin/common'; @@ -21,6 +22,14 @@ import { LogExtractionState, type EngineDescriptorClient } from './definitions/s import { ENGINE_STATUS } from './constants'; import type { EntityType } from '../../common/domain/definitions/entity_schema'; +function createMockCcsLogsExtractionClient(): jest.Mocked< + Pick +> { + return { + extractToUpdates: jest.fn().mockResolvedValue({ count: 0, pages: 0 }), + }; +} + jest.mock('../infra/elasticsearch/esql'); jest.mock('../infra/elasticsearch/ingest'); @@ -54,6 +63,7 @@ describe('LogsExtractionClient', () => { let mockEngineDescriptorClient: jest.Mocked< Pick >; + let mockCcsLogsExtractionClient: ReturnType; beforeEach(() => { jest.clearAllMocks(); @@ -67,14 +77,16 @@ describe('LogsExtractionClient', () => { findOrThrow: jest.fn(), update: jest.fn().mockResolvedValue({}), }; - - client = new LogsExtractionClient( - mockLogger, - 'default', - mockEsClient, - mockDataViewsService, - mockEngineDescriptorClient as unknown as EngineDescriptorClient - ); + mockCcsLogsExtractionClient = createMockCcsLogsExtractionClient(); + + client = new LogsExtractionClient({ + logger: mockLogger, + namespace: 'default', + esClient: mockEsClient, + dataViewsService: mockDataViewsService, + engineDescriptorClient: mockEngineDescriptorClient as unknown as EngineDescriptorClient, + ccsLogsExtractionClient: mockCcsLogsExtractionClient as unknown as CcsLogsExtractionClient, + }); }); describe('extractLogs', () => { @@ -481,13 +493,14 @@ describe('LogsExtractionClient', () => { }); }); - it('should filter out cross-cluster search (CCS) remote 
indices', async () => { + it('should filter out cross-cluster search (CCS) remote indices from main query and run CCS in parallel', async () => { const mockEsqlResponse: ESQLSearchResponse = { columns: [ { name: '@timestamp', type: 'date' }, { name: HASHED_ID_FIELD, type: 'keyword' }, + { name: 'entity.id', type: 'keyword' }, ], - values: [['2024-01-02T10:00:00.000Z', 'hash1']], + values: [['2024-01-02T10:00:00.000Z', 'hash1', 'user:u1']], }; const mockDataView = { @@ -508,10 +521,74 @@ describe('LogsExtractionClient', () => { const result = await client.extractLogs('user'); expect(result.success).toBe(true); + // Main extraction uses local indices only; CCS client (injected) runs in parallel. expect(result.success && result.scannedIndices).toContain('logs-*'); expect(result.success && result.scannedIndices).toContain('metrics-*'); - expect(result.success && result.scannedIndices).not.toContain('remote_cluster:logs-*'); - expect(result.success && result.scannedIndices).not.toContain('other:filebeat-*'); + expect(result.success && result.scannedIndices).toContain('remote_cluster:logs-*'); + expect(result.success && result.scannedIndices).toContain('other:filebeat-*'); + // Main query runs once; injected CCS client is invoked for remote patterns + expect(mockExecuteEsqlQuery).toHaveBeenCalledTimes(1); + expect(mockCcsLogsExtractionClient.extractToUpdates).toHaveBeenCalledTimes(1); + expect(mockCcsLogsExtractionClient.extractToUpdates).toHaveBeenCalledWith( + expect.objectContaining({ + type: 'user', + remoteIndexPatterns: ['remote_cluster:logs-*', 'other:filebeat-*'], + }) + ); + }); + + it('should store CCS errors in the saved object while main execution remains unchanged', async () => { + const mockEsqlResponse: ESQLSearchResponse = { + columns: [ + { name: '@timestamp', type: 'date' }, + { name: HASHED_ID_FIELD, type: 'keyword' }, + { name: 'entity.id', type: 'keyword' }, + ], + values: [['2024-01-02T10:00:00.000Z', 'hash1', 'user:u1']], + }; + + const 
mockDataView = { + getIndexPattern: jest.fn().mockReturnValue('logs-*,remote_cluster:logs-*'), + }; + + const ccsError = new Error('CCS connection failed'); + mockEngineDescriptorClient.findOrThrow.mockResolvedValue( + createMockEngineDescriptor('user') as Awaited< + ReturnType + > + ); + mockDataViewsService.get.mockResolvedValue(mockDataView as any); + mockExecuteEsqlQuery.mockResolvedValue(mockEsqlResponse); + mockIngestEntities.mockResolvedValue(undefined); + mockCcsLogsExtractionClient.extractToUpdates.mockResolvedValue({ + count: 0, + pages: 0, + error: ccsError, + }); + + const result = await client.extractLogs('user'); + + // Main execution is unchanged: success, count from main query, ESQL and ingest called once + expect(result.success).toBe(true); + expect(result.success && result.count).toBe(1); + expect(result.success && result.scannedIndices).toContain('logs-*'); + expect(result.success && result.scannedIndices).toContain('remote_cluster:logs-*'); + expect(mockExecuteEsqlQuery).toHaveBeenCalledTimes(1); + expect(mockIngestEntities).toHaveBeenCalledTimes(1); + + // CCS error is stored in the saved object + expect(mockEngineDescriptorClient.update).toHaveBeenCalledWith( + 'user', + expect.objectContaining({ + logExtractionState: expect.objectContaining({ + paginationTimestamp: undefined, + paginationId: undefined, + lastExecutionTimestamp: expect.any(String), + }), + error: { message: ccsError.message, action: 'extractLogs' }, + }), + { mergeAttributes: false } + ); }); it('should fallback to logs-* when data view is not found', async () => { @@ -605,6 +682,44 @@ describe('LogsExtractionClient', () => { }); }); + describe('getLocalAndRemoteIndexPatterns', () => { + it('should split local and CCS remote index patterns', async () => { + const mockDataView = { + getIndexPattern: jest + .fn() + .mockReturnValue('logs-*,remote_cluster:logs-*,metrics-*,other:filebeat-*'), + }; + mockDataViewsService.get.mockResolvedValue(mockDataView as any); + + const { 
localIndexPatterns, remoteIndexPatterns } = + await client.getLocalAndRemoteIndexPatterns(['custom-index']); + + expect(localIndexPatterns).toContain('logs-*'); + expect(localIndexPatterns).toContain('metrics-*'); + expect(localIndexPatterns).toContain('custom-index'); + expect(localIndexPatterns).not.toContain('remote_cluster:logs-*'); + expect(localIndexPatterns).not.toContain('other:filebeat-*'); + + expect(remoteIndexPatterns).toContain('remote_cluster:logs-*'); + expect(remoteIndexPatterns).toContain('other:filebeat-*'); + expect(remoteIndexPatterns).not.toContain('logs-*'); + expect(remoteIndexPatterns).not.toContain('metrics-*'); + }); + + it('should exclude alerts index from both local and remote', async () => { + const mockDataView = { + getIndexPattern: jest.fn().mockReturnValue('logs-*,.alerts-security.alerts-default'), + }; + mockDataViewsService.get.mockResolvedValue(mockDataView as any); + + const { localIndexPatterns, remoteIndexPatterns } = + await client.getLocalAndRemoteIndexPatterns(); + + expect(localIndexPatterns).not.toContain('.alerts-security.alerts-default'); + expect(remoteIndexPatterns).not.toContain('.alerts-security.alerts-default'); + }); + }); + describe('getRemainingLogsCount', () => { it('should return document_count from ESQL response when engine is started', async () => { const mockEsqlResponse: ESQLSearchResponse = { diff --git a/x-pack/solutions/security/plugins/entity_store/server/domain/logs_extraction_client.ts b/x-pack/solutions/security/plugins/entity_store/server/domain/logs_extraction_client.ts index b455a48eabfa7..70fa7ab6f40a8 100644 --- a/x-pack/solutions/security/plugins/entity_store/server/domain/logs_extraction_client.ts +++ b/x-pack/solutions/security/plugins/entity_store/server/domain/logs_extraction_client.ts @@ -38,6 +38,7 @@ import type { } from './definitions/saved_objects'; import { ENGINE_STATUS } from './constants'; import { parseDurationToMs } from '../infra/time'; +import type { CcsLogsExtractionClient } 
from './ccs_logs_extraction_client'; interface LogsExtractionOptions { specificWindow?: { @@ -61,14 +62,38 @@ interface ExtractedLogsSummaryError { type ExtractedLogsSummary = ExtractedLogsSummarySuccess | ExtractedLogsSummaryError; +export interface LogsExtractionClientDependencies { + logger: Logger; + namespace: string; + esClient: ElasticsearchClient; + dataViewsService: DataViewsService; + engineDescriptorClient: EngineDescriptorClient; + ccsLogsExtractionClient: CcsLogsExtractionClient; +} + export class LogsExtractionClient { - constructor( - private logger: Logger, - private namespace: string, - private esClient: ElasticsearchClient, - private dataViewsService: DataViewsService, - private engineDescriptorClient: EngineDescriptorClient - ) {} + logger: Logger; + namespace: string; + esClient: ElasticsearchClient; + dataViewsService: DataViewsService; + engineDescriptorClient: EngineDescriptorClient; + ccsLogsExtractionClient: CcsLogsExtractionClient; + + constructor({ + logger, + namespace, + esClient, + dataViewsService, + engineDescriptorClient, + ccsLogsExtractionClient, + }: LogsExtractionClientDependencies) { + this.logger = logger; + this.namespace = namespace; + this.esClient = esClient; + this.dataViewsService = dataViewsService; + this.engineDescriptorClient = engineDescriptorClient; + this.ccsLogsExtractionClient = ccsLogsExtractionClient; + } public async extractLogs( type: EntityType, @@ -90,12 +115,13 @@ export class LogsExtractionClient { const delayMs = parseDurationToMs(engineDescriptor.logExtractionState.delay); const entityDefinition = getEntityDefinition(type, this.namespace); - const { count, pages, indexPatterns } = await this.runQueryAndIngestDocs({ - engineDescriptor, - opts, - delayMs, - entityDefinition, - }); + const { count, pages, indexPatterns, lastSearchTimestamp, ccsError } = + await this.runQueryAndIngestDocs({ + engineDescriptor, + opts, + delayMs, + entityDefinition, + }); const operationResult = { success: true as const, @@ 
-121,8 +147,10 @@ export class LogsExtractionClient { paginationTimestamp: undefined, paginationId: undefined, - lastExecutionTimestamp: moment().utc().toISOString(), + // Store last searched timestamp to start window from here + lastExecutionTimestamp: lastSearchTimestamp || moment().utc().toISOString(), }, + error: ccsError ? { message: ccsError.message, action: 'extractLogs' } : undefined, // we need to do a full write to overwrite pagination // id and timestamp cursors with undefined @@ -140,7 +168,7 @@ export class LogsExtractionClient { try { const engineDescriptor = await this.engineDescriptorClient.findOrThrow(type); const delayMs = parseDurationToMs(engineDescriptor.logExtractionState.delay); - const indexPatterns = await this.getIndexPatterns( + const indexPatterns = await this.getLocalIndexPatterns( engineDescriptor.logExtractionState.additionalIndexPatterns ); const { fromDateISO } = this.getExtractionWindow( @@ -184,9 +212,15 @@ export class LogsExtractionClient { opts?: LogsExtractionOptions; delayMs: number; entityDefinition: ManagedEntityDefinition; - }) { + }): Promise<{ + count: number; + pages: number; + indexPatterns: string[]; + lastSearchTimestamp: string; + ccsError?: Error; + }> { const { docsLimit } = engineDescriptor.logExtractionState; - const indexPatterns = await this.getIndexPatterns( + const { localIndexPatterns, remoteIndexPatterns } = await this.getLocalAndRemoteIndexPatterns( engineDescriptor.logExtractionState.additionalIndexPatterns ); const latestIndex = getLatestEntitiesIndexName(this.namespace); @@ -195,12 +229,61 @@ export class LogsExtractionClient { opts?.specificWindow || this.getExtractionWindow(engineDescriptor.logExtractionState, delayMs); - // This is a sanity check to ensure the extraction window is valid - // Ideally we have this validation on the API only and we trust - // that our internal logic is correct. For the time being, we validate it here - // too, so we have a few runs and understand it. 
this.validateExtractionWindow(fromDateISO, toDateISO); + const mainPromise = this.runMainExtractionLoop({ + engineDescriptor, + opts, + indexPatterns: localIndexPatterns, + latestIndex, + fromDateISO, + toDateISO, + docsLimit, + entityDefinition, + }); + + if (remoteIndexPatterns.length > 0) { + const ccsPromise = this.ccsLogsExtractionClient.extractToUpdates({ + type: engineDescriptor.type, + remoteIndexPatterns, + fromDateISO, + toDateISO, + docsLimit, + entityDefinition, + abortController: opts?.abortController, + }); + + const [mainResult, ccsResult] = await Promise.all([mainPromise, ccsPromise]); + + return { + ...mainResult, + indexPatterns: [...localIndexPatterns, ...remoteIndexPatterns], + ccsError: ccsResult.error, + }; + } + + return await mainPromise; + } + + private async runMainExtractionLoop({ + engineDescriptor, + opts, + indexPatterns, + latestIndex, + fromDateISO, + toDateISO, + docsLimit, + entityDefinition, + }: { + engineDescriptor: EngineDescriptor; + opts?: LogsExtractionOptions; + indexPatterns: string[]; + latestIndex: string; + fromDateISO: string; + toDateISO: string; + docsLimit: number; + entityDefinition: ManagedEntityDefinition; + }) { let totalCount = 0; let pages = 0; let pagination: PaginationParams | undefined; @@ -211,7 +294,7 @@ export class LogsExtractionClient { let recoveryId: string | undefined = engineDescriptor.logExtractionState.paginationId; if (recoveryId) { this.logger.warn( - `Recovering from corrupt state, using paginationTimestamp ${fromDateISO} and paginationId ${recoveryId} beggning of the window.` + `Recovering from corrupt state, using paginationTimestamp ${fromDateISO} and paginationId ${recoveryId} beginning of the window.` ); } do { @@ -226,7 +309,6 @@ export class LogsExtractionClient { recoveryId, }); - // Recovery id already used, clean up for next iteration recoveryId = undefined; this.logger.debug( @@ -260,20 +342,15 @@ export class LogsExtractionClient { abortController: opts?.abortController, }); - // 
On pagination we save the pagination to the last seen timestamp cursor - // so we can recover from corrupt state if (pagination) { await this.engineDescriptorClient.update(engineDescriptor.type, { logExtractionState: { ...engineDescriptor.logExtractionState, paginationTimestamp: pagination?.timestampCursor, paginationId: pagination?.idCursor, - lastExecutionTimestamp: moment().utc().toISOString(), }, }); } - - // should never be larger than limit, just being safe } while (pagination); opts?.abortController?.signal.removeEventListener('abort', onAbort); @@ -282,6 +359,7 @@ export class LogsExtractionClient { count: totalCount, pages, indexPatterns, + lastSearchTimestamp: toDateISO, }; } @@ -343,7 +421,45 @@ export class LogsExtractionClient { return { success: false, error }; } - public async getIndexPatterns(additionalIndexPatterns: string[] = []): Promise { + /** + * Returns local and remote (CCS) index patterns separately. + * Main extraction uses local only (LOOKUP JOIN does not support CCS). + * CCS extraction uses remote only. 
+ */ + public async getLocalAndRemoteIndexPatterns( + additionalIndexPatterns: string[] = [] + ): Promise<{ localIndexPatterns: string[]; remoteIndexPatterns: string[] }> { + const all = await this.getAllIndexPatternsIncludingRemote(additionalIndexPatterns); + const alertsIndex = getAlertsIndexName(this.namespace); + const withoutAlerts = all.filter((index) => index !== alertsIndex); + const localIndexPatterns: string[] = []; + const remoteIndexPatterns: string[] = []; + + withoutAlerts.forEach((index) => { + if (isCCSRemoteIndexName(index)) { + remoteIndexPatterns.push(index); + } else { + localIndexPatterns.push(index); + } + }); + + return { localIndexPatterns, remoteIndexPatterns }; + } + + public async getLocalIndexPatterns(additionalIndexPatterns: string[] = []): Promise { + const { localIndexPatterns } = await this.getLocalAndRemoteIndexPatterns( + additionalIndexPatterns + ); + return localIndexPatterns; + } + + /** + * Builds the full list of index patterns (updates, additional, security data view) + * including CCS remote indices, without filtering by alerts or CCS. 
+ */ + private async getAllIndexPatternsIncludingRemote( + additionalIndexPatterns: string[] = [] + ): Promise { const updatesDataStream = getUpdatesEntitiesDataStreamName(this.namespace); const indexPatterns: string[] = [updatesDataStream, ...additionalIndexPatterns]; @@ -351,13 +467,8 @@ export class LogsExtractionClient { const secSolDataView = await this.dataViewsService.get( getSecuritySolutionDataViewName(this.namespace) ); - - const alertsIndex = getAlertsIndexName(this.namespace); - const cleanIndices = secSolDataView - .getIndexPattern() - .split(',') - .filter((index) => index !== alertsIndex && !isCCSRemoteIndexName(index)); - indexPatterns.push(...cleanIndices); + const secSolIndices = secSolDataView.getIndexPattern().split(','); + indexPatterns.push(...secSolIndices); } catch (error) { this.logger.warn( 'Problems finding security solution data view indices, defaulting to logs-*' diff --git a/x-pack/solutions/security/plugins/entity_store/server/infra/elasticsearch/esql.test.ts b/x-pack/solutions/security/plugins/entity_store/server/infra/elasticsearch/esql.test.ts new file mode 100644 index 0000000000000..86ae313c8b258 --- /dev/null +++ b/x-pack/solutions/security/plugins/entity_store/server/infra/elasticsearch/esql.test.ts @@ -0,0 +1,119 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. 
+ */ + +import type { ESQLSearchResponse } from '@kbn/es-types'; +import { esqlResponseToBulkObjects } from './esql'; + +describe('esqlResponseToBulkObjects', () => { + it('converts columnar ESQL response to bulk objects with type and doc', () => { + const response: ESQLSearchResponse = { + columns: [ + { name: '@timestamp', type: 'date' }, + { name: 'entity.id', type: 'keyword' }, + { name: 'entity.name', type: 'keyword' }, + ], + values: [ + ['2024-06-15T12:00:00.000Z', 'host:host-1', 'server-1'], + ['2024-06-15T12:00:00.000Z', 'host:host-2', 'server-2'], + ], + }; + + const result = esqlResponseToBulkObjects(response, 'host', []); + + expect(result).toHaveLength(2); + expect(result[0]).toEqual({ + type: 'host', + doc: { + '@timestamp': '2024-06-15T12:00:00.000Z', + 'entity.id': 'host:host-1', + 'entity.name': 'server-1', + }, + }); + expect(result[1]).toEqual({ + type: 'host', + doc: { + '@timestamp': '2024-06-15T12:00:00.000Z', + 'entity.id': 'host:host-2', + 'entity.name': 'server-2', + }, + }); + }); + + it('omits fields listed in fieldsToIgnore from each doc', () => { + const response: ESQLSearchResponse = { + columns: [ + { name: 'entity.EngineMetadata.PaginationFirstSeenLog', type: 'date' }, + { name: 'entity.id', type: 'keyword' }, + ], + values: [['2024-06-15T10:00:00.000Z', 'user:u1']], + }; + + const result = esqlResponseToBulkObjects(response, 'user', [ + 'entity.EngineMetadata.PaginationFirstSeenLog', + ]); + + expect(result).toHaveLength(1); + expect(result[0].doc).toEqual({ 'entity.id': 'user:u1' }); + expect(result[0].doc).not.toHaveProperty('entity.EngineMetadata.PaginationFirstSeenLog'); + }); + + it('skips null values in each row', () => { + const response: ESQLSearchResponse = { + columns: [ + { name: 'entity.id', type: 'keyword' }, + { name: 'entity.name', type: 'keyword' }, + ], + values: [['host:h1', null]], + }; + + const result = esqlResponseToBulkObjects(response, 'host', []); + + expect(result[0].doc).toEqual({ 'entity.id': 'host:h1' 
}); + expect(result[0].doc).not.toHaveProperty('entity.name'); + }); + + it('returns empty array when values is empty', () => { + const response: ESQLSearchResponse = { + columns: [{ name: 'entity.id', type: 'keyword' }], + values: [], + }; + + const result = esqlResponseToBulkObjects(response, 'host', []); + + expect(result).toEqual([]); + }); + + it('preserves dot-notation keys in doc', () => { + const response: ESQLSearchResponse = { + columns: [ + { name: 'entity.id', type: 'keyword' }, + { name: 'entity.name', type: 'keyword' }, + ], + values: [['user:u1', 'alice']], + }; + + const result = esqlResponseToBulkObjects(response, 'user', []); + + expect(result[0].doc).toEqual({ + 'entity.id': 'user:u1', + 'entity.name': 'alice', + }); + }); + + it('uses the provided entity type for every object', () => { + const response: ESQLSearchResponse = { + columns: [{ name: 'entity.id', type: 'keyword' }], + values: [['host:h1'], ['host:h2']], + }; + + const result = esqlResponseToBulkObjects(response, 'host', []); + + expect(result.every((o) => o.type === 'host')).toBe(true); + expect(result[0].type).toBe('host'); + expect(result[1].type).toBe('host'); + }); +}); diff --git a/x-pack/solutions/security/plugins/entity_store/server/infra/elasticsearch/esql.ts b/x-pack/solutions/security/plugins/entity_store/server/infra/elasticsearch/esql.ts index 03cb41877191e..0d0aceacd2c14 100644 --- a/x-pack/solutions/security/plugins/entity_store/server/infra/elasticsearch/esql.ts +++ b/x-pack/solutions/security/plugins/entity_store/server/infra/elasticsearch/esql.ts @@ -8,6 +8,8 @@ import type { TransportRequestOptions } from '@elastic/elasticsearch'; import type { ElasticsearchClient } from '@kbn/core/server'; import type { ESQLSearchResponse } from '@kbn/es-types'; +import type { EntityType } from '../../../common/domain/definitions/entity_schema'; +import type { Entity } from '../../../common/domain/definitions/entity.gen'; interface ExecuteEsqlQueryParams { esClient: 
ElasticsearchClient; @@ -36,3 +38,29 @@ export const executeEsqlQuery = async ({ return response; }; + +/** + * Converts columnar ESQL response to bulk objects for the CRUD client. + * Keeps flat dot-notation keys (e.g. entity.id); the CRUD API would flatten them later anyway. + */ +export const esqlResponseToBulkObjects = ( + esqlResponse: ESQLSearchResponse, + type: EntityType, + fieldsToIgnore: string[] +): Array<{ type: EntityType; doc: Entity }> => { + const { columns, values } = esqlResponse; + const objects: Array<{ type: EntityType; doc: Entity }> = []; + + for (const row of values) { + const doc: Record = {}; + for (let i = 0; i < row.length; i++) { + const key = columns[i].name; + if (fieldsToIgnore.includes(key) || row[i] === null) { + continue; + } + doc[key] = row[i]; + } + objects.push({ type, doc: doc as Entity }); + } + return objects; +}; diff --git a/x-pack/solutions/security/plugins/entity_store/server/request_context_factory.ts b/x-pack/solutions/security/plugins/entity_store/server/request_context_factory.ts index bd4d5b6a1937e..c98f54bdea91e 100644 --- a/x-pack/solutions/security/plugins/entity_store/server/request_context_factory.ts +++ b/x-pack/solutions/security/plugins/entity_store/server/request_context_factory.ts @@ -16,6 +16,7 @@ import type { import { AssetManager } from './domain/asset_manager'; import { FeatureFlags } from './infra/feature_flags'; import { EngineDescriptorClient } from './domain/definitions/saved_objects'; +import { CcsLogsExtractionClient } from './domain/ccs_logs_extraction_client'; import { LogsExtractionClient } from './domain/logs_extraction_client'; import { CRUDClient } from './domain/crud_client'; import type { TelemetryReporter } from './telemetry/events'; @@ -54,19 +55,21 @@ export async function createRequestHandlerContext({ logger ); + const esClient = core.elasticsearch.client.asCurrentUser; const crudClient = new CRUDClient({ logger, - esClient: core.elasticsearch.client.asCurrentUser, + esClient, 
namespace, }); - - const logsExtractionClient = new LogsExtractionClient( + const ccsLogsExtractionClient = new CcsLogsExtractionClient(logger, esClient, crudClient); + const logsExtractionClient = new LogsExtractionClient({ logger, namespace, - core.elasticsearch.client.asCurrentUser, + esClient, dataViewsService, - engineDescriptorClient - ); + engineDescriptorClient, + ccsLogsExtractionClient, + }); return { core, @@ -83,8 +86,10 @@ export async function createRequestHandlerContext({ analytics, }), crudClient, + ccsLogsExtractionClient, featureFlags: new FeatureFlags(core.uiSettings.client), logsExtractionClient, security: startPlugins.security, + namespace, }; } diff --git a/x-pack/solutions/security/plugins/entity_store/server/routes/apis/crud/upsert_bulk.ts b/x-pack/solutions/security/plugins/entity_store/server/routes/apis/crud/upsert_bulk.ts index b41cdaa3a1aec..308fbb9afb345 100644 --- a/x-pack/solutions/security/plugins/entity_store/server/routes/apis/crud/upsert_bulk.ts +++ b/x-pack/solutions/security/plugins/entity_store/server/routes/apis/crud/upsert_bulk.ts @@ -61,7 +61,10 @@ export function registerCRUDUpsertBulk(router: EntityStorePluginRouter) { } try { - const errors = await crudClient.upsertEntitiesBulk(req.body.entities, req.query.force); + const errors = await crudClient.upsertEntitiesBulk({ + objects: req.body.entities, + force: req.query.force, + }); return res.ok({ body: { ok: true, diff --git a/x-pack/solutions/security/plugins/entity_store/server/routes/apis/force_ccs_extract_to_updates.ts b/x-pack/solutions/security/plugins/entity_store/server/routes/apis/force_ccs_extract_to_updates.ts new file mode 100644 index 0000000000000..86955f20fdea2 --- /dev/null +++ b/x-pack/solutions/security/plugins/entity_store/server/routes/apis/force_ccs_extract_to_updates.ts @@ -0,0 +1,87 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. 
Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { buildRouteValidationWithZod } from '@kbn/zod-helpers'; +import { z } from '@kbn/zod'; +import type { IKibanaResponse } from '@kbn/core-http-server'; +import { API_VERSIONS, DEFAULT_ENTITY_STORE_PERMISSIONS } from '../constants'; +import type { EntityStorePluginRouter } from '../../types'; +import { wrapMiddlewares } from '../middleware'; +import { EntityType } from '../../../common/domain/definitions/entity_schema'; +import { getEntityDefinition } from '../../../common/domain/definitions/registry'; + +const DEFAULT_DOCS_LIMIT = 10000; + +const paramsSchema = z.object({ + entityType: EntityType, +}); + +const bodySchema = z.object({ + indexPatterns: z.array(z.string()).min(1), + fromDateISO: z.string().datetime(), + toDateISO: z.string().datetime(), + docsLimit: z.number().int().positive().optional(), +}); + +export function registerForceCcsExtractToUpdates(router: EntityStorePluginRouter) { + router.versioned + .post({ + path: '/internal/security/entity_store/{entityType}/force_ccs_extract_to_updates', + access: 'internal', + security: { + authz: DEFAULT_ENTITY_STORE_PERMISSIONS, + }, + enableQueryVersion: true, + }) + .addVersion( + { + version: API_VERSIONS.internal.v2, + validate: { + request: { + params: buildRouteValidationWithZod(paramsSchema), + body: buildRouteValidationWithZod(bodySchema), + }, + }, + }, + wrapMiddlewares(async (ctx, req, res): Promise => { + const entityStoreCtx = await ctx.entityStore; + const { logger: baseLogger, ccsLogsExtractionClient, namespace } = entityStoreCtx; + const { entityType } = req.params; + const { indexPatterns, fromDateISO, toDateISO, docsLimit } = req.body; + + const logger = baseLogger.get('forceCcsExtractToUpdates').get(entityType); + logger.debug( + `Force CCS extract to updates API called for entity type: ${entityType}, index patterns: ${indexPatterns.join( + ', ' + )}` + ); 
+ + const entityDefinition = getEntityDefinition(entityType, namespace); + const result = await ccsLogsExtractionClient.extractToUpdates({ + type: entityType, + remoteIndexPatterns: indexPatterns, + fromDateISO, + toDateISO, + docsLimit: docsLimit ?? DEFAULT_DOCS_LIMIT, + entityDefinition, + }); + + if (result.error) { + return res.customError({ + statusCode: 500, + body: { + message: result.error.message, + }, + }); + } + + return res.ok({ + body: { count: result.count, pages: result.pages }, + }); + }) + ); +} diff --git a/x-pack/solutions/security/plugins/entity_store/server/routes/apis/index.ts b/x-pack/solutions/security/plugins/entity_store/server/routes/apis/index.ts index 792bde4084fe7..1347ac16ccfa9 100644 --- a/x-pack/solutions/security/plugins/entity_store/server/routes/apis/index.ts +++ b/x-pack/solutions/security/plugins/entity_store/server/routes/apis/index.ts @@ -9,6 +9,7 @@ export { registerInstall } from './install'; export { registerStop } from './stop'; export { registerStatus } from './status'; export { registerForceLogExtraction } from './force_log_extraction'; +export { registerForceCcsExtractToUpdates } from './force_ccs_extract_to_updates'; export { registerUninstall } from './uninstall'; export { registerCRUDUpsert } from './crud/upsert'; export { registerCRUDUpsertBulk } from './crud/upsert_bulk'; diff --git a/x-pack/solutions/security/plugins/entity_store/server/routes/register_routes.ts b/x-pack/solutions/security/plugins/entity_store/server/routes/register_routes.ts index 640309c7a2f2d..001af14cc8326 100644 --- a/x-pack/solutions/security/plugins/entity_store/server/routes/register_routes.ts +++ b/x-pack/solutions/security/plugins/entity_store/server/routes/register_routes.ts @@ -12,6 +12,7 @@ import { registerStart, registerStatus, registerForceLogExtraction, + registerForceCcsExtractToUpdates, registerCRUDUpsert, registerCRUDUpsertBulk, registerCRUDDelete, @@ -24,6 +25,7 @@ export function registerRoutes(router: 
EntityStorePluginRouter) { registerStatus(router); registerUninstall(router); registerForceLogExtraction(router); + registerForceCcsExtractToUpdates(router); registerCRUDUpsert(router); registerCRUDUpsertBulk(router); registerCRUDDelete(router); diff --git a/x-pack/solutions/security/plugins/entity_store/server/tasks/factories.ts b/x-pack/solutions/security/plugins/entity_store/server/tasks/factories.ts index 6a639fe111772..08224a98e3075 100644 --- a/x-pack/solutions/security/plugins/entity_store/server/tasks/factories.ts +++ b/x-pack/solutions/security/plugins/entity_store/server/tasks/factories.ts @@ -9,6 +9,8 @@ import type { Logger } from '@kbn/logging'; import type { KibanaRequest } from '@kbn/core/server'; import type { EntityStoreCoreSetup } from '../types'; import { LogsExtractionClient } from '../domain/logs_extraction_client'; +import { CcsLogsExtractionClient } from '../domain/ccs_logs_extraction_client'; +import { CRUDClient } from '../domain/crud_client'; import { EngineDescriptorClient } from '../domain/definitions/saved_objects'; export interface LogsExtractionClientFactoryResult { @@ -37,13 +39,22 @@ export async function createLogsExtractionClient({ internalUserClient ); - const logsExtractionClient = new LogsExtractionClient( + const esClient = clusterClient.asCurrentUser; + const crudClient = new CRUDClient({ logger, + esClient, namespace, - clusterClient.asCurrentUser, + }); + const ccsLogsExtractionClient = new CcsLogsExtractionClient(logger, esClient, crudClient); + + const logsExtractionClient = new LogsExtractionClient({ + logger, + namespace, + esClient, dataViewsService, - new EngineDescriptorClient(soClient, namespace, logger) - ); + engineDescriptorClient: new EngineDescriptorClient(soClient, namespace, logger), + ccsLogsExtractionClient, + }); return { logsExtractionClient, diff --git a/x-pack/solutions/security/plugins/entity_store/server/types.ts b/x-pack/solutions/security/plugins/entity_store/server/types.ts index 
af92af9d9245e..53384ee649d9a 100644 --- a/x-pack/solutions/security/plugins/entity_store/server/types.ts +++ b/x-pack/solutions/security/plugins/entity_store/server/types.ts @@ -25,6 +25,7 @@ import type { SpacesPluginSetup, SpacesPluginStart } from '@kbn/spaces-plugin/se import type { CoreSetup } from '@kbn/core/server'; import type { AssetManager } from './domain/asset_manager'; import type { FeatureFlags } from './infra/feature_flags'; +import type { CcsLogsExtractionClient } from './domain/ccs_logs_extraction_client'; import type { LogsExtractionClient } from './domain/logs_extraction_client'; import type { CRUDClient } from './domain/crud_client'; import type { RegisterEntityMaintainerConfig } from './tasks/entity_maintainer/types'; @@ -48,9 +49,11 @@ export interface EntityStoreApiRequestHandlerContext { logger: Logger; assetManager: AssetManager; crudClient: CRUDClient; + ccsLogsExtractionClient: CcsLogsExtractionClient; featureFlags: FeatureFlags; logsExtractionClient: LogsExtractionClient; security: SecurityPluginStart; + namespace: string; } export type EntityStoreRequestHandlerContext = CustomRequestHandlerContext<{ diff --git a/x-pack/solutions/security/plugins/entity_store/test/scout/api/fixtures/constants.ts b/x-pack/solutions/security/plugins/entity_store/test/scout/api/fixtures/constants.ts index f0e5078c8b1a9..5a76357ab0a97 100644 --- a/x-pack/solutions/security/plugins/entity_store/test/scout/api/fixtures/constants.ts +++ b/x-pack/solutions/security/plugins/entity_store/test/scout/api/fixtures/constants.ts @@ -24,6 +24,8 @@ export const ENTITY_STORE_ROUTES = { UNINSTALL: 'internal/security/entity_store/uninstall', FORCE_LOG_EXTRACTION: (entityType: string) => `internal/security/entity_store/${entityType}/force_log_extraction`, + FORCE_CCS_EXTRACT_TO_UPDATES: (entityType: string) => + `internal/security/entity_store/${entityType}/force_ccs_extract_to_updates`, CRUD_UPSERT: (entityType: string) => 
`internal/security/entity_store/entities/${entityType}`, CRUD_UPSERT_BULK: 'internal/security/entity_store/entities/bulk', CRUD_DELETE: 'internal/security/entity_store/entities/', diff --git a/x-pack/solutions/security/plugins/entity_store/test/scout/api/tests/ccs_logs_extraction.spec.ts b/x-pack/solutions/security/plugins/entity_store/test/scout/api/tests/ccs_logs_extraction.spec.ts new file mode 100644 index 0000000000000..8410bc0680cea --- /dev/null +++ b/x-pack/solutions/security/plugins/entity_store/test/scout/api/tests/ccs_logs_extraction.spec.ts @@ -0,0 +1,203 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { apiTest } from '@kbn/scout-security'; +import { expect } from '@kbn/scout-security/api'; +import type { EsClient } from '@kbn/scout-security'; +import { + COMMON_HEADERS, + ENTITY_STORE_ROUTES, + ENTITY_STORE_TAGS, + UPDATES_INDEX, +} from '../fixtures/constants'; +import { FF_ENABLE_ENTITY_STORE_V2 } from '../../../../common'; + +const CCS_TEST_HOST_LOGS_INDEX = 'ccs-test-host-logs'; +const FROM_DATE = '2026-02-25T10:00:00Z'; +const TO_DATE = '2026-02-25T12:00:00Z'; +const MAX_DATE_OF_UPDATES = '2026-02-25T12:10:01Z'; + +async function createCcsTestHostLogsIndex(esClient: EsClient) { + await esClient.indices.create({ + index: CCS_TEST_HOST_LOGS_INDEX, + mappings: { + properties: { + '@timestamp': { type: 'date' }, + host: { + properties: { + entity: { properties: { id: { type: 'keyword' } } }, + id: { type: 'keyword' }, + name: { type: 'keyword' }, + domain: { type: 'keyword' }, + hostname: { type: 'keyword' }, + architecture: { type: 'keyword' }, + }, + }, + }, + }, + }); +} + +async function ingestHostDoc( + esClient: EsClient, + doc: Record & { '@timestamp': string } +) { + await esClient.index({ + index: 
CCS_TEST_HOST_LOGS_INDEX,
    refresh: 'wait_for',
    body: doc,
  });
}

apiTest.describe(
  'Entity Store CCS logs extraction (test against local instance)',
  { tag: ENTITY_STORE_TAGS },
  () => {
    let defaultHeaders: Record<string, string>;

    // Install the entity store (with the V2 feature flag enabled) once for the
    // whole suite, authenticated as an interactive admin user.
    apiTest.beforeAll(async ({ samlAuth, apiClient, kbnClient }) => {
      const credentials = await samlAuth.asInteractiveUser('admin');
      defaultHeaders = {
        ...credentials.cookieHeader,
        ...COMMON_HEADERS,
      };

      await kbnClient.uiSettings.update({
        [FF_ENABLE_ENTITY_STORE_V2]: true,
      });

      await apiClient.post(ENTITY_STORE_ROUTES.INSTALL, {
        headers: defaultHeaders,
        responseType: 'json',
        body: {},
      });
    });

    // Clean up the fixture index (ignoring 404 if a test never created it)
    // and uninstall the entity store.
    apiTest.afterAll(async ({ apiClient, esClient }) => {
      await esClient.indices.delete(
        {
          index: CCS_TEST_HOST_LOGS_INDEX,
        },
        { ignore: [404] }
      );
      await apiClient.post(ENTITY_STORE_ROUTES.UNINSTALL, {
        headers: defaultHeaders,
        responseType: 'json',
        body: {},
      });
    });

    apiTest(
      'Should run CCS extraction and write aggregated host entities to updates index',
      async ({ apiClient, esClient }) => {
        await createCcsTestHostLogsIndex(esClient);

        // Entity A: host.entity.id — multiple docs for collect_values (host.architecture) and prefer_newest_value (entity.name from host.name)
        await ingestHostDoc(esClient, {
          '@timestamp': '2026-02-25T10:00:01Z',
          host: { entity: { id: 'host-entity-a' }, name: 'name-a1', architecture: 'x86_64' },
        });
        await ingestHostDoc(esClient, {
          '@timestamp': '2026-02-25T10:05:00Z',
          host: { entity: { id: 'host-entity-a' }, name: 'name-a2', architecture: 'aarch64' },
        });
        await ingestHostDoc(esClient, {
          '@timestamp': '2026-02-25T10:10:00Z',
          host: { entity: { id: 'host-entity-a' }, name: 'name-a3', architecture: 'arm64' },
        });

        // Entity B: host.id
        await ingestHostDoc(esClient, {
          '@timestamp': '2026-02-25T10:15:00Z',
          host: { id: 'host-id-b', name: 'server-b' },
        });

        // Entity C: host.name only (no domain)
        await ingestHostDoc(esClient, {
          '@timestamp': '2026-02-25T10:20:00Z',
          host: { name: 'server-c' },
        });

        // Entity D: host.name + host.domain
        await ingestHostDoc(esClient, {
          '@timestamp': '2026-02-25T10:25:00Z',
          host: { name: 'server-d', domain: 'example.com', hostname: 'server-d' },
        });

        // Trigger the extraction over the ingested window; expect one page
        // aggregating the six docs into four distinct host entities.
        const extractResponse = await apiClient.post(
          ENTITY_STORE_ROUTES.FORCE_CCS_EXTRACT_TO_UPDATES('host'),
          {
            headers: defaultHeaders,
            responseType: 'json',
            body: {
              indexPatterns: [CCS_TEST_HOST_LOGS_INDEX],
              fromDateISO: FROM_DATE,
              toDateISO: TO_DATE,
              docsLimit: 1000,
            },
          }
        );
        expect(extractResponse.statusCode).toBe(200);
        expect(extractResponse.body).toMatchObject({ count: 4, pages: 1 });

        await esClient.indices.refresh({ index: UPDATES_INDEX });

        // Fetch only documents written by this run (timestamped at/after TO_DATE).
        const searchResponse = await esClient.search({
          index: UPDATES_INDEX,
          size: 10,
          query: {
            range: {
              '@timestamp': {
                gte: TO_DATE,
                lt: MAX_DATE_OF_UPDATES,
              },
            },
          },
        });

        const hits = searchResponse.hits.hits as Array<{ _source: Record<string, unknown> }>;
        expect(hits).toHaveLength(4);

        // Key each update document by its untyped entity id for lookup below.
        const byId = Object.fromEntries(
          hits.map((h) => [
            (h._source as Record<string, unknown>)['entity.EngineMetadata.UntypedId'] as string,
            h._source,
          ])
        );

        // Entity A: prefer_newest_value (entity.name from host.name) = last value
        const entityA = byId['host:host-entity-a'] as Record<string, unknown>;
        expect(entityA).toBeDefined();
        expect(entityA['entity.name']).toBe('name-a3');

        // Entity A: collect_values (host.architecture) = multiple values, deduped
        const hostArchitectureA = entityA['host.architecture'];
        expect(Array.isArray(hostArchitectureA)).toBe(true);
        expect((hostArchitectureA as string[]).sort()).toStrictEqual([
          'aarch64',
          'arm64',
          'x86_64',
        ]);

        // Entity B
        const entityB = byId['host:host-id-b'] as Record<string, unknown>;
        expect(entityB).toBeDefined();
        expect(entityB['entity.name']).toBe('server-b');

        // Entity C
        const entityC = byId['host:server-c'] as Record<string, unknown>;
        expect(entityC).toBeDefined();
        expect(entityC['entity.name']).toBe('server-c');

        // Entity D
        const entityD = byId['host:server-d.example.com'] as Record<string, unknown>;
        expect(entityD).toBeDefined();
        expect(entityD['entity.name']).toBe('server-d');
      }
    );
  }
);