Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] v0.14 plugin APIs #562

Closed
wants to merge 111 commits into from
Closed
Show file tree
Hide file tree
Changes from 109 commits
Commits
Show all changes
111 commits
Select commit Hold shift + click to select a range
6a8b284
Refactor to use Registry as Plugin dictionary, instead of original impl.
tagomoris Mar 3, 2015
ba61202
Plugin base class change: Fluent::** -> Fluent::Plugin::**
tagomoris Mar 3, 2015
249bd0e
Add Fluent::Plugin::Base
tagomoris Mar 5, 2015
31f56b2
Add Fluent::PluginSupport to provide thread/timer/child_process opera…
tagomoris Mar 5, 2015
29eeaa3
Add new Test Driver with simplified API
tagomoris Mar 5, 2015
f7e7c39
Re-implement in_exec w/ v0.14 API (tests: green)
tagomoris Mar 5, 2015
06bac36
Fix bug
tagomoris Mar 6, 2015
65d6356
fix not to use unreliable "sleep"
tagomoris Mar 6, 2015
542f4d1
fix tests w/ modern test-unit styles
tagomoris Mar 6, 2015
a4e5f15
fix bug not to count emit_times
tagomoris Mar 6, 2015
5e38ba5
add Fluent::PluginSupport::Timer to provide timer helpers
tagomoris Mar 6, 2015
218aaef
Re-implement in_status w/ v0.14 API (tests: green)
tagomoris Mar 6, 2015
91a7aa5
delete meaningless method implementations
tagomoris Mar 6, 2015
ec915cc
keyword argument w/o default value is not supported 2.0 yet :(
tagomoris Mar 6, 2015
e5258ba
fix bug to call nil.join for child_process_once
tagomoris Mar 6, 2015
28bf3b3
Add event_loop PluginSupport to manage event loop per plugin instance
tagomoris Mar 10, 2015
d618ab4
add missing requirement
tagomoris Mar 10, 2015
48679c7
Add TCPServer PluginSupport to create TCP server per plugin instance
tagomoris Mar 10, 2015
4ebce91
Re-implement in_http w/ v0.14 API (tests: green)
tagomoris Mar 10, 2015
67f9825
fix to check keepalive enabled explicitly
tagomoris Mar 10, 2015
72ed894
remove Timer dependency (circular reference)
tagomoris Mar 16, 2015
3f9f5be
fix to be able to call tcp_server_listen 2 or more times
tagomoris Mar 16, 2015
afb56c4
Add delimiter to specify separator for callback input data
tagomoris Mar 16, 2015
2c5ddbd
Add UDPServer PluginSupport to create UDP server per plugin instance
tagomoris Mar 16, 2015
f4302f0
Re-implement in_syslog w/ v0.14 API (tests: green)
tagomoris Mar 16, 2015
39d79ba
Separate PluginId mixin from config.rb, which is above understanding
tagomoris Mar 18, 2015
7a02271
Add plugin name and its id (if exists) as header of plugin logs
tagomoris Mar 18, 2015
bee2e5e
fix mark Fluent::DEFAULT_LISTEN_PORT as obsolete (it is of in/out_for…
tagomoris Mar 18, 2015
d9bf78e
add checks
tagomoris Mar 18, 2015
8886a02
add API to UDPServer to get socket already bound to send data
tagomoris Mar 18, 2015
ea7fd83
fix timing bug to run event loop w/o attached watchers or to stop loo…
tagomoris Mar 18, 2015
b4984f8
add dependency explicitly
tagomoris Mar 18, 2015
a524e98
raise errors temporally to code correctly
tagomoris Mar 18, 2015
7ceb2dd
Re-implement in_forward standard features w/ v0.14 APIs (tests: green)
tagomoris Mar 18, 2015
50ef811
Separate PluginSupport::EventEmitter to make OutputPlugins not to emi…
tagomoris Mar 23, 2015
bfd1a4b
Fix Plugin shutdown sequence w/ five steps to shutdown them cleanly
tagomoris Mar 23, 2015
2639c29
Fix with refactored method name
tagomoris Mar 23, 2015
8bfc6f7
fix with new plugin lifecycle
tagomoris Mar 23, 2015
614be9b
wrong equality assertion
tagomoris Mar 23, 2015
79d543f
fix to work with new Fluent::Plugin::ForwardInput
tagomoris Mar 23, 2015
ac75066
Add Forwarding over SSL feature in rough, not tested yet
tagomoris Mar 23, 2015
d9fd3f8
add default ciphers referring httpclient defaults
tagomoris Mar 24, 2015
f69131b
fix to help unexpected IOError make tests failed
tagomoris Mar 24, 2015
185852b
add configuration checks for SSL certs
tagomoris Mar 24, 2015
d94de7a
fix bug for variable names
tagomoris Mar 24, 2015
da41aac
Fix bugs
tagomoris Mar 24, 2015
b992d18
Add tests over SSL (green)
tagomoris Mar 24, 2015
1caae14
Add Fluent::PluginSupport::Socket to provide guard for listening sock…
tagomoris Mar 24, 2015
50a59a0
Decide to support Ruby 2.1 or later for Fluentd v0.14 or later.
tagomoris Mar 24, 2015
1a8d026
add TODOs
tagomoris Mar 27, 2015
6a7506b
re-implement in_tcp/in_udp w/ v0.14 PluginSupport API (tests: green)
tagomoris Mar 27, 2015
1f51460
fix to be able to get any number of available ports
tagomoris Mar 27, 2015
07fb5b6
fix to respond UDP heartbeat
tagomoris Mar 27, 2015
4cd54e3
add tests about UDP heartbeats
tagomoris Mar 27, 2015
8633707
Fix bug to break compatibility: default is 1
tagomoris Mar 27, 2015
3f80669
Fix addr/port resolution bug
tagomoris Mar 30, 2015
2e9788b
Add authentication feature to in_forward, and its tests (green)
tagomoris Mar 30, 2015
80d7218
remove buggy specification
tagomoris Mar 30, 2015
34289f0
fix bug not to return a port number
tagomoris Mar 30, 2015
387d6d6
remove unused old test stub
tagomoris Mar 30, 2015
998f9ab
Ruby 2.1 style keyword argument, without default value
tagomoris Mar 30, 2015
b7769f8
event timer can get float
tagomoris Mar 30, 2015
6706998
fix APIs about SO_LINGER and backlog
tagomoris Mar 30, 2015
0e73cca
fix typo
tagomoris Mar 30, 2015
b0c3d09
fix to rescue RuntimeError only (other bugs should be raised directly)
tagomoris Mar 30, 2015
14681be
SO_LINGER make sockets disconnected immediately
tagomoris Mar 30, 2015
c1d391c
shrink socket read loop to simplify
tagomoris Mar 30, 2015
f5d68ce
fix to send heartbeat udp packet continuously for slow environment (T…
tagomoris Mar 30, 2015
4a5fd72
fix to use different variable name for different block argument expli…
tagomoris Mar 30, 2015
f56e913
debug print...
tagomoris Mar 30, 2015
fd31707
typo
tagomoris Mar 30, 2015
b77899c
debug print more
tagomoris Mar 30, 2015
06ff378
rescue SSLErrorReadWouldBlock explicitly, and set buf as blank
tagomoris Mar 30, 2015
a4f9d4a
much more debug print
tagomoris Mar 30, 2015
5d741bf
debug print once more
tagomoris Mar 31, 2015
4ec67b4
fix to flush data on testing write socket
tagomoris Mar 31, 2015
25b7dc5
Fix to sort emitted events with its time
tagomoris Mar 31, 2015
88ff1e7
remove debug prints
tagomoris Mar 31, 2015
c570151
add note for test code
tagomoris Mar 31, 2015
f9a585d
fix to rescue EOF for child process i/o
tagomoris Mar 31, 2015
1a3d979
remove unused file
tagomoris Apr 6, 2015
da4f7f6
Fix bug to overwrite all subsections definition by subclass
tagomoris Apr 6, 2015
7384841
Add plugin storage which is used by plugins over PluginSupport::Storage
tagomoris Apr 7, 2015
1aba7a5
fix to stop timer events independently from event_collector shutdown
tagomoris Apr 7, 2015
96374c2
Re-implement DummyInput plugin w/ v0.14 APIs (test: green)
tagomoris Apr 7, 2015
bca486f
Separate SystemConfig class from Supervisor, and add "plugin_storage_…
tagomoris Apr 8, 2015
8d06747
Add methods for `p` output
tagomoris Apr 8, 2015
9fb4479
Add SystemConfigMixin
tagomoris Apr 8, 2015
e9d4d39
Add method to check whether plugin id is set by user explicitly, or not
tagomoris Apr 8, 2015
d208abc
Fix many bugs of plugin storage
tagomoris Apr 8, 2015
5be758b
Add `suspend` option to restart incremental value after stop-and-star…
tagomoris Apr 8, 2015
bf21a72
Revert "Fix bug to overwrite all subsections definition by subclass"
tagomoris Apr 8, 2015
369f802
Fix bug not to publish method `writing?` on connection handler
tagomoris Apr 14, 2015
8abdd4a
Fix to enforce to use `router.emit` instead of `Engine.emit`
tagomoris Apr 14, 2015
b206b33
Add compatibility layer for output plugin
tagomoris Apr 16, 2015
0d4bde5
fix to specify file/dir permissions
tagomoris Apr 16, 2015
5f7c9ec
Writing v0.14 Buffer API (not tested now)
tagomoris Apr 16, 2015
6d01de6
Implementing new Buffer APIs, especially emitting data
tagomoris Apr 23, 2015
08ebd9a
Fix plugin_id to check duplication
tagomoris Apr 23, 2015
8b9d76f
Re-fix file path encoding, metadata storing and many others w/ fixed …
tagomoris Apr 23, 2015
71b18c9
Fix v0.14 Buffer API and common implementations
tagomoris Apr 23, 2015
522aa3d
Add initial implementations of MemoryBuffer/FileBuffer, but not tested
tagomoris Apr 23, 2015
68b3758
add tests for reserved fields of sections
tagomoris May 25, 2015
e0c890c
first commit for new BufferedOutput plugin class (not tested at all)
tagomoris May 25, 2015
b39cab4
Fix to use server cert or CA cert as far as possible
tagomoris May 29, 2015
478bab1
buffer chunk/total bytes limits are managed by buffer implementation …
tagomoris May 29, 2015
e5e1bf4
fix not to leak metadata_list
tagomoris May 29, 2015
51c1377
implement buffered output plugin base classes with v0.14 API
tagomoris May 29, 2015
55d7f8a
fix require paths and syntax misusage
tagomoris Feb 2, 2016
e33dd5c
remove already obsoleted plugins and these tests
tagomoris Feb 4, 2016
9c58f42
move all plugin registries to fluent/plugin, and fix some APIs, requi…
tagomoris Feb 4, 2016
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
language: ruby

rvm:
- 2.0.0
- 2.1
- 2.1.5
- 2.2.3
- 2.3.0
- ruby-head
Expand Down
10 changes: 10 additions & 0 deletions example/in_status.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
<source>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

in_status will be revmoed. so no need this configuration.

@type status
emit_interval 3
tag "test"
</source>

<match test>
@type stdout
</match>

1 change: 0 additions & 1 deletion lib/fluent/command/debug.rb
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,6 @@
puts "Usage:"
puts " Engine.match('some.tag').output : get an output plugin instance"
puts " Engine.sources[i] : get input plugin instances"
puts " Plugin.load_plugin(type,name) : load plugin class (use this if you get DRb::DRbUnknown)"
puts ""

Encoding.default_internal = nil if Encoding.respond_to?(:default_internal)
Expand Down
4 changes: 4 additions & 0 deletions lib/fluent/command/fluentd.rb
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,10 @@
opts[:without_source] = b
}

op.on('--plugin-storage-path DIR_PATH', "directory path which is used for storages of plugin internal data") {|s|
opts[:plugin_storage_path] = s
}

op.on('--use-v1-config', "Use v1 configuration format (default)", TrueClass) {|b|
opts[:use_v1_config] = b
}
Expand Down
13 changes: 0 additions & 13 deletions lib/fluent/config.rb
Original file line number Diff line number Diff line change
Expand Up @@ -38,17 +38,4 @@ def self.new(name = '')
Element.new(name, '', {}, [])
end
end

require 'fluent/configurable'

module PluginId
def configure(conf)
@id = conf['@id'] || conf['id']
super
end

def plugin_id
@id ? @id : "object:#{object_id.to_s(16)}"
end
end
end
14 changes: 1 addition & 13 deletions lib/fluent/config/configure_proxy.rb
Original file line number Diff line number Diff line change
Expand Up @@ -77,19 +77,7 @@ def merge(other) # self is base class, other is subclass
merged.argument = other.argument || self.argument
merged.params = self.params.merge(other.params)
merged.defaults = self.defaults.merge(other.defaults)
merged.sections = {}
(self.sections.keys + other.sections.keys).uniq.each do |section_key|
self_section = self.sections[section_key]
other_section = other.sections[section_key]
merged_section = if self_section && other_section
self_section.merge(other_section)
elsif self_section || other_section
self_section || other_section
else
raise "BUG: both of self and other section are nil"
end
merged.sections[section_key] = merged_section
end
merged.sections = self.sections.merge(other.sections)

merged
end
Expand Down
19 changes: 18 additions & 1 deletion lib/fluent/config/section.rb
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@ def initialize(params = {})

alias :object_id :__id__

def to_s
inspect
end

def inspect
"<Fluent::Config::Section #{@params.to_json}>"
end
Expand Down Expand Up @@ -62,6 +66,19 @@ def [](key)
@params[key.to_sym]
end

def respond_to?(symbol, include_all=false)
case symbol
when :inspect, :nil?, :to_h, :+, :instance_of?, :kind_of?, :[], :respond_to?, :respond_to_missing?, :method_missing,
true
when :!, :!= , :==, :equal?, :instance_eval, :instane_exec
true
when :method_missing, :singleton_method_added, :singleton_method_removed, :singleton_method_undefined
include_all
else
false
end
end

def respond_to_missing?(symbol, include_private)
@params.has_key?(symbol)
end
Expand All @@ -70,7 +87,7 @@ def method_missing(name, *args)
if @params.has_key?(name)
@params[name]
else
super
::Kernel.raise ::NoMethodError, "undefined method `#{name}' for #{self.inspect}"
end
end
end
Expand Down
19 changes: 11 additions & 8 deletions lib/fluent/engine.rb
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ module Fluent
require 'fluent/event_router'
require 'fluent/root_agent'
require 'fluent/time'
require 'fluent/system_config'

class EngineClass
def initialize
Expand All @@ -34,26 +35,30 @@ def initialize

@msgpack_factory = MessagePack::Factory.new
@msgpack_factory.register_type(Fluent::EventTime::TYPE, Fluent::EventTime)
@system_config = SystemConfig.new({})
end

MATCH_CACHE_SIZE = 1024
LOG_EMIT_INTERVAL = 0.1

attr_reader :system_config
attr_reader :root_agent
attr_reader :matches, :sources
attr_reader :msgpack_factory

def init(opts = {})
BasicSocket.do_not_reverse_lookup = true
Plugin.load_plugins
if defined?(Encoding)
Encoding.default_internal = 'ASCII-8BIT' if Encoding.respond_to?(:default_internal)
Encoding.default_external = 'ASCII-8BIT' if Encoding.respond_to?(:default_external)
end

@system_config = opts[:system_config] if opts[:system_config]

suppress_interval(opts[:suppress_interval]) if opts[:suppress_interval]
@suppress_config_dump = opts[:suppress_config_dump] if opts[:suppress_config_dump]
@without_source = opts[:without_source] if opts[:without_source]
@plugin_storage_path = opts[:plugin_storage_path] if opts[:plugin_storage_path]

@root_agent = RootAgent.new(opts)

Expand Down Expand Up @@ -113,22 +118,20 @@ def configure(conf)
end
end

def load_plugin_dir(dir)
Plugin.load_plugin_dir(dir)
def add_plugin_dir(dir)
Plugin.add_plugin_dir(dir)
end

def emit(tag, time, record)
unless record.nil?
emit_stream tag, OneEventStream.new(time, record)
end
raise "BUG: use router.emit instead of Engine.emit"
end

def emit_array(tag, array)
emit_stream tag, ArrayEventStream.new(array)
raise "BUG: use router.emit_array instead of Engine.emit_array"
end

def emit_stream(tag, es)
@event_router.emit_stream(tag, es)
raise "BUG: use router.emit_stream instead of Engine.emit_stream"
end

def flush!
Expand Down
6 changes: 3 additions & 3 deletions lib/fluent/env.rb
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@ module Fluent
DEFAULT_CONFIG_PATH = ENV['FLUENT_CONF'] || '/etc/fluent/fluent.conf'
DEFAULT_PLUGIN_DIR = ENV['FLUENT_PLUGIN'] || '/etc/fluent/plugin'
DEFAULT_SOCKET_PATH = ENV['FLUENT_SOCKET'] || '/var/run/fluent/fluent.sock'
DEFAULT_LISTEN_PORT = 24224
DEFAULT_FILE_PERMISSION = 0644
DEFAULT_DIR_PERMISSION = 0755
DEFAULT_LISTEN_PORT = 24224 # TODO: obsolete
DEFAULT_FILE_PERMISSION = 0644 # TODO: configurable w/ <system>
DEFAULT_DIR_PERMISSION = 0755 # TODO: configurable w/ <system>
IS_WINDOWS = /mswin|mingw/ === RUBY_PLATFORM
private_constant :IS_WINDOWS

Expand Down
1 change: 1 addition & 0 deletions lib/fluent/filter.rb
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

module Fluent
class Filter
# TODO: move to plugin/filter.rb, and make interoperability layer here
include Configurable
include PluginId
include PluginLoggerMixin
Expand Down
31 changes: 4 additions & 27 deletions lib/fluent/input.rb
Original file line number Diff line number Diff line change
Expand Up @@ -14,33 +14,10 @@
# limitations under the License.
#

module Fluent
class Input
include Configurable
include PluginId
include PluginLoggerMixin

attr_accessor :router

def initialize
super
end

def configure(conf)
super
require 'fluent/plugin/input'

if label_name = conf['@label']
label = Engine.root_agent.find_label(label_name)
@router = label.event_router
elsif @router.nil?
@router = Engine.root_agent.event_router
end
end

def start
end

def shutdown
end
module Fluent
class Input < Plugin::Input
# TODO: add interoperability layer
end
end
8 changes: 4 additions & 4 deletions lib/fluent/load.rb
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@
require 'fluent/parser'
require 'fluent/formatter'
require 'fluent/event'
require 'fluent/buffer'
require 'fluent/input'
require 'fluent/output'
require 'fluent/filter'
require 'fluent/plugin/buffer'
require 'fluent/plugin/input'
require 'fluent/plugin/output'
require 'fluent/plugin/filter'
require 'fluent/match'
15 changes: 13 additions & 2 deletions lib/fluent/log.rb
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,9 @@ def initialize(out=STDERR, level=LEVEL_TRACE, opts={})
# TODO: This variable name is unclear so we should change to better name.
@threads_exclude_events = []

@optional_header = nil
@optional_attrs = nil

if opts.has_key?(:suppress_repeated_stacktrace)
@suppress_repeated_stacktrace = opts[:suppress_repeated_stacktrace]
end
Expand All @@ -74,6 +77,7 @@ def initialize(out=STDERR, level=LEVEL_TRACE, opts={})
attr_accessor :level
attr_accessor :tag
attr_accessor :time_format
attr_accessor :optional_header, :optional_attrs

def enable_debug(b=true)
@debug_mode = b
Expand Down Expand Up @@ -263,8 +267,8 @@ def dump_stacktrace(backtrace, level)

def event(level, args)
time = Time.now
message = ''
map = {}
message = @optional_header ? @optional_header.dup : ''
map = @optional_attrs ? @optional_attrs.dup : {}
args.each {|a|
if a.is_a?(Hash)
a.each_pair {|k,v|
Expand Down Expand Up @@ -364,6 +368,13 @@ def configure(conf)
@log = PluginLogger.new($log)
end
@log.level = @log_level
@log.optional_header = "[#{self.class.name}#{@_id_configured ? "(" + @id + ")" : ""}] "
@log.optional_attrs = {
'plugin_type' => self.class.name,
}
if @_id_configured
@log.optional_attrs.update({'plugin_id' => @id})
end
end
end
end
Expand Down
Loading