Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -436,7 +436,7 @@ export const sampleDocRiskScore = (riskScore?: unknown): SignalSourceHit => ({
sort: [],
});

export const sampleEmptyDocSearchResults = (): SignalSearchResponse => ({
export const sampleEmptyDocSearchResults = () => ({
took: 10,
timed_out: false,
_shards: {
Expand All @@ -446,7 +446,10 @@ export const sampleEmptyDocSearchResults = (): SignalSearchResponse => ({
skipped: 0,
},
hits: {
total: 0,
total: {
value: 0,
relation: 'eq' as const,
},
max_score: 100,
hits: [],
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ export const getEventList = async ({
eventListConfig,
indexFields,
sortOrder = 'desc',
}: EventsOptions): Promise<estypes.SearchResponse<EventDoc>> => {
}: EventsOptions): Promise<estypes.SearchResponse<EventDoc, unknown>> => {
const {
inputIndex,
ruleExecutionLogger,
Expand Down Expand Up @@ -53,14 +53,13 @@ export const getEventList = async ({
fields: indexFields,
});

const { searchResult } = await singleSearchAfter({
const searchRequest = buildEventsSearchQuery({
aggregations: undefined,
searchAfterSortIds: searchAfter,
index: inputIndex,
from: tuple.from.toISOString(),
to: tuple.to.toISOString(),
services,
ruleExecutionLogger,
pageSize: calculatedPerPage,
size: calculatedPerPage,
filter: queryFilter,
primaryTimestamp,
secondaryTimestamp,
Expand All @@ -70,6 +69,12 @@ export const getEventList = async ({
overrideBody: eventListConfig,
});

const { searchResult } = await singleSearchAfter({
searchRequest,
services,
ruleExecutionLogger,
});

ruleExecutionLogger.debug(`Retrieved events items of size: ${searchResult.hits.hits.length}`);
return searchResult;
};
Expand Down Expand Up @@ -98,6 +103,7 @@ export const getEventCount = async ({
fields: indexFields,
});
const eventSearchQueryBodyQuery = buildEventsSearchQuery({
aggregations: undefined,
index,
from: tuple.from.toISOString(),
to: tuple.to.toISOString(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ export interface SignalMatch {

export type GetDocumentListInterface = (params: {
searchAfter: estypes.SortResults | undefined;
}) => Promise<estypes.SearchResponse<EventDoc | ThreatListDoc>>;
}) => Promise<estypes.SearchResponse<EventDoc | ThreatListDoc, unknown>>;

export type CreateSignalInterface = (
params: EventItem[] | ThreatListItem[]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,6 @@ import type { SignalSource } from '../types';
import type { GenericBulkCreateResponse } from '../factories/bulk_create_factory';
import type { NewTermsFieldsLatest } from '../../../../../common/api/detection_engine/model/alerts';

export type RecentTermsAggResult = ESSearchResponse<
SignalSource,
{ aggregations: ReturnType<typeof buildRecentTermsAgg> }
>;

export type NewTermsAggResult = ESSearchResponse<
SignalSource,
{ aggregations: ReturnType<typeof buildNewTermsAgg> }
>;

export type CompositeDocFetchAggResult = ESSearchResponse<
SignalSource,
{ aggregations: ReturnType<typeof buildCompositeDocFetchAgg> }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,12 @@ import { SERVER_APP_ID } from '../../../../../common/constants';
import { NewTermsRuleParams } from '../../rule_schema';
import type { SecurityAlertType } from '../types';
import { singleSearchAfter } from '../utils/single_search_after';
import { buildEventsSearchQuery } from '../utils/build_events_query';
import { getFilter } from '../utils/get_filter';
import { wrapNewTermsAlerts } from './wrap_new_terms_alerts';
import { bulkCreateSuppressedNewTermsAlertsInMemory } from './bulk_create_suppressed_alerts_in_memory';
import type { EventsAndTerms } from './types';
import type {
RecentTermsAggResult,
DocFetchAggResult,
NewTermsAggResult,
CreateAlertsHook,
} from './build_new_terms_aggregation';
import type { CreateAlertsHook } from './build_new_terms_aggregation';
import type { NewTermsFieldsLatest } from '../../../../../common/api/detection_engine/model/alerts';
import {
buildRecentTermsAgg,
Expand Down Expand Up @@ -149,7 +145,7 @@ export const createNewTermsAlertType = (): SecurityAlertType<
alertSuppression: params.alertSuppression,
licensing,
});
let afterKey;
let afterKey: Record<string, string | number | null> | undefined;

const result = createSearchAfterReturnType();

Expand All @@ -170,12 +166,7 @@ export const createNewTermsAlertType = (): SecurityAlertType<
// PHASE 1: Fetch a page of terms using a composite aggregation. This will collect a page from
// all of the terms seen over the last rule interval. In the next phase we'll determine which
// ones are new.
const {
searchResult,
searchDuration,
searchErrors,
loggedRequests: firstPhaseLoggedRequests = [],
} = await singleSearchAfter({
const searchRequest = buildEventsSearchQuery({
aggregations: buildRecentTermsAgg({
fields: params.newTermsFields,
after: afterKey,
Expand All @@ -185,13 +176,22 @@ export const createNewTermsAlertType = (): SecurityAlertType<
// The time range for the initial composite aggregation is the rule interval, `from` and `to`
from: tuple.from.toISOString(),
to: tuple.to.toISOString(),
services,
ruleExecutionLogger,
filter: esFilter,
pageSize: 0,
size: 0,
primaryTimestamp,
secondaryTimestamp,
runtimeMappings,
});

const {
searchResult,
searchDuration,
searchErrors,
loggedRequests: firstPhaseLoggedRequests = [],
} = await singleSearchAfter({
searchRequest,
services,
ruleExecutionLogger,
loggedRequestsConfig: isLoggedRequestsEnabled
? {
type: 'findAllTerms',
Expand All @@ -203,8 +203,7 @@ export const createNewTermsAlertType = (): SecurityAlertType<
: undefined,
});
loggedRequests.push(...firstPhaseLoggedRequests);
const searchResultWithAggs = searchResult as RecentTermsAggResult;
if (!searchResultWithAggs.aggregations) {
if (!searchResult.aggregations) {
throw new Error('Aggregations were missing on recent terms search result');
}
logger.debug(`Time spent on composite agg: ${searchDuration}`);
Expand All @@ -214,10 +213,10 @@ export const createNewTermsAlertType = (): SecurityAlertType<

// If the aggregation returns no after_key it signals that we've paged through all results
// and the current page is empty so we can immediately break.
if (searchResultWithAggs.aggregations.new_terms.after_key == null) {
if (searchResult.aggregations.new_terms.after_key == null) {
break;
}
const bucketsForField = searchResultWithAggs.aggregations.new_terms.buckets;
const bucketsForField = searchResult.aggregations.new_terms.buckets;

const createAlertsHook: CreateAlertsHook = async (aggResult) => {
const eventsAndTerms: EventsAndTerms[] = (
Expand Down Expand Up @@ -311,12 +310,7 @@ export const createNewTermsAlertType = (): SecurityAlertType<
// The aggregation filters out buckets for terms that exist prior to `tuple.from`, so the buckets in the
// response correspond to each new term.
const includeValues = transformBucketsToValues(params.newTermsFields, bucketsForField);
const {
searchResult: pageSearchResult,
searchDuration: pageSearchDuration,
searchErrors: pageSearchErrors,
loggedRequests: pageSearchLoggedRequests = [],
} = await singleSearchAfter({
const pageSearchRequest = buildEventsSearchQuery({
aggregations: buildNewTermsAgg({
newValueWindowStart: tuple.from,
timestampField: aggregatableTimestampField,
Expand All @@ -330,12 +324,20 @@ export const createNewTermsAlertType = (): SecurityAlertType<
// in addition to the rule interval
from: parsedHistoryWindowSize.toISOString(),
to: tuple.to.toISOString(),
services,
ruleExecutionLogger,
filter: esFilter,
pageSize: 0,
size: 0,
primaryTimestamp,
secondaryTimestamp,
});
const {
searchResult: pageSearchResult,
searchDuration: pageSearchDuration,
searchErrors: pageSearchErrors,
loggedRequests: pageSearchLoggedRequests = [],
} = await singleSearchAfter({
searchRequest: pageSearchRequest,
services,
ruleExecutionLogger,
loggedRequestsConfig: isLoggedRequestsEnabled
? {
type: 'findNewTerms',
Expand All @@ -350,26 +352,20 @@ export const createNewTermsAlertType = (): SecurityAlertType<

logger.debug(`Time spent on phase 2 terms agg: ${pageSearchDuration}`);

const pageSearchResultWithAggs = pageSearchResult as NewTermsAggResult;
if (!pageSearchResultWithAggs.aggregations) {
if (!pageSearchResult.aggregations) {
throw new Error('Aggregations were missing on new terms search result');
}

// PHASE 3: For each term that is not in the history window, fetch the oldest document in
// the rule interval for that term. This is the first document to contain the new term, and will
// become the basis of the resulting alert.
// One document could become multiple alerts if the document contains an array with multiple new terms.
if (pageSearchResultWithAggs.aggregations.new_terms.buckets.length > 0) {
const actualNewTerms = pageSearchResultWithAggs.aggregations.new_terms.buckets.map(
if (pageSearchResult.aggregations.new_terms.buckets.length > 0) {
const actualNewTerms = pageSearchResult.aggregations.new_terms.buckets.map(
(bucket) => bucket.key
);

const {
searchResult: docFetchSearchResult,
searchDuration: docFetchSearchDuration,
searchErrors: docFetchSearchErrors,
loggedRequests: docFetchLoggedRequests = [],
} = await singleSearchAfter({
const docFetchSearchRequest = buildEventsSearchQuery({
aggregations: buildDocFetchAgg({
timestampField: aggregatableTimestampField,
field: params.newTermsFields[0],
Expand All @@ -381,12 +377,20 @@ export const createNewTermsAlertType = (): SecurityAlertType<
// For phase 3, we go back to aggregating only over the rule interval - excluding the history window
from: tuple.from.toISOString(),
to: tuple.to.toISOString(),
services,
ruleExecutionLogger,
filter: esFilter,
pageSize: 0,
size: 0,
primaryTimestamp,
secondaryTimestamp,
});
const {
searchResult: docFetchSearchResult,
searchDuration: docFetchSearchDuration,
searchErrors: docFetchSearchErrors,
loggedRequests: docFetchLoggedRequests = [],
} = await singleSearchAfter({
searchRequest: docFetchSearchRequest,
services,
ruleExecutionLogger,
loggedRequestsConfig: isLoggedRequestsEnabled
? {
type: 'findDocuments',
Expand All @@ -401,13 +405,11 @@ export const createNewTermsAlertType = (): SecurityAlertType<
result.errors.push(...docFetchSearchErrors);
loggedRequests.push(...docFetchLoggedRequests);

const docFetchResultWithAggs = docFetchSearchResult as DocFetchAggResult;

if (!docFetchResultWithAggs.aggregations) {
if (!docFetchSearchResult.aggregations) {
throw new Error('Aggregations were missing on document fetch search result');
}

const bulkCreateResult = await createAlertsHook(docFetchResultWithAggs);
const bulkCreateResult = await createAlertsHook(docFetchSearchResult);

if (bulkCreateResult.alertsWereTruncated) {
result.warningMessages.push(
Expand All @@ -420,7 +422,7 @@ export const createNewTermsAlertType = (): SecurityAlertType<
}
}

afterKey = searchResultWithAggs.aggregations.new_terms.after_key;
afterKey = searchResult.aggregations.new_terms.after_key;
}

scheduleNotificationResponseActionsService({
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import {
stringifyAfterKey,
} from '../utils/utils';
import type { GenericBulkCreateResponse } from '../utils/bulk_create_with_suppression';
import { buildEventsSearchQuery } from '../utils/build_events_query';

import type {
SecurityRuleServices,
Expand Down Expand Up @@ -135,12 +136,7 @@ const multiTermsCompositeNonRetryable = async ({
// PHASE 2: Take the page of results from Phase 1 and determine if each term exists in the history window.
// The aggregation filters out buckets for terms that exist prior to `tuple.from`, so the buckets in the
// response correspond to each new term.
const {
searchResult: pageSearchResult,
searchDuration: pageSearchDuration,
searchErrors: pageSearchErrors,
loggedRequests: pageSearchLoggedRequests = [],
} = await singleSearchAfter({
const searchRequest = buildEventsSearchQuery({
aggregations: buildCompositeNewTermsAgg({
newValueWindowStart: tuple.from,
timestampField: aggregatableTimestampField,
Expand All @@ -155,12 +151,20 @@ const multiTermsCompositeNonRetryable = async ({
// in addition to the rule interval
from: parsedHistoryWindowSize.toISOString(),
to: tuple.to.toISOString(),
services,
ruleExecutionLogger,
filter: esFilterForBatch,
pageSize: 0,
size: 0,
primaryTimestamp,
secondaryTimestamp,
});
const {
searchResult: pageSearchResult,
searchDuration: pageSearchDuration,
searchErrors: pageSearchErrors,
loggedRequests: pageSearchLoggedRequests = [],
} = await singleSearchAfter({
searchRequest,
services,
ruleExecutionLogger,
loggedRequestsConfig: isLoggedRequestsEnabled
? {
type: 'findNewTerms',
Expand All @@ -187,12 +191,7 @@ const multiTermsCompositeNonRetryable = async ({
// become the basis of the resulting alert.
// One document could become multiple alerts if the document contains an array with multiple new terms.
if (pageSearchResultWithAggs.aggregations.new_terms.buckets.length > 0) {
const {
searchResult: docFetchSearchResult,
searchDuration: docFetchSearchDuration,
searchErrors: docFetchSearchErrors,
loggedRequests: docFetchLoggedRequests = [],
} = await singleSearchAfter({
const searchRequestPhase3 = buildEventsSearchQuery({
aggregations: buildCompositeDocFetchAgg({
newValueWindowStart: tuple.from,
timestampField: aggregatableTimestampField,
Expand All @@ -205,12 +204,20 @@ const multiTermsCompositeNonRetryable = async ({
index: inputIndex,
from: parsedHistoryWindowSize.toISOString(),
to: tuple.to.toISOString(),
services,
ruleExecutionLogger,
filter: esFilterForBatch,
pageSize: 0,
size: 0,
primaryTimestamp,
secondaryTimestamp,
});
const {
searchResult: docFetchSearchResult,
searchDuration: docFetchSearchDuration,
searchErrors: docFetchSearchErrors,
loggedRequests: docFetchLoggedRequests = [],
} = await singleSearchAfter({
searchRequest: searchRequestPhase3,
services,
ruleExecutionLogger,
loggedRequestsConfig: isLoggedRequestsEnabled
? {
type: 'findDocuments',
Expand Down
Loading