diff --git a/lib/fluent/plugin/in_exec.rb b/lib/fluent/plugin/in_exec.rb index c285136604..b62025c60f 100644 --- a/lib/fluent/plugin/in_exec.rb +++ b/lib/fluent/plugin/in_exec.rb @@ -45,6 +45,8 @@ class ExecInput < Fluent::Plugin::Input config_param :run_interval, :time, default: nil desc 'The default block size to read if parser requires partial read.' config_param :read_block_size, :size, default: 10240 # 10k + desc 'The encoding to receive the result of the command, especially for non-ascii characters.' + config_param :encoding, :string, default: nil attr_reader :parser @@ -63,9 +65,16 @@ def configure(conf) if !@tag && (!@extract_config || !@extract_config.tag_key) raise Fluent::ConfigError, "'tag' or 'tag_key' option is required on exec input" end + validate_encoding(@encoding) if @encoding @parser = parser_create end + def validate_encoding(encoding) + Encoding.find(encoding) + rescue ArgumentError => e + raise Fluent::ConfigError, e.message + end + def multi_workers_ready? true end @@ -73,10 +82,13 @@ def multi_workers_ready? def start super + options = { mode: [@connect_mode] } + options[:external_encoding] = @encoding if @encoding + if @run_interval - child_process_execute(:exec_input, @command, interval: @run_interval, mode: [@connect_mode], &method(:run)) + child_process_execute(:exec_input, @command, interval: @run_interval, **options, &method(:run)) else - child_process_execute(:exec_input, @command, immediate: true, mode: [@connect_mode], &method(:run)) + child_process_execute(:exec_input, @command, immediate: true, **options, &method(:run)) end end diff --git a/test/plugin/test_in_exec.rb b/test/plugin/test_in_exec.rb index cbce44313b..f7c9f6d705 100644 --- a/test/plugin/test_in_exec.rb +++ b/test/plugin/test_in_exec.rb @@ -218,6 +218,43 @@ def create_driver(conf) end end + sub_test_case 'encoding' do + data(immediate: "") + data(run_interval: "run_interval 1") + test 'can handle non-ascii characters' do |additional_setting| + content = 'ひらがな漢字' + + d = create_driver %[ + command ruby -e "puts '#{content}'" + tag test + encoding utf-8 + + @type none + + #{additional_setting} + ] + + d.run(expect_records: 1, timeout: 10) + + assert_equal 1, d.events.length + tag, time, record = d.events.first + assert_equal({"message" => content}, record) + end + + test 'raise ConfigError for invalid encoding' do + assert_raise Fluent::ConfigError do + d = create_driver %[ + command ruby -e "puts foo" + tag test + encoding invalid-encode + + @type none + + ] + end + end + end + data( 'default' => [TSV_CONFIG, "tag1", event_time("2011-01-02 13:14:15"), {"k1"=>"ok"}], 'json' => [JSON_CONFIG, "tag1", event_time("2011-01-02 13:14:15"), {"k1"=>"ok"}],