Skip to content

Commit

Permalink
Merge pull request #2557 from ganmacs/use-process-get-clock
Browse files Browse the repository at this point in the history
Use faster way to get sec and nsec
  • Loading branch information
repeatedly authored Aug 16, 2019
2 parents 2ad5b6d + 809af50 commit 2110b44
Show file tree
Hide file tree
Showing 8 changed files with 74 additions and 14 deletions.
4 changes: 4 additions & 0 deletions lib/fluent/clock.rb
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,10 @@ def self.now_raw
Process.clock_gettime(CLOCK_ID)
end

def self.real_now(unit = :second)
Process.clock_gettime(Process::CLOCK_REALTIME, unit)
end

def self.dst_clock_from_time(time)
diff_sec = Time.now - time
now_raw - diff_sec
Expand Down
24 changes: 21 additions & 3 deletions lib/fluent/plugin/buffer/chunk.rb
Original file line number Diff line number Diff line change
Expand Up @@ -57,13 +57,31 @@ def initialize(metadata, compress: :text)
@state = :unstaged

@size = 0
@created_at = Time.now
@modified_at = Time.now
@created_at = Fluent::Clock.real_now
@modified_at = Fluent::Clock.real_now

extend Decompressable if compress == :gzip
end

attr_reader :unique_id, :metadata, :created_at, :modified_at, :state
attr_reader :unique_id, :metadata, :state

def raw_create_at
@created_at
end

def raw_modified_at
@modified_at
end

# for compatibility
def created_at
@created_at_object ||= Time.at(@created_at)
end

# for compatibility
def modified_at
@modified_at_object ||= Time.at(@modified_at)
end

# data is array of formatted record string
def append(data, **kwargs)
Expand Down
17 changes: 9 additions & 8 deletions lib/fluent/plugin/buffer/file_chunk.rb
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,8 @@ def commit
@size += @adding_size
@bytesize += @adding_bytes
@adding_bytes = @adding_size = 0
@modified_at = Time.now
@modified_at = Fluent::Clock.real_now
@modified_at_object = nil

true
end
Expand Down Expand Up @@ -216,12 +217,12 @@ def self.unique_id_from_path(path)
def restore_metadata(bindata)
data = msgpack_unpacker(symbolize_keys: true).feed(bindata).read rescue {}

now = Time.now
now = Fluent::Clock.real_now

@unique_id = data[:id] || self.class.unique_id_from_path(@path) || @unique_id
@size = data[:s] || 0
@created_at = Time.at(data.fetch(:c, now.to_i))
@modified_at = Time.at(data.fetch(:m, now.to_i))
@created_at = data.fetch(:c, now.to_i)
@modified_at = data.fetch(:m, now.to_i)

@metadata.timekey = data[:timekey]
@metadata.tag = data[:tag]
Expand All @@ -231,8 +232,8 @@ def restore_metadata(bindata)
def restore_metadata_partially(chunk)
@unique_id = self.class.unique_id_from_path(chunk.path) || @unique_id
@size = 0
@created_at = chunk.ctime # birthtime isn't supported on Windows (and Travis?)
@modified_at = chunk.mtime
@created_at = chunk.ctime.to_i # birthtime isn't supported on Windows (and Travis?)
@modified_at = chunk.mtime.to_i

@metadata.timekey = nil
@metadata.tag = nil
Expand All @@ -243,8 +244,8 @@ def write_metadata(update: true)
data = @metadata.to_h.merge({
id: @unique_id,
s: (update ? @size + @adding_size : @size),
c: @created_at.to_i,
m: (update ? Time.now : @modified_at).to_i,
c: @created_at,
m: (update ? Fluent::Clock.real_now : @modified_at),
})
bin = msgpack_packer.pack(data).to_s
@meta.seek(0, IO::SEEK_SET)
Expand Down
3 changes: 2 additions & 1 deletion lib/fluent/plugin/buffer/memory_chunk.rb
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ def commit
@chunk_bytes += @adding_bytes

@adding_bytes = @adding_size = 0
@modified_at = Time.now
@modified_at = Fluent::Clock.real_now
@modified_at_object = nil
true
end

Expand Down
2 changes: 1 addition & 1 deletion lib/fluent/plugin/output.rb
Original file line number Diff line number Diff line change
Expand Up @@ -1376,7 +1376,7 @@ def enqueue_thread_run
# This block should be done by integer values.
# If both of flush_interval & flush_thread_interval are 1s, expected actual flush timing is 1.5s.
# If we use integered values for this comparison, expected actual flush timing is 1.0s.
@buffer.enqueue_all{ |metadata, chunk| chunk.created_at.to_i + flush_interval <= now_int }
@buffer.enqueue_all{ |metadata, chunk| chunk.raw_create_at + flush_interval <= now_int }
end

if @chunk_key_time
Expand Down
4 changes: 3 additions & 1 deletion lib/fluent/time.rb
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,9 @@ def self.eq?(a, b)
end

def self.now
from_time(Time.now)
# This method is called many time. so call Process.clock_gettime directly instead of Fluent::Clock.real_now
now = Process.clock_gettime(Process::CLOCK_REALTIME, :nanosecond)
Fluent::EventTime.new(now / 1_000_000_000, now % 1_000_000_000)
end

def self.parse(*args)
Expand Down
1 change: 1 addition & 0 deletions test/helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ def to_masked_element
require 'fluent/time'
require 'serverengine'
require 'helpers/fuzzy_assert'
require 'helpers/process_extenstion'

module Fluent
module Plugin
Expand Down
33 changes: 33 additions & 0 deletions test/helpers/process_extenstion.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
require 'timecop'

module Process
class << self
alias_method :clock_gettime_original, :clock_gettime

def clock_gettime(clock_id, unit = :float_second)
# now only support CLOCK_REALTIME
if Process::CLOCK_REALTIME == clock_id
t = Time.now

case unit
when :float_second
t.to_i + t.nsec / 1_000_000_000.0
when :float_millisecond
t.to_i * 1_000 + t.nsec / 1_000_000.0
when :float_microsecond
t.to_i * 1_000_000 + t.nsec / 1_000.0
when :second
t.to_i
when :millisecond
t.to_i * 1000 + t.nsec / 1_000_000
when :microsecond
t.to_i * 1_000_000 + t.nsec / 1_000
when :nanosecond
t.to_i * 1_000_000_000 + t.nsec
end
else
Process.clock_gettime_original(clock_id, unit)
end
end
end
end

0 comments on commit 2110b44

Please sign in to comment.