Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import {
FilterCondition,
isAlwaysCondition,
Condition,
isFilterCondition,
isAndCondition,
Expand Down Expand Up @@ -62,6 +63,9 @@ export function conditionToQueryDsl(condition: Condition): any {
},
};
}
if (isAlwaysCondition(condition)) {
return { match_all: {} };
}
return {
match_none: {},
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,3 +42,7 @@ export type FlattenRecord = Record<PropertyKey, Primitive | Primitive[]>;
// Zod schema validating a flattened document: every value must be a primitive
// or an array of primitives (no nested objects).
export const flattenRecord: z.ZodType<FlattenRecord> = z.record(
z.union([primitive, z.array(primitive)])
);

// Schema/type pair for raw sample documents, which may still contain nested
// objects (alias of the recursive record schema/type).
export const sampleDocument = recursiveRecord;

export type SampleDocument = RecursiveRecord;
3 changes: 2 additions & 1 deletion x-pack/solutions/observability/plugins/streams/kibana.jsonc
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@
"usageCollection",
"licensing",
"taskManager",
"alerting"
"alerting",
"inference",
],
"optionalPlugins": [
"cloud",
Expand Down
12 changes: 4 additions & 8 deletions x-pack/solutions/observability/plugins/streams/server/plugin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -76,22 +76,18 @@ export class StreamsPlugin
}: {
request: KibanaRequest;
}): Promise<RouteHandlerScopedClients> => {
const [coreStart, assetClient] = await Promise.all([
core.getStartServices().then(([_coreStart]) => _coreStart),
const [[coreStart, pluginsStart], assetClient] = await Promise.all([
core.getStartServices(),
assetService.getClientWithRequest({ request }),
]);

const streamsClient = await streamsService.getClientWithRequest({ request, assetClient });

const scopedClusterClient = coreStart.elasticsearch.client.asScoped(request);
const soClient = coreStart.savedObjects.getScopedClient(request);
const inferenceClient = pluginsStart.inference.getClient({ request });

return {
scopedClusterClient,
soClient,
assetClient,
streamsClient,
};
return { scopedClusterClient, soClient, assetClient, streamsClient, inferenceClient };
},
},
core,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
*/

import {
RecursiveRecord,
SampleDocument,
conditionSchema,
conditionToQueryDsl,
getFields,
Expand Down Expand Up @@ -165,7 +165,7 @@ export const sampleStreamRoute = createServerRoute({
...searchBody,
});

return { documents: results.hits.hits.map((hit) => hit._source) as RecursiveRecord[] };
return { documents: results.hits.hits.map((hit) => hit._source) as SampleDocument[] };
},
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
*/

import {
FlattenRecord,
flattenRecord,
namedFieldDefinitionConfigSchema,
processorWithIdDefinitionSchema,
Expand All @@ -15,6 +16,7 @@ import { checkAccess } from '../../../lib/streams/stream_crud';
import { createServerRoute } from '../../create_server_route';
import { DefinitionNotFoundError } from '../../../lib/streams/errors/definition_not_found_error';
import { ProcessingSimulationParams, simulateProcessing } from './simulation_handler';
import { handleProcessingSuggestion } from './suggestions_handler';

const paramsSchema = z.object({
path: z.object({ name: z.string() }),
Expand Down Expand Up @@ -50,6 +52,51 @@ export const simulateProcessorRoute = createServerRoute({
},
});

/**
 * Request body for the processing-suggestions endpoint.
 */
export interface ProcessingSuggestionBody {
  /** Name of the document field (e.g. `message`) to suggest Grok patterns for. */
  field: string;
  /** ID of the inference connector used to talk to the LLM. */
  connectorId: string;
  /** Flattened sample documents the suggestions are based on. */
  samples: FlattenRecord[];
}

// Runtime (zod) counterpart of `ProcessingSuggestionBody` — keep the two in sync.
const processingSuggestionSchema = z.object({
  field: z.string(),
  connectorId: z.string(),
  samples: z.array(flattenRecord),
});

// Route params: stream name in the path, suggestion request in the body.
const suggestionsParamsSchema = z.object({
  path: z.object({ name: z.string() }),
  body: processingSuggestionSchema,
});

/**
 * Internal route that returns LLM-generated Grok pattern suggestions for a
 * field of the given stream. Authorization is intentionally disabled here:
 * security is enforced by the scoped Elasticsearch clients acting as the
 * logged-in user.
 */
export const processingSuggestionRoute = createServerRoute({
  endpoint: 'POST /api/streams/{name}/processing/_suggestions',
  options: {
    access: 'internal',
  },
  security: {
    authz: {
      enabled: false,
      reason:
        'This API delegates security to the currently logged in user and their Elasticsearch permissions.',
    },
  },
  params: suggestionsParamsSchema,
  // Note: only the clients actually needed are destructured (the previous
  // version also pulled in an unused `logger`).
  handler: async ({ params, request, getScopedClients }) => {
    const { inferenceClient, scopedClusterClient, streamsClient } = await getScopedClients({
      request,
    });
    return handleProcessingSuggestion(
      params.path.name,
      params.body,
      inferenceClient,
      scopedClusterClient,
      streamsClient
    );
  },
});

// All processing-related routes, merged into one record for registration.
export const processingRoutes = {
  ...simulateProcessorRoute,
  ...processingSuggestionRoute,
};
Original file line number Diff line number Diff line change
@@ -0,0 +1,217 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

import { IScopedClusterClient } from '@kbn/core/server';
import { get, groupBy, mapValues, orderBy, shuffle, uniq, uniqBy } from 'lodash';
import { InferenceClient } from '@kbn/inference-plugin/server';
import { FlattenRecord } from '@kbn/streams-schema';
import { StreamsClient } from '../../../lib/streams/client';
import { simulateProcessing } from './simulation_handler';
import { ProcessingSuggestionBody } from './route';

/**
 * Generates Grok-pattern suggestions for `body.field` of the given stream.
 *
 * Groups the sample values by a coarse structural "pattern", asks the LLM for
 * parsing rules for the most common groups, validates each suggested rule via
 * pipeline simulation, and returns the deduplicated suggestions that actually
 * matched.
 */
export const handleProcessingSuggestion = async (
  name: string,
  body: ProcessingSuggestionBody,
  inferenceClient: InferenceClient,
  scopedClusterClient: IScopedClusterClient,
  streamsClient: StreamsClient
) => {
  const { field, samples } = body;
  // Turn sample messages into coarse structural patterns to group by.
  const evalPattern = (sample: string) => {
    return sample
      .replace(/[ \t\n]+/g, ' ') // collapse whitespace runs
      .replace(/[A-Za-z]+/g, 'a') // letter runs -> 'a'
      .replace(/[0-9]+/g, '0') // digit runs -> '0'
      .replace(/(a a)+/g, 'a') // collapse repeated words
      .replace(/(a0)+/g, 'f') // word+number fragments -> 'f'
      .replace(/(f:)+/g, 'f:') // collapse repeated key-like fragments
      .replace(/0(.0)+/g, 'p'); // dotted number groups (IPs, versions, ...) -> 'p'
  };

  // How many of the most common patterns to send to the LLM, and how many
  // example values to include for each of them.
  const NUMBER_PATTERN_CATEGORIES = 5;
  const NUMBER_SAMPLES_PER_PATTERN = 8;

  // NOTE(review): assumes the sampled field value is a string — confirm that
  // non-string values cannot reach this point.
  const samplesWithPatterns = samples.map((sample) => {
    const pattern = evalPattern(get(sample, field) as string);
    return {
      document: sample,
      fullPattern: pattern,
      truncatedPattern: pattern.slice(0, 10),
      fieldValue: get(sample, field) as string,
    };
  });

  // Group samples by their truncated patterns.
  const groupedByTruncatedPattern = groupBy(samplesWithPatterns, 'truncatedPattern');
  // Process each group into a summary: its size plus a few example values.
  const patternSummaries = mapValues(
    groupedByTruncatedPattern,
    (samplesForTruncatedPattern, truncatedPattern) => {
      const uniqueValues = uniq(samplesForTruncatedPattern.map(({ fieldValue }) => fieldValue));
      // Shuffle so the forwarded examples are not biased towards the first hits.
      const shuffledExamples = shuffle(uniqueValues);

      return {
        truncatedPattern,
        count: samplesForTruncatedPattern.length,
        exampleValues: shuffledExamples.slice(0, NUMBER_SAMPLES_PER_PATTERN),
      };
    }
  );
  // Convert to array, sort by count, and take the top patterns only.
  const patternsToProcess = orderBy(Object.values(patternSummaries), 'count', 'desc').slice(
    0,
    NUMBER_PATTERN_CATEGORIES
  );

  const results = await Promise.all(
    patternsToProcess.map((sample) =>
      processPattern(
        sample,
        name,
        body,
        inferenceClient,
        scopedClusterClient,
        streamsClient,
        field,
        samples
      )
    )
  );

  // Different groups may yield the same Grok pattern — deduplicate by pattern.
  // Nulls (failed simulations) are dropped with an explicit type-predicate
  // filter instead of non-null assertions.
  const deduplicatedSimulations = uniqBy(
    results
      .flatMap((result) => result.simulations)
      .filter((simulation): simulation is SimulationWithPattern => simulation != null),
    (simulation) => simulation.pattern
  );

  return {
    patterns: deduplicatedSimulations.map((simulation) => simulation.pattern),
    simulations: deduplicatedSimulations,
  };
};

/**
 * A resolved simulation result tagged with the Grok pattern that produced it.
 * `simulateProcessing` is async, so `Awaited<…>` is required — plain
 * `ReturnType<…>` would be the `Promise`, not the stored value.
 */
type SimulationWithPattern = Awaited<ReturnType<typeof simulateProcessing>> & { pattern: string };

/**
 * Asks the LLM for Grok parsing rules covering one group of example values and
 * validates each suggested rule by simulating it against all samples.
 * Rules that are non-additive or that match nothing are dropped (null).
 */
async function processPattern(
  sample: { truncatedPattern: string; count: number; exampleValues: string[] },
  name: string,
  body: ProcessingSuggestionBody,
  inferenceClient: InferenceClient,
  scopedClusterClient: IScopedClusterClient,
  streamsClient: StreamsClient,
  field: string,
  samples: FlattenRecord[]
) {
  const chatResponse = await inferenceClient.output({
    id: 'get_pattern_suggestions',
    connectorId: body.connectorId,
    // necessary due to a bug in the inference client - TODO remove when fixed
    functionCalling: 'native',
    // NOTE(review): the prompt shows `%{{pattern:name:type}}` (doubled braces)
    // — looks like a templating leftover; Grok syntax is `%{SYNTAX:name:type}`.
    // Confirm before changing the prompt text.
    system: `Instructions:
- You are an assistant for observability tasks with a strong knowledge of logs and log parsing.
- Use JSON format.
- For a single log source identified, provide the following information:
* Use 'source_name' as the key for the log source name.
* Use 'parsing_rule' as the key for the parsing rule.
- Use only Grok patterns for the parsing rule.
* Use %{{pattern:name:type}} syntax for Grok patterns when possible.
* Combine date and time into a single @timestamp field when it's possible.
- Use ECS (Elastic Common Schema) fields whenever possible.
- You are correct, factual, precise, and reliable.
`,
    schema: {
      type: 'object',
      required: ['rules'],
      properties: {
        rules: {
          type: 'array',
          items: {
            type: 'object',
            required: ['parsing_rule'],
            properties: {
              source_name: {
                type: 'string',
              },
              parsing_rule: {
                type: 'string',
              },
            },
          },
        },
      },
    } as const,
    input: `Logs:
${sample.exampleValues.join('\n')}
Given the raw messages coming from one data source, help us do the following:
1. Name the log source based on logs format.
2. Write a parsing rule for Elastic ingest pipeline to extract structured fields from the raw message.
Make sure that the parsing rule is unique per log source. When in doubt, suggest multiple patterns, one generic one matching the general case and more specific ones.
`,
  });

  // `rules` is not required to be present in the structured output — fall back
  // to an empty list so the chained `.map`/`.filter` cannot throw when the
  // model returns no rules (the previous cast-based chain crashed here).
  const patterns = (chatResponse.output.rules ?? [])
    .map((rule) => rule.parsing_rule)
    .filter((rule): rule is string => Boolean(rule))
    .map(sanitizePattern);

  const simulations = (
    await Promise.all(
      patterns.map(async (pattern) => {
        // Validate the suggested pattern by simulating a grok processor over
        // all provided samples.
        const simulationResult = await simulateProcessing({
          params: {
            path: { name },
            body: {
              processing: [
                {
                  id: 'grok-processor',
                  grok: {
                    field,
                    if: { always: {} },
                    patterns: [pattern],
                  },
                },
              ],
              documents: samples,
            },
          },
          scopedClusterClient,
          streamsClient,
        });

        // Discard suggestions that would overwrite existing fields.
        if (simulationResult.is_non_additive_simulation) {
          return null;
        }

        // Discard suggestions that match none of the samples.
        if (simulationResult.success_rate === 0) {
          return null;
        }

        // TODO if success rate is zero, try to strip out the date part and try again

        return {
          ...simulationResult,
          pattern,
        };
      })
    )
  ).filter(Boolean) as SimulationWithPattern[];

  return {
    chatResponse,
    simulations,
  };
}

/**
 * We need to keep parsing additive, but LLM-suggested patterns commonly
 * capture into `message` or `@timestamp`, overwriting the original fields.
 * As a workaround — until a proper solution for such cases exists — rename
 * those capture targets to `*_derived`.
 *
 * Handles both plain (`%{SYNTAX:field}`) and typed (`%{SYNTAX:field:type}`)
 * Grok capture syntax; the optional type suffix is preserved.
 */
function sanitizePattern(pattern: string): string {
  return pattern
    .replace(/%\{([^}]+):message(:[^}]+)?\}/g, '%{$1:message_derived$2}')
    .replace(/%\{([^}]+):@timestamp(:[^}]+)?\}/g, '%{$1:@timestamp_derived$2}');
}
Loading