From 61f3f6288b67a74b7aad39acaa68c409736a8c01 Mon Sep 17 00:00:00 2001 From: Alan Cruikshanks Date: Tue, 27 Aug 2024 11:30:31 +0100 Subject: [PATCH 1/3] Start importing licence version points to 'water' https://eaflood.atlassian.net/browse/WATER-4645 > Part of the work to migrate return versions from NALD to WRLS We've been extending and amending the import of return versions from NALD to WRLS as part of our work to switch from NALD to WRLS for managing them. To support this we [Created a return-requirement-points table](https://github.com/DEFRA/water-abstraction-service/pull/2540) and [updated the import](https://github.com/DEFRA/water-abstraction-import/pull/933) to import them. These points are selected by users as part of the return requirements setup journey we've built. Currently, we extract them from the JSON blob stored in the `permit.licence` table. The problem we've encountered is the import service only populates the points for licences that - Have not ended - Have a current licence version Else, `permit.licence` is not populated with the points data our journey relies on causing it to throw an error. There are also places like the view licence page that are affected by this. It is perfectly valid, for example, that we have an 'ended' licence that we need to correct the historic return versions. And no matter the state that we can see what points the licence was linked to. Why the previous team never opted to extract licence points to their own table we don't know. But this change imports licence version points to a table so we can stop depending on the `permit.licence` table and its incomplete data. From a176899abfb3e789d55b573de59dedbea5d2b15d Mon Sep 17 00:00:00 2001 From: Alan Cruikshanks Date: Tue, 27 Aug 2024 17:33:47 +0100 Subject: [PATCH 2/3] Add new import points job to licence import When we amended the import to bring in return requirement points we were able to tag on another query to those already being run as part of the 'charging-data' job. Unfortunately, for 'reasons', the previous team made the licence import job much more complicated. It is broken down into various 'jobs', which typically - queue up a bunch of jobs, for example, licences to import - process each individual licence import job We're in the middle of migrating the licence import to [water-abstraction-system](https://github.com/DEFRA/water-abstraction-system) which will do away with all this needless complexity. But that won't 'land' for some time and we need points now. This is a half-way house. Adding this licence-based query to the 'charging-data' job didn't seem right. But replicating how things are done in the 'licence-import' job seemed overkill. So, we've amended the job process in licence import to queue up a new 'import-points' job when the last licence has been imported. But the job is just running a single query to create or update the points in one go. --- .../licence-import/jobs/import-licence.js | 16 +++- .../licence-import/jobs/import-points.js | 89 +++++++++++++++++++ src/modules/licence-import/plugin.js | 9 ++ 3 files changed, 112 insertions(+), 2 deletions(-) create mode 100644 src/modules/licence-import/jobs/import-points.js diff --git a/src/modules/licence-import/jobs/import-licence.js b/src/modules/licence-import/jobs/import-licence.js index 8c6ec859..4ebe3458 100644 --- a/src/modules/licence-import/jobs/import-licence.js +++ b/src/modules/licence-import/jobs/import-licence.js @@ -2,6 +2,7 @@ const extract = require('../extract') const load = require('../load') +const ImportPointsJob = require('./import-points.js') const transform = require('../transform') const JOB_NAME = 'licence-import.import-licence' @@ -62,9 +63,19 @@ async function handler (job) { // Load licence to DB await load.licence.loadLicence(mapped) + } catch (error) { + global.GlobalNotifier.omfg(`${JOB_NAME}: errored`, error) + throw error + } +} + +async function onComplete (messageQueue, job) { + try { + const { data } = job.data.request - if (job.data.jobNumber === job.data.numberOfJobs) { - global.GlobalNotifier.omg(`${JOB_NAME}: finished`, { numberOfJobs: job.data.numberOfJobs }) + if (data.jobNumber === data.numberOfJobs) { + await messageQueue.publish(ImportPointsJob.createMessage()) + global.GlobalNotifier.omg(`${JOB_NAME}: finished`, { numberOfJobs: job.data.request.data.numberOfJobs }) } } catch (error) { global.GlobalNotifier.omfg(`${JOB_NAME}: errored`, error) @@ -76,5 +87,6 @@ module.exports = { createMessage, handler, name: JOB_NAME, + onComplete, options } diff --git a/src/modules/licence-import/jobs/import-points.js b/src/modules/licence-import/jobs/import-points.js new file mode 100644 index 00000000..9406d06a --- /dev/null +++ b/src/modules/licence-import/jobs/import-points.js @@ -0,0 +1,89 @@ +'use strict' + +const { pool } = require('../../../lib/connectors/db') + +const JOB_NAME = 'licence-import.import-points' + +function createMessage () { + return { + name: JOB_NAME, + options: { + singletonKey: JOB_NAME, + expireIn: '1 hours' + } + } +} + +async function handler () { + try { + global.GlobalNotifier.omg(`${JOB_NAME}: started`) + + return _importPoints() + } catch (error) { + global.GlobalNotifier.omfg(`${JOB_NAME}: errored`, error) + throw error + } +} + +async function onComplete () { + global.GlobalNotifier.omg(`${JOB_NAME}: finished`) +} + +async function _importPoints () { + return pool.query(` + INSERT INTO water.licence_version_purpose_points ( + licence_version_purpose_id, + description, + ngr_1, + ngr_2, + ngr_3, + ngr_4, + external_id, + nald_point_id + ) + SELECT + lvp.licence_version_purpose_id, + np."LOCAL_NAME" AS description, + concat_ws(' ', np."NGR1_SHEET", np."NGR1_EAST", np."NGR1_NORTH") AS ngr_1, + ( + CASE np."NGR2_SHEET" + WHEN 'null' THEN NULL + ELSE concat_ws(' ', np."NGR2_SHEET", np."NGR2_EAST", np."NGR2_NORTH") + END + ) AS ngr_2, + ( + CASE np."NGR3_SHEET" + WHEN 'null' THEN NULL + ELSE concat_ws(' ', np."NGR3_SHEET", np."NGR3_EAST", np."NGR3_NORTH") + END + ) AS ngr_3, + ( + CASE np."NGR4_SHEET" + WHEN 'null' THEN NULL + ELSE concat_ws(' ', np."NGR4_SHEET", np."NGR4_EAST", np."NGR4_NORTH") + END + ) AS ngr_4, + (concat_ws(':', napp."FGAC_REGION_CODE", napp."AABP_ID", napp."AAIP_ID")) AS external_id, + napp."AAIP_ID"::integer AS nald_point_id + FROM + "import"."NALD_ABS_PURP_POINTS" napp + INNER JOIN water.licence_version_purposes lvp + ON napp."FGAC_REGION_CODE" = split_part(lvp.external_id, ':', 1) AND napp."AABP_ID" = split_part(lvp.external_id, ':', 2) + INNER JOIN import."NALD_POINTS" np + ON np."ID" = napp."AAIP_ID" AND np."FGAC_REGION_CODE" = napp."FGAC_REGION_CODE" + ON CONFLICT(external_id) DO + UPDATE SET + description=excluded.description, + ngr_1=excluded.ngr_1, + ngr_2=excluded.ngr_2, + ngr_3=excluded.ngr_3, + ngr_4=excluded.ngr_4; + `) +} + +module.exports = { + createMessage, + handler, + onComplete, + name: JOB_NAME +} diff --git a/src/modules/licence-import/plugin.js b/src/modules/licence-import/plugin.js index 1e3fff03..11bf33d5 100644 --- a/src/modules/licence-import/plugin.js +++ b/src/modules/licence-import/plugin.js @@ -6,6 +6,7 @@ const DeleteRemovedDocumentsJob = require('./jobs/delete-removed-documents.js') const ImportCompanyJob = require('./jobs/import-company.js') const ImportLicenceJob = require('./jobs/import-licence.js') const ImportLicenceSystemJob = require('./jobs/import-licence-system.js') +const ImportPointsJob = require('./jobs/import-points.js') const ImportPurposeConditionTypesJob = require('./jobs/import-purpose-condition-types.js') const QueueCompaniesJob = require('./jobs/queue-companies.js') const QueueLicencesJob = require('./jobs/queue-licences.js') @@ -54,6 +55,14 @@ async function register (server, _options) { }) await server.messageQueue.subscribe(ImportLicenceJob.name, ImportLicenceJob.options, ImportLicenceJob.handler) + await server.messageQueue.onComplete(ImportLicenceJob.name, (executedJob) => { + return ImportLicenceJob.onComplete(server.messageQueue, executedJob) + }) + + await server.messageQueue.subscribe(ImportPointsJob.name, ImportPointsJob.options, ImportPointsJob.handler) + await server.messageQueue.onComplete(ImportPointsJob.name, () => { + return ImportPointsJob.onComplete() + }) cron.schedule(config.import.licences.schedule, async () => { await server.messageQueue.publish(DeleteRemovedDocumentsJob.createMessage()) From c98c954cc16d8a622ca22551390319a57bbb9d88 Mon Sep 17 00:00:00 2001 From: Alan Cruikshanks Date: Tue, 27 Aug 2024 18:14:45 +0100 Subject: [PATCH 3/3] Fix tests --- .../jobs/import-licence.test.js | 92 ++++++++++++++++++- 1 file changed, 87 insertions(+), 5 deletions(-) diff --git a/test/modules/licence-import/jobs/import-licence.test.js b/test/modules/licence-import/jobs/import-licence.test.js index 44af4dd4..bc1c012e 100644 --- a/test/modules/licence-import/jobs/import-licence.test.js +++ b/test/modules/licence-import/jobs/import-licence.test.js @@ -123,13 +123,10 @@ experiment('Licence Import: Import Licence job', () => { } }) - test("a 'finished' message is logged", async () => { + test('a message is NOT logged', async () => { await ImportLicenceJob.handler(job) - const [message] = notifierStub.omg.lastCall.args - expect(message).to.equal('licence-import.import-licence: finished') - - expect(notifierStub.omg.called).to.be.true() + expect(notifierStub.omg.called).to.be.false() }) test('extracts the licence data, transforms it then loads it into the DB', async () => { @@ -164,4 +161,89 @@ experiment('Licence Import: Import Licence job', () => { }) }) }) + + experiment('.onComplete', () => { + let job + let messageQueue + + beforeEach(async () => { + messageQueue = { + publish: Sinon.stub() + } + }) + + experiment('and this is the first licence to be imported', () => { + beforeEach(() => { + job = { + failed: false, + data: { + request: { data: { licenceNumber: '01/123', jobNumber: 1, numberOfJobs: 10 } } + } + } + }) + + test('a "finished" message is NOT logged', async () => { + await ImportLicenceJob.onComplete(messageQueue, job) + + expect(notifierStub.omg.called).to.be.false() + }) + + test('the "import-points" job is NOT published to the queue', async () => { + await ImportLicenceJob.onComplete(messageQueue, job) + + expect(messageQueue.publish.called).to.be.false() + }) + }) + + experiment('and this is one of a number of licences to be imported', () => { + beforeEach(() => { + job = { + failed: false, + data: { + request: { data: { licenceNumber: '01/123', jobNumber: 2, numberOfJobs: 10 } } + } + } + }) + + test('a message is NOT logged', async () => { + await ImportLicenceJob.onComplete(messageQueue, job) + + expect(notifierStub.omg.called).to.be.false() + }) + + test('the "import-points" job is NOT published to the queue', async () => { + await ImportLicenceJob.onComplete(messageQueue, job) + + expect(messageQueue.publish.called).to.be.false() + }) + }) + + experiment('and this is the last licence to be imported', () => { + beforeEach(() => { + job = { + failed: false, + data: { + request: { data: { licenceNumber: '01/123', jobNumber: 10, numberOfJobs: 10 } } + } + } + }) + + test('a "finished" message is logged', async () => { + await ImportLicenceJob.onComplete(messageQueue, job) + + const [message] = notifierStub.omg.lastCall.args + expect(message).to.equal('licence-import.import-licence: finished') + + expect(notifierStub.omg.called).to.be.true() + }) + + test('the "import-points" job is published to the queue', async () => { + await ImportLicenceJob.onComplete(messageQueue, job) + + const jobMessage = messageQueue.publish.lastCall.args[0] + + expect(jobMessage.name).to.equal('licence-import.import-points') + }) + }) + }) })