diff --git a/src/ui/public/courier/__tests__/requestQueue.js b/src/ui/public/courier/__tests__/requestQueue.js index 723f0f016ccd4..29ebfb11a9ce3 100644 --- a/src/ui/public/courier/__tests__/requestQueue.js +++ b/src/ui/public/courier/__tests__/requestQueue.js @@ -3,19 +3,19 @@ import expect from 'expect.js'; import sinon from 'auto-release-sinon'; import RequestQueueProv from '../_request_queue'; -import SearchStrategyProv from '../fetch/strategy/search'; -import DocStrategyProv from '../fetch/strategy/doc'; +import EsDocStrategyProv from '../fetch_types/es_doc_strategy'; +import EsSearchStrategyProv from '../fetch_types/es_search_strategy'; describe('Courier Request Queue', function () { - let docStrategy; let requestQueue; - let searchStrategy; + let esDocStrategy; + let esSearchStrategy; beforeEach(ngMock.module('kibana')); beforeEach(ngMock.inject(function (Private) { - docStrategy = Private(DocStrategyProv); requestQueue = Private(RequestQueueProv); - searchStrategy = Private(SearchStrategyProv); + esDocStrategy = Private(EsDocStrategyProv); + esSearchStrategy = Private(EsSearchStrategyProv); })); class MockReq { @@ -29,20 +29,20 @@ describe('Courier Request Queue', function () { describe('#getStartable(strategy)', function () { it('only returns requests that match one of the passed strategies', function () { requestQueue.push( - new MockReq(docStrategy), - new MockReq(searchStrategy), - new MockReq(searchStrategy), - new MockReq(searchStrategy) + new MockReq(esDocStrategy), + new MockReq(esSearchStrategy), + new MockReq(esSearchStrategy), + new MockReq(esSearchStrategy) ); - expect(requestQueue.getStartable(docStrategy)).to.have.length(1); - expect(requestQueue.getStartable(searchStrategy)).to.have.length(3); + expect(requestQueue.getStartable(esDocStrategy)).to.have.length(1); + expect(requestQueue.getStartable(esSearchStrategy)).to.have.length(3); }); it('returns all requests when no strategy passed', function () { requestQueue.push( - new MockReq(docStrategy), - new MockReq(searchStrategy) + new MockReq(esDocStrategy), + new MockReq(esSearchStrategy) ); expect(requestQueue.getStartable()).to.have.length(2); @@ -50,8 +50,8 @@ describe('Courier Request Queue', function () { it('returns only startable requests', function () { requestQueue.push( - new MockReq(docStrategy, true), - new MockReq(searchStrategy, false) + new MockReq(esDocStrategy, true), + new MockReq(esSearchStrategy, false) ); expect(requestQueue.getStartable()).to.have.length(1); @@ -61,20 +61,20 @@ describe('Courier Request Queue', function () { describe('#get(strategy)', function () { it('only returns requests that match one of the passed strategies', function () { requestQueue.push( - new MockReq(docStrategy), - new MockReq(searchStrategy), - new MockReq(searchStrategy), - new MockReq(searchStrategy) + new MockReq(esDocStrategy), + new MockReq(esSearchStrategy), + new MockReq(esSearchStrategy), + new MockReq(esSearchStrategy) ); - expect(requestQueue.get(docStrategy)).to.have.length(1); - expect(requestQueue.get(searchStrategy)).to.have.length(3); + expect(requestQueue.get(esDocStrategy)).to.have.length(1); + expect(requestQueue.get(esSearchStrategy)).to.have.length(3); }); it('returns all requests when no strategy passed', function () { requestQueue.push( - new MockReq(docStrategy), - new MockReq(searchStrategy) + new MockReq(esDocStrategy), + new MockReq(esSearchStrategy) ); expect(requestQueue.get()).to.have.length(2); @@ -82,8 +82,8 @@ describe('Courier Request Queue', function () { it('returns startable and not-startable requests', function () { requestQueue.push( - new MockReq(docStrategy, true), - new MockReq(searchStrategy, false) + new MockReq(esDocStrategy, true), + new MockReq(esSearchStrategy, false) ); expect(requestQueue.get()).to.have.length(2); diff --git a/src/ui/public/courier/courier.js b/src/ui/public/courier/courier.js index 486d5bcb20e88..e8e5e34a5b4e5 100644 --- a/src/ui/public/courier/courier.js +++ b/src/ui/public/courier/courier.js @@ -10,7 +10,7 @@ import Notifier from 'ui/notify/notifier'; import DocSourceProvider from './data_source/doc_source'; import SearchSourceProvider from './data_source/search_source'; -import SearchStrategyProvider from './fetch/strategy/search'; +import EsSearchStrategyProvider from './fetch_types/es_search_strategy'; import RequestQueueProvider from './_request_queue'; import ErrorHandlersProvider from './_error_handlers'; import FetchProvider from './fetch'; @@ -28,7 +28,7 @@ uiModules.get('kibana/courier') var DocSource = Private(DocSourceProvider); var SearchSource = Private(SearchSourceProvider); - var searchStrategy = Private(SearchStrategyProvider); + var esSearchStrategy = Private(EsSearchStrategyProvider); var requestQueue = Private(RequestQueueProvider); var errorHandlers = Private(ErrorHandlersProvider); @@ -75,7 +75,7 @@ uiModules.get('kibana/courier') * individual errors are routed to their respective requests. */ self.fetch = function () { - fetch.fetchQueued(searchStrategy).then(function () { + fetch.fetchQueued(esSearchStrategy).then(function () { searchLooper.restart(); }); }; diff --git a/src/ui/public/courier/data_source/doc_source.js b/src/ui/public/courier/data_source/doc_source.js index c2725309bfb7e..c4f39cae01dea 100644 --- a/src/ui/public/courier/data_source/doc_source.js +++ b/src/ui/public/courier/data_source/doc_source.js @@ -5,18 +5,18 @@ import 'ui/storage'; import DocSendToEsProvider from './_doc_send_to_es'; import AbstractDataSourceProvider from './_abstract'; -import DocRequestProvider from '../fetch/request/doc'; -import DocStrategyProvider from '../fetch/strategy/doc'; +import EsDocRequestProvider from '../fetch_types/es_doc_request'; +import EsDocStrategyProvider from '../fetch_types/es_doc_strategy'; export default function DocSourceFactory(Private, Promise, es, sessionStorage) { var sendToEs = Private(DocSendToEsProvider); var SourceAbstract = Private(AbstractDataSourceProvider); - var DocRequest = Private(DocRequestProvider); - var docStrategy = Private(DocStrategyProvider); + var EsDocRequest = Private(EsDocRequestProvider); + var esDocStrategy = Private(EsDocStrategyProvider); _.class(DocSource).inherits(SourceAbstract); function DocSource(initialState) { - DocSource.Super.call(this, initialState, docStrategy); + DocSource.Super.call(this, initialState, esDocStrategy); } DocSource.prototype.onUpdate = SourceAbstract.prototype.onResults; @@ -27,7 +27,7 @@ export default function DocSourceFactory(Private, Promise, es, sessionStorage) { *****/ DocSource.prototype._createRequest = function (defer) { - return new DocRequest(this, defer); + return new EsDocRequest(this, defer); }; /** diff --git a/src/ui/public/courier/data_source/search_source.js b/src/ui/public/courier/data_source/search_source.js index 0cd2a14d749be..72a459ceac9e0 100644 --- a/src/ui/public/courier/data_source/search_source.js +++ b/src/ui/public/courier/data_source/search_source.js @@ -1,22 +1,22 @@ import _ from 'lodash'; -import NormalizeSortRequestProvider from './_normalize_sort_request'; import rootSearchSource from './_root_search_source'; import AbstractDataSourceProvider from './_abstract'; -import SearchRequestProvider from '../fetch/request/search'; -import SegmentedRequestProvider from '../fetch/request/segmented'; -import SearchStrategyProvider from '../fetch/strategy/search'; +import EsSearchRequestProvider from '../fetch_types/es_search_request'; +import EsSegmentedRequestProvider from '../fetch_types/es_segmented_request'; +import EsSearchStrategyProvider from '../fetch_types/es_search_strategy'; +import NormalizeSortRequestProvider from './_normalize_sort_request'; export default function SearchSourceFactory(Promise, Private) { var SourceAbstract = Private(AbstractDataSourceProvider); - var SearchRequest = Private(SearchRequestProvider); - var SegmentedRequest = Private(SegmentedRequestProvider); - var searchStrategy = Private(SearchStrategyProvider); + var SearchRequest = Private(EsSearchRequestProvider); + var SegmentedRequest = Private(EsSegmentedRequestProvider); + var esSearchStrategy = Private(EsSearchStrategyProvider); var normalizeSortRequest = Private(NormalizeSortRequestProvider); _.class(SearchSource).inherits(SourceAbstract); function SearchSource(initialState) { - SearchSource.Super.call(this, initialState, searchStrategy); + SearchSource.Super.call(this, initialState, esSearchStrategy); } /***** diff --git a/src/ui/public/courier/fetch/__tests__/fetch_these.js b/src/ui/public/courier/fetch/__tests__/fetch_these.js index 456a9ec0e9ff1..6ce97ae0631da 100644 --- a/src/ui/public/courier/fetch/__tests__/fetch_these.js +++ b/src/ui/public/courier/fetch/__tests__/fetch_these.js @@ -3,9 +3,11 @@ import sinon from 'auto-release-sinon'; import expect from 'expect.js'; import ngMock from 'ngMock'; -import FetchTheseProvider from '../fetch/fetch_these'; +import FetchTheseProvider from '../fetch_these'; +import ContinueIncompleteProvider from '../continue_incomplete'; +import CallResponseHandlersProvider from '../call_response_handlers'; -describe('ui/courier/fetch/_fetch_these', () => { +describe('ui/courier - fetch_these', () => { let Promise; let $rootScope; @@ -24,9 +26,8 @@ describe('ui/courier/fetch/_fetch_these', () => { return fakeResponses; } - PrivateProvider.swap(require('ui/courier/fetch/_call_client'), FakeResponsesProvider); - PrivateProvider.swap(require('ui/courier/fetch/_call_response_handlers'), FakeResponsesProvider); - PrivateProvider.swap(require('ui/courier/fetch/_continue_incomplete'), FakeResponsesProvider); + PrivateProvider.swap(CallResponseHandlersProvider, FakeResponsesProvider); + PrivateProvider.swap(ContinueIncompleteProvider, FakeResponsesProvider); })); beforeEach(ngMock.inject((Private, $injector) => { @@ -53,7 +54,7 @@ describe('ui/courier/fetch/_fetch_these', () => { expect(request.start.callCount).to.be(1); expect(fakeResponses.callCount).to.be(0); $rootScope.$apply(); - expect(fakeResponses.callCount).to.be(3); + expect(fakeResponses.callCount).to.be(2); }); it('invokes request failure handler if starting fails', () => { @@ -77,7 +78,7 @@ describe('ui/courier/fetch/_fetch_these', () => { expect(request.continue.callCount).to.be(1); expect(fakeResponses.callCount).to.be(0); $rootScope.$apply(); - expect(fakeResponses.callCount).to.be(3); + expect(fakeResponses.callCount).to.be(2); }); it('invokes request failure handler if continuing fails', () => { request.continue = sinon.stub().returns(Promise.reject('some error')); @@ -89,7 +90,10 @@ describe('ui/courier/fetch/_fetch_these', () => { function mockRequest() { return { - strategy: 'mock', + strategy: { + type: 'mock', + execute() {} + }, started: true, aborted: false, handleFailure: sinon.spy(), diff --git a/src/ui/public/courier/fetch/call_client.js b/src/ui/public/courier/fetch/call_client.js deleted file mode 100644 index 2157e4420d958..0000000000000 --- a/src/ui/public/courier/fetch/call_client.js +++ /dev/null @@ -1,125 +0,0 @@ -import _ from 'lodash'; - -import IsRequestProvider from './is_request'; -import MergeDuplicatesRequestProvider from './merge_duplicate_requests'; -import ReqStatusProvider from './req_status'; - -export default function CourierFetchCallClient(Private, Promise, es, esShardTimeout, sessionId) { - - const isRequest = Private(IsRequestProvider); - const mergeDuplicateRequests = Private(MergeDuplicatesRequestProvider); - - const ABORTED = Private(ReqStatusProvider).ABORTED; - const DUPLICATE = Private(ReqStatusProvider).DUPLICATE; - - function callClient(strategy, requests) { - // merging docs can change status to DUPLICATE, capture new statuses - const statuses = mergeDuplicateRequests(requests); - - // get the actual list of requests that we will be fetching - const executable = statuses.filter(isRequest); - let execCount = executable.length; - - // resolved by respond() - let esPromise; - const defer = Promise.defer(); - - // for each respond with either the response or ABORTED - const respond = function (responses) { - responses = responses || []; - return Promise.map(requests, function (req, i) { - switch (statuses[i]) { - case ABORTED: - return ABORTED; - case DUPLICATE: - return req._uniq.resp; - default: - return responses[_.findIndex(executable, req)]; - } - }) - .then( - (res) => defer.resolve(res), - (err) => defer.reject(err) - ); - }; - - - // handle a request being aborted while being fetched - const requestWasAborted = Promise.method(function (req, i) { - if (statuses[i] === ABORTED) { - defer.reject(new Error('Request was aborted twice?')); - } - - execCount -= 1; - if (execCount > 0) { - // the multi-request still contains other requests - return; - } - - if (esPromise && _.isFunction(esPromise.abort)) { - esPromise.abort(); - } - - esPromise = ABORTED; - - return respond(); - }); - - - // attach abort handlers, close over request index - statuses.forEach(function (req, i) { - if (!isRequest(req)) return; - req.whenAborted(function () { - requestWasAborted(req, i).catch(defer.reject); - }); - }); - - - // Now that all of THAT^^^ is out of the way, lets actually - // call out to elasticsearch - Promise.map(executable, function (req) { - return Promise.try(req.getFetchParams, void 0, req) - .then(function (fetchParams) { - return (req.fetchParams = fetchParams); - }); - }) - .then(function (reqsFetchParams) { - return strategy.reqsFetchParamsToBody(reqsFetchParams); - }) - .then(function (body) { - // while the strategy was converting, our request was aborted - if (esPromise === ABORTED) { - throw ABORTED; - } - - return (esPromise = es[strategy.clientMethod]({ - timeout: esShardTimeout, - ignore_unavailable: true, - preference: sessionId, - body: body - })); - }) - .then(function (clientResp) { - return strategy.getResponses(clientResp); - }) - .then(respond) - .catch(function (err) { - if (err === ABORTED) respond(); - else defer.reject(err); - }); - - // return our promise, but catch any errors we create and - // send them to the requests - return defer.promise - .catch(function (err) { - requests.forEach(function (req, i) { - if (statuses[i] !== ABORTED) { - req.handleFailure(err); - } - }); - }); - - } - - return callClient; -}; diff --git a/src/ui/public/courier/fetch/call_response_handlers.js b/src/ui/public/courier/fetch/call_response_handlers.js index 41b4c2bc20205..c9723c9ad0dfc 100644 --- a/src/ui/public/courier/fetch/call_response_handlers.js +++ b/src/ui/public/courier/fetch/call_response_handlers.js @@ -1,4 +1,4 @@ -import { RequestFailure, SearchTimeout, ShardFailure } from 'ui/errors'; +import 'ui/promises'; import ReqStatusProvider from './req_status'; import NotifierProvider from './notifier'; @@ -15,15 +15,7 @@ export default function CourierFetchCallResponseHandlers(Private, Promise) { return ABORTED; } - let resp = responses[i]; - - if (resp.timed_out) { - notify.warning(new SearchTimeout()); - } - - if (resp._shards && resp._shards.failed) { - notify.warning(new ShardFailure(resp)); - } + const resp = responses[i]; function progress() { if (req.isIncomplete()) { @@ -34,20 +26,12 @@ export default function CourierFetchCallResponseHandlers(Private, Promise) { return resp; } - if (resp.error) { - if (req.filterError(resp)) { - return progress(); + return Promise.try(function () { + if (resp instanceof Error) { + return req.handleFailure(resp); } else { - return req.handleFailure(new RequestFailure(null, resp)); + return req.handleResponse(resp); } - } - - return Promise.try(function () { - return req.transformResponse(resp); - }) - .then(function () { - resp = arguments[0]; - return req.handleResponse(resp); }) .then(progress); }); diff --git a/src/ui/public/courier/fetch/fetch_these.js b/src/ui/public/courier/fetch/fetch_these.js index 78affdf29ac53..b5b72086593ca 100644 --- a/src/ui/public/courier/fetch/fetch_these.js +++ b/src/ui/public/courier/fetch/fetch_these.js @@ -1,6 +1,5 @@ import NotifierProvider from './notifier'; import ForEachStrategyProvider from './for_each_strategy'; -import CallClientProvider from './call_client'; import CallResponseHandlersProvider from './call_response_handlers'; import ContinueIncompleteProvider from './continue_incomplete'; import ReqStatusProvider from './req_status'; @@ -10,7 +9,6 @@ export default function FetchTheseProvider(Private, Promise) { const forEachStrategy = Private(ForEachStrategyProvider); // core tasks - const callClient = Private(CallClientProvider); const callResponseHandlers = Private(CallResponseHandlersProvider); const continueIncomplete = Private(ContinueIncompleteProvider); @@ -36,7 +34,7 @@ export default function FetchTheseProvider(Private, Promise) { return startRequests(requests) .then(function () { - return callClient(strategy, requests); + return strategy.execute(requests); }) .then(function (responses) { return callResponseHandlers(requests, responses); diff --git a/src/ui/public/courier/fetch/is_request.js b/src/ui/public/courier/fetch/is_request.js index f64ab1e1ec08d..2b93077d13fd8 100644 --- a/src/ui/public/courier/fetch/is_request.js +++ b/src/ui/public/courier/fetch/is_request.js @@ -1,4 +1,4 @@ -import AbstractRequestProvider from './request'; +import AbstractRequestProvider from '../fetch_types/abstract_request'; export default function IsRequestProvider(Private) { const AbstractRequest = Private(AbstractRequestProvider); diff --git a/src/ui/public/courier/fetch/merge_duplicate_requests.js b/src/ui/public/courier/fetch/merge_duplicate_requests.js deleted file mode 100644 index 5c698f422834b..0000000000000 --- a/src/ui/public/courier/fetch/merge_duplicate_requests.js +++ /dev/null @@ -1,29 +0,0 @@ -import IsRequestProvider from './is_request'; -import ReqStatusProvider from './req_status'; - -export default function FetchMergeDuplicateRequests(Private) { - const isRequest = Private(IsRequestProvider); - const DUPLICATE = Private(ReqStatusProvider).DUPLICATE; - - function mergeDuplicateRequests(requests) { - // dedupe requests - const index = {}; - return requests.map(function (req) { - if (!isRequest(req)) return req; - - const iid = req.source._instanceid; - if (!index[iid]) { - // this request is unique so far - index[iid] = req; - // keep the request - return req; - } - - // the source was requested at least twice - req._uniq = index[iid]; - return DUPLICATE; - }); - } - - return mergeDuplicateRequests; -}; diff --git a/src/ui/public/courier/fetch/request/error_handler.js b/src/ui/public/courier/fetch/request/error_handler.js deleted file mode 100644 index 91e6c0929df39..0000000000000 --- a/src/ui/public/courier/fetch/request/error_handler.js +++ /dev/null @@ -1,29 +0,0 @@ -import Notifier from 'ui/notify/notifier'; - -import ErrorHandlersProvider from '../../_error_handlers'; - -export default function RequestErrorHandlerFactory(Private) { - const errHandlers = Private(ErrorHandlersProvider); - - const notify = new Notifier({ - location: 'Courier Fetch Error' - }); - - function handleError(req, error) { - const myHandlers = []; - - errHandlers.splice(0).forEach(function (handler) { - (handler.source === req.source ? myHandlers : errHandlers).push(handler); - }); - - if (!myHandlers.length) { - notify.fatal(new Error(`unhandled courier request error: ${ notify.describeError(error) }`)); - } else { - myHandlers.forEach(function (handler) { - handler.defer.resolve(error); - }); - } - } - - return handleError; -}; diff --git a/src/ui/public/courier/fetch/request/request.js b/src/ui/public/courier/fetch/request/request.js deleted file mode 100644 index c360cf5dd94e9..0000000000000 --- a/src/ui/public/courier/fetch/request/request.js +++ /dev/null @@ -1,114 +0,0 @@ -import _ from 'lodash'; -import moment from 'moment'; - -import errors from 'ui/errors'; - -import RequestQueueProvider from '../../_request_queue'; -import ErrorHandlerRequestProvider from './error_handler'; - -export default function AbstractReqProvider(Private, Promise) { - const requestQueue = Private(RequestQueueProvider); - const requestErrorHandler = Private(ErrorHandlerRequestProvider); - - const onStop = Symbol('onStopMethod'); - const whenAbortedHandlers = Symbol('whenAbortedHandlers'); - - return class AbstractReq { - constructor(source, defer) { - this.source = source; - this.defer = defer || Promise.defer(); - this[whenAbortedHandlers] = []; - - requestQueue.push(this); - } - - canStart() { - return Boolean(!this.stopped && !this.source._fetchDisabled); - } - - start() { - if (this.started) { - throw new TypeError('Unable to start request because it has already started'); - } - - this.started = true; - this.moment = moment(); - - const source = this.source; - if (source.activeFetchCount) { - source.activeFetchCount += 1; - } else { - source.activeFetchCount = 1; - } - - source.history = [this]; - } - - getFetchParams() { - return this.source._flatten(); - } - - transformResponse(resp) { - return resp; - } - - filterError(resp) { - return false; - } - - handleResponse(resp) { - this.success = true; - this.resp = resp; - } - - handleFailure(error) { - this.success = false; - this.resp = error && error.resp; - this.retry(); - return requestErrorHandler(this, error); - } - - isIncomplete() { - return false; - } - - continue() { - throw new Error('Unable to continue ' + this.type + ' request'); - } - - retry() { - const clone = this.clone(); - this.abort(); - return clone; - } - - [onStop]() { - if (this.stopped) return; - - this.stopped = true; - this.source.activeFetchCount -= 1; - _.pull(requestQueue, this); - } - - abort() { - this[onStop](); - this.defer = null; - this.aborted = true; - _.callEach(this[whenAbortedHandlers]); - } - - whenAborted(cb) { - this[whenAbortedHandlers].push(cb); - } - - completefunction() { - this[onStop](); - this.ms = this.moment.diff() * -1; - this.defer.resolve(this.resp); - } - - clone() { - return new this.constructor(this.source, this.defer); - } - }; -}; diff --git a/src/ui/public/courier/fetch/request/search.js b/src/ui/public/courier/fetch/request/search.js deleted file mode 100644 index 3556a8aee9c42..0000000000000 --- a/src/ui/public/courier/fetch/request/search.js +++ /dev/null @@ -1,19 +0,0 @@ -import _ from 'lodash'; - -import SearchStrategyProvider from '../strategy/search'; -import AbstractRequestProvider from './request'; - -export default function SearchReqProvider(Private) { - - const searchStrategy = Private(SearchStrategyProvider); - const AbstractRequest = Private(AbstractRequestProvider); - - return class SearchReq extends AbstractRequest { - constructor(...args) { - super(...args); - - this.type = 'search'; - this.strategy = searchStrategy; - } - }; -}; diff --git a/src/ui/public/courier/fetch/__tests__/doc.js b/src/ui/public/courier/fetch_types/__tests__/doc.js similarity index 81% rename from src/ui/public/courier/fetch/__tests__/doc.js rename to src/ui/public/courier/fetch_types/__tests__/doc.js index 6ce8059c14ba1..dc8f53f7898db 100644 --- a/src/ui/public/courier/fetch/__tests__/doc.js +++ b/src/ui/public/courier/fetch_types/__tests__/doc.js @@ -2,10 +2,10 @@ import sinon from 'auto-release-sinon'; import expect from 'expect.js'; import ngMock from 'ngMock'; -import DocSourceProvider from '../../data_source/doc_source'; -import DocRequestProvider from '../request/doc'; +import EsDocSourceProvider from '../../data_source/doc_source'; +import EsDocRequestProvider from '../es_doc_request'; -describe('Courier DocFetchRequest class', function () { +describe('Courier EsDocFetchRequest class', function () { let storage; let source; let defer; @@ -15,8 +15,8 @@ describe('Courier DocFetchRequest class', function () { beforeEach(ngMock.module('kibana')); beforeEach(ngMock.inject(function (Private, Promise, $injector) { - const DocSource = Private(DocSourceProvider); - const DocFetchRequest = Private(DocRequestProvider); + const EsDocSource = Private(EsDocSourceProvider); + const EsDocFetchRequest = Private(EsDocRequestProvider); storage = $injector.get('localStorage').store = @@ -27,22 +27,22 @@ describe('Courier DocFetchRequest class', function () { clear: sinon.stub() }; - source = new DocSource({}) + source = new EsDocSource({}) .set('index', 'doc-index') .set('type', 'doc-type') .set('id', 'doc-id'); defer = Promise.defer(); - req = new DocFetchRequest(source, defer); + req = new EsDocFetchRequest(source, defer); /** * Setup the version numbers for tests. There are two versions for the * purposes of these tests. * - * @param {number} mine - the version that the DocSource most + * @param {number} mine - the version that the EsDocSource most * recently received from elasticsearch. - * @param {number} theirs - the version that other DocSources have + * @param {number} theirs - the version that other EsDocSources have * received from elasticsearfch. */ setVersion = function (mine, theirs) { diff --git a/src/ui/public/courier/fetch/strategy/__tests__/search.js b/src/ui/public/courier/fetch_types/__tests__/es_search_strategy.js similarity index 73% rename from src/ui/public/courier/fetch/strategy/__tests__/search.js rename to src/ui/public/courier/fetch_types/__tests__/es_search_strategy.js index 21d62a2bbb419..75714ddb962fe 100644 --- a/src/ui/public/courier/fetch/strategy/__tests__/search.js +++ b/src/ui/public/courier/fetch_types/__tests__/es_search_strategy.js @@ -3,21 +3,21 @@ import sinon from 'auto-release-sinon'; import expect from 'expect.js'; import ngMock from 'ngMock'; -import SearchStrategyProvider from '../search'; +import EsSearchStrategyProvider from '../es_search_strategy'; -describe('ui/courier/fetch/strategy/search', () => { +describe('ui/courier es_search_strategy', () => { let Promise; let $rootScope; - let search; let reqsFetchParams; + let esSearchStrategy; beforeEach(ngMock.module('kibana')); beforeEach(ngMock.inject((Private, $injector) => { Promise = $injector.get('Promise'); $rootScope = $injector.get('$rootScope'); - search = Private(SearchStrategyProvider); + esSearchStrategy = Private(EsSearchStrategyProvider); reqsFetchParams = [ { index: ['logstash-123'], @@ -30,14 +30,14 @@ describe('ui/courier/fetch/strategy/search', () => { describe('#clientMethod', () => { it('is msearch', () => { - expect(search.clientMethod).to.equal('msearch'); + expect(esSearchStrategy.clientMethod).to.equal('msearch'); }); }); describe('#reqsFetchParamsToBody()', () => { it('filters out any body properties that begin with $', () => { let value; - search.reqsFetchParamsToBody(reqsFetchParams).then(val => value = val); + esSearchStrategy.reqsFetchParamsToBody(reqsFetchParams).then(val => value = val); $rootScope.$apply(); expect(_.includes(value, 'foo')).to.be(true); expect(_.includes(value, '$foo')).to.be(false); @@ -46,7 +46,7 @@ describe('ui/courier/fetch/strategy/search', () => { context('when indexList is not empty', () => { it('includes the index', () => { let value; - search.reqsFetchParamsToBody(reqsFetchParams).then(val => value = val); + esSearchStrategy.reqsFetchParamsToBody(reqsFetchParams).then(val => value = val); $rootScope.$apply(); expect(_.includes(value, '"index":["logstash-123"]')).to.be(true); }); @@ -57,7 +57,7 @@ describe('ui/courier/fetch/strategy/search', () => { it('queries .kibana-devnull instead', () => { let value; - search.reqsFetchParamsToBody(reqsFetchParams).then(val => value = val); + esSearchStrategy.reqsFetchParamsToBody(reqsFetchParams).then(val => value = val); $rootScope.$apply(); expect(_.includes(value, '"index":[".kibana-devnull"]')).to.be(true); }); @@ -67,7 +67,7 @@ describe('ui/courier/fetch/strategy/search', () => { describe('#getResponses()', () => { it('returns the `responses` property of the given arg', () => { const responses = [{}]; - const returned = search.getResponses({ responses }); + const returned = esSearchStrategy.getResponses({ responses }); expect(returned).to.be(responses); }); }); diff --git a/src/ui/public/courier/fetch/request/__tests__/segmented.js b/src/ui/public/courier/fetch_types/__tests__/es_segmented_request.js similarity index 66% rename from src/ui/public/courier/fetch/request/__tests__/segmented.js rename to src/ui/public/courier/fetch_types/__tests__/es_segmented_request.js index cabfae475a296..748c78a3065fb 100644 --- a/src/ui/public/courier/fetch/request/__tests__/segmented.js +++ b/src/ui/public/courier/fetch_types/__tests__/es_segmented_request.js @@ -2,30 +2,30 @@ import sinon from 'auto-release-sinon'; import expect from 'expect.js'; import ngMock from 'ngMock'; -import SegmentedRequestProvider from '../segmented'; -import SearchRequestProvider from '../search'; +import EsSegmentedRequestProvider from '../es_segmented_request'; +import EsSearchRequestProvider from '../es_search_request'; describe('ui/courier/fetch/request/segmented', () => { let Promise; let $rootScope; - let SegmentedReq; - let segmentedReq; - let searchReqStart; + let EsSegmentedReq; + let esSegmentedReq; + let esSearchReqStart; beforeEach(ngMock.module('kibana')); beforeEach(ngMock.inject((Private, $injector) => { Promise = $injector.get('Promise'); $rootScope = $injector.get('$rootScope'); - SegmentedReq = Private(SegmentedRequestProvider); - searchReqStart = sinon.spy(Private(SearchRequestProvider).prototype, 'start'); + EsSegmentedReq = Private(EsSegmentedRequestProvider); + esSearchReqStart = sinon.spy(Private(EsSearchRequestProvider).prototype, 'start'); })); describe('#start()', () => { let returned; beforeEach(() => { init(); - returned = segmentedReq.start(); + returned = esSegmentedReq.start(); }); it('returns promise', () => { @@ -33,14 +33,14 @@ describe('ui/courier/fetch/request/segmented', () => { }); it('does not call super.start() until promise is resolved', () => { - expect(searchReqStart.called).to.be(false); + expect(esSearchReqStart.called).to.be(false); $rootScope.$apply(); - expect(searchReqStart.called).to.be(true); + expect(esSearchReqStart.called).to.be(true); }); }); function init() { - segmentedReq = new SegmentedReq(mockSource()); + esSegmentedReq = new EsSegmentedReq(mockSource()); } function mockSource() { diff --git a/src/ui/public/courier/fetch/request/__tests__/segmented_create_queue.js b/src/ui/public/courier/fetch_types/__tests__/segmented_create_queue.js similarity index 85% rename from src/ui/public/courier/fetch/request/__tests__/segmented_create_queue.js rename to src/ui/public/courier/fetch_types/__tests__/segmented_create_queue.js index e7a20f988090b..55004ddf9e391 100644 --- a/src/ui/public/courier/fetch/request/__tests__/segmented_create_queue.js +++ b/src/ui/public/courier/fetch_types/__tests__/segmented_create_queue.js @@ -4,14 +4,14 @@ import ngMock from 'ngMock'; import StubbedSearchSourceProvider from 'fixtures/stubbed_search_source'; -import SegmentedRequestProvider from '../segmented'; +import EsSegmentedRequestProvider from '../es_segmented_request'; describe('ui/courier/fetch/request/segmented/_createQueue', () => { let Promise; - let $rootScope; - let SegmentedReq; let MockSource; + let $rootScope; + let EsSegmentedReq; require('testUtils/noDigestPromises').activateForSuite(); @@ -19,7 +19,7 @@ describe('ui/courier/fetch/request/segmented/_createQueue', () => { beforeEach(ngMock.inject((Private, $injector) => { Promise = $injector.get('Promise'); $rootScope = $injector.get('$rootScope'); - SegmentedReq = Private(SegmentedRequestProvider); + EsSegmentedReq = Private(EsSegmentedRequestProvider); MockSource = class { constructor() { @@ -29,7 +29,7 @@ describe('ui/courier/fetch/request/segmented/_createQueue', () => { })); it('manages the req._queueCreated flag', async function () { - const req = new SegmentedReq(new MockSource()); + const req = new EsSegmentedReq(new MockSource()); req._queueCreated = null; const promise = req._createQueue(); @@ -44,7 +44,7 @@ describe('ui/courier/fetch/request/segmented/_createQueue', () => { const indices = [1,2,3]; sinon.stub(ip, 'toDetailedIndexList').returns(Promise.resolve(indices)); - const req = new SegmentedReq(source); + const req = new EsSegmentedReq(source); const output = await req._createQueue(); expect(output).to.equal(indices); }); @@ -52,7 +52,7 @@ describe('ui/courier/fetch/request/segmented/_createQueue', () => { it('tells the index pattern its direction', async function () { const source = new MockSource(); const ip = source.get('index'); - const req = new SegmentedReq(source); + const req = new EsSegmentedReq(source); sinon.stub(ip, 'toDetailedIndexList').returns(Promise.resolve([1,2,3])); req.setDirection('asc'); diff --git a/src/ui/public/courier/fetch/request/__tests__/segmented_index_selection.js b/src/ui/public/courier/fetch_types/__tests__/segmented_index_selection.js similarity index 94% rename from src/ui/public/courier/fetch/request/__tests__/segmented_index_selection.js rename to src/ui/public/courier/fetch_types/__tests__/segmented_index_selection.js index 89a2c1735a95c..c9810d3762bed 100644 --- a/src/ui/public/courier/fetch/request/__tests__/segmented_index_selection.js +++ b/src/ui/public/courier/fetch_types/__tests__/segmented_index_selection.js @@ -7,14 +7,14 @@ import HitSortFnProv from 'plugins/kibana/discover/_hit_sort_fn'; import NoDigestPromises from 'testUtils/noDigestPromises'; import StubbedSearchSourceProvider from 'fixtures/stubbed_search_source'; -import SegmentedRequestProvider from '../segmented'; +import EsSegmentedRequestProvider from '../es_segmented_request'; describe('Segmented Request Index Selection', function () { let Promise; + let HitSortFn; let $rootScope; - let SegmentedReq; let MockSource; - let HitSortFn; + let EsSegmentedReq; NoDigestPromises.activateForSuite(); @@ -23,7 +23,7 @@ describe('Segmented Request Index Selection', function () { Promise = $injector.get('Promise'); HitSortFn = Private(HitSortFnProv); $rootScope = $injector.get('$rootScope'); - SegmentedReq = Private(SegmentedRequestProvider); + EsSegmentedReq = Private(EsSegmentedRequestProvider); MockSource = class { constructor() { @@ -43,7 +43,7 @@ describe('Segmented Request Index Selection', function () { { index: 'five', min: 0, max: 1 }, ])); - const req = new SegmentedReq(search); + const req = new EsSegmentedReq(search); req._handle.setDirection('desc'); req._handle.setSortFn(new HitSortFn('desc')); req._handle.setSize(500); @@ -94,7 +94,7 @@ describe('Segmented Request Index Selection', function () { { index: 'five', min: 5, max: 50 }, ])); - const req = new SegmentedReq(search); + const req = new EsSegmentedReq(search); req._handle.setDirection('desc'); req._handle.setSortFn(new HitSortFn('desc')); req._handle.setSize(10); diff --git a/src/ui/public/courier/fetch/request/__tests__/segmented_size_picking.js b/src/ui/public/courier/fetch_types/__tests__/segmented_size_picking.js similarity index 85% rename from src/ui/public/courier/fetch/request/__tests__/segmented_size_picking.js rename to src/ui/public/courier/fetch_types/__tests__/segmented_size_picking.js index 3339b21d2e925..45e3982244c88 100644 --- a/src/ui/public/courier/fetch/request/__tests__/segmented_size_picking.js +++ b/src/ui/public/courier/fetch_types/__tests__/segmented_size_picking.js @@ -7,14 +7,14 @@ import HitSortFnProv from 'plugins/kibana/discover/_hit_sort_fn'; import NoDigestPromises from 'testUtils/noDigestPromises'; import StubbedSearchSourceProvider from 'fixtures/stubbed_search_source'; -import SegmentedRequestProvider from '../segmented'; +import EsSegmentedRequestProvider from '../es_segmented_request'; describe('Segmented Request Size Picking', function () { let Promise; + let HitSortFn; let $rootScope; - let SegmentedReq; let MockSource; - let HitSortFn; + let EsSegmentedReq; NoDigestPromises.activateForSuite(); @@ -23,7 +23,7 @@ describe('Segmented Request Size Picking', function () { Promise = $injector.get('Promise'); HitSortFn = Private(HitSortFnProv); $rootScope = $injector.get('$rootScope'); - SegmentedReq = Private(SegmentedRequestProvider); + EsSegmentedReq = Private(EsSegmentedRequestProvider); MockSource = class { constructor() { @@ -34,7 +34,7 @@ describe('Segmented Request Size Picking', function () { context('without a size', function () { it('does not set the request size', async function () { - const req = new SegmentedReq(new MockSource()); + const req = new EsSegmentedReq(new MockSource()); req._handle.setDirection('desc'); req._handle.setSortFn(new HitSortFn('desc')); await req.start(); @@ -45,7 +45,7 @@ describe('Segmented Request Size Picking', function () { context('with a size', function () { it('sets the request size to the entire desired size', async function () { - const req = new SegmentedReq(new MockSource()); + const req = new EsSegmentedReq(new MockSource()); req._handle.setDirection('desc'); req._handle.setSize(555); req._handle.setSortFn(new HitSortFn('desc')); diff --git a/src/ui/public/courier/fetch_types/abstract_request.js b/src/ui/public/courier/fetch_types/abstract_request.js new file mode 100644 index 0000000000000..c7011ba7d7179 --- /dev/null +++ b/src/ui/public/courier/fetch_types/abstract_request.js @@ -0,0 +1,163 @@ +import _ from 'lodash'; +import moment from 'moment'; + +import errors from 'ui/errors'; + +import RequestQueueProvider from '../_request_queue'; + +export default function AbstractRequestProvider(Private, Promise) { + const requestQueue = Private(RequestQueueProvider); + + const onStop = Symbol('onStopMethod'); + const whenAbortedHandlers = Symbol('whenAbortedHandlers'); + + return class AbstractRequest { + /** + * Initialize the request and add it to the couriers request + * queue so that the request will be considered for execution + * by the courier + * + * @constructor + */ + constructor() { + if (this.constructor === AbstractRequest) { + throw new Error('AbstractRequest should not be constructed directly'); + } + + this[whenAbortedHandlers] = []; + requestQueue.push(this); + } + + /** + * Determine if the request can be started + * @return {Boolean} + */ + canStart() { + return Boolean(!this.stopped); + } + + /** + * Mark the request as started + * + * @return {undefined} + */ + start() { + if (this.started) { + throw new TypeError('Unable to start request because it has already started'); + } + + this.started = true; + this.moment = moment(); + } + + /** + * Mark the request as successful + * + * @return {undefined} + */ + handleResponse(resp) { + this.success = true; + } + + /** + * Mark the request as failed + * + * @return {undefined} + */ + handleFailure(error) { + this.success = false; + } + + /** + * Check to see if this request could be continued + * + * @return {Boolean} + */ + isIncomplete() { + return false; + } + + /** + * Mark this request as continued, or in this case + * throw an error because requests are not continuable + * by default + * + * @return {undefined} + */ + continue() { + throw new Error('Unable to continue ' + this.type + ' request'); + } + + /** + * Retry this request by aborting the current request + * and reentering the request in the request queue + * + * @return {AbstractRequest} + */ + retry() { + const clone = this.clone(); + this.abort(); + return clone; + } + + /** + * Mark the request as stopped and remove it from + * the request queue. Used by both #abort() and + * #complete(). + * + * @private + * @return {undefined} + */ + [onStop]() { + if (this.stopped) return; + + this.stopped = true; + _.pull(requestQueue, this); + } + + /** + * Mark the request as aborted and remove it from the + * request queue. + * + * @return {undefined} + */ + abort() { + this[onStop](); + this.aborted = true; + _.callEach(this[whenAbortedHandlers]); + } + + /** + * Add a function that should be called when this + * request is aborted. + * + * @param {Function} cb - the function to call + * @return {undefined} + */ + whenAborted(cb) { + this[whenAbortedHandlers].push(cb); + } + + /** + * Mark this request as complete and remove it from + * the request queue. + * + * @return {undefined} + */ + complete() { + this[onStop](); + this.ms = this.moment.diff() * -1; + } + + /** + * Create a clone of this request. Used when retrying a + * request. Causes the new request to be added to the + * request queue. + * + * @return {AbstractRequest} - the clone + */ + clone() { + return new this.constructor(); + } + }; +}; diff --git a/src/ui/public/courier/fetch_types/abstract_strategy.js b/src/ui/public/courier/fetch_types/abstract_strategy.js new file mode 100644 index 0000000000000..097d55e25741b --- /dev/null +++ b/src/ui/public/courier/fetch_types/abstract_strategy.js @@ -0,0 +1,7 @@ +export default function AbstractStrategyProvider() { + return class AbstractStrategy { + exectute() { + throw new Error('Subclasses of AbstractStrategy must override the execute() method.'); + } + }; +} diff --git a/src/ui/public/courier/fetch_types/data_source_request.js b/src/ui/public/courier/fetch_types/data_source_request.js new file mode 100644 index 0000000000000..e4c93e51661a2 --- /dev/null +++ b/src/ui/public/courier/fetch_types/data_source_request.js @@ -0,0 +1,144 @@ +import notify from 'ui/notify'; + +import AbstractRequestProvider from './abstract_request'; +import ErrorHandlersProvider from '../_error_handlers'; + +export default function DataSourceRequestProvider(Private, Promise) { + const errorHandlers = Private(ErrorHandlersProvider); + const AbstractRequest = Private(AbstractRequestProvider); + + return class DataSourceRequest extends AbstractRequest { + + /** + * Overriden to track the source for this request + * + * @override + * @constructor + */ + constructor(source, defer) { + super(); + + this.defer = defer || Promise.defer(); + this.source = source; + } + + /** + * Overriden to consult wether the source has fetch disabled for + * some reason + * + * @override + * @return {Boolean} + */ + canStart() { + return Boolean(super.canStart() && !this.source._fetchDisabled); + } + + /** + * Read the data source to produce fetch params for the elastisearch client + * + * @return {Object} - request parameters for the es client + */ + getFetchParams() { + return this.source._flatten(); + } + + /** + * Overridden to track the active request count on the search source + * + * @override + * @return {undefined} + */ + start() { + super.start(); + + const source = this.source; + if (source.activeFetchCount) { + source.activeFetchCount += 1; + } else { + source.activeFetchCount = 1; + } + + source.history = [this]; + } + + /** + * Overridden to track the active request count on the + * search source and to dereference the defer + * + * @override + * @return {undefined} + */ + abort() { + if (this.stopped) return; + + super.abort(); + this.defer = null; + this.source.activeFetchCount -= 1; + } + + /** + * Overridden to track the response, which will + * later be sent to the defer + * + * @override + * @return {undefined} + */ + handleResponse(resp) { + super.handleResponse(); + this.resp = resp; + } + + /** + * Overridden to track the response from the error, which will + * later be sent to the defer + * + * @override + * @return {undefined} + */ + handleFailure(error) { + super.handleFailure(); + + this.resp = error && error.resp; + const ownHandlers = []; + const otherHandlers = []; + errorHandlers.forEach(handler => { + const list = handler.source === this.source ? ownHandlers : otherHandlers; + list.push(handler); + }); + + // remove all handlers from errorHandlers and replace + // them with just the list of other listeners + errorHandlers.splice(0, errorHandlers.length, otherHandlers); + + if (!ownHandlers.length) { + notify.fatal(new Error(`unhandled courier request error: ${notify.describeError(error) }`)); + } else { + ownHandlers.forEach(handler => handler.defer.resolve(error)); + } + + this.retry(); + } + + /** + * Overridden to send response to the defer + * + * @override + * @return {undefined} + */ + complete() { + super.complete(); + this.source.activeFetchCount -= 1; + this.defer.resolve(this.resp); + } + + /** + * Overridden to inject the source argument when cloned + * + * @override + * @return {undefined} + */ + clone() { + return new this.constructor(this.source, this.defer); + } + }; +}; diff --git a/src/ui/public/courier/fetch_types/es_abstract_strategy.js b/src/ui/public/courier/fetch_types/es_abstract_strategy.js new file mode 100644 index 0000000000000..043332c6eb7b2 --- /dev/null +++ b/src/ui/public/courier/fetch_types/es_abstract_strategy.js @@ -0,0 +1,207 @@ +import _ from 'lodash'; + +import { RequestFailure, SearchTimeout, ShardFailure } from 'ui/errors'; + +import IsRequestProvider from '../fetch/is_request'; +import ReqStatusProvider from '../fetch/req_status'; +import NotifierProvider from '../fetch/notifier'; +import AbstractStrategyProvider from './abstract_strategy'; + +export default function EsClientExecutorProvider(Private, Promise, es, esShardTimeout, sessionId) { + const notify = Private(NotifierProvider); + const isRequest = Private(IsRequestProvider); + const AbstractStrategy = Private(AbstractStrategyProvider); + + const { ABORTED, DUPLICATE } = Private(ReqStatusProvider); + + return class EsAbstractStrategy extends AbstractStrategy { + constructor() { + super(); + if (this.constructor === EsAbstractStrategy) { + throw new Error('EsAbstractStrategy should not be constructed directly'); + } + } + + /** + * Convert the list of requests into the body which should be sent to + * elasticsearch. + * + * @return {Any} + */ + reqsFetchParamsToBody() { + throw new Error('this method must be overriden by subclassses of EsAbstractStrategy'); + } + + /** + * Convert the response from elasticsearch into an array of ordered responses + * which matches the request order. + * + * @return {Array[Any]} + */ + getResponses() { + throw new Error('this method must be overriden by subclassses of EsAbstractStrategy'); + } + + /** + * Combine any requests that are the same, so that they are + * only executed in elasticsearch once. + * + * Duplicate values are converted into DUPLICATE status + * objects, and deduplcilated on response + * + * @private + * @param {Array[Requests]} requests + * @return {Array[Requests]} + */ + mergeDuplicateRequests(requests) { + // dedupe requests + const index = {}; + return requests.map(function (req) { + if (!isRequest(req)) return req; + + const iid = req.source._instanceid; + if (!index[iid]) { + // this request is unique so far + index[iid] = req; + // keep the request + return req; + } + + // the source was requested at least twice + req._uniq = index[iid]; + return DUPLICATE; + }); + } + + /** + * Called by the courier when requests with this strategy must + * be executed. + * + * @param {Array[Request]} requests + * @return {Promise} + */ + execute(requests) { + // merging docs can change status to DUPLICATE, capture new statuses + const statuses = this.mergeDuplicateRequests(requests); + + // get the actual list of requests that we will be fetching + const executable = statuses.filter(isRequest); + let execCount = executable.length; + + // resolved by respond() + let esPromise; + const defer = Promise.defer(); + + const checkForEsError = (req, resp) => { + if (resp.error && req.filterError && !req.filterError(resp)) { + return new RequestFailure(null, resp); + } + + if (resp.timed_out) { + notify.warning(new SearchTimeout()); + } + + if (resp._shards && resp._shards.failed) { + notify.warning(new ShardFailure(resp)); + } + + return resp; + }; + + // for each respond with either the response or ABORTED + const respond = (responses) => { + responses = responses || []; + return Promise.map(requests, (req, i) => { + switch (statuses[i]) { + case ABORTED: + return ABORTED; + case DUPLICATE: + return req._uniq.resp; + default: + return checkForEsError(req, responses[_.findIndex(executable, req)]); + } + }) + .then( + (res) => defer.resolve(res), + (err) => defer.reject(err) + ); + }; + + // handle a request being aborted while being fetched + const requestWasAborted = Promise.method((req, i) => { + if (statuses[i] === ABORTED) { + defer.reject(new Error('Request was aborted twice?')); + } + + execCount -= 1; + if (execCount > 0) { + // the multi-request still contains other requests + return; + } + + if (esPromise && _.isFunction(esPromise.abort)) { + esPromise.abort(); + } + + esPromise = ABORTED; + + return respond(); + }); + + + // attach abort handlers, close over request index + statuses.forEach((req, i) => { + if (!isRequest(req)) return; + req.whenAborted(() => { + requestWasAborted(req, i).catch(defer.reject); + }); + }); + + + // Now that all of THAT^^^ is out of the way, lets actually + // call out to elasticsearch + Promise.map(executable, (req) => { + return Promise.try(req.getFetchParams, void 0, req) + .then((fetchParams) => { + return (req.fetchParams = fetchParams); + }); + }) + .then((reqsFetchParams) => { + return this.reqsFetchParamsToBody(reqsFetchParams); + }) + .then((body) => { + // while the reqsFetchParamsToBody was converting, our request may have been aborted + if (esPromise === ABORTED) { + throw ABORTED; + } + + return (esPromise = es[this.clientMethod]({ + timeout: esShardTimeout, + ignore_unavailable: true, + preference: sessionId, + body: body + })); + }) + .then((clientResp) => { + return this.getResponses(clientResp); + }) + .then(respond) + .catch((err) => { + if (err === ABORTED) respond(); + else defer.reject(err); + }); + + // return our promise, but catch any errors we create and + // send them to the requests + return defer.promise + .catch((err) => { + requests.forEach((req, i) => { + if (statuses[i] !== ABORTED) { + req.handleFailure(err); + } + }); + }); + } + }; + +} diff --git a/src/ui/public/courier/fetch/request/doc.js b/src/ui/public/courier/fetch_types/es_doc_request.js similarity index 67% rename from src/ui/public/courier/fetch/request/doc.js rename to src/ui/public/courier/fetch_types/es_doc_request.js index ac41d141c8cdd..1655539c240f2 100644 --- a/src/ui/public/courier/fetch/request/doc.js +++ b/src/ui/public/courier/fetch_types/es_doc_request.js @@ -1,19 +1,19 @@ import _ from 'lodash'; -import DocStrategyProvider from '../strategy/doc'; -import AbstractRequestProvider from './request'; +import EsDocStrategyProvider from './es_doc_strategy'; +import DataSourceRequestProvider from './data_source_request'; export default function DocRequestProvider(Private) { - const docStrategy = Private(DocStrategyProvider); - const AbstractRequest = Private(AbstractRequestProvider); + const esDocStrategy = Private(EsDocStrategyProvider); + const DataSourceRequest = Private(DataSourceRequestProvider); - class DocRequest extends AbstractRequest { + return class EsDocRequest extends DataSourceRequest { constructor(...args) { super(...args); - this.type = 'doc'; - this.strategy = docStrategy; + this.type = 'es_doc'; + this.strategy = esDocStrategy; } canStart() { @@ -39,7 +39,5 @@ export default function DocRequestProvider(Private) { return super.handleResponse(resp); } - } - - return DocRequest; + }; }; diff --git a/src/ui/public/courier/fetch/strategy/doc.js b/src/ui/public/courier/fetch_types/es_doc_strategy.js similarity index 55% rename from src/ui/public/courier/fetch/strategy/doc.js rename to src/ui/public/courier/fetch_types/es_doc_strategy.js index 86a46af90ca8a..f4ba86e31d32c 100644 --- a/src/ui/public/courier/fetch/strategy/doc.js +++ b/src/ui/public/courier/fetch_types/es_doc_strategy.js @@ -1,24 +1,31 @@ -export default function FetchStrategyForDoc(Promise) { - return { - clientMethod: 'mget', +import EsAbstractStrategyProvider from './es_abstract_strategy'; + +export default function FetchStrategyForDoc(Private, Promise) { + const EsAbstractStrategy = Private(EsAbstractStrategyProvider); + + return new class EsDocStrategy extends EsAbstractStrategy { + constructor() { + super(); + this.clientMethod = 'mget'; + } /** * Flatten a series of requests into as ES request body * @param {array} requests - an array of flattened requests * @return {Promise} - a promise that is fulfilled by the request body */ - reqsFetchParamsToBody: function (reqsFetchParams) { + reqsFetchParamsToBody(reqsFetchParams) { return Promise.resolve({ docs: reqsFetchParams }); - }, + } /** * Fetch the multiple responses from the ES Response * @param {object} resp - The response sent from Elasticsearch * @return {array} - the list of responses */ - getResponses: function (resp) { + getResponses(resp) { return resp.docs; } }; diff --git a/src/ui/public/courier/fetch_types/es_search_request.js b/src/ui/public/courier/fetch_types/es_search_request.js new file mode 100644 index 0000000000000..87bc92a60e699 --- /dev/null +++ b/src/ui/public/courier/fetch_types/es_search_request.js @@ -0,0 +1,19 @@ +import _ from 'lodash'; + +import EsSearchStrategyProvider from './es_search_strategy'; +import DataSourceRequestProvider from './data_source_request'; + +export default function SearchReqProvider(Private) { + + const esSearchStrategy = Private(EsSearchStrategyProvider); + const DataSourceRequest = Private(DataSourceRequestProvider); + + return class EsSearchReq extends DataSourceRequest { + constructor(...args) { + super(...args); + + this.type = 'es_search'; + this.strategy = esSearchStrategy; + } + }; +}; diff --git a/src/ui/public/courier/fetch/strategy/search.js b/src/ui/public/courier/fetch_types/es_search_strategy.js similarity index 70% rename from src/ui/public/courier/fetch/strategy/search.js rename to src/ui/public/courier/fetch_types/es_search_strategy.js index 2abda799343b6..6222e9af29331 100644 --- a/src/ui/public/courier/fetch/strategy/search.js +++ b/src/ui/public/courier/fetch_types/es_search_strategy.js @@ -2,19 +2,24 @@ import _ from 'lodash'; import angular from 'angular'; import { toJson } from 'ui/utils/aggressive_parse'; +import EsAbstractStrategyProvider from './es_abstract_strategy'; export default function FetchStrategyForSearch(Private, Promise, timefilter) { + const EsAbstractStrategy = Private(EsAbstractStrategyProvider); - return { - clientMethod: 'msearch', + return new class EsSearchStrategy extends EsAbstractStrategy { + constructor() { + super(); + this.clientMethod = 'msearch'; + } /** - * Flatten a series of requests into as ES request body - * - * @param {array} requests - the requests to serialize - * @return {Promise} - a promise that is fulfilled by the request body - */ - reqsFetchParamsToBody: function (reqsFetchParams) { + * Flatten a series of requests into as ES request body + * + * @param {array} requests - the requests to serialize + * @return {Promise} - a promise that is fulfilled by the request body + */ + reqsFetchParamsToBody(reqsFetchParams) { return Promise.map(reqsFetchParams, function (fetchParams) { return Promise.resolve(fetchParams.index) .then(function (indexList) { @@ -50,14 +55,14 @@ export default function FetchStrategyForSearch(Private, Promise, timefilter) { .then(function (requests) { return requests.join('\n') + '\n'; }); - }, + } /** - * Fetch the multiple responses from the ES Response - * @param {object} resp - The response sent from Elasticsearch - * @return {array} - the list of responses - */ - getResponses: function (resp) { + * Fetch the multiple responses from the ES Response + * @param {object} resp - The response sent from Elasticsearch + * @return {array} - the list of responses + */ + getResponses(resp) { return resp.responses; } }; diff --git a/src/ui/public/courier/fetch/request/segmented.js b/src/ui/public/courier/fetch_types/es_segmented_request.js similarity index 95% rename from src/ui/public/courier/fetch/request/segmented.js rename to src/ui/public/courier/fetch_types/es_segmented_request.js index 71ae5e2fa8b8d..d8a095a8c5af7 100644 --- a/src/ui/public/courier/fetch/request/segmented.js +++ b/src/ui/public/courier/fetch_types/es_segmented_request.js @@ -3,18 +3,18 @@ import { isNumber } from 'lodash'; import Notifier from 'ui/notify/notifier'; -import SearchRequestProvider from './search'; -import SegmentedHandleProvider from './segmented_handle'; +import EsSearchRequestProvider from './es_search_request'; +import EsSegmentedRequestHandleProvider from './es_segmented_request_handle'; export default function SegmentedReqProvider(es, Private, Promise, timefilter, config) { - const SearchReq = Private(SearchRequestProvider); - const SegmentedHandle = Private(SegmentedHandleProvider); + const EsSearchReq = Private(EsSearchRequestProvider); + const EsSegmentedRequestHandle = Private(EsSegmentedRequestHandleProvider); const notify = new Notifier({ location: 'Segmented Fetch' }); - class SegmentedReq extends SearchReq { + class SegmentedReq extends EsSearchReq { constructor(source, defer, initFn) { super(source, defer); @@ -28,7 +28,7 @@ export default function SegmentedReqProvider(es, Private, Promise, timefilter, c this._direction = 'desc'; this._sortFn = null; this._queueCreated = false; - this._handle = new SegmentedHandle(this); + this._handle = new EsSegmentedRequestHandle(this); this._hitWindow = null; @@ -98,7 +98,9 @@ export default function SegmentedReqProvider(es, Private, Promise, timefilter, c } handleResponse(resp) { - return this._consumeSegment(resp); + if (!resp.error) { + return this._consumeSegment(resp); + } } filterError(resp) { diff --git a/src/ui/public/courier/fetch/request/segmented_handle.js b/src/ui/public/courier/fetch_types/es_segmented_request_handle.js similarity index 82% rename from src/ui/public/courier/fetch/request/segmented_handle.js rename to src/ui/public/courier/fetch_types/es_segmented_request_handle.js index b4dda1c70a49f..811267e835b1c 100644 --- a/src/ui/public/courier/fetch/request/segmented_handle.js +++ b/src/ui/public/courier/fetch_types/es_segmented_request_handle.js @@ -24,19 +24,19 @@ export default function CourierSegmentedReqHandle(Private) { } setDirection(...args) { - this[segmentedRequest](...args); + this[segmentedRequest].setDirection(...args); } setSize(...args) { - this[segmentedRequest](...args); + this[segmentedRequest].setSize(...args); } setMaxSegments(...args) { - this[segmentedRequest](...args); + this[segmentedRequest].setMaxSegments(...args); } setSortFn(...args) { - this[segmentedRequest](...args); + this[segmentedRequest].setSortFn(...args); } }; }; diff --git a/src/ui/public/courier/looper/doc.js b/src/ui/public/courier/looper/doc.js index a1c6104ce20aa..f6f7ac5698da3 100644 --- a/src/ui/public/courier/looper/doc.js +++ b/src/ui/public/courier/looper/doc.js @@ -1,18 +1,18 @@ import FetchProvider from '../fetch'; import LooperProvider from './_looper'; -import DocStrategyProvider from '../fetch/strategy/doc'; +import EsDocStrategyProvider from '../fetch_types/es_doc_strategy'; export default function DocLooperService(Private) { var fetch = Private(FetchProvider); var Looper = Private(LooperProvider); - var DocStrategy = Private(DocStrategyProvider); + var esDocStrategy = Private(EsDocStrategyProvider); /** * The Looper which will manage the doc fetch interval * @type {Looper} */ var docLooper = new Looper(1500, function () { - fetch.fetchQueued(DocStrategy); + fetch.fetchQueued(esDocStrategy); }); return docLooper; diff --git a/src/ui/public/courier/looper/search.js b/src/ui/public/courier/looper/search.js index cb3914c601c0e..cd324aae07988 100644 --- a/src/ui/public/courier/looper/search.js +++ b/src/ui/public/courier/looper/search.js @@ -1,11 +1,11 @@ import FetchProvider from '../fetch'; -import SearchStrategyProvider from '../fetch/strategy/search'; +import EsSearchStrategyProvider from '../fetch_types/es_search_strategy'; import RequestQueueProvider from '../_request_queue'; import LooperProvider from './_looper'; export default function SearchLooperService(Private, Promise, Notifier, $rootScope) { var fetch = Private(FetchProvider); - var searchStrategy = Private(SearchStrategyProvider); + var esSearchStrategy = Private(EsSearchStrategyProvider); var requestQueue = Private(RequestQueueProvider); var Looper = Private(LooperProvider); @@ -18,7 +18,7 @@ export default function SearchLooperService(Private, Promise, Notifier, $rootSco var searchLooper = new Looper(null, function () { $rootScope.$broadcast('courier:searchRefresh'); return fetch.these( - requestQueue.getInactive(searchStrategy) + requestQueue.getInactive(esSearchStrategy) ); });