
Commit ba68fc4

fix: replace fetches in intervals with recursive timeout calls
- In the pipeline data loading, intervals could trigger a refetch before the previous request had finished retrieving its data.
- Applied the same logic to job monitoring as a precaution, which also slightly reduces the polling frequency.
1 parent 73aa453 commit ba68fc4
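
The core of the change: instead of a setInterval that fires every second regardless of whether the previous request has settled, the monitor awaits each fetch and only re-arms a setTimeout once it completes, with a bounded retry counter on errors. A minimal TypeScript sketch of that pattern (illustrative only; fetchOnce, isDone and the retry values are hypothetical stand-ins, not the project's actual API):

// Hypothetical stand-ins for the real pipeline API calls used in the diff below.
const fetchOnce = async (): Promise<{ status: string }> => ({ status: 'RUNNING' })
const isDone = (data: { status: string }): boolean => data.status !== 'RUNNING'

// Before: setInterval fires every second even if the previous fetch has not
// resolved yet, so slow responses cause overlapping requests.
// const monitor = setInterval(() => { fetchOnce().then(isDone) }, 1000)

// After: each run awaits its own fetch, then schedules the next one with
// setTimeout, so at most one request is in flight; failures are retried
// up to maxAttempt times before giving up.
const maxAttempt = 3
const poll = async (attempt: number): Promise<void> => {
    if (attempt >= maxAttempt) return // give up after repeated failures
    try {
        const data = await fetchOnce()
        if (!isDone(data)) {
            setTimeout(() => poll(0), 1000) // re-arm only after completion
        }
    } catch (e) {
        poll(attempt + 1) // retry right away, counted against maxAttempt
    }
}
poll(0)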

File tree

1 file changed (+155, -111 lines)


src/main/data/middlewares/pipeline.ts

+155 -111
@@ -64,6 +64,7 @@ import { dialog, ipcMain } from 'electron'
 import { IPC } from 'shared/constants/ipc'
 import { pathToFileURL } from 'url'
 import { MainWindow, MainWindowInstance } from 'main/windows'
+import { AbortError } from 'node-fetch'
 
 // prettier-ignore
 /**
@@ -193,110 +194,129 @@ async function downloadJobResults(j: Job, targetFolder: string) {
  * @param dispatch redux store dispatch function
  */
 function startMonitor(j: Job, ws: Webservice, getState, dispatch) {
-    let monitor = null
     const fetchJobData = pipelineAPI.fetchJobData(j)
-    monitor = setInterval(() => {
-        if (selectStatus(getState()) == PipelineStatus.STOPPED) {
-            error('The pipeline has stopped working while executing job', j)
-            clearInterval(monitor)
-        } else
-            fetchJobData(ws)
-                .then((value) => {
-                    // info('received job data ', value)
-                    // don't log the job messages, it's too verbose
-                    info('received job data ', {
-                        ...value,
-                        messages: ['removed to keep log cleaner'],
+    const maxAttempt = 3
+    const timeoutMonitor = async (attempt) => {
+        if (attempt >= maxAttempt) {
+            error(
+                'Job monitoring failed after',
+                maxAttempt,
+                'attempts to fetch the job',
+                j
+            )
+            return
+        }
+        const isFinished = await fetchJobData(ws)
+            .then((value) => {
+                // info('received job data ', value)
+                // don't log the job messages, it's too verbose
+                info('received job data ', {
+                    ...value,
+                    messages: ['removed to keep log cleaner'],
+                })
+                let updatedJob = {
+                    ...j,
+                    jobData: value,
+                }
+                const finished = [
+                    JobStatus.ERROR,
+                    JobStatus.FAIL,
+                    JobStatus.SUCCESS,
+                ].includes(value.status)
+                if (finished) {
+                    updatedJob.state = JobState.ENDED
+                }
+                const newJobName = `${
+                    updatedJob.jobData.nicename ??
+                    updatedJob.jobData.script.nicename
+                }_${timestamp()}`
+                const downloadFolder = selectDownloadPath(getState())
+                if (updatedJob.jobData?.results?.namedResults) {
+                    // If job has results, download them
+                    downloadJobResults(
+                        updatedJob,
+                        `${downloadFolder}/${newJobName}`
+                    )
+                        .then((downloadedJob) => {
+                            dispatch(updateJob(downloadedJob))
+                            // Only delete job if it has been downloaded
+                            if (downloadedJob.jobData.downloadedFolder) {
+                                const deleteJob =
+                                    pipelineAPI.deleteJob(downloadedJob)
+                                deleteJob().then((response) => {
+                                    info(
+                                        downloadedJob.jobData.jobId,
+                                        'delete response',
+                                        response.status,
+                                        response.statusText
+                                    )
+                                })
+                            }
+                        })
+                        .catch((e) => {
+                            error('Error downloading job results', e)
+                        })
+                } else if (finished) {
+                    info('job is finished without results')
+                    // job is finished without results : keep the log
+                    downloadJobLog(
+                        updatedJob,
+                        `${downloadFolder}/${newJobName}`
+                    ).then((jobWithLog) => {
+                        dispatch(updateJob(jobWithLog))
+                        const deleteJob = pipelineAPI.deleteJob(jobWithLog)
+                        deleteJob().then((response) => {
+                            info(
+                                jobWithLog.jobData.jobId,
+                                'delete response',
+                                response.status,
+                                response.statusText
+                            )
+                        })
                     })
-                    let updatedJob = {
+                } else {
+                    dispatch(updateJob(updatedJob))
+                }
+                return finished
+            })
+            .catch((e) => {
+                error('Error fetching data for job', j, e)
+                dispatch(
+                    updateJob({
                         ...j,
-                        jobData: value,
-                    }
-                    const finished = [
-                        JobStatus.ERROR,
-                        JobStatus.FAIL,
-                        JobStatus.SUCCESS,
-                    ].includes(value.status)
-                    if (finished) {
-                        clearInterval(monitor)
-                        updatedJob.state = JobState.ENDED
-                    }
-                    const newJobName = `${
-                        updatedJob.jobData.nicename ??
-                        updatedJob.jobData.script.nicename
-                    }_${timestamp()}`
-                    const downloadFolder = selectDownloadPath(getState())
-                    if (updatedJob.jobData?.results?.namedResults) {
-                        // If job has results, download them
-                        downloadJobResults(
-                            updatedJob,
-                            `${downloadFolder}/${newJobName}`
-                        )
-                            .then((downloadedJob) => {
-                                dispatch(updateJob(downloadedJob))
-                                // Only delete job if it has been downloaded
-                                if (downloadedJob.jobData.downloadedFolder) {
-                                    const deleteJob =
-                                        pipelineAPI.deleteJob(downloadedJob)
-                                    deleteJob().then((response) => {
-                                        info(
-                                            downloadedJob.jobData.jobId,
-                                            'delete response',
-                                            response.status,
-                                            response.statusText
-                                        )
-                                    })
-                                }
-                            })
-                            .catch((e) => {
-                                error('Error downloading job results', e)
-                            })
-                    } else if (finished) {
-                        info('job is finished without results')
-                        // job is finished without results : keep the log
-                        downloadJobLog(
-                            updatedJob,
-                            `${downloadFolder}/${newJobName}`
-                        ).then((jobWithLog) => {
-                            dispatch(updateJob(jobWithLog))
-                            const deleteJob = pipelineAPI.deleteJob(jobWithLog)
-                            deleteJob().then((response) => {
-                                info(
-                                    jobWithLog.jobData.jobId,
-                                    'delete response',
-                                    response.status,
-                                    response.statusText
-                                )
-                            })
-                        })
-                    } else {
-                        dispatch(updateJob(updatedJob))
-                    }
-                })
-                .catch((e) => {
-                    error('Error fetching data for job', j, e)
-                    if (j.jobRequestError) {
-                        clearInterval(monitor)
-                    }
-                    dispatch(
-                        updateJob({
-                            ...j,
-                            jobData: {
-                                ...j.jobData,
-                                status: JobStatus.ERROR,
+                        jobData: {
+                            ...j.jobData,
+                            status: JobStatus.ERROR,
+                        },
+                        errors: [
+                            {
+                                error:
+                                    e instanceof ParserException
+                                        ? e.parsedText
+                                        : String(e),
                             },
-                            errors: [
-                                {
-                                    error:
-                                        e instanceof ParserException
-                                            ? e.parsedText
-                                            : String(e),
-                                },
-                            ],
-                        })
-                    )
-                })
-    }, 1000)
+                        ],
+                    })
+                )
+                if (j.jobRequestError) {
+                    //clearInterval(monitor)
+                    return true
+                } else {
+                    // relaunch a new attempt to get the job data
+                    timeoutMonitor(attempt + 1)
+                    return true // deactivate the default monitor
+                }
+            })
+        if (selectStatus(getState()) == PipelineStatus.STOPPED) {
+            error('The pipeline has stopped working while executing job', j)
+        }
+        if (!isFinished) {
+            // wait 1 sec before refetching if job is not in finished state
+            setTimeout(() => timeoutMonitor(0), 1000)
+        }
+    }
+    // Start the monitor
+    timeoutMonitor(0)
 }
 
 // Store managed pipeline instance
@@ -359,32 +379,41 @@ export function pipelineMiddleware({ getState, dispatch }) {
                 break
             case useWebservice.type:
                 // Action dispatched when the pipeline instance is launched
-                const newWebservice = action.payload
-                let fetchScriptsInterval = null
+                const newWebservice = action.payload as Webservice
                 const fetchAlive = pipelineAPI.fetchAlive()
                 const fetchScripts = pipelineAPI.fetchScripts()
-                fetchScriptsInterval = setInterval(() => {
+                let maxAttempt = 2
+                const loadPipelineData = async (attempt: number) => {
+                    if (attempt == maxAttempt) {
+                        error(
+                            'useWebservice',
+                            `${maxAttempt} attempts to fetch webservice data failed, stopping pipeline.`,
+                            'Please check pipeline logs.'
+                        )
+                        getPipelineInstance(state)?.stop(action.payload)
+                        dispatch(setStatus(PipelineStatus.STOPPED))
+                        return
+                    }
                     if (selectStatus(getState()) == PipelineStatus.STOPPED) {
                         error(
                             'useWebservice',
                             'Pipeline has been stopped during webservice monitoring.',
                             'Please check pipeline logs.'
                         )
-                        clearInterval(fetchScriptsInterval)
+                        return
                     } else if (newWebservice) {
                         fetchAlive(newWebservice)
                             .then((alive) => {
-                                dispatch(setAlive(alive))
-                            })
-                            .then(() => fetchScripts(newWebservice))
-                            .then((scripts: Array<Script>) => {
                                 info(
                                     'useWebservice',
                                     'Pipeline is ready to be used'
                                 )
+                                dispatch(setAlive(alive))
+                            })
+                            .then(() => fetchScripts(newWebservice))
+                            .then((scripts: Array<Script>) => {
                                 dispatch(setScripts(scripts))
                                 dispatch(setStatus(PipelineStatus.RUNNING))
-                                clearInterval(fetchScriptsInterval)
                                 return pipelineAPI.fetchDatatypes()(
                                     newWebservice
                                 )
@@ -431,15 +460,30 @@ export function pipelineMiddleware({ getState, dispatch }) {
                             })
                             .catch((e) => {
                                 error('useWebservice', e, e.parsedText)
-                                if (
+                                if (e instanceof AbortError) {
+                                    info(
+                                        `Trying for a new attempt ${attempt} / ${maxAttempt}`
+                                    )
+                                    loadPipelineData(attempt + 1)
+                                } else if (
                                     selectStatus(getState()) ==
                                     PipelineStatus.RUNNING
                                 ) {
+                                    error(
+                                        'useWebservice',
+                                        'Pipeline has stopped working during webservice monitoring.',
+                                        'Please check pipeline logs.'
+                                    )
                                     dispatch(setStatus(PipelineStatus.STOPPED))
+                                } else {
+                                    // Error during webservice check (possibly pipeline starting), retry in 1 second
+                                    setTimeout(() => loadPipelineData(0), 1000)
                                 }
                             })
                     }
-                }, 1000)
+                }
+                // Start pipeline data loading process
+                loadPipelineData(0)
                 break
             case setScripts.type:
                 for (const script of action.payload as Array<Script>) {

0 commit comments
