Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
112 changes: 99 additions & 13 deletions plugins/in_calyptia_fleet/in_calyptia_fleet.c
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
#include <time.h>
#include <signal.h>
#include <sys/stat.h>
#include <errno.h>
#include <limits.h>

#include <msgpack.h>
#include <fluent-bit/flb_input.h>
Expand Down Expand Up @@ -64,6 +66,7 @@ static int fleet_cur_chdir(struct flb_in_calyptia_fleet_config *ctx);
static int get_calyptia_files(struct flb_in_calyptia_fleet_config *ctx,
time_t timestamp);
static void in_calyptia_fleet_destroy(struct flb_in_calyptia_fleet_config *ctx);
static struct cfl_array *read_glob(const char *path);

#ifndef FLB_SYSTEM_WINDOWS

Expand Down Expand Up @@ -295,7 +298,6 @@ static int is_old_fleet_config(struct flb_in_calyptia_fleet_config *ctx, struct
flb_sds_t cfgcurname;
int ret = FLB_FALSE;


if (cfg == NULL) {
return FLB_FALSE;
}
Expand All @@ -319,40 +321,44 @@ static int is_old_fleet_config(struct flb_in_calyptia_fleet_config *ctx, struct
return ret;
}

static int is_timestamped_fleet_config_path(struct flb_in_calyptia_fleet_config *ctx, const char *path)
/**
* Returns the timestamp of the fleet config file if it is a timestamped file,
* or 0 if it is not a timestamped file.
*/
static time_t fleet_config_path_timestamp(struct flb_in_calyptia_fleet_config *ctx, const char *path)
{
char *fname;
char *end;
long val;
long long val;

if (path == NULL || ctx == NULL) {
return FLB_FALSE;
return 0;
}

fname = strrchr(path, PATH_SEPARATOR[0]);

if (fname == NULL) {
return FLB_FALSE;
return 0;
}

fname++;

errno = 0;
val = strtol(fname, &end, 10);
val = strtoll(fname, &end, 10);
if ((errno == ERANGE && (val == LONG_MAX || val == LONG_MIN)) || (errno != 0 && val == 0)) {
return FLB_FALSE;
return 0;
}

if (ctx->fleet_config_legacy_format) {
if (strcmp(end, ".conf") == 0) {
return FLB_TRUE;
return (time_t)val;
}
}
else if (strcmp(end, ".yaml") == 0) {
return FLB_TRUE;
return (time_t)val;
}

return FLB_FALSE;
return 0;
}

static int is_timestamped_fleet_config(struct flb_in_calyptia_fleet_config *ctx, struct flb_config *cfg)
Expand All @@ -365,7 +371,7 @@ static int is_timestamped_fleet_config(struct flb_in_calyptia_fleet_config *ctx,
return FLB_FALSE;
}

return is_timestamped_fleet_config_path(ctx, cfg->conf_path_file);
return fleet_config_path_timestamp(ctx, cfg->conf_path_file) > 0;
}

static int is_fleet_config(struct flb_in_calyptia_fleet_config *ctx, struct flb_config *cfg)
Expand Down Expand Up @@ -965,6 +971,74 @@ static int get_calyptia_fleet_id_by_name(struct flb_in_calyptia_fleet_config *ct
return 0;
}

/**
* Checks for existing config files having timestamps at or after new_timestamp.
* @param ctx Fleet configuration context
* @param new_timestamp Timestamp to compare against existing files
* @return FLB_TRUE if new_timestamp is newer than all existing files, FLB_FALSE otherwise.
*/
static int check_timestamp_is_newer(struct flb_in_calyptia_fleet_config *ctx, time_t new_timestamp)
{
flb_sds_t base_dir = NULL;
flb_sds_t glob_pattern = NULL;
struct cfl_array *files = NULL;
size_t idx;
int ret;
time_t file_timestamp;
const char *file_extension;

if (generate_base_fleet_directory(ctx, &base_dir) == NULL) {
return FLB_FALSE;
}

/* Create glob pattern to match all timestamped files */
glob_pattern = flb_sds_create_size(0);
if (glob_pattern == NULL) {
flb_sds_destroy(base_dir);
return FLB_FALSE;
}

file_extension = ctx->fleet_config_legacy_format ? "*.conf" : "*.yaml";
if (flb_sds_printf(&glob_pattern, "%s" PATH_SEPARATOR "%s", base_dir, file_extension) == NULL) {
flb_sds_destroy(glob_pattern);
flb_sds_destroy(base_dir);
return FLB_FALSE;
}

files = read_glob(glob_pattern);
if (files == NULL) {
/* No existing files found - could be empty directory or glob failure */
flb_plg_debug(ctx->ins, "no existing config files found in %s", base_dir);
flb_sds_destroy(base_dir);
flb_sds_destroy(glob_pattern);
return FLB_TRUE;
}

/* Check each existing file's timestamp */
ret = FLB_TRUE;
for (idx = 0; idx < files->entry_count; idx++) {
file_timestamp = fleet_config_path_timestamp(ctx, files->entries[idx]->data.as_string);
if (file_timestamp == 0) {
continue;
}

/* Check if existing file timestamp is greater than or equal to new timestamp */
if (file_timestamp >= new_timestamp) {
flb_plg_debug(ctx->ins,
"existing file with timestamp %ld >= new timestamp %ld",
(long long)file_timestamp, (long long)new_timestamp);
ret = FLB_FALSE;
break;
}
}

cfl_array_destroy(files);
flb_sds_destroy(base_dir);
flb_sds_destroy(glob_pattern);

return ret;
}

static int get_calyptia_file(struct flb_in_calyptia_fleet_config *ctx,
flb_sds_t url,
const char *hdr,
Expand Down Expand Up @@ -1002,6 +1076,18 @@ static int get_calyptia_file(struct flb_in_calyptia_fleet_config *ctx,
flb_strptime(fbit_last_modified, "%a, %d %B %Y %H:%M:%S GMT", &tm_last_modified);
last_modified = mktime(&tm_last_modified.tm);

/* Check if there are existing files with timestamps >= new timestamp */
if (check_timestamp_is_newer(ctx, last_modified) == FLB_FALSE) {
flb_plg_debug(ctx->ins, "not creating file with timestamp %lld since it is not newer than existing files",
(long long)last_modified);
ret = -1;
goto client_error;
}
else {
flb_plg_info(ctx->ins, "creating config file with timestamp %lld",
(long long)last_modified);
}

fname = time_fleet_config_filename(ctx, last_modified);
}
else {
Expand Down Expand Up @@ -1355,7 +1441,7 @@ static int calyptia_config_delete_old(struct flb_in_calyptia_fleet_config *ctx)
}

for (idx = 0; idx < confs->entry_count; idx++) {
if (is_timestamped_fleet_config_path(ctx, confs->entries[idx]->data.as_string) == FLB_TRUE) {
if (fleet_config_path_timestamp(ctx, confs->entries[idx]->data.as_string) > 0) {
cfl_array_append_string(tconfs, confs->entries[idx]->data.as_string);
}
}
Expand Down Expand Up @@ -1421,7 +1507,7 @@ static flb_sds_t calyptia_config_get_newest(struct flb_in_calyptia_fleet_config

for (idx = inis->entry_count-1; idx >= 0; idx--) {
curconf = inis->entries[idx]->data.as_string;
if (is_timestamped_fleet_config_path(ctx, curconf)) {
if (fleet_config_path_timestamp(ctx, curconf) > 0) {
cfgnewname = flb_sds_create(curconf);
break;
}
Expand Down
Loading