diff --git a/lib/fluent/plugin/in_tail.rb b/lib/fluent/plugin/in_tail.rb index cdba27ed72..c4e9eaa287 100644 --- a/lib/fluent/plugin/in_tail.rb +++ b/lib/fluent/plugin/in_tail.rb @@ -113,6 +113,8 @@ def initialize config_param :path_timezone, :string, default: nil desc 'Follow inodes instead of following file names. Guarantees more stable delivery and allows to use * in path pattern with rotating files' config_param :follow_inodes, :bool, default: false + desc 'Skip long lines which size is longer than specified length' + config_param :skip_long_lines, :size, default: nil config_section :parse, required: false, multi: true, init: true, param_name: :parser_configs do config_argument :usage, :string, default: 'in_tail_parser' @@ -594,6 +596,12 @@ def flush_buffer(tw, buf) # @return true if no error or unrecoverable error happens in emit action. false if got BufferOverflowError def receive_lines(lines, tail_watcher) + lines = lines.reject do |line| + skip_line = @skip_long_lines ? line.bytesize > @skip_long_lines : false + log.warn "received line length is longer than #{@skip_long_lines}" if skip_line + log.debug "skipped line: #{line.chomp}" if skip_line + skip_line + end es = @receive_handler.call(lines, tail_watcher) unless es.empty? tag = if @tag_prefix || @tag_suffix diff --git a/test/plugin/test_in_tail.rb b/test/plugin/test_in_tail.rb index 01dd4c5250..fe1ac78ff4 100644 --- a/test/plugin/test_in_tail.rb +++ b/test/plugin/test_in_tail.rb @@ -1707,6 +1707,41 @@ def test_tag_prefix_and_suffix_ignore mock(plugin.router).emit_stream('pre.foo.bar.log.post', anything).once plugin.receive_lines(['foo', 'bar'], DummyWatcher.new('foo.bar.log')) end + + data( + small: ["128", 128], + KiB: ["1k", 1024] + ) + test 'skip_long_lines' do |(label, size)| + config = config_element("", "", { + "tag" => "skip_long_lines", + "path" => "#{TMP_DIR}/with_long_lines.txt", + "format" => "none", + "read_from_head" => true, + "skip_long_lines" => label, + "log_level" => "debug" + }) + File.open("#{TMP_DIR}/with_long_lines.txt", "w+") do |f| + f.puts "foo" + f.puts "x" * size # 'x' * size + \n > @skip_long_lines + f.puts "bar" + end + d = create_driver(config, false) + timestamp = Time.parse("Mon Nov 29 11:22:33 UTC 2021") + Timecop.freeze(timestamp) + d.run(expect_records: 2) + assert_equal([ + [{"message" => "foo"},{"message" => "bar"}], + [ + "2021-11-29 11:22:33 +0000 [warn]: received line length is longer than #{size}\n", + "2021-11-29 11:22:33 +0000 [debug]: skipped line: #{'x' * size}\n" + ] + ], + [ + d.events.collect { |event| event.last }, + d.logs[-2..] + ]) + end end # Ensure that no fatal exception is raised when a file is missing and that