Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
cb82711
initial internal listener
lambdai May 20, 2021
4c156ce
for new pr
lambdai Jun 1, 2021
773c75a
merge main
lambdai Jun 2, 2021
b6ff892
delete test/integration/internal_tcp_proxy_integration_test.cc
lambdai Jun 2, 2021
635f185
coverage
lambdai Jun 2, 2021
3615512
remove ClientConnectionImpl ctor for internal socket
lambdai Jun 2, 2021
efe9ce7
revert dispatcher
lambdai Jun 9, 2021
01ee5bb
merged main
lambdai Jun 10, 2021
1564151
revert internal listener
lambdai Jun 11, 2021
9e5b2b4
pragma once
lambdai Jun 11, 2021
e802a6f
test bootstrap
lambdai Jun 11, 2021
c8e4e43
comment other than incNumConnections
lambdai Jun 15, 2021
aac23c1
rename
lambdai Jun 15, 2021
573cdfe
put cleanup, deferdelete in base
lambdai Jun 17, 2021
c1f7921
push down conn collection or active conn op
lambdai Jun 17, 2021
0528d50
Merge branch 'main' into addinternallistener_pre_1
lambdai Jul 1, 2021
0b3c2f0
Merge branch 'addinternallistener_pre_1' of github.com:lambdai/envoy-…
lambdai Jul 1, 2021
bbee979
tmpaddinternallistener_pre_1
lambdai Jul 1, 2021
57ca809
fix format
lambdai Jul 2, 2021
4523c99
address comment
lambdai Jul 2, 2021
dc08e69
Merge remote-tracking branch 'origin/main' into addinternallistener_p…
lambdai Jul 7, 2021
cd7d6e6
Merge branch 'main' into addinternallistener_pre_1on2
lambdai Jul 23, 2021
e724e22
reorg
lambdai Jul 23, 2021
bb46ea5
format
lambdai Jul 26, 2021
942fb9b
more protected
lambdai Jul 26, 2021
aeb163a
add comment
lambdai Jul 27, 2021
056f51a
comment to TypedActiveStreamListenerBase
lambdai Jul 27, 2021
f0fde02
final
lambdai Jul 28, 2021
bfdf05a
another method move
lambdai Jul 28, 2021
02e618c
Merge branch 'main' into addinternallistener_pre_1
lambdai Jul 30, 2021
bd122fa
Merge branch 'main' into addinternallistener_pre_1
lambdai Jul 30, 2021
2f406f5
Merge remote-tracking branch 'me/addinternallistener_pre_1' into addi…
lambdai Aug 3, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 46 additions & 2 deletions source/server/active_stream_listener_base.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
#include "envoy/network/filter.h"

#include "source/common/stats/timespan_impl.h"
#include "source/server/active_tcp_listener.h"

