Skip to content

Commit

Permalink
Initial version of multi-threaded writers.
Browse files Browse the repository at this point in the history
* Create iperf_send_mt and iperf_recv_mt, the multi-threaded versions
  of network I/O functions. These handle a single connection only and
  do not attempt to coordinate timing (for flow control) with any
  other threads.

* Make client and server thread functions call the new multi-threaded
  network I/O functions.

* Remove all network I/O for the test streams from the main thread.

* fd_set objects in test object only apply to sockets used by the main
  thread (not the test streams in the worker threads).

Outstanding issues:

* No locking of shared data structures at this point. Correctness may
  be compromised at this point.

* Worker threads on the sender side will tend to busy-wait because
  they do not attempt to sleep while attempting to pace themeselves.

* No support (for now) for ending conditions other than time-based
  (packet-based and byte-based don't work).
  • Loading branch information
bmah888 committed Nov 8, 2023
1 parent fc86f85 commit 0755cc4
Show file tree
Hide file tree
Showing 4 changed files with 48 additions and 101 deletions.
40 changes: 14 additions & 26 deletions src/iperf_api.c
Original file line number Diff line number Diff line change
Expand Up @@ -1861,10 +1861,8 @@ iperf_check_throttle(struct iperf_stream *sp, struct iperf_time *nowP)
bits_per_second = sp->result->bytes_sent * 8 / seconds;
if (bits_per_second < sp->test->settings->rate) {
sp->green_light = 1;
FD_SET(sp->socket, &sp->test->write_set);
} else {
sp->green_light = 0;
FD_CLR(sp->socket, &sp->test->write_set);
}
}

Expand Down Expand Up @@ -1909,10 +1907,10 @@ iperf_check_total_rate(struct iperf_test *test, iperf_size_t last_interval_bytes
}

