From 50e6a5006d5f491f122010fd10730e28c1f99f98 Mon Sep 17 00:00:00 2001 From: Masahiro Nakagawa Date: Fri, 16 Aug 2019 01:55:23 +0900 Subject: [PATCH 1/2] in_tcp: Use emit_stream to reduce emit call for fixed tag case Signed-off-by: Masahiro Nakagawa --- lib/fluent/plugin/in_tcp.rb | 30 +++++++++++++++++++++++++++--- test/plugin/test_in_tcp.rb | 25 +++++++++++++++++++++++++ 2 files changed, 52 insertions(+), 3 deletions(-) diff --git a/lib/fluent/plugin/in_tcp.rb b/lib/fluent/plugin/in_tcp.rb index 9f7a2a10e7..b1f3bc68d7 100644 --- a/lib/fluent/plugin/in_tcp.rb +++ b/lib/fluent/plugin/in_tcp.rb @@ -61,9 +61,9 @@ def multi_workers_ready? def start super - server_create(:in_tcp_server, @port, bind: @bind, resolve_name: !!@source_hostname_key) do |data, conn| - conn.buffer << data - begin + if @_extract_enabled && @_extract_tag_key + server_create(:in_tcp_server_single_emit, @port, bind: @bind, resolve_name: !!@source_hostname_key) do |data, conn| + conn.buffer << data pos = 0 while i = conn.buffer.index(@delimiter, pos) msg = conn.buffer[pos...i] @@ -85,6 +85,30 @@ def start end conn.buffer.slice!(0, pos) if pos > 0 end + else + server_create(:in_tcp_server_batch_emit, @port, bind: @bind, resolve_name: !!@source_hostname_key) do |data, conn| + conn.buffer << data + pos = 0 + es = Fluent::MultiEventStream.new + while i = conn.buffer.index(@delimiter, pos) + msg = conn.buffer[pos...i] + pos = i + @delimiter.length + + @parser.parse(msg) do |time, record| + unless time && record + log.warn "pattern not matched", message: msg + next + end + + time ||= extract_time_from_record(record) || Fluent::EventTime.now + record[@source_address_key] = conn.remote_addr if @source_address_key + record[@source_hostname_key] = conn.remote_host if @source_hostname_key + es.add(time, record) + end + end + router.emit_stream(@tag, es) + conn.buffer.slice!(0, pos) if pos > 0 + end end end end diff --git a/test/plugin/test_in_tcp.rb b/test/plugin/test_in_tcp.rb index 16736dcd6d..fcb4afdc6f 100755 --- a/test/plugin/test_in_tcp.rb +++ b/test/plugin/test_in_tcp.rb @@ -162,4 +162,29 @@ def create_tcp_socket(host, port, &block) assert event[1].is_a?(Fluent::EventTime) assert_equal address, event[2]['addr'] end + + sub_test_case '' do + test 'extract tag from record field' do + d = create_driver(BASE_CONFIG + %! + + @type json + + + tag_key tag + + !) + d.run(expect_records: 1) do + create_tcp_socket('127.0.0.1', PORT) do |sock| + data = {'msg' => 'hello', 'tag' => 'helper_test'} + sock.send("#{data.to_json}\n", 0) + end + end + + assert_equal 1, d.events.size + event = d.events[0] + assert_equal 'helper_test', event[0] + assert event[1].is_a?(Fluent::EventTime) + assert_equal 'hello', event[2]['msg'] + end + end end From 0b78be019941683248dfa3c81dd6689fc7b912e7 Mon Sep 17 00:00:00 2001 From: Masahiro Nakagawa Date: Fri, 16 Aug 2019 02:25:00 +0900 Subject: [PATCH 2/2] in_tcp: Use local var to reduce method call Signed-off-by: Masahiro Nakagawa --- lib/fluent/plugin/in_tcp.rb | 19 +++++++++++-------- 1 file changed, 11 insertions(+), 8 deletions(-) diff --git a/lib/fluent/plugin/in_tcp.rb b/lib/fluent/plugin/in_tcp.rb index b1f3bc68d7..fb7c3ea395 100644 --- a/lib/fluent/plugin/in_tcp.rb +++ b/lib/fluent/plugin/in_tcp.rb @@ -61,13 +61,15 @@ def multi_workers_ready? def start super + del_size = @delimiter.length if @_extract_enabled && @_extract_tag_key server_create(:in_tcp_server_single_emit, @port, bind: @bind, resolve_name: !!@source_hostname_key) do |data, conn| conn.buffer << data + buf = conn.buffer pos = 0 - while i = conn.buffer.index(@delimiter, pos) - msg = conn.buffer[pos...i] - pos = i + @delimiter.length + while i = buf.index(@delimiter, pos) + msg = buf[pos...i] + pos = i + del_size @parser.parse(msg) do |time, record| unless time && record @@ -83,16 +85,17 @@ def start router.emit(tag, time, record) end end - conn.buffer.slice!(0, pos) if pos > 0 + buf.slice!(0, pos) if pos > 0 end else server_create(:in_tcp_server_batch_emit, @port, bind: @bind, resolve_name: !!@source_hostname_key) do |data, conn| conn.buffer << data + buf = conn.buffer pos = 0 es = Fluent::MultiEventStream.new - while i = conn.buffer.index(@delimiter, pos) - msg = conn.buffer[pos...i] - pos = i + @delimiter.length + while i = buf.index(@delimiter, pos) + msg = buf[pos...i] + pos = i + del_size @parser.parse(msg) do |time, record| unless time && record @@ -107,7 +110,7 @@ def start end end router.emit_stream(@tag, es) - conn.buffer.slice!(0, pos) if pos > 0 + buf.slice!(0, pos) if pos > 0 end end end