Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@ import { ClusterNodeEndpoint } from './cluster_node_endpoint';
* - serve request locally if the requested resource is on the local node, otherwise reject it
*/
export class ClusterNodeAdapter implements ServiceHandlerAdapter {
private readonly clusterService: ClusterService;
private readonly clusterMembershipService: ClusterMembershipService;
readonly clusterService: ClusterService;
readonly clusterMembershipService: ClusterMembershipService;
private readonly schedulerService: ResourceSchedulerService;
private readonly handlers: Map<any, any> = new Map<any, any>();
// used to forward requests
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ export class ClusterResourceLocator implements ResourceLocator {
constructor(
private readonly clusterService: ClusterService,
private readonly clusterMembershipService: ClusterMembershipService,
// @ts-ignore
private readonly schedulerService: ResourceSchedulerService
) {}

Expand Down Expand Up @@ -54,12 +55,12 @@ export class ClusterResourceLocator implements ResourceLocator {
);
}

/**
* Return undefined to let NodeRepositoriesService enqueue the clone job in cluster mode.
*/
async allocate(req: Request, resource: string): Promise<Endpoint | undefined> {
// make the cluster service synchronize the meta data and allocate new resources to nodes
await this.clusterService.pollClusterState();
// allocate the repository to nodes
await this.schedulerService.allocateUnassigned();
// the resource should be assigned to a node for now, if possible
return this.locate(req, resource);
return undefined;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
* you may not use this file except in compliance with the Elastic License.
*/

import util from 'util';
import { ClusterMetadata } from './cluster_meta';
import { EsClient } from '../../lib/esqueue';
import { RepositoryObjectClient } from '../../search';
Expand Down Expand Up @@ -79,7 +80,11 @@ export class ClusterService {

private async callClusterStateListeners(event: ClusterStateEvent) {
for (const applier of this.clusterStateListeners) {
await applier.onClusterStateChanged(event);
try {
await applier.onClusterStateChanged(event);
} catch (e) {
this.logger.error(`Failed to apply cluster state ${util.inspect(event)}`);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

import sinon from 'sinon';
import { Logger } from '../../log';
import { ConsoleLoggerFactory } from '../../utils/console_logger_factory';
import { NodeRepositoriesService } from './node_repositories_service';
import { ClusterService } from './cluster_service';
import { ClusterMembershipService } from './cluster_membership_service';
import { CodeNode, CodeNodes } from './code_nodes';
import { emptyAsyncFunc } from '../../test_utils';
import { CloneWorker } from '../../queue';
import { ClusterStateEvent } from './cluster_state_event';
import { ClusterState } from './cluster_state';
import { ClusterMetadata } from './cluster_meta';
import { Repository } from '../../../model';
import { ResourceAssignment, RoutingTable } from './routing_table';

const log: Logger = new ConsoleLoggerFactory().getLogger(['test']);

afterEach(() => {
sinon.restore();
});

const cloneWorker = ({
enqueueJob: emptyAsyncFunc,
} as any) as CloneWorker;

const clusterService = {} as ClusterService;

const testNodes = [
{ id: 'node1', address: 'http://node1' } as CodeNode,
{ id: 'node2', address: 'http://node2' } as CodeNode,
];

const testRepos = [
{ uri: 'test1', url: 'http://test1' } as Repository,
{ uri: 'test2', url: 'http://test2' } as Repository,
];

test('Enqueue clone job after new repository is added to the local node', async () => {
const enqueueJobSpy = sinon.spy(cloneWorker, 'enqueueJob');

const clusterMembershipService = {
localNode: testNodes[0],
} as ClusterMembershipService;

const nodeService = new NodeRepositoriesService(
log,
clusterService,
clusterMembershipService,
cloneWorker
);

// event with no new repositories
let event = new ClusterStateEvent(ClusterState.empty(), ClusterState.empty());
await nodeService.onClusterStateChanged(event);
expect(enqueueJobSpy.called).toBeFalsy();
expect(nodeService.localRepos.size).toBe(0);

// event with a new repository
event = new ClusterStateEvent(
new ClusterState(
new ClusterMetadata([testRepos[0]]),
new RoutingTable([
{ nodeId: testNodes[0].id, resource: testRepos[0].uri } as ResourceAssignment,
]),
new CodeNodes([testNodes[0]])
),
event.current
);
await nodeService.onClusterStateChanged(event);
expect(enqueueJobSpy.calledOnce).toBeTruthy();
expect(nodeService.localRepos.size).toBe(1);

// event with removed repository
event = new ClusterStateEvent(ClusterState.empty(), event.current);
await nodeService.onClusterStateChanged(event);
expect(enqueueJobSpy.calledOnce).toBeTruthy();
expect(nodeService.localRepos.size).toBe(0);

// event with two added repositories
event = new ClusterStateEvent(
new ClusterState(
new ClusterMetadata([testRepos[0], testRepos[1]]),
new RoutingTable([
{ nodeId: testNodes[0].id, resource: testRepos[0].uri } as ResourceAssignment,
{ nodeId: testNodes[0].id, resource: testRepos[1].uri } as ResourceAssignment,
]),
new CodeNodes([testNodes[0]])
),
event.current
);
await nodeService.onClusterStateChanged(event);
expect(enqueueJobSpy.callCount).toBe(3);
expect(nodeService.localRepos.size).toBe(2);

// event with removed repository
event = new ClusterStateEvent(ClusterState.empty(), event.current);
await nodeService.onClusterStateChanged(event);
expect(enqueueJobSpy.callCount).toBe(3);
expect(nodeService.localRepos.size).toBe(0);

// event with two added repositories, one for the other node
event = new ClusterStateEvent(
new ClusterState(
new ClusterMetadata([testRepos[0], testRepos[1]]),
new RoutingTable([
{ nodeId: testNodes[0].id, resource: testRepos[0].uri } as ResourceAssignment,
{ nodeId: testNodes[1].id, resource: testRepos[1].uri } as ResourceAssignment,
]),
new CodeNodes([testNodes[0]])
),
event.current
);
await nodeService.onClusterStateChanged(event);
expect(enqueueJobSpy.callCount).toBe(4);
expect(nodeService.localRepos.size).toBe(1);
});
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

import util from 'util';
import { ClusterService, ClusterStateListener } from './cluster_service';
import { ClusterStateEvent } from './cluster_state_event';
import { ClusterMembershipService } from './cluster_membership_service';
import { CloneWorker } from '../../queue';
import { Repository, RepositoryUri } from '../../../model';
import { Logger } from '../../log';
import { RepoState } from '../../../public/actions';

export class NodeRepositoriesService implements ClusterStateListener {
// visible for test
readonly localRepos = new Map<RepositoryUri, LocalRepository>();
private readonly localNodeId = this.clusterMembershipService.localNode.id;

constructor(
private readonly log: Logger,
private readonly clusterService: ClusterService,
private readonly clusterMembershipService: ClusterMembershipService,
private readonly cloneWorker: CloneWorker
) {}

public async start() {
/**
* we can add locally exists repositories to localRepos when the service is started to avoid unnecessarily add clone
* tasks for them, but for now it's OK because clone job is idempotent.
*/
this.clusterService.addClusterStateListener(this);
}

public async stop() {}

async onClusterStateChanged(event: ClusterStateEvent): Promise<void> {
// compare repositories in the cluster state with repositories in the local node, and remove
const repos = event.current.getNodeRepositories(this.clusterMembershipService.localNode.id);
const localNewRepos = repos.filter(repo => !this.localRepos.has(repo.uri));
const localRemovedRepos = Array.from(this.localRepos.values()).filter(
repo =>
event.current.routingTable.getNodeIdByRepositoryURI(repo.metadata.uri) !== this.localNodeId
);
for (const localNewRepo of localNewRepos) {
this.log.info(
`Repository added to node [${this.localNodeId}]: ${util.inspect(localNewRepo)}`
);
await this.cloneWorker.enqueueJob({ url: localNewRepo.url }, {});
this.localRepos.set(localNewRepo.uri, {
metadata: localNewRepo,
currentState: RepoState.CLONING,
});
}
// TODO remove the stale local repo after the Kibana HA is ready
for (const localRemovedRepo of localRemovedRepos) {
this.log.info(
`Repository removed from node [${this.localNodeId}]: ${util.inspect(
localRemovedRepo.metadata
)}`
);
this.localRepos.delete(localRemovedRepo.metadata.uri);
}
}
}

interface LocalRepository {
metadata: Repository;
currentState: RepoState;
}
9 changes: 6 additions & 3 deletions x-pack/legacy/plugins/code/server/init_workers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -92,13 +92,16 @@ export function initWorkers(
);

// Initialize schedulers.
const cloneScheduler = new CloneScheduler(cloneWorker, serverOptions, esClient, log);
const updateScheduler = new UpdateScheduler(updateWorker, serverOptions, esClient, log);
const indexScheduler = new IndexScheduler(indexWorker, serverOptions, esClient, log);
updateScheduler.start();
indexScheduler.start();
// Check if the repository is local on the file system.
// This should be executed once at the startup time of Kibana.
cloneScheduler.schedule();
return { indexScheduler, updateScheduler };
// Ignored in cluster mode, leave it to the node level control loop
if (!serverOptions.clusterEnabled) {
const cloneScheduler = new CloneScheduler(cloneWorker, serverOptions, esClient, log);
cloneScheduler.schedule();
}
return { indexScheduler, updateScheduler, cloneWorker, deleteWorker, indexWorker, updateWorker };
}
24 changes: 21 additions & 3 deletions x-pack/legacy/plugins/code/server/plugin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ import { initLocalService } from './init_local';
import { initQueue } from './init_queue';
import { initWorkers } from './init_workers';
import { ClusterNodeAdapter } from './distributed/cluster/cluster_node_adapter';
import { NodeRepositoriesService } from './distributed/cluster/node_repositories_service';

export class CodePlugin {
private isCodeNode = false;
Expand All @@ -66,6 +67,7 @@ export class CodePlugin {
private updateScheduler: UpdateScheduler | null = null;
private lspService: LspService | null = null;
private codeServices: CodeServices | null = null;
private nodeService: NodeRepositoriesService | null = null;

constructor(initializerContext: PluginInitializerContext) {
this.log = {} as Logger;
Expand Down Expand Up @@ -153,10 +155,15 @@ export class CodePlugin {
server,
this.log
);
const codeServices = new CodeServices(
new ClusterNodeAdapter(codeServerRouter, this.log, this.serverOptions, esClient)
const clusterNodeAdapter = new ClusterNodeAdapter(
codeServerRouter,
this.log,
this.serverOptions,
esClient
);

const codeServices = new CodeServices(clusterNodeAdapter);

this.queue = initQueue(server, this.log, esClient);

const { gitOps, lspService } = initLocalService(
Expand All @@ -169,7 +176,7 @@ export class CodePlugin {
);
this.lspService = lspService;
this.gitOps = gitOps;
const { indexScheduler, updateScheduler } = initWorkers(
const { indexScheduler, updateScheduler, cloneWorker } = initWorkers(
server,
this.log,
esClient,
Expand All @@ -182,6 +189,14 @@ export class CodePlugin {
this.indexScheduler = indexScheduler;
this.updateScheduler = updateScheduler;

this.nodeService = new NodeRepositoriesService(
this.log,
clusterNodeAdapter.clusterService,
clusterNodeAdapter.clusterMembershipService,
cloneWorker
);
await this.nodeService.start();

// Execute index version checking and try to migrate index data if necessary.
await tryMigrateIndices(esClient, this.log);

Expand Down Expand Up @@ -240,6 +255,9 @@ export class CodePlugin {
if (this.codeServices) {
await this.codeServices.stop();
}
if (this.nodeService) {
await this.nodeService.stop();
}
}

private async initNonCodeNode(url: string, core: CoreSetup) {
Expand Down