diff --git a/src/modules/licence-import/jobs/import-licence.js b/src/modules/licence-import/jobs/import-licence.js index 8c6ec859..4ebe3458 100644 --- a/src/modules/licence-import/jobs/import-licence.js +++ b/src/modules/licence-import/jobs/import-licence.js @@ -2,6 +2,7 @@ const extract = require('../extract') const load = require('../load') +const ImportPointsJob = require('./import-points.js') const transform = require('../transform') const JOB_NAME = 'licence-import.import-licence' @@ -62,9 +63,19 @@ async function handler (job) { // Load licence to DB await load.licence.loadLicence(mapped) + } catch (error) { + global.GlobalNotifier.omfg(`${JOB_NAME}: errored`, error) + throw error + } +} + +async function onComplete (messageQueue, job) { + try { + const { data } = job.data.request - if (job.data.jobNumber === job.data.numberOfJobs) { - global.GlobalNotifier.omg(`${JOB_NAME}: finished`, { numberOfJobs: job.data.numberOfJobs }) + if (data.jobNumber === data.numberOfJobs) { + await messageQueue.publish(ImportPointsJob.createMessage()) + global.GlobalNotifier.omg(`${JOB_NAME}: finished`, { numberOfJobs: job.data.request.data.numberOfJobs }) } } catch (error) { global.GlobalNotifier.omfg(`${JOB_NAME}: errored`, error) @@ -76,5 +87,6 @@ module.exports = { createMessage, handler, name: JOB_NAME, + onComplete, options } diff --git a/src/modules/licence-import/jobs/import-points.js b/src/modules/licence-import/jobs/import-points.js new file mode 100644 index 00000000..9406d06a --- /dev/null +++ b/src/modules/licence-import/jobs/import-points.js @@ -0,0 +1,89 @@ +'use strict' + +const { pool } = require('../../../lib/connectors/db') + +const JOB_NAME = 'licence-import.import-points' + +function createMessage () { + return { + name: JOB_NAME, + options: { + singletonKey: JOB_NAME, + expireIn: '1 hours' + } + } +} + +async function handler () { + try { + global.GlobalNotifier.omg(`${JOB_NAME}: started`) + + return _importPoints() + } catch (error) { + global.GlobalNotifier.omfg(`${JOB_NAME}: errored`, error) + throw error + } +} + +async function onComplete () { + global.GlobalNotifier.omg(`${JOB_NAME}: finished`) +} + +async function _importPoints () { + return pool.query(` + INSERT INTO water.licence_version_purpose_points ( + licence_version_purpose_id, + description, + ngr_1, + ngr_2, + ngr_3, + ngr_4, + external_id, + nald_point_id + ) + SELECT + lvp.licence_version_purpose_id, + np."LOCAL_NAME" AS description, + concat_ws(' ', np."NGR1_SHEET", np."NGR1_EAST", np."NGR1_NORTH") AS ngr_1, + ( + CASE np."NGR2_SHEET" + WHEN 'null' THEN NULL + ELSE concat_ws(' ', np."NGR2_SHEET", np."NGR2_EAST", np."NGR2_NORTH") + END + ) AS ngr_2, + ( + CASE np."NGR3_SHEET" + WHEN 'null' THEN NULL + ELSE concat_ws(' ', np."NGR3_SHEET", np."NGR3_EAST", np."NGR3_NORTH") + END + ) AS ngr_3, + ( + CASE np."NGR4_SHEET" + WHEN 'null' THEN NULL + ELSE concat_ws(' ', np."NGR4_SHEET", np."NGR4_EAST", np."NGR4_NORTH") + END + ) AS ngr_4, + (concat_ws(':', napp."FGAC_REGION_CODE", napp."AABP_ID", napp."AAIP_ID")) AS external_id, + napp."AAIP_ID"::integer AS nald_point_id + FROM + "import"."NALD_ABS_PURP_POINTS" napp + INNER JOIN water.licence_version_purposes lvp + ON napp."FGAC_REGION_CODE" = split_part(lvp.external_id, ':', 1) AND napp."AABP_ID" = split_part(lvp.external_id, ':', 2) + INNER JOIN import."NALD_POINTS" np + ON np."ID" = napp."AAIP_ID" AND np."FGAC_REGION_CODE" = napp."FGAC_REGION_CODE" + ON CONFLICT(external_id) DO + UPDATE SET + description=excluded.description, + ngr_1=excluded.ngr_1, + ngr_2=excluded.ngr_2, + ngr_3=excluded.ngr_3, + ngr_4=excluded.ngr_4; + `) +} + +module.exports = { + createMessage, + handler, + onComplete, + name: JOB_NAME +} diff --git a/src/modules/licence-import/plugin.js b/src/modules/licence-import/plugin.js index 1e3fff03..11bf33d5 100644 --- a/src/modules/licence-import/plugin.js +++ b/src/modules/licence-import/plugin.js @@ -6,6 +6,7 @@ const DeleteRemovedDocumentsJob = require('./jobs/delete-removed-documents.js') const ImportCompanyJob = require('./jobs/import-company.js') const ImportLicenceJob = require('./jobs/import-licence.js') const ImportLicenceSystemJob = require('./jobs/import-licence-system.js') +const ImportPointsJob = require('./jobs/import-points.js') const ImportPurposeConditionTypesJob = require('./jobs/import-purpose-condition-types.js') const QueueCompaniesJob = require('./jobs/queue-companies.js') const QueueLicencesJob = require('./jobs/queue-licences.js') @@ -54,6 +55,14 @@ async function register (server, _options) { }) await server.messageQueue.subscribe(ImportLicenceJob.name, ImportLicenceJob.options, ImportLicenceJob.handler) + await server.messageQueue.onComplete(ImportLicenceJob.name, (executedJob) => { + return ImportLicenceJob.onComplete(server.messageQueue, executedJob) + }) + + await server.messageQueue.subscribe(ImportPointsJob.name, ImportPointsJob.options, ImportPointsJob.handler) + await server.messageQueue.onComplete(ImportPointsJob.name, () => { + return ImportPointsJob.onComplete() + }) cron.schedule(config.import.licences.schedule, async () => { await server.messageQueue.publish(DeleteRemovedDocumentsJob.createMessage()) diff --git a/test/modules/licence-import/jobs/import-licence.test.js b/test/modules/licence-import/jobs/import-licence.test.js index 44af4dd4..bc1c012e 100644 --- a/test/modules/licence-import/jobs/import-licence.test.js +++ b/test/modules/licence-import/jobs/import-licence.test.js @@ -123,13 +123,10 @@ experiment('Licence Import: Import Licence job', () => { } }) - test("a 'finished' message is logged", async () => { + test('a message is NOT logged', async () => { await ImportLicenceJob.handler(job) - const [message] = notifierStub.omg.lastCall.args - expect(message).to.equal('licence-import.import-licence: finished') - - expect(notifierStub.omg.called).to.be.true() + expect(notifierStub.omg.called).to.be.false() }) test('extracts the licence data, transforms it then loads it into the DB', async () => { @@ -164,4 +161,89 @@ experiment('Licence Import: Import Licence job', () => { }) }) }) + + experiment('.onComplete', () => { + let job + let messageQueue + + beforeEach(async () => { + messageQueue = { + publish: Sinon.stub() + } + }) + + experiment('and this is the first licence to be imported', () => { + beforeEach(() => { + job = { + failed: false, + data: { + request: { data: { licenceNumber: '01/123', jobNumber: 1, numberOfJobs: 10 } } + } + } + }) + + test('a "finished" message is NOT logged', async () => { + await ImportLicenceJob.onComplete(messageQueue, job) + + expect(notifierStub.omg.called).to.be.false() + }) + + test('the "import-points" job is NOT published to the queue', async () => { + await ImportLicenceJob.onComplete(messageQueue, job) + + expect(messageQueue.publish.called).to.be.false() + }) + }) + + experiment('and this is one of a number of licences to be imported', () => { + beforeEach(() => { + job = { + failed: false, + data: { + request: { data: { licenceNumber: '01/123', jobNumber: 2, numberOfJobs: 10 } } + } + } + }) + + test('a message is NOT logged', async () => { + await ImportLicenceJob.onComplete(messageQueue, job) + + expect(notifierStub.omg.called).to.be.false() + }) + + test('the "import-points" job is NOT published to the queue', async () => { + await ImportLicenceJob.onComplete(messageQueue, job) + + expect(messageQueue.publish.called).to.be.false() + }) + }) + + experiment('and this is the last licence to be imported', () => { + beforeEach(() => { + job = { + failed: false, + data: { + request: { data: { licenceNumber: '01/123', jobNumber: 10, numberOfJobs: 10 } } + } + } + }) + + test('a "finished" message is logged', async () => { + await ImportLicenceJob.onComplete(messageQueue, job) + + const [message] = notifierStub.omg.lastCall.args + expect(message).to.equal('licence-import.import-licence: finished') + + expect(notifierStub.omg.called).to.be.true() + }) + + test('the "import-points" job is published to the queue', async () => { + await ImportLicenceJob.onComplete(messageQueue, job) + + const jobMessage = messageQueue.publish.lastCall.args[0] + + expect(jobMessage.name).to.equal('licence-import.import-points') + }) + }) + }) })