Skip to content

Commit dc813c3

Browse files
authored
Handle content stream errors in report pre-deletion (#173792)
Re-addresses #171363 The bug was still evident, especially when using network throttling to add slight lag to the request turnaround times. This PR adds more handling of errors that could be thrown slightly prior to deleting the report document, when we try to clear all chunks of the report using the content stream. <details> <summary>Before</summary> https://github.com/elastic/kibana/assets/908371/c27fe314-0f93-42b4-8076-99a1e30b8d2f </details> <details> <summary>After</summary> https://github.com/elastic/kibana/assets/908371/4c1f5edd-73f1-4ca4-a40a-f900ca5f9c78 </details> ### Checklist - [x] Unit tests
1 parent e62b7ac commit dc813c3

File tree

3 files changed

+295
-221
lines changed

3 files changed

+295
-221
lines changed

x-pack/plugins/reporting/server/routes/common/jobs/get_job_routes.ts

Lines changed: 35 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -81,17 +81,43 @@ export const commonJobsRouteHandlerFactory = (reporting: ReportingCore) => {
8181
return jobManagementPreRouting(reporting, res, docId, user, counters, async (doc) => {
8282
const docIndex = doc.index;
8383
const stream = await getContentStream(reporting, { id: docId, index: docIndex });
84-
/** @note Overwriting existing content with an empty buffer to remove all the chunks. */
85-
await new Promise<void>((resolve) => {
86-
stream.end('', 'utf8', () => {
87-
resolve();
88-
});
84+
const reportingSetup = reporting.getPluginSetupDeps();
85+
const logger = reportingSetup.logger.get('delete-report');
86+
87+
// An "error" event is emitted if an error is
88+
// passed to the `stream.end` callback from
89+
// the _final method of the ContentStream.
90+
// This event must be handled.
91+
stream.on('error', (err) => {
92+
logger.error(err);
8993
});
90-
await jobsQuery.delete(docIndex, docId);
9194

92-
return res.ok({
93-
body: { deleted: true },
94-
});
95+
try {
96+
// Overwriting existing content with an
97+
// empty buffer to remove all the chunks.
98+
await new Promise<void>((resolve, reject) => {
99+
stream.end('', 'utf8', (error?: Error) => {
100+
if (error) {
101+
// handle error that could be thrown
102+
// from the _write method of the ContentStream
103+
reject(error);
104+
} else {
105+
resolve();
106+
}
107+
});
108+
});
109+
110+
await jobsQuery.delete(docIndex, docId);
111+
112+
return res.ok({
113+
body: { deleted: true },
114+
});
115+
} catch (error) {
116+
logger.error(error);
117+
return res.customError({
118+
statusCode: 500,
119+
});
120+
}
95121
});
96122
};
97123

x-pack/plugins/reporting/server/routes/internal/management/integration_tests/jobs.test.ts

Lines changed: 143 additions & 119 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ import { registerJobInfoRoutesInternal as registerJobInfoRoutes } from '../jobs'
3636

3737
type SetupServerReturn = Awaited<ReturnType<typeof setupServer>>;
3838

39-
describe(`GET ${INTERNAL_ROUTES.JOBS.DOWNLOAD_PREFIX}`, () => {
39+
describe(`Reporting Job Management Routes: Internal`, () => {
4040
const reportingSymbol = Symbol('reporting');
4141
let server: SetupServerReturn['server'];
4242
let usageCounter: IUsageCounter;
@@ -144,148 +144,148 @@ describe(`GET ${INTERNAL_ROUTES.JOBS.DOWNLOAD_PREFIX}`, () => {
144144
await server.stop();
145145
});
146146

147-
it('fails on malformed download IDs', async () => {
148-
mockEsClient.search.mockResponseOnce(getHits());
149-
registerJobInfoRoutes(core);
147+
describe('download report', () => {
148+
it('fails on malformed download IDs', async () => {
149+
mockEsClient.search.mockResponseOnce(getHits());
150+
registerJobInfoRoutes(core);
150151

151-
await server.start();
152+
await server.start();
152153

153-
await supertest(httpSetup.server.listener)
154-
.get(`${INTERNAL_ROUTES.JOBS.DOWNLOAD_PREFIX}/1`)
155-
.expect(400)
156-
.then(({ body }) =>
157-
expect(body.message).toMatchInlineSnapshot(
158-
'"[request params.docId]: value has length [1] but it must have a minimum length of [3]."'
159-
)
160-
);
161-
});
154+
await supertest(httpSetup.server.listener)
155+
.get(`${INTERNAL_ROUTES.JOBS.DOWNLOAD_PREFIX}/1`)
156+
.expect(400)
157+
.then(({ body }) =>
158+
expect(body.message).toMatchInlineSnapshot(
159+
'"[request params.docId]: value has length [1] but it must have a minimum length of [3]."'
160+
)
161+
);
162+
});
162163

163-
it('fails on unauthenticated users', async () => {
164-
mockStartDeps = await createMockPluginStart(
165-
{
166-
licensing: {
167-
...licensingMock.createStart(),
168-
license$: new BehaviorSubject({ isActive: true, isAvailable: true, type: 'gold' }),
164+
it('fails on unauthenticated users', async () => {
165+
mockStartDeps = await createMockPluginStart(
166+
{
167+
licensing: {
168+
...licensingMock.createStart(),
169+
license$: new BehaviorSubject({ isActive: true, isAvailable: true, type: 'gold' }),
170+
},
171+
security: { authc: { getCurrentUser: () => undefined } },
169172
},
170-
security: { authc: { getCurrentUser: () => undefined } },
171-
},
172-
mockConfigSchema
173-
);
174-
core = await createMockReportingCore(mockConfigSchema, mockSetupDeps, mockStartDeps);
175-
registerJobInfoRoutes(core);
173+
mockConfigSchema
174+
);
175+
core = await createMockReportingCore(mockConfigSchema, mockSetupDeps, mockStartDeps);
176+
registerJobInfoRoutes(core);
176177

177-
await server.start();
178+
await server.start();
178179

179-
await supertest(httpSetup.server.listener)
180-
.get(`${INTERNAL_ROUTES.JOBS.DOWNLOAD_PREFIX}/dope`)
181-
.expect(401)
182-
.then(({ body }) =>
183-
expect(body.message).toMatchInlineSnapshot(`"Sorry, you aren't authenticated"`)
184-
);
185-
});
180+
await supertest(httpSetup.server.listener)
181+
.get(`${INTERNAL_ROUTES.JOBS.DOWNLOAD_PREFIX}/dope`)
182+
.expect(401)
183+
.then(({ body }) =>
184+
expect(body.message).toMatchInlineSnapshot(`"Sorry, you aren't authenticated"`)
185+
);
186+
});
186187

187-
it('returns 404 if job not found', async () => {
188-
mockEsClient.search.mockResponseOnce(getHits());
189-
registerJobInfoRoutes(core);
188+
it('returns 404 if job not found', async () => {
189+
mockEsClient.search.mockResponseOnce(getHits());
190+
registerJobInfoRoutes(core);
190191

191-
await server.start();
192+
await server.start();
192193

193-
await supertest(httpSetup.server.listener)
194-
.get(`${INTERNAL_ROUTES.JOBS.DOWNLOAD_PREFIX}/poo`)
195-
.expect(404);
196-
});
194+
await supertest(httpSetup.server.listener)
195+
.get(`${INTERNAL_ROUTES.JOBS.DOWNLOAD_PREFIX}/poo`)
196+
.expect(404);
197+
});
197198

198-
it('returns a 403 if not a valid job type', async () => {
199-
mockEsClient.search.mockResponseOnce(
200-
getHits({
201-
jobtype: 'invalidJobType',
202-
payload: { title: 'invalid!' },
203-
})
204-
);
205-
registerJobInfoRoutes(core);
199+
it('returns a 403 if not a valid job type', async () => {
200+
mockEsClient.search.mockResponseOnce(
201+
getHits({
202+
jobtype: 'invalidJobType',
203+
payload: { title: 'invalid!' },
204+
})
205+
);
206+
registerJobInfoRoutes(core);
206207

207-
await server.start();
208+
await server.start();
208209

209-
await supertest(httpSetup.server.listener)
210-
.get(`${INTERNAL_ROUTES.JOBS.DOWNLOAD_PREFIX}/poo`)
211-
.expect(403);
212-
});
210+
await supertest(httpSetup.server.listener)
211+
.get(`${INTERNAL_ROUTES.JOBS.DOWNLOAD_PREFIX}/poo`)
212+
.expect(403);
213+
});
213214

214-
it(`returns job's info`, async () => {
215-
mockEsClient.search.mockResponseOnce(
216-
getHits({
217-
jobtype: mockJobTypeBase64Encoded,
218-
payload: {}, // payload is irrelevant
219-
})
220-
);
215+
it(`returns job's info`, async () => {
216+
mockEsClient.search.mockResponseOnce(
217+
getHits({
218+
jobtype: mockJobTypeBase64Encoded,
219+
payload: {}, // payload is irrelevant
220+
})
221+
);
221222

222-
registerJobInfoRoutes(core);
223+
registerJobInfoRoutes(core);
223224

224-
await server.start();
225+
await server.start();
225226

226-
await supertest(httpSetup.server.listener)
227-
.get(`${INTERNAL_ROUTES.JOBS.INFO_PREFIX}/test`)
228-
.expect(200);
229-
});
227+
await supertest(httpSetup.server.listener)
228+
.get(`${INTERNAL_ROUTES.JOBS.INFO_PREFIX}/test`)
229+
.expect(200);
230+
});
230231

231-
it(`returns 403 if a user cannot view a job's info`, async () => {
232-
mockEsClient.search.mockResponseOnce(
233-
getHits({
234-
jobtype: 'customForbiddenJobType',
235-
payload: {}, // payload is irrelevant
236-
})
237-
);
232+
it(`returns 403 if a user cannot view a job's info`, async () => {
233+
mockEsClient.search.mockResponseOnce(
234+
getHits({
235+
jobtype: 'customForbiddenJobType',
236+
payload: {}, // payload is irrelevant
237+
})
238+
);
238239

239-
registerJobInfoRoutes(core);
240+
registerJobInfoRoutes(core);
240241

241-
await server.start();
242+
await server.start();
242243

243-
await supertest(httpSetup.server.listener)
244-
.get(`${INTERNAL_ROUTES.JOBS.INFO_PREFIX}/test`)
245-
.expect(403);
246-
});
244+
await supertest(httpSetup.server.listener)
245+
.get(`${INTERNAL_ROUTES.JOBS.INFO_PREFIX}/test`)
246+
.expect(403);
247+
});
247248

248-
it('when a job is incomplete', async () => {
249-
mockEsClient.search.mockResponseOnce(
250-
getHits({
251-
jobtype: mockJobTypeUnencoded,
252-
status: 'pending',
253-
payload: { title: 'incomplete!' },
254-
})
255-
);
256-
registerJobInfoRoutes(core);
257-
258-
await server.start();
259-
await supertest(httpSetup.server.listener)
260-
.get(`${INTERNAL_ROUTES.JOBS.DOWNLOAD_PREFIX}/dank`)
261-
.expect(503)
262-
.expect('Content-Type', 'text/plain; charset=utf-8')
263-
.expect('Retry-After', '30')
264-
.then(({ text }) => expect(text).toEqual('pending'));
265-
});
249+
it('when a job is incomplete', async () => {
250+
mockEsClient.search.mockResponseOnce(
251+
getHits({
252+
jobtype: mockJobTypeUnencoded,
253+
status: 'pending',
254+
payload: { title: 'incomplete!' },
255+
})
256+
);
257+
registerJobInfoRoutes(core);
266258

267-
it('when a job fails', async () => {
268-
mockEsClient.search.mockResponse(
269-
getHits({
270-
jobtype: mockJobTypeUnencoded,
271-
status: 'failed',
272-
output: { content: 'job failure message' },
273-
payload: { title: 'failing job!' },
274-
})
275-
);
276-
registerJobInfoRoutes(core);
277-
278-
await server.start();
279-
await supertest(httpSetup.server.listener)
280-
.get(`${INTERNAL_ROUTES.JOBS.DOWNLOAD_PREFIX}/dank`)
281-
.expect(500)
282-
.expect('Content-Type', 'application/json; charset=utf-8')
283-
.then(({ body }) =>
284-
expect(body.message).toEqual('Reporting generation failed: job failure message')
259+
await server.start();
260+
await supertest(httpSetup.server.listener)
261+
.get(`${INTERNAL_ROUTES.JOBS.DOWNLOAD_PREFIX}/dank`)
262+
.expect(503)
263+
.expect('Content-Type', 'text/plain; charset=utf-8')
264+
.expect('Retry-After', '30')
265+
.then(({ text }) => expect(text).toEqual('pending'));
266+
});
267+
268+
it('when a job fails', async () => {
269+
mockEsClient.search.mockResponse(
270+
getHits({
271+
jobtype: mockJobTypeUnencoded,
272+
status: 'failed',
273+
output: { content: 'job failure message' },
274+
payload: { title: 'failing job!' },
275+
})
285276
);
286-
});
277+
registerJobInfoRoutes(core);
278+
279+
await server.start();
280+
await supertest(httpSetup.server.listener)
281+
.get(`${INTERNAL_ROUTES.JOBS.DOWNLOAD_PREFIX}/dank`)
282+
.expect(500)
283+
.expect('Content-Type', 'application/json; charset=utf-8')
284+
.then(({ body }) =>
285+
expect(body.message).toEqual('Reporting generation failed: job failure message')
286+
);
287+
});
287288

288-
describe('successful downloads', () => {
289289
it('when a known job-type is complete', async () => {
290290
mockEsClient.search.mockResponseOnce(getCompleteHits());
291291
registerJobInfoRoutes(core);
@@ -483,4 +483,28 @@ describe(`GET ${INTERNAL_ROUTES.JOBS.DOWNLOAD_PREFIX}`, () => {
483483
});
484484
});
485485
});
486+
487+
describe('delete report', () => {
488+
it('handles content stream errors', async () => {
489+
stream = new Readable({
490+
read() {
491+
this.push('test');
492+
this.push(null);
493+
},
494+
}) as typeof stream;
495+
stream.end = jest.fn().mockImplementation((_name, _encoding, callback) => {
496+
callback(new Error('An error occurred in ending the content stream'));
497+
});
498+
499+
(getContentStream as jest.MockedFunction<typeof getContentStream>).mockResolvedValue(stream);
500+
mockEsClient.search.mockResponseOnce(getCompleteHits());
501+
registerJobInfoRoutes(core);
502+
503+
await server.start();
504+
await supertest(httpSetup.server.listener)
505+
.delete(`${INTERNAL_ROUTES.JOBS.DELETE_PREFIX}/dank`)
506+
.expect(500)
507+
.expect('Content-Type', 'application/json; charset=utf-8');
508+
});
509+
});
486510
});

0 commit comments

Comments
 (0)