Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions source/server/listener_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -813,6 +813,12 @@ bool ListenerImpl::getReusePortOrDefault(Server::Instance& server,
return initial_reuse_port_value;
}

bool ListenerImpl::hasCompatibleAddress(const ListenerImpl& other) const {
return *address() == *other.address() &&
Network::Utility::protobufAddressSocketType(config_.address()) ==
Network::Utility::protobufAddressSocketType(other.config_.address());
}

bool ListenerMessageUtil::filterChainOnlyChange(const envoy::config::listener::v3::Listener& lhs,
const envoy::config::listener::v3::Listener& rhs) {
Protobuf::util::MessageDifferencer differencer;
Expand Down
3 changes: 3 additions & 0 deletions source/server/listener_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,9 @@ class ListenerImpl final : public Network::ListenerConfig,
static bool getReusePortOrDefault(Server::Instance& server,
const envoy::config::listener::v3::Listener& config);

// Check whether a new listener can share sockets with this listener.
bool hasCompatibleAddress(const ListenerImpl& other) const;

// Network::ListenerConfig
Network::FilterChainManager& filterChainManager() override { return filter_chain_manager_; }
Network::FilterChainFactory& filterChainFactory() override { return *this; }
Expand Down
19 changes: 9 additions & 10 deletions source/server/listener_manager_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -448,7 +448,7 @@ bool ListenerManagerImpl::addOrUpdateListenerInternal(
// In this case we can just replace inline.
ASSERT(workers_started_);
new_listener->debugLog("update warming listener");
if (*(*existing_warming_listener)->address() != *new_listener->address()) {
if (!(*existing_warming_listener)->hasCompatibleAddress(*new_listener)) {
setNewOrDrainingSocketFactory(name, config.address(), *new_listener);
} else {
new_listener->setSocketFactory((*existing_warming_listener)->getSocketFactory().clone());
Expand All @@ -457,7 +457,7 @@ bool ListenerManagerImpl::addOrUpdateListenerInternal(
} else if (existing_active_listener != active_listeners_.end()) {
// In this case we have no warming listener, so what we do depends on whether workers
// have been started or not.
if (*(*existing_active_listener)->address() != *new_listener->address()) {
if (!(*existing_active_listener)->hasCompatibleAddress(*new_listener)) {
setNewOrDrainingSocketFactory(name, config.address(), *new_listener);
} else {
new_listener->setSocketFactory((*existing_active_listener)->getSocketFactory().clone());
Expand Down Expand Up @@ -495,10 +495,10 @@ bool ListenerManagerImpl::addOrUpdateListenerInternal(
return true;
}

bool ListenerManagerImpl::hasListenerWithAddress(const ListenerList& list,
const Network::Address::Instance& address) {
for (const auto& listener : list) {
if (*listener->address() == address) {
bool ListenerManagerImpl::hasListenerWithCompatibleAddress(const ListenerList& list,
const ListenerImpl& listener) {
for (const auto& existing_listener : list) {
if (existing_listener->hasCompatibleAddress(listener)) {
return true;
}
}
Expand Down Expand Up @@ -962,8 +962,8 @@ void ListenerManagerImpl::setNewOrDrainingSocketFactory(
// is an edge case and nothing will explicitly break, but there is no possibility that two
// listeners that do not bind will ever be used. Only the first one will be used when searched for
// by address. Thus we block it.
if (!listener.bindToPort() && (hasListenerWithAddress(warming_listeners_, *listener.address()) ||
hasListenerWithAddress(active_listeners_, *listener.address()))) {
if (!listener.bindToPort() && (hasListenerWithCompatibleAddress(warming_listeners_, listener) ||
hasListenerWithCompatibleAddress(active_listeners_, listener))) {
const std::string message =
fmt::format("error adding listener: '{}' has duplicate address '{}' as existing listener",
name, listener.address()->asString());
Expand All @@ -980,8 +980,7 @@ void ListenerManagerImpl::setNewOrDrainingSocketFactory(
draining_listeners_.cbegin(), draining_listeners_.cend(),
[&listener](const DrainingListener& draining_listener) {
return draining_listener.listener_->listenSocketFactory().getListenSocket(0)->isOpen() &&
*listener.address() ==
*draining_listener.listener_->listenSocketFactory().localAddress();
listener.hasCompatibleAddress(*draining_listener.listener_);
});

if (existing_draining_listener != draining_listeners_.cend()) {
Expand Down
4 changes: 2 additions & 2 deletions source/server/listener_manager_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -234,8 +234,8 @@ class ListenerManagerImpl : public ListenerManager, Logger::Loggable<Logger::Id:

ProtobufTypes::MessagePtr dumpListenerConfigs(const Matchers::StringMatcher& name_matcher);
static ListenerManagerStats generateStats(Stats::Scope& scope);
static bool hasListenerWithAddress(const ListenerList& list,
const Network::Address::Instance& address);
static bool hasListenerWithCompatibleAddress(const ListenerList& list,
const ListenerImpl& listener);
void updateWarmingActiveGauges() {
// Using set() avoids a multiple modifiers problem during the multiple processes phase of hot
// restart.
Expand Down
27 changes: 18 additions & 9 deletions test/server/listener_manager_impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -125,10 +125,18 @@ class ListenerManagerImplForInPlaceFilterChainUpdateTest : public Event::Simulat

Network::MockListenSocket*
expectUpdateToThenDrain(const envoy::config::listener::v3::Listener& new_listener_proto,
ListenerHandle* old_listener_handle, Network::MockListenSocket& socket) {
auto duplicated_socket = new NiceMock<Network::MockListenSocket>();
EXPECT_CALL(socket, duplicate())
.WillOnce(Return(ByMove(std::unique_ptr<Network::Socket>(duplicated_socket))));
ListenerHandle* old_listener_handle,
OptRef<Network::MockListenSocket> socket,
ListenerComponentFactory::BindType bind_type = default_bind_type) {
Network::MockListenSocket* new_socket;
if (socket.has_value()) {
new_socket = new NiceMock<Network::MockListenSocket>();
EXPECT_CALL(socket.value().get(), duplicate())
.WillOnce(Return(ByMove(std::unique_ptr<Network::Socket>(new_socket))));
} else {
new_socket = listener_factory_.socket_.get();
EXPECT_CALL(listener_factory_, createListenSocket(_, _, _, bind_type, 0));
}
EXPECT_CALL(*worker_, addListener(_, _, _));
EXPECT_CALL(*worker_, stopListener(_, _));
EXPECT_CALL(*old_listener_handle->drain_manager_, startDrainSequence(_));
Expand All @@ -140,7 +148,7 @@ class ListenerManagerImplForInPlaceFilterChainUpdateTest : public Event::Simulat

EXPECT_CALL(*old_listener_handle, onDestroy());
worker_->callRemovalCompletion();
return duplicated_socket;
return new_socket;
}

void expectRemove(const envoy::config::listener::v3::Listener& listener_proto,
Expand Down Expand Up @@ -5105,6 +5113,8 @@ TEST_F(ListenerManagerImplForInPlaceFilterChainUpdateTest, TraditionalUpdateIfWo
EXPECT_EQ(0, server_.stats_store_.counter("listener_manager.listener_in_place_updated").value());
}

// This case also verifies that listeners that share port but do not share socket type (TCP vs. UDP)
// do not share a listener.
TEST_F(ListenerManagerImplForInPlaceFilterChainUpdateTest, TraditionalUpdateIfAnyListenerIsNotTcp) {
EXPECT_CALL(*worker_, start(_, _));
manager_->startWorkers(guard_dog_, callback_.AsStdFunction());
Expand All @@ -5120,10 +5130,9 @@ TEST_F(ListenerManagerImplForInPlaceFilterChainUpdateTest, TraditionalUpdateIfAn
envoy::config::core::v3::SocketAddress_Protocol::SocketAddress_Protocol_UDP);

ListenerHandle* listener_foo_update1 = expectListenerCreate(false, true);
auto duplicated_socket =
expectUpdateToThenDrain(new_listener_proto, listener_foo, *listener_factory_.socket_);

expectRemove(new_listener_proto, listener_foo_update1, *duplicated_socket);
expectUpdateToThenDrain(new_listener_proto, listener_foo, OptRef<Network::MockListenSocket>(),
ListenerComponentFactory::BindType::ReusePort);
expectRemove(new_listener_proto, listener_foo_update1, *listener_factory_.socket_);

EXPECT_EQ(0UL, manager_->listeners().size());
EXPECT_EQ(0, server_.stats_store_.counter("listener_manager.listener_in_place_updated").value());
Expand Down