Skip to content

Commit b1ec36c

Browse files
committed
[Fleet] Fix agent action observable for long polling (#81376)
1 parent affdeb4 commit b1ec36c

File tree

3 files changed

+109
-9
lines changed

3 files changed

+109
-9
lines changed

x-pack/plugins/ingest_manager/server/services/agents/actions.ts

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -247,11 +247,16 @@ export async function getNewActionsSince(
247247
nodeTypes.literal.buildNode(false),
248248
])
249249
),
250+
nodeTypes.function.buildNodeWithArgumentNodes('is', [
251+
nodeTypes.literal.buildNode(`${AGENT_ACTION_SAVED_OBJECT_TYPE}.attributes.agent_id`),
252+
nodeTypes.wildcard.buildNode(nodeTypes.wildcard.wildcardSymbol),
253+
nodeTypes.literal.buildNode(false),
254+
]),
250255
nodeTypes.function.buildNode(
251256
'range',
252257
`${AGENT_ACTION_SAVED_OBJECT_TYPE}.attributes.created_at`,
253258
{
254-
gte: timestamp,
259+
gt: timestamp,
255260
}
256261
),
257262
]);

x-pack/plugins/ingest_manager/server/services/agents/checkin/state_new_actions.test.ts

Lines changed: 86 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,19 @@
44
* you may not use this file except in compliance with the Elastic License.
55
*/
66
import { savedObjectsClientMock } from 'src/core/server/mocks';
7-
import { createAgentActionFromPolicyAction } from './state_new_actions';
8-
import { OutputType, Agent, AgentPolicyAction } from '../../../types';
7+
import { take } from 'rxjs/operators';
8+
import {
9+
createAgentActionFromPolicyAction,
10+
createNewActionsSharedObservable,
11+
} from './state_new_actions';
12+
import { getNewActionsSince } from '../actions';
13+
import { OutputType, Agent, AgentAction, AgentPolicyAction } from '../../../types';
914

1015
jest.mock('../../app_context', () => ({
1116
appContextService: {
17+
getInternalUserSOClient: () => {
18+
return {};
19+
},
1220
getEncryptedSavedObjects: () => ({
1321
getDecryptedAsInternalUser: () => ({
1422
attributes: {
@@ -19,7 +27,83 @@ jest.mock('../../app_context', () => ({
1927
},
2028
}));
2129

30+
jest.mock('../actions');
31+
32+
jest.useFakeTimers();
33+
34+
function waitForPromiseResolved() {
35+
return new Promise((resolve) => setImmediate(resolve));
36+
}
37+
38+
function getMockedNewActionSince() {
39+
return getNewActionsSince as jest.MockedFunction<typeof getNewActionsSince>;
40+
}
41+
2242
describe('test agent checkin new action services', () => {
43+
describe('newAgetActionObservable', () => {
44+
beforeEach(() => {
45+
(getNewActionsSince as jest.MockedFunction<typeof getNewActionsSince>).mockReset();
46+
});
47+
it('should work, call get actions until there is new action', async () => {
48+
const observable = createNewActionsSharedObservable();
49+
50+
getMockedNewActionSince()
51+
.mockResolvedValueOnce([])
52+
.mockResolvedValueOnce([
53+
({ id: 'action1', created_at: new Date().toISOString() } as unknown) as AgentAction,
54+
])
55+
.mockResolvedValueOnce([
56+
({ id: 'action2', created_at: new Date().toISOString() } as unknown) as AgentAction,
57+
]);
58+
// First call
59+
const promise = observable.pipe(take(1)).toPromise();
60+
61+
jest.advanceTimersByTime(5000);
62+
await waitForPromiseResolved();
63+
jest.advanceTimersByTime(5000);
64+
await waitForPromiseResolved();
65+
66+
const res = await promise;
67+
expect(getNewActionsSince).toBeCalledTimes(2);
68+
expect(res).toHaveLength(1);
69+
expect(res[0].id).toBe('action1');
70+
// Second call
71+
const secondSubscription = observable.pipe(take(1)).toPromise();
72+
73+
jest.advanceTimersByTime(5000);
74+
await waitForPromiseResolved();
75+
76+
const secondRes = await secondSubscription;
77+
expect(secondRes).toHaveLength(1);
78+
expect(secondRes[0].id).toBe('action2');
79+
expect(getNewActionsSince).toBeCalledTimes(3);
80+
// It should call getNewActionsSince with the last action returned
81+
expect(getMockedNewActionSince().mock.calls[2][1]).toBe(res[0].created_at);
82+
});
83+
84+
it('should not fetch actions concurrently', async () => {
85+
const observable = createNewActionsSharedObservable();
86+
87+
const resolves: Array<() => void> = [];
88+
getMockedNewActionSince().mockImplementation(() => {
89+
return new Promise((resolve) => {
90+
resolves.push(resolve);
91+
});
92+
});
93+
94+
observable.pipe(take(1)).toPromise();
95+
96+
jest.advanceTimersByTime(5000);
97+
await waitForPromiseResolved();
98+
jest.advanceTimersByTime(5000);
99+
await waitForPromiseResolved();
100+
jest.advanceTimersByTime(5000);
101+
await waitForPromiseResolved();
102+
103+
expect(getNewActionsSince).toBeCalledTimes(1);
104+
});
105+
});
106+
23107
describe('createAgentActionFromPolicyAction()', () => {
24108
const mockSavedObjectsClient = savedObjectsClientMock.create();
25109
const mockAgent: Agent = {

x-pack/plugins/ingest_manager/server/services/agents/checkin/state_new_actions.ts

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import {
1212
share,
1313
distinctUntilKeyChanged,
1414
switchMap,
15+
exhaustMap,
1516
concatMap,
1617
merge,
1718
filter,
@@ -62,18 +63,28 @@ function getInternalUserSOClient() {
6263
return appContextService.getInternalUserSOClient(fakeRequest);
6364
}
6465

65-
function createNewActionsSharedObservable(): Observable<AgentAction[]> {
66+
export function createNewActionsSharedObservable(): Observable<AgentAction[]> {
6667
let lastTimestamp = new Date().toISOString();
6768

6869
return timer(0, AGENT_UPDATE_ACTIONS_INTERVAL_MS).pipe(
69-
switchMap(() => {
70+
exhaustMap(() => {
7071
const internalSOClient = getInternalUserSOClient();
7172

72-
const timestamp = lastTimestamp;
73-
lastTimestamp = new Date().toISOString();
74-
return from(getNewActionsSince(internalSOClient, timestamp));
73+
return from(
74+
getNewActionsSince(internalSOClient, lastTimestamp).then((data) => {
75+
if (data.length > 0) {
76+
lastTimestamp = data.reduce((acc, action) => {
77+
return acc >= action.created_at ? acc : action.created_at;
78+
}, lastTimestamp);
79+
}
80+
81+
return data;
82+
})
83+
);
84+
}),
85+
filter((data) => {
86+
return data.length > 0;
7587
}),
76-
filter((data) => data.length > 0),
7788
share()
7889
);
7990
}

0 commit comments

Comments
 (0)