Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion envoy/http/conn_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ class Callbacks {
* @param protocol supplies the protocol associated with the stream, or absl::nullopt for raw TCP.
*/
virtual void onPoolReady(RequestEncoder& encoder, Upstream::HostDescriptionConstSharedPtr host,
const StreamInfo::StreamInfo& info,
StreamInfo::StreamInfo& info,
absl::optional<Http::Protocol> protocol) PURE;
};

Expand Down
2 changes: 1 addition & 1 deletion envoy/router/router.h
Original file line number Diff line number Diff line change
Expand Up @@ -1260,7 +1260,7 @@ class GenericConnectionPoolCallbacks {
virtual void onPoolReady(std::unique_ptr<GenericUpstream>&& upstream,
Upstream::HostDescriptionConstSharedPtr host,
const Network::Address::InstanceConstSharedPtr& upstream_local_address,
const StreamInfo::StreamInfo& info,
StreamInfo::StreamInfo& info,
absl::optional<Http::Protocol> protocol) PURE;

// @return the UpstreamToDownstream interface for this stream.
Expand Down
9 changes: 9 additions & 0 deletions envoy/stream_info/stream_info.h
Original file line number Diff line number Diff line change
Expand Up @@ -721,6 +721,15 @@ class StreamInfo {
downstream_info.setUpstreamBytesMeter(upstream_info.getUpstreamBytesMeter());
upstream_info.setDownstreamBytesMeter(downstream_info.getDownstreamBytesMeter());
}

/**
* Filter State object to be shared between upstream and downstream filters.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think if we end up going this route we're going to want to be awfully clear that this is downstream L7 filters and upstream L4 filters.

I also wonder if instead of adding another set of accessors, if there would be a way to tag information that's added as "information that should be communicated to upstream stream info" WDYT?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, this is more confusing than helpful. I think an explicit export is probably better (either in the filter state itself or in the transport socket).

* This is set for upstream connections.
* @param pointer to downstream connections filter state.
* @return pointer to filter state to be used by downstream connections.
*/
virtual const FilterStateSharedPtr& downstreamFilterState() const PURE;
virtual void setDownstreamFilterState(const FilterStateSharedPtr& filter_state) PURE;
};

} // namespace StreamInfo
Expand Down
2 changes: 1 addition & 1 deletion source/common/http/codec_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ class CodecClient : protected Logger::Loggable<Logger::Id::client>,
CodecType type() const { return type_; }

// Note this is the L4 stream info, not L7.
const StreamInfo::StreamInfo& streamInfo() { return connection_->streamInfo(); }
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Your PR description says
" Set downstream filter state on upstream stream info."
but you're setting downstream filter state on the upstream connection stream info.

Matching downstream L7 streams with upstream L4 connections seems like it's going to be problematic. Is that what you're intending to do? I would think that we'd want to match stream to stream, in case multiple L7 streams went out over one virtual connection.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmmm, this is a very good point. I think this PR needs more thought actually. L4 connections from downstreams are not strictly one-to-one with L4 upstream connections so it's not right to use either L4 or L7 filter state here. I tend to think that the import of filter state must also be explicit like in #19435. That way the hash policy determines the grouping of downstream L4/L7 stream filter states.

StreamInfo::StreamInfo& streamInfo() { return connection_->streamInfo(); }

