From 3e3336516423f97a8afb413758304e6cee1719c5 Mon Sep 17 00:00:00 2001 From: Julia Bardi Date: Fri, 21 Mar 2025 14:50:07 +0100 Subject: [PATCH 1/2] install clustom assets --- .../sync_integrations/custom_assets.test.ts | 519 +++++++++++++++++- .../tasks/sync_integrations/custom_assets.ts | 148 ++++- .../sync_integrations_on_remote.test.ts | 46 ++ .../sync_integrations_on_remote.ts | 12 + .../sync_integrations_task.ts | 2 +- 5 files changed, 708 insertions(+), 19 deletions(-) diff --git a/x-pack/platform/plugins/shared/fleet/server/tasks/sync_integrations/custom_assets.test.ts b/x-pack/platform/plugins/shared/fleet/server/tasks/sync_integrations/custom_assets.test.ts index a3696fc4d128e..9c72f7f9489bb 100644 --- a/x-pack/platform/plugins/shared/fleet/server/tasks/sync_integrations/custom_assets.test.ts +++ b/x-pack/platform/plugins/shared/fleet/server/tasks/sync_integrations/custom_assets.test.ts @@ -5,7 +5,7 @@ * 2.0. */ -import { findIntegration, getCustomAssets } from './custom_assets'; +import { findIntegration, getCustomAssets, installCustomAsset } from './custom_assets'; describe('custom assets', () => { const integrations = [ @@ -223,4 +223,521 @@ describe('custom assets', () => { expect(customAssets).toEqual([]); }); }); + + describe('installCustomAsset', () => { + let esClientMock: any; + + it('should delete component template if deleted', async () => { + esClientMock = { + cluster: { + getComponentTemplate: jest.fn().mockResolvedValue({ + component_templates: [ + { + name: 'logs-system.auth@custom', + component_template: { + template: { + mappings: { + properties: { + new_field: { + type: 'text', + }, + }, + }, + }, + }, + }, + ], + }), + deleteComponentTemplate: jest.fn().mockResolvedValue({}), + }, + }; + + await installCustomAsset( + { + is_deleted: true, + deleted_at: new Date().toISOString(), + name: 'logs-system.auth@custom', + package_name: 'system', + package_version: '0.1.0', + template: {}, + type: 'component_template', + }, + esClientMock, + new AbortController(), + { debug: jest.fn() } as any + ); + + expect(esClientMock.cluster.deleteComponentTemplate).toHaveBeenCalledWith( + { + name: 'logs-system.auth@custom', + }, + expect.anything() + ); + }); + + it('should do nothing if component template deleted and not exists', async () => { + esClientMock = { + cluster: { + getComponentTemplate: jest.fn().mockResolvedValue({ + component_templates: [], + }), + deleteComponentTemplate: jest.fn().mockResolvedValue({}), + }, + }; + + await installCustomAsset( + { + is_deleted: true, + deleted_at: new Date().toISOString(), + name: 'logs-system.auth@custom', + package_name: 'system', + package_version: '0.1.0', + template: {}, + type: 'component_template', + }, + esClientMock, + new AbortController(), + { debug: jest.fn() } as any + ); + + expect(esClientMock.cluster.deleteComponentTemplate).not.toHaveBeenCalled(); + }); + + it('should install component template if not exists', async () => { + esClientMock = { + cluster: { + getComponentTemplate: jest.fn().mockResolvedValue({ + component_templates: [], + }), + putComponentTemplate: jest.fn().mockResolvedValue({}), + }, + }; + + await installCustomAsset( + { + is_deleted: false, + name: 'logs-system.auth@custom', + package_name: 'system', + package_version: '0.1.0', + template: { + mappings: { + properties: { + new_field: { + type: 'text', + }, + }, + }, + }, + type: 'component_template', + }, + esClientMock, + new AbortController(), + { debug: jest.fn() } as any + ); + + expect(esClientMock.cluster.putComponentTemplate).toHaveBeenCalledWith( + { + name: 'logs-system.auth@custom', + template: { + mappings: { + properties: { + new_field: { + type: 'text', + }, + }, + }, + }, + }, + expect.anything() + ); + }); + + it('should update component template if changed', async () => { + esClientMock = { + cluster: { + getComponentTemplate: jest.fn().mockResolvedValue({ + component_templates: [ + { + name: 'logs-system.auth@custom', + component_template: { + template: { + mappings: { + properties: { + new_field: { + type: 'text', + }, + }, + }, + }, + }, + }, + ], + }), + putComponentTemplate: jest.fn().mockResolvedValue({}), + }, + }; + + await installCustomAsset( + { + is_deleted: false, + name: 'logs-system.auth@custom', + package_name: 'system', + package_version: '0.1.0', + template: { + mappings: { + properties: { + new_field2: { + type: 'text', + }, + }, + }, + }, + type: 'component_template', + }, + esClientMock, + new AbortController(), + { debug: jest.fn() } as any + ); + + expect(esClientMock.cluster.putComponentTemplate).toHaveBeenCalledWith( + { + name: 'logs-system.auth@custom', + template: { + mappings: { + properties: { + new_field2: { + type: 'text', + }, + }, + }, + }, + }, + expect.anything() + ); + }); + + it('should not update component template if not changed', async () => { + esClientMock = { + cluster: { + getComponentTemplate: jest.fn().mockResolvedValue({ + component_templates: [ + { + name: 'logs-system.auth@custom', + component_template: { + template: { + mappings: { + properties: { + new_field: { + type: 'text', + }, + }, + }, + }, + }, + }, + ], + }), + putComponentTemplate: jest.fn().mockResolvedValue({}), + }, + }; + + await installCustomAsset( + { + is_deleted: false, + name: 'logs-system.auth@custom', + package_name: 'system', + package_version: '0.1.0', + template: { + mappings: { + properties: { + new_field: { + type: 'text', + }, + }, + }, + }, + type: 'component_template', + }, + esClientMock, + new AbortController(), + { debug: jest.fn() } as any + ); + + expect(esClientMock.cluster.putComponentTemplate).not.toHaveBeenCalled(); + }); + + // pipeline + + it('should delete ingest pipeline if deleted', async () => { + esClientMock = { + ingest: { + getPipeline: jest.fn().mockResolvedValue({ + 'logs-system.auth@custom': { + processors: [ + { + user_agent: { + field: 'user_agent', + }, + }, + ], + }, + }), + deletePipeline: jest.fn().mockResolvedValue({}), + }, + }; + + await installCustomAsset( + { + is_deleted: true, + deleted_at: new Date().toISOString(), + name: 'logs-system.auth@custom', + package_name: 'system', + package_version: '0.1.0', + processors: [], + type: 'ingest_pipeline', + }, + esClientMock, + new AbortController(), + { debug: jest.fn() } as any + ); + + expect(esClientMock.ingest.deletePipeline).toHaveBeenCalledWith( + { + id: 'logs-system.auth@custom', + }, + expect.anything() + ); + }); + + it('should do nothing if ingest pipeline deleted and not exists', async () => { + esClientMock = { + ingest: { + getPipeline: jest.fn().mockResolvedValue({}), + deletePipeline: jest.fn().mockResolvedValue({}), + }, + }; + + await installCustomAsset( + { + is_deleted: true, + deleted_at: new Date().toISOString(), + name: 'logs-system.auth@custom', + package_name: 'system', + package_version: '0.1.0', + processors: [], + type: 'ingest_pipeline', + }, + esClientMock, + new AbortController(), + { debug: jest.fn() } as any + ); + + expect(esClientMock.ingest.deletePipeline).not.toHaveBeenCalled(); + }); + + it('should install ingest pipeline if not exists', async () => { + esClientMock = { + ingest: { + getPipeline: jest.fn().mockResolvedValue({}), + putPipeline: jest.fn().mockResolvedValue({}), + }, + }; + + await installCustomAsset( + { + is_deleted: false, + name: 'logs-system.auth@custom', + package_name: 'system', + package_version: '0.1.0', + pipeline: { + processors: [ + { + user_agent: { + field: 'user_agent', + }, + }, + ], + version: 1, + }, + type: 'ingest_pipeline', + }, + esClientMock, + new AbortController(), + { debug: jest.fn() } as any + ); + + expect(esClientMock.ingest.putPipeline).toHaveBeenCalledWith( + { + id: 'logs-system.auth@custom', + processors: [ + { + user_agent: { + field: 'user_agent', + }, + }, + ], + version: 1, + }, + expect.anything() + ); + }); + + it('should update ingest pipeline if version changed', async () => { + esClientMock = { + ingest: { + getPipeline: jest.fn().mockResolvedValue({ + 'logs-system.auth@custom': { + processors: [ + { + user_agent: { + field: 'user_agent', + }, + }, + ], + version: 1, + }, + }), + putPipeline: jest.fn().mockResolvedValue({}), + }, + }; + + await installCustomAsset( + { + is_deleted: false, + name: 'logs-system.auth@custom', + package_name: 'system', + package_version: '0.1.0', + pipeline: { + processors: [ + { + user_agent: { + field: 'user_agent', + }, + }, + ], + version: 2, + }, + type: 'ingest_pipeline', + }, + esClientMock, + new AbortController(), + { debug: jest.fn() } as any + ); + + expect(esClientMock.ingest.putPipeline).toHaveBeenCalledWith( + { + id: 'logs-system.auth@custom', + processors: [ + { + user_agent: { + field: 'user_agent', + }, + }, + ], + version: 2, + }, + expect.anything() + ); + }); + + it('should update ingest pipeline if changed without version', async () => { + esClientMock = { + ingest: { + getPipeline: jest.fn().mockResolvedValue({ + 'logs-system.auth@custom': { + processors: [ + { + user_agent: { + field: 'user_agent', + }, + }, + ], + }, + }), + putPipeline: jest.fn().mockResolvedValue({}), + }, + }; + + await installCustomAsset( + { + is_deleted: false, + name: 'logs-system.auth@custom', + package_name: 'system', + package_version: '0.1.0', + pipeline: { + processors: [ + { + user_agent: { + field: 'user_agent2', + }, + }, + ], + }, + type: 'ingest_pipeline', + }, + esClientMock, + new AbortController(), + { debug: jest.fn() } as any + ); + + expect(esClientMock.ingest.putPipeline).toHaveBeenCalledWith( + { + id: 'logs-system.auth@custom', + processors: [ + { + user_agent: { + field: 'user_agent2', + }, + }, + ], + }, + expect.anything() + ); + }); + + it('should not update ingest pipeline if not changed', async () => { + esClientMock = { + ingest: { + getPipeline: jest.fn().mockResolvedValue({ + 'logs-system.auth@custom': { + processors: [ + { + user_agent: { + field: 'user_agent', + }, + }, + ], + version: 1, + }, + }), + putPipeline: jest.fn().mockResolvedValue({}), + }, + }; + + await installCustomAsset( + { + is_deleted: false, + name: 'logs-system.auth@custom', + package_name: 'system', + package_version: '0.1.0', + pipeline: { + processors: [ + { + user_agent: { + field: 'user_agent', + }, + }, + ], + version: 1, + }, + type: 'ingest_pipeline', + }, + esClientMock, + new AbortController(), + { debug: jest.fn() } as any + ); + + expect(esClientMock.ingest.putPipeline).not.toHaveBeenCalled(); + }); + }); }); diff --git a/x-pack/platform/plugins/shared/fleet/server/tasks/sync_integrations/custom_assets.ts b/x-pack/platform/plugins/shared/fleet/server/tasks/sync_integrations/custom_assets.ts index aebf376204a70..2aee3d3a891f2 100644 --- a/x-pack/platform/plugins/shared/fleet/server/tasks/sync_integrations/custom_assets.ts +++ b/x-pack/platform/plugins/shared/fleet/server/tasks/sync_integrations/custom_assets.ts @@ -5,7 +5,13 @@ * 2.0. */ -import type { ElasticsearchClient } from '@kbn/core/server'; +import type { ElasticsearchClient, Logger } from '@kbn/core/server'; + +import { isEqual } from 'lodash'; +import type { + ClusterGetComponentTemplateResponse, + IngestGetPipelineResponse, +} from '@elastic/elasticsearch/lib/api/types'; import type { CustomAssetsData, IntegrationsData, SyncIntegrationsData } from './model'; @@ -25,21 +31,45 @@ export const findIntegration = (assetName: string, integrations: IntegrationsDat }); }; -export const getCustomAssets = async ( +function getComponentTemplate( esClient: ElasticsearchClient, - integrations: IntegrationsData[], - abortController: AbortController, - previousSyncIntegrationsData: SyncIntegrationsData | undefined -): Promise => { - const customTemplates = await esClient.cluster.getComponentTemplate( + name: string, + abortController: AbortController +): Promise { + return esClient.cluster.getComponentTemplate( { - name: '*@custom', + name, }, { ignore: [404], signal: abortController.signal, } ); +} + +function getPipeline( + esClient: ElasticsearchClient, + name: string, + abortController: AbortController +): Promise { + return esClient.ingest.getPipeline( + { + id: name, + }, + { + ignore: [404], + signal: abortController.signal, + } + ); +} + +export const getCustomAssets = async ( + esClient: ElasticsearchClient, + integrations: IntegrationsData[], + abortController: AbortController, + previousSyncIntegrationsData: SyncIntegrationsData | undefined +): Promise => { + const customTemplates = await getComponentTemplate(esClient, '*@custom', abortController); const customAssetsComponentTemplates = customTemplates.component_templates.reduce( (acc: CustomAssetsData[], template) => { @@ -58,15 +88,7 @@ export const getCustomAssets = async ( [] ); - const ingestPipelines = await esClient.ingest.getPipeline( - { - id: '*@custom', - }, - { - ignore: [404], - signal: abortController.signal, - } - ); + const ingestPipelines = await getPipeline(esClient, '*@custom', abortController); const customAssetsIngestPipelines = Object.keys(ingestPipelines).reduce( (acc: CustomAssetsData[], pipeline) => { @@ -124,3 +146,95 @@ function updateDeletedAssets( return deletedAssets; } + +export async function installCustomAsset( + customAsset: CustomAssetsData, + esClient: ElasticsearchClient, + abortController: AbortController, + logger: Logger +) { + if (customAsset.type === 'component_template') { + const customTemplates = await getComponentTemplate(esClient, customAsset.name, abortController); + const existingTemplate = customTemplates.component_templates?.find( + (template) => template.name === customAsset.name + ); + if (customAsset.is_deleted) { + if (existingTemplate) { + logger.debug(`Deleting component template: ${customAsset.name}`); + return esClient.cluster.deleteComponentTemplate( + { + name: customAsset.name, + }, + { + signal: abortController.signal, + } + ); + } else { + return; + } + } + let shouldUpdateTemplate = false; + if (existingTemplate) { + shouldUpdateTemplate = !isEqual( + existingTemplate.component_template.template, + customAsset.template + ); + } else { + shouldUpdateTemplate = true; + } + + if (shouldUpdateTemplate) { + logger.debug(`Updating component template: ${customAsset.name}`); + return esClient.cluster.putComponentTemplate( + { + name: customAsset.name, + template: customAsset.template, + }, + { + signal: abortController.signal, + } + ); + } + } else if (customAsset.type === 'ingest_pipeline') { + const ingestPipelines = await getPipeline(esClient, customAsset.name, abortController); + const existingPipeline = ingestPipelines[customAsset.name]; + + if (customAsset.is_deleted) { + if (existingPipeline) { + logger.debug(`Deleting ingest pipeline: ${customAsset.name}`); + return esClient.ingest.deletePipeline( + { + id: customAsset.name, + }, + { + signal: abortController.signal, + } + ); + } else { + return; + } + } + + let shouldUpdatePipeline = false; + if (existingPipeline) { + shouldUpdatePipeline = + (existingPipeline.version && existingPipeline.version < customAsset.pipeline.version) || + (!existingPipeline.version && !isEqual(existingPipeline, customAsset.pipeline)); + } else { + shouldUpdatePipeline = true; + } + + if (shouldUpdatePipeline) { + logger.debug(`Updating ingest pipeline: ${customAsset.name}`); + return esClient.ingest.putPipeline( + { + id: customAsset.name, + ...customAsset.pipeline, + }, + { + signal: abortController.signal, + } + ); + } + } +} diff --git a/x-pack/platform/plugins/shared/fleet/server/tasks/sync_integrations/sync_integrations_on_remote.test.ts b/x-pack/platform/plugins/shared/fleet/server/tasks/sync_integrations/sync_integrations_on_remote.test.ts index a94f0fc9736a0..ab2a75260e3cd 100644 --- a/x-pack/platform/plugins/shared/fleet/server/tasks/sync_integrations/sync_integrations_on_remote.test.ts +++ b/x-pack/platform/plugins/shared/fleet/server/tasks/sync_integrations/sync_integrations_on_remote.test.ts @@ -8,9 +8,12 @@ import { PackageNotFoundError } from '../../errors'; import { outputService } from '../../services'; +import { installCustomAsset } from './custom_assets'; + import { syncIntegrationsOnRemote } from './sync_integrations_on_remote'; jest.mock('../../services'); +jest.mock('./custom_assets'); const outputServiceMock = outputService as jest.Mocked; @@ -49,6 +52,7 @@ describe('syncIntegrationsOnRemote', () => { warn: jest.fn(), info: jest.fn(), }; + (installCustomAsset as jest.Mock).mockClear(); }); it('should throw error if multiple synced integrations ccr indices exist', async () => { @@ -88,6 +92,24 @@ describe('syncIntegrationsOnRemote', () => { updated_at: '2021-01-01T00:00:00.000Z', }, ], + custom_assets: { + 'component_template:logs-system.auth@custom': { + is_deleted: false, + name: 'logs-system.auth@custom', + package_name: 'system', + package_version: '0.1.0', + template: {}, + type: 'component_template', + }, + 'ingest_pipeline:logs-system.auth@custom': { + is_deleted: false, + name: 'logs-system.auth@custom', + package_name: 'system', + package_version: '0.1.0', + pipeline: {}, + type: 'ingest_pipeline', + }, + }, }, }, ], @@ -404,4 +426,28 @@ describe('syncIntegrationsOnRemote', () => { expect(packageClientMock.installPackage).not.toHaveBeenCalled(); }); + + it('should install custom assets', async () => { + getIndicesMock.mockResolvedValue({ + 'fleet-synced-integrations-ccr-remote1': {}, + }); + searchMock.mockResolvedValue(getSyncedIntegrationsCCRDoc(true)); + packageClientMock.getInstallation.mockImplementation(() => ({ + install_status: 'installed', + version: '2.2.0', + })); + packageClientMock.installPackage.mockResolvedValue({ + status: 'installed', + }); + + await syncIntegrationsOnRemote( + esClientMock, + {} as any, + packageClientMock, + abortController, + loggerMock + ); + + expect(installCustomAsset).toHaveBeenCalledTimes(2); + }); }); diff --git a/x-pack/platform/plugins/shared/fleet/server/tasks/sync_integrations/sync_integrations_on_remote.ts b/x-pack/platform/plugins/shared/fleet/server/tasks/sync_integrations/sync_integrations_on_remote.ts index 847f681b805cd..237c8a8066101 100644 --- a/x-pack/platform/plugins/shared/fleet/server/tasks/sync_integrations/sync_integrations_on_remote.ts +++ b/x-pack/platform/plugins/shared/fleet/server/tasks/sync_integrations/sync_integrations_on_remote.ts @@ -15,6 +15,7 @@ import { outputService } from '../../services'; import { PackageNotFoundError } from '../../errors'; import type { SyncIntegrationsData } from './model'; +import { installCustomAsset } from './custom_assets'; const FLEET_SYNCED_INTEGRATIONS_CCR_INDEX_PREFIX = 'fleet-synced-integrations-ccr-*'; @@ -167,4 +168,15 @@ export const syncIntegrationsOnRemote = async ( } await installPackageIfNotInstalled(pkg, packageClient, logger, abortController); } + + for (const customAsset of Object.values(syncIntegrationsDoc?.custom_assets ?? {})) { + if (abortController.signal.aborted) { + throw new Error('Task was aborted'); + } + try { + await installCustomAsset(customAsset, esClient, abortController, logger); + } catch (error) { + logger.error(`Failed to install ${customAsset.type} ${customAsset.name}, error: ${error}`); + } + } }; diff --git a/x-pack/platform/plugins/shared/fleet/server/tasks/sync_integrations/sync_integrations_task.ts b/x-pack/platform/plugins/shared/fleet/server/tasks/sync_integrations/sync_integrations_task.ts index 9e195682659be..76c0e60109031 100644 --- a/x-pack/platform/plugins/shared/fleet/server/tasks/sync_integrations/sync_integrations_task.ts +++ b/x-pack/platform/plugins/shared/fleet/server/tasks/sync_integrations/sync_integrations_task.ts @@ -28,7 +28,7 @@ import { getCustomAssets } from './custom_assets'; import type { SyncIntegrationsData } from './model'; export const TYPE = 'fleet:sync-integrations-task'; -export const VERSION = '1.0.2'; +export const VERSION = '1.0.3'; const TITLE = 'Fleet Sync Integrations Task'; const SCOPE = ['fleet']; const INTERVAL = '5m'; From 5d13c675280df30dc9f904fd81353befe2589816 Mon Sep 17 00:00:00 2001 From: Julia Bardi Date: Mon, 24 Mar 2025 12:58:35 +0100 Subject: [PATCH 2/2] add retry, refactor --- .../tasks/sync_integrations/custom_assets.ts | 166 +++++++++++------- 1 file changed, 101 insertions(+), 65 deletions(-) diff --git a/x-pack/platform/plugins/shared/fleet/server/tasks/sync_integrations/custom_assets.ts b/x-pack/platform/plugins/shared/fleet/server/tasks/sync_integrations/custom_assets.ts index 2aee3d3a891f2..8bfedf60a664c 100644 --- a/x-pack/platform/plugins/shared/fleet/server/tasks/sync_integrations/custom_assets.ts +++ b/x-pack/platform/plugins/shared/fleet/server/tasks/sync_integrations/custom_assets.ts @@ -13,6 +13,8 @@ import type { IngestGetPipelineResponse, } from '@elastic/elasticsearch/lib/api/types'; +import { retryTransientEsErrors } from '../../services/epm/elasticsearch/retry'; + import type { CustomAssetsData, IntegrationsData, SyncIntegrationsData } from './model'; const DELETED_ASSET_TTL = 7 * 24 * 60 * 60 * 1000; // 7 days @@ -147,94 +149,128 @@ function updateDeletedAssets( return deletedAssets; } -export async function installCustomAsset( +async function updateComponentTemplate( customAsset: CustomAssetsData, esClient: ElasticsearchClient, abortController: AbortController, logger: Logger ) { - if (customAsset.type === 'component_template') { - const customTemplates = await getComponentTemplate(esClient, customAsset.name, abortController); - const existingTemplate = customTemplates.component_templates?.find( - (template) => template.name === customAsset.name + const customTemplates = await getComponentTemplate(esClient, customAsset.name, abortController); + const existingTemplate = customTemplates.component_templates?.find( + (template) => template.name === customAsset.name + ); + if (customAsset.is_deleted) { + if (existingTemplate) { + logger.debug(`Deleting component template: ${customAsset.name}`); + return retryTransientEsErrors( + () => + esClient.cluster.deleteComponentTemplate( + { + name: customAsset.name, + }, + { + signal: abortController.signal, + } + ), + { logger } + ); + } else { + return; + } + } + let shouldUpdateTemplate = false; + if (existingTemplate) { + shouldUpdateTemplate = !isEqual( + existingTemplate.component_template.template, + customAsset.template ); - if (customAsset.is_deleted) { - if (existingTemplate) { - logger.debug(`Deleting component template: ${customAsset.name}`); - return esClient.cluster.deleteComponentTemplate( + } else { + shouldUpdateTemplate = true; + } + + if (shouldUpdateTemplate) { + logger.debug(`Updating component template: ${customAsset.name}`); + return retryTransientEsErrors( + () => + esClient.cluster.putComponentTemplate( { name: customAsset.name, + template: customAsset.template, }, { signal: abortController.signal, } - ); - } else { - return; - } - } - let shouldUpdateTemplate = false; - if (existingTemplate) { - shouldUpdateTemplate = !isEqual( - existingTemplate.component_template.template, - customAsset.template + ), + { logger } + ); + } +} + +async function updateIngestPipeline( + customAsset: CustomAssetsData, + esClient: ElasticsearchClient, + abortController: AbortController, + logger: Logger +) { + const ingestPipelines = await getPipeline(esClient, customAsset.name, abortController); + const existingPipeline = ingestPipelines[customAsset.name]; + + if (customAsset.is_deleted) { + if (existingPipeline) { + logger.debug(`Deleting ingest pipeline: ${customAsset.name}`); + return retryTransientEsErrors( + () => + esClient.ingest.deletePipeline( + { + id: customAsset.name, + }, + { + signal: abortController.signal, + } + ), + { logger } ); } else { - shouldUpdateTemplate = true; + return; } + } - if (shouldUpdateTemplate) { - logger.debug(`Updating component template: ${customAsset.name}`); - return esClient.cluster.putComponentTemplate( - { - name: customAsset.name, - template: customAsset.template, - }, - { - signal: abortController.signal, - } - ); - } - } else if (customAsset.type === 'ingest_pipeline') { - const ingestPipelines = await getPipeline(esClient, customAsset.name, abortController); - const existingPipeline = ingestPipelines[customAsset.name]; + let shouldUpdatePipeline = false; + if (existingPipeline) { + shouldUpdatePipeline = + (existingPipeline.version && existingPipeline.version < customAsset.pipeline.version) || + (!existingPipeline.version && !isEqual(existingPipeline, customAsset.pipeline)); + } else { + shouldUpdatePipeline = true; + } - if (customAsset.is_deleted) { - if (existingPipeline) { - logger.debug(`Deleting ingest pipeline: ${customAsset.name}`); - return esClient.ingest.deletePipeline( + if (shouldUpdatePipeline) { + logger.debug(`Updating ingest pipeline: ${customAsset.name}`); + return retryTransientEsErrors( + () => + esClient.ingest.putPipeline( { id: customAsset.name, + ...customAsset.pipeline, }, { signal: abortController.signal, } - ); - } else { - return; - } - } - - let shouldUpdatePipeline = false; - if (existingPipeline) { - shouldUpdatePipeline = - (existingPipeline.version && existingPipeline.version < customAsset.pipeline.version) || - (!existingPipeline.version && !isEqual(existingPipeline, customAsset.pipeline)); - } else { - shouldUpdatePipeline = true; - } + ), + { logger } + ); + } +} - if (shouldUpdatePipeline) { - logger.debug(`Updating ingest pipeline: ${customAsset.name}`); - return esClient.ingest.putPipeline( - { - id: customAsset.name, - ...customAsset.pipeline, - }, - { - signal: abortController.signal, - } - ); - } +export async function installCustomAsset( + customAsset: CustomAssetsData, + esClient: ElasticsearchClient, + abortController: AbortController, + logger: Logger +) { + if (customAsset.type === 'component_template') { + return updateComponentTemplate(customAsset, esClient, abortController, logger); + } else if (customAsset.type === 'ingest_pipeline') { + return updateIngestPipeline(customAsset, esClient, abortController, logger); } }