diff --git a/x-pack/platform/plugins/shared/fleet/server/tasks/sync_integrations/custom_assets.test.ts b/x-pack/platform/plugins/shared/fleet/server/tasks/sync_integrations/custom_assets.test.ts index a3696fc4d128e..9c72f7f9489bb 100644 --- a/x-pack/platform/plugins/shared/fleet/server/tasks/sync_integrations/custom_assets.test.ts +++ b/x-pack/platform/plugins/shared/fleet/server/tasks/sync_integrations/custom_assets.test.ts @@ -5,7 +5,7 @@ * 2.0. */ -import { findIntegration, getCustomAssets } from './custom_assets'; +import { findIntegration, getCustomAssets, installCustomAsset } from './custom_assets'; describe('custom assets', () => { const integrations = [ @@ -223,4 +223,521 @@ describe('custom assets', () => { expect(customAssets).toEqual([]); }); }); + + describe('installCustomAsset', () => { + let esClientMock: any; + + it('should delete component template if deleted', async () => { + esClientMock = { + cluster: { + getComponentTemplate: jest.fn().mockResolvedValue({ + component_templates: [ + { + name: 'logs-system.auth@custom', + component_template: { + template: { + mappings: { + properties: { + new_field: { + type: 'text', + }, + }, + }, + }, + }, + }, + ], + }), + deleteComponentTemplate: jest.fn().mockResolvedValue({}), + }, + }; + + await installCustomAsset( + { + is_deleted: true, + deleted_at: new Date().toISOString(), + name: 'logs-system.auth@custom', + package_name: 'system', + package_version: '0.1.0', + template: {}, + type: 'component_template', + }, + esClientMock, + new AbortController(), + { debug: jest.fn() } as any + ); + + expect(esClientMock.cluster.deleteComponentTemplate).toHaveBeenCalledWith( + { + name: 'logs-system.auth@custom', + }, + expect.anything() + ); + }); + + it('should do nothing if component template deleted and not exists', async () => { + esClientMock = { + cluster: { + getComponentTemplate: jest.fn().mockResolvedValue({ + component_templates: [], + }), + deleteComponentTemplate: jest.fn().mockResolvedValue({}), + }, + }; + + await installCustomAsset( + { + is_deleted: true, + deleted_at: new Date().toISOString(), + name: 'logs-system.auth@custom', + package_name: 'system', + package_version: '0.1.0', + template: {}, + type: 'component_template', + }, + esClientMock, + new AbortController(), + { debug: jest.fn() } as any + ); + + expect(esClientMock.cluster.deleteComponentTemplate).not.toHaveBeenCalled(); + }); + + it('should install component template if not exists', async () => { + esClientMock = { + cluster: { + getComponentTemplate: jest.fn().mockResolvedValue({ + component_templates: [], + }), + putComponentTemplate: jest.fn().mockResolvedValue({}), + }, + }; + + await installCustomAsset( + { + is_deleted: false, + name: 'logs-system.auth@custom', + package_name: 'system', + package_version: '0.1.0', + template: { + mappings: { + properties: { + new_field: { + type: 'text', + }, + }, + }, + }, + type: 'component_template', + }, + esClientMock, + new AbortController(), + { debug: jest.fn() } as any + ); + + expect(esClientMock.cluster.putComponentTemplate).toHaveBeenCalledWith( + { + name: 'logs-system.auth@custom', + template: { + mappings: { + properties: { + new_field: { + type: 'text', + }, + }, + }, + }, + }, + expect.anything() + ); + }); + + it('should update component template if changed', async () => { + esClientMock = { + cluster: { + getComponentTemplate: jest.fn().mockResolvedValue({ + component_templates: [ + { + name: 'logs-system.auth@custom', + component_template: { + template: { + mappings: { + properties: { + new_field: { + type: 'text', + }, + }, + }, + }, + }, + }, + ], + }), + putComponentTemplate: jest.fn().mockResolvedValue({}), + }, + }; + + await installCustomAsset( + { + is_deleted: false, + name: 'logs-system.auth@custom', + package_name: 'system', + package_version: '0.1.0', + template: { + mappings: { + properties: { + new_field2: { + type: 'text', + }, + }, + }, + }, + type: 'component_template', + }, + esClientMock, + new AbortController(), + { debug: jest.fn() } as any + ); + + expect(esClientMock.cluster.putComponentTemplate).toHaveBeenCalledWith( + { + name: 'logs-system.auth@custom', + template: { + mappings: { + properties: { + new_field2: { + type: 'text', + }, + }, + }, + }, + }, + expect.anything() + ); + }); + + it('should not update component template if not changed', async () => { + esClientMock = { + cluster: { + getComponentTemplate: jest.fn().mockResolvedValue({ + component_templates: [ + { + name: 'logs-system.auth@custom', + component_template: { + template: { + mappings: { + properties: { + new_field: { + type: 'text', + }, + }, + }, + }, + }, + }, + ], + }), + putComponentTemplate: jest.fn().mockResolvedValue({}), + }, + }; + + await installCustomAsset( + { + is_deleted: false, + name: 'logs-system.auth@custom', + package_name: 'system', + package_version: '0.1.0', + template: { + mappings: { + properties: { + new_field: { + type: 'text', + }, + }, + }, + }, + type: 'component_template', + }, + esClientMock, + new AbortController(), + { debug: jest.fn() } as any + ); + + expect(esClientMock.cluster.putComponentTemplate).not.toHaveBeenCalled(); + }); + + // pipeline + + it('should delete ingest pipeline if deleted', async () => { + esClientMock = { + ingest: { + getPipeline: jest.fn().mockResolvedValue({ + 'logs-system.auth@custom': { + processors: [ + { + user_agent: { + field: 'user_agent', + }, + }, + ], + }, + }), + deletePipeline: jest.fn().mockResolvedValue({}), + }, + }; + + await installCustomAsset( + { + is_deleted: true, + deleted_at: new Date().toISOString(), + name: 'logs-system.auth@custom', + package_name: 'system', + package_version: '0.1.0', + processors: [], + type: 'ingest_pipeline', + }, + esClientMock, + new AbortController(), + { debug: jest.fn() } as any + ); + + expect(esClientMock.ingest.deletePipeline).toHaveBeenCalledWith( + { + id: 'logs-system.auth@custom', + }, + expect.anything() + ); + }); + + it('should do nothing if ingest pipeline deleted and not exists', async () => { + esClientMock = { + ingest: { + getPipeline: jest.fn().mockResolvedValue({}), + deletePipeline: jest.fn().mockResolvedValue({}), + }, + }; + + await installCustomAsset( + { + is_deleted: true, + deleted_at: new Date().toISOString(), + name: 'logs-system.auth@custom', + package_name: 'system', + package_version: '0.1.0', + processors: [], + type: 'ingest_pipeline', + }, + esClientMock, + new AbortController(), + { debug: jest.fn() } as any + ); + + expect(esClientMock.ingest.deletePipeline).not.toHaveBeenCalled(); + }); + + it('should install ingest pipeline if not exists', async () => { + esClientMock = { + ingest: { + getPipeline: jest.fn().mockResolvedValue({}), + putPipeline: jest.fn().mockResolvedValue({}), + }, + }; + + await installCustomAsset( + { + is_deleted: false, + name: 'logs-system.auth@custom', + package_name: 'system', + package_version: '0.1.0', + pipeline: { + processors: [ + { + user_agent: { + field: 'user_agent', + }, + }, + ], + version: 1, + }, + type: 'ingest_pipeline', + }, + esClientMock, + new AbortController(), + { debug: jest.fn() } as any + ); + + expect(esClientMock.ingest.putPipeline).toHaveBeenCalledWith( + { + id: 'logs-system.auth@custom', + processors: [ + { + user_agent: { + field: 'user_agent', + }, + }, + ], + version: 1, + }, + expect.anything() + ); + }); + + it('should update ingest pipeline if version changed', async () => { + esClientMock = { + ingest: { + getPipeline: jest.fn().mockResolvedValue({ + 'logs-system.auth@custom': { + processors: [ + { + user_agent: { + field: 'user_agent', + }, + }, + ], + version: 1, + }, + }), + putPipeline: jest.fn().mockResolvedValue({}), + }, + }; + + await installCustomAsset( + { + is_deleted: false, + name: 'logs-system.auth@custom', + package_name: 'system', + package_version: '0.1.0', + pipeline: { + processors: [ + { + user_agent: { + field: 'user_agent', + }, + }, + ], + version: 2, + }, + type: 'ingest_pipeline', + }, + esClientMock, + new AbortController(), + { debug: jest.fn() } as any + ); + + expect(esClientMock.ingest.putPipeline).toHaveBeenCalledWith( + { + id: 'logs-system.auth@custom', + processors: [ + { + user_agent: { + field: 'user_agent', + }, + }, + ], + version: 2, + }, + expect.anything() + ); + }); + + it('should update ingest pipeline if changed without version', async () => { + esClientMock = { + ingest: { + getPipeline: jest.fn().mockResolvedValue({ + 'logs-system.auth@custom': { + processors: [ + { + user_agent: { + field: 'user_agent', + }, + }, + ], + }, + }), + putPipeline: jest.fn().mockResolvedValue({}), + }, + }; + + await installCustomAsset( + { + is_deleted: false, + name: 'logs-system.auth@custom', + package_name: 'system', + package_version: '0.1.0', + pipeline: { + processors: [ + { + user_agent: { + field: 'user_agent2', + }, + }, + ], + }, + type: 'ingest_pipeline', + }, + esClientMock, + new AbortController(), + { debug: jest.fn() } as any + ); + + expect(esClientMock.ingest.putPipeline).toHaveBeenCalledWith( + { + id: 'logs-system.auth@custom', + processors: [ + { + user_agent: { + field: 'user_agent2', + }, + }, + ], + }, + expect.anything() + ); + }); + + it('should not update ingest pipeline if not changed', async () => { + esClientMock = { + ingest: { + getPipeline: jest.fn().mockResolvedValue({ + 'logs-system.auth@custom': { + processors: [ + { + user_agent: { + field: 'user_agent', + }, + }, + ], + version: 1, + }, + }), + putPipeline: jest.fn().mockResolvedValue({}), + }, + }; + + await installCustomAsset( + { + is_deleted: false, + name: 'logs-system.auth@custom', + package_name: 'system', + package_version: '0.1.0', + pipeline: { + processors: [ + { + user_agent: { + field: 'user_agent', + }, + }, + ], + version: 1, + }, + type: 'ingest_pipeline', + }, + esClientMock, + new AbortController(), + { debug: jest.fn() } as any + ); + + expect(esClientMock.ingest.putPipeline).not.toHaveBeenCalled(); + }); + }); }); diff --git a/x-pack/platform/plugins/shared/fleet/server/tasks/sync_integrations/custom_assets.ts b/x-pack/platform/plugins/shared/fleet/server/tasks/sync_integrations/custom_assets.ts index aebf376204a70..8bfedf60a664c 100644 --- a/x-pack/platform/plugins/shared/fleet/server/tasks/sync_integrations/custom_assets.ts +++ b/x-pack/platform/plugins/shared/fleet/server/tasks/sync_integrations/custom_assets.ts @@ -5,7 +5,15 @@ * 2.0. */ -import type { ElasticsearchClient } from '@kbn/core/server'; +import type { ElasticsearchClient, Logger } from '@kbn/core/server'; + +import { isEqual } from 'lodash'; +import type { + ClusterGetComponentTemplateResponse, + IngestGetPipelineResponse, +} from '@elastic/elasticsearch/lib/api/types'; + +import { retryTransientEsErrors } from '../../services/epm/elasticsearch/retry'; import type { CustomAssetsData, IntegrationsData, SyncIntegrationsData } from './model'; @@ -25,21 +33,45 @@ export const findIntegration = (assetName: string, integrations: IntegrationsDat }); }; -export const getCustomAssets = async ( +function getComponentTemplate( esClient: ElasticsearchClient, - integrations: IntegrationsData[], - abortController: AbortController, - previousSyncIntegrationsData: SyncIntegrationsData | undefined -): Promise => { - const customTemplates = await esClient.cluster.getComponentTemplate( + name: string, + abortController: AbortController +): Promise { + return esClient.cluster.getComponentTemplate( { - name: '*@custom', + name, }, { ignore: [404], signal: abortController.signal, } ); +} + +function getPipeline( + esClient: ElasticsearchClient, + name: string, + abortController: AbortController +): Promise { + return esClient.ingest.getPipeline( + { + id: name, + }, + { + ignore: [404], + signal: abortController.signal, + } + ); +} + +export const getCustomAssets = async ( + esClient: ElasticsearchClient, + integrations: IntegrationsData[], + abortController: AbortController, + previousSyncIntegrationsData: SyncIntegrationsData | undefined +): Promise => { + const customTemplates = await getComponentTemplate(esClient, '*@custom', abortController); const customAssetsComponentTemplates = customTemplates.component_templates.reduce( (acc: CustomAssetsData[], template) => { @@ -58,15 +90,7 @@ export const getCustomAssets = async ( [] ); - const ingestPipelines = await esClient.ingest.getPipeline( - { - id: '*@custom', - }, - { - ignore: [404], - signal: abortController.signal, - } - ); + const ingestPipelines = await getPipeline(esClient, '*@custom', abortController); const customAssetsIngestPipelines = Object.keys(ingestPipelines).reduce( (acc: CustomAssetsData[], pipeline) => { @@ -124,3 +148,129 @@ function updateDeletedAssets( return deletedAssets; } + +async function updateComponentTemplate( + customAsset: CustomAssetsData, + esClient: ElasticsearchClient, + abortController: AbortController, + logger: Logger +) { + const customTemplates = await getComponentTemplate(esClient, customAsset.name, abortController); + const existingTemplate = customTemplates.component_templates?.find( + (template) => template.name === customAsset.name + ); + if (customAsset.is_deleted) { + if (existingTemplate) { + logger.debug(`Deleting component template: ${customAsset.name}`); + return retryTransientEsErrors( + () => + esClient.cluster.deleteComponentTemplate( + { + name: customAsset.name, + }, + { + signal: abortController.signal, + } + ), + { logger } + ); + } else { + return; + } + } + let shouldUpdateTemplate = false; + if (existingTemplate) { + shouldUpdateTemplate = !isEqual( + existingTemplate.component_template.template, + customAsset.template + ); + } else { + shouldUpdateTemplate = true; + } + + if (shouldUpdateTemplate) { + logger.debug(`Updating component template: ${customAsset.name}`); + return retryTransientEsErrors( + () => + esClient.cluster.putComponentTemplate( + { + name: customAsset.name, + template: customAsset.template, + }, + { + signal: abortController.signal, + } + ), + { logger } + ); + } +} + +async function updateIngestPipeline( + customAsset: CustomAssetsData, + esClient: ElasticsearchClient, + abortController: AbortController, + logger: Logger +) { + const ingestPipelines = await getPipeline(esClient, customAsset.name, abortController); + const existingPipeline = ingestPipelines[customAsset.name]; + + if (customAsset.is_deleted) { + if (existingPipeline) { + logger.debug(`Deleting ingest pipeline: ${customAsset.name}`); + return retryTransientEsErrors( + () => + esClient.ingest.deletePipeline( + { + id: customAsset.name, + }, + { + signal: abortController.signal, + } + ), + { logger } + ); + } else { + return; + } + } + + let shouldUpdatePipeline = false; + if (existingPipeline) { + shouldUpdatePipeline = + (existingPipeline.version && existingPipeline.version < customAsset.pipeline.version) || + (!existingPipeline.version && !isEqual(existingPipeline, customAsset.pipeline)); + } else { + shouldUpdatePipeline = true; + } + + if (shouldUpdatePipeline) { + logger.debug(`Updating ingest pipeline: ${customAsset.name}`); + return retryTransientEsErrors( + () => + esClient.ingest.putPipeline( + { + id: customAsset.name, + ...customAsset.pipeline, + }, + { + signal: abortController.signal, + } + ), + { logger } + ); + } +} + +export async function installCustomAsset( + customAsset: CustomAssetsData, + esClient: ElasticsearchClient, + abortController: AbortController, + logger: Logger +) { + if (customAsset.type === 'component_template') { + return updateComponentTemplate(customAsset, esClient, abortController, logger); + } else if (customAsset.type === 'ingest_pipeline') { + return updateIngestPipeline(customAsset, esClient, abortController, logger); + } +} diff --git a/x-pack/platform/plugins/shared/fleet/server/tasks/sync_integrations/sync_integrations_on_remote.test.ts b/x-pack/platform/plugins/shared/fleet/server/tasks/sync_integrations/sync_integrations_on_remote.test.ts index a94f0fc9736a0..ab2a75260e3cd 100644 --- a/x-pack/platform/plugins/shared/fleet/server/tasks/sync_integrations/sync_integrations_on_remote.test.ts +++ b/x-pack/platform/plugins/shared/fleet/server/tasks/sync_integrations/sync_integrations_on_remote.test.ts @@ -8,9 +8,12 @@ import { PackageNotFoundError } from '../../errors'; import { outputService } from '../../services'; +import { installCustomAsset } from './custom_assets'; + import { syncIntegrationsOnRemote } from './sync_integrations_on_remote'; jest.mock('../../services'); +jest.mock('./custom_assets'); const outputServiceMock = outputService as jest.Mocked; @@ -49,6 +52,7 @@ describe('syncIntegrationsOnRemote', () => { warn: jest.fn(), info: jest.fn(), }; + (installCustomAsset as jest.Mock).mockClear(); }); it('should throw error if multiple synced integrations ccr indices exist', async () => { @@ -88,6 +92,24 @@ describe('syncIntegrationsOnRemote', () => { updated_at: '2021-01-01T00:00:00.000Z', }, ], + custom_assets: { + 'component_template:logs-system.auth@custom': { + is_deleted: false, + name: 'logs-system.auth@custom', + package_name: 'system', + package_version: '0.1.0', + template: {}, + type: 'component_template', + }, + 'ingest_pipeline:logs-system.auth@custom': { + is_deleted: false, + name: 'logs-system.auth@custom', + package_name: 'system', + package_version: '0.1.0', + pipeline: {}, + type: 'ingest_pipeline', + }, + }, }, }, ], @@ -404,4 +426,28 @@ describe('syncIntegrationsOnRemote', () => { expect(packageClientMock.installPackage).not.toHaveBeenCalled(); }); + + it('should install custom assets', async () => { + getIndicesMock.mockResolvedValue({ + 'fleet-synced-integrations-ccr-remote1': {}, + }); + searchMock.mockResolvedValue(getSyncedIntegrationsCCRDoc(true)); + packageClientMock.getInstallation.mockImplementation(() => ({ + install_status: 'installed', + version: '2.2.0', + })); + packageClientMock.installPackage.mockResolvedValue({ + status: 'installed', + }); + + await syncIntegrationsOnRemote( + esClientMock, + {} as any, + packageClientMock, + abortController, + loggerMock + ); + + expect(installCustomAsset).toHaveBeenCalledTimes(2); + }); }); diff --git a/x-pack/platform/plugins/shared/fleet/server/tasks/sync_integrations/sync_integrations_on_remote.ts b/x-pack/platform/plugins/shared/fleet/server/tasks/sync_integrations/sync_integrations_on_remote.ts index 847f681b805cd..237c8a8066101 100644 --- a/x-pack/platform/plugins/shared/fleet/server/tasks/sync_integrations/sync_integrations_on_remote.ts +++ b/x-pack/platform/plugins/shared/fleet/server/tasks/sync_integrations/sync_integrations_on_remote.ts @@ -15,6 +15,7 @@ import { outputService } from '../../services'; import { PackageNotFoundError } from '../../errors'; import type { SyncIntegrationsData } from './model'; +import { installCustomAsset } from './custom_assets'; const FLEET_SYNCED_INTEGRATIONS_CCR_INDEX_PREFIX = 'fleet-synced-integrations-ccr-*'; @@ -167,4 +168,15 @@ export const syncIntegrationsOnRemote = async ( } await installPackageIfNotInstalled(pkg, packageClient, logger, abortController); } + + for (const customAsset of Object.values(syncIntegrationsDoc?.custom_assets ?? {})) { + if (abortController.signal.aborted) { + throw new Error('Task was aborted'); + } + try { + await installCustomAsset(customAsset, esClient, abortController, logger); + } catch (error) { + logger.error(`Failed to install ${customAsset.type} ${customAsset.name}, error: ${error}`); + } + } }; diff --git a/x-pack/platform/plugins/shared/fleet/server/tasks/sync_integrations/sync_integrations_task.ts b/x-pack/platform/plugins/shared/fleet/server/tasks/sync_integrations/sync_integrations_task.ts index 9e195682659be..76c0e60109031 100644 --- a/x-pack/platform/plugins/shared/fleet/server/tasks/sync_integrations/sync_integrations_task.ts +++ b/x-pack/platform/plugins/shared/fleet/server/tasks/sync_integrations/sync_integrations_task.ts @@ -28,7 +28,7 @@ import { getCustomAssets } from './custom_assets'; import type { SyncIntegrationsData } from './model'; export const TYPE = 'fleet:sync-integrations-task'; -export const VERSION = '1.0.2'; +export const VERSION = '1.0.3'; const TITLE = 'Fleet Sync Integrations Task'; const SCOPE = ['fleet']; const INTERVAL = '5m';