From 6076a3e7657854753db904584f11f8ded67deb1f Mon Sep 17 00:00:00 2001 From: Yuta Iwama Date: Tue, 13 Aug 2019 23:32:04 +0900 Subject: [PATCH] chunk have create_at and modified as integer internally Signed-off-by: Yuta Iwama --- lib/fluent/plugin/buffer/chunk.rb | 24 +++++++++++++++++++++--- lib/fluent/plugin/buffer/file_chunk.rb | 16 ++++++++-------- lib/fluent/plugin/buffer/memory_chunk.rb | 3 ++- lib/fluent/plugin/output.rb | 2 +- 4 files changed, 32 insertions(+), 13 deletions(-) diff --git a/lib/fluent/plugin/buffer/chunk.rb b/lib/fluent/plugin/buffer/chunk.rb index dc6b9f40bd..050651f171 100644 --- a/lib/fluent/plugin/buffer/chunk.rb +++ b/lib/fluent/plugin/buffer/chunk.rb @@ -57,13 +57,31 @@ def initialize(metadata, compress: :text) @state = :unstaged @size = 0 - @created_at = Time.now - @modified_at = Time.now + @created_at = Fluent::Clock.real_now + @modified_at = Fluent::Clock.real_now extend Decompressable if compress == :gzip end - attr_reader :unique_id, :metadata, :created_at, :modified_at, :state + attr_reader :unique_id, :metadata, :state + + def raw_create_at + @created_at + end + + def raw_modified_at + @modified_at + end + + # for compatibility + def created_at + @created_at_object ||= Time.at(@created_at) + end + + # for compatibility + def modified_at + @modified_at_object ||= Time.at(@created_at) + end # data is array of formatted record string def append(data, **kwargs) diff --git a/lib/fluent/plugin/buffer/file_chunk.rb b/lib/fluent/plugin/buffer/file_chunk.rb index 9f2708ec3b..341811f376 100644 --- a/lib/fluent/plugin/buffer/file_chunk.rb +++ b/lib/fluent/plugin/buffer/file_chunk.rb @@ -74,7 +74,7 @@ def commit @size += @adding_size @bytesize += @adding_bytes @adding_bytes = @adding_size = 0 - @modified_at = Time.now + @modified_at = Fluent::Clock.real_now true end @@ -216,12 +216,12 @@ def self.unique_id_from_path(path) def restore_metadata(bindata) data = msgpack_unpacker(symbolize_keys: true).feed(bindata).read rescue {} - now = Time.now + now = Fluent::Clock.real_now @unique_id = data[:id] || self.class.unique_id_from_path(@path) || @unique_id @size = data[:s] || 0 - @created_at = Time.at(data.fetch(:c, now.to_i)) - @modified_at = Time.at(data.fetch(:m, now.to_i)) + @created_at = data.fetch(:c, now.to_i) + @modified_at = data.fetch(:m, now.to_i) @metadata.timekey = data[:timekey] @metadata.tag = data[:tag] @@ -231,8 +231,8 @@ def restore_metadata(bindata) def restore_metadata_partially(chunk) @unique_id = self.class.unique_id_from_path(chunk.path) || @unique_id @size = 0 - @created_at = chunk.ctime # birthtime isn't supported on Windows (and Travis?) - @modified_at = chunk.mtime + @created_at = chunk.ctime.to_i # birthtime isn't supported on Windows (and Travis?) + @modified_at = chunk.mtime.to_i @metadata.timekey = nil @metadata.tag = nil @@ -243,8 +243,8 @@ def write_metadata(update: true) data = @metadata.to_h.merge({ id: @unique_id, s: (update ? @size + @adding_size : @size), - c: @created_at.to_i, - m: (update ? Time.now : @modified_at).to_i, + c: @created_at, + m: (update ? Fluent::Clock.real_now : @modified_at), }) bin = msgpack_packer.pack(data).to_s @meta.seek(0, IO::SEEK_SET) diff --git a/lib/fluent/plugin/buffer/memory_chunk.rb b/lib/fluent/plugin/buffer/memory_chunk.rb index ddb16e4a65..556c8c8a3d 100644 --- a/lib/fluent/plugin/buffer/memory_chunk.rb +++ b/lib/fluent/plugin/buffer/memory_chunk.rb @@ -43,7 +43,8 @@ def commit @chunk_bytes += @adding_bytes @adding_bytes = @adding_size = 0 - @modified_at = Time.now + @modified_at = Fluent::Clock.real_now + @modified_at_object = nil true end diff --git a/lib/fluent/plugin/output.rb b/lib/fluent/plugin/output.rb index be44e3990b..68b038b7ea 100644 --- a/lib/fluent/plugin/output.rb +++ b/lib/fluent/plugin/output.rb @@ -1376,7 +1376,7 @@ def enqueue_thread_run # This block should be done by integer values. # If both of flush_interval & flush_thread_interval are 1s, expected actual flush timing is 1.5s. # If we use integered values for this comparison, expected actual flush timing is 1.0s. - @buffer.enqueue_all{ |metadata, chunk| chunk.created_at.to_i + flush_interval <= now_int } + @buffer.enqueue_all{ |metadata, chunk| chunk.raw_create_at + flush_interval <= now_int } end if @chunk_key_time