diff --git a/x-pack/plugins/ingest_manager/server/services/agents/actions.ts b/x-pack/plugins/ingest_manager/server/services/agents/actions.ts index 8a038e5de2675..f2cdd1f31e69f 100644 --- a/x-pack/plugins/ingest_manager/server/services/agents/actions.ts +++ b/x-pack/plugins/ingest_manager/server/services/agents/actions.ts @@ -247,11 +247,16 @@ export async function getNewActionsSince( nodeTypes.literal.buildNode(false), ]) ), + nodeTypes.function.buildNodeWithArgumentNodes('is', [ + nodeTypes.literal.buildNode(`${AGENT_ACTION_SAVED_OBJECT_TYPE}.attributes.agent_id`), + nodeTypes.wildcard.buildNode(nodeTypes.wildcard.wildcardSymbol), + nodeTypes.literal.buildNode(false), + ]), nodeTypes.function.buildNode( 'range', `${AGENT_ACTION_SAVED_OBJECT_TYPE}.attributes.created_at`, { - gte: timestamp, + gt: timestamp, } ), ]); diff --git a/x-pack/plugins/ingest_manager/server/services/agents/checkin/state_new_actions.test.ts b/x-pack/plugins/ingest_manager/server/services/agents/checkin/state_new_actions.test.ts index f4a2147131570..04f7f206b3bcb 100644 --- a/x-pack/plugins/ingest_manager/server/services/agents/checkin/state_new_actions.test.ts +++ b/x-pack/plugins/ingest_manager/server/services/agents/checkin/state_new_actions.test.ts @@ -4,11 +4,19 @@ * you may not use this file except in compliance with the Elastic License. */ import { savedObjectsClientMock } from 'src/core/server/mocks'; -import { createAgentActionFromPolicyAction } from './state_new_actions'; -import { OutputType, Agent, AgentPolicyAction } from '../../../types'; +import { take } from 'rxjs/operators'; +import { + createAgentActionFromPolicyAction, + createNewActionsSharedObservable, +} from './state_new_actions'; +import { getNewActionsSince } from '../actions'; +import { OutputType, Agent, AgentAction, AgentPolicyAction } from '../../../types'; jest.mock('../../app_context', () => ({ appContextService: { + getInternalUserSOClient: () => { + return {}; + }, getEncryptedSavedObjects: () => ({ getDecryptedAsInternalUser: () => ({ attributes: { @@ -19,7 +27,83 @@ jest.mock('../../app_context', () => ({ }, })); +jest.mock('../actions'); + +jest.useFakeTimers(); + +function waitForPromiseResolved() { + return new Promise((resolve) => setImmediate(resolve)); +} + +function getMockedNewActionSince() { + return getNewActionsSince as jest.MockedFunction; +} + describe('test agent checkin new action services', () => { + describe('newAgetActionObservable', () => { + beforeEach(() => { + (getNewActionsSince as jest.MockedFunction).mockReset(); + }); + it('should work, call get actions until there is new action', async () => { + const observable = createNewActionsSharedObservable(); + + getMockedNewActionSince() + .mockResolvedValueOnce([]) + .mockResolvedValueOnce([ + ({ id: 'action1', created_at: new Date().toISOString() } as unknown) as AgentAction, + ]) + .mockResolvedValueOnce([ + ({ id: 'action2', created_at: new Date().toISOString() } as unknown) as AgentAction, + ]); + // First call + const promise = observable.pipe(take(1)).toPromise(); + + jest.advanceTimersByTime(5000); + await waitForPromiseResolved(); + jest.advanceTimersByTime(5000); + await waitForPromiseResolved(); + + const res = await promise; + expect(getNewActionsSince).toBeCalledTimes(2); + expect(res).toHaveLength(1); + expect(res[0].id).toBe('action1'); + // Second call + const secondSubscription = observable.pipe(take(1)).toPromise(); + + jest.advanceTimersByTime(5000); + await waitForPromiseResolved(); + + const secondRes = await secondSubscription; + expect(secondRes).toHaveLength(1); + expect(secondRes[0].id).toBe('action2'); + expect(getNewActionsSince).toBeCalledTimes(3); + // It should call getNewActionsSince with the last action returned + expect(getMockedNewActionSince().mock.calls[2][1]).toBe(res[0].created_at); + }); + + it('should not fetch actions concurrently', async () => { + const observable = createNewActionsSharedObservable(); + + const resolves: Array<() => void> = []; + getMockedNewActionSince().mockImplementation(() => { + return new Promise((resolve) => { + resolves.push(resolve); + }); + }); + + observable.pipe(take(1)).toPromise(); + + jest.advanceTimersByTime(5000); + await waitForPromiseResolved(); + jest.advanceTimersByTime(5000); + await waitForPromiseResolved(); + jest.advanceTimersByTime(5000); + await waitForPromiseResolved(); + + expect(getNewActionsSince).toBeCalledTimes(1); + }); + }); + describe('createAgentActionFromPolicyAction()', () => { const mockSavedObjectsClient = savedObjectsClientMock.create(); const mockAgent: Agent = { diff --git a/x-pack/plugins/ingest_manager/server/services/agents/checkin/state_new_actions.ts b/x-pack/plugins/ingest_manager/server/services/agents/checkin/state_new_actions.ts index 1871cd2cb04f6..aa48d8fe18e9f 100644 --- a/x-pack/plugins/ingest_manager/server/services/agents/checkin/state_new_actions.ts +++ b/x-pack/plugins/ingest_manager/server/services/agents/checkin/state_new_actions.ts @@ -12,6 +12,7 @@ import { share, distinctUntilKeyChanged, switchMap, + exhaustMap, concatMap, merge, filter, @@ -62,18 +63,28 @@ function getInternalUserSOClient() { return appContextService.getInternalUserSOClient(fakeRequest); } -function createNewActionsSharedObservable(): Observable { +export function createNewActionsSharedObservable(): Observable { let lastTimestamp = new Date().toISOString(); return timer(0, AGENT_UPDATE_ACTIONS_INTERVAL_MS).pipe( - switchMap(() => { + exhaustMap(() => { const internalSOClient = getInternalUserSOClient(); - const timestamp = lastTimestamp; - lastTimestamp = new Date().toISOString(); - return from(getNewActionsSince(internalSOClient, timestamp)); + return from( + getNewActionsSince(internalSOClient, lastTimestamp).then((data) => { + if (data.length > 0) { + lastTimestamp = data.reduce((acc, action) => { + return acc >= action.created_at ? acc : action.created_at; + }, lastTimestamp); + } + + return data; + }) + ); + }), + filter((data) => { + return data.length > 0; }), - filter((data) => data.length > 0), share() ); }