-
Notifications
You must be signed in to change notification settings - Fork 453
/
Copy pathdoc.js
942 lines (832 loc) · 31.7 KB
/
doc.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
var emitter = require('../emitter');
var logger = require('../logger');
var ShareDBError = require('../error');
var types = require('../types');
/**
* A Doc is a client's view on a sharejs document.
*
* It is uniquely identified by its `id` and `collection`. Documents
* should not be created directly. Create them with connection.get()
*
*
* Subscriptions
* -------------
*
* We can subscribe a document to stay in sync with the server.
* doc.subscribe(function(error) {
* doc.subscribed // = true
* })
* The server now sends us all changes concerning this document and these are
* applied to our data. If the subscription was successful the initial
* data and version sent by the server are loaded into the document.
*
* To stop listening to the changes we call `doc.unsubscribe()`.
*
* If we just want to load the data but not stay up-to-date, we call
* doc.fetch(function(error) {
* doc.data // sent by server
* })
*
*
* Events
* ------
*
* You can use doc.on(eventName, callback) to subscribe to the following events:
* - `before op (op, source)` Fired before a partial operation is applied to the data.
* It may be used to read the old data just before applying an operation
* - `op (op, source)` Fired after every partial operation with this operation as the
* first argument
* - `create (source)` The document was created. That means its type was
* set and it has some initial data.
* - `del (data, source)` Fired after the document is deleted, that is
* the data is null. It is passed the data before deletion as an
* argument
* - `load ()` Fired when a new snapshot is ingested from a fetch, subscribe, or query
*/
module.exports = Doc;
// Constructs the client-side state for one document: remembers the owning
// connection and the collection/id pair, and initializes the version, type,
// data, subscription flags and op/fetch bookkeeping to their empty defaults.
// Documents should be obtained through connection.get() rather than being
// constructed directly.
//
// @param connection the connection this document sends and receives through
// @param collection name of the collection the document belongs to
// @param id id of the document within the collection
function Doc(connection, collection, id) {
  emitter.EventEmitter.call(this);

  this.connection = connection;

  this.collection = collection;
  this.id = id;

  this.version = null;
  // The OT type of this document. An uncreated document has type `null`
  this.type = null;
  this.data = undefined;

  // Array of callbacks or nulls as placeholders
  this.inflightFetch = [];
  this.inflightSubscribe = [];
  this.inflightUnsubscribe = [];
  this.pendingFetch = [];

  // Whether we think we are subscribed on the server. Synchronously set to
  // false on calls to unsubscribe and disconnect. Should never be true when
  // this.wantSubscribe is false
  this.subscribed = false;
  // Whether to re-establish the subscription on reconnect
  this.wantSubscribe = false;

  // The op that is currently roundtripping to the server, or null.
  //
  // When the connection reconnects, the inflight op is resubmitted.
  //
  // This has the same format as an entry in pendingOps
  this.inflightOp = null;

  // All ops that are waiting for the server to acknowledge this.inflightOp
  // This used to just be a single operation, but creates & deletes can't be
  // composed with regular operations.
  //
  // This is a list of {[create:{...}], [del:true], [op:...], callbacks:[...]}
  this.pendingOps = [];

  // The applyStack enables us to track any ops submitted while we are
  // applying an op incrementally. This value is an array when we are
  // performing an incremental apply and null otherwise. When it is an array,
  // all submitted ops should be pushed onto it. The `_otApply` method will
  // reset it back to null when all incremental apply loops are complete.
  this.applyStack = null;

  // Disable the default behavior of composing submitted ops. This is read at
  // the time of op submit, so it may be toggled on before submitting a
  // specific op and toggled off afterward
  this.preventCompose = false;

  // Whether flush() should hold back pending ops instead of sending them.
  // Toggled through pause() and resume(); initialized here so the property
  // always exists with a boolean value instead of starting out undefined.
  this.paused = false;
}
emitter.mixin(Doc);
// Tear this document down once it has no outstanding work. Waits for
// whenNothingPending(), unsubscribes first when a subscription is still
// wanted, and then asks the connection to drop the doc via _destroyDoc().
//
// @param [callback] called without arguments after the doc was destroyed, or
//   with the unsubscribe error. Without a callback, that error is emitted as
//   an 'error' event instead.
Doc.prototype.destroy = function(callback) {
  var self = this;

  function finish() {
    self.connection._destroyDoc(self);
    if (callback) callback();
  }

  function onUnsubscribed(err) {
    if (!err) return finish();
    if (callback) return callback(err);
    return self.emit('error', err);
  }

  self.whenNothingPending(function() {
    if (!self.wantSubscribe) return finish();
    self.unsubscribe(onUnsubscribed);
  });
};
// ****** Manipulating the document data, version and type.

