Skip to content

Commit

Permalink
selftests: mptcp: mptfo Initiator/Listener
Browse files Browse the repository at this point in the history
This patch first adds TFO support in mptcp_connect.c.

This can be enabled via a new option: -o MPTFO.

Once enabled, the TCP_FASTOPEN socket option is enabled for the server
side and a sendto() with MSG_FASTOPEN is used instead of a connect() for
the client side.

Note that the first SYN has a limit of bytes it can carry. In other
words, it is allowed to send less data than the provided one. We then
need to track more status info to properly allow the next sendmsg()
starting from the next part of the data to send the rest.

Also in TFO scenarios, we need to completely spool the partially xmitted
buffer -- and account for that -- before starting sendfile/mmap xmit,
otherwise the relevant tests will fail.

Co-developed-by: Paolo Abeni <[email protected]>
Signed-off-by: Paolo Abeni <[email protected]>
Signed-off-by: Dmytro Shytyi <[email protected]>
Signed-off-by: Matthieu Baerts <[email protected]>
Signed-off-by: Jakub Kicinski <[email protected]>
  • Loading branch information
Dmytro Shytyi authored and kuba-moo committed Nov 30, 2022
1 parent cb99816 commit ca7ae89
Show file tree
Hide file tree
Showing 2 changed files with 150 additions and 42 deletions.
171 changes: 129 additions & 42 deletions tools/testing/selftests/net/mptcp/mptcp_connect.c
Original file line number Diff line number Diff line change
Expand Up @@ -83,13 +83,21 @@ struct cfg_cmsg_types {

struct cfg_sockopt_types {
unsigned int transparent:1;
unsigned int mptfo:1;
};

struct tcp_inq_state {
unsigned int last;
bool expect_eof;
};

struct wstate {
char buf[8192];
unsigned int len;
unsigned int off;
unsigned int total_len;
};

static struct tcp_inq_state tcp_inq;

static struct cfg_cmsg_types cfg_cmsg_types;
Expand Down Expand Up @@ -232,6 +240,14 @@ static void set_transparent(int fd, int pf)
}
}

static void set_mptfo(int fd, int pf)
{
int qlen = 25;

if (setsockopt(fd, IPPROTO_TCP, TCP_FASTOPEN, &qlen, sizeof(qlen)) == -1)
perror("TCP_FASTOPEN");
}

