Skip to content

Commit dcd48de

Browse files
remove support for stream() transform
1 parent 25de3df commit dcd48de

File tree

6 files changed

+9
-81
lines changed

6 files changed

+9
-81
lines changed

src/change_stream.ts

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ import type { Readable } from 'stream';
33
import type { Binary, Document, Timestamp } from './bson';
44
import { Collection } from './collection';
55
import { CHANGE, CLOSE, END, ERROR, INIT, MORE, RESPONSE, RESUME_TOKEN_CHANGED } from './constants';
6-
import { type CursorStreamOptions, CursorTimeoutContext } from './cursor/abstract_cursor';
6+
import { CursorTimeoutContext } from './cursor/abstract_cursor';
77
import { ChangeStreamCursor, type ChangeStreamCursorOptions } from './cursor/change_stream_cursor';
88
import { Db } from './db';
99
import {
@@ -598,7 +598,6 @@ export class ChangeStream<
598598
type: symbol;
599599
/** @internal */
600600
private cursor: ChangeStreamCursor<TSchema, TChange>;
601-
streamOptions?: CursorStreamOptions;
602601
/** @internal */
603602
private cursorStream?: Readable & AsyncIterable<TChange>;
604603
/** @internal */
@@ -866,13 +865,12 @@ export class ChangeStream<
866865
*
867866
* @throws MongoChangeStreamError if the underlying cursor or the change stream is closed
868867
*/
869-
stream(options?: CursorStreamOptions): Readable & AsyncIterable<TChange> {
868+
stream(): Readable & AsyncIterable<TChange> {
870869
if (this.closed) {
871870
throw new MongoChangeStreamError(CHANGESTREAM_CLOSED_ERROR);
872871
}
873872

874-
this.streamOptions = options;
875-
return this.cursor.stream(options);
873+
return this.cursor.stream();
876874
}
877875

878876
/** @internal */

src/cursor/abstract_cursor.ts

Lines changed: 1 addition & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -59,12 +59,6 @@ export const CURSOR_FLAGS = [
5959
'partial'
6060
] as const;
6161

62-
/** @public */
63-
export interface CursorStreamOptions {
64-
/** A transformation method applied to each document emitted by the stream */
65-
transform?(this: void, doc: Document): Document;
66-
}
67-
6862
/** @public */
6963
export type CursorFlag = (typeof CURSOR_FLAGS)[number];
7064

@@ -523,7 +517,7 @@ export abstract class AbstractCursor<
523517
}
524518
}
525519

526-
stream(options?: CursorStreamOptions): Readable & AsyncIterable<TSchema> {
520+
stream(): Readable & AsyncIterable<TSchema> {
527521
const readable = new ReadableCursorStream(this);
528522
const abortListener = addAbortListener(this.signal, function () {
529523
readable.destroy(this.reason);
@@ -532,31 +526,6 @@ export abstract class AbstractCursor<
532526
abortListener?.[kDispose]();
533527
});
534528

535-
if (options?.transform) {
536-
const transform = options.transform;
537-
538-
const transformedStream = readable.pipe(
539-
new Transform({
540-
objectMode: true,
541-
highWaterMark: 1,
542-
transform(chunk, _, callback) {
543-
try {
544-
const transformed = transform(chunk);
545-
callback(undefined, transformed);
546-
} catch (err) {
547-
callback(err);
548-
}
549-
}
550-
})
551-
);
552-
553-
// Bubble errors to transformed stream, because otherwise no way
554-
// to handle this error.
555-
readable.on('error', err => transformedStream.emit('error', err));
556-
557-
return transformedStream;
558-
}
559-
560529
return readable;
561530
}
562531

src/index.ts

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -366,8 +366,7 @@ export type {
366366
export type {
367367
AbstractCursorEvents,
368368
AbstractCursorOptions,
369-
CursorFlag,
370-
CursorStreamOptions
369+
CursorFlag
371370
} from './cursor/abstract_cursor';
372371
export type {
373372
CursorTimeoutContext,

test/integration/change-streams/change_stream.test.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -786,7 +786,8 @@ describe('Change Streams', function () {
786786

787787
const transform = doc => ({ doc: JSON.stringify(doc) });
788788
changeStream
789-
.stream({ transform })
789+
.stream()
790+
.map(transform)
790791
.on('error', () => null)
791792
.pipe(outStream)
792793
.on('error', () => null);

test/integration/crud/misc_cursors.test.js

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1823,9 +1823,7 @@ describe('Cursor', function () {
18231823

18241824
const filename = path.join(os.tmpdir(), '_nodemongodbnative_stream_out.txt');
18251825
const out = fs.createWriteStream(filename);
1826-
const stream = collection.find().stream({
1827-
transform: doc => JSON.stringify(doc)
1828-
});
1826+
const stream = collection.find().stream().map(JSON.stringify);
18291827

18301828
stream.pipe(out);
18311829
// Wait for output stream to close
@@ -3746,14 +3744,13 @@ describe('Cursor', function () {
37463744
{ _id: 2, a: { b: 1, c: 0 } }
37473745
];
37483746
const resultSet = new Set();
3749-
const transformParam = transformFunc != null ? { transform: transformFunc } : null;
37503747
Promise.resolve()
37513748
.then(() => db.createCollection(collectionName))
37523749
.then(() => (collection = db.collection(collectionName)))
37533750
.then(() => collection.insertMany(docs))
37543751
.then(() => {
37553752
cursor = collection.find();
3756-
return cursor.stream(transformParam);
3753+
return cursor.stream().map(transformFunc ?? (doc => doc));
37573754
})
37583755
.then(stream => {
37593756
stream.on('data', function (doc) {

test/integration/node-specific/abstract_cursor.test.ts

Lines changed: 0 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -317,42 +317,6 @@ describe('class AbstractCursor', function () {
317317
});
318318
});
319319

320-
describe('transform stream error handling', function () {
321-
let client: MongoClient;
322-
let collection: Collection;
323-
const docs = [{ count: 0 }];
324-
325-
beforeEach(async function () {
326-
client = this.configuration.newClient();
327-
328-
collection = client.db('abstract_cursor_integration').collection('test');
329-
330-
await collection.insertMany(docs);
331-
});
332-
333-
afterEach(async function () {
334-
await collection.deleteMany({});
335-
await client.close();
336-
});
337-
338-
it('propagates errors to transform stream', async function () {
339-
const transform = new Transform({
340-
transform(data, encoding, callback) {
341-
callback(null, data);
342-
}
343-
});
344-
345-
// MongoServerError: unknown operator: $bar
346-
const stream = collection.find({ foo: { $bar: 25 } }).stream({ transform });
347-
348-
const error: Error | null = await new Promise(resolve => {
349-
stream.on('error', error => resolve(error));
350-
stream.on('end', () => resolve(null));
351-
});
352-
expect(error).to.be.instanceof(MongoServerError);
353-
});
354-
});
355-
356320
describe('cursor end state', function () {
357321
let client: MongoClient;
358322
let cursor: FindCursor;

0 commit comments

Comments
 (0)