Skip to content

Commit

Permalink
Merge pull request #404 from ably/connection-status-propagation
Browse files Browse the repository at this point in the history
core/connection: dont cancel transient disconnect timer with connecting status
  • Loading branch information
AndyTWF authored Nov 27, 2024
2 parents c85fe27 + 363742a commit bef2c15
Show file tree
Hide file tree
Showing 2 changed files with 175 additions and 66 deletions.
41 changes: 36 additions & 5 deletions src/core/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -156,16 +156,21 @@ export class DefaultConnection extends EventEmitter<ConnectionEventsMap> impleme
// If we're in the disconnected state, assume it's transient and set a timeout to propagate the change
if (chatState === ConnectionStatus.Disconnected && !this._transientTimeout) {
this._transientTimeout = setTimeout(() => {
this._transientTimeout = undefined;
this._applyStatusChange(stateChange);
this._onTransientDisconnectTimeout(stateChange);
}, TRANSIENT_TIMEOUT);
return;
}

// If we're in any state other than disconnected, and we have a transient timeout, clear it
if (this._transientTimeout) {
clearTimeout(this._transientTimeout);
this._transientTimeout = undefined;
// If we're in the connecting state, or disconnected state, and we have a transient timeout, we should ignore it -
// if we can reach connected in a reasonable time, we can assume the disconnect was transient and suppress the
// change
if (chatState === ConnectionStatus.Connecting || chatState === ConnectionStatus.Disconnected) {
this._logger.debug('ignoring transient state due to transient disconnect timeout', stateChange);
return;
}

this._cancelTransientDisconnectTimeout();
}

this._applyStatusChange(stateChange);
Expand Down Expand Up @@ -224,4 +229,30 @@ export class DefaultConnection extends EventEmitter<ConnectionEventsMap> impleme
}
}
}

/**
* Handles a transient disconnect timeout.
*
* @param statusChange The change in status.
*/
private _onTransientDisconnectTimeout(statusChange: ConnectionStatusChange): void {
this._logger.debug('transient disconnect timeout reached');
this._cancelTransientDisconnectTimeout();

// When we apply the status change, we should apply whatever the current state is at the time
this._applyStatusChange({
...statusChange,
current: this._mapAblyStatusToChat(this._connection.state),
});
}

/**
* Cancels the transient disconnect timeout.
*/
private _cancelTransientDisconnectTimeout(): void {
if (this._transientTimeout) {
clearTimeout(this._transientTimeout);
this._transientTimeout = undefined;
}
}
}
200 changes: 139 additions & 61 deletions test/core/connection.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import * as Ably from 'ably';
import { beforeEach, describe, expect, it, vi } from 'vitest';

import { ErrorInfo } from '../../__mocks__/ably/index.ts';
import { ConnectionStatus, DefaultConnection } from '../../src/core/connection.ts';
import { ConnectionStatus, ConnectionStatusChange, DefaultConnection } from '../../src/core/connection.ts';
import { makeTestLogger } from '../helper/logger.ts';

