Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,12 @@ export {
apiCanLockHoverActions,
type CanLockHoverActions,
} from './interfaces/can_lock_hover_actions';
export { fetch$, useFetchContext, type FetchContext } from './interfaces/fetch/fetch';
export { fetch$, useFetchContext } from './interfaces/fetch/fetch';
export type { FetchContext } from './interfaces/fetch/fetch_context';
export {
type PublishesPauseFetch,
apiPublishesPauseFetch,
} from './interfaces/fetch/publishes_pause_fetch';
export {
initializeTimeRangeManager,
timeRangeComparators,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,58 @@ describe('onFetchContextChanged', () => {
});
});

describe('with isFetchPaused$', () => {
test('should skip emits while fetch is paused', async () => {
const isFetchPaused$ = new BehaviorSubject<boolean>(true);
const api = {
parentApi,
isFetchPaused$,
};
const subscription = fetch$(api).subscribe(onFetchMock);

parentApi.filters$.next([]);
parentApi.query$.next({ language: 'kquery', query: 'hello' });
parentApi.reload$.next();

await new Promise((resolve) => setTimeout(resolve, 0));
expect(onFetchMock).not.toHaveBeenCalled();

subscription.unsubscribe();
});

test('should emit most recent context when fetch becomes un-paused', async () => {
const isFetchPaused$ = new BehaviorSubject<boolean>(true);
const api = {
parentApi,
isFetchPaused$,
};
const subscription = fetch$(api).subscribe(onFetchMock);

parentApi.filters$.next([]);
parentApi.query$.next({ language: 'kquery', query: '' });
parentApi.reload$.next();

isFetchPaused$.next(false);

await new Promise((resolve) => setTimeout(resolve, 100));
expect(onFetchMock).toHaveBeenCalledTimes(1);
const fetchContext = onFetchMock.mock.calls[0][0];
expect(fetchContext).toEqual({
filters: [],
isReload: true,
query: {
language: 'kquery',
query: '',
},
searchSessionId: undefined,
timeRange: undefined,
timeslice: undefined,
});

subscription.unsubscribe();
});
});

