Skip to content

Commit

Permalink
Move prepareClientToWrite out of loop for HGETALL command
Browse files Browse the repository at this point in the history
Similar to valkey-io#860 but this is for HGETALL families (HGETALL/HKEYS/HVALS).
This patch moves `prepareClientToWrite` out of the loop to reduce
the function overhead.

| test          | unstable(150c197) rpc | this patch (rpc) | improvements |
| ------------- | -------------------------- | ---------------- | ------------ |
| HGETALL h1    | 85084.66                   | 86926.29         |  2.16446772  |
| HGETALL h10   | 78400.62                   | 76893.5          | -1.922331737 |
| HGETALL h25   | 64487.01                   | 58802.77         | -8.814550403 |
| HGETALL h50   | 47587.32                   | 49360.78         |  3.726749058 |
| HGETALL h100  | 33028.37                   | 34454.25         |  4.317137055 |
| HGETALL h300  | 14628.22                   | 15540.98         |  6.239720212 |
| HGETALL h500  | 9593.52                    | 10395.44         |  8.358975642 |

Signed-off-by: Masahiro Ide <[email protected]>
  • Loading branch information
imasahiro authored and Masahiro Ide committed Oct 3, 2024
1 parent 150c197 commit b19eeba
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 30 deletions.
13 changes: 13 additions & 0 deletions src/networking.c
Original file line number Diff line number Diff line change
Expand Up @@ -1052,6 +1052,12 @@ void addReplyMapLen(client *c, long length) {
addReplyAggregateLen(c, length, prefix);
}

void addWritePreparedReplyMapLen(writePreparedClient *c, long length) {
int prefix = c->resp == 2 ? '*' : '%';
if (c->resp == 2) length *= 2;
_addReplyLongLongWithPrefix(c, length, prefix);
}

