
Large message overflow logic incorrect #1849

Closed
amread opened this issue Feb 7, 2018 · 11 comments · Fixed by #3560


amread commented Feb 7, 2018

Check CONTRIBUTING guideline first and here is the list to help us investigate the problem.

  • fluentd or td-agent version: v1.0.2 (also seems to be a problem on master right now)

I have a use case that allows for very large log messages. While load testing using 2m buffers, I noticed that if I try to quickly jam these messages through back-to-back, the system will complain:

a 524596bytes record is larger than buffer chunk limit size

Looking at the code, I see the test for a single record overflowing the current chunk in write_step_by_step:

    if split.size == 1 && original_bytesize == 0
      big_record_size = format ? format.call(split).bytesize : split.first.bytesize
      raise BufferChunkOverflowError, "a #{big_record_size}bytes record is larger than buffer chunk limit size"
    end

Since original_bytesize is set outside of the inner loop, the check cannot tell whether the current split is the first data written to the chunk. The fix for that is straightforward enough, but the retry logic isn't clever enough to do something different the next time around.

Ideally, we would know ahead of time that the message would overflow the buffer and simply not write it. I imagine the code does not take that approach for the sake of efficiency. It would be nice if there were something like the commit and rollback functionality for use while constructing the chunk, perhaps mark() and rewindToMark() functions, or maybe even restructuring append and concat to take blocks and allow rewinding on failure.
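A minimal sketch of what such a mark/rewind API could look like (hypothetical names and behavior, purely illustrative; not fluentd's actual chunk API):

    class SketchChunk
      attr_reader :bytesize

      def initialize(limit)
        @limit = limit
        @bytesize = 0
        @marked = 0
      end

      def mark
        @marked = @bytesize
      end

      def rewind_to_mark
        @bytesize = @marked
      end

      # Append the records, rewinding the chunk if they do not fit.
      # Returns true on success, false if the caller should retry on a new chunk.
      def try_append(records)
        mark
        @bytesize += records.sum(&:bytesize)
        return true if @bytesize <= @limit
        rewind_to_mark
        false
      end
    end

    chunk = SketchChunk.new(1024)
    chunk.try_append(["a" * 512])   # => true
    chunk.try_append(["b" * 513])   # => false; chunk.bytesize is still 512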

I took a stab at a minimal change, but found that I don't have a full appreciation for the nuances of this function's existing behavior, given the various assertions that failed in my attempts to fix this bug. The underlying assumption of the current code seems to be that records are small compared to the buffer, so that the chunk_full_threshold will always be crossed while filling a chunk. I can probably work around this issue by either increasing my buffer size or changing the chunk_full_threshold.

Here is a test that shows the problem: it adds one message that fills half the buffer, and then another that goes one byte over (added to the `with configuration for test with lower limits` sub_test_case):

    test '#write does not raise BufferChunkOverflowError when chunk is not empty' do
      m = @p.metadata(timekey: Time.parse('2016-04-11 16:40:00 +0000').to_i)

      assert_nothing_raised Fluent::Plugin::Buffer::BufferChunkOverflowError do
        @p.write({m => ["a" * 512, "b" * 513]})
      end
    end

Commit with new test: amread@d0653aa

@repeatedly repeatedly self-assigned this Feb 9, 2018
@repeatedly repeatedly added the v1 label Feb 9, 2018
@TheCoderGuyVlad

Hi @amread / @repeatedly
I believe we are seeing the same thing. What is your recommendation: to increase the buffer size or to wait for a fix/workaround?
Thank you

@repeatedly
Member

@TheCoderGuyVlad If it isn't a problem for you, setting a larger chunk_limit_size is the better option.
Changing the buffer internals needs some design consideration, so it will take more time.

@amread
Author

amread commented Mar 15, 2018

If you know the largest message size, you can adjust the chunk_limit_size or chunk_full_threshold to meet your needs (I ended up raising my chunk size and lowering the threshold slightly). I added message truncation to our pipeline to make sure no message exceeds the maximum message size.

You want to choose these values such that, when a chunk is one byte short of the threshold, a maximum-sized message arriving next will still not overflow the chunk.
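To make that concrete, here is a back-of-the-envelope check in Ruby (the numbers are illustrative, not taken from this thread):

    # A chunk is handed off once it reaches chunk_limit_size * chunk_full_threshold
    # bytes, so a maximum-sized record arriving just below that point must still fit.
    chunk_limit_size     = 2 * 1024 * 1024   # 2 MiB
    chunk_full_threshold = 0.75
    max_record_size      = 512 * 1024        # enforced by truncating records upstream

    headroom = chunk_limit_size - chunk_limit_size * chunk_full_threshold
    puts(max_record_size <= headroom ? "safe" : "may overflow")   # => "safe"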

@TheCoderGuyVlad

Thanks @repeatedly and @amread for your inputs! Appreciate it!

@dennyac

dennyac commented Mar 25, 2020

Ran into this issue and @amread 's suggestion worked for us.

Can we call this out in the fluentd docs until it is resolved? It took us a while to figure out that fluentd was dropping messages in our data pipeline.

@chris-heath

We're having the same issue while using the fluent-plugin-kafka input group. Is this something that will be addressed at some point?

@lkoniecki

I'm experiencing the same issue...

2021-05-26 19:29:59 +0000 [error]: #0 [input_http] failed to emit data error_class=Fluent::Plugin::Buffer::BufferChunkOverflowError error="a 29796bytes record is larger than buffer chunk limit size"

@davide-bolcioni

I see this:

{"time":"2021-09-11 21:24:42 +0000","level":"error","message":"[forward] unexpected error on reading data host=\"10.4.211.132\" port=49146 error_class=Fluent::Plugin::Buffer::BufferChunkOverflowError error=\"a 75262bytes record is larger than buffer chunk limit size\"","worker_id":3}

with the following configuration:

    <buffer []>
      flush_mode             interval
      flush_interval         30s
      flush_thread_count     2
      retry_max_interval     180s
      retry_wait             2s
      retry_timeout          96h
      chunk_limit_size       300K
      chunk_limit_records    1000000  # A large number
      delayed_commit_timeout 150s
      overflow_action        throw_exception
    </buffer>

and 75262 < 300K. From the above, it also depends on how full the chunk is when the 75262-byte record comes in, correct?
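Some illustrative arithmetic (not taken from the logs above) showing that dependence:

    chunk_limit_size = 300 * 1024                        # 307_200 bytes
    record_size      = 75_262                            # fits on its own
    room_needed      = chunk_limit_size - record_size    # => 231_938
    # If earlier appends in the same write already put more than `room_needed`
    # bytes into the chunk, this record pushes it past chunk_limit_size, and the
    # check quoted earlier in this thread misreports it as a single oversized
    # record, because original_bytesize was captured before those earlier appends.
    puts room_needed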

When using the rdkafka2 output plugin, is it still the case that the chunk_limit_size must be smaller than the Kafka batch size limit?

@davide-bolcioni

Seeing the above on fluentd 1.14.

@tatsu-yam
Contributor

tatsu-yam commented Nov 15, 2021

I also encountered this problem.
The bad part is that if the size of the data passed to Fluent::Plugin::Buffer#write_once exceeds chunk_limit_size, all of the data is thrown away, even when no individual record exceeds chunk_limit_size.

In my case, I am using Amazon Kinesis, so I wanted to keep chunk_limit_size small; that's why I ran into this problem. Fortunately, with fluent-plugin-kinesis I was able to increase chunk_limit_size, because it splits the chunk to fit Kinesis limits while transferring it. However, since it is difficult to control the speed at which the input plugin reads data, the possibility of missing logs still remains.

Fluent::Plugin::Buffer#write_step_by_step tries to split the incoming data and add it to the chunk, but that does not seem to be working here:

    if chunk_size_over?(chunk) # split size is larger than difference between size_full? and size_over?
      chunk.rollback
      if split.size == 1 && original_bytesize == 0
        big_record_size = format ? format.call(split).bytesize : split.first.bytesize
        raise BufferChunkOverflowError, "a #{big_record_size}bytes record is larger than buffer chunk limit size"
      end
      if chunk_size_full?(chunk) || split.size == 1
        enqueue_chunk_before_retry = true
      else
        splits_count *= 10
      end
      raise ShouldRetry
    end

If the chunk size exceeds chunk_limit_size but the record size does not, then Fluent::Plugin::Buffer#write_once or Fluent::Plugin::Buffer#write_step_by_step should create a new chunk and write to it.
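A self-contained toy model of that idea (hypothetical names, not fluentd's actual API and not the change that ended up in #3560):

    # Track the chunk's size immediately before each append, so that an overflow
    # on a non-empty chunk means "open a new chunk" rather than "record too big".
    class ToyChunk
      attr_reader :bytesize

      def initialize
        @bytesize = 0
      end

      def append(record)
        @bytesize += record.bytesize
      end

      def rollback(to_bytesize)
        @bytesize = to_bytesize
      end
    end

    LIMIT = 50
    records = ['{"msg":"aaaaaaaaaa"}', '{"msg":"bbbbbbbbbb"}', '{"msg":"cccccccccc"}']

    chunks = [ToyChunk.new]
    records.each do |record|
      chunk = chunks.last
      before = chunk.bytesize
      chunk.append(record)
      next if chunk.bytesize <= LIMIT

      chunk.rollback(before)                 # undo the failed append
      if before.zero?
        raise "a #{record.bytesize}bytes record is larger than buffer chunk limit size"
      end
      chunks << ToyChunk.new                 # overflow on a non-empty chunk: start a new one
      chunks.last.append(record)             # (the real buffer would re-check and retry; omitted here)
    end

    p chunks.map(&:bytesize)                 # => [40, 20] -- no record is dropped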

I was able to reproduce this with the following settings.

$ gem list |grep -e fluentd -e fluent-plugin-kinesis
fluent-plugin-kinesis (3.4.2)
fluentd (1.14.2)

$ cat test.log
{"msg":"aaaaaaaaaa"}
{"msg":"bbbbbbbbbb"}
{"msg":"cccccccccc"}

$ cat test.conf
<source>
  @type tail
  read_from_head true
  path /path/to/test.log

  tag tail.log
  <parse>
    @type json
  </parse>
</source>

<match tail.log>
  @type kinesis_streams_aggregated
  stream_name  test-bucket
  <buffer>
    @type file
    path /path/to/buffer/kinesis
    chunk_limit_size 50
    flush_interval 1s
  </buffer>
</match>

You will get the following error, and nothing ends up in the buffer.

emit transaction failed: error_class=Fluent::Plugin::Buffer::BufferChunkOverflowError error="a 23bytes record is larger than buffer chunk limit size" location="xxx/fluentd-1.14.2/lib/fluent/plugin/buffer.rb:775:in `block in write_step_by_step'" tag="tail.log"

@tatsu-yam
Contributor

Thank you for responding!
