Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support nanosecond #653

Merged
merged 63 commits into from
Oct 26, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
63 commits
Select commit Hold shift + click to select a range
742042b
Implement NTime
Aug 5, 2015
1d7c967
Engine.now returns NTime
Aug 5, 2015
eed864b
TimeParser.parse returns NTime
Aug 5, 2015
028d64b
Use Engine.now instead of Time.now.to_i in OutputTestDriver
Aug 5, 2015
81f3a5d
Use NTime instead of Time.now in test_{filter,out}_stdout
Aug 5, 2015
6ae478d
Implement NTime#coerce
Aug 5, 2015
556053d
Implement NTime#to_r
Aug 5, 2015
adc377e
Remove NTime#to_{m,u,n}sec
Aug 6, 2015
140a2c4
Add comment
Aug 6, 2015
931fb8a
Add tests for NTime
Aug 6, 2015
5684c08
Implement NTime#to_s
Aug 6, 2015
2f62059
in_exec emits NTime
Aug 6, 2015
006c7a7
out_exec_filter emits NTime
Aug 6, 2015
16321f7
Assert NTime in InputTestDriver
Aug 6, 2015
9141b49
in_http always emits NTime
Aug 6, 2015
a41dfd8
Parser always return NTime as time
Aug 6, 2015
fc834ab
Separate Fluent::NTime declaration
Aug 6, 2015
17187b6
Add NTime tests for input plugins
Aug 6, 2015
c63c8a2
Add assert_equal_ntime
Aug 6, 2015
67e2550
Serialize NTime as extention type
Aug 6, 2015
43069dc
Use LL instead of NN when serializing NTime
Aug 7, 2015
cb330e5
Implement time_as_integer option for out_forward
Aug 7, 2015
3588d04
Rename NTime to NanoTime
Aug 7, 2015
8078847
TimeFormatter cache time as NanoTime if possible
Aug 7, 2015
2455be9
Enable in_http accept msgpack including NanoTime
Aug 11, 2015
bca8eba
Use Time.now instead of unnecessary use of Engine.now
Aug 11, 2015
66a7d5c
in_http always returns NanoTime
Aug 11, 2015
17e29d7
NanoTime#to_msgpack returns the value of @sec.to_msgpack
Aug 11, 2015
2fda4a8
Add comment
Aug 11, 2015
7a199a3
Rename test_ntime to test_nano_time
Aug 11, 2015
b12bf13
Remove Fluent::NanoTime.now
Aug 11, 2015
a0f93be
Move time_as_integer option to ObjectBufferdOutput
Aug 12, 2015
462378e
Optimize time_as_integer
Aug 12, 2015
4bd4151
Add tests for NanoTime.eq?
Aug 12, 2015
c9b27d0
Optimize TimeFormatter.format in the case subsec is no needed
Aug 12, 2015
4a4d74d
JSONParser parses time as float when time_format is not set
Aug 13, 2015
6a71f7e
Don't use `include Fluent`
Aug 13, 2015
969bd6f
Use Fluent::NanoTime instead of NanoTime
Aug 13, 2015
2c3d4b5
Add test for in_syslog
Aug 17, 2015
809ff35
in_http accepts float as NanoTime
Aug 17, 2015
308f8dd
Add tests for in_forward
Aug 17, 2015
564c29f
Add test for out_exec
Aug 17, 2015
7b14325
test_out_exec_filter accepts float as NanoTime
Aug 17, 2015
618eba3
in_exec accepts float as NanoTime
Aug 17, 2015
42e4e69
Add tests for out_forward
Aug 17, 2015
e7b1c45
Add test for out_stream
Aug 17, 2015
b077c43
filter_record_transformer accept float as NanoTime
Aug 17, 2015
56bc9b9
Add test for test_filter_record_transformer
Aug 17, 2015
5e2f5aa
Add NanoTime.now
Aug 21, 2015
905ad91
Add NanoTime.parse
Aug 21, 2015
51130d3
Use strptime gem for optimization of TimeParser
Aug 21, 2015
304c681
Add a comment for gems
Aug 21, 2015
385564f
Fix regexp to check whether format string include subsec or not
Aug 26, 2015
4ea193d
Cosmetic change
Aug 26, 2015
9a1a4b1
Rename NanoTime to EventTime
Aug 26, 2015
9f78bcc
Fix comment
Sep 3, 2015
a18a3cb
Remove 1.9.3 from .travis.yml
Sep 4, 2015
4647c99
Use released gems
Sep 18, 2015
f185cb1
Use flexmock 2.0.0 or later
Sep 18, 2015
a1025d3
Revert "Use flexmock 2.0.0 or later"
Oct 8, 2015
e991f76
Update strptime version to 0.1.2
Oct 8, 2015
3fc0430
Update strptime version to 0.1.3
Oct 9, 2015
1817567
Update msgpack to 0.7.0
Oct 24, 2015
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
language: ruby

