diff --git a/src/platform/packages/private/kbn-reporting/common/errors.ts b/src/platform/packages/private/kbn-reporting/common/errors.ts index 45a299115bf1b..3700602bd6f27 100644 --- a/src/platform/packages/private/kbn-reporting/common/errors.ts +++ b/src/platform/packages/private/kbn-reporting/common/errors.ts @@ -72,6 +72,13 @@ export class AuthenticationExpiredError extends ReportingError { } } +export class MissingAuthenticationError extends ReportingError { + static code = 'missing_authentication_header_error' as const; + public get code(): string { + return MissingAuthenticationError.code; + } +} + export class QueueTimeoutError extends ReportingError { static code = 'queue_timeout_error' as const; public get code(): string { diff --git a/src/platform/packages/private/kbn-reporting/common/types.ts b/src/platform/packages/private/kbn-reporting/common/types.ts index 0b5f6636ccc41..d8e4dc25d2ea7 100644 --- a/src/platform/packages/private/kbn-reporting/common/types.ts +++ b/src/platform/packages/private/kbn-reporting/common/types.ts @@ -66,6 +66,7 @@ export interface BaseParams { objectType: string; title: string; version: string; // to handle any state migrations + forceNow?: string; layout?: LayoutParams; // png & pdf only pagingStrategy?: CsvPagingStrategy; // csv only } @@ -152,6 +153,7 @@ export interface ReportSource { created_at: string; // timestamp in UTC '@timestamp'?: string; // creation timestamp, only used for data streams compatibility status: JOB_STATUS; + scheduled_report_id?: string; /* * `output` is only populated if the report job is completed or failed. diff --git a/x-pack/platform/plugins/private/reporting/server/core.ts b/x-pack/platform/plugins/private/reporting/server/core.ts index e5cf0c8a079e5..6b0de7f04b65d 100644 --- a/x-pack/platform/plugins/private/reporting/server/core.ts +++ b/x-pack/platform/plugins/private/reporting/server/core.ts @@ -54,7 +54,12 @@ import type { ReportingSetup } from '.'; import { createConfig } from './config'; import { reportingEventLoggerFactory } from './lib/event_logger/logger'; import type { IReport, ReportingStore } from './lib/store'; -import { ExecuteReportTask, ReportTaskParams } from './lib/tasks'; +import { + RunSingleReportTask, + ReportTaskParams, + RunScheduledReportTask, + ScheduledReportTaskParamsWithoutSpaceId, +} from './lib/tasks'; import type { ReportingPluginRouter } from './types'; import { EventTracker } from './usage'; import { SCHEDULED_REPORT_SAVED_OBJECT_TYPE } from './saved_objects'; @@ -100,7 +105,8 @@ export class ReportingCore { private pluginStartDeps?: ReportingInternalStart; private readonly pluginSetup$ = new Rx.ReplaySubject(); // observe async background setupDeps each are done private readonly pluginStart$ = new Rx.ReplaySubject(); // observe async background startDeps - private executeTask: ExecuteReportTask; + private runSingleReportTask: RunSingleReportTask; + private runScheduledReportTask: RunScheduledReportTask; private config: ReportingConfigType; private executing: Set; private exportTypesRegistry = new ExportTypesRegistry(); @@ -121,7 +127,16 @@ export class ReportingCore { this.getExportTypes().forEach((et) => { this.exportTypesRegistry.register(et); }); - this.executeTask = new ExecuteReportTask(this, config, this.logger); + this.runSingleReportTask = new RunSingleReportTask({ + reporting: this, + config, + logger: this.logger, + }); + this.runScheduledReportTask = new RunScheduledReportTask({ + reporting: this, + config, + logger: this.logger, + }); this.getContract = () => ({ registerExportTypes: (id) => id, @@ -146,9 +161,10 @@ export class ReportingCore { et.setup(setupDeps); }); - const { executeTask } = this; + const { runSingleReportTask, runScheduledReportTask } = this; setupDeps.taskManager.registerTaskDefinitions({ - [executeTask.TYPE]: executeTask.getTaskDefinition(), + [runSingleReportTask.TYPE]: runSingleReportTask.getTaskDefinition(), + [runScheduledReportTask.TYPE]: runScheduledReportTask.getTaskDefinition(), }); } @@ -164,9 +180,12 @@ export class ReportingCore { }); const { taskManager } = startDeps; - const { executeTask } = this; + const { runSingleReportTask, runScheduledReportTask } = this; // enable this instance to generate reports - await Promise.all([executeTask.init(taskManager)]); + await Promise.all([ + runSingleReportTask.init(taskManager), + runScheduledReportTask.init(taskManager), + ]); } public pluginStop() { @@ -326,7 +345,14 @@ export class ReportingCore { } public async scheduleTask(request: KibanaRequest, report: ReportTaskParams) { - return await this.executeTask.scheduleTask(request, report); + return await this.runSingleReportTask.scheduleTask(request, report); + } + + public async scheduleRecurringTask( + request: KibanaRequest, + report: ScheduledReportTaskParamsWithoutSpaceId + ) { + return await this.runScheduledReportTask.scheduleTask(request, report); } public async getStore() { @@ -385,13 +411,17 @@ export class ReportingCore { return dataViews; } - public async getSoClient(request: KibanaRequest) { + public async getScopedSoClient(request: KibanaRequest) { const { savedObjects } = await this.getPluginStartDeps(); - const savedObjectsClient = savedObjects.getScopedClient(request, { + return savedObjects.getScopedClient(request, { excludedExtensions: [SECURITY_EXTENSION_ID], includedHiddenTypes: [SCHEDULED_REPORT_SAVED_OBJECT_TYPE], }); - return savedObjectsClient; + } + + public async getInternalSoClient() { + const { savedObjects } = await this.getPluginStartDeps(); + return savedObjects.createInternalRepository([SCHEDULED_REPORT_SAVED_OBJECT_TYPE]); } public async getDataService() { diff --git a/x-pack/platform/plugins/private/reporting/server/lib/store/index.ts b/x-pack/platform/plugins/private/reporting/server/lib/store/index.ts index 168f958600815..148ad6238ce84 100644 --- a/x-pack/platform/plugins/private/reporting/server/lib/store/index.ts +++ b/x-pack/platform/plugins/private/reporting/server/lib/store/index.ts @@ -7,6 +7,7 @@ export { Report } from './report'; export { SavedReport } from './saved_report'; +export { ScheduledReport } from './scheduled_report'; export { ReportingStore } from './store'; export { IlmPolicyManager } from './ilm_policy_manager'; diff --git a/x-pack/platform/plugins/private/reporting/server/lib/store/report.test.ts b/x-pack/platform/plugins/private/reporting/server/lib/store/report.test.ts index ca1294f663da8..c4b5d984a1164 100644 --- a/x-pack/platform/plugins/private/reporting/server/lib/store/report.test.ts +++ b/x-pack/platform/plugins/private/reporting/server/lib/store/report.test.ts @@ -60,6 +60,61 @@ describe('Class Report', () => { expect(report._id).toBeDefined(); }); + it('constructs Report instance when scheduled_task_id is defined', () => { + const report = new Report({ + _index: '.reporting-test-index-12345', + jobtype: 'test-report', + created_by: 'created_by_test_string', + max_attempts: 50, + payload: { + headers: 'payload_test_field', + objectType: 'testOt', + title: 'cool report', + version: '7.14.0', + browserTimezone: 'UTC', + }, + meta: { objectType: 'test' }, + timeout: 30000, + scheduled_report_id: 'foobar', + }); + + expect(report.toReportSource()).toMatchObject({ + attempts: 0, + completed_at: undefined, + created_by: 'created_by_test_string', + jobtype: 'test-report', + max_attempts: 50, + meta: { objectType: 'test' }, + payload: { headers: 'payload_test_field', objectType: 'testOt' }, + started_at: undefined, + status: 'pending', + timeout: 30000, + scheduled_report_id: 'foobar', + }); + expect(report.toReportTaskJSON()).toMatchObject({ + attempts: 0, + created_by: 'created_by_test_string', + index: '.reporting-test-index-12345', + jobtype: 'test-report', + meta: { objectType: 'test' }, + payload: { headers: 'payload_test_field', objectType: 'testOt' }, + }); + expect(report.toApiJSON()).toMatchObject({ + attempts: 0, + created_by: 'created_by_test_string', + index: '.reporting-test-index-12345', + jobtype: 'test-report', + max_attempts: 50, + payload: { objectType: 'testOt' }, + meta: { objectType: 'test' }, + status: 'pending', + timeout: 30000, + scheduled_report_id: 'foobar', + }); + + expect(report._id).toBeDefined(); + }); + it('updateWithEsDoc method syncs fields to sync ES metadata', () => { const report = new Report({ _index: '.reporting-test-index-12345', diff --git a/x-pack/platform/plugins/private/reporting/server/lib/store/report.ts b/x-pack/platform/plugins/private/reporting/server/lib/store/report.ts index 6eb0960aedd93..d34ce0e58e793 100644 --- a/x-pack/platform/plugins/private/reporting/server/lib/store/report.ts +++ b/x-pack/platform/plugins/private/reporting/server/lib/store/report.ts @@ -40,6 +40,7 @@ export class Report implements Partial { public readonly status: ReportSource['status']; public readonly attempts: ReportSource['attempts']; + public readonly scheduled_report_id: ReportSource['scheduled_report_id']; // fields with undefined values exist in report jobs that have not been claimed public readonly kibana_name: ReportSource['kibana_name']; @@ -97,6 +98,7 @@ export class Report implements Partial { this.status = opts.status || JOB_STATUS.PENDING; this.output = opts.output || null; this.error = opts.error; + this.scheduled_report_id = opts.scheduled_report_id; this.queue_time_ms = fields?.queue_time_ms; this.execution_time_ms = fields?.execution_time_ms; @@ -139,6 +141,7 @@ export class Report implements Partial { process_expiration: this.process_expiration, output: this.output || null, metrics: this.metrics, + scheduled_report_id: this.scheduled_report_id, }; } @@ -187,6 +190,7 @@ export class Report implements Partial { payload: omit(this.payload, 'headers'), output: omit(this.output, 'content'), metrics: this.metrics, + scheduled_report_id: this.scheduled_report_id, }; } } diff --git a/x-pack/platform/plugins/private/reporting/server/lib/store/scheduled_report.test.ts b/x-pack/platform/plugins/private/reporting/server/lib/store/scheduled_report.test.ts new file mode 100644 index 0000000000000..c47c3f349c3c3 --- /dev/null +++ b/x-pack/platform/plugins/private/reporting/server/lib/store/scheduled_report.test.ts @@ -0,0 +1,178 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { Frequency } from '@kbn/task-manager-plugin/server'; +import { ScheduledReport } from '.'; + +const payload = { + headers: '', + title: 'Test Report', + browserTimezone: '', + objectType: 'test', + version: '8.0.0', +}; + +test('ScheduledReport should return correctly formatted outputs', () => { + const scheduledReport = new ScheduledReport({ + runAt: new Date('2023-10-01T00:00:00Z'), + kibanaId: 'instance-uuid', + kibanaName: 'kibana', + queueTimeout: 120000, + reportSO: { + id: 'report-so-id-111', + attributes: { + createdAt: new Date().toISOString(), + createdBy: 'test-user', + enabled: true, + jobType: 'test1', + meta: { objectType: 'test' }, + migrationVersion: '8.0.0', + payload: JSON.stringify(payload), + schedule: { rrule: { freq: Frequency.DAILY, interval: 2, tzid: 'UTC' } }, + title: 'Test Report', + }, + references: [], + type: 'scheduled-report', + }, + }); + expect(scheduledReport.toReportTaskJSON()).toEqual({ + attempts: 1, + created_at: '2023-10-01T00:00:00.000Z', + created_by: 'test-user', + id: expect.any(String), + index: '.kibana-reporting', + jobtype: 'test1', + meta: { + objectType: 'test', + }, + payload: { + browserTimezone: '', + forceNow: '2023-10-01T00:00:00.000Z', + headers: '', + objectType: 'test', + title: 'Test Report [2023-10-01T00:00:00.000Z]', + version: '8.0.0', + }, + }); + + expect(scheduledReport.toReportSource()).toEqual({ + attempts: 1, + max_attempts: 1, + created_at: '2023-10-01T00:00:00.000Z', + created_by: 'test-user', + jobtype: 'test1', + meta: { + objectType: 'test', + }, + migration_version: '7.14.0', + kibana_id: 'instance-uuid', + kibana_name: 'kibana', + output: null, + payload: { + browserTimezone: '', + forceNow: '2023-10-01T00:00:00.000Z', + headers: '', + objectType: 'test', + title: 'Test Report [2023-10-01T00:00:00.000Z]', + version: '8.0.0', + }, + scheduled_report_id: 'report-so-id-111', + status: 'processing', + started_at: expect.any(String), + process_expiration: expect.any(String), + timeout: 120000, + }); + + expect(scheduledReport.toApiJSON()).toEqual({ + id: expect.any(String), + index: '.kibana-reporting', + kibana_id: 'instance-uuid', + kibana_name: 'kibana', + jobtype: 'test1', + created_at: '2023-10-01T00:00:00.000Z', + created_by: 'test-user', + meta: { + objectType: 'test', + }, + timeout: 120000, + max_attempts: 1, + status: 'processing', + attempts: 1, + started_at: expect.any(String), + migration_version: '7.14.0', + output: {}, + queue_time_ms: expect.any(Number), + payload: { + browserTimezone: '', + forceNow: '2023-10-01T00:00:00.000Z', + objectType: 'test', + title: 'Test Report [2023-10-01T00:00:00.000Z]', + version: '8.0.0', + }, + scheduled_report_id: 'report-so-id-111', + }); +}); + +test('ScheduledReport should throw an error if report payload is malformed', () => { + const createInstance = () => { + return new ScheduledReport({ + runAt: new Date('2023-10-01T00:00:00Z'), + kibanaId: 'instance-uuid', + kibanaName: 'kibana', + queueTimeout: 120000, + reportSO: { + id: 'report-so-id-111', + attributes: { + createdAt: new Date().toISOString(), + createdBy: 'test-user', + enabled: true, + jobType: 'test1', + meta: { objectType: 'test' }, + migrationVersion: '8.0.0', + payload: 'abc', + schedule: { rrule: { freq: Frequency.DAILY, interval: 2, tzid: 'UTC' } }, + title: 'Test Report', + }, + references: [], + type: 'scheduled-report', + }, + }); + }; + expect(createInstance).toThrowErrorMatchingInlineSnapshot( + `"Unable to parse payload from scheduled report saved object: SyntaxError: Unexpected token 'a', \\"abc\\" is not valid JSON"` + ); +}); + +test('ScheduledReport should throw an error if report saved object is missing ID', () => { + const createInstance = () => { + return new ScheduledReport({ + runAt: new Date('2023-10-01T00:00:00Z'), + kibanaId: 'instance-uuid', + kibanaName: 'kibana', + queueTimeout: 120000, + // @ts-expect-error - missing id + reportSO: { + attributes: { + createdAt: new Date().toISOString(), + createdBy: 'test-user', + enabled: true, + jobType: 'test1', + meta: { objectType: 'test' }, + migrationVersion: '8.0.0', + payload: JSON.stringify(payload), + schedule: { rrule: { freq: Frequency.DAILY, interval: 2, tzid: 'UTC' } }, + title: 'Test Report', + }, + references: [], + type: 'scheduled-report', + }, + }); + }; + expect(createInstance).toThrowErrorMatchingInlineSnapshot( + `"Invalid scheduled report saved object - no id"` + ); +}); diff --git a/x-pack/platform/plugins/private/reporting/server/lib/store/scheduled_report.ts b/x-pack/platform/plugins/private/reporting/server/lib/store/scheduled_report.ts new file mode 100644 index 0000000000000..1e13139ee8d5e --- /dev/null +++ b/x-pack/platform/plugins/private/reporting/server/lib/store/scheduled_report.ts @@ -0,0 +1,69 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import moment from 'moment'; +import { JOB_STATUS } from '@kbn/reporting-common'; + +import { SavedObject } from '@kbn/core/server'; +import { BasePayload } from '@kbn/reporting-common/types'; +import { Report } from './report'; +import { ScheduledReportType } from '../../types'; + +interface ConstructorOpts { + runAt: Date; + kibanaId: string; + kibanaName: string; + queueTimeout: number; + reportSO: SavedObject; +} + +export class ScheduledReport extends Report { + /* + * Create a report from a scheduled report saved object + */ + constructor(opts: ConstructorOpts) { + const { kibanaId, kibanaName, runAt, reportSO, queueTimeout } = opts; + const now = moment.utc(); + const startTime = now.toISOString(); + const expirationTime = now.add(queueTimeout).toISOString(); + + let payload: BasePayload; + try { + payload = JSON.parse(reportSO.attributes.payload); + } catch (e) { + throw new Error(`Unable to parse payload from scheduled report saved object: ${e}`); + } + + payload.forceNow = runAt.toISOString(); + payload.title = `${reportSO.attributes.title} [${runAt.toISOString()}]`; + + if (!reportSO.id) { + throw new Error(`Invalid scheduled report saved object - no id`); + } + + super( + { + migration_version: reportSO.attributes.migrationVersion, + jobtype: reportSO.attributes.jobType, + created_at: runAt.toISOString(), + created_by: reportSO.attributes.createdBy as string | false, + payload, + meta: reportSO.attributes.meta, + status: JOB_STATUS.PROCESSING, + attempts: 1, + process_expiration: expirationTime, + kibana_id: kibanaId, + kibana_name: kibanaName, + max_attempts: 1, + started_at: startTime, + timeout: queueTimeout, + scheduled_report_id: reportSO.id, + }, + { queue_time_ms: [now.diff(moment.utc(runAt), 'milliseconds')] } + ); + } +} diff --git a/x-pack/platform/plugins/private/reporting/server/lib/store/store.test.ts b/x-pack/platform/plugins/private/reporting/server/lib/store/store.test.ts index cece2608f4c48..ce7c438d44d2a 100644 --- a/x-pack/platform/plugins/private/reporting/server/lib/store/store.test.ts +++ b/x-pack/platform/plugins/private/reporting/server/lib/store/store.test.ts @@ -60,6 +60,53 @@ describe('ReportingStore', () => { }); }); + it('uses report status if set', async () => { + const store = new ReportingStore(mockCore, mockLogger); + const mockReport = new Report({ + _index: '.reporting-mock', + attempts: 0, + created_by: 'username1', + jobtype: 'unknowntype', + status: 'processing', + payload: {}, + meta: {}, + } as any); + await expect(store.addReport(mockReport)).resolves.toMatchObject({ + _primary_term: undefined, + _seq_no: undefined, + attempts: 0, + completed_at: undefined, + created_by: 'username1', + jobtype: 'unknowntype', + payload: {}, + meta: {}, + status: 'processing', + }); + }); + + it('defaults to pending status if not set', async () => { + const store = new ReportingStore(mockCore, mockLogger); + const mockReport = new Report({ + _index: '.reporting-mock', + attempts: 0, + created_by: 'username1', + jobtype: 'unknowntype', + payload: {}, + meta: {}, + } as any); + await expect(store.addReport(mockReport)).resolves.toMatchObject({ + _primary_term: undefined, + _seq_no: undefined, + attempts: 0, + completed_at: undefined, + created_by: 'username1', + jobtype: 'unknowntype', + payload: {}, + meta: {}, + status: 'pending', + }); + }); + it('throws if options has invalid indexInterval', async () => { const reportingConfig = { index: '.reporting-test', @@ -181,6 +228,7 @@ describe('ReportingStore', () => { }, "process_expiration": undefined, "queue_time_ms": undefined, + "scheduled_report_id": undefined, "started_at": undefined, "status": "pending", "timeout": 30000, diff --git a/x-pack/platform/plugins/private/reporting/server/lib/store/store.ts b/x-pack/platform/plugins/private/reporting/server/lib/store/store.ts index 671a9dbc924b1..417c9599a7744 100644 --- a/x-pack/platform/plugins/private/reporting/server/lib/store/store.ts +++ b/x-pack/platform/plugins/private/reporting/server/lib/store/store.ts @@ -145,8 +145,8 @@ export class ReportingStore { ...report.toReportSource(), ...sourceDoc({ process_expiration: new Date(0).toISOString(), - attempts: 0, - status: JOB_STATUS.PENDING, + attempts: report.attempts || 0, + status: report.status || JOB_STATUS.PENDING, }), }, }; diff --git a/x-pack/platform/plugins/private/reporting/server/lib/tasks/index.ts b/x-pack/platform/plugins/private/reporting/server/lib/tasks/index.ts index 841d499da1059..e3da53487caa6 100644 --- a/x-pack/platform/plugins/private/reporting/server/lib/tasks/index.ts +++ b/x-pack/platform/plugins/private/reporting/server/lib/tasks/index.ts @@ -5,14 +5,16 @@ * 2.0. */ -import { TaskRunCreatorFunction } from '@kbn/task-manager-plugin/server'; +import { RruleSchedule, TaskRegisterDefinition } from '@kbn/task-manager-plugin/server'; import { BasePayload, ReportSource } from '@kbn/reporting-common/types'; export const REPORTING_EXECUTE_TYPE = 'report:execute'; +export const SCHEDULED_REPORTING_EXECUTE_TYPE = 'report:execute-scheduled'; export const TIME_BETWEEN_ATTEMPTS = 10 * 1000; // 10 seconds -export { ExecuteReportTask } from './execute_report'; +export { RunSingleReportTask } from './run_single_report'; +export { RunScheduledReportTask } from './run_scheduled_report'; export interface ReportTaskParams { id: string; @@ -25,18 +27,21 @@ export interface ReportTaskParams { meta: ReportSource['meta']; } +export interface ScheduledReportTaskParams { + id: string; + jobtype: ReportSource['jobtype']; + spaceId: string; + schedule: RruleSchedule; +} + +export type ScheduledReportTaskParamsWithoutSpaceId = Omit; + export enum ReportingTaskStatus { UNINITIALIZED = 'uninitialized', INITIALIZED = 'initialized', } export interface ReportingTask { - getTaskDefinition: () => { - type: string; - title: string; - createTaskRunner: TaskRunCreatorFunction; - maxAttempts: number; - timeout: string; - }; + getTaskDefinition: () => TaskRegisterDefinition; getStatus: () => ReportingTaskStatus; } diff --git a/x-pack/platform/plugins/private/reporting/server/lib/tasks/execute_report.ts b/x-pack/platform/plugins/private/reporting/server/lib/tasks/run_report.ts similarity index 60% rename from x-pack/platform/plugins/private/reporting/server/lib/tasks/execute_report.ts rename to x-pack/platform/plugins/private/reporting/server/lib/tasks/run_report.ts index 8b99f2e9514f4..5ef691a139804 100644 --- a/x-pack/platform/plugins/private/reporting/server/lib/tasks/execute_report.ts +++ b/x-pack/platform/plugins/private/reporting/server/lib/tasks/run_report.ts @@ -15,10 +15,9 @@ import type { KibanaRequest, Logger } from '@kbn/core/server'; import { CancellationToken, KibanaShuttingDownError, - QueueTimeoutError, + MissingAuthenticationError, ReportingError, durationToNumber, - numberToDuration, } from '@kbn/reporting-common'; import type { ExecutionError, @@ -28,34 +27,26 @@ import type { TaskRunResult, } from '@kbn/reporting-common/types'; import { decryptJobHeaders, type ReportingConfigType } from '@kbn/reporting-server'; -import type { - RunContext, - TaskManagerStartContract, - TaskRunCreatorFunction, +import { + throwRetryableError, + type ConcreteTaskInstance, + type RunContext, + type TaskManagerStartContract, + type TaskRegisterDefinition, + type TaskRunCreatorFunction, } from '@kbn/task-manager-plugin/server'; -import { throwRetryableError } from '@kbn/task-manager-plugin/server'; import { ExportTypesRegistry } from '@kbn/reporting-server/export_types_registry'; import { kibanaRequestFactory } from '@kbn/core-http-server-utils'; import { DEFAULT_SPACE_ID } from '@kbn/spaces-plugin/common'; -import { - REPORTING_EXECUTE_TYPE, - ReportTaskParams, - ReportingTask, - ReportingTaskStatus, - TIME_BETWEEN_ATTEMPTS, -} from '.'; -import { getContentStream, finishedWithNoPendingCallbacks } from '../content_stream'; +import { mapToReportingError } from '../../../common/errors/map_to_reporting_error'; +import { ReportTaskParams, ReportingTask, ReportingTaskStatus, TIME_BETWEEN_ATTEMPTS } from '.'; import type { ReportingCore } from '../..'; -import { - isExecutionError, - mapToReportingError, -} from '../../../common/errors/map_to_reporting_error'; import { EventTracker } from '../../usage'; -import type { ReportingStore } from '../store'; import { Report, SavedReport } from '../store'; -import type { ReportFailedFields, ReportProcessingFields } from '../store/store'; +import type { ReportFailedFields } from '../store/store'; import { errorLogger } from './error_logger'; +import { finishedWithNoPendingCallbacks, getContentStream } from '../content_stream'; type CompletedReportOutput = Omit; @@ -72,12 +63,6 @@ interface GetHeadersOpts { requestFromTask?: KibanaRequest; spaceId: string | undefined; } -interface ReportingExecuteTaskInstance { - state: object; - taskType: string; - params: ReportTaskParams; - runAt?: Date; -} function isOutput(output: CompletedReportOutput | Error): output is CompletedReportOutput { return (output as CompletedReportOutput).size != null; @@ -95,63 +80,84 @@ function parseError(error: unknown): ExecutionError | unknown { return error; } -export class ExecuteReportTask implements ReportingTask { - public TYPE = REPORTING_EXECUTE_TYPE; - - private logger: Logger; - private taskManagerStart?: TaskManagerStartContract; - private kibanaId?: string; - private kibanaName?: string; - private exportTypesRegistry: ExportTypesRegistry; - private store?: ReportingStore; - private eventTracker?: EventTracker; - - constructor( - private reporting: ReportingCore, - private config: ReportingConfigType, - logger: Logger - ) { - this.logger = logger.get('runTask'); - this.exportTypesRegistry = this.reporting.getExportTypesRegistry(); +export interface ConstructorOpts { + config: ReportingConfigType; + logger: Logger; + reporting: ReportingCore; +} + +export interface PrepareJobResults { + isLastAttempt: boolean; + jobId: string; + report?: SavedReport; + task?: ReportTaskParams; +} + +type ReportTaskParamsType = Record; + +export abstract class RunReportTask + implements ReportingTask +{ + protected readonly logger: Logger; + protected readonly queueTimeout: number; + + protected taskManagerStart?: TaskManagerStartContract; + protected kibanaId?: string; + protected kibanaName?: string; + protected exportTypesRegistry: ExportTypesRegistry; + protected eventTracker?: EventTracker; + + constructor(protected readonly opts: ConstructorOpts) { + this.logger = opts.logger.get('runTask'); + this.exportTypesRegistry = opts.reporting.getExportTypesRegistry(); + this.queueTimeout = durationToNumber(opts.config.queue.timeout); } - /* - * To be called from plugin start - */ + // Abstract methods + public abstract get TYPE(): string; + + public abstract getTaskDefinition(): TaskRegisterDefinition; + + public abstract scheduleTask( + request: KibanaRequest, + params: TaskParams + ): Promise; + + protected abstract prepareJob(taskInstance: ConcreteTaskInstance): Promise; + + protected abstract getMaxAttempts(): number | undefined; + + // Public methods public async init(taskManager: TaskManagerStartContract) { this.taskManagerStart = taskManager; - const { reporting } = this; - const { uuid, name } = reporting.getServerInfo(); + const { uuid, name } = this.opts.reporting.getServerInfo(); this.kibanaId = uuid; this.kibanaName = name; } - /* - * Async get the ReportingStore: it is only available after PluginStart - */ - private async getStore(): Promise { - if (this.store) { - return this.store; + public getStatus() { + if (this.taskManagerStart) { + return ReportingTaskStatus.INITIALIZED; } - const { store } = await this.reporting.getPluginStartDeps(); - this.store = store; - return store; + + return ReportingTaskStatus.UNINITIALIZED; } - private getTaskManagerStart() { + // Protected methods + protected getTaskManagerStart() { if (!this.taskManagerStart) { throw new Error('Reporting task runner has not been initialized!'); } return this.taskManagerStart; } - private getEventTracker(report: Report) { + protected getEventTracker(report: Report) { if (this.eventTracker) { return this.eventTracker; } - const eventTracker = this.reporting.getEventTracker( + const eventTracker = this.opts.reporting.getEventTracker( report._id, report.jobtype, report.payload.objectType @@ -160,91 +166,12 @@ export class ExecuteReportTask implements ReportingTask { return this.eventTracker; } - private getJobContentEncoding(jobType: string) { + protected getJobContentEncoding(jobType: string) { const exportType = this.exportTypesRegistry.getByJobType(jobType); return exportType.jobContentEncoding; } - private async _claimJob(task: ReportTaskParams): Promise { - if (this.kibanaId == null) { - throw new Error(`Kibana instance ID is undefined!`); - } - if (this.kibanaName == null) { - throw new Error(`Kibana instance name is undefined!`); - } - - const store = await this.getStore(); - const report = await store.findReportFromTask(task); // receives seq_no and primary_term - const logger = this.logger.get(report._id); - - if (report.status === 'completed') { - throw new Error(`Can not claim the report job: it is already completed!`); - } - - const m = moment(); - - // check if job has exceeded the configured maxAttempts - const maxAttempts = this.getMaxAttempts(); - if (report.attempts >= maxAttempts) { - let err: ReportingError; - if (report.error && isExecutionError(report.error)) { - // We have an error stored from a previous attempts, so we'll use that - // error to fail the job and return it to the user. - const { error } = report; - err = mapToReportingError(error); - err.stack = error.stack; - } else { - if (report.error && report.error instanceof Error) { - errorLogger(logger, 'Error executing report', report.error); - } - err = new QueueTimeoutError( - `Max attempts reached (${maxAttempts}). Queue timeout reached.` - ); - } - await this._failJob(report, err); - throw err; - } - - const queueTimeout = durationToNumber(this.config.queue.timeout); - const startTime = m.toISOString(); - const expirationTime = m.add(queueTimeout).toISOString(); - - const doc: ReportProcessingFields = { - kibana_id: this.kibanaId, - kibana_name: this.kibanaName, - attempts: report.attempts + 1, - max_attempts: maxAttempts, - started_at: startTime, - timeout: queueTimeout, - process_expiration: expirationTime, - }; - - const claimedReport = new SavedReport({ - ...report, - ...doc, - }); - - logger.info( - `Claiming ${claimedReport.jobtype} ${report._id} ` + - `[_index: ${report._index}] ` + - `[_seq_no: ${report._seq_no}] ` + - `[_primary_term: ${report._primary_term}] ` + - `[attempts: ${report.attempts}] ` + - `[process_expiration: ${expirationTime}]` - ); - - // event tracking of claimed job - const eventTracker = this.getEventTracker(report); - const timeSinceCreation = Date.now() - new Date(report.created_at).valueOf(); - eventTracker?.claimJob({ timeSinceCreation }); - - const resp = await store.setReportClaimed(claimedReport, doc); - claimedReport._seq_no = resp._seq_no!; - claimedReport._primary_term = resp._primary_term!; - return claimedReport; - } - - private async _failJob( + protected async failJob( report: SavedReport, error?: ReportingError ): Promise> { @@ -255,13 +182,13 @@ export class ExecuteReportTask implements ReportingTask { let docOutput; if (error) { errorLogger(logger, message, error); - docOutput = this._formatOutput(error); + docOutput = this.formatOutput(error); } else { errorLogger(logger, message); } // update the report in the store - const store = await this.getStore(); + const store = await this.opts.reporting.getStore(); const completedTime = moment(); const doc: ReportFailedFields = { completed_at: completedTime.toISOString(), @@ -280,7 +207,7 @@ export class ExecuteReportTask implements ReportingTask { return await store.setReportFailed(report, doc); } - private async _saveExecutionError( + protected async saveExecutionError( report: SavedReport, failedToExecuteErr: any ): Promise> { @@ -291,7 +218,7 @@ export class ExecuteReportTask implements ReportingTask { errorLogger(logger, message, failedToExecuteErr); // update the report in the store - const store = await this.getStore(); + const store = await this.opts.reporting.getStore(); const doc: ReportFailedFields = { output: null, error: errorParsed, @@ -300,7 +227,7 @@ export class ExecuteReportTask implements ReportingTask { return await store.setReportError(report, doc); } - private _formatOutput(output: CompletedReportOutput | ReportingError): ReportOutput { + protected formatOutput(output: CompletedReportOutput | ReportingError): ReportOutput { const docOutput = {} as ReportOutput; const unknownMime = null; @@ -324,7 +251,7 @@ export class ExecuteReportTask implements ReportingTask { return docOutput; } - private async _getRequestToUse({ + protected async getRequestToUse({ requestFromTask, spaceId, encryptedHeaders, @@ -339,17 +266,17 @@ export class ExecuteReportTask implements ReportingTask { } let decryptedHeaders; - if (this.config.encryptionKey && encryptedHeaders) { + if (this.opts.config.encryptionKey && encryptedHeaders) { // get decrypted headers decryptedHeaders = await decryptJobHeaders( - this.config.encryptionKey, + this.opts.config.encryptionKey, encryptedHeaders, this.logger ); } if (!decryptedHeaders && !apiKeyAuthHeaders) { - throw new Error('No headers found to execute report'); + throw new MissingAuthenticationError(); } let headersToUse: Headers = {}; @@ -367,10 +294,10 @@ export class ExecuteReportTask implements ReportingTask { headersToUse = decryptedHeaders || {}; } - return this._getFakeRequest(headersToUse, spaceId, this.logger); + return this.getFakeRequest(headersToUse, spaceId, this.logger); } - private _getFakeRequest( + protected getFakeRequest( headers: Headers, spaceId: string | undefined, logger = this.logger @@ -381,7 +308,7 @@ export class ExecuteReportTask implements ReportingTask { }; const fakeRequest = kibanaRequestFactory(rawRequest); - const setupDeps = this.reporting.getPluginSetupDeps(); + const setupDeps = this.opts.reporting.getPluginSetupDeps(); const spacesService = setupDeps.spaces?.spacesService; if (spacesService) { if (spaceId && spaceId !== DEFAULT_SPACE_ID) { @@ -392,7 +319,7 @@ export class ExecuteReportTask implements ReportingTask { return fakeRequest; } - private async _performJob({ + protected async performJob({ task, fakeRequest, taskInstanceFields, @@ -405,8 +332,7 @@ export class ExecuteReportTask implements ReportingTask { } // run the report // if workerFn doesn't finish before timeout, call the cancellationToken and throw an error - const queueTimeout = durationToNumber(this.config.queue.timeout); - const request = await this._getRequestToUse({ + const request = await this.getRequestToUse({ requestFromTask: fakeRequest, spaceId: task.payload.spaceId, encryptedHeaders: task.payload.headers, @@ -422,11 +348,11 @@ export class ExecuteReportTask implements ReportingTask { cancellationToken, stream, }) - ).pipe(timeout(queueTimeout)) // throw an error if a value is not emitted before timeout + ).pipe(timeout(this.queueTimeout)) // throw an error if a value is not emitted before timeout ); } - private async _completeJob( + protected async completeJob( report: SavedReport, output: CompletedReportOutput ): Promise { @@ -436,8 +362,8 @@ export class ExecuteReportTask implements ReportingTask { logger.debug(`Saving ${report.jobtype} to ${docId}.`); const completedTime = moment(); - const docOutput = this._formatOutput(output); - const store = await this.getStore(); + const docOutput = this.formatOutput(output); + const store = await this.opts.reporting.getStore(); const doc = { completed_at: completedTime.toISOString(), metrics: output.metrics, @@ -477,25 +403,20 @@ export class ExecuteReportTask implements ReportingTask { } // Generic is used to let TS infer the return type at call site. - private async throwIfKibanaShutsDown(): Promise { - await Rx.firstValueFrom(this.reporting.getKibanaShutdown$()); + protected async throwIfKibanaShutsDown(): Promise { + await Rx.firstValueFrom(this.opts.reporting.getKibanaShutdown$()); throw new KibanaShuttingDownError(); } /* * Provides a TaskRunner for Task Manager */ - private getTaskRunner(): TaskRunCreatorFunction { + protected getTaskRunner(): TaskRunCreatorFunction { // Keep a separate local stack for each task run return ({ taskInstance, fakeRequest }: RunContext) => { let jobId: string; const cancellationToken = new CancellationToken(); - const { - attempts: taskAttempts, - params: reportTaskParams, - retryAt: taskRetryAt, - startedAt: taskStartedAt, - } = taskInstance; + const { retryAt: taskRetryAt, startedAt: taskStartedAt } = taskInstance; return { /* @@ -506,31 +427,29 @@ export class ExecuteReportTask implements ReportingTask { * If any error happens, additional retry attempts may be picked up by a separate instance */ run: async () => { - let report: SavedReport | undefined; - const isLastAttempt = taskAttempts >= this.getMaxAttempts(); - - // find the job in the store and set status to processing - const task = reportTaskParams as ReportTaskParams; - jobId = task?.id; - - try { - if (!jobId) { - throw new Error('Invalid report data provided in scheduled task!'); - } - if (!isLastAttempt) { - this.reporting.trackReport(jobId); - } + if (this.kibanaId == null) { + throw new Error(`Kibana instance ID is undefined!`); + } + if (this.kibanaName == null) { + throw new Error(`Kibana instance name is undefined!`); + } - // Update job status to claimed - report = await this._claimJob(task); - } catch (failedToClaim) { - // error claiming report - log the error - // could be version conflict, or too many attempts or no longer connected to ES - errorLogger(this.logger, `Error in claiming ${jobId}`, failedToClaim); + let report: SavedReport | undefined; + const { + isLastAttempt, + jobId: jId, + report: preparedReport, + task, + } = await this.prepareJob(taskInstance); + jobId = jId; + report = preparedReport; + + if (!isLastAttempt) { + this.opts.reporting.trackReport(jobId); } - if (!report) { - this.reporting.untrackReport(jobId); + if (!report || !task) { + this.opts.reporting.untrackReport(jobId); if (isLastAttempt) { errorLogger(this.logger, `Job ${jobId} failed too many times. Exiting...`); @@ -545,22 +464,27 @@ export class ExecuteReportTask implements ReportingTask { } const { jobtype: jobType, attempts } = report; - const maxAttempts = this.getMaxAttempts(); const logger = this.logger.get(jobId); - logger.debug( - `Starting ${jobType} report ${jobId}: attempt ${attempts} of ${maxAttempts}.` - ); - logger.debug(`Reports running: ${this.reporting.countConcurrentReports()}.`); + const maxAttempts = this.getMaxAttempts(); + if (maxAttempts) { + logger.debug( + `Starting ${jobType} report ${jobId}: attempt ${attempts} of ${maxAttempts}.` + ); + } else { + logger.debug(`Starting ${jobType} report ${jobId}.`); + } - const eventLog = this.reporting.getEventLogger( + logger.debug(`Reports running: ${this.opts.reporting.countConcurrentReports()}.`); + + const eventLog = this.opts.reporting.getEventLogger( new Report({ ...task, _id: task.id, _index: task.index }) ); try { const jobContentEncoding = this.getJobContentEncoding(jobType); const stream = await getContentStream( - this.reporting, + this.opts.reporting, { id: report._id, index: report._index, @@ -574,7 +498,7 @@ export class ExecuteReportTask implements ReportingTask { eventLog.logExecutionStart(); const output = await Promise.race([ - this._performJob({ + this.performJob({ task, fakeRequest, taskInstanceFields: { retryAt: taskRetryAt, startedAt: taskStartedAt }, @@ -601,7 +525,7 @@ export class ExecuteReportTask implements ReportingTask { if (output) { logger.debug(`Job output size: ${stream.bytesWritten} bytes.`); // Update the job status to "completed" - report = await this._completeJob(report, { + report = await this.completeJob(report, { ...output, size: stream.bytesWritten, }); @@ -612,11 +536,9 @@ export class ExecuteReportTask implements ReportingTask { } catch (failedToExecuteErr) { eventLog.logError(failedToExecuteErr); - await this._saveExecutionError(report, failedToExecuteErr).catch( - (failedToSaveError) => { - errorLogger(logger, `Error in saving execution error ${jobId}`, failedToSaveError); - } - ); + await this.saveExecutionError(report, failedToExecuteErr).catch((failedToSaveError) => { + errorLogger(logger, `Error in saving execution error ${jobId}`, failedToSaveError); + }); cancellationToken.cancel(); @@ -624,8 +546,8 @@ export class ExecuteReportTask implements ReportingTask { throwRetryableError(error, new Date(Date.now() + TIME_BETWEEN_ATTEMPTS)); } finally { - this.reporting.untrackReport(jobId); - logger.debug(`Reports running: ${this.reporting.countConcurrentReports()}.`); + this.opts.reporting.untrackReport(jobId); + logger.debug(`Reports running: ${this.opts.reporting.countConcurrentReports()}.`); } }, @@ -642,47 +564,4 @@ export class ExecuteReportTask implements ReportingTask { }; }; } - - private getMaxAttempts() { - return this.config.capture.maxAttempts ?? 1; - } - - public getTaskDefinition() { - // round up from ms to the nearest second - const queueTimeout = Math.ceil(numberToDuration(this.config.queue.timeout).asSeconds()) + 's'; - const maxConcurrency = this.config.queue.pollEnabled ? 1 : 0; - const maxAttempts = this.getMaxAttempts(); - - return { - type: REPORTING_EXECUTE_TYPE, - title: 'Reporting: execute job', - createTaskRunner: this.getTaskRunner(), - maxAttempts: maxAttempts + 1, // Add 1 so we get an extra attempt in case of failure during a Kibana restart - timeout: queueTimeout, - maxConcurrency, - }; - } - - public async scheduleTask(request: KibanaRequest, params: ReportTaskParams) { - const reportingHealth = await this.reporting.getHealthInfo(); - const shouldScheduleWithApiKey = - reportingHealth.hasPermanentEncryptionKey && reportingHealth.isSufficientlySecure; - const taskInstance: ReportingExecuteTaskInstance = { - taskType: REPORTING_EXECUTE_TYPE, - state: {}, - params, - }; - - return shouldScheduleWithApiKey - ? await this.getTaskManagerStart().schedule(taskInstance, { request }) - : await this.getTaskManagerStart().schedule(taskInstance); - } - - public getStatus() { - if (this.taskManagerStart) { - return ReportingTaskStatus.INITIALIZED; - } - - return ReportingTaskStatus.UNINITIALIZED; - } } diff --git a/x-pack/platform/plugins/private/reporting/server/lib/tasks/run_scheduled_report.test.ts b/x-pack/platform/plugins/private/reporting/server/lib/tasks/run_scheduled_report.test.ts new file mode 100644 index 0000000000000..2522e3644ad82 --- /dev/null +++ b/x-pack/platform/plugins/private/reporting/server/lib/tasks/run_scheduled_report.test.ts @@ -0,0 +1,443 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { Transform } from 'stream'; +import type { estypes } from '@elastic/elasticsearch'; +import { loggingSystemMock } from '@kbn/core/server/mocks'; +import { JOB_STATUS, KibanaShuttingDownError } from '@kbn/reporting-common'; +import { ReportDocument } from '@kbn/reporting-common/types'; +import { createMockConfigSchema } from '@kbn/reporting-mocks-server'; +import { type ExportType, type ReportingConfigType } from '@kbn/reporting-server'; +import type { RunContext } from '@kbn/task-manager-plugin/server'; +import { taskManagerMock } from '@kbn/task-manager-plugin/server/mocks'; + +import { RunScheduledReportTask, SCHEDULED_REPORTING_EXECUTE_TYPE } from '.'; +import { ReportingCore } from '../..'; +import { createMockReportingCore } from '../../test_helpers'; +import { + FakeRawRequest, + KibanaRequest, + SavedObject, + SavedObjectsClientContract, +} from '@kbn/core/server'; +import { Frequency } from '@kbn/rrule'; +import { ReportingStore, SavedReport } from '../store'; +import { ScheduledReportType } from '../../types'; + +interface StreamMock { + getSeqNo: () => number; + getPrimaryTerm: () => number; + write: (data: string) => void; + fail: () => void; + end: () => void; + transform: Transform; +} + +function createStreamMock(): StreamMock { + const transform: Transform = new Transform({}); + + return { + getSeqNo: () => 10, + getPrimaryTerm: () => 20, + write: (data: string) => { + transform.push(`${data}\n`); + }, + fail: () => { + transform.emit('error', new Error('Stream failed')); + transform.end(); + }, + transform, + end: () => { + transform.end(); + }, + }; +} + +const mockStream = createStreamMock(); +jest.mock('../content_stream', () => ({ + getContentStream: () => mockStream, + finishedWithNoPendingCallbacks: () => Promise.resolve(), +})); + +const logger = loggingSystemMock.createLogger(); +const fakeRawRequest: FakeRawRequest = { + headers: { + authorization: `ApiKey skdjtq4u543yt3rhewrh`, + }, + path: '/', +}; + +const payload = { + headers: '', + title: 'Test Report', + browserTimezone: '', + objectType: 'test', + version: '8.0.0', +}; + +const reportSO: SavedObject = { + id: 'report-so-id', + attributes: { + createdAt: new Date().toISOString(), + createdBy: 'test-user', + enabled: true, + jobType: 'test1', + meta: { objectType: 'test' }, + migrationVersion: '8.0.0', + payload: JSON.stringify(payload), + schedule: { rrule: { freq: Frequency.DAILY, interval: 2, tzid: 'UTC' } }, + title: 'Test Report', + }, + references: [], + type: 'scheduled-report', +}; + +describe('Run Scheduled Report Task', () => { + let mockReporting: ReportingCore; + let configType: ReportingConfigType; + let soClient: SavedObjectsClientContract; + let reportStore: ReportingStore; + + const runTaskFn = jest.fn().mockResolvedValue({ content_type: 'application/pdf' }); + beforeAll(async () => { + configType = createMockConfigSchema(); + mockReporting = await createMockReportingCore(configType); + + soClient = await mockReporting.getInternalSoClient(); + soClient.get = jest.fn().mockImplementation(async () => { + return reportSO; + }); + + mockReporting.getExportTypesRegistry().register({ + id: 'test1', + name: 'Test1', + setup: jest.fn(), + start: jest.fn(), + createJob: () => new Promise(() => {}), + runTask: runTaskFn, + jobContentEncoding: 'base64', + jobType: 'test1', + validLicenses: [], + } as unknown as ExportType); + }); + + beforeEach(async () => { + reportStore = await mockReporting.getStore(); + reportStore.addReport = jest.fn().mockImplementation(async () => { + return new SavedReport({ + _id: '290357209345723095', + _index: '.reporting-fantastic', + _seq_no: 23, + _primary_term: 354000, + jobtype: 'test1', + migration_version: '8.0.0', + payload, + created_at: new Date().toISOString(), + created_by: 'test-user', + meta: { objectType: 'test' }, + scheduled_report_id: 'report-so-id', + status: JOB_STATUS.PROCESSING, + }); + }); + reportStore.setReportError = jest.fn(() => + Promise.resolve({ + _id: 'test', + jobtype: 'noop', + status: 'processing', + } as unknown as estypes.UpdateUpdateWriteResponseBase) + ); + }); + + it('Instance setup', () => { + const task = new RunScheduledReportTask({ + reporting: mockReporting, + config: configType, + logger, + }); + expect(task.getStatus()).toBe('uninitialized'); + expect(task.getTaskDefinition()).toMatchInlineSnapshot(` + Object { + "createTaskRunner": [Function], + "maxConcurrency": 1, + "timeout": "120s", + "title": "Reporting: execute scheduled job", + "type": "report:execute-scheduled", + } + `); + }); + + it('Instance start', () => { + const mockTaskManager = taskManagerMock.createStart(); + const task = new RunScheduledReportTask({ + reporting: mockReporting, + config: configType, + logger, + }); + expect(task.init(mockTaskManager)); + expect(task.getStatus()).toBe('initialized'); + }); + + it('create task runner', async () => { + logger.info = jest.fn(); + logger.error = jest.fn(); + + const task = new RunScheduledReportTask({ + reporting: mockReporting, + config: configType, + logger, + }); + const taskDef = task.getTaskDefinition(); + const taskRunner = taskDef.createTaskRunner({ + taskInstance: { + id: 'random-task-id', + params: { id: 'cool-reporting-id', jobtype: 'test1' }, + }, + } as unknown as RunContext); + expect(taskRunner).toHaveProperty('run'); + expect(taskRunner).toHaveProperty('cancel'); + }); + + it('Max Concurrency is 0 if pollEnabled is false', () => { + const queueConfig = { + queue: { pollEnabled: false, timeout: 55000 }, + } as unknown as ReportingConfigType['queue']; + + const task = new RunScheduledReportTask({ + reporting: mockReporting, + config: { ...configType, ...queueConfig }, + logger, + }); + expect(task.getStatus()).toBe('uninitialized'); + expect(task.getTaskDefinition()).toMatchInlineSnapshot(` + Object { + "createTaskRunner": [Function], + "maxConcurrency": 0, + "timeout": "55s", + "title": "Reporting: execute scheduled job", + "type": "report:execute-scheduled", + } + `); + }); + + it('schedules task with request', async () => { + const task = new RunScheduledReportTask({ + reporting: mockReporting, + config: configType, + logger, + }); + const mockTaskManager = taskManagerMock.createStart(); + await task.init(mockTaskManager); + + await task.scheduleTask(fakeRawRequest as unknown as KibanaRequest, { + id: 'report-so-id', + jobtype: 'test1', + schedule: { + rrule: { freq: Frequency.DAILY, interval: 2, tzid: 'UTC' }, + } as never, + }); + + expect(mockTaskManager.schedule).toHaveBeenCalledWith( + { + id: 'report-so-id', + taskType: SCHEDULED_REPORTING_EXECUTE_TYPE, + state: {}, + params: { + id: 'report-so-id', + spaceId: 'default', + jobtype: 'test1', + }, + schedule: { + rrule: { freq: Frequency.DAILY, interval: 2, tzid: 'UTC' }, + }, + }, + { request: fakeRawRequest } + ); + }); + + it('uses authorization headers from task manager fake request', async () => { + const runAt = new Date('2023-10-01T00:00:00Z'); + const task = new RunScheduledReportTask({ + reporting: mockReporting, + config: configType, + logger, + }); + + jest + // @ts-expect-error TS compilation fails: this overrides a private method of the RunScheduledReportTask instance + .spyOn(task, 'completeJob') + .mockResolvedValueOnce({ _id: 'test', jobtype: 'test1', status: 'pending' } as never); + const mockTaskManager = taskManagerMock.createStart(); + await task.init(mockTaskManager); + + const taskDef = task.getTaskDefinition(); + const taskRunner = taskDef.createTaskRunner({ + taskInstance: { + id: 'report-so-id', + runAt, + params: { + id: 'report-so-id', + jobtype: 'test1', + schedule: { + rrule: { freq: Frequency.DAILY, interval: 2, tzid: 'UTC' }, + }, + }, + }, + fakeRequest: fakeRawRequest, + } as unknown as RunContext); + + await taskRunner.run(); + + expect(soClient.get).toHaveBeenCalledWith('scheduled_report', 'report-so-id', { + namespace: 'default', + }); + expect(reportStore.addReport).toHaveBeenCalledWith( + expect.objectContaining({ + _id: expect.any(String), + _index: '.kibana-reporting', + jobtype: 'test1', + created_at: expect.any(String), + created_by: 'test-user', + payload: { + headers: '', + title: expect.any(String), + browserTimezone: '', + objectType: 'test', + version: '8.0.0', + forceNow: expect.any(String), + }, + meta: { objectType: 'test' }, + status: 'processing', + attempts: 1, + scheduled_report_id: 'report-so-id', + kibana_name: 'kibana', + kibana_id: 'instance-uuid', + started_at: expect.any(String), + timeout: 120000, + max_attempts: 1, + process_expiration: expect.any(String), + migration_version: '7.14.0', + }) + ); + expect(runTaskFn.mock.calls[0][0].request.headers).toEqual({ + authorization: 'ApiKey skdjtq4u543yt3rhewrh', + }); + }); + + it('throws if no fake request from task', async () => { + const task = new RunScheduledReportTask({ + reporting: mockReporting, + config: configType, + logger, + }); + + const mockTaskManager = taskManagerMock.createStart(); + await task.init(mockTaskManager); + + const taskDef = task.getTaskDefinition(); + const taskRunner = taskDef.createTaskRunner({ + taskInstance: { + id: 'report-so-id', + runAt: new Date('2023-10-01T00:00:00Z'), + params: { + id: 'report-so-id', + jobtype: 'test1', + schedule: { + rrule: { freq: Frequency.DAILY, interval: 2, tzid: 'UTC' }, + }, + }, + }, + fakeRequest: undefined, + } as unknown as RunContext); + + await expect(taskRunner.run()).rejects.toThrowErrorMatchingInlineSnapshot( + `"ReportingError(code: missing_authentication_header_error)"` + ); + + expect(soClient.get).toHaveBeenCalled(); + expect(reportStore.addReport).toHaveBeenCalled(); + + expect(reportStore.setReportError).toHaveBeenLastCalledWith( + expect.objectContaining({ + _id: '290357209345723095', + }), + expect.objectContaining({ + error: expect.objectContaining({ + message: `ReportingError(code: missing_authentication_header_error)`, + }), + }) + ); + }); + + it('throws during reporting if Kibana starts shutting down', async () => { + mockReporting.getExportTypesRegistry().register({ + id: 'noop', + name: 'Noop', + setup: jest.fn(), + start: jest.fn(), + createJob: () => new Promise(() => {}), + runTask: () => new Promise(() => {}), + jobContentExtension: 'pdf', + jobType: 'noop', + validLicenses: [], + } as unknown as ExportType); + const task = new RunScheduledReportTask({ + reporting: mockReporting, + config: configType, + logger, + }); + + jest + // @ts-expect-error TS compilation fails: this overrides a private method of the RunScheduledReportTask instance + .spyOn(task, 'prepareJob') + .mockResolvedValueOnce({ + isLastAttempt: false, + jobId: '290357209345723095', + report: { _id: '290357209345723095', jobtype: 'noop' }, + task: { + id: '290357209345723095', + index: '.reporting-fantastic', + jobtype: 'noop', + payload, + }, + } as never); + + const mockTaskManager = taskManagerMock.createStart(); + await task.init(mockTaskManager); + + const taskDef = task.getTaskDefinition(); + const taskRunner = taskDef.createTaskRunner({ + taskInstance: { + id: 'report-so-id', + params: { + id: 'report-so-id', + jobtype: 'test1', + schedule: { + rrule: { freq: Frequency.DAILY, interval: 2, tzid: 'UTC' }, + }, + }, + }, + fakeRequest: fakeRawRequest, + } as unknown as RunContext); + + const taskPromise = taskRunner.run(); + setImmediate(() => { + mockReporting.pluginStop(); + }); + await taskPromise.catch(() => {}); + + expect(reportStore.setReportError).toHaveBeenLastCalledWith( + expect.objectContaining({ + _id: '290357209345723095', + }), + expect.objectContaining({ + error: expect.objectContaining({ + message: `ReportingError(code: ${new KibanaShuttingDownError().code})`, + }), + }) + ); + }); +}); diff --git a/x-pack/platform/plugins/private/reporting/server/lib/tasks/run_scheduled_report.ts b/x-pack/platform/plugins/private/reporting/server/lib/tasks/run_scheduled_report.ts new file mode 100644 index 0000000000000..67f5a9dc1bf61 --- /dev/null +++ b/x-pack/platform/plugins/private/reporting/server/lib/tasks/run_scheduled_report.ts @@ -0,0 +1,118 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import type { KibanaRequest } from '@kbn/core/server'; +import { numberToDuration } from '@kbn/reporting-common'; +import type { ConcreteTaskInstance, TaskInstance } from '@kbn/task-manager-plugin/server'; + +import { DEFAULT_SPACE_ID } from '@kbn/spaces-utils'; +import { + SCHEDULED_REPORTING_EXECUTE_TYPE, + ScheduledReportTaskParams, + ScheduledReportTaskParamsWithoutSpaceId, +} from '.'; +import type { SavedReport } from '../store'; +import { errorLogger } from './error_logger'; +import { SCHEDULED_REPORT_SAVED_OBJECT_TYPE } from '../../saved_objects'; +import { PrepareJobResults, RunReportTask } from './run_report'; +import { ScheduledReport } from '../store/scheduled_report'; +import { ScheduledReportType } from '../../types'; + +type ScheduledReportTaskInstance = Omit & { + params: Omit; +}; +export class RunScheduledReportTask extends RunReportTask { + public get TYPE() { + return SCHEDULED_REPORTING_EXECUTE_TYPE; + } + + protected async prepareJob(taskInstance: ConcreteTaskInstance): Promise { + const { runAt, params: scheduledReportTaskParams } = taskInstance; + + let report: SavedReport | undefined; + let jobId: string; + const task = scheduledReportTaskParams as ScheduledReportTaskParams; + const reportSoId = task.id; + const reportSpaceId = task.spaceId || DEFAULT_SPACE_ID; + + try { + if (!reportSoId) { + throw new Error( + `Invalid scheduled report saved object data provided in scheduled task! - No saved object with id "${reportSoId}"` + ); + } + + const internalSoClient = await this.opts.reporting.getInternalSoClient(); + const reportSO = await internalSoClient.get( + SCHEDULED_REPORT_SAVED_OBJECT_TYPE, + reportSoId, + { namespace: reportSpaceId } + ); + + const store = await this.opts.reporting.getStore(); + + // Add the report to ReportingStore to show as processing + report = await store.addReport( + new ScheduledReport({ + runAt, + kibanaId: this.kibanaId!, + kibanaName: this.kibanaName!, + queueTimeout: this.queueTimeout, + reportSO, + }) + ); + + jobId = report._id; + if (!jobId) { + throw new Error(`Unable to store report document in ReportingStore`); + } + } catch (failedToClaim) { + // error claiming report - log the error + errorLogger(this.logger, `Error in running scheduled report ${reportSoId}`, failedToClaim); + } + + return { isLastAttempt: false, jobId: jobId!, report, task: report?.toReportTaskJSON() }; + } + + protected getMaxAttempts() { + return undefined; + } + + public getTaskDefinition() { + // round up from ms to the nearest second + const queueTimeout = + Math.ceil(numberToDuration(this.opts.config.queue.timeout).asSeconds()) + 's'; + const maxConcurrency = this.opts.config.queue.pollEnabled ? 1 : 0; + + return { + type: SCHEDULED_REPORTING_EXECUTE_TYPE, + title: 'Reporting: execute scheduled job', + createTaskRunner: this.getTaskRunner(), + timeout: queueTimeout, + maxConcurrency, + }; + } + + public async scheduleTask( + request: KibanaRequest, + params: ScheduledReportTaskParamsWithoutSpaceId + ) { + const spaceId = this.opts.reporting.getSpaceId(request, this.logger); + const taskInstance: ScheduledReportTaskInstance = { + id: params.id, + taskType: SCHEDULED_REPORTING_EXECUTE_TYPE, + state: {}, + params: { + id: params.id, + spaceId: spaceId || DEFAULT_SPACE_ID, + jobtype: params.jobtype, + }, + schedule: params.schedule, + }; + return await this.getTaskManagerStart().schedule(taskInstance, { request }); + } +} diff --git a/x-pack/platform/plugins/private/reporting/server/lib/tasks/execute_report.test.ts b/x-pack/platform/plugins/private/reporting/server/lib/tasks/run_single_report.test.ts similarity index 88% rename from x-pack/platform/plugins/private/reporting/server/lib/tasks/execute_report.test.ts rename to x-pack/platform/plugins/private/reporting/server/lib/tasks/run_single_report.test.ts index a8b5eb35f6ad0..58c28dffffe27 100644 --- a/x-pack/platform/plugins/private/reporting/server/lib/tasks/execute_report.test.ts +++ b/x-pack/platform/plugins/private/reporting/server/lib/tasks/run_single_report.test.ts @@ -16,7 +16,7 @@ import { cryptoFactory, type ExportType, type ReportingConfigType } from '@kbn/r import type { RunContext } from '@kbn/task-manager-plugin/server'; import { taskManagerMock } from '@kbn/task-manager-plugin/server/mocks'; -import { ExecuteReportTask, REPORTING_EXECUTE_TYPE } from '.'; +import { RunSingleReportTask, REPORTING_EXECUTE_TYPE } from '.'; import { ReportingCore } from '../..'; import { createMockReportingCore } from '../../test_helpers'; import { FakeRawRequest, KibanaRequest } from '@kbn/core/server'; @@ -104,7 +104,7 @@ const fakeRawRequest: FakeRawRequest = { path: '/', }; -describe('Execute Report Task', () => { +describe('Run Single Report Task', () => { let mockReporting: ReportingCore; let configType: ReportingConfigType; beforeAll(async () => { @@ -113,7 +113,7 @@ describe('Execute Report Task', () => { }); it('Instance setup', () => { - const task = new ExecuteReportTask(mockReporting, configType, logger); + const task = new RunSingleReportTask({ reporting: mockReporting, config: configType, logger }); expect(task.getStatus()).toBe('uninitialized'); expect(task.getTaskDefinition()).toMatchInlineSnapshot(` Object { @@ -129,7 +129,7 @@ describe('Execute Report Task', () => { it('Instance start', () => { const mockTaskManager = taskManagerMock.createStart(); - const task = new ExecuteReportTask(mockReporting, configType, logger); + const task = new RunSingleReportTask({ reporting: mockReporting, config: configType, logger }); expect(task.init(mockTaskManager)); expect(task.getStatus()).toBe('initialized'); }); @@ -138,7 +138,7 @@ describe('Execute Report Task', () => { logger.info = jest.fn(); logger.error = jest.fn(); - const task = new ExecuteReportTask(mockReporting, configType, logger); + const task = new RunSingleReportTask({ reporting: mockReporting, config: configType, logger }); const taskDef = task.getTaskDefinition(); const taskRunner = taskDef.createTaskRunner({ taskInstance: { @@ -155,7 +155,11 @@ describe('Execute Report Task', () => { queue: { pollEnabled: false, timeout: 55000 }, } as unknown as ReportingConfigType['queue']; - const task = new ExecuteReportTask(mockReporting, { ...configType, ...queueConfig }, logger); + const task = new RunSingleReportTask({ + reporting: mockReporting, + config: { ...configType, ...queueConfig }, + logger, + }); expect(task.getStatus()).toBe('uninitialized'); expect(task.getTaskDefinition()).toMatchInlineSnapshot(` Object { @@ -175,7 +179,7 @@ describe('Execute Report Task', () => { hasPermanentEncryptionKey: true, areNotificationsEnabled: true, }); - const task = new ExecuteReportTask(mockReporting, configType, logger); + const task = new RunSingleReportTask({ reporting: mockReporting, config: configType, logger }); const mockTaskManager = taskManagerMock.createStart(); await task.init(mockTaskManager); @@ -208,7 +212,7 @@ describe('Execute Report Task', () => { hasPermanentEncryptionKey: true, areNotificationsEnabled: false, }); - const task = new ExecuteReportTask(mockReporting, configType, logger); + const task = new RunSingleReportTask({ reporting: mockReporting, config: configType, logger }); const mockTaskManager = taskManagerMock.createStart(); await task.init(mockTaskManager); @@ -238,7 +242,7 @@ describe('Execute Report Task', () => { hasPermanentEncryptionKey: false, areNotificationsEnabled: true, }); - const task = new ExecuteReportTask(mockReporting, configType, logger); + const task = new RunSingleReportTask({ reporting: mockReporting, config: configType, logger }); const mockTaskManager = taskManagerMock.createStart(); await task.init(mockTaskManager); @@ -275,14 +279,14 @@ describe('Execute Report Task', () => { jobType: 'test1', validLicenses: [], } as unknown as ExportType); - const task = new ExecuteReportTask(mockReporting, configType, logger); + const task = new RunSingleReportTask({ reporting: mockReporting, config: configType, logger }); jest - // @ts-expect-error TS compilation fails: this overrides a private method of the ExecuteReportTask instance - .spyOn(task, '_claimJob') + // @ts-expect-error TS compilation fails: this overrides a private method of the RunSingleReportTask instance + .spyOn(task, 'claimJob') .mockResolvedValueOnce({ _id: 'test', jobtype: 'test1', status: 'pending' } as never); jest - // @ts-expect-error TS compilation fails: this overrides a private method of the ExecuteReportTask instance - .spyOn(task, '_completeJob') + // @ts-expect-error TS compilation fails: this overrides a private method of the RunSingleReportTask instance + .spyOn(task, 'completeJob') .mockResolvedValueOnce({ _id: 'test', jobtype: 'test1', status: 'pending' } as never); const mockTaskManager = taskManagerMock.createStart(); await task.init(mockTaskManager); @@ -320,14 +324,14 @@ describe('Execute Report Task', () => { jobType: 'test2', validLicenses: [], } as unknown as ExportType); - const task = new ExecuteReportTask(mockReporting, configType, logger); + const task = new RunSingleReportTask({ reporting: mockReporting, config: configType, logger }); jest - // @ts-expect-error TS compilation fails: this overrides a private method of the ExecuteReportTask instance - .spyOn(task, '_claimJob') + // @ts-expect-error TS compilation fails: this overrides a private method of the RunSingleReportTask instance + .spyOn(task, 'claimJob') .mockResolvedValueOnce({ _id: 'test', jobtype: 'test2', status: 'pending' } as never); jest - // @ts-expect-error TS compilation fails: this overrides a private method of the ExecuteReportTask instance - .spyOn(task, '_completeJob') + // @ts-expect-error TS compilation fails: this overrides a private method of the RunSingleReportTask instance + .spyOn(task, 'completeJob') .mockResolvedValueOnce({ _id: 'test', jobtype: 'test2', status: 'pending' } as never); const mockTaskManager = taskManagerMock.createStart(); await task.init(mockTaskManager); @@ -367,14 +371,14 @@ describe('Execute Report Task', () => { jobType: 'test3', validLicenses: [], } as unknown as ExportType); - const task = new ExecuteReportTask(mockReporting, configType, logger); + const task = new RunSingleReportTask({ reporting: mockReporting, config: configType, logger }); jest - // @ts-expect-error TS compilation fails: this overrides a private method of the ExecuteReportTask instance - .spyOn(task, '_claimJob') + // @ts-expect-error TS compilation fails: this overrides a private method of the RunSingleReportTask instance + .spyOn(task, 'claimJob') .mockResolvedValueOnce({ _id: 'test', jobtype: 'test3', status: 'pending' } as never); jest - // @ts-expect-error TS compilation fails: this overrides a private method of the ExecuteReportTask instance - .spyOn(task, '_completeJob') + // @ts-expect-error TS compilation fails: this overrides a private method of the RunSingleReportTask instance + .spyOn(task, 'completeJob') .mockResolvedValueOnce({ _id: 'test', jobtype: 'test3', status: 'pending' } as never); const mockTaskManager = taskManagerMock.createStart(); await task.init(mockTaskManager); @@ -421,10 +425,10 @@ describe('Execute Report Task', () => { status: 'processing', } as unknown as estypes.UpdateUpdateWriteResponseBase) ); - const task = new ExecuteReportTask(mockReporting, configType, logger); + const task = new RunSingleReportTask({ reporting: mockReporting, config: configType, logger }); jest - // @ts-expect-error TS compilation fails: this overrides a private method of the ExecuteReportTask instance - .spyOn(task, '_claimJob') + // @ts-expect-error TS compilation fails: this overrides a private method of the RunSingleReportTask instance + .spyOn(task, 'claimJob') .mockResolvedValueOnce({ _id: 'test', jobtype: 'noop', status: 'pending' } as never); const mockTaskManager = taskManagerMock.createStart(); await task.init(mockTaskManager); diff --git a/x-pack/platform/plugins/private/reporting/server/lib/tasks/run_single_report.ts b/x-pack/platform/plugins/private/reporting/server/lib/tasks/run_single_report.ts new file mode 100644 index 0000000000000..d4734801945b8 --- /dev/null +++ b/x-pack/platform/plugins/private/reporting/server/lib/tasks/run_single_report.ts @@ -0,0 +1,160 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import moment from 'moment'; +import type { KibanaRequest } from '@kbn/core/server'; +import { QueueTimeoutError, ReportingError, numberToDuration } from '@kbn/reporting-common'; +import type { ConcreteTaskInstance, TaskInstance } from '@kbn/task-manager-plugin/server'; + +import { REPORTING_EXECUTE_TYPE, ReportTaskParams } from '.'; +import { + isExecutionError, + mapToReportingError, +} from '../../../common/errors/map_to_reporting_error'; +import { SavedReport } from '../store'; +import type { ReportProcessingFields } from '../store/store'; +import { errorLogger } from './error_logger'; +import { PrepareJobResults, RunReportTask } from './run_report'; + +type SingleReportTaskInstance = Omit & { + params: ReportTaskParams; +}; +export class RunSingleReportTask extends RunReportTask { + public get TYPE() { + return REPORTING_EXECUTE_TYPE; + } + + private async claimJob(task: ReportTaskParams): Promise { + const store = await this.opts.reporting.getStore(); + const report = await store.findReportFromTask(task); // receives seq_no and primary_term + const logger = this.logger.get(report._id); + + if (report.status === 'completed') { + throw new Error(`Can not claim the report job: it is already completed!`); + } + + const m = moment(); + + // check if job has exceeded the configured maxAttempts + const maxAttempts = this.getMaxAttempts(); + if (report.attempts >= maxAttempts) { + let err: ReportingError; + if (report.error && isExecutionError(report.error)) { + // We have an error stored from a previous attempts, so we'll use that + // error to fail the job and return it to the user. + const { error } = report; + err = mapToReportingError(error); + err.stack = error.stack; + } else { + if (report.error && report.error instanceof Error) { + errorLogger(logger, 'Error executing report', report.error); + } + err = new QueueTimeoutError( + `Max attempts reached (${maxAttempts}). Queue timeout reached.` + ); + } + await this.failJob(report, err); + throw err; + } + + const startTime = m.toISOString(); + const expirationTime = m.add(this.queueTimeout).toISOString(); + + const doc: ReportProcessingFields = { + kibana_id: this.kibanaId, + kibana_name: this.kibanaName, + attempts: report.attempts + 1, + max_attempts: maxAttempts, + started_at: startTime, + timeout: this.queueTimeout, + process_expiration: expirationTime, + }; + + const claimedReport = new SavedReport({ ...report, ...doc }); + + logger.info( + `Claiming ${claimedReport.jobtype} ${report._id} ` + + `[_index: ${report._index}] ` + + `[_seq_no: ${report._seq_no}] ` + + `[_primary_term: ${report._primary_term}] ` + + `[attempts: ${report.attempts}] ` + + `[process_expiration: ${expirationTime}]` + ); + + // event tracking of claimed job + const eventTracker = this.getEventTracker(report); + const timeSinceCreation = Date.now() - new Date(report.created_at).valueOf(); + eventTracker?.claimJob({ timeSinceCreation }); + + const resp = await store.setReportClaimed(claimedReport, doc); + claimedReport._seq_no = resp._seq_no!; + claimedReport._primary_term = resp._primary_term!; + return claimedReport; + } + + protected async prepareJob(taskInstance: ConcreteTaskInstance): Promise { + const { attempts: taskAttempts, params: reportTaskParams } = taskInstance; + + let report: SavedReport | undefined; + const isLastAttempt = taskAttempts >= this.getMaxAttempts(); + + // find the job in the store and set status to processing + const task = reportTaskParams as ReportTaskParams; + const jobId = task?.id; + + try { + if (!jobId) { + throw new Error('Invalid report data provided in scheduled task!'); + } + + // Update job status to claimed + report = await this.claimJob(task); + } catch (failedToClaim) { + // error claiming report - log the error + // could be version conflict, or too many attempts or no longer connected to ES + errorLogger(this.logger, `Error in claiming ${jobId}`, failedToClaim); + } + + return { isLastAttempt, jobId, report, task }; + } + + protected getMaxAttempts() { + return this.opts.config.capture.maxAttempts ?? 1; + } + + public getTaskDefinition() { + // round up from ms to the nearest second + const queueTimeout = + Math.ceil(numberToDuration(this.opts.config.queue.timeout).asSeconds()) + 's'; + const maxConcurrency = this.opts.config.queue.pollEnabled ? 1 : 0; + const maxAttempts = this.getMaxAttempts(); + + return { + type: REPORTING_EXECUTE_TYPE, + title: 'Reporting: execute job', + createTaskRunner: this.getTaskRunner(), + maxAttempts: maxAttempts + 1, // Add 1 so we get an extra attempt in case of failure during a Kibana restart + timeout: queueTimeout, + maxConcurrency, + }; + } + + public async scheduleTask(request: KibanaRequest, params: ReportTaskParams) { + const reportingHealth = await this.opts.reporting.getHealthInfo(); + const shouldScheduleWithApiKey = + reportingHealth.hasPermanentEncryptionKey && reportingHealth.isSufficientlySecure; + const taskInstance: SingleReportTaskInstance = { + taskType: REPORTING_EXECUTE_TYPE, + state: {}, + params, + }; + + return shouldScheduleWithApiKey + ? await this.getTaskManagerStart().schedule(taskInstance, { request }) + : await this.getTaskManagerStart().schedule(taskInstance); + } +} diff --git a/x-pack/platform/plugins/private/reporting/server/routes/common/request_handler/generate_request_handler.test.ts b/x-pack/platform/plugins/private/reporting/server/routes/common/request_handler/generate_request_handler.test.ts index 6f866a56ef4c5..dc7f543f522a7 100644 --- a/x-pack/platform/plugins/private/reporting/server/routes/common/request_handler/generate_request_handler.test.ts +++ b/x-pack/platform/plugins/private/reporting/server/routes/common/request_handler/generate_request_handler.test.ts @@ -134,6 +134,7 @@ describe('Handle request to generate', () => { "output": null, "process_expiration": undefined, "queue_time_ms": undefined, + "scheduled_report_id": undefined, "started_at": undefined, "status": "pending", "timeout": undefined, diff --git a/x-pack/platform/plugins/private/reporting/server/routes/common/request_handler/lib/index.ts b/x-pack/platform/plugins/private/reporting/server/routes/common/request_handler/lib/index.ts index d80e6e2e93b0d..256f5046051f1 100644 --- a/x-pack/platform/plugins/private/reporting/server/routes/common/request_handler/lib/index.ts +++ b/x-pack/platform/plugins/private/reporting/server/routes/common/request_handler/lib/index.ts @@ -6,3 +6,4 @@ */ export { transformRawScheduledReportToReport } from './transform_raw_scheduled_report'; +export { transformRawScheduledReportToTaskParams } from './transform_raw_scheduled_report_to_task'; diff --git a/x-pack/platform/plugins/private/reporting/server/routes/common/request_handler/lib/transform_raw_scheduled_report_to_task.ts b/x-pack/platform/plugins/private/reporting/server/routes/common/request_handler/lib/transform_raw_scheduled_report_to_task.ts new file mode 100644 index 0000000000000..c221ae4d92493 --- /dev/null +++ b/x-pack/platform/plugins/private/reporting/server/routes/common/request_handler/lib/transform_raw_scheduled_report_to_task.ts @@ -0,0 +1,20 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { SavedObject } from '@kbn/core/server'; +import { ScheduledReportType } from '../../../../types'; +import { ScheduledReportTaskParamsWithoutSpaceId } from '../../../../lib/tasks'; + +export function transformRawScheduledReportToTaskParams( + rawScheduledReport: SavedObject +): ScheduledReportTaskParamsWithoutSpaceId { + return { + id: rawScheduledReport.id, + jobtype: rawScheduledReport.attributes.jobType, + schedule: rawScheduledReport.attributes.schedule, + }; +} diff --git a/x-pack/platform/plugins/private/reporting/server/routes/common/request_handler/schedule_request_handler.test.ts b/x-pack/platform/plugins/private/reporting/server/routes/common/request_handler/schedule_request_handler.test.ts index 418242ecb57b1..abf4ca60f4541 100644 --- a/x-pack/platform/plugins/private/reporting/server/routes/common/request_handler/schedule_request_handler.test.ts +++ b/x-pack/platform/plugins/private/reporting/server/routes/common/request_handler/schedule_request_handler.test.ts @@ -22,6 +22,7 @@ import { ReportingCore } from '../../..'; import { createMockReportingCore } from '../../../test_helpers'; import { ReportingRequestHandlerContext, ReportingSetup } from '../../../types'; import { ScheduleRequestHandler } from './schedule_request_handler'; +import { TaskStatus } from '@kbn/task-manager-plugin/server'; const getMockContext = () => ({ @@ -80,7 +81,7 @@ describe('Handle request to schedule', () => { mockContext = getMockContext(); mockContext.reporting = Promise.resolve({} as ReportingSetup); - soClient = await reportingCore.getSoClient(fakeRawRequest as unknown as KibanaRequest); + soClient = await reportingCore.getScopedSoClient(fakeRawRequest as unknown as KibanaRequest); soClient.create = jest.fn().mockImplementation(async (_, opts) => { return { id: 'foo', @@ -89,6 +90,20 @@ describe('Handle request to schedule', () => { }; }); + jest.spyOn(reportingCore, 'scheduleRecurringTask').mockResolvedValue({ + id: 'task-id', + scheduledAt: new Date(), + attempts: 0, + status: TaskStatus.Idle, + runAt: new Date(), + startedAt: new Date(), + retryAt: new Date(), + state: {}, + ownerId: 'reporting', + taskType: 'reporting:printable_pdf_v2', + params: {}, + }); + requestHandler = new ScheduleRequestHandler({ reporting: reportingCore, user: { username: 'testymcgee' }, @@ -102,7 +117,7 @@ describe('Handle request to schedule', () => { }); describe('enqueueJob', () => { - test('creates a scheduled report saved object', async () => { + test('creates a scheduled report saved object and schedules task', async () => { const report = await requestHandler.enqueueJob({ exportTypeId: 'printablePdfV2', jobParams: mockJobParams, @@ -165,6 +180,12 @@ describe('Handle request to schedule', () => { isDeprecated: false, }, }); + + expect(reportingCore.scheduleRecurringTask).toHaveBeenCalledWith(mockRequest, { + id: 'foo', + jobtype: 'printable_pdf_v2', + schedule: { rrule: { freq: 1, interval: 2, tzid: 'UTC' } }, + }); }); test('creates a scheduled report saved object with notification', async () => { diff --git a/x-pack/platform/plugins/private/reporting/server/routes/common/request_handler/schedule_request_handler.ts b/x-pack/platform/plugins/private/reporting/server/routes/common/request_handler/schedule_request_handler.ts index 3ac2a26d285a9..1c96e1d1a3614 100644 --- a/x-pack/platform/plugins/private/reporting/server/routes/common/request_handler/schedule_request_handler.ts +++ b/x-pack/platform/plugins/private/reporting/server/routes/common/request_handler/schedule_request_handler.ts @@ -19,7 +19,10 @@ import { } from '../../../types'; import { SCHEDULED_REPORT_SAVED_OBJECT_TYPE } from '../../../saved_objects'; import { RequestHandler, RequestParams } from './request_handler'; -import { transformRawScheduledReportToReport } from './lib'; +import { + transformRawScheduledReportToReport, + transformRawScheduledReportToTaskParams, +} from './lib'; // Using the limit specified in the cloud email service limits // https://www.elastic.co/docs/explore-analyze/alerts-cases/watcher/enable-watcher#cloud-email-service-limits @@ -108,8 +111,8 @@ export class ScheduleRequestHandler extends RequestHandler< const { exportTypeId, jobParams, schedule, notification } = params; const { reporting, logger, req, user } = this.opts; - const soClient = await reporting.getSoClient(req); - const { version, job, jobType } = await this.createJob(exportTypeId, jobParams); + const soClient = await reporting.getScopedSoClient(req); + const { version, job, jobType, name } = await this.createJob(exportTypeId, jobParams); const payload = { ...job, @@ -147,7 +150,14 @@ export class ScheduleRequestHandler extends RequestHandler< ); logger.debug(`Successfully created scheduled report: ${report.id}`); - // TODO - Schedule the report with Task Manager + // Schedule the report with Task Manager + const task = await reporting.scheduleRecurringTask( + req, + transformRawScheduledReportToTaskParams(report) + ); + logger.info( + `Scheduled "${name}" reporting task. Task ID: task:${task.id}. Report ID: ${report.id}` + ); return transformRawScheduledReportToReport(report); } diff --git a/x-pack/platform/plugins/private/reporting/server/routes/internal/schedule/integration_tests/scheduling_from_jobparams.test.ts b/x-pack/platform/plugins/private/reporting/server/routes/internal/schedule/integration_tests/scheduling_from_jobparams.test.ts index 23f2229090234..675953d2cbbf5 100644 --- a/x-pack/platform/plugins/private/reporting/server/routes/internal/schedule/integration_tests/scheduling_from_jobparams.test.ts +++ b/x-pack/platform/plugins/private/reporting/server/routes/internal/schedule/integration_tests/scheduling_from_jobparams.test.ts @@ -103,7 +103,7 @@ describe(`POST ${INTERNAL_ROUTES.SCHEDULE_PREFIX}`, () => { mockExportTypesRegistry = new ExportTypesRegistry(); mockExportTypesRegistry.register(mockPdfExportType); - soClient = await reportingCore.getSoClient(fakeRawRequest as unknown as KibanaRequest); + soClient = await reportingCore.getScopedSoClient(fakeRawRequest as unknown as KibanaRequest); soClient.create = jest.fn().mockImplementation(async (_, opts) => { return { id: 'foo', diff --git a/x-pack/platform/plugins/private/reporting/server/test_helpers/create_mock_reportingplugin.ts b/x-pack/platform/plugins/private/reporting/server/test_helpers/create_mock_reportingplugin.ts index c24dabf83f369..7780a6492a1db 100644 --- a/x-pack/platform/plugins/private/reporting/server/test_helpers/create_mock_reportingplugin.ts +++ b/x-pack/platform/plugins/private/reporting/server/test_helpers/create_mock_reportingplugin.ts @@ -80,7 +80,10 @@ export const createMockPluginStart = async ( return { analytics: coreSetupMock.analytics, esClient: elasticsearchServiceMock.createClusterClient(), - savedObjects: { getScopedClient: jest.fn().mockReturnValue(savedObjectsClient) }, + savedObjects: { + getScopedClient: jest.fn().mockReturnValue(savedObjectsClient), + createInternalRepository: jest.fn().mockReturnValue(savedObjectsClient), + }, uiSettings: { asScopedToClient: () => ({ get: jest.fn() }) }, discover: discoverPluginMock.createStartContract(), data: dataPluginMock.createStartContract(), diff --git a/x-pack/platform/plugins/private/reporting/tsconfig.json b/x-pack/platform/plugins/private/reporting/tsconfig.json index 5ec0123d9749f..64588202c1a83 100644 --- a/x-pack/platform/plugins/private/reporting/tsconfig.json +++ b/x-pack/platform/plugins/private/reporting/tsconfig.json @@ -54,7 +54,9 @@ "@kbn/core-security-common", "@kbn/core-http-server-utils", "@kbn/core-saved-objects-server", + "@kbn/rrule", "@kbn/notifications-plugin", + "@kbn/spaces-utils", ], "exclude": [ "target/**/*", diff --git a/x-pack/platform/plugins/shared/task_manager/server/constants.ts b/x-pack/platform/plugins/shared/task_manager/server/constants.ts index 86238e44ae642..3fa3ac2489b74 100644 --- a/x-pack/platform/plugins/shared/task_manager/server/constants.ts +++ b/x-pack/platform/plugins/shared/task_manager/server/constants.ts @@ -16,4 +16,5 @@ export const CONCURRENCY_ALLOW_LIST_BY_TASK_TYPE: string[] = [ // task types requiring a concurrency 'report:execute', + 'report:execute-scheduled', ]; diff --git a/x-pack/platform/plugins/shared/task_manager/server/task_type_dictionary.ts b/x-pack/platform/plugins/shared/task_manager/server/task_type_dictionary.ts index cf21ad4c70ff7..84415cc81a0ce 100644 --- a/x-pack/platform/plugins/shared/task_manager/server/task_type_dictionary.ts +++ b/x-pack/platform/plugins/shared/task_manager/server/task_type_dictionary.ts @@ -35,6 +35,9 @@ export const REMOVED_TYPES: string[] = [ export const SHARED_CONCURRENCY_TASKS: string[][] = [ // for testing ['sampleTaskSharedConcurrencyType1', 'sampleTaskSharedConcurrencyType2'], + + // reporting + ['report:execute', 'report:execute-scheduled'], ]; /** diff --git a/x-pack/test/plugin_api_integration/test_suites/task_manager/check_registered_task_types.ts b/x-pack/test/plugin_api_integration/test_suites/task_manager/check_registered_task_types.ts index a1ab089fdf847..777e0e7668326 100644 --- a/x-pack/test/plugin_api_integration/test_suites/task_manager/check_registered_task_types.ts +++ b/x-pack/test/plugin_api_integration/test_suites/task_manager/check_registered_task_types.ts @@ -169,6 +169,7 @@ export default function ({ getService }: FtrProviderContext) { 'osquery:telemetry-packs', 'osquery:telemetry-saved-queries', 'report:execute', + 'report:execute-scheduled', 'risk_engine:risk_scoring', 'search:agentless-connectors-manager', 'security-solution-ea-asset-criticality-ecs-migration', diff --git a/x-pack/test/reporting_api_integration/reporting_and_security/security_roles_privileges.ts b/x-pack/test/reporting_api_integration/reporting_and_security/security_roles_privileges.ts index 5604ff058c34e..f565f97188d17 100644 --- a/x-pack/test/reporting_api_integration/reporting_and_security/security_roles_privileges.ts +++ b/x-pack/test/reporting_api_integration/reporting_and_security/security_roles_privileges.ts @@ -7,6 +7,8 @@ import expect from '@kbn/expect'; import { SerializedSearchSourceFields } from '@kbn/data-plugin/common'; +import { SearchHit } from '@elastic/elasticsearch/lib/api/types'; +import { SerializedConcreteTaskInstance } from '@kbn/task-manager-plugin/server/task'; import { FtrProviderContext } from '../ftr_provider_context'; // eslint-disable-next-line import/no-default-export @@ -14,13 +16,33 @@ export default function ({ getService }: FtrProviderContext) { const reportingAPI = getService('reportingAPI'); const supertest = getService('supertest'); + function testExpectedTask( + id: string, + jobtype: string, + task: SearchHit<{ task: SerializedConcreteTaskInstance }> + ) { + expect(task._source?.task.taskType).to.eql('report:execute-scheduled'); + + const params = JSON.parse(task._source?.task.params ?? ''); + expect(params.id).to.eql(id); + expect(params.jobtype).to.eql(jobtype); + + expect(task._source?.task.apiKey).not.to.be(undefined); + expect(task._source?.task.schedule?.rrule).not.to.be(undefined); + + expect(task._source?.task.schedule?.interval).to.be(undefined); + } describe('Security Roles and Privileges for Applications', () => { + const scheduledReportIds: string[] = []; + const scheduledReportTaskIds: string[] = []; before(async () => { await reportingAPI.initEcommerce(); }); after(async () => { await reportingAPI.teardownEcommerce(); await reportingAPI.deleteAllReports(); + await reportingAPI.deleteScheduledReportSOs(scheduledReportIds); + await reportingAPI.deleteTasks(scheduledReportTaskIds); }); describe('Dashboard: Generate PDF report', () => { @@ -197,6 +219,12 @@ export default function ({ getService }: FtrProviderContext) { const soResult = await reportingAPI.getScheduledReportSO(res.body.job.id); expect(soResult.status).to.eql(200); expect(soResult.body._source.scheduled_report.title).to.eql('test PDF allowed'); + scheduledReportIds.push(res.body.job.id); + + const taskResult = await reportingAPI.getTask(res.body.job.id); + expect(taskResult.status).to.eql(200); + testExpectedTask(res.body.job.id, 'printable_pdf_v2', taskResult.body); + scheduledReportTaskIds.push(res.body.job.id); }); }); @@ -235,6 +263,12 @@ export default function ({ getService }: FtrProviderContext) { const soResult = await reportingAPI.getScheduledReportSO(res.body.job.id); expect(soResult.status).to.eql(200); expect(soResult.body._source.scheduled_report.title).to.eql('test PDF allowed'); + scheduledReportIds.push(res.body.job.id); + + const taskResult = await reportingAPI.getTask(res.body.job.id); + expect(taskResult.status).to.eql(200); + testExpectedTask(res.body.job.id, 'printable_pdf_v2', taskResult.body); + scheduledReportTaskIds.push(res.body.job.id); }); }); @@ -273,6 +307,12 @@ export default function ({ getService }: FtrProviderContext) { const soResult = await reportingAPI.getScheduledReportSO(res.body.job.id); expect(soResult.status).to.eql(200); expect(soResult.body._source.scheduled_report.title).to.eql('test PDF allowed'); + scheduledReportIds.push(res.body.job.id); + + const taskResult = await reportingAPI.getTask(res.body.job.id); + expect(taskResult.status).to.eql(200); + testExpectedTask(res.body.job.id, 'printable_pdf_v2', taskResult.body); + scheduledReportTaskIds.push(res.body.job.id); }); }); @@ -314,6 +354,12 @@ export default function ({ getService }: FtrProviderContext) { const soResult = await reportingAPI.getScheduledReportSO(res.body.job.id); expect(soResult.status).to.eql(200); expect(soResult.body._source.scheduled_report.title).to.eql('allowed search'); + scheduledReportIds.push(res.body.job.id); + + const taskResult = await reportingAPI.getTask(res.body.job.id); + expect(taskResult.status).to.eql(200); + testExpectedTask(res.body.job.id, 'csv_searchsource', taskResult.body); + scheduledReportTaskIds.push(res.body.job.id); }); }); diff --git a/x-pack/test/reporting_api_integration/services/scenarios.ts b/x-pack/test/reporting_api_integration/services/scenarios.ts index adcd3151aef20..aba6b22b9086b 100644 --- a/x-pack/test/reporting_api_integration/services/scenarios.ts +++ b/x-pack/test/reporting_api_integration/services/scenarios.ts @@ -311,6 +311,22 @@ export function createScenarios({ getService }: Pick { + return await Promise.all( + ids.map((id) => esSupertest.delete(`/${ALERTING_CASES_SAVED_OBJECT_INDEX}/_doc/${id}`)) + ); + }; + + const getTask = async (taskId: string) => { + return await esSupertest.get(`/.kibana_task_manager/_doc/task:${taskId}`); + }; + + const deleteTasks = async (ids: string[]) => { + return await Promise.all( + ids.map((id) => esSupertest.delete(`/.kibana_task_manager/_doc/task:${id}`)) + ); + }; + return { logTaskManagerHealth, initEcommerce, @@ -341,5 +357,8 @@ export function createScenarios({ getService }: Pick