Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 19 additions & 10 deletions x-pack/plugins/reporting/server/lib/tasks/monitor_reports.ts
Original file line number Diff line number Diff line change
Expand Up @@ -92,31 +92,42 @@ export class MonitorReportsTask implements ReportingTask {
return;
}

const {
_id: jobId,
_source: { process_expiration: processExpiration, status },
} = recoveredJob;
const report = new SavedReport({ ...recoveredJob, ...recoveredJob._source });
const { _id: jobId, process_expiration: processExpiration, status } = report;
const eventLog = this.reporting.getEventLogger(report);

if (![statuses.JOB_STATUS_PENDING, statuses.JOB_STATUS_PROCESSING].includes(status)) {
throw new Error(`Invalid job status in the monitoring search result: ${status}`); // only pending or processing jobs possibility need rescheduling
const invalidStatusError = new Error(
`Invalid job status in the monitoring search result: ${status}`
); // only pending or processing jobs possibility need rescheduling
this.logger.error(invalidStatusError);
eventLog.logError(invalidStatusError);

// fatal: can not reschedule the job
throw invalidStatusError;
}

if (status === statuses.JOB_STATUS_PENDING) {
this.logger.info(
const migratingJobError = new Error(
`${jobId} was scheduled in a previous version and left in [${status}] status. Rescheduling...`
);
this.logger.error(migratingJobError);
eventLog.logError(migratingJobError);
}

if (status === statuses.JOB_STATUS_PROCESSING) {
const expirationTime = moment(processExpiration);
const overdueValue = moment().valueOf() - expirationTime.valueOf();
this.logger.info(
const overdueExpirationError = new Error(
`${jobId} status is [${status}] and the expiration time was [${overdueValue}ms] ago. Rescheduling...`
);
this.logger.error(overdueExpirationError);
eventLog.logError(overdueExpirationError);
}

eventLog.logRetry();

// clear process expiration and set status to pending
const report = new SavedReport({ ...recoveredJob, ...recoveredJob._source });
await reportingStore.prepareReportForRetry(report); // if there is a version conflict response, this just throws and logs an error

// clear process expiration and reschedule
Expand Down Expand Up @@ -154,8 +165,6 @@ export class MonitorReportsTask implements ReportingTask {

const newTask = await this.reporting.scheduleTask(task);

this.reporting.getEventLogger({ _id: task.id, ...task }, newTask).logRetry();

return newTask;
}

Expand Down