describe('no searchSession$', () => {
test('should emit once on reload', async () => {
const subscription = fetch$({ parentApi }).pipe(skip(1)).subscribe(onFetchMock);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,49 +7,49 @@
* License v3.0 only", or the "Server Side Public License, v 1".
*/

import type { Observable } from 'rxjs';
import { useEffect, useMemo } from 'react';
import {
BehaviorSubject,
Subject,
combineLatest,
combineLatestWith,
debounceTime,
delay,
distinctUntilChanged,
filter,
map,
merge,
of,
skip,
startWith,
Subject,
switchMap,
takeUntil,
tap,
type Observable,
} from 'rxjs';
import type { AggregateQuery, Filter, Query, TimeRange } from '@kbn/es-query';
import { useMemo, useEffect } from 'react';
import type { PublishesTimeRange, PublishesUnifiedSearch } from './publishes_unified_search';
import { apiPublishesTimeRange, apiPublishesUnifiedSearch } from './publishes_unified_search';
import type { PublishesSearchSession } from './publishes_search_session';
import { apiPublishesSearchSession } from './publishes_search_session';
import type { HasParentApi } from '../has_parent_api';
import { apiHasParentApi } from '../has_parent_api';
import { apiPublishesReload } from './publishes_reload';
import { useStateFromPublishingSubject } from '../../publishing_subject';
import { apiHasParentApi, type HasParentApi } from '../has_parent_api';
import {
type FetchContext,
type ReloadTimeFetchContext,
isReloadTimeFetchContextEqual,
} from './fetch_context';
import { apiPublishesPauseFetch } from './publishes_pause_fetch';
import { apiPublishesReload } from './publishes_reload';
import { apiPublishesSearchSession, type PublishesSearchSession } from './publishes_search_session';
import {
apiPublishesTimeRange,
apiPublishesUnifiedSearch,
type PublishesTimeRange,
type PublishesUnifiedSearch,
} from './publishes_unified_search';

export interface FetchContext {
isReload: boolean;
filters: Filter[] | undefined;
query: Query | AggregateQuery | undefined;
searchSessionId: string | undefined;
timeRange: TimeRange | undefined;
timeslice: [number, number] | undefined;
}

function getFetchContext(api: unknown, isReload: boolean) {
function getReloadTimeFetchContext(api: unknown, reloadTimestamp?: number): ReloadTimeFetchContext {
const typeApi = api as Partial<
PublishesTimeRange & HasParentApi<Partial<PublishesUnifiedSearch & PublishesSearchSession>>
>;
return {
isReload,
reloadTimestamp,
filters: typeApi?.parentApi?.filters$?.value,
query: typeApi?.parentApi?.query$?.value,
searchSessionId: typeApi?.parentApi?.searchSessionId$?.value,
Expand Down Expand Up @@ -121,36 +121,69 @@ export function fetch$(api: unknown): Observable<FetchContext> {
const batchedObservables = getBatchedObservables(api);
const immediateObservables = getImmediateObservables(api);

if (immediateObservables.length === 0) {
return merge(...batchedObservables).pipe(
startWith(getFetchContext(api, false)),
debounceTime(0),
map(() => getFetchContext(api, false))
const fetchContext$ = (() => {
if (immediateObservables.length === 0) {
return merge(...batchedObservables).pipe(
startWith(getReloadTimeFetchContext(api)),
debounceTime(0),
map(() => getReloadTimeFetchContext(api))
);
}
const interrupt = new Subject<void>();
const batchedChanges$ = merge(...batchedObservables).pipe(
switchMap((value) =>
of(value).pipe(
delay(0),
takeUntil(interrupt),
map(() => getReloadTimeFetchContext(api))
)
)
);
}

const interrupt = new Subject<void>();
const batchedChanges$ = merge(...batchedObservables).pipe(
switchMap((value) =>
of(value).pipe(
delay(0),
takeUntil(interrupt),
map(() => getFetchContext(api, false))
)
const immediateChange$ = merge(...immediateObservables).pipe(
tap(() => {
interrupt.next();
}),
map(() => getReloadTimeFetchContext(api, Date.now()))
);
return merge(immediateChange$, batchedChanges$).pipe(startWith(getReloadTimeFetchContext(api)));
})();

const parentPauseFetch =
apiHasParentApi(api) && apiPublishesPauseFetch(api.parentApi)
? api.parentApi.isFetchPaused$
: of(false);
const apiPauseFetch = apiPublishesPauseFetch(api) ? api.isFetchPaused$ : of(false);
const isFetchPaused$ = combineLatest([parentPauseFetch, apiPauseFetch]).pipe(
map(
([parentRequestingPause, apiRequestingPause]) => parentRequestingPause || apiRequestingPause
)
);

const immediateChange$ = merge(...immediateObservables).pipe(
tap(() => interrupt.next()),
map(() => getFetchContext(api, true))
return fetchContext$.pipe(
combineLatestWith(isFetchPaused$),
filter(([, isFetchPaused]) => !isFetchPaused),
map(([fetchContext]) => fetchContext),
distinctUntilChanged((prevContext, nextContext) =>
isReloadTimeFetchContextEqual(prevContext, nextContext)
),
map((reloadTimeFetchContext) => ({
isReload: Boolean(reloadTimeFetchContext.reloadTimestamp),
filters: reloadTimeFetchContext.filters,
query: reloadTimeFetchContext.query,
timeRange: reloadTimeFetchContext.timeRange,
timeslice: reloadTimeFetchContext.timeslice,
searchSessionId: reloadTimeFetchContext.searchSessionId,
}))
);

return merge(immediateChange$, batchedChanges$).pipe(startWith(getFetchContext(api, false)));
}

export const useFetchContext = (api: unknown): FetchContext => {
const context$: BehaviorSubject<FetchContext> = useMemo(() => {
return new BehaviorSubject<FetchContext>(getFetchContext(api, false));
return new BehaviorSubject<FetchContext>({
...getReloadTimeFetchContext(api),
isReload: false,
});
// eslint-disable-next-line react-hooks/exhaustive-deps
}, []);

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

import type { AggregateQuery, Filter, Query, TimeRange } from '@kbn/es-query';
import { COMPARE_ALL_OPTIONS, onlyDisabledFiltersChanged } from '@kbn/es-query';
import fastIsEqual from 'fast-deep-equal';

export interface FetchContext {
isReload: boolean;
filters: Filter[] | undefined;
query: Query | AggregateQuery | undefined;
searchSessionId: string | undefined;
timeRange: TimeRange | undefined;
timeslice: [number, number] | undefined;
}

export interface ReloadTimeFetchContext extends Omit<FetchContext, 'isReload'> {
reloadTimestamp?: number;
}

export function isReloadTimeFetchContextEqual(
currentContext: ReloadTimeFetchContext,
lastContext: ReloadTimeFetchContext
): boolean {
if (currentContext.searchSessionId !== lastContext.searchSessionId) return false;

return (
isReloadTimestampEqualForFetch(currentContext.reloadTimestamp, lastContext.reloadTimestamp) &&
areFiltersEqualForFetch(currentContext.filters, lastContext.filters) &&
isQueryEqualForFetch(currentContext.query, lastContext.query) &&
isTimeRangeEqualForFetch(currentContext.timeRange, lastContext.timeRange) &&
isTimeSliceEqualForFetch(currentContext.timeslice, lastContext.timeslice)
);
}

export const areFiltersEqualForFetch = (currentFilters?: Filter[], lastFilters?: Filter[]) => {
return onlyDisabledFiltersChanged(currentFilters, lastFilters, {
...COMPARE_ALL_OPTIONS,
// do not compare $state to avoid refreshing when filter is pinned/unpinned (which does not impact results)
state: false,
});
};

export const isReloadTimestampEqualForFetch = (
currentReloadTimestamp?: number,
lastReloadTimestamp?: number
) => {
if (!currentReloadTimestamp) return true; // if current reload timestamp is not set, this is not a force refresh.
return currentReloadTimestamp === lastReloadTimestamp;
};

export const isQueryEqualForFetch = (
currentQuery: Query | AggregateQuery | undefined,
lastQuery: Query | AggregateQuery | undefined
) => fastIsEqual(currentQuery, lastQuery);

export const isTimeRangeEqualForFetch = (
currentTimeRange: TimeRange | undefined,
lastTimeRange: TimeRange | undefined
) => fastIsEqual(currentTimeRange, lastTimeRange);

export const isTimeSliceEqualForFetch = (
currentTimeslice: [number, number] | undefined,
lastTimeslice: [number, number] | undefined
) => fastIsEqual(currentTimeslice, lastTimeslice);
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

import type { Observable } from 'rxjs';

export interface PublishesPauseFetch {
isFetchPaused$: Observable<boolean>;
}

export const apiPublishesPauseFetch = (
unknownApi: null | unknown
): unknownApi is PublishesPauseFetch => {
return Boolean(unknownApi && (unknownApi as PublishesPauseFetch)?.isFetchPaused$ !== undefined);
};
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
import type { Observable } from 'rxjs';

export interface PublishesReload {
reload$: Omit<Observable<void>, 'next'>;
reload$: Observable<void>;
Comment thread
nreese marked this conversation as resolved.
}

export const apiPublishesReload = (unknownApi: null | unknown): unknownApi is PublishesReload => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,14 @@
*/

import type { AggregateQuery, Filter, Query, TimeRange } from '@kbn/es-query';
import { COMPARE_ALL_OPTIONS, onlyDisabledFiltersChanged } from '@kbn/es-query';
import fastIsEqual from 'fast-deep-equal';
import { useEffect, useMemo } from 'react';
import { BehaviorSubject } from 'rxjs';
import type { PublishingSubject } from '../../publishing_subject';
import {
areFiltersEqualForFetch,
isQueryEqualForFetch,
isTimeRangeEqualForFetch,
} from './fetch_context';

export interface PublishesTimeslice {
timeslice$: PublishingSubject<[number, number] | undefined>;
Expand Down Expand Up @@ -115,25 +118,19 @@ export function useSearchApi({
}, []);

useEffect(() => {
if (
!onlyDisabledFiltersChanged(searchApi.filters$.getValue(), filters, {
...COMPARE_ALL_OPTIONS,
// do not compare $state to avoid refreshing when filter is pinned/unpinned (which does not impact results)
state: false,
})
) {
if (!areFiltersEqualForFetch(searchApi.filters$.getValue(), filters)) {
searchApi.filters$.next(filters);
}
}, [filters, searchApi.filters$]);

useEffect(() => {
if (!fastIsEqual(searchApi.query$.getValue(), query)) {
if (!isQueryEqualForFetch(searchApi.query$.getValue(), query)) {
searchApi.query$.next(query);
}
}, [query, searchApi.query$]);

useEffect(() => {
if (!fastIsEqual(searchApi.timeRange$.getValue(), timeRange)) {
if (!isTimeRangeEqualForFetch(searchApi.timeRange$.getValue(), timeRange)) {
searchApi.timeRange$.next(timeRange);
}
}, [timeRange, searchApi.timeRange$]);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,7 @@ describe('saved search embeddable', () => {
expect(resolveDataSourceProfileSpy).not.toHaveBeenCalled();

// trigger a refetch
dashboadFilters.next([]);
dashboadFilters.next([{ meta: {} }]);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What does meta do? Should this use reload API?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This PR introduces diffing into the keys of the fetch context before firing the requests. Adding this blank filter object triggers a diff which makes the request fire again. Meta is there because it's required by the type.

In this test, firing an update with a blank array fails the diff and doesn't trigger a refetch. I thought that using filters here would retain the spirit of the test better than switching to reload, but either would work IMO.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the explanation. Makes sense and its only a test

await waitOneTick();
expect(resolveDataSourceProfileSpy).toHaveBeenCalled();
});
Expand Down
Loading