int
iperf_send(struct iperf_test *test, fd_set *write_setP)
iperf_send_mt(struct iperf_stream *sp)
{
register int multisend, r, streams_active;
register struct iperf_stream *sp;
register struct iperf_test *test = sp->test;
struct iperf_time now;
int no_throttle_check;

Expand All @@ -1931,13 +1929,14 @@ iperf_send(struct iperf_test *test, fd_set *write_setP)
if (no_throttle_check)
iperf_time_now(&now);
streams_active = 0;
SLIST_FOREACH(sp, &test->streams, streams) {
if ((sp->green_light && sp->sender &&
(write_setP == NULL || FD_ISSET(sp->socket, write_setP)))) {
if (multisend > 1 && test->settings->bytes != 0 && test->bytes_sent >= test->settings->bytes)
break;
if (multisend > 1 && test->settings->blocks != 0 && test->blocks_sent >= test->settings->blocks)
break;
{
if (sp->green_light && sp->sender) {
// XXX If we hit one of these ending conditions maybe
// want to stop even trying to send something?
if (multisend > 1 && test->settings->bytes != 0 && test->bytes_sent >= test->settings->bytes)
break;
if (multisend > 1 && test->settings->blocks != 0 && test->blocks_sent >= test->settings->blocks)
break;
if ((r = sp->snd(sp)) < 0) {
if (r == NET_SOFTERROR)
break;
Expand All @@ -1957,35 +1956,24 @@ iperf_send(struct iperf_test *test, fd_set *write_setP)
}
if (!no_throttle_check) { /* Throttle check if was not checked for each send */
iperf_time_now(&now);
SLIST_FOREACH(sp, &test->streams, streams)
if (sp->sender)
iperf_check_throttle(sp, &now);
if (sp->sender)
iperf_check_throttle(sp, &now);
}
if (write_setP != NULL)
SLIST_FOREACH(sp, &test->streams, streams)
if (FD_ISSET(sp->socket, write_setP))
FD_CLR(sp->socket, write_setP);

return 0;
}

int
iperf_recv(struct iperf_test *test, fd_set *read_setP)
iperf_recv_mt(struct iperf_stream *sp)
{
int r;
struct iperf_stream *sp;
struct iperf_test *test = sp->test;

SLIST_FOREACH(sp, &test->streams, streams) {
if (FD_ISSET(sp->socket, read_setP) && !sp->sender) {
if ((r = sp->rcv(sp)) < 0) {
i_errno = IESTREAMREAD;
return r;
}
test->bytes_received += r;
++test->blocks_received;
FD_CLR(sp->socket, read_setP);
}
}

return 0;
}
Expand Down
4 changes: 2 additions & 2 deletions src/iperf_api.h
Original file line number Diff line number Diff line change
Expand Up @@ -310,8 +310,8 @@ void build_tcpinfo_message(struct iperf_interval_results *r, char *message);

int iperf_set_send_state(struct iperf_test *test, signed char state);
void iperf_check_throttle(struct iperf_stream *sp, struct iperf_time *nowP);
int iperf_send(struct iperf_test *, fd_set *) /* __attribute__((hot)) */;
int iperf_recv(struct iperf_test *, fd_set *);
int iperf_send_mt(struct iperf_stream *) /* __attribute__((hot)) */;
int iperf_recv_mt(struct iperf_stream *);
void iperf_catch_sigend(void (*handler)(int));
void iperf_got_sigend(struct iperf_test *test) __attribute__ ((noreturn));
void usage(void);
Expand Down
54 changes: 16 additions & 38 deletions src/iperf_client_api.c
Original file line number Diff line number Diff line change
Expand Up @@ -52,17 +52,28 @@
#endif /* HAVE_TCP_CONGESTION */

void *
iperf_client_worker_start(void *s) {
iperf_client_worker_run(void *s) {
struct iperf_stream *sp = (struct iperf_stream *) s;
struct iperf_test *test = sp->test;

while (! (test->done)) {
if (test->debug_level >= DEBUG_LEVEL_INFO) {
iperf_printf(test, "Thread FD %d\n", sp->socket);
if (sp->sender) {
if (iperf_send_mt(sp) < 0) {
goto cleanup_and_fail;
}
}
else {
if (iperf_recv_mt(sp) < 0) {
goto cleanup_and_fail;
}
}
sleep(1);
}
return NULL;

cleanup_and_fail:
/* XXX */
test->done = 0;
return NULL;
}

int
Expand Down Expand Up @@ -129,12 +140,6 @@ iperf_create_streams(struct iperf_test *test, int sender)
}
#endif /* HAVE_TCP_CONGESTION */

if (sender)
FD_SET(s, &test->write_set);
else
FD_SET(s, &test->read_set);
if (s > test->max_fd) test->max_fd = s;

sp = iperf_new_stream(test, s, sender);
if (!sp)
return -1;
Expand Down Expand Up @@ -643,7 +648,7 @@ iperf_run_client(struct iperf_test * test)
}

SLIST_FOREACH(sp, &test->streams, streams) {
if (pthread_create(&(sp->thr), &attr, &iperf_client_worker_start, sp) != 0) {
if (pthread_create(&(sp->thr), &attr, &iperf_client_worker_run, sp) != 0) {
i_errno = IEPTHREADCREATE;
goto cleanup_and_fail;
}
Expand All @@ -667,24 +672,6 @@ iperf_run_client(struct iperf_test * test)
}
}


if (test->mode == BIDIRECTIONAL)
{
if (iperf_send(test, &write_set) < 0)
goto cleanup_and_fail;
if (iperf_recv(test, &read_set) < 0)
goto cleanup_and_fail;
} else if (test->mode == SENDER) {
// Regular mode. Client sends.
if (iperf_send(test, &write_set) < 0)
goto cleanup_and_fail;
} else {
// Reverse mode. Client receives.
if (iperf_recv(test, &read_set) < 0)
goto cleanup_and_fail;
}


/* Run the timers. */
iperf_time_now(&now);
tmr_run(&now);
Expand Down Expand Up @@ -736,15 +723,6 @@ iperf_run_client(struct iperf_test * test)
goto cleanup_and_fail;
}
}
// If we're in reverse mode, continue draining the data
// connection(s) even if test is over. This prevents a
// deadlock where the server side fills up its pipe(s)
// and gets blocked, so it can't receive state changes
// from the client side.
else if (test->mode == RECEIVER && test->state == TEST_END) {
if (iperf_recv(test, &read_set) < 0)
goto cleanup_and_fail;
}
}

/* Cancel receiver threads */
Expand Down
51 changes: 16 additions & 35 deletions src/iperf_server_api.c
Original file line number Diff line number Diff line change
Expand Up @@ -67,17 +67,28 @@
#endif /* HAVE_TCP_CONGESTION */

void *
iperf_server_worker_start(void *s) {
iperf_server_worker_run(void *s) {
struct iperf_stream *sp = (struct iperf_stream *) s;
struct iperf_test *test = sp->test;

while (! (test->done)) {
if (test->debug_level >= DEBUG_LEVEL_INFO) {
iperf_printf(test, "Thread FD %d\n", sp->socket);
if (sp->sender) {
if (iperf_send_mt(sp) < 0) {
goto cleanup_and_fail;
}
}
else {
if (iperf_recv_mt(sp) < 0) {
goto cleanup_and_fail;
}
}
sleep(1);
}
return NULL;

cleanup_and_fail:
/* XXX */
test->done = 0;
return NULL;
}

int
Expand Down Expand Up @@ -746,11 +757,6 @@ iperf_run_server(struct iperf_test *test)
return -1;
}

if (sp->sender)
FD_SET(s, &test->write_set);
else
FD_SET(s, &test->read_set);

if (s > test->max_fd) test->max_fd = s;

/*
Expand Down Expand Up @@ -844,7 +850,7 @@ iperf_run_server(struct iperf_test *test)
};

SLIST_FOREACH(sp, &test->streams, streams) {
if (pthread_create(&(sp->thr), &attr, &iperf_server_worker_start, sp) != 0) {
if (pthread_create(&(sp->thr), &attr, &iperf_server_worker_run, sp) != 0) {
i_errno = IEPTHREADCREATE;
cleanup_server(test);
return -1;
Expand All @@ -862,31 +868,6 @@ iperf_run_server(struct iperf_test *test)
};
}
}

if (test->state == TEST_RUNNING) {
if (test->mode == BIDIRECTIONAL) {
if (iperf_recv(test, &read_set) < 0) {
cleanup_server(test);
return -1;
}
if (iperf_send(test, &write_set) < 0) {
cleanup_server(test);
return -1;
}
} else if (test->mode == SENDER) {
// Reverse mode. Server sends.
if (iperf_send(test, &write_set) < 0) {
cleanup_server(test);
return -1;
}
} else {
// Regular mode. Server receives.
if (iperf_recv(test, &read_set) < 0) {
cleanup_server(test);
return -1;
}
}
}
}

if (result == 0 ||
Expand Down

0 comments on commit 0755cc4

Please sign in to comment.