diff --git a/x-pack/plugins/actions/server/action_type_registry.ts b/x-pack/plugins/actions/server/action_type_registry.ts index d274118650b61..09f7fa5370fc0 100644 --- a/x-pack/plugins/actions/server/action_type_registry.ts +++ b/x-pack/plugins/actions/server/action_type_registry.ts @@ -152,8 +152,7 @@ export class ActionTypeRegistry { [`actions:${actionType.id}`]: { title: actionType.name, maxAttempts, - createTaskRunner: (context: RunContext) => - this.taskRunnerFactory.create(context, maxAttempts), + createTaskRunner: (context: RunContext) => this.taskRunnerFactory.create(context), }, }); // No need to notify usage on basic action types diff --git a/x-pack/plugins/actions/server/lib/task_runner_factory.test.ts b/x-pack/plugins/actions/server/lib/task_runner_factory.test.ts index 404c3ba452086..eae6cd5bc06c2 100644 --- a/x-pack/plugins/actions/server/lib/task_runner_factory.test.ts +++ b/x-pack/plugins/actions/server/lib/task_runner_factory.test.ts @@ -12,14 +12,22 @@ import { TaskRunnerFactory } from './task_runner_factory'; import { actionTypeRegistryMock } from '../action_type_registry.mock'; import { actionExecutorMock } from './action_executor.mock'; import { encryptedSavedObjectsMock } from '@kbn/encrypted-saved-objects-plugin/server/mocks'; -import { savedObjectsClientMock, loggingSystemMock, httpServiceMock } from '@kbn/core/server/mocks'; +import { + savedObjectsClientMock, + loggingSystemMock, + httpServiceMock, + savedObjectsRepositoryMock, +} from '@kbn/core/server/mocks'; import { eventLoggerMock } from '@kbn/event-log-plugin/server/mocks'; import { ActionTypeDisabledError } from './errors'; import { actionsClientMock } from '../mocks'; import { inMemoryMetricsMock } from '../monitoring/in_memory_metrics.mock'; import { IN_MEMORY_METRICS } from '../monitoring'; import { pick } from 'lodash'; -import { isRetryableError } from '@kbn/task-manager-plugin/server/task_running'; +import { + isRetryableError, + isUnrecoverableError, +} from '@kbn/task-manager-plugin/server/task_running'; const executeParamsFields = [ 'actionId', @@ -86,15 +94,12 @@ const taskRunnerFactoryInitializerParams = { logger: loggingSystemMock.create().get(), encryptedSavedObjectsClient: mockedEncryptedSavedObjectsClient, basePathService: httpServiceMock.createBasePath(), - getUnsecuredSavedObjectsClient: jest.fn().mockReturnValue(services.savedObjectsClient), + savedObjectsRepository: savedObjectsRepositoryMock.create(), }; beforeEach(() => { jest.resetAllMocks(); actionExecutorInitializerParams.getServices.mockReturnValue(services); - taskRunnerFactoryInitializerParams.getUnsecuredSavedObjectsClient.mockReturnValue( - services.savedObjectsClient - ); }); test(`throws an error if factory isn't initialized`, () => { @@ -410,36 +415,18 @@ test('executes the task by calling the executor with proper parameters when noti ); }); -test('cleans up action_task_params object', async () => { +test('cleans up action_task_params object through the cleanup runner method', async () => { const taskRunner = taskRunnerFactory.create({ taskInstance: mockedTaskInstance, }); - mockedActionExecutor.execute.mockResolvedValueOnce({ status: 'ok', actionId: '2' }); - spaceIdToNamespace.mockReturnValueOnce('namespace-test'); - mockedEncryptedSavedObjectsClient.getDecryptedAsInternalUser.mockResolvedValueOnce({ - id: '3', - type: 'action_task_params', - attributes: { - actionId: '2', - params: { baz: true }, - executionId: '123abc', - apiKey: Buffer.from('123:abc').toString('base64'), - }, - references: [ - { - id: '2', - name: 'actionRef', - type: 'action', - }, - ], - }); - - await taskRunner.run(); + await taskRunner.cleanup(); - expect(services.savedObjectsClient.delete).toHaveBeenCalledWith('action_task_params', '3', { - refresh: false, - }); + expect(taskRunnerFactoryInitializerParams.savedObjectsRepository.delete).toHaveBeenCalledWith( + 'action_task_params', + '3', + { refresh: false } + ); }); test('task runner should implement CancellableTask cancel method with logging warning message', async () => { @@ -474,37 +461,22 @@ test('task runner should implement CancellableTask cancel method with logging wa ); }); -test('runs successfully when cleanup fails and logs the error', async () => { +test('cleanup runs successfully when action_task_params cleanup fails and logs the error', async () => { const taskRunner = taskRunnerFactory.create({ taskInstance: mockedTaskInstance, }); - mockedActionExecutor.execute.mockResolvedValueOnce({ status: 'ok', actionId: '2' }); - spaceIdToNamespace.mockReturnValueOnce('namespace-test'); - mockedEncryptedSavedObjectsClient.getDecryptedAsInternalUser.mockResolvedValueOnce({ - id: '3', - type: 'action_task_params', - attributes: { - actionId: '2', - params: { baz: true }, - executionId: '123abc', - apiKey: Buffer.from('123:abc').toString('base64'), - }, - references: [ - { - id: '2', - name: 'actionRef', - type: 'action', - }, - ], - }); - services.savedObjectsClient.delete.mockRejectedValueOnce(new Error('Fail')); + taskRunnerFactoryInitializerParams.savedObjectsRepository.delete.mockRejectedValueOnce( + new Error('Fail') + ); - await taskRunner.run(); + await taskRunner.cleanup(); - expect(services.savedObjectsClient.delete).toHaveBeenCalledWith('action_task_params', '3', { - refresh: false, - }); + expect(taskRunnerFactoryInitializerParams.savedObjectsRepository.delete).toHaveBeenCalledWith( + 'action_task_params', + '3', + { refresh: false } + ); expect(taskRunnerFactoryInitializerParams.logger.error).toHaveBeenCalledWith( 'Failed to cleanup action_task_params object [id="3"]: Fail' ); @@ -814,15 +786,12 @@ test(`doesn't use API key when not provided`, async () => { }); test(`throws an error when license doesn't support the action type`, async () => { - const taskRunner = taskRunnerFactory.create( - { - taskInstance: { - ...mockedTaskInstance, - attempts: 1, - }, + const taskRunner = taskRunnerFactory.create({ + taskInstance: { + ...mockedTaskInstance, + attempts: 1, }, - 2 - ); + }); mockedEncryptedSavedObjectsClient.getDecryptedAsInternalUser.mockResolvedValueOnce({ id: '3', @@ -849,7 +818,7 @@ test(`throws an error when license doesn't support the action type`, async () => await taskRunner.run(); throw new Error('Should have thrown'); } catch (e) { - expect(isRetryableError(e)).toEqual(true); + expect(isUnrecoverableError(e)).toEqual(true); } }); @@ -895,56 +864,11 @@ test(`will throw an error with retry: false if the task is not retryable`, async expect(err).toBeDefined(); expect(isRetryableError(err)).toEqual(false); expect(taskRunnerFactoryInitializerParams.logger.error as jest.Mock).toHaveBeenCalledWith( - `Action '2' failed and will not retry: Error message` - ); -}); - -test(`treats errors as successes if the task is not retryable`, async () => { - const taskRunner = taskRunnerFactory.create({ - taskInstance: { - ...mockedTaskInstance, - attempts: 1, - }, - }); - - mockedEncryptedSavedObjectsClient.getDecryptedAsInternalUser.mockResolvedValueOnce({ - id: '3', - type: 'action_task_params', - attributes: { - actionId: '2', - params: { baz: true }, - executionId: '123abc', - apiKey: Buffer.from('123:abc').toString('base64'), - }, - references: [ - { - id: '2', - name: 'actionRef', - type: 'action', - }, - ], - }); - mockedActionExecutor.execute.mockResolvedValueOnce({ - status: 'error', - actionId: '2', - message: 'Error message', - data: { foo: true }, - retry: false, - }); - - let err; - try { - await taskRunner.run(); - } catch (e) { - err = e; - } - expect(err).toBeUndefined(); - expect(taskRunnerFactoryInitializerParams.logger.error as jest.Mock).toHaveBeenCalledWith( - `Action '2' failed and will not retry: Error message` + `Action '2' failed: Error message` ); }); -test('will throw a retry error if the error is thrown instead of returned', async () => { +test('will rethrow the error if the error is thrown instead of returned', async () => { const taskRunner = taskRunnerFactory.create({ taskInstance: { ...mockedTaskInstance, @@ -969,7 +893,8 @@ test('will throw a retry error if the error is thrown instead of returned', asyn }, ], }); - mockedActionExecutor.execute.mockRejectedValueOnce({}); + const thrownError = new Error('Fail'); + mockedActionExecutor.execute.mockRejectedValueOnce(thrownError); let err; try { @@ -978,10 +903,10 @@ test('will throw a retry error if the error is thrown instead of returned', asyn err = e; } expect(err).toBeDefined(); - expect(isRetryableError(err)).toEqual(true); expect(taskRunnerFactoryInitializerParams.logger.error as jest.Mock).toHaveBeenCalledWith( - `Action '2' failed and will retry: undefined` + `Action '2' failed: Fail` ); + expect(thrownError).toEqual(err); }); test('increments monitoring metrics after execution', async () => { diff --git a/x-pack/plugins/actions/server/lib/task_runner_factory.ts b/x-pack/plugins/actions/server/lib/task_runner_factory.ts index de347637f5cf4..3b8f4e6853a99 100644 --- a/x-pack/plugins/actions/server/lib/task_runner_factory.ts +++ b/x-pack/plugins/actions/server/lib/task_runner_factory.ts @@ -10,18 +10,20 @@ import { pick } from 'lodash'; import { addSpaceIdToPath } from '@kbn/spaces-plugin/server'; import { Logger, - SavedObjectsClientContract, - KibanaRequest, CoreKibanaRequest, IBasePath, SavedObject, Headers, FakeRawRequest, SavedObjectReference, + ISavedObjectsRepository, } from '@kbn/core/server'; import { RunContext } from '@kbn/task-manager-plugin/server'; import { EncryptedSavedObjectsClient } from '@kbn/encrypted-saved-objects-plugin/server'; -import { throwRetryableError } from '@kbn/task-manager-plugin/server/task_running'; +import { + throwRetryableError, + throwUnrecoverableError, +} from '@kbn/task-manager-plugin/server/task_running'; import { ActionExecutorContract } from './action_executor'; import { ActionTaskParams, @@ -40,6 +42,7 @@ import { import { RelatedSavedObjects, validatedRelatedSavedObjects } from './related_saved_objects'; import { injectSavedObjectReferences } from './action_task_params_utils'; import { InMemoryMetrics, IN_MEMORY_METRICS } from '../monitoring'; +import { ActionTypeDisabledError } from './errors'; export interface TaskRunnerContext { logger: Logger; @@ -47,7 +50,7 @@ export interface TaskRunnerContext { encryptedSavedObjectsClient: EncryptedSavedObjectsClient; spaceIdToNamespace: SpaceIdToNamespaceFunction; basePathService: IBasePath; - getUnsecuredSavedObjectsClient: (request: KibanaRequest) => SavedObjectsClientContract; + savedObjectsRepository: ISavedObjectsRepository; } export class TaskRunnerFactory { @@ -69,7 +72,7 @@ export class TaskRunnerFactory { this.taskRunnerContext = taskRunnerContext; } - public create({ taskInstance }: RunContext, maxAttempts: number = 1) { + public create({ taskInstance }: RunContext) { if (!this.isInitialized) { throw new Error('TaskRunnerFactory not initialized'); } @@ -80,7 +83,7 @@ export class TaskRunnerFactory { encryptedSavedObjectsClient, spaceIdToNamespace, basePathService, - getUnsecuredSavedObjectsClient, + savedObjectsRepository, } = this.taskRunnerContext!; const taskInfo = { @@ -88,10 +91,10 @@ export class TaskRunnerFactory { attempts: taskInstance.attempts, }; const actionExecutionId = uuidv4(); + const actionTaskExecutorParams = taskInstance.params as ActionTaskExecutorParams; return { async run() { - const actionTaskExecutorParams = taskInstance.params as ActionTaskExecutorParams; const { spaceId } = actionTaskExecutorParams; const { @@ -115,12 +118,6 @@ export class TaskRunnerFactory { const request = getFakeRequest(apiKey); basePathService.set(request, path); - // TM will treat a task as a failure if `attempts >= maxAttempts` - // so we need to handle that here to avoid TM persisting the failed task - const isRetryableBasedOnAttempts = taskInfo.attempts < maxAttempts; - const willRetryMessage = `and will retry`; - const willNotRetryMessage = `and will not retry`; - let executorResult: ActionTypeExecutorResult | undefined; try { executorResult = await actionExecutor.execute({ @@ -136,65 +133,28 @@ export class TaskRunnerFactory { ...getSource(references, source), }); } catch (e) { - logger.error( - `Action '${actionId}' failed ${ - isRetryableBasedOnAttempts ? willRetryMessage : willNotRetryMessage - }: ${e.message}` - ); - if (isRetryableBasedOnAttempts) { - // To retry, we will throw a Task Manager RetryableError - throw throwRetryableError(new Error(e.message), true); + logger.error(`Action '${actionId}' failed: ${e.message}`); + if (e instanceof ActionTypeDisabledError) { + // We'll stop re-trying due to action being forbidden + throwUnrecoverableError(e); } + throw e; } inMemoryMetrics.increment(IN_MEMORY_METRICS.ACTION_EXECUTIONS); - if ( - executorResult && - executorResult?.status === 'error' && - executorResult?.retry !== undefined && - isRetryableBasedOnAttempts - ) { + if (executorResult.status === 'error') { inMemoryMetrics.increment(IN_MEMORY_METRICS.ACTION_FAILURES); - logger.error( - `Action '${actionId}' failed ${ - !!executorResult.retry ? willRetryMessage : willNotRetryMessage - }: ${executorResult.message}` - ); - // When the return status is `error`, we will throw a Task Manager RetryableError + logger.error(`Action '${actionId}' failed: ${executorResult.message}`); + // Task manager error handler only kicks in when an error thrown (at this time) + // So what we have to do is throw when the return status is `error`. throw throwRetryableError( new Error(executorResult.message), executorResult.retry as boolean | Date ); - } else if (executorResult && executorResult?.status === 'error') { - inMemoryMetrics.increment(IN_MEMORY_METRICS.ACTION_FAILURES); - logger.error( - `Action '${actionId}' failed ${willNotRetryMessage}: ${executorResult.message}` - ); - } - - // Cleanup action_task_params object now that we're done with it - if (isPersistedActionTask(actionTaskExecutorParams)) { - try { - // If the request has reached this far we can assume the user is allowed to run clean up - // We would idealy secure every operation but in order to support clean up of legacy alerts - // we allow this operation in an unsecured manner - // Once support for legacy alert RBAC is dropped, this can be secured - await getUnsecuredSavedObjectsClient(request).delete( - ACTION_TASK_PARAMS_SAVED_OBJECT_TYPE, - actionTaskExecutorParams.actionTaskParamsId, - { refresh: false } - ); - } catch (e) { - // Log error only, we shouldn't fail the task because of an error here (if ever there's retry logic) - logger.error( - `Failed to cleanup ${ACTION_TASK_PARAMS_SAVED_OBJECT_TYPE} object [id="${actionTaskExecutorParams.actionTaskParamsId}"]: ${e.message}` - ); - } } }, cancel: async () => { // Write event log entry - const actionTaskExecutorParams = taskInstance.params as ActionTaskExecutorParams; const { spaceId } = actionTaskExecutorParams; const { @@ -227,6 +187,23 @@ export class TaskRunnerFactory { ); return { state: {} }; }, + cleanup: async () => { + // Cleanup action_task_params object now that we're done with it + if (isPersistedActionTask(actionTaskExecutorParams)) { + try { + await savedObjectsRepository.delete( + ACTION_TASK_PARAMS_SAVED_OBJECT_TYPE, + actionTaskExecutorParams.actionTaskParamsId, + { refresh: false, namespace: spaceIdToNamespace(actionTaskExecutorParams.spaceId) } + ); + } catch (e) { + // Log error only, we shouldn't fail the task because of an error here (if ever there's retry logic) + logger.error( + `Failed to cleanup ${ACTION_TASK_PARAMS_SAVED_OBJECT_TYPE} object [id="${actionTaskExecutorParams.actionTaskParamsId}"]: ${e.message}` + ); + } + } + }, }; } } diff --git a/x-pack/plugins/actions/server/plugin.ts b/x-pack/plugins/actions/server/plugin.ts index 76e59fa370fc2..debd5917cf687 100644 --- a/x-pack/plugins/actions/server/plugin.ts +++ b/x-pack/plugins/actions/server/plugin.ts @@ -524,8 +524,9 @@ export class ActionsPlugin implements Plugin spaceIdToNamespace(plugins.spaces, spaceId), - getUnsecuredSavedObjectsClient: (request: KibanaRequest) => - this.getUnsecuredSavedObjectsClient(core.savedObjects, request), + savedObjectsRepository: core.savedObjects.createInternalRepository([ + ACTION_TASK_PARAMS_SAVED_OBJECT_TYPE, + ]), }); this.eventLogService!.isEsContextReady().then(() => { diff --git a/x-pack/plugins/alerting/server/rules_client/common/try_to_remove_tasks.ts b/x-pack/plugins/alerting/server/rules_client/common/try_to_remove_tasks.ts index 89ad52d232332..962f68bf5d959 100644 --- a/x-pack/plugins/alerting/server/rules_client/common/try_to_remove_tasks.ts +++ b/x-pack/plugins/alerting/server/rules_client/common/try_to_remove_tasks.ts @@ -20,12 +20,12 @@ export const tryToRemoveTasks = async ({ }) => { const taskIdsFailedToBeDeleted: string[] = []; const taskIdsSuccessfullyDeleted: string[] = []; - return await withSpan({ name: 'taskManager.bulkRemoveIfExist', type: 'rules' }, async () => { + return await withSpan({ name: 'taskManager.bulkRemove', type: 'rules' }, async () => { if (taskIdsToDelete.length > 0) { try { - const resultFromDeletingTasks = await taskManager.bulkRemoveIfExist(taskIdsToDelete); + const resultFromDeletingTasks = await taskManager.bulkRemove(taskIdsToDelete); resultFromDeletingTasks?.statuses.forEach((status) => { - if (status.success) { + if (status.success || status.error?.statusCode === 404) { taskIdsSuccessfullyDeleted.push(status.id); } else { taskIdsFailedToBeDeleted.push(status.id); @@ -49,7 +49,7 @@ export const tryToRemoveTasks = async ({ logger.error( `Failure to delete schedules for underlying tasks: ${taskIdsToDelete.join( ', ' - )}. TaskManager bulkRemoveIfExist failed with Error: ${error.message}` + )}. TaskManager bulkRemove failed with Error: ${error.message}` ); } } diff --git a/x-pack/plugins/alerting/server/rules_client/tests/bulk_delete.test.ts b/x-pack/plugins/alerting/server/rules_client/tests/bulk_delete.test.ts index 92621434dc7b9..b61ef395d4e72 100644 --- a/x-pack/plugins/alerting/server/rules_client/tests/bulk_delete.test.ts +++ b/x-pack/plugins/alerting/server/rules_client/tests/bulk_delete.test.ts @@ -141,8 +141,8 @@ describe('bulkDelete', () => { enabledRule1, enabledRule2, ]); - expect(taskManager.bulkRemoveIfExist).toHaveBeenCalledTimes(1); - expect(taskManager.bulkRemoveIfExist).toHaveBeenCalledWith(['id1']); + expect(taskManager.bulkRemove).toHaveBeenCalledTimes(1); + expect(taskManager.bulkRemove).toHaveBeenCalledWith(['id1']); expect(bulkMarkApiKeysForInvalidation).toHaveBeenCalledTimes(1); expect(bulkMarkApiKeysForInvalidation).toHaveBeenCalledWith( { apiKeys: ['MTIzOmFiYw=='] }, @@ -205,8 +205,8 @@ describe('bulkDelete', () => { const result = await rulesClient.bulkDeleteRules({ ids: ['id1', 'id2'] }); expect(unsecuredSavedObjectsClient.bulkDelete).toHaveBeenCalledTimes(4); - expect(taskManager.bulkRemoveIfExist).toHaveBeenCalledTimes(1); - expect(taskManager.bulkRemoveIfExist).toHaveBeenCalledWith(['id1']); + expect(taskManager.bulkRemove).toHaveBeenCalledTimes(1); + expect(taskManager.bulkRemove).toHaveBeenCalledWith(['id1']); expect(bulkMarkApiKeysForInvalidation).toHaveBeenCalledTimes(1); expect(bulkMarkApiKeysForInvalidation).toHaveBeenCalledWith( { apiKeys: ['MTIzOmFiYw=='] }, @@ -263,8 +263,8 @@ describe('bulkDelete', () => { const result = await rulesClient.bulkDeleteRules({ ids: ['id1', 'id2'] }); expect(unsecuredSavedObjectsClient.bulkDelete).toHaveBeenCalledTimes(2); - expect(taskManager.bulkRemoveIfExist).toHaveBeenCalledTimes(1); - expect(taskManager.bulkRemoveIfExist).toHaveBeenCalledWith(['id1', 'id2']); + expect(taskManager.bulkRemove).toHaveBeenCalledTimes(1); + expect(taskManager.bulkRemove).toHaveBeenCalledWith(['id1', 'id2']); expect(bulkMarkApiKeysForInvalidation).toHaveBeenCalledTimes(1); expect(bulkMarkApiKeysForInvalidation).toHaveBeenCalledWith( { apiKeys: ['MTIzOmFiYw==', 'MzIxOmFiYw=='] }, @@ -321,7 +321,7 @@ describe('bulkDelete', () => { { id: 'id2', type: 'alert', success: true }, ], }); - taskManager.bulkRemoveIfExist.mockImplementation(async () => ({ + taskManager.bulkRemove.mockImplementation(async () => ({ statuses: [ { id: 'id1', @@ -349,7 +349,7 @@ describe('bulkDelete', () => { { id: 'id2', type: 'alert', success: true }, ], }); - taskManager.bulkRemoveIfExist.mockImplementation(() => { + taskManager.bulkRemove.mockImplementation(() => { throw new Error('UPS'); }); @@ -357,7 +357,7 @@ describe('bulkDelete', () => { expect(logger.error).toBeCalledTimes(1); expect(logger.error).toBeCalledWith( - 'Failure to delete schedules for underlying tasks: id1, id2. TaskManager bulkRemoveIfExist failed with Error: UPS' + 'Failure to delete schedules for underlying tasks: id1, id2. TaskManager bulkRemove failed with Error: UPS' ); }); @@ -369,7 +369,7 @@ describe('bulkDelete', () => { { id: 'id2', type: 'alert', success: true }, ], }); - taskManager.bulkRemoveIfExist.mockImplementation(async () => ({ + taskManager.bulkRemove.mockImplementation(async () => ({ statuses: [ { id: 'id1', diff --git a/x-pack/plugins/alerting/server/rules_client/tests/bulk_disable.test.ts b/x-pack/plugins/alerting/server/rules_client/tests/bulk_disable.test.ts index fb5995f8724aa..e69dd1b0ffb44 100644 --- a/x-pack/plugins/alerting/server/rules_client/tests/bulk_disable.test.ts +++ b/x-pack/plugins/alerting/server/rules_client/tests/bulk_disable.test.ts @@ -399,7 +399,7 @@ describe('bulkDisableRules', () => { ], }); - taskManager.bulkRemoveIfExist.mockResolvedValue({ + taskManager.bulkRemove.mockResolvedValue({ statuses: [ { id: 'id1', type: 'alert', success: true }, { id: 'id2', type: 'alert', success: false }, @@ -408,8 +408,8 @@ describe('bulkDisableRules', () => { await rulesClient.bulkDisableRules({ filter: 'fake_filter' }); - expect(taskManager.bulkRemoveIfExist).toHaveBeenCalledTimes(1); - expect(taskManager.bulkRemoveIfExist).toHaveBeenCalledWith(['taskId1', 'taskId2']); + expect(taskManager.bulkRemove).toHaveBeenCalledTimes(1); + expect(taskManager.bulkRemove).toHaveBeenCalledWith(['taskId1', 'taskId2']); expect(logger.debug).toBeCalledTimes(1); expect(logger.debug).toBeCalledWith( @@ -477,7 +477,7 @@ describe('bulkDisableRules', () => { ); }); - test('should not throw an error if taskManager.bulkRemoveIfExist throw an error', async () => { + test('should not throw an error if taskManager.bulkRemove throw an error', async () => { unsecuredSavedObjectsClient.bulkCreate.mockResolvedValue({ saved_objects: [ { @@ -490,15 +490,15 @@ describe('bulkDisableRules', () => { ], }); - taskManager.bulkRemoveIfExist.mockImplementation(() => { - throw new Error('Something happend during bulkRemoveIfExist'); + taskManager.bulkRemove.mockImplementation(() => { + throw new Error('Something happend during bulkRemove'); }); await rulesClient.bulkDisableRules({ filter: 'fake_filter' }); expect(logger.error).toBeCalledTimes(1); expect(logger.error).toHaveBeenCalledWith( - 'Failure to delete schedules for underlying tasks: taskId1. TaskManager bulkRemoveIfExist failed with Error: Something happend during bulkRemoveIfExist' + 'Failure to delete schedules for underlying tasks: taskId1. TaskManager bulkRemove failed with Error: Something happend during bulkRemove' ); }); }); diff --git a/x-pack/plugins/alerting/server/rules_client/tests/lib.ts b/x-pack/plugins/alerting/server/rules_client/tests/lib.ts index 6328708f4bae0..f036e2cb02298 100644 --- a/x-pack/plugins/alerting/server/rules_client/tests/lib.ts +++ b/x-pack/plugins/alerting/server/rules_client/tests/lib.ts @@ -68,7 +68,7 @@ export function getBeforeSetup( ownerId: null, enabled: false, }); - taskManager.bulkRemoveIfExist.mockResolvedValue({ + taskManager.bulkRemove.mockResolvedValue({ statuses: [{ id: 'taskId', type: 'alert', success: true }], }); const actionsClient = actionsClientMock.create(); diff --git a/x-pack/plugins/task_manager/server/lib/bulk_remove_if_exist.test.ts b/x-pack/plugins/task_manager/server/lib/bulk_remove_if_exist.test.ts deleted file mode 100644 index 0b1c08f448398..0000000000000 --- a/x-pack/plugins/task_manager/server/lib/bulk_remove_if_exist.test.ts +++ /dev/null @@ -1,43 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License - * 2.0; you may not use this file except in compliance with the Elastic License - * 2.0. - */ - -import { v4 as uuidv4 } from 'uuid'; -import { SavedObjectsErrorHelpers } from '@kbn/core/server'; -import { bulkRemoveIfExist } from './bulk_remove_if_exist'; -import { taskStoreMock } from '../task_store.mock'; - -describe('removeIfExists', () => { - const ids = [uuidv4(), uuidv4()]; - - test('removes the tasks by its IDs', async () => { - const ts = taskStoreMock.create({}); - - expect(await bulkRemoveIfExist(ts, ids)).toBe(undefined); - expect(ts.bulkRemove).toHaveBeenCalledWith(ids); - }); - - test('handles 404 errors caused by the task not existing', async () => { - const ts = taskStoreMock.create({}); - - ts.bulkRemove.mockRejectedValue( - SavedObjectsErrorHelpers.createGenericNotFoundError('task', ids[0]) - ); - - expect(await bulkRemoveIfExist(ts, ids)).toBe(undefined); - expect(ts.bulkRemove).toHaveBeenCalledWith(ids); - }); - - test('throws if any other error is caused by task removal', async () => { - const ts = taskStoreMock.create({}); - - const error = SavedObjectsErrorHelpers.createInvalidVersionError(uuidv4()); - ts.bulkRemove.mockRejectedValue(error); - - expect(bulkRemoveIfExist(ts, ids)).rejects.toBe(error); - expect(ts.bulkRemove).toHaveBeenCalledWith(ids); - }); -}); diff --git a/x-pack/plugins/task_manager/server/lib/bulk_remove_if_exist.ts b/x-pack/plugins/task_manager/server/lib/bulk_remove_if_exist.ts deleted file mode 100644 index c3c1a61868e60..0000000000000 --- a/x-pack/plugins/task_manager/server/lib/bulk_remove_if_exist.ts +++ /dev/null @@ -1,26 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License - * 2.0; you may not use this file except in compliance with the Elastic License - * 2.0. - */ - -import { SavedObjectsErrorHelpers } from '@kbn/core/server'; -import { TaskStore } from '../task_store'; - -/** - * Removes a task from the store, ignoring a not found error - * Other errors are re-thrown - * - * @param taskStore - * @param taskIds - */ -export async function bulkRemoveIfExist(taskStore: TaskStore, taskIds: string[]) { - try { - return await taskStore.bulkRemove(taskIds); - } catch (err) { - if (!SavedObjectsErrorHelpers.isNotFoundError(err)) { - throw err; - } - } -} diff --git a/x-pack/plugins/task_manager/server/mocks.ts b/x-pack/plugins/task_manager/server/mocks.ts index 551efc9c8c52b..1363b0ab9b1ee 100644 --- a/x-pack/plugins/task_manager/server/mocks.ts +++ b/x-pack/plugins/task_manager/server/mocks.ts @@ -24,12 +24,12 @@ const createStartMock = () => { get: jest.fn(), aggregate: jest.fn(), remove: jest.fn(), + bulkRemove: jest.fn(), schedule: jest.fn(), runSoon: jest.fn(), ephemeralRunNow: jest.fn(), ensureScheduled: jest.fn(), removeIfExists: jest.fn(), - bulkRemoveIfExist: jest.fn(), supportsEphemeralTasks: jest.fn(), bulkUpdateSchedules: jest.fn(), bulkSchedule: jest.fn(), diff --git a/x-pack/plugins/task_manager/server/plugin.ts b/x-pack/plugins/task_manager/server/plugin.ts index 100254f6dae32..1fb28465f5bce 100644 --- a/x-pack/plugins/task_manager/server/plugin.ts +++ b/x-pack/plugins/task_manager/server/plugin.ts @@ -18,12 +18,10 @@ import { ServiceStatusLevels, CoreStatus, } from '@kbn/core/server'; -import type { SavedObjectsBulkDeleteResponse } from '@kbn/core/server'; import { TaskPollingLifecycle } from './polling_lifecycle'; import { TaskManagerConfig } from './config'; import { createInitialMiddleware, addMiddlewareToChain, Middleware } from './lib/middleware'; import { removeIfExists } from './lib/remove_if_exists'; -import { bulkRemoveIfExist } from './lib/bulk_remove_if_exist'; import { setupSavedObjects } from './saved_objects'; import { TaskDefinitionRegistry, TaskTypeDictionary, REMOVED_TYPES } from './task_type_dictionary'; import { AggregationOpts, FetchResult, SearchOpts, TaskStore } from './task_store'; @@ -61,10 +59,8 @@ export type TaskManagerStartContract = Pick< | 'bulkDisable' | 'bulkSchedule' > & - Pick & { + Pick & { removeIfExists: TaskStore['remove']; - } & { - bulkRemoveIfExist: (ids: string[]) => Promise; } & { supportsEphemeralTasks: () => boolean; getRegisteredTypes: () => string[]; @@ -275,7 +271,7 @@ export class TaskManagerPlugin taskStore.aggregate(opts), get: (id: string) => taskStore.get(id), remove: (id: string) => taskStore.remove(id), - bulkRemoveIfExist: (ids: string[]) => bulkRemoveIfExist(taskStore, ids), + bulkRemove: (ids: string[]) => taskStore.bulkRemove(ids), removeIfExists: (id: string) => removeIfExists(taskStore, id), schedule: (...args) => taskScheduling.schedule(...args), bulkSchedule: (...args) => taskScheduling.bulkSchedule(...args), diff --git a/x-pack/plugins/task_manager/server/polling_lifecycle.ts b/x-pack/plugins/task_manager/server/polling_lifecycle.ts index 65790f83c422e..8b2149f8bfd10 100644 --- a/x-pack/plugins/task_manager/server/polling_lifecycle.ts +++ b/x-pack/plugins/task_manager/server/polling_lifecycle.ts @@ -270,7 +270,18 @@ export class TaskPollingLifecycle { this.createTaskRunnerForTask, // place tasks in the Task Pool async (tasks: TaskRunner[]) => { - const result = await this.pool.run(tasks); + const tasksToRun = []; + const removeTaskPromises = []; + for (const task of tasks) { + if (task.isAdHocTaskAndOutOfAttempts) { + this.logger.debug(`Removing ${task} because the max attempts have been reached.`); + removeTaskPromises.push(task.removeTask()); + } else { + tasksToRun.push(task); + } + } + // Wait for all the promises at once to speed up the polling cycle + const [result] = await Promise.all([this.pool.run(tasksToRun), ...removeTaskPromises]); // Emit the load after fetching tasks, giving us a good metric for evaluating how // busy Task manager tends to be in this Kibana instance this.emitEvent(asTaskManagerStatEvent('load', asOk(this.pool.workerLoad))); diff --git a/x-pack/plugins/task_manager/server/queries/mark_available_tasks_as_claimed.test.ts b/x-pack/plugins/task_manager/server/queries/mark_available_tasks_as_claimed.test.ts index 28957d7618449..56686ea7d46c1 100644 --- a/x-pack/plugins/task_manager/server/queries/mark_available_tasks_as_claimed.test.ts +++ b/x-pack/plugins/task_manager/server/queries/mark_available_tasks_as_claimed.test.ts @@ -138,8 +138,7 @@ if (doc['task.runAt'].size()!=0) { script: { source: ` if (params.claimableTaskTypes.contains(ctx._source.task.taskType)) { - if (ctx._source.task.schedule != null || ctx._source.task.attempts < params.taskMaxAttempts[ctx._source.task.taskType]) { - if(ctx._source.task.retryAt != null && ZonedDateTime.parse(ctx._source.task.retryAt).toInstant().toEpochMilli() < params.now) { + if(ctx._source.task.retryAt != null && ZonedDateTime.parse(ctx._source.task.retryAt).toInstant().toEpochMilli() < params.now) { ctx._source.task.scheduledAt=ctx._source.task.retryAt; } else { ctx._source.task.scheduledAt=ctx._source.task.runAt; @@ -147,9 +146,6 @@ if (doc['task.runAt'].size()!=0) { ctx._source.task.status = "claiming"; ${Object.keys(fieldUpdates) .map((field) => `ctx._source.task.${field}=params.fieldUpdates.${field};`) .join(' ')} - } else { - ctx._source.task.status = "failed"; - } } else if (params.unusedTaskTypes.contains(ctx._source.task.taskType)) { ctx._source.task.status = "unrecognized"; } else { diff --git a/x-pack/plugins/task_manager/server/queries/mark_available_tasks_as_claimed.ts b/x-pack/plugins/task_manager/server/queries/mark_available_tasks_as_claimed.ts index 0b5036beb612d..d2137f6c82b08 100644 --- a/x-pack/plugins/task_manager/server/queries/mark_available_tasks_as_claimed.ts +++ b/x-pack/plugins/task_manager/server/queries/mark_available_tasks_as_claimed.ts @@ -146,11 +146,7 @@ export const updateFieldsAndMarkAsFailed = ({ return { source: ` if (params.claimableTaskTypes.contains(ctx._source.task.taskType)) { - if (ctx._source.task.schedule != null || ctx._source.task.attempts < params.taskMaxAttempts[ctx._source.task.taskType]) { - ${setScheduledAtAndMarkAsClaimed} - } else { - ctx._source.task.status = "failed"; - } + ${setScheduledAtAndMarkAsClaimed} } else if (params.unusedTaskTypes.contains(ctx._source.task.taskType)) { ctx._source.task.status = "unrecognized"; } else { diff --git a/x-pack/plugins/task_manager/server/task.ts b/x-pack/plugins/task_manager/server/task.ts index 1b1def5cc16df..a9f49b661a700 100644 --- a/x-pack/plugins/task_manager/server/task.ts +++ b/x-pack/plugins/task_manager/server/task.ts @@ -91,6 +91,7 @@ export type CancelFunction = () => Promise; export interface CancellableTask { run: RunFunction; cancel?: CancelFunction; + cleanup?: () => Promise; } export type TaskRunCreatorFunction = (context: RunContext) => CancellableTask; diff --git a/x-pack/plugins/task_manager/server/task_pool.test.ts b/x-pack/plugins/task_manager/server/task_pool.test.ts index baef1587cd242..10e440184ab2e 100644 --- a/x-pack/plugins/task_manager/server/task_pool.test.ts +++ b/x-pack/plugins/task_manager/server/task_pool.test.ts @@ -409,6 +409,8 @@ describe('TaskPool', () => { run: mockRun(), stage: TaskRunningStage.PENDING, toString: () => `TaskType "shooooo"`, + isAdHocTaskAndOutOfAttempts: false, + removeTask: jest.fn(), get expiration() { return new Date(); }, diff --git a/x-pack/plugins/task_manager/server/task_running/ephemeral_task_runner.ts b/x-pack/plugins/task_manager/server/task_running/ephemeral_task_runner.ts index baf4e253c67a9..a41297c90f67b 100644 --- a/x-pack/plugins/task_manager/server/task_running/ephemeral_task_runner.ts +++ b/x-pack/plugins/task_manager/server/task_running/ephemeral_task_runner.ts @@ -181,6 +181,14 @@ export class EphemeralTaskManagerRunner implements TaskRunner { return this.expiration < new Date(); } + /** + * Returns true whenever the task is ad hoc and has ran out of attempts. When true before + * running a task, the task should be deleted instead of ran. + */ + public get isAdHocTaskAndOutOfAttempts() { + return false; + } + public get isEphemeral() { return true; } @@ -252,6 +260,11 @@ export class EphemeralTaskManagerRunner implements TaskRunner { } } + /** + * Used by the non-ephemeral task runner + */ + public async removeTask(): Promise {} + /** * Noop for Ephemeral tasks * diff --git a/x-pack/plugins/task_manager/server/task_running/task_runner.test.ts b/x-pack/plugins/task_manager/server/task_running/task_runner.test.ts index 2598d8084a783..5812384c66c8e 100644 --- a/x-pack/plugins/task_manager/server/task_running/task_runner.test.ts +++ b/x-pack/plugins/task_manager/server/task_running/task_runner.test.ts @@ -896,9 +896,8 @@ describe('TaskManagerRunner', () => { await runner.run(); - const instance = store.update.mock.calls[0][0]; - expect(instance.status).toBe('failed'); - expect(instance.enabled).not.toBeDefined(); + expect(store.remove).toHaveBeenCalled(); + expect(store.update).not.toHaveBeenCalled(); expect(onTaskEvent).toHaveBeenCalledWith( withAnyTiming( @@ -1101,11 +1100,8 @@ describe('TaskManagerRunner', () => { await runner.run(); - expect(store.update).toHaveBeenCalledTimes(1); - const instance = store.update.mock.calls[0][0]; - - expect(instance.status).toBe('failed'); - expect(instance.enabled).not.toBeDefined(); + expect(store.remove).toHaveBeenCalled(); + expect(store.update).not.toHaveBeenCalled(); }); test('bypasses getRetry function (returning false) on error of a recurring task', async () => { @@ -1170,13 +1166,8 @@ describe('TaskManagerRunner', () => { await runner.run(); - expect(store.update).toHaveBeenCalledTimes(1); - const instance = store.update.mock.calls[0][0]; - expect(instance.attempts).toEqual(3); - expect(instance.status).toEqual('failed'); - expect(instance.retryAt!).toBeNull(); - expect(instance.runAt.getTime()).toBeLessThanOrEqual(Date.now()); - expect(instance.enabled).not.toBeDefined(); + expect(store.remove).toHaveBeenCalled(); + expect(store.update).not.toHaveBeenCalled(); }); test(`Doesn't fail recurring tasks when maxAttempts reached`, async () => { @@ -1403,9 +1394,8 @@ describe('TaskManagerRunner', () => { await runner.run(); - const instance = store.update.mock.calls[0][0]; - expect(instance.status).toBe('failed'); - expect(instance.enabled).not.toBeDefined(); + expect(store.remove).toHaveBeenCalled(); + expect(store.update).not.toHaveBeenCalled(); expect(onTaskEvent).toHaveBeenCalledWith( withAnyTiming( @@ -1509,6 +1499,113 @@ describe('TaskManagerRunner', () => { }); }); + describe('isAdHocTaskAndOutOfAttempts', () => { + it(`should return false if the task doesn't have a schedule`, async () => { + const { runner } = await pendingStageSetup({ + instance: { + id: 'foo', + taskType: 'testbar', + }, + }); + + expect(runner.isAdHocTaskAndOutOfAttempts).toEqual(false); + }); + + it(`should return false if the recurring task still has attempts remaining`, async () => { + const { runner } = await pendingStageSetup({ + instance: { + id: 'foo', + taskType: 'testbar', + attempts: 4, + }, + }); + + expect(runner.isAdHocTaskAndOutOfAttempts).toEqual(false); + }); + + it(`should return true if the recurring task is out of attempts`, async () => { + const { runner } = await pendingStageSetup({ + instance: { + id: 'foo', + taskType: 'testbar', + attempts: 5, + }, + }); + + expect(runner.isAdHocTaskAndOutOfAttempts).toEqual(true); + }); + }); + + describe('removeTask()', () => { + it(`should remove the task saved-object`, async () => { + const { runner, store } = await readyToRunStageSetup({ + instance: { + id: 'foo', + taskType: 'testbar', + }, + }); + + await runner.run(); + await runner.removeTask(); + expect(store.remove).toHaveBeenCalledWith('foo'); + }); + + it(`should call the task cleanup function if defined`, async () => { + const cleanupFn = jest.fn(); + const { runner } = await readyToRunStageSetup({ + instance: { + id: 'foo', + taskType: 'testbar2', + }, + definitions: { + testbar2: { + title: 'Bar!', + createTaskRunner: () => ({ + async run() { + return { state: {} }; + }, + cancel: jest.fn(), + cleanup: cleanupFn, + }), + }, + }, + }); + + // Remove task is called after run() with the this.task object defined + await runner.run(); + expect(cleanupFn).toHaveBeenCalledTimes(1); + }); + + it(`doesn't throw an error if the cleanup function throws an error`, async () => { + const cleanupFn = jest.fn().mockRejectedValue(new Error('Fail')); + const { runner, logger } = await readyToRunStageSetup({ + instance: { + id: 'foo', + taskType: 'testbar2', + }, + definitions: { + testbar2: { + title: 'Bar!', + createTaskRunner: () => ({ + async run() { + return { state: {} }; + }, + cancel: jest.fn(), + cleanup: cleanupFn, + }), + }, + }, + }); + + // Remove task is called after run() with the this.task object defined + await runner.run(); + expect(cleanupFn).toHaveBeenCalledTimes(1); + expect(logger.error).toHaveBeenCalledWith( + `Error encountered when running onTaskRemoved() hook for testbar2 "foo": Fail` + ); + }); + }); + interface TestOpts { instance?: Partial; definitions?: TaskDefinitionRegistry; diff --git a/x-pack/plugins/task_manager/server/task_running/task_runner.ts b/x-pack/plugins/task_manager/server/task_running/task_runner.ts index 36a185b5658f1..858d89f46d70a 100644 --- a/x-pack/plugins/task_manager/server/task_running/task_runner.ts +++ b/x-pack/plugins/task_manager/server/task_running/task_runner.ts @@ -73,6 +73,8 @@ export interface TaskRunner { isEphemeral?: boolean; toString: () => string; isSameTask: (executionId: string) => boolean; + isAdHocTaskAndOutOfAttempts: boolean; + removeTask: () => Promise; } export enum TaskRunningStage { @@ -258,6 +260,14 @@ export class TaskManagerRunner implements TaskRunner { return this.expiration < new Date(); } + /** + * Returns true whenever the task is ad hoc and has ran out of attempts. When true before + * running a task, the task should be deleted instead of ran. + */ + public get isAdHocTaskAndOutOfAttempts() { + return !this.instance.task.schedule && this.instance.task.attempts >= this.getMaxAttempts(); + } + /** * Returns a log-friendly representation of this task. */ @@ -330,6 +340,19 @@ export class TaskManagerRunner implements TaskRunner { } } + public async removeTask(): Promise { + await this.bufferedTaskStore.remove(this.id); + if (this.task?.cleanup) { + try { + await this.task.cleanup(); + } catch (e) { + this.logger.error( + `Error encountered when running onTaskRemoved() hook for ${this}: ${e.message}` + ); + } + } + } + /** * Attempts to claim exclusive rights to run the task. If the attempt fails * with a 409 (http conflict), we assume another Kibana instance beat us to the punch. @@ -474,8 +497,7 @@ export class TaskManagerRunner implements TaskRunner { return false; } - const maxAttempts = this.definition.maxAttempts || this.defaultMaxAttempts; - return this.instance.task.attempts < maxAttempts; + return this.instance.task.attempts < this.getMaxAttempts(); } private rescheduleFailedRun = ( @@ -536,7 +558,17 @@ export class TaskManagerRunner implements TaskRunner { unwrap )(result); - if (!this.isExpired) { + if (this.isExpired) { + this.usageCounter?.incrementCounter({ + counterName: `taskManagerUpdateSkippedDueToTaskExpiration`, + counterType: 'taskManagerTaskRunner', + incrementBy: 1, + }); + } else if (fieldUpdates.status === TaskStatus.Failed) { + // Delete the SO instead so it doesn't remain in the index forever + this.instance = asRan(this.instance.task); + await this.removeTask(); + } else { this.instance = asRan( await this.bufferedTaskStore.update( defaults( @@ -551,12 +583,6 @@ export class TaskManagerRunner implements TaskRunner { ) ) ); - } else { - this.usageCounter?.incrementCounter({ - counterName: `taskManagerUpdateSkippedDueToTaskExpiration`, - counterType: 'taskManagerTaskRunner', - incrementBy: 1, - }); } return fieldUpdates.status === TaskStatus.Failed @@ -569,8 +595,8 @@ export class TaskManagerRunner implements TaskRunner { private async processResultWhenDone(): Promise { // not a recurring task: clean up by removing the task instance from store try { - await this.bufferedTaskStore.remove(this.id); this.instance = asRan(this.instance.task); + await this.removeTask(); } catch (err) { if (err.statusCode === 404) { this.logger.warn(`Task cleanup of ${this} failed in processing. Was remove called twice?`); @@ -660,6 +686,12 @@ export class TaskManagerRunner implements TaskRunner { } return result; } + + private getMaxAttempts() { + return this.definition.maxAttempts !== undefined + ? this.definition.maxAttempts + : this.defaultMaxAttempts; + } } function sanitizeInstance(instance: ConcreteTaskInstance): ConcreteTaskInstance { diff --git a/x-pack/test/alerting_api_integration/spaces_only/tests/alerting/group4/run_soon.ts b/x-pack/test/alerting_api_integration/spaces_only/tests/alerting/group4/run_soon.ts index ceaba73b2a2ac..beff33120922d 100644 --- a/x-pack/test/alerting_api_integration/spaces_only/tests/alerting/group4/run_soon.ts +++ b/x-pack/test/alerting_api_integration/spaces_only/tests/alerting/group4/run_soon.ts @@ -22,7 +22,11 @@ export default function createRunSoonTests({ getService }: FtrProviderContext) { const objectRemover = new ObjectRemover(supertest); before(async () => { - await esArchiver.load('x-pack/test/functional/es_archives/rules_scheduled_task_id'); + // Not 100% sure why, seems the rules need to be loaded separately to avoid the task + // failing to load the rule during execution and deleting itself. Otherwise + // we have flakiness + await esArchiver.load('x-pack/test/functional/es_archives/rules_scheduled_task_id/rules'); + await esArchiver.load('x-pack/test/functional/es_archives/rules_scheduled_task_id/tasks'); }); afterEach(async () => { @@ -30,7 +34,8 @@ export default function createRunSoonTests({ getService }: FtrProviderContext) { }); after(async () => { - await esArchiver.unload('x-pack/test/functional/es_archives/rules_scheduled_task_id'); + await esArchiver.unload('x-pack/test/functional/es_archives/rules_scheduled_task_id/tasks'); + await esArchiver.unload('x-pack/test/functional/es_archives/rules_scheduled_task_id/rules'); }); it('should successfully run rule where scheduled task id is different than rule id', async () => { diff --git a/x-pack/test/alerting_api_integration/spaces_only/tests/alerting/group4/scheduled_task_id.ts b/x-pack/test/alerting_api_integration/spaces_only/tests/alerting/group4/scheduled_task_id.ts index b7ad0e47947e1..a22246224d5c2 100644 --- a/x-pack/test/alerting_api_integration/spaces_only/tests/alerting/group4/scheduled_task_id.ts +++ b/x-pack/test/alerting_api_integration/spaces_only/tests/alerting/group4/scheduled_task_id.ts @@ -36,11 +36,16 @@ export default function createScheduledTaskIdTests({ getService }: FtrProviderCo } before(async () => { - await esArchiver.load('x-pack/test/functional/es_archives/rules_scheduled_task_id'); + // Not 100% sure why, seems the rules need to be loaded separately to avoid the task + // failing to load the rule during execution and deleting itself. Otherwise + // we have flakiness + await esArchiver.load('x-pack/test/functional/es_archives/rules_scheduled_task_id/rules'); + await esArchiver.load('x-pack/test/functional/es_archives/rules_scheduled_task_id/tasks'); }); after(async () => { - await esArchiver.unload('x-pack/test/functional/es_archives/rules_scheduled_task_id'); + await esArchiver.unload('x-pack/test/functional/es_archives/rules_scheduled_task_id/tasks'); + await esArchiver.unload('x-pack/test/functional/es_archives/rules_scheduled_task_id/rules'); }); it('cannot create rule with same ID as a scheduled task ID used by another rule', async () => { diff --git a/x-pack/test/functional/es_archives/rules_scheduled_task_id/data.json b/x-pack/test/functional/es_archives/rules_scheduled_task_id/rules/data.json similarity index 58% rename from x-pack/test/functional/es_archives/rules_scheduled_task_id/data.json rename to x-pack/test/functional/es_archives/rules_scheduled_task_id/rules/data.json index 25032f09eeb51..6cbc4d402f87d 100644 --- a/x-pack/test/functional/es_archives/rules_scheduled_task_id/data.json +++ b/x-pack/test/functional/es_archives/rules_scheduled_task_id/rules/data.json @@ -40,39 +40,6 @@ } } -{ - "type": "doc", - "value": { - "id": "task:329798f0-b0b0-11ea-9510-fdf248d5f2a4", - "index": ".kibana_task_manager_1", - "source": { - "migrationVersion": { - "task": "7.16.0" - }, - "task": { - "attempts": 0, - "ownerId": null, - "params": "{\"alertId\":\"74f3e6d7-b7bb-477d-ac28-92ee22728e6e\",\"spaceId\":\"default\"}", - "retryAt": null, - "runAt": "2021-11-05T16:21:52.148Z", - "schedule": { - "interval": "1m" - }, - "scheduledAt": "2021-11-05T15:28:42.055Z", - "scope": [ - "alerting" - ], - "startedAt": null, - "status": "idle", - "taskType": "alerting:example.always-firing" - }, - "references": [], - "type": "task", - "updated_at": "2021-11-05T16:21:37.629Z" - } - } -} - { "type": "doc", "value": { @@ -114,36 +81,3 @@ } } } - -{ - "type": "doc", - "value": { - "id": "task:46be60d4-ae63-48ed-ab6f-f4d9b4defacf", - "index": ".kibana_task_manager_1", - "source": { - "migrationVersion": { - "task": "7.16.0" - }, - "task": { - "attempts": 0, - "ownerId": null, - "params": "{\"alertId\":\"46be60d4-ae63-48ed-ab6f-f4d9b4defacf\",\"spaceId\":\"default\"}", - "retryAt": null, - "runAt": "2021-11-05T16:21:52.148Z", - "schedule": { - "interval": "1m" - }, - "scheduledAt": "2021-11-05T15:28:42.055Z", - "scope": [ - "alerting" - ], - "startedAt": null, - "status": "idle", - "taskType": "sampleTaskRemovedType" - }, - "references": [], - "type": "task", - "updated_at": "2021-11-05T16:21:37.629Z" - } - } -} diff --git a/x-pack/test/functional/es_archives/rules_scheduled_task_id/mappings.json b/x-pack/test/functional/es_archives/rules_scheduled_task_id/rules/mappings.json similarity index 78% rename from x-pack/test/functional/es_archives/rules_scheduled_task_id/mappings.json rename to x-pack/test/functional/es_archives/rules_scheduled_task_id/rules/mappings.json index 45adfd491a09b..babc00babc838 100644 --- a/x-pack/test/functional/es_archives/rules_scheduled_task_id/mappings.json +++ b/x-pack/test/functional/es_archives/rules_scheduled_task_id/rules/mappings.json @@ -342,119 +342,3 @@ } } } - -{ - "type": "index", - "value": { - "aliases": { - ".kibana_task_manager": { - } - }, - "index": ".kibana_task_manager_1", - "mappings": { - "_meta": { - "migrationMappingPropertyHashes": { - "migrationVersion": "4a1746014a75ade3a714e1db5763276f", - "namespace": "2f4316de49999235636386fe51dc06c1", - "namespaces": "2f4316de49999235636386fe51dc06c1", - "originId": "2f4316de49999235636386fe51dc06c1", - "references": "7997cf5a56cc02bdc9c93361bde732b0", - "task": "235412e52d09e7165fac8a67a43ad6b4", - "type": "2f4316de49999235636386fe51dc06c1", - "updated_at": "00da57df13e94e9d98437d13ace4bfe0" - } - }, - "dynamic": "strict", - "properties": { - "migrationVersion": { - "dynamic": "true", - "properties": { - "task": { - "fields": { - "keyword": { - "ignore_above": 256, - "type": "keyword" - } - }, - "type": "text" - } - } - }, - "references": { - "properties": { - "id": { - "type": "keyword" - }, - "name": { - "type": "keyword" - }, - "type": { - "type": "keyword" - } - }, - "type": "nested" - }, - "task": { - "properties": { - "attempts": { - "type": "integer" - }, - "ownerId": { - "type": "keyword" - }, - "params": { - "type": "text" - }, - "retryAt": { - "type": "date" - }, - "runAt": { - "type": "date" - }, - "schedule": { - "properties": { - "interval": { - "type": "keyword" - } - } - }, - "scheduledAt": { - "type": "date" - }, - "scope": { - "type": "keyword" - }, - "startedAt": { - "type": "date" - }, - "state": { - "type": "text" - }, - "status": { - "type": "keyword" - }, - "taskType": { - "type": "keyword" - }, - "user": { - "type": "keyword" - } - } - }, - "type": { - "type": "keyword" - }, - "updated_at": { - "type": "date" - } - } - }, - "settings": { - "index": { - "auto_expand_replicas": "0-1", - "number_of_replicas": "0", - "number_of_shards": "1" - } - } - } -} diff --git a/x-pack/test/functional/es_archives/rules_scheduled_task_id/tasks/data.json b/x-pack/test/functional/es_archives/rules_scheduled_task_id/tasks/data.json new file mode 100644 index 0000000000000..c71e4d14cf72d --- /dev/null +++ b/x-pack/test/functional/es_archives/rules_scheduled_task_id/tasks/data.json @@ -0,0 +1,65 @@ +{ + "type": "doc", + "value": { + "id": "task:329798f0-b0b0-11ea-9510-fdf248d5f2a4", + "index": ".kibana_task_manager_1", + "source": { + "migrationVersion": { + "task": "7.16.0" + }, + "task": { + "attempts": 0, + "ownerId": null, + "params": "{\"alertId\":\"74f3e6d7-b7bb-477d-ac28-92ee22728e6e\",\"spaceId\":\"default\"}", + "retryAt": null, + "runAt": "2021-11-05T16:21:52.148Z", + "schedule": { + "interval": "1m" + }, + "scheduledAt": "2021-11-05T15:28:42.055Z", + "scope": [ + "alerting" + ], + "startedAt": null, + "status": "idle", + "taskType": "alerting:example.always-firing" + }, + "references": [], + "type": "task", + "updated_at": "2021-11-05T16:21:37.629Z" + } + } +} + +{ + "type": "doc", + "value": { + "id": "task:46be60d4-ae63-48ed-ab6f-f4d9b4defacf", + "index": ".kibana_task_manager_1", + "source": { + "migrationVersion": { + "task": "7.16.0" + }, + "task": { + "attempts": 0, + "ownerId": null, + "params": "{\"alertId\":\"46be60d4-ae63-48ed-ab6f-f4d9b4defacf\",\"spaceId\":\"default\"}", + "retryAt": null, + "runAt": "2021-11-05T16:21:52.148Z", + "schedule": { + "interval": "1m" + }, + "scheduledAt": "2021-11-05T15:28:42.055Z", + "scope": [ + "alerting" + ], + "startedAt": null, + "status": "idle", + "taskType": "sampleTaskRemovedType" + }, + "references": [], + "type": "task", + "updated_at": "2021-11-05T16:21:37.629Z" + } + } +} diff --git a/x-pack/test/functional/es_archives/rules_scheduled_task_id/tasks/mappings.json b/x-pack/test/functional/es_archives/rules_scheduled_task_id/tasks/mappings.json new file mode 100644 index 0000000000000..265d017bd72dc --- /dev/null +++ b/x-pack/test/functional/es_archives/rules_scheduled_task_id/tasks/mappings.json @@ -0,0 +1,115 @@ +{ + "type": "index", + "value": { + "aliases": { + ".kibana_task_manager": { + } + }, + "index": ".kibana_task_manager_1", + "mappings": { + "_meta": { + "migrationMappingPropertyHashes": { + "migrationVersion": "4a1746014a75ade3a714e1db5763276f", + "namespace": "2f4316de49999235636386fe51dc06c1", + "namespaces": "2f4316de49999235636386fe51dc06c1", + "originId": "2f4316de49999235636386fe51dc06c1", + "references": "7997cf5a56cc02bdc9c93361bde732b0", + "task": "235412e52d09e7165fac8a67a43ad6b4", + "type": "2f4316de49999235636386fe51dc06c1", + "updated_at": "00da57df13e94e9d98437d13ace4bfe0" + } + }, + "dynamic": "strict", + "properties": { + "migrationVersion": { + "dynamic": "true", + "properties": { + "task": { + "fields": { + "keyword": { + "ignore_above": 256, + "type": "keyword" + } + }, + "type": "text" + } + } + }, + "references": { + "properties": { + "id": { + "type": "keyword" + }, + "name": { + "type": "keyword" + }, + "type": { + "type": "keyword" + } + }, + "type": "nested" + }, + "task": { + "properties": { + "attempts": { + "type": "integer" + }, + "ownerId": { + "type": "keyword" + }, + "params": { + "type": "text" + }, + "retryAt": { + "type": "date" + }, + "runAt": { + "type": "date" + }, + "schedule": { + "properties": { + "interval": { + "type": "keyword" + } + } + }, + "scheduledAt": { + "type": "date" + }, + "scope": { + "type": "keyword" + }, + "startedAt": { + "type": "date" + }, + "state": { + "type": "text" + }, + "status": { + "type": "keyword" + }, + "taskType": { + "type": "keyword" + }, + "user": { + "type": "keyword" + } + } + }, + "type": { + "type": "keyword" + }, + "updated_at": { + "type": "date" + } + } + }, + "settings": { + "index": { + "auto_expand_replicas": "0-1", + "number_of_replicas": "0", + "number_of_shards": "1" + } + } + } +} diff --git a/x-pack/test/plugin_api_integration/test_suites/task_manager/task_management.ts b/x-pack/test/plugin_api_integration/test_suites/task_manager/task_management.ts index 6cc68282ac1da..987a498d420d2 100644 --- a/x-pack/test/plugin_api_integration/test_suites/task_manager/task_management.ts +++ b/x-pack/test/plugin_api_integration/test_suites/task_manager/task_management.ts @@ -10,7 +10,6 @@ import { random } from 'lodash'; import expect from '@kbn/expect'; import type * as estypes from '@elastic/elasticsearch/lib/api/typesWithBodyKey'; import TaskManagerMapping from '@kbn/task-manager-plugin/server/saved_objects/mappings.json'; -import { DEFAULT_POLL_INTERVAL } from '@kbn/task-manager-plugin/server/config'; import { ConcreteTaskInstance, BulkUpdateTaskResult } from '@kbn/task-manager-plugin/server'; import { FtrProviderContext } from '../../ftr_provider_context'; @@ -18,8 +17,6 @@ const { task: { properties: taskManagerIndexMapping }, } = TaskManagerMapping; -const delay = (ms: number) => new Promise((resolve) => setTimeout(resolve, ms)); - export interface RawDoc { _id: string; _source: any; @@ -563,12 +560,12 @@ export default function ({ getService }: FtrProviderContext) { params: { throwOnMarkAsRunning: true }, }); - await delay(DEFAULT_POLL_INTERVAL * 3); + expect(originalTask.attempts).to.eql(0); + // Wait for task manager to attempt running the task a second time await retry.try(async () => { const task = await currentTask(originalTask.id); - expect(task.attempts).to.eql(3); - expect(task.status).to.eql('failed'); + expect(task.attempts).to.eql(2); }); }); @@ -769,17 +766,15 @@ export default function ({ getService }: FtrProviderContext) { }); }); - it('should mark non-recurring task as failed if task is still running but maxAttempts has been reached', async () => { - const task = await scheduleTask({ + it('should delete the task if it is still running but maxAttempts has been reached', async () => { + await scheduleTask({ taskType: 'sampleOneTimeTaskThrowingError', params: {}, }); await retry.try(async () => { - const [scheduledTask] = (await currentTasks()).docs; - expect(scheduledTask.id).to.eql(task.id); - expect(scheduledTask.status).to.eql('failed'); - expect(scheduledTask.attempts).to.eql(3); + const results = (await currentTasks()).docs; + expect(results.length).to.eql(0); }); }); @@ -894,38 +889,6 @@ export default function ({ getService }: FtrProviderContext) { }); }); - it('should allow a failed task to be rerun using runSoon', async () => { - const taskThatFailsBeforeRunNow = await scheduleTask({ - taskType: 'singleAttemptSampleTask', - params: { - waitForParams: true, - }, - }); - // tell the task to fail on its next run - await provideParamsToTasksWaitingForParams(taskThatFailsBeforeRunNow.id, { - failWith: 'error on first run', - }); - - // wait for task to fail - await retry.try(async () => { - const tasks = (await currentTasks()).docs; - expect(getTaskById(tasks, taskThatFailsBeforeRunNow.id).status).to.eql('failed'); - }); - - // run the task again - await runTaskSoon({ - id: taskThatFailsBeforeRunNow.id, - }); - - // runTaskSoon should successfully update the runAt property of the task - await retry.try(async () => { - const tasks = (await currentTasks()).docs; - expect( - Date.parse(getTaskById(tasks, taskThatFailsBeforeRunNow.id).runAt) - ).to.be.greaterThan(Date.parse(taskThatFailsBeforeRunNow.runAt)); - }); - }); - // TODO: Add this back in with https://github.com/elastic/kibana/issues/106139 // it('should return the resulting task state when asked to run an ephemeral task now', async () => { // const ephemeralTask = await runEphemeralTaskNow({