diff --git a/RELEASE-NOTES.txt b/RELEASE-NOTES.txt index bdc73b4..b4f6476 100644 --- a/RELEASE-NOTES.txt +++ b/RELEASE-NOTES.txt @@ -1,8 +1,9 @@ # Changelog -## 1.1.0 +## 1.1.0 - 1.1.1 - Increase usability in other projects by separating `node` and browser modules [#100](https://github.com/Simperium/node-simperium/pull/100) + - Stop applying changes that have already been applied to an entity [#101](https://github.com/Simperium/node-simperium/pull/101) ## 1.0.4 diff --git a/package-lock.json b/package-lock.json index 0c80657..f2ea1c6 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,6 +1,6 @@ { "name": "simperium", - "version": "1.1.0", + "version": "1.1.1", "lockfileVersion": 1, "requires": true, "dependencies": { diff --git a/package.json b/package.json index 53bf89b..6125b75 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "simperium", - "version": "1.1.0", + "version": "1.1.1", "description": "A simperium client for node.js", "main": "./lib/simperium/index.js", "browser": { diff --git a/src/simperium/channel.js b/src/simperium/channel.js index da16691..906e4ec 100644 --- a/src/simperium/channel.js +++ b/src/simperium/channel.js @@ -133,6 +133,10 @@ internal.updateAcknowledged = function( change ) { }; internal.findAcknowledgedChange = function( change ) { + if (this.localQueue.seenChanges.has(change.id)) { + return this.localQueue.seenChanges.get(change.id); + } + var possibleChange = this.localQueue.sent[change.id]; if ( possibleChange ) { if ( ( change.ccids || [] ).indexOf( possibleChange.ccid ) > -1 ) { @@ -175,6 +179,10 @@ internal.applyChange = function( change, ghost ) { } if ( change.o === operation.MODIFY ) { + if ( ghost && ghost.version >= change.ev ) { + return; + } + if ( ghost && ( ghost.version !== change.sv ) ) { internal.requestObjectVersion.call( this, change.id, change.sv ).then( data => { internal.applyChange.call( this, change, { version: change.sv, data } ) @@ -714,6 +722,7 @@ Queue.prototype.run = function() { function LocalQueue( store ) { this.store = store; this.sent = {}; + this.seenChanges = new Map(); this.queues = {}; this.ready = false; } @@ -734,6 +743,7 @@ LocalQueue.prototype.pause = function() { LocalQueue.prototype.acknowledge = function( change ) { if ( this.sent[change.id] === change ) { + this.seenChanges.set( change.id, change ); delete this.sent[change.id]; } diff --git a/test/simperium/crossed_wires_test.js b/test/simperium/crossed_wires_test.js index ef7d470..5a93fa5 100644 --- a/test/simperium/crossed_wires_test.js +++ b/test/simperium/crossed_wires_test.js @@ -105,6 +105,227 @@ describe( 'Crossed Wires', () => { notes.map( note => note.data.content ), ) } ) + + it( 'ignores ccid after receiving a 409 for it', async() => { + /** + * Scenario: + * + * Client 1 sends a change (ccid x) "AC" => "ACD" : "=2\t+D" + * Client 2 sends a change (ccid y) "AC" => "ABC" : "=1\t+B\t=1" + * + * Server accepts ccid x as is and broadcasts back to clients + * + * c:{ccids:[y],v:"=1\t+B\t=1"} + * + * Server accepts ccid y, server sees that the change needs to be modified because of x: + * + * c:{ccids:[x],v:"=3\t+D"} + * + * Client 1 and Client 2 should now have Ghosts that match. + */ + + // Two clients that need indexes downloaded + const bucketX = createBucket(); + bucketX.id = 'x'; + const bucketY = createBucket(); + bucketY.id = 'y'; + const clients = [bucketX, bucketY]; + + const responses = await Promise.all( [ + waitForClient( bucketX, () => bucketX.channel.handleMessage( 'auth:user' ) ), + waitForClient( bucketY, () => bucketY.channel.handleMessage( 'auth:user' ) ), + ] ); + + deepEqual( + Array( 2 ).fill( 'i:1:::10' ), + responses + ); + + const cvs = await Promise.all( clients.map( client => { + const indexed = new Promise( resolve => { + client.once( 'index', resolve ); + } ); + client.channel.handleMessage( 'i:' + JSON.stringify( { + index: [{ + id: 'note-id', + v: 1, + d: { content: 'AC' } + }], + current: 'cv-1', + } ) ); + return indexed; + } ) ); + + deepEqual( Array( 2 ).fill( 'cv-1' ), cvs ); + + deepEqual( + Array( 2 ).fill( { data: { content: 'AC' }, id: 'note-id' } ), + await Promise.all( clients.map( client => client.get( 'note-id' ) ) ), + ); + + const [changeY, changeX] = ( await Promise.all( [ + waitForClient( bucketY, () => bucketY.update( 'note-id', { content: 'ABC' } ) ), + waitForClient( bucketX, () => bucketX.update( 'note-id', { content: 'ACD' } ) ), + ] ) ).map( msg => JSON.parse( parseMessage( msg ).data ) ); + + equal( '=1\t+B\t=1', changeY.v.content.v ); + equal( '=2\t+D', changeX.v.content.v ); + + /** + * At this point, both clients have sent a change and are waiting for the + * server to respond. Their `localQueue`s should have a `.sent['note-id']`. + * + * If a client were to update `note-id` at this moment, since it is waiting + * for the sent change to be acknowledged by the server it will indicate + * that with a `localQueue.queues['note-id']`. + */ + const [serverChange1] = [ + [ { cv: 'cv-2', ccids: [changeY.ccid], sv: 1, ev: 2, id: 'note-id', o: 'M', v: { content: { + o: 'd', v: '=1\t+B\t=1' + } } } ], + // This ccid/change is modified by the server, see: '=3\t+D' vs '=2\t+D' + [ { cv: 'cv-3', ccids: [changeX.ccid], sv: 1, ev: 2, id: 'note-id', o: 'M', v: { content: { + o: 'd', v: '=3\t+D' + } } } ], + ]; + + const notes = await Promise.all( [ + new Promise( ( resolve, reject ) => { + bucketY.channel.on( 'acknowledge', () => { + setTimeout(() => resolve(bucketY.get('note-id')), 10); + } ); + + bucketY.channel.on( 'send', (data) => { + reject(new Error( 'should not send more things' ) ); + } ); + + bucketY.channel.handleMessage( 'c:' + JSON.stringify([{ + id: 'note-id', + error: 409, + ccids: serverChange1[0].ccids, + }] ) ); + + bucketY.channel.handleMessage( 'c:' + JSON.stringify( serverChange1 ) ); + } ), + new Promise( resolve => { + bucketX.once( 'update', () => resolve( bucketX.get( 'note-id' ) ) ); + bucketX.channel.handleMessage( 'c:' + JSON.stringify( serverChange1 ) ); + } ) + ] ); + + deepEqual( + [ 'ABC', 'ABCD' ], + notes.map( note => note.data.content ), + ) + } ) + + it( 'ignores inbound changes after they have already been applied', async() => { + /** + * Scenario: + * + * Client 1 sends a change (ccid x) "AC" => "ACD" : "=2\t+D" + * Client 2 sends a change (ccid y) "AC" => "ABC" : "=1\t+B\t=1" + * + * Server accepts ccid x as is and broadcasts back to clients + * + * c:{ccids:[y],v:"=1\t+B\t=1"} + * + * Server accepts ccid y, server sees that the change needs to be modified because of x: + * + * c:{ccids:[x],v:"=3\t+D"} + * + * Client 1 and Client 2 should now have Ghosts that match. + */ + + // Two clients that need indexes downloaded + const bucketX = createBucket(); + bucketX.id = 'x'; + const bucketY = createBucket(); + bucketY.id = 'y'; + const clients = [bucketX, bucketY]; + + const responses = await Promise.all( [ + waitForClient( bucketX, () => bucketX.channel.handleMessage( 'auth:user' ) ), + waitForClient( bucketY, () => bucketY.channel.handleMessage( 'auth:user' ) ), + ] ); + + deepEqual( + Array( 2 ).fill( 'i:1:::10' ), + responses + ); + + const cvs = await Promise.all( clients.map( client => { + const indexed = new Promise( resolve => { + client.once( 'index', resolve ); + } ); + client.channel.handleMessage( 'i:' + JSON.stringify( { + index: [{ + id: 'note-id', + v: 1, + d: { content: 'AC' } + }], + current: 'cv-1', + } ) ); + return indexed; + } ) ); + + deepEqual( Array( 2 ).fill( 'cv-1' ), cvs ); + + deepEqual( + Array( 2 ).fill( { data: { content: 'AC' }, id: 'note-id' } ), + await Promise.all( clients.map( client => client.get( 'note-id' ) ) ), + ); + + const [changeY, changeX] = ( await Promise.all( [ + waitForClient( bucketY, () => bucketY.update( 'note-id', { content: 'ABC' } ) ), + waitForClient( bucketX, () => bucketX.update( 'note-id', { content: 'ACD' } ) ), + ] ) ).map( msg => JSON.parse( parseMessage( msg ).data ) ); + + equal( '=1\t+B\t=1', changeY.v.content.v ); + equal( '=2\t+D', changeX.v.content.v ); + + /** + * At this point, both clients have sent a change and are waiting for the + * server to respond. Their `localQueue`s should have a `.sent['note-id']`. + * + * If a client were to update `note-id` at this moment, since it is waiting + * for the sent change to be acknowledged by the server it will indicate + * that with a `localQueue.queues['note-id']`. + */ + const [serverChange1] = [ + [ { cv: 'cv-2', ccids: [changeY.ccid], sv: 1, ev: 2, id: 'note-id', o: 'M', v: { content: { + o: 'd', v: '=1\t+B\t=1' + } } } ], + // This ccid/change is modified by the server, see: '=3\t+D' vs '=2\t+D' + [ { cv: 'cv-3', ccids: [changeX.ccid], sv: 1, ev: 2, id: 'note-id', o: 'M', v: { content: { + o: 'd', v: '=3\t+D' + } } } ], + ]; + + const notes = await Promise.all( [ + new Promise( ( resolve, reject ) => { + bucketY.channel.on( 'acknowledge', () => { + setTimeout(() => resolve(bucketY.get('note-id')), 10); + } ); + + bucketY.channel.on( 'send', (data) => { + reject(new Error( 'should not send more things' ) ); + } ); + + bucketY.channel.handleMessage( 'c:' + JSON.stringify( serverChange1 ) ); + bucketY.channel.handleMessage('c:' + JSON.stringify(serverChange1)); + } ), + new Promise( resolve => { + bucketX.once( 'update', () => resolve( bucketX.get( 'note-id' ) ) ); + bucketX.channel.handleMessage( 'c:' + JSON.stringify( serverChange1 ) ); + } ) + ] ); + + deepEqual( + [ 'ABC', 'ABCD' ], + notes.map( note => note.data.content ), + ) + } ) } ); function waitForClient( client, action ) {