// Set the document's type. A string is looked up in `types.map`; `null`
// removes the type and also clears the document data. Anything else that does
// not resolve to a type is reported through an 'error' event with code 4008.
//
// @param newType OT type provided by the ottypes library or its name or uri,
//   or null to mark the document as uncreated
Doc.prototype._setType = function(newType) {
  // Remember what the caller asked for: the lookup below replaces an unknown
  // name with undefined, which would make the error message useless
  var requested = newType;
  if (typeof newType === 'string') {
    newType = types.map[newType];
  }

  if (newType) {
    this.type = newType;
    return;
  }

  if (newType === null) {
    this.type = null;
    // If we removed the type from the object, also remove its data
    this.data = undefined;
    return;
  }

  var err = new ShareDBError(4008, 'Missing type ' + requested);
  return this.emit('error', err);
};
// Load a snapshot into this document. The snapshot must carry a numeric
// version. It is used for data handed to the fetch and subscribe response
// handlers as well as for data passed in directly by the caller.
//
// A snapshot only replaces local state while the doc has no type and no
// writes pending. Otherwise ops are needed to move forward, so a newer
// snapshot version triggers a fetch instead.
//
// @param snapshot.v version
// @param snapshot.data
// @param snapshot.type defaults to types.defaultType when undefined
// @param [callback] called with an error, or without arguments when done
Doc.prototype.ingestSnapshot = function(snapshot, callback) {
  var doc = this;

  function finish() {
    return callback && callback();
  }

  function reject(err) {
    if (callback) return callback(err);
    return doc.emit('error', err);
  }

  if (!snapshot) return finish();

  if (typeof snapshot.v !== 'number') {
    return reject(new ShareDBError(5008, 'Missing version in ingested snapshot. ' + this.collection + '.' + this.id));
  }

  var needsOps = this.type || this.hasWritePending();
  if (!needsOps) {
    // Ignore the snapshot if we are already at a newer version. Under no
    // circumstance should we ever set the current version backward
    if (this.version > snapshot.v) return finish();

    this.version = snapshot.v;
    var snapshotType = snapshot.type;
    if (snapshotType === undefined) snapshotType = types.defaultType;
    this._setType(snapshotType);
    if (this.type && this.type.deserialize) {
      this.data = this.type.deserialize(snapshot.data);
    } else {
      this.data = snapshot.data;
    }
    this.emit('load');
    finish();
    return;
  }

  // The doc is already created or has ops pending, so the ingested snapshot
  // cannot be used directly.
  //
  // The version should only be null on a created document when it was
  // created locally without fetching
  if (this.version == null) {
    if (this.hasWritePending()) {
      // With pending ops and a snapshot for a locally created document we
      // have to wait for the pending ops to complete, because we don't know
      // what version to fetch ops from. The snapshot may stem from our own
      // op, but the doc may also have been created remotely (which would
      // conflict and be an error)
      return callback && this.once('no write pending', callback);
    }
    // Otherwise, we've encountered an error state
    return reject(new ShareDBError(5009, 'Cannot ingest snapshot in doc with null version. ' + this.collection + '.' + this.id));
  }

  // A snapshot further along than the document means we are behind: issue a
  // fetch to get the latest ops and catch up
  if (snapshot.v > this.version) return this.fetch(callback);

  return finish();
};
// Run `callback` as soon as hasPending() is false: synchronously when that is
// already the case, otherwise on the next 'nothing pending' event.
//
// @param callback invoked without arguments
Doc.prototype.whenNothingPending = function(callback) {
  if (!this.hasPending()) {
    callback();
    return;
  }
  this.once('nothing pending', callback);
};
// Whether any work is outstanding for this doc: an inflight or queued op, or
// any inflight/queued fetch, subscribe or unsubscribe entry.
//
// @return {boolean}
Doc.prototype.hasPending = function() {
  if (this.inflightOp) return true;
  if (this.pendingOps.length) return true;
  if (this.inflightFetch.length) return true;
  if (this.inflightSubscribe.length) return true;
  if (this.inflightUnsubscribe.length) return true;
  return Boolean(this.pendingFetch.length);
};
// Whether an op is inflight or queued in pendingOps.
//
// @return {boolean}
Doc.prototype.hasWritePending = function() {
  if (this.inflightOp) return true;
  return Boolean(this.pendingOps.length);
};
// Emit the idle notifications that currently apply: 'no write pending' when
// no op is inflight or queued, followed by 'nothing pending' when
// hasPending() reports no outstanding work at all.
Doc.prototype._emitNothingPending = function() {
  if (!this.hasWritePending()) {
    this.emit('no write pending');
    if (!this.hasPending()) {
      this.emit('nothing pending');
    }
  }
};
// **** Helpers for network messages

