Skip to content

Commit 06fc7b7

Browse files
committed
fix(ssubscribe): properly resubscribe in case of shard failover in RE. TODO: clean up debug logs
1) When an RE failover happens, there is a disconnect. 2) The affected client reconnects and tries to resubscribe all existing listeners. ISSUE #1: CROSSSLOT error - the client was sending `ssubscribe ch1 ch2 .. chN`, which after the failover could result in CROSSSLOT (naturally, because some slots could now be owned by other shards). FIX: send one ssubscribe command per channel instead of one ssubscribe for all channels. ISSUE #2: MOVED error - some or all of the channels might have moved somewhere else. FIX: 1: propagate the error to the Cluster. 2: the Cluster rediscovers the topology. 3: extract all existing subscriptions from all pubsub clients and resubscribe over the new topology. fixes: redis#2902
1 parent bd11e38 commit 06fc7b7

File tree

5 files changed

+100
-13
lines changed

5 files changed

+100
-13
lines changed

packages/client/lib/client/commands-queue.ts

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -338,6 +338,14 @@ export default class RedisCommandsQueue {
338338
return this.#addPubSubCommand(command);
339339
}
340340

341+
/**
 * Returns the iterator produced by the queue's pub/sub state
 * (`#pubSub.getShardedChannels()`); this method only forwards the call.
 */
getShardedChannels(): IterableIterator<string> {
342+
return this.#pubSub.getShardedChannels();
343+
}
344+
345+
/**
 * Forwards to `#pubSub.removeShardedListeners(channel)` and returns its result.
 *
 * @param channel - Channel name passed through unchanged.
 * @returns The `ChannelListeners` value returned by the pub/sub state.
 */
