Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP multiple datasets (add PODs) #9

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
112 changes: 65 additions & 47 deletions lib/data_updater.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,18 @@ const pidString = `${process.ppid ? `${process.ppid} > ` : ''}PID ${process.pid}
export default class DataUpdater extends EventEmitter {
/**
* Class constructor
* @param {string} dataUrl - the URL from which to fetch the geo.json data
* @param {string} apiRoot - the root URL from which to fetch the geo.json data
* @param {Object} dataPaths - the paths from which to pull data with label as key
* @returns {DataUpdater} - the created DataUpdater instance
*/
constructor (dataUrl) {
constructor (apiRoot, dataPaths) {
console.log('creating new DataUpdater with args:', 'apiRoot =', apiRoot, ', dataPaths =', dataPaths);
super();
this.dataUrl = dataUrl;
this.apiRoot = apiRoot;
this.dataPaths = {};
for (let [dataLabel, apiPath] of Object.entries(dataPaths)) {
this.dataPaths[dataLabel] = this.apiRoot + apiPath;
}
this.updateInProgress = false;
}

Expand All @@ -28,47 +34,54 @@ export default class DataUpdater extends EventEmitter {
* @returns {void}
*/
performUpdate (childProcess = false) {
let promise;
let promises;
this.updateInProgress = true;
if (childProcess) { // TODO: Not yet working
promise = new Promise((resolve, reject) => {
console.log(`(${pidString}) executing update on child process...`);
cp = fork(__filename, [], {
stdio: [process.stdin, process.stdout, process.stderr, 'ipc'],
env: { 'DATA_URL': this.dataUrl }
});
process.on('message', (data) => {
if (data === 'fetchLocationData') {
console.log(`(${pidString}) 'fetchLocationData': starting update.`);
this.fetchLocationData()
.then((cpResult) => {
console.log(`(${pidString}) 'fetchLocationData': update completed.`);
process.send('message', cpResult);
});
} else {
console.log(`(${pidString}) Unknown message value.`);
}
});
process.on('error', (error) => {
console.error(`(${pidString}) Error in child process execution: ${error.message}`, error.trace);
reject(error);
});
cp.on('message', (data) => {
resolve(data);
});
cp.on('exit', (code, signal) => {
console.log(`(${pidString}) child process exited with code ${code}${signal ? `and signal ${signal}` : ''}.`);
promises = Object.entries(this.dataPaths).map((label_and_uri) => {
const [dataLabel, uri] = label_and_uri;
return new Promise((resolve, reject) => {
console.log(`(${pidString}) executing update on child process...`);
cp = fork(__filename, [], {
stdio: [process.stdin, process.stdout, process.stderr, 'ipc'],
env: { 'DATA_URL': uri, 'DATA_LABEL': dataLabel }
});
process.on('message', (data) => {
if (data === 'fetchLocationData') {
console.log(`(${pidString}) 'fetchLocationData': starting update.`);
this.fetchLocationData(process.env.DATA_LABEL, process.env.DATA_URL)
.then((cpResult) => {
console.log(`(${pidString}) 'fetchLocationData': update completed for ${process.env.DATA_LABEL} (${process.env.DATA_URL}).`);
process.send('message', cpResult);
});
} else {
console.log(`(${pidString}) Unknown message value.`);
}
});
process.on('error', (error) => {
console.error(`(${pidString}) Error in child process execution: ${error.message}`, error.trace);
reject(error);
});
cp.on('message', (data) => {
resolve(data);
});
cp.on('exit', (code, signal) => {
console.log(`(${pidString}) child process exited with code ${code}${signal ? `and signal ${signal}` : ''}.`);
});
cp.send('fetchLocationData');
});
cp.send('fetchLocationData');
});
} else {
console.log(`(${pidString}) starting update.`);
promise = this.fetchLocationData();
promises = Object.entries(this.dataPaths)
.map(([dataLabel, uri]) => { return this.fetchLocationData(dataLabel, uri); });
}
promise
Promise.all(promises)
.then((data) => {
console.log(`(${pidString}) update complete, emitting update...`);
this.emit('update', data);
const updatedDataStore = {};
console.log(updatedDataStore, data)
for (let [label, geodata] of data) { updatedDataStore[label] = geodata; }
console.log(`(${pidString}) updates complete, emitting update event...`);
this.emit('update', updatedDataStore);
this.updateInProgress = false;
})
.catch((e) => {
Expand All @@ -79,13 +92,14 @@ export default class DataUpdater extends EventEmitter {

/**
* Pulls geo.json data from the external source and emits an update event with the new data
* @emits DataUpdater#update
* @returns {void}
* @param {string} label - the label to use when adding data to the store object
* @param {string} uri - the URI from which to fetch location data
* @returns {Promise} - a Promise object which resolves to an array of [label, dataset]
*/
fetchLocationData () {
fetchLocationData (label, uri) {
return new Promise((resolve, reject) => {
console.log(`[${new Date()}] Updating location data...`);
https.get(this.dataUrl, (res) => {
console.log(`[${new Date()}] Updating ${label} location data from ${uri}...`);
https.get(uri, (res) => {
const { statusCode } = res;
const contentType = res.headers['content-type'];
const error = this.handleErrorResponse(statusCode, contentType);
Expand All @@ -99,13 +113,17 @@ export default class DataUpdater extends EventEmitter {
const features = [];
res.pipe(FeatureParser.parse())
.each((feature) => {
// console.log(`Processing feature: ${feature}`);
features.push(JSON.parse(feature.toString()));
try {
features.push(JSON.parse(feature.toString()));
} catch (error) {
if (error instanceof SyntaxError && error.message === 'Unexpected token ] in JSON at position 0') {
console.log(`Received empty response from API for ${label} (${uri})`);
} else {
console.error(`ERROR while processing ${label} (${uri}) feature: ${feature}`, error);
}
}
});
res.on('end', () => {
const locationData = this.extractGeoJsonData(features);
resolve(locationData);
});
res.on('end', () => { resolve([label, this.extractGeoJsonData(features)]); });
console.log(`[${new Date()}] Location update complete.`);
}).on('error', (e) => {
console.error(`Got error: ${e.message}`);
Expand Down
Loading