Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,14 @@
import { SavedObjectsClientContract } from '@kbn/core-saved-objects-api-server';
import { IScopedClusterClient, Logger } from '@kbn/core/server';
import { EntityV2 } from '@kbn/entities-schema';
import { without } from 'lodash';
import {
ReadSourceDefinitionOptions,
readSourceDefinitions,
storeSourceDefinition,
} from './definitions/source_definition';
import { readTypeDefinitions, storeTypeDefinition } from './definitions/type_definition';
import { getEntityInstancesQuery } from './queries';
import { mergeEntitiesList } from './queries/utils';
import { mergeEntitiesList, sortEntitiesList } from './queries/utils';
import {
EntitySourceDefinition,
EntityTypeDefinition,
Expand All @@ -25,6 +24,7 @@ import {
} from './types';
import { UnknownEntityType } from './errors/unknown_entity_type';
import { runESQLQuery } from './run_esql_query';
import { validateFields } from './validate_fields';

export class EntityClient {
constructor(
Expand Down Expand Up @@ -59,72 +59,63 @@ export class EntityClient {
sort,
limit,
}: SearchBySources) {
const entities = await Promise.all(
sources.map(async (source) => {
const mandatoryFields = [
...source.identity_fields,
...(source.timestamp_field ? [source.timestamp_field] : []),
...(source.display_name ? [source.display_name] : []),
];
const metaFields = [...metadataFields, ...source.metadata_fields];

// operations on an unmapped field result in a failing query so we verify
// field capabilities beforehand
const { fields } = await this.options.clusterClient.asCurrentUser.fieldCaps({
index: source.index_patterns,
fields: [...mandatoryFields, ...metaFields],
});

const sourceHasMandatoryFields = mandatoryFields.every((field) => !!fields[field]);
if (!sourceHasMandatoryFields) {
// we can't build entities without id fields so we ignore the source.
// TODO filters should likely behave similarly. we should also throw
const missingFields = mandatoryFields.filter((field) => !fields[field]);
this.options.logger.info(
`Ignoring source for type [${source.type_id}] with index_patterns [${
source.index_patterns
}] because some mandatory fields [${missingFields.join(', ')}] are not mapped`
);
return [];
}

// but metadata field not being available is fine
const availableMetadataFields = metaFields.filter((field) => fields[field]);
if (availableMetadataFields.length < metaFields.length) {
this.options.logger.info(
`Ignoring unmapped fields [${without(metaFields, ...availableMetadataFields).join(
', '
)}]`
);
}

const { query, filter } = getEntityInstancesQuery({
source: {
...source,
metadata_fields: availableMetadataFields,
filters: [...source.filters, ...filters],
},
start,
end,
sort,
limit,
});
this.options.logger.debug(
() => `Entity query: ${query}\nfilter: ${JSON.stringify(filter, null, 2)}`
);

const rawEntities = await runESQLQuery<EntityV2>('resolve entities', {
query,
filter,
esClient: this.options.clusterClient.asCurrentUser,
logger: this.options.logger,
});

return rawEntities;
})
).then((results) => results.flat());

return mergeEntitiesList(sources, entities).slice(0, limit);
const searches = sources.map(async (source) => {
const availableMetadataFields = await validateFields({
source,
metadataFields,
esClient: this.options.clusterClient.asCurrentUser,
logger: this.options.logger,
});

const { query, filter } = getEntityInstancesQuery({
source: {
...source,
metadata_fields: availableMetadataFields,
filters: [...source.filters, ...filters],
},
start,
end,
sort,
limit,
});
this.options.logger.debug(
() => `Entity query: ${query}\nfilter: ${JSON.stringify(filter, null, 2)}`
);

const rawEntities = await runESQLQuery<EntityV2>('resolve entities', {
query,
filter,
esClient: this.options.clusterClient.asCurrentUser,
logger: this.options.logger,
});

return rawEntities;
});

const results = await Promise.allSettled(searches);
const entities = (
results.filter((result) => result.status === 'fulfilled') as Array<
PromiseFulfilledResult<EntityV2[]>
>
).flatMap((result) => result.value);
const errors = (
results.filter((result) => result.status === 'rejected') as PromiseRejectedResult[]
).map((result) => result.reason.message);

if (sources.length === 1) {
return { entities, errors };
}

// we have to manually merge, sort and limit entities since we run
// independant queries for each source
return {
errors,
entities: sortEntitiesList({
sources,
sort,
entities: mergeEntitiesList({ entities, sources, metadataFields }),
}).slice(0, limit),
};
}

async storeTypeDefinition(type: EntityTypeDefinition) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ describe('getEntityInstancesQuery', () => {

expect(query).toEqual(
'FROM logs-*, metrics-* | ' +
'STATS host.name = VALUES(host.name::keyword), entity.last_seen_timestamp = MAX(custom_timestamp_field), service.id = MAX(service.id::keyword) BY service.name::keyword | ' +
'STATS host.name = TOP(host.name::keyword, 10, "asc"), entity.last_seen_timestamp = MAX(custom_timestamp_field), service.id = MAX(service.id::keyword) BY service.name::keyword | ' +
'RENAME `service.name::keyword` AS service.name | ' +
'EVAL entity.type = "service", entity.id = service.name, entity.display_name = COALESCE(service.id, entity.id) | ' +
'SORT entity.id DESC | ' +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
*/

import { fromKueryExpression, toElasticsearchQuery } from '@kbn/es-query';
import { asKeyword } from './utils';
import { asKeyword, defaultSort } from './utils';
import { EntitySourceDefinition, SortBy } from '../types';

const sourceCommand = ({ source }: { source: EntitySourceDefinition }) => {
Expand Down Expand Up @@ -46,7 +46,7 @@ const dslFilter = ({
const statsCommand = ({ source }: { source: EntitySourceDefinition }) => {
const aggs = source.metadata_fields
.filter((field) => !source.identity_fields.some((idField) => idField === field))
.map((field) => `${field} = VALUES(${asKeyword(field)})`);
.map((field) => `${field} = TOP(${asKeyword(field)}, 10, "asc")`);

if (source.timestamp_field) {
aggs.push(`entity.last_seen_timestamp = MAX(${source.timestamp_field})`);
Expand Down Expand Up @@ -84,15 +84,11 @@ const evalCommand = ({ source }: { source: EntitySourceDefinition }) => {
};

const sortCommand = ({ source, sort }: { source: EntitySourceDefinition; sort?: SortBy }) => {
if (sort) {
return `SORT ${sort.field} ${sort.direction}`;
if (!sort) {
sort = defaultSort([source]);
}

if (source.timestamp_field) {
return `SORT entity.last_seen_timestamp DESC`;
}

return `SORT entity.id ASC`;
return `SORT ${sort.field} ${sort.direction}`;
};

export function getEntityInstancesQuery({
Expand Down
Loading