Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
<!-- Do not edit this file. It is automatically generated by API Documenter. -->

[Home](./index.md) &gt; [kibana-plugin-core-server](./kibana-plugin-core-server.md) &gt; [KibanaRequestEvents](./kibana-plugin-core-server.kibanarequestevents.md) &gt; [completed$](./kibana-plugin-core-server.kibanarequestevents.completed_.md)

## KibanaRequestEvents.completed$ property

Observable that emits once if and when the request has been completely handled.

<b>Signature:</b>

```typescript
completed$: Observable<void>;
```

## Remarks

The request may be considered completed if: - A response has been sent to the client; or - The request was aborted.

Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,5 @@ export interface KibanaRequestEvents
| Property | Type | Description |
| --- | --- | --- |
| [aborted$](./kibana-plugin-core-server.kibanarequestevents.aborted_.md) | <code>Observable&lt;void&gt;</code> | Observable that emits once if and when the request has been aborted. |
| [completed$](./kibana-plugin-core-server.kibanarequestevents.completed_.md) | <code>Observable&lt;void&gt;</code> | Observable that emits once if and when the request has been completely handled. |

91 changes: 91 additions & 0 deletions src/core/server/http/integration_tests/request.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import { HttpService } from '../http_service';
import { contextServiceMock } from '../../context/context_service.mock';
import { loggingSystemMock } from '../../logging/logging_system.mock';
import { createHttpServer } from '../test_utils';
import { schema } from '@kbn/config-schema';

let server: HttpService;

Expand Down Expand Up @@ -195,6 +196,96 @@ describe('KibanaRequest', () => {
expect(nextSpy).toHaveBeenCalledTimes(0);
expect(completeSpy).toHaveBeenCalledTimes(1);
});

it('does not complete before response has been sent', async () => {
const { server: innerServer, createRouter, registerOnPreAuth } = await server.setup(
setupDeps
);
const router = createRouter('/');

const nextSpy = jest.fn();
const completeSpy = jest.fn();

registerOnPreAuth((req, res, toolkit) => {
req.events.aborted$.subscribe({
next: nextSpy,
complete: completeSpy,
});
return toolkit.next();
});

router.post(
{ path: '/', validate: { body: schema.any() } },
async (context, request, res) => {
expect(completeSpy).not.toHaveBeenCalled();
return res.ok({ body: 'ok' });
}
);

await server.start();

await supertest(innerServer.listener).post('/').send({ data: 'test' }).expect(200);

expect(nextSpy).toHaveBeenCalledTimes(0);
expect(completeSpy).toHaveBeenCalledTimes(1);
});
});

describe('completed$', () => {
it('emits once and completes when response is sent', async () => {
const { server: innerServer, createRouter } = await server.setup(setupDeps);
const router = createRouter('/');

const nextSpy = jest.fn();
const completeSpy = jest.fn();

router.get({ path: '/', validate: false }, async (context, req, res) => {
req.events.completed$.subscribe({
next: nextSpy,
complete: completeSpy,
});

expect(nextSpy).not.toHaveBeenCalled();
expect(completeSpy).not.toHaveBeenCalled();
return res.ok({ body: 'ok' });
});

await server.start();

await supertest(innerServer.listener).get('/').expect(200);
expect(nextSpy).toHaveBeenCalledTimes(1);
expect(completeSpy).toHaveBeenCalledTimes(1);
});

it('emits once and completes when response is aborted', async (done) => {
expect.assertions(2);
const { server: innerServer, createRouter } = await server.setup(setupDeps);
const router = createRouter('/');

const nextSpy = jest.fn();

router.get({ path: '/', validate: false }, async (context, req, res) => {
req.events.completed$.subscribe({
next: nextSpy,
complete: () => {
expect(nextSpy).toHaveBeenCalledTimes(1);
done();
},
});

expect(nextSpy).not.toHaveBeenCalled();
await delay(30000);
return res.ok({ body: 'ok' });
});

await server.start();

const incomingRequest = supertest(innerServer.listener)
.get('/')
// end required to send request
.end();
setTimeout(() => incomingRequest.abort(), 50);
});
});
});
});
19 changes: 17 additions & 2 deletions src/core/server/http/router/request.ts
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,16 @@ export interface KibanaRequestEvents {
* Observable that emits once if and when the request has been aborted.
*/
aborted$: Observable<void>;

/**
* Observable that emits once if and when the request has been completely handled.
*
* @remarks
* The request may be considered completed if:
* - A response has been sent to the client; or
* - The request was aborted.
*/
completed$: Observable<void>;
}

/**
Expand Down Expand Up @@ -186,11 +196,16 @@ export class KibanaRequest<

private getEvents(request: Request): KibanaRequestEvents {
const finish$ = merge(
fromEvent(request.raw.req, 'end'), // all data consumed
fromEvent(request.raw.res, 'finish'), // Response has been sent
fromEvent(request.raw.req, 'close') // connection was closed
).pipe(shareReplay(1), first());

const aborted$ = fromEvent<void>(request.raw.req, 'aborted').pipe(first(), takeUntil(finish$));
const completed$ = merge<void, void>(finish$, aborted$).pipe(shareReplay(1), first());

return {
aborted$: fromEvent<void>(request.raw.req, 'aborted').pipe(first(), takeUntil(finish$)),
aborted$,
completed$,
} as const;
}

Expand Down
1 change: 1 addition & 0 deletions src/core/server/server.api.md
Original file line number Diff line number Diff line change
Expand Up @@ -1071,6 +1071,7 @@ export class KibanaRequest<Params = unknown, Query = unknown, Body = unknown, Me
// @public
export interface KibanaRequestEvents {
aborted$: Observable<void>;
completed$: Observable<void>;
}

// @public
Expand Down