diff --git a/.changeset/brave-memes-raise.md b/.changeset/brave-memes-raise.md
new file mode 100644
index 000000000000..ceee1b9c3ec0
--- /dev/null
+++ b/.changeset/brave-memes-raise.md
@@ -0,0 +1,5 @@
+---
+"wrangler": patch
+---
+
+Changes targetSizeMb->target-size for compaction arg. Small fixes for pipelines commands
diff --git a/packages/wrangler/src/__tests__/pipelines.test.ts b/packages/wrangler/src/__tests__/pipelines.test.ts
index faffcadd20fd..a33912439623 100644
--- a/packages/wrangler/src/__tests__/pipelines.test.ts
+++ b/packages/wrangler/src/__tests__/pipelines.test.ts
@@ -1366,7 +1366,7 @@ describe("wrangler pipelines", () => {
 
 			Batching:
 			  File Size:     none
-			  Time Interval: 30s
+			  Time Interval: 300s
 
 			Format:
 			  Type: json"
@@ -1400,7 +1400,7 @@ describe("wrangler pipelines", () => {
 
 			Batching:
 			  File Size:     none
-			  Time Interval: 30s
+			  Time Interval: 300s
 
 			Format:
 			  Type: json"
@@ -1564,7 +1564,7 @@ describe("wrangler pipelines", () => {
 
 			Batching:
 			  File Size:     none
-			  Time Interval: 30s
+			  Time Interval: 300s
 
 			Format:
 			  Type: json"
diff --git a/packages/wrangler/src/__tests__/r2.test.ts b/packages/wrangler/src/__tests__/r2.test.ts
index 256a04462f27..3c4f62a2d404 100644
--- a/packages/wrangler/src/__tests__/r2.test.ts
+++ b/packages/wrangler/src/__tests__/r2.test.ts
@@ -1301,7 +1301,7 @@ For more details, refer to: https://developers.cloudflare.com/r2/api/s3/tokens/"
 			)
 		);
 		await runWrangler(
-			"r2 bucket catalog compaction enable testBucket --token fakecloudflaretoken --targetSizeMb 512"
+			"r2 bucket catalog compaction enable testBucket --token fakecloudflaretoken --target-size 512"
 		);
 		expect(std.out).toMatchInlineSnapshot(
 			`"✨ Successfully enabled file compaction for the data catalog for bucket 'testBucket'."`
@@ -1332,8 +1332,8 @@ For more details, refer to: https://developers.cloudflare.com/r2/api/s3/tokens/"
 			  -v, --version  Show version number [boolean]
 
 			OPTIONS
-			      --targetSizeMb  The target size for compacted files (allowed values: 64, 128, 256, 512) [number] [default: 128]
-			      --token         A cloudflare api token with access to R2 and R2 Data Catalog which will be used to read/write files for compaction. [string] [required]"
+			      --target-size  The target size for compacted files in MB (allowed values: 64, 128, 256, 512) [number] [default: 128]
+			      --token        A cloudflare api token with access to R2 and R2 Data Catalog which will be used to read/write files for compaction. [string] [required]"
 		`);
 		expect(std.err).toMatchInlineSnapshot(`
 			"X [ERROR] Not enough non-option arguments: got 0, need at least 1
 		`);
 	});
@@ -1345,7 +1345,7 @@ For more details, refer to: https://developers.cloudflare.com/r2/api/s3/tokens/"
 	it("should error if --token is not provided", async () => {
 		await expect(
 			runWrangler(
-				"r2 bucket catalog compaction enable testBucket --targetSizeMb 512"
+				"r2 bucket catalog compaction enable testBucket --target-size 512"
 			)
 		).rejects.toThrowErrorMatchingInlineSnapshot(
 			`[Error: Missing required argument: token]`
diff --git a/packages/wrangler/src/pipelines/cli/setup.ts b/packages/wrangler/src/pipelines/cli/setup.ts
index ce04bf5bb1cd..7e0ff2bbc6f1 100644
--- a/packages/wrangler/src/pipelines/cli/setup.ts
+++ b/packages/wrangler/src/pipelines/cli/setup.ts
@@ -14,6 +14,7 @@ import {
 	deleteStream,
 	validateSql,
 } from "../client";
+import { SINK_DEFAULTS } from "../defaults";
 import { authorizeR2Bucket } from "../index";
 import {
 	displayUsageExamples,
@@ -231,10 +232,8 @@ async function buildField(
 			{ title: "string", value: "string" },
 			{ title: "int32", value: "int32" },
 			{ title: "int64", value: "int64" },
-			{ title: "u_int32", value: "u_int32" },
-			{ title: "u_int64", value: "u_int64" },
-			{ title: "f32", value: "f32" },
-			{ title: "f64", value: "f64" },
+			{ title: "float32", value: "float32" },
+			{ title: "float64", value: "float64" },
 			{ title: "bool", value: "bool" },
 			{ title: "timestamp", value: "timestamp" },
 			{ title: "json", value: "json" },
@@ -419,13 +418,16 @@ async function setupR2Sink(
 		});
 	}
 
-	const fileSizeMB = await prompt("Roll file when size reaches (MB):", {
-		defaultValue: "100",
-	});
+	const fileSizeMB = await prompt(
+		"Roll file when size reaches (MB, minimum 5):",
+		{
+			defaultValue: "100",
+		}
+	);
 
 	const intervalSeconds = await prompt(
-		"Roll file when time reaches (seconds):",
-		{
-			defaultValue: "300",
+		"Roll file when time reaches (seconds, minimum 10):",
+		{
+			defaultValue: String(SINK_DEFAULTS.rolling_policy.interval_seconds),
 		}
 	);
@@ -511,17 +513,20 @@ async function setupDataCatalogSink(setupConfig: SetupConfig): Promise {
 			{ title: "zstd", value: "zstd" },
 			{ title: "lz4", value: "lz4" },
 		],
-		defaultOption: 0,
-		fallbackOption: 0,
+		defaultOption: 3,
+		fallbackOption: 3,
 	});
 
-	const fileSizeMB = await prompt("Roll file when size reaches (MB):", {
-		defaultValue: "100",
-	});
+	const fileSizeMB = await prompt(
+		"Roll file when size reaches (MB, minimum 5):",
+		{
+			defaultValue: "100",
+		}
+	);
 
 	const intervalSeconds = await prompt(
-		"Roll file when time reaches (seconds):",
-		{
-			defaultValue: "300",
+		"Roll file when time reaches (seconds, minimum 10):",
+		{
+			defaultValue: String(SINK_DEFAULTS.rolling_policy.interval_seconds),
 		}
 	);
diff --git a/packages/wrangler/src/pipelines/cli/sinks/create.ts b/packages/wrangler/src/pipelines/cli/sinks/create.ts
index a445cf21a509..352da69b2056 100644
--- a/packages/wrangler/src/pipelines/cli/sinks/create.ts
+++ b/packages/wrangler/src/pipelines/cli/sinks/create.ts
@@ -69,7 +69,6 @@ export const pipelinesSinksCreateCommand = createCommand({
 		"roll-size": {
 			describe: "Roll file size in MB",
 			type: "number",
-			default: SINK_DEFAULTS.rolling_policy.file_size_bytes / (1024 * 1024),
 		},
 		"roll-interval": {
 			describe: "Roll file interval in seconds",
@@ -183,7 +182,7 @@ export const pipelinesSinksCreateCommand = createCommand({
 		}
 
 		if (args.rollSize || args.rollInterval) {
-			let file_size_bytes: number =
+			let file_size_bytes: number | undefined =
 				SINK_DEFAULTS.rolling_policy.file_size_bytes;
 			let interval_seconds: number =
 				SINK_DEFAULTS.rolling_policy.interval_seconds;
@@ -196,7 +195,7 @@ export const pipelinesSinksCreateCommand = createCommand({
 			}
 
 			sinkConfig.config.rolling_policy = {
-				file_size_bytes,
+				...(file_size_bytes !== undefined && { file_size_bytes }),
 				interval_seconds,
 			};
 		}
diff --git a/packages/wrangler/src/pipelines/cli/sinks/utils.ts b/packages/wrangler/src/pipelines/cli/sinks/utils.ts
index 5a2690d7b778..1285511233ca 100644
--- a/packages/wrangler/src/pipelines/cli/sinks/utils.ts
+++ b/packages/wrangler/src/pipelines/cli/sinks/utils.ts
@@ -55,7 +55,7 @@ export function displaySinkConfiguration(
 
 	const batching: Record<string, string> = {
 		"File Size":
-			fileSizeBytes === 0
+			fileSizeBytes === undefined || fileSizeBytes === 0
 				? "none"
 				: `${Math.round(fileSizeBytes / (1024 * 1024))}MB`,
 		"Time Interval": `${intervalSeconds}s`,
diff --git a/packages/wrangler/src/pipelines/cli/streams/utils.ts b/packages/wrangler/src/pipelines/cli/streams/utils.ts
index 966a9c170a69..1e7a0feef9fb 100644
--- a/packages/wrangler/src/pipelines/cli/streams/utils.ts
+++ b/packages/wrangler/src/pipelines/cli/streams/utils.ts
@@ -135,8 +135,8 @@ function generateSampleValue(field: SchemaField): SampleValue {
 			return 42;
 		case "int64":
 			return "9223372036854775807"; // Large numbers as strings to avoid JS precision issues
-		case "f32":
-		case "f64":
+		case "float32":
+		case "float64":
 			return 3.14;
 		case "json":
 			return { example: "json_value" };
diff --git a/packages/wrangler/src/pipelines/defaults.ts b/packages/wrangler/src/pipelines/defaults.ts
index 634e4af5e45f..5584f38a5768 100644
--- a/packages/wrangler/src/pipelines/defaults.ts
+++ b/packages/wrangler/src/pipelines/defaults.ts
@@ -7,8 +7,8 @@ export const SINK_DEFAULTS = {
 		row_group_bytes: 1024 * 1024 * 1024,
 	} as ParquetFormat,
 	rolling_policy: {
-		file_size_bytes: 0,
-		interval_seconds: 30,
+		file_size_bytes: undefined,
+		interval_seconds: 300,
 	},
 	r2: {
 		path: "",
@@ -38,11 +38,18 @@ export function applyDefaultsToSink(sink: Sink): Sink {
 
 	if (!withDefaults.config.rolling_policy) {
 		withDefaults.config.rolling_policy = {
-			file_size_bytes: SINK_DEFAULTS.rolling_policy.file_size_bytes,
 			interval_seconds: SINK_DEFAULTS.rolling_policy.interval_seconds,
 		};
+		// Only add file_size_bytes if it has a value
+		if (SINK_DEFAULTS.rolling_policy.file_size_bytes !== undefined) {
+			withDefaults.config.rolling_policy.file_size_bytes =
+				SINK_DEFAULTS.rolling_policy.file_size_bytes;
+		}
 	} else {
-		if (!withDefaults.config.rolling_policy.file_size_bytes) {
+		if (
+			!withDefaults.config.rolling_policy.file_size_bytes &&
+			SINK_DEFAULTS.rolling_policy.file_size_bytes !== undefined
+		) {
 			withDefaults.config.rolling_policy.file_size_bytes =
 				SINK_DEFAULTS.rolling_policy.file_size_bytes;
 		}
diff --git a/packages/wrangler/src/pipelines/types.ts b/packages/wrangler/src/pipelines/types.ts
index 6148ac8d6f1f..3e8e78af5242 100644
--- a/packages/wrangler/src/pipelines/types.ts
+++ b/packages/wrangler/src/pipelines/types.ts
@@ -137,8 +137,8 @@ export type SchemaField = {
 		| "bool"
 		| "int32"
 		| "int64"
-		| "f32"
-		| "f64"
+		| "float32"
+		| "float64"
 		| "string"
 		| "timestamp"
 		| "json"
@@ -169,7 +169,7 @@ export type Sink = {
 		time_pattern: string;
 	};
 	rolling_policy?: {
-		file_size_bytes: number;
+		file_size_bytes?: number;
 		interval_seconds: number;
 	};
 	// r2_data_catalog specific fields
@@ -207,7 +207,7 @@ export interface CreateSinkRequest {
 		time_pattern: string;
 	};
 	rolling_policy?: {
-		file_size_bytes: number;
+		file_size_bytes?: number;
 		interval_seconds: number;
 	};
 	// R2 credentials (for r2 type)
diff --git a/packages/wrangler/src/r2/catalog.ts b/packages/wrangler/src/r2/catalog.ts
index 0344121e3e8b..8a2810ec081b 100644
--- a/packages/wrangler/src/r2/catalog.ts
+++ b/packages/wrangler/src/r2/catalog.ts
@@ -181,9 +181,9 @@ export const r2BucketCatalogCompactionEnableCommand = createCommand({
 			type: "string",
 			demandOption: true,
 		},
-		targetSizeMb: {
+		"target-size": {
 			describe:
-				"The target size for compacted files (allowed values: 64, 128, 256, 512)",
+				"The target size for compacted files in MB (allowed values: 64, 128, 256, 512)",
 			type: "number",
 			demandOption: false,
 			default: 128,
@@ -209,7 +209,7 @@ export const r2BucketCatalogCompactionEnableCommand = createCommand({
 			config,
 			accountId,
 			args.bucket,
-			args.targetSizeMb
+			args.targetSize
 		);
 
 		logger.log(