Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ export interface SearchServiceParams {

export interface SearchServiceFetchParams extends SearchServiceParams {
index: string;
includeFrozen?: boolean;
}

export interface SearchServiceValue {
Expand All @@ -50,5 +51,4 @@ export interface AsyncSearchProviderProgress {
loadedFieldCanditates: number;
loadedFieldValuePairs: number;
loadedHistograms: number;
getOverallProgress: () => number;
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,76 +5,45 @@
* 2.0.
*/

import { shuffle, range } from 'lodash';
import { range } from 'lodash';
import type { ElasticsearchClient } from 'src/core/server';
import type { ApmIndicesConfig } from '../../settings/apm_indices/get_apm_indices';
import { fetchTransactionDurationFieldCandidates } from './query_field_candidates';
import { fetchTransactionDurationFieldValuePairs } from './query_field_value_pairs';
import { fetchTransactionDurationPercentiles } from './query_percentiles';
import { fetchTransactionDurationCorrelation } from './query_correlation';
import { fetchTransactionDurationHistogramRangeSteps } from './query_histogram_range_steps';
import { fetchTransactionDurationRanges, HistogramItem } from './query_ranges';
import type {
AsyncSearchProviderProgress,
SearchServiceParams,
SearchServiceFetchParams,
SearchServiceValue,
} from '../../../../common/search_strategies/correlations/types';
import { computeExpectationsAndRanges } from './utils/aggregation_utils';
import { fetchTransactionDurationFractions } from './query_fractions';

const CORRELATION_THRESHOLD = 0.3;
const KS_TEST_THRESHOLD = 0.1;

const currentTimeAsString = () => new Date().toISOString();
import type { ApmIndicesConfig } from '../../settings/apm_indices/get_apm_indices';
import {
fetchTransactionDurationFieldCandidates,
fetchTransactionDurationFieldValuePairs,
fetchTransactionDurationFractions,
fetchTransactionDurationPercentiles,
fetchTransactionDurationHistograms,
fetchTransactionDurationHistogramRangeSteps,
fetchTransactionDurationRanges,
} from './queries';
import { computeExpectationsAndRanges } from './utils';
import { asyncSearchServiceLogProvider } from './async_search_service_log';
import { asyncSearchServiceStateProvider } from './async_search_service_state';

export const asyncSearchServiceProvider = (
esClient: ElasticsearchClient,
getApmIndices: () => Promise<ApmIndicesConfig>,
searchServiceParams: SearchServiceParams,
includeFrozen: boolean
) => {
let isCancelled = false;
let isRunning = true;
let error: Error;
let ccsWarning = false;
const log: string[] = [];
const logMessage = (message: string) =>
log.push(`${currentTimeAsString()}: ${message}`);

const progress: AsyncSearchProviderProgress = {
started: Date.now(),
loadedHistogramStepsize: 0,
loadedOverallHistogram: 0,
loadedFieldCanditates: 0,
loadedFieldValuePairs: 0,
loadedHistograms: 0,
getOverallProgress: () =>
progress.loadedHistogramStepsize * 0.025 +
progress.loadedOverallHistogram * 0.025 +
progress.loadedFieldCanditates * 0.025 +
progress.loadedFieldValuePairs * 0.025 +
progress.loadedHistograms * 0.9,
};
const { addLogMessage, getLogMessages } = asyncSearchServiceLogProvider();

const values: SearchServiceValue[] = [];
let overallHistogram: HistogramItem[] | undefined;
const state = asyncSearchServiceStateProvider();

let percentileThresholdValue: number;

const cancel = () => {
logMessage(`Service cancelled.`);
isCancelled = true;
};

const fetchCorrelations = async () => {
async function fetchCorrelations() {
let params: SearchServiceFetchParams | undefined;

try {
const indices = await getApmIndices();
params = {
...searchServiceParams,
index: indices['apm_oss.transactionIndices'],
includeFrozen,
};

// 95th percentile to be displayed as a marker in the log log chart
Expand All @@ -86,37 +55,40 @@ export const asyncSearchServiceProvider = (
params,
params.percentileThreshold ? [params.percentileThreshold] : undefined
);
percentileThresholdValue =
const percentileThresholdValue =
percentileThreshold[`${params.percentileThreshold}.0`];
state.setPercentileThresholdValue(percentileThresholdValue);

logMessage(
addLogMessage(
`Fetched ${params.percentileThreshold}th percentile value of ${percentileThresholdValue} based on ${totalDocs} documents.`
);

// finish early if we weren't able to identify the percentileThresholdValue.
if (percentileThresholdValue === undefined) {
logMessage(
addLogMessage(
`Abort service since percentileThresholdValue could not be determined.`
);
progress.loadedHistogramStepsize = 1;
progress.loadedOverallHistogram = 1;
progress.loadedFieldCanditates = 1;
progress.loadedFieldValuePairs = 1;
progress.loadedHistograms = 1;
isRunning = false;
state.setProgress({
loadedHistogramStepsize: 1,
loadedOverallHistogram: 1,
loadedFieldCanditates: 1,
loadedFieldValuePairs: 1,
loadedHistograms: 1,
});
state.setIsRunning(false);
return;
}

const histogramRangeSteps = await fetchTransactionDurationHistogramRangeSteps(
esClient,
params
);
progress.loadedHistogramStepsize = 1;
state.setProgress({ loadedHistogramStepsize: 1 });

logMessage(`Loaded histogram range steps.`);
addLogMessage(`Loaded histogram range steps.`);

if (isCancelled) {
isRunning = false;
if (state.getIsCancelled()) {
state.setIsRunning(false);
return;
}

Expand All @@ -125,13 +97,13 @@ export const asyncSearchServiceProvider = (
params,
histogramRangeSteps
);
progress.loadedOverallHistogram = 1;
overallHistogram = overallLogHistogramChartData;
state.setProgress({ loadedOverallHistogram: 1 });
state.setOverallHistogram(overallLogHistogramChartData);

logMessage(`Loaded overall histogram chart data.`);
addLogMessage(`Loaded overall histogram chart data.`);

if (isCancelled) {
isRunning = false;
if (state.getIsCancelled()) {
state.setIsRunning(false);
return;
}

Expand All @@ -142,10 +114,10 @@ export const asyncSearchServiceProvider = (
} = await fetchTransactionDurationPercentiles(esClient, params, percents);
const percentiles = Object.values(percentilesRecords);

logMessage(`Loaded percentiles.`);
addLogMessage(`Loaded percentiles.`);

if (isCancelled) {
isRunning = false;
if (state.getIsCancelled()) {
state.setIsRunning(false);
return;
}

Expand All @@ -154,21 +126,22 @@ export const asyncSearchServiceProvider = (
params
);

logMessage(`Identified ${fieldCandidates.length} fieldCandidates.`);
addLogMessage(`Identified ${fieldCandidates.length} fieldCandidates.`);

progress.loadedFieldCanditates = 1;
state.setProgress({ loadedFieldCanditates: 1 });

const fieldValuePairs = await fetchTransactionDurationFieldValuePairs(
esClient,
params,
fieldCandidates,
progress
state,
addLogMessage
);

logMessage(`Identified ${fieldValuePairs.length} fieldValuePairs.`);
addLogMessage(`Identified ${fieldValuePairs.length} fieldValuePairs.`);

if (isCancelled) {
isRunning = false;
if (state.getIsCancelled()) {
state.setIsRunning(false);
return;
}

Expand All @@ -181,114 +154,75 @@ export const asyncSearchServiceProvider = (
totalDocCount,
} = await fetchTransactionDurationFractions(esClient, params, ranges);

logMessage(`Loaded fractions and totalDocCount of ${totalDocCount}.`);

async function* fetchTransactionDurationHistograms() {
for (const item of shuffle(fieldValuePairs)) {
if (params === undefined || item === undefined || isCancelled) {
isRunning = false;
return;
}

// If one of the fields have an error
// We don't want to stop the whole process
try {
const {
correlation,
ksTest,
} = await fetchTransactionDurationCorrelation(
esClient,
params,
expectations,
ranges,
fractions,
totalDocCount,
item.field,
item.value
);

if (isCancelled) {
isRunning = false;
return;
}

if (
correlation !== null &&
correlation > CORRELATION_THRESHOLD &&
ksTest !== null &&
ksTest < KS_TEST_THRESHOLD
) {
const logHistogram = await fetchTransactionDurationRanges(
esClient,
params,
histogramRangeSteps,
item.field,
item.value
);
yield {
...item,
correlation,
ksTest,
histogram: logHistogram,
};
} else {
yield undefined;
}
} catch (e) {
// don't fail the whole process for individual correlation queries,
// just add the error to the internal log and check if we'd want to set the
// cross-cluster search compatibility warning to true.
logMessage(
`Failed to fetch correlation/kstest for '${item.field}/${item.value}'`
);
if (params?.index.includes(':')) {
ccsWarning = true;
}
yield undefined;
}
}
}
addLogMessage(`Loaded fractions and totalDocCount of ${totalDocCount}.`);

let loadedHistograms = 0;
for await (const item of fetchTransactionDurationHistograms()) {
for await (const item of fetchTransactionDurationHistograms(
esClient,
addLogMessage,
params,
state,
expectations,
ranges,
fractions,
histogramRangeSteps,
totalDocCount,
fieldValuePairs
)) {
if (item !== undefined) {
values.push(item);
state.addValue(item);
}
loadedHistograms++;
progress.loadedHistograms = loadedHistograms / fieldValuePairs.length;
state.setProgress({
loadedHistograms: loadedHistograms / fieldValuePairs.length,
});
}

logMessage(
`Identified ${values.length} significant correlations out of ${fieldValuePairs.length} field/value pairs.`
addLogMessage(
`Identified ${
state.getState().values.length
} significant correlations out of ${
fieldValuePairs.length
} field/value pairs.`
);
} catch (e) {
error = e;
state.setError(e);
}

if (error !== undefined && params?.index.includes(':')) {
ccsWarning = true;
if (state.getState().error !== undefined && params?.index.includes(':')) {
state.setCcsWarning(true);
}

isRunning = false;
};
state.setIsRunning(false);
}

fetchCorrelations();

return () => {
const sortedValues = values.sort((a, b) => b.correlation - a.correlation);
const {
ccsWarning,
error,
isRunning,
overallHistogram,
percentileThresholdValue,
progress,
} = state.getState();

return {
ccsWarning,
error,
log,
log: getLogMessages(),
isRunning,
loaded: Math.round(progress.getOverallProgress() * 100),
loaded: Math.round(state.getOverallProgress() * 100),
overallHistogram,
started: progress.started,
total: 100,
values: sortedValues,
values: state.getValuesSortedByCorrelation(),
percentileThresholdValue,
cancel,
cancel: () => {
addLogMessage(`Service cancelled.`);
state.setIsCancelled(true);
},
};
};
};
Loading