diff --git a/cypress/component/helpers/yjs.cy.js b/cypress/component/helpers/yjs.cy.js index 427bf671d06..f61c29c2b27 100644 --- a/cypress/component/helpers/yjs.cy.js +++ b/cypress/component/helpers/yjs.cy.js @@ -4,10 +4,10 @@ */ import * as Y from 'yjs' -import { getDocumentState, getUpdateMessage, applyUpdateMessage } from '../../../src/helpers/yjs.js' +import { getDocumentState, documentStateToStep, applyStep } from '../../../src/helpers/yjs.js' describe('Yjs base64 wrapped with our helpers', function() { - it('applies step in wrong order', function() { + it('applies step generated from document state', function() { const source = new Y.Doc() const target = new Y.Doc() const sourceMap = source.getMap() @@ -17,44 +17,26 @@ describe('Yjs base64 wrapped with our helpers', function() { // console.log('afterTransaction', tr) }) - const state0 = getDocumentState(source) - // Add keyA to source and apply to target sourceMap.set('keyA', 'valueA') const stateA = getDocumentState(source) - const update0A = getUpdateMessage(source, state0) - applyUpdateMessage(target, update0A) + const step0A = documentStateToStep(stateA) + applyStep(target, step0A) expect(targetMap.get('keyA')).to.be.eq('valueA') // Add keyB to source, don't apply to target yet sourceMap.set('keyB', 'valueB') const stateB = getDocumentState(source) - const updateAB = getUpdateMessage(source, stateA) + const step0B = documentStateToStep(stateB) // Add keyC to source, apply to target sourceMap.set('keyC', 'valueC') - const updateBC = getUpdateMessage(source, stateB) - applyUpdateMessage(target, updateBC) - expect(targetMap.get('keyB')).to.be.eq(undefined) - expect(targetMap.get('keyC')).to.be.eq(undefined) // Apply keyB to target - applyUpdateMessage(target, updateAB) + applyStep(target, step0B) expect(targetMap.get('keyB')).to.be.eq('valueB') - expect(targetMap.get('keyC')).to.be.eq('valueC') - }) - - it('update message is empty if no additional state exists', function() { - const source = new Y.Doc() - const sourceMap = source.getMap() - const state0 = getDocumentState(source) - sourceMap.set('keyA', 'valueA') - const stateA = getDocumentState(source) - const update0A = getUpdateMessage(source, state0) - const updateAA = getUpdateMessage(source, stateA) - expect(update0A.length).to.be.eq(29) - expect(updateAA).to.be.eq(undefined) + expect(targetMap.get('keyC')).to.be.eq(undefined) }) }) diff --git a/cypress/e2e/api/SyncServiceProvider.spec.js b/cypress/e2e/api/SyncServiceProvider.spec.js index 6dcc5ba2b46..614e0983cc4 100644 --- a/cypress/e2e/api/SyncServiceProvider.spec.js +++ b/cypress/e2e/api/SyncServiceProvider.spec.js @@ -4,6 +4,7 @@ */ import { randUser } from '../../utils/index.js' +import SessionApi from '../../../src/services/SessionApi.js' import { SyncService } from '../../../src/services/SyncService.js' import createSyncServiceProvider from '../../../src/services/SyncServiceProvider.js' import { Doc } from 'yjs' @@ -39,9 +40,11 @@ describe('Sync service provider', function() { */ function createProvider(ydoc) { const queue = [] + const api = new SessionApi() const syncService = new SyncService({ serialize: () => 'Serialized', getDocumentState: () => null, + api, }) syncService.on('opened', () => syncService.startSync()) return createSyncServiceProvider({ diff --git a/lib/Service/DocumentService.php b/lib/Service/DocumentService.php index cd264c528b4..84fe079083b 100644 --- a/lib/Service/DocumentService.php +++ b/lib/Service/DocumentService.php @@ -208,7 +208,8 @@ public function addStep(Document $document, Session $session, array $steps, int $documentId = $session->getDocumentId(); $readOnly = $this->isReadOnly($this->getFileForSession($session, $shareToken), $shareToken); $stepsToInsert = []; - $querySteps = []; + $stepsIncludeQuery = false; + $documentState = null; $newVersion = $version; foreach ($steps as $step) { $message = YjsMessage::fromBase64($step); @@ -217,7 +218,7 @@ public function addStep(Document $document, Session $session, array $steps, int } // Filter out query steps as they would just trigger clients to send their steps again if ($message->getYjsMessageType() === YjsMessage::YJS_MESSAGE_SYNC && $message->getYjsSyncType() === YjsMessage::YJS_MESSAGE_SYNC_STEP1) { - $querySteps[] = $step; + $stepsIncludeQuery = true; } else { $stepsToInsert[] = $step; } @@ -228,8 +229,24 @@ public function addStep(Document $document, Session $session, array $steps, int } $newVersion = $this->insertSteps($document, $session, $stepsToInsert); } - // If there were any queries in the steps send the entire history - $getStepsSinceVersion = count($querySteps) > 0 ? 0 : $version; + + // By default send all steps the user has not received yet. + $getStepsSinceVersion = $version; + if ($stepsIncludeQuery) { + $this->logger->debug('Loading document state for ' . $documentId); + try { + $stateFile = $this->getStateFile($documentId); + $documentState = $stateFile->getContent(); + $this->logger->debug('Existing document, state file loaded ' . $documentId); + // If there were any queries in the steps send all steps since last save. + $getStepsSinceVersion = $document->getLastSavedVersion(); + } catch (NotFoundException $e) { + $this->logger->debug('Existing document, but no state file found for ' . $documentId); + // If there is no state file include all the steps. + $getStepsSinceVersion = 0; + } + } + $allSteps = $this->getSteps($documentId, $getStepsSinceVersion); $stepsToReturn = []; foreach ($allSteps as $step) { @@ -238,9 +255,11 @@ public function addStep(Document $document, Session $session, array $steps, int $stepsToReturn[] = $step; } } + return [ 'steps' => $stepsToReturn, - 'version' => $newVersion + 'version' => $newVersion, + 'documentState' => $documentState ]; } diff --git a/package-lock.json b/package-lock.json index b28fa7fd632..e1f294b1115 100644 --- a/package-lock.json +++ b/package-lock.json @@ -85,7 +85,6 @@ "vuex": "^3.6.2", "y-prosemirror": "^1.2.12", "y-protocols": "^1.0.6", - "y-websocket": "^2.0.4", "yjs": "^13.6.20" }, "devDependencies": { @@ -6981,22 +6980,6 @@ "integrity": "sha512-nne9/IiQ/hzIhY6pdDnbBtz7DjPTKrY00P/zvPSm5pOFkl6xuGrGnXn/VtTNNfNtAfZ9/1RtehkszU9qcTii0Q==", "dev": true }, - "node_modules/abstract-leveldown": { - "version": "6.2.3", - "resolved": "https://registry.npmjs.org/abstract-leveldown/-/abstract-leveldown-6.2.3.tgz", - "integrity": "sha512-BsLm5vFMRUrrLeCcRc+G0t2qOaTzpoJQLOubq2XM72eNpjF5UdU5o/5NvlNhx95XHcAvcl8OMXr4mlg/fRgUXQ==", - "optional": true, - "dependencies": { - "buffer": "^5.5.0", - "immediate": "^3.2.3", - "level-concat-iterator": "~2.0.0", - "level-supports": "~1.0.0", - "xtend": "~4.0.0" - }, - "engines": { - "node": ">=6" - } - }, "node_modules/acorn": { "version": "7.4.1", "resolved": "https://registry.npmjs.org/acorn/-/acorn-7.4.1.tgz", @@ -7333,12 +7316,6 @@ "integrity": "sha512-iAB+JbDEGXhyIUavoDl9WP/Jj106Kz9DEn1DPgYw5ruDn0e3Wgi3sKFm55sASdGBNOQB8F59d9qQ7deqrHA8wQ==", "dev": true }, - "node_modules/async-limiter": { - "version": "1.0.1", - "resolved": "https://registry.npmjs.org/async-limiter/-/async-limiter-1.0.1.tgz", - "integrity": "sha512-csOlWGAcRFJaI6m+F2WKdnMKr4HhdhFVBk0H/QbJFMCr+uO2kwohwXQPxw/9OCxp05r5ghVBFSyioixx3gfkNQ==", - "optional": true - }, "node_modules/asynckit": { "version": "0.4.0", "resolved": "https://registry.npmjs.org/asynckit/-/asynckit-0.4.0.tgz", @@ -7625,7 +7602,7 @@ "version": "1.5.1", "resolved": "https://registry.npmjs.org/base64-js/-/base64-js-1.5.1.tgz", "integrity": "sha512-AKpaYlHn8t4SVbOHCy+b5+KKgvR4vrsD8vbvrbiQJps7fKDTkjkDry6ji0rUJjC0kzbNePLwzxq8iypo41qeWA==", - "devOptional": true, + "dev": true, "funding": [ { "type": "github", @@ -7928,7 +7905,7 @@ "version": "5.7.1", "resolved": "https://registry.npmjs.org/buffer/-/buffer-5.7.1.tgz", "integrity": "sha512-EHcyIPBQ4BSGlvjB16k5KgAJ27CIsHY/2JBmCRReo48y9rQ3MaUzWX3KVlBa4U7MyX02HdVj0K7C3WaB3ju7FQ==", - "devOptional": true, + "dev": true, "funding": [ { "type": "github", @@ -9790,19 +9767,6 @@ "node": ">=0.8" } }, - "node_modules/deferred-leveldown": { - "version": "5.3.0", - "resolved": "https://registry.npmjs.org/deferred-leveldown/-/deferred-leveldown-5.3.0.tgz", - "integrity": "sha512-a59VOT+oDy7vtAbLRCZwWgxu2BaCfd5Hk7wxJd48ei7I+nsg8Orlb9CLG0PMZienk9BSUKgeAqkO2+Lw+1+Ukw==", - "optional": true, - "dependencies": { - "abstract-leveldown": "~6.2.1", - "inherits": "^2.0.3" - }, - "engines": { - "node": ">=6" - } - }, "node_modules/define-data-property": { "version": "1.1.4", "resolved": "https://registry.npmjs.org/define-data-property/-/define-data-property-1.1.4.tgz", @@ -10414,21 +10378,6 @@ "node": ">= 4" } }, - "node_modules/encoding-down": { - "version": "6.3.0", - "resolved": "https://registry.npmjs.org/encoding-down/-/encoding-down-6.3.0.tgz", - "integrity": "sha512-QKrV0iKR6MZVJV08QY0wp1e7vF6QbhnbQhb07bwpEyuz4uZiZgPlEGdkCROuFkUwdxlFaiPIhjyarH1ee/3vhw==", - "optional": true, - "dependencies": { - "abstract-leveldown": "^6.2.1", - "inherits": "^2.0.3", - "level-codec": "^9.0.0", - "level-errors": "^2.0.0" - }, - "engines": { - "node": ">=6" - } - }, "node_modules/end-of-stream": { "version": "1.4.4", "resolved": "https://registry.npmjs.org/end-of-stream/-/end-of-stream-1.4.4.tgz", @@ -10486,18 +10435,6 @@ "node": ">=6" } }, - "node_modules/errno": { - "version": "0.1.8", - "resolved": "https://registry.npmjs.org/errno/-/errno-0.1.8.tgz", - "integrity": "sha512-dJ6oBr5SQ1VSd9qkk7ByRgb/1SH4JZjCHSW/mr63/QcXO9zLVxvJ6Oy13nio03rxpSnVDDjFor75SjVeZWPW/A==", - "optional": true, - "dependencies": { - "prr": "~1.0.1" - }, - "bin": { - "errno": "cli.js" - } - }, "node_modules/error-ex": { "version": "1.3.2", "resolved": "https://registry.npmjs.org/error-ex/-/error-ex-1.3.2.tgz", @@ -13098,7 +13035,7 @@ "version": "1.2.1", "resolved": "https://registry.npmjs.org/ieee754/-/ieee754-1.2.1.tgz", "integrity": "sha512-dcyqhDvX1C46lXZcVqCpK+FtMRQVdIMN6/Df5js2zouUsqG7I6sFxitIC+7KYK29KdXOLHdu9zL4sFnoVQnqaA==", - "devOptional": true, + "dev": true, "funding": [ { "type": "github", @@ -13123,12 +13060,6 @@ "node": ">= 4" } }, - "node_modules/immediate": { - "version": "3.3.0", - "resolved": "https://registry.npmjs.org/immediate/-/immediate-3.3.0.tgz", - "integrity": "sha512-HR7EVodfFUdQCTIeySw+WDRFJlPcLOJbXfwwZ7Oom6tjsvZ3bOkCDJHehQC3nxJrv7+f9XecwazynjU8e4Vw3Q==", - "optional": true - }, "node_modules/immutable": { "version": "4.1.0", "resolved": "https://registry.npmjs.org/immutable/-/immutable-4.1.0.tgz", @@ -13223,7 +13154,7 @@ "version": "2.0.4", "resolved": "https://registry.npmjs.org/inherits/-/inherits-2.0.4.tgz", "integrity": "sha512-k/vGaX4/Yla3WzyMCvTQOXYeIHvqOKtnqBduzTHpzpQZzAskKMhZ2K+EnBiSM9zGSoIFeMpXKxa4dYeZIQqewQ==", - "devOptional": true + "dev": true }, "node_modules/ini": { "version": "2.0.0", @@ -16452,7 +16383,9 @@ "node_modules/lodash.debounce": { "version": "4.0.8", "resolved": "https://registry.npmjs.org/lodash.debounce/-/lodash.debounce-4.0.8.tgz", - "integrity": "sha512-FT1yDzDYEoYWhnSGnpE/4Kj1fLZkDFyqRb7fNt6FdYOSxlUWAtp42Eh6Wb0rGIv/m9Bgo7x4GhQbm5Ys4SG5ow==" + "integrity": "sha512-FT1yDzDYEoYWhnSGnpE/4Kj1fLZkDFyqRb7fNt6FdYOSxlUWAtp42Eh6Wb0rGIv/m9Bgo7x4GhQbm5Ys4SG5ow==", + "dev": true, + "peer": true }, "node_modules/lodash.get": { "version": "4.4.2", @@ -16712,12 +16645,6 @@ "yallist": "^2.1.2" } }, - "node_modules/ltgt": { - "version": "2.2.1", - "resolved": "https://registry.npmjs.org/ltgt/-/ltgt-2.2.1.tgz", - "integrity": "sha512-AI2r85+4MquTw9ZYqabu4nMwy9Oftlfa/e/52t9IjtfG+mGBbTNdAoZ3RQKLHR6r0wQnwZnPIEh/Ya6XTWAKNA==", - "optional": true - }, "node_modules/lunr": { "version": "2.3.9", "resolved": "https://registry.npmjs.org/lunr/-/lunr-2.3.9.tgz", @@ -21386,12 +21313,6 @@ "node": "^10 || ^12 || ^13.7 || ^14 || >=15.0.1" } }, - "node_modules/napi-macros": { - "version": "2.0.0", - "resolved": "https://registry.npmjs.org/napi-macros/-/napi-macros-2.0.0.tgz", - "integrity": "sha512-A0xLykHtARfueITVDernsAWdtIMbOJgKgcluwENp3AlsKN/PloyO10HtmoqnFAQAcxPkgZN7wdfPfEd0zNGxbg==", - "optional": true - }, "node_modules/natural-compare": { "version": "1.4.0", "resolved": "https://registry.npmjs.org/natural-compare/-/natural-compare-1.4.0.tgz", @@ -22794,12 +22715,6 @@ "resolved": "https://registry.npmjs.org/proxy-polyfill/-/proxy-polyfill-0.3.2.tgz", "integrity": "sha512-ENKSXOMCewnQTOyqrQXxEjIhzT6dy572mtehiItbDoIUF5Sv5UkmRUc8kowg2MFvr232Uo8rwRpNg3V5kgTKbA==" }, - "node_modules/prr": { - "version": "1.0.1", - "resolved": "https://registry.npmjs.org/prr/-/prr-1.0.1.tgz", - "integrity": "sha512-yPw4Sng1gWghHQWj0B3ZggWUm4qVbPwPFcRG8KyxiU7J2OHFSoEHKS+EZ3fv5l1t9CyCiop6l/ZYeWbrgoQejw==", - "optional": true - }, "node_modules/pseudomap": { "version": "1.0.2", "resolved": "https://registry.npmjs.org/pseudomap/-/pseudomap-1.0.2.tgz", @@ -25030,7 +24945,7 @@ "version": "5.2.1", "resolved": "https://registry.npmjs.org/safe-buffer/-/safe-buffer-5.2.1.tgz", "integrity": "sha512-rp3So07KcdmmKbGvgaNxQSJr7bGVSVk5S9Eq1F+ppbRo70+YeaDxkw5Dd8NPN+GD6bjnYm2VuPuCXmpuYvmCXQ==", - "devOptional": true, + "dev": true, "funding": [ { "type": "github", @@ -25648,7 +25563,7 @@ "version": "1.3.0", "resolved": "https://registry.npmjs.org/string_decoder/-/string_decoder-1.3.0.tgz", "integrity": "sha512-hkRX8U1WjJFd8LsDJ2yQ/wWWxaopEsABU1XfkM8A+j0+85JAGppt16cr1Whg6KIbb4okU6Mql6BOj+uup/wKeA==", - "devOptional": true, + "dev": true, "dependencies": { "safe-buffer": "~5.2.0" } @@ -27441,7 +27356,7 @@ "version": "1.0.2", "resolved": "https://registry.npmjs.org/util-deprecate/-/util-deprecate-1.0.2.tgz", "integrity": "sha512-EPD5q1uXyFxJpCrLnCc1nHnq3gOa6DZBocAIiI2TaSCA7VCJ1UJDMagCzIkXNsUYfD1daK//LTEQ8xiIbrHtcw==", - "devOptional": true + "dev": true }, "node_modules/uuid": { "version": "10.0.0", @@ -28913,28 +28828,11 @@ "version": "4.0.2", "resolved": "https://registry.npmjs.org/xtend/-/xtend-4.0.2.tgz", "integrity": "sha512-LKYU1iAXJXUgAXn9URjiu+MWhyUXHsvfp7mcuYm9dSUKK0/CjtrUwFAxD82/mCWbtLsGjFIad0wIsod4zrTAEQ==", - "devOptional": true, + "dev": true, "engines": { "node": ">=0.4" } }, - "node_modules/y-leveldb": { - "version": "0.1.2", - "resolved": "https://registry.npmjs.org/y-leveldb/-/y-leveldb-0.1.2.tgz", - "integrity": "sha512-6ulEn5AXfXJYi89rXPEg2mMHAyyw8+ZfeMMdOtBbV8FJpQ1NOrcgi6DTAcXof0dap84NjHPT2+9d0rb6cFsjEg==", - "optional": true, - "dependencies": { - "level": "^6.0.1", - "lib0": "^0.2.31" - }, - "funding": { - "type": "GitHub Sponsors ❤", - "url": "https://github.com/sponsors/dmonad" - }, - "peerDependencies": { - "yjs": "^13.0.0" - } - }, "node_modules/y-prosemirror": { "version": "1.2.12", "resolved": "https://registry.npmjs.org/y-prosemirror/-/y-prosemirror-1.2.12.tgz", @@ -33863,19 +33761,6 @@ "integrity": "sha512-nne9/IiQ/hzIhY6pdDnbBtz7DjPTKrY00P/zvPSm5pOFkl6xuGrGnXn/VtTNNfNtAfZ9/1RtehkszU9qcTii0Q==", "dev": true }, - "abstract-leveldown": { - "version": "6.2.3", - "resolved": "https://registry.npmjs.org/abstract-leveldown/-/abstract-leveldown-6.2.3.tgz", - "integrity": "sha512-BsLm5vFMRUrrLeCcRc+G0t2qOaTzpoJQLOubq2XM72eNpjF5UdU5o/5NvlNhx95XHcAvcl8OMXr4mlg/fRgUXQ==", - "optional": true, - "requires": { - "buffer": "^5.5.0", - "immediate": "^3.2.3", - "level-concat-iterator": "~2.0.0", - "level-supports": "~1.0.0", - "xtend": "~4.0.0" - } - }, "acorn": { "version": "7.4.1", "resolved": "https://registry.npmjs.org/acorn/-/acorn-7.4.1.tgz", @@ -34119,12 +34004,6 @@ "integrity": "sha512-iAB+JbDEGXhyIUavoDl9WP/Jj106Kz9DEn1DPgYw5ruDn0e3Wgi3sKFm55sASdGBNOQB8F59d9qQ7deqrHA8wQ==", "dev": true }, - "async-limiter": { - "version": "1.0.1", - "resolved": "https://registry.npmjs.org/async-limiter/-/async-limiter-1.0.1.tgz", - "integrity": "sha512-csOlWGAcRFJaI6m+F2WKdnMKr4HhdhFVBk0H/QbJFMCr+uO2kwohwXQPxw/9OCxp05r5ghVBFSyioixx3gfkNQ==", - "optional": true - }, "asynckit": { "version": "0.4.0", "resolved": "https://registry.npmjs.org/asynckit/-/asynckit-0.4.0.tgz", @@ -34344,7 +34223,7 @@ "version": "1.5.1", "resolved": "https://registry.npmjs.org/base64-js/-/base64-js-1.5.1.tgz", "integrity": "sha512-AKpaYlHn8t4SVbOHCy+b5+KKgvR4vrsD8vbvrbiQJps7fKDTkjkDry6ji0rUJjC0kzbNePLwzxq8iypo41qeWA==", - "devOptional": true + "dev": true }, "bcrypt-pbkdf": { "version": "1.0.2", @@ -34585,7 +34464,7 @@ "version": "5.7.1", "resolved": "https://registry.npmjs.org/buffer/-/buffer-5.7.1.tgz", "integrity": "sha512-EHcyIPBQ4BSGlvjB16k5KgAJ27CIsHY/2JBmCRReo48y9rQ3MaUzWX3KVlBa4U7MyX02HdVj0K7C3WaB3ju7FQ==", - "devOptional": true, + "dev": true, "requires": { "base64-js": "^1.3.1", "ieee754": "^1.1.13" @@ -35982,16 +35861,6 @@ } } }, - "deferred-leveldown": { - "version": "5.3.0", - "resolved": "https://registry.npmjs.org/deferred-leveldown/-/deferred-leveldown-5.3.0.tgz", - "integrity": "sha512-a59VOT+oDy7vtAbLRCZwWgxu2BaCfd5Hk7wxJd48ei7I+nsg8Orlb9CLG0PMZienk9BSUKgeAqkO2+Lw+1+Ukw==", - "optional": true, - "requires": { - "abstract-leveldown": "~6.2.1", - "inherits": "^2.0.3" - } - }, "define-data-property": { "version": "1.1.4", "resolved": "https://registry.npmjs.org/define-data-property/-/define-data-property-1.1.4.tgz", @@ -36457,18 +36326,6 @@ "integrity": "sha512-/kyM18EfinwXZbno9FyUGeFh87KC8HRQBQGildHZbEuRyWFOmv1U10o9BBp8XVZDVNNuQKyIGIu5ZYAAXJ0V2Q==", "dev": true }, - "encoding-down": { - "version": "6.3.0", - "resolved": "https://registry.npmjs.org/encoding-down/-/encoding-down-6.3.0.tgz", - "integrity": "sha512-QKrV0iKR6MZVJV08QY0wp1e7vF6QbhnbQhb07bwpEyuz4uZiZgPlEGdkCROuFkUwdxlFaiPIhjyarH1ee/3vhw==", - "optional": true, - "requires": { - "abstract-leveldown": "^6.2.1", - "inherits": "^2.0.3", - "level-codec": "^9.0.0", - "level-errors": "^2.0.0" - } - }, "end-of-stream": { "version": "1.4.4", "resolved": "https://registry.npmjs.org/end-of-stream/-/end-of-stream-1.4.4.tgz", @@ -36510,15 +36367,6 @@ "dev": true, "peer": true }, - "errno": { - "version": "0.1.8", - "resolved": "https://registry.npmjs.org/errno/-/errno-0.1.8.tgz", - "integrity": "sha512-dJ6oBr5SQ1VSd9qkk7ByRgb/1SH4JZjCHSW/mr63/QcXO9zLVxvJ6Oy13nio03rxpSnVDDjFor75SjVeZWPW/A==", - "optional": true, - "requires": { - "prr": "~1.0.1" - } - }, "error-ex": { "version": "1.3.2", "resolved": "https://registry.npmjs.org/error-ex/-/error-ex-1.3.2.tgz", @@ -38433,7 +38281,7 @@ "version": "1.2.1", "resolved": "https://registry.npmjs.org/ieee754/-/ieee754-1.2.1.tgz", "integrity": "sha512-dcyqhDvX1C46lXZcVqCpK+FtMRQVdIMN6/Df5js2zouUsqG7I6sFxitIC+7KYK29KdXOLHdu9zL4sFnoVQnqaA==", - "devOptional": true + "dev": true }, "ignore": { "version": "5.3.1", @@ -38441,12 +38289,6 @@ "integrity": "sha512-5Fytz/IraMjqpwfd34ke28PTVMjZjJG2MPn5t7OE4eUCUNf8BAa7b5WUS9/Qvr6mwOQS7Mk6vdsMno5he+T8Xw==", "dev": true }, - "immediate": { - "version": "3.3.0", - "resolved": "https://registry.npmjs.org/immediate/-/immediate-3.3.0.tgz", - "integrity": "sha512-HR7EVodfFUdQCTIeySw+WDRFJlPcLOJbXfwwZ7Oom6tjsvZ3bOkCDJHehQC3nxJrv7+f9XecwazynjU8e4Vw3Q==", - "optional": true - }, "immutable": { "version": "4.1.0", "resolved": "https://registry.npmjs.org/immutable/-/immutable-4.1.0.tgz", @@ -38516,7 +38358,7 @@ "version": "2.0.4", "resolved": "https://registry.npmjs.org/inherits/-/inherits-2.0.4.tgz", "integrity": "sha512-k/vGaX4/Yla3WzyMCvTQOXYeIHvqOKtnqBduzTHpzpQZzAskKMhZ2K+EnBiSM9zGSoIFeMpXKxa4dYeZIQqewQ==", - "devOptional": true + "dev": true }, "ini": { "version": "2.0.0", @@ -40856,7 +40698,9 @@ "lodash.debounce": { "version": "4.0.8", "resolved": "https://registry.npmjs.org/lodash.debounce/-/lodash.debounce-4.0.8.tgz", - "integrity": "sha512-FT1yDzDYEoYWhnSGnpE/4Kj1fLZkDFyqRb7fNt6FdYOSxlUWAtp42Eh6Wb0rGIv/m9Bgo7x4GhQbm5Ys4SG5ow==" + "integrity": "sha512-FT1yDzDYEoYWhnSGnpE/4Kj1fLZkDFyqRb7fNt6FdYOSxlUWAtp42Eh6Wb0rGIv/m9Bgo7x4GhQbm5Ys4SG5ow==", + "dev": true, + "peer": true }, "lodash.get": { "version": "4.4.2", @@ -41059,12 +40903,6 @@ "yallist": "^2.1.2" } }, - "ltgt": { - "version": "2.2.1", - "resolved": "https://registry.npmjs.org/ltgt/-/ltgt-2.2.1.tgz", - "integrity": "sha512-AI2r85+4MquTw9ZYqabu4nMwy9Oftlfa/e/52t9IjtfG+mGBbTNdAoZ3RQKLHR6r0wQnwZnPIEh/Ya6XTWAKNA==", - "optional": true - }, "lunr": { "version": "2.3.9", "resolved": "https://registry.npmjs.org/lunr/-/lunr-2.3.9.tgz", @@ -43718,12 +43556,6 @@ "resolved": "https://registry.npmjs.org/nanoid/-/nanoid-3.3.7.tgz", "integrity": "sha512-eSRppjcPIatRIMC1U6UngP8XFcz8MQWGQdt1MTBQ7NaAmvXDfvNxbvWV3x2y6CdEUciCSsDHDQZbhYaB8QEo2g==" }, - "napi-macros": { - "version": "2.0.0", - "resolved": "https://registry.npmjs.org/napi-macros/-/napi-macros-2.0.0.tgz", - "integrity": "sha512-A0xLykHtARfueITVDernsAWdtIMbOJgKgcluwENp3AlsKN/PloyO10HtmoqnFAQAcxPkgZN7wdfPfEd0zNGxbg==", - "optional": true - }, "natural-compare": { "version": "1.4.0", "resolved": "https://registry.npmjs.org/natural-compare/-/natural-compare-1.4.0.tgz", @@ -44774,12 +44606,6 @@ "resolved": "https://registry.npmjs.org/proxy-polyfill/-/proxy-polyfill-0.3.2.tgz", "integrity": "sha512-ENKSXOMCewnQTOyqrQXxEjIhzT6dy572mtehiItbDoIUF5Sv5UkmRUc8kowg2MFvr232Uo8rwRpNg3V5kgTKbA==" }, - "prr": { - "version": "1.0.1", - "resolved": "https://registry.npmjs.org/prr/-/prr-1.0.1.tgz", - "integrity": "sha512-yPw4Sng1gWghHQWj0B3ZggWUm4qVbPwPFcRG8KyxiU7J2OHFSoEHKS+EZ3fv5l1t9CyCiop6l/ZYeWbrgoQejw==", - "optional": true - }, "pseudomap": { "version": "1.0.2", "resolved": "https://registry.npmjs.org/pseudomap/-/pseudomap-1.0.2.tgz", @@ -46219,7 +46045,7 @@ "version": "5.2.1", "resolved": "https://registry.npmjs.org/safe-buffer/-/safe-buffer-5.2.1.tgz", "integrity": "sha512-rp3So07KcdmmKbGvgaNxQSJr7bGVSVk5S9Eq1F+ppbRo70+YeaDxkw5Dd8NPN+GD6bjnYm2VuPuCXmpuYvmCXQ==", - "devOptional": true + "dev": true }, "safer-buffer": { "version": "2.1.2", @@ -46706,7 +46532,7 @@ "version": "1.3.0", "resolved": "https://registry.npmjs.org/string_decoder/-/string_decoder-1.3.0.tgz", "integrity": "sha512-hkRX8U1WjJFd8LsDJ2yQ/wWWxaopEsABU1XfkM8A+j0+85JAGppt16cr1Whg6KIbb4okU6Mql6BOj+uup/wKeA==", - "devOptional": true, + "dev": true, "requires": { "safe-buffer": "~5.2.0" } @@ -47987,7 +47813,7 @@ "version": "1.0.2", "resolved": "https://registry.npmjs.org/util-deprecate/-/util-deprecate-1.0.2.tgz", "integrity": "sha512-EPD5q1uXyFxJpCrLnCc1nHnq3gOa6DZBocAIiI2TaSCA7VCJ1UJDMagCzIkXNsUYfD1daK//LTEQ8xiIbrHtcw==", - "devOptional": true + "dev": true }, "uuid": { "version": "10.0.0", @@ -48892,17 +48718,7 @@ "version": "4.0.2", "resolved": "https://registry.npmjs.org/xtend/-/xtend-4.0.2.tgz", "integrity": "sha512-LKYU1iAXJXUgAXn9URjiu+MWhyUXHsvfp7mcuYm9dSUKK0/CjtrUwFAxD82/mCWbtLsGjFIad0wIsod4zrTAEQ==", - "devOptional": true - }, - "y-leveldb": { - "version": "0.1.2", - "resolved": "https://registry.npmjs.org/y-leveldb/-/y-leveldb-0.1.2.tgz", - "integrity": "sha512-6ulEn5AXfXJYi89rXPEg2mMHAyyw8+ZfeMMdOtBbV8FJpQ1NOrcgi6DTAcXof0dap84NjHPT2+9d0rb6cFsjEg==", - "optional": true, - "requires": { - "level": "^6.0.1", - "lib0": "^0.2.31" - } + "dev": true }, "y-prosemirror": { "version": "1.2.12", diff --git a/package.json b/package.json index 26202a2f5fe..ec77f4c26ef 100644 --- a/package.json +++ b/package.json @@ -109,7 +109,6 @@ "vuex": "^3.6.2", "y-prosemirror": "^1.2.12", "y-protocols": "^1.0.6", - "y-websocket": "^2.0.4", "yjs": "^13.6.20" }, "engines": { diff --git a/src/components/Editor.vue b/src/components/Editor.vue index ac0fc586f32..df0eae7fe2a 100644 --- a/src/components/Editor.vue +++ b/src/components/Editor.vue @@ -92,8 +92,9 @@ import { import ReadonlyBar from './Menu/ReadonlyBar.vue' import { logger } from '../helpers/logger.js' -import { getDocumentState, applyDocumentState, getUpdateMessage } from '../helpers/yjs.js' +import { getDocumentState, applyDocumentState } from '../helpers/yjs.js' import { SyncService, ERROR_TYPE, IDLE_TIMEOUT } from './../services/SyncService.js' +import SessionApi from '../services/SessionApi.js' import createSyncServiceProvider from './../services/SyncServiceProvider.js' import AttachmentResolver from './../services/AttachmentResolver.js' import { extensionHighlight } from '../helpers/mappings.js' @@ -341,7 +342,6 @@ export default { }, created() { this.$ydoc = new Doc() - this.$queue = [] // The following can be useful for debugging ydoc updates // this.$ydoc.on('update', function(update, origin, doc, tr) { // console.debug('ydoc update', update, origin, doc, tr) @@ -349,6 +349,7 @@ export default { // }); this.$providers = [] this.$editor = null + this.$api = null this.$syncService = null this.$attachmentResolver = null }, @@ -374,16 +375,20 @@ export default { } const guestName = localStorage.getItem('nick') ? localStorage.getItem('nick') : '' - this.$syncService = new SyncService({ + this.$api = new SessionApi({ guestName, shareToken: this.shareToken, filePath: this.relativePath, - baseVersionEtag: this.$baseVersionEtag, forceRecreate: this.forceRecreate, + }) + + this.$syncService = new SyncService({ + baseVersionEtag: this.$baseVersionEtag, serialize: this.isRichEditor ? (content) => createMarkdownSerializer(this.$editor.schema).serialize(content ?? this.$editor.state.doc) : (content) => serializePlainText(content ?? this.$editor.state.doc), getDocumentState: () => getDocumentState(this.$ydoc), + api: this.$api, }) this.listenSyncServiceEvents() @@ -392,7 +397,6 @@ export default { ydoc: this.$ydoc, syncService: this.$syncService, fileId: this.fileId, - queue: this.$queue, initialSession: this.initialSession, disableBC: true, }) @@ -492,15 +496,7 @@ export default { }, onLoaded({ document, documentSource, documentState }) { - if (documentState) { - applyDocumentState(this.$ydoc, documentState, this.$providers[0]) - // distribute additional state that may exist locally - const updateMessage = getUpdateMessage(this.$ydoc, documentState) - if (updateMessage) { - logger.debug('onLoaded: Pushing local changes to server') - this.$queue.push(updateMessage) - } - } else { + if (!documentState) { this.setInitialYjsState(documentSource, { isRichEditor: this.isRichEditor }) } @@ -578,7 +574,9 @@ export default { this.$nextTick(() => { this.emit('sync-service:sync') }) - this.document = document + if (document) { + this.document = document + } }, onError({ type, data }) { @@ -690,8 +688,10 @@ export default { }, async close() { - await this.$syncService.sendRemainingSteps(this.$queue) + await this.$syncService.sendRemainingSteps() + .catch(err => logger.warn('Failed to send remaining steps', { err })) await this.disconnect() + .catch(err => logger.warn('Failed to disconnect', { err })) if (this.$editor) { try { this.unlistenEditorEvents() diff --git a/src/helpers/yjs.js b/src/helpers/yjs.js index da202ca2d9c..6d55d294f2a 100644 --- a/src/helpers/yjs.js +++ b/src/helpers/yjs.js @@ -8,7 +8,7 @@ import * as Y from 'yjs' import * as decoding from 'lib0/decoding.js' import * as encoding from 'lib0/encoding.js' import * as syncProtocol from 'y-protocols/sync' -import { messageSync } from 'y-websocket' +import { messageSync } from '../services/y-websocket.js' /** * Get Document state encode as base64. @@ -34,36 +34,44 @@ export function applyDocumentState(ydoc, documentState, origin) { } /** - * Update message for everything in ydoc that is not in encodedBaseUpdate + * Create a step from a document state + * i.e. create a sync protocol update message from it + * and encode it and wrap it in a step data structure. * - * @param {Y.Doc} ydoc - encode state of this doc - * @param {string} encodedBaseUpdate - base64 encoded doc update to build upon - * @return {Uint8Array|undefined} + * @param {string} documentState - base64 encoded doc state + * @return {string} base64 encoded yjs sync protocol update message */ -export function getUpdateMessage(ydoc, encodedBaseUpdate) { - const baseUpdate = decodeArrayBuffer(encodedBaseUpdate) - const baseStateVector = Y.encodeStateVectorFromUpdate(baseUpdate) - const docStateVector = Y.encodeStateVector(ydoc) - if (sameState(baseStateVector, docStateVector)) { - // no additional state in the ydoc - early return - return - } +export function documentStateToStep(documentState) { + const message = documentStateToUpdateMessage(documentState) + return { step: encodeArrayBuffer(message) } +} + +/** + * Create an update message from a document state + * i.e. decode the base64 encoded yjs update + * and create a sync protocol update message from it + * + * @param {string} documentState - base64 encoded doc state + * @return {Uint8Array} + */ +function documentStateToUpdateMessage(documentState) { + const update = decodeArrayBuffer(documentState) const encoder = encoding.createEncoder() encoding.writeVarUint(encoder, messageSync) - const update = Y.encodeStateAsUpdate(ydoc, baseStateVector) syncProtocol.writeUpdate(encoder, update) return encoding.toUint8Array(encoder) } /** - * Apply an updated message to the ydoc. + * Apply a step to the ydoc. * * Only used in tests right now. * @param {Y.Doc} ydoc - encode state of this doc - * @param {Uint8Array} updateMessage - y-websocket sync message with update + * @param {string} step - base64 encoded yjs sync update message * @param {object} origin - initiator object e.g. WebsocketProvider */ -export function applyUpdateMessage(ydoc, updateMessage, origin = 'origin') { +export function applyStep(ydoc, step, origin = 'origin') { + const updateMessage = decodeArrayBuffer(step.step) const decoder = decoding.createDecoder(updateMessage) const messageType = decoding.readVarUint(decoder) if (messageType !== messageSync) { @@ -80,26 +88,6 @@ export function applyUpdateMessage(ydoc, updateMessage, origin = 'origin') { ) } -/** - * Get the steps for sending to the server - * - * @param {object[]} queue - queue for the outgoing steps - */ -export function getSteps(queue) { - return queue.map(s => encodeArrayBuffer(s)) - .filter(s => s < 'AQ') -} - -/** - * Encode the latest awareness message for sending - * - * @param {object[]} queue - queue for the outgoing steps - */ -export function getAwareness(queue) { - return queue.map(s => encodeArrayBuffer(s)) - .findLast(s => s > 'AQ') || '' -} - /** * Log y.js messages with their type and initiator call stack * @@ -127,13 +115,3 @@ export function logStep(step) { break } } - -/** - * Helper function to check if two state vectors have the same state - * @param {Array} arr - state vector to compare - * @param {Array} other - state vector to compare against - */ -function sameState(arr, other) { - return arr.length === other.length - && arr.every((value, index) => other[index] === value) -} diff --git a/src/services/Outbox.ts b/src/services/Outbox.ts new file mode 100644 index 00000000000..2ca82416b8d --- /dev/null +++ b/src/services/Outbox.ts @@ -0,0 +1,64 @@ +/** + * SPDX-FileCopyrightText: 2024 Nextcloud GmbH and Nextcloud contributors + * SPDX-License-Identifier: AGPL-3.0-or-later + */ + +import { encodeArrayBuffer } from '../helpers/base64.js' +import { logger } from '../helpers/logger.js' + +type Sendable = { + steps: string[], awareness: string +} + +export default class Outbox { + #awarenessUpdate = '' + #syncUpdate = '' + #syncQuery = '' + + storeStep(step: Uint8Array) { + const encoded = encodeArrayBuffer(step) + if (encoded < 'AAA' || encoded > 'Ag') { + logger.warn('Unexpected step type:', { step, encoded }) + return + } + if (encoded < 'AAE') { + this.#syncQuery = encoded + return + } + if (encoded < 'AQ') { + this.#syncUpdate = encoded + return + } + this.#awarenessUpdate = encoded + } + + getDataToSend(): Sendable { + return { + steps: [this.#syncUpdate, this.#syncQuery].filter(s => s), + awareness: this.#awarenessUpdate, + } + } + + get hasUpdate(): boolean { + return !!this.#syncUpdate + } + + /* + * Clear data that has just been sent. + * + * Only clear data that has not changed in the meantime. + * @param {Sendable} - data that was to the server + */ + clearSentData({ steps, awareness }: Sendable) { + if (steps.includes(this.#syncUpdate)) { + this.#syncUpdate = '' + } + if (steps.includes(this.#syncQuery)) { + this.#syncQuery = '' + } + if (this.#awarenessUpdate === awareness) { + this.#awarenessUpdate = '' + } + } + +} diff --git a/src/services/PollingBackend.js b/src/services/PollingBackend.js index 054bbcc865b..f7980d03d94 100644 --- a/src/services/PollingBackend.js +++ b/src/services/PollingBackend.js @@ -127,7 +127,7 @@ class PollingBackend { this.#fetchRetryCounter = 0 this.#syncService.emit('change', { document, sessions }) - this.#syncService._receiveSteps(data) + this.#syncService.receiveSteps(data) if (data.steps.length === 0) { if (!this.#initialLoadingFinished) { diff --git a/src/services/SyncService.js b/src/services/SyncService.js index 92fa361b129..61860b18fd9 100644 --- a/src/services/SyncService.js +++ b/src/services/SyncService.js @@ -9,8 +9,9 @@ import mitt from 'mitt' import debounce from 'debounce' import PollingBackend from './PollingBackend.js' -import SessionApi, { Connection } from './SessionApi.js' -import { getSteps, getAwareness } from '../helpers/yjs.js' +import Outbox from './Outbox.ts' +import { Connection } from './SessionApi.js' +import { documentStateToStep } from '../helpers/yjs.js' import { logger } from '../helpers/logger.js' /** @@ -56,14 +57,15 @@ class SyncService { #sendIntervalId #connection + #outbox = new Outbox() - constructor({ baseVersionEtag, serialize, getDocumentState, ...options }) { + constructor({ baseVersionEtag, serialize, getDocumentState, api }) { /** @type {import('mitt').Emitter} _bus */ this._bus = mitt() this.serialize = serialize this.getDocumentState = getDocumentState - this._api = new SessionApi(options) + this._api = api this.#connection = null this.stepClientIDs = [] @@ -153,52 +155,69 @@ class SyncService { }) } - sendSteps(getSendable) { + sendStep(step) { + this.#outbox.storeStep(step) + this.sendSteps() + } + + sendSteps() { // If already waiting to send, do nothing. if (this.#sendIntervalId) { return } - return new Promise((resolve, reject) => { - this.#sendIntervalId = setInterval(() => { - if (this.#connection && !this.sending) { - this.sendStepsNow(getSendable).then(resolve).catch(reject) - } - }, 200) - }) + this.#sendIntervalId = setInterval(() => { + if (this.#connection && !this.sending) { + this.sendStepsNow().catch(err => logger.error(err)) + } + }, 200) } - sendStepsNow(getSendable) { + async sendStepsNow() { this.sending = true clearInterval(this.#sendIntervalId) this.#sendIntervalId = null - const data = getSendable() - if (data.steps.length > 0) { + const sendable = this.#outbox.getDataToSend() + if (sendable.steps.length > 0) { this.emit('stateChange', { dirty: true }) } - return this.#connection.push(data) + if (!this.hasActiveConnection) { + return + } + return this.#connection.push({ ...sendable, version: this.version }) .then((response) => { + this.#outbox.clearSentData(sendable) + const { steps, documentState } = response.data + if (documentState) { + const documentStateStep = documentStateToStep(documentState) + this.emit('sync', { + version: this.version, + steps: [documentStateStep], + document: this.#connection.document, + }) + } this.pushError = 0 this.sending = false - this.emit('sync', { - steps: [], - document: this.#connection.document, - version: this.version, - }) + if (steps?.length > 0) { + this.receiveSteps({ steps }) + } }).catch(err => { const { response, code } = err this.sending = false this.pushError++ + logger.error('Failed to push the steps to the server', err) if (!response || code === 'ECONNABORTED') { this.emit('error', { type: ERROR_TYPE.CONNECTION_FAILED, data: {} }) } if (response?.status === 412) { this.emit('error', { type: ERROR_TYPE.LOAD_ERROR, data: response }) } else if (response?.status === 403) { - if (!data.document) { + // TODO: is this really about sendable? + if (!sendable.document) { // either the session is invalid or the document is read only. logger.error('failed to write to document - not allowed') this.emit('error', { type: ERROR_TYPE.PUSH_FORBIDDEN, data: {} }) } + // TODO: does response.data ever have a document? maybe for errors? // Only emit conflict event if we have synced until the latest version if (response.data.document?.currentVersion === this.version) { this.emit('error', { type: ERROR_TYPE.PUSH_FAILURE, data: {} }) @@ -211,7 +230,7 @@ class SyncService { }) } - _receiveSteps({ steps, document, sessions }) { + receiveSteps({ steps, document = null, sessions = [] }) { const awareness = sessions .filter(s => s.lastContact > (Math.floor(Date.now() / 1000) - COLLABORATOR_DISCONNECT_TIME)) .filter(s => s.lastAwarenessMessage) @@ -239,8 +258,7 @@ class SyncService { this.lastStepPush = Date.now() this.emit('sync', { steps: newSteps, - // TODO: do we actually need to dig into the connection here? - document: this.#connection.document, + document, version: this.version, }) } @@ -291,25 +309,12 @@ class SyncService { }) } - async sendRemainingSteps(queue) { - if (queue.length === 0) { + async sendRemainingSteps() { + if (!this.#outbox.hasUpdate) { return } - let outbox = [] - const steps = getSteps(queue) - const awareness = getAwareness(queue) - return this.sendStepsNow(() => { - const data = { steps, awareness, version: this.version } - outbox = [...queue] - logger.debug('sending final steps ', data) - return data - })?.then(() => { - // only keep the steps that were not send yet - queue.splice(0, - queue.length, - ...queue.filter(s => !outbox.includes(s)), - ) - }, err => logger.error(err)) + logger.debug('sending final steps') + return this.sendStepsNow().catch(err => logger.error(err)) } async close() { diff --git a/src/services/SyncServiceProvider.js b/src/services/SyncServiceProvider.js index 8572deafda7..e62ef4bcfc5 100644 --- a/src/services/SyncServiceProvider.js +++ b/src/services/SyncServiceProvider.js @@ -3,7 +3,7 @@ * SPDX-License-Identifier: AGPL-3.0-or-later */ -import { WebsocketProvider } from 'y-websocket' +import { WebsocketProvider } from './y-websocket.js' import initWebSocketPolyfill from './WebSocketPolyfill.js' import { logger } from '../helpers/logger.js' diff --git a/src/services/WebSocketPolyfill.js b/src/services/WebSocketPolyfill.js index 0fb7b5477e5..e9a885566dd 100644 --- a/src/services/WebSocketPolyfill.js +++ b/src/services/WebSocketPolyfill.js @@ -5,7 +5,6 @@ import { logger } from '../helpers/logger.js' import { decodeArrayBuffer } from '../helpers/base64.ts' -import { getSteps, getAwareness } from '../helpers/yjs.js' import getNotifyBus from './NotifyService.js' /** @@ -13,14 +12,11 @@ import getNotifyBus from './NotifyService.js' * @param {object} syncService - the sync service to build upon * @param {number} fileId - id of the file to open * @param {object} initialSession - initial session to open - * @param {object[]} queue - queue for the outgoing steps */ -export default function initWebSocketPolyfill(syncService, fileId, initialSession, queue) { +export default function initWebSocketPolyfill(syncService, fileId, initialSession) { return class WebSocketPolyfill { #url - #session - #version binaryType onmessage onerror @@ -35,34 +31,19 @@ export default function initWebSocketPolyfill(syncService, fileId, initialSessio this.url = url logger.debug('WebSocketPolyfill#constructor', { url, fileId, initialSession }) this.#registerHandlers({ - opened: ({ version, session }) => { - logger.debug('opened ', { version, session }) - this.#version = version - this.#session = session - }, - loaded: ({ version, session, content }) => { - logger.debug('loaded ', { version, session }) - this.#version = version - this.#session = session - }, sync: ({ steps, version }) => { - logger.debug('synced ', { version, steps }) - this.#version = version if (steps) { steps.forEach(s => { const data = decodeArrayBuffer(s.step) this.onmessage({ data }) }) + logger.debug('synced ', { version, steps }) } }, }) syncService.open({ fileId, initialSession }).then((data) => { if (syncService.hasActiveConnection) { - const { version, session } = data - this.#version = version - this.#session = session - this.onopen?.() } }) @@ -74,32 +55,10 @@ export default function initWebSocketPolyfill(syncService, fileId, initialSessio .forEach(([key, value]) => syncService.on(key, value)) } - send(...data) { + send(step) { // Useful for debugging what steps are sent and how they were initiated - // data.forEach(logStep) - - queue.push(...data) - let outbox = [] - return syncService.sendSteps(() => { - const data = { - steps: getSteps(queue), - awareness: getAwareness(queue), - version: this.#version, - } - outbox = [...queue] - logger.debug('sending steps ', data) - return data - })?.then(ret => { - // only keep the steps that were not send yet - queue.splice(0, - queue.length, - ...queue.filter(s => !outbox.includes(s)), - ) - return ret - }, err => { - logger.error(`Failed to push the queue with ${queue.length} steps to the server`, err) - this.onerror?.(err) - }) + // logStep(step) + syncService.sendStep(step) } async close() { diff --git a/src/services/y-websocket.js b/src/services/y-websocket.js new file mode 100644 index 00000000000..7fdd7da5493 --- /dev/null +++ b/src/services/y-websocket.js @@ -0,0 +1,539 @@ +/** + * SPDX-FileCopyrightText: 2019 Kevin Jahns + * SPDX-License-Identifier: MIT + */ + +/** + * Based on the awesome y-websocket https://github.com/yjs/y-websocket/ + * Modified to match the needs of an http transport. + */ + +/* eslint-env browser */ +/* eslint-disable jsdoc/require-param-description */ + +import * as Y from 'yjs' // eslint-disable-line +import * as bc from 'lib0/broadcastchannel' +import * as time from 'lib0/time' +import * as encoding from 'lib0/encoding' +import * as decoding from 'lib0/decoding' +import * as syncProtocol from 'y-protocols/sync' +import * as authProtocol from 'y-protocols/auth' +import * as awarenessProtocol from 'y-protocols/awareness' +import { Observable } from 'lib0/observable' +import * as math from 'lib0/math' +import * as url from 'lib0/url' +import * as env from 'lib0/environment' + +export const messageSync = 0 +export const messageQueryAwareness = 3 +export const messageAwareness = 1 +export const messageAuth = 2 + +/** + * encoder, decoder, provider, emitSynced, messageType + * @type {Array} + */ +const messageHandlers = [] + +messageHandlers[messageSync] = ( + encoder, + decoder, + provider, + emitSynced, + _messageType, +) => { + encoding.writeVarUint(encoder, messageSync) + const decoderForRemote = decoding.clone(decoder) + const syncMessageType = syncProtocol.readSyncMessage( + decoder, + encoder, + provider.doc, + provider, + ) + // Message came from the broadcast channel + // Do not track in this.remote and do not emit sync. + if (!emitSynced) { + return + } + if ( + syncMessageType === syncProtocol.messageYjsSyncStep2 + || syncMessageType === syncProtocol.messageYjsUpdate + ) { + syncProtocol.readSyncMessage( + decoderForRemote, + encoding.createEncoder(), + provider.remote, + provider, + ) + } + if ( + syncMessageType === syncProtocol.messageYjsSyncStep2 + && !provider.synced + ) { + provider.synced = true + } +} + +// modified to only send own awareness +messageHandlers[messageQueryAwareness] = ( + encoder, + _decoder, + provider, + _emitSynced, + _messageType, +) => { + encoding.writeVarUint(encoder, messageAwareness) + encoding.writeVarUint8Array( + encoder, + awarenessProtocol.encodeAwarenessUpdate( + provider.awareness, + [provider.doc.clientID], + // Array.from(provider.awareness.getStates().keys()), + ), + ) +} + +messageHandlers[messageAwareness] = ( + _encoder, + decoder, + provider, + _emitSynced, + _messageType, +) => { + awarenessProtocol.applyAwarenessUpdate( + provider.awareness, + decoding.readVarUint8Array(decoder), + provider, + ) +} + +messageHandlers[messageAuth] = ( + _encoder, + decoder, + provider, + _emitSynced, + _messageType, +) => { + authProtocol.readAuthMessage( + decoder, + provider.doc, + (_ydoc, reason) => permissionDeniedHandler(provider, reason), + ) +} + +// @todo - this should depend on awareness.outdatedTime +const messageReconnectTimeout = 30000 + +/** + * @param {WebsocketProvider} provider + * @param {string} reason + */ +const permissionDeniedHandler = (provider, reason) => + console.warn(`Permission denied to access ${provider.url}.\n${reason}`) + +/** + * @param {WebsocketProvider} provider + * @param {Uint8Array} buf + * @param {boolean} emitSynced + * @return {encoding.Encoder} + */ +const readMessage = (provider, buf, emitSynced) => { + const decoder = decoding.createDecoder(buf) + const encoder = encoding.createEncoder() + const messageType = decoding.readVarUint(decoder) + const messageHandler = provider.messageHandlers[messageType] + if (/** @type {any} */ (messageHandler)) { + messageHandler(encoder, decoder, provider, emitSynced, messageType) + } else { + console.error('Unable to compute message') + } + return encoder +} + +/** + * @param {WebsocketProvider} provider + */ +const setupWS = (provider) => { + if (provider.shouldConnect && provider.ws === null) { + const websocket = new provider._WS(provider.url, provider.protocols) + websocket.binaryType = 'arraybuffer' + provider.ws = websocket + provider.wsconnecting = true + provider.wsconnected = false + provider.synced = false + + websocket.onmessage = (event) => { + provider.wsLastMessageReceived = time.getUnixTime() + const encoder = readMessage(provider, new Uint8Array(event.data), true) + if (encoding.length(encoder) > 1) { + websocket.send(encoding.toUint8Array(encoder)) + } + } + websocket.onerror = (event) => { + provider.emit('connection-error', [event, provider]) + } + websocket.onclose = (event) => { + provider.emit('connection-close', [event, provider]) + provider.ws = null + provider.wsconnecting = false + if (provider.wsconnected) { + provider.wsconnected = false + provider.synced = false + // update awareness (all users except local left) + awarenessProtocol.removeAwarenessStates( + provider.awareness, + Array.from(provider.awareness.getStates().keys()).filter((client) => + client !== provider.doc.clientID, + ), + provider, + ) + provider.emit('status', [{ + status: 'disconnected', + }]) + } else { + provider.wsUnsuccessfulReconnects++ + } + // Start with no reconnect timeout and increase timeout by + // using exponential backoff starting with 100ms + setTimeout( + setupWS, + math.min( + math.pow(2, provider.wsUnsuccessfulReconnects) * 100, + provider.maxBackoffTime, + ), + provider, + ) + } + websocket.onopen = () => { + provider.wsLastMessageReceived = time.getUnixTime() + provider.wsconnecting = false + provider.wsconnected = true + provider.wsUnsuccessfulReconnects = 0 + provider.emit('status', [{ + status: 'connected', + }]) + // always send sync step 1 when connected + const encoder = encoding.createEncoder() + encoding.writeVarUint(encoder, messageSync) + syncProtocol.writeSyncStep1(encoder, provider.doc) + websocket.send(encoding.toUint8Array(encoder)) + // broadcast local awareness state + if (provider.awareness.getLocalState() !== null) { + const encoderAwarenessState = encoding.createEncoder() + encoding.writeVarUint(encoderAwarenessState, messageAwareness) + encoding.writeVarUint8Array( + encoderAwarenessState, + awarenessProtocol.encodeAwarenessUpdate(provider.awareness, [ + provider.doc.clientID, + ]), + ) + websocket.send(encoding.toUint8Array(encoderAwarenessState)) + } + } + provider.emit('status', [{ + status: 'connecting', + }]) + } +} + +/** + * @param {WebsocketProvider} provider + * @param {ArrayBuffer} buf + */ +const broadcastMessage = (provider, buf) => { + const ws = provider.ws + if (provider.wsconnected && ws && ws.readyState === ws.OPEN) { + ws.send(buf) + } + if (provider.bcconnected) { + bc.publish(provider.bcChannel, buf, provider) + } +} + +/** + * Websocket Provider for Yjs. Creates a websocket connection to sync the shared document. + * The document name is attached to the provided url. I.e. the following example + * creates a websocket connection to http://localhost:1234/my-document-name + * + * @example + * import * as Y from 'yjs' + * import { WebsocketProvider } from 'y-websocket' + * const doc = new Y.Doc() + * const provider = new WebsocketProvider('http://localhost:1234', 'my-document-name', doc) + * + * @augments {Observable} + */ +export class WebsocketProvider extends Observable { + + /** + * @param {string} serverUrl + * @param {string} roomname + * @param {Y.Doc} doc + * @param {object} opts + * @param {boolean} [opts.connect] + * @param {awarenessProtocol.Awareness} [opts.awareness] + * @param {{[key: string]: string}} [opts.params] specify url parameters + * @param {Array} [opts.protocols] specify websocket protocols + * @param {typeof WebSocket} [opts.WebSocketPolyfill] Optionall provide a WebSocket polyfill + * @param {number} [opts.resyncInterval] Request server state every `resyncInterval` milliseconds + * @param {number} [opts.maxBackoffTime] Maximum amount of time to wait before trying to reconnect (we try to reconnect using exponential backoff) + * @param {boolean} [opts.disableBc] Disable cross-tab BroadcastChannel communication + */ + constructor(serverUrl, roomname, doc, { + connect = true, + awareness = new awarenessProtocol.Awareness(doc), + params = {}, + protocols = [], + WebSocketPolyfill = WebSocket, + resyncInterval = -1, + maxBackoffTime = 2500, + disableBc = false, + } = {}) { + super() + // ensure that url is always ends with / + while (serverUrl[serverUrl.length - 1] === '/') { + serverUrl = serverUrl.slice(0, serverUrl.length - 1) + } + this.serverUrl = serverUrl + this.bcChannel = serverUrl + '/' + roomname + this.maxBackoffTime = maxBackoffTime + /** + * The specified url parameters. This can be safely updated. The changed parameters will be used + * when a new connection is established. + * @type {{[key: string]: string}} + */ + this.params = params + this.protocols = protocols + this.roomname = roomname + this.doc = doc + this._WS = WebSocketPolyfill + this.awareness = awareness + this.wsconnected = false + this.wsconnecting = false + this.bcconnected = false + this.disableBc = disableBc + this.wsUnsuccessfulReconnects = 0 + this.messageHandlers = messageHandlers.slice() + /** + * @type {Y.Doc} + */ + this.remote = new Y.Doc() + /** + * @type {boolean} + */ + this._synced = false + /** + * @type {WebSocket?} + */ + this.ws = null + this.wsLastMessageReceived = 0 + /** + * Whether to connect to other peers or not + * @type {boolean} + */ + this.shouldConnect = connect + + /** + * @type {number} + */ + this._resyncInterval = 0 + if (resyncInterval > 0) { + this._resyncInterval = /** @type {any} */ (setInterval(() => { + if (this.ws && this.ws.readyState === WebSocket.OPEN) { + // resend sync step 1 + const encoder = encoding.createEncoder() + encoding.writeVarUint(encoder, messageSync) + syncProtocol.writeSyncStep1(encoder, doc) + this.ws.send(encoding.toUint8Array(encoder)) + } + }, resyncInterval)) + } + + /** + * @param {ArrayBuffer} data + * @param {any} origin + */ + this._bcSubscriber = (data, origin) => { + if (origin !== this) { + const encoder = readMessage(this, new Uint8Array(data), false) + if (encoding.length(encoder) > 1) { + bc.publish(this.bcChannel, encoding.toUint8Array(encoder), this) + } + } + } + /** + * Listens to Yjs updates and sends them to remote peers (ws and broadcastchannel) + * @param {Uint8Array} _update + * @param {any} origin + * @param {Y.Doc} doc + */ + this._updateHandler = (_update, origin, doc) => { + if (origin !== this) { + const from = Y.encodeStateVector(this.remote) + const fullUpdate = Y.encodeStateAsUpdate(doc, from) + const encoder = encoding.createEncoder() + encoding.writeVarUint(encoder, messageSync) + syncProtocol.writeUpdate(encoder, fullUpdate) + broadcastMessage(this, encoding.toUint8Array(encoder)) + } + } + this.doc.on('update', this._updateHandler) + /** + * Send an awareness update message when local awareness changes + * modified to only send update about this client. + * @param {any} changed + * @param {any} _origin + */ + this._awarenessUpdateHandler = ({ added, updated, removed }, _origin) => { + // const changedClients = added.concat(updated).concat(removed) + const encoder = encoding.createEncoder() + encoding.writeVarUint(encoder, messageAwareness) + encoding.writeVarUint8Array( + encoder, + awarenessProtocol.encodeAwarenessUpdate( + awareness, + [this.doc.clientID], + // changedClients + ), + ) + broadcastMessage(this, encoding.toUint8Array(encoder)) + } + this._exitHandler = () => { + awarenessProtocol.removeAwarenessStates( + this.awareness, + [doc.clientID], + 'app closed', + ) + } + if (env.isNode && typeof process !== 'undefined') { + process.on('exit', this._exitHandler) + } + awareness.on('update', this._awarenessUpdateHandler) + this._checkInterval = /** @type {any} */ (setInterval(() => { + if ( + this.wsconnected + && messageReconnectTimeout + < time.getUnixTime() - this.wsLastMessageReceived + ) { + // no message received in a long time - not even your own awareness + // updates (which are updated every 15 seconds) + /** @type {WebSocket} */ (this.ws).close() + } + }, messageReconnectTimeout / 10)) + if (connect) { + this.connect() + } + } + + get url() { + const encodedParams = url.encodeQueryParams(this.params) + return this.serverUrl + '/' + this.roomname + + (encodedParams.length === 0 ? '' : '?' + encodedParams) + } + + /** + * @type {boolean} + */ + get synced() { + return this._synced + } + + set synced(state) { + if (this._synced !== state) { + this._synced = state + this.emit('synced', [state]) + this.emit('sync', [state]) + } + } + + destroy() { + if (this._resyncInterval !== 0) { + clearInterval(this._resyncInterval) + } + clearInterval(this._checkInterval) + this.disconnect() + if (env.isNode && typeof process !== 'undefined') { + process.off('exit', this._exitHandler) + } + this.awareness.off('update', this._awarenessUpdateHandler) + this.doc.off('update', this._updateHandler) + super.destroy() + } + + connectBc() { + if (this.disableBc) { + return + } + if (!this.bcconnected) { + bc.subscribe(this.bcChannel, this._bcSubscriber) + this.bcconnected = true + } + // send sync step1 to bc + // write sync step 1 + const encoderSync = encoding.createEncoder() + encoding.writeVarUint(encoderSync, messageSync) + syncProtocol.writeSyncStep1(encoderSync, this.doc) + bc.publish(this.bcChannel, encoding.toUint8Array(encoderSync), this) + // broadcast local state + const encoderState = encoding.createEncoder() + encoding.writeVarUint(encoderState, messageSync) + syncProtocol.writeSyncStep2(encoderState, this.doc) + bc.publish(this.bcChannel, encoding.toUint8Array(encoderState), this) + // write queryAwareness + const encoderAwarenessQuery = encoding.createEncoder() + encoding.writeVarUint(encoderAwarenessQuery, messageQueryAwareness) + bc.publish( + this.bcChannel, + encoding.toUint8Array(encoderAwarenessQuery), + this, + ) + // broadcast local awareness state + const encoderAwarenessState = encoding.createEncoder() + encoding.writeVarUint(encoderAwarenessState, messageAwareness) + encoding.writeVarUint8Array( + encoderAwarenessState, + awarenessProtocol.encodeAwarenessUpdate(this.awareness, [ + this.doc.clientID, + ]), + ) + bc.publish( + this.bcChannel, + encoding.toUint8Array(encoderAwarenessState), + this, + ) + } + + disconnectBc() { + // broadcast message with local awareness state set to null (indicating disconnect) + const encoder = encoding.createEncoder() + encoding.writeVarUint(encoder, messageAwareness) + encoding.writeVarUint8Array( + encoder, + awarenessProtocol.encodeAwarenessUpdate(this.awareness, [ + this.doc.clientID, + ], new Map()), + ) + broadcastMessage(this, encoding.toUint8Array(encoder)) + if (this.bcconnected) { + bc.unsubscribe(this.bcChannel, this._bcSubscriber) + this.bcconnected = false + } + } + + disconnect() { + this.shouldConnect = false + this.disconnectBc() + if (this.ws !== null) { + this.ws.close() + } + } + + connect() { + this.shouldConnect = true + if (!this.wsconnected && this.ws === null) { + setupWS(this) + this.connectBc() + } + } + +} diff --git a/src/tests/services/WebsocketPolyfill.spec.js b/src/tests/services/WebsocketPolyfill.spec.js index 6eb4ad72f3e..9a42c2a4e86 100644 --- a/src/tests/services/WebsocketPolyfill.spec.js +++ b/src/tests/services/WebsocketPolyfill.spec.js @@ -30,93 +30,4 @@ describe('Init function', () => { expect(syncService.open).toHaveBeenCalledWith({ fileId, initialSession }) }) - it('sends steps to sync service', async () => { - const syncService = { - on: jest.fn(), - open: jest.fn(() => Promise.resolve({ version: 123, session: {} })), - sendSteps: async getData => getData(), - } - const queue = [ 'initial' ] - const data = { dummy: 'data' } - const Polyfill = initWebSocketPolyfill(syncService, null, null, queue) - const websocket = new Polyfill('url') - const result = websocket.send(data) - expect(result).toBeInstanceOf(Promise) - expect(queue).toEqual([ 'initial' , data ]) - const dataSendOut = await result - expect(queue).toEqual([]) - expect(dataSendOut).toHaveProperty('awareness') - expect(dataSendOut).toHaveProperty('steps') - expect(dataSendOut).toHaveProperty('version') - }) - - it('handles early reject', async () => { - jest.spyOn(console, 'error').mockImplementation(() => {}) - const syncService = { - on: jest.fn(), - open: jest.fn(() => Promise.resolve({ version: 123, session: {} })), - sendSteps: jest.fn().mockRejectedValue('error before reading steps in sync service'), - } - const queue = [ 'initial' ] - const data = { dummy: 'data' } - const Polyfill = initWebSocketPolyfill(syncService, null, null, queue) - const websocket = new Polyfill('url') - const result = websocket.send(data) - expect(queue).toEqual([ 'initial' , data ]) - expect(result).toBeInstanceOf(Promise) - const returned = await result - expect(returned).toBeUndefined() - expect(queue).toEqual([ 'initial' , data ]) - }) - - it('handles reject after reading data', async () => { - jest.spyOn(console, 'error').mockImplementation(() => {}) - const syncService = { - on: jest.fn(), - open: jest.fn(() => Promise.resolve({ version: 123, session: {} })), - sendSteps: jest.fn().mockImplementation( async getData => { - getData() - throw 'error when sending in sync service' - }), - } - const queue = [ 'initial' ] - const data = { dummy: 'data' } - const Polyfill = initWebSocketPolyfill(syncService, null, null, queue) - const websocket = new Polyfill('url') - const result = websocket.send(data) - expect(queue).toEqual([ 'initial' , data ]) - expect(result).toBeInstanceOf(Promise) - const returned = await result - expect(returned).toBeUndefined() - expect(queue).toEqual([ 'initial' , data ]) - }) - - it('queue survives a close', async () => { - jest.spyOn(console, 'error').mockImplementation(() => {}) - const syncService = { - on: jest.fn(), - open: jest.fn(() => Promise.resolve({ version: 123, session: {} })), - sendSteps: jest.fn().mockImplementation( async getData => { - getData() - throw 'error when sending in sync service' - }), - sendStepsNow: jest.fn().mockImplementation( async getData => { - getData() - throw 'sendStepsNow error when sending' - }), - off: jest.fn(), - close: jest.fn( async data => data ), - } - const queue = [ 'initial' ] - const data = { dummy: 'data' } - const Polyfill = initWebSocketPolyfill(syncService, null, null, queue) - const websocket = new Polyfill('url') - websocket.onclose = jest.fn() - await websocket.send(data) - const promise = websocket.close() - expect(queue).toEqual([ 'initial' , data ]) - await promise - expect(queue).toEqual([ 'initial' , data ]) - }) - }) diff --git a/tsconfig.json b/tsconfig.json index 120039e57c8..9d64b435b9d 100644 --- a/tsconfig.json +++ b/tsconfig.json @@ -2,6 +2,7 @@ "extends": "@vue/tsconfig", "compilerOptions": { "allowSyntheticDefaultImports": true, + "allowJs": true, "declaration": true, "esModuleInterop": true, "lib": ["DOM", "ESNext"], @@ -21,4 +22,4 @@ "vueCompilerOptions": { "target": 2.7 } -} \ No newline at end of file +}