Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Symmetric time parse and format #1207

Merged
merged 16 commits into from
Sep 6, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions lib/fluent/plugin/formatter.rb
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,13 @@

require 'fluent/plugin/base'
require 'fluent/plugin/owned_by_mixin'

require 'fluent/mixin' # for TimeFormatter
require 'fluent/time'

module Fluent
module Plugin
class Formatter < Base
include OwnedByMixin
include TimeMixin::Formatter

configured_in :format

Expand Down
5 changes: 1 addition & 4 deletions lib/fluent/plugin/formatter_out_file.rb
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,6 @@ class OutFileFormatter < Formatter
end
end
config_param :time_type, :enum, list: [:float, :unixtime, :string], default: :string
config_param :time_format, :string, default: nil
config_param :localtime, :bool, default: true # if localtime is false and timezone is nil, then utc
config_param :timezone, :string, default: nil

def configure(conf)
# TODO: make a utility method in TimeFormatter to handle these conversion
Expand Down Expand Up @@ -63,7 +60,7 @@ def configure(conf)
when :float then ->(time){ time.to_r.to_f }
when :unixtime then ->(time){ time.to_i }
else
Fluent::TimeFormatter.new(@time_format, @localtime, @timezone)
time_formatter_create
end
end

Expand Down
50 changes: 3 additions & 47 deletions lib/fluent/plugin/parser.rb
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,11 @@
require 'fluent/time'
require 'fluent/plugin/string_util'

require 'strptime'

module Fluent
module Plugin
class Parser < Base
include OwnedByMixin
include TimeMixin::Parser

class ParserError < StandardError; end

Expand All @@ -52,57 +51,14 @@ def call(*a, &b)
parse(*a, &b)
end

class TimeParser
def initialize(time_format)
@cache1_key = nil
@cache1_time = nil
@cache2_key = nil
@cache2_time = nil
@parser =
if time_format
begin
strptime = Strptime.new(time_format)
Proc.new { |value| Fluent::EventTime.from_time(strptime.exec(value)) }
rescue
Proc.new { |value| Fluent::EventTime.from_time(Time.strptime(value, time_format)) }
end
else
Proc.new { |value| Fluent::EventTime.parse(value) }
end
end

# TODO: new cache mechanism using format string
def parse(value)
unless value.is_a?(String)
raise ParserError, "value must be string: #{value}"
end

if @cache1_key == value
return @cache1_time
elsif @cache2_key == value
return @cache2_time
else
begin
time = @parser.call(value)
rescue => e
raise ParserError, "invalid time format: value = #{value}, error_class = #{e.class.name}, error = #{e.message}"
end
@cache1_key = @cache2_key
@cache1_time = @cache2_time
@cache2_key = value
@cache2_time = time
return time
end
end
end
TimeParser = Fluent::TimeParser
end

class ValuesParser < Parser
include Fluent::TypeConverter

config_param :keys, :array, default: []
config_param :time_key, :string, default: nil
config_param :time_format, :string, default: nil
config_param :null_value_pattern, :string, default: nil
config_param :null_empty_string, :bool, default: false

Expand All @@ -117,7 +73,7 @@ def configure(conf)
raise ConfigError, "time_format parameter is ignored because time_key parameter is not set. at #{conf.inspect}"
end

@time_parser = TimeParser.new(@time_format)
@time_parser = time_parser_create

if @null_value_pattern
@null_value_pattern = Regexp.new(@null_value_pattern)
Expand Down
2 changes: 1 addition & 1 deletion lib/fluent/plugin/parser_apache2.rb
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ class Apache2Parser < Parser

def initialize
super
@time_parser = TimeParser.new(TIME_FORMAT)
@time_parser = time_parser_create(format: TIME_FORMAT)
@mutex = Mutex.new
end

Expand Down
5 changes: 2 additions & 3 deletions lib/fluent/plugin/parser_json.rb
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,13 @@ class JSONParser < Parser
Plugin.register_parser('json', self)

config_param :time_key, :string, default: 'time'
config_param :time_format, :string, default: nil
config_param :json_parser, :string, default: 'oj'

def configure(conf)
super

unless @time_format.nil?
@time_parser = TimeParser.new(@time_format)
if @time_format
@time_parser = time_parser_create
@mutex = Mutex.new
end

Expand Down
3 changes: 1 addition & 2 deletions lib/fluent/plugin/parser_regexp.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ class RegexpParser < Parser
config_param :ignorecase, :bool, default: false
config_param :multiline, :bool, default: false
config_param :time_key, :string, default: 'time'
config_param :time_format, :string, default: nil

def initialize
super
Expand All @@ -18,7 +17,7 @@ def initialize

def configure(conf)
super
@time_parser = TimeParser.new(@time_format)
@time_parser = time_parser_create
unless @expression.empty?
if @expression[0] == "/" && @expression[-1] == "/"
regexp_option = 0
Expand Down
4 changes: 2 additions & 2 deletions lib/fluent/plugin/parser_syslog.rb
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ class SyslogParser < Parser
# From in_syslog default pattern
REGEXP_WITH_PRI = /^\<(?<pri>[0-9]+)\>(?<time>[^ ]* {1,2}[^ ]* [^ ]*) (?<host>[^ ]*) (?<ident>[a-zA-Z0-9_\/\.\-]*)(?:\[(?<pid>[0-9]+)\])?(?:[^\:]*\:)? *(?<message>.*)$/

config_param :time_format, :string, default: "%b %d %H:%M:%S"
config_set_default :time_format, "%b %d %H:%M:%S"
config_param :with_priority, :bool, default: false

def initialize
Expand All @@ -40,7 +40,7 @@ def configure(conf)
super

@regexp = @with_priority ? REGEXP_WITH_PRI : REGEXP
@time_parser = TimeParser.new(@time_format)
@time_parser = time_parser_create
end