removeShardedListeners(channel: string): ChannelListeners {
346+
return this.#pubSub.removeShardedListeners(channel);
347+
}
348+
341349
resubscribe(chainId?: symbol) {
342350
const commands = this.#pubSub.resubscribe();
343351
if (!commands.length) return;

packages/client/lib/client/index.ts

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -765,17 +765,17 @@ export default class RedisClient<
765765
}
766766
});
767767
}
768-
768+
769769
if (this.#clientSideCache) {
770770
commands.push({cmd: this.#clientSideCache.trackingOn()});
771771
}
772772

773773
if (this.#options?.emitInvalidate) {
774774
commands.push({cmd: ['CLIENT', 'TRACKING', 'ON']});
775775
}
776-
776+
777777
const maintenanceHandshakeCmd = await EnterpriseMaintenanceManager.getHandshakeCommand(this.#options);
778-
778+
779779
if(maintenanceHandshakeCmd) {
780780
commands.push(maintenanceHandshakeCmd);
781781
};
@@ -818,6 +818,11 @@ export default class RedisClient<
818818
chainId = Symbol('Socket Initiator');
819819

820820
const resubscribePromise = this.#queue.resubscribe(chainId);
821+
resubscribePromise?.catch(error => {
822+
if (error.message && error.message.startsWith('MOVED')) {
823+
this.emit('__MOVED')
824+
}
825+
});
821826
if (resubscribePromise) {
822827
promises.push(resubscribePromise);
823828
}
@@ -1192,6 +1197,14 @@ export default class RedisClient<
11921197

11931198
sUnsubscribe = this.SUNSUBSCRIBE;
11941199

1200+
/**
 * Returns the iterator obtained from this client's command queue
 * (`this._self.#queue.getShardedChannels()`).
 */
getShardedChannels(): IterableIterator<string> {
1201+
return this._self.#queue.getShardedChannels();
1202+
}
1203+
1204+
/**
 * Forwards to the command queue's `removeShardedListeners(channel)` and
 * returns whatever it returns.
 *
 * @param channel - Channel name passed through unchanged.
 * @returns The `ChannelListeners` value returned by the queue.
 */
removeShardedListeners(channel: string): ChannelListeners {
1205+
return this._self.#queue.removeShardedListeners(channel);
1206+
}
1207+
11951208
async WATCH(key: RedisVariadicArgument) {
11961209
const reply = await this._self.sendCommand(
11971210
pushVariadicArguments(['WATCH'], key)

packages/client/lib/client/pub-sub.ts

Lines changed: 37 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -323,25 +323,50 @@ export class PubSub {
323323
}
324324

325325
/**
 * Builds the list of subscribe commands for every listener type that
 * currently has at least one registered listener.
 *
 * Types with no listeners are skipped. For each remaining type the pub/sub
 * state is marked active, and command construction is delegated to
 * `#shardedResubscribe` (sharded type) or `#normalResubscribe` (any other type).
 *
 * @returns The accumulated commands; empty when there are no listeners.
 */
resubscribe() {
326-
const commands = [];
326+
const commands: PubSubCommand[] = [];
327327
for (const [type, listeners] of Object.entries(this.listeners)) {
328328
if (!listeners.size) continue;
329329

330330
this.#isActive = true;
331+
332+
// Sharded channels are handled separately from the other listener types.
if(type === PUBSUB_TYPE.SHARDED) {
333+
this.#shardedResubscribe(commands, listeners);
334+
} else {
335+
this.#normalResubscribe(commands, type, listeners);
336+
}
337+
}
338+
339+
return commands;
340+
}
341+
342+
/**
 * Appends a single subscribe command covering all channels of the given
 * listener type.
 *
 * `#subscribing` is incremented once here and decremented when the command
 * either resolves or rejects (the same callback is used for both).
 *
 * @param commands - Array the new command is pushed onto (mutated in place).
 * @param type - Listener type key; used to look up the subscribe command name in `COMMANDS`.
 * @param listeners - Listeners for that type; its keys become the command arguments
 *   and its size becomes `channelsCounter`.
 */
#normalResubscribe(commands: PubSubCommand[], type: string, listeners: PubSubTypeListeners) {
343+
this.#subscribing++;
344+
const callback = () => this.#subscribing--;
345+
commands.push({
346+
args: [
347+
COMMANDS[type as PubSubType].subscribe,
348+
...listeners.keys()
349+
],
350+
channelsCounter: listeners.size,
351+
resolve: callback,
352+
reject: callback
353+
});
354+
}
355+
356+
/**
 * Appends one sharded subscribe command per channel, each with
 * `channelsCounter` set to 1.
 *
 * One command per channel is used instead of a single multi-channel command:
 * after a shard failover the channels may map to slots owned by different
 * shards, and a combined command could then fail with CROSSSLOT.
 *
 * `#subscribing` is incremented once per channel and decremented when that
 * channel's command resolves or rejects.
 *
 * @param commands - Array the new commands are pushed onto (mutated in place).
 * @param listeners - Sharded listeners; one command is queued per key.
 */
#shardedResubscribe(commands: PubSubCommand[], listeners: PubSubTypeListeners) {
357+
const callback = () => this.#subscribing--;
358+
for(const channel of listeners.keys()) {
331359
this.#subscribing++;
332-
const callback = () => this.#subscribing--;
333360
commands.push({
334361
args: [
335-
COMMANDS[type as PubSubType].subscribe,
336-
...listeners.keys()
362+
COMMANDS[PUBSUB_TYPE.SHARDED].subscribe,
363+
channel
337364
],
338-
channelsCounter: listeners.size,
365+
channelsCounter: 1,
339366
resolve: callback,
340367
reject: callback
341-
} satisfies PubSubCommand);
368+
})
342369
}
343-
344-
return commands;
345370
}
346371

347372
handleMessageReply(reply: Array<Buffer>): boolean {
@@ -379,6 +404,10 @@ export class PubSub {
379404
return listeners;
380405
}
381406

407+
/**
 * Returns an iterator over the keys of the sharded listeners collection
 * (`this.listeners[PUBSUB_TYPE.SHARDED].keys()`).
 */
getShardedChannels(): IterableIterator<string> {
408+
return this.listeners[PUBSUB_TYPE.SHARDED].keys()
409+
}
410+
382411
#emitPubSubMessage(
383412
type: PubSubType,
384413
message: Buffer,

packages/client/lib/cluster/cluster-slots.ts

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -185,6 +185,22 @@ export default class RedisClusterSlots<
185185
async #discover(rootNode: RedisClusterClientOptions) {
186186
this.clientSideCache?.clear();
187187
this.clientSideCache?.disable();
188+
189+
190+
const allChannelListeners = new Map<string, ChannelListeners>();
191+
192+
for (const master of this.masters) {
193+
const shardedClient = master.pubSub?.client;
194+
if (!shardedClient) continue;
195+
for (const channel of shardedClient.getShardedChannels()) {
196+
const listeners = shardedClient.removeShardedListeners(channel);
197+
if(allChannelListeners.get(channel)) {
198+
console.warn(`Found existing listeners, will be overwritten...`);
199+
}
200+
allChannelListeners.set(channel, listeners);
201+
}
202+
}
203+
188204
try {
189205
const addressesInUse = new Set<string>(),
190206
promises: Array<Promise<unknown>> = [],
@@ -224,6 +240,7 @@ export default class RedisClusterSlots<
224240
}
225241
}
226242

243+
//Keep only the nodes that are still in use
227244
for (const [address, node] of this.nodeByAddress.entries()) {
228245
if (addressesInUse.has(address)) continue;
229246

@@ -239,6 +256,9 @@ export default class RedisClusterSlots<
239256
this.nodeByAddress.delete(address);
240257
}
241258

259+
this.#emit('__refreshShardedChannels', allChannelListeners);
260+
261+
242262
await Promise.all(promises);
243263
this.clientSideCache?.enable();
244264

@@ -354,6 +374,9 @@ export default class RedisClusterSlots<
354374
.once('ready', () => emit('node-ready', client))
355375
.once('connect', () => emit('node-connect', client))
356376
.once('end', () => emit('node-disconnect', client));
377+
.on('__MOVED', () => {
378+
this.rediscover(client);
379+
})
357380
}
358381

