diff --git a/src/ChannelManagers/LocalChannelManager.php b/src/ChannelManagers/LocalChannelManager.php index 913744baea..5144f26ab8 100644 --- a/src/ChannelManagers/LocalChannelManager.php +++ b/src/ChannelManagers/LocalChannelManager.php @@ -272,10 +272,10 @@ public function getLocalConnectionsCount($appId, string $channelName = null): Pr return $channel->getName() === $channelName; }); }) - ->flatMap(function (Channel $channel) { - return collect($channel->getConnections())->pluck('socketId'); - }) - ->unique()->count(); + ->flatMap(function (Channel $channel) { + return collect($channel->getConnections())->pluck('socketId'); + }) + ->unique()->count(); }); } @@ -429,9 +429,7 @@ public function getMemberSockets($userId, $appId, $channelName): PromiseInterfac */ public function connectionPonged(ConnectionInterface $connection): PromiseInterface { - $connection->lastPongedAt = Carbon::now(); - - return $this->updateConnectionInChannels($connection); + return $this->pongConnectionInChannels($connection); } /** @@ -441,23 +439,47 @@ public function connectionPonged(ConnectionInterface $connection): PromiseInterf */ public function removeObsoleteConnections(): PromiseInterface { - if (! $this->lock()->acquire()) { - return Helpers::createFulfilledPromise(false); - } + $lock = $this->lock(); + try { + if (! $lock->acquire()) { + return Helpers::createFulfilledPromise(false); + } - $this->getLocalConnections()->then(function ($connections) { - foreach ($connections as $connection) { - $differenceInSeconds = $connection->lastPongedAt->diffInSeconds(Carbon::now()); + $this->getLocalConnections()->then(function ($connections) { + foreach ($connections as $connection) { + $differenceInSeconds = $connection->lastPongedAt->diffInSeconds(Carbon::now()); - if ($differenceInSeconds > 120) { - $this->unsubscribeFromAllChannels($connection); + if ($differenceInSeconds > 120) { + $this->unsubscribeFromAllChannels($connection); + } } - } - }); + }); - return Helpers::createFulfilledPromise( - $this->lock()->forceRelease() - ); + return Helpers::createFulfilledPromise(true); + } finally { + optional($lock)->forceRelease(); + } + } + + /** + * Pong connection in channels. + * + * @param ConnectionInterface $connection + * @return PromiseInterface[bool] + */ + public function pongConnectionInChannels(ConnectionInterface $connection): PromiseInterface + { + return $this->getLocalChannels($connection->app->id) + ->then(function ($channels) use ($connection) { + foreach ($channels as $channel) { + if ($conn = $channel->getConnection($connection->socketId)) { + $conn->lastPongedAt = Carbon::now(); + $channel->saveConnection($conn); + } + } + + return true; + }); } /** diff --git a/src/ChannelManagers/RedisChannelManager.php b/src/ChannelManagers/RedisChannelManager.php index a3b9ca0259..6c87948730 100644 --- a/src/ChannelManagers/RedisChannelManager.php +++ b/src/ChannelManagers/RedisChannelManager.php @@ -2,7 +2,6 @@ namespace BeyondCode\LaravelWebSockets\ChannelManagers; -use BeyondCode\LaravelWebSockets\Channels\Channel; use BeyondCode\LaravelWebSockets\DashboardLogger; use BeyondCode\LaravelWebSockets\Helpers; use BeyondCode\LaravelWebSockets\Server\MockableConnection; @@ -145,31 +144,18 @@ public function subscribeToChannel(ConnectionInterface $connection, string $chan */ public function unsubscribeFromChannel(ConnectionInterface $connection, string $channelName, stdClass $payload): PromiseInterface { - return $this->getGlobalConnectionsCount($connection->app->id, $channelName) + return parent::unsubscribeFromChannel($connection, $channelName, $payload) + ->then(function () use ($connection, $channelName) { + return $this->decrementSubscriptionsCount($connection->app->id, $channelName); + }) ->then(function ($count) use ($connection, $channelName) { - if ($count === 0) { - // Make sure to not stay subscribed to the PubSub topic - // if there are no connections. + $this->removeConnectionFromSet($connection); + // If the total connections count gets to 0 after unsubscribe, + // try again to check & unsubscribe from the PubSub topic if needed. + if ($count < 1) { + $this->removeChannelFromSet($connection->app->id, $channelName); $this->unsubscribeFromTopic($connection->app->id, $channelName); } - - $this->decrementSubscriptionsCount($connection->app->id, $channelName) - ->then(function ($count) use ($connection, $channelName) { - // If the total connections count gets to 0 after unsubscribe, - // try again to check & unsubscribe from the PubSub topic if needed. - if ($count < 1) { - $this->unsubscribeFromTopic($connection->app->id, $channelName); - } - }); - }) - ->then(function () use ($connection, $channelName) { - return $this->removeChannelFromSet($connection->app->id, $channelName); - }) - ->then(function () use ($connection) { - return $this->removeConnectionFromSet($connection); - }) - ->then(function () use ($connection, $channelName, $payload) { - return parent::unsubscribeFromChannel($connection, $channelName, $payload); }); } @@ -363,6 +349,16 @@ public function connectionPonged(ConnectionInterface $connection): PromiseInterf { // This will update the score with the current timestamp. return $this->addConnectionToSet($connection, Carbon::now()) + ->then(function () use ($connection) { + $payload = [ + 'socketId' => $connection->socketId, + 'appId' => $connection->app->id, + 'serverId' => $this->getServerId(), + ]; + + return $this->publishClient + ->publish($this->getPongRedisHash($connection->app->id), json_encode($payload)); + }) ->then(function () use ($connection) { return parent::connectionPonged($connection); }); @@ -375,18 +371,23 @@ public function connectionPonged(ConnectionInterface $connection): PromiseInterf */ public function removeObsoleteConnections(): PromiseInterface { - $this->lock()->get(function () { - $this->getConnectionsFromSet(0, now()->subMinutes(2)->format('U')) - ->then(function ($connections) { - foreach ($connections as $socketId => $appId) { - $connection = $this->fakeConnectionForApp($appId, $socketId); + $lock = $this->lock(); + try { + $lock->get(function () { + $this->getConnectionsFromSet(0, now()->subMinutes(2)->format('U')) + ->then(function ($connections) { + foreach ($connections as $socketId => $appId) { + $connection = $this->fakeConnectionForApp($appId, $socketId); - $this->unsubscribeFromAllChannels($connection); - } - }); - }); + $this->unsubscribeFromAllChannels($connection); + } + }); + }); - return parent::removeObsoleteConnections(); + return parent::removeObsoleteConnections(); + } finally { + optional($lock)->forceRelease(); + } } /** @@ -404,6 +405,12 @@ public function onMessage(string $redisChannel, string $payload) return; } + if ($redisChannel == $this->getPongRedisHash($payload->appId)) { + $connection = $this->fakeConnectionForApp($payload->appId, $payload->socketId); + + return parent::connectionPonged($connection); + } + $payload->channel = Str::after($redisChannel, "{$payload->appId}:"); if (! $channel = $this->find($payload->appId, $payload->channel)) { @@ -429,6 +436,16 @@ public function onMessage(string $redisChannel, string $payload) $channel->broadcastLocallyToEveryoneExcept($payload, $socketId, $appId); } + public function find($appId, string $channel) + { + if (! $channelInstance = parent::find($appId, $channel)) { + $class = $this->getChannelClassName($channel); + $this->channels[$appId][$channel] = new $class($channel); + } + + return parent::find($appId, $channel); + } + /** * Build the Redis connection URL from Laravel database config. * @@ -601,6 +618,20 @@ public function removeChannelFromSet($appId, string $channel): PromiseInterface ); } + /** + * Check if channel is on the list. + * + * @param string|int $appId + * @param string $channel + * @return PromiseInterface + */ + public function isChannelInSet($appId, string $channel): PromiseInterface + { + return $this->publishClient->sismember( + $this->getChannelsRedisHash($appId), $channel + ); + } + /** * Set data for a topic. Might be used for the presence channels. * @@ -729,6 +760,16 @@ public function getRedisKey($appId = null, string $channel = null, array $suffix return $hash; } + /** + * Get the pong Redis hash. + * + * @param string|int $appId + */ + public function getPongRedisHash($appId): string + { + return $this->getRedisKey($appId, null, ['pong']); + } + /** * Get the statistics Redis hash. * diff --git a/src/Channels/Channel.php b/src/Channels/Channel.php index e022d2a2f5..dbd874fd8e 100644 --- a/src/Channels/Channel.php +++ b/src/Channels/Channel.php @@ -59,6 +59,17 @@ public function getConnections() return $this->connections; } + /** + * Get connection by socketId. + * + * @param string socketId + * @return ?ConnectionInterface + */ + public function getConnection(string $socketId): ?ConnectionInterface + { + return $this->connections[$socketId] ?? null; + } + /** * Check if the channel has connections. * @@ -159,6 +170,7 @@ public function broadcast($appId, stdClass $payload, bool $replicate = true): bo collect($this->getConnections()) ->each(function ($connection) use ($payload) { $connection->send(json_encode($payload)); + $this->channelManager->connectionPonged($connection); }); if ($replicate) { @@ -196,12 +208,13 @@ public function broadcastToEveryoneExcept(stdClass $payload, ?string $socketId, } if (is_null($socketId)) { - return $this->broadcast($appId, $payload, $replicate); + return $this->broadcast($appId, $payload, false); } collect($this->getConnections())->each(function (ConnectionInterface $connection) use ($socketId, $payload) { if ($connection->socketId !== $socketId) { $connection->send(json_encode($payload)); + $this->channelManager->connectionPonged($connection); } });