Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/root/version_history/current.rst
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ Bug Fixes
---------
*Changes expected to improve the state of the world and are unlikely to have negative effects*

* listener: fixed the crash when updating listeners that do not bind to port.
* thrift_proxy: fix the thrift_proxy connection manager to correctly report success/error response metrics when performing :ref:`payload passthrough <envoy_v3_api_field_extensions.filters.network.thrift_proxy.v3.ThriftProxy.payload_passthrough>`.

Removed Config or Runtime
Expand Down
22 changes: 20 additions & 2 deletions source/common/network/listen_socket_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,13 @@ class ListenSocketImpl : public SocketImpl {
void setupSocket(const Network::Socket::OptionsSharedPtr& options);
void setListenSocketOptions(const Network::Socket::OptionsSharedPtr& options);
Api::SysCallIntResult bind(Network::Address::InstanceConstSharedPtr address) override;

void close() override {
if (io_handle_ != nullptr && io_handle_->isOpen()) {
io_handle_->close();
}
}
bool isOpen() const override { return io_handle_ != nullptr && io_handle_->isOpen(); }
};

/**
Expand Down Expand Up @@ -79,6 +86,16 @@ template <typename T> class NetworkListenSocket : public ListenSocketImpl {

Socket::Type socketType() const override { return T::type; }

SocketPtr duplicate() override {
if (io_handle_ == nullptr) {
// This is a listen socket that does not bind to port. Pass nullptr socket options.
return std::make_unique<NetworkListenSocket<T>>(connection_info_provider_->localAddress(),
/*options=*/nullptr, /*bind_to_port*/ false);
} else {
return ListenSocketImpl::duplicate();
}
}

// These four overrides are introduced to perform check. A null io handle is possible only if the
// the owner socket is a listen socket that does not bind to port.
IoHandle& ioHandle() override {
Expand All @@ -97,8 +114,9 @@ template <typename T> class NetworkListenSocket : public ListenSocketImpl {
}
}
bool isOpen() const override {
ASSERT(io_handle_ != nullptr);
return io_handle_->isOpen();
return io_handle_ == nullptr ? false // Consider listen socket as closed if it does not bind to
// port. No fd will leak.
: io_handle_->isOpen();
}

