Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions x-pack/platform/plugins/shared/fleet/server/plugin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import type {
} from '@kbn/core/server';
import { DEFAULT_APP_CATEGORIES, SavedObjectsClient, ServiceStatusLevels } from '@kbn/core/server';
import type { UsageCollectionSetup } from '@kbn/usage-collection-plugin/server';

import { LockManagerService } from '@kbn/lock-manager';
import type { TelemetryPluginSetup, TelemetryPluginStart } from '@kbn/telemetry-plugin/server';
import type { PluginStart as DataPluginStart } from '@kbn/data-plugin/server';
import type { LicensingPluginStart } from '@kbn/licensing-plugin/server';
Expand All @@ -49,9 +49,7 @@ import type {
} from '@kbn/task-manager-plugin/server';

import type { CloudSetup } from '@kbn/cloud-plugin/server';

import type { SpacesPluginStart } from '@kbn/spaces-plugin/server';

import type { SavedObjectTaggingStart } from '@kbn/saved-objects-tagging-plugin/server';

import { SECURITY_EXTENSION_ID } from '@kbn/core-saved-objects-server';
Expand Down Expand Up @@ -207,6 +205,7 @@ export interface FleetAppContext {
taskManagerStart?: TaskManagerStartContract;
fetchUsage?: (abortController: AbortController) => Promise<FleetUsage | undefined>;
syncIntegrationsTask: SyncIntegrationsTask;
lockManagerService?: LockManagerService;
}

export type FleetSetupContract = void;
Expand Down Expand Up @@ -317,6 +316,7 @@ export class FleetPlugin
private packagePolicyService?: PackagePolicyService;
private policyWatcher?: PolicyWatcher;
private fetchUsage?: (abortController: AbortController) => Promise<FleetUsage | undefined>;
private lockManagerService?: LockManagerService;

constructor(private readonly initializerContext: PluginInitializerContext) {
this.config$ = this.initializerContext.config.create<FleetConfigType>();
Expand Down Expand Up @@ -673,6 +673,7 @@ export class FleetPlugin
taskManager: deps.taskManager,
logFactory: this.initializerContext.logger,
});
this.lockManagerService = new LockManagerService(core, this.initializerContext.logger.get());

// Register fields metadata extractors
registerFieldsMetadataExtractors({ core, fieldsMetadata: deps.fieldsMetadata });
Expand Down Expand Up @@ -725,6 +726,7 @@ export class FleetPlugin
taskManagerStart: plugins.taskManager,
fetchUsage: this.fetchUsage,
syncIntegrationsTask: this.syncIntegrationsTask!,
lockManagerService: this.lockManagerService,
});
licenseService.start(plugins.licensing.license$);
this.telemetryEventsSender.start(plugins.telemetry, core).catch(() => {});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import { kibanaPackageJson } from '@kbn/repo-info';

import type { HttpServiceSetup, KibanaRequest } from '@kbn/core-http-server';
import { kibanaRequestFactory } from '@kbn/core-http-server-utils';

