From 782b562f479836766a6218048cbf9d3a9ad6862f Mon Sep 17 00:00:00 2001 From: Yuta Iwama Date: Tue, 31 Mar 2020 16:32:02 +0900 Subject: [PATCH 1/3] check size first In test env, @file.size can change mtime Signed-off-by: Yuta Iwama --- lib/fluent/plugin/in_tail/position_file.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/fluent/plugin/in_tail/position_file.rb b/lib/fluent/plugin/in_tail/position_file.rb index 34c0e3adcb..2884f8e570 100644 --- a/lib/fluent/plugin/in_tail/position_file.rb +++ b/lib/fluent/plugin/in_tail/position_file.rb @@ -83,8 +83,8 @@ def try_compact size = nil @file_mutex.synchronize do - last_modified = @file.mtime size = @file.size + last_modified = @file.mtime end entries = fetch_compacted_entries From a750674f1ee9eff321cfdea66bbb434a39ba6a27 Mon Sep 17 00:00:00 2001 From: Yuta Iwama Date: Tue, 31 Mar 2020 16:41:28 +0900 Subject: [PATCH 2/3] fetch_compacted_entries returns Entry to handle value later Signed-off-by: Yuta Iwama --- lib/fluent/plugin/in_tail/position_file.rb | 17 ++++++++++++----- 1 file changed, 12 insertions(+), 5 deletions(-) diff --git a/lib/fluent/plugin/in_tail/position_file.rb b/lib/fluent/plugin/in_tail/position_file.rb index 2884f8e570..5c9d323255 100644 --- a/lib/fluent/plugin/in_tail/position_file.rb +++ b/lib/fluent/plugin/in_tail/position_file.rb @@ -21,7 +21,6 @@ class TailInput < Fluent::Plugin::Input class PositionFile UNWATCHED_POSITION = 0xffffffffffffffff POSITION_FILE_ENTRY_REGEX = /^([^\t]+)\t([0-9a-fA-F]+)\t([0-9a-fA-F]+)/.freeze - POSITION_FILE_ENTRY_FORMAT = "%s\t%016x\t%016x\n".freeze def self.load(file, logger:) pf = new(file, logger: logger) @@ -93,7 +92,7 @@ def try_compact if last_modified == @file.mtime && size == @file.size @file.pos = 0 @file.truncate(0) - @file.write(entries.join) + @file.write(entries.values.map(&:to_entry_fmt).join) else # skip end @@ -104,7 +103,7 @@ def try_compact def compact @file_mutex.synchronize do - entries = fetch_compacted_entries + entries = fetch_compacted_entries.values.map(&:to_entry_fmt) @file.pos = 0 @file.truncate(0) @@ -133,11 +132,19 @@ def fetch_compacted_entries @logger.warn("#{path} already exists. use latest one: deleted #{entries[path]}") if @logger end - entries[path] = (POSITION_FILE_ENTRY_FORMAT % [path, pos, ino]) + entries[path] = Entry.new(path, pos, ino) end end - entries.values + entries + end + end + + Entry = Struct.new(:path, :pos, :ino) do + POSITION_FILE_ENTRY_FORMAT = "%s\t%016x\t%016x\n".freeze + + def to_entry_fmt + POSITION_FILE_ENTRY_FORMAT % [path, pos, ino] end end From 529af8a2fc9b72b72045a6cc123c5514e21745fe Mon Sep 17 00:00:00 2001 From: Yuta Iwama Date: Tue, 31 Mar 2020 17:01:14 +0900 Subject: [PATCH 3/3] Update seek pos during compaction Signed-off-by: Yuta Iwama --- lib/fluent/plugin/in_tail/position_file.rb | 14 +++++++++++-- test/plugin/in_tail/test_position_file.rb | 24 ++++++++++++++++++++++ 2 files changed, 36 insertions(+), 2 deletions(-) diff --git a/lib/fluent/plugin/in_tail/position_file.rb b/lib/fluent/plugin/in_tail/position_file.rb index 5c9d323255..8fb6a28727 100644 --- a/lib/fluent/plugin/in_tail/position_file.rb +++ b/lib/fluent/plugin/in_tail/position_file.rb @@ -93,6 +93,12 @@ def try_compact @file.pos = 0 @file.truncate(0) @file.write(entries.values.map(&:to_entry_fmt).join) + + entries.each do |path, val| + if (m = @map[path]) + m.seek = val.seek + end + end else # skip end @@ -115,6 +121,7 @@ def fetch_compacted_entries entries = {} @file.pos = 0 + file_pos = 0 @file.each_line do |line| m = POSITION_FILE_ENTRY_REGEX.match(line) if m.nil? @@ -132,7 +139,8 @@ def fetch_compacted_entries @logger.warn("#{path} already exists. use latest one: deleted #{entries[path]}") if @logger end - entries[path] = Entry.new(path, pos, ino) + entries[path] = Entry.new(path, pos, ino, file_pos + path.size + 1) + file_pos += line.size end end @@ -140,7 +148,7 @@ def fetch_compacted_entries end end - Entry = Struct.new(:path, :pos, :ino) do + Entry = Struct.new(:path, :pos, :ino, :seek) do POSITION_FILE_ENTRY_FORMAT = "%s\t%016x\t%016x\n".freeze def to_entry_fmt @@ -165,6 +173,8 @@ def initialize(file, file_mutex, seek, pos, inode) @inode = inode end + attr_writer :seek + def update(ino, pos) @file_mutex.synchronize { @file.pos = @seek diff --git a/test/plugin/in_tail/test_position_file.rb b/test/plugin/in_tail/test_position_file.rb index 465b048403..69a8e969d2 100644 --- a/test/plugin/in_tail/test_position_file.rb +++ b/test/plugin/in_tail/test_position_file.rb @@ -77,6 +77,30 @@ def write_data(f, content) lines = @file.readlines assert_equal 5, lines.size end + + test 'update seek postion of remained position entry' do + pf = Fluent::Plugin::TailInput::PositionFile.new(@file, logger: $log) + pf['path1'] + pf['path2'] + pf['path3'] + pf.unwatch('path1') + + pf.try_compact + + @file.seek(0) + lines = @file.readlines + assert_equal "path2\t0000000000000000\t0000000000000000\n", lines[0] + assert_equal "path3\t0000000000000000\t0000000000000000\n", lines[1] + assert_equal 2, lines.size + + pf.unwatch('path2') + pf.unwatch('path3') + @file.seek(0) + lines = @file.readlines + assert_equal "path2\t#{UNWATCHED_STR}\t0000000000000000\n", lines[0] + assert_equal "path3\t#{UNWATCHED_STR}\t0000000000000000\n", lines[1] + assert_equal 2, lines.size + end end sub_test_case '#load' do