diff --git a/envoy/network/filter.h b/envoy/network/filter.h index ec6d14f9042a7..40cd310f828ef 100644 --- a/envoy/network/filter.h +++ b/envoy/network/filter.h @@ -271,20 +271,6 @@ class ListenerFilterCallbacks { */ virtual ConnectionSocket& socket() PURE; - /** - * @return the Dispatcher for issuing events. - */ - virtual Event::Dispatcher& dispatcher() PURE; - - /** - * If a filter stopped filter iteration by returning FilterStatus::StopIteration, - * the filter should call continueFilterChain(true) when complete to continue the filter chain, - * or continueFilterChain(false) if the filter execution failed and the connection must be - * closed. - * @param success boolean telling whether the filter execution was successful or not. - */ - virtual void continueFilterChain(bool success) PURE; - /** * @param name the namespace used in the metadata in reverse DNS format, for example: * envoy.test.my_filter. @@ -326,15 +312,21 @@ class ListenerFilter { /** * Called when a new connection is accepted, but before a Connection is created. - * Filter chain iteration can be stopped if needed. + * Filter chain iteration can be stopped if need more data from the connection + * by returning `FilterStatus::StopIteration`, or continue the filter chain iteration + * by returning `FilterStatus::ContinueIteration`. Reject the connection by closing + * the socket and returning `FilterStatus::StopIteration`. * @param cb the callbacks the filter instance can use to communicate with the filter chain. * @return status used by the filter manager to manage further filter iteration. */ virtual FilterStatus onAccept(ListenerFilterCallbacks& cb) PURE; /** - * Called when data read from the connection. If the filter chain doesn't get - * enough data, the filter chain can be stopped, then waiting for more data. + * Called when data is read from the connection. If the filter doesn't get + * enough data, filter chain iteration can be stopped if needed by returning + * `FilterStatus::StopIteration`. Or continue the filter chain iteration by returning + * `FilterStatus::ContinueIteration` if the filter get enough data. Reject the connection + * by closing the socket and returning `FilterStatus::StopIteration`. * @param buffer the buffer of data. * @return status used by the filter manager to manage further filter iteration. */ diff --git a/source/server/active_stream_listener_base.h b/source/server/active_stream_listener_base.h index 1786c809300ca..b88b0adb6e757 100644 --- a/source/server/active_stream_listener_base.h +++ b/source/server/active_stream_listener_base.h @@ -85,28 +85,28 @@ class ActiveStreamListenerBase : public ActiveListenerImplBase, void onSocketAccepted(std::unique_ptr active_socket) { // Create and run the filters if (config_->filterChainFactory().createListenerFilterChain(*active_socket)) { - active_socket->continueFilterChain(true); + active_socket->startFilterChain(); } else { // If create listener filter chain failed, it means the listener is missing // config due to the ECDS. Then close the connection directly. - active_socket->socket_->close(); - ASSERT(active_socket->iter_ == active_socket->accept_filters_.end()); + active_socket->socket().close(); + ASSERT(active_socket->isEndFilterIteration()); } // Move active_socket to the sockets_ list if filter iteration needs to continue later. // Otherwise we let active_socket be destructed when it goes out of scope. - if (active_socket->iter_ != active_socket->accept_filters_.end()) { + if (!active_socket->isEndFilterIteration()) { active_socket->startTimer(); LinkedList::moveIntoListBack(std::move(active_socket), sockets_); } else { - if (!active_socket->connected_) { + if (!active_socket->connected()) { // If active_socket is about to be destructed, emit logs if a connection is not created. - if (active_socket->stream_info_ != nullptr) { - emitLogs(*config_, *active_socket->stream_info_); + if (active_socket->streamInfo() != nullptr) { + emitLogs(*config_, *active_socket->streamInfo()); } else { // If the active_socket is not connected, this socket is not promoted to active // connection. Thus the stream_info_ is owned by this active socket. - ENVOY_BUG(active_socket->stream_info_ != nullptr, + ENVOY_BUG(active_socket->streamInfo() != nullptr, "the unconnected active socket must have stream info."); } } diff --git a/source/server/active_tcp_socket.cc b/source/server/active_tcp_socket.cc index c6804531cafee..b056692f42047 100644 --- a/source/server/active_tcp_socket.cc +++ b/source/server/active_tcp_socket.cc @@ -36,8 +36,6 @@ ActiveTcpSocket::~ActiveTcpSocket() { } } -Event::Dispatcher& ActiveTcpSocket::dispatcher() { return listener_.dispatcher(); } - void ActiveTcpSocket::onTimeout() { listener_.stats_.downstream_pre_cx_timeout_.inc(); ASSERT(inserted()); diff --git a/source/server/active_tcp_socket.h b/source/server/active_tcp_socket.h index 8e46aa7eba573..30ce0dda8f84b 100644 --- a/source/server/active_tcp_socket.h +++ b/source/server/active_tcp_socket.h @@ -24,11 +24,12 @@ class ActiveStreamListenerBase; /** * Wrapper for an active accepted socket owned by the active tcp listener. */ -struct ActiveTcpSocket : public Network::ListenerFilterManager, - public Network::ListenerFilterCallbacks, - LinkedObject, - public Event::DeferredDeletable, - Logger::Loggable { +class ActiveTcpSocket : public Network::ListenerFilterManager, + public Network::ListenerFilterCallbacks, + public LinkedObject, + public Event::DeferredDeletable, + Logger::Loggable { +public: ActiveTcpSocket(ActiveStreamListenerBase& listener, Network::ConnectionSocketPtr&& socket, bool hand_off_restored_destination_connections); ~ActiveTcpSocket() override; @@ -83,8 +84,9 @@ struct ActiveTcpSocket : public Network::ListenerFilterManager, // Network::ListenerFilterCallbacks Network::ConnectionSocket& socket() override { return *socket_.get(); } - Event::Dispatcher& dispatcher() override; - void continueFilterChain(bool success) override; + + void startFilterChain() { continueFilterChain(true); } + void setDynamicMetadata(const std::string& name, const ProtobufWkt::Struct& value) override; envoy::config::core::v3::Metadata& dynamicMetadata() override { return stream_info_->dynamicMetadata(); @@ -92,8 +94,20 @@ struct ActiveTcpSocket : public Network::ListenerFilterManager, const envoy::config::core::v3::Metadata& dynamicMetadata() const override { return stream_info_->dynamicMetadata(); }; - StreamInfo::FilterState& filterState() override { return *stream_info_->filterState().get(); } + StreamInfo::StreamInfo* streamInfo() const { return stream_info_.get(); } + bool connected() const { return connected_; } + bool isEndFilterIteration() const { return iter_ == accept_filters_.end(); } + +private: + /** + * If a filter returned `FilterStatus::ContinueIteration`, `continueFilterChain(true)` + * should be called to continue the filter chain iteration. Or `continueFilterChain(false)` + * should be called if the filter returned `FilterStatus::StopIteration` and closed + * the socket. + * @param success boolean telling whether the filter execution was successful or not. + */ + void continueFilterChain(bool success); void createListenerFilterBuffer(); diff --git a/test/server/active_tcp_listener_test.cc b/test/server/active_tcp_listener_test.cc index 9cc116b0020f6..d2665c5417411 100644 --- a/test/server/active_tcp_listener_test.cc +++ b/test/server/active_tcp_listener_test.cc @@ -482,7 +482,7 @@ TEST_F(ActiveTcpListenerTest, PopulateSNIWhenActiveTcpSocketTimeout) { generic_active_listener_->sockets().front()->onTimeout(); EXPECT_EQ(server_name, - tcp_socket->stream_info_->downstreamAddressProvider().requestedServerName()); + tcp_socket->streamInfo()->downstreamAddressProvider().requestedServerName()); } // Verify that the server connection with recovered address is rebalanced at redirected listener. diff --git a/test/server/connection_handler_test.cc b/test/server/connection_handler_test.cc index 95eb6298e8d57..3f15b2ca35c56 100644 --- a/test/server/connection_handler_test.cc +++ b/test/server/connection_handler_test.cc @@ -35,6 +35,7 @@ #include "gtest/gtest.h" using testing::_; +using testing::ByMove; using testing::InSequence; using testing::Invoke; using testing::MockFunction; @@ -2085,39 +2086,50 @@ TEST_F(ConnectionHandlerTest, ListenerFilterTimeoutResetOnSuccess) { TestListener* test_listener = addListener(1, true, false, "test_listener", listener, &listener_callbacks); handler_->addListener(absl::nullopt, *test_listener, runtime_); - - Network::MockListenerFilter* test_filter = new Network::MockListenerFilter(123); + size_t max_size = 10; + Network::MockListenerFilter* test_filter = new Network::MockListenerFilter(max_size); EXPECT_CALL(factory_, createListenerFilterChain(_)) .WillRepeatedly(Invoke([&](Network::ListenerFilterManager& manager) -> bool { manager.addAcceptFilter(listener_filter_matcher_, Network::ListenerFilterPtr{test_filter}); return true; })); - Network::ListenerFilterCallbacks* listener_filter_cb{}; Network::MockConnectionSocket* accepted_socket = new NiceMock(); - std::string data = "test"; EXPECT_CALL(*test_filter, onAccept(_)) - .WillOnce(Invoke([&](Network::ListenerFilterCallbacks& cb) -> Network::FilterStatus { - listener_filter_cb = &cb; + .WillOnce(Invoke([](Network::ListenerFilterCallbacks&) -> Network::FilterStatus { return Network::FilterStatus::StopIteration; })); - Network::IoSocketHandleImpl io_handle{42}; - EXPECT_CALL(*accepted_socket, ioHandle()).WillOnce(ReturnRef(io_handle)).RetiresOnSaturation(); - EXPECT_CALL(*accepted_socket, ioHandle()).WillOnce(ReturnRef(io_handle)).RetiresOnSaturation(); - Event::FileEvent* file_event = new NiceMock(); - EXPECT_CALL(dispatcher_, createFileEvent_(_, _, _, _)).WillOnce(Return(file_event)); + Network::MockIoHandle io_handle; + EXPECT_CALL(*accepted_socket, ioHandle()).WillOnce(ReturnRef(io_handle)); + EXPECT_CALL(io_handle, isOpen()).WillOnce(Return(true)); + EXPECT_CALL(*accepted_socket, ioHandle()).WillOnce(ReturnRef(io_handle)); + + Event::FileReadyCb file_event_callback; + EXPECT_CALL(io_handle, + createFileEvent_(_, _, Event::PlatformDefaultTriggerType, + Event::FileReadyType::Read | Event::FileReadyType::Closed)) + .WillOnce(SaveArg<1>(&file_event_callback)); + EXPECT_CALL(io_handle, activateFileEvents(_)); Event::MockTimer* timeout = new Event::MockTimer(&dispatcher_); EXPECT_CALL(*timeout, enableTimer(std::chrono::milliseconds(15000), _)); listener_callbacks->onAccept(Network::ConnectionSocketPtr{accepted_socket}); + EXPECT_CALL(io_handle, recv(_, _, _)) + .WillOnce(Return(ByMove( + Api::IoCallUint64Result(max_size, Api::IoErrorPtr(nullptr, [](Api::IoError*) {}))))); + EXPECT_CALL(*test_filter, onData(_)) + .WillOnce(Invoke([](Network::ListenerFilterBuffer&) -> Network::FilterStatus { + return Network::FilterStatus::Continue; + })); + EXPECT_CALL(io_handle, resetFileEvents()); EXPECT_CALL(*test_filter, destroy_()); EXPECT_CALL(manager_, findFilterChain(_)).WillOnce(Return(nullptr)); EXPECT_CALL(*access_log_, log(_, _, _, _)); EXPECT_CALL(*timeout, disableTimer()); - listener_filter_cb->continueFilterChain(true); - Event::FileEvent* file_event2 = new NiceMock(); - EXPECT_CALL(dispatcher_, createFileEvent_(_, _, _, _)).WillOnce(Return(file_event2)); + file_event_callback(Event::FileReadyType::Read); + + EXPECT_CALL(io_handle, createFileEvent_(_, _, _, _)); EXPECT_CALL(*listener, onDestroy()); // Verify the file event created by listener filter was reset. If not