Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Start importing licence version points to 'water' #1009

Merged
merged 3 commits into from
Aug 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 14 additions & 2 deletions src/modules/licence-import/jobs/import-licence.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

const extract = require('../extract')
const load = require('../load')
const ImportPointsJob = require('./import-points.js')
const transform = require('../transform')

const JOB_NAME = 'licence-import.import-licence'
Expand Down Expand Up @@ -62,9 +63,19 @@ async function handler (job) {

// Load licence to DB
await load.licence.loadLicence(mapped)
} catch (error) {
global.GlobalNotifier.omfg(`${JOB_NAME}: errored`, error)
throw error
}
}

async function onComplete (messageQueue, job) {
try {
const { data } = job.data.request

if (job.data.jobNumber === job.data.numberOfJobs) {
global.GlobalNotifier.omg(`${JOB_NAME}: finished`, { numberOfJobs: job.data.numberOfJobs })
if (data.jobNumber === data.numberOfJobs) {
await messageQueue.publish(ImportPointsJob.createMessage())
global.GlobalNotifier.omg(`${JOB_NAME}: finished`, { numberOfJobs: job.data.request.data.numberOfJobs })
}
} catch (error) {
global.GlobalNotifier.omfg(`${JOB_NAME}: errored`, error)
Expand All @@ -76,5 +87,6 @@ module.exports = {
createMessage,
handler,
name: JOB_NAME,
onComplete,
options
}
89 changes: 89 additions & 0 deletions src/modules/licence-import/jobs/import-points.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
'use strict'

const { pool } = require('../../../lib/connectors/db')

const JOB_NAME = 'licence-import.import-points'

function createMessage () {
return {
name: JOB_NAME,
options: {
singletonKey: JOB_NAME,
expireIn: '1 hours'
}
}
}

async function handler () {
try {
global.GlobalNotifier.omg(`${JOB_NAME}: started`)

return _importPoints()
} catch (error) {
global.GlobalNotifier.omfg(`${JOB_NAME}: errored`, error)
throw error
}
}

async function onComplete () {
global.GlobalNotifier.omg(`${JOB_NAME}: finished`)
}

async function _importPoints () {
return pool.query(`
INSERT INTO water.licence_version_purpose_points (
licence_version_purpose_id,
description,
ngr_1,
ngr_2,
ngr_3,
ngr_4,
external_id,
nald_point_id
)
SELECT
lvp.licence_version_purpose_id,
np."LOCAL_NAME" AS description,
concat_ws(' ', np."NGR1_SHEET", np."NGR1_EAST", np."NGR1_NORTH") AS ngr_1,
(
CASE np."NGR2_SHEET"
WHEN 'null' THEN NULL
ELSE concat_ws(' ', np."NGR2_SHEET", np."NGR2_EAST", np."NGR2_NORTH")
END
) AS ngr_2,
(
CASE np."NGR3_SHEET"
WHEN 'null' THEN NULL
ELSE concat_ws(' ', np."NGR3_SHEET", np."NGR3_EAST", np."NGR3_NORTH")
END
) AS ngr_3,
(
CASE np."NGR4_SHEET"
WHEN 'null' THEN NULL
ELSE concat_ws(' ', np."NGR4_SHEET", np."NGR4_EAST", np."NGR4_NORTH")
END
) AS ngr_4,
(concat_ws(':', napp."FGAC_REGION_CODE", napp."AABP_ID", napp."AAIP_ID")) AS external_id,
napp."AAIP_ID"::integer AS nald_point_id
FROM
"import"."NALD_ABS_PURP_POINTS" napp
INNER JOIN water.licence_version_purposes lvp
ON napp."FGAC_REGION_CODE" = split_part(lvp.external_id, ':', 1) AND napp."AABP_ID" = split_part(lvp.external_id, ':', 2)
INNER JOIN import."NALD_POINTS" np
ON np."ID" = napp."AAIP_ID" AND np."FGAC_REGION_CODE" = napp."FGAC_REGION_CODE"
ON CONFLICT(external_id) DO
UPDATE SET
description=excluded.description,
ngr_1=excluded.ngr_1,
ngr_2=excluded.ngr_2,
ngr_3=excluded.ngr_3,
ngr_4=excluded.ngr_4;
`)
}

module.exports = {
createMessage,
handler,
onComplete,
name: JOB_NAME
}
9 changes: 9 additions & 0 deletions src/modules/licence-import/plugin.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ const DeleteRemovedDocumentsJob = require('./jobs/delete-removed-documents.js')
const ImportCompanyJob = require('./jobs/import-company.js')
const ImportLicenceJob = require('./jobs/import-licence.js')
const ImportLicenceSystemJob = require('./jobs/import-licence-system.js')
const ImportPointsJob = require('./jobs/import-points.js')
const ImportPurposeConditionTypesJob = require('./jobs/import-purpose-condition-types.js')
const QueueCompaniesJob = require('./jobs/queue-companies.js')
const QueueLicencesJob = require('./jobs/queue-licences.js')
Expand Down Expand Up @@ -54,6 +55,14 @@ async function register (server, _options) {
})

await server.messageQueue.subscribe(ImportLicenceJob.name, ImportLicenceJob.options, ImportLicenceJob.handler)
await server.messageQueue.onComplete(ImportLicenceJob.name, (executedJob) => {
return ImportLicenceJob.onComplete(server.messageQueue, executedJob)
})

await server.messageQueue.subscribe(ImportPointsJob.name, ImportPointsJob.options, ImportPointsJob.handler)
await server.messageQueue.onComplete(ImportPointsJob.name, () => {
return ImportPointsJob.onComplete()
})

cron.schedule(config.import.licences.schedule, async () => {
await server.messageQueue.publish(DeleteRemovedDocumentsJob.createMessage())
Expand Down
92 changes: 87 additions & 5 deletions test/modules/licence-import/jobs/import-licence.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -123,13 +123,10 @@ experiment('Licence Import: Import Licence job', () => {
}
})

test("a 'finished' message is logged", async () => {
test('a message is NOT logged', async () => {
await ImportLicenceJob.handler(job)

const [message] = notifierStub.omg.lastCall.args
expect(message).to.equal('licence-import.import-licence: finished')

expect(notifierStub.omg.called).to.be.true()
expect(notifierStub.omg.called).to.be.false()
})

test('extracts the licence data, transforms it then loads it into the DB', async () => {
Expand Down Expand Up @@ -164,4 +161,89 @@ experiment('Licence Import: Import Licence job', () => {
})
})
})

experiment('.onComplete', () => {
let job
let messageQueue

beforeEach(async () => {
messageQueue = {
publish: Sinon.stub()
}
})

experiment('and this is the first licence to be imported', () => {
beforeEach(() => {
job = {
failed: false,
data: {
request: { data: { licenceNumber: '01/123', jobNumber: 1, numberOfJobs: 10 } }
}
}
})

test('a "finished" message is NOT logged', async () => {
await ImportLicenceJob.onComplete(messageQueue, job)

expect(notifierStub.omg.called).to.be.false()
})

test('the "import-points" job is NOT published to the queue', async () => {
await ImportLicenceJob.onComplete(messageQueue, job)

expect(messageQueue.publish.called).to.be.false()
})
})

experiment('and this is one of a number of licences to be imported', () => {
beforeEach(() => {
job = {
failed: false,
data: {
request: { data: { licenceNumber: '01/123', jobNumber: 2, numberOfJobs: 10 } }
}
}
})

test('a message is NOT logged', async () => {
await ImportLicenceJob.onComplete(messageQueue, job)

expect(notifierStub.omg.called).to.be.false()
})

test('the "import-points" job is NOT published to the queue', async () => {
await ImportLicenceJob.onComplete(messageQueue, job)

expect(messageQueue.publish.called).to.be.false()
})
})

experiment('and this is the last licence to be imported', () => {
beforeEach(() => {
job = {
failed: false,
data: {
request: { data: { licenceNumber: '01/123', jobNumber: 10, numberOfJobs: 10 } }
}
}
})

test('a "finished" message is logged', async () => {
await ImportLicenceJob.onComplete(messageQueue, job)

const [message] = notifierStub.omg.lastCall.args
expect(message).to.equal('licence-import.import-licence: finished')

expect(notifierStub.omg.called).to.be.true()
})

test('the "import-points" job is published to the queue', async () => {
await ImportLicenceJob.onComplete(messageQueue, job)

const jobMessage = messageQueue.publish.lastCall.args[0]

expect(jobMessage.name).to.equal('licence-import.import-points')
})
})
})
})
Loading