Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fetch snapshot by time #262

Merged
merged 3 commits into from
Jan 30, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,27 @@ Get a read-only snapshot of a document at the requested version.
}
```

`connection.fetchSnapshotByTimestamp(collection, id, timestamp, callback): void;`
Get a read-only snapshot of a document at the requested version.

* `collection` _(String)_
Collection name of the snapshot
* `id` _(String)_
ID of the snapshot
* `timestamp` _(number) [optional]_
The timestamp of the desired snapshot. The returned snapshot will be the latest snapshot before the provided timestamp
* `callback` _(Function)_
Called with `(error, snapshot)`, where `snapshot` takes the following form:

```javascript
{
id: string; // ID of the snapshot
v: number; // version number of the snapshot
type: string; // the OT type of the snapshot, or null if it doesn't exist or is deleted
data: any; // the snapshot
}
```

### Class: `ShareDB.Doc`

`doc.type` _(String_)
Expand Down Expand Up @@ -464,3 +485,5 @@ The `41xx` and `51xx` codes are reserved for use by ShareDB DB adapters, and the
* 5018 - Required QueryEmitter listener not assigned
* 5019 - getMilestoneSnapshot MilestoneDB method unimplemented
* 5020 - saveMilestoneSnapshot MilestoneDB method unimplemented
* 5021 - getMilestoneSnapshotAtOrBeforeTime MilestoneDB method unimplemented
* 5022 - getMilestoneSnapshotAtOrAfterTime MilestoneDB method unimplemented
6 changes: 6 additions & 0 deletions lib/agent.js
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,8 @@ Agent.prototype._handleMessage = function(request, callback) {
return this._submit(request.c, request.d, op, callback);
case 'nf':
return this._fetchSnapshot(request.c, request.d, request.v, callback);
case 'nt':
return this._fetchSnapshotByTimestamp(request.c, request.d, request.ts, callback);
default:
callback({code: 4000, message: 'Invalid or unknown message'});
}
Expand Down Expand Up @@ -589,3 +591,7 @@ Agent.prototype._createOp = function(request) {
Agent.prototype._fetchSnapshot = function (collection, id, version, callback) {
this.backend.fetchSnapshot(this, collection, id, version, callback);
};

Agent.prototype._fetchSnapshotByTimestamp = function (collection, id, timestamp, callback) {
this.backend.fetchSnapshotByTimestamp(this, collection, id, timestamp, callback);
};
113 changes: 83 additions & 30 deletions lib/backend.js
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,9 @@ Backend.prototype.SNAPSHOT_TYPES = {
// The current snapshot is being fetched (eg through backend.fetch)
current: 'current',
// A specific snapshot is being fetched by version (eg through backend.fetchSnapshot)
byVersion: 'byVersion'
byVersion: 'byVersion',
// A specific snapshot is being fetch by timestamp (eg through backend.fetchSnapshotByTimestamp)
byTimestamp: 'byTimestamp'
};

Backend.prototype._shimDocAction = function() {
Expand Down Expand Up @@ -627,6 +629,8 @@ Backend.prototype.fetchSnapshot = function(agent, index, id, version, callback)

Backend.prototype._fetchSnapshot = function (collection, id, version, callback) {
var db = this.db;
var backend = this;

this.milestoneDb.getMilestoneSnapshot(collection, id, version, function (error, milestoneSnapshot) {
if (error) return callback(error);

Expand All @@ -637,49 +641,98 @@ Backend.prototype._fetchSnapshot = function (collection, id, version, callback)
db.getOps(collection, id, from, version, null, function (error, ops) {
if (error) return callback(error);

var type = null;
var data;
var fetchedVersion = 0;

if (milestoneSnapshot) {
type = types.map[milestoneSnapshot.type];
if (!type) return callback({ code: 4008, message: 'Unknown type' });
data = milestoneSnapshot.data;
fetchedVersion = milestoneSnapshot.v;
}
backend._buildSnapshotFromOps(id, milestoneSnapshot, ops, function (error, snapshot) {
if (error) return callback(error);

for (var index = 0; index < ops.length; index++) {
var op = ops[index];
fetchedVersion = op.v + 1;

if (op.create) {
type = types.map[op.create.type];
if (!type) return callback({ code: 4008, message: 'Unknown type' });
data = type.create(op.create.data);
} else if (op.del) {
data = undefined;
type = null;
} else {
data = type.apply(data, op.op);
if (version > snapshot.v) {
return callback({ code: 4024, message: 'Requested version exceeds latest snapshot version' });
}
}

type = type ? type.uri : null;
callback(null, snapshot);
});
});
});
};

if (version > fetchedVersion) {
return callback({ code: 4024, message: 'Requested version exceeds latest snapshot version' });
}
Backend.prototype.fetchSnapshotByTimestamp = function (agent, index, id, timestamp, callback) {
var start = Date.now();
var backend = this;
var projection = this.projections[index];
var collection = projection ? projection.target : index;
var request = {
agent: agent,
index: index,
collection: collection,
id: id,
timestamp: timestamp
};

var snapshot = new Snapshot(id, fetchedVersion, type, data, null);
this._fetchSnapshotByTimestamp(collection, id, timestamp, function (error, snapshot) {
if (error) return callback(error);
var snapshotProjection = backend._getSnapshotProjection(backend.db, projection);
var snapshots = [snapshot];
var snapshotType = backend.SNAPSHOT_TYPES.byTimestamp;
backend._sanitizeSnapshots(agent, snapshotProjection, collection, snapshots, snapshotType, function (error) {
if (error) return callback(error);
backend.emit('timing', 'fetchSnapshot', Date.now() - start, request);
callback(null, snapshot);
});
});
};

Backend.prototype._fetchSnapshotByTimestamp = function (collection, id, timestamp, callback) {
var db = this.db;
var milestoneDb = this.milestoneDb;
var backend = this;

var milestoneSnapshot;
var from = 0;
var to = null;

milestoneDb.getMilestoneSnapshotAtOrBeforeTime(collection, id, timestamp, function (error, snapshot) {
if (error) return callback(error);
milestoneSnapshot = snapshot;
if (snapshot) from = snapshot.v;

milestoneDb.getMilestoneSnapshotAtOrAfterTime(collection, id, timestamp, function (error, snapshot) {
if (error) return callback(error);
if (snapshot) to = snapshot.v;

var options = {metadata: true};
db.getOps(collection, id, from, to, options, function (error, ops) {
if (error) return callback(error);
filterOpsInPlaceBeforeTimestamp(ops, timestamp);
backend._buildSnapshotFromOps(id, milestoneSnapshot, ops, callback);
});
});
});
};

Backend.prototype._buildSnapshotFromOps = function (id, startingSnapshot, ops, callback) {
var snapshot = startingSnapshot || new Snapshot(id, 0, null, undefined, null);
var error = ot.applyOps(snapshot, ops);
callback(error, snapshot);
};

function pluckIds(snapshots) {
var ids = [];
for (var i = 0; i < snapshots.length; i++) {
ids.push(snapshots[i].id);
}
return ids;
}

function filterOpsInPlaceBeforeTimestamp(ops, timestamp) {
if (timestamp === null) {
return;
}

for (var i = 0; i < ops.length; i++) {
var op = ops[i];
var opTimestamp = op.m && op.m.ts;
if (opTimestamp > timestamp) {
ops.length = i;
return;
}
}
}
34 changes: 32 additions & 2 deletions lib/client/connection.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
var Doc = require('./doc');
var Query = require('./query');
var SnapshotRequest = require('./snapshot-request');
var SnapshotVersionRequest = require('./snapshot-request/snapshot-version-request');
var SnapshotTimestampRequest = require('./snapshot-request/snapshot-timestamp-request');
var emitter = require('../emitter');
var ShareDBError = require('../error');
var types = require('../types');
Expand Down Expand Up @@ -233,6 +234,7 @@ Connection.prototype.handleMessage = function(message) {
return this._handleBulkMessage(message, '_handleUnsubscribe');

case 'nf':
case 'nt':
return this._handleSnapshotFetch(err, message);

case 'f':
Expand Down Expand Up @@ -634,7 +636,35 @@ Connection.prototype.fetchSnapshot = function(collection, id, version, callback)
}

var requestId = this.nextSnapshotRequestId++;
var snapshotRequest = new SnapshotRequest(this, requestId, collection, id, version, callback);
var snapshotRequest = new SnapshotVersionRequest(this, requestId, collection, id, version, callback);
this._snapshotRequests[snapshotRequest.requestId] = snapshotRequest;
snapshotRequest.send();
};

/**
* Fetch a read-only snapshot at a given timestamp
*
* @param collection - the collection name of the snapshot
* @param id - the ID of the snapshot
* @param timestamp (optional) - the timestamp to fetch
* @param callback - (error, snapshot) => void, where snapshot takes the following schema:
*
* {
* id: string; // ID of the snapshot
* v: number; // version number of the snapshot
* type: string; // the OT type of the snapshot, or null if it doesn't exist or is deleted
* data: any; // the snapshot
* }
*
*/
Connection.prototype.fetchSnapshotByTimestamp = function (collection, id, timestamp, callback) {
if (typeof timestamp === 'function') {
callback = timestamp;
timestamp = null;
}

var requestId = this.nextSnapshotRequestId++;
var snapshotRequest = new SnapshotTimestampRequest(this, requestId, collection, id, timestamp, callback);
this._snapshotRequests[snapshotRequest.requestId] = snapshotRequest;
snapshotRequest.send();
};
Expand Down
Original file line number Diff line number Diff line change
@@ -1,25 +1,19 @@
var Snapshot = require('../snapshot');
var util = require('../util');
var emitter = require('../emitter');
var Snapshot = require('../../snapshot');
var emitter = require('../../emitter');

module.exports = SnapshotRequest;

function SnapshotRequest(connection, requestId, collection, id, version, callback) {
function SnapshotRequest(connection, requestId, collection, id, callback) {
emitter.EventEmitter.call(this);

if (typeof callback !== 'function') {
throw new Error('Callback is required for SnapshotRequest');
}

if (!util.isValidVersion(version)) {
throw new Error('Snapshot version must be a positive integer or null');
}

this.requestId = requestId;
this.connection = connection;
this.id = id;
this.collection = collection;
this.version = version;
this.callback = callback;

this.sent = false;
Expand All @@ -31,15 +25,7 @@ SnapshotRequest.prototype.send = function () {
return;
}

var message = {
a: 'nf',
id: this.requestId,
c: this.collection,
d: this.id,
v: this.version,
};

this.connection.send(message);
this.connection.send(this._message());
this.sent = true;
};

Expand All @@ -61,6 +47,8 @@ SnapshotRequest.prototype._handleResponse = function (error, message) {
return this.callback(error);
}

var snapshot = new Snapshot(this.id, message.v, message.type, message.data, null);
var metadata = message.meta ? message.meta : null;
var snapshot = new Snapshot(this.id, message.v, message.type, message.data, metadata);

this.callback(null, snapshot);
};
26 changes: 26 additions & 0 deletions lib/client/snapshot-request/snapshot-timestamp-request.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
var SnapshotRequest = require('./snapshot-request');
var util = require('../../util');

module.exports = SnapshotTimestampRequest;

function SnapshotTimestampRequest(connection, requestId, collection, id, timestamp, callback) {
SnapshotRequest.call(this, connection, requestId, collection, id, callback);

if (!util.isValidTimestamp(timestamp)) {
throw new Error('Snapshot timestamp must be a positive integer or null');
}

this.timestamp = timestamp;
}

SnapshotTimestampRequest.prototype = Object.create(SnapshotRequest.prototype);

SnapshotTimestampRequest.prototype._message = function () {
return {
a: 'nt',
id: this.requestId,
c: this.collection,
d: this.id,
ts: this.timestamp,
};
};
26 changes: 26 additions & 0 deletions lib/client/snapshot-request/snapshot-version-request.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
var SnapshotRequest = require('./snapshot-request');
var util = require('../../util');

module.exports = SnapshotVersionRequest;

function SnapshotVersionRequest (connection, requestId, collection, id, version, callback) {
SnapshotRequest.call(this, connection, requestId, collection, id, callback);

if (!util.isValidVersion(version)) {
throw new Error('Snapshot version must be a positive integer or null');
}

this.version = version;
}

SnapshotVersionRequest.prototype = Object.create(SnapshotRequest.prototype);

SnapshotVersionRequest.prototype._message = function () {
return {
a: 'nf',
id: this.requestId,
c: this.collection,
d: this.id,
v: this.version,
};
};
Loading