Skip to content

Add Milestone Snapshots #236

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Aug 17, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 40 additions & 25 deletions lib/backend.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ var Agent = require('./agent');
var Connection = require('./client/connection');
var emitter = require('./emitter');
var MemoryDB = require('./db/memory');
var MilestoneDB = require('./milestone-db');
var MemoryPubSub = require('./pubsub/memory');
var ot = require('./ot');
var projections = require('./projections');
Expand All @@ -24,6 +25,7 @@ function Backend(options) {
this.pubsub = options.pubsub || new MemoryPubSub();
// This contains any extra databases that can be queried
this.extraDbs = options.extraDbs || {};
this.milestoneDb = options.milestoneDb || new MilestoneDB();

// Map from projected collection -> {type, fields}
this.projections = {};
Expand Down Expand Up @@ -609,40 +611,53 @@ Backend.prototype.fetchSnapshot = function(agent, index, id, version, callback)
};

Backend.prototype._fetchSnapshot = function (collection, id, version, callback) {
// Bypass backend.getOps so that we don't call _sanitizeOps. We want to avoid this, because:
// - we want to avoid the 'op' middleware, because we later use the 'readSnapshots' middleware in _sanitizeSnapshots
// - we handle the projection in _sanitizeSnapshots
this.db.getOps(collection, id, 0, version, null, function (error, ops) {
var db = this.db;
this.milestoneDb.getMilestoneSnapshot(collection, id, version, function (error, milestoneSnapshot) {
if (error) return callback(error);

var type = null;
var data;
var fetchedVersion = 0;
// Bypass backend.getOps so that we don't call _sanitizeOps. We want to avoid this, because:
// - we want to avoid the 'op' middleware, because we later use the 'readSnapshots' middleware in _sanitizeSnapshots
// - we handle the projection in _sanitizeSnapshots
var from = milestoneSnapshot ? milestoneSnapshot.v : 0;
db.getOps(collection, id, from, version, null, function (error, ops) {
if (error) return callback(error);

for (var index = 0; index < ops.length; index++) {
var op = ops[index];
fetchedVersion = op.v + 1;
var type = null;
var data;
var fetchedVersion = 0;

if (op.create) {
type = types.map[op.create.type];
if (milestoneSnapshot) {
type = types.map[milestoneSnapshot.type];
if (!type) return callback({ code: 4008, message: 'Unknown type' });
data = type.create(op.create.data);
} else if (op.del) {
data = undefined;
type = null;
} else {
data = type.apply(data, op.op);
data = milestoneSnapshot.data;
fetchedVersion = milestoneSnapshot.v;
}
}

type = type ? type.uri : null;
for (var index = 0; index < ops.length; index++) {
var op = ops[index];
fetchedVersion = op.v + 1;

if (op.create) {
type = types.map[op.create.type];
if (!type) return callback({ code: 4008, message: 'Unknown type' });
data = type.create(op.create.data);
} else if (op.del) {
data = undefined;
type = null;
} else {
data = type.apply(data, op.op);
}
}

if (version > fetchedVersion) {
return callback({ code: 4024, message: 'Requested version exceeds latest snapshot version' });
}
type = type ? type.uri : null;

var snapshot = new Snapshot(id, fetchedVersion, type, data, null);
callback(null, snapshot);
if (version > fetchedVersion) {
return callback({ code: 4024, message: 'Requested version exceeds latest snapshot version' });
}

var snapshot = new Snapshot(id, fetchedVersion, type, data, null);
callback(null, snapshot);
});
});
};

Expand Down
1 change: 1 addition & 0 deletions lib/db/memory.js
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ MemoryDB.prototype.commit = function(collection, id, op, snapshot, options, call
if (err) return callback(err);
err = db._writeSnapshotSync(collection, id, snapshot);
if (err) return callback(err);

var succeeded = true;
callback(null, succeeded);
});
Expand Down
2 changes: 2 additions & 0 deletions lib/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@ Backend.Backend = Backend;
Backend.DB = require('./db');
Backend.Error = require('./error');
Backend.MemoryDB = require('./db/memory');
Backend.MemoryMilestoneDB = require('./milestone-db/memory');
Backend.MemoryPubSub = require('./pubsub/memory');
Backend.MilestoneDB = require('./milestone-db');
Backend.ot = require('./ot');
Backend.projections = require('./projections');
Backend.PubSub = require('./pubsub');
Expand Down
39 changes: 39 additions & 0 deletions lib/milestone-db/index.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
var emitter = require('../emitter');

module.exports = MilestoneDB;
function MilestoneDB(options) {
emitter.EventEmitter.call(this);

// The interval at which milestone snapshots should be saved
this.interval = options && options.interval;
}
emitter.mixin(MilestoneDB);

MilestoneDB.prototype.close = function(callback) {
if (callback) process.nextTick(callback);
};

/**
* Fetch a milestone snapshot from the database
* @param {string} collection - name of the snapshot's collection
* @param {string} id - ID of the snapshot to fetch
* @param {number} version - the desired version of the milestone snapshot. The database will return
* the most recent milestone snapshot whose version is equal to or less than the provided value
* @param {Function} callback - a callback to invoke once the snapshot has been fetched. Should have
* the signature (error, snapshot) => void;
*/
MilestoneDB.prototype.getMilestoneSnapshot = function (collection, id, version, callback) {
process.nextTick(callback, null, undefined);
};

/**
* @param {string} collection - name of the snapshot's collection
* @param {Snapshot} snapshot - the milestone snapshot to save
* @param {Function} callback (optional) - a callback to invoke after the snapshot has been saved.
* Should have the signature (error, wasSaved) => void;
*/
MilestoneDB.prototype.saveMilestoneSnapshot = function (collection, snapshot, callback) {
var saved = false;
if (callback) return process.nextTick(callback, null, saved);
this.emit('save', saved, collection, snapshot);
};
61 changes: 61 additions & 0 deletions lib/milestone-db/memory.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
var MilestoneDB = require('./index');

/**
* In-memory ShareDB milestone database
*
* Milestone snapshots exist to speed up Backend.fetchSnapshot by providing milestones
* on top of which fewer ops can be applied to reach a desired version of the document.
* This very concept relies on persistence, which means that an in-memory database like
* this is in no way appropriate for production use.
*
* The main purpose of this class is to provide a simple example of implementation,
* and for use in tests.
*/
module.exports = MemoryMilestoneDB;
function MemoryMilestoneDB(options) {
MilestoneDB.call(this, options);

// Map from collection name -> doc id -> array of milestone snapshots
this._milestoneSnapshots = {};
}

MemoryMilestoneDB.prototype = Object.create(MilestoneDB.prototype);

MemoryMilestoneDB.prototype.getMilestoneSnapshot = function (collection, id, version, callback) {
var milestoneSnapshots = this._getMilestoneSnapshotsSync(collection, id);

var milestoneSnapshot;
for (var i = 0; i < milestoneSnapshots.length; i++) {
var nextMilestoneSnapshot = milestoneSnapshots[i];
if (nextMilestoneSnapshot.v <= version || version === null) {
milestoneSnapshot = nextMilestoneSnapshot;
} else {
break;
}
}

process.nextTick(callback, null, milestoneSnapshot);
};

MemoryMilestoneDB.prototype.saveMilestoneSnapshot = function (collection, snapshot, callback) {
var saved = false;
if (!snapshot) {
if (callback) return process.nextTick(callback, null, saved);
this.emit('save', saved, collection, snapshot);
}

var milestoneSnapshots = this._getMilestoneSnapshotsSync(collection, snapshot.id);
milestoneSnapshots.push(snapshot);
milestoneSnapshots.sort(function (a, b) {
return a.v - b.v;
});

saved = true;
if (callback) return process.nextTick(callback, null, saved);
this.emit('save', saved, collection, snapshot);
};

MemoryMilestoneDB.prototype._getMilestoneSnapshotsSync = function (collection, id) {
var collectionSnapshots = this._milestoneSnapshots[collection] || (this._milestoneSnapshots[collection] = {});
return collectionSnapshots[id] || (collectionSnapshots[id] = []);
};
17 changes: 17 additions & 0 deletions lib/submit-request.js
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,11 @@ function SubmitRequest(backend, agent, index, id, op, options) {
// For custom use in middleware
this.custom = {};

// Whether or not to store a milestone snapshot. If left as null, the milestone
// snapshots are saved according to the interval provided to the milestone db
// options. If overridden to a boolean value, then that value is used instead of
// the interval logic.
this.saveMilestoneSnapshot = null;
this.suppressPublish = backend.suppressPublish;
this.maxRetries = backend.maxSubmitRetries;
this.retries = 0;
Expand Down Expand Up @@ -159,6 +164,9 @@ SubmitRequest.prototype.commit = function(callback) {
if (request.collection !== request.index) op.i = request.index;
backend.pubsub.publish(request.channels, op);
}
if (request._shouldSaveMilestoneSnapshot(request.snapshot)) {
request.backend.milestoneDb.saveMilestoneSnapshot(request.collection, request.snapshot);
}
callback();
});
});
Expand Down Expand Up @@ -216,6 +224,15 @@ SubmitRequest.prototype._addSnapshotMeta = function() {
meta.mtime = this.start;
};

