diff --git a/include/fluent-bit/flb_config.h b/include/fluent-bit/flb_config.h index 7d255e3769b..89f837a66cb 100644 --- a/include/fluent-bit/flb_config.h +++ b/include/fluent-bit/flb_config.h @@ -287,6 +287,8 @@ struct flb_config { int shutdown_by_hot_reloading; int hot_reloading; int hot_reload_succeeded; + + int hot_reload_watchdog_timeout_seconds; /* Routing */ size_t route_mask_size; @@ -381,6 +383,7 @@ enum conf_type { #define FLB_CONF_STR_HOT_RELOAD "Hot_Reload" #define FLB_CONF_STR_HOT_RELOAD_ENSURE_THREAD_SAFETY "Hot_Reload.Ensure_Thread_Safety" +#define FLB_CONF_STR_HOT_RELOAD_TIMEOUT "Hot_Reload.Timeout" /* Set up maxstdio (Windows) */ #define FLB_CONF_STR_WINDOWS_MAX_STDIO "windows.maxstdio" diff --git a/plugins/in_dummy/in_dummy.c b/plugins/in_dummy/in_dummy.c index 6d008b4cb2c..ea7a23d40a6 100644 --- a/plugins/in_dummy/in_dummy.c +++ b/plugins/in_dummy/in_dummy.c @@ -30,6 +30,7 @@ #include #include #include +#include #include "in_dummy.h" @@ -332,6 +333,7 @@ static int in_dummy_init(struct flb_input_instance *in, ctx->ins = in; ctx->samples = 0; ctx->samples_count = 0; + ctx->test_hang_on_exit = FLB_FALSE; /* Initialize head config */ ret = configure(ctx, in, &tm); @@ -391,6 +393,13 @@ static int in_dummy_exit(void *data, struct flb_config *config) (void) *config; struct flb_dummy *ctx = data; + /* Test-only hang used by watchdog tests */ + if (ctx->test_hang_on_exit) { + flb_plg_debug(ctx->ins, "TEST: Simulating hang for hot reload watchdog test"); + /* 1000 seconds */ + flb_time_msleep(1000 * 1000); + } + config_destroy(ctx); return 0; @@ -453,6 +462,11 @@ static struct flb_config_map config_map[] = { 0, FLB_TRUE, offsetof(struct flb_dummy, flush_on_startup), "generate the first event on startup" }, + { + FLB_CONFIG_MAP_BOOL, "test_hang_on_exit", "false", + 0, FLB_TRUE, offsetof(struct flb_dummy, test_hang_on_exit), + "TEST ONLY: simulate hang during exit to test hot reload watchdog" + }, {0} }; diff --git a/plugins/in_dummy/in_dummy.h b/plugins/in_dummy/in_dummy.h index f225fcb4eeb..1f9cb5aef27 100644 --- a/plugins/in_dummy/in_dummy.h +++ b/plugins/in_dummy/in_dummy.h @@ -49,6 +49,7 @@ struct flb_dummy { int fixed_timestamp; int flush_on_startup; + int test_hang_on_exit; /* TEST ONLY: Used for hot reload watchdog testing */ char *ref_metadata_msgpack; size_t ref_metadata_msgpack_size; diff --git a/src/flb_config.c b/src/flb_config.c index 77872a1eba4..c4622db6a16 100644 --- a/src/flb_config.c +++ b/src/flb_config.c @@ -205,6 +205,10 @@ struct flb_service_config service_configs[] = { FLB_CONF_TYPE_BOOL, offsetof(struct flb_config, ensure_thread_safety_on_hot_reloading)}, + {FLB_CONF_STR_HOT_RELOAD_TIMEOUT, + FLB_CONF_TYPE_INT, + offsetof(struct flb_config, hot_reload_watchdog_timeout_seconds)}, + {NULL, FLB_CONF_TYPE_OTHER, 0} /* end of array */ }; @@ -308,6 +312,7 @@ struct flb_config *flb_config_init() config->shutdown_by_hot_reloading = FLB_FALSE; config->hot_reloading = FLB_FALSE; config->hot_reload_succeeded = FLB_FALSE; + config->hot_reload_watchdog_timeout_seconds = 0; #ifdef FLB_SYSTEM_WINDOWS config->win_maxstdio = 512; diff --git a/src/flb_reload.c b/src/flb_reload.c index c82dae71daa..17a9ed82fbd 100644 --- a/src/flb_reload.c +++ b/src/flb_reload.c @@ -32,12 +32,16 @@ #include #include #include +#include #include #include #include #include +#include +#include + static int flb_input_propery_check_all(struct flb_config *config) { int ret; @@ -376,6 +380,66 @@ static int flb_reload_reinstantiate_external_plugins(struct flb_config *src, str return 0; } +struct flb_reload_watchdog_ctx { + pthread_t tid; + int timeout_seconds; +}; + +static void *hot_reload_watchdog_thread(void *arg) +{ + struct flb_reload_watchdog_ctx *ctx = (struct flb_reload_watchdog_ctx *)arg; + + /* Set async cancellation type for (mostly) immediate response to pthread_cancel */ + pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, NULL); + + flb_time_msleep(ctx->timeout_seconds * 1000); + + flb_error("[hot_reload_watchdog] Hot reload timeout exceeded (%d seconds), " + "aborting to prevent indefinite hang", ctx->timeout_seconds); + abort(); +} + +static struct flb_reload_watchdog_ctx *flb_reload_watchdog_start(struct flb_config *config) +{ + struct flb_reload_watchdog_ctx *watchdog_ctx; + int ret; + + if (config->hot_reload_watchdog_timeout_seconds <= 0) { + flb_debug("[reload] Hot reload watchdog disabled"); + return NULL; + } + + watchdog_ctx = flb_malloc(sizeof(struct flb_reload_watchdog_ctx)); + if (!watchdog_ctx) { + flb_errno(); + return NULL; + } + watchdog_ctx->timeout_seconds = config->hot_reload_watchdog_timeout_seconds; + + ret = pthread_create(&watchdog_ctx->tid, NULL, hot_reload_watchdog_thread, watchdog_ctx); + if (ret != 0) { + flb_error("[reload] Failed to create hot reload watchdog thread: %d", ret); + flb_free(watchdog_ctx); + return NULL; + } + + flb_debug("[reload] Hot reload watchdog thread started"); + return watchdog_ctx; +} + +static void flb_reload_watchdog_cleanup(struct flb_reload_watchdog_ctx *watchdog_ctx) +{ + if (!watchdog_ctx) { + return; + } + + pthread_cancel(watchdog_ctx->tid); + pthread_join(watchdog_ctx->tid, NULL); + flb_debug("[reload] Hot reload watchdog thread cancelled"); + + flb_free(watchdog_ctx); +} + int flb_reload(flb_ctx_t *ctx, struct flb_cf *cf_opts) { int ret; @@ -387,6 +451,7 @@ int flb_reload(flb_ctx_t *ctx, struct flb_cf *cf_opts) struct flb_cf *original_cf; int verbose; int reloaded_count = 0; + struct flb_reload_watchdog_ctx *watchdog_ctx = NULL; if (ctx == NULL) { flb_error("[reload] given flb context is NULL"); @@ -417,6 +482,9 @@ int flb_reload(flb_ctx_t *ctx, struct flb_cf *cf_opts) (long unsigned int) getpid(), (void *) pthread_self()); + /* Start the watchdog thread */ + watchdog_ctx = flb_reload_watchdog_start(old_config); + if (old_config->conf_path_file) { file = flb_sds_create(old_config->conf_path_file); } @@ -427,6 +495,7 @@ int flb_reload(flb_ctx_t *ctx, struct flb_cf *cf_opts) } flb_cf_destroy(new_cf); flb_error("[reload] reconstruct cf failed"); + flb_reload_watchdog_cleanup(watchdog_ctx); return FLB_RELOAD_HALTED; } } @@ -439,7 +508,7 @@ int flb_reload(flb_ctx_t *ctx, struct flb_cf *cf_opts) } flb_cf_destroy(new_cf); flb_error("[reload] creating flb context is failed. Reloading is halted"); - + flb_reload_watchdog_cleanup(watchdog_ctx); return FLB_RELOAD_HALTED; } @@ -469,7 +538,7 @@ int flb_reload(flb_ctx_t *ctx, struct flb_cf *cf_opts) if (!new_cf) { flb_sds_destroy(file); old_config->hot_reloading = FLB_FALSE; - + flb_reload_watchdog_cleanup(watchdog_ctx); return FLB_RELOAD_HALTED; } } @@ -485,7 +554,7 @@ int flb_reload(flb_ctx_t *ctx, struct flb_cf *cf_opts) flb_destroy(new_ctx); old_config->hot_reloading = FLB_FALSE; flb_error("[reload] reloaded config is invalid. Reloading is halted"); - + flb_reload_watchdog_cleanup(watchdog_ctx); return FLB_RELOAD_HALTED; } } @@ -499,7 +568,7 @@ int flb_reload(flb_ctx_t *ctx, struct flb_cf *cf_opts) old_config->hot_reloading = FLB_FALSE; flb_error("[reload] reloaded config format is invalid. Reloading is halted"); - + flb_reload_watchdog_cleanup(watchdog_ctx); return FLB_RELOAD_HALTED; } @@ -512,7 +581,7 @@ int flb_reload(flb_ctx_t *ctx, struct flb_cf *cf_opts) old_config->hot_reloading = FLB_FALSE; flb_error("[reload] reloaded config is invalid. Reloading is halted"); - + flb_reload_watchdog_cleanup(watchdog_ctx); return FLB_RELOAD_HALTED; } @@ -541,7 +610,7 @@ int flb_reload(flb_ctx_t *ctx, struct flb_cf *cf_opts) old_config->hot_reloading = FLB_FALSE; flb_error("[reload] loaded configuration contains error(s). Reloading is aborted"); - + flb_reload_watchdog_cleanup(watchdog_ctx); return FLB_RELOAD_ABORTED; } @@ -550,6 +619,9 @@ int flb_reload(flb_ctx_t *ctx, struct flb_cf *cf_opts) flb_debug("[reload] hot reloaded %d time(s)", reloaded_count); new_config->hot_reloading = FLB_FALSE; new_config->hot_reload_succeeded = FLB_TRUE; + + /* Cancel the watchdog thread since reload completed successfully */ + flb_reload_watchdog_cleanup(watchdog_ctx); return 0; } diff --git a/tests/internal/reload.c b/tests/internal/reload.c index e6a695d5102..c4d258c00ee 100644 --- a/tests/internal/reload.c +++ b/tests/internal/reload.c @@ -11,6 +11,12 @@ #include #include +#ifndef FLB_SYSTEM_WINDOWS +#include +#include +#include +#endif + #include "flb_tests_internal.h" #define FLB_YAML FLB_TESTS_DATA_PATH "/data/reload/yaml/processor.yaml" @@ -241,9 +247,153 @@ void test_reload_yaml() flb_destroy(ctx); } +/* Test hot reload watchdog timeout functionality */ +#ifndef FLB_SYSTEM_WINDOWS +void test_reload_watchdog_timeout() +{ + struct flb_cf *cf = NULL; + struct flb_cf *cf_opts; + struct flb_cf_section *section; + struct cfl_variant *ret; + flb_ctx_t *ctx; + int status; + pid_t pid; + int wstatus; + + /* Fork a child process to test the watchdog timeout */ + pid = fork(); + + if (pid == 0) { + /* Child process - this will trigger the watchdog and abort */ + + /* create context */ + cf_opts = flb_cf_create(); + if (!cf_opts) { + exit(1); + } + + /* add a valid section (input) with hang flag */ + section = flb_cf_section_create(cf_opts, "INPUT", 5); + if (!section) { + exit(1); + } + + /* add property to create dummy input */ + ret = flb_cf_section_property_add(cf_opts, section->properties, "name", 0, "dummy", 0); + if (!ret) { + exit(1); + } + + /* IMPORTANT: Enable the test hang flag */ + ret = flb_cf_section_property_add(cf_opts, section->properties, + "test_hang_on_exit", 0, "true", 0); + if (!ret) { + exit(1); + } + + /* Set a very fast collection rate (10 times per second) */ + ret = flb_cf_section_property_add(cf_opts, section->properties, + "rate", 0, "10", 0); + if (!ret) { + exit(1); + } + + /* Add an output to ensure the engine runs */ + section = flb_cf_section_create(cf_opts, "OUTPUT", 6); + if (!section) { + exit(1); + } + + ret = flb_cf_section_property_add(cf_opts, section->properties, "name", 0, "null", 0); + if (!ret) { + exit(1); + } + + ctx = flb_create(); + if (!ctx) { + exit(1); + } + + cf = ctx->config->cf_main; + + ctx->config->enable_hot_reload = FLB_TRUE; + ctx->config->hot_reload_watchdog_timeout_seconds = 2; /* Short timeout for testing */ + + status = flb_reload_reconstruct_cf(cf_opts, cf); + if (status != 0) { + exit(1); + } + + status = flb_config_load_config_format(ctx->config, cf); + if (status != 0) { + exit(1); + } + + /* Start the engine */ + status = flb_start(ctx); + if (status != 0) { + exit(1); + } + + /* Give the engine time to start and begin collecting */ + sleep(2); + + /* Trigger hot reload - this should hang in dummy collect and trigger watchdog */ + flb_info("[TEST] Triggering hot reload with hanging dummy input..."); + status = flb_reload(ctx, cf_opts); + + /* We should never reach here - watchdog should abort */ + flb_error("[TEST] ERROR: Hot reload completed without triggering watchdog!"); + exit(2); + } + else if (pid > 0) { + /* Parent process - wait for child and check it was aborted */ + alarm(10); /* Set a 10 second timeout for the test itself */ + + if (waitpid(pid, &wstatus, 0) == -1) { + TEST_CHECK(0); + TEST_MSG("waitpid failed"); + return; + } + + alarm(0); /* Cancel the alarm */ + + /* Check that the child was terminated by SIGABRT */ + if (WIFSIGNALED(wstatus)) { + int sig = WTERMSIG(wstatus); + flb_info("[TEST] Child process terminated by signal %d", sig); + + /* We expect SIGABRT (usually signal 6) from abort() */ + TEST_CHECK(sig == SIGABRT); + TEST_MSG("Expected SIGABRT (%d), got signal %d", SIGABRT, sig); + } + else if (WIFEXITED(wstatus)) { + int exit_code = WEXITSTATUS(wstatus); + flb_error("[TEST] Child process exited normally with code %d", exit_code); + TEST_CHECK(0); + TEST_MSG("Process should have been aborted by watchdog, not exit normally"); + } + else { + TEST_CHECK(0); + TEST_MSG("Unexpected child termination status"); + } + } + else { + TEST_CHECK(0); + TEST_MSG("fork() failed"); + } +} +#else +void test_reload_watchdog_timeout() +{ + TEST_MSG("skipped on Windows"); +} +#endif + TEST_LIST = { { "reconstruct_cf" , test_reconstruct_cf}, { "reload" , test_reload}, { "reload_yaml" , test_reload_yaml}, + { "reload_watchdog_timeout", test_reload_watchdog_timeout}, { 0 } };