Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 9 additions & 17 deletions envoy/network/filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -271,20 +271,6 @@ class ListenerFilterCallbacks {
*/
virtual ConnectionSocket& socket() PURE;

/**
* @return the Dispatcher for issuing events.
*/
virtual Event::Dispatcher& dispatcher() PURE;

/**
* If a filter stopped filter iteration by returning FilterStatus::StopIteration,
* the filter should call continueFilterChain(true) when complete to continue the filter chain,
* or continueFilterChain(false) if the filter execution failed and the connection must be
* closed.
* @param success boolean telling whether the filter execution was successful or not.
*/
virtual void continueFilterChain(bool success) PURE;

/**
* @param name the namespace used in the metadata in reverse DNS format, for example:
* envoy.test.my_filter.
Expand Down Expand Up @@ -326,15 +312,21 @@ class ListenerFilter {

/**
* Called when a new connection is accepted, but before a Connection is created.
* Filter chain iteration can be stopped if needed.
* Filter chain iteration can be stopped if need more data from the connection
* by returning `FilterStatus::StopIteration`, or continue the filter chain iteration
* by returning `FilterStatus::ContinueIteration`. Reject the connection by closing
* the socket and returning `FilterStatus::StopIteration`.
* @param cb the callbacks the filter instance can use to communicate with the filter chain.
* @return status used by the filter manager to manage further filter iteration.
*/
virtual FilterStatus onAccept(ListenerFilterCallbacks& cb) PURE;

/**
* Called when data read from the connection. If the filter chain doesn't get
* enough data, the filter chain can be stopped, then waiting for more data.
* Called when data read from the connection. If the filter doesn't get

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

s/data read/data is read

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

got it, thanks!

* enough data, filter chain iteration can be stopped if needed by returning
* `FilterStatus::StopIteration`. Or continue the filter chain iteration by returning
* `FilterStatus::ContinueIteration` if the filter get enough data. Reject the connection
* by closing the socket and returning `FilterStatus::StopIteration`.
* @param buffer the buffer of data.
* @return status used by the filter manager to manage further filter iteration.
*/
Expand Down
2 changes: 1 addition & 1 deletion source/server/active_stream_listener_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ class ActiveStreamListenerBase : public ActiveListenerImplBase,
void onSocketAccepted(std::unique_ptr<ActiveTcpSocket> active_socket) {
// Create and run the filters
if (config_->filterChainFactory().createListenerFilterChain(*active_socket)) {
active_socket->continueFilterChain(true);
active_socket->startFilterChain();
} else {
// If create listener filter chain failed, it means the listener is missing
// config due to the ECDS. Then close the connection directly.
Expand Down
2 changes: 0 additions & 2 deletions source/server/active_tcp_socket.cc
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,6 @@ ActiveTcpSocket::~ActiveTcpSocket() {
}
}

Event::Dispatcher& ActiveTcpSocket::dispatcher() { return listener_.dispatcher(); }

void ActiveTcpSocket::onTimeout() {
listener_.stats_.downstream_pre_cx_timeout_.inc();
ASSERT(inserted());
Expand Down
13 changes: 11 additions & 2 deletions source/server/active_tcp_socket.h
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,17 @@ struct ActiveTcpSocket : public Network::ListenerFilterManager,

// Network::ListenerFilterCallbacks
Network::ConnectionSocket& socket() override { return *socket_.get(); }
Event::Dispatcher& dispatcher() override;
void continueFilterChain(bool success) override;

/**
* If a filter returned `FilterStatus::ContinueIteration`, `continueFilterChain(true)`
* should be called to continue the filter chain iteration. Or `continueFilterChain(false)`
* should be called if the filter returned `FilterStatus::StopIteration` and closed
* the socket.
* @param success boolean telling whether the filter execution was successful or not.
*/
void continueFilterChain(bool success);

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we move this elsewhere since it's no longer part of the interface commented above and have it documented?

It seems like it's now split into either internal usage or external continueFilterChain(true) call that really just starts the filter chain. We could increase encapsulation splitting this out into startFilterChain() and continueFilterChain with the latter being private and startFilterChain just deferring to it.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Likewise comments in https://github.com/envoyproxy/envoy/pull/22732/files/e8563f3153394c2ffe8653ca9c9beef7e70dacc0#diff-6708370e112a5097f06e16009c3730b34f5ab0ee4f1e736f8816848409435888R122

and

virtual FilterStatus onAccept(ListenerFilterCallbacks& cb) PURE;

should be updated since the convention is changing e.g the filter doesn't call continue

@soulxu soulxu Aug 19, 2022

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we move this elsewhere since it's no longer part of the interface commented above and have it documented?

The ActiveTcpSocket is a struct, its all members are public now, I guess that is for convenient for ActiveTcpListener access. And ActiveTcpListener is only place ActiveTcpSocket, and probably we think ActiveTcpSocket is kind of internal struct for ActiveTcpListener(since its only alive short time, it will be deleted when the connection passed all the listener filters or the connection rejected by listener filter)

It seems like it's now split into either internal usage or external continueFilterChain(true) call that really just starts the filter chain. We could increase encapsulation splitting this out into startFilterChain() and continueFilterChain with the latter being private and startFilterChain just deferring to it.

Yea, that is good idea! The ActiveTcpListener is the place to call startFileChain(), and that make the code clear. Just as above said the whole ActiveTcpSocket are public, not sure I should make continueFilterChain as private. But let me know if this is still something you think should be changed.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Likewise comments in https://github.com/envoyproxy/envoy/pull/22732/files/e8563f3153394c2ffe8653ca9c9beef7e70dacc0#diff-6708370e112a5097f06e16009c3730b34f5ab0ee4f1e736f8816848409435888R122

and

virtual FilterStatus onAccept(ListenerFilterCallbacks& cb) PURE;

should be updated since the convention is changing e.g the filter doesn't call continue

got it, will update the comments

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks this looks great! It makes a lot of sense that a lot of the class was open as you said, but it seems like it has grown in responsibility e.g. internally buffering listener data.

Having some methods private (continueFilterChain, createListenerFilterBuffer) would make the interface more compact / easier to grok.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

got it, let me try to improve that.

void startFilterChain() { continueFilterChain(true); }

void setDynamicMetadata(const std::string& name, const ProtobufWkt::Struct& value) override;
envoy::config::core::v3::Metadata& dynamicMetadata() override {
return stream_info_->dynamicMetadata();
Expand Down
40 changes: 26 additions & 14 deletions test/server/connection_handler_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
#include "gtest/gtest.h"

using testing::_;
using testing::ByMove;
using testing::InSequence;
using testing::Invoke;
using testing::MockFunction;
Expand Down Expand Up @@ -2085,39 +2086,50 @@ TEST_F(ConnectionHandlerTest, ListenerFilterTimeoutResetOnSuccess) {
TestListener* test_listener =
addListener(1, true, false, "test_listener", listener, &listener_callbacks);
handler_->addListener(absl::nullopt, *test_listener, runtime_);

Network::MockListenerFilter* test_filter = new Network::MockListenerFilter(123);
size_t max_size = 10;
Network::MockListenerFilter* test_filter = new Network::MockListenerFilter(max_size);
EXPECT_CALL(factory_, createListenerFilterChain(_))
.WillRepeatedly(Invoke([&](Network::ListenerFilterManager& manager) -> bool {
manager.addAcceptFilter(listener_filter_matcher_, Network::ListenerFilterPtr{test_filter});
return true;
}));
Network::ListenerFilterCallbacks* listener_filter_cb{};
Network::MockConnectionSocket* accepted_socket = new NiceMock<Network::MockConnectionSocket>();
std::string data = "test";
EXPECT_CALL(*test_filter, onAccept(_))
.WillOnce(Invoke([&](Network::ListenerFilterCallbacks& cb) -> Network::FilterStatus {
listener_filter_cb = &cb;
.WillOnce(Invoke([](Network::ListenerFilterCallbacks&) -> Network::FilterStatus {
return Network::FilterStatus::StopIteration;
}));
Network::IoSocketHandleImpl io_handle{42};
EXPECT_CALL(*accepted_socket, ioHandle()).WillOnce(ReturnRef(io_handle)).RetiresOnSaturation();
EXPECT_CALL(*accepted_socket, ioHandle()).WillOnce(ReturnRef(io_handle)).RetiresOnSaturation();
Event::FileEvent* file_event = new NiceMock<Event::MockFileEvent>();
EXPECT_CALL(dispatcher_, createFileEvent_(_, _, _, _)).WillOnce(Return(file_event));
Network::MockIoHandle io_handle;
EXPECT_CALL(*accepted_socket, ioHandle()).WillOnce(ReturnRef(io_handle));
EXPECT_CALL(io_handle, isOpen()).WillOnce(Return(true));
EXPECT_CALL(*accepted_socket, ioHandle()).WillOnce(ReturnRef(io_handle));

Event::FileReadyCb file_event_callback;
EXPECT_CALL(io_handle,
createFileEvent_(_, _, Event::PlatformDefaultTriggerType,
Event::FileReadyType::Read | Event::FileReadyType::Closed))
.WillOnce(SaveArg<1>(&file_event_callback));
EXPECT_CALL(io_handle, activateFileEvents(_));

Event::MockTimer* timeout = new Event::MockTimer(&dispatcher_);
EXPECT_CALL(*timeout, enableTimer(std::chrono::milliseconds(15000), _));
listener_callbacks->onAccept(Network::ConnectionSocketPtr{accepted_socket});

EXPECT_CALL(io_handle, recv(_, _, _))
.WillOnce(Return(ByMove(
Api::IoCallUint64Result(max_size, Api::IoErrorPtr(nullptr, [](Api::IoError*) {})))));
EXPECT_CALL(*test_filter, onData(_))
.WillOnce(Invoke([](Network::ListenerFilterBuffer&) -> Network::FilterStatus {
return Network::FilterStatus::Continue;
}));
EXPECT_CALL(io_handle, resetFileEvents());
EXPECT_CALL(*test_filter, destroy_());
EXPECT_CALL(manager_, findFilterChain(_)).WillOnce(Return(nullptr));
EXPECT_CALL(*access_log_, log(_, _, _, _));
EXPECT_CALL(*timeout, disableTimer());
listener_filter_cb->continueFilterChain(true);

Event::FileEvent* file_event2 = new NiceMock<Event::MockFileEvent>();
EXPECT_CALL(dispatcher_, createFileEvent_(_, _, _, _)).WillOnce(Return(file_event2));
file_event_callback(Event::FileReadyType::Read);

EXPECT_CALL(io_handle, createFileEvent_(_, _, _, _));
EXPECT_CALL(*listener, onDestroy());

// Verify the file event created by listener filter was reset. If not
Expand Down