Skip to content

Commit

Permalink
adding root_dir system config param and plugin_root_dir method for pl…
Browse files Browse the repository at this point in the history
…ugins
  • Loading branch information
tagomoris committed Dec 15, 2016
1 parent 4bc9ebd commit ad8fda7
Show file tree
Hide file tree
Showing 6 changed files with 67 additions and 24 deletions.
4 changes: 4 additions & 0 deletions lib/fluent/plugin/base.rb
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@ def has_router?
false
end

def plugin_root_dir
nil # override this in plugin_id.rb
end

def configure(conf)
super
@_state ||= State.new(false, false, false, false, false, false, false, false, false)
Expand Down
2 changes: 1 addition & 1 deletion lib/fluent/plugin/buf_file.rb
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ class FileBuffer < Fluent::Plugin::Buffer

DIR_PERMISSION = 0755

# TODO: buffer_path based on system config
# TODO: plugin_root_dir
desc 'The path where buffer chunks are stored.'
config_param :path, :string

Expand Down
1 change: 1 addition & 0 deletions lib/fluent/plugin/storage_local.rb
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ class LocalStorage < Storage
DEFAULT_DIR_MODE = 0755
DEFAULT_FILE_MODE = 0644

# TODO: plugin_root_dir
config_param :path, :string, default: nil
config_param :mode, :integer, default: DEFAULT_FILE_MODE
config_param :dir_mode, :integer, default: DEFAULT_DIR_MODE
Expand Down
12 changes: 12 additions & 0 deletions lib/fluent/plugin_id.rb
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ def configure(conf)
end
@@configured_ids.add(@id)
end
@_plugin_root_dir = nil

super
end
Expand Down Expand Up @@ -59,5 +60,16 @@ def plugin_id
"object:#{object_id.to_s(16)}"
end
end

def plugin_root_dir
return @_plugin_root_dir if @_plugin_root_dir
return nil unless system_config.root_dir
return nil unless plugin_id_configured?
worker_id = (ENV['SERVERENGINE_WORKER_ID'] || 0).to_i
dir = File.join(system_config.root_dir, "worker#{worker_id}", plugin_id)
FileUtils.mkdir_p(dir) unless Dir.exist?(dir)
@_plugin_root_dir = dir.freeze
dir
end
end
end
27 changes: 25 additions & 2 deletions lib/fluent/supervisor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

require 'etc'
require 'fcntl'
require 'fileutils'

require 'fluent/config'
require 'fluent/env'
Expand Down Expand Up @@ -192,7 +193,10 @@ def supervisor_get_dump_config_handler
module WorkerModule
def spawn(process_manager)
main_cmd = config[:main_cmd]
@pm = process_manager.spawn(*main_cmd)
env = {
'SERVERENGINE_WORKER_ID' => @worker_id.to_i.to_s,
}
@pm = process_manager.spawn(env, *main_cmd)
end

def after_start
Expand Down Expand Up @@ -226,6 +230,7 @@ def self.load_config(path, params = {})
fluentd_conf = Fluent::Config.parse(config_data, config_fname, config_basedir, params['use_v1_config'])
system_config = SystemConfig.create(fluentd_conf)

root_dir = system_config.root_dir || params['root_dir']
log_level = system_config.log_level || params['log_level']
suppress_repeated_stacktrace = system_config.suppress_repeated_stacktrace || params['suppress_repeated_stacktrace']
log_path = params['log_path']
Expand Down Expand Up @@ -264,6 +269,7 @@ def self.load_config(path, params = {})
auto_heartbeat: false,
unrecoverable_exit_codes: [2],
stop_immediately_at_unrecoverable_exit: true,
root_dir: root_dir,
logger: logger,
log: logger.out,
log_path: log_path,
Expand Down Expand Up @@ -365,6 +371,7 @@ def self.default_options
setup_path: nil,
chuser: nil,
chgroup: nil,
root_dir: nil,
suppress_interval: 0,
suppress_repeated_stacktrace: true,
without_source: false,
Expand Down Expand Up @@ -393,6 +400,7 @@ def initialize(opt)
@rpc_server = nil
@process_name = nil

@root_dir = opt[:root_dir]
@log_level = opt[:log_level]
@log_rotate_age = opt[:log_rotate_age]
@log_rotate_size = opt[:log_rotate_size]
Expand All @@ -417,6 +425,20 @@ def run_supervisor
read_config
set_system_config

if @root_dir
if File.exist?(@root_dir)
unless Dir.exist?(@root_dir)
raise Fluent::InvalidRootDirectory, "non directory entry exists:#{@root_dir}"
end
else
begin
FileUtils.mkdir_p(@root_dir)
rescue => e
raise Fluent::InvalidRootDirectory, "failed to create root directory:#{@root_dir}, #{e.inspect}"
end
end
end

dry_run if @dry_run
supervise
end
Expand All @@ -426,7 +448,8 @@ def options
'config_path' => @config_path,
'pid_file' => @daemonize,
'plugin_dirs' => @plugin_dirs,
'log_path' => @log_path
'log_path' => @log_path,
'root_dir' => @root_dir,
}
end

Expand Down
45 changes: 24 additions & 21 deletions lib/fluent/system_config.rb
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,14 @@ module Fluent
class SystemConfig
include Configurable

SYSTEM_CONFIG_PARAMETERS = [
:root_dir, :log_level,
:suppress_repeated_stacktrace, :emit_error_log_interval, :suppress_config_dump,
:without_source, :rpc_endpoint, :enable_get_dump, :process_name,
:file_permission, :dir_permission,
]

config_param :root_dir, :string, default: nil
config_param :log_level, default: nil do |level|
Log.str_to_level(level)
end
Expand Down Expand Up @@ -68,33 +76,28 @@ def initialize(conf=nil)

def dup
s = SystemConfig.new
s.log_level = @log_level
s.suppress_repeated_stacktrace = @suppress_repeated_stacktrace
s.emit_error_log_interval = @emit_error_log_interval
s.suppress_config_dump = @suppress_config_dump
s.without_source = @without_source
s.rpc_endpoint = @rpc_endpoint
s.enable_get_dump = @enable_get_dump
s.process_name = @process_name
s.file_permission = @file_permission
s.dir_permission = @dir_permission

SYSTEM_CONFIG_PARAMETERS.each do |param|
s.__send__("#{param}=", instance_variable_get("@#{param}")
end
s
end

def apply(supervisor)
system = self
supervisor.instance_eval {
@log.level = @log_level = system.log_level unless system.log_level.nil?
@suppress_interval = system.emit_error_log_interval unless system.emit_error_log_interval.nil?
@suppress_config_dump = system.suppress_config_dump unless system.suppress_config_dump.nil?
@suppress_repeated_stacktrace = system.suppress_repeated_stacktrace unless system.suppress_repeated_stacktrace.nil?
@without_source = system.without_source unless system.without_source.nil?
@rpc_endpoint = system.rpc_endpoint unless system.rpc_endpoint.nil?
@enable_get_dump = system.enable_get_dump unless system.enable_get_dump.nil?
@process_name = system.process_name unless system.process_name.nil?
@file_permission = system.file_permission unless system.file_permission.nil?
@dir_permission = system.dir_permission unless system.dir_permission.nil?
SYSTEM_CONFIG_PARAMETERS.each do |param|
param_value = system.send(param)
next if param_value.nil?

case param
when :log_level
@log.level = @log_level = param_value
when :emit_error_log_interval
@suppress_interval = param_value
else
instance_variable_set("@#{param}", param_value)
end
end
}
end

Expand Down

0 comments on commit ad8fda7

Please sign in to comment.