359382
#createNodeClient(node: ShardNode<M, F, S, RESP, TYPE_MAPPING>, readonly?: boolean) {
@@ -374,7 +397,9 @@ export default class RedisClusterSlots<
374397

375398
/**
 * Starts a rediscovery via `#rediscover(startWith)` unless one is already in
 * flight, in which case the in-flight promise is reused.
 *
 * The stored promise is cleared in `finally`, so a later call starts a new
 * rediscovery whether the previous one succeeded or failed.
 *
 * @param startWith - Client handed to `#rediscover`; ignored when a run is already in flight.
 * @returns The promise of the current (new or already running) rediscovery.
 */
async rediscover(startWith: RedisClientType<M, F, S, RESP>): Promise<void> {
376399
this.#runningRediscoverPromise ??= this.#rediscover(startWith)
377-
.finally(() => this.#runningRediscoverPromise = undefined);
400+
.finally(() => {
401+
this.#runningRediscoverPromise = undefined
402+
});
378403
return this.#runningRediscoverPromise;
379404
}
380405

packages/client/lib/cluster/index.ts

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ import { EventEmitter } from 'node:events';
66
import { attachConfig, functionArgumentsPrefix, getTransformReply, scriptArgumentsPrefix } from '../commander';
77
import RedisClusterSlots, { NodeAddressMap, ShardNode } from './cluster-slots';
88
import RedisClusterMultiCommand, { RedisClusterMultiCommandType } from './multi-command';
9-
import { PubSubListener } from '../client/pub-sub';
9+
import { ChannelListeners, PubSubListener } from '../client/pub-sub';
1010
import { ErrorReply } from '../errors';
1111
import { RedisTcpSocketOptions } from '../client/socket';
1212
import { ClientSideCacheConfig, PooledClientSideCacheProvider } from '../client/cache';
@@ -310,6 +310,7 @@ export default class RedisCluster<
310310

311311
this._options = options;
312312
this._slots = new RedisClusterSlots(options, this.emit.bind(this));
313+
this.on('__refreshShardedChannels', this.refreshShardedChannelsSubscriptions.bind(this));
313314

314315
if (options?.commandOptions) {
315316
this._commandOptions = options.commandOptions;
@@ -584,6 +585,17 @@ export default class RedisCluster<
584585
);
585586
}
586587

588+
/**
 * Re-registers every listener in the given map by calling `sSubscribe` once
 * per listener.
 *
 * Listeners in `buffers` are subscribed with `true` as the third argument;
 * listeners in `strings` are subscribed without it.
 *
 * NOTE(review): the results of the `sSubscribe` calls are discarded. If
 * `sSubscribe` returns a promise, failures would go unhandled here — confirm
 * and decide how errors should be surfaced.
 *
 * @param allChannelListeners - Map from channel name to the listeners to restore.
 */
refreshShardedChannelsSubscriptions(allChannelListeners: Map<string, ChannelListeners>) {
589+
for(const [channel, listeners] of allChannelListeners) {
590+
for(const bufListener of listeners.buffers) {
591+
this.sSubscribe(channel, bufListener, true);
592+
}
593+
for(const strListener of listeners.strings) {
594+
this.sSubscribe(channel, strListener);
595+
}
596+
}
597+
}
598+
587599
sUnsubscribe = this.SUNSUBSCRIBE;
588600

589601
/**

0 commit comments

Comments
 (0)