Skip to content

Commit

Permalink
Implement nonblocking IO in IPC server
Browse files Browse the repository at this point in the history
Added client write buffer and handler for writable status on client
socket.
  • Loading branch information
ilyaluk committed Aug 8, 2017
1 parent e12d1cf commit 1c5a96c
Showing 1 changed file with 96 additions and 5 deletions.
101 changes: 96 additions & 5 deletions sway/ipc-server.c
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,15 @@ static const char ipc_magic[] = {'i', '3', '-', 'i', 'p', 'c'};

struct ipc_client {
struct wlc_event_source *event_source;
struct wlc_event_source *writable_event_source;
int fd;
uint32_t payload_length;
uint32_t security_policy;
enum ipc_command_type current_command;
enum ipc_command_type subscribed_events;
size_t write_buffer_len;
size_t write_buffer_size;
char *write_buffer;
};

static list_t *ipc_get_pixel_requests = NULL;
Expand All @@ -58,6 +62,7 @@ struct get_pixels_request {
struct sockaddr_un *ipc_user_sockaddr(void);
int ipc_handle_connection(int fd, uint32_t mask, void *data);
int ipc_client_handle_readable(int client_fd, uint32_t mask, void *data);
int ipc_client_handle_writable(int client_fd, uint32_t mask, void *data);
void ipc_client_disconnect(struct ipc_client *client);
void ipc_client_handle_command(struct ipc_client *client);
bool ipc_send_reply(struct ipc_client *client, const char *payload, uint32_t payload_length);
Expand Down Expand Up @@ -168,6 +173,12 @@ int ipc_handle_connection(int fd, uint32_t mask, void *data) {
close(client_fd);
return 0;
}
if ((flags = fcntl(client_fd, F_GETFL)) == -1
|| fcntl(client_fd, F_SETFL, flags|O_NONBLOCK) == -1) {
sway_log_errno(L_ERROR, "Unable to set NONBLOCK on IPC client socket");
close(client_fd);
return 0;
}

struct ipc_client* client = malloc(sizeof(struct ipc_client));
if (!client) {
Expand All @@ -179,10 +190,22 @@ int ipc_handle_connection(int fd, uint32_t mask, void *data) {
client->fd = client_fd;
client->subscribed_events = 0;
client->event_source = wlc_event_loop_add_fd(client_fd, WLC_EVENT_READABLE, ipc_client_handle_readable, client);
client->writable_event_source = NULL;

client->write_buffer_size = 128;
client->write_buffer_len = 0;
client->write_buffer = malloc(client->write_buffer_size);
if (!client->write_buffer) {
sway_log(L_ERROR, "Unable to allocate ipc client write buffer");
close(client_fd);
return 0;
}

pid_t pid = get_client_pid(client->fd);
client->security_policy = get_ipc_policy_mask(pid);

sway_log(L_DEBUG, "New client: fd %d, pid %d", client_fd, pid);

list_add(ipc_client_list, client);

return 0;
Expand All @@ -205,6 +228,8 @@ int ipc_client_handle_readable(int client_fd, uint32_t mask, void *data) {
return 0;
}

sway_log(L_DEBUG, "Client %d readable", client->fd);

int read_available;
if (ioctl(client_fd, FIONREAD, &read_available) == -1) {
sway_log_errno(L_INFO, "Unable to read IPC socket buffer size");
Expand All @@ -226,6 +251,7 @@ int ipc_client_handle_readable(int client_fd, uint32_t mask, void *data) {

uint8_t buf[ipc_header_size];
uint32_t *buf32 = (uint32_t*)(buf + sizeof(ipc_magic));
// Should be fully available, because read_available >= ipc_header_size
ssize_t received = recv(client_fd, buf, ipc_header_size, 0);
if (received == -1) {
sway_log_errno(L_INFO, "Unable to receive header from IPC client");
Expand All @@ -249,6 +275,48 @@ int ipc_client_handle_readable(int client_fd, uint32_t mask, void *data) {
return 0;
}

int ipc_client_handle_writable(int client_fd, uint32_t mask, void *data) {
struct ipc_client *client = data;

if (mask & WLC_EVENT_ERROR) {
sway_log(L_ERROR, "IPC Client socket error, removing client");
ipc_client_disconnect(client);
return 0;
}

if (mask & WLC_EVENT_HANGUP) {
sway_log(L_DEBUG, "Client %d hung up", client->fd);
ipc_client_disconnect(client);
return 0;
}

if (client->write_buffer_len <= 0) {
return 0;
}

sway_log(L_DEBUG, "Client %d writable", client->fd);

ssize_t written = write(client->fd, client->write_buffer, client->write_buffer_len);

if (written == -1 && errno == EAGAIN) {
return 0;
} else if (written == -1) {
sway_log_errno(L_INFO, "Unable to send data from queue to IPC client");
ipc_client_disconnect(client);
return 0;
}

memmove(client->write_buffer, client->write_buffer + written, client->write_buffer_len - written);
client->write_buffer_len -= written;

if (client->write_buffer_len == 0 && client->writable_event_source) {
wlc_event_source_remove(client->writable_event_source);
client->writable_event_source = NULL;
}

return 0;
}

void ipc_client_disconnect(struct ipc_client *client) {
if (!sway_assert(client != NULL, "client != NULL")) {
return;
Expand All @@ -260,9 +328,13 @@ void ipc_client_disconnect(struct ipc_client *client) {

sway_log(L_INFO, "IPC Client %d disconnected", client->fd);
wlc_event_source_remove(client->event_source);
if (client->writable_event_source) {
wlc_event_source_remove(client->writable_event_source);
}
int i = 0;
while (i < ipc_client_list->length && ipc_client_list->items[i] != client) i++;
list_del(ipc_client_list, i);
free(client->write_buffer);
close(client->fd);
free(client);
}
Expand Down Expand Up @@ -334,6 +406,7 @@ void ipc_client_handle_command(struct ipc_client *client) {
return;
}
if (client->payload_length > 0) {
// Payload should be fully available
ssize_t received = recv(client->fd, buf, client->payload_length, 0);
if (received == -1)
{
Expand Down Expand Up @@ -590,17 +663,35 @@ bool ipc_send_reply(struct ipc_client *client, const char *payload, uint32_t pay
data32[0] = payload_length;
data32[1] = client->current_command;

if (write(client->fd, data, ipc_header_size) == -1) {
sway_log_errno(L_INFO, "Unable to send header to IPC client");
while (client->write_buffer_len + ipc_header_size + payload_length >=
client->write_buffer_size) {
client->write_buffer_size *= 2;
}

if (client->write_buffer_size > (1 << 22)) { // 4 MB
sway_log(L_ERROR, "Client write buffer too big, disconnecting client");
ipc_client_disconnect(client);
return false;
}

if (write(client->fd, payload, payload_length) == -1) {
sway_log_errno(L_INFO, "Unable to send payload to IPC client");
char *new_buffer = realloc(client->write_buffer, client->write_buffer_size);
if (!new_buffer) {
sway_log(L_ERROR, "Unable to reallocate ipc client write buffer");
ipc_client_disconnect(client);
return false;
}
client->write_buffer = new_buffer;

memcpy(client->write_buffer + client->write_buffer_len, data, ipc_header_size);
client->write_buffer_len += ipc_header_size;
memcpy(client->write_buffer + client->write_buffer_len, payload, payload_length);
client->write_buffer_len += payload_length;

if (!client->writable_event_source) {
client->writable_event_source = wlc_event_loop_add_fd(client->fd, WLC_EVENT_WRITABLE, ipc_client_handle_writable, client);
}

sway_log(L_DEBUG, "Send IPC reply: %s", payload);
sway_log(L_DEBUG, "Added IPC reply to client %d queue: %s", client->fd, payload);

return true;
}
Expand Down

0 comments on commit 1c5a96c

Please sign in to comment.