Skip to content

Commit c5c4ee5

Browse files
danlapidclaudepenalosa
authored
feat: add streaming tail consumer configuration support (#10703)
* feat: add streaming tail consumer configuration support Implement [[streaming_tail_consumer]] configuration similar to tail_consumer, sending to streamingTails instead of tails. Includes: - Configuration schema and validation for streaming_tail_consumers - Miniflare integration with streamingTails support - Development and deployment workflow updates - Print bindings integration for CLI output - Comprehensive test coverage 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude <[email protected]> * fixups * Create clever-eggs-sit.md * fix types * Work in Vite + Vitest * fix tests * Add E2E test * Address comments --------- Co-authored-by: Claude <[email protected]> Co-authored-by: Samuel Macleod <[email protected]> Co-authored-by: Somhairle MacLeòid <[email protected]>
1 parent a352c7f commit c5c4ee5

File tree

27 files changed

+413
-51
lines changed

27 files changed

+413
-51
lines changed

.changeset/clever-eggs-sit.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
---
2+
"miniflare": minor
3+
"wrangler": minor
4+
---
5+
6+
Add support for streaming tail consumers in local dev. This is an experimental new feature that allows you to register a `tailStream()` handler (compared to the existing `tail()` handler), which will receeive streamed tail events from your Worker (compared to the `tail()` handler, which only receives batched events after your Worker has finished processing).

packages/miniflare/src/plugins/core/index.ts

Lines changed: 41 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -171,6 +171,7 @@ const CoreOptionsSchemaInput = z.intersection(
171171
hasAssetsAndIsVitest: z.boolean().optional(),
172172

173173
tails: z.array(ServiceDesignatorSchema).optional(),
174+
streamingTails: z.array(ServiceDesignatorSchema).optional(),
174175

175176
// Strip the CF-Connecting-IP header from outbound fetches
176177
stripCfConnectingIp: z.boolean().default(true),
@@ -847,19 +848,28 @@ export const CORE_PLUGIN: Plugin<
847848
sharedOptions.unsafeModuleFallbackService !== undefined
848849
? `localhost:${loopbackPort}`
849850
: undefined,
850-
tails:
851-
options.tails === undefined
852-
? undefined
853-
: options.tails.map<ServiceDesignator>((service) => {
854-
return getCustomServiceDesignator(
855-
/* referrer */ options.name,
856-
workerIndex,
857-
CustomServiceKind.UNKNOWN,
858-
name,
859-
service,
860-
options.hasAssetsAndIsVitest
861-
);
862-
}),
851+
tails: options.tails?.map<ServiceDesignator>((service) => {
852+
return getCustomServiceDesignator(
853+
/* referrer */ options.name,
854+
workerIndex,
855+
CustomServiceKind.UNKNOWN,
856+
name,
857+
service,
858+
options.hasAssetsAndIsVitest
859+
);
860+
}),
861+
streamingTails: options.streamingTails?.map<ServiceDesignator>(
862+
(service) => {
863+
return getCustomServiceDesignator(
864+
/* referrer */ options.name,
865+
workerIndex,
866+
CustomServiceKind.UNKNOWN,
867+
name,
868+
service,
869+
options.hasAssetsAndIsVitest
870+
);
871+
}
872+
),
863873
containerEngine: getContainerEngine(options.containerEngine),
864874
},
865875
});
@@ -878,16 +888,24 @@ export const CORE_PLUGIN: Plugin<
878888
}
879889
}
880890

