Skip to content

Commit

Permalink
Merge branch 'master' into fix-can-doc
Browse files Browse the repository at this point in the history
  • Loading branch information
k0ka committed Aug 20, 2024
2 parents 1bfe212 + 45044d7 commit ff400a1
Show file tree
Hide file tree
Showing 4 changed files with 41 additions and 11 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,12 @@ You can find and compare releases at the [GitHub release page](https://github.co

## Unreleased

## v6.43.0

### Added

- Remove expired members from subscription topic stored in Redis set map https://github.com/nuwave/lighthouse/pull/2601

## v6.42.2

### Fixed
Expand Down
2 changes: 1 addition & 1 deletion composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@
"webonyx/graphql-php": "^15"
},
"require-dev": {
"algolia/algoliasearch-client-php": "^3 || ^4",
"algolia/algoliasearch-client-php": "^3",
"bensampo/laravel-enum": "^5 || ^6",
"dms/phpunit-arraysubset-asserts": "^0.4 || ^0.5",
"ergebnis/composer-normalize": "^2.2.2",
Expand Down
35 changes: 27 additions & 8 deletions src/Subscriptions/Storage/RedisStorageManager.php
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@

/**
* Stores subscribers and topics in redis.
*
* - Topics are subscriptions like "userCreated" or "userDeleted".
* - Subscribers are clients that are listening to channels like "private-lighthouse-a7ef3d".
*
Expand Down Expand Up @@ -61,10 +62,13 @@ public function subscribersByTopic(string $topic): Collection
// As explained in storeSubscriber, we use redis sets to store the names of subscribers of a topic.
// We can retrieve all members of a set using the command smembers.
$subscriberIds = $this->connection->command('smembers', [$this->topicKey($topic)]);
if (count($subscriberIds) === 0) {
if ($subscriberIds === []) {
return new Collection();
}

// Store all keys as missing keys to remove the ones which are expired later.
$missingKeys = $subscriberIds;

// Since we store the individual subscribers with a prefix,
// but not in the set, we have to add the prefix here.
$subscriberIds = array_map([$this, 'channelKey'], $subscriberIds);
Expand All @@ -73,30 +77,45 @@ public function subscribersByTopic(string $topic): Collection
// This is like using multiple get calls (getSubscriber uses the get command).
$subscribers = $this->connection->command('mget', [$subscriberIds]);

return (new Collection($subscribers))
$subscribersCollection = (new Collection($subscribers))
->filter()
->map(static function (?string $subscriber): ?Subscriber {
// Some entries may be expired
->map(static function (?string $subscriber) use (&$missingKeys): ?Subscriber {
// Some entries may be expired.
if ($subscriber === null) {
return null;
}

// Other entries may contain invalid values
// Other entries may contain invalid values.
try {
return unserialize($subscriber);
$subscriber = unserialize($subscriber);

// This key exists so remove it from the list of missing keys.
$missingKeys = array_diff($missingKeys, [$subscriber->channel]);

return $subscriber;
} catch (\ErrorException) {
return null;
}
})
->filter();

// Remove expired subscribers from the set of subscribers of this topic.
if ($missingKeys !== []) {
$this->connection->command('srem', [
$this->topicKey($topic),
...$missingKeys,
]);
}

return $subscribersCollection;
}

public function storeSubscriber(Subscriber $subscriber, string $topic): void
{
$subscriber->topic = $topic;

// In contrast to the CacheStorageManager, we use redis sets.
// Instead of reading the entire list, adding the subscriber and storing the list;
// Instead of reading the entire list, adding the subscriber, and storing the list;
// we simply add the name of the subscriber to the set of subscribers of this topic using the sadd command...
$topicKey = $this->topicKey($topic);
$this->connection->command('sadd', [
Expand All @@ -108,7 +127,7 @@ public function storeSubscriber(Subscriber $subscriber, string $topic): void
$this->connection->command('expire', [$topicKey, $this->ttl]);
}

// Lastly, we store the subscriber as a serialized string...
// Lastly, we store the subscriber as a serialized string.
$setCommand = 'set';
$setArguments = [
$this->channelKey($subscriber->channel),
Expand Down
9 changes: 7 additions & 2 deletions tests/Unit/Subscriptions/Storage/RedisStorageManagerTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,8 @@ public function testDeleteSubscriber(): void
))
->willReturnOnConsecutiveCalls(
serialize($subscriber),
true,
true,
);

$manager = new RedisStorageManager($config, $redisFactory);
Expand Down Expand Up @@ -195,18 +197,20 @@ public function testSubscribersByTopic(): void
$subscriber2,
];

$redisConnection->expects($this->exactly(2))
$redisConnection->expects($this->exactly(3))
->method('command')
->with(...$this->withConsecutive(
['smembers', ["graphql.topic.{$topic}"]],
['mget', [[
'graphql.subscriber.foo1',
'graphql.subscriber.foo2',
'graphql.subscriber.foo3',
'graphql.subscriber.foo4',
]]],
['srem', ["graphql.topic.{$topic}", 'foo3', 'foo4']],
))
->willReturnOnConsecutiveCalls(
['foo1', 'foo2', 'foo3'],
['foo1', 'foo2', 'foo3', 'foo4'],
[
serialize($subscriber1),
serialize($subscriber2),
Expand All @@ -217,6 +221,7 @@ public function testSubscribersByTopic(): void
// mget non-existing-entry
false,
],
null,
);

$this->assertEquals(
Expand Down

0 comments on commit ff400a1

Please sign in to comment.