Skip to content

Commit

Permalink
Merge pull request #984 from fluent/fix-issues-for-v0.14.0
Browse files Browse the repository at this point in the history
Fix issues for v0.14.0
  • Loading branch information
tagomoris committed May 24, 2016
2 parents 2acc67c + 923b913 commit 8c93d04
Show file tree
Hide file tree
Showing 11 changed files with 63 additions and 47 deletions.
10 changes: 5 additions & 5 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,18 @@ matrix:
include:
- rvm: 2.1.10
os: linux
- rvm: 2.2.4
- rvm: 2.2.5
os: linux
- rvm: 2.3.0
- rvm: 2.3.1
os: linux
- rvm: ruby-head
os: linux
- rvm: 2.1.0
- rvm: 2.1.10
os: osx
osx_image: xcode7.3 # OSX 10.11
- rvm: 2.2.4
- rvm: 2.2.5
os: osx
osx_image: xcode7.1 # OSX 10.10
osx_image: xcode7.3 # OSX 10.11
- rvm: ruby-head
os: osx
osx_image: xcode 7.3 # OSX 10.11
Expand Down
2 changes: 1 addition & 1 deletion lib/fluent/configurable.rb
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ def configured_in(section_name)
def config_param(name, type = nil, **kwargs, &block)
configure_proxy(self.name).config_param(name, type, **kwargs, &block)
# reserved names '@foo' are invalid as attr_accessor name
attr_accessor(name) unless Fluent::Config::Element::RESERVED_PARAMETERS.include?(name.to_s)
attr_accessor(name) unless kwargs[:skip_accessor] || Fluent::Config::Element::RESERVED_PARAMETERS.include?(name.to_s)
end

def config_set_default(name, defval)
Expand Down
30 changes: 9 additions & 21 deletions lib/fluent/plugin/formatter.rb
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ def self.included(klass)
config_param :include_time_key, :bool, default: false
config_param :time_key, :string, default: 'time'
config_param :time_format, :string, default: nil
config_param :time_as_epoch, :bool, default: false
config_param :include_tag_key, :bool, default: false
config_param :tag_key, :string, default: 'tag'
config_param :localtime, :bool, default: true
Expand All @@ -51,41 +52,28 @@ def configure(conf)
@localtime = false
end
@timef = Fluent::TimeFormatter.new(@time_format, @localtime, @timezone)
if @time_as_epoch && !@include_time_key
log.warn "time_as_epoch will be ignored because include_time_key is false"
end
end

def filter_record(tag, time, record)
if @include_tag_key
record[@tag_key] = tag
end
if @include_time_key
record[@time_key] = @timef.format(time)
end
end
end

module StructuredFormatMixin
def self.included(klass)
klass.instance_eval {
config_param :time_as_epoch, :bool, default: false
}
end

def configure(conf)
super

if @time_as_epoch
if @include_time_key
@include_time_key = false
if @time_as_epoch
record[@time_key] = time.to_i
else
log.warn "include_time_key is false so ignore time_as_epoch"
@time_as_epoch = false
record[@time_key] = @timef.format(time)
end
end
end
end

module StructuredFormatMixin
def format(tag, time, record)
filter_record(tag, time, record)
record[@time_key] = time if @time_as_epoch
format_record(record)
end
end
Expand Down
2 changes: 1 addition & 1 deletion lib/fluent/plugin/out_exec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ class ExecOutput < Output
desc 'The format for event time used when the time_key parameter is specified. The default is UNIX time (integer).'
config_param :time_format, :string, default: nil
desc "The format used to map the incoming events to the program input. (#{ExecUtil::SUPPORTED_FORMAT.keys.join(',')})"
config_param :format, default: :tsv do |val|
config_param :format, default: :tsv, skip_accessor: true do |val|
f = ExecUtil::SUPPORTED_FORMAT[val]
raise ConfigError, "Unsupported format '#{val}'" unless f
f
Expand Down
2 changes: 1 addition & 1 deletion lib/fluent/plugin/out_file.rb
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ class FileOutput < TimeSlicedOutput
desc "The Path of the file."
config_param :path, :string
desc "The format of the file content. The default is out_file."
config_param :format, :string, default: 'out_file'
config_param :format, :string, default: 'out_file', skip_accessor: true
desc "The flushed chunk is appended to existence file or not."
config_param :append, :bool, default: false
desc "Compress flushed file."
Expand Down
8 changes: 6 additions & 2 deletions lib/fluent/plugin_helper/event_loop.rb
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,11 @@ def event_loop_attach(watcher)
end

def event_loop_wait_until_start
::Thread.pass until event_loop_running?
sleep(0.1) until event_loop_running?
end

def event_loop_wait_until_stop
sleep(0.1) while event_loop_running?
end

def event_loop_running?
Expand Down Expand Up @@ -74,7 +78,7 @@ def shutdown
@_event_loop.watchers.each {|w| w.detach if w.attached? }
end
while @_event_loop_running
::Thread.pass
sleep 0.1
end

