Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 18 additions & 3 deletions lib/logstash/outputs/opensearch/http_client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,6 @@ def bulk(actions)

body_stream = StringIO.new
if http_compression
body_stream.set_encoding "BINARY"
stream_writer = gzip_writer(body_stream)
else
stream_writer = body_stream
Expand All @@ -126,9 +125,24 @@ def bulk(actions)
:payload_size => stream_writer.pos,
:content_length => body_stream.size,
:batch_offset => (index + 1 - batch_actions.size))

# Have to close gzip writer before reading from body_stream; otherwise stream doesn't end properly
# and will cause server side error
if http_compression
stream_writer.close
end

bulk_responses << bulk_send(body_stream, batch_actions)
body_stream.truncate(0) && body_stream.seek(0)
stream_writer = gzip_writer(body_stream) if http_compression

if http_compression
# Get a new StringIO object and gzip writer
body_stream = StringIO.new
stream_writer = gzip_writer(body_stream)
else
# Clear existing StringIO object and reuse existing stream writer
body_stream.truncate(0) && body_stream.seek(0)
end

batch_actions.clear
end
stream_writer.write(as_json)
Expand All @@ -149,6 +163,7 @@ def gzip_writer(io)
fail(ArgumentError, "Cannot create gzip writer on IO with unread bytes") unless io.eof?
fail(ArgumentError, "Cannot create gzip writer on non-empty IO") unless io.pos == 0

io.set_encoding "BINARY"
Zlib::GzipWriter.new(io, Zlib::DEFAULT_COMPRESSION, Zlib::DEFAULT_STRATEGY)
end

Expand Down