
Fluentd emits invalid msgpacked chunks to output when chunks over the size limit are consumed #2084

Closed
adammw opened this issue Jul 23, 2018 · 10 comments

@adammw
Contributor

adammw commented Jul 23, 2018

Fluentd emits invalid msgpacked chunks to output plugins when chunks are over the size limit set with chunk_size_limit.

We are seeing this in the out_kinesis plugin: awslabs/aws-fluent-plugin-kinesis#133. It's unclear whether this is expected behaviour and the plugin needs to handle chunk fragments (if that is even possible with the streaming msgpack parser), or whether it's a bug within fluentd, i.e. that it should only hand parsable chunks to output plugins.

Fluentd version: 1.2.3
Ruby version: 2.3.3
OS: Mac OS / Linux

/cc @repeatedly @tagomoris @riywo @gurunathj @grosser

Test Script

require 'fluent/plugin/output'
require 'fluent/test'
require 'fluent/test/driver/output'
require 'stringio'
require 'pry'

module Fluent
  module Plugin
    class TestOutput < BufferedOutput
      Fluent::Plugin.register_output('out_test', self)

      # needed to trigger @overrides_format_stream in handle_stream_simple
      include Fluent::SetTimeKeyMixin
      include Fluent::SetTagKeyMixin

      def configure(conf)
        @formatter = Fluent::Plugin.new_formatter("json")
        @formatter.configure(conf)
        super
      end

      # Each event becomes a msgpack array holding one JSON string
      def format(tag, time, record)
        [@formatter.format(tag, time, record).chomp.b].to_msgpack
      end

      def write(chunk)
        chunk.open do |io|
          data = io.read
          # Unpack the entire chunk; every record should be an Array
          records = Fluent::Engine.msgpack_unpacker(StringIO.new(data)).to_enum.to_a
          puts "data #{data.size / 1024} KB - #{records.size} records"
          puts records.first
          puts "^^ this should not happen - msgpack parsing error" unless records.first.is_a? Array
        end
      end

      def multi_workers_ready?
        true
      end
    end
  end
end


$driver = Fluent::Test::Driver::Output.new(Fluent::Plugin::TestOutput).configure <<~CONF
  log_level debug
  <buffer>
    chunk_limit_size "1m"
  </buffer>
CONF

$driver.run(force_flush_retry: true) do
  5.times do
    time = Fluent::EventTime.now
    events = Array.new(rand(4000..6000)) { |i| [time, { msg: ('a'.ord + i % 26).chr * 256 }] }
    puts "emitting #{events.size} events"
    $driver.feed("my.tag", events)
  end
end

puts $driver.logs

Expected behaviour

Every record unpacked from a chunk should be a msgpack array containing a JSON-encoded string.
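
Concretely, given the format method in the test script, every unpacked object should be a one-element Array holding a JSON string (illustrative, abbreviated):

["{\"msg\":\"aaaaaaaa...\"}"]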

Actual behaviour / investigation

When an emitted event stream is larger than the chunk size limit, a "chunk bytes limit exceeds for an emitted event stream" log line is emitted and a different code path is used to split the data into smaller chunks. However, that split does not respect the boundaries of the msgpack encoding. Parsing a split chunk therefore results in the last element being cut off (or unavailable), and subsequent chunks being parsed incorrectly, yielding an Integer rather than an array of strings.

emitting 4623 events
data 975 KB - 3698 records
{"msg":"aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"}
emitting 4628 events
data 243 KB - 1086 records
103
^^ this should not happen - msgpack parsing error
emitting 5672 events
emitting 5198 events
emitting 4249 events
data 976 KB - 3702 records
{"msg":"aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"}
data 244 KB - 1087 records
107
^^ this should not happen - msgpack parsing error
data 981 KB - 3724 records
{"msg":"aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"}
data 513 KB - 2212 records
34
^^ this should not happen - msgpack parsing error
data 982 KB - 3727 records
{"msg":"aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"}
data 387 KB - 1534 records
106
^^ this should not happen - msgpack parsing error
data 1008 KB - 3824 records
{"msg":"aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"}
data 112 KB - 667 records
99
^^ this should not happen - msgpack parsing error
2018-07-23 16:39:52 -0700 [debug]: buffer started instance=70108479437040 stage_size=0 queue_size=0
2018-07-23 16:39:52 -0700 [debug]: flush_thread actually running
2018-07-23 16:39:52 -0700 [warn]: chunk bytes limit exceeds for an emitted event stream: 1248210bytes
2018-07-23 16:39:52 -0700 [warn]: chunk bytes limit exceeds for an emitted event stream: 1249560bytes
2018-07-23 16:39:52 -0700 [warn]: chunk bytes limit exceeds for an emitted event stream: 1531440bytes
2018-07-23 16:39:52 -0700 [warn]: chunk bytes limit exceeds for an emitted event stream: 1403460bytes
2018-07-23 16:39:52 -0700 [warn]: chunk bytes limit exceeds for an emitted event stream: 1147230bytes
2018-07-23 16:39:59 -0700 [debug]: closing buffer instance=70108479437040
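
The small Integers above are exactly what a mid-record cut produces. Here is a minimal sketch (not part of the original report; requires only the msgpack gem) that cuts a serialized record at an arbitrary byte offset, the way a byte-count split of a pre-serialized chunk would:

require 'msgpack'
require 'stringio'

# One record as the plugin formats it: a msgpack array holding one JSON string.
record = [%({"msg":"#{'a' * 256}"})].to_msgpack

# Cut the byte stream at an arbitrary offset, mid-string.
tail = record.byteslice(100, record.bytesize - 100)

# The fragment no longer starts on a record boundary: each raw byte of the
# string body (e.g. 'a' == 0x61 == 97, '"' == 0x22 == 34) decodes as a
# positive fixint, so the unpacker yields Integers instead of Arrays.
MessagePack::Unpacker.new(StringIO.new(tail)).each { |obj| p obj }
# => 97, 97, 97, ..., 34, 125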
@adammw
Contributor Author

adammw commented Jul 23, 2018

Actually, I think this has to be fixed in fluentd, as that's what provides the ChunkMessagePackEventStreamer mixin, which directly uses the chunk's io. So there's no way to handle this in the plugin when using msgpack_each.
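
For reference, msgpack_each is roughly equivalent to the following (paraphrased from the ChunkMessagePackEventStreamer mixin; the exact code varies by fluentd version):

# Paraphrased sketch, not the exact fluentd source: the streaming
# unpacker reads the chunk's raw io directly, so a chunk that was split
# at a non-record boundary yields garbage objects straight to the plugin.
def msgpack_each(&block)
  open do |io|
    Fluent::Engine.msgpack_unpacker(io).each(&block)
  end
end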

@okkez
Contributor

okkez commented Jul 25, 2018

Is it intentional that you use BufferedOutput as the base class of your plugin?
New plugins should use the v1 API, and existing plugins should be migrated to it.

I've migrated your code to the v1 API as follows.
With this version, I don't see the message "^^ this should not happen - msgpack parsing error".

require 'fluent/plugin/output'
require 'fluent/test'
require 'fluent/test/driver/output'
require 'stringio'

module Fluent
  module Plugin
    class TestOutput < Fluent::Plugin::Output
      Fluent::Plugin.register_output('out_test', self)

      helpers :formatter

      def configure(conf)
        super
        @formatter = formatter_create
      end

      # Each event becomes a msgpack array holding one JSON string
      def format(tag, time, record)
        [@formatter.format(tag, time, record).chomp.b].to_msgpack
      end

      def write(chunk)
        chunk.open do |io|
          data = io.read
          # Unpack the entire chunk; every record should be an Array
          records = Fluent::Engine.msgpack_unpacker(StringIO.new(data)).to_enum.to_a
          puts "data #{data.size / 1024} KB - #{records.size} records"
          puts records.first
          puts "^^ this should not happen - msgpack parsing error" unless records.first.is_a? Array
        end
      end

      def multi_workers_ready?
        true
      end
    end
  end
end


$driver = Fluent::Test::Driver::Output.new(Fluent::Plugin::TestOutput).configure <<~CONF
  log_level debug
  <format>
    @type json
  </format>
  <buffer>
    chunk_limit_size "1m"
  </buffer>
CONF

$driver.run(force_flush_retry: true) do
  5.times do
    time = Fluent::EventTime.now
    events = Array.new(rand(4000..6000)) { |i| [time, { msg: ('a'.ord + i % 26).chr * 256 }] }
    puts "emitting #{events.size} events"
    $driver.feed("my.tag", events)
  end
end

puts $driver.logs

@adammw
Contributor Author

adammw commented Jul 25, 2018

It is intentional, as I'm debugging an issue I'm seeing in the https://github.com/awslabs/aws-fluent-plugin-kinesis plugin, which inherits from the BufferedOutput class so that it can be compatible with both Fluentd v0.12 and v1.0. As shown in the example, the RecordFilterMixin is in use via the inclusion of Fluent::SetTimeKeyMixin and Fluent::SetTagKeyMixin, which triggers the entire chunk to be serialised to a single string (in format_stream) rather than staying as an array.
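
For context, the v0.12-style format_stream concatenates every formatted record into one String, roughly like this (paraphrased sketch, not the exact fluentd source):

# All formatted records are appended to a single String, so the buffer
# receives one opaque blob per event stream instead of per-record
# entries that it could split safely at record boundaries.
def format_stream(tag, es)
  out = ''
  es.each do |time, record|
    out << format(tag, time, record)
  end
  out
end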

I believe I tried upgrading the plugin to use the new API and removing the mixins; however, I got other errors related to not being able to iterate the chunk in the same way.

@cosmo0920
Contributor

@adammw I've sent a PR to remove the previous tricky msgpack unpacker code:
awslabs/aws-fluent-plugin-kinesis#160

Could you check it?

@fujimotos
Member

I'm poking around this issue a bit.

The root cause seems to be in the write_step_by_step() routine of
Fluent::Plugin::Buffer. This special routine is triggered when the size of
the data exceeds chunk_limit_size. Simply put, it splits the data records
into smaller blocks.

The question to ask here is: what happens if a plugin passes an opaque
binary blob (something not easily divisible) to this routine? This
actually occurs for a v0.12 plugin with a custom format_stream method
(read Fluent::Compat::BufferedOutput for how it really occurs).

This situation is not handled well in the current code. The special
routine just cuts the binary data at an arbitrary position (!), so
data corruption is inevitable.

My current idea is to add something like the following:

def write_once(metadata, data, format: nil, size: nil, &block)
  ...
  # We cannot split the data because it's an opaque binary blob.
  # This occurs only for v0.12 plugins with a custom `format_stream()`.
  if !stored && data.is_a?(String)
    raise BufferChunkOverflowError, "The formatted data is #{data.size} bytes (> chunk_limit_size)"
  end

It does not fully fix the issue, but it should (at least) prevent Fluentd
from corrupting the input data, which seems to be a good first step.

@cosmo0920 @okkez
What do you think about this? If it seems to be ok, I'll submit a patch.

@cosmo0920
Contributor

cosmo0920 commented Sep 18, 2018

This situation is not really handled well in the current code. The special
routine just tries to cut the binary data in an arbitrary position (!), so
data corruption is inevitable.

Hmm, that's hard to handle well.

It does not fully fix the issue, but it should (at least) prevent Fluentd
from corrupting the input data, which seems to be a good first step.

But @fujimotos's idea sounds good to me.

@davidjmemmett

I've been hitting this issue every so often. We have several forwarders which push logs into one centralised td-agent server (which then pushes logs to Elasticsearch). This morning the central server fell over with a series of #0 chunk bytes limit exceeds for an emitted event stream: 28940334bytes errors.

I've restarted td-agent several times, but it keeps getting stuck. Let me know if there are any steps for debugging which might help.

@pbartoszek

pbartoszek commented Jul 24, 2019

I have noticed the same issue in our system as well. I can reproduce the error by doing the following:

  1. Have some process generating log files that will be read by fluentd
  2. Run fluentd for some time so that the position file is updated
  3. Stop fluentd for a while, but keep the source generating new logs
  4. Start fluentd (fluentd will start catching up with the new files)
  5. In the logs I could find the following warnings:

2019-07-24 14:20:30 +0000 [warn]: #0 chunk bytes limit exceeds for an emitted event stream: 209002bytes
2019-07-24 14:20:30 +0000 [warn]: #0 got unrecoverable error in primary and no secondary error_class=NoMethodError error="undefined method `compact' for 52:Integer"
2019-07-24 14:20:30 +0000 [warn]: #0 bad chunk is moved to /tmp/fluent/backup/worker0/object_3fde3d40b180/58e6e02e9aff61653a9ee5c688ca5d3b.log

How is it possible that fluentd squeezes more data into a chunk buffer (209002 bytes ≈ 204k) than the configured limit of 170k?

My configuration

<source>
  @type tail
  path /var/log/httpd/%Y/%m/%d/%H/*
  refresh_interval 5
  pos_file /var/log/td-agent/tail.pos
  read_from_head true
  read_lines_limit 500
  tag kinesis
  format none
</source>

<match kinesis**>
  aws_key_id XXX
  aws_sec_key XXX
  @type kinesis_streams
  stream_name YYY
  region eu-west-1

  retries_on_batch_request 12

  <buffer>
    overflow_action block
    chunk_limit_size 170k
    queued_chunks_limit_size 256
    flush_interval 1
    flush_thread_interval 0.5
    flush_thread_burst_interval 0.01
    flush_thread_count 4
    retry_forever true
  </buffer>
</match>

Versions:

fluentd-1.2.6 ruby="2.4.4"
gem 'fluent-plugin-elasticsearch' version '2.11.11'
gem 'fluent-plugin-kafka' version '0.7.9'
gem 'fluent-plugin-kinesis' version '2.1.1'
gem 'fluent-plugin-record-modifier' version '1.1.0'
gem 'fluent-plugin-rewrite-tag-filter' version '2.1.0'
gem 'fluent-plugin-s3' version '1.1.6'
gem 'fluent-plugin-td' version '1.0.0'
gem 'fluent-plugin-td-monitoring' version '0.2.4'
gem 'fluent-plugin-webhdfs' version '1.2.3'
gem 'fluentd' version '1.2.6'

@github-actions

This issue has been automatically marked as stale because it has been open for 90 days with no activity. Remove the stale label or comment, or this issue will be closed in 30 days.

@github-actions github-actions bot added the stale label Jan 26, 2021
@github-actions

This issue was automatically closed because it remained stale for 30 days.
