diff --git a/controlplane/src/core/repositories/SchemaCheckRepository.ts b/controlplane/src/core/repositories/SchemaCheckRepository.ts index 77107956c8..933d578683 100644 --- a/controlplane/src/core/repositories/SchemaCheckRepository.ts +++ b/controlplane/src/core/repositories/SchemaCheckRepository.ts @@ -6,6 +6,7 @@ import { CompositionWarning, GraphPruningIssue, LintIssue, + LintSeverity, ProposalSubgraph, SchemaChange, VCSContext, @@ -744,8 +745,13 @@ export class SchemaCheckRepository { const compositionErrors: PlainMessage[] = []; const compositionWarnings: PlainMessage[] = []; + type ExtendedCheckSubgraph = CheckSubgraph & { + lintIssues: SchemaLintIssues; + pruneIssues: SchemaGraphPruningIssues; + }; + const federatedGraphs: FederatedGraphDTO[] = []; - const checkSubgraphs: Map = new Map(); + const checkSubgraphs: Map = new Map(); const fedGraphIdToCheckFedGraphId = new Map(); const changeRetention = await orgRepo.getFeature({ @@ -943,6 +949,8 @@ export class SchemaCheckRepository { checkSubgraphId: schemaCheckSubgraphId, routerCompatibilityVersion, labels: s.isNew ? s.labels : undefined, + lintIssues: { warnings: [], errors: [] }, + pruneIssues: { warnings: [], errors: [] }, }); } @@ -1029,13 +1037,6 @@ export class SchemaCheckRepository { storedBreakingChanges, ); - checkSubgraphs.set(subgraphName, { - ...checkSubgraph, - inspectorChanges, - storedBreakingChanges, - checkSubgraphId: schemaCheckSubgraphId, - }); - const lintIssues: SchemaLintIssues = await schemaLintRepo.performSchemaLintCheck({ schemaCheckID, newSchemaSDL, @@ -1119,6 +1120,15 @@ export class SchemaCheckRepository { }), ), ); + + checkSubgraphs.set(subgraphName, { + ...checkSubgraph, + inspectorChanges, + storedBreakingChanges, + checkSubgraphId: schemaCheckSubgraphId, + lintIssues, + pruneIssues: graphPruningIssues, + }); } const { composedGraphs } = await composer.composeWithProposedSchemas({ @@ -1322,6 +1332,54 @@ export class SchemaCheckRepository { } } + // Execute the subgraph check extension webhook + const sceResult = await webhookService.sendSubgraphCheckExtension({ + actorId, + schemaCheckID, + blobStorage, + admissionConfig, + organization: { id: organizationId, slug: organizationSlug }, + namespace, + vcsContext, + subgraphs: [...checkSubgraphs.entries()].map(([subgraphName, { subgraph, ...check }]) => ({ + id: subgraph?.id ?? '', + name: subgraphName, + labels: subgraph?.labels ?? check.labels ?? [], + schemaSDL: subgraph?.schemaSDL ?? '', + schemaChanges: check.schemaChanges, + lintIssues: check.lintIssues, + pruneIssues: check.pruneIssues, + newSchemaSDL: check.newSchemaSDL, + isDeleted: check.newSchemaSDL === '', + })), + affectedGraphs: federatedGraphs, + composedGraphs, + inspectedOperations, + }); + + if (sceResult?.lintIssuesBySubgraph) { + for (const [subgraphName, check] of checkSubgraphs.entries()) { + const sceLintIssues = sceResult.lintIssuesBySubgraph.get(subgraphName); + if (sceLintIssues && sceLintIssues.length > 0) { + const sceLintWarnings = sceLintIssues.filter((issue) => issue.severity === LintSeverity.warn); + const sceLintErrors = sceLintIssues.filter((issue) => issue.severity === LintSeverity.error); + + check.lintIssues.warnings.push(...sceLintIssues.filter((issue) => issue.severity === LintSeverity.warn)); + check.lintIssues.errors.push(...sceLintIssues.filter((issue) => issue.severity === LintSeverity.error)); + + lintWarnings.push(...sceLintWarnings.map((issue) => new LintIssue({ ...issue, subgraphName }))); + lintErrors.push(...sceLintErrors.map((issue) => new LintIssue({ ...issue, subgraphName }))); + + // Then, we need to add the overwritten lint issues + await schemaLintRepo.addSchemaCheckLintIssues({ + schemaCheckId: schemaCheckID, + lintIssues: sceLintIssues, + schemaCheckSubgraphId: check.checkSubgraphId, + }); + } + } + } + // Update the overall schema check with the results await this.update({ schemaCheckID, @@ -1329,6 +1387,8 @@ export class SchemaCheckRepository { hasBreakingChanges: breakingChanges.length > 0 || composedSchemaBreakingChanges.length > 0, hasLintErrors: lintErrors.length > 0, hasGraphPruningErrors: graphPruneErrors.length > 0, + checkExtensionDeliveryId: sceResult?.deliveryInfo?.id, + checkExtensionErrorMessage: sceResult?.deliveryInfo?.errorMessage ?? undefined, }); let isLinkedTrafficCheckFailed = false; diff --git a/controlplane/src/core/repositories/SubgraphRepository.ts b/controlplane/src/core/repositories/SubgraphRepository.ts index eaadcd5653..6dc9976645 100644 --- a/controlplane/src/core/repositories/SubgraphRepository.ts +++ b/controlplane/src/core/repositories/SubgraphRepository.ts @@ -2299,32 +2299,38 @@ export class SubgraphRepository { organization: { id: this.organizationId, slug: organizationSlug }, namespace, vcsContext, - subgraph, - newSchemaSDL, - isDeleted, + subgraphs: [ + { + id: subgraph?.id ?? '', + name: subgraph?.name ?? subgraphName, + labels: subgraph?.labels ?? labels ?? [], + schemaSDL: subgraph?.schemaSDL ?? '', + schemaChanges, + lintIssues, + pruneIssues: graphPruningIssues, + newSchemaSDL, + isDeleted, + }, + ], affectedGraphs: federatedGraphs, composedGraphs, - schemaChanges, - lintIssues, - pruneIssues: graphPruningIssues, inspectedOperations, }); - if (sceResult && sceResult.additionalLintIssues.length > 0) { - const additionalLintIssues: SchemaLintIssues = { - warnings: sceResult.additionalLintIssues.filter((issue) => issue.severity === LintSeverity.warn), - errors: sceResult.additionalLintIssues.filter((issue) => issue.severity === LintSeverity.error), - }; + if (sceResult?.lintIssuesBySubgraph) { + const sceLintIssues = sceResult.lintIssuesBySubgraph.get(subgraph?.name ?? subgraphName); - lintIssues.warnings.push(...additionalLintIssues.warnings); - lintIssues.errors.push(...additionalLintIssues.errors); + if (sceLintIssues && sceLintIssues.length > 0) { + lintIssues.warnings.push(...sceLintIssues.filter((issue) => issue.severity === LintSeverity.warn)); + lintIssues.errors.push(...sceLintIssues.filter((issue) => issue.severity === LintSeverity.error)); - // Then, we need to add the overwritten lint issues - await schemaLintRepo.addSchemaCheckLintIssues({ - schemaCheckId: schemaCheckID, - lintIssues: sceResult.additionalLintIssues, - schemaCheckSubgraphId, - }); + // Then, we need to add the overwritten lint issues + await schemaLintRepo.addSchemaCheckLintIssues({ + schemaCheckId: schemaCheckID, + lintIssues: sceLintIssues, + schemaCheckSubgraphId, + }); + } } // Update the overall schema check with the results diff --git a/controlplane/src/core/webhooks/OrganizationWebhookService.ts b/controlplane/src/core/webhooks/OrganizationWebhookService.ts index 26520de3cf..af05215ad9 100644 --- a/controlplane/src/core/webhooks/OrganizationWebhookService.ts +++ b/controlplane/src/core/webhooks/OrganizationWebhookService.ts @@ -6,7 +6,7 @@ import { eq } from 'drizzle-orm'; import { PostgresJsDatabase } from 'drizzle-orm/postgres-js'; import pino from 'pino'; import { v4 } from 'uuid'; -import z from 'zod'; +import * as z from 'zod'; import { LintSeverity, VCSContext } from '@wundergraph/cosmo-connect/dist/platform/v1/platform_pb'; import * as schema from '../../db/schema.js'; import { FederatedGraphRepository } from '../repositories/FederatedGraphRepository.js'; @@ -20,7 +20,6 @@ import { NamespaceDTO, SchemaGraphPruningIssues, SchemaLintIssues, - SubgraphDTO, } from '../../types/index.js'; import { ComposedFederatedGraph } from '../composition/composer.js'; import { GetDiffBetweenGraphsSuccess } from '../composition/schemaCheck.js'; @@ -33,18 +32,21 @@ import { makeWebhookRequest } from './utils.js'; const subgraphCheckExtensionSchema = z.object({ errors: z.array(z.string()).optional(), lintIssues: z - .array( - z.object({ - lintRuleType: z.string().trim(), - severity: z.nativeEnum(LintSeverity), - message: z.string().trim(), - issueLocation: z.object({ - line: z.number().int().positive(), - column: z.number().int().positive(), - endLine: z.number().int().positive().optional(), - endColumn: z.number().int().positive().optional(), + .record( + z.string(), + z.array( + z.object({ + lintRuleType: z.string().trim(), + severity: z.nativeEnum(LintSeverity), + message: z.string().trim(), + issueLocation: z.object({ + line: z.number().int().positive(), + column: z.number().int().positive(), + endLine: z.number().int().positive().optional(), + endColumn: z.number().int().positive().optional(), + }), }), - }), + ), ) .optional(), }); @@ -585,19 +587,24 @@ export class OrganizationWebhookService { organization: { id: string; slug: string }; namespace: NamespaceDTO; vcsContext: VCSContext | undefined; - subgraph: SubgraphDTO | undefined; - newSchemaSDL: string; - isDeleted: boolean; + subgraphs: { + id: string; + name: string; + labels: Label[]; + schemaChanges: GetDiffBetweenGraphsSuccess; + lintIssues: SchemaLintIssues; + pruneIssues: SchemaGraphPruningIssues; + schemaSDL: string; + newSchemaSDL: string; + isDeleted: boolean; + }[]; affectedGraphs: FederatedGraphDTO[]; composedGraphs: ComposedFederatedGraph[]; - schemaChanges: GetDiffBetweenGraphsSuccess; - lintIssues: SchemaLintIssues; - pruneIssues: SchemaGraphPruningIssues; inspectedOperations: InspectorOperationResult[]; }): Promise< | { deliveryInfo: WebhookDeliveryInfo; - additionalLintIssues: LintIssueResult[]; + lintIssuesBySubgraph: Map; } | undefined > { @@ -629,15 +636,17 @@ export class OrganizationWebhookService { // Compose the contents of the file that we'll provide to the webhook const fileContent: Record = {}; - if (input.subgraph) { - fileContent.subgraphs = [ - { - id: input.subgraph.id, - name: input.subgraph.name, - newComposedSdl: sceConfig.includeLintingIssues ? input.newSchemaSDL : undefined, - oldComposedSdl: sceConfig.includeLintingIssues ? input.subgraph.schemaSDL : undefined, - }, - ]; + if (input.subgraphs.length > 0) { + fileContent.subgraphs = input.subgraphs.map((subgraph) => ({ + id: subgraph.id, + name: subgraph.name, + labels: subgraph.labels, + newComposedSdl: sceConfig.includeComposedSdl ? subgraph.newSchemaSDL : undefined, + oldComposedSdl: sceConfig.includeComposedSdl ? subgraph.schemaSDL : undefined, + lintIssues: sceConfig.includeLintingIssues ? subgraph.lintIssues : undefined, + pruningIssues: sceConfig.includePruningIssues ? subgraph.pruneIssues : undefined, + schemaChanges: sceConfig.includeSchemaChanges ? subgraph.schemaChanges : undefined, + })); } if (sceConfig.includeComposedSdl) { @@ -649,18 +658,6 @@ export class OrganizationWebhookService { })); } - if (sceConfig.includeLintingIssues) { - fileContent.lintIssues = [...input.lintIssues.warnings, ...input.lintIssues.errors]; - } - - if (sceConfig.includePruningIssues) { - fileContent.pruningIssues = [...input.pruneIssues.warnings, ...input.pruneIssues.errors]; - } - - if (sceConfig.includeSchemaChanges) { - fileContent.schemaChanges = input.schemaChanges.changes; - } - if (sceConfig.includeAffectedOperations) { fileContent.affectedOperations = input.inspectedOperations; } @@ -669,11 +666,11 @@ export class OrganizationWebhookService { let url: string | undefined; if (Object.keys(fileContent).length > 0) { // Only upload the file if at least one option is enabled - const blobKey = `/${input.organization.id}/subgraph_checks/${v4()}.json`; + const blobKey = `${input.organization.id}/subgraph_checks/${v4()}.json`; const blobContent = JSON.stringify(fileContent); await input.blobStorage.putObject({ key: blobKey, - contentType: 'application/json', + contentType: 'application/json; charset=utf-8', body: Buffer.from(blobContent, 'utf8'), }); @@ -686,14 +683,14 @@ export class OrganizationWebhookService { }, }); - url = `${input.admissionConfig.cdnBaseUrl}${blobKey}?token=${token}`; + url = `${input.admissionConfig.cdnBaseUrl}/${blobKey}?token=${token}`; } // Compose the webhook payload const payload: Record = { actorId: input.actorId, checkId: input.schemaCheckID, - labels: input.subgraph ? undefined : input.labels, + labels: input.labels, organization: input.organization, namespace: { id: input.namespace.id, name: input.namespace.name }, vcsContext: input.vcsContext, @@ -704,14 +701,13 @@ export class OrganizationWebhookService { url, }; - if (input.subgraph) { - payload.subgraphs = [ - { - id: input.subgraph.id, - name: input.subgraph.name, - isDeleted: input.isDeleted, - }, - ]; + if (input.subgraphs.length > 0) { + payload.subgraphs = input.subgraphs.map((sg) => ({ + id: sg.id, + name: sg.name, + labels: sg.labels, + isDeleted: sg.isDeleted, + })); } // Deliver the webhook @@ -733,13 +729,13 @@ export class OrganizationWebhookService { this.logger, ); + let lintIssuesBySubgraph: Map = new Map(); if (!response?.data) { - return { deliveryInfo, additionalLintIssues: [] }; + return { deliveryInfo, lintIssuesBySubgraph }; } // Validate the response and maybe overwrite the error message based on it let overwriteErrorMessage = false; - let additionalLintIssues: LintIssueResult[] = []; if (response?.status !== 200 && response?.status !== 204) { // We expect the response status to be either 200 (OK) or 204 (No Content) @@ -755,8 +751,13 @@ export class OrganizationWebhookService { deliveryInfo.errorMessage = parsedResponse.data.errors.filter(Boolean).join('\n'); } - if (Array.isArray(parsedResponse.data.lintIssues)) { - additionalLintIssues = parsedResponse.data.lintIssues as LintIssueResult[]; + if (typeof parsedResponse.data.lintIssues === 'object') { + try { + const lintIssuesRecord = parsedResponse.data.lintIssues as Record; + lintIssuesBySubgraph = new Map(Object.entries(lintIssuesRecord)); + } catch { + // ignore + } } } else { overwriteErrorMessage = true; @@ -775,6 +776,6 @@ export class OrganizationWebhookService { } // We are done! - return { deliveryInfo, additionalLintIssues }; + return { deliveryInfo, lintIssuesBySubgraph }; } } diff --git a/controlplane/test/subgraph-check-extensions.test.ts b/controlplane/test/subgraph-check-extensions.test.ts index c09ea8ed28..d7beb4d11d 100644 --- a/controlplane/test/subgraph-check-extensions.test.ts +++ b/controlplane/test/subgraph-check-extensions.test.ts @@ -1,10 +1,34 @@ import { EnumStatusCode } from '@wundergraph/cosmo-connect/dist/common/common_pb'; import { afterAll, afterEach, beforeAll, describe, expect, test, vi } from 'vitest'; -import { TestUser, afterAllSetup, beforeAllSetup } from '../src/core/test-util.js'; +import { joinLabel } from '@wundergraph/cosmo-shared'; +import axios from 'axios'; +import { TestUser, afterAllSetup, beforeAllSetup, genUniqueLabel, genID } from '../src/core/test-util.js'; import { SetupTest } from './test-util.js'; let dbname = ''; +const subgraph1Schema = ` +type Query { + employees: [Employee!]! +} + +type Employee { + id: Int! + name: String! +} +`; + +const subgraph2Schema = ` +type Query { + departments: [Department!]! +} + +type Department { + id: Int! + title: String! +} +`; + vi.mock('../src/core/clickhouse/index.js', () => { const ClickHouseClient = vi.fn(); ClickHouseClient.prototype.queryPromise = vi.fn(); @@ -12,6 +36,87 @@ vi.mock('../src/core/clickhouse/index.js', () => { return { ClickHouseClient }; }); +async function setupTestGraphs(postSpy: ReturnType) { + const { client, server, users, blobStorage } = await SetupTest({ dbname, setupBilling: { plan: 'enterprise' } }); + const response = await client.configureSubgraphCheckExtensions({ + enableSubgraphCheckExtensions: true, + namespace: 'default', + endpoint: 'https://example.com/handler', + }); + + expect(response.response?.code).toBe(EnumStatusCode.OK); + + const fedGraphName = genID('fedGraph'); + const subgraph1Name = genID('subgraph1'); + const subgraph2Name = genID('subgraph2'); + const label = genUniqueLabel(); + + // Create a federated graph + const createFedGraphRes = await client.createFederatedGraph({ + name: fedGraphName, + namespace: 'default', + routingUrl: 'http://localhost:8081', + labelMatchers: [joinLabel(label)], + }); + + expect(createFedGraphRes.response?.code).toBe(EnumStatusCode.OK); + + // Create and publish first subgraph + let resp = await client.createFederatedSubgraph({ + name: subgraph1Name, + namespace: 'default', + labels: [label], + routingUrl: 'http://localhost:8082', + }); + + expect(resp.response?.code).toBe(EnumStatusCode.OK); + + resp = await client.publishFederatedSubgraph({ + name: subgraph1Name, + namespace: 'default', + schema: subgraph1Schema, + }); + + expect(resp.response?.code).toBe(EnumStatusCode.OK); + + // Create and publish second subgraph + resp = await client.createFederatedSubgraph({ + name: subgraph2Name, + namespace: 'default', + labels: [label], + routingUrl: 'http://localhost:8083', + }); + + expect(resp.response?.code).toBe(EnumStatusCode.OK); + + resp = await client.publishFederatedSubgraph({ + name: subgraph2Name, + namespace: 'default', + schema: subgraph2Schema, + }); + + expect(resp.response?.code).toBe(EnumStatusCode.OK); + + vi.spyOn(axios, 'create').mockReturnValue({ + post: postSpy, + get: vi.fn(), + interceptors: { + request: { use: vi.fn() }, + response: { use: vi.fn() }, + }, + } as any); + + return { + client, + server, + subgraph1Name, + subgraph2Name, + fedGraphName, + adminAliceCompanyA: users.adminAliceCompanyA, + blobStorage, + }; +} + describe('Subgraph Check Extensions Tests', (ctx) => { afterEach(() => { vi.clearAllMocks(); @@ -267,4 +372,70 @@ describe('Subgraph Check Extensions Tests', (ctx) => { await server.close(); }); + + test('that the subgraph check extension webhook is sent and handled correctly', async () => { + const postSpy = vi.fn().mockResolvedValue({ status: 400, data: {} }); + + const { client, server, subgraph1Name, fedGraphName, adminAliceCompanyA, blobStorage } = + await setupTestGraphs(postSpy); + + // Run the schema check + const checkResp = await client.checkSubgraphSchema({ + subgraphName: subgraph1Name, + schema: new Uint8Array(Buffer.from(subgraph1Schema)), + namespace: 'default', + }); + + expect(checkResp.response?.code).toBe(EnumStatusCode.OK); + expect(checkResp.checkExtensionErrorMessage).toBe( + "Check extension returned status code '400'. Allowed values are 200 and 204.", + ); + + // Verify that the configured endpoint was hit + expect(postSpy).toHaveBeenCalledOnce(); + + const [url, data] = postSpy.mock.calls[0]; + + expect(url).toBe('https://example.com/handler'); + expect(data.actorId).toBe(adminAliceCompanyA.userId); + expect(data.checkId).toBe(checkResp.checkId); + expect(data.url).toBeDefined(); + expect(data.url).not.toBeNull(); + + // Read the content of the uploaded file from the provided url + const payloadUrl = new URL(data.url); + const blob = await blobStorage.getObject({ key: payloadUrl.pathname.slice(1) }); + expect(blob?.stream).toBeDefined(); + + const reader = blob.stream.getReader(); + const chunks: Uint8Array[] = []; + while (true) { + const { done, value } = await reader.read(); + if (done) { + break; + } + + chunks.push(value); + } + + const text = Buffer.concat(chunks).toString('utf8'); + const json = JSON.parse(text); + + expect(json.subgraphs).toHaveLength(1); + expect(json.subgraphs[0]?.name).toBe(subgraph1Name); + + // Verify that the subgraph check failure is recorded + const checkSummary = await client.getCheckSummary({ + namespace: 'default', + graphName: fedGraphName, + checkId: checkResp.checkId, + }); + + expect(checkSummary.response?.code).toBe(EnumStatusCode.OK); + expect(checkSummary.check?.checkExtensionErrorMessage).toBe( + "Check extension returned status code '400'. Allowed values are 200 and 204.", + ); + + await server.close(); + }); });