super
Expand Down
8 changes: 7 additions & 1 deletion lib/fluent/plugin_helper/thread.rb
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,13 @@ def thread_current_running?

def thread_wait_until_start
until @_threads_mutex.synchronize{ @_threads.values.reduce(true){|r,t| r && t[:_fluentd_plugin_helper_thread_started] } }
::Thread.pass
sleep 0.1
end
end

def thread_wait_until_stop
until @_threads_mutex.synchronize{ @_threads.values.reduce(true){|r,t| r && ![:_fluentd_plugin_helper_thread_running] } }
sleep 0.1
end
end

Expand Down
17 changes: 17 additions & 0 deletions lib/fluent/test/driver/base.rb
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,13 @@ def error_events(tag: nil)
def run(expect_emits: nil, expect_records: nil, timeout: nil, start: true, shutdown: true, &block)
instance_start if start

if @instance.respond_to?(:thread_wait_until_start)
@instance.thread_wait_until_start
end
if @instance.respond_to?(:event_loop_wait_until_start)
@instance.event_loop_wait_until_start
end

begin
run_actual(expect_emits: expect_emits, expect_records: expect_records, timeout: timeout, &block)
ensure
Expand All @@ -158,8 +165,18 @@ def instance_shutdown
@instance.stop unless @instance.stopped?
@instance.before_shutdown unless @instance.before_shutdown?
@instance.shutdown unless @instance.shutdown?

if @instance.respond_to?(:event_loop_wait_until_stop)
@instance.event_loop_wait_until_stop
end

@instance.after_shutdown unless @instance.after_shutdown?
@instance.close unless @instance.closed?

if @instance.respond_to?(:thread_wait_until_stop)
@instance.thread_wait_until_stop
end

@instance.terminate unless @instance.terminated?
end

Expand Down
1 change: 1 addition & 0 deletions lib/fluent/time.rb
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#

require 'time'
require 'msgpack'

module Fluent
class EventTime
Expand Down
2 changes: 1 addition & 1 deletion test/plugin/test_out_buffered_null.rb
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ class BufferedNullOutputTestCase < Test::Unit::TestCase
end

assert_equal 1, data.size
chunk_id, tag, binary = data.first
_, tag, binary = data.first
events = []
Fluent::MessagePackFactory.unpacker.feed_each(binary){|obj| events << obj }
assert_equal 'test', tag
Expand Down
28 changes: 14 additions & 14 deletions test/plugin_helper/test_timer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -30,16 +30,16 @@ class Dummy < Fluent::Plugin::TestBase
assert d1.timer_running?

counter = 0
d1.timer_execute(:test, 0.1) do
d1.timer_execute(:test, 1) do
counter += 1
end

sleep 0.55
sleep 5

d1.stop
assert !d1.timer_running?

assert{ counter >= 4 && counter <= 6 }
assert{ counter >= 3 && counter <= 6 }

d1.shutdown; d1.close; d1.terminate
end
Expand All @@ -52,19 +52,19 @@ class Dummy < Fluent::Plugin::TestBase
counter1 = 0
counter2 = 0

d1.timer_execute(:t1, 0.1) do
d1.timer_execute(:t1, 1) do
counter1 += 1
end
d1.timer_execute(:t2, 0.25) do
d1.timer_execute(:t2, 2) do
counter2 += 1
end

sleep 0.55
sleep 6

d1.stop

assert{ counter1 >= 4 && counter1 <= 6 }
assert{ counter2 == 2 }
assert{ counter1 >= 4 && counter1 <= 7 }
assert{ counter2 >= 2 && counter2 <= 4 }

d1.shutdown; d1.close; d1.terminate
end
Expand All @@ -77,20 +77,20 @@ class Dummy < Fluent::Plugin::TestBase
counter1 = 0
counter2 = 0

d1.timer_execute(:t1, 0.1) do
d1.timer_execute(:t1, 1) do
counter1 += 1
end
d1.timer_execute(:t2, 0.1) do
raise "abort!!!!!!" if counter2 > 2
d1.timer_execute(:t2, 1) do
raise "abort!!!!!!" if counter2 > 1
counter2 += 1
end

sleep 0.55
sleep 5

d1.stop

assert{ counter1 >= 4 && counter1 <= 6 }
assert{ counter2 == 3 }
assert{ counter1 >= 3 && counter1 <= 6 }
assert{ counter2 == 2 }
msg = "Unexpected error raised. Stopping the timer. title=:t2"
assert{ d1.log.out.logs.any?{|line| line.include?("[error]:") && line.include?(msg) && line.include?("abort!!!!!!") } }
assert{ d1.log.out.logs.any?{|line| line.include?("[error]:") && line.include?("Timer detached. title=:t2") } }
Expand Down

0 comments on commit 8c93d04

Please sign in to comment.