Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 74 additions & 6 deletions plugins/in_calyptia_fleet/in_calyptia_fleet.c
Original file line number Diff line number Diff line change
Expand Up @@ -2052,12 +2052,13 @@ static int in_calyptia_fleet_collect(struct flb_input_instance *ins,
struct flb_config *config,
void *in_context)
{
int ret = -1;
struct flb_in_calyptia_fleet_config *ctx = in_context;

if (ctx->fleet_id == NULL) {
if (get_calyptia_fleet_id_by_name(ctx, config) == -1) {
flb_plg_error(ctx->ins, "unable to find fleet: %s", ctx->fleet_name);
FLB_INPUT_RETURN(-1);
goto fleet_id_error;
}
}

Expand All @@ -2066,11 +2067,56 @@ static int in_calyptia_fleet_collect(struct flb_input_instance *ins,
FLB_INPUT_RETURN(-1);
}

if (get_calyptia_fleet_config(ctx) == -1) {
FLB_INPUT_RETURN(-1);
ret = get_calyptia_fleet_config(ctx);

fleet_id_error:
FLB_INPUT_RETURN(ret);
}

/* cb_collect callback */
static int in_calyptia_fleet_collect_cb(struct flb_input_instance *ins,
struct flb_config *config,
void *in_context)
{
FLB_INPUT_RETURN(in_calyptia_fleet_collect(ins, config, in_context));
}

static int in_calyptia_fleet_collect_once(struct flb_input_instance *ins,
struct flb_config *config,
void *in_context)
{
int ret;
struct flb_in_calyptia_fleet_config *ctx = in_context;

/* pause the oneshot interval forever. */
flb_input_collector_delete(ctx->initial_fd, ins);

ret = in_calyptia_fleet_collect(ins, config, in_context);

if (ret != 0 && ctx->initial_retries < FLEET_INITIAL_MAX_TRIES) {
ctx->initial_retries++;
ctx->initial_fd = flb_input_set_collector_time(ins,
in_calyptia_fleet_collect_once,
FLEET_INITIAL_RETRY_INTERVAL_SECONDS,
FLEET_INITIAL_RETRY_INTERVAL_NANOSECONDS,
config);

if (ctx->initial_fd == -1) {
flb_plg_error(ctx->ins, "could not initialize collector for fleet input plugin");
/* Resume main collector on retry scheduling failure */
flb_input_collector_resume(ctx->collect_fd, ins);
FLB_INPUT_RETURN(-1);
}

flb_plg_info(ctx->ins, "updating initial configuration with oneshot interval retry");
flb_input_collector_start(ctx->initial_fd, ins);
FLB_INPUT_RETURN(0);
}

FLB_INPUT_RETURN(0);
/* resume the main interval now that the oneshot has been ran */
flb_input_collector_resume(ctx->collect_fd, ins);

FLB_INPUT_RETURN(ret);
}

static int create_fleet_directory(struct flb_in_calyptia_fleet_config *ctx)
Expand Down Expand Up @@ -2555,7 +2601,7 @@ static int in_calyptia_fleet_init(struct flb_input_instance *in,

/* Set our collector based on time */
ret = flb_input_set_collector_time(in,
in_calyptia_fleet_collect,
in_calyptia_fleet_collect_cb,
ctx->interval_sec,
ctx->interval_nsec,
config);
Expand All @@ -2570,6 +2616,28 @@ static int in_calyptia_fleet_init(struct flb_input_instance *in,
flb_plg_info(ctx->ins, "fleet collector initialized with interval: %d sec %d nsec",
ctx->interval_sec, ctx->interval_nsec);

if (is_fleet_config(ctx, config)) {
calyptia_config_commit(ctx);
}
else {
/* Attempt to fetch the configuration upon first startup.
*/
flb_input_collector_pause(ctx->collect_fd, in);

ctx->initial_retries = 0;
ctx->initial_fd = flb_input_set_collector_time(in,
in_calyptia_fleet_collect_once,
0,
0,
config);
if (ctx->initial_fd == -1) {
flb_plg_error(ctx->ins, "could not initialize collector for fleet input plugin");
in_calyptia_fleet_destroy(ctx);
return -1;
}
flb_plg_info(ctx->ins, "updating initial configuration with oneshot interval");
}

return 0;
}

Expand Down Expand Up @@ -2677,7 +2745,7 @@ struct flb_input_plugin in_calyptia_fleet_plugin = {
.description = "Calyptia Fleet Input",
.cb_init = in_calyptia_fleet_init,
.cb_pre_run = NULL,
.cb_collect = in_calyptia_fleet_collect,
.cb_collect = in_calyptia_fleet_collect_cb,
.cb_resume = cb_in_calyptia_fleet_resume,
.cb_pause = cb_in_calyptia_fleet_pause,
.cb_flush_buf = NULL,
Expand Down
8 changes: 8 additions & 0 deletions plugins/in_calyptia_fleet/in_calyptia_fleet.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@

#define FLEET_HEADERS_CONFIG_VERSION "Fleet-Config-Version"

#define FLEET_INITIAL_MAX_TRIES 1
#define FLEET_INITIAL_RETRY_INTERVAL_SECONDS 10
#define FLEET_INITIAL_RETRY_INTERVAL_NANOSECONDS 0

struct flb_in_calyptia_fleet_config {
/* Time interval check */
int interval_sec;
Expand Down Expand Up @@ -63,6 +67,10 @@ struct flb_in_calyptia_fleet_config {
struct flb_upstream *u;

int collect_fd;

/* track the initial configuration update */
int initial_fd;
int initial_retries;
};

struct reload_ctx {
Expand Down
Loading