Skip to content

Commit

Permalink
job ergonmics
Browse files Browse the repository at this point in the history
  • Loading branch information
evantahler committed Oct 14, 2024
1 parent 0656a25 commit ad3b12a
Showing 1 changed file with 18 additions and 14 deletions.
32 changes: 18 additions & 14 deletions backend/initializers/resque.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import {
type Job,
Worker,
} from "node-resque";
import { randomUUID } from "crypto";
import { Initializer } from "../classes/Initializer";
import { TypedError } from "../classes/TypedError";

Expand All @@ -26,12 +25,14 @@ declare module "../classes/API" {
}
}

let SERVER_JOB_COUNTER = 1;

export class Resque extends Initializer {
constructor() {
super(namespace);

this.loadPriority = 250;
this.startPriority = 150;
this.startPriority = 10000;
this.stopPriority = 900;
}

Expand Down Expand Up @@ -108,52 +109,52 @@ export class Resque extends Initializer {
? config.tasks.queues
: await config.tasks.queues(),
timeout: config.tasks.timeout,
name: `worker-${id}`,
name: `worker:${id}`,
},
api.resque.jobs,
);

// normal worker emitters
worker.on("start", () => {
logger.info(`[resque:worker:${id}] started`);
logger.info(`[resque:${worker.name}] started`);
});
worker.on("end", () => {
logger.info(`[resque:worker:${id}] ended`);
logger.info(`[resque:${worker.name}] ended`);
});
worker.on("cleaning_worker", () => {
logger.debug(`[resque:worker:${id}] cleaning worker`);
logger.debug(`[resque:${worker.name}] cleaning worker`);
});
worker.on("poll", (queue) => {
logger.debug(`[resque:worker:${id}] polling, ${queue}`);
logger.debug(`[resque:${worker.name}] polling, ${queue}`);
});
worker.on("job", (queue, job: ParsedJob) => {
logger.debug(
`[resque:worker:${id}] job acquired, ${queue}, ${job.class}, ${JSON.stringify(job.args[0])}`,
`[resque:${worker.name}] job acquired, ${queue}, ${job.class}, ${JSON.stringify(job.args[0])}`,
);
});
worker.on("reEnqueue", (queue, job: ParsedJob, plugin) => {
logger.debug(
`[resque:worker:${id}] job reEnqueue, ${queue}, ${job.class}, ${JSON.stringify(job.args[0])}`,
`[resque:${worker.name}] job reEnqueue, ${queue}, ${job.class}, ${JSON.stringify(job.args[0])}`,
);
});
worker.on("pause", () => {
logger.debug(`[resque:worker:${id}] paused`);
logger.debug(`[resque:${worker.name}] paused`);
});

worker.on("failure", (queue, job, failure, duration) => {
logger.warn(
`[resque:worker:${id}] job failed, ${queue}, ${job.class}, ${JSON.stringify(job?.args[0] ?? {})}: ${failure} (${duration}ms)`,
`[resque:${worker.name}] job failed, ${queue}, ${job.class}, ${JSON.stringify(job?.args[0] ?? {})}: ${failure} (${duration}ms)`,
);
});
worker.on("error", (error, queue, job) => {
logger.warn(
`[resque:worker:${id}] job error, ${queue}, ${job?.class}, ${JSON.stringify(job?.args[0] ?? {})}: ${error}`,
`[resque:${worker.name}] job error, ${queue}, ${job?.class}, ${JSON.stringify(job?.args[0] ?? {})}: ${error}`,
);
});

worker.on("success", (queue, job: ParsedJob, result, duration) => {
logger.info(
`[resque:worker:${id}] job success ${queue}, ${job.class}, ${JSON.stringify(job.args[0])} | ${JSON.stringify(result)} (${duration}ms)`,
`[resque:${worker.name}] job success ${queue}, ${job.class}, ${JSON.stringify(job.args[0])} | ${JSON.stringify(result)} (${duration}ms)`,
);
});

Expand Down Expand Up @@ -193,7 +194,10 @@ export class Resque extends Initializer {
pluginOptions: {},

perform: async function (params: ActionParams<typeof action>) {
const connection = new Connection("resque", `job:${randomUUID()}}`);
const connection = new Connection(
"resque",
`job:${api.process.name}:${SERVER_JOB_COUNTER++}}`,
);
const paramsAsFormData = new FormData();

if (typeof params.entries === "function") {
Expand Down

0 comments on commit ad3b12a

Please sign in to comment.