Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 7 additions & 8 deletions source/common/http/codec_client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,9 @@ void CodecClient::deleteRequest(ActiveRequest& request) {
}
}

Http::StreamEncoder& CodecClient::newStream(Http::StreamDecoder& response_decoder) {
ActiveRequestPtr request(new ActiveRequest(*this));
request->decoder_wrapper_.reset(new ResponseDecoderWrapper(response_decoder, *request));
request->encoder_ = &codec_->newStream(*request->decoder_wrapper_);
StreamEncoder& CodecClient::newStream(StreamDecoder& response_decoder) {
ActiveRequestPtr request(new ActiveRequest(*this, response_decoder));
request->encoder_ = &codec_->newStream(*request);
request->encoder_->getStream().addCallbacks(*request);
request->moveIntoList(std::move(request), active_requests_);
return *active_requests_.front()->encoder_;
Expand Down Expand Up @@ -79,7 +78,7 @@ void CodecClient::responseDecodeComplete(ActiveRequest& request) {
request.encoder_->getStream().removeCallbacks(request);
}

void CodecClient::onReset(ActiveRequest& request, Http::StreamResetReason reason) {
void CodecClient::onReset(ActiveRequest& request, StreamResetReason reason) {
conn_log_debug("request reset", *connection_);
if (codec_client_callbacks_) {
codec_client_callbacks_->onStreamReset(reason);
Expand All @@ -102,7 +101,7 @@ void CodecClient::onData(Buffer::Instance& data) {

// Don't count 408 responses where we have no active requests as protocol errors
if (!active_requests_.empty() ||
Utility::getResponseStatus(e.headers()) != enumToInt(Http::Code::RequestTimeout)) {
Utility::getResponseStatus(e.headers()) != enumToInt(Code::RequestTimeout)) {
protocol_error = true;
}
}
Expand All @@ -118,11 +117,11 @@ CodecClientProd::CodecClientProd(Type type, Network::ClientConnectionPtr&& conne
: CodecClient(type, std::move(connection), stats) {
switch (type) {
case Type::HTTP1: {
codec_.reset(new Http::Http1::ClientConnectionImpl(*connection_, *this));
codec_.reset(new Http1::ClientConnectionImpl(*connection_, *this));
break;
}
case Type::HTTP2: {
codec_.reset(new Http::Http2::ClientConnectionImpl(*connection_, *this, store, codec_options));
codec_.reset(new Http2::ClientConnectionImpl(*connection_, *this, store, codec_options));
break;
}
}
Expand Down
43 changes: 16 additions & 27 deletions source/common/http/codec_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ class CodecClientCallbacks {
* Called when a stream is reset by the client.
* @param reason supplies the reset reason.
*/
virtual void onStreamReset(Http::StreamResetReason reason) PURE;
virtual void onStreamReset(StreamResetReason reason) PURE;
};

/**
Expand Down Expand Up @@ -95,9 +95,9 @@ class CodecClient : Logger::Loggable<Logger::Id::client>,
* connections. Thus, calling newStream() before the previous request has been fully encoded
* is an error. Pipelining is supported however.
* @param response_decoder supplies the decoder to use for response callbacks.
* @return Http::StreamEncoder& the encoder to use for encoding the request.
* @return StreamEncoder& the encoder to use for encoding the request.
*/
Http::StreamEncoder& newStream(Http::StreamDecoder& response_decoder);
StreamEncoder& newStream(StreamDecoder& response_decoder);

void setCodecClientCallbacks(CodecClientCallbacks& callbacks) {
codec_client_callbacks_ = &callbacks;
Expand All @@ -124,7 +124,7 @@ class CodecClient : Logger::Loggable<Logger::Id::client>,
}

const Type type_;
Http::ClientConnectionPtr codec_;
ClientConnectionPtr codec_;
Network::ClientConnectionPtr connection_;

private:
Expand All @@ -146,35 +146,24 @@ class CodecClient : Logger::Loggable<Logger::Id::client>,

struct ActiveRequest;

/**
* Wrapper for the client response decoder. We use this only for managing end of stream.
*/
struct ResponseDecoderWrapper : public StreamDecoderWrapper {
ResponseDecoderWrapper(Http::StreamDecoder& inner, ActiveRequest& parent)
: StreamDecoderWrapper(inner), parent_(parent) {}

// StreamDecoderWrapper
void onPreDecodeComplete() override { parent_.parent_.responseDecodeComplete(parent_); }
void onDecodeComplete() override {}

ActiveRequest& parent_;
};

typedef std::unique_ptr<ResponseDecoderWrapper> ResponseDecoderWrapperPtr;

/**
* Wrapper for an outstanding request. Designed for handling stream multiplexing.
*/
struct ActiveRequest : LinkedObject<ActiveRequest>,
public Event::DeferredDeletable,
public Http::StreamCallbacks {
ActiveRequest(CodecClient& parent) : parent_(parent) {}
public StreamCallbacks,
public StreamDecoderWrapper {
ActiveRequest(CodecClient& parent, StreamDecoder& inner)
: StreamDecoderWrapper(inner), parent_(parent) {}

// Http::StreamCallbacks
void onResetStream(Http::StreamResetReason reason) override { parent_.onReset(*this, reason); }
// StreamCallbacks
void onResetStream(StreamResetReason reason) override { parent_.onReset(*this, reason); }

// StreamDecoderWrapper
void onPreDecodeComplete() override { parent_.responseDecodeComplete(*this); }
void onDecodeComplete() override {}

Http::StreamEncoder* encoder_{};
ResponseDecoderWrapperPtr decoder_wrapper_;
StreamEncoder* encoder_{};
CodecClient& parent_;
};

Expand All @@ -187,7 +176,7 @@ class CodecClient : Logger::Loggable<Logger::Id::client>,
void responseDecodeComplete(ActiveRequest& request);

void deleteRequest(ActiveRequest& request);
void onReset(ActiveRequest& request, Http::StreamResetReason reason);
void onReset(ActiveRequest& request, StreamResetReason reason);
void onData(Buffer::Instance& data);

// Network::ConnectionCallbacks
Expand Down
46 changes: 46 additions & 0 deletions source/common/http/codec_helper.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
#pragma once

namespace Http {

class StreamCallbackHelper {
public:
void runResetCallbacks(StreamResetReason reason) {
if (reset_callbacks_run_) {
return;
}

for (StreamCallbacks* callbacks : callbacks_) {
if (callbacks) {
callbacks->onResetStream(reason);
}
}

reset_callbacks_run_ = true;
}

protected:
StreamCallbackHelper() {
// Set space for 8 callbacks (64 bytes).
callbacks_.reserve(8);
}

void addCallbacks_(StreamCallbacks& callbacks) { callbacks_.push_back(&callbacks); }

void removeCallbacks_(StreamCallbacks& callbacks) {
// For performance reasons we just clear the callback and do not resize the vector.
// Reset callbacks scale with the number of filters per request and do not get added and
// removed multiple times.
for (size_t i = 0; i < callbacks_.size(); i++) {
if (callbacks_[i] == &callbacks) {
callbacks_[i] = nullptr;
return;
}
}
}

private:
std::vector<StreamCallbacks*> callbacks_;
bool reset_callbacks_run_{};
};

} // Http
17 changes: 7 additions & 10 deletions source/common/http/http1/codec_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

#include "common/buffer/buffer_impl.h"
#include "common/common/assert.h"
#include "common/http/codec_helper.h"
#include "common/http/header_map_impl.h"

#include "http_parser.h"
Expand All @@ -19,23 +20,20 @@ class ConnectionImpl;
/**
* Base class for HTTP/1.1 request and response encoders.
*/
class StreamEncoderImpl : public StreamEncoder, public Stream, Logger::Loggable<Logger::Id::http> {
class StreamEncoderImpl : public StreamEncoder,
public Stream,
Logger::Loggable<Logger::Id::http>,
public StreamCallbackHelper {
public:
void runResetCallbacks(StreamResetReason reason) {
for (StreamCallbacks* callbacks : callbacks_) {
callbacks->onResetStream(reason);
}
}

// Http::StreamEncoder
void encodeHeaders(const HeaderMap& headers, bool end_stream) override;
void encodeData(Buffer::Instance& data, bool end_stream) override;
void encodeTrailers(const HeaderMap& trailers) override;
Stream& getStream() override { return *this; }

// Http::Stream
void addCallbacks(StreamCallbacks& callbacks) override { callbacks_.push_back(&callbacks); }
void removeCallbacks(StreamCallbacks& callbacks) override { callbacks_.remove(&callbacks); }
void addCallbacks(StreamCallbacks& callbacks) override { addCallbacks_(callbacks); }
void removeCallbacks(StreamCallbacks& callbacks) override { removeCallbacks_(callbacks); }
void resetStream(StreamResetReason reason) override;

protected:
Expand All @@ -61,7 +59,6 @@ class StreamEncoderImpl : public StreamEncoder, public Stream, Logger::Loggable<
*/
void endEncode();

std::list<StreamCallbacks*> callbacks_{};
bool chunk_encoding_{true};
};

Expand Down
38 changes: 18 additions & 20 deletions source/common/http/http1/conn_pool.cc
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,9 @@ void ConnPoolImpl::addDrainedCallback(DrainedCb cb) {

void ConnPoolImpl::attachRequestToClient(ActiveClient& client, StreamDecoder& response_decoder,
ConnectionPool::Callbacks& callbacks) {
ASSERT(!client.request_encoder_);
ASSERT(!client.response_decoder_)
client.response_decoder_.reset(new ResponseDecoderWrapper(response_decoder, client));
client.request_encoder_.reset(new RequestEncoderWrapper(
client.codec_client_->newStream(*client.response_decoder_), client));
callbacks.onPoolReady(*client.request_encoder_, client.real_host_description_);
ASSERT(!client.stream_wrapper_);
client.stream_wrapper_.reset(new StreamWrapper(response_decoder, client));
callbacks.onPoolReady(*client.stream_wrapper_, client.real_host_description_);
}

void ConnPoolImpl::checkForDrained() {
Expand Down Expand Up @@ -102,8 +99,8 @@ void ConnPoolImpl::onConnectionEvent(ActiveClient& client, uint32_t events) {
conn_log_debug("client disconnected", *client.codec_client_);
ActiveClientPtr removed;
bool check_for_drained = true;
if (client.response_decoder_) {
if (!client.response_decoder_->complete_) {
if (client.stream_wrapper_) {
if (!client.stream_wrapper_->decode_complete_) {
if (events & Network::ConnectionEvent::LocalClose) {
host_->cluster().stats().upstream_cx_destroy_local_with_active_rq_.inc();
}
Expand Down Expand Up @@ -179,10 +176,10 @@ void ConnPoolImpl::onPendingRequestCancel(PendingRequest& request) {

void ConnPoolImpl::onResponseComplete(ActiveClient& client) {
conn_log_debug("response complete", *client.codec_client_);
if (!client.request_encoder_->encode_complete_) {
if (!client.stream_wrapper_->encode_complete_) {
conn_log_debug("response before request complete", *client.codec_client_);
onDownstreamReset(client);
} else if (client.response_decoder_->saw_close_header_) {
} else if (client.stream_wrapper_->saw_close_header_) {
conn_log_debug("saw upstream connection: close", *client.codec_client_);
onDownstreamReset(client);
} else if (client.remaining_requests_ > 0 && --client.remaining_requests_ == 0) {
Expand All @@ -195,8 +192,7 @@ void ConnPoolImpl::onResponseComplete(ActiveClient& client) {
}

void ConnPoolImpl::processIdleClient(ActiveClient& client) {
client.request_encoder_.reset();
client.response_decoder_.reset();
client.stream_wrapper_.reset();
if (pending_requests_.empty()) {
// There is nothing to service so just move the connection into the ready list.
conn_log_debug("moving to ready", *client.codec_client_);
Expand All @@ -213,23 +209,25 @@ void ConnPoolImpl::processIdleClient(ActiveClient& client) {
checkForDrained();
}

void ConnPoolImpl::RequestEncoderWrapper::onEncodeComplete() { encode_complete_ = true; }
ConnPoolImpl::StreamWrapper::StreamWrapper(StreamDecoder& response_decoder, ActiveClient& parent)
: StreamEncoderWrapper(parent.codec_client_->newStream(*this)),
StreamDecoderWrapper(response_decoder), parent_(parent) {

ConnPoolImpl::ResponseDecoderWrapper::ResponseDecoderWrapper(StreamDecoder& inner,
ActiveClient& parent)
: StreamDecoderWrapper(inner), parent_(parent) {
StreamEncoderWrapper::inner_.getStream().addCallbacks(*this);
parent_.parent_.host_->cluster().stats().upstream_rq_total_.inc();
parent_.parent_.host_->cluster().stats().upstream_rq_active_.inc();
parent_.parent_.host_->stats().rq_total_.inc();
parent_.parent_.host_->stats().rq_active_.inc();
}

ConnPoolImpl::ResponseDecoderWrapper::~ResponseDecoderWrapper() {
ConnPoolImpl::StreamWrapper::~StreamWrapper() {
parent_.parent_.host_->cluster().stats().upstream_rq_active_.dec();
parent_.parent_.host_->stats().rq_active_.dec();
}

void ConnPoolImpl::ResponseDecoderWrapper::decodeHeaders(HeaderMapPtr&& headers, bool end_stream) {
void ConnPoolImpl::StreamWrapper::onEncodeComplete() { encode_complete_ = true; }

void ConnPoolImpl::StreamWrapper::decodeHeaders(HeaderMapPtr&& headers, bool end_stream) {
if (headers->Connection() &&
0 == StringUtil::caseInsensitiveCompare(headers->Connection()->value().c_str(),
Headers::get().ConnectionValues.Close.c_str())) {
Expand All @@ -240,8 +238,8 @@ void ConnPoolImpl::ResponseDecoderWrapper::decodeHeaders(HeaderMapPtr&& headers,
StreamDecoderWrapper::decodeHeaders(std::move(headers), end_stream);
}

void ConnPoolImpl::ResponseDecoderWrapper::onDecodeComplete() {
complete_ = parent_.request_encoder_->encode_complete_;
void ConnPoolImpl::StreamWrapper::onDecodeComplete() {
decode_complete_ = encode_complete_;
parent_.parent_.onResponseComplete(parent_);
}

Expand Down
34 changes: 12 additions & 22 deletions source/common/http/http1/conn_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,39 +36,30 @@ class ConnPoolImpl : Logger::Loggable<Logger::Id::pool>, public ConnectionPool::
protected:
struct ActiveClient;

struct RequestEncoderWrapper : public StreamEncoderWrapper, public StreamCallbacks {
RequestEncoderWrapper(StreamEncoder& inner, ActiveClient& parent)
: StreamEncoderWrapper(inner), parent_(parent) {
inner.getStream().addCallbacks(*this);
}
struct StreamWrapper : public StreamEncoderWrapper,
public StreamDecoderWrapper,
public StreamCallbacks {
StreamWrapper(StreamDecoder& response_decoder, ActiveClient& parent);
~StreamWrapper();

// StreamEncoderWrapper
void onEncodeComplete() override;

// Http::StreamCallbacks
void onResetStream(StreamResetReason) override { parent_.parent_.onDownstreamReset(parent_); }

ActiveClient& parent_;
bool encode_complete_{};
};

typedef std::unique_ptr<RequestEncoderWrapper> RequestEncoderWrapperPtr;

struct ResponseDecoderWrapper : public StreamDecoderWrapper {
ResponseDecoderWrapper(StreamDecoder& inner, ActiveClient& parent);
~ResponseDecoderWrapper();

// StreamDecoderWrapper
void decodeHeaders(HeaderMapPtr&& headers, bool end_stream) override;
void onPreDecodeComplete() override {}
void onDecodeComplete() override;

// Http::StreamCallbacks
void onResetStream(StreamResetReason) override { parent_.parent_.onDownstreamReset(parent_); }

ActiveClient& parent_;
bool encode_complete_{};
bool saw_close_header_{};
bool complete_{};
bool decode_complete_{};
};

typedef std::unique_ptr<ResponseDecoderWrapper> ResponseDecoderWrapperPtr;
typedef std::unique_ptr<StreamWrapper> StreamWrapperPtr;

struct ActiveClient : LinkedObject<ActiveClient>,
public Network::ConnectionCallbacks,
Expand All @@ -86,8 +77,7 @@ class ConnPoolImpl : Logger::Loggable<Logger::Id::pool>, public ConnectionPool::
ConnPoolImpl& parent_;
CodecClientPtr codec_client_;
Upstream::HostDescriptionPtr real_host_description_;
RequestEncoderWrapperPtr request_encoder_;
ResponseDecoderWrapperPtr response_decoder_;
StreamWrapperPtr stream_wrapper_;
Event::TimerPtr connect_timer_;
Stats::TimespanPtr conn_length_;
uint64_t remaining_requests_;
Expand Down
12 changes: 0 additions & 12 deletions source/common/http/http2/codec_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -203,18 +203,6 @@ void ConnectionImpl::StreamImpl::resetStreamWorker(StreamResetReason reason) {
UNREFERENCED_PARAMETER(rc);
}

void ConnectionImpl::StreamImpl::runResetCallbacks(StreamResetReason reason) {
if (reset_callbacks_run_) {
return;
}

for (StreamCallbacks* callbacks : callbacks_) {
callbacks->onResetStream(reason);
}

reset_callbacks_run_ = true;
}

ConnectionImpl::~ConnectionImpl() { nghttp2_session_del(session_); }

void ConnectionImpl::dispatch(Buffer::Instance& data) {
Expand Down
Loading