11'use strict' ;
2- const { once } = require ( 'events' ) ;
2+ const { on , once } = require ( 'events' ) ;
33const { Readable, Writable } = require ( 'stream' ) ;
44
55const { MessageStream } = require ( '../../../src/cmap/message_stream' ) ;
@@ -26,11 +26,14 @@ describe('MessageStream', function () {
2626 let firstHello ;
2727 let secondHello ;
2828 let thirdHello ;
29+ let partial ;
2930
3031 beforeEach ( function ( ) {
3132 firstHello = generateOpMsgBuffer ( response ) ;
3233 secondHello = generateOpMsgBuffer ( response ) ;
3334 thirdHello = generateOpMsgBuffer ( response ) ;
35+ partial = Buffer . alloc ( 5 ) ;
36+ partial . writeInt32LE ( 100 , 0 ) ;
3437 } ) ;
3538
3639 it ( 'only reads the last message in the buffer' , async function ( ) {
@@ -46,6 +49,22 @@ describe('MessageStream', function () {
4649 // Make sure there is nothing left in the buffer.
4750 expect ( messageStream . buffer . length ) . to . equal ( 0 ) ;
4851 } ) ;
52+
53+ it ( 'does not read partial messages' , async function ( ) {
54+ const inputStream = bufferToStream (
55+ Buffer . concat ( [ firstHello , secondHello , thirdHello , partial ] )
56+ ) ;
57+ const messageStream = new MessageStream ( ) ;
58+ messageStream . isStreamingProtocol = true ;
59+
60+ inputStream . pipe ( messageStream ) ;
61+ const messages = await once ( messageStream , 'message' ) ;
62+ const msg = messages [ 0 ] ;
63+ msg . parse ( ) ;
64+ expect ( msg ) . to . have . property ( 'documents' ) . that . deep . equals ( [ response ] ) ;
65+ // Make sure the buffer wasn't read to the end.
66+ expect ( messageStream . buffer . length ) . to . equal ( 5 ) ;
67+ } ) ;
4968 } ) ;
5069
5170 context ( 'when the stream is not using the streaming protocol' , function ( ) {
@@ -62,52 +81,48 @@ describe('MessageStream', function () {
6281 thirdHello = generateOpMsgBuffer ( response ) ;
6382 } ) ;
6483
65- it ( 'reads all messages in the buffer' , function ( done ) {
84+ it ( 'reads all messages in the buffer' , async function ( ) {
6685 const inputStream = bufferToStream ( Buffer . concat ( [ firstHello , secondHello , thirdHello ] ) ) ;
6786 const messageStream = new MessageStream ( ) ;
6887
69- messageStream . on ( 'message' , msg => {
88+ inputStream . pipe ( messageStream ) ;
89+ for await ( const messages of on ( messageStream , 'message' ) ) {
7090 messageCount ++ ;
91+ const msg = messages [ 0 ] ;
7192 msg . parse ( ) ;
7293 expect ( msg ) . to . have . property ( 'documents' ) . that . deep . equals ( [ response ] ) ;
7394 // Test will not complete until 3 messages processed.
7495 if ( messageCount === 3 ) {
75- done ( ) ;
96+ return ;
7697 }
77- } ) ;
78-
79- inputStream . pipe ( messageStream ) ;
98+ }
8099 } ) ;
81100 } ) ;
82101
83102 context ( 'when the messages are invalid' , function ( ) {
84103 context ( 'when the message size is negative' , function ( ) {
85- it ( 'emits an error' , function ( done ) {
104+ it ( 'emits an error' , async function ( ) {
86105 const inputStream = bufferToStream ( Buffer . from ( 'ffffffff' , 'hex' ) ) ;
87106 const messageStream = new MessageStream ( ) ;
88107
89- messageStream . on ( 'error' , err => {
90- expect ( err ) . to . have . property ( 'message' ) . that . equals ( 'Invalid message size: -1' ) ;
91- done ( ) ;
92- } ) ;
93-
94108 inputStream . pipe ( messageStream ) ;
109+ const errors = await once ( messageStream , 'error' )
110+ const err = errors [ 0 ] ;
111+ expect ( err ) . to . have . property ( 'message' ) . that . equals ( 'Invalid message size: -1' ) ;
95112 } ) ;
96113 } ) ;
97114
98115 context ( 'when the message size exceeds the bson maximum' , function ( ) {
99- it ( 'emits an error' , function ( done ) {
116+ it ( 'emits an error' , async function ( ) {
100117 const inputStream = bufferToStream ( Buffer . from ( '01000004' , 'hex' ) ) ;
101118 const messageStream = new MessageStream ( ) ;
102119
103- messageStream . on ( 'error' , err => {
104- expect ( err )
105- . to . have . property ( 'message' )
106- . that . equals ( 'Invalid message size: 67108865, max allowed: 67108864' ) ;
107- done ( ) ;
108- } ) ;
109-
110120 inputStream . pipe ( messageStream ) ;
121+ const errors = await once ( messageStream , 'error' ) ;
122+ const err = errors [ 0 ] ;
123+ expect ( err )
124+ . to . have . property ( 'message' )
125+ . that . equals ( 'Invalid message size: 67108865, max allowed: 67108864' ) ;
111126 } ) ;
112127 } ) ;
113128 } ) ;
0 commit comments