Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 28 additions & 6 deletions source/common/router/router.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1140,7 +1140,11 @@ bool Filter::maybeRetryReset(Http::StreamResetReason reset_reason,
upstream_request.upstreamHost()->stats().rq_error_.inc();
}

upstream_request.removeFromList(upstream_requests_);
auto request_ptr = upstream_request.removeFromList(upstream_requests_);
if (Runtime::runtimeFeatureEnabled("envoy.reloadable_features.allow_upstream_inline_write")) {
request_ptr->cleanUp();
callbacks_->dispatcher().deferredDelete(std::move(request_ptr));
}
return true;
} else if (retry_status == RetryStatus::NoOverflow) {
callbacks_->streamInfo().setResponseFlag(StreamInfo::ResponseFlag::UpstreamOverflow);
Expand Down Expand Up @@ -1174,7 +1178,11 @@ void Filter::onUpstreamReset(Http::StreamResetReason reset_reason,
? Http::Code::BadGateway
: Http::Code::ServiceUnavailable;
chargeUpstreamAbort(error_code, dropped, upstream_request);
upstream_request.removeFromList(upstream_requests_);
auto request_ptr = upstream_request.removeFromList(upstream_requests_);
if (Runtime::runtimeFeatureEnabled("envoy.reloadable_features.allow_upstream_inline_write")) {
request_ptr->cleanUp();
callbacks_->dispatcher().deferredDelete(std::move(request_ptr));
}

// If there are other in-flight requests that might see an upstream response,
// don't return anything downstream.
Expand Down Expand Up @@ -1285,7 +1293,12 @@ void Filter::onUpstream1xxHeaders(Http::ResponseHeaderMapPtr&& headers,

void Filter::resetAll() {
while (!upstream_requests_.empty()) {
upstream_requests_.back()->removeFromList(upstream_requests_)->resetStream();
auto request_ptr = upstream_requests_.back()->removeFromList(upstream_requests_);
request_ptr->resetStream();
if (Runtime::runtimeFeatureEnabled("envoy.reloadable_features.allow_upstream_inline_write")) {
request_ptr->cleanUp();
callbacks_->dispatcher().deferredDelete(std::move(request_ptr));
}
}
}

Expand Down Expand Up @@ -1363,8 +1376,12 @@ void Filter::onUpstreamHeaders(uint64_t response_code, Http::ResponseHeaderMapPt
if (!end_stream || !upstream_request.encodeComplete()) {
upstream_request.resetStream();
}
upstream_request.removeFromList(upstream_requests_);

auto request_ptr = upstream_request.removeFromList(upstream_requests_);
if (Runtime::runtimeFeatureEnabled(
"envoy.reloadable_features.allow_upstream_inline_write")) {
request_ptr->cleanUp();
callbacks_->dispatcher().deferredDelete(std::move(request_ptr));
}
return;
} else if (retry_status == RetryStatus::NoOverflow) {
callbacks_->streamInfo().setResponseFlag(StreamInfo::ResponseFlag::UpstreamOverflow);
Expand Down Expand Up @@ -1394,7 +1411,12 @@ void Filter::onUpstreamHeaders(uint64_t response_code, Http::ResponseHeaderMapPt

// Reset the stream because there are other in-flight requests that we'll
// wait around for and we're not interested in consuming any body/trailers.
upstream_request.removeFromList(upstream_requests_)->resetStream();
auto request_ptr = upstream_request.removeFromList(upstream_requests_);
request_ptr->resetStream();
if (Runtime::runtimeFeatureEnabled("envoy.reloadable_features.allow_upstream_inline_write")) {
request_ptr->cleanUp();
callbacks_->dispatcher().deferredDelete(std::move(request_ptr));
}
return;
}

Expand Down
11 changes: 9 additions & 2 deletions source/common/router/upstream_request.cc
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,8 @@ UpstreamRequest::UpstreamRequest(RouterFilterInterface& parent,
encode_complete_(false), encode_trailers_(false), retried_(false), awaiting_headers_(true),
outlier_detection_timeout_recorded_(false),
create_per_try_timeout_on_request_complete_(false), paused_for_connect_(false),
record_timeout_budget_(parent_.cluster()->timeoutBudgetStats().has_value()) {
record_timeout_budget_(parent_.cluster()->timeoutBudgetStats().has_value()),
cleaned_up_(false) {
if (parent_.config().start_child_span_) {
span_ = parent_.callbacks()->activeSpan().spawnChild(
parent_.callbacks()->tracingConfig(), "router " + parent.cluster()->name() + " egress",
Expand All @@ -69,7 +70,13 @@ UpstreamRequest::UpstreamRequest(RouterFilterInterface& parent,
}
}

UpstreamRequest::~UpstreamRequest() {
UpstreamRequest::~UpstreamRequest() { cleanUp(); }

void UpstreamRequest::cleanUp() {
if (cleaned_up_) {
return;
}
cleaned_up_ = true;
if (span_ != nullptr) {
Tracing::HttpTracerUtility::finalizeUpstreamSpan(*span_, upstream_headers_.get(),
upstream_trailers_.get(), stream_info_,
Expand Down
8 changes: 7 additions & 1 deletion source/common/router/upstream_request.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,15 @@ class UpstreamRequest;
class UpstreamRequest : public Logger::Loggable<Logger::Id::router>,
public UpstreamToDownstream,
public LinkedObject<UpstreamRequest>,
public GenericConnectionPoolCallbacks {
public GenericConnectionPoolCallbacks,
public Event::DeferredDeletable {
public:
UpstreamRequest(RouterFilterInterface& parent, std::unique_ptr<GenericConnPool>&& conn_pool);
~UpstreamRequest() override;

// To be called from the destructor, or prior to deferred delete.
void cleanUp();

void encodeHeaders(bool end_stream);
void encodeData(Buffer::Instance& data, bool end_stream);
void encodeTrailers(const Http::RequestTrailerMap& trailers);
Expand Down Expand Up @@ -178,6 +182,8 @@ class UpstreamRequest : public Logger::Loggable<Logger::Id::router>,
// Sentinel to indicate if timeout budget tracking is configured for the cluster,
// and if so, if the per-try histogram should record a value.
bool record_timeout_budget_ : 1;
// Track if one time clean up has been performed.
bool cleaned_up_ : 1;

Event::TimerPtr max_stream_duration_timer_;
};
Expand Down
1 change: 1 addition & 0 deletions source/common/runtime/runtime_features.cc
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ constexpr const char* runtime_features[] = {
"envoy.reloadable_features.test_feature_true",
// Begin alphabetically sorted section.
"envoy.reloadable_features.allow_response_for_timeout",
"envoy.reloadable_features.allow_upstream_inline_write",
"envoy.reloadable_features.conn_pool_delete_when_idle",
"envoy.reloadable_features.correct_scheme_and_xfp",
"envoy.reloadable_features.disable_tls_inspector_injection",
Expand Down
4 changes: 4 additions & 0 deletions test/common/http/async_client_impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -531,6 +531,7 @@ TEST_F(AsyncClientImplTest, RetryWithStream) {
EXPECT_CALL(stream_callbacks_, onComplete());
ResponseHeaderMapPtr response_headers2(new TestResponseHeaderMapImpl{{":status", "200"}});
response_decoder_->decodeHeaders(std::move(response_headers2), true);
dispatcher_.clearDeferredDeleteList();
}

TEST_F(AsyncClientImplTest, MultipleStreams) {
Expand Down Expand Up @@ -865,6 +866,7 @@ TEST_F(AsyncClientImplTest, LocalResetAfterStreamStart) {
response_decoder_->decodeData(*body, false);

stream->reset();
dispatcher_.clearDeferredDeleteList();
}

TEST_F(AsyncClientImplTest, SendDataAfterRemoteClosure) {
Expand Down Expand Up @@ -979,6 +981,7 @@ TEST_F(AsyncClientImplTest, ResetInOnHeaders) {
static_cast<Http::AsyncStreamImpl*>(stream);
filter_callbacks->encodeHeaders(
ResponseHeaderMapPtr(new TestResponseHeaderMapImpl{{":status", "200"}}), false, "details");
dispatcher_.clearDeferredDeleteList();
}

TEST_F(AsyncClientImplTest, RemoteResetAfterStreamStart) {
Expand Down Expand Up @@ -1016,6 +1019,7 @@ TEST_F(AsyncClientImplTest, RemoteResetAfterStreamStart) {
response_decoder_->decodeData(*body, false);

stream_encoder_.getStream().resetStream(StreamResetReason::RemoteReset);
dispatcher_.clearDeferredDeleteList();
}

TEST_F(AsyncClientImplTest, ResetAfterResponseStart) {
Expand Down
2 changes: 2 additions & 0 deletions test/common/router/router_test_base.cc
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ RouterTestBase::RouterTestBase(bool start_child_span, bool suppress_envoy_header
// Allow any number of (append|pop)TrackedObject calls for the dispatcher strict mock.
EXPECT_CALL(callbacks_.dispatcher_, pushTrackedObject(_)).Times(AnyNumber());
EXPECT_CALL(callbacks_.dispatcher_, popTrackedObject(_)).Times(AnyNumber());
EXPECT_CALL(callbacks_.dispatcher_, deferredDelete_(_)).Times(AnyNumber());
callbacks_.dispatcher_.delete_immediately_ = true;
}

void RouterTestBase::expectResponseTimerCreate() {
Expand Down
2 changes: 2 additions & 0 deletions test/common/router/router_upstream_log_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,8 @@ class RouterUpstreamLogTest : public testing::Test {
ON_CALL(*cluster_info_, name()).WillByDefault(ReturnRef(cluster_name));
ON_CALL(*cluster_info_, observabilityName()).WillByDefault(ReturnRef(cluster_name));
ON_CALL(callbacks_.stream_info_, upstreamClusterInfo()).WillByDefault(Return(cluster_info_));
EXPECT_CALL(callbacks_.dispatcher_, deferredDelete_).Times(testing::AnyNumber());
callbacks_.dispatcher_.delete_immediately_ = true;

if (upstream_log) {
ON_CALL(*context_.access_log_manager_.file_, write(_))
Expand Down
2 changes: 2 additions & 0 deletions test/integration/filters/test_socket_interface.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@ Api::IoCallUint64Result TestIoSocketHandle::sendmsg(const Buffer::RawSlice* slic
const Address::Ip* self_ip,
const Address::Instance& peer_address) {
if (write_override_) {
peer_address_override_ = peer_address;
auto result = write_override_(this, slices, num_slice);
peer_address_override_.reset();
if (result.has_value()) {
return std::move(result).value();
}
Expand Down
13 changes: 13 additions & 0 deletions test/integration/filters/test_socket_interface.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

#include "source/common/network/io_socket_handle_impl.h"
#include "source/common/network/socket_interface_impl.h"
#include "source/common/network/utility.h"
#include "source/common/network/win32_socket_handle_impl.h"

#include "test/test_common/network_utility.h"
Expand Down Expand Up @@ -47,6 +48,17 @@ class TestIoSocketHandle : public Test::IoSocketHandlePlatformImpl {
dispatcher_->post([this, events]() { activateFileEvents(events); });
}

// HTTP/3 sockets won't have a bound peer address, but instead get peer
// address from the argument in sendmsg. TestIoSocketHandle::sendmsg will
// stash that in peer_address_override_.
Address::InstanceConstSharedPtr peerAddress() override {
if (peer_address_override_.has_value()) {
return Network::Utility::getAddressWithPort(
peer_address_override_.value().get(), peer_address_override_.value().get().ip()->port());
}
return Test::IoSocketHandlePlatformImpl::peerAddress();
}

private:
IoHandlePtr accept(struct sockaddr* addr, socklen_t* addrlen) override;
Api::IoCallUint64Result writev(const Buffer::RawSlice* slices, uint64_t num_slice) override;
Expand All @@ -56,6 +68,7 @@ class TestIoSocketHandle : public Test::IoSocketHandlePlatformImpl {

IoHandlePtr duplicate() override;

OptRef<const Address::Instance> peer_address_override_;
const WriteOverrideProc write_override_;
absl::Mutex mutex_;
Event::Dispatcher* dispatcher_ ABSL_GUARDED_BY(mutex_) = nullptr;
Expand Down
35 changes: 34 additions & 1 deletion test/integration/protocol_integration_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3540,7 +3540,7 @@ class NoUdpGso : public Api::OsSysCallsImpl {
bool supportsUdpGso() const override { return false; }
};

TEST_P(DownstreamProtocolIntegrationTest, HandleSocketFail) {
TEST_P(DownstreamProtocolIntegrationTest, HandleDownstreamSocketFail) {
// Make sure for HTTP/3 Envoy will use sendmsg, so the write_matcher will work.
NoUdpGso reject_gso_;
TestThreadsafeSingletonInjector<Api::OsSysCallsImpl> os_calls{&reject_gso_};
Expand Down Expand Up @@ -3572,4 +3572,37 @@ TEST_P(DownstreamProtocolIntegrationTest, HandleSocketFail) {
test_server_.reset();
}

TEST_P(ProtocolIntegrationTest, HandleUpstreamSocketFail) {
SocketInterfaceSwap socket_swap;

initialize();
codec_client_ = makeHttpConnection(lookupPort("http"));
auto encoder_decoder = codec_client_->startRequest(default_request_headers_);
auto downstream_request = &encoder_decoder.first;
auto response = std::move(encoder_decoder.second);

// Make sure the headers made it through.
waitForNextUpstreamConnection(std::vector<uint64_t>{0}, TestUtility::DefaultTimeout,
fake_upstream_connection_);
AssertionResult result =
fake_upstream_connection_->waitForNewStream(*dispatcher_, upstream_request_);
RELEASE_ASSERT(result, result.message());

// Makes us have Envoy's writes to upstream return EBADF
Network::IoSocketError* ebadf = Network::IoSocketError::getIoSocketEbadfInstance();
socket_swap.write_matcher_->setDestinationPort(fake_upstreams_[0]->localAddress()->ip()->port());
socket_swap.write_matcher_->setWriteOverride(ebadf);

Buffer::OwnedImpl data("HTTP body content goes here");
codec_client_->sendData(*downstream_request, data, true);

ASSERT_TRUE(response->waitForEndStream());
EXPECT_TRUE(response->complete());
EXPECT_EQ("503", response->headers().getStatusValue());
socket_swap.write_matcher_->setWriteOverride(nullptr);
// Shut down the server before os_calls goes out of scope to avoid syscalls
// during its removal.
test_server_.reset();
}

} // namespace Envoy
4 changes: 4 additions & 0 deletions test/mocks/event/mocks.h
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,9 @@ class MockDispatcher : public Dispatcher {
if (to_delete) {
to_delete_.push_back(std::move(to_delete));
}
if (delete_immediately_) {
to_delete_.clear();
}
}

SignalEventPtr listenForSignal(signal_t signal_num, SignalCb cb) override {
Expand Down Expand Up @@ -167,6 +170,7 @@ class MockDispatcher : public Dispatcher {
std::list<DeferredDeletablePtr> to_delete_;
testing::NiceMock<MockBufferFactory> buffer_factory_;
bool allow_null_callback_{};
bool delete_immediately_{};

private:
const std::string name_;
Expand Down