Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
38c93f7
feat: add PQL manifest support for persisted operations
StarpTech Mar 11, 2026
5def8fa
feat: update persisted operations tests and improve manifest handling
StarpTech Mar 11, 2026
980f7a9
feat: enhance README with PQL manifest details and lookup order
StarpTech Mar 11, 2026
b46e35c
Merge branch 'main' into dustin/eng-9107-router-control-plane-impleme…
StarpTech Mar 11, 2026
f2101bd
feat: implement persisted query list (PQL) support in configuration a…
StarpTech Mar 11, 2026
1ac5f25
feat: refactor persisted operation deletion and manifest generation l…
StarpTech Mar 11, 2026
5623d04
feat: add error handling for PQL manifest regeneration in publishPers…
StarpTech Mar 11, 2026
b4e0c0e
feat: update NewPoller to enforce minimum values for pollInterval and…
StarpTech Mar 11, 2026
8bbfd3f
fix: correct default value for pollJitter in NewPoller to use time.Du…
StarpTech Mar 11, 2026
b4c791b
feat: add truncation logic for manifest operations and corresponding …
StarpTech Mar 11, 2026
e12cc19
Merge branch 'main' into dustin/eng-9107-router-control-plane-impleme…
alepane21 Mar 12, 2026
1f75382
Merge branch 'main' into dustin/eng-9107-router-control-plane-impleme…
StarpTech Mar 12, 2026
f4f9221
feat(persisted-query): implement persisted query list and cache inval…
StarpTech Mar 12, 2026
e57088b
Merge branch 'main' into dustin/eng-9107-router-control-plane-impleme…
alepane21 Mar 12, 2026
ebd9e29
fix(telemetry): remove sample_rate comment for clarity
StarpTech Mar 12, 2026
23ba808
Merge branch 'main' into dustin/eng-9107-router-control-plane-impleme…
StarpTech Mar 13, 2026
077d34f
refactor: move generateAndUploadManifest to OperationsRepository and …
JivusAyrus Mar 23, 2026
b4bc772
refactor: remove Cache-Control header from response in persisted oper…
JivusAyrus Mar 23, 2026
5cb5ec9
feat: implement operation limit check in publishPersistedOperations a…
JivusAyrus Mar 24, 2026
716a670
feat: implement persisted query list (PQL) loading from various stora…
JivusAyrus Mar 24, 2026
8c62000
fix: correct operation ID usage in operations mapping and improve CDN…
JivusAyrus Mar 25, 2026
9d8e98e
feat: implement FetchManifest methods for CDN, S3, and filesystem pro…
JivusAyrus Mar 25, 2026
876b24b
feat: add path property for PQL manifest JSON file in config schema
JivusAyrus Mar 25, 2026
9fd935c
Merge branch 'main' of github.com:wundergraph/cosmo into dustin/eng-9…
JivusAyrus Mar 25, 2026
6fad863
docs: update PQL Manifest section in README to clarify loading behavi…
JivusAyrus Mar 25, 2026
6738083
fix: lint
JivusAyrus Mar 25, 2026
5c55b93
fix: tests
JivusAyrus Mar 26, 2026
2f3819e
Merge branch 'main' into dustin/eng-9107-router-control-plane-impleme…
JivusAyrus Mar 26, 2026
6a13a13
feat(persisted-query): enhance CDN client and add PQL manifest support
StarpTech Mar 26, 2026
c000a99
Merge branch 'main' into dustin/eng-9107-router-control-plane-impleme…
StarpTech Mar 26, 2026
c9bcc31
feat(pql): add test for CDN manifest fetch failure and improve error …
StarpTech Mar 26, 2026
ffcbd35
feat(pql): add test for CDN manifest fetch failure and improve error …
StarpTech Mar 26, 2026
a155ccf
refactor(tests): simplify handler assignment in fetcher tests
StarpTech Mar 26, 2026
d9aa7de
fix(cdn): change HTTP method to GET for fetching manifest
StarpTech Mar 26, 2026
8af64f0
feat(pql): implement ReadManifest method for fetching PQL manifests f…
StarpTech Mar 26, 2026
9db9c6b
chore(config): remove unused persisted_operations section from demo.c…
StarpTech Mar 26, 2026
062a4ee
docs(push): clarify client-name option usage in push.mdx
StarpTech Mar 26, 2026
52d41a3
feat(docs): enhance tutorial on using persisted operations with PQL m…
JivusAyrus Mar 27, 2026
93befb6
feat(manifest): add cache warmup configuration for PQL operations
StarpTech Mar 27, 2026
c2a946e
Merge branch 'main' into dustin/eng-9107-router-control-plane-impleme…
StarpTech Mar 27, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 65 additions & 3 deletions cdn-server/cdn/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,12 @@ export interface BlobStorage {
headObject({
context,
key,
schemaVersionId,
version,
}: {
context: Context;
abortSignal?: AbortSignal;
key: string;
schemaVersionId: string;
version: string;
}): Promise<boolean>;
}

