Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 30 additions & 0 deletions plugins/in_forward/fw.c
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,12 @@ static int in_fw_collect(struct flb_input_instance *ins,
return -1;
}

if(ctx->is_paused) {
flb_downstream_conn_release(connection);
flb_plg_trace(ins, "TCP connection will be closed FD=%i", connection->fd);
return -1;
}

flb_plg_trace(ins, "new TCP connection arrived FD=%i", connection->fd);

conn = fw_conn_add(connection, ctx);
Expand Down Expand Up @@ -265,6 +271,9 @@ static int in_fw_init(struct flb_input_instance *ins,
/* Set the context */
flb_input_set_context(ins, ctx);

/* Set plugin ingestion to active */
ctx->is_paused = FLB_FALSE;

/* Unix Socket mode */
if (ctx->unix_path) {
#ifndef FLB_HAVE_UNIX_SOCKET
Expand Down Expand Up @@ -348,6 +357,17 @@ static int in_fw_init(struct flb_input_instance *ins,
static void in_fw_pause(void *data, struct flb_config *config)
{
struct flb_in_fw_config *ctx = data;
if (config->is_running == FLB_TRUE) {
/*
* This is the case when we are not in a shutdown phase, but
* backpressure built up, and the plugin needs to
* pause the ingestion. The plugin should close all the connections
* and wait for the ingestion to resume.
*/
flb_input_collector_pause(ctx->coll_fd, ctx->ins);
fw_conn_del_all(ctx);
ctx->is_paused = FLB_TRUE;
}

/*
* If the plugin is paused AND the ingestion not longer active,
Expand All @@ -362,6 +382,15 @@ static void in_fw_pause(void *data, struct flb_config *config)
}
}

static void in_fw_resume(void *data, struct flb_config *config) {
struct flb_in_fw_config *ctx = data;
if (config->is_running == FLB_TRUE) {
ctx->is_paused = FLB_FALSE;
flb_input_collector_resume(ctx->coll_fd, ctx->ins);
}
}


static int in_fw_exit(void *data, struct flb_config *config)
{
(void) *config;
Expand Down Expand Up @@ -431,6 +460,7 @@ struct flb_input_plugin in_forward_plugin = {
.cb_collect = in_fw_collect,
.cb_flush_buf = NULL,
.cb_pause = in_fw_pause,
.cb_resume = in_fw_resume,
.cb_exit = in_fw_exit,
.config_map = config_map,
.flags = FLB_INPUT_NET_SERVER | FLB_IO_OPT_TLS
Expand Down
3 changes: 3 additions & 0 deletions plugins/in_forward/fw.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,9 @@ struct flb_in_fw_config {

struct flb_log_event_decoder *log_decoder;
struct flb_log_event_encoder *log_encoder;

/* Plugin is paused */
int is_paused;
};

#endif