Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Add the support of the ZDIFF, ZDIFFSTORE commands #1494

Closed
wants to merge 7 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
110 changes: 110 additions & 0 deletions src/commands/cmd_zset.cc
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include "error_constants.h"
#include "server/redis_reply.h"
#include "server/server.h"
#include "string_util.h"
#include "types/redis_zset.h"

namespace redis {
Expand Down Expand Up @@ -169,6 +170,113 @@ class CommandZCard : public Commander {
}
};

/*
* description:
* syntax: `ZDIFF numkeys key [key ...] [WITHSCORES]`
*/
class CommandZDiff : public Commander {
public:
Status Parse(const std::vector<std::string> &args) override {
auto parse_numkey = ParseInt<int>(args[1], 10);
if (!parse_numkey) {
return {Status::RedisParseErr, errValueNotInteger};
}
if (*parse_numkey <= 0) {
return {Status::RedisParseErr, errValueMustBePositive};
}

numkeys_ = *parse_numkey;
// for example: ZDIFF 2 zset1 zset2 WITHSCORES
if (args.size() == numkeys_ + 3 && util::ToLower(args.back()) == "withscores") {
with_scores_ = true;
return Commander::Parse(args);
}

if (args.size() != numkeys_ + 2) {
return {Status::RedisParseErr, errWrongNumOfArguments};
}

return Commander::Parse(args);
}

Status Execute(Server *svr, Connection *conn, std::string *output) override {
std::vector<Slice> keys;
for (size_t i = 2; i < numkeys_ + 2; i++) {
keys.emplace_back(args_[i]);
}

std::vector<MemberScore> mscores;
redis::ZSet zset_db(svr->storage, conn->GetNamespace());
auto s = zset_db.Diff(keys, &mscores);
if (!s.ok()) {
return {Status::RedisExecErr, s.ToString()};
}

output->append(redis::MultiLen(mscores.size() * (with_scores_ ? 2 : 1)));
for (const auto &ms : mscores) {
output->append(redis::BulkString(ms.member));
if (with_scores_) output->append(redis::BulkString(util::Float2String(ms.score)));
}

return Status::OK();
}

static CommandKeyRange Range(const std::vector<std::string> &args) {
int num_key = *ParseInt<int>(args[1], 10);
return {2, 1 + num_key, 1};
}

private:
uint64_t numkeys_{0};
bool with_scores_{false};
};

/*
* description:
* syntax: `ZDIFFSTORE destination numkeys key [key ...]`
*/
class CommandZDiffStore : public Commander {
public:
Status Parse(const std::vector<std::string> &args) override {
auto parse_numkey = ParseInt<int>(args[2], 10);
if (!parse_numkey) {
return {Status::RedisParseErr, errValueNotInteger};
}
if (*parse_numkey <= 0) {
return {Status::RedisParseErr, errValueMustBePositive};
}

numkeys_ = *parse_numkey;
// for example: ZDIFFSTORE out 2 zset1 zset2
if (args.size() != numkeys_ + 3) {
return {Status::RedisParseErr, errWrongNumOfArguments};
}
return Commander::Parse(args);
}

Status Execute(Server *svr, Connection *conn, std::string *output) override {
std::vector<Slice> keys;
for (size_t i = 3; i < args_.size(); i++) {
keys.emplace_back(args_[i]);
}

uint64_t ret = 0;
redis::ZSet zset_db(svr->storage, conn->GetNamespace());
if (auto s = zset_db.DiffStore(args_[1], keys, &ret); !s.ok()) {
return {Status::RedisExecErr, s.ToString()};
}

*output = redis::Integer(ret);
return Status::OK();
}

// todo(infdahai): it exists issues.
static CommandKeyRange Range(const std::vector<std::string> &args) { return {2, -1, 1}; }

private:
uint64_t numkeys_{0};
};

