Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .i18nrc.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
"console": "src/plugins/console",
"core": "src/core",
"discover": "src/plugins/discover",
"bfetch": "src/plugins/bfetch",
"dashboard": "src/plugins/dashboard",
"data": "src/plugins/data",
"embeddableApi": "src/plugins/embeddable",
Expand Down
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,7 @@
"expiry-js": "0.1.7",
"extract-zip": "^2.0.1",
"fast-deep-equal": "^3.1.1",
"fflate": "^0.6.9",
"file-saver": "^1.3.8",
"file-type": "^10.9.0",
"focus-trap-react": "^3.1.1",
Expand Down
4 changes: 2 additions & 2 deletions packages/kbn-optimizer/limits.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ pageLoadAssetSize:
alerting: 106936
apm: 64385
apmOss: 18996
bfetch: 41874
canvas: 1065624
bfetch: 51874
canvas: 1066647
charts: 195358
cloud: 21076
console: 46235
Expand Down
2 changes: 2 additions & 0 deletions packages/kbn-ui-shared-deps/entry.js
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ export const Theme = require('./theme.ts');
export const Lodash = require('lodash');
export const LodashFp = require('lodash/fp');

export const Fflate = require('fflate/esm/browser');

// runtime deps which don't need to be copied across all bundles
export const TsLib = require('tslib');
export const KbnAnalytics = require('@kbn/analytics');
Expand Down
1 change: 1 addition & 0 deletions packages/kbn-ui-shared-deps/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ exports.externals = {
'@elastic/eui/dist/eui_theme_dark.json': '__kbnSharedDeps__.Theme.euiDarkVars',
lodash: '__kbnSharedDeps__.Lodash',
'lodash/fp': '__kbnSharedDeps__.LodashFp',
fflate: '__kbnSharedDeps__.Fflate',

/**
* runtime deps which don't need to be copied across all bundles
Expand Down
5 changes: 5 additions & 0 deletions src/plugins/bfetch/common/batch.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,3 +19,8 @@ export interface BatchResponseItem<Result extends object, Error extends ErrorLik
result?: Result;
error?: Error;
}

export interface BatchItemWrapper {
compressed: boolean;
payload: string;
}
9 changes: 9 additions & 0 deletions src/plugins/bfetch/common/constants.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

export const DISABLE_BFETCH_COMPRESSION = 'bfetch:disableCompression';
1 change: 1 addition & 0 deletions src/plugins/bfetch/common/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,4 @@ export * from './util';
export * from './streaming';
export * from './buffer';
export * from './batch';
export * from './constants';
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@
import { createStreamingBatchedFunction } from './create_streaming_batched_function';
import { fetchStreaming as fetchStreamingReal } from '../streaming/fetch_streaming';
import { AbortError, defer, of } from '../../../kibana_utils/public';
import { Subject } from 'rxjs';
import { Subject, of as rxof } from 'rxjs';

const flushPromises = () => new Promise((resolve) => setImmediate(resolve));

const getPromiseState = (promise: Promise<unknown>): Promise<'resolved' | 'rejected' | 'pending'> =>
Promise.race<'resolved' | 'rejected' | 'pending'>([
Expand Down Expand Up @@ -52,6 +54,7 @@ describe('createStreamingBatchedFunction()', () => {
const fn = createStreamingBatchedFunction({
url: '/test',
fetchStreaming,
compressionDisabled$: rxof(true),
});
expect(typeof fn).toBe('function');
});
Expand All @@ -61,6 +64,7 @@ describe('createStreamingBatchedFunction()', () => {
const fn = createStreamingBatchedFunction({
url: '/test',
fetchStreaming,
compressionDisabled$: rxof(true),
});
const res = fn({});
expect(typeof res.then).toBe('function');
Expand All @@ -74,6 +78,7 @@ describe('createStreamingBatchedFunction()', () => {
fetchStreaming,
maxItemAge: 5,
flushOnMaxItems: 3,
compressionDisabled$: rxof(true),
});

expect(fetchStreaming).toHaveBeenCalledTimes(0);
Expand All @@ -93,6 +98,7 @@ describe('createStreamingBatchedFunction()', () => {
fetchStreaming,
maxItemAge: 5,
flushOnMaxItems: 3,
compressionDisabled$: rxof(true),
});

expect(fetchStreaming).toHaveBeenCalledTimes(0);
Expand All @@ -107,6 +113,7 @@ describe('createStreamingBatchedFunction()', () => {
fetchStreaming,
maxItemAge: 5,
flushOnMaxItems: 3,
compressionDisabled$: rxof(true),
});

fn({ foo: 'bar' });
Expand All @@ -125,6 +132,7 @@ describe('createStreamingBatchedFunction()', () => {
fetchStreaming,
maxItemAge: 5,
flushOnMaxItems: 3,
compressionDisabled$: rxof(true),
});

fn({ foo: 'bar' });
Expand All @@ -146,14 +154,18 @@ describe('createStreamingBatchedFunction()', () => {
fetchStreaming,
maxItemAge: 5,
flushOnMaxItems: 3,
compressionDisabled$: rxof(true),
});

expect(fetchStreaming).toHaveBeenCalledTimes(0);
fn({ foo: 'bar' });
await flushPromises();
expect(fetchStreaming).toHaveBeenCalledTimes(0);
fn({ baz: 'quix' });
await flushPromises();
expect(fetchStreaming).toHaveBeenCalledTimes(0);
fn({ full: 'yep' });
await flushPromises();
expect(fetchStreaming).toHaveBeenCalledTimes(1);
});

Expand All @@ -164,6 +176,7 @@ describe('createStreamingBatchedFunction()', () => {
fetchStreaming,
maxItemAge: 5,
flushOnMaxItems: 3,
compressionDisabled$: rxof(true),
});

const abortController = new AbortController();
Expand All @@ -186,11 +199,13 @@ describe('createStreamingBatchedFunction()', () => {
fetchStreaming,
maxItemAge: 5,
flushOnMaxItems: 3,
compressionDisabled$: rxof(true),
});

fn({ a: '1' });
fn({ b: '2' });
fn({ c: '3' });
await flushPromises();

expect(fetchStreaming.mock.calls[0][0]).toMatchObject({
url: '/test',
Expand All @@ -209,13 +224,16 @@ describe('createStreamingBatchedFunction()', () => {
fetchStreaming,
maxItemAge: 5,
flushOnMaxItems: 3,
compressionDisabled$: rxof(true),
});

fn({ a: '1' });
fn({ b: '2' });
fn({ c: '3' });
await flushPromises();
expect(fetchStreaming).toHaveBeenCalledTimes(1);
fn({ d: '4' });
await flushPromises();
await new Promise((r) => setTimeout(r, 6));
expect(fetchStreaming).toHaveBeenCalledTimes(2);
});
Expand All @@ -229,6 +247,7 @@ describe('createStreamingBatchedFunction()', () => {
fetchStreaming,
maxItemAge: 5,
flushOnMaxItems: 3,
compressionDisabled$: rxof(true),
});

const promise1 = fn({ a: '1' });
Expand All @@ -246,8 +265,11 @@ describe('createStreamingBatchedFunction()', () => {
fetchStreaming,
maxItemAge: 5,
flushOnMaxItems: 3,
compressionDisabled$: rxof(true),
});

await flushPromises();

const promise1 = fn({ a: '1' });
const promise2 = fn({ b: '2' });
const promise3 = fn({ c: '3' });
Expand Down Expand Up @@ -287,6 +309,7 @@ describe('createStreamingBatchedFunction()', () => {
fetchStreaming,
maxItemAge: 5,
flushOnMaxItems: 3,
compressionDisabled$: rxof(true),
});

const promise1 = fn({ a: '1' });
Expand Down Expand Up @@ -314,13 +337,28 @@ describe('createStreamingBatchedFunction()', () => {
expect(await promise3).toEqual({ foo: 'bar 2' });
});

test('compression is false by default', async () => {
const { fetchStreaming } = setup();
const fn = createStreamingBatchedFunction({
url: '/test',
flushOnMaxItems: 1,
fetchStreaming,
});

fn({ a: '1' });

const dontCompress = await fetchStreaming.mock.calls[0][0].compressionDisabled$.toPromise();
expect(dontCompress).toBe(false);
});

test('resolves falsy results', async () => {
const { fetchStreaming, stream } = setup();
const fn = createStreamingBatchedFunction({
url: '/test',
fetchStreaming,
maxItemAge: 5,
flushOnMaxItems: 3,
compressionDisabled$: rxof(true),
});

const promise1 = fn({ a: '1' });
Expand Down Expand Up @@ -362,6 +400,7 @@ describe('createStreamingBatchedFunction()', () => {
fetchStreaming,
maxItemAge: 5,
flushOnMaxItems: 3,
compressionDisabled$: rxof(true),
});

const promise = fn({ a: '1' });
Expand Down Expand Up @@ -390,6 +429,7 @@ describe('createStreamingBatchedFunction()', () => {
fetchStreaming,
maxItemAge: 5,
flushOnMaxItems: 3,
compressionDisabled$: rxof(true),
});

const promise1 = of(fn({ a: '1' }));
Expand Down Expand Up @@ -442,6 +482,7 @@ describe('createStreamingBatchedFunction()', () => {
fetchStreaming,
maxItemAge: 5,
flushOnMaxItems: 3,
compressionDisabled$: rxof(true),
});

const abortController = new AbortController();
Expand Down Expand Up @@ -471,6 +512,7 @@ describe('createStreamingBatchedFunction()', () => {
fetchStreaming,
maxItemAge: 5,
flushOnMaxItems: 3,
compressionDisabled$: rxof(true),
});

const abortController = new AbortController();
Expand Down Expand Up @@ -509,6 +551,7 @@ describe('createStreamingBatchedFunction()', () => {
fetchStreaming,
maxItemAge: 5,
flushOnMaxItems: 3,
compressionDisabled$: rxof(true),
});

const promise1 = of(fn({ a: '1' }));
Expand Down Expand Up @@ -539,6 +582,7 @@ describe('createStreamingBatchedFunction()', () => {
fetchStreaming,
maxItemAge: 5,
flushOnMaxItems: 3,
compressionDisabled$: rxof(true),
});

const promise1 = of(fn({ a: '1' }));
Expand Down Expand Up @@ -576,6 +620,7 @@ describe('createStreamingBatchedFunction()', () => {
fetchStreaming,
maxItemAge: 5,
flushOnMaxItems: 3,
compressionDisabled$: rxof(true),
});

const promise1 = of(fn({ a: '1' }));
Expand Down Expand Up @@ -608,6 +653,7 @@ describe('createStreamingBatchedFunction()', () => {
fetchStreaming,
maxItemAge: 5,
flushOnMaxItems: 3,
compressionDisabled$: rxof(true),
});

const promise1 = of(fn({ a: '1' }));
Expand Down Expand Up @@ -644,7 +690,9 @@ describe('createStreamingBatchedFunction()', () => {
fetchStreaming,
maxItemAge: 5,
flushOnMaxItems: 3,
compressionDisabled$: rxof(true),
});
await flushPromises();

const promise1 = of(fn({ a: '1' }));
const promise2 = of(fn({ a: '2' }));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,16 @@
* Side Public License, v 1.
*/

import { Observable, of } from 'rxjs';
import { AbortError, abortSignalToPromise, defer } from '../../../kibana_utils/public';
import {
ItemBufferParams,
TimedItemBufferParams,
createBatchedFunction,
BatchResponseItem,
ErrorLike,
normalizeError,
} from '../../common';
import { fetchStreaming, split } from '../streaming';
import { normalizeError } from '../../common';
import { fetchStreaming } from '../streaming';
import { BatchedFunc, BatchItem } from './types';

export interface BatchedFunctionProtocolError extends ErrorLike {
Expand Down Expand Up @@ -47,6 +47,11 @@ export interface StreamingBatchedFunctionParams<Payload, Result> {
* before sending the batch request.
*/
maxItemAge?: TimedItemBufferParams<any>['maxItemAge'];

/**
* Disabled zlib compression of response chunks.
*/
compressionDisabled$?: Observable<boolean>;
}

/**
Expand All @@ -64,6 +69,7 @@ export const createStreamingBatchedFunction = <Payload, Result extends object>(
fetchStreaming: fetchStreamingInjected = fetchStreaming,
flushOnMaxItems = 25,
maxItemAge = 10,
compressionDisabled$ = of(false),
} = params;
const [fn] = createBatchedFunction({
onCall: (payload: Payload, signal?: AbortSignal) => {
Expand Down Expand Up @@ -119,6 +125,7 @@ export const createStreamingBatchedFunction = <Payload, Result extends object>(
body: JSON.stringify({ batch }),
method: 'POST',
signal: abortController.signal,
compressionDisabled$,
});

const handleStreamError = (error: any) => {
Expand All @@ -127,10 +134,10 @@ export const createStreamingBatchedFunction = <Payload, Result extends object>(
for (const { future } of items) future.reject(normalizedError);
};

stream.pipe(split('\n')).subscribe({
stream.subscribe({
next: (json: string) => {
try {
const response = JSON.parse(json) as BatchResponseItem<Result, ErrorLike>;
const response = JSON.parse(json);
if (response.error) {
items[response.id].future.reject(response.error);
} else if (response.result !== undefined) {
Expand Down
12 changes: 12 additions & 0 deletions src/plugins/bfetch/public/batching/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

export {
createStreamingBatchedFunction,
StreamingBatchedFunctionParams,
} from './create_streaming_batched_function';
Loading