Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion spartan/environments/next-net.env
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ ETHERSCAN_API_KEY=REPLACE_WITH_GCP_SECRET
DEPLOY_INTERNAL_BOOTNODE=true
STORE_SNAPSHOT_URL=
BLOB_BUCKET_DIRECTORY=${BLOB_BUCKET_DIRECTORY:-next-net/blobs}
TX_FILE_STORE_ENABLED=true
TX_FILE_STORE_BUCKET_DIRECTORY=${TX_FILE_STORE_BUCKET_DIRECTORY:-next-net/txs}
TX_COLLECTION_FILE_STORE_URLS="https://aztec-labs-snapshots.com/${TX_FILE_STORE_BUCKET_DIRECTORY}"
R2_ACCESS_KEY_ID=REPLACE_WITH_GCP_SECRET
R2_SECRET_ACCESS_KEY=REPLACE_WITH_GCP_SECRET
PROVER_FAILED_PROOF_STORE=gs://aztec-develop/next-net/failed-proofs
Expand Down Expand Up @@ -56,4 +59,4 @@ RPC_INGRESS_STATIC_IP_NAME=nextnet-rpc-ip
RPC_INGRESS_SSL_CERT_NAMES='["nextnet-rpc-cert"]'

VALIDATOR_HA_REPLICAS=1
VALIDATOR_RESOURCE_PROFILE="prod-spot"
VALIDATOR_RESOURCE_PROFILE="prod-spot"
13 changes: 13 additions & 0 deletions spartan/scripts/deploy_network.sh
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,16 @@ else
BLOB_FILE_STORE_UPLOAD_URL_TF="null"
fi

# TX filestore configuration.
# Whether nodes should upload txs to the file store (consumed as a terraform bool).
TX_FILE_STORE_ENABLED=${TX_FILE_STORE_ENABLED:-false}
# Quote the upload URL so terraform receives a string literal, or pass terraform
# `null` when unset. (No empty-string pre-initialization needed: both branches assign.)
if [[ -n "${TX_FILE_STORE_URL:-}" ]]; then
  TX_FILE_STORE_URL_TF="\"$TX_FILE_STORE_URL\""
else
  TX_FILE_STORE_URL_TF="null"
fi
# Comma-separated read URLs; default to empty so later expansions are defined under `set -u`.
TX_COLLECTION_FILE_STORE_URLS=${TX_COLLECTION_FILE_STORE_URLS:-}

