Skip to content

Commit 8af66d2

Browse files
committed
Add support for scrolling & efficient index copying.
1 parent 4244256 commit 8af66d2

File tree

7 files changed

+102
-15
lines changed

7 files changed

+102
-15
lines changed

lib/es.rb

+6-5
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,11 @@
1-
require 'es/version'
2-
require 'es/connection'
3-
require 'es/raw_client'
4-
require 'es/client'
5-
61
# Top-level namespace of the library.
module ES
  class << self
    # Faux constructor: lets callers write ES.new(...) instead of
    # ES::Client.new(...). Every argument is forwarded unchanged to Client.new.
    def new(*args)
      Client.new(*args)
    end
  end
end
6+
7+
require 'es/version'
8+
require 'es/connection'
9+
require 'es/raw_client'
10+
require 'es/client'
11+
require 'es/utils'

lib/es/client.rb

+12-2
Original file line numberDiff line numberDiff line change
@@ -29,9 +29,14 @@ def index(path, data)
2929
@dumper.load(response)
3030
end
3131

32-
def search(path, data)
32+
# Serializes +data+, sends the search through the raw client and returns the
# raw response after passing it through @dumper.load.
#
# path   - path handed to the raw client's search
# data   - query object; run through #serialize before sending
# params - optional hash forwarded untouched to the raw client
def search(path, data, params = {})
  raw = @client.search(path, serialize(data), params)
  @dumper.load(raw)
end
37+
38+
# Forwards +params+ to the raw client's scroll call and returns the raw
# response after passing it through @dumper.load.
#
# params - optional hash handed to the raw client unchanged
def scroll(params = {})
  @dumper.load(@client.scroll(params))
end
3742

@@ -52,6 +57,11 @@ def delete_index(path)
5257
@dumper.load(response)
5358
end
5459

60+
# Asks the raw client for the mapping at +path+ and returns the raw response
# after passing it through @dumper.load.
def get_mapping(path)
  @dumper.load(@client.get_mapping(path))
end
64+
5565
private
5666
def serialize(data)
5767
@dumper.dump(data, mode: :compat)

lib/es/raw_client.rb

+17-4
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,12 @@ def index(path, data)
2424
@connection.request(:put, path, data)
2525
end
2626

27-
def search(path, query)
28-
@connection.request(:post, action_path(path, :search), query)
27+
# Issues a POST to the "_search" action under +path+.
#
# path   - path prefix for the action (passed to action_path)
# query  - request body, sent as-is
# params - optional hash turned into a query string by action_path
#
# Returns whatever @connection.request returns.
def search(path, query, params = {})
  target = action_path(path, :search, params)
  @connection.request(:post, target, query)
end
30+
31+
# Issues a GET to the path-less "_search/scroll" action.
#
# params - optional hash turned into a query string by action_path
#          (the specs pass scroll and scroll_id)
#
# Returns whatever @connection.request returns.
def scroll(params = {})
  scroll_path = action_path(nil, 'search/scroll', params)
  @connection.request(:get, scroll_path)
end
3034

3135
def update(path, data)
@@ -36,9 +40,18 @@ def bulk(requests, path = nil)
3640
@connection.request(:post, action_path(path, :bulk), requests)
3741
end
3842

43+
# Issues a GET to the "_mapping" action under +path+ and returns whatever
# @connection.request returns.
def get_mapping(path)
  mapping_path = action_path(path, :mapping)
  @connection.request(:get, mapping_path)
end
46+
3947
private
40-
def action_path(path, action)
41-
path ? "#{path}/_#{action}" : "_#{action}"
48+
# Builds the request path for an underscore-prefixed action.
#
# path   - optional prefix; when falsy only "_<action>" is produced
# action - action name appended after the underscore
# params - optional hash; when it has entries they are appended after "?"
#          using serialize_params
#
# Returns the path String.
def action_path(path, action, params = {})
  endpoint = "_#{action}"
  endpoint = "#{path}/#{endpoint}" if path
  return endpoint unless params.any?

  "#{endpoint}?#{serialize_params(params)}"
end
52+
53+
# Needed for URI.encode_www_form below; loaded here so this helper does not
# depend on another file having required it first.
require 'uri'

# Turns a params hash into a URL query string ("k1=v1&k2=v2").
#
# Keys and values are form-encoded, so values containing reserved characters
# such as "&", "=", "+" or spaces cannot break or alter the query string.
#
# params - Hash (or enumerable of key/value pairs) of query parameters
#
# Returns the encoded query String ("" for an empty hash).
def serialize_params(params)
  URI.encode_www_form(params)
end
4356
end
4457
end

lib/es/utils.rb