def patterns
Expand Down
1 change: 1 addition & 0 deletions lib/fluent/plugin_helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
require 'fluent/plugin_helper/parser'
require 'fluent/plugin_helper/formatter'
require 'fluent/plugin_helper/inject'
require 'fluent/plugin_helper/extract'
require 'fluent/plugin_helper/retry_state'
require 'fluent/plugin_helper/compat_parameters'

Expand Down
90 changes: 90 additions & 0 deletions lib/fluent/plugin_helper/extract.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
#
# Fluentd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

require 'fluent/event'
require 'fluent/time'
require 'fluent/configurable'

module Fluent
module PluginHelper
module Extract
def extract_tag_from_record(record)
return nil unless @_extract_enabled

if @_extract_tag_key && record.has_key?(@_extract_tag_key)
return record[@_extract_tag_key].to_s
end

nil
end

def extract_time_from_record(record)
return nil unless @_extract_enabled

if @_extract_time_key && record.has_key?(@_extract_time_key)
return @_extract_time_parser.call(record[@_extract_time_key])
end

nil
end

module ExtractParams
include Fluent::Configurable
config_section :extract, required: false, multi: false, param_name: :extract_config do
config_param :tag_key, :string, default: nil
config_param :time_key, :string, default: nil
config_param :time_type, :enum, list: [:float, :unixtime, :string], default: :float

Fluent::TimeMixin::TIME_PARAMETERS.each do |name, type, opts|
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why don't you use include Fluent::TimeMixin::TimeParameters?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because that mixin adds parameters on the top namespace, not in <extract> sections.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see. implementation limitation...

config_param name, type, opts
end
end
end

def self.included(mod)
mod.include ExtractParams
end

def initialize
super
@_extract_enabled = false
@_extract_tag_key = nil
@_extract_time_key = nil
@_extract_time_parser = nil
end

def configure(conf)
super

if @extract_config
@_extract_tag_key = @extract_config.tag_key
@_extract_time_key = @extract_config.time_key
if @_extract_time_key
@_extract_time_parser = case @extract_config.time_type
when :float then ->(v){ Fluent::EventTime.new(v.to_i, ((v.to_f - v.to_i) * 1_000_000_000).to_i) }
when :unixtime then ->(v){ Fluent::EventTime.new(v.to_i, 0) }
else
localtime = @extract_config.localtime && !@extract_config.utc
Fluent::TimeParser.new(@extract_config.time_format, localtime, @extract_config.timezone)
end
end

@_extract_enabled = @_extract_tag_key || @_extract_time_key
end
end
end
end
end
20 changes: 8 additions & 12 deletions lib/fluent/plugin_helper/inject.rb
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,9 @@
#

require 'fluent/event'
require 'time'
require 'fluent/time'
require 'fluent/configurable'
require 'socket'

module Fluent
module PluginHelper
Expand Down Expand Up @@ -67,10 +68,10 @@ module InjectParams
config_param :tag_key, :string, default: nil
config_param :time_key, :string, default: nil
config_param :time_type, :enum, list: [:float, :unixtime, :string], default: :float
config_param :time_format, :string, default: nil
config_param :localtime, :bool, default: true # if localtime is false and timezone is nil, then utc
config_param :utc, :bool, default: false # placeholder to turn localtime to false
config_param :timezone, :string, default: nil

Fluent::TimeMixin::TIME_PARAMETERS.each do |name, type, opts|
config_param name, type, opts
end
end
end

Expand All @@ -89,12 +90,6 @@ def initialize
end

def configure(conf)
conf.elements('inject').each do |e|
if e.has_key?('utc') && Fluent::Config.bool_value(e['utc'])
e['localtime'] = 'false'
end
end

super

if @inject_config
Expand All @@ -113,7 +108,8 @@ def configure(conf)
when :float then ->(time){ time.to_r.truncate(+6).to_f } # microsecond floating point value
when :unixtime then ->(time){ time.to_i }
else
Fluent::TimeFormatter.new(@inject_config.time_format, @inject_config.localtime, @inject_config.timezone)
localtime = @inject_config.localtime && !@inject_config.utc
Fluent::TimeFormatter.new(@inject_config.time_format, localtime, @inject_config.timezone)
end
end

Expand Down
7 changes: 0 additions & 7 deletions lib/fluent/test/base.rb
Original file line number Diff line number Diff line change
Expand Up @@ -76,10 +76,3 @@ def run(num_waits = 10, &block)
end
end
end

Test::Unit::Assertions.module_eval do
def assert_equal_event_time(a, b)
assert_equal(a.sec, b.sec)
assert_equal(a.nsec, b.nsec)
end
end
18 changes: 18 additions & 0 deletions lib/fluent/test/helpers.rb
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,17 @@
module Fluent
module Test
module Helpers
# See "Example Custom Assertion: http://test-unit.github.io/test-unit/en/Test/Unit/Assertions.html
def assert_equal_event_time(expected, actual, message = nil)
message = build_message(message, <<EOT, expected, actual)
<?> expected but was
<?>.
EOT
assert_block(message) do
expected.is_a?(Fluent::EventTime) && actual.is_a?(Fluent::EventTime) && expected.sec == actual.sec && expected.nsec == actual.nsec
end
end

def config_element(name = 'test', argument = '', params = {}, elements = [])
Fluent::Config::Element.new(name, argument, params, elements)
end
Expand All @@ -37,6 +48,13 @@ def event_time(str=nil, format: nil)
end
end

def with_timezone(tz)
oldtz, ENV['TZ'] = ENV['TZ'], tz
yield
ensure
ENV['TZ'] = oldtz
end

def time2str(time, localtime: false, format: nil)
if format
if localtime
Expand Down
Loading