import type { PluginStart as DataPluginStart } from '@kbn/data-plugin/server';
import type {
EncryptedSavedObjectsClient,
Expand All @@ -28,6 +27,7 @@ import type { TaskManagerStartContract } from '@kbn/task-manager-plugin/server';
import type { ElasticsearchClient } from '@kbn/core-elasticsearch-server';
import type { SecurityServiceStart } from '@kbn/core-security-server';
import type { Logger } from '@kbn/logging';
import type { LockManagerService } from '@kbn/lock-manager';

import type { FleetConfigType } from '../../common/types';
import {
Expand All @@ -49,10 +49,10 @@ import type { FleetAppContext } from '../plugin';
import type { TelemetryEventsSender } from '../telemetry/sender';
import { UNINSTALL_TOKENS_SAVED_OBJECT_TYPE } from '../constants';
import type { MessageSigningServiceInterface } from '..';
import type { FleetUsage } from '../collectors/register';

import type { BulkActionsResolver } from './agents/bulk_actions_resolver';
import { type UninstallTokenServiceInterface } from './security/uninstall_token_service';
import type { FleetUsage } from '../collectors/register';

class AppContextService {
private encryptedSavedObjects: EncryptedSavedObjectsClient | undefined;
Expand Down Expand Up @@ -82,6 +82,7 @@ class AppContextService {
private uninstallTokenService: UninstallTokenServiceInterface | undefined;
private taskManagerStart: TaskManagerStartContract | undefined;
private fetchUsage?: (abortController: AbortController) => Promise<FleetUsage | undefined>;
private lockManagerService: LockManagerService | undefined;

public start(appContext: FleetAppContext) {
this.data = appContext.data;
Expand All @@ -108,6 +109,7 @@ class AppContextService {
this.uninstallTokenService = appContext.uninstallTokenService;
this.taskManagerStart = appContext.taskManagerStart;
this.fetchUsage = appContext.fetchUsage;
this.lockManagerService = appContext.lockManagerService;

if (appContext.config$) {
this.config$ = appContext.config$;
Expand Down Expand Up @@ -351,6 +353,10 @@ class AppContextService {
/**
 * Returns the `fetchUsage` callback currently held by this service.
 * The field is optional, so the result may be `undefined`.
 */
public getFetchUsage() {
return this.fetchUsage;
}

/**
 * Returns the lock manager service currently held by this service.
 * The field is typed `LockManagerService | undefined`, so callers must handle
 * the `undefined` case.
 */
public getLockManagerService() {
return this.lockManagerService;
}
}

export const appContextService = new AppContextService();
105 changes: 41 additions & 64 deletions x-pack/platform/plugins/shared/fleet/server/services/setup.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import { savedObjectsClientMock } from '@kbn/core/server/mocks';
import type { ElasticsearchClientMock } from '@kbn/core/server/mocks';
import { loggerMock } from '@kbn/logging-mocks';
import { LockAcquisitionError } from '@kbn/lock-manager';

import type { Logger } from '@kbn/core/server';

Expand All @@ -20,7 +21,7 @@ import { appContextService } from './app_context';
import { getInstallations } from './epm/packages';
import { setupUpgradeManagedPackagePolicies } from './setup/managed_package_policies';
import { getPreconfiguredDeleteUnenrolledAgentsSettingFromConfig } from './preconfiguration/delete_unenrolled_agent_setting';
import { setupFleet } from './setup';
import { _runSetupWithLock, setupFleet } from './setup';
import { isPackageInstalled } from './epm/packages/install';
import { upgradeAgentPolicySchemaVersion } from './setup/upgrade_agent_policy_schema_version';
import { createOrUpdateFleetSyncedIntegrationsIndex } from './setup/fleet_synced_integrations';
Expand Down Expand Up @@ -141,69 +142,6 @@ describe('setupFleet', () => {
});
});

it('should create and delete lock if not exists', async () => {
  const lockType = 'fleet-setup-lock';
  const mockedSoClient = getMockedSoClient();

  // The lock lookup rejects with a Boom-shaped 404, i.e. no lock exists yet.
  mockedSoClient.get.mockRejectedValue({ isBoom: true, output: { statusCode: 404 } } as any);

  const setupStatus = await setupFleet(mockedSoClient, esClient, { useLock: true });

  expect(setupStatus).toEqual({ isInitialized: true, nonFatalErrors: [] });
  // The lock is created under a fixed id and removed again once setup is done.
  expect(mockedSoClient.create).toHaveBeenCalledWith(lockType, expect.anything(), {
    id: lockType,
  });
  expect(mockedSoClient.delete).toHaveBeenCalledWith(lockType, lockType, { refresh: true });
});

it('should return not initialized if lock exists', async () => {
  // NOTE(review): presumably the default mocked client resolves `get` with an
  // existing, recent lock; confirm against getMockedSoClient.
  const mockedSoClient = getMockedSoClient();

  const setupStatus = await setupFleet(mockedSoClient, esClient, { useLock: true });

  expect(setupStatus).toEqual({ isInitialized: false, nonFatalErrors: [] });
  // Setup was skipped, so the lock must be neither created nor deleted.
  expect(mockedSoClient.create).not.toHaveBeenCalled();
  expect(mockedSoClient.delete).not.toHaveBeenCalled();
});

it('should return not initialized if lock could not be created', async () => {
  const mockedSoClient = getMockedSoClient();

  // No lock is found (404), but creating one fails with a conflict (409).
  mockedSoClient.get.mockRejectedValue({ isBoom: true, output: { statusCode: 404 } } as any);
  mockedSoClient.create.mockRejectedValue({ isBoom: true, output: { statusCode: 409 } } as any);

  const setupStatus = await setupFleet(mockedSoClient, esClient, { useLock: true });

  expect(setupStatus).toEqual({ isInitialized: false, nonFatalErrors: [] });
  // The lock was never created by this run, so it must not be deleted.
  expect(mockedSoClient.delete).not.toHaveBeenCalled();
});

it('should delete previous lock if created more than 1 hour ago', async () => {
  const oneHourMs = 60 * 60 * 1000;
  // One second older than the one-hour threshold.
  const staleStartedAt = new Date(Date.now() - oneHourMs - 1000).toISOString();
  const mockedSoClient = getMockedSoClient();

  mockedSoClient.get.mockResolvedValue({
    attributes: { started_at: staleStartedAt },
  } as any);

  const setupStatus = await setupFleet(mockedSoClient, esClient, { useLock: true });

  expect(setupStatus).toEqual({ isInitialized: true, nonFatalErrors: [] });
  expect(mockedSoClient.create).toHaveBeenCalled();
  // Two deletions: the stale lock first, then the lock created by this run.
  expect(mockedSoClient.delete).toHaveBeenCalledTimes(2);
});

it('should return non fatal errors when generateKeyPair result has errors', async () => {
const soClient = getMockedSoClient();

Expand All @@ -228,3 +166,42 @@ describe('setupFleet', () => {
});
});
});

describe('_runSetupWithLock', () => {
  let mockedWithLock: jest.Mock<any, any, any>;

  beforeEach(() => {
    // Fresh `withLock` mock per test, exposed through the mocked app context.
    mockedWithLock = jest.fn();
    mockedAppContextService.getLockManagerService.mockReturnValue({
      withLock: mockedWithLock as any,
    } as any);
  });

  it('should retry on lock acquisition error', async () => {
    const setupFn = jest.fn();

    // First attempt fails to get the lock, second attempt runs the callback.
    mockedWithLock.mockRejectedValueOnce(new LockAcquisitionError('test'));
    mockedWithLock.mockImplementationOnce(async (_lockId, callback) => callback());

    await _runSetupWithLock(setupFn);

    expect(setupFn).toHaveBeenCalled();
    expect(mockedWithLock).toHaveBeenCalledTimes(2);
  });

  it('should not retry on setupFn error', async () => {
    const setupFn = jest.fn().mockRejectedValue(new Error('test'));

    // The lock is always granted; only the setup function itself fails.
    mockedWithLock.mockImplementation(async (_lockId, callback) => callback());

    await expect(_runSetupWithLock(setupFn)).rejects.toThrow(/test/);

    expect(setupFn).toHaveBeenCalled();
    expect(mockedWithLock).toHaveBeenCalledTimes(1);
  });
});
99 changes: 24 additions & 75 deletions x-pack/platform/plugins/shared/fleet/server/services/setup.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,17 @@ import apm from 'elastic-apm-node';

import { compact } from 'lodash';
import pMap from 'p-map';
import { v4 as uuidv4 } from 'uuid';
import pRetry from 'p-retry';

import type { ElasticsearchClient, SavedObjectsClientContract } from '@kbn/core/server';
import { DEFAULT_SPACE_ID } from '@kbn/spaces-plugin/common/constants';
import { LockAcquisitionError } from '@kbn/lock-manager';

import { MessageSigningError } from '../../common/errors';

import { AUTO_UPDATE_PACKAGES, FLEET_SETUP_LOCK_TYPE } from '../../common/constants';
import { AUTO_UPDATE_PACKAGES } from '../../common/constants';
import type { PreconfigurationError } from '../../common/constants';
import type { DefaultPackagesInstallationError, FleetSetupLock } from '../../common/types';
import type { DefaultPackagesInstallationError } from '../../common/types';

import { MAX_CONCURRENT_EPM_PACKAGES_INSTALLATIONS } from '../constants';

Expand Down Expand Up @@ -78,6 +80,20 @@ export interface SetupStatus {
>;
}

/**
 * Runs `setupFn` through the lock manager's `withLock` using the `fleet-setup` lock id.
 *
 * Only failures to acquire the lock (`LockAcquisitionError`) are retried, for at
 * most five minutes. Any other error, including one thrown by `setupFn`, aborts
 * the retries and rejects the returned promise with that error.
 *
 * The outcome is delivered through the returned promise: a caller that wraps this
 * call in its own `try`/`catch`/`finally` must `await` it for those handlers to
 * observe a failure.
 *
 * @param setupFn Setup routine passed to `withLock` on every attempt.
 * @returns Whatever `withLock` resolves with (expected to be the result of `setupFn`).
 * @throws Error when no lock manager service is registered in the app context.
 */
export async function _runSetupWithLock(setupFn: () => Promise<SetupStatus>) {
  const lockManagerService = appContextService.getLockManagerService();
  if (!lockManagerService) {
    // Fail with an explicit message instead of an opaque
    // "cannot read properties of undefined" TypeError.
    throw new Error('Lock manager service is not available, cannot run Fleet setup with lock');
  }

  return await pRetry(() => lockManagerService.withLock('fleet-setup', () => setupFn()), {
    onFailedAttempt: async (error) => {
      // Throwing from onFailedAttempt stops p-retry, so every error other than
      // a lock acquisition failure is surfaced immediately without a retry.
      if (!(error instanceof LockAcquisitionError)) {
        throw error;
      }
    },
    maxRetryTime: 5 * 60 * 1000, // Keep retrying for up to 5 minutes to get the lock
  });
}
Comment on lines +83 to +95
Copy link
Member

@sorenlouv sorenlouv May 5, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think there is a bug in the retry logic here. From the pRetry documentation:

If the onFailedAttempt function throws, all retries will be aborted and the original promise will reject with the thrown error.

Right now every error except LockAcquisitionError is rethrown, so those errors abort immediately without retries. Only LockAcquisitionErrors are retried.

Is that the intention?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, that's the intention; we already have some retry logic in the Fleet setup.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All right. Why would you want to retry setupFn, if it's already running? Then it would run multiple times - although sequentially, not in parallel.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All right. Why would you want to retry setupFn, if it's already running? Then

We want to be sure the setup ran and finished at least once. If it's running on a different Kibana instance and that instance is killed for any reason, we will retry this way; if the setup was already successful, that operation should be relatively quick.


export async function setupFleet(
soClient: SavedObjectsClientContract,
esClient: ElasticsearchClient,
Expand All @@ -86,87 +102,20 @@ export async function setupFleet(
} = { useLock: false }
): Promise<SetupStatus> {
const t = apm.startTransaction('fleet-setup', 'fleet');
let created = false;
try {
if (options.useLock) {
const { created: isCreated, toReturn } = await createLock(soClient);
created = isCreated;
if (toReturn) return toReturn;
return _runSetupWithLock(() =>
awaitIfPending(async () => createSetupSideEffects(soClient, esClient))
);
} else {
return await awaitIfPending(async () => createSetupSideEffects(soClient, esClient));
}
return await awaitIfPending(async () => createSetupSideEffects(soClient, esClient));
} catch (error) {
apm.captureError(error);
t.setOutcome('failure');
throw error;
} finally {
t.end();
// only delete lock if it was created by this instance
if (options.useLock && created) {
await deleteLock(soClient);
}
}
}

/**
 * Tries to take the Fleet setup lock, which is stored as a single saved object.
 *
 * - When a lock exists and is not older than one hour, setup is aborted: the
 *   result has `created: false` and a "not initialized" status in `toReturn`.
 * - When a lock exists and is older than one hour, it is deleted and a new one
 *   is created.
 * - Errors from the lookup are swallowed (a Boom 404 is logged at debug level)
 *   and creation is attempted anyway.
 * - When the lock cannot be created, setup is aborted in the same way.
 *
 * @param soClient Saved objects client used to read, create and delete the lock.
 * @returns `created: true` when this call created the lock; otherwise `toReturn`
 *          carries the status the caller should return.
 */
async function createLock(
  soClient: SavedObjectsClientContract
): Promise<{ created: boolean; toReturn?: SetupStatus }> {
  const LOCK_TIMEOUT = 60 * 60 * 1000; // 1 hour
  const logger = appContextService.getLogger();

  // Result handed back whenever this instance must not run setup.
  const abortSetup = (): { created: boolean; toReturn?: SetupStatus } => ({
    created: false,
    toReturn: { isInitialized: false, nonFatalErrors: [] },
  });

  try {
    // check if fleet setup is already started
    const existingLock = await soClient.get<FleetSetupLock>(
      FLEET_SETUP_LOCK_TYPE,
      FLEET_SETUP_LOCK_TYPE
    );
    const startedAt = existingLock.attributes.started_at;
    const isStale = startedAt
      ? Date.now() - new Date(startedAt).getTime() > LOCK_TIMEOUT
      : false;

    if (!isStale) {
      logger.info('Fleet setup already in progress, abort setup');
      return abortSetup();
    }

    // started more than 1 hour ago, delete previous lock
    await deleteLock(soClient);
  } catch (error) {
    if (error.isBoom && error.output.statusCode === 404) {
      logger.debug('Fleet setup lock does not exist, continue setup');
    }
  }

  let createdLock;
  try {
    createdLock = await soClient.create<FleetSetupLock>(
      FLEET_SETUP_LOCK_TYPE,
      {
        status: 'in_progress',
        uuid: uuidv4(),
        started_at: new Date().toISOString(),
      },
      { id: FLEET_SETUP_LOCK_TYPE }
    );
    // Only serialize the saved object when debug logging is enabled.
    if (logger.isLevelEnabled('debug')) {
      logger.debug(`Fleet setup lock created: ${JSON.stringify(createdLock)}`);
    }
  } catch (error) {
    logger.info(`Could not create fleet setup lock, abort setup: ${error}`);
    return abortSetup();
  }

  return { created: !!createdLock };
}

/**
 * Deletes the Fleet setup lock saved object.
 *
 * Deletion failures are handled here and not rethrown: a missing lock (404) is
 * ignored and any other failure is logged as an error.
 *
 * @param soClient Saved objects client used to delete the lock.
 */
async function deleteLock(soClient: SavedObjectsClientContract) {
  const logger = appContextService.getLogger();
  try {
    await soClient.delete(FLEET_SETUP_LOCK_TYPE, FLEET_SETUP_LOCK_TYPE, { refresh: true });
    logger.debug(`Fleet setup lock deleted`);
  } catch (error) {
    // Boom-shaped errors carry the HTTP status under `output.statusCode`;
    // other errors may expose a top-level `statusCode`. Accept both so a
    // missing lock is not reported as an error.
    const statusCode = error?.output?.statusCode ?? error?.statusCode;
    // ignore 404 errors
    if (statusCode !== 404) {
      logger.error('Could not delete fleet setup lock', error);
    }
  }
}

Expand Down
1 change: 1 addition & 0 deletions x-pack/platform/plugins/shared/fleet/tsconfig.json
Original file line number Diff line number Diff line change
Expand Up @@ -120,5 +120,6 @@
"@kbn/core-http-server-utils",
"@kbn/core-notifications-browser-mocks",
"@kbn/handlebars",
"@kbn/lock-manager",
]
}
Loading