Skip to content

Commit

Permalink
Fetch snapshot by time
Browse files Browse the repository at this point in the history
This change adds the ability fetch a snapshot by time. The motivation
for this is that fetching a document by time is quite a "natural" way
to think about document history, and allows us to - for example - fetch
multiple documents as they were at a given time, without having to
look up their exact version numbers first.

We add a new `Connection.fetchSnapshotByTimestamp` method, which
follows a very similar route to `Connection.fetchSnapshot`, and where
possible, as much code is re-used as possible:

  - both methods use a subclassed child of `SnapshotRequest`
  - both methods have their requests handled by the same machinery in
    `Connection`
  - both methods in the `Backend` have ops applied by a common method,
    but use their own methods for calls to middleware

In order to make this feature possible at scale, this change also adds
two new methods to the `MilestoneDB` interface:

  - `getMilestoneSnapshotAtOrBeforeTime`
  - `getMilestoneSnapshotAtOrAfterTime`

These methods are used to fetch milestone snapshots either side of the
requested timestamp, which means we only need to fetch the ops between
the two of them to reach the desired timestamp.

In the case where a milestone database is not being used, then fetching
a snapshot by timestamp is still possible, but it will fetch all the ops
for a document, and keep applying them from v0 until the timestamp is
reached, which is not particularly scalable.
  • Loading branch information
