Skip to content

Commit 0656a25

Browse files
committed
mange workers directly
1 parent 0f2bf71 commit 0656a25

File tree

3 files changed

+72
-85
lines changed

3 files changed

+72
-85
lines changed

backend/.env.example

+1
Original file line numberDiff line numberDiff line change
@@ -24,3 +24,4 @@ REDIS_URL="redis://localhost:6379/0"
2424
REDIS_URL_TEST="redis://localhost:6379/1"
2525

2626
TASKS_ENABLED=true
27+
TASK_PROCESSORS=1

backend/config/tasks.ts

+2-5
Original file line numberDiff line numberDiff line change
@@ -7,11 +7,8 @@ export const configTasks = {
77
// queues: async () => { return ["queueA", "queueB"]; } as string[] | (() => Promise<string[]>)>,
88
// how long to sleep between jobs / scheduler checks
99
timeout: 5000,
10-
// at minimum, how many parallel taskProcessors should this node spawn?
11-
// (have number > 0 to enable, and < 1 to disable)
12-
minTaskProcessors: 1,
13-
// at maximum, how many parallel taskProcessors should this node spawn?
14-
maxTaskProcessors: 5,
10+
// how many parallel workers we run?
11+
taskProcessors: await loadFromEnvIfSet("TASK_PROCESSORS", 1),
1512
// how often should we check the event loop to spawn more taskProcessors?
1613
checkTimeout: 500,
1714
// how many ms would constitute an event loop delay to halt taskProcessors spawning?

backend/initializers/resque.ts

+69-80
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,9 @@ import {
1010
import {
1111
Queue,
1212
Scheduler,
13-
MultiWorker,
1413
type ParsedJob,
1514
type Job,
15+
Worker,
1616
} from "node-resque";
1717
import { randomUUID } from "crypto";
1818
import { Initializer } from "../classes/Initializer";
@@ -97,86 +97,79 @@ export class Resque extends Initializer {
9797
if (api.resque.scheduler) return api.resque.scheduler.end();
9898
};
9999

100-
startMultiWorker = async () => {
101-
api.resque.multiWorker = new MultiWorker(
102-
{
103-
connection: { redis: api.redis.redis },
104-
queues: Array.isArray(config.tasks.queues)
105-
? config.tasks.queues
106-
: await config.tasks.queues(),
107-
timeout: config.tasks.timeout,
108-
checkTimeout: config.tasks.checkTimeout,
109-
minTaskProcessors: config.tasks.minTaskProcessors,
110-
maxTaskProcessors: config.tasks.maxTaskProcessors,
111-
maxEventLoopDelay: config.tasks.maxEventLoopDelay,
112-
},
113-
api.resque.jobs,
114-
);
115-
116-
// normal worker emitters
117-
api.resque.multiWorker.on("start", (workerId) => {
118-
logger.info(`[resque:worker] started, ${workerId}`);
119-
});
120-
api.resque.multiWorker.on("end", (workerId) => {
121-
logger.info(`[resque:worker] ended, ${workerId}`);
122-
});
123-
api.resque.multiWorker.on("cleaning_worker", (workerId, worker, pid) => {
124-
logger.debug(
125-
`[resque:worker] cleaning worker, ${workerId}, ${worker}, ${pid}`,
126-
);
127-
});
128-
api.resque.multiWorker.on("poll", (workerId, queue) => {
129-
logger.debug(`[resque:worker] polling, ${workerId}, ${queue}`);
130-
});
131-
api.resque.multiWorker.on("job", (workerId, queue, job: ParsedJob) => {
132-
logger.debug(
133-
`[resque:worker] job acquired, ${workerId}, ${queue}, ${job.class}, ${JSON.stringify(job.args[0])}`,
100+
startWorkers = async () => {
101+
let id = 0;
102+
103+
while (id < config.tasks.taskProcessors) {
104+
const worker = new Worker(
105+
{
106+
connection: { redis: api.redis.redis },
107+
queues: Array.isArray(config.tasks.queues)
108+
? config.tasks.queues
109+
: await config.tasks.queues(),
110+
timeout: config.tasks.timeout,
111+
name: `worker-${id}`,
112+
},
113+
api.resque.jobs,
134114
);
135-
});
136-
api.resque.multiWorker.on(
137-
"reEnqueue",
138-
(workerId, queue, job: ParsedJob, plugin) => {
115+
116+
// normal worker emitters
117+
worker.on("start", () => {
118+
logger.info(`[resque:worker:${id}] started`);
119+
});
120+
worker.on("end", () => {
121+
logger.info(`[resque:worker:${id}] ended`);
122+
});
123+
worker.on("cleaning_worker", () => {
124+
logger.debug(`[resque:worker:${id}] cleaning worker`);
125+
});
126+
worker.on("poll", (queue) => {
127+
logger.debug(`[resque:worker:${id}] polling, ${queue}`);
128+
});
129+
worker.on("job", (queue, job: ParsedJob) => {
139130
logger.debug(
140-
`[resque:worker] job reEnqueue, ${workerId}, ${queue}, ${job.class}, ${JSON.stringify(job.args[0])}`,
131+
`[resque:worker:${id}] job acquired, ${queue}, ${job.class}, ${JSON.stringify(job.args[0])}`,
141132
);
142-
},
143-
);
144-
api.resque.multiWorker.on("pause", (workerId) => {
145-
logger.debug(`[resque:worker] paused, ${workerId}`);
146-
});
133+
});
134+
worker.on("reEnqueue", (queue, job: ParsedJob, plugin) => {
135+
logger.debug(
136+
`[resque:worker:${id}] job reEnqueue, ${queue}, ${job.class}, ${JSON.stringify(job.args[0])}`,
137+
);
138+
});
139+
worker.on("pause", () => {
140+
logger.debug(`[resque:worker:${id}] paused`);
141+
});
147142

148-
api.resque.multiWorker.on("failure", (workerId, queue, job, failure) => {
149-
logger.warn(
150-
`[resque:worker] job failed, ${workerId}, ${queue}, ${job.class}, ${JSON.stringify(job?.args[0] ?? {})}: ${failure}`,
151-
);
152-
});
153-
api.resque.multiWorker.on("error", (error, workerId, queue, job) => {
154-
logger.warn(
155-
`[resque:worker] job error, ${workerId}, ${queue}, ${job?.class}, ${JSON.stringify(job?.args[0] ?? {})}: ${error}`,
156-
);
157-
});
143+
worker.on("failure", (queue, job, failure, duration) => {
144+
logger.warn(
145+
`[resque:worker:${id}] job failed, ${queue}, ${job.class}, ${JSON.stringify(job?.args[0] ?? {})}: ${failure} (${duration}ms)`,
146+
);
147+
});
148+
worker.on("error", (error, queue, job) => {
149+
logger.warn(
150+
`[resque:worker:${id}] job error, ${queue}, ${job?.class}, ${JSON.stringify(job?.args[0] ?? {})}: ${error}`,
151+
);
152+
});
158153

159-
api.resque.multiWorker.on(
160-
"success",
161-
(workerId, queue, job: ParsedJob, result, duration) => {
154+
worker.on("success", (queue, job: ParsedJob, result, duration) => {
162155
logger.info(
163-
`[resque:worker] job success, ${workerId}, ${queue}, ${job.class}, ${JSON.stringify(job.args[0])} | ${JSON.stringify(result)} (${duration}ms)`,
156+
`[resque:worker:${id}] job success ${queue}, ${job.class}, ${JSON.stringify(job.args[0])} | ${JSON.stringify(result)} (${duration}ms)`,
164157
);
165-
},
166-
);
158+
});
167159

168-
api.resque.multiWorker.on("multiWorkerAction", (verb, delay) => {
169-
logger.debug(`[resque:worker] multiworker ${verb}, ${delay}`);
170-
});
160+
api.resque.workers.push(worker);
161+
id++;
162+
}
171163

172-
if (config.tasks.minTaskProcessors > 0) {
173-
api.resque.multiWorker.start();
164+
for (const worker of api.resque.workers) {
165+
await worker.connect();
166+
await worker.start();
174167
}
175168
};
176169

177-
stopMultiWorker = async () => {
178-
if (api.resque.multiWorker && config.tasks.minTaskProcessors > 0) {
179-
return api.resque.multiWorker.stop();
170+
stopWorkers = async () => {
171+
for (const worker of api.resque.workers) {
172+
await worker.end();
180173
}
181174
};
182175

@@ -241,37 +234,33 @@ export class Resque extends Initializer {
241234
};
242235

243236
async initialize() {
244-
const resqueContainer = { jobs: await this.loadJobs() } as {
237+
const resqueContainer = {
238+
jobs: await this.loadJobs(),
239+
workers: [] as Worker[],
240+
} as {
245241
queue: Queue;
246242
scheduler: Scheduler;
247-
multiWorker: MultiWorker;
243+
workers: Worker[];
248244
jobs: Awaited<ReturnType<Resque["loadJobs"]>>;
249245
};
250246

251247
return resqueContainer;
252248
}
253249

254250
async start() {
255-
if (
256-
config.tasks.minTaskProcessors === 0 &&
257-
config.tasks.maxTaskProcessors > 0
258-
) {
259-
config.tasks.minTaskProcessors = 1;
260-
}
261-
262251
await this.startQueue();
263252

264253
if (api.runMode === RUN_MODE.SERVER) {
265254
await this.startScheduler();
266-
await this.startMultiWorker();
255+
await this.startWorkers();
267256
}
268257
}
269258

270259
async stop() {
271260
await this.stopQueue();
272261

273262
if (api.runMode === RUN_MODE.SERVER) {
274-
await this.stopMultiWorker();
263+
await this.stopWorkers();
275264
await this.stopScheduler();
276265
}
277266
}

0 commit comments

Comments
 (0)