Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 7 additions & 3 deletions source/common/tcp_proxy/tcp_proxy.cc
Original file line number Diff line number Diff line change
Expand Up @@ -204,10 +204,9 @@ UpstreamDrainManager& Config::drainManager() {
return upstream_drain_manager_slot_->getTyped<UpstreamDrainManager>();
}

Filter::Filter(ConfigSharedPtr config, Upstream::ClusterManager& cluster_manager,
TimeSource& time_source)
Filter::Filter(ConfigSharedPtr config, Upstream::ClusterManager& cluster_manager, TimeSource&)

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TimeSource& is no longer needed in the signature then?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed

: config_(config), cluster_manager_(cluster_manager), downstream_callbacks_(*this),
upstream_callbacks_(new UpstreamCallbacks(this)), stream_info_(time_source) {
upstream_callbacks_(new UpstreamCallbacks(this)) {
ASSERT(config != nullptr);
}

Expand Down Expand Up @@ -292,6 +291,11 @@ void Filter::readDisableDownstream(bool disable) {
}
}

StreamInfo::StreamInfo& Filter::getStreamInfo() {
ASSERT(read_callbacks_ != nullptr);

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: del as this will crash in an obvious way on the next line.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks.. Will remove

return read_callbacks_->connection().streamInfo();
}

void Filter::DownstreamCallbacks::onAboveWriteBufferHighWatermark() {
ASSERT(!on_high_watermark_called_);
on_high_watermark_called_ = true;
Expand Down
3 changes: 1 addition & 2 deletions source/common/tcp_proxy/tcp_proxy.h
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,7 @@ class Filter : public Network::ReadFilter,
bool on_high_watermark_called_{false};
};

virtual StreamInfo::StreamInfo& getStreamInfo() { return stream_info_; }
virtual StreamInfo::StreamInfo& getStreamInfo();