// Report an error response from the server. The error goes to `callback` when
// there is one and is emitted as an 'error' event otherwise. Either way the
// pending-state events are re-evaluated: after the callback ran, but before
// the 'error' event is emitted.
//
// @param err the error to report
// @param [callback] request callback taken from one of the inflight queues
Doc.prototype._emitResponseError = function(err, callback) {
  if (!callback) {
    this._emitNothingPending();
    this.emit('error', err);
    return;
  }
  callback(err);
  this._emitNothingPending();
};
// Handle the response to a fetch: take the oldest inflight fetch callback,
// then either report the error or ingest the returned snapshot.
//
// @param err error from the server, if any
// @param snapshot snapshot returned by the server
Doc.prototype._handleFetch = function(err, snapshot) {
  var done = this.inflightFetch.shift();
  if (err) {
    this._emitResponseError(err, done);
    return;
  }
  this.ingestSnapshot(snapshot, done);
  this._emitNothingPending();
};
// Handle the response to a subscribe: take the oldest inflight subscribe
// callback, then either report the error or mark the doc subscribed and
// ingest the returned snapshot.
//
// @param err error from the server, if any
// @param snapshot snapshot returned by the server
Doc.prototype._handleSubscribe = function(err, snapshot) {
  var done = this.inflightSubscribe.shift();
  if (err) {
    this._emitResponseError(err, done);
    return;
  }
  // Only flag the doc as subscribed if the client still wants that. Between
  // sending the subscribe and receiving this response, unsubscribe may have
  // been called, and we might already be unsubscribed without having heard
  // back yet. Requests from the client are also not serialized and may take
  // different amounts of time to process, so responses can arrive in a
  // different order from the one they were sent in
  if (this.wantSubscribe) {
    this.subscribed = true;
  }
  this.ingestSnapshot(snapshot, done);
  this._emitNothingPending();
};
// Handle the response to an unsubscribe: take the oldest inflight unsubscribe
// callback, then either report the error or call it without arguments.
//
// @param err error from the server, if any
Doc.prototype._handleUnsubscribe = function(err) {
  var done = this.inflightUnsubscribe.shift();
  if (err) {
    this._emitResponseError(err, done);
    return;
  }
  if (done) done();
  this._emitNothingPending();
};
// Handle an op message from the server. The message is either the
// acknowledgement (or rejection) of our inflight op, or an op that has to be
// transformed against our unacknowledged ops and applied locally.
//
// @param err error from the server, if any
// @param message op message; `src` and `seq` identify the submitting op and
//   `v` is compared against this.version
Doc.prototype._handleOp = function(err, message) {
  var inflight = this.inflightOp;

  if (err) {
    if (!inflight) return this.emit('error', err);
    // The server has rejected submission of the current operation. Error code
    // 4002 "Op submit rejected" means this was done intentionally: roll back,
    // but do not hand an error to the user.
    return this._rollback(err.code === 4002 ? null : err);
  }

  var isAcknowledgement = inflight &&
    message.src === inflight.src &&
    message.seq === inflight.seq;
  if (isAcknowledgement) {
    // The op has already been applied locally. Just update the version
    // and pending state appropriately
    this._opAcknowledged(message);
    return;
  }

  if (this.version == null || message.v > this.version) {
    // This happens in normal operation if we become subscribed to a new
    // document via a query. It can also happen if we get an op for a future
    // version beyond the one we are expecting next, for instance because the
    // server did not publish an op or because of a race condition. In any
    // case, a fetch catches us back up.
    //
    // Presumably the connection de-duplicates fetches that are already
    // inflight, so many ops received at once do not each cause a request.
    this.fetch();
    return;
  }

  // We can safely ignore the old (duplicate) operation.
  if (message.v < this.version) return;

  // Transform the incoming op against everything the server has not seen yet:
  // first the inflight op, then each queued op in order
  var transformErr;
  if (inflight) {
    transformErr = transformX(inflight, message);
    if (transformErr) return this._hardRollback(transformErr);
  }
  var queued = this.pendingOps;
  for (var k = 0; k < queued.length; k++) {
    transformErr = transformX(queued[k], message);
    if (transformErr) return this._hardRollback(transformErr);
  }

  this.version++;
  try {
    this._otApply(message, false);
  } catch (error) {
    return this._hardRollback(error);
  }
};
// React to a change of the connection state, for example a disconnect or a
// reconnect.
//
// While the connection can send, pending ops are flushed and queued
// fetch/subscribe requests are re-issued. Otherwise all inflight work is moved
// back into its pending form so that it can be resent later.
Doc.prototype._onConnectionStateChanged = function() {
  if (this.connection.canSend) {
    this.flush();
    this._resubscribe();
    return;
  }

  // The inflight op goes back to the front of the queue
  if (this.inflightOp) {
    this.pendingOps.unshift(this.inflightOp);
    this.inflightOp = null;
  }

  this.subscribed = false;

  // Inflight fetch and subscribe callbacks are kept for _resubscribe()
  if (this.inflightFetch.length || this.inflightSubscribe.length) {
    this.pendingFetch = this.pendingFetch.concat(this.inflightFetch, this.inflightSubscribe);
    this.inflightFetch.length = 0;
    this.inflightSubscribe.length = 0;
  }

  // Inflight unsubscribes are not retried; their callbacks are called right
  // away without an error
  if (this.inflightUnsubscribe.length) {
    var unsubscribeCallbacks = this.inflightUnsubscribe;
    this.inflightUnsubscribe = [];
    callEach(unsubscribeCallbacks);
  }
};
// Re-issue whatever was queued in pendingFetch. When a subscription is wanted,
// a subscribe is always sent. Otherwise a fetch is sent only if callbacks are
// waiting. All queued callbacks receive the result of that single request.
Doc.prototype._resubscribe = function() {
  var queued = this.pendingFetch;
  this.pendingFetch = [];

  var notifyQueued = null;
  if (queued.length) {
    notifyQueued = function(err) {
      callEach(queued, err);
    };
  }

  if (this.wantSubscribe) {
    if (notifyQueued) {
      this.subscribe(notifyQueued);
    } else {
      this.subscribe();
    }
    return;
  }

  if (notifyQueued) {
    this.fetch(notifyQueued);
  }
};
// Request the current document snapshot or ops that bring us up to date.
//
// While the connection cannot send, the callback is parked in pendingFetch.
// Otherwise the request is sent. If the connection reports it as a duplicate,
// the callback is merged into the most recent inflight fetch entry.
//
// @param [callback] called with an error, or without arguments when done
Doc.prototype.fetch = function(callback) {
  if (!this.connection.canSend) {
    this.pendingFetch.push(callback);
    return;
  }
  var isDuplicate = this.connection.sendFetch(this);
  pushActionCallback(this.inflightFetch, isDuplicate, callback);
};
// Fetch the initial document and keep receiving updates.
//
// Always records that a subscription is wanted. While the connection cannot
// send, the callback is parked in pendingFetch. Otherwise the request is sent.
// If the connection reports it as a duplicate, the callback is merged into the
// most recent inflight subscribe entry.
//
// @param [callback] called with an error, or without arguments when done
Doc.prototype.subscribe = function(callback) {
  this.wantSubscribe = true;
  if (!this.connection.canSend) {
    this.pendingFetch.push(callback);
    return;
  }
  var isDuplicate = this.connection.sendSubscribe(this);
  pushActionCallback(this.inflightSubscribe, isDuplicate, callback);
};
// Unsubscribe. The data will stay around in local memory, but we'll stop
// receiving updates.
//
// @param [callback] called when the server responded; while the connection
//   cannot send, it is scheduled with process.nextTick instead
Doc.prototype.unsubscribe = function(callback) {
  this.wantSubscribe = false;
  // Be conservative about claiming to be subscribed on the server. The
  // actual unsubscribe happens some time between sending the message and
  // hearing back, and we cannot know exactly when. So mark the doc as not
  // subscribed right away
  this.subscribed = false;

  if (!this.connection.canSend) {
    if (callback) process.nextTick(callback);
    return;
  }
  var isDuplicate = this.connection.sendUnsubscribe(this);
  pushActionCallback(this.inflightUnsubscribe, isDuplicate, callback);
};
// Record the callback of a fetch/subscribe/unsubscribe request in `inflight`.
//
// A request that is not a duplicate gets its own entry, even when `callback`
// is undefined. For a duplicate, the most recent entry is replaced with a
// function that calls the previous callback and then the new one, both with
// the same error argument.
//
// @param inflight array of callbacks (or empty placeholders)
// @param isDuplicate whether the request duplicates the most recent one
// @param [callback] callback of the new request
function pushActionCallback(inflight, isDuplicate, callback) {
  if (!isDuplicate) {
    inflight.push(callback);
    return;
  }
  var previous = inflight.pop();
  inflight.push(function(err) {
    if (previous) previous(err);
    if (callback) callback(err);
  });
}
// Operations //

