Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use faster way to get sec and nsec #2557

Merged
merged 4 commits into from
Aug 16, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions lib/fluent/clock.rb
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,10 @@ def self.now_raw
Process.clock_gettime(CLOCK_ID)
end

def self.real_now(unit = :second)
Process.clock_gettime(Process::CLOCK_REALTIME, unit)
end

def self.dst_clock_from_time(time)
diff_sec = Time.now - time
now_raw - diff_sec
Expand Down
24 changes: 21 additions & 3 deletions lib/fluent/plugin/buffer/chunk.rb
Original file line number Diff line number Diff line change
Expand Up @@ -57,13 +57,31 @@ def initialize(metadata, compress: :text)
@state = :unstaged

@size = 0
@created_at = Time.now
@modified_at = Time.now
@created_at = Fluent::Clock.real_now
@modified_at = Fluent::Clock.real_now

extend Decompressable if compress == :gzip
end

attr_reader :unique_id, :metadata, :created_at, :modified_at, :state
attr_reader :unique_id, :metadata, :state

def raw_create_at
@created_at
end

def raw_modified_at
@modified_at
end

# for compatibility
def created_at
@created_at_object ||= Time.at(@created_at)
end

# for compatibility
def modified_at
@modified_at_object ||= Time.at(@modified_at)
end

# data is array of formatted record string
def append(data, **kwargs)
Expand Down
17 changes: 9 additions & 8 deletions lib/fluent/plugin/buffer/file_chunk.rb
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,8 @@ def commit
@size += @adding_size
@bytesize += @adding_bytes
@adding_bytes = @adding_size = 0
@modified_at = Time.now
@modified_at = Fluent::Clock.real_now
@modified_at_object = nil

true
end
Expand Down Expand Up @@ -216,12 +217,12 @@ def self.unique_id_from_path(path)
def restore_metadata(bindata)
data = msgpack_unpacker(symbolize_keys: true).feed(bindata).read rescue {}

now = Time.now
now = Fluent::Clock.real_now

@unique_id = data[:id] || self.class.unique_id_from_path(@path) || @unique_id
@size = data[:s] || 0
@created_at = Time.at(data.fetch(:c, now.to_i))
@modified_at = Time.at(data.fetch(:m, now.to_i))
@created_at = data.fetch(:c, now.to_i)
@modified_at = data.fetch(:m, now.to_i)

@metadata.timekey = data[:timekey]
@metadata.tag = data[:tag]
Expand All @@ -231,8 +232,8 @@ def restore_metadata(bindata)
def restore_metadata_partially(chunk)
@unique_id = self.class.unique_id_from_path(chunk.path) || @unique_id
@size = 0
@created_at = chunk.ctime # birthtime isn't supported on Windows (and Travis?)
@modified_at = chunk.mtime
@created_at = chunk.ctime.to_i # birthtime isn't supported on Windows (and Travis?)
@modified_at = chunk.mtime.to_i

@metadata.timekey = nil
@metadata.tag = nil
Expand All @@ -243,8 +244,8 @@ def write_metadata(update: true)
data = @metadata.to_h.merge({
id: @unique_id,
s: (update ? @size + @adding_size : @size),
c: @created_at.to_i,
m: (update ? Time.now : @modified_at).to_i,
c: @created_at,
m: (update ? Fluent::Clock.real_now : @modified_at),
})
bin = msgpack_packer.pack(data).to_s
@meta.seek(0, IO::SEEK_SET)
Expand Down
3 changes: 2 additions & 1 deletion lib/fluent/plugin/buffer/memory_chunk.rb
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ def commit
@chunk_bytes += @adding_bytes

@adding_bytes = @adding_size = 0
@modified_at = Time.now
@modified_at = Fluent::Clock.real_now
@modified_at_object = nil
true
end

Expand Down
2 changes: 1 addition & 1 deletion lib/fluent/plugin/output.rb
Original file line number Diff line number Diff line change
Expand Up @@ -1376,7 +1376,7 @@ def enqueue_thread_run
# This block should be done by integer values.
# If both of flush_interval & flush_thread_interval are 1s, expected actual flush timing is 1.5s.
# If we use integered values for this comparison, expected actual flush timing is 1.0s.
@buffer.enqueue_all{ |metadata, chunk| chunk.created_at.to_i + flush_interval <= now_int }
@buffer.enqueue_all{ |metadata, chunk| chunk.raw_create_at + flush_interval <= now_int }
end

if @chunk_key_time
Expand Down
4 changes: 3 additions & 1 deletion lib/fluent/time.rb
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,9 @@ def self.eq?(a, b)
end

def self.now
from_time(Time.now)
# This method is called many time. so call Process.clock_gettime directly instead of Fluent::Clock.real_now
now = Process.clock_gettime(Process::CLOCK_REALTIME, :nanosecond)
Fluent::EventTime.new(now / 1_000_000_000, now % 1_000_000_000)
end

def self.parse(*args)
Expand Down
1 change: 1 addition & 0 deletions test/helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ def to_masked_element
require 'fluent/time'
require 'serverengine'
require 'helpers/fuzzy_assert'
require 'helpers/process_extenstion'

module Fluent
module Plugin
Expand Down
33 changes: 33 additions & 0 deletions test/helpers/process_extenstion.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
require 'timecop'

module Process
class << self
alias_method :clock_gettime_original, :clock_gettime

def clock_gettime(clock_id, unit = :float_second)
# now only support CLOCK_REALTIME
if Process::CLOCK_REALTIME == clock_id
t = Time.now

case unit
when :float_second
t.to_i + t.nsec / 1_000_000_000.0
when :float_millisecond
t.to_i * 1_000 + t.nsec / 1_000_000.0
when :float_microsecond
t.to_i * 1_000_000 + t.nsec / 1_000.0
when :second
t.to_i
when :millisecond
t.to_i * 1000 + t.nsec / 1_000_000
when :microsecond
t.to_i * 1_000_000 + t.nsec / 1_000
when :nanosecond
t.to_i * 1_000_000_000 + t.nsec
end
else
Process.clock_gettime_original(clock_id, unit)
end
end
end
end