Skip to content

Commit

Permalink
in_udp: Add remove_newline parameter. Backport #1747
Browse files Browse the repository at this point in the history
  • Loading branch information
repeatedly committed Nov 14, 2017
1 parent 997ac59 commit 0d2ff73
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 3 deletions.
4 changes: 3 additions & 1 deletion lib/fluent/plugin/in_udp.rb
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ class UdpInput < SocketUtil::BaseInput
config_param :body_size_limit, :size, default: nil, deprecated: "use message_length_limit instead."
desc "The max bytes of message"
config_param :message_length_limit, :size, default: 4096
desc "Remove newline from the end of incoming payload"
config_param :remove_newline, :bool, default: true

def configure(conf)
super
Expand All @@ -37,7 +39,7 @@ def listen(callback)
log.info "listening udp socket on #{@bind}:#{@port}"
@usock = SocketUtil.create_udp_socket(@bind)
@usock.bind(@bind, @port)
SocketUtil::UdpHandler.new(@usock, log, @message_length_limit, callback, !!@source_hostname_key)
SocketUtil::UdpHandler.new(@usock, log, @message_length_limit, callback, !!@source_hostname_key, @remove_newline)
end
end
end
5 changes: 3 additions & 2 deletions lib/fluent/plugin/socket_util.rb
Original file line number Diff line number Diff line change
Expand Up @@ -32,18 +32,19 @@ def create_udp_socket(host)
module_function :create_udp_socket

class UdpHandler < Coolio::IO
def initialize(io, log, body_size_limit, callback, resolve_hostname = false)
def initialize(io, log, body_size_limit, callback, resolve_hostname = false, remove_newline = true)
super(io)
@io = io
@io.do_not_reverse_lookup = !resolve_hostname
@log = log
@body_size_limit = body_size_limit
@remove_newline = remove_newline
@callback = callback
end

def on_readable
msg, addr = @io.recvfrom_nonblock(@body_size_limit)
msg.chomp!
msg.chomp! if @remove_newline
@callback.call(msg, addr)
rescue => e
@log.error "unexpected error", error: e, error_class: e.class
Expand Down
20 changes: 20 additions & 0 deletions test/plugin/test_in_udp.rb
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,26 @@ def test_time_format
end
}

test "remove_newline" do
tests = [{'msg' => "test1\n", 'expected' => "test1\n"},
{'msg' => "test2\n", 'expected' => "test2\n"}]

d = create_driver(BASE_CONFIG + %!
format none
remove_newline false
!)
d.run do
u = UDPSocket.new
u.connect('127.0.0.1', PORT)
tests.each { |test|
u.send(test['msg'], 0)
}
sleep 1
end

compare_test_result(d.emits, tests)
end

def compare_test_result(emits, tests)
assert_equal(2, emits.size)
emits.each_index {|i|
Expand Down

0 comments on commit 0d2ff73

Please sign in to comment.