
Commit 1407847

baileympearson authored and ZLY201 committed
fix(NODE-4735): fix change stream consecutive resumability (mongodb#3453)
1 parent 0076623 commit 1407847
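
The fix itself is small: each iterator method previously wrapped a single resume attempt in one try/catch, so a second resumable error during the retried getMore tore the stream down. The methods now retry inside a loop and keep resuming for as long as each resume attempt succeeds. A minimal sketch of the pattern, with hypothetical fetchNext, resume, and close callbacks standing in for the driver internals (cursor.next()/hasNext()/tryNext(), _processErrorIteratorMode(), and close()):

// Sketch of the consecutive-resume loop; the helpers are hypothetical
// stand-ins, not the driver's actual signatures.
async function nextWithResume<T>(
  fetchNext: () => Promise<T>,
  resume: (error: unknown) => Promise<void>,
  close: () => Promise<void>
): Promise<T> {
  // Loop until a result is produced or a resume attempt itself fails.
  // eslint-disable-next-line no-constant-condition
  while (true) {
    try {
      return await fetchNext();
    } catch (error) {
      try {
        await resume(error); // resume succeeded: loop and try fetchNext() again
      } catch (resumeError) {
        try {
          await close();
        } catch {
          // errors from close() are intentionally ignored
        }
        throw resumeError; // a failed resume is fatal
      }
    }
  }
}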

File tree

4 files changed (+187 −87 lines)

src/change_stream.ts

+39 −28
@@ -648,21 +648,25 @@ export class ChangeStream<
   hasNext(callback?: Callback): Promise<boolean> | void {
     this._setIsIterator();
     return maybeCallback(async () => {
-      try {
-        const hasNext = await this.cursor.hasNext();
-        return hasNext;
-      } catch (error) {
+      // Change streams must resume indefinitely while each resume event succeeds.
+      // This loop continues until either a change event is received or until a resume attempt
+      // fails.
+      // eslint-disable-next-line no-constant-condition
+      while (true) {
         try {
-          await this._processErrorIteratorMode(error);
           const hasNext = await this.cursor.hasNext();
           return hasNext;
         } catch (error) {
           try {
-            await this.close();
-          } catch {
-            // We are not concerned with errors from close()
+            await this._processErrorIteratorMode(error);
+          } catch (error) {
+            try {
+              await this.close();
+            } catch {
+              // We are not concerned with errors from close()
+            }
+            throw error;
           }
-          throw error;
         }
       }
     }, callback);
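
For context, a rough consumer-side view of the iterator API that hasNext()/next() back; the URI, database, and collection names here are illustrative:

import { MongoClient } from 'mongodb';

// Illustrative iterator-mode consumption. With this fix, transient getMore
// failures inside the loop are resumed transparently instead of surfacing.
async function tailInserts(uri: string): Promise<void> {
  const client = new MongoClient(uri);
  await client.connect();
  const collection = client.db('test').collection('events');
  const changeStream = collection.watch([{ $match: { operationType: 'insert' } }]);
  try {
    while (await changeStream.hasNext()) {
      const change = await changeStream.next();
      console.log('received change:', change.operationType);
    }
  } finally {
    await changeStream.close();
    await client.close();
  }
}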
@@ -675,23 +679,26 @@ export class ChangeStream<
   next(callback?: Callback<TChange>): Promise<TChange> | void {
     this._setIsIterator();
     return maybeCallback(async () => {
-      try {
-        const change = await this.cursor.next();
-        const processedChange = this._processChange(change ?? null);
-        return processedChange;
-      } catch (error) {
+      // Change streams must resume indefinitely while each resume event succeeds.
+      // This loop continues until either a change event is received or until a resume attempt
+      // fails.
+      // eslint-disable-next-line no-constant-condition
+      while (true) {
         try {
-          await this._processErrorIteratorMode(error);
           const change = await this.cursor.next();
           const processedChange = this._processChange(change ?? null);
           return processedChange;
         } catch (error) {
           try {
-            await this.close();
-          } catch {
-            // We are not concerned with errors from close()
+            await this._processErrorIteratorMode(error);
+          } catch (error) {
+            try {
+              await this.close();
+            } catch {
+              // We are not concerned with errors from close()
+            }
+            throw error;
           }
-          throw error;
         }
       }
     }, callback);
@@ -706,21 +713,25 @@
   tryNext(callback?: Callback<Document | null>): Promise<Document | null> | void {
     this._setIsIterator();
     return maybeCallback(async () => {
-      try {
-        const change = await this.cursor.tryNext();
-        return change ?? null;
-      } catch (error) {
+      // Change streams must resume indefinitely while each resume event succeeds.
+      // This loop continues until either a change event is received or until a resume attempt
+      // fails.
+      // eslint-disable-next-line no-constant-condition
+      while (true) {
         try {
-          await this._processErrorIteratorMode(error);
           const change = await this.cursor.tryNext();
           return change ?? null;
         } catch (error) {
           try {
-            await this.close();
-          } catch {
-            // We are not concerned with errors from close()
+            await this._processErrorIteratorMode(error);
+          } catch (error) {
+            try {
+              await this.close();
+            } catch {
+              // We are not concerned with errors from close()
+            }
+            throw error;
           }
-          throw error;
         }
       }
     }, callback);
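
tryNext() differs from next() in that it does not block waiting for a change: it resolves with whatever is already buffered, or null. A hedged polling sketch (the helper name is illustrative):

import { ChangeStream } from 'mongodb';

// Drain only the change events that are already buffered on the stream.
// With this fix, consecutive resumable errors during the poll are retried
// rather than closing the stream after the first resume.
async function drainBufferedChanges(changeStream: ChangeStream): Promise<number> {
  let count = 0;
  let change = await changeStream.tryNext();
  while (change != null) {
    count += 1;
    change = await changeStream.tryNext();
  }
  return count;
}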

test/integration/change-streams/change_stream.test.ts

+147 −50
@@ -10,6 +10,7 @@ import { promisify } from 'util';
 import {
   AbstractCursor,
   ChangeStream,
+  ChangeStreamDocument,
   ChangeStreamOptions,
   Collection,
   CommandStartedEvent,
@@ -1037,56 +1038,6 @@ describe('Change Streams', function () {
   });
 
   describe('Change Stream Resume Error Tests', function () {
-    describe('TODO(NODE-4670): fix consecutive resumes unified tests', function () {
-      let client: MongoClient;
-      let changeStream: ChangeStream;
-
-      beforeEach(async function () {
-        client = this.configuration.newClient();
-        await client.connect();
-      });
-
-      afterEach(async function () {
-        await changeStream.close();
-        await client.close();
-      });
-
-      it('should support consecutive resumes', {
-        metadata: { requires: { topology: 'replicaset', mongodb: '>=4.2' } },
-        async test() {
-          const failCommand: FailPoint = {
-            configureFailPoint: 'failCommand',
-            mode: {
-              times: 2
-            },
-            data: {
-              failCommands: ['getMore'],
-              closeConnection: true
-            }
-          };
-
-          await client.db('admin').command(failCommand);
-
-          const collection = client.db('test_consecutive_resume').collection('collection');
-
-          changeStream = collection.watch([], { batchSize: 1 });
-
-          await initIteratorMode(changeStream);
-
-          await collection.insertOne({ name: 'bumpy' });
-          await collection.insertOne({ name: 'bumpy' });
-          await collection.insertOne({ name: 'bumpy' });
-
-          await sleep(1000);
-
-          for (let i = 0; i < 3; ++i) {
-            const change = await changeStream.next();
-            expect(change).not.to.be.null;
-          }
-        }
-      });
-    });
-
     it.skip('should continue piping changes after a resumable error', {
       metadata: { requires: { topology: 'replicaset' } },
       test: done => {
@@ -1767,7 +1718,44 @@ describe('ChangeStream resumability', function () {
           expect(aggregateEvents).to.have.lengthOf(2);
         }
       );
+
+      it(
+        `supports consecutive resumes on error code ${code} ${error}`,
+        { requires: { topology: '!single', mongodb: '>=4.2' } },
+        async function () {
+          changeStream = collection.watch([]);
+          await initIteratorMode(changeStream);
+
+          await client.db('admin').command({
+            configureFailPoint: is4_2Server(this.configuration.version)
+              ? 'failCommand'
+              : 'failGetMoreAfterCursorCheckout',
+            mode: { times: 5 },
+            data: {
+              failCommands: ['getMore'],
+              errorCode: code,
+              errmsg: message
+            }
+          } as FailPoint);
+
+          // There's an inherent race condition here because we need to make sure that the `aggregates` that succeed when
+          // resuming a change stream don't return the change event. So we defer the insert until a period of time
+          // after the change stream has started listening for a change. 2000ms is long enough for the change
+          // stream to attempt to resume and fail multiple times before exhausting the failpoint and succeeding.
+          const [, value] = await Promise.allSettled([
+            sleep(2000).then(() => collection.insertOne({ name: 'bailey' })),
+            changeStream.next()
+          ]);
+
+          const change = (value as PromiseFulfilledResult<ChangeStreamDocument>).value;
+
+          expect(change).to.have.property('operationType', 'insert');
+
+          expect(aggregateEvents).to.have.lengthOf(6);
+        }
+      );
     }
+
     for (const { error, code, message } of resumableErrorCodes) {
       it(
         `resumes on error code ${code} (${error})`,
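
The new tests inject resumable getMore errors with a server failpoint (failCommand on 4.2 servers, failGetMoreAfterCursorCheckout otherwise) and use mode: { times: 5 } so the stream must resume repeatedly before succeeding. A minimal sketch of enabling and then disabling failCommand around a block of test code; the helper itself is hypothetical, and error code 6 (HostUnreachable) is used only as an example of a resumable code:

import { MongoClient } from 'mongodb';

// Hypothetical test helper: fail the next `times` getMore commands, run the
// body, then switch the failpoint off so later tests are unaffected.
async function withGetMoreFailures(
  client: MongoClient,
  times: number,
  run: () => Promise<void>
): Promise<void> {
  const admin = client.db('admin');
  await admin.command({
    configureFailPoint: 'failCommand',
    mode: { times },
    data: { failCommands: ['getMore'], errorCode: 6, errmsg: 'HostUnreachable' }
  });
  try {
    await run();
  } finally {
    await admin.command({ configureFailPoint: 'failCommand', mode: 'off' });
  }
}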
@@ -1896,6 +1884,42 @@ describe('ChangeStream resumability', function () {
           expect(aggregateEvents).to.have.lengthOf(2);
         }
       );
+
+      it(
+        `supports consecutive resumes on error code ${code} ${error}`,
+        { requires: { topology: '!single', mongodb: '>=4.2' } },
+        async function () {
+          changeStream = collection.watch([]);
+          await initIteratorMode(changeStream);
+
+          await client.db('admin').command({
+            configureFailPoint: is4_2Server(this.configuration.version)
+              ? 'failCommand'
+              : 'failGetMoreAfterCursorCheckout',
+            mode: { times: 5 },
+            data: {
+              failCommands: ['getMore'],
+              errorCode: code,
+              errmsg: message
+            }
+          } as FailPoint);
+
+          // There's an inherent race condition here because we need to make sure that the `aggregates` that succeed when
+          // resuming a change stream don't return the change event. So we defer the insert until a period of time
+          // after the change stream has started listening for a change. 2000ms is long enough for the change
+          // stream to attempt to resume and fail multiple times before exhausting the failpoint and succeeding.
+          const [, value] = await Promise.allSettled([
+            sleep(2000).then(() => collection.insertOne({ name: 'bailey' })),
+            changeStream.hasNext()
+          ]);
+
+          const change = (value as PromiseFulfilledResult<boolean>).value;
+
+          expect(change).to.be.true;
+
+          expect(aggregateEvents).to.have.lengthOf(6);
+        }
+      );
     }
 
     for (const { error, code, message } of resumableErrorCodes) {
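
The Promise.allSettled pattern above lets the test race a deliberately delayed insert against the blocked next()/hasNext() call without risking an unhandled rejection, then unwraps the second result. A small generic helper in the same spirit (hypothetical, not part of the test suite):

// Settle both promises, then unwrap the second, rethrowing if it rejected.
async function settleAndUnwrap<A, B>(first: Promise<A>, second: Promise<B>): Promise<B> {
  const [, result] = await Promise.allSettled([first, second]);
  if (result.status === 'rejected') {
    throw result.reason;
  }
  return result.value; // narrowed to the fulfilled case by the status check
}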
@@ -2033,6 +2057,42 @@ describe('ChangeStream resumability', function () {
           expect(aggregateEvents).to.have.lengthOf(2);
         }
       );
+
+      it(
+        `supports consecutive resumes on error code ${code} ${error}`,
+        { requires: { topology: '!single', mongodb: '>=4.2' } },
+        async function () {
+          changeStream = collection.watch([]);
+          await initIteratorMode(changeStream);
+
+          await client.db('admin').command({
+            configureFailPoint: is4_2Server(this.configuration.version)
+              ? 'failCommand'
+              : 'failGetMoreAfterCursorCheckout',
+            mode: { times: 5 },
+            data: {
+              failCommands: ['getMore'],
+              errorCode: code,
+              errmsg: message
+            }
+          } as FailPoint);
+
+          try {
+            // tryNext is not blocking and on sharded clusters we don't have control of when
+            // the actual change event will be ready on the change stream pipeline. This introduces
+            // a race condition, where sometimes we receive the change event and sometimes
+            // we don't when we call tryNext, depending on the timing of the sharded cluster.
+
+            // Since we really only care about the resumability, it's enough for this test to throw
+            // if tryNext ever throws and assert on the number of aggregate events.
+            await changeStream.tryNext();
+          } catch (err) {
+            expect.fail(`expected tryNext to resume, received error instead: ${err}`);
+          }
+
+          expect(aggregateEvents).to.have.lengthOf(6);
+        }
+      );
     }
 
     for (const { error, code, message } of resumableErrorCodes) {
@@ -2171,6 +2231,43 @@ describe('ChangeStream resumability', function () {
           expect(aggregateEvents).to.have.lengthOf(2);
         }
       );
+
+      it(
+        `supports consecutive resumes on error code ${code} (${error})`,
+        { requires: { topology: '!single', mongodb: '>=4.2' } },
+        async function () {
+          changeStream = collection.watch([]);
+
+          await client.db('admin').command({
+            configureFailPoint: is4_2Server(this.configuration.version)
+              ? 'failCommand'
+              : 'failGetMoreAfterCursorCheckout',
+            mode: { times: 5 },
+            data: {
+              failCommands: ['getMore'],
+              errorCode: code,
+              errmsg: message
+            }
+          } as FailPoint);
+
+          const changes = once(changeStream, 'change');
+          await once(changeStream.cursor, 'init');
+
+          // There's an inherent race condition here because we need to make sure that the `aggregates` that succeed when
+          // resuming a change stream don't return the change event. So we defer the insert until a period of time
+          // after the change stream has started listening for a change. 2000ms is long enough for the change
+          // stream to attempt to resume and fail multiple times before exhausting the failpoint and succeeding.
+          const [, value] = await Promise.allSettled([
+            sleep(2000).then(() => collection.insertOne({ name: 'bailey' })),
+            changes
+          ]);
+
+          const [change] = (value as PromiseFulfilledResult<ChangeStreamDocument[]>).value;
+          expect(change).to.have.property('operationType', 'insert');
+
+          expect(aggregateEvents).to.have.lengthOf(6);
+        }
+      );
     }
 
     it(
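
The last block exercises event-emitter mode, where changes arrive via 'change' events rather than next(). A rough consumer-side sketch; the function and collection names are illustrative:

import { once } from 'events';
import { Collection } from 'mongodb';

// Illustrative emitter-mode consumption: waiting on a 'change' event
// (here via events.once) puts the stream into emitter mode.
async function waitForNextInsert(collection: Collection) {
  const changeStream = collection.watch([{ $match: { operationType: 'insert' } }]);
  try {
    const [change] = await once(changeStream, 'change');
    return change;
  } finally {
    await changeStream.close();
  }
}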

test/integration/change-streams/change_streams.spec.test.ts

+1 −5
@@ -4,9 +4,5 @@ import { loadSpecTests } from '../../spec';
 import { runUnifiedSuite } from '../../tools/unified-spec-runner/runner';
 
 describe('Change Streams Spec - Unified', function () {
-  runUnifiedSuite(loadSpecTests(path.join('change-streams', 'unified')), test =>
-    test.description === 'Test consecutive resume'
-      ? 'TODO(NODE-4670): fix consecutive resume change stream test'
-      : false
-  );
+  runUnifiedSuite(loadSpecTests(path.join('change-streams', 'unified')));
 });

test/integration/unified-test-format/unified_test_format.spec.test.ts

−4
@@ -23,10 +23,6 @@ const filter: TestFilter = ({ description }) => {
     return 'TODO(NODE-3308): failures due unnecessary getMore and killCursors calls in 5.0';
   }
 
-  if (description === 'Test consecutive resume') {
-    return 'TODO(NODE-4670): fix consecutive resume change stream test';
-  }
-
   if (
     process.env.AUTH === 'auth' &&
     [
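
For reference, the filter used by the unified spec runner maps a test description to either a skip-reason string or false; deleting the branch above is what re-enables 'Test consecutive resume'. A sketch of the shape (the description and ticket below are placeholders, not real entries):

// Illustrative skip filter in the same shape as the one above.
const exampleFilter = ({ description }: { description: string }): string | false =>
  description === 'some still-failing test'
    ? 'TODO(NODE-0000): reason the test is skipped'
    : false;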

0 commit comments
