Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 2 additions & 5 deletions source/common/network/connection_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -148,10 +148,6 @@ void ConnectionImpl::close(ConnectionCloseType type) {
return;
}

// All close types that follow do not actually close() the socket immediately so that buffered
// data can be written. However, we do want to stop reading to apply TCP backpressure.
read_enabled_ = false;

// NOTE: At this point, it's already been validated that the connection is not already in
// delayed close processing and therefore the timer has not yet been created.
if (delayed_close_timeout_set) {
Expand Down Expand Up @@ -244,9 +240,10 @@ void ConnectionImpl::noDelay(bool enable) {
uint64_t ConnectionImpl::id() const { return id_; }

void ConnectionImpl::onRead(uint64_t read_buffer_size) {
if (!read_enabled_) {
if (!read_enabled_ || inDelayedClose()) {
return;
}
ASSERT(ioHandle().isOpen());

if (read_buffer_size == 0 && !read_end_stream_) {
return;
Expand Down
164 changes: 164 additions & 0 deletions test/common/network/connection_impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1355,6 +1355,7 @@ class FakeReadFilter : public Network::ReadFilter {
private:
ReadFilterCallbacks* callbacks_{nullptr};
};

class MockTransportConnectionImplTest : public testing::Test {
public:
MockTransportConnectionImplTest() {
Expand Down Expand Up @@ -1696,6 +1697,169 @@ TEST_F(MockTransportConnectionImplTest, WriteReadyOnConnected) {
.WillOnce(Return(IoResult{PostIoAction::KeepOpen, 0, true}));
}

// Fixture for validating behavior after a connection is closed.
class PostCloseConnectionImplTest : public MockTransportConnectionImplTest {
protected:
// Setup connection, single read event.
void initialize() {
connection_->addReadFilter(read_filter_);
connection_->setDelayedCloseTimeout(std::chrono::milliseconds(100));

EXPECT_CALL(*transport_socket_, doRead(_))
.WillOnce(Invoke([this](Buffer::Instance& buffer) -> IoResult {
buffer.add(val_.c_str(), val_.size());
return {PostIoAction::KeepOpen, val_.size(), false};
}));
EXPECT_CALL(*read_filter_, onNewConnection());
EXPECT_CALL(*read_filter_, onData(_, _));
file_ready_cb_(Event::FileReadyType::Read);
}

void writeSomeData() {
Buffer::OwnedImpl buffer("data");
EXPECT_CALL(*file_event_, activate(Event::FileReadyType::Write));
connection_->write(buffer, false);
}

const std::string val_{"a"};
std::shared_ptr<MockReadFilter> read_filter_{new StrictMock<MockReadFilter>()};
};

// Test that if a read event occurs after
// close(ConnectionCloseType::FlushWriteAndDelay), the read is not propagated to
// a read filter.
TEST_F(PostCloseConnectionImplTest, ReadAfterCloseFlushWriteDelayIgnored) {
InSequence s;
initialize();

// Delayed connection close.
EXPECT_CALL(dispatcher_, createTimer_(_));
connection_->close(ConnectionCloseType::FlushWriteAndDelay);

// Read event, doRead() happens on connection but no filter onData().
EXPECT_CALL(*read_filter_, onData(_, _)).Times(0);
EXPECT_CALL(*transport_socket_, doRead(_))
.WillOnce(Invoke([this](Buffer::Instance& buffer) -> IoResult {
buffer.add(val_.c_str(), val_.size());
return {PostIoAction::KeepOpen, val_.size(), false};
}));
file_ready_cb_(Event::FileReadyType::Read);
// Deferred close.
EXPECT_CALL(*transport_socket_, closeSocket(_));
}

// Test that if a read event occurs after
// close(ConnectionCloseType::FlushWriteAndDelay) with pending write data, the
// read is not propagated to a read filter.
TEST_F(PostCloseConnectionImplTest, ReadAfterCloseFlushWriteDelayIgnoredWithWriteData) {
InSequence s;
initialize();
writeSomeData();

// Delayed connection close.
EXPECT_CALL(dispatcher_, createTimer_(_));
connection_->close(ConnectionCloseType::FlushWriteAndDelay);

// Read event, doRead() happens on connection but no filter onData().
EXPECT_CALL(*read_filter_, onData(_, _)).Times(0);
EXPECT_CALL(*transport_socket_, doRead(_))
.WillOnce(Invoke([this](Buffer::Instance& buffer) -> IoResult {
buffer.add(val_.c_str(), val_.size());
return {PostIoAction::KeepOpen, val_.size(), false};
}));
file_ready_cb_(Event::FileReadyType::Read);
// We have data written above in writeSomeData(), it will be flushed here.
EXPECT_CALL(*transport_socket_, doWrite(_, true))
.WillOnce(Return(IoResult{PostIoAction::KeepOpen, 0, false}));
// Deferred close.
EXPECT_CALL(*transport_socket_, closeSocket(_));
}

// Test that if a read event occurs after
// close(ConnectionCloseType::FlushWriteAndDelay) with pending write data and a
// transport socket than canFlushClose(), the read is not propagated to a read
// filter.
TEST_F(PostCloseConnectionImplTest, ReadAfterCloseFlushWriteDelayIgnoredCanFlushClose) {
InSequence s;
initialize();
writeSomeData();

// The path of interest is when the transport socket canFlushClose().
ON_CALL(*transport_socket_, canFlushClose()).WillByDefault(Return(true));

// Delayed connection close.
EXPECT_CALL(dispatcher_, createTimer_(_));
EXPECT_CALL(*file_event_, setEnabled(Event::FileReadyType::Write | Event::FileReadyType::Closed));
connection_->close(ConnectionCloseType::FlushWriteAndDelay);

// Read event, doRead() happens on connection but no filter onData().
EXPECT_CALL(*read_filter_, onData(_, _)).Times(0);
EXPECT_CALL(*transport_socket_, doRead(_))
.WillOnce(Invoke([this](Buffer::Instance& buffer) -> IoResult {
buffer.add(val_.c_str(), val_.size());
return {PostIoAction::KeepOpen, val_.size(), false};
}));
file_ready_cb_(Event::FileReadyType::Read);

// Deferred close.
EXPECT_CALL(*transport_socket_, closeSocket(_));
}

// Test that if a read event occurs after close(ConnectionCloseType::NoFlush),
// then no read is attempted from the transport socket and hence the read is not
// propagated to a read filter.
TEST_F(PostCloseConnectionImplTest, NoReadAfterCloseNoFlush) {
InSequence s;
initialize();

// Immediate connection close.
EXPECT_CALL(*transport_socket_, closeSocket(_));
connection_->close(ConnectionCloseType::NoFlush);

// We don't even see a doRead(), let alone an onData() callback.
EXPECT_CALL(*read_filter_, onData(_, _)).Times(0);
EXPECT_CALL(*transport_socket_, doRead(_)).Times(0);
file_ready_cb_(Event::FileReadyType::Read);
}

// Test that if a read event occurs after close(ConnectionCloseType::FlushWrite),
// then no read is attempted from the transport socket and hence the read is not
// propagated to a read filter.
TEST_F(PostCloseConnectionImplTest, NoReadAfterCloseFlushWrite) {
InSequence s;
initialize();

// Connection flush and close.
EXPECT_CALL(*transport_socket_, closeSocket(_));
connection_->close(ConnectionCloseType::FlushWrite);

// We don't even see a doRead(), let alone an onData() callback.
EXPECT_CALL(*read_filter_, onData(_, _)).Times(0);
EXPECT_CALL(*transport_socket_, doRead(_)).Times(0);
file_ready_cb_(Event::FileReadyType::Read);
}

// Test that if a read event occurs after close(ConnectionCloseType::FlushWrite)
// with pending write data, then no read is attempted from the transport socket
// and hence the read is not propagated to a read filter.
TEST_F(PostCloseConnectionImplTest, NoReadAfterCloseFlushWriteWriteData) {
InSequence s;
initialize();
writeSomeData();

// Connection flush and close. We have data written above in writeSomeData(),
// it will be flushed here.
EXPECT_CALL(*transport_socket_, doWrite(_, true))
.WillOnce(Return(IoResult{PostIoAction::KeepOpen, 0, false}));
EXPECT_CALL(*transport_socket_, closeSocket(_));
connection_->close(ConnectionCloseType::FlushWrite);

// We don't even see a doRead(), let alone an onData() callback.
EXPECT_CALL(*read_filter_, onData(_, _)).Times(0);
EXPECT_CALL(*transport_socket_, doRead(_)).Times(0);
file_ready_cb_(Event::FileReadyType::Read);
}

class ReadBufferLimitTest : public ConnectionImplTest {
public:
void readBufferLimitTest(uint32_t read_buffer_limit, uint32_t expected_chunk_size) {
Expand Down
Loading