diff --git a/x-pack/plugins/integration_assistant/__jest__/fixtures/ecs_mapping.ts b/x-pack/plugins/integration_assistant/__jest__/fixtures/ecs_mapping.ts index 112f9f2348dec..e0c2054a49e84 100644 --- a/x-pack/plugins/integration_assistant/__jest__/fixtures/ecs_mapping.ts +++ b/x-pack/plugins/integration_assistant/__jest__/fixtures/ecs_mapping.ts @@ -4,6 +4,7 @@ * 2.0; you may not use this file except in compliance with the Elastic License * 2.0. */ + export const ecsMappingExpectedResults = { mapping: { mysql_enterprise: { @@ -441,18 +442,21 @@ export const ecsTestState = { ecs: 'teststring', exAnswer: 'testanswer', finalized: false, + chunkSize: 30, currentPipeline: { test: 'testpipeline' }, duplicateFields: [], missingKeys: [], invalidEcsFields: [], + finalMapping: { test: 'testmapping' }, + sampleChunks: [''], results: { test: 'testresults' }, samplesFormat: 'testsamplesFormat', ecsVersion: 'testversion', currentMapping: { test1: 'test1' }, lastExecutedChain: 'testchain', rawSamples: ['{"test1": "test1"}'], - samples: ['{ "test1": "test1" }'], + prefixedSamples: ['{ "test1": "test1" }'], packageName: 'testpackage', dataStreamName: 'testDataStream', - formattedSamples: '{"test1": "test1"}', + combinedSamples: '{"test1": "test1"}', }; diff --git a/x-pack/plugins/integration_assistant/server/graphs/categorization/graph.ts b/x-pack/plugins/integration_assistant/server/graphs/categorization/graph.ts index fae09ecc32d87..ff170a23fdf7a 100644 --- a/x-pack/plugins/integration_assistant/server/graphs/categorization/graph.ts +++ b/x-pack/plugins/integration_assistant/server/graphs/categorization/graph.ts @@ -13,7 +13,7 @@ import type { ActionsClientSimpleChatModel, } from '@kbn/langchain/server/language_models'; import type { CategorizationState } from '../../types'; -import { modifySamples, formatSamples } from '../../util/samples'; +import { prefixSamples, formatSamples } from '../../util/samples'; import { handleCategorization } from './categorization'; import { handleValidatePipeline } from '../../util/graph'; import { handleCategorizationValidation } from './validate'; @@ -106,7 +106,7 @@ const graphState: StateGraphArgs['channels'] = { }; function modelInput(state: CategorizationState): Partial { - const samples = modifySamples(state); + const samples = prefixSamples(state); const formattedSamples = formatSamples(samples); const initialPipeline = JSON.parse(JSON.stringify(state.currentPipeline)); return { diff --git a/x-pack/plugins/integration_assistant/server/graphs/ecs/chunk.test.ts b/x-pack/plugins/integration_assistant/server/graphs/ecs/chunk.test.ts new file mode 100644 index 0000000000000..51aab586253a6 --- /dev/null +++ b/x-pack/plugins/integration_assistant/server/graphs/ecs/chunk.test.ts @@ -0,0 +1,17 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { mergeAndChunkSamples } from './chunk'; + +describe('test chunks', () => { + it('mergeAndChunkSamples()', async () => { + const objects = ['{"a": 1, "b": 2, "c": {"d": 3}}', '{"a": 2, "b": 3, "e": 4}']; + const chunkSize = 2; + const result = mergeAndChunkSamples(objects, chunkSize); + expect(result).toEqual(['{"a":1,"b":2}', '{"c":{"d":3},"e":4}']); + }); +}); diff --git a/x-pack/plugins/integration_assistant/server/graphs/ecs/chunk.ts b/x-pack/plugins/integration_assistant/server/graphs/ecs/chunk.ts new file mode 100644 index 0000000000000..62448776bdf8c --- /dev/null +++ b/x-pack/plugins/integration_assistant/server/graphs/ecs/chunk.ts @@ -0,0 +1,83 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ +/* eslint-disable @typescript-eslint/no-explicit-any */ + +import { merge } from '../../util/samples'; + +interface NestedObject { + [key: string]: any; +} + +// Takes an array of JSON strings and merges them into a single object. +// The resulting object will be a combined object that includes all unique fields from the input samples. +// While merging the samples, the function will prioritize non-empty values over empty values. +// The function then splits the combined object into chunks of a given size, to be used in the ECS mapping subgraph. +export function mergeAndChunkSamples(objects: string[], chunkSize: number): string[] { + let result: NestedObject = {}; + + for (const obj of objects) { + const sample: NestedObject = JSON.parse(obj); + result = merge(result, sample); + } + + const chunks = generateChunks(result, chunkSize); + + // Each chunk is used for the combinedSamples state when passed to the subgraph, which should be a nicely formatted string + return chunks.map((chunk) => JSON.stringify(chunk)); +} + +// This function takes the already merged array of samples, and splits it up into chunks of a given size. +// Size is determined by the count of fields with an actual value (not nested objects etc). +// This is to be able to run the ECS mapping sub graph concurrently with a larger number of total unique fields without getting confused. +function generateChunks(mergedSamples: NestedObject, chunkSize: number): NestedObject[] { + const chunks: NestedObject[] = []; + let currentChunk: NestedObject = {}; + let currentSize = 0; + + function traverse(current: NestedObject, path: string[] = []) { + for (const [key, value] of Object.entries(current)) { + const newPath = [...path, key]; + + // If the value is a nested object, recurse into it + if (typeof value === 'object' && value !== null && !Array.isArray(value)) { + traverse(value, newPath); + } else { + // For non-object values, add them to the current chunk + let target = currentChunk; + + // Recreate the nested structure in the current chunk + for (let i = 0; i < newPath.length - 1; i++) { + if (!(newPath[i] in target)) { + target[newPath[i]] = {}; + } + target = target[newPath[i]]; + } + + // Add the value to the deepest level of the structure + target[newPath[newPath.length - 1]] = value; + currentSize++; + + // If the chunk is full, add it to the chunks and start a new chunk + if (currentSize === chunkSize) { + chunks.push(currentChunk); + currentChunk = {}; + currentSize = 0; + } + } + } + } + + // Start the traversal from the root object + traverse(mergedSamples); + + // Add any remaining items in the last chunk + if (currentSize > 0) { + chunks.push(currentChunk); + } + + return chunks; +} diff --git a/x-pack/plugins/integration_assistant/server/graphs/ecs/duplicates.ts b/x-pack/plugins/integration_assistant/server/graphs/ecs/duplicates.ts index fd11a660e75ab..a9508901860db 100644 --- a/x-pack/plugins/integration_assistant/server/graphs/ecs/duplicates.ts +++ b/x-pack/plugins/integration_assistant/server/graphs/ecs/duplicates.ts @@ -16,9 +16,8 @@ export async function handleDuplicates( state: EcsMappingState, model: ActionsClientChatOpenAI | ActionsClientSimpleChatModel ) { - const ecsDuplicatesPrompt = ECS_DUPLICATES_PROMPT; const outputParser = new JsonOutputParser(); - const ecsDuplicatesGraph = ecsDuplicatesPrompt.pipe(model).pipe(outputParser); + const ecsDuplicatesGraph = ECS_DUPLICATES_PROMPT.pipe(model).pipe(outputParser); const currentMapping = await ecsDuplicatesGraph.invoke({ ecs: state.ecs, diff --git a/x-pack/plugins/integration_assistant/server/graphs/ecs/graph.test.ts b/x-pack/plugins/integration_assistant/server/graphs/ecs/graph.test.ts index 0ae626924c349..62e51e7d68c71 100644 --- a/x-pack/plugins/integration_assistant/server/graphs/ecs/graph.test.ts +++ b/x-pack/plugins/integration_assistant/server/graphs/ecs/graph.test.ts @@ -77,7 +77,6 @@ describe('EcsGraph', () => { it('Runs the whole graph, with mocked outputs from the LLM.', async () => { // The mocked outputs are specifically crafted to trigger ALL different conditions, allowing us to test the whole graph. // This is why we have all the expects ensuring each function was called. - const ecsGraph = await getEcsGraph(mockLlm); const response = await ecsGraph.invoke(mockedRequest); expect(response.results).toStrictEqual(ecsMappingExpectedResults); diff --git a/x-pack/plugins/integration_assistant/server/graphs/ecs/graph.ts b/x-pack/plugins/integration_assistant/server/graphs/ecs/graph.ts index 1d02f3c8970d8..4b1e4c4c37791 100644 --- a/x-pack/plugins/integration_assistant/server/graphs/ecs/graph.ts +++ b/x-pack/plugins/integration_assistant/server/graphs/ecs/graph.ts @@ -9,120 +9,32 @@ import type { ActionsClientChatOpenAI, ActionsClientSimpleChatModel, } from '@kbn/langchain/server/language_models'; -import type { StateGraphArgs } from '@langchain/langgraph'; -import { END, START, StateGraph } from '@langchain/langgraph'; +import { END, START, StateGraph, Send } from '@langchain/langgraph'; import type { EcsMappingState } from '../../types'; -import { mergeSamples, modifySamples } from '../../util/samples'; -import { ECS_EXAMPLE_ANSWER, ECS_FIELDS } from './constants'; +import { modelInput, modelOutput, modelSubOutput } from './model'; import { handleDuplicates } from './duplicates'; import { handleInvalidEcs } from './invalid'; import { handleEcsMapping } from './mapping'; import { handleMissingKeys } from './missing'; -import { createPipeline } from './pipeline'; import { handleValidateMappings } from './validate'; +import { graphState } from './state'; -const graphState: StateGraphArgs['channels'] = { - ecs: { - value: (x: string, y?: string) => y ?? x, - default: () => '', - }, - lastExecutedChain: { - value: (x: string, y?: string) => y ?? x, - default: () => '', - }, - rawSamples: { - value: (x: string[], y?: string[]) => y ?? x, - default: () => [], - }, - samples: { - value: (x: string[], y?: string[]) => y ?? x, - default: () => [], - }, - formattedSamples: { - value: (x: string, y?: string) => y ?? x, - default: () => '', - }, - exAnswer: { - value: (x: string, y?: string) => y ?? x, - default: () => '', - }, - packageName: { - value: (x: string, y?: string) => y ?? x, - default: () => '', - }, - dataStreamName: { - value: (x: string, y?: string) => y ?? x, - default: () => '', - }, - finalized: { - value: (x: boolean, y?: boolean) => y ?? x, - default: () => false, - }, - currentMapping: { - value: (x: object, y?: object) => y ?? x, - default: () => ({}), - }, - currentPipeline: { - value: (x: object, y?: object) => y ?? x, - default: () => ({}), - }, - duplicateFields: { - value: (x: string[], y?: string[]) => y ?? x, - default: () => [], - }, - missingKeys: { - value: (x: string[], y?: string[]) => y ?? x, - default: () => [], - }, - invalidEcsFields: { - value: (x: string[], y?: string[]) => y ?? x, - default: () => [], - }, - results: { - value: (x: object, y?: object) => y ?? x, - default: () => ({}), - }, - samplesFormat: { - value: (x: string, y?: string) => y ?? x, - default: () => 'json', - }, - ecsVersion: { - value: (x: string, y?: string) => y ?? x, - default: () => '8.11.0', - }, -}; - -function modelInput(state: EcsMappingState): Partial { - const samples = modifySamples(state); - const formattedSamples = mergeSamples(samples); - return { - exAnswer: JSON.stringify(ECS_EXAMPLE_ANSWER, null, 2), - ecs: JSON.stringify(ECS_FIELDS, null, 2), - samples, - finalized: false, - formattedSamples, - lastExecutedChain: 'modelInput', - }; -} - -function modelOutput(state: EcsMappingState): Partial { - const currentPipeline = createPipeline(state); - return { - finalized: true, - lastExecutedChain: 'modelOutput', - results: { - mapping: state.currentMapping, - pipeline: currentPipeline, - }, +const handleCreateMappingChunks = async (state: EcsMappingState) => { + // Cherrypick a shallow copy of state to pass to subgraph + const stateParams = { + exAnswer: state.exAnswer, + prefixedSamples: state.prefixedSamples, + ecs: state.ecs, + dataStreamName: state.dataStreamName, + packageName: state.packageName, }; -} - -function inputRouter(state: EcsMappingState): string { if (Object.keys(state.currentMapping).length === 0) { - return 'ecsMapping'; + return state.sampleChunks.map((chunk) => { + return new Send('subGraph', { ...stateParams, combinedSamples: chunk }); + }); } return 'modelOutput'; -} +}; function chainRouter(state: EcsMappingState): string { if (Object.keys(state.duplicateFields).length > 0) { @@ -135,38 +47,52 @@ function chainRouter(state: EcsMappingState): string { return 'invalidEcsFields'; } if (!state.finalized) { - return 'modelOutput'; + return 'modelSubOutput'; } return END; } -export async function getEcsGraph(model: ActionsClientChatOpenAI | ActionsClientSimpleChatModel) { +// This is added as a separate graph to be able to run these steps concurrently from handleCreateMappingChunks +async function getEcsSubGraph(model: ActionsClientChatOpenAI | ActionsClientSimpleChatModel) { const workflow = new StateGraph({ channels: graphState, }) - .addNode('modelInput', modelInput) - .addNode('modelOutput', modelOutput) - .addNode('handleEcsMapping', (state: EcsMappingState) => handleEcsMapping(state, model)) + .addNode('modelSubOutput', modelSubOutput) .addNode('handleValidation', handleValidateMappings) + .addNode('handleEcsMapping', (state: EcsMappingState) => handleEcsMapping(state, model)) .addNode('handleDuplicates', (state: EcsMappingState) => handleDuplicates(state, model)) .addNode('handleMissingKeys', (state: EcsMappingState) => handleMissingKeys(state, model)) .addNode('handleInvalidEcs', (state: EcsMappingState) => handleInvalidEcs(state, model)) - .addEdge(START, 'modelInput') - .addEdge('modelOutput', END) + .addEdge(START, 'handleEcsMapping') .addEdge('handleEcsMapping', 'handleValidation') .addEdge('handleDuplicates', 'handleValidation') .addEdge('handleMissingKeys', 'handleValidation') .addEdge('handleInvalidEcs', 'handleValidation') - .addConditionalEdges('modelInput', inputRouter, { - ecsMapping: 'handleEcsMapping', - modelOutput: 'modelOutput', - }) .addConditionalEdges('handleValidation', chainRouter, { duplicateFields: 'handleDuplicates', missingKeys: 'handleMissingKeys', invalidEcsFields: 'handleInvalidEcs', - modelOutput: 'modelOutput', - }); + modelSubOutput: 'modelSubOutput', + }) + .addEdge('modelSubOutput', END); + + const compiledEcsSubGraph = workflow.compile(); + + return compiledEcsSubGraph; +} + +export async function getEcsGraph(model: ActionsClientChatOpenAI | ActionsClientSimpleChatModel) { + const subGraph = await getEcsSubGraph(model); + const workflow = new StateGraph({ + channels: graphState, + }) + .addNode('modelInput', modelInput) + .addNode('modelOutput', modelOutput) + .addNode('subGraph', subGraph) + .addEdge(START, 'modelInput') + .addEdge('subGraph', 'modelOutput') + .addConditionalEdges('modelInput', handleCreateMappingChunks) + .addEdge('modelOutput', END); const compiledEcsGraph = workflow.compile(); diff --git a/x-pack/plugins/integration_assistant/server/graphs/ecs/invalid.ts b/x-pack/plugins/integration_assistant/server/graphs/ecs/invalid.ts index dcbba0ebe9d13..8e2d1baf4c423 100644 --- a/x-pack/plugins/integration_assistant/server/graphs/ecs/invalid.ts +++ b/x-pack/plugins/integration_assistant/server/graphs/ecs/invalid.ts @@ -16,15 +16,14 @@ export async function handleInvalidEcs( state: EcsMappingState, model: ActionsClientChatOpenAI | ActionsClientSimpleChatModel ) { - const ecsInvalidEcsPrompt = ECS_INVALID_PROMPT; const outputParser = new JsonOutputParser(); - const ecsInvalidEcsGraph = ecsInvalidEcsPrompt.pipe(model).pipe(outputParser); + const ecsInvalidEcsGraph = ECS_INVALID_PROMPT.pipe(model).pipe(outputParser); const currentMapping = await ecsInvalidEcsGraph.invoke({ ecs: state.ecs, current_mapping: JSON.stringify(state.currentMapping, null, 2), ex_answer: state.exAnswer, - formatted_samples: state.formattedSamples, + combined_samples: state.combinedSamples, invalid_ecs_fields: state.invalidEcsFields, }); diff --git a/x-pack/plugins/integration_assistant/server/graphs/ecs/mapping.ts b/x-pack/plugins/integration_assistant/server/graphs/ecs/mapping.ts index 7ecb108659f45..30c51dcc01dd9 100644 --- a/x-pack/plugins/integration_assistant/server/graphs/ecs/mapping.ts +++ b/x-pack/plugins/integration_assistant/server/graphs/ecs/mapping.ts @@ -16,17 +16,15 @@ export async function handleEcsMapping( state: EcsMappingState, model: ActionsClientChatOpenAI | ActionsClientSimpleChatModel ) { - const ecsMainPrompt = ECS_MAIN_PROMPT; const outputParser = new JsonOutputParser(); - const ecsMainGraph = ecsMainPrompt.pipe(model).pipe(outputParser); + const ecsMainGraph = ECS_MAIN_PROMPT.pipe(model).pipe(outputParser); const currentMapping = await ecsMainGraph.invoke({ ecs: state.ecs, - formatted_samples: state.formattedSamples, + combined_samples: state.combinedSamples, package_name: state.packageName, data_stream_name: state.dataStreamName, ex_answer: state.exAnswer, }); - return { currentMapping, lastExecutedChain: 'ecsMapping' }; } diff --git a/x-pack/plugins/integration_assistant/server/graphs/ecs/missing.ts b/x-pack/plugins/integration_assistant/server/graphs/ecs/missing.ts index d7f1f65b2b4ea..0a23b35bd3b72 100644 --- a/x-pack/plugins/integration_assistant/server/graphs/ecs/missing.ts +++ b/x-pack/plugins/integration_assistant/server/graphs/ecs/missing.ts @@ -16,15 +16,14 @@ export async function handleMissingKeys( state: EcsMappingState, model: ActionsClientChatOpenAI | ActionsClientSimpleChatModel ) { - const ecsMissingPrompt = ECS_MISSING_KEYS_PROMPT; const outputParser = new JsonOutputParser(); - const ecsMissingGraph = ecsMissingPrompt.pipe(model).pipe(outputParser); + const ecsMissingGraph = ECS_MISSING_KEYS_PROMPT.pipe(model).pipe(outputParser); const currentMapping = await ecsMissingGraph.invoke({ ecs: state.ecs, current_mapping: JSON.stringify(state.currentMapping, null, 2), ex_answer: state.exAnswer, - formatted_samples: state.formattedSamples, + combined_samples: state.combinedSamples, missing_keys: state?.missingKeys, }); diff --git a/x-pack/plugins/integration_assistant/server/graphs/ecs/model.ts b/x-pack/plugins/integration_assistant/server/graphs/ecs/model.ts new file mode 100644 index 0000000000000..9bc2909ab7942 --- /dev/null +++ b/x-pack/plugins/integration_assistant/server/graphs/ecs/model.ts @@ -0,0 +1,43 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ +import { prefixSamples } from '../../util/samples'; +import { ECS_EXAMPLE_ANSWER, ECS_FIELDS } from './constants'; +import { createPipeline } from './pipeline'; +import { mergeAndChunkSamples } from './chunk'; +import type { EcsMappingState } from '../../types'; + +export function modelSubOutput(state: EcsMappingState): Partial { + return { + lastExecutedChain: 'ModelSubOutput', + finalMapping: state.currentMapping, + }; +} + +export function modelInput(state: EcsMappingState): Partial { + const prefixedSamples = prefixSamples(state); + const sampleChunks = mergeAndChunkSamples(prefixedSamples, state.chunkSize); + return { + exAnswer: JSON.stringify(ECS_EXAMPLE_ANSWER, null, 2), + ecs: JSON.stringify(ECS_FIELDS, null, 2), + prefixedSamples, + sampleChunks, + finalized: false, + lastExecutedChain: 'modelInput', + }; +} + +export function modelOutput(state: EcsMappingState): Partial { + const currentPipeline = createPipeline(state); + return { + finalized: true, + lastExecutedChain: 'modelOutput', + results: { + mapping: state.finalMapping, + pipeline: currentPipeline, + }, + }; +} diff --git a/x-pack/plugins/integration_assistant/server/graphs/ecs/pipeline.ts b/x-pack/plugins/integration_assistant/server/graphs/ecs/pipeline.ts index 0dc7e772a94cf..35c1fb4d31f42 100644 --- a/x-pack/plugins/integration_assistant/server/graphs/ecs/pipeline.ts +++ b/x-pack/plugins/integration_assistant/server/graphs/ecs/pipeline.ts @@ -161,7 +161,7 @@ function generateProcessors(ecsMapping: object, samples: object, basePath: strin } export function createPipeline(state: EcsMappingState): IngestPipeline { - const samples = JSON.parse(state.formattedSamples); + const samples = JSON.parse(state.combinedSamples); const processors = generateProcessors(state.currentMapping, samples); // Retrieve all source field names from convert processors to populate single remove processor: diff --git a/x-pack/plugins/integration_assistant/server/graphs/ecs/prompts.ts b/x-pack/plugins/integration_assistant/server/graphs/ecs/prompts.ts index f336b2cde4b48..4e5e4794d5b8f 100644 --- a/x-pack/plugins/integration_assistant/server/graphs/ecs/prompts.ts +++ b/x-pack/plugins/integration_assistant/server/graphs/ecs/prompts.ts @@ -15,15 +15,16 @@ Here is some context for you to reference for your task, read it carefully as yo {ecs} - -{formatted_samples} - `, ], [ 'human', `Looking at the combined sample from {package_name} {data_stream_name} provided above. The combined sample is a JSON object that includes all unique fields from the log samples sent by {package_name} {data_stream_name}. + +{combined_samples} + + Go through each value step by step and modify it with the following process: 1. Check if the name of each key and its current value matches the description and usecase of any of the above ECS fields. 2. If one or more relevant ECS field is found, pick the one you are most confident about. @@ -70,9 +71,9 @@ Here is some context for you to reference your task, read it carefully as you wi {ecs} - -{formatted_samples} - + +{combined_samples} + {current_mapping} @@ -84,6 +85,7 @@ Here is some context for you to reference your task, read it carefully as you wi {invalid_ecs_fields} + To resolve the invalid ecs fields, go through each key and value defined in the invalid fields, and modify the current mapping step by step, and ensure they follow these guidelines: - Update the provided current mapping object, the value should be the corresponding Elastic Common Schema field name. If no good or valid match is found the value should always be null. @@ -111,9 +113,9 @@ Here is some context for you to reference for your task, read it carefully as yo {ecs} - -{formatted_samples} - + +{combined_samples} + {current_mapping} @@ -126,7 +128,7 @@ Here is some context for you to reference for your task, read it carefully as yo {missing_keys} -Help resolve the issue by adding the missing keys, look up example values from the formatted samples, and go through each missing key step by step, resolve it by following these guidelines: +Help resolve the issue by adding the missing keys, look up example values from the combined samples, and go through each missing key step by step, resolve it by following these guidelines: - Update the provided current mapping object with all the missing keys, the value should be the corresponding Elastic Common Schema field name. If no good match is found the value should always be null. - Do not respond with anything except the updated current mapping JSON object enclosed with 3 backticks (\`). See example response below. diff --git a/x-pack/plugins/integration_assistant/server/graphs/ecs/state.ts b/x-pack/plugins/integration_assistant/server/graphs/ecs/state.ts new file mode 100644 index 0000000000000..35a307f1de934 --- /dev/null +++ b/x-pack/plugins/integration_assistant/server/graphs/ecs/state.ts @@ -0,0 +1,93 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import type { StateGraphArgs } from '@langchain/langgraph'; +import type { EcsMappingState } from '../../types'; +import { merge } from '../../util/samples'; + +export const graphState: StateGraphArgs['channels'] = { + ecs: { + value: (x: string, y?: string) => y ?? x, + default: () => '', + }, + chunkSize: { + value: (x: number, y?: number) => y ?? x, + default: () => 10, + }, + lastExecutedChain: { + value: (x: string, y?: string) => y ?? x, + default: () => '', + }, + rawSamples: { + value: (x: string[], y?: string[]) => y ?? x, + default: () => [], + }, + prefixedSamples: { + value: (x: string[], y?: string[]) => y ?? x, + default: () => [], + }, + combinedSamples: { + value: (x: string, y?: string) => y ?? x, + default: () => '', + }, + sampleChunks: { + value: (x: string[], y?: string[]) => y ?? x, + default: () => [], + }, + exAnswer: { + value: (x: string, y?: string) => y ?? x, + default: () => '', + }, + packageName: { + value: (x: string, y?: string) => y ?? x, + default: () => '', + }, + dataStreamName: { + value: (x: string, y?: string) => y ?? x, + default: () => '', + }, + finalized: { + value: (x: boolean, y?: boolean) => y ?? x, + default: () => false, + }, + currentMapping: { + value: (x: object, y?: object) => y ?? x, + default: () => ({}), + }, + finalMapping: { + reducer: merge, + default: () => ({}), + }, + currentPipeline: { + value: (x: object, y?: object) => y ?? x, + default: () => ({}), + }, + duplicateFields: { + value: (x: string[], y?: string[]) => y ?? x, + default: () => [], + }, + missingKeys: { + value: (x: string[], y?: string[]) => y ?? x, + default: () => [], + }, + invalidEcsFields: { + value: (x: string[], y?: string[]) => y ?? x, + default: () => [], + }, + results: { + value: (x: object, y?: object) => y ?? x, + default: () => ({}), + }, + samplesFormat: { + value: (x: string, y?: string) => y ?? x, + default: () => 'json', + }, + ecsVersion: { + value: (x: string, y?: string) => y ?? x, + default: () => '8.11.0', + }, +}; diff --git a/x-pack/plugins/integration_assistant/server/graphs/ecs/validate.ts b/x-pack/plugins/integration_assistant/server/graphs/ecs/validate.ts index 34f4f520243c9..fdc86b6854ae9 100644 --- a/x-pack/plugins/integration_assistant/server/graphs/ecs/validate.ts +++ b/x-pack/plugins/integration_assistant/server/graphs/ecs/validate.ts @@ -41,9 +41,9 @@ function extractKeys(data: AnyObject, prefix: string = ''): Set { return keys; } -function findMissingFields(formattedSamples: string, ecsMapping: AnyObject): string[] { - const combinedSamples = JSON.parse(formattedSamples); - const uniqueKeysFromSamples = extractKeys(combinedSamples); +function findMissingFields(combinedSamples: string, ecsMapping: AnyObject): string[] { + const parsedSamples = JSON.parse(combinedSamples); + const uniqueKeysFromSamples = extractKeys(parsedSamples); const ecsResponseKeys = extractKeys(ecsMapping); const missingKeys = [...uniqueKeysFromSamples].filter((key) => !ecsResponseKeys.has(key)); @@ -94,8 +94,8 @@ function getValueFromPath(obj: AnyObject, path: string[]): unknown { return path.reduce((acc, key) => (acc && acc[key] !== undefined ? acc[key] : null), obj); } -function findDuplicateFields(samples: string[], ecsMapping: AnyObject): string[] { - const parsedSamples = samples.map((sample) => JSON.parse(sample)); +function findDuplicateFields(prefixedSamples: string[], ecsMapping: AnyObject): string[] { + const parsedSamples = prefixedSamples.map((sample) => JSON.parse(sample)); const results: string[] = []; const output: Record = {}; @@ -123,18 +123,17 @@ function findDuplicateFields(samples: string[], ecsMapping: AnyObject): string[] } } } - return results; } // Function to find invalid ECS fields -export function findInvalidEcsFields(ecsMapping: AnyObject): string[] { +export function findInvalidEcsFields(currentMapping: AnyObject): string[] { const results: string[] = []; const output: Record = {}; const ecsDict = ECS_FULL; const ecsReserved = ECS_RESERVED; - processMapping([], ecsMapping, output); + processMapping([], currentMapping, output); const filteredOutput = Object.fromEntries( Object.entries(output).filter(([key, _]) => key !== null) ); @@ -150,13 +149,12 @@ export function findInvalidEcsFields(ecsMapping: AnyObject): string[] { results.push(`Reserved ECS field mapping identified for ${ecsValue} : ${field.join(', ')}`); } } - return results; } export function handleValidateMappings(state: EcsMappingState): AnyObject { - const missingKeys = findMissingFields(state?.formattedSamples, state?.currentMapping); - const duplicateFields = findDuplicateFields(state?.samples, state?.currentMapping); + const missingKeys = findMissingFields(state?.combinedSamples, state?.currentMapping); + const duplicateFields = findDuplicateFields(state?.prefixedSamples, state?.currentMapping); const invalidEcsFields = findInvalidEcsFields(state?.currentMapping); return { missingKeys, diff --git a/x-pack/plugins/integration_assistant/server/graphs/related/graph.ts b/x-pack/plugins/integration_assistant/server/graphs/related/graph.ts index bd387b7177f75..22eb69f7d2a2d 100644 --- a/x-pack/plugins/integration_assistant/server/graphs/related/graph.ts +++ b/x-pack/plugins/integration_assistant/server/graphs/related/graph.ts @@ -13,7 +13,7 @@ import type { ActionsClientSimpleChatModel, } from '@kbn/langchain/server/language_models'; import type { RelatedState } from '../../types'; -import { modifySamples, formatSamples } from '../../util/samples'; +import { prefixSamples, formatSamples } from '../../util/samples'; import { handleValidatePipeline } from '../../util/graph'; import { handleRelated } from './related'; import { handleErrors } from './errors'; @@ -92,7 +92,7 @@ const graphState: StateGraphArgs['channels'] = { }; function modelInput(state: RelatedState): Partial { - const samples = modifySamples(state); + const samples = prefixSamples(state); const formattedSamples = formatSamples(samples); const initialPipeline = JSON.parse(JSON.stringify(state.currentPipeline)); return { diff --git a/x-pack/plugins/integration_assistant/server/types.ts b/x-pack/plugins/integration_assistant/server/types.ts index 3bbe25a8fbd0f..82d43b2a19b43 100644 --- a/x-pack/plugins/integration_assistant/server/types.ts +++ b/x-pack/plugins/integration_assistant/server/types.ts @@ -57,15 +57,18 @@ export interface CategorizationState { export interface EcsMappingState { ecs: string; + chunkSize: number; lastExecutedChain: string; rawSamples: string[]; - samples: string[]; - formattedSamples: string; + prefixedSamples: string[]; + combinedSamples: string; + sampleChunks: string[]; exAnswer: string; packageName: string; dataStreamName: string; finalized: boolean; currentMapping: object; + finalMapping: object; currentPipeline: object; duplicateFields: string[]; missingKeys: string[]; diff --git a/x-pack/plugins/integration_assistant/server/util/samples.ts b/x-pack/plugins/integration_assistant/server/util/samples.ts index f6728653e75ca..766856f644a86 100644 --- a/x-pack/plugins/integration_assistant/server/util/samples.ts +++ b/x-pack/plugins/integration_assistant/server/util/samples.ts @@ -24,7 +24,10 @@ interface Field { fields?: Field[]; } -export function modifySamples(state: EcsMappingState | CategorizationState | RelatedState) { +// Given a graph state, it collects the rawSamples (array of JSON strings) and prefixes them with the packageName and dataStreamName, returning an array of prefixed JSON strings. +export function prefixSamples( + state: EcsMappingState | CategorizationState | RelatedState +): string[] { const modifiedSamples: string[] = []; const rawSamples = state.rawSamples; const packageName = state.packageName; @@ -44,55 +47,6 @@ export function modifySamples(state: EcsMappingState | CategorizationState | Rel return modifiedSamples; } -function isEmptyValue(value: unknown): boolean { - return ( - value === null || - value === undefined || - (typeof value === 'object' && !Array.isArray(value) && Object.keys(value).length === 0) || - (Array.isArray(value) && value.length === 0) - ); -} - -function merge(target: Record, source: Record): Record { - for (const [key, sourceValue] of Object.entries(source)) { - if (key !== '__proto__' && key !== 'constructor') { - if (Object.prototype.hasOwnProperty.call(target, key)) { - const targetValue = target[key]; - if (Array.isArray(sourceValue)) { - target[key] = sourceValue; - } else if ( - typeof sourceValue === 'object' && - sourceValue !== null && - typeof targetValue === 'object' && - targetValue !== null && - !Array.isArray(targetValue) - ) { - target[key] = merge(targetValue, sourceValue); - } else if (isEmptyValue(targetValue) && !isEmptyValue(sourceValue)) { - target[key] = sourceValue; - } - } else if (!isEmptyValue(sourceValue)) { - target[key] = sourceValue; - } - } - } - return target; -} - -export function mergeSamples(objects: any[]): string { - let result: Record = {}; - - for (const obj of objects) { - let sample: Record = obj; - if (typeof obj === 'string') { - sample = JSON.parse(obj); - } - result = merge(result, sample); - } - - return JSON.stringify(result, null, 2); -} - export function formatSamples(samples: string[]): string { const formattedSamples: unknown[] = []; @@ -208,3 +162,52 @@ export function generateFields(mergedDocs: string): string { return yaml.safeDump(fieldsStructure, { sortKeys: false }); } + +function isEmptyValue(value: unknown): boolean { + return ( + value === null || + value === undefined || + (typeof value === 'object' && !Array.isArray(value) && Object.keys(value).length === 0) || + (Array.isArray(value) && value.length === 0) + ); +} + +export function merge( + target: Record, + source: Record +): Record { + for (const [key, sourceValue] of Object.entries(source)) { + const targetValue = target[key]; + if (Array.isArray(sourceValue)) { + // Directly assign arrays + target[key] = sourceValue; + } else if ( + typeof sourceValue === 'object' && + sourceValue !== null && + !Array.isArray(targetValue) + ) { + if (typeof targetValue !== 'object' || isEmptyValue(targetValue)) { + target[key] = merge({}, sourceValue); + } else { + target[key] = merge(targetValue, sourceValue); + } + } else if (!(key in target) || (isEmptyValue(targetValue) && !isEmptyValue(sourceValue))) { + target[key] = sourceValue; + } + } + return target; +} + +export function mergeSamples(objects: any[]): string { + let result: Record = {}; + + for (const obj of objects) { + let sample: Record = obj; + if (typeof obj === 'string') { + sample = JSON.parse(obj); + } + result = merge(result, sample); + } + + return JSON.stringify(result, null, 2); +}