Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions controlplane/.env.example
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ S3_ENDPOINT=""
S3_ACCESS_KEY_ID=
S3_SECRET_ACCESS_KEY=
S3_FORCE_PATH_STYLE="true"
S3_USE_INDIVIDUAL_DELETES="false"

# Optional for Stripe Integration
DEFAULT_PLAN="developer@1"
Expand Down
136 changes: 118 additions & 18 deletions controlplane/src/core/blobstorage/s3.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,61 @@ import {
} from '@aws-sdk/client-s3';
import { BlobNotFoundError, BlobObject, type BlobStorage } from './index.js';

const maxConcurrency = 10; // Maximum number of concurrent operations (conservative for GCS compatibility)
const batchDelayMs = 200; // Delay between batches in milliseconds (balance between throughput and rate limiting)

/**
* Configuration options for S3BlobStorage
*/
export interface S3BlobStorageConfig {
/**
* Use individual delete operations instead of bulk delete.
* Set to true for GCS compatibility, false for better S3 performance.
* @default false
*/
useIndividualDeletes?: boolean;
}

/**
* Stores objects in S3 given an S3Client and a bucket name
*/
export class S3BlobStorage implements BlobStorage {
private readonly useIndividualDeletes: boolean;

constructor(
private s3Client: S3Client,
private bucketName: string,
) {}
config: S3BlobStorageConfig = {},
) {
this.useIndividualDeletes = config.useIndividualDeletes ?? false;
}

/**
* Sleep for the specified number of milliseconds
*/
private sleep(ms: number): Promise<void> {
return new Promise((resolve) => setTimeout(resolve, ms));
}

/**
* Execute promises with limited concurrency and delays between batches
*/
private async executeWithConcurrency<T>(tasks: (() => Promise<T>)[], concurrency: number): Promise<T[]> {
const results: T[] = [];

for (let i = 0; i < tasks.length; i += concurrency) {
const batch = tasks.slice(i, i + concurrency);
const batchResults = await Promise.all(batch.map((task) => task()));
results.push(...batchResults);

// Add delay between batches (except for the last batch)
if (i + concurrency < tasks.length) {
await this.sleep(batchDelayMs);
}
}

return results;
}

async putObject<Metadata extends Record<string, string>>({
key,
Expand Down Expand Up @@ -88,28 +135,81 @@ export class S3BlobStorage implements BlobStorage {
}
}

async removeDirectory(data: { key: string; abortSignal?: AbortSignal }): Promise<number> {
const listCommand = new ListObjectsV2Command({
/**
* Delete objects using bulk DeleteObjectsCommand (efficient for S3)
*/
private async deleteObjectsBulk(objects: { Key?: string }[], abortSignal?: AbortSignal): Promise<number> {
const objectsToDelete = objects.filter((item) => item.Key).map((item) => ({ Key: item.Key! }));

if (objectsToDelete.length === 0) {
return 0;
}

const deleteCommand = new DeleteObjectsCommand({
Bucket: this.bucketName,
Prefix: data.key,
Delete: {
Objects: objectsToDelete,
Quiet: false,
},
});
const entries = await this.s3Client.send(listCommand, {
abortSignal: data.abortSignal,

const deleted = await this.s3Client.send(deleteCommand, { abortSignal });

if (deleted.Errors && deleted.Errors.length > 0) {
throw new Error(`Could not delete files: ${JSON.stringify(deleted.Errors)}`);
}

return objectsToDelete.length;
}

Comment thread
StarpTech marked this conversation as resolved.
/**
* Delete objects individually with limited concurrency (for GCS compatibility)
*/
private async deleteObjectsIndividually(objects: { Key?: string }[], abortSignal?: AbortSignal): Promise<number> {
const deleteTasks = objects.map((item) => async () => {
if (item.Key) {
const deleteCommand = new DeleteObjectCommand({
Bucket: this.bucketName,
Key: item.Key,
});
await this.s3Client.send(deleteCommand, { abortSignal });
return 1;
}
return 0;
});
const objectsToDelete = entries.Contents?.map((item) => ({ Key: item.Key }));
if (objectsToDelete && objectsToDelete.length > 0) {
const deleteCommand = new DeleteObjectsCommand({

const deletedCounts = await this.executeWithConcurrency(deleteTasks, maxConcurrency);
return deletedCounts.reduce((sum: number, count: number) => sum + count, 0);
}

async removeDirectory(data: { key: string; abortSignal?: AbortSignal }): Promise<number> {
let totalDeleted = 0;
let continuationToken: string | undefined;

do {
const listCommand = new ListObjectsV2Command({
Bucket: this.bucketName,
Delete: {
Objects: objectsToDelete,
Quiet: false,
},
Prefix: data.key,
ContinuationToken: continuationToken,
});

const entries = await this.s3Client.send(listCommand, {
abortSignal: data.abortSignal,
});
const deleted = await this.s3Client.send(deleteCommand);
if (deleted.Errors) {
throw new Error(`could not delete files: ${deleted.Errors}`);

if (entries.Contents && entries.Contents.length > 0) {
if (this.useIndividualDeletes) {
// Use individual deletes for GCS compatibility
totalDeleted += await this.deleteObjectsIndividually(entries.Contents, data.abortSignal);
} else {
// Use bulk delete for better S3 performance
totalDeleted += await this.deleteObjectsBulk(entries.Contents, data.abortSignal);
}
}
}
return objectsToDelete?.length ?? 0;

continuationToken = entries.IsTruncated ? entries.NextContinuationToken : undefined;
} while (continuationToken);

return totalDeleted;
Comment thread
StarpTech marked this conversation as resolved.
}
}
9 changes: 8 additions & 1 deletion controlplane/src/core/build-server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@
username?: string;
password?: string;
forcePathStyle?: boolean;
useIndividualDeletes?: boolean;
};
mailer: {
smtpEnabled: boolean;
Expand Down Expand Up @@ -310,7 +311,13 @@
const s3Config = createS3ClientConfig(bucketName, opts.s3Storage);

const s3Client = new S3Client(s3Config);
const blobStorage = new S3BlobStorage(s3Client, bucketName);
const blobStorage = new S3BlobStorage(s3Client, bucketName, {
// When using GCS, we overwrite the configured behavior to always use individual deletes as GCS doesn't
// support bulk object deletion as of August 11th, 2025
useIndividualDeletes: opts.s3Storage.url.toLowerCase().includes('storage.googleapis.com')
Comment thread Fixed
? true
: opts.s3Storage.useIndividualDeletes ?? false,
});
Comment thread
StarpTech marked this conversation as resolved.

const platformWebhooks = new PlatformWebhookService(opts.webhook?.url, opts.webhook?.key, logger);

Expand Down
9 changes: 9 additions & 0 deletions controlplane/src/core/env.schema.ts
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,15 @@ export const envVariables = z
.string()
.transform((val) => val === 'true')
.default('true'),
/**
* Whether to use individual deletes for S3 objects instead of bulking them.
*
* This value is overwritten when using GCS to always be `true` as GCS does not support bulk object deletes.
*/
S3_USE_INDIVIDUAL_DELETES: z
.string()
.transform((val) => val === 'true')
.optional(),
Comment thread
wilsonrivera marked this conversation as resolved.
/**
* Email
*/
Expand Down
2 changes: 2 additions & 0 deletions controlplane/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ const {
S3_ACCESS_KEY_ID,
S3_SECRET_ACCESS_KEY,
S3_FORCE_PATH_STYLE,
S3_USE_INDIVIDUAL_DELETES,
SMTP_ENABLED,
SMTP_HOST,
SMTP_PORT,
Expand Down Expand Up @@ -128,6 +129,7 @@ const options: BuildConfig = {
username: S3_ACCESS_KEY_ID,
password: S3_SECRET_ACCESS_KEY,
forcePathStyle: S3_FORCE_PATH_STYLE,
useIndividualDeletes: S3_USE_INDIVIDUAL_DELETES,
},
mailer: {
smtpEnabled: SMTP_ENABLED,
Expand Down