P2P_GOSSIPSUB_D=${P2P_GOSSIPSUB_D:-6}
P2P_GOSSIPSUB_DLO=${P2P_GOSSIPSUB_DLO:-4}
P2P_GOSSIPSUB_DHI=${P2P_GOSSIPSUB_DHI:-12}
Expand Down Expand Up @@ -546,6 +556,9 @@ PROVER_L1_PRIORITY_FEE_BUMP_PERCENTAGE = ${PROVER_L1_PRIORITY_FEE_BUMP_PERCENTAG
PROVER_L1_PRIORITY_FEE_RETRY_BUMP_PERCENTAGE = ${PROVER_L1_PRIORITY_FEE_RETRY_BUMP_PERCENTAGE:-null}
BLOB_ALLOW_EMPTY_SOURCES = ${BLOB_ALLOW_EMPTY_SOURCES:-false}
BLOB_FILE_STORE_UPLOAD_URL = ${BLOB_FILE_STORE_UPLOAD_URL_TF}
TX_FILE_STORE_ENABLED = ${TX_FILE_STORE_ENABLED}
TX_FILE_STORE_URL = ${TX_FILE_STORE_URL_TF}
TX_COLLECTION_FILE_STORE_URLS = "${TX_COLLECTION_FILE_STORE_URLS}"
DEBUG_P2P_INSTRUMENT_MESSAGES = ${DEBUG_P2P_INSTRUMENT_MESSAGES:-false}

PROVER_AGENT_INCLUDE_METRICS = "${PROVER_AGENT_INCLUDE_METRICS-null}"
Expand Down
8 changes: 8 additions & 0 deletions spartan/scripts/setup_gcp_secrets.sh
Original file line number Diff line number Diff line change
Expand Up @@ -157,4 +157,12 @@ if [[ -n "${BLOB_BUCKET_DIRECTORY:-}" ]]; then
export BLOB_FILE_STORE_UPLOAD_URL="s3://testnet-bucket/${BLOB_BUCKET_DIRECTORY}/?endpoint=https://${r2_account_id}.r2.cloudflarestorage.com"
fi

# Construct TX_FILE_STORE_URL from the r2-account-id secret and TX_FILE_STORE_BUCKET_DIRECTORY.
# Only runs when a bucket directory is configured; otherwise TX_FILE_STORE_URL stays unset
# and downstream tooling treats the tx file store upload as disabled.
if [[ -n "${TX_FILE_STORE_BUCKET_DIRECTORY:-}" ]]; then
# get_secret writes the secret to a temp file and echoes its path (same pattern as the
# BLOB_FILE_STORE_UPLOAD_URL block above).
secret_file=$(get_secret "r2-account-id")
# Register the value for log masking BEFORE it is interpolated anywhere — presumably
# mask_secret_value scrubs it from subsequent output; TODO confirm helper semantics.
mask_secret_value "TX_FILE_STORE_URL" "$secret_file"
r2_account_id=$(cat "$secret_file")
# S3-compatible URL pointing at the Cloudflare R2 endpoint for this account.
export TX_FILE_STORE_URL="s3://testnet-bucket/${TX_FILE_STORE_BUCKET_DIRECTORY}/?endpoint=https://${r2_account_id}.r2.cloudflarestorage.com"
fi

echo "Successfully set up GCP secrets for $NETWORK"
7 changes: 7 additions & 0 deletions spartan/terraform/deploy-aztec-infra/main.tf
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,7 @@ locals {
"validator.node.env.P2P_DROP_TX" = var.P2P_DROP_TX
"validator.node.env.P2P_DROP_TX_CHANCE" = var.P2P_DROP_TX_CHANCE
"validator.node.env.WS_NUM_HISTORIC_BLOCKS" = var.WS_NUM_HISTORIC_BLOCKS
"validator.node.env.TX_COLLECTION_FILE_STORE_URLS" = var.TX_COLLECTION_FILE_STORE_URLS
}

# Note: nonsensitive() is required here because helm_releases is used in for_each,
Expand Down Expand Up @@ -357,6 +358,7 @@ locals {
"node.node.env.P2P_DROP_TX" = var.P2P_DROP_TX
"node.node.env.P2P_DROP_TX_CHANCE" = var.P2P_DROP_TX_CHANCE
"node.node.env.WS_NUM_HISTORIC_BLOCKS" = var.WS_NUM_HISTORIC_BLOCKS
"node.node.env.TX_COLLECTION_FILE_STORE_URLS" = var.TX_COLLECTION_FILE_STORE_URLS
"node.service.p2p.nodePortEnabled" = var.P2P_NODEPORT_ENABLED
"node.service.p2p.announcePort" = local.p2p_port_prover
"node.service.p2p.port" = local.p2p_port_prover
Expand Down Expand Up @@ -436,6 +438,9 @@ locals {
"node.env.P2P_DROP_TX" = var.P2P_DROP_TX
"node.env.P2P_DROP_TX_CHANCE" = var.P2P_DROP_TX_CHANCE
"node.env.WS_NUM_HISTORIC_BLOCKS" = var.WS_NUM_HISTORIC_BLOCKS
"node.env.TX_FILE_STORE_ENABLED" = var.TX_FILE_STORE_ENABLED
"node.env.TX_FILE_STORE_URL" = var.TX_FILE_STORE_URL
"node.env.TX_COLLECTION_FILE_STORE_URLS" = var.TX_COLLECTION_FILE_STORE_URLS
},
# Only set RPC mnemonic config in fisherman mode)
var.FISHERMAN_MODE ? {
Expand Down Expand Up @@ -490,6 +495,7 @@ locals {
"node.env.P2P_DROP_TX" = var.P2P_DROP_TX
"node.env.P2P_DROP_TX_CHANCE" = var.P2P_DROP_TX_CHANCE
"node.env.WS_NUM_HISTORIC_BLOCKS" = var.WS_NUM_HISTORIC_BLOCKS
"node.env.TX_COLLECTION_FILE_STORE_URLS" = var.TX_COLLECTION_FILE_STORE_URLS
}
boot_node_host_path = "node.env.BOOT_NODE_HOST"
bootstrap_nodes_path = "node.env.BOOTSTRAP_NODES"
Expand Down Expand Up @@ -529,6 +535,7 @@ locals {
"node.env.P2P_DROP_TX" = var.P2P_DROP_TX
"node.env.P2P_DROP_TX_CHANCE" = var.P2P_DROP_TX_CHANCE
"node.env.WS_NUM_HISTORIC_BLOCKS" = var.WS_NUM_HISTORIC_BLOCKS
"node.env.TX_COLLECTION_FILE_STORE_URLS" = var.TX_COLLECTION_FILE_STORE_URLS
}
boot_node_host_path = "node.env.BOOT_NODE_HOST"
bootstrap_nodes_path = "node.env.BOOTSTRAP_NODES"
Expand Down
19 changes: 19 additions & 0 deletions spartan/terraform/deploy-aztec-infra/variables.tf
Original file line number Diff line number Diff line change
Expand Up @@ -656,6 +656,25 @@ variable "BLOB_FILE_STORE_UPLOAD_URL" {
default = null
}

# --- Tx file store ----------------------------------------------------------
# Upload side: nodes persist txs to TX_FILE_STORE_URL when TX_FILE_STORE_ENABLED
# is true. Read side: TX_COLLECTION_FILE_STORE_URLS lists stores nodes may fetch
# missing txs from. Mirrors the BLOB_FILE_STORE_UPLOAD_URL pattern above.

variable "TX_FILE_STORE_ENABLED" {
  description = "Whether to enable uploading transactions to file storage"
  type        = bool
  default     = false
}

variable "TX_FILE_STORE_URL" {
  description = "URL for uploading transactions (e.g., s3://bucket/path/, gs://bucket/path/)"
  type        = string
  nullable    = true
  default     = null
}

variable "TX_COLLECTION_FILE_STORE_URLS" {
  description = "Comma-separated URLs for reading transactions from file storage"
  type        = string
  default     = ""
}

variable "PROVER_AGENT_POLL_INTERVAL_MS" {
description = "Interval in milliseconds between prover agent polls"
type = number
Expand Down
12 changes: 11 additions & 1 deletion yarn-project/p2p/src/client/factory.ts
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,9 @@ export async function createP2PClient<T extends P2PClientType>(
const attestationStore = await createStore(P2P_ATTESTATION_STORE_NAME, 1, config, bindings);
const l1Constants = await archiver.getL1Constants();

const rollupAddress = inputConfig.l1Contracts.rollupAddress.toString().toLowerCase().replace(/^0x/, '');
const txFileStoreBasePath = `aztec-${inputConfig.l1ChainId}-${inputConfig.rollupVersion}-0x${rollupAddress}`;

/** Validator factory for pool re-validation (double-spend + block header only). */
const createPoolTxValidator = async () => {
await worldStateSynchronizer.syncImmediate();
Expand Down Expand Up @@ -154,6 +157,7 @@ export async function createP2PClient<T extends P2PClientType>(

const fileStoreSources = await createFileStoreTxSources(
config.txCollectionFileStoreUrls,
txFileStoreBasePath,
logger.createChild('file-store-tx-source'),
);
if (fileStoreSources.length > 0) {
Expand All @@ -174,7 +178,13 @@ export async function createP2PClient<T extends P2PClientType>(
logger.createChild('tx-collection'),
);

const txFileStore = await TxFileStore.create(mempools.txPool, config, logger.createChild('tx-file-store'), telemetry);
const txFileStore = await TxFileStore.create(
mempools.txPool,
config,
txFileStoreBasePath,
logger.createChild('tx-file-store'),
telemetry,
);

return new P2PClient(
clientType,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ export class FileStoreTxSource implements TxSource {
private constructor(
private readonly fileStore: ReadOnlyFileStore,
private readonly baseUrl: string,
private readonly basePath: string,
private readonly log: Logger,
) {}

Expand All @@ -20,6 +21,7 @@ export class FileStoreTxSource implements TxSource {
*/
public static async create(
url: string,
basePath: string,
log: Logger = createLogger('p2p:file_store_tx_source'),
): Promise<FileStoreTxSource | undefined> {
try {
Expand All @@ -28,7 +30,7 @@ export class FileStoreTxSource implements TxSource {
log.warn(`Failed to create file store for URL: ${url}`);
return undefined;
}
return new FileStoreTxSource(fileStore, url, log);
return new FileStoreTxSource(fileStore, url, basePath, log);
} catch (err) {
log.warn(`Error creating file store for URL: ${url}`, { error: err });
return undefined;
Expand All @@ -42,7 +44,7 @@ export class FileStoreTxSource implements TxSource {
public getTxsByHash(txHashes: TxHash[]): Promise<(Tx | undefined)[]> {
return Promise.all(
txHashes.map(async txHash => {
const path = `txs/${txHash.toString()}.bin`;
const path = `${this.basePath}/txs/${txHash.toString()}.bin`;
try {
const buffer = await this.fileStore.read(path);
return Tx.fromBuffer(buffer);
Expand All @@ -63,8 +65,9 @@ export class FileStoreTxSource implements TxSource {
*/
/**
 * Creates a FileStoreTxSource for each of the given URLs, dropping any that fail
 * to initialize (FileStoreTxSource.create logs and returns undefined on failure).
 *
 * @param urls - File store URLs to create tx sources for.
 * @param basePath - Rollup-scoped base path prefixed to tx object keys in the store.
 * @param log - Logger shared by all created sources.
 * @returns The successfully created sources, in input order, failures filtered out.
 */
export async function createFileStoreTxSources(
  urls: string[],
  basePath: string,
  log: Logger = createLogger('p2p:file_store_tx_source'),
): Promise<FileStoreTxSource[]> {
  // Create all sources in parallel; each create() resolves to undefined rather
  // than throwing, so Promise.all cannot reject here.
  const sources = await Promise.all(urls.map(url => FileStoreTxSource.create(url, basePath, log)));
  // Type predicate narrows (FileStoreTxSource | undefined)[] to FileStoreTxSource[].
  return sources.filter((s): s is FileStoreTxSource => s !== undefined);
}
39 changes: 22 additions & 17 deletions yarn-project/p2p/src/services/tx_file_store/tx_file_store.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ describe('TxFileStore', () => {
let config: TxFileStoreConfig;
let txFileStore: TxFileStore | undefined;
const log = createLogger('test:tx_file_store');
const basePath = 'aztec-1-1-0x1234';

const makeTx = async () => {
const tx = Tx.random();
Expand All @@ -29,7 +30,7 @@ describe('TxFileStore', () => {
/** Counts files in the txs subdirectory of the temp directory. */
async function countUploadedFiles(): Promise<number> {
try {
const files = await readdir(join(tmpDir, 'txs'));
const files = await readdir(join(tmpDir, basePath, 'txs'));
return files.length;
} catch {
return 0;
Expand All @@ -43,7 +44,7 @@ describe('TxFileStore', () => {
beforeEach(async () => {
// Clean up any files from previous test
try {
await rm(join(tmpDir, 'txs'), { recursive: true, force: true });
await rm(join(tmpDir, basePath), { recursive: true, force: true });
} catch {
// Directory might not exist
}
Expand Down Expand Up @@ -73,25 +74,25 @@ describe('TxFileStore', () => {
describe('create', () => {
it('returns undefined when disabled', async () => {
config.txFileStoreEnabled = false;
const result = await TxFileStore.create(txPool, config, log, undefined, fileStore);
const result = await TxFileStore.create(txPool, config, basePath, log, undefined, fileStore);
expect(result).toBeUndefined();
});

it('returns undefined when upload URL is not configured', async () => {
config.txFileStoreUrl = undefined;
const result = await TxFileStore.create(txPool, config, log, undefined, fileStore);
const result = await TxFileStore.create(txPool, config, basePath, log, undefined, fileStore);
expect(result).toBeUndefined();
});

it('creates file store when enabled and configured', async () => {
txFileStore = await TxFileStore.create(txPool, config, log, undefined, fileStore);
txFileStore = await TxFileStore.create(txPool, config, basePath, log, undefined, fileStore);
expect(txFileStore).toBeDefined();
});
});

describe('start/stop', () => {
it('subscribes to txs-added event on start', async () => {
txFileStore = await TxFileStore.create(txPool, config, log, undefined, fileStore);
txFileStore = await TxFileStore.create(txPool, config, basePath, log, undefined, fileStore);
txFileStore!.start();

const spy = jest.spyOn(fileStore, 'save');
Expand All @@ -101,13 +102,15 @@ describe('TxFileStore', () => {

await txFileStore!.flush();

expect(spy).toHaveBeenCalledWith(`txs/${tx.getTxHash().toString()}.bin`, tx.toBuffer(), { compress: false });
expect(spy).toHaveBeenCalledWith(`${basePath}/txs/${tx.getTxHash().toString()}.bin`, tx.toBuffer(), {
compress: false,
});

spy.mockRestore();
});

it('unsubscribes from txs-added event on stop', async () => {
txFileStore = await TxFileStore.create(txPool, config, log, undefined, fileStore);
txFileStore = await TxFileStore.create(txPool, config, basePath, log, undefined, fileStore);
txFileStore!.start();

const spy = jest.spyOn(fileStore, 'save');
Expand All @@ -134,7 +137,7 @@ describe('TxFileStore', () => {

describe('tx upload', () => {
it('uploads tx when txs-added event fires', async () => {
txFileStore = await TxFileStore.create(txPool, config, log, undefined, fileStore);
txFileStore = await TxFileStore.create(txPool, config, basePath, log, undefined, fileStore);
txFileStore!.start();

const spy = jest.spyOn(fileStore, 'save');
Expand All @@ -144,13 +147,15 @@ describe('TxFileStore', () => {

await txFileStore!.flush();

expect(spy).toHaveBeenCalledWith(`txs/${tx.getTxHash().toString()}.bin`, tx.toBuffer(), { compress: false });
expect(spy).toHaveBeenCalledWith(`${basePath}/txs/${tx.getTxHash().toString()}.bin`, tx.toBuffer(), {
compress: false,
});

spy.mockRestore();
});

it('uploads multiple txs', async () => {
txFileStore = await TxFileStore.create(txPool, config, log, undefined, fileStore);
txFileStore = await TxFileStore.create(txPool, config, basePath, log, undefined, fileStore);
txFileStore!.start();

const spy = jest.spyOn(fileStore, 'save');
Expand All @@ -169,7 +174,7 @@ describe('TxFileStore', () => {
it('respects concurrency limit', async () => {
config.txFileStoreUploadConcurrency = 10;
config.txFileStoreMaxQueueSize = 100; // Increase to accommodate 20 txs
txFileStore = await TxFileStore.create(txPool, config, log, undefined, fileStore);
txFileStore = await TxFileStore.create(txPool, config, basePath, log, undefined, fileStore);
txFileStore!.start();

let activeCalls = 0;
Expand Down Expand Up @@ -204,7 +209,7 @@ describe('TxFileStore', () => {
});

it('skips duplicate tx uploads', async () => {
txFileStore = await TxFileStore.create(txPool, config, log, undefined, fileStore);
txFileStore = await TxFileStore.create(txPool, config, basePath, log, undefined, fileStore);
txFileStore!.start();

const spy = jest.spyOn(fileStore, 'save');
Expand All @@ -227,7 +232,7 @@ describe('TxFileStore', () => {
it('drops oldest txs when queue exceeds max size', async () => {
config.txFileStoreUploadConcurrency = 1;
config.txFileStoreMaxQueueSize = 2;
txFileStore = await TxFileStore.create(txPool, config, log, undefined, fileStore);
txFileStore = await TxFileStore.create(txPool, config, basePath, log, undefined, fileStore);
txFileStore!.start();

const spy = jest.spyOn(fileStore, 'save');
Expand All @@ -251,7 +256,7 @@ describe('TxFileStore', () => {

describe('error handling', () => {
it('retries on transient failures', async () => {
txFileStore = await TxFileStore.create(txPool, config, log, undefined, fileStore);
txFileStore = await TxFileStore.create(txPool, config, basePath, log, undefined, fileStore);
txFileStore!.start();

const originalSave = fileStore.save.bind(fileStore);
Expand All @@ -276,7 +281,7 @@ describe('TxFileStore', () => {
it('continues processing after exhausting retries', async () => {
// Use concurrency=1 to ensure sequential processing for predictable retry behavior
config.txFileStoreUploadConcurrency = 1;
txFileStore = await TxFileStore.create(txPool, config, log, undefined, fileStore);
txFileStore = await TxFileStore.create(txPool, config, basePath, log, undefined, fileStore);
txFileStore!.start();

const originalSave = fileStore.save.bind(fileStore);
Expand Down Expand Up @@ -305,7 +310,7 @@ describe('TxFileStore', () => {

describe('getPendingUploadCount', () => {
it('returns correct count of pending uploads', async () => {
txFileStore = await TxFileStore.create(txPool, config, log, undefined, fileStore);
txFileStore = await TxFileStore.create(txPool, config, basePath, log, undefined, fileStore);
txFileStore!.start();

expect(txFileStore!.getPendingUploadCount()).toBe(0);
Expand Down
8 changes: 5 additions & 3 deletions yarn-project/p2p/src/services/tx_file_store/tx_file_store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ export class TxFileStore {
private readonly config: TxFileStoreConfig,
private readonly instrumentation: TxFileStoreInstrumentation,
private readonly log: Logger,
private readonly basePath: string,
) {
this.handleTxsAdded = (args: { txs: Tx[]; source?: string }) => {
this.enqueueTxs(args.txs);
Expand All @@ -50,6 +51,7 @@ export class TxFileStore {
static async create(
txPool: TxPoolV2,
config: TxFileStoreConfig,
basePath: string,
log: Logger = createLogger('p2p:tx_file_store'),
telemetry: TelemetryClient = getTelemetryClient(),
fileStoreOverride?: FileStore,
Expand All @@ -71,8 +73,8 @@ export class TxFileStore {
}

const instrumentation = new TxFileStoreInstrumentation(telemetry, 'TxFileStore');
log.info('Created tx file store', { url: config.txFileStoreUrl });
return new TxFileStore(fileStore, txPool, config, instrumentation, log);
log.info('Created tx file store', { url: config.txFileStoreUrl, basePath });
return new TxFileStore(fileStore, txPool, config, instrumentation, log, basePath);
}

/** Starts listening to TxPool events and uploading txs. */
Expand Down Expand Up @@ -122,7 +124,7 @@ export class TxFileStore {

private async uploadTx(tx: Tx): Promise<void> {
const txHash = tx.getTxHash().toString();
const path = `txs/${txHash}.bin`;
const path = `${this.basePath}/txs/${txHash}.bin`;
const timer = new Timer();

if (this.recentUploads.has(txHash)) {
Expand Down
Loading