Skip to content

Commit 4194195

Browse files
committed
scan and scroll object for distributed search execution
1 parent 58b0928 commit 4194195

File tree

2 files changed

+260
-0
lines changed

2 files changed

+260
-0
lines changed

lib/Elastica/ScanAndScroll.php

+146
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,146 @@
1+
<?php
2+
3+
namespace Elastica;
4+
5+
/**
6+
* Scan and scroll iterator for distributed search execution; each iteration step yields one result set
7+
*
8+
* @category Xodoa
9+
* @package Elastica
10+
* @author Manuel Andreo Garcia <[email protected]>
11+
* @link http://www.elasticsearch.org/guide/en/elasticsearch/guide/current/scan-scroll.html
12+
*/
13+
class ScanAndScroll implements \Iterator {

    /**
     * Time value passed as the Search::OPTION_SCROLL option when rewind() starts the scan
     *
     * @link http://www.elasticsearch.org/guide/en/elasticsearch/reference/current/search-request-scroll.html
     * @var string
     */
    public $expiryTime = '1m';

    /**
     * Size that rewind() applies to the query of the wrapped search
     *
     * @var int
     */
    public $sizePerShard = 1000;

    /**
     * Search object that executes the scan and every scroll request
     *
     * @var Search
     */
    protected $_search;

    /**
     * Scroll id to send with the next scroll request, null if none is available
     *
     * @var null|string
     */
    protected $_nextScrollId = null;

    /**
     * Scroll id that was sent to obtain the current result set
     *
     * @var null|string
     */
    protected $_lastScrollId = null;

    /**
     * Result set of the most recent scroll request, null before the first one
     *
     * @var null|ResultSet
     */
    protected $_currentResultSet = null;

    /**
     * Constructs scroll iterator object
     *
     * @param Search $search search to iterate; its query size and options are modified by rewind()
     */
    public function __construct(Search $search) {
        $this->_search = $search;
    }

    /**
     * Return the current result set
     *
     * @link http://php.net/manual/en/iterator.current.php
     * @return null|ResultSet null if no scroll request has been performed yet
     */
    public function current() {
        return $this->_currentResultSet;
    }

    /**
     * Perform next scroll search
     *
     * @link http://php.net/manual/en/iterator.next.php
     * @return void
     */
    public function next() {
        $this->_scroll();
    }

    /**
     * Return the scroll id of current scroll search
     *
     * @link http://php.net/manual/en/iterator.key.php
     * @return null|string
     */
    public function key() {
        return $this->_lastScrollId;
    }

    /**
     * Returns true if a follow-up scroll id is known and the current result set contains at least one hit
     *
     * @link http://php.net/manual/en/iterator.valid.php
     * @return boolean
     */
    public function valid() {
        return
            $this->_nextScrollId !== null
            && $this->_currentResultSet !== null
            && $this->_currentResultSet->count() > 0;
    }

    /**
     * Start the initial scan search and fetch the first result set
     *
     * @link http://php.net/manual/en/iterator.rewind.php
     * @throws \Elastica\Exception\InvalidException
     * @return void
     */
    public function rewind() {
        // Forget the previous iteration first: if one of the requests below throws,
        // valid()/current() must not keep reporting the old result set.
        $this->_nextScrollId = null;
        $this->_lastScrollId = null;
        $this->_currentResultSet = null;

        $this->_search->getQuery()->setSize($this->sizePerShard);

        $this->_search->setOption(Search::OPTION_SEARCH_TYPE, Search::OPTION_SEARCH_TYPE_SCAN);
        $this->_search->setOption(Search::OPTION_SCROLL, $this->expiryTime);

        // initial search, only used to obtain the first scroll id
        $this->_setScrollId($this->_search->search());

        // trigger first scroll request
        $this->_scroll();
    }

    /**
     * Perform next scroll search
     *
     * Ends the iteration without sending a request when no scroll id is available.
     *
     * @throws \Elastica\Exception\InvalidException
     * @return void
     */
    protected function _scroll() {
        if ($this->_nextScrollId === null) {
            // Nothing to continue from: make valid() return false instead of
            // sending a scroll request without a scroll id.
            $this->_currentResultSet = null;

            return;
        }

        $this->_search->setOption(Search::OPTION_SEARCH_TYPE, Search::OPTION_SEARCH_TYPE_SCROLL);
        $this->_search->setOption(Search::OPTION_SCROLL_ID, $this->_nextScrollId);

        $resultSet = $this->_search->search();
        $this->_currentResultSet = $resultSet;
        $this->_setScrollId($resultSet);
    }

