Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions include/envoy/buffer/buffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,19 @@ class Instance {
*/
virtual void* linearize(uint32_t size) PURE;

/**
* Move a buffer into this buffer. As little copying is done as possible.
* @param rhs supplies the buffer to move.
*/
virtual void move(Instance& rhs) PURE;

/**
* Move a portion of a buffer into this buffer. As little copying is done as possible.
* @param rhs supplies the buffer to move.
* @param length supplies the amount of data to move.
*/
virtual void move(Instance& rhs, uint64_t length) PURE;

/**
* Search for an occurence of a buffer within the larger buffer.
* @param data supplies the data to search for.
Expand Down
6 changes: 3 additions & 3 deletions include/envoy/http/codec.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,10 @@ class StreamEncoder {

/**
* Encode a data frame.
* @param data supplies the data to encode.
* @param data supplies the data to encode. The data may be moved by the encoder.
* @param end_stream supplies whether this is the last data frame.
*/
virtual void encodeData(const Buffer::Instance& data, bool end_stream) PURE;
virtual void encodeData(Buffer::Instance& data, bool end_stream) PURE;

/**
* Encode trailers. This implicitly ends the stream.
Expand Down Expand Up @@ -62,7 +62,7 @@ class StreamDecoder {
* @param data supplies the decoded data.
* @param end_stream supplies whether this is the last data frame.
*/
virtual void decodeData(const Buffer::Instance& data, bool end_stream) PURE;
virtual void decodeData(Buffer::Instance& data, bool end_stream) PURE;

/**
* Called with a decoded trailers frame. This implicitly ends the stream.
Expand Down
17 changes: 17 additions & 0 deletions source/common/buffer/buffer_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,23 @@ void* ImplBase::linearize(uint32_t size) {
return evbuffer_pullup(&buffer(), size);
}

void ImplBase::move(Instance& rhs) {
// We do the static cast here because in practice we only have one buffer implementation right
// now and this is safe. Using the evbuffer move routines require having access to both evbuffers.
// This is a reasonable compromise in a high performance path where we want to maintain an
// abstraction in case we get rid of evbuffer later.
int rc = evbuffer_add_buffer(&buffer(), &static_cast<ImplBase&>(rhs).buffer());
ASSERT(rc == 0);
UNREFERENCED_PARAMETER(rc);
}

void ImplBase::move(Instance& rhs, uint64_t length) {
// See move() above for why we do the static cast.
int rc = evbuffer_remove_buffer(&static_cast<ImplBase&>(rhs).buffer(), &buffer(), length);
ASSERT(static_cast<uint64_t>(rc) == length);
UNREFERENCED_PARAMETER(rc);
}

ssize_t ImplBase::search(const void* data, uint64_t size, size_t start) const {
evbuffer_ptr start_ptr;
if (-1 == evbuffer_ptr_set(&buffer(), &start_ptr, start, EVBUFFER_PTR_SET)) {
Expand Down
2 changes: 2 additions & 0 deletions source/common/buffer/buffer_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ class ImplBase : public Instance {
uint64_t getRawSlices(RawSlice* out, uint64_t out_size) const override;
uint64_t length() const override;
void* linearize(uint32_t size) override;
void move(Instance& rhs) override;
void move(Instance& rhs, uint64_t length) override;
ssize_t search(const void* data, uint64_t size, size_t start) const override;

private:
Expand Down
2 changes: 1 addition & 1 deletion source/common/filter/echo.cc
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ namespace Filter {
Network::FilterStatus Echo::onData(Buffer::Instance& data) {
conn_log_trace("echo: got {} bytes", read_callbacks_->connection(), data.length());
read_callbacks_->connection().write(data);
data.drain(data.length());
ASSERT(0 == data.length());
return Network::FilterStatus::StopIteration;
}

Expand Down
4 changes: 2 additions & 2 deletions source/common/filter/tcp_proxy.cc
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ Network::FilterStatus TcpProxy::onData(Buffer::Instance& data) {

conn_log_trace("received {} bytes", read_callbacks_->connection(), data.length());
upstream_connection_->write(data);
data.drain(data.length());
ASSERT(0 == data.length());
return Network::FilterStatus::StopIteration;
}

Expand Down Expand Up @@ -147,7 +147,7 @@ void TcpProxy::onUpstreamBufferChange(Network::ConnectionBufferType type, uint64

void TcpProxy::onUpstreamData(Buffer::Instance& data) {
read_callbacks_->connection().write(data);
data.drain(data.length());
ASSERT(0 == data.length());
}

void TcpProxy::onUpstreamEvent(uint32_t event) {
Expand Down
5 changes: 2 additions & 3 deletions source/common/http/async_client_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -70,10 +70,9 @@ void AsyncRequestImpl::encodeData(Buffer::Instance& data, bool end_stream) {
log_trace("async http request response data (length={} end_stream={})", data.length(),
end_stream);
if (!response_->body()) {
response_->body(Buffer::InstancePtr{new Buffer::OwnedImpl(data)});
} else {
response_->body()->add(data);
response_->body(Buffer::InstancePtr{new Buffer::OwnedImpl()});
}
response_->body()->move(data);

if (end_stream) {
onComplete();
Expand Down
4 changes: 2 additions & 2 deletions source/common/http/codec_wrappers.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ class StreamDecoderWrapper : public StreamDecoder {
}
}

void decodeData(const Buffer::Instance& data, bool end_stream) override {
void decodeData(Buffer::Instance& data, bool end_stream) override {
if (end_stream) {
onPreDecodeComplete();
}
Expand Down Expand Up @@ -66,7 +66,7 @@ class StreamEncoderWrapper : public StreamEncoder {
}
}

void encodeData(const Buffer::Instance& data, bool end_stream) override {
void encodeData(Buffer::Instance& data, bool end_stream) override {
inner_.encodeData(data, end_stream);
if (end_stream) {
onEncodeComplete();
Expand Down
10 changes: 3 additions & 7 deletions source/common/http/conn_manager_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -420,19 +420,15 @@ void ConnectionManagerImpl::ActiveStream::decodeHeaders(ActiveStreamDecoderFilte
}
}

void ConnectionManagerImpl::ActiveStream::decodeData(const Buffer::Instance& data,
bool end_stream) {
void ConnectionManagerImpl::ActiveStream::decodeData(Buffer::Instance& data, bool end_stream) {
request_info_.bytes_received_ += data.length();
ASSERT(!state_.remote_complete_);
state_.remote_complete_ = end_stream;
if (state_.remote_complete_) {
stream_log_debug("request end stream", *this);
}

// We are fed data directly from codec buffers. Perform a single copy here so that filters can
// modify the data and potentially take ownership of it.
Buffer::OwnedImpl data_copy(data);
decodeData(nullptr, data_copy, end_stream);
decodeData(nullptr, data, end_stream);
}

void ConnectionManagerImpl::ActiveStream::decodeData(ActiveStreamDecoderFilter* filter,
Expand Down Expand Up @@ -702,7 +698,7 @@ void ConnectionManagerImpl::ActiveStreamFilterBase::commonHandleBufferData(
if (!bufferedData()) {
bufferedData().reset(new Buffer::OwnedImpl());
}
bufferedData()->add(provided_data);
bufferedData()->move(provided_data);
}
}

Expand Down
2 changes: 1 addition & 1 deletion source/common/http/conn_manager_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -341,7 +341,7 @@ class ConnectionManagerImpl : Logger::Loggable<Logger::Id::http>,

// Http::StreamDecoder
void decodeHeaders(HeaderMapPtr&& headers, bool end_stream) override;
void decodeData(const Buffer::Instance& data, bool end_stream) override;
void decodeData(Buffer::Instance& data, bool end_stream) override;
void decodeTrailers(HeaderMapPtr&& trailers) override;

// Http::FilterChainFactoryCallbacks
Expand Down
7 changes: 3 additions & 4 deletions source/common/http/http1/codec_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -82,15 +82,15 @@ void StreamEncoderImpl::encodeHeaders(const HeaderMap& headers, bool end_stream)
}
}

void StreamEncoderImpl::encodeData(const Buffer::Instance& data, bool end_stream) {
void StreamEncoderImpl::encodeData(Buffer::Instance& data, bool end_stream) {
// end_stream may be indicated with a zero length data buffer. If that is the case, so not
// atually write the zero length buffer out.
if (data.length() > 0) {
if (chunk_encoding_) {
output_buffer_.add(fmt::format("{:x}\r\n", data.length()));
}

output_buffer_.add(data);
output_buffer_.move(data);

if (chunk_encoding_) {
output_buffer_.add(CRLF);
Expand Down Expand Up @@ -378,8 +378,7 @@ void ServerConnectionImpl::onBody(const char* data, size_t length) {
ASSERT(!deferred_end_stream_headers_);
if (active_request_) {
conn_log_trace("body size={}", connection_, length);
Buffer::OwnedImpl buffer;
buffer.add(data, length);
Buffer::OwnedImpl buffer(data, length);
active_request_->request_decoder_->decodeData(buffer, false);
}
}
Expand Down
2 changes: 1 addition & 1 deletion source/common/http/http1/codec_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ class StreamEncoderImpl : public StreamEncoder, public Stream, Logger::Loggable<

// Http::StreamEncoder
void encodeHeaders(const HeaderMap& headers, bool end_stream) override;
void encodeData(const Buffer::Instance& data, bool end_stream) override;
void encodeData(Buffer::Instance& data, bool end_stream) override;
void encodeTrailers(const HeaderMap& trailers) override;
Stream& getStream() override { return *this; }

Expand Down
20 changes: 3 additions & 17 deletions source/common/http/http2/codec_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -139,23 +139,9 @@ int ConnectionImpl::StreamImpl::onDataSourceSend(const uint8_t* framehd, size_t
static const uint64_t FRAME_HEADER_SIZE = 9;

// TODO: Back pressure.
uint64_t length_remaining = length;
Buffer::OwnedImpl output(framehd, FRAME_HEADER_SIZE);
uint64_t num_slices = pending_send_data_.getRawSlices(nullptr, 0);
Buffer::RawSlice slices[num_slices];
pending_send_data_.getRawSlices(slices, num_slices);
for (Buffer::RawSlice& slice : slices) {
if (length_remaining == 0) {
break;
}

uint64_t data_to_write = std::min(length_remaining, slice.len_);
output.add(slice.mem_, data_to_write);
length_remaining -= data_to_write;
}

output.move(pending_send_data_, length);
parent_.connection_.write(output);
pending_send_data_.drain(length);
return 0;
}

Expand All @@ -176,10 +162,10 @@ void ConnectionImpl::ServerStreamImpl::submitHeaders(const std::vector<nghttp2_n
UNREFERENCED_PARAMETER(rc);
}

void ConnectionImpl::StreamImpl::encodeData(const Buffer::Instance& data, bool end_stream) {
void ConnectionImpl::StreamImpl::encodeData(Buffer::Instance& data, bool end_stream) {
ASSERT(!local_end_stream_);
local_end_stream_ = end_stream;
pending_send_data_.add(data);
pending_send_data_.move(data);
if (data_deferred_) {
int rc = nghttp2_session_resume_data(parent_.session_, stream_id_);
ASSERT(rc == 0);
Expand Down
2 changes: 1 addition & 1 deletion source/common/http/http2/codec_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ class ConnectionImpl : public virtual Connection, Logger::Loggable<Logger::Id::h

// Http::StreamEncoder
void encodeHeaders(const HeaderMap& headers, bool end_stream) override;
void encodeData(const Buffer::Instance& data, bool end_stream) override;
void encodeData(Buffer::Instance& data, bool end_stream) override;
void encodeTrailers(const HeaderMap& trailers) override;
Stream& getStream() override { return *this; }

Expand Down
4 changes: 2 additions & 2 deletions source/common/http/pooled_stream_encoder.cc
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,11 @@ void PooledStreamEncoder::encodeHeaders(const HeaderMap& headers, bool end_strea
}
}

void PooledStreamEncoder::encodeData(const Buffer::Instance& data, bool end_stream) {
void PooledStreamEncoder::encodeData(Buffer::Instance& data, bool end_stream) {
commonEncodePrefix(end_stream);
if (!request_encoder_) {
stream_log_trace("buffering {} bytes", *this, data.length());
buffered_request_body_.add(data);
buffered_request_body_.move(data);
} else {
stream_log_trace("proxying {} bytes", *this, data.length());
request_encoder_->encodeData(data, end_stream);
Expand Down
2 changes: 1 addition & 1 deletion source/common/http/pooled_stream_encoder.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ class PooledStreamEncoder final : public Logger::Loggable<Logger::Id::pool>,
PooledStreamEncoderCallbacks& callbacks);

void encodeHeaders(const HeaderMap& headers, bool end_stream);
void encodeData(const Buffer::Instance& data, bool end_stream);
void encodeData(Buffer::Instance& data, bool end_stream);
void encodeTrailers(const HeaderMap& trailers);
void resetStream();
uint64_t connectionId() { return connection_id_; }
Expand Down
12 changes: 3 additions & 9 deletions source/common/network/connection_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@ ConnectionImpl::ConnectionImpl(Event::DispatcherImpl& dispatcher,
: dispatcher_(dispatcher), bev_(std::move(bev)), remote_address_(remote_address),
id_(++next_global_id_), filter_manager_(*this, *this),
redispatch_read_event_(dispatcher.createTimer([this]() -> void { onRead(); })),
read_buffer_(bufferevent_get_input(bev_.get())) {
read_buffer_(bufferevent_get_input(bev_.get())),
write_buffer_(bufferevent_get_output(bev_.get())) {

enableCallbacks(true, false, true);
bufferevent_enable(bev_.get(), EV_READ | EV_WRITE);
Expand Down Expand Up @@ -262,14 +263,7 @@ void ConnectionImpl::write(Buffer::Instance& data) {

if (data.length() > 0) {
conn_log_trace("writing {} bytes", *this, data.length());
uint64_t num_slices = data.getRawSlices(nullptr, 0);
Buffer::RawSlice slices[num_slices];
data.getRawSlices(slices, num_slices);
for (Buffer::RawSlice& slice : slices) {
int rc = bufferevent_write(bev_.get(), slice.mem_, slice.len_);
ASSERT(rc == 0);
UNREFERENCED_PARAMETER(rc);
}
write_buffer_.move(data);
}
}

Expand Down
1 change: 1 addition & 0 deletions source/common/network/connection_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ class ConnectionImpl : public virtual Connection,
void fakeBufferDrain(ConnectionBufferType type, evbuffer* buffer);

Buffer::WrappedImpl read_buffer_;
Buffer::WrappedImpl write_buffer_;
Buffer::Instance* current_write_buffer_{};
};

Expand Down
29 changes: 19 additions & 10 deletions source/common/router/router.cc
Original file line number Diff line number Diff line change
Expand Up @@ -221,15 +221,24 @@ void Filter::sendNoHealthyUpstreamResponse() {
}

Http::FilterDataStatus Filter::decodeData(Buffer::Instance& data, bool end_stream) {
upstream_request_->upstream_encoder_->encodeData(data, end_stream);
bool buffering = (retry_state_ && retry_state_->enabled()) || do_shadowing_;

// If we are going to buffer for retries or shadowing, we need to make a copy before encoding
// since it's all moves from here on.
if (buffering) {
Buffer::OwnedImpl copy(data);
upstream_request_->upstream_encoder_->encodeData(copy, end_stream);
} else {
upstream_request_->upstream_encoder_->encodeData(data, end_stream);
}

if (end_stream) {
onRequestComplete();
}

// If we are potentially going to retry or shadow this request we need to buffer.
return (retry_state_ && retry_state_->enabled()) || do_shadowing_
? Http::FilterDataStatus::StopIterationAndBuffer
: Http::FilterDataStatus::StopIterationNoBuffer;
return buffering ? Http::FilterDataStatus::StopIterationAndBuffer
: Http::FilterDataStatus::StopIterationNoBuffer;
}

Http::FilterTrailersStatus Filter::decodeTrailers(Http::HeaderMap& trailers) {
Expand Down Expand Up @@ -416,13 +425,12 @@ void Filter::onUpstreamHeaders(Http::HeaderMapPtr&& headers, bool end_stream) {
callbacks_->encodeHeaders(std::move(headers), end_stream);
}

void Filter::onUpstreamData(const Buffer::Instance& data, bool end_stream) {
void Filter::onUpstreamData(Buffer::Instance& data, bool end_stream) {
if (end_stream) {
onUpstreamComplete();
}

Buffer::OwnedImpl copy(data);
callbacks_->encodeData(copy, end_stream);
callbacks_->encodeData(data, end_stream);
}

void Filter::onUpstreamTrailers(Http::HeaderMapPtr&& trailers) {
Expand Down Expand Up @@ -500,8 +508,9 @@ void Filter::doRetry() {
// It's possible we got immediately reset.
if (upstream_request_) {
if (callbacks_->decodingBuffer()) {
upstream_request_->upstream_encoder_->encodeData(*callbacks_->decodingBuffer(),
!downstream_trailers_);
// If we are doing a retry we need to make a copy.
Buffer::OwnedImpl copy(*callbacks_->decodingBuffer());
upstream_request_->upstream_encoder_->encodeData(copy, !downstream_trailers_);
}

if (downstream_trailers_) {
Expand Down Expand Up @@ -529,7 +538,7 @@ void Filter::UpstreamRequest::decodeHeaders(Http::HeaderMapPtr&& headers, bool e
parent_.onUpstreamHeaders(std::move(headers), end_stream);
}

void Filter::UpstreamRequest::decodeData(const Buffer::Instance& data, bool end_stream) {
void Filter::UpstreamRequest::decodeData(Buffer::Instance& data, bool end_stream) {
parent_.onUpstreamData(data, end_stream);
}

Expand Down
4 changes: 2 additions & 2 deletions source/common/router/router.h
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ class Filter : Logger::Loggable<Logger::Id::router>, public Http::StreamDecoderF

// Http::StreamDecoder
void decodeHeaders(Http::HeaderMapPtr&& headers, bool end_stream) override;
void decodeData(const Buffer::Instance& data, bool end_stream) override;
void decodeData(Buffer::Instance& data, bool end_stream) override;
void decodeTrailers(Http::HeaderMapPtr&& trailers) override;

// Http::StreamCallbacks
Expand Down Expand Up @@ -159,7 +159,7 @@ class Filter : Logger::Loggable<Logger::Id::router>, public Http::StreamDecoderF
void onResetStream();
void onResponseTimeout();
void onUpstreamHeaders(Http::HeaderMapPtr&& headers, bool end_stream);
void onUpstreamData(const Buffer::Instance& data, bool end_stream);
void onUpstreamData(Buffer::Instance& data, bool end_stream);
void onUpstreamTrailers(Http::HeaderMapPtr&& trailers);
void onUpstreamComplete();
void onUpstreamReset(UpstreamResetType type,
Expand Down
Loading