Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
export { ScopedClusterClient } from './src/scoped_cluster_client';
export { ClusterClient } from './src/cluster_client';
export { configureClient } from './src/configure_client';
export { type AgentStore, AgentManager, type NetworkAgent } from './src/agent_manager';
export { type AgentStatsProvider, AgentManager, type NetworkAgent } from './src/agent_manager';
export { getRequestDebugMeta, getErrorMessage } from './src/log_query_and_deprecation';
export {
PRODUCT_RESPONSE_HEADER,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,12 @@
* Side Public License, v 1.
*/

import { AgentManager } from './agent_manager';
import { Agent as HttpAgent } from 'http';
import { Agent as HttpsAgent } from 'https';
import type { ElasticsearchClientsMetrics } from '@kbn/core-metrics-server';
import { loggerMock, type MockedLogger } from '@kbn/logging-mocks';
import { getAgentsSocketsStatsMock } from './get_agents_sockets_stats.test.mocks';
import { AgentManager } from './agent_manager';

jest.mock('http');
jest.mock('https');
Expand All @@ -17,14 +20,20 @@ const HttpAgentMock = HttpAgent as jest.Mock<HttpAgent>;
const HttpsAgentMock = HttpsAgent as jest.Mock<HttpsAgent>;

describe('AgentManager', () => {
let logger: MockedLogger;

beforeEach(() => {
logger = loggerMock.create();
});

afterEach(() => {
HttpAgentMock.mockClear();
HttpsAgentMock.mockClear();
});

describe('#getAgentFactory()', () => {
it('provides factories which are different at each call', () => {
const agentManager = new AgentManager();
const agentManager = new AgentManager(logger);
const agentFactory1 = agentManager.getAgentFactory();
const agentFactory2 = agentManager.getAgentFactory();
expect(agentFactory1).not.toEqual(agentFactory2);
Expand All @@ -36,7 +45,7 @@ describe('AgentManager', () => {
HttpAgentMock.mockImplementationOnce(() => mockedHttpAgent);
const mockedHttpsAgent = new HttpsAgent();
HttpsAgentMock.mockImplementationOnce(() => mockedHttpsAgent);
const agentManager = new AgentManager();
const agentManager = new AgentManager(logger);
const agentFactory = agentManager.getAgentFactory();
const httpAgent = agentFactory({ url: new URL('http://elastic-node-1:9200') });
const httpsAgent = agentFactory({ url: new URL('https://elastic-node-1:9200') });
Expand All @@ -45,7 +54,7 @@ describe('AgentManager', () => {
});

it('takes into account the provided configurations', () => {
const agentManager = new AgentManager();
const agentManager = new AgentManager(logger);
const agentFactory = agentManager.getAgentFactory({
maxTotalSockets: 1024,
scheduling: 'fifo',
Expand All @@ -68,7 +77,7 @@ describe('AgentManager', () => {
});

it('provides Agents that match the URLs protocol', () => {
const agentManager = new AgentManager();
const agentManager = new AgentManager(logger);
const agentFactory = agentManager.getAgentFactory();
agentFactory({ url: new URL('http://elastic-node-1:9200') });
expect(HttpAgent).toHaveBeenCalledTimes(1);
Expand All @@ -79,7 +88,7 @@ describe('AgentManager', () => {
});

it('provides the same Agent if URLs use the same protocol', () => {
const agentManager = new AgentManager();
const agentManager = new AgentManager(logger);
const agentFactory = agentManager.getAgentFactory();
const agent1 = agentFactory({ url: new URL('http://elastic-node-1:9200') });
const agent2 = agentFactory({ url: new URL('http://elastic-node-2:9200') });
Expand All @@ -92,7 +101,7 @@ describe('AgentManager', () => {
});

it('dereferences an agent instance when the agent is closed', () => {
const agentManager = new AgentManager();
const agentManager = new AgentManager(logger);
const agentFactory = agentManager.getAgentFactory();
const agent = agentFactory({ url: new URL('http://elastic-node-1:9200') });
// eslint-disable-next-line dot-notation
Expand All @@ -105,7 +114,7 @@ describe('AgentManager', () => {

describe('two agent factories', () => {
it('never provide the same Agent instance even if they use the same type', () => {
const agentManager = new AgentManager();
const agentManager = new AgentManager(logger);
const agentFactory1 = agentManager.getAgentFactory();
const agentFactory2 = agentManager.getAgentFactory();
const agent1 = agentFactory1({ url: new URL('http://elastic-node-1:9200') });
Expand All @@ -115,20 +124,34 @@ describe('AgentManager', () => {
});
});

describe('#getAgents()', () => {
it('returns the created HTTP and HTTPs Agent instances', () => {
const agentManager = new AgentManager();
const agentFactory1 = agentManager.getAgentFactory();
const agentFactory2 = agentManager.getAgentFactory();
const agent1 = agentFactory1({ url: new URL('http://elastic-node-1:9200') });
const agent2 = agentFactory2({ url: new URL('http://elastic-node-1:9200') });
const agent3 = agentFactory1({ url: new URL('https://elastic-node-1:9200') });
const agent4 = agentFactory2({ url: new URL('https://elastic-node-1:9200') });
describe('#getAgentsStats()', () => {
it('returns the stats of the agents', () => {
const agentManager = new AgentManager(logger);
const metrics: ElasticsearchClientsMetrics = {
totalQueuedRequests: 0,
totalIdleSockets: 100,
totalActiveSockets: 400,
};
getAgentsSocketsStatsMock.mockReturnValue(metrics);

expect(agentManager.getAgentsStats()).toStrictEqual(metrics);
});

it('warns when there are queued requests (requests unassigned to any socket)', () => {
const agentManager = new AgentManager(logger);
const metrics: ElasticsearchClientsMetrics = {
totalQueuedRequests: 2,
totalIdleSockets: 100, // There may be idle sockets when many clients are initialized. It should not be taken as an indicator of health.
totalActiveSockets: 400,
};
getAgentsSocketsStatsMock.mockReturnValue(metrics);

const agents = agentManager.getAgents();
expect(agentManager.getAgentsStats()).toStrictEqual(metrics);

expect(agents.size).toEqual(4);
expect([...agents]).toEqual(expect.arrayContaining([agent1, agent2, agent3, agent4]));
expect(logger.warn).toHaveBeenCalledTimes(1);
expect(logger.warn).toHaveBeenCalledWith(
'There are 2 queued requests. If this number is constantly high, consider scaling Kibana horizontally or increasing "elasticsearch.maxSockets" in the config.'
);
});
});
});
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@
import { Agent as HttpAgent, type AgentOptions } from 'http';
import { Agent as HttpsAgent } from 'https';
import type { ConnectionOptions, HttpAgentOptions } from '@elastic/elasticsearch';
import type { Logger } from '@kbn/logging';
import type { ElasticsearchClientsMetrics } from '@kbn/core-metrics-server';
import { getAgentsSocketsStats } from './get_agents_sockets_stats';

const HTTPS = 'https:';

Expand All @@ -19,8 +22,14 @@ export interface AgentFactoryProvider {
getAgentFactory(agentOptions?: HttpAgentOptions): AgentFactory;
}

export interface AgentStore {
getAgents(): Set<NetworkAgent>;
/**
* Exposes the APIs to fetch stats of the existing agents.
*/
export interface AgentStatsProvider {
/**
* Returns the {@link ElasticsearchClientsMetrics}, to understand the load on the Elasticsearch HTTP agents.
*/
getAgentsStats(): ElasticsearchClientsMetrics;
}

/**
Expand All @@ -34,10 +43,10 @@ export interface AgentStore {
* exposes methods that can modify the underlying pools, effectively impacting the connections of other Clients.
* @internal
**/
export class AgentManager implements AgentFactoryProvider, AgentStore {
private agents: Set<HttpAgent>;
export class AgentManager implements AgentFactoryProvider, AgentStatsProvider {
private readonly agents: Set<HttpAgent>;

constructor() {
constructor(private readonly logger: Logger) {
this.agents = new Set();
}

Expand Down Expand Up @@ -69,8 +78,16 @@ export class AgentManager implements AgentFactoryProvider, AgentStore {
};
}

public getAgents(): Set<NetworkAgent> {
return this.agents;
public getAgentsStats(): ElasticsearchClientsMetrics {
const stats = getAgentsSocketsStats(this.agents);

if (stats.totalQueuedRequests > 0) {
this.logger.warn(
`There are ${stats.totalQueuedRequests} queued requests. If this number is constantly high, consider scaling Kibana horizontally or increasing "elasticsearch.maxSockets" in the config.`
);
}

return stats;
Comment on lines 81 to 90
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I moved the calculation logic closer to the Agent Manager so that we didn't need to expose the logger publicly.

At the same time, it feels safer if we are not exposing the agents publicly for anyone else to tamper with.

}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ describe('ClusterClient', () => {
logger = loggingSystemMock.createLogger();
internalClient = createClient();
scopedClient = createClient();
agentFactoryProvider = new AgentManager();
agentFactoryProvider = new AgentManager(logger);

authHeaders = httpServiceMock.createAuthHeaderStorage();
authHeaders.get.mockImplementation(() => ({
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ describe('configureClient', () => {
config = createFakeConfig();
parseClientOptionsMock.mockReturnValue({});
ClientMock.mockImplementation(() => createFakeClient());
agentFactoryProvider = new AgentManager();
agentFactoryProvider = new AgentManager(logger);
});

afterEach(() => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@
* Side Public License, v 1.
*/

import { NetworkAgent } from '@kbn/core-elasticsearch-client-server-internal';
import type { ElasticsearchClientsMetrics } from '@kbn/core-metrics-server';
import type { NetworkAgent } from './agent_manager';

export const getAgentsSocketsStats = (agents: Set<NetworkAgent>): ElasticsearchClientsMetrics => {
const nodes = new Set<string>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
"@kbn/logging-mocks",
"@kbn/core-logging-server-mocks",
"@kbn/core-http-server-mocks",
"@kbn/core-metrics-server",
],
"exclude": [
"target/**/*",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,4 @@ export type {
DeeplyMockedApi,
ElasticsearchClientMock,
} from './src/mocks';
export { createAgentStoreMock } from './src/agent_manager.mocks';
export { createAgentStatsProviderMock } from './src/agent_manager.mocks';
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@
* Side Public License, v 1.
*/

import type { AgentStore, NetworkAgent } from '@kbn/core-elasticsearch-client-server-internal';
import type { AgentStatsProvider } from '@kbn/core-elasticsearch-client-server-internal';

export const createAgentStoreMock = (agents: Set<NetworkAgent> = new Set()): AgentStore => ({
getAgents: jest.fn(() => agents),
export const createAgentStatsProviderMock = (): jest.Mocked<AgentStatsProvider> => ({
getAgentsStats: jest.fn(),
});
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import type { AgentManager } from '@kbn/core-elasticsearch-client-server-interna

export const MockClusterClient = jest.fn();
export const MockAgentManager: jest.MockedClass<typeof AgentManager> = jest.fn().mockReturnValue({
getAgents: jest.fn(),
getAgentsStats: jest.fn(),
getAgentFactory: jest.fn(),
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -201,9 +201,9 @@ describe('#setup', () => {
);
});

it('returns an AgentStore as part of the contract', async () => {
it('returns an AgentStatsProvider as part of the contract', async () => {
const setupContract = await elasticsearchService.setup(setupDeps);
expect(typeof setupContract.agentStore.getAgents).toEqual('function');
expect(typeof setupContract.agentStatsProvider.getAgentsStats).toEqual('function');
});

it('esNodeVersionCompatibility$ only starts polling when subscribed to', async () => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ export class ElasticsearchService
this.config$ = coreContext.configService
.atPath<ElasticsearchConfigType>('elasticsearch')
.pipe(map((rawConfig) => new ElasticsearchConfig(rawConfig)));
this.agentManager = new AgentManager();
this.agentManager = new AgentManager(this.log.get('agent-manager'));
}

public async preboot(): Promise<InternalElasticsearchServicePreboot> {
Expand Down Expand Up @@ -120,7 +120,9 @@ export class ElasticsearchService
}
this.unauthorizedErrorHandler = handler;
},
agentStore: this.agentManager,
agentStatsProvider: {
getAgentsStats: this.agentManager.getAgentsStats.bind(this.agentManager),
},
};
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import type {
ElasticsearchServiceStart,
ElasticsearchServiceSetup,
} from '@kbn/core-elasticsearch-server';
import type { AgentStore } from '@kbn/core-elasticsearch-client-server-internal';
import type { AgentStatsProvider } from '@kbn/core-elasticsearch-client-server-internal';
import type { ServiceStatus } from '@kbn/core-status-common';
import type { NodesVersionCompatibility, NodeInfo } from './version_check/ensure_es_version';
import type { ClusterInfo } from './get_cluster_info';
Expand All @@ -22,7 +22,7 @@ export type InternalElasticsearchServicePreboot = ElasticsearchServicePreboot;

/** @internal */
export interface InternalElasticsearchServiceSetup extends ElasticsearchServiceSetup {
agentStore: AgentStore;
agentStatsProvider: AgentStatsProvider;
clusterInfo$: Observable<ClusterInfo>;
esNodesCompatibility$: Observable<NodesVersionCompatibility>;
status$: Observable<ServiceStatus<ElasticsearchStatusMeta>>;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import {
elasticsearchClientMock,
type ClusterClientMock,
type CustomClusterClientMock,
createAgentStoreMock,
createAgentStatsProviderMock,
} from '@kbn/core-elasticsearch-client-server-mocks';
import type {
ElasticsearchClientConfig,
Expand Down Expand Up @@ -95,7 +95,7 @@ const createInternalSetupContractMock = () => {
level: ServiceStatusLevels.available,
summary: 'Elasticsearch is available',
}),
agentStore: createAgentStoreMock(),
agentStatsProvider: createAgentStatsProviderMock(),
};
return internalSetupContract;
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,9 @@
* Side Public License, v 1.
*/

import { Agent as HttpAgent } from 'http';
import { Agent as HttpsAgent } from 'https';
import type { ElasticsearchClientsMetrics } from '@kbn/core-metrics-server';
import { createAgentStoreMock } from '@kbn/core-elasticsearch-client-server-mocks';
import { getAgentsSocketsStatsMock } from './get_agents_sockets_stats.test.mocks';
import { createAgentStatsProviderMock } from '@kbn/core-elasticsearch-client-server-mocks';
import { ElasticsearchClientsMetricsCollector } from './elasticsearch_client';
import { getAgentsSocketsStats } from './get_agents_sockets_stats';

jest.mock('@kbn/core-elasticsearch-client-server-internal');

Expand All @@ -24,16 +20,12 @@ export const sampleEsClientMetrics: ElasticsearchClientsMetrics = {

describe('ElasticsearchClientsMetricsCollector', () => {
test('#collect calls getAgentsSocketsStats with the Agents managed by the provided AgentManager', async () => {
const agents = new Set<HttpAgent>([new HttpAgent(), new HttpsAgent()]);
const agentStore = createAgentStoreMock(agents);
getAgentsSocketsStatsMock.mockReturnValueOnce(sampleEsClientMetrics);
const agentStatsProvider = createAgentStatsProviderMock();
agentStatsProvider.getAgentsStats.mockReturnValue(sampleEsClientMetrics);

const esClientsMetricsCollector = new ElasticsearchClientsMetricsCollector(agentStore);
const esClientsMetricsCollector = new ElasticsearchClientsMetricsCollector(agentStatsProvider);
const metrics = await esClientsMetricsCollector.collect();

expect(agentStore.getAgents).toHaveBeenCalledTimes(1);
expect(getAgentsSocketsStats).toHaveBeenCalledTimes(1);
expect(getAgentsSocketsStats).toHaveBeenNthCalledWith(1, agents);
expect(metrics).toEqual(sampleEsClientMetrics);
});
});
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,15 @@
*/

import type { ElasticsearchClientsMetrics, MetricsCollector } from '@kbn/core-metrics-server';
import type { AgentStore } from '@kbn/core-elasticsearch-client-server-internal';
import { getAgentsSocketsStats } from './get_agents_sockets_stats';
import type { AgentStatsProvider } from '@kbn/core-elasticsearch-client-server-internal';

export class ElasticsearchClientsMetricsCollector
implements MetricsCollector<ElasticsearchClientsMetrics>
{
constructor(private readonly agentStore: AgentStore) {}
constructor(private readonly agentStatsProvider: AgentStatsProvider) {}

public async collect(): Promise<ElasticsearchClientsMetrics> {
return await getAgentsSocketsStats(this.agentStore.getAgents());
return await this.agentStatsProvider.getAgentsStats();
}

public reset() {
Expand Down
Loading