Skip to content
This repository has been archived by the owner on Feb 7, 2024. It is now read-only.

Commit

Permalink
Replication fix amendment to #778 (#881)
Browse files Browse the repository at this point in the history
* fixes on replication

* trying to fix #778 #issuecomment-907319726

* code spacing fixes

* codestyle fixes

* trigger workflow locally
  • Loading branch information
mdprotacio authored Jan 5, 2022
1 parent 491d164 commit 171480d
Show file tree
Hide file tree
Showing 3 changed files with 130 additions and 54 deletions.
62 changes: 42 additions & 20 deletions src/ChannelManagers/LocalChannelManager.php
Original file line number Diff line number Diff line change
Expand Up @@ -272,10 +272,10 @@ public function getLocalConnectionsCount($appId, string $channelName = null): Pr
return $channel->getName() === $channelName;
});
})
->flatMap(function (Channel $channel) {
return collect($channel->getConnections())->pluck('socketId');
})
->unique()->count();
->flatMap(function (Channel $channel) {
return collect($channel->getConnections())->pluck('socketId');
})
->unique()->count();
});
}

Expand Down Expand Up @@ -429,9 +429,7 @@ public function getMemberSockets($userId, $appId, $channelName): PromiseInterfac
*/
public function connectionPonged(ConnectionInterface $connection): PromiseInterface
{
$connection->lastPongedAt = Carbon::now();

return $this->updateConnectionInChannels($connection);
return $this->pongConnectionInChannels($connection);
}

