From d9cf4e6171bb792bdcc75ddf6194065f11d6df86 Mon Sep 17 00:00:00 2001 From: abicky Date: Fri, 1 Jan 2021 07:05:08 +0900 Subject: [PATCH 1/3] Make unixtime injection faster by avoiding method_missing call According to the following script, this change makes the injection 3.5x faster than before: ``` require 'fluent/event' require 'benchmark/ips' time_in_unix = Time.parse("2016-06-21 08:10:11 +0900").to_i time_subsecond = 320_101_224 time = Fluent::EventTime.new(time_in_unix, time_subsecond) Benchmark.ips do |x| puts "time.to_i: #{time.to_i}" puts "time.to_int: #{time.to_int}" x.report('time.to_i') { time.to_i } x.report('time.to_int') { time.to_int } x.compare! end ``` Signed-off-by: abicky --- lib/fluent/time.rb | 1 + 1 file changed, 1 insertion(+) diff --git a/lib/fluent/time.rb b/lib/fluent/time.rb index 5c56c953e1..9f52cbba27 100644 --- a/lib/fluent/time.rb +++ b/lib/fluent/time.rb @@ -50,6 +50,7 @@ def nsec def to_int @sec end + alias :to_i :to_int def to_f @sec + @nsec / 1_000_000_000.0 From 930ad5fcf6dbf34e91b9589830db79ff95b797b6 Mon Sep 17 00:00:00 2001 From: abicky Date: Fri, 1 Jan 2021 08:08:54 +0900 Subject: [PATCH 2/3] Make unixtime_millis injection faster According to the following script, this change makes the injection 1.3x faster than before: ``` require 'fluent/event' require 'benchmark/ips' before = ->(time) { time.to_f.floor(3) * 1000 } after = ->(time) do time.respond_to?(:nsec) ? time.to_int * 1000 + time.nsec / 1_000_000 : (time * 1000).floor end time_in_unix = Time.parse("2016-06-21 08:10:11 +0900").to_i time_subsecond = 320_101_224 time = Fluent::EventTime.new(time_in_unix, time_subsecond) Benchmark.ips do |x| puts "before: #{before.call(time)}" puts "after: #{after.call(time)}" x.report('before') { before.call(time) } x.report('after') { after.call(time) } x.compare! end ``` Signed-off-by: abicky --- lib/fluent/plugin_helper/inject.rb | 2 +- test/plugin_helper/test_inject.rb | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/lib/fluent/plugin_helper/inject.rb b/lib/fluent/plugin_helper/inject.rb index e9194adecc..d0fc8abfa5 100644 --- a/lib/fluent/plugin_helper/inject.rb +++ b/lib/fluent/plugin_helper/inject.rb @@ -132,7 +132,7 @@ def configure(conf) if @_inject_time_key @_inject_time_formatter = case @inject_config.time_type when :float then ->(time){ time.to_r.truncate(+6).to_f } # microsecond floating point value - when :unixtime_millis then ->(time) { time.to_f.floor(3) * 1000 } + when :unixtime_millis then ->(time) { time.respond_to?(:nsec) ? time.to_i * 1_000 + time.nsec / 1_000_000 : (time * 1_000).floor } when :unixtime then ->(time){ time.to_i } else localtime = @inject_config.localtime && !@inject_config.utc diff --git a/test/plugin_helper/test_inject.rb b/test/plugin_helper/test_inject.rb index 1f087cdf4b..e3d66b256a 100644 --- a/test/plugin_helper/test_inject.rb +++ b/test/plugin_helper/test_inject.rb @@ -187,6 +187,7 @@ def config_inject_section(hash = {}) record = {"key1" => "value1", "key2" => 2} assert_equal record.merge({"timedata" => unixtime_millis}), @d.inject_values_to_record('tag', time, record) + assert_equal record.merge({"timedata" => time_in_unix * 1_000}), @d.inject_values_to_record('tag', time_in_unix, record) end test 'injects time as unix time into specified key' do From e2da75264aed73accc165fc8c8d37fa5db35e650 Mon Sep 17 00:00:00 2001 From: abicky Date: Fri, 1 Jan 2021 08:13:40 +0900 Subject: [PATCH 3/3] Support unixtime_micros and unixtime_nanos to inject time If we collect docker container logs using the fluentd logging driver, we need to sort the logs by the time to merge logs from various containers. Although we can now specify unixtime_millis, logs with line feed sometimes have the same time in milliseconds. As a result, the logs are sorted in the wrong order. This commit will resolve the problem by introducing new time types of unixtime with higher precision. Signed-off-by: abicky --- lib/fluent/plugin_helper/inject.rb | 4 +++- test/plugin_helper/test_inject.rb | 28 ++++++++++++++++++++++++++++ 2 files changed, 31 insertions(+), 1 deletion(-) diff --git a/lib/fluent/plugin_helper/inject.rb b/lib/fluent/plugin_helper/inject.rb index d0fc8abfa5..6960ee45a6 100644 --- a/lib/fluent/plugin_helper/inject.rb +++ b/lib/fluent/plugin_helper/inject.rb @@ -76,7 +76,7 @@ module InjectParams config_param :time_key, :string, default: nil # To avoid defining :time_type twice - config_param :time_type, :enum, list: [:float, :unixtime, :unixtime_millis, :string], default: :float + config_param :time_type, :enum, list: [:float, :unixtime, :unixtime_millis, :unixtime_micros, :unixtime_nanos, :string], default: :float Fluent::TimeMixin::TIME_PARAMETERS.each do |name, type, opts| config_param(name, type, **opts) @@ -133,6 +133,8 @@ def configure(conf) @_inject_time_formatter = case @inject_config.time_type when :float then ->(time){ time.to_r.truncate(+6).to_f } # microsecond floating point value when :unixtime_millis then ->(time) { time.respond_to?(:nsec) ? time.to_i * 1_000 + time.nsec / 1_000_000 : (time * 1_000).floor } + when :unixtime_micros then ->(time) { time.respond_to?(:nsec) ? time.to_i * 1_000_000 + time.nsec / 1_000 : (time * 1_000_000).floor } + when :unixtime_nanos then ->(time) { time.respond_to?(:nsec) ? time.to_i * 1_000_000_000 + time.nsec : (time * 1_000_000_000).floor } when :unixtime then ->(time){ time.to_i } else localtime = @inject_config.localtime && !@inject_config.utc diff --git a/test/plugin_helper/test_inject.rb b/test/plugin_helper/test_inject.rb index e3d66b256a..b17f63f5eb 100644 --- a/test/plugin_helper/test_inject.rb +++ b/test/plugin_helper/test_inject.rb @@ -190,6 +190,34 @@ def config_inject_section(hash = {}) assert_equal record.merge({"timedata" => time_in_unix * 1_000}), @d.inject_values_to_record('tag', time_in_unix, record) end + test 'injects time as unix time micros into specified key' do + time_in_unix = Time.parse("2016-06-21 08:10:11 +0900").to_i + time_subsecond = 320_101_224 + time = Fluent::EventTime.new(time_in_unix, time_subsecond) + unixtime_micros = 1466464211320101 + + @d.configure(config_inject_section("time_key" => "timedata", "time_type" => "unixtime_micros")) + @d.start + + record = {"key1" => "value1", "key2" => 2} + assert_equal record.merge({"timedata" => unixtime_micros}), @d.inject_values_to_record('tag', time, record) + assert_equal record.merge({"timedata" => time_in_unix * 1_000_000}), @d.inject_values_to_record('tag', time_in_unix, record) + end + + test 'injects time as unix time nanos into specified key' do + time_in_unix = Time.parse("2016-06-21 08:10:11 +0900").to_i + time_subsecond = 320_101_224 + time = Fluent::EventTime.new(time_in_unix, time_subsecond) + unixtime_nanos = 1466464211320101224 + + @d.configure(config_inject_section("time_key" => "timedata", "time_type" => "unixtime_nanos")) + @d.start + + record = {"key1" => "value1", "key2" => 2} + assert_equal record.merge({"timedata" => unixtime_nanos}), @d.inject_values_to_record('tag', time, record) + assert_equal record.merge({"timedata" => time_in_unix * 1_000_000_000}), @d.inject_values_to_record('tag', time_in_unix, record) + end + test 'injects time as unix time into specified key' do time_in_unix = Time.parse("2016-06-21 08:10:11 +0900").to_i time_subsecond = 320_101_224