Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
cb82711
initial internal listener
lambdai May 20, 2021
4c156ce
for new pr
lambdai Jun 1, 2021
773c75a
merge main
lambdai Jun 2, 2021
b6ff892
delete test/integration/internal_tcp_proxy_integration_test.cc
lambdai Jun 2, 2021
635f185
coverage
lambdai Jun 2, 2021
3615512
remove ClientConnectionImpl ctor for internal socket
lambdai Jun 2, 2021
efe9ce7
revert dispatcher
lambdai Jun 9, 2021
01ee5bb
merged main
lambdai Jun 10, 2021
1564151
revert internal listener
lambdai Jun 11, 2021
9e5b2b4
pragma once
lambdai Jun 11, 2021
e802a6f
test bootstrap
lambdai Jun 11, 2021
c8e4e43
comment other than incNumConnections
lambdai Jun 15, 2021
aac23c1
rename
lambdai Jun 15, 2021
573cdfe
put cleanup, deferdelete in base
lambdai Jun 17, 2021
c1f7921
push down conn collection or active conn op
lambdai Jun 17, 2021
0528d50
Merge branch 'main' into addinternallistener_pre_1
lambdai Jul 1, 2021
0b3c2f0
Merge branch 'addinternallistener_pre_1' of github.com:lambdai/envoy-…
lambdai Jul 1, 2021
bbee979
tmpaddinternallistener_pre_1
lambdai Jul 1, 2021
57ca809
fix format
lambdai Jul 2, 2021
4523c99
address comment
lambdai Jul 2, 2021
dc08e69
Merge remote-tracking branch 'origin/main' into addinternallistener_p…
lambdai Jul 7, 2021
dfe40e4
listener: extract active_tcp_socket and active_stream_listener_base
lambdai Jul 15, 2021
a497d0b
add missing files
lambdai Jul 15, 2021
ef4fc26
new line at EOF
lambdai Jul 15, 2021
feebbc3
address comments
lambdai Jul 21, 2021
df6f495
format
lambdai Jul 22, 2021
ee35a0f
merge main
lambdai Jul 22, 2021
fa60a09
revert flat_hash_map
lambdai Jul 22, 2021
87060fc
base listener: add removeSocket and const socket list accessor
lambdai Jul 22, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
85 changes: 75 additions & 10 deletions source/server/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,9 @@ envoy_cc_library(
envoy_cc_library(
name = "connection_handler_lib",
deps = [
"//source/server:active_tcp_listener",
"//source/server:active_udp_listener",
"//source/server:connection_handler_impl",
":active_tcp_listener",
":active_udp_listener",
":connection_handler_impl",
],
)

Expand Down Expand Up @@ -117,25 +117,20 @@ envoy_cc_library(
"active_tcp_listener.h",
],
deps = [
":active_stream_listener_base",
":active_tcp_socket",
"//envoy/common:time_interface",
"//envoy/event:deferred_deletable",
"//envoy/event:dispatcher_interface",
"//envoy/event:timer_interface",
"//envoy/network:connection_handler_interface",
"//envoy/network:connection_interface",
"//envoy/network:filter_interface",
"//envoy/network:listen_socket_interface",
"//envoy/network:listener_interface",
"//envoy/server:listener_manager_interface",
"//envoy/stats:timespan_interface",
"//source/common/common:assert_lib",
"//source/common/common:linked_object",
"//source/common/common:non_copyable",
"//source/common/common:safe_memcpy_lib",
"//source/common/event:deferred_task",
"//source/common/network:connection_lib",
"//source/common/stats:timespan_lib",
"//source/common/stream_info:stream_info_lib",
"//source/server:active_listener_base",
],
)
Expand All @@ -158,6 +153,76 @@ envoy_cc_library(
],
)

envoy_cc_library(
name = "active_tcp_listener_headers",
hdrs = [
"active_stream_listener_base.h",
"active_tcp_socket.h",
],
deps = [
":active_listener_base",
"//envoy/common:time_interface",
"//envoy/event:deferred_deletable",
"//envoy/event:dispatcher_interface",
"//envoy/event:timer_interface",
"//envoy/network:connection_handler_interface",
"//envoy/network:connection_interface",
"//envoy/network:filter_interface",
"//envoy/network:listen_socket_interface",
"//envoy/network:listener_interface",
"//envoy/server:listener_manager_interface",
"//source/common/common:linked_object",
],
)

envoy_cc_library(
name = "active_tcp_socket",
srcs = ["active_tcp_socket.cc"],
hdrs = [
"active_tcp_socket.h",
],
deps = [
":active_listener_base",
":active_tcp_listener_headers",
"//envoy/common:time_interface",
"//envoy/event:deferred_deletable",
"//envoy/event:dispatcher_interface",
"//envoy/event:timer_interface",
"//envoy/network:connection_handler_interface",
"//envoy/network:connection_interface",
"//envoy/network:filter_interface",
"//envoy/network:listen_socket_interface",
"//envoy/network:listener_interface",
"//source/common/common:linked_object",
"//source/common/network:connection_lib",
"//source/common/stream_info:stream_info_lib",
],
)