protected:
Expand Down
42 changes: 36 additions & 6 deletions test/common/network/listen_socket_impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -35,16 +35,15 @@ TEST(ConnectionSocketImplTest, LowerCaseRequestedServerName) {

template <Network::Socket::Type Type>
class ListenSocketImplTest : public testing::TestWithParam<Address::IpVersion> {
using ListenSocketType = NetworkListenSocket<NetworkSocketTrait<Type>>;

protected:
ListenSocketImplTest() : version_(GetParam()) {}
const Address::IpVersion version_;

template <typename... Args>
std::unique_ptr<ListenSocketImpl> createListenSocketPtr(Args&&... args) {
using NetworkSocketTraitType = NetworkSocketTrait<Type>;

return std::make_unique<NetworkListenSocket<NetworkSocketTraitType>>(
std::forward<Args>(args)...);
std::unique_ptr<ListenSocketType> createListenSocketPtr(Args&&... args) {
return std::make_unique<ListenSocketType>(std::forward<Args>(args)...);
}

void testBindSpecificPort() {
Expand Down Expand Up @@ -76,7 +75,7 @@ class ListenSocketImplTest : public testing::TestWithParam<Address::IpVersion> {
EXPECT_CALL(*option, setOption(_, envoy::config::core::v3::SocketOption::STATE_PREBIND))
.WillOnce(Return(true));
options->emplace_back(std::move(option));
std::unique_ptr<ListenSocketImpl> socket1;
std::unique_ptr<ListenSocketType> socket1;
try {
socket1 = createListenSocketPtr(addr, options, true);
} catch (SocketBindException& e) {
Expand Down Expand Up @@ -139,6 +138,19 @@ class ListenSocketImplTest : public testing::TestWithParam<Address::IpVersion> {
EXPECT_GT(socket->connectionInfoProvider().localAddress()->ip()->port(), 0U);
EXPECT_EQ(Type, socket->socketType());
}

// Verify that a listen sockets that do not bind to port can be duplicated and closed.
void testNotBindToPort() {
auto local_address = version_ == Address::IpVersion::v4 ? Utility::getIpv6AnyAddress()
: Utility::getIpv4AnyAddress();
auto socket = NetworkListenSocket<NetworkSocketTrait<Type>>(local_address, nullptr,
/*bind_to_port=*/false);
auto dup_socket = socket.duplicate();
EXPECT_FALSE(socket.isOpen());
EXPECT_FALSE(dup_socket->isOpen());
socket.close();
dup_socket->close();
}
};

using ListenSocketImplTestTcp = ListenSocketImplTest<Network::Socket::Type::Stream>;
Expand All @@ -162,9 +174,23 @@ class TestListenSocket : public ListenSocketImpl {
public:
TestListenSocket(Address::InstanceConstSharedPtr address)
: ListenSocketImpl(std::make_unique<Network::IoSocketHandleImpl>(), address) {}

TestListenSocket(Address::IpVersion ip_version)
: ListenSocketImpl(/*io_handle=*/nullptr, ip_version == Address::IpVersion::v4
? Utility::getIpv4AnyAddress()
: Utility::getIpv6AnyAddress()) {}
Socket::Type socketType() const override { return Socket::Type::Stream; }

bool isOpen() const override { return ListenSocketImpl::isOpen(); }
void close() override { ListenSocketImpl::close(); }
};

TEST_P(ListenSocketImplTestTcp, NonIoHandleListenSocket) {
TestListenSocket sock(version_);
EXPECT_FALSE(sock.isOpen());
sock.close();
}

TEST_P(ListenSocketImplTestTcp, SetLocalAddress) {
std::string address_str = "10.1.2.3";
if (version_ == Address::IpVersion::v6) {
Expand Down Expand Up @@ -228,6 +254,10 @@ TEST_P(ListenSocketImplTestTcp, BindPortZero) { testBindPortZero(); }

TEST_P(ListenSocketImplTestUdp, BindPortZero) { testBindPortZero(); }

TEST_P(ListenSocketImplTestTcp, NotBindToPortAccess) { testNotBindToPort(); }

TEST_P(ListenSocketImplTestUdp, NotBindToPortAccess) { testNotBindToPort(); }

} // namespace
} // namespace Network
} // namespace Envoy
12 changes: 10 additions & 2 deletions test/integration/filters/address_restore_listener_filter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,22 @@
namespace Envoy {

// The FakeOriginalDstListenerFilter restore desired local address without the dependency of OS.
// Ipv6 and Ipv4 addresses are restored to the corresponding loopback ip address and port 80.
class FakeOriginalDstListenerFilter : public Network::ListenerFilter {
public:
// Network::ListenerFilter
Network::FilterStatus onAccept(Network::ListenerFilterCallbacks& cb) override {
FANCY_LOG(debug, "in FakeOriginalDstListenerFilter::onAccept");
Network::ConnectionSocket& socket = cb.socket();
socket.connectionInfoProvider().restoreLocalAddress(
std::make_shared<Network::Address::Ipv4Instance>("127.0.0.2", 80));
auto local_address = socket.connectionInfoProvider().localAddress();
if (local_address != nullptr &&
local_address->ip()->version() == Network::Address::IpVersion::v6) {
socket.connectionInfoProvider().restoreLocalAddress(
std::make_shared<Network::Address::Ipv6Instance>("::1", 80));
} else {
socket.connectionInfoProvider().restoreLocalAddress(
std::make_shared<Network::Address::Ipv4Instance>("127.0.0.1", 80));
}
FANCY_LOG(debug, "current local socket address is {} restored = {}",
socket.connectionInfoProvider().localAddress()->asString(),
socket.connectionInfoProvider().localAddressRestored());
Expand Down
102 changes: 64 additions & 38 deletions test/integration/listener_lds_integration_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -554,6 +554,11 @@ TEST_P(ListenerIntegrationTest, ChangeListenerAddress) {
EXPECT_EQ(request_size, upstream_request_->bodyLength());
}

struct PerConnection {
std::string response_;
std::unique_ptr<RawConnectionDriver> client_conn_;
FakeRawConnectionPtr upstream_conn_;
};
class RebalancerTest : public testing::TestWithParam<Network::Address::IpVersion>,
public BaseIntegrationTest {
public:
Expand Down Expand Up @@ -585,10 +590,7 @@ class RebalancerTest : public testing::TestWithParam<Network::Address::IpVersion
virtual_listener_config.mutable_bind_to_port()->set_value(false);
virtual_listener_config.set_name("balanced_target_listener");
virtual_listener_config.mutable_connection_balance_config()->mutable_exact_balance();

// TODO(lambdai): Replace by getLoopbackAddressUrlString to emulate the real world.
*virtual_listener_config.mutable_address()->mutable_socket_address()->mutable_address() =
"127.0.0.2";
*virtual_listener_config.mutable_stat_prefix() = target_listener_prefix_;
virtual_listener_config.mutable_address()->mutable_socket_address()->set_port_value(80);
});
BaseIntegrationTest::initialize();
Expand All @@ -604,14 +606,66 @@ class RebalancerTest : public testing::TestWithParam<Network::Address::IpVersion
},
version_, *dispatcher_);
}
};

struct PerConnection {
std::string response_;
std::unique_ptr<RawConnectionDriver> client_conn_;
FakeRawConnectionPtr upstream_conn_;
void verifyBalance(uint32_t repeats = 10) {
// The balancer is balanced as per active connection instead of total connection.
// The below vector maintains all the connections alive.
std::vector<PerConnection> connections;
for (uint32_t i = 0; i < repeats * concurrency_; ++i) {
connections.emplace_back();
connections.back().client_conn_ =
createConnectionAndWrite("dummy", connections.back().response_);
connections.back().client_conn_->waitForConnection();
ASSERT_TRUE(fake_upstreams_[0]->waitForRawConnection(connections.back().upstream_conn_));
}
for (auto& conn : connections) {
conn.client_conn_->close();
while (!conn.client_conn_->closed()) {
dispatcher_->run(Event::Dispatcher::RunType::NonBlock);
}
}
ASSERT_EQ(TestUtility::findCounter(test_server_->statStore(),
absl::StrCat("listener.", target_listener_prefix_,
".worker_0.downstream_cx_total"))
->value(),
repeats);
ASSERT_EQ(TestUtility::findCounter(test_server_->statStore(),
absl::StrCat("listener.", target_listener_prefix_,
".worker_1.downstream_cx_total"))
->value(),
repeats);
}

// The stats prefix that shared by ipv6 and ipv4 listener.
std::string target_listener_prefix_{"balanced_listener"};
};

TEST_P(RebalancerTest, BindToPortUpdate) {
concurrency_ = 2;
initialize();

ConfigHelper new_config_helper(
version_, *api_, MessageUtil::getJsonStringFromMessageOrDie(config_helper_.bootstrap()));

new_config_helper.addConfigModifier([&](envoy::config::bootstrap::v3::Bootstrap& bootstrap)
-> void {
// This virtual listener need updating.
auto& virtual_listener_config = *bootstrap.mutable_static_resources()->mutable_listeners(1);
*virtual_listener_config.mutable_address()->mutable_socket_address()->mutable_address() =
bootstrap.static_resources().listeners(0).address().socket_address().address();
(*(*virtual_listener_config.mutable_metadata()->mutable_filter_metadata())["random_filter_name"]
.mutable_fields())["random_key"]
.set_number_value(2);
});
// Create an LDS response with the new config, and reload config.
new_config_helper.setLds("1");

test_server_->waitForCounterEq("listener_manager.listener_modified", 1);
test_server_->waitForGaugeEq("listener_manager.total_listeners_draining", 0);

verifyBalance();
}

// Verify the connections are distributed evenly on the 2 worker threads of the redirected
// listener.
// Currently flaky because the virtual listener create listen socket anyway despite the socket is
Expand All @@ -620,36 +674,8 @@ TEST_P(RebalancerTest, DISABLED_RedirectConnectionIsBalancedOnDestinationListene
auto ip_address_str =
Network::Test::getLoopbackAddressUrlString(TestEnvironment::getIpVersionsForTest().front());
concurrency_ = 2;
int repeats = 10;
initialize();

// The balancer is balanced as per active connection instead of total connection.
// The below vector maintains all the connections alive.
std::vector<PerConnection> connections;
for (uint32_t i = 0; i < repeats * concurrency_; ++i) {
connections.emplace_back();
connections.back().client_conn_ =
createConnectionAndWrite("dummy", connections.back().response_);
connections.back().client_conn_->waitForConnection();
ASSERT_TRUE(fake_upstreams_[0]->waitForRawConnection(connections.back().upstream_conn_));
}
for (auto& conn : connections) {
conn.client_conn_->close();
while (!conn.client_conn_->closed()) {
dispatcher_->run(Event::Dispatcher::RunType::NonBlock);
}
}

ASSERT_EQ(TestUtility::findCounter(
test_server_->statStore(),
absl::StrCat("listener.", ip_address_str, "_80.worker_0.downstream_cx_total"))
->value(),
repeats);
ASSERT_EQ(TestUtility::findCounter(
test_server_->statStore(),
absl::StrCat("listener.", ip_address_str, "_80.worker_1.downstream_cx_total"))
->value(),
repeats);
verifyBalance();
}

INSTANTIATE_TEST_SUITE_P(IpVersions, RebalancerTest,
Expand Down
2 changes: 1 addition & 1 deletion test/per_file_coverage.sh
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ declare -a KNOWN_LOW_COVERAGE=(
"source/common/http:96.5"
"source/common/json:90.1"
"source/common/matcher:94.2"
"source/common/network:94.8" # Flaky, `activateFileEvents`, `startSecureTransport` and `ioctl` do not always report LCOV
"source/common/network:94.4" # Flaky, `activateFileEvents`, `startSecureTransport` and `ioctl`, listener_socket do not always report LCOV
"source/common/protobuf:95.3"
"source/common/quic:91.8"
"source/common/secret:96.3"
Expand Down