From 2012df4b3bb63871a7abfb4f4633f7a769e32f00 Mon Sep 17 00:00:00 2001 From: majimenez-stratio Date: Mon, 1 Nov 2021 02:44:45 +0100 Subject: [PATCH] Always read from head for new/rotated files after startup Signed-off-by: majimenez-stratio --- lib/fluent/plugin/in_tail.rb | 19 ++++++++++--------- test/plugin/test_in_tail.rb | 14 +++++++++++--- 2 files changed, 21 insertions(+), 12 deletions(-) diff --git a/lib/fluent/plugin/in_tail.rb b/lib/fluent/plugin/in_tail.rb index 0ee8bab77f..f0437e4457 100644 --- a/lib/fluent/plugin/in_tail.rb +++ b/lib/fluent/plugin/in_tail.rb @@ -250,7 +250,7 @@ def start end end - refresh_watchers unless @skip_refresh_on_startup + refresh_watchers(true) unless @skip_refresh_on_startup timer_execute(:in_tail_refresh_watchers, @refresh_interval, &method(:refresh_watchers)) end @@ -366,7 +366,7 @@ def existence_path # It will cause log duplication after updated watch files. # In such case, you should separate log directory and specify two paths in path parameter. # e.g. path /path/to/dir/*,/path/to/rotated_logs/target_file - def refresh_watchers + def refresh_watchers(startup = false) target_paths_hash = expand_paths existence_paths_hash = existence_path @@ -376,12 +376,13 @@ def refresh_watchers added_hash = target_paths_hash.reject {|key, value| existence_paths_hash.key?(key)} stop_watchers(unwatched_hash, immediate: false, unwatched: true) unless unwatched_hash.empty? - start_watchers(added_hash) unless added_hash.empty? + start_watchers(added_hash, startup) unless added_hash.empty? end - def setup_watcher(target_info, pe) + def setup_watcher(target_info, pe, startup = false) line_buffer_timer_flusher = @multiline_mode ? TailWatcher::LineBufferTimerFlusher.new(log, @multiline_flush_interval, &method(:flush_buffer)) : nil - tw = TailWatcher.new(target_info, pe, log, @read_from_head, @follow_inodes, method(:update_watcher), line_buffer_timer_flusher, method(:io_handler), @metrics) + read_from_head = startup ? @read_from_head : true + tw = TailWatcher.new(target_info, pe, log, read_from_head, @follow_inodes, method(:update_watcher), line_buffer_timer_flusher, method(:io_handler), @metrics) if @enable_watch_timer tt = TimerTrigger.new(1, log) { tw.on_notify } @@ -410,7 +411,7 @@ def setup_watcher(target_info, pe) raise e end - def construct_watcher(target_info) + def construct_watcher(target_info, startup) pe = nil if @pf pe = @pf[target_info] @@ -424,7 +425,7 @@ def construct_watcher(target_info) end begin - tw = setup_watcher(target_info, pe) + tw = setup_watcher(target_info, pe, startup) rescue WatcherSetupError => e log.warn "Skip #{target_info.path} because unexpected setup error happens: #{e}" return @@ -443,9 +444,9 @@ def construct_watcher(target_info) end end - def start_watchers(targets_info) + def start_watchers(targets_info, startup) targets_info.each_value {|target_info| - construct_watcher(target_info) + construct_watcher(target_info, startup) break if before_shutdown? } end diff --git a/test/plugin/test_in_tail.rb b/test/plugin/test_in_tail.rb index 89296da69a..8a2584a100 100644 --- a/test/plugin/test_in_tail.rb +++ b/test/plugin/test_in_tail.rb @@ -516,13 +516,21 @@ def test_shorter_than_rotate_wait rotated = false detached = false d = create_driver(config) + mock.proxy(d.instance).setup_watcher(anything, anything, anything) do |tw| + mock.proxy(tw).detach(anything) do |v| + detached = true + v + end + tw + end.once + mock.proxy(d.instance).setup_watcher(anything, anything) do |tw| mock.proxy(tw).detach(anything) do |v| detached = true v end tw - end.twice + end.once d.run(timeout: 10) do until detached do @@ -2188,7 +2196,7 @@ def test_ENOENT_error_after_setup_watcher 'format' => 'none', }) d = create_driver(config) - mock.proxy(d.instance).setup_watcher(anything, anything) do |tw| + mock.proxy(d.instance).setup_watcher(anything, anything, anything) do |tw| cleanup_file(path) tw end @@ -2213,7 +2221,7 @@ def test_EACCES_error_after_setup_watcher 'format' => 'none', }) d = create_driver(config, false) - mock.proxy(d.instance).setup_watcher(anything, anything) do |tw| + mock.proxy(d.instance).setup_watcher(anything, anything, anything) do |tw| FileUtils.chmod(0000, "#{TMP_DIR}/noaccess") tw end