Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions source/common/network/connection_impl_base.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ void ConnectionImplBase::setConnectionStats(const ConnectionStats& stats) {
"with the configured filter chain.");
connection_stats_ = std::make_unique<ConnectionStats>(stats);
}

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

revert?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These are format fixes for previous change

void ConnectionImplBase::setDelayedCloseTimeout(std::chrono::milliseconds timeout) {
// Validate that this is only called prior to issuing a close() or closeSocket().
ASSERT(delayed_close_timer_ == nullptr && state() == State::Open);
Expand Down
1 change: 0 additions & 1 deletion source/common/network/connection_impl_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ class ConnectionImplBase : public FilterManagerConnection,
protected Logger::Loggable<Logger::Id::connection> {
public:
ConnectionImplBase(Event::Dispatcher& dispatcher, uint64_t id);
~ConnectionImplBase() override {}

// Network::Connection
void addConnectionCallbacks(ConnectionCallbacks& cb) override;
Expand Down
2 changes: 1 addition & 1 deletion source/extensions/quic_listeners/quiche/codec_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ EnvoyQuicStream* quicStreamToEnvoyStream(quic::QuicStream* stream) {
return dynamic_cast<EnvoyQuicStream*>(stream);
}

bool QuicHttpConnectionImplBase::wantsToWrite() { return quic_session_.HasDataToWrite(); }
bool QuicHttpConnectionImplBase::wantsToWrite() { return quic_session_.bytesToSend() > 0; }

void QuicHttpConnectionImplBase::runWatermarkCallbacksForEachStream(
quic::QuicSmallMap<quic::QuicStreamId, std::unique_ptr<quic::QuicStream>, 10>& stream_map,
Expand Down
5 changes: 3 additions & 2 deletions source/extensions/quic_listeners/quiche/codec_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ namespace Quic {
class QuicHttpConnectionImplBase : public virtual Http::Connection,
protected Logger::Loggable<Logger::Id::quic> {
public:
QuicHttpConnectionImplBase(quic::QuicSpdySession& quic_session) : quic_session_(quic_session) {}
QuicHttpConnectionImplBase(QuicFilterManagerConnectionImpl& quic_session)
: quic_session_(quic_session) {}

// Http::Connection
void dispatch(Buffer::Instance& /*data*/) override {
Expand All @@ -35,7 +36,7 @@ class QuicHttpConnectionImplBase : public virtual Http::Connection,
bool high_watermark);

protected:
quic::QuicSpdySession& quic_session_;
QuicFilterManagerConnectionImpl& quic_session_;
};

class QuicHttpServerConnectionImpl : public QuicHttpConnectionImplBase,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,11 @@ void EnvoyQuicClientSession::Initialize() {
quic_connection_->setEnvoyConnection(*this);
}

void EnvoyQuicClientSession::OnCanWrite() {
quic::QuicSpdyClientSession::OnCanWrite();
maybeApplyDelayClosePolicy();
}

void EnvoyQuicClientSession::OnGoAway(const quic::QuicGoAwayFrame& frame) {
ENVOY_CONN_LOG(debug, "GOAWAY received with error {}: {}", *this,
quic::QuicErrorCodeToString(frame.error_code), frame.reason_phrase);
Expand Down Expand Up @@ -73,5 +78,7 @@ EnvoyQuicClientSession::CreateIncomingStream(quic::PendingStream* /*pending*/) {
NOT_REACHED_GCOVR_EXCL_LINE;
}

bool EnvoyQuicClientSession::hasDataToWrite() { return HasDataToWrite(); }

} // namespace Quic
} // namespace Envoy
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ class EnvoyQuicClientSession : public QuicFilterManagerConnectionImpl,
void OnConnectionClosed(const quic::QuicConnectionCloseFrame& frame,
quic::ConnectionCloseSource source) override;
void Initialize() override;
void OnCanWrite() override;
void OnGoAway(const quic::QuicGoAwayFrame& frame) override;
// quic::QuicSpdyClientSessionBase
void OnCryptoHandshakeEvent(CryptoHandshakeEvent event) override;
Expand All @@ -66,6 +67,9 @@ class EnvoyQuicClientSession : public QuicFilterManagerConnectionImpl,
quic::QuicSpdyStream* CreateIncomingStream(quic::QuicStreamId id) override;
quic::QuicSpdyStream* CreateIncomingStream(quic::PendingStream* pending) override;

// QuicFilterManagerConnectionImpl
bool hasDataToWrite() override;

private:
// These callbacks are owned by network filters and quic session should outlive
// them.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,12 +93,21 @@ void EnvoyQuicServerSession::SendGoAway(quic::QuicErrorCode error_code, const st
}
}

void EnvoyQuicServerSession::OnCanWrite() {
quic::QuicServerSessionBase::OnCanWrite();
// Do not update delay close state according to connection level packet egress because that is
// equivalent to TCP transport layer egress. But only do so if the session gets chance to write.
maybeApplyDelayClosePolicy();
}

void EnvoyQuicServerSession::OnCryptoHandshakeEvent(CryptoHandshakeEvent event) {
quic::QuicServerSessionBase::OnCryptoHandshakeEvent(event);
if (event == HANDSHAKE_CONFIRMED) {
raiseConnectionEvent(Network::ConnectionEvent::Connected);
}
}

bool EnvoyQuicServerSession::hasDataToWrite() { return HasDataToWrite(); }

} // namespace Quic
} // namespace Envoy
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ class EnvoyQuicServerSession : public quic::QuicServerSessionBase,
quic::ConnectionCloseSource source) override;
void Initialize() override;
void SendGoAway(quic::QuicErrorCode error_code, const std::string& reason) override;
void OnCanWrite() override;
// quic::QuicSpdySession
void OnCryptoHandshakeEvent(CryptoHandshakeEvent event) override;