interface TestContext {
Expand Down Expand Up @@ -59,6 +59,9 @@ describe('connection', () => {
context.channelLevelListeners.add(listener);

context.emulateStateChange = (stateChange: Ably.ConnectionStateChange) => {
vi.spyOn(connection, 'state', 'get').mockReturnValue(stateChange.current);
vi.spyOn(connection, 'errorReason', 'get').mockReturnValue(stateChange.reason as ErrorInfo);

for (const cb of context.channelLevelListeners) {
cb(stateChange);
}
Expand Down Expand Up @@ -195,71 +198,146 @@ describe('connection', () => {
},
);

it<TestContext>('handles transient disconnections', (context) =>
new Promise<void>((done) => {
// Start the channel in a connected state
vi.spyOn(context.realtime.connection, 'state', 'get').mockReturnValue('connected');
vi.spyOn(context.realtime.connection, 'errorReason', 'get').mockReturnValue(
new Ably.ErrorInfo('error', 500, 99999),
);
it<TestContext>('handles transient disconnections', async (context) => {
// Start the channel in a connected state
vi.spyOn(context.realtime.connection, 'state', 'get').mockReturnValue('connected');
vi.spyOn(context.realtime.connection, 'errorReason', 'get').mockReturnValue(
new Ably.ErrorInfo('error', 500, 99999),
);

const connection = new DefaultConnection(context.realtime, makeTestLogger());
expect(connection.status).toEqual(ConnectionStatus.Connected);
const connection = new DefaultConnection(context.realtime, makeTestLogger());
expect(connection.status).toEqual(ConnectionStatus.Connected);

// Set a listener that stores the state change
const stateChanges: ConnectionStatus[] = [];
connection.onStatusChange((status) => {
stateChanges.push(status.current);
});
// Set a listener that stores the state change
const stateChanges: ConnectionStatus[] = [];
connection.onStatusChange((status) => {
stateChanges.push(status.current);
});

// Transition to a disconnected state
context.emulateStateChange({ current: 'disconnected', previous: 'connected' });
// Transition to a disconnected state
context.emulateStateChange({ current: 'disconnected', previous: 'connected' });

// Wait for 3 seconds (well below the transient timeout)
void new Promise((resolve) => setTimeout(resolve, 3000)).then(() => {
// Transition back to a connected state
context.emulateStateChange({ current: 'connected', previous: 'disconnected' });
// Wait for 3 seconds (well below the transient timeout)
await new Promise((resolve) => setTimeout(resolve, 3000)).then(() => {
// Transition back to a connected state
context.emulateStateChange({ current: 'connected', previous: 'disconnected' });

// Assert that we have only seen the connected state
expect(stateChanges).toEqual([]);
// Assert that we have only seen the connected state
expect(stateChanges).toEqual([]);
});
});

done();
});
}));
it<TestContext>('handles transient disconnections with intermediate state changes', async (context) => {
// Start the channel in a connected state
vi.spyOn(context.realtime.connection, 'state', 'get').mockReturnValue('connected');
vi.spyOn(context.realtime.connection, 'errorReason', 'get').mockReturnValue(
new Ably.ErrorInfo('error', 500, 99999),
);

it<TestContext>(
'emits disconnections after a period of time',
(context) =>
new Promise<void>((done) => {
// Start the channel in a connected state
vi.spyOn(context.realtime.connection, 'state', 'get').mockReturnValue('connected');
vi.spyOn(context.realtime.connection, 'errorReason', 'get').mockReturnValue(
new Ably.ErrorInfo('error', 500, 99999),
);

const connection = new DefaultConnection(context.realtime, makeTestLogger());
expect(connection.status).toEqual(ConnectionStatus.Connected);

// Set a listener that stores the state change
const stateChanges: ConnectionStatus[] = [];
connection.onStatusChange((status) => {
stateChanges.push(status.current);
});

// Transition to a disconnected state
context.emulateStateChange({ current: 'disconnected', previous: 'connected' });

// Wait for longer than the transient timeout
void new Promise((resolve) => setTimeout(resolve, 6000)).then(() => {
// Transition back to a connected state
context.emulateStateChange({ current: 'connected', previous: 'disconnected' });

// Assert that we have only seen the connected state
expect(stateChanges).toEqual([ConnectionStatus.Disconnected, ConnectionStatus.Connected]);

done();
});
}),
10000,
);
const connection = new DefaultConnection(context.realtime, makeTestLogger());
expect(connection.status).toEqual(ConnectionStatus.Connected);

// Set a listener that stores the state change
const stateChanges: ConnectionStatus[] = [];
connection.onStatusChange((status) => {
stateChanges.push(status.current);
});

// Transition to a disconnected state
const disconnectError = new Ably.ErrorInfo('error', 500, 99999);
context.emulateStateChange({ current: 'disconnected', previous: 'connected', reason: disconnectError });

// Now transition to a connecting state
context.emulateStateChange({ current: 'connecting', previous: 'disconnected' });

// Wait for 3 seconds (well below the transient timeout)
await new Promise((resolve) => setTimeout(resolve, 3000)).then(() => {
// Transition back to a connected state
context.emulateStateChange({ current: 'connected', previous: 'disconnected' });

// Assert that we have only seen the connected state
expect(stateChanges).toEqual([]);
});
});

it<TestContext>('emits disconnections after a period of time', async (context) => {
// Start the channel in a connected state
vi.spyOn(context.realtime.connection, 'state', 'get').mockReturnValue('connected');
vi.spyOn(context.realtime.connection, 'errorReason', 'get').mockReturnValue(
new Ably.ErrorInfo('error', 500, 99999),
);

const connection = new DefaultConnection(context.realtime, makeTestLogger());
expect(connection.status).toEqual(ConnectionStatus.Connected);

// Set a listener that stores the state change
const stateChanges: ConnectionStatusChange[] = [];
connection.onStatusChange((status) => {
stateChanges.push(status);
});

// Transition to a disconnected state
const disconnectError = new Ably.ErrorInfo('error', 500, 99999);
context.emulateStateChange({ current: 'disconnected', previous: 'connected', reason: disconnectError });

// Wait for longer than the transient timeout
await new Promise((resolve) => setTimeout(resolve, 6000)).then(() => {
// Transition back to a connected state
context.emulateStateChange({ current: 'connected', previous: 'disconnected' });

// Assert that we have only seen the connected state
expect(stateChanges.map((change) => change.current)).toEqual([
ConnectionStatus.Disconnected,
ConnectionStatus.Connected,
]);

// The first change should have the disconnect error
expect(stateChanges[0]?.error).toEqual(disconnectError);
});
}, 10000);

it<TestContext>('emits disconnections after a period of time to an intermediate state', async (context) => {
// Start the channel in a connected state
vi.spyOn(context.realtime.connection, 'state', 'get').mockReturnValue('connected');
vi.spyOn(context.realtime.connection, 'errorReason', 'get').mockReturnValue(
new Ably.ErrorInfo('error', 500, 99999),
);

const connection = new DefaultConnection(context.realtime, makeTestLogger());
expect(connection.status).toEqual(ConnectionStatus.Connected);

// Set a listener that stores the state change
const stateChanges: ConnectionStatusChange[] = [];
connection.onStatusChange((status) => {
stateChanges.push(status);
});

// Transition to a disconnected state
const disconnectError = new Ably.ErrorInfo('error', 500, 99999);
context.emulateStateChange({ current: 'disconnected', previous: 'connected', reason: disconnectError });

// Now transition to a connecting state
context.emulateStateChange({ current: 'connecting', previous: 'disconnected' });

// Now transition to a disconnected state
context.emulateStateChange({ current: 'disconnected', previous: 'connecting' });

// Now transition to a connecting state
context.emulateStateChange({ current: 'connecting', previous: 'disconnected' });

// Wait for longer than the transient timeout
await new Promise((resolve) => setTimeout(resolve, 8000)).then(() => {
// Transition back to a connected state
context.emulateStateChange({ current: 'connected', previous: 'connecting' });

// Assert that we have only seen the connected state
expect(stateChanges.map((change) => change.current)).toEqual([
ConnectionStatus.Connecting,
ConnectionStatus.Connected,
]);

// The first change should have the disconnect error
expect(stateChanges[0]?.error).toEqual(disconnectError);
});
}, 10000);
});

0 comments on commit bef2c15

Please sign in to comment.