Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
100 changes: 100 additions & 0 deletions plugins/in_calyptia_fleet/in_calyptia_fleet.c
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ static int fleet_cur_chdir(struct flb_in_calyptia_fleet_config *ctx);
static int get_calyptia_files(struct flb_in_calyptia_fleet_config *ctx,
time_t timestamp);
static void in_calyptia_fleet_destroy(struct flb_in_calyptia_fleet_config *ctx);
static struct cfl_array *read_glob(const char *path);

#ifndef FLB_SYSTEM_WINDOWS

Expand Down Expand Up @@ -965,6 +966,93 @@ static int get_calyptia_fleet_id_by_name(struct flb_in_calyptia_fleet_config *ct
return 0;
}

/**
* Checks for existing config files having timestamps at or after new_timestamp.
* @param ctx Fleet configuration context
* @param new_timestamp Timestamp to compare against existing files
* @return FLB_TRUE if new_timestamp is newer than all existing files, FLB_FALSE otherwise.
*/
static int check_timestamp_is_newer(struct flb_in_calyptia_fleet_config *ctx, time_t new_timestamp)
{
flb_sds_t base_dir = NULL;
flb_sds_t glob_pattern = NULL;
struct cfl_array *files = NULL;
size_t idx;
int ret;
long file_timestamp;
char *ext;
char *fname;
char *path_copy = NULL;
const char *file_extension;

if (generate_base_fleet_directory(ctx, &base_dir) == NULL) {
return FLB_FALSE;
}

/* Create glob pattern to match all timestamped files */
glob_pattern = flb_sds_create_size(0);
if (glob_pattern == NULL) {
flb_sds_destroy(base_dir);
return FLB_FALSE;
}

file_extension = ctx->fleet_config_legacy_format ? "*.conf" : "*.yaml";
glob_pattern = flb_sds_printf(&glob_pattern, "%s" PATH_SEPARATOR "%s", base_dir, file_extension);

files = read_glob(glob_pattern);
if (files == NULL) {
/* No existing files found - could be empty directory or glob failure */
flb_plg_debug(ctx->ins, "no existing config files found in %s", base_dir);
flb_sds_destroy(base_dir);
flb_sds_destroy(glob_pattern);
return FLB_TRUE;
}

/* Check each existing file's timestamp */
ret = FLB_TRUE;
for (idx = 0; idx < files->entry_count; idx++) {
/* Create a copy to avoid basename() modifying the original path */
path_copy = flb_strdup(files->entries[idx]->data.as_string);
if (path_copy == NULL) {
ret = FLB_FALSE;
break;
}

fname = basename(path_copy);
if (fname == NULL) {
flb_free(path_copy);
ret = FLB_FALSE;
break;
}

errno = 0;
ext = NULL;
file_timestamp = strtol(fname, &ext, 10);
flb_free(path_copy);
fname = NULL;
if ((errno == ERANGE && (file_timestamp == LONG_MAX || file_timestamp == LONG_MIN)) ||
(errno != 0 && file_timestamp == 0)) {
/* Skip files that don't have valid timestamps */
continue;
}

/* Check if existing file timestamp is greater than or equal to new timestamp */
if (file_timestamp >= (long)new_timestamp) {
flb_plg_debug(ctx->ins,
"existing file with timestamp %ld >= new timestamp %ld",
file_timestamp, (long)new_timestamp);
ret = FLB_FALSE;
break;
}
}

cfl_array_destroy(files);
flb_sds_destroy(base_dir);
flb_sds_destroy(glob_pattern);

return ret;
}

static int get_calyptia_file(struct flb_in_calyptia_fleet_config *ctx,
flb_sds_t url,
const char *hdr,
Expand Down Expand Up @@ -1002,6 +1090,18 @@ static int get_calyptia_file(struct flb_in_calyptia_fleet_config *ctx,
flb_strptime(fbit_last_modified, "%a, %d %B %Y %H:%M:%S GMT", &tm_last_modified);
last_modified = mktime(&tm_last_modified.tm);

/* Check if there are existing files with timestamps >= new timestamp */
if (check_timestamp_is_newer(ctx, last_modified) == FLB_FALSE) {
flb_plg_debug(ctx->ins, "not creating file with timestamp %ld since it is not newer than existing files",
(long)last_modified);
ret = -1;
goto client_error;
}
else {
flb_plg_info(ctx->ins, "creating config file with timestamp %ld",
(long)last_modified);
}

fname = time_fleet_config_filename(ctx, last_modified);
}
else {
Expand Down