Skip to content

Commit 4391ba4

Browse files
committed
Don't allow to switch off after enabling
1 parent 3fc33ab commit 4391ba4

File tree

7 files changed

+63
-47
lines changed

7 files changed

+63
-47
lines changed

kvrocks.conf

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,8 @@ cluster-enabled no
5555
# to replicas. This option allows to change this behavior, so that namespaces are also
5656
# propagated to slaves. Note that:
5757
# 1) it won't replicate the 'masterauth' to prevent breaking master/replica replication
58-
# 2) it will overwrite replica's namespace with master's namespace, so be careful of in-using namespaces.
58+
# 2) it will overwrite replica's namespace with master's namespace, so be careful of in-using namespaces
59+
# 3) cannot switch off the namespace replication once it's enabled
5960
#
6061
# Default: no
6162
repl-namespace-enabled no

src/cluster/replication.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -976,7 +976,7 @@ Status ReplicationThread::parseWriteBatch(const std::string &batch_string) {
976976
}
977977
}
978978
} else if (write_batch_handler.Key() == kNamespaceDBKey) {
979-
auto s = srv_->GetNamespace()->Load();
979+
auto s = srv_->GetNamespace()->LoadAndRewrite();
980980
if (!s.IsOK()) {
981981
return s.Prefixed("failed to load namespaces");
982982
}

src/config/config.cc

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -517,15 +517,11 @@ void Config::initFieldCallback() {
517517
return Status::OK();
518518
}},
519519
{"repl-namespace-enabled",
520-
[this](Server *srv, const std::string &k, const std::string &v) -> Status {
521-
if (!srv || !cluster_enabled) return Status::OK();
522-
auto new_value = util::ToLower(v);
523-
// If the namespace replication is enabled/disabled, need to reload.
524-
if ((repl_namespace_enabled && new_value == "no") || (!repl_namespace_enabled && new_value == "yes")) {
525-
return srv->GetNamespace()->Load();
526-
}
527-
return Status::OK();
520+
[](Server *srv, const std::string &k, const std::string &v) -> Status {
521+
if (!srv) return Status::OK();
522+
return srv->GetNamespace()->LoadAndRewrite();
528523
}},
524+
529525
{"rocksdb.target_file_size_base",
530526
[this](Server *srv, const std::string &k, const std::string &v) -> Status {
531527
if (!srv) return Status::OK();

src/server/namespace.cc

Lines changed: 19 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ Status IsNamespaceLegal(const std::string& ns) {
4343
return Status::OK();
4444
}
4545

46-
Status Namespace::Load() {
46+
Status Namespace::LoadAndRewrite() {
4747
auto config = storage_->GetConfig();
4848
// Load from the configuration file first
4949
tokens_ = config->load_tokens;
@@ -52,18 +52,25 @@ Status Namespace::Load() {
5252
// this can avoid missing some namespaces when turn on/off repl_namespace_enabled.
5353
std::string value;
5454
auto s = storage_->Get(rocksdb::ReadOptions(), cf_, kNamespaceDBKey, &value);
55-
if (!s.ok()) {
56-
if (s.IsNotFound()) return Status::OK();
55+
if (!s.ok() && !s.IsNotFound()) {
5756
return {Status::NotOK, s.ToString()};
5857
}
59-
jsoncons::json j = jsoncons::json::parse(value);
60-
for (const auto& iter : j.object_range()) {
61-
if (tokens_.find(iter.key()) == tokens_.end()) {
62-
// merge the namespace from db
63-
tokens_[iter.key()] = iter.value().as<std::string>();
58+
if (s.ok()) {
59+
// The namespace db key is existed, so it doesn't allow to switch off repl_namespace_enabled
60+
if (!config->repl_namespace_enabled) {
61+
return {Status::NotOK, "cannot switch off repl_namespace_enabled when namespaces exist in db"};
62+
}
63+
64+
jsoncons::json j = jsoncons::json::parse(value);
65+
for (const auto& iter : j.object_range()) {
66+
if (tokens_.find(iter.key()) == tokens_.end()) {
67+
// merge the namespace from db
68+
tokens_[iter.key()] = iter.value().as<std::string>();
69+
}
6470
}
6571
}
66-
return rewriteOrWriteDB();
72+
73+
return Rewrite();
6774
}
6875

6976
StatusOr<std::string> Namespace::Get(const std::string& ns) const {
@@ -108,7 +115,7 @@ Status Namespace::Set(const std::string& ns, const std::string& token) {
108115
}
109116
tokens_[token] = ns;
110117

111-
s = rewriteOrWriteDB();
118+
s = Rewrite();
112119
if (!s.IsOK()) {
113120
tokens_.erase(token);
114121
return s;
@@ -139,7 +146,7 @@ Status Namespace::Del(const std::string& ns) {
139146
for (const auto& iter : tokens_) {
140147
if (iter.second == ns) {
141148
tokens_.erase(iter.first);
142-
auto s = rewriteOrWriteDB();
149+
auto s = Rewrite();
143150
if (!s.IsOK()) {
144151
tokens_[iter.first] = iter.second;
145152
return s;
@@ -150,7 +157,7 @@ Status Namespace::Del(const std::string& ns) {
150157
return {Status::NotOK, kErrNamespaceNotFound};
151158
}
152159

153-
Status Namespace::rewriteOrWriteDB() {
160+
Status Namespace::Rewrite() {
154161
auto config = storage_->GetConfig();
155162
auto s = config->Rewrite(tokens_);
156163
if (!s.IsOK()) {

src/server/namespace.h

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -34,18 +34,17 @@ class Namespace {
3434
Namespace(const Namespace &) = delete;
3535
Namespace &operator=(const Namespace &) = delete;
3636

37-
Status Load();
37+
Status LoadAndRewrite();
3838
StatusOr<std::string> Get(const std::string &ns) const;
3939
StatusOr<std::string> GetByToken(const std::string &token) const;
4040
Status Set(const std::string &ns, const std::string &token);
4141
Status Add(const std::string &ns, const std::string &token);
4242
Status Del(const std::string &ns);
4343
const std::map<std::string, std::string> &List() const { return tokens_; }
44+
Status Rewrite();
4445

4546
private:
4647
engine::Storage *storage_;
4748
rocksdb::ColumnFamilyHandle *cf_ = nullptr;
4849
std::map<std::string, std::string> tokens_;
49-
50-
Status rewriteOrWriteDB();
5150
};

src/server/server.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -132,7 +132,7 @@ Server::~Server() {
132132
// - feed-replica-data-info: generate checkpoint and send files list when full sync
133133
// - feed-replica-file: send SST files when slaves ask for full sync
134134
Status Server::Start() {
135-
auto s = namespace_.Load();
135+
auto s = namespace_.LoadAndRewrite();
136136
if (!s.IsOK()) {
137137
return s;
138138
}

tests/gocase/unit/namespace/namespace_test.go

Lines changed: 34 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,11 @@ func TestNamespace(t *testing.T) {
8989
require.NoError(t, r.Err())
9090
require.Equal(t, "OK", r.Val())
9191
}
92+
for ns, token := range nsTokens {
93+
r := rdb.Do(ctx, "NAMESPACE", "GET", ns)
94+
require.NoError(t, r.Err())
95+
require.Equal(t, token, r.Val())
96+
}
9297

9398
srv.Restart()
9499
for ns, token := range nsTokens {
@@ -167,9 +172,9 @@ func TestNamespaceReplicate(t *testing.T) {
167172
"n4": "t4",
168173
}
169174

170-
t.Run("Replicate namespaces", func(t *testing.T) {
171-
require.NoError(t, masterRdb.ConfigSet(ctx, "repl-namespace-enabled", "yes").Err())
172-
require.NoError(t, slaveRdb.ConfigSet(ctx, "repl-namespace-enabled", "yes").Err())
175+
t.Run("Disable Replicate namespces", func(t *testing.T) {
176+
require.NoError(t, masterRdb.ConfigSet(ctx, "repl-namespace-enabled", "no").Err())
177+
require.NoError(t, slaveRdb.ConfigSet(ctx, "repl-namespace-enabled", "no").Err())
173178

174179
for ns, token := range nsTokens {
175180
r := masterRdb.Do(ctx, "NAMESPACE", "ADD", ns, token)
@@ -178,53 +183,61 @@ func TestNamespaceReplicate(t *testing.T) {
178183
}
179184
util.WaitForOffsetSync(t, slaveRdb, masterRdb)
180185

186+
// Can read namespaces on master
181187
for ns, token := range nsTokens {
182-
r := slaveRdb.Do(ctx, "NAMESPACE", "GET", ns)
188+
r := masterRdb.Do(ctx, "NAMESPACE", "GET", ns)
183189
require.NoError(t, r.Err())
184190
require.Equal(t, token, r.Val())
185191
}
192+
// Can't read namespaces on slave
193+
for ns := range nsTokens {
194+
r := slaveRdb.Do(ctx, "NAMESPACE", "GET", ns)
195+
require.EqualError(t, r.Err(), redis.Nil.Error())
196+
}
186197

187198
for ns := range nsTokens {
188199
r := masterRdb.Do(ctx, "NAMESPACE", "DEL", ns)
189200
require.NoError(t, r.Err())
190201
require.Equal(t, "OK", r.Val())
191202
}
192-
util.WaitForOffsetSync(t, slaveRdb, masterRdb)
193-
194-
for ns := range nsTokens {
195-
r := slaveRdb.Do(ctx, "NAMESPACE", "GET", ns)
196-
require.EqualError(t, r.Err(), redis.Nil.Error())
197-
}
198203
})
199204

200-
t.Run("Don't allow to operate slave's namespace if replication is enabled", func(t *testing.T) {
201-
r := slaveRdb.Do(ctx, "NAMESPACE", "ADD", "ns_xxxx", "token_xxxx")
202-
util.ErrorRegexp(t, r.Err(), ".*ERR namespace is read-only for slave.*")
203-
})
205+
t.Run("Enable Replicate namespaces", func(t *testing.T) {
206+
require.NoError(t, masterRdb.ConfigSet(ctx, "repl-namespace-enabled", "yes").Err())
207+
require.NoError(t, slaveRdb.ConfigSet(ctx, "repl-namespace-enabled", "yes").Err())
204208

205-
t.Run("Turn off namespace replication", func(t *testing.T) {
206-
require.NoError(t, masterRdb.ConfigSet(ctx, "repl-namespace-enabled", "no").Err())
207-
require.NoError(t, slaveRdb.ConfigSet(ctx, "repl-namespace-enabled", "no").Err())
208209
for ns, token := range nsTokens {
209210
r := masterRdb.Do(ctx, "NAMESPACE", "ADD", ns, token)
210211
require.NoError(t, r.Err())
211212
require.Equal(t, "OK", r.Val())
212213
}
214+
util.WaitForOffsetSync(t, slaveRdb, masterRdb)
215+
213216
for ns, token := range nsTokens {
214-
r := masterRdb.Do(ctx, "NAMESPACE", "GET", ns)
217+
r := slaveRdb.Do(ctx, "NAMESPACE", "GET", ns)
215218
require.NoError(t, r.Err())
216219
require.Equal(t, token, r.Val())
217220
}
221+
222+
for ns := range nsTokens {
223+
r := masterRdb.Do(ctx, "NAMESPACE", "DEL", ns)
224+
require.NoError(t, r.Err())
225+
require.Equal(t, "OK", r.Val())
226+
}
218227
util.WaitForOffsetSync(t, slaveRdb, masterRdb)
228+
219229
for ns := range nsTokens {
220230
r := slaveRdb.Do(ctx, "NAMESPACE", "GET", ns)
221231
require.EqualError(t, r.Err(), redis.Nil.Error())
222232
}
223233
})
224234

225-
t.Run("Allow to operate slave's namespace if replication is disabled", func(t *testing.T) {
235+
t.Run("Don't allow to operate slave's namespace if replication is enabled", func(t *testing.T) {
226236
r := slaveRdb.Do(ctx, "NAMESPACE", "ADD", "ns_xxxx", "token_xxxx")
227-
require.NoError(t, r.Err())
228-
require.Equal(t, "OK", r.Val())
237+
util.ErrorRegexp(t, r.Err(), ".*ERR namespace is read-only for slave.*")
238+
})
239+
240+
t.Run("Turn off namespace replication is not allowed", func(t *testing.T) {
241+
util.ErrorRegexp(t, masterRdb.ConfigSet(ctx, "repl-namespace-enabled", "no").Err(), ".*cannot switch off repl_namespace_enabled when namespaces exist in db.*")
229242
})
230243
}

0 commit comments

Comments
 (0)