Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion RELEASE-NOTES.txt
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
# Changelog

## 1.1.0
## 1.1.0 - 1.1.1

- Increase usability in other projects by separating `node` and browser modules [#100](https://github.com/Simperium/node-simperium/pull/100)
- Stop applying changes that have already been applied to an entity [#101](https://github.com/Simperium/node-simperium/pull/101)

## 1.0.4

Expand Down
2 changes: 1 addition & 1 deletion package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "simperium",
"version": "1.1.0",
"version": "1.1.1",
"description": "A simperium client for node.js",
"main": "./lib/simperium/index.js",
"browser": {
Expand Down
10 changes: 10 additions & 0 deletions src/simperium/channel.js
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,10 @@ internal.updateAcknowledged = function( change ) {
};

internal.findAcknowledgedChange = function( change ) {
if (this.localQueue.seenChanges.has(change.id)) {
return this.localQueue.seenChanges.get(change.id);
}

var possibleChange = this.localQueue.sent[change.id];
if ( possibleChange ) {
if ( ( change.ccids || [] ).indexOf( possibleChange.ccid ) > -1 ) {
Expand Down Expand Up @@ -175,6 +179,10 @@ internal.applyChange = function( change, ghost ) {
}

if ( change.o === operation.MODIFY ) {
if ( ghost && ghost.version >= change.ev ) {
return;
}

if ( ghost && ( ghost.version !== change.sv ) ) {
internal.requestObjectVersion.call( this, change.id, change.sv ).then( data => {
internal.applyChange.call( this, change, { version: change.sv, data } )
Expand Down Expand Up @@ -714,6 +722,7 @@ Queue.prototype.run = function() {
function LocalQueue( store ) {
this.store = store;
this.sent = {};
this.seenChanges = new Map();
this.queues = {};
this.ready = false;
}
Expand All @@ -734,6 +743,7 @@ LocalQueue.prototype.pause = function() {

LocalQueue.prototype.acknowledge = function( change ) {
if ( this.sent[change.id] === change ) {
this.seenChanges.set( change.id, change );
delete this.sent[change.id];
}

Expand Down
221 changes: 221 additions & 0 deletions test/simperium/crossed_wires_test.js
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,227 @@ describe( 'Crossed Wires', () => {
notes.map( note => note.data.content ),
)
} )

it( 'ignores ccid after receiving a 409 for it', async() => {
/**
* Scenario:
*
* Client 1 sends a change (ccid x) "AC" => "ACD" : "=2\t+D"
* Client 2 sends a change (ccid y) "AC" => "ABC" : "=1\t+B\t=1"
*
* Server accepts ccid x as is and broadcasts back to clients
*
* c:{ccids:[y],v:"=1\t+B\t=1"}
*
* Server accepts ccid y, server sees that the change needs to be modified because of x:
*
* c:{ccids:[x],v:"=3\t+D"}
*
* Client 1 and Client 2 should now have Ghosts that match.
*/

// Two clients that need indexes downloaded
const bucketX = createBucket();
bucketX.id = 'x';
const bucketY = createBucket();
bucketY.id = 'y';
const clients = [bucketX, bucketY];

const responses = await Promise.all( [
waitForClient( bucketX, () => bucketX.channel.handleMessage( 'auth:user' ) ),
waitForClient( bucketY, () => bucketY.channel.handleMessage( 'auth:user' ) ),
] );

deepEqual(
Array( 2 ).fill( 'i:1:::10' ),
responses
);

const cvs = await Promise.all( clients.map( client => {
const indexed = new Promise( resolve => {
client.once( 'index', resolve );
} );
client.channel.handleMessage( 'i:' + JSON.stringify( {
index: [{
id: 'note-id',
v: 1,
d: { content: 'AC' }
}],
current: 'cv-1',
} ) );
return indexed;
} ) );

deepEqual( Array( 2 ).fill( 'cv-1' ), cvs );

deepEqual(
Array( 2 ).fill( { data: { content: 'AC' }, id: 'note-id' } ),
await Promise.all( clients.map( client => client.get( 'note-id' ) ) ),
);

const [changeY, changeX] = ( await Promise.all( [
waitForClient( bucketY, () => bucketY.update( 'note-id', { content: 'ABC' } ) ),
waitForClient( bucketX, () => bucketX.update( 'note-id', { content: 'ACD' } ) ),
] ) ).map( msg => JSON.parse( parseMessage( msg ).data ) );

equal( '=1\t+B\t=1', changeY.v.content.v );
equal( '=2\t+D', changeX.v.content.v );

/**
* At this point, both clients have sent a change and are waiting for the
* server to respond. Their `localQueue`s should have a `.sent['note-id']`.
*
* If a client were to update `note-id` at this moment, since it is waiting
* for the sent change to be acknowledged by the server it will indicate
* that with a `localQueue.queues['note-id']`.
*/
const [serverChange1] = [
[ { cv: 'cv-2', ccids: [changeY.ccid], sv: 1, ev: 2, id: 'note-id', o: 'M', v: { content: {
o: 'd', v: '=1\t+B\t=1'
} } } ],
// This ccid/change is modified by the server, see: '=3\t+D' vs '=2\t+D'
[ { cv: 'cv-3', ccids: [changeX.ccid], sv: 1, ev: 2, id: 'note-id', o: 'M', v: { content: {
o: 'd', v: '=3\t+D'
} } } ],
];

const notes = await Promise.all( [
new Promise( ( resolve, reject ) => {
bucketY.channel.on( 'acknowledge', () => {
setTimeout(() => resolve(bucketY.get('note-id')), 10);
} );

bucketY.channel.on( 'send', (data) => {
reject(new Error( 'should not send more things' ) );
} );

bucketY.channel.handleMessage( 'c:' + JSON.stringify([{
id: 'note-id',
error: 409,
ccids: serverChange1[0].ccids,
}] ) );

bucketY.channel.handleMessage( 'c:' + JSON.stringify( serverChange1 ) );
} ),
new Promise( resolve => {
bucketX.once( 'update', () => resolve( bucketX.get( 'note-id' ) ) );
bucketX.channel.handleMessage( 'c:' + JSON.stringify( serverChange1 ) );
} )
] );

deepEqual(
[ 'ABC', 'ABCD' ],
notes.map( note => note.data.content ),
)
} )

it( 'ignores inbound changes after they have already been applied', async() => {
/**
* Scenario:
*
* Client 1 sends a change (ccid x) "AC" => "ACD" : "=2\t+D"
* Client 2 sends a change (ccid y) "AC" => "ABC" : "=1\t+B\t=1"
*
* Server accepts ccid x as is and broadcasts back to clients
*
* c:{ccids:[y],v:"=1\t+B\t=1"}
*
* Server accepts ccid y, server sees that the change needs to be modified because of x:
*
* c:{ccids:[x],v:"=3\t+D"}
*
* Client 1 and Client 2 should now have Ghosts that match.
*/

// Two clients that need indexes downloaded
const bucketX = createBucket();
bucketX.id = 'x';
const bucketY = createBucket();
bucketY.id = 'y';
const clients = [bucketX, bucketY];

const responses = await Promise.all( [
waitForClient( bucketX, () => bucketX.channel.handleMessage( 'auth:user' ) ),
waitForClient( bucketY, () => bucketY.channel.handleMessage( 'auth:user' ) ),
] );

deepEqual(
Array( 2 ).fill( 'i:1:::10' ),
responses
);

const cvs = await Promise.all( clients.map( client => {
const indexed = new Promise( resolve => {
client.once( 'index', resolve );
} );
client.channel.handleMessage( 'i:' + JSON.stringify( {
index: [{
id: 'note-id',
v: 1,
d: { content: 'AC' }
}],
current: 'cv-1',
} ) );
return indexed;
} ) );

deepEqual( Array( 2 ).fill( 'cv-1' ), cvs );

deepEqual(
Array( 2 ).fill( { data: { content: 'AC' }, id: 'note-id' } ),
await Promise.all( clients.map( client => client.get( 'note-id' ) ) ),
);

const [changeY, changeX] = ( await Promise.all( [
waitForClient( bucketY, () => bucketY.update( 'note-id', { content: 'ABC' } ) ),
waitForClient( bucketX, () => bucketX.update( 'note-id', { content: 'ACD' } ) ),
] ) ).map( msg => JSON.parse( parseMessage( msg ).data ) );

equal( '=1\t+B\t=1', changeY.v.content.v );
equal( '=2\t+D', changeX.v.content.v );

/**
* At this point, both clients have sent a change and are waiting for the
* server to respond. Their `localQueue`s should have a `.sent['note-id']`.
*
* If a client were to update `note-id` at this moment, since it is waiting
* for the sent change to be acknowledged by the server it will indicate
* that with a `localQueue.queues['note-id']`.
*/
const [serverChange1] = [
[ { cv: 'cv-2', ccids: [changeY.ccid], sv: 1, ev: 2, id: 'note-id', o: 'M', v: { content: {
o: 'd', v: '=1\t+B\t=1'
} } } ],
// This ccid/change is modified by the server, see: '=3\t+D' vs '=2\t+D'
[ { cv: 'cv-3', ccids: [changeX.ccid], sv: 1, ev: 2, id: 'note-id', o: 'M', v: { content: {
o: 'd', v: '=3\t+D'
} } } ],
];

const notes = await Promise.all( [
new Promise( ( resolve, reject ) => {
bucketY.channel.on( 'acknowledge', () => {
setTimeout(() => resolve(bucketY.get('note-id')), 10);
} );

bucketY.channel.on( 'send', (data) => {
reject(new Error( 'should not send more things' ) );
} );

bucketY.channel.handleMessage( 'c:' + JSON.stringify( serverChange1 ) );
bucketY.channel.handleMessage('c:' + JSON.stringify(serverChange1));
} ),
new Promise( resolve => {
bucketX.once( 'update', () => resolve( bucketX.get( 'note-id' ) ) );
bucketX.channel.handleMessage( 'c:' + JSON.stringify( serverChange1 ) );
} )
] );

deepEqual(
[ 'ABC', 'ABCD' ],
notes.map( note => note.data.content ),
)
} )
} );

function waitForClient( client, action ) {
Expand Down