diff --git a/src/flb_engine.c b/src/flb_engine.c index ea3d069ad9b..d7fcd7a6223 100644 --- a/src/flb_engine.c +++ b/src/flb_engine.c @@ -134,9 +134,9 @@ void flb_engine_reschedule_retries(struct flb_config *config) task = mk_list_entry(t_head, struct flb_task, _head); if (task->users > 0) { - flb_debug("[engine] retry=%p for task %i already scheduled to run, " - "not re-scheduling it.", - retry, task->id); + flb_debug("[engine] task %i already scheduled to run, not re-scheduling it.", + task->id + ); continue; } @@ -581,7 +581,7 @@ static FLB_INLINE int flb_engine_handle_event(flb_pipefd_t fd, int mask, return 0; } else if (config->shutdown_fd == fd) { - flb_utils_pipe_byte_consume(fd); + flb_utils_timer_consume(fd); return FLB_ENGINE_SHUTDOWN; } else if (config->ch_manager[0] == fd) { @@ -716,7 +716,6 @@ int flb_engine_start(struct flb_config *config) struct flb_sched *sched; struct flb_net_dns dns_ctx; struct flb_notification *notification; - int exiting = FLB_FALSE; /* Initialize the networking layer */ flb_net_lib_init(); @@ -1001,6 +1000,12 @@ int flb_engine_start(struct flb_config *config) flb_event_priority_live_foreach(event, evl_bktq, evl, FLB_ENGINE_LOOP_MAX_ITER) { if (event->type == FLB_ENGINE_EV_CORE) { ret = flb_engine_handle_event(event->fd, event->mask, config); + + /* + * This block will be called once on engine stop. + * Will reschedule task to 1 sec. retry. + * Also timer with shutdown event will be created. + */ if (ret == FLB_ENGINE_STOP) { if (config->grace_count == 0) { if (config->grace >= 0) { @@ -1015,11 +1020,7 @@ int flb_engine_start(struct flb_config *config) } /* mark the runtime as the ingestion is not active and that we are in shutting down mode */ - config->is_ingestion_active = FLB_FALSE; - config->is_shutting_down = FLB_TRUE; - - /* pause all input plugin instances */ - flb_input_pause_all(config); + flb_engine_stop_ingestion(config); /* * We are preparing to shutdown, we give a graceful time @@ -1028,6 +1029,7 @@ int flb_engine_start(struct flb_config *config) event = &config->event_shutdown; event->mask = MK_EVENT_EMPTY; event->status = MK_EVENT_NONE; + event->priority = FLB_ENGINE_PRIORITY_SHUTDOWN; /* * Configure a timer of 1 second, on expiration the code will @@ -1038,11 +1040,18 @@ int flb_engine_start(struct flb_config *config) * If no tasks exists, there is no need to wait for the maximum * grace period. */ - config->shutdown_fd = mk_event_timeout_create(evl, - 1, - 0, - event); - event->priority = FLB_ENGINE_PRIORITY_SHUTDOWN; + if (config->shutdown_fd <= 0) { + config->shutdown_fd = mk_event_timeout_create(evl, + 1, + 0, + event); + + if (config->shutdown_fd == -1) { + flb_error("[engine] could not create shutdown timer"); + /* fail early so we don't silently skip scheduled shutdown */ + return -1; + } + } } else if (ret == FLB_ENGINE_SHUTDOWN) { /* Increase the grace counter */ @@ -1062,10 +1071,19 @@ int flb_engine_start(struct flb_config *config) fs_chunks = 0; tasks = flb_task_running_count(config); flb_storage_chunk_count(config, &mem_chunks, &fs_chunks); + + if ((mem_chunks + fs_chunks) > 0) { + flb_info("[engine] pending chunk count: memory=%d, filesystem=%d; grace_timer=%d", + mem_chunks, fs_chunks, config->grace_count); + } + + if (tasks > 0) { + flb_task_running_print(config); + } + ret = tasks + mem_chunks + fs_chunks; if (ret > 0 && (config->grace_count < config->grace || config->grace == -1)) { if (config->grace_count == 1) { - flb_task_running_print(config); /* * If storage.backlog.shutdown_flush is enabled, attempt to flush pending * filesystem chunks during shutdown. This is particularly useful in scenarios @@ -1079,32 +1097,10 @@ int flb_engine_start(struct flb_config *config) } } } - if ((mem_chunks + fs_chunks) > 0) { - flb_info("[engine] pending chunk count: memory=%d, filesystem=%d; grace_timer=%d", - mem_chunks, fs_chunks, config->grace_count); - } - /* Create new tasks for pending chunks */ flb_engine_flush(config, NULL); - if (config->grace_count < config->grace_input) { - if (exiting == FLB_FALSE) { - flb_engine_exit(config); - exiting = FLB_TRUE; - } - } else { - if (config->is_ingestion_active == FLB_TRUE) { - flb_engine_stop_ingestion(config); - } - } } else { - if (tasks > 0) { - flb_task_running_print(config); - } - if ((mem_chunks + fs_chunks) > 0) { - flb_info("[engine] pending chunk count: memory=%d, filesystem=%d; grace_timer=%d", - mem_chunks, fs_chunks, config->grace_count); - } flb_info("[engine] service has stopped (%i pending tasks)", tasks); ret = config->exit_status_code; @@ -1115,7 +1111,6 @@ int flb_engine_start(struct flb_config *config) &config->event_shutdown); } - config = NULL; return ret; } }