Skip to content
This repository has been archived by the owner on Feb 7, 2024. It is now read-only.

Replication fix amendment to #778 #881

Merged
merged 5 commits into from
Jan 5, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 42 additions & 20 deletions src/ChannelManagers/LocalChannelManager.php
Original file line number Diff line number Diff line change
Expand Up @@ -272,10 +272,10 @@ public function getLocalConnectionsCount($appId, string $channelName = null): Pr
return $channel->getName() === $channelName;
});
})
->flatMap(function (Channel $channel) {
return collect($channel->getConnections())->pluck('socketId');
})
->unique()->count();
->flatMap(function (Channel $channel) {
return collect($channel->getConnections())->pluck('socketId');
})
->unique()->count();
});
}

Expand Down Expand Up @@ -429,9 +429,7 @@ public function getMemberSockets($userId, $appId, $channelName): PromiseInterfac
*/
public function connectionPonged(ConnectionInterface $connection): PromiseInterface
{
$connection->lastPongedAt = Carbon::now();

return $this->updateConnectionInChannels($connection);
return $this->pongConnectionInChannels($connection);
}

/**
Expand All @@ -441,23 +439,47 @@ public function connectionPonged(ConnectionInterface $connection): PromiseInterf
*/
public function removeObsoleteConnections(): PromiseInterface
{
if (! $this->lock()->acquire()) {
return Helpers::createFulfilledPromise(false);
}
$lock = $this->lock();
try {
if (! $lock->acquire()) {
return Helpers::createFulfilledPromise(false);
}

$this->getLocalConnections()->then(function ($connections) {
foreach ($connections as $connection) {
$differenceInSeconds = $connection->lastPongedAt->diffInSeconds(Carbon::now());
$this->getLocalConnections()->then(function ($connections) {
foreach ($connections as $connection) {
$differenceInSeconds = $connection->lastPongedAt->diffInSeconds(Carbon::now());

if ($differenceInSeconds > 120) {
$this->unsubscribeFromAllChannels($connection);
if ($differenceInSeconds > 120) {
$this->unsubscribeFromAllChannels($connection);
}
}
}
});
});

return Helpers::createFulfilledPromise(
$this->lock()->forceRelease()
);
return Helpers::createFulfilledPromise(true);
} finally {
optional($lock)->forceRelease();
}
}

/**
* Pong connection in channels.
*
* @param ConnectionInterface $connection
* @return PromiseInterface[bool]
*/
public function pongConnectionInChannels(ConnectionInterface $connection): PromiseInterface
{
return $this->getLocalChannels($connection->app->id)
->then(function ($channels) use ($connection) {
foreach ($channels as $channel) {
if ($conn = $channel->getConnection($connection->socketId)) {
$conn->lastPongedAt = Carbon::now();
$channel->saveConnection($conn);
}
}

return true;
});
}

/**
Expand Down
109 changes: 75 additions & 34 deletions src/ChannelManagers/RedisChannelManager.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

namespace BeyondCode\LaravelWebSockets\ChannelManagers;

use BeyondCode\LaravelWebSockets\Channels\Channel;
use BeyondCode\LaravelWebSockets\DashboardLogger;
use BeyondCode\LaravelWebSockets\Helpers;
use BeyondCode\LaravelWebSockets\Server\MockableConnection;
Expand Down Expand Up @@ -145,31 +144,18 @@ public function subscribeToChannel(ConnectionInterface $connection, string $chan
*/
public function unsubscribeFromChannel(ConnectionInterface $connection, string $channelName, stdClass $payload): PromiseInterface
{
return $this->getGlobalConnectionsCount($connection->app->id, $channelName)
return parent::unsubscribeFromChannel($connection, $channelName, $payload)
->then(function () use ($connection, $channelName) {
return $this->decrementSubscriptionsCount($connection->app->id, $channelName);
})
->then(function ($count) use ($connection, $channelName) {
if ($count === 0) {
// Make sure to not stay subscribed to the PubSub topic
// if there are no connections.
$this->removeConnectionFromSet($connection);
// If the total connections count gets to 0 after unsubscribe,
// try again to check & unsubscribe from the PubSub topic if needed.
if ($count < 1) {
$this->removeChannelFromSet($connection->app->id, $channelName);
$this->unsubscribeFromTopic($connection->app->id, $channelName);
}

$this->decrementSubscriptionsCount($connection->app->id, $channelName)
->then(function ($count) use ($connection, $channelName) {
// If the total connections count gets to 0 after unsubscribe,
// try again to check & unsubscribe from the PubSub topic if needed.
if ($count < 1) {
$this->unsubscribeFromTopic($connection->app->id, $channelName);
}
});
})
->then(function () use ($connection, $channelName) {
return $this->removeChannelFromSet($connection->app->id, $channelName);
})
->then(function () use ($connection) {
return $this->removeConnectionFromSet($connection);
})
->then(function () use ($connection, $channelName, $payload) {
return parent::unsubscribeFromChannel($connection, $channelName, $payload);
});
}

Expand Down Expand Up @@ -363,6 +349,16 @@ public function connectionPonged(ConnectionInterface $connection): PromiseInterf
{
// This will update the score with the current timestamp.
return $this->addConnectionToSet($connection, Carbon::now())
->then(function () use ($connection) {
$payload = [
'socketId' => $connection->socketId,
'appId' => $connection->app->id,
'serverId' => $this->getServerId(),
];

return $this->publishClient
->publish($this->getPongRedisHash($connection->app->id), json_encode($payload));
})
->then(function () use ($connection) {
return parent::connectionPonged($connection);
});
Expand All @@ -375,18 +371,23 @@ public function connectionPonged(ConnectionInterface $connection): PromiseInterf
*/
public function removeObsoleteConnections(): PromiseInterface
{
$this->lock()->get(function () {
$this->getConnectionsFromSet(0, now()->subMinutes(2)->format('U'))
->then(function ($connections) {
foreach ($connections as $socketId => $appId) {
$connection = $this->fakeConnectionForApp($appId, $socketId);
$lock = $this->lock();
try {
$lock->get(function () {
$this->getConnectionsFromSet(0, now()->subMinutes(2)->format('U'))
->then(function ($connections) {
foreach ($connections as $socketId => $appId) {
$connection = $this->fakeConnectionForApp($appId, $socketId);

$this->unsubscribeFromAllChannels($connection);
}
});
});
$this->unsubscribeFromAllChannels($connection);
}
});
});

return parent::removeObsoleteConnections();
return parent::removeObsoleteConnections();
} finally {
optional($lock)->forceRelease();
}
}

/**
Expand All @@ -404,6 +405,12 @@ public function onMessage(string $redisChannel, string $payload)
return;
}

if ($redisChannel == $this->getPongRedisHash($payload->appId)) {
$connection = $this->fakeConnectionForApp($payload->appId, $payload->socketId);

return parent::connectionPonged($connection);
}

$payload->channel = Str::after($redisChannel, "{$payload->appId}:");

if (! $channel = $this->find($payload->appId, $payload->channel)) {
Expand All @@ -429,6 +436,16 @@ public function onMessage(string $redisChannel, string $payload)
$channel->broadcastLocallyToEveryoneExcept($payload, $socketId, $appId);
}

public function find($appId, string $channel)
{
if (! $channelInstance = parent::find($appId, $channel)) {
$class = $this->getChannelClassName($channel);
$this->channels[$appId][$channel] = new $class($channel);
}

return parent::find($appId, $channel);
}

/**
* Build the Redis connection URL from Laravel database config.
*
Expand Down Expand Up @@ -577,7 +594,7 @@ public function getConnectionsFromSet(int $start = 0, int $stop = 0, bool $stric
* Add a channel to the set list.
*
* @param string|int $appId
* @param string $channel
* @param string $channel
* @return PromiseInterface
*/
public function addChannelToSet($appId, string $channel): PromiseInterface
Expand All @@ -601,6 +618,20 @@ public function removeChannelFromSet($appId, string $channel): PromiseInterface
);
}

/**
* Check if channel is on the list.
*
* @param string|int $appId
* @param string $channel
* @return PromiseInterface
*/
public function isChannelInSet($appId, string $channel): PromiseInterface
{
return $this->publishClient->sismember(
$this->getChannelsRedisHash($appId), $channel
);
}

/**
* Set data for a topic. Might be used for the presence channels.
*
Expand Down Expand Up @@ -729,6 +760,16 @@ public function getRedisKey($appId = null, string $channel = null, array $suffix
return $hash;
}

/**
* Get the pong Redis hash.
*
* @param string|int $appId
*/
public function getPongRedisHash($appId): string
{
return $this->getRedisKey($appId, null, ['pong']);
}

/**
* Get the statistics Redis hash.
*
Expand Down
17 changes: 16 additions & 1 deletion src/Channels/Channel.php
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,17 @@ public function getConnections()
return $this->connections;
}

/**
* Get connection by socketId.
*
* @param string socketId
* @return ?ConnectionInterface
*/
public function getConnection(string $socketId): ?ConnectionInterface
{
return $this->connections[$socketId] ?? null;
}

/**
* Check if the channel has connections.
*
Expand All @@ -73,6 +84,7 @@ public function hasConnections(): bool
* Add a new connection to the channel.
*
* @see https://pusher.com/docs/pusher_protocol#presence-channel-events
*
* @param \Ratchet\ConnectionInterface $connection
* @param \stdClass $payload
* @return bool
Expand Down Expand Up @@ -158,6 +170,7 @@ public function broadcast($appId, stdClass $payload, bool $replicate = true): bo
collect($this->getConnections())
->each(function ($connection) use ($payload) {
$connection->send(json_encode($payload));
$this->channelManager->connectionPonged($connection);
});

if ($replicate) {
Expand Down Expand Up @@ -195,12 +208,13 @@ public function broadcastToEveryoneExcept(stdClass $payload, ?string $socketId,
}

if (is_null($socketId)) {
return $this->broadcast($appId, $payload, $replicate);
return $this->broadcast($appId, $payload, false);
}

collect($this->getConnections())->each(function (ConnectionInterface $connection) use ($socketId, $payload) {
if ($connection->socketId !== $socketId) {
$connection->send(json_encode($payload));
$this->channelManager->connectionPonged($connection);
}
});

Expand Down Expand Up @@ -228,6 +242,7 @@ public function broadcastLocallyToEveryoneExcept(stdClass $payload, ?string $soc
* @param \Ratchet\ConnectionInterface $connection
* @param \stdClass $payload
* @return void
*
* @throws InvalidSignature
*/
protected function verifySignature(ConnectionInterface $connection, stdClass $payload)
Expand Down