// Send the next pending op to the server, if we can.
//
// Only one operation can be in-flight at a time. This method does nothing if
// the connection cannot send, an operation is already on its way, the doc is
// paused, or no op is queued.
Doc.prototype.flush = function() {
  if (!this.connection.canSend) return;
  if (this.inflightOp) return;
  if (this.paused) return;
  if (!this.pendingOps.length) return;
  this._sendOp();
};
// Turn `op` into a no-op in place by removing its `op`, `create` and `del`
// properties. All other properties are left untouched.
function setNoOp(op) {
  var payloadKeys = ['op', 'create', 'del'];
  for (var k = 0; k < payloadKeys.length; k++) {
    delete op[payloadKeys[k]];
  }
}
// Transform server op data by a client op, and vice versa. Ops are edited in
// place.
//
// @param client a locally submitted op entry; for regular ops its `type` must
//   provide `transformX` or `transform`
// @param server an op received from the server
// @return a ShareDBError when the two ops cannot be reconciled (4017: the
//   server deleted the document, 4018: one side created it), undefined
//   otherwise
function transformX(client, server) {
  // Order of statements in this function matters. Be especially careful if
  // refactoring this function

  // A client delete op should dominate if both the server and the client
  // delete the document. Thus, any ops following the client delete (such as a
  // subsequent create) will be maintained, since the server op is transformed
  // to a no-op
  if (client.del) return setNoOp(server);

  if (server.del) {
    return new ShareDBError(4017, 'Document was deleted');
  }
  if (server.create) {
    return new ShareDBError(4018, 'Document already created');
  }

  // Ignore no-op coming from server
  if (!server.op) return;

  // I believe that this should not occur, but check just in case
  if (client.create) {
    return new ShareDBError(4018, 'Document already created');
  }

  // They both edited the document. This is the normal case for this function -
  // as in, most of the time we'll end up down here.
  //
  // The type is taken from the client op rather than from the document. If we
  // get ops at an old version of the document, the document's type might be
  // unset or a totally different type. By pinning the type to the op data, we
  // make sure the right type has its transform function called.
  var type = client.type;
  if (type.transformX) {
    var result = type.transformX(client.op, server.op);
    client.op = result[0];
    server.op = result[1];
  } else {
    // Compute both sides from the untransformed ops before assigning either
    var clientOp = type.transform(client.op, server.op, 'left');
    var serverOp = type.transform(server.op, client.op, 'right');
    client.op = clientOp;
    server.op = serverOp;
  }
}
/**
 * Applies the operation to the local snapshot and emits the matching events.
 *
 * - An op with `op.op` emits `before op`, replaces `this.data` with the result
 *   of `type.apply`, and then emits `op`. For the default type, an op with
 *   more than one component and a falsy `source` is applied one component at
 *   a time, emitting `before op` and `op` around each component.
 * - An op with `op.create` sets the type, builds the initial data and emits
 *   `create`.
 * - An op with `op.del` clears the type (and with it the data) and emits
 *   `del` with the previous data.
 * - Any other op is a no-op and does nothing.
 *
 * Throws a ShareDBError (4015) when `op.op` is given for a document without a
 * type, so callers are expected to wrap this call in a try/catch.
 *
 * @param op object with one of `op`, `create` or `del`
 * @param source passed through to the emitted events; callers in this file
 *   pass `false` for ops that did not originate from a local submit
 * @private
 */
