diff --git a/src/iperf_api.c b/src/iperf_api.c index df2f7d5d5..561a2cab6 100644 --- a/src/iperf_api.c +++ b/src/iperf_api.c @@ -1861,10 +1861,8 @@ iperf_check_throttle(struct iperf_stream *sp, struct iperf_time *nowP) bits_per_second = sp->result->bytes_sent * 8 / seconds; if (bits_per_second < sp->test->settings->rate) { sp->green_light = 1; - FD_SET(sp->socket, &sp->test->write_set); } else { sp->green_light = 0; - FD_CLR(sp->socket, &sp->test->write_set); } } @@ -1909,10 +1907,10 @@ iperf_check_total_rate(struct iperf_test *test, iperf_size_t last_interval_bytes } int -iperf_send(struct iperf_test *test, fd_set *write_setP) +iperf_send_mt(struct iperf_stream *sp) { register int multisend, r, streams_active; - register struct iperf_stream *sp; + register struct iperf_test *test = sp->test; struct iperf_time now; int no_throttle_check; @@ -1931,13 +1929,14 @@ iperf_send(struct iperf_test *test, fd_set *write_setP) if (no_throttle_check) iperf_time_now(&now); streams_active = 0; - SLIST_FOREACH(sp, &test->streams, streams) { - if ((sp->green_light && sp->sender && - (write_setP == NULL || FD_ISSET(sp->socket, write_setP)))) { - if (multisend > 1 && test->settings->bytes != 0 && test->bytes_sent >= test->settings->bytes) - break; - if (multisend > 1 && test->settings->blocks != 0 && test->blocks_sent >= test->settings->blocks) - break; + { + if (sp->green_light && sp->sender) { + // XXX If we hit one of these ending conditions maybe + // want to stop even trying to send something? + if (multisend > 1 && test->settings->bytes != 0 && test->bytes_sent >= test->settings->bytes) + break; + if (multisend > 1 && test->settings->blocks != 0 && test->blocks_sent >= test->settings->blocks) + break; if ((r = sp->snd(sp)) < 0) { if (r == NET_SOFTERROR) break; @@ -1957,35 +1956,24 @@ iperf_send(struct iperf_test *test, fd_set *write_setP) } if (!no_throttle_check) { /* Throttle check if was not checked for each send */ iperf_time_now(&now); - SLIST_FOREACH(sp, &test->streams, streams) - if (sp->sender) - iperf_check_throttle(sp, &now); + if (sp->sender) + iperf_check_throttle(sp, &now); } - if (write_setP != NULL) - SLIST_FOREACH(sp, &test->streams, streams) - if (FD_ISSET(sp->socket, write_setP)) - FD_CLR(sp->socket, write_setP); - return 0; } int -iperf_recv(struct iperf_test *test, fd_set *read_setP) +iperf_recv_mt(struct iperf_stream *sp) { int r; - struct iperf_stream *sp; + struct iperf_test *test = sp->test; - SLIST_FOREACH(sp, &test->streams, streams) { - if (FD_ISSET(sp->socket, read_setP) && !sp->sender) { if ((r = sp->rcv(sp)) < 0) { i_errno = IESTREAMREAD; return r; } test->bytes_received += r; ++test->blocks_received; - FD_CLR(sp->socket, read_setP); - } - } return 0; } diff --git a/src/iperf_api.h b/src/iperf_api.h index a6756e8de..ed991e12b 100644 --- a/src/iperf_api.h +++ b/src/iperf_api.h @@ -310,8 +310,8 @@ void build_tcpinfo_message(struct iperf_interval_results *r, char *message); int iperf_set_send_state(struct iperf_test *test, signed char state); void iperf_check_throttle(struct iperf_stream *sp, struct iperf_time *nowP); -int iperf_send(struct iperf_test *, fd_set *) /* __attribute__((hot)) */; -int iperf_recv(struct iperf_test *, fd_set *); +int iperf_send_mt(struct iperf_stream *) /* __attribute__((hot)) */; +int iperf_recv_mt(struct iperf_stream *); void iperf_catch_sigend(void (*handler)(int)); void iperf_got_sigend(struct iperf_test *test) __attribute__ ((noreturn)); void usage(void); diff --git a/src/iperf_client_api.c b/src/iperf_client_api.c index 6e6afb698..b10e7e4e7 100644 --- a/src/iperf_client_api.c +++ b/src/iperf_client_api.c @@ -52,17 +52,28 @@ #endif /* HAVE_TCP_CONGESTION */ void * -iperf_client_worker_start(void *s) { +iperf_client_worker_run(void *s) { struct iperf_stream *sp = (struct iperf_stream *) s; struct iperf_test *test = sp->test; while (! (test->done)) { - if (test->debug_level >= DEBUG_LEVEL_INFO) { - iperf_printf(test, "Thread FD %d\n", sp->socket); + if (sp->sender) { + if (iperf_send_mt(sp) < 0) { + goto cleanup_and_fail; + } + } + else { + if (iperf_recv_mt(sp) < 0) { + goto cleanup_and_fail; + } } - sleep(1); } return NULL; + + cleanup_and_fail: + /* XXX */ + test->done = 0; + return NULL; } int @@ -129,12 +140,6 @@ iperf_create_streams(struct iperf_test *test, int sender) } #endif /* HAVE_TCP_CONGESTION */ - if (sender) - FD_SET(s, &test->write_set); - else - FD_SET(s, &test->read_set); - if (s > test->max_fd) test->max_fd = s; - sp = iperf_new_stream(test, s, sender); if (!sp) return -1; @@ -643,7 +648,7 @@ iperf_run_client(struct iperf_test * test) } SLIST_FOREACH(sp, &test->streams, streams) { - if (pthread_create(&(sp->thr), &attr, &iperf_client_worker_start, sp) != 0) { + if (pthread_create(&(sp->thr), &attr, &iperf_client_worker_run, sp) != 0) { i_errno = IEPTHREADCREATE; goto cleanup_and_fail; } @@ -667,24 +672,6 @@ iperf_run_client(struct iperf_test * test) } } - - if (test->mode == BIDIRECTIONAL) - { - if (iperf_send(test, &write_set) < 0) - goto cleanup_and_fail; - if (iperf_recv(test, &read_set) < 0) - goto cleanup_and_fail; - } else if (test->mode == SENDER) { - // Regular mode. Client sends. - if (iperf_send(test, &write_set) < 0) - goto cleanup_and_fail; - } else { - // Reverse mode. Client receives. - if (iperf_recv(test, &read_set) < 0) - goto cleanup_and_fail; - } - - /* Run the timers. */ iperf_time_now(&now); tmr_run(&now); @@ -736,15 +723,6 @@ iperf_run_client(struct iperf_test * test) goto cleanup_and_fail; } } - // If we're in reverse mode, continue draining the data - // connection(s) even if test is over. This prevents a - // deadlock where the server side fills up its pipe(s) - // and gets blocked, so it can't receive state changes - // from the client side. - else if (test->mode == RECEIVER && test->state == TEST_END) { - if (iperf_recv(test, &read_set) < 0) - goto cleanup_and_fail; - } } /* Cancel receiver threads */ diff --git a/src/iperf_server_api.c b/src/iperf_server_api.c index 33f4c0237..2be72cae7 100644 --- a/src/iperf_server_api.c +++ b/src/iperf_server_api.c @@ -67,17 +67,28 @@ #endif /* HAVE_TCP_CONGESTION */ void * -iperf_server_worker_start(void *s) { +iperf_server_worker_run(void *s) { struct iperf_stream *sp = (struct iperf_stream *) s; struct iperf_test *test = sp->test; while (! (test->done)) { - if (test->debug_level >= DEBUG_LEVEL_INFO) { - iperf_printf(test, "Thread FD %d\n", sp->socket); + if (sp->sender) { + if (iperf_send_mt(sp) < 0) { + goto cleanup_and_fail; + } + } + else { + if (iperf_recv_mt(sp) < 0) { + goto cleanup_and_fail; + } } - sleep(1); } return NULL; + + cleanup_and_fail: + /* XXX */ + test->done = 0; + return NULL; } int @@ -746,11 +757,6 @@ iperf_run_server(struct iperf_test *test) return -1; } - if (sp->sender) - FD_SET(s, &test->write_set); - else - FD_SET(s, &test->read_set); - if (s > test->max_fd) test->max_fd = s; /* @@ -844,7 +850,7 @@ iperf_run_server(struct iperf_test *test) }; SLIST_FOREACH(sp, &test->streams, streams) { - if (pthread_create(&(sp->thr), &attr, &iperf_server_worker_start, sp) != 0) { + if (pthread_create(&(sp->thr), &attr, &iperf_server_worker_run, sp) != 0) { i_errno = IEPTHREADCREATE; cleanup_server(test); return -1; @@ -862,31 +868,6 @@ iperf_run_server(struct iperf_test *test) }; } } - - if (test->state == TEST_RUNNING) { - if (test->mode == BIDIRECTIONAL) { - if (iperf_recv(test, &read_set) < 0) { - cleanup_server(test); - return -1; - } - if (iperf_send(test, &write_set) < 0) { - cleanup_server(test); - return -1; - } - } else if (test->mode == SENDER) { - // Reverse mode. Server sends. - if (iperf_send(test, &write_set) < 0) { - cleanup_server(test); - return -1; - } - } else { - // Regular mode. Server receives. - if (iperf_recv(test, &read_set) < 0) { - cleanup_server(test); - return -1; - } - } - } } if (result == 0 ||