Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 53 additions & 20 deletions src/core_plugins/elasticsearch/lib/__tests__/cluster.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import Boom from 'boom';
describe('plugins/elasticsearch', function () {
describe('cluster', function () {
let cluster;
let client;
let config = {
url: 'http://localhost:9200',
ssl: { verify: false },
Expand Down Expand Up @@ -44,56 +43,90 @@ describe('plugins/elasticsearch', function () {
expect(localConfig.requestHeadersWhitelist.length).to.not.equal(config.requestHeadersWhitelist);
});

describe('callAsKibanaUserFactory', () => {
let callAsKibanaUser;
describe('adding a plugin', () => {
const plugin = (Client, config, components) => {
Client.prototype.marco = () => {
return Promise.resolve('polo');
};
};

beforeEach(() => {
client = sinon.stub();
set(client, 'nodes.info', sinon.stub().returns(Promise.resolve()));
cluster.addClientPlugins([plugin]);
});

it('persists previous plugins', () => {
const pluginTwo = (Client, config, components) => {
Client.prototype.foo = () => {
return Promise.resolve('bar');
};
};

expect(cluster.config().plugins).to.have.length(1);
expect(cluster.config().plugins[0]).to.be(plugin);

cluster.addClientPlugins([pluginTwo]);

expect(cluster.config().plugins).to.have.length(2);
expect(cluster.config().plugins).to.eql([plugin, pluginTwo]);
});

it('is available for callAsKibanUser', async () => {
const marco = await cluster.callAsKibanaUser('marco');
expect(marco).to.eql('polo');
});

it('is available for callWithRequest', async () => {
const marco = await cluster.callWithRequest({}, 'marco');
expect(marco).to.eql('polo');
});
});

callAsKibanaUser = cluster.callAsKibanaUserFactory(client);
describe('callAsKibanaUser', () => {
let client;

beforeEach(() => {
client = cluster._client = sinon.stub();
set(client, 'nodes.info', sinon.stub().returns(Promise.resolve()));
});

it('should return a function', () => {
expect(callAsKibanaUser).to.be.a('function');
expect(cluster.callAsKibanaUser).to.be.a('function');
});

it('throws an error for an invalid endpoint', () => {
const fn = partial(callAsKibanaUser, 'foo');
const fn = partial(cluster.callAsKibanaUser, 'foo');
expect(fn).to.throwException(/called with an invalid endpoint: foo/);
});

it('calls the client with params', () => {
const params = { foo: 'Foo' };
callAsKibanaUser('nodes.info', params);
cluster.callAsKibanaUser('nodes.info', params);

sinon.assert.calledOnce(client.nodes.info);
expect(client.nodes.info.getCall(0).args[0]).to.eql(params);
});
});

describe('callWithRequestFactory', () => {
let callWithRequest;
describe('callWithRequest', () => {
let client;

beforeEach(() => {
client = sinon.stub();
client = cluster._noAuthClient = sinon.stub();
set(client, 'nodes.info', sinon.stub().returns(Promise.resolve()));

callWithRequest = cluster.callWithRequestFactory(client);
});

it('should return a function', () => {
expect(callWithRequest).to.be.a('function');
expect(cluster.callWithRequest).to.be.a('function');
});

it('throws an error for an invalid endpoint', () => {
const fn = partial(callWithRequest, {}, 'foo');
const fn = partial(cluster.callWithRequest, {}, 'foo');
expect(fn).to.throwException(/called with an invalid endpoint: foo/);
});

it('calls the client with params', () => {
const params = { foo: 'Foo' };
callWithRequest({}, 'nodes.info', params);
cluster.callWithRequest({}, 'nodes.info', params);

sinon.assert.calledOnce(client.nodes.info);
expect(client.nodes.info.getCall(0).args[0]).to.eql(params);
Expand All @@ -103,7 +136,7 @@ describe('plugins/elasticsearch', function () {
const headers = { authorization: 'Basic TEST' };
const request = { headers: Object.assign({}, headers, { foo: 'Foo' }) };

callWithRequest(request, 'nodes.info');
cluster.callWithRequest(request, 'nodes.info');

sinon.assert.calledOnce(client.nodes.info);
expect(client.nodes.info.getCall(0).args[0]).to.eql({
Expand All @@ -122,7 +155,7 @@ describe('plugins/elasticsearch', function () {

it('ensures WWW-Authenticate header', async () => {
set(client, 'mock.401', sinon.stub().returns(Promise.reject(error)));
await callWithRequest({}, 'mock.401', {}, { wrap401Errors: true }).catch(handler);
await cluster.callWithRequest({}, 'mock.401', {}, { wrap401Errors: true }).catch(handler);

sinon.assert.calledOnce(handler);
expect(handler.getCall(0).args[0].output.headers['WWW-Authenticate']).to.eql('Basic realm="Authorization Required"');
Expand All @@ -131,7 +164,7 @@ describe('plugins/elasticsearch', function () {
it('persists WWW-Authenticate header', async () => {
set(error, 'body.error.header[WWW-Authenticate]', 'Basic realm="Test"');
set(client, 'mock.401', sinon.stub().returns(Promise.reject(error)));
await callWithRequest({}, 'mock.401', {}, { wrap401Errors: true }).catch(handler);
await cluster.callWithRequest({}, 'mock.401', {}, { wrap401Errors: true }).catch(handler);

sinon.assert.calledOnce(handler);
expect(handler.getCall(0).args[0].output.headers['WWW-Authenticate']).to.eql('Basic realm="Test"');
Expand Down
140 changes: 77 additions & 63 deletions src/core_plugins/elasticsearch/lib/cluster.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import elasticsearch from 'elasticsearch';
import { bindKey, partial, get, set, isEmpty, cloneDeep } from 'lodash';
import { get, set, isEmpty, cloneDeep } from 'lodash';
import toPath from 'lodash/internal/toPath';
import Promise from 'bluebird';
import Boom from 'boom';

import createClient from './create_client';
Expand All @@ -12,89 +11,104 @@ export default class Cluster {
this._config = Object.assign({}, config);
this.errors = elasticsearch.errors;

this._client = this.createClient();
this.callAsKibanaUser = this.callAsKibanaUserFactory(this._client);

this._noAuthClient = this.createClient({ auth: false });
this.callWithRequest = this.callWithRequestFactory(this._noAuthClient);
createClients.call(this);

return this;
}

/**
* callAsKibanaUser
* callWithRequest
*
* Makes a call to ES using the credentials in kibana.yml
* Performs a call to ES, passing through whitelisted headers in the request
*
* The whitelisted headers are defined in the config under _requestHeadersWhitelist_
*
* @param {Object|undefined} req - The request object
* @param {string} endpoint
* @param {Object} clientParams
* @param {Object} options
* @param {boolean} options.wrap401Errors
* @returns {Promise}
*/
callAsKibanaUserFactory(client) {
return partial(this.callWithRequestFactory(client), undefined);
callWithRequest = (req = {}, endpoint, clientParams = {}, options = {}) => {
if (req.headers) {
const filteredHeaders = filterHeaders(req.headers, this.config('requestHeadersWhitelist'));
set(clientParams, 'headers', filteredHeaders);
}

return callAPI(this._noAuthClient, endpoint, clientParams, options);
}

callWithRequestFactory(client) {
/**
* callWithRequest
*
* Makes a call to ES, passing through whitelisted headers in the request
*
* The whitelisted headers are defined in the config under _requestHeadersWhitelist_
*
* @param {Object|undefined} req - The request object
* @param {string} endpoint
* @param {Object} clientParams
* @param {Object} options
* @param {boolean} options.wrap401Errors
* @returns {Promise}
*/
return (req = {}, endpoint, clientParams = {}, options = {}) => {
const wrap401Errors = options.wrap401Errors !== false;

if (req.headers) {
const filteredHeaders = filterHeaders(req.headers, this.config('requestHeadersWhitelist'));
set(clientParams, 'headers', filteredHeaders);
}

const clientPath = toPath(endpoint);
const api = get(client, clientPath);

let apiContext = get(client, clientPath.slice(0, -1));
if (isEmpty(apiContext)) {
apiContext = client;
}

if (!api) {
throw new Error(`called with an invalid endpoint: ${endpoint}`);
}

return api.call(apiContext, clientParams).catch((err) => {
if (!wrap401Errors || err.statusCode !== 401) {
return Promise.reject(err);
}

const boomError = Boom.wrap(err, err.statusCode);
const wwwAuthHeader = get(err, 'body.error.header[WWW-Authenticate]');
boomError.output.headers['WWW-Authenticate'] = wwwAuthHeader || 'Basic realm="Authorization Required"';

throw boomError;
});
};
/**
* callAsKibanaUser
*
* Performs a call to ES using the credentials in kibana.yml
*
* @param {string} endpoint
* @param {Object} clientParams
* @param {Object} options
* @returns {Promise}
*/

callAsKibanaUser = (endpoint, clientParams = {}, options = {}) => {
return callAPI(this._client, endpoint, clientParams, options);
}

config(path) {
config = (path) => {
return cloneDeep(path ? get(this._config, path) : this._config);
}

createClient(options = {}) {
return createClient(Object.assign({}, this.config(), options));
addClientPlugins(plugins = []) {
this.close(); // close existing client connections

if (Array.isArray(this._config.plugins)) {
this._config.plugins = this._config.plugins.concat(plugins);
} else {
this._config.plugins = plugins;
}

createClients.call(this);
}

close() {
this._client.close();
this._noAuthClient.close();
if (this._client) {
this._client.close();
}

if (this._noAuthClient) {
this._noAuthClient.close();
}
}
}

function callAPI(client, endpoint, clientParams = {}, options = {}) {
const wrap401Errors = options.wrap401Errors !== false;
const clientPath = toPath(endpoint);
const api = get(client, clientPath);

let apiContext = get(client, clientPath.slice(0, -1));
if (isEmpty(apiContext)) {
apiContext = client;
}

if (!api) {
throw new Error(`called with an invalid endpoint: ${endpoint}`);
}

return api.call(apiContext, clientParams).catch((err) => {
if (!wrap401Errors || err.statusCode !== 401) {
return Promise.reject(err);
}

const boomError = Boom.wrap(err, err.statusCode);
const wwwAuthHeader = get(err, 'body.error.header[WWW-Authenticate]');
boomError.output.headers['WWW-Authenticate'] = wwwAuthHeader || 'Basic realm="Authorization Required"';

throw boomError;
});
}

function createClients() {
this._client = createClient(this.config());
this._noAuthClient = createClient(Object.assign({}, this.config(), { auth: false }));
}