Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use flush_thread_count value for queued_chunks_limit_size when queued_chunks_limit_size is not specified #2173

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion lib/fluent/plugin/buffer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -372,7 +372,7 @@ def write(metadata_and_data, format: nil, size: nil, enqueue: false)
end

def queue_full?
@queued_chunks_limit_size && (synchronize { @queue.size } >= @queued_chunks_limit_size)
synchronize { @queue.size } >= @queued_chunks_limit_size
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

buffer test complains @queued_chunks_limit_sise is nil and not to be able to compare.
Could you check it?
https://travis-ci.org/fluent/fluentd/jobs/452714881#L547

end

def queued_records
Expand Down
4 changes: 4 additions & 0 deletions lib/fluent/plugin/output.rb
Original file line number Diff line number Diff line change
Expand Up @@ -354,6 +354,10 @@ def configure(conf)
log.warn "'flush_interval' is ignored because default 'flush_mode' is not 'interval': '#{@flush_mode}'"
end
end

if @buffer.queued_chunks_limit_size.nil?
@buffer.queued_chunks_limit_size = @buffer_config.flush_thread_count
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@i.buffer.queue.size always seems to be assumed 1 in test cases.
Could you check it?
https://travis-ci.org/fluent/fluentd/jobs/452714881#L654

end
end

if @secondary_config
Expand Down
2 changes: 1 addition & 1 deletion test/plugin/test_buffer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ def create_chunk_es(metadata, es)

sub_test_case 'with default configuration and dummy implementation' do
setup do
@p = create_buffer({})
@p = create_buffer({'queued_chunks_limit_size' => 100})
@dm0 = create_metadata(Time.parse('2016-04-11 16:00:00 +0000').to_i, nil, nil)
@dm1 = create_metadata(Time.parse('2016-04-11 16:10:00 +0000').to_i, nil, nil)
@dm2 = create_metadata(Time.parse('2016-04-11 16:20:00 +0000').to_i, nil, nil)
Expand Down
19 changes: 19 additions & 0 deletions test/plugin/test_output_as_buffered.rb
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,24 @@ def waiting(seconds)
Timecop.return
end

test 'queued_chunks_limit_size is same as flush_thread_count by default' do
hash = {'flush_thread_count' => 4}
i = create_output
i.register(:prefer_buffered_processing) { true }
i.configure(config_element('ROOT', '', {}, [config_element('buffer','tag',hash)]))

assert_equal 4, i.buffer.queued_chunks_limit_size
end

test 'prefer queued_chunks_limit_size parameter than flush_thread_count' do
hash = {'flush_thread_count' => 4, 'queued_chunks_limit_size' => 2}
i = create_output
i.register(:prefer_buffered_processing) { true }
i.configure(config_element('ROOT', '', {}, [config_element('buffer','tag',hash)]))

assert_equal 2, i.buffer.queued_chunks_limit_size
end

