Skip to content

Commit 42f1ee1

Browse files
authored
fix: fixed autopipeline performances. (#1226)
1 parent 71f2994 commit 42f1ee1

File tree

8 files changed

+90
-33
lines changed

8 files changed

+90
-33
lines changed

Diff for: .gitignore

+2-1
Original file line numberDiff line numberDiff line change
@@ -5,4 +5,5 @@ node_modules
55
built
66

77
.vscode
8-
benchmarks/fixtures/*.txt
8+
benchmarks/fixtures/*.txt
9+

Diff for: lib/autoPipelining.ts

+2-14
Original file line numberDiff line numberDiff line change
@@ -18,19 +18,6 @@ export const notAllowedAutoPipelineCommands = [
1818
"unpsubscribe",
1919
];
2020

21-
function findAutoPipeline(
22-
client,
23-
_commandName,
24-
...args: Array<string>
25-
): string {
26-
if (!client.isCluster) {
27-
return "main";
28-
}
29-
30-
// We have slot information, we can improve routing by grouping slots served by the same subset of nodes
31-
return client.slots[calculateSlot(args[0])].join(",");
32-
}
33-
3421
function executeAutoPipeline(client, slotKey: string) {
3522
/*
3623
If a pipeline is already executing, keep queueing up commands
@@ -116,7 +103,8 @@ export function executeWithAutoPipelining(
116103
});
117104
}
118105

119-
const slotKey = findAutoPipeline(client, commandName, ...args);
106+
// If we have slot information, we can improve routing by grouping slots served by the same subset of nodes
107+
const slotKey = client.isCluster ? client.slots[calculateSlot(args[0])].join(",") : 'main';
120108

121109
if (!client._autoPipelines.has(slotKey)) {
122110
const pipeline = client.pipeline();

Diff for: lib/cluster/index.ts

+24
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,8 @@ class Cluster extends EventEmitter {
6969
private isRefreshing = false;
7070
public isCluster = true;
7171
private _autoPipelines: Map<string, typeof Pipeline> = new Map();
72+
private _groupsIds: {[key: string]: number} = {};
73+
private _groupsBySlot: number[] = Array(16384);
7274
private _runningAutoPipelines: Set<string> = new Set();
7375
private _readyDelayedCallbacks: CallbackFunction[] = [];
7476
public _addedScriptHashes: { [key: string]: any } = {};
@@ -188,7 +190,10 @@ class Cluster extends EventEmitter {
188190
return;
189191
}
190192

193+
// Make sure only one timer is active at a time
191194
clearInterval(this._addedScriptHashesCleanInterval);
195+
196+
// Start the script cache cleaning
192197
this._addedScriptHashesCleanInterval = setInterval(() => {
193198
this._addedScriptHashes = {};
194199
}, this.options.maxScriptsCachingTime);
@@ -627,6 +632,7 @@ class Cluster extends EventEmitter {
627632
} else {
628633
_this.slots[slot] = [key];
629634
}
635+
_this._groupsBySlot[slot] = _this._groupsIds[_this.slots[slot].join(';')];
630636
_this.connectionPool.findOrCreate(_this.natMapper(key));
631637
tryConnection();
632638
debug("refreshing slot caches... (triggered by MOVED error)");
@@ -860,6 +866,24 @@ class Cluster extends EventEmitter {
860866
}
861867
}
862868

869+
// Assign to each node keys a numeric value to make autopipeline comparison faster.
870+
this._groupsIds = Object.create(null);
871+
let j = 0;
872+
for (let i = 0; i < 16384; i++) {
873+
const target = (this.slots[i] || []).join(';');
874+
875+
if (!target.length) {
876+
this._groupsBySlot[i] = undefined;
877+
continue;
878+
}
879+
880+
if (!this._groupsIds[target]) {
881+
this._groupsIds[target] = ++j;
882+
}
883+
884+
this._groupsBySlot[i] = this._groupsIds[target];
885+
}
886+
863887
this.connectionPool.reset(nodes);
864888
callback();
865889
}, this.options.slotsRefreshTimeout)

Diff for: lib/pipeline.ts

+4-5
Original file line numberDiff line numberDiff line change
@@ -15,12 +15,10 @@ import Commander from "./commander";
1515
*/
1616
function generateMultiWithNodes(redis, keys) {
1717
const slot = calculateSlot(keys[0]);
18-
const target = redis.slots[slot].join(",");
19-
18+
const target = redis._groupsBySlot[slot];
19+
2020
for (let i = 1; i < keys.length; i++) {
21-
const currentTarget = redis.slots[calculateSlot(keys[i])].join(",");
22-
23-
if (currentTarget !== target) {
21+
if (redis._groupsBySlot[calculateSlot(keys[i])] !== target) {
2422
return -1;
2523
}
2624
}
@@ -158,6 +156,7 @@ Pipeline.prototype.fillResult = function (value, position) {
158156
moved: function (slot, key) {
159157
_this.preferKey = key;
160158
_this.redis.slots[errv[1]] = [key];
159+
_this.redis._groupsBySlot[errv[1]] = _this.redis._groupsIds[_this.redis.slots[errv[1]].join(";")];
161160
_this.redis.refreshSlotsCache();
162161
_this.exec();
163162
},

Diff for: lib/redis/index.ts

+4
Original file line numberDiff line numberDiff line change
@@ -304,7 +304,11 @@ Redis.prototype.connect = function (callback) {
304304
reject(new Error("Redis is already connecting/connected"));
305305
return;
306306
}
307+
308+
// Make sure only one timer is active at a time
307309
clearInterval(this._addedScriptHashesCleanInterval);
310+
311+
// Start the script cache cleaning
308312
this._addedScriptHashesCleanInterval = setInterval(() => {
309313
this._addedScriptHashes = {};
310314
}, this.options.maxScriptsCachingTime);

Diff for: test/functional/cluster/autopipelining.ts

+19-13
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,11 @@ use(require("chai-as-promised"));
1111
Instead foo1 and foo2 are usually served by different nodes in a 3-nodes cluster.
1212
*/
1313
describe("autoPipelining for cluster", function () {
14+
function changeSlot(cluster, from, to) {
15+
cluster.slots[from] = cluster.slots[to];
16+
cluster._groupsBySlot[from] = cluster._groupsBySlot[to];
17+
}
18+
1419
beforeEach(() => {
1520
const slotTable = [
1621
[0, 5000, ["127.0.0.1", 30001]],
@@ -402,11 +407,12 @@ describe("autoPipelining for cluster", function () {
402407
const promise4 = cluster.set("foo6", "bar");
403408

404409
// Override slots to induce a failure
405-
const key1Slot = calculateKeySlot("foo1");
406-
const key2Slot = calculateKeySlot("foo2");
407-
const key5Slot = calculateKeySlot("foo5");
408-
cluster.slots[key1Slot] = cluster.slots[key2Slot];
409-
cluster.slots[key2Slot] = cluster.slots[key5Slot];
410+
const key1Slot = calculateKeySlot('foo1');
411+
const key2Slot = calculateKeySlot('foo2');
412+
const key5Slot = calculateKeySlot('foo5');
413+
414+
changeSlot(cluster, key1Slot, key2Slot);
415+
changeSlot(cluster, key2Slot, key5Slot);
410416

411417
await expect(promise1).to.eventually.be.rejectedWith(
412418
"All keys in the pipeline should belong to the same slots allocation group"
@@ -492,11 +498,11 @@ describe("autoPipelining for cluster", function () {
492498
expect(cluster.autoPipelineQueueSize).to.eql(4);
493499

494500
// Override slots to induce a failure
495-
const key1Slot = calculateKeySlot("foo1");
496-
const key2Slot = calculateKeySlot("foo2");
497-
const key5Slot = calculateKeySlot("foo5");
498-
cluster.slots[key1Slot] = cluster.slots[key2Slot];
499-
cluster.slots[key2Slot] = cluster.slots[key5Slot];
501+
const key1Slot = calculateKeySlot('foo1');
502+
const key2Slot = calculateKeySlot('foo2');
503+
const key5Slot = calculateKeySlot('foo5');
504+
changeSlot(cluster, key1Slot, key2Slot);
505+
changeSlot(cluster, key2Slot, key5Slot);
500506
});
501507
});
502508

@@ -541,9 +547,9 @@ describe("autoPipelining for cluster", function () {
541547

542548
expect(cluster.autoPipelineQueueSize).to.eql(3);
543549

544-
const key1Slot = calculateKeySlot("foo1");
545-
const key2Slot = calculateKeySlot("foo2");
546-
cluster.slots[key1Slot] = cluster.slots[key2Slot];
550+
const key1Slot = calculateKeySlot('foo1');
551+
const key2Slot = calculateKeySlot('foo2');
552+
changeSlot(cluster, key1Slot, key2Slot);
547553
});
548554
});
549555
});

Diff for: test/functional/cluster/connect.ts

+18
Original file line numberDiff line numberDiff line change
@@ -438,4 +438,22 @@ describe("cluster:disconnect", function () {
438438
done();
439439
});
440440
});
441+
442+
it("should clear the added script hashes interval even when no connection succeeded", function (done) {
443+
const cluster = new Cluster([{ host: "127.0.0.1", port: "0" }], {
444+
enableReadyCheck: false,
445+
});
446+
447+
let attempt = 0;
448+
cluster.on("error", function () {
449+
if(attempt < 5) {
450+
attempt ++;
451+
return
452+
}
453+
cluster.quit();
454+
455+
expect(cluster._addedScriptHashesCleanInterval).to.be.null;
456+
done();
457+
});
458+
});
441459
});

Diff for: test/functional/connection.ts

+17
Original file line numberDiff line numberDiff line change
@@ -549,4 +549,21 @@ describe("disconnection", function () {
549549
}
550550
});
551551
});
552+
553+
it("should clear the added script hashes interval even when no connection succeeded", function (done) {
554+
let attempt = 0;
555+
const redis = new Redis(0, 'localhost');
556+
557+
redis.on("error", function () {
558+
if(attempt < 5) {
559+
attempt ++;
560+
return
561+
}
562+
563+
redis.quit();
564+
565+
expect(redis._addedScriptHashesCleanInterval).to.be.null;
566+
done();
567+
});
568+
});
552569
});

0 commit comments

Comments
 (0)