@@ -370,31 +370,76 @@ describe('Change Streams', function () {
370370 }
371371 ) ;
372372
373- it ( 'should cache the change stream resume token using iterator form' , {
374- metadata : { requires : { topology : 'replicaset' } } ,
373+ describe ( 'cache the change stream resume token' , ( ) => {
374+ describe ( 'using iterator form' , ( ) => {
375+ context ( '#next' , ( ) => {
376+ it ( 'caches the resume token on change' , {
377+ metadata : { requires : { topology : 'replicaset' } } ,
375378
376- async test ( ) {
377- await initIteratorMode ( changeStream ) ;
378- collection . insertOne ( { a : 1 } ) ;
379+ async test ( ) {
380+ await initIteratorMode ( changeStream ) ;
381+ await collection . insertOne ( { a : 1 } ) ;
379382
380- const hasNext = await changeStream . hasNext ( ) ;
381- expect ( hasNext ) . to . be . true ;
383+ const change = await changeStream . next ( ) ;
384+ expect ( change ) . to . have . property ( '_id' ) . that . deep . equals ( changeStream . resumeToken ) ;
385+ }
386+ } ) ;
382387
383- const change = await changeStream . next ( ) ;
384- expect ( change ) . to . have . property ( '_id' ) . that . deep . equals ( changeStream . resumeToken ) ;
385- }
386- } ) ;
388+ it ( 'caches the resume token correctly when preceded by #hasNext' , {
389+ metadata : { requires : { topology : 'replicaset' } } ,
390+ async test ( ) {
391+ await initIteratorMode ( changeStream ) ;
392+ await collection . insertOne ( { a : 1 } ) ;
387393
388- it ( 'should cache the change stream resume token using event listener form' , {
389- metadata : { requires : { topology : 'replicaset' } } ,
390- async test ( ) {
391- const willBeChange = once ( changeStream , 'change' ) ;
392- await once ( changeStream . cursor , 'init' ) ;
393- collection . insertOne ( { a : 1 } ) ;
394+ await changeStream . hasNext ( ) ;
394395
395- const [ change ] = await willBeChange ;
396- expect ( change ) . to . have . property ( '_id' ) . that . deep . equals ( changeStream . resumeToken ) ;
397- }
396+ const change = await changeStream . next ( ) ;
397+ expect ( change ) . to . have . property ( '_id' ) . that . deep . equals ( changeStream . resumeToken ) ;
398+ }
399+ } ) ;
400+ } ) ;
401+
402+ it ( '#tryNext' , {
403+ metadata : { requires : { topology : 'replicaset' } } ,
404+
405+ async test ( ) {
406+ await initIteratorMode ( changeStream ) ;
407+ await collection . insertOne ( { a : 1 } ) ;
408+
409+ const change = await changeStream . tryNext ( ) ;
410+ expect ( change ) . to . have . property ( '_id' ) . that . deep . equals ( changeStream . resumeToken ) ;
411+ }
412+ } ) ;
413+
414+ context ( '#hasNext' , ( ) => {
415+ it ( 'does not cache the resume token' , {
416+ metadata : { requires : { topology : 'replicaset' } } ,
417+ async test ( ) {
418+ await initIteratorMode ( changeStream ) ;
419+ const resumeToken = changeStream . resumeToken ;
420+
421+ await collection . insertOne ( { a : 1 } ) ;
422+
423+ const hasNext = await changeStream . hasNext ( ) ;
424+ expect ( hasNext ) . to . be . true ;
425+
426+ expect ( changeStream . resumeToken ) . to . equal ( resumeToken ) ;
427+ }
428+ } ) ;
429+ } ) ;
430+ } ) ;
431+
432+ it ( 'should cache using event listener form' , {
433+ metadata : { requires : { topology : 'replicaset' } } ,
434+ async test ( ) {
435+ const willBeChange = once ( changeStream , 'change' ) ;
436+ await once ( changeStream . cursor , 'init' ) ;
437+ await collection . insertOne ( { a : 1 } ) ;
438+
439+ const [ change ] = await willBeChange ;
440+ expect ( change ) . to . have . property ( '_id' ) . that . deep . equals ( changeStream . resumeToken ) ;
441+ }
442+ } ) ;
398443 } ) ;
399444
400445 it ( 'should error if resume token projected out of change stream document using iterator' , {
@@ -1816,6 +1861,144 @@ describe('Change Streams', function () {
18161861 } ) ;
18171862 } ) ;
18181863 } ) ;
1864+
1865+ describe ( "NODE-4763 - doesn't produce duplicates after resume" , function ( ) {
1866+ let client : MongoClient ;
1867+ let collection : Collection ;
1868+ let changeStream : ChangeStream ;
1869+ let aggregateEvents : CommandStartedEvent [ ] = [ ] ;
1870+ const resumableError = { code : 6 , message : 'host unreachable' } ;
1871+
1872+ beforeEach ( async function ( ) {
1873+ const dbName = 'node-4763' ;
1874+ const collectionName = 'test-collection' ;
1875+
1876+ client = this . configuration . newClient ( { monitorCommands : true } ) ;
1877+ client . on ( 'commandStarted' , filterForCommands ( [ 'aggregate' ] , aggregateEvents ) ) ;
1878+ collection = client . db ( dbName ) . collection ( collectionName ) ;
1879+
1880+ changeStream = collection . watch ( [ ] ) ;
1881+ } ) ;
1882+
1883+ afterEach ( async function ( ) {
1884+ await client . db ( 'admin' ) . command ( {
1885+ configureFailPoint : is4_2Server ( this . configuration . version )
1886+ ? 'failCommand'
1887+ : 'failGetMoreAfterCursorCheckout' ,
1888+ mode : 'off'
1889+ } as FailCommandFailPoint ) ;
1890+
1891+ await changeStream . close ( ) ;
1892+ await client . close ( ) ;
1893+ aggregateEvents = [ ] ;
1894+ } ) ;
1895+
1896+ describe ( 'when using iterator form' , function ( ) {
1897+ it ( '#next' , { requires : { topology : 'replicaset' } } , async function test ( ) {
1898+ await initIteratorMode ( changeStream ) ;
1899+
1900+ await collection . insertOne ( { a : 1 } ) ;
1901+ const change = await changeStream . next ( ) ;
1902+ expect ( change ) . to . containSubset ( {
1903+ operationType : 'insert' ,
1904+ fullDocument : { a : 1 }
1905+ } ) ;
1906+
1907+ await client . db ( 'admin' ) . command ( {
1908+ configureFailPoint : is4_2Server ( this . configuration . version )
1909+ ? 'failCommand'
1910+ : 'failGetMoreAfterCursorCheckout' ,
1911+ mode : { times : 1 } ,
1912+ data : {
1913+ failCommands : [ 'getMore' ] ,
1914+ errorCode : resumableError . code ,
1915+ errmsg : resumableError . message
1916+ }
1917+ } as FailCommandFailPoint ) ;
1918+
1919+ await collection . insertOne ( { a : 2 } ) ;
1920+ const change2 = await changeStream . next ( ) ;
1921+ expect ( change2 ) . to . containSubset ( {
1922+ operationType : 'insert' ,
1923+ fullDocument : { a : 2 }
1924+ } ) ;
1925+
1926+ expect ( aggregateEvents . length ) . to . equal ( 2 ) ;
1927+ } ) ;
1928+
1929+ it ( '#tryNext' , { requires : { topology : 'replicaset' } } , async function test ( ) {
1930+ await initIteratorMode ( changeStream ) ;
1931+
1932+ await collection . insertOne ( { a : 1 } ) ;
1933+ const change = await changeStream . tryNext ( ) ;
1934+ expect ( change ) . to . containSubset ( {
1935+ operationType : 'insert' ,
1936+ fullDocument : { a : 1 }
1937+ } ) ;
1938+
1939+ await client . db ( 'admin' ) . command ( {
1940+ configureFailPoint : is4_2Server ( this . configuration . version )
1941+ ? 'failCommand'
1942+ : 'failGetMoreAfterCursorCheckout' ,
1943+ mode : { times : 1 } ,
1944+ data : {
1945+ failCommands : [ 'getMore' ] ,
1946+ errorCode : resumableError . code ,
1947+ errmsg : resumableError . message
1948+ }
1949+ } as FailCommandFailPoint ) ;
1950+
1951+ await collection . insertOne ( { a : 2 } ) ;
1952+ const change2 = await changeStream . tryNext ( ) ;
1953+ expect ( change2 ) . to . containSubset ( {
1954+ operationType : 'insert' ,
1955+ fullDocument : { a : 2 }
1956+ } ) ;
1957+
1958+ expect ( aggregateEvents . length ) . to . equal ( 2 ) ;
1959+ } ) ;
1960+ } ) ;
1961+
1962+ it ( 'in an event listener form' , { requires : { topology : 'replicaset' } } , async function ( ) {
1963+ const willBeChange = on ( changeStream , 'change' ) ;
1964+ await once ( changeStream . cursor , 'init' ) ;
1965+
1966+ await collection . insertOne ( { a : 1 } ) ;
1967+ const change = await willBeChange . next ( ) ;
1968+ expect ( change . value [ 0 ] ) . to . containSubset ( {
1969+ operationType : 'insert' ,
1970+ fullDocument : { a : 1 }
1971+ } ) ;
1972+
1973+ await client . db ( 'admin' ) . command ( {
1974+ configureFailPoint : is4_2Server ( this . configuration . version )
1975+ ? 'failCommand'
1976+ : 'failGetMoreAfterCursorCheckout' ,
1977+ mode : { times : 1 } ,
1978+ data : {
1979+ failCommands : [ 'getMore' ] ,
1980+ errorCode : resumableError . code ,
1981+ errmsg : resumableError . message
1982+ }
1983+ } as FailCommandFailPoint ) ;
1984+
1985+ // There's an inherent race condition here because we need to make sure that the `aggregates` that succeed when
1986+ // resuming a change stream don't return the change event.
1987+ // So we defer the insert until a period of time after the change stream has received the first change.
1988+ // 2000ms is long enough for the change stream to attempt to resume and fail once before exhausting the failpoint
1989+ // and succeeding.
1990+ await sleep ( 2000 ) ;
1991+ await collection . insertOne ( { a : 2 } ) ;
1992+
1993+ const change2 = await willBeChange . next ( ) ;
1994+ expect ( change2 . value [ 0 ] ) . to . containSubset ( {
1995+ operationType : 'insert' ,
1996+ fullDocument : { a : 2 }
1997+ } ) ;
1998+
1999+ expect ( aggregateEvents . length ) . to . equal ( 2 ) ;
2000+ } ) ;
2001+ } ) ;
18192002} ) ;
18202003
18212004describe ( 'ChangeStream resumability' , function ( ) {
0 commit comments