Skip to content
Merged
13 changes: 8 additions & 5 deletions include/fluent-bit/flb_network.h
Original file line number Diff line number Diff line change
Expand Up @@ -74,12 +74,15 @@ struct flb_net_setup {
/* dns resolver : LEGACY or ASYNC */
char *dns_resolver;

/* prioritize ipv4 results when trying to establish a connection*/
/* prioritize ipv4 results when trying to establish a connection */
int dns_prefer_ipv4;

/* prioritize ipv6 results when trying to establish a connection*/
/* prioritize ipv6 results when trying to establish a connection */
int dns_prefer_ipv6;

/* allow this port to be shared */
int share_port;

/* maximum number of allowed active TCP connections */
int max_worker_connections;
};
Expand Down Expand Up @@ -165,10 +168,10 @@ flb_sockfd_t flb_net_udp_connect(const char *host, unsigned long port,
char *source_addr);

int flb_net_tcp_fd_connect(flb_sockfd_t fd, const char *host, unsigned long port);
flb_sockfd_t flb_net_server(const char *port, const char *listen_addr);
flb_sockfd_t flb_net_server_udp(const char *port, const char *listen_addr);
flb_sockfd_t flb_net_server(const char *port, const char *listen_addr, int share_port);
flb_sockfd_t flb_net_server_udp(const char *port, const char *listen_addr, int share_port);
flb_sockfd_t flb_net_server_unix(const char *listen_path, int stream_mode,
int backlog);
int backlog, int share_port);
int flb_net_bind(flb_sockfd_t fd, const struct sockaddr *addr,
socklen_t addrlen, int backlog);
int flb_net_bind_udp(flb_sockfd_t fd, const struct sockaddr *addr,
Expand Down
3 changes: 2 additions & 1 deletion plugins/in_collectd/in_collectd.c
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,8 @@ static int in_collectd_init(struct flb_input_instance *in,
/* Set the context */
flb_input_set_context(in, ctx);

ctx->server_fd = flb_net_server_udp(ctx->port, ctx->listen);
ctx->server_fd = flb_net_server_udp(ctx->port, ctx->listen,
in->net_setup.share_port);
if (ctx->server_fd < 0) {
flb_plg_error(ctx->ins, "failed to bind to %s:%s", ctx->listen,
ctx->port);
Expand Down
2 changes: 1 addition & 1 deletion plugins/in_event_test/event_test.c
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,7 @@ static int cb_event_test_init(struct flb_input_instance *ins,
ut->coll_id = ret;

/* unit test 2: collector_socket */
fd = flb_net_server(SERVER_PORT, SERVER_IFACE);
fd = flb_net_server(SERVER_PORT, SERVER_IFACE, FLB_FALSE);
if (fd < 0) {
flb_errno();
config_destroy(ctx);
Expand Down
2 changes: 1 addition & 1 deletion plugins/in_statsd/statsd.c
Original file line number Diff line number Diff line change
Expand Up @@ -312,7 +312,7 @@ static int cb_statsd_init(struct flb_input_instance *ins,
flb_input_set_context(ins, ctx);

/* Accepts metrics from UDP connections. */
ctx->server_fd = flb_net_server_udp(ctx->port, ctx->listen);
ctx->server_fd = flb_net_server_udp(ctx->port, ctx->listen, ins->net_setup.share_port);
if (ctx->server_fd == -1) {
flb_plg_error(ctx->ins, "can't bind to %s:%s", ctx->listen, ctx->port);
flb_log_event_encoder_destroy(ctx->log_encoder);
Expand Down
16 changes: 12 additions & 4 deletions src/flb_downstream.c
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,12 @@

/* Config map for Downstream networking setup */
struct flb_config_map downstream_net[] = {
{
FLB_CONFIG_MAP_BOOL, "net.share_port", "false",
0, FLB_TRUE, offsetof(struct flb_net_setup, share_port),
"Allow multiple plugins to bind to the same port"
},

{
FLB_CONFIG_MAP_TIME, "net.io_timeout", "0s",
0, FLB_TRUE, offsetof(struct flb_net_setup, io_timeout),
Expand Down Expand Up @@ -107,20 +113,22 @@ int flb_downstream_setup(struct flb_downstream *stream,
snprintf(port_string, sizeof(port_string), "%u", port);

if (transport == FLB_TRANSPORT_TCP) {
stream->server_fd = flb_net_server(port_string, host);
stream->server_fd = flb_net_server(port_string, host, net_setup->share_port);
}
else if (transport == FLB_TRANSPORT_UDP) {
stream->server_fd = flb_net_server_udp(port_string, host);
stream->server_fd = flb_net_server_udp(port_string, host, net_setup->share_port);
}
else if (transport == FLB_TRANSPORT_UNIX_STREAM) {
stream->server_fd = flb_net_server_unix(host,
FLB_TRUE,
FLB_NETWORK_DEFAULT_BACKLOG_SIZE);
FLB_NETWORK_DEFAULT_BACKLOG_SIZE,
net_setup->share_port);
}
else if (transport == FLB_TRANSPORT_UNIX_DGRAM) {
stream->server_fd = flb_net_server_unix(host,
FLB_FALSE,
FLB_NETWORK_DEFAULT_BACKLOG_SIZE);
FLB_NETWORK_DEFAULT_BACKLOG_SIZE,
net_setup->share_port);
}

if (stream->server_fd != -1) {
Expand Down
38 changes: 35 additions & 3 deletions src/flb_network.c
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,25 @@ int flb_net_socket_reset(flb_sockfd_t fd)
return 0;
}

int flb_net_socket_share_port(flb_sockfd_t fd)
{
int on = 1;
int ret;

#ifdef SO_REUSEPORT
ret = setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &on, sizeof(on));
#else
ret = -1;
#endif

if (ret == -1) {
flb_errno();
return -1;
}

return 0;
}

int flb_net_socket_tcp_nodelay(flb_sockfd_t fd)
{
int on = 1;
Expand Down Expand Up @@ -1534,7 +1553,7 @@ int flb_net_tcp_fd_connect(flb_sockfd_t fd, const char *host, unsigned long port
return ret;
}

flb_sockfd_t flb_net_server(const char *port, const char *listen_addr)
flb_sockfd_t flb_net_server(const char *port, const char *listen_addr, int share_port)
{
flb_sockfd_t fd = -1;
int ret;
Expand All @@ -1560,6 +1579,10 @@ flb_sockfd_t flb_net_server(const char *port, const char *listen_addr)
continue;
}

if (share_port) {
flb_net_socket_share_port(fd);
}

flb_net_socket_tcp_nodelay(fd);
flb_net_socket_reset(fd);

Expand All @@ -1580,7 +1603,7 @@ flb_sockfd_t flb_net_server(const char *port, const char *listen_addr)
return fd;
}

flb_sockfd_t flb_net_server_udp(const char *port, const char *listen_addr)
flb_sockfd_t flb_net_server_udp(const char *port, const char *listen_addr, int share_port)
{
flb_sockfd_t fd = -1;
int ret;
Expand All @@ -1606,6 +1629,10 @@ flb_sockfd_t flb_net_server_udp(const char *port, const char *listen_addr)
continue;
}

if (share_port) {
flb_net_socket_share_port(fd);
}

ret = flb_net_bind_udp(fd, rp->ai_addr, rp->ai_addrlen);
if(ret == -1) {
flb_warn("Cannot listen on %s port %s", listen_addr, port);
Expand All @@ -1626,7 +1653,8 @@ flb_sockfd_t flb_net_server_udp(const char *port, const char *listen_addr)
#ifdef FLB_HAVE_UNIX_SOCKET
flb_sockfd_t flb_net_server_unix(const char *listen_path,
int stream_mode,
int backlog)
int backlog,
int share_port)
{
size_t address_length;
size_t path_length;
Expand Down Expand Up @@ -1654,6 +1682,10 @@ flb_sockfd_t flb_net_server_unix(const char *listen_path,

strncpy(address.sun_path, listen_path, sizeof(address.sun_path));

if (share_port) {
flb_net_socket_share_port(fd);
}

if (stream_mode) {
ret = flb_net_bind(fd,
(const struct sockaddr *) &address,
Expand Down
2 changes: 1 addition & 1 deletion tests/internal/network.c
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ static void test_client_server(int is_ipv6)
}
TEST_CHECK(fd_client != -1);

fd_server = flb_net_server(TEST_PORT, host);
fd_server = flb_net_server(TEST_PORT, host, FLB_FALSE);
TEST_CHECK(fd_server != -1);

/* Create Event loop */
Expand Down