Doc.prototype._otApply = function(op, source) {
  if (op.op) {
    if (!this.type) {
      // Throw here, because all usage of _otApply should be wrapped with a try/catch
      throw new ShareDBError(4015, 'Cannot apply op to uncreated document. ' + this.collection + '.' + this.id);
    }

    // Iteratively apply multi-component remote operations and rollback ops
    // (source === false) for the default JSON0 OT type. It could use
    // type.shatter(), but since this code is so specific to use cases for the
    // JSON0 type and ShareDB explicitly bundles the default type, we might as
    // well write it this way and save needing to iterate through the op
    // components twice.
    //
    // Ideally, we would not need this extra complexity. However, it is
    // helpful for implementing bindings that update DOM nodes and other
    // stateful objects by translating op events directly into corresponding
    // mutations. Such bindings are most easily written as responding to
    // individual op components one at a time in order, and it is important
    // that the snapshot only include updates from the particular op component
    // at the time of emission. Eliminating this would require rethinking how
    // such external bindings are implemented.
    if (!source && this.type === types.defaultType && op.op.length > 1) {
      if (!this.applyStack) this.applyStack = [];
      // Entries below this index belong to outer apply loops already running
      var stackLength = this.applyStack.length;
      for (var i = 0; i < op.op.length; i++) {
        var component = op.op[i];
        var componentOp = {op: [component]};
        // Transform componentOp against any ops that have been submitted
        // synchronously inside of an op event handler since we began apply of
        // our operation
        for (var j = stackLength; j < this.applyStack.length; j++) {
          var transformErr = transformX(this.applyStack[j], componentOp);
          // NOTE(review): this early return skips the _popApplyStack call
          // below, so applyStack is left non-null - confirm this is intended
          if (transformErr) return this._hardRollback(transformErr);
        }
        // Apply the individual op component
        this.emit('before op', componentOp.op, source);
        this.data = this.type.apply(this.data, componentOp.op);
        this.emit('op', componentOp.op, source);
      }
      // Pop whatever was submitted since we started applying this op
      this._popApplyStack(stackLength);
      return;
    }

    // The 'before op' event enables clients to pull any necessary data out of
    // the snapshot before it gets changed
    this.emit('before op', op.op, source);
    // Apply the operation and keep whatever the type returns as the new data
    this.data = this.type.apply(this.data, op.op);
    // Emit an 'op' event once the local data includes the changes from the
    // op. For locally submitted ops, this will be synchronously with
    // submission and before the server or other clients have received the op.
    // For ops from other clients, this will be after the op has been
    // committed to the database and published
    this.emit('op', op.op, source);
    return;
  }

  if (op.create) {
    this._setType(op.create.type);
    // Types with a deserialized form either build it directly through
    // createDeserialized, or create the serialized form and deserialize it.
    // Types without one just use create
    this.data = (this.type.deserialize) ?
      (this.type.createDeserialized) ?
        this.type.createDeserialized(op.create.data) :
        this.type.deserialize(this.type.create(op.create.data)) :
      this.type.create(op.create.data);
    this.emit('create', source);
    return;
  }

  if (op.del) {
    // Keep the data for the event; _setType(null) clears this.data
    var oldData = this.data;
    this._setType(null);
    this.emit('del', oldData, source);
    return;
  }
};
// ***** Sending operations

