Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions source/common/buffer/watermark_buffer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,7 @@ void WatermarkBuffer::appendSliceForTest(absl::string_view data) {
appendSliceForTest(data.data(), data.size());
}

void WatermarkBuffer::setWatermarks(uint32_t low_watermark, uint32_t high_watermark) {
ASSERT(low_watermark < high_watermark || (high_watermark == 0 && low_watermark == 0));
void WatermarkBuffer::setWatermarks(uint32_t high_watermark) {
uint32_t overflow_watermark_multiplier =
Runtime::getInteger("envoy.buffer.overflow_multiplier", 0);
if (overflow_watermark_multiplier > 0 &&
Expand All @@ -101,7 +100,7 @@ void WatermarkBuffer::setWatermarks(uint32_t low_watermark, uint32_t high_waterm
"high_watermark is overflowing. Disabling overflow watermark.");
overflow_watermark_multiplier = 0;
}
low_watermark_ = low_watermark;
low_watermark_ = high_watermark / 2;
high_watermark_ = high_watermark;
overflow_watermark_ = overflow_watermark_multiplier * high_watermark;
checkHighAndOverflowWatermarks();
Expand Down
3 changes: 1 addition & 2 deletions source/common/buffer/watermark_buffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,7 @@ class WatermarkBuffer : public OwnedImpl {
void appendSliceForTest(const void* data, uint64_t size) override;
void appendSliceForTest(absl::string_view data) override;

void setWatermarks(uint32_t watermark) override { setWatermarks(watermark / 2, watermark); }
void setWatermarks(uint32_t low_watermark, uint32_t high_watermark);
void setWatermarks(uint32_t high_watermark) override;
uint32_t highWatermark() const override { return high_watermark_; }
// Returns true if the high watermark callbacks have been called more recently
// than the low watermark callbacks.
Expand Down
2 changes: 1 addition & 1 deletion source/common/http/http2/codec_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ ConnectionImpl::StreamImpl::StreamImpl(ConnectionImpl& parent, uint32_t buffer_l
pending_send_buffer_high_watermark_called_(false), reset_due_to_messaging_error_(false) {
parent_.stats_.streams_active_.inc();
if (buffer_limit > 0) {
setWriteBufferWatermarks(buffer_limit / 2, buffer_limit);
setWriteBufferWatermarks(buffer_limit);
}
}

Expand Down
6 changes: 3 additions & 3 deletions source/common/http/http2/codec_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -257,9 +257,9 @@ class ConnectionImpl : public virtual Connection,
}
}

void setWriteBufferWatermarks(uint32_t low_watermark, uint32_t high_watermark) {
pending_recv_data_.setWatermarks(low_watermark, high_watermark);
pending_send_data_.setWatermarks(low_watermark, high_watermark);
void setWriteBufferWatermarks(uint32_t high_watermark) {
pending_recv_data_.setWatermarks(high_watermark);
pending_send_data_.setWatermarks(high_watermark);
}

// If the receive buffer encounters watermark callbacks, enable/disable reads on this stream.
Expand Down
48 changes: 24 additions & 24 deletions test/common/buffer/watermark_buffer_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ const char TEN_BYTES[] = "0123456789";

class WatermarkBufferTest : public testing::Test {
public:
WatermarkBufferTest() { buffer_.setWatermarks(5, 10); }
WatermarkBufferTest() { buffer_.setWatermarks(10); }

Buffer::WatermarkBuffer buffer_{[&]() -> void { ++times_low_watermark_called_; },
[&]() -> void { ++times_high_watermark_called_; },
Expand Down Expand Up @@ -104,7 +104,7 @@ TEST_F(WatermarkBufferTest, PrependBuffer) {
WatermarkBuffer prefixBuffer{[&]() -> void { ++prefix_buffer_low_watermark_hits; },
[&]() -> void { ++prefix_buffer_high_watermark_hits; },
[&]() -> void { ++prefix_buffer_overflow_watermark_hits; }};
prefixBuffer.setWatermarks(5, 10);
prefixBuffer.setWatermarks(10);
prefixBuffer.add(prefix);
prefixBuffer.add(suffix);

Expand Down Expand Up @@ -196,18 +196,18 @@ TEST_F(WatermarkBufferTest, DrainUsingExtract) {
// Verify that low watermark callback is called on drain in the case where the
// high watermark is non-zero and low watermark is 0.
TEST_F(WatermarkBufferTest, DrainWithLowWatermarkOfZero) {
buffer_.setWatermarks(0, 10);
buffer_.setWatermarks(1);

// Draining from above to below the low watermark does nothing if the high
// watermark never got hit.
buffer_.add(TEN_BYTES, 10);
buffer_.drain(10);
buffer_.add(TEN_BYTES, 1);
buffer_.drain(1);
EXPECT_EQ(0, times_high_watermark_called_);
EXPECT_EQ(0, times_low_watermark_called_);

// Go above the high watermark then drain down to just above the low watermark.
buffer_.add(TEN_BYTES, 11);
buffer_.drain(10);
buffer_.add(TEN_BYTES, 2);
buffer_.drain(1);
EXPECT_EQ(1, buffer_.length());
EXPECT_EQ(0, times_low_watermark_called_);

Expand All @@ -216,7 +216,7 @@ TEST_F(WatermarkBufferTest, DrainWithLowWatermarkOfZero) {
EXPECT_EQ(1, times_low_watermark_called_);

// Going back above should trigger the high again
buffer_.add(TEN_BYTES, 11);
buffer_.add(TEN_BYTES, 2);
EXPECT_EQ(2, times_high_watermark_called_);
}

Expand Down Expand Up @@ -284,18 +284,18 @@ TEST_F(WatermarkBufferTest, WatermarkFdFunctions) {
TEST_F(WatermarkBufferTest, MoveWatermarks) {
buffer_.add(TEN_BYTES, 9);
EXPECT_EQ(0, times_high_watermark_called_);
buffer_.setWatermarks(1, 9);
buffer_.setWatermarks(9);
EXPECT_EQ(0, times_high_watermark_called_);
buffer_.setWatermarks(1, 8);
buffer_.setWatermarks(8);
EXPECT_EQ(1, times_high_watermark_called_);

buffer_.setWatermarks(8, 20);
buffer_.setWatermarks(16);
EXPECT_EQ(0, times_low_watermark_called_);
buffer_.setWatermarks(9, 20);
buffer_.setWatermarks(18);
EXPECT_EQ(1, times_low_watermark_called_);
buffer_.setWatermarks(7, 20);
buffer_.setWatermarks(14);
EXPECT_EQ(1, times_low_watermark_called_);
buffer_.setWatermarks(9, 20);
buffer_.setWatermarks(18);
EXPECT_EQ(1, times_low_watermark_called_);
EXPECT_EQ(0, times_overflow_watermark_called_);

Expand Down Expand Up @@ -355,7 +355,7 @@ TEST_F(WatermarkBufferTest, MoveBackWithWatermarks) {
Buffer::WatermarkBuffer buffer1{[&]() -> void { ++low_watermark_buffer1; },
[&]() -> void { ++high_watermark_buffer1; },
[&]() -> void { ++overflow_watermark_buffer1; }};
buffer1.setWatermarks(5, 10);
buffer1.setWatermarks(10);

// Stick 20 bytes in buffer_ and expect the high watermark is hit.
buffer_.add(TEN_BYTES, 10);
Expand Down Expand Up @@ -393,7 +393,7 @@ TEST_F(WatermarkBufferTest, OverflowWatermark) {
Buffer::WatermarkBuffer buffer1{[&]() -> void { ++low_watermark_buffer1; },
[&]() -> void { ++high_watermark_buffer1; },
[&]() -> void { ++overflow_watermark_buffer1; }};
buffer1.setWatermarks(5, 10);
buffer1.setWatermarks(10);

buffer1.add(TEN_BYTES, 10);
EXPECT_EQ(0, high_watermark_buffer1);
Expand Down Expand Up @@ -439,7 +439,7 @@ TEST_F(WatermarkBufferTest, OverflowWatermarkDisabled) {
Buffer::WatermarkBuffer buffer1{[&]() -> void { ++low_watermark_buffer1; },
[&]() -> void { ++high_watermark_buffer1; },
[&]() -> void { ++overflow_watermark_buffer1; }};
buffer1.setWatermarks(5, 10);
buffer1.setWatermarks(10);

buffer1.add(TEN_BYTES, 10);
EXPECT_EQ(0, high_watermark_buffer1);
Expand Down Expand Up @@ -510,7 +510,7 @@ TEST_F(WatermarkBufferTest, OverflowWatermarkEqualHighWatermark) {
Buffer::WatermarkBuffer buffer1{[&]() -> void { ++low_watermark_buffer1; },
[&]() -> void { ++high_watermark_buffer1; },
[&]() -> void { ++overflow_watermark_buffer1; }};
buffer1.setWatermarks(5, 10);
buffer1.setWatermarks(10);

buffer1.add(TEN_BYTES, 10);
EXPECT_EQ(0, high_watermark_buffer1);
Expand Down Expand Up @@ -540,25 +540,25 @@ TEST_F(WatermarkBufferTest, MoveWatermarksOverflow) {
Buffer::WatermarkBuffer buffer1{[&]() -> void { ++low_watermark_buffer1; },
[&]() -> void { ++high_watermark_buffer1; },
[&]() -> void { ++overflow_watermark_buffer1; }};
buffer1.setWatermarks(5, 10);
buffer1.setWatermarks(10);
buffer1.add(TEN_BYTES, 9);
EXPECT_EQ(0, high_watermark_buffer1);
EXPECT_EQ(0, overflow_watermark_buffer1);
buffer1.setWatermarks(1, 9);
buffer1.setWatermarks(9);
EXPECT_EQ(0, high_watermark_buffer1);
EXPECT_EQ(0, overflow_watermark_buffer1);
buffer1.setWatermarks(1, 8);
buffer1.setWatermarks(8);
EXPECT_EQ(1, high_watermark_buffer1);
EXPECT_EQ(0, overflow_watermark_buffer1);
buffer1.setWatermarks(1, 5);
buffer1.setWatermarks(5);
EXPECT_EQ(1, high_watermark_buffer1);
EXPECT_EQ(0, overflow_watermark_buffer1);
buffer1.setWatermarks(1, 4);
buffer1.setWatermarks(4);
EXPECT_EQ(1, high_watermark_buffer1);
EXPECT_EQ(1, overflow_watermark_buffer1);

// Overflow is only triggered once
buffer1.setWatermarks(3, 6);
buffer1.setWatermarks(6);
EXPECT_EQ(0, low_watermark_buffer1);
EXPECT_EQ(1, high_watermark_buffer1);
EXPECT_EQ(1, overflow_watermark_buffer1);
Expand Down
2 changes: 1 addition & 1 deletion test/common/http/http2/codec_impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1419,7 +1419,7 @@ TEST_P(Http2CodecImplFlowControlTest, FlowControlPendingRecvData) {
// the recv buffer can be overrun by a client which negotiates a larger
// SETTINGS_MAX_FRAME_SIZE but there's no current easy way to tweak that in
// envoy (without sending raw HTTP/2 frames) so we lower the buffer limit instead.
server_->getStream(1)->setWriteBufferWatermarks(10, 20);
server_->getStream(1)->setWriteBufferWatermarks(20);

EXPECT_CALL(request_decoder_, decodeData(_, false));
Buffer::OwnedImpl data(std::string(40, 'a'));
Expand Down
2 changes: 1 addition & 1 deletion test/mocks/http/stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ class MockStream : public Stream {
MOCK_METHOD(void, removeCallbacks, (StreamCallbacks & callbacks));
MOCK_METHOD(void, resetStream, (StreamResetReason reason));
MOCK_METHOD(void, readDisable, (bool disable));
MOCK_METHOD(void, setWriteBufferWatermarks, (uint32_t, uint32_t));
MOCK_METHOD(void, setWriteBufferWatermarks, (uint32_t));
MOCK_METHOD(uint32_t, bufferLimit, ());
MOCK_METHOD(const Network::Address::InstanceConstSharedPtr&, connectionLocalAddress, ());
MOCK_METHOD(void, setFlushTimeout, (std::chrono::milliseconds timeout));
Expand Down