diff --git a/include/envoy/buffer/buffer.h b/include/envoy/buffer/buffer.h index aca59b31d6957..6e4f52644e37a 100644 --- a/include/envoy/buffer/buffer.h +++ b/include/envoy/buffer/buffer.h @@ -16,6 +16,7 @@ #include "absl/container/inlined_vector.h" #include "absl/strings/string_view.h" #include "absl/types/optional.h" +#include "absl/types/span.h" namespace Envoy { namespace Buffer { @@ -55,6 +56,21 @@ class BufferFragment { virtual void done() PURE; }; +/** + * A class to facilitate extracting buffer slices from a buffer instance. + */ +class SliceData { +public: + virtual ~SliceData() = default; + + /** + * @return a mutable view of the slice data. + */ + virtual absl::Span getMutableData() PURE; +}; + +using SliceDataPtr = std::unique_ptr; + /** * A basic buffer abstraction. */ @@ -144,6 +160,15 @@ class Instance { virtual RawSliceVector getRawSlices(absl::optional max_slices = absl::nullopt) const PURE; + /** + * Transfer ownership of the front slice to the caller. Must only be called if the + * buffer is not empty otherwise the implementation will have undefined behavior. + * If the underlying slice is immutable then the implementation must create and return + * a mutable slice that has a copy of the immutable data. + * @return pointer to SliceData object that wraps the front slice + */ + virtual SliceDataPtr extractMutableFrontSlice() PURE; + /** * @return uint64_t the total length of the buffer (not necessarily contiguous in memory). */ diff --git a/source/common/buffer/buffer_impl.cc b/source/common/buffer/buffer_impl.cc index 7503104ea426a..b69453c08437f 100644 --- a/source/common/buffer/buffer_impl.cc +++ b/source/common/buffer/buffer_impl.cc @@ -199,6 +199,34 @@ RawSliceVector OwnedImpl::getRawSlices(absl::optional max_slices) cons return raw_slices; } +SliceDataPtr OwnedImpl::extractMutableFrontSlice() { + RELEASE_ASSERT(length_ > 0, "Extract called on empty buffer"); + // Remove zero byte fragments from the front of the queue to ensure + // that the extracted slice has data. + while (!slices_.empty() && slices_.front()->dataSize() == 0) { + slices_.pop_front(); + } + ASSERT(!slices_.empty()); + ASSERT(slices_.front()); + auto slice = std::move(slices_.front()); + auto size = slice->dataSize(); + length_ -= size; + slices_.pop_front(); + if (!slice->isMutable()) { + // Create a mutable copy of the immutable slice data. + auto mutable_slice = OwnedSlice::create(size); + auto copy_size = mutable_slice->append(slice->data(), size); + ASSERT(copy_size == size); + // Drain trackers for the immutable slice will be called as part of the slice destructor. + return mutable_slice; + } else { + // Make sure drain trackers are called before ownership of the slice is transferred from + // the buffer to the caller. + slice->callAndClearDrainTrackers(); + return slice; + } +} + uint64_t OwnedImpl::length() const { #ifndef NDEBUG // When running in debug mode, verify that the precomputed length matches the sum diff --git a/source/common/buffer/buffer_impl.h b/source/common/buffer/buffer_impl.h index cc1981eb459b5..32c9c54eafb03 100644 --- a/source/common/buffer/buffer_impl.h +++ b/source/common/buffer/buffer_impl.h @@ -31,16 +31,23 @@ namespace Buffer { * | * data() */ -class Slice { +class Slice : public SliceData { public: using Reservation = RawSlice; - virtual ~Slice() { - for (const auto& drain_tracker : drain_trackers_) { - drain_tracker(); - } + ~Slice() override { callAndClearDrainTrackers(); } + + // SliceData + absl::Span getMutableData() override { + RELEASE_ASSERT(isMutable(), "Not allowed to call getMutableData if slice is immutable"); + return {base_ + data_, reservable_ - data_}; } + /** + * @return true if the data in the slice is mutable + */ + virtual bool isMutable() const { return false; } + /** * @return a pointer to the start of the usable content. */ @@ -117,10 +124,10 @@ class Slice { * @param reservation a reservation obtained from a previous call to reserve(). * If the reservation is not from this Slice, commit() will return false. * If the caller is committing fewer bytes than provided by reserve(), it - * should change the mem_ field of the reservation before calling commit(). + * should change the len_ field of the reservation before calling commit(). * For example, if a caller reserve()s 4KB to do a nonblocking socket read, * and the read only returns two bytes, the caller should set - * reservation.mem_ = 2 and then call `commit(reservation)`. + * reservation.len_ = 2 and then call `commit(reservation)`. * @return whether the Reservation was successfully committed to the Slice. */ bool commit(const Reservation& reservation) { @@ -200,15 +207,32 @@ class Slice { return SliceRepresentation{dataSize(), reservableSize(), capacity_}; } + /** + * Move all drain trackers from the current slice to the destination slice. + */ void transferDrainTrackersTo(Slice& destination) { destination.drain_trackers_.splice(destination.drain_trackers_.end(), drain_trackers_); ASSERT(drain_trackers_.empty()); } + /** + * Add a drain tracker to the slice. + */ void addDrainTracker(std::function drain_tracker) { drain_trackers_.emplace_back(std::move(drain_tracker)); } + /** + * Call all drain trackers associated with the slice, then clear + * the drain tracker list. + */ + void callAndClearDrainTrackers() { + for (const auto& drain_tracker : drain_trackers_) { + drain_tracker(); + } + drain_trackers_.clear(); + } + protected: Slice(uint64_t data, uint64_t reservable, uint64_t capacity) : data_(data), reservable_(reservable), capacity_(capacity) {} @@ -261,6 +285,8 @@ class OwnedSlice final : public Slice, public InlineStorage { private: OwnedSlice(uint64_t size) : Slice(0, 0, size) { base_ = storage_; } + bool isMutable() const override { return true; } + /** * Compute a slice size big enough to hold a specified amount of data. * @param data_size the minimum amount of data the slice must be able to store, in bytes. @@ -539,6 +565,7 @@ class OwnedImpl : public LibEventInstance { void copyOut(size_t start, uint64_t size, void* data) const override; void drain(uint64_t size) override; RawSliceVector getRawSlices(absl::optional max_slices = absl::nullopt) const override; + SliceDataPtr extractMutableFrontSlice() override; uint64_t length() const override; void* linearize(uint32_t size) override; void move(Instance& rhs) override; @@ -558,13 +585,13 @@ class OwnedImpl : public LibEventInstance { * @param data start of the content to copy. * */ - void appendSliceForTest(const void* data, uint64_t size); + virtual void appendSliceForTest(const void* data, uint64_t size); /** * Create a new slice at the end of the buffer, and copy the supplied string into it. * @param data the string to append to the buffer. */ - void appendSliceForTest(absl::string_view data); + virtual void appendSliceForTest(absl::string_view data); /** * Describe the in-memory representation of the slices in the buffer. For use diff --git a/source/common/buffer/watermark_buffer.cc b/source/common/buffer/watermark_buffer.cc index e3537ffe7943a..9d566be1965d8 100644 --- a/source/common/buffer/watermark_buffer.cc +++ b/source/common/buffer/watermark_buffer.cc @@ -51,6 +51,12 @@ void WatermarkBuffer::move(Instance& rhs, uint64_t length) { checkHighAndOverflowWatermarks(); } +SliceDataPtr WatermarkBuffer::extractMutableFrontSlice() { + auto result = OwnedImpl::extractMutableFrontSlice(); + checkLowWatermark(); + return result; +} + Api::IoCallUint64Result WatermarkBuffer::read(Network::IoHandle& io_handle, uint64_t max_length) { Api::IoCallUint64Result result = OwnedImpl::read(io_handle, max_length); checkHighAndOverflowWatermarks(); @@ -69,6 +75,15 @@ Api::IoCallUint64Result WatermarkBuffer::write(Network::IoHandle& io_handle) { return result; } +void WatermarkBuffer::appendSliceForTest(const void* data, uint64_t size) { + OwnedImpl::appendSliceForTest(data, size); + checkHighAndOverflowWatermarks(); +} + +void WatermarkBuffer::appendSliceForTest(absl::string_view data) { + appendSliceForTest(data.data(), data.size()); +} + void WatermarkBuffer::setWatermarks(uint32_t low_watermark, uint32_t high_watermark) { ASSERT(low_watermark < high_watermark || (high_watermark == 0 && low_watermark == 0)); uint32_t overflow_watermark_multiplier = diff --git a/source/common/buffer/watermark_buffer.h b/source/common/buffer/watermark_buffer.h index 127069307902b..de44822a56ab7 100644 --- a/source/common/buffer/watermark_buffer.h +++ b/source/common/buffer/watermark_buffer.h @@ -34,10 +34,13 @@ class WatermarkBuffer : public OwnedImpl { void drain(uint64_t size) override; void move(Instance& rhs) override; void move(Instance& rhs, uint64_t length) override; + SliceDataPtr extractMutableFrontSlice() override; Api::IoCallUint64Result read(Network::IoHandle& io_handle, uint64_t max_length) override; uint64_t reserve(uint64_t length, RawSlice* iovecs, uint64_t num_iovecs) override; Api::IoCallUint64Result write(Network::IoHandle& io_handle) override; void postProcess() override { checkLowWatermark(); } + void appendSliceForTest(const void* data, uint64_t size) override; + void appendSliceForTest(absl::string_view data) override; void setWatermarks(uint32_t watermark) { setWatermarks(watermark / 2, watermark); } void setWatermarks(uint32_t low_watermark, uint32_t high_watermark); diff --git a/test/common/buffer/buffer_fuzz.cc b/test/common/buffer/buffer_fuzz.cc index 9c80f4655b09f..5ab1bd85c4ae4 100644 --- a/test/common/buffer/buffer_fuzz.cc +++ b/test/common/buffer/buffer_fuzz.cc @@ -133,6 +133,8 @@ class StringBuffer : public Buffer::Instance { return mutableStart(); } + Buffer::SliceDataPtr extractMutableFrontSlice() override { NOT_IMPLEMENTED_GCOVR_EXCL_LINE; } + void move(Buffer::Instance& rhs) override { move(rhs, rhs.length()); } void move(Buffer::Instance& rhs, uint64_t length) override { diff --git a/test/common/buffer/owned_impl_test.cc b/test/common/buffer/owned_impl_test.cc index d22b5c072c768..6f1ead8625f4f 100644 --- a/test/common/buffer/owned_impl_test.cc +++ b/test/common/buffer/owned_impl_test.cc @@ -347,6 +347,169 @@ TEST_F(OwnedImplTest, Read) { EXPECT_THAT(buffer.describeSlicesForTest(), testing::IsEmpty()); } +TEST_F(OwnedImplTest, ExtractOwnedSlice) { + // Create a buffer with two owned slices. + Buffer::OwnedImpl buffer; + buffer.appendSliceForTest("abcde"); + const uint64_t expected_length0 = 5; + buffer.appendSliceForTest("123"); + const uint64_t expected_length1 = 3; + EXPECT_EQ(buffer.toString(), "abcde123"); + RawSliceVector slices = buffer.getRawSlices(); + EXPECT_EQ(2, slices.size()); + + // Extract first slice. + auto slice = buffer.extractMutableFrontSlice(); + ASSERT_TRUE(slice); + auto slice_data = slice->getMutableData(); + ASSERT_NE(slice_data.data(), nullptr); + EXPECT_EQ(slice_data.size(), expected_length0); + EXPECT_EQ("abcde", + absl::string_view(reinterpret_cast(slice_data.data()), slice_data.size())); + EXPECT_EQ(buffer.toString(), "123"); + + // Modify and re-add extracted first slice data to the end of the buffer. + auto slice_mutable_data = slice->getMutableData(); + ASSERT_NE(slice_mutable_data.data(), nullptr); + EXPECT_EQ(slice_mutable_data.size(), expected_length0); + *slice_mutable_data.data() = 'A'; + buffer.appendSliceForTest(slice_mutable_data.data(), slice_mutable_data.size()); + EXPECT_EQ(buffer.toString(), "123Abcde"); + + // Extract second slice, leaving only the original first slice. + slice = buffer.extractMutableFrontSlice(); + ASSERT_TRUE(slice); + slice_data = slice->getMutableData(); + ASSERT_NE(slice_data.data(), nullptr); + EXPECT_EQ(slice_data.size(), expected_length1); + EXPECT_EQ("123", + absl::string_view(reinterpret_cast(slice_data.data()), slice_data.size())); + EXPECT_EQ(buffer.toString(), "Abcde"); +} + +TEST_F(OwnedImplTest, ExtractAfterSentinelDiscard) { + // Create a buffer with a sentinel and one owned slice. + Buffer::OwnedImpl buffer; + bool sentinel_discarded = false; + const Buffer::OwnedBufferFragmentImpl::Releasor sentinel_releasor{ + [&](const Buffer::OwnedBufferFragmentImpl* sentinel) { + sentinel_discarded = true; + delete sentinel; + }}; + auto sentinel = + Buffer::OwnedBufferFragmentImpl::create(absl::string_view("", 0), sentinel_releasor); + buffer.addBufferFragment(*sentinel.release()); + + buffer.appendSliceForTest("abcde"); + const uint64_t expected_length = 5; + EXPECT_EQ(buffer.toString(), "abcde"); + RawSliceVector slices = buffer.getRawSlices(); // only returns slices with data + EXPECT_EQ(1, slices.size()); + + // Extract owned slice after discarding sentinel. + EXPECT_FALSE(sentinel_discarded); + auto slice = buffer.extractMutableFrontSlice(); + ASSERT_TRUE(slice); + EXPECT_TRUE(sentinel_discarded); + auto slice_data = slice->getMutableData(); + ASSERT_NE(slice_data.data(), nullptr); + EXPECT_EQ(slice_data.size(), expected_length); + EXPECT_EQ("abcde", + absl::string_view(reinterpret_cast(slice_data.data()), slice_data.size())); + EXPECT_EQ(0, buffer.length()); +} + +TEST_F(OwnedImplTest, DrainThenExtractOwnedSlice) { + // Create a buffer with two owned slices. + Buffer::OwnedImpl buffer; + buffer.appendSliceForTest("abcde"); + const uint64_t expected_length0 = 5; + buffer.appendSliceForTest("123"); + EXPECT_EQ(buffer.toString(), "abcde123"); + RawSliceVector slices = buffer.getRawSlices(); + EXPECT_EQ(2, slices.size()); + + // Partially drain the first slice. + const uint64_t partial_drain_size = 2; + buffer.drain(partial_drain_size); + EXPECT_EQ(buffer.toString(), static_cast("abcde123") + partial_drain_size); + + // Extracted partially drained first slice, leaving the second slice. + auto slice = buffer.extractMutableFrontSlice(); + ASSERT_TRUE(slice); + auto slice_data = slice->getMutableData(); + ASSERT_NE(slice_data.data(), nullptr); + EXPECT_EQ(slice_data.size(), expected_length0 - partial_drain_size); + EXPECT_EQ(static_cast("abcde") + partial_drain_size, + absl::string_view(reinterpret_cast(slice_data.data()), slice_data.size())); + EXPECT_EQ(buffer.toString(), "123"); +} + +TEST_F(OwnedImplTest, ExtractUnownedSlice) { + // Create a buffer with an unowned slice. + std::string input{"unowned test slice"}; + const size_t expected_length0 = input.size(); + auto frag = OwnedBufferFragmentImpl::create( + {input.c_str(), expected_length0}, + [this](const OwnedBufferFragmentImpl*) { release_callback_called_ = true; }); + Buffer::OwnedImpl buffer; + buffer.addBufferFragment(*frag); + + bool drain_tracker_called{false}; + buffer.addDrainTracker([&] { drain_tracker_called = true; }); + + // Add an owned slice to the end of the buffer. + EXPECT_EQ(expected_length0, buffer.length()); + std::string owned_slice_content{"another slice, but owned"}; + buffer.add(owned_slice_content); + const uint64_t expected_length1 = owned_slice_content.length(); + + // Partially drain the unowned slice. + const uint64_t partial_drain_size = 5; + buffer.drain(partial_drain_size); + EXPECT_EQ(expected_length0 - partial_drain_size + expected_length1, buffer.length()); + EXPECT_FALSE(release_callback_called_); + EXPECT_FALSE(drain_tracker_called); + + // Extract what remains of the unowned slice, leaving only the owned slice. + auto slice = buffer.extractMutableFrontSlice(); + ASSERT_TRUE(slice); + EXPECT_TRUE(drain_tracker_called); + auto slice_data = slice->getMutableData(); + ASSERT_NE(slice_data.data(), nullptr); + EXPECT_EQ(slice_data.size(), expected_length0 - partial_drain_size); + EXPECT_EQ(input.data() + partial_drain_size, + absl::string_view(reinterpret_cast(slice_data.data()), slice_data.size())); + EXPECT_EQ(expected_length1, buffer.length()); + + // The underlying immutable unowned slice was discarded during the extract + // operation and replaced with a mutable copy. The drain trackers were + // called as part of the extract, implying that the release callback was called. + EXPECT_TRUE(release_callback_called_); +} + +TEST_F(OwnedImplTest, ExtractWithDrainTracker) { + testing::InSequence s; + + Buffer::OwnedImpl buffer; + buffer.add("a"); + + testing::MockFunction tracker1; + testing::MockFunction tracker2; + buffer.addDrainTracker(tracker1.AsStdFunction()); + buffer.addDrainTracker(tracker2.AsStdFunction()); + + testing::MockFunction done; + EXPECT_CALL(tracker1, Call()); + EXPECT_CALL(tracker2, Call()); + EXPECT_CALL(done, Call()); + auto slice = buffer.extractMutableFrontSlice(); + // The test now has ownership of the slice, but the drain trackers were + // called as part of the extract operation + done.Call(); + slice.reset(); +} + TEST_F(OwnedImplTest, DrainTracking) { testing::InSequence s; diff --git a/test/common/buffer/watermark_buffer_test.cc b/test/common/buffer/watermark_buffer_test.cc index db7fe530fcdb1..7181d36ac8589 100644 --- a/test/common/buffer/watermark_buffer_test.cc +++ b/test/common/buffer/watermark_buffer_test.cc @@ -142,6 +142,7 @@ TEST_F(WatermarkBufferTest, Drain) { buffer_.add(TEN_BYTES, 11); buffer_.drain(5); EXPECT_EQ(6, buffer_.length()); + EXPECT_EQ(1, times_high_watermark_called_); EXPECT_EQ(0, times_low_watermark_called_); // Now drain below. @@ -153,6 +154,38 @@ TEST_F(WatermarkBufferTest, Drain) { EXPECT_EQ(2, times_high_watermark_called_); } +TEST_F(WatermarkBufferTest, DrainUsingExtract) { + // Similar to `Drain` test, but using extractMutableFrontSlice() instead of drain(). + buffer_.add(TEN_BYTES, 10); + ASSERT_EQ(buffer_.length(), 10); + buffer_.extractMutableFrontSlice(); + EXPECT_EQ(0, times_high_watermark_called_); + EXPECT_EQ(0, times_low_watermark_called_); + + // Go above the high watermark then drain down to just at the low watermark. + buffer_.appendSliceForTest(TEN_BYTES, 5); + buffer_.appendSliceForTest(TEN_BYTES, 1); + buffer_.appendSliceForTest(TEN_BYTES, 5); + EXPECT_EQ(1, times_high_watermark_called_); + EXPECT_EQ(0, times_low_watermark_called_); + auto slice0 = buffer_.extractMutableFrontSlice(); // essentially drain(5) + ASSERT_TRUE(slice0); + EXPECT_EQ(slice0->getMutableData().size(), 5); + EXPECT_EQ(6, buffer_.length()); + EXPECT_EQ(0, times_low_watermark_called_); + + // Now drain below. + auto slice1 = buffer_.extractMutableFrontSlice(); // essentially drain(1) + ASSERT_TRUE(slice1); + EXPECT_EQ(slice1->getMutableData().size(), 1); + EXPECT_EQ(1, times_high_watermark_called_); + EXPECT_EQ(1, times_low_watermark_called_); + + // Going back above should trigger the high again. + buffer_.add(TEN_BYTES, 10); + EXPECT_EQ(2, times_high_watermark_called_); +} + // Verify that low watermark callback is called on drain in the case where the // high watermark is non-zero and low watermark is 0. TEST_F(WatermarkBufferTest, DrainWithLowWatermarkOfZero) {