Skip to content

Commit

Permalink
network: cleanup connection logic and crash when fd cannot be allocat…
Browse files Browse the repository at this point in the history
…ed (#237)

The fix in 8d5366 looks good. This commit further cleans up the socket
close logic to make it simpler and more unified. It also changes the
behavior to crash the process if we can't allocate a socket or we have
an error during accept. In practice those only happen when we run out of
FDs which should be considered an OOM condition where we also crash on
purpose.
  • Loading branch information
mattklein123 authored Nov 21, 2016
1 parent 8d53667 commit 210b5dd
Show file tree
Hide file tree
Showing 11 changed files with 82 additions and 44 deletions.
4 changes: 0 additions & 4 deletions source/common/common/assert.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
* assert macro that uses our builtin logging which gives us thread ID and can log to various
* sinks.
*/
#ifndef COVERAGE
#define RELEASE_ASSERT(X) \
{ \
if (!(X)) { \
Expand All @@ -15,9 +14,6 @@
abort(); \
} \
}
#else
#define RELEASE_ASSERT(X)
#endif

#ifndef NDEBUG
#define ASSERT(X) RELEASE_ASSERT(X)
Expand Down
47 changes: 18 additions & 29 deletions source/common/network/connection_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,12 @@ ConnectionImpl::ConnectionImpl(Event::DispatcherImpl& dispatcher, int fd,
: filter_manager_(*this, *this), remote_address_(remote_address), dispatcher_(dispatcher),
fd_(fd), id_(++next_global_id_) {

// Treat the lack of a valid fd (which in practice only happens if we run out of FDs) as an OOM
// condition and just crash.
RELEASE_ASSERT(fd_ != -1);

file_event_ =
dispatcher_.createFileEvent(fd_, [this](uint32_t events) -> void { onFileEvent(events); });
if (fd_ == -1) {
// Can't obtain a socket.
state_ |= InternalState::ImmediateConnectionError;
file_event_->activate(Event::FileReadyType::Write);
}

read_buffer_.setCallback([this](uint64_t old_size, int64_t delta) -> void {
onBufferChange(ConnectionBufferType::Read, old_size, delta);
Expand Down Expand Up @@ -69,7 +68,7 @@ void ConnectionImpl::close(ConnectionCloseType type) {
doWriteToSocket();
}

doLocalClose();
closeSocket(ConnectionEvent::LocalClose);
} else {
ASSERT(type == ConnectionCloseType::FlushWrite);
state_ |= InternalState::CloseWithFlush;
Expand All @@ -87,9 +86,12 @@ Connection::State ConnectionImpl::state() {
}
}

void ConnectionImpl::closeSocket() {
ASSERT(fd_ != -1 || (state_ & InternalState::ImmediateConnectionError));
conn_log_debug("closing socket", *this);
void ConnectionImpl::closeSocket(uint32_t close_type) {
if (fd_ == -1) {
return;
}

conn_log_debug("closing socket: {}", *this, close_type);

// Drain input and output buffers so that callbacks get fired. This does not happen automatically
// as part of destruction.
Expand All @@ -109,15 +111,8 @@ void ConnectionImpl::closeSocket() {
file_event_.reset();
::close(fd_);
fd_ = -1;
}

void ConnectionImpl::doLocalClose() {
conn_log_debug("doing local close", *this);
closeSocket();

// We expect our owner to deal with freeing us in whatever way makes sense. We raise an event
// to kick that off.
raiseEvents(ConnectionEvent::LocalClose);
raiseEvents(close_type);
}

Event::Dispatcher& ConnectionImpl::dispatcher() { return dispatcher_; }
Expand Down Expand Up @@ -232,8 +227,7 @@ void ConnectionImpl::onFileEvent(uint32_t events) {

if (state_ & InternalState::ImmediateConnectionError) {
conn_log_debug("raising immediate connect error", *this);
closeSocket();
raiseEvents(ConnectionEvent::RemoteClose);
closeSocket(ConnectionEvent::RemoteClose);
return;
}

Expand Down Expand Up @@ -282,10 +276,9 @@ void ConnectionImpl::onReadReady() {
onRead();

// The read callback may have already closed the connection.
if (fd_ != -1 && action == PostIoAction::Close) {
if (action == PostIoAction::Close) {
conn_log_debug("remote close", *this);
closeSocket();
raiseEvents(ConnectionEvent::RemoteClose);
closeSocket(ConnectionEvent::RemoteClose);
}
}

Expand Down Expand Up @@ -326,8 +319,7 @@ void ConnectionImpl::onWriteReady() {
onConnected();
} else {
conn_log_debug("delayed connection error: {}", *this, error);
closeSocket();
raiseEvents(ConnectionEvent::RemoteClose);
closeSocket(ConnectionEvent::RemoteClose);
return;
}
}
Expand All @@ -336,13 +328,10 @@ void ConnectionImpl::onWriteReady() {
// It is possible (though unlikely) for the connection to have already been closed during the
// write callback. This can happen if we manage to complete the SSL handshake in the write
// callback, raise a connected event, and close the connection.
if (fd_ != -1) {
closeSocket();
raiseEvents(ConnectionEvent::RemoteClose);
}
closeSocket(ConnectionEvent::RemoteClose);
} else if ((state_ & InternalState::CloseWithFlush) && write_buffer_.length() == 0) {
conn_log_debug("write flush complete", *this);
doLocalClose();
closeSocket(ConnectionEvent::LocalClose);
}
}

Expand Down
3 changes: 1 addition & 2 deletions source/common/network/connection_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ class ConnectionImpl : public virtual Connection,
protected:
enum class PostIoAction { Close, KeepOpen };

virtual void closeSocket();
virtual void closeSocket(uint32_t close_type);
void doConnect(const sockaddr* addr, socklen_t addrlen);
void raiseEvents(uint32_t events);

Expand All @@ -67,7 +67,6 @@ class ConnectionImpl : public virtual Connection,
};
// clang-format on

void doLocalClose();
virtual PostIoAction doReadFromSocket();
virtual PostIoAction doWriteToSocket();
void onBufferChange(ConnectionBufferType type, uint64_t old_size, int64_t delta);
Expand Down
8 changes: 8 additions & 0 deletions source/common/network/listener_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,14 @@ ListenerImpl::ListenerImpl(Event::DispatcherImpl& dispatcher, ListenSocket& sock
if (!listener_) {
throw CreateListenerException(fmt::format("cannot listen on socket: {}", socket.name()));
}

evconnlistener_set_error_cb(listener_.get(), errorCallback);
}

void ListenerImpl::errorCallback(evconnlistener*, void*) {
// We should never get an error callback. This can happen if we run out of FDs or memory. In those
// cases just crash.
PANIC(fmt::format("listener accept failure: {}", strerror(errno)));
}

void ListenerImpl::newConnection(int fd, sockaddr* addr) {
Expand Down
2 changes: 2 additions & 0 deletions source/common/network/listener_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ class ListenerImpl : public Listener {
ProxyProtocol proxy_protocol_;

private:
static void errorCallback(evconnlistener* listener, void* context);

Event::Libevent::ListenerPtr listener_;
};

Expand Down
4 changes: 2 additions & 2 deletions source/common/ssl/connection_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ void ClientConnectionImpl::connect() {
doConnect(addr_info->ai_addr, addr_info->ai_addrlen);
}

void ConnectionImpl::closeSocket() {
void ConnectionImpl::closeSocket(uint32_t close_type) {
if (handshake_complete_) {
// Attempt to send a shutdown before closing the socket. It's possible this won't go out if
// there is no room on the socket. We can extend the state machine to handle this at some point
Expand All @@ -206,7 +206,7 @@ void ConnectionImpl::closeSocket() {
drainErrorQueue();
}

Network::ConnectionImpl::closeSocket();
Network::ConnectionImpl::closeSocket(close_type);
}

std::string ConnectionImpl::nextProtocol() {
Expand Down
2 changes: 1 addition & 1 deletion source/common/ssl/connection_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ class ConnectionImpl : public Network::ConnectionImpl, public Connection {
void drainErrorQueue();

// Network::ConnectionImpl
void closeSocket() override;
void closeSocket(uint32_t close_type) override;
PostIoAction doReadFromSocket() override;
PostIoAction doWriteToSocket() override;
void onConnected() override;
Expand Down
1 change: 1 addition & 0 deletions test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ add_executable(envoy-test
common/network/connection_impl_test.cc
common/network/dns_impl_test.cc
common/network/filter_manager_impl_test.cc
common/network/listener_impl_test.cc
common/network/listen_socket_impl_test.cc
common/network/proxy_protocol_test.cc
common/network/utility_test.cc
Expand Down
8 changes: 2 additions & 6 deletions test/common/network/connection_impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,9 @@ using testing::Test;

namespace Network {

TEST(ConnectionImplTest, BadFd) {
TEST(ConnectionImplDeathTest, BadFd) {
Event::DispatcherImpl dispatcher;
ConnectionImpl connection(dispatcher, -1, "127.0.0.1");
MockConnectionCallbacks callbacks;
connection.addConnectionCallbacks(callbacks);
EXPECT_CALL(callbacks, onEvent(ConnectionEvent::RemoteClose));
dispatcher.run(Event::Dispatcher::RunType::Block);
EXPECT_DEATH(ConnectionImpl(dispatcher, -1, "127.0.0.1"), ".*assert failure: fd_ != -1.*");
}

TEST(ConnectionImplTest, BufferCallbacks) {
Expand Down
39 changes: 39 additions & 0 deletions test/common/network/listener_impl_test.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
#include "common/network/listener_impl.h"
#include "common/stats/stats_impl.h"

#include "test/mocks/network/mocks.h"

using testing::_;
using testing::Invoke;

namespace Network {

static void errorCallbackTest() {
// Force the error callback to fire by closing the socket under the listener. We run this entire
// test in the forked process to avoid confusion when the fork happens.
Stats::IsolatedStoreImpl stats_store;
Event::DispatcherImpl dispatcher;
Network::TcpListenSocket socket(10000);
Network::MockListenerCallbacks listener_callbacks;
Network::ListenerPtr listener =
dispatcher.createListener(socket, listener_callbacks, stats_store, false);

Network::ClientConnectionPtr client_connection =
dispatcher.createClientConnection("tcp://127.0.0.1:10000");
client_connection->connect();

EXPECT_CALL(listener_callbacks, onNewConnection_(_))
.WillOnce(Invoke([&](Network::ConnectionPtr& conn) -> void {
client_connection->close(ConnectionCloseType::NoFlush);
conn->close(ConnectionCloseType::NoFlush);
socket.close();
}));

dispatcher.run(Event::Dispatcher::RunType::Block);
}

TEST(ListenerImplDeathTest, ErrorCallback) {
EXPECT_DEATH(errorCallbackTest(), ".*listener accept failure.*");
}

} // Network
8 changes: 8 additions & 0 deletions test/server/options_impl_test.cc
Original file line number Diff line number Diff line change
@@ -1,5 +1,13 @@
#include "server/options_impl.h"

TEST(OptionsImplDeathTest, HotRestartVersion) {
std::vector<const char*> argv;
argv.push_back("envoy");
argv.push_back("--hot-restart-version");
EXPECT_EXIT(OptionsImpl(argv.size(), const_cast<char**>(&argv[0]), "1", spdlog::level::warn),
testing::ExitedWithCode(0), "");
}

TEST(OptionsImplTest, All) {
std::vector<const char*> argv;
argv.push_back("envoy");
Expand Down

0 comments on commit 210b5dd

Please sign in to comment.