Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 15 additions & 11 deletions cpp/src/plasma/events.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,12 @@

namespace plasma {

void EventLoop::file_event_callback(aeEventLoop* loop, int fd, void* context,
int events) {
void EventLoop::FileEventCallback(aeEventLoop* loop, int fd, void* context, int events) {
FileCallback* callback = reinterpret_cast<FileCallback*>(context);
(*callback)(events);
}

int EventLoop::timer_event_callback(aeEventLoop* loop, TimerID timer_id, void* context) {
int EventLoop::TimerEventCallback(aeEventLoop* loop, TimerID timer_id, void* context) {
TimerCallback* callback = reinterpret_cast<TimerCallback*>(context);
return (*callback)(timer_id);
}
Expand All @@ -36,21 +35,21 @@ constexpr int kInitialEventLoopSize = 1024;

EventLoop::EventLoop() { loop_ = aeCreateEventLoop(kInitialEventLoopSize); }

bool EventLoop::add_file_event(int fd, int events, const FileCallback& callback) {
bool EventLoop::AddFileEvent(int fd, int events, const FileCallback& callback) {
if (file_callbacks_.find(fd) != file_callbacks_.end()) {
return false;
}
auto data = std::unique_ptr<FileCallback>(new FileCallback(callback));
void* context = reinterpret_cast<void*>(data.get());
// Try to add the file descriptor.
int err = aeCreateFileEvent(loop_, fd, events, EventLoop::file_event_callback, context);
int err = aeCreateFileEvent(loop_, fd, events, EventLoop::FileEventCallback, context);
// If it cannot be added, increase the size of the event loop.
if (err == AE_ERR && errno == ERANGE) {
err = aeResizeSetSize(loop_, 3 * aeGetSetSize(loop_) / 2);
if (err != AE_OK) {
return false;
}
err = aeCreateFileEvent(loop_, fd, events, EventLoop::file_event_callback, context);
err = aeCreateFileEvent(loop_, fd, events, EventLoop::FileEventCallback, context);
}
// In any case, test if there were errors.
if (err == AE_OK) {
Expand All @@ -60,23 +59,28 @@ bool EventLoop::add_file_event(int fd, int events, const FileCallback& callback)
return false;
}

void EventLoop::remove_file_event(int fd) {
void EventLoop::RemoveFileEvent(int fd) {
aeDeleteFileEvent(loop_, fd, AE_READABLE | AE_WRITABLE);
file_callbacks_.erase(fd);
}

void EventLoop::run() { aeMain(loop_); }
void EventLoop::Start() { aeMain(loop_); }

int64_t EventLoop::add_timer(int64_t timeout, const TimerCallback& callback) {
void EventLoop::Stop() {
aeStop(loop_);
aeDeleteEventLoop(loop_);
}

int64_t EventLoop::AddTimer(int64_t timeout, const TimerCallback& callback) {
auto data = std::unique_ptr<TimerCallback>(new TimerCallback(callback));
void* context = reinterpret_cast<void*>(data.get());
int64_t timer_id =
aeCreateTimeEvent(loop_, timeout, EventLoop::timer_event_callback, context, NULL);
aeCreateTimeEvent(loop_, timeout, EventLoop::TimerEventCallback, context, NULL);
timer_callbacks_.emplace(timer_id, std::move(data));
return timer_id;
}

int EventLoop::remove_timer(int64_t timer_id) {
int EventLoop::RemoveTimer(int64_t timer_id) {
int err = aeDeleteTimeEvent(loop_, timer_id);
timer_callbacks_.erase(timer_id);
return err;
Expand Down
19 changes: 11 additions & 8 deletions cpp/src/plasma/events.h
Original file line number Diff line number Diff line change
Expand Up @@ -61,37 +61,40 @@ class EventLoop {
/// @param events The flags for events we are listening to (read or write).
/// @param callback The callback that will be called when the event happens.
/// @return Returns true if the event handler was added successfully.
bool add_file_event(int fd, int events, const FileCallback& callback);
bool AddFileEvent(int fd, int events, const FileCallback& callback);

/// Remove a file event handler from the event loop.
///
/// @param fd The file descriptor of the event handler.
/// @return Void.
void remove_file_event(int fd);
void RemoveFileEvent(int fd);

/// Register a handler that will be called after a time slice of
/// "timeout" milliseconds.
///
/// @param timeout The timeout in milliseconds.
/// @param callback The callback for the timeout.
/// @return The ID of the newly created timer.
int64_t add_timer(int64_t timeout, const TimerCallback& callback);
int64_t AddTimer(int64_t timeout, const TimerCallback& callback);

/// Remove a timer handler from the event loop.
///
/// @param timer_id The ID of the timer that is to be removed.
/// @return The ae.c error code. TODO(pcm): needs to be standardized
int remove_timer(int64_t timer_id);
int RemoveTimer(int64_t timer_id);

/// Run the event loop.
/// \brief Run the event loop.
///
/// @return Void.
void run();
void Start();

/// \brief Stop the event loop
void Stop();

private:
static void file_event_callback(aeEventLoop* loop, int fd, void* context, int events);
static void FileEventCallback(aeEventLoop* loop, int fd, void* context, int events);

static int timer_event_callback(aeEventLoop* loop, TimerID timer_id, void* context);
static int TimerEventCallback(aeEventLoop* loop, TimerID timer_id, void* context);

aeEventLoop* loop_;
std::unordered_map<int, std::unique_ptr<FileCallback>> file_callbacks_;
Expand Down
87 changes: 60 additions & 27 deletions cpp/src/plasma/store.cc
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,7 @@ void PlasmaStore::return_from_get(GetRequest* get_req) {
}
// Remove the get request.
if (get_req->timer != -1) {
ARROW_CHECK(loop_->remove_timer(get_req->timer) == AE_OK);
ARROW_CHECK(loop_->RemoveTimer(get_req->timer) == AE_OK);
}
delete get_req;
}
Expand Down Expand Up @@ -330,7 +330,7 @@ void PlasmaStore::process_get_request(Client* client,
} else if (timeout_ms != -1) {
// Set a timer that will cause the get request to return to the client. Note
// that a timeout of -1 is used to indicate that no timer should be set.
get_req->timer = loop_->add_timer(timeout_ms, [this, get_req](int64_t timer_id) {
get_req->timer = loop_->AddTimer(timeout_ms, [this, get_req](int64_t timer_id) {
return_from_get(get_req);
return kEventLoopTimerDone;
});
Expand Down Expand Up @@ -412,11 +412,13 @@ void PlasmaStore::delete_objects(const std::vector<ObjectID>& object_ids) {

void PlasmaStore::connect_client(int listener_sock) {
int client_fd = AcceptClient(listener_sock);
// This is freed in disconnect_client.

Client* client = new Client(client_fd);
connected_clients_[client_fd] = std::unique_ptr<Client>(client);

// Add a callback to handle events on this socket.
// TODO(pcm): Check return value.
loop_->add_file_event(client_fd, kEventLoopRead, [this, client](int events) {
loop_->AddFileEvent(client_fd, kEventLoopRead, [this, client](int events) {
Status s = process_message(client);
if (!s.ok()) {
ARROW_LOG(FATAL) << "Failed to process file event: " << s;
Expand All @@ -425,23 +427,25 @@ void PlasmaStore::connect_client(int listener_sock) {
ARROW_LOG(DEBUG) << "New connection with fd " << client_fd;
}

void PlasmaStore::disconnect_client(Client* client) {
ARROW_CHECK(client != NULL);
ARROW_CHECK(client->fd > 0);
loop_->remove_file_event(client->fd);
void PlasmaStore::disconnect_client(int client_fd) {
ARROW_CHECK(client_fd > 0);
auto it = connected_clients_.find(client_fd);
ARROW_CHECK(it != connected_clients_.end());
loop_->RemoveFileEvent(client_fd);
// Close the socket.
close(client->fd);
ARROW_LOG(INFO) << "Disconnecting client on fd " << client->fd;
close(client_fd);
ARROW_LOG(INFO) << "Disconnecting client on fd " << client_fd;
// If this client was using any objects, remove it from the appropriate
// lists.
for (const auto& entry : store_info_.objects) {
remove_client_from_object_clients(entry.second.get(), client);
remove_client_from_object_clients(entry.second.get(), it->second.get());
}

// Note, the store may still attempt to send a message to the disconnected
// client (for example, when an object ID that the client was waiting for
// is ready). In these cases, the attempt to send the message will fail, but
// the store should just ignore the failure.
delete client;
connected_clients_.erase(it);
}

/// Send notifications about sealed objects to the subscribers. This is called
Expand Down Expand Up @@ -478,7 +482,7 @@ void PlasmaStore::send_notifications(int client_fd) {
// at the end of the method.
// TODO(pcm): Introduce status codes and check in case the file descriptor
// is added twice.
loop_->add_file_event(client_fd, kEventLoopWrite, [this, client_fd](int events) {
loop_->AddFileEvent(client_fd, kEventLoopWrite, [this, client_fd](int events) {
send_notifications(client_fd);
});
break;
Expand Down Expand Up @@ -507,7 +511,7 @@ void PlasmaStore::send_notifications(int client_fd) {

// If we have sent all notifications, remove the fd from the event loop.
if (it->second.object_notifications.empty()) {
loop_->remove_file_event(client_fd);
loop_->RemoveFileEvent(client_fd);
}
}

Expand Down Expand Up @@ -616,7 +620,7 @@ Status PlasmaStore::process_message(Client* client) {
} break;
case DISCONNECT_CLIENT:
ARROW_LOG(DEBUG) << "Disconnecting client on fd " << client->fd;
disconnect_client(client);
disconnect_client(client->fd);
break;
default:
// This code should be unreachable.
Expand All @@ -625,9 +629,43 @@ Status PlasmaStore::process_message(Client* client) {
return Status::OK();
}

// Report "success" to valgrind.
void signal_handler(int signal) {
class PlasmaStoreRunner {
public:
PlasmaStoreRunner() {}

void Start(char* socket_name, int64_t system_memory) {
// Create the event loop.
loop_.reset(new EventLoop);
store_.reset(new PlasmaStore(loop_.get(), system_memory));
int socket = bind_ipc_sock(socket_name, true);
// TODO(pcm): Check return value.
ARROW_CHECK(socket >= 0);

loop_->AddFileEvent(socket, kEventLoopRead, [this, socket](int events) {
this->store_->connect_client(socket);
});
loop_->Start();
}

void Shutdown() {
loop_->Stop();
loop_ = nullptr;
store_ = nullptr;
}

private:
std::unique_ptr<EventLoop> loop_;
std::unique_ptr<PlasmaStore> store_;
};

static PlasmaStoreRunner* g_runner = nullptr;

void HandleSignal(int signal) {
if (signal == SIGTERM) {
if (g_runner != nullptr) {
g_runner->Shutdown();
}
// Report "success" to valgrind.
exit(0);
}
}
Expand All @@ -636,21 +674,16 @@ void start_server(char* socket_name, int64_t system_memory) {
// Ignore SIGPIPE signals. If we don't do this, then when we attempt to write
// to a client that has already died, the store could die.
signal(SIGPIPE, SIG_IGN);
// Create the event loop.
EventLoop loop;
PlasmaStore store(&loop, system_memory);
int socket = bind_ipc_sock(socket_name, true);
ARROW_CHECK(socket >= 0);
// TODO(pcm): Check return value.
loop.add_file_event(socket, kEventLoopRead,
[&store, socket](int events) { store.connect_client(socket); });
loop.run();

PlasmaStoreRunner runner;
g_runner = &runner;
signal(SIGTERM, HandleSignal);
runner.Start(socket_name, system_memory);
}

} // namespace plasma

int main(int argc, char* argv[]) {
signal(SIGTERM, plasma::signal_handler);
char* socket_name = NULL;
int64_t system_memory = -1;
int c;
Expand Down
6 changes: 4 additions & 2 deletions cpp/src/plasma/store.h
Original file line number Diff line number Diff line change
Expand Up @@ -127,9 +127,9 @@ class PlasmaStore {

/// Disconnect a client from the PlasmaStore.
///
/// @param client The client that is disconnected.
/// @param client_fd The client file descriptor that is disconnected.
/// @return Void.
void disconnect_client(Client* client);
void disconnect_client(int client_fd);

void send_notifications(int client_fd);

Expand Down Expand Up @@ -166,6 +166,8 @@ class PlasmaStore {
/// TODO(pcm): Consider putting this into the Client data structure and
/// reorganize the code slightly.
std::unordered_map<int, NotificationQueue> pending_notifications_;

std::unordered_map<int, std::unique_ptr<Client>> connected_clients_;
};

} // namespace plasma
Expand Down