Skip to content

Commit

Permalink
Merge pull request #1063 from fluent/plugin-helper-tag-time-hostname
Browse files Browse the repository at this point in the history
Plugin helper tag time hostname
  • Loading branch information
tagomoris authored Jun 29, 2016
2 parents 9a7fd33 + 047dc83 commit 3d285b8
Show file tree
Hide file tree
Showing 11 changed files with 900 additions and 345 deletions.
74 changes: 37 additions & 37 deletions lib/fluent/compat/set_time_key_mixin.rb
Original file line number Diff line number Diff line change
Expand Up @@ -16,54 +16,54 @@

require 'fluent/config/error'
require 'fluent/compat/record_filter_mixin'
require 'fluent/compat/time_formatter'
require 'fluent/time'
require 'fluent/timezone'

module Fluent
module Compat
module SetTimeKeyMixin
require 'fluent/timezone'
include RecordFilterMixin
module SetTimeKeyMixin
include RecordFilterMixin

attr_accessor :include_time_key, :time_key, :localtime, :timezone
attr_accessor :include_time_key, :time_key, :localtime, :timezone

def configure(conf)
@include_time_key = false
@localtime = false
@timezone = nil
def configure(conf)
@include_time_key = false
@localtime = false
@timezone = nil

super
super

if s = conf['include_time_key']
include_time_key = Fluent::Config.bool_value(s)
raise Fluent::ConfigError, "Invalid boolean expression '#{s}' for include_time_key parameter" if include_time_key.nil?
if s = conf['include_time_key']
include_time_key = Fluent::Config.bool_value(s)
raise Fluent::ConfigError, "Invalid boolean expression '#{s}' for include_time_key parameter" if include_time_key.nil?

@include_time_key = include_time_key
end
@include_time_key = include_time_key
end

if @include_time_key
@time_key = conf['time_key'] || 'time'
@time_format = conf['time_format']
if @include_time_key
@time_key = conf['time_key'] || 'time'
@time_format = conf['time_format']

if conf['localtime']
@localtime = true
elsif conf['utc']
@localtime = false
end
if conf['localtime']
@localtime = true
elsif conf['utc']
@localtime = false
end

if conf['timezone']
@timezone = conf['timezone']
Fluent::Timezone.validate!(@timezone)
end
if conf['timezone']
@timezone = conf['timezone']
Fluent::Timezone.validate!(@timezone)
end

@timef = Fluent::Compat::TimeFormatter.new(@time_format, @localtime, @timezone)
end
end
@timef = Fluent::TimeFormatter.new(@time_format, @localtime, @timezone)
end
end

def filter_record(tag, time, record)
super
def filter_record(tag, time, record)
super

record[@time_key] = @timef.format(time) if @include_time_key
end
end
end
end
record[@time_key] = @timef.format(time) if @include_time_key
end
end
end
end
114 changes: 0 additions & 114 deletions lib/fluent/compat/time_formatter.rb

This file was deleted.

46 changes: 37 additions & 9 deletions lib/fluent/event.rb
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,25 @@ class EventStream
include Enumerable
include MessagePackFactory::Mixin

# dup does deep copy for event stream
def dup
raise NotImplementedError, "DO NOT USE THIS CLASS directly."
end

def size
raise NotImplementedError, "DO NOT USE THIS CLASS directly."
end
alias :length :size

def empty?
size == 0
end

# for tests
def ==(other)
other.is_a?(EventStream) && self.to_msgpack_stream == other.to_msgpack_stream
end

def repeatable?
false
end
Expand Down Expand Up @@ -70,6 +84,14 @@ def repeatable?
true
end

def slice(index, num)
if index > 0 || num == 0
ArrayEventStream.new([])
else
self.dup
end
end

def each(&block)
block.call(@time, @record)
nil
Expand All @@ -86,7 +108,7 @@ def initialize(entries)
end

def dup
entries = @entries.map { |entry| entry.dup } # @entries.map(:dup) doesn't work by ArgumentError
entries = @entries.map{ |time, record| [time, record.dup] }
ArrayEventStream.new(entries)
end

Expand Down Expand Up @@ -119,17 +141,13 @@ def each(&block)
# 2. add events
# stream[tag].add(time, record)
class MultiEventStream < EventStream
def initialize
@time_array = []
@record_array = []
def initialize(time_array = [], record_array = [])
@time_array = time_array
@record_array = record_array
end

def dup
es = MultiEventStream.new
@time_array.zip(@record_array).each { |time, record|
es.add(time, record.dup)
}
es
MultiEventStream.new(@time_array.dup, @record_array.map(&:dup))
end

def size
Expand Down Expand Up @@ -166,6 +184,16 @@ def initialize(data, cached_unpacker = nil, size = 0)
@size = size
end

def empty?
# This is not correct, but actual number of records will be shown after iteration, and
# "size" argument is always 0 currently (because forward protocol doesn't tell it to destination)
false
end

def dup
MessagePackEventStream.new(@data.dup, @size)
end

def size
@size
end
Expand Down
4 changes: 2 additions & 2 deletions lib/fluent/mixin.rb
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,11 @@
require 'fluent/compat/handle_tag_name_mixin'
require 'fluent/compat/set_time_key_mixin'
require 'fluent/compat/set_tag_key_mixin'
require 'fluent/compat/time_formatter'
require 'fluent/compat/type_converter'

require 'fluent/time' # Fluent::TimeFormatter

module Fluent
TimeFormatter = Fluent::Compat::TimeFormatter
RecordFilterMixin = Fluent::Compat::RecordFilterMixin
HandleTagNameMixin = Fluent::Compat::HandleTagNameMixin
SetTimeKeyMixin = Fluent::Compat::SetTimeKeyMixin
Expand Down
Loading

0 comments on commit 3d285b8

Please sign in to comment.