Skip to content

Commit

Permalink
Merge pull request #1952 from fluent/backup-for-chunks
Browse files Browse the repository at this point in the history
output: Add backup feature for bad chunks
  • Loading branch information
repeatedly authored Apr 29, 2018
2 parents 5898965 + ece1a75 commit c765030
Show file tree
Hide file tree
Showing 3 changed files with 321 additions and 0 deletions.
1 change: 1 addition & 0 deletions lib/fluent/env.rb
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ module Fluent
# Default runtime locations; each can be overridden via its environment variable.
DEFAULT_CONFIG_PATH = ENV['FLUENT_CONF'] || '/etc/fluent/fluent.conf'
DEFAULT_PLUGIN_DIR = ENV['FLUENT_PLUGIN'] || '/etc/fluent/plugin'
DEFAULT_SOCKET_PATH = ENV['FLUENT_SOCKET'] || '/var/run/fluent/fluent.sock'
# Fallback destination for bad-chunk backups, used when the system
# configuration's root_dir is not set (see Output#backup_chunk).
DEFAULT_BACKUP_DIR = ENV['FLUENT_BACKUP_DIR'] || '/tmp/fluent'
# Options handed to the Oj JSON library when it is available.
DEFAULT_OJ_OPTIONS = {bigdecimal_load: :float, mode: :compat, use_to_json: true}

def self.windows?
Expand Down
49 changes: 49 additions & 0 deletions lib/fluent/plugin/output.rb
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
# limitations under the License.
#

require 'fluent/error'
require 'fluent/plugin/base'
require 'fluent/plugin_helper/record_accessor'
require 'fluent/log'
Expand Down Expand Up @@ -1056,6 +1057,8 @@ def next_flush_time
end
end

# Error classes for which buffer retry is pointless: a chunk raising one of
# these is routed to the secondary output or backed up to disk instead of
# being re-queued. Frozen so the list cannot be mutated at runtime.
UNRECOVERABLE_ERRORS = [Fluent::UnrecoverableError, TypeError, ArgumentError, NoMethodError].freeze

def try_flush
chunk = @buffer.dequeue_chunk
return unless chunk
Expand Down Expand Up @@ -1100,6 +1103,37 @@ def try_flush
commit_write(chunk_id, delayed: false, secondary: using_secondary)
log.trace "done to commit a chunk", chunk: dump_chunk_id
end
rescue *UNRECOVERABLE_ERRORS => e
if @secondary
if using_secondary
log.warn "got unrecoverable error in secondary.", error: e
backup_chunk(chunk, using_secondary, output.delayed_commit)
else
if (self.class == @secondary.class)
log.warn "got unrecoverable error in primary and secondary type is same as primary. Skip secondary", error: e
backup_chunk(chunk, using_secondary, output.delayed_commit)
else
# Call secondary output directly without retry update.
# In this case, delayed commit causes inconsistent state in dequeued chunks so async output in secondary is not allowed for now.
if @secondary.delayed_commit
log.warn "got unrecoverable error in primary and secondary is async output. Skip secondary for backup", error: e
backup_chunk(chunk, using_secondary, output.delayed_commit)
else
log.warn "got unrecoverable error in primary. Skip retry and flush chunk to secondary", error: e
begin
@secondary.write(chunk)
commit_write(chunk_id, delayed: output.delayed_commit, secondary: true)
rescue => e
log.warn "got an error in secondary for unrecoverable error", error: e
backup_chunk(chunk, using_secondary, output.delayed_commit)
end
end
end
end
else
log.warn "got unrecoverable error in primary and no secondary", error: e
backup_chunk(chunk, using_secondary, output.delayed_commit)
end
rescue => e
log.debug "taking back chunk for errors.", chunk: dump_unique_id_hex(chunk.unique_id)
if output.delayed_commit
Expand All @@ -1115,6 +1149,21 @@ def try_flush
end
end

