diff --git a/include/envoy/buffer/buffer.h b/include/envoy/buffer/buffer.h index d9cc51ee2223a..5add75fd24fd6 100644 --- a/include/envoy/buffer/buffer.h +++ b/include/envoy/buffer/buffer.h @@ -64,6 +64,19 @@ class Instance { */ virtual void* linearize(uint32_t size) PURE; + /** + * Move a buffer into this buffer. As little copying is done as possible. + * @param rhs supplies the buffer to move. + */ + virtual void move(Instance& rhs) PURE; + + /** + * Move a portion of a buffer into this buffer. As little copying is done as possible. + * @param rhs supplies the buffer to move. + * @param length supplies the amount of data to move. + */ + virtual void move(Instance& rhs, uint64_t length) PURE; + /** * Search for an occurence of a buffer within the larger buffer. * @param data supplies the data to search for. diff --git a/include/envoy/http/codec.h b/include/envoy/http/codec.h index 619d04d7f06e1..e0ffbfc3c81b1 100644 --- a/include/envoy/http/codec.h +++ b/include/envoy/http/codec.h @@ -25,10 +25,10 @@ class StreamEncoder { /** * Encode a data frame. - * @param data supplies the data to encode. + * @param data supplies the data to encode. The data may be moved by the encoder. * @param end_stream supplies whether this is the last data frame. */ - virtual void encodeData(const Buffer::Instance& data, bool end_stream) PURE; + virtual void encodeData(Buffer::Instance& data, bool end_stream) PURE; /** * Encode trailers. This implicitly ends the stream. @@ -62,7 +62,7 @@ class StreamDecoder { * @param data supplies the decoded data. * @param end_stream supplies whether this is the last data frame. */ - virtual void decodeData(const Buffer::Instance& data, bool end_stream) PURE; + virtual void decodeData(Buffer::Instance& data, bool end_stream) PURE; /** * Called with a decoded trailers frame. This implicitly ends the stream. diff --git a/source/common/buffer/buffer_impl.cc b/source/common/buffer/buffer_impl.cc index 5fe94053b2740..a3acf6d91acf4 100644 --- a/source/common/buffer/buffer_impl.cc +++ b/source/common/buffer/buffer_impl.cc @@ -43,6 +43,23 @@ void* ImplBase::linearize(uint32_t size) { return evbuffer_pullup(&buffer(), size); } +void ImplBase::move(Instance& rhs) { + // We do the static cast here because in practice we only have one buffer implementation right + // now and this is safe. Using the evbuffer move routines require having access to both evbuffers. + // This is a reasonable compromise in a high performance path where we want to maintain an + // abstraction in case we get rid of evbuffer later. + int rc = evbuffer_add_buffer(&buffer(), &static_cast(rhs).buffer()); + ASSERT(rc == 0); + UNREFERENCED_PARAMETER(rc); +} + +void ImplBase::move(Instance& rhs, uint64_t length) { + // See move() above for why we do the static cast. + int rc = evbuffer_remove_buffer(&static_cast(rhs).buffer(), &buffer(), length); + ASSERT(static_cast(rc) == length); + UNREFERENCED_PARAMETER(rc); +} + ssize_t ImplBase::search(const void* data, uint64_t size, size_t start) const { evbuffer_ptr start_ptr; if (-1 == evbuffer_ptr_set(&buffer(), &start_ptr, start, EVBUFFER_PTR_SET)) { diff --git a/source/common/buffer/buffer_impl.h b/source/common/buffer/buffer_impl.h index 2c3b74b6ed1dc..4d7aaa73c481c 100644 --- a/source/common/buffer/buffer_impl.h +++ b/source/common/buffer/buffer_impl.h @@ -19,6 +19,8 @@ class ImplBase : public Instance { uint64_t getRawSlices(RawSlice* out, uint64_t out_size) const override; uint64_t length() const override; void* linearize(uint32_t size) override; + void move(Instance& rhs) override; + void move(Instance& rhs, uint64_t length) override; ssize_t search(const void* data, uint64_t size, size_t start) const override; private: diff --git a/source/common/filter/echo.cc b/source/common/filter/echo.cc index dced746131b23..32e5d352d40bc 100644 --- a/source/common/filter/echo.cc +++ b/source/common/filter/echo.cc @@ -10,7 +10,7 @@ namespace Filter { Network::FilterStatus Echo::onData(Buffer::Instance& data) { conn_log_trace("echo: got {} bytes", read_callbacks_->connection(), data.length()); read_callbacks_->connection().write(data); - data.drain(data.length()); + ASSERT(0 == data.length()); return Network::FilterStatus::StopIteration; } diff --git a/source/common/filter/tcp_proxy.cc b/source/common/filter/tcp_proxy.cc index 4d8524ba95a08..e768082734779 100644 --- a/source/common/filter/tcp_proxy.cc +++ b/source/common/filter/tcp_proxy.cc @@ -107,7 +107,7 @@ Network::FilterStatus TcpProxy::onData(Buffer::Instance& data) { conn_log_trace("received {} bytes", read_callbacks_->connection(), data.length()); upstream_connection_->write(data); - data.drain(data.length()); + ASSERT(0 == data.length()); return Network::FilterStatus::StopIteration; } @@ -147,7 +147,7 @@ void TcpProxy::onUpstreamBufferChange(Network::ConnectionBufferType type, uint64 void TcpProxy::onUpstreamData(Buffer::Instance& data) { read_callbacks_->connection().write(data); - data.drain(data.length()); + ASSERT(0 == data.length()); } void TcpProxy::onUpstreamEvent(uint32_t event) { diff --git a/source/common/http/async_client_impl.cc b/source/common/http/async_client_impl.cc index c5a716d68ecff..602316772da0c 100644 --- a/source/common/http/async_client_impl.cc +++ b/source/common/http/async_client_impl.cc @@ -70,10 +70,9 @@ void AsyncRequestImpl::encodeData(Buffer::Instance& data, bool end_stream) { log_trace("async http request response data (length={} end_stream={})", data.length(), end_stream); if (!response_->body()) { - response_->body(Buffer::InstancePtr{new Buffer::OwnedImpl(data)}); - } else { - response_->body()->add(data); + response_->body(Buffer::InstancePtr{new Buffer::OwnedImpl()}); } + response_->body()->move(data); if (end_stream) { onComplete(); diff --git a/source/common/http/codec_wrappers.h b/source/common/http/codec_wrappers.h index 04c4f568d4c28..47136461fa4f0 100644 --- a/source/common/http/codec_wrappers.h +++ b/source/common/http/codec_wrappers.h @@ -22,7 +22,7 @@ class StreamDecoderWrapper : public StreamDecoder { } } - void decodeData(const Buffer::Instance& data, bool end_stream) override { + void decodeData(Buffer::Instance& data, bool end_stream) override { if (end_stream) { onPreDecodeComplete(); } @@ -66,7 +66,7 @@ class StreamEncoderWrapper : public StreamEncoder { } } - void encodeData(const Buffer::Instance& data, bool end_stream) override { + void encodeData(Buffer::Instance& data, bool end_stream) override { inner_.encodeData(data, end_stream); if (end_stream) { onEncodeComplete(); diff --git a/source/common/http/conn_manager_impl.cc b/source/common/http/conn_manager_impl.cc index f41ab0522a5cf..b98417ecc4cb5 100644 --- a/source/common/http/conn_manager_impl.cc +++ b/source/common/http/conn_manager_impl.cc @@ -420,8 +420,7 @@ void ConnectionManagerImpl::ActiveStream::decodeHeaders(ActiveStreamDecoderFilte } } -void ConnectionManagerImpl::ActiveStream::decodeData(const Buffer::Instance& data, - bool end_stream) { +void ConnectionManagerImpl::ActiveStream::decodeData(Buffer::Instance& data, bool end_stream) { request_info_.bytes_received_ += data.length(); ASSERT(!state_.remote_complete_); state_.remote_complete_ = end_stream; @@ -429,10 +428,7 @@ void ConnectionManagerImpl::ActiveStream::decodeData(const Buffer::Instance& dat stream_log_debug("request end stream", *this); } - // We are fed data directly from codec buffers. Perform a single copy here so that filters can - // modify the data and potentially take ownership of it. - Buffer::OwnedImpl data_copy(data); - decodeData(nullptr, data_copy, end_stream); + decodeData(nullptr, data, end_stream); } void ConnectionManagerImpl::ActiveStream::decodeData(ActiveStreamDecoderFilter* filter, @@ -702,7 +698,7 @@ void ConnectionManagerImpl::ActiveStreamFilterBase::commonHandleBufferData( if (!bufferedData()) { bufferedData().reset(new Buffer::OwnedImpl()); } - bufferedData()->add(provided_data); + bufferedData()->move(provided_data); } } diff --git a/source/common/http/conn_manager_impl.h b/source/common/http/conn_manager_impl.h index fb2c9a0a78ef2..082f884e6dbbe 100644 --- a/source/common/http/conn_manager_impl.h +++ b/source/common/http/conn_manager_impl.h @@ -341,7 +341,7 @@ class ConnectionManagerImpl : Logger::Loggable, // Http::StreamDecoder void decodeHeaders(HeaderMapPtr&& headers, bool end_stream) override; - void decodeData(const Buffer::Instance& data, bool end_stream) override; + void decodeData(Buffer::Instance& data, bool end_stream) override; void decodeTrailers(HeaderMapPtr&& trailers) override; // Http::FilterChainFactoryCallbacks diff --git a/source/common/http/http1/codec_impl.cc b/source/common/http/http1/codec_impl.cc index 10dbc9c78966e..b6dc28478439a 100644 --- a/source/common/http/http1/codec_impl.cc +++ b/source/common/http/http1/codec_impl.cc @@ -82,7 +82,7 @@ void StreamEncoderImpl::encodeHeaders(const HeaderMap& headers, bool end_stream) } } -void StreamEncoderImpl::encodeData(const Buffer::Instance& data, bool end_stream) { +void StreamEncoderImpl::encodeData(Buffer::Instance& data, bool end_stream) { // end_stream may be indicated with a zero length data buffer. If that is the case, so not // atually write the zero length buffer out. if (data.length() > 0) { @@ -90,7 +90,7 @@ void StreamEncoderImpl::encodeData(const Buffer::Instance& data, bool end_stream output_buffer_.add(fmt::format("{:x}\r\n", data.length())); } - output_buffer_.add(data); + output_buffer_.move(data); if (chunk_encoding_) { output_buffer_.add(CRLF); @@ -378,8 +378,7 @@ void ServerConnectionImpl::onBody(const char* data, size_t length) { ASSERT(!deferred_end_stream_headers_); if (active_request_) { conn_log_trace("body size={}", connection_, length); - Buffer::OwnedImpl buffer; - buffer.add(data, length); + Buffer::OwnedImpl buffer(data, length); active_request_->request_decoder_->decodeData(buffer, false); } } diff --git a/source/common/http/http1/codec_impl.h b/source/common/http/http1/codec_impl.h index 20140e4ace1e2..f53867d40888d 100644 --- a/source/common/http/http1/codec_impl.h +++ b/source/common/http/http1/codec_impl.h @@ -28,7 +28,7 @@ class StreamEncoderImpl : public StreamEncoder, public Stream, Logger::Loggable< // Http::StreamEncoder void encodeHeaders(const HeaderMap& headers, bool end_stream) override; - void encodeData(const Buffer::Instance& data, bool end_stream) override; + void encodeData(Buffer::Instance& data, bool end_stream) override; void encodeTrailers(const HeaderMap& trailers) override; Stream& getStream() override { return *this; } diff --git a/source/common/http/http2/codec_impl.cc b/source/common/http/http2/codec_impl.cc index 2ea79445dde38..6b7b8604461d9 100644 --- a/source/common/http/http2/codec_impl.cc +++ b/source/common/http/http2/codec_impl.cc @@ -139,23 +139,9 @@ int ConnectionImpl::StreamImpl::onDataSourceSend(const uint8_t* framehd, size_t static const uint64_t FRAME_HEADER_SIZE = 9; // TODO: Back pressure. - uint64_t length_remaining = length; Buffer::OwnedImpl output(framehd, FRAME_HEADER_SIZE); - uint64_t num_slices = pending_send_data_.getRawSlices(nullptr, 0); - Buffer::RawSlice slices[num_slices]; - pending_send_data_.getRawSlices(slices, num_slices); - for (Buffer::RawSlice& slice : slices) { - if (length_remaining == 0) { - break; - } - - uint64_t data_to_write = std::min(length_remaining, slice.len_); - output.add(slice.mem_, data_to_write); - length_remaining -= data_to_write; - } - + output.move(pending_send_data_, length); parent_.connection_.write(output); - pending_send_data_.drain(length); return 0; } @@ -176,10 +162,10 @@ void ConnectionImpl::ServerStreamImpl::submitHeaders(const std::vectorencodeData(data, end_stream); diff --git a/source/common/http/pooled_stream_encoder.h b/source/common/http/pooled_stream_encoder.h index 4cbc693d5a454..fb9e63da778dd 100644 --- a/source/common/http/pooled_stream_encoder.h +++ b/source/common/http/pooled_stream_encoder.h @@ -38,7 +38,7 @@ class PooledStreamEncoder final : public Logger::Loggable, PooledStreamEncoderCallbacks& callbacks); void encodeHeaders(const HeaderMap& headers, bool end_stream); - void encodeData(const Buffer::Instance& data, bool end_stream); + void encodeData(Buffer::Instance& data, bool end_stream); void encodeTrailers(const HeaderMap& trailers); void resetStream(); uint64_t connectionId() { return connection_id_; } diff --git a/source/common/network/connection_impl.cc b/source/common/network/connection_impl.cc index 780bdd2d2e00d..4ee0c612cb9fc 100644 --- a/source/common/network/connection_impl.cc +++ b/source/common/network/connection_impl.cc @@ -38,7 +38,8 @@ ConnectionImpl::ConnectionImpl(Event::DispatcherImpl& dispatcher, : dispatcher_(dispatcher), bev_(std::move(bev)), remote_address_(remote_address), id_(++next_global_id_), filter_manager_(*this, *this), redispatch_read_event_(dispatcher.createTimer([this]() -> void { onRead(); })), - read_buffer_(bufferevent_get_input(bev_.get())) { + read_buffer_(bufferevent_get_input(bev_.get())), + write_buffer_(bufferevent_get_output(bev_.get())) { enableCallbacks(true, false, true); bufferevent_enable(bev_.get(), EV_READ | EV_WRITE); @@ -262,14 +263,7 @@ void ConnectionImpl::write(Buffer::Instance& data) { if (data.length() > 0) { conn_log_trace("writing {} bytes", *this, data.length()); - uint64_t num_slices = data.getRawSlices(nullptr, 0); - Buffer::RawSlice slices[num_slices]; - data.getRawSlices(slices, num_slices); - for (Buffer::RawSlice& slice : slices) { - int rc = bufferevent_write(bev_.get(), slice.mem_, slice.len_); - ASSERT(rc == 0); - UNREFERENCED_PARAMETER(rc); - } + write_buffer_.move(data); } } diff --git a/source/common/network/connection_impl.h b/source/common/network/connection_impl.h index 7f8d149616ef7..16cdc0145d391 100644 --- a/source/common/network/connection_impl.h +++ b/source/common/network/connection_impl.h @@ -108,6 +108,7 @@ class ConnectionImpl : public virtual Connection, void fakeBufferDrain(ConnectionBufferType type, evbuffer* buffer); Buffer::WrappedImpl read_buffer_; + Buffer::WrappedImpl write_buffer_; Buffer::Instance* current_write_buffer_{}; }; diff --git a/source/common/router/router.cc b/source/common/router/router.cc index 0d59647760b3f..c413777243294 100644 --- a/source/common/router/router.cc +++ b/source/common/router/router.cc @@ -221,15 +221,24 @@ void Filter::sendNoHealthyUpstreamResponse() { } Http::FilterDataStatus Filter::decodeData(Buffer::Instance& data, bool end_stream) { - upstream_request_->upstream_encoder_->encodeData(data, end_stream); + bool buffering = (retry_state_ && retry_state_->enabled()) || do_shadowing_; + + // If we are going to buffer for retries or shadowing, we need to make a copy before encoding + // since it's all moves from here on. + if (buffering) { + Buffer::OwnedImpl copy(data); + upstream_request_->upstream_encoder_->encodeData(copy, end_stream); + } else { + upstream_request_->upstream_encoder_->encodeData(data, end_stream); + } + if (end_stream) { onRequestComplete(); } // If we are potentially going to retry or shadow this request we need to buffer. - return (retry_state_ && retry_state_->enabled()) || do_shadowing_ - ? Http::FilterDataStatus::StopIterationAndBuffer - : Http::FilterDataStatus::StopIterationNoBuffer; + return buffering ? Http::FilterDataStatus::StopIterationAndBuffer + : Http::FilterDataStatus::StopIterationNoBuffer; } Http::FilterTrailersStatus Filter::decodeTrailers(Http::HeaderMap& trailers) { @@ -416,13 +425,12 @@ void Filter::onUpstreamHeaders(Http::HeaderMapPtr&& headers, bool end_stream) { callbacks_->encodeHeaders(std::move(headers), end_stream); } -void Filter::onUpstreamData(const Buffer::Instance& data, bool end_stream) { +void Filter::onUpstreamData(Buffer::Instance& data, bool end_stream) { if (end_stream) { onUpstreamComplete(); } - Buffer::OwnedImpl copy(data); - callbacks_->encodeData(copy, end_stream); + callbacks_->encodeData(data, end_stream); } void Filter::onUpstreamTrailers(Http::HeaderMapPtr&& trailers) { @@ -500,8 +508,9 @@ void Filter::doRetry() { // It's possible we got immediately reset. if (upstream_request_) { if (callbacks_->decodingBuffer()) { - upstream_request_->upstream_encoder_->encodeData(*callbacks_->decodingBuffer(), - !downstream_trailers_); + // If we are doing a retry we need to make a copy. + Buffer::OwnedImpl copy(*callbacks_->decodingBuffer()); + upstream_request_->upstream_encoder_->encodeData(copy, !downstream_trailers_); } if (downstream_trailers_) { @@ -529,7 +538,7 @@ void Filter::UpstreamRequest::decodeHeaders(Http::HeaderMapPtr&& headers, bool e parent_.onUpstreamHeaders(std::move(headers), end_stream); } -void Filter::UpstreamRequest::decodeData(const Buffer::Instance& data, bool end_stream) { +void Filter::UpstreamRequest::decodeData(Buffer::Instance& data, bool end_stream) { parent_.onUpstreamData(data, end_stream); } diff --git a/source/common/router/router.h b/source/common/router/router.h index c87be5e2627cf..833f43640c7ef 100644 --- a/source/common/router/router.h +++ b/source/common/router/router.h @@ -118,7 +118,7 @@ class Filter : Logger::Loggable, public Http::StreamDecoderF // Http::StreamDecoder void decodeHeaders(Http::HeaderMapPtr&& headers, bool end_stream) override; - void decodeData(const Buffer::Instance& data, bool end_stream) override; + void decodeData(Buffer::Instance& data, bool end_stream) override; void decodeTrailers(Http::HeaderMapPtr&& trailers) override; // Http::StreamCallbacks @@ -159,7 +159,7 @@ class Filter : Logger::Loggable, public Http::StreamDecoderF void onResetStream(); void onResponseTimeout(); void onUpstreamHeaders(Http::HeaderMapPtr&& headers, bool end_stream); - void onUpstreamData(const Buffer::Instance& data, bool end_stream); + void onUpstreamData(Buffer::Instance& data, bool end_stream); void onUpstreamTrailers(Http::HeaderMapPtr&& trailers); void onUpstreamComplete(); void onUpstreamReset(UpstreamResetType type, diff --git a/source/common/upstream/health_checker_impl.h b/source/common/upstream/health_checker_impl.h index b7a698f197781..a9e9984ed41a7 100644 --- a/source/common/upstream/health_checker_impl.h +++ b/source/common/upstream/health_checker_impl.h @@ -125,7 +125,7 @@ class HttpHealthCheckerImpl : public HealthCheckerImplBase { // Http::StreamDecoder void decodeHeaders(Http::HeaderMapPtr&& headers, bool end_stream) override; - void decodeData(const Buffer::Instance&, bool end_stream) override { + void decodeData(Buffer::Instance&, bool end_stream) override { if (end_stream) { onResponseComplete(); } diff --git a/test/common/http/conn_manager_impl_test.cc b/test/common/http/conn_manager_impl_test.cc index 6bf710250ac35..2ec938fd28115 100644 --- a/test/common/http/conn_manager_impl_test.cc +++ b/test/common/http/conn_manager_impl_test.cc @@ -515,7 +515,10 @@ TEST_F(HttpConnectionManagerImplTest, DoubleBuffering) { NiceMock encoder; Http::StreamDecoder* decoder = nullptr; + + // The data will get moved so we need to have a copy to compare against. Buffer::OwnedImpl fake_data("hello"); + Buffer::OwnedImpl fake_data_copy("hello"); EXPECT_CALL(*codec_, dispatch(_)) .WillOnce(Invoke([&](Buffer::Instance&) -> void { decoder = &conn_manager_->newStream(encoder); @@ -540,7 +543,7 @@ TEST_F(HttpConnectionManagerImplTest, DoubleBuffering) { // data to have been kept inline as it moves through. EXPECT_CALL(*decoder_filter3, decodeHeaders(_, false)) .WillOnce(Return(Http::FilterHeadersStatus::StopIteration)); - EXPECT_CALL(*decoder_filter3, decodeData(BufferEqual(&fake_data), true)) + EXPECT_CALL(*decoder_filter3, decodeData(BufferEqual(&fake_data_copy), true)) .WillOnce(Return(Http::FilterDataStatus::StopIterationNoBuffer)); decoder_filter2->callbacks_->continueDecoding(); } diff --git a/test/common/http/http2/codec_impl_test.cc b/test/common/http/http2/codec_impl_test.cc index 243d5facd3a60..ac87a7722e9bd 100644 --- a/test/common/http/http2/codec_impl_test.cc +++ b/test/common/http/http2/codec_impl_test.cc @@ -145,7 +145,8 @@ TEST_P(Http2CodecImplTest, TrailingHeaders) { EXPECT_CALL(request_decoder, decodeHeaders_(_, false)); request_encoder.encodeHeaders(request_headers, false); EXPECT_CALL(request_decoder, decodeData(_, false)); - request_encoder.encodeData(Buffer::OwnedImpl("hello"), false); + Buffer::OwnedImpl hello("hello"); + request_encoder.encodeData(hello, false); EXPECT_CALL(request_decoder, decodeTrailers_(_)); request_encoder.encodeTrailers(HeaderMapImpl{{"trailing", "header"}}); @@ -153,7 +154,8 @@ TEST_P(Http2CodecImplTest, TrailingHeaders) { EXPECT_CALL(response_decoder, decodeHeaders_(_, false)); response_encoder->encodeHeaders(response_headers, false); EXPECT_CALL(response_decoder, decodeData(_, false)); - response_encoder->encodeData(Buffer::OwnedImpl("world"), false); + Buffer::OwnedImpl world("world"); + response_encoder->encodeData(world, false); EXPECT_CALL(response_decoder, decodeTrailers_(_)); response_encoder->encodeTrailers(HeaderMapImpl{{"trailing", "header"}}); } @@ -180,7 +182,8 @@ TEST_P(Http2CodecImplTest, TrailingHeadersLargeBody) { EXPECT_CALL(request_decoder, decodeHeaders_(_, false)); request_encoder.encodeHeaders(request_headers, false); EXPECT_CALL(request_decoder, decodeData(_, false)).Times(AtLeast(1)); - request_encoder.encodeData(Buffer::OwnedImpl(std::string(1024 * 1024, 'a')), false); + Buffer::OwnedImpl body(std::string(1024 * 1024, 'a')); + request_encoder.encodeData(body, false); EXPECT_CALL(request_decoder, decodeTrailers_(_)); request_encoder.encodeTrailers(HeaderMapImpl{{"trailing", "header"}}); @@ -192,7 +195,8 @@ TEST_P(Http2CodecImplTest, TrailingHeadersLargeBody) { EXPECT_CALL(response_decoder, decodeHeaders_(_, false)); response_encoder->encodeHeaders(response_headers, false); EXPECT_CALL(response_decoder, decodeData(_, false)); - response_encoder->encodeData(Buffer::OwnedImpl("world"), false); + Buffer::OwnedImpl world("world"); + response_encoder->encodeData(world, false); EXPECT_CALL(response_decoder, decodeTrailers_(_)); response_encoder->encodeTrailers(HeaderMapImpl{{"trailing", "header"}}); } diff --git a/test/integration/fake_upstream.cc b/test/integration/fake_upstream.cc index 3070bd79b134c..ac9f97a22d7b3 100644 --- a/test/integration/fake_upstream.cc +++ b/test/integration/fake_upstream.cc @@ -22,7 +22,7 @@ void FakeStream::decodeHeaders(Http::HeaderMapPtr&& headers, bool end_stream) { decoder_event_.notify_one(); } -void FakeStream::decodeData(const Buffer::Instance& data, bool end_stream) { +void FakeStream::decodeData(Buffer::Instance& data, bool end_stream) { std::unique_lock lock(lock_); end_stream_ = end_stream; body_length_ += data.length(); diff --git a/test/integration/fake_upstream.h b/test/integration/fake_upstream.h index e1100cd4a2d5a..32b54b12212ae 100644 --- a/test/integration/fake_upstream.h +++ b/test/integration/fake_upstream.h @@ -36,7 +36,7 @@ class FakeStream : public Http::StreamDecoder, public Http::StreamCallbacks { // Http::StreamDecoder void decodeHeaders(Http::HeaderMapPtr&& headers, bool end_stream) override; - void decodeData(const Buffer::Instance& data, bool end_stream) override; + void decodeData(Buffer::Instance& data, bool end_stream) override; void decodeTrailers(Http::HeaderMapPtr&& trailers) override; // Http::StreamCallbacks diff --git a/test/integration/integration.cc b/test/integration/integration.cc index 4cdaf30bd7dd8..85500d606a67a 100644 --- a/test/integration/integration.cc +++ b/test/integration/integration.cc @@ -43,7 +43,7 @@ void IntegrationStreamDecoder::decodeHeaders(Http::HeaderMapPtr&& headers, bool } } -void IntegrationStreamDecoder::decodeData(const Buffer::Instance& data, bool end_stream) { +void IntegrationStreamDecoder::decodeData(Buffer::Instance& data, bool end_stream) { saw_end_stream_ = end_stream; uint64_t num_slices = data.getRawSlices(nullptr, 0); Buffer::RawSlice slices[num_slices]; diff --git a/test/integration/integration.h b/test/integration/integration.h index 399d906f52582..f748da86d3509 100644 --- a/test/integration/integration.h +++ b/test/integration/integration.h @@ -24,7 +24,7 @@ class IntegrationStreamDecoder : public Http::StreamDecoder, public Http::Stream // Http::StreamDecoder void decodeHeaders(Http::HeaderMapPtr&& headers, bool end_stream) override; - void decodeData(const Buffer::Instance& data, bool end_stream) override; + void decodeData(Buffer::Instance& data, bool end_stream) override; void decodeTrailers(Http::HeaderMapPtr&& trailers) override; // Http::StreamCallbacks diff --git a/test/integration/utility.cc b/test/integration/utility.cc index 4aaef6a00c4e5..028900bdf0797 100644 --- a/test/integration/utility.cc +++ b/test/integration/utility.cc @@ -21,7 +21,7 @@ void BufferingStreamDecoder::decodeHeaders(Http::HeaderMapPtr&& headers, bool en } } -void BufferingStreamDecoder::decodeData(const Buffer::Instance& data, bool end_stream) { +void BufferingStreamDecoder::decodeData(Buffer::Instance& data, bool end_stream) { ASSERT(!complete_); complete_ = end_stream; body_.append(TestUtility::bufferToString(data)); @@ -61,7 +61,8 @@ IntegrationUtil::makeSingleRequest(uint32_t port, const std::string& method, con headers.addViaMoveValue(Http::Headers::get().Scheme, "http"); encoder.encodeHeaders(headers, body.empty()); if (!body.empty()) { - encoder.encodeData(Buffer::OwnedImpl(body), true); + Buffer::OwnedImpl body_buffer(body); + encoder.encodeData(body_buffer, true); } dispatcher->run(Event::Dispatcher::RunType::Block); diff --git a/test/integration/utility.h b/test/integration/utility.h index 828e9d92070d1..9ece48e48039c 100644 --- a/test/integration/utility.h +++ b/test/integration/utility.h @@ -20,7 +20,7 @@ class BufferingStreamDecoder : public Http::StreamDecoder, public Http::StreamCa // Http::StreamDecoder void decodeHeaders(Http::HeaderMapPtr&& headers, bool end_stream) override; - void decodeData(const Buffer::Instance&, bool end_stream) override; + void decodeData(Buffer::Instance&, bool end_stream) override; void decodeTrailers(Http::HeaderMapPtr&& trailers) override; // Http::StreamCallbacks diff --git a/test/mocks/http/mocks.h b/test/mocks/http/mocks.h index 9257a968d9249..fd1f7aeed0cd1 100644 --- a/test/mocks/http/mocks.h +++ b/test/mocks/http/mocks.h @@ -109,7 +109,7 @@ class MockStreamDecoder : public StreamDecoder { // Http::StreamDecoder MOCK_METHOD2(decodeHeaders_, void(HeaderMapPtr& headers, bool end_stream)); - MOCK_METHOD2(decodeData, void(const Buffer::Instance& data, bool end_stream)); + MOCK_METHOD2(decodeData, void(Buffer::Instance& data, bool end_stream)); MOCK_METHOD1(decodeTrailers_, void(HeaderMapPtr& trailers)); }; @@ -142,7 +142,7 @@ class MockStreamEncoder : public StreamEncoder { // Http::StreamEncoder MOCK_METHOD2(encodeHeaders, void(const HeaderMap& headers, bool end_stream)); - MOCK_METHOD2(encodeData, void(const Buffer::Instance& data, bool end_stream)); + MOCK_METHOD2(encodeData, void(Buffer::Instance& data, bool end_stream)); MOCK_METHOD1(encodeTrailers, void(const HeaderMap& trailers)); MOCK_METHOD0(getStream, Stream&()); diff --git a/test/mocks/network/mocks.cc b/test/mocks/network/mocks.cc index 0b10870569a2f..7a4bcb23abe57 100644 --- a/test/mocks/network/mocks.cc +++ b/test/mocks/network/mocks.cc @@ -44,6 +44,11 @@ template static void initializeMockConnection(T& connection) { ON_CALL(connection, remoteAddress()).WillByDefault(ReturnRef(connection.remote_address_)); ON_CALL(connection, id()).WillByDefault(Return(connection.next_id_)); ON_CALL(connection, state()).WillByDefault(ReturnPointee(&connection.state_)); + + // The real implementation will move the buffer data into the socket. + ON_CALL(connection, write(_)) + .WillByDefault( + Invoke([](Buffer::Instance& buffer) -> void { buffer.drain(buffer.length()); })); } MockConnection::MockConnection() { initializeMockConnection(*this); }