Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 45 additions & 0 deletions x-pack/plugins/task_manager/server/lib/bulk_update_error.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

export class BulkUpdateError extends Error {
private _statusCode: number;
private _type: string;

constructor({
statusCode,
message = 'Bulk update failed with unknown reason',
type,
}: {
statusCode: number;
message?: string;
type: string;
}) {
super(message);
this._statusCode = statusCode;
this._type = type;
}

public get statusCode() {
return this._statusCode;
}

public get type() {
return this._type;
}
}

export function getBulkUpdateStatusCode(error: Error | BulkUpdateError): number | undefined {
if (Boolean(error && error instanceof BulkUpdateError)) {
return (error as BulkUpdateError).statusCode;
}
}

export function getBulkUpdateErrorType(error: Error | BulkUpdateError): string | undefined {
if (Boolean(error && error instanceof BulkUpdateError)) {
return (error as BulkUpdateError).type;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import {
import { mockLogger } from '../test_utils';
import { CLAIM_STRATEGY_UPDATE_BY_QUERY, CLAIM_STRATEGY_MGET, TaskManagerConfig } from '../config';
import { MsearchError } from './msearch_error';
import { BulkUpdateError } from './bulk_update_error';

describe('createManagedConfiguration()', () => {
let clock: sinon.SinonFakeTimers;
Expand Down Expand Up @@ -280,6 +281,45 @@ describe('createManagedConfiguration()', () => {
expect(subscription).toHaveBeenNthCalledWith(2, 8);
});

test('should decrease configuration at the next interval when a bulkPartialUpdate 429 error is emitted', async () => {
const { subscription, errors$ } = setupScenario(10);
errors$.next(
new BulkUpdateError({ statusCode: 429, message: 'test', type: 'too_many_requests' })
);
clock.tick(ADJUST_THROUGHPUT_INTERVAL - 1);
expect(subscription).toHaveBeenCalledTimes(1);
expect(subscription).toHaveBeenNthCalledWith(1, 10);
clock.tick(1);
expect(subscription).toHaveBeenCalledTimes(2);
expect(subscription).toHaveBeenNthCalledWith(2, 8);
});

test('should decrease configuration at the next interval when a bulkPartialUpdate 500 error is emitted', async () => {
const { subscription, errors$ } = setupScenario(10);
errors$.next(
new BulkUpdateError({ statusCode: 500, message: 'test', type: 'server_error' })
);
clock.tick(ADJUST_THROUGHPUT_INTERVAL - 1);
expect(subscription).toHaveBeenCalledTimes(1);
expect(subscription).toHaveBeenNthCalledWith(1, 10);
clock.tick(1);
expect(subscription).toHaveBeenCalledTimes(2);
expect(subscription).toHaveBeenNthCalledWith(2, 8);
});

test('should decrease configuration at the next interval when a bulkPartialUpdate 503 error is emitted', async () => {
const { subscription, errors$ } = setupScenario(10);
errors$.next(
new BulkUpdateError({ statusCode: 503, message: 'test', type: 'unavailable' })
);
clock.tick(ADJUST_THROUGHPUT_INTERVAL - 1);
expect(subscription).toHaveBeenCalledTimes(1);
expect(subscription).toHaveBeenNthCalledWith(1, 10);
clock.tick(1);
expect(subscription).toHaveBeenCalledTimes(2);
expect(subscription).toHaveBeenNthCalledWith(2, 8);
});

test('should not change configuration at the next interval when other msearch error is emitted', async () => {
const { subscription, errors$ } = setupScenario(10);
errors$.next(new MsearchError(404));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import { isEsCannotExecuteScriptError } from './identify_es_error';
import { CLAIM_STRATEGY_MGET, DEFAULT_CAPACITY, MAX_CAPACITY, TaskManagerConfig } from '../config';
import { TaskCost } from '../task';
import { getMsearchStatusCode } from './msearch_error';
import { getBulkUpdateStatusCode } from './bulk_update_error';

const FLUSH_MARKER = Symbol('flush');
export const ADJUST_THROUGHPUT_INTERVAL = 10 * 1000;
Expand Down Expand Up @@ -169,7 +170,10 @@ function countErrors(errors$: Observable<Error>, countInterval: number): Observa
isEsCannotExecuteScriptError(e) ||
getMsearchStatusCode(e) === 429 ||
getMsearchStatusCode(e) === 500 ||
getMsearchStatusCode(e) === 503
getMsearchStatusCode(e) === 503 ||
getBulkUpdateStatusCode(e) === 429 ||
getBulkUpdateStatusCode(e) === 500 ||
getBulkUpdateStatusCode(e) === 503
)
)
).pipe(
Expand Down
56 changes: 56 additions & 0 deletions x-pack/plugins/task_manager/server/task_store.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1290,6 +1290,62 @@ describe('TaskStore', () => {
);
expect(await firstErrorPromise).toMatchInlineSnapshot(`[Error: Failure]`);
});

test('pushes errors returned by the saved objects client to errors$', async () => {
const task = {
id: '324242',
version: 'WzQsMV0=',
attempts: 3,
};

const firstErrorPromise = store.errors$.pipe(first()).toPromise();

esClient.bulk.mockResolvedValue({
errors: true,
items: [
{
update: {
_id: '1',
_index: 'test-index',
status: 403,
error: { reason: 'Error reason', type: 'cluster_block_exception' },
},
},
],
took: 10,
});

await store.bulkPartialUpdate([task]);

expect(await firstErrorPromise).toMatchInlineSnapshot(`[Error: Error reason]`);
});

test('pushes errors for the malformed responses to errors$', async () => {
const task = {
id: '324242',
version: 'WzQsMV0=',
attempts: 3,
};

const firstErrorPromise = store.errors$.pipe(first()).toPromise();

esClient.bulk.mockResolvedValue({
errors: false,
items: [
{
update: {
_index: 'test-index',
status: 200,
},
},
],
took: 10,
});

await store.bulkPartialUpdate([task]);

expect(await firstErrorPromise).toMatchInlineSnapshot(`[Error: malformed response]`);
});
});

describe('remove', () => {
Expand Down
17 changes: 16 additions & 1 deletion x-pack/plugins/task_manager/server/task_store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ import { claimSort } from './queries/mark_available_tasks_as_claimed';
import { MAX_PARTITIONS } from './lib/task_partitioner';
import { ErrorOutput } from './lib/bulk_operation_buffer';
import { MsearchError } from './lib/msearch_error';
import { BulkUpdateError } from './lib/bulk_update_error';

export interface StoreOpts {
esClient: ElasticsearchClient;
Expand Down Expand Up @@ -386,11 +387,19 @@ export class TaskStore {
}

return result.items.map((item) => {
const malformedResponseType = 'malformed response';

if (!item.update || !item.update._id) {
const err = new BulkUpdateError({
message: malformedResponseType,
type: malformedResponseType,
statusCode: 500,
});
this.errors$.next(err);
return asErr({
type: 'task',
id: 'unknown',
error: { type: 'malformed response' },
error: { type: malformedResponseType },
});
}

Expand All @@ -399,6 +408,12 @@ export class TaskStore {
: item.update._id;

if (item.update?.error) {
const err = new BulkUpdateError({
message: item.update.error.reason,
type: item.update.error.type,
statusCode: item.update.status,
});
this.errors$.next(err);
return asErr({
type: 'task',
id: docId,
Expand Down