From b8625f0488c60310803a7f50a56985952d3deea7 Mon Sep 17 00:00:00 2001 From: Masahiro Nakagawa Date: Tue, 14 Jan 2020 11:27:26 +0900 Subject: [PATCH 1/2] Support Ruby's Time class in msgpack serde Recent msgpack-ruby can (de)serialize Time class. Add enable_msgpack_time_support parameter to system config for this feature. This is useful for multiple time fields. Signed-off-by: Masahiro Nakagawa --- fluentd.gemspec | 2 +- lib/fluent/msgpack_factory.rb | 8 +++++++- lib/fluent/supervisor.rb | 4 ++-- lib/fluent/system_config.rb | 3 ++- test/config/test_system_config.rb | 2 ++ 5 files changed, 14 insertions(+), 5 deletions(-) diff --git a/fluentd.gemspec b/fluentd.gemspec index 3bcddb9c96..94551a2af9 100644 --- a/fluentd.gemspec +++ b/fluentd.gemspec @@ -18,7 +18,7 @@ Gem::Specification.new do |gem| gem.required_ruby_version = '>= 2.4' - gem.add_runtime_dependency("msgpack", [">= 1.2.0", "< 2.0.0"]) + gem.add_runtime_dependency("msgpack", [">= 1.3.1", "< 2.0.0"]) gem.add_runtime_dependency("yajl-ruby", ["~> 1.0"]) gem.add_runtime_dependency("cool.io", [">= 1.4.5", "< 2.0.0"]) gem.add_runtime_dependency("serverengine", [">= 2.0.4", "< 3.0.0"]) diff --git a/lib/fluent/msgpack_factory.rb b/lib/fluent/msgpack_factory.rb index 4ae251029e..044cdf77b1 100644 --- a/lib/fluent/msgpack_factory.rb +++ b/lib/fluent/msgpack_factory.rb @@ -70,9 +70,15 @@ def self.unpacker(*args) factory.unpacker(*args) end - def self.init + def self.init(enable_time_support: false) factory = MessagePack::Factory.new factory.register_type(Fluent::EventTime::TYPE, Fluent::EventTime) + if enable_time_support + factory.register_type( + MessagePack::Timestamp::TYPE, Time, + packer: MessagePack::Time::Packer, + unpacker: MessagePack::Time::Unpacker) + end @@engine_factory = factory end diff --git a/lib/fluent/supervisor.rb b/lib/fluent/supervisor.rb index c7e83da3db..db1b35a012 100644 --- a/lib/fluent/supervisor.rb +++ b/lib/fluent/supervisor.rb @@ -536,7 +536,7 @@ def run_supervisor(dry_run: false) begin ServerEngine::Privilege.change(@chuser, @chgroup) - MessagePackFactory.init + MessagePackFactory.init(enable_time_support: @system_config.enable_msgpack_time_support) Fluent::Engine.init(@system_config, supervisor_mode: true) Fluent::Engine.run_configure(@conf, dry_run: dry_run) rescue Fluent::ConfigError => e @@ -584,7 +584,7 @@ def run_worker main_process do create_socket_manager if @standalone_worker ServerEngine::Privilege.change(@chuser, @chgroup) if @standalone_worker - MessagePackFactory.init + MessagePackFactory.init(enable_time_support: @system_config.enable_msgpack_time_support) Fluent::Engine.init(@system_config) Fluent::Engine.run_configure(@conf) Fluent::Engine.run diff --git a/lib/fluent/system_config.rb b/lib/fluent/system_config.rb index f3f9420c20..e5d2b4401c 100644 --- a/lib/fluent/system_config.rb +++ b/lib/fluent/system_config.rb @@ -27,7 +27,7 @@ class SystemConfig :log_event_verbose, :without_source, :rpc_endpoint, :enable_get_dump, :process_name, :file_permission, :dir_permission, :counter_server, :counter_client, - :strict_config_value + :strict_config_value, :enable_msgpack_time_support ] config_param :workers, :integer, default: 1 @@ -42,6 +42,7 @@ class SystemConfig config_param :enable_get_dump, :bool, default: nil config_param :process_name, :string, default: nil config_param :strict_config_value, :bool, default: nil + config_param :enable_msgpack_time_support, :bool, default: nil config_param :file_permission, default: nil do |v| v.to_i(8) end diff --git a/test/config/test_system_config.rb b/test/config/test_system_config.rb index b347ae5a2b..6297e64314 100644 --- a/test/config/test_system_config.rb +++ b/test/config/test_system_config.rb @@ -76,6 +76,7 @@ def parse_text(text) assert_nil(sc.emit_error_log_interval) assert_nil(sc.suppress_config_dump) assert_nil(sc.without_source) + assert_nil(sc.enable_msgpack_time_support) assert_equal(:text, sc.log.format) assert_equal('%Y-%m-%d %H:%M:%S %z', sc.log.time_format) end @@ -89,6 +90,7 @@ def parse_text(text) 'suppress_config_dump' => ['suppress_config_dump', true], 'without_source' => ['without_source', true], 'strict_config_value' => ['strict_config_value', true], + 'enable_msgpack_time_support' => ['enable_msgpack_time_support', true], ) test "accepts parameters" do |(k, v)| conf = parse_text(<<-EOS) From 8045e5f5137348513f07308395370181e38ad343 Mon Sep 17 00:00:00 2001 From: Masahiro Nakagawa Date: Tue, 14 Jan 2020 17:38:28 +0900 Subject: [PATCH 2/2] Apply same changes to MessagePackFactory.factory Signed-off-by: Masahiro Nakagawa --- lib/fluent/msgpack_factory.rb | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/lib/fluent/msgpack_factory.rb b/lib/fluent/msgpack_factory.rb index 044cdf77b1..c5e06ed49e 100644 --- a/lib/fluent/msgpack_factory.rb +++ b/lib/fluent/msgpack_factory.rb @@ -44,8 +44,8 @@ def msgpack_unpacker(*args) end end - def self.engine_factory - @@engine_factory || factory + def self.engine_factory(enable_time_support: false) + @@engine_factory || factory(enable_time_support: enable_time_support) end def self.msgpack_packer(*args) @@ -56,9 +56,15 @@ def self.msgpack_unpacker(*args) engine_factory.unpacker(*args) end - def self.factory + def self.factory(enable_time_support: false) factory = MessagePack::Factory.new factory.register_type(Fluent::EventTime::TYPE, Fluent::EventTime) + if enable_time_support + factory.register_type( + MessagePack::Timestamp::TYPE, Time, + packer: MessagePack::Time::Packer, + unpacker: MessagePack::Time::Unpacker) + end factory end