SubmitRequest.prototype._shouldSaveMilestoneSnapshot = function (snapshot) {
// If the flag is null, it's not been overridden by the consumer, so apply the interval
if (this.saveMilestoneSnapshot === null) {
return snapshot && snapshot.v % this.backend.milestoneDb.interval === 0;
}

return this.saveMilestoneSnapshot;
};

// Non-fatal client errors:
SubmitRequest.prototype.alreadySubmittedError = function() {
return {code: 4001, message: 'Op already submitted'};
Expand Down
3 changes: 2 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@
"expect.js": "^0.3.1",
"istanbul": "^0.4.2",
"jshint": "^2.9.2",
"mocha": "^5.2.0"
"mocha": "^5.2.0",
"sinon": "^6.1.5"
},
"scripts": {
"test": "./node_modules/.bin/mocha && npm run jshint",
Expand Down
48 changes: 48 additions & 0 deletions test/client/snapshot-request.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
var Backend = require('../../lib/backend');
var expect = require('expect.js');
var MemoryDb = require('../../lib/db/memory');
var MemoryMilestoneDb = require('../../lib/milestone-db/memory');
var sinon = require('sinon');
var util = require('../util');

describe('SnapshotRequest', function () {
var backend;
Expand Down Expand Up @@ -353,4 +357,48 @@ describe('SnapshotRequest', function () {
});
});
});

