Skip to content

Commit

Permalink
Notify Job Failure (#69)
Browse files Browse the repository at this point in the history
* Notify job failure through RPC

* bump version to 1.6.0
  • Loading branch information
EverettSummer authored Nov 2, 2023
1 parent 97a434c commit ecfd793
Show file tree
Hide file tree
Showing 8 changed files with 100 additions and 8 deletions.
5 changes: 4 additions & 1 deletion config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -62,4 +62,7 @@ jobExpireTime:
Finished: 2

# fonts directory that can be used by ffmpeg
fontsDir: ${project_root}/fonts
fontsDir: ${project_root}/fonts

# will deprecate once Albireo is deprecated
albireoRPC: 'http://localhost:8080'
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "mira-video-manager",
"version": "1.5.5",
"version": "1.6.0",
"description": "Video Process for mira project",
"main": "index.js",
"scripts": {
Expand Down
22 changes: 18 additions & 4 deletions src/JobExecutor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,14 @@

import { ConfigManager } from "./utils/ConfigManager";
import { inject, injectable, interfaces } from 'inversify';
import { EXEC_MODE_META, META_JOB_KEY, META_JOB_QUEUE, NORMAL_JOB_KEY, TYPES_VM } from './TYPES';
import {
EXEC_MODE_META,
META_JOB_KEY,
META_JOB_QUEUE,
NORMAL_JOB_KEY,
TYPES_VM,
VIDEO_JOB_RESULT_KEY
} from './TYPES';
import { JobMessage } from './domains/JobMessage';
import { DatabaseService } from './services/DatabaseService';
import { mkdir, stat } from 'fs/promises';
Expand Down Expand Up @@ -45,7 +52,7 @@ import { randomUUID } from 'crypto';
import { getStdLogger } from './utils/Logger';
import { JobCleaner } from './JobManager/JobCleaner';
import { JobType } from './domains/JobType';
import { VideoOutputMetadata } from './domains/VideoOutputMetadata';
import { JobFailureMessage } from './domains/JobFailureMessage';

const logger = getStdLogger();

Expand Down Expand Up @@ -90,6 +97,7 @@ export class JobExecutor implements JobApplication {
await this._jobCleaner.start(this.id);

await this._rabbitmqService.initPublisher(VIDEO_MANAGER_EXCHANGE, 'direct', VIDEO_MANAGER_GENERAL);
await this._rabbitmqService.initPublisher(VIDEO_MANAGER_EXCHANGE, 'direct', VIDEO_JOB_RESULT_KEY);
await this._rabbitmqService.initConsumer(VIDEO_MANAGER_EXCHANGE, 'direct', COMMAND_QUEUE, VIDEO_MANAGER_COMMAND);
if (this.execMode === EXEC_MODE_META) {
await this._rabbitmqService.initConsumer(JOB_EXCHANGE, 'direct', META_JOB_QUEUE, META_JOB_KEY, true);
Expand Down Expand Up @@ -232,10 +240,10 @@ export class JobExecutor implements JobApplication {
}
});

this.currentJM.events.on(JobManager.EVENT_JOB_FAILED, async (failedJob: Job) => {
// TODO: notify failed
this.currentJM.events.on(JobManager.EVENT_JOB_FAILED, async (jobId: string) => {
try {
await this.finalizeJM();
await this.sendJobFailureMessage(jobId);
} catch (err) {
logger.error(err);
this._sentry.capture(err);
Expand Down Expand Up @@ -294,6 +302,12 @@ export class JobExecutor implements JobApplication {
}
}

private async sendJobFailureMessage(jobId: string) {
const msg = new JobFailureMessage();
msg.jobId = jobId;
await this._rabbitmqService.publish(VIDEO_MANAGER_EXCHANGE, VIDEO_JOB_RESULT_KEY, msg);
}

/**
* @deprecated
* @param job
Expand Down
41 changes: 40 additions & 1 deletion src/JobScheduler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,11 @@ import {
} from '@irohalab/mira-shared';
import { randomUUID } from 'crypto';
import { getStdLogger } from './utils/Logger';
import { META_JOB_KEY, META_JOB_QUEUE, NORMAL_JOB_KEY } from './TYPES';
import { META_JOB_KEY, META_JOB_QUEUE, NORMAL_JOB_KEY, VIDEO_JOB_RESULT_KEY, VIDEO_JOB_RESULT_QUEUE } from './TYPES';
import { JobType } from './domains/JobType';
import { ValidateAction } from './domains/ValidateAction';
import axios from 'axios';
import { JobFailureMessage } from './domains/JobFailureMessage';

const JOB_STATUS_CHECK_INTERVAL = 15 * 60 * 1000;
const sleep = promisify(setTimeout);
Expand All @@ -53,6 +55,7 @@ const logger = getStdLogger();
@injectable()
export class JobScheduler implements JobApplication {
private _downloadMessageConsumeTag: string;
private _videoManagerJobMessageConsumeTag: string;
private _commandMessageConsumeTag: string;
private _jobMessageConsumeTag: string;
private _metaJobMessageConsumeTag: string;
Expand All @@ -70,6 +73,7 @@ export class JobScheduler implements JobApplication {
await this._rabbitmqService.initPublisher(JOB_EXCHANGE, 'direct', META_JOB_KEY);
await this._rabbitmqService.initPublisher(VIDEO_MANAGER_EXCHANGE, 'direct', VIDEO_MANAGER_COMMAND);
await this._rabbitmqService.initConsumer(VIDEO_MANAGER_EXCHANGE, 'direct', COMMAND_QUEUE, VIDEO_MANAGER_COMMAND, true);
await this._rabbitmqService.initConsumer(VIDEO_MANAGER_EXCHANGE, 'direct', VIDEO_JOB_RESULT_QUEUE, VIDEO_JOB_RESULT_KEY, true);
await this._rabbitmqService.initConsumer(JOB_EXCHANGE, 'direct', JOB_QUEUE, NORMAL_JOB_KEY, true);
await this._rabbitmqService.initConsumer(DOWNLOAD_MESSAGE_EXCHANGE, 'direct', DOWNLOAD_MESSAGE_QUEUE);
await this._rabbitmqService.initConsumer(JOB_EXCHANGE, 'direct', META_JOB_QUEUE, META_JOB_KEY, true);
Expand All @@ -83,6 +87,17 @@ export class JobScheduler implements JobApplication {
return false;
}
});

this._videoManagerJobMessageConsumeTag = await this._rabbitmqService.consume(VIDEO_JOB_RESULT_QUEUE, async (msg) => {
try {
await this.sendJobFailureNotification(msg as JobFailureMessage);
} catch (ex) {
logger.error(ex);
this._sentry.capture(ex);
}
return true;
});

this._commandMessageConsumeTag = await this._rabbitmqService.consume(COMMAND_QUEUE, async (msg) => {
try {
return await this.onCommandMessage(msg as CommandMessage);
Expand Down Expand Up @@ -273,4 +288,28 @@ export class JobScheduler implements JobApplication {
await this._rabbitmqService.publish(JOB_EXCHANGE, NORMAL_JOB_KEY, jobMessage);
}
}

private async sendJobFailureNotification(msg: JobFailureMessage): Promise<void> {
const job = await this._databaseService.getJobRepository().findOne({id: msg.jobId});
if (job) {
await this.callAlbireoRpc(job);
logger.info('sent notification for failed job ' + msg.jobId);
} else {
throw new Error('no job found for failed job message, job id is ' + msg.jobId);
}
}

private async callAlbireoRpc(job: Job): Promise<void> {
const rpcUrl = this._configManager.albireoRPCUrl();
await axios.post(`${rpcUrl}/video_job_failed`, {
job: {
id: job.id,
video_id: job.jobMessage.videoId,
bangumi_id: job.jobMessage.bangumiId,
jobType: job.jobMessage.jobType,
startTime: job.startTime.toISOString(),
endTIme: (job.finishedTime ?? new Date()).toISOString()
}
});
}
}
4 changes: 3 additions & 1 deletion src/TYPES.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,9 @@ export const EXEC_MODE_META = 'META_MODE';

// job queues:
export const META_JOB_QUEUE = 'meta_job_queue';
export const VIDEO_JOB_RESULT_QUEUE = 'video_job_result_queue';

// routing key
export const NORMAL_JOB_KEY = 'normal_job';
export const META_JOB_KEY = 'meta_job';
export const META_JOB_KEY = 'meta_job';
export const VIDEO_JOB_RESULT_KEY = 'video_job_result_key';
24 changes: 24 additions & 0 deletions src/domains/JobFailureMessage.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
* Copyright 2023 IROHA LAB
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import { MQMessage } from '@irohalab/mira-shared';
import { nanoid } from 'nanoid';

export class JobFailureMessage implements MQMessage {
public id: string = nanoid(10);
public version: string = '1';
public jobId: string;
}
5 changes: 5 additions & 0 deletions src/utils/ConfigManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -70,4 +70,9 @@ export interface ConfigManager extends BaseConfigManager {
* Get fonts directory
*/
fontsDir(): string;

/**
* Temp solution to communicate with Albireo
*/
albireoRPCUrl(): string;
}
5 changes: 5 additions & 0 deletions src/utils/ConfigManagerImpl.ts
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ type AppConfg = {
jobLogPath: string;
jobExpireTime: {Canceled: number, UnrecoverableError: number, Finished: number};
fontsDir: string;
albireoRPC: string;
};

const CWD_PATTERN = /\${cwd}/;
Expand Down Expand Up @@ -233,4 +234,8 @@ export class ConfigManagerImpl implements ConfigManager {
}
return null;
}

public albireoRPCUrl(): string {
return this._config.albireoRPC;
}
}

0 comments on commit ecfd793

Please sign in to comment.