Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions include/envoy/buffer/buffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
#include "absl/container/inlined_vector.h"
#include "absl/strings/string_view.h"
#include "absl/types/optional.h"
#include "absl/types/span.h"

namespace Envoy {
namespace Buffer {
Expand Down Expand Up @@ -55,6 +56,21 @@ class BufferFragment {
virtual void done() PURE;
};

/**
* A class to facilitate extracting buffer slices from a buffer instance.
*/
class SliceData {
public:
virtual ~SliceData() = default;

/**
* @return a mutable view of the slice data.
*/
virtual absl::Span<uint8_t> getMutableData() PURE;
};

using SliceDataPtr = std::unique_ptr<SliceData>;

/**
* A basic buffer abstraction.
*/
Expand Down Expand Up @@ -144,6 +160,15 @@ class Instance {
virtual RawSliceVector
getRawSlices(absl::optional<uint64_t> max_slices = absl::nullopt) const PURE;

/**
* Transfer ownership of the front slice to the caller. Must only be called if the
* buffer is not empty otherwise the implementation will have undefined behavior.
* If the underlying slice is immutable then the implementation must create and return
* a mutable slice that has a copy of the immutable data.
* @return pointer to SliceData object that wraps the front slice
*/
virtual SliceDataPtr extractMutableFrontSlice() PURE;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm sorry for the back and forth about APIs, but I'ld like your opinion on the following:
Should we focus on the mutable slice case and avoid the non mutable case entirely. For OwnedSlices, the extraction method can just return the slice, but in the case of UnownedSlice it would force copy on extraction. Effectively, keep extractMutableFrontSlice and getMutableData methods from the APIs added in this PR, but remove extractFrontSlice, getData and isMutable.

For consistency between these two cases, I think that drain trackers should be cleared from OwnedSlices as part of the extraction process.


/**
* @return uint64_t the total length of the buffer (not necessarily contiguous in memory).
*/
Expand Down
28 changes: 28 additions & 0 deletions source/common/buffer/buffer_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,34 @@ RawSliceVector OwnedImpl::getRawSlices(absl::optional<uint64_t> max_slices) cons
return raw_slices;
}

SliceDataPtr OwnedImpl::extractMutableFrontSlice() {
RELEASE_ASSERT(length_ > 0, "Extract called on empty buffer");
// Remove zero byte fragments from the front of the queue to ensure
// that the extracted slice has data.
while (!slices_.empty() && slices_.front()->dataSize() == 0) {
slices_.pop_front();
}
ASSERT(!slices_.empty());
ASSERT(slices_.front());
auto slice = std::move(slices_.front());
auto size = slice->dataSize();
length_ -= size;
slices_.pop_front();
if (!slice->isMutable()) {
// Create a mutable copy of the immutable slice data.
auto mutable_slice = OwnedSlice::create(size);
auto copy_size = mutable_slice->append(slice->data(), size);
ASSERT(copy_size == size);
// Drain trackers for the immutable slice will be called as part of the slice destructor.
return mutable_slice;
} else {
// Make sure drain trackers are called before ownership of the slice is transferred from
// the buffer to the caller.
slice->callAndClearDrainTrackers();
return slice;
}
}

uint64_t OwnedImpl::length() const {
#ifndef NDEBUG
// When running in debug mode, verify that the precomputed length matches the sum
Expand Down
45 changes: 36 additions & 9 deletions source/common/buffer/buffer_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,16 +31,23 @@ namespace Buffer {
* |
* data()
*/
class Slice {
class Slice : public SliceData {
public:
using Reservation = RawSlice;

virtual ~Slice() {
for (const auto& drain_tracker : drain_trackers_) {
drain_tracker();
}
~Slice() override { callAndClearDrainTrackers(); }

// SliceData
absl::Span<uint8_t> getMutableData() override {
RELEASE_ASSERT(isMutable(), "Not allowed to call getMutableData if slice is immutable");
return {base_ + data_, reservable_ - data_};
}

/**
* @return true if the data in the slice is mutable
*/
virtual bool isMutable() const { return false; }

/**
* @return a pointer to the start of the usable content.
*/
Expand Down Expand Up @@ -117,10 +124,10 @@ class Slice {
* @param reservation a reservation obtained from a previous call to reserve().
* If the reservation is not from this Slice, commit() will return false.
* If the caller is committing fewer bytes than provided by reserve(), it
* should change the mem_ field of the reservation before calling commit().
* should change the len_ field of the reservation before calling commit().
* For example, if a caller reserve()s 4KB to do a nonblocking socket read,
* and the read only returns two bytes, the caller should set
* reservation.mem_ = 2 and then call `commit(reservation)`.
* reservation.len_ = 2 and then call `commit(reservation)`.
* @return whether the Reservation was successfully committed to the Slice.
*/
bool commit(const Reservation& reservation) {
Expand Down Expand Up @@ -200,15 +207,32 @@ class Slice {
return SliceRepresentation{dataSize(), reservableSize(), capacity_};
}

/**
* Move all drain trackers from the current slice to the destination slice.
*/
void transferDrainTrackersTo(Slice& destination) {
destination.drain_trackers_.splice(destination.drain_trackers_.end(), drain_trackers_);
ASSERT(drain_trackers_.empty());
}

/**
* Add a drain tracker to the slice.
*/
void addDrainTracker(std::function<void()> drain_tracker) {
drain_trackers_.emplace_back(std::move(drain_tracker));
}

/**
* Call all drain trackers associated with the slice, then clear
* the drain tracker list.
*/
void callAndClearDrainTrackers() {
for (const auto& drain_tracker : drain_trackers_) {
drain_tracker();
}
drain_trackers_.clear();
}

protected:
Slice(uint64_t data, uint64_t reservable, uint64_t capacity)
: data_(data), reservable_(reservable), capacity_(capacity) {}
Expand Down Expand Up @@ -261,6 +285,8 @@ class OwnedSlice final : public Slice, public InlineStorage {
private:
OwnedSlice(uint64_t size) : Slice(0, 0, size) { base_ = storage_; }

bool isMutable() const override { return true; }

/**
* Compute a slice size big enough to hold a specified amount of data.
* @param data_size the minimum amount of data the slice must be able to store, in bytes.
Expand Down Expand Up @@ -539,6 +565,7 @@ class OwnedImpl : public LibEventInstance {
void copyOut(size_t start, uint64_t size, void* data) const override;
void drain(uint64_t size) override;
RawSliceVector getRawSlices(absl::optional<uint64_t> max_slices = absl::nullopt) const override;
SliceDataPtr extractMutableFrontSlice() override;
uint64_t length() const override;
void* linearize(uint32_t size) override;
void move(Instance& rhs) override;
Expand All @@ -558,13 +585,13 @@ class OwnedImpl : public LibEventInstance {
* @param data start of the content to copy.
*
*/
void appendSliceForTest(const void* data, uint64_t size);
virtual void appendSliceForTest(const void* data, uint64_t size);

/**
* Create a new slice at the end of the buffer, and copy the supplied string into it.
* @param data the string to append to the buffer.
*/
void appendSliceForTest(absl::string_view data);
virtual void appendSliceForTest(absl::string_view data);

/**
* Describe the in-memory representation of the slices in the buffer. For use
Expand Down
15 changes: 15 additions & 0 deletions source/common/buffer/watermark_buffer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,12 @@ void WatermarkBuffer::move(Instance& rhs, uint64_t length) {
checkHighAndOverflowWatermarks();
}

SliceDataPtr WatermarkBuffer::extractMutableFrontSlice() {
auto result = OwnedImpl::extractMutableFrontSlice();
checkLowWatermark();
return result;
}

Api::IoCallUint64Result WatermarkBuffer::read(Network::IoHandle& io_handle, uint64_t max_length) {
Api::IoCallUint64Result result = OwnedImpl::read(io_handle, max_length);
checkHighAndOverflowWatermarks();
Expand All @@ -69,6 +75,15 @@ Api::IoCallUint64Result WatermarkBuffer::write(Network::IoHandle& io_handle) {
return result;
}

void WatermarkBuffer::appendSliceForTest(const void* data, uint64_t size) {
OwnedImpl::appendSliceForTest(data, size);
checkHighAndOverflowWatermarks();
}

void WatermarkBuffer::appendSliceForTest(absl::string_view data) {
appendSliceForTest(data.data(), data.size());
}

void WatermarkBuffer::setWatermarks(uint32_t low_watermark, uint32_t high_watermark) {
ASSERT(low_watermark < high_watermark || (high_watermark == 0 && low_watermark == 0));
uint32_t overflow_watermark_multiplier =
Expand Down
3 changes: 3 additions & 0 deletions source/common/buffer/watermark_buffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,13 @@ class WatermarkBuffer : public OwnedImpl {
void drain(uint64_t size) override;
void move(Instance& rhs) override;
void move(Instance& rhs, uint64_t length) override;
SliceDataPtr extractMutableFrontSlice() override;
Api::IoCallUint64Result read(Network::IoHandle& io_handle, uint64_t max_length) override;
uint64_t reserve(uint64_t length, RawSlice* iovecs, uint64_t num_iovecs) override;
Api::IoCallUint64Result write(Network::IoHandle& io_handle) override;
void postProcess() override { checkLowWatermark(); }
void appendSliceForTest(const void* data, uint64_t size) override;
void appendSliceForTest(absl::string_view data) override;

void setWatermarks(uint32_t watermark) { setWatermarks(watermark / 2, watermark); }
void setWatermarks(uint32_t low_watermark, uint32_t high_watermark);
Expand Down
2 changes: 2 additions & 0 deletions test/common/buffer/buffer_fuzz.cc
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,8 @@ class StringBuffer : public Buffer::Instance {
return mutableStart();
}

Buffer::SliceDataPtr extractMutableFrontSlice() override { NOT_IMPLEMENTED_GCOVR_EXCL_LINE; }

void move(Buffer::Instance& rhs) override { move(rhs, rhs.length()); }

void move(Buffer::Instance& rhs, uint64_t length) override {
Expand Down
Loading