static int do_ulp_so(int sock, const char *name)
{
return setsockopt(sock, IPPROTO_TCP, TCP_ULP, name, strlen(name));
Expand Down Expand Up @@ -300,6 +316,9 @@ static int sock_listen_mptcp(const char * const listenaddr,
if (cfg_sockopt_types.transparent)
set_transparent(sock, pf);

if (cfg_sockopt_types.mptfo)
set_mptfo(sock, pf);

if (bind(sock, a->ai_addr, a->ai_addrlen) == 0)
break; /* success */

Expand Down Expand Up @@ -330,13 +349,15 @@ static int sock_listen_mptcp(const char * const listenaddr,

static int sock_connect_mptcp(const char * const remoteaddr,
const char * const port, int proto,
struct addrinfo **peer)
struct addrinfo **peer,
int infd, struct wstate *winfo)
{
struct addrinfo hints = {
.ai_protocol = IPPROTO_TCP,
.ai_socktype = SOCK_STREAM,
};
struct addrinfo *a, *addr;
int syn_copied = 0;
int sock = -1;

hints.ai_family = pf;
Expand All @@ -354,14 +375,34 @@ static int sock_connect_mptcp(const char * const remoteaddr,
if (cfg_mark)
set_mark(sock, cfg_mark);

if (connect(sock, a->ai_addr, a->ai_addrlen) == 0) {
*peer = a;
break; /* success */
if (cfg_sockopt_types.mptfo) {
if (!winfo->total_len)
winfo->total_len = winfo->len = read(infd, winfo->buf,
sizeof(winfo->buf));

syn_copied = sendto(sock, winfo->buf, winfo->len, MSG_FASTOPEN,
a->ai_addr, a->ai_addrlen);
if (syn_copied >= 0) {
winfo->off = syn_copied;
winfo->len -= syn_copied;
*peer = a;
break; /* success */
}
} else {
if (connect(sock, a->ai_addr, a->ai_addrlen) == 0) {
*peer = a;
break; /* success */
}
}
if (cfg_sockopt_types.mptfo) {
perror("sendto()");
close(sock);
sock = -1;
} else {
perror("connect()");
close(sock);
sock = -1;
}

perror("connect()");
close(sock);
sock = -1;
}

freeaddrinfo(addr);
Expand Down Expand Up @@ -571,14 +612,14 @@ static void shut_wr(int fd)
shutdown(fd, SHUT_WR);
}

static int copyfd_io_poll(int infd, int peerfd, int outfd, bool *in_closed_after_out)
static int copyfd_io_poll(int infd, int peerfd, int outfd,
bool *in_closed_after_out, struct wstate *winfo)
{
struct pollfd fds = {
.fd = peerfd,
.events = POLLIN | POLLOUT,
};
unsigned int woff = 0, wlen = 0, total_wlen = 0, total_rlen = 0;
char wbuf[8192];
unsigned int total_wlen = 0, total_rlen = 0;

set_nonblock(peerfd, true);

Expand Down Expand Up @@ -638,30 +679,30 @@ static int copyfd_io_poll(int infd, int peerfd, int outfd, bool *in_closed_after
}

if (fds.revents & POLLOUT) {
if (wlen == 0) {
woff = 0;
wlen = read(infd, wbuf, sizeof(wbuf));
if (winfo->len == 0) {
winfo->off = 0;
winfo->len = read(infd, winfo->buf, sizeof(winfo->buf));
}

if (wlen > 0) {
if (winfo->len > 0) {
ssize_t bw;

/* limit the total amount of written data to the trunc value */
if (cfg_truncate > 0 && wlen + total_wlen > cfg_truncate)
wlen = cfg_truncate - total_wlen;
if (cfg_truncate > 0 && winfo->len + total_wlen > cfg_truncate)
winfo->len = cfg_truncate - total_wlen;

bw = do_rnd_write(peerfd, wbuf + woff, wlen);
bw = do_rnd_write(peerfd, winfo->buf + winfo->off, winfo->len);
if (bw < 0) {
if (cfg_rcv_trunc)
return 0;
perror("write");
return 111;
}

woff += bw;
wlen -= bw;
winfo->off += bw;
winfo->len -= bw;
total_wlen += bw;
} else if (wlen == 0) {
} else if (winfo->len == 0) {
/* We have no more data to send. */
fds.events &= ~POLLOUT;

Expand Down Expand Up @@ -717,18 +758,38 @@ static int do_recvfile(int infd, int outfd)
return (int)r;
}

static int do_mmap(int infd, int outfd, unsigned int size)
static int spool_buf(int fd, struct wstate *winfo)
{
while (winfo->len) {
int ret = write(fd, winfo->buf + winfo->off, winfo->len);

if (ret < 0) {
perror("write");
return 4;
}
winfo->off += ret;
winfo->len -= ret;
}
return 0;
}

static int do_mmap(int infd, int outfd, unsigned int size,
struct wstate *winfo)
{
char *inbuf = mmap(NULL, size, PROT_READ, MAP_SHARED, infd, 0);
ssize_t ret = 0, off = 0;
ssize_t ret = 0, off = winfo->total_len;
size_t rem;

if (inbuf == MAP_FAILED) {
perror("mmap");
return 1;
}

rem = size;
ret = spool_buf(outfd, winfo);
if (ret < 0)
return ret;

rem = size - winfo->total_len;

while (rem > 0) {
ret = write(outfd, inbuf + off, rem);
Expand Down Expand Up @@ -772,8 +833,16 @@ static int get_infd_size(int fd)
return (int)count;
}

static int do_sendfile(int infd, int outfd, unsigned int count)
static int do_sendfile(int infd, int outfd, unsigned int count,
struct wstate *winfo)
{
int ret = spool_buf(outfd, winfo);

if (ret < 0)
return ret;

count -= winfo->total_len;

while (count > 0) {
ssize_t r;

Expand All @@ -790,7 +859,8 @@ static int do_sendfile(int infd, int outfd, unsigned int count)
}

static int copyfd_io_mmap(int infd, int peerfd, int outfd,
unsigned int size, bool *in_closed_after_out)
unsigned int size, bool *in_closed_after_out,
struct wstate *winfo)
{
int err;

Expand All @@ -799,9 +869,9 @@ static int copyfd_io_mmap(int infd, int peerfd, int outfd,
if (err)
return err;

err = do_mmap(infd, peerfd, size);
err = do_mmap(infd, peerfd, size, winfo);
} else {
err = do_mmap(infd, peerfd, size);
err = do_mmap(infd, peerfd, size, winfo);
if (err)
return err;

Expand All @@ -815,7 +885,7 @@ static int copyfd_io_mmap(int infd, int peerfd, int outfd,
}

static int copyfd_io_sendfile(int infd, int peerfd, int outfd,
unsigned int size, bool *in_closed_after_out)
unsigned int size, bool *in_closed_after_out, struct wstate *winfo)
{
int err;

Expand All @@ -824,9 +894,9 @@ static int copyfd_io_sendfile(int infd, int peerfd, int outfd,
if (err)
return err;

err = do_sendfile(infd, peerfd, size);
err = do_sendfile(infd, peerfd, size, winfo);
} else {
err = do_sendfile(infd, peerfd, size);
err = do_sendfile(infd, peerfd, size, winfo);
if (err)
return err;

Expand All @@ -839,7 +909,7 @@ static int copyfd_io_sendfile(int infd, int peerfd, int outfd,
return err;
}

static int copyfd_io(int infd, int peerfd, int outfd, bool close_peerfd)
static int copyfd_io(int infd, int peerfd, int outfd, bool close_peerfd, struct wstate *winfo)
{
bool in_closed_after_out = false;
struct timespec start, end;
Expand All @@ -851,21 +921,24 @@ static int copyfd_io(int infd, int peerfd, int outfd, bool close_peerfd)

switch (cfg_mode) {
case CFG_MODE_POLL:
ret = copyfd_io_poll(infd, peerfd, outfd, &in_closed_after_out);
ret = copyfd_io_poll(infd, peerfd, outfd, &in_closed_after_out,
winfo);
break;

case CFG_MODE_MMAP:
file_size = get_infd_size(infd);
if (file_size < 0)
return file_size;
ret = copyfd_io_mmap(infd, peerfd, outfd, file_size, &in_closed_after_out);
ret = copyfd_io_mmap(infd, peerfd, outfd, file_size,
&in_closed_after_out, winfo);
break;

case CFG_MODE_SENDFILE:
file_size = get_infd_size(infd);
if (file_size < 0)
return file_size;
ret = copyfd_io_sendfile(infd, peerfd, outfd, file_size, &in_closed_after_out);
ret = copyfd_io_sendfile(infd, peerfd, outfd, file_size,
&in_closed_after_out, winfo);
break;

default:
Expand Down Expand Up @@ -999,6 +1072,7 @@ static void maybe_close(int fd)
int main_loop_s(int listensock)
{
struct sockaddr_storage ss;
struct wstate winfo;
struct pollfd polls;
socklen_t salen;
int remotesock;
Expand Down Expand Up @@ -1033,7 +1107,8 @@ int main_loop_s(int listensock)

SOCK_TEST_TCPULP(remotesock, 0);

copyfd_io(fd, remotesock, 1, true);
memset(&winfo, 0, sizeof(winfo));
copyfd_io(fd, remotesock, 1, true, &winfo);
} else {
perror("accept");
return 1;
Expand Down Expand Up @@ -1130,6 +1205,11 @@ static void parse_setsock_options(const char *name)
return;
}

if (strncmp(name, "MPTFO", len) == 0) {
cfg_sockopt_types.mptfo = 1;
return;
}

fprintf(stderr, "Unrecognized setsockopt option %s\n", name);
exit(1);
}
Expand Down Expand Up @@ -1166,11 +1246,18 @@ void xdisconnect(int fd, int addrlen)

int main_loop(void)
{
int fd, ret, fd_in = 0;
int fd = 0, ret, fd_in = 0;
struct addrinfo *peer;
struct wstate winfo;

if (cfg_input && cfg_sockopt_types.mptfo) {
fd_in = open(cfg_input, O_RDONLY);
if (fd < 0)
xerror("can't open %s:%d", cfg_input, errno);
}

/* listener is ready. */
fd = sock_connect_mptcp(cfg_host, cfg_port, cfg_sock_proto, &peer);
memset(&winfo, 0, sizeof(winfo));
fd = sock_connect_mptcp(cfg_host, cfg_port, cfg_sock_proto, &peer, fd_in, &winfo);
if (fd < 0)
return 2;

Expand All @@ -1186,14 +1273,13 @@ int main_loop(void)
if (cfg_cmsg_types.cmsg_enabled)
apply_cmsg_types(fd, &cfg_cmsg_types);

if (cfg_input) {
if (cfg_input && !cfg_sockopt_types.mptfo) {
fd_in = open(cfg_input, O_RDONLY);
if (fd < 0)
xerror("can't open %s:%d", cfg_input, errno);
}

/* close the client socket open only if we are not going to reconnect */
ret = copyfd_io(fd_in, fd, 1, 0);
ret = copyfd_io(fd_in, fd, 1, 0, &winfo);
if (ret)
return ret;

Expand All @@ -1210,6 +1296,7 @@ int main_loop(void)
xerror("can't reconnect: %d", errno);
if (cfg_input)
close(fd_in);
memset(&winfo, 0, sizeof(winfo));
goto again;
} else {
close(fd);
Expand Down
Loading

0 comments on commit ca7ae89

Please sign in to comment.