Skip to content
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions docs/root/version_history/current.rst
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ Bug Fixes
---------
*Changes expected to improve the state of the world and are unlikely to have negative effects*

* listener: fixed the crash when updating listeners that do not bind to port.

Removed Config or Runtime
-------------------------
*Normally occurs at the end of the* :ref:`deprecation period <deprecated>`
Expand Down
41 changes: 39 additions & 2 deletions source/common/network/listen_socket_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,13 @@

namespace Envoy {
namespace Network {
namespace {
std::string getConnectionInfoString(const ConnectionInfoProvider& conn_info) {
std::stringstream out;
conn_info.dumpState(out, 0);
return out.str();
}
} // namespace

class ListenSocketImpl : public SocketImpl {
protected:
Expand All @@ -33,6 +40,23 @@ class ListenSocketImpl : public SocketImpl {
void setupSocket(const Network::Socket::OptionsSharedPtr& options);
void setListenSocketOptions(const Network::Socket::OptionsSharedPtr& options);
Api::SysCallIntResult bind(Network::Address::InstanceConstSharedPtr address) override;

void close() override {
RELEASE_ASSERT(io_handle_ != nullptr,
Comment thread
lambdai marked this conversation as resolved.
Outdated
absl::StrCat(__FUNCTION__,
" is called from the ListenSocket with no io handle. Socket info: ",
getConnectionInfoString(*connection_info_provider_)));
if (io_handle_->isOpen()) {
io_handle_->close();
}
}
bool isOpen() const override {
RELEASE_ASSERT(io_handle_ != nullptr,
absl::StrCat(__FUNCTION__,
" is called from the ListenSocket with no io handle. Socket info: ",
getConnectionInfoString(*connection_info_provider_)));
return io_handle_->isOpen();
}
};

/**
Expand Down Expand Up @@ -79,6 +103,18 @@ template <typename T> class NetworkListenSocket : public ListenSocketImpl {

Socket::Type socketType() const override { return T::type; }

SocketPtr duplicate() override {
if (io_handle_ == nullptr) {
// This is a listen socket that does not bind to port. Pass nullptr socket options.
return std::make_unique<NetworkListenSocket<T>>(connection_info_provider_->localAddress(),
/*options=*/nullptr, /*bind_to_port*/ false);
} else {
// TODO(lambdai): verify if duplicate is all the need to set up a TCP/UDP socket. Should
// socket options be applied along with duplicate?
return ListenSocketImpl::duplicate();
Comment thread
lambdai marked this conversation as resolved.
Outdated
}
}

// These four overrides are introduced to perform check. A null io handle is possible only if the
// the owner socket is a listen socket that does not bind to port.
IoHandle& ioHandle() override {
Expand All @@ -97,8 +133,9 @@ template <typename T> class NetworkListenSocket : public ListenSocketImpl {
}
}
bool isOpen() const override {
ASSERT(io_handle_ != nullptr);
return io_handle_->isOpen();
return io_handle_ == nullptr ? false // Consider listen socket as closed if it does not bind to
// port. No fd will leak.
: io_handle_->isOpen();
}

