Skip to content
This repository has been archived by the owner on Feb 7, 2024. It is now read-only.

Inconsistent behavior when using replication #778

Closed
wants to merge 33 commits into from
Closed
Show file tree
Hide file tree
Changes from 29 commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
70158a0
ensure that channel exists across instances when using redis manager …
mdprotacio Jun 2, 2021
c218a95
handle missing channel on server1 not propagating message to server2
mdprotacio Jun 2, 2021
2fe5fca
check against the list of channels in redis
mdprotacio Jun 2, 2021
5c21b8a
trying to fix
mdprotacio Jun 2, 2021
96f53bc
sync channel list across servers
mdprotacio Jun 2, 2021
7fbd37c
added documentation
mdprotacio Jun 2, 2021
374d686
channel should always be registered to propagate across servers
mdprotacio Jun 2, 2021
fb27ded
fixes on broadcasting to everyone
mdprotacio Jun 3, 2021
e12aae5
do not replicate on call to broadcast as it is already broadcasted ac…
mdprotacio Jun 3, 2021
d255a4e
include serverId in socketId
mdprotacio Jun 3, 2021
7ba243b
Revert "include serverId in socketId"
mdprotacio Jun 3, 2021
92eb8c3
remove channel from set only if connections is less than 1
mdprotacio Jun 3, 2021
2964c3f
update lastPongedAt whenever we send message to the connection to avo…
mdprotacio Jun 3, 2021
5619003
consider connection as ponged for every message sent to connection
mdprotacio Jun 3, 2021
1be80f1
mark connection as ponged to update lastPongedAt and avoid cleanup fo…
mdprotacio Jun 3, 2021
5ad16c1
trying to fix issues with pong on server message
mdprotacio Jun 3, 2021
eee83fe
mark connection as ponged prior sending message
mdprotacio Jun 3, 2021
c70b032
mark as ponged prior sending message
mdprotacio Jun 3, 2021
93bdc9d
add space after function keyword
mdprotacio Jun 3, 2021
2ca34a7
update connection on presence channel to be ponged upon subscription
mdprotacio Jun 3, 2021
1bb434d
update connection on presence channel to be ponged upon subscription
mdprotacio Jun 3, 2021
86e73b5
fixes linting errors
mdprotacio Jun 3, 2021
5ccae68
fixes linting errors
mdprotacio Jun 3, 2021
7cae46a
use pubsub to update lastPongedAt across servers
mdprotacio Jun 4, 2021
ad6d345
when a message is sent or received, let's update lastPongedAt
mdprotacio Jun 4, 2021
e3f0add
trying to fix issues with socket not in set
mdprotacio Jun 7, 2021
ebf00d3
pong only the receiver for broadcasting to avoid having to keep the m…
mdprotacio Jun 7, 2021
b9ce798
fixes spacing issues
mdprotacio Jun 7, 2021
00568fc
fixes more spacing issues
mdprotacio Jun 7, 2021
d481ab5
handle locks properly for connection cleanup
mdprotacio Jun 8, 2021
a298fcf
added space after try
mdprotacio Jun 8, 2021
33cce95
Merge branch 'master' of github.com:koalaphils/laravel-websockets
mdprotacio Jun 8, 2021
7bc0957
space after the try brace
mdprotacio Jun 8, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 22 additions & 3 deletions src/ChannelManagers/LocalChannelManager.php
Original file line number Diff line number Diff line change
Expand Up @@ -429,9 +429,7 @@ public function getMemberSockets($userId, $appId, $channelName): PromiseInterfac
*/
public function connectionPonged(ConnectionInterface $connection): PromiseInterface
{
$connection->lastPongedAt = Carbon::now();

return $this->updateConnectionInChannels($connection);
return $this->pongConnectionInChannels($connection);
}

/**
Expand Down Expand Up @@ -460,6 +458,27 @@ public function removeObsoleteConnections(): PromiseInterface
);
}

/**
* Pong connection in channels.
*
* @param ConnectionInterface $connection
* @return PromiseInterface[bool]
*/
public function pongConnectionInChannels(ConnectionInterface $connection): PromiseInterface
{
return $this->getLocalChannels($connection->app->id)
->then(function ($channels) use ($connection) {
foreach ($channels as $channel) {
if ($conn = $channel->getConnection($connection->socketId)) {
$conn->lastPongedAt = Carbon::now();
$channel->saveConnection($conn);
}
}

return true;
});
}

