Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions include/envoy/buffer/buffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
#include "absl/container/inlined_vector.h"
#include "absl/strings/string_view.h"
#include "absl/types/optional.h"
#include "absl/types/span.h"

namespace Envoy {
namespace Buffer {
Expand Down Expand Up @@ -55,6 +56,20 @@ class BufferFragment {
virtual void done() PURE;
};

/**
* A class to facilitate extracting buffer slices from a buffer instance.
*/
class SliceData {
public:
virtual ~SliceData() = default;
/**
* @return absl::Span<uint8_t> a span of the slice data.
*/
virtual absl::Span<uint8_t> getData() PURE;

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this should be something like absl::Span<const uint8_t>

I had not considered const issues in the existing buffer API which was a fairly direct translation of an earlier API built on top of libevent buffers. IIRC data in Unowned slices should be considered immutable. It is technically possible for multiple unowned slices to share the same underlying storage although I think we don't rely on that yet, but in-progress enhancements including the HTTP/1.1 cache extension would benefit from the ability to provide const access to data blocks that are referenced by buffers so multiple requests can reference the same data blocks in the in-memory cache.

Is use of const data blocks compatible with your use case?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My use case requires a mutable slice, but I agree that the more common case would be an immutable view of the slice. Commit df19673 attempts to provide both, with immutable as default.

};

using SliceDataPtr = std::unique_ptr<SliceData>;

/**
* A basic buffer abstraction.
*/
Expand Down Expand Up @@ -144,6 +159,13 @@ class Instance {
virtual RawSliceVector
getRawSlices(absl::optional<uint64_t> max_slices = absl::nullopt) const PURE;

/**
* Transfer ownership of the front slice to the caller. Must only be called if the
* buffer is not empty otherwise the implementation will have undefined behavior.
* @return pointer to SliceData object that wraps the front slice
*/
virtual SliceDataPtr extractFrontSlice() PURE;

/**
* @return uint64_t the total length of the buffer (not necessarily contiguous in memory).
*/
Expand Down
15 changes: 15 additions & 0 deletions source/common/buffer/buffer_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,21 @@ RawSliceVector OwnedImpl::getRawSlices(absl::optional<uint64_t> max_slices) cons
return raw_slices;
}

SliceDataPtr OwnedImpl::extractFrontSlice() {
RELEASE_ASSERT(length_ > 0, "Extract called on empty buffer");
// Remove zero byte fragments from the front of the queue to ensure
// that the extracted slice has data.
while (!slices_.empty() && slices_.front()->dataSize() == 0) {
slices_.pop_front();
Comment thread
roelfdutoit marked this conversation as resolved.
}
ASSERT(!slices_.empty());
ASSERT(slices_.front());
length_ -= slices_.front()->dataSize();
auto slice = std::move(slices_.front());
slices_.pop_front();
return slice;
}

uint64_t OwnedImpl::length() const {
#ifndef NDEBUG
// When running in debug mode, verify that the precomputed length matches the sum
Expand Down
14 changes: 9 additions & 5 deletions source/common/buffer/buffer_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ namespace Buffer {
* |
* data()
*/
class Slice {
class Slice : public SliceData {
public:
using Reservation = RawSlice;

Expand All @@ -41,6 +41,9 @@ class Slice {
}
Comment thread
roelfdutoit marked this conversation as resolved.
Outdated
}

// SliceData
absl::Span<uint8_t> getData() override { return {base_ + data_, reservable_ - data_}; }

/**
* @return a pointer to the start of the usable content.
*/
Expand Down Expand Up @@ -117,10 +120,10 @@ class Slice {
* @param reservation a reservation obtained from a previous call to reserve().
* If the reservation is not from this Slice, commit() will return false.
* If the caller is committing fewer bytes than provided by reserve(), it
* should change the mem_ field of the reservation before calling commit().
* should change the len_ field of the reservation before calling commit().
* For example, if a caller reserve()s 4KB to do a nonblocking socket read,
* and the read only returns two bytes, the caller should set
* reservation.mem_ = 2 and then call `commit(reservation)`.
* reservation.len_ = 2 and then call `commit(reservation)`.
* @return whether the Reservation was successfully committed to the Slice.
*/
bool commit(const Reservation& reservation) {
Expand Down Expand Up @@ -539,6 +542,7 @@ class OwnedImpl : public LibEventInstance {
void copyOut(size_t start, uint64_t size, void* data) const override;
void drain(uint64_t size) override;
RawSliceVector getRawSlices(absl::optional<uint64_t> max_slices = absl::nullopt) const override;
SliceDataPtr extractFrontSlice() override;
uint64_t length() const override;
void* linearize(uint32_t size) override;
void move(Instance& rhs) override;
Expand All @@ -558,13 +562,13 @@ class OwnedImpl : public LibEventInstance {
* @param data start of the content to copy.
*
*/
void appendSliceForTest(const void* data, uint64_t size);
virtual void appendSliceForTest(const void* data, uint64_t size);

/**
* Create a new slice at the end of the buffer, and copy the supplied string into it.
* @param data the string to append to the buffer.
*/
void appendSliceForTest(absl::string_view data);
virtual void appendSliceForTest(absl::string_view data);

/**
* Describe the in-memory representation of the slices in the buffer. For use
Expand Down
15 changes: 15 additions & 0 deletions source/common/buffer/watermark_buffer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,12 @@ void WatermarkBuffer::move(Instance& rhs, uint64_t length) {
checkHighAndOverflowWatermarks();
}

SliceDataPtr WatermarkBuffer::extractFrontSlice() {
auto result = OwnedImpl::extractFrontSlice();
checkLowWatermark();
return result;
}

Api::IoCallUint64Result WatermarkBuffer::read(Network::IoHandle& io_handle, uint64_t max_length) {
Api::IoCallUint64Result result = OwnedImpl::read(io_handle, max_length);
checkHighAndOverflowWatermarks();
Expand All @@ -69,6 +75,15 @@ Api::IoCallUint64Result WatermarkBuffer::write(Network::IoHandle& io_handle) {
return result;
}

void WatermarkBuffer::appendSliceForTest(const void* data, uint64_t size) {
OwnedImpl::appendSliceForTest(data, size);
checkHighAndOverflowWatermarks();
}

void WatermarkBuffer::appendSliceForTest(absl::string_view data) {
appendSliceForTest(data.data(), data.size());
}

void WatermarkBuffer::setWatermarks(uint32_t low_watermark, uint32_t high_watermark) {
ASSERT(low_watermark < high_watermark || (high_watermark == 0 && low_watermark == 0));
uint32_t overflow_watermark_multiplier =
Expand Down
3 changes: 3 additions & 0 deletions source/common/buffer/watermark_buffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,13 @@ class WatermarkBuffer : public OwnedImpl {
void drain(uint64_t size) override;
void move(Instance& rhs) override;
void move(Instance& rhs, uint64_t length) override;
SliceDataPtr extractFrontSlice() override;
Api::IoCallUint64Result read(Network::IoHandle& io_handle, uint64_t max_length) override;
uint64_t reserve(uint64_t length, RawSlice* iovecs, uint64_t num_iovecs) override;
Api::IoCallUint64Result write(Network::IoHandle& io_handle) override;
void postProcess() override { checkLowWatermark(); }
void appendSliceForTest(const void* data, uint64_t size) override;
void appendSliceForTest(absl::string_view data) override;

void setWatermarks(uint32_t watermark) { setWatermarks(watermark / 2, watermark); }
void setWatermarks(uint32_t low_watermark, uint32_t high_watermark);
Expand Down
6 changes: 6 additions & 0 deletions test/common/buffer/buffer_fuzz.cc
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

#include "absl/container/fixed_array.h"
#include "absl/strings/match.h"
#include "absl/types/span.h"
#include "gtest/gtest.h"

namespace Envoy {
Expand Down Expand Up @@ -133,6 +134,11 @@ class StringBuffer : public Buffer::Instance {
return mutableStart();
}

Buffer::SliceDataPtr extractFrontSlice() override {
// not used
Comment thread
roelfdutoit marked this conversation as resolved.
Outdated
return Buffer::SliceDataPtr{};
}

void move(Buffer::Instance& rhs) override { move(rhs, rhs.length()); }

void move(Buffer::Instance& rhs, uint64_t length) override {
Expand Down
123 changes: 123 additions & 0 deletions test/common/buffer/owned_impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -347,6 +347,129 @@ TEST_F(OwnedImplTest, Read) {
EXPECT_THAT(buffer.describeSlicesForTest(), testing::IsEmpty());
}

TEST_F(OwnedImplTest, ExtractOwnedSlice) {
// Create a buffer with two owned slices.
Buffer::OwnedImpl buffer;
buffer.appendSliceForTest("abcde");
const uint64_t expected_length0 = 5;
buffer.appendSliceForTest("123");
const uint64_t expected_length1 = 3;
EXPECT_EQ(buffer.toString(), "abcde123");
RawSliceVector slices = buffer.getRawSlices();
EXPECT_EQ(2, slices.size());

// Extract first slice.
auto slice = buffer.extractFrontSlice();
ASSERT_TRUE(slice);
auto slice_data = slice->getData();
ASSERT_NE(slice_data.data(), nullptr);
EXPECT_EQ(slice_data.size(), expected_length0);
EXPECT_EQ("abcde",
absl::string_view(reinterpret_cast<const char*>(slice_data.data()), slice_data.size()));
EXPECT_EQ(buffer.toString(), "123");

// Re-add extracted first slice to the end of the buffer.
buffer.appendSliceForTest(slice_data.data(), slice_data.size());
EXPECT_EQ(buffer.toString(), "123abcde");

// Extract second slice, leaving only the original first slice.
slice = buffer.extractFrontSlice();
ASSERT_TRUE(slice);
slice_data = slice->getData();
ASSERT_NE(slice_data.data(), nullptr);
EXPECT_EQ(slice_data.size(), expected_length1);
EXPECT_EQ("123",
absl::string_view(reinterpret_cast<const char*>(slice_data.data()), slice_data.size()));
EXPECT_EQ(buffer.toString(), "abcde");
}
Comment thread
roelfdutoit marked this conversation as resolved.

TEST_F(OwnedImplTest, DrainThenExtractOwnedSlice) {
// Create a buffer with two owned slices.
Buffer::OwnedImpl buffer;
buffer.appendSliceForTest("abcde");
const uint64_t expected_length0 = 5;
buffer.appendSliceForTest("123");
EXPECT_EQ(buffer.toString(), "abcde123");
RawSliceVector slices = buffer.getRawSlices();
EXPECT_EQ(2, slices.size());

// Partially drain the first slice.
const uint64_t partial_drain_size = 2;
buffer.drain(partial_drain_size);
EXPECT_EQ(buffer.toString(), static_cast<const char*>("abcde123") + partial_drain_size);

// Extracted partially drained first slice, leaving the second slice.
auto slice = buffer.extractFrontSlice();
ASSERT_TRUE(slice);
auto slice_data = slice->getData();
ASSERT_NE(slice_data.data(), nullptr);
EXPECT_EQ(slice_data.size(), expected_length0 - partial_drain_size);
EXPECT_EQ(static_cast<const char*>("abcde") + partial_drain_size,
absl::string_view(reinterpret_cast<const char*>(slice_data.data()), slice_data.size()));
EXPECT_EQ(buffer.toString(), "123");
}

TEST_F(OwnedImplTest, ExtractUnownedSlice) {
// Create a buffer with an unowned slice.
std::string input{"unowned test slice"};
const size_t expected_length0 = input.size();
auto frag = OwnedBufferFragmentImpl::create(
{input.c_str(), expected_length0},
[this](const OwnedBufferFragmentImpl*) { release_callback_called_ = true; });
Buffer::OwnedImpl buffer;
buffer.addBufferFragment(*frag);
Comment thread
roelfdutoit marked this conversation as resolved.

// Add an owned slice to the end of the buffer.
EXPECT_EQ(expected_length0, buffer.length());
std::string owned_slice_content{"another slice, but owned"};
buffer.appendSliceForTest(owned_slice_content);
Comment thread
roelfdutoit marked this conversation as resolved.
Outdated
const uint64_t expected_length1 = owned_slice_content.length();

// Partially drain the unowned slice.
const uint64_t partial_drain_size = 5;
buffer.drain(partial_drain_size);
EXPECT_EQ(expected_length0 - partial_drain_size + expected_length1, buffer.length());
EXPECT_FALSE(release_callback_called_);

// Extract what remains of the unowned slice, leaving only the owned slice
auto slice = buffer.extractFrontSlice();
ASSERT_TRUE(slice);
auto slice_data = slice->getData();
ASSERT_NE(slice_data.data(), nullptr);
EXPECT_EQ(slice_data.size(), expected_length0 - partial_drain_size);
EXPECT_EQ(input.data() + partial_drain_size,
absl::string_view(reinterpret_cast<const char*>(slice_data.data()), slice_data.size()));
EXPECT_EQ(expected_length1, buffer.length());

// This test now has ownership of the unowned slice, which means that the
// release callback will only be called once the slice is destroyed.
EXPECT_FALSE(release_callback_called_);
slice.reset();
EXPECT_TRUE(release_callback_called_);
}

TEST_F(OwnedImplTest, ExtractWithDrainTracker) {
testing::InSequence s;

Buffer::OwnedImpl buffer;
buffer.add("a");

testing::MockFunction<void()> tracker1;
testing::MockFunction<void()> tracker2;
buffer.addDrainTracker(tracker1.AsStdFunction());
buffer.addDrainTracker(tracker2.AsStdFunction());

testing::MockFunction<void()> done;
EXPECT_CALL(tracker1, Call());
EXPECT_CALL(tracker2, Call());
EXPECT_CALL(done, Call());
auto slice = buffer.extractFrontSlice();
// The test now has ownership of the slice with the associated drain trackers.
// The drain trackers will only be called once the slice is destroyed.
slice.reset();
done.Call();
}

TEST_F(OwnedImplTest, DrainTracking) {
testing::InSequence s;

Expand Down
25 changes: 25 additions & 0 deletions test/common/buffer/watermark_buffer_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,31 @@ TEST_F(WatermarkBufferTest, Drain) {
EXPECT_EQ(2, times_high_watermark_called_);
}

TEST_F(WatermarkBufferTest, DrainUsingExtract) {
// Similar to `Drain` test, but using extractFrontSlice() instead of drain()
buffer_.add(TEN_BYTES, 10);
ASSERT_EQ(buffer_.length(), 10);
buffer_.extractFrontSlice();
EXPECT_EQ(0, times_high_watermark_called_);
EXPECT_EQ(0, times_low_watermark_called_);

// Go above the high watermark then drain down to just at the low watermark.
buffer_.appendSliceForTest(TEN_BYTES, 5);
buffer_.appendSliceForTest(TEN_BYTES, 1);
buffer_.appendSliceForTest(TEN_BYTES, 5);
Comment thread
roelfdutoit marked this conversation as resolved.
buffer_.extractFrontSlice(); // essentially drain(5)
EXPECT_EQ(6, buffer_.length());
EXPECT_EQ(0, times_low_watermark_called_);

// Now drain below.
buffer_.extractFrontSlice(); // essentially drain(1)
Comment thread
roelfdutoit marked this conversation as resolved.
Outdated
EXPECT_EQ(1, times_low_watermark_called_);

// Going back above should trigger the high again
buffer_.add(TEN_BYTES, 10);
EXPECT_EQ(2, times_high_watermark_called_);
}

// Verify that low watermark callback is called on drain in the case where the
// high watermark is non-zero and low watermark is 0.
TEST_F(WatermarkBufferTest, DrainWithLowWatermarkOfZero) {
Expand Down