Skip to content

Commit

Permalink
fix FD_SET crash when fd exceeds 1024
Browse files Browse the repository at this point in the history
  • Loading branch information
six-ddc committed Nov 11, 2017
1 parent f9493ea commit 6b0dbc5
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 45 deletions.
100 changes: 55 additions & 45 deletions executor.c
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#include <sys/wait.h>
#include <poll.h>
#include "common.h"
#include "sstring.h"
#include "executor.h"
Expand Down Expand Up @@ -75,46 +76,45 @@ fork_command(struct slot *pslot, void (*fn)(struct slot *, int, char **), int ar
}

static void
fdset_alive_slots(struct slot *pslot, fd_set *readfds) {
poll_alive_slots(struct slot *pslot, struct pollfd *pfd, size_t *cnt) {
*cnt = 0;
while (pslot) {
if (!pslot->alive) {
pslot->poll_index = -1;
pslot = pslot->next;
continue;
}
FD_SET(pslot->io.out[PIPE_READ_END], readfds);
FD_SET(pslot->io.err[PIPE_READ_END], readfds);
pslot->poll_index = (int) *cnt;

pfd[*cnt].fd = pslot->io.out[PIPE_READ_END];
pfd[*cnt].events = POLLIN;
*cnt = *cnt + 1;

pfd[*cnt].fd = pslot->io.err[PIPE_READ_END];
pfd[*cnt].events = POLLIN;
*cnt = *cnt + 1;

pslot = pslot->next;
}
}

static void
read_alive_slots(struct slot *pslot, fd_set *readfds, FILE *output) {
read_alive_slots(struct slot *pslot, struct pollfd *pfd, FILE *output) {
while (pslot) {
if (!pslot->alive) {
if (!pslot->alive || pslot->poll_index < 0) {
pslot = pslot->next;
continue;
}
if (FD_ISSET(pslot->io.out[PIPE_READ_END], readfds)) {
if (pfd[pslot->poll_index].revents & POLLIN) {
slot_read_line(pslot, STDOUT_FILENO, print_line, output);
}
if (FD_ISSET(pslot->io.err[PIPE_READ_END], readfds)) {
if (pfd[pslot->poll_index + 1].revents & POLLIN) {
slot_read_line(pslot, STDERR_FILENO, print_line, output);
}
pslot = pslot->next;
}
}

static void
read_dead_slots(struct slot *pslot, struct slot *pslot_end, FILE *output) {
while (pslot && pslot != pslot_end) {
if (!pslot->alive) {
slot_read_remains(pslot, STDOUT_FILENO, print_line, output);
slot_read_remains(pslot, STDERR_FILENO, print_line, output);
}
pslot = pslot->next;
}
}

int
exec_local_cmd(char *cmd) {
return system(cmd);
Expand Down Expand Up @@ -177,9 +177,12 @@ reap_child_handler(int sig) {
else
exit_code = 255;
pslot = slot_find_by_pid(slots, pid);
if (pslot) {
slot_close(pslot, exit_code);
if (!pslot) {
continue;
}
slot_read_remains(pslot, STDOUT_FILENO, print_line, pslot->output);
slot_read_remains(pslot, STDERR_FILENO, print_line, pslot->output);
slot_close(pslot, exit_code);
if (pconfig->verbose) {
printf("wait pid %d, status code %d\n", pslot->pid, pslot->exit_code);
}
Expand Down Expand Up @@ -314,19 +317,17 @@ exec_scp_cmd(struct slot *pslot, int argc, char **argv) {

static int
exec_command_foreach(struct slot *pslot_list, void (*fn_fork)(struct slot *, int, char **), int argc, char **argv) {
fd_set readfds;
struct slot *pslot = pslot_list->next;
struct slot *pslot_head = pslot;
struct timeval no_timeout;
struct timeval *timeout;
struct pollfd *pfd;
struct slot *pslot_head = pslot_list->next;
struct slot *pslot = pslot_head;
int timeout;
FILE *output;
size_t cnt = 0;

if (!pslot) {
if(!pslot) {
return 0;
}

memset(&no_timeout, 0, sizeof(struct timeval));

if (pconfig->output_file) {
output = fopen(pconfig->output_file, "a");
if (!output) {
Expand All @@ -337,6 +338,15 @@ exec_command_foreach(struct slot *pslot_list, void (*fn_fork)(struct slot *, int
output = stdout;
}

while (pslot) {
// poll stdout & stderr
cnt += 2;
pslot->output = output;
pslot = pslot->next;
}
pfd = calloc(cnt, sizeof(struct pollfd));
pslot = pslot_head;

alive_children = 0;

while (pslot || alive_children) {
Expand All @@ -347,25 +357,21 @@ exec_command_foreach(struct slot *pslot_list, void (*fn_fork)(struct slot *, int
usleep(10 * 1000);
}

read_dead_slots(pslot_head, pslot, output);

FD_ZERO(&readfds);
fdset_alive_slots(pslot_head, &readfds);
poll_alive_slots(pslot_head, pfd, &cnt);

timeout = pslot ? &no_timeout : NULL;
timeout = pslot ? 0 : -1;

if (select(MAXFD, &readfds, NULL, NULL, timeout) > 0) {
read_alive_slots(pslot_head, &readfds, output);
if (poll(pfd, (nfds_t) cnt, timeout) > 0) {
read_alive_slots(pslot_head, pfd, output);
}
UNBLOCK_SIGCHLD;
}

read_dead_slots(pslot_head, pslot, output);

if (output != stdout) {
fclose(output);
}

free(pfd);
return 0;
}

Expand All @@ -392,30 +398,34 @@ download_file(struct slot *pslot_list, char *remote_filename, char *local_filena

int
sync_exec_remote_cmd(struct slot *pslot_list, char *cmd, sstring *out, sstring *err) {

fd_set readfds;
struct pollfd pfd[2];
char *argv[1] = {cmd};
struct slot *pslot = pslot_list->next;

if (!pslot) {
return 0;
}

FD_ZERO(&readfds);
memset(&pfd[0], 0, sizeof(struct pollfd));
memset(&pfd[1], 0, sizeof(struct pollfd));

alive_children = 0;

BLOCK_SIGCHLD;
fork_command(pslot, exec_ssh_cmd, 1, argv);
FD_SET(pslot->io.out[PIPE_READ_END], &readfds);
FD_SET(pslot->io.err[PIPE_READ_END], &readfds);
pfd[0].fd = pslot->io.out[PIPE_READ_END];
pfd[0].events = POLLIN;
pfd[1].fd = pslot->io.err[PIPE_READ_END];
pfd[1].events = POLLIN;
UNBLOCK_SIGCHLD;

while (alive_children) {
BLOCK_SIGCHLD;
if (select(MAXFD, &readfds, NULL, NULL, NULL) > 0) {
if (FD_ISSET(pslot->io.out[PIPE_READ_END], &readfds)) {
if (poll(pfd, 2, 0) > 0) {
if (pfd[0].revents & POLLIN) {
slot_read_line(pslot, STDOUT_FILENO, save_string, out);
}
if (FD_ISSET(pslot->io.err[PIPE_READ_END], &readfds)) {
if (pfd[1].revents & POLLIN) {
slot_read_line(pslot, STDERR_FILENO, save_string, err);
}
}
Expand Down
2 changes: 2 additions & 0 deletions slot.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ slot {
int pid;
int exit_code;
bool alive;
int poll_index;
FILE* output;

sstring out_buff;
sstring err_buff;
Expand Down

0 comments on commit 6b0dbc5

Please sign in to comment.