describe('milestone snapshots enabled for every other version', function () {
var milestoneDb;
var db;

beforeEach(function () {
var options = { interval: 2 };
db = new MemoryDb();
milestoneDb = new MemoryMilestoneDb(options);
backend = new Backend({
db: db,
milestoneDb: milestoneDb
});
});

it('fetches a snapshot using the milestone', function (done) {
var doc = backend.connect().get('books', 'mocking-bird');

util.callInSeries([
function (next) {
doc.create({ title: 'To Kill a Mocking Bird' }, next);
},
function (next) {
doc.submitOp({ p: ['author'], oi: 'Harper Lea' }, next);
},
function (next) {
doc.submitOp({ p: ['author'], od: 'Harper Lea', oi: 'Harper Lee' }, next);
},
function (next) {
sinon.spy(milestoneDb, 'getMilestoneSnapshot');
sinon.spy(db, 'getOps');
backend.connect().fetchSnapshot('books', 'mocking-bird', 3, next);
},
function (snapshot, next) {
expect(milestoneDb.getMilestoneSnapshot.calledOnce).to.be(true);
expect(db.getOps.calledWith('books', 'mocking-bird', 2, 3)).to.be(true);
expect(snapshot.v).to.be(3);
expect(snapshot.data).to.eql({ title: 'To Kill a Mocking Bird', author: 'Harper Lee' });
next();
},
done
]);
});
});
});
8 changes: 6 additions & 2 deletions test/db-memory.js
Original file line number Diff line number Diff line change
Expand Up @@ -119,8 +119,12 @@ function snapshotComparator(sortProperties) {

// Run all the DB-based tests against the BasicQueryableMemoryDB.
require('./db')({
create: function(callback) {
var db = new BasicQueryableMemoryDB();
create: function(options, callback) {
if (typeof options === 'function') {
callback = options;
options = null;
}
var db = new BasicQueryableMemoryDB(options);
callback(null, db);
},
getQuery: function(options) {
Expand Down
1 change: 1 addition & 0 deletions test/db.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ var async = require('async');
var expect = require('expect.js');
var Backend = require('../lib/backend');
var ot = require('../lib/ot');
var Snapshot = require('../lib/snapshot');

module.exports = function(options) {
var create = options.create;
Expand Down
13 changes: 13 additions & 0 deletions test/milestone-db-memory.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
var MemoryMilestoneDB = require('./../lib/milestone-db/memory');

require('./milestone-db')({
create: function(options, callback) {
if (typeof options === 'function') {
callback = options;
options = null;
}

var db = new MemoryMilestoneDB(options);
callback(null, db);
}
});
Loading