Skip to content

Commit

Permalink
Merge pull request #1256 from fluent/plugin-development-enhancement
Browse files Browse the repository at this point in the history
Plugin development enhancement
  • Loading branch information
tagomoris authored Oct 6, 2016
2 parents 5fc798c + b795e41 commit 36b547d
Show file tree
Hide file tree
Showing 5 changed files with 26 additions and 0 deletions.
3 changes: 3 additions & 0 deletions lib/fluent/plugin/base.rb
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,12 @@ class Base

State = Struct.new(:configure, :start, :after_start, :stop, :before_shutdown, :shutdown, :after_shutdown, :close, :terminate)

attr_accessor :under_plugin_development

def initialize
super
@_state = State.new(false, false, false, false, false, false, false, false, false)
@under_plugin_development = false
end

def has_router?
Expand Down
7 changes: 7 additions & 0 deletions lib/fluent/plugin/buffer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,13 @@ def metadata_list
end
end

# it's too dangerous, and use it so carefully to remove metadata for tests
def metadata_list_clear!
synchronize do
@metadata_list.clear
end
end

def new_metadata(timekey: nil, tag: nil, variables: nil)
Metadata.new(timekey, tag, variables)
end
Expand Down
12 changes: 12 additions & 0 deletions lib/fluent/plugin/output.rb
Original file line number Diff line number Diff line change
Expand Up @@ -740,6 +740,13 @@ def metadata(tag, time, record)
end
end

def metadata_for_test(tag, time, record)
raise "BUG: #metadata_for_test is available only when no actual metadata exists" unless @buffer.metadata_list.empty?
m = metadata(tag, time, record)
@buffer.metadata_list_clear!
m
end

def execute_chunking(tag, es, enqueue: false)
if @simple_chunking
handle_stream_simple(tag, es, enqueue: enqueue)
Expand Down Expand Up @@ -976,6 +983,10 @@ def try_flush
log.debug "taking back chunk for errors.", plugin_id: plugin_id, chunk: dump_unique_id_hex(chunk.unique_id)
@buffer.takeback_chunk(chunk.unique_id)

if @under_plugin_development
raise
end

@retry_mutex.synchronize do
if @retry
@counters_monitor.synchronize{ @num_errors += 1 }
Expand Down Expand Up @@ -1120,6 +1131,7 @@ def enqueue_thread_run
@buffer.enqueue_all{ |metadata, chunk| metadata.timekey < current_timekey && metadata.timekey + timekey_unit + timekey_wait <= now_int }
end
rescue => e
raise if @under_plugin_development
log.error "unexpected error while checking flushed chunks. ignored.", plugin_id: plugin_id, error_class: e.class, error: e
log.error_backtrace
end
Expand Down
1 change: 1 addition & 0 deletions lib/fluent/test/driver/base.rb
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ def initialize(klass, opts: {}, &block)
else
@instance = klass
end
@instance.under_plugin_development = true

@logs = []

Expand Down
3 changes: 3 additions & 0 deletions lib/fluent/test/driver/output.rb
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
require 'fluent/test/driver/event_feeder'

require 'fluent/plugin/output'
require 'timeout'

module Fluent
module Test
Expand All @@ -43,6 +44,7 @@ def run_actual(**kwargs, &block)
super(**kwargs, &block)
if @flush_buffer_at_cleanup
@instance.force_flush
Timeout.timeout(10){ sleep 0.1 until !@instance.buffer || @instance.buffer.queue.size == 0 }
end
end

Expand All @@ -52,6 +54,7 @@ def formatted

def flush
@instance.force_flush
Timeout.timeout(10){ sleep 0.1 until !@instance.buffer || @instance.buffer.queue.size == 0 }
end

def instance_hook_after_started
Expand Down

0 comments on commit 36b547d

Please sign in to comment.