/**
* Update the connection in all channels.
*
Expand Down
71 changes: 55 additions & 16 deletions src/ChannelManagers/RedisChannelManager.php
Original file line number Diff line number Diff line change
Expand Up @@ -145,25 +145,14 @@ public function subscribeToChannel(ConnectionInterface $connection, string $chan
*/
public function unsubscribeFromChannel(ConnectionInterface $connection, string $channelName, stdClass $payload): PromiseInterface
{
return $this->getGlobalConnectionsCount($connection->app->id, $channelName)
return $this->decrementSubscriptionsCount($connection->app->id, $channelName)
->then(function ($count) use ($connection, $channelName) {
if ($count === 0) {
// Make sure to not stay subscribed to the PubSub topic
// if there are no connections.
// If the total connections count gets to 0 after unsubscribe,
// try again to check & unsubscribe from the PubSub topic if needed.
if ($count < 1) {
$this->unsubscribeFromTopic($connection->app->id, $channelName);
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mdprotacio hey! I am still fighting with one issue, this is not exactly related with your PR, as problem also persists on latest 2.0.0-beta.36 but I am hoping you can help me
what currently I am able to debug, is that after execution unsubscribeFromTopic() on at least one of ws server nodes first message/-s on client side is not received, commenting out this line - problem solved, kinda, but what is consequences? and maybe you will better understand why is that happening

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure if I perfectly understand your issue. But it appears that maybe when you have unsubscribed from one node, the same is not propagated across other nodes. This may happen if the client is subscribed or connected using the other node but the unsubscribe happened on the other node. This, from what I can recall should have been cleaned up by the removeConnectionFromSet that is the next then of this block. It may be possible that unsubscribeFromTopic may have produced a failure and will stop processing the promise chain and may not remove the connection from the set.

$this->removeChannelFromSet($connection->app->id, $channelName);
}

$this->decrementSubscriptionsCount($connection->app->id, $channelName)
->then(function ($count) use ($connection, $channelName) {
// If the total connections count gets to 0 after unsubscribe,
// try again to check & unsubscribe from the PubSub topic if needed.
if ($count < 1) {
$this->unsubscribeFromTopic($connection->app->id, $channelName);
}
});
})
->then(function () use ($connection, $channelName) {
return $this->removeChannelFromSet($connection->app->id, $channelName);
})
->then(function () use ($connection) {
return $this->removeConnectionFromSet($connection);
Expand Down Expand Up @@ -363,6 +352,16 @@ public function connectionPonged(ConnectionInterface $connection): PromiseInterf
{
// This will update the score with the current timestamp.
return $this->addConnectionToSet($connection, Carbon::now())
->then(function () use ($connection) {
$payload = [
'socketId' => $connection->socketId,
'appId' => $connection->app->id,
'serverId' => $this->getServerId(),
];

return $this->publishClient
->publish($this->getPongRedisHash($connection->app->id), json_encode($payload));
})
->then(function () use ($connection) {
return parent::connectionPonged($connection);
});
Expand Down Expand Up @@ -404,6 +403,12 @@ public function onMessage(string $redisChannel, string $payload)
return;
}

if ($redisChannel == $this->getPongRedisHash($payload->appId)) {
$connection = $this->fakeConnectionForApp($payload->appId, $payload->socketId);

return parent::connectionPonged($connection);
}

$payload->channel = Str::after($redisChannel, "{$payload->appId}:");

if (! $channel = $this->find($payload->appId, $payload->channel)) {
Expand All @@ -429,6 +434,16 @@ public function onMessage(string $redisChannel, string $payload)
$channel->broadcastLocallyToEveryoneExcept($payload, $socketId, $appId);
}

public function find($appId, string $channel)
{
if (! $channelInstance = parent::find($appId, $channel)) {
$class = $this->getChannelClassName($channel);
$this->channels[$appId][$channel] = new $class($channel);
}

return parent::find($appId, $channel);
}

/**
* Build the Redis connection URL from Laravel database config.
*
Expand Down Expand Up @@ -601,6 +616,20 @@ public function removeChannelFromSet($appId, string $channel): PromiseInterface
);
}

/**
* Check if channel is on the list.
*
* @param string|int $appId
* @param string $channel
* @return PromiseInterface
*/
public function isChannelInSet($appId, string $channel): PromiseInterface
{
return $this->publishClient->sismember(
$this->getChannelsRedisHash($appId), $channel
);
}

/**
* Set data for a topic. Might be used for the presence channels.
*
Expand Down Expand Up @@ -729,6 +758,16 @@ public function getRedisKey($appId = null, string $channel = null, array $suffix
return $hash;
}

/**
* Get the pong Redis hash.
*
* @param string|int $appId
*/
public function getPongRedisHash($appId): string
{
return $this->getRedisKey($appId, null, ['pong']);
}

/**
* Get the statistics Redis hash.
*
Expand Down
15 changes: 14 additions & 1 deletion src/Channels/Channel.php
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,17 @@ public function getConnections()
return $this->connections;
}

/**
* Get connection by socketId.
*
* @param string socketId
* @return ?ConnectionInterface
*/
public function getConnection(string $socketId): ?ConnectionInterface
{
return $this->connections[$socketId] ?? null;
}

/**
* Check if the channel has connections.
*
Expand Down Expand Up @@ -158,6 +169,7 @@ public function broadcast($appId, stdClass $payload, bool $replicate = true): bo
collect($this->getConnections())
->each(function ($connection) use ($payload) {
$connection->send(json_encode($payload));
$this->channelManager->connectionPonged($connection);
});

if ($replicate) {
Expand Down Expand Up @@ -195,12 +207,13 @@ public function broadcastToEveryoneExcept(stdClass $payload, ?string $socketId,
}

if (is_null($socketId)) {
return $this->broadcast($appId, $payload, $replicate);
return $this->broadcast($appId, $payload, false);
}

collect($this->getConnections())->each(function (ConnectionInterface $connection) use ($socketId, $payload) {
if ($connection->socketId !== $socketId) {
$connection->send(json_encode($payload));
$this->channelManager->connectionPonged($connection);
}
});

Expand Down