class CommandZIncrBy : public Commander {
public:
Status Parse(const std::vector<std::string> &args) override {
Expand Down Expand Up @@ -1260,6 +1368,8 @@ class CommandZScan : public CommandSubkeyScanBase {
REDIS_REGISTER_COMMANDS(MakeCmdAttr<CommandZAdd>("zadd", -4, "write", 1, 1, 1),
MakeCmdAttr<CommandZCard>("zcard", 2, "read-only", 1, 1, 1),
MakeCmdAttr<CommandZCount>("zcount", 4, "read-only", 1, 1, 1),
MakeCmdAttr<CommandZDiff>("zdiff", -3, "read-only", CommandZDiff::Range),
MakeCmdAttr<CommandZDiffStore>("zdiffstore", -4, "write", CommandZDiffStore::Range),
MakeCmdAttr<CommandZIncrBy>("zincrby", 4, "write", 1, 1, 1),
MakeCmdAttr<CommandZInterStore>("zinterstore", -4, "write", 1, 1, 1),
MakeCmdAttr<CommandZLexCount>("zlexcount", 4, "read-only", 1, 1, 1),
Expand Down
47 changes: 41 additions & 6 deletions src/types/redis_zset.cc
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ rocksdb::Status ZSet::GetMetadata(const Slice &ns_key, ZSetMetadata *metadata) {
return Database::GetMetadata(kRedisZSet, ns_key, metadata);
}

rocksdb::Status ZSet::Add(const Slice &user_key, ZAddFlags flags, MemberScores *mscores, uint64_t *added_cnt) {
rocksdb::Status ZSet::Add(const Slice &user_key, ZAddFlags flags, MemberScoresTy *mscores, uint64_t *added_cnt) {
*added_cnt = 0;

std::string ns_key;
Expand Down Expand Up @@ -158,7 +158,42 @@ rocksdb::Status ZSet::IncrBy(const Slice &user_key, const Slice &member, double
return rocksdb::Status::OK();
}

rocksdb::Status ZSet::Pop(const Slice &user_key, int count, bool min, MemberScores *mscores) {
rocksdb::Status ZSet::Diff(const std::vector<Slice> &keys, MemberScoresTy *mscores) {
mscores->clear();
RangeScoreSpec spec;
uint64_t removed_cnt{};
MemberScoresTy source_mscores;
auto s = RangeByScore(keys[0], spec, source_mscores, &removed_cnt);
if (!s.ok()) return s;

std::map<std::string, bool> exclude_mscores;
MemberScoresTy target_mscores;
for (size_t i = 1; i < keys.size(); i++) {
s = RangeByScore(keys[i], &target_mscores);
if (!s.ok()) return s;
for (const auto &mscore : target_mscores) {
exclude_mscores[mscore.member] = true;
}
}

for (const auto &mscore : source_mscores) {
if (exclude_mscores.find(mscore.member) == exclude_mscores.end()) {
mscores->emplace_back(mscore);
}
}

return rocksdb::Status::OK();
}

rocksdb::Status ZSet::DiffStore(const Slice &dst, const std::vector<Slice> &keys, uint64_t *saved_cnt) {
std::vector<MemberScore> mscores;
auto s = Diff(keys, &mscores);
if (!s.ok()) return s;
*saved_cnt = mscores.size();
return Overwrite(dst, mscores);
}

rocksdb::Status ZSet::Pop(const Slice &user_key, int count, bool min, MemberScoresTy *mscores) {
mscores->clear();

std::string ns_key;
Expand Down Expand Up @@ -219,7 +254,7 @@ rocksdb::Status ZSet::Pop(const Slice &user_key, int count, bool min, MemberScor
return storage_->Write(storage_->DefaultWriteOptions(), batch->GetWriteBatch());
}

rocksdb::Status ZSet::RangeByRank(const Slice &user_key, const RangeRankSpec &spec, MemberScores *mscores,
rocksdb::Status ZSet::RangeByRank(const Slice &user_key, const RangeRankSpec &spec, MemberScoresTy *mscores,
uint64_t *removed_cnt) {
if (mscores) mscores->clear();

Expand Down Expand Up @@ -302,7 +337,7 @@ rocksdb::Status ZSet::RangeByRank(const Slice &user_key, const RangeRankSpec &sp
return rocksdb::Status::OK();
}

rocksdb::Status ZSet::RangeByScore(const Slice &user_key, const RangeScoreSpec &spec, MemberScores *mscores,
rocksdb::Status ZSet::RangeByScore(const Slice &user_key, const RangeScoreSpec &spec, MemberScoresTy *mscores,
uint64_t *removed_cnt) {
if (mscores) mscores->clear();

Expand Down Expand Up @@ -421,7 +456,7 @@ rocksdb::Status ZSet::RangeByScore(const Slice &user_key, const RangeScoreSpec &
return rocksdb::Status::OK();
}

rocksdb::Status ZSet::RangeByLex(const Slice &user_key, const RangeLexSpec &spec, MemberScores *mscores,
rocksdb::Status ZSet::RangeByLex(const Slice &user_key, const RangeLexSpec &spec, MemberScoresTy *mscores,
uint64_t *removed_cnt) {
if (mscores) mscores->clear();

Expand Down Expand Up @@ -621,7 +656,7 @@ rocksdb::Status ZSet::Rank(const Slice &user_key, const Slice &member, bool reve
return rocksdb::Status::OK();
}

rocksdb::Status ZSet::Overwrite(const Slice &user_key, const MemberScores &mscores) {
rocksdb::Status ZSet::Overwrite(const Slice &user_key, const MemberScoresTy &mscores) {
std::string ns_key;
AppendNamespacePrefix(user_key, &ns_key);

Expand Down
19 changes: 10 additions & 9 deletions src/types/redis_zset.h
Original file line number Diff line number Diff line change
Expand Up @@ -92,33 +92,34 @@ class ZSet : public SubKeyScanner {
explicit ZSet(engine::Storage *storage, const std::string &ns)
: SubKeyScanner(storage, ns), score_cf_handle_(storage->GetCFHandle("zset_score")) {}

using Members = std::vector<std::string>;
using MemberScores = std::vector<MemberScore>;
using MembersTy = std::vector<std::string>;
using MemberScoresTy = std::vector<MemberScore>;

rocksdb::Status Add(const Slice &user_key, ZAddFlags flags, MemberScores *mscores, uint64_t *added_cnt);
rocksdb::Status Add(const Slice &user_key, ZAddFlags flags, MemberScoresTy *mscores, uint64_t *added_cnt);
rocksdb::Status Card(const Slice &user_key, uint64_t *size);
rocksdb::Status IncrBy(const Slice &user_key, const Slice &member, double increment, double *score);
rocksdb::Status Rank(const Slice &user_key, const Slice &member, bool reversed, int *member_rank);
rocksdb::Status Remove(const Slice &user_key, const std::vector<Slice> &members, uint64_t *removed_cnt);
rocksdb::Status Pop(const Slice &user_key, int count, bool min, MemberScores *mscores);
rocksdb::Status Pop(const Slice &user_key, int count, bool min, MemberScoresTy *mscores);
rocksdb::Status Score(const Slice &user_key, const Slice &member, double *score);
rocksdb::Status Scan(const Slice &user_key, const std::string &cursor, uint64_t limit,
const std::string &member_prefix, std::vector<std::string> *members,
std::vector<double> *scores = nullptr);
rocksdb::Status Overwrite(const Slice &user_key, const MemberScores &mscores);
rocksdb::Status Diff(const std::vector<Slice> &keys, MemberScoresTy *mscores);
rocksdb::Status Overwrite(const Slice &user_key, const MemberScoresTy &mscores);
rocksdb::Status InterStore(const Slice &dst, const std::vector<KeyWeight> &keys_weights,
AggregateMethod aggregate_method, uint64_t *saved_cnt);
rocksdb::Status DiffStore(const Slice &dst, const std::vector<Slice> &keys, uint64_t *saved_cnt);
rocksdb::Status UnionStore(const Slice &dst, const std::vector<KeyWeight> &keys_weights,
AggregateMethod aggregate_method, uint64_t *saved_cnt);
rocksdb::Status MGet(const Slice &user_key, const std::vector<Slice> &members, std::map<std::string, double> *scores);
rocksdb::Status GetMetadata(const Slice &ns_key, ZSetMetadata *metadata);

rocksdb::Status Count(const Slice &user_key, const RangeScoreSpec &spec, uint64_t *size);
rocksdb::Status RangeByRank(const Slice &user_key, const RangeRankSpec &spec, MemberScores *mscores,
rocksdb::Status RangeByRank(const Slice &user_key, const RangeRankSpec &spec, MemberScoresTy *mscores,
uint64_t *removed_cnt);
rocksdb::Status RangeByScore(const Slice &user_key, const RangeScoreSpec &spec, MemberScores *mscores,
rocksdb::Status RangeByScore(const Slice &user_key, const RangeScoreSpec &spec, MemberScoresTy *mscores,
uint64_t *removed_cnt);
rocksdb::Status RangeByLex(const Slice &user_key, const RangeLexSpec &spec, MemberScores *mscores,
rocksdb::Status RangeByLex(const Slice &user_key, const RangeLexSpec &spec, MemberScoresTy *mscores,
uint64_t *removed_cnt);

private:
Expand Down
24 changes: 24 additions & 0 deletions tests/cppunit/types/zset_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -398,3 +398,27 @@ TEST_F(RedisZSetTest, Rank) {
}
zset_->Del(key_);
}

TEST_F(RedisZSetTest, Diff) {
uint64_t ret = 0;
std::vector<MemberScore> mscores;
for (size_t i = 0; i < fields_.size(); i++) {
mscores.emplace_back(MemberScore{fields_[i].ToString(), scores_[i]});
}
zset_->Add(key_, ZAddFlags::Default(), &mscores, &ret);
EXPECT_EQ(fields_.size(), ret);

zset_->Del(key_);
}

TEST_F(RedisZSetTest, DiffStore) {
std::vector<MemberScore> mscores;
for (size_t i = 0; i < fields_.size(); i++) {
mscores.emplace_back(MemberScore{fields_[i].ToString(), scores_[i]});
}

zset_->Add(key_, ZAddFlags::Default(), &mscores, &ret);
EXPECT_EQ(fields_.size(), ret);

zset_->Del(key_);
}