Skip to content

Commit 2d9594d

Browse files
author
spalger
committed
resort every segment
When using segmented fetch we currently assume that documents are stored in index patterns based on time, but this is no long necessary with the use of the field stats API. Since documents can be in any order we need to fetch documents from every index and then sort them client side. Since fetching the documents can take some time, we are using the bounds of the time field in an index to try and identify indices which can't produce hits for a result set, and then not fetch their documents. It works like this: - get the list of indices and the min/max of their timefield - fetch the entire sample size from each index until we have enough documents to satisfy the sample. - for each remaining index pattern - if the min/max of it's timefield overlaps the min/max of the fetched documents also fetch the documents for that index pattern - otherwise fetch with size=0 Fixes #5642
1 parent ba15f25 commit 2d9594d

File tree

7 files changed

+236
-39
lines changed

7 files changed

+236
-39
lines changed

src/fixtures/stubbed_logstash_index_pattern.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
define(function (require) {
22
return function stubbedLogstashIndexPatternService(Private) {
3-
var StubIndexPattern = Private(require('testUtils/stubIndexPattern'));
3+
var StubIndexPattern = Private(require('testUtils/stub_index_pattern'));
44
var fieldTypes = Private(require('ui/index_patterns/_field_types'));
55
var mockLogstashFields = Private(require('fixtures/logstash_fields'));
66

src/fixtures/stubbed_search_source.js

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,9 @@ define(function (require) {
22
var sinon = require('auto-release-sinon');
33
var searchResponse = require('fixtures/search_response');
44

5-
return function stubSearchSource(Private, $q) {
5+
return function stubSearchSource(Private, $q, Promise) {
66
var deferedResult = $q.defer();
7+
var indexPattern = Private(require('fixtures/stubbed_logstash_index_pattern'));
78

89
return {
910
sort: sinon.spy(),
@@ -13,7 +14,7 @@ define(function (require) {
1314
get: function (param) {
1415
switch (param) {
1516
case 'index':
16-
return Private(require('fixtures/stubbed_logstash_index_pattern'));
17+
return indexPattern;
1718
default:
1819
throw new Error('Param "' + param + '" is not implemented in the stubbed search source');
1920
}
@@ -29,7 +30,9 @@ define(function (require) {
2930
return deferedResult.promise;
3031
},
3132
onError: function () { return $q.defer().promise; },
32-
33+
_flatten() {
34+
return Promise.resolve({ index: indexPattern, body: {} });
35+
}
3336
};
3437

3538
};

src/plugins/kibana/public/discover/controllers/discover.js

Lines changed: 15 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -339,13 +339,14 @@ define(function (require) {
339339
}());
340340

341341
var sortFn = null;
342-
if (sortBy === 'non-time') {
342+
if (sortBy !== 'implicit') {
343343
sortFn = new HitSortFn(sort[1]);
344344
}
345345

346346
$scope.updateTime();
347347
if (sort[0] === '_score') segmented.setMaxSegments(1);
348348
segmented.setDirection(sortBy === 'time' ? (sort[1] || 'desc') : 'desc');
349+
segmented.setSortFn(sortFn);
349350
segmented.setSize(sortBy === 'time' ? $scope.opts.sampleSize : false);
350351

351352
// triggered when the status updated
@@ -364,30 +365,26 @@ define(function (require) {
364365
return failure.index + failure.shard + failure.reason;
365366
});
366367
}
368+
}));
367369

368-
var rows = $scope.rows;
369-
var indexPattern = $scope.searchSource.get('index');
370+
segmented.on('mergedSegment', function (merged) {
371+
$scope.mergedEsResp = merged;
372+
$scope.hits = merged.hits.total;
370373

371-
// merge the rows and the hits, use a new array to help watchers
372-
rows = $scope.rows = rows.concat(resp.hits.hits);
374+
var indexPattern = $scope.searchSource.get('index');
373375

374-
if (sortFn) {
375-
notify.event('resort rows', function () {
376-
rows.sort(sortFn);
377-
rows = $scope.rows = rows.slice(0, totalSize);
378-
$scope.fieldCounts = {};
379-
});
380-
}
376+
// the merge rows, use a new array to help watchers
377+
$scope.rows = merged.hits.hits.slice();
381378

382379
notify.event('flatten hit and count fields', function () {
383-
var counts = $scope.fieldCounts;
380+
var counts = $scope.fieldCounts = (sortFn ? {} : $scope.fieldCounts) || {};
384381
$scope.rows.forEach(function (hit) {
385-
// skip this work if we have already done it and we are NOT sorting.
386-
// ---
382+
// skip this work if we have already done it
383+
if (hit.$$_counted) return;
384+
387385
// when we are sorting results, we need to redo the counts each time because the
388-
// "top 500" may change with each response
389-
if (hit.$$_counted && !sortFn) return;
390-
hit.$$_counted = true;
386+
// "top 500" may change with each response, so don't mark this as counted
387+
if (!sortFn) hit.$$_counted = true;
391388

392389
var fields = _.keys(indexPattern.flattenHit(hit));
393390
var n = fields.length;
@@ -398,13 +395,6 @@ define(function (require) {
398395
}
399396
});
400397
});
401-
402-
}));
403-
404-
segmented.on('mergedSegment', function (merged) {
405-
$scope.mergedEsResp = merged;
406-
$scope.hits = merged.hits.total;
407-
408398
});
409399

410400
segmented.on('complete', function () {

src/testUtils/stubIndexPattern.js renamed to src/testUtils/stub_index_pattern.js

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,13 @@ define(function (require) {
2323
this.routes = IndexPattern.prototype.routes;
2424

2525
this.toIndexList = _.constant(Promise.resolve([pattern]));
26+
this.toDetailedIndexList = _.constant(Promise.resolve([
27+
{
28+
index: pattern,
29+
min: 0,
30+
max: 1
31+
}
32+
]));
2633
this.getComputedFields = _.bind(getComputedFields, this);
2734
this.flattenHit = flattenHit(this);
2835
this.formatHit = formatHit(this, fieldFormats.getDefaultInstance('string'));
Lines changed: 129 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,129 @@
1+
import ngMock from 'ngMock';
2+
import expect from 'expect.js';
3+
import { times } from 'lodash';
4+
import sinon from 'auto-release-sinon';
5+
6+
import HitSortFnProv from 'plugins/kibana/discover/_hit_sort_fn';
7+
import NoDigestPromises from 'testUtils/noDigestPromises';
8+
9+
describe('Segmented Request Index Selection', function () {
10+
let Promise;
11+
let $rootScope;
12+
let SegmentedReq;
13+
let MockSource;
14+
let HitSortFn;
15+
16+
NoDigestPromises.activateForSuite();
17+
18+
beforeEach(ngMock.module('kibana'));
19+
beforeEach(ngMock.inject((Private, $injector) => {
20+
Promise = $injector.get('Promise');
21+
HitSortFn = Private(HitSortFnProv);
22+
$rootScope = $injector.get('$rootScope');
23+
SegmentedReq = Private(require('ui/courier/fetch/request/segmented'));
24+
25+
const StubbedSearchSourceProvider = require('fixtures/stubbed_search_source');
26+
MockSource = class {
27+
constructor() {
28+
return $injector.invoke(StubbedSearchSourceProvider);
29+
}
30+
};
31+
}));
32+
33+
it('queries with size until all 500 docs returned', async function () {
34+
const search = new MockSource();
35+
const indexPattern = search.get('index');
36+
sinon.stub(indexPattern, 'toDetailedIndexList').returns(Promise.resolve([
37+
{ index: 'one', min: 0, max: 1 },
38+
{ index: 'two', min: 0, max: 1 },
39+
{ index: 'three', min: 0, max: 1 },
40+
{ index: 'four', min: 0, max: 1 },
41+
{ index: 'five', min: 0, max: 1 },
42+
]));
43+
44+
const req = new SegmentedReq(search);
45+
req._handle.setDirection('desc');
46+
req._handle.setSortFn(new HitSortFn('desc'));
47+
req._handle.setSize(500);
48+
await req.start();
49+
50+
// first 200
51+
expect((await req.getFetchParams()).body.size).to.be(500);
52+
await req.handleResponse({
53+
hits: { total: 1000, hits: times(200, (i) => ({ i })) }
54+
});
55+
56+
// total = 400
57+
expect((await req.getFetchParams()).body.size).to.be(500);
58+
await req.handleResponse({
59+
hits: { total: 1000, hits: times(200, (i) => ({ i })) }
60+
});
61+
62+
// total = 600
63+
expect((await req.getFetchParams()).body.size).to.be(500);
64+
await req.handleResponse({
65+
hits: { total: 1000, hits: times(200, (i) => ({ i })) }
66+
});
67+
68+
expect((await req.getFetchParams()).body.size).to.be(0);
69+
await req.handleResponse({
70+
hits: { total: 1000, hits: times(200, (i) => ({ i })) }
71+
});
72+
73+
expect((await req.getFetchParams()).body.size).to.be(0);
74+
await req.handleResponse({
75+
hits: { total: 1000, hits: times(200, (i) => ({ i })) }
76+
});
77+
});
78+
79+
it(`sets size 0 for indices that couldn't procude hits`, async function () {
80+
const search = new MockSource();
81+
const indexPattern = search.get('index');
82+
83+
// the segreq is looking for 10 documents, and we will give it ten docs with time:5 in the first response.
84+
// on the second index it should still request 10 documents because it could produce documents with time:5.
85+
// the next two indexes will get size 0, since they couldn't produce documents with the time:5
86+
// the final index will get size:10, because it too can produce docs with time:5
87+
sinon.stub(indexPattern, 'toDetailedIndexList').returns(Promise.resolve([
88+
{ index: 'one', min: 0, max: 10 },
89+
{ index: 'two', min: 0, max: 10 },
90+
{ index: 'three', min: 12, max: 20 },
91+
{ index: 'four', min: 15, max: 20 },
92+
{ index: 'five', min: 5, max: 50 },
93+
]));
94+
95+
const req = new SegmentedReq(search);
96+
req._handle.setDirection('desc');
97+
req._handle.setSortFn(new HitSortFn('desc'));
98+
req._handle.setSize(10);
99+
await req.start();
100+
101+
// first 10
102+
expect((await req.getFetchParams()).body.size).to.be(10);
103+
await req.handleResponse({
104+
hits: { total: 1000, hits: times(10, () => ({ _source: { time: 5 } })) }
105+
});
106+
107+
// total = 400
108+
expect((await req.getFetchParams()).body.size).to.be(10);
109+
await req.handleResponse({
110+
hits: { total: 1000, hits: times(10, () => ({ _source: { time: 5 } })) }
111+
});
112+
113+
// total = 600
114+
expect((await req.getFetchParams()).body.size).to.be(0);
115+
await req.handleResponse({
116+
hits: { total: 1000, hits: [] }
117+
});
118+
119+
expect((await req.getFetchParams()).body.size).to.be(0);
120+
await req.handleResponse({
121+
hits: { total: 1000, hits: [] }
122+
});
123+
124+
expect((await req.getFetchParams()).body.size).to.be(10);
125+
await req.handleResponse({
126+
hits: { total: 1000, hits: times(10, () => ({ _source: { time: 5 } })) }
127+
});
128+
});
129+
});

src/ui/public/courier/fetch/request/_segmented_handle.js

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ define(function (require) {
2222
this.setDirection = _.bindKey(req, 'setDirection');
2323
this.setSize = _.bindKey(req, 'setSize');
2424
this.setMaxSegments = _.bindKey(req, 'setMaxSegments');
25+
this.setSortFn = _.bindKey(req, 'setSortFn');
2526
}
2627

2728
return SegmentedHandle;

0 commit comments

Comments
 (0)