Skip to content
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
cb82711
initial internal listener
lambdai May 20, 2021
4c156ce
for new pr
lambdai Jun 1, 2021
773c75a
merge main
lambdai Jun 2, 2021
b6ff892
delete test/integration/internal_tcp_proxy_integration_test.cc
lambdai Jun 2, 2021
635f185
coverage
lambdai Jun 2, 2021
3615512
remove ClientConnectionImpl ctor for internal socket
lambdai Jun 2, 2021
efe9ce7
revert dispatcher
lambdai Jun 9, 2021
01ee5bb
merged main
lambdai Jun 10, 2021
1564151
revert internal listener
lambdai Jun 11, 2021
9e5b2b4
pragma once
lambdai Jun 11, 2021
e802a6f
test bootstrap
lambdai Jun 11, 2021
c8e4e43
comment other than incNumConnections
lambdai Jun 15, 2021
aac23c1
rename
lambdai Jun 15, 2021
573cdfe
put cleanup, deferdelete in base
lambdai Jun 17, 2021
c1f7921
push down conn collection or active conn op
lambdai Jun 17, 2021
0528d50
Merge branch 'main' into addinternallistener_pre_1
lambdai Jul 1, 2021
0b3c2f0
Merge branch 'addinternallistener_pre_1' of github.com:lambdai/envoy-…
lambdai Jul 1, 2021
bbee979
tmpaddinternallistener_pre_1
lambdai Jul 1, 2021
57ca809
fix format
lambdai Jul 2, 2021
4523c99
address comment
lambdai Jul 2, 2021
dc08e69
Merge remote-tracking branch 'origin/main' into addinternallistener_p…
lambdai Jul 7, 2021
cd7d6e6
Merge branch 'main' into addinternallistener_pre_1on2
lambdai Jul 23, 2021
e724e22
reorg
lambdai Jul 23, 2021
bb46ea5
format
lambdai Jul 26, 2021
942fb9b
more protected
lambdai Jul 26, 2021
aeb163a
add comment
lambdai Jul 27, 2021
056f51a
comment to TypedActiveStreamListenerBase
lambdai Jul 27, 2021
f0fde02
final
lambdai Jul 28, 2021
bfdf05a
another method move
lambdai Jul 28, 2021
02e618c
Merge branch 'main' into addinternallistener_pre_1
lambdai Jul 30, 2021
bd122fa
Merge branch 'main' into addinternallistener_pre_1
lambdai Jul 30, 2021
2f406f5
Merge remote-tracking branch 'me/addinternallistener_pre_1' into addi…
lambdai Aug 3, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 33 additions & 3 deletions source/server/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,9 @@ envoy_cc_library(
envoy_cc_library(
name = "connection_handler_lib",
deps = [
"//source/server:active_tcp_listener",
"//source/server:active_udp_listener",
"//source/server:connection_handler_impl",
":active_tcp_listener",
":active_udp_listener",
":connection_handler_impl",
],
)

Expand Down Expand Up @@ -116,6 +116,7 @@ envoy_cc_library(
"active_tcp_listener.h",
],
deps = [
":active_stream_socket",
"//envoy/common:time_interface",
"//envoy/event:deferred_deletable",
"//envoy/event:dispatcher_interface",
Expand Down Expand Up @@ -157,6 +158,35 @@ envoy_cc_library(
],
)

envoy_cc_library(
name = "active_stream_socket",
Comment thread
lambdai marked this conversation as resolved.
Outdated
srcs = ["active_tcp_socket.cc"],
hdrs = [
"active_tcp_socket.h",
],
deps = [
":active_listener_base",
"//envoy/common:time_interface",
"//envoy/event:deferred_deletable",
"//envoy/event:dispatcher_interface",
"//envoy/event:timer_interface",
"//envoy/network:connection_handler_interface",
"//envoy/network:connection_interface",
"//envoy/network:exception_interface",
"//envoy/network:filter_interface",
"//envoy/network:listen_socket_interface",
"//envoy/network:listener_interface",
"//envoy/server:listener_manager_interface",
"//envoy/stats:timespan_interface",
"//source/common/common:linked_object",
"//source/common/common:non_copyable",
"//source/common/event:deferred_task",
"//source/common/network:connection_lib",
"//source/common/stats:timespan_lib",
"//source/common/stream_info:stream_info_lib",
],
)

envoy_cc_library(
name = "drain_manager_lib",
srcs = ["drain_manager_impl.cc"],
Expand Down
231 changes: 35 additions & 196 deletions source/server/active_tcp_listener.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,51 +16,30 @@
namespace Envoy {
namespace Server {

namespace {
void emitLogs(Network::ListenerConfig& config, StreamInfo::StreamInfo& stream_info) {
stream_info.onRequestComplete();
for (const auto& access_log : config.accessLogs()) {
access_log->log(nullptr, nullptr, nullptr, stream_info);
}
}
} // namespace

ActiveTcpListener::ActiveTcpListener(Network::TcpConnectionHandler& parent,
Network::ListenerConfig& config)
: ActiveTcpListener(
parent,
: TypedActiveStreamListenerBase<ActiveTcpConnection>(
parent, parent.dispatcher(),
parent.dispatcher().createListener(config.listenSocketFactory().getListenSocket(), *this,
config.bindToPort(), config.tcpBacklogSize()),
config) {}
config),
tcp_conn_handler_(parent) {
config.connectionBalancer().registerHandler(*this);
}

ActiveTcpListener::ActiveTcpListener(Network::TcpConnectionHandler& parent,
Network::ListenerPtr&& listener,
Network::ListenerConfig& config)
: ActiveListenerImplBase(parent, &config), parent_(parent), listener_(std::move(listener)),
listener_filters_timeout_(config.listenerFiltersTimeout()),
continue_on_listener_filters_timeout_(config.continueOnListenerFiltersTimeout()) {
: TypedActiveStreamListenerBase<ActiveTcpConnection>(parent, parent.dispatcher(),
std::move(listener), config),
tcp_conn_handler_(parent) {
config.connectionBalancer().registerHandler(*this);
}

ActiveTcpListener::~ActiveTcpListener() {
is_deleting_ = true;
config_->connectionBalancer().unregisterHandler(*this);

// Purge sockets that have not progressed to connections. This should only happen when
// a listener filter stops iteration and never resumes.
while (!sockets_.empty()) {
ActiveTcpSocketPtr removed = sockets_.front()->removeFromList(sockets_);
parent_.dispatcher().deferredDelete(std::move(removed));
}

for (auto& chain_and_connections : connections_by_context_) {
ASSERT(chain_and_connections.second != nullptr);
auto& connections = chain_and_connections.second->connections_;
while (!connections.empty()) {
connections.front()->connection_->close(Network::ConnectionCloseType::NoFlush);
}
}
parent_.dispatcher().clearDeferredDeleteList();
cleanupConnections();
Comment thread
lambdai marked this conversation as resolved.
Outdated

// By the time a listener is destroyed, in the common case, there should be no connections.
// However, this is not always true if there is an in flight rebalanced connection that is
Expand All @@ -76,14 +55,14 @@ void ActiveTcpListener::removeConnection(ActiveTcpConnection& connection) {
ENVOY_CONN_LOG(debug, "adding to cleanup list", *connection.connection_);
ActiveConnections& active_connections = connection.active_connections_;
ActiveTcpConnectionPtr removed = connection.removeFromList(active_connections.connections_);
parent_.dispatcher().deferredDelete(std::move(removed));
dispatcher().deferredDelete(std::move(removed));
// Delete map entry only iff connections becomes empty.
if (active_connections.connections_.empty()) {
auto iter = connections_by_context_.find(&active_connections.filter_chain_);
ASSERT(iter != connections_by_context_.end());
// To cover the lifetime of every single connection, Connections need to be deferred deleted
// because the previously contained connection is deferred deleted.
parent_.dispatcher().deferredDelete(std::move(iter->second));
dispatcher().deferredDelete(std::move(iter->second));
// The erase will break the iteration over the connections_by_context_ during the deletion.
if (!is_deleting_) {
connections_by_context_.erase(iter);
Expand All @@ -97,118 +76,6 @@ void ActiveTcpListener::updateListenerConfig(Network::ListenerConfig& config) {
config_ = &config;
}

void ActiveTcpSocket::onTimeout() {
listener_.stats_.downstream_pre_cx_timeout_.inc();
ASSERT(inserted());
ENVOY_LOG(debug, "listener filter times out after {} ms",
listener_.listener_filters_timeout_.count());

if (listener_.continue_on_listener_filters_timeout_) {
ENVOY_LOG(debug, "fallback to default listener filter");
newConnection();
}
unlink();
}

void ActiveTcpSocket::startTimer() {
if (listener_.listener_filters_timeout_.count() > 0) {
timer_ = listener_.parent_.dispatcher().createTimer([this]() -> void { onTimeout(); });
timer_->enableTimer(listener_.listener_filters_timeout_);
}
}

void ActiveTcpSocket::unlink() {
ActiveTcpSocketPtr removed = removeFromList(listener_.sockets_);
if (removed->timer_ != nullptr) {
removed->timer_->disableTimer();
}
// Emit logs if a connection is not established.
if (!connected_) {
emitLogs(*listener_.config_, *stream_info_);
}
listener_.parent_.dispatcher().deferredDelete(std::move(removed));
}

void ActiveTcpSocket::continueFilterChain(bool success) {
if (success) {
bool no_error = true;
if (iter_ == accept_filters_.end()) {
iter_ = accept_filters_.begin();
} else {
iter_ = std::next(iter_);
}

for (; iter_ != accept_filters_.end(); iter_++) {
Network::FilterStatus status = (*iter_)->onAccept(*this);
if (status == Network::FilterStatus::StopIteration) {
// The filter is responsible for calling us again at a later time to continue the filter
// chain from the next filter.
if (!socket().ioHandle().isOpen()) {
// break the loop but should not create new connection
no_error = false;
break;
} else {
// Blocking at the filter but no error
return;
}
}
}
// Successfully ran all the accept filters.
if (no_error) {
newConnection();
} else {
// Signal the caller that no extra filter chain iteration is needed.
iter_ = accept_filters_.end();
}
}

// Filter execution concluded, unlink and delete this ActiveTcpSocket if it was linked.
if (inserted()) {
unlink();
}
}

void ActiveTcpSocket::setDynamicMetadata(const std::string& name,
const ProtobufWkt::Struct& value) {
stream_info_->setDynamicMetadata(name, value);
}

void ActiveTcpSocket::newConnection() {
connected_ = true;

// Check if the socket may need to be redirected to another listener.
Network::BalancedConnectionHandlerOptRef new_listener;

if (hand_off_restored_destination_connections_ &&
socket_->addressProvider().localAddressRestored()) {
// Find a listener associated with the original destination address.
new_listener =
listener_.parent_.getBalancedHandlerByAddress(*socket_->addressProvider().localAddress());
}
if (new_listener.has_value()) {
// Hands off connections redirected by iptables to the listener associated with the
// original destination address. Pass 'hand_off_restored_destination_connections' as false to
// prevent further redirection.
// Leave the new listener to decide whether to execute re-balance.
// Note also that we must account for the number of connections properly across both listeners.
// TODO(mattklein123): See note in ~ActiveTcpSocket() related to making this accounting better.
listener_.decNumConnections();
new_listener.value().get().onAcceptWorker(std::move(socket_), false, false);
} else {
// Set default transport protocol if none of the listener filters did it.
if (socket_->detectedTransportProtocol().empty()) {
socket_->setDetectedTransportProtocol("raw_buffer");
}
// TODO(lambdai): add integration test
// TODO: Address issues in wider scope. See https://github.com/envoyproxy/envoy/issues/8925
// Erase accept filter states because accept filters may not get the opportunity to clean up.
// Particularly the assigned events need to reset before assigning new events in the follow up.
accept_filters_.clear();
// Create a new connection on this listener.
listener_.newConnection(std::move(socket_), std::move(stream_info_));
}
}

void ActiveTcpListener::onAccept(Network::ConnectionSocketPtr&& socket) {
if (listenerConnectionLimitReached()) {
RELEASE_ASSERT(socket->addressProvider().remoteAddress() != nullptr, "");
Expand Down Expand Up @@ -248,28 +115,12 @@ void ActiveTcpListener::onAcceptWorker(Network::ConnectionSocketPtr&& socket,
auto active_socket = std::make_unique<ActiveTcpSocket>(*this, std::move(socket),
hand_off_restored_destination_connections);

// Create and run the filters.
config_->filterChainFactory().createListenerFilterChain(*active_socket);
active_socket->continueFilterChain(true);

// Move active_socket to the sockets_ list if filter iteration needs to continue later.
// Otherwise we let active_socket be destructed when it goes out of scope.
if (active_socket->iter_ != active_socket->accept_filters_.end()) {
active_socket->startTimer();
LinkedList::moveIntoListBack(std::move(active_socket), sockets_);
} else {
// If active_socket is about to be destructed, emit logs if a connection is not created.
if (!active_socket->connected_) {
if (active_socket->stream_info_ != nullptr) {
emitLogs(*config_, *active_socket->stream_info_);
} else {
// If the active_socket is not connected, this socket is not promoted to active connection.
// Thus the stream_info_ is owned by this active socket.
ENVOY_BUG(active_socket->stream_info_ != nullptr,
"the unconnected active socket must have stream info.");
}
}
}
onSocketAccepted(std::move(active_socket));
}

Network::BalancedConnectionHandlerOptRef
ActiveTcpListener::getBalancedHandlerByAddress(const Network::Address::Instance& address) {
return tcp_conn_handler_.getBalancedHandlerByAddress(address);
}

void ActiveTcpListener::pauseListening() {
Expand Down Expand Up @@ -305,7 +156,7 @@ void ActiveTcpListener::newConnection(Network::ConnectionSocketPtr&& socket,
auto transport_socket = filter_chain->transportSocketFactory().createTransportSocket(nullptr);
stream_info->setDownstreamSslConnection(transport_socket->ssl());
auto& active_connections = getOrCreateActiveConnections(*filter_chain);
auto server_conn_ptr = parent_.dispatcher().createServerConnection(
auto server_conn_ptr = dispatcher().createServerConnection(
std::move(socket), std::move(transport_socket), *stream_info);
if (const auto timeout = filter_chain->transportSocketConnectTimeout();
timeout != std::chrono::milliseconds::zero()) {
Expand All @@ -314,7 +165,7 @@ void ActiveTcpListener::newConnection(Network::ConnectionSocketPtr&& socket,

ActiveTcpConnectionPtr active_connection(
new ActiveTcpConnection(active_connections, std::move(server_conn_ptr),
parent_.dispatcher().timeSource(), std::move(stream_info)));
dispatcher().timeSource(), std::move(stream_info)));
active_connection->connection_->setBufferLimits(config_->perConnectionBufferLimitBytes());

RELEASE_ASSERT(active_connection->connection_->addressProvider().remoteAddress() != nullptr, "");
Expand Down Expand Up @@ -345,29 +196,6 @@ ActiveTcpListener::getOrCreateActiveConnections(const Network::FilterChain& filt
return *connections;
}

void ActiveTcpListener::deferredRemoveFilterChains(
const std::list<const Network::FilterChain*>& draining_filter_chains) {
// Need to recover the original deleting state.
const bool was_deleting = is_deleting_;
is_deleting_ = true;
for (const auto* filter_chain : draining_filter_chains) {
auto iter = connections_by_context_.find(filter_chain);
if (iter == connections_by_context_.end()) {
// It is possible when listener is stopping.
} else {
auto& connections = iter->second->connections_;
while (!connections.empty()) {
connections.front()->connection_->close(Network::ConnectionCloseType::NoFlush);
}
// Since is_deleting_ is on, we need to manually remove the map value and drive the iterator.
// Defer delete connection container to avoid race condition in destroying connection.
parent_.dispatcher().deferredDelete(std::move(iter->second));
connections_by_context_.erase(iter);
}
}
is_deleting_ = was_deleting;
}

void ActiveTcpListener::post(Network::ConnectionSocketPtr&& socket) {
// It is not possible to capture a unique_ptr because the post() API copies the lambda, so we must
// bundle the socket inside a shared_ptr that can be captured.
Expand All @@ -376,9 +204,10 @@ void ActiveTcpListener::post(Network::ConnectionSocketPtr&& socket) {
RebalancedSocketSharedPtr socket_to_rebalance = std::make_shared<RebalancedSocket>();
socket_to_rebalance->socket = std::move(socket);

parent_.dispatcher().post([socket_to_rebalance, tag = config_->listenerTag(), &parent = parent_,
handoff = config_->handOffRestoredDestinationConnections()]() {
auto balanced_handler = parent.getBalancedHandlerByTag(tag);
dispatcher().post([socket_to_rebalance, tag = config_->listenerTag(),
&tcp_conn_handler = tcp_conn_handler_,
handoff = config_->handOffRestoredDestinationConnections()]() {
auto balanced_handler = tcp_conn_handler.getBalancedHandlerByTag(tag);
if (balanced_handler.has_value()) {
balanced_handler->get().onAcceptWorker(std::move(socket_to_rebalance->socket), handoff, true);
return;
Expand Down Expand Up @@ -420,7 +249,7 @@ ActiveTcpConnection::ActiveTcpConnection(ActiveConnections& active_connections,
}

ActiveTcpConnection::~ActiveTcpConnection() {
emitLogs(*active_connections_.listener_.config_, *stream_info_);
ActiveStreamListenerBase::emitLogs(*active_connections_.listener_.config_, *stream_info_);
auto& listener = active_connections_.listener_;
listener.stats_.downstream_cx_active_.dec();
listener.stats_.downstream_cx_destroy_.inc();
Expand All @@ -434,5 +263,15 @@ ActiveTcpConnection::~ActiveTcpConnection() {
listener.parent_.decNumConnections();
}

// Network::ConnectionCallbacks
void ActiveTcpConnection::onEvent(Network::ConnectionEvent event) {
ENVOY_LOG(trace, "[C{}] connection on event {}", connection_->id(), static_cast<int>(event));
// Any event leads to destruction of the connection.
if (event == Network::ConnectionEvent::LocalClose ||
event == Network::ConnectionEvent::RemoteClose) {
active_connections_.listener_.removeConnection(*this);
}
}

} // namespace Server
} // namespace Envoy
Loading