diff --git a/source/extensions/clusters/redis/redis_cluster.cc b/source/extensions/clusters/redis/redis_cluster.cc index 297248a6b6a15..822eab1166be6 100644 --- a/source/extensions/clusters/redis/redis_cluster.cc +++ b/source/extensions/clusters/redis/redis_cluster.cc @@ -100,8 +100,8 @@ void RedisCluster::onClusterSlotUpdate(ClusterSlotsPtr&& slots) { new_hosts.emplace_back(new RedisHost(info(), "", slot.primary(), *this, true, time_source_)); all_new_hosts.emplace(slot.primary()->asString()); for (auto const& replica : slot.replicas()) { - new_hosts.emplace_back(new RedisHost(info(), "", replica, *this, false, time_source_)); - all_new_hosts.emplace(replica->asString()); + new_hosts.emplace_back(new RedisHost(info(), "", replica.second, *this, false, time_source_)); + all_new_hosts.emplace(replica.first); } } diff --git a/source/extensions/clusters/redis/redis_cluster_lb.cc b/source/extensions/clusters/redis/redis_cluster_lb.cc index 43bb0f9b32224..25f38f448dae0 100644 --- a/source/extensions/clusters/redis/redis_cluster_lb.cc +++ b/source/extensions/clusters/redis/redis_cluster_lb.cc @@ -6,8 +6,14 @@ namespace Clusters { namespace Redis { bool ClusterSlot::operator==(const Envoy::Extensions::Clusters::Redis::ClusterSlot& rhs) const { - return start_ == rhs.start_ && end_ == rhs.end_ && primary_ == rhs.primary_ && - replicas_ == rhs.replicas_; + if (start_ != rhs.start_ || end_ != rhs.end_ || *primary_ != *rhs.primary_ || + replicas_.size() != rhs.replicas_.size()) { + return false; + } + // The value type is shared_ptr, and the shared_ptr is not same one even for same ip:port. + // so, just compare the key here. + return std::equal(replicas_.begin(), replicas_.end(), rhs.replicas_.begin(), rhs.replicas_.end(), + [](const auto& it1, const auto& it2) { return it1.first == it2.first; }); } // RedisClusterLoadBalancerFactory @@ -43,7 +49,7 @@ bool RedisClusterLoadBalancerFactory::onClusterSlotUpdate(ClusterSlotsPtr&& slot primary_and_replicas->push_back(primary_host->second); for (auto const& replica : slot.replicas()) { - auto replica_host = all_hosts.find(replica->asString()); + auto replica_host = all_hosts.find(replica.first); ASSERT(replica_host != all_hosts.end(), "we expect all address to be found in the updated_hosts"); replicas->push_back(replica_host->second); diff --git a/source/extensions/clusters/redis/redis_cluster_lb.h b/source/extensions/clusters/redis/redis_cluster_lb.h index 561de3b681e53..7ca27407d1612 100644 --- a/source/extensions/clusters/redis/redis_cluster_lb.h +++ b/source/extensions/clusters/redis/redis_cluster_lb.h @@ -18,8 +18,7 @@ #include "extensions/filters/network/common/redis/codec.h" #include "extensions/filters/network/common/redis/supported_commands.h" -#include "absl/container/flat_hash_map.h" -#include "absl/container/flat_hash_set.h" +#include "absl/container/btree_map.h" #include "absl/synchronization/mutex.h" namespace Envoy { @@ -37,11 +36,11 @@ class ClusterSlot { int64_t start() const { return start_; } int64_t end() const { return end_; } Network::Address::InstanceConstSharedPtr primary() const { return primary_; } - const absl::flat_hash_set& replicas() const { + const absl::btree_map& replicas() const { return replicas_; } void addReplica(Network::Address::InstanceConstSharedPtr replica_address) { - replicas_.insert(std::move(replica_address)); + replicas_.emplace(replica_address->asString(), std::move(replica_address)); } bool operator==(const ClusterSlot& rhs) const; @@ -50,7 +49,7 @@ class ClusterSlot { int64_t start_; int64_t end_; Network::Address::InstanceConstSharedPtr primary_; - absl::flat_hash_set replicas_; + absl::btree_map replicas_; }; using ClusterSlotsPtr = std::unique_ptr>; diff --git a/test/extensions/clusters/redis/redis_cluster_lb_test.cc b/test/extensions/clusters/redis/redis_cluster_lb_test.cc index a0ed75aedf1f9..1b7abc6bb9420 100644 --- a/test/extensions/clusters/redis/redis_cluster_lb_test.cc +++ b/test/extensions/clusters/redis/redis_cluster_lb_test.cc @@ -349,18 +349,30 @@ TEST_F(RedisClusterLoadBalancerTest, ClusterSlotUpdate) { TEST_F(RedisClusterLoadBalancerTest, ClusterSlotNoUpdate) { Upstream::HostVector hosts{Upstream::makeTestHost(info_, "tcp://127.0.0.1:90", simTime()), + Upstream::makeTestHost(info_, "tcp://127.0.0.1:91", simTime()), + Upstream::makeTestHost(info_, "tcp://127.0.0.1:92", simTime()), + Upstream::makeTestHost(info_, "tcp://127.0.0.1:90", simTime()), Upstream::makeTestHost(info_, "tcp://127.0.0.1:91", simTime()), Upstream::makeTestHost(info_, "tcp://127.0.0.1:92", simTime())}; + Upstream::HostVector replicas{Upstream::makeTestHost(info_, "tcp://127.0.0.2:90", simTime()), + Upstream::makeTestHost(info_, "tcp://127.0.0.2:91", simTime()), + Upstream::makeTestHost(info_, "tcp://127.0.0.2:90", simTime()), + Upstream::makeTestHost(info_, "tcp://127.0.0.2:91", simTime())}; ClusterSlotsPtr slots = std::make_unique>(std::vector{ ClusterSlot(0, 1000, hosts[0]->address()), ClusterSlot(1001, 2000, hosts[1]->address()), ClusterSlot(2001, 16383, hosts[2]->address()), }); + + (*slots)[0].addReplica(replicas[0]->address()); + (*slots)[0].addReplica(replicas[1]->address()); Upstream::HostMap all_hosts{ {hosts[0]->address()->asString(), hosts[0]}, {hosts[1]->address()->asString(), hosts[1]}, {hosts[2]->address()->asString(), hosts[2]}, + {replicas[0]->address()->asString(), replicas[0]}, + {replicas[1]->address()->asString(), replicas[1]}, }; // A list of (hash: host_index) pair. @@ -373,10 +385,12 @@ TEST_F(RedisClusterLoadBalancerTest, ClusterSlotNoUpdate) { // Calling cluster slot update without change should not change assignment. std::vector updated_slot{ - ClusterSlot(0, 1000, hosts[0]->address()), - ClusterSlot(1001, 2000, hosts[1]->address()), - ClusterSlot(2001, 16383, hosts[2]->address()), + ClusterSlot(0, 1000, hosts[3]->address()), + ClusterSlot(1001, 2000, hosts[4]->address()), + ClusterSlot(2001, 16383, hosts[5]->address()), }; + updated_slot[0].addReplica(replicas[3]->address()); + updated_slot[0].addReplica(replicas[2]->address()); EXPECT_EQ(false, factory_->onClusterSlotUpdate( std::make_unique>(updated_slot), all_hosts)); validateAssignment(hosts, expected_assignments);