Skip to content
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

import { Subject } from 'rxjs';
import { loggingSystemMock, savedObjectsServiceMock } from 'src/core/server/mocks';
import { LicenseService } from '../../../../common/license/license';
import { createPackagePolicyServiceMock } from '../../../../../fleet/server/mocks';
import { PolicyWatcher } from './license_watch';
import { ILicense } from '../../../../../licensing/common/types';
import { licenseMock } from '../../../../../licensing/common/licensing.mock';
import { PackagePolicyServiceInterface } from '../../../../../fleet/server';
import { PackagePolicy } from '../../../../../fleet/common';
import { createPackagePolicyMock } from '../../../../../fleet/common/mocks';
import { factory } from '../../../../common/endpoint/models/policy_config';
import { PolicyConfig } from '../../../../common/endpoint/types';

const MockPPWithEndpointPolicy = (cb?: (p: PolicyConfig) => PolicyConfig): PackagePolicy => {
const packagePolicy = createPackagePolicyMock();
if (!cb) {
// eslint-disable-next-line no-param-reassign
cb = (p) => p;
}
const policyConfig = cb(factory());
packagePolicy.inputs[0].config = { policy: { value: policyConfig } };
return packagePolicy;
};

describe('Policy-Changing license watcher', () => {
const logger = loggingSystemMock.create().get('license_watch.test');
const soStartMock = savedObjectsServiceMock.createStartContract();
let packagePolicySvcMock: jest.Mocked<PackagePolicyServiceInterface>;

const Platinum = licenseMock.createLicense({ license: { type: 'platinum', mode: 'platinum' } });
const Gold = licenseMock.createLicense({ license: { type: 'gold', mode: 'gold' } });
const Basic = licenseMock.createLicense({ license: { type: 'basic', mode: 'basic' } });

beforeEach(() => {
packagePolicySvcMock = createPackagePolicyServiceMock();
});

it('is activated on license changes', () => {
// mock a license-changing service to test reactivity
const licenseEmitter: Subject<ILicense> = new Subject();
const licenseService = new LicenseService();
const pw = new PolicyWatcher(packagePolicySvcMock, soStartMock, logger);

// swap out watch function, just to ensure it gets called when a license change happens
const mockWatch = jest.fn();
pw.watch = mockWatch;

// licenseService is watching our subject for incoming licenses
licenseService.start(licenseEmitter);
pw.start(licenseService); // and the PolicyWatcher under test, uses that to subscribe as well

// Enqueue a license change!
licenseEmitter.next(Platinum);

// policywatcher should have triggered
expect(mockWatch.mock.calls.length).toBe(1);

pw.stop();
licenseService.stop();
licenseEmitter.complete();
});

it('pages through all endpoint policies', async () => {
const TOTAL = 247;

// set up the mocked package policy service to return and do what we want
packagePolicySvcMock.list
.mockResolvedValueOnce({
items: Array.from({ length: 100 }, () => MockPPWithEndpointPolicy()),
total: TOTAL,
page: 1,
perPage: 100,
})
.mockResolvedValueOnce({
items: Array.from({ length: 100 }, () => MockPPWithEndpointPolicy()),
total: TOTAL,
page: 2,
perPage: 100,
})
.mockResolvedValueOnce({
items: Array.from({ length: TOTAL - 200 }, () => MockPPWithEndpointPolicy()),
total: TOTAL,
page: 3,
perPage: 100,
});

const pw = new PolicyWatcher(packagePolicySvcMock, soStartMock, logger);
await pw.watch(Gold); // just manually trigger with a given license

expect(packagePolicySvcMock.list.mock.calls.length).toBe(3); // should have asked for 3 pages of resuts

// Assert: on the first call to packagePolicy.list, we asked for page 1
expect(packagePolicySvcMock.list.mock.calls[0][1].page).toBe(1);
expect(packagePolicySvcMock.list.mock.calls[1][1].page).toBe(2); // second call, asked for page 2
expect(packagePolicySvcMock.list.mock.calls[2][1].page).toBe(3); // etc
});

it('alters no-longer-licensed features', async () => {
const CustomMessage = 'Custom string';

// mock a Policy with a higher-tiered feature enabled
packagePolicySvcMock.list.mockResolvedValueOnce({
items: [
MockPPWithEndpointPolicy(
(pc: PolicyConfig): PolicyConfig => {
pc.windows.popup.malware.message = CustomMessage;
return pc;
}
),
],
total: 1,
page: 1,
perPage: 100,
});

const pw = new PolicyWatcher(packagePolicySvcMock, soStartMock, logger);

// emulate a license change below paid tier
await pw.watch(Basic);

expect(packagePolicySvcMock.update).toHaveBeenCalled();
expect(
packagePolicySvcMock.update.mock.calls[0][2].inputs[0].config!.policy.value.windows.popup
.malware.message
).not.toEqual(CustomMessage);
});
});
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

import { Subscription } from 'rxjs';

import {
KibanaRequest,
Logger,
SavedObjectsClientContract,
SavedObjectsServiceStart,
} from 'src/core/server';
import { PackagePolicy, PACKAGE_POLICY_SAVED_OBJECT_TYPE } from '../../../../../fleet/common';
import { PackagePolicyServiceInterface } from '../../../../../fleet/server';
import { ILicense } from '../../../../../licensing/common/types';
import {
isEndpointPolicyValidForLicense,
unsetPolicyFeaturesAboveLicenseLevel,
} from '../../../../common/license/policy_config';
import { isAtLeast, LicenseService } from '../../../../common/license/license';

