Skip to content

Commit

Permalink
Send client state to cluster in re-connections[API-1644] (#1415)
Browse files Browse the repository at this point in the history
* Make ReadResultSet iterable [API-1315]

* Make ReadResultSet iterable [API-1315]

* Changes made according to comments on PR.[API-1315]

* Send client state to cluster in re-connections [API-1644]

* New test added related to check InitializeClientOnCluster

* New test added

* lint problem fixed
  • Loading branch information
harunalpak committed Nov 28, 2022
1 parent 53fe97b commit 60db202
Show file tree
Hide file tree
Showing 3 changed files with 70 additions and 26 deletions.
16 changes: 15 additions & 1 deletion src/network/ConnectionManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,7 @@ export class ConnectionManager extends EventEmitter {
* counter, but may not show the latest total.
*/
private totalBytesWritten : number;
private establishedInitialClusterConnection: boolean;

constructor(
private readonly client: ClientForConnectionManager,
Expand Down Expand Up @@ -833,10 +834,23 @@ export class ConnectionManager extends EventEmitter {
this.connectionRegistry.setConnection(response.memberUuid, connection);
if (connectionsEmpty) {
this.clusterId = newClusterId;
if (clusterIdChanged) {
if (this.establishedInitialClusterConnection) {
// In split brain, the client might connect to the one half
// of the cluster, and then later might reconnect to the
// other half, after the half it was connected to is
// completely dead. Since the cluster id is preserved in
// split brain scenarios, it is impossible to distinguish
// reconnection to the same cluster vs reconnection to the
// other half of the split brain. However, in the latter,
// we might need to send some state to the other half of
// the split brain (like Compact schemas or user code
// deployment classes). That forces us to send the client
// state to the cluster after the first cluster connection,
// regardless the cluster id is changed or not.
this.connectionRegistry.setClientState(ClientState.CONNECTED_TO_CLUSTER);
this.initializeClientOnCluster(newClusterId);
} else {
this.establishedInitialClusterConnection = true;
this.connectionRegistry.setClientState(ClientState.INITIALIZED_ON_CLUSTER);
this.emitLifecycleEvent(LifecycleState.CONNECTED);
}
Expand Down
18 changes: 18 additions & 0 deletions test/TestUtil.js
Original file line number Diff line number Diff line change
Expand Up @@ -639,3 +639,21 @@ exports.calculateServerVersionFromString = (versionString) => {
return isNaN(version) ? BuildInfo.UNKNOWN_VERSION_ID : version;
};

/**
* This function will wait for the connections count to be equal to given parameter (connectionCount).
*/
exports.waitForConnectionCount = async (client, connectionCount) => {
let getConnectionsFn;
if (this.isClientVersionAtLeast('4.2')) {
const clientRegistry = client.connectionRegistry;
getConnectionsFn = clientRegistry.getConnections.bind(clientRegistry);
} else {
const connManager = client.getConnectionManager();
getConnectionsFn = connManager.getActiveConnections.bind(connManager);
}

await this.assertTrueEventually(async () => {
expect(getConnectionsFn().length).to.be.equal(connectionCount);
});
};

62 changes: 37 additions & 25 deletions test/integration/backward_compatible/serial/ClientReconnectTest.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@
const { expect } = require('chai');
const RC = require('../../RC');
const TestUtil = require('../../../TestUtil');
const sinon = require('sinon');
const sandbox = sinon.createSandbox();
const { ConnectionManager } = require('../../../../lib/network/ConnectionManager');

/**
* Basic tests for reconnection to cluster scenarios.
Expand All @@ -26,28 +29,6 @@ describe('ClientReconnectTest', function () {
let cluster;
let client;

/**
* Waits for disconnection. getMap(), map.put() messages are not retryable. If terminateMember does not
* close the client connection immediately it is possible for the client to realize that later when map.put
* or getMap invocation started. In that case, the connection will be closed with TargetDisconnectedError.
* Because these client messages are not retryable, the invocation will be rejected with an error, leading
* to flaky tests. To avoid that, this function will wait for the connections count to be zero.
*/
const waitForDisconnection = async (client) => {
let getConnectionsFn;
if (TestUtil.isClientVersionAtLeast('4.2')) {
const clientRegistry = client.connectionRegistry;
getConnectionsFn = clientRegistry.getConnections.bind(clientRegistry);
} else {
const connManager = client.getConnectionManager();
getConnectionsFn = connManager.getActiveConnections.bind(connManager);
}

await TestUtil.assertTrueEventually(async () => {
expect(getConnectionsFn()).to.be.empty;
});
};

const testFactory = new TestUtil.TestFactory();

beforeEach(function () {
Expand All @@ -59,6 +40,37 @@ describe('ClientReconnectTest', function () {
await testFactory.shutdownAll();
});

it('should send the client state to the cluster after reconnections, ' +
+'regardless it is connected back to possibly the same cluster with the same id or not.', async function () {
const fakeInitializeClientOnCluster = sandbox.replace(
ConnectionManager.prototype,
'initializeClientOnCluster',
sandbox.fake(ConnectionManager.prototype.initializeClientOnCluster)
);
cluster = await testFactory.createClusterForSerialTests();
const member = await RC.startMember(cluster.id);
client = await testFactory.newHazelcastClientForSerialTests({
clusterName: cluster.id,
properties: {
'hazelcast.client.heartbeat.interval': 1000,
'hazelcast.client.heartbeat.timeout': 3000
}
});
await RC.terminateMember(cluster.id, member.uuid);
await TestUtil.waitForConnectionCount(client, 0);
await RC.startMember(cluster.id);
await TestUtil.waitForConnectionCount(client, 1);
fakeInitializeClientOnCluster.callCount.should.be.eq(1);
});

/**
* getMap(), map.put() messages are not retryable. If terminateMember does not
* close the client connection immediately it is possible for the client to realize that later when map.put
* or getMap invocation started. In that case, the connection will be closed with TargetDisconnectedError.
* Because these client messages are not retryable, the invocation will be rejected with an error, leading
* to flaky tests. To avoid that, we use the "TestUtil.waitForConnectionCount" function
* to wait for disconnection in the tests below.
*/
it('member restarts, while map.put in progress', async function () {
cluster = await testFactory.createClusterForSerialTests();
const member = await RC.startMember(cluster.id);
Expand All @@ -72,7 +84,7 @@ describe('ClientReconnectTest', function () {
const map = await client.getMap('test');

await RC.terminateMember(cluster.id, member.uuid);
await waitForDisconnection(client);
await TestUtil.waitForConnectionCount(client, 0);
await RC.startMember(cluster.id);

await map.put('testkey', 'testvalue');
Expand All @@ -95,7 +107,7 @@ describe('ClientReconnectTest', function () {
});
const map = await client.getMap('test');
await RC.terminateMember(cluster.id, member.uuid);
await waitForDisconnection(client);
await TestUtil.waitForConnectionCount(client, 0);

const promise = map.put('testkey', 'testvalue').then(() => {
return map.get('testkey');
Expand All @@ -119,7 +131,7 @@ describe('ClientReconnectTest', function () {
}
});
await RC.terminateMember(cluster.id, member.uuid);
await waitForDisconnection(client);
await TestUtil.waitForConnectionCount(client, 0);

let map;

Expand Down

0 comments on commit 60db202

Please sign in to comment.