Expand All @@ -68,6 +69,9 @@ class EnvoyQuicServerSession : public quic::QuicServerSessionBase,
quic::QuicSpdyStream* CreateOutgoingBidirectionalStream() override;
quic::QuicSpdyStream* CreateOutgoingUnidirectionalStream() override;

// QuicFilterManagerConnectionImpl
bool hasDataToWrite() override;

private:
void setUpRequestDecoder(EnvoyQuicStream& stream);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,20 +50,46 @@ bool QuicFilterManagerConnectionImpl::aboveHighWatermark() const {
}

void QuicFilterManagerConnectionImpl::close(Network::ConnectionCloseType type) {
if (type != Network::ConnectionCloseType::NoFlush) {
// TODO(danzh): Implement FlushWrite and FlushWriteAndDelay mode.
}
if (quic_connection_ == nullptr) {
// Already detached from quic connection.
return;
}
closeConnectionImmediately();
}

void QuicFilterManagerConnectionImpl::setDelayedCloseTimeout(std::chrono::milliseconds timeout) {
if (timeout != std::chrono::milliseconds::zero()) {
// TODO(danzh) support delayed close of connection.
NOT_IMPLEMENTED_GCOVR_EXCL_LINE;
const bool delayed_close_timeout_configured = delayed_close_timeout_.count() > 0;
if (hasDataToWrite() && type != Network::ConnectionCloseType::NoFlush) {
if (delayed_close_timeout_configured) {
// QUIC connection has unsent data and caller wants to flush them. Wait for flushing or
// timeout.
if (!inDelayedClose()) {
// Only set alarm if not in delay close mode yet.
initializeDelayedCloseTimer();
}
// Update delay close state according to current call.
if (type == Network::ConnectionCloseType::FlushWriteAndDelay) {
delayed_close_state_ = DelayedCloseState::CloseAfterFlushAndWait;
} else {
ASSERT(type == Network::ConnectionCloseType::FlushWrite);
delayed_close_state_ = DelayedCloseState::CloseAfterFlush;
}
} else {
delayed_close_state_ = DelayedCloseState::CloseAfterFlush;
}
} else if (hasDataToWrite()) {
// Quic connection has unsent data but caller wants to close right away.
ASSERT(type == Network::ConnectionCloseType::NoFlush);
quic_connection_->OnCanWrite();
closeConnectionImmediately();
} else {
// Quic connection doesn't have unsent data. It's up to the caller and
// the configuration whether to wait or not before closing.
if (delayed_close_timeout_configured &&
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe have one if for delayed_close_timeout_configured and have the above code and this code in it?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done. But due to the intertwine of ConnectionCloseType and the presence of delayed_close_time in config, I feel it doesn't simplify the logic much...

type == Network::ConnectionCloseType::FlushWriteAndDelay) {
if (!inDelayedClose()) {
initializeDelayedCloseTimer();
}
delayed_close_state_ = DelayedCloseState::CloseAfterFlushAndWait;
} else {
closeConnectionImmediately();
}
}
}

Expand Down Expand Up @@ -102,6 +128,21 @@ void QuicFilterManagerConnectionImpl::adjustBytesToSend(int64_t delta) {
write_buffer_watermark_simulation_.checkLowWatermark(bytes_to_send_);
}

void QuicFilterManagerConnectionImpl::maybeApplyDelayClosePolicy() {
if (!inDelayedClose()) {
return;
}
if (hasDataToWrite() || delayed_close_state_ == DelayedCloseState::CloseAfterFlushAndWait) {
if (delayed_close_timer_ != nullptr) {
// Re-arm delay close timer on every write event if there are still data
// buffered or the connection close is supposed to be delayed.
delayed_close_timer_->enableTimer(delayed_close_timeout_);
}
} else {
closeConnectionImmediately();
}
}

void QuicFilterManagerConnectionImpl::onConnectionCloseEvent(
const quic::QuicConnectionCloseFrame& frame, quic::ConnectionCloseSource source) {
transport_failure_reason_ = absl::StrCat(quic::QuicErrorCodeToString(frame.quic_error_code),
Expand All @@ -115,6 +156,9 @@ void QuicFilterManagerConnectionImpl::onConnectionCloseEvent(
}

void QuicFilterManagerConnectionImpl::closeConnectionImmediately() {
if (quic_connection_ == nullptr) {
return;
}
quic_connection_->CloseConnection(quic::QUIC_NO_ERROR, "Closed by application",
quic::ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
quic_connection_ = nullptr;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ class QuicFilterManagerConnectionImpl : public Network::ConnectionImplBase {
void noDelay(bool /*enable*/) override {
// No-op. TCP_NODELAY doesn't apply to UDP.
}
void setDelayedCloseTimeout(std::chrono::milliseconds timeout) override;
void readDisable(bool /*disable*/) override { NOT_REACHED_GCOVR_EXCL_LINE; }
void detectEarlyCloseWhenReadDisabled(bool /*value*/) override { NOT_REACHED_GCOVR_EXCL_LINE; }
bool readEnabled() const override { return true; }
Expand Down Expand Up @@ -98,13 +97,20 @@ class QuicFilterManagerConnectionImpl : public Network::ConnectionImplBase {
// streams, and run watermark check.
void adjustBytesToSend(int64_t delta);

// Called after each write when a previous connection close call is postponed.
void maybeApplyDelayClosePolicy();

uint32_t bytesToSend() { return bytes_to_send_; }

protected:
// Propagate connection close to network_connection_callbacks_.
void onConnectionCloseEvent(const quic::QuicConnectionCloseFrame& frame,
quic::ConnectionCloseSource source);

void closeConnectionImmediately() override;

virtual bool hasDataToWrite() PURE;

EnvoyQuicConnection* quic_connection_{nullptr};

private:
Expand Down
Loading