Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add new MPUBLISH command #1657

Merged
merged 2 commits into from
Aug 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 34 additions & 5 deletions src/commands/cmd_pubsub.cc
Original file line number Diff line number Diff line change
Expand Up @@ -27,20 +27,48 @@ namespace redis {

class CommandPublish : public Commander {
public:
// mark is_write as false here because slave should be able to execute publish command
Status Execute(Server *svr, Connection *conn, std::string *output) override {
if (!svr->IsSlave()) {
// Compromise: can't replicate message to sub-replicas in a cascading-like structure.
// Replication relies on WAL seq, increase the seq on slave will break the replication, hence the compromise
// Compromise: can't replicate a message to sub-replicas in a cascading-like structure.
// Replication relies on WAL seq; increasing the seq on a replica will break the replication process,
// hence the compromise solution
redis::PubSub pubsub_db(svr->storage);

auto s = pubsub_db.Publish(args_[1], args_[2]);
if (!s.ok()) {
return {Status::RedisExecErr, s.ToString()};
}
}

int receivers = svr->PublishMessage(args_[1], args_[2]);

*output = redis::Integer(receivers);

return Status::OK();
}
};

class CommandMPublish : public Commander {
public:
Status Execute(Server *svr, Connection *conn, std::string *output) override {
int total_receivers = 0;

for (size_t i = 2; i < args_.size(); i++) {
if (!svr->IsSlave()) {
redis::PubSub pubsub_db(svr->storage);

auto s = pubsub_db.Publish(args_[1], args_[i]);
if (!s.ok()) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems that Publish will be likely to success?

return {Status::RedisExecErr, s.ToString()};
}
}

int receivers = svr->PublishMessage(args_[1], args_[i]);
total_receivers += receivers;
}

*output = redis::Integer(total_receivers);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would an array of int be better here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mapleFU Possibly, yes. However, according to this PR (redis/redis#12267) it's likely that MPUBLISH will have a single integer as a response.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh this LGTM. Seems this is not stable. Maybe we can mark "mpublish" as "experimental" or something, and make it able to change the syntax of it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In general, I think it's not easy to utilize the return value of the PUBLISH command, except to check whether it is 0 or more. The actual goal of Pub/Sub pattern is to decouple publishers from subscribers and if I want to check the number of receivers I should know the exact number of them. And for example, if I know that there should be 5 subscribers but I received only 4 in response to the PUBLISH command, what should I do?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds ok. It's ok to leave it like this


return Status::OK();
}
};
Expand Down Expand Up @@ -132,7 +160,7 @@ class CommandPubSub : public Commander {
return Status::OK();
}

return {Status::RedisInvalidCmd, "Unknown subcommand or wrong number of arguments"};
return {Status::RedisInvalidCmd, errUnknownSubcommandOrWrongArguments};
}

Status Execute(Server *srv, Connection *conn, std::string *output) override {
Expand Down Expand Up @@ -161,7 +189,7 @@ class CommandPubSub : public Commander {
return Status::OK();
}

return {Status::RedisInvalidCmd, "Unknown subcommand or wrong number of arguments"};
return {Status::RedisInvalidCmd, errUnknownSubcommandOrWrongArguments};
}

private:
Expand All @@ -172,6 +200,7 @@ class CommandPubSub : public Commander {

REDIS_REGISTER_COMMANDS(
MakeCmdAttr<CommandPublish>("publish", 3, "read-only pub-sub", 0, 0, 0),
MakeCmdAttr<CommandMPublish>("mpublish", -3, "read-only pub-sub", 0, 0, 0),
MakeCmdAttr<CommandSubscribe>("subscribe", -2, "read-only pub-sub no-multi no-script", 0, 0, 0),
MakeCmdAttr<CommandUnSubscribe>("unsubscribe", -1, "read-only pub-sub no-multi no-script", 0, 0, 0),
MakeCmdAttr<CommandPSubscribe>("psubscribe", -2, "read-only pub-sub no-multi no-script", 0, 0, 0),
Expand Down
1 change: 1 addition & 0 deletions src/commands/error_constants.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,5 +41,6 @@ inline constexpr const char *errScoreIsNotValidFloat = "score is not a valid flo
inline constexpr const char *errValueIsNotFloat = "value is not a valid float";
inline constexpr const char *errNoMatchingScript = "NOSCRIPT No matching script. Please use EVAL";
inline constexpr const char *errUnknownOption = "unknown option";
inline constexpr const char *errUnknownSubcommandOrWrongArguments = "Unknown subcommand or wrong number of arguments";

} // namespace redis
57 changes: 57 additions & 0 deletions tests/gocase/unit/pubsub/pubsub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,63 @@ func TestPubSub(t *testing.T) {
require.Equal(t, "hello", msg.Payload)
})

t.Run("MPUBLISH basics", func(t *testing.T) {
var (
channelName = "channel1"
msg1 = "hello"
msg2 = "world"
msg3 = "!"
msg4 = "foo-bar"
)

c1 := srv.NewClient()
defer func() { require.NoError(t, c1.Close()) }()
c2 := srv.NewClient()
defer func() { require.NoError(t, c2.Close()) }()

pubsub1 := c1.Subscribe(ctx, channelName)
pubsub2 := c2.Subscribe(ctx, channelName)

require.EqualValues(t, 1, receiveType(t, pubsub1, &redis.Subscription{}).Count)
require.EqualValues(t, 1, receiveType(t, pubsub2, &redis.Subscription{}).Count)

require.EqualValues(t, 6, rdb.Do(ctx, "MPUBLISH", channelName, msg1, msg2, msg3).Val())

msg := receiveType(t, pubsub1, &redis.Message{})
require.Equal(t, channelName, msg.Channel)
require.Equal(t, msg1, msg.Payload)

msg = receiveType(t, pubsub2, &redis.Message{})
require.Equal(t, channelName, msg.Channel)
require.Equal(t, msg1, msg.Payload)

msg = receiveType(t, pubsub1, &redis.Message{})
require.Equal(t, channelName, msg.Channel)
require.Equal(t, msg2, msg.Payload)

msg = receiveType(t, pubsub2, &redis.Message{})
require.Equal(t, channelName, msg.Channel)
require.Equal(t, msg2, msg.Payload)

msg = receiveType(t, pubsub1, &redis.Message{})
require.Equal(t, channelName, msg.Channel)
require.Equal(t, msg3, msg.Payload)

msg = receiveType(t, pubsub2, &redis.Message{})
require.Equal(t, channelName, msg.Channel)
require.Equal(t, msg3, msg.Payload)

require.EqualValues(t, 2, rdb.Do(ctx, "MPUBLISH", channelName, msg4).Val())

msg = receiveType(t, pubsub1, &redis.Message{})
require.Equal(t, channelName, msg.Channel)
require.Equal(t, msg4, msg.Payload)

msg = receiveType(t, pubsub2, &redis.Message{})
require.Equal(t, channelName, msg.Channel)
require.Equal(t, msg4, msg.Payload)
})

t.Run("PUBLISH/SUBSCRIBE after UNSUBSCRIBE without arguments", func(t *testing.T) {
pubsub := rdb.Subscribe(ctx, "chan1", "chan2", "chan3")
require.EqualValues(t, 1, receiveType(t, pubsub, &redis.Subscription{}).Count)
Expand Down