/**
Expand All @@ -441,23 +439,47 @@ public function connectionPonged(ConnectionInterface $connection): PromiseInterf
*/
public function removeObsoleteConnections(): PromiseInterface
{
if (! $this->lock()->acquire()) {
return Helpers::createFulfilledPromise(false);
}
$lock = $this->lock();
try {
if (! $lock->acquire()) {
return Helpers::createFulfilledPromise(false);
}

$this->getLocalConnections()->then(function ($connections) {
foreach ($connections as $connection) {
$differenceInSeconds = $connection->lastPongedAt->diffInSeconds(Carbon::now());
$this->getLocalConnections()->then(function ($connections) {
foreach ($connections as $connection) {
$differenceInSeconds = $connection->lastPongedAt->diffInSeconds(Carbon::now());

if ($differenceInSeconds > 120) {
$this->unsubscribeFromAllChannels($connection);
if ($differenceInSeconds > 120) {
$this->unsubscribeFromAllChannels($connection);
}
}
}
});
});

return Helpers::createFulfilledPromise(
$this->lock()->forceRelease()
);
return Helpers::createFulfilledPromise(true);
} finally {
optional($lock)->forceRelease();
}
}

/**
* Pong connection in channels.
*
* @param ConnectionInterface $connection
* @return PromiseInterface[bool]
*/
public function pongConnectionInChannels(ConnectionInterface $connection): PromiseInterface
{
return $this->getLocalChannels($connection->app->id)
->then(function ($channels) use ($connection) {
foreach ($channels as $channel) {
if ($conn = $channel->getConnection($connection->socketId)) {
$conn->lastPongedAt = Carbon::now();
$channel->saveConnection($conn);
}
}

return true;
});
}

/**
Expand Down
107 changes: 74 additions & 33 deletions src/ChannelManagers/RedisChannelManager.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

namespace BeyondCode\LaravelWebSockets\ChannelManagers;

use BeyondCode\LaravelWebSockets\Channels\Channel;
use BeyondCode\LaravelWebSockets\DashboardLogger;
use BeyondCode\LaravelWebSockets\Helpers;
use BeyondCode\LaravelWebSockets\Server\MockableConnection;
Expand Down Expand Up @@ -145,31 +144,18 @@ public function subscribeToChannel(ConnectionInterface $connection, string $chan
*/
public function unsubscribeFromChannel(ConnectionInterface $connection, string $channelName, stdClass $payload): PromiseInterface
{
return $this->getGlobalConnectionsCount($connection->app->id, $channelName)
return parent::unsubscribeFromChannel($connection, $channelName, $payload)
->then(function () use ($connection, $channelName) {
return $this->decrementSubscriptionsCount($connection->app->id, $channelName);
})
->then(function ($count) use ($connection, $channelName) {
if ($count === 0) {
// Make sure to not stay subscribed to the PubSub topic
// if there are no connections.
$this->removeConnectionFromSet($connection);
// If the total connections count gets to 0 after unsubscribe,
// try again to check & unsubscribe from the PubSub topic if needed.
if ($count < 1) {
$this->removeChannelFromSet($connection->app->id, $channelName);
$this->unsubscribeFromTopic($connection->app->id, $channelName);
}

$this->decrementSubscriptionsCount($connection->app->id, $channelName)
->then(function ($count) use ($connection, $channelName) {
// If the total connections count gets to 0 after unsubscribe,
// try again to check & unsubscribe from the PubSub topic if needed.
if ($count < 1) {
$this->unsubscribeFromTopic($connection->app->id, $channelName);
}
});
})
->then(function () use ($connection, $channelName) {
return $this->removeChannelFromSet($connection->app->id, $channelName);
})
->then(function () use ($connection) {
return $this->removeConnectionFromSet($connection);
})
->then(function () use ($connection, $channelName, $payload) {
return parent::unsubscribeFromChannel($connection, $channelName, $payload);
});
}

Expand Down Expand Up @@ -363,6 +349,16 @@ public function connectionPonged(ConnectionInterface $connection): PromiseInterf
{
// This will update the score with the current timestamp.
return $this->addConnectionToSet($connection, Carbon::now())
->then(function () use ($connection) {
$payload = [
'socketId' => $connection->socketId,
'appId' => $connection->app->id,
'serverId' => $this->getServerId(),
];

return $this->publishClient
->publish($this->getPongRedisHash($connection->app->id), json_encode($payload));
})
->then(function () use ($connection) {
return parent::connectionPonged($connection);
});
Expand All @@ -375,18 +371,23 @@ public function connectionPonged(ConnectionInterface $connection): PromiseInterf
*/
public function removeObsoleteConnections(): PromiseInterface
{
$this->lock()->get(function () {
$this->getConnectionsFromSet(0, now()->subMinutes(2)->format('U'))
->then(function ($connections) {
foreach ($connections as $socketId => $appId) {
$connection = $this->fakeConnectionForApp($appId, $socketId);
$lock = $this->lock();
try {
$lock->get(function () {
$this->getConnectionsFromSet(0, now()->subMinutes(2)->format('U'))
->then(function ($connections) {
foreach ($connections as $socketId => $appId) {
$connection = $this->fakeConnectionForApp($appId, $socketId);

$this->unsubscribeFromAllChannels($connection);
}
});
});
$this->unsubscribeFromAllChannels($connection);
}
});
});

return parent::removeObsoleteConnections();
return parent::removeObsoleteConnections();
} finally {
optional($lock)->forceRelease();
}
}

/**
Expand All @@ -404,6 +405,12 @@ public function onMessage(string $redisChannel, string $payload)
return;
}

if ($redisChannel == $this->getPongRedisHash($payload->appId)) {
$connection = $this->fakeConnectionForApp($payload->appId, $payload->socketId);

return parent::connectionPonged($connection);
}

$payload->channel = Str::after($redisChannel, "{$payload->appId}:");

if (! $channel = $this->find($payload->appId, $payload->channel)) {
Expand All @@ -429,6 +436,16 @@ public function onMessage(string $redisChannel, string $payload)
$channel->broadcastLocallyToEveryoneExcept($payload, $socketId, $appId);
}

public function find($appId, string $channel)
{
if (! $channelInstance = parent::find($appId, $channel)) {
$class = $this->getChannelClassName($channel);
$this->channels[$appId][$channel] = new $class($channel);
}

return parent::find($appId, $channel);
}

/**
* Build the Redis connection URL from Laravel database config.
*
Expand Down Expand Up @@ -601,6 +618,20 @@ public function removeChannelFromSet($appId, string $channel): PromiseInterface
);
}

/**
* Check if channel is on the list.
*
* @param string|int $appId
* @param string $channel
* @return PromiseInterface
*/
public function isChannelInSet($appId, string $channel): PromiseInterface
{
return $this->publishClient->sismember(
$this->getChannelsRedisHash($appId), $channel
);
}

/**
* Set data for a topic. Might be used for the presence channels.
*
Expand Down Expand Up @@ -729,6 +760,16 @@ public function getRedisKey($appId = null, string $channel = null, array $suffix
return $hash;
}

/**
* Get the pong Redis hash.
*
* @param string|int $appId
*/
public function getPongRedisHash($appId): string
{
return $this->getRedisKey($appId, null, ['pong']);
}

/**
* Get the statistics Redis hash.
*
Expand Down
15 changes: 14 additions & 1 deletion src/Channels/Channel.php
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,17 @@ public function getConnections()
return $this->connections;
}

/**
* Get connection by socketId.
*
* @param string socketId
* @return ?ConnectionInterface
*/
public function getConnection(string $socketId): ?ConnectionInterface
{
return $this->connections[$socketId] ?? null;
}

/**
* Check if the channel has connections.
*
Expand Down Expand Up @@ -159,6 +170,7 @@ public function broadcast($appId, stdClass $payload, bool $replicate = true): bo
collect($this->getConnections())
->each(function ($connection) use ($payload) {
$connection->send(json_encode($payload));
$this->channelManager->connectionPonged($connection);
});

if ($replicate) {
Expand Down Expand Up @@ -196,12 +208,13 @@ public function broadcastToEveryoneExcept(stdClass $payload, ?string $socketId,
}

if (is_null($socketId)) {
return $this->broadcast($appId, $payload, $replicate);
return $this->broadcast($appId, $payload, false);
}

collect($this->getConnections())->each(function (ConnectionInterface $connection) use ($socketId, $payload) {
if ($connection->socketId !== $socketId) {
$connection->send(json_encode($payload));
$this->channelManager->connectionPonged($connection);
}
});

Expand Down

0 comments on commit 171480d

Please sign in to comment.