Alec Gibson committed Dec 3, 2018
1 parent 95c81b8 commit 513f6b6
Show file tree
Hide file tree
Showing 16 changed files with 1,091 additions and 76 deletions.
23 changes: 23 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,27 @@ Get a read-only snapshot of a document at the requested version.
}
```

`connection.fetchSnapshotByTimestamp(collection, id, timestamp, callback): void;`
Get a read-only snapshot of a document at the requested version.

* `collection` _(String)_
Collection name of the snapshot
* `id` _(String)_
ID of the snapshot
* `timestamp` _(number) [optional]_
The timestamp of the desired snapshot. The returned snapshot will be the latest snapshot before the provided timestamp
* `callback` _(Function)_
Called with `(error, snapshot)`, where `snapshot` takes the following form:

```javascript
{
id: string; // ID of the snapshot
v: number; // version number of the snapshot
type: string; // the OT type of the snapshot, or null if it doesn't exist or is deleted
data: any; // the snapshot
}
```

### Class: `ShareDB.Doc`

`doc.type` _(String_)
Expand Down Expand Up @@ -464,3 +485,5 @@ The `41xx` and `51xx` codes are reserved for use by ShareDB DB adapters, and the
* 5018 - Required QueryEmitter listener not assigned
* 5019 - getMilestoneSnapshot MilestoneDB method unimplemented
* 5020 - saveMilestoneSnapshot MilestoneDB method unimplemented
* 5021 - getMilestoneSnapshotBeforeTime MilestoneDB method unimplemented
* 5022 - getMilestoneSnapshotAfterTime MilestoneDB method unimplemented
6 changes: 6 additions & 0 deletions lib/agent.js
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,8 @@ Agent.prototype._handleMessage = function(request, callback) {
return this._submit(request.c, request.d, op, callback);
case 'nf':
return this._fetchSnapshot(request.c, request.d, request.v, callback);
case 'nt':
return this._fetchSnapshotByTimestamp(request.c, request.d, request.ts, callback);
default:
callback({code: 4000, message: 'Invalid or unknown message'});
}
Expand Down Expand Up @@ -589,3 +591,7 @@ Agent.prototype._createOp = function(request) {
Agent.prototype._fetchSnapshot = function (collection, id, version, callback) {
this.backend.fetchSnapshot(this, collection, id, version, callback);
};

Agent.prototype._fetchSnapshotByTimestamp = function (collection, id, timestamp, callback) {
this.backend.fetchSnapshotByTimestamp(this, collection, id, timestamp, callback);
};
139 changes: 109 additions & 30 deletions lib/backend.js
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,9 @@ Backend.prototype.SNAPSHOT_TYPES = {
// The current snapshot is being fetched (eg through backend.fetch)
current: 'current',
// A specific snapshot is being fetched by version (eg through backend.fetchSnapshot)
byVersion: 'byVersion'
byVersion: 'byVersion',
// A specific snapshot is being fetch by timestamp (eg through backend.fetchSnapshotByTimestamp)
byTimestamp: 'byTimestamp'
};

Backend.prototype._shimDocAction = function() {
Expand Down Expand Up @@ -627,6 +629,8 @@ Backend.prototype.fetchSnapshot = function(agent, index, id, version, callback)

Backend.prototype._fetchSnapshot = function (collection, id, version, callback) {
var db = this.db;
var backend = this;

this.milestoneDb.getMilestoneSnapshot(collection, id, version, function (error, milestoneSnapshot) {
if (error) return callback(error);

Expand All @@ -637,45 +641,120 @@ Backend.prototype._fetchSnapshot = function (collection, id, version, callback)
db.getOps(collection, id, from, version, null, function (error, ops) {
if (error) return callback(error);

var type = null;
var data;
var fetchedVersion = 0;

if (milestoneSnapshot) {
type = types.map[milestoneSnapshot.type];
if (!type) return callback({ code: 4008, message: 'Unknown type' });
data = milestoneSnapshot.data;
fetchedVersion = milestoneSnapshot.v;
}
backend._buildSnapshotFromOps(id, milestoneSnapshot, ops, function (error, snapshot) {
if (error) return callback(error);

for (var index = 0; index < ops.length; index++) {
var op = ops[index];
fetchedVersion = op.v + 1;

if (op.create) {
type = types.map[op.create.type];
if (!type) return callback({ code: 4008, message: 'Unknown type' });
data = type.create(op.create.data);
} else if (op.del) {
data = undefined;
type = null;
} else {
data = type.apply(data, op.op);
if (version > snapshot.v) {
return callback({ code: 4024, message: 'Requested version exceeds latest snapshot version' });
}
}

type = type ? type.uri : null;
callback(null, snapshot);
});
});
});
};

if (version > fetchedVersion) {
return callback({ code: 4024, message: 'Requested version exceeds latest snapshot version' });
}
Backend.prototype.fetchSnapshotByTimestamp = function (agent, index, id, timestamp, callback) {
var start = Date.now();
var backend = this;
var projection = this.projections[index];
var collection = projection ? projection.target : index;
var request = {
agent: agent,
index: index,
collection: collection,
id: id,
timestamp: timestamp
};

var snapshot = new Snapshot(id, fetchedVersion, type, data, null);
this._fetchSnapshotByTimestamp(collection, id, timestamp, function (error, snapshot) {
if (error) return callback(error);
var snapshotProjection = backend._getSnapshotProjection(backend.db, projection);
var snapshots = [snapshot];
var snapshotType = backend.SNAPSHOT_TYPES.byTimestamp;
backend._sanitizeSnapshots(agent, snapshotProjection, collection, snapshots, snapshotType, function (error) {
if (error) return callback(error);
backend.emit('timing', 'fetchSnapshot', Date.now() - start, request);
callback(null, snapshot);
});
});
};

Backend.prototype._fetchSnapshotByTimestamp = function (collection, id, timestamp, callback) {
var db = this.db;
var milestoneDb = this.milestoneDb;
var backend = this;

var milestoneSnapshot;
var from = 0;
var to = null;

milestoneDb.getMilestoneSnapshotAtOrBeforeTime(collection, id, timestamp, function (error, snapshot) {
if (error) return callback(error);
milestoneSnapshot = snapshot;
if (snapshot) from = snapshot.v;

milestoneDb.getMilestoneSnapshotAtOrAfterTime(collection, id, timestamp, function (error, snapshot) {
if (error) return callback(error);
if (snapshot) to = snapshot.v;

var options = {metadata: true};
db.getOps(collection, id, from, to, options, function (error, ops) {
if (error) return callback(error);
backend._buildSnapshotFromOps(id, milestoneSnapshot, ops, callback, function shouldBreak(nextOp) {
var opTimestamp = nextOp && nextOp.m && nextOp.m.ts;
return timestamp !== null && opTimestamp > timestamp;
});
});
});
});
};

Backend.prototype._buildSnapshotFromOps = function (id, startingSnapshot, ops, callback, shouldBreak) {
if (typeof shouldBreak !== 'function') {
shouldBreak = function () {
return false;
};
}

var type = null;
var data;
var fetchedVersion = 0;

if (startingSnapshot) {
type = types.map[startingSnapshot.type];
if (!type) return callback({ code: 4008, message: 'Unknown type' });
data = startingSnapshot.data;
fetchedVersion = startingSnapshot.v;
}

for (var index = 0; index < ops.length; index++) {
var op = ops[index];

if (shouldBreak(op)) {
break;
}

fetchedVersion = op.v + 1;

if (op.create) {
type = types.map[op.create.type];
if (!type) return callback({ code: 4008, message: 'Unknown type' });
data = type.create(op.create.data);
} else if (op.del) {
data = undefined;
type = null;
} else {
data = type.apply(data, op.op);
}
}

type = type ? type.uri : null;

var snapshot = new Snapshot(id, fetchedVersion, type, data, null);
callback(null, snapshot);
};

function pluckIds(snapshots) {
var ids = [];
for (var i = 0; i < snapshots.length; i++) {
Expand Down
34 changes: 32 additions & 2 deletions lib/client/connection.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
var Doc = require('./doc');
var Query = require('./query');
var SnapshotRequest = require('./snapshot-request');
var SnapshotVersionRequest = require('./snapshot-request/snapshot-version-request');
var SnapshotTimestampRequest = require('./snapshot-request/snapshot-timestamp-request');
var emitter = require('../emitter');
var ShareDBError = require('../error');
var types = require('../types');
Expand Down Expand Up @@ -233,6 +234,7 @@ Connection.prototype.handleMessage = function(message) {
return this._handleBulkMessage(message, '_handleUnsubscribe');

case 'nf':
case 'nt':
return this._handleSnapshotFetch(err, message);

case 'f':
Expand Down Expand Up @@ -634,7 +636,35 @@ Connection.prototype.fetchSnapshot = function(collection, id, version, callback)
}

var requestId = this.nextSnapshotRequestId++;
var snapshotRequest = new SnapshotRequest(this, requestId, collection, id, version, callback);
var snapshotRequest = new SnapshotVersionRequest(this, requestId, collection, id, version, callback);
this._snapshotRequests[snapshotRequest.requestId] = snapshotRequest;
snapshotRequest.send();
};

/**
* Fetch a read-only snapshot at a given version
*
* @param collection - the collection name of the snapshot
* @param id - the ID of the snapshot
* @param timestamp (optional) - the timestamp to fetch
* @param callback - (error, snapshot) => void, where snapshot takes the following schema:
*
* {
* id: string; // ID of the snapshot
* v: number; // version number of the snapshot
* type: string; // the OT type of the snapshot, or null if it doesn't exist or is deleted
* data: any; // the snapshot
* }
*
*/
Connection.prototype.fetchSnapshotByTimestamp = function (collection, id, timestamp, callback) {
if (typeof timestamp === 'function') {
callback = timestamp;
timestamp = null;
}

var requestId = this.nextSnapshotRequestId++;
var snapshotRequest = new SnapshotTimestampRequest(this, requestId, collection, id, timestamp, callback);
this._snapshotRequests[snapshotRequest.requestId] = snapshotRequest;
snapshotRequest.send();
};
Expand Down
Original file line number Diff line number Diff line change
@@ -1,25 +1,19 @@
var Snapshot = require('../snapshot');
var util = require('../util');
var emitter = require('../emitter');
var Snapshot = require('../../snapshot');
var emitter = require('../../emitter');

module.exports = SnapshotRequest;

function SnapshotRequest(connection, requestId, collection, id, version, callback) {
function SnapshotRequest(connection, requestId, collection, id, callback) {
emitter.EventEmitter.call(this);

if (typeof callback !== 'function') {
throw new Error('Callback is required for SnapshotRequest');
}

if (!util.isValidVersion(version)) {
throw new Error('Snapshot version must be a positive integer or null');
}

this.requestId = requestId;
this.connection = connection;
this.id = id;
this.collection = collection;
this.version = version;
this.callback = callback;

this.sent = false;
Expand All @@ -31,15 +25,7 @@ SnapshotRequest.prototype.send = function () {
return;
}

var message = {
a: 'nf',
id: this.requestId,
c: this.collection,
d: this.id,
v: this.version,
};

this.connection.send(message);
this.connection.send(this._message());
this.sent = true;
};

Expand All @@ -61,6 +47,8 @@ SnapshotRequest.prototype._handleResponse = function (error, message) {
return this.callback(error);
}

var snapshot = new Snapshot(this.id, message.v, message.type, message.data, null);
var metadata = message.meta ? message.meta : null;
var snapshot = new Snapshot(this.id, message.v, message.type, message.data, metadata);

this.callback(null, snapshot);
};
26 changes: 26 additions & 0 deletions lib/client/snapshot-request/snapshot-timestamp-request.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
var SnapshotRequest = require('./snapshot-request');
var util = require('../../util');

module.exports = SnapshotTimestampRequest;

function SnapshotTimestampRequest(connection, requestId, collection, id, timestamp, callback) {
SnapshotRequest.call(this, connection, requestId, collection, id, callback);

if (!util.isValidTimestamp(timestamp)) {
throw new Error('Snapshot timestamp must be a positive integer or null');
}

this.timestamp = timestamp;
}

SnapshotTimestampRequest.prototype = Object.create(SnapshotRequest.prototype);

SnapshotTimestampRequest.prototype._message = function () {
return {
a: 'nt',
id: this.requestId,
c: this.collection,
d: this.id,
ts: this.timestamp,
};
};
26 changes: 26 additions & 0 deletions lib/client/snapshot-request/snapshot-version-request.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
var SnapshotRequest = require('./snapshot-request');
var util = require('../../util');

module.exports = SnapshotVersionRequest;

function SnapshotVersionRequest (connection, requestId, collection, id, version, callback) {
SnapshotRequest.call(this, connection, requestId, collection, id, callback);

if (!util.isValidVersion(version)) {
throw new Error('Snapshot version must be a positive integer or null');
}

this.version = version;
}

SnapshotVersionRequest.prototype = Object.create(SnapshotRequest.prototype);

SnapshotVersionRequest.prototype._message = function () {
return {
a: 'nf',
id: this.requestId,
c: this.collection,
d: this.id,
v: this.version,
};
};
Loading

0 comments on commit 513f6b6

Please sign in to comment.