Skip to content

Commit aeab23c

Browse files
committed
Merge remote-tracking branch 'origin/master' into NODE-2757/add-collation-to-bulk-find-operators
2 parents ceb2c58 + 634ae4f commit aeab23c

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

53 files changed

+1231
-913
lines changed

package.json

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,7 @@
2727
},
2828
"dependencies": {
2929
"bson": "^4.0.4",
30-
"denque": "^1.4.1",
31-
"lodash": "^4.17.20"
30+
"denque": "^1.4.1"
3231
},
3332
"devDependencies": {
3433
"@istanbuljs/nyc-config-typescript": "^1.0.1",
@@ -38,7 +37,6 @@
3837
"@types/bl": "^2.1.0",
3938
"@types/bson": "^4.0.2",
4039
"@types/kerberos": "^1.1.0",
41-
"@types/lodash": "^4.14.164",
4240
"@types/node": "^14.6.4",
4341
"@types/saslprep": "^1.0.0",
4442
"@typescript-eslint/eslint-plugin": "^3.10.0",

src/bulk/common.ts

Lines changed: 32 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,8 @@ import {
1313
} from '../utils';
1414
import { executeOperation } from '../operations/execute_operation';
1515
import { InsertOperation } from '../operations/insert';
16-
import { UpdateOperation } from '../operations/update';
17-
import { DeleteOperation } from '../operations/delete';
16+
import { UpdateOperation, UpdateStatement } from '../operations/update';
17+
import { DeleteOperation, DeleteStatement } from '../operations/delete';
1818
import { WriteConcern } from '../write_concern';
1919
import type { Collection } from '../collection';
2020
import type { Topology } from '../sdam/topology';
@@ -28,7 +28,7 @@ const WRITE_CONCERN_ERROR = 64;
2828
export const BatchType = {
2929
INSERT: 1,
3030
UPDATE: 2,
31-
REMOVE: 3
31+
DELETE: 3
3232
} as const;
3333

3434
/** @public */
@@ -139,12 +139,12 @@ export interface BulkResult {
139139
*
140140
* @public
141141
*/
142-
export class Batch {
142+
export class Batch<T = Document> {
143143
originalZeroIndex: number;
144144
currentIndex: number;
145145
originalIndexes: number[];
146146
batchType: BatchTypeId;
147-
operations: Document[];
147+
operations: T[];
148148
size: number;
149149
sizeBytes: number;
150150

@@ -483,12 +483,12 @@ function mergeBatchResults(
483483
}
484484

485485
// If we have an insert Batch type
486-
if (batch.batchType === BatchType.INSERT && result.n) {
486+
if (isInsertBatch(batch) && result.n) {
487487
bulkResult.nInserted = bulkResult.nInserted + result.n;
488488
}
489489

490490
// If we have an insert Batch type
491-
if (batch.batchType === BatchType.REMOVE && result.n) {
491+
if (isDeleteBatch(batch) && result.n) {
492492
bulkResult.nRemoved = bulkResult.nRemoved + result.n;
493493
}
494494

@@ -514,7 +514,7 @@ function mergeBatchResults(
514514
}
515515

516516
// If we have an update Batch type
517-
if (batch.batchType === BatchType.UPDATE && result.n) {
517+
if (isUpdateBatch(batch) && result.n) {
518518
const nModified = result.nModified;
519519
bulkResult.nUpserted = bulkResult.nUpserted + nUpserted;
520520
bulkResult.nMatched = bulkResult.nMatched + (result.n - nUpserted);
@@ -605,30 +605,30 @@ function executeCommands(
605605
}
606606

607607
if (finalOptions.retryWrites) {
608-
if (batch.batchType === BatchType.UPDATE) {
608+
if (isUpdateBatch(batch)) {
609609
finalOptions.retryWrites = finalOptions.retryWrites && !batch.operations.some(op => op.multi);
610610
}
611611

612-
if (batch.batchType === BatchType.REMOVE) {
612+
if (isDeleteBatch(batch)) {
613613
finalOptions.retryWrites =
614614
finalOptions.retryWrites && !batch.operations.some(op => op.limit === 0);
615615
}
616616
}
617617

618618
try {
619-
if (batch.batchType === BatchType.INSERT) {
619+
if (isInsertBatch(batch)) {
620620
executeOperation(
621621
bulkOperation.s.topology,
622622
new InsertOperation(bulkOperation.s.namespace, batch.operations, finalOptions),
623623
resultHandler
624624
);
625-
} else if (batch.batchType === BatchType.UPDATE) {
625+
} else if (isUpdateBatch(batch)) {
626626
executeOperation(
627627
bulkOperation.s.topology,
628628
new UpdateOperation(bulkOperation.s.namespace, batch.operations, finalOptions),
629629
resultHandler
630630
);
631-
} else if (batch.batchType === BatchType.REMOVE) {
631+
} else if (isDeleteBatch(batch)) {
632632
executeOperation(
633633
bulkOperation.s.topology,
634634
new DeleteOperation(bulkOperation.s.namespace, batch.operations, finalOptions),
@@ -811,12 +811,12 @@ export class FindOperators {
811811

812812
/** Add a delete one operation to the bulk operation */
813813
deleteOne(): BulkOperationBase {
814-
return this.bulkOperation.addToOperationsList(BatchType.REMOVE, this.makeDeleteDocument(1));
814+
return this.bulkOperation.addToOperationsList(BatchType.DELETE, this.makeDeleteDocument(1));
815815
}
816816

817817
/** Add a delete many operation to the bulk operation */
818818
delete(): BulkOperationBase {
819-
return this.bulkOperation.addToOperationsList(BatchType.REMOVE, this.makeDeleteDocument(0));
819+
return this.bulkOperation.addToOperationsList(BatchType.DELETE, this.makeDeleteDocument(0));
820820
}
821821

822822
removeOne(): BulkOperationBase {
@@ -1059,7 +1059,7 @@ export abstract class BulkOperationBase {
10591059
* bulkOp.find({ h: 8 }).delete();
10601060
*
10611061
* // Add a replaceOne
1062-
* bulkOp.find({ i: 9 }).replaceOne({ j: 10 });
1062+
* bulkOp.find({ i: 9 }).replaceOne({writeConcern: { j: 10 }});
10631063
*
10641064
* // Update using a pipeline (requires Mongodb 4.2 or higher)
10651065
* bulk.find({ k: 11, y: { $exists: true }, z: { $exists: true } }).updateOne([
@@ -1143,28 +1143,28 @@ export abstract class BulkOperationBase {
11431143

11441144
if ('removeOne' in op) {
11451145
return this.addToOperationsList(
1146-
BatchType.REMOVE,
1146+
BatchType.DELETE,
11471147
makeDeleteStatement(this.s.topology, op.removeOne, false)
11481148
);
11491149
}
11501150

11511151
if ('removeMany' in op) {
11521152
return this.addToOperationsList(
1153-
BatchType.REMOVE,
1153+
BatchType.DELETE,
11541154
makeDeleteStatement(this.s.topology, op.removeMany, true)
11551155
);
11561156
}
11571157

11581158
if ('deleteOne' in op) {
11591159
return this.addToOperationsList(
1160-
BatchType.REMOVE,
1160+
BatchType.DELETE,
11611161
makeDeleteStatement(this.s.topology, op.deleteOne, false)
11621162
);
11631163
}
11641164

11651165
if ('deleteMany' in op) {
11661166
return this.addToOperationsList(
1167-
BatchType.REMOVE,
1167+
BatchType.DELETE,
11681168
makeDeleteStatement(this.s.topology, op.deleteMany, true)
11691169
);
11701170
}
@@ -1303,24 +1303,6 @@ function shouldForceServerObjectId(bulkOperation: BulkOperationBase): boolean {
13031303
return false;
13041304
}
13051305

1306-
/** @public */
1307-
export interface UpdateStatement {
1308-
/** The query that matches documents to update. */
1309-
q: Document;
1310-
/** The modifications to apply. */
1311-
u: Document | Document[];
1312-
/** If true, perform an insert if no documents match the query. */
1313-
upsert?: boolean;
1314-
/** If true, updates all documents that meet the query criteria. */
1315-
multi?: boolean;
1316-
/** Specifies the collation to use for the operation. */
1317-
collation?: CollationOptions;
1318-
/** An array of filter documents that determines which array elements to modify for an update operation on an array field. */
1319-
arrayFilters?: Document[];
1320-
/** A document or string that specifies the index to use to support the query predicate. */
1321-
hint?: Hint;
1322-
}
1323-
13241306
function makeUpdateStatement(
13251307
topology: Topology,
13261308
model: ReplaceOneModel | UpdateOneModel | UpdateManyModel,
@@ -1370,18 +1352,6 @@ function isUpdateStatement(model: Document): model is UpdateStatement {
13701352
return 'q' in model;
13711353
}
13721354

1373-
/** @public */
1374-
export interface DeleteStatement {
1375-
/** The query that matches documents to delete. */
1376-
q: Document;
1377-
/** The number of matching documents to delete. */
1378-
limit: number;
1379-
/** Specifies the collation to use for the operation. */
1380-
collation?: CollationOptions;
1381-
/** A document or string that specifies the index to use to support the query predicate. */
1382-
hint?: Hint;
1383-
}
1384-
13851355
function makeDeleteStatement(
13861356
topology: Topology,
13871357
model: DeleteOneModel | DeleteManyModel,
@@ -1420,3 +1390,15 @@ function makeDeleteStatement(
14201390
function isDeleteStatement(model: Document): model is DeleteStatement {
14211391
return 'q' in model;
14221392
}
1393+
1394+
function isInsertBatch(batch: Batch): boolean {
1395+
return batch.batchType === BatchType.INSERT;
1396+
}
1397+
1398+
function isUpdateBatch(batch: Batch): batch is Batch<UpdateStatement> {
1399+
return batch.batchType === BatchType.UPDATE;
1400+
}
1401+
1402+
function isDeleteBatch(batch: Batch): batch is Batch<DeleteStatement> {
1403+
return batch.batchType === BatchType.DELETE;
1404+
}

src/bulk/ordered.ts

Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,9 @@
11
import * as BSON from '../bson';
2-
import {
3-
BulkOperationBase,
4-
Batch,
5-
BatchType,
6-
BulkWriteOptions,
7-
UpdateStatement,
8-
DeleteStatement,
9-
BatchTypeId
10-
} from './common';
2+
import { BulkOperationBase, Batch, BatchType, BulkWriteOptions, BatchTypeId } from './common';
113
import type { Document } from '../bson';
124
import type { Collection } from '../collection';
5+
import type { UpdateStatement } from '../operations/update';
6+
import type { DeleteStatement } from '../operations/delete';
137

148
/** @public */
159
export class OrderedBulkOperation extends BulkOperationBase {

src/bulk/unordered.ts

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,14 +4,14 @@ import {
44
Batch,
55
BatchType,
66
BulkWriteOptions,
7-
UpdateStatement,
8-
DeleteStatement,
97
BulkWriteResult,
108
BatchTypeId
119
} from './common';
1210
import type { Callback } from '../utils';
1311
import type { Document } from '../bson';
1412
import type { Collection } from '../collection';
13+
import type { UpdateStatement } from '../operations/update';
14+
import type { DeleteStatement } from '../operations/delete';
1515

1616
/** @public */
1717
export class UnorderedBulkOperation extends BulkOperationBase {
@@ -52,7 +52,7 @@ export class UnorderedBulkOperation extends BulkOperationBase {
5252
this.s.currentBatch = this.s.currentInsertBatch;
5353
} else if (batchType === BatchType.UPDATE) {
5454
this.s.currentBatch = this.s.currentUpdateBatch;
55-
} else if (batchType === BatchType.REMOVE) {
55+
} else if (batchType === BatchType.DELETE) {
5656
this.s.currentBatch = this.s.currentRemoveBatch;
5757
}
5858

@@ -99,7 +99,7 @@ export class UnorderedBulkOperation extends BulkOperationBase {
9999
});
100100
} else if (batchType === BatchType.UPDATE) {
101101
this.s.currentUpdateBatch = this.s.currentBatch;
102-
} else if (batchType === BatchType.REMOVE) {
102+
} else if (batchType === BatchType.DELETE) {
103103
this.s.currentRemoveBatch = this.s.currentBatch;
104104
}
105105

src/change_stream.ts

Lines changed: 26 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ const CHANGE_DOMAIN_TYPES = {
4646
const NO_RESUME_TOKEN_ERROR = new MongoError(
4747
'A change stream document has been received that lacks a resume token (_id).'
4848
);
49+
const NO_CURSOR_ERROR = new MongoError('ChangeStream has no cursor');
4950
const CHANGESTREAM_CLOSED_ERROR = new MongoError('ChangeStream is closed');
5051

5152
/** @public */
@@ -287,8 +288,7 @@ export class ChangeStream extends EventEmitter {
287288
next(callback?: Callback): Promise<void> | void {
288289
return maybePromise(callback, cb => {
289290
getCursor(this, (err, cursor) => {
290-
if (err) return cb(err); // failed to resume, raise an error
291-
if (!cursor) return cb(new MongoError('Cursor is undefined'));
291+
if (err || !cursor) return cb(err); // failed to resume, raise an error
292292
cursor.next((error, change) => {
293293
if (error) {
294294
this[kResumeQueue].push(() => this.next(cb));
@@ -330,11 +330,23 @@ export class ChangeStream extends EventEmitter {
330330
*/
331331
stream(options?: CursorStreamOptions): Readable {
332332
this.streamOptions = options;
333-
if (!this.cursor) {
334-
throw new MongoError('ChangeStream has no cursor, unable to stream');
335-
}
333+
if (!this.cursor) throw NO_CURSOR_ERROR;
336334
return this.cursor.stream(options);
337335
}
336+
337+
/**
338+
* Try to get the next available document from the Change Stream's cursor or `null` if an empty batch is returned
339+
*/
340+
tryNext(): Promise<Document | null>;
341+
tryNext(callback: Callback<Document | null>): void;
342+
tryNext(callback?: Callback<Document | null>): Promise<Document | null> | void {
343+
return maybePromise(callback, cb => {
344+
getCursor(this, (err, cursor) => {
345+
if (err || !cursor) return cb(err); // failed to resume, raise an error
346+
return cursor.tryNext(cb);
347+
});
348+
});
349+
}
338350
}
339351

340352
/** @internal */
@@ -707,11 +719,16 @@ function getCursor(changeStream: ChangeStream, callback: Callback<ChangeStreamCu
707719
function processResumeQueue(changeStream: ChangeStream, err?: Error) {
708720
while (changeStream[kResumeQueue].length) {
709721
const request = changeStream[kResumeQueue].pop();
710-
if (changeStream[kClosed] && !err) {
711-
request(CHANGESTREAM_CLOSED_ERROR);
712-
return;
722+
if (!err) {
723+
if (changeStream[kClosed]) {
724+
request(CHANGESTREAM_CLOSED_ERROR);
725+
return;
726+
}
727+
if (!changeStream.cursor) {
728+
request(NO_CURSOR_ERROR);
729+
return;
730+
}
713731
}
714-
715732
request(err, changeStream.cursor);
716733
}
717734
}

0 commit comments

Comments
 (0)