export class PolicyWatcher {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

PolicyWatcher confused me a little because it did not mention that it's really a license watcher. Maybe LicenseWatcher would be more appropriate?

(ps. I can see this being used by other things in the future that might not be Policy related)

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(two hard problems joke here)

This is in server/endpoint/lib/policy, and it's job is to watch license changes, in order to keep policy up to date. I'm open to better names. LicenseWatcher is a possibility, but note that our other class, LicenseService is copy-pasted from about 4 other plugins that have the same class. But some of them call that LicenseWatcher. Wanted to prevent that confusion.

Also I do agree we may want other side-effects to license changes that may not be policy related. But those should create their own subscription via LicenseService observable. Not use this. Observables are designed to have anyone subscribe who also needs it, not collectively boggarting off of one sub.

private logger: Logger;
private soClient: SavedObjectsClientContract;
private policyService: PackagePolicyServiceInterface;
private subscription: Subscription | undefined;
constructor(
policyService: PackagePolicyServiceInterface,
soStart: SavedObjectsServiceStart,
logger: Logger
) {
this.policyService = policyService;
this.soClient = this.makeInternalSOClient(soStart);
this.logger = logger;
}

/**
* The policy watcher is not called as part of a HTTP request chain, where the
* request-scoped SOClient could be passed down. It is called via license observable
* changes. We are acting as the 'system' in response to license changes, so we are
* intentionally using the system user here. Be very aware of what you are using this
* client to do
*/
private makeInternalSOClient(soStart: SavedObjectsServiceStart): SavedObjectsClientContract {
const fakeRequest = ({
headers: {},
getBasePath: () => '',
path: '/',
route: { settings: {} },
url: { href: {} },
raw: { req: { url: '/' } },
} as unknown) as KibanaRequest;
return soStart.getScopedClient(fakeRequest, { excludedWrappers: ['security'] });
}

public start(licenseService: LicenseService) {
this.subscription = licenseService.getLicenseInformation$()?.subscribe(this.watch.bind(this));
}

public stop() {
if (this.subscription) {
this.subscription.unsubscribe();
}
}

public async watch(license: ILicense) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

At one point we had talked about possibly doing the "migration" slightly delayed from when the observable triggers in order to try and account for edge case where requests might be in flight that would override our changes here. Delaying the migration would allow for existing one to complete ++ any future ones would be auto-rejected by the API handles, so migrating should be able to account for all records.

Also - do you need to protect against the license changing while a migration is in flight?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think I was part of any of these discussions. We want to add a sleep here?

Do we expect other things changing at the same moment the license changes? What's this about migrations?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Re:

Do we expect other things changing at the same moment the license changes? What's this about migrations?

My understanding is that changing of license can happen while the system is running - there is downtime when it happens. It just happens

re:

What's this about migrations?

Sorry for the confusion. In my mind, what this service here is doing is a migration - thats what I mean.

if (isAtLeast(license, 'platinum')) {
return;
}

let page = 1;
let response: {
items: PackagePolicy[];
total: number;
page: number;
perPage: number;
};
do {
try {
response = await this.policyService.list(this.soClient, {
page: page++,
perPage: 100,
kuery: `${PACKAGE_POLICY_SAVED_OBJECT_TYPE}.package.name: endpoint`,
});
} catch (e) {
this.logger.warn(
`Unable to verify endpoint policies in line with license change: failed to fetch package policies: ${e.message}`
);
return;
}
response.items.forEach(async (policy) => {
const policyConfig = policy.inputs[0].config?.policy.value;
if (!isEndpointPolicyValidForLicense(policyConfig, license)) {
policy.inputs[0].config!.policy.value = unsetPolicyFeaturesAboveLicenseLevel(
policyConfig,
license
);
try {
await this.policyService.update(this.soClient, policy.id, policy);
} catch (e) {
// try again for transient issues
try {
await this.policyService.update(this.soClient, policy.id, policy);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the problem encountered as a version conflict error, then you can't just "try" again with the same payload. You will need to retrieve the record again and redo updates, and then use that to update record for the update.

One way to test this with a running kibana:

above line 102, update policy.version to something like 123. That update should fail for the version missmatch.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Given our most recent conversation, will you leave this "retry" here or remove it and address it in the subsequent issue?

I'm ok either way

} catch (ee) {
this.logger.warn(
`Unable to remove platinum features from policy ${policy.id}: ${ee.message}`
);
}
}
}
});
} while (response.page * response.perPage < response.total);
}
}
10 changes: 9 additions & 1 deletion x-pack/plugins/security_solution/server/plugin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ import {
TelemetryPluginSetup,
} from '../../../../src/plugins/telemetry/server';
import { licenseService } from './lib/license/license';
import { PolicyWatcher } from './endpoint/lib/policy/license_watch';

export interface SetupPlugins {
alerts: AlertingSetup;
Expand Down Expand Up @@ -127,6 +128,7 @@ export class Plugin implements IPlugin<PluginSetup, PluginStart, SetupPlugins, S

private lists: ListPluginSetup | undefined; // TODO: can we create ListPluginStart?
private licensing$!: Observable<ILicense>;
private policyWatcher?: PolicyWatcher;

private manifestTask: ManifestTask | undefined;
private exceptionsCache: LRU<string, Buffer>;
Expand Down Expand Up @@ -370,14 +372,20 @@ export class Plugin implements IPlugin<PluginSetup, PluginStart, SetupPlugins, S
this.telemetryEventsSender.start(core, plugins.telemetry);
this.licensing$ = plugins.licensing.license$;
licenseService.start(this.licensing$);

this.policyWatcher = new PolicyWatcher(
plugins.fleet!.packagePolicyService,
core.savedObjects,
this.logger
);
this.policyWatcher.start(licenseService);
return {};
}

public stop() {
this.logger.debug('Stopping plugin');
this.telemetryEventsSender.stop();
this.endpointAppContextService.stop();
this.policyWatcher?.stop();
licenseService.stop();
}
}