diff --git a/include/fluent-bit/flb_network.h b/include/fluent-bit/flb_network.h index 24a7d6f191d..4fb741527ec 100644 --- a/include/fluent-bit/flb_network.h +++ b/include/fluent-bit/flb_network.h @@ -74,12 +74,15 @@ struct flb_net_setup { /* dns resolver : LEGACY or ASYNC */ char *dns_resolver; - /* prioritize ipv4 results when trying to establish a connection*/ + /* prioritize ipv4 results when trying to establish a connection */ int dns_prefer_ipv4; - /* prioritize ipv6 results when trying to establish a connection*/ + /* prioritize ipv6 results when trying to establish a connection */ int dns_prefer_ipv6; + /* allow this port to be shared */ + int share_port; + /* maximum number of allowed active TCP connections */ int max_worker_connections; }; @@ -165,10 +168,10 @@ flb_sockfd_t flb_net_udp_connect(const char *host, unsigned long port, char *source_addr); int flb_net_tcp_fd_connect(flb_sockfd_t fd, const char *host, unsigned long port); -flb_sockfd_t flb_net_server(const char *port, const char *listen_addr); -flb_sockfd_t flb_net_server_udp(const char *port, const char *listen_addr); +flb_sockfd_t flb_net_server(const char *port, const char *listen_addr, int share_port); +flb_sockfd_t flb_net_server_udp(const char *port, const char *listen_addr, int share_port); flb_sockfd_t flb_net_server_unix(const char *listen_path, int stream_mode, - int backlog); + int backlog, int share_port); int flb_net_bind(flb_sockfd_t fd, const struct sockaddr *addr, socklen_t addrlen, int backlog); int flb_net_bind_udp(flb_sockfd_t fd, const struct sockaddr *addr, diff --git a/plugins/in_collectd/in_collectd.c b/plugins/in_collectd/in_collectd.c index a782a39f322..bcd07552ed5 100644 --- a/plugins/in_collectd/in_collectd.c +++ b/plugins/in_collectd/in_collectd.c @@ -110,7 +110,8 @@ static int in_collectd_init(struct flb_input_instance *in, /* Set the context */ flb_input_set_context(in, ctx); - ctx->server_fd = flb_net_server_udp(ctx->port, ctx->listen); + ctx->server_fd = flb_net_server_udp(ctx->port, ctx->listen, + in->net_setup.share_port); if (ctx->server_fd < 0) { flb_plg_error(ctx->ins, "failed to bind to %s:%s", ctx->listen, ctx->port); diff --git a/plugins/in_event_test/event_test.c b/plugins/in_event_test/event_test.c index c0af7e42ec9..f4db1e362de 100644 --- a/plugins/in_event_test/event_test.c +++ b/plugins/in_event_test/event_test.c @@ -287,7 +287,7 @@ static int cb_event_test_init(struct flb_input_instance *ins, ut->coll_id = ret; /* unit test 2: collector_socket */ - fd = flb_net_server(SERVER_PORT, SERVER_IFACE); + fd = flb_net_server(SERVER_PORT, SERVER_IFACE, FLB_FALSE); if (fd < 0) { flb_errno(); config_destroy(ctx); diff --git a/plugins/in_statsd/statsd.c b/plugins/in_statsd/statsd.c index bd0a2eb4df2..74022d9c036 100644 --- a/plugins/in_statsd/statsd.c +++ b/plugins/in_statsd/statsd.c @@ -312,7 +312,7 @@ static int cb_statsd_init(struct flb_input_instance *ins, flb_input_set_context(ins, ctx); /* Accepts metrics from UDP connections. */ - ctx->server_fd = flb_net_server_udp(ctx->port, ctx->listen); + ctx->server_fd = flb_net_server_udp(ctx->port, ctx->listen, ins->net_setup.share_port); if (ctx->server_fd == -1) { flb_plg_error(ctx->ins, "can't bind to %s:%s", ctx->listen, ctx->port); flb_log_event_encoder_destroy(ctx->log_encoder); diff --git a/src/flb_downstream.c b/src/flb_downstream.c index 3660e3710f5..fa014a90242 100644 --- a/src/flb_downstream.c +++ b/src/flb_downstream.c @@ -34,6 +34,12 @@ /* Config map for Downstream networking setup */ struct flb_config_map downstream_net[] = { + { + FLB_CONFIG_MAP_BOOL, "net.share_port", "false", + 0, FLB_TRUE, offsetof(struct flb_net_setup, share_port), + "Allow multiple plugins to bind to the same port" + }, + { FLB_CONFIG_MAP_TIME, "net.io_timeout", "0s", 0, FLB_TRUE, offsetof(struct flb_net_setup, io_timeout), @@ -107,20 +113,22 @@ int flb_downstream_setup(struct flb_downstream *stream, snprintf(port_string, sizeof(port_string), "%u", port); if (transport == FLB_TRANSPORT_TCP) { - stream->server_fd = flb_net_server(port_string, host); + stream->server_fd = flb_net_server(port_string, host, net_setup->share_port); } else if (transport == FLB_TRANSPORT_UDP) { - stream->server_fd = flb_net_server_udp(port_string, host); + stream->server_fd = flb_net_server_udp(port_string, host, net_setup->share_port); } else if (transport == FLB_TRANSPORT_UNIX_STREAM) { stream->server_fd = flb_net_server_unix(host, FLB_TRUE, - FLB_NETWORK_DEFAULT_BACKLOG_SIZE); + FLB_NETWORK_DEFAULT_BACKLOG_SIZE, + net_setup->share_port); } else if (transport == FLB_TRANSPORT_UNIX_DGRAM) { stream->server_fd = flb_net_server_unix(host, FLB_FALSE, - FLB_NETWORK_DEFAULT_BACKLOG_SIZE); + FLB_NETWORK_DEFAULT_BACKLOG_SIZE, + net_setup->share_port); } if (stream->server_fd != -1) { diff --git a/src/flb_network.c b/src/flb_network.c index e96f2576045..d183209fd52 100644 --- a/src/flb_network.c +++ b/src/flb_network.c @@ -192,6 +192,25 @@ int flb_net_socket_reset(flb_sockfd_t fd) return 0; } +int flb_net_socket_share_port(flb_sockfd_t fd) +{ + int on = 1; + int ret; + +#ifdef SO_REUSEPORT + ret = setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &on, sizeof(on)); +#else + ret = -1; +#endif + + if (ret == -1) { + flb_errno(); + return -1; + } + + return 0; +} + int flb_net_socket_tcp_nodelay(flb_sockfd_t fd) { int on = 1; @@ -1534,7 +1553,7 @@ int flb_net_tcp_fd_connect(flb_sockfd_t fd, const char *host, unsigned long port return ret; } -flb_sockfd_t flb_net_server(const char *port, const char *listen_addr) +flb_sockfd_t flb_net_server(const char *port, const char *listen_addr, int share_port) { flb_sockfd_t fd = -1; int ret; @@ -1560,6 +1579,10 @@ flb_sockfd_t flb_net_server(const char *port, const char *listen_addr) continue; } + if (share_port) { + flb_net_socket_share_port(fd); + } + flb_net_socket_tcp_nodelay(fd); flb_net_socket_reset(fd); @@ -1580,7 +1603,7 @@ flb_sockfd_t flb_net_server(const char *port, const char *listen_addr) return fd; } -flb_sockfd_t flb_net_server_udp(const char *port, const char *listen_addr) +flb_sockfd_t flb_net_server_udp(const char *port, const char *listen_addr, int share_port) { flb_sockfd_t fd = -1; int ret; @@ -1606,6 +1629,10 @@ flb_sockfd_t flb_net_server_udp(const char *port, const char *listen_addr) continue; } + if (share_port) { + flb_net_socket_share_port(fd); + } + ret = flb_net_bind_udp(fd, rp->ai_addr, rp->ai_addrlen); if(ret == -1) { flb_warn("Cannot listen on %s port %s", listen_addr, port); @@ -1626,7 +1653,8 @@ flb_sockfd_t flb_net_server_udp(const char *port, const char *listen_addr) #ifdef FLB_HAVE_UNIX_SOCKET flb_sockfd_t flb_net_server_unix(const char *listen_path, int stream_mode, - int backlog) + int backlog, + int share_port) { size_t address_length; size_t path_length; @@ -1654,6 +1682,10 @@ flb_sockfd_t flb_net_server_unix(const char *listen_path, strncpy(address.sun_path, listen_path, sizeof(address.sun_path)); + if (share_port) { + flb_net_socket_share_port(fd); + } + if (stream_mode) { ret = flb_net_bind(fd, (const struct sockaddr *) &address, diff --git a/tests/internal/network.c b/tests/internal/network.c index 50683f18ec7..c19b7e3a255 100644 --- a/tests/internal/network.c +++ b/tests/internal/network.c @@ -69,7 +69,7 @@ static void test_client_server(int is_ipv6) } TEST_CHECK(fd_client != -1); - fd_server = flb_net_server(TEST_PORT, host); + fd_server = flb_net_server(TEST_PORT, host, FLB_FALSE); TEST_CHECK(fd_server != -1); /* Create Event loop */