Skip to content

Commit

Permalink
Fix potential race condition (#248)
Browse files Browse the repository at this point in the history
Prior to this PR, there was a possible race condition that could be hit by two execution paths where,

- path 1 calls Server::drop_stream which is a yielding call
- path 2 calls Server::m_connections.drop_all_streams();

`Server::drop_stream` locks the server's state, but `Server::m_connections` is not protected by that mutex.

Relates to #247

Authors:
  - Ryan Olson (https://github.com/ryanolson)

Approvers:
  - Michael Demoret (https://github.com/mdemoret-nv)

URL: #248
  • Loading branch information
ryanolson authored Dec 6, 2022
1 parent 9da384b commit 0db47ad
Show file tree
Hide file tree
Showing 6 changed files with 44 additions and 17 deletions.
37 changes: 35 additions & 2 deletions cpp/mrc/src/internal/control_plane/server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ void Server::do_service_await_join()
{
// clear all instances which drops their held stream writers
DVLOG(10) << "awaiting all streams";
m_connections.drop_all_streams();
drop_all_streams();

// we keep the event handlers open until the streams are closed
m_queue->disable_persistence();
Expand Down Expand Up @@ -301,6 +301,7 @@ void Server::do_handle_event(event_t&& event)
}
else
{
DVLOG(10) << "event.ok failed; close stream";
drop_stream(event.stream);
}
} catch (const tl::bad_expected_access<Error>& e)
Expand Down Expand Up @@ -584,10 +585,25 @@ void Server::drop_instance(const instance_id_t& instance_id)
}

// Drops the stream that `writer` belongs to, then clears the caller's writer handle.
// The per-stream teardown is delegated to the drop_stream(const stream_id_t&) overload.
void Server::drop_stream(writer_t& writer)
{
    // Copy the id out of the writer before anything is torn down, so the value handed to
    // the id-based overload does not depend on the writer's lifetime.
    const auto id = writer->get_id();

    drop_stream(id);

    // Clear the handle only after the stream has been dropped.
    writer.reset();
}

void Server::drop_stream(const stream_id_t& stream_id)
{
std::lock_guard<decltype(m_mutex)> lock(m_mutex);

const auto stream_id = writer->get_id();
auto search = m_connections.streams().find(stream_id);
if (search == m_connections.streams().end())
{
LOG(WARNING) << "attempting to drop stream_id: " << stream_id
<< " which is not found in set of connected streams";
}

auto writer = search->second->writer();

DVLOG(10) << "dropping stream with machine_id: " << stream_id;

// for each instance - iterate over state machines and drop the instance id
Expand All @@ -603,6 +619,23 @@ void Server::drop_stream(writer_t& writer)
m_connections.drop_stream(stream_id);
}

void Server::drop_all_streams()
{
std::vector<stream_id_t> stream_ids;
{
std::lock_guard<decltype(m_mutex)> lock(m_mutex);
for (const auto& [id, stream] : m_connections.streams())
{
stream_ids.push_back(id);
}
}

for (const auto& id : stream_ids)
{
drop_stream(id);
}
}

Expected<Server::instance_t> Server::validate_instance_id(const instance_id_t& instance_id, const event_t& event) const
{
return m_connections.get_instance(instance_id).and_then([&event, &instance_id](auto& i) -> Expected<instance_t> {
Expand Down
2 changes: 2 additions & 0 deletions cpp/mrc/src/internal/control_plane/server.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,8 @@ class Server : public Service

void drop_instance(const instance_id_t& instance_id);
void drop_stream(writer_t& writer);
void drop_stream(const stream_id_t& stream_id);
void drop_all_streams();
static void on_fatal_exception();

// convenience methods - these methods do not lock internal state
Expand Down
15 changes: 1 addition & 14 deletions cpp/mrc/src/internal/control_plane/server/connection_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -76,19 +76,6 @@ void ConnectionManager::drop_stream(const stream_id_t& stream_id) noexcept
mark_as_modified();
}

void ConnectionManager::drop_all_streams() noexcept
{
std::vector<stream_id_t> ids;
for (auto& [stream_id, stream] : m_streams)
{
ids.push_back(stream_id);
}
for (const auto& id : ids)
{
drop_stream(id);
}
}

Expected<ConnectionManager::instance_t> ConnectionManager::get_instance(const instance_id_t& instance_id) const
{
auto search = m_instances.find(instance_id);
Expand Down Expand Up @@ -126,7 +113,7 @@ Expected<protos::RegisterWorkersResponse> ConnectionManager::register_instances(
}

// check if any workers/instances have been registered on the requesting stream
if (m_instances_by_stream.count(stream_id) != 0)
if (m_instances_by_stream.contains(stream_id))
{
return Error::create(MRC_CONCAT_STR("failed to register instances on immutable stream "
<< stream_id << "; streams are immutable after first registration"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,6 @@ class ConnectionManager : public VersionedState

void add_stream(const stream_t& stream);
void drop_stream(const stream_id_t& stream_id) noexcept;
void drop_all_streams() noexcept;

const std::map<stream_id_t, stream_t>& streams() const;

Expand Down
2 changes: 2 additions & 0 deletions cpp/mrc/src/internal/resources/manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,8 @@ Manager::Manager(std::unique_ptr<system::Resources> resources) :
}
});
}

VLOG(10) << "resources::Manager initialized";
}

Manager::~Manager()
Expand Down
4 changes: 4 additions & 0 deletions cpp/mrc/src/tests/test_codable.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -117,11 +117,15 @@ class TestCodable : public ::testing::Test
})));

m_runtime = std::make_unique<internal::runtime::Runtime>(std::move(resources));

DVLOG(10) << "Setup Complete";
}

// Per-test teardown: releases the runtime held in m_runtime, with a debug log line on
// either side of the release so the log shows when destruction starts and finishes.
void TearDown() override
{
    DVLOG(10) << "Start Teardown";

    // Assigning nullptr to the unique_ptr destroys the owned Runtime (same effect as reset()).
    m_runtime = nullptr;

    DVLOG(10) << "Teardown Complete";
}

std::unique_ptr<internal::runtime::Runtime> m_runtime;
Expand Down

0 comments on commit 0db47ad

Please sign in to comment.