sub_test_case 'chunk feature in #write for output plugins' do
setup do
@stored_global_logger = $log
Expand Down Expand Up @@ -1064,6 +1082,7 @@ def waiting(seconds)
'flush_thread_count' => 1,
'flush_thread_burst_interval' => 0.1,
'chunk_limit_size' => 1024,
'queued_chunks_limit_size' => 100
}
@i = create_output(:buffered)
@i.configure(config_element('ROOT','',{},[config_element('buffer',chunk_key,hash)]))
Expand Down
8 changes: 8 additions & 0 deletions test/plugin/test_output_as_buffered_retries.rb
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ def get_log_time(msg, logs)
'flush_interval' => 1,
'flush_thread_burst_interval' => 0.1,
'retry_randomize' => false,
'queued_chunks_limit_size' => 100
}
@i.configure(config_element('ROOT','',{},[config_element('buffer',chunk_key,hash)]))
@i.register(:prefer_buffered_processing){ true }
Expand Down Expand Up @@ -252,6 +253,7 @@ def get_log_time(msg, logs)
'flush_thread_burst_interval' => 0.1,
'retry_randomize' => false,
'retry_timeout' => 3600,
'queued_chunks_limit_size' => 100
}
@i.configure(config_element('ROOT','',{},[config_element('buffer',chunk_key,hash)]))
@i.register(:prefer_buffered_processing){ true }
Expand Down Expand Up @@ -342,6 +344,7 @@ def get_log_time(msg, logs)
'flush_thread_burst_interval' => 0.1,
'retry_randomize' => false,
'retry_max_times' => 10,
'queued_chunks_limit_size' => 100
}
@i.configure(config_element('ROOT','',{},[config_element('buffer',chunk_key,hash)]))
@i.register(:prefer_buffered_processing){ true }
Expand Down Expand Up @@ -485,6 +488,7 @@ def get_log_time(msg, logs)
'retry_type' => :periodic,
'retry_wait' => 3,
'retry_randomize' => false,
'queued_chunks_limit_size' => 100
}
@i.configure(config_element('ROOT','',{},[config_element('buffer',chunk_key,hash)]))
@i.register(:prefer_buffered_processing){ true }
Expand Down Expand Up @@ -531,6 +535,7 @@ def get_log_time(msg, logs)
'retry_wait' => 30,
'retry_randomize' => false,
'retry_timeout' => 120,
'queued_chunks_limit_size' => 100
}
@i.configure(config_element('ROOT','',{},[config_element('buffer',chunk_key,hash)]))
@i.register(:prefer_buffered_processing){ true }
Expand Down Expand Up @@ -626,6 +631,7 @@ def get_log_time(msg, logs)
'retry_wait' => 3,
'retry_randomize' => false,
'retry_max_times' => 10,
'queued_chunks_limit_size' => 100
}
@i.configure(config_element('ROOT','',{},[config_element('buffer',chunk_key,hash)]))
@i.register(:prefer_buffered_processing){ true }
Expand Down Expand Up @@ -724,6 +730,7 @@ def get_log_time(msg, logs)
'retry_randomize' => false,
'retry_timeout' => 3600,
'retry_max_times' => 10,
'queued_chunks_limit_size' => 100
}
@i.configure(config_element('ROOT','',{},[config_element('buffer',chunk_key,hash)]))
@i.register(:prefer_buffered_processing){ true }
Expand Down Expand Up @@ -795,6 +802,7 @@ def get_log_time(msg, logs)
'retry_wait' => 30,
'retry_timeout' => 360,
'retry_max_times' => 10,
'queued_chunks_limit_size' => 100
}
@i.configure(config_element('ROOT','',{},[config_element('buffer',chunk_key,hash)]))
@i.register(:prefer_buffered_processing){ true }
Expand Down
2 changes: 1 addition & 1 deletion test/plugin/test_output_as_buffered_secondary.rb
Original file line number Diff line number Diff line change
Expand Up @@ -544,7 +544,7 @@ def dummy_event_stream
test 'secondary plugin can do delayed commit even if primary does not do it, and non-committed chunks will be rollbacked by primary' do
written = []
chunks = []
priconf = config_element('buffer','tag',{'flush_interval' => 1, 'retry_type' => :periodic, 'retry_wait' => 3, 'retry_timeout' => 60, 'delayed_commit_timeout' => 2, 'retry_randomize' => false})
priconf = config_element('buffer','tag',{'flush_interval' => 1, 'retry_type' => :periodic, 'retry_wait' => 3, 'retry_timeout' => 60, 'delayed_commit_timeout' => 2, 'retry_randomize' => false, 'queued_chunks_limit_size' => 10})
secconf = config_element('secondary','',{'@type' => 'output_secondary_test2'})
@i.configure(config_element('ROOT','',{},[priconf,secconf]))
@i.register(:prefer_buffered_processing){ true }
Expand Down