From 48df049feb7f30d37fbbf67df8da2a8493acdcdd Mon Sep 17 00:00:00 2001 From: clundro Date: Wed, 24 May 2023 13:48:12 +0800 Subject: [PATCH 1/4] update cmd_zset Signed-off-by: clundro --- src/commands/cmd_zset.cc | 90 ++++++++++++++++++++++++++++++++++++++++ src/types/redis_zset.cc | 57 ++++++++++++++++++++++--- src/types/redis_zset.h | 21 ++++++---- 3 files changed, 153 insertions(+), 15 deletions(-) diff --git a/src/commands/cmd_zset.cc b/src/commands/cmd_zset.cc index b718bb6acf9..32eb2e46923 100644 --- a/src/commands/cmd_zset.cc +++ b/src/commands/cmd_zset.cc @@ -23,6 +23,7 @@ #include "commands/scan_base.h" #include "error_constants.h" #include "server/server.h" +#include "string_util.h" #include "types/redis_zset.h" namespace redis { @@ -164,6 +165,93 @@ class CommandZCard : public Commander { } }; +/* + * description: + * syntax: `ZDIFF numkeys key [key ...] [WITHSCORES]` + */ +class CommandZDiff : public Commander { + public: + Status Parse(const std::vector &args) override { + auto parse_numkey = ParseInt(args[1], 10); + if (!parse_numkey) { + return {Status::RedisParseErr, errValueNotInteger}; + } + numkeys_ = *parse_numkey; + // for example: ZDIFF 2 zset1 zset2 WITHSCORES + if (args.size() == numkeys_ + 3 && util::ToLower(args.back()) == "withscores") { + with_scores_ = true; + } + return Commander::Parse(args); + } + + Status Execute(Server *svr, Connection *conn, std::string *output) override { + std::vector keys; + for (size_t i = 2; i < numkeys_ + 2; i++) { + keys.emplace_back(args_[i]); + } + + std::vector members; + redis::ZSet zset_db(svr->storage, conn->GetNamespace()); + auto s = zset_db.Diff(key, &members); + if (!s.ok()) { + return {Status::RedisExecErr, s.ToString()}; + } + + *output = redis::MultiBulkString(members, false); + return Status::OK(); + } + + static CommandKeyRange Range(const std::vector &args) { + if (bool with_score = util::ToLower(args.back()) == "withscores"; with_score) { + return {2, -2, 1}; + } + return {2, -1, 1}; + } + + private: + uint64_t numkeys_ = 0; + bool with_scores_ = false; +}; + +/* + * description: + * syntax: `ZDIFFSTORE destination numkeys key [key ...]` + */ +class CommandZDiffStore : public Commander { + public: + Status Parse(const std::vector &args) override { + auto parse_num = ParseInt(args[2], 10); + if (!parse_num) { + return {Status::RedisParseErr, errValueNotInteger}; + } + numkeys_ = *parse_numkey; + if (args.size() != numkeys_ + 3) { + return {Status::RedisParseErr, errWrongNumOfArguments}; + } + + return Commander::Parse(args); + } + + Status Execute(Server *svr, Connection *conn, std::string *output) override { + std::vector keys; + for (size_t i = 3; i < args_.size(); i++) { + keys.emplace_back(args_[i]); + } + + int ret = 0; + redis::ZSet zset_db(svr->storage, conn->GetNamespace()); + if (auto s = zset_db.DiffStore(args_[1], keys, &ret); !s.ok()) { + return {Status::RedisExecErr, s.ToString()}; + } + + *output = redis::Integer(ret); + return Status::OK(); + } + + private: + uint64_t numkeys_ = 0; +}; + class CommandZIncrBy : public Commander { public: Status Parse(const std::vector &args) override { @@ -740,6 +828,8 @@ class CommandZScan : public CommandSubkeyScanBase { REDIS_REGISTER_COMMANDS(MakeCmdAttr("zadd", -4, "write", 1, 1, 1), MakeCmdAttr("zcard", 2, "read-only", 1, 1, 1), MakeCmdAttr("zcount", 4, "read-only", 1, 1, 1), + MakeCmdAttr("zdiff", -4, "read-only", CommandZDiff::Range), + MakeCmdAttr("zdiffstore", -4, "read-only", 3, -1, 1), MakeCmdAttr("zincrby", 4, "write", 1, 1, 1), MakeCmdAttr("zinterstore", -4, "write", 1, 1, 1), MakeCmdAttr("zlexcount", 4, "read-only", 1, 1, 1), diff --git a/src/types/redis_zset.cc b/src/types/redis_zset.cc index e181654a044..cd955783bdf 100644 --- a/src/types/redis_zset.cc +++ b/src/types/redis_zset.cc @@ -34,7 +34,7 @@ rocksdb::Status ZSet::GetMetadata(const Slice &ns_key, ZSetMetadata *metadata) { return Database::GetMetadata(kRedisZSet, ns_key, metadata); } -rocksdb::Status ZSet::Add(const Slice &user_key, ZAddFlags flags, MemberScores *mscores, int *ret) { +rocksdb::Status ZSet::Add(const Slice &user_key, ZAddFlags flags, MemberScoresTy *mscores, int *ret) { *ret = 0; std::string ns_key; @@ -158,7 +158,51 @@ rocksdb::Status ZSet::IncrBy(const Slice &user_key, const Slice &member, double return rocksdb::Status::OK(); } -rocksdb::Status ZSet::Pop(const Slice &user_key, int count, bool min, MemberScores *mscores) { +rocksdb::Status ZSet::MemberScores(const Slice &user_key, MemberScoresTy *mscores) { + mscores->clear(); + std::string ns_key; + AppendNamespacePrefix(user_key, &ns_key); + + SetMetadata metadata(false); + rocksdb::Status s = GetMetadata(ns_key, &metadata); + if (!s.ok()) return s.IsNotFound() ? rocksdb::Status::OK() : s; + + std::string prefix, next_version_prefix; + InternalKey(ns_key, "", metadata.version, storage_->IsSlotIdEncoded()).Encode(&prefix); + InternalKey(ns_key, "", metadata.version + 1, storage_->IsSlotIdEncoded()).Encode(&next_version_prefix); + + rocksdb::ReadOptions read_options; + LatestSnapShot ss(storage_); + read_options.snapshot = ss.GetSnapshot(); + rocksdb::Slice upper_bound(next_version_prefix); + read_options.iterate_upper_bound = &upper_bound; + storage_->SetReadOptions(read_options); + + auto iter = util::UniqueIterator(storage_, read_options); + for (iter->Seek(prefix); iter->Valid() && iter->key().starts_with(prefix); iter->Next()) { + InternalKeys ikey(iter->key(), storage_->IsSlotIdEncoded()); + // mscores->emplace_back(); + } + return rocksdb::Status::OK(); +} + +rocksdb::Status ZSet::Diff(const std::vector &keys, MemberScoresTy *mscores, bool with_score) { + mscores->clear(); + std::vector source_members; + // auto s = MembersTy(keys[0],&source_members); + + return rocksdb::Status::OK(); +} + +rocksdb::Status ZSet::DiffStore(const Slice &dst, const std::vector &keys, int *ret) { + *ret = 0; + std::vector mscores; + auto s = Diff(keys, &members); + *ret = static_cast(mscores.size()); + return Overwrite(dst, mscores); +} + +rocksdb::Status ZSet::Pop(const Slice &user_key, int count, bool min, MemberScoresTy *mscores) { mscores->clear(); std::string ns_key; @@ -219,7 +263,7 @@ rocksdb::Status ZSet::Pop(const Slice &user_key, int count, bool min, MemberScor return storage_->Write(storage_->DefaultWriteOptions(), batch->GetWriteBatch()); } -rocksdb::Status ZSet::RangeByRank(const Slice &user_key, const RangeRankSpec &spec, MemberScores *mscores, int *ret) { +rocksdb::Status ZSet::RangeByRank(const Slice &user_key, const RangeRankSpec &spec, MemberScoresTy *mscores, int *ret) { if (mscores) mscores->clear(); int cnt = 0; @@ -301,7 +345,8 @@ rocksdb::Status ZSet::RangeByRank(const Slice &user_key, const RangeRankSpec &sp return rocksdb::Status::OK(); } -rocksdb::Status ZSet::RangeByScore(const Slice &user_key, const RangeScoreSpec &spec, MemberScores *mscores, int *ret) { +rocksdb::Status ZSet::RangeByScore(const Slice &user_key, const RangeScoreSpec &spec, MemberScoresTy *mscores, + int *ret) { if (mscores) mscores->clear(); int cnt = 0; @@ -419,7 +464,7 @@ rocksdb::Status ZSet::RangeByScore(const Slice &user_key, const RangeScoreSpec & return rocksdb::Status::OK(); } -rocksdb::Status ZSet::RangeByLex(const Slice &user_key, const RangeLexSpec &spec, Members *members, int *ret) { +rocksdb::Status ZSet::RangeByLex(const Slice &user_key, const RangeLexSpec &spec, MembersTy *members, int *ret) { if (members) members->clear(); int cnt = 0; @@ -618,7 +663,7 @@ rocksdb::Status ZSet::Rank(const Slice &user_key, const Slice &member, bool reve return rocksdb::Status::OK(); } -rocksdb::Status ZSet::Overwrite(const Slice &user_key, const MemberScores &mscores) { +rocksdb::Status ZSet::Overwrite(const Slice &user_key, const MemberScoresTy &mscores) { std::string ns_key; AppendNamespacePrefix(user_key, &ns_key); diff --git a/src/types/redis_zset.h b/src/types/redis_zset.h index 118177545d5..b0218ae9cc7 100644 --- a/src/types/redis_zset.h +++ b/src/types/redis_zset.h @@ -92,31 +92,34 @@ class ZSet : public SubKeyScanner { explicit ZSet(engine::Storage *storage, const std::string &ns) : SubKeyScanner(storage, ns), score_cf_handle_(storage->GetCFHandle("zset_score")) {} - using Members = std::vector; - using MemberScores = std::vector; + using MembersTy = std::vector; + using MemberScoresTy = std::vector; - rocksdb::Status Add(const Slice &user_key, ZAddFlags flags, MemberScores *mscores, int *ret); + rocksdb::Status Add(const Slice &user_key, ZAddFlags flags, MemberScoresTy *mscores, int *ret); rocksdb::Status Card(const Slice &user_key, int *ret); rocksdb::Status IncrBy(const Slice &user_key, const Slice &member, double increment, double *score); + rocksdb::Status Diff(const std::vector &keys, MemberScoresTy *mscores, bool with_score); + rocksdb::Status DiffStore(const Slice &dst, const std::vector &keys, int *ret); rocksdb::Status Rank(const Slice &user_key, const Slice &member, bool reversed, int *ret); rocksdb::Status Remove(const Slice &user_key, const std::vector &members, int *ret); - rocksdb::Status Pop(const Slice &user_key, int count, bool min, MemberScores *mscores); + rocksdb::Status Pop(const Slice &user_key, int count, bool min, MemberScoresTy *mscores); rocksdb::Status Score(const Slice &user_key, const Slice &member, double *score); rocksdb::Status Scan(const Slice &user_key, const std::string &cursor, uint64_t limit, const std::string &member_prefix, std::vector *members, std::vector *scores = nullptr); - rocksdb::Status Overwrite(const Slice &user_key, const MemberScores &mscores); + rocksdb::Status Overwrite(const Slice &user_key, const MemberScoresTy &mscores); rocksdb::Status InterStore(const Slice &dst, const std::vector &keys_weights, AggregateMethod aggregate_method, int *size); rocksdb::Status UnionStore(const Slice &dst, const std::vector &keys_weights, AggregateMethod aggregate_method, int *size); rocksdb::Status MGet(const Slice &user_key, const std::vector &members, std::map *scores); rocksdb::Status GetMetadata(const Slice &ns_key, ZSetMetadata *metadata); - rocksdb::Status Count(const Slice &user_key, const RangeScoreSpec &spec, int *ret); - rocksdb::Status RangeByRank(const Slice &user_key, const RangeRankSpec &spec, MemberScores *mscores, int *ret); - rocksdb::Status RangeByScore(const Slice &user_key, const RangeScoreSpec &spec, MemberScores *mscores, int *ret); - rocksdb::Status RangeByLex(const Slice &user_key, const RangeLexSpec &spec, Members *members, int *ret); + rocksdb::Status RangeByRank(const Slice &user_key, const RangeRankSpec &spec, MemberScoresTy *mscores, int *ret); + rocksdb::Status RangeByScore(const Slice &user_key, const RangeScoreSpec &spec, MemberScoresTy *mscores, int *ret); + rocksdb::Status RangeByLex(const Slice &user_key, const RangeLexSpec &spec, MembersTy *members, int *ret); + + rocksdb::Status MemberScores(const Slice &user_key, MemberScoresTy *mscores); private: rocksdb::ColumnFamilyHandle *score_cf_handle_; From 6476895c7bbabf149b5988a5536f515c25197302 Mon Sep 17 00:00:00 2001 From: clundro Date: Wed, 14 Jun 2023 01:38:30 +0800 Subject: [PATCH 2/4] add parse funcs for zdiff(store) Signed-off-by: clundro --- src/commands/cmd_zset.cc | 27 ++++++++++++++++++++------- 1 file changed, 20 insertions(+), 7 deletions(-) diff --git a/src/commands/cmd_zset.cc b/src/commands/cmd_zset.cc index c4651284ec6..82da39b680b 100644 --- a/src/commands/cmd_zset.cc +++ b/src/commands/cmd_zset.cc @@ -178,11 +178,20 @@ class CommandZDiff : public Commander { if (!parse_numkey) { return {Status::RedisParseErr, errValueNotInteger}; } + if (*parse_numkey <= 0) { + return {Status::RedisParseErr, errValueMustBePositive}; + } + numkeys_ = *parse_numkey; // for example: ZDIFF 2 zset1 zset2 WITHSCORES if (args.size() == numkeys_ + 3 && util::ToLower(args.back()) == "withscores") { with_scores_ = true; } + + if (args.size() != numkeys_ + 2) { + return {Status::RedisParseErr, errWrongNumOfArguments}; + } + return Commander::Parse(args); } @@ -204,10 +213,8 @@ class CommandZDiff : public Commander { } static CommandKeyRange Range(const std::vector &args) { - if (bool with_score = util::ToLower(args.back()) == "withscores"; with_score) { - return {2, -2, 1}; - } - return {2, -1, 1}; + int num_key = *ParseInt(args[1], 10); + return {2, 1 + num_key, 1}; } private: @@ -226,11 +233,15 @@ class CommandZDiffStore : public Commander { if (!parse_num) { return {Status::RedisParseErr, errValueNotInteger}; } + if (*parse_numkey <= 0) { + return {Status::RedisParseErr, errValueMustBePositive}; + } + numkeys_ = *parse_numkey; + // for example: ZDIFFSTORE out 2 zset1 zset2 if (args.size() != numkeys_ + 3) { return {Status::RedisParseErr, errWrongNumOfArguments}; } - return Commander::Parse(args); } @@ -250,6 +261,8 @@ class CommandZDiffStore : public Commander { return Status::OK(); } + static CommandKeyRange Range(const std::vector &args) { return {2, -1, 1}; } + private: uint64_t numkeys_ = 0; }; @@ -1005,8 +1018,8 @@ class CommandZScan : public CommandSubkeyScanBase { REDIS_REGISTER_COMMANDS(MakeCmdAttr("zadd", -4, "write", 1, 1, 1), MakeCmdAttr("zcard", 2, "read-only", 1, 1, 1), MakeCmdAttr("zcount", 4, "read-only", 1, 1, 1), - MakeCmdAttr("zdiff", -4, "read-only", CommandZDiff::Range), - MakeCmdAttr("zdiffstore", -4, "read-only", 3, -1, 1), + MakeCmdAttr("zdiff", -3, "read-only", CommandZDiff::Range), + MakeCmdAttr("zdiffstore", -4, "write", CommandZDiffStore::Range), MakeCmdAttr("zincrby", 4, "write", 1, 1, 1), MakeCmdAttr("zinterstore", -4, "write", 1, 1, 1), MakeCmdAttr("zlexcount", 4, "read-only", 1, 1, 1), From 9cc4a90f6f8b8ba0eed5151da34d0dc51dbfc768 Mon Sep 17 00:00:00 2001 From: clundro Date: Wed, 14 Jun 2023 02:42:07 +0800 Subject: [PATCH 3/4] write interfaces for cmd_zset Signed-off-by: clundro --- src/commands/cmd_zset.cc | 25 ++++++++++++++++--------- src/types/redis_zset.cc | 19 +++++++++---------- src/types/redis_zset.h | 14 +++++++++----- 3 files changed, 34 insertions(+), 24 deletions(-) diff --git a/src/commands/cmd_zset.cc b/src/commands/cmd_zset.cc index 82da39b680b..049f01ef96f 100644 --- a/src/commands/cmd_zset.cc +++ b/src/commands/cmd_zset.cc @@ -186,6 +186,7 @@ class CommandZDiff : public Commander { // for example: ZDIFF 2 zset1 zset2 WITHSCORES if (args.size() == numkeys_ + 3 && util::ToLower(args.back()) == "withscores") { with_scores_ = true; + return Commander::Parse(args); } if (args.size() != numkeys_ + 2) { @@ -201,14 +202,19 @@ class CommandZDiff : public Commander { keys.emplace_back(args_[i]); } - std::vector members; + std::vector mscores; redis::ZSet zset_db(svr->storage, conn->GetNamespace()); - auto s = zset_db.Diff(key, &members); + auto s = zset_db.Diff(keys, &mscores); if (!s.ok()) { return {Status::RedisExecErr, s.ToString()}; } - *output = redis::MultiBulkString(members, false); + output->append(redis::MultiLen(mscores.size() * (with_scores_ ? 2 : 1))); + for (const auto &ms : mscores) { + output->append(redis::BulkString(ms.member)); + if (with_scores_) output->append(redis::BulkString(util::Float2String(ms.score))); + } + return Status::OK(); } @@ -218,8 +224,8 @@ class CommandZDiff : public Commander { } private: - uint64_t numkeys_ = 0; - bool with_scores_ = false; + uint64_t numkeys_{0}; + bool with_scores_{false}; }; /* @@ -229,8 +235,8 @@ class CommandZDiff : public Commander { class CommandZDiffStore : public Commander { public: Status Parse(const std::vector &args) override { - auto parse_num = ParseInt(args[2], 10); - if (!parse_num) { + auto parse_numkey = ParseInt(args[2], 10); + if (!parse_numkey) { return {Status::RedisParseErr, errValueNotInteger}; } if (*parse_numkey <= 0) { @@ -251,7 +257,7 @@ class CommandZDiffStore : public Commander { keys.emplace_back(args_[i]); } - int ret = 0; + uint64_t ret = 0; redis::ZSet zset_db(svr->storage, conn->GetNamespace()); if (auto s = zset_db.DiffStore(args_[1], keys, &ret); !s.ok()) { return {Status::RedisExecErr, s.ToString()}; @@ -261,10 +267,11 @@ class CommandZDiffStore : public Commander { return Status::OK(); } + // todo(infdahai): it exists issues. static CommandKeyRange Range(const std::vector &args) { return {2, -1, 1}; } private: - uint64_t numkeys_ = 0; + uint64_t numkeys_{0}; }; class CommandZIncrBy : public Commander { diff --git a/src/types/redis_zset.cc b/src/types/redis_zset.cc index 6f028a66b5a..81efc9b4104 100644 --- a/src/types/redis_zset.cc +++ b/src/types/redis_zset.cc @@ -34,7 +34,7 @@ rocksdb::Status ZSet::GetMetadata(const Slice &ns_key, ZSetMetadata *metadata) { return Database::GetMetadata(kRedisZSet, ns_key, metadata); } -rocksdb::Status ZSet::Add(const Slice &user_key, ZAddFlags flags, MemberScores *mscores, uint64_t *added_cnt) { +rocksdb::Status ZSet::Add(const Slice &user_key, ZAddFlags flags, MemberScoresTy *mscores, uint64_t *added_cnt) { *added_cnt = 0; std::string ns_key; @@ -180,13 +180,13 @@ rocksdb::Status ZSet::MemberScores(const Slice &user_key, MemberScoresTy *mscore auto iter = util::UniqueIterator(storage_, read_options); for (iter->Seek(prefix); iter->Valid() && iter->key().starts_with(prefix); iter->Next()) { - InternalKeys ikey(iter->key(), storage_->IsSlotIdEncoded()); + InternalKey ikey(iter->key(), storage_->IsSlotIdEncoded()); // mscores->emplace_back(); } return rocksdb::Status::OK(); } -rocksdb::Status ZSet::Diff(const std::vector &keys, MemberScoresTy *mscores, bool with_score) { +rocksdb::Status ZSet::Diff(const std::vector &keys, MemberScoresTy *mscores) { mscores->clear(); std::vector source_members; // auto s = MembersTy(keys[0],&source_members); @@ -194,11 +194,10 @@ rocksdb::Status ZSet::Diff(const std::vector &keys, MemberScoresTy *mscor return rocksdb::Status::OK(); } -rocksdb::Status ZSet::DiffStore(const Slice &dst, const std::vector &keys, int *ret) { - *ret = 0; +rocksdb::Status ZSet::DiffStore(const Slice &dst, const std::vector &keys, uint64_t *saved_cnt) { std::vector mscores; - auto s = Diff(keys, &members); - *ret = static_cast(mscores.size()); + auto s = Diff(keys, &mscores); + *saved_cnt = mscores.size(); return Overwrite(dst, mscores); } @@ -263,7 +262,7 @@ rocksdb::Status ZSet::Pop(const Slice &user_key, int count, bool min, MemberScor return storage_->Write(storage_->DefaultWriteOptions(), batch->GetWriteBatch()); } -rocksdb::Status ZSet::RangeByRank(const Slice &user_key, const RangeRankSpec &spec, MemberScores *mscores, +rocksdb::Status ZSet::RangeByRank(const Slice &user_key, const RangeRankSpec &spec, MemberScoresTy *mscores, uint64_t *removed_cnt) { if (mscores) mscores->clear(); @@ -346,7 +345,7 @@ rocksdb::Status ZSet::RangeByRank(const Slice &user_key, const RangeRankSpec &sp return rocksdb::Status::OK(); } -rocksdb::Status ZSet::RangeByScore(const Slice &user_key, const RangeScoreSpec &spec, MemberScores *mscores, +rocksdb::Status ZSet::RangeByScore(const Slice &user_key, const RangeScoreSpec &spec, MemberScoresTy *mscores, uint64_t *removed_cnt) { if (mscores) mscores->clear(); @@ -465,7 +464,7 @@ rocksdb::Status ZSet::RangeByScore(const Slice &user_key, const RangeScoreSpec & return rocksdb::Status::OK(); } -rocksdb::Status ZSet::RangeByLex(const Slice &user_key, const RangeLexSpec &spec, MemberScores *mscores, +rocksdb::Status ZSet::RangeByLex(const Slice &user_key, const RangeLexSpec &spec, MemberScoresTy *mscores, uint64_t *removed_cnt) { if (mscores) mscores->clear(); diff --git a/src/types/redis_zset.h b/src/types/redis_zset.h index d32c0295f24..0e901957f96 100644 --- a/src/types/redis_zset.h +++ b/src/types/redis_zset.h @@ -95,31 +95,35 @@ class ZSet : public SubKeyScanner { using MembersTy = std::vector; using MemberScoresTy = std::vector; - rocksdb::Status Add(const Slice &user_key, ZAddFlags flags, MemberScores *mscores, uint64_t *added_cnt); + rocksdb::Status Add(const Slice &user_key, ZAddFlags flags, MemberScoresTy *mscores, uint64_t *added_cnt); rocksdb::Status Card(const Slice &user_key, uint64_t *size); rocksdb::Status IncrBy(const Slice &user_key, const Slice &member, double increment, double *score); rocksdb::Status Rank(const Slice &user_key, const Slice &member, bool reversed, int *member_rank); rocksdb::Status Remove(const Slice &user_key, const std::vector &members, uint64_t *removed_cnt); - rocksdb::Status Pop(const Slice &user_key, int count, bool min, MemberScores *mscores); + rocksdb::Status Pop(const Slice &user_key, int count, bool min, MemberScoresTy *mscores); rocksdb::Status Score(const Slice &user_key, const Slice &member, double *score); rocksdb::Status Scan(const Slice &user_key, const std::string &cursor, uint64_t limit, const std::string &member_prefix, std::vector *members, std::vector *scores = nullptr); + rocksdb::Status Diff(const std::vector &keys, MemberScoresTy *mscores); rocksdb::Status Overwrite(const Slice &user_key, const MemberScoresTy &mscores); rocksdb::Status InterStore(const Slice &dst, const std::vector &keys_weights, AggregateMethod aggregate_method, uint64_t *saved_cnt); + rocksdb::Status DiffStore(const Slice &dst, const std::vector &keys, uint64_t *saved_cnt); rocksdb::Status UnionStore(const Slice &dst, const std::vector &keys_weights, AggregateMethod aggregate_method, uint64_t *saved_cnt); rocksdb::Status MGet(const Slice &user_key, const std::vector &members, std::map *scores); rocksdb::Status GetMetadata(const Slice &ns_key, ZSetMetadata *metadata); rocksdb::Status Count(const Slice &user_key, const RangeScoreSpec &spec, uint64_t *size); - rocksdb::Status RangeByRank(const Slice &user_key, const RangeRankSpec &spec, MemberScores *mscores, + rocksdb::Status RangeByRank(const Slice &user_key, const RangeRankSpec &spec, MemberScoresTy *mscores, uint64_t *removed_cnt); - rocksdb::Status RangeByScore(const Slice &user_key, const RangeScoreSpec &spec, MemberScores *mscores, + rocksdb::Status RangeByScore(const Slice &user_key, const RangeScoreSpec &spec, MemberScoresTy *mscores, uint64_t *removed_cnt); - rocksdb::Status RangeByLex(const Slice &user_key, const RangeLexSpec &spec, MemberScores *mscores, + rocksdb::Status RangeByLex(const Slice &user_key, const RangeLexSpec &spec, MemberScoresTy *mscores, uint64_t *removed_cnt); + rocksdb::Status MemberScores(const Slice &user_key, MemberScoresTy *mscores); + private: rocksdb::ColumnFamilyHandle *score_cf_handle_; }; From 07bc99a7e4c78f65e1f80c9a9971f98b666804c2 Mon Sep 17 00:00:00 2001 From: clundro Date: Mon, 19 Jun 2023 01:45:50 +0800 Subject: [PATCH 4/4] write done zset::diff Signed-off-by: clundro --- src/types/redis_zset.cc | 48 +++++++++++++------------------- src/types/redis_zset.h | 2 -- tests/cppunit/types/zset_test.cc | 24 ++++++++++++++++ 3 files changed, 44 insertions(+), 30 deletions(-) diff --git a/src/types/redis_zset.cc b/src/types/redis_zset.cc index 81efc9b4104..572f4c54d79 100644 --- a/src/types/redis_zset.cc +++ b/src/types/redis_zset.cc @@ -158,38 +158,29 @@ rocksdb::Status ZSet::IncrBy(const Slice &user_key, const Slice &member, double return rocksdb::Status::OK(); } -rocksdb::Status ZSet::MemberScores(const Slice &user_key, MemberScoresTy *mscores) { +rocksdb::Status ZSet::Diff(const std::vector &keys, MemberScoresTy *mscores) { mscores->clear(); - std::string ns_key; - AppendNamespacePrefix(user_key, &ns_key); - - SetMetadata metadata(false); - rocksdb::Status s = GetMetadata(ns_key, &metadata); - if (!s.ok()) return s.IsNotFound() ? rocksdb::Status::OK() : s; - - std::string prefix, next_version_prefix; - InternalKey(ns_key, "", metadata.version, storage_->IsSlotIdEncoded()).Encode(&prefix); - InternalKey(ns_key, "", metadata.version + 1, storage_->IsSlotIdEncoded()).Encode(&next_version_prefix); - - rocksdb::ReadOptions read_options; - LatestSnapShot ss(storage_); - read_options.snapshot = ss.GetSnapshot(); - rocksdb::Slice upper_bound(next_version_prefix); - read_options.iterate_upper_bound = &upper_bound; - storage_->SetReadOptions(read_options); + RangeScoreSpec spec; + uint64_t removed_cnt{}; + MemberScoresTy source_mscores; + auto s = RangeByScore(keys[0], spec, source_mscores, &removed_cnt); + if (!s.ok()) return s; - auto iter = util::UniqueIterator(storage_, read_options); - for (iter->Seek(prefix); iter->Valid() && iter->key().starts_with(prefix); iter->Next()) { - InternalKey ikey(iter->key(), storage_->IsSlotIdEncoded()); - // mscores->emplace_back(); + std::map exclude_mscores; + MemberScoresTy target_mscores; + for (size_t i = 1; i < keys.size(); i++) { + s = RangeByScore(keys[i], &target_mscores); + if (!s.ok()) return s; + for (const auto &mscore : target_mscores) { + exclude_mscores[mscore.member] = true; + } } - return rocksdb::Status::OK(); -} -rocksdb::Status ZSet::Diff(const std::vector &keys, MemberScoresTy *mscores) { - mscores->clear(); - std::vector source_members; - // auto s = MembersTy(keys[0],&source_members); + for (const auto &mscore : source_mscores) { + if (exclude_mscores.find(mscore.member) == exclude_mscores.end()) { + mscores->emplace_back(mscore); + } + } return rocksdb::Status::OK(); } @@ -197,6 +188,7 @@ rocksdb::Status ZSet::Diff(const std::vector &keys, MemberScoresTy *mscor rocksdb::Status ZSet::DiffStore(const Slice &dst, const std::vector &keys, uint64_t *saved_cnt) { std::vector mscores; auto s = Diff(keys, &mscores); + if (!s.ok()) return s; *saved_cnt = mscores.size(); return Overwrite(dst, mscores); } diff --git a/src/types/redis_zset.h b/src/types/redis_zset.h index 0e901957f96..28732ff0552 100644 --- a/src/types/redis_zset.h +++ b/src/types/redis_zset.h @@ -122,8 +122,6 @@ class ZSet : public SubKeyScanner { rocksdb::Status RangeByLex(const Slice &user_key, const RangeLexSpec &spec, MemberScoresTy *mscores, uint64_t *removed_cnt); - rocksdb::Status MemberScores(const Slice &user_key, MemberScoresTy *mscores); - private: rocksdb::ColumnFamilyHandle *score_cf_handle_; }; diff --git a/tests/cppunit/types/zset_test.cc b/tests/cppunit/types/zset_test.cc index f7e6e27ac53..25c00ba5781 100644 --- a/tests/cppunit/types/zset_test.cc +++ b/tests/cppunit/types/zset_test.cc @@ -398,3 +398,27 @@ TEST_F(RedisZSetTest, Rank) { } zset_->Del(key_); } + +TEST_F(RedisZSetTest, Diff) { + uint64_t ret = 0; + std::vector mscores; + for (size_t i = 0; i < fields_.size(); i++) { + mscores.emplace_back(MemberScore{fields_[i].ToString(), scores_[i]}); + } + zset_->Add(key_, ZAddFlags::Default(), &mscores, &ret); + EXPECT_EQ(fields_.size(), ret); + + zset_->Del(key_); +} + +TEST_F(RedisZSetTest, DiffStore) { + std::vector mscores; + for (size_t i = 0; i < fields_.size(); i++) { + mscores.emplace_back(MemberScore{fields_[i].ToString(), scores_[i]}); + } + + zset_->Add(key_, ZAddFlags::Default(), &mscores, &ret); + EXPECT_EQ(fields_.size(), ret); + + zset_->Del(key_); +} \ No newline at end of file