Skip to content

Commit

Permalink
Fixes compression on elasticsearch-transport
Browse files Browse the repository at this point in the history
- "compression" option should compress outbound requests.
- Decompress compressed responses, whether or not use_compression is set.
- Properly set header on gzipped request body
- Perform compression inside HTTP adapters
- Explicitly require zlib library
- Content-Encoding is set in the HTTP adapters, so we don't need it in the apply_headers method
- Add specs for curb, faraday, and manticore
- Update curb_spec.rb
  • Loading branch information
johnnyshields authored and picandocodigo committed Oct 8, 2021
1 parent c2f8311 commit 8b326d6
Show file tree
Hide file tree
Showing 9 changed files with 462 additions and 18 deletions.
1 change: 1 addition & 0 deletions lib/elastic/transport.rb
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
require 'time'
require 'timeout'
require 'uri'
require 'zlib'

require 'elastic/transport/transport/loggable'
require 'elastic/transport/transport/serializer/multi_json'
Expand Down
29 changes: 26 additions & 3 deletions lib/elastic/transport/transport/base.rb
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ def __raise_transport_error(response)
# @api private
#
def __convert_to_json(o=nil, options={})
  # Strings are returned as-is; any other object is handed to the
  # serializer's #dump together with the given options.
  o.is_a?(String) ? o : serializer.dump(o, options)
end

# Returns a full URL based on information from host
Expand Down Expand Up @@ -373,17 +373,38 @@ def host_unreachable_exceptions

USER_AGENT_STR = 'User-Agent'.freeze
USER_AGENT_REGEX = /user\-?\_?agent/
ACCEPT_ENCODING = 'Accept-Encoding'.freeze
CONTENT_ENCODING = 'Content-Encoding'.freeze
CONTENT_TYPE_STR = 'Content-Type'.freeze
CONTENT_TYPE_REGEX = /content\-?\_?type/
DEFAULT_CONTENT_TYPE = 'application/json'.freeze
GZIP = 'gzip'.freeze
# Hex representation of the two magic bytes that open a gzip stream;
# compared against the first two characters of a body to detect gzip data.
GZIP_FIRST_TWO_BYTES = '1f8b'.freeze
# String#unpack directive that yields a hex string (high nibble first).
HEX_STRING_DIRECTIVE = 'H*'.freeze
# True when String responds to #force_encoding.
RUBY_ENCODING = '1.9'.respond_to?(:force_encoding)

# Prepares an outbound body/headers pair with respect to gzip encoding.
#
# * No body: any Content-Encoding entry is removed from +headers+ (when given).
# * Body already gzipped: the Content-Encoding header is set to gzip.
# * Compression enabled: the body is gzipped and the header is set.
# * Otherwise: any Content-Encoding entry is removed.
#
# The +headers+ hash is modified in place; a new hash is created only when
# a body is present and +headers+ is nil.
#
# @param body [String, nil] the request body
# @param headers [Hash, nil] the request headers
# @return [Array] a two-element array: [body, headers]
#
def compress_request(body, headers)
  unless body
    headers.delete(CONTENT_ENCODING) if headers
    return [body, headers]
  end

  headers ||= {}
  already_gzipped = gzipped?(body)

  if already_gzipped || use_compression?
    headers[CONTENT_ENCODING] = GZIP

    unless already_gzipped
      writer = Zlib::GzipWriter.new(StringIO.new)
      writer << body
      body = writer.close.string
    end
  else
    headers.delete(CONTENT_ENCODING)
  end

  [body, headers]
end

def decompress_response(body)
return body unless use_compression?
return body unless gzipped?(body)

io = StringIO.new(body)
Expand All @@ -396,6 +417,8 @@ def decompress_response(body)
end

# Tells whether +body+ starts with the gzip magic bytes.
#
# @param body [String, nil] the payload to inspect
# @return [Boolean, nil] nil for a missing or empty body, otherwise whether
#   the hex form of its first two characters equals GZIP_FIRST_TWO_BYTES
#
def gzipped?(body)
  return if !body || body.empty?

  leading_hex, = body[0..1].unpack(HEX_STRING_DIRECTIVE)
  leading_hex == GZIP_FIRST_TWO_BYTES
