From 88a7b81b9842c3525859906f32b4f40c63327f4e Mon Sep 17 00:00:00 2001 From: Alex Gherghisan Date: Thu, 12 Feb 2026 11:23:41 +0000 Subject: [PATCH] chore: set up tx file store in next-net --- spartan/environments/next-net.env | 5 ++- spartan/scripts/deploy_network.sh | 13 +++++++ spartan/scripts/setup_gcp_secrets.sh | 8 ++++ spartan/terraform/deploy-aztec-infra/main.tf | 7 ++++ .../terraform/deploy-aztec-infra/variables.tf | 19 +++++++++ yarn-project/p2p/src/client/factory.ts | 12 +++++- .../tx_collection/file_store_tx_source.ts | 9 +++-- .../tx_file_store/tx_file_store.test.ts | 39 +++++++++++-------- .../services/tx_file_store/tx_file_store.ts | 8 ++-- 9 files changed, 95 insertions(+), 25 deletions(-) diff --git a/spartan/environments/next-net.env b/spartan/environments/next-net.env index 96e43e756467..33708eed55db 100644 --- a/spartan/environments/next-net.env +++ b/spartan/environments/next-net.env @@ -18,6 +18,9 @@ ETHERSCAN_API_KEY=REPLACE_WITH_GCP_SECRET DEPLOY_INTERNAL_BOOTNODE=true STORE_SNAPSHOT_URL= BLOB_BUCKET_DIRECTORY=${BLOB_BUCKET_DIRECTORY:-next-net/blobs} +TX_FILE_STORE_ENABLED=true +TX_FILE_STORE_BUCKET_DIRECTORY=${TX_FILE_STORE_BUCKET_DIRECTORY:-next-net/txs} +TX_COLLECTION_FILE_STORE_URLS="https://aztec-labs-snapshots.com/${TX_FILE_STORE_BUCKET_DIRECTORY}" R2_ACCESS_KEY_ID=REPLACE_WITH_GCP_SECRET R2_SECRET_ACCESS_KEY=REPLACE_WITH_GCP_SECRET PROVER_FAILED_PROOF_STORE=gs://aztec-develop/next-net/failed-proofs @@ -56,4 +59,4 @@ RPC_INGRESS_STATIC_IP_NAME=nextnet-rpc-ip RPC_INGRESS_SSL_CERT_NAMES='["nextnet-rpc-cert"]' VALIDATOR_HA_REPLICAS=1 -VALIDATOR_RESOURCE_PROFILE="prod-spot" \ No newline at end of file +VALIDATOR_RESOURCE_PROFILE="prod-spot" diff --git a/spartan/scripts/deploy_network.sh b/spartan/scripts/deploy_network.sh index d9ef3fd2e13b..9f38cdd4dbc9 100755 --- a/spartan/scripts/deploy_network.sh +++ b/spartan/scripts/deploy_network.sh @@ -162,6 +162,16 @@ else BLOB_FILE_STORE_UPLOAD_URL_TF="null" fi +# TX filestore configuration +TX_FILE_STORE_ENABLED=${TX_FILE_STORE_ENABLED:-false} +TX_FILE_STORE_URL_TF="" +if [[ -n "${TX_FILE_STORE_URL:-}" ]]; then + TX_FILE_STORE_URL_TF="\"$TX_FILE_STORE_URL\"" +else + TX_FILE_STORE_URL_TF="null" +fi +TX_COLLECTION_FILE_STORE_URLS=${TX_COLLECTION_FILE_STORE_URLS:-} + P2P_GOSSIPSUB_D=${P2P_GOSSIPSUB_D:-6} P2P_GOSSIPSUB_DLO=${P2P_GOSSIPSUB_DLO:-4} P2P_GOSSIPSUB_DHI=${P2P_GOSSIPSUB_DHI:-12} @@ -546,6 +556,9 @@ PROVER_L1_PRIORITY_FEE_BUMP_PERCENTAGE = ${PROVER_L1_PRIORITY_FEE_BUMP_PERCENTAG PROVER_L1_PRIORITY_FEE_RETRY_BUMP_PERCENTAGE = ${PROVER_L1_PRIORITY_FEE_RETRY_BUMP_PERCENTAGE:-null} BLOB_ALLOW_EMPTY_SOURCES = ${BLOB_ALLOW_EMPTY_SOURCES:-false} BLOB_FILE_STORE_UPLOAD_URL = ${BLOB_FILE_STORE_UPLOAD_URL_TF} +TX_FILE_STORE_ENABLED = ${TX_FILE_STORE_ENABLED} +TX_FILE_STORE_URL = ${TX_FILE_STORE_URL_TF} +TX_COLLECTION_FILE_STORE_URLS = "${TX_COLLECTION_FILE_STORE_URLS}" DEBUG_P2P_INSTRUMENT_MESSAGES = ${DEBUG_P2P_INSTRUMENT_MESSAGES:-false} PROVER_AGENT_INCLUDE_METRICS = "${PROVER_AGENT_INCLUDE_METRICS-null}" diff --git a/spartan/scripts/setup_gcp_secrets.sh b/spartan/scripts/setup_gcp_secrets.sh index fb4073388386..2bde3c4e4b15 100755 --- a/spartan/scripts/setup_gcp_secrets.sh +++ b/spartan/scripts/setup_gcp_secrets.sh @@ -157,4 +157,12 @@ if [[ -n "${BLOB_BUCKET_DIRECTORY:-}" ]]; then export BLOB_FILE_STORE_UPLOAD_URL="s3://testnet-bucket/${BLOB_BUCKET_DIRECTORY}/?endpoint=https://${r2_account_id}.r2.cloudflarestorage.com" fi +# Construct TX_FILE_STORE_URL from the r2-account-id secret and TX_FILE_STORE_BUCKET_DIRECTORY +if [[ -n "${TX_FILE_STORE_BUCKET_DIRECTORY:-}" ]]; then + secret_file=$(get_secret "r2-account-id") + mask_secret_value "TX_FILE_STORE_URL" "$secret_file" + r2_account_id=$(cat "$secret_file") + export TX_FILE_STORE_URL="s3://testnet-bucket/${TX_FILE_STORE_BUCKET_DIRECTORY}/?endpoint=https://${r2_account_id}.r2.cloudflarestorage.com" +fi + echo "Successfully set up GCP secrets for $NETWORK" diff --git a/spartan/terraform/deploy-aztec-infra/main.tf b/spartan/terraform/deploy-aztec-infra/main.tf index 8390addb91e4..2e36227b6d2f 100644 --- a/spartan/terraform/deploy-aztec-infra/main.tf +++ b/spartan/terraform/deploy-aztec-infra/main.tf @@ -219,6 +219,7 @@ locals { "validator.node.env.P2P_DROP_TX" = var.P2P_DROP_TX "validator.node.env.P2P_DROP_TX_CHANCE" = var.P2P_DROP_TX_CHANCE "validator.node.env.WS_NUM_HISTORIC_BLOCKS" = var.WS_NUM_HISTORIC_BLOCKS + "validator.node.env.TX_COLLECTION_FILE_STORE_URLS" = var.TX_COLLECTION_FILE_STORE_URLS } # Note: nonsensitive() is required here because helm_releases is used in for_each, @@ -357,6 +358,7 @@ locals { "node.node.env.P2P_DROP_TX" = var.P2P_DROP_TX "node.node.env.P2P_DROP_TX_CHANCE" = var.P2P_DROP_TX_CHANCE "node.node.env.WS_NUM_HISTORIC_BLOCKS" = var.WS_NUM_HISTORIC_BLOCKS + "node.node.env.TX_COLLECTION_FILE_STORE_URLS" = var.TX_COLLECTION_FILE_STORE_URLS "node.service.p2p.nodePortEnabled" = var.P2P_NODEPORT_ENABLED "node.service.p2p.announcePort" = local.p2p_port_prover "node.service.p2p.port" = local.p2p_port_prover @@ -436,6 +438,9 @@ locals { "node.env.P2P_DROP_TX" = var.P2P_DROP_TX "node.env.P2P_DROP_TX_CHANCE" = var.P2P_DROP_TX_CHANCE "node.env.WS_NUM_HISTORIC_BLOCKS" = var.WS_NUM_HISTORIC_BLOCKS + "node.env.TX_FILE_STORE_ENABLED" = var.TX_FILE_STORE_ENABLED + "node.env.TX_FILE_STORE_URL" = var.TX_FILE_STORE_URL + "node.env.TX_COLLECTION_FILE_STORE_URLS" = var.TX_COLLECTION_FILE_STORE_URLS }, # Only set RPC mnemonic config in fisherman mode) var.FISHERMAN_MODE ? { @@ -490,6 +495,7 @@ locals { "node.env.P2P_DROP_TX" = var.P2P_DROP_TX "node.env.P2P_DROP_TX_CHANCE" = var.P2P_DROP_TX_CHANCE "node.env.WS_NUM_HISTORIC_BLOCKS" = var.WS_NUM_HISTORIC_BLOCKS + "node.env.TX_COLLECTION_FILE_STORE_URLS" = var.TX_COLLECTION_FILE_STORE_URLS } boot_node_host_path = "node.env.BOOT_NODE_HOST" bootstrap_nodes_path = "node.env.BOOTSTRAP_NODES" @@ -529,6 +535,7 @@ locals { "node.env.P2P_DROP_TX" = var.P2P_DROP_TX "node.env.P2P_DROP_TX_CHANCE" = var.P2P_DROP_TX_CHANCE "node.env.WS_NUM_HISTORIC_BLOCKS" = var.WS_NUM_HISTORIC_BLOCKS + "node.env.TX_COLLECTION_FILE_STORE_URLS" = var.TX_COLLECTION_FILE_STORE_URLS } boot_node_host_path = "node.env.BOOT_NODE_HOST" bootstrap_nodes_path = "node.env.BOOTSTRAP_NODES" diff --git a/spartan/terraform/deploy-aztec-infra/variables.tf b/spartan/terraform/deploy-aztec-infra/variables.tf index b8821306bde2..1f38933f523b 100644 --- a/spartan/terraform/deploy-aztec-infra/variables.tf +++ b/spartan/terraform/deploy-aztec-infra/variables.tf @@ -656,6 +656,25 @@ variable "BLOB_FILE_STORE_UPLOAD_URL" { default = null } +variable "TX_FILE_STORE_ENABLED" { + description = "Whether to enable uploading transactions to file storage" + type = bool + default = false +} + +variable "TX_FILE_STORE_URL" { + description = "URL for uploading transactions (e.g., s3://bucket/path/, gs://bucket/path/)" + type = string + nullable = true + default = null +} + +variable "TX_COLLECTION_FILE_STORE_URLS" { + description = "Comma-separated URLs for reading transactions from file storage" + type = string + default = "" +} + variable "PROVER_AGENT_POLL_INTERVAL_MS" { description = "Interval in milliseconds between prover agent polls" type = number diff --git a/yarn-project/p2p/src/client/factory.ts b/yarn-project/p2p/src/client/factory.ts index 30299c55aada..1a4a813c5fa5 100644 --- a/yarn-project/p2p/src/client/factory.ts +++ b/yarn-project/p2p/src/client/factory.ts @@ -76,6 +76,9 @@ export async function createP2PClient( const attestationStore = await createStore(P2P_ATTESTATION_STORE_NAME, 1, config, bindings); const l1Constants = await archiver.getL1Constants(); + const rollupAddress = inputConfig.l1Contracts.rollupAddress.toString().toLowerCase().replace(/^0x/, ''); + const txFileStoreBasePath = `aztec-${inputConfig.l1ChainId}-${inputConfig.rollupVersion}-0x${rollupAddress}`; + /** Validator factory for pool re-validation (double-spend + block header only). */ const createPoolTxValidator = async () => { await worldStateSynchronizer.syncImmediate(); @@ -154,6 +157,7 @@ export async function createP2PClient( const fileStoreSources = await createFileStoreTxSources( config.txCollectionFileStoreUrls, + txFileStoreBasePath, logger.createChild('file-store-tx-source'), ); if (fileStoreSources.length > 0) { @@ -174,7 +178,13 @@ export async function createP2PClient( logger.createChild('tx-collection'), ); - const txFileStore = await TxFileStore.create(mempools.txPool, config, logger.createChild('tx-file-store'), telemetry); + const txFileStore = await TxFileStore.create( + mempools.txPool, + config, + txFileStoreBasePath, + logger.createChild('tx-file-store'), + telemetry, + ); return new P2PClient( clientType, diff --git a/yarn-project/p2p/src/services/tx_collection/file_store_tx_source.ts b/yarn-project/p2p/src/services/tx_collection/file_store_tx_source.ts index b88f6b028ede..ec8381d2d6cf 100644 --- a/yarn-project/p2p/src/services/tx_collection/file_store_tx_source.ts +++ b/yarn-project/p2p/src/services/tx_collection/file_store_tx_source.ts @@ -9,6 +9,7 @@ export class FileStoreTxSource implements TxSource { private constructor( private readonly fileStore: ReadOnlyFileStore, private readonly baseUrl: string, + private readonly basePath: string, private readonly log: Logger, ) {} @@ -20,6 +21,7 @@ export class FileStoreTxSource implements TxSource { */ public static async create( url: string, + basePath: string, log: Logger = createLogger('p2p:file_store_tx_source'), ): Promise { try { @@ -28,7 +30,7 @@ export class FileStoreTxSource implements TxSource { log.warn(`Failed to create file store for URL: ${url}`); return undefined; } - return new FileStoreTxSource(fileStore, url, log); + return new FileStoreTxSource(fileStore, url, basePath, log); } catch (err) { log.warn(`Error creating file store for URL: ${url}`, { error: err }); return undefined; @@ -42,7 +44,7 @@ export class FileStoreTxSource implements TxSource { public getTxsByHash(txHashes: TxHash[]): Promise<(Tx | undefined)[]> { return Promise.all( txHashes.map(async txHash => { - const path = `txs/${txHash.toString()}.bin`; + const path = `${this.basePath}/txs/${txHash.toString()}.bin`; try { const buffer = await this.fileStore.read(path); return Tx.fromBuffer(buffer); @@ -63,8 +65,9 @@ export class FileStoreTxSource implements TxSource { */ export async function createFileStoreTxSources( urls: string[], + basePath: string, log: Logger = createLogger('p2p:file_store_tx_source'), ): Promise { - const sources = await Promise.all(urls.map(url => FileStoreTxSource.create(url, log))); + const sources = await Promise.all(urls.map(url => FileStoreTxSource.create(url, basePath, log))); return sources.filter((s): s is FileStoreTxSource => s !== undefined); } diff --git a/yarn-project/p2p/src/services/tx_file_store/tx_file_store.test.ts b/yarn-project/p2p/src/services/tx_file_store/tx_file_store.test.ts index 3a35574afc7f..e14f47a2b6dd 100644 --- a/yarn-project/p2p/src/services/tx_file_store/tx_file_store.test.ts +++ b/yarn-project/p2p/src/services/tx_file_store/tx_file_store.test.ts @@ -19,6 +19,7 @@ describe('TxFileStore', () => { let config: TxFileStoreConfig; let txFileStore: TxFileStore | undefined; const log = createLogger('test:tx_file_store'); + const basePath = 'aztec-1-1-0x1234'; const makeTx = async () => { const tx = Tx.random(); @@ -29,7 +30,7 @@ describe('TxFileStore', () => { /** Counts files in the txs subdirectory of the temp directory. */ async function countUploadedFiles(): Promise { try { - const files = await readdir(join(tmpDir, 'txs')); + const files = await readdir(join(tmpDir, basePath, 'txs')); return files.length; } catch { return 0; @@ -43,7 +44,7 @@ describe('TxFileStore', () => { beforeEach(async () => { // Clean up any files from previous test try { - await rm(join(tmpDir, 'txs'), { recursive: true, force: true }); + await rm(join(tmpDir, basePath), { recursive: true, force: true }); } catch { // Directory might not exist } @@ -73,25 +74,25 @@ describe('TxFileStore', () => { describe('create', () => { it('returns undefined when disabled', async () => { config.txFileStoreEnabled = false; - const result = await TxFileStore.create(txPool, config, log, undefined, fileStore); + const result = await TxFileStore.create(txPool, config, basePath, log, undefined, fileStore); expect(result).toBeUndefined(); }); it('returns undefined when upload URL is not configured', async () => { config.txFileStoreUrl = undefined; - const result = await TxFileStore.create(txPool, config, log, undefined, fileStore); + const result = await TxFileStore.create(txPool, config, basePath, log, undefined, fileStore); expect(result).toBeUndefined(); }); it('creates file store when enabled and configured', async () => { - txFileStore = await TxFileStore.create(txPool, config, log, undefined, fileStore); + txFileStore = await TxFileStore.create(txPool, config, basePath, log, undefined, fileStore); expect(txFileStore).toBeDefined(); }); }); describe('start/stop', () => { it('subscribes to txs-added event on start', async () => { - txFileStore = await TxFileStore.create(txPool, config, log, undefined, fileStore); + txFileStore = await TxFileStore.create(txPool, config, basePath, log, undefined, fileStore); txFileStore!.start(); const spy = jest.spyOn(fileStore, 'save'); @@ -101,13 +102,15 @@ describe('TxFileStore', () => { await txFileStore!.flush(); - expect(spy).toHaveBeenCalledWith(`txs/${tx.getTxHash().toString()}.bin`, tx.toBuffer(), { compress: false }); + expect(spy).toHaveBeenCalledWith(`${basePath}/txs/${tx.getTxHash().toString()}.bin`, tx.toBuffer(), { + compress: false, + }); spy.mockRestore(); }); it('unsubscribes from txs-added event on stop', async () => { - txFileStore = await TxFileStore.create(txPool, config, log, undefined, fileStore); + txFileStore = await TxFileStore.create(txPool, config, basePath, log, undefined, fileStore); txFileStore!.start(); const spy = jest.spyOn(fileStore, 'save'); @@ -134,7 +137,7 @@ describe('TxFileStore', () => { describe('tx upload', () => { it('uploads tx when txs-added event fires', async () => { - txFileStore = await TxFileStore.create(txPool, config, log, undefined, fileStore); + txFileStore = await TxFileStore.create(txPool, config, basePath, log, undefined, fileStore); txFileStore!.start(); const spy = jest.spyOn(fileStore, 'save'); @@ -144,13 +147,15 @@ describe('TxFileStore', () => { await txFileStore!.flush(); - expect(spy).toHaveBeenCalledWith(`txs/${tx.getTxHash().toString()}.bin`, tx.toBuffer(), { compress: false }); + expect(spy).toHaveBeenCalledWith(`${basePath}/txs/${tx.getTxHash().toString()}.bin`, tx.toBuffer(), { + compress: false, + }); spy.mockRestore(); }); it('uploads multiple txs', async () => { - txFileStore = await TxFileStore.create(txPool, config, log, undefined, fileStore); + txFileStore = await TxFileStore.create(txPool, config, basePath, log, undefined, fileStore); txFileStore!.start(); const spy = jest.spyOn(fileStore, 'save'); @@ -169,7 +174,7 @@ describe('TxFileStore', () => { it('respects concurrency limit', async () => { config.txFileStoreUploadConcurrency = 10; config.txFileStoreMaxQueueSize = 100; // Increase to accommodate 20 txs - txFileStore = await TxFileStore.create(txPool, config, log, undefined, fileStore); + txFileStore = await TxFileStore.create(txPool, config, basePath, log, undefined, fileStore); txFileStore!.start(); let activeCalls = 0; @@ -204,7 +209,7 @@ describe('TxFileStore', () => { }); it('skips duplicate tx uploads', async () => { - txFileStore = await TxFileStore.create(txPool, config, log, undefined, fileStore); + txFileStore = await TxFileStore.create(txPool, config, basePath, log, undefined, fileStore); txFileStore!.start(); const spy = jest.spyOn(fileStore, 'save'); @@ -227,7 +232,7 @@ describe('TxFileStore', () => { it('drops oldest txs when queue exceeds max size', async () => { config.txFileStoreUploadConcurrency = 1; config.txFileStoreMaxQueueSize = 2; - txFileStore = await TxFileStore.create(txPool, config, log, undefined, fileStore); + txFileStore = await TxFileStore.create(txPool, config, basePath, log, undefined, fileStore); txFileStore!.start(); const spy = jest.spyOn(fileStore, 'save'); @@ -251,7 +256,7 @@ describe('TxFileStore', () => { describe('error handling', () => { it('retries on transient failures', async () => { - txFileStore = await TxFileStore.create(txPool, config, log, undefined, fileStore); + txFileStore = await TxFileStore.create(txPool, config, basePath, log, undefined, fileStore); txFileStore!.start(); const originalSave = fileStore.save.bind(fileStore); @@ -276,7 +281,7 @@ describe('TxFileStore', () => { it('continues processing after exhausting retries', async () => { // Use concurrency=1 to ensure sequential processing for predictable retry behavior config.txFileStoreUploadConcurrency = 1; - txFileStore = await TxFileStore.create(txPool, config, log, undefined, fileStore); + txFileStore = await TxFileStore.create(txPool, config, basePath, log, undefined, fileStore); txFileStore!.start(); const originalSave = fileStore.save.bind(fileStore); @@ -305,7 +310,7 @@ describe('TxFileStore', () => { describe('getPendingUploadCount', () => { it('returns correct count of pending uploads', async () => { - txFileStore = await TxFileStore.create(txPool, config, log, undefined, fileStore); + txFileStore = await TxFileStore.create(txPool, config, basePath, log, undefined, fileStore); txFileStore!.start(); expect(txFileStore!.getPendingUploadCount()).toBe(0); diff --git a/yarn-project/p2p/src/services/tx_file_store/tx_file_store.ts b/yarn-project/p2p/src/services/tx_file_store/tx_file_store.ts index 13ea96d8621f..672bdd6d40cc 100644 --- a/yarn-project/p2p/src/services/tx_file_store/tx_file_store.ts +++ b/yarn-project/p2p/src/services/tx_file_store/tx_file_store.ts @@ -31,6 +31,7 @@ export class TxFileStore { private readonly config: TxFileStoreConfig, private readonly instrumentation: TxFileStoreInstrumentation, private readonly log: Logger, + private readonly basePath: string, ) { this.handleTxsAdded = (args: { txs: Tx[]; source?: string }) => { this.enqueueTxs(args.txs); @@ -50,6 +51,7 @@ export class TxFileStore { static async create( txPool: TxPoolV2, config: TxFileStoreConfig, + basePath: string, log: Logger = createLogger('p2p:tx_file_store'), telemetry: TelemetryClient = getTelemetryClient(), fileStoreOverride?: FileStore, @@ -71,8 +73,8 @@ export class TxFileStore { } const instrumentation = new TxFileStoreInstrumentation(telemetry, 'TxFileStore'); - log.info('Created tx file store', { url: config.txFileStoreUrl }); - return new TxFileStore(fileStore, txPool, config, instrumentation, log); + log.info('Created tx file store', { url: config.txFileStoreUrl, basePath }); + return new TxFileStore(fileStore, txPool, config, instrumentation, log, basePath); } /** Starts listening to TxPool events and uploading txs. */ @@ -122,7 +124,7 @@ export class TxFileStore { private async uploadTx(tx: Tx): Promise { const txHash = tx.getTxHash().toString(); - const path = `txs/${txHash}.bin`; + const path = `${this.basePath}/txs/${txHash}.bin`; const timer = new Timer(); if (this.recentUploads.has(txHash)) {