diff --git a/src/core_plugins/elasticsearch/index.js b/src/core_plugins/elasticsearch/index.js index 46cce51c73d59..6f8c2928db56b 100644 --- a/src/core_plugins/elasticsearch/index.js +++ b/src/core_plugins/elasticsearch/index.js @@ -1,8 +1,13 @@ -import { trim, trimRight } from 'lodash'; +import { trim, trimRight, bindKey, get } from 'lodash'; import { methodNotAllowed } from 'boom'; import healthCheck from './lib/health_check'; -import exposeClient from './lib/expose_client'; +import { createDataCluster } from './lib/create_data_cluster'; +import { createAdminCluster } from './lib/create_admin_cluster'; +import { clientLogger } from './lib/client_logger'; +import { createClusters } from './lib/create_clusters'; +import filterHeaders from './lib/filter_headers'; + import createProxy, { createPath } from './lib/create_proxy'; const DEFAULT_REQUEST_HEADERS = [ 'authorization' ]; @@ -26,6 +31,7 @@ module.exports = function ({ Plugin }) { customHeaders: object().default({}), pingTimeout: number().default(ref('requestTimeout')), startupTimeout: number().default(5000), + logQueries: boolean().default(false), ssl: object({ verify: boolean().default(true), ca: array().single().items(string()), @@ -33,6 +39,29 @@ module.exports = function ({ Plugin }) { key: string() }).default(), apiVersion: Joi.string().default('master'), + healthCheck: object({ + delay: number().default(2500) + }).default(), + tribe: object({ + url: string().uri({ scheme: ['http', 'https'] }), + preserveHost: boolean().default(true), + username: string(), + password: string(), + shardTimeout: number().default(0), + requestTimeout: number().default(30000), + requestHeadersWhitelist: array().items().single().default(DEFAULT_REQUEST_HEADERS), + customHeaders: object().default({}), + pingTimeout: number().default(ref('requestTimeout')), + startupTimeout: number().default(5000), + logQueries: boolean().default(false), + ssl: object({ + verify: boolean().default(true), + ca: array().single().items(string()), + cert: string(), + key: string() + }).default(), + apiVersion: Joi.string().default('master'), + }).default() }).default(); }, @@ -42,15 +71,24 @@ module.exports = function ({ Plugin }) { esRequestTimeout: options.requestTimeout, esShardTimeout: options.shardTimeout, esApiVersion: options.apiVersion, + esDataIsTribe: get(options, 'tribe.url') ? true : false, }; } }, init(server, options) { const kibanaIndex = server.config().get('kibana.index'); + const clusters = createClusters(server); + + server.expose('getCluster', clusters.get); + server.expose('createCluster', clusters.create); + + server.expose('filterHeaders', filterHeaders); + server.expose('ElasticsearchClientLogging', clientLogger(server)); + + createDataCluster(server); + createAdminCluster(server); - // Expose the client to the server - exposeClient(server); createProxy(server, 'GET', '/{paths*}'); createProxy(server, 'POST', '/_mget'); createProxy(server, 'POST', '/{index}/_search'); @@ -69,7 +107,7 @@ module.exports = function ({ Plugin }) { function noDirectIndex({ path }, reply) { const requestPath = trimRight(trim(path), '/'); - const matchPath = createPath(kibanaIndex); + const matchPath = createPath('/elasticsearch', kibanaIndex); if (requestPath === matchPath) { return reply(methodNotAllowed('You cannot modify the primary kibana index through this interface.')); diff --git a/src/core_plugins/elasticsearch/lib/__tests__/cluster.js b/src/core_plugins/elasticsearch/lib/__tests__/cluster.js new file mode 100644 index 0000000000000..1d9c2ed0e717f --- /dev/null +++ b/src/core_plugins/elasticsearch/lib/__tests__/cluster.js @@ -0,0 +1,134 @@ +import expect from 'expect.js'; +import { Cluster } from '../cluster'; +import sinon from 'sinon'; +import { errors as esErrors } from 'elasticsearch'; +import { set, partial, cloneDeep } from 'lodash'; +import Boom from 'boom'; + +describe('plugins/elasticsearch', function () { + describe('cluster', function () { + let cluster; + const config = { + url: 'http://localhost:9200', + ssl: { verify: false }, + requestHeadersWhitelist: [ 'authorization' ] + }; + + beforeEach(() => { + cluster = new Cluster(config); + }); + + it('persists the config', () => { + expect(cluster._config).to.eql(config); + }); + + it('exposes error definitions', () => { + expect(cluster.errors).to.be(esErrors); + }); + + it('closes the clients', () => { + cluster._client.close = sinon.spy(); + cluster._noAuthClient.close = sinon.spy(); + cluster.close(); + + sinon.assert.calledOnce(cluster._client.close); + sinon.assert.calledOnce(cluster._noAuthClient.close); + }); + + it('protects the config from changes', () => { + const localRequestHeadersWhitelist = cluster.getRequestHeadersWhitelist(); + expect(localRequestHeadersWhitelist.length).to.not.equal(config.requestHeadersWhitelist); + }); + + describe('callWithInternalUser', () => { + let client; + + beforeEach(() => { + client = cluster._client = sinon.stub(); + set(client, 'nodes.info', sinon.stub().returns(Promise.resolve())); + }); + + it('should return a function', () => { + expect(cluster.callWithInternalUser).to.be.a('function'); + }); + + it('throws an error for an invalid endpoint', () => { + const fn = partial(cluster.callWithInternalUser, 'foo'); + expect(fn).to.throwException(/called with an invalid endpoint: foo/); + }); + + it('calls the client with params', () => { + const params = { foo: 'Foo' }; + cluster.callWithInternalUser('nodes.info', params); + + sinon.assert.calledOnce(client.nodes.info); + expect(client.nodes.info.getCall(0).args[0]).to.eql(params); + }); + }); + + describe('callWithRequest', () => { + let client; + + beforeEach(() => { + client = cluster._noAuthClient = sinon.stub(); + set(client, 'nodes.info', sinon.stub().returns(Promise.resolve())); + }); + + it('should return a function', () => { + expect(cluster.callWithRequest).to.be.a('function'); + }); + + it('throws an error for an invalid endpoint', () => { + const fn = partial(cluster.callWithRequest, {}, 'foo'); + expect(fn).to.throwException(/called with an invalid endpoint: foo/); + }); + + it('calls the client with params', () => { + const params = { foo: 'Foo' }; + cluster.callWithRequest({}, 'nodes.info', params); + + sinon.assert.calledOnce(client.nodes.info); + expect(client.nodes.info.getCall(0).args[0]).to.eql(params); + }); + + it('passes only whitelisted headers', () => { + const headers = { authorization: 'Basic TEST' }; + const request = { headers: Object.assign({}, headers, { foo: 'Foo' }) }; + + cluster.callWithRequest(request, 'nodes.info'); + + sinon.assert.calledOnce(client.nodes.info); + expect(client.nodes.info.getCall(0).args[0]).to.eql({ + headers: headers + }); + }); + + describe('wrap401Errors', () => { + let handler; + const error = new Error('Authentication required'); + error.statusCode = 401; + + beforeEach(() => { + handler = sinon.stub(); + }); + + it('ensures WWW-Authenticate header', async () => { + set(client, 'mock.401', sinon.stub().returns(Promise.reject(error))); + await cluster.callWithRequest({}, 'mock.401', {}, { wrap401Errors: true }).catch(handler); + + sinon.assert.calledOnce(handler); + expect(handler.getCall(0).args[0].output.headers['WWW-Authenticate']).to.eql('Basic realm="Authorization Required"'); + }); + + it('persists WWW-Authenticate header', async () => { + set(error, 'body.error.header[WWW-Authenticate]', 'Basic realm="Test"'); + set(client, 'mock.401', sinon.stub().returns(Promise.reject(error))); + await cluster.callWithRequest({}, 'mock.401', {}, { wrap401Errors: true }).catch(handler); + + sinon.assert.calledOnce(handler); + expect(handler.getCall(0).args[0].output.headers['WWW-Authenticate']).to.eql('Basic realm="Test"'); + }); + }); + }); + }); +}); diff --git a/src/core_plugins/elasticsearch/lib/__tests__/create_admin_cluster.js b/src/core_plugins/elasticsearch/lib/__tests__/create_admin_cluster.js new file mode 100644 index 0000000000000..c1798ef315f2d --- /dev/null +++ b/src/core_plugins/elasticsearch/lib/__tests__/create_admin_cluster.js @@ -0,0 +1,66 @@ +import expect from 'expect.js'; +import sinon from 'sinon'; +import { bindKey, set, get, partial } from 'lodash'; +import { createAdminCluster } from '../create_admin_cluster'; + +describe('plugins/elasticsearch', function () { + describe('create_admin_cluster', function () { + let cluster; + let server; + + beforeEach(() => { + const config = { + elasticsearch: { + url: 'http://localhost:9200', + logQueries: true + } + }; + + server = sinon.spy(); + + cluster = { + close: sinon.spy() + }; + + set(server, 'plugins.elasticsearch.createCluster', sinon.mock().returns(cluster)); + set(server, 'on', sinon.spy()); + + server.config = () => { + return { get: partial(get, config) }; + }; + + createAdminCluster(server); + }); + + it('creates the cluster', () => { + const { createCluster } = server.plugins.elasticsearch; + + sinon.assert.calledOnce(createCluster); + expect(createCluster.getCall(0).args[0]).to.eql('admin'); + expect(createCluster.getCall(0).args[1].url).to.eql('http://localhost:9200'); + }); + + it('sets client logger for cluster options', () => { + const { createCluster } = server.plugins.elasticsearch; + const firstCall = createCluster.getCall(0); + const Log = firstCall.args[1].log; + const logger = new Log; + + sinon.assert.calledOnce(createCluster); + expect(firstCall.args[0]).to.eql('admin'); + expect(firstCall.args[1].url).to.eql('http://localhost:9200'); + expect(logger.tags).to.eql(['admin']); + expect(logger.logQueries).to.eql(true); + }); + + it('close cluster of server close', () => { + const clusterClose = server.on.getCall(0).args[1]; + + clusterClose(); + + sinon.assert.calledOnce(cluster.close); + sinon.assert.calledOnce(server.on); + expect(server.on.getCall(0).args[0]).to.eql('close'); + }); + }); +}); diff --git a/src/core_plugins/elasticsearch/lib/__tests__/create_clusters.js b/src/core_plugins/elasticsearch/lib/__tests__/create_clusters.js new file mode 100644 index 0000000000000..bf0e7e3d245ca --- /dev/null +++ b/src/core_plugins/elasticsearch/lib/__tests__/create_clusters.js @@ -0,0 +1,50 @@ +import expect from 'expect.js'; +import { createClusters } from '../create_clusters'; +import { Cluster } from '../cluster'; +import sinon from 'sinon'; +import { partial } from 'lodash'; + +describe('plugins/elasticsearch', function () { + describe('createClusters', function () { + let clusters; + let server; + + beforeEach(() => { + server = { + plugins: { + elasticsearch: {} + }, + expose: sinon.mock() + }; + + clusters = createClusters(server); + }); + + describe('createCluster', () => { + let cluster; + const config = { + url: 'http://localhost:9200', + ssl: { + verify: false + } + }; + + beforeEach(() => { + cluster = clusters.create('admin', config); + }); + + it('returns a cluster', () => { + expect(cluster).to.be.a(Cluster); + }); + + it('persists the cluster', () => { + expect(clusters.get('admin')).to.be.a(Cluster); + }); + + it('throws if cluster already exists', () => { + const fn = partial(clusters.create, 'admin', config); + expect(fn).to.throwException(/cluster \'admin\' already exists/); + }); + }); + }); +}); diff --git a/src/core_plugins/elasticsearch/lib/__tests__/create_data_cluster.js b/src/core_plugins/elasticsearch/lib/__tests__/create_data_cluster.js new file mode 100644 index 0000000000000..fc919060fb0b2 --- /dev/null +++ b/src/core_plugins/elasticsearch/lib/__tests__/create_data_cluster.js @@ -0,0 +1,85 @@ +import expect from 'expect.js'; +import sinon from 'sinon'; +import { bindKey, set, get, partial } from 'lodash'; +import { createDataCluster } from '../create_data_cluster'; + +describe('plugins/elasticsearch', function () { + describe('create_data_cluster', function () { + let cluster; + let server; + let config; + + beforeEach(() => { + config = { + elasticsearch: { + url: 'http://localhost:9200', + logQueries: true + } + }; + + server = sinon.spy(); + + cluster = { + close: sinon.spy() + }; + + set(server, 'plugins.elasticsearch.createCluster', sinon.mock().returns(cluster)); + set(server, 'on', sinon.spy()); + + server.config = () => { + return { get: partial(get, config) }; + }; + }); + + it('creates the cluster with elasticsearch config', () => { + createDataCluster(server); + + const { createCluster } = server.plugins.elasticsearch; + + sinon.assert.calledOnce(createCluster); + expect(createCluster.getCall(0).args[0]).to.eql('data'); + expect(createCluster.getCall(0).args[1].url).to.eql('http://localhost:9200'); + }); + + it('creates the cluster with elasticsearch.tribe config', () => { + config.elasticsearch.tribe = { + url: 'http://localhost:9201' + }; + + createDataCluster(server); + + const { createCluster } = server.plugins.elasticsearch; + + sinon.assert.calledOnce(createCluster); + expect(createCluster.getCall(0).args[0]).to.eql('data'); + expect(createCluster.getCall(0).args[1].url).to.eql('http://localhost:9201'); + }); + + it('sets client logger for cluster options', () => { + createDataCluster(server); + + const { createCluster } = server.plugins.elasticsearch; + const firstCall = createCluster.getCall(0); + const Log = firstCall.args[1].log; + const logger = new Log; + + sinon.assert.calledOnce(createCluster); + expect(firstCall.args[0]).to.eql('data'); + expect(firstCall.args[1].url).to.eql('http://localhost:9200'); + expect(logger.tags).to.eql(['data']); + expect(logger.logQueries).to.eql(true); + }); + + it('close cluster of server close', () => { + createDataCluster(server); + + const clusterClose = server.on.getCall(0).args[1]; + + clusterClose(); + + sinon.assert.calledOnce(cluster.close); + sinon.assert.calledOnce(server.on); + expect(server.on.getCall(0).args[0]).to.eql('close'); + }); + }); +}); diff --git a/src/core_plugins/elasticsearch/lib/__tests__/create_kibana_index.js b/src/core_plugins/elasticsearch/lib/__tests__/create_kibana_index.js index a6796ff7048ab..40a3a27f52289 100644 --- a/src/core_plugins/elasticsearch/lib/__tests__/create_kibana_index.js +++ b/src/core_plugins/elasticsearch/lib/__tests__/create_kibana_index.js @@ -10,39 +10,45 @@ describe('plugins/elasticsearch', function () { describe('lib/create_kibana_index', function () { let server; - let client; + let callWithInternalUser; + let cluster; + beforeEach(function () { server = {}; - client = {}; + let config = { kibana: { index: '.my-kibana' } }; const get = sinon.stub(); + get.returns(config); get.withArgs('kibana.index').returns(config.kibana.index); config = function () { return { get: get }; }; - _.set(client, 'indices.create', sinon.stub()); - _.set(client, 'cluster.health', sinon.stub()); - _.set(server, 'plugins.elasticsearch.client', client); + + _.set(server, 'plugins.elasticsearch', {}); _.set(server, 'config', config); + + callWithInternalUser = sinon.stub(); + cluster = { callWithInternalUser: callWithInternalUser }; + + server.plugins.elasticsearch.getCluster = sinon.stub().withArgs('admin').returns(cluster); }); describe('successful requests', function () { - beforeEach(function () { - client.indices.create.returns(Promise.resolve()); - client.cluster.health.returns(Promise.resolve()); + callWithInternalUser.withArgs('indices.create', sinon.match.any).returns(Promise.resolve()); + callWithInternalUser.withArgs('cluster.health', sinon.match.any).returns(Promise.resolve()); }); it('should check cluster.health upon successful index creation', function () { const fn = createKibanaIndex(server); return fn.then(function () { - sinon.assert.calledOnce(client.cluster.health); + sinon.assert.calledOnce(callWithInternalUser.withArgs('cluster.health', sinon.match.any)); }); }); it('should be created with mappings for config.buildNum', function () { const fn = createKibanaIndex(server); return fn.then(function () { - const params = client.indices.create.args[0][0]; + const params = callWithInternalUser.args[0][1]; expect(params) .to.have.property('body'); expect(params.body) @@ -63,7 +69,7 @@ describe('plugins/elasticsearch', function () { it('should be created with 1 shard and default replica', function () { const fn = createKibanaIndex(server); return fn.then(function () { - const params = client.indices.create.args[0][0]; + const params = callWithInternalUser.args[0][1]; expect(params) .to.have.property('body'); expect(params.body) @@ -78,19 +84,17 @@ describe('plugins/elasticsearch', function () { it('should be created with index name set in the config', function () { const fn = createKibanaIndex(server); return fn.then(function () { - const params = client.indices.create.args[0][0]; + const params = callWithInternalUser.args[0][1]; expect(params) .to.have.property('index', '.my-kibana'); }); }); - - }); describe('failure requests', function () { it('should reject with a SetupError', function () { const error = new Error('Oops!'); - client.indices.create.returns(Promise.reject(error)); + callWithInternalUser.withArgs('indices.create', sinon.match.any).returns(Promise.reject(error)); const fn = createKibanaIndex(server); return fn.catch(function (err) { expect(err).to.be.a(SetupError); @@ -99,7 +103,7 @@ describe('plugins/elasticsearch', function () { it('should reject with an error if index creation fails', function () { const error = new Error('Oops!'); - client.indices.create.returns(Promise.reject(error)); + callWithInternalUser.withArgs('indices.create', sinon.match.any).returns(Promise.reject(error)); const fn = createKibanaIndex(server); return fn.catch(function (err) { expect(err.message).to.be('Unable to create Kibana index ".my-kibana"'); @@ -107,18 +111,14 @@ describe('plugins/elasticsearch', function () { }); }); - it('should reject with an error if health check fails', function () { - const error = new Error('Oops!'); - client.indices.create.returns(Promise.resolve()); - client.cluster.health.returns(Promise.reject(error)); + callWithInternalUser.withArgs('indices.create', sinon.match.any).returns(Promise.resolve()); + callWithInternalUser.withArgs('cluster.health', sinon.match.any).returns(Promise.reject(new Error())); const fn = createKibanaIndex(server); return fn.catch(function (err) { expect(err.message).to.be('Waiting for Kibana index ".my-kibana" to come online failed.'); - expect(err).to.have.property('origError', error); }); }); }); - }); }); diff --git a/src/core_plugins/elasticsearch/lib/__tests__/create_proxy.js b/src/core_plugins/elasticsearch/lib/__tests__/create_proxy.js index b54621608d2e7..f09dae25fc99d 100644 --- a/src/core_plugins/elasticsearch/lib/__tests__/create_proxy.js +++ b/src/core_plugins/elasticsearch/lib/__tests__/create_proxy.js @@ -3,20 +3,21 @@ import createProxy from '../create_proxy'; describe('plugins/elasticsearch', function () { describe('lib/create_proxy', function () { - describe('#createPath', function () { it('prepends /elasticsearch to route', function () { - const path = createProxy.createPath('/wat'); - expect(path).to.equal('/elasticsearch/wat'); + const path = createProxy.createPath('/foobar', '/wat'); + expect(path).to.equal('/foobar/wat'); }); - context('when arg does not start with a slash', function () { - it('adds slash anyway', function () { - const path = createProxy.createPath('wat'); - expect(path).to.equal('/elasticsearch/wat'); - }); + it('ensures leading slash for prefix', function () { + const path = createProxy.createPath('foobar', '/wat'); + expect(path).to.equal('/foobar/wat'); }); - }); + it('ensures leading slash for path', function () { + const path = createProxy.createPath('/foobar', 'wat'); + expect(path).to.equal('/foobar/wat'); + }); + }); }); }); diff --git a/src/core_plugins/elasticsearch/lib/__tests__/check_es_version.js b/src/core_plugins/elasticsearch/lib/__tests__/ensure_es_version.js similarity index 74% rename from src/core_plugins/elasticsearch/lib/__tests__/check_es_version.js rename to src/core_plugins/elasticsearch/lib/__tests__/ensure_es_version.js index 4bf2c6861705d..b902c5b57ee60 100644 --- a/src/core_plugins/elasticsearch/lib/__tests__/check_es_version.js +++ b/src/core_plugins/elasticsearch/lib/__tests__/ensure_es_version.js @@ -6,14 +6,15 @@ import url from 'url'; import SetupError from '../setup_error'; import serverConfig from '../../../../../test/server_config'; -import checkEsVersion from '../check_es_version'; +import { ensureEsVersion } from '../ensure_es_version'; describe('plugins/elasticsearch', () => { - describe('lib/check_es_version', () => { + describe('lib/ensure_es_version', () => { const KIBANA_VERSION = '5.1.0'; let server; let plugin; + let callWithInternalUser; beforeEach(function () { server = { @@ -24,9 +25,7 @@ describe('plugins/elasticsearch', () => { }), plugins: { elasticsearch: { - client: { - nodes: {} - }, + getCluster: sinon.stub().withArgs('admin').returns({ callWithInternalUser: sinon.stub() }), status: { red: sinon.stub() }, @@ -57,25 +56,27 @@ describe('plugins/elasticsearch', () => { nodes[name] = node; } - const client = server.plugins.elasticsearch.client; - client.nodes.info = sinon.stub().returns(Promise.resolve({ nodes: nodes })); + const cluster = server.plugins.elasticsearch.getCluster('admin'); + cluster.callWithInternalUser.withArgs('nodes.info', sinon.match.any).returns(Promise.resolve({ nodes: nodes })); + callWithInternalUser = cluster.callWithInternalUser; } function setNodeWithoutHTTP(version) { const nodes = { 'node-without-http': { version, ip: 'ip' } }; - const client = server.plugins.elasticsearch.client; - client.nodes.info = sinon.stub().returns(Promise.resolve({ nodes: nodes })); + const cluster = server.plugins.elasticsearch.getCluster('admin'); + cluster.callWithInternalUser.withArgs('nodes.info', sinon.match.any).returns(Promise.resolve({ nodes: nodes })); + callWithInternalUser = cluster.callWithInternalUser; } it('returns true with single a node that matches', async () => { setNodes('5.1.0'); - const result = await checkEsVersion(server, KIBANA_VERSION); + const result = await ensureEsVersion(server, KIBANA_VERSION); expect(result).to.be(true); }); it('returns true with multiple nodes that satisfy', async () => { setNodes('5.1.0', '5.2.0', '5.1.1-Beta1'); - const result = await checkEsVersion(server, KIBANA_VERSION); + const result = await ensureEsVersion(server, KIBANA_VERSION); expect(result).to.be(true); }); @@ -83,7 +84,7 @@ describe('plugins/elasticsearch', () => { // 5.0.0 ES is too old to work with a 5.1.0 version of Kibana. setNodes('5.1.0', '5.2.0', '5.0.0'); try { - await checkEsVersion(server, KIBANA_VERSION); + await ensureEsVersion(server, KIBANA_VERSION); } catch (e) { expect(e).to.be.a(SetupError); } @@ -96,7 +97,7 @@ describe('plugins/elasticsearch', () => { { version: '5.0.0', attributes: { client: 'true' } }, ); try { - await checkEsVersion(server, KIBANA_VERSION); + await ensureEsVersion(server, KIBANA_VERSION); } catch (e) { expect(e).to.be.a(SetupError); } @@ -104,7 +105,7 @@ describe('plugins/elasticsearch', () => { it('warns if a node is only off by a patch version', async () => { setNodes('5.1.1'); - await checkEsVersion(server, KIBANA_VERSION); + await ensureEsVersion(server, KIBANA_VERSION); sinon.assert.callCount(server.log, 2); expect(server.log.getCall(0).args[0]).to.contain('debug'); expect(server.log.getCall(1).args[0]).to.contain('warning'); @@ -112,7 +113,7 @@ describe('plugins/elasticsearch', () => { it('warns if a node is off by a patch version and without http publish address', async () => { setNodeWithoutHTTP('5.1.1'); - await checkEsVersion(server, KIBANA_VERSION); + await ensureEsVersion(server, KIBANA_VERSION); sinon.assert.callCount(server.log, 2); expect(server.log.getCall(0).args[0]).to.contain('debug'); expect(server.log.getCall(1).args[0]).to.contain('warning'); @@ -121,7 +122,7 @@ describe('plugins/elasticsearch', () => { it('errors if a node incompatible and without http publish address', async () => { setNodeWithoutHTTP('6.1.1'); try { - await checkEsVersion(server, KIBANA_VERSION); + await ensureEsVersion(server, KIBANA_VERSION); } catch (e) { expect(e.message).to.contain('incompatible nodes'); expect(e).to.be.a(Error); @@ -131,12 +132,12 @@ describe('plugins/elasticsearch', () => { it('only warns once per node list', async () => { setNodes('5.1.1'); - await checkEsVersion(server, KIBANA_VERSION); + await ensureEsVersion(server, KIBANA_VERSION); sinon.assert.callCount(server.log, 2); expect(server.log.getCall(0).args[0]).to.contain('debug'); expect(server.log.getCall(1).args[0]).to.contain('warning'); - await checkEsVersion(server, KIBANA_VERSION); + await ensureEsVersion(server, KIBANA_VERSION); sinon.assert.callCount(server.log, 3); expect(server.log.getCall(2).args[0]).to.contain('debug'); }); @@ -144,13 +145,13 @@ describe('plugins/elasticsearch', () => { it('warns again if the node list changes', async () => { setNodes('5.1.1'); - await checkEsVersion(server, KIBANA_VERSION); + await ensureEsVersion(server, KIBANA_VERSION); sinon.assert.callCount(server.log, 2); expect(server.log.getCall(0).args[0]).to.contain('debug'); expect(server.log.getCall(1).args[0]).to.contain('warning'); setNodes('5.1.2'); - await checkEsVersion(server, KIBANA_VERSION); + await ensureEsVersion(server, KIBANA_VERSION); sinon.assert.callCount(server.log, 4); expect(server.log.getCall(2).args[0]).to.contain('debug'); expect(server.log.getCall(3).args[0]).to.contain('warning'); diff --git a/src/core_plugins/elasticsearch/lib/__tests__/ensure_not_tribe.js b/src/core_plugins/elasticsearch/lib/__tests__/ensure_not_tribe.js new file mode 100644 index 0000000000000..38b5d0b865750 --- /dev/null +++ b/src/core_plugins/elasticsearch/lib/__tests__/ensure_not_tribe.js @@ -0,0 +1,48 @@ +import expect from 'expect.js'; +import { noop } from 'lodash'; +import sinon from 'sinon'; + +import { ensureNotTribe } from '../ensure_not_tribe'; + +describe('plugins/elasticsearch ensureNotTribe', () => { + const sandbox = sinon.sandbox.create(); + afterEach(() => sandbox.restore()); + + const stubcallWithInternalUser = (nodesInfoResp = { nodes: {} }) => { + return sinon.stub().withArgs( + 'nodes.info', + sinon.match.any + ).returns( + Promise.resolve(nodesInfoResp) + ); + }; + + + it('fetches the local node stats of the node that the elasticsearch client is connected to', async () => { + const callWithInternalUser = stubcallWithInternalUser(); + await ensureNotTribe(callWithInternalUser); + sinon.assert.calledOnce(callWithInternalUser); + }); + + it('throws a SetupError when the node info contains tribe settings', async () => { + const nodeInfo = { + nodes: { + __nodeId__: { + settings: { + tribe: { + t1: {}, + t2: {}, + } + } + } + } + }; + + try { + await ensureNotTribe(stubcallWithInternalUser(nodeInfo)); + throw new Error('ensureNotTribe() should have thrown'); + } catch (err) { + expect(err).to.be.a(Error); + } + }); +}); diff --git a/src/core_plugins/elasticsearch/lib/__tests__/health_check.js b/src/core_plugins/elasticsearch/lib/__tests__/health_check.js index 58382d2b68526..016b22c164733 100644 --- a/src/core_plugins/elasticsearch/lib/__tests__/health_check.js +++ b/src/core_plugins/elasticsearch/lib/__tests__/health_check.js @@ -13,10 +13,12 @@ const esPort = serverConfig.servers.elasticsearch.port; const esUrl = url.format(serverConfig.servers.elasticsearch); describe('plugins/elasticsearch', () => { - describe('lib/health_check', () => { + describe('lib/health_check', function () { + this.timeout(3000); + let health; let plugin; - let client; + let cluster; beforeEach(() => { const COMPATIBLE_VERSION_NUMBER = '5.0.0'; @@ -34,19 +36,11 @@ describe('plugins/elasticsearch', () => { } }; - // set up the elasticsearch client stub - client = { - cluster: { health: sinon.stub() }, - indices: { create: sinon.stub() }, - nodes: { info: sinon.stub() }, - ping: sinon.stub(), - create: sinon.stub(), - index: sinon.stub().returns(Promise.resolve()), - get: sinon.stub().returns(Promise.resolve({ found: false })), - search: sinon.stub().returns(Promise.resolve({ hits: { hits: [] } })), - }; - - client.nodes.info.returns(Promise.resolve({ + cluster = { callWithInternalUser: sinon.stub() }; + cluster.callWithInternalUser.withArgs('index', sinon.match.any).returns(Promise.resolve()); + cluster.callWithInternalUser.withArgs('get', sinon.match.any).returns(Promise.resolve({ found: false })); + cluster.callWithInternalUser.withArgs('search', sinon.match.any).returns(Promise.resolve({ hits: { hits: [] } })); + cluster.callWithInternalUser.withArgs('nodes.info', sinon.match.any).returns(Promise.resolve({ nodes: { 'node-01': { version: COMPATIBLE_VERSION_NUMBER, @@ -68,7 +62,11 @@ describe('plugins/elasticsearch', () => { log: sinon.stub(), info: { port: 5601 }, config: function () { return { get, set }; }, - plugins: { elasticsearch: { client } } + plugins: { + elasticsearch: { + getCluster: sinon.stub().returns(cluster) + } + } }; health = healthCheck(plugin, server); @@ -79,44 +77,59 @@ describe('plugins/elasticsearch', () => { }); it('should set the cluster green if everything is ready', function () { - client.ping.returns(Promise.resolve()); - client.cluster.health.returns(Promise.resolve({ timed_out: false, status: 'green' })); + cluster.callWithInternalUser.withArgs('ping').returns(Promise.resolve()); + cluster.callWithInternalUser.withArgs('cluster.health', sinon.match.any).returns( + Promise.resolve({ timed_out: false, status: 'green' }) + ); + return health.run() .then(function () { sinon.assert.calledOnce(plugin.status.yellow); expect(plugin.status.yellow.args[0][0]).to.be('Waiting for Elasticsearch'); - sinon.assert.calledOnce(client.ping); - sinon.assert.calledOnce(client.nodes.info); - sinon.assert.calledOnce(client.cluster.health); + + sinon.assert.calledOnce(cluster.callWithInternalUser.withArgs('ping')); + sinon.assert.calledTwice(cluster.callWithInternalUser.withArgs('nodes.info', sinon.match.any)); + sinon.assert.calledOnce(cluster.callWithInternalUser.withArgs('cluster.health', sinon.match.any)); sinon.assert.calledOnce(plugin.status.green); + expect(plugin.status.green.args[0][0]).to.be('Kibana index ready'); }); }); it('should set the cluster red if the ping fails, then to green', function () { - client.ping.onCall(0).returns(Promise.reject(new NoConnections())); - client.ping.onCall(1).returns(Promise.resolve()); - client.cluster.health.returns(Promise.resolve({ timed_out: false, status: 'green' })); + const ping = cluster.callWithInternalUser.withArgs('ping'); + ping.onCall(0).returns(Promise.reject(new NoConnections())); + ping.onCall(1).returns(Promise.resolve()); + + cluster.callWithInternalUser.withArgs('cluster.health', sinon.match.any).returns( + Promise.resolve({ timed_out: false, status: 'green' }) + ); + return health.run() .then(function () { sinon.assert.calledOnce(plugin.status.yellow); expect(plugin.status.yellow.args[0][0]).to.be('Waiting for Elasticsearch'); + sinon.assert.calledOnce(plugin.status.red); expect(plugin.status.red.args[0][0]).to.be( `Unable to connect to Elasticsearch at ${esUrl}.` ); - sinon.assert.calledTwice(client.ping); - sinon.assert.calledOnce(client.nodes.info); - sinon.assert.calledOnce(client.cluster.health); + + sinon.assert.calledTwice(ping); + sinon.assert.calledTwice(cluster.callWithInternalUser.withArgs('nodes.info', sinon.match.any)); + sinon.assert.calledOnce(cluster.callWithInternalUser.withArgs('cluster.health', sinon.match.any)); sinon.assert.calledOnce(plugin.status.green); expect(plugin.status.green.args[0][0]).to.be('Kibana index ready'); }); }); it('should set the cluster red if the health check status is red, then to green', function () { - client.ping.returns(Promise.resolve()); - client.cluster.health.onCall(0).returns(Promise.resolve({ timed_out: false, status: 'red' })); - client.cluster.health.onCall(1).returns(Promise.resolve({ timed_out: false, status: 'green' })); + cluster.callWithInternalUser.withArgs('ping').returns(Promise.resolve()); + + const clusterHealth = cluster.callWithInternalUser.withArgs('cluster.health', sinon.match.any); + clusterHealth.onCall(0).returns(Promise.resolve({ timed_out: false, status: 'red' })); + clusterHealth.onCall(1).returns(Promise.resolve({ timed_out: false, status: 'green' })); + return health.run() .then(function () { sinon.assert.calledOnce(plugin.status.yellow); @@ -125,39 +138,45 @@ describe('plugins/elasticsearch', () => { expect(plugin.status.red.args[0][0]).to.be( 'Elasticsearch is still initializing the kibana index.' ); - sinon.assert.calledOnce(client.ping); - sinon.assert.calledOnce(client.nodes.info); - sinon.assert.calledTwice(client.cluster.health); + sinon.assert.calledOnce(cluster.callWithInternalUser.withArgs('ping')); + sinon.assert.calledTwice(cluster.callWithInternalUser.withArgs('nodes.info', sinon.match.any)); + sinon.assert.calledTwice(cluster.callWithInternalUser.withArgs('cluster.health', sinon.match.any)); sinon.assert.calledOnce(plugin.status.green); expect(plugin.status.green.args[0][0]).to.be('Kibana index ready'); }); }); it('should set the cluster yellow if the health check timed_out and create index', function () { - client.ping.returns(Promise.resolve()); - client.cluster.health.onCall(0).returns(Promise.resolve({ timed_out: true, status: 'red' })); - client.cluster.health.onCall(1).returns(Promise.resolve({ timed_out: false, status: 'green' })); - client.indices.create.returns(Promise.resolve()); + cluster.callWithInternalUser.withArgs('ping').returns(Promise.resolve()); + + const clusterHealth = cluster.callWithInternalUser.withArgs('cluster.health', sinon.match.any); + clusterHealth.onCall(0).returns(Promise.resolve({ timed_out: true, status: 'red' })); + clusterHealth.onCall(1).returns(Promise.resolve({ timed_out: false, status: 'green' })); + + cluster.callWithInternalUser.withArgs('indices.create', sinon.match.any).returns(Promise.resolve()); + return health.run() .then(function () { sinon.assert.calledTwice(plugin.status.yellow); expect(plugin.status.yellow.args[0][0]).to.be('Waiting for Elasticsearch'); expect(plugin.status.yellow.args[1][0]).to.be('No existing Kibana index found'); - sinon.assert.calledOnce(client.ping); - sinon.assert.calledOnce(client.indices.create); - sinon.assert.calledOnce(client.nodes.info); - sinon.assert.calledTwice(client.cluster.health); + + sinon.assert.calledOnce(cluster.callWithInternalUser.withArgs('ping')); + sinon.assert.calledOnce(cluster.callWithInternalUser.withArgs('indices.create', sinon.match.any)); + sinon.assert.calledTwice(cluster.callWithInternalUser.withArgs('nodes.info', sinon.match.any)); + sinon.assert.calledTwice(clusterHealth); }); }); describe('#waitUntilReady', function () { it('polls health until index is ready', function () { - client.cluster.health.onCall(0).returns(Promise.resolve({ timed_out: true })); // no index - client.cluster.health.onCall(1).returns(Promise.resolve({ status: 'red' })); // initializing - client.cluster.health.onCall(2).returns(Promise.resolve({ status: 'green' })); // ready + const clusterHealth = cluster.callWithInternalUser.withArgs('cluster.health', sinon.match.any); + clusterHealth.onCall(0).returns(Promise.resolve({ timed_out: true })); + clusterHealth.onCall(1).returns(Promise.resolve({ status: 'red' })); + clusterHealth.onCall(2).returns(Promise.resolve({ status: 'green' })); return health.waitUntilReady().then(function () { - sinon.assert.calledThrice(client.cluster.health); + sinon.assert.calledThrice(clusterHealth); }); }); }); diff --git a/src/core_plugins/elasticsearch/lib/__tests__/map_uri.js b/src/core_plugins/elasticsearch/lib/__tests__/map_uri.js index 0859ae8640b3f..1aad55a799bbb 100644 --- a/src/core_plugins/elasticsearch/lib/__tests__/map_uri.js +++ b/src/core_plugins/elasticsearch/lib/__tests__/map_uri.js @@ -2,20 +2,25 @@ import expect from 'expect.js'; import mapUri from '../map_uri'; import { get, defaults } from 'lodash'; import sinon from 'sinon'; +import url from 'url'; describe('plugins/elasticsearch', function () { describe('lib/map_uri', function () { let request; - function stubServer(settings) { - const values = defaults(settings || {}, { - 'elasticsearch.url': 'http://localhost:9200', - 'elasticsearch.requestHeadersWhitelist': ['authorization'], - 'elasticsearch.customHeaders': {} + function stubCluster(settings) { + settings = defaults(settings || {}, { + url: 'http://localhost:9200', + requestHeadersWhitelist: ['authorization'], + customHeaders: {} }); - const config = { get: (key, def) => get(values, key, def) }; - return { config: () => config }; + + return { + getUrl: () => settings.url, + getCustomHeaders: () => settings.customHeaders, + getRequestHeadersWhitelist: () => settings.requestHeadersWhitelist + }; } beforeEach(function () { @@ -34,34 +39,34 @@ describe('plugins/elasticsearch', function () { }); it('sends custom headers if set', function () { - const server = stubServer({ - 'elasticsearch.customHeaders': { foo: 'bar' } - }); + const settings = { + customHeaders: { foo: 'bar' } + }; - mapUri(server)(request, function (err, upstreamUri, upstreamHeaders) { + mapUri(stubCluster(settings), '/elasticsearch')(request, function (err, upstreamUri, upstreamHeaders) { expect(err).to.be(null); expect(upstreamHeaders).to.have.property('foo', 'bar'); }); }); it('sends configured custom headers even if the same named header exists in request', function () { - const server = stubServer({ - 'elasticsearch.requestHeadersWhitelist': ['x-my-custom-header'], - 'elasticsearch.customHeaders': {'x-my-custom-header': 'asconfigured'} - }); + const settings = { + requestHeadersWhitelist: ['x-my-custom-header'], + customHeaders: { 'x-my-custom-header': 'asconfigured' } + }; - mapUri(server)(request, function (err, upstreamUri, upstreamHeaders) { + mapUri(stubCluster(settings), '/elasticsearch')(request, function (err, upstreamUri, upstreamHeaders) { expect(err).to.be(null); expect(upstreamHeaders).to.have.property('x-my-custom-header', 'asconfigured'); }); }); it('only proxies the whitelisted request headers', function () { - const server = stubServer({ - 'elasticsearch.requestHeadersWhitelist': ['x-my-custom-HEADER', 'Authorization'], - }); + const settings = { + requestHeadersWhitelist: ['x-my-custom-HEADER', 'Authorization'], + }; - mapUri(server)(request, function (err, upstreamUri, upstreamHeaders) { + mapUri(stubCluster(settings), '/elasticsearch')(request, function (err, upstreamUri, upstreamHeaders) { expect(err).to.be(null); expect(upstreamHeaders).to.have.property('authorization'); expect(upstreamHeaders).to.have.property('x-my-custom-header'); @@ -70,24 +75,24 @@ describe('plugins/elasticsearch', function () { }); it('proxies no headers if whitelist is set to []', function () { - const server = stubServer({ - 'elasticsearch.requestHeadersWhitelist': [], - }); + const settings = { + requestHeadersWhitelist: [], + }; - mapUri(server)(request, function (err, upstreamUri, upstreamHeaders) { + mapUri(stubCluster(settings), '/elasticsearch')(request, function (err, upstreamUri, upstreamHeaders) { expect(err).to.be(null); expect(Object.keys(upstreamHeaders).length).to.be(0); }); }); it('proxies no headers if whitelist is set to no value', function () { - const server = stubServer({ + const settings = { // joi converts `elasticsearch.requestHeadersWhitelist: null` into // an array with a null inside because of the `array().single()` rule. - 'elasticsearch.requestHeadersWhitelist': [ null ], - }); + requestHeadersWhitelist: [ null ], + }; - mapUri(server)(request, function (err, upstreamUri, upstreamHeaders) { + mapUri(stubCluster(settings), '/elasticsearch')(request, function (err, upstreamUri, upstreamHeaders) { expect(err).to.be(null); expect(Object.keys(upstreamHeaders).length).to.be(0); }); @@ -95,9 +100,8 @@ describe('plugins/elasticsearch', function () { it('strips the /elasticsearch prefix from the path', () => { request.path = '/elasticsearch/es/path'; - const server = stubServer(); - mapUri(server)(request, function (err, upstreamUri, upstreamHeaders) { + mapUri(stubCluster(), '/elasticsearch')(request, function (err, upstreamUri, upstreamHeaders) { expect(err).to.be(null); expect(upstreamUri).to.be('http://localhost:9200/es/path'); }); @@ -105,9 +109,9 @@ describe('plugins/elasticsearch', function () { it('extends the es.url path', function () { request.path = '/elasticsearch/index/type'; - const server = stubServer({ 'elasticsearch.url': 'https://localhost:9200/base-path' }); + const settings = { url: 'https://localhost:9200/base-path' }; - mapUri(server)(request, function (err, upstreamUri, upstreamHeaders) { + mapUri(stubCluster(settings), '/elasticsearch')(request, function (err, upstreamUri, upstreamHeaders) { expect(err).to.be(null); expect(upstreamUri).to.be('https://localhost:9200/base-path/index/type'); }); @@ -116,9 +120,9 @@ describe('plugins/elasticsearch', function () { it('extends the es.url query string', function () { request.path = '/elasticsearch/*'; request.query = { foo: 'bar' }; - const server = stubServer({ 'elasticsearch.url': 'https://localhost:9200/?base=query' }); + const settings = { url: 'https://localhost:9200/?base=query' }; - mapUri(server)(request, function (err, upstreamUri, upstreamHeaders) { + mapUri(stubCluster(settings), '/elasticsearch')(request, function (err, upstreamUri, upstreamHeaders) { expect(err).to.be(null); expect(upstreamUri).to.be('https://localhost:9200/*?foo=bar&base=query'); }); @@ -127,9 +131,8 @@ describe('plugins/elasticsearch', function () { it('filters the _ querystring param', function () { request.path = '/elasticsearch/*'; request.query = { _: Date.now() }; - const server = stubServer(); - mapUri(server)(request, function (err, upstreamUri, upstreamHeaders) { + mapUri(stubCluster(), '/elasticsearch')(request, function (err, upstreamUri, upstreamHeaders) { expect(err).to.be(null); expect(upstreamUri).to.be('http://localhost:9200/*'); }); diff --git a/src/core_plugins/elasticsearch/lib/__tests__/upgrade_config.js b/src/core_plugins/elasticsearch/lib/__tests__/upgrade_config.js index 96e744c11ac10..3648b015f03ca 100644 --- a/src/core_plugins/elasticsearch/lib/__tests__/upgrade_config.js +++ b/src/core_plugins/elasticsearch/lib/__tests__/upgrade_config.js @@ -9,7 +9,7 @@ describe('plugins/elasticsearch', function () { describe('lib/upgrade_config', function () { let get; let server; - let client; + let callWithInternalUser; let config; let upgrade; @@ -18,7 +18,9 @@ describe('plugins/elasticsearch', function () { get.withArgs('kibana.index').returns('.my-kibana'); get.withArgs('pkg.version').returns('4.0.1'); get.withArgs('pkg.buildNum').returns(Math.random()); - client = { create: sinon.stub() }; + + callWithInternalUser = sinon.stub(); + server = { log: sinon.stub(), config: function () { @@ -26,7 +28,13 @@ describe('plugins/elasticsearch', function () { get: get }; }, - plugins: { elasticsearch: { client: client } } + plugins: { + elasticsearch: { + getCluster: sinon.stub().withArgs('admin').returns({ + callWithInternalUser: callWithInternalUser + }) + } + } }; upgrade = upgradeConfig(server); }); @@ -35,7 +43,7 @@ describe('plugins/elasticsearch', function () { const response = { hits: { hits:[] } }; beforeEach(function () { - client.create.returns(Promise.resolve()); + callWithInternalUser.withArgs('create', sinon.match.any).returns(Promise.resolve()); }); describe('production', function () { @@ -47,15 +55,15 @@ describe('plugins/elasticsearch', function () { it('should resolve buildNum to pkg.buildNum config', function () { return upgrade(response).then(function (resp) { - sinon.assert.calledOnce(client.create); - const params = client.create.args[0][0]; + sinon.assert.calledOnce(callWithInternalUser); + const params = callWithInternalUser.args[0][1]; expect(params.body).to.have.property('buildNum', get('pkg.buildNum')); }); }); it('should resolve version to pkg.version config', function () { return upgrade(response).then(function (resp) { - const params = client.create.args[0][0]; + const params = callWithInternalUser.args[0][1]; expect(params).to.have.property('id', get('pkg.version')); }); }); @@ -70,14 +78,14 @@ describe('plugins/elasticsearch', function () { it('should resolve buildNum to pkg.buildNum config', function () { return upgrade(response).then(function (resp) { - const params = client.create.args[0][0]; + const params = callWithInternalUser.args[0][1]; expect(params.body).to.have.property('buildNum', get('pkg.buildNum')); }); }); it('should resolve version to pkg.version config', function () { return upgrade(response).then(function (resp) { - const params = client.create.args[0][0]; + const params = callWithInternalUser.args[0][1]; expect(params).to.have.property('id', get('pkg.version')); }); }); @@ -93,11 +101,12 @@ describe('plugins/elasticsearch', function () { it('should create new config if the nothing is upgradeable', function () { get.withArgs('pkg.buildNum').returns(9833); - client.create.returns(Promise.resolve()); + callWithInternalUser.withArgs('create', sinon.match.any).returns(Promise.resolve()); + const response = { hits: { hits: [ { _id: '4.0.1-alpha3' }, { _id: '4.0.1-beta1' }, { _id: '4.0.0-SNAPSHOT1' } ] } }; return upgrade(response).then(function (resp) { - sinon.assert.calledOnce(client.create); - const params = client.create.args[0][0]; + sinon.assert.calledOnce(callWithInternalUser); + const params = callWithInternalUser.args[0][1]; expect(params).to.have.property('body'); expect(params.body).to.have.property('buildNum', 9833); expect(params).to.have.property('index', '.my-kibana'); @@ -108,11 +117,13 @@ describe('plugins/elasticsearch', function () { it('should update the build number on the new config', function () { get.withArgs('pkg.buildNum').returns(5801); - client.create.returns(Promise.resolve()); + callWithInternalUser.withArgs('create', sinon.match.any).returns(Promise.resolve()); + const response = { hits: { hits: [ { _id: '4.0.0', _source: { buildNum: 1 } } ] } }; + return upgrade(response).then(function (resp) { - sinon.assert.calledOnce(client.create); - const params = client.create.args[0][0]; + sinon.assert.calledOnce(callWithInternalUser); + const params = callWithInternalUser.args[0][1]; expect(params).to.have.property('body'); expect(params.body).to.have.property('buildNum', 5801); expect(params).to.have.property('index', '.my-kibana'); @@ -123,8 +134,10 @@ describe('plugins/elasticsearch', function () { it('should log a message for upgrades', function () { get.withArgs('pkg.buildNum').returns(5801); - client.create.returns(Promise.resolve()); + callWithInternalUser.withArgs('create', sinon.match.any).returns(Promise.resolve()); + const response = { hits: { hits: [ { _id: '4.0.0', _source: { buildNum: 1 } } ] } }; + return upgrade(response).then(function (resp) { sinon.assert.calledOnce(server.log); expect(server.log.args[0][0]).to.eql(['plugin', 'elasticsearch']); @@ -137,11 +150,13 @@ describe('plugins/elasticsearch', function () { it('should copy attributes from old config', function () { get.withArgs('pkg.buildNum').returns(5801); - client.create.returns(Promise.resolve()); + callWithInternalUser.withArgs('create', sinon.match.any).returns(Promise.resolve()); + const response = { hits: { hits: [ { _id: '4.0.0', _source: { buildNum: 1, defaultIndex: 'logstash-*' } } ] } }; + return upgrade(response).then(function (resp) { - sinon.assert.calledOnce(client.create); - const params = client.create.args[0][0]; + sinon.assert.calledOnce(callWithInternalUser); + const params = callWithInternalUser.args[0][1]; expect(params).to.have.property('body'); expect(params.body).to.have.property('defaultIndex', 'logstash-*'); }); diff --git a/src/core_plugins/elasticsearch/lib/call_with_request.js b/src/core_plugins/elasticsearch/lib/call_with_request.js deleted file mode 100644 index ed53ee36ca372..0000000000000 --- a/src/core_plugins/elasticsearch/lib/call_with_request.js +++ /dev/null @@ -1,31 +0,0 @@ -import _ from 'lodash'; -import Promise from 'bluebird'; -import Boom from 'boom'; -import toPath from 'lodash/internal/toPath'; -import filterHeaders from './filter_headers'; - -module.exports = (server, client) => { - return (req, endpoint, clientParams = {}, options = {}) => { - const wrap401Errors = options.wrap401Errors !== false; - const filteredHeaders = filterHeaders(req.headers, server.config().get('elasticsearch.requestHeadersWhitelist')); - _.set(clientParams, 'headers', filteredHeaders); - const path = toPath(endpoint); - const api = _.get(client, path); - let apiContext = _.get(client, path.slice(0, -1)); - if (_.isEmpty(apiContext)) { - apiContext = client; - } - if (!api) throw new Error(`callWithRequest called with an invalid endpoint: ${endpoint}`); - return api.call(apiContext, clientParams) - .catch((err) => { - if (!wrap401Errors || err.statusCode !== 401) { - return Promise.reject(err); - } - - const boomError = Boom.wrap(err, err.statusCode); - const wwwAuthHeader = _.get(err, 'body.error.header[WWW-Authenticate]'); - boomError.output.headers['WWW-Authenticate'] = wwwAuthHeader || 'Basic realm="Authorization Required"'; - throw boomError; - }); - }; -}; diff --git a/src/core_plugins/elasticsearch/lib/client_logger.js b/src/core_plugins/elasticsearch/lib/client_logger.js new file mode 100644 index 0000000000000..dac94311f4bfc --- /dev/null +++ b/src/core_plugins/elasticsearch/lib/client_logger.js @@ -0,0 +1,39 @@ +export function clientLogger(server) { + return class ElasticsearchClientLogging { + // additional tags to differentiate connection + tags = []; + + logQueries = false; + + error(err) { + server.log(['error', 'elasticsearch'].concat(this.tags), err); + } + + warning(message) { + server.log(['warning', 'elasticsearch'].concat(this.tags), message); + } + + trace(method, options, query, _response, statusCode) { + /* Check if query logging is enabled + * It requires Kibana to be configured with verbose logging turned on. */ + if (this.logQueries) { + const methodAndPath = `${method} ${options.path}`; + const queryDsl = query ? query.trim() : ''; + server.log(['elasticsearch', 'query', 'debug'].concat(this.tags), [ + statusCode, + methodAndPath, + queryDsl + ].join('\n')); + } + } + + + // elasticsearch-js expects the following functions to exist + + info() {} + + debug() {} + + close() {} + }; +} diff --git a/src/core_plugins/elasticsearch/lib/cluster.js b/src/core_plugins/elasticsearch/lib/cluster.js new file mode 100644 index 0000000000000..9172468e828fa --- /dev/null +++ b/src/core_plugins/elasticsearch/lib/cluster.js @@ -0,0 +1,110 @@ +import elasticsearch from 'elasticsearch'; +import { get, set, isEmpty, cloneDeep, pick } from 'lodash'; +import toPath from 'lodash/internal/toPath'; +import Boom from 'boom'; + +import filterHeaders from './filter_headers'; +import { parseConfig } from './parse_config'; + +export class Cluster { + constructor(config) { + this._config = Object.assign({}, config); + this.errors = elasticsearch.errors; + + this._client = this.createClient(); + this._noAuthClient = this.createClient({ auth: false }); + + return this; + } + + callWithRequest = (req = {}, endpoint, clientParams = {}, options = {}) => { + if (req.headers) { + const filteredHeaders = filterHeaders(req.headers, this.getRequestHeadersWhitelist()); + set(clientParams, 'headers', filteredHeaders); + } + + return callAPI(this._noAuthClient, endpoint, clientParams, options); + } + + callWithInternalUser = (endpoint, clientParams = {}, options = {}) => { + return callAPI(this._client, endpoint, clientParams, options); + } + + getRequestHeadersWhitelist = () => getClonedProperty(this._config, 'requestHeadersWhitelist'); + + getCustomHeaders = () => getClonedProperty(this._config, 'customHeaders'); + + getRequestTimeout = () => getClonedProperty(this._config, 'requestTimeout'); + + getUrl = () => getClonedProperty(this._config, 'url'); + + getSsl = () => getClonedProperty(this._config, 'ssl'); + + getClient = () => this._client; + + close() { + if (this._client) { + this._client.close(); + } + + if (this._noAuthClient) { + this._noAuthClient.close(); + } + } + + createClient = configOverrides => { + const config = Object.assign({}, this._getClientConfig(), configOverrides); + return new elasticsearch.Client(parseConfig(config)); + } + + _getClientConfig = () => { + return getClonedProperties(this._config, [ + 'url', + 'ssl', + 'username', + 'password', + 'customHeaders', + 'plugins', + 'apiVersion', + 'keepAlive', + 'pingTimeout', + 'requestTimeout', + 'log' + ]); + } +} + +function callAPI(client, endpoint, clientParams = {}, options = {}) { + const wrap401Errors = options.wrap401Errors !== false; + const clientPath = toPath(endpoint); + const api = get(client, clientPath); + + let apiContext = get(client, clientPath.slice(0, -1)); + if (isEmpty(apiContext)) { + apiContext = client; + } + + if (!api) { + throw new Error(`called with an invalid endpoint: ${endpoint}`); + } + + return api.call(apiContext, clientParams).catch((err) => { + if (!wrap401Errors || err.statusCode !== 401) { + return Promise.reject(err); + } + + const boomError = Boom.wrap(err, err.statusCode); + const wwwAuthHeader = get(err, 'body.error.header[WWW-Authenticate]'); + boomError.output.headers['WWW-Authenticate'] = wwwAuthHeader || 'Basic realm="Authorization Required"'; + + throw boomError; + }); +} + +function getClonedProperties(config, paths) { + return cloneDeep(paths ? pick(config, paths) : config); +} + +function getClonedProperty(config, path) { + return cloneDeep(path ? get(config, path) : config); +} diff --git a/src/core_plugins/elasticsearch/lib/create_admin_cluster.js b/src/core_plugins/elasticsearch/lib/create_admin_cluster.js new file mode 100644 index 0000000000000..be638a1c57de1 --- /dev/null +++ b/src/core_plugins/elasticsearch/lib/create_admin_cluster.js @@ -0,0 +1,19 @@ +import { bindKey } from 'lodash'; +import { clientLogger } from './client_logger'; + +export function createAdminCluster(server) { + const config = server.config(); + const ElasticsearchClientLogging = clientLogger(server); + + class AdminClientLogging extends ElasticsearchClientLogging { + tags = ['admin']; + logQueries = config.get('elasticsearch.logQueries'); + } + + const adminCluster = server.plugins.elasticsearch.createCluster( + 'admin', + Object.assign({ log: AdminClientLogging }, config.get('elasticsearch')) + ); + + server.on('close', bindKey(adminCluster, 'close')); +} diff --git a/src/core_plugins/elasticsearch/lib/create_agent.js b/src/core_plugins/elasticsearch/lib/create_agent.js index 9297fa637529a..ba82e41b761b0 100644 --- a/src/core_plugins/elasticsearch/lib/create_agent.js +++ b/src/core_plugins/elasticsearch/lib/create_agent.js @@ -1,32 +1,15 @@ import url from 'url'; -import _ from 'lodash'; +import { get, size } from 'lodash'; const readFile = (file) => require('fs').readFileSync(file, 'utf8'); import http from 'http'; import https from 'https'; -module.exports = _.memoize(function (server) { - const config = server.config(); - const target = url.parse(config.get('elasticsearch.url')); +import { parseConfig } from './parse_config'; - if (!/^https/.test(target.protocol)) return new http.Agent(); - - const agentOptions = { - rejectUnauthorized: config.get('elasticsearch.ssl.verify') - }; - - if (_.size(config.get('elasticsearch.ssl.ca'))) { - agentOptions.ca = config.get('elasticsearch.ssl.ca').map(readFile); - } +export default function (config) { + const target = url.parse(get(config, 'url')); - // Add client certificate and key if required by elasticsearch - if (config.get('elasticsearch.ssl.cert') && config.get('elasticsearch.ssl.key')) { - agentOptions.cert = readFile(config.get('elasticsearch.ssl.cert')); - agentOptions.key = readFile(config.get('elasticsearch.ssl.key')); - } - - return new https.Agent(agentOptions); -}); + if (!/^https/.test(target.protocol)) return new http.Agent(); -// See https://lodash.com/docs#memoize: We use a Map() instead of the default, because we want the keys in the cache -// to be the server objects, and by default these would be coerced to strings as keys (which wouldn't be useful) -module.exports.cache = new Map(); + return new https.Agent(parseConfig(config).ssl); +} diff --git a/src/core_plugins/elasticsearch/lib/create_clusters.js b/src/core_plugins/elasticsearch/lib/create_clusters.js new file mode 100644 index 0000000000000..b05ff51b3a043 --- /dev/null +++ b/src/core_plugins/elasticsearch/lib/create_clusters.js @@ -0,0 +1,25 @@ +import { Cluster } from './cluster'; +import { get, set } from 'lodash'; + +export function createClusters(server) { + const esPlugin = server.plugins.elasticsearch; + esPlugin._clusters = esPlugin._clusters || new Map(); + + return { + get(name) { + return esPlugin._clusters.get(name); + }, + + create(name, config) { + const cluster = new Cluster(config); + + if (esPlugin._clusters.has(name)) { + throw new Error(`cluster '${name}' already exists`); + } + + esPlugin._clusters.set(name, cluster); + + return cluster; + } + }; +} diff --git a/src/core_plugins/elasticsearch/lib/create_data_cluster.js b/src/core_plugins/elasticsearch/lib/create_data_cluster.js new file mode 100644 index 0000000000000..90431b111f093 --- /dev/null +++ b/src/core_plugins/elasticsearch/lib/create_data_cluster.js @@ -0,0 +1,27 @@ +import { bindKey } from 'lodash'; +import { clientLogger } from './client_logger'; + +export function createDataCluster(server) { + const config = server.config(); + const ElasticsearchClientLogging = clientLogger(server); + + class DataClientLogging extends ElasticsearchClientLogging { + tags = ['data']; + logQueries = getConfig().logQueries; + } + + function getConfig() { + if (Boolean(config.get('elasticsearch.tribe.url'))) { + return config.get('elasticsearch.tribe'); + } + + return config.get('elasticsearch'); + } + + const dataCluster = server.plugins.elasticsearch.createCluster( + 'data', + Object.assign({ log: DataClientLogging }, getConfig()) + ); + + server.on('close', bindKey(dataCluster, 'close')); +} diff --git a/src/core_plugins/elasticsearch/lib/create_kibana_index.js b/src/core_plugins/elasticsearch/lib/create_kibana_index.js index b35e13fa84156..0b9a775874a3e 100644 --- a/src/core_plugins/elasticsearch/lib/create_kibana_index.js +++ b/src/core_plugins/elasticsearch/lib/create_kibana_index.js @@ -3,7 +3,7 @@ import { format } from 'util'; import { mappings } from './kibana_index_mappings'; module.exports = function (server) { - const client = server.plugins.elasticsearch.client; + const { callWithInternalUser } = server.plugins.elasticsearch.getCluster('admin'); const index = server.config().get('kibana.index'); function handleError(message) { @@ -12,7 +12,7 @@ module.exports = function (server) { }; } - return client.indices.create({ + return callWithInternalUser('indices.create', { index: index, body: { settings: { @@ -23,7 +23,7 @@ module.exports = function (server) { }) .catch(handleError('Unable to create Kibana index "<%= kibana.index %>"')) .then(function () { - return client.cluster.health({ + return callWithInternalUser('cluster.health', { waitForStatus: 'yellow', index: index }) diff --git a/src/core_plugins/elasticsearch/lib/create_proxy.js b/src/core_plugins/elasticsearch/lib/create_proxy.js index 068712ca9c70f..0ce4a2b33bc87 100644 --- a/src/core_plugins/elasticsearch/lib/create_proxy.js +++ b/src/core_plugins/elasticsearch/lib/create_proxy.js @@ -3,48 +3,60 @@ import mapUri from './map_uri'; import { resolve } from 'url'; import { assign } from 'lodash'; -function createProxy(server, method, route, config) { - - const options = { - method: method, - path: createProxy.createPath(route), - config: { - timeout: { - socket: server.config().get('elasticsearch.requestTimeout') - } - }, - handler: { - proxy: { - mapUri: mapUri(server), - agent: createAgent(server), - xforward: true, - timeout: server.config().get('elasticsearch.requestTimeout'), - onResponse: function (err, responseFromUpstream, request, reply) { - if (err) { - reply(err); - return; - } - - if (responseFromUpstream.headers.location) { - // TODO: Workaround for #8705 until hapi has been updated to >= 15.0.0 - responseFromUpstream.headers.location = encodeURI(responseFromUpstream.headers.location); - } - - reply(null, responseFromUpstream); - } - } - }, +function createProxy(server, method, path, config) { + const proxies = new Map([ + ['/elasticsearch', server.plugins.elasticsearch.getCluster('data')], + ['/es_admin', server.plugins.elasticsearch.getCluster('admin')] + ]); + + const responseHandler = function (err, upstreamResponse, request, reply) { + if (err) { + reply(err); + return; + } + + if (upstreamResponse.headers.location) { + // TODO: Workaround for #8705 until hapi has been updated to >= 15.0.0 + upstreamResponse.headers.location = encodeURI(upstreamResponse.headers.location); + } + + reply(null, upstreamResponse); }; - assign(options.config, config); + for (const [proxyPrefix, cluster] of proxies) { + const options = { + method, + path: createProxy.createPath(proxyPrefix, path), + config: { + timeout: { + socket: cluster.getRequestTimeout() + } + }, + handler: { + proxy: { + mapUri: mapUri(cluster, proxyPrefix), + agent: createAgent({ + url: cluster.getUrl(), + ssl: cluster.getSsl() + }), + xforward: true, + timeout: cluster.getRequestTimeout(), + onResponse: responseHandler + } + }, + }; + + assign(options.config, config); - server.route(options); -}; + server.route(options); + } +} + +createProxy.createPath = function createPath(prefix, path) { + path = path[0] === '/' ? path : `/${path}`; + prefix = prefix[0] === '/' ? prefix : `/${prefix}`; -createProxy.createPath = function createPath(path) { - const pre = '/elasticsearch'; - const sep = path[0] === '/' ? '' : '/'; - return `${pre}${sep}${path}`; + return `${prefix}${path}`; }; module.exports = createProxy; diff --git a/src/core_plugins/elasticsearch/lib/check_es_version.js b/src/core_plugins/elasticsearch/lib/ensure_es_version.js similarity index 74% rename from src/core_plugins/elasticsearch/lib/check_es_version.js rename to src/core_plugins/elasticsearch/lib/ensure_es_version.js index 3f61222e12f4d..644b1829c4ed4 100644 --- a/src/core_plugins/elasticsearch/lib/check_es_version.js +++ b/src/core_plugins/elasticsearch/lib/ensure_es_version.js @@ -3,26 +3,31 @@ * that defined in Kibana's package.json. */ -import _ from 'lodash'; +import { forEach, get } from 'lodash'; import isEsCompatibleWithKibana from './is_es_compatible_with_kibana'; import SetupError from './setup_error'; /** - * tracks the node descriptions that get logged in warnings so - * that we don't spam the log with the same message over and over. + * tracks the node descriptions that get logged in warnings so + * that we don't spam the log with the same message over and over. * - * There are situations, like in testing or multi-tenancy, where - * the server argument changes, so we must track the previous - * node warnings per server + * There are situations, like in testing or multi-tenancy, where + * the server argument changes, so we must track the previous + * node warnings per server */ const lastWarnedNodesForServer = new WeakMap(); -module.exports = function checkEsVersion(server, kibanaVersion) { - server.log(['plugin', 'debug'], 'Checking Elasticsearch version'); - - const client = server.plugins.elasticsearch.client; +export function ensureEsVersion(server, kibanaVersion) { + const { callWithInternalUser } = server.plugins.elasticsearch.getCluster('admin'); - return client.nodes.info() + server.log(['plugin', 'debug'], 'Checking Elasticsearch version'); + return callWithInternalUser('nodes.info', { + filterPath: [ + 'nodes.*.version', + 'nodes.*.http.publish_address', + 'nodes.*.ip', + ] + }) .then(function (info) { // Aggregate incompatible ES nodes. const incompatibleNodes = []; @@ -30,7 +35,7 @@ module.exports = function checkEsVersion(server, kibanaVersion) { // Aggregate ES nodes which should prompt a Kibana upgrade. const warningNodes = []; - _.forEach(info.nodes, esNode => { + forEach(info.nodes, esNode => { if (!isEsCompatibleWithKibana(esNode.version, kibanaVersion)) { // Exit early to avoid collecting ES nodes with newer major versions in the `warningNodes`. return incompatibleNodes.push(esNode); @@ -45,7 +50,7 @@ module.exports = function checkEsVersion(server, kibanaVersion) { function getHumanizedNodeNames(nodes) { return nodes.map(node => { - const publishAddress = _.get(node, 'http.publish_address') ? (_.get(node, 'http.publish_address') + ' ') : ''; + const publishAddress = get(node, 'http.publish_address') ? (get(node, 'http.publish_address') + ' ') : ''; return 'v' + node.version + ' @ ' + publishAddress + '(' + node.ip + ')'; }); } @@ -54,7 +59,7 @@ module.exports = function checkEsVersion(server, kibanaVersion) { const simplifiedNodes = warningNodes.map(node => ({ version: node.version, http: { - publish_address: _.get(node, 'http.publish_address') + publish_address: get(node, 'http.publish_address') }, ip: node.ip, })); @@ -88,4 +93,4 @@ module.exports = function checkEsVersion(server, kibanaVersion) { return true; }); -}; +} diff --git a/src/core_plugins/elasticsearch/lib/ensure_not_tribe.js b/src/core_plugins/elasticsearch/lib/ensure_not_tribe.js new file mode 100644 index 0000000000000..02e69af15f42d --- /dev/null +++ b/src/core_plugins/elasticsearch/lib/ensure_not_tribe.js @@ -0,0 +1,18 @@ +import { get } from 'lodash'; + +export function ensureNotTribe(callWithInternalUser) { + return callWithInternalUser('nodes.info', { + nodeId: '_local', + filterPath: 'nodes.*.settings.tribe' + }) + .then(function (info) { + const nodeId = Object.keys(info.nodes || {})[0]; + const tribeSettings = get(info, ['nodes', nodeId, 'settings', 'tribe']); + + if (tribeSettings) { + throw new Error('Kibana does not support using tribe nodes as the primary elasticsearch connection.'); + } + + return true; + }); +} diff --git a/src/core_plugins/elasticsearch/lib/expose_client.js b/src/core_plugins/elasticsearch/lib/expose_client.js deleted file mode 100644 index a1748e54ebbeb..0000000000000 --- a/src/core_plugins/elasticsearch/lib/expose_client.js +++ /dev/null @@ -1,99 +0,0 @@ -import elasticsearch from 'elasticsearch'; -import _ from 'lodash'; -import Bluebird from 'bluebird'; -const readFile = (file) => require('fs').readFileSync(file, 'utf8'); -import util from 'util'; -import url from 'url'; -import callWithRequest from './call_with_request'; -import filterHeaders from './filter_headers'; - -module.exports = function (server) { - const config = server.config(); - - class ElasticsearchClientLogging { - error(err) { - server.log(['error', 'elasticsearch'], err); - } - warning(message) { - server.log(['warning', 'elasticsearch'], message); - } - info() {} - debug() {} - trace() {} - close() {} - } - - function createClient(options) { - options = _.defaults(options || {}, { - url: config.get('elasticsearch.url'), - username: config.get('elasticsearch.username'), - password: config.get('elasticsearch.password'), - verifySsl: config.get('elasticsearch.ssl.verify'), - clientCrt: config.get('elasticsearch.ssl.cert'), - clientKey: config.get('elasticsearch.ssl.key'), - ca: config.get('elasticsearch.ssl.ca'), - apiVersion: config.get('elasticsearch.apiVersion'), - pingTimeout: config.get('elasticsearch.pingTimeout'), - requestTimeout: config.get('elasticsearch.requestTimeout'), - keepAlive: true, - auth: true - }); - - const uri = url.parse(options.url); - - let authorization; - if (options.auth && options.username && options.password) { - uri.auth = util.format('%s:%s', options.username, options.password); - } - - const ssl = { rejectUnauthorized: options.verifySsl }; - if (options.clientCrt && options.clientKey) { - ssl.cert = readFile(options.clientCrt); - ssl.key = readFile(options.clientKey); - } - if (options.ca) { - ssl.ca = options.ca.map(readFile); - } - - const host = { - host: uri.hostname, - port: uri.port, - protocol: uri.protocol, - path: uri.pathname, - auth: uri.auth, - query: uri.query, - headers: config.get('elasticsearch.customHeaders') - }; - - return new elasticsearch.Client({ - host, - ssl, - plugins: options.plugins, - apiVersion: options.apiVersion, - keepAlive: options.keepAlive, - pingTimeout: options.pingTimeout, - requestTimeout: options.requestTimeout, - defer: function () { - return Bluebird.defer(); - }, - log: ElasticsearchClientLogging - }); - } - - const client = createClient(); - server.on('close', _.bindKey(client, 'close')); - - const noAuthClient = createClient({ auth: false }); - server.on('close', _.bindKey(noAuthClient, 'close')); - - server.expose('ElasticsearchClientLogging', ElasticsearchClientLogging); - server.expose('client', client); - server.expose('createClient', createClient); - server.expose('callWithRequestFactory', _.partial(callWithRequest, server)); - server.expose('callWithRequest', callWithRequest(server, noAuthClient)); - server.expose('filterHeaders', filterHeaders); - server.expose('errors', elasticsearch.errors); - - return client; - -}; diff --git a/src/core_plugins/elasticsearch/lib/health_check.js b/src/core_plugins/elasticsearch/lib/health_check.js index 00ab4e1686730..6b5e5f4085ea6 100644 --- a/src/core_plugins/elasticsearch/lib/health_check.js +++ b/src/core_plugins/elasticsearch/lib/health_check.js @@ -1,11 +1,11 @@ import _ from 'lodash'; import Promise from 'bluebird'; import elasticsearch from 'elasticsearch'; -import exposeClient from './expose_client'; import migrateConfig from './migrate_config'; import createKibanaIndex from './create_kibana_index'; -import checkEsVersion from './check_es_version'; import kibanaVersion from './kibana_version'; +import { ensureEsVersion } from './ensure_es_version'; +import { ensureNotTribe } from './ensure_not_tribe'; const NoConnections = elasticsearch.errors.NoConnections; import util from 'util'; @@ -15,27 +15,25 @@ const NO_INDEX = 'no_index'; const INITIALIZING = 'initializing'; const READY = 'ready'; -const REQUEST_DELAY = 2500; - module.exports = function (plugin, server) { const config = server.config(); - const client = server.plugins.elasticsearch.client; + const callAdminAsKibanaUser = server.plugins.elasticsearch.getCluster('admin').callWithInternalUser; + const callDataAsKibanaUser = server.plugins.elasticsearch.getCluster('data').callWithInternalUser; + const REQUEST_DELAY = config.get('elasticsearch.healthCheck.delay'); plugin.status.yellow('Waiting for Elasticsearch'); - - function waitForPong() { - return client.ping().catch(function (err) { + function waitForPong(callWithInternalUser, url) { + return callWithInternalUser('ping').catch(function (err) { if (!(err instanceof NoConnections)) throw err; + plugin.status.red(format('Unable to connect to Elasticsearch at %s.', url)); - plugin.status.red(format('Unable to connect to Elasticsearch at %s.', config.get('elasticsearch.url'))); - - return Promise.delay(REQUEST_DELAY).then(waitForPong); + return Promise.delay(REQUEST_DELAY).then(waitForPong.bind(null, callWithInternalUser, url)); }); } // just figure out the current "health" of the es setup function getHealth() { - return client.cluster.health({ + return callAdminAsKibanaUser('cluster.health', { timeout: '5s', // tells es to not sit around and wait forever index: config.get('kibana.index'), ignore: [408] @@ -82,7 +80,7 @@ module.exports = function (plugin, server) { } function waitForEsVersion() { - return checkEsVersion(server, kibanaVersion.get()).catch(err => { + return ensureEsVersion(server, kibanaVersion.get()).catch(err => { plugin.status.red(err); return Promise.delay(REQUEST_DELAY).then(waitForEsVersion); }); @@ -93,14 +91,26 @@ module.exports = function (plugin, server) { } function check() { - return waitForPong() - .then(waitForEsVersion) - .then(waitForShards) + const healthCheck = + waitForPong(callAdminAsKibanaUser, config.get('elasticsearch.url')) + .then(waitForEsVersion) + .then(ensureNotTribe.bind(this, callAdminAsKibanaUser)) + .then(waitForShards) + .then(_.partial(migrateConfig, server)) + .then(() => { + const tribeUrl = config.get('elasticsearch.tribe.url'); + if (tribeUrl) { + return waitForPong(callDataAsKibanaUser, tribeUrl) + .then(() => ensureEsVersion(server, kibanaVersion.get(), callDataAsKibanaUser)); + } + }); + + return healthCheck .then(setGreenStatus) - .then(_.partial(migrateConfig, server)) .catch(err => plugin.status.red(err)); } + let timeoutId = null; function scheduleCheck(ms) { diff --git a/src/core_plugins/elasticsearch/lib/map_uri.js b/src/core_plugins/elasticsearch/lib/map_uri.js index daaf9b73c9633..e0ee29e5ddf5f 100644 --- a/src/core_plugins/elasticsearch/lib/map_uri.js +++ b/src/core_plugins/elasticsearch/lib/map_uri.js @@ -3,9 +3,7 @@ import { parse as parseUrl, format as formatUrl, resolve } from 'url'; import filterHeaders from './filter_headers'; import setHeaders from './set_headers'; -export default function mapUri(server, prefix) { - const config = server.config(); - +export default function mapUri(cluster, proxyPrefix) { function joinPaths(pathA, pathB) { return trimRight(pathA, '/') + '/' + trimLeft(pathB, '/'); } @@ -19,7 +17,7 @@ export default function mapUri(server, prefix) { port: esUrlPort, pathname: esUrlBasePath, query: esUrlQuery - } = parseUrl(config.get('elasticsearch.url'), true); + } = parseUrl(cluster.getUrl(), true); // copy most url components directly from the elasticsearch.url const mappedUrlComponents = { @@ -31,17 +29,17 @@ export default function mapUri(server, prefix) { }; // pathname - const reqSubPath = request.path.replace('/elasticsearch', ''); + const reqSubPath = request.path.replace(proxyPrefix, ''); mappedUrlComponents.pathname = joinPaths(esUrlBasePath, reqSubPath); // querystring - const mappedQuery = defaults(omit(request.query, '_'), esUrlQuery || {}); + const mappedQuery = defaults(omit(request.query, '_'), esUrlQuery); if (Object.keys(mappedQuery).length) { mappedUrlComponents.query = mappedQuery; } - const filteredHeaders = filterHeaders(request.headers, config.get('elasticsearch.requestHeadersWhitelist')); - const mappedHeaders = setHeaders(filteredHeaders, config.get('elasticsearch.customHeaders')); + const filteredHeaders = filterHeaders(request.headers, cluster.getRequestHeadersWhitelist()); + const mappedHeaders = setHeaders(filteredHeaders, cluster.getCustomHeaders()); const mappedUrl = formatUrl(mappedUrlComponents); done(null, mappedUrl, mappedHeaders); }; diff --git a/src/core_plugins/elasticsearch/lib/migrate_config.js b/src/core_plugins/elasticsearch/lib/migrate_config.js index 7b16990e21297..a1d256200a51e 100644 --- a/src/core_plugins/elasticsearch/lib/migrate_config.js +++ b/src/core_plugins/elasticsearch/lib/migrate_config.js @@ -3,7 +3,8 @@ import { mappings } from './kibana_index_mappings'; module.exports = function (server) { const config = server.config(); - const client = server.plugins.elasticsearch.client; + const { callWithInternalUser } = server.plugins.elasticsearch.getCluster('admin'); + const options = { index: config.get('kibana.index'), type: 'config', @@ -20,5 +21,5 @@ module.exports = function (server) { } }; - return client.search(options).then(upgrade(server)); + return callWithInternalUser('search', options).then(upgrade(server)); }; diff --git a/src/core_plugins/elasticsearch/lib/parse_config.js b/src/core_plugins/elasticsearch/lib/parse_config.js new file mode 100644 index 0000000000000..05750535030ea --- /dev/null +++ b/src/core_plugins/elasticsearch/lib/parse_config.js @@ -0,0 +1,47 @@ +import util from 'util'; +import url from 'url'; +import { get, size, pick } from 'lodash'; +import { readFileSync } from 'fs'; +import Bluebird from 'bluebird'; + +const readFile = (file) => readFileSync(file, 'utf8'); + +export function parseConfig(serverConfig = {}) { + const config = Object.assign({ + keepAlive: true + }, pick(serverConfig, [ + 'plugins', 'apiVersion', 'keepAlive', 'pingTimeout', + 'requestTimeout', 'log', 'logQueries' + ])); + + const uri = url.parse(serverConfig.url); + config.host = { + host: uri.hostname, + port: uri.port, + protocol: uri.protocol, + path: uri.pathname, + query: uri.query, + headers: serverConfig.customHeaders + }; + + // Auth + if (serverConfig.auth !== false && serverConfig.username && serverConfig.password) { + config.host.auth = util.format('%s:%s', serverConfig.username, serverConfig.password); + } + + // SSL + config.ssl = { rejectUnauthorized: get(serverConfig, 'ssl.verify') }; + + if (get(serverConfig, 'ssl.cert') && get(serverConfig, 'ssl.key')) { + config.ssl.cert = readFile(serverConfig.ssl.cert); + config.ssl.key = readFile(serverConfig.ssl.key); + } + + if (size(get(serverConfig, 'ssl.ca'))) { + config.ssl.ca = serverConfig.ssl.ca.map(readFile); + } + + config.defer = () => Bluebird.defer(); + + return config; +} diff --git a/src/core_plugins/elasticsearch/lib/upgrade_config.js b/src/core_plugins/elasticsearch/lib/upgrade_config.js index 9050767d969ac..645e95b9852fc 100644 --- a/src/core_plugins/elasticsearch/lib/upgrade_config.js +++ b/src/core_plugins/elasticsearch/lib/upgrade_config.js @@ -6,11 +6,11 @@ import { format } from 'util'; module.exports = function (server) { const MAX_INTEGER = Math.pow(2, 53) - 1; - const client = server.plugins.elasticsearch.client; + const { callWithInternalUser } = server.plugins.elasticsearch.getCluster('admin'); const config = server.config(); function createNewConfig() { - return client.create({ + return callWithInternalUser('create', { index: config.get('kibana.index'), type: 'config', body: { buildNum: config.get('pkg.buildNum') }, @@ -31,7 +31,9 @@ module.exports = function (server) { return hit._id !== '@@version' && hit._id === config.get('pkg.version'); }); - if (devConfig) return Promise.resolve(); + if (devConfig) { + return Promise.resolve(); + } // Look for upgradeable configs. If none of them are upgradeable // then create a new one. @@ -50,7 +52,7 @@ module.exports = function (server) { newVersion: config.get('pkg.version') }); - return client.create({ + return callWithInternalUser('create', { index: config.get('kibana.index'), type: 'config', body: body._source, diff --git a/src/core_plugins/kibana/public/dashboard/services/saved_dashboards.js b/src/core_plugins/kibana/public/dashboard/services/saved_dashboards.js index e5196928140dc..ec1d3a23c9128 100644 --- a/src/core_plugins/kibana/public/dashboard/services/saved_dashboards.js +++ b/src/core_plugins/kibana/public/dashboard/services/saved_dashboards.js @@ -16,6 +16,6 @@ require('plugins/kibana/management/saved_object_registry').register({ }); // This is the only thing that gets injected into controllers -module.service('savedDashboards', function (SavedDashboard, kbnIndex, es, kbnUrl) { - return new SavedObjectLoader(SavedDashboard, kbnIndex, es, kbnUrl); +module.service('savedDashboards', function (SavedDashboard, kbnIndex, esAdmin, kbnUrl) { + return new SavedObjectLoader(SavedDashboard, kbnIndex, esAdmin, kbnUrl); }); diff --git a/src/core_plugins/kibana/public/discover/saved_searches/saved_searches.js b/src/core_plugins/kibana/public/discover/saved_searches/saved_searches.js index cf5da87e23438..57cbd1f085d94 100644 --- a/src/core_plugins/kibana/public/discover/saved_searches/saved_searches.js +++ b/src/core_plugins/kibana/public/discover/saved_searches/saved_searches.js @@ -4,7 +4,6 @@ import 'plugins/kibana/discover/saved_searches/_saved_search'; import 'ui/notify'; import uiModules from 'ui/modules'; import { SavedObjectLoader } from 'ui/courier/saved_object/saved_object_loader'; - const module = uiModules.get('discover/saved_searches', [ 'kibana/notify' ]); @@ -16,8 +15,9 @@ require('plugins/kibana/management/saved_object_registry').register({ title: 'searches' }); -module.service('savedSearches', function (Promise, config, kbnIndex, es, createNotifier, SavedSearch, kbnUrl) { - const savedSearchLoader = new SavedObjectLoader(SavedSearch, kbnIndex, es, kbnUrl); +module.service('savedSearches', function (Promise, config, kbnIndex, esAdmin, createNotifier, SavedSearch, kbnUrl) { + const savedSearchLoader = new SavedObjectLoader(SavedSearch, kbnIndex, esAdmin, kbnUrl); + // Customize loader properties since adding an 's' on type doesn't work for type 'search' . savedSearchLoader.loaderProperties = { name: 'searches', noun: 'Saved Search', @@ -26,5 +26,6 @@ module.service('savedSearches', function (Promise, config, kbnIndex, es, createN savedSearchLoader.urlFor = function (id) { return kbnUrl.eval('#/discover/{{id}}', { id: id }); }; + return savedSearchLoader; }); diff --git a/src/core_plugins/kibana/public/management/sections/indices/_refresh_kibana_index.js b/src/core_plugins/kibana/public/management/sections/indices/_refresh_kibana_index.js index e0becc475ab43..ac3528b22dbb5 100644 --- a/src/core_plugins/kibana/public/management/sections/indices/_refresh_kibana_index.js +++ b/src/core_plugins/kibana/public/management/sections/indices/_refresh_kibana_index.js @@ -1,6 +1,6 @@ -export default function RefreshKibanaIndexFn(es, kbnIndex) { +export default function RefreshKibanaIndexFn(esAdmin, kbnIndex) { return function () { - return es.indices.refresh({ + return esAdmin.indices.refresh({ index: kbnIndex }); }; diff --git a/src/core_plugins/kibana/public/management/sections/objects/_objects.js b/src/core_plugins/kibana/public/management/sections/objects/_objects.js index 7fad2e8805fee..cb6693f60df45 100644 --- a/src/core_plugins/kibana/public/management/sections/objects/_objects.js +++ b/src/core_plugins/kibana/public/management/sections/objects/_objects.js @@ -19,7 +19,7 @@ uiModules.get('apps/management') return { restrict: 'E', controllerAs: 'managementObjectsController', - controller: function ($scope, $injector, $q, AppState, es) { + controller: function ($scope, $injector, $q, AppState, esAdmin) { const notify = new Notifier({ location: 'Saved Objects' }); // TODO: Migrate all scope variables to the controller. @@ -125,7 +125,7 @@ uiModules.get('apps/management') function retrieveAndExportDocs(objs) { if (!objs.length) return notify.error('No saved objects to export.'); - es.mget({ + esAdmin.mget({ index: kbnIndex, body: {docs: objs.map(transformToMget)} }) @@ -167,7 +167,7 @@ uiModules.get('apps/management') }; function refreshIndex() { - return es.indices.refresh({ + return esAdmin.indices.refresh({ index: kbnIndex }); } diff --git a/src/core_plugins/kibana/public/management/sections/objects/_view.js b/src/core_plugins/kibana/public/management/sections/objects/_view.js index ef868fa6bfa90..72abb1a93b92a 100644 --- a/src/core_plugins/kibana/public/management/sections/objects/_view.js +++ b/src/core_plugins/kibana/public/management/sections/objects/_view.js @@ -16,7 +16,7 @@ uiModules.get('apps/management') .directive('kbnManagementObjectsView', function (kbnIndex, Notifier) { return { restrict: 'E', - controller: function ($scope, $injector, $routeParams, $location, $window, $rootScope, es, Private) { + controller: function ($scope, $injector, $routeParams, $location, $window, $rootScope, esAdmin, Private) { const notify = new Notifier({ location: 'SavedObject view' }); const castMappingType = Private(IndexPatternsCastMappingTypeProvider); const serviceObj = registry.get($routeParams.service); @@ -104,7 +104,7 @@ uiModules.get('apps/management') $scope.title = service.type; - es.get({ + esAdmin.get({ index: kbnIndex, type: service.type, id: $routeParams.id @@ -163,7 +163,7 @@ uiModules.get('apps/management') * @returns {type} description */ $scope.delete = function () { - es.delete({ + esAdmin.delete({ index: kbnIndex, type: service.type, id: $routeParams.id @@ -191,7 +191,7 @@ uiModules.get('apps/management') _.set(source, field.name, value); }); - es.index({ + esAdmin.index({ index: kbnIndex, type: service.type, id: $routeParams.id, @@ -204,7 +204,7 @@ uiModules.get('apps/management') }; function redirectHandler(action) { - return es.indices.refresh({ + return esAdmin.indices.refresh({ index: kbnIndex }) .then(function (resp) { diff --git a/src/core_plugins/kibana/public/visualize/saved_visualizations/saved_visualizations.js b/src/core_plugins/kibana/public/visualize/saved_visualizations/saved_visualizations.js index a3070c2ce5f64..35464682d86df 100644 --- a/src/core_plugins/kibana/public/visualize/saved_visualizations/saved_visualizations.js +++ b/src/core_plugins/kibana/public/visualize/saved_visualizations/saved_visualizations.js @@ -15,13 +15,13 @@ require('plugins/kibana/management/saved_object_registry').register({ title: 'visualizations' }); -app.service('savedVisualizations', function (Promise, es, kbnIndex, SavedVis, Private, Notifier, kbnUrl) { +app.service('savedVisualizations', function (Promise, esAdmin, kbnIndex, SavedVis, Private, Notifier, kbnUrl) { const visTypes = Private(RegistryVisTypesProvider); const notify = new Notifier({ location: 'Saved Visualization Service' }); - const saveVisualizationLoader = new SavedObjectLoader(SavedVis, kbnIndex, es, kbnUrl); + const saveVisualizationLoader = new SavedObjectLoader(SavedVis, kbnIndex, esAdmin, kbnUrl); saveVisualizationLoader.mapHits = function (hit) { const source = hit._source; source.id = hit._id; diff --git a/src/core_plugins/kibana/server/routes/api/ingest/register_field_capabilities.js b/src/core_plugins/kibana/server/routes/api/ingest/register_field_capabilities.js index e4d619ab1e322..67c0f560339fb 100644 --- a/src/core_plugins/kibana/server/routes/api/ingest/register_field_capabilities.js +++ b/src/core_plugins/kibana/server/routes/api/ingest/register_field_capabilities.js @@ -6,7 +6,7 @@ export function registerFieldCapabilities(server) { path: '/api/kibana/{indices}/field_capabilities', method: ['GET'], handler: function (req, reply) { - const callWithRequest = server.plugins.elasticsearch.callWithRequest; + const { callWithRequest } = server.plugins.elasticsearch.getCluster('data'); const indices = req.params.indices || ''; return callWithRequest(req, 'fieldStats', { diff --git a/src/core_plugins/kibana/server/routes/api/scripts/register_languages.js b/src/core_plugins/kibana/server/routes/api/scripts/register_languages.js index 82bb5cc9ba22c..125e4b7734ce8 100644 --- a/src/core_plugins/kibana/server/routes/api/scripts/register_languages.js +++ b/src/core_plugins/kibana/server/routes/api/scripts/register_languages.js @@ -6,7 +6,7 @@ export function registerLanguages(server) { path: '/api/kibana/scripts/languages', method: 'GET', handler: function (request, reply) { - const callWithRequest = server.plugins.elasticsearch.callWithRequest; + const { callWithRequest } = server.plugins.elasticsearch.getCluster('data'); return callWithRequest(request, 'cluster.getSettings', { include_defaults: true, diff --git a/src/core_plugins/kibana/server/routes/api/search/count/register_count.js b/src/core_plugins/kibana/server/routes/api/search/count/register_count.js index 8e542afe70cf0..f7b1c9af2c901 100644 --- a/src/core_plugins/kibana/server/routes/api/search/count/register_count.js +++ b/src/core_plugins/kibana/server/routes/api/search/count/register_count.js @@ -6,7 +6,8 @@ export default function registerCount(server) { path: '/api/kibana/{id}/_count', method: ['POST', 'GET'], handler: function (req, reply) { - const boundCallWithRequest = _.partial(server.plugins.elasticsearch.callWithRequest, req); + const { callWithRequest } = server.plugins.elasticsearch.getCluster('data'); + const boundCallWithRequest = _.partial(callWithRequest, req); boundCallWithRequest('count', { allowNoIndices: false, diff --git a/src/core_plugins/timelion/public/services/saved_sheets.js b/src/core_plugins/timelion/public/services/saved_sheets.js index 89f671a1552f1..b2b0efc113e7d 100644 --- a/src/core_plugins/timelion/public/services/saved_sheets.js +++ b/src/core_plugins/timelion/public/services/saved_sheets.js @@ -15,8 +15,8 @@ define(function (require) { }); // This is the only thing that gets injected into controllers - module.service('savedSheets', function (Promise, SavedSheet, kbnIndex, es, kbnUrl) { - const savedSheetLoader = new SavedObjectLoader(SavedSheet, kbnIndex, es, kbnUrl); + module.service('savedSheets', function (Promise, SavedSheet, kbnIndex, esAdmin, kbnUrl) { + const savedSheetLoader = new SavedObjectLoader(SavedSheet, kbnIndex, esAdmin, kbnUrl); savedSheetLoader.urlFor = function (id) { return kbnUrl.eval('#/{{id}}', { id: id }); }; diff --git a/src/core_plugins/timelion/server/routes/validate_es.js b/src/core_plugins/timelion/server/routes/validate_es.js index eaa625beca177..20fa7b88e9f16 100644 --- a/src/core_plugins/timelion/server/routes/validate_es.js +++ b/src/core_plugins/timelion/server/routes/validate_es.js @@ -3,9 +3,9 @@ module.exports = function (server) { method: 'GET', path: '/api/timelion/validate/es', handler: function (request, reply) { - return server.uiSettings().getAll(request).then((uiSettings) => { - var callWithRequest = server.plugins.elasticsearch.callWithRequest; + + var { callWithRequest } = server.plugins.elasticsearch.getCluster('data'); var timefield = uiSettings['timelion:es.timefield']; diff --git a/src/core_plugins/timelion/server/series_functions/__tests__/es.js b/src/core_plugins/timelion/server/series_functions/__tests__/es.js index 6e227b49889e5..d45485f3757c9 100644 --- a/src/core_plugins/timelion/server/series_functions/__tests__/es.js +++ b/src/core_plugins/timelion/server/series_functions/__tests__/es.js @@ -9,18 +9,23 @@ import esResponse from './fixtures/es_response'; import Promise from 'bluebird'; import _ from 'lodash'; -import {expect} from 'chai'; +import { expect } from 'chai'; +import sinon from 'sinon'; import invoke from './helpers/invoke_series_fn.js'; function stubResponse(response) { return { - server: {plugins:{ - elasticsearch: { - callWithRequest: function () { - return Promise.resolve(response); + server: { + plugins:{ + elasticsearch: { + getCluster: sinon.stub().withArgs('data').returns({ + callWithRequest: function () { + return Promise.resolve(response); + } + }) } } - }} + } }; } diff --git a/src/core_plugins/timelion/server/series_functions/es/index.js b/src/core_plugins/timelion/server/series_functions/es/index.js index 16b1c13fcc1bb..216f579d79504 100644 --- a/src/core_plugins/timelion/server/series_functions/es/index.js +++ b/src/core_plugins/timelion/server/series_functions/es/index.js @@ -61,9 +61,10 @@ module.exports = new Datasource('es', { fit: 'nearest' }); - var callWithRequest = tlConfig.server.plugins.elasticsearch.callWithRequest; + const { callWithRequest } = tlConfig.server.plugins.elasticsearch.getCluster('data'); + + const body = buildRequest(config, tlConfig); - var body = buildRequest(config, tlConfig); return callWithRequest(tlConfig.request, 'search', body).then(function (resp) { if (!resp._shards.total) throw new Error('Elasticsearch index not found: ' + config.index); return { diff --git a/src/server/http/short_url_lookup.js b/src/server/http/short_url_lookup.js index 1341128c5ee99..8e64405a34377 100644 --- a/src/server/http/short_url_lookup.js +++ b/src/server/http/short_url_lookup.js @@ -2,7 +2,7 @@ import crypto from 'crypto'; export default function (server) { async function updateMetadata(urlId, urlDoc, req) { - const callWithRequest = server.plugins.elasticsearch.callWithRequest; + const { callWithRequest } = server.plugins.elasticsearch.getCluster('admin'); const kibanaIndex = server.config().get('kibana.index'); try { @@ -25,7 +25,7 @@ export default function (server) { async function getUrlDoc(urlId, req) { const urlDoc = await new Promise((resolve, reject) => { - const callWithRequest = server.plugins.elasticsearch.callWithRequest; + const { callWithRequest } = server.plugins.elasticsearch.getCluster('admin'); const kibanaIndex = server.config().get('kibana.index'); callWithRequest(req, 'get', { @@ -46,7 +46,7 @@ export default function (server) { async function createUrlDoc(url, urlId, req) { const newUrlId = await new Promise((resolve, reject) => { - const callWithRequest = server.plugins.elasticsearch.callWithRequest; + const { callWithRequest } = server.plugins.elasticsearch.getCluster('admin'); const kibanaIndex = server.config().get('kibana.index'); callWithRequest(req, 'index', { diff --git a/src/server/plugins/check_enabled.js b/src/server/plugins/check_enabled.js index 7953165d88007..97fa9bf19c030 100644 --- a/src/server/plugins/check_enabled.js +++ b/src/server/plugins/check_enabled.js @@ -1,12 +1,22 @@ import toPath from 'lodash/internal/toPath'; export default async function (kbnServer, server, config) { + const forcedOverride = { + console: function (enabledInConfig) { + return !config.get('elasticsearch.tribe.url') && enabledInConfig; + } + }; + const { plugins } = kbnServer; for (let plugin of plugins) { const enabledInConfig = config.get([...toPath(plugin.configPrefix), 'enabled']); - - if (!enabledInConfig) { + const hasOveride = forcedOverride.hasOwnProperty(plugin.id); + if (hasOveride) { + if (!forcedOverride[plugin.id](enabledInConfig)) { + plugins.disable(plugin); + } + } else if (!enabledInConfig) { plugins.disable(plugin); } } diff --git a/src/ui/public/chrome/api/angular.js b/src/ui/public/chrome/api/angular.js index 5755dafff9166..cd91ac7cfbcce 100644 --- a/src/ui/public/chrome/api/angular.js +++ b/src/ui/public/chrome/api/angular.js @@ -31,6 +31,11 @@ module.exports = function (chrome, internals) { a.href = chrome.addBasePath('/elasticsearch'); return a.href; }())) + .value('esAdminUrl', (function () { + const a = document.createElement('a'); + a.href = chrome.addBasePath('/es_admin'); + return a.href; + }())) .config(chrome.$setupXsrfRequestInterceptor) .config(['$compileProvider', function ($compileProvider) { if (!internals.devMode) { diff --git a/src/ui/public/courier/__tests__/request_queue.js b/src/ui/public/courier/__tests__/request_queue.js index 947671686e1bd..e693852231acc 100644 --- a/src/ui/public/courier/__tests__/request_queue.js +++ b/src/ui/public/courier/__tests__/request_queue.js @@ -4,7 +4,7 @@ import sinon from 'auto-release-sinon'; import RequestQueueProv from '../_request_queue'; import SearchStrategyProv from '../fetch/strategy/search'; -import DocStrategyProv from '../fetch/strategy/doc'; +import DocStrategyProv from '../fetch/strategy/doc_data'; describe('Courier Request Queue', function () { let docStrategy; diff --git a/src/ui/public/courier/__tests__/saved_object.js b/src/ui/public/courier/__tests__/saved_object.js index 23adcde6eab50..b8dfacaa39162 100644 --- a/src/ui/public/courier/__tests__/saved_object.js +++ b/src/ui/public/courier/__tests__/saved_object.js @@ -9,7 +9,7 @@ import BluebirdPromise from 'bluebird'; import SavedObjectFactory from '../saved_object/saved_object'; import IndexPatternFactory from 'ui/index_patterns/_index_pattern'; -import DocSourceProvider from '../data_source/doc_source'; +import DocSourceProvider from '../data_source/admin_doc_source'; import { stubMapper } from 'test_utils/stub_mapper'; @@ -19,7 +19,8 @@ describe('Saved Object', function () { let SavedObject; let IndexPattern; - let esStub; + let esAdminStub; + let esDataStub; let DocSource; /** @@ -29,7 +30,7 @@ describe('Saved Object', function () { // Allows the type 'dashboard' to be used. // Unfortunately we need to use bluebird here instead of native promises because there is // a call to finally. - sinon.stub(esStub.indices, 'getFieldMapping').returns(BluebirdPromise.resolve({ + sinon.stub(esAdminStub.indices, 'getFieldMapping').returns(BluebirdPromise.resolve({ '.kibana' : { 'mappings': { 'dashboard': {} @@ -38,8 +39,8 @@ describe('Saved Object', function () { })); // Necessary to avoid a timeout condition. - sinon.stub(esStub.indices, 'putMapping').returns(BluebirdPromise.resolve()); - sinon.stub(esStub.indices, 'refresh').returns(BluebirdPromise.resolve()); + sinon.stub(esAdminStub.indices, 'putMapping').returns(BluebirdPromise.resolve()); + sinon.stub(esAdminStub.indices, 'refresh').returns(BluebirdPromise.resolve()); } /** @@ -66,8 +67,10 @@ describe('Saved Object', function () { * @param {Object} mockDocResponse */ function stubESResponse(mockDocResponse) { - sinon.stub(esStub, 'mget').returns(BluebirdPromise.resolve({ docs: [mockDocResponse] })); - sinon.stub(esStub, 'index').returns(BluebirdPromise.resolve(mockDocResponse)); + sinon.stub(esDataStub, 'mget').returns(BluebirdPromise.resolve({ docs: [mockDocResponse] })); + sinon.stub(esDataStub, 'index').returns(BluebirdPromise.resolve(mockDocResponse)); + sinon.stub(esAdminStub, 'mget').returns(BluebirdPromise.resolve({ docs: [mockDocResponse] })); + sinon.stub(esAdminStub, 'index').returns(BluebirdPromise.resolve(mockDocResponse)); } /** @@ -84,10 +87,11 @@ describe('Saved Object', function () { } beforeEach(ngMock.module('kibana')); - beforeEach(ngMock.inject(function (es, Private) { + beforeEach(ngMock.inject(function (es, esAdmin, Private) { SavedObject = Private(SavedObjectFactory); IndexPattern = Private(IndexPatternFactory); - esStub = es; + esAdminStub = esAdmin; + esDataStub = es; DocSource = Private(DocSourceProvider); mockEsService(); diff --git a/src/ui/public/courier/courier.js b/src/ui/public/courier/courier.js index 4e09d24a8ef82..4111f3500c21c 100644 --- a/src/ui/public/courier/courier.js +++ b/src/ui/public/courier/courier.js @@ -14,7 +14,8 @@ import SearchStrategyProvider from './fetch/strategy/search'; import RequestQueueProvider from './_request_queue'; import ErrorHandlersProvider from './_error_handlers'; import FetchProvider from './fetch'; -import DocLooperProvider from './looper/doc'; +import DocDataLooperProvider from './looper/doc_data'; +import DocAdminLooperProvider from './looper/doc_admin'; import SearchLooperProvider from './looper/search'; import RootSearchSourceProvider from './data_source/_root_search_source'; import SavedObjectProvider from './saved_object'; @@ -34,7 +35,8 @@ uiModules.get('kibana/courier') let errorHandlers = Private(ErrorHandlersProvider); let fetch = Private(FetchProvider); - let docLooper = self.docLooper = Private(DocLooperProvider); + let docDataLooper = self.docLooper = Private(DocDataLooperProvider); + let docAdminLooper = self.docLooper = Private(DocAdminLooperProvider); let searchLooper = self.searchLooper = Private(SearchLooperProvider); // expose some internal modules @@ -65,7 +67,8 @@ uiModules.get('kibana/courier') */ self.start = function () { searchLooper.start(); - docLooper.start(); + docDataLooper.start(); + docAdminLooper.start(); return this; }; @@ -124,7 +127,8 @@ uiModules.get('kibana/courier') */ self.close = function () { searchLooper.stop(); - docLooper.stop(); + docAdminLooper.stop(); + docDataLooper.stop(); _.invoke(requestQueue, 'abort'); diff --git a/src/ui/public/courier/data_source/_abstract_doc_source.js b/src/ui/public/courier/data_source/_abstract_doc_source.js new file mode 100644 index 0000000000000..fe74d379fc1aa --- /dev/null +++ b/src/ui/public/courier/data_source/_abstract_doc_source.js @@ -0,0 +1,168 @@ +/** + * @name AbstractDocSource + * + * NOTE: This class is tightly coupled with _doc_send_to_es. Its primary + * methods (`doUpdate`, `doIndex`, `doCreate`) are all proxies for methods + * exposed by _doc_send_to_es (`update`, `index`, `create`). These methods are + * called with AbstractDocSource as the context. When called, they depend on “private” + * AbstractDocSource methods within their execution. + */ + +import _ from 'lodash'; + +import 'ui/es'; +import 'ui/storage'; + +import DocSendToEsProvider from './_doc_send_to_es'; +import AbstractDataSourceProvider from './_abstract'; +import DocRequestProvider from '../fetch/request/_abstract_doc'; + +export default function AbstractDocSourceFactory(Private, Promise, es, sessionStorage) { + const sendToEs = Private(DocSendToEsProvider); + const SourceAbstract = Private(AbstractDataSourceProvider); + const DocRequest = Private(DocRequestProvider); + + _.class(AbstractDocSource).inherits(SourceAbstract); + function AbstractDocSource(initialState, strategy) { + AbstractDocSource.Super.call(this, initialState, strategy); + } + + AbstractDocSource.prototype.onUpdate = SourceAbstract.prototype.onResults; + AbstractDocSource.prototype.onResults = void 0; + + /***** + * PUBLIC API + *****/ + + AbstractDocSource.prototype._createRequest = function (defer) { + return new DocRequest(this, defer); + }; + + /** + * List of methods that is turned into a chainable API in the constructor + * @type {Array} + */ + AbstractDocSource.prototype._methods = [ + 'index', + 'type', + 'id', + 'sourceInclude', + 'sourceExclude' + ]; + + /** + * Applies a partial update to the document + * @param {object} fields - The fields to change and their new values (es doc field) + * @return {undefined} + */ + AbstractDocSource.prototype.doUpdate = function (fields) { + if (!this._state.id) return this.doIndex(fields); + return sendToEs.call(this, 'update', false, { doc: fields }); + }; + + /** + * Update the document stored + * @param {[type]} body [description] + * @return {[type]} [description] + */ + AbstractDocSource.prototype.doIndex = function (body) { + return sendToEs.call(this, 'index', false, body); + }; + + AbstractDocSource.prototype.doCreate = function (body) { + return sendToEs.call(this, 'create', false, body, []); + }; + + /***** + * PRIVATE API + *****/ + + /** + * Get the type of this SourceAbstract + * @return {string} - 'doc' + */ + AbstractDocSource.prototype._getType = function () { + return 'doc'; + }; + + /** + * Used to merge properties into the state within ._flatten(). + * The state is passed in and modified by the function + * + * @param {object} state - the current merged state + * @param {*} val - the value at `key` + * @param {*} key - The key of `val` + * @return {undefined} + */ + AbstractDocSource.prototype._mergeProp = function (state, val, key) { + const flatKey = '_' + key; + + if (val != null && state[flatKey] == null) { + state[flatKey] = val; + } + }; + + /** + * Creates a key based on the doc's index/type/id + * @return {string} + */ + AbstractDocSource.prototype._versionKey = function () { + const state = this._state; + + if (!state.index || !state.type || !state.id) return; + return 'DocVersion:' + ( + [ state.index, state.type, state.id ] + .map(encodeURIComponent) + .join('/') + ); + }; + + /** + * Get the cached version number, not the version that is + * stored/shared with other tabs + * + * @return {number} - the version number, or undefined + */ + AbstractDocSource.prototype._getVersion = function () { + if (this._version) return this._version; + else return this._getStoredVersion(); + }; + + /** + * Fetches the stored version from storage + * @return {[type]} [description] + */ + AbstractDocSource.prototype._getStoredVersion = function () { + const key = this._versionKey(); + if (!key) return; + + const v = sessionStorage.get(key); + this._version = v ? _.parseInt(v) : void 0; + return this._version; + }; + + /** + * Stores the version into storage + * @param {number, NaN} version - the current version number, NaN works well forcing a refresh + * @return {undefined} + */ + AbstractDocSource.prototype._storeVersion = function (version) { + if (!version) return this._clearVersion(); + + const key = this._versionKey(); + if (!key) return; + this._version = version; + sessionStorage.set(key, version); + }; + + /** + * Clears the stored version for a AbstractDocSource + */ + AbstractDocSource.prototype._clearVersion = function () { + const key = this._versionKey(); + if (!key) return; + sessionStorage.remove(key); + }; + + return AbstractDocSource; +} diff --git a/src/ui/public/courier/data_source/_doc_send_to_es.js b/src/ui/public/courier/data_source/_doc_send_to_es.js index f7191b03100a9..15a1c1da87605 100644 --- a/src/ui/public/courier/data_source/_doc_send_to_es.js +++ b/src/ui/public/courier/data_source/_doc_send_to_es.js @@ -11,7 +11,7 @@ import errors from 'ui/errors'; import RequestQueueProvider from 'ui/courier/_request_queue'; import FetchProvider from 'ui/courier/fetch/fetch'; -export default function (Promise, Private, es) { +export default function (Promise, Private, es, esAdmin, kbnIndex) { let requestQueue = Private(RequestQueueProvider); let courierFetch = Private(FetchProvider); @@ -33,7 +33,8 @@ export default function (Promise, Private, es) { params.version = doc._getVersion(); } - return es[method](params) + const client = [].concat(params.index).includes(kbnIndex) ? esAdmin : es; + return client[method](params) .then(function (resp) { if (resp.status === 409) throw new errors.VersionConflict(resp); diff --git a/src/ui/public/courier/data_source/admin_doc_source.js b/src/ui/public/courier/data_source/admin_doc_source.js new file mode 100644 index 0000000000000..8c45896b8b07d --- /dev/null +++ b/src/ui/public/courier/data_source/admin_doc_source.js @@ -0,0 +1,27 @@ +/** + * @name AdminDocSource + */ + +import _ from 'lodash'; + +import AbstractDocSourceProvider from './_abstract_doc_source'; +import DocStrategyProvider from '../fetch/strategy/doc_admin'; +import DocRequestProvider from '../fetch/request/doc_admin'; + +export default function DocSourceFactory(Private) { + const AbstractDocSource = Private(AbstractDocSourceProvider); + const docStrategy = Private(DocStrategyProvider); + const DocRequest = Private(DocRequestProvider); + + class AdminDocSource extends AbstractDocSource { + constructor(initialState) { + super(initialState, docStrategy); + } + + _createRequest(defer) { + return new DocRequest(this, defer); + } + } + + return AdminDocSource; +} diff --git a/src/ui/public/courier/data_source/doc_source.js b/src/ui/public/courier/data_source/doc_source.js index 80be1cf7b6095..1729eabd072b3 100644 --- a/src/ui/public/courier/data_source/doc_source.js +++ b/src/ui/public/courier/data_source/doc_source.js @@ -1,170 +1,27 @@ /** * @name DocSource - * - * NOTE: This class is tightly coupled with _doc_send_to_es. Its primary - * methods (`doUpdate`, `doIndex`, `doCreate`) are all proxies for methods - * exposed by _doc_send_to_es (`update`, `index`, `create`). These methods are - * called with DocSource as the context. When called, they depend on “private” - * DocSource methods within their execution. */ import _ from 'lodash'; -import 'ui/es'; -import 'ui/storage'; +import AbstractDocSourceProvider from './_abstract_doc_source'; +import DocStrategyProvider from '../fetch/strategy/doc_data'; +import DocRequestProvider from '../fetch/request/doc_data'; -import DocSendToEsProvider from './_doc_send_to_es'; -import AbstractDataSourceProvider from './_abstract'; -import DocRequestProvider from '../fetch/request/doc'; -import DocStrategyProvider from '../fetch/strategy/doc'; +export default function DocSourceFactory(Private) { + const AbstractDocSource = Private(AbstractDocSourceProvider); + const docStrategy = Private(DocStrategyProvider); + const DocRequest = Private(DocRequestProvider); -export default function DocSourceFactory(Private, Promise, es, sessionStorage) { - let sendToEs = Private(DocSendToEsProvider); - let SourceAbstract = Private(AbstractDataSourceProvider); - let DocRequest = Private(DocRequestProvider); - let docStrategy = Private(DocStrategyProvider); - - _.class(DocSource).inherits(SourceAbstract); - function DocSource(initialState) { - DocSource.Super.call(this, initialState, docStrategy); - } - - DocSource.prototype.onUpdate = SourceAbstract.prototype.onResults; - DocSource.prototype.onResults = void 0; - - /***** - * PUBLIC API - *****/ - - DocSource.prototype._createRequest = function (defer) { - return new DocRequest(this, defer); - }; - - /** - * List of methods that is turned into a chainable API in the constructor - * @type {Array} - */ - DocSource.prototype._methods = [ - 'index', - 'type', - 'id', - 'sourceInclude', - 'sourceExclude' - ]; - - /** - * Applies a partial update to the document - * @param {object} fields - The fields to change and their new values (es doc field) - * @return {undefined} - */ - DocSource.prototype.doUpdate = function (fields) { - if (!this._state.id) return this.doIndex(fields); - return sendToEs.call(this, 'update', false, { doc: fields }); - }; - - /** - * Update the document stored - * @param {[type]} body [description] - * @return {[type]} [description] - */ - DocSource.prototype.doIndex = function (body) { - return sendToEs.call(this, 'index', false, body); - }; - - DocSource.prototype.doCreate = function (body) { - return sendToEs.call(this, 'create', false, body, []); - }; - - /***** - * PRIVATE API - *****/ - - /** - * Get the type of this SourceAbstract - * @return {string} - 'doc' - */ - DocSource.prototype._getType = function () { - return 'doc'; - }; - - /** - * Used to merge properties into the state within ._flatten(). - * The state is passed in and modified by the function - * - * @param {object} state - the current merged state - * @param {*} val - the value at `key` - * @param {*} key - The key of `val` - * @return {undefined} - */ - DocSource.prototype._mergeProp = function (state, val, key) { - key = '_' + key; - - if (val != null && state[key] == null) { - state[key] = val; + class DocSource extends AbstractDocSource { + constructor(initialState) { + super(initialState, docStrategy); } - }; - - /** - * Creates a key based on the doc's index/type/id - * @return {string} - */ - DocSource.prototype._versionKey = function () { - let state = this._state; - - if (!state.index || !state.type || !state.id) return; - return 'DocVersion:' + ( - [ state.index, state.type, state.id ] - .map(encodeURIComponent) - .join('/') - ); - }; - - /** - * Get the cached version number, not the version that is - * stored/shared with other tabs - * - * @return {number} - the version number, or undefined - */ - DocSource.prototype._getVersion = function () { - if (this._version) return this._version; - else return this._getStoredVersion(); - }; - - /** - * Fetches the stored version from storage - * @return {[type]} [description] - */ - DocSource.prototype._getStoredVersion = function () { - let key = this._versionKey(); - if (!key) return; - let v = sessionStorage.get(key); - this._version = v ? _.parseInt(v) : void 0; - return this._version; - }; - - /** - * Stores the version into storage - * @param {number, NaN} version - the current version number, NaN works well forcing a refresh - * @return {undefined} - */ - DocSource.prototype._storeVersion = function (version) { - if (!version) return this._clearVersion(); - - let key = this._versionKey(); - if (!key) return; - this._version = version; - sessionStorage.set(key, version); - }; - - /** - * Clears the stored version for a DocSource - */ - DocSource.prototype._clearVersion = function () { - let key = this._versionKey(); - if (!key) return; - sessionStorage.remove(key); - }; + _createRequest(defer) { + return new DocRequest(this, defer); + } + } return DocSource; }; diff --git a/src/ui/public/courier/fetch/__tests__/doc.js b/src/ui/public/courier/fetch/__tests__/doc.js index ebecce6b32b98..e33a14273dbeb 100644 --- a/src/ui/public/courier/fetch/__tests__/doc.js +++ b/src/ui/public/courier/fetch/__tests__/doc.js @@ -3,7 +3,7 @@ import expect from 'expect.js'; import ngMock from 'ng_mock'; import DocSourceProvider from '../../data_source/doc_source'; -import DocRequestProvider from '../request/doc'; +import DocRequestProvider from '../request/doc_data'; describe('Courier DocFetchRequest class', function () { let storage; diff --git a/src/ui/public/courier/fetch/call_client.js b/src/ui/public/courier/fetch/call_client.js index 21c130cde4809..2b2a0c5669d40 100644 --- a/src/ui/public/courier/fetch/call_client.js +++ b/src/ui/public/courier/fetch/call_client.js @@ -4,7 +4,7 @@ import IsRequestProvider from './is_request'; import MergeDuplicatesRequestProvider from './merge_duplicate_requests'; import ReqStatusProvider from './req_status'; -export default function CourierFetchCallClient(Private, Promise, es) { +export default function CourierFetchCallClient(Private, Promise, esAdmin, es) { const isRequest = Private(IsRequestProvider); const mergeDuplicateRequests = Private(MergeDuplicatesRequestProvider); @@ -94,7 +94,9 @@ export default function CourierFetchCallClient(Private, Promise, es) { throw ABORTED; } - return (esPromise = es[strategy.clientMethod]({ body })); + const id = strategy.id; + const client = (id && id.includes('admin')) ? esAdmin : es; + return (esPromise = client[strategy.clientMethod]({ body })); }) .then(function (clientResp) { return strategy.getResponses(clientResp); diff --git a/src/ui/public/courier/fetch/request/doc.js b/src/ui/public/courier/fetch/request/_abstract_doc.js similarity index 80% rename from src/ui/public/courier/fetch/request/doc.js rename to src/ui/public/courier/fetch/request/_abstract_doc.js index bbe0d12e7ee6c..58cac9ce5429d 100644 --- a/src/ui/public/courier/fetch/request/doc.js +++ b/src/ui/public/courier/fetch/request/_abstract_doc.js @@ -1,17 +1,14 @@ -import DocStrategyProvider from '../strategy/doc'; import AbstractRequestProvider from './request'; export default function DocRequestProvider(Private) { - const docStrategy = Private(DocStrategyProvider); const AbstractRequest = Private(AbstractRequestProvider); - class DocRequest extends AbstractRequest { + class AbstractDocRequest extends AbstractRequest { constructor(...args) { super(...args); this.type = 'doc'; - this.strategy = docStrategy; } canStart() { @@ -39,5 +36,6 @@ export default function DocRequestProvider(Private) { } } - return DocRequest; -}; + + return AbstractDocRequest; +} diff --git a/src/ui/public/courier/fetch/request/doc_admin.js b/src/ui/public/courier/fetch/request/doc_admin.js new file mode 100644 index 0000000000000..9549766cbd36b --- /dev/null +++ b/src/ui/public/courier/fetch/request/doc_admin.js @@ -0,0 +1,14 @@ +import DocStrategyProvider from '../strategy/doc_admin'; +import AbstractDocRequestProvider from './_abstract_doc'; + +export default function DocRequestProvider(Private) { + + const docStrategy = Private(DocStrategyProvider); + const AbstractDocRequest = Private(AbstractDocRequestProvider); + + class AdminDocRequest extends AbstractDocRequest { + strategy = docStrategy; + } + + return AdminDocRequest; +} diff --git a/src/ui/public/courier/fetch/request/doc_data.js b/src/ui/public/courier/fetch/request/doc_data.js new file mode 100644 index 0000000000000..832e31e3c9770 --- /dev/null +++ b/src/ui/public/courier/fetch/request/doc_data.js @@ -0,0 +1,14 @@ +import DocStrategyProvider from '../strategy/doc_data'; +import AbstractDocRequestProvider from './_abstract_doc'; + +export default function DocRequestProvider(Private) { + + const docStrategy = Private(DocStrategyProvider); + const AbstractDocRequest = Private(AbstractDocRequestProvider); + + class DataDocRequest extends AbstractDocRequest { + strategy = docStrategy; + } + + return DataDocRequest; +} diff --git a/src/ui/public/courier/fetch/strategy/doc_admin.js b/src/ui/public/courier/fetch/strategy/doc_admin.js new file mode 100644 index 0000000000000..8bd768dc2d60a --- /dev/null +++ b/src/ui/public/courier/fetch/strategy/doc_admin.js @@ -0,0 +1,26 @@ +export default function FetchStrategyForDoc(Promise) { + return { + id: 'doc_admin', + clientMethod: 'mget', + + /** + * Flatten a series of requests into as ES request body + * @param {array} requests - an array of flattened requests + * @return {Promise} - a promise that is fulfilled by the request body + */ + reqsFetchParamsToBody: function (reqsFetchParams) { + return Promise.resolve({ + docs: reqsFetchParams + }); + }, + + /** + * Fetch the multiple responses from the ES Response + * @param {object} resp - The response sent from Elasticsearch + * @return {array} - the list of responses + */ + getResponses: function (resp) { + return resp.docs; + } + }; +} diff --git a/src/ui/public/courier/fetch/strategy/doc.js b/src/ui/public/courier/fetch/strategy/doc_data.js similarity index 100% rename from src/ui/public/courier/fetch/strategy/doc.js rename to src/ui/public/courier/fetch/strategy/doc_data.js diff --git a/src/ui/public/courier/looper/doc_admin.js b/src/ui/public/courier/looper/doc_admin.js new file mode 100644 index 0000000000000..09ef5ef568ea5 --- /dev/null +++ b/src/ui/public/courier/looper/doc_admin.js @@ -0,0 +1,19 @@ +import FetchProvider from '../fetch'; +import LooperProvider from './_looper'; +import DocStrategyProvider from '../fetch/strategy/doc_admin'; + +export default function DocLooperService(Private) { + const fetch = Private(FetchProvider); + const Looper = Private(LooperProvider); + const DocStrategy = Private(DocStrategyProvider); + + /** + * The Looper which will manage the doc fetch interval + * @type {Looper} + */ + const docLooper = new Looper(1500, function () { + fetch.fetchQueued(DocStrategy); + }); + + return docLooper; +} diff --git a/src/ui/public/courier/looper/doc.js b/src/ui/public/courier/looper/doc_data.js similarity index 88% rename from src/ui/public/courier/looper/doc.js rename to src/ui/public/courier/looper/doc_data.js index e0f21394777c8..b8f8d9e3081a8 100644 --- a/src/ui/public/courier/looper/doc.js +++ b/src/ui/public/courier/looper/doc_data.js @@ -1,6 +1,6 @@ import FetchProvider from '../fetch'; import LooperProvider from './_looper'; -import DocStrategyProvider from '../fetch/strategy/doc'; +import DocStrategyProvider from '../fetch/strategy/doc_data'; export default function DocLooperService(Private) { let fetch = Private(FetchProvider); diff --git a/src/ui/public/courier/saved_object/saved_object.js b/src/ui/public/courier/saved_object/saved_object.js index a89a7c440d92d..89f7f3f92b72b 100644 --- a/src/ui/public/courier/saved_object/saved_object.js +++ b/src/ui/public/courier/saved_object/saved_object.js @@ -16,10 +16,10 @@ import errors from 'ui/errors'; import uuid from 'node-uuid'; import MappingSetupProvider from 'ui/utils/mapping_setup'; -import DocSourceProvider from '../data_source/doc_source'; +import DocSourceProvider from '../data_source/admin_doc_source'; import SearchSourceProvider from '../data_source/search_source'; -export default function SavedObjectFactory(es, kbnIndex, Promise, Private, Notifier, safeConfirm, indexPatterns) { +export default function SavedObjectFactory(esAdmin, kbnIndex, Promise, Private, Notifier, safeConfirm, indexPatterns) { let DocSource = Private(DocSourceProvider); let SearchSource = Private(SearchSourceProvider); @@ -255,7 +255,7 @@ export default function SavedObjectFactory(es, kbnIndex, Promise, Private, Notif * @returns {Promise} */ function refreshIndex() { - return es.indices.refresh({ index: kbnIndex }); + return esAdmin.indices.refresh({ index: kbnIndex }); } /** @@ -312,7 +312,7 @@ export default function SavedObjectFactory(es, kbnIndex, Promise, Private, Notif * @return {promise} */ this.delete = () => { - return es.delete( + return esAdmin.delete( { index: kbnIndex, type: type, diff --git a/src/ui/public/courier/saved_object/saved_object_loader.js b/src/ui/public/courier/saved_object/saved_object_loader.js index 487f4348e59af..d4ad71729847a 100644 --- a/src/ui/public/courier/saved_object/saved_object_loader.js +++ b/src/ui/public/courier/saved_object/saved_object_loader.js @@ -3,15 +3,15 @@ import Scanner from 'ui/utils/scanner'; import { StringUtils } from 'ui/utils/string_utils'; export class SavedObjectLoader { - constructor(SavedObjectClass, kbnIndex, es, kbnUrl) { + constructor(SavedObjectClass, kbnIndex, esAdmin, kbnUrl) { this.type = SavedObjectClass.type; this.Class = SavedObjectClass; this.lowercaseType = this.type.toLowerCase(); this.kbnIndex = kbnIndex; this.kbnUrl = kbnUrl; - this.es = es; + this.esAdmin = esAdmin; - this.scanner = new Scanner(es, { + this.scanner = new Scanner(esAdmin, { index: kbnIndex, type: this.lowercaseType }); @@ -92,7 +92,7 @@ export class SavedObjectLoader { body = { query: { match_all: {} } }; } - return this.es.search({ + return this.esAdmin.search({ index: this.kbnIndex, type: this.type.toLowerCase(), body, diff --git a/src/ui/public/es.js b/src/ui/public/es.js index def095dc08b41..7bab5006cc1e5 100644 --- a/src/ui/public/es.js +++ b/src/ui/public/es.js @@ -9,38 +9,45 @@ import 'elasticsearch-browser'; import _ from 'lodash'; import uiModules from 'ui/modules'; -let es; // share the client amongst all apps +const plugins = [function (Client, config) { + // esFactory automatically injects the AngularConnector to the config + // https://github.com/elastic/elasticsearch-js/blob/master/src/lib/connectors/angular.js + class CustomAngularConnector extends config.connectionClass { + request = _.wrap(this.request, function (request, params, cb) { + if (String(params.method).toUpperCase() === 'GET') { + params.query = _.defaults({ _: Date.now() }, params.query); + } + + return request.call(this, params, cb); + }); + } + + config.connectionClass = CustomAngularConnector; +}]; + uiModules .get('kibana', ['elasticsearch', 'kibana/config']) - .service('es', function (esFactory, esUrl, $q, esApiVersion, esRequestTimeout) { - if (es) return es; - es = esFactory({ + //Elasticsearch client used for requesting data. Connects to the /elasticsearch proxy, + //Uses a tribe node if configured, otherwise uses the base elasticsearch configuration + .service('es', function (esFactory, esUrl, esApiVersion, esRequestTimeout) { + return esFactory({ host: esUrl, log: 'info', requestTimeout: esRequestTimeout, apiVersion: esApiVersion, - plugins: [function (Client, config) { - - // esFactory automatically injects the AngularConnector to the config - // https://github.com/elastic/elasticsearch-js/blob/master/src/lib/connectors/angular.js - _.class(CustomAngularConnector).inherits(config.connectionClass); - function CustomAngularConnector(host, config) { - CustomAngularConnector.Super.call(this, host, config); - - this.request = _.wrap(this.request, function (request, params, cb) { - if (String(params.method).toUpperCase() === 'GET') { - params.query = _.defaults({ _: Date.now() }, params.query); - } - - return request.call(this, params, cb); - }); - } - - config.connectionClass = CustomAngularConnector; - - }] + plugins }); + }) - return es; + //Elasticsearch client used for managing Kibana's state. Connects to the /es-admin proxy, + //Always uses the base elasticsearch configuartion + .service('esAdmin', function (esFactory, esAdminUrl, esApiVersion, esRequestTimeout) { + return esFactory({ + host: esAdminUrl, + log: 'info', + requestTimeout: esRequestTimeout, + apiVersion: esApiVersion, + plugins + }); }); diff --git a/src/ui/public/index_patterns/__tests__/_index_pattern.js b/src/ui/public/index_patterns/__tests__/_index_pattern.js index 5acab51892973..156a1b6ec8152 100644 --- a/src/ui/public/index_patterns/__tests__/_index_pattern.js +++ b/src/ui/public/index_patterns/__tests__/_index_pattern.js @@ -7,7 +7,7 @@ import errors from 'ui/errors'; import IndexedArray from 'ui/indexed_array'; import FixturesLogstashFieldsProvider from 'fixtures/logstash_fields'; import FixturesStubbedDocSourceResponseProvider from 'fixtures/stubbed_doc_source_response'; -import DocSourceProvider from 'ui/courier/data_source/doc_source'; +import DocSourceProvider from 'ui/courier/data_source/admin_doc_source'; import UtilsMappingSetupProvider from 'ui/utils/mapping_setup'; import IndexPatternsIntervalsProvider from 'ui/index_patterns/_intervals'; import IndexPatternsIndexPatternProvider from 'ui/index_patterns/_index_pattern'; diff --git a/src/ui/public/index_patterns/_get_ids.js b/src/ui/public/index_patterns/_get_ids.js index b499b1dd3859d..99c067d84e5a5 100644 --- a/src/ui/public/index_patterns/_get_ids.js +++ b/src/ui/public/index_patterns/_get_ids.js @@ -1,5 +1,5 @@ import _ from 'lodash'; -export default function GetIndexPatternIdsFn(es, kbnIndex) { +export default function GetIndexPatternIdsFn(esAdmin, kbnIndex) { // many places may require the id list, so we will cache it seperately // didn't incorportate with the indexPattern cache to prevent id collisions. @@ -13,7 +13,7 @@ export default function GetIndexPatternIdsFn(es, kbnIndex) { }); } - cachedPromise = es.search({ + cachedPromise = esAdmin.search({ index: kbnIndex, type: 'index-pattern', storedFields: [], diff --git a/src/ui/public/index_patterns/_index_pattern.js b/src/ui/public/index_patterns/_index_pattern.js index d0bb4a21cd5d6..6f6bed1fe1059 100644 --- a/src/ui/public/index_patterns/_index_pattern.js +++ b/src/ui/public/index_patterns/_index_pattern.js @@ -7,7 +7,7 @@ import RegistryFieldFormatsProvider from 'ui/registry/field_formats'; import IndexPatternsGetIdsProvider from 'ui/index_patterns/_get_ids'; import IndexPatternsMapperProvider from 'ui/index_patterns/_mapper'; import IndexPatternsIntervalsProvider from 'ui/index_patterns/_intervals'; -import DocSourceProvider from 'ui/courier/data_source/doc_source'; +import DocSourceProvider from 'ui/courier/data_source/admin_doc_source'; import UtilsMappingSetupProvider from 'ui/utils/mapping_setup'; import IndexPatternsFieldListProvider from 'ui/index_patterns/_field_list'; import IndexPatternsFlattenHitProvider from 'ui/index_patterns/_flatten_hit'; diff --git a/src/ui/public/index_patterns/_mapper.js b/src/ui/public/index_patterns/_mapper.js index b3a0d5b4584e2..ac3c9a4833c8a 100644 --- a/src/ui/public/index_patterns/_mapper.js +++ b/src/ui/public/index_patterns/_mapper.js @@ -6,7 +6,7 @@ import IndexPatternsTransformMappingIntoFieldsProvider from 'ui/index_patterns/_ import IndexPatternsIntervalsProvider from 'ui/index_patterns/_intervals'; import IndexPatternsPatternToWildcardProvider from 'ui/index_patterns/_pattern_to_wildcard'; import IndexPatternsLocalCacheProvider from 'ui/index_patterns/_local_cache'; -export default function MapperService(Private, Promise, es, config, kbnIndex) { +export default function MapperService(Private, Promise, es, esAdmin, config, kbnIndex) { let enhanceFieldsWithCapabilities = Private(EnhanceFieldsWithCapabilitiesProvider); let transformMappingIntoFields = Private(IndexPatternsTransformMappingIntoFieldsProvider); @@ -37,7 +37,7 @@ export default function MapperService(Private, Promise, es, config, kbnIndex) { if (cache) return Promise.resolve(cache); if (!skipIndexPatternCache) { - return es.get({ + return esAdmin.get({ index: kbnIndex, type: 'index-pattern', id: id, diff --git a/src/ui/public/index_patterns/index_patterns.js b/src/ui/public/index_patterns/index_patterns.js index 18ea1b7c95cfd..190f3f7fe86c7 100644 --- a/src/ui/public/index_patterns/index_patterns.js +++ b/src/ui/public/index_patterns/index_patterns.js @@ -11,7 +11,7 @@ import RegistryFieldFormatsProvider from 'ui/registry/field_formats'; import uiModules from 'ui/modules'; let module = uiModules.get('kibana/index_patterns'); -function IndexPatternsProvider(es, Notifier, Private, Promise, kbnIndex) { +function IndexPatternsProvider(esAdmin, Notifier, Private, Promise, kbnIndex) { let self = this; let IndexPattern = Private(IndexPatternsIndexPatternProvider); @@ -34,7 +34,7 @@ function IndexPatternsProvider(es, Notifier, Private, Promise, kbnIndex) { self.getIds.clearCache(); pattern.destroy(); - return es.delete({ + return esAdmin.delete({ index: kbnIndex, type: 'index-pattern', id: pattern.id diff --git a/src/ui/public/utils/mapping_setup.js b/src/ui/public/utils/mapping_setup.js index fdca52ff7afc9..a4085ff354d13 100644 --- a/src/ui/public/utils/mapping_setup.js +++ b/src/ui/public/utils/mapping_setup.js @@ -1,7 +1,7 @@ import angular from 'angular'; import _ from 'lodash'; define(function () { - return function MappingSetupService(kbnIndex, es) { + return function MappingSetupService(kbnIndex, esAdmin) { let mappingSetup = this; let json = { @@ -23,7 +23,7 @@ define(function () { * @return {[type]} [description] */ let getKnownKibanaTypes = _.once(function () { - return es.indices.getFieldMapping({ + return esAdmin.indices.getFieldMapping({ // only concerned with types in this kibana index index: kbnIndex, // check all types @@ -83,7 +83,7 @@ define(function () { properties: mapping }; - return es.indices.putMapping({ + return esAdmin.indices.putMapping({ index: kbnIndex, type: type, body: body diff --git a/src/ui/settings/__tests__/index.js b/src/ui/settings/__tests__/index.js index d2ffc25764641..f8b41a023a57c 100644 --- a/src/ui/settings/__tests__/index.js +++ b/src/ui/settings/__tests__/index.js @@ -343,7 +343,7 @@ describe('ui settings', function () { }); function expectElasticsearchGetQuery(server, req, configGet) { - const { callWithRequest } = server.plugins.elasticsearch; + const { callWithRequest } = server.plugins.elasticsearch.getCluster('admin'); sinon.assert.calledOnce(callWithRequest); const [reqPassed, method, params] = callWithRequest.args[0]; expect(reqPassed).to.be(req); @@ -356,7 +356,7 @@ function expectElasticsearchGetQuery(server, req, configGet) { } function expectElasticsearchUpdateQuery(server, req, configGet, doc) { - const { callWithRequest } = server.plugins.elasticsearch; + const { callWithRequest } = server.plugins.elasticsearch.getCluster('admin'); sinon.assert.calledOnce(callWithRequest); const [reqPassed, method, params] = callWithRequest.args[0]; expect(reqPassed).to.be(req); @@ -388,27 +388,37 @@ function instantiate({ getResult, callWithRequest, settingsStatusOverrides } = { }, ready: sinon.stub().returns(Promise.resolve()) }; + const req = { __stubHapiRequest: true, path: '', headers: {} }; + + const adminCluster = { + errors: esErrors, + callWithInternalUser: sinon.stub(), + callWithRequest: sinon.spy((withReq, method, params) => { + if (callWithRequest) { + return callWithRequest(withReq, method, params); + } + + expect(withReq).to.be(req); + switch (method) { + case 'get': + return Promise.resolve({ _source: getResult }); + case 'update': + return Promise.resolve(); + default: + throw new Error(`callWithRequest() is using unexpected method "${method}"`); + } + }) + }; + + adminCluster.callWithInternalUser.withArgs('get', sinon.match.any).returns(Promise.resolve({ _source: getResult })); + adminCluster.callWithInternalUser.withArgs('update', sinon.match.any).returns(Promise.resolve()); + const server = { decorate: (_, key, value) => server[key] = value, plugins: { elasticsearch: { - errors: esErrors, - callWithRequest: sinon.spy((withReq, method, params) => { - if (callWithRequest) { - return callWithRequest(withReq, method, params); - } - - expect(withReq).to.be(req); - switch (method) { - case 'get': - return Promise.resolve({ _source: getResult }); - case 'update': - return Promise.resolve(); - default: - throw new Error(`callWithRequest() is using unexpected method "${method}"`); - } - }) + getCluster: sinon.stub().withArgs('admin').returns(adminCluster) } } }; diff --git a/src/ui/settings/index.js b/src/ui/settings/index.js index 986f2f8428fd9..261f1e678e951 100644 --- a/src/ui/settings/index.js +++ b/src/ui/settings/index.js @@ -66,7 +66,7 @@ export default function setupSettings(kbnServer, server, config) { async function getUserProvided(req, { ignore401Errors = false } = {}) { assertRequest(req); - const { callWithRequest, errors } = server.plugins.elasticsearch; + const { callWithRequest, errors } = server.plugins.elasticsearch.getCluster('admin'); // If the ui settings status isn't green, we shouldn't be attempting to get // user settings, since we can't be sure that all the necessary conditions @@ -87,7 +87,7 @@ export default function setupSettings(kbnServer, server, config) { async function setMany(req, changes) { assertRequest(req); - const { callWithRequest } = server.plugins.elasticsearch; + const { callWithRequest } = server.plugins.elasticsearch.getCluster('admin'); const clientParams = { ...getClientSettings(config), body: { doc: changes } diff --git a/tasks/config/esvm.js b/tasks/config/esvm.js index 667735989051f..589d88c69425b 100644 --- a/tasks/config/esvm.js +++ b/tasks/config/esvm.js @@ -14,6 +14,7 @@ module.exports = function (grunt) { } } }, + dev: { options: { directory: resolve(directory, 'dev'), @@ -27,6 +28,60 @@ module.exports = function (grunt) { } } }, + + tribe: { + options: { + directory: resolve(directory, 'tribe'), + config: { + path: { + data: dataDir + } + }, + nodes: [{ + cluster: { name: 'data-01' }, + http: { port: 9201 }, + node: { name: 'node-01', data: true, master: true, max_local_storage_nodes: 5 } + }, { + cluster: { name: 'data-02' }, + http: { port: 9202 }, + node: { name: 'node-02', data: true, master: true, max_local_storage_nodes: 5 } + }, { + cluster: { name: 'admin' }, + http: { port: 9200 }, + node: { name: 'node-03', data: true, master: true, max_local_storage_nodes: 5 } + }, { + cluster: { name: 'tribe' }, + http: { port: 9203 }, + node: { name: 'node-04', max_local_storage_nodes: 5 }, + tribe: { + c1: { + cluster: { + name: 'data-01' + } + }, + c2: { + cluster: { + name: 'data-02' + } + }, + on_conflict: 'prefer_c1', + blocks: { + write: true + } + }, + discovery: { + zen: { + ping: { + unicast: { + hosts: [ 'localhost:9201', 'localhost:9202' ] + } + } + } + } + }] + }, + }, + test: { options: { directory: resolve(directory, 'test'), @@ -37,10 +92,20 @@ module.exports = function (grunt) { }, cluster: { name: 'esvm-test' + }, + discovery: { + zen: { + ping: { + unicast: { + hosts: [ `localhost:${serverConfig.servers.elasticsearch.port}` ] + } + } + } } } } }, + ui: { options: { directory: resolve(directory, 'test'), @@ -51,10 +116,20 @@ module.exports = function (grunt) { }, cluster: { name: 'esvm-ui' + }, + discovery: { + zen: { + ping: { + unicast: { + hosts: [ `localhost:${serverConfig.servers.elasticsearch.port}` ] + } + } + } } } } }, + withPlugins: { options: { version: '2.1.0',