namespace Envoy {
namespace Server {
Expand Down Expand Up @@ -61,7 +60,7 @@ void ActiveStreamListenerBase::newConnection(Network::ConnectionSocketPtr&& sock
newActiveConnection(*filter_chain, std::move(server_conn_ptr), std::move(stream_info));
}

ActiveConnections::ActiveConnections(ActiveTcpListener& listener,
ActiveConnections::ActiveConnections(OwnedActiveStreamListenerBase& listener,
const Network::FilterChain& filter_chain)
: listener_(listener), filter_chain_(filter_chain) {}

Expand Down Expand Up @@ -117,5 +116,50 @@ void ActiveTcpConnection::onEvent(Network::ConnectionEvent event) {
}
}

void OwnedActiveStreamListenerBase::removeConnection(ActiveTcpConnection& connection) {
ENVOY_CONN_LOG(debug, "adding to cleanup list", *connection.connection_);
ActiveConnections& active_connections = connection.active_connections_;
ActiveConnectionPtr removed = connection.removeFromList(active_connections.connections_);
dispatcher().deferredDelete(std::move(removed));
// Delete map entry only iff connections becomes empty.
if (active_connections.connections_.empty()) {
auto iter = connections_by_context_.find(&active_connections.filter_chain_);
ASSERT(iter != connections_by_context_.end());
// To cover the lifetime of every single connection, Connections need to be deferred deleted
// because the previously contained connection is deferred deleted.
dispatcher().deferredDelete(std::move(iter->second));
// The erase will break the iteration over the connections_by_context_ during the deletion.
if (!is_deleting_) {
connections_by_context_.erase(iter);
}
}
}

ActiveConnections& OwnedActiveStreamListenerBase::getOrCreateActiveConnections(
const Network::FilterChain& filter_chain) {
ActiveConnectionCollectionPtr& connections = connections_by_context_[&filter_chain];
if (connections == nullptr) {
connections = std::make_unique<ActiveConnections>(*this, filter_chain);
}
return *connections;
}

void OwnedActiveStreamListenerBase::removeFilterChain(const Network::FilterChain* filter_chain) {
auto iter = connections_by_context_.find(filter_chain);
if (iter == connections_by_context_.end()) {
// It is possible when listener is stopping.
} else {
auto& connections = iter->second->connections_;
while (!connections.empty()) {
connections.front()->connection_->close(Network::ConnectionCloseType::NoFlush);
}
// Since is_deleting_ is on, we need to manually remove the map value and drive the
// iterator. Defer delete connection container to avoid race condition in destroying
// connection.
dispatcher().deferredDelete(std::move(iter->second));
connections_by_context_.erase(iter);
}
}

} // namespace Server
} // namespace Envoy
42 changes: 38 additions & 4 deletions source/server/active_stream_listener_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ namespace Server {
// After the active socket passes all the listener filters, a server connection is created. The
// derived listener must override ``newActiveConnection`` to take the ownership of that server
// connection.
// TODO(lambdai): Refactor the listener filter test cases to adopt this class.
class ActiveStreamListenerBase : public ActiveListenerImplBase,
protected Logger::Loggable<Logger::Id::conn_handler> {
public:
Expand Down Expand Up @@ -137,20 +138,21 @@ class ActiveStreamListenerBase : public ActiveListenerImplBase,
};

struct ActiveTcpConnection;
class ActiveTcpListener;
class OwnedActiveStreamListenerBase;

/**
* Wrapper for a group of active connections which are attached to the same filter chain context.
*/
class ActiveConnections : public Event::DeferredDeletable {
public:
ActiveConnections(ActiveTcpListener& listener, const Network::FilterChain& filter_chain);
ActiveConnections(OwnedActiveStreamListenerBase& listener,
const Network::FilterChain& filter_chain);
~ActiveConnections() override;

// listener filter chain pair is the owner of the connections
ActiveTcpListener& listener_;
OwnedActiveStreamListenerBase& listener_;
const Network::FilterChain& filter_chain_;
// Owned connections
// Owned connections.
std::list<std::unique_ptr<ActiveTcpConnection>> connections_;
};

Expand Down Expand Up @@ -179,5 +181,37 @@ struct ActiveTcpConnection : LinkedObject<ActiveTcpConnection>,
using ActiveConnectionPtr = std::unique_ptr<ActiveTcpConnection>;
using ActiveConnectionCollectionPtr = std::unique_ptr<ActiveConnections>;

// The mixin that handles the composition type ActiveConnectionCollection. This mixin
// provides the connection removal helper and the filter chain removal helper.
// All the prod stream listeners should inherit from this class and leave ActiveStreamListenerBase
// for unit test only.
class OwnedActiveStreamListenerBase : public ActiveStreamListenerBase {
public:
OwnedActiveStreamListenerBase(Network::ConnectionHandler& parent, Event::Dispatcher& dispatcher,
Network::ListenerPtr&& listener, Network::ListenerConfig& config)
: ActiveStreamListenerBase(parent, dispatcher, std::move(listener), config) {}

/**
* Remove and destroy an active connection.
* @param connection supplies the connection to remove.
*/
void removeConnection(ActiveTcpConnection& connection);

protected:
/**
* Return the active connections container attached to the given filter chain.
*/
ActiveConnections& getOrCreateActiveConnections(const Network::FilterChain& filter_chain);

/**
* Remove an filter chain. All the active connections that are attached to the filter chain will
* be destroyed.
* @param filter_chain supplies the filter chain to remove.
*/
void removeFilterChain(const Network::FilterChain* filter_chain) override;

absl::flat_hash_map<const Network::FilterChain*, ActiveConnectionCollectionPtr>
connections_by_context_;
};
} // namespace Server
} // namespace Envoy
58 changes: 6 additions & 52 deletions source/server/active_tcp_listener.cc
Original file line number Diff line number Diff line change
Expand Up @@ -8,26 +8,25 @@
#include "source/common/common/assert.h"
#include "source/common/network/connection_impl.h"
#include "source/common/network/utility.h"
#include "source/common/stats/timespan_impl.h"

namespace Envoy {
namespace Server {

ActiveTcpListener::ActiveTcpListener(Network::TcpConnectionHandler& parent,
Network::ListenerConfig& config, uint32_t worker_index)
: ActiveStreamListenerBase(parent, parent.dispatcher(),
parent.dispatcher().createListener(
config.listenSocketFactory().getListenSocket(worker_index),
*this, config.bindToPort()),
config),
: OwnedActiveStreamListenerBase(parent, parent.dispatcher(),
parent.dispatcher().createListener(
config.listenSocketFactory().getListenSocket(worker_index),
*this, config.bindToPort()),
config),
tcp_conn_handler_(parent) {
config.connectionBalancer().registerHandler(*this);
}

ActiveTcpListener::ActiveTcpListener(Network::TcpConnectionHandler& parent,
Network::ListenerPtr&& listener,
Network::ListenerConfig& config)
: ActiveStreamListenerBase(parent, parent.dispatcher(), std::move(listener), config),
: OwnedActiveStreamListenerBase(parent, parent.dispatcher(), std::move(listener), config),
tcp_conn_handler_(parent) {
config.connectionBalancer().registerHandler(*this);
}
Expand Down Expand Up @@ -62,48 +61,12 @@ ActiveTcpListener::~ActiveTcpListener() {
config_->name(), numConnections()));
}

