From 3eb5d7616040c6d2307645a2e2d553c2370ea56a Mon Sep 17 00:00:00 2001 From: Masahiro Nakagawa Date: Tue, 12 May 2020 01:35:12 +0900 Subject: [PATCH 1/2] Use EventTime.now instead of Engine.now Signed-off-by: Masahiro Nakagawa --- lib/fluent/plugin/in_dummy.rb | 4 ++-- lib/fluent/plugin/in_forward.rb | 4 ++-- lib/fluent/plugin/in_http.rb | 4 ++-- lib/fluent/plugin/in_monitor_agent.rb | 2 +- lib/fluent/plugin/in_unix.rb | 4 ++-- lib/fluent/plugin/out_file.rb | 2 +- lib/fluent/plugin/out_forward.rb | 2 +- lib/fluent/test/filter_test.rb | 4 ++-- lib/fluent/test/output_test.rb | 6 +++--- 9 files changed, 16 insertions(+), 16 deletions(-) diff --git a/lib/fluent/plugin/in_dummy.rb b/lib/fluent/plugin/in_dummy.rb index 464cfdd8b3..9338a59e72 100644 --- a/lib/fluent/plugin/in_dummy.rb +++ b/lib/fluent/plugin/in_dummy.rb @@ -105,10 +105,10 @@ def emit(num) begin if @size > 1 num.times do - router.emit_array(@tag, Array.new(@size) { [Fluent::Engine.now, generate] }) + router.emit_array(@tag, Array.new(@size) { [Fluent::EventTime.now, generate] }) end else - num.times { router.emit(@tag, Fluent::Engine.now, generate) } + num.times { router.emit(@tag, Fluent::EventTime.now, generate) } end rescue => _ # ignore all errors not to stop emits by emit errors diff --git a/lib/fluent/plugin/in_forward.rb b/lib/fluent/plugin/in_forward.rb index d9e7f5d2ad..cd3d029f18 100644 --- a/lib/fluent/plugin/in_forward.rb +++ b/lib/fluent/plugin/in_forward.rb @@ -327,7 +327,7 @@ def on_message(msg, chunk_size, conn) record = e[1] next if record.nil? time = e[0] - time = Fluent::Engine.now if time.nil? || time.to_i == 0 # `to_i == 0` for empty EventTime + time = Fluent::EventTime.now if time.nil? || time.to_i == 0 # `to_i == 0` for empty EventTime es.add(time, record) } es @@ -347,7 +347,7 @@ def on_message(msg, chunk_size, conn) return msg[3] # retry never succeeded so return ack and drop incoming event. end return if record.nil? - time = Fluent::Engine.now if time.to_i == 0 + time = Fluent::EventTime.now if time.to_i == 0 if @enable_field_injection record[@source_address_key] = conn.remote_addr if @source_address_key record[@source_hostname_key] = conn.remote_host if @source_hostname_key diff --git a/lib/fluent/plugin/in_http.rb b/lib/fluent/plugin/in_http.rb index 5ac6f2249d..ed88f855ac 100644 --- a/lib/fluent/plugin/in_http.rb +++ b/lib/fluent/plugin/in_http.rb @@ -176,9 +176,9 @@ def on_request(path_info, params) end time = if param_time = params['time'] param_time = param_time.to_f - param_time.zero? ? Fluent::Engine.now : @float_time_parser.parse(param_time) + param_time.zero? ? Fluent::EventTime.now : @float_time_parser.parse(param_time) else - record_time.nil? ? Fluent::Engine.now : record_time + record_time.nil? ? Fluent::EventTime.now : record_time end rescue return ["400 Bad Request", {'Content-Type'=>'text/plain'}, "400 Bad Request\n#{$!}\n"] diff --git a/lib/fluent/plugin/in_monitor_agent.rb b/lib/fluent/plugin/in_monitor_agent.rb index 0f7f03897f..9eefc83617 100644 --- a/lib/fluent/plugin/in_monitor_agent.rb +++ b/lib/fluent/plugin/in_monitor_agent.rb @@ -223,7 +223,7 @@ def start opts = {with_config: false, with_retry: false} timer_execute(:in_monitor_agent_emit, @emit_interval, repeat: true) { es = Fluent::MultiEventStream.new - now = Fluent::Engine.now + now = Fluent::EventTime.now plugins_info_all(opts).each { |record| es.add(now, record) } diff --git a/lib/fluent/plugin/in_unix.rb b/lib/fluent/plugin/in_unix.rb index 4e3c01a55f..f2998f91da 100644 --- a/lib/fluent/plugin/in_unix.rb +++ b/lib/fluent/plugin/in_unix.rb @@ -95,7 +95,7 @@ def on_message(msg) record = e[1] next if record.nil? time = e[0] - time = (now ||= Engine.now) if time.to_i == 0 + time = (now ||= EventTime.now) if time.to_i == 0 es.add(time, record) } router.emit_stream(tag, es) @@ -106,7 +106,7 @@ def on_message(msg) return if record.nil? time = msg[1] - time = Engine.now if time.to_i == 0 + time = EventTime.now if time.to_i == 0 router.emit(tag, time, record) end end diff --git a/lib/fluent/plugin/out_file.rb b/lib/fluent/plugin/out_file.rb index 494c039c7a..fe7dca4797 100644 --- a/lib/fluent/plugin/out_file.rb +++ b/lib/fluent/plugin/out_file.rb @@ -155,7 +155,7 @@ def configure(conf) dummy_record_keys = get_placeholders_keys(@path_template) || ['message'] dummy_record = Hash[dummy_record_keys.zip(['data'] * dummy_record_keys.size)] - test_chunk1 = chunk_for_test(dummy_tag, Fluent::Engine.now, dummy_record) + test_chunk1 = chunk_for_test(dummy_tag, Fluent::EventTime.now, dummy_record) test_path = extract_placeholders(@path_template, test_chunk1) unless ::Fluent::FileUtil.writable_p?(test_path) raise Fluent::ConfigError, "out_file: `#{test_path}` is not writable" diff --git a/lib/fluent/plugin/out_forward.rb b/lib/fluent/plugin/out_forward.rb index 2cde0b3223..38f6b1b5d5 100644 --- a/lib/fluent/plugin/out_forward.rb +++ b/lib/fluent/plugin/out_forward.rb @@ -709,7 +709,7 @@ def resolved_host @resolved_host ||= resolve_dns! else - now = Fluent::Engine.now + now = Fluent::EventTime.now rh = @resolved_host if !rh || now - @resolved_time >= @sender.expire_dns_cache rh = @resolved_host = resolve_dns! diff --git a/lib/fluent/test/filter_test.rb b/lib/fluent/test/filter_test.rb index 88cca149d6..7495a268d9 100644 --- a/lib/fluent/test/filter_test.rb +++ b/lib/fluent/test/filter_test.rb @@ -30,12 +30,12 @@ def initialize(klass, tag = 'filter.test', &block) attr_reader :filtered attr_accessor :tag - def emit(record, time = Engine.now) + def emit(record, time = EventTime.now) emit_with_tag(@tag, record, time) end alias_method :filter, :emit - def emit_with_tag(tag, record, time = Engine.now) + def emit_with_tag(tag, record, time = EventTime.now) @events[tag] ||= MultiEventStream.new @events[tag].add(time, record) end diff --git a/lib/fluent/test/output_test.rb b/lib/fluent/test/output_test.rb index 4c0eb11652..b7ff3fd37a 100644 --- a/lib/fluent/test/output_test.rb +++ b/lib/fluent/test/output_test.rb @@ -41,7 +41,7 @@ def initialize(klass, tag='test', &block) attr_accessor :tag - def emit(record, time=Engine.now) + def emit(record, time=EventTime.now) es = OneEventStream.new(time, record) @instance.emit_events(@tag, es) end @@ -62,7 +62,7 @@ def @instance.buffer attr_accessor :tag - def emit(record, time=Engine.now) + def emit(record, time=EventTime.now) @entries << [time, record] self end @@ -110,7 +110,7 @@ def initialize(klass, tag='test', &block) attr_accessor :tag - def emit(record, time=Engine.now) + def emit(record, time=EventTime.now) @entries << [time, record] self end From 02037cccbf8d20757d9f803d6f1966edf2b3ffe0 Mon Sep 17 00:00:00 2001 From: Masahiro Nakagawa Date: Tue, 12 May 2020 09:54:31 +0900 Subject: [PATCH 2/2] Fix Engine.now based test Signed-off-by: Masahiro Nakagawa --- test/plugin/test_in_unix.rb | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/test/plugin/test_in_unix.rb b/test/plugin/test_in_unix.rb index 3c0b3108b5..d922203d8f 100644 --- a/test/plugin/test_in_unix.rb +++ b/test/plugin/test_in_unix.rb @@ -11,14 +11,13 @@ def test_time d = create_driver time = Fluent::EventTime.parse("2011-01-02 13:14:15 UTC") - Fluent::Engine.now = time d.expect_emit "tag1", time, {"a"=>1} d.expect_emit "tag2", time, {"a"=>2} d.run do d.expected_emits.each {|tag,_time,record| - send_data Fluent::MessagePackFactory.msgpack_packer.write([tag, 0, record]).to_s + send_data Fluent::MessagePackFactory.msgpack_packer.write([tag, _time, record]).to_s } end end