@@ -1677,4 +1677,109 @@ describe('Change Streams', function() {
16771677 . then ( ( ) => finish ( ) , err => finish ( err ) ) ;
16781678 }
16791679 } ) ;
1680+
1681+ describe ( 'should properly handle a changeStream event being processed mid-close' , function ( ) {
1682+ let client , coll ;
1683+
1684+ function write ( ) {
1685+ return Promise . resolve ( )
1686+ . then ( ( ) => coll . insertOne ( { a : 1 } ) )
1687+ . then ( ( ) => coll . insertOne ( { b : 2 } ) )
1688+ . then ( ( ) => coll . insertOne ( { c : 3 } ) ) ;
1689+ }
1690+
1691+ beforeEach ( function ( ) {
1692+ client = this . configuration . newClient ( ) ;
1693+ return client . connect ( ) . then ( _client => {
1694+ client = _client ;
1695+ coll = client . db ( this . configuration . db ) . collection ( 'tester' ) ;
1696+ } ) ;
1697+ } ) ;
1698+
1699+ afterEach ( function ( ) {
1700+ coll = undefined ;
1701+ if ( client ) {
1702+ return client . close ( ) . then ( ( ) => {
1703+ client = undefined ;
1704+ } ) ;
1705+ }
1706+ } ) ;
1707+
1708+ it ( 'when invoked with promises' , {
1709+ metadata : { requires : { topology : 'replicaset' , mongodb : '>=3.5.10' } } ,
1710+ test : function ( ) {
1711+ function read ( ) {
1712+ const changeStream = coll . watch ( ) ;
1713+ return Promise . resolve ( )
1714+ . then ( ( ) => changeStream . next ( ) )
1715+ . then ( ( ) => changeStream . next ( ) )
1716+ . then ( ( ) => {
1717+ const nextP = changeStream . next ( ) ;
1718+
1719+ return changeStream . close ( ) . then ( ( ) => nextP ) ;
1720+ } ) ;
1721+ }
1722+
1723+ return Promise . all ( [ read ( ) , write ( ) ] ) . then (
1724+ ( ) => Promise . reject ( new Error ( 'Expected operation to fail with error' ) ) ,
1725+ err => expect ( err . message ) . to . equal ( 'ChangeStream is closed' )
1726+ ) ;
1727+ }
1728+ } ) ;
1729+
1730+ it ( 'when invoked with callbacks' , {
1731+ metadata : { requires : { topology : 'replicaset' , mongodb : '>=3.5.10' } } ,
1732+ test : function ( done ) {
1733+ const changeStream = coll . watch ( ) ;
1734+
1735+ changeStream . next ( ( ) => {
1736+ changeStream . next ( ( ) => {
1737+ changeStream . next ( err => {
1738+ let _err = null ;
1739+ try {
1740+ expect ( err . message ) . to . equal ( 'ChangeStream is closed' ) ;
1741+ } catch ( e ) {
1742+ _err = e ;
1743+ } finally {
1744+ done ( _err ) ;
1745+ }
1746+ } ) ;
1747+ changeStream . close ( ) ;
1748+ } ) ;
1749+ } ) ;
1750+
1751+ write ( ) . catch ( ( ) => { } ) ;
1752+ }
1753+ } ) ;
1754+
1755+ it ( 'when invoked using eventEmitter API' , {
1756+ metadata : { requires : { topology : 'replicaset' , mongodb : '>=3.5.10' } } ,
1757+ test : function ( done ) {
1758+ let closed = false ;
1759+ const close = _err => {
1760+ if ( closed ) {
1761+ return ;
1762+ }
1763+ closed = true ;
1764+ return done ( _err ) ;
1765+ } ;
1766+
1767+ const changeStream = coll . watch ( ) ;
1768+
1769+ let counter = 0 ;
1770+ changeStream . on ( 'change' , ( ) => {
1771+ counter += 1 ;
1772+ if ( counter === 2 ) {
1773+ changeStream . close ( ) ;
1774+ setTimeout ( ( ) => close ( ) ) ;
1775+ } else if ( counter >= 3 ) {
1776+ close ( new Error ( 'Should not have received more than 2 events' ) ) ;
1777+ }
1778+ } ) ;
1779+ changeStream . on ( 'error' , err => close ( err ) ) ;
1780+
1781+ setTimeout ( ( ) => write ( ) . catch ( ( ) => { } ) ) ;
1782+ }
1783+ } ) ;
1784+ } ) ;
16801785} ) ;
0 commit comments