Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 10 additions & 9 deletions src/cluster/cluster.cc
Original file line number Diff line number Diff line change
Expand Up @@ -75,9 +75,7 @@ Status Cluster::SetNodeId(const std::string &node_id) {
}

// Set replication relationship
if (myself_) return SetMasterSlaveRepl();

return Status::OK();
return SetMasterSlaveRepl();
}

// The reason why the new version MUST be +1 of current version is that,
Expand Down Expand Up @@ -204,11 +202,8 @@ Status Cluster::SetClusterNodes(const std::string &nodes_str, int64_t version, b
}

// Set replication relationship
if (myself_) {
s = SetMasterSlaveRepl();
if (!s.IsOK()) {
return s.Prefixed("failed to set master-replica replication");
}
if (auto s = SetMasterSlaveRepl(); !s.IsOK()) {
return s.Prefixed("failed to set master-replica replication");
}

// Clear data of migrated slots
Comment on lines +206 to 209
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should here check myself_ == nullptr and skip checking for:

  if (!migrated_slots_.empty()) {
    engine::Context ctx(srv_->storage);
    for (const auto &[slot, _] : migrated_slots_) {
      if (slots_nodes_[slot] != myself_) {
        auto s = srv_->slot_migrator->ClearKeysOfSlotRange(ctx, kDefaultNamespace, SlotRange::GetPoint(slot));
        if (!s.ok()) {
          LOG(ERROR) << "failed to clear data of migrated slots: " << s.ToString();
        }
      }
    }
  }

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I prefer keeping the previous behavior here since it should clear the keys of migrating slots once removed from the cluster.

Expand All @@ -234,7 +229,13 @@ Status Cluster::SetClusterNodes(const std::string &nodes_str, int64_t version, b
Status Cluster::SetMasterSlaveRepl() {
if (!srv_) return Status::OK();

if (!myself_) return Status::OK();
// If the node is not in the cluster topology, remove the master replication if it's a replica.
if (!myself_) {
if (auto s = srv_->RemoveMaster(); !s.IsOK()) {
return s.Prefixed("failed to remove master");
}
return Status::OK();
}

if (myself_->role == kClusterMaster) {
// Master mode
Expand Down
51 changes: 36 additions & 15 deletions tests/gocase/integration/cluster/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,58 +154,79 @@ func TestClusterNodes(t *testing.T) {
}

func TestClusterReplicas(t *testing.T) {
srv := util.StartServer(t, map[string]string{"cluster-enabled": "yes"})
defer srv.Close()

ctx := context.Background()
rdb := srv.NewClient()
defer func() { require.NoError(t, rdb.Close()) }()
srv1 := util.StartServer(t, map[string]string{"cluster-enabled": "yes"})
rdb1 := srv1.NewClient()
srv2 := util.StartServer(t, map[string]string{"cluster-enabled": "yes"})
rdb2 := srv2.NewClient()

defer func() {
srv1.Close()
srv2.Close()
require.NoError(t, rdb1.Close())
require.NoError(t, rdb2.Close())
}()

nodes := ""

master1ID := "bb2e5b3c5282086df51eff6b3e35519aede96fa6"
master1Node := fmt.Sprintf("%s %s %d master - 0-8191", master1ID, srv.Host(), srv.Port())
master1Node := fmt.Sprintf("%s %s %d master - 0-8191", master1ID, srv1.Host(), srv1.Port())
nodes += master1Node + "\n"

master2ID := "159dde1194ebf5bfc5a293dff839c3d1476f2a49"
master2Node := fmt.Sprintf("%s %s %d master - 8192-16383", master2ID, srv.Host(), srv.Port())
master2Node := fmt.Sprintf("%s %s %d master - 8192-16383", master2ID, srv1.Host(), srv1.Port())
nodes += master2Node + "\n"

replica2ID := "7dbee3d628f04cc5d763b36e92b10533e627a1d0"
replica2Node := fmt.Sprintf("%s %s %d slave %s", replica2ID, srv.Host(), srv.Port(), master2ID)
replica2Node := fmt.Sprintf("%s %s %d slave %s", replica2ID, srv2.Host(), srv2.Port(), master2ID)
nodes += replica2Node

require.NoError(t, rdb.Do(ctx, "clusterx", "SETNODES", nodes, "2").Err())
require.EqualValues(t, "2", rdb.Do(ctx, "clusterx", "version").Val())
require.NoError(t, rdb1.Do(ctx, "clusterx", "SETNODES", nodes, "2").Err())
require.EqualValues(t, "2", rdb1.Do(ctx, "clusterx", "version").Val())
require.NoError(t, rdb2.Do(ctx, "clusterx", "SETNODES", nodes, "2").Err())

t.Run("with replicas", func(t *testing.T) {
replicas, err := rdb.Do(ctx, "cluster", "replicas", "159dde1194ebf5bfc5a293dff839c3d1476f2a49").Text()
replicas, err := rdb1.Do(ctx, "cluster", "replicas", "159dde1194ebf5bfc5a293dff839c3d1476f2a49").Text()
require.NoError(t, err)
fields := strings.Split(replicas, " ")
require.Len(t, fields, 8)
require.Equal(t, fmt.Sprintf("%s@%d", srv.HostPort(), srv.Port()+10000), fields[1])
require.Equal(t, fmt.Sprintf("%s@%d", srv2.HostPort(), srv2.Port()+10000), fields[1])
require.Equal(t, "slave", fields[2])
require.Equal(t, master2ID, fields[3])
require.Equal(t, "connected\n", fields[7])
})

t.Run("without replicas", func(t *testing.T) {
replicas, err := rdb.Do(ctx, "cluster", "replicas", "bb2e5b3c5282086df51eff6b3e35519aede96fa6").Text()
replicas, err := rdb1.Do(ctx, "cluster", "replicas", "bb2e5b3c5282086df51eff6b3e35519aede96fa6").Text()
require.NoError(t, err)
require.Empty(t, replicas)
})

t.Run("send command to replica", func(t *testing.T) {
err := rdb.Do(ctx, "cluster", "replicas", "7dbee3d628f04cc5d763b36e92b10533e627a1d0").Err()
err := rdb1.Do(ctx, "cluster", "replicas", "7dbee3d628f04cc5d763b36e92b10533e627a1d0").Err()
require.Error(t, err)
require.Contains(t, err.Error(), "The node isn't a master")
})

t.Run("unknown node", func(t *testing.T) {
err := rdb.Do(ctx, "cluster", "replicas", "unknown").Err()
err := rdb1.Do(ctx, "cluster", "replicas", "unknown").Err()
require.Error(t, err)
require.Contains(t, err.Error(), "Invalid cluster node id")
})

t.Run("remove the replication if the node is not in the cluster", func(t *testing.T) {
require.Equal(t, "slave", util.FindInfoEntry(rdb2, "role"))
// remove the cluster replica node
clusterNode := fmt.Sprintf("%s\n%s", master1Node, master2Node)
err := rdb1.Do(ctx, "clusterx", "SETNODES", clusterNode, "3").Err()
require.NoError(t, err)
err = rdb2.Do(ctx, "clusterx", "SETNODES", clusterNode, "3").Err()
require.NoError(t, err)

require.Eventually(t, func() bool {
return util.FindInfoEntry(rdb2, "role") == "master"
}, 5*time.Second, 100*time.Millisecond)
})
}

func TestClusterDumpAndLoadClusterNodesInfo(t *testing.T) {
Expand Down
Loading