Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions source/common/network/connection_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,13 @@ void ConnectionImpl::readDisable(bool disable) {
}

void ConnectionImpl::raiseEvent(ConnectionEvent event) {
// We may have pending data in the write buffer on transport handshake
// completion, which may also have completed in the context of onReadReady(),
// where no check of the write buffer is made. Provide an opportunity to flush
// here. If connection write is not ready, this is harmless.
if (event == ConnectionEvent::Connected && write_buffer_->length() > 0) {

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this happen before notifying callbacks?

If we leave this after the above loop, should check whether the connection was closed before doing this.

onWriteReady();

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

onWriteReady() can close the socket. Need to check that before continuing. Although, if that happens, filters will get notified of a close before being notified that it was connected.

Hmmm... Thinking this through, it is probably better to notify all filters that they're connected (ie put it back to the way it was before), and make this case be:

if (state() == State::Open && event == ConnectionEvent::Connected && write_buffer_->length() > 0)

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry to go in circles on this; ConnectionImpl is complicated.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, this is an area of the code I'm less familiar with, and it's hard to reason about what callbacks can and can't do here, so thanks for the patience.

}
for (ConnectionCallbacks* callback : callbacks_) {
// TODO(mattklein123): If we close while raising a connected event we should not raise further
// connected events.
Expand Down
34 changes: 33 additions & 1 deletion test/common/network/connection_impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -842,6 +842,10 @@ class MockTransportConnectionImplTest : public testing::Test {
EXPECT_CALL(dispatcher_, createFileEvent_(0, _, _, _))
.WillOnce(DoAll(SaveArg<1>(&file_ready_cb_), Return(file_event_)));
transport_socket_ = new NiceMock<MockTransportSocket>;
EXPECT_CALL(*transport_socket_, setTransportSocketCallbacks(_))
.WillOnce(Invoke([this](TransportSocketCallbacks& callbacks) {
transport_socket_callbacks_ = &callbacks;
}));
connection_.reset(
new ConnectionImpl(dispatcher_, std::make_unique<ConnectionSocketImpl>(0, nullptr, nullptr),
TransportSocketPtr(transport_socket_), true));
Expand All @@ -861,9 +865,10 @@ class MockTransportConnectionImplTest : public testing::Test {
std::unique_ptr<ConnectionImpl> connection_;
Event::MockDispatcher dispatcher_;
NiceMock<MockConnectionCallbacks> callbacks_;
NiceMock<MockTransportSocket>* transport_socket_;
MockTransportSocket* transport_socket_;
Event::MockFileEvent* file_event_;
Event::FileReadyCb file_ready_cb_;
TransportSocketCallbacks* transport_socket_callbacks_;
};

// Test that BytesSentCb is invoked at the correct times
Expand Down Expand Up @@ -1122,6 +1127,33 @@ TEST_F(MockTransportConnectionImplTest, WriteEndStreamStopIteration) {
connection_->write(buffer, true);
}

// Validate that when the transport signals ConnectionEvent::Connected, that we
// check for pending write buffer content.
TEST_F(MockTransportConnectionImplTest, WriteReadyOnConnected) {
InSequence s;

// Queue up some data in write buffer, simulating what happens in SSL handshake.
const std::string val("some data");
Buffer::OwnedImpl buffer(val);
EXPECT_CALL(*file_event_, activate(Event::FileReadyType::Write)).WillOnce(Invoke(file_ready_cb_));
EXPECT_CALL(*transport_socket_, doWrite(BufferStringEqual(val), false))
.WillOnce(Return(IoResult{PostIoAction::KeepOpen, 100, false}));

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you should return 0 instead of 100; otherwise you're claiming that 100 bytes were written, but this test scenario is saying that it cannot be written yet (and 100 bytes is more than the size of the write, so it's weird).

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please return 0 on this one

connection_->write(buffer, false);

// A read event happens, resulting in handshake completion and
// raiseEvent(Network::ConnectionEvent::Connected). Since we have data queued
// in the write buffer, we should see a doWrite with this data.
EXPECT_CALL(*transport_socket_, doRead(_)).WillOnce(InvokeWithoutArgs([this] {
transport_socket_callbacks_->raiseEvent(Network::ConnectionEvent::Connected);
return IoResult{PostIoAction::KeepOpen, 0, false};
}));
EXPECT_CALL(*transport_socket_, doWrite(BufferStringEqual(val), false))
.WillOnce(Return(IoResult{PostIoAction::KeepOpen, 0, false}));
file_ready_cb_(Event::FileReadyType::Read);
EXPECT_CALL(*transport_socket_, doWrite(_, true))

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What causes this call? Is this from shutdown of the test? If so, let's put this EXPECT after the file_ready_cb_ below.

.WillOnce(Return(IoResult{PostIoAction::KeepOpen, 0, true}));
}

class ReadBufferLimitTest : public ConnectionImplTest {
public:
void readBufferLimitTest(uint32_t read_buffer_limit, uint32_t expected_chunk_size) {
Expand Down