From c011256bec95ecae02b9f48154cc41e3192e2c1b Mon Sep 17 00:00:00 2001
From: Masahiro Nakagawa
Date: Wed, 18 Apr 2018 13:47:49 +0900
Subject: [PATCH 1/4] output: Add backup feature for bad chunks

Signed-off-by: Masahiro Nakagawa
---
 lib/fluent/env.rb           |  1 +
 lib/fluent/plugin/output.rb | 49 +++++++++++++++++++++++++++++++++++++
 2 files changed, 50 insertions(+)

diff --git a/lib/fluent/env.rb b/lib/fluent/env.rb
index 4d704e8354..5fc0a40e00 100644
--- a/lib/fluent/env.rb
+++ b/lib/fluent/env.rb
@@ -20,6 +20,7 @@ module Fluent
   DEFAULT_CONFIG_PATH = ENV['FLUENT_CONF'] || '/etc/fluent/fluent.conf'
   DEFAULT_PLUGIN_DIR = ENV['FLUENT_PLUGIN'] || '/etc/fluent/plugin'
   DEFAULT_SOCKET_PATH = ENV['FLUENT_SOCKET'] || '/var/run/fluent/fluent.sock'
+  DEFAULT_BACKUP_DIR = ENV['FLUENT_BACKUP_DIR'] || '/tmp/fluent'
   DEFAULT_OJ_OPTIONS = {bigdecimal_load: :float, mode: :compat, use_to_json: true}

   def self.windows?
diff --git a/lib/fluent/plugin/output.rb b/lib/fluent/plugin/output.rb
index 4bb9ac4619..ca570f5db4 100644
--- a/lib/fluent/plugin/output.rb
+++ b/lib/fluent/plugin/output.rb
@@ -14,6 +14,7 @@
 # limitations under the License.
 #

+require 'fluent/error'
 require 'fluent/plugin/base'
 require 'fluent/plugin_helper/record_accessor'
 require 'fluent/log'
@@ -1056,6 +1057,8 @@ def next_flush_time
       end
     end

+    UNRECOVERABLE_ERRORS = [TypeError, ArgumentError, NoMethodError, Fluent::UnrecoverableError]
+
     def try_flush
       chunk = @buffer.dequeue_chunk
       return unless chunk
@@ -1100,6 +1103,37 @@ def try_flush
           commit_write(chunk_id, delayed: false, secondary: using_secondary)
           log.trace "done to commit a chunk", chunk: dump_chunk_id
         end
+      rescue *UNRECOVERABLE_ERRORS => e
+        if @secondary
+          if using_secondary
+            log.warn "got unrecoverable error in secondary.", error: e
+            backup_chunk(chunk, using_secondary, output.delayed_commit)
+          else
+            if (self.class == @secondary.class)
+              log.warn "got unrecoverable error in primary and secondary type is same as primary. Skip secondary", error: e
+              backup_chunk(chunk, using_secondary, output.delayed_commit)
+            else
+              # Call secondary output directly without retry update.
+              # In this case, delayed commit causes inconsistent state in dequeued chunks so async output in secondary is not allowed for now.
+              if @secondary.delayed_commit
+                log.warn "got unrecoverable error in primary and secondary is async output. Skip secondary for backup", error: e
+                backup_chunk(chunk, using_secondary, output.delayed_commit)
+              else
+                log.warn "got unrecoverable error in primary. Skip retry and flush chunk to secondary", error: e
+                begin
+                  @secondary.write(chunk)
+                  commit_write(chunk_id, delayed: output.delayed_commit, secondary: true)
+                rescue => e
+                  log.warn "got an error in secondary for unrecoverable error", error: e
+                  backup_chunk(chunk, using_secondary, output.delayed_commit)
+                end
+              end
+            end
+          end
+        else
+          log.warn "got unrecoverable error in primary and no secondary", error: e
+          backup_chunk(chunk, using_secondary, output.delayed_commit)
+        end
       rescue => e
         log.debug "taking back chunk for errors.", chunk: dump_unique_id_hex(chunk.unique_id)
         if output.delayed_commit
@@ -1115,6 +1149,21 @@ def try_flush
       end
     end

+    def backup_chunk(chunk, using_secondary, delayed_commit)
+      unique_id = dump_unique_id_hex(chunk.unique_id)
+      safe_plugin_id = plugin_id.gsub(/[ \/\\:]/, '_')
+      backup_base_dir = system_config.root_dir || DEFAULT_BACKUP_DIR
+      backup_file = File.join(backup_base_dir, 'backup', "worker#{fluentd_worker_id}", safe_plugin_id, "#{unique_id}.log")
+      backup_dir = File.dirname(backup_file)
+
+      log.warn "bad chunk is moved to #{backup_file}"
+      FileUtils.mkdir_p(backup_dir) unless Dir.exist?(backup_dir)
+      File.open(backup_file, 'ab', system_config.file_permission || 0644) { |f|
+        chunk.write_to(f)
+      }
+      commit_write(chunk.unique_id, secondary: using_secondary, delayed: delayed_commit)
+    end
+
     def check_slow_flush(start)
       elapsed_time = Fluent::Clock.now - start
       if elapsed_time > @slow_flush_log_threshold

From 960e1b1e7d2f4f85275707458ebf4e68705b6416 Mon Sep 17 00:00:00 2001
From: Masahiro Nakagawa
Date: Tue, 24 Apr 2018 20:54:41 +0900
Subject: [PATCH 2/4] Move Fluent::UnrecoverableError to first in error list

Signed-off-by: Masahiro Nakagawa
---
 lib/fluent/plugin/output.rb | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/lib/fluent/plugin/output.rb b/lib/fluent/plugin/output.rb
index ca570f5db4..04dbc4ed6a 100644
--- a/lib/fluent/plugin/output.rb
+++ b/lib/fluent/plugin/output.rb
@@ -1057,7 +1057,7 @@ def next_flush_time
       end
     end

-    UNRECOVERABLE_ERRORS = [TypeError, ArgumentError, NoMethodError, Fluent::UnrecoverableError]
+    UNRECOVERABLE_ERRORS = [Fluent::UnrecoverableError, TypeError, ArgumentError, NoMethodError]

     def try_flush
       chunk = @buffer.dequeue_chunk

From 1ea81f4bba44b85fd2879c029124c914baa56cfb Mon Sep 17 00:00:00 2001
From: Masahiro Nakagawa
Date: Tue, 24 Apr 2018 23:15:42 +0900
Subject: [PATCH 3/4] Add more escape characters

Signed-off-by: Masahiro Nakagawa
---
 lib/fluent/plugin/output.rb | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/lib/fluent/plugin/output.rb b/lib/fluent/plugin/output.rb
index 04dbc4ed6a..f2e3987012 100644
--- a/lib/fluent/plugin/output.rb
+++ b/lib/fluent/plugin/output.rb
@@ -1151,7 +1151,7 @@ def try_flush

     def backup_chunk(chunk, using_secondary, delayed_commit)
       unique_id = dump_unique_id_hex(chunk.unique_id)
-      safe_plugin_id = plugin_id.gsub(/[ \/\\:]/, '_')
+      safe_plugin_id = plugin_id.gsub(/[ "\/\\:;|*<>?]/, '_')
       backup_base_dir = system_config.root_dir || DEFAULT_BACKUP_DIR
       backup_file = File.join(backup_base_dir, 'backup', "worker#{fluentd_worker_id}", safe_plugin_id, "#{unique_id}.log")
       backup_dir = File.dirname(backup_file)
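
Note (illustrative, not part of the patch series): after PATCH 1/4 and PATCH 3/4, backup_chunk writes a bad chunk to <root_dir or /tmp/fluent>/backup/worker<N>/<sanitized plugin id>/<chunk id>.log. The Ruby sketch below only mirrors that path construction; the root_dir, plugin id, and chunk id values are made-up examples, not values taken from the patches.

    # Sketch of the backup path layout produced by backup_chunk (assumed example values).
    root_dir          = '/var/log/fluent'                       # system_config.root_dir; DEFAULT_BACKUP_DIR ('/tmp/fluent') when unset
    plugin_id         = 'backup_test'                           # @id of the failing output plugin
    safe_plugin_id    = plugin_id.gsub(/[ "\/\\:;|*<>?]/, '_')  # same sanitization as PATCH 3/4
    fluentd_worker_id = 0                                       # worker index of the running process
    unique_id         = '5a0cd8a0ffee11aabbccddeeff001122'      # dump_unique_id_hex(chunk.unique_id), made-up value

    backup_file = File.join(root_dir, 'backup', "worker#{fluentd_worker_id}", safe_plugin_id, "#{unique_id}.log")
    # => "/var/log/fluent/backup/worker0/backup_test/5a0cd8a0ffee11aabbccddeeff001122.log"

The tests in PATCH 4/4 below assert exactly this layout through their "target" paths.
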
From ece1a75f17356c480caa8a4484a3b3002143c787 Mon Sep 17 00:00:00 2001
From: Masahiro Nakagawa
Date: Sun, 29 Apr 2018 22:49:41 +0900
Subject: [PATCH 4/4] Add test for backup

Signed-off-by: Masahiro Nakagawa
---
 test/plugin/test_output_as_buffered_backup.rb | 271 ++++++++++++++++++
 1 file changed, 271 insertions(+)
 create mode 100644 test/plugin/test_output_as_buffered_backup.rb

diff --git a/test/plugin/test_output_as_buffered_backup.rb b/test/plugin/test_output_as_buffered_backup.rb
new file mode 100644
index 0000000000..885ad35041
--- /dev/null
+++ b/test/plugin/test_output_as_buffered_backup.rb
@@ -0,0 +1,271 @@
+require_relative '../helper'
+require 'fluent/plugin/output'
+require 'fluent/plugin/buffer'
+require 'fluent/event'
+require 'fluent/error'
+
+require 'json'
+require 'time'
+require 'timeout'
+require 'timecop'
+
+
+class BufferedOutputBackupTest < Test::Unit::TestCase
+  class BareOutput < Fluent::Plugin::Output
+    def register(name, &block)
+      instance_variable_set("@#{name}", block)
+    end
+  end
+  class DummyOutput < BareOutput
+    def initialize
+      super
+      @process = nil
+      @format = nil
+      @write = nil
+      @try_write = nil
+    end
+    def prefer_buffered_processing
+      true
+    end
+    def prefer_delayed_commit
+      false
+    end
+    def process(tag, es)
+      @process ? @process.call(tag, es) : nil
+    end
+    def format(tag, time, record)
+      [tag, time.to_i, record].to_json + "\n"
+    end
+    def write(chunk)
+      @write ? @write.call(chunk) : nil
+    end
+    def try_write(chunk)
+      @try_write ? @try_write.call(chunk) : nil
+    end
+  end
+  class DummyOutputForSecondary < BareOutput
+    def initialize
+      super
+      @process = nil
+      @format = nil
+      @write = nil
+      @try_write = nil
+    end
+    def prefer_buffered_processing
+      true
+    end
+    def prefer_delayed_commit
+      false
+    end
+    def process(tag, es)
+      @process ? @process.call(tag, es) : nil
+    end
+    def format(tag, time, record)
+      [tag, time.to_i, record].to_json + "\n"
+    end
+    def write(chunk)
+      @write ? @write.call(chunk) : nil
+    end
+    def try_write(chunk)
+      @try_write ? @try_write.call(chunk) : nil
+    end
+  end
+  class DummyAsyncOutputForSecondary < BareOutput
+    def initialize
+      super
+      @process = nil
+      @format = nil
+      @write = nil
+      @try_write = nil
+    end
+    def prefer_buffered_processing
+      true
+    end
+    def prefer_delayed_commit
+      true
+    end
+    def process(tag, es)
+      @process ? @process.call(tag, es) : nil
+    end
+    def format(tag, time, record)
+      [tag, time.to_i, record].to_json + "\n"
+    end
+    def write(chunk)
+      @write ? @write.call(chunk) : nil
+    end
+    def try_write(chunk)
+      @try_write ? @try_write.call(chunk) : nil
+    end
+  end
+
+  TMP_DIR = File.expand_path(File.dirname(__FILE__) + "/../tmp/bu#{ENV['TEST_ENV_NUMBER']}")
+
+  def create_output
+    DummyOutput.new
+  end
+  def create_metadata(timekey: nil, tag: nil, variables: nil)
+    Fluent::Plugin::Buffer::Metadata.new(timekey, tag, variables)
+  end
+  def waiting(seconds)
+    begin
+      Timeout.timeout(seconds) do
+        yield
+      end
+    rescue Timeout::Error
+      STDERR.print(*@i.log.out.logs)
+      raise
+    end
+  end
+
+  def dummy_event_stream
+    Fluent::ArrayEventStream.new([
+      [ event_time('2016-04-13 18:33:00'), {"name" => "moris", "age" => 36, "message" => "data1"} ],
+      [ event_time('2016-04-13 18:33:13'), {"name" => "moris", "age" => 36, "message" => "data2"} ],
+      [ event_time('2016-04-13 18:33:32'), {"name" => "moris", "age" => 36, "message" => "data3"} ],
+    ])
+  end
+
+  setup do
+    @i = create_output
+    FileUtils.rm_rf(TMP_DIR)
+    FileUtils.mkdir_p(TMP_DIR)
+
+    Fluent::Plugin.register_output('backup_output', DummyOutput)
+    Fluent::Plugin.register_output('backup_output2', DummyOutputForSecondary)
+    Fluent::Plugin.register_output('backup_async_output', DummyAsyncOutputForSecondary)
+  end
+
+  teardown do
+    if @i
+      @i.stop unless @i.stopped?
+      @i.before_shutdown unless @i.before_shutdown?
+      @i.shutdown unless @i.shutdown?
+      @i.after_shutdown unless @i.after_shutdown?
+      @i.close unless @i.closed?
+      @i.terminate unless @i.terminated?
+    end
+    Timecop.return
+  end
+
+  sub_test_case 'buffered output for broken chunks' do
+    def flush_chunks
+      @i.start
+      @i.after_start
+
+      @i.interrupt_flushes
+
+      now = Time.parse('2016-04-13 18:33:30 -0700')
+      Timecop.freeze(now)
+      @i.emit_events("test.tag.1", dummy_event_stream())
+      now = Time.parse('2016-04-13 18:33:32 -0700')
+      Timecop.freeze(now)
+
+      @i.enqueue_thread_wait
+      @i.flush_thread_wakeup
+      waiting(4) { Thread.pass until @i.write_count > 0 }
+
+      assert { @i.write_count > 0 }
+      Timecop.freeze(now)
+      @i.flush_thread_wakeup
+    end
+
+    test 'backup chunk without secondary' do
+      Fluent::SystemConfig.overwrite_system_config('root_dir' => TMP_DIR) do
+        id = 'backup_test'
+        hash = {
+          'flush_interval' => 1,
+          'flush_thread_burst_interval' => 0.1,
+        }
+        chunk_id = nil
+        @i.configure(config_element('ROOT', '', {'@id' => id}, [config_element('buffer', 'tag', hash)]))
+        @i.register(:write) { |chunk|
+          chunk_id = chunk.unique_id;
+          raise Fluent::UnrecoverableError, "yay, your #write must fail"
+        }
+
+        flush_chunks
+
+        target = "#{TMP_DIR}/backup/worker0/#{id}/#{@i.dump_unique_id_hex(chunk_id)}.log"
+        assert_true File.exist?(target)
+        logs = @i.log.out.logs
+        assert { logs.any? { |l| l.include?("got unrecoverable error in primary and no secondary") } }
+      end
+    end
+
+    test 'backup chunk with same type secondary' do
+      Fluent::SystemConfig.overwrite_system_config('root_dir' => TMP_DIR) do
+        id = 'backup_test_with_same_secondary'
+        hash = {
+          'flush_interval' => 1,
+          'flush_thread_burst_interval' => 0.1,
+        }
+        chunk_id = nil
+        secconf = config_element('secondary','',{'@type' => 'backup_output'})
+        @i.configure(config_element('ROOT', '', {'@id' => id}, [config_element('buffer', 'tag', hash), secconf]))
+        @i.register(:write) { |chunk|
+          chunk_id = chunk.unique_id;
+          raise Fluent::UnrecoverableError, "yay, your #write must fail"
+        }
+
+        flush_chunks
+
+        target = "#{TMP_DIR}/backup/worker0/#{id}/#{@i.dump_unique_id_hex(chunk_id)}.log"
+        assert_true File.exist?(target)
+        logs = @i.log.out.logs
+        assert { logs.any? { |l| l.include?("got unrecoverable error in primary and secondary type is same as primary") } }
+      end
+    end
+
+    test 'backup chunk with different type secondary' do
+      Fluent::SystemConfig.overwrite_system_config('root_dir' => TMP_DIR) do
+        id = 'backup_test_with_diff_secondary'
+        hash = {
+          'flush_interval' => 1,
+          'flush_thread_burst_interval' => 0.1,
+        }
+        chunk_id = nil
+        secconf = config_element('secondary','',{'@type' => 'backup_output2'})
+        @i.configure(config_element('ROOT', '', {'@id' => id}, [config_element('buffer', 'tag', hash), secconf]))
+        @i.register(:write) { |chunk|
+          chunk_id = chunk.unique_id;
+          raise Fluent::UnrecoverableError, "yay, your #write must fail"
+        }
+        @i.secondary.register(:write) { |chunk|
+          raise Fluent::UnrecoverableError, "yay, your secondary #write must fail"
+        }
+
+        flush_chunks
+
+        target = "#{TMP_DIR}/backup/worker0/#{id}/#{@i.dump_unique_id_hex(chunk_id)}.log"
+        assert_true File.exist?(target)
+        logs = @i.log.out.logs
+        assert { logs.any? { |l| l.include?("got unrecoverable error in primary. Skip retry and flush chunk to secondary") } }
+        assert { logs.any? { |l| l.include?("got an error in secondary for unrecoverable error") } }
+      end
+    end
+
+    test 'backup chunk with async secondary' do
+      Fluent::SystemConfig.overwrite_system_config('root_dir' => TMP_DIR) do
+        id = 'backup_test_with_diff_secondary'
+        hash = {
+          'flush_interval' => 1,
+          'flush_thread_burst_interval' => 0.1,
+        }
+        chunk_id = nil
+        secconf = config_element('secondary','',{'@type' => 'backup_async_output'})
+        @i.configure(config_element('ROOT', '', {'@id' => id}, [config_element('buffer', 'tag', hash), secconf]))
+        @i.register(:write) { |chunk|
+          chunk_id = chunk.unique_id;
+          raise Fluent::UnrecoverableError, "yay, your #write must fail"
+        }
+
+        flush_chunks
+
+        target = "#{TMP_DIR}/backup/worker0/#{id}/#{@i.dump_unique_id_hex(chunk_id)}.log"
+        assert_true File.exist?(target)
+        logs = @i.log.out.logs
+        assert { logs.any? { |l| l.include?("got unrecoverable error in primary and secondary is async output") } }
+      end
+    end
+  end
+end