// Actually send op to the server. Re-sends the inflight op when there is one,
// and otherwise promotes the first pending op to inflight. Does nothing until
// the connection has an id. Emits an 'error' event (5010) when there is no op
// to send.
Doc.prototype._sendOp = function() {
  var connection = this.connection;

  // Wait until we have a src id from the server
  var src = connection.id;
  if (!src) return;

  // Without an inflight op, take the first item of pendingOps. With one, it
  // is simply sent again
  var op = this.inflightOp;
  if (!op) {
    op = this.inflightOp = this.pendingOps.shift();
  }
  if (!op) {
    var err = new ShareDBError(5010, 'No op to send on call to _sendOp');
    return this.emit('error', err);
  }

  // Track data for retrying ops
  op.sentAt = Date.now();
  if (op.retries == null) {
    op.retries = 0;
  } else {
    op.retries = op.retries + 1;
  }

  // The src + seq number is a unique ID representing this operation. This
  // tuple is used on the server to detect when ops have been sent multiple
  // times and on the client to match acknowledgement of an op back to the
  // inflightOp. The src can differ from the connection's current id after a
  // reconnect, since an op may still be pending by then and the id will have
  // changed. An op that is sent again must keep its original seq value, so
  // seq is only assigned once.
  if (op.seq == null) {
    op.seq = connection.seq++;
  }

  connection.sendOp(this, op);

  // src isn't needed on the first try, since the server session will have the
  // same id, but it must be set on the inflightOp in case it is sent again
  // after a reconnect and the connection's id has changed by then
  if (op.src == null) {
    op.src = src;
  }
};
// Queues the operation for submission to the server and applies it locally.
//
// Internal method that does the actual work for submitOp(), create() and
// del().
// @private
//
// @param op
// @param [op.op]
// @param [op.del]
// @param [op.create]
// @param [source] passed to the emitted events; a falsy value is replaced
//   with `true`
// @param [callback] stored with the queued op
Doc.prototype._submit = function(op, source, callback) {
  // Locally submitted ops must always have a truthy source
  var origin = source ? source : true;

  // The op contains either op, create, delete, or none of the above (a no-op).
  if (op.op) {
    var type = this.type;
    if (!type) {
      var err = new ShareDBError(4015, 'Cannot submit op. Document has not been created. ' + this.collection + '.' + this.id);
      if (callback) return callback(err);
      return this.emit('error', err);
    }
    // Let the type normalize the op first, if it supports that
    if (type.normalize) {
      op.op = type.normalize(op.op);
    }
  }

  try {
    this._pushOp(op, callback);
    this._otApply(op, origin);
  } catch (error) {
    return this._hardRollback(error);
  }

  // The call to flush is delayed so if submit() is called multiple times
  // synchronously, all the ops are combined before being sent to the server.
  var self = this;
  process.nextTick(function() {
    self.flush();
  });
};
// Add a locally submitted op to the pendingOps queue, composing it into the
// last pending op when possible.
//
// @param op the op to queue; gets `type` and `callbacks` assigned when it is
//   queued as its own entry
// @param [callback] stored with the queued (or composed) op
Doc.prototype._pushOp = function(op, callback) {
  var stack = this.applyStack;
  if (stack) {
    // An op is being applied incrementally right now. Skip composing and
    // record the op on the applyStack, so that it can be transformed against
    // the remaining components of the op or ops being applied
    stack.push(op);
  } else {
    // If the type supports composes, try to compose the operation onto the
    // end of the last pending operation.
    var merged = this._tryCompose(op);
    if (merged) {
      merged.callbacks.push(callback);
      return;
    }
  }

  // Not composed: queue the op as its own entry
  op.type = this.type;
  op.callbacks = [callback];
  this.pendingOps.push(op);
};
// Unwind the applyStack to length `to` after an incremental apply loop.
//
// For an inner loop (to > 0) the stack is just truncated. For the outermost
// loop the stack is reset to null, and the ops that were submitted while it
// was active are composed now, since composing was skipped when they were
// pushed.
//
// @param to length the applyStack had when the finished loop started
Doc.prototype._popApplyStack = function(to) {
  if (to > 0) {
    this.applyStack.length = to;
    return;
  }

  // Once we have completed the outermost apply loop, reset to null and no
  // longer add ops to the applyStack as they are submitted
  var firstSubmitted = this.applyStack[0];
  this.applyStack = null;
  if (!firstSubmitted) return;

  // Everything queued from the first submitted op onwards is taken off the
  // queue and then re-added one by one, composing where possible
  var start = this.pendingOps.indexOf(firstSubmitted);
  if (start === -1) return;
  var deferred = this.pendingOps.splice(start);

  for (var k = 0; k < deferred.length; k++) {
    var candidate = deferred[k];
    var target = this._tryCompose(candidate);
    if (!target) {
      this.pendingOps.push(candidate);
      continue;
    }
    target.callbacks = target.callbacks.concat(candidate.callbacks);
  }
};
// Try to compose a submitted op into the last pending op.
//
// @param op the submitted op
// @return the pending op that `op` was composed into, undefined if it was not
//   composed
Doc.prototype._tryCompose = function(op) {
  if (this.preventCompose) return;

  // We can only compose into the last pending op. Inflight ops have already
  // been sent to the server, so we can't modify them
  var queue = this.pendingOps;
  var tail = queue[queue.length - 1];
  if (!tail) return;

  // Compose an op into a create by applying it. This effectively makes the op
  // invisible, as if the document were created including the op originally
  var foldsIntoCreate = tail.create && op.op;
  if (foldsIntoCreate) {
    tail.create.data = this.type.apply(tail.create.data, op.op);
    return tail;
  }

  // Compose two ops into a single op if supported by the type. Types that
  // support compose must be able to compose any two ops together
  var composesOps = tail.op && op.op && this.type.compose;
  if (composesOps) {
    tail.op = this.type.compose(tail.op, op.op);
    return tail;
  }
};
// *** Client OT entrypoints.

