[Updater] Use BullMQ to schedule student updates
valtterikantanen committed Feb 26, 2025
1 parent 74bfcea commit d016cec
Showing 3 changed files with 28 additions and 23 deletions.
10 changes: 3 additions & 7 deletions updater/sis-updater-scheduler/src/scheduler.js
@@ -6,7 +6,6 @@ const { chunk } = require('lodash')
 const {
   NATS_GROUP,
   SIS_UPDATER_SCHEDULE_CHANNEL,
-  SIS_MISC_SCHEDULE_CHANNEL,
   CHUNK_SIZE,
   isDev,
   DEV_SCHEDULE_COUNT,
@@ -19,6 +18,7 @@ const {
 } = require('./config')
 const { knexConnection } = require('./db/connection')
 const { startPrePurge, startPurge } = require('./purge')
+const { queue } = require('./queue')
 const { logger } = require('./utils/logger')
 const { redisClient } = require('./utils/redis')
 const { stan, opts } = require('./utils/stan')
@@ -213,12 +213,8 @@ const scheduleByStudentNumbers = async studentNumbers => {
   logger.info('Scheduling by student numbers')
   const { knex } = knexConnection
   const personsToUpdate = await knex('persons').column('id', 'student_number').whereIn('student_number', studentNumbers)
-
-  await eachLimit(
-    chunk(personsToUpdate, CHUNK_SIZE),
-    10,
-    async s => await createJobs(s, 'students', SIS_MISC_SCHEDULE_CHANNEL)
-  )
+  const personChunks = chunk(personsToUpdate, CHUNK_SIZE)
+  await queue.addBulk(personChunks.map(personsToUpdate => ({ name: 'students', data: personsToUpdate })))
 }
 
 const scheduleByCourseCodes = async courseCodes => {
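
Note: the ./queue module required above is not included in this diff. A minimal sketch of what it plausibly contains, assuming BullMQ's Queue with a hypothetical queue name and Redis connection settings read from the environment (both are illustrative, not taken from the repository):

const { Queue } = require('bullmq')

// Hypothetical queue name and connection settings; the real module may instead reuse
// the scheduler's existing Redis/config utilities.
const queue = new Queue('sis-updater', {
  connection: {
    host: process.env.REDIS_HOST || 'localhost',
    port: Number(process.env.REDIS_PORT) || 6379,
  },
})

module.exports = { queue }

With a queue like this, the queue.addBulk(...) call above enqueues one 'students' job per chunk of persons, replacing the previous createJobs(..., SIS_MISC_SCHEDULE_CHANNEL) publishes to NATS.
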
16 changes: 1 addition & 15 deletions updater/sis-updater-worker/src/index.js
@@ -1,7 +1,6 @@
 const {
   SIS_UPDATER_SCHEDULE_CHANNEL,
   SIS_PURGE_CHANNEL,
-  SIS_MISC_SCHEDULE_CHANNEL,
   NATS_GROUP,
   REDIS_TOTAL_META_KEY,
   REDIS_TOTAL_STUDENTS_KEY,
@@ -12,7 +11,7 @@ const {
 const { dbConnections } = require('./db/connection')
 const { postUpdate } = require('./postUpdate')
 const { update } = require('./updater')
-const { purge, prePurge, purgeByStudentNumber } = require('./updater/purge')
+const { purge, prePurge } = require('./updater/purge')
 const { loadMapsOnDemand } = require('./updater/shared')
 const { logger } = require('./utils/logger')
 const { redisClient } = require('./utils/redis')
@@ -111,13 +110,6 @@ const purgeMsgHandler = async purgeMsg => {
   if (purgeMsg.action === 'PREPURGE_START') await prePurge(purgeMsg)
 }
 
-const miscMsgHandler = async miscMessage => {
-  const studentNumbers = miscMessage.entityIds.map(s => s.student_number)
-  const msgInUpdateFormat = { ...miscMessage, entityIds: miscMessage.entityIds.map(s => s.id) }
-  await purgeByStudentNumber(studentNumbers)
-  await updateMsgHandler(msgInUpdateFormat)
-}
-
 stan.on('error', error => {
   logger.error({ message: 'NATS connection failed', meta: error })
   if (!process.env.CI) process.exit(1)
@@ -150,10 +142,4 @@ dbConnections.on('connect', async () => {
 
   const infoChannel = stan.subscribe('SIS_INFO_CHANNEL', NATS_GROUP, opts)
   infoChannel.on('message', handleMessage(handleInfoMessage))
-
-  const miscChannel = stan.subscribe(SIS_MISC_SCHEDULE_CHANNEL, NATS_GROUP, opts)
-  miscChannel.on('message', handleMessage(miscMsgHandler))
-  miscChannel.on('error', error => {
-    logger.error({ message: 'Misc channel error', meta: error.stack })
-  })
 })
25 changes: 24 additions & 1 deletion updater/sis-updater-worker/src/processor.js
@@ -1 +1,24 @@
-module.exports = async job => {}
+const { postUpdate } = require('./postUpdate')
+const { update } = require('./updater')
+const { purgeByStudentNumber } = require('./updater/purge')
+
+const updateMsgHandler = async updateMsg => {
+  // TODO: Remove the following line after postUpdate is implemented correctly (worker.js calculates the processing time more accurately)
+  const startTime = new Date()
+  await update(updateMsg)
+  await postUpdate(updateMsg, startTime)
+}
+
+module.exports = async job => {
+  switch (job.name) {
+    case 'students': {
+      const studentNumbers = job.data.map(student => student.student_number)
+      const msgInUpdateFormat = { type: job.name, entityIds: job.data.map(student => student.id) }
+      await purgeByStudentNumber(studentNumbers)
+      await updateMsgHandler(msgInUpdateFormat)
+      break
+    }
+    default:
+      throw new Error(`Unknown job type: ${job.name}`)
+  }
+}
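
Note: the exported processor is presumably registered by a BullMQ Worker in the worker.js referred to in the TODO comment, which is not shown in this commit. A hedged sketch of that wiring, reusing the hypothetical queue name and connection settings from the scheduler-side sketch above:

const { Worker } = require('bullmq')

const processor = require('./processor')
const { logger } = require('./utils/logger')

// Assumed queue name and connection; these must match whatever the scheduler's queue module uses.
const worker = new Worker('sis-updater', processor, {
  connection: {
    host: process.env.REDIS_HOST || 'localhost',
    port: Number(process.env.REDIS_PORT) || 6379,
  },
})

// Basic observability hooks; the real worker.js likely also records processing time, per the TODO above.
worker.on('completed', job => logger.info(`Job ${job.id} (${job.name}) completed`))
worker.on('failed', (job, error) => logger.error({ message: `Job ${job?.id} failed`, meta: error.stack }))
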