# Persist a chunk that hit an unrecoverable error to a backup file on disk,
# then commit it so it is purged from the buffer instead of retried.
#
# chunk           - the dequeued buffer chunk to back up
# using_secondary - true when the failure happened while flushing via the
#                   secondary output (forwarded to commit_write)
# delayed_commit  - true when the failing output uses delayed commit
#                   (forwarded to commit_write)
def backup_chunk(chunk, using_secondary, delayed_commit)
  unique_id = dump_unique_id_hex(chunk.unique_id)
  # plugin_id may contain characters that are invalid in file paths on some
  # platforms; replace them so the backup directory name is always safe.
  safe_plugin_id = plugin_id.gsub(/[ "\/\\:;|*<>?]/, '_')
  # Prefer the configured root_dir; fall back to the environment default.
  backup_base_dir = system_config.root_dir || DEFAULT_BACKUP_DIR
  backup_file = File.join(backup_base_dir, 'backup', "worker#{fluentd_worker_id}", safe_plugin_id, "#{unique_id}.log")
  backup_dir = File.dirname(backup_file)

  log.warn "bad chunk is moved to #{backup_file}"
  # mkdir_p is idempotent, so no pre-existence check is needed (the previous
  # Dir.exist? guard was redundant and introduced a small TOCTOU window).
  FileUtils.mkdir_p(backup_dir)
  File.open(backup_file, 'ab', system_config.file_permission || 0644) { |f|
    chunk.write_to(f)
  }
  # Mark the chunk as handled so the buffer drops it rather than retrying.
  commit_write(chunk.unique_id, secondary: using_secondary, delayed: delayed_commit)
end

def check_slow_flush(start)
elapsed_time = Fluent::Clock.now - start
if elapsed_time > @slow_flush_log_threshold
Expand Down
271 changes: 271 additions & 0 deletions test/plugin/test_output_as_buffered_backup.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,271 @@
require_relative '../helper'
require 'fluent/plugin/output'
require 'fluent/plugin/buffer'
require 'fluent/event'
require 'fluent/error'

require 'json'
require 'time'
require 'timeout'
require 'timecop'


# Tests for the bad-chunk backup feature: when an output raises an
# unrecoverable error, the chunk must be written to
# <root_dir>/backup/worker<N>/<plugin_id>/<chunk_id>.log and committed.
class BufferedOutputBackupTest < Test::Unit::TestCase
  # Minimal output plugin whose hook methods can be stubbed per-test:
  # register(:write) { |chunk| ... } installs the block as @write.
  class BareOutput < Fluent::Plugin::Output
    def register(name, &block)
      instance_variable_set("@#{name}", block)
    end
  end

  # Synchronous buffered output used as the primary plugin under test.
  class DummyOutput < BareOutput
    def initialize
      super
      @process = nil
      @format = nil
      @write = nil
      @try_write = nil
    end

    def prefer_buffered_processing
      true
    end

    def prefer_delayed_commit
      false
    end

    def process(tag, es)
      @process ? @process.call(tag, es) : nil
    end

    def format(tag, time, record)
      [tag, time.to_i, record].to_json + "\n"
    end

    def write(chunk)
      @write ? @write.call(chunk) : nil
    end

    def try_write(chunk)
      @try_write ? @try_write.call(chunk) : nil
    end
  end

  # Behaviorally identical to DummyOutput but a distinct class, so the
  # "secondary type differs from primary" code path is exercised.
  class DummyOutputForSecondary < DummyOutput
  end

  # Secondary that prefers delayed commit, i.e. behaves as an async output.
  class DummyAsyncOutputForSecondary < DummyOutput
    def prefer_delayed_commit
      true
    end
  end

  TMP_DIR = File.expand_path(File.dirname(__FILE__) + "/../tmp/bu#{ENV['TEST_ENV_NUMBER']}")

  def create_output
    DummyOutput.new
  end

  def create_metadata(timekey: nil, tag: nil, variables: nil)
    Fluent::Plugin::Buffer::Metadata.new(timekey, tag, variables)
  end

  # Runs the block under a timeout; dumps the plugin's captured logs to
  # STDERR on expiry to make CI failures diagnosable.
  def waiting(seconds)
    begin
      Timeout.timeout(seconds) do
        yield
      end
    rescue Timeout::Error
      STDERR.print(*@i.log.out.logs)
      raise
    end
  end

  def dummy_event_stream
    Fluent::ArrayEventStream.new([
      [ event_time('2016-04-13 18:33:00'), {"name" => "moris", "age" => 36, "message" => "data1"} ],
      [ event_time('2016-04-13 18:33:13'), {"name" => "moris", "age" => 36, "message" => "data2"} ],
      [ event_time('2016-04-13 18:33:32'), {"name" => "moris", "age" => 36, "message" => "data3"} ],
    ])
  end

  setup do
    @i = create_output
    FileUtils.rm_rf(TMP_DIR)
    FileUtils.mkdir_p(TMP_DIR)

    Fluent::Plugin.register_output('backup_output', DummyOutput)
    Fluent::Plugin.register_output('backup_output2', DummyOutputForSecondary)
    Fluent::Plugin.register_output('backup_async_output', DummyAsyncOutputForSecondary)
  end

  teardown do
    if @i
      @i.stop unless @i.stopped?
      @i.before_shutdown unless @i.before_shutdown?
      @i.shutdown unless @i.shutdown?
      @i.after_shutdown unless @i.after_shutdown?
      @i.close unless @i.closed?
      @i.terminate unless @i.terminated?
    end
    Timecop.return
  end

  sub_test_case 'buffered output for broken chunks' do
    # Emits events, then drives the enqueue/flush threads until at least
    # one write attempt has happened.
    def flush_chunks
      @i.start
      @i.after_start

      @i.interrupt_flushes

      now = Time.parse('2016-04-13 18:33:30 -0700')
      Timecop.freeze(now)
      @i.emit_events("test.tag.1", dummy_event_stream())
      now = Time.parse('2016-04-13 18:33:32 -0700')
      Timecop.freeze(now)

      @i.enqueue_thread_wait
      @i.flush_thread_wakeup
      waiting(4) { Thread.pass until @i.write_count > 0 }

      assert { @i.write_count > 0 }
      Timecop.freeze(now)
      @i.flush_thread_wakeup
    end

    test 'backup chunk without secondary' do
      Fluent::SystemConfig.overwrite_system_config('root_dir' => TMP_DIR) do
        id = 'backup_test'
        hash = {
          'flush_interval' => 1,
          'flush_thread_burst_interval' => 0.1,
        }
        chunk_id = nil
        @i.configure(config_element('ROOT', '', {'@id' => id}, [config_element('buffer', 'tag', hash)]))
        @i.register(:write) { |chunk|
          chunk_id = chunk.unique_id
          raise Fluent::UnrecoverableError, "yay, your #write must fail"
        }

        flush_chunks

        target = "#{TMP_DIR}/backup/worker0/#{id}/#{@i.dump_unique_id_hex(chunk_id)}.log"
        assert_true File.exist?(target)
        logs = @i.log.out.logs
        assert { logs.any? { |l| l.include?("got unrecoverable error in primary and no secondary") } }
      end
    end

    test 'backup chunk with same type secondary' do
      Fluent::SystemConfig.overwrite_system_config('root_dir' => TMP_DIR) do
        id = 'backup_test_with_same_secondary'
        hash = {
          'flush_interval' => 1,
          'flush_thread_burst_interval' => 0.1,
        }
        chunk_id = nil
        secconf = config_element('secondary','',{'@type' => 'backup_output'})
        @i.configure(config_element('ROOT', '', {'@id' => id}, [config_element('buffer', 'tag', hash), secconf]))
        @i.register(:write) { |chunk|
          chunk_id = chunk.unique_id
          raise Fluent::UnrecoverableError, "yay, your #write must fail"
        }

        flush_chunks

        target = "#{TMP_DIR}/backup/worker0/#{id}/#{@i.dump_unique_id_hex(chunk_id)}.log"
        assert_true File.exist?(target)
        logs = @i.log.out.logs
        assert { logs.any? { |l| l.include?("got unrecoverable error in primary and secondary type is same as primary") } }
      end
    end

    test 'backup chunk with different type secondary' do
      Fluent::SystemConfig.overwrite_system_config('root_dir' => TMP_DIR) do
        id = 'backup_test_with_diff_secondary'
        hash = {
          'flush_interval' => 1,
          'flush_thread_burst_interval' => 0.1,
        }
        chunk_id = nil
        secconf = config_element('secondary','',{'@type' => 'backup_output2'})
        @i.configure(config_element('ROOT', '', {'@id' => id}, [config_element('buffer', 'tag', hash), secconf]))
        @i.register(:write) { |chunk|
          chunk_id = chunk.unique_id
          raise Fluent::UnrecoverableError, "yay, your #write must fail"
        }
        # Secondary also fails, so the chunk must end up in the backup dir.
        @i.secondary.register(:write) { |chunk|
          raise Fluent::UnrecoverableError, "yay, your secondary #write must fail"
        }

        flush_chunks

        target = "#{TMP_DIR}/backup/worker0/#{id}/#{@i.dump_unique_id_hex(chunk_id)}.log"
        assert_true File.exist?(target)
        logs = @i.log.out.logs
        assert { logs.any? { |l| l.include?("got unrecoverable error in primary. Skip retry and flush chunk to secondary") } }
        assert { logs.any? { |l| l.include?("got an error in secondary for unrecoverable error") } }
      end
    end

    test 'backup chunk with async secondary' do
      Fluent::SystemConfig.overwrite_system_config('root_dir' => TMP_DIR) do
        # was 'backup_test_with_diff_secondary' (copy-paste from the previous
        # test); use a distinct id so the backup paths cannot collide.
        id = 'backup_test_with_async_secondary'
        hash = {
          'flush_interval' => 1,
          'flush_thread_burst_interval' => 0.1,
        }
        chunk_id = nil
        secconf = config_element('secondary','',{'@type' => 'backup_async_output'})
        @i.configure(config_element('ROOT', '', {'@id' => id}, [config_element('buffer', 'tag', hash), secconf]))
        @i.register(:write) { |chunk|
          chunk_id = chunk.unique_id
          raise Fluent::UnrecoverableError, "yay, your #write must fail"
        }

        flush_chunks

        target = "#{TMP_DIR}/backup/worker0/#{id}/#{@i.dump_unique_id_hex(chunk_id)}.log"
        assert_true File.exist?(target)
        logs = @i.log.out.logs
        assert { logs.any? { |l| l.include?("got unrecoverable error in primary and secondary is async output") } }
      end
    end
  end
end

0 comments on commit c765030

Please sign in to comment.