// Submit an operation to the document.
//
// @param component operation handled by the OT type
// @param [options] {source: ...}; may be omitted, with the callback passed in
//   its place
// @param [callback] stored with the queued op
//
// @fires before op, op
Doc.prototype.submitOp = function(component, options, callback) {
  // Support the submitOp(component, callback) call form
  if (typeof options === 'function') {
    callback = options;
    options = null;
  }
  this._submit({op: component}, options && options.source, callback);
};
// Create the document, which in ShareJS semantics means to set its type. Every
// object implicitly exists in the database but has no data and no type. Create
// sets the type of the object and can optionally set some initial data on the
// object, depending on the type.
//
// Reports error 4016 when the document already has a type.
//
// @param data initial data
// @param [type] OT type; defaults to the uri of types.defaultType
// @param [options] {source: ...}
// @param [callback] stored with the queued op, or called with the error
Doc.prototype.create = function(data, type, options, callback) {
  // Support the create(data, callback) and create(data, type, callback) forms
  if (typeof type === 'function') {
    callback = type;
    type = null;
    options = null;
  } else if (typeof options === 'function') {
    callback = options;
    options = null;
  }

  var typeName = type || types.defaultType.uri;

  if (this.type) {
    var err = new ShareDBError(4016, 'Document already exists');
    if (callback) return callback(err);
    return this.emit('error', err);
  }

  this._submit({create: {type: typeName, data: data}}, options && options.source, callback);
};
// Delete the document. This creates and submits a delete operation to the
// server. Deleting resets the object's type to null and deletes its data. The
// document still exists, and still has the version it used to have before you
// deleted it (well, old version +1).
//
// Reports error 4015 when the document has no type.
//
// @param [options] {source: ...}; may be omitted, with the callback passed in
//   its place
// @param [callback] stored with the queued op, or called with the error
Doc.prototype.del = function(options, callback) {
  // Support the del(callback) call form
  if (typeof options === 'function') {
    callback = options;
    options = null;
  }

  if (!this.type) {
    var err = new ShareDBError(4015, 'Document does not exist');
    if (callback) return callback(err);
    return this.emit('error', err);
  }

  this._submit({del: true}, options && options.source, callback);
};
// Stop flush() from sending pending ops to the server. Ops submitted while
// paused stay queued in pendingOps.
Doc.prototype.pause = function() {
  this.paused = true;
};
// Allow sending ops to the server again and immediately try to flush
// whatever is queued.
Doc.prototype.resume = function() {
  this.paused = false;
  this.flush();
};
// *** Receiving operations

