Skip to content

Commit

Permalink
fix: improve SIGINT handling for queued jobs
Browse files Browse the repository at this point in the history
  • Loading branch information
rafaelcr committed Jan 25, 2023
1 parent 9b26439 commit e16fcd5
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 7 deletions.
16 changes: 10 additions & 6 deletions src/token-processor/queue/job-queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ export class JobQueue {
* begin.
*/
start() {
console.log(`JobQueue starting queue...`);
logger.info(`JobQueue starting queue...`);
this.isRunning = true;
this.queue.start();
void this.runQueueLoop();
Expand All @@ -65,7 +65,7 @@ export class JobQueue {
* Shuts down the queue and waits for its current work to be complete.
*/
async close() {
console.log(`JobQueue closing, waiting on ${this.queue.pending} jobs to finish...`);
logger.info(`JobQueue closing, waiting on ${this.queue.pending} pending jobs...`);
this.isRunning = false;
await this.queue.onIdle();
this.queue.pause();
Expand All @@ -88,10 +88,14 @@ export class JobQueue {
this.jobIds.add(job.id);
await this.db.updateJobStatus({ id: job.id, status: DbJobStatus.queued });
void this.queue.add(async () => {
if (job.token_id) {
await new ProcessTokenJob({ db: this.db, job: job }).work();
} else if (job.smart_contract_id) {
await new ProcessSmartContractJob({ db: this.db, apiDb: this.apiDb, job: job }).work();
if (this.isRunning) {
if (job.token_id) {
await new ProcessTokenJob({ db: this.db, job: job }).work();
} else if (job.smart_contract_id) {
await new ProcessSmartContractJob({ db: this.db, apiDb: this.apiDb, job: job }).work();
}
} else {
logger.info(`JobQueue cancelling job ${job.id}, queue is now closed`);
}
this.jobIds.delete(job.id);
});
Expand Down
4 changes: 3 additions & 1 deletion src/token-processor/queue/job/job.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,9 @@ export abstract class Job {
retries <= ENV.JOB_QUEUE_MAX_RETRIES
) {
logger.info(
`Job ${this.description()} recoverable error, trying again later: ${error.message}`
`Job ${this.description()} recoverable error after ${sw.getElapsed()}ms, trying again later: ${
error.message
}`
);
await this.db.updateJobStatus({ id: this.job.id, status: DbJobStatus.pending });
} else {
Expand Down

0 comments on commit e16fcd5

Please sign in to comment.