Skip to content

Commit

Permalink
net: stream: add a new option to automatically reconnect
Browse files Browse the repository at this point in the history
In stream mode, if the server shuts down there is currently
no way to reconnect the client to a new server without removing
the NIC device and the netdev backend (or to reboot).

This patch introduces a reconnect option that specifies a delay
to try to reconnect with the same parameters.

Add a new test in qtest to test the reconnect option and the
connect/disconnect events.

Signed-off-by: Laurent Vivier <[email protected]>
Signed-off-by: Jason Wang <[email protected]>
  • Loading branch information
vivier authored and jasowang committed Feb 17, 2023
1 parent 993f71e commit 148fbf0
Show file tree
Hide file tree
Showing 4 changed files with 162 additions and 5 deletions.
53 changes: 52 additions & 1 deletion net/stream.c
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@
#include "io/channel-socket.h"
#include "io/net-listener.h"
#include "qapi/qapi-events-net.h"
#include "qapi/qapi-visit-sockets.h"
#include "qapi/clone-visitor.h"

typedef struct NetStreamState {
NetClientState nc;
Expand All @@ -49,11 +51,15 @@ typedef struct NetStreamState {
guint ioc_write_tag;
SocketReadState rs;
unsigned int send_index; /* number of bytes sent*/
uint32_t reconnect;
guint timer_tag;
SocketAddress *addr;
} NetStreamState;

static void net_stream_listen(QIONetListener *listener,
QIOChannelSocket *cioc,
void *opaque);
static void net_stream_arm_reconnect(NetStreamState *s);

static gboolean net_stream_writable(QIOChannel *ioc,
GIOCondition condition,
Expand Down Expand Up @@ -170,6 +176,7 @@ static gboolean net_stream_send(QIOChannel *ioc,
qemu_set_info_str(&s->nc, "%s", "");

qapi_event_send_netdev_stream_disconnected(s->nc.name);
net_stream_arm_reconnect(s);

return G_SOURCE_REMOVE;
}
Expand All @@ -187,6 +194,14 @@ static gboolean net_stream_send(QIOChannel *ioc,
static void net_stream_cleanup(NetClientState *nc)
{
NetStreamState *s = DO_UPCAST(NetStreamState, nc, nc);
if (s->timer_tag) {
g_source_remove(s->timer_tag);
s->timer_tag = 0;
}
if (s->addr) {
qapi_free_SocketAddress(s->addr);
s->addr = NULL;
}
if (s->ioc) {
if (QIO_CHANNEL_SOCKET(s->ioc)->fd != -1) {
if (s->ioc_read_tag) {
Expand Down Expand Up @@ -346,12 +361,37 @@ static void net_stream_client_connected(QIOTask *task, gpointer opaque)
error:
object_unref(OBJECT(s->ioc));
s->ioc = NULL;
net_stream_arm_reconnect(s);
}

static gboolean net_stream_reconnect(gpointer data)
{
NetStreamState *s = data;
QIOChannelSocket *sioc;

s->timer_tag = 0;

sioc = qio_channel_socket_new();
s->ioc = QIO_CHANNEL(sioc);
qio_channel_socket_connect_async(sioc, s->addr,
net_stream_client_connected, s,
NULL, NULL);
return G_SOURCE_REMOVE;
}

static void net_stream_arm_reconnect(NetStreamState *s)
{
if (s->reconnect && s->timer_tag == 0) {
s->timer_tag = g_timeout_add_seconds(s->reconnect,
net_stream_reconnect, s);
}
}

static int net_stream_client_init(NetClientState *peer,
const char *model,
const char *name,
SocketAddress *addr,
uint32_t reconnect,
Error **errp)
{
NetStreamState *s;
Expand All @@ -364,6 +404,10 @@ static int net_stream_client_init(NetClientState *peer,
s->ioc = QIO_CHANNEL(sioc);
s->nc.link_down = true;

s->reconnect = reconnect;
if (reconnect) {
s->addr = QAPI_CLONE(SocketAddress, addr);
}
qio_channel_socket_connect_async(sioc, addr,
net_stream_client_connected, s,
NULL, NULL);
Expand All @@ -380,7 +424,14 @@ int net_init_stream(const Netdev *netdev, const char *name,
sock = &netdev->u.stream;

if (!sock->has_server || !sock->server) {
return net_stream_client_init(peer, "stream", name, sock->addr, errp);
return net_stream_client_init(peer, "stream", name, sock->addr,
sock->has_reconnect ? sock->reconnect : 0,
errp);
}
if (sock->has_reconnect) {
error_setg(errp, "'reconnect' option is incompatible with "
"socket in server mode");
return -1;
}
return net_stream_server_init(peer, "stream", name, sock->addr, errp);
}
7 changes: 6 additions & 1 deletion qapi/net.json
Original file line number Diff line number Diff line change
Expand Up @@ -585,6 +585,10 @@
# @addr: socket address to listen on (server=true)
# or connect to (server=false)
# @server: create server socket (default: false)
# @reconnect: For a client socket, if a socket is disconnected,
# then attempt a reconnect after the given number of seconds.
# Setting this to zero disables this function. (default: 0)
# (since 8.0)
#
# Only SocketAddress types 'unix', 'inet' and 'fd' are supported.
#
Expand All @@ -593,7 +597,8 @@
{ 'struct': 'NetdevStreamOptions',
'data': {
'addr': 'SocketAddress',
'*server': 'bool' } }
'*server': 'bool',
'*reconnect': 'uint32' } }

##
# @NetdevDgramOptions:
Expand Down
6 changes: 3 additions & 3 deletions qemu-options.hx
Original file line number Diff line number Diff line change
Expand Up @@ -2762,9 +2762,9 @@ DEF("netdev", HAS_ARG, QEMU_OPTION_netdev,
"-netdev socket,id=str[,fd=h][,udp=host:port][,localaddr=host:port]\n"
" configure a network backend to connect to another network\n"
" using an UDP tunnel\n"
"-netdev stream,id=str[,server=on|off],addr.type=inet,addr.host=host,addr.port=port[,to=maxport][,numeric=on|off][,keep-alive=on|off][,mptcp=on|off][,addr.ipv4=on|off][,addr.ipv6=on|off]\n"
"-netdev stream,id=str[,server=on|off],addr.type=unix,addr.path=path[,abstract=on|off][,tight=on|off]\n"
"-netdev stream,id=str[,server=on|off],addr.type=fd,addr.str=file-descriptor\n"
"-netdev stream,id=str[,server=on|off],addr.type=inet,addr.host=host,addr.port=port[,to=maxport][,numeric=on|off][,keep-alive=on|off][,mptcp=on|off][,addr.ipv4=on|off][,addr.ipv6=on|off][,reconnect=seconds]\n"
"-netdev stream,id=str[,server=on|off],addr.type=unix,addr.path=path[,abstract=on|off][,tight=on|off][,reconnect=seconds]\n"
"-netdev stream,id=str[,server=on|off],addr.type=fd,addr.str=file-descriptor[,reconnect=seconds]\n"
" configure a network backend to connect to another network\n"
" using a socket connection in stream mode.\n"
"-netdev dgram,id=str,remote.type=inet,remote.host=maddr,remote.port=port[,local.type=inet,local.host=addr]\n"
Expand Down
101 changes: 101 additions & 0 deletions tests/qtest/netdev-socket.c
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@
#include <glib/gstdio.h>
#include "../unit/socket-helpers.h"
#include "libqtest.h"
#include "qapi/qmp/qstring.h"
#include "qemu/sockets.h"
#include "qapi/qobject-input-visitor.h"
#include "qapi/qapi-visit-sockets.h"

#define CONNECTION_TIMEOUT 60

Expand Down Expand Up @@ -142,6 +146,101 @@ static void test_stream_inet_ipv4(void)
qtest_quit(qts0);
}

static void wait_stream_connected(QTestState *qts, const char *id,
SocketAddress **addr)
{
QDict *resp, *data;
QString *qstr;
QObject *obj;
Visitor *v = NULL;

resp = qtest_qmp_eventwait_ref(qts, "NETDEV_STREAM_CONNECTED");
g_assert_nonnull(resp);
data = qdict_get_qdict(resp, "data");
g_assert_nonnull(data);

qstr = qobject_to(QString, qdict_get(data, "netdev-id"));
g_assert_nonnull(data);

g_assert(!strcmp(qstring_get_str(qstr), id));

obj = qdict_get(data, "addr");

v = qobject_input_visitor_new(obj);
visit_type_SocketAddress(v, NULL, addr, NULL);
visit_free(v);
qobject_unref(resp);
}

static void wait_stream_disconnected(QTestState *qts, const char *id)
{
QDict *resp, *data;
QString *qstr;

resp = qtest_qmp_eventwait_ref(qts, "NETDEV_STREAM_DISCONNECTED");
g_assert_nonnull(resp);
data = qdict_get_qdict(resp, "data");
g_assert_nonnull(data);

qstr = qobject_to(QString, qdict_get(data, "netdev-id"));
g_assert_nonnull(data);

g_assert(!strcmp(qstring_get_str(qstr), id));
qobject_unref(resp);
}

static void test_stream_inet_reconnect(void)
{
QTestState *qts0, *qts1;
int port;
SocketAddress *addr;

port = inet_get_free_port(false);
qts0 = qtest_initf("-nodefaults -M none "
"-netdev stream,id=st0,server=true,addr.type=inet,"
"addr.ipv4=on,addr.ipv6=off,"
"addr.host=127.0.0.1,addr.port=%d", port);

EXPECT_STATE(qts0, "st0: index=0,type=stream,\r\n", 0);

qts1 = qtest_initf("-nodefaults -M none "
"-netdev stream,server=false,id=st0,addr.type=inet,"
"addr.ipv4=on,addr.ipv6=off,reconnect=1,"
"addr.host=127.0.0.1,addr.port=%d", port);

wait_stream_connected(qts0, "st0", &addr);
g_assert_cmpint(addr->type, ==, SOCKET_ADDRESS_TYPE_INET);
g_assert_cmpstr(addr->u.inet.host, ==, "127.0.0.1");
qapi_free_SocketAddress(addr);

/* kill server */
qtest_quit(qts0);

/* check client has been disconnected */
wait_stream_disconnected(qts1, "st0");

/* restart server */
qts0 = qtest_initf("-nodefaults -M none "
"-netdev stream,id=st0,server=true,addr.type=inet,"
"addr.ipv4=on,addr.ipv6=off,"
"addr.host=127.0.0.1,addr.port=%d", port);

/* wait connection events*/
wait_stream_connected(qts0, "st0", &addr);
g_assert_cmpint(addr->type, ==, SOCKET_ADDRESS_TYPE_INET);
g_assert_cmpstr(addr->u.inet.host, ==, "127.0.0.1");
qapi_free_SocketAddress(addr);

wait_stream_connected(qts1, "st0", &addr);
g_assert_cmpint(addr->type, ==, SOCKET_ADDRESS_TYPE_INET);
g_assert_cmpstr(addr->u.inet.host, ==, "127.0.0.1");
g_assert_cmpint(atoi(addr->u.inet.port), ==, port);
qapi_free_SocketAddress(addr);

qtest_quit(qts1);
qtest_quit(qts0);
}

static void test_stream_inet_ipv6(void)
{
QTestState *qts0, *qts1;
Expand Down Expand Up @@ -418,6 +517,8 @@ int main(int argc, char **argv)
#ifndef _WIN32
qtest_add_func("/netdev/dgram/mcast", test_dgram_mcast);
#endif
qtest_add_func("/netdev/stream/inet/reconnect",
test_stream_inet_reconnect);
}
if (has_ipv6) {
qtest_add_func("/netdev/stream/inet/ipv6", test_stream_inet_ipv6);
Expand Down

0 comments on commit 148fbf0

Please sign in to comment.