protected:
struct DownstreamCallbacks : public Network::ConnectionCallbacks {
Expand Down Expand Up @@ -354,7 +354,6 @@ class Filter : public Network::ReadFilter,
std::shared_ptr<UpstreamCallbacks> upstream_callbacks_; // shared_ptr required for passing as a
// read filter.
std::unique_ptr<GenericUpstream> upstream_;
StreamInfo::StreamInfoImpl stream_info_;
RouteConstSharedPtr route_;
Network::TransportSocketOptionsSharedPtr transport_socket_options_;
uint32_t connect_attempts_{};
Expand Down
48 changes: 27 additions & 21 deletions test/common/tcp_proxy/tcp_proxy_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -953,8 +953,8 @@ class TcpProxyTest : public testing::Test {

NiceMock<Server::Configuration::MockFactoryContext> factory_context_;
ConfigSharedPtr config_;
std::unique_ptr<Filter> filter_;
NiceMock<Network::MockReadFilterCallbacks> filter_callbacks_;
std::unique_ptr<Filter> filter_;
std::vector<std::shared_ptr<NiceMock<Upstream::MockHost>>> upstream_hosts_{};
std::vector<std::unique_ptr<NiceMock<Network::MockClientConnection>>> upstream_connections_{};
std::vector<std::unique_ptr<NiceMock<Tcp::ConnectionPool::MockConnectionData>>>
Expand Down Expand Up @@ -1746,6 +1746,23 @@ TEST_F(TcpProxyTest, ShareFilterState) {
.value());
}

// Tests that filter callback can access downstream and upstream address and ssl properties.
TEST_F(TcpProxyTest, AccessDownstreamAndUpstreamProperties) {
setup(1);

raiseEventUpstreamConnected(0);
EXPECT_EQ(filter_callbacks_.connection().streamInfo().downstreamLocalAddress(),
filter_callbacks_.connection().localAddress());
EXPECT_EQ(filter_callbacks_.connection().streamInfo().downstreamRemoteAddress(),
filter_callbacks_.connection().remoteAddress());
EXPECT_EQ(filter_callbacks_.connection().streamInfo().downstreamSslConnection(),
filter_callbacks_.connection().ssl());
EXPECT_EQ(filter_callbacks_.connection().streamInfo().upstreamLocalAddress(),
upstream_connections_.at(0)->localAddress());
EXPECT_EQ(filter_callbacks_.connection().streamInfo().upstreamSslConnection(),
upstream_connections_.at(0)->streamInfo().downstreamSslConnection());
}

class TcpProxyRoutingTest : public testing::Test {
public:
TcpProxyRoutingTest() = default;
Expand Down Expand Up @@ -1826,13 +1843,10 @@ TEST_F(TcpProxyRoutingTest, DEPRECATED_FEATURE_TEST(UseClusterFromPerConnectionC
setup();
initializeFilter();

NiceMock<StreamInfo::MockStreamInfo> stream_info;
stream_info.filterState()->setData("envoy.tcp_proxy.cluster",
std::make_unique<PerConnectionCluster>("filter_state_cluster"),
StreamInfo::FilterState::StateType::Mutable,
StreamInfo::FilterState::LifeSpan::DownstreamConnection);
ON_CALL(connection_, streamInfo()).WillByDefault(ReturnRef(stream_info));
EXPECT_CALL(Const(connection_), streamInfo()).WillRepeatedly(ReturnRef(stream_info));
connection_.streamInfo().filterState()->setData(
"envoy.tcp_proxy.cluster", std::make_unique<PerConnectionCluster>("filter_state_cluster"),
StreamInfo::FilterState::StateType::Mutable,
StreamInfo::FilterState::LifeSpan::DownstreamConnection);

// Expect filter to try to open a connection to specified cluster.
EXPECT_CALL(factory_context_.cluster_manager_,
Expand All @@ -1847,14 +1861,10 @@ TEST_F(TcpProxyRoutingTest, DEPRECATED_FEATURE_TEST(UpstreamServerName)) {
setup();
initializeFilter();

NiceMock<StreamInfo::MockStreamInfo> stream_info;
stream_info.filterState()->setData("envoy.network.upstream_server_name",
std::make_unique<UpstreamServerName>("www.example.com"),
StreamInfo::FilterState::StateType::ReadOnly,
StreamInfo::FilterState::LifeSpan::DownstreamConnection);

ON_CALL(connection_, streamInfo()).WillByDefault(ReturnRef(stream_info));
EXPECT_CALL(Const(connection_), streamInfo()).WillRepeatedly(ReturnRef(stream_info));
connection_.streamInfo().filterState()->setData(
"envoy.network.upstream_server_name", std::make_unique<UpstreamServerName>("www.example.com"),
StreamInfo::FilterState::StateType::ReadOnly,
StreamInfo::FilterState::LifeSpan::DownstreamConnection);

// Expect filter to try to open a connection to a cluster with the transport socket options with
// override-server-name
Expand Down Expand Up @@ -1882,16 +1892,12 @@ TEST_F(TcpProxyRoutingTest, DEPRECATED_FEATURE_TEST(ApplicationProtocols)) {
setup();
initializeFilter();

NiceMock<StreamInfo::MockStreamInfo> stream_info;
stream_info.filterState()->setData(
connection_.streamInfo().filterState()->setData(
Network::ApplicationProtocols::key(),
std::make_unique<Network::ApplicationProtocols>(std::vector<std::string>{"foo", "bar"}),
StreamInfo::FilterState::StateType::ReadOnly,
StreamInfo::FilterState::LifeSpan::DownstreamConnection);

ON_CALL(connection_, streamInfo()).WillByDefault(ReturnRef(stream_info));
EXPECT_CALL(Const(connection_), streamInfo()).WillRepeatedly(ReturnRef(stream_info));

// Expect filter to try to open a connection to a cluster with the transport socket options with
// override-application-protocol
EXPECT_CALL(factory_context_.cluster_manager_, tcpConnPoolForCluster(_, _, _))
Expand Down
1 change: 1 addition & 0 deletions test/mocks/stream_info/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ envoy_cc_mock(
"//include/envoy/stream_info:stream_info_interface",
"//include/envoy/upstream:upstream_interface",
"//test/mocks/upstream:host_mocks",
"//test/test_common:simulated_time_system_lib",
"@envoy_api//envoy/config/core/v3:pkg_cc_proto",
],
)
26 changes: 24 additions & 2 deletions test/mocks/stream_info/mocks.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,17 @@ namespace Envoy {
namespace StreamInfo {

MockStreamInfo::MockStreamInfo()
: filter_state_(std::make_shared<FilterStateImpl>(FilterState::LifeSpan::FilterChain)),
: start_time_(ts_.systemTime()),
filter_state_(std::make_shared<FilterStateImpl>(FilterState::LifeSpan::FilterChain)),
downstream_local_address_(new Network::Address::Ipv4Instance("127.0.0.2")),
downstream_direct_remote_address_(new Network::Address::Ipv4Instance("127.0.0.1")),
downstream_remote_address_(new Network::Address::Ipv4Instance("127.0.0.1")) {
ON_CALL(*this, upstreamHost()).WillByDefault(ReturnPointee(&host_));
ON_CALL(*this, setResponseFlag(_)).WillByDefault(Invoke([this](ResponseFlag response_flag) {
response_flags_ |= response_flag;
}));
ON_CALL(*this, onUpstreamHostSelected(_))
.WillByDefault(
Invoke([this](Upstream::HostDescriptionConstSharedPtr host) { upstream_host_ = host; }));
ON_CALL(*this, startTime()).WillByDefault(ReturnPointee(&start_time_));
ON_CALL(*this, startTimeMonotonic()).WillByDefault(ReturnPointee(&start_time_monotonic_));
ON_CALL(*this, lastDownstreamRxByteReceived())
Expand All @@ -37,6 +43,11 @@ MockStreamInfo::MockStreamInfo()
ON_CALL(*this, lastDownstreamTxByteSent())
.WillByDefault(ReturnPointee(&last_downstream_tx_byte_sent_));
ON_CALL(*this, requestComplete()).WillByDefault(ReturnPointee(&end_time_));
ON_CALL(*this, onRequestComplete()).WillByDefault(Invoke([this]() {
end_time_ = absl::make_optional<std::chrono::nanoseconds>(
std::chrono::duration_cast<std::chrono::nanoseconds>(ts_.systemTime() - start_time_)
.count());
}));
ON_CALL(*this, setUpstreamLocalAddress(_))
.WillByDefault(
Invoke([this](const Network::Address::InstanceConstSharedPtr& upstream_local_address) {
Expand Down Expand Up @@ -85,6 +96,17 @@ MockStreamInfo::MockStreamInfo()
bytes_sent_ += bytes_sent;
}));
ON_CALL(*this, bytesSent()).WillByDefault(ReturnPointee(&bytes_sent_));
ON_CALL(*this, hasResponseFlag(_)).WillByDefault(Invoke([this](ResponseFlag flag) {
return response_flags_ & flag;
}));
ON_CALL(*this, upstreamHost()).WillByDefault(Invoke([this]() {
if (upstream_host_) {
return upstream_host_;
}
ReturnPointee(&host_);
// Call should not reach here and is just to make compiler happy.
return upstream_host_;
}));
ON_CALL(*this, dynamicMetadata()).WillByDefault(ReturnRef(metadata_));
ON_CALL(Const(*this), dynamicMetadata()).WillByDefault(ReturnRef(metadata_));
ON_CALL(*this, filterState()).WillByDefault(ReturnRef(filter_state_));
Expand Down
4 changes: 4 additions & 0 deletions test/mocks/stream_info/mocks.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#include "common/stream_info/filter_state_impl.h"

#include "test/mocks/upstream/host.h"
#include "test/test_common/simulated_time_system.h"

#include "gmock/gmock.h"

Expand Down Expand Up @@ -91,6 +92,8 @@ class MockStreamInfo : public StreamInfo {

std::shared_ptr<testing::NiceMock<Upstream::MockHostDescription>> host_{
new testing::NiceMock<Upstream::MockHostDescription>()};
Upstream::HostDescriptionConstSharedPtr upstream_host_{};
Envoy::Event::SimulatedTimeSystem ts_;
SystemTime start_time_;
MonotonicTime start_time_monotonic_;
absl::optional<std::chrono::nanoseconds> last_downstream_rx_byte_received_;
Expand All @@ -104,6 +107,7 @@ class MockStreamInfo : public StreamInfo {
absl::optional<Http::Protocol> protocol_;
absl::optional<uint32_t> response_code_;
absl::optional<std::string> response_code_details_;
uint64_t response_flags_{};
envoy::config::core::v3::Metadata metadata_;
FilterStateSharedPtr upstream_filter_state_;
FilterStateSharedPtr filter_state_;
Expand Down