Skip to content
131 changes: 66 additions & 65 deletions source/common/buffer/buffer_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@ void OwnedImpl::addImpl(const void* data, uint64_t size) {
bool new_slice_needed = slices_.empty();
while (size != 0) {
if (new_slice_needed) {
slices_.emplace_back(OwnedSlice::create(size));
slices_.emplace_back(size);
}
uint64_t copy_size = slices_.back()->append(src, size);
uint64_t copy_size = slices_.back().append(src, size);
src += copy_size;
size -= copy_size;
length_ += copy_size;
Expand All @@ -35,14 +35,14 @@ void OwnedImpl::addImpl(const void* data, uint64_t size) {

void OwnedImpl::addDrainTracker(std::function<void()> drain_tracker) {
ASSERT(!slices_.empty());
slices_.back()->addDrainTracker(std::move(drain_tracker));
slices_.back().addDrainTracker(std::move(drain_tracker));
}

void OwnedImpl::add(const void* data, uint64_t size) { addImpl(data, size); }

void OwnedImpl::addBufferFragment(BufferFragment& fragment) {
length_ += fragment.size();
slices_.emplace_back(std::make_unique<UnownedSlice>(fragment));
slices_.emplace_back(fragment);
}

void OwnedImpl::add(absl::string_view data) { add(data.data(), data.size()); }
Expand All @@ -59,9 +59,9 @@ void OwnedImpl::prepend(absl::string_view data) {
bool new_slice_needed = slices_.empty();
while (size != 0) {
if (new_slice_needed) {
slices_.emplace_front(OwnedSlice::create(size));
slices_.emplace_front(size);
}
uint64_t copy_size = slices_.front()->prepend(data.data(), size);
uint64_t copy_size = slices_.front().prepend(data.data(), size);
size -= copy_size;
length_ += copy_size;
new_slice_needed = true;
Expand All @@ -72,7 +72,7 @@ void OwnedImpl::prepend(Instance& data) {
ASSERT(&data != this);
OwnedImpl& other = static_cast<OwnedImpl&>(data);
while (!other.slices_.empty()) {
uint64_t slice_size = other.slices_.back()->dataSize();
uint64_t slice_size = other.slices_.back().dataSize();
length_ += slice_size;
slices_.emplace_front(std::move(other.slices_.back()));
other.slices_.pop_back();
Expand All @@ -85,26 +85,26 @@ void OwnedImpl::commit(RawSlice* iovecs, uint64_t num_iovecs) {
if (num_iovecs == 0) {
return;
}
if (slices_.empty()) {
return;
}
// Find the slices in the buffer that correspond to the iovecs:
// First, scan backward from the end of the buffer to find the last slice containing
// any content. Reservations are made from the end of the buffer, and out-of-order commits
// aren't supported, so any slices before this point cannot match the iovecs being committed.
ssize_t slice_index = static_cast<ssize_t>(slices_.size()) - 1;
while (slice_index >= 0 && slices_[slice_index]->dataSize() == 0) {
while (slice_index >= 0 && slices_[slice_index].dataSize() == 0) {
slice_index--;
}
if (slice_index < 0) {
// There was no slice containing any data, so rewind the iterator to the first slice.
slice_index = 0;
if (!slices_[0]) {
return;
}
}

// Next, scan forward and attempt to match the slices against iovecs.
uint64_t num_slices_committed = 0;
while (num_slices_committed < num_iovecs) {
if (slices_[slice_index]->commit(iovecs[num_slices_committed])) {
if (slices_[slice_index].commit(iovecs[num_slices_committed])) {
length_ += iovecs[num_slices_committed].len_;
num_slices_committed++;
}
Expand All @@ -115,7 +115,7 @@ void OwnedImpl::commit(RawSlice* iovecs, uint64_t num_iovecs) {
}

// In case an extra slice was reserved, remove empty slices from the end of the buffer.
while (!slices_.empty() && slices_.back()->dataSize() == 0) {
while (!slices_.empty() && slices_.back().dataSize() == 0) {
slices_.pop_back();
}

Expand All @@ -129,15 +129,15 @@ void OwnedImpl::copyOut(size_t start, uint64_t size, void* data) const {
if (size == 0) {
break;
}
uint64_t data_size = slice->dataSize();
uint64_t data_size = slice.dataSize();
if (data_size <= bytes_to_skip) {
// The offset where the caller wants to start copying is after the end of this slice,
// so just skip over this slice completely.
bytes_to_skip -= data_size;
continue;
}
uint64_t copy_size = std::min(size, data_size - bytes_to_skip);
memcpy(dest, slice->data() + bytes_to_skip, copy_size);
memcpy(dest, slice.data() + bytes_to_skip, copy_size);
size -= copy_size;
dest += copy_size;
// Now that we've started copying, there are no bytes left to skip over. If there
Expand All @@ -155,20 +155,20 @@ void OwnedImpl::drainImpl(uint64_t size) {
if (slices_.empty()) {
break;
}
uint64_t slice_size = slices_.front()->dataSize();
uint64_t slice_size = slices_.front().dataSize();
if (slice_size <= size) {
slices_.pop_front();
length_ -= slice_size;
size -= slice_size;
} else {
slices_.front()->drain(size);
slices_.front().drain(size);
length_ -= size;
size = 0;
}
}
// Make sure to drain any zero byte fragments that might have been added as
// sentinels for flushed data.
while (!slices_.empty() && slices_.front()->dataSize() == 0) {
while (!slices_.empty() && slices_.front().dataSize() == 0) {
slices_.pop_front();
}
}
Expand All @@ -186,7 +186,7 @@ RawSliceVector OwnedImpl::getRawSlices(absl::optional<uint64_t> max_slices) cons
break;
}

if (slice->dataSize() == 0) {
if (slice.dataSize() == 0) {
continue;
}

Expand All @@ -196,16 +196,17 @@ RawSliceVector OwnedImpl::getRawSlices(absl::optional<uint64_t> max_slices) cons
// there is currently no max size validation.
// TODO(antoniovicente) Set realistic limits on the max size of BufferSlice and consider use of
// size_t instead of uint64_t in the Slice interface.
raw_slices.emplace_back(RawSlice{slice->data(), static_cast<size_t>(slice->dataSize())});
raw_slices.emplace_back(
RawSlice{const_cast<uint8_t*>(slice.data()), static_cast<size_t>(slice.dataSize())});
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it be worth creating a ConstRawSliceVector to avoid the const_cast? I think uses of getRawSlices() do not modify the slice?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The introduction of ConstRawSlice in parallel to RawSlice would require API changes across multiple interfaces. Some examples include arguments to IoHandle::writev, which should use ConstRawSlice, while readv should continue using RawSlice. Some const casts would be needed when you have a ConstRawSlice and are feeding that input into C APIs like nghttp2_hd_inflate_hd2 in MetadataDecoder::decodeMetadataPayloadUsingNghttp2.

This PR already feels huge. Happy to try the change as a follow-up if you'd like.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, agree we shouldn't expand the scope that much in this PR.

}
return raw_slices;
}

RawSlice OwnedImpl::frontSlice() const {
// Ignore zero-size slices and return the first slice with data.
for (const auto& slice : slices_) {
if (slice->dataSize() > 0) {
return RawSlice{slice->data(), slice->dataSize()};
if (slice.dataSize() > 0) {
return RawSlice{const_cast<uint8_t*>(slice.data()), slice.dataSize()};
}
}

Expand All @@ -216,27 +217,26 @@ SliceDataPtr OwnedImpl::extractMutableFrontSlice() {
RELEASE_ASSERT(length_ > 0, "Extract called on empty buffer");
// Remove zero byte fragments from the front of the queue to ensure
// that the extracted slice has data.
while (!slices_.empty() && slices_.front()->dataSize() == 0) {
while (!slices_.empty() && slices_.front().dataSize() == 0) {
slices_.pop_front();
}
ASSERT(!slices_.empty());
ASSERT(slices_.front());
auto slice = std::move(slices_.front());
auto size = slice->dataSize();
auto size = slice.dataSize();
length_ -= size;
slices_.pop_front();
if (!slice->isMutable()) {
if (!slice.isMutable()) {
// Create a mutable copy of the immutable slice data.
auto mutable_slice = OwnedSlice::create(size);
auto copy_size = mutable_slice->append(slice->data(), size);
Slice mutable_slice{size};
auto copy_size = mutable_slice.append(slice.data(), size);
ASSERT(copy_size == size);
// Drain trackers for the immutable slice will be called as part of the slice destructor.
return mutable_slice;
return std::make_unique<SliceDataImpl>(std::move(mutable_slice));
} else {
// Make sure drain trackers are called before ownership of the slice is transferred from
// the buffer to the caller.
slice->callAndClearDrainTrackers();
return slice;
slice.callAndClearDrainTrackers();
return std::make_unique<SliceDataImpl>(std::move(slice));
}
}

Expand All @@ -246,7 +246,7 @@ uint64_t OwnedImpl::length() const {
// of the lengths of the slices.
uint64_t length = 0;
for (const auto& slice : slices_) {
length += slice->dataSize();
length += slice.dataSize();
}
ASSERT(length == length_);
#endif
Expand All @@ -259,13 +259,13 @@ void* OwnedImpl::linearize(uint32_t size) {
if (slices_.empty()) {
return nullptr;
}
if (slices_[0]->dataSize() < size) {
auto new_slice = OwnedSlice::create(size);
Slice::Reservation reservation = new_slice->reserve(size);
if (slices_[0].dataSize() < size) {
Slice new_slice{size};
Slice::Reservation reservation = new_slice.reserve(size);
ASSERT(reservation.mem_ != nullptr);
ASSERT(reservation.len_ == size);
copyOut(0, size, reservation.mem_);
new_slice->commit(reservation);
new_slice.commit(reservation);

// Replace the first 'size' bytes in the buffer with the new slice. Since new_slice re-adds the
// drained bytes, avoid use of the overridable 'drain' method to avoid incorrectly checking if
Expand All @@ -274,23 +274,23 @@ void* OwnedImpl::linearize(uint32_t size) {
slices_.emplace_front(std::move(new_slice));
length_ += size;
}
return slices_.front()->data();
return slices_.front().data();
}

void OwnedImpl::coalesceOrAddSlice(SlicePtr&& other_slice) {
const uint64_t slice_size = other_slice->dataSize();
void OwnedImpl::coalesceOrAddSlice(Slice&& other_slice) {
const uint64_t slice_size = other_slice.dataSize();
// The `other_slice` content can be coalesced into the existing slice IFF:
// 1. The `other_slice` can be coalesced. Objects of type UnownedSlice can not be coalesced. See
// comment in the UnownedSlice class definition;
// 1. The `other_slice` can be coalesced. Immutable slices can not be safely coalesced because
// their destructors can have arbitrary global side effects.
// 2. There are existing slices;
// 3. The `other_slice` content length is under the CopyThreshold;
// 4. There is enough unused space in the existing slice to accommodate the `other_slice` content.
if (other_slice->canCoalesce() && !slices_.empty() && slice_size < CopyThreshold &&
slices_.back()->reservableSize() >= slice_size) {
if (other_slice.canCoalesce() && !slices_.empty() && slice_size < CopyThreshold &&
slices_.back().reservableSize() >= slice_size) {
// Copy content of the `other_slice`. The `move` methods which call this method effectively
// drain the source buffer.
addImpl(other_slice->data(), slice_size);
other_slice->transferDrainTrackersTo(*slices_.back());
addImpl(other_slice.data(), slice_size);
other_slice.transferDrainTrackersTo(slices_.back());
} else {
// Take ownership of the slice.
slices_.emplace_back(std::move(other_slice));
Expand All @@ -305,7 +305,7 @@ void OwnedImpl::move(Instance& rhs) {
// want to maintain an abstraction.
OwnedImpl& other = static_cast<OwnedImpl&>(rhs);
while (!other.slices_.empty()) {
const uint64_t slice_size = other.slices_.front()->dataSize();
const uint64_t slice_size = other.slices_.front().dataSize();
coalesceOrAddSlice(std::move(other.slices_.front()));
other.length_ -= slice_size;
other.slices_.pop_front();
Expand All @@ -318,15 +318,15 @@ void OwnedImpl::move(Instance& rhs, uint64_t length) {
// See move() above for why we do the static cast.
OwnedImpl& other = static_cast<OwnedImpl&>(rhs);
while (length != 0 && !other.slices_.empty()) {
const uint64_t slice_size = other.slices_.front()->dataSize();
const uint64_t slice_size = other.slices_.front().dataSize();
const uint64_t copy_size = std::min(slice_size, length);
if (copy_size == 0) {
other.slices_.pop_front();
} else if (copy_size < slice_size) {
// TODO(brian-pane) add reference-counting to allow slices to share their storage
// and eliminate the copy for this partial-slice case?
add(other.slices_.front()->data(), copy_size);
other.slices_.front()->drain(copy_size);
add(other.slices_.front().data(), copy_size);
other.slices_.front().drain(copy_size);
other.length_ -= copy_size;
} else {
coalesceOrAddSlice(std::move(other.slices_.front()));
Expand All @@ -345,11 +345,11 @@ uint64_t OwnedImpl::reserve(uint64_t length, RawSlice* iovecs, uint64_t num_iove
// Check whether there are any empty slices with reservable space at the end of the buffer.
size_t first_reservable_slice = slices_.size();
while (first_reservable_slice > 0) {
if (slices_[first_reservable_slice - 1]->reservableSize() == 0) {
if (slices_[first_reservable_slice - 1].reservableSize() == 0) {
break;
}
first_reservable_slice--;
if (slices_[first_reservable_slice]->dataSize() != 0) {
if (slices_[first_reservable_slice].dataSize() != 0) {
// There is some content in this slice, so anything in front of it is non-reservable.
break;
}
Expand All @@ -362,24 +362,24 @@ uint64_t OwnedImpl::reserve(uint64_t length, RawSlice* iovecs, uint64_t num_iove
size_t slice_index = first_reservable_slice;
while (slice_index < slices_.size() && bytes_remaining != 0 && num_slices_used < num_iovecs) {
auto& slice = slices_[slice_index];
const uint64_t reservation_size = std::min(slice->reservableSize(), bytes_remaining);
const uint64_t reservation_size = std::min(slice.reservableSize(), bytes_remaining);
if (num_slices_used + 1 == num_iovecs && reservation_size < bytes_remaining) {
// There is only one iovec left, and this next slice does not have enough space to
// complete the reservation. Stop iterating, with last one iovec still unpopulated,
// so the code following this loop can allocate a new slice to hold the rest of the
// reservation.
break;
}
iovecs[num_slices_used] = slice->reserve(reservation_size);
iovecs[num_slices_used] = slice.reserve(reservation_size);
bytes_remaining -= iovecs[num_slices_used].len_;
num_slices_used++;
slice_index++;
}

// If needed, allocate one more slice at the end to provide the remainder of the reservation.
if (bytes_remaining != 0) {
slices_.emplace_back(OwnedSlice::create(bytes_remaining));
iovecs[num_slices_used] = slices_.back()->reserve(bytes_remaining);
slices_.emplace_back(bytes_remaining);
iovecs[num_slices_used] = slices_.back().reserve(bytes_remaining);
bytes_remaining -= iovecs[num_slices_used].len_;
num_slices_used++;
}
Expand Down Expand Up @@ -409,13 +409,13 @@ ssize_t OwnedImpl::search(const void* data, uint64_t size, size_t start, size_t
for (size_t slice_index = 0; slice_index < slices_.size() && (left_to_search > 0);
slice_index++) {
const auto& slice = slices_[slice_index];
uint64_t slice_size = slice->dataSize();
uint64_t slice_size = slice.dataSize();
if (slice_size <= start) {
start -= slice_size;
offset += slice_size;
continue;
}
const uint8_t* slice_start = slice->data();
const uint8_t* slice_start = slice.data();
const uint8_t* haystack = slice_start;
const uint8_t* haystack_end = haystack + slice_size;
haystack += start;
Expand Down Expand Up @@ -450,8 +450,8 @@ ssize_t OwnedImpl::search(const void* data, uint64_t size, size_t start, size_t
break;
}
const auto& match_slice = slices_[match_index];
match_next = match_slice->data();
match_end = match_next + match_slice->dataSize();
match_next = match_slice.data();
match_end = match_next + match_slice.dataSize();
continue;
}
left_to_search--;
Expand Down Expand Up @@ -487,8 +487,8 @@ bool OwnedImpl::startsWith(absl::string_view data) const {
const uint8_t* prefix = reinterpret_cast<const uint8_t*>(data.data());
size_t size = data.length();
for (const auto& slice : slices_) {
uint64_t slice_size = slice->dataSize();
const uint8_t* slice_start = slice->data();
uint64_t slice_size = slice.dataSize();
const uint8_t* slice_start = slice.data();

if (slice_size >= size) {
// The remaining size bytes of data are in this slice.
Expand Down Expand Up @@ -530,18 +530,19 @@ std::string OwnedImpl::toString() const {
void OwnedImpl::postProcess() {}

void OwnedImpl::appendSliceForTest(const void* data, uint64_t size) {
slices_.emplace_back(OwnedSlice::create(data, size));
slices_.emplace_back(size);
slices_.back().append(data, size);
length_ += size;
}

void OwnedImpl::appendSliceForTest(absl::string_view data) {
appendSliceForTest(data.data(), data.size());
}

std::vector<OwnedSlice::SliceRepresentation> OwnedImpl::describeSlicesForTest() const {
std::vector<OwnedSlice::SliceRepresentation> slices;
std::vector<Slice::SliceRepresentation> OwnedImpl::describeSlicesForTest() const {
std::vector<Slice::SliceRepresentation> slices;
for (const auto& slice : slices_) {
slices.push_back(slice->describeSliceForTest());
slices.push_back(slice.describeSliceForTest());
}
return slices;
}
Expand Down
Loading