Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions x-pack/plugins/code/server/init.ts
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@ async function initCodeNode(server: Server, serverOptions: ServerOptions, log: L
log,
esClient,
[lspIndexerFactory],
serverOptions,
cancellationService
).bind();

Expand Down
9 changes: 5 additions & 4 deletions x-pack/plugins/code/server/queue/abstract_worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ export abstract class AbstractWorker implements Worker {
constructor(protected readonly queue: Esqueue, protected readonly log: Logger) {}

// Assemble jobs, for now most of the job object construction should be the same.
public createJob(payload: any, options: any): Job {
public async createJob(payload: any, options: any): Promise<Job> {
const timestamp = moment().valueOf();
if (options.timeout !== undefined && options.timeout !== null) {
// If the job explicitly specify the timeout, then honor it.
Expand All @@ -35,11 +35,12 @@ export abstract class AbstractWorker implements Worker {
};
} else {
// Otherwise, use a default timeout.
const timeout = await this.getTimeoutMs(payload);
return {
payload,
options: {
...options,
timeout: this.getTimeoutMs(payload),
timeout,
},
timestamp,
};
Expand All @@ -55,7 +56,7 @@ export abstract class AbstractWorker implements Worker {

// Enqueue the job.
public async enqueueJob(payload: any, options: any) {
const job: Job = this.createJob(payload, options);
const job: Job = await this.createJob(payload, options);
return new Promise((resolve, reject) => {
const jobInternal: JobInternal<Job> = this.queue.addJob(this.id, job, job.options);
jobInternal.on(esqueueEvents.EVENT_JOB_CREATED, async (createdJob: JobInternal<Job>) => {
Expand Down Expand Up @@ -139,7 +140,7 @@ export abstract class AbstractWorker implements Worker {
});
}

protected getTimeoutMs(payload: any) {
protected async getTimeoutMs(payload: any) {
// Set to 1 hour by default. Override this function for sub classes if necessary.
return moment.duration(1, 'hour').asMilliseconds();
}
Expand Down
2 changes: 1 addition & 1 deletion x-pack/plugins/code/server/queue/delete_worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ export class DeleteWorker extends AbstractWorker {
return await this.objectClient.updateRepositoryDeleteStatus(uri, p);
}

protected getTimeoutMs(_: any) {
protected async getTimeoutMs(_: any) {
return (
moment.duration(1, 'hour').asMilliseconds() + moment.duration(10, 'minutes').asMilliseconds()
);
Expand Down
5 changes: 5 additions & 0 deletions x-pack/plugins/code/server/queue/index_worker.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import sinon from 'sinon';
import { IndexerFactory } from '../indexer';
import { AnyObject, CancellationToken, EsClient, Esqueue } from '../lib/esqueue';
import { Logger } from '../log';
import { ServerOptions } from '../server_options';
import { ConsoleLoggerFactory } from '../utils/console_logger_factory';
import { CancellationSerivce } from './cancellation_service';
import { IndexWorker } from './index_worker';
Expand Down Expand Up @@ -58,6 +59,7 @@ test('Execute index job.', async () => {
log,
{} as EsClient,
[(indexerFactory as any) as IndexerFactory],
{} as ServerOptions,
(cancellationService as any) as CancellationSerivce
);

Expand Down Expand Up @@ -110,6 +112,7 @@ test('Execute index job and then cancel.', async () => {
log,
{} as EsClient,
[(indexerFactory as any) as IndexerFactory],
{} as ServerOptions,
(cancellationService as any) as CancellationSerivce
);

Expand Down Expand Up @@ -146,6 +149,7 @@ test('On index job enqueued.', async () => {
log,
esClient as EsClient,
[],
{} as ServerOptions,
{} as CancellationSerivce
);

Expand Down Expand Up @@ -173,6 +177,7 @@ test('On index job completed.', async () => {
log,
esClient as EsClient,
[],
{} as ServerOptions,
{} as CancellationSerivce
);

Expand Down
23 changes: 20 additions & 3 deletions x-pack/plugins/code/server/queue/index_worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,12 @@
import moment from 'moment';

import { IndexStats, IndexWorkerResult, RepositoryUri, WorkerProgress } from '../../model';
import { GitOperations } from '../git_operations';
import { IndexerFactory, IndexProgress } from '../indexer';
import { EsClient, Esqueue } from '../lib/esqueue';
import { Logger } from '../log';
import { RepositoryObjectClient } from '../search';
import { ServerOptions } from '../server_options';
import { aggregateIndexStats } from '../utils/index_stats_aggregator';
import { AbstractWorker } from './abstract_worker';
import { CancellationSerivce } from './cancellation_service';
Expand All @@ -25,6 +27,7 @@ export class IndexWorker extends AbstractWorker {
protected readonly log: Logger,
protected readonly client: EsClient,
protected readonly indexerFactories: IndexerFactory[],
protected readonly options: ServerOptions,
private readonly cancellationService: CancellationSerivce
) {
super(queue, log);
Expand Down Expand Up @@ -87,9 +90,23 @@ export class IndexWorker extends AbstractWorker {
}
}

protected getTimeoutMs(_: any) {
// TODO(mengwei): query object/file number of a repo and come up with a number in here.
return moment.duration(5, 'hour').asMilliseconds();
protected async getTimeoutMs(payload: any) {
try {
const gitOperator = new GitOperations(this.options.repoPath);
const totalCount = await gitOperator.countRepoFiles(payload.uri, 'head');
let timeout = moment.duration(1, 'hour').asMilliseconds();
if (totalCount > 0) {
// timeout = ln(file_count) in hour
// e.g. 10 files -> 2.3 hours, 100 files -> 4.6 hours, 1000 -> 6.9 hours, 10000 -> 9.2 hours
timeout = moment.duration(Math.log(totalCount), 'hour').asMilliseconds();
}
this.log.info(`Set index job timeout to be ${timeout} ms.`);
return timeout;
} catch (error) {
this.log.error(`Get repo file total count error.`);
this.log.error(error);
throw error;
}
}

private getProgressReporter(
Expand Down
2 changes: 1 addition & 1 deletion x-pack/plugins/code/server/queue/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import { Job } from './job';

export interface Worker {
createJob(payload: any, options: any): Job;
createJob(payload: any, options: any): Promise<Job>;
executeJob(job: Job): void;
enqueueJob(payload: any, options: any): void;

Expand Down