Skip to content

Commit

Permalink
Update the mongodb driver to 4.0 (agenda#1358)
Browse files Browse the repository at this point in the history
* update mongodb driver

Signed-off-by: Haris Sulaiman <[email protected]>

* update node version

Signed-off-by: Haris Sulaiman <[email protected]>

* test for 14.x and up

Signed-off-by: Haris Sulaiman <[email protected]>

* fix build

Signed-off-by: Haris Sulaiman <[email protected]>

* fix save job

Signed-off-by: Haris Sulaiman <[email protected]>

* Skip a test

Signed-off-by: Haris Sulaiman <[email protected]>

* .

Signed-off-by: Haris Sulaiman <[email protected]>

* merge updates from upstream

Signed-off-by: Haris Sulaiman <[email protected]>

* fix tests

Signed-off-by: Haris Sulaiman <[email protected]>

* remove skip from should not cause unhandledRejection

Signed-off-by: Haris Sulaiman <[email protected]>

* add 5.0 to the test matrix

Signed-off-by: Haris Sulaiman <[email protected]>

* Add a Notice to Readme regarding the upgradation to drvier 4.0

Signed-off-by: Haris Sulaiman <[email protected]>
  • Loading branch information
harisvsulaiman authored Aug 5, 2021
1 parent 3be804b commit de7ab29
Show file tree
Hide file tree
Showing 17 changed files with 205 additions and 181 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ jobs:
runs-on: ubuntu-latest
strategy:
matrix:
node-version: [10.x, 12.x, 14.x, 16.x]
mongodb-version: [3.4, 3.6, 4.0, 4.2, 4.4]
node-version: [14.x, 16.x]
mongodb-version: [3.4, 3.6, 4.0, 4.2, 4.4, 5.0]
steps:
- name: Git checkout
uses: actions/checkout@v2
Expand Down
17 changes: 12 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,10 @@ _Kudos for making the comparison chart goes to [Bull](https://www.npmjs.com/pack

# Installation

### Notice

In order to support new MongoDB 5.0 and mongodb node.js driver/package the next release (5.x.x) of Agenda will be major. The required node version will become >=12. The mongodb dependency version will become >=3.2.

Install via NPM

npm install agenda
Expand All @@ -63,16 +67,19 @@ You will also need a working [Mongo](https://www.mongodb.com/) database (v3) to
# CJS / Module Imports

for regular javascript code, just use the default entrypoint

```js
const Agenda = require('agenda');
const Agenda = require("agenda");
```

For Typescript, Webpack or other module imports, use `agenda/es` entrypoint:
e.g.

```ts
import { Agenda } from 'agenda/es';
import { Agenda } from "agenda/es";
```
***NOTE***: If you're migrating from `@types/agenda` you also should change imports to `agenda/es`.

**_NOTE_**: If you're migrating from `@types/agenda` you also should change imports to `agenda/es`.
Instead of `import Agenda from 'agenda'` use `import Agenda from 'agenda/es'`.

# Example Usage
Expand Down Expand Up @@ -551,7 +558,7 @@ This functionality can also be achieved by first retrieving all the jobs from th
Disables any jobs matching the passed mongodb-native query, preventing any matching jobs from being run by the Job Processor.

```js
const numDisabled = await agenda.disable({name: 'pollExternalService'});
const numDisabled = await agenda.disable({ name: "pollExternalService" });
```

Similar to `agenda.cancel()`, this functionality can be acheived with a combination of `agenda.jobs()` and `job.disable()`
Expand All @@ -561,7 +568,7 @@ Similar to `agenda.cancel()`, this functionality can be acheived with a combinat
Enables any jobs matching the passed mongodb-native query, allowing any matching jobs to be run by the Job Processor.

```js
const numEnabled = await agenda.enable({name: 'pollExternalService'});
const numEnabled = await agenda.enable({ name: "pollExternalService" });
```

Similar to `agenda.cancel()`, this functionality can be acheived with a combination of `agenda.jobs()` and `job.enable()`
Expand Down
10 changes: 5 additions & 5 deletions lib/agenda/cancel.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { Agenda } from ".";
import createDebugger from "debug";
import { FilterQuery } from "mongodb";
import { Document, Filter } from "mongodb";

const debug = createDebugger("agenda:cancel");

Expand All @@ -14,13 +14,13 @@ const debug = createDebugger("agenda:cancel");
// eslint-disable-next-line @typescript-eslint/no-explicit-any
export const cancel = async function (
this: Agenda,
query: FilterQuery<any>
query: Filter<Document>
): Promise<number | undefined> {
debug("attempting to cancel all Agenda jobs", query);
try {
const { result } = await this._collection.deleteMany(query);
debug("%s jobs cancelled", result.n);
return result.n;
const { deletedCount } = await this._collection.deleteMany(query);
debug("%s jobs cancelled", deletedCount);
return deletedCount;
} catch (error) {
debug("error trying to delete jobs from MongoDB");
throw error;
Expand Down
30 changes: 13 additions & 17 deletions lib/agenda/database.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { Collection, MongoClient, MongoClientOptions } from "mongodb";
import createDebugger from "debug";
import { hasMongoProtocol } from "./has-mongo-protocol";
import { AnyError, Collection, MongoClient, MongoClientOptions } from "mongodb";
import { Agenda } from ".";
import { hasMongoProtocol } from "./has-mongo-protocol";

const debug = createDebugger("agenda:database");

Expand All @@ -24,24 +24,15 @@ export const database = function (
this: Agenda,
url: string,
collection?: string,
options?: MongoClientOptions,
cb?: (error: Error, collection: Collection<any> | null) => void
options: MongoClientOptions = {},
cb?: (error: AnyError | undefined, collection: Collection<any> | null) => void
): Agenda | void {
if (!hasMongoProtocol(url)) {
url = "mongodb://" + url;
}

const reconnectOptions =
options?.useUnifiedTopology === true
? {}
: {
autoReconnect: true,
reconnectTries: Number.MAX_SAFE_INTEGER,
reconnectInterval: this._processEvery,
};

collection = collection || "agendaJobs";
options = { ...reconnectOptions, ...options };

MongoClient.connect(url, options, (error, client) => {
if (error) {
debug("error connecting to MongoDB using collection: [%s]", collection);
Expand All @@ -58,9 +49,14 @@ export const database = function (
"successful connection to MongoDB using collection: [%s]",
collection
);
this._db = client;
this._mdb = client.db();
this.db_init(collection, cb);

if (client) {
this._db = client;
this._mdb = client.db();
this.db_init(collection, cb);
} else {
throw new Error("Mongo Client is undefined");
}
});
return this;
};
6 changes: 3 additions & 3 deletions lib/agenda/db-init.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import createDebugger from "debug";
import { Collection } from "mongodb";
import { AnyError, Collection } from "mongodb";
import { Agenda } from ".";

const debug = createDebugger("agenda:db_init");
Expand All @@ -14,15 +14,15 @@ const debug = createDebugger("agenda:db_init");
export const dbInit = function (
this: Agenda,
collection = "agendaJobs",
cb?: (error: Error, collection: Collection<any> | null) => void
cb?: (error: AnyError | undefined, collection: Collection<any> | null) => void
): void {
debug("init database collection using name [%s]", collection);
this._collection = this._mdb.collection(collection);
debug("attempting index creation");
this._collection.createIndex(
this._indices,
{ name: "findAndLockNextJobIndex" },
(error: Error) => {
(error) => {
if (error) {
debug("index creation failed");
this.emit("error", error);
Expand Down
10 changes: 5 additions & 5 deletions lib/agenda/disable.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import createDebugger from "debug";
import { FilterQuery } from "mongodb";
import { Filter } from "mongodb";
import { Agenda } from ".";
const debug = createDebugger("agenda:disable");

Expand All @@ -12,15 +12,15 @@ const debug = createDebugger("agenda:disable");
*/
export const disable = async function (
this: Agenda,
query: FilterQuery<unknown> = {}
query: Filter<unknown> = {}
): Promise<number> {
debug("attempting to disable all jobs matching query", query);
try {
const { result } = await this._collection.updateMany(query, {
const { modifiedCount } = await this._collection.updateMany(query, {
$set: { disabled: true },
});
debug("%s jobs disabled", result.n);
return result.n;
debug("%s jobs disabled");
return modifiedCount;
} catch (error) {
debug("error trying to mark jobs as `disabled`");
throw error;
Expand Down
10 changes: 5 additions & 5 deletions lib/agenda/enable.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import createDebugger from "debug";
import { FilterQuery } from "mongodb";
import { Filter } from "mongodb";
import { Agenda } from ".";
const debug = createDebugger("agenda:enable");

Expand All @@ -13,15 +13,15 @@ const debug = createDebugger("agenda:enable");
*/
export const enable = async function (
this: Agenda,
query: FilterQuery<unknown> = {}
query: Filter<unknown> = {}
): Promise<number> {
debug("attempting to enable all jobs matching query", query);
try {
const { result } = await this._collection.updateMany(query, {
const { modifiedCount } = await this._collection.updateMany(query, {
$set: { disabled: false },
});
debug("%s jobs enabled", result.n);
return result.n;
debug("%s jobs enabled", modifiedCount);
return modifiedCount;
} catch (error) {
debug("error trying to mark jobs as `enabled`");
throw error;
Expand Down
125 changes: 50 additions & 75 deletions lib/agenda/find-and-lock-next-job.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,85 +23,60 @@ export const findAndLockNextJob = async function (
const lockDeadline = new Date(Date.now().valueOf() - definition.lockLifetime);
debug("_findAndLockNextJob(%s, [Function])", jobName);

// Don't try and access MongoDB if we've lost connection to it.
// Trying to resolve crash on Dev PC when it resumes from sleep. NOTE: Does this still happen?
// @ts-expect-error
const s = this._mdb.s || this._mdb.db.s;
if (
s.topology.connections &&
s.topology.connections().length === 0 &&
!this._mongoUseUnifiedTopology
) {
if (s.topology.autoReconnect && !s.topology.isDestroyed()) {
// Continue processing but notify that Agenda has lost the connection
debug(
"Missing MongoDB connection, not attempting to find and lock a job"
);
this.emit("error", new Error("Lost MongoDB connection"));
} else {
// No longer recoverable
debug(
"topology.autoReconnect: %s, topology.isDestroyed(): %s",
s.topology.autoReconnect,
s.topology.isDestroyed()
);
throw new Error(
"MongoDB connection is not recoverable, application restart required"
);
}
} else {
// /**
// * Query used to find job to run
// * @type {{$and: [*]}}
// */
const JOB_PROCESS_WHERE_QUERY = {
$and: [
{
name: jobName,
disabled: { $ne: true },
},
{
$or: [
{
lockedAt: { $eq: null },
nextRunAt: { $lte: this._nextScanAt },
},
{
lockedAt: { $lte: lockDeadline },
},
],
},
],
};
//**
//* Query used to find job to run
//* @type {{$and: [*]}}
//*/
const JOB_PROCESS_WHERE_QUERY = {
$and: [
{
name: jobName,
disabled: { $ne: true },
},
{
$or: [
{
lockedAt: { $eq: null },
nextRunAt: { $lte: this._nextScanAt },
},
{
lockedAt: { $lte: lockDeadline },
},
],
},
],
};

/**
* Query used to set a job as locked
* @type {{$set: {lockedAt: Date}}}
*/
const JOB_PROCESS_SET_QUERY = { $set: { lockedAt: now } };
/**
* Query used to set a job as locked
* @type {{$set: {lockedAt: Date}}}
*/
const JOB_PROCESS_SET_QUERY = { $set: { lockedAt: now } };

/**
* Query used to affect what gets returned
* @type {{returnOriginal: boolean, sort: object}}
*/
const JOB_RETURN_QUERY = { returnOriginal: false, sort: this._sort };
/**
* Query used to affect what gets returned
* @type {{returnOriginal: boolean, sort: object}}
*/
const JOB_RETURN_QUERY = { returnDocument: "after", sort: this._sort };

// Find ONE and ONLY ONE job and set the 'lockedAt' time so that job begins to be processed
const result = await this._collection.findOneAndUpdate(
JOB_PROCESS_WHERE_QUERY,
JOB_PROCESS_SET_QUERY,
JOB_RETURN_QUERY
);
// Find ONE and ONLY ONE job and set the 'lockedAt' time so that job begins to be processed
const result = await this._collection.findOneAndUpdate(
JOB_PROCESS_WHERE_QUERY,
JOB_PROCESS_SET_QUERY,
// @ts-ignore
JOB_RETURN_QUERY
);

let job: Job | undefined = undefined;
if (result.value) {
debug(
"found a job available to lock, creating a new job on Agenda with id [%s]",
result.value._id
);
job = createJob(this, result.value);
}
let job: Job | undefined = undefined;
if (result.value) {
debug(
"found a job available to lock, creating a new job on Agenda with id [%s]",
result.value._id
);

return job;
// @ts-ignore
job = createJob(this, result.value);
}

return job;
};
6 changes: 5 additions & 1 deletion lib/agenda/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import {
Db as MongoDb,
Collection,
MongoClientOptions,
AnyError,
} from "mongodb";
import { JobProcessingQueue } from "./job-processing-queue";
import { cancel } from "./cancel";
Expand Down Expand Up @@ -132,7 +133,10 @@ class Agenda extends EventEmitter {
*/
constructor(
config: AgendaConfig = {},
cb?: (error: Error, collection: Collection<any> | null) => void
cb?: (
error: AnyError | undefined,
collection: Collection<any> | null
) => void
) {
super();

Expand Down
4 changes: 2 additions & 2 deletions lib/agenda/jobs.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { FilterQuery } from "mongodb";
import { Filter } from "mongodb";
import { Agenda } from ".";
import { Job } from "../job";
import { createJob } from "../utils";
Expand All @@ -15,7 +15,7 @@ import { createJob } from "../utils";
*/
export const jobs = async function (
this: Agenda,
query: FilterQuery<any> = {},
query: Filter<any> = {},
sort = {},
limit = 0,
skip = 0
Expand Down
Loading

0 comments on commit de7ab29

Please sign in to comment.