Skip to content

Commit

Permalink
Enable module API SendClusterMessage to use light message header type (
Browse files Browse the repository at this point in the history
…#1572)

This change uses the light message header for cluster module messages,
whether they are sent to a single target node or broadcast across the
cluster. The light message header was introduced in Valkey 8 to reduce
network traffic over the cluster bus and has already been used for pub/sub
message transfer. It is only used if the target node supports the light
message header (~30B); otherwise the sender falls back to the regular
header (~2KB). The module API `VM_SendClusterMessage` remains as is.

---------

Signed-off-by: Harkrishn Patro <[email protected]>
Signed-off-by: Harkrishn Patro <[email protected]>
Co-authored-by: Viktor Söderqvist <[email protected]>
  • Loading branch information
hpatro and zuiderkwast authored Feb 3, 2025
1 parent 26c6f1a commit e7dbce3
Show file tree
Hide file tree
Showing 7 changed files with 247 additions and 47 deletions.
150 changes: 116 additions & 34 deletions src/cluster_legacy.c
Original file line number Diff line number Diff line change
Expand Up @@ -195,11 +195,13 @@ dictType clusterSdsToListType = {
/* Iterator over a set of cluster nodes.
 *
 * `type` selects which member of the anonymous union is active:
 *   ITER_DICT - `di`, a dictionary iterator.
 *   ITER_LIST - `li`, a list iterator.
 *   ITER_NODE - `node`, a single node that is yielded once. */
typedef struct {
    enum {
        ITER_DICT,
        ITER_LIST,
        ITER_NODE,
    } type;
    union {
        dictIterator di;
        listIter li;
        clusterNode *node;
    };
} ClusterNodeIterator;

Expand All @@ -215,6 +217,11 @@ static void clusterNodeIterInitMyShard(ClusterNodeIterator *iter) {
listRewind(nodes, &iter->li);
}

/* Set up `iter` so that it walks over exactly one node.
 *
 * The first call to clusterNodeIterNext() returns `node`; later calls return
 * NULL. Passing a NULL `node` produces an iterator that yields nothing. */
static void clusterNodeIterNode(ClusterNodeIterator *iter, clusterNode *node) {
    iter->node = node;
    iter->type = ITER_NODE;
}

static clusterNode *clusterNodeIterNext(ClusterNodeIterator *iter) {
switch (iter->type) {
case ITER_DICT: {
Expand All @@ -229,13 +236,24 @@ static clusterNode *clusterNodeIterNext(ClusterNodeIterator *iter) {
/* Return the value associated with the node, or NULL if no more nodes */
return ln ? listNodeValue(ln) : NULL;
}

case ITER_NODE: {
if (iter->node) {
clusterNode *node = iter->node;
iter->node = NULL;
return node;
}
return NULL;
}
}
serverPanic("Unknown iterator type %d", iter->type);
}

/* Release or clear whatever state the iterator holds.
 *
 * Dictionary iterators are reset through dictResetIterator(), single-node
 * iterators simply forget their node, and every other iterator type needs
 * no cleanup. */
static void clusterNodeIterReset(ClusterNodeIterator *iter) {
    switch (iter->type) {
    case ITER_DICT:
        dictResetIterator(&iter->di);
        break;
    case ITER_NODE:
        iter->node = NULL;
        break;
    default:
        /* Nothing to do for the remaining iterator types. */
        break;
    }
}

Expand Down Expand Up @@ -988,7 +1006,7 @@ void clusterUpdateMyselfFlags(void) {
int nofailover = server.cluster_replica_no_failover ? CLUSTER_NODE_NOFAILOVER : 0;
myself->flags &= ~CLUSTER_NODE_NOFAILOVER;
myself->flags |= nofailover;
myself->flags |= CLUSTER_NODE_LIGHT_HDR_SUPPORTED;
myself->flags |= CLUSTER_NODE_LIGHT_HDR_PUBLISH_SUPPORTED | CLUSTER_NODE_LIGHT_HDR_MODULE_SUPPORTED;
if (myself->flags != oldflags) {
clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG | CLUSTER_TODO_UPDATE_STATE);
}
Expand Down Expand Up @@ -3019,18 +3037,34 @@ static void clusterProcessPublishPacket(clusterMsgDataPublish *publish_data, uin
}
}

static void clusterProcessLightPacket(clusterLink *link, uint16_t type) {
clusterMsgLight *hdr = (clusterMsgLight *)link->rcvbuf;
/* Deliver a received MODULE message to the module receivers.
 *
 * `module_data` points at the module section of the packet, regardless of
 * which header type carried it. `sender` is the node the packet came from;
 * when it is NULL the message is silently dropped. */
static void clusterProcessModulePacket(clusterMsgModule *module_data, clusterNode *sender) {
    /* Never hand a module a message from a node we do not know. */
    if (sender == NULL) return;

    /* The module ID is used as received (endian-safe); only the length
     * needs converting from network byte order. */
    const uint64_t target_module = module_data->module_id;
    const uint8_t msg_type = module_data->type;
    const uint32_t payload_len = ntohl(module_data->len);
    unsigned char *body = module_data->bulk_data;

    /* Route the message to the module subscribed for this message type. */
    moduleCallClusterReceivers(sender->name, target_module, msg_type, body, payload_len);
}

/* Dispatch a packet that arrived with the light message header.
 *
 * The packet is read from link->rcvbuf. `type` is the message type to
 * dispatch on. Publish and sharded-publish messages go to the publish
 * handler, module messages go to the module handler together with `sender`.
 * Any other type is a programming error and trips an assertion. */
static void clusterProcessLightPacket(clusterNode *sender, clusterLink *link, uint16_t type) {
    clusterMsgLight *light = (clusterMsgLight *)link->rcvbuf;

    serverLog(LL_DEBUG, "Processing light packet of type: %s", clusterGetMessageTypeString(type));

    switch (type) {
    case CLUSTERMSG_TYPE_PUBLISH:
    case CLUSTERMSG_TYPE_PUBLISHSHARD:
        clusterProcessPublishPacket(&light->data.publish.msg, type);
        break;
    case CLUSTERMSG_TYPE_MODULE:
        clusterProcessModulePacket(&light->data.module.msg, sender);
        break;
    default:
        serverAssert(0);
    }
}

/* Return 1 if messages of the given type may be carried with the light
 * message header, 0 otherwise. Publish, sharded publish and module messages
 * are the supported types. */
static inline int messageTypeSupportsLightHdr(uint16_t type) {
    switch (type) {
    case CLUSTERMSG_TYPE_PUBLISH:
    case CLUSTERMSG_TYPE_PUBLISHSHARD:
    case CLUSTERMSG_TYPE_MODULE:
        return 1;
    default:
        return 0;
    }
}
Expand Down Expand Up @@ -3123,8 +3157,14 @@ int clusterIsValidPacket(clusterLink *link) {
explen = sizeof(clusterMsg) - sizeof(union clusterMsgData);
explen += sizeof(clusterMsgDataUpdate);
} else if (type == CLUSTERMSG_TYPE_MODULE) {
explen = sizeof(clusterMsg) - sizeof(union clusterMsgData);
explen += sizeof(clusterMsgModule) - 3 + ntohl(hdr->data.module.msg.len);
if (is_light) {
clusterMsgLight *hdr_light = (clusterMsgLight *)link->rcvbuf;
explen = sizeof(clusterMsgLight) - sizeof(union clusterMsgData);
explen += sizeof(clusterMsgModule) - 3 + ntohl(hdr_light->data.module.msg.len);
} else {
explen = sizeof(clusterMsg) - sizeof(union clusterMsgData);
explen += sizeof(clusterMsgModule) - 3 + ntohl(hdr->data.module.msg.len);
}
} else {
/* We don't know this type of packet, so we assume it's well formed. */
explen = totlen;
Expand Down Expand Up @@ -3178,7 +3218,7 @@ int clusterProcessPacket(clusterLink *link) {
}
clusterNode *sender = link->node;
sender->data_received = now;
clusterProcessLightPacket(link, type);
clusterProcessLightPacket(sender, link, type);
return 1;
}

Expand All @@ -3195,10 +3235,16 @@ int clusterProcessPacket(clusterLink *link) {

/* Checks if the node supports light message hdr */
if (sender) {
if (flags & CLUSTER_NODE_LIGHT_HDR_SUPPORTED) {
sender->flags |= CLUSTER_NODE_LIGHT_HDR_SUPPORTED;
if (flags & CLUSTER_NODE_LIGHT_HDR_PUBLISH_SUPPORTED) {
sender->flags |= CLUSTER_NODE_LIGHT_HDR_PUBLISH_SUPPORTED;
} else {
sender->flags &= ~CLUSTER_NODE_LIGHT_HDR_SUPPORTED;
sender->flags &= ~CLUSTER_NODE_LIGHT_HDR_PUBLISH_SUPPORTED;
}

if (flags & CLUSTER_NODE_LIGHT_HDR_MODULE_SUPPORTED) {
sender->flags |= CLUSTER_NODE_LIGHT_HDR_MODULE_SUPPORTED;
} else {
sender->flags &= ~CLUSTER_NODE_LIGHT_HDR_MODULE_SUPPORTED;
}
}

Expand Down Expand Up @@ -3701,14 +3747,7 @@ int clusterProcessPacket(clusterLink *link) {
* config accordingly. */
clusterUpdateSlotsConfigWith(n, reportedConfigEpoch, hdr->data.update.nodecfg.slots);
} else if (type == CLUSTERMSG_TYPE_MODULE) {
if (!sender) return 1; /* Protect the module from unknown nodes. */
/* We need to route this message back to the right module subscribed
* for the right message type. */
uint64_t module_id = hdr->data.module.msg.module_id; /* Endian-safe ID */
uint32_t len = ntohl(hdr->data.module.msg.len);
uint8_t type = hdr->data.module.msg.type;
unsigned char *payload = hdr->data.module.msg.bulk_data;
moduleCallClusterReceivers(sender->name, module_id, type, payload, len);
clusterProcessModulePacket(&hdr->data.module.msg, sender);
} else {
serverLog(LL_WARNING, "Received unknown packet type: %d", type);
}
Expand Down Expand Up @@ -4322,26 +4361,69 @@ void clusterSendUpdate(clusterLink *link, clusterNode *node) {
clusterMsgSendBlockDecrRefCount(msgblock);
}

/* Send a MODULE message.
/* Create a MODULE message block.
*
* If link is NULL, then the message is broadcasted to the whole cluster. */
void clusterSendModule(clusterLink *link, uint64_t module_id, uint8_t type, const char *payload, uint32_t len) {
uint32_t msglen = sizeof(clusterMsg) - sizeof(union clusterMsgData);
* If is_light is 1, then build a message block with `clusterMsgLight` struct else `clusterMsg`. */
static clusterMsgSendBlock *createModuleMsgBlock(int64_t module_id, uint8_t type, const char *payload, uint32_t len, int is_light) {
uint32_t msglen;
int msgtype;
clusterMsgSendBlock *msgblock;
clusterMsgModule *module_data;

if (is_light) {
msglen = sizeof(clusterMsgLight) - sizeof(union clusterMsgData);
msgtype = CLUSTERMSG_TYPE_MODULE | CLUSTERMSG_LIGHT;
} else {
msglen = sizeof(clusterMsg) - sizeof(union clusterMsgData);
msgtype = CLUSTERMSG_TYPE_MODULE;
}
msglen += sizeof(clusterMsgModule) - 3 + len;
clusterMsgSendBlock *msgblock = createClusterMsgSendBlock(CLUSTERMSG_TYPE_MODULE, msglen);
msgblock = createClusterMsgSendBlock(msgtype, msglen);

clusterMsg *hdr = getMessageFromSendBlock(msgblock);
hdr->data.module.msg.module_id = module_id; /* Already endian adjusted. */
hdr->data.module.msg.type = type;
hdr->data.module.msg.len = htonl(len);
memcpy(hdr->data.module.msg.bulk_data, payload, len);
if (is_light) {
clusterMsgLight *hdr = getLightMessageFromSendBlock(msgblock);
module_data = &hdr->data.module.msg;
} else {
clusterMsg *hdr = getMessageFromSendBlock(msgblock);
module_data = &hdr->data.module.msg;
}

if (link)
clusterSendMessage(link, msgblock);
else
clusterBroadcastMessage(msgblock);
module_data->module_id = module_id; /* Already endian adjusted */
module_data->type = type;
module_data->len = htonl(len);
memcpy(module_data->bulk_data, payload, len);

clusterMsgSendBlockDecrRefCount(msgblock);
return msgblock;
}

/* Send a MODULE message.
 *
 * With a non-NULL link the message goes only to the node attached to that
 * link; with a NULL link it is broadcast to all the nodes.
 *
 * At most one message block per header type is built, lazily, and shared by
 * every recipient that needs that header type. Nodes flagged as myself or
 * still in handshake are skipped. */
void clusterSendModule(clusterLink *link, uint64_t module_id, uint8_t type, const char *payload, uint32_t len) {
    clusterMsgSendBlock *blocks[CLUSTERMSG_HDR_NUM] = {0};
    ClusterNodeIterator iter;
    clusterNode *target;

    if (link == NULL) {
        /* Broadcast to all the nodes. */
        clusterNodeIterInitAllNodes(&iter);
    } else {
        clusterNodeIterNode(&iter, link->node);
    }

    for (target = clusterNodeIterNext(&iter); target != NULL; target = clusterNodeIterNext(&iter)) {
        if (target->flags & (CLUSTER_NODE_MYSELF | CLUSTER_NODE_HANDSHAKE)) continue;

        /* Pick the header the recipient understands; the value doubles as
         * the index of the cached block and as the light-header switch. */
        int hdr = nodeSupportsLightMsgHdrForModule(target) ? CLUSTERMSG_HDR_LIGHT : CLUSTERMSG_HDR_NORMAL;
        if (blocks[hdr] == NULL) {
            blocks[hdr] = createModuleMsgBlock(module_id, type, payload, len, hdr);
        }
        clusterSendMessage(target->link, blocks[hdr]);
    }
    clusterNodeIterReset(&iter);

    /* Drop our own reference on every block that was actually created. */
    for (int i = CLUSTERMSG_HDR_NORMAL; i < CLUSTERMSG_HDR_NUM; i++) {
        if (blocks[i] != NULL) {
            clusterMsgSendBlockDecrRefCount(blocks[i]);
        }
    }
}

/* This function gets a cluster node ID string as target, the same way the nodes
Expand Down Expand Up @@ -4391,7 +4473,7 @@ void clusterPropagatePublish(robj *channel, robj *message, int sharded) {
clusterNode *node;
while ((node = clusterNodeIterNext(&iter)) != NULL) {
if (node->flags & (CLUSTER_NODE_MYSELF | CLUSTER_NODE_HANDSHAKE)) continue;
if (nodeSupportsLightMsgHdr(node)) {
if (nodeSupportsLightMsgHdrForPubSub(node)) {
clusterSendMessage(node->link, msgblock_light);
} else {
if (msgblock == NULL) {
Expand Down
35 changes: 22 additions & 13 deletions src/cluster_legacy.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,18 +42,19 @@ typedef struct clusterLink {
} clusterLink;

/* Cluster node flags and macros. */
#define CLUSTER_NODE_PRIMARY (1 << 0) /* The node is a primary */
#define CLUSTER_NODE_REPLICA (1 << 1) /* The node is a replica */
#define CLUSTER_NODE_PFAIL (1 << 2) /* Failure? Need acknowledge */
#define CLUSTER_NODE_FAIL (1 << 3) /* The node is believed to be malfunctioning */
#define CLUSTER_NODE_MYSELF (1 << 4) /* This node is myself */
#define CLUSTER_NODE_HANDSHAKE (1 << 5) /* We have still to exchange the first ping */
#define CLUSTER_NODE_NOADDR (1 << 6) /* We don't know the address of this node */
#define CLUSTER_NODE_MEET (1 << 7) /* Send a MEET message to this node */
#define CLUSTER_NODE_MIGRATE_TO (1 << 8) /* Primary eligible for replica migration. */
#define CLUSTER_NODE_NOFAILOVER (1 << 9) /* Replica will not try to failover. */
#define CLUSTER_NODE_EXTENSIONS_SUPPORTED (1 << 10) /* This node supports extensions. */
#define CLUSTER_NODE_LIGHT_HDR_SUPPORTED (1 << 11) /* This node supports light pubsub message header. */
#define CLUSTER_NODE_PRIMARY (1 << 0) /* The node is a primary */
#define CLUSTER_NODE_REPLICA (1 << 1) /* The node is a replica */
#define CLUSTER_NODE_PFAIL (1 << 2) /* Failure? Need acknowledge */
#define CLUSTER_NODE_FAIL (1 << 3) /* The node is believed to be malfunctioning */
#define CLUSTER_NODE_MYSELF (1 << 4) /* This node is myself */
#define CLUSTER_NODE_HANDSHAKE (1 << 5) /* We have still to exchange the first ping */
#define CLUSTER_NODE_NOADDR (1 << 6) /* We don't know the address of this node */
#define CLUSTER_NODE_MEET (1 << 7) /* Send a MEET message to this node */
#define CLUSTER_NODE_MIGRATE_TO (1 << 8) /* Primary eligible for replica migration. */
#define CLUSTER_NODE_NOFAILOVER (1 << 9) /* Replica will not try to failover. */
#define CLUSTER_NODE_EXTENSIONS_SUPPORTED (1 << 10) /* This node supports extensions. */
#define CLUSTER_NODE_LIGHT_HDR_PUBLISH_SUPPORTED (1 << 11) /* This node supports light message header for publish type. */
#define CLUSTER_NODE_LIGHT_HDR_MODULE_SUPPORTED (1 << 12) /* This node supports light message header for module type. */
#define CLUSTER_NODE_NULL_NAME \
"\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000" \
"\000\000\000\000\000\000\000\000\000\000\000\000"
Expand All @@ -67,7 +68,8 @@ typedef struct clusterLink {
#define nodeFailed(n) ((n)->flags & CLUSTER_NODE_FAIL)
#define nodeCantFailover(n) ((n)->flags & CLUSTER_NODE_NOFAILOVER)
#define nodeSupportsExtensions(n) ((n)->flags & CLUSTER_NODE_EXTENSIONS_SUPPORTED)
#define nodeSupportsLightMsgHdr(n) ((n)->flags & CLUSTER_NODE_LIGHT_HDR_SUPPORTED)
#define nodeSupportsLightMsgHdrForPubSub(n) ((n)->flags & CLUSTER_NODE_LIGHT_HDR_PUBLISH_SUPPORTED)
#define nodeSupportsLightMsgHdrForModule(n) ((n)->flags & CLUSTER_NODE_LIGHT_HDR_MODULE_SUPPORTED)
#define nodeInNormalState(n) (!((n)->flags & (CLUSTER_NODE_HANDSHAKE | CLUSTER_NODE_MEET | CLUSTER_NODE_PFAIL | CLUSTER_NODE_FAIL)))

/* This structure represents elements of node->fail_reports. */
Expand Down Expand Up @@ -104,6 +106,13 @@ typedef struct clusterNodeFailReport {
/* We check for the modifier bit to determine if the message is sent using light header.*/
#define IS_LIGHT_MESSAGE(type) ((type) & CLUSTERMSG_LIGHT)

/* Header layouts that can frame a message on the cluster bus. The values
 * start at zero and are contiguous, so they can be used as array indexes
 * with CLUSTERMSG_HDR_NUM as the array size. */
typedef enum {
    CLUSTERMSG_HDR_NORMAL = 0, /* Regular header: the `clusterMsg` struct. */
    CLUSTERMSG_HDR_LIGHT = 1,  /* Light header: the `clusterMsgLight` struct. */
    CLUSTERMSG_HDR_NUM = 2,    /* Number of header types; not a header itself. */
} clusterMsgHdrType;

/* Initially we don't know our "name", but we'll find it once we connect
* to the first node, using the getsockname() function. Then we'll use this
* address for all the next messages. */
Expand Down
3 changes: 3 additions & 0 deletions src/module.c
Original file line number Diff line number Diff line change
Expand Up @@ -8983,6 +8983,9 @@ void VM_RegisterClusterMessageReceiver(ValkeyModuleCtx *ctx,
* at the specified target, which is a VALKEYMODULE_NODE_ID_LEN bytes node ID, as
* returned by the receiver callback or by the nodes iteration functions.
*
* In Valkey 8.1 and later, the cluster protocol overhead for this message is
* ~30B, compared with ~2KB in earlier versions.
*
* The function returns VALKEYMODULE_OK if the message was successfully sent,
* otherwise if the node is not connected or such node ID does not map to any
* known cluster node, VALKEYMODULE_ERR is returned. */
Expand Down
8 changes: 8 additions & 0 deletions tests/assets/minimal-cluster.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
# Minimal configuration for testing.
always-show-logo yes
daemonize no
pidfile /var/run/valkey.pid
loglevel verbose
enable-debug-command yes
cluster-enabled yes
enable-module-command yes
16 changes: 16 additions & 0 deletions tests/modules/cluster.c
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,29 @@ int test_cluster_shards(ValkeyModuleCtx *ctx, ValkeyModuleString **argv, int arg
return VALKEYMODULE_OK;
}

#define MSGTYPE_DING 1
#define MSGTYPE_DONG 2

/* test.pingall
 *
 * Sends a MSGTYPE_DING cluster message carrying the 3-byte payload "Hey"
 * with a NULL target (presumably a broadcast to every other node; the
 * accompanying test expects two messages sent in a 3-node cluster).
 *
 * Replies with the simple string "OK" when the send succeeded, or with an
 * error reply when ValkeyModule_SendClusterMessage reports a failure. The
 * command takes no arguments. */
int PingallCommand(ValkeyModuleCtx *ctx, ValkeyModuleString **argv, int argc) {
    VALKEYMODULE_NOT_USED(argv);
    VALKEYMODULE_NOT_USED(argc);

    /* Do not claim success if the message could not be sent. */
    if (ValkeyModule_SendClusterMessage(ctx, NULL, MSGTYPE_DING, "Hey", 3) != VALKEYMODULE_OK) {
        return ValkeyModule_ReplyWithError(ctx, "ERR failed to send cluster message");
    }
    return ValkeyModule_ReplyWithSimpleString(ctx, "OK");
}

int ValkeyModule_OnLoad(ValkeyModuleCtx *ctx, ValkeyModuleString **argv, int argc) {
VALKEYMODULE_NOT_USED(argv);
VALKEYMODULE_NOT_USED(argc);

if (ValkeyModule_Init(ctx, "cluster", 1, VALKEYMODULE_APIVER_1)== VALKEYMODULE_ERR)
return VALKEYMODULE_ERR;

if (ValkeyModule_CreateCommand(ctx, "test.pingall", PingallCommand, "readonly", 0, 0, 0) ==
VALKEYMODULE_ERR)
return VALKEYMODULE_ERR;

if (ValkeyModule_CreateCommand(ctx, "test.cluster_slots", test_cluster_slots, "", 0, 0, 0) == VALKEYMODULE_ERR)
return VALKEYMODULE_ERR;

Expand Down
19 changes: 19 additions & 0 deletions tests/unit/moduleapi/cluster.tcl
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,25 @@ source tests/support/cli.tcl
# cluster creation is complicated with TLS, and the current tests don't really need that coverage
tags {tls:skip external:skip cluster modules} {

# Load the cluster test module on every node of the cluster started below.
set testmodule [file normalize tests/modules/cluster.so]
set modules [list loadmodule $testmodule]
# NOTE(review): presumably starts 3 primaries and 0 replicas - confirm
# against the start_cluster helper.
start_cluster 3 0 [list config_lines $modules] {
set node1 [srv 0 client]
set node2 [srv -1 client]
set node3 [srv -2 client]

# Verifies that a module message sent with test.pingall from node 1 is
# counted as sent on node 1 and as received on the two other nodes.
test "Cluster module send message API - VM_SendClusterMessage" {
assert_equal OK [$node1 test.pingall]
# One message per remote node is expected on the sender side.
assert_equal 2 [CI 0 cluster_stats_messages_module_sent]
# Delivery is asynchronous, so poll the receivers instead of checking once.
wait_for_condition 50 100 {
[CI 1 cluster_stats_messages_module_received] eq 1 &&
[CI 2 cluster_stats_messages_module_received] eq 1
} else {
fail "node 2 or node 3 didn't receive cluster module message"
}
}
}

set testmodule_nokey [file normalize tests/modules/blockonbackground.so]
set testmodule_blockedclient [file normalize tests/modules/blockedclient.so]
set testmodule [file normalize tests/modules/blockonkeys.so]
Expand Down
Loading

0 comments on commit e7dbce3

Please sign in to comment.