protected:
Expand Down
28 changes: 22 additions & 6 deletions test/common/network/listen_socket_impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -35,16 +35,15 @@ TEST(ConnectionSocketImplTest, LowerCaseRequestedServerName) {

template <Network::Socket::Type Type>
class ListenSocketImplTest : public testing::TestWithParam<Address::IpVersion> {
using ListenSocketType = NetworkListenSocket<NetworkSocketTrait<Type>>;

protected:
ListenSocketImplTest() : version_(GetParam()) {}
const Address::IpVersion version_;

template <typename... Args>
std::unique_ptr<ListenSocketImpl> createListenSocketPtr(Args&&... args) {
using NetworkSocketTraitType = NetworkSocketTrait<Type>;

return std::make_unique<NetworkListenSocket<NetworkSocketTraitType>>(
std::forward<Args>(args)...);
std::unique_ptr<ListenSocketType> createListenSocketPtr(Args&&... args) {
return std::make_unique<ListenSocketType>(std::forward<Args>(args)...);
}

void testBindSpecificPort() {
Expand Down Expand Up @@ -76,7 +75,7 @@ class ListenSocketImplTest : public testing::TestWithParam<Address::IpVersion> {
EXPECT_CALL(*option, setOption(_, envoy::config::core::v3::SocketOption::STATE_PREBIND))
.WillOnce(Return(true));
options->emplace_back(std::move(option));
std::unique_ptr<ListenSocketImpl> socket1;
std::unique_ptr<ListenSocketType> socket1;
try {
socket1 = createListenSocketPtr(addr, options, true);
} catch (SocketBindException& e) {
Expand Down Expand Up @@ -139,6 +138,19 @@ class ListenSocketImplTest : public testing::TestWithParam<Address::IpVersion> {
EXPECT_GT(socket->connectionInfoProvider().localAddress()->ip()->port(), 0U);
EXPECT_EQ(Type, socket->socketType());
}

// Verify that a listen sockets that do not bind to port can be duplicated and closed.
void testNotBindToPort() {
auto local_address = version_ == Address::IpVersion::v4 ? Utility::getIpv6AnyAddress()
: Utility::getIpv4AnyAddress();
auto socket = NetworkListenSocket<NetworkSocketTrait<Type>>(local_address, nullptr,
/*bind_to_port=*/false);
auto dup_socket = socket.duplicate();
EXPECT_FALSE(socket.isOpen());
EXPECT_FALSE(dup_socket->isOpen());
socket.close();
dup_socket->close();
}
};

using ListenSocketImplTestTcp = ListenSocketImplTest<Network::Socket::Type::Stream>;
Expand Down Expand Up @@ -228,6 +240,10 @@ TEST_P(ListenSocketImplTestTcp, BindPortZero) { testBindPortZero(); }

TEST_P(ListenSocketImplTestUdp, BindPortZero) { testBindPortZero(); }

TEST_P(ListenSocketImplTestTcp, NotBindToPortAccess) { testNotBindToPort(); }

TEST_P(ListenSocketImplTestUdp, NotBindToPortAccess) { testNotBindToPort(); }

} // namespace
} // namespace Network
} // namespace Envoy
12 changes: 10 additions & 2 deletions test/integration/filters/address_restore_listener_filter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,22 @@
namespace Envoy {

// The FakeOriginalDstListenerFilter restore desired local address without the dependency of OS.
// Ipv6 and Ipv4 addresses are restored to the corresponding loopback ip address and port 80.
class FakeOriginalDstListenerFilter : public Network::ListenerFilter {
public:
// Network::ListenerFilter
Network::FilterStatus onAccept(Network::ListenerFilterCallbacks& cb) override {
FANCY_LOG(debug, "in FakeOriginalDstListenerFilter::onAccept");
Network::ConnectionSocket& socket = cb.socket();
socket.connectionInfoProvider().restoreLocalAddress(
std::make_shared<Network::Address::Ipv4Instance>("127.0.0.2", 80));
auto local_address = socket.connectionInfoProvider().localAddress();
if (local_address != nullptr &&
local_address->ip()->version() == Network::Address::IpVersion::v6) {
socket.connectionInfoProvider().restoreLocalAddress(
std::make_shared<Network::Address::Ipv6Instance>("::1", 80));
} else {
socket.connectionInfoProvider().restoreLocalAddress(
std::make_shared<Network::Address::Ipv4Instance>("127.0.0.1", 80));
}
FANCY_LOG(debug, "current local socket address is {} restored = {}",
socket.connectionInfoProvider().localAddress()->asString(),
socket.connectionInfoProvider().localAddressRestored());
Expand Down
102 changes: 64 additions & 38 deletions test/integration/listener_lds_integration_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -554,6 +554,11 @@ TEST_P(ListenerIntegrationTest, ChangeListenerAddress) {
EXPECT_EQ(request_size, upstream_request_->bodyLength());
}

struct PerConnection {
std::string response_;
std::unique_ptr<RawConnectionDriver> client_conn_;
FakeRawConnectionPtr upstream_conn_;
};
class RebalancerTest : public testing::TestWithParam<Network::Address::IpVersion>,
public BaseIntegrationTest {
public:
Expand Down Expand Up @@ -585,10 +590,7 @@ class RebalancerTest : public testing::TestWithParam<Network::Address::IpVersion
virtual_listener_config.mutable_bind_to_port()->set_value(false);
virtual_listener_config.set_name("balanced_target_listener");
virtual_listener_config.mutable_connection_balance_config()->mutable_exact_balance();

// TODO(lambdai): Replace by getLoopbackAddressUrlString to emulate the real world.
*virtual_listener_config.mutable_address()->mutable_socket_address()->mutable_address() =
"127.0.0.2";
*virtual_listener_config.mutable_stat_prefix() = target_listener_prefix_;
virtual_listener_config.mutable_address()->mutable_socket_address()->set_port_value(80);
});
BaseIntegrationTest::initialize();
Expand All @@ -604,14 +606,66 @@ class RebalancerTest : public testing::TestWithParam<Network::Address::IpVersion
},
version_, *dispatcher_);
}
};

