Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions kvrocks.conf
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,15 @@ daemonize no

cluster-enabled no

# By default, namespaces are stored in the configuration file and won't be replicated
# to replicas. This option allows to change this behavior, so that namespaces are also
# propagated to slaves. Note that:
# 1) it won't replicate the 'masterauth' to prevent breaking master/replica replication
# 2) it will overwrite replica's namespace with master's namespace, so be careful of in-using namespaces
# 3) cannot switch off the namespace replication once it's enabled
#
# Default: no
repl-namespace-enabled no

# Persist the cluster nodes topology in local file($dir/nodes.conf). This configuration
# takes effect only if the cluster mode was enabled.
Expand Down
5 changes: 5 additions & 0 deletions src/cluster/replication.cc
Original file line number Diff line number Diff line change
Expand Up @@ -975,6 +975,11 @@ Status ReplicationThread::parseWriteBatch(const std::string &batch_string) {
return s.Prefixed("failed to execute propagate command");
}
}
} else if (write_batch_handler.Key() == kNamespaceDBKey) {
auto s = srv_->GetNamespace()->LoadAndRewrite();
if (!s.IsOK()) {
return s.Prefixed("failed to load namespaces");
}
}
break;
case kBatchTypeStream: {
Expand Down
35 changes: 18 additions & 17 deletions src/commands/cmd_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -39,15 +39,15 @@ enum class AuthResult {
NO_REQUIRE_PASS,
};

AuthResult AuthenticateUser(Connection *conn, Config *config, const std::string &user_password) {
auto iter = config->tokens.find(user_password);
if (iter != config->tokens.end()) {
conn->SetNamespace(iter->second);
AuthResult AuthenticateUser(Server *srv, Connection *conn, const std::string &user_password) {
auto ns = srv->GetNamespace()->GetByToken(user_password);
if (ns.IsOK()) {
conn->SetNamespace(ns.GetValue());
conn->BecomeUser();
return AuthResult::OK;
}

const auto &requirepass = config->requirepass;
const auto &requirepass = srv->GetConfig()->requirepass;
if (!requirepass.empty() && user_password != requirepass) {
return AuthResult::INVALID_PASSWORD;
}
Expand All @@ -64,9 +64,8 @@ AuthResult AuthenticateUser(Connection *conn, Config *config, const std::string
class CommandAuth : public Commander {
public:
Status Execute(Server *svr, Connection *conn, std::string *output) override {
Config *config = svr->GetConfig();
auto &user_password = args_[1];
AuthResult result = AuthenticateUser(conn, config, user_password);
AuthResult result = AuthenticateUser(svr, conn, user_password);
switch (result) {
case AuthResult::OK:
*output = redis::SimpleString("OK");
Expand All @@ -89,10 +88,13 @@ class CommandNamespace : public Commander {

Config *config = svr->GetConfig();
std::string sub_command = util::ToLower(args_[1]);
if (config->repl_namespace_enabled && config->IsSlave() && sub_command != "get") {
return {Status::RedisExecErr, "namespace is read-only for slave"};
}
if (args_.size() == 3 && sub_command == "get") {
if (args_[2] == "*") {
std::vector<std::string> namespaces;
auto tokens = config->tokens;
auto tokens = svr->GetNamespace()->List();
for (auto &token : tokens) {
namespaces.emplace_back(token.second); // namespace
namespaces.emplace_back(token.first); // token
Expand All @@ -101,26 +103,25 @@ class CommandNamespace : public Commander {
namespaces.emplace_back(config->requirepass);
*output = redis::MultiBulkString(namespaces, false);
} else {
std::string token;
auto s = config->GetNamespace(args_[2], &token);
if (s.Is<Status::NotFound>()) {
auto token = svr->GetNamespace()->Get(args_[2]);
if (token.Is<Status::NotFound>()) {
*output = redis::NilString();
} else {
*output = redis::BulkString(token);
*output = redis::BulkString(token.GetValue());
}
}
} else if (args_.size() == 4 && sub_command == "set") {
Status s = config->SetNamespace(args_[2], args_[3]);
Status s = svr->GetNamespace()->Set(args_[2], args_[3]);
*output = s.IsOK() ? redis::SimpleString("OK") : redis::Error("ERR " + s.Msg());
LOG(WARNING) << "Updated namespace: " << args_[2] << " with token: " << args_[3] << ", addr: " << conn->GetAddr()
<< ", result: " << s.Msg();
} else if (args_.size() == 4 && sub_command == "add") {
Status s = config->AddNamespace(args_[2], args_[3]);
Status s = svr->GetNamespace()->Add(args_[2], args_[3]);
*output = s.IsOK() ? redis::SimpleString("OK") : redis::Error("ERR " + s.Msg());
LOG(WARNING) << "New namespace: " << args_[2] << " with token: " << args_[3] << ", addr: " << conn->GetAddr()
<< ", result: " << s.Msg();
} else if (args_.size() == 3 && sub_command == "del") {
Status s = config->DelNamespace(args_[2]);
Status s = svr->GetNamespace()->Del(args_[2]);
*output = s.IsOK() ? redis::SimpleString("OK") : redis::Error("ERR " + s.Msg());
LOG(WARNING) << "Deleted namespace: " << args_[2] << ", addr: " << conn->GetAddr() << ", result: " << s.Msg();
} else {
Expand Down Expand Up @@ -239,7 +240,7 @@ class CommandConfig : public Commander {
}

if (args_.size() == 2 && sub_command == "rewrite") {
Status s = config->Rewrite();
Status s = config->Rewrite(svr->GetNamespace()->List());
if (!s.IsOK()) return {Status::RedisExecErr, s.Msg()};

*output = redis::SimpleString("OK");
Expand Down Expand Up @@ -709,7 +710,7 @@ class CommandHello final : public Commander {
next_arg++;
}
const auto &user_password = args_[next_arg + 1];
auto auth_result = AuthenticateUser(conn, svr->GetConfig(), user_password);
auto auth_result = AuthenticateUser(svr, conn, user_password);
switch (auth_result) {
case AuthResult::INVALID_PASSWORD:
return {Status::NotOK, "invalid password"};
Expand Down
128 changes: 18 additions & 110 deletions src/config/config.cc
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@ Config::Config() {
{"log-retention-days", false, new IntField(&log_retention_days, -1, -1, INT_MAX)},
{"persist-cluster-nodes-enabled", false, new YesNoField(&persist_cluster_nodes_enabled, true)},
{"redis-cursor-compatible", false, new YesNoField(&redis_cursor_compatible, false)},
{"repl-namespace-enabled", false, new YesNoField(&repl_namespace_enabled, false)},

/* rocksdb options */
{"rocksdb.compression", false,
Expand Down Expand Up @@ -233,17 +234,17 @@ void Config::initFieldValidator() {
std::map<std::string, ValidateFn> validators = {
{"requirepass",
[this](const std::string &k, const std::string &v) -> Status {
if (v.empty() && !tokens.empty()) {
if (v.empty() && !load_tokens.empty()) {
return {Status::NotOK, "requirepass empty not allowed while the namespace exists"};
}
if (tokens.find(v) != tokens.end()) {
if (load_tokens.find(v) != load_tokens.end()) {
return {Status::NotOK, "requirepass is duplicated with namespace tokens"};
}
return Status::OK();
}},
{"masterauth",
[this](const std::string &k, const std::string &v) -> Status {
if (tokens.find(v) != tokens.end()) {
if (load_tokens.find(v) != load_tokens.end()) {
return {Status::NotOK, "masterauth is duplicated with namespace tokens"};
}
return Status::OK();
Expand Down Expand Up @@ -515,6 +516,12 @@ void Config::initFieldCallback() {
remove(nodes_file_path.data());
return Status::OK();
}},
{"repl-namespace-enabled",
[](Server *srv, const std::string &k, const std::string &v) -> Status {
if (!srv) return Status::OK();
return srv->GetNamespace()->LoadAndRewrite();
}},

{"rocksdb.target_file_size_base",
[this](Server *srv, const std::string &k, const std::string &v) -> Status {
if (!srv) return Status::OK();
Expand Down Expand Up @@ -682,7 +689,7 @@ Status Config::parseConfigFromPair(const std::pair<std::string, std::string> &in
if (strncasecmp(input.first.data(), ns_str, ns_str_size) == 0) {
// namespace should keep key case-sensitive
field_key = input.first;
tokens[input.second] = input.first.substr(ns_str_size);
load_tokens[input.second] = input.first.substr(ns_str_size);
return Status::OK();
}

Expand Down Expand Up @@ -711,10 +718,10 @@ Status Config::parseConfigFromString(const std::string &input, int line_number)
}

Status Config::finish() {
if (requirepass.empty() && !tokens.empty()) {
if (requirepass.empty() && !load_tokens.empty()) {
return {Status::NotOK, "requirepass empty wasn't allowed while the namespace exists"};
}
if ((cluster_enabled) && !tokens.empty()) {
if ((cluster_enabled) && !load_tokens.empty()) {
return {Status::NotOK, "enabled cluster mode wasn't allowed while the namespace exists"};
}
if (unixsocket.empty() && binds.size() == 0) {
Expand Down Expand Up @@ -836,7 +843,7 @@ Status Config::Set(Server *svr, std::string key, const std::string &value) {
return Status::OK();
}

Status Config::Rewrite() {
Status Config::Rewrite(const std::map<std::string, std::string> &tokens) {
if (path_.empty()) {
return {Status::NotOK, "the server is running without a config file"};
}
Expand All @@ -853,8 +860,10 @@ Status Config::Rewrite() {
}

std::string namespace_prefix = "namespace.";
for (const auto &iter : tokens) {
new_config[namespace_prefix + iter.second] = iter.first;
if (!repl_namespace_enabled) { // need to rewrite to the configuration if we don't replicate namespaces
for (const auto &iter : tokens) {
new_config[namespace_prefix + iter.second] = iter.first;
}
}

std::ifstream file(path_);
Expand Down Expand Up @@ -900,104 +909,3 @@ Status Config::Rewrite() {
}
return Status::OK();
}

Status Config::GetNamespace(const std::string &ns, std::string *token) const {
token->clear();
for (const auto &iter : tokens) {
if (iter.second == ns) {
*token = iter.first;
return Status::OK();
}
}
return {Status::NotFound};
}

Status Config::SetNamespace(const std::string &ns, const std::string &token) {
if (ns == kDefaultNamespace) {
return {Status::NotOK, "forbidden to update the default namespace"};
}
if (tokens.find(token) != tokens.end()) {
return {Status::NotOK, "the token has already exists"};
}

if (token == requirepass || token == masterauth) {
return {Status::NotOK, "the token is duplicated with requirepass or masterauth"};
}

for (const auto &iter : tokens) {
if (iter.second == ns) {
tokens.erase(iter.first);
tokens[token] = ns;
auto s = Rewrite();
if (!s.IsOK()) {
// Need to roll back the old token if fails to rewrite the config
tokens.erase(token);
tokens[iter.first] = ns;
}
return s;
}
}
return {Status::NotOK, "the namespace was not found"};
}

Status Config::AddNamespace(const std::string &ns, const std::string &token) {
if (requirepass.empty()) {
return {Status::NotOK, "forbidden to add namespace when requirepass was empty"};
}
if (cluster_enabled) {
return {Status::NotOK, "forbidden to add namespace when cluster mode was enabled"};
}
if (ns == kDefaultNamespace) {
return {Status::NotOK, "forbidden to add the default namespace"};
}
auto s = isNamespaceLegal(ns);
if (!s.IsOK()) return s;
if (tokens.find(token) != tokens.end()) {
return {Status::NotOK, "the token has already exists"};
}

if (token == requirepass || token == masterauth) {
return {Status::NotOK, "the token is duplicated with requirepass or masterauth"};
}

for (const auto &iter : tokens) {
if (iter.second == ns) {
return {Status::NotOK, "the namespace has already exists"};
}
}
tokens[token] = ns;

s = Rewrite();
if (!s.IsOK()) {
tokens.erase(token);
}
return s;
}

Status Config::DelNamespace(const std::string &ns) {
if (ns == kDefaultNamespace) {
return {Status::NotOK, "forbidden to delete the default namespace"};
}
for (const auto &iter : tokens) {
if (iter.second == ns) {
tokens.erase(iter.first);
auto s = Rewrite();
if (!s.IsOK()) {
tokens[iter.first] = ns;
}
return s;
}
}
return {Status::NotOK, "the namespace was not found"};
}

Status Config::isNamespaceLegal(const std::string &ns) {
if (ns.size() > UINT8_MAX) {
return {Status::NotOK, fmt::format("size exceed limit {}", UINT8_MAX)};
}
char last_char = ns.back();
if (last_char == std::numeric_limits<char>::max()) {
return {Status::NotOK, "namespace contain illegal letter"};
}
return Status::OK();
}
15 changes: 8 additions & 7 deletions src/config/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ struct Config {
CompactionCheckerRange compaction_checker_range{-1, -1};
int64_t force_compact_file_age;
int force_compact_file_min_deleted_percentage;
std::map<std::string, std::string> tokens;
bool repl_namespace_enabled = false;
std::string replica_announce_ip;
uint32_t replica_announce_port = 0;

Expand All @@ -149,6 +149,11 @@ struct Config {

bool redis_cursor_compatible = false;
int log_retention_days;

// load_tokens is used to buffer the tokens when loading,
// don't use it to authenticate or rewrite the configuration file.
std::map<std::string, std::string> load_tokens;

// profiling
int profiling_sample_ratio = 0;
int profiling_sample_record_threshold_ms = 0;
Expand Down Expand Up @@ -210,16 +215,13 @@ struct Config {
mutable std::mutex backup_mu;

std::string NodesFilePath() const;
Status Rewrite();
Status Rewrite(const std::map<std::string, std::string> &tokens);
Status Load(const CLIOptions &path);
void Get(const std::string &key, std::vector<std::string> *values) const;
Status Set(Server *svr, std::string key, const std::string &value);
void SetMaster(const std::string &host, uint32_t port);
void ClearMaster();
Status GetNamespace(const std::string &ns, std::string *token) const;
Status AddNamespace(const std::string &ns, const std::string &token);
Status SetNamespace(const std::string &ns, const std::string &token);
Status DelNamespace(const std::string &ns);
bool IsSlave() const { return !master_host.empty(); }

private:
std::string path_;
Expand All @@ -237,5 +239,4 @@ struct Config {
Status parseConfigFromPair(const std::pair<std::string, std::string> &input, int line_number);
Status parseConfigFromString(const std::string &input, int line_number);
Status finish();
static Status isNamespaceLegal(const std::string &ns);
};
Loading