void ActiveTcpListener::removeConnection(ActiveTcpConnection& connection) {
ENVOY_CONN_LOG(debug, "adding to cleanup list", *connection.connection_);
ActiveConnections& active_connections = connection.active_connections_;
auto removed = connection.removeFromList(active_connections.connections_);
dispatcher().deferredDelete(std::move(removed));
// Delete map entry only iff connections becomes empty.
if (active_connections.connections_.empty()) {
auto iter = connections_by_context_.find(&active_connections.filter_chain_);
ASSERT(iter != connections_by_context_.end());
// To cover the lifetime of every single connection, Connections need to be deferred deleted
// because the previously contained connection is deferred deleted.
dispatcher().deferredDelete(std::move(iter->second));
// The erase will break the iteration over the connections_by_context_ during the deletion.
if (!is_deleting_) {
connections_by_context_.erase(iter);
}
}
}

void ActiveTcpListener::updateListenerConfig(Network::ListenerConfig& config) {
ENVOY_LOG(trace, "replacing listener ", config_->listenerTag(), " by ", config.listenerTag());
ASSERT(&config_->connectionBalancer() == &config.connectionBalancer());
config_ = &config;
}

void ActiveTcpListener::removeFilterChain(const Network::FilterChain* filter_chain) {
auto iter = connections_by_context_.find(filter_chain);
if (iter == connections_by_context_.end()) {
// It is possible when listener is stopping.
} else {
auto& connections = iter->second->connections_;
while (!connections.empty()) {
connections.front()->connection_->close(Network::ConnectionCloseType::NoFlush);
}
// Since is_deleting_ is on, we need to manually remove the map value and drive the
// iterator. Defer delete connection container to avoid race condition in destroying
// connection.
dispatcher().deferredDelete(std::move(iter->second));
connections_by_context_.erase(iter);
}
}

void ActiveTcpListener::onAccept(Network::ConnectionSocketPtr&& socket) {
if (listenerConnectionLimitReached()) {
RELEASE_ASSERT(socket->addressProvider().remoteAddress() != nullptr, "");
Expand Down Expand Up @@ -179,15 +142,6 @@ void ActiveTcpListener::newActiveConnection(const Network::FilterChain& filter_c
}
}

ActiveConnections&
ActiveTcpListener::getOrCreateActiveConnections(const Network::FilterChain& filter_chain) {
ActiveConnectionCollectionPtr& connections = connections_by_context_[&filter_chain];
if (connections == nullptr) {
connections = std::make_unique<ActiveConnections>(*this, filter_chain);
}
return *connections;
}

void ActiveTcpListener::post(Network::ConnectionSocketPtr&& socket) {
// It is not possible to capture a unique_ptr because the post() API copies the lambda, so we must
// bundle the socket inside a shared_ptr that can be captured.
Expand Down
19 changes: 1 addition & 18 deletions source/server/active_tcp_listener.h
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
#pragma once

#include "envoy/event/dispatcher.h"
#include "envoy/stats/timespan.h"
#include "envoy/stream_info/stream_info.h"

#include "source/common/common/linked_object.h"
Expand All @@ -23,7 +22,7 @@ using RebalancedSocketSharedPtr = std::shared_ptr<RebalancedSocket>;
* Wrapper for an active tcp listener owned by this handler.
*/
class ActiveTcpListener final : public Network::TcpListenerCallbacks,
public ActiveStreamListenerBase,
public OwnedActiveStreamListenerBase,
public Network::BalancedConnectionHandler {
public:
ActiveTcpListener(Network::TcpConnectionHandler& parent, Network::ListenerConfig& config,
Expand Down Expand Up @@ -71,28 +70,12 @@ class ActiveTcpListener final : public Network::TcpListenerCallbacks,
Network::ServerConnectionPtr server_conn_ptr,
std::unique_ptr<StreamInfo::StreamInfo> stream_info) override;

/**
* Return the active connections container attached with the given filter chain.
*/
ActiveConnections& getOrCreateActiveConnections(const Network::FilterChain& filter_chain);

/**
* Update the listener config. The follow up connections will see the new config. The existing
* connections are not impacted.
*/
void updateListenerConfig(Network::ListenerConfig& config);

void removeFilterChain(const Network::FilterChain* filter_chain) override;

/**
* Remove and destroy an active connection.
* @param connection supplies the connection to remove.
*/
void removeConnection(ActiveTcpConnection& connection);

absl::flat_hash_map<const Network::FilterChain*, std::unique_ptr<ActiveConnections>>
connections_by_context_;

Network::TcpConnectionHandler& tcp_conn_handler_;
// The number of connections currently active on this listener. This is typically used for
// connection balancing across per-handler listeners.
Expand Down