struct PerConnection {
std::string response_;
std::unique_ptr<RawConnectionDriver> client_conn_;
FakeRawConnectionPtr upstream_conn_;
void verifyBalance(uint32_t repeats = 10) {
// The balancer is balanced as per active connection instead of total connection.
// The below vector maintains all the connections alive.
std::vector<PerConnection> connections;
for (uint32_t i = 0; i < repeats * concurrency_; ++i) {
connections.emplace_back();
connections.back().client_conn_ =
createConnectionAndWrite("dummy", connections.back().response_);
connections.back().client_conn_->waitForConnection();
ASSERT_TRUE(fake_upstreams_[0]->waitForRawConnection(connections.back().upstream_conn_));
}
for (auto& conn : connections) {
conn.client_conn_->close();
while (!conn.client_conn_->closed()) {
dispatcher_->run(Event::Dispatcher::RunType::NonBlock);
}
}
ASSERT_EQ(TestUtility::findCounter(test_server_->statStore(),
absl::StrCat("listener.", target_listener_prefix_,
".worker_0.downstream_cx_total"))
->value(),
repeats);
ASSERT_EQ(TestUtility::findCounter(test_server_->statStore(),
absl::StrCat("listener.", target_listener_prefix_,
".worker_1.downstream_cx_total"))
->value(),
repeats);
}

// The stats prefix that shared by ipv6 and ipv4 listener.
std::string target_listener_prefix_{"balanced_listener"};
};

TEST_P(RebalancerTest, BindToPortUpdate) {
concurrency_ = 2;
initialize();

ConfigHelper new_config_helper(
version_, *api_, MessageUtil::getJsonStringFromMessageOrDie(config_helper_.bootstrap()));

new_config_helper.addConfigModifier([&](envoy::config::bootstrap::v3::Bootstrap& bootstrap)
-> void {
// This virtual listener need updating.
auto& virtual_listener_config = *bootstrap.mutable_static_resources()->mutable_listeners(1);
*virtual_listener_config.mutable_address()->mutable_socket_address()->mutable_address() =
bootstrap.static_resources().listeners(0).address().socket_address().address();
(*(*virtual_listener_config.mutable_metadata()->mutable_filter_metadata())["random_filter_name"]
.mutable_fields())["random_key"]
.set_number_value(2);
});
// Create an LDS response with the new config, and reload config.
new_config_helper.setLds("1");

test_server_->waitForCounterEq("listener_manager.listener_modified", 1);
test_server_->waitForGaugeEq("listener_manager.total_listeners_draining", 0);

verifyBalance();
}

// Verify the connections are distributed evenly on the 2 worker threads of the redirected
// listener.
// Currently flaky because the virtual listener create listen socket anyway despite the socket is
Expand All @@ -620,36 +674,8 @@ TEST_P(RebalancerTest, DISABLED_RedirectConnectionIsBalancedOnDestinationListene
auto ip_address_str =
Network::Test::getLoopbackAddressUrlString(TestEnvironment::getIpVersionsForTest().front());
concurrency_ = 2;
int repeats = 10;
initialize();

// The balancer is balanced as per active connection instead of total connection.
// The below vector maintains all the connections alive.
std::vector<PerConnection> connections;
for (uint32_t i = 0; i < repeats * concurrency_; ++i) {
connections.emplace_back();
connections.back().client_conn_ =
createConnectionAndWrite("dummy", connections.back().response_);
connections.back().client_conn_->waitForConnection();
ASSERT_TRUE(fake_upstreams_[0]->waitForRawConnection(connections.back().upstream_conn_));
}
for (auto& conn : connections) {
conn.client_conn_->close();
while (!conn.client_conn_->closed()) {
dispatcher_->run(Event::Dispatcher::RunType::NonBlock);
}
}

ASSERT_EQ(TestUtility::findCounter(
test_server_->statStore(),
absl::StrCat("listener.", ip_address_str, "_80.worker_0.downstream_cx_total"))
->value(),
repeats);
ASSERT_EQ(TestUtility::findCounter(
test_server_->statStore(),
absl::StrCat("listener.", ip_address_str, "_80.worker_1.downstream_cx_total"))
->value(),
repeats);
verifyBalance();
}

INSTANTIATE_TEST_SUITE_P(IpVersions, RebalancerTest,
Expand Down
2 changes: 1 addition & 1 deletion test/per_file_coverage.sh
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ declare -a KNOWN_LOW_COVERAGE=(
"source/common/http:96.5"
"source/common/json:90.1"
"source/common/matcher:94.2"
"source/common/network:94.8" # Flaky, `activateFileEvents`, `startSecureTransport` and `ioctl` do not always report LCOV
"source/common/network:94.4" # Flaky, `activateFileEvents`, `startSecureTransport` and `ioctl`, listener_socket do not always report LCOV
"source/common/protobuf:95.3"
"source/common/quic:91.8"
"source/common/secret:96.3"
Expand Down