881-
if (options.tails !== undefined) {
882-
for (const service of options.tails) {
883-
const maybeService = maybeGetCustomServiceService(
884-
workerIndex,
885-
CustomServiceKind.UNKNOWN,
886-
name,
887-
service
888-
);
889-
if (maybeService !== undefined) services.push(maybeService);
890-
}
891+
for (const service of options.tails ?? []) {
892+
const maybeService = maybeGetCustomServiceService(
893+
workerIndex,
894+
CustomServiceKind.UNKNOWN,
895+
name,
896+
service
897+
);
898+
if (maybeService !== undefined) services.push(maybeService);
899+
}
900+
901+
for (const service of options.streamingTails ?? []) {
902+
const maybeService = maybeGetCustomServiceService(
903+
workerIndex,
904+
CustomServiceKind.UNKNOWN,
905+
name,
906+
service
907+
);
908+
if (maybeService !== undefined) services.push(maybeService);
891909
}
892910

893911
if (options.outboundService !== undefined) {

packages/miniflare/src/runtime/config/workerd.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@ export type Worker = (
7373
durableObjectStorage?: Worker_DurableObjectStorage;
7474
moduleFallback?: string;
7575
tails?: ServiceDesignator[];
76+
streamingTails?: ServiceDesignator[];
7677
containerEngine?: Worker_ContainerEngine;
7778
};
7879

packages/vite-plugin-cloudflare/src/workers/runner-worker/index.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ interface WorkflowEntrypointConstructor<T = Cloudflare.Env> {
4646
}
4747

4848
/** Keys that should be ignored during RPC property access */
49-
const IGNORED_KEYS = ["self", "tailStream"] as const;
49+
const IGNORED_KEYS = ["self"] as const;
5050

5151
/** Available methods for `WorkerEntrypoint` class */
5252
const WORKER_ENTRYPOINT_KEYS = [
@@ -56,6 +56,7 @@ const WORKER_ENTRYPOINT_KEYS = [
5656
"test",
5757
"trace",
5858
"scheduled",
59+
"tailStream",
5960
] as const;
6061

6162
/** Available methods for `DurableObject` class */

packages/vitest-pool-workers/src/worker/entrypoints.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ function importModule(
3333
});
3434
}
3535

36-
const IGNORED_KEYS = ["self", "tailStream"];
36+
const IGNORED_KEYS = ["self"];
3737

3838
/**
3939
* Create a class extending `superClass` with a `Proxy` as a `prototype`.
@@ -183,6 +183,7 @@ const WORKER_ENTRYPOINT_KEYS = [
183183
"scheduled",
184184
"queue",
185185
"test",
186+
"tailStream",
186187
] as const;
187188
const DURABLE_OBJECT_KEYS = [
188189
"fetch",

packages/workers-utils/src/config/config.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -385,6 +385,7 @@ export const defaultWranglerConfig: Config = {
385385
unsafe: {},
386386
mtls_certificates: [],
387387
tail_consumers: undefined,
388+
streaming_tail_consumers: undefined,
388389
pipelines: [],
389390
vpc_services: [],
390391
};

packages/workers-utils/src/config/environment.ts

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1126,6 +1126,17 @@ export interface EnvironmentNonInheritable {
11261126
*/
11271127
tail_consumers?: TailConsumer[];
11281128

1129+
/**
1130+
* Specifies a list of Streaming Tail Workers that are bound to this Worker environment
1131+
*
1132+
* NOTE: This field is not automatically inherited from the top level environment,
1133+
* and so must be specified in every named environment.
1134+
*
1135+
* @default []
1136+
* @nonInheritable
1137+
*/
1138+
streaming_tail_consumers?: StreamingTailConsumer[];
1139+
11291140
/**
11301141
* Specifies namespace bindings that are bound to this Worker environment.
11311142
*
@@ -1295,6 +1306,11 @@ export type TailConsumer = {
12951306
environment?: string;
12961307
};
12971308

1309+
export type StreamingTailConsumer = {
1310+
/** The name of the service streaming tail events will be forwarded to. */
1311+
service: string;
1312+
};
1313+
12981314
export interface DispatchNamespaceOutbound {
12991315
/** Name of the service handling the outbound requests */
13001316
service: string;

packages/workers-utils/src/config/validation.ts

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ import type {
4343
Observability,
4444
RawEnvironment,
4545
Rule,
46+
StreamingTailConsumer,
4647
TailConsumer,
4748
} from "./environment";
4849
import type { TypeofType, ValidatorFn } from "./validation-helpers";
@@ -1056,6 +1057,59 @@ const validateTailConsumers: ValidatorFn = (diagnostics, field, value) => {
10561057
return isValid;
10571058
};
10581059

1060+
/**
1061+
* Streaming tail consumers should match:
1062+
* {
1063+
* service: string,
1064+
* entrypoint?: string
1065+
* }
1066+
*/
1067+
function validateStreamingTailConsumer(
1068+
diagnostics: Diagnostics,
1069+
field: string,
1070+
value: StreamingTailConsumer
1071+
) {
1072+
if (typeof value !== "object" || value === null || Array.isArray(value)) {
1073+
diagnostics.errors.push(
1074+
`"${field}" should be an object but got ${JSON.stringify(value)}.`
1075+
);
1076+
return false;
1077+
}
1078+
1079+
return validateRequiredProperty(
1080+
diagnostics,
1081+
field,
1082+
"service",
1083+
value.service,
1084+
"string"
1085+
);
1086+
}
1087+
1088+
const validateStreamingTailConsumers: ValidatorFn = (
1089+
diagnostics,
1090+
field,
1091+
value
1092+
) => {
1093+
if (!value) {
1094+
return true;
1095+
}
1096+
if (!Array.isArray(value)) {
1097+
diagnostics.errors.push(
1098+
`Expected "${field}" to be an array but got ${JSON.stringify(value)}.`
1099+
);
1100+
return false;
1101+
}
1102+
1103+
let isValid = true;
1104+
for (let i = 0; i < value.length; i++) {
1105+
isValid =
1106+
validateStreamingTailConsumer(diagnostics, `${field}[${i}]`, value[i]) &&
1107+
isValid;
1108+
}
1109+
1110+
return isValid;
1111+
};
1112+
10591113
/**
10601114
* Validate top-level environment configuration and return the normalized values.
10611115
*/
@@ -1453,6 +1507,16 @@ function normalizeAndValidateEnvironment(
14531507
validateTailConsumers,
14541508
undefined
14551509
),
1510+
streaming_tail_consumers: notInheritable(
1511+
diagnostics,
1512+
topLevelEnv,
1513+
rawConfig,
1514+
rawEnv,
1515+
envName,
1516+
"streaming_tail_consumers",
1517+
validateStreamingTailConsumers,
1518+
undefined
1519+
),
14561520
unsafe: notInheritable(
14571521
diagnostics,
14581522
topLevelEnv,

packages/workers-utils/src/types.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -165,6 +165,7 @@ type WorkerMetadataPut = {
165165
logpush?: boolean;
166166
placement?: CfPlacement;
167167
tail_consumers?: CfTailConsumer[];
168+
streaming_tail_consumers?: CfTailConsumer[];
168169
limits?: CfUserLimits;
169170

170171
assets?: {

packages/workers-utils/src/worker.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -453,6 +453,7 @@ export interface CfWorkerInit {
453453
logpush: boolean | undefined;
454454
placement: CfPlacement | undefined;
455455
tail_consumers: CfTailConsumer[] | undefined;
456+
streaming_tail_consumers?: CfTailConsumer[] | undefined;
456457
limits: CfUserLimits | undefined;
457458
annotations?: Record<string, string | undefined>;
458459
keep_assets?: boolean | undefined;

0 commit comments

Comments
 (0)