+23
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
module ES
  # Copies every document of one index into another using a scrolled
  # match_all search on the source and bulk index requests on the target.
  #
  # from_index - name of the index to read from
  # to_index   - name of the index to write to
  # opts       - options hash:
  #              :from_host      - host passed to ES.new for the source client
  #              :to_host        - host passed to ES.new for the target client
  #              :scroll_timeout - scroll keep-alive sent with each request (default '5m')
  #              :scroll_size    - size sent with the initial search (default 100)
  #
  # Returns nil.
  def self.copy_index(from_index, to_index, opts = {})
    scroll_timeout = opts.fetch(:scroll_timeout, '5m')
    scroll_size = opts.fetch(:scroll_size, 100)

    from = new(host: opts[:from_host])
    to = new(host: opts[:to_host])

    results = from.search(from_index, { query: { match_all: {} } }, { scroll: scroll_timeout, size: scroll_size })

    # An empty page signals that the scroll is exhausted.
    while (hits = results['hits']['hits']).any?
      # Each document becomes an action line followed by its source. The id
      # lives under '_id' in a search hit; reading 'id' would yield nil and
      # the copy would lose the original document ids.
      requests = hits.flat_map do |hit|
        [{ index: { _type: hit['_type'], _id: hit['_id'] } }, hit['_source']]
      end

      to.bulk(requests, to_index)

      results = from.scroll(scroll: scroll_timeout, scroll_id: results['_scroll_id'])
    end
  end
end

spec/es/client_spec.rb

+20-2
Original file line numberDiff line numberDiff line change
@@ -42,14 +42,32 @@
4242
end
4343

4444
it 'serializes data for .search' do
45-
client.should_receive(:search).with('index/1', raw_data).and_return(raw_response)
45+
client.should_receive(:search).with('index/1', raw_data, {size: 20}).and_return(raw_response)
4646

47-
subject.search('index/1', data).should == response
47+
subject.search('index/1', data, size: 20).should == response
48+
end
49+
50+
# The wrapper hands the scroll params to the raw client untouched and must
# return the unserialized response, so the return value is asserted just like
# in the neighbouring examples.
it 'unserializes response for .scroll' do
  client.should_receive(:scroll).with({scroll_id: 123}).and_return(raw_response)

  subject.scroll(scroll_id: 123).should == response
end
4955

5056
it 'serializes data for .update' do
5157
client.should_receive(:update).with('index/1', raw_data).and_return(raw_response)
5258

5359
subject.update('index/1', data).should == response
5460
end
61+
62+
it 'unserializes response for .get_mapping' do
63+
client.should_receive(:get_mapping).with('index').and_return(raw_response)
64+
65+
subject.get_mapping('index').should == response
66+
end
67+
68+
it 'should respond to all methods of raw client' do
69+
ES::RawClient.instance_methods(false).each do |method|
70+
subject.respond_to?(method).should be_true, "expected to respond to message #{method}"
71+
end
72+
end
5573
end

spec/es/connection_spec.rb

+2-2
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
describe ES::Connection do
44
let(:data) { :data }
55
let(:response_body) { :response }
6-
let(:response) { stub(response_code: 200, body_str: response_body) }
6+
let(:response) { double(response_code: 200, body_str: response_body) }
77
let(:driver) { double(:Driver) }
88

99
subject { described_class.new('http://localhost:9200', driver) }
@@ -33,7 +33,7 @@
3333
end
3434

3535
it 'throws error if response code is not 2XX' do
36-
error_response = stub(response_code: 400)
36+
error_response = double(response_code: 400)
3737
driver.should_receive(:http_post).and_return(error_response)
3838

3939
expect { subject.request(:post, 'index/_search', data) }.to raise_error(ES::Connection::Error)

spec/es/raw_client_spec.rb

+22
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,20 @@
4444

4545
subject.search('index', data).should == response
4646
end
47+
48+
it 'sends POST to connection with parameters' do
49+
connection.should_receive(:request).with(:post, 'index/_search?scroll=5m&size=100', data).and_return(response)
50+
51+
subject.search('index', data, scroll: '5m', size: 100)
52+
end
53+
end
54+
55+
context '.scroll' do
56+
it 'sends GET to connection with parameters' do
57+
connection.should_receive(:request).with(:get, '_search/scroll?scroll=5m&scroll_id=123').and_return(response)
58+
59+
subject.scroll(scroll: '5m', scroll_id: 123)
60+
end
4761
end
4862

4963
context '.bulk' do
@@ -61,4 +75,12 @@
6175
subject.update('index/1', data).should == response
6276
end
6377
end
78+
79+
context '.get_mapping' do
80+
it 'sends GET to connection' do
81+
connection.should_receive(:request).with(:get, 'index/_mapping').and_return(response)
82+
83+
subject.get_mapping('index').should == response
84+
end
85+
end
6486
end

0 commit comments

Comments
 (0)