    /**
     * Save last scroll id and extract the new one if possible
     *
     * The next scroll id stays null when the response is not ok.
     *
     * @param ResultSet $resultSet
     */
    protected function _setScrollId(ResultSet $resultSet) {
        $this->_lastScrollId = $this->_nextScrollId;

        $this->_nextScrollId = null;
        if ($resultSet->getResponse()->isOk()) {
            $this->_nextScrollId = $resultSet->getResponse()->getScrollId();
        }
    }

}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,114 @@
1+
<?php
2+
3+
namespace Elastica\Test;
4+
5+
use Elastica\Document;
6+
use Elastica\Query;
7+
use Elastica\ResultSet;
8+
use Elastica\ScanAndScroll;
9+
use Elastica\Search;
10+
use Elastica\Test\Base as BaseTest;
11+
12+
class ScanAndScrollTest extends BaseTest {

    /**
     * The iterator can be instantiated from a plain search.
     */
    public function testConstruct() {
        $iterator = $this->_prepareScanAndScroll();

        $this->assertInstanceOf('Elastica\ScanAndScroll', $iterator);
    }

    /**
     * A fresh iterator exposes the documented default expiry time and size.
     */
    public function testDefaultProperties() {
        $iterator = $this->_prepareScanAndScroll();

        $this->assertEquals('1m', $iterator->expiryTime);
        $this->assertEquals(1000, $iterator->sizePerShard);
    }

    /**
     * rewind() replaces a size already set on the query with sizePerShard.
     */
    public function testQuerySizeOverride() {
        $index = $this->_createIndex('test_1');
        $type = $index->getType('scanAndScrollTest');

        $query = new Query();
        $query->setSize(100);

        $search = new Search($this->_getClient());
        $search->addIndex($index)->addType($type);
        $search->setQuery($query);

        $iterator = new ScanAndScroll($search);
        $iterator->sizePerShard = 10;
        $iterator->rewind();

        $this->assertEquals(10, $query->getParam('size'));
    }

    /**
     * With 2 shards and sizePerShard = 5 the first result set holds 10 hits.
     */
    public function testSizePerShard() {
        $iterator = new ScanAndScroll($this->_prepareSearch('test_2', 2, 20));
        $iterator->sizePerShard = 5;
        $iterator->rewind();

        $firstPage = $iterator->current();
        $this->assertEquals(10, $firstPage->count());
    }

    /**
     * After rewind() the key matches the scroll id of the current response.
     */
    public function testScrollId() {
        $iterator = new ScanAndScroll($this->_prepareSearch('test_3', 1, 2));
        $iterator->sizePerShard = 1;

        $iterator->rewind();
        $this->assertEquals(
            $iterator->current()->getResponse()->getScrollId(),
            $iterator->key()
        );
    }

    /**
     * Iterating 11 documents over 2 shards with sizePerShard = 5 yields two result sets.
     */
    public function testForeach() {
        $iterator = new ScanAndScroll($this->_prepareSearch('test_4', 2, 11));
        $iterator->sizePerShard = 5;

        // Expected scrolls:
        //   1st: 10 hits
        //   2nd: 1 hit
        // A 3rd scroll returns 0 hits and is therefore not yielded by the loop.

        $scrolls = 0;
        foreach ($iterator as $page) {
            /** @var ResultSet $page */
            $scrolls++;

            if ($scrolls == 1) {
                $this->assertEquals(10, $page->count());
            } elseif ($scrolls == 2) {
                $this->assertEquals(1, $page->count());
            }
        }

        $this->assertEquals(2, $scrolls);
    }

    /**
     * Build an iterator around a search that has no index or type attached.
     *
     * @return ScanAndScroll
     */
    private function _prepareScanAndScroll() {
        $search = new Search($this->_getClient());

        return new ScanAndScroll($search);
    }

    /**
     * Create an index, fill it with documents and return a search bound to it.
     *
     * @param string $indexName   name of the index to create
     * @param int    $indexShards shard count passed to _createIndex()
     * @param int    $docs        number of documents to insert (ids 1..$docs)
     * @return Search
     */
    private function _prepareSearch($indexName, $indexShards, $docs) {
        $index = $this->_createIndex($indexName, true, $indexShards);
        $type = $index->getType('scanAndScrollTest');

        $documents = array();
        for ($id = 1; $id <= $docs; ++$id) {
            $documents[] = new Document($id, array('id' => $id, 'key' => 'value'));
        }

        $type->addDocuments($documents);
        // refresh before the search is used
        $index->refresh();

        $search = new Search($this->_getClient());
        $search->addIndex($index)->addType($type);

        return $search;
    }
}

0 commit comments

Comments
 (0)