Skip to content

Commit

Permalink
Merge pull request #2567 from fluent/improve-in_tcp
Browse files Browse the repository at this point in the history
in_tcp: Improve the performance for multiple events case
  • Loading branch information
repeatedly authored Aug 16, 2019
2 parents 5a7d949 + 0b78be0 commit 2ad5b6d
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 7 deletions.
41 changes: 34 additions & 7 deletions lib/fluent/plugin/in_tcp.rb
Original file line number Diff line number Diff line change
Expand Up @@ -61,13 +61,15 @@ def multi_workers_ready?
def start
super

server_create(:in_tcp_server, @port, bind: @bind, resolve_name: !!@source_hostname_key) do |data, conn|
conn.buffer << data
begin
del_size = @delimiter.length
if @_extract_enabled && @_extract_tag_key
server_create(:in_tcp_server_single_emit, @port, bind: @bind, resolve_name: !!@source_hostname_key) do |data, conn|
conn.buffer << data
buf = conn.buffer
pos = 0
while i = conn.buffer.index(@delimiter, pos)
msg = conn.buffer[pos...i]
pos = i + @delimiter.length
while i = buf.index(@delimiter, pos)
msg = buf[pos...i]
pos = i + del_size

@parser.parse(msg) do |time, record|
unless time && record
Expand All @@ -83,7 +85,32 @@ def start
router.emit(tag, time, record)
end
end
conn.buffer.slice!(0, pos) if pos > 0
buf.slice!(0, pos) if pos > 0
end
else
server_create(:in_tcp_server_batch_emit, @port, bind: @bind, resolve_name: !!@source_hostname_key) do |data, conn|
conn.buffer << data
buf = conn.buffer
pos = 0
es = Fluent::MultiEventStream.new
while i = buf.index(@delimiter, pos)
msg = buf[pos...i]
pos = i + del_size

@parser.parse(msg) do |time, record|
unless time && record
log.warn "pattern not matched", message: msg
next
end

time ||= extract_time_from_record(record) || Fluent::EventTime.now
record[@source_address_key] = conn.remote_addr if @source_address_key
record[@source_hostname_key] = conn.remote_host if @source_hostname_key
es.add(time, record)
end
end
router.emit_stream(@tag, es)
buf.slice!(0, pos) if pos > 0
end
end
end
Expand Down
25 changes: 25 additions & 0 deletions test/plugin/test_in_tcp.rb
Original file line number Diff line number Diff line change
Expand Up @@ -162,4 +162,29 @@ def create_tcp_socket(host, port, &block)
assert event[1].is_a?(Fluent::EventTime)
assert_equal address, event[2]['addr']
end

sub_test_case '<extract>' do
test 'extract tag from record field' do
d = create_driver(BASE_CONFIG + %!
<parse>
@type json
</parse>
<extract>
tag_key tag
</extract>
!)
d.run(expect_records: 1) do
create_tcp_socket('127.0.0.1', PORT) do |sock|
data = {'msg' => 'hello', 'tag' => 'helper_test'}
sock.send("#{data.to_json}\n", 0)
end
end

assert_equal 1, d.events.size
event = d.events[0]
assert_equal 'helper_test', event[0]
assert event[1].is_a?(Fluent::EventTime)
assert_equal 'hello', event[2]['msg']
end
end
end

0 comments on commit 2ad5b6d

Please sign in to comment.