@@ -30,6 +30,15 @@ import type {
3030 ListenerErrorInfo ,
3131} from './types'
3232
33+ import {
34+ Job ,
35+ SupervisorJob ,
36+ JobHandle ,
37+ JobCancellationReason ,
38+ JobCancellationException ,
39+ } from './job'
40+ import { Outcome } from './outcome'
41+
3342export type {
3443 ActionListener ,
3544 ActionListenerMiddleware ,
@@ -57,43 +66,76 @@ const defaultWhen: MiddlewarePhase = 'afterReducer'
5766const actualMiddlewarePhases = [ 'beforeReducer' , 'afterReducer' ] as const
5867
5968function createTakePattern < S > (
60- addListener : AddListenerOverloads < Unsubscribe , S , Dispatch < AnyAction > >
69+ addListener : AddListenerOverloads < Unsubscribe , S , Dispatch < AnyAction > > ,
70+ parentJob : Job < any >
6171) : TakePattern < S > {
72+ /**
73+ * A function that takes an ActionListenerPredicate and an optional timeout,
74+ * and resolves when either the predicate returns `true` based on an action
75+ * state combination or when the timeout expires.
76+ * If the parent listener is canceled while waiting, this will throw a
77+ * JobCancellationException.
78+ */
6279 async function take < P extends AnyActionListenerPredicate < S > > (
6380 predicate : P ,
6481 timeout : number | undefined
6582 ) {
83+ // Placeholder unsubscribe function until the listener is added
6684 let unsubscribe : Unsubscribe = ( ) => { }
6785
68- const tuplePromise = new Promise < [ AnyAction , S , S ] > ( ( resolve ) => {
69- unsubscribe = addListener ( {
70- predicate : predicate as any ,
71- listener : ( action , listenerApi ) : void => {
72- // One-shot listener that cleans up as soon as the predicate resolves
73- listenerApi . unsubscribe ( )
74- resolve ( [
75- action ,
76- listenerApi . getState ( ) ,
77- listenerApi . getOriginalState ( ) ,
78- ] )
79- } ,
80- } )
81- } )
82-
83- if ( timeout === undefined ) {
84- return tuplePromise
85- }
86+ // We'll add an additional nested Job representing this function.
87+ // TODO This is really a duplicate of the other job inside the middleware.
88+ // This behavior requires some additional nesting:
89+ // We're going to create a `Promise` representing the result of the listener,
90+ // but then wrap that in an `Outcome` for consistent error handling.
91+ let job : Job < [ AnyAction , S , S ] > = parentJob . launch ( async ( job ) =>
92+ Outcome . wrap (
93+ new Promise < [ AnyAction , S , S ] > ( ( resolve ) => {
94+ // Inside the Promise, we synchronously add the listener.
95+ unsubscribe = addListener ( {
96+ predicate : predicate as any ,
97+ listener : ( action , listenerApi ) : void => {
98+ // One-shot listener that cleans up as soon as the predicate passes
99+ listenerApi . unsubscribe ( )
100+ // Resolve the promise with the same arguments the predicate saw
101+ resolve ( [
102+ action ,
103+ listenerApi . getState ( ) ,
104+ listenerApi . getOriginalState ( ) ,
105+ ] )
106+ } ,
107+ parentJob,
108+ } )
109+ } )
110+ )
111+ )
86112
87- const timedOutPromise = new Promise < null > ( ( resolve , reject ) => {
88- setTimeout ( ( ) => {
89- resolve ( null )
90- } , timeout )
91- } )
113+ let result : Outcome < [ AnyAction , S , S ] >
92114
93- const result = await Promise . race ( [ tuplePromise , timedOutPromise ] )
115+ try {
116+ // Run the job and use the timeout if given
117+ result = await ( timeout !== undefined
118+ ? job . runWithTimeout ( timeout )
119+ : job . run ( ) )
94120
95- unsubscribe ( )
96- return result
121+ if ( result . isOk ( ) ) {
122+ // Resolve the actual `take` promise with the action+states
123+ return result . value
124+ } else {
125+ if (
126+ result . error instanceof JobCancellationException &&
127+ result . error . reason === JobCancellationReason . JobCancelled
128+ ) {
129+ // The `take` job itself was canceled due to timeout.
130+ return null
131+ }
132+ // The parent was canceled - reject this promise with that error
133+ throw result . error
134+ }
135+ } finally {
136+ // Always clean up the listener
137+ unsubscribe ( )
138+ }
97139 }
98140
99141 return take as TakePattern < S >
@@ -114,8 +156,12 @@ export const createListenerEntry: TypedCreateListenerEntry<unknown> = (
114156 predicate = options . actionCreator . match
115157 } else if ( 'matcher' in options ) {
116158 predicate = options . matcher
117- } else {
159+ } else if ( 'predicate' in options ) {
118160 predicate = options . predicate
161+ } else {
162+ throw new Error (
163+ 'Creating a listener requires one of the known fields for matching against actions'
164+ )
119165 }
120166
121167 const id = nanoid ( )
@@ -128,6 +174,7 @@ export const createListenerEntry: TypedCreateListenerEntry<unknown> = (
128174 unsubscribe : ( ) => {
129175 throw new Error ( 'Unsubscribe not initialized' )
130176 } ,
177+ parentJob : new SupervisorJob ( ) ,
131178 }
132179
133180 return entry
@@ -287,11 +334,6 @@ export function createActionListenerMiddleware<
287334 return true
288335 }
289336
290- const take = createTakePattern ( addListener )
291- const condition : ConditionFunction < S > = ( predicate , timeout ) => {
292- return take ( predicate , timeout ) . then ( Boolean )
293- }
294-
295337 const middleware : Middleware <
296338 {
297339 ( action : Action < 'actionListenerMiddleware/add' > ) : Unsubscribe
@@ -338,7 +380,6 @@ export function createActionListenerMiddleware<
338380 runListener = false
339381
340382 safelyNotifyError ( onError , predicateError , {
341- async : false ,
342383 raisedBy : 'predicate' ,
343384 phase : currentPhase ,
344385 } )
@@ -349,38 +390,47 @@ export function createActionListenerMiddleware<
349390 continue
350391 }
351392
352- try {
353- let promiseLikeOrUndefined = entry . listener ( action , {
354- ...api ,
355- getOriginalState,
356- condition,
357- take,
358- currentPhase,
359- extra,
360- unsubscribe : entry . unsubscribe ,
361- subscribe : ( ) => {
362- listenerMap . set ( entry . id , entry )
363- } ,
364- } )
393+ entry . parentJob . launchAndRun ( async ( jobHandle ) => {
394+ const take = createTakePattern ( addListener , jobHandle as Job < any > )
395+ const condition : ConditionFunction < S > = ( predicate , timeout ) => {
396+ return take ( predicate , timeout ) . then ( Boolean )
397+ }
365398
366- if ( promiseLikeOrUndefined ) {
367- Promise . resolve ( promiseLikeOrUndefined ) . catch (
368- ( asyncListenerError ) => {
369- safelyNotifyError ( onError , asyncListenerError , {
370- async : true ,
371- raisedBy : 'listener' ,
372- phase : currentPhase ,
373- } )
374- }
375- )
399+ const result = await Outcome . try ( async ( ) =>
400+ entry . listener ( action , {
401+ ...api ,
402+ getOriginalState,
403+ condition,
404+ take,
405+ currentPhase,
406+ extra,
407+ unsubscribe : entry . unsubscribe ,
408+ subscribe : ( ) => {
409+ listenerMap . set ( entry . id , entry )
410+ } ,
411+ job : jobHandle ,
412+ cancelPrevious : ( ) => {
413+ entry . parentJob . cancelChildren (
414+ new JobCancellationException (
415+ JobCancellationReason . JobCancelled
416+ ) ,
417+ [ jobHandle ]
418+ )
419+ } ,
420+ } )
421+ )
422+ if (
423+ result . isError ( ) &&
424+ ! ( result . error instanceof JobCancellationException )
425+ ) {
426+ safelyNotifyError ( onError , result . error , {
427+ raisedBy : 'listener' ,
428+ phase : currentPhase ,
429+ } )
376430 }
377- } catch ( syncListenerError ) {
378- safelyNotifyError ( onError , syncListenerError , {
379- async : false ,
380- raisedBy : 'listener' ,
381- phase : currentPhase ,
382- } )
383- }
431+
432+ return Outcome . ok ( 1 )
433+ } )
384434 }
385435 if ( currentPhase === 'beforeReducer' ) {
386436 result = next ( action )
0 commit comments