Skip to content

Commit

Permalink
HrangebyLex supports specify intervals (#1120)
Browse files Browse the repository at this point in the history
  • Loading branch information
tanruixiang authored Jan 17, 2023
1 parent 319e841 commit 003d8fb
Show file tree
Hide file tree
Showing 10 changed files with 307 additions and 114 deletions.
53 changes: 29 additions & 24 deletions src/commands/redis_cmd.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1614,34 +1614,39 @@ class CommandHGetAll : public Commander {
}
};

class CommandHRange : public Commander {
class CommandHRangeByLex : public Commander {
public:
Status Parse(const std::vector<std::string> &args) override {
if (args.size() != 6 && args.size() != 4) {
return {Status::RedisParseErr, errWrongNumOfArguments};
CommandParser parser(args, 4);
while (parser.Good()) {
if (parser.EatEqICase("REV")) {
spec_.reversed = true;
} else if (parser.EatEqICase("LIMIT")) {
spec_.offset = GET_OR_RET(parser.TakeInt());
spec_.count = GET_OR_RET(parser.TakeInt());
} else {
return parser.InvalidSyntax();
}
}

if (args.size() == 6 && Util::ToLower(args[4]) != "limit") {
return {Status::RedisInvalidCmd, errInvalidSyntax};
Status s;
if (spec_.reversed) {
s = ParseRangeLexSpec(args[3], args[2], &spec_);
} else {
s = ParseRangeLexSpec(args[2], args[3], &spec_);
}

if (args.size() == 6) {
auto parse_result = ParseInt<int64_t>(args_[5], 10);
if (!parse_result) return {Status::RedisParseErr, errValueNotInteger};

limit_ = *parse_result;
if (!s.IsOK()) {
return Status(Status::RedisParseErr, s.Msg());
}
return Commander::Parse(args);
return Status::OK();
}

Status Execute(Server *svr, Connection *conn, std::string *output) override {
Redis::Hash hash_db(svr->storage_, conn->GetNamespace());
std::vector<FieldValue> field_values;
auto s = hash_db.Range(args_[1], args_[2], args_[3], limit_, &field_values);
rocksdb::Status s = hash_db.RangeByLex(args_[1], spec_, &field_values);
if (!s.ok()) {
return {Status::RedisExecErr, s.ToString()};
}

std::vector<std::string> kv_pairs;
for (const auto &p : field_values) {
kv_pairs.emplace_back(p.field);
Expand All @@ -1653,7 +1658,7 @@ class CommandHRange : public Commander {
}

private:
int64_t limit_ = LONG_MAX;
CommonRangeLexSpec spec_;
};

class CommandPush : public Commander {
Expand Down Expand Up @@ -2674,7 +2679,7 @@ class CommandZIncrBy : public Commander {
class CommandZLexCount : public Commander {
public:
Status Parse(const std::vector<std::string> &args) override {
Status s = Redis::ZSet::ParseRangeLexSpec(args[2], args[3], &spec_);
Status s = ParseRangeLexSpec(args[2], args[3], &spec_);
if (!s.IsOK()) {
return {Status::RedisParseErr, s.Msg()};
}
Expand All @@ -2695,7 +2700,7 @@ class CommandZLexCount : public Commander {
}

private:
ZRangeLexSpec spec_;
CommonRangeLexSpec spec_;
};

class CommandZPop : public Commander {
Expand Down Expand Up @@ -2808,9 +2813,9 @@ class CommandZRangeByLex : public Commander {
Status Parse(const std::vector<std::string> &args) override {
Status s;
if (spec_.reversed) {
s = Redis::ZSet::ParseRangeLexSpec(args[3], args[2], &spec_);
s = ParseRangeLexSpec(args[3], args[2], &spec_);
} else {
s = Redis::ZSet::ParseRangeLexSpec(args[2], args[3], &spec_);
s = ParseRangeLexSpec(args[2], args[3], &spec_);
}

if (!s.IsOK()) {
Expand Down Expand Up @@ -2844,7 +2849,7 @@ class CommandZRangeByLex : public Commander {
}

private:
ZRangeLexSpec spec_;
CommonRangeLexSpec spec_;
};

class CommandZRangeByScore : public Commander {
Expand Down Expand Up @@ -3031,7 +3036,7 @@ class CommandZRemRangeByScore : public Commander {
class CommandZRemRangeByLex : public Commander {
public:
Status Parse(const std::vector<std::string> &args) override {
Status s = Redis::ZSet::ParseRangeLexSpec(args[2], args[3], &spec_);
Status s = ParseRangeLexSpec(args[2], args[3], &spec_);
if (!s.IsOK()) {
return {Status::RedisParseErr, s.Msg()};
}
Expand All @@ -3051,7 +3056,7 @@ class CommandZRemRangeByLex : public Commander {
}

private:
ZRangeLexSpec spec_;
CommonRangeLexSpec spec_;
};

class CommandZScore : public Commander {
Expand Down Expand Up @@ -6420,7 +6425,7 @@ REDIS_REGISTER_COMMANDS(
MakeCmdAttr<CommandHVals>("hvals", 2, "read-only", 1, 1, 1),
MakeCmdAttr<CommandHGetAll>("hgetall", 2, "read-only", 1, 1, 1),
MakeCmdAttr<CommandHScan>("hscan", -3, "read-only", 1, 1, 1),
MakeCmdAttr<CommandHRange>("hrange", -4, "read-only", 1, 1, 1),
MakeCmdAttr<CommandHRangeByLex>("hrangebylex", -4, "read-only", 1, 1, 1),

MakeCmdAttr<CommandLPush>("lpush", -3, "write", 1, 1, 1), MakeCmdAttr<CommandRPush>("rpush", -3, "write", 1, 1, 1),
MakeCmdAttr<CommandLPushX>("lpushx", -3, "write", 1, 1, 1),
Expand Down
53 changes: 53 additions & 0 deletions src/common/range_spec.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
#include "range_spec.h"

Status ParseRangeLexSpec(const std::string &min, const std::string &max, CommonRangeLexSpec *spec) {
if (min == "+" || max == "-") {
return Status(Status::NotOK, "min > max");
}

if (min == "-") {
spec->min = "";
} else {
if (min[0] == '(') {
spec->minex = true;
} else if (min[0] == '[') {
spec->minex = false;
} else {
return Status(Status::NotOK, "the min is illegal");
}
spec->min = min.substr(1);
}

if (max == "+") {
spec->max_infinite = true;
} else {
if (max[0] == '(') {
spec->maxex = true;
} else if (max[0] == '[') {
spec->maxex = false;
} else {
return Status(Status::NotOK, "the max is illegal");
}
spec->max = max.substr(1);
}
return Status::OK();
}
36 changes: 36 additions & 0 deletions src/common/range_spec.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/

#pragma once

#include <string>

#include "status.h"
struct CommonRangeLexSpec {
std::string min, max;
bool minex, maxex; /* are min or max exclusive */
bool max_infinite; /* are max infinite */
int64_t offset, count;
bool removed, reversed;
CommonRangeLexSpec()
: minex(false), maxex(false), max_infinite(false), offset(-1), count(-1), removed(false), reversed(false) {}
};

Status ParseRangeLexSpec(const std::string &min, const std::string &max, CommonRangeLexSpec *spec);
50 changes: 39 additions & 11 deletions src/types/redis_hash.cc
Original file line number Diff line number Diff line change
Expand Up @@ -276,37 +276,65 @@ rocksdb::Status Hash::MSet(const Slice &user_key, const std::vector<FieldValue>
return storage_->Write(storage_->DefaultWriteOptions(), &batch);
}

rocksdb::Status Hash::Range(const Slice &user_key, const Slice &start, const Slice &stop, int64_t limit,
std::vector<FieldValue> *field_values) {
rocksdb::Status Hash::RangeByLex(const Slice &user_key, const CommonRangeLexSpec &spec,
std::vector<FieldValue> *field_values) {
field_values->clear();
if (start.compare(stop) >= 0 || limit <= 0) {
if (spec.count == 0) {
return rocksdb::Status::OK();
}
std::string ns_key;
AppendNamespacePrefix(user_key, &ns_key);
HashMetadata metadata(false);
rocksdb::Status s = GetMetadata(ns_key, &metadata);
if (!s.ok()) return s.IsNotFound() ? rocksdb::Status::OK() : s;
limit = std::min(static_cast<int64_t>(metadata.size), limit);
std::string start_key, stop_key;
InternalKey(ns_key, start, metadata.version, storage_->IsSlotIdEncoded()).Encode(&start_key);
InternalKey(ns_key, stop, metadata.version, storage_->IsSlotIdEncoded()).Encode(&stop_key);

std::string start_member = spec.reversed ? spec.max : spec.min;
std::string start_key, prefix_key, next_version_prefix_key;
InternalKey(ns_key, start_member, metadata.version, storage_->IsSlotIdEncoded()).Encode(&start_key);
InternalKey(ns_key, "", metadata.version, storage_->IsSlotIdEncoded()).Encode(&prefix_key);
InternalKey(ns_key, "", metadata.version + 1, storage_->IsSlotIdEncoded()).Encode(&next_version_prefix_key);
rocksdb::ReadOptions read_options;
LatestSnapShot ss(db_);
read_options.snapshot = ss.GetSnapShot();
rocksdb::Slice upper_bound(stop_key);
rocksdb::Slice upper_bound(next_version_prefix_key);
read_options.iterate_upper_bound = &upper_bound;
rocksdb::Slice lower_bound(prefix_key);
read_options.iterate_lower_bound = &lower_bound;
read_options.fill_cache = false;

auto iter = DBUtil::UniqueIterator(db_, read_options);
iter->Seek(start_key);
for (int64_t i = 0; iter->Valid() && i <= limit - 1; ++i) {
if (!spec.reversed) {
iter->Seek(start_key);
} else {
if (spec.max_infinite) {
iter->SeekToLast();
} else {
iter->SeekForPrev(start_key);
}
}
int64_t pos = 0;
for (; iter->Valid() && iter->key().starts_with(prefix_key); (!spec.reversed ? iter->Next() : iter->Prev())) {
FieldValue tmp_field_value;
InternalKey ikey(iter->key(), storage_->IsSlotIdEncoded());
if (spec.reversed) {
if (ikey.GetSubKey().ToString() < spec.min || (spec.minex && ikey.GetSubKey().ToString() == spec.min)) {
break;
}
if ((spec.maxex && ikey.GetSubKey().ToString() == spec.max) ||
(!spec.max_infinite && ikey.GetSubKey().ToString() > spec.max)) {
continue;
}
} else {
if (spec.minex && ikey.GetSubKey().ToString() == spec.min) continue; // the min member was exclusive
if ((spec.maxex && ikey.GetSubKey().ToString() == spec.max) ||
(!spec.max_infinite && ikey.GetSubKey().ToString() > spec.max))
break;
}
if (spec.offset >= 0 && pos++ < spec.offset) continue;
tmp_field_value.field = ikey.GetSubKey().ToString();
tmp_field_value.value = iter->value().ToString();
field_values->emplace_back(tmp_field_value);
iter->Next();
if (spec.count > 0 && field_values && field_values->size() >= static_cast<unsigned>(spec.count)) break;
}
return rocksdb::Status::OK();
}
Expand Down
5 changes: 3 additions & 2 deletions src/types/redis_hash.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include <string>
#include <vector>

#include "common/range_spec.h"
#include "encoding.h"
#include "storage/redis_db.h"
#include "storage/redis_metadata.h"
Expand All @@ -48,8 +49,8 @@ class Hash : public SubKeyScanner {
rocksdb::Status IncrBy(const Slice &user_key, const Slice &field, int64_t increment, int64_t *ret);
rocksdb::Status IncrByFloat(const Slice &user_key, const Slice &field, double increment, double *ret);
rocksdb::Status MSet(const Slice &user_key, const std::vector<FieldValue> &field_values, bool nx, int *ret);
rocksdb::Status Range(const Slice &user_key, const Slice &start, const Slice &stop, int64_t limit,
std::vector<FieldValue> *field_values);
rocksdb::Status RangeByLex(const Slice &user_key, const CommonRangeLexSpec &spec,
std::vector<FieldValue> *field_values);
rocksdb::Status MGet(const Slice &user_key, const std::vector<Slice> &fields, std::vector<std::string> *values,
std::vector<rocksdb::Status> *statuses);
rocksdb::Status GetAll(const Slice &user_key, std::vector<FieldValue> *field_values,
Expand Down
39 changes: 3 additions & 36 deletions src/types/redis_zset.cc
Original file line number Diff line number Diff line change
Expand Up @@ -414,8 +414,8 @@ rocksdb::Status ZSet::RangeByScore(const Slice &user_key, ZRangeSpec spec, std::
return rocksdb::Status::OK();
}

rocksdb::Status ZSet::RangeByLex(const Slice &user_key, const ZRangeLexSpec &spec, std::vector<std::string> *members,
int *size) {
rocksdb::Status ZSet::RangeByLex(const Slice &user_key, const CommonRangeLexSpec &spec,
std::vector<std::string> *members, int *size) {
if (size) *size = 0;
if (members) members->clear();
if (spec.offset > -1 && spec.count == 0) {
Expand Down Expand Up @@ -564,7 +564,7 @@ rocksdb::Status ZSet::RemoveRangeByScore(const Slice &user_key, ZRangeSpec spec,
return RangeByScore(user_key, spec, nullptr, ret);
}

rocksdb::Status ZSet::RemoveRangeByLex(const Slice &user_key, ZRangeLexSpec spec, int *ret) {
rocksdb::Status ZSet::RemoveRangeByLex(const Slice &user_key, CommonRangeLexSpec spec, int *ret) {
spec.removed = true;
return RangeByLex(user_key, spec, nullptr, ret);
}
Expand Down Expand Up @@ -799,39 +799,6 @@ Status ZSet::ParseRangeSpec(const std::string &min, const std::string &max, ZRan
return Status::OK();
}

Status ZSet::ParseRangeLexSpec(const std::string &min, const std::string &max, ZRangeLexSpec *spec) {
if (min == "+" || max == "-") {
return Status(Status::NotOK, "min > max");
}

if (min == "-") {
spec->min = "";
} else {
if (min[0] == '(') {
spec->minex = true;
} else if (min[0] == '[') {
spec->minex = false;
} else {
return Status(Status::NotOK, "the min is illegal");
}
spec->min = min.substr(1);
}

if (max == "+") {
spec->max_infinite = true;
} else {
if (max[0] == '(') {
spec->maxex = true;
} else if (max[0] == '[') {
spec->maxex = false;
} else {
return Status(Status::NotOK, "the max is illegal");
}
spec->max = max.substr(1);
}
return Status::OK();
}

rocksdb::Status ZSet::Scan(const Slice &user_key, const std::string &cursor, uint64_t limit,
const std::string &member_prefix, std::vector<std::string> *members,
std::vector<double> *scores) {
Expand Down
Loading

0 comments on commit 003d8fb

Please sign in to comment.