protected:
/**
Expand Down
4 changes: 2 additions & 2 deletions source/common/http/conn_pool_grid.cc
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ ConnectivityGrid::StreamCreationResult ConnectivityGrid::WrapperCallbacks::newSt

void ConnectivityGrid::WrapperCallbacks::onConnectionAttemptReady(
ConnectionAttemptCallbacks* attempt, RequestEncoder& encoder,
Upstream::HostDescriptionConstSharedPtr host, const StreamInfo::StreamInfo& info,
Upstream::HostDescriptionConstSharedPtr host, StreamInfo::StreamInfo& info,
absl::optional<Http::Protocol> protocol) {
ENVOY_LOG(trace, "{} pool successfully connected to host '{}'.", describePool(attempt->pool()),
host->hostname());
Expand Down Expand Up @@ -137,7 +137,7 @@ void ConnectivityGrid::WrapperCallbacks::maybeMarkHttp3Broken() {

void ConnectivityGrid::WrapperCallbacks::ConnectionAttemptCallbacks::onPoolReady(
RequestEncoder& encoder, Upstream::HostDescriptionConstSharedPtr host,
const StreamInfo::StreamInfo& info, absl::optional<Http::Protocol> protocol) {
StreamInfo::StreamInfo& info, absl::optional<Http::Protocol> protocol) {
cancellable_ = nullptr; // Attempt succeeded and can no longer be cancelled.
parent_.onConnectionAttemptReady(this, encoder, host, info, protocol);
}
Expand Down
4 changes: 2 additions & 2 deletions source/common/http/conn_pool_grid.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ class ConnectivityGrid : public ConnectionPool::Instance,
absl::string_view transport_failure_reason,
Upstream::HostDescriptionConstSharedPtr host) override;
void onPoolReady(RequestEncoder& encoder, Upstream::HostDescriptionConstSharedPtr host,
const StreamInfo::StreamInfo& info,
StreamInfo::StreamInfo& info,
absl::optional<Http::Protocol> protocol) override;

ConnectionPool::Instance& pool() { return **pool_it_; }
Expand Down Expand Up @@ -94,7 +94,7 @@ class ConnectivityGrid : public ConnectionPool::Instance,
// Called by a ConnectionAttempt when the underlying pool is ready.
void onConnectionAttemptReady(ConnectionAttemptCallbacks* attempt, RequestEncoder& encoder,
Upstream::HostDescriptionConstSharedPtr host,
const StreamInfo::StreamInfo& info,
StreamInfo::StreamInfo& info,
absl::optional<Http::Protocol> protocol);

private:
Expand Down
11 changes: 6 additions & 5 deletions source/common/router/upstream_request.cc
Original file line number Diff line number Diff line change
Expand Up @@ -412,7 +412,7 @@ void UpstreamRequest::onPoolFailure(ConnectionPool::PoolFailureReason reason,
void UpstreamRequest::onPoolReady(
std::unique_ptr<GenericUpstream>&& upstream, Upstream::HostDescriptionConstSharedPtr host,
const Network::Address::InstanceConstSharedPtr& upstream_local_address,
const StreamInfo::StreamInfo& info, absl::optional<Http::Protocol> protocol) {
StreamInfo::StreamInfo& info, absl::optional<Http::Protocol> protocol) {
// This may be called under an existing ScopeTrackerScopeState but it will unwind correctly.
ScopeTrackerScopeState scope(&parent_.callbacks()->scope(), parent_.callbacks()->dispatcher());
ENVOY_STREAM_LOG(debug, "pool ready", *parent_.callbacks());
Expand All @@ -437,18 +437,19 @@ void UpstreamRequest::onPoolReady(

StreamInfo::UpstreamInfo& upstream_info = *stream_info_.upstreamInfo();
parent_.callbacks()->streamInfo().setUpstreamInfo(stream_info_.upstreamInfo());
if (info.upstreamInfo().has_value()) {
auto& upstream_timing = info.upstreamInfo().value().get().upstreamTiming();
if (info.upstreamInfo()) {
auto& upstream_timing = info.upstreamInfo()->upstreamTiming();
upstreamTiming().upstream_connect_start_ = upstream_timing.upstream_connect_start_;
upstreamTiming().upstream_connect_complete_ = upstream_timing.upstream_connect_complete_;
upstreamTiming().upstream_handshake_complete_ = upstream_timing.upstream_handshake_complete_;
upstream_info.setUpstreamNumStreams(info.upstreamInfo().value().get().upstreamNumStreams());
upstream_info.setUpstreamNumStreams(info.upstreamInfo()->upstreamNumStreams());
}

upstream_info.setUpstreamFilterState(std::make_shared<StreamInfo::FilterStateImpl>(
info.filterState().parent()->parent(), StreamInfo::FilterState::LifeSpan::Request));
info.filterState()->parent()->parent(), StreamInfo::FilterState::LifeSpan::Request));
upstream_info.setUpstreamLocalAddress(upstream_local_address);
upstream_info.setUpstreamSslConnection(info.downstreamAddressProvider().sslConnection());
info.setDownstreamFilterState(parent_.callbacks()->streamInfo().filterState());

if (info.downstreamAddressProvider().connectionID().has_value()) {
upstream_info.setUpstreamConnectionId(info.downstreamAddressProvider().connectionID().value());
Expand Down
3 changes: 1 addition & 2 deletions source/common/router/upstream_request.h
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,7 @@ class UpstreamRequest : public Logger::Loggable<Logger::Id::router>,
void onPoolReady(std::unique_ptr<GenericUpstream>&& upstream,
Upstream::HostDescriptionConstSharedPtr host,
const Network::Address::InstanceConstSharedPtr& upstream_local_address,
const StreamInfo::StreamInfo& info,
absl::optional<Http::Protocol> protocol) override;
StreamInfo::StreamInfo& info, absl::optional<Http::Protocol> protocol) override;
UpstreamToDownstream& upstreamToDownstream() override { return *this; }

void clearRequestEncoder();
Expand Down
8 changes: 8 additions & 0 deletions source/common/stream_info/stream_info_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,13 @@ struct StreamInfoImpl : public StreamInfo {
ASSERT(downstream_bytes_meter_.get() == downstream_bytes_meter.get());
}

const FilterStateSharedPtr& downstreamFilterState() const override {
return downstream_filter_state_;
}
void setDownstreamFilterState(const FilterStateSharedPtr& filter_state) override {
downstream_filter_state_ = filter_state;
}

TimeSource& time_source_;
const SystemTime start_time_;
const MonotonicTime start_time_monotonic_;
Expand Down Expand Up @@ -360,6 +367,7 @@ struct StreamInfoImpl : public StreamInfo {
// Default construct the object because upstream stream is not constructed in some cases.
BytesMeterSharedPtr upstream_bytes_meter_{std::make_shared<BytesMeter>()};
BytesMeterSharedPtr downstream_bytes_meter_;
FilterStateSharedPtr downstream_filter_state_;
};

} // namespace StreamInfo
Expand Down
1 change: 1 addition & 0 deletions source/common/tcp_proxy/tcp_proxy.cc
Original file line number Diff line number Diff line change
Expand Up @@ -470,6 +470,7 @@ void Filter::onGenericPoolReady(StreamInfo::StreamInfo* info,
read_callbacks_->continueReading();
if (info) {
upstream_info.setUpstreamFilterState(info->filterState());
info->setDownstreamFilterState(getStreamInfo().filterState());
}
}

Expand Down
2 changes: 1 addition & 1 deletion source/common/tcp_proxy/upstream.cc
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,7 @@ void HttpConnPool::onPoolFailure(ConnectionPool::PoolFailureReason reason, absl:

void HttpConnPool::onPoolReady(Http::RequestEncoder& request_encoder,
Upstream::HostDescriptionConstSharedPtr host,
const StreamInfo::StreamInfo& info, absl::optional<Http::Protocol>) {
StreamInfo::StreamInfo& info, absl::optional<Http::Protocol>) {
upstream_handle_ = nullptr;
upstream_->setRequestEncoder(request_encoder,
host->transportSocketFactory().implementsSecureTransport());
Expand Down
2 changes: 1 addition & 1 deletion source/common/tcp_proxy/upstream.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ class HttpConnPool : public GenericConnPool, public Http::ConnectionPool::Callba
absl::string_view transport_failure_reason,
Upstream::HostDescriptionConstSharedPtr host) override;
void onPoolReady(Http::RequestEncoder& request_encoder,
Upstream::HostDescriptionConstSharedPtr host, const StreamInfo::StreamInfo& info,
Upstream::HostDescriptionConstSharedPtr host, StreamInfo::StreamInfo& info,
absl::optional<Http::Protocol>) override;

class Callbacks {
Expand Down
2 changes: 1 addition & 1 deletion source/extensions/upstreams/http/http/upstream_request.cc
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ void HttpConnPool::onPoolFailure(ConnectionPool::PoolFailureReason reason,

void HttpConnPool::onPoolReady(Envoy::Http::RequestEncoder& request_encoder,
Upstream::HostDescriptionConstSharedPtr host,
const StreamInfo::StreamInfo& info,
StreamInfo::StreamInfo& info,
absl::optional<Envoy::Http::Protocol> protocol) {
conn_pool_stream_handle_ = nullptr;
auto upstream =
Expand Down
2 changes: 1 addition & 1 deletion source/extensions/upstreams/http/http/upstream_request.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ class HttpConnPool : public Router::GenericConnPool, public Envoy::Http::Connect
absl::string_view transport_failure_reason,
Upstream::HostDescriptionConstSharedPtr host) override;
void onPoolReady(Envoy::Http::RequestEncoder& callbacks_encoder,
Upstream::HostDescriptionConstSharedPtr host, const StreamInfo::StreamInfo& info,
Upstream::HostDescriptionConstSharedPtr host, StreamInfo::StreamInfo& info,
absl::optional<Envoy::Http::Protocol> protocol) override;
Upstream::HostDescriptionConstSharedPtr host() const override {
return pool_data_.value().host();
Expand Down
2 changes: 1 addition & 1 deletion test/common/http/common.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ class CodecClientForTest : public Http::CodecClient {
*/
struct ConnPoolCallbacks : public Http::ConnectionPool::Callbacks {
void onPoolReady(Http::RequestEncoder& encoder, Upstream::HostDescriptionConstSharedPtr host,
const StreamInfo::StreamInfo&, absl::optional<Http::Protocol>) override {
StreamInfo::StreamInfo&, absl::optional<Http::Protocol>) override {
outer_encoder_ = &encoder;
host_ = host;
pool_ready_.ready();
Expand Down
5 changes: 5 additions & 0 deletions test/common/router/router_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -5016,6 +5016,10 @@ TEST_F(RouterTest, PropagatesUpstreamFilterState) {
upstream_stream_info_.filterState()->setData(
"upstream data", std::make_unique<StreamInfo::UInt32AccessorImpl>(123),
StreamInfo::FilterState::StateType::ReadOnly, StreamInfo::FilterState::LifeSpan::Connection);
callbacks_.streamInfo().filterState()->setData(
"downstream data", std::make_unique<StreamInfo::UInt32AccessorImpl>(456),
StreamInfo::FilterState::StateType::ReadOnly, StreamInfo::FilterState::LifeSpan::Connection);

expectResponseTimerCreate();
EXPECT_CALL(cm_.thread_local_cluster_.conn_pool_, newStream(_, _))
.WillOnce(Invoke(
Expand All @@ -5040,6 +5044,7 @@ TEST_F(RouterTest, PropagatesUpstreamFilterState) {
EXPECT_TRUE(filter_state_verified);
EXPECT_TRUE(callbacks_.streamInfo().upstreamInfo()->upstreamFilterState()->hasDataWithName(
"upstream data"));
EXPECT_TRUE(upstream_stream_info_.downstreamFilterState()->hasDataWithName("downstream data"));
}

TEST_F(RouterTest, UpstreamSSLConnection) {
Expand Down
3 changes: 3 additions & 0 deletions test/common/stream_info/stream_info_impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,9 @@ TEST_F(StreamInfoImplTest, MiscSettersAndGetters) {
->upstreamFilterState()
->getDataReadOnly<TestIntAccessor>("test")
.access());
stream_info.setDownstreamFilterState(stream_info.filterState());
EXPECT_EQ(
1, stream_info.downstreamFilterState()->getDataReadOnly<TestIntAccessor>("test").access());

EXPECT_EQ(absl::nullopt, stream_info.upstreamClusterInfo());
Upstream::ClusterInfoConstSharedPtr cluster_info(new NiceMock<Upstream::MockClusterInfo>());
Expand Down
9 changes: 9 additions & 0 deletions test/common/tcp_proxy/tcp_proxy_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1073,6 +1073,15 @@ TEST_F(TcpProxyTest, ShareFilterState) {
->upstreamFilterState()
->getDataReadOnly<PerConnectionCluster>("envoy.tcp_proxy.cluster")
.value());

filter_callbacks_.connection_.streamInfo().filterState()->setData(
"test_state", std::make_unique<PerConnectionCluster>("test_value"),
StreamInfo::FilterState::StateType::Mutable, StreamInfo::FilterState::LifeSpan::Connection);
EXPECT_EQ("test_value", upstream_connections_.at(0)
->streamInfo()
.downstreamFilterState()
->getDataReadOnly<PerConnectionCluster>("test_state")
.value());
}

// Tests that filter callback can access downstream and upstream address and ssl properties.
Expand Down
2 changes: 1 addition & 1 deletion test/integration/upstreams/per_host_upstream_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ class PerHostHttpConnPool : public Extensions::Upstreams::Http::Http::HttpConnPo
: HttpConnPool(thread_local_cluster, is_connect, route_entry, downstream_protocol, ctx) {}

void onPoolReady(Envoy::Http::RequestEncoder& callbacks_encoder,
Upstream::HostDescriptionConstSharedPtr host, const StreamInfo::StreamInfo& info,
Upstream::HostDescriptionConstSharedPtr host, StreamInfo::StreamInfo& info,
absl::optional<Http::Protocol> protocol) override {
conn_pool_stream_handle_ = nullptr;
auto upstream = std::make_unique<PerHostHttpUpstream>(callbacks_->upstreamToDownstream(),
Expand Down
2 changes: 1 addition & 1 deletion test/mocks/http/conn_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ class MockCallbacks : public Callbacks {
Upstream::HostDescriptionConstSharedPtr host));
MOCK_METHOD(void, onPoolReady,
(RequestEncoder & encoder, Upstream::HostDescriptionConstSharedPtr host,
const StreamInfo::StreamInfo& info, absl::optional<Http::Protocol> protocol));
StreamInfo::StreamInfo& info, absl::optional<Http::Protocol> protocol));
};

class MockInstance : public Instance {
Expand Down
2 changes: 1 addition & 1 deletion test/mocks/router/mocks.h
Original file line number Diff line number Diff line change
Expand Up @@ -598,7 +598,7 @@ class MockGenericConnectionPoolCallbacks : public GenericConnectionPoolCallbacks
(std::unique_ptr<GenericUpstream> && upstream,
Upstream::HostDescriptionConstSharedPtr host,
const Network::Address::InstanceConstSharedPtr& upstream_local_address,
const StreamInfo::StreamInfo& info, absl::optional<Http::Protocol> protocol));
StreamInfo::StreamInfo& info, absl::optional<Http::Protocol> protocol));
MOCK_METHOD(UpstreamToDownstream&, upstreamToDownstream, ());

NiceMock<MockUpstreamToDownstream> upstream_to_downstream_;
Expand Down
5 changes: 5 additions & 0 deletions test/mocks/stream_info/mocks.cc
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,11 @@ MockStreamInfo::MockStreamInfo()
.WillByDefault(Invoke([this](const BytesMeterSharedPtr& downstream_bytes_meter) {
downstream_bytes_meter_ = downstream_bytes_meter;
}));
ON_CALL(*this, downstreamFilterState()).WillByDefault(ReturnRef(downstream_filter_state_));
ON_CALL(*this, setDownstreamFilterState(_))
.WillByDefault(Invoke([this](const FilterStateSharedPtr& filter_state) {
downstream_filter_state_ = filter_state;
}));
}

MockStreamInfo::~MockStreamInfo() = default;
Expand Down
3 changes: 3 additions & 0 deletions test/mocks/stream_info/mocks.h
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,8 @@ class MockStreamInfo : public StreamInfo {
MOCK_METHOD(const BytesMeterSharedPtr&, getDownstreamBytesMeter, (), (const));
MOCK_METHOD(void, setUpstreamBytesMeter, (const BytesMeterSharedPtr&));
MOCK_METHOD(void, setDownstreamBytesMeter, (const BytesMeterSharedPtr&));
MOCK_METHOD(const FilterStateSharedPtr&, downstreamFilterState, (), (const));
MOCK_METHOD(void, setDownstreamFilterState, (const FilterStateSharedPtr&));
Envoy::Event::SimulatedTimeSystem ts_;
SystemTime start_time_;
MonotonicTime start_time_monotonic_;
Expand All @@ -112,6 +114,7 @@ class MockStreamInfo : public StreamInfo {
absl::optional<uint32_t> attempt_count_;
absl::optional<std::string> virtual_cluster_name_;
DownstreamTiming downstream_timing_;
FilterStateSharedPtr downstream_filter_state_;
};

} // namespace StreamInfo
Expand Down