Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -436,7 +436,7 @@ export const sampleDocRiskScore = (riskScore?: unknown): SignalSourceHit => ({
sort: [],
});

export const sampleEmptyDocSearchResults = (): SignalSearchResponse => ({
export const sampleEmptyDocSearchResults = () => ({
took: 10,
timed_out: false,
_shards: {
Expand All @@ -446,7 +446,10 @@ export const sampleEmptyDocSearchResults = (): SignalSearchResponse => ({
skipped: 0,
},
hits: {
total: 0,
total: {
value: 0,
relation: 'eq' as const,
},
max_score: 100,
hits: [],
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ export const getEventList = async ({
eventListConfig,
indexFields,
sortOrder = 'desc',
}: EventsOptions): Promise<estypes.SearchResponse<EventDoc>> => {
}: EventsOptions): Promise<estypes.SearchResponse<EventDoc, unknown>> => {
const {
inputIndex,
ruleExecutionLogger,
Expand Down Expand Up @@ -53,14 +53,13 @@ export const getEventList = async ({
fields: indexFields,
});

const { searchResult } = await singleSearchAfter({
const searchRequest = buildEventsSearchQuery({
aggregations: undefined,
searchAfterSortIds: searchAfter,
index: inputIndex,
from: tuple.from.toISOString(),
to: tuple.to.toISOString(),
services,
ruleExecutionLogger,
pageSize: calculatedPerPage,
size: calculatedPerPage,
filter: queryFilter,
primaryTimestamp,
secondaryTimestamp,
Expand All @@ -70,6 +69,12 @@ export const getEventList = async ({
overrideBody: eventListConfig,
});

const { searchResult } = await singleSearchAfter({
searchRequest,
services,
ruleExecutionLogger,
});

ruleExecutionLogger.debug(`Retrieved events items of size: ${searchResult.hits.hits.length}`);
return searchResult;
};
Expand Down Expand Up @@ -98,6 +103,7 @@ export const getEventCount = async ({
fields: indexFields,
});
const eventSearchQueryBodyQuery = buildEventsSearchQuery({
aggregations: undefined,
index,
from: tuple.from.toISOString(),
to: tuple.to.toISOString(),
Expand All @@ -107,7 +113,7 @@ export const getEventCount = async ({
secondaryTimestamp,
searchAfterSortIds: undefined,
runtimeMappings: undefined,
}).body?.query;
}).query;
const response = await esClient.count({
body: { query: eventSearchQueryBodyQuery },
ignore_unavailable: true,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ export interface SignalMatch {

export type GetDocumentListInterface = (params: {
searchAfter: estypes.SortResults | undefined;
}) => Promise<estypes.SearchResponse<EventDoc | ThreatListDoc>>;
}) => Promise<estypes.SearchResponse<EventDoc | ThreatListDoc, unknown>>;

export type CreateSignalInterface = (
params: EventItem[] | ThreatListItem[]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
* 2.0.
*/

import { set } from '@kbn/safer-lodash-set';
import dateMath from '@kbn/datemath';
import type { KibanaRequest, SavedObjectsClientContract } from '@kbn/core/server';
import type { MlPluginSetup } from '@kbn/ml-plugin/server';
Expand Down Expand Up @@ -54,9 +55,9 @@ export const findMlSignals = async ({

if (isLoggedRequestsEnabled) {
const searchQuery = buildAnomalyQuery(params);
searchQuery.index = '.ml-anomalies-*';
set(searchQuery, 'body.index', '.ml-anomalies-*');
loggedRequests.push({
request: logSearchRequest(searchQuery),
request: logSearchRequest(searchQuery.body),
description: i18n.ML_SEARCH_ANOMALIES_DESCRIPTION,
duration: anomalyResults.took,
request_type: 'findAnomalies',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,6 @@ import type { SignalSource } from '../types';
import type { GenericBulkCreateResponse } from '../factories/bulk_create_factory';
import type { NewTermsFieldsLatest } from '../../../../../common/api/detection_engine/model/alerts';

export type RecentTermsAggResult = ESSearchResponse<
SignalSource,
{ body: { aggregations: ReturnType<typeof buildRecentTermsAgg> } }
>;

export type NewTermsAggResult = ESSearchResponse<
SignalSource,
{ body: { aggregations: ReturnType<typeof buildNewTermsAgg> } }
>;

export type CompositeDocFetchAggResult = ESSearchResponse<
SignalSource,
{ body: { aggregations: ReturnType<typeof buildCompositeDocFetchAgg> } }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,12 @@ import { SERVER_APP_ID } from '../../../../../common/constants';
import { NewTermsRuleParams } from '../../rule_schema';
import type { SecurityAlertType } from '../types';
import { singleSearchAfter } from '../utils/single_search_after';
import { buildEventsSearchQuery } from '../utils/build_events_query';
import { getFilter } from '../utils/get_filter';
import { wrapNewTermsAlerts } from './wrap_new_terms_alerts';
import { bulkCreateSuppressedNewTermsAlertsInMemory } from './bulk_create_suppressed_alerts_in_memory';
import type { EventsAndTerms } from './types';
import type {
RecentTermsAggResult,
DocFetchAggResult,
NewTermsAggResult,
CreateAlertsHook,
} from './build_new_terms_aggregation';
import type { CreateAlertsHook } from './build_new_terms_aggregation';
import type { NewTermsFieldsLatest } from '../../../../../common/api/detection_engine/model/alerts';
import {
buildRecentTermsAgg,
Expand Down Expand Up @@ -149,7 +145,7 @@ export const createNewTermsAlertType = (): SecurityAlertType<
alertSuppression: params.alertSuppression,
licensing,
});
let afterKey;
let afterKey: Record<string, string | number | null> | undefined;

const result = createSearchAfterReturnType();

Expand All @@ -170,12 +166,7 @@ export const createNewTermsAlertType = (): SecurityAlertType<
// PHASE 1: Fetch a page of terms using a composite aggregation. This will collect a page from
// all of the terms seen over the last rule interval. In the next phase we'll determine which
// ones are new.
const {
searchResult,
searchDuration,
searchErrors,
loggedRequests: firstPhaseLoggedRequests = [],
} = await singleSearchAfter({
const searchRequest = buildEventsSearchQuery({
aggregations: buildRecentTermsAgg({
fields: params.newTermsFields,
after: afterKey,
Expand All @@ -185,13 +176,22 @@ export const createNewTermsAlertType = (): SecurityAlertType<
// The time range for the initial composite aggregation is the rule interval, `from` and `to`
from: tuple.from.toISOString(),
to: tuple.to.toISOString(),
services,
ruleExecutionLogger,
filter: esFilter,
pageSize: 0,
size: 0,
primaryTimestamp,
secondaryTimestamp,
runtimeMappings,
});

const {
searchResult,
searchDuration,
searchErrors,
loggedRequests: firstPhaseLoggedRequests = [],
} = await singleSearchAfter({
searchRequest,
services,
ruleExecutionLogger,
loggedRequestsConfig: isLoggedRequestsEnabled
? {
type: 'findAllTerms',
Expand All @@ -203,8 +203,7 @@ export const createNewTermsAlertType = (): SecurityAlertType<
: undefined,
});
loggedRequests.push(...firstPhaseLoggedRequests);
const searchResultWithAggs = searchResult as RecentTermsAggResult;
if (!searchResultWithAggs.aggregations) {
if (!searchResult.aggregations) {
throw new Error('Aggregations were missing on recent terms search result');
}
logger.debug(`Time spent on composite agg: ${searchDuration}`);
Expand All @@ -214,10 +213,10 @@ export const createNewTermsAlertType = (): SecurityAlertType<

// If the aggregation returns no after_key it signals that we've paged through all results
// and the current page is empty so we can immediately break.
if (searchResultWithAggs.aggregations.new_terms.after_key == null) {
if (searchResult.aggregations.new_terms.after_key == null) {
break;
}
const bucketsForField = searchResultWithAggs.aggregations.new_terms.buckets;
const bucketsForField = searchResult.aggregations.new_terms.buckets;

const createAlertsHook: CreateAlertsHook = async (aggResult) => {
const eventsAndTerms: EventsAndTerms[] = (
Expand Down Expand Up @@ -311,12 +310,7 @@ export const createNewTermsAlertType = (): SecurityAlertType<
// The aggregation filters out buckets for terms that exist prior to `tuple.from`, so the buckets in the
// response correspond to each new term.
const includeValues = transformBucketsToValues(params.newTermsFields, bucketsForField);
const {
searchResult: pageSearchResult,
searchDuration: pageSearchDuration,
searchErrors: pageSearchErrors,
loggedRequests: pageSearchLoggedRequests = [],
} = await singleSearchAfter({
const pageSearchRequest = buildEventsSearchQuery({
aggregations: buildNewTermsAgg({
newValueWindowStart: tuple.from,
timestampField: aggregatableTimestampField,
Expand All @@ -330,12 +324,20 @@ export const createNewTermsAlertType = (): SecurityAlertType<
// in addition to the rule interval
from: parsedHistoryWindowSize.toISOString(),
to: tuple.to.toISOString(),
services,
ruleExecutionLogger,
filter: esFilter,
pageSize: 0,
size: 0,
primaryTimestamp,
secondaryTimestamp,
});
const {
searchResult: pageSearchResult,
searchDuration: pageSearchDuration,
searchErrors: pageSearchErrors,
loggedRequests: pageSearchLoggedRequests = [],
} = await singleSearchAfter({
searchRequest: pageSearchRequest,
services,
ruleExecutionLogger,
loggedRequestsConfig: isLoggedRequestsEnabled
? {
type: 'findNewTerms',
Expand All @@ -350,26 +352,20 @@ export const createNewTermsAlertType = (): SecurityAlertType<

logger.debug(`Time spent on phase 2 terms agg: ${pageSearchDuration}`);

const pageSearchResultWithAggs = pageSearchResult as NewTermsAggResult;
if (!pageSearchResultWithAggs.aggregations) {
if (!pageSearchResult.aggregations) {
throw new Error('Aggregations were missing on new terms search result');
}

// PHASE 3: For each term that is not in the history window, fetch the oldest document in
// the rule interval for that term. This is the first document to contain the new term, and will
// become the basis of the resulting alert.
// One document could become multiple alerts if the document contains an array with multiple new terms.
if (pageSearchResultWithAggs.aggregations.new_terms.buckets.length > 0) {
const actualNewTerms = pageSearchResultWithAggs.aggregations.new_terms.buckets.map(
if (pageSearchResult.aggregations.new_terms.buckets.length > 0) {
const actualNewTerms = pageSearchResult.aggregations.new_terms.buckets.map(
(bucket) => bucket.key
);

const {
searchResult: docFetchSearchResult,
searchDuration: docFetchSearchDuration,
searchErrors: docFetchSearchErrors,
loggedRequests: docFetchLoggedRequests = [],
} = await singleSearchAfter({
const docFetchSearchRequest = buildEventsSearchQuery({
aggregations: buildDocFetchAgg({
timestampField: aggregatableTimestampField,
field: params.newTermsFields[0],
Expand All @@ -381,12 +377,20 @@ export const createNewTermsAlertType = (): SecurityAlertType<
// For phase 3, we go back to aggregating only over the rule interval - excluding the history window
from: tuple.from.toISOString(),
to: tuple.to.toISOString(),
services,
ruleExecutionLogger,
filter: esFilter,
pageSize: 0,
size: 0,
primaryTimestamp,
secondaryTimestamp,
});
const {
searchResult: docFetchSearchResult,
searchDuration: docFetchSearchDuration,
searchErrors: docFetchSearchErrors,
loggedRequests: docFetchLoggedRequests = [],
} = await singleSearchAfter({
searchRequest: docFetchSearchRequest,
services,
ruleExecutionLogger,
loggedRequestsConfig: isLoggedRequestsEnabled
? {
type: 'findDocuments',
Expand All @@ -401,13 +405,11 @@ export const createNewTermsAlertType = (): SecurityAlertType<
result.errors.push(...docFetchSearchErrors);
loggedRequests.push(...docFetchLoggedRequests);

const docFetchResultWithAggs = docFetchSearchResult as DocFetchAggResult;

if (!docFetchResultWithAggs.aggregations) {
if (!docFetchSearchResult.aggregations) {
throw new Error('Aggregations were missing on document fetch search result');
}

const bulkCreateResult = await createAlertsHook(docFetchResultWithAggs);
const bulkCreateResult = await createAlertsHook(docFetchSearchResult);

if (bulkCreateResult.alertsWereTruncated) {
result.warningMessages.push(
Expand All @@ -420,7 +422,7 @@ export const createNewTermsAlertType = (): SecurityAlertType<
}
}

afterKey = searchResultWithAggs.aggregations.new_terms.after_key;
afterKey = searchResult.aggregations.new_terms.after_key;
}

scheduleNotificationResponseActionsService({
Expand Down
Loading