Skip to content
Merged
Show file tree
Hide file tree
Changes from 34 commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
d712a58
socket: add get sockopt for connect
lambdai Aug 13, 2021
1217008
add inteneral listener
lambdai Aug 18, 2021
b673747
add inteneral listener
lambdai Aug 18, 2021
bc6e8a3
Merge branch 'main' into ilisteneronconnect
lambdai Sep 1, 2021
0361f4a
merge master
lambdai Sep 2, 2021
c5e1131
clean up
lambdai Sep 13, 2021
713efa5
Merge branch 'main' into ilisteneronconnect
lambdai Sep 13, 2021
da47f12
revert stub for advanced integration test
lambdai Sep 13, 2021
427efc1
cleanup
lambdai Sep 29, 2021
d17804a
Merge branch 'main' into ilisteneronconnect
lambdai Sep 29, 2021
a6aca89
more
lambdai Sep 29, 2021
6361da0
add new listen socket type for internal listener
lambdai Oct 5, 2021
6d50f2a
WIP: listener address mutated to ip
lambdai Oct 5, 2021
254a12f
add integration test
lambdai Oct 5, 2021
131eaa9
listener: fix duplicate and isOpen of the listener socket
lambdai Oct 4, 2021
cb8f53d
add integration test
lambdai Oct 5, 2021
dc4116e
stash
lambdai Oct 6, 2021
f5692cf
add new test that destroy listener when connections are alive
lambdai Oct 6, 2021
09d023c
add test coverage
lambdai Oct 6, 2021
84300f3
clear duplicated runtime value
lambdai Oct 6, 2021
1b7d26b
sanitize dep
lambdai Oct 6, 2021
c58e7d1
few dep
lambdai Oct 7, 2021
f8379de
improve coverage
lambdai Oct 27, 2021
a77afb0
Merge branch 'main' into ilisteneronconnect
lambdai Oct 27, 2021
f6c8fd8
merge fix
lambdai Oct 27, 2021
7af21cf
Merge remote-tracking branch 'me/ilisteneronconnect' into ilisteneron…
lambdai Oct 27, 2021
a9d5891
fix format
lambdai Oct 27, 2021
d565405
another try windows release assert
lambdai Oct 27, 2021
1605a0d
testcov
lambdai Oct 28, 2021
e7f0e50
another death at windows try
lambdai Oct 28, 2021
46d97f5
Merge branch 'main' into ilisteneronconnect
lambdai Nov 1, 2021
c4b74e8
ASSERT to ASSERT_TRUE in test
lambdai Nov 1, 2021
41401f4
typo
lambdai Nov 2, 2021
178b70e
revert a test config
lambdai Nov 2, 2021
3f6407b
address comment
lambdai Nov 9, 2021
18a4d07
add test case and address naming
lambdai Nov 12, 2021
f5efb17
more validation
lambdai Nov 16, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 55 additions & 0 deletions envoy/network/listener.h
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,16 @@ class UdpListenerConfig {

using UdpListenerConfigOptRef = OptRef<UdpListenerConfig>;

/**
* Configuration for an internal listener.
*/
class InternalListenerConfig {

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like this isn't used yet for anything. Delete until it's needed?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is used by the ConnectionHandler to determine if the listener type is Internal.
I feel this is cleaner than find the listen address and query the address type then dispatch by the address type.

In fact I almost make the ConnectionHandler not relying on the address type.

public:
virtual ~InternalListenerConfig() = default;
};

using InternalListenerConfigOptRef = OptRef<InternalListenerConfig>;

/**
* A configuration for an individual listener.
*/
Expand Down Expand Up @@ -185,6 +195,11 @@ class ListenerConfig {
*/
virtual UdpListenerConfigOptRef udpListenerConfig() PURE;

/**
* @return the internal configuration for the listener IFF it is an internal listener.
*/
virtual InternalListenerConfigOptRef internalListenerConfig() PURE;

/**
* @return traffic direction of the listener.
*/
Expand Down Expand Up @@ -428,6 +443,46 @@ class UdpListener : public virtual Listener {

using UdpListenerPtr = std::unique_ptr<UdpListener>;

/**
* Internal listener callbacks.
*/
class InternalListenerCallbacks {
Comment thread
lambdai marked this conversation as resolved.
Outdated
public:
virtual ~InternalListenerCallbacks() = default;

/**
* Called when a new connection is accepted.
* @param socket supplies the socket that is moved into the callee.
*/
virtual void onAccept(ConnectionSocketPtr&& socket) PURE;
};
using InternalListenerCallbacksOptRef =
absl::optional<std::reference_wrapper<InternalListenerCallbacks>>;
Comment thread
lambdai marked this conversation as resolved.
Outdated

class InternalListener {};
Comment thread
lambdai marked this conversation as resolved.
Outdated

using InternalListenerPtr = std::unique_ptr<InternalListener>;
using InternalListenerOptRef = absl::optional<std::reference_wrapper<InternalListener>>;
Comment thread
lambdai marked this conversation as resolved.
Outdated

/**
* The query interface of the registered internal listener callbacks.
*/
class InternalListenerManager {
public:
virtual ~InternalListenerManager() = default;

/**
* Return the internal listener callbacks binding the listener address.
*
* @param listen_address the internal address of the expected internal listener.
*/
virtual InternalListenerCallbacksOptRef
findByAddress(const Address::InstanceConstSharedPtr& listen_address) PURE;
};

using InternalListenerManagerOptRef =
absl::optional<std::reference_wrapper<InternalListenerManager>>;

/**
* Handles delivering datagrams to the correct worker.
*/
Expand Down
1 change: 1 addition & 0 deletions source/common/network/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ envoy_cc_library(
hdrs = ["connection_impl_base.h"],
deps = [
":filter_manager_lib",
":listen_socket_lib",
"//envoy/common:scope_tracker_interface",
"//envoy/event:dispatcher_interface",
"//source/common/common:assert_lib",
Expand Down
19 changes: 19 additions & 0 deletions source/common/network/listen_socket_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,25 @@ class UdsListenSocket : public ListenSocketImpl {
Socket::Type socketType() const override { return Socket::Type::Stream; }
};

// This socket type adapts the ListenerComponentFactory.
class InternalListenSocket : public ListenSocketImpl {
Comment thread
lambdai marked this conversation as resolved.
public:
InternalListenSocket(const Address::InstanceConstSharedPtr& address)
: ListenSocketImpl(/* io_handle= */ nullptr, address) {}
Socket::Type socketType() const override { return Socket::Type::Stream; }

// InternalListenSocket cannot be duplicated.
SocketPtr duplicate() override {
return std::make_unique<InternalListenSocket>(connectionInfoProvider().localAddress());
}

void close() override { ASSERT(io_handle_ == nullptr); }
bool isOpen() const override {
ASSERT(io_handle_ == nullptr);
return false;
Comment thread
lambdai marked this conversation as resolved.
}
};

class ConnectionSocketImpl : public SocketImpl, public ConnectionSocket {
public:
ConnectionSocketImpl(IoHandlePtr&& io_handle,
Expand Down
5 changes: 5 additions & 0 deletions source/common/network/socket_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,11 @@ SocketImpl::SocketImpl(IoHandlePtr&& io_handle,
return;
}

if (connection_info_provider_->remoteAddress() != nullptr) {
addr_type_ = connection_info_provider_->remoteAddress()->type();
return;
}

// Should not happen but some tests inject -1 fds
if (!io_handle_->isOpen()) {
return;
Expand Down
3 changes: 3 additions & 0 deletions source/common/network/utility.cc
Original file line number Diff line number Diff line change
Expand Up @@ -534,6 +534,9 @@ Utility::protobufAddressSocketType(const envoy::config::core::v3::Address& proto
}
case envoy::config::core::v3::Address::AddressCase::kPipe:
return Socket::Type::Stream;
case envoy::config::core::v3::Address::AddressCase::kEnvoyInternalAddress:
// Currently internal address supports stream operation only.
return Socket::Type::Stream;
default:
NOT_REACHED_GCOVR_EXCL_LINE;
}
Expand Down
1 change: 1 addition & 0 deletions source/common/runtime/runtime_features.cc
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ constexpr const char* runtime_features[] = {
"envoy.reloadable_features.http_reject_path_with_fragment",
"envoy.reloadable_features.http_strip_fragment_from_path_unsafe_if_disabled",
"envoy.reloadable_features.http_transport_failure_reason_in_body",
"envoy.reloadable_features.internal_address",
"envoy.reloadable_features.internal_redirects_with_body",
"envoy.reloadable_features.listener_reuse_port_default_enabled",
"envoy.reloadable_features.listener_wildcard_match_ip_family",
Expand Down
3 changes: 3 additions & 0 deletions source/common/tcp_proxy/tcp_proxy.cc
Original file line number Diff line number Diff line change
Expand Up @@ -531,6 +531,8 @@ Network::FilterStatus Filter::onNewConnection() {
}

void Filter::onDownstreamEvent(Network::ConnectionEvent event) {
ENVOY_CONN_LOG(trace, "on downstream event {}, has upstream = {}", read_callbacks_->connection(),
static_cast<int>(event), upstream_ == nullptr);
if (upstream_) {
Tcp::ConnectionPool::ConnectionDataPtr conn_data(upstream_->onDownstreamEvent(event));
if (conn_data != nullptr &&
Expand Down Expand Up @@ -699,6 +701,7 @@ Drainer::Drainer(UpstreamDrainManager& parent, const Config::SharedConfigSharedP
const Upstream::HostDescriptionConstSharedPtr& upstream_host)
: parent_(parent), callbacks_(callbacks), upstream_conn_data_(std::move(conn_data)),
timer_(std::move(idle_timer)), upstream_host_(upstream_host), config_(config) {
ENVOY_CONN_LOG(debug, "draining the upstream connection", upstream_conn_data_->connection());
Comment thread
lambdai marked this conversation as resolved.
Outdated
config_->stats().upstream_flush_total_.inc();
config_->stats().upstream_flush_active_.inc();
}
Expand Down
2 changes: 1 addition & 1 deletion source/common/tcp_proxy/tcp_proxy.h
Original file line number Diff line number Diff line change
Expand Up @@ -380,7 +380,7 @@ class Filter : public Network::ReadFilter,
// This class deals with an upstream connection that needs to finish flushing, when the downstream
// connection has been closed. The TcpProxy is destroyed when the downstream connection is closed,
// so handling the upstream connection here allows it to finish draining or timeout.
class Drainer : public Event::DeferredDeletable {
class Drainer : public Event::DeferredDeletable, protected Logger::Loggable<Logger::Id::filter> {
public:
Drainer(UpstreamDrainManager& parent, const Config::SharedConfigSharedPtr& config,
const std::shared_ptr<Filter::UpstreamCallbacks>& callbacks,
Expand Down
5 changes: 4 additions & 1 deletion source/common/upstream/upstream_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,10 @@ Network::ClientConnectionPtr HostImpl::createConnection(
} else {
connection_options = options;
}
ASSERT(!address->envoyInternalAddress());

ASSERT(!address->envoyInternalAddress() ||
Runtime::runtimeFeatureEnabled("envoy.reloadable_features.internal_address"));

Network::ClientConnectionPtr connection =
address_list.size() > 1
? std::make_unique<Network::HappyEyeballsConnectionImpl>(
Expand Down
32 changes: 32 additions & 0 deletions source/server/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ envoy_cc_library(
envoy_cc_library(
name = "connection_handler_lib",
deps = [
":active_internal_listener",
":active_tcp_listener",
":active_udp_listener",
":connection_handler_impl",
Expand All @@ -77,6 +78,7 @@ envoy_cc_library(
"connection_handler_impl.h",
],
deps = [
":active_internal_listener",
":active_tcp_listener",
"//envoy/common:time_interface",
"//envoy/event:deferred_deletable",
Expand Down Expand Up @@ -179,6 +181,36 @@ envoy_cc_library(
],
)

envoy_cc_library(
name = "active_internal_listener",
srcs = ["active_internal_listener.cc"],
hdrs = [
"active_internal_listener.h",
],
deps = [
":active_listener_base",
":active_tcp_listener_headers",
"//envoy/common:time_interface",
"//envoy/event:deferred_deletable",
"//envoy/event:dispatcher_interface",
"//envoy/event:timer_interface",
"//envoy/network:connection_handler_interface",
"//envoy/network:connection_interface",
"//envoy/network:exception_interface",
"//envoy/network:filter_interface",
"//envoy/network:listen_socket_interface",
"//envoy/network:listener_interface",
"//envoy/server:listener_manager_interface",
"//envoy/stats:timespan_interface",
"//source/common/common:linked_object",
"//source/common/common:non_copyable",
"//source/common/event:deferred_task",
"//source/common/network:connection_lib",
"//source/common/stats:timespan_lib",
"//source/common/stream_info:stream_info_lib",
],
)

envoy_cc_library(
name = "active_tcp_socket",
srcs = ["active_tcp_socket.cc"],
Expand Down
79 changes: 79 additions & 0 deletions source/server/active_internal_listener.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
#include "source/server/active_internal_listener.h"

#include "envoy/network/filter.h"
#include "envoy/stats/scope.h"

#include "source/common/network/address_impl.h"
#include "source/common/stats/timespan_impl.h"

#include "active_stream_listener_base.h"

namespace Envoy {
namespace Server {

ActiveInternalListener::ActiveInternalListener(Network::ConnectionHandler& conn_handler,
Event::Dispatcher& dispatcher,
Network::ListenerConfig& config)
: OwnedActiveStreamListenerBase(
conn_handler, dispatcher,
std::make_unique<ActiveInternalListener::NetworkInternalListener>(), config) {}

ActiveInternalListener::ActiveInternalListener(Network::ConnectionHandler& conn_handler,
Event::Dispatcher& dispatcher,
Network::ListenerPtr listener,
Network::ListenerConfig& config)
: OwnedActiveStreamListenerBase(conn_handler, dispatcher, std::move(listener), config) {}

ActiveInternalListener::~ActiveInternalListener() {
is_deleting_ = true;
// Purge sockets that have not progressed to connections. This should only happen when
// a listener filter stops iteration and never resumes.
while (!sockets_.empty()) {
auto removed = sockets_.front()->removeFromList(sockets_);
dispatcher().deferredDelete(std::move(removed));
}

for (auto& [chain, active_connections] : connections_by_context_) {
ASSERT(active_connections != nullptr);
auto& connections = active_connections->connections_;
while (!connections.empty()) {
connections.front()->connection_->close(Network::ConnectionCloseType::NoFlush);
}
Comment thread
lambdai marked this conversation as resolved.
}
dispatcher().clearDeferredDeleteList();
}

void ActiveInternalListener::updateListenerConfig(Network::ListenerConfig& config) {
ENVOY_LOG(trace, "replacing listener ", config_->listenerTag(), " by ", config.listenerTag());
config_ = &config;
}

void ActiveInternalListener::onAccept(Network::ConnectionSocketPtr&& socket) {
// Unlike tcp listener, no rebalancer is applied and won't call pickTargetHandler to account
// connections.
incNumConnections();

auto active_socket = std::make_unique<ActiveTcpSocket>(
*this, std::move(socket), false /* do not handle off at internal listener */);
Comment thread
lambdai marked this conversation as resolved.
Outdated

onSocketAccepted(std::move(active_socket));
}

void ActiveInternalListener::newActiveConnection(
const Network::FilterChain& filter_chain, Network::ServerConnectionPtr server_conn_ptr,
std::unique_ptr<StreamInfo::StreamInfo> stream_info) {
auto& active_connections = getOrCreateActiveConnections(filter_chain);
auto active_connection =
std::make_unique<ActiveTcpConnection>(active_connections, std::move(server_conn_ptr),
dispatcher().timeSource(), std::move(stream_info));
// If the connection is already closed, we can just let this connection immediately die.
if (active_connection->connection_->state() != Network::Connection::State::Closed) {
ENVOY_CONN_LOG(
debug, "new connection from {}", *active_connection->connection_,
active_connection->connection_->connectionInfoProvider().remoteAddress()->asString());
active_connection->connection_->addConnectionCallbacks(*active_connection);
LinkedList::moveIntoList(std::move(active_connection), active_connections.connections_);
}
}
} // namespace Server
} // namespace Envoy
Loading