diff --git a/lib/fluent/plugin/output.rb b/lib/fluent/plugin/output.rb index 3d64839264..0da22cb5bc 100644 --- a/lib/fluent/plugin/output.rb +++ b/lib/fluent/plugin/output.rb @@ -304,6 +304,10 @@ def configure(conf) raise Fluent::ConfigError, " argument includes 'time', but timekey is not configured" unless @buffer_config.timekey Fluent::Timezone.validate!(@buffer_config.timekey_zone) @timekey_zone = @buffer_config.timekey_use_utc ? '+0000' : @buffer_config.timekey_zone + @timekey = @buffer_config.timekey + @timekey_use_utc = @buffer_config.timekey_use_utc + @offset = Fluent::Timezone.utc_offset(@timekey_zone) + @calculate_offset = @offset.respond_to?(:call) ? @offset : nil @output_time_formatter_cache = {} end @@ -803,20 +807,17 @@ def metadata(tag, time, record) if !@chunk_key_time && !@chunk_key_tag @buffer.metadata() elsif @chunk_key_time && @chunk_key_tag - time_int = time.to_i - timekey = (time_int - (time_int % @buffer_config.timekey)).to_i + timekey = calculate_timekey(time) @buffer.metadata(timekey: timekey, tag: tag) elsif @chunk_key_time - time_int = time.to_i - timekey = (time_int - (time_int % @buffer_config.timekey)).to_i + timekey = calculate_timekey(time) @buffer.metadata(timekey: timekey) else @buffer.metadata(tag: tag) end else timekey = if @chunk_key_time - time_int = time.to_i - (time_int - (time_int % @buffer_config.timekey)).to_i + calculate_timekey(time) else nil end @@ -825,6 +826,16 @@ def metadata(tag, time, record) end end + def calculate_timekey(time) + time_int = time.to_i + if @timekey_use_utc + (time_int - (time_int % @timekey)).to_i + else + offset = @calculate_offset ? @calculate_offset.call(time) : @offset + (time_int - ((time_int + offset)% @timekey)).to_i + end + end + def chunk_for_test(tag, time, record) require 'fluent/plugin/buffer/memory_chunk' diff --git a/lib/fluent/timezone.rb b/lib/fluent/timezone.rb index bb6fa86893..53a155153b 100644 --- a/lib/fluent/timezone.rb +++ b/lib/fluent/timezone.rb @@ -139,5 +139,19 @@ def self.formatter(timezone = nil, format = nil) return nil end + + def self.utc_offset(timezone) + return 0 if timezone.nil? + + case timezone + when NUMERIC_PATTERN + Time.zone_offset(timezone) + when NAME_PATTERN + tz = TZInfo::Timezone.get(timezone) + ->(time) { + tz.period_for_utc(time).utc_total_offset + } + end + end end end diff --git a/test/plugin/test_out_file.rb b/test/plugin/test_out_file.rb index 3cc745f751..0ad30add74 100644 --- a/test/plugin/test_out_file.rb +++ b/test/plugin/test_out_file.rb @@ -561,6 +561,79 @@ def parse_system(text) check_gzipped_result(path, formatted_lines * 3) end + test 'append when JST' do + with_timezone(Fluent.windows? ? "JST-9" : "Asia/Tokyo") do + time = event_time("2011-01-02 03:14:15+09:00") + formatted_lines = %[2011-01-02T03:14:15+09:00\ttest\t{"a":1}\n] + %[2011-01-02T03:14:15+09:00\ttest\t{"a":2}\n] + + write_once = ->(){ + d = create_driver %[ + path #{TMP_DIR}/out_file_test + compress gz + append true + + timekey_use_utc false + timekey_zone Asia/Tokyo + + ] + d.run(default_tag: 'test'){ + d.feed(time, {"a"=>1}) + d.feed(time, {"a"=>2}) + } + d.instance.last_written_path + } + + path = write_once.call + assert_equal "#{TMP_DIR}/out_file_test.20110102.log.gz", path + check_gzipped_result(path, formatted_lines) + + path = write_once.call + assert_equal "#{TMP_DIR}/out_file_test.20110102.log.gz", path + check_gzipped_result(path, formatted_lines * 2) + + path = write_once.call + assert_equal "#{TMP_DIR}/out_file_test.20110102.log.gz", path + check_gzipped_result(path, formatted_lines * 3) + end + end + + test 'append when UTC-02 but timekey_zone is +0900' do + with_timezone("UTC-02") do # +0200 + time = event_time("2011-01-02 17:14:15+02:00") + formatted_lines = %[2011-01-02T17:14:15+02:00\ttest\t{"a":1}\n] + %[2011-01-02T17:14:15+02:00\ttest\t{"a":2}\n] + + write_once = ->(){ + d = create_driver %[ + path #{TMP_DIR}/out_file_test + compress gz + append true + + timekey_use_utc false + timekey_zone +0900 + + ] + d.run(default_tag: 'test'){ + d.feed(time, {"a"=>1}) + d.feed(time, {"a"=>2}) + } + d.instance.last_written_path + } + + path = write_once.call + # Rotated at 2011-01-02 17:00:00+02:00 + assert_equal "#{TMP_DIR}/out_file_test.20110103.log.gz", path + check_gzipped_result(path, formatted_lines) + + path = write_once.call + assert_equal "#{TMP_DIR}/out_file_test.20110103.log.gz", path + check_gzipped_result(path, formatted_lines * 2) + + path = write_once.call + assert_equal "#{TMP_DIR}/out_file_test.20110103.log.gz", path + check_gzipped_result(path, formatted_lines * 3) + end + end + test '${chunk_id}' do time = event_time("2011-01-02 13:14:15 UTC") formatted_lines = %[2011-01-02T13:14:15Z\ttest\t{"a":1}\n] + %[2011-01-02T13:14:15Z\ttest\t{"a":2}\n]