diff --git a/lib/fluent/plugin/output.rb b/lib/fluent/plugin/output.rb index ae25b36342..b622720b37 100644 --- a/lib/fluent/plugin/output.rb +++ b/lib/fluent/plugin/output.rb @@ -15,6 +15,7 @@ # require 'fluent/plugin/base' +require 'fluent/plugin_helper/record_accessor' require 'fluent/log' require 'fluent/plugin_id' require 'fluent/plugin_helper' @@ -36,7 +37,7 @@ class Output < Base helpers_internal :thread, :retry_state CHUNK_KEY_PATTERN = /^[-_.@a-zA-Z0-9]+$/ - CHUNK_KEY_PLACEHOLDER_PATTERN = /\$\{[-_.@a-zA-Z0-9]+\}/ + CHUNK_KEY_PLACEHOLDER_PATTERN = /\$\{[-_.@$a-zA-Z0-9]+\}/ CHUNK_TAG_PLACEHOLDER_PATTERN = /\$\{(tag(?:\[\d+\])?)\}/ CHUNKING_FIELD_WARN_NUM = 4 @@ -161,7 +162,7 @@ def expired? attr_reader :num_errors, :emit_count, :emit_records, :write_count, :rollback_count # for tests - attr_reader :buffer, :retry, :secondary, :chunk_keys, :chunk_key_time, :chunk_key_tag + attr_reader :buffer, :retry, :secondary, :chunk_keys, :chunk_key_accessors, :chunk_key_time, :chunk_key_tag attr_accessor :output_enqueue_thread_waiting, :dequeued_chunks, :dequeued_chunks_mutex # output_enqueue_thread_waiting: for test of output.rb itself attr_accessor :retry_for_error_chunk # if true, error flush will be retried even if under_plugin_development is true @@ -203,7 +204,7 @@ def initialize @output_flush_threads = nil @simple_chunking = nil - @chunk_keys = @chunk_key_time = @chunk_key_tag = nil + @chunk_keys = @chunk_key_accessors = @chunk_key_time = @chunk_key_tag = nil @flush_mode = nil @timekey_zone = nil @@ -276,8 +277,25 @@ def configure(conf) @chunk_keys = @buffer_config.chunk_keys.dup @chunk_key_time = !!@chunk_keys.delete('time') @chunk_key_tag = !!@chunk_keys.delete('tag') - if @chunk_keys.any?{ |key| key !~ CHUNK_KEY_PATTERN } + if @chunk_keys.any? { |key| + begin + k = Fluent::PluginHelper::RecordAccessor::Accessor.parse_parameter(key) + if k.is_a?(String) + k !~ CHUNK_KEY_PATTERN + else + if key.start_with?('$[') + raise Fluent::ConfigError, "in chunk_keys: bracket notation is not allowed" + else + false + end + end + rescue => e + raise Fluent::ConfigError, "in chunk_keys: #{e.message}" + end + } raise Fluent::ConfigError, "chunk_keys specification includes invalid char" + else + @chunk_key_accessors = Hash[@chunk_keys.map { |key| [key.to_sym, Fluent::PluginHelper::RecordAccessor::Accessor.new(key)] }] end if @chunk_key_time @@ -778,7 +796,7 @@ def metadata(tag, time, record) else nil end - pairs = Hash[@chunk_keys.map{|k| [k.to_sym, record[k]]}] + pairs = Hash[@chunk_key_accessors.map { |k, a| [k, a.call(record)] }] @buffer.metadata(timekey: timekey, tag: (@chunk_key_tag ? tag : nil), variables: pairs) end end diff --git a/test/plugin/test_output.rb b/test/plugin/test_output.rb index 4937914f77..48dde91d3f 100644 --- a/test/plugin/test_output.rb +++ b/test/plugin/test_output.rb @@ -267,6 +267,18 @@ def waiting(seconds) assert_equal "/mypath/%Y/%m/%d/%H-%M/${tag}/${tag[1]}/${tag[2]}/value1/value2/tail", @i.extract_placeholders(tmpl, m) end + test '#extract_placeholders can extract nested variables if variables are configured with dot notation' do + @i.configure(config_element('ROOT', '', {}, [config_element('buffer', 'key,$.nest.key', {})])) + assert !@i.chunk_key_time + assert !@i.chunk_key_tag + assert_equal ['key','$.nest.key'], @i.chunk_keys + tmpl = "/mypath/%Y/%m/%d/%H-%M/${tag}/${tag[1]}/${tag[2]}/${key}/${$.nest.key}/tail" + t = event_time('2016-04-11 20:30:00 +0900') + v = {:key => "value1", :"$.nest.key" => "value2"} + m = create_metadata(timekey: t, tag: 'fluentd.test.output', variables: v) + assert_equal "/mypath/%Y/%m/%d/%H-%M/${tag}/${tag[1]}/${tag[2]}/value1/value2/tail", @i.extract_placeholders(tmpl, m) + end + test '#extract_placeholders can extract all chunk keys if configured' do @i.configure(config_element('ROOT', '', {}, [config_element('buffer', 'time,tag,key1,key2', {'timekey' => 60*30, 'timekey_zone' => "+0900"})])) assert @i.chunk_key_time @@ -493,11 +505,21 @@ def waiting(seconds) assert_equal ['.hidden', '0001', '@timestamp', 'a_key', 'my-domain'], @i.get_placeholders_keys("http://${my-domain}/${.hidden}/${0001}/${a_key}?timestamp=${@timestamp}") end + data('include space' => 'ke y', + 'bracket notation' => "$['key']", + 'invalid notation' => "$.ke y") + test 'configure checks invalid chunk keys' do |chunk_keys| + i = create_output(:buffered) + assert_raise Fluent::ConfigError do + i.configure(config_element('ROOT' , '', {}, [config_element('buffer', chunk_keys)])) + end + end + test '#metadata returns object which contains tag/timekey/variables from records as specified in configuration' do tag = 'test.output' time = event_time('2016-04-12 15:31:23 -0700') timekey = event_time('2016-04-12 15:00:00 -0700') - record = {"key1" => "value1", "num1" => 1, "message" => "my message"} + record = {"key1" => "value1", "num1" => 1, "message" => "my message", "nest" => {"key" => "nested value"}} i1 = create_output(:buffered) i1.configure(config_element('ROOT','',{},[config_element('buffer', '')])) @@ -530,6 +552,10 @@ def waiting(seconds) i8 = create_output(:buffered) i8.configure(config_element('ROOT','',{},[config_element('buffer', 'time,tag,key1', {"timekey" => 3600, "timekey_zone" => "-0700"})])) assert_equal create_metadata(timekey: timekey, tag: tag, variables: {key1: "value1"}), i8.metadata(tag, time, record) + + i9 = create_output(:buffered) + i9.configure(config_element('ROOT','',{},[config_element('buffer', 'key1,$.nest.key', {})])) + assert_equal create_metadata(variables: {:key1 => "value1", :"$.nest.key" => 'nested value'}), i9.metadata(tag, time, record) end test '#emit calls #process via #emit_sync for non-buffered output' do