Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 52 additions & 0 deletions source/common/http/codec_helper.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
#pragma once

#include "envoy/event/dispatcher.h"
#include "envoy/event/timer.h"
#include "envoy/http/codec.h"

#include "source/common/common/assert.h"
Expand Down Expand Up @@ -82,5 +84,55 @@ class StreamCallbackHelper {
uint32_t high_watermark_callbacks_{};
};

// A base class shared between Http2 codec and Http3 codec to set a timeout for locally ended stream
// with buffered data.
class MultiplexedStreamImplBase : public Stream, public StreamCallbackHelper {
public:
MultiplexedStreamImplBase(Event::Dispatcher& dispatcher) : dispatcher_(dispatcher) {}
~MultiplexedStreamImplBase() override { ASSERT(stream_idle_timer_ == nullptr); }
// TODO(mattklein123): Optimally this would be done in the destructor but there are currently
// deferred delete lifetime issues that need sorting out if the destructor of the stream is
// going to be able to refer to the parent connection.
virtual void destroy() { disarmStreamIdleTimer(); }

void onLocalEndStream() {
ASSERT(local_end_stream_);
if (hasPendingData()) {
createPendingFlushTimer();
}
}

void disarmStreamIdleTimer() {
if (stream_idle_timer_ != nullptr) {
// To ease testing and the destructor assertion.
stream_idle_timer_->disableTimer();
stream_idle_timer_.reset();
}
}

protected:
void setFlushTimeout(std::chrono::milliseconds timeout) override {
stream_idle_timeout_ = timeout;
}

void createPendingFlushTimer() {
ASSERT(stream_idle_timer_ == nullptr);
if (stream_idle_timeout_.count() > 0) {
stream_idle_timer_ = dispatcher_.createTimer([this] { onPendingFlushTimer(); });
stream_idle_timer_->enableTimer(stream_idle_timeout_);
}
}

virtual void onPendingFlushTimer() { stream_idle_timer_.reset(); }

virtual bool hasPendingData() PURE;

private:
Event::Dispatcher& dispatcher_;
// See HttpConnectionManager.stream_idle_timeout.
std::chrono::milliseconds stream_idle_timeout_{};
Event::TimerPtr stream_idle_timer_;
};

} // namespace Http
} // namespace Envoy
23 changes: 6 additions & 17 deletions source/common/http/http2/codec_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ template <typename T> static T* removeConst(const void* object) {
}

