Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions include/fluent-bit/flb_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,7 @@ struct flb_config {
unsigned int hot_reloaded_count;
int shutdown_by_hot_reloading;
int hot_reloading;
int hot_reload_succeeded;

/* Routing */
size_t route_mask_size;
Expand Down
173 changes: 121 additions & 52 deletions plugins/in_calyptia_fleet/in_calyptia_fleet.c
Original file line number Diff line number Diff line change
Expand Up @@ -573,34 +573,19 @@ static int is_fleet_config(struct flb_in_calyptia_fleet_config *ctx, struct flb_
is_timestamped_fleet_config(ctx, cfg);
}

static int exists_new_fleet_config(struct flb_in_calyptia_fleet_config *ctx)
static int exists_fleet_config(struct flb_in_calyptia_fleet_config *ctx,
const char *ref_name)
{
int ret = FLB_FALSE;
flb_sds_t cfgnewname;

cfgnewname = fleet_config_deref(ctx, "new");
if (cfgnewname == NULL) {
return FLB_FALSE;
}

ret = access(cfgnewname, F_OK) == 0 ? FLB_TRUE : FLB_FALSE;
flb_sds_destroy(cfgnewname);

return ret;
}

static int exists_old_fleet_config(struct flb_in_calyptia_fleet_config *ctx)
{
int ret = FLB_FALSE;
flb_sds_t cfgoldname;
flb_sds_t config_path;

cfgoldname = fleet_config_deref(ctx, "old");
if (cfgoldname == NULL) {
config_path = fleet_config_deref(ctx, ref_name);
if (config_path == NULL) {
return FLB_FALSE;
}

ret = access(cfgoldname, F_OK) == 0 ? FLB_TRUE : FLB_FALSE;
flb_sds_destroy(cfgoldname);
ret = access(config_path, F_OK) == 0 ? FLB_TRUE : FLB_FALSE;
flb_sds_destroy(config_path);

return ret;
}
Expand Down Expand Up @@ -633,13 +618,15 @@ static void *do_reload(void *data)
/* avoid reloading the current configuration... just use our new one! */
flb_context_set(reload->flb);
reload->flb->config->enable_hot_reload = FLB_TRUE;
reload->flb->config->hot_reload_succeeded = FLB_FALSE;
if (reload->flb->config->conf_path_file) {
flb_sds_destroy(reload->flb->config->conf_path_file);
}
reload->flb->config->conf_path_file = reload->cfg_path;

flb_free(reload);
sleep(5);
flb_info("reloading configuration from path: %s", reload->cfg_path);
flb_free(reload);
#ifndef FLB_SYSTEM_WINDOWS
kill(getpid(), SIGHUP);
#else
Expand Down Expand Up @@ -1575,50 +1562,67 @@ static int calyptia_config_delete_by_ref(struct flb_in_calyptia_fleet_config *ct
return FLB_TRUE;
}


static int calyptia_config_add(struct flb_in_calyptia_fleet_config *ctx,
const char *cfgname)
flb_sds_t cfgname)
{
flb_sds_t current_config = NULL;
flb_sds_t derefed_cur_config_path;
flb_sds_t derefed_new_config_path;
flb_sds_t cur_ref_filename;

current_config = fleet_config_deref(ctx, "new");
if (current_config == NULL) {
current_config = fleet_config_deref(ctx, "cur");
/* Repoint the old ref to the current ref (if it exists) */
derefed_cur_config_path = fleet_config_deref(ctx, "cur");
if (derefed_cur_config_path != NULL) {
if (fleet_config_set_ref(ctx, "old", derefed_cur_config_path) == FLB_FALSE) {
flb_plg_error(ctx->ins, "unable to move current configuration to old");
flb_sds_destroy(derefed_cur_config_path);
return FLB_FALSE;
}

flb_sds_destroy(derefed_cur_config_path);
}

/* If there's a current config, copy it to the old ref file */
if (current_config != NULL) {
if (fleet_config_set_ref(ctx, "old", current_config) == FLB_FALSE) {
flb_sds_destroy(current_config);
/* If there is uncommitted and different new config, delete it first */
derefed_new_config_path = fleet_config_deref(ctx, "new");
if (derefed_new_config_path != NULL
&& strcmp(derefed_new_config_path, cfgname) != 0) {
if (calyptia_config_delete_by_ref(ctx, "new") == FLB_FALSE) {
flb_plg_error(ctx->ins, "unable to delete new configuration by ref");
flb_sds_destroy(derefed_new_config_path);
return FLB_FALSE;
}
}
if (derefed_new_config_path != NULL) {
flb_sds_destroy(derefed_new_config_path);
}

/* Set the new ref file to the new config */
if (fleet_config_set_ref(ctx, "new", cfgname) == FLB_FALSE) {
flb_plg_error(ctx->ins, "unable to create new configuration reference.");
flb_sds_destroy(current_config);
flb_plg_error(ctx->ins, "unable to set new configuration by ref");
return FLB_FALSE;
}

/* Delete the current ref file if it exists */
cur_ref_filename = fleet_config_ref_filename(ctx, "cur");
if (cur_ref_filename != NULL) {
flb_plg_info(ctx->ins, "deleting current ref file: %s", cur_ref_filename);
unlink(cur_ref_filename);
flb_sds_destroy(cur_ref_filename);
}

flb_sds_destroy(current_config);
return FLB_TRUE;
}

/**
* Commits the latest received config as the valid, now-current config.
* This updates the ref file for the current config file to point to the
* new config file, and then deletes the old config file.
*/
static int calyptia_config_commit(struct flb_in_calyptia_fleet_config *ctx)
{
flb_sds_t config_path = NULL;
flb_sds_t new_ref_filename = NULL;

if (exists_new_fleet_config(ctx) == FLB_FALSE) {
if (exists_fleet_config(ctx, "new") == FLB_FALSE) {
flb_plg_info(ctx->ins, "no new configuration to commit");
return FLB_FALSE;
}
Expand All @@ -1637,7 +1641,7 @@ static int calyptia_config_commit(struct flb_in_calyptia_fleet_config *ctx)
}

/* Delete the old config and its ref file */
if (exists_old_fleet_config(ctx) == FLB_TRUE) {
if (exists_fleet_config(ctx, "old") == FLB_TRUE) {
if (calyptia_config_delete_by_ref(ctx, "old") == FLB_FALSE) {
flb_plg_error(ctx->ins, "unable to delete old configuration by ref");
return FLB_FALSE;
Expand All @@ -1652,8 +1656,11 @@ static int calyptia_config_commit(struct flb_in_calyptia_fleet_config *ctx)
return FLB_FALSE;
}

flb_plg_info(ctx->ins, "deleting config ref file: %s", new_ref_filename);
unlink(new_ref_filename);
flb_sds_destroy(new_ref_filename);

flb_plg_info(ctx->ins, "committed new config: %s", config_path);
flb_sds_destroy(config_path);

return FLB_TRUE;
Expand All @@ -1665,7 +1672,7 @@ static int calyptia_config_rollback(struct flb_in_calyptia_fleet_config *ctx)
flb_sds_t old_ref_filename = NULL;

/* Delete the new config and its ref file */
if (exists_new_fleet_config(ctx) == FLB_TRUE) {
if (exists_fleet_config(ctx, "new") == FLB_TRUE) {
if (calyptia_config_delete_by_ref(ctx, "new") == FLB_FALSE) {
flb_plg_error(ctx->ins, "unable to delete new configuration by ref");
return FLB_FALSE;
Expand All @@ -1685,6 +1692,7 @@ static int calyptia_config_rollback(struct flb_in_calyptia_fleet_config *ctx)
flb_sds_destroy(old_config_path);
return FLB_FALSE;
}
flb_plg_info(ctx->ins, "rolled back to previous config: %s", old_config_path);
flb_sds_destroy(old_config_path);

/* Delete the old config ref */
Expand All @@ -1700,6 +1708,51 @@ static int calyptia_config_rollback(struct flb_in_calyptia_fleet_config *ctx)
return FLB_TRUE;
}

/**
* Checks if the last config was successfully reloaded and if so, commits it.
* This considers a newly received config to be successfully reloaded if
* the current context indicates it was loaded from the new config file.
*
* This returns FLB_TRUE (even if the config was not committed) and
* FLB_FALSE if there was an error.
*/
static int commit_config_if_reloaded(struct flb_in_calyptia_fleet_config *ctx)
{
struct flb_config *config;

/* Get the current configuration */
config = ctx->ins->config;
if (config == NULL) {
return FLB_TRUE;
}

if (config->hot_reloading == FLB_TRUE) {
return FLB_TRUE;
}

if (config->hot_reload_succeeded != FLB_TRUE) {
/* The config either hasn't been reloaded yet or the reload failed */
return FLB_TRUE;
}

/* Check if the current config is from a new fleet config file */
if (exists_fleet_config(ctx, "new") == FLB_FALSE) {
return FLB_TRUE;
}

if (is_new_fleet_config(ctx, config)) {
/* The config was successfully reloaded from the new file, commit it */
if (calyptia_config_commit(ctx) == FLB_TRUE) {
flb_plg_info(ctx->ins, "committed reloaded configuration");
} else {
flb_plg_error(ctx->ins, "failed to commit reloaded configuration");
return FLB_FALSE;
}
}

return FLB_TRUE;
}

static void fleet_config_get_properties(flb_sds_t *buf, struct mk_list *props, int fleet_config_legacy_format)
{
struct mk_list *head;
Expand Down Expand Up @@ -1985,22 +2038,25 @@ static int in_calyptia_fleet_collect(struct flb_input_instance *ins,
struct flb_config *config,
void *in_context)
{
int ret = -1;
struct flb_in_calyptia_fleet_config *ctx = in_context;

if (ctx->fleet_id == NULL) {
if (get_calyptia_fleet_id_by_name(ctx, config) == -1) {
flb_plg_error(ctx->ins, "unable to find fleet: %s", ctx->fleet_name);
goto fleet_id_error;
FLB_INPUT_RETURN(-1);
}
}

if (get_calyptia_fleet_config(ctx) != FLB_TRUE) {
ret = -1;
/* Update symlinks if a recent reload was successful */
if (commit_config_if_reloaded(ctx) == FLB_FALSE) {
FLB_INPUT_RETURN(-1);
}

fleet_id_error:
FLB_INPUT_RETURN(ret);
if (get_calyptia_fleet_config(ctx) == -1) {
FLB_INPUT_RETURN(-1);
}

FLB_INPUT_RETURN(0);
}

static int create_fleet_directory(struct flb_in_calyptia_fleet_config *ctx)
Expand Down Expand Up @@ -2093,16 +2149,16 @@ static int fleet_cur_chdir(struct flb_in_calyptia_fleet_config *ctx)
static int load_fleet_config(struct flb_in_calyptia_fleet_config *ctx)
{
flb_ctx_t *flb_ctx = flb_context_get();
flb_sds_t config_path = NULL;
flb_sds_t config_path;

/* check if we are already using the fleet configuration file. */
if (is_fleet_config(ctx, flb_ctx->config) == FLB_FALSE) {
flb_plg_debug(ctx->ins, "loading configuration file");

/* Find the current config file, or as backup, the new one */
/* Find the current config file, or as backup, the old one */
config_path = fleet_config_deref(ctx, "cur");
if (config_path == NULL) {
config_path = fleet_config_deref(ctx, "new");
config_path = fleet_config_deref(ctx, "old");
}

if (config_path != NULL) {
Expand Down Expand Up @@ -2359,6 +2415,7 @@ static int in_calyptia_fleet_init(struct flb_input_instance *in,
int upstream_flags;
struct flb_in_calyptia_fleet_config *ctx;
(void) data;
flb_sds_t new_config_path;

#ifdef _WIN32
char *tmpdir;
Expand Down Expand Up @@ -2461,15 +2518,27 @@ static int in_calyptia_fleet_init(struct flb_input_instance *in,
create_fleet_header(ctx);
}

/* If there is an uncommitted new config, delete it */
new_config_path = fleet_config_deref(ctx, "new");
if (new_config_path != NULL
&& is_new_fleet_config(ctx, config) == FLB_FALSE) {
flb_plg_warn(ctx->ins, "deleting uncommitted new config: %s", new_config_path);
if (calyptia_config_delete_by_ref(ctx, "new") == FLB_FALSE) {
flb_plg_error(ctx->ins, "unable to delete uncommitted new config");
flb_sds_destroy(new_config_path);
in_calyptia_fleet_destroy(ctx);
return -1;
}
flb_sds_destroy(new_config_path);
} else if (new_config_path != NULL) {
flb_sds_destroy(new_config_path);
}

/* if we load a new configuration then we will be reloaded anyways */
if (load_fleet_config(ctx) == FLB_TRUE) {
return 0;
}

if (is_fleet_config(ctx, config)) {
calyptia_config_commit(ctx);
}

/* Set our collector based on time */
ret = flb_input_set_collector_time(in,
in_calyptia_fleet_collect,
Expand Down
1 change: 1 addition & 0 deletions src/flb_config.c
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,7 @@ struct flb_config *flb_config_init()
config->hot_reloaded_count = 0;
config->shutdown_by_hot_reloading = FLB_FALSE;
config->hot_reloading = FLB_FALSE;
config->hot_reload_succeeded = FLB_FALSE;

#ifdef FLB_SYSTEM_WINDOWS
config->win_maxstdio = 512;
Expand Down
1 change: 1 addition & 0 deletions src/flb_reload.c
Original file line number Diff line number Diff line change
Expand Up @@ -549,6 +549,7 @@ int flb_reload(flb_ctx_t *ctx, struct flb_cf *cf_opts)
new_config->hot_reloaded_count = reloaded_count;
flb_debug("[reload] hot reloaded %d time(s)", reloaded_count);
new_config->hot_reloading = FLB_FALSE;
new_config->hot_reload_succeeded = FLB_TRUE;

return 0;
}
Loading