Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions changelogs/current.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ minor_behavior_changes:
- area: http
change: |
changed the filter callback interfaces to make sure that downstream-only functionality is explicit.
change: |
the upstream remote address is now available to downstream filters via the ``upstreamRemoteAddress`` function.
- area: stats
change: |
Default tag extraction rules were changed for ``worker_id`` extraction. Previously, ``worker_`` was removed from the original name during the extraction. This
Expand Down
6 changes: 3 additions & 3 deletions envoy/http/codec.h
Original file line number Diff line number Diff line change
Expand Up @@ -341,10 +341,10 @@ class Stream : public StreamResetHandler {
virtual absl::string_view responseDetails() { return ""; }

/**
* @return const Address::InstanceConstSharedPtr& the local address of the connection associated
* with the stream.
* @return const Network::ConnectionInfoProvider& the adderess provider of the connection
* associated with the stream.
*/
virtual const Network::Address::InstanceConstSharedPtr& connectionLocalAddress() PURE;
virtual const Network::ConnectionInfoProvider& connectionInfoProvider() PURE;

/**
* Set the flush timeout for the stream. At the codec level this is used to bound the amount of
Expand Down
4 changes: 2 additions & 2 deletions envoy/router/router.h
Original file line number Diff line number Diff line change
Expand Up @@ -1400,13 +1400,13 @@ class GenericConnectionPoolCallbacks {
* @param upstream supplies the generic upstream for the stream.
* @param host supplies the description of the host that will carry the request. For logical
* connection pools the description may be different each time this is called.
* @param upstream_local_address supplies the local address of the upstream connection.
* @param connection_info_provider, supplies the address provider of the upstream connection.
* @param info supplies the stream info object associated with the upstream connection.
* @param protocol supplies the protocol associated with the upstream connection.
*/
virtual void onPoolReady(std::unique_ptr<GenericUpstream>&& upstream,
Upstream::HostDescriptionConstSharedPtr host,
const Network::Address::InstanceConstSharedPtr& upstream_local_address,
const Network::ConnectionInfoProvider& connection_info_provider,
StreamInfo::StreamInfo& info,
absl::optional<Http::Protocol> protocol) PURE;

Expand Down
11 changes: 11 additions & 0 deletions envoy/stream_info/stream_info.h
Original file line number Diff line number Diff line change
Expand Up @@ -402,6 +402,17 @@ class UpstreamInfo {
*/
virtual const Network::Address::InstanceConstSharedPtr& upstreamLocalAddress() const PURE;

/**
* @param upstream_remote_address sets the remote address of the upstream connection.
*/
virtual void setUpstreamRemoteAddress(
const Network::Address::InstanceConstSharedPtr& upstream_remote_address) PURE;

/**
* @return the upstream remote address.
*/
virtual const Network::Address::InstanceConstSharedPtr& upstreamRemoteAddress() const PURE;

/**
* @param failure_reason the upstream transport failure reason.
*/
Expand Down
12 changes: 6 additions & 6 deletions envoy/tcp/upstream.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,14 +67,14 @@ class GenericConnectionPoolCallbacks {
* @param info supplies the stream info object associated with the upstream connection.
* @param upstream supplies the generic upstream for the stream.
* @param host supplies the description of the host that will carry the request.
* @param upstream_local_address supplies the local address of the upstream connection.
* @param address_provider supplies the address provider of the upstream connection.
* @param ssl_info supplies the ssl information of the upstream connection.
*/
virtual void
onGenericPoolReady(StreamInfo::StreamInfo* info, std::unique_ptr<GenericUpstream>&& upstream,
Upstream::HostDescriptionConstSharedPtr& host,
const Network::Address::InstanceConstSharedPtr& upstream_local_address,
Ssl::ConnectionInfoConstSharedPtr ssl_info) PURE;
virtual void onGenericPoolReady(StreamInfo::StreamInfo* info,
std::unique_ptr<GenericUpstream>&& upstream,
Upstream::HostDescriptionConstSharedPtr& host,
const Network::ConnectionInfoProvider& address_provider,
Ssl::ConnectionInfoConstSharedPtr ssl_info) PURE;

/**
* Called to indicate a failure for GenericConnPool::newStream to establish a stream.
Expand Down
4 changes: 2 additions & 2 deletions source/common/http/http1/codec_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -375,8 +375,8 @@ void StreamEncoderImpl::readDisable(bool disable) {

uint32_t StreamEncoderImpl::bufferLimit() const { return connection_.bufferLimit(); }

const Network::Address::InstanceConstSharedPtr& StreamEncoderImpl::connectionLocalAddress() {
return connection_.connection().connectionInfoProvider().localAddress();
const Network::ConnectionInfoProvider& StreamEncoderImpl::connectionInfoProvider() {
return connection_.connection().connectionInfoProvider();
}

static constexpr absl::string_view RESPONSE_PREFIX = "HTTP/1.1 ";
Expand Down
2 changes: 1 addition & 1 deletion source/common/http/http1/codec_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ class StreamEncoderImpl : public virtual StreamEncoder,
void readDisable(bool disable) override;
uint32_t bufferLimit() const override;
absl::string_view responseDetails() override { return details_; }
const Network::Address::InstanceConstSharedPtr& connectionLocalAddress() override;
const Network::ConnectionInfoProvider& connectionInfoProvider() override;
void setFlushTimeout(std::chrono::milliseconds) override {
// HTTP/1 has one stream per connection, thus any data encoded is immediately written to the
// connection, invoking any watermarks as necessary. There is no internal buffering that would
Expand Down
4 changes: 2 additions & 2 deletions source/common/http/http2/codec_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -266,8 +266,8 @@ class ConnectionImpl : public virtual Connection,
void resetStream(StreamResetReason reason) override;
void readDisable(bool disable) override;
uint32_t bufferLimit() const override { return pending_recv_data_->highWatermark(); }
const Network::Address::InstanceConstSharedPtr& connectionLocalAddress() override {
return parent_.connection_.connectionInfoProvider().localAddress();
const Network::ConnectionInfoProvider& connectionInfoProvider() override {
return parent_.connection_.connectionInfoProvider();
}
absl::string_view responseDetails() override { return details_; }
void setAccount(Buffer::BufferMemoryAccountSharedPtr account) override;
Expand Down
4 changes: 2 additions & 2 deletions source/common/quic/envoy_quic_stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,8 @@ class EnvoyQuicStream : public virtual Http::StreamEncoder,
removeCallbacksHelper(callbacks);
}
uint32_t bufferLimit() const override { return send_buffer_simulation_.highWatermark(); }
const Network::Address::InstanceConstSharedPtr& connectionLocalAddress() override {
return connection()->connectionInfoProvider().localAddress();
const Network::ConnectionInfoProvider& connectionInfoProvider() override {
return connection()->connectionInfoProvider();
}

void setAccount(Buffer::BufferMemoryAccountSharedPtr account) override {
Expand Down
12 changes: 7 additions & 5 deletions source/common/router/upstream_request.cc
Original file line number Diff line number Diff line change
Expand Up @@ -710,10 +710,11 @@ void UpstreamRequest::onPoolFailure(ConnectionPool::PoolFailureReason reason,
onResetStream(reset_reason, transport_failure_reason);
}

void UpstreamRequest::onPoolReady(
std::unique_ptr<GenericUpstream>&& upstream, Upstream::HostDescriptionConstSharedPtr host,
const Network::Address::InstanceConstSharedPtr& upstream_local_address,
StreamInfo::StreamInfo& info, absl::optional<Http::Protocol> protocol) {
void UpstreamRequest::onPoolReady(std::unique_ptr<GenericUpstream>&& upstream,
Upstream::HostDescriptionConstSharedPtr host,
const Network::ConnectionInfoProvider& address_provider,
StreamInfo::StreamInfo& info,
absl::optional<Http::Protocol> protocol) {
// This may be called under an existing ScopeTrackerScopeState but it will unwind correctly.
ScopeTrackerScopeState scope(&parent_.callbacks()->scope(), parent_.callbacks()->dispatcher());
ENVOY_STREAM_LOG(debug, "pool ready", *parent_.callbacks());
Expand Down Expand Up @@ -763,7 +764,8 @@ void UpstreamRequest::onPoolReady(
} else {
upstream_info.setUpstreamFilterState(filter_state);
}
upstream_info.setUpstreamLocalAddress(upstream_local_address);
upstream_info.setUpstreamLocalAddress(address_provider.localAddress());
upstream_info.setUpstreamRemoteAddress(address_provider.remoteAddress());
upstream_info.setUpstreamSslConnection(info.downstreamAddressProvider().sslConnection());

if (info.downstreamAddressProvider().connectionID().has_value()) {
Expand Down
2 changes: 1 addition & 1 deletion source/common/router/upstream_request.h
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ class UpstreamRequest : public Logger::Loggable<Logger::Id::router>,
Upstream::HostDescriptionConstSharedPtr host) override;
void onPoolReady(std::unique_ptr<GenericUpstream>&& upstream,
Upstream::HostDescriptionConstSharedPtr host,
const Network::Address::InstanceConstSharedPtr& upstream_local_address,
const Network::ConnectionInfoProvider& address_provider,
StreamInfo::StreamInfo& info, absl::optional<Http::Protocol> protocol) override;
UpstreamToDownstream& upstreamToDownstream() override;

Expand Down
8 changes: 8 additions & 0 deletions source/common/stream_info/stream_info_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,17 @@ struct UpstreamInfoImpl : public UpstreamInfo {
const Network::Address::InstanceConstSharedPtr& upstreamLocalAddress() const override {
return upstream_local_address_;
}
const Network::Address::InstanceConstSharedPtr& upstreamRemoteAddress() const override {
return upstream_remote_address_;
}
void setUpstreamLocalAddress(
const Network::Address::InstanceConstSharedPtr& upstream_local_address) override {
upstream_local_address_ = upstream_local_address;
}
void setUpstreamRemoteAddress(
const Network::Address::InstanceConstSharedPtr& upstream_remote_address) override {
upstream_remote_address_ = upstream_remote_address;
}
void setUpstreamTransportFailureReason(absl::string_view failure_reason) override {
upstream_transport_failure_reason_ = std::string(failure_reason);
}
Expand Down Expand Up @@ -84,6 +91,7 @@ struct UpstreamInfoImpl : public UpstreamInfo {

Upstream::HostDescriptionConstSharedPtr upstream_host_{};
Network::Address::InstanceConstSharedPtr upstream_local_address_;
Network::Address::InstanceConstSharedPtr upstream_remote_address_;
UpstreamTiming upstream_timing_;
Ssl::ConnectionInfoConstSharedPtr upstream_ssl_info_;
absl::optional<uint64_t> upstream_connection_id_;
Expand Down
5 changes: 3 additions & 2 deletions source/common/tcp_proxy/tcp_proxy.cc
Original file line number Diff line number Diff line change
Expand Up @@ -506,14 +506,15 @@ void Filter::onGenericPoolFailure(ConnectionPool::PoolFailureReason reason,
void Filter::onGenericPoolReady(StreamInfo::StreamInfo* info,
std::unique_ptr<GenericUpstream>&& upstream,
Upstream::HostDescriptionConstSharedPtr& host,
const Network::Address::InstanceConstSharedPtr& local_address,
const Network::ConnectionInfoProvider& address_provider,
Ssl::ConnectionInfoConstSharedPtr ssl_info) {
upstream_ = std::move(upstream);
generic_conn_pool_.reset();
read_callbacks_->upstreamHost(host);
StreamInfo::UpstreamInfo& upstream_info = *getStreamInfo().upstreamInfo();
upstream_info.setUpstreamHost(host);
upstream_info.setUpstreamLocalAddress(local_address);
upstream_info.setUpstreamLocalAddress(address_provider.localAddress());
upstream_info.setUpstreamRemoteAddress(address_provider.remoteAddress());
upstream_info.setUpstreamSslConnection(ssl_info);
onUpstreamConnection();
read_callbacks_->continueReading();
Expand Down
2 changes: 1 addition & 1 deletion source/common/tcp_proxy/tcp_proxy.h
Original file line number Diff line number Diff line change
Expand Up @@ -341,7 +341,7 @@ class Filter : public Network::ReadFilter,
// GenericConnectionPoolCallbacks
void onGenericPoolReady(StreamInfo::StreamInfo* info, std::unique_ptr<GenericUpstream>&& upstream,
Upstream::HostDescriptionConstSharedPtr& host,
const Network::Address::InstanceConstSharedPtr& local_address,
const Network::ConnectionInfoProvider& address_provider,
Ssl::ConnectionInfoConstSharedPtr ssl_info) override;
void onGenericPoolFailure(ConnectionPool::PoolFailureReason reason,
absl::string_view failure_reason,
Expand Down
6 changes: 3 additions & 3 deletions source/common/tcp_proxy/upstream.cc
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ void TcpConnPool::onPoolReady(Tcp::ConnectionPool::ConnectionDataPtr&& conn_data
auto upstream = std::make_unique<TcpUpstream>(std::move(conn_data), upstream_callbacks_);
callbacks_->onGenericPoolReady(
&connection.streamInfo(), std::move(upstream), host,
latched_data->connection().connectionInfoProvider().localAddress(),
latched_data->connection().connectionInfoProvider(),
latched_data->connection().streamInfo().downstreamAddressProvider().sslConnection());
}

Expand Down Expand Up @@ -252,9 +252,9 @@ void HttpConnPool::onPoolReady(Http::RequestEncoder& request_encoder,
}

void HttpConnPool::onGenericPoolReady(Upstream::HostDescriptionConstSharedPtr& host,
const Network::Address::InstanceConstSharedPtr& local_address,
const Network::ConnectionInfoProvider& address_provider,
Ssl::ConnectionInfoConstSharedPtr ssl_info) {
callbacks_->onGenericPoolReady(nullptr, std::move(upstream_), host, local_address, ssl_info);
callbacks_->onGenericPoolReady(nullptr, std::move(upstream_), host, address_provider, ssl_info);
}

Http2Upstream::Http2Upstream(Tcp::ConnectionPool::UpstreamCallbacks& callbacks,
Expand Down
4 changes: 2 additions & 2 deletions source/common/tcp_proxy/upstream.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ class HttpConnPool : public GenericConnPool, public Http::ConnectionPool::Callba
virtual ~Callbacks() = default;
virtual void onSuccess(Http::RequestEncoder& request_encoder) {
ASSERT(conn_pool_ != nullptr);
conn_pool_->onGenericPoolReady(host_, request_encoder.getStream().connectionLocalAddress(),
conn_pool_->onGenericPoolReady(host_, request_encoder.getStream().connectionInfoProvider(),
ssl_info_);
}
virtual void onFailure() {
Expand All @@ -91,7 +91,7 @@ class HttpConnPool : public GenericConnPool, public Http::ConnectionPool::Callba

private:
void onGenericPoolReady(Upstream::HostDescriptionConstSharedPtr& host,
const Network::Address::InstanceConstSharedPtr& local_address,
const Network::ConnectionInfoProvider& address_provider,
Ssl::ConnectionInfoConstSharedPtr ssl_info);
const TunnelingConfigHelper& config_;
Http::CodecType type_;
Expand Down
2 changes: 1 addition & 1 deletion source/extensions/upstreams/http/http/upstream_request.cc
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ void HttpConnPool::onPoolReady(Envoy::Http::RequestEncoder& request_encoder,
auto upstream =
std::make_unique<HttpUpstream>(callbacks_->upstreamToDownstream(), &request_encoder);
callbacks_->onPoolReady(std::move(upstream), host,
request_encoder.getStream().connectionLocalAddress(), info, protocol);
request_encoder.getStream().connectionInfoProvider(), info, protocol);
}

} // namespace Http
Expand Down
3 changes: 1 addition & 2 deletions source/extensions/upstreams/http/tcp/upstream_request.cc
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,7 @@ void TcpConnPool::onPoolReady(Envoy::Tcp::ConnectionPool::ConnectionDataPtr&& co
Network::Connection& latched_conn = conn_data->connection();
auto upstream =
std::make_unique<TcpUpstream>(&callbacks_->upstreamToDownstream(), std::move(conn_data));
callbacks_->onPoolReady(std::move(upstream), host,
latched_conn.connectionInfoProvider().localAddress(),
callbacks_->onPoolReady(std::move(upstream), host, latched_conn.connectionInfoProvider(),
latched_conn.streamInfo(), {});
}

Expand Down
16 changes: 10 additions & 6 deletions test/common/router/router_upstream_log_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -156,8 +156,8 @@ class RouterUpstreamLogTest : public testing::Test {
const Http::ConnectionPool::Instance::StreamOptions&)
-> Http::ConnectionPool::Cancellable* {
response_decoder = &decoder;
EXPECT_CALL(encoder.stream_, connectionLocalAddress())
.WillRepeatedly(ReturnRef(upstream_local_address1_));
EXPECT_CALL(encoder.stream_, connectionInfoProvider())
.WillRepeatedly(ReturnRef(connection_info1_));
callbacks.onPoolReady(encoder,
context_.cluster_manager_.thread_local_cluster_.conn_pool_.host_,
stream_info_, Http::Protocol::Http10);
Expand Down Expand Up @@ -198,8 +198,8 @@ class RouterUpstreamLogTest : public testing::Test {
const Http::ConnectionPool::Instance::StreamOptions&)
-> Http::ConnectionPool::Cancellable* {
response_decoder = &decoder;
EXPECT_CALL(encoder1.stream_, connectionLocalAddress())
.WillRepeatedly(ReturnRef(upstream_local_address1_));
EXPECT_CALL(encoder1.stream_, connectionInfoProvider())
.WillRepeatedly(ReturnRef(connection_info1_));
callbacks.onPoolReady(encoder1,
context_.cluster_manager_.thread_local_cluster_.conn_pool_.host_,
stream_info_, Http::Protocol::Http10);
Expand Down Expand Up @@ -230,8 +230,8 @@ class RouterUpstreamLogTest : public testing::Test {
EXPECT_CALL(
context_.cluster_manager_.thread_local_cluster_.conn_pool_.host_->outlier_detector_,
putResult(Upstream::Outlier::Result::LocalOriginConnectSuccess, _));
EXPECT_CALL(encoder2.stream_, connectionLocalAddress())
.WillRepeatedly(ReturnRef(upstream_local_address2_));
EXPECT_CALL(encoder2.stream_, connectionInfoProvider())
.WillRepeatedly(ReturnRef(connection_info2_));
callbacks.onPoolReady(encoder2,
context_.cluster_manager_.thread_local_cluster_.conn_pool_.host_,
stream_info_, Http::Protocol::Http10);
Expand Down Expand Up @@ -263,6 +263,10 @@ class RouterUpstreamLogTest : public testing::Test {
Network::Utility::resolveUrl("tcp://10.0.0.5:10211")};
Network::Address::InstanceConstSharedPtr upstream_local_address2_{
Network::Utility::resolveUrl("tcp://10.0.0.5:10212")};
Network::ConnectionInfoSetterImpl connection_info1_{*upstream_local_address2_,
*upstream_local_address1_};
Network::ConnectionInfoSetterImpl connection_info2_{*upstream_local_address2_,
*upstream_local_address2_};
Event::MockTimer* response_timeout_{};
Event::MockTimer* per_try_timeout_{};

Expand Down
5 changes: 5 additions & 0 deletions test/integration/filters/stream_info_to_headers_filter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,11 @@ class StreamInfoToHeadersFilter : public Http::PassThroughFilter {
Http::LowerCaseString("alpn"),
decoder_callbacks_->streamInfo().upstreamInfo()->upstreamSslConnection()->alpn());
}
if (decoder_callbacks_->streamInfo().upstreamInfo()->upstreamRemoteAddress()) {
headers.addCopy(
Http::LowerCaseString("remote_address"),
decoder_callbacks_->streamInfo().upstreamInfo()->upstreamRemoteAddress()->asString());
}

headers.addCopy(Http::LowerCaseString("num_streams"),
decoder_callbacks_->streamInfo().upstreamInfo()->upstreamNumStreams());
Expand Down
3 changes: 3 additions & 0 deletions test/integration/multiplexed_upstream_integration_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,9 @@ void MultiplexedUpstreamIntegrationTest::bidirectionalStreaming(uint32_t bytes)
EXPECT_EQ(
"1",
response->headers().get(Http::LowerCaseString("num_streams"))[0]->value().getStringView());
EXPECT_EQ(
fake_upstreams_[0]->localAddress()->ip()->addressAsString(),
response->headers().get(Http::LowerCaseString("remote_address"))[0]->value().getStringView());
}

TEST_P(MultiplexedUpstreamIntegrationTest, BidirectionalStreaming) { bidirectionalStreaming(1024); }
Expand Down
2 changes: 1 addition & 1 deletion test/integration/upstreams/per_host_upstream_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ class PerHostHttpConnPool : public Extensions::Upstreams::Http::Http::HttpConnPo
auto upstream = std::make_unique<PerHostHttpUpstream>(callbacks_->upstreamToDownstream(),
&callbacks_encoder, host);
callbacks_->onPoolReady(std::move(upstream), host,
callbacks_encoder.getStream().connectionLocalAddress(), info, protocol);
callbacks_encoder.getStream().connectionInfoProvider(), info, protocol);
}
};

Expand Down
1 change: 1 addition & 0 deletions test/mocks/http/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ envoy_cc_mock(
hdrs = ["stream.h"],
deps = [
"//envoy/http:codec_interface",
"//source/common/network:socket_lib",
],
)

Expand Down
Loading