Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 9 additions & 17 deletions envoy/network/filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -271,20 +271,6 @@ class ListenerFilterCallbacks {
*/
virtual ConnectionSocket& socket() PURE;

/**
* @return the Dispatcher for issuing events.
*/
virtual Event::Dispatcher& dispatcher() PURE;

/**
* If a filter stopped filter iteration by returning FilterStatus::StopIteration,
* the filter should call continueFilterChain(true) when complete to continue the filter chain,
* or continueFilterChain(false) if the filter execution failed and the connection must be
* closed.
* @param success boolean telling whether the filter execution was successful or not.
*/
virtual void continueFilterChain(bool success) PURE;

/**
* @param name the namespace used in the metadata in reverse DNS format, for example:
* envoy.test.my_filter.
Expand Down Expand Up @@ -326,15 +312,21 @@ class ListenerFilter {

/**
* Called when a new connection is accepted, but before a Connection is created.
* Filter chain iteration can be stopped if needed.
* Filter chain iteration can be stopped if need more data from the connection
* by returning `FilterStatus::StopIteration`, or continue the filter chain iteration
* by returning `FilterStatus::ContinueIteration`. Reject the connection by closing
* the socket and returning `FilterStatus::StopIteration`.
* @param cb the callbacks the filter instance can use to communicate with the filter chain.
* @return status used by the filter manager to manage further filter iteration.
*/
virtual FilterStatus onAccept(ListenerFilterCallbacks& cb) PURE;

/**
* Called when data read from the connection. If the filter chain doesn't get
* enough data, the filter chain can be stopped, then waiting for more data.
* Called when data is read from the connection. If the filter doesn't get
* enough data, filter chain iteration can be stopped if needed by returning
* `FilterStatus::StopIteration`. Or continue the filter chain iteration by returning
* `FilterStatus::ContinueIteration` if the filter get enough data. Reject the connection
* by closing the socket and returning `FilterStatus::StopIteration`.
* @param buffer the buffer of data.
* @return status used by the filter manager to manage further filter iteration.
*/
Expand Down
16 changes: 8 additions & 8 deletions source/server/active_stream_listener_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -85,28 +85,28 @@ class ActiveStreamListenerBase : public ActiveListenerImplBase,
void onSocketAccepted(std::unique_ptr<ActiveTcpSocket> active_socket) {
// Create and run the filters
if (config_->filterChainFactory().createListenerFilterChain(*active_socket)) {
active_socket->continueFilterChain(true);
active_socket->startFilterChain();
} else {
// If create listener filter chain failed, it means the listener is missing
// config due to the ECDS. Then close the connection directly.
active_socket->socket_->close();
ASSERT(active_socket->iter_ == active_socket->accept_filters_.end());
active_socket->socket().close();
ASSERT(active_socket->isEndFilterIteration());
}

// Move active_socket to the sockets_ list if filter iteration needs to continue later.
// Otherwise we let active_socket be destructed when it goes out of scope.
if (active_socket->iter_ != active_socket->accept_filters_.end()) {
if (!active_socket->isEndFilterIteration()) {
active_socket->startTimer();
LinkedList::moveIntoListBack(std::move(active_socket), sockets_);
} else {
if (!active_socket->connected_) {
if (!active_socket->connected()) {
// If active_socket is about to be destructed, emit logs if a connection is not created.
if (active_socket->stream_info_ != nullptr) {
emitLogs(*config_, *active_socket->stream_info_);
if (active_socket->streamInfo() != nullptr) {
emitLogs(*config_, *active_socket->streamInfo());
} else {
// If the active_socket is not connected, this socket is not promoted to active
// connection. Thus the stream_info_ is owned by this active socket.
ENVOY_BUG(active_socket->stream_info_ != nullptr,
ENVOY_BUG(active_socket->streamInfo() != nullptr,
"the unconnected active socket must have stream info.");
}
}
Expand Down
2 changes: 0 additions & 2 deletions source/server/active_tcp_socket.cc
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,6 @@ ActiveTcpSocket::~ActiveTcpSocket() {
}
}

Event::Dispatcher& ActiveTcpSocket::dispatcher() { return listener_.dispatcher(); }

void ActiveTcpSocket::onTimeout() {
listener_.stats_.downstream_pre_cx_timeout_.inc();
ASSERT(inserted());
Expand Down
30 changes: 22 additions & 8 deletions source/server/active_tcp_socket.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,12 @@ class ActiveStreamListenerBase;
/**
* Wrapper for an active accepted socket owned by the active tcp listener.
*/
struct ActiveTcpSocket : public Network::ListenerFilterManager,
public Network::ListenerFilterCallbacks,
LinkedObject<ActiveTcpSocket>,
public Event::DeferredDeletable,
Logger::Loggable<Logger::Id::conn_handler> {
class ActiveTcpSocket : public Network::ListenerFilterManager,
public Network::ListenerFilterCallbacks,
public LinkedObject<ActiveTcpSocket>,
public Event::DeferredDeletable,
Logger::Loggable<Logger::Id::conn_handler> {
public:
ActiveTcpSocket(ActiveStreamListenerBase& listener, Network::ConnectionSocketPtr&& socket,
bool hand_off_restored_destination_connections);
~ActiveTcpSocket() override;
Expand Down Expand Up @@ -83,17 +84,30 @@ struct ActiveTcpSocket : public Network::ListenerFilterManager,

// Network::ListenerFilterCallbacks
Network::ConnectionSocket& socket() override { return *socket_.get(); }
Event::Dispatcher& dispatcher() override;
void continueFilterChain(bool success) override;

void startFilterChain() { continueFilterChain(true); }

void setDynamicMetadata(const std::string& name, const ProtobufWkt::Struct& value) override;
envoy::config::core::v3::Metadata& dynamicMetadata() override {
return stream_info_->dynamicMetadata();
};
const envoy::config::core::v3::Metadata& dynamicMetadata() const override {
return stream_info_->dynamicMetadata();
};

StreamInfo::FilterState& filterState() override { return *stream_info_->filterState().get(); }
StreamInfo::StreamInfo* streamInfo() const { return stream_info_.get(); }
bool connected() const { return connected_; }
bool isEndFilterIteration() const { return iter_ == accept_filters_.end(); }

private:
/**
* If a filter returned `FilterStatus::ContinueIteration`, `continueFilterChain(true)`
* should be called to continue the filter chain iteration. Or `continueFilterChain(false)`
* should be called if the filter returned `FilterStatus::StopIteration` and closed
* the socket.
* @param success boolean telling whether the filter execution was successful or not.
*/
void continueFilterChain(bool success);

void createListenerFilterBuffer();

Expand Down
2 changes: 1 addition & 1 deletion test/server/active_tcp_listener_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -482,7 +482,7 @@ TEST_F(ActiveTcpListenerTest, PopulateSNIWhenActiveTcpSocketTimeout) {
generic_active_listener_->sockets().front()->onTimeout();

EXPECT_EQ(server_name,
tcp_socket->stream_info_->downstreamAddressProvider().requestedServerName());
tcp_socket->streamInfo()->downstreamAddressProvider().requestedServerName());
}

// Verify that the server connection with recovered address is rebalanced at redirected listener.
Expand Down
40 changes: 26 additions & 14 deletions test/server/connection_handler_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
#include "gtest/gtest.h"

using testing::_;
using testing::ByMove;
using testing::InSequence;
using testing::Invoke;
using testing::MockFunction;
Expand Down Expand Up @@ -2085,39 +2086,50 @@ TEST_F(ConnectionHandlerTest, ListenerFilterTimeoutResetOnSuccess) {
TestListener* test_listener =
addListener(1, true, false, "test_listener", listener, &listener_callbacks);
handler_->addListener(absl::nullopt, *test_listener, runtime_);

Network::MockListenerFilter* test_filter = new Network::MockListenerFilter(123);
size_t max_size = 10;
Network::MockListenerFilter* test_filter = new Network::MockListenerFilter(max_size);
EXPECT_CALL(factory_, createListenerFilterChain(_))
.WillRepeatedly(Invoke([&](Network::ListenerFilterManager& manager) -> bool {
manager.addAcceptFilter(listener_filter_matcher_, Network::ListenerFilterPtr{test_filter});
return true;
}));
Network::ListenerFilterCallbacks* listener_filter_cb{};
Network::MockConnectionSocket* accepted_socket = new NiceMock<Network::MockConnectionSocket>();
std::string data = "test";
EXPECT_CALL(*test_filter, onAccept(_))
.WillOnce(Invoke([&](Network::ListenerFilterCallbacks& cb) -> Network::FilterStatus {
listener_filter_cb = &cb;
.WillOnce(Invoke([](Network::ListenerFilterCallbacks&) -> Network::FilterStatus {
return Network::FilterStatus::StopIteration;
}));
Network::IoSocketHandleImpl io_handle{42};
EXPECT_CALL(*accepted_socket, ioHandle()).WillOnce(ReturnRef(io_handle)).RetiresOnSaturation();
EXPECT_CALL(*accepted_socket, ioHandle()).WillOnce(ReturnRef(io_handle)).RetiresOnSaturation();
Event::FileEvent* file_event = new NiceMock<Event::MockFileEvent>();
EXPECT_CALL(dispatcher_, createFileEvent_(_, _, _, _)).WillOnce(Return(file_event));
Network::MockIoHandle io_handle;
EXPECT_CALL(*accepted_socket, ioHandle()).WillOnce(ReturnRef(io_handle));
EXPECT_CALL(io_handle, isOpen()).WillOnce(Return(true));
EXPECT_CALL(*accepted_socket, ioHandle()).WillOnce(ReturnRef(io_handle));

Event::FileReadyCb file_event_callback;
EXPECT_CALL(io_handle,
createFileEvent_(_, _, Event::PlatformDefaultTriggerType,
Event::FileReadyType::Read | Event::FileReadyType::Closed))
.WillOnce(SaveArg<1>(&file_event_callback));
EXPECT_CALL(io_handle, activateFileEvents(_));

Event::MockTimer* timeout = new Event::MockTimer(&dispatcher_);
EXPECT_CALL(*timeout, enableTimer(std::chrono::milliseconds(15000), _));
listener_callbacks->onAccept(Network::ConnectionSocketPtr{accepted_socket});

EXPECT_CALL(io_handle, recv(_, _, _))
.WillOnce(Return(ByMove(
Api::IoCallUint64Result(max_size, Api::IoErrorPtr(nullptr, [](Api::IoError*) {})))));
EXPECT_CALL(*test_filter, onData(_))
.WillOnce(Invoke([](Network::ListenerFilterBuffer&) -> Network::FilterStatus {
return Network::FilterStatus::Continue;
}));
EXPECT_CALL(io_handle, resetFileEvents());
EXPECT_CALL(*test_filter, destroy_());
EXPECT_CALL(manager_, findFilterChain(_)).WillOnce(Return(nullptr));
EXPECT_CALL(*access_log_, log(_, _, _, _));
EXPECT_CALL(*timeout, disableTimer());
listener_filter_cb->continueFilterChain(true);

Event::FileEvent* file_event2 = new NiceMock<Event::MockFileEvent>();
EXPECT_CALL(dispatcher_, createFileEvent_(_, _, _, _)).WillOnce(Return(file_event2));
file_event_callback(Event::FileReadyType::Read);

EXPECT_CALL(io_handle, createFileEvent_(_, _, _, _));
EXPECT_CALL(*listener, onDestroy());

// Verify the file event created by listener filter was reset. If not
Expand Down