Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -352,7 +352,7 @@ export class StreamsClient {
scopedClusterClient: this.dependencies.scopedClusterClient,
});
if (!privileges.read) {
throw new DefinitionNotFoundError(`Stream definition for ${name} not found`);
throw new SecurityError(`Cannot read stream, insufficient privileges`);
}
}
return streamDefinition;
Expand Down Expand Up @@ -382,7 +382,7 @@ export class StreamsClient {
checkAccess({ name, scopedClusterClient: this.dependencies.scopedClusterClient }).then(
(privileges) => {
if (!privileges.read) {
throw new DefinitionNotFoundError(`Stream definition for ${name} not found`);
throw new SecurityError(`Cannot read stream, insufficient privileges`);
}
}
),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,14 @@ import {
isUnwiredStreamDefinition,
} from '@kbn/streams-schema';
import { z } from '@kbn/zod';
import { SecurityError } from '../../../../lib/streams/errors/security_error';
import { WrongStreamTypeError } from '../../../../lib/streams/errors/wrong_stream_type_error';
import {
checkAccess,
getUnmanagedElasticsearchAssetDetails,
getUnmanagedElasticsearchAssets,
} from '../../../../lib/streams/stream_crud';
import { createServerRoute } from '../../../create_server_route';
import { DefinitionNotFoundError } from '../../../../lib/streams/errors/definition_not_found_error';

export const sampleStreamRoute = createServerRoute({
endpoint: 'POST /internal/streams/{name}/_sample',
Expand Down Expand Up @@ -49,7 +49,7 @@ export const sampleStreamRoute = createServerRoute({
const { read } = await checkAccess({ name: params.path.name, scopedClusterClient });

if (!read) {
throw new DefinitionNotFoundError(`Stream definition for ${params.path.name} not found`);
throw new SecurityError(`Cannot read stream ${params.path.name}, insufficient privileges`);
}

const { if: condition, start, end, size } = params.body;
Expand Down Expand Up @@ -125,7 +125,7 @@ export const unmanagedAssetDetailsRoute = createServerRoute({
const { read } = await checkAccess({ name: params.path.name, scopedClusterClient });

if (!read) {
throw new DefinitionNotFoundError(`Stream definition for ${params.path.name} not found`);
throw new SecurityError(`Cannot read stream ${params.path.name}, insufficient privileges`);
}

const stream = await streamsClient.getStream(params.path.name);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@ import {
processorWithIdDefinitionSchema,
} from '@kbn/streams-schema';
import { z } from '@kbn/zod';
import { SecurityError } from '../../../../lib/streams/errors/security_error';
import { checkAccess } from '../../../../lib/streams/stream_crud';
import { createServerRoute } from '../../../create_server_route';
import { DefinitionNotFoundError } from '../../../../lib/streams/errors/definition_not_found_error';
import { ProcessingSimulationParams, simulateProcessing } from './simulation_handler';
import { handleProcessingSuggestion } from './suggestions_handler';

Expand Down Expand Up @@ -45,7 +45,7 @@ export const simulateProcessorRoute = createServerRoute({

const { read } = await checkAccess({ name: params.path.name, scopedClusterClient });
if (!read) {
throw new DefinitionNotFoundError(`Stream definition for ${params.path.name} not found.`);
throw new SecurityError(`Cannot read stream ${params.path.name}, insufficient privileges`);
}

return simulateProcessing({ params, scopedClusterClient, streamsClient });
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,16 @@
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { z } from '@kbn/zod';
import { getFlattenedObject } from '@kbn/std';
import {
SampleDocument,
fieldDefinitionConfigSchema,
isWiredStreamDefinition,
} from '@kbn/streams-schema';
import { z } from '@kbn/zod';
import { SecurityError } from '../../../../lib/streams/errors/security_error';
import { checkAccess } from '../../../../lib/streams/stream_crud';
import { createServerRoute } from '../../../create_server_route';
import { DefinitionNotFoundError } from '../../../../lib/streams/errors/definition_not_found_error';

const UNMAPPED_SAMPLE_SIZE = 500;

Expand Down Expand Up @@ -118,7 +118,7 @@ export const schemaFieldsSimulationRoute = createServerRoute({
const { read } = await checkAccess({ name: params.path.name, scopedClusterClient });

if (!read) {
throw new DefinitionNotFoundError(`Stream definition for ${params.path.name} not found.`);
throw new SecurityError(`Cannot read stream ${params.path.name}, insufficient privileges`);
}

const userFieldDefinitions = params.body.field_definitions.flatMap((field) => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,13 @@
* 2.0.
*/
import type { StreamsRouteRepository } from '@kbn/streams-plugin/server';
import { RoleScopedSupertestProvider } from '../../../../services/role_scoped_supertest';
import {
RepositorySupertestClient,
getAdminApiClient,
getCustomRoleApiClient,
} from '../../../../../../common/utils/server_route_repository/create_admin_service_from_repository';
import { CustomRoleScopedSupertestProvider } from '../../../../services/custom_role_scoped_supertest';
import { RoleScopedSupertestProvider } from '../../../../services/role_scoped_supertest';

export type StreamsSupertestRepositoryClient = RepositorySupertestClient<StreamsRouteRepository>;

Expand All @@ -18,3 +20,9 @@ export async function createStreamsRepositoryAdminClient(
): Promise<StreamsSupertestRepositoryClient> {
return getAdminApiClient<StreamsRouteRepository>(st);
}

export async function createStreamsRepositoryCustomRoleClient(
st: ReturnType<typeof CustomRoleScopedSupertestProvider>
): Promise<StreamsSupertestRepositoryClient> {
return getCustomRoleApiClient<StreamsRouteRepository>(st);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

import { DeploymentAgnosticFtrProviderContext } from '../ftr_provider_context';
import { SupertestWithRoleScope } from './role_scoped_supertest';

export interface RequestHeadersOptions {
useCookieHeader?: boolean;
withInternalHeaders?: boolean;
withCommonHeaders?: boolean;
withCustomHeaders?: Record<string, string>;
}

/**
* Provides a customized 'supertest' instance that is authenticated using the custom role-based API key
* and enriched with the appropriate request headers. This service allows you to perform
* HTTP requests with specific authentication and header configurations, ensuring that
* the requests are scoped to the provided role and environment.
*
* Use this service to easily test API endpoints with role-specific authorization and
* custom headers, both in serverless and stateful environments.
*
* Pass '{ useCookieHeader: true }' to use Cookie header for authentication instead of API key.
* It is the correct way to perform HTTP requests for internal end-points.
*/
export function CustomRoleScopedSupertestProvider({
getService,
}: DeploymentAgnosticFtrProviderContext) {
const supertestWithoutAuth = getService('supertestWithoutAuth');
const samlAuth = getService('samlAuth');

return {
async getSupertestWithCustomRoleScope(
options: RequestHeadersOptions = {
useCookieHeader: false,
withCommonHeaders: false,
withInternalHeaders: false,
}
) {
// if 'useCookieHeader' set to 'true', HTTP requests will be called with cookie Header (like in browser)
if (options.useCookieHeader) {
const cookieHeader = await samlAuth.getM2MApiCookieCredentialsWithCustomRoleScope();
return new SupertestWithRoleScope(cookieHeader, supertestWithoutAuth, samlAuth, options);
}

// HTTP requests will be called with API key in header by default
const roleAuthc = await samlAuth.createM2mApiKeyWithCustomRoleScope();
return new SupertestWithRoleScope(roleAuthc, supertestWithoutAuth, samlAuth, options);
},
};
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import { DataViewApiProvider } from './data_view_api';
import { deploymentAgnosticServices } from './deployment_agnostic_services';
import { PackageApiProvider } from './package_api';
import { RoleScopedSupertestProvider, SupertestWithRoleScope } from './role_scoped_supertest';
import { CustomRoleScopedSupertestProvider } from './custom_role_scoped_supertest';
import { SloApiProvider } from './slo_api';
import { SynthtraceProvider } from './synthtrace';
import { ApmApiProvider } from './apm_api';
Expand All @@ -31,6 +32,7 @@ export const services = {
packageApi: PackageApiProvider,
sloApi: SloApiProvider,
roleScopedSupertest: RoleScopedSupertestProvider,
customRoleScopedSupertest: CustomRoleScopedSupertestProvider,
// create a new deployment-agnostic service and load here
synthtrace: SynthtraceProvider,
apmApi: ApmApiProvider,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,11 @@ import {
import { Subtract, RequiredKeys } from 'utility-types';
import { format } from 'url';
import supertest from 'supertest';
import { RoleScopedSupertestProvider } from '../../../api_integration/deployment_agnostic/services/role_scoped_supertest';
import {
RoleScopedSupertestProvider,
SupertestWithRoleScope,
} from '../../../api_integration/deployment_agnostic/services/role_scoped_supertest';
import { CustomRoleScopedSupertestProvider } from '../../../api_integration/deployment_agnostic/services/custom_role_scoped_supertest';

type MaybeOptional<TArgs extends Record<string, any>> = RequiredKeys<TArgs> extends never
? [TArgs] | []
Expand Down Expand Up @@ -44,13 +48,9 @@ type RepositorySupertestReturnOf<
}>
>;

export async function getAdminApiClient<TServerRouteRepository extends ServerRouteRepository>(
st: ReturnType<typeof RoleScopedSupertestProvider>
async function getApiClient<TServerRouteRepository extends ServerRouteRepository>(
supertestWithRoleScoped: SupertestWithRoleScope
): Promise<RepositorySupertestClient<TServerRouteRepository>> {
const supertestAdmin = await st.getSupertestWithRoleScope('admin', {
useCookieHeader: true,
withInternalHeaders: true,
});
return {
fetch: (endpoint, ...rest) => {
const options = rest.length ? rest[0] : { type: undefined };
Expand All @@ -74,7 +74,7 @@ export async function getAdminApiClient<TServerRouteRepository extends ServerRou
let res: unknown;
if (type === 'form-data') {
const fields: Array<[string, any]> = Object.entries(params.body);
const formDataRequest = supertestAdmin[method](url)
const formDataRequest = supertestWithRoleScoped[method](url)
.set(headers)
.set('Content-type', 'multipart/form-data');

Expand All @@ -84,9 +84,9 @@ export async function getAdminApiClient<TServerRouteRepository extends ServerRou

res = formDataRequest;
} else if (params.body) {
res = supertestAdmin[method](url).send(params.body).set(headers);
res = supertestWithRoleScoped[method](url).send(params.body).set(headers);
} else {
res = supertestAdmin[method](url).set(headers);
res = supertestWithRoleScoped[method](url).set(headers);
}

return res as RepositorySupertestReturnOf<
Expand All @@ -97,6 +97,26 @@ export async function getAdminApiClient<TServerRouteRepository extends ServerRou
};
}

export async function getCustomRoleApiClient<TServerRouteRepository extends ServerRouteRepository>(
st: ReturnType<typeof CustomRoleScopedSupertestProvider>
): Promise<RepositorySupertestClient<TServerRouteRepository>> {
const supertestWithCustomRoleScoped = await st.getSupertestWithCustomRoleScope({
useCookieHeader: true,
withInternalHeaders: true,
});
return await getApiClient(supertestWithCustomRoleScoped);
}

export async function getAdminApiClient<TServerRouteRepository extends ServerRouteRepository>(
st: ReturnType<typeof RoleScopedSupertestProvider>
): Promise<RepositorySupertestClient<TServerRouteRepository>> {
const supertestWithRoleScoped = await st.getSupertestWithRoleScope('admin', {
useCookieHeader: true,
withInternalHeaders: true,
});
return await getApiClient(supertestWithRoleScoped);
}

type WithoutPromise<T extends Promise<any>> = Subtract<T, Promise<any>>;

// this is a little intense, but without it, method overrides are lost
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,6 @@ export default function ({ loadTestFile }: FtrProviderContext) {
// add tests that require feature flags, defined in config.feature_flags.ts
loadTestFile(require.resolve('./role_management'));
loadTestFile(require.resolve('./infra'));
loadTestFile(require.resolve('./streams'));
});
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

import { FtrProviderContext } from '../../../ftr_provider_context';

export default function ({ loadTestFile }: FtrProviderContext) {
describe('Observability Streams', function () {
loadTestFile(require.resolve('./read_privilege'));
});
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

import { WiredIngestUpsertRequest } from '@kbn/streams-schema';
import { DeploymentAgnosticFtrProviderContext } from '@kbn/test-suites-xpack/api_integration/deployment_agnostic/ftr_provider_context';
import {
StreamsSupertestRepositoryClient,
createStreamsRepositoryAdminClient,
createStreamsRepositoryCustomRoleClient,
} from '@kbn/test-suites-xpack/api_integration/deployment_agnostic/apis/observability/streams/helpers/repository_client';
import {
disableStreams,
enableStreams,
getStream,
putStream,
} from '@kbn/test-suites-xpack/api_integration/deployment_agnostic/apis/observability/streams/helpers/requests';

const STREAM_NAME = 'logs.crud';
const stream: WiredIngestUpsertRequest = {
ingest: {
lifecycle: { inherit: {} },
processing: [],
wired: {
routing: [],
fields: {
numberfield: {
type: 'long',
},
},
},
},
};
export default function ({ getService }: DeploymentAgnosticFtrProviderContext) {
const roleScopedSupertest = getService('roleScopedSupertest');
const customRoleScopedSupertest = getService('customRoleScopedSupertest');
const samlAuth = getService('samlAuth');

let adminApiClient: StreamsSupertestRepositoryClient;
let customRoleApiClient: StreamsSupertestRepositoryClient;

describe('Read privilege', () => {
before(async () => {
await samlAuth.setCustomRole({
elasticsearch: {
indices: [
{
names: ['irrelevant'],
privileges: ['read', 'view_index_metadata'],
},
],
},
kibana: [
{
feature: { discover: ['read'] },
spaces: ['*'],
},
],
});

adminApiClient = await createStreamsRepositoryAdminClient(roleScopedSupertest);
customRoleApiClient = await createStreamsRepositoryCustomRoleClient(
customRoleScopedSupertest
);
await enableStreams(adminApiClient);
});

after(async () => {
await disableStreams(adminApiClient);
await samlAuth.deleteCustomRole();
});

beforeEach(async () => {
await putStream(adminApiClient, STREAM_NAME, {
stream,
dashboards: [],
queries: [],
});
});

describe('Get streams', () => {
it('fails when users has not read access', async () => {
await getStream(customRoleApiClient, STREAM_NAME, 403);
});

it('succeed when users has read access', async () => {
await getStream(adminApiClient, STREAM_NAME, 200);
});
});
});
}
1 change: 1 addition & 0 deletions x-pack/test_serverless/shared/services/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ export const services = {
svlUserManager: commonFunctionalServices.samlAuth,
samlAuth: commonFunctionalServices.samlAuth, // <--temp workaround until we can unify naming
roleScopedSupertest: deploymentAgnosticServices.roleScopedSupertest,
customRoleScopedSupertest: deploymentAgnosticServices.customRoleScopedSupertest,
dataViewApi: DataViewApiProvider,
platformSecurityUtils: PlatformSecurityUtilsProvider,
};
Loading