void addReplySetLen(client *c, long length) {
int prefix = c->resp == 2 ? '*' : '~';
addReplyAggregateLen(c, length, prefix);
Expand Down Expand Up @@ -1136,6 +1142,13 @@ void addReplyBulkSds(client *c, sds s) {
_addReplyToBufferOrList(c, "\r\n", 2);
}

void addWritePreparedReplyBulkSds(writePreparedClient *c, sds s) {
_addReplyLongLongWithPrefix(c, sdslen(s), '$');
_addReplyToBufferOrList(c, s, sdslen(s));
sdsfree(s);
_addReplyToBufferOrList(c, "\r\n", 2);
}

/* Set sds to a deferred reply (for symmetry with addReplyBulkSds it also frees the sds) */
void setDeferredReplyBulkSds(client *c, void *node, sds s) {
sds reply = sdscatprintf(sdsempty(), "$%d\r\n%s\r\n", (unsigned)sdslen(s), s);
Expand Down
2 changes: 2 additions & 0 deletions src/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -2789,6 +2789,7 @@ void addReply(client *c, robj *obj);
void addReplyStatusLength(client *c, const char *s, size_t len);
void addReplySds(client *c, sds s);
void addReplyBulkSds(client *c, sds s);
void addWritePreparedReplyBulkSds(writePreparedClient *c, sds s);
void setDeferredReplyBulkSds(client *c, void *node, sds s);
void addReplyErrorObject(client *c, robj *err);
void addReplyOrErrorObject(client *c, robj *reply);
Expand All @@ -2808,6 +2809,7 @@ void addReplyLongLong(client *c, long long ll);
void addReplyArrayLen(client *c, long length);
void addWritePreparedReplyArrayLen(writePreparedClient *c, long length);
void addReplyMapLen(client *c, long length);
void addWritePreparedReplyMapLen(writePreparedClient *c, long length);
void addReplySetLen(client *c, long length);
void addReplyAttributeLen(client *c, long length);
void addReplyPushLen(client *c, long length);
Expand Down
62 changes: 32 additions & 30 deletions src/t_hash.c
Original file line number Diff line number Diff line change
Expand Up @@ -791,20 +791,20 @@ void hstrlenCommand(client *c) {
addReplyLongLong(c, hashTypeGetValueLength(o, c->argv[2]->ptr));
}

static void addHashIteratorCursorToReply(client *c, hashTypeIterator *hi, int what) {
static void addHashIteratorCursorToReply(writePreparedClient *wpc, hashTypeIterator *hi, int what) {
if (hi->encoding == OBJ_ENCODING_LISTPACK) {
unsigned char *vstr = NULL;
unsigned int vlen = UINT_MAX;
long long vll = LLONG_MAX;

hashTypeCurrentFromListpack(hi, what, &vstr, &vlen, &vll);
if (vstr)
addReplyBulkCBuffer(c, vstr, vlen);
addWritePreparedReplyBulkCBuffer(wpc, vstr, vlen);
else
addReplyBulkLongLong(c, vll);
addWritePreparedReplyBulkLongLong(wpc, vll);
} else if (hi->encoding == OBJ_ENCODING_HT) {
sds value = hashTypeCurrentFromHashTable(hi, what);
addReplyBulkCBuffer(c, value, sdslen(value));
addWritePreparedReplyBulkCBuffer(wpc, value, sdslen(value));
} else {
serverPanic("Unknown hash encoding");
}
Expand All @@ -818,23 +818,24 @@ void genericHgetallCommand(client *c, int flags) {
robj *emptyResp = (flags & OBJ_HASH_KEY && flags & OBJ_HASH_VALUE) ? shared.emptymap[c->resp] : shared.emptyarray;
if ((o = lookupKeyReadOrReply(c, c->argv[1], emptyResp)) == NULL || checkType(c, o, OBJ_HASH)) return;

writePreparedClient *wpc = prepareClientForFutureWrites(c);
/* We return a map if the user requested keys and values, like in the
* HGETALL case. Otherwise to use a flat array makes more sense. */
length = hashTypeLength(o);
if (flags & OBJ_HASH_KEY && flags & OBJ_HASH_VALUE) {
addReplyMapLen(c, length);
addWritePreparedReplyMapLen(wpc, length);
} else {
addReplyArrayLen(c, length);
addWritePreparedReplyArrayLen(wpc, length);
}

hi = hashTypeInitIterator(o);
while (hashTypeNext(hi) != C_ERR) {
if (flags & OBJ_HASH_KEY) {
addHashIteratorCursorToReply(c, hi, OBJ_HASH_KEY);
addHashIteratorCursorToReply(wpc, hi, OBJ_HASH_KEY);
count++;
}
if (flags & OBJ_HASH_VALUE) {
addHashIteratorCursorToReply(c, hi, OBJ_HASH_VALUE);
addHashIteratorCursorToReply(wpc, hi, OBJ_HASH_VALUE);
count++;
}
}
Expand Down Expand Up @@ -874,18 +875,18 @@ void hscanCommand(client *c) {
scanGenericCommand(c, o, cursor);
}

static void hrandfieldReplyWithListpack(client *c, unsigned int count, listpackEntry *keys, listpackEntry *vals) {
static void hrandfieldReplyWithListpack(writePreparedClient *wpc, unsigned int count, listpackEntry *keys, listpackEntry *vals) {
for (unsigned long i = 0; i < count; i++) {
if (vals && c->resp > 2) addReplyArrayLen(c, 2);
if (vals && wpc->resp > 2) addWritePreparedReplyArrayLen(wpc, 2);
if (keys[i].sval)
addReplyBulkCBuffer(c, keys[i].sval, keys[i].slen);
addWritePreparedReplyBulkCBuffer(wpc, keys[i].sval, keys[i].slen);
else
addReplyBulkLongLong(c, keys[i].lval);
addWritePreparedReplyBulkLongLong(wpc, keys[i].lval);
if (vals) {
if (vals[i].sval)
addReplyBulkCBuffer(c, vals[i].sval, vals[i].slen);
addWritePreparedReplyBulkCBuffer(wpc, vals[i].sval, vals[i].slen);
else
addReplyBulkLongLong(c, vals[i].lval);
addWritePreparedReplyBulkLongLong(wpc, vals[i].lval);
}
}
}
Expand Down Expand Up @@ -921,25 +922,26 @@ void hrandfieldWithCountCommand(client *c, long l, int withvalues) {
return;
}

writePreparedClient *wpc = prepareClientForFutureWrites(c);
/* CASE 1: The count was negative, so the extraction method is just:
* "return N random elements" sampling the whole set every time.
* This case is trivial and can be served without auxiliary data
* structures. This case is the only one that also needs to return the
* elements in random order. */
if (!uniq || count == 1) {
if (withvalues && c->resp == 2)
addReplyArrayLen(c, count * 2);
addWritePreparedReplyArrayLen(wpc, count * 2);
else
addReplyArrayLen(c, count);
addWritePreparedReplyArrayLen(wpc, count);
if (hash->encoding == OBJ_ENCODING_HT) {
sds key, value;
while (count--) {
dictEntry *de = dictGetFairRandomKey(hash->ptr);
key = dictGetKey(de);
value = dictGetVal(de);
if (withvalues && c->resp > 2) addReplyArrayLen(c, 2);
addReplyBulkCBuffer(c, key, sdslen(key));
if (withvalues) addReplyBulkCBuffer(c, value, sdslen(value));
if (withvalues && c->resp > 2) addWritePreparedReplyArrayLen(wpc, 2);
addWritePreparedReplyBulkCBuffer(c, key, sdslen(key));
if (withvalues) addWritePreparedReplyBulkCBuffer(c, value, sdslen(value));
if (c->flag.close_asap) break;
}
} else if (hash->encoding == OBJ_ENCODING_LISTPACK) {
Expand All @@ -953,7 +955,7 @@ void hrandfieldWithCountCommand(client *c, long l, int withvalues) {
sample_count = count > limit ? limit : count;
count -= sample_count;
lpRandomPairs(hash->ptr, sample_count, keys, vals);
hrandfieldReplyWithListpack(c, sample_count, keys, vals);
hrandfieldReplyWithListpack(wpc, sample_count, keys, vals);
if (c->flag.close_asap) break;
}
zfree(keys);
Expand All @@ -965,19 +967,19 @@ void hrandfieldWithCountCommand(client *c, long l, int withvalues) {
/* Initiate reply count, RESP3 responds with nested array, RESP2 with flat one. */
long reply_size = count < size ? count : size;
if (withvalues && c->resp == 2)
addReplyArrayLen(c, reply_size * 2);
addWritePreparedReplyArrayLen(wpc, reply_size * 2);
else
addReplyArrayLen(c, reply_size);
addWritePreparedReplyArrayLen(wpc, reply_size);

/* CASE 2:
* The number of requested elements is greater than the number of
* elements inside the hash: simply return the whole hash. */
if (count >= size) {
hashTypeIterator *hi = hashTypeInitIterator(hash);
while (hashTypeNext(hi) != C_ERR) {
if (withvalues && c->resp > 2) addReplyArrayLen(c, 2);
addHashIteratorCursorToReply(c, hi, OBJ_HASH_KEY);
if (withvalues) addHashIteratorCursorToReply(c, hi, OBJ_HASH_VALUE);
if (withvalues && c->resp > 2) addWritePreparedReplyArrayLen(wpc, 2);
addHashIteratorCursorToReply(wpc, hi, OBJ_HASH_KEY);
if (withvalues) addHashIteratorCursorToReply(wpc, hi, OBJ_HASH_VALUE);
}
hashTypeReleaseIterator(hi);
return;
Expand All @@ -996,7 +998,7 @@ void hrandfieldWithCountCommand(client *c, long l, int withvalues) {
keys = zmalloc(sizeof(listpackEntry) * count);
if (withvalues) vals = zmalloc(sizeof(listpackEntry) * count);
serverAssert(lpRandomPairsUnique(hash->ptr, count, keys, vals) == count);
hrandfieldReplyWithListpack(c, count, keys, vals);
hrandfieldReplyWithListpack(wpc, count, keys, vals);
zfree(keys);
zfree(vals);
return;
Expand Down Expand Up @@ -1049,9 +1051,9 @@ void hrandfieldWithCountCommand(client *c, long l, int withvalues) {
while ((de = dictNext(di)) != NULL) {
sds key = dictGetKey(de);
sds value = dictGetVal(de);
if (withvalues && c->resp > 2) addReplyArrayLen(c, 2);
addReplyBulkSds(c, key);
if (withvalues) addReplyBulkSds(c, value);
if (withvalues && c->resp > 2) addWritePreparedReplyArrayLen(wpc, 2);
addWritePreparedReplyBulkSds(wpc, key);
if (withvalues) addWritePreparedReplyBulkSds(wpc, value);
}

dictReleaseIterator(di);
Expand Down Expand Up @@ -1082,7 +1084,7 @@ void hrandfieldWithCountCommand(client *c, long l, int withvalues) {
added++;

/* We can reply right away, so that we don't need to store the value in the dict. */
if (withvalues && c->resp > 2) addReplyArrayLen(c, 2);
if (withvalues && c->resp > 2) addWritePreparedReplyArrayLen(wpc, 2);
hashReplyFromListpackEntry(c, &key);
if (withvalues) hashReplyFromListpackEntry(c, &value);
}
Expand Down

0 comments on commit b19eeba

Please sign in to comment.