feat: Sharded pub/sub support via dedicated subscribers #1956


Merged
merged 23 commits into from Mar 5, 2025

Changes from 8 commits

Commits (23)
4111c5d
Added a test case for sharded pubsub.
dmaier-redislabs Feb 5, 2025
0998570
Trying to find a way to add sharded PubSub by avoiding big refactorings
dmaier-redislabs Feb 7, 2025
014832f
Added the selection of the right sharded subscriber and fixed the tes…
dmaier-redislabs Feb 13, 2025
edbf4bc
Refactored the code to live in ClusterSubscriberGroup
dmaier-redislabs Feb 17, 2025
18b764b
Moved the test case to a dedicated.
dmaier-redislabs Feb 20, 2025
d73c783
Trying to find out why this test succeeds locally, but fails in the t…
dmaier-redislabs Feb 20, 2025
b9d8c52
Trying to find out why this test succeeds locally, but fails in the t…
dmaier-redislabs Feb 20, 2025
8d119a0
Moved the sharded pubsub test to the integration test folder
dmaier-redislabs Feb 20, 2025
8ecfdc3
Made sharded subscribers optional.
dmaier-redislabs Feb 21, 2025
c10b463
Fixed potentially leaking connections when calling disconnect on the …
dmaier-redislabs Feb 24, 2025
e5b0d62
Comparing the cluster map of the subscriber group with the one of the…
dmaier-redislabs Feb 24, 2025
4c2f8e2
Resubscribe shard channels (#1957)
tishun Feb 26, 2025
a40fa21
Merging from the feature branch back into my personal one (#1960)
dmaier-redislabs Feb 28, 2025
2a82596
Merge branch 'main' into dmaier-sharded-pubsub
dmaier-redislabs Feb 28, 2025
102ecd1
Merge branch 'main' into dmaier-sharded-pubsub
dmaier-redislabs Feb 28, 2025
bf8237a
Merge branch 'main' into dmaier-sharded-pubsub
dmaier-redislabs Feb 28, 2025
65954d7
Added support for sunsubscribe to the sharded subscriber group
dmaier-redislabs Mar 5, 2025
91f5e7c
Update test.yml
dmaier-redislabs Mar 5, 2025
2172acb
Update test_with_cov.yml
dmaier-redislabs Mar 5, 2025
7782385
Update release.yml
dmaier-redislabs Mar 5, 2025
d08080f
Update README.md
dmaier-redislabs Mar 5, 2025
8637e20
Update README.md
dmaier-redislabs Mar 5, 2025
00a0940
Update README.md
dmaier-redislabs Mar 5, 2025
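
For context: this PR wires Redis 7 sharded Pub/Sub (SSUBSCRIBE/SPUBLISH, smessage events) into the Cluster client by giving each master shard a dedicated subscriber. A rough usage sketch, assuming the command and event names introduced in the diff below; hosts and ports are placeholders:

import { Cluster } from "ioredis";

// Placeholder endpoints for a local test cluster.
const subscriberClient = new Cluster([{ host: "127.0.0.1", port: 30000 }]);
const publisherClient = new Cluster([{ host: "127.0.0.1", port: 30000 }]);

// Sharded messages surface through the "smessage" event.
subscriberClient.on("smessage", (channel, message) => {
  console.log(`${channel}: ${message}`);
});

// SSUBSCRIBE is routed to the master that owns the channel's hash slot;
// SPUBLISH is likewise answered only by that shard.
subscriberClient.ssubscribe("channel:test:0").then(() => {
  return publisherClient.spublish("channel:test:0", "hello");
});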
39 changes: 35 additions & 4 deletions lib/cluster/ClusterSubscriber.ts
@@ -8,12 +8,18 @@ const debug = Debug("cluster:subscriber");

export default class ClusterSubscriber {
private started = false;

// There is only one connection for the entire pool
private subscriber: any = null;
private lastActiveSubscriber: any;

// The slot range for which this subscriber is responsible
private slotRange: number[] = [];

constructor(
private connectionPool: ConnectionPool,
private emitter: EventEmitter
private emitter: EventEmitter,
private isSharded: boolean = false
) {
// If the current node we're using as the subscriber disappears
// from the node pool for some reason, we will select a new one
@@ -47,6 +53,22 @@ export default class ClusterSubscriber {
return this.subscriber;
}

/**
* Associates this subscriber with a specific slot range.
*
* Returns the range, or an empty array if the slot range couldn't be
* associated (i.e. this subscriber isn't sharded).
*
* This is mainly useful for debugging and testing purposes.
*
* @param range
*/
associateSlotRange(range: number[]): number[] {
if (this.isSharded) {
this.slotRange = range;
}
return this.slotRange;
}

start(): void {
this.started = true;
this.selectSubscriber();
@@ -59,7 +81,6 @@
this.subscriber.disconnect();
this.subscriber = null;
}
debug("stopped");
}

private onSubscriberEnd = () => {
@@ -179,17 +200,27 @@
for (const event of [
"message",
"messageBuffer",
"smessage",
"smessageBuffer",
]) {
this.subscriber.on(event, (arg1, arg2) => {
this.emitter.emit(event, arg1, arg2);
});
}

for (const event of ["pmessage", "pmessageBuffer"]) {
this.subscriber.on(event, (arg1, arg2, arg3) => {
this.emitter.emit(event, arg1, arg2, arg3);
});
}

if (this.isSharded) {
for (const event of ["smessage", "smessageBuffer"]) {
this.subscriber.on(event, (arg1, arg2) => {
this.emitter.emit(event, arg1, arg2);
});
}
}
}
}
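
To make the isSharded flag concrete: only a sharded subscriber accepts a slot-range association and forwards the smessage/smessageBuffer events. A minimal sketch under those assumptions (the pool options are stand-ins, not values from this PR):

import { EventEmitter } from "events";
import ConnectionPool from "./ConnectionPool";
import ClusterSubscriber from "./ClusterSubscriber";

const emitter = new EventEmitter();
const pool = new ConnectionPool({}); // stand-in redis options

// A sharded subscriber records the slot range it is responsible for...
const sharded = new ClusterSubscriber(pool, emitter, true);
console.assert(sharded.associateSlotRange([0, 1, 2]).length === 3);

// ...while a non-sharded one ignores the association and returns [].
const plain = new ClusterSubscriber(pool, emitter);
console.assert(plain.associateSlotRange([0, 1, 2]).length === 0);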
138 changes: 138 additions & 0 deletions lib/cluster/ClusterSubscriberGroup.ts
@@ -0,0 +1,138 @@
import { Debug } from "../utils";
import ClusterSubscriber from "./ClusterSubscriber";
import Cluster from "./index";
import ConnectionPool from "./ConnectionPool";
import { getNodeKey } from "./util";

const debug = Debug("cluster:subscriberGroup");


/**
* Redis distinguishes between "normal" and sharded Pub/Sub. When the "normal" Pub/Sub feature is used,
* exactly one ClusterSubscriber exists per cluster instance. This works because the Redis cluster bus
* forwards messages between shards. However, this has scalability limitations, which is why the sharded
* Pub/Sub feature was added to Redis. With sharded Pub/Sub, each shard is responsible for its own
* messages. Given that, we need at least one ClusterSubscriber per master endpoint/node.
*
* This class leverages the previously existing ClusterSubscriber by adding support for multiple such
* subscribers in alignment with the master nodes of the cluster. The ClusterSubscriber class was
* extended in a non-breaking way to support this feature.
*/
export default class ClusterSubscriberGroup {

private shardedSubscribers: Map<string, ClusterSubscriber> = new Map();
private clusterSlots: string[][] = [];
// Simple [min, max] slot ranges aren't enough because single slots can be migrated
private subscriberToSlotsIndex: Map<string, number[]> = new Map();

/**
* Register callbacks
*
* @param cluster
*/
constructor(private cluster: Cluster) {

cluster.on("+node", (redis) => {
this.addSubscriber(redis);
});

cluster.on("-node", (redis) => {
this.removeSubscriber(redis);
});

cluster.on("refresh", () => {
this.refreshSlots(cluster);
});
}


/**
* Add a subscriber to the group of subscribers
*
* @param redis
*/
addSubscriber(redis: any): ClusterSubscriber {
const pool: ConnectionPool = new ConnectionPool(redis.options);

if (pool.addMasterNode(redis)) {
const sub = new ClusterSubscriber(pool, this.cluster, true);
const nodeKey = getNodeKey(redis.options);
this.shardedSubscribers.set(nodeKey, sub);
sub.start();
this.cluster.emit("+subscriber");
return sub;
}

return null;
}


/**
* Removes a subscriber from the group
* @param options
*/
removeSubscriber(options: any): Map<string, ClusterSubscriber> {

const nodeKey = getNodeKey(options);
const sub = this.shardedSubscribers.get(nodeKey);

if (sub) {
sub.stop();
this.shardedSubscribers.delete(nodeKey);
this.cluster.emit("-subscriber");
}

return this.shardedSubscribers;
}

/**
* Get the responsible subscriber.
*
* Returns undefined if no subscriber was found.
*
* @param slot
*/
getResponsibleSubscriber(slot: number): ClusterSubscriber {
const nodeKey = this.clusterSlots[slot][0];
return this.shardedSubscribers.get(nodeKey);
}

/**
* Refreshes the subscriber-related slot ranges
*
* Returns false if no refresh was needed
*
* @param cluster
*/
refreshSlots(cluster: Cluster): boolean {
// If there was an actual change, then reassign the slot ranges
if (this.clusterSlots !== cluster.slots) {

debug("Refreshing the slots of the subscriber group.");
// Rebuild the slots index
this.subscriberToSlotsIndex = new Map();

for (let slot = 0; slot < cluster.slots.length; slot++) {
const node: string = cluster.slots[slot][0];

if (!this.subscriberToSlotsIndex.has(node)) {
this.subscriberToSlotsIndex.set(node, []);
}
this.subscriberToSlotsIndex.get(node).push(Number(slot));
}

// Update the subscribers from the index
this.shardedSubscribers.forEach((s: ClusterSubscriber, nodeKey: string) => {
s.associateSlotRange(this.subscriberToSlotsIndex.get(nodeKey));
});

this.clusterSlots = cluster.slots;
this.cluster.emit("subscribersReady")
return true;
}

return false;
}


}
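
The lookup in getResponsibleSubscriber reuses the slot math the rest of the cluster client relies on: a channel name is hashed to one of 16384 slots, and clusterSlots[slot][0] is the node key ("host:port") of the owning master, which doubles as the subscriber's key in shardedSubscribers. A sketch of that path, using the cluster-key-slot package that ioredis already depends on:

const calculateSlot = require("cluster-key-slot");

// A sharded channel is hashed like any key: CRC16(channel) mod 16384.
const slot: number = calculateSlot("channel:test:0");

// clusterSlots[slot][0] names the owning master; the responsible sharded
// subscriber is registered under the same node key.
// const sub = subscriberGroup.getResponsibleSubscriber(slot);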
68 changes: 50 additions & 18 deletions lib/cluster/ConnectionPool.ts
@@ -36,6 +36,55 @@ export default class ConnectionPool extends EventEmitter {
return this.nodes[role][sampleKey];
}


/**
* Add a master node to the pool
* @param node
*/
addMasterNode(node: any) {
const key = getNodeKey(node.options);
const redis: Redis = this.createRedisFromOptions(node, node.options.readOnly);

// Master nodes aren't read-only
if (!node.options.readOnly) {
this.nodes.all[key] = redis;
this.nodes.master[key] = redis;
return true;
}

return false;
}


/**
* Creates a Redis connection instance from the node options
* @param node
* @param readOnly
*/
createRedisFromOptions(node: RedisOptions, readOnly: boolean) {
const redis = new Redis(
defaults(
{
// Never try to reconnect when a node is lost;
// instead, wait for a `MOVED` error and
// fetch the slots again.
retryStrategy: null,
// Offline queue should be enabled so that
// we don't need to wait for the `ready` event
// before sending commands to the node.
enableOfflineQueue: true,
readOnly: readOnly,
},
node,
this.redisOptions,
{ lazyConnect: true }
)
);

return redis;
}


/**
* Find or create a connection to the node
*/
@@ -66,24 +115,7 @@
}
} else {
debug("Connecting to %s as %s", key, readOnly ? "slave" : "master");
redis = new Redis(
defaults(
{
// Never try to reconnect when a node is lose,
// instead, waiting for a `MOVED` error and
// fetch the slots again.
retryStrategy: null,
// Offline queue should be enabled so that
// we don't need to wait for the `ready` event
// before sending commands to the node.
enableOfflineQueue: true,
readOnly: readOnly,
},
node,
this.redisOptions,
{ lazyConnect: true }
)
);
redis = this.createRedisFromOptions(node, readOnly);
this.nodes.all[key] = redis;
this.nodes[readOnly ? "slave" : "master"][key] = redis;

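One detail worth noting in createRedisFromOptions: lodash's defaults() keeps the first defined value per key, so the hard-coded overrides win, then the node's own options, then the pool-wide redisOptions, and lazyConnect is only a last-resort fallback. A small illustration with made-up option values:

import { defaults } from "lodash";

// defaults() only fills keys that are still undefined, left to right.
const merged = defaults(
  { retryStrategy: null, enableOfflineQueue: true, readOnly: false },
  { host: "127.0.0.1", port: 30000 }, // node options (placeholders)
  { password: "secret" },             // pool-wide redisOptions (assumed)
  { lazyConnect: true }
);
// merged keeps retryStrategy: null (null counts as defined, so it is not
// overridden) and picks up lazyConnect: true because nothing earlier set it.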
15 changes: 13 additions & 2 deletions lib/cluster/index.ts
@@ -26,7 +26,7 @@ import ClusterSubscriber from "./ClusterSubscriber";
import ConnectionPool from "./ConnectionPool";
import DelayQueue from "./DelayQueue";
import {
getConnectionName,
getConnectionName, getNodeKey,
getUniqueHostnamesFromOptions,
groupSrvRecords,
NodeKey,
@@ -37,6 +37,7 @@ import {
weightSrvRecords,
} from "./util";
import Deque = require("denque");
import ClusterSubscriberGroup from "./ClusterSubscriberGroup";

const debug = Debug("cluster");

@@ -96,6 +97,7 @@ class Cluster extends Commander {
private delayQueue: DelayQueue = new DelayQueue();
private offlineQueue = new Deque<OfflineQueueItem>();
private subscriber: ClusterSubscriber;
private shardedSubscribers: ClusterSubscriberGroup;
private slotsTimer: NodeJS.Timer;
private reconnectTimeout: NodeJS.Timer;
private isRefreshing = false;
@@ -120,6 +122,7 @@
EventEmitter.call(this);

this.startupNodes = startupNodes;
this.shardedSubscribers = new ClusterSubscriberGroup(this);
this.options = defaults({}, options, DEFAULT_CLUSTER_OPTIONS, this.options);

if (
@@ -547,7 +550,15 @@
Command.checkFlag("ENTER_SUBSCRIBER_MODE", command.name) ||
Command.checkFlag("EXIT_SUBSCRIBER_MODE", command.name)
) {
redis = _this.subscriber.getInstance();
if (command.name == "ssubscribe") {
const sub: ClusterSubscriber = _this.shardedSubscribers.getResponsibleSubscriber(targetSlot);
redis = sub.getInstance();
debug("Subscribing for channel " + command.getKeys() + " on node " + redis.options.port + ".")
}
else {
redis = _this.subscriber.getInstance();
}

if (!redis) {
command.reject(new AbortError("No subscriber for the cluster"));
return;
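The routing above matters because a shard only serves sharded Pub/Sub traffic for the slots it owns: sending SSUBSCRIBE through the single cluster-wide subscriber would draw MOVED redirections for most channels. Condensed, the dispatch rule is (names as in this PR):

// Sharded subscribes go to the dedicated subscriber owning the target slot;
// every other subscriber-mode command keeps using the cluster-wide one.
const instance =
  command.name === "ssubscribe"
    ? _this.shardedSubscribers.getResponsibleSubscriber(targetSlot)?.getInstance()
    : _this.subscriber.getInstance();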
22 changes: 22 additions & 0 deletions test/cluster/basic.ts
@@ -1,6 +1,7 @@
import { expect } from "chai";
import Redis, { Cluster } from "../../lib";


const masters = [30000, 30001, 30002];
const replicas = [30003, 30004, 30005];

@@ -148,4 +149,25 @@
expect(await cluster2.get("prefix:foo")).to.eql("bar");
});
});


describe("Test if the client performs the hash-based sharding for simple set operations", () => {
it("Works when you don't get MOVED error responses", async () => {

// Verify that the cluster is configured with 3 master nodes
const cluster : Cluster = new Cluster([{ host: "127.0.0.1", port: masters[0] }]);
cluster.on("ready", () => {
expect(cluster.nodes("master").length).to.eql(3);
});

const keys = ["channel:test:3", "channel:test:2", "channel:test:0"]
for (const k of keys) {
let status: string = await cluster.set(k, "Test status per node");
expect(status).to.eql("OK");
let value: string = await cluster.get(k);
expect(value).to.eql("Test status per node");
}
})
});

});
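
The sharded Pub/Sub round-trip itself lives in the integration test folder (commit 8d119a0). For reference, a minimal sketch of such a test, assuming the ssubscribe/spublish commands and the smessage event added by this PR:

describe("sharded pub/sub", () => {
  it("delivers a message published to the owning shard", (done) => {
    const sub = new Cluster([{ host: "127.0.0.1", port: masters[0] }]);
    const pub = new Cluster([{ host: "127.0.0.1", port: masters[0] }]);

    sub.on("smessage", (channel, message) => {
      expect(channel).to.eql("channel:test:0");
      expect(message).to.eql("hello");
      done();
    });

    // Publish only once the slot owner's subscriber is in place.
    sub.ssubscribe("channel:test:0").then(() => pub.spublish("channel:test:0", "hello"));
  });
});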