diff --git a/envoy/network/filter.h b/envoy/network/filter.h index aa8668e64c8ed..ceeee6b533e51 100644 --- a/envoy/network/filter.h +++ b/envoy/network/filter.h @@ -286,6 +286,20 @@ class ListenerFilterCallbacks { */ virtual ConnectionSocket& socket() PURE; + /** + * @return the Dispatcher for issuing events. + */ + virtual Event::Dispatcher& dispatcher() PURE; + + /** + * If a filter stopped filter iteration by returning FilterStatus::StopIteration, + * the filter should call continueFilterChain(true) when complete to continue the filter chain, + * or continueFilterChain(false) if the filter execution failed and the connection must be + * closed. + * @param success boolean telling whether the filter execution was successful or not. + */ + virtual void continueFilterChain(bool success) PURE; + /** * @param name the namespace used in the metadata in reverse DNS format, for example: * envoy.test.my_filter. @@ -327,21 +341,15 @@ class ListenerFilter { /** * Called when a new connection is accepted, but before a Connection is created. - * Filter chain iteration can be stopped if need more data from the connection - * by returning `FilterStatus::StopIteration`, or continue the filter chain iteration - * by returning `FilterStatus::ContinueIteration`. Reject the connection by closing - * the socket and returning `FilterStatus::StopIteration`. + * Filter chain iteration can be stopped if needed. * @param cb the callbacks the filter instance can use to communicate with the filter chain. * @return status used by the filter manager to manage further filter iteration. */ virtual FilterStatus onAccept(ListenerFilterCallbacks& cb) PURE; /** - * Called when data is read from the connection. If the filter doesn't get - * enough data, filter chain iteration can be stopped if needed by returning - * `FilterStatus::StopIteration`. Or continue the filter chain iteration by returning - * `FilterStatus::ContinueIteration` if the filter get enough data. Reject the connection - * by closing the socket and returning `FilterStatus::StopIteration`. + * Called when data read from the connection. If the filter chain doesn't get + * enough data, the filter chain can be stopped, then waiting for more data. * @param buffer the buffer of data. * @return status used by the filter manager to manage further filter iteration. */ diff --git a/source/server/active_stream_listener_base.h b/source/server/active_stream_listener_base.h index b88b0adb6e757..1786c809300ca 100644 --- a/source/server/active_stream_listener_base.h +++ b/source/server/active_stream_listener_base.h @@ -85,28 +85,28 @@ class ActiveStreamListenerBase : public ActiveListenerImplBase, void onSocketAccepted(std::unique_ptr active_socket) { // Create and run the filters if (config_->filterChainFactory().createListenerFilterChain(*active_socket)) { - active_socket->startFilterChain(); + active_socket->continueFilterChain(true); } else { // If create listener filter chain failed, it means the listener is missing // config due to the ECDS. Then close the connection directly. - active_socket->socket().close(); - ASSERT(active_socket->isEndFilterIteration()); + active_socket->socket_->close(); + ASSERT(active_socket->iter_ == active_socket->accept_filters_.end()); } // Move active_socket to the sockets_ list if filter iteration needs to continue later. // Otherwise we let active_socket be destructed when it goes out of scope. - if (!active_socket->isEndFilterIteration()) { + if (active_socket->iter_ != active_socket->accept_filters_.end()) { active_socket->startTimer(); LinkedList::moveIntoListBack(std::move(active_socket), sockets_); } else { - if (!active_socket->connected()) { + if (!active_socket->connected_) { // If active_socket is about to be destructed, emit logs if a connection is not created. - if (active_socket->streamInfo() != nullptr) { - emitLogs(*config_, *active_socket->streamInfo()); + if (active_socket->stream_info_ != nullptr) { + emitLogs(*config_, *active_socket->stream_info_); } else { // If the active_socket is not connected, this socket is not promoted to active // connection. Thus the stream_info_ is owned by this active socket. - ENVOY_BUG(active_socket->streamInfo() != nullptr, + ENVOY_BUG(active_socket->stream_info_ != nullptr, "the unconnected active socket must have stream info."); } } diff --git a/source/server/active_tcp_socket.cc b/source/server/active_tcp_socket.cc index b056692f42047..c6804531cafee 100644 --- a/source/server/active_tcp_socket.cc +++ b/source/server/active_tcp_socket.cc @@ -36,6 +36,8 @@ ActiveTcpSocket::~ActiveTcpSocket() { } } +Event::Dispatcher& ActiveTcpSocket::dispatcher() { return listener_.dispatcher(); } + void ActiveTcpSocket::onTimeout() { listener_.stats_.downstream_pre_cx_timeout_.inc(); ASSERT(inserted()); diff --git a/source/server/active_tcp_socket.h b/source/server/active_tcp_socket.h index 30ce0dda8f84b..8e46aa7eba573 100644 --- a/source/server/active_tcp_socket.h +++ b/source/server/active_tcp_socket.h @@ -24,12 +24,11 @@ class ActiveStreamListenerBase; /** * Wrapper for an active accepted socket owned by the active tcp listener. */ -class ActiveTcpSocket : public Network::ListenerFilterManager, - public Network::ListenerFilterCallbacks, - public LinkedObject, - public Event::DeferredDeletable, - Logger::Loggable { -public: +struct ActiveTcpSocket : public Network::ListenerFilterManager, + public Network::ListenerFilterCallbacks, + LinkedObject, + public Event::DeferredDeletable, + Logger::Loggable { ActiveTcpSocket(ActiveStreamListenerBase& listener, Network::ConnectionSocketPtr&& socket, bool hand_off_restored_destination_connections); ~ActiveTcpSocket() override; @@ -84,9 +83,8 @@ class ActiveTcpSocket : public Network::ListenerFilterManager, // Network::ListenerFilterCallbacks Network::ConnectionSocket& socket() override { return *socket_.get(); } - - void startFilterChain() { continueFilterChain(true); } - + Event::Dispatcher& dispatcher() override; + void continueFilterChain(bool success) override; void setDynamicMetadata(const std::string& name, const ProtobufWkt::Struct& value) override; envoy::config::core::v3::Metadata& dynamicMetadata() override { return stream_info_->dynamicMetadata(); @@ -94,20 +92,8 @@ class ActiveTcpSocket : public Network::ListenerFilterManager, const envoy::config::core::v3::Metadata& dynamicMetadata() const override { return stream_info_->dynamicMetadata(); }; + StreamInfo::FilterState& filterState() override { return *stream_info_->filterState().get(); } - StreamInfo::StreamInfo* streamInfo() const { return stream_info_.get(); } - bool connected() const { return connected_; } - bool isEndFilterIteration() const { return iter_ == accept_filters_.end(); } - -private: - /** - * If a filter returned `FilterStatus::ContinueIteration`, `continueFilterChain(true)` - * should be called to continue the filter chain iteration. Or `continueFilterChain(false)` - * should be called if the filter returned `FilterStatus::StopIteration` and closed - * the socket. - * @param success boolean telling whether the filter execution was successful or not. - */ - void continueFilterChain(bool success); void createListenerFilterBuffer(); diff --git a/test/server/active_tcp_listener_test.cc b/test/server/active_tcp_listener_test.cc index d2665c5417411..9cc116b0020f6 100644 --- a/test/server/active_tcp_listener_test.cc +++ b/test/server/active_tcp_listener_test.cc @@ -482,7 +482,7 @@ TEST_F(ActiveTcpListenerTest, PopulateSNIWhenActiveTcpSocketTimeout) { generic_active_listener_->sockets().front()->onTimeout(); EXPECT_EQ(server_name, - tcp_socket->streamInfo()->downstreamAddressProvider().requestedServerName()); + tcp_socket->stream_info_->downstreamAddressProvider().requestedServerName()); } // Verify that the server connection with recovered address is rebalanced at redirected listener. diff --git a/test/server/connection_handler_test.cc b/test/server/connection_handler_test.cc index 3f15b2ca35c56..95eb6298e8d57 100644 --- a/test/server/connection_handler_test.cc +++ b/test/server/connection_handler_test.cc @@ -35,7 +35,6 @@ #include "gtest/gtest.h" using testing::_; -using testing::ByMove; using testing::InSequence; using testing::Invoke; using testing::MockFunction; @@ -2086,50 +2085,39 @@ TEST_F(ConnectionHandlerTest, ListenerFilterTimeoutResetOnSuccess) { TestListener* test_listener = addListener(1, true, false, "test_listener", listener, &listener_callbacks); handler_->addListener(absl::nullopt, *test_listener, runtime_); - size_t max_size = 10; - Network::MockListenerFilter* test_filter = new Network::MockListenerFilter(max_size); + + Network::MockListenerFilter* test_filter = new Network::MockListenerFilter(123); EXPECT_CALL(factory_, createListenerFilterChain(_)) .WillRepeatedly(Invoke([&](Network::ListenerFilterManager& manager) -> bool { manager.addAcceptFilter(listener_filter_matcher_, Network::ListenerFilterPtr{test_filter}); return true; })); + Network::ListenerFilterCallbacks* listener_filter_cb{}; Network::MockConnectionSocket* accepted_socket = new NiceMock(); + std::string data = "test"; EXPECT_CALL(*test_filter, onAccept(_)) - .WillOnce(Invoke([](Network::ListenerFilterCallbacks&) -> Network::FilterStatus { + .WillOnce(Invoke([&](Network::ListenerFilterCallbacks& cb) -> Network::FilterStatus { + listener_filter_cb = &cb; return Network::FilterStatus::StopIteration; })); - Network::MockIoHandle io_handle; - EXPECT_CALL(*accepted_socket, ioHandle()).WillOnce(ReturnRef(io_handle)); - EXPECT_CALL(io_handle, isOpen()).WillOnce(Return(true)); - EXPECT_CALL(*accepted_socket, ioHandle()).WillOnce(ReturnRef(io_handle)); - - Event::FileReadyCb file_event_callback; - EXPECT_CALL(io_handle, - createFileEvent_(_, _, Event::PlatformDefaultTriggerType, - Event::FileReadyType::Read | Event::FileReadyType::Closed)) - .WillOnce(SaveArg<1>(&file_event_callback)); - EXPECT_CALL(io_handle, activateFileEvents(_)); + Network::IoSocketHandleImpl io_handle{42}; + EXPECT_CALL(*accepted_socket, ioHandle()).WillOnce(ReturnRef(io_handle)).RetiresOnSaturation(); + EXPECT_CALL(*accepted_socket, ioHandle()).WillOnce(ReturnRef(io_handle)).RetiresOnSaturation(); + Event::FileEvent* file_event = new NiceMock(); + EXPECT_CALL(dispatcher_, createFileEvent_(_, _, _, _)).WillOnce(Return(file_event)); Event::MockTimer* timeout = new Event::MockTimer(&dispatcher_); EXPECT_CALL(*timeout, enableTimer(std::chrono::milliseconds(15000), _)); listener_callbacks->onAccept(Network::ConnectionSocketPtr{accepted_socket}); - EXPECT_CALL(io_handle, recv(_, _, _)) - .WillOnce(Return(ByMove( - Api::IoCallUint64Result(max_size, Api::IoErrorPtr(nullptr, [](Api::IoError*) {}))))); - EXPECT_CALL(*test_filter, onData(_)) - .WillOnce(Invoke([](Network::ListenerFilterBuffer&) -> Network::FilterStatus { - return Network::FilterStatus::Continue; - })); - EXPECT_CALL(io_handle, resetFileEvents()); EXPECT_CALL(*test_filter, destroy_()); EXPECT_CALL(manager_, findFilterChain(_)).WillOnce(Return(nullptr)); EXPECT_CALL(*access_log_, log(_, _, _, _)); EXPECT_CALL(*timeout, disableTimer()); + listener_filter_cb->continueFilterChain(true); - file_event_callback(Event::FileReadyType::Read); - - EXPECT_CALL(io_handle, createFileEvent_(_, _, _, _)); + Event::FileEvent* file_event2 = new NiceMock(); + EXPECT_CALL(dispatcher_, createFileEvent_(_, _, _, _)).WillOnce(Return(file_event2)); EXPECT_CALL(*listener, onDestroy()); // Verify the file event created by listener filter was reset. If not