end

Expand Down
8 changes: 4 additions & 4 deletions lib/elastic/transport/transport/http/curb.rb
Original file line number Diff line number Diff line change
Expand Up @@ -25,23 +25,23 @@ module HTTP
#
class Curb
include Base

# Performs the request by invoking {Transport::Base#perform_request} with a block.
#
# @return [Response]
# @see Transport::Base#perform_request
#
def perform_request(method, path, params={}, body=nil, headers=nil, opts={})
super do |connection, url|
super do |connection, _url|
connection.connection.url = connection.full_url(path, params)
body = body ? __convert_to_json(body) : nil
body, headers = compress_request(body, headers)

case method
when 'HEAD'
connection.connection.set :nobody, true
when 'GET', 'POST', 'PUT', 'DELETE'
connection.connection.set :nobody, false

connection.connection.put_data = __convert_to_json(body) if body
connection.connection.put_data = body if body

if headers
if connection.connection.headers
Expand Down
12 changes: 8 additions & 4 deletions lib/elastic/transport/transport/http/faraday.rb
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,15 @@ def perform_request(method, path, params = {}, body = nil, headers = nil, opts =
else
headers
end
body = body ? __convert_to_json(body) : nil
body, headers = compress_request(body, headers)

response = connection.connection.run_request(method.downcase.to_sym,
url,
( body ? __convert_to_json(body) : nil ),
headers)
response = connection.connection.run_request(
method.downcase.to_sym,
url,
body,
headers
)

Response.new response.status, decompress_response(response.body), response.headers
end
Expand Down
5 changes: 4 additions & 1 deletion lib/elastic/transport/transport/http/manticore.rb
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,10 @@ def build_client(options={})
#
def perform_request(method, path, params={}, body=nil, headers=nil, opts={})
super do |connection, url|
params[:body] = __convert_to_json(body) if body
body = body ? __convert_to_json(body) : nil
body, headers = compress_request(body, @request_options[:headers])

params[:body] = body if body
params[:headers] = headers if headers
params = params.merge @request_options
case method
Expand Down
12 changes: 6 additions & 6 deletions spec/elastic/transport/client_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -1405,7 +1405,7 @@
end

it 'sets the Accept-Encoding header' do
  # The first connection's Accept-Encoding header must be exactly 'gzip'.
  expect(client.transport.connections[0].connection.headers['Accept-Encoding']).to eq 'gzip'
end

it 'preserves the other headers' do
Expand All @@ -1423,7 +1423,7 @@
end

it 'sets the Accept-Encoding header' do
  # The first connection's Accept-Encoding header must be exactly 'gzip'.
  expect(client.transport.connections[0].connection.headers['Accept-Encoding']).to eq 'gzip'
end

it 'preserves the other headers' do
Expand All @@ -1441,7 +1441,7 @@
end

it 'sets the Accept-Encoding header' do
  # The first connection's Accept-Encoding header must be exactly 'gzip'.
  expect(client.transport.connections[0].connection.headers['Accept-Encoding']).to eq 'gzip'
end

it 'preserves the other headers' do
Expand All @@ -1459,7 +1459,7 @@
end

it 'sets the Accept-Encoding header' do
  # The first connection's Accept-Encoding header must be exactly 'gzip'.
  expect(client.transport.connections[0].connection.headers['Accept-Encoding']).to eq 'gzip'
end

it 'preserves the other headers' do
Expand All @@ -1477,7 +1477,7 @@
end

it 'sets the Accept-Encoding header' do
  # The first connection's Accept-Encoding header must be exactly 'gzip'.
  expect(client.transport.connections[0].connection.headers['Accept-Encoding']).to eq 'gzip'
end

it 'preserves the other headers' do
Expand All @@ -1499,7 +1499,7 @@
end

it 'sets the Accept-Encoding header' do
  # The first connection's Accept-Encoding header must be exactly 'gzip'.
  expect(client.transport.connections[0].connection.headers['Accept-Encoding']).to eq 'gzip'
end

it 'preserves the other headers' do
Expand Down
126 changes: 126 additions & 0 deletions spec/elastic/transport/http/curb_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
# Licensed to Elasticsearch B.V. under one or more contributor
# license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright
# ownership. Elasticsearch B.V. licenses this file to you under
# the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

# Specs for the Curb HTTP adapter's #perform_request: JSON conversion of the
# body, the call to compress_request, and what is handed to Curl's put_data=.
# The whole file is skipped when running on JRuby
# (presumably because the curb gem is not available there — confirm).
unless defined?(JRUBY_VERSION)
require_relative '../../../spec_helper'

describe Elasticsearch::Transport::Transport::HTTP::Curb do
# Client wired to use the adapter under test as its transport class.
let(:client) do
Elasticsearch::Transport::Client.new(transport_class: described_class)
end

describe '#perform_request' do
subject(:perform_request) { client.perform_request(*args) }
# Positional arguments: method, path, params, body, headers.
let(:args) do
['POST', '/', {}, body, headers]
end
# Default body is already a JSON string; nested contexts override it.
let(:body) { '{"foo":"bar"}' }
let(:headers) { { 'Content-Type' => 'application/x-ndjson' } }

# Stub Curl::Easy#http on every instance (returning true) so the real
# method is never executed by these examples.
before do
allow_any_instance_of(Curl::Easy).to receive(:http).and_return(true)
end

it 'convert body to json' do
expect(client.transport).to receive(:__convert_to_json).with(body)
perform_request
end

# The message expectation replaces compress_request with a stub that has no
# configured return value; only the received arguments are checked here.
it 'call compress_request' do
expect(client.transport).to receive(:compress_request).with(body, headers)
perform_request
end

it 'return response' do
expect(perform_request).to be_kind_of(Elasticsearch::Transport::Transport::Response)
end

it 'put body' do
expect(client.transport.connections.first.connection).to receive('put_data=').with(body)
perform_request
end

context 'when body nil' do
let(:body) { nil }

# With a nil body the conversion must NOT be invoked.
it 'convert body to json' do
expect(client.transport).not_to receive(:__convert_to_json)
perform_request
end

it 'call compress_request' do
expect(client.transport).to receive(:compress_request).with(body, headers)
perform_request
end

# With a nil body nothing must be assigned to put_data.
it 'put body' do
expect(client.transport.connections.first.connection).not_to receive('put_data=')
perform_request
end
end

context 'when body is hash' do
let(:body) { { foo: 'bar' } }
# Expected JSON serialization of the hash body above.
let(:body_string) { '{"foo":"bar"}' }

it 'convert body to json' do
expect(client.transport).to receive(:__convert_to_json).with(body)
perform_request
end

# compress_request must receive the serialized string, not the hash.
it 'call compress_request' do
expect(client.transport).to receive(:compress_request).with(body_string, headers)
perform_request
end

it 'put body' do
expect(client.transport.connections.first.connection).to receive('put_data=').with(body_string)
perform_request
end
end

context 'when compression enabled' do
let(:client) do
Elasticsearch::Transport::Client.new(transport_class: described_class, compression: true)
end
let(:body_string) { '{"foo":"bar"}' }
# Expected payload: body_string gzipped through Zlib::GzipWriter.
# NOTE(review): gzip headers carry a modification time, so byte-for-byte
# equality with the adapter's output may depend on both payloads being
# produced within the same second — confirm.
let(:compressed_body) do
gzip = Zlib::GzipWriter.new(StringIO.new)
gzip << body_string
gzip.close.string
end

# Stub response decompression to return an empty string in this context.
before { allow(client.transport).to receive(:decompress_response).and_return('') }

it 'put compressed body' do
expect(client.transport.connections.first.connection).to receive('put_data=').with(compressed_body)
perform_request
end

it 'set Content-Encoding header' do
perform_request
expect(client.transport.connections.first.connection.headers).to include('Content-Encoding')
end

it 'set Content-Encoding to gzip' do
perform_request
expect(client.transport.connections.first.connection.headers['Content-Encoding']).to eql('gzip')
end
end
end
end
end
Loading

0 comments on commit 8b326d6

Please sign in to comment.