Skip to content

Commit f642e0f

Browse files
committed
feat: implement sync-within-subscribe marker
1 parent bdf4cfd commit f642e0f

File tree

4 files changed

+76
-43
lines changed

4 files changed

+76
-43
lines changed

spec/schedulers/TestScheduler-spec.ts

Lines changed: 43 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ import { intervalProvider } from 'rxjs/internal/scheduler/intervalProvider';
1111
declare const rxTestScheduler: TestScheduler;
1212

1313
/** @test {TestScheduler} */
14-
describe('TestScheduler', () => {
14+
describe.only('TestScheduler', () => {
1515
it('should exist', () => {
1616
expect(TestScheduler).exist;
1717
expect(TestScheduler).to.be.a('function');
@@ -25,76 +25,88 @@ describe('TestScheduler', () => {
2525
it('should parse a marble string into a series of notifications and types', () => {
2626
const result = TestScheduler.parseMarbles('-------a---b---|', { a: 'A', b: 'B' });
2727
expect(result).deep.equal([
28-
{ frame: 70, notification: nextNotification('A') },
29-
{ frame: 110, notification: nextNotification('B') },
30-
{ frame: 150, notification: COMPLETE_NOTIFICATION }
28+
{ frame: 70, notification: nextNotification('A'), subscribing: false },
29+
{ frame: 110, notification: nextNotification('B'), subscribing: false },
30+
{ frame: 150, notification: COMPLETE_NOTIFICATION, subscribing: false }
3131
]);
3232
});
3333

3434
it('should parse a marble string, allowing spaces too', () => {
3535
const result = TestScheduler.parseMarbles('--a--b--| ', { a: 'A', b: 'B' });
3636
expect(result).deep.equal([
37-
{ frame: 20, notification: nextNotification('A') },
38-
{ frame: 50, notification: nextNotification('B') },
39-
{ frame: 80, notification: COMPLETE_NOTIFICATION }
37+
{ frame: 20, notification: nextNotification('A'), subscribing: false },
38+
{ frame: 50, notification: nextNotification('B'), subscribing: false },
39+
{ frame: 80, notification: COMPLETE_NOTIFICATION, subscribing: false }
4040
]);
4141
});
4242

4343
it('should parse a marble string with a subscription point', () => {
4444
const result = TestScheduler.parseMarbles('---^---a---b---|', { a: 'A', b: 'B' });
4545
expect(result).deep.equal([
46-
{ frame: 40, notification: nextNotification('A') },
47-
{ frame: 80, notification: nextNotification('B') },
48-
{ frame: 120, notification: COMPLETE_NOTIFICATION }
46+
{ frame: 40, notification: nextNotification('A'), subscribing: false },
47+
{ frame: 80, notification: nextNotification('B'), subscribing: false },
48+
{ frame: 120, notification: COMPLETE_NOTIFICATION, subscribing: false }
4949
]);
5050
});
5151

5252
it('should parse a marble string with an error', () => {
5353
const result = TestScheduler.parseMarbles('-------a---b---#', { a: 'A', b: 'B' }, 'omg error!');
5454
expect(result).deep.equal([
55-
{ frame: 70, notification: nextNotification('A') },
56-
{ frame: 110, notification: nextNotification('B') },
57-
{ frame: 150, notification: errorNotification('omg error!') }
55+
{ frame: 70, notification: nextNotification('A'), subscribing: false },
56+
{ frame: 110, notification: nextNotification('B'), subscribing: false },
57+
{ frame: 150, notification: errorNotification('omg error!'), subscribing: false }
5858
]);
5959
});
6060

6161
it('should default in the letter for the value if no value hash was passed', () => {
6262
const result = TestScheduler.parseMarbles('--a--b--c--');
6363
expect(result).deep.equal([
64-
{ frame: 20, notification: nextNotification('a') },
65-
{ frame: 50, notification: nextNotification('b') },
66-
{ frame: 80, notification: nextNotification('c') },
64+
{ frame: 20, notification: nextNotification('a'), subscribing: false },
65+
{ frame: 50, notification: nextNotification('b'), subscribing: false },
66+
{ frame: 80, notification: nextNotification('c'), subscribing: false },
6767
]);
6868
});
6969

7070
it('should handle grouped values', () => {
7171
const result = TestScheduler.parseMarbles('---(abc)---');
7272
expect(result).deep.equal([
73-
{ frame: 30, notification: nextNotification('a') },
74-
{ frame: 30, notification: nextNotification('b') },
75-
{ frame: 30, notification: nextNotification('c') }
73+
{ frame: 30, notification: nextNotification('a'), subscribing: false },
74+
{ frame: 30, notification: nextNotification('b'), subscribing: false },
75+
{ frame: 30, notification: nextNotification('c'), subscribing: false }
7676
]);
7777
});
7878

7979
it('should ignore whitespace when runMode=true', () => {
8080
const runMode = true;
8181
const result = TestScheduler.parseMarbles(' -a - b - c | ', { a: 'A', b: 'B', c: 'C' }, undefined, undefined, runMode);
8282
expect(result).deep.equal([
83-
{ frame: 10, notification: nextNotification('A') },
84-
{ frame: 30, notification: nextNotification('B') },
85-
{ frame: 50, notification: nextNotification('C') },
86-
{ frame: 60, notification: COMPLETE_NOTIFICATION }
83+
{ frame: 10, notification: nextNotification('A'), subscribing: false },
84+
{ frame: 30, notification: nextNotification('B'), subscribing: false },
85+
{ frame: 50, notification: nextNotification('C'), subscribing: false },
86+
{ frame: 60, notification: COMPLETE_NOTIFICATION, subscribing: false }
8787
]);
8888
});
8989

90-
it('should suppport time progression syntax when runMode=true', () => {
90+
it('should support time progression syntax when runMode=true', () => {
9191
const runMode = true;
9292
const result = TestScheduler.parseMarbles('10.2ms a 1.2s b 1m c|', { a: 'A', b: 'B', c: 'C' }, undefined, undefined, runMode);
9393
expect(result).deep.equal([
94-
{ frame: 10.2, notification: nextNotification('A') },
95-
{ frame: 10.2 + 10 + (1.2 * 1000), notification: nextNotification('B') },
96-
{ frame: 10.2 + 10 + (1.2 * 1000) + 10 + (1000 * 60), notification: nextNotification('C') },
97-
{ frame: 10.2 + 10 + (1.2 * 1000) + 10 + (1000 * 60) + 10, notification: COMPLETE_NOTIFICATION }
94+
{ frame: 10.2, notification: nextNotification('A'), subscribing: false },
95+
{ frame: 10.2 + 10 + (1.2 * 1000), notification: nextNotification('B'), subscribing: false },
96+
{ frame: 10.2 + 10 + (1.2 * 1000) + 10 + (1000 * 60), notification: nextNotification('C'), subscribing: false },
97+
{ frame: 10.2 + 10 + (1.2 * 1000) + 10 + (1000 * 60) + 10, notification: COMPLETE_NOTIFICATION, subscribing: false }
98+
]);
99+
});
100+
101+
it('should support a synchronous-within-subscribe marker', () => {
102+
const runMode = true;
103+
const result = TestScheduler.parseMarbles('--(^abc)--(d|)', undefined, undefined, undefined, runMode);
104+
expect(result).deep.equal([
105+
{ frame: 20, notification: nextNotification('a'), subscribing: true },
106+
{ frame: 20, notification: nextNotification('b'), subscribing: true },
107+
{ frame: 20, notification: nextNotification('c'), subscribing: true },
108+
{ frame: 100, notification: nextNotification('d'), subscribing: false },
109+
{ frame: 100, notification: COMPLETE_NOTIFICATION, subscribing: false }
98110
]);
99111
});
100112
});
@@ -161,10 +173,10 @@ describe('TestScheduler', () => {
161173
expect(expected.length).to.equal(0);
162174
});
163175

164-
it('should emit notifications at frame zero synchronously upon subscription', () => {
176+
it('should emit notifications marked with "^" synchronously upon subscription', () => {
165177
const result: string[] = [];
166178
const scheduler = new TestScheduler(null!);
167-
const source = scheduler.createColdObservable('(ab|)');
179+
const source = scheduler.createColdObservable('(^ab|)');
168180
source.subscribe({
169181
next: (value) => result.push(value),
170182
complete: () => result.push('complete'),
@@ -252,7 +264,7 @@ describe('TestScheduler', () => {
252264
});
253265

254266
it('should handle empty', () => {
255-
expectObservable(EMPTY).toBe('|', {});
267+
expectObservable(EMPTY).toBe('(^|)', {});
256268
});
257269

258270
it('should handle never', () => {

src/internal/testing/ColdObservable.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ export class ColdObservable<T> extends Observable<T> implements SubscriptionLogg
4141
const messagesLength = this.messages.length;
4242
for (let i = 0; i < messagesLength; i++) {
4343
const message = this.messages[i];
44-
if (message.frame === 0) {
44+
if (message.subscribing) {
4545
observeNotification(message.notification, subscriber);
4646
} else {
4747
subscriber.add(

src/internal/testing/TestMessage.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,5 +3,5 @@ import { ObservableNotification } from '../types';
33
export interface TestMessage {
44
frame: number;
55
notification: ObservableNotification<any>;
6-
isGhost?: boolean;
6+
subscribing: boolean;
77
}

src/internal/testing/TestScheduler.ts

Lines changed: 31 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,7 @@ export class TestScheduler extends VirtualTimeScheduler {
8686
* @param error The error to use for the `#` marble (if present).
8787
*/
8888
createColdObservable<T = string>(marbles: string, values?: { [marble: string]: T }, error?: any): ColdObservable<T> {
89-
if (marbles.indexOf('^') !== -1) {
89+
if (marbles.match(/[^(]\^/)) {
9090
throw new Error('cold observable cannot have subscription offset "^"');
9191
}
9292
if (marbles.indexOf('!') !== -1) {
@@ -116,13 +116,15 @@ export class TestScheduler extends VirtualTimeScheduler {
116116
private materializeInnerObservable(observable: Observable<any>,
117117
outerFrame: number): TestMessage[] {
118118
const messages: TestMessage[] = [];
119+
let subscribing = true;
119120
observable.subscribe((value) => {
120-
messages.push({ frame: this.frame - outerFrame, notification: nextNotification(value) });
121+
messages.push({ frame: this.frame - outerFrame, notification: nextNotification(value), subscribing });
121122
}, (error) => {
122-
messages.push({ frame: this.frame - outerFrame, notification: errorNotification(error) });
123+
messages.push({ frame: this.frame - outerFrame, notification: errorNotification(error), subscribing });
123124
}, () => {
124-
messages.push({ frame: this.frame - outerFrame, notification: COMPLETE_NOTIFICATION });
125+
messages.push({ frame: this.frame - outerFrame, notification: COMPLETE_NOTIFICATION, subscribing });
125126
});
127+
subscribing = false;
126128
return messages;
127129
}
128130

@@ -137,18 +139,20 @@ export class TestScheduler extends VirtualTimeScheduler {
137139
let subscription: Subscription;
138140

139141
this.schedule(() => {
142+
let subscribing = true;
140143
subscription = observable.subscribe(x => {
141144
let value = x;
142145
// Support Observable-of-Observables
143146
if (x instanceof Observable) {
144147
value = this.materializeInnerObservable(value, this.frame);
145148
}
146-
actual.push({ frame: this.frame, notification: nextNotification(value) });
149+
actual.push({ frame: this.frame, notification: nextNotification(value), subscribing });
147150
}, (error) => {
148-
actual.push({ frame: this.frame, notification: errorNotification(error) });
151+
actual.push({ frame: this.frame, notification: errorNotification(error), subscribing });
149152
}, () => {
150-
actual.push({ frame: this.frame, notification: COMPLETE_NOTIFICATION });
153+
actual.push({ frame: this.frame, notification: COMPLETE_NOTIFICATION, subscribing });
151154
});
155+
subscribing = false;
152156
}, subscriptionFrame);
153157

154158
if (unsubscriptionFrame !== Infinity) {
@@ -308,8 +312,13 @@ export class TestScheduler extends VirtualTimeScheduler {
308312
}
309313
const len = marbles.length;
310314
const testMessages: TestMessage[] = [];
311-
const subIndex = runMode ? marbles.replace(/^[ ]+/, '').indexOf('^') : marbles.indexOf('^');
312-
let frame = subIndex === -1 ? 0 : (subIndex * -this.frameTimeFactor);
315+
const subMarbles = runMode ? marbles.replace(/^[ ]+/, '') : marbles;
316+
const subIndex = subMarbles.indexOf('^');
317+
// If the ^ character is the first character within a () group, it's a
318+
// synchronous-within-subscribe indicator for a cold observable.
319+
let frame = (subIndex === -1) || (subMarbles.charAt(subIndex - 1) === '(')
320+
? 0
321+
: (subIndex * -this.frameTimeFactor);
313322
const getValue = typeof values !== 'object' ?
314323
(x: any) => x :
315324
(x: any) => {
@@ -320,6 +329,7 @@ export class TestScheduler extends VirtualTimeScheduler {
320329
return values[x];
321330
};
322331
let groupStart = -1;
332+
let subscribing = false;
323333

324334
for (let i = 0; i < len; i++) {
325335
let nextFrame = frame;
@@ -345,13 +355,20 @@ export class TestScheduler extends VirtualTimeScheduler {
345355
break;
346356
case ')':
347357
groupStart = -1;
358+
subscribing = false;
348359
advanceFrameBy(1);
349360
break;
350361
case '|':
351362
notification = COMPLETE_NOTIFICATION;
352363
advanceFrameBy(1);
353364
break;
354365
case '^':
366+
if (groupStart > -1) {
367+
if (testMessages.length) {
368+
throw new Error('the synchronous-upon-subscription marker "^" must precede all notifications');
369+
}
370+
subscribing = true;
371+
}
355372
advanceFrameBy(1);
356373
break;
357374
case '#':
@@ -398,7 +415,11 @@ export class TestScheduler extends VirtualTimeScheduler {
398415
}
399416

400417
if (notification) {
401-
testMessages.push({ frame: groupStart > -1 ? groupStart : frame, notification });
418+
testMessages.push({
419+
frame: groupStart > -1 ? groupStart : frame,
420+
notification,
421+
subscribing
422+
});
402423
}
403424

404425
frame = nextFrame;

0 commit comments

Comments
 (0)