ConnectionImpl::StreamImpl::StreamImpl(ConnectionImpl& parent, uint32_t buffer_limit)
: parent_(parent),
: MultiplexedStreamImplBase(parent.connection_.dispatcher()), parent_(parent),
pending_recv_data_(parent_.connection_.dispatcher().getWatermarkFactory().createBuffer(
[this]() -> void { this->pendingRecvBufferLowWatermark(); },
[this]() -> void { this->pendingRecvBufferHighWatermark(); },
Expand All @@ -149,10 +149,8 @@ ConnectionImpl::StreamImpl::StreamImpl(ConnectionImpl& parent, uint32_t buffer_l
}
}

ConnectionImpl::StreamImpl::~StreamImpl() { ASSERT(stream_idle_timer_ == nullptr); }

void ConnectionImpl::StreamImpl::destroy() {
disarmStreamIdleTimer();
MultiplexedStreamImplBase::destroy();
parent_.stats_.streams_active_.dec();
parent_.stats_.pending_send_bytes_.sub(pending_send_data_->length());
}
Expand Down Expand Up @@ -287,7 +285,7 @@ void ConnectionImpl::StreamImpl::encodeTrailersBase(const HeaderMap& trailers) {
trailers.empty() && parent_.skip_encoding_empty_trailers_;
if (!skip_encoding_empty_trailers) {
pending_trailers_to_encode_ = cloneTrailers(trailers);
createPendingFlushTimer();
onLocalEndStream();
}
} else {
submitTrailers(trailers);
Expand Down Expand Up @@ -491,18 +489,9 @@ void ConnectionImpl::ServerStreamImpl::submitHeaders(const std::vector<nghttp2_n
ASSERT(rc == 0);
}

void ConnectionImpl::ServerStreamImpl::createPendingFlushTimer() {
ASSERT(stream_idle_timer_ == nullptr);
if (stream_idle_timeout_.count() > 0) {
stream_idle_timer_ =
parent_.connection_.dispatcher().createTimer([this] { onPendingFlushTimer(); });
stream_idle_timer_->enableTimer(stream_idle_timeout_);
}
}

void ConnectionImpl::StreamImpl::onPendingFlushTimer() {
ENVOY_CONN_LOG(debug, "pending stream flush timeout", parent_.connection_);
stream_idle_timer_.reset();
MultiplexedStreamImplBase::onPendingFlushTimer();
parent_.stats_.tx_flush_timeout_.inc();
ASSERT(local_end_stream_ && !local_end_stream_sent_);
// This will emit a reset frame for this stream and close the stream locally. No reset callbacks
Expand Down Expand Up @@ -541,8 +530,8 @@ void ConnectionImpl::StreamImpl::encodeDataHelper(Buffer::Instance& data, bool e
// Intended to check through coverage that this error case is tested
return;
}
if (local_end_stream_ && pending_send_data_->length() > 0) {
createPendingFlushTimer();
if (local_end_stream_) {
onLocalEndStream();
}
}

Expand Down
43 changes: 16 additions & 27 deletions source/common/http/http2/codec_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -181,25 +181,16 @@ class ConnectionImpl : public virtual Connection,
* Base class for client and server side streams.
*/
struct StreamImpl : public virtual StreamEncoder,
public Stream,
public LinkedObject<StreamImpl>,
public Event::DeferredDeletable,
public StreamCallbackHelper,
public Http::MultiplexedStreamImplBase,
public ScopeTrackedObject {

StreamImpl(ConnectionImpl& parent, uint32_t buffer_limit);
~StreamImpl() override;
// TODO(mattklein123): Optimally this would be done in the destructor but there are currently
// deferred delete lifetime issues that need sorting out if the destructor of the stream is
// going to be able to refer to the parent connection.
virtual void destroy();
void disarmStreamIdleTimer() {
if (stream_idle_timer_ != nullptr) {
// To ease testing and the destructor assertion.
stream_idle_timer_->disableTimer();
stream_idle_timer_.reset();
}
}

// Http::MultiplexedStreamImplBase
void destroy() override;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

generally for overrides we comment the class it's from

// Http::MultiplexedStreamImplBase

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

void onPendingFlushTimer() override;

StreamImpl* base() { return this; }
ssize_t onDataSourceRead(uint64_t length, uint32_t* data_flags);
Expand All @@ -217,8 +208,6 @@ class ConnectionImpl : public virtual Connection,
virtual HeaderMap& headers() PURE;
virtual void allocTrailers() PURE;
virtual HeaderMapPtr cloneTrailers(const HeaderMap& trailers) PURE;
virtual void createPendingFlushTimer() PURE;
void onPendingFlushTimer();

// Http::StreamEncoder
void encodeData(Buffer::Instance& data, bool end_stream) override;
Expand All @@ -236,9 +225,6 @@ class ConnectionImpl : public virtual Connection,
return parent_.connection_.addressProvider().localAddress();
}
absl::string_view responseDetails() override { return details_; }
void setFlushTimeout(std::chrono::milliseconds timeout) override {
stream_idle_timeout_ = timeout;
}
void setAccount(Buffer::BufferMemoryAccountSharedPtr account) override;

// ScopeTrackedObject
Expand Down Expand Up @@ -317,9 +303,12 @@ class ConnectionImpl : public virtual Connection,
bool pending_send_buffer_high_watermark_called_ : 1;
bool reset_due_to_messaging_error_ : 1;
absl::string_view details_;
// See HttpConnectionManager.stream_idle_timeout.
std::chrono::milliseconds stream_idle_timeout_{};
Event::TimerPtr stream_idle_timer_;

protected:
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto on commenting the override.
alternately I don't see a reason to not have it public and group it - your call.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done. It is only used by base class. So I prefer keep it as protected.

// Http::MultiplexedStreamImplBase
bool hasPendingData() override {
return pending_send_data_->length() > 0 || pending_trailers_to_encode_ != nullptr;
}
};

using StreamImplPtr = std::unique_ptr<StreamImpl>;
Expand All @@ -333,6 +322,11 @@ class ConnectionImpl : public virtual Connection,
: StreamImpl(parent, buffer_limit), response_decoder_(response_decoder),
headers_or_trailers_(ResponseHeaderMapImpl::create()) {}

// Http::MultiplexedStreamImplBase
void setFlushTimeout(std::chrono::milliseconds /*timeout*/) override {
// Client streams do not need a flush timer because we currently assume that any failure
// to flush would be covered by a request/stream/etc. timeout.
}
// StreamImpl
void submitHeaders(const std::vector<nghttp2_nv>& final_headers,
nghttp2_data_provider* provider) override;
Expand All @@ -358,10 +352,6 @@ class ConnectionImpl : public virtual Connection,
HeaderMapPtr cloneTrailers(const HeaderMap& trailers) override {
return createHeaderMap<RequestTrailerMapImpl>(trailers);
}
void createPendingFlushTimer() override {
// Client streams do not create a flush timer because we currently assume that any failure
// to flush would be covered by a request/stream/etc. timeout.
}

// RequestEncoder
Status encodeHeaders(const RequestHeaderMap& headers, bool end_stream) override;
Expand Down Expand Up @@ -407,7 +397,6 @@ class ConnectionImpl : public virtual Connection,
HeaderMapPtr cloneTrailers(const HeaderMap& trailers) override {
return createHeaderMap<ResponseTrailerMapImpl>(trailers);
}
void createPendingFlushTimer() override;
void resetStream(StreamResetReason reason) override;

// ResponseEncoder
Expand Down
3 changes: 2 additions & 1 deletion source/common/http/http3/codec_stats.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ namespace Http3 {
COUNTER(rx_reset) \
COUNTER(tx_reset) \
COUNTER(metadata_not_supported_error) \
COUNTER(quic_version_rfc_v1)
COUNTER(quic_version_rfc_v1) \
COUNTER(tx_flush_timeout)

/**
* Wrapper struct for the HTTP/3 codec stats. @see stats_macros.h
Expand Down
10 changes: 10 additions & 0 deletions source/common/quic/envoy_quic_client_stream.cc
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,9 @@ Http::Status EnvoyQuicClientStream::encodeHeaders(const Http::RequestHeaderMap&
}
}
WriteHeaders(std::move(spdy_headers), end_stream, nullptr);
if (local_end_stream_) {
onLocalEndStream();
}
return Http::okStatus();
}

Expand All @@ -85,6 +88,9 @@ void EnvoyQuicClientStream::encodeData(Buffer::Instance& data, bool end_stream)
Reset(quic::QUIC_BAD_APPLICATION_PAYLOAD);
return;
}
if (local_end_stream_) {
onLocalEndStream();
}
}

void EnvoyQuicClientStream::encodeTrailers(const Http::RequestTrailerMap& trailers) {
Expand All @@ -93,6 +99,7 @@ void EnvoyQuicClientStream::encodeTrailers(const Http::RequestTrailerMap& traile
ENVOY_STREAM_LOG(debug, "encodeTrailers: {}.", *this, trailers);
ScopedWatermarkBufferUpdater updater(this, this);
WriteTrailers(envoyHeadersToSpdyHeaderBlock(trailers), nullptr);
onLocalEndStream();
}

void EnvoyQuicClientStream::encodeMetadata(const Http::MetadataMapVector& /*metadata_map_vector*/) {
Expand Down Expand Up @@ -271,6 +278,7 @@ void EnvoyQuicClientStream::OnConnectionClosed(quic::QuicErrorCode error,
}

void EnvoyQuicClientStream::OnClose() {
destroy();
quic::QuicSpdyClientStream::OnClose();
if (isDoingWatermarkAccounting()) {
// This is called in the scope of a watermark buffer updater. Clear the
Expand Down Expand Up @@ -321,5 +329,7 @@ void EnvoyQuicClientStream::onStreamError(absl::optional<bool> should_close_conn
}
}

bool EnvoyQuicClientStream::hasPendingData() { return BufferedDataBytes() > 0; }
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe comment how this plays with trailers?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done. I added comment in server stream to which this interface actually matters.


} // namespace Quic
} // namespace Envoy
3 changes: 3 additions & 0 deletions source/common/quic/envoy_quic_client_stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,9 @@ class EnvoyQuicClientStream : public quic::QuicSpdyClientStream,
void OnTrailingHeadersComplete(bool fin, size_t frame_len,
const quic::QuicHeaderList& header_list) override;

// Http::MultiplexedStreamImplBase
bool hasPendingData() override;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we disable flush timer for client streams as we do for HTTP/2? or maybe since we have separate classes only inherit for the server stream?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is disabled via overriding setFlushTimeout() to no-op a few lines above.
hasPendingData() is implemented here just because it is pure method in the base class.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah, I missed it as it wasn't part of the diff, my bad!


private:
QuicFilterManagerConnectionImpl* filterManagerConnection();

Expand Down
30 changes: 28 additions & 2 deletions source/common/quic/envoy_quic_server_stream.cc
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,9 @@ void EnvoyQuicServerStream::encodeHeaders(const Http::ResponseHeaderMap& headers
local_end_stream_ = end_stream;
SendBufferMonitor::ScopedWatermarkBufferUpdater updater(this, this);
WriteHeaders(envoyHeadersToSpdyHeaderBlock(headers), end_stream, nullptr);
if (local_end_stream_) {
onLocalEndStream();
}
}

void EnvoyQuicServerStream::encodeData(Buffer::Instance& data, bool end_stream) {
Expand All @@ -78,6 +81,9 @@ void EnvoyQuicServerStream::encodeData(Buffer::Instance& data, bool end_stream)
Reset(quic::QUIC_BAD_APPLICATION_PAYLOAD);
return;
}
if (local_end_stream_) {
onLocalEndStream();
}
}

void EnvoyQuicServerStream::encodeTrailers(const Http::ResponseTrailerMap& trailers) {
Expand All @@ -86,6 +92,7 @@ void EnvoyQuicServerStream::encodeTrailers(const Http::ResponseTrailerMap& trail
ENVOY_STREAM_LOG(debug, "encodeTrailers: {}.", *this, trailers);
SendBufferMonitor::ScopedWatermarkBufferUpdater updater(this, this);
WriteTrailers(envoyHeadersToSpdyHeaderBlock(trailers), nullptr);
onLocalEndStream();
}

void EnvoyQuicServerStream::encodeMetadata(const Http::MetadataMapVector& /*metadata_map_vector*/) {
Expand Down Expand Up @@ -269,8 +276,10 @@ void EnvoyQuicServerStream::OnStreamReset(const quic::QuicRstStreamFrame& frame)
void EnvoyQuicServerStream::Reset(quic::QuicRstStreamErrorCode error) {
ENVOY_STREAM_LOG(debug, "sending reset code={}", *this, error);
stats_.tx_reset_.inc();
// Upper layers expect calling resetStream() to immediately raise reset callbacks.
runResetCallbacks(quicRstErrorToEnvoyLocalResetReason(error));
if (!local_end_stream_) {
// Upper layers expect calling resetStream() to immediately raise reset callbacks.
runResetCallbacks(quicRstErrorToEnvoyLocalResetReason(error));
}
quic::QuicSpdyServerStreamBase::Reset(error);
}

Expand Down Expand Up @@ -302,6 +311,7 @@ void EnvoyQuicServerStream::CloseWriteSide() {
}

void EnvoyQuicServerStream::OnClose() {
destroy();
quic::QuicSpdyServerStreamBase::OnClose();
if (isDoingWatermarkAccounting()) {
return;
Expand Down Expand Up @@ -367,5 +377,21 @@ void EnvoyQuicServerStream::onStreamError(absl::optional<bool> should_close_conn
}
}

void EnvoyQuicServerStream::onPendingFlushTimer() {
ENVOY_STREAM_LOG(debug, "pending stream flush timeout", *this);
Http::MultiplexedStreamImplBase::onPendingFlushTimer();
stats_.tx_flush_timeout_.inc();
ASSERT(local_end_stream_ && !fin_sent());
// Reset the stream locally. But no reset callbacks will be run because higher layers think the
// stream is already finished.
Reset(quic::QUIC_STREAM_CANCELLED);
}

bool EnvoyQuicServerStream::hasPendingData() {
// Quic stream sends headers and trailers on the same stream, and buffers them in the same sending
// buffer if needed. So checking this buffer is sufficient.
return BufferedDataBytes() > 0;
}

} // namespace Quic
} // namespace Envoy
7 changes: 4 additions & 3 deletions source/common/quic/envoy_quic_server_stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,6 @@ class EnvoyQuicServerStream : public quic::QuicSpdyServerStreamBase,

// Http::Stream
void resetStream(Http::StreamResetReason reason) override;
void setFlushTimeout(std::chrono::milliseconds) override {
// TODO(mattklein123): Actually implement this for HTTP/3 similar to HTTP/2.
}

// quic::QuicSpdyStream
void OnBodyAvailable() override;
Expand Down Expand Up @@ -79,6 +76,10 @@ class EnvoyQuicServerStream : public quic::QuicSpdyServerStreamBase,
const quic::QuicHeaderList& header_list) override;
void OnHeadersTooLarge() override;

// Http::MultiplexedStreamImplBase
void onPendingFlushTimer() override;
bool hasPendingData() override;

private:
QuicFilterManagerConnectionImpl* filterManagerConnection();

Expand Down
6 changes: 3 additions & 3 deletions source/common/quic/envoy_quic_stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,7 @@ namespace Quic {

// Base class for EnvoyQuicServer|ClientStream.
class EnvoyQuicStream : public virtual Http::StreamEncoder,
public Http::Stream,
public Http::StreamCallbackHelper,
public Http::MultiplexedStreamImplBase,
public SendBufferMonitor,
public HeaderValidator,
protected Logger::Loggable<Logger::Id::quic_stream> {
Expand All @@ -28,7 +27,8 @@ class EnvoyQuicStream : public virtual Http::StreamEncoder,
std::function<void()> below_low_watermark,
std::function<void()> above_high_watermark, Http::Http3::CodecStats& stats,
const envoy::config::core::v3::Http3ProtocolOptions& http3_options)
: stats_(stats), http3_options_(http3_options),
: Http::MultiplexedStreamImplBase(filter_manager_connection.dispatcher()), stats_(stats),
http3_options_(http3_options),
send_buffer_simulation_(buffer_limit / 2, buffer_limit, std::move(below_low_watermark),
std::move(above_high_watermark), ENVOY_LOGGER()),
filter_manager_connection_(filter_manager_connection),
Expand Down
Loading