Expand Down Expand Up @@ -106,6 +106,9 @@ const jwtMiddleware = (secret: string | ((c: Context) => string)) => {
};
};

// Deprecated: Individual persisted operation lookups via CDN are deprecated.
// The router now downloads all operations at once via the PQL manifest, avoiding
// per-request latency. This handler is kept for backward compatibility with older routers.
const persistedOperation = (storage: BlobStorage) => {
return async (c: Context) => {
const organizationId = c.get('authenticatedOrganizationId');
Expand Down Expand Up @@ -165,7 +168,7 @@ const latestValidRouterConfig = (storage: BlobStorage) => {
// starts for the first time, and we need to return a config anyway.
if (body?.version) {
try {
isModified = await storage.headObject({ context: c, key, schemaVersionId: body.version });
isModified = await storage.headObject({ context: c, key, version: body.version });
} catch (e: any) {
if (e instanceof BlobNotFoundError) {
return c.notFound();
Expand Down Expand Up @@ -262,6 +265,60 @@ const cacheOperations = (storage: BlobStorage) => {
};
};

const persistedOperationsManifest = (storage: BlobStorage) => {
return async (c: Context) => {
const organizationId = c.get('authenticatedOrganizationId');
const federatedGraphId = c.get('authenticatedFederatedGraphId');

if (organizationId !== c.req.param('organization_id') || federatedGraphId !== c.req.param('federated_graph_id')) {
return c.text('Bad Request', 400);
}

const key = `${organizationId}/${federatedGraphId}/operations/manifest.json`;

const ifNoneMatch = c.req.header('If-None-Match');
const clientVersion = ifNoneMatch?.replace(/^"(.*)"$/, '$1') || null;

let isModified = true;

if (clientVersion) {
try {
isModified = await storage.headObject({ context: c, key, version: clientVersion });
} catch (e: any) {
if (e instanceof BlobNotFoundError) {
return c.notFound();
}
throw e;
}
}

if (!isModified) {
c.header('ETag', `"${clientVersion}"`);
return c.body(null, 304);
}

let blobObject: BlobObject;

try {
blobObject = await storage.getObject({ context: c, key, cacheControl: 'no-cache' });
} catch (e: any) {
if (e instanceof BlobNotFoundError) {
return c.notFound();
}
throw e;
}

if (blobObject.metadata?.version) {
c.header('ETag', `"${blobObject.metadata.version}"`);
}
c.header('Content-Type', 'application/json; charset=UTF-8');

return stream(c, async (stream) => {
await stream.pipe(blobObject.stream);
});
};
};

const subgraphChecks = (storage: BlobStorage) => {
return async (c: Context) => {
const organizationId = c.get('authenticatedOrganizationId');
Expand Down Expand Up @@ -301,6 +358,11 @@ export const cdn = <E extends Env, S extends Schema = {}, BasePath extends strin
hono: Hono<E, S, BasePath>,
opts: CdnOptions,
) => {
const manifestPath = '/:organization_id/:federated_graph_id/operations/manifest.json';
hono
.use(manifestPath, jwtMiddleware(opts.authJwtSecret))
.get(manifestPath, persistedOperationsManifest(opts.blobStorage));

const operations = '/:organization_id/:federated_graph_id/operations/:client_id/:operation{.+\\.json$}';
const latestValidRouterConfigs = '/:organization_id/:federated_graph_id/routerconfigs/latest.json';
hono.use(operations, jwtMiddleware(opts.authJwtSecret)).get(operations, persistedOperation(opts.blobStorage));
Expand Down
205 changes: 203 additions & 2 deletions cdn-server/cdn/test/cdn.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,12 @@ class InMemoryBlobStorage implements BlobStorage {
return Promise.resolve({ stream, metadata: obj.metadata });
}

headObject({ key, schemaVersionId }: { key: string; schemaVersionId: string }): Promise<boolean> {
headObject({ key, version }: { key: string; version: string }): Promise<boolean> {
const obj = this.objects.get(key);
if (!obj) {
return Promise.reject(new BlobNotFoundError(`Object with key ${key} not found`));
}
if (obj.metadata?.version === schemaVersionId) {
if (obj.metadata?.version === version) {
return Promise.resolve(false);
}
return Promise.resolve(true);
Expand Down Expand Up @@ -554,6 +554,207 @@ describe('CDN handlers', () => {
});
});

describe('Test persisted operations manifest handler', async () => {
const federatedGraphId = 'federatedGraphId';
const organizationId = 'organizationId';
const token = await generateToken(organizationId, federatedGraphId, secretKey);
const blobStorage = new InMemoryBlobStorage();
const requestPath = `/${organizationId}/${federatedGraphId}/operations/manifest.json`;

const app = new Hono();

cdn(app, {
authJwtSecret: secretKey,
authAdmissionJwtSecret: secretAdmissionKey,
blobStorage,
});

test('it returns a 401 if no Authorization header is provided', async () => {
const res = await app.request(requestPath, {
method: 'GET',
});
expect(res.status).toBe(401);
});

test('it returns a 401 if an invalid Authorization header is provided', async () => {
const res = await app.request(requestPath, {
method: 'GET',
headers: {
Authorization: `Bearer ${token.slice(0, -1)}}`,
},
});
expect(res.status).toBe(401);
});

test('it returns a 400 if the graph or organization ids does not match with the JWT payload', async () => {
const res = await app.request(`/foo/bar/operations/manifest.json`, {
method: 'GET',
headers: {
Authorization: `Bearer ${token}`,
},
});
expect(res.status).toBe(400);
});

test('it returns a 401 if the token has expired', async () => {
const token = await new SignJWT({
organization_id: organizationId,
federated_graph_id: federatedGraphId,
exp: Math.floor(Date.now() / 1000) - 60,
})
.setProtectedHeader({ alg: 'HS256' })
.sign(new TextEncoder().encode(secretKey));
const res = await app.request(requestPath, {
method: 'GET',
headers: {
Authorization: `Bearer ${token}`,
},
});
expect(res.status).toBe(401);
});

test('it returns the manifest with ETag on first request', async () => {
const manifestContents = JSON.stringify({
version: 1,
revision: 'abc123',
generatedAt: '2025-01-01T00:00:00.000Z',
operations: {
sha256hash1: 'query { hello }',
},
});

blobStorage.objects.set(`${organizationId}/${federatedGraphId}/operations/manifest.json`, {
buffer: Buffer.from(manifestContents),
metadata: { version: 'abc123' },
});

const res = await app.request(requestPath, {
method: 'GET',
headers: {
Authorization: `Bearer ${token}`,
},
});
expect(res.status).toBe(200);
expect(res.headers.get('Content-Type')).toBe('application/json; charset=UTF-8');
expect(res.headers.get('ETag')).toBe('"abc123"');
expect(await res.text()).toBe(manifestContents);
});

test('it returns 304 with ETag when If-None-Match matches', async () => {
blobStorage.objects.set(`${organizationId}/${federatedGraphId}/operations/manifest.json`, {
buffer: Buffer.from(JSON.stringify({ version: 1, revision: 'abc123', operations: {} })),
metadata: { version: 'abc123' },
});

const res = await app.request(requestPath, {
method: 'GET',
headers: {
Authorization: `Bearer ${token}`,
'If-None-Match': '"abc123"',
},
});
expect(res.status).toBe(304);
expect(res.headers.get('ETag')).toBe('"abc123"');
});

test('it returns 200 with new ETag when If-None-Match does not match', async () => {
const manifestContents = JSON.stringify({
version: 1,
revision: 'def456',
generatedAt: '2025-01-01T00:00:00.000Z',
operations: {
sha256hash1: 'query { hello }',
},
});

blobStorage.objects.set(`${organizationId}/${federatedGraphId}/operations/manifest.json`, {
buffer: Buffer.from(manifestContents),
metadata: { version: 'def456' },
});

const res = await app.request(requestPath, {
method: 'GET',
headers: {
Authorization: `Bearer ${token}`,
'If-None-Match': '"old-revision"',
},
});
expect(res.status).toBe(200);
expect(res.headers.get('ETag')).toBe('"def456"');
expect(await res.text()).toBe(manifestContents);
});

test('ETag round-trip: fetch returns ETag, re-fetch with that ETag returns 304', async () => {
const manifestContents = JSON.stringify({
version: 1,
revision: 'rev-round-trip',
generatedAt: '2025-01-01T00:00:00.000Z',
operations: { hash1: 'query { hello }' },
});

blobStorage.objects.set(`${organizationId}/${federatedGraphId}/operations/manifest.json`, {
buffer: Buffer.from(manifestContents),
metadata: { version: 'rev-round-trip' },
});

// First request: no ETag, should get 200 with ETag
const res1 = await app.request(requestPath, {
method: 'GET',
headers: { Authorization: `Bearer ${token}` },
});
expect(res1.status).toBe(200);
const etag = res1.headers.get('ETag');
expect(etag).toBe('"rev-round-trip"');
expect(await res1.text()).toBe(manifestContents);

// Second request: send ETag back as If-None-Match, should get 304
const res2 = await app.request(requestPath, {
method: 'GET',
headers: {
Authorization: `Bearer ${token}`,
'If-None-Match': etag!,
},
});
expect(res2.status).toBe(304);
expect(res2.headers.get('ETag')).toBe(etag);
});

test('it returns a 404 if the manifest does not exist', async () => {
const otherBlobStorage = new InMemoryBlobStorage();
const otherApp = new Hono();

cdn(otherApp, {
authJwtSecret: secretKey,
authAdmissionJwtSecret: secretAdmissionKey,
blobStorage: otherBlobStorage,
});

const res = await otherApp.request(requestPath, {
method: 'GET',
headers: {
Authorization: `Bearer ${token}`,
},
});
expect(res.status).toBe(404);
});

test('it does not conflict with the individual persisted operations route', async () => {
const operationContents = JSON.stringify({ version: 1, body: 'query { hello }' });
blobStorage.objects.set(`${organizationId}/${federatedGraphId}/operations/clientName/operation.json`, {
buffer: Buffer.from(operationContents),
});

const res = await app.request(`/${organizationId}/${federatedGraphId}/operations/clientName/operation.json`, {
method: 'GET',
headers: {
Authorization: `Bearer ${token}`,
},
});
expect(res.status).toBe(200);
expect(await res.text()).toBe(operationContents);
});
});

describe('schema check extensions handler', async () => {
const organizationId = 'organizationId';
const checkId = randomUUID();
Expand Down
12 changes: 2 additions & 10 deletions cdn-server/src/s3.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,15 +49,7 @@ class S3BlobStorage implements BlobStorage {
}
}

async headObject({
context,
key,
schemaVersionId,
}: {
context: Context;
key: string;
schemaVersionId: string;
}): Promise<boolean> {
async headObject({ context, key, version }: { context: Context; key: string; version: string }): Promise<boolean> {
const command = new HeadObjectCommand({
Bucket: this.bucketName,
Key: key,
Expand All @@ -72,7 +64,7 @@ class S3BlobStorage implements BlobStorage {
} else if (resp.$metadata.httpStatusCode !== 200) {
throw new Error(`Failed to fetch the metadata of the object.`);
}
if (resp.Metadata && resp.Metadata.version === schemaVersionId) {
if (resp.Metadata && resp.Metadata.version === version) {
return false;
}
return true;
Expand Down
Loading
Loading