Skip to content

Commit

Permalink
chunk have create_at and modified as integer internally
Browse files Browse the repository at this point in the history
Signed-off-by: Yuta Iwama <[email protected]>
  • Loading branch information
ganmacs committed Aug 13, 2019
1 parent 7811da6 commit 6076a3e
Show file tree
Hide file tree
Showing 4 changed files with 32 additions and 13 deletions.
24 changes: 21 additions & 3 deletions lib/fluent/plugin/buffer/chunk.rb
Original file line number Diff line number Diff line change
Expand Up @@ -57,13 +57,31 @@ def initialize(metadata, compress: :text)
@state = :unstaged

@size = 0
@created_at = Time.now
@modified_at = Time.now
@created_at = Fluent::Clock.real_now
@modified_at = Fluent::Clock.real_now

extend Decompressable if compress == :gzip
end

attr_reader :unique_id, :metadata, :created_at, :modified_at, :state
attr_reader :unique_id, :metadata, :state

def raw_create_at
@created_at
end

def raw_modified_at
@modified_at
end

# for compatibility
def created_at
@created_at_object ||= Time.at(@created_at)
end

# for compatibility
def modified_at
@modified_at_object ||= Time.at(@created_at)
end

# data is array of formatted record string
def append(data, **kwargs)
Expand Down
16 changes: 8 additions & 8 deletions lib/fluent/plugin/buffer/file_chunk.rb
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ def commit
@size += @adding_size
@bytesize += @adding_bytes
@adding_bytes = @adding_size = 0
@modified_at = Time.now
@modified_at = Fluent::Clock.real_now

true
end
Expand Down Expand Up @@ -216,12 +216,12 @@ def self.unique_id_from_path(path)
def restore_metadata(bindata)
data = msgpack_unpacker(symbolize_keys: true).feed(bindata).read rescue {}

now = Time.now
now = Fluent::Clock.real_now

@unique_id = data[:id] || self.class.unique_id_from_path(@path) || @unique_id
@size = data[:s] || 0
@created_at = Time.at(data.fetch(:c, now.to_i))
@modified_at = Time.at(data.fetch(:m, now.to_i))
@created_at = data.fetch(:c, now.to_i)
@modified_at = data.fetch(:m, now.to_i)

@metadata.timekey = data[:timekey]
@metadata.tag = data[:tag]
Expand All @@ -231,8 +231,8 @@ def restore_metadata(bindata)
def restore_metadata_partially(chunk)
@unique_id = self.class.unique_id_from_path(chunk.path) || @unique_id
@size = 0
@created_at = chunk.ctime # birthtime isn't supported on Windows (and Travis?)
@modified_at = chunk.mtime
@created_at = chunk.ctime.to_i # birthtime isn't supported on Windows (and Travis?)
@modified_at = chunk.mtime.to_i

@metadata.timekey = nil
@metadata.tag = nil
Expand All @@ -243,8 +243,8 @@ def write_metadata(update: true)
data = @metadata.to_h.merge({
id: @unique_id,
s: (update ? @size + @adding_size : @size),
c: @created_at.to_i,
m: (update ? Time.now : @modified_at).to_i,
c: @created_at,
m: (update ? Fluent::Clock.real_now : @modified_at),
})
bin = msgpack_packer.pack(data).to_s
@meta.seek(0, IO::SEEK_SET)
Expand Down
3 changes: 2 additions & 1 deletion lib/fluent/plugin/buffer/memory_chunk.rb
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ def commit
@chunk_bytes += @adding_bytes

@adding_bytes = @adding_size = 0
@modified_at = Time.now
@modified_at = Fluent::Clock.real_now
@modified_at_object = nil
true
end

Expand Down
2 changes: 1 addition & 1 deletion lib/fluent/plugin/output.rb
Original file line number Diff line number Diff line change
Expand Up @@ -1376,7 +1376,7 @@ def enqueue_thread_run
# This block should be done by integer values.
# If both of flush_interval & flush_thread_interval are 1s, expected actual flush timing is 1.5s.
# If we use integered values for this comparison, expected actual flush timing is 1.0s.
@buffer.enqueue_all{ |metadata, chunk| chunk.created_at.to_i + flush_interval <= now_int }
@buffer.enqueue_all{ |metadata, chunk| chunk.raw_create_at + flush_interval <= now_int }
end

if @chunk_key_time
Expand Down

0 comments on commit 6076a3e

Please sign in to comment.