Skip to content
4 changes: 2 additions & 2 deletions source/extensions/clusters/redis/redis_cluster.cc
Original file line number Diff line number Diff line change
Expand Up @@ -100,8 +100,8 @@ void RedisCluster::onClusterSlotUpdate(ClusterSlotsPtr&& slots) {
new_hosts.emplace_back(new RedisHost(info(), "", slot.primary(), *this, true, time_source_));
all_new_hosts.emplace(slot.primary()->asString());
for (auto const& replica : slot.replicas()) {
new_hosts.emplace_back(new RedisHost(info(), "", replica, *this, false, time_source_));
all_new_hosts.emplace(replica->asString());
new_hosts.emplace_back(new RedisHost(info(), "", replica.second, *this, false, time_source_));
all_new_hosts.emplace(replica.first);
}
}

Expand Down
12 changes: 9 additions & 3 deletions source/extensions/clusters/redis/redis_cluster_lb.cc
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,14 @@ namespace Clusters {
namespace Redis {

bool ClusterSlot::operator==(const Envoy::Extensions::Clusters::Redis::ClusterSlot& rhs) const {
return start_ == rhs.start_ && end_ == rhs.end_ && primary_ == rhs.primary_ &&
replicas_ == rhs.replicas_;
if (start_ != rhs.start_ || end_ != rhs.end_ || *primary_ != *rhs.primary_ ||
replicas_.size() != rhs.replicas_.size()) {
return false;
}
// The value type is shared_ptr, and the shared_ptr is not same one even for same ip:port.
// so, just compare the key here.
return std::equal(replicas_.begin(), replicas_.end(), rhs.replicas_.begin(), rhs.replicas_.end(),
[](const auto& it1, const auto& it2) { return it1.first == it2.first; });
Comment on lines +15 to +16
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This might have been covered in PR discussions already but how does this differ from just doing replicas_ = rhs.replicas?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The value type is shared_ptr, and the shared_ptr is not name one even for same ip:port.

Copy link
Copy Markdown
Contributor

@snowp snowp May 10, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh I see this is just comparing the key. Perhaps add a comment to make this clearer?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ping on this ^^

Would be great to have this covered by a comment

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, I forgot that, now I have add a comment. Thank you!

}

// RedisClusterLoadBalancerFactory
Expand Down Expand Up @@ -43,7 +49,7 @@ bool RedisClusterLoadBalancerFactory::onClusterSlotUpdate(ClusterSlotsPtr&& slot
primary_and_replicas->push_back(primary_host->second);

for (auto const& replica : slot.replicas()) {
auto replica_host = all_hosts.find(replica->asString());
auto replica_host = all_hosts.find(replica.first);
ASSERT(replica_host != all_hosts.end(),
"we expect all address to be found in the updated_hosts");
replicas->push_back(replica_host->second);
Expand Down
9 changes: 4 additions & 5 deletions source/extensions/clusters/redis/redis_cluster_lb.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,7 @@
#include "extensions/filters/network/common/redis/codec.h"
#include "extensions/filters/network/common/redis/supported_commands.h"

#include "absl/container/flat_hash_map.h"
#include "absl/container/flat_hash_set.h"
#include "absl/container/btree_map.h"
#include "absl/synchronization/mutex.h"

namespace Envoy {
Expand All @@ -37,11 +36,11 @@ class ClusterSlot {
int64_t start() const { return start_; }
int64_t end() const { return end_; }
Network::Address::InstanceConstSharedPtr primary() const { return primary_; }
const absl::flat_hash_set<Network::Address::InstanceConstSharedPtr>& replicas() const {
const absl::btree_map<std::string, Network::Address::InstanceConstSharedPtr>& replicas() const {
return replicas_;
}
void addReplica(Network::Address::InstanceConstSharedPtr replica_address) {
replicas_.insert(std::move(replica_address));
replicas_.emplace(replica_address->asString(), std::move(replica_address));
}

bool operator==(const ClusterSlot& rhs) const;
Expand All @@ -50,7 +49,7 @@ class ClusterSlot {
int64_t start_;
int64_t end_;
Network::Address::InstanceConstSharedPtr primary_;
absl::flat_hash_set<Network::Address::InstanceConstSharedPtr> replicas_;
absl::btree_map<std::string, Network::Address::InstanceConstSharedPtr> replicas_;
};

using ClusterSlotsPtr = std::unique_ptr<std::vector<ClusterSlot>>;
Expand Down
20 changes: 17 additions & 3 deletions test/extensions/clusters/redis/redis_cluster_lb_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -349,18 +349,30 @@ TEST_F(RedisClusterLoadBalancerTest, ClusterSlotUpdate) {

TEST_F(RedisClusterLoadBalancerTest, ClusterSlotNoUpdate) {
Upstream::HostVector hosts{Upstream::makeTestHost(info_, "tcp://127.0.0.1:90", simTime()),
Upstream::makeTestHost(info_, "tcp://127.0.0.1:91", simTime()),
Upstream::makeTestHost(info_, "tcp://127.0.0.1:92", simTime()),
Upstream::makeTestHost(info_, "tcp://127.0.0.1:90", simTime()),
Upstream::makeTestHost(info_, "tcp://127.0.0.1:91", simTime()),
Upstream::makeTestHost(info_, "tcp://127.0.0.1:92", simTime())};
Upstream::HostVector replicas{Upstream::makeTestHost(info_, "tcp://127.0.0.2:90", simTime()),
Upstream::makeTestHost(info_, "tcp://127.0.0.2:91", simTime()),
Upstream::makeTestHost(info_, "tcp://127.0.0.2:90", simTime()),
Upstream::makeTestHost(info_, "tcp://127.0.0.2:91", simTime())};

ClusterSlotsPtr slots = std::make_unique<std::vector<ClusterSlot>>(std::vector<ClusterSlot>{
ClusterSlot(0, 1000, hosts[0]->address()),
ClusterSlot(1001, 2000, hosts[1]->address()),
ClusterSlot(2001, 16383, hosts[2]->address()),
});

(*slots)[0].addReplica(replicas[0]->address());
(*slots)[0].addReplica(replicas[1]->address());
Upstream::HostMap all_hosts{
{hosts[0]->address()->asString(), hosts[0]},
{hosts[1]->address()->asString(), hosts[1]},
{hosts[2]->address()->asString(), hosts[2]},
{replicas[0]->address()->asString(), replicas[0]},
{replicas[1]->address()->asString(), replicas[1]},
};

// A list of (hash: host_index) pair.
Expand All @@ -373,10 +385,12 @@ TEST_F(RedisClusterLoadBalancerTest, ClusterSlotNoUpdate) {

// Calling cluster slot update without change should not change assignment.
std::vector<ClusterSlot> updated_slot{
ClusterSlot(0, 1000, hosts[0]->address()),
ClusterSlot(1001, 2000, hosts[1]->address()),
ClusterSlot(2001, 16383, hosts[2]->address()),
ClusterSlot(0, 1000, hosts[3]->address()),
ClusterSlot(1001, 2000, hosts[4]->address()),
ClusterSlot(2001, 16383, hosts[5]->address()),
};
updated_slot[0].addReplica(replicas[3]->address());
updated_slot[0].addReplica(replicas[2]->address());
EXPECT_EQ(false, factory_->onClusterSlotUpdate(
std::make_unique<std::vector<ClusterSlot>>(updated_slot), all_hosts));
validateAssignment(hosts, expected_assignments);
Expand Down