// Called when the server acknowledges the inflight op. Brings the version up
// to date and clears the inflight op. A create adopts the version from the
// message. For any other op a version mismatch is logged and answered with a
// fetch, leaving the inflight op in place.
//
// @param message acknowledgement from the server; `v` is its version
Doc.prototype._opAcknowledged = function(message) {
  var isCreate = this.inflightOp.create;

  if (!isCreate && message.v !== this.version) {
    // We should already be at the same version, because the server should
    // have sent all the ops that have happened before acknowledging our op
    logger.warn('Invalid version from server. Expected: ' + this.version + ' Received: ' + message.v, message);

    // Fetching should get us back to a working document state
    return this.fetch();
  }

  if (isCreate) {
    this.version = message.v;
  }

  // The op was committed successfully. Increment the version number
  this.version++;

  this._clearInflightOp();
};
// Undo the inflight op after the server rejected it. When the op is a regular
// op and its type can invert, only that op is reverted locally. Otherwise
// everything pending is cancelled through _hardRollback().
//
// @param err the rejection error, or null when the rejection should not be
//   reported as an error
Doc.prototype._rollback = function(err) {
  var rejected = this.inflightOp;

  // Not invertible: cancel all pending ops and fetch the latest from the
  // server to get back into a working state
  if (!rejected.op || !rejected.type.invert) {
    this._hardRollback(err);
    return;
  }

  rejected.op = rejected.type.invert(rejected.op);

  // Transform the undo operation by any pending ops.
  var queued = this.pendingOps;
  for (var k = 0; k < queued.length; k++) {
    var transformErr = transformX(queued[k], rejected);
    if (transformErr) return this._hardRollback(transformErr);
  }

  // ... and apply it locally, reverting the changes.
  //
  // The undo is applied with source `false`, so that it looks like it comes
  // from a remote source, even though it is really a local op. The point is
  // that when the server rejects the client's op, the editor window should
  // update to reflect the undo.
  try {
    this._otApply(rejected, false);
  } catch (error) {
    return this._hardRollback(error);
  }

  this._clearInflightOp(err);
};
// Cancel every unacknowledged op, reset the doc to an uncreated state without
// a version, and fetch the latest state from the server. Once the fetch
// callback runs, the callbacks of all cancelled ops are called with `err`.
//
// @param err the error that caused the rollback; may be falsy
Doc.prototype._hardRollback = function(err) {
  // Keep the cancelled ops so that their callbacks can be notified. The
  // inflight op and the pending ops are combined, because it is possible to
  // have pending ops but no inflight op. This can happen when an invalid op
  // is submitted, which causes a hard rollback before the pending op was
  // flushed.
  var cancelled = (this.inflightOp ? [this.inflightOp] : []).concat(this.pendingOps);

  // Cancel all pending ops and reset if we can't invert
  this._setType(null);
  this.version = null;
  this.inflightOp = null;
  this.pendingOps = [];

  // Fetch the latest version from the server to get us back into a working state
  var self = this;
  this.fetch(function() {
    // No error may be swallowed. The error counts as handled only when there
    // was at least one cancelled op and every cancelled op had a callback
    // that was called. Otherwise the error is emitted.
    var everyOpNotified = cancelled.length > 0;
    for (var k = 0; k < cancelled.length; k++) {
      var notified = callEach(cancelled[k].callbacks, err);
      if (!notified) everyOpNotified = false;
    }
    if (err && !everyOpNotified) return self.emit('error', err);
  });
};
// Finish the inflight op: call its callbacks with `err`, clear it, send the
// next pending op and re-evaluate the pending-state events. An error that no
// callback received is emitted as an 'error' event.
//
// @param [err] error to hand to the op's callbacks
Doc.prototype._clearInflightOp = function(err) {
  var finished = this.inflightOp;
  var delivered = callEach(finished.callbacks, err);

  this.inflightOp = null;
  this.flush();
  this._emitNothingPending();

  if (err && !delivered) return this.emit('error', err);
};
// Call every truthy entry of `callbacks` with `err`, in order. Falsy entries
// (placeholders) are skipped.
//
// @param callbacks array of functions or falsy placeholders
// @param [err] argument passed to each callback
// @return {boolean} whether at least one callback was called
function callEach(callbacks, err) {
  var anyCalled = false;
  var index = 0;
  while (index < callbacks.length) {
    var fn = callbacks[index++];
    if (!fn) continue;
    fn(err);
    anyCalled = true;
  }
  return anyCalled;
}