envoy_cc_library(
name = "active_stream_listener_base",
srcs = ["active_stream_listener_base.cc"],
hdrs = [
"active_stream_listener_base.h",
],
deps = [
":active_listener_base",
":active_tcp_listener_headers",
"//envoy/common:time_interface",
"//envoy/event:deferred_deletable",
"//envoy/event:dispatcher_interface",
"//envoy/event:timer_interface",
"//envoy/network:connection_handler_interface",
"//envoy/network:connection_interface",
"//envoy/network:filter_interface",
"//envoy/network:listen_socket_interface",
"//envoy/network:listener_interface",
"//envoy/stream_info:stream_info_interface",
"//source/common/common:linked_object",
"//source/common/network:connection_lib",
],
)

envoy_cc_library(
name = "drain_manager_lib",
srcs = ["drain_manager_impl.cc"],
Expand Down
62 changes: 62 additions & 0 deletions source/server/active_stream_listener_base.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
#include "source/server/active_stream_listener_base.h"

#include "envoy/network/filter.h"

namespace Envoy {
namespace Server {

ActiveStreamListenerBase::ActiveStreamListenerBase(Network::ConnectionHandler& parent,
Event::Dispatcher& dispatcher,
Network::ListenerPtr&& listener,
Network::ListenerConfig& config)
: ActiveListenerImplBase(parent, &config), parent_(parent),
listener_filters_timeout_(config.listenerFiltersTimeout()),
continue_on_listener_filters_timeout_(config.continueOnListenerFiltersTimeout()),
listener_(std::move(listener)), dispatcher_(dispatcher) {}

void ActiveStreamListenerBase::emitLogs(Network::ListenerConfig& config,
StreamInfo::StreamInfo& stream_info) {
stream_info.onRequestComplete();
for (const auto& access_log : config.accessLogs()) {
access_log->log(nullptr, nullptr, nullptr, stream_info);
}
}

void ActiveStreamListenerBase::newConnection(Network::ConnectionSocketPtr&& socket,
std::unique_ptr<StreamInfo::StreamInfo> stream_info) {
// Find matching filter chain.
const auto filter_chain = config_->filterChainManager().findFilterChain(*socket);
if (filter_chain == nullptr) {
RELEASE_ASSERT(socket->addressProvider().remoteAddress() != nullptr, "");
ENVOY_LOG(debug, "closing connection from {}: no matching filter chain found",
socket->addressProvider().remoteAddress()->asString());
stats_.no_filter_chain_match_.inc();
stream_info->setResponseFlag(StreamInfo::ResponseFlag::NoRouteFound);
stream_info->setResponseCodeDetails(StreamInfo::ResponseCodeDetails::get().FilterChainNotFound);
emitLogs(*config_, *stream_info);
socket->close();
return;
}
stream_info->setFilterChainName(filter_chain->name());
auto transport_socket = filter_chain->transportSocketFactory().createTransportSocket(nullptr);
stream_info->setDownstreamSslConnection(transport_socket->ssl());
auto server_conn_ptr = dispatcher().createServerConnection(
std::move(socket), std::move(transport_socket), *stream_info);
if (const auto timeout = filter_chain->transportSocketConnectTimeout();
timeout != std::chrono::milliseconds::zero()) {
server_conn_ptr->setTransportSocketConnectTimeout(timeout);
}
server_conn_ptr->setBufferLimits(config_->perConnectionBufferLimitBytes());
RELEASE_ASSERT(server_conn_ptr->addressProvider().remoteAddress() != nullptr, "");
const bool empty_filter_chain = !config_->filterChainFactory().createNetworkFilterChain(
*server_conn_ptr, filter_chain->networkFilterFactories());
if (empty_filter_chain) {
ENVOY_CONN_LOG(debug, "closing connection from {}: no filters", *server_conn_ptr,
server_conn_ptr->addressProvider().remoteAddress()->asString());
server_conn_ptr->close(Network::ConnectionCloseType::NoFlush);
}
newActiveConnection(*filter_chain, std::move(server_conn_ptr), std::move(stream_info));
}

} // namespace Server
} // namespace Envoy
139 changes: 139 additions & 0 deletions source/server/active_stream_listener_base.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
#pragma once

#include <atomic>
#include <cstdint>
#include <list>
#include <memory>

#include "envoy/common/time.h"
#include "envoy/event/dispatcher.h"
#include "envoy/network/connection.h"
#include "envoy/network/connection_handler.h"
#include "envoy/network/listener.h"
#include "envoy/stream_info/stream_info.h"

#include "source/common/common/linked_object.h"
#include "source/server/active_listener_base.h"
#include "source/server/active_tcp_socket.h"

namespace Envoy {
namespace Server {

// The base class of the stream listener. It owns listener filter handling of active sockets.
// After the active socket passes all the listener filters, a server connection is created. The
// derived listener must override ``newActiveConnection`` to take the ownership of that server
// connection.
class ActiveStreamListenerBase : public ActiveListenerImplBase,
protected Logger::Loggable<Logger::Id::conn_handler> {
public:
ActiveStreamListenerBase(Network::ConnectionHandler& parent, Event::Dispatcher& dispatcher,
Network::ListenerPtr&& listener, Network::ListenerConfig& config);
static void emitLogs(Network::ListenerConfig& config, StreamInfo::StreamInfo& stream_info);

Event::Dispatcher& dispatcher() { return dispatcher_; }

/**
* Schedule to remove and destroy the active connections which are not tracked by listener
* config. Caution: The connection are not destroyed yet when function returns.
*/
void
deferredRemoveFilterChains(const std::list<const Network::FilterChain*>& draining_filter_chains) {
// Need to recover the original deleting state.
const bool was_deleting = is_deleting_;
is_deleting_ = true;
for (const auto* filter_chain : draining_filter_chains) {
removeFilterChain(filter_chain);
}
is_deleting_ = was_deleting;
}

virtual void incNumConnections() PURE;
virtual void decNumConnections() PURE;

/**
* Create a new connection from a socket accepted by the listener.
*/
void newConnection(Network::ConnectionSocketPtr&& socket,
std::unique_ptr<StreamInfo::StreamInfo> stream_info);

/**
* Remove the socket from this listener. Should be called when the socket passes the listener
* filter.
* @return std::unique_ptr<ActiveTcpSocket> the exact same socket in the parameter but in the
* state that not owned by the listener.
*/
std::unique_ptr<ActiveTcpSocket> removeSocket(ActiveTcpSocket&& socket) {
return socket.removeFromList(sockets_);
}

/**
* @return const std::list<std::unique_ptr<ActiveTcpSocket>>& the sockets going through the
* listener filters.
*/
const std::list<std::unique_ptr<ActiveTcpSocket>>& sockets() const { return sockets_; }

/**
* Schedule removal and destruction of all active connections owned by a filter chain.
*/
virtual void removeFilterChain(const Network::FilterChain* filter_chain) PURE;

virtual Network::BalancedConnectionHandlerOptRef
getBalancedHandlerByAddress(const Network::Address::Instance& address) PURE;

void onSocketAccepted(std::unique_ptr<ActiveTcpSocket> active_socket) {
// Create and run the filters
config_->filterChainFactory().createListenerFilterChain(*active_socket);
active_socket->continueFilterChain(true);

// Move active_socket to the sockets_ list if filter iteration needs to continue later.
// Otherwise we let active_socket be destructed when it goes out of scope.
if (active_socket->iter_ != active_socket->accept_filters_.end()) {
active_socket->startTimer();
LinkedList::moveIntoListBack(std::move(active_socket), sockets_);
} else {
if (!active_socket->connected_) {
// If active_socket is about to be destructed, emit logs if a connection is not created.
if (active_socket->stream_info_ != nullptr) {
emitLogs(*config_, *active_socket->stream_info_);
} else {
// If the active_socket is not connected, this socket is not promoted to active
// connection. Thus the stream_info_ is owned by this active socket.
ENVOY_BUG(active_socket->stream_info_ != nullptr,
"the unconnected active socket must have stream info.");
}
}
}
}

// Below members are open to access by ActiveTcpSocket.
Network::ConnectionHandler& parent_;
const std::chrono::milliseconds listener_filters_timeout_;
const bool continue_on_listener_filters_timeout_;

protected:
/**
* Create the active connection from server connection. This active listener owns the created
* active connection.
*
* @param filter_chain The network filter chain linking to the connection.
* @param server_conn_ptr The server connection.
* @param stream_info The stream info of the active connection.
*/
virtual void newActiveConnection(const Network::FilterChain& filter_chain,
Network::ServerConnectionPtr server_conn_ptr,
std::unique_ptr<StreamInfo::StreamInfo> stream_info) PURE;

std::list<std::unique_ptr<ActiveTcpSocket>> sockets_;
Network::ListenerPtr listener_;
// True if the follow up connection deletion is raised by the connection collection deletion is
// performing. Otherwise, the collection should be deleted when the last connection in the
// collection is removed. This state is maintained in base class because this state is independent
// from concrete connection type.
bool is_deleting_{false};

private:
Event::Dispatcher& dispatcher_;
};

} // namespace Server
} // namespace Envoy
Loading