diff --git a/src/change_stream.ts b/src/change_stream.ts index 90bad77422..f5b94bdd56 100644 --- a/src/change_stream.ts +++ b/src/change_stream.ts @@ -20,16 +20,7 @@ import type { AggregateOptions } from './operations/aggregate'; import type { OperationParent } from './operations/command'; import type { ServerSessionId } from './sessions'; import { CSOTTimeoutContext, type TimeoutContext } from './timeout'; -import { filterOptions, getTopology, type MongoDBNamespace, squashError } from './utils'; - -const CHANGE_STREAM_OPTIONS = [ - 'resumeAfter', - 'startAfter', - 'startAtOperationTime', - 'fullDocument', - 'fullDocumentBeforeChange', - 'showExpandedEvents' -] as const; +import { type AnyOptions, getTopology, type MongoDBNamespace, squashError } from './utils'; const CHANGE_DOMAIN_TYPES = { COLLECTION: Symbol('Collection'), @@ -43,6 +34,14 @@ const NO_RESUME_TOKEN_ERROR = 'A change stream document has been received that lacks a resume token (_id).'; const CHANGESTREAM_CLOSED_ERROR = 'ChangeStream is closed'; +const INVALID_STAGE_OPTIONS = buildDisallowedChangeStreamOptions(); + +export function filterOutOptions(options: AnyOptions): AnyOptions { + return Object.fromEntries( + Object.entries(options).filter(([k, _]) => !INVALID_STAGE_OPTIONS.has(k)) + ); +} + /** * Represents the logical starting point for a new ChangeStream or resuming a ChangeStream on the server. * @see https://www.mongodb.com/docs/manual/changeStreams/#std-label-change-stream-resume @@ -898,7 +897,7 @@ export class ChangeStream< private _createChangeStreamCursor( options: ChangeStreamOptions | ChangeStreamCursorOptions ): ChangeStreamCursor { - const changeStreamStageOptions = filterOptions(options, CHANGE_STREAM_OPTIONS); + const changeStreamStageOptions: Document = filterOutOptions(options); if (this.type === CHANGE_DOMAIN_TYPES.CLUSTER) { changeStreamStageOptions.allChangesForCluster = true; } @@ -1084,3 +1083,76 @@ export class ChangeStream< } } } + +/** + * This function returns a list of options that are *not* supported by the $changeStream + * aggregation stage. This is best-effort - it uses the options "officially supported" by the driver + * to derive a list of known, unsupported options for the $changeStream stage. + * + * Notably, at runtime, users can still provide options unknown to the driver and the driver will + * *not* filter them out of the options object (see NODE-5510). + */ +function buildDisallowedChangeStreamOptions(): Set { + /** hard-coded list of allowed ChangeStream options */ + type CSOptions = + | 'resumeAfter' + | 'startAfter' + | 'startAtOperationTime' + | 'fullDocument' + | 'fullDocumentBeforeChange' + | 'showExpandedEvents'; + + /** + * a type representing all known options that the driver supports that are *not* change stream stage options. + * + * each known key is mapped to a non-optional string, so that if new driver-specific options are added, the + * instantiation of `denyList` below results in a TS error. + */ + type DisallowedOptions = { + [k in Exclude< + keyof ChangeStreamOptions & { timeoutContext: TimeoutContext }, + CSOptions + >]: string; + }; + + const denyList: DisallowedOptions = { + allowDiskUse: '', + authdb: '', + batchSize: '', + bsonRegExp: '', + bypassDocumentValidation: '', + bypassPinningCheck: '', + checkKeys: '', + collation: '', + comment: '', + cursor: '', + dbName: '', + enableUtf8Validation: '', + explain: '', + fieldsAsRaw: '', + hint: '', + ignoreUndefined: '', + let: '', + maxAwaitTimeMS: '', + maxTimeMS: '', + omitMaxTimeMS: '', + out: '', + promoteBuffers: '', + promoteLongs: '', + promoteValues: '', + raw: '', + rawData: '', + readConcern: '', + readPreference: '', + serializeFunctions: '', + session: '', + timeoutContext: '', + timeoutMS: '', + timeoutMode: '', + useBigInt64: '', + willRetryWrite: '', + writeConcern: '' + }; + + return new Set(Object.keys(denyList)); +} diff --git a/test/integration/change-streams/change_stream.test.ts b/test/integration/change-streams/change_stream.test.ts index a30ca19ed7..2b7e100386 100644 --- a/test/integration/change-streams/change_stream.test.ts +++ b/test/integration/change-streams/change_stream.test.ts @@ -157,10 +157,10 @@ describe('Change Streams', function () { }); }); - it('ignores any invalid option values', function () { + it('allows invalid option values', function () { const changeStream = collection.watch([], { invalidOption: true }); - expect(changeStream).not.to.have.nested.property( + expect(changeStream).to.have.nested.property( 'cursor.pipeline[0].$changeStream.invalidOption' ); }); @@ -1809,7 +1809,7 @@ describe('Change Streams', function () { }); context('invalid options', function () { - it('does not send invalid options on the aggregate command', { + it('server errors on invalid options on the initialize', { metadata: { requires: { topology: '!single' } }, test: async function () { const started: CommandStartedEvent[] = []; @@ -1819,35 +1819,8 @@ describe('Change Streams', function () { // @ts-expect-error: checking for invalid options cs = collection.watch([], doc); - const willBeChange = once(cs, 'change').then(args => args[0]); - await once(cs.cursor, 'init'); - - const result = await collection.insertOne({ a: Long.fromNumber(0) }); - expect(result).to.exist; - - await willBeChange; - expect(started[0].command).not.to.haveOwnProperty('invalidBSONOption'); - } - }); - - it('does not send invalid options on the getMore command', { - metadata: { requires: { topology: '!single' } }, - test: async function () { - const started: CommandStartedEvent[] = []; - - client.on('commandStarted', filterForCommands(['aggregate'], started)); - const doc = { invalidBSONOption: true }; - // @ts-expect-error: checking for invalid options - cs = collection.watch([], doc); - - const willBeChange = once(cs, 'change').then(args => args[0]); - await once(cs.cursor, 'init'); - - const result = await collection.insertOne({ a: Long.fromNumber(0) }); - expect(result).to.exist; - - await willBeChange; - expect(started[0].command).not.to.haveOwnProperty('invalidBSONOption'); + const error = await once(cs, 'change').catch(error => error); + expect(error).to.be.instanceOf(MongoServerError); } }); }); diff --git a/test/unit/change_stream.test.ts b/test/unit/change_stream.test.ts index 36976181bd..cefdd58a48 100644 --- a/test/unit/change_stream.test.ts +++ b/test/unit/change_stream.test.ts @@ -2,6 +2,7 @@ import { Long, Timestamp } from 'bson'; import { expect } from 'chai'; import * as sinon from 'sinon'; +import { filterOutOptions } from '../../src/change_stream'; import { ChangeStreamCursor } from '../../src/cursor/change_stream_cursor'; import { MongoClient } from '../../src/mongo_client'; import { MongoDBNamespace } from '../../src/utils'; @@ -11,6 +12,28 @@ describe('ChangeStreamCursor', function () { sinon.restore(); }); + describe('#filterOutOptions', function () { + const options = { + raw: false, + useBigInt64: false, + promoteLongs: true, + promoteValues: true, + promoteBuffers: false, + ignoreUndefined: false, + bsonRegExp: false, + serializeFunctions: false, + fieldsAsRaw: {}, + enableUtf8Validation: true, + fullDocument: true + }; + + it('filters out all invalid options', function () { + expect(filterOutOptions(options)).to.deep.equal({ + fullDocument: true + }); + }); + }); + describe('get resumeOptions()', function () { context('when there is a cached resumeToken', function () { it('copies all non-resume related options from the original cursor', function () {