rvm:
- 1.9.3
- 2.0.0
- 2.1
- 2.2
Expand Down
3 changes: 2 additions & 1 deletion fluentd.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ Gem::Specification.new do |gem|

gem.required_ruby_version = '>= 1.9.3'

gem.add_runtime_dependency("msgpack", [">= 0.5.11", "< 0.6.0"])
gem.add_runtime_dependency("msgpack", [">= 0.7.0"])
gem.add_runtime_dependency("json", [">= 1.4.3"])
gem.add_runtime_dependency("yajl-ruby", ["~> 1.0"])
gem.add_runtime_dependency("cool.io", [">= 1.4.1", "< 2.0.0"])
Expand All @@ -34,6 +34,7 @@ Gem::Specification.new do |gem|
gem.add_runtime_dependency("win32-event", ["~> 0.6.1"])
gem.add_runtime_dependency("windows-pr", ["~> 1.2.3"])
end
gem.add_runtime_dependency("strptime", [">= 0.1.3"])

gem.add_development_dependency("rake", [">= 0.9.2"])
gem.add_development_dependency("flexmock", ["~> 1.3.3"])
Expand Down
2 changes: 1 addition & 1 deletion lib/fluent/buffer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ def write_to(io)

def msgpack_each(&block)
open {|io|
u = MessagePack::Unpacker.new(io)
u = Fluent::Engine.msgpack_factory.unpacker(io)
begin
u.each(&block)
rescue EOFError
Expand Down
2 changes: 1 addition & 1 deletion lib/fluent/command/cat.rb
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,7 @@ def abort_message(time, record)

when 'msgpack'
begin
u = MessagePack::Unpacker.new($stdin)
u = Fluent::Engine.msgpack_factory.unpacker($stdin)
u.each {|record|
w.write(record)
}
Expand Down
6 changes: 5 additions & 1 deletion lib/fluent/engine.rb
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,17 @@ def initialize
@log_event_queue = []

@suppress_config_dump = false

@msgpack_factory = MessagePack::Factory.new
@msgpack_factory.register_type(Fluent::EventTime::TYPE, Fluent::EventTime)
end

MATCH_CACHE_SIZE = 1024
LOG_EMIT_INTERVAL = 0.1

attr_reader :root_agent
attr_reader :matches, :sources
attr_reader :msgpack_factory

def init(opts = {})
BasicSocket.do_not_reverse_lookup = true
Expand Down Expand Up @@ -132,7 +136,7 @@ def flush!

def now
# TODO thread update
Time.now.to_i
Fluent::EventTime.now
end

def log_event_loop
Expand Down
12 changes: 10 additions & 2 deletions lib/fluent/event.rb
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,20 @@ def each(&block)
end

def to_msgpack_stream
out = MessagePack::Packer.new # MessagePack::Packer is fastest way to serialize events
out = Fluent::Engine.msgpack_factory.packer
each {|time,record|
out.write([time,record])
}
out.to_s
end

def to_msgpack_stream_forced_integer
out = Fluent::Engine.msgpack_factory.packer
each {|time,record|
out.write([time.to_i,record])
}
out.to_s
end
end


Expand Down Expand Up @@ -143,7 +151,7 @@ def repeatable?

def each(&block)
# TODO format check
unpacker = MessagePack::Unpacker.new
unpacker = Fluent::Engine.msgpack_factory.unpacker
unpacker.feed_each(@data, &block)
nil
end
Expand Down
2 changes: 2 additions & 0 deletions lib/fluent/load.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,15 @@
require 'yajl'
require 'uri'
require 'msgpack'
require 'strptime'
begin
require 'sigdump/setup'
rescue
# ignore setup error on Win or similar platform which doesn't support signal
end
require 'cool.io'

require 'fluent/time'
require 'fluent/env'
require 'fluent/version'
require 'fluent/log'
Expand Down
35 changes: 34 additions & 1 deletion lib/fluent/mixin.rb
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,24 @@
module Fluent
class TimeFormatter
require 'fluent/timezone'
require 'fluent/time'

def initialize(format, localtime, timezone = nil)
@tc1 = 0
@tc1_str = nil
@tc2 = 0
@tc2_str = nil

if format && format =~ /(^|[^%])(%%)*%L|(^|[^%])(%%)*%\d*N/
define_singleton_method(:format) {|time|
format_with_subsec(time)
}
else
define_singleton_method(:format) {|time|
format_without_subsec(time)
}
end

if formatter = Fluent::Timezone.formatter(timezone, format)
define_singleton_method(:format_nocache) {|time|
formatter.call(time)
Expand Down Expand Up @@ -54,7 +65,7 @@ def initialize(format, localtime, timezone = nil)
end
end

def format(time)
def format_without_subsec(time)
if @tc1 == time
return @tc1_str
elsif @tc2 == time
Expand All @@ -72,6 +83,28 @@ def format(time)
end
end

def format_with_subsec(time)
if Fluent::EventTime.eq?(@tc1, time)
return @tc1_str
elsif Fluent::EventTime.eq?(@tc2, time)
return @tc2_str
else
str = format_nocache(time)
if @tc1 < @tc2
@tc1 = time
@tc1_str = str
else
@tc2 = time
@tc2_str = str
end
return str
end
end

def format(time)
# will be overridden in initialize
end

def format_nocache(time)
# will be overridden in initialize
end
Expand Down
10 changes: 8 additions & 2 deletions lib/fluent/output.rb
Original file line number Diff line number Diff line change
Expand Up @@ -431,13 +431,19 @@ def flush_secondary(secondary)


class ObjectBufferedOutput < BufferedOutput
config_param :time_as_integer, :bool, :default => true

def initialize
super
end

def emit(tag, es, chain)
@emit_count += 1
data = es.to_msgpack_stream
if @time_as_integer
data = es.to_msgpack_stream_forced_integer
else
data = es.to_msgpack_stream
end
key = tag
if @buffer.emit(key, data, chain)
submit_flush
Expand Down Expand Up @@ -529,7 +535,7 @@ def configure(conf)
else
@flush_interval = [60, @time_slice_cache_interval].min
@enqueue_buffer_proc = Proc.new do
nowslice = @time_slicer.call(Engine.now - @time_slice_wait)
nowslice = @time_slicer.call(Time.now - @time_slice_wait)
@buffer.keys.each {|key|
if key < nowslice
@buffer.push(key)
Expand Down
14 changes: 10 additions & 4 deletions lib/fluent/parser.rb
Original file line number Diff line number Diff line change
Expand Up @@ -60,12 +60,18 @@ def initialize(time_format)
@cache2_time = nil
@parser =
if time_format
Proc.new { |value| Time.strptime(value, time_format) }
begin
strptime = Strptime.new(time_format)
Proc.new { |value| Fluent::EventTime.from_time(strptime.exec(value)) }
rescue
Proc.new { |value| Fluent::EventTime.from_time(Time.strptime(value, time_format)) }
end
else
Time.method(:parse)
Proc.new { |value| Fluent::EventTime.parse(value) }
end
end

# TODO: new cache mechanism using format string
def parse(value)
unless value.is_a?(String)
raise ParserError, "value must be string: #{value}"
Expand All @@ -77,7 +83,7 @@ def parse(value)
return @cache2_time
else
begin
time = @parser.call(value).to_i
time = @parser.call(value)
rescue => e
raise ParserError, "invalid time format: value = #{value}, error_class = #{e.class.name}, error = #{e.message}"
end
Expand Down Expand Up @@ -256,7 +262,7 @@ def parse(text)
time = @mutex.synchronize { @time_parser.parse(value) }
else
begin
time = value.to_i
time = Fluent::EventTime.from_time(Time.at(value.to_f))
rescue => e
raise ParserError, "invalid time value: value = #{value}, error_class = #{e.class.name}, error = #{e.message}"
end
Expand Down
2 changes: 1 addition & 1 deletion lib/fluent/plugin/buf_memory.rb
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ def write_to(io)

# optimize
def msgpack_each(&block)
u = MessagePack::Unpacker.new
u = Fluent::Engine.msgpack_factory.unpacker
u.feed_each(@data, &block)
end
end
Expand Down
2 changes: 1 addition & 1 deletion lib/fluent/plugin/exec_util.rb
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ def call(io)

class MessagePackParser < Parser
def call(io)
@u = MessagePack::Unpacker.new(io)
@u = Fluent::Engine.msgpack_factory.unpacker(io)
begin
@u.each(&@on_message)
rescue EOFError
Expand Down
2 changes: 1 addition & 1 deletion lib/fluent/plugin/filter_record_transformer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ def filter_stream(tag, es)
last_record = record # for debug log
new_record = reform(time, record, placeholders)
if @renew_time_key && new_record.has_key?(@renew_time_key)
time = new_record[@renew_time_key].to_i
time = EventTime.from_time(Time.at(new_record[@renew_time_key].to_f))
end
new_es.add(time, new_record)
end
Expand Down
10 changes: 8 additions & 2 deletions lib/fluent/plugin/in_exec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,15 @@ def configure(conf)
if @time_key
if @time_format
f = @time_format
@time_parse_proc = Proc.new {|str| Time.strptime(str, f).to_i }
@time_parse_proc =
begin
strptime = Strptime.new(f)
Proc.new { |str| Fluent::EventTime.from_time(strptime.exec(str)) }
rescue
Proc.new {|str| Fluent::EventTime.from_time(Time.strptime(str, f)) }
end
else
@time_parse_proc = Proc.new {|str| str.to_i }
@time_parse_proc = Proc.new {|str| Fluent::EventTime.from_time(Time.at(str.to_f)) }
end
end

Expand Down
8 changes: 4 additions & 4 deletions lib/fluent/plugin/in_forward.rb
Original file line number Diff line number Diff line change
Expand Up @@ -153,8 +153,8 @@ def on_message(msg, chunk_size, source)
entries.each {|e|
record = e[1]
next if record.nil?
time = e[0].to_i
time = (now ||= Engine.now) if time == 0
time = e[0]
time = (now ||= Engine.now) if time.to_i == 0
es.add(time, record)
}
router.emit_stream(tag, es)
Expand All @@ -165,7 +165,7 @@ def on_message(msg, chunk_size, source)
record = msg[2]
return if record.nil?
time = msg[1]
time = Engine.now if time == 0
time = Engine.now if time.to_i == 0
router.emit(tag, time, record)
option = msg[3]
end
Expand Down Expand Up @@ -219,7 +219,7 @@ def on_read(data)
else
m = method(:on_read_msgpack)
@serializer = :to_msgpack.to_proc
@u = MessagePack::Unpacker.new
@u = Fluent::Engine.msgpack_factory.unpacker
end

(class << self; self; end).module_eval do
Expand Down
6 changes: 3 additions & 3 deletions lib/fluent/plugin/in_http.rb
Original file line number Diff line number Diff line change
Expand Up @@ -144,8 +144,8 @@ def on_request(path_info, params)
end
end
time = if param_time = params['time']
param_time = param_time.to_i
param_time.zero? ? Engine.now : param_time
param_time = param_time.to_f
param_time.zero? ? Engine.now : Fluent::EventTime.from_time(Time.at(param_time))
else
record_time.nil? ? Engine.now : record_time
end
Expand Down Expand Up @@ -191,7 +191,7 @@ def on_request(path_info, params)

def parse_params_default(params)
record = if msgpack = params['msgpack']
MessagePack.unpack(msgpack)
Engine.msgpack_factory.unpacker.feed(msgpack).read
elsif js = params['json']
JSON.parse(js)
else
Expand Down
8 changes: 4 additions & 4 deletions lib/fluent/plugin/in_stream.rb
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,8 @@ def on_message(msg)
entries.each {|e|
record = e[1]
next if record.nil?
time = e[0].to_i
time = (now ||= Engine.now) if time == 0
time = e[0]
time = (now ||= Engine.now) if time.to_i == 0
es.add(time, record)
}
router.emit_stream(tag, es)
Expand All @@ -99,7 +99,7 @@ def on_message(msg)
return if record.nil?

time = msg[1]
time = Engine.now if time == 0
time = Engine.now if time.to_i == 0
router.emit(tag, time, record)
end
end
Expand Down Expand Up @@ -130,7 +130,7 @@ def on_read(data)
@y.on_parse_complete = @on_message
else
m = method(:on_read_msgpack)
@u = MessagePack::Unpacker.new
@u = Fluent::Engine.msgpack_factory.unpacker
end

(class << self; self; end).module_eval do
Expand Down
10 changes: 8 additions & 2 deletions lib/fluent/plugin/out_exec_filter.rb
Original file line number Diff line number Diff line change
Expand Up @@ -120,9 +120,15 @@ def configure(conf)

if @out_time_key
if f = @out_time_format
@time_parse_proc = Proc.new {|str| Time.strptime(str, f).to_i }
@time_parse_proc =
begin
strptime = Strptime.new(f)
Proc.new { |str| Fluent::EventTime.from_time(strptime.exec(str)) }
rescue
Proc.new {|str| Fluent::EventTime.from_time(Time.strptime(str, f)) }
end
else
@time_parse_proc = Proc.new {|str| str.to_i }
@time_parse_proc = Proc.new {|str| Fluent::EventTime.from_time(Time.at(str.to_f)) }
end
elsif @out_time_format
log.warn "out_time_format effects nothing when out_time_key is not specified: #{conf}"
Expand Down
2 changes: 1 addition & 1 deletion lib/fluent/plugin/out_stream.rb
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ def write(chunk)
chain = NullOutputChain.instance
chunk.open {|io|
# TODO use MessagePackIoEventStream
u = MessagePack::Unpacker.new(io)
u = Fluent::Engine.msgpack_factory.unpacker(io)
begin
u.each {|(tag,entries)|
es = MultiEventStream.new
Expand Down
Loading