Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
136 changes: 118 additions & 18 deletions controlplane/src/core/blobstorage/s3.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,61 @@ import {
} from '@aws-sdk/client-s3';
import { BlobNotFoundError, BlobObject, type BlobStorage } from './index.js';

const maxConcurrency = 10; // Maximum number of concurrent operations (conservative for GCS compatibility)
const batchDelayMs = 200; // Delay between batches in milliseconds (balance between throughput and rate limiting)

/**
* Configuration options for S3BlobStorage
*/
export interface S3BlobStorageConfig {
/**
* Use individual delete operations instead of bulk delete.
* Set to true for GCS compatibility, false for better S3 performance.
* @default false
*/
useIndividualDeletes?: boolean;
}

/**
* Stores objects in S3 given an S3Client and a bucket name
*/
export class S3BlobStorage implements BlobStorage {
private readonly useIndividualDeletes: boolean;

constructor(
private s3Client: S3Client,
private bucketName: string,
) {}
config: S3BlobStorageConfig = {},
) {
this.useIndividualDeletes = config.useIndividualDeletes ?? false;
}

/**
* Sleep for the specified number of milliseconds
*/
private sleep(ms: number): Promise<void> {
return new Promise((resolve) => setTimeout(resolve, ms));
}

/**
* Execute promises with limited concurrency and delays between batches
*/
private async executeWithConcurrency<T>(tasks: (() => Promise<T>)[], concurrency: number): Promise<T[]> {
const results: T[] = [];

for (let i = 0; i < tasks.length; i += concurrency) {
const batch = tasks.slice(i, i + concurrency);
const batchResults = await Promise.all(batch.map((task) => task()));
results.push(...batchResults);

// Add delay between batches (except for the last batch)
if (i + concurrency < tasks.length) {
await this.sleep(batchDelayMs);
}
}

return results;
}

async putObject<Metadata extends Record<string, string>>({
key,
Expand Down Expand Up @@ -88,28 +135,81 @@ export class S3BlobStorage implements BlobStorage {
}
}

async removeDirectory(data: { key: string; abortSignal?: AbortSignal }): Promise<number> {
const listCommand = new ListObjectsV2Command({
/**
* Delete objects using bulk DeleteObjectsCommand (efficient for S3)
*/
private async deleteObjectsBulk(objects: { Key?: string }[], abortSignal?: AbortSignal): Promise<number> {
const objectsToDelete = objects.filter((item) => item.Key).map((item) => ({ Key: item.Key! }));

if (objectsToDelete.length === 0) {
return 0;
}

const deleteCommand = new DeleteObjectsCommand({
Bucket: this.bucketName,
Prefix: data.key,
Delete: {
Objects: objectsToDelete,
Quiet: false,
},
});
const entries = await this.s3Client.send(listCommand, {
abortSignal: data.abortSignal,

const deleted = await this.s3Client.send(deleteCommand, { abortSignal });

if (deleted.Errors && deleted.Errors.length > 0) {
throw new Error(`Could not delete files: ${JSON.stringify(deleted.Errors)}`);
}

return objectsToDelete.length;
}

Comment thread
StarpTech marked this conversation as resolved.
/**
* Delete objects individually with limited concurrency (for GCS compatibility)
*/
private async deleteObjectsIndividually(objects: { Key?: string }[], abortSignal?: AbortSignal): Promise<number> {
const deleteTasks = objects.map((item) => async () => {
if (item.Key) {
const deleteCommand = new DeleteObjectCommand({
Bucket: this.bucketName,
Key: item.Key,
});
await this.s3Client.send(deleteCommand, { abortSignal });
return 1;
}
return 0;
});
const objectsToDelete = entries.Contents?.map((item) => ({ Key: item.Key }));
if (objectsToDelete && objectsToDelete.length > 0) {
const deleteCommand = new DeleteObjectsCommand({

const deletedCounts = await this.executeWithConcurrency(deleteTasks, maxConcurrency);
return deletedCounts.reduce((sum: number, count: number) => sum + count, 0);
}

async removeDirectory(data: { key: string; abortSignal?: AbortSignal }): Promise<number> {
let totalDeleted = 0;
let continuationToken: string | undefined;

do {
const listCommand = new ListObjectsV2Command({
Bucket: this.bucketName,
Delete: {
Objects: objectsToDelete,
Quiet: false,
},
Prefix: data.key,
ContinuationToken: continuationToken,
});

const entries = await this.s3Client.send(listCommand, {
abortSignal: data.abortSignal,
});
const deleted = await this.s3Client.send(deleteCommand);
if (deleted.Errors) {
throw new Error(`could not delete files: ${deleted.Errors}`);

if (entries.Contents && entries.Contents.length > 0) {
if (this.useIndividualDeletes) {
// Use individual deletes for GCS compatibility
totalDeleted += await this.deleteObjectsIndividually(entries.Contents, data.abortSignal);
} else {
// Use bulk delete for better S3 performance
totalDeleted += await this.deleteObjectsBulk(entries.Contents, data.abortSignal);
}
}
}
return objectsToDelete?.length ?? 0;

continuationToken = entries.IsTruncated ? entries.NextContinuationToken : undefined;
} while (continuationToken);

return totalDeleted;
Comment thread
StarpTech marked this conversation as resolved.
}
}
12 changes: 10 additions & 2 deletions controlplane/src/core/build-server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ import { BillingRepository } from './repositories/BillingRepository.js';
import { BillingService } from './services/BillingService.js';
import { UserRepository } from './repositories/UserRepository.js';
import { AIGraphReadmeQueue, createAIGraphReadmeWorker } from './workers/AIGraphReadmeWorker.js';
import { fastifyLoggerId, createS3ClientConfig, extractS3BucketName } from './util.js';
import { fastifyLoggerId, createS3ClientConfig, extractS3BucketName, isGoogleCloudStorageUrl } from './util.js';
import { ApiKeyRepository } from './repositories/ApiKeyRepository.js';
import { createDeleteOrganizationWorker, DeleteOrganizationQueue } from './workers/DeleteOrganizationWorker.js';
import {
Expand Down Expand Up @@ -106,6 +106,7 @@ export interface BuildConfig {
username?: string;
password?: string;
forcePathStyle?: boolean;
useIndividualDeletes?: boolean;
};
mailer: {
smtpEnabled: boolean;
Expand Down Expand Up @@ -310,7 +311,14 @@ export default async function build(opts: BuildConfig) {
const s3Config = createS3ClientConfig(bucketName, opts.s3Storage);

const s3Client = new S3Client(s3Config);
const blobStorage = new S3BlobStorage(s3Client, bucketName);
const blobStorage = new S3BlobStorage(s3Client, bucketName, {
// If the configuration option is not set, we try to detect whether the provided endpoint is a
// Google Cloud Storage endpoint, this is because GCS doesn't support the `deleteObjects` request as of
// August 12th, 2025
useIndividualDeletes:
opts.s3Storage.useIndividualDeletes ??
(isGoogleCloudStorageUrl(opts.s3Storage.url) || isGoogleCloudStorageUrl(s3Config.endpoint as string)),
});
Comment thread
StarpTech marked this conversation as resolved.

const platformWebhooks = new PlatformWebhookService(opts.webhook?.url, opts.webhook?.key, logger);

Expand Down
7 changes: 7 additions & 0 deletions controlplane/src/core/env.schema.ts
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,13 @@ export const envVariables = z
.string()
.transform((val) => val === 'true')
.default('true'),
/**
* Whether to use individual deletes for S3 objects instead of bulking them.
*/
S3_USE_INDIVIDUAL_DELETES: z
.string()
.transform((val) => val === 'true')
.optional(),
Comment thread
wilsonrivera marked this conversation as resolved.
/**
* Email
*/
Expand Down
23 changes: 23 additions & 0 deletions controlplane/src/core/util.ts
Original file line number Diff line number Diff line change
Expand Up @@ -398,6 +398,29 @@ export function webhookAxiosRetryCond(err: AxiosError) {
return isNetworkError(err) || isRetryableError(err);
}

/**
* Determines whether the given string is a Google Cloud Storage address by checking whether the hostname is
* `storage.googleapis.com` or the protocol is `gs:`.
*/
export function isGoogleCloudStorageUrl(s: string): boolean {
Comment thread
StarpTech marked this conversation as resolved.
if (!s) {
return false;
}

try {
const url = new URL(s);
const hostname = url.hostname.toLowerCase();

return (
url.protocol === 'gs:' || hostname === 'storage.googleapis.com' || hostname.endsWith('.storage.googleapis.com')
);
} catch {
// ignore
}

return false;
}
Comment thread
coderabbitai[bot] marked this conversation as resolved.

export function createS3ClientConfig(bucketName: string, opts: S3StorageOptions): S3ClientConfig {
const url = new URL(opts.url);
const { region, username, password } = opts;
Expand Down
2 changes: 2 additions & 0 deletions controlplane/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ const {
S3_ACCESS_KEY_ID,
S3_SECRET_ACCESS_KEY,
S3_FORCE_PATH_STYLE,
S3_USE_INDIVIDUAL_DELETES,
SMTP_ENABLED,
SMTP_HOST,
SMTP_PORT,
Expand Down Expand Up @@ -128,6 +129,7 @@ const options: BuildConfig = {
username: S3_ACCESS_KEY_ID,
password: S3_SECRET_ACCESS_KEY,
forcePathStyle: S3_FORCE_PATH_STYLE,
useIndividualDeletes: S3_USE_INDIVIDUAL_DELETES,
},
mailer: {
smtpEnabled: SMTP_ENABLED,
Expand Down