Skip to content

Commit

Permalink
Support Ruby's Time class in msgpack serde
Browse files Browse the repository at this point in the history
Recent msgpack-ruby can (de)serialize Time class.
Add enable_msgpack_time_support parameter to system config for this feature.
This is useful for multiple time fields.

Signed-off-by: Masahiro Nakagawa <[email protected]>
  • Loading branch information
repeatedly committed Jan 14, 2020
1 parent afdd907 commit b8625f0
Show file tree
Hide file tree
Showing 5 changed files with 14 additions and 5 deletions.
2 changes: 1 addition & 1 deletion fluentd.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ Gem::Specification.new do |gem|

gem.required_ruby_version = '>= 2.4'

gem.add_runtime_dependency("msgpack", [">= 1.2.0", "< 2.0.0"])
gem.add_runtime_dependency("msgpack", [">= 1.3.1", "< 2.0.0"])
gem.add_runtime_dependency("yajl-ruby", ["~> 1.0"])
gem.add_runtime_dependency("cool.io", [">= 1.4.5", "< 2.0.0"])
gem.add_runtime_dependency("serverengine", [">= 2.0.4", "< 3.0.0"])
Expand Down
8 changes: 7 additions & 1 deletion lib/fluent/msgpack_factory.rb
Original file line number Diff line number Diff line change
Expand Up @@ -70,9 +70,15 @@ def self.unpacker(*args)
factory.unpacker(*args)
end

def self.init
def self.init(enable_time_support: false)
factory = MessagePack::Factory.new
factory.register_type(Fluent::EventTime::TYPE, Fluent::EventTime)
if enable_time_support
factory.register_type(
MessagePack::Timestamp::TYPE, Time,
packer: MessagePack::Time::Packer,
unpacker: MessagePack::Time::Unpacker)
end
@@engine_factory = factory
end

Expand Down
4 changes: 2 additions & 2 deletions lib/fluent/supervisor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -536,7 +536,7 @@ def run_supervisor(dry_run: false)

begin
ServerEngine::Privilege.change(@chuser, @chgroup)
MessagePackFactory.init
MessagePackFactory.init(enable_time_support: @system_config.enable_msgpack_time_support)
Fluent::Engine.init(@system_config, supervisor_mode: true)
Fluent::Engine.run_configure(@conf, dry_run: dry_run)
rescue Fluent::ConfigError => e
Expand Down Expand Up @@ -584,7 +584,7 @@ def run_worker
main_process do
create_socket_manager if @standalone_worker
ServerEngine::Privilege.change(@chuser, @chgroup) if @standalone_worker
MessagePackFactory.init
MessagePackFactory.init(enable_time_support: @system_config.enable_msgpack_time_support)
Fluent::Engine.init(@system_config)
Fluent::Engine.run_configure(@conf)
Fluent::Engine.run
Expand Down
3 changes: 2 additions & 1 deletion lib/fluent/system_config.rb
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ class SystemConfig
:log_event_verbose,
:without_source, :rpc_endpoint, :enable_get_dump, :process_name,
:file_permission, :dir_permission, :counter_server, :counter_client,
:strict_config_value
:strict_config_value, :enable_msgpack_time_support
]

config_param :workers, :integer, default: 1
Expand All @@ -42,6 +42,7 @@ class SystemConfig
config_param :enable_get_dump, :bool, default: nil
config_param :process_name, :string, default: nil
config_param :strict_config_value, :bool, default: nil
config_param :enable_msgpack_time_support, :bool, default: nil
config_param :file_permission, default: nil do |v|
v.to_i(8)
end
Expand Down
2 changes: 2 additions & 0 deletions test/config/test_system_config.rb
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ def parse_text(text)
assert_nil(sc.emit_error_log_interval)
assert_nil(sc.suppress_config_dump)
assert_nil(sc.without_source)
assert_nil(sc.enable_msgpack_time_support)
assert_equal(:text, sc.log.format)
assert_equal('%Y-%m-%d %H:%M:%S %z', sc.log.time_format)
end
Expand All @@ -89,6 +90,7 @@ def parse_text(text)
'suppress_config_dump' => ['suppress_config_dump', true],
'without_source' => ['without_source', true],
'strict_config_value' => ['strict_config_value', true],
'enable_msgpack_time_support' => ['enable_msgpack_time_support', true],
)
test "accepts parameters" do |(k, v)|
conf = parse_text(<<-EOS)
Expand Down

0 comments on commit b8625f0

Please sign in to comment.