diff --git a/controlplane/src/core/blobstorage/s3.ts b/controlplane/src/core/blobstorage/s3.ts index ea7c7c6983..279259db45 100644 --- a/controlplane/src/core/blobstorage/s3.ts +++ b/controlplane/src/core/blobstorage/s3.ts @@ -9,14 +9,49 @@ import { } from '@aws-sdk/client-s3'; import { BlobNotFoundError, BlobObject, type BlobStorage } from './index.js'; +const maxConcurrency = 10; // Maximum number of concurrent operations + +/** + * Configuration options for S3BlobStorage + */ +export interface S3BlobStorageConfig { + /** + * Use individual delete operations instead of bulk delete. + * Set to true for GCS compatibility, false for better S3 performance. + * @default false + */ + useIndividualDeletes?: boolean; +} + /** * Stores objects in S3 given an S3Client and a bucket name */ export class S3BlobStorage implements BlobStorage { + private readonly useIndividualDeletes: boolean; + constructor( private s3Client: S3Client, private bucketName: string, - ) {} + config: S3BlobStorageConfig = {}, + ) { + this.useIndividualDeletes = config.useIndividualDeletes ?? false; + } + + /** + * Execute promises with limited concurrency and delays between batches + * Retries are handled by AWS SDK internally using exponential backoff. Default 3 retries. + */ + private async executeWithConcurrency(tasks: (() => Promise)[], concurrency: number): Promise { + const results: T[] = []; + + for (let i = 0; i < tasks.length; i += concurrency) { + const batch = tasks.slice(i, i + concurrency); + const batchResults = await Promise.all(batch.map((task) => task())); + results.push(...batchResults); + } + + return results; + } async putObject>({ key, @@ -88,28 +123,81 @@ export class S3BlobStorage implements BlobStorage { } } - async removeDirectory(data: { key: string; abortSignal?: AbortSignal }): Promise { - const listCommand = new ListObjectsV2Command({ + /** + * Delete objects using bulk DeleteObjectsCommand (efficient for S3) + */ + private async deleteObjectsBulk(objects: { Key?: string }[], abortSignal?: AbortSignal): Promise { + const objectsToDelete = objects.filter((item) => item.Key).map((item) => ({ Key: item.Key! })); + + if (objectsToDelete.length === 0) { + return 0; + } + + const deleteCommand = new DeleteObjectsCommand({ Bucket: this.bucketName, - Prefix: data.key, + Delete: { + Objects: objectsToDelete, + Quiet: false, + }, }); - const entries = await this.s3Client.send(listCommand, { - abortSignal: data.abortSignal, + + const deleted = await this.s3Client.send(deleteCommand, { abortSignal }); + + if (deleted.Errors && deleted.Errors.length > 0) { + throw new Error(`Could not delete files: ${JSON.stringify(deleted.Errors)}`); + } + + return deleted.Deleted?.length ?? 0; + } + + /** + * Delete objects individually with limited concurrency (for GCS compatibility) + */ + private async deleteObjectsIndividually(objects: { Key?: string }[], abortSignal?: AbortSignal): Promise { + const deleteTasks = objects.map((item) => async () => { + if (item.Key) { + const deleteCommand = new DeleteObjectCommand({ + Bucket: this.bucketName, + Key: item.Key, + }); + await this.s3Client.send(deleteCommand, { abortSignal }); + return 1; + } + return 0; }); - const objectsToDelete = entries.Contents?.map((item) => ({ Key: item.Key })); - if (objectsToDelete && objectsToDelete.length > 0) { - const deleteCommand = new DeleteObjectsCommand({ + + const deletedCounts = await this.executeWithConcurrency(deleteTasks, maxConcurrency); + return deletedCounts.reduce((sum: number, count: number) => sum + count, 0); + } + + async removeDirectory(data: { key: string; abortSignal?: AbortSignal }): Promise { + let totalDeleted = 0; + let continuationToken: string | undefined; + + do { + const listCommand = new ListObjectsV2Command({ Bucket: this.bucketName, - Delete: { - Objects: objectsToDelete, - Quiet: false, - }, + Prefix: data.key, + ContinuationToken: continuationToken, + }); + + const entries = await this.s3Client.send(listCommand, { + abortSignal: data.abortSignal, }); - const deleted = await this.s3Client.send(deleteCommand); - if (deleted.Errors) { - throw new Error(`could not delete files: ${deleted.Errors}`); + + if (entries.Contents && entries.Contents.length > 0) { + if (this.useIndividualDeletes) { + // Use individual deletes for S3 implementation without DeleteObjectsCommand + totalDeleted += await this.deleteObjectsIndividually(entries.Contents, data.abortSignal); + } else { + // Use bulk delete for better S3 performance + totalDeleted += await this.deleteObjectsBulk(entries.Contents, data.abortSignal); + } } - } - return objectsToDelete?.length ?? 0; + + continuationToken = entries.IsTruncated ? entries.NextContinuationToken : undefined; + } while (continuationToken); + + return totalDeleted; } } diff --git a/controlplane/src/core/build-server.ts b/controlplane/src/core/build-server.ts index 115dc5eb7c..0820536ae5 100644 --- a/controlplane/src/core/build-server.ts +++ b/controlplane/src/core/build-server.ts @@ -37,7 +37,7 @@ import { BillingRepository } from './repositories/BillingRepository.js'; import { BillingService } from './services/BillingService.js'; import { UserRepository } from './repositories/UserRepository.js'; import { AIGraphReadmeQueue, createAIGraphReadmeWorker } from './workers/AIGraphReadmeWorker.js'; -import { fastifyLoggerId, createS3ClientConfig, extractS3BucketName } from './util.js'; +import { fastifyLoggerId, createS3ClientConfig, extractS3BucketName, isGoogleCloudStorageUrl } from './util.js'; import { ApiKeyRepository } from './repositories/ApiKeyRepository.js'; import { createDeleteOrganizationWorker, DeleteOrganizationQueue } from './workers/DeleteOrganizationWorker.js'; import { @@ -106,6 +106,7 @@ export interface BuildConfig { username?: string; password?: string; forcePathStyle?: boolean; + useIndividualDeletes?: boolean; }; mailer: { smtpEnabled: boolean; @@ -310,7 +311,13 @@ export default async function build(opts: BuildConfig) { const s3Config = createS3ClientConfig(bucketName, opts.s3Storage); const s3Client = new S3Client(s3Config); - const blobStorage = new S3BlobStorage(s3Client, bucketName); + const blobStorage = new S3BlobStorage(s3Client, bucketName, { + // GCS does not support DeleteObjects; force individual deletes when detected. + useIndividualDeletes: + isGoogleCloudStorageUrl(opts.s3Storage.url) || isGoogleCloudStorageUrl(s3Config.endpoint as string) + ? true + : opts.s3Storage.useIndividualDeletes ?? false, + }); const platformWebhooks = new PlatformWebhookService(opts.webhook?.url, opts.webhook?.key, logger); diff --git a/controlplane/src/core/env.schema.ts b/controlplane/src/core/env.schema.ts index f44ba0f719..a70609ab32 100644 --- a/controlplane/src/core/env.schema.ts +++ b/controlplane/src/core/env.schema.ts @@ -142,6 +142,13 @@ export const envVariables = z .string() .transform((val) => val === 'true') .default('true'), + /** + * Whether to use individual deletes for S3 objects instead of bulking them. + */ + S3_USE_INDIVIDUAL_DELETES: z + .string() + .transform((val) => val === 'true') + .optional(), /** * Email */ diff --git a/controlplane/src/core/util.ts b/controlplane/src/core/util.ts index 3ab6ee8382..9b6c17a3fd 100644 --- a/controlplane/src/core/util.ts +++ b/controlplane/src/core/util.ts @@ -404,6 +404,29 @@ export function webhookAxiosRetryCond(err: AxiosError) { return isNetworkError(err) || isRetryableError(err); } +/** + * Determines whether the given string is a Google Cloud Storage address by checking whether the hostname is + * `storage.googleapis.com` or the protocol is `gs:`. + */ +export function isGoogleCloudStorageUrl(s: string): boolean { + if (!s) { + return false; + } + + try { + const url = new URL(s); + const hostname = url.hostname.toLowerCase(); + + return ( + url.protocol === 'gs:' || hostname === 'storage.googleapis.com' || hostname.endsWith('.storage.googleapis.com') + ); + } catch { + // ignore + } + + return false; +} + export function createS3ClientConfig(bucketName: string, opts: S3StorageOptions): S3ClientConfig { const url = new URL(opts.url); const { region, username, password } = opts; diff --git a/controlplane/src/index.ts b/controlplane/src/index.ts index ef3f008977..1387868cdc 100644 --- a/controlplane/src/index.ts +++ b/controlplane/src/index.ts @@ -46,6 +46,7 @@ const { S3_ACCESS_KEY_ID, S3_SECRET_ACCESS_KEY, S3_FORCE_PATH_STYLE, + S3_USE_INDIVIDUAL_DELETES, SMTP_ENABLED, SMTP_HOST, SMTP_PORT, @@ -128,6 +129,7 @@ const options: BuildConfig = { username: S3_ACCESS_KEY_ID, password: S3_SECRET_ACCESS_KEY, forcePathStyle: S3_FORCE_PATH_STYLE, + useIndividualDeletes: S3_USE_INDIVIDUAL_DELETES, }, mailer: { smtpEnabled: SMTP_ENABLED, diff --git a/controlplane/test/utils.test.ts b/controlplane/test/utils.test.ts index 85cddf31f1..aa6f5a6a19 100644 --- a/controlplane/test/utils.test.ts +++ b/controlplane/test/utils.test.ts @@ -1,5 +1,5 @@ import { describe, expect, test } from 'vitest'; -import { isValidLabelMatchers, mergeUrls, normalizeLabelMatchers } from '../src/core/util.js'; +import { isValidLabelMatchers, mergeUrls, normalizeLabelMatchers, isGoogleCloudStorageUrl } from '../src/core/util.js'; describe('Utils', () => { test('isValidLabelMatchers', () => { @@ -29,4 +29,24 @@ describe('Utils', () => { expect(mergeUrls('http://example.com/auth', '/path')).toBe('http://example.com/auth/path'); expect(mergeUrls('http://example.com/auth/', '/path')).toBe('http://example.com/auth/path'); }); + + describe('isGoogleCloudStorageUrl', () => { + test('that true is returned when a valid Google Cloud Storage URL', () => { + expect(isGoogleCloudStorageUrl('https://storage.googleapis.com/')).toBe(true); + expect(isGoogleCloudStorageUrl('https://STORAGE.GOOGLEAPIS.COM')).toBe(true); + expect(isGoogleCloudStorageUrl('https://storage.googleapis.com/bucket-name')).toBe(true); + expect(isGoogleCloudStorageUrl('https://bucket-name.storage.googleapis.com/')).toBe(true); + }); + + test('that true is returned when an URL with the `gs` protocol', () => { + expect(isGoogleCloudStorageUrl('gs://bucket-name')).toBe(true); + }); + + test('that false is returned when the URL is not a valid Google Cloud Storage URL', () => { + expect(isGoogleCloudStorageUrl('http://minio/cosmo')).toBe(false); + expect(isGoogleCloudStorageUrl('https://bucket-name.s3.amazonaws.com/')).toBe(false); + expect(isGoogleCloudStorageUrl('https://bucket-name.s3.amazonaws.com')).toBe(false); + expect(isGoogleCloudStorageUrl('https://storage.googleapis.com.evil.com')).toBe(false); + }); + }); });