Skip to content
This repository has been archived by the owner on Nov 16, 2023. It is now read-only.

fix: extend existing polls instead of stacking them #619

Merged
merged 3 commits into from
Jun 25, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 22 additions & 37 deletions src/actions/fetchActions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import {
import { Dispatch } from 'redux'
import * as ClientFactory from '../services/clientFactory'
import { setErrorDisplay } from './displayActions'
import { Poller, IPollConfig } from '../services/poller';
import { delay } from '../util';

// ----------------------------------------
// Train Dialogs
Expand Down Expand Up @@ -175,14 +177,32 @@ const fetchTutorialsFulfilled = (tutorials: AppBase[]): ActionObject => {
// ----------------------------------------
// Training Status
// ----------------------------------------
const delay = <T>(ms: number, value: T = null): Promise<T> => new Promise<T>(resolve => setTimeout(() => resolve(value), ms))
const poller = new Poller({ interval: 2000 })

export const fetchApplicationTrainingStatusThunkAsync = (appId: string) => {
return async (dispatch: Dispatch<any>) => {
dispatch(fetchApplicationTrainingStatusAsync(appId))
// Wait 1 second before polling to ensure service has time to change status from previous to queued / running
await delay(1000)
pollTrainingStatusUntilResolvedOrMaxDuration(dispatch, appId, [TrainingStatusCode.Completed, TrainingStatusCode.Failed], 2000, 30000)

const clClient = ClientFactory.getInstance(AT.FETCH_APPLICATION_TRAININGSTATUS_ASYNC)
const pollConfig: IPollConfig<TrainingStatus> = {
id: appId,
maxDuration: 30000,
request: async () => {
const trainingStatus = await clClient.appGetTrainingStatus(appId)
console.log(`${new Date().getTime()} Poll app: ${appId}: `, trainingStatus.trainingStatus)
return trainingStatus
},
isResolved: trainingStatus => [TrainingStatusCode.Completed, TrainingStatusCode.Failed].includes(trainingStatus.trainingStatus),
onExpired: () => {
console.warn(`Polling for app ${appId} exceeded max duration. Stopping`)
dispatch(fetchApplicationTrainingStatusExpired(appId))
},
onUpdate: trainingStatus => dispatch(fetchApplicationTrainingStatusFulfilled(appId, trainingStatus)),
}

poller.addPoll(pollConfig)
}
}

Expand All @@ -208,41 +228,6 @@ const fetchApplicationTrainingStatusExpired = (appId: string): ActionObject => {
}
}

const pollTrainingStatusUntilResolvedOrMaxDuration = (dispatch: Dispatch<any>, appId: string, resolvedStates: TrainingStatusCode[], interval: number, maxDuration: number): Promise<void> => {
const start = new Date()
const end = start.getTime() + maxDuration
const clClient = ClientFactory.getInstance(null)

return new Promise<void>((resolve) => {
const timerId = setInterval(async () => {
// If current time is after max allowed polling duration then resolve
const now = (new Date()).getTime()
if (now >= end) {
console.warn(`Polling exceeded max duration. Stopping`)

if (timerId) {
clearInterval(timerId)
}

dispatch(fetchApplicationTrainingStatusExpired(appId))
resolve()
}

// Get training status and if it's one of the resolved states resolve promise
const trainingStatus = await clClient.appGetTrainingStatus(appId)
console.log(`Poll app: ${appId} training status: `, end, now, trainingStatus.trainingStatus)
dispatch(fetchApplicationTrainingStatusFulfilled(appId, trainingStatus))

if (resolvedStates.includes(trainingStatus.trainingStatus)) {
if (timerId) {
clearInterval(timerId)
}
resolve()
}
}, interval)
})
}

// -------------------------
// Entities
// -------------------------
Expand Down
148 changes: 148 additions & 0 deletions src/services/poller.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
/**
* Copyright (c) Microsoft Corporation. All rights reserved.
* Licensed under the MIT License.
*/
import * as poller from './poller'
import { delay } from '../util'

describe('Poller', () => {
test('poll should invoke onExpired callback when polling exceeds max duration', async () => {
// Arrange
const onExpiredMock = jest.fn()
const onUpdateMock = jest.fn(trainingStatus => {
})
const pollConfig: poller.IPollConfig<number> = {
id: 'pc1',
maxDuration: 500,
request: async () => {
return 0
},
isResolved: n => false,
onExpired: onExpiredMock,
onUpdate: onUpdateMock
}

const poller1 = new poller.Poller({ interval: 100 })
await poller1.addPoll(pollConfig)

expect(onExpiredMock.mock.calls.length).toBe(1)
expect(onUpdateMock.mock.calls.length).toBeGreaterThan(3)
})

test('poll should invoke request, isResolved, and onUpdate for each interval', async () => {
const requestMock = jest.fn(async () => {
return 0
})
const isResolvedMock = jest.fn(n => false)
const onExpiredMock = jest.fn()
const onUpdateMock = jest.fn(trainingStatus => {
})
const pollConfig: poller.IPollConfig<number> = {
id: 'pc1',
maxDuration: 500,
request: requestMock,
isResolved: isResolvedMock,
onExpired: onExpiredMock,
onUpdate: onUpdateMock
}

const poller1 = new poller.Poller({ interval: 100 })
await poller1.addPoll(pollConfig)

expect(requestMock.mock.calls.length).toBe(4)
expect(isResolvedMock.mock.calls.length).toBe(4)
expect(onUpdateMock.mock.calls.length).toBe(4)
})

test('poll should stop polling after isResolved returns true', async () => {
const onExpiredMock = jest.fn()
const onUpdateMock = jest.fn(trainingStatus => {
})
const pollConfig: poller.IPollConfig<number> = {
id: 'pc1',
maxDuration: 500,
request: async () => {
return 0
},
isResolved: n => true,
onExpired: onExpiredMock,
onUpdate: onUpdateMock
}

const poller1 = new poller.Poller({ interval: 100 })
await poller1.addPoll(pollConfig)

expect(onUpdateMock.mock.calls.length).toBe(1)
})

test('calling poll with same id should extend existing polls', async () => {
const pollConfig1: poller.IPollConfig<number> = {
id: 'pc1',
maxDuration: 400,
request: async () => {
return 0
},
isResolved: n => false,
onExpired: () => {},
onUpdate: () => {}
}

const pollConfig2: poller.IPollConfig<number> = {
id: 'pc1',
maxDuration: 400,
request: async () => {
return 0
},
isResolved: n => false,
onExpired: () => {},
onUpdate: () => {}
}

const now = new Date().getTime()
const poller1 = new poller.Poller({ interval: 100 })
const p1 = poller1.addPoll(pollConfig1)
await delay(200)
poller1.addPoll(pollConfig2)
await p1
const after = new Date().getTime()

// 200 + 400
expect(after - now).toBeGreaterThanOrEqual(600)
})

test('calling poll with different id should NOT extend existing polls', async () => {
const pollConfig1: poller.IPollConfig<number> = {
id: 'pc1',
maxDuration: 400,
request: async () => {
return 0
},
isResolved: n => false,
onExpired: () => {},
onUpdate: () => {}
}

const pollConfig2: poller.IPollConfig<number> = {
id: 'pc2',
maxDuration: 400,
request: async () => {
return 0
},
isResolved: n => false,
onExpired: () => {},
onUpdate: () => {}
}

const poller1 = new poller.Poller({ interval: 100 })

const now = new Date().getTime()
const p1 = poller1.addPoll(pollConfig1)
await delay(200)
poller1.addPoll(pollConfig2)

await p1 // Will still resolve after 400 expiration
const after = new Date().getTime()

expect(after - now).toBeGreaterThan(400)
})
})
94 changes: 94 additions & 0 deletions src/services/poller.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
/**
* Copyright (c) Microsoft Corporation. All rights reserved.
* Licensed under the MIT License.
*/

export interface Deferred {
resolve: Function
reject: Function
pollConfig: IPollConfig<any>
}

export interface ActivePoll {
id: string
end: number
deferred: Deferred[]
}

export interface IPollConfig<T> {
id: string
maxDuration: number
request: () => Promise<T>
isResolved: (t: T) => boolean
onExpired: () => void
onUpdate: (t: T) => void
}

export interface IPollerOptions {
interval: number
}

const global = window
export class Poller {
private polls: ActivePoll[] = []
constructor(options: IPollerOptions) {
global.setInterval(async () => await this.poll(), options.interval)
}

addPoll<T>(pollConfig: IPollConfig<T>) {
const { id, maxDuration } = pollConfig
const start = new Date().getTime()
const end = start + maxDuration
const activeApp = this.polls.find(p => p.id === id)

if (activeApp) {
console.log(`Existing polling found for id: ${id} increasing end from ${activeApp.end} to: ${end}`)
activeApp.end = end
const promise = new Promise((resolve, reject) => {
activeApp.deferred.push({ resolve, reject, pollConfig })
})

return promise
}

console.log(`No polling found for id: ${id}. Starting new polling until: ${end}`)
const promise = new Promise((resolve, reject) => {
this.polls.push({
id,
end,
deferred: [{ resolve, reject, pollConfig }]
})
})

return promise
}

private async poll() {
const now = (new Date()).getTime()
// Alternate approach is to split this into three phases: Filter those expired, await all requests, then filter all resolved.
this.polls = (await Promise.all(this.polls.map(async poll => {
const { end } = poll
// If current time is after max allowed polling duration then resolve
if (now >= end) {
poll.deferred.forEach(deferred => {
deferred.pollConfig.onExpired()
deferred.resolve()
})
return undefined
}

// Get training status and if it's one of the resolved states resolve promise
const firstConfig = poll.deferred[0].pollConfig
const result = await firstConfig.request()
firstConfig.onUpdate(result)

// If trainings status is one of resolved states, remove app from polls to discontinue
if (firstConfig.isResolved(result)) {
poll.deferred.forEach(deferred => deferred.resolve())
return undefined
}

return poll
}))).filter(x => x)
}
}
3 changes: 3 additions & 0 deletions src/util.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,3 +56,6 @@ export function createEntityMapFromMemories(entities: models.EntityBase[], memor
export function getDefaultEntityMap(entities: models.EntityBase[]): Map<string, string> {
return entities.reduce((m, e) => m.set(e.entityId, `$${e.entityName}`), new Map<string, string>())
}

export const delay = <T>(ms: number, value: T = null